Skip to main content

reddb_server/replication/
cdc.rs

1//! Change Data Capture (CDC) — stream of database change events.
2//!
3//! Exposes entity mutations (insert, update, delete) as a pollable event stream.
4//! Consumers poll with a cursor (LSN) to receive new events since their last position.
5
6use std::collections::VecDeque;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use crate::json::{Map, Value as JsonValue};
11
/// Kind of mutation carried by a change event or replication record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeOperation {
    Insert,
    Update,
    Delete,
    /// Issue #596 slice 9d — atomic full-collection replace.
    /// The replacement contents travel as a list of serialized entity
    /// records in [`ChangeRecord::refresh_records`]; the replica applier
    /// calls `UnifiedStore::refresh_collection` so the swap is applied
    /// atomically (readers never observe partial state).
    Refresh,
}

impl ChangeOperation {
    /// Parse the lowercase wire name produced by [`Self::as_str`].
    ///
    /// Matching is exact and case-sensitive; any other input yields `None`.
    pub fn from_str(value: &str) -> Option<Self> {
        // Derive the lookup from `as_str` so the two directions cannot drift.
        [Self::Insert, Self::Update, Self::Delete, Self::Refresh]
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == value)
    }

    /// Lowercase wire name of this operation.
    pub fn as_str(&self) -> &'static str {
        match *self {
            ChangeOperation::Insert => "insert",
            ChangeOperation::Update => "update",
            ChangeOperation::Delete => "delete",
            ChangeOperation::Refresh => "refresh",
        }
    }
}
46
47/// A single change event.
48#[derive(Debug, Clone)]
49pub struct ChangeEvent {
50    /// Monotonically increasing sequence number
51    pub lsn: u64,
52    /// When the change occurred (unix ms)
53    pub timestamp: u64,
54    /// Type of operation
55    pub operation: ChangeOperation,
56    /// Collection name
57    pub collection: String,
58    /// Entity ID affected
59    pub entity_id: u64,
60    /// Entity kind (table, graph_node, graph_edge, vector, etc.)
61    pub entity_kind: String,
62    /// For `Update` events, the list of column names whose values
63    /// changed (including added/removed columns). `None` when the
64    /// emitter didn't compute a damage vector (inserts, deletes, and
65    /// coarse-grained update paths that haven't been rewired yet).
66    /// Downstream CDC consumers can use this to skip replaying
67    /// updates that touched columns they don't care about.
68    pub changed_columns: Option<Vec<String>>,
69    /// Optional KV-specific payload for WATCH subscribers. Existing CDC
70    /// consumers can ignore this field and continue using the generic shape.
71    pub kv: Option<KvWatchEvent>,
72}
73
74impl ChangeEvent {
75    pub fn rid(&self) -> u64 {
76        self.entity_id
77    }
78
79    pub fn kind(&self) -> &'static str {
80        public_item_kind(&self.entity_kind)
81    }
82}
83
84/// A committed single-key KV change surfaced by WATCH.
85#[derive(Debug, Clone, PartialEq)]
86pub struct KvWatchEvent {
87    pub collection: String,
88    pub key: String,
89    pub op: ChangeOperation,
90    pub before: Option<JsonValue>,
91    pub after: Option<JsonValue>,
92    pub lsn: u64,
93    pub committed_at: u64,
94    pub dropped_event_count: u64,
95}
96
97impl KvWatchEvent {
98    pub fn to_json_value(&self) -> JsonValue {
99        let mut object = Map::new();
100        object.insert("key".to_string(), JsonValue::String(self.key.clone()));
101        object.insert(
102            "op".to_string(),
103            JsonValue::String(self.op.as_str().to_string()),
104        );
105        object.insert(
106            "before".to_string(),
107            self.before.clone().unwrap_or(JsonValue::Null),
108        );
109        object.insert(
110            "after".to_string(),
111            self.after.clone().unwrap_or(JsonValue::Null),
112        );
113        object.insert("lsn".to_string(), JsonValue::Number(self.lsn as f64));
114        object.insert(
115            "committed_at".to_string(),
116            JsonValue::Number(self.committed_at as f64),
117        );
118        object.insert(
119            "dropped_event_count".to_string(),
120            JsonValue::Number(self.dropped_event_count as f64),
121        );
122        JsonValue::Object(object)
123    }
124}
125
126/// Structured logical WAL record serialized into the replication buffer and
127/// archived segments.
128#[derive(Debug, Clone)]
129pub struct ChangeRecord {
130    pub lsn: u64,
131    pub timestamp: u64,
132    pub operation: ChangeOperation,
133    pub collection: String,
134    pub entity_id: u64,
135    pub entity_kind: String,
136    pub entity_bytes: Option<Vec<u8>>,
137    pub metadata: Option<JsonValue>,
138    /// Issue #596 slice 9d — payload for [`ChangeOperation::Refresh`].
139    /// Each element is a serialized entity record (the same
140    /// `serialize_entity_record` payload the store's `RefreshCollection`
141    /// WAL action uses), in the order the replica should `bulk_insert`
142    /// after dropping the prior backing-collection contents.
143    pub refresh_records: Option<Vec<Vec<u8>>>,
144}
145
146impl ChangeRecord {
147    pub fn from_entity(
148        lsn: u64,
149        timestamp: u64,
150        operation: ChangeOperation,
151        collection: impl Into<String>,
152        entity_kind: impl Into<String>,
153        entity: &crate::storage::UnifiedEntity,
154        format_version: u32,
155        metadata: Option<JsonValue>,
156    ) -> Self {
157        let entity_bytes = match operation {
158            ChangeOperation::Delete | ChangeOperation::Refresh => None,
159            ChangeOperation::Insert | ChangeOperation::Update => Some(
160                crate::storage::UnifiedStore::serialize_entity(entity, format_version),
161            ),
162        };
163
164        Self {
165            lsn,
166            timestamp,
167            operation,
168            collection: collection.into(),
169            entity_id: entity.id.raw(),
170            entity_kind: entity_kind.into(),
171            entity_bytes,
172            metadata,
173            refresh_records: None,
174        }
175    }
176
177    /// Build a refresh change record carrying the new contents for a
178    /// `REFRESH MATERIALIZED VIEW` replay on the replica. Issue #596
179    /// slice 9d.
180    pub fn for_refresh(
181        lsn: u64,
182        timestamp: u64,
183        collection: impl Into<String>,
184        records: Vec<Vec<u8>>,
185    ) -> Self {
186        Self {
187            lsn,
188            timestamp,
189            operation: ChangeOperation::Refresh,
190            collection: collection.into(),
191            entity_id: 0,
192            entity_kind: "refresh".to_string(),
193            entity_bytes: None,
194            metadata: None,
195            refresh_records: Some(records),
196        }
197    }
198
199    pub fn to_json_value(&self) -> JsonValue {
200        let mut object = Map::new();
201        object.insert("lsn".to_string(), JsonValue::Number(self.lsn as f64));
202        object.insert(
203            "timestamp".to_string(),
204            JsonValue::Number(self.timestamp as f64),
205        );
206        object.insert(
207            "operation".to_string(),
208            JsonValue::String(self.operation.as_str().to_string()),
209        );
210        object.insert(
211            "collection".to_string(),
212            JsonValue::String(self.collection.clone()),
213        );
214        object.insert("rid".to_string(), JsonValue::Number(self.entity_id as f64));
215        object.insert(
216            "kind".to_string(),
217            JsonValue::String(public_item_kind(&self.entity_kind).to_string()),
218        );
219        if let Some(bytes) = &self.entity_bytes {
220            object.insert(
221                "entity_bytes_hex".to_string(),
222                JsonValue::String(hex::encode(bytes)),
223            );
224        }
225        if let Some(metadata) = &self.metadata {
226            object.insert("metadata".to_string(), metadata.clone());
227        }
228        if let Some(records) = &self.refresh_records {
229            let arr = records
230                .iter()
231                .map(|bytes| JsonValue::String(hex::encode(bytes)))
232                .collect();
233            object.insert("refresh_records_hex".to_string(), JsonValue::Array(arr));
234        }
235        JsonValue::Object(object)
236    }
237
238    pub fn encode(&self) -> Vec<u8> {
239        crate::json::to_string(&self.to_json_value())
240            .unwrap_or_else(|_| "{}".to_string())
241            .into_bytes()
242    }
243
244    pub fn decode(bytes: &[u8]) -> Result<Self, String> {
245        let text = std::str::from_utf8(bytes).map_err(|err| err.to_string())?;
246        let value = crate::json::from_str::<JsonValue>(text).map_err(|err| err.to_string())?;
247        let operation = value
248            .get("operation")
249            .and_then(JsonValue::as_str)
250            .and_then(ChangeOperation::from_str)
251            .ok_or_else(|| "invalid replication operation".to_string())?;
252        let entity_bytes = value
253            .get("entity_bytes_hex")
254            .and_then(JsonValue::as_str)
255            .map(hex::decode)
256            .transpose()
257            .map_err(|err| err.to_string())?;
258
259        Ok(Self {
260            lsn: value.get("lsn").and_then(JsonValue::as_u64).unwrap_or(0),
261            timestamp: value
262                .get("timestamp")
263                .and_then(JsonValue::as_u64)
264                .unwrap_or(0),
265            operation,
266            collection: value
267                .get("collection")
268                .and_then(JsonValue::as_str)
269                .unwrap_or_default()
270                .to_string(),
271            entity_id: value
272                .get("rid")
273                .or_else(|| value.get("entity_id"))
274                .and_then(JsonValue::as_u64)
275                .unwrap_or(0),
276            entity_kind: value
277                .get("kind")
278                .or_else(|| value.get("entity_kind"))
279                .and_then(JsonValue::as_str)
280                .unwrap_or("entity")
281                .to_string(),
282            entity_bytes,
283            metadata: value.get("metadata").cloned(),
284            refresh_records: match value.get("refresh_records_hex") {
285                Some(JsonValue::Array(items)) => {
286                    let mut out = Vec::with_capacity(items.len());
287                    for item in items {
288                        let hex_str = item
289                            .as_str()
290                            .ok_or_else(|| "refresh_records_hex entry not a string".to_string())?;
291                        let bytes = hex::decode(hex_str).map_err(|err| err.to_string())?;
292                        out.push(bytes);
293                    }
294                    Some(out)
295                }
296                None | Some(JsonValue::Null) => None,
297                _ => return Err("refresh_records_hex is not an array".to_string()),
298            },
299        })
300    }
301}
302
/// Map an internal entity kind onto the public item-kind vocabulary
/// (`row`, `node`, `edge`, `kv`, `document`, `vector`, `item`).
///
/// Exact names are checked first. Otherwise the first of `kv`,
/// `document`, `vector` that occurs as a substring decides the kind.
/// Anything else is reported as `"item"`. Matching is case-sensitive.
pub fn public_item_kind(entity_kind: &str) -> &'static str {
    const EXACT: [(&str, &str); 10] = [
        ("table", "row"),
        ("entity", "row"),
        ("row", "row"),
        ("graph_node", "node"),
        ("node", "node"),
        ("graph_edge", "edge"),
        ("edge", "edge"),
        ("kv", "kv"),
        ("document", "document"),
        ("vector", "vector"),
    ];
    // Order matters: it is the precedence when several fragments occur.
    const FRAGMENTS: [&str; 3] = ["kv", "document", "vector"];

    for &(internal, public) in EXACT.iter() {
        if internal == entity_kind {
            return public;
        }
    }

    for &fragment in FRAGMENTS.iter() {
        if entity_kind.contains(fragment) {
            return fragment;
        }
    }

    "item"
}
317
318/// CDC event buffer — circular buffer of change events.
319///
320/// Splits the "next LSN" counter (write-contended on every emit)
321/// from the event ring (short-hold push/pop) so that concurrent
322/// emitters don't serialise on a single RwLock. The previous
323/// design used one `RwLock<CdcState>` that turned every insert
324/// into a write-lock acquire, capping 16-way concurrent writes
325/// at ~1000 ops/s (each writer paid ~1ms queueing for the same
326/// mutex even though the work it guarded was a one-line VecDeque
327/// push).
328///
329/// New layout:
330///   - LSN is an `AtomicU64`, assigned with `fetch_add(1)`.
331///     Zero contention.
332///   - Events are guarded by a `parking_lot::Mutex<VecDeque>`.
333///     The critical section is `pop_front (if full) + push_back`
334///     — microseconds at most, parking-free at low contention.
335///
336/// Readers (`poll`, `current_lsn`, `stats`) take the same mutex
337/// briefly; they're cold paths compared to the write hot path.
338pub struct CdcBuffer {
339    next_lsn: AtomicU64,
340    events: parking_lot::Mutex<VecDeque<ChangeEvent>>,
341    max_size: usize,
342}
343
344impl CdcBuffer {
345    /// Create a new CDC buffer with maximum capacity.
346    pub fn new(max_size: usize) -> Self {
347        Self {
348            next_lsn: AtomicU64::new(0),
349            events: parking_lot::Mutex::new(VecDeque::with_capacity(max_size.min(10_000))),
350            max_size,
351        }
352    }
353
354    /// Emit a change event into the buffer. `changed_columns`
355    /// defaults to `None` for backwards compatibility; call sites
356    /// that have a damage vector available should use
357    /// [`Self::emit_with_columns`] instead.
358    pub fn emit(
359        &self,
360        operation: ChangeOperation,
361        collection: &str,
362        entity_id: u64,
363        entity_kind: &str,
364    ) -> u64 {
365        self.emit_with_columns(operation, collection, entity_id, entity_kind, None)
366    }
367
368    /// Emit a change event with an optional list of column names
369    /// that were affected. Use from update paths that have already
370    /// computed a [`RowDamageVector`](crate::application::entity::RowDamageVector)
371    /// so CDC consumers can filter by touched column without re-diffing.
372    pub fn emit_with_columns(
373        &self,
374        operation: ChangeOperation,
375        collection: &str,
376        entity_id: u64,
377        entity_kind: &str,
378        changed_columns: Option<Vec<String>>,
379    ) -> u64 {
380        // LSN assignment is lock-free — multiple emitters each get a
381        // unique monotonic LSN without waiting on any other emitter.
382        let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
383
384        let event = ChangeEvent {
385            lsn: event_lsn,
386            timestamp: SystemTime::now()
387                .duration_since(UNIX_EPOCH)
388                .unwrap_or_default()
389                .as_millis() as u64,
390            operation,
391            collection: collection.to_string(),
392            entity_id,
393            entity_kind: entity_kind.to_string(),
394            changed_columns,
395            kv: None,
396        };
397
398        // Short-hold ring push. Under heavy contention parking_lot
399        // spins a few times before parking, so typical hold time is
400        // a couple hundred nanoseconds.
401        let mut events = self.events.lock();
402        if events.len() >= self.max_size {
403            events.pop_front();
404        }
405        events.push_back(event);
406
407        event_lsn
408    }
409
410    /// Emit many same-collection events with one LSN reservation and one
411    /// ring-buffer lock. This is used by bulk insert paths that do not need
412    /// per-row logical-WAL records.
413    pub fn emit_batch_same_collection<I>(
414        &self,
415        operation: ChangeOperation,
416        collection: &str,
417        entity_kind: &str,
418        entity_ids: I,
419    ) -> Vec<u64>
420    where
421        I: IntoIterator<Item = u64>,
422        I::IntoIter: ExactSizeIterator,
423    {
424        let iter = entity_ids.into_iter();
425        let count = iter.len();
426        if count == 0 {
427            return Vec::new();
428        }
429
430        let first_lsn = self.next_lsn.fetch_add(count as u64, Ordering::AcqRel) + 1;
431        let lsns = (0..count)
432            .map(|idx| first_lsn + idx as u64)
433            .collect::<Vec<_>>();
434        if self.max_size == 0 {
435            return lsns;
436        }
437
438        let timestamp = SystemTime::now()
439            .duration_since(UNIX_EPOCH)
440            .unwrap_or_default()
441            .as_millis() as u64;
442        let collection = collection.to_string();
443        let entity_kind = entity_kind.to_string();
444
445        let skip = count.saturating_sub(self.max_size);
446        let kept = count - skip;
447        let mut events = self.events.lock();
448        let overflow = events
449            .len()
450            .saturating_add(kept)
451            .saturating_sub(self.max_size);
452        for _ in 0..overflow {
453            events.pop_front();
454        }
455
456        for (idx, entity_id) in iter.enumerate().skip(skip) {
457            events.push_back(ChangeEvent {
458                lsn: first_lsn + idx as u64,
459                timestamp,
460                operation,
461                collection: collection.clone(),
462                entity_id,
463                entity_kind: entity_kind.clone(),
464                changed_columns: None,
465                kv: None,
466            });
467        }
468        lsns
469    }
470
471    /// Emit a committed logical KV event into the same CDC ring used by
472    /// result-cache invalidation and `/changes` consumers.
473    pub fn emit_kv(
474        &self,
475        operation: ChangeOperation,
476        collection: &str,
477        key: &str,
478        entity_id: u64,
479        before: Option<JsonValue>,
480        after: Option<JsonValue>,
481    ) -> u64 {
482        let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
483        let timestamp = SystemTime::now()
484            .duration_since(UNIX_EPOCH)
485            .unwrap_or_default()
486            .as_millis() as u64;
487        let kv = KvWatchEvent {
488            collection: collection.to_string(),
489            key: key.to_string(),
490            op: operation,
491            before,
492            after,
493            lsn: event_lsn,
494            committed_at: timestamp,
495            dropped_event_count: 0,
496        };
497        let event = ChangeEvent {
498            lsn: event_lsn,
499            timestamp,
500            operation,
501            collection: collection.to_string(),
502            entity_id,
503            entity_kind: "kv".to_string(),
504            changed_columns: Some(vec!["value".to_string()]),
505            kv: Some(kv),
506        };
507
508        let mut events = self.events.lock();
509        if events.len() >= self.max_size {
510            events.pop_front();
511        }
512        events.push_back(event);
513        event_lsn
514    }
515
516    /// Poll for events since a given LSN.
517    pub fn poll(&self, since_lsn: u64, max_count: usize) -> Vec<ChangeEvent> {
518        let events = self.events.lock();
519        events
520            .iter()
521            .filter(|e| e.lsn > since_lsn)
522            .take(max_count)
523            .cloned()
524            .collect()
525    }
526
527    /// Get the current (latest) LSN.
528    pub fn current_lsn(&self) -> u64 {
529        self.next_lsn.load(Ordering::Acquire)
530    }
531
532    /// Restore the LSN cursor after process restart. Only advances;
533    /// never rewinds. Under concurrent emit this is guarded by a
534    /// compare-exchange loop.
535    pub fn set_current_lsn(&self, lsn: u64) {
536        let mut current = self.next_lsn.load(Ordering::Acquire);
537        while lsn > current {
538            match self
539                .next_lsn
540                .compare_exchange(current, lsn, Ordering::AcqRel, Ordering::Acquire)
541            {
542                Ok(_) => break,
543                Err(observed) => current = observed,
544            }
545        }
546    }
547
548    /// Get the oldest available LSN (or None if empty).
549    pub fn oldest_lsn(&self) -> Option<u64> {
550        self.events.lock().front().map(|e| e.lsn)
551    }
552
553    /// Get buffer stats (single lock acquisition — no deadlock risk).
554    pub fn stats(&self) -> CdcStats {
555        let events = self.events.lock();
556        CdcStats {
557            buffered_events: events.len(),
558            current_lsn: self.next_lsn.load(Ordering::Acquire),
559            oldest_lsn: events.front().map(|e| e.lsn),
560            newest_lsn: events.back().map(|e| e.lsn),
561        }
562    }
563}
564
/// CDC buffer statistics, as captured by `CdcBuffer::stats`.
#[derive(Debug, Clone)]
pub struct CdcStats {
    /// Number of events held in the ring when the snapshot was taken.
    pub buffered_events: usize,
    /// Value of the buffer's LSN counter when the snapshot was taken.
    pub current_lsn: u64,
    /// LSN of the event at the front of the ring; `None` when the ring is empty.
    pub oldest_lsn: Option<u64>,
    /// LSN of the event at the back of the ring; `None` when the ring is empty.
    pub newest_lsn: Option<u64>,
}
573
#[cfg(test)]
mod tests {
    use super::*;

