Skip to main content

reddb_server/replication/
cdc.rs

//! Change Data Capture (CDC) — stream of database change events.
//!
//! Exposes entity mutations (insert, update, delete) as a pollable event stream.
//! Consumers poll with a cursor (the last LSN they processed) and receive every
//! still-buffered event whose LSN is greater than that cursor.

6use std::collections::VecDeque;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use crate::json::{Map, Value as JsonValue};
11
12pub use reddb_wire::replication::{public_item_kind, ChangeOperation, ChangeRecord};
13
14/// A single change event.
15#[derive(Debug, Clone)]
16pub struct ChangeEvent {
17    /// Monotonically increasing sequence number
18    pub lsn: u64,
19    /// When the change occurred (unix ms)
20    pub timestamp: u64,
21    /// Type of operation
22    pub operation: ChangeOperation,
23    /// Collection name
24    pub collection: String,
25    /// Entity ID affected
26    pub entity_id: u64,
27    /// Entity kind (table, graph_node, graph_edge, vector, etc.)
28    pub entity_kind: String,
29    /// For `Update` events, the list of column names whose values
30    /// changed (including added/removed columns). `None` when the
31    /// emitter didn't compute a damage vector (inserts, deletes, and
32    /// coarse-grained update paths that haven't been rewired yet).
33    /// Downstream CDC consumers can use this to skip replaying
34    /// updates that touched columns they don't care about.
35    pub changed_columns: Option<Vec<String>>,
36    /// Optional KV-specific payload for WATCH subscribers. Existing CDC
37    /// consumers can ignore this field and continue using the generic shape.
38    pub kv: Option<KvWatchEvent>,
39}
40
41impl ChangeEvent {
42    pub fn rid(&self) -> u64 {
43        self.entity_id
44    }
45
46    pub fn kind(&self) -> &'static str {
47        public_item_kind(&self.entity_kind)
48    }
49}
50
51/// A committed single-key KV change surfaced by WATCH.
52#[derive(Debug, Clone, PartialEq)]
53pub struct KvWatchEvent {
54    pub collection: String,
55    pub key: String,
56    pub op: ChangeOperation,
57    pub before: Option<JsonValue>,
58    pub after: Option<JsonValue>,
59    pub lsn: u64,
60    pub committed_at: u64,
61    pub dropped_event_count: u64,
62}
63
64impl KvWatchEvent {
65    pub fn to_json_value(&self) -> JsonValue {
66        let mut object = Map::new();
67        object.insert("key".to_string(), JsonValue::String(self.key.clone()));
68        object.insert(
69            "op".to_string(),
70            JsonValue::String(self.op.as_str().to_string()),
71        );
72        object.insert(
73            "before".to_string(),
74            self.before.clone().unwrap_or(JsonValue::Null),
75        );
76        object.insert(
77            "after".to_string(),
78            self.after.clone().unwrap_or(JsonValue::Null),
79        );
80        object.insert("lsn".to_string(), JsonValue::Number(self.lsn as f64));
81        object.insert(
82            "committed_at".to_string(),
83            JsonValue::Number(self.committed_at as f64),
84        );
85        object.insert(
86            "dropped_event_count".to_string(),
87            JsonValue::Number(self.dropped_event_count as f64),
88        );
89        JsonValue::Object(object)
90    }
91}
92
93pub fn change_record_from_entity(
94    lsn: u64,
95    timestamp: u64,
96    operation: ChangeOperation,
97    collection: impl Into<String>,
98    entity_kind: impl Into<String>,
99    entity: &crate::storage::UnifiedEntity,
100    format_version: u32,
101    metadata: Option<JsonValue>,
102) -> ChangeRecord {
103    let entity_bytes = match operation {
104        ChangeOperation::Delete | ChangeOperation::Refresh => None,
105        ChangeOperation::Insert | ChangeOperation::Update => Some(
106            crate::storage::UnifiedStore::serialize_entity(entity, format_version),
107        ),
108    };
109
110    ChangeRecord {
111        term: crate::replication::DEFAULT_REPLICATION_TERM,
112        lsn,
113        timestamp,
114        operation,
115        collection: collection.into(),
116        entity_id: entity.id.raw(),
117        entity_kind: entity_kind.into(),
118        entity_bytes,
119        metadata: metadata.map(server_json_to_wire_json),
120        refresh_records: None,
121    }
122}
123
124pub fn server_json_to_wire_json(
125    value: JsonValue,
126) -> reddb_wire::replication::ChangeRecordJsonValue {
127    reddb_wire::replication::parse_change_record_json_value(&value.to_string_compact())
128        .unwrap_or(reddb_wire::replication::ChangeRecordJsonValue::Null)
129}
130
131pub fn wire_json_to_server_json(
132    value: &reddb_wire::replication::ChangeRecordJsonValue,
133) -> JsonValue {
134    crate::json::from_str(&reddb_wire::replication::change_record_json_value_to_string(value))
135        .unwrap_or(JsonValue::Null)
136}
137
138/// CDC event buffer — circular buffer of change events.
139///
140/// Splits the "next LSN" counter (write-contended on every emit)
141/// from the event ring (short-hold push/pop) so that concurrent
142/// emitters don't serialise on a single RwLock. The previous
143/// design used one `RwLock<CdcState>` that turned every insert
144/// into a write-lock acquire, capping 16-way concurrent writes
145/// at ~1000 ops/s (each writer paid ~1ms queueing for the same
146/// mutex even though the work it guarded was a one-line VecDeque
147/// push).
148///
149/// New layout:
150///   - LSN is an `AtomicU64`, assigned with `fetch_add(1)`.
151///     Zero contention.
152///   - Events are guarded by a `parking_lot::Mutex<VecDeque>`.
153///     The critical section is `pop_front (if full) + push_back`
154///     — microseconds at most, parking-free at low contention.
155///
156/// Readers (`poll`, `current_lsn`, `stats`) take the same mutex
157/// briefly; they're cold paths compared to the write hot path.
158pub struct CdcBuffer {
159    next_lsn: AtomicU64,
160    events: parking_lot::Mutex<VecDeque<ChangeEvent>>,
161    max_size: usize,
162}
163
164impl CdcBuffer {
165    /// Create a new CDC buffer with maximum capacity.
166    pub fn new(max_size: usize) -> Self {
167        Self {
168            next_lsn: AtomicU64::new(0),
169            events: parking_lot::Mutex::new(VecDeque::with_capacity(max_size.min(10_000))),
170            max_size,
171        }
172    }
173
174    /// Emit a change event into the buffer. `changed_columns`
175    /// defaults to `None` for backwards compatibility; call sites
176    /// that have a damage vector available should use
177    /// [`Self::emit_with_columns`] instead.
178    pub fn emit(
179        &self,
180        operation: ChangeOperation,
181        collection: &str,
182        entity_id: u64,
183        entity_kind: &str,
184    ) -> u64 {
185        self.emit_with_columns(operation, collection, entity_id, entity_kind, None)
186    }
187
188    /// Emit a change event with an optional list of column names
189    /// that were affected. Use from update paths that have already
190    /// computed a [`RowDamageVector`](crate::application::entity::RowDamageVector)
191    /// so CDC consumers can filter by touched column without re-diffing.
192    pub fn emit_with_columns(
193        &self,
194        operation: ChangeOperation,
195        collection: &str,
196        entity_id: u64,
197        entity_kind: &str,
198        changed_columns: Option<Vec<String>>,
199    ) -> u64 {
200        // LSN assignment is lock-free — multiple emitters each get a
201        // unique monotonic LSN without waiting on any other emitter.
202        let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
203
204        let event = ChangeEvent {
205            lsn: event_lsn,
206            timestamp: SystemTime::now()
207                .duration_since(UNIX_EPOCH)
208                .unwrap_or_default()
209                .as_millis() as u64,
210            operation,
211            collection: collection.to_string(),
212            entity_id,
213            entity_kind: entity_kind.to_string(),
214            changed_columns,
215            kv: None,
216        };
217
218        // Short-hold ring push. Under heavy contention parking_lot
219        // spins a few times before parking, so typical hold time is
220        // a couple hundred nanoseconds.
221        let mut events = self.events.lock();
222        if events.len() >= self.max_size {
223            events.pop_front();
224        }
225        events.push_back(event);
226
227        event_lsn
228    }
229
230    /// Emit many same-collection events with one LSN reservation and one
231    /// ring-buffer lock. This is used by bulk insert paths that do not need
232    /// per-row logical-WAL records.
233    pub fn emit_batch_same_collection<I>(
234        &self,
235        operation: ChangeOperation,
236        collection: &str,
237        entity_kind: &str,
238        entity_ids: I,
239    ) -> Vec<u64>
240    where
241        I: IntoIterator<Item = u64>,
242        I::IntoIter: ExactSizeIterator,
243    {
244        let iter = entity_ids.into_iter();
245        let count = iter.len();
246        if count == 0 {
247            return Vec::new();
248        }
249
250        let first_lsn = self.next_lsn.fetch_add(count as u64, Ordering::AcqRel) + 1;
251        let lsns = (0..count)
252            .map(|idx| first_lsn + idx as u64)
253            .collect::<Vec<_>>();
254        if self.max_size == 0 {
255            return lsns;
256        }
257
258        let timestamp = SystemTime::now()
259            .duration_since(UNIX_EPOCH)
260            .unwrap_or_default()
261            .as_millis() as u64;
262        let collection = collection.to_string();
263        let entity_kind = entity_kind.to_string();
264
265        let skip = count.saturating_sub(self.max_size);
266        let kept = count - skip;
267        let mut events = self.events.lock();
268        let overflow = events
269            .len()
270            .saturating_add(kept)
271            .saturating_sub(self.max_size);
272        for _ in 0..overflow {
273            events.pop_front();
274        }
275
276        for (idx, entity_id) in iter.enumerate().skip(skip) {
277            events.push_back(ChangeEvent {
278                lsn: first_lsn + idx as u64,
279                timestamp,
280                operation,
281                collection: collection.clone(),
282                entity_id,
283                entity_kind: entity_kind.clone(),
284                changed_columns: None,
285                kv: None,
286            });
287        }
288        lsns
289    }
290
291    /// Emit a committed logical KV event into the same CDC ring used by
292    /// result-cache invalidation and `/changes` consumers.
293    pub fn emit_kv(
294        &self,
295        operation: ChangeOperation,
296        collection: &str,
297        key: &str,
298        entity_id: u64,
299        before: Option<JsonValue>,
300        after: Option<JsonValue>,
301    ) -> u64 {
302        let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
303        let timestamp = SystemTime::now()
304            .duration_since(UNIX_EPOCH)
305            .unwrap_or_default()
306            .as_millis() as u64;
307        let kv = KvWatchEvent {
308            collection: collection.to_string(),
309            key: key.to_string(),
310            op: operation,
311            before,
312            after,
313            lsn: event_lsn,
314            committed_at: timestamp,
315            dropped_event_count: 0,
316        };
317        let event = ChangeEvent {
318            lsn: event_lsn,
319            timestamp,
320            operation,
321            collection: collection.to_string(),
322            entity_id,
323            entity_kind: "kv".to_string(),
324            changed_columns: Some(vec!["value".to_string()]),
325            kv: Some(kv),
326        };
327
328        let mut events = self.events.lock();
329        if events.len() >= self.max_size {
330            events.pop_front();
331        }
332        events.push_back(event);
333        event_lsn
334    }
335
336    /// Poll for events since a given LSN.
337    pub fn poll(&self, since_lsn: u64, max_count: usize) -> Vec<ChangeEvent> {
338        let events = self.events.lock();
339        events
340            .iter()
341            .filter(|e| e.lsn > since_lsn)
342            .take(max_count)
343            .cloned()
344            .collect()
345    }
346
347    /// Get the current (latest) LSN.
348    pub fn current_lsn(&self) -> u64 {
349        self.next_lsn.load(Ordering::Acquire)
350    }
351
352    /// Restore the LSN cursor after process restart. Only advances;
353    /// never rewinds. Under concurrent emit this is guarded by a
354    /// compare-exchange loop.
355    pub fn set_current_lsn(&self, lsn: u64) {
356        let mut current = self.next_lsn.load(Ordering::Acquire);
357        while lsn > current {
358            match self
359                .next_lsn
360                .compare_exchange(current, lsn, Ordering::AcqRel, Ordering::Acquire)
361            {
362                Ok(_) => break,
363                Err(observed) => current = observed,
364            }
365        }
366    }
367
368    /// Get the oldest available LSN (or None if empty).
369    pub fn oldest_lsn(&self) -> Option<u64> {
370        self.events.lock().front().map(|e| e.lsn)
371    }
372
373    /// Get buffer stats (single lock acquisition — no deadlock risk).
374    pub fn stats(&self) -> CdcStats {
375        let events = self.events.lock();
376        CdcStats {
377            buffered_events: events.len(),
378            current_lsn: self.next_lsn.load(Ordering::Acquire),
379            oldest_lsn: events.front().map(|e| e.lsn),
380            newest_lsn: events.back().map(|e| e.lsn),
381        }
382    }
383}
384
385/// CDC buffer statistics.
386#[derive(Debug, Clone)]
387pub struct CdcStats {
388    pub buffered_events: usize,
389    pub current_lsn: u64,
390    pub oldest_lsn: Option<u64>,
391    pub newest_lsn: Option<u64>,
392}
393
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_emit_and_poll() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "users", 1, "table");
        buf.emit(ChangeOperation::Update, "users", 1, "table");
        buf.emit(ChangeOperation::Delete, "users", 1, "table");

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].operation, ChangeOperation::Insert);
        assert_eq!(events[1].operation, ChangeOperation::Update);
        assert_eq!(events[2].operation, ChangeOperation::Delete);
        // Backwards-compat emit should leave changed_columns None.
        assert!(events[0].changed_columns.is_none());
        assert!(events[1].changed_columns.is_none());
    }

    #[test]
    fn test_emit_with_columns_propagates_damage_vector() {
        let buf = CdcBuffer::new(100);
        buf.emit_with_columns(
            ChangeOperation::Update,
            "users",
            7,
            "table",
            Some(vec!["email".to_string(), "age".to_string()]),
        );
        buf.emit(ChangeOperation::Update, "users", 8, "table");

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 2);
        assert_eq!(
            events[0].changed_columns.as_deref(),
            Some(vec!["email".to_string(), "age".to_string()].as_slice())
        );
        assert!(events[1].changed_columns.is_none());
    }

    #[test]
    fn test_poll_with_cursor() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "a", 1, "table");
        buf.emit(ChangeOperation::Insert, "b", 2, "table");
        buf.emit(ChangeOperation::Insert, "c", 3, "table");

        // Poll from lsn=1, should get events 2 and 3
        let events = buf.poll(1, 10);
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].collection, "b");
        assert_eq!(events[1].collection, "c");
    }

    /// `max_count` caps the page size, and a cursor at the newest LSN (or a
    /// zero-sized page) yields nothing.
    #[test]
    fn test_poll_respects_max_count() {
        let buf = CdcBuffer::new(10);
        for id in 1..=5 {
            buf.emit(ChangeOperation::Insert, "users", id, "table");
        }

        let lsns: Vec<u64> = buf.poll(1, 2).iter().map(|event| event.lsn).collect();
        assert_eq!(lsns, vec![2, 3]);
        assert!(buf.poll(5, 10).is_empty());
        assert!(buf.poll(0, 0).is_empty());
    }

    #[test]
    fn test_emit_batch_same_collection_assigns_contiguous_lsns() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "users", 10, "table");
        buf.emit_batch_same_collection(ChangeOperation::Insert, "users", "table", [11, 12, 13]);

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 4);
        assert_eq!(events[1].lsn, 2);
        assert_eq!(events[2].lsn, 3);
        assert_eq!(events[3].lsn, 4);
        assert_eq!(events[3].entity_id, 13);
        assert_eq!(buf.current_lsn(), 4);
    }

    #[test]
    fn test_emit_batch_same_collection_respects_ring_size() {
        let buf = CdcBuffer::new(3);
        buf.emit_batch_same_collection(ChangeOperation::Insert, "users", "table", [1, 2, 3, 4, 5]);

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 3);
        assert_eq!(
            events
                .iter()
                .map(|event| event.entity_id)
                .collect::<Vec<_>>(),
            vec![3, 4, 5]
        );
        assert_eq!(events[0].lsn, 3);
        assert_eq!(events[2].lsn, 5);
        assert_eq!(buf.current_lsn(), 5);
    }

    /// An empty batch neither consumes LSNs nor buffers anything.
    #[test]
    fn test_emit_batch_same_collection_empty_batch_is_noop() {
        let buf = CdcBuffer::new(10);
        let lsns = buf.emit_batch_same_collection(
            ChangeOperation::Insert,
            "users",
            "table",
            Vec::<u64>::new(),
        );

        assert!(lsns.is_empty());
        assert_eq!(buf.current_lsn(), 0);
        assert!(buf.poll(0, 10).is_empty());
    }

    #[test]
    fn test_circular_eviction() {
        let buf = CdcBuffer::new(3);
        buf.emit(ChangeOperation::Insert, "a", 1, "table");
        buf.emit(ChangeOperation::Insert, "b", 2, "table");
        buf.emit(ChangeOperation::Insert, "c", 3, "table");
        buf.emit(ChangeOperation::Insert, "d", 4, "table"); // evicts "a"

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].collection, "b"); // "a" was evicted
    }

    #[test]
    fn test_stats() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "x", 1, "table");
        buf.emit(ChangeOperation::Insert, "y", 2, "table");

        let stats = buf.stats();
        assert_eq!(stats.buffered_events, 2);
        assert_eq!(stats.current_lsn, 2);
        assert_eq!(stats.oldest_lsn, Some(1));
        assert_eq!(stats.newest_lsn, Some(2));
    }

    /// Restoring the cursor after restart only moves it forward, and the
    /// next emit continues from the restored position.
    #[test]
    fn test_set_current_lsn_only_advances() {
        let buf = CdcBuffer::new(10);
        buf.set_current_lsn(10);
        assert_eq!(buf.current_lsn(), 10);

        buf.set_current_lsn(5); // must not rewind
        assert_eq!(buf.current_lsn(), 10);

        let lsn = buf.emit(ChangeOperation::Insert, "users", 1, "table");
        assert_eq!(lsn, 11);
        assert_eq!(buf.oldest_lsn(), Some(11));
    }

    /// KV events carry a WATCH payload whose LSN and commit time mirror the
    /// outer CDC event.
    #[test]
    fn test_emit_kv_attaches_watch_payload() {
        let buf = CdcBuffer::new(10);
        buf.emit(ChangeOperation::Insert, "users", 1, "table");
        let lsn = buf.emit_kv(
            ChangeOperation::Update,
            "cache",
            "k1",
            42,
            Some(JsonValue::String("old".to_string())),
            Some(JsonValue::String("new".to_string())),
        );
        assert_eq!(lsn, 2);

        let events = buf.poll(1, 10);
        assert_eq!(events.len(), 1);
        let event = &events[0];
        assert_eq!(event.lsn, lsn);
        assert_eq!(event.entity_id, 42);
        assert_eq!(event.entity_kind, "kv");
        assert_eq!(
            event.changed_columns.as_deref(),
            Some(vec!["value".to_string()].as_slice())
        );

        let kv = event.kv.as_ref().expect("kv payload");
        assert_eq!(kv.collection, "cache");
        assert_eq!(kv.key, "k1");
        assert_eq!(kv.op, ChangeOperation::Update);
        assert_eq!(kv.lsn, lsn);
        assert_eq!(kv.committed_at, event.timestamp);
        assert_eq!(kv.before, Some(JsonValue::String("old".to_string())));
        assert_eq!(kv.after, Some(JsonValue::String("new".to_string())));
        assert_eq!(kv.dropped_event_count, 0);
    }

    #[test]
    fn test_change_record_roundtrip() {
        let record = ChangeRecord {
            term: 3,
            lsn: 7,
            timestamp: 1234,
            operation: ChangeOperation::Update,
            collection: "users".to_string(),
            entity_id: 42,
            entity_kind: "row".to_string(),
            entity_bytes: Some(vec![1, 2, 3]),
            metadata: Some(server_json_to_wire_json(crate::json!({"role": "admin"}))),
            refresh_records: None,
        };

        let decoded = ChangeRecord::decode(&record.encode()).expect("decode");
        assert_eq!(decoded.term, record.term);
        assert_eq!(decoded.lsn, record.lsn);
        assert_eq!(decoded.collection, record.collection);
        assert_eq!(decoded.entity_id, record.entity_id);
        assert_eq!(decoded.entity_bytes, record.entity_bytes);
    }

    /// Issue #596 slice 9d — refresh records survive a JSON round-trip
    /// through the logical-WAL wire format that the primary writes and the
    /// replica reads. The contract is bit-for-bit equality on every payload
    /// byte: the replica calls `bulk_insert` with whatever bytes land in
    /// `refresh_records`, so a decode that silently drops or re-orders them
    /// would silently diverge replica state.
    #[test]
    fn test_change_record_refresh_roundtrip() {
        let records = vec![vec![0x10, 0x20, 0x30], vec![0xAA, 0xBB], Vec::new()];
        let record =
            ChangeRecord::for_refresh(11, 99, "mv_orders_summary", records.clone()).with_term(4);

        let decoded = ChangeRecord::decode(&record.encode()).expect("decode");
        assert_eq!(decoded.term, 4);
        assert_eq!(decoded.operation, ChangeOperation::Refresh);
        assert_eq!(decoded.collection, "mv_orders_summary");
        assert_eq!(decoded.refresh_records.as_deref(), Some(&records[..]));
    }

    #[test]
    fn test_change_record_legacy_payload_defaults_term() {
        let legacy = br#"{"lsn":9,"timestamp":1,"operation":"delete","collection":"users","rid":5,"kind":"row"}"#;
        let decoded = ChangeRecord::decode(legacy).expect("decode legacy record");
        assert_eq!(decoded.term, crate::replication::DEFAULT_REPLICATION_TERM);
        assert_eq!(decoded.lsn, 9);
    }
}