Skip to main content

reddb_server/replication/
cdc.rs

1//! Change Data Capture (CDC) — stream of database change events.
2//!
3//! Exposes entity mutations (insert, update, delete) as a pollable event stream.
4//! Consumers poll with a cursor (LSN) to receive new events since their last position.
5
6use std::collections::VecDeque;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use crate::json::{Map, Value as JsonValue};
11
/// Kind of mutation a change event describes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeOperation {
    Insert,
    Update,
    Delete,
}

impl ChangeOperation {
    /// Parses the lowercase wire name produced by [`Self::as_str`].
    ///
    /// Matching is exact and case-sensitive; any other input yields `None`.
    pub fn from_str(value: &str) -> Option<Self> {
        // Reuse `as_str` as the single source of truth for the wire names.
        [Self::Insert, Self::Update, Self::Delete]
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == value)
    }

    /// Returns the lowercase wire name of this operation.
    pub fn as_str(&self) -> &'static str {
        match *self {
            ChangeOperation::Insert => "insert",
            ChangeOperation::Update => "update",
            ChangeOperation::Delete => "delete",
        }
    }
}
38
39/// A single change event.
40#[derive(Debug, Clone)]
41pub struct ChangeEvent {
42    /// Monotonically increasing sequence number
43    pub lsn: u64,
44    /// When the change occurred (unix ms)
45    pub timestamp: u64,
46    /// Type of operation
47    pub operation: ChangeOperation,
48    /// Collection name
49    pub collection: String,
50    /// Entity ID affected
51    pub entity_id: u64,
52    /// Entity kind (table, graph_node, graph_edge, vector, etc.)
53    pub entity_kind: String,
54    /// For `Update` events, the list of column names whose values
55    /// changed (including added/removed columns). `None` when the
56    /// emitter didn't compute a damage vector (inserts, deletes, and
57    /// coarse-grained update paths that haven't been rewired yet).
58    /// Downstream CDC consumers can use this to skip replaying
59    /// updates that touched columns they don't care about.
60    pub changed_columns: Option<Vec<String>>,
61    /// Optional KV-specific payload for WATCH subscribers. Existing CDC
62    /// consumers can ignore this field and continue using the generic shape.
63    pub kv: Option<KvWatchEvent>,
64}
65
66impl ChangeEvent {
67    pub fn rid(&self) -> u64 {
68        self.entity_id
69    }
70
71    pub fn kind(&self) -> &'static str {
72        public_item_kind(&self.entity_kind)
73    }
74}
75
76/// A committed single-key KV change surfaced by WATCH.
77#[derive(Debug, Clone, PartialEq)]
78pub struct KvWatchEvent {
79    pub collection: String,
80    pub key: String,
81    pub op: ChangeOperation,
82    pub before: Option<JsonValue>,
83    pub after: Option<JsonValue>,
84    pub lsn: u64,
85    pub committed_at: u64,
86    pub dropped_event_count: u64,
87}
88
89impl KvWatchEvent {
90    pub fn to_json_value(&self) -> JsonValue {
91        let mut object = Map::new();
92        object.insert("key".to_string(), JsonValue::String(self.key.clone()));
93        object.insert(
94            "op".to_string(),
95            JsonValue::String(self.op.as_str().to_string()),
96        );
97        object.insert(
98            "before".to_string(),
99            self.before.clone().unwrap_or(JsonValue::Null),
100        );
101        object.insert(
102            "after".to_string(),
103            self.after.clone().unwrap_or(JsonValue::Null),
104        );
105        object.insert("lsn".to_string(), JsonValue::Number(self.lsn as f64));
106        object.insert(
107            "committed_at".to_string(),
108            JsonValue::Number(self.committed_at as f64),
109        );
110        object.insert(
111            "dropped_event_count".to_string(),
112            JsonValue::Number(self.dropped_event_count as f64),
113        );
114        JsonValue::Object(object)
115    }
116}
117
/// Structured logical WAL record serialized into the replication buffer and
/// archived segments.
#[derive(Debug, Clone)]
pub struct ChangeRecord {
    /// Sequence number of the change this record describes.
    pub lsn: u64,
    /// Time of the change as supplied by the caller.
    /// NOTE(review): presumably unix milliseconds like `ChangeEvent::timestamp` — confirm.
    pub timestamp: u64,
    /// Which kind of mutation happened.
    pub operation: ChangeOperation,
    /// Name of the collection the entity belongs to.
    pub collection: String,
    /// Identifier of the affected entity (serialized under the `rid` key).
    pub entity_id: u64,
    /// Storage-level entity kind; mapped through `public_item_kind` when
    /// the record is rendered as JSON.
    pub entity_kind: String,
    /// Serialized entity payload. `from_entity` fills this for inserts and
    /// updates and leaves it `None` for deletes.
    pub entity_bytes: Option<Vec<u8>>,
    /// Optional free-form JSON carried alongside the record.
    pub metadata: Option<JsonValue>,
}
131
132impl ChangeRecord {
133    pub fn from_entity(
134        lsn: u64,
135        timestamp: u64,
136        operation: ChangeOperation,
137        collection: impl Into<String>,
138        entity_kind: impl Into<String>,
139        entity: &crate::storage::UnifiedEntity,
140        format_version: u32,
141        metadata: Option<JsonValue>,
142    ) -> Self {
143        let entity_bytes = match operation {
144            ChangeOperation::Delete => None,
145            ChangeOperation::Insert | ChangeOperation::Update => Some(
146                crate::storage::UnifiedStore::serialize_entity(entity, format_version),
147            ),
148        };
149
150        Self {
151            lsn,
152            timestamp,
153            operation,
154            collection: collection.into(),
155            entity_id: entity.id.raw(),
156            entity_kind: entity_kind.into(),
157            entity_bytes,
158            metadata,
159        }
160    }
161
162    pub fn to_json_value(&self) -> JsonValue {
163        let mut object = Map::new();
164        object.insert("lsn".to_string(), JsonValue::Number(self.lsn as f64));
165        object.insert(
166            "timestamp".to_string(),
167            JsonValue::Number(self.timestamp as f64),
168        );
169        object.insert(
170            "operation".to_string(),
171            JsonValue::String(self.operation.as_str().to_string()),
172        );
173        object.insert(
174            "collection".to_string(),
175            JsonValue::String(self.collection.clone()),
176        );
177        object.insert("rid".to_string(), JsonValue::Number(self.entity_id as f64));
178        object.insert(
179            "kind".to_string(),
180            JsonValue::String(public_item_kind(&self.entity_kind).to_string()),
181        );
182        if let Some(bytes) = &self.entity_bytes {
183            object.insert(
184                "entity_bytes_hex".to_string(),
185                JsonValue::String(hex::encode(bytes)),
186            );
187        }
188        if let Some(metadata) = &self.metadata {
189            object.insert("metadata".to_string(), metadata.clone());
190        }
191        JsonValue::Object(object)
192    }
193
194    pub fn encode(&self) -> Vec<u8> {
195        crate::json::to_string(&self.to_json_value())
196            .unwrap_or_else(|_| "{}".to_string())
197            .into_bytes()
198    }
199
200    pub fn decode(bytes: &[u8]) -> Result<Self, String> {
201        let text = std::str::from_utf8(bytes).map_err(|err| err.to_string())?;
202        let value = crate::json::from_str::<JsonValue>(text).map_err(|err| err.to_string())?;
203        let operation = value
204            .get("operation")
205            .and_then(JsonValue::as_str)
206            .and_then(ChangeOperation::from_str)
207            .ok_or_else(|| "invalid replication operation".to_string())?;
208        let entity_bytes = value
209            .get("entity_bytes_hex")
210            .and_then(JsonValue::as_str)
211            .map(hex::decode)
212            .transpose()
213            .map_err(|err| err.to_string())?;
214
215        Ok(Self {
216            lsn: value.get("lsn").and_then(JsonValue::as_u64).unwrap_or(0),
217            timestamp: value
218                .get("timestamp")
219                .and_then(JsonValue::as_u64)
220                .unwrap_or(0),
221            operation,
222            collection: value
223                .get("collection")
224                .and_then(JsonValue::as_str)
225                .unwrap_or_default()
226                .to_string(),
227            entity_id: value
228                .get("rid")
229                .or_else(|| value.get("entity_id"))
230                .and_then(JsonValue::as_u64)
231                .unwrap_or(0),
232            entity_kind: value
233                .get("kind")
234                .or_else(|| value.get("entity_kind"))
235                .and_then(JsonValue::as_str)
236                .unwrap_or("entity")
237                .to_string(),
238            entity_bytes,
239            metadata: value.get("metadata").cloned(),
240        })
241    }
242}
243
/// Maps a storage-level entity kind to the kind name exposed to consumers.
///
/// Exact names are resolved first (`table`/`entity`/`row` → `row`,
/// `graph_node`/`node` → `node`, `graph_edge`/`edge` → `edge`). Otherwise
/// the first of `kv`, `document`, `vector` that occurs anywhere in the name
/// wins, and everything else is reported as `item`.
pub fn public_item_kind(entity_kind: &str) -> &'static str {
    match entity_kind {
        "table" | "entity" | "row" => return "row",
        "graph_node" | "node" => return "node",
        "graph_edge" | "edge" => return "edge",
        _ => {}
    }

    // Substring families, checked in priority order. An exact "kv",
    // "document" or "vector" is covered too, since a string contains itself.
    ["kv", "document", "vector"]
        .iter()
        .copied()
        .find(|family| entity_kind.contains(*family))
        .unwrap_or("item")
}
258
/// CDC event buffer — circular buffer of change events.
///
/// Splits the "next LSN" counter (write-contended on every emit)
/// from the event ring (short-hold push/pop) so that concurrent
/// emitters don't serialise on a single RwLock. The previous
/// design used one `RwLock<CdcState>` that turned every insert
/// into a write-lock acquire, capping 16-way concurrent writes
/// at ~1000 ops/s (each writer paid ~1ms queueing for the same
/// mutex even though the work it guarded was a one-line VecDeque
/// push).
///
/// New layout:
///   - LSN is an `AtomicU64`, assigned with `fetch_add(1)`.
///     Zero contention.
///   - Events are guarded by a `parking_lot::Mutex<VecDeque>`.
///     The critical section is `pop_front (if full) + push_back`
///     — microseconds at most, parking-free at low contention.
///
/// Readers (`poll`, `current_lsn`, `stats`) take the same mutex
/// briefly; they're cold paths compared to the write hot path.
pub struct CdcBuffer {
    /// LSN counter. Despite the name it holds the most recently reserved
    /// LSN: it starts at 0 and emitters use `fetch_add(..) + 1`, so the
    /// first event gets LSN 1.
    next_lsn: AtomicU64,
    /// Ring of buffered events; the oldest entry sits at the front.
    events: parking_lot::Mutex<VecDeque<ChangeEvent>>,
    /// Maximum number of events retained before the oldest is evicted.
    max_size: usize,
}
284
285impl CdcBuffer {
286    /// Create a new CDC buffer with maximum capacity.
287    pub fn new(max_size: usize) -> Self {
288        Self {
289            next_lsn: AtomicU64::new(0),
290            events: parking_lot::Mutex::new(VecDeque::with_capacity(max_size.min(10_000))),
291            max_size,
292        }
293    }
294
295    /// Emit a change event into the buffer. `changed_columns`
296    /// defaults to `None` for backwards compatibility; call sites
297    /// that have a damage vector available should use
298    /// [`Self::emit_with_columns`] instead.
299    pub fn emit(
300        &self,
301        operation: ChangeOperation,
302        collection: &str,
303        entity_id: u64,
304        entity_kind: &str,
305    ) -> u64 {
306        self.emit_with_columns(operation, collection, entity_id, entity_kind, None)
307    }
308
309    /// Emit a change event with an optional list of column names
310    /// that were affected. Use from update paths that have already
311    /// computed a [`RowDamageVector`](crate::application::entity::RowDamageVector)
312    /// so CDC consumers can filter by touched column without re-diffing.
313    pub fn emit_with_columns(
314        &self,
315        operation: ChangeOperation,
316        collection: &str,
317        entity_id: u64,
318        entity_kind: &str,
319        changed_columns: Option<Vec<String>>,
320    ) -> u64 {
321        // LSN assignment is lock-free — multiple emitters each get a
322        // unique monotonic LSN without waiting on any other emitter.
323        let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
324
325        let event = ChangeEvent {
326            lsn: event_lsn,
327            timestamp: SystemTime::now()
328                .duration_since(UNIX_EPOCH)
329                .unwrap_or_default()
330                .as_millis() as u64,
331            operation,
332            collection: collection.to_string(),
333            entity_id,
334            entity_kind: entity_kind.to_string(),
335            changed_columns,
336            kv: None,
337        };
338
339        // Short-hold ring push. Under heavy contention parking_lot
340        // spins a few times before parking, so typical hold time is
341        // a couple hundred nanoseconds.
342        let mut events = self.events.lock();
343        if events.len() >= self.max_size {
344            events.pop_front();
345        }
346        events.push_back(event);
347
348        event_lsn
349    }
350
351    /// Emit many same-collection events with one LSN reservation and one
352    /// ring-buffer lock. This is used by bulk insert paths that do not need
353    /// per-row logical-WAL records.
354    pub fn emit_batch_same_collection<I>(
355        &self,
356        operation: ChangeOperation,
357        collection: &str,
358        entity_kind: &str,
359        entity_ids: I,
360    ) -> Vec<u64>
361    where
362        I: IntoIterator<Item = u64>,
363        I::IntoIter: ExactSizeIterator,
364    {
365        let iter = entity_ids.into_iter();
366        let count = iter.len();
367        if count == 0 {
368            return Vec::new();
369        }
370
371        let first_lsn = self.next_lsn.fetch_add(count as u64, Ordering::AcqRel) + 1;
372        let lsns = (0..count)
373            .map(|idx| first_lsn + idx as u64)
374            .collect::<Vec<_>>();
375        if self.max_size == 0 {
376            return lsns;
377        }
378
379        let timestamp = SystemTime::now()
380            .duration_since(UNIX_EPOCH)
381            .unwrap_or_default()
382            .as_millis() as u64;
383        let collection = collection.to_string();
384        let entity_kind = entity_kind.to_string();
385
386        let skip = count.saturating_sub(self.max_size);
387        let kept = count - skip;
388        let mut events = self.events.lock();
389        let overflow = events
390            .len()
391            .saturating_add(kept)
392            .saturating_sub(self.max_size);
393        for _ in 0..overflow {
394            events.pop_front();
395        }
396
397        for (idx, entity_id) in iter.enumerate().skip(skip) {
398            events.push_back(ChangeEvent {
399                lsn: first_lsn + idx as u64,
400                timestamp,
401                operation,
402                collection: collection.clone(),
403                entity_id,
404                entity_kind: entity_kind.clone(),
405                changed_columns: None,
406                kv: None,
407            });
408        }
409        lsns
410    }
411
412    /// Emit a committed logical KV event into the same CDC ring used by
413    /// result-cache invalidation and `/changes` consumers.
414    pub fn emit_kv(
415        &self,
416        operation: ChangeOperation,
417        collection: &str,
418        key: &str,
419        entity_id: u64,
420        before: Option<JsonValue>,
421        after: Option<JsonValue>,
422    ) -> u64 {
423        let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
424        let timestamp = SystemTime::now()
425            .duration_since(UNIX_EPOCH)
426            .unwrap_or_default()
427            .as_millis() as u64;
428        let kv = KvWatchEvent {
429            collection: collection.to_string(),
430            key: key.to_string(),
431            op: operation,
432            before,
433            after,
434            lsn: event_lsn,
435            committed_at: timestamp,
436            dropped_event_count: 0,
437        };
438        let event = ChangeEvent {
439            lsn: event_lsn,
440            timestamp,
441            operation,
442            collection: collection.to_string(),
443            entity_id,
444            entity_kind: "kv".to_string(),
445            changed_columns: Some(vec!["value".to_string()]),
446            kv: Some(kv),
447        };
448
449        let mut events = self.events.lock();
450        if events.len() >= self.max_size {
451            events.pop_front();
452        }
453        events.push_back(event);
454        event_lsn
455    }
456
457    /// Poll for events since a given LSN.
458    pub fn poll(&self, since_lsn: u64, max_count: usize) -> Vec<ChangeEvent> {
459        let events = self.events.lock();
460        events
461            .iter()
462            .filter(|e| e.lsn > since_lsn)
463            .take(max_count)
464            .cloned()
465            .collect()
466    }
467
468    /// Get the current (latest) LSN.
469    pub fn current_lsn(&self) -> u64 {
470        self.next_lsn.load(Ordering::Acquire)
471    }
472
473    /// Restore the LSN cursor after process restart. Only advances;
474    /// never rewinds. Under concurrent emit this is guarded by a
475    /// compare-exchange loop.
476    pub fn set_current_lsn(&self, lsn: u64) {
477        let mut current = self.next_lsn.load(Ordering::Acquire);
478        while lsn > current {
479            match self
480                .next_lsn
481                .compare_exchange(current, lsn, Ordering::AcqRel, Ordering::Acquire)
482            {
483                Ok(_) => break,
484                Err(observed) => current = observed,
485            }
486        }
487    }
488
489    /// Get the oldest available LSN (or None if empty).
490    pub fn oldest_lsn(&self) -> Option<u64> {
491        self.events.lock().front().map(|e| e.lsn)
492    }
493
494    /// Get buffer stats (single lock acquisition — no deadlock risk).
495    pub fn stats(&self) -> CdcStats {
496        let events = self.events.lock();
497        CdcStats {
498            buffered_events: events.len(),
499            current_lsn: self.next_lsn.load(Ordering::Acquire),
500            oldest_lsn: events.front().map(|e| e.lsn),
501            newest_lsn: events.back().map(|e| e.lsn),
502        }
503    }
504}
505
/// CDC buffer statistics.
#[derive(Debug, Clone)]
pub struct CdcStats {
    /// Number of events currently held in the ring.
    pub buffered_events: usize,
    /// Latest LSN handed out by the buffer (0 before the first emit).
    pub current_lsn: u64,
    /// LSN of the event at the front of the ring; `None` when empty.
    pub oldest_lsn: Option<u64>,
    /// LSN of the event at the back of the ring; `None` when empty.
    pub newest_lsn: Option<u64>,
}
514
#[cfg(test)]
mod tests {
    use super::*;

