// reddb_server/replication/cdc.rs

//! Change Data Capture (CDC) — stream of database change events.
//!
//! Exposes entity mutations (insert, update, delete, refresh) as a pollable event stream.
//! Consumers poll with a cursor (LSN) to receive new events since their last position.
5
6use std::collections::VecDeque;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use crate::json::{Map, Value as JsonValue};
11
/// Kind of mutation carried by a change event or replication record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeOperation {
    /// Serialized as `"insert"`.
    Insert,
    /// Serialized as `"update"`.
    Update,
    /// Serialized as `"delete"`.
    Delete,
    /// Issue #596 slice 9d — atomic full-collection replace.
    /// The new contents travel as a list of serialized entity records in
    /// [`ChangeRecord::refresh_records`]; the replica applier calls
    /// `UnifiedStore::refresh_collection` so the swap is applied
    /// atomically (no partial state visible to readers).
    /// Serialized as `"refresh"`.
    Refresh,
}

impl ChangeOperation {
    /// Every variant in declaration order; drives the reverse lookup in
    /// [`Self::from_str`].
    const ALL: [Self; 4] = [Self::Insert, Self::Update, Self::Delete, Self::Refresh];

    /// Parse the lowercase wire name produced by [`Self::as_str`].
    ///
    /// The comparison is exact (case-sensitive, no trimming); any other
    /// input yields `None`.
    pub fn from_str(value: &str) -> Option<Self> {
        Self::ALL
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == value)
    }

    /// Lowercase wire name of this operation.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Refresh => "refresh",
            Self::Delete => "delete",
            Self::Update => "update",
            Self::Insert => "insert",
        }
    }
}
46
47/// A single change event.
48#[derive(Debug, Clone)]
49pub struct ChangeEvent {
50    /// Monotonically increasing sequence number
51    pub lsn: u64,
52    /// When the change occurred (unix ms)
53    pub timestamp: u64,
54    /// Type of operation
55    pub operation: ChangeOperation,
56    /// Collection name
57    pub collection: String,
58    /// Entity ID affected
59    pub entity_id: u64,
60    /// Entity kind (table, graph_node, graph_edge, vector, etc.)
61    pub entity_kind: String,
62    /// For `Update` events, the list of column names whose values
63    /// changed (including added/removed columns). `None` when the
64    /// emitter didn't compute a damage vector (inserts, deletes, and
65    /// coarse-grained update paths that haven't been rewired yet).
66    /// Downstream CDC consumers can use this to skip replaying
67    /// updates that touched columns they don't care about.
68    pub changed_columns: Option<Vec<String>>,
69    /// Optional KV-specific payload for WATCH subscribers. Existing CDC
70    /// consumers can ignore this field and continue using the generic shape.
71    pub kv: Option<KvWatchEvent>,
72}
73
74impl ChangeEvent {
75    pub fn rid(&self) -> u64 {
76        self.entity_id
77    }
78
79    pub fn kind(&self) -> &'static str {
80        public_item_kind(&self.entity_kind)
81    }
82}
83
84/// A committed single-key KV change surfaced by WATCH.
85#[derive(Debug, Clone, PartialEq)]
86pub struct KvWatchEvent {
87    pub collection: String,
88    pub key: String,
89    pub op: ChangeOperation,
90    pub before: Option<JsonValue>,
91    pub after: Option<JsonValue>,
92    pub lsn: u64,
93    pub committed_at: u64,
94    pub dropped_event_count: u64,
95}
96
97impl KvWatchEvent {
98    pub fn to_json_value(&self) -> JsonValue {
99        let mut object = Map::new();
100        object.insert("key".to_string(), JsonValue::String(self.key.clone()));
101        object.insert(
102            "op".to_string(),
103            JsonValue::String(self.op.as_str().to_string()),
104        );
105        object.insert(
106            "before".to_string(),
107            self.before.clone().unwrap_or(JsonValue::Null),
108        );
109        object.insert(
110            "after".to_string(),
111            self.after.clone().unwrap_or(JsonValue::Null),
112        );
113        object.insert("lsn".to_string(), JsonValue::Number(self.lsn as f64));
114        object.insert(
115            "committed_at".to_string(),
116            JsonValue::Number(self.committed_at as f64),
117        );
118        object.insert(
119            "dropped_event_count".to_string(),
120            JsonValue::Number(self.dropped_event_count as f64),
121        );
122        JsonValue::Object(object)
123    }
124}
125
126/// Structured logical WAL record serialized into the replication buffer and
127/// archived segments.
128#[derive(Debug, Clone)]
129pub struct ChangeRecord {
130    pub term: u64,
131    pub lsn: u64,
132    pub timestamp: u64,
133    pub operation: ChangeOperation,
134    pub collection: String,
135    pub entity_id: u64,
136    pub entity_kind: String,
137    pub entity_bytes: Option<Vec<u8>>,
138    pub metadata: Option<JsonValue>,
139    /// Issue #596 slice 9d — payload for [`ChangeOperation::Refresh`].
140    /// Each element is a serialized entity record (the same
141    /// `serialize_entity_record` payload the store's `RefreshCollection`
142    /// WAL action uses), in the order the replica should `bulk_insert`
143    /// after dropping the prior backing-collection contents.
144    pub refresh_records: Option<Vec<Vec<u8>>>,
145}
146
147impl ChangeRecord {
148    pub fn from_entity(
149        lsn: u64,
150        timestamp: u64,
151        operation: ChangeOperation,
152        collection: impl Into<String>,
153        entity_kind: impl Into<String>,
154        entity: &crate::storage::UnifiedEntity,
155        format_version: u32,
156        metadata: Option<JsonValue>,
157    ) -> Self {
158        let entity_bytes = match operation {
159            ChangeOperation::Delete | ChangeOperation::Refresh => None,
160            ChangeOperation::Insert | ChangeOperation::Update => Some(
161                crate::storage::UnifiedStore::serialize_entity(entity, format_version),
162            ),
163        };
164
165        Self {
166            term: crate::replication::DEFAULT_REPLICATION_TERM,
167            lsn,
168            timestamp,
169            operation,
170            collection: collection.into(),
171            entity_id: entity.id.raw(),
172            entity_kind: entity_kind.into(),
173            entity_bytes,
174            metadata,
175            refresh_records: None,
176        }
177    }
178
179    /// Build a refresh change record carrying the new contents for a
180    /// `REFRESH MATERIALIZED VIEW` replay on the replica. Issue #596
181    /// slice 9d.
182    pub fn for_refresh(
183        lsn: u64,
184        timestamp: u64,
185        collection: impl Into<String>,
186        records: Vec<Vec<u8>>,
187    ) -> Self {
188        Self {
189            term: crate::replication::DEFAULT_REPLICATION_TERM,
190            lsn,
191            timestamp,
192            operation: ChangeOperation::Refresh,
193            collection: collection.into(),
194            entity_id: 0,
195            entity_kind: "refresh".to_string(),
196            entity_bytes: None,
197            metadata: None,
198            refresh_records: Some(records),
199        }
200    }
201
202    pub fn to_json_value(&self) -> JsonValue {
203        let mut object = Map::new();
204        object.insert("term".to_string(), JsonValue::Number(self.term as f64));
205        object.insert("lsn".to_string(), JsonValue::Number(self.lsn as f64));
206        object.insert(
207            "timestamp".to_string(),
208            JsonValue::Number(self.timestamp as f64),
209        );
210        object.insert(
211            "operation".to_string(),
212            JsonValue::String(self.operation.as_str().to_string()),
213        );
214        object.insert(
215            "collection".to_string(),
216            JsonValue::String(self.collection.clone()),
217        );
218        object.insert("rid".to_string(), JsonValue::Number(self.entity_id as f64));
219        object.insert(
220            "kind".to_string(),
221            JsonValue::String(public_item_kind(&self.entity_kind).to_string()),
222        );
223        if let Some(bytes) = &self.entity_bytes {
224            object.insert(
225                "entity_bytes_hex".to_string(),
226                JsonValue::String(hex::encode(bytes)),
227            );
228        }
229        if let Some(metadata) = &self.metadata {
230            object.insert("metadata".to_string(), metadata.clone());
231        }
232        if let Some(records) = &self.refresh_records {
233            let arr = records
234                .iter()
235                .map(|bytes| JsonValue::String(hex::encode(bytes)))
236                .collect();
237            object.insert("refresh_records_hex".to_string(), JsonValue::Array(arr));
238        }
239        JsonValue::Object(object)
240    }
241
242    pub fn encode(&self) -> Vec<u8> {
243        crate::json::to_string(&self.to_json_value())
244            .unwrap_or_else(|_| "{}".to_string())
245            .into_bytes()
246    }
247
248    pub fn with_term(mut self, term: u64) -> Self {
249        self.term = term;
250        self
251    }
252
253    pub fn decode(bytes: &[u8]) -> Result<Self, String> {
254        let text = std::str::from_utf8(bytes).map_err(|err| err.to_string())?;
255        let value = crate::json::from_str::<JsonValue>(text).map_err(|err| err.to_string())?;
256        let operation = value
257            .get("operation")
258            .and_then(JsonValue::as_str)
259            .and_then(ChangeOperation::from_str)
260            .ok_or_else(|| "invalid replication operation".to_string())?;
261        let entity_bytes = value
262            .get("entity_bytes_hex")
263            .and_then(JsonValue::as_str)
264            .map(hex::decode)
265            .transpose()
266            .map_err(|err| err.to_string())?;
267
268        Ok(Self {
269            term: value
270                .get("term")
271                .and_then(JsonValue::as_u64)
272                .unwrap_or(crate::replication::DEFAULT_REPLICATION_TERM),
273            lsn: value.get("lsn").and_then(JsonValue::as_u64).unwrap_or(0),
274            timestamp: value
275                .get("timestamp")
276                .and_then(JsonValue::as_u64)
277                .unwrap_or(0),
278            operation,
279            collection: value
280                .get("collection")
281                .and_then(JsonValue::as_str)
282                .unwrap_or_default()
283                .to_string(),
284            entity_id: value
285                .get("rid")
286                .or_else(|| value.get("entity_id"))
287                .and_then(JsonValue::as_u64)
288                .unwrap_or(0),
289            entity_kind: value
290                .get("kind")
291                .or_else(|| value.get("entity_kind"))
292                .and_then(JsonValue::as_str)
293                .unwrap_or("entity")
294                .to_string(),
295            entity_bytes,
296            metadata: value.get("metadata").cloned(),
297            refresh_records: match value.get("refresh_records_hex") {
298                Some(JsonValue::Array(items)) => {
299                    let mut out = Vec::with_capacity(items.len());
300                    for item in items {
301                        let hex_str = item
302                            .as_str()
303                            .ok_or_else(|| "refresh_records_hex entry not a string".to_string())?;
304                        let bytes = hex::decode(hex_str).map_err(|err| err.to_string())?;
305                        out.push(bytes);
306                    }
307                    Some(out)
308                }
309                None | Some(JsonValue::Null) => None,
310                _ => return Err("refresh_records_hex is not an array".to_string()),
311            },
312        })
313    }
314}
315
/// Map an internal entity kind onto the public item-kind vocabulary.
///
/// Exact aliases are resolved first (`table`/`entity`/`row` → `row`,
/// `graph_node`/`node` → `node`, `graph_edge`/`edge` → `edge`). Anything
/// else is classified by substring, checked in the order `kv`, `document`,
/// `vector`; a kind matching none of those is reported as `item`.
pub fn public_item_kind(entity_kind: &str) -> &'static str {
    // Order matters: a kind containing several family names resolves to
    // the first family listed here.
    const FAMILIES: [&str; 3] = ["kv", "document", "vector"];

    match entity_kind {
        "row" | "table" | "entity" => "row",
        "node" | "graph_node" => "node",
        "edge" | "graph_edge" => "edge",
        other => FAMILIES
            .iter()
            .copied()
            .find(|family| other.contains(*family))
            .unwrap_or("item"),
    }
}
330
331/// CDC event buffer — circular buffer of change events.
332///
333/// Splits the "next LSN" counter (write-contended on every emit)
334/// from the event ring (short-hold push/pop) so that concurrent
335/// emitters don't serialise on a single RwLock. The previous
336/// design used one `RwLock<CdcState>` that turned every insert
337/// into a write-lock acquire, capping 16-way concurrent writes
338/// at ~1000 ops/s (each writer paid ~1ms queueing for the same
339/// mutex even though the work it guarded was a one-line VecDeque
340/// push).
341///
342/// New layout:
343///   - LSN is an `AtomicU64`, assigned with `fetch_add(1)`.
344///     Zero contention.
345///   - Events are guarded by a `parking_lot::Mutex<VecDeque>`.
346///     The critical section is `pop_front (if full) + push_back`
347///     — microseconds at most, parking-free at low contention.
348///
349/// Readers (`poll`, `current_lsn`, `stats`) take the same mutex
350/// briefly; they're cold paths compared to the write hot path.
351pub struct CdcBuffer {
352    next_lsn: AtomicU64,
353    events: parking_lot::Mutex<VecDeque<ChangeEvent>>,
354    max_size: usize,
355}
356
357impl CdcBuffer {
358    /// Create a new CDC buffer with maximum capacity.
359    pub fn new(max_size: usize) -> Self {
360        Self {
361            next_lsn: AtomicU64::new(0),
362            events: parking_lot::Mutex::new(VecDeque::with_capacity(max_size.min(10_000))),
363            max_size,
364        }
365    }
366
367    /// Emit a change event into the buffer. `changed_columns`
368    /// defaults to `None` for backwards compatibility; call sites
369    /// that have a damage vector available should use
370    /// [`Self::emit_with_columns`] instead.
371    pub fn emit(
372        &self,
373        operation: ChangeOperation,
374        collection: &str,
375        entity_id: u64,
376        entity_kind: &str,
377    ) -> u64 {
378        self.emit_with_columns(operation, collection, entity_id, entity_kind, None)
379    }
380
381    /// Emit a change event with an optional list of column names
382    /// that were affected. Use from update paths that have already
383    /// computed a [`RowDamageVector`](crate::application::entity::RowDamageVector)
384    /// so CDC consumers can filter by touched column without re-diffing.
385    pub fn emit_with_columns(
386        &self,
387        operation: ChangeOperation,
388        collection: &str,
389        entity_id: u64,
390        entity_kind: &str,
391        changed_columns: Option<Vec<String>>,
392    ) -> u64 {
393        // LSN assignment is lock-free — multiple emitters each get a
394        // unique monotonic LSN without waiting on any other emitter.
395        let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
396
397        let event = ChangeEvent {
398            lsn: event_lsn,
399            timestamp: SystemTime::now()
400                .duration_since(UNIX_EPOCH)
401                .unwrap_or_default()
402                .as_millis() as u64,
403            operation,
404            collection: collection.to_string(),
405            entity_id,
406            entity_kind: entity_kind.to_string(),
407            changed_columns,
408            kv: None,
409        };
410
411        // Short-hold ring push. Under heavy contention parking_lot
412        // spins a few times before parking, so typical hold time is
413        // a couple hundred nanoseconds.
414        let mut events = self.events.lock();
415        if events.len() >= self.max_size {
416            events.pop_front();
417        }
418        events.push_back(event);
419
420        event_lsn
421    }
422
423    /// Emit many same-collection events with one LSN reservation and one
424    /// ring-buffer lock. This is used by bulk insert paths that do not need
425    /// per-row logical-WAL records.
426    pub fn emit_batch_same_collection<I>(
427        &self,
428        operation: ChangeOperation,
429        collection: &str,
430        entity_kind: &str,
431        entity_ids: I,
432    ) -> Vec<u64>
433    where
434        I: IntoIterator<Item = u64>,
435        I::IntoIter: ExactSizeIterator,
436    {
437        let iter = entity_ids.into_iter();
438        let count = iter.len();
439        if count == 0 {
440            return Vec::new();
441        }
442
443        let first_lsn = self.next_lsn.fetch_add(count as u64, Ordering::AcqRel) + 1;
444        let lsns = (0..count)
445            .map(|idx| first_lsn + idx as u64)
446            .collect::<Vec<_>>();
447        if self.max_size == 0 {
448            return lsns;
449        }
450
451        let timestamp = SystemTime::now()
452            .duration_since(UNIX_EPOCH)
453            .unwrap_or_default()
454            .as_millis() as u64;
455        let collection = collection.to_string();
456        let entity_kind = entity_kind.to_string();
457
458        let skip = count.saturating_sub(self.max_size);
459        let kept = count - skip;
460        let mut events = self.events.lock();
461        let overflow = events
462            .len()
463            .saturating_add(kept)
464            .saturating_sub(self.max_size);
465        for _ in 0..overflow {
466            events.pop_front();
467        }
468
469        for (idx, entity_id) in iter.enumerate().skip(skip) {
470            events.push_back(ChangeEvent {
471                lsn: first_lsn + idx as u64,
472                timestamp,
473                operation,
474                collection: collection.clone(),
475                entity_id,
476                entity_kind: entity_kind.clone(),
477                changed_columns: None,
478                kv: None,
479            });
480        }
481        lsns
482    }
483
484    /// Emit a committed logical KV event into the same CDC ring used by
485    /// result-cache invalidation and `/changes` consumers.
486    pub fn emit_kv(
487        &self,
488        operation: ChangeOperation,
489        collection: &str,
490        key: &str,
491        entity_id: u64,
492        before: Option<JsonValue>,
493        after: Option<JsonValue>,
494    ) -> u64 {
495        let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
496        let timestamp = SystemTime::now()
497            .duration_since(UNIX_EPOCH)
498            .unwrap_or_default()
499            .as_millis() as u64;
500        let kv = KvWatchEvent {
501            collection: collection.to_string(),
502            key: key.to_string(),
503            op: operation,
504            before,
505            after,
506            lsn: event_lsn,
507            committed_at: timestamp,
508            dropped_event_count: 0,
509        };
510        let event = ChangeEvent {
511            lsn: event_lsn,
512            timestamp,
513            operation,
514            collection: collection.to_string(),
515            entity_id,
516            entity_kind: "kv".to_string(),
517            changed_columns: Some(vec!["value".to_string()]),
518            kv: Some(kv),
519        };
520
521        let mut events = self.events.lock();
522        if events.len() >= self.max_size {
523            events.pop_front();
524        }
525        events.push_back(event);
526        event_lsn
527    }
528
529    /// Poll for events since a given LSN.
530    pub fn poll(&self, since_lsn: u64, max_count: usize) -> Vec<ChangeEvent> {
531        let events = self.events.lock();
532        events
533            .iter()
534            .filter(|e| e.lsn > since_lsn)
535            .take(max_count)
536            .cloned()
537            .collect()
538    }
539
540    /// Get the current (latest) LSN.
541    pub fn current_lsn(&self) -> u64 {
542        self.next_lsn.load(Ordering::Acquire)
543    }
544
545    /// Restore the LSN cursor after process restart. Only advances;
546    /// never rewinds. Under concurrent emit this is guarded by a
547    /// compare-exchange loop.
548    pub fn set_current_lsn(&self, lsn: u64) {
549        let mut current = self.next_lsn.load(Ordering::Acquire);
550        while lsn > current {
551            match self
552                .next_lsn
553                .compare_exchange(current, lsn, Ordering::AcqRel, Ordering::Acquire)
554            {
555                Ok(_) => break,
556                Err(observed) => current = observed,
557            }
558        }
559    }
560
561    /// Get the oldest available LSN (or None if empty).
562    pub fn oldest_lsn(&self) -> Option<u64> {
563        self.events.lock().front().map(|e| e.lsn)
564    }
565
566    /// Get buffer stats (single lock acquisition — no deadlock risk).
567    pub fn stats(&self) -> CdcStats {
568        let events = self.events.lock();
569        CdcStats {
570            buffered_events: events.len(),
571            current_lsn: self.next_lsn.load(Ordering::Acquire),
572            oldest_lsn: events.front().map(|e| e.lsn),
573            newest_lsn: events.back().map(|e| e.lsn),
574        }
575    }
576}
577
/// Point-in-time snapshot of a CDC buffer, as returned by `CdcBuffer::stats`.
#[derive(Debug, Clone)]
pub struct CdcStats {
    /// Number of events held in the ring when the snapshot was taken.
    pub buffered_events: usize,
    /// Value of the LSN counter when the snapshot was taken.
    pub current_lsn: u64,
    /// LSN of the event at the front of the ring, `None` when it is empty.
    pub oldest_lsn: Option<u64>,
    /// LSN of the event at the back of the ring, `None` when it is empty.
    pub newest_lsn: Option<u64>,
}
586
#[cfg(test)]
mod tests {
    use super::*;

