1use std::collections::VecDeque;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use crate::json::{Map, Value as JsonValue};
11
/// Kind of mutation captured by a change event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeOperation {
    Insert,
    Update,
    Delete,
}

impl ChangeOperation {
    /// Parses the lowercase wire name produced by [`ChangeOperation::as_str`].
    ///
    /// Matching is exact and case-sensitive; any other input yields `None`.
    pub fn from_str(value: &str) -> Option<Self> {
        [Self::Insert, Self::Update, Self::Delete]
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == value)
    }

    /// Lowercase wire name of the operation: `"insert"`, `"update"` or `"delete"`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            ChangeOperation::Insert => "insert",
            ChangeOperation::Update => "update",
            ChangeOperation::Delete => "delete",
        }
    }
}
38
/// A single change captured by [`CdcBuffer`].
#[derive(Debug, Clone)]
pub struct ChangeEvent {
    /// Log sequence number; `CdcBuffer` assigns these from its LSN counter.
    pub lsn: u64,
    /// Emission time in milliseconds since the Unix epoch, as set by `CdcBuffer`.
    pub timestamp: u64,
    /// Kind of mutation.
    pub operation: ChangeOperation,
    /// Collection the changed entity belongs to.
    pub collection: String,
    /// Raw id of the changed entity (also exposed as [`ChangeEvent::rid`]).
    pub entity_id: u64,
    /// Internal entity kind; [`ChangeEvent::kind`] maps it to a public kind.
    pub entity_kind: String,
    /// Columns touched by the change when the emitter supplied them, otherwise `None`.
    pub changed_columns: Option<Vec<String>>,
    /// KV watch payload. `CdcBuffer::emit_kv` sets it; the other emit paths in
    /// this module leave it `None`.
    pub kv: Option<KvWatchEvent>,
}
65
66impl ChangeEvent {
67 pub fn rid(&self) -> u64 {
68 self.entity_id
69 }
70
71 pub fn kind(&self) -> &'static str {
72 public_item_kind(&self.entity_kind)
73 }
74}
75
/// Watch notification for a key/value change. `CdcBuffer::emit_kv` attaches it
/// to the corresponding [`ChangeEvent`].
#[derive(Debug, Clone, PartialEq)]
pub struct KvWatchEvent {
    /// Collection containing the key.
    pub collection: String,
    /// Key that changed.
    pub key: String,
    /// Kind of mutation applied to the key.
    pub op: ChangeOperation,
    /// Value before the change, if the emitter supplied one.
    pub before: Option<JsonValue>,
    /// Value after the change, if the emitter supplied one.
    pub after: Option<JsonValue>,
    /// LSN of the enclosing change event.
    pub lsn: u64,
    /// Commit time in milliseconds since the Unix epoch. `emit_kv` uses the same
    /// value as the event's `timestamp`.
    pub committed_at: u64,
    /// NOTE(review): presumably the number of events a watcher missed; confirm
    /// with consumers. `CdcBuffer::emit_kv` always sets it to `0`.
    pub dropped_event_count: u64,
}
88
89impl KvWatchEvent {
90 pub fn to_json_value(&self) -> JsonValue {
91 let mut object = Map::new();
92 object.insert("key".to_string(), JsonValue::String(self.key.clone()));
93 object.insert(
94 "op".to_string(),
95 JsonValue::String(self.op.as_str().to_string()),
96 );
97 object.insert(
98 "before".to_string(),
99 self.before.clone().unwrap_or(JsonValue::Null),
100 );
101 object.insert(
102 "after".to_string(),
103 self.after.clone().unwrap_or(JsonValue::Null),
104 );
105 object.insert("lsn".to_string(), JsonValue::Number(self.lsn as f64));
106 object.insert(
107 "committed_at".to_string(),
108 JsonValue::Number(self.committed_at as f64),
109 );
110 object.insert(
111 "dropped_event_count".to_string(),
112 JsonValue::Number(self.dropped_event_count as f64),
113 );
114 JsonValue::Object(object)
115 }
116}
117
/// Serializable change record. [`ChangeRecord::encode`] and
/// [`ChangeRecord::decode`] convert it to and from JSON bytes.
#[derive(Debug, Clone)]
pub struct ChangeRecord {
    /// Log sequence number of the change.
    pub lsn: u64,
    /// Supplied by the caller. NOTE(review): assumed to be Unix-epoch
    /// milliseconds like `ChangeEvent::timestamp`; confirm.
    pub timestamp: u64,
    /// Kind of mutation.
    pub operation: ChangeOperation,
    /// Collection the changed entity belongs to.
    pub collection: String,
    /// Raw id of the changed entity; serialized under the `rid` key.
    pub entity_id: u64,
    /// Internal entity kind.
    ///
    /// Serialized as its public kind (`public_item_kind`) under the `kind` key,
    /// so the decoded value may differ from the original.
    pub entity_kind: String,
    /// Entity serialized by `UnifiedStore::serialize_entity`.
    ///
    /// `from_entity` fills it for inserts and updates and leaves it `None` for
    /// deletes. It is hex-encoded when serialized.
    pub entity_bytes: Option<Vec<u8>>,
    /// Optional caller-supplied JSON metadata, carried through as-is.
    pub metadata: Option<JsonValue>,
}
131
132impl ChangeRecord {
133 pub fn from_entity(
134 lsn: u64,
135 timestamp: u64,
136 operation: ChangeOperation,
137 collection: impl Into<String>,
138 entity_kind: impl Into<String>,
139 entity: &crate::storage::UnifiedEntity,
140 format_version: u32,
141 metadata: Option<JsonValue>,
142 ) -> Self {
143 let entity_bytes = match operation {
144 ChangeOperation::Delete => None,
145 ChangeOperation::Insert | ChangeOperation::Update => Some(
146 crate::storage::UnifiedStore::serialize_entity(entity, format_version),
147 ),
148 };
149
150 Self {
151 lsn,
152 timestamp,
153 operation,
154 collection: collection.into(),
155 entity_id: entity.id.raw(),
156 entity_kind: entity_kind.into(),
157 entity_bytes,
158 metadata,
159 }
160 }
161
162 pub fn to_json_value(&self) -> JsonValue {
163 let mut object = Map::new();
164 object.insert("lsn".to_string(), JsonValue::Number(self.lsn as f64));
165 object.insert(
166 "timestamp".to_string(),
167 JsonValue::Number(self.timestamp as f64),
168 );
169 object.insert(
170 "operation".to_string(),
171 JsonValue::String(self.operation.as_str().to_string()),
172 );
173 object.insert(
174 "collection".to_string(),
175 JsonValue::String(self.collection.clone()),
176 );
177 object.insert("rid".to_string(), JsonValue::Number(self.entity_id as f64));
178 object.insert(
179 "kind".to_string(),
180 JsonValue::String(public_item_kind(&self.entity_kind).to_string()),
181 );
182 if let Some(bytes) = &self.entity_bytes {
183 object.insert(
184 "entity_bytes_hex".to_string(),
185 JsonValue::String(hex::encode(bytes)),
186 );
187 }
188 if let Some(metadata) = &self.metadata {
189 object.insert("metadata".to_string(), metadata.clone());
190 }
191 JsonValue::Object(object)
192 }
193
194 pub fn encode(&self) -> Vec<u8> {
195 crate::json::to_string(&self.to_json_value())
196 .unwrap_or_else(|_| "{}".to_string())
197 .into_bytes()
198 }
199
200 pub fn decode(bytes: &[u8]) -> Result<Self, String> {
201 let text = std::str::from_utf8(bytes).map_err(|err| err.to_string())?;
202 let value = crate::json::from_str::<JsonValue>(text).map_err(|err| err.to_string())?;
203 let operation = value
204 .get("operation")
205 .and_then(JsonValue::as_str)
206 .and_then(ChangeOperation::from_str)
207 .ok_or_else(|| "invalid replication operation".to_string())?;
208 let entity_bytes = value
209 .get("entity_bytes_hex")
210 .and_then(JsonValue::as_str)
211 .map(hex::decode)
212 .transpose()
213 .map_err(|err| err.to_string())?;
214
215 Ok(Self {
216 lsn: value.get("lsn").and_then(JsonValue::as_u64).unwrap_or(0),
217 timestamp: value
218 .get("timestamp")
219 .and_then(JsonValue::as_u64)
220 .unwrap_or(0),
221 operation,
222 collection: value
223 .get("collection")
224 .and_then(JsonValue::as_str)
225 .unwrap_or_default()
226 .to_string(),
227 entity_id: value
228 .get("rid")
229 .or_else(|| value.get("entity_id"))
230 .and_then(JsonValue::as_u64)
231 .unwrap_or(0),
232 entity_kind: value
233 .get("kind")
234 .or_else(|| value.get("entity_kind"))
235 .and_then(JsonValue::as_str)
236 .unwrap_or("entity")
237 .to_string(),
238 entity_bytes,
239 metadata: value.get("metadata").cloned(),
240 })
241 }
242}
243
/// Maps an internal entity kind to the public kind exposed to clients.
///
/// 1. Row, node and edge aliases are recognised by exact name.
/// 2. Otherwise the name is scanned for the markers `kv`, `document` and
///    `vector`, in that priority order. For example, `"kv_document"` maps to
///    `"kv"`.
/// 3. Anything else maps to `"item"`.
pub fn public_item_kind(entity_kind: &str) -> &'static str {
    match entity_kind {
        "table" | "entity" | "row" => return "row",
        "graph_node" | "node" => return "node",
        "graph_edge" | "edge" => return "edge",
        _ => {}
    }

    // Exact "kv"/"document"/"vector" are covered by the substring scan too.
    ["kv", "document", "vector"]
        .iter()
        .copied()
        .find(|marker| entity_kind.contains(*marker))
        .unwrap_or("item")
}
258
/// Bounded, in-memory ring buffer of [`ChangeEvent`]s with an LSN counter
/// that only moves forward.
pub struct CdcBuffer {
    /// Despite the name, this holds the most recently issued LSN (`0` before
    /// any event). The next event receives this value plus one.
    next_lsn: AtomicU64,
    /// Retained events, oldest at the front.
    events: parking_lot::Mutex<VecDeque<ChangeEvent>>,
    /// Maximum number of retained events; the oldest are evicted from the front.
    max_size: usize,
}
284
285impl CdcBuffer {
286 pub fn new(max_size: usize) -> Self {
288 Self {
289 next_lsn: AtomicU64::new(0),
290 events: parking_lot::Mutex::new(VecDeque::with_capacity(max_size.min(10_000))),
291 max_size,
292 }
293 }
294
295 pub fn emit(
300 &self,
301 operation: ChangeOperation,
302 collection: &str,
303 entity_id: u64,
304 entity_kind: &str,
305 ) -> u64 {
306 self.emit_with_columns(operation, collection, entity_id, entity_kind, None)
307 }
308
309 pub fn emit_with_columns(
314 &self,
315 operation: ChangeOperation,
316 collection: &str,
317 entity_id: u64,
318 entity_kind: &str,
319 changed_columns: Option<Vec<String>>,
320 ) -> u64 {
321 let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
324
325 let event = ChangeEvent {
326 lsn: event_lsn,
327 timestamp: SystemTime::now()
328 .duration_since(UNIX_EPOCH)
329 .unwrap_or_default()
330 .as_millis() as u64,
331 operation,
332 collection: collection.to_string(),
333 entity_id,
334 entity_kind: entity_kind.to_string(),
335 changed_columns,
336 kv: None,
337 };
338
339 let mut events = self.events.lock();
343 if events.len() >= self.max_size {
344 events.pop_front();
345 }
346 events.push_back(event);
347
348 event_lsn
349 }
350
351 pub fn emit_batch_same_collection<I>(
355 &self,
356 operation: ChangeOperation,
357 collection: &str,
358 entity_kind: &str,
359 entity_ids: I,
360 ) -> Vec<u64>
361 where
362 I: IntoIterator<Item = u64>,
363 I::IntoIter: ExactSizeIterator,
364 {
365 let iter = entity_ids.into_iter();
366 let count = iter.len();
367 if count == 0 {
368 return Vec::new();
369 }
370
371 let first_lsn = self.next_lsn.fetch_add(count as u64, Ordering::AcqRel) + 1;
372 let lsns = (0..count)
373 .map(|idx| first_lsn + idx as u64)
374 .collect::<Vec<_>>();
375 if self.max_size == 0 {
376 return lsns;
377 }
378
379 let timestamp = SystemTime::now()
380 .duration_since(UNIX_EPOCH)
381 .unwrap_or_default()
382 .as_millis() as u64;
383 let collection = collection.to_string();
384 let entity_kind = entity_kind.to_string();
385
386 let skip = count.saturating_sub(self.max_size);
387 let kept = count - skip;
388 let mut events = self.events.lock();
389 let overflow = events
390 .len()
391 .saturating_add(kept)
392 .saturating_sub(self.max_size);
393 for _ in 0..overflow {
394 events.pop_front();
395 }
396
397 for (idx, entity_id) in iter.enumerate().skip(skip) {
398 events.push_back(ChangeEvent {
399 lsn: first_lsn + idx as u64,
400 timestamp,
401 operation,
402 collection: collection.clone(),
403 entity_id,
404 entity_kind: entity_kind.clone(),
405 changed_columns: None,
406 kv: None,
407 });
408 }
409 lsns
410 }
411
412 pub fn emit_kv(
415 &self,
416 operation: ChangeOperation,
417 collection: &str,
418 key: &str,
419 entity_id: u64,
420 before: Option<JsonValue>,
421 after: Option<JsonValue>,
422 ) -> u64 {
423 let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
424 let timestamp = SystemTime::now()
425 .duration_since(UNIX_EPOCH)
426 .unwrap_or_default()
427 .as_millis() as u64;
428 let kv = KvWatchEvent {
429 collection: collection.to_string(),
430 key: key.to_string(),
431 op: operation,
432 before,
433 after,
434 lsn: event_lsn,
435 committed_at: timestamp,
436 dropped_event_count: 0,
437 };
438 let event = ChangeEvent {
439 lsn: event_lsn,
440 timestamp,
441 operation,
442 collection: collection.to_string(),
443 entity_id,
444 entity_kind: "kv".to_string(),
445 changed_columns: Some(vec!["value".to_string()]),
446 kv: Some(kv),
447 };
448
449 let mut events = self.events.lock();
450 if events.len() >= self.max_size {
451 events.pop_front();
452 }
453 events.push_back(event);
454 event_lsn
455 }
456
457 pub fn poll(&self, since_lsn: u64, max_count: usize) -> Vec<ChangeEvent> {
459 let events = self.events.lock();
460 events
461 .iter()
462 .filter(|e| e.lsn > since_lsn)
463 .take(max_count)
464 .cloned()
465 .collect()
466 }
467
468 pub fn current_lsn(&self) -> u64 {
470 self.next_lsn.load(Ordering::Acquire)
471 }
472
473 pub fn set_current_lsn(&self, lsn: u64) {
477 let mut current = self.next_lsn.load(Ordering::Acquire);
478 while lsn > current {
479 match self
480 .next_lsn
481 .compare_exchange(current, lsn, Ordering::AcqRel, Ordering::Acquire)
482 {
483 Ok(_) => break,
484 Err(observed) => current = observed,
485 }
486 }
487 }
488
489 pub fn oldest_lsn(&self) -> Option<u64> {
491 self.events.lock().front().map(|e| e.lsn)
492 }
493
494 pub fn stats(&self) -> CdcStats {
496 let events = self.events.lock();
497 CdcStats {
498 buffered_events: events.len(),
499 current_lsn: self.next_lsn.load(Ordering::Acquire),
500 oldest_lsn: events.front().map(|e| e.lsn),
501 newest_lsn: events.back().map(|e| e.lsn),
502 }
503 }
504}
505
/// Point-in-time snapshot returned by `CdcBuffer::stats`.
#[derive(Debug, Clone)]
pub struct CdcStats {
    /// Number of events currently retained.
    pub buffered_events: usize,
    /// Most recently issued LSN (initially `0`).
    pub current_lsn: u64,
    /// LSN of the front (oldest retained) event, if any.
    pub oldest_lsn: Option<u64>,
    /// LSN of the back (newest retained) event, if any.
    pub newest_lsn: Option<u64>,
}
514
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_emit_and_poll() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "users", 1, "table");
        buf.emit(ChangeOperation::Update, "users", 1, "table");
        buf.emit(ChangeOperation::Delete, "users", 1, "table");

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].operation, ChangeOperation::Insert);
        assert_eq!(events[1].operation, ChangeOperation::Update);
        assert_eq!(events[2].operation, ChangeOperation::Delete);
        assert!(events[0].changed_columns.is_none());
        assert!(events[1].changed_columns.is_none());
    }

    #[test]
    fn test_emit_with_columns_propagates_damage_vector() {
        let buf = CdcBuffer::new(100);
        buf.emit_with_columns(
            ChangeOperation::Update,
            "users",
            7,
            "table",
            Some(vec!["email".to_string(), "age".to_string()]),
        );
        buf.emit(ChangeOperation::Update, "users", 8, "table");

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 2);
        assert_eq!(
            events[0].changed_columns.as_deref(),
            Some(vec!["email".to_string(), "age".to_string()].as_slice())
        );
        assert!(events[1].changed_columns.is_none());
    }

    #[test]
    fn test_poll_with_cursor() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "a", 1, "table");
        buf.emit(ChangeOperation::Insert, "b", 2, "table");
        buf.emit(ChangeOperation::Insert, "c", 3, "table");

        let events = buf.poll(1, 10);
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].collection, "b");
        assert_eq!(events[1].collection, "c");
    }

    #[test]
    fn test_emit_batch_same_collection_assigns_contiguous_lsns() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "users", 10, "table");
        let lsns = buf.emit_batch_same_collection(
            ChangeOperation::Insert,
            "users",
            "table",
            [11, 12, 13],
        );
        assert_eq!(lsns, vec![2, 3, 4]);

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 4);
        assert_eq!(events[1].lsn, 2);
        assert_eq!(events[2].lsn, 3);
        assert_eq!(events[3].lsn, 4);
        assert_eq!(events[3].entity_id, 13);
        assert_eq!(buf.current_lsn(), 4);
    }

    #[test]
    fn test_emit_batch_same_collection_respects_ring_size() {
        let buf = CdcBuffer::new(3);
        buf.emit_batch_same_collection(ChangeOperation::Insert, "users", "table", [1, 2, 3, 4, 5]);

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 3);
        assert_eq!(
            events
                .iter()
                .map(|event| event.entity_id)
                .collect::<Vec<_>>(),
            vec![3, 4, 5]
        );
        assert_eq!(events[0].lsn, 3);
        assert_eq!(events[2].lsn, 5);
        assert_eq!(buf.current_lsn(), 5);
    }

    #[test]
    fn test_emit_batch_empty_is_noop() {
        let buf = CdcBuffer::new(10);
        let lsns = buf.emit_batch_same_collection(
            ChangeOperation::Insert,
            "users",
            "table",
            Vec::<u64>::new(),
        );
        assert!(lsns.is_empty());
        assert_eq!(buf.current_lsn(), 0);
        assert_eq!(buf.stats().buffered_events, 0);
    }

    #[test]
    fn test_circular_eviction() {
        let buf = CdcBuffer::new(3);
        buf.emit(ChangeOperation::Insert, "a", 1, "table");
        buf.emit(ChangeOperation::Insert, "b", 2, "table");
        buf.emit(ChangeOperation::Insert, "c", 3, "table");
        buf.emit(ChangeOperation::Insert, "d", 4, "table");

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].collection, "b");
        assert_eq!(events[2].collection, "d");
        assert_eq!(buf.oldest_lsn(), Some(2));
    }

    #[test]
    fn test_emit_kv_records_watch_payload() {
        let buf = CdcBuffer::new(10);
        buf.emit(ChangeOperation::Insert, "users", 1, "table");
        let lsn = buf.emit_kv(ChangeOperation::Update, "settings", "theme", 9, None, None);
        assert_eq!(lsn, 2);

        let events = buf.poll(1, 10);
        assert_eq!(events.len(), 1);
        let event = &events[0];
        assert_eq!(event.lsn, 2);
        assert_eq!(event.entity_id, 9);
        assert_eq!(event.entity_kind, "kv");
        assert_eq!(event.kind(), "kv");
        assert_eq!(
            event.changed_columns.as_deref(),
            Some(["value".to_string()].as_slice())
        );

        let kv = event.kv.as_ref().expect("emit_kv attaches a KV payload");
        assert_eq!(kv.collection, "settings");
        assert_eq!(kv.key, "theme");
        assert_eq!(kv.op, ChangeOperation::Update);
        assert_eq!(kv.lsn, 2);
        assert_eq!(kv.committed_at, event.timestamp);
        assert_eq!(kv.dropped_event_count, 0);
    }

    #[test]
    fn test_set_current_lsn_only_moves_forward() {
        let buf = CdcBuffer::new(10);
        buf.set_current_lsn(10);
        assert_eq!(buf.current_lsn(), 10);
        buf.set_current_lsn(5);
        assert_eq!(buf.current_lsn(), 10);

        assert_eq!(buf.emit(ChangeOperation::Insert, "users", 1, "table"), 11);
        assert_eq!(buf.oldest_lsn(), Some(11));
    }

    #[test]
    fn test_stats() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "x", 1, "table");
        buf.emit(ChangeOperation::Insert, "y", 2, "table");

        let stats = buf.stats();
        assert_eq!(stats.buffered_events, 2);
        assert_eq!(stats.current_lsn, 2);
        assert_eq!(stats.oldest_lsn, Some(1));
        assert_eq!(stats.newest_lsn, Some(2));
    }

    #[test]
    fn test_change_record_roundtrip() {
        let record = ChangeRecord {
            lsn: 7,
            timestamp: 1234,
            operation: ChangeOperation::Update,
            collection: "users".to_string(),
            entity_id: 42,
            entity_kind: "row".to_string(),
            entity_bytes: Some(vec![1, 2, 3]),
            metadata: Some(crate::json!({"role": "admin"})),
        };

        let decoded = ChangeRecord::decode(&record.encode()).expect("decode");
        assert_eq!(decoded.lsn, record.lsn);
        assert_eq!(decoded.timestamp, record.timestamp);
        assert_eq!(decoded.operation, record.operation);
        assert_eq!(decoded.collection, record.collection);
        assert_eq!(decoded.entity_id, record.entity_id);
        assert_eq!(decoded.entity_kind, record.entity_kind);
        assert_eq!(decoded.entity_bytes, record.entity_bytes);
        assert!(decoded.metadata.is_some());
    }
}