    /// Events come back in emit order, and plain `emit` leaves
    /// `changed_columns` unset.
    #[test]
    fn test_emit_and_poll() {
        let buffer = CdcBuffer::new(100);
        let emitted = [
            ChangeOperation::Insert,
            ChangeOperation::Update,
            ChangeOperation::Delete,
        ];
        for operation in emitted {
            buffer.emit(operation, "users", 1, "table");
        }

        let polled = buffer.poll(0, 10);
        let operations: Vec<ChangeOperation> =
            polled.iter().map(|event| event.operation).collect();
        assert_eq!(operations, emitted.to_vec());
        // Backwards-compat emit should leave changed_columns None.
        assert!(polled[0].changed_columns.is_none());
        assert!(polled[1].changed_columns.is_none());
    }

    /// A damage vector passed to `emit_with_columns` is stored verbatim.
    #[test]
    fn test_emit_with_columns_propagates_damage_vector() {
        let buffer = CdcBuffer::new(100);
        let touched = vec!["email".to_string(), "age".to_string()];
        buffer.emit_with_columns(
            ChangeOperation::Update,
            "users",
            7,
            "table",
            Some(touched.clone()),
        );
        buffer.emit(ChangeOperation::Update, "users", 8, "table");

        let polled = buffer.poll(0, 10);
        assert_eq!(polled.len(), 2);
        assert_eq!(polled[0].changed_columns, Some(touched));
        assert!(polled[1].changed_columns.is_none());
    }