    /// Emit one `Insert` per collection name, using entity ids 1, 2, 3, …
    /// and entity kind `"table"`.
    fn emit_inserts(buf: &CdcBuffer, collections: &[&str]) {
        for (idx, name) in collections.iter().enumerate() {
            buf.emit(ChangeOperation::Insert, name, idx as u64 + 1, "table");
        }
    }

    #[test]
    fn test_emit_and_poll() {
        let buf = CdcBuffer::new(100);
        let ops = [
            ChangeOperation::Insert,
            ChangeOperation::Update,
            ChangeOperation::Delete,
        ];
        for op in ops.iter().copied() {
            buf.emit(op, "users", 1, "table");
        }

        let polled = buf.poll(0, 10);
        let seen: Vec<ChangeOperation> = polled.iter().map(|event| event.operation).collect();
        assert_eq!(seen, ops);
        // The plain `emit` path never attaches a damage vector.
        for event in polled.iter().take(2) {
            assert!(event.changed_columns.is_none());
        }
    }

    #[test]
    fn test_emit_with_columns_propagates_damage_vector() {
        let buf = CdcBuffer::new(100);
        let touched = vec!["email".to_string(), "age".to_string()];
        buf.emit_with_columns(
            ChangeOperation::Update,
            "users",
            7,
            "table",
            Some(touched.clone()),
        );
        buf.emit(ChangeOperation::Update, "users", 8, "table");

        let polled = buf.poll(0, 10);
        assert_eq!(polled.len(), 2);
        assert_eq!(polled[0].changed_columns, Some(touched));
        assert!(polled[1].changed_columns.is_none());
    }

