crabka-broker 0.3.6

Single-node Apache Kafka-compatible broker (MVP)
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
//! Share-partition leader manager (KIP-932).
//!
//! Owns one [`AcquisitionState`] machine per `(group, topic_id, partition)`
//! this broker leads, lazily loaded from the durable `SharePersister` and
//! re-persisted whenever it goes dirty. The `ShareFetch`/`ShareAcknowledge`
//! handlers drive the per-cell state under its `tokio::sync::Mutex`; a
//! background sweeper expires acquisition locks.
//!
//! Locking discipline: the `DashMap` guard is NEVER held across an `.await`.
//! Callers clone the cell `Arc` out of the map first, then lock and await.

use std::sync::Arc;
use std::time::Duration;

use dashmap::DashMap;
use tokio::sync::Mutex;
use tracing::warn;

use crabka_metadata::NodeId;

use crate::coordinator::unified::share::config::ShareGroupConfig;
use crate::metadata_source::MetadataSource;
use crate::partition_registry::PartitionRegistry;
use crate::share_coordinator::persister_client::SharePersister;
use crate::share_partition::session::ShareSessionCache;
use crate::share_partition::state::AcquisitionState;

/// Key identifying one live acquisition-state machine:
/// `(group, topic_id, partition)`.
type LeaderKey = (String, uuid::Uuid, i32);

/// Per-broker owner of the share-partition acquisition state machines for the
/// `(group, topic, partition)` triples this broker leads.
///
/// Cells are created lazily by `get_or_load`, dropped by `invalidate`, and
/// swept for expired acquisition locks by the task started in
/// `spawn_lock_sweeper`.
pub(crate) struct SharePartitionLeaderManager {
    /// This broker's node id; compared against a partition's leader to decide
    /// whether this broker leads it.
    node_id: NodeId,
    /// Locally materialized partitions; consulted for the current leader epoch.
    partitions: Arc<PartitionRegistry>,
    /// Metadata source whose current image resolves topic ids to topic names
    /// and partition leaders.
    controller: Arc<dyn MetadataSource>,
    /// Durable store the acquisition state is read from and written back to.
    persister: Arc<SharePersister>,
    /// Share-group settings (session-cache sizing inputs, record lock duration).
    config: Arc<ShareGroupConfig>,
    /// Share-session cache backing `validate_session`.
    sessions: ShareSessionCache,
    /// Live acquisition-state cells, one per `(group, topic_id, partition)`.
    leaders: DashMap<LeaderKey, Arc<Mutex<AcquisitionState>>>,
}

impl std::fmt::Debug for SharePartitionLeaderManager {
    /// Compact debug view: the broker's node id plus the number of cached
    /// acquisition-state cells. All other fields are elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut view = f.debug_struct("SharePartitionLeaderManager");
        view.field("node_id", &self.node_id);
        view.field("live_partitions", &self.leaders.len());
        view.finish_non_exhaustive()
    }
}

impl SharePartitionLeaderManager {
    /// Build a manager with an empty cell map and a freshly sized share-session
    /// cache.
    ///
    /// The session-cache capacity is `max_groups * max(max_size, 1)`
    /// (saturating). A `max_groups` of 0 — which `ShareGroupConfig` treats as
    /// "unbounded" — falls back to a fixed cap of 10 000 instead.
    pub(crate) fn new(
        node_id: NodeId,
        partitions: Arc<PartitionRegistry>,
        controller: Arc<dyn MetadataSource>,
        persister: Arc<SharePersister>,
        config: Arc<ShareGroupConfig>,
    ) -> Self {
        let session_max = match config.max_groups {
            // Unbounded group count: use a generous fixed ceiling.
            0 => 10_000,
            groups => groups.saturating_mul(config.max_size.max(1)),
        };
        let sessions = ShareSessionCache::new(session_max);
        Self {
            node_id,
            partitions,
            controller,
            persister,
            config,
            sessions,
            leaders: DashMap::new(),
        }
    }

    /// Validate (and advance) the share session for `(group, member)`. See
    /// [`ShareSessionCache::validate`].
    ///
    /// This is a straight delegation to the session cache owned by this
    /// manager; the result is passed through untouched. The `i16` in the error
    /// position is presumably a Kafka wire error code — confirm against
    /// `ShareSessionCache::validate`.
    pub(crate) fn validate_session(
        &self,
        group: &str,
        member: &str,
        epoch: i32,
    ) -> Result<(), i16> {
        self.sessions.validate(group, member, epoch)
    }

