kuberic_core/replicator/queue.rs
1use std::collections::BTreeMap;
2
3use bytes::Bytes;
4
5use crate::types::Lsn;
6
7/// In-memory replication queue. Retains ops from first_lsn to highest_lsn.
8/// Ops are GC'd when all configured secondaries have ACKed them.
9///
10/// This serves the same role as SF's ReplicationQueueManager — it bridges
11/// the gap between the copy stream (committed state) and live replication
12/// (new ops) by retaining in-flight ops that can be replayed to new replicas.
13pub struct ReplicationQueue {
14 ops: BTreeMap<Lsn, Bytes>,
15}
16
17impl ReplicationQueue {
18 pub fn new() -> Self {
19 Self {
20 ops: BTreeMap::new(),
21 }
22 }
23
24 /// Append a new op. Called from the data path on every replicate.
25 pub fn push(&mut self, lsn: Lsn, data: Bytes) {
26 self.ops.insert(lsn, data);
27 }
28
29 /// Get all ops from `from_lsn` onward (inclusive).
30 /// Used at add_secondary time to replay pending ops to a new replica.
31 pub fn ops_from(&self, from_lsn: Lsn) -> Vec<(Lsn, Bytes)> {
32 self.ops
33 .range(from_lsn..)
34 .map(|(&lsn, data)| (lsn, data.clone()))
35 .collect()
36 }
37
38 /// Remove all ops with LSN <= `acked_lsn`.
39 /// Called when the minimum ACKed LSN across all secondaries advances.
40 pub fn gc(&mut self, acked_lsn: Lsn) {
41 // split_off returns everything >= acked_lsn + 1, we keep that
42 let keep = self.ops.split_off(&(acked_lsn + 1));
43 self.ops = keep;
44 }
45
46 /// Number of ops retained in the queue.
47 pub fn len(&self) -> usize {
48 self.ops.len()
49 }
50
51 pub fn is_empty(&self) -> bool {
52 self.ops.is_empty()
53 }
54
55 /// Clear all ops (on role change / close).
56 pub fn clear(&mut self) {
57 self.ops.clear();
58 }
59}
60
61impl Default for ReplicationQueue {
62 fn default() -> Self {
63 Self::new()
64 }
65}