    #[test]
    fn test_poll_with_cursor() {
        let buf = CdcBuffer::new(100);
        emit_inserts(&buf, &["a", "b", "c"]);

        // A cursor at lsn=1 must yield only events 2 and 3.
        let collections: Vec<String> = buf
            .poll(1, 10)
            .into_iter()
            .map(|event| event.collection)
            .collect();
        assert_eq!(collections, vec!["b", "c"]);
    }

    #[test]
    fn test_emit_batch_same_collection_assigns_contiguous_lsns() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "users", 10, "table");
        buf.emit_batch_same_collection(ChangeOperation::Insert, "users", "table", [11, 12, 13]);

        let polled = buf.poll(0, 10);
        assert_eq!(polled.len(), 4);
        let batch_lsns: Vec<u64> = polled[1..].iter().map(|event| event.lsn).collect();
        assert_eq!(batch_lsns, vec![2, 3, 4]);
        assert_eq!(polled[3].entity_id, 13);
        assert_eq!(buf.current_lsn(), 4);
    }

    #[test]
    fn test_emit_batch_same_collection_respects_ring_size() {
        let buf = CdcBuffer::new(3);
        buf.emit_batch_same_collection(ChangeOperation::Insert, "users", "table", [1, 2, 3, 4, 5]);

        let polled = buf.poll(0, 10);
        let ids: Vec<u64> = polled.iter().map(|event| event.entity_id).collect();
        assert_eq!(ids, vec![3, 4, 5]);
        assert_eq!(polled.first().map(|event| event.lsn), Some(3));
        assert_eq!(polled.last().map(|event| event.lsn), Some(5));
        assert_eq!(buf.current_lsn(), 5);
    }

    #[test]
    fn test_circular_eviction() {
        let buf = CdcBuffer::new(3);
        // The fourth emit overflows the ring and evicts "a".
        emit_inserts(&buf, &["a", "b", "c", "d"]);

        let polled = buf.poll(0, 10);
        assert_eq!(polled.len(), 3);
        assert_eq!(polled[0].collection, "b");
    }

    #[test]
    fn test_stats() {
        let buf = CdcBuffer::new(100);
        emit_inserts(&buf, &["x", "y"]);

        let CdcStats {
            buffered_events,
            current_lsn,
            oldest_lsn,
            newest_lsn,
        } = buf.stats();
        assert_eq!(buffered_events, 2);
        assert_eq!(current_lsn, 2);
        assert_eq!(oldest_lsn, Some(1));
        assert_eq!(newest_lsn, Some(2));
    }

    #[test]
    fn test_change_record_roundtrip() {
        let original = ChangeRecord {
            term: 3,
            lsn: 7,
            timestamp: 1234,
            operation: ChangeOperation::Update,
            collection: "users".to_string(),
            entity_id: 42,
            entity_kind: "row".to_string(),
            entity_bytes: Some(vec![1, 2, 3]),
            metadata: Some(crate::json!({"role": "admin"})),
            refresh_records: None,
        };

        let wire = original.encode();
        let decoded = ChangeRecord::decode(&wire).expect("decode");
        assert_eq!(decoded.term, original.term);
        assert_eq!(decoded.lsn, original.lsn);
        assert_eq!(decoded.collection, original.collection);
        assert_eq!(decoded.entity_id, original.entity_id);
        assert_eq!(decoded.entity_bytes, original.entity_bytes);
    }

    /// Issue #596 slice 9d — refresh records survive a JSON round-trip
    /// through the logical-WAL wire format the primary writes and the
    /// replica reads. Bit-for-bit equality on every payload byte is the
    /// contract — the replica calls `bulk_insert` with whatever bytes
    /// land in `refresh_records`, so a decode that silently drops or
    /// re-orders them would silently diverge replica state.
    #[test]
    fn test_change_record_refresh_roundtrip() {
        let payloads = vec![vec![0x10, 0x20, 0x30], vec![0xAA, 0xBB], Vec::new()];
        let wire = ChangeRecord::for_refresh(11, 99, "mv_orders_summary", payloads.clone())
            .with_term(4)
            .encode();

        let decoded = ChangeRecord::decode(&wire).expect("decode");
        assert_eq!(decoded.term, 4);
        assert_eq!(decoded.operation, ChangeOperation::Refresh);
        assert_eq!(decoded.collection, "mv_orders_summary");
        assert_eq!(decoded.refresh_records, Some(payloads));
    }

    #[test]
    fn test_change_record_legacy_payload_defaults_term() {
        // Payloads written before `term` existed must still decode.
        let legacy = br#"{"lsn":9,"timestamp":1,"operation":"delete","collection":"users","rid":5,"kind":"row"}"#;
        let decoded = ChangeRecord::decode(legacy).expect("decode legacy record");
        assert_eq!(decoded.term, crate::replication::DEFAULT_REPLICATION_TERM);
        assert_eq!(decoded.lsn, 9);
    }
}