1use std::collections::VecDeque;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use crate::json::{Map, Value as JsonValue};
11
/// Kind of mutation carried by a change event or replication record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeOperation {
    /// A new entity was inserted.
    Insert,
    /// An existing entity was updated.
    Update,
    /// An entity was deleted.
    Delete,
    /// A refresh of a collection; see [`ChangeRecord::for_refresh`].
    Refresh,
}

impl ChangeOperation {
    /// Every variant, in declaration order. Backs the reverse lookup in
    /// [`Self::from_str`] so the wire names are spelled only in [`Self::as_str`].
    const ALL: [Self; 4] = [Self::Insert, Self::Update, Self::Delete, Self::Refresh];

    /// Parses the lowercase wire name produced by [`Self::as_str`].
    ///
    /// The comparison is exact (case-sensitive, no trimming); any other input
    /// yields `None`.
    pub fn from_str(value: &str) -> Option<Self> {
        Self::ALL
            .iter()
            .copied()
            .find(|operation| operation.as_str() == value)
    }

    /// Returns the lowercase wire name of this operation.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Insert => "insert",
            Self::Update => "update",
            Self::Delete => "delete",
            Self::Refresh => "refresh",
        }
    }
}
46
47#[derive(Debug, Clone)]
49pub struct ChangeEvent {
50 pub lsn: u64,
52 pub timestamp: u64,
54 pub operation: ChangeOperation,
56 pub collection: String,
58 pub entity_id: u64,
60 pub entity_kind: String,
62 pub changed_columns: Option<Vec<String>>,
69 pub kv: Option<KvWatchEvent>,
72}
73
74impl ChangeEvent {
75 pub fn rid(&self) -> u64 {
76 self.entity_id
77 }
78
79 pub fn kind(&self) -> &'static str {
80 public_item_kind(&self.entity_kind)
81 }
82}
83
84#[derive(Debug, Clone, PartialEq)]
86pub struct KvWatchEvent {
87 pub collection: String,
88 pub key: String,
89 pub op: ChangeOperation,
90 pub before: Option<JsonValue>,
91 pub after: Option<JsonValue>,
92 pub lsn: u64,
93 pub committed_at: u64,
94 pub dropped_event_count: u64,
95}
96
97impl KvWatchEvent {
98 pub fn to_json_value(&self) -> JsonValue {
99 let mut object = Map::new();
100 object.insert("key".to_string(), JsonValue::String(self.key.clone()));
101 object.insert(
102 "op".to_string(),
103 JsonValue::String(self.op.as_str().to_string()),
104 );
105 object.insert(
106 "before".to_string(),
107 self.before.clone().unwrap_or(JsonValue::Null),
108 );
109 object.insert(
110 "after".to_string(),
111 self.after.clone().unwrap_or(JsonValue::Null),
112 );
113 object.insert("lsn".to_string(), JsonValue::Number(self.lsn as f64));
114 object.insert(
115 "committed_at".to_string(),
116 JsonValue::Number(self.committed_at as f64),
117 );
118 object.insert(
119 "dropped_event_count".to_string(),
120 JsonValue::Number(self.dropped_event_count as f64),
121 );
122 JsonValue::Object(object)
123 }
124}
125
126#[derive(Debug, Clone)]
129pub struct ChangeRecord {
130 pub term: u64,
131 pub lsn: u64,
132 pub timestamp: u64,
133 pub operation: ChangeOperation,
134 pub collection: String,
135 pub entity_id: u64,
136 pub entity_kind: String,
137 pub entity_bytes: Option<Vec<u8>>,
138 pub metadata: Option<JsonValue>,
139 pub refresh_records: Option<Vec<Vec<u8>>>,
145}
146
147impl ChangeRecord {
148 pub fn from_entity(
149 lsn: u64,
150 timestamp: u64,
151 operation: ChangeOperation,
152 collection: impl Into<String>,
153 entity_kind: impl Into<String>,
154 entity: &crate::storage::UnifiedEntity,
155 format_version: u32,
156 metadata: Option<JsonValue>,
157 ) -> Self {
158 let entity_bytes = match operation {
159 ChangeOperation::Delete | ChangeOperation::Refresh => None,
160 ChangeOperation::Insert | ChangeOperation::Update => Some(
161 crate::storage::UnifiedStore::serialize_entity(entity, format_version),
162 ),
163 };
164
165 Self {
166 term: crate::replication::DEFAULT_REPLICATION_TERM,
167 lsn,
168 timestamp,
169 operation,
170 collection: collection.into(),
171 entity_id: entity.id.raw(),
172 entity_kind: entity_kind.into(),
173 entity_bytes,
174 metadata,
175 refresh_records: None,
176 }
177 }
178
179 pub fn for_refresh(
183 lsn: u64,
184 timestamp: u64,
185 collection: impl Into<String>,
186 records: Vec<Vec<u8>>,
187 ) -> Self {
188 Self {
189 term: crate::replication::DEFAULT_REPLICATION_TERM,
190 lsn,
191 timestamp,
192 operation: ChangeOperation::Refresh,
193 collection: collection.into(),
194 entity_id: 0,
195 entity_kind: "refresh".to_string(),
196 entity_bytes: None,
197 metadata: None,
198 refresh_records: Some(records),
199 }
200 }
201
202 pub fn to_json_value(&self) -> JsonValue {
203 let mut object = Map::new();
204 object.insert("term".to_string(), JsonValue::Number(self.term as f64));
205 object.insert("lsn".to_string(), JsonValue::Number(self.lsn as f64));
206 object.insert(
207 "timestamp".to_string(),
208 JsonValue::Number(self.timestamp as f64),
209 );
210 object.insert(
211 "operation".to_string(),
212 JsonValue::String(self.operation.as_str().to_string()),
213 );
214 object.insert(
215 "collection".to_string(),
216 JsonValue::String(self.collection.clone()),
217 );
218 object.insert("rid".to_string(), JsonValue::Number(self.entity_id as f64));
219 object.insert(
220 "kind".to_string(),
221 JsonValue::String(public_item_kind(&self.entity_kind).to_string()),
222 );
223 if let Some(bytes) = &self.entity_bytes {
224 object.insert(
225 "entity_bytes_hex".to_string(),
226 JsonValue::String(hex::encode(bytes)),
227 );
228 }
229 if let Some(metadata) = &self.metadata {
230 object.insert("metadata".to_string(), metadata.clone());
231 }
232 if let Some(records) = &self.refresh_records {
233 let arr = records
234 .iter()
235 .map(|bytes| JsonValue::String(hex::encode(bytes)))
236 .collect();
237 object.insert("refresh_records_hex".to_string(), JsonValue::Array(arr));
238 }
239 JsonValue::Object(object)
240 }
241
242 pub fn encode(&self) -> Vec<u8> {
243 crate::json::to_string(&self.to_json_value())
244 .unwrap_or_else(|_| "{}".to_string())
245 .into_bytes()
246 }
247
248 pub fn with_term(mut self, term: u64) -> Self {
249 self.term = term;
250 self
251 }
252
253 pub fn decode(bytes: &[u8]) -> Result<Self, String> {
254 let text = std::str::from_utf8(bytes).map_err(|err| err.to_string())?;
255 let value = crate::json::from_str::<JsonValue>(text).map_err(|err| err.to_string())?;
256 let operation = value
257 .get("operation")
258 .and_then(JsonValue::as_str)
259 .and_then(ChangeOperation::from_str)
260 .ok_or_else(|| "invalid replication operation".to_string())?;
261 let entity_bytes = value
262 .get("entity_bytes_hex")
263 .and_then(JsonValue::as_str)
264 .map(hex::decode)
265 .transpose()
266 .map_err(|err| err.to_string())?;
267
268 Ok(Self {
269 term: value
270 .get("term")
271 .and_then(JsonValue::as_u64)
272 .unwrap_or(crate::replication::DEFAULT_REPLICATION_TERM),
273 lsn: value.get("lsn").and_then(JsonValue::as_u64).unwrap_or(0),
274 timestamp: value
275 .get("timestamp")
276 .and_then(JsonValue::as_u64)
277 .unwrap_or(0),
278 operation,
279 collection: value
280 .get("collection")
281 .and_then(JsonValue::as_str)
282 .unwrap_or_default()
283 .to_string(),
284 entity_id: value
285 .get("rid")
286 .or_else(|| value.get("entity_id"))
287 .and_then(JsonValue::as_u64)
288 .unwrap_or(0),
289 entity_kind: value
290 .get("kind")
291 .or_else(|| value.get("entity_kind"))
292 .and_then(JsonValue::as_str)
293 .unwrap_or("entity")
294 .to_string(),
295 entity_bytes,
296 metadata: value.get("metadata").cloned(),
297 refresh_records: match value.get("refresh_records_hex") {
298 Some(JsonValue::Array(items)) => {
299 let mut out = Vec::with_capacity(items.len());
300 for item in items {
301 let hex_str = item
302 .as_str()
303 .ok_or_else(|| "refresh_records_hex entry not a string".to_string())?;
304 let bytes = hex::decode(hex_str).map_err(|err| err.to_string())?;
305 out.push(bytes);
306 }
307 Some(out)
308 }
309 None | Some(JsonValue::Null) => None,
310 _ => return Err("refresh_records_hex is not an array".to_string()),
311 },
312 })
313 }
314}
315
/// Maps an internal entity kind label to its public-facing name.
///
/// Exact aliases are resolved first: `table`/`entity`/`row` become `"row"`,
/// `graph_node`/`node` become `"node"`, and `graph_edge`/`edge` become `"edge"`.
/// Otherwise the first of `kv`, `document`, `vector` (in that order) that occurs
/// anywhere in the label wins. Anything else is reported as `"item"`.
pub fn public_item_kind(entity_kind: &str) -> &'static str {
    // Checked in priority order: a label containing several of these resolves
    // to the earliest entry.
    const SUBSTRING_FAMILIES: [&str; 3] = ["kv", "document", "vector"];

    if matches!(entity_kind, "table" | "entity" | "row") {
        return "row";
    }
    if matches!(entity_kind, "graph_node" | "node") {
        return "node";
    }
    if matches!(entity_kind, "graph_edge" | "edge") {
        return "edge";
    }

    SUBSTRING_FAMILIES
        .iter()
        .copied()
        .find(|family| entity_kind.contains(*family))
        .unwrap_or("item")
}
330
331pub struct CdcBuffer {
352 next_lsn: AtomicU64,
353 events: parking_lot::Mutex<VecDeque<ChangeEvent>>,
354 max_size: usize,
355}
356
357impl CdcBuffer {
358 pub fn new(max_size: usize) -> Self {
360 Self {
361 next_lsn: AtomicU64::new(0),
362 events: parking_lot::Mutex::new(VecDeque::with_capacity(max_size.min(10_000))),
363 max_size,
364 }
365 }
366
367 pub fn emit(
372 &self,
373 operation: ChangeOperation,
374 collection: &str,
375 entity_id: u64,
376 entity_kind: &str,
377 ) -> u64 {
378 self.emit_with_columns(operation, collection, entity_id, entity_kind, None)
379 }
380
381 pub fn emit_with_columns(
386 &self,
387 operation: ChangeOperation,
388 collection: &str,
389 entity_id: u64,
390 entity_kind: &str,
391 changed_columns: Option<Vec<String>>,
392 ) -> u64 {
393 let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
396
397 let event = ChangeEvent {
398 lsn: event_lsn,
399 timestamp: SystemTime::now()
400 .duration_since(UNIX_EPOCH)
401 .unwrap_or_default()
402 .as_millis() as u64,
403 operation,
404 collection: collection.to_string(),
405 entity_id,
406 entity_kind: entity_kind.to_string(),
407 changed_columns,
408 kv: None,
409 };
410
411 let mut events = self.events.lock();
415 if events.len() >= self.max_size {
416 events.pop_front();
417 }
418 events.push_back(event);
419
420 event_lsn
421 }
422
423 pub fn emit_batch_same_collection<I>(
427 &self,
428 operation: ChangeOperation,
429 collection: &str,
430 entity_kind: &str,
431 entity_ids: I,
432 ) -> Vec<u64>
433 where
434 I: IntoIterator<Item = u64>,
435 I::IntoIter: ExactSizeIterator,
436 {
437 let iter = entity_ids.into_iter();
438 let count = iter.len();
439 if count == 0 {
440 return Vec::new();
441 }
442
443 let first_lsn = self.next_lsn.fetch_add(count as u64, Ordering::AcqRel) + 1;
444 let lsns = (0..count)
445 .map(|idx| first_lsn + idx as u64)
446 .collect::<Vec<_>>();
447 if self.max_size == 0 {
448 return lsns;
449 }
450
451 let timestamp = SystemTime::now()
452 .duration_since(UNIX_EPOCH)
453 .unwrap_or_default()
454 .as_millis() as u64;
455 let collection = collection.to_string();
456 let entity_kind = entity_kind.to_string();
457
458 let skip = count.saturating_sub(self.max_size);
459 let kept = count - skip;
460 let mut events = self.events.lock();
461 let overflow = events
462 .len()
463 .saturating_add(kept)
464 .saturating_sub(self.max_size);
465 for _ in 0..overflow {
466 events.pop_front();
467 }
468
469 for (idx, entity_id) in iter.enumerate().skip(skip) {
470 events.push_back(ChangeEvent {
471 lsn: first_lsn + idx as u64,
472 timestamp,
473 operation,
474 collection: collection.clone(),
475 entity_id,
476 entity_kind: entity_kind.clone(),
477 changed_columns: None,
478 kv: None,
479 });
480 }
481 lsns
482 }
483
484 pub fn emit_kv(
487 &self,
488 operation: ChangeOperation,
489 collection: &str,
490 key: &str,
491 entity_id: u64,
492 before: Option<JsonValue>,
493 after: Option<JsonValue>,
494 ) -> u64 {
495 let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
496 let timestamp = SystemTime::now()
497 .duration_since(UNIX_EPOCH)
498 .unwrap_or_default()
499 .as_millis() as u64;
500 let kv = KvWatchEvent {
501 collection: collection.to_string(),
502 key: key.to_string(),
503 op: operation,
504 before,
505 after,
506 lsn: event_lsn,
507 committed_at: timestamp,
508 dropped_event_count: 0,
509 };
510 let event = ChangeEvent {
511 lsn: event_lsn,
512 timestamp,
513 operation,
514 collection: collection.to_string(),
515 entity_id,
516 entity_kind: "kv".to_string(),
517 changed_columns: Some(vec!["value".to_string()]),
518 kv: Some(kv),
519 };
520
521 let mut events = self.events.lock();
522 if events.len() >= self.max_size {
523 events.pop_front();
524 }
525 events.push_back(event);
526 event_lsn
527 }
528
529 pub fn poll(&self, since_lsn: u64, max_count: usize) -> Vec<ChangeEvent> {
531 let events = self.events.lock();
532 events
533 .iter()
534 .filter(|e| e.lsn > since_lsn)
535 .take(max_count)
536 .cloned()
537 .collect()
538 }
539
540 pub fn current_lsn(&self) -> u64 {
542 self.next_lsn.load(Ordering::Acquire)
543 }
544
545 pub fn set_current_lsn(&self, lsn: u64) {
549 let mut current = self.next_lsn.load(Ordering::Acquire);
550 while lsn > current {
551 match self
552 .next_lsn
553 .compare_exchange(current, lsn, Ordering::AcqRel, Ordering::Acquire)
554 {
555 Ok(_) => break,
556 Err(observed) => current = observed,
557 }
558 }
559 }
560
561 pub fn oldest_lsn(&self) -> Option<u64> {
563 self.events.lock().front().map(|e| e.lsn)
564 }
565
566 pub fn stats(&self) -> CdcStats {
568 let events = self.events.lock();
569 CdcStats {
570 buffered_events: events.len(),
571 current_lsn: self.next_lsn.load(Ordering::Acquire),
572 oldest_lsn: events.front().map(|e| e.lsn),
573 newest_lsn: events.back().map(|e| e.lsn),
574 }
575 }
576}
577
/// Point-in-time snapshot of a [`CdcBuffer`], as returned by [`CdcBuffer::stats`].
///
/// Derives `PartialEq`/`Eq` so two snapshots can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CdcStats {
    /// Number of events held in the buffer when the snapshot was taken.
    pub buffered_events: usize,
    /// Value of the buffer's LSN counter when the snapshot was taken.
    pub current_lsn: u64,
    /// LSN of the oldest buffered event; `None` when the buffer is empty.
    pub oldest_lsn: Option<u64>,
    /// LSN of the newest buffered event; `None` when the buffer is empty.
    pub newest_lsn: Option<u64>,
}
586
#[cfg(test)]
mod tests {
    use super::*;