    /// Polling from a cursor returns only events with a larger LSN.
    #[test]
    fn test_poll_with_cursor() {
        let buffer = CdcBuffer::new(100);
        for (entity_id, collection) in [(1, "a"), (2, "b"), (3, "c")] {
            buffer.emit(ChangeOperation::Insert, collection, entity_id, "table");
        }

        // Poll from lsn=1, should get events 2 and 3
        let polled = buffer.poll(1, 10);
        let collections: Vec<&str> = polled
            .iter()
            .map(|event| event.collection.as_str())
            .collect();
        assert_eq!(collections, vec!["b", "c"]);
    }

    /// A batch continues the LSN sequence right after earlier single emits.
    #[test]
    fn test_emit_batch_same_collection_assigns_contiguous_lsns() {
        let buffer = CdcBuffer::new(100);
        buffer.emit(ChangeOperation::Insert, "users", 10, "table");
        buffer.emit_batch_same_collection(ChangeOperation::Insert, "users", "table", [11, 12, 13]);

        let polled = buffer.poll(0, 10);
        assert_eq!(polled.len(), 4);
        let batch_lsns: Vec<u64> = polled[1..].iter().map(|event| event.lsn).collect();
        assert_eq!(batch_lsns, vec![2, 3, 4]);
        assert_eq!(polled[3].entity_id, 13);
        assert_eq!(buffer.current_lsn(), 4);
    }

