1use std::collections::VecDeque;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use crate::json::{Map, Value as JsonValue};
11
/// Kind of change carried by a CDC event or replication record.
///
/// Each variant has a lowercase wire name; see `ChangeOperation::as_str` and
/// `ChangeOperation::from_str`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ChangeOperation {
    /// An entity was created.
    Insert,
    /// An existing entity was modified.
    Update,
    /// An entity was removed.
    Delete,
    /// A collection-wide refresh carrying replacement records
    /// (see `ChangeRecord::for_refresh`) rather than a single entity.
    Refresh,
}
25
26impl ChangeOperation {
27 pub fn from_str(value: &str) -> Option<Self> {
28 match value {
29 "insert" => Some(Self::Insert),
30 "update" => Some(Self::Update),
31 "delete" => Some(Self::Delete),
32 "refresh" => Some(Self::Refresh),
33 _ => None,
34 }
35 }
36
37 pub fn as_str(&self) -> &'static str {
38 match self {
39 Self::Insert => "insert",
40 Self::Update => "update",
41 Self::Delete => "delete",
42 Self::Refresh => "refresh",
43 }
44 }
45}
46
47#[derive(Debug, Clone)]
49pub struct ChangeEvent {
50 pub lsn: u64,
52 pub timestamp: u64,
54 pub operation: ChangeOperation,
56 pub collection: String,
58 pub entity_id: u64,
60 pub entity_kind: String,
62 pub changed_columns: Option<Vec<String>>,
69 pub kv: Option<KvWatchEvent>,
72}
73
74impl ChangeEvent {
75 pub fn rid(&self) -> u64 {
76 self.entity_id
77 }
78
79 pub fn kind(&self) -> &'static str {
80 public_item_kind(&self.entity_kind)
81 }
82}
83
84#[derive(Debug, Clone, PartialEq)]
86pub struct KvWatchEvent {
87 pub collection: String,
88 pub key: String,
89 pub op: ChangeOperation,
90 pub before: Option<JsonValue>,
91 pub after: Option<JsonValue>,
92 pub lsn: u64,
93 pub committed_at: u64,
94 pub dropped_event_count: u64,
95}
96
97impl KvWatchEvent {
98 pub fn to_json_value(&self) -> JsonValue {
99 let mut object = Map::new();
100 object.insert("key".to_string(), JsonValue::String(self.key.clone()));
101 object.insert(
102 "op".to_string(),
103 JsonValue::String(self.op.as_str().to_string()),
104 );
105 object.insert(
106 "before".to_string(),
107 self.before.clone().unwrap_or(JsonValue::Null),
108 );
109 object.insert(
110 "after".to_string(),
111 self.after.clone().unwrap_or(JsonValue::Null),
112 );
113 object.insert("lsn".to_string(), JsonValue::Number(self.lsn as f64));
114 object.insert(
115 "committed_at".to_string(),
116 JsonValue::Number(self.committed_at as f64),
117 );
118 object.insert(
119 "dropped_event_count".to_string(),
120 JsonValue::Number(self.dropped_event_count as f64),
121 );
122 JsonValue::Object(object)
123 }
124}
125
126#[derive(Debug, Clone)]
129pub struct ChangeRecord {
130 pub lsn: u64,
131 pub timestamp: u64,
132 pub operation: ChangeOperation,
133 pub collection: String,
134 pub entity_id: u64,
135 pub entity_kind: String,
136 pub entity_bytes: Option<Vec<u8>>,
137 pub metadata: Option<JsonValue>,
138 pub refresh_records: Option<Vec<Vec<u8>>>,
144}
145
146impl ChangeRecord {
147 pub fn from_entity(
148 lsn: u64,
149 timestamp: u64,
150 operation: ChangeOperation,
151 collection: impl Into<String>,
152 entity_kind: impl Into<String>,
153 entity: &crate::storage::UnifiedEntity,
154 format_version: u32,
155 metadata: Option<JsonValue>,
156 ) -> Self {
157 let entity_bytes = match operation {
158 ChangeOperation::Delete | ChangeOperation::Refresh => None,
159 ChangeOperation::Insert | ChangeOperation::Update => Some(
160 crate::storage::UnifiedStore::serialize_entity(entity, format_version),
161 ),
162 };
163
164 Self {
165 lsn,
166 timestamp,
167 operation,
168 collection: collection.into(),
169 entity_id: entity.id.raw(),
170 entity_kind: entity_kind.into(),
171 entity_bytes,
172 metadata,
173 refresh_records: None,
174 }
175 }
176
177 pub fn for_refresh(
181 lsn: u64,
182 timestamp: u64,
183 collection: impl Into<String>,
184 records: Vec<Vec<u8>>,
185 ) -> Self {
186 Self {
187 lsn,
188 timestamp,
189 operation: ChangeOperation::Refresh,
190 collection: collection.into(),
191 entity_id: 0,
192 entity_kind: "refresh".to_string(),
193 entity_bytes: None,
194 metadata: None,
195 refresh_records: Some(records),
196 }
197 }
198
199 pub fn to_json_value(&self) -> JsonValue {
200 let mut object = Map::new();
201 object.insert("lsn".to_string(), JsonValue::Number(self.lsn as f64));
202 object.insert(
203 "timestamp".to_string(),
204 JsonValue::Number(self.timestamp as f64),
205 );
206 object.insert(
207 "operation".to_string(),
208 JsonValue::String(self.operation.as_str().to_string()),
209 );
210 object.insert(
211 "collection".to_string(),
212 JsonValue::String(self.collection.clone()),
213 );
214 object.insert("rid".to_string(), JsonValue::Number(self.entity_id as f64));
215 object.insert(
216 "kind".to_string(),
217 JsonValue::String(public_item_kind(&self.entity_kind).to_string()),
218 );
219 if let Some(bytes) = &self.entity_bytes {
220 object.insert(
221 "entity_bytes_hex".to_string(),
222 JsonValue::String(hex::encode(bytes)),
223 );
224 }
225 if let Some(metadata) = &self.metadata {
226 object.insert("metadata".to_string(), metadata.clone());
227 }
228 if let Some(records) = &self.refresh_records {
229 let arr = records
230 .iter()
231 .map(|bytes| JsonValue::String(hex::encode(bytes)))
232 .collect();
233 object.insert("refresh_records_hex".to_string(), JsonValue::Array(arr));
234 }
235 JsonValue::Object(object)
236 }
237
238 pub fn encode(&self) -> Vec<u8> {
239 crate::json::to_string(&self.to_json_value())
240 .unwrap_or_else(|_| "{}".to_string())
241 .into_bytes()
242 }
243
244 pub fn decode(bytes: &[u8]) -> Result<Self, String> {
245 let text = std::str::from_utf8(bytes).map_err(|err| err.to_string())?;
246 let value = crate::json::from_str::<JsonValue>(text).map_err(|err| err.to_string())?;
247 let operation = value
248 .get("operation")
249 .and_then(JsonValue::as_str)
250 .and_then(ChangeOperation::from_str)
251 .ok_or_else(|| "invalid replication operation".to_string())?;
252 let entity_bytes = value
253 .get("entity_bytes_hex")
254 .and_then(JsonValue::as_str)
255 .map(hex::decode)
256 .transpose()
257 .map_err(|err| err.to_string())?;
258
259 Ok(Self {
260 lsn: value.get("lsn").and_then(JsonValue::as_u64).unwrap_or(0),
261 timestamp: value
262 .get("timestamp")
263 .and_then(JsonValue::as_u64)
264 .unwrap_or(0),
265 operation,
266 collection: value
267 .get("collection")
268 .and_then(JsonValue::as_str)
269 .unwrap_or_default()
270 .to_string(),
271 entity_id: value
272 .get("rid")
273 .or_else(|| value.get("entity_id"))
274 .and_then(JsonValue::as_u64)
275 .unwrap_or(0),
276 entity_kind: value
277 .get("kind")
278 .or_else(|| value.get("entity_kind"))
279 .and_then(JsonValue::as_str)
280 .unwrap_or("entity")
281 .to_string(),
282 entity_bytes,
283 metadata: value.get("metadata").cloned(),
284 refresh_records: match value.get("refresh_records_hex") {
285 Some(JsonValue::Array(items)) => {
286 let mut out = Vec::with_capacity(items.len());
287 for item in items {
288 let hex_str = item
289 .as_str()
290 .ok_or_else(|| "refresh_records_hex entry not a string".to_string())?;
291 let bytes = hex::decode(hex_str).map_err(|err| err.to_string())?;
292 out.push(bytes);
293 }
294 Some(out)
295 }
296 None | Some(JsonValue::Null) => None,
297 _ => return Err("refresh_records_hex is not an array".to_string()),
298 },
299 })
300 }
301}
302
/// Maps an internal entity kind to the public item-kind label.
///
/// Known aliases map directly:
/// - `table`/`entity`/`row` → `row`
/// - `graph_node`/`node` → `node`
/// - `graph_edge`/`edge` → `edge`
///
/// Otherwise the first of `kv`, `document`, `vector` that occurs as a
/// substring wins. Anything else is `item`.
pub fn public_item_kind(entity_kind: &str) -> &'static str {
    let alias = match entity_kind {
        "table" | "entity" | "row" => Some("row"),
        "graph_node" | "node" => Some("node"),
        "graph_edge" | "edge" => Some("edge"),
        _ => None,
    };
    if let Some(label) = alias {
        return label;
    }

    // Substring families in priority order. This also covers the exact names
    // "kv", "document" and "vector".
    ["kv", "document", "vector"]
        .iter()
        .find(|family| entity_kind.contains(**family))
        .copied()
        .unwrap_or("item")
}
317
318pub struct CdcBuffer {
339 next_lsn: AtomicU64,
340 events: parking_lot::Mutex<VecDeque<ChangeEvent>>,
341 max_size: usize,
342}
343
344impl CdcBuffer {
345 pub fn new(max_size: usize) -> Self {
347 Self {
348 next_lsn: AtomicU64::new(0),
349 events: parking_lot::Mutex::new(VecDeque::with_capacity(max_size.min(10_000))),
350 max_size,
351 }
352 }
353
354 pub fn emit(
359 &self,
360 operation: ChangeOperation,
361 collection: &str,
362 entity_id: u64,
363 entity_kind: &str,
364 ) -> u64 {
365 self.emit_with_columns(operation, collection, entity_id, entity_kind, None)
366 }
367
368 pub fn emit_with_columns(
373 &self,
374 operation: ChangeOperation,
375 collection: &str,
376 entity_id: u64,
377 entity_kind: &str,
378 changed_columns: Option<Vec<String>>,
379 ) -> u64 {
380 let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
383
384 let event = ChangeEvent {
385 lsn: event_lsn,
386 timestamp: SystemTime::now()
387 .duration_since(UNIX_EPOCH)
388 .unwrap_or_default()
389 .as_millis() as u64,
390 operation,
391 collection: collection.to_string(),
392 entity_id,
393 entity_kind: entity_kind.to_string(),
394 changed_columns,
395 kv: None,
396 };
397
398 let mut events = self.events.lock();
402 if events.len() >= self.max_size {
403 events.pop_front();
404 }
405 events.push_back(event);
406
407 event_lsn
408 }
409
410 pub fn emit_batch_same_collection<I>(
414 &self,
415 operation: ChangeOperation,
416 collection: &str,
417 entity_kind: &str,
418 entity_ids: I,
419 ) -> Vec<u64>
420 where
421 I: IntoIterator<Item = u64>,
422 I::IntoIter: ExactSizeIterator,
423 {
424 let iter = entity_ids.into_iter();
425 let count = iter.len();
426 if count == 0 {
427 return Vec::new();
428 }
429
430 let first_lsn = self.next_lsn.fetch_add(count as u64, Ordering::AcqRel) + 1;
431 let lsns = (0..count)
432 .map(|idx| first_lsn + idx as u64)
433 .collect::<Vec<_>>();
434 if self.max_size == 0 {
435 return lsns;
436 }
437
438 let timestamp = SystemTime::now()
439 .duration_since(UNIX_EPOCH)
440 .unwrap_or_default()
441 .as_millis() as u64;
442 let collection = collection.to_string();
443 let entity_kind = entity_kind.to_string();
444
445 let skip = count.saturating_sub(self.max_size);
446 let kept = count - skip;
447 let mut events = self.events.lock();
448 let overflow = events
449 .len()
450 .saturating_add(kept)
451 .saturating_sub(self.max_size);
452 for _ in 0..overflow {
453 events.pop_front();
454 }
455
456 for (idx, entity_id) in iter.enumerate().skip(skip) {
457 events.push_back(ChangeEvent {
458 lsn: first_lsn + idx as u64,
459 timestamp,
460 operation,
461 collection: collection.clone(),
462 entity_id,
463 entity_kind: entity_kind.clone(),
464 changed_columns: None,
465 kv: None,
466 });
467 }
468 lsns
469 }
470
471 pub fn emit_kv(
474 &self,
475 operation: ChangeOperation,
476 collection: &str,
477 key: &str,
478 entity_id: u64,
479 before: Option<JsonValue>,
480 after: Option<JsonValue>,
481 ) -> u64 {
482 let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
483 let timestamp = SystemTime::now()
484 .duration_since(UNIX_EPOCH)
485 .unwrap_or_default()
486 .as_millis() as u64;
487 let kv = KvWatchEvent {
488 collection: collection.to_string(),
489 key: key.to_string(),
490 op: operation,
491 before,
492 after,
493 lsn: event_lsn,
494 committed_at: timestamp,
495 dropped_event_count: 0,
496 };
497 let event = ChangeEvent {
498 lsn: event_lsn,
499 timestamp,
500 operation,
501 collection: collection.to_string(),
502 entity_id,
503 entity_kind: "kv".to_string(),
504 changed_columns: Some(vec!["value".to_string()]),
505 kv: Some(kv),
506 };
507
508 let mut events = self.events.lock();
509 if events.len() >= self.max_size {
510 events.pop_front();
511 }
512 events.push_back(event);
513 event_lsn
514 }
515
516 pub fn poll(&self, since_lsn: u64, max_count: usize) -> Vec<ChangeEvent> {
518 let events = self.events.lock();
519 events
520 .iter()
521 .filter(|e| e.lsn > since_lsn)
522 .take(max_count)
523 .cloned()
524 .collect()
525 }
526
527 pub fn current_lsn(&self) -> u64 {
529 self.next_lsn.load(Ordering::Acquire)
530 }
531
532 pub fn set_current_lsn(&self, lsn: u64) {
536 let mut current = self.next_lsn.load(Ordering::Acquire);
537 while lsn > current {
538 match self
539 .next_lsn
540 .compare_exchange(current, lsn, Ordering::AcqRel, Ordering::Acquire)
541 {
542 Ok(_) => break,
543 Err(observed) => current = observed,
544 }
545 }
546 }
547
548 pub fn oldest_lsn(&self) -> Option<u64> {
550 self.events.lock().front().map(|e| e.lsn)
551 }
552
553 pub fn stats(&self) -> CdcStats {
555 let events = self.events.lock();
556 CdcStats {
557 buffered_events: events.len(),
558 current_lsn: self.next_lsn.load(Ordering::Acquire),
559 oldest_lsn: events.front().map(|e| e.lsn),
560 newest_lsn: events.back().map(|e| e.lsn),
561 }
562 }
563}
564
/// Point-in-time summary of a [`CdcBuffer`], as returned by `CdcBuffer::stats`.
#[derive(Clone, Debug)]
pub struct CdcStats {
    /// Number of events currently retained.
    pub buffered_events: usize,
    /// Most recently allocated LSN.
    pub current_lsn: u64,
    /// LSN of the oldest retained event, if any.
    pub oldest_lsn: Option<u64>,
    /// LSN of the newest retained event, if any.
    pub newest_lsn: Option<u64>,
}
573
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_emit_and_poll() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "users", 1, "table");
        buf.emit(ChangeOperation::Update, "users", 1, "table");
        buf.emit(ChangeOperation::Delete, "users", 1, "table");

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].operation, ChangeOperation::Insert);
        assert_eq!(events[1].operation, ChangeOperation::Update);
        assert_eq!(events[2].operation, ChangeOperation::Delete);
        // Plain `emit` never attaches a column list.
        assert!(events[0].changed_columns.is_none());
        assert!(events[1].changed_columns.is_none());
    }

    #[test]
    fn test_emit_with_columns_propagates_damage_vector() {
        let buf = CdcBuffer::new(100);
        buf.emit_with_columns(
            ChangeOperation::Update,
            "users",
            7,
            "table",
            Some(vec!["email".to_string(), "age".to_string()]),
        );
        buf.emit(ChangeOperation::Update, "users", 8, "table");

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 2);
        assert_eq!(
            events[0].changed_columns.as_deref(),
            Some(vec!["email".to_string(), "age".to_string()].as_slice())
        );
        assert!(events[1].changed_columns.is_none());
    }

    #[test]
    fn test_poll_with_cursor() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "a", 1, "table");
        buf.emit(ChangeOperation::Insert, "b", 2, "table");
        buf.emit(ChangeOperation::Insert, "c", 3, "table");

        // Cursor 1 skips the first event (LSN 1).
        let events = buf.poll(1, 10);
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].collection, "b");
        assert_eq!(events[1].collection, "c");
    }

    #[test]
    fn test_poll_respects_max_count() {
        let buf = CdcBuffer::new(100);
        for id in 1..=5 {
            buf.emit(ChangeOperation::Insert, "t", id, "table");
        }

        let events = buf.poll(0, 2);
        assert_eq!(
            events.iter().map(|event| event.lsn).collect::<Vec<_>>(),
            vec![1, 2]
        );
        assert_eq!(buf.poll(3, 10).len(), 2);
        assert!(buf.poll(5, 10).is_empty());
    }

    #[test]
    fn test_emit_batch_same_collection_assigns_contiguous_lsns() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "users", 10, "table");
        let lsns =
            buf.emit_batch_same_collection(ChangeOperation::Insert, "users", "table", [11, 12, 13]);
        assert_eq!(lsns, vec![2, 3, 4]);

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 4);
        assert_eq!(events[1].lsn, 2);
        assert_eq!(events[2].lsn, 3);
        assert_eq!(events[3].lsn, 4);
        assert_eq!(events[3].entity_id, 13);
        assert_eq!(buf.current_lsn(), 4);
    }

    #[test]
    fn test_emit_batch_same_collection_respects_ring_size() {
        let buf = CdcBuffer::new(3);
        buf.emit_batch_same_collection(ChangeOperation::Insert, "users", "table", [1, 2, 3, 4, 5]);

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 3);
        assert_eq!(
            events
                .iter()
                .map(|event| event.entity_id)
                .collect::<Vec<_>>(),
            vec![3, 4, 5]
        );
        assert_eq!(events[0].lsn, 3);
        assert_eq!(events[2].lsn, 5);
        assert_eq!(buf.current_lsn(), 5);
    }

    #[test]
    fn test_circular_eviction() {
        let buf = CdcBuffer::new(3);
        buf.emit(ChangeOperation::Insert, "a", 1, "table");
        buf.emit(ChangeOperation::Insert, "b", 2, "table");
        buf.emit(ChangeOperation::Insert, "c", 3, "table");
        buf.emit(ChangeOperation::Insert, "d", 4, "table");

        // The fourth emit evicts the oldest event ("a", LSN 1).
        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].collection, "b");
        assert_eq!(buf.oldest_lsn(), Some(2));
    }

    #[test]
    fn test_stats() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "x", 1, "table");
        buf.emit(ChangeOperation::Insert, "y", 2, "table");

        let stats = buf.stats();
        assert_eq!(stats.buffered_events, 2);
        assert_eq!(stats.current_lsn, 2);
        assert_eq!(stats.oldest_lsn, Some(1));
        assert_eq!(stats.newest_lsn, Some(2));
    }

    #[test]
    fn test_set_current_lsn_only_moves_forward() {
        let buf = CdcBuffer::new(10);
        buf.set_current_lsn(10);
        assert_eq!(buf.current_lsn(), 10);
        buf.set_current_lsn(3);
        assert_eq!(buf.current_lsn(), 10);
        assert_eq!(buf.emit(ChangeOperation::Insert, "a", 1, "table"), 11);
    }

    #[test]
    fn test_emit_kv_attaches_watch_payload() {
        let buf = CdcBuffer::new(10);
        buf.emit(ChangeOperation::Insert, "users", 1, "table");
        let lsn = buf.emit_kv(
            ChangeOperation::Update,
            "cache",
            "session:1",
            9,
            None,
            Some(JsonValue::String("active".to_string())),
        );
        assert_eq!(lsn, 2);

        let events = buf.poll(1, 10);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].lsn, lsn);
        assert_eq!(events[0].kind(), "kv");
        assert_eq!(events[0].rid(), 9);
        assert_eq!(
            events[0].changed_columns.as_deref(),
            Some(vec!["value".to_string()].as_slice())
        );
        let kv = events[0].kv.as_ref().expect("kv payload");
        assert_eq!(kv.lsn, lsn);
        assert_eq!(kv.key, "session:1");
        assert_eq!(kv.committed_at, events[0].timestamp);
        assert_eq!(kv.dropped_event_count, 0);

        let json = kv.to_json_value();
        assert_eq!(json.get("key").and_then(JsonValue::as_str), Some("session:1"));
        assert_eq!(json.get("op").and_then(JsonValue::as_str), Some("update"));
        assert_eq!(json.get("lsn").and_then(JsonValue::as_u64), Some(lsn));
        assert_eq!(json.get("before"), Some(&JsonValue::Null));
    }

    #[test]
    fn test_change_operation_string_roundtrip() {
        for op in [
            ChangeOperation::Insert,
            ChangeOperation::Update,
            ChangeOperation::Delete,
            ChangeOperation::Refresh,
        ] {
            assert_eq!(ChangeOperation::from_str(op.as_str()), Some(op));
        }
        assert_eq!(ChangeOperation::from_str("upsert"), None);
        assert_eq!(ChangeOperation::from_str("INSERT"), None);
    }

    #[test]
    fn test_public_item_kind_mapping() {
        assert_eq!(public_item_kind("table"), "row");
        assert_eq!(public_item_kind("graph_node"), "node");
        assert_eq!(public_item_kind("edge"), "edge");
        assert_eq!(public_item_kind("kv_bucket"), "kv");
        assert_eq!(public_item_kind("json_document"), "document");
        assert_eq!(public_item_kind("refresh"), "item");
    }

    #[test]
    fn test_change_record_roundtrip() {
        let record = ChangeRecord {
            lsn: 7,
            timestamp: 1234,
            operation: ChangeOperation::Update,
            collection: "users".to_string(),
            entity_id: 42,
            entity_kind: "row".to_string(),
            entity_bytes: Some(vec![1, 2, 3]),
            metadata: Some(crate::json!({"role": "admin"})),
            refresh_records: None,
        };

        let decoded = ChangeRecord::decode(&record.encode()).expect("decode");
        assert_eq!(decoded.lsn, record.lsn);
        assert_eq!(decoded.timestamp, record.timestamp);
        assert_eq!(decoded.operation, record.operation);
        assert_eq!(decoded.collection, record.collection);
        assert_eq!(decoded.entity_id, record.entity_id);
        assert_eq!(decoded.entity_bytes, record.entity_bytes);
    }

    #[test]
    fn test_change_record_refresh_roundtrip() {
        let records = vec![vec![0x10, 0x20, 0x30], vec![0xAA, 0xBB], Vec::new()];
        let record = ChangeRecord::for_refresh(11, 99, "mv_orders_summary", records.clone());

        let decoded = ChangeRecord::decode(&record.encode()).expect("decode");
        assert_eq!(decoded.operation, ChangeOperation::Refresh);
        assert_eq!(decoded.collection, "mv_orders_summary");
        assert_eq!(decoded.refresh_records.as_deref(), Some(&records[..]));
    }
}