Skip to main content

reddb_server/replication/
cdc.rs

1//! Change Data Capture (CDC) — stream of database change events.
2//!
3//! Exposes entity mutations (insert, update, delete) as a pollable event stream.
4//! Consumers poll with a cursor (LSN) to receive new events since their last position.
5
6use std::collections::VecDeque;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use crate::json::{Map, Value as JsonValue};
11
12pub use reddb_wire::replication::{public_item_kind, ChangeOperation, ChangeRecord};
13// Issue #991 — range-authority fence types travel with the ChangeRecord
14// contract; re-exported separately to keep the pinned contract line above
15// byte-for-byte (see `protocol_authority` reddb-wire test).
16pub use reddb_wire::replication::{RangeAdmitError, RangeAuthority};
17// Issue #992 — range-indexed WAL streaming / per-range catch-up primitives.
18// The filtering and per-range progress contract lives in reddb-wire; the
19// server drives them over the single physical WAL's derived stream.
20pub use reddb_wire::replication::{
21    classify_range_record, plan_range_catchup, RangeCatchupPlan, RangeProgressTracker,
22    RangeStreamDecision, RangeStreamPosition, RangeStreamProgress, RangeStreamReject,
23};
24
25/// A single change event.
26#[derive(Debug, Clone)]
27pub struct ChangeEvent {
28    /// Monotonically increasing sequence number
29    pub lsn: u64,
30    /// When the change occurred (unix ms)
31    pub timestamp: u64,
32    /// Type of operation
33    pub operation: ChangeOperation,
34    /// Collection name
35    pub collection: String,
36    /// Entity ID affected
37    pub entity_id: u64,
38    /// Entity kind (table, graph_node, graph_edge, vector, etc.)
39    pub entity_kind: String,
40    /// For `Update` events, the list of column names whose values
41    /// changed (including added/removed columns). `None` when the
42    /// emitter didn't compute a damage vector (inserts, deletes, and
43    /// coarse-grained update paths that haven't been rewired yet).
44    /// Downstream CDC consumers can use this to skip replaying
45    /// updates that touched columns they don't care about.
46    pub changed_columns: Option<Vec<String>>,
47    /// Optional KV-specific payload for WATCH subscribers. Existing CDC
48    /// consumers can ignore this field and continue using the generic shape.
49    pub kv: Option<KvWatchEvent>,
50}
51
52impl ChangeEvent {
53    pub fn rid(&self) -> u64 {
54        self.entity_id
55    }
56
57    pub fn kind(&self) -> &'static str {
58        public_item_kind(&self.entity_kind)
59    }
60}
61
62/// A committed single-key KV change surfaced by WATCH.
63#[derive(Debug, Clone, PartialEq)]
64pub struct KvWatchEvent {
65    pub collection: String,
66    pub key: String,
67    pub op: ChangeOperation,
68    pub before: Option<JsonValue>,
69    pub after: Option<JsonValue>,
70    pub lsn: u64,
71    pub committed_at: u64,
72    pub dropped_event_count: u64,
73}
74
75impl KvWatchEvent {
76    pub fn to_json_value(&self) -> JsonValue {
77        let mut object = Map::new();
78        object.insert("key".to_string(), JsonValue::String(self.key.clone()));
79        object.insert(
80            "op".to_string(),
81            JsonValue::String(self.op.as_str().to_string()),
82        );
83        object.insert(
84            "before".to_string(),
85            self.before.clone().unwrap_or(JsonValue::Null),
86        );
87        object.insert(
88            "after".to_string(),
89            self.after.clone().unwrap_or(JsonValue::Null),
90        );
91        object.insert("lsn".to_string(), JsonValue::Number(self.lsn as f64));
92        object.insert(
93            "committed_at".to_string(),
94            JsonValue::Number(self.committed_at as f64),
95        );
96        object.insert(
97            "dropped_event_count".to_string(),
98            JsonValue::Number(self.dropped_event_count as f64),
99        );
100        JsonValue::Object(object)
101    }
102}
103
104pub fn change_record_from_entity(
105    lsn: u64,
106    timestamp: u64,
107    operation: ChangeOperation,
108    collection: impl Into<String>,
109    entity_kind: impl Into<String>,
110    entity: &crate::storage::UnifiedEntity,
111    format_version: u32,
112    metadata: Option<JsonValue>,
113) -> ChangeRecord {
114    let entity_bytes = match operation {
115        ChangeOperation::Delete | ChangeOperation::Refresh => None,
116        ChangeOperation::Insert | ChangeOperation::Update => Some(
117            crate::storage::UnifiedStore::serialize_entity(entity, format_version),
118        ),
119    };
120
121    ChangeRecord {
122        term: crate::replication::DEFAULT_REPLICATION_TERM,
123        lsn,
124        timestamp,
125        operation,
126        collection: collection.into(),
127        entity_id: entity.id.raw(),
128        entity_kind: entity_kind.into(),
129        entity_bytes,
130        metadata: metadata.map(server_json_to_wire_json),
131        refresh_records: None,
132        // Issue #991 — range authority is stamped by callers that route the
133        // change through the ownership catalog via `with_range_authority`;
134        // the base builder leaves it unset so non-range paths are unaffected.
135        range_id: None,
136        ownership_epoch: None,
137    }
138}
139
140pub fn server_json_to_wire_json(
141    value: JsonValue,
142) -> reddb_wire::replication::ChangeRecordJsonValue {
143    reddb_wire::replication::parse_change_record_json_value(&value.to_string_compact())
144        .unwrap_or(reddb_wire::replication::ChangeRecordJsonValue::Null)
145}
146
147pub fn wire_json_to_server_json(
148    value: &reddb_wire::replication::ChangeRecordJsonValue,
149) -> JsonValue {
150    crate::json::from_str(&reddb_wire::replication::change_record_json_value_to_string(value))
151        .unwrap_or(JsonValue::Null)
152}
153
154/// CDC event buffer — circular buffer of change events.
155///
156/// Splits the "next LSN" counter (write-contended on every emit)
157/// from the event ring (short-hold push/pop) so that concurrent
158/// emitters don't serialise on a single RwLock. The previous
159/// design used one `RwLock<CdcState>` that turned every insert
160/// into a write-lock acquire, capping 16-way concurrent writes
161/// at ~1000 ops/s (each writer paid ~1ms queueing for the same
162/// mutex even though the work it guarded was a one-line VecDeque
163/// push).
164///
165/// New layout:
166///   - LSN is an `AtomicU64`, assigned with `fetch_add(1)`.
167///     Zero contention.
168///   - Events are guarded by a `parking_lot::Mutex<VecDeque>`.
169///     The critical section is `pop_front (if full) + push_back`
170///     — microseconds at most, parking-free at low contention.
171///
172/// Readers (`poll`, `current_lsn`, `stats`) take the same mutex
173/// briefly; they're cold paths compared to the write hot path.
174pub struct CdcBuffer {
175    next_lsn: AtomicU64,
176    events: parking_lot::Mutex<VecDeque<ChangeEvent>>,
177    max_size: usize,
178}
179
180impl CdcBuffer {
181    /// Create a new CDC buffer with maximum capacity.
182    pub fn new(max_size: usize) -> Self {
183        Self {
184            next_lsn: AtomicU64::new(0),
185            events: parking_lot::Mutex::new(VecDeque::with_capacity(max_size.min(10_000))),
186            max_size,
187        }
188    }
189
190    /// Emit a change event into the buffer. `changed_columns`
191    /// defaults to `None` for backwards compatibility; call sites
192    /// that have a damage vector available should use
193    /// [`Self::emit_with_columns`] instead.
194    pub fn emit(
195        &self,
196        operation: ChangeOperation,
197        collection: &str,
198        entity_id: u64,
199        entity_kind: &str,
200    ) -> u64 {
201        self.emit_with_columns(operation, collection, entity_id, entity_kind, None)
202    }
203
204    /// Emit a change event with an optional list of column names
205    /// that were affected. Use from update paths that have already
206    /// computed a [`RowDamageVector`](crate::application::entity::RowDamageVector)
207    /// so CDC consumers can filter by touched column without re-diffing.
208    pub fn emit_with_columns(
209        &self,
210        operation: ChangeOperation,
211        collection: &str,
212        entity_id: u64,
213        entity_kind: &str,
214        changed_columns: Option<Vec<String>>,
215    ) -> u64 {
216        // LSN assignment is lock-free — multiple emitters each get a
217        // unique monotonic LSN without waiting on any other emitter.
218        let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
219
220        let event = ChangeEvent {
221            lsn: event_lsn,
222            timestamp: SystemTime::now()
223                .duration_since(UNIX_EPOCH)
224                .unwrap_or_default()
225                .as_millis() as u64,
226            operation,
227            collection: collection.to_string(),
228            entity_id,
229            entity_kind: entity_kind.to_string(),
230            changed_columns,
231            kv: None,
232        };
233
234        // Short-hold ring push. Under heavy contention parking_lot
235        // spins a few times before parking, so typical hold time is
236        // a couple hundred nanoseconds.
237        let mut events = self.events.lock();
238        if events.len() >= self.max_size {
239            events.pop_front();
240        }
241        events.push_back(event);
242
243        event_lsn
244    }
245
246    /// Emit many same-collection events with one LSN reservation and one
247    /// ring-buffer lock. This is used by bulk insert paths that do not need
248    /// per-row logical-WAL records.
249    pub fn emit_batch_same_collection<I>(
250        &self,
251        operation: ChangeOperation,
252        collection: &str,
253        entity_kind: &str,
254        entity_ids: I,
255    ) -> Vec<u64>
256    where
257        I: IntoIterator<Item = u64>,
258        I::IntoIter: ExactSizeIterator,
259    {
260        let iter = entity_ids.into_iter();
261        let count = iter.len();
262        if count == 0 {
263            return Vec::new();
264        }
265
266        let first_lsn = self.next_lsn.fetch_add(count as u64, Ordering::AcqRel) + 1;
267        let lsns = (0..count)
268            .map(|idx| first_lsn + idx as u64)
269            .collect::<Vec<_>>();
270        if self.max_size == 0 {
271            return lsns;
272        }
273
274        let timestamp = SystemTime::now()
275            .duration_since(UNIX_EPOCH)
276            .unwrap_or_default()
277            .as_millis() as u64;
278        let collection = collection.to_string();
279        let entity_kind = entity_kind.to_string();
280
281        let skip = count.saturating_sub(self.max_size);
282        let kept = count - skip;
283        let mut events = self.events.lock();
284        let overflow = events
285            .len()
286            .saturating_add(kept)
287            .saturating_sub(self.max_size);
288        for _ in 0..overflow {
289            events.pop_front();
290        }
291
292        for (idx, entity_id) in iter.enumerate().skip(skip) {
293            events.push_back(ChangeEvent {
294                lsn: first_lsn + idx as u64,
295                timestamp,
296                operation,
297                collection: collection.clone(),
298                entity_id,
299                entity_kind: entity_kind.clone(),
300                changed_columns: None,
301                kv: None,
302            });
303        }
304        lsns
305    }
306
307    /// Emit a committed logical KV event into the same CDC ring used by
308    /// result-cache invalidation and `/changes` consumers.
309    pub fn emit_kv(
310        &self,
311        operation: ChangeOperation,
312        collection: &str,
313        key: &str,
314        entity_id: u64,
315        before: Option<JsonValue>,
316        after: Option<JsonValue>,
317    ) -> u64 {
318        let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
319        let timestamp = SystemTime::now()
320            .duration_since(UNIX_EPOCH)
321            .unwrap_or_default()
322            .as_millis() as u64;
323        let kv = KvWatchEvent {
324            collection: collection.to_string(),
325            key: key.to_string(),
326            op: operation,
327            before,
328            after,
329            lsn: event_lsn,
330            committed_at: timestamp,
331            dropped_event_count: 0,
332        };
333        let event = ChangeEvent {
334            lsn: event_lsn,
335            timestamp,
336            operation,
337            collection: collection.to_string(),
338            entity_id,
339            entity_kind: "kv".to_string(),
340            changed_columns: Some(vec!["value".to_string()]),
341            kv: Some(kv),
342        };
343
344        let mut events = self.events.lock();
345        if events.len() >= self.max_size {
346            events.pop_front();
347        }
348        events.push_back(event);
349        event_lsn
350    }
351
352    /// Poll for events since a given LSN.
353    pub fn poll(&self, since_lsn: u64, max_count: usize) -> Vec<ChangeEvent> {
354        let events = self.events.lock();
355        events
356            .iter()
357            .filter(|e| e.lsn > since_lsn)
358            .take(max_count)
359            .cloned()
360            .collect()
361    }
362
363    /// Get the current (latest) LSN.
364    pub fn current_lsn(&self) -> u64 {
365        self.next_lsn.load(Ordering::Acquire)
366    }
367
368    /// Restore the LSN cursor after process restart. Only advances;
369    /// never rewinds. Under concurrent emit this is guarded by a
370    /// compare-exchange loop.
371    pub fn set_current_lsn(&self, lsn: u64) {
372        let mut current = self.next_lsn.load(Ordering::Acquire);
373        while lsn > current {
374            match self
375                .next_lsn
376                .compare_exchange(current, lsn, Ordering::AcqRel, Ordering::Acquire)
377            {
378                Ok(_) => break,
379                Err(observed) => current = observed,
380            }
381        }
382    }
383
384    /// Get the oldest available LSN (or None if empty).
385    pub fn oldest_lsn(&self) -> Option<u64> {
386        self.events.lock().front().map(|e| e.lsn)
387    }
388
389    /// Get buffer stats (single lock acquisition — no deadlock risk).
390    pub fn stats(&self) -> CdcStats {
391        let events = self.events.lock();
392        CdcStats {
393            buffered_events: events.len(),
394            current_lsn: self.next_lsn.load(Ordering::Acquire),
395            oldest_lsn: events.front().map(|e| e.lsn),
396            newest_lsn: events.back().map(|e| e.lsn),
397        }
398    }
399}
400
/// CDC buffer statistics: a point-in-time snapshot of a CDC buffer.
///
/// Snapshots compare by value, so two of them can be checked for equality
/// directly (for example in tests or to detect that nothing changed).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CdcStats {
    /// Number of events currently held in the buffer.
    pub buffered_events: usize,
    /// Latest LSN assigned at the time of the snapshot.
    pub current_lsn: u64,
    /// LSN of the event at the front of the buffer, if any.
    pub oldest_lsn: Option<u64>,
    /// LSN of the event at the back of the buffer, if any.
    pub newest_lsn: Option<u64>,
}
409
#[cfg(test)]
mod tests {
    use super::*;