    /// Three single emits come back from `poll(0, ..)` in emission order,
    /// and the plain `emit` path leaves `changed_columns` unset.
    #[test]
    fn test_emit_and_poll() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "users", 1, "table");
        buf.emit(ChangeOperation::Update, "users", 1, "table");
        buf.emit(ChangeOperation::Delete, "users", 1, "table");

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].operation, ChangeOperation::Insert);
        assert_eq!(events[1].operation, ChangeOperation::Update);
        assert_eq!(events[2].operation, ChangeOperation::Delete);
        // Backwards-compat emit should leave changed_columns None.
        assert!(events[0].changed_columns.is_none());
        assert!(events[1].changed_columns.is_none());
    }

    /// `emit_with_columns` stores the supplied column list on the event,
    /// while a following plain `emit` still reports `None`.
    #[test]
    fn test_emit_with_columns_propagates_damage_vector() {
        let buf = CdcBuffer::new(100);
        buf.emit_with_columns(
            ChangeOperation::Update,
            "users",
            7,
            "table",
            Some(vec!["email".to_string(), "age".to_string()]),
        );
        buf.emit(ChangeOperation::Update, "users", 8, "table");

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 2);
        assert_eq!(
            events[0].changed_columns.as_deref(),
            Some(vec!["email".to_string(), "age".to_string()].as_slice())
        );
        assert!(events[1].changed_columns.is_none());
    }

    /// The poll cursor is exclusive: polling from LSN 1 skips event 1.
    #[test]
    fn test_poll_with_cursor() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "a", 1, "table");
        buf.emit(ChangeOperation::Insert, "b", 2, "table");
        buf.emit(ChangeOperation::Insert, "c", 3, "table");

        // Poll from lsn=1, should get events 2 and 3
        let events = buf.poll(1, 10);
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].collection, "b");
        assert_eq!(events[1].collection, "c");
    }

    /// A batch emitted after a single event continues the LSN sequence
    /// without gaps and advances `current_lsn` by the batch size.
    #[test]
    fn test_emit_batch_same_collection_assigns_contiguous_lsns() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "users", 10, "table");
        buf.emit_batch_same_collection(ChangeOperation::Insert, "users", "table", [11, 12, 13]);

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 4);
        assert_eq!(events[1].lsn, 2);
        assert_eq!(events[2].lsn, 3);
        assert_eq!(events[3].lsn, 4);
        assert_eq!(events[3].entity_id, 13);
        assert_eq!(buf.current_lsn(), 4);
    }

    /// A batch larger than the ring keeps only its last `max_size`
    /// events, yet every entry in the batch still consumes an LSN.
    #[test]
    fn test_emit_batch_same_collection_respects_ring_size() {
        let buf = CdcBuffer::new(3);
        buf.emit_batch_same_collection(ChangeOperation::Insert, "users", "table", [1, 2, 3, 4, 5]);

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 3);
        assert_eq!(
            events
                .iter()
                .map(|event| event.entity_id)
                .collect::<Vec<_>>(),
            vec![3, 4, 5]
        );
        assert_eq!(events[0].lsn, 3);
        assert_eq!(events[2].lsn, 5);
        assert_eq!(buf.current_lsn(), 5);
    }

    /// Emitting into a full ring drops the event at the front.
    #[test]
    fn test_circular_eviction() {
        let buf = CdcBuffer::new(3);
        buf.emit(ChangeOperation::Insert, "a", 1, "table");
        buf.emit(ChangeOperation::Insert, "b", 2, "table");
        buf.emit(ChangeOperation::Insert, "c", 3, "table");
        buf.emit(ChangeOperation::Insert, "d", 4, "table"); // evicts "a"

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].collection, "b"); // "a" was evicted
    }

    /// `stats` reports the event count, the LSN counter, and the LSNs at
    /// both ends of the ring.
    #[test]
    fn test_stats() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "x", 1, "table");
        buf.emit(ChangeOperation::Insert, "y", 2, "table");

        let stats = buf.stats();
        assert_eq!(stats.buffered_events, 2);
        assert_eq!(stats.current_lsn, 2);
        assert_eq!(stats.oldest_lsn, Some(1));
        assert_eq!(stats.newest_lsn, Some(2));
    }

    /// An update record with entity bytes and metadata keeps its LSN,
    /// collection, entity id and payload through `encode` → `decode`.
    #[test]
    fn test_change_record_roundtrip() {
        let record = ChangeRecord {
            lsn: 7,
            timestamp: 1234,
            operation: ChangeOperation::Update,
            collection: "users".to_string(),
            entity_id: 42,
            entity_kind: "row".to_string(),
            entity_bytes: Some(vec![1, 2, 3]),
            metadata: Some(crate::json!({"role": "admin"})),
            refresh_records: None,
        };

        let decoded = ChangeRecord::decode(&record.encode()).expect("decode");
        assert_eq!(decoded.lsn, record.lsn);
        assert_eq!(decoded.collection, record.collection);
        assert_eq!(decoded.entity_id, record.entity_id);
        assert_eq!(decoded.entity_bytes, record.entity_bytes);
    }

    /// Issue #596 slice 9d — refresh records survive a JSON round-trip
    /// through the logical-WAL wire format the primary writes and the
    /// replica reads. Bit-for-bit equality on every payload byte is the
    /// contract — the replica calls `bulk_insert` with whatever bytes
    /// land in `refresh_records`, so a decode that silently drops or
    /// re-orders them would silently diverge replica state.
    #[test]
    fn test_change_record_refresh_roundtrip() {
        let records = vec![vec![0x10, 0x20, 0x30], vec![0xAA, 0xBB], Vec::new()];
        let record = ChangeRecord::for_refresh(11, 99, "mv_orders_summary", records.clone());

        let decoded = ChangeRecord::decode(&record.encode()).expect("decode");
        assert_eq!(decoded.operation, ChangeOperation::Refresh);
        assert_eq!(decoded.collection, "mv_orders_summary");
        assert_eq!(decoded.refresh_records.as_deref(), Some(&records[..]));
    }
}