Skip to main content

kora_cdc/
ring.rs

//! Per-shard CDC (change data capture) ring buffer.
//!
//! Each Kōra shard owns a [`CdcRing`] that records every mutation as a
//! [`CdcEvent`] with a monotonically increasing sequence number. The ring has
//! a fixed capacity: once it is full, each new event silently overwrites the
//! oldest one. Consumers detect lost events through the
//! [`CdcReadResult::gap`] flag returned by [`CdcRing::read`].

/// The category of write captured by a [`CdcEvent`].
///
/// Each variant names one kind of mutation a shard can perform.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum CdcOp {
    /// Creation or overwrite of a string key.
    Set,
    /// Explicit removal of a key.
    Del,
    /// Removal of a key because its TTL elapsed.
    Expire,
    /// Assignment of a field inside a hash.
    HSet,
    /// Insertion of a value at the head of a list.
    LPush,
    /// Insertion of a value at the tail of a list.
    RPush,
    /// Addition of a member to a set.
    SAdd,
    /// Flush of the entire database.
    FlushDb,
}

29/// A single captured mutation event.
30///
31/// Every write operation in a shard produces one `CdcEvent`. Events are
32/// immutable once written to the ring and are identified by their unique,
33/// monotonically increasing [`seq`](CdcEvent::seq).
34#[derive(Debug, Clone)]
35pub struct CdcEvent {
36    /// Unique, monotonically increasing sequence number assigned at write time.
37    pub seq: u64,
38    /// Wall-clock timestamp in milliseconds provided by the caller.
39    pub timestamp_ms: u64,
40    /// The kind of mutation that occurred.
41    pub op: CdcOp,
42    /// The affected key. Empty for [`CdcOp::FlushDb`].
43    pub key: Vec<u8>,
44    /// The new value, when the operation carries one (e.g. `Set`, `HSet`).
45    pub value: Option<Vec<u8>>,
46}
47
48/// Fixed-capacity circular buffer for CDC events.
49///
50/// Events are appended with [`push`](CdcRing::push) and assigned a
51/// monotonically increasing sequence number. When the buffer is full the
52/// oldest slot is silently overwritten. Consumers read batches via
53/// [`read`](CdcRing::read) and are informed through
54/// [`CdcReadResult::gap`] when events they have not yet consumed were
55/// evicted.
56pub struct CdcRing {
57    buffer: Vec<Option<CdcEvent>>,
58    capacity: usize,
59    write_seq: u64,
60    start_seq: u64,
61}
62
63impl CdcRing {
64    /// Create a new ring buffer that can hold up to `capacity` events.
65    ///
66    /// # Panics
67    ///
68    /// Panics if `capacity` is zero.
69    pub fn new(capacity: usize) -> Self {
70        assert!(capacity > 0, "CDC ring capacity must be > 0");
71        let mut buffer = Vec::with_capacity(capacity);
72        for _ in 0..capacity {
73            buffer.push(None);
74        }
75        Self {
76            buffer,
77            capacity,
78            write_seq: 0,
79            start_seq: 0,
80        }
81    }
82
83    /// Append a mutation event to the ring, assigning it the next sequence number.
84    ///
85    /// If the buffer is full the oldest event is overwritten and
86    /// [`start_seq`](CdcRing::start_seq) advances accordingly.
87    pub fn push(&mut self, op: CdcOp, key: Vec<u8>, value: Option<Vec<u8>>, timestamp_ms: u64) {
88        let seq = self.write_seq;
89        let idx = (seq as usize) % self.capacity;
90
91        self.buffer[idx] = Some(CdcEvent {
92            seq,
93            timestamp_ms,
94            op,
95            key,
96            value,
97        });
98
99        self.write_seq += 1;
100
101        if self.write_seq > self.capacity as u64 {
102            self.start_seq = self.write_seq - self.capacity as u64;
103        }
104    }
105
106    /// Return the sequence number that will be assigned to the next pushed event.
107    pub fn write_seq(&self) -> u64 {
108        self.write_seq
109    }
110
111    /// Return the earliest sequence number still available in the buffer.
112    pub fn start_seq(&self) -> u64 {
113        self.start_seq
114    }
115
116    /// Return the number of events currently retained in the buffer.
117    pub fn len(&self) -> usize {
118        (self.write_seq - self.start_seq) as usize
119    }
120
121    /// Return `true` if no events have been pushed yet.
122    pub fn is_empty(&self) -> bool {
123        self.write_seq == 0
124    }
125
126    /// Read up to `limit` events starting at `from_seq`.
127    ///
128    /// If `from_seq` has already been evicted the read begins at the
129    /// earliest available sequence and [`CdcReadResult::gap`] is set to
130    /// `true`, signalling that the consumer missed events.
131    pub fn read(&self, from_seq: u64, limit: usize) -> CdcReadResult {
132        if from_seq >= self.write_seq {
133            return CdcReadResult {
134                events: vec![],
135                next_seq: self.write_seq,
136                gap: false,
137            };
138        }
139
140        let actual_start = if from_seq < self.start_seq {
141            self.start_seq
142        } else {
143            from_seq
144        };
145
146        let gap = from_seq < self.start_seq;
147        let available = (self.write_seq - actual_start) as usize;
148        let count = available.min(limit);
149
150        let mut events = Vec::with_capacity(count);
151        for i in 0..count {
152            let seq = actual_start + i as u64;
153            let idx = (seq as usize) % self.capacity;
154            if let Some(ref event) = self.buffer[idx] {
155                events.push(event.clone());
156            }
157        }
158
159        CdcReadResult {
160            next_seq: actual_start + count as u64,
161            events,
162            gap,
163        }
164    }
165
166    /// Look up a single event by its sequence number.
167    ///
168    /// Returns `None` if `seq` has been evicted or has not been written yet.
169    pub fn get(&self, seq: u64) -> Option<&CdcEvent> {
170        if seq < self.start_seq || seq >= self.write_seq {
171            return None;
172        }
173        let idx = (seq as usize) % self.capacity;
174        self.buffer[idx].as_ref()
175    }
176}
177
178/// Outcome of a [`CdcRing::read`] call.
179#[derive(Debug)]
180pub struct CdcReadResult {
181    /// The batch of events returned by this read.
182    pub events: Vec<CdcEvent>,
183    /// The sequence number the caller should pass to the next read.
184    pub next_seq: u64,
185    /// `true` when the requested `from_seq` had already been evicted,
186    /// meaning the consumer missed one or more events.
187    pub gap: bool,
188}
189
#[cfg(test)]
mod tests {
    use super::*;