    /// Emits one `Insert` of kind `"table"` per collection name, using entity
    /// ids 1, 2, 3, ... in order.
    fn emit_inserts(buffer: &CdcBuffer, collections: &[&str]) {
        for (index, name) in collections.iter().enumerate() {
            buffer.emit(ChangeOperation::Insert, name, index as u64 + 1, "table");
        }
    }

    /// Events come back in emission order and carry no column list by default.
    #[test]
    fn test_emit_and_poll() {
        let buffer = CdcBuffer::new(100);
        let emitted = [
            ChangeOperation::Insert,
            ChangeOperation::Update,
            ChangeOperation::Delete,
        ];
        for operation in emitted {
            buffer.emit(operation, "users", 1, "table");
        }

        let polled = buffer.poll(0, 10);
        let operations: Vec<ChangeOperation> =
            polled.iter().map(|event| event.operation).collect();
        assert_eq!(operations, emitted.to_vec());
        assert!(polled[0].changed_columns.is_none());
        assert!(polled[1].changed_columns.is_none());
    }

    /// A supplied column list is stored on the event; plain `emit` stores none.
    #[test]
    fn test_emit_with_columns_propagates_damage_vector() {
        let buffer = CdcBuffer::new(100);
        let touched = vec!["email".to_string(), "age".to_string()];
        buffer.emit_with_columns(
            ChangeOperation::Update,
            "users",
            7,
            "table",
            Some(touched.clone()),
        );
        buffer.emit(ChangeOperation::Update, "users", 8, "table");

        let polled = buffer.poll(0, 10);
        assert_eq!(polled.len(), 2);
        assert_eq!(polled[0].changed_columns, Some(touched));
        assert_eq!(polled[1].changed_columns, None);
    }

