Skip to main content

kuberic_core/replicator/
queue.rs

1use std::collections::BTreeMap;
2
3use bytes::Bytes;
4
5use crate::types::Lsn;
6
7/// In-memory replication queue. Retains ops from first_lsn to highest_lsn.
8/// Ops are GC'd when all configured secondaries have ACKed them.
9///
10/// This serves the same role as SF's ReplicationQueueManager — it bridges
11/// the gap between the copy stream (committed state) and live replication
12/// (new ops) by retaining in-flight ops that can be replayed to new replicas.
13pub struct ReplicationQueue {
14    ops: BTreeMap<Lsn, Bytes>,
15}
16
17impl ReplicationQueue {
18    pub fn new() -> Self {
19        Self {
20            ops: BTreeMap::new(),
21        }
22    }
23
24    /// Append a new op. Called from the data path on every replicate.
25    pub fn push(&mut self, lsn: Lsn, data: Bytes) {
26        self.ops.insert(lsn, data);
27    }
28
29    /// Get all ops from `from_lsn` onward (inclusive).
30    /// Used at add_secondary time to replay pending ops to a new replica.
31    pub fn ops_from(&self, from_lsn: Lsn) -> Vec<(Lsn, Bytes)> {
32        self.ops
33            .range(from_lsn..)
34            .map(|(&lsn, data)| (lsn, data.clone()))
35            .collect()
36    }
37
38    /// Remove all ops with LSN <= `acked_lsn`.
39    /// Called when the minimum ACKed LSN across all secondaries advances.
40    pub fn gc(&mut self, acked_lsn: Lsn) {
41        // split_off returns everything >= acked_lsn + 1, we keep that
42        let keep = self.ops.split_off(&(acked_lsn + 1));
43        self.ops = keep;
44    }
45
46    /// Number of ops retained in the queue.
47    pub fn len(&self) -> usize {
48        self.ops.len()
49    }
50
51    pub fn is_empty(&self) -> bool {
52        self.ops.is_empty()
53    }
54
55    /// Clear all ops (on role change / close).
56    pub fn clear(&mut self) {
57        self.ops.clear();
58    }
59}
60
61impl Default for ReplicationQueue {
62    fn default() -> Self {
63        Self::new()
64    }
65}