Skip to main content

reddb_server/replication/
cdc.rs

1//! Change Data Capture (CDC) — stream of database change events.
2//!
3//! Exposes entity mutations (insert, update, delete) as a pollable event stream.
4//! Consumers poll with a cursor (LSN) to receive new events since their last position.
5
6use std::collections::VecDeque;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use crate::json::{Map, Value as JsonValue};
11
/// Kind of entity mutation recorded by a CDC event (insert, update, delete).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeOperation {
    /// Entity insertion.
    Insert,
    /// Entity update.
    Update,
    /// Entity deletion.
    Delete,
}

impl ChangeOperation {
    /// Parse the lowercase wire name (`"insert"`, `"update"`, `"delete"`).
    ///
    /// Matching is exact and case-sensitive; any other input yields `None`.
    // Kept as an inherent fn returning `Option` (not `FromStr`'s `Result`)
    // because call sites chain it with `.and_then(ChangeOperation::from_str)`.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(value: &str) -> Option<Self> {
        [Self::Insert, Self::Update, Self::Delete]
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == value)
    }

    /// Lowercase wire name of this operation; the inverse of `from_str`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            ChangeOperation::Insert => "insert",
            ChangeOperation::Update => "update",
            ChangeOperation::Delete => "delete",
        }
    }
}
38
39/// A single change event.
40#[derive(Debug, Clone)]
41pub struct ChangeEvent {
42    /// Monotonically increasing sequence number
43    pub lsn: u64,
44    /// When the change occurred (unix ms)
45    pub timestamp: u64,
46    /// Type of operation
47    pub operation: ChangeOperation,
48    /// Collection name
49    pub collection: String,
50    /// Entity ID affected
51    pub entity_id: u64,
52    /// Entity kind (table, graph_node, graph_edge, vector, etc.)
53    pub entity_kind: String,
54    /// For `Update` events, the list of column names whose values
55    /// changed (including added/removed columns). `None` when the
56    /// emitter didn't compute a damage vector (inserts, deletes, and
57    /// coarse-grained update paths that haven't been rewired yet).
58    /// Downstream CDC consumers can use this to skip replaying
59    /// updates that touched columns they don't care about.
60    pub changed_columns: Option<Vec<String>>,
61    /// Optional KV-specific payload for WATCH subscribers. Existing CDC
62    /// consumers can ignore this field and continue using the generic shape.
63    pub kv: Option<KvWatchEvent>,
64}
65
66/// A committed single-key KV change surfaced by WATCH.
67#[derive(Debug, Clone, PartialEq)]
68pub struct KvWatchEvent {
69    pub collection: String,
70    pub key: String,
71    pub op: ChangeOperation,
72    pub before: Option<JsonValue>,
73    pub after: Option<JsonValue>,
74    pub lsn: u64,
75    pub committed_at: u64,
76    pub dropped_event_count: u64,
77}
78
79impl KvWatchEvent {
80    pub fn to_json_value(&self) -> JsonValue {
81        let mut object = Map::new();
82        object.insert("key".to_string(), JsonValue::String(self.key.clone()));
83        object.insert(
84            "op".to_string(),
85            JsonValue::String(self.op.as_str().to_string()),
86        );
87        object.insert(
88            "before".to_string(),
89            self.before.clone().unwrap_or(JsonValue::Null),
90        );
91        object.insert(
92            "after".to_string(),
93            self.after.clone().unwrap_or(JsonValue::Null),
94        );
95        object.insert("lsn".to_string(), JsonValue::Number(self.lsn as f64));
96        object.insert(
97            "committed_at".to_string(),
98            JsonValue::Number(self.committed_at as f64),
99        );
100        object.insert(
101            "dropped_event_count".to_string(),
102            JsonValue::Number(self.dropped_event_count as f64),
103        );
104        JsonValue::Object(object)
105    }
106}
107
108/// Structured logical WAL record serialized into the replication buffer and
109/// archived segments.
110#[derive(Debug, Clone)]
111pub struct ChangeRecord {
112    pub lsn: u64,
113    pub timestamp: u64,
114    pub operation: ChangeOperation,
115    pub collection: String,
116    pub entity_id: u64,
117    pub entity_kind: String,
118    pub entity_bytes: Option<Vec<u8>>,
119    pub metadata: Option<JsonValue>,
120}
121
122impl ChangeRecord {
123    pub fn from_entity(
124        lsn: u64,
125        timestamp: u64,
126        operation: ChangeOperation,
127        collection: impl Into<String>,
128        entity_kind: impl Into<String>,
129        entity: &crate::storage::UnifiedEntity,
130        format_version: u32,
131        metadata: Option<JsonValue>,
132    ) -> Self {
133        let entity_bytes = match operation {
134            ChangeOperation::Delete => None,
135            ChangeOperation::Insert | ChangeOperation::Update => Some(
136                crate::storage::UnifiedStore::serialize_entity(entity, format_version),
137            ),
138        };
139
140        Self {
141            lsn,
142            timestamp,
143            operation,
144            collection: collection.into(),
145            entity_id: entity.id.raw(),
146            entity_kind: entity_kind.into(),
147            entity_bytes,
148            metadata,
149        }
150    }
151
152    pub fn to_json_value(&self) -> JsonValue {
153        let mut object = Map::new();
154        object.insert("lsn".to_string(), JsonValue::Number(self.lsn as f64));
155        object.insert(
156            "timestamp".to_string(),
157            JsonValue::Number(self.timestamp as f64),
158        );
159        object.insert(
160            "operation".to_string(),
161            JsonValue::String(self.operation.as_str().to_string()),
162        );
163        object.insert(
164            "collection".to_string(),
165            JsonValue::String(self.collection.clone()),
166        );
167        object.insert(
168            "entity_id".to_string(),
169            JsonValue::Number(self.entity_id as f64),
170        );
171        object.insert(
172            "entity_kind".to_string(),
173            JsonValue::String(self.entity_kind.clone()),
174        );
175        if let Some(bytes) = &self.entity_bytes {
176            object.insert(
177                "entity_bytes_hex".to_string(),
178                JsonValue::String(hex::encode(bytes)),
179            );
180        }
181        if let Some(metadata) = &self.metadata {
182            object.insert("metadata".to_string(), metadata.clone());
183        }
184        JsonValue::Object(object)
185    }
186
187    pub fn encode(&self) -> Vec<u8> {
188        crate::json::to_string(&self.to_json_value())
189            .unwrap_or_else(|_| "{}".to_string())
190            .into_bytes()
191    }
192
193    pub fn decode(bytes: &[u8]) -> Result<Self, String> {
194        let text = std::str::from_utf8(bytes).map_err(|err| err.to_string())?;
195        let value = crate::json::from_str::<JsonValue>(text).map_err(|err| err.to_string())?;
196        let operation = value
197            .get("operation")
198            .and_then(JsonValue::as_str)
199            .and_then(ChangeOperation::from_str)
200            .ok_or_else(|| "invalid replication operation".to_string())?;
201        let entity_bytes = value
202            .get("entity_bytes_hex")
203            .and_then(JsonValue::as_str)
204            .map(hex::decode)
205            .transpose()
206            .map_err(|err| err.to_string())?;
207
208        Ok(Self {
209            lsn: value.get("lsn").and_then(JsonValue::as_u64).unwrap_or(0),
210            timestamp: value
211                .get("timestamp")
212                .and_then(JsonValue::as_u64)
213                .unwrap_or(0),
214            operation,
215            collection: value
216                .get("collection")
217                .and_then(JsonValue::as_str)
218                .unwrap_or_default()
219                .to_string(),
220            entity_id: value
221                .get("entity_id")
222                .and_then(JsonValue::as_u64)
223                .unwrap_or(0),
224            entity_kind: value
225                .get("entity_kind")
226                .and_then(JsonValue::as_str)
227                .unwrap_or("entity")
228                .to_string(),
229            entity_bytes,
230            metadata: value.get("metadata").cloned(),
231        })
232    }
233}
234
235/// CDC event buffer — circular buffer of change events.
236///
237/// Splits the "next LSN" counter (write-contended on every emit)
238/// from the event ring (short-hold push/pop) so that concurrent
239/// emitters don't serialise on a single RwLock. The previous
240/// design used one `RwLock<CdcState>` that turned every insert
241/// into a write-lock acquire, capping 16-way concurrent writes
242/// at ~1000 ops/s (each writer paid ~1ms queueing for the same
243/// mutex even though the work it guarded was a one-line VecDeque
244/// push).
245///
246/// New layout:
247///   - LSN is an `AtomicU64`, assigned with `fetch_add(1)`.
248///     Zero contention.
249///   - Events are guarded by a `parking_lot::Mutex<VecDeque>`.
250///     The critical section is `pop_front (if full) + push_back`
251///     — microseconds at most, parking-free at low contention.
252///
253/// Readers (`poll`, `current_lsn`, `stats`) take the same mutex
254/// briefly; they're cold paths compared to the write hot path.
255pub struct CdcBuffer {
256    next_lsn: AtomicU64,
257    events: parking_lot::Mutex<VecDeque<ChangeEvent>>,
258    max_size: usize,
259}
260
261impl CdcBuffer {
262    /// Create a new CDC buffer with maximum capacity.
263    pub fn new(max_size: usize) -> Self {
264        Self {
265            next_lsn: AtomicU64::new(0),
266            events: parking_lot::Mutex::new(VecDeque::with_capacity(max_size.min(10_000))),
267            max_size,
268        }
269    }
270
271    /// Emit a change event into the buffer. `changed_columns`
272    /// defaults to `None` for backwards compatibility; call sites
273    /// that have a damage vector available should use
274    /// [`Self::emit_with_columns`] instead.
275    pub fn emit(
276        &self,
277        operation: ChangeOperation,
278        collection: &str,
279        entity_id: u64,
280        entity_kind: &str,
281    ) -> u64 {
282        self.emit_with_columns(operation, collection, entity_id, entity_kind, None)
283    }
284
285    /// Emit a change event with an optional list of column names
286    /// that were affected. Use from update paths that have already
287    /// computed a [`RowDamageVector`](crate::application::entity::RowDamageVector)
288    /// so CDC consumers can filter by touched column without re-diffing.
289    pub fn emit_with_columns(
290        &self,
291        operation: ChangeOperation,
292        collection: &str,
293        entity_id: u64,
294        entity_kind: &str,
295        changed_columns: Option<Vec<String>>,
296    ) -> u64 {
297        // LSN assignment is lock-free — multiple emitters each get a
298        // unique monotonic LSN without waiting on any other emitter.
299        let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
300
301        let event = ChangeEvent {
302            lsn: event_lsn,
303            timestamp: SystemTime::now()
304                .duration_since(UNIX_EPOCH)
305                .unwrap_or_default()
306                .as_millis() as u64,
307            operation,
308            collection: collection.to_string(),
309            entity_id,
310            entity_kind: entity_kind.to_string(),
311            changed_columns,
312            kv: None,
313        };
314
315        // Short-hold ring push. Under heavy contention parking_lot
316        // spins a few times before parking, so typical hold time is
317        // a couple hundred nanoseconds.
318        let mut events = self.events.lock();
319        if events.len() >= self.max_size {
320            events.pop_front();
321        }
322        events.push_back(event);
323
324        event_lsn
325    }
326
327    /// Emit many same-collection events with one LSN reservation and one
328    /// ring-buffer lock. This is used by bulk insert paths that do not need
329    /// per-row logical-WAL records.
330    pub fn emit_batch_same_collection<I>(
331        &self,
332        operation: ChangeOperation,
333        collection: &str,
334        entity_kind: &str,
335        entity_ids: I,
336    ) -> Vec<u64>
337    where
338        I: IntoIterator<Item = u64>,
339        I::IntoIter: ExactSizeIterator,
340    {
341        let iter = entity_ids.into_iter();
342        let count = iter.len();
343        if count == 0 {
344            return Vec::new();
345        }
346
347        let first_lsn = self.next_lsn.fetch_add(count as u64, Ordering::AcqRel) + 1;
348        let lsns = (0..count)
349            .map(|idx| first_lsn + idx as u64)
350            .collect::<Vec<_>>();
351        if self.max_size == 0 {
352            return lsns;
353        }
354
355        let timestamp = SystemTime::now()
356            .duration_since(UNIX_EPOCH)
357            .unwrap_or_default()
358            .as_millis() as u64;
359        let collection = collection.to_string();
360        let entity_kind = entity_kind.to_string();
361
362        let skip = count.saturating_sub(self.max_size);
363        let kept = count - skip;
364        let mut events = self.events.lock();
365        let overflow = events
366            .len()
367            .saturating_add(kept)
368            .saturating_sub(self.max_size);
369        for _ in 0..overflow {
370            events.pop_front();
371        }
372
373        for (idx, entity_id) in iter.enumerate().skip(skip) {
374            events.push_back(ChangeEvent {
375                lsn: first_lsn + idx as u64,
376                timestamp,
377                operation,
378                collection: collection.clone(),
379                entity_id,
380                entity_kind: entity_kind.clone(),
381                changed_columns: None,
382                kv: None,
383            });
384        }
385        lsns
386    }
387
388    /// Emit a committed logical KV event into the same CDC ring used by
389    /// result-cache invalidation and `/changes` consumers.
390    pub fn emit_kv(
391        &self,
392        operation: ChangeOperation,
393        collection: &str,
394        key: &str,
395        entity_id: u64,
396        before: Option<JsonValue>,
397        after: Option<JsonValue>,
398    ) -> u64 {
399        let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
400        let timestamp = SystemTime::now()
401            .duration_since(UNIX_EPOCH)
402            .unwrap_or_default()
403            .as_millis() as u64;
404        let kv = KvWatchEvent {
405            collection: collection.to_string(),
406            key: key.to_string(),
407            op: operation,
408            before,
409            after,
410            lsn: event_lsn,
411            committed_at: timestamp,
412            dropped_event_count: 0,
413        };
414        let event = ChangeEvent {
415            lsn: event_lsn,
416            timestamp,
417            operation,
418            collection: collection.to_string(),
419            entity_id,
420            entity_kind: "kv".to_string(),
421            changed_columns: Some(vec!["value".to_string()]),
422            kv: Some(kv),
423        };
424
425        let mut events = self.events.lock();
426        if events.len() >= self.max_size {
427            events.pop_front();
428        }
429        events.push_back(event);
430        event_lsn
431    }
432
433    /// Poll for events since a given LSN.
434    pub fn poll(&self, since_lsn: u64, max_count: usize) -> Vec<ChangeEvent> {
435        let events = self.events.lock();
436        events
437            .iter()
438            .filter(|e| e.lsn > since_lsn)
439            .take(max_count)
440            .cloned()
441            .collect()
442    }
443
444    /// Get the current (latest) LSN.
445    pub fn current_lsn(&self) -> u64 {
446        self.next_lsn.load(Ordering::Acquire)
447    }
448
449    /// Restore the LSN cursor after process restart. Only advances;
450    /// never rewinds. Under concurrent emit this is guarded by a
451    /// compare-exchange loop.
452    pub fn set_current_lsn(&self, lsn: u64) {
453        let mut current = self.next_lsn.load(Ordering::Acquire);
454        while lsn > current {
455            match self
456                .next_lsn
457                .compare_exchange(current, lsn, Ordering::AcqRel, Ordering::Acquire)
458            {
459                Ok(_) => break,
460                Err(observed) => current = observed,
461            }
462        }
463    }
464
465    /// Get the oldest available LSN (or None if empty).
466    pub fn oldest_lsn(&self) -> Option<u64> {
467        self.events.lock().front().map(|e| e.lsn)
468    }
469
470    /// Get buffer stats (single lock acquisition — no deadlock risk).
471    pub fn stats(&self) -> CdcStats {
472        let events = self.events.lock();
473        CdcStats {
474            buffered_events: events.len(),
475            current_lsn: self.next_lsn.load(Ordering::Acquire),
476            oldest_lsn: events.front().map(|e| e.lsn),
477            newest_lsn: events.back().map(|e| e.lsn),
478        }
479    }
480}
481
/// Point-in-time snapshot of a `CdcBuffer`, as produced by `CdcBuffer::stats`.
#[derive(Debug, Clone)]
pub struct CdcStats {
    /// Number of events currently held in the ring.
    pub buffered_events: usize,
    /// Last LSN handed out by the buffer.
    pub current_lsn: u64,
    /// LSN at the front of the ring, or `None` when the ring is empty.
    pub oldest_lsn: Option<u64>,
    /// LSN at the back of the ring, or `None` when the ring is empty.
    pub newest_lsn: Option<u64>,
}
490
#[cfg(test)]
mod tests {
    use super::*;