    /// `poll` returns only events strictly after the cursor LSN.
    #[test]
    fn test_poll_with_cursor() {
        let buffer = CdcBuffer::new(100);
        emit_inserts(&buffer, &["a", "b", "c"]);

        let polled = buffer.poll(1, 10);
        let collections: Vec<&str> = polled
            .iter()
            .map(|event| event.collection.as_str())
            .collect();
        assert_eq!(collections, vec!["b", "c"]);
    }

    /// A batch continues the LSN sequence without gaps.
    #[test]
    fn test_emit_batch_same_collection_assigns_contiguous_lsns() {
        let buffer = CdcBuffer::new(100);
        buffer.emit(ChangeOperation::Insert, "users", 10, "table");
        buffer.emit_batch_same_collection(ChangeOperation::Insert, "users", "table", [11, 12, 13]);

        let polled = buffer.poll(0, 10);
        assert_eq!(polled.len(), 4);
        let batch_lsns: Vec<u64> = polled[1..].iter().map(|event| event.lsn).collect();
        assert_eq!(batch_lsns, vec![2, 3, 4]);
        assert_eq!(polled[3].entity_id, 13);
        assert_eq!(buffer.current_lsn(), 4);
    }

    /// A batch larger than the buffer keeps only its tail but consumes every LSN.
    #[test]
    fn test_emit_batch_same_collection_respects_ring_size() {
        let buffer = CdcBuffer::new(3);
        buffer.emit_batch_same_collection(
            ChangeOperation::Insert,
            "users",
            "table",
            [1, 2, 3, 4, 5],
        );

        let polled = buffer.poll(0, 10);
        assert_eq!(polled.len(), 3);
        let ids: Vec<u64> = polled.iter().map(|event| event.entity_id).collect();
        assert_eq!(ids, vec![3, 4, 5]);
        assert_eq!(polled[0].lsn, 3);
        assert_eq!(polled[2].lsn, 5);
        assert_eq!(buffer.current_lsn(), 5);
    }