    /// Push `n` `Set` events with keys `k0..k{n-1}` into `ring`.
    fn fill(ring: &mut CdcRing, n: u64) {
        for i in 0..n {
            ring.push(CdcOp::Set, format!("k{}", i).into_bytes(), None, i);
        }
    }

    /// Collect the sequence numbers of a read result.
    fn seqs(result: &CdcReadResult) -> Vec<u64> {
        result.events.iter().map(|e| e.seq).collect()
    }

    #[test]
    fn test_push_and_read() {
        let mut ring = CdcRing::new(10);
        ring.push(CdcOp::Set, b"k1".to_vec(), Some(b"v1".to_vec()), 100);
        ring.push(CdcOp::Set, b"k2".to_vec(), Some(b"v2".to_vec()), 200);
        ring.push(CdcOp::Del, b"k1".to_vec(), None, 300);

        assert_eq!(ring.len(), 3);
        assert_eq!(ring.write_seq(), 3);
        assert_eq!(ring.start_seq(), 0);

        let result = ring.read(0, 100);
        assert_eq!(result.events.len(), 3);
        assert!(!result.gap);
        assert_eq!(result.next_seq, 3);
        assert_eq!(result.events[0].op, CdcOp::Set);
        assert_eq!(result.events[0].key, b"k1");
        assert_eq!(result.events[2].op, CdcOp::Del);
    }

    #[test]
    fn test_read_with_limit() {
        let mut ring = CdcRing::new(10);
        fill(&mut ring, 5);

        let result = ring.read(0, 2);
        assert_eq!(result.events.len(), 2);
        assert_eq!(result.next_seq, 2);

        let result = ring.read(2, 2);
        assert_eq!(result.events.len(), 2);
        assert_eq!(result.next_seq, 4);
    }

    #[test]
    fn test_wrapping() {
        let mut ring = CdcRing::new(4);
        fill(&mut ring, 10);

        assert_eq!(ring.len(), 4);
        assert_eq!(ring.start_seq(), 6);
        assert_eq!(ring.write_seq(), 10);

        // Can only read the last 4 events
        let result = ring.read(0, 100);
        assert!(result.gap); // consumer was behind
        assert_eq!(result.events.len(), 4);
        assert_eq!(result.events[0].seq, 6);
        assert_eq!(result.events[3].seq, 9);
    }

    #[test]
    fn test_gap_detection() {
        let mut ring = CdcRing::new(4);
        fill(&mut ring, 10);

        // Consumer at seq 3 has fallen behind (events 3-5 are lost)
        let result = ring.read(3, 100);
        assert!(result.gap);
        assert_eq!(result.events[0].seq, 6); // starts from earliest available
    }

    #[test]
    fn test_resume_after_gap_has_no_further_gap() {
        let mut ring = CdcRing::new(4);
        fill(&mut ring, 10);

        let first = ring.read(2, 2);
        assert!(first.gap);
        assert_eq!(seqs(&first), vec![6, 7]);
        assert_eq!(first.next_seq, 8);

        let second = ring.read(first.next_seq, 100);
        assert!(!second.gap);
        assert_eq!(seqs(&second), vec![8, 9]);
        assert_eq!(second.next_seq, 10);
    }

    #[test]
    fn test_exactly_full_then_one_eviction() {
        let mut ring = CdcRing::new(4);
        fill(&mut ring, 4);

        // Full but nothing evicted yet.
        assert_eq!(ring.len(), 4);
        assert_eq!(ring.start_seq(), 0);
        let result = ring.read(0, 100);
        assert!(!result.gap);
        assert_eq!(seqs(&result), vec![0, 1, 2, 3]);
        assert_eq!(result.next_seq, 4);

        // The next push evicts exactly seq 0.
        ring.push(CdcOp::Set, b"k4".to_vec(), None, 4);
        assert_eq!(ring.len(), 4);
        assert_eq!(ring.start_seq(), 1);
        assert!(ring.get(0).is_none());
        assert_eq!(ring.get(1).unwrap().seq, 1);
        assert!(ring.read(0, 100).gap);
        assert!(!ring.read(1, 100).gap);
    }

    #[test]
    fn test_get_matches_seq_after_many_wraps() {
        let mut ring = CdcRing::new(3);
        fill(&mut ring, 10);

        for seq in 0..7 {
            assert!(ring.get(seq).is_none(), "seq {} should be evicted", seq);
        }
        for seq in 7..10 {
            let event = ring.get(seq).unwrap();
            assert_eq!(event.seq, seq);
            assert_eq!(event.key, format!("k{}", seq).into_bytes());
        }
        assert!(ring.get(10).is_none());
    }

    #[test]
    fn test_read_at_write_head() {
        let mut ring = CdcRing::new(10);
        ring.push(CdcOp::Set, b"k".to_vec(), None, 0);

        let result = ring.read(1, 100);
        assert!(result.events.is_empty());
        assert_eq!(result.next_seq, 1);
    }

    #[test]
    fn test_read_beyond_write_head() {
        let mut ring = CdcRing::new(10);
        ring.push(CdcOp::Set, b"k".to_vec(), None, 0);

        let result = ring.read(5, 100);
        assert!(result.events.is_empty());
        assert!(!result.gap);
        assert_eq!(result.next_seq, 1);
    }

    #[test]
    fn test_zero_limit_returns_nothing() {
        let mut ring = CdcRing::new(4);
        fill(&mut ring, 2);

        let result = ring.read(0, 0);
        assert!(result.events.is_empty());
        assert!(!result.gap);
        assert_eq!(result.next_seq, 0);
    }

    #[test]
    fn test_get_single() {
        let mut ring = CdcRing::new(10);
        ring.push(CdcOp::Set, b"k0".to_vec(), None, 0);
        ring.push(CdcOp::Del, b"k1".to_vec(), None, 1);

        let event = ring.get(0).unwrap();
        assert_eq!(event.op, CdcOp::Set);

        let event = ring.get(1).unwrap();
        assert_eq!(event.op, CdcOp::Del);

        assert!(ring.get(2).is_none());
    }

    #[test]
    fn test_empty_ring() {
        let ring = CdcRing::new(10);
        assert!(ring.is_empty());
        assert_eq!(ring.len(), 0);

        let result = ring.read(0, 100);
        assert!(result.events.is_empty());
    }

    #[test]
    #[should_panic(expected = "CDC ring capacity must be > 0")]
    fn test_zero_capacity_panics() {
        let _ = CdcRing::new(0);
    }

    #[test]
    fn test_all_ops() {
        let mut ring = CdcRing::new(100);
        let ops = [
            CdcOp::Set,
            CdcOp::Del,
            CdcOp::Expire,
            CdcOp::HSet,
            CdcOp::LPush,
            CdcOp::RPush,
            CdcOp::SAdd,
            CdcOp::FlushDb,
        ];

        for (i, op) in ops.iter().enumerate() {
            ring.push(op.clone(), format!("k{}", i).into_bytes(), None, i as u64);
        }

        let result = ring.read(0, 100);
        assert_eq!(result.events.len(), 8);
        for (i, event) in result.events.iter().enumerate() {
            assert_eq!(event.op, ops[i]);
        }
    }
}