    fn lsns(events: &[ChangeEvent]) -> Vec<u64> {
        events.iter().map(|event| event.lsn).collect()
    }

    #[test]
    fn test_emit_and_poll() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "users", 1, "table");
        buf.emit(ChangeOperation::Update, "users", 1, "table");
        buf.emit(ChangeOperation::Delete, "users", 1, "table");

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].operation, ChangeOperation::Insert);
        assert_eq!(events[1].operation, ChangeOperation::Update);
        assert_eq!(events[2].operation, ChangeOperation::Delete);
        // Backwards-compat emit should leave changed_columns None.
        assert!(events[0].changed_columns.is_none());
        assert!(events[1].changed_columns.is_none());
    }

    #[test]
    fn test_emit_with_columns_propagates_damage_vector() {
        let buf = CdcBuffer::new(100);
        buf.emit_with_columns(
            ChangeOperation::Update,
            "users",
            7,
            "table",
            Some(vec!["email".to_string(), "age".to_string()]),
        );
        buf.emit(ChangeOperation::Update, "users", 8, "table");

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 2);
        assert_eq!(
            events[0].changed_columns.as_deref(),
            Some(vec!["email".to_string(), "age".to_string()].as_slice())
        );
        assert!(events[1].changed_columns.is_none());
    }

    #[test]
    fn test_poll_with_cursor() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "a", 1, "table");
        buf.emit(ChangeOperation::Insert, "b", 2, "table");
        buf.emit(ChangeOperation::Insert, "c", 3, "table");

        // Poll from lsn=1, should get events 2 and 3
        let events = buf.poll(1, 10);
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].collection, "b");
        assert_eq!(events[1].collection, "c");
    }

    #[test]
    fn test_poll_respects_cursor_and_max_count() {
        let buf = CdcBuffer::new(100);
        for id in 1..=5 {
            buf.emit(ChangeOperation::Insert, "t", id, "table");
        }

        let first = buf.poll(0, 2);
        assert_eq!(lsns(&first), vec![1, 2]);
        let rest = buf.poll(first[1].lsn, 10);
        assert_eq!(lsns(&rest), vec![3, 4, 5]);
        assert!(buf.poll(5, 10).is_empty());
        assert!(buf.poll(0, 0).is_empty());
    }

    #[test]
    fn test_emit_batch_same_collection_assigns_contiguous_lsns() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "users", 10, "table");
        let batch =
            buf.emit_batch_same_collection(ChangeOperation::Insert, "users", "table", [11, 12, 13]);
        assert_eq!(batch, vec![2, 3, 4]);

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 4);
        assert_eq!(events[1].lsn, 2);
        assert_eq!(events[2].lsn, 3);
        assert_eq!(events[3].lsn, 4);
        assert_eq!(events[3].entity_id, 13);
        assert_eq!(buf.current_lsn(), 4);
    }

    #[test]
    fn test_emit_batch_same_collection_respects_ring_size() {
        let buf = CdcBuffer::new(3);
        buf.emit_batch_same_collection(ChangeOperation::Insert, "users", "table", [1, 2, 3, 4, 5]);

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 3);
        assert_eq!(
            events
                .iter()
                .map(|event| event.entity_id)
                .collect::<Vec<_>>(),
            vec![3, 4, 5]
        );
        assert_eq!(events[0].lsn, 3);
        assert_eq!(events[2].lsn, 5);
        assert_eq!(buf.current_lsn(), 5);
    }

    #[test]
    fn test_emit_batch_same_collection_empty_is_noop() {
        let buf = CdcBuffer::new(3);
        let batch = buf.emit_batch_same_collection(
            ChangeOperation::Insert,
            "users",
            "table",
            Vec::<u64>::new(),
        );
        assert!(batch.is_empty());
        assert_eq!(buf.current_lsn(), 0);
        assert!(buf.poll(0, 10).is_empty());
    }

    #[test]
    fn test_circular_eviction() {
        let buf = CdcBuffer::new(3);
        buf.emit(ChangeOperation::Insert, "a", 1, "table");
        buf.emit(ChangeOperation::Insert, "b", 2, "table");
        buf.emit(ChangeOperation::Insert, "c", 3, "table");
        buf.emit(ChangeOperation::Insert, "d", 4, "table"); // evicts "a"

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].collection, "b"); // "a" was evicted
    }

    #[test]
    fn test_oldest_lsn_tracks_eviction() {
        let buf = CdcBuffer::new(3);
        assert_eq!(buf.oldest_lsn(), None);
        for id in 1..=4 {
            buf.emit(ChangeOperation::Insert, "t", id, "table");
        }
        assert_eq!(buf.oldest_lsn(), Some(2));
        assert_eq!(buf.stats().newest_lsn, Some(4));
    }

    #[test]
    fn test_stats() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "x", 1, "table");
        buf.emit(ChangeOperation::Insert, "y", 2, "table");

        let stats = buf.stats();
        assert_eq!(stats.buffered_events, 2);
        assert_eq!(stats.current_lsn, 2);
        assert_eq!(stats.oldest_lsn, Some(1));
        assert_eq!(stats.newest_lsn, Some(2));
    }

    #[test]
    fn test_set_current_lsn_only_advances() {
        let buf = CdcBuffer::new(10);
        buf.set_current_lsn(10);
        assert_eq!(buf.current_lsn(), 10);
        buf.set_current_lsn(5);
        assert_eq!(buf.current_lsn(), 10);
        let lsn = buf.emit(ChangeOperation::Insert, "t", 1, "table");
        assert_eq!(lsn, 11);
        assert_eq!(lsns(&buf.poll(0, 10)), vec![11]);
    }

    #[test]
    fn test_emit_kv_populates_watch_payload() {
        let buf = CdcBuffer::new(10);
        buf.emit(ChangeOperation::Insert, "t", 1, "table");
        let lsn = buf.emit_kv(
            ChangeOperation::Update,
            "settings",
            "theme",
            42,
            Some(JsonValue::String("light".to_string())),
            Some(JsonValue::String("dark".to_string())),
        );
        assert_eq!(lsn, 2);

        let events = buf.poll(1, 10);
        assert_eq!(events.len(), 1);
        let event = &events[0];
        assert_eq!(event.lsn, 2);
        assert_eq!(event.entity_kind, "kv");
        assert_eq!(event.entity_id, 42);
        assert_eq!(event.collection, "settings");
        assert_eq!(
            event.changed_columns.as_deref(),
            Some(vec!["value".to_string()].as_slice())
        );

        let kv = event.kv.as_ref().expect("kv payload");
        assert_eq!(kv.lsn, 2);
        assert_eq!(kv.key, "theme");
        assert_eq!(kv.collection, "settings");
        assert_eq!(kv.op, ChangeOperation::Update);
        assert_eq!(kv.committed_at, event.timestamp);
        assert_eq!(kv.dropped_event_count, 0);
        assert_eq!(kv.before, Some(JsonValue::String("light".to_string())));
        assert_eq!(kv.after, Some(JsonValue::String("dark".to_string())));
    }

    #[test]
    fn test_kv_watch_event_json_shape() {
        let event = KvWatchEvent {
            collection: "settings".to_string(),
            key: "theme".to_string(),
            op: ChangeOperation::Insert,
            before: None,
            after: Some(JsonValue::String("dark".to_string())),
            lsn: 3,
            committed_at: 1_000,
            dropped_event_count: 0,
        };

        let json = event.to_json_value();
        assert_eq!(json.get("key").and_then(JsonValue::as_str), Some("theme"));
        assert_eq!(json.get("op").and_then(JsonValue::as_str), Some("insert"));
        assert_eq!(json.get("before"), Some(&JsonValue::Null));
        assert_eq!(json.get("after").and_then(JsonValue::as_str), Some("dark"));
        assert_eq!(json.get("lsn").and_then(JsonValue::as_u64), Some(3));
        assert_eq!(
            json.get("committed_at").and_then(JsonValue::as_u64),
            Some(1_000)
        );
        assert!(json.get("collection").is_none());
    }

    #[test]
    fn test_change_operation_str_roundtrip() {
        for op in [
            ChangeOperation::Insert,
            ChangeOperation::Update,
            ChangeOperation::Delete,
        ] {
            assert_eq!(ChangeOperation::from_str(op.as_str()), Some(op));
        }
        assert_eq!(ChangeOperation::from_str("INSERT"), None);
        assert_eq!(ChangeOperation::from_str(""), None);
    }

    #[test]
    fn test_change_record_roundtrip() {
        let record = ChangeRecord {
            lsn: 7,
            timestamp: 1234,
            operation: ChangeOperation::Update,
            collection: "users".to_string(),
            entity_id: 42,
            entity_kind: "row".to_string(),
            entity_bytes: Some(vec![1, 2, 3]),
            metadata: Some(crate::json!({"role": "admin"})),
        };

        let decoded = ChangeRecord::decode(&record.encode()).expect("decode");
        assert_eq!(decoded.lsn, record.lsn);
        assert_eq!(decoded.collection, record.collection);
        assert_eq!(decoded.entity_id, record.entity_id);
        assert_eq!(decoded.entity_bytes, record.entity_bytes);
    }

    #[test]
    fn test_change_record_delete_roundtrip_without_payload() {
        let record = ChangeRecord {
            lsn: 9,
            timestamp: 55,
            operation: ChangeOperation::Delete,
            collection: "users".to_string(),
            entity_id: 42,
            entity_kind: "row".to_string(),
            entity_bytes: None,
            metadata: None,
        };

        let decoded = ChangeRecord::decode(&record.encode()).expect("decode");
        assert_eq!(decoded.lsn, 9);
        assert_eq!(decoded.timestamp, 55);
        assert_eq!(decoded.operation, ChangeOperation::Delete);
        assert_eq!(decoded.entity_kind, "row");
        assert!(decoded.entity_bytes.is_none());
        assert!(decoded.metadata.is_none());
    }

    #[test]
    fn test_change_record_decode_rejects_bad_input() {
        assert!(ChangeRecord::decode(&[0xff, 0xfe]).is_err());
        let err = ChangeRecord::decode(br#"{"operation":"merge"}"#).unwrap_err();
        assert_eq!(err, "invalid replication operation");
    }
}