    /// A batch larger than the ring keeps only its tail but still consumes
    /// one LSN per id.
    #[test]
    fn test_emit_batch_same_collection_respects_ring_size() {
        let buffer = CdcBuffer::new(3);
        buffer.emit_batch_same_collection(
            ChangeOperation::Insert,
            "users",
            "table",
            [1, 2, 3, 4, 5],
        );

        let polled = buffer.poll(0, 10);
        assert_eq!(polled.len(), 3);
        let entity_ids: Vec<u64> = polled.iter().map(|event| event.entity_id).collect();
        assert_eq!(entity_ids, vec![3, 4, 5]);
        assert_eq!(polled[0].lsn, 3);
        assert_eq!(polled[2].lsn, 5);
        assert_eq!(buffer.current_lsn(), 5);
    }

    /// Emitting into a full ring evicts the oldest event.
    #[test]
    fn test_circular_eviction() {
        let buffer = CdcBuffer::new(3);
        // The fourth emit ("d") evicts "a".
        for (entity_id, collection) in [(1, "a"), (2, "b"), (3, "c"), (4, "d")] {
            buffer.emit(ChangeOperation::Insert, collection, entity_id, "table");
        }

        let polled = buffer.poll(0, 10);
        assert_eq!(polled.len(), 3);
        assert_eq!(polled[0].collection, "b"); // "a" was evicted
    }