    /// The oldest event is dropped once the buffer is full.
    #[test]
    fn test_circular_eviction() {
        let buffer = CdcBuffer::new(3);
        emit_inserts(&buffer, &["a", "b", "c", "d"]);

        let polled = buffer.poll(0, 10);
        assert_eq!(polled.len(), 3);
        assert_eq!(polled[0].collection, "b");
    }

    /// `stats` reports the buffer size and its LSN bounds.
    #[test]
    fn test_stats() {
        let buffer = CdcBuffer::new(100);
        emit_inserts(&buffer, &["x", "y"]);

        let snapshot = buffer.stats();
        assert_eq!(snapshot.buffered_events, 2);
        assert_eq!(snapshot.current_lsn, 2);
        assert_eq!(snapshot.oldest_lsn, Some(1));
        assert_eq!(snapshot.newest_lsn, Some(2));
    }

    /// Encoding then decoding preserves term, LSN, collection, id and entity bytes.
    #[test]
    fn test_change_record_roundtrip() {
        let original = ChangeRecord {
            term: 3,
            lsn: 7,
            timestamp: 1234,
            operation: ChangeOperation::Update,
            collection: "users".to_string(),
            entity_id: 42,
            entity_kind: "row".to_string(),
            entity_bytes: Some(vec![1, 2, 3]),
            metadata: Some(crate::json!({"role": "admin"})),
            refresh_records: None,
        };

        let restored = ChangeRecord::decode(&original.encode()).expect("decode");
        assert_eq!(restored.term, original.term);
        assert_eq!(restored.lsn, original.lsn);
        assert_eq!(restored.collection, original.collection);
        assert_eq!(restored.entity_id, original.entity_id);
        assert_eq!(restored.entity_bytes, original.entity_bytes);
    }

    /// Refresh payloads, including an empty one, survive a round trip.
    #[test]
    fn test_change_record_refresh_roundtrip() {
        let payloads = vec![vec![0x10, 0x20, 0x30], vec![0xAA, 0xBB], Vec::new()];
        let original =
            ChangeRecord::for_refresh(11, 99, "mv_orders_summary", payloads.clone()).with_term(4);

        let restored = ChangeRecord::decode(&original.encode()).expect("decode");
        assert_eq!(restored.term, 4);
        assert_eq!(restored.operation, ChangeOperation::Refresh);
        assert_eq!(restored.collection, "mv_orders_summary");
        assert_eq!(restored.refresh_records, Some(payloads));
    }

    /// A payload without `term` decodes with the default replication term.
    #[test]
    fn test_change_record_legacy_payload_defaults_term() {
        let legacy = br#"{"lsn":9,"timestamp":1,"operation":"delete","collection":"users","rid":5,"kind":"row"}"#;
        let restored = ChangeRecord::decode(legacy).expect("decode legacy record");
        assert_eq!(restored.term, crate::replication::DEFAULT_REPLICATION_TERM);
        assert_eq!(restored.lsn, 9);
    }
}