    /// Three emits come back from a poll in emission order, without
    /// changed-column information.
    #[test]
    fn test_emit_and_poll() {
        let cdc = CdcBuffer::new(100);
        for op in [
            ChangeOperation::Insert,
            ChangeOperation::Update,
            ChangeOperation::Delete,
        ] {
            cdc.emit(op, "users", 1, "table");
        }

        let polled = cdc.poll(0, 10);
        let seen_ops: Vec<ChangeOperation> = polled.iter().map(|event| event.operation).collect();
        assert_eq!(
            seen_ops,
            vec![
                ChangeOperation::Insert,
                ChangeOperation::Update,
                ChangeOperation::Delete,
            ]
        );
        // The backwards-compatible `emit` must leave changed_columns unset.
        assert!(polled[0].changed_columns.is_none());
        assert!(polled[1].changed_columns.is_none());
    }

    /// A damage vector passed to `emit_with_columns` is stored on the event;
    /// a plain `emit` stores none.
    #[test]
    fn test_emit_with_columns_propagates_damage_vector() {
        let cdc = CdcBuffer::new(100);
        let touched = vec!["email".to_string(), "age".to_string()];
        cdc.emit_with_columns(
            ChangeOperation::Update,
            "users",
            7,
            "table",
            Some(touched.clone()),
        );
        cdc.emit(ChangeOperation::Update, "users", 8, "table");

        let polled = cdc.poll(0, 10);
        assert_eq!(polled.len(), 2);
        assert_eq!(
            polled[0].changed_columns.as_deref(),
            Some(touched.as_slice())
        );
        assert!(polled[1].changed_columns.is_none());
    }