    /// `stats` reports the event count and the LSN bounds.
    #[test]
    fn test_stats() {
        let buffer = CdcBuffer::new(100);
        buffer.emit(ChangeOperation::Insert, "x", 1, "table");
        buffer.emit(ChangeOperation::Insert, "y", 2, "table");

        let CdcStats {
            buffered_events,
            current_lsn,
            oldest_lsn,
            newest_lsn,
        } = buffer.stats();
        assert_eq!(buffered_events, 2);
        assert_eq!(current_lsn, 2);
        assert_eq!(oldest_lsn, Some(1));
        assert_eq!(newest_lsn, Some(2));
    }

    /// Encoding then decoding a record preserves its identifying fields and
    /// payload.
    #[test]
    fn test_change_record_roundtrip() {
        let original = ChangeRecord {
            lsn: 7,
            timestamp: 1234,
            operation: ChangeOperation::Update,
            collection: "users".to_string(),
            entity_id: 42,
            entity_kind: "row".to_string(),
            entity_bytes: Some(vec![1, 2, 3]),
            metadata: Some(crate::json!({"role": "admin"})),
        };

        let wire = original.encode();
        let restored = ChangeRecord::decode(&wire).expect("decode");
        assert_eq!(restored.lsn, original.lsn);
        assert_eq!(restored.collection, original.collection);
        assert_eq!(restored.entity_id, original.entity_id);
        assert_eq!(restored.entity_bytes, original.entity_bytes);
    }
}