    /// Look up the wire-format `(leader_id, leader_epoch)` pair for
    /// `(topic_id, partition)` in the current metadata image.
    ///
    /// The pair feeds the `current_leader` redirect hint carried by a
    /// not-leader `ShareFetch`/`ShareAcknowledge` response. When the topic id
    /// or the partition is not present in the image the result is `(-1, -1)`.
    /// A leader id that does not fit in an `i32` is reported as `-1` while the
    /// epoch is still returned.
    pub(crate) fn current_leader_of(&self, topic_id: uuid::Uuid, partition: i32) -> (i32, i32) {
        const UNKNOWN: (i32, i32) = (-1, -1);

        let image = self.controller.current_image();
        let topic = match image.topics().find(|t| t.topic_id == topic_id) {
            Some(found) => found,
            None => return UNKNOWN,
        };
        let hint = image
            .partition(&topic.name, partition)
            .map(|p| (i32::try_from(p.leader).unwrap_or(-1), p.leader_epoch));
        hint.unwrap_or(UNKNOWN)
    }

    /// Map `topic_id` to its data-topic name using the current metadata image.
    ///
    /// Returns `None` when no topic in the image carries that id. Share
    /// requests identify topics by id only, while the handlers need the name
    /// both to find the local [`PartitionRegistry`] entry and to key per-topic
    /// `Read` ACL checks.
    pub(crate) fn topic_name_for(&self, topic_id: uuid::Uuid) -> Option<String> {
        let image = self.controller.current_image();
        for topic in image.topics() {
            if topic.topic_id == topic_id {
                return Some(topic.name.clone());
            }
        }
        None
    }

    /// Report whether this broker currently leads `(topic_id, partition)`.
    ///
    /// The topic name is resolved from the metadata image (share requests carry
    /// only the id), then the partition's leader is compared with this
    /// manager's `node_id`. An unknown topic id or partition yields `false`.
    ///
    /// Consumed by the ShareFetch/ShareAcknowledge handlers.
    pub(crate) fn topic_leader_is_self(&self, topic_id: uuid::Uuid, partition: i32) -> bool {
        let image = self.controller.current_image();
        let topic = match image.topics().find(|t| t.topic_id == topic_id) {
            Some(found) => found,
            None => return false,
        };
        if let Some(p) = image.partition(&topic.name, partition) {
            return p.leader == self.node_id;
        }
        false
    }

    /// Read the current `leader_epoch` of `(topic_id, partition)` from the
    /// locally registered partition's atomic counter.
    ///
    /// Falls back to `0` when the topic id is absent from the metadata image
    /// or the partition is not present in the local registry.
    fn leader_epoch_for(&self, topic_id: uuid::Uuid, partition: i32) -> i32 {
        use std::sync::atomic::Ordering;

        let image = self.controller.current_image();
        let topic = match image.topics().find(|t| t.topic_id == topic_id) {
            Some(found) => found,
            None => return 0,
        };
        if let Some(local) = self.partitions.get(&topic.name, partition) {
            return local.current_leader_epoch.load(Ordering::Acquire);
        }
        0
    }

    /// Get (or lazily load) the acquisition-state cell for
    /// `(group, topic_id, partition)`.
    ///
    /// On a cache miss the durable state is read from the persister and folded
    /// into a fresh [`AcquisitionState`] (or an empty one when none exists).
    /// The `DashMap` guard is dropped before the load `.await`; a concurrent
    /// loader losing the insert race adopts the winner's cell.
    ///
    /// Consumed by the ShareFetch/ShareAcknowledge handlers.
    pub(crate) async fn get_or_load(
        &self,
        group: &str,
        topic_id: uuid::Uuid,
        partition: i32,
    ) -> Arc<Mutex<AcquisitionState>> {
        let key = (group.to_string(), topic_id, partition);
        if let Some(cell) = self.leaders.get(&key) {
            return cell.value().clone();
        }

        // Miss: load from the persister WITHOUT holding any DashMap guard.
        let leader_epoch = self.leader_epoch_for(topic_id, partition);
        let loaded = match self.persister.read_state(group, topic_id, partition).await {
            Ok(Some(persisted)) => {
                let mut st = AcquisitionState::new(persisted.start_offset);
                st.load_from(
                    persisted.start_offset,
                    persisted.state_epoch,
                    leader_epoch,
                    persisted.delivery_complete_count,
                    &persisted.state_batches,
                );
                st
            }
            Ok(None) => {
                let mut st = AcquisitionState::new(0);
                st.leader_epoch = leader_epoch;
                st
            }
            Err(e) => {
                warn!(
                    group,
                    %topic_id, partition, error = %e,
                    "share-partition state load failed; starting from empty window"
                );
                let mut st = AcquisitionState::new(0);
                st.leader_epoch = leader_epoch;
                st
            }
        };

        let cell = Arc::new(Mutex::new(loaded));
        // Adopt the winner if another task loaded the same key concurrently.
        self.leaders.entry(key).or_insert(cell).value().clone()
    }

    /// Drop the cached acquisition-state cell for `(group, topic_id, partition)`
    /// so the next `get_or_load` re-reads the durable SPSO. The admin offset
    /// RPCs call this after `AlterShareGroupOffsets`/`DeleteShareGroupOffsets`
    /// rewrite the persister state, so an in-flight reset is observed by
    /// subsequent `ShareFetch` on this broker. Cross-broker cells refresh on
    /// their own next load, matching the classic offset-reset behavior.
    pub(crate) fn invalidate(&self, group: &str, topic_id: uuid::Uuid, partition: i32) {
        self.leaders
            .remove(&(group.to_string(), topic_id, partition));
    }

    /// Write `st` to the persister when it is marked dirty.
    ///
    /// A clean state returns immediately. The dirty flag is cleared only after
    /// the write succeeds; a failed write is logged and swallowed with the flag
    /// left set, so a later call (from the background sweeper or the next
    /// fetch/ack) tries again. Persistence is best-effort: this never panics
    /// and never fails the calling fetch/ack.
    pub(crate) async fn persist_if_dirty(
        &self,
        group: &str,
        topic_id: uuid::Uuid,
        partition: i32,
        st: &mut AcquisitionState,
    ) {
        if st.dirty {
            let (start, dcc, batches) = st.to_persist_batches();
            let outcome = self
                .persister
                .write_state(
                    group,
                    topic_id,
                    partition,
                    st.state_epoch,
                    st.leader_epoch,
                    start,
                    dcc,
                    batches,
                )
                .await;
            match outcome {
                // Leave `dirty` set so the state is written again later.
                Err(e) => warn!(
                    group,
                    %topic_id, partition, error = %e,
                    "share-partition state persist failed; will retry on next change"
                ),
                // Durable write landed: the in-memory state is in sync.
                Ok(()) => st.dirty = false,
            }
        }
    }

    /// Spawn the background acquisition-lock-timeout sweeper.
    ///
    /// Every `record_lock_duration / 2` (min 100ms) the task snapshots the live
    /// cells (cloning their `Arc`s out of the `DashMap` so no guard is held
    /// across an `.await`), expires any timed-out locks, and re-persists the
    /// cells that are dirty. The task holds a strong reference to the manager
    /// and runs detached for the broker's lifetime.
    ///
    /// The interval uses `MissedTickBehavior::Delay`: a sweep that overruns the
    /// period (it awaits persister writes while holding each cell's lock) is
    /// followed by a full period of quiet rather than a burst of catch-up
    /// sweeps, which would only add lock contention for the fetch/ack handlers.
    pub(crate) fn spawn_lock_sweeper(self: &Arc<Self>) {
        let mgr = Arc::clone(self);
        let period = (mgr.config.record_lock_duration / 2).max(Duration::from_millis(100));
        tokio::spawn(async move {
            let mut tick = tokio::time::interval(period);
            // The default (`Burst`) would fire back-to-back after a slow sweep.
            tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            loop {
                tick.tick().await;
                // Snapshot keys + cells, releasing all DashMap guards first.
                let cells: Vec<(LeaderKey, Arc<Mutex<AcquisitionState>>)> = mgr
                    .leaders
                    .iter()
                    .map(|e| (e.key().clone(), e.value().clone()))
                    .collect();
                // One timestamp per sweep so every cell is judged against the
                // same instant.
                let now = std::time::Instant::now();
                for ((group, topic_id, partition), cell) in cells {
                    let mut st = cell.lock().await;
                    st.expire_locks(now);
                    // `persist_if_dirty` is a no-op for clean state, so no
                    // extra dirty check is needed here.
                    mgr.persist_if_dirty(&group, topic_id, partition, &mut st)
                        .await;
                }
            }
        });
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use std::collections::BTreeSet;
    use std::net::SocketAddr;

    /// Duration passed to `AcquisitionState::acquire` by the tests below —
    /// presumably the acquisition-lock timeout; confirm against `acquire`.
    const LOCK: Duration = Duration::from_secs(30);

    use async_trait::async_trait;
    use tokio::sync::watch;

    use crabka_metadata::{MetadataImage, MetadataRecord};
    use crabka_raft::{
        AddVoter, Node, QuorumState, RaftError, ReconfigOutcome, RemoveVoter, SnapshotRange,
        UpdateVoter,
    };
    use crabka_security::ListenerProtocol;

    use crate::network::client::InterBrokerClient;
    use crate::share_coordinator::config::ShareCoordinatorConfig;
    use crate::share_coordinator::coordinator::ShareCoordinator;

    /// Minimal `MetadataSource` over a fixed (empty-of-brokers) image. The
    /// share-state topic can't be bootstrapped against it (no brokers), so the
    /// persister's `read_state` short-circuits with an error before any
    /// routing — exercising `get_or_load`'s best-effort empty-window fallback
    /// without standing up an inter-broker server.
    struct MockSource {
        /// The fixed image handed out by `current_image`.
        image: Arc<MetadataImage>,
        /// Receiver cloned out by `watch_leader`.
        leader_rx: watch::Receiver<Option<NodeId>>,
        /// Sending half of the leader channel; stored alongside the receiver
        /// and never written to in this module.
        _leader_tx: watch::Sender<Option<NodeId>>,
    }

    impl MockSource {
        /// Build a mock whose leader channel reports node `1` and whose
        /// metadata image is freshly created with a nil UUID.
        fn new() -> Self {
            let (leader_tx, leader_rx) = watch::channel(Some(1));
            let image = Arc::new(MetadataImage::new(uuid::Uuid::nil()));
            Self {
                image,
                leader_rx,
                _leader_tx: leader_tx,
            }
        }
    }

    // Test double: only `current_image`, `watch_leader`, `submit_change` and
    // `cancel` have working bodies; every other method is `unimplemented!()`
    // and panics if a test reaches it.
    #[async_trait]
    impl MetadataSource for MockSource {
        // Hands out the fixed image built in `MockSource::new`.
        fn current_image(&self) -> Arc<MetadataImage> {
            self.image.clone()
        }
        fn watch_image(&self) -> watch::Receiver<Arc<MetadataImage>> {
            unimplemented!()
        }
        // Clones the stored leader receiver.
        fn watch_leader(&self) -> watch::Receiver<Option<NodeId>> {
            self.leader_rx.clone()
        }
        fn quorum_state(&self) -> QuorumState {
            unimplemented!()
        }
        // Accepts and discards the records; always succeeds.
        async fn submit_change(&self, _records: Vec<MetadataRecord>) -> Result<(), RaftError> {
            Ok(())
        }
        async fn change_membership(&self, _new_voters: BTreeSet<NodeId>) -> Result<(), RaftError> {
            unimplemented!()
        }
        async fn add_learner(&self, _node_id: NodeId, _node: Node) -> Result<(), RaftError> {
            unimplemented!()
        }
        fn controller_bound_addr(&self) -> SocketAddr {
            unimplemented!()
        }
        fn read_snapshot_range(&self, _position: i64, _max_bytes: i32) -> SnapshotRange {
            unimplemented!()
        }
        async fn trigger_snapshot(&self) -> Result<(), RaftError> {
            unimplemented!()
        }
        async fn add_voter(&self, _req: AddVoter) -> Result<ReconfigOutcome, RaftError> {
            unimplemented!()
        }
        async fn remove_voter(&self, _req: RemoveVoter) -> Result<ReconfigOutcome, RaftError> {
            unimplemented!()
        }
        async fn update_voter(&self, _req: UpdateVoter) -> Result<ReconfigOutcome, RaftError> {
            unimplemented!()
        }
        // Nothing to cancel in the mock.
        async fn cancel(&self) {}
    }

    /// Assemble a `SharePartitionLeaderManager` for node `1` on top of an empty
    /// partition registry, a `MockSource` metadata source, and a persister
    /// wired to a default-configured share coordinator.
    fn manager() -> Arc<SharePartitionLeaderManager> {
        let registry = Arc::new(PartitionRegistry::new());
        let source: Arc<dyn MetadataSource> = Arc::new(MockSource::new());

        let coordinator = Arc::new(ShareCoordinator::new(
            1,
            Arc::clone(&registry),
            ShareCoordinatorConfig::default(),
        ));
        let inter_broker = Arc::new(InterBrokerClient::new(None, None));
        let persister = Arc::new(SharePersister::new(
            1,
            coordinator,
            Arc::clone(&source),
            inter_broker,
            ListenerProtocol::Plaintext,
            "INTERNAL".to_string(),
        ));

        let group_config = Arc::new(ShareGroupConfig::default());
        Arc::new(SharePartitionLeaderManager::new(
            1,
            registry,
            source,
            persister,
            group_config,
        ))
    }

    #[tokio::test]
    async fn get_or_load_fresh_returns_empty_window_and_caches() {
        let mgr = manager();
        let tid = uuid::Uuid::from_bytes([21; 16]);

        let first = mgr.get_or_load("g1", tid, 0).await;
        {
            // Scope the guard so the lock is released before the second load.
            let st = first.lock().await;
            assert!(st.start_offset == 0);
            assert!(!st.dirty);
        }

        // Loading the same key again must hand back the very same cell.
        let second = mgr.get_or_load("g1", tid, 0).await;
        assert!(Arc::ptr_eq(&first, &second));
    }

    #[tokio::test]
    async fn persist_if_dirty_is_noop_when_clean() {
        let mgr = manager();
        let tid = uuid::Uuid::from_bytes([22; 16]);

        let cell = mgr.get_or_load("g1", tid, 0).await;
        let mut guard = cell.lock().await;
        assert!(!guard.dirty);

        // Persisting clean state must neither panic nor flip the flag.
        mgr.persist_if_dirty("g1", tid, 0, &mut guard).await;
        assert!(!guard.dirty);
    }

    #[tokio::test]
    async fn persist_if_dirty_keeps_dirty_on_write_failure() {
        // With MockSource the persister cannot bootstrap the share-state
        // topic, so `write_state` returns an error. After a failed durable
        // write `dirty` has to remain set so the sweeper/next-ack retries
        // (F4 durability fix).
        let mgr = manager();
        let tid = uuid::Uuid::from_bytes([25; 16]);

        let cell = mgr.get_or_load("g1", tid, 0).await;
        let mut guard = cell.lock().await;

        // Dirty the state with content that is worth persisting.
        guard.materialize(4, 100);
        let now = std::time::Instant::now();
        let _ = guard.acquire("m1", 10, i32::MAX, now, LOCK, 5);
        assert!(guard.dirty);

        // The write fails, so the flag must survive for the retry.
        mgr.persist_if_dirty("g1", tid, 0, &mut guard).await;
        assert!(guard.dirty);
    }

    #[tokio::test]
    async fn topic_leader_is_self_false_for_unknown_topic() {
        // The mock image contains no topics, so any id is unknown.
        let unknown_topic = uuid::Uuid::from_bytes([23; 16]);
        let leads = manager().topic_leader_is_self(unknown_topic, 0);
        assert!(!leads);
    }

    #[tokio::test]
    async fn invalidate_removes_cached_cell() {
        let mgr = manager();
        let tid = uuid::Uuid::from_bytes([24; 16]);

        // Warm the cache, evict the entry, then load again: the second load
        // has to build a brand-new cell rather than reuse the evicted one.
        let before = mgr.get_or_load("g1", tid, 0).await;
        mgr.invalidate("g1", tid, 0);
        let after = mgr.get_or_load("g1", tid, 0).await;

        assert!(!Arc::ptr_eq(&before, &after));
    }
}