    /// Polling with a cursor returns only events past that LSN.
    #[test]
    fn test_poll_with_cursor() {
        let cdc = CdcBuffer::new(100);
        for (collection, id) in [("a", 1), ("b", 2), ("c", 3)] {
            cdc.emit(ChangeOperation::Insert, collection, id, "table");
        }

        // Cursor at lsn=1: only the second and third events remain.
        let polled = cdc.poll(1, 10);
        let collections: Vec<&str> = polled
            .iter()
            .map(|event| event.collection.as_str())
            .collect();
        assert_eq!(collections, vec!["b", "c"]);
    }

    /// A batch continues the LSN sequence right after earlier single emits.
    #[test]
    fn test_emit_batch_same_collection_assigns_contiguous_lsns() {
        let cdc = CdcBuffer::new(100);
        cdc.emit(ChangeOperation::Insert, "users", 10, "table");
        cdc.emit_batch_same_collection(ChangeOperation::Insert, "users", "table", [11, 12, 13]);

        let polled = cdc.poll(0, 10);
        assert_eq!(polled.len(), 4);
        let batch_lsns: Vec<u64> = polled[1..].iter().map(|event| event.lsn).collect();
        assert_eq!(batch_lsns, vec![2, 3, 4]);
        assert_eq!(polled[3].entity_id, 13);
        assert_eq!(cdc.current_lsn(), 4);
    }

    /// A batch larger than the ring keeps only its newest events, while the
    /// LSN counter still covers the whole batch.
    #[test]
    fn test_emit_batch_same_collection_respects_ring_size() {
        let cdc = CdcBuffer::new(3);
        cdc.emit_batch_same_collection(ChangeOperation::Insert, "users", "table", [1, 2, 3, 4, 5]);

        let polled = cdc.poll(0, 10);
        assert_eq!(polled.len(), 3);
        let retained_ids: Vec<u64> = polled.iter().map(|event| event.entity_id).collect();
        assert_eq!(retained_ids, vec![3, 4, 5]);
        assert_eq!(polled[0].lsn, 3);
        assert_eq!(polled[2].lsn, 5);
        assert_eq!(cdc.current_lsn(), 5);
    }

    /// Emitting past capacity drops the oldest event.
    #[test]
    fn test_circular_eviction() {
        let cdc = CdcBuffer::new(3);
        // The fourth emit ("d") pushes "a" out of the three-slot ring.
        for (collection, id) in [("a", 1), ("b", 2), ("c", 3), ("d", 4)] {
            cdc.emit(ChangeOperation::Insert, collection, id, "table");
        }

        let polled = cdc.poll(0, 10);
        assert_eq!(polled.len(), 3);
        assert_eq!(polled[0].collection, "b"); // "a" was evicted
    }

    /// `stats` reports the event count and the LSN bounds of the ring.
    #[test]
    fn test_stats() {
        let cdc = CdcBuffer::new(100);
        cdc.emit(ChangeOperation::Insert, "x", 1, "table");
        cdc.emit(ChangeOperation::Insert, "y", 2, "table");

        let CdcStats {
            buffered_events,
            current_lsn,
            oldest_lsn,
            newest_lsn,
        } = cdc.stats();
        assert_eq!(buffered_events, 2);
        assert_eq!(current_lsn, 2);
        assert_eq!(oldest_lsn, Some(1));
        assert_eq!(newest_lsn, Some(2));
    }

    /// A fully populated record survives encode → decode.
    #[test]
    fn test_change_record_roundtrip() {
        let original = ChangeRecord {
            term: 3,
            lsn: 7,
            timestamp: 1234,
            operation: ChangeOperation::Update,
            collection: "users".to_string(),
            entity_id: 42,
            entity_kind: "row".to_string(),
            entity_bytes: Some(vec![1, 2, 3]),
            metadata: Some(server_json_to_wire_json(crate::json!({"role": "admin"}))),
            refresh_records: None,
            range_id: None,
            ownership_epoch: None,
        };

        let encoded = original.encode();
        let roundtripped = ChangeRecord::decode(&encoded).expect("decode");
        assert_eq!(roundtripped.term, original.term);
        assert_eq!(roundtripped.lsn, original.lsn);
        assert_eq!(roundtripped.collection, original.collection);
        assert_eq!(roundtripped.entity_id, original.entity_id);
        assert_eq!(roundtripped.entity_bytes, original.entity_bytes);
    }

    /// Issue #596 slice 9d — refresh records must survive a JSON round-trip
    /// through the logical-WAL wire format that the primary writes and the
    /// replica reads. The contract is bit-for-bit equality of every payload
    /// byte: the replica calls `bulk_insert` with whatever bytes land in
    /// `refresh_records`, so a decode that silently dropped or re-ordered
    /// them would silently diverge replica state.
    #[test]
    fn test_change_record_refresh_roundtrip() {
        let payloads = vec![vec![0x10, 0x20, 0x30], vec![0xAA, 0xBB], Vec::new()];
        let original =
            ChangeRecord::for_refresh(11, 99, "mv_orders_summary", payloads.clone()).with_term(4);

        let encoded = original.encode();
        let roundtripped = ChangeRecord::decode(&encoded).expect("decode");
        assert_eq!(roundtripped.term, 4);
        assert_eq!(roundtripped.operation, ChangeOperation::Refresh);
        assert_eq!(roundtripped.collection, "mv_orders_summary");
        assert_eq!(
            roundtripped.refresh_records.as_deref(),
            Some(&payloads[..])
        );
    }

    /// A legacy payload without a `term` field decodes with the default term.
    #[test]
    fn test_change_record_legacy_payload_defaults_term() {
        let legacy_payload = br#"{"lsn":9,"timestamp":1,"operation":"delete","collection":"users","rid":5,"kind":"row"}"#;
        let roundtripped = ChangeRecord::decode(legacy_payload).expect("decode legacy record");
        assert_eq!(
            roundtripped.term,
            crate::replication::DEFAULT_REPLICATION_TERM
        );
        assert_eq!(roundtripped.lsn, 9);
    }
}
573        let decoded = ChangeRecord::decode(legacy).expect("decode legacy record");
574        assert_eq!(decoded.term, crate::replication::DEFAULT_REPLICATION_TERM);
575        assert_eq!(decoded.lsn, 9);
576    }
577}