1use std::collections::VecDeque;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use crate::json::{Map, Value as JsonValue};
11
/// Kind of mutation described by a change event or replication record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeOperation {
    /// A new entity was created.
    Insert,
    /// An existing entity was modified.
    Update,
    /// An entity was removed.
    Delete,
}

impl ChangeOperation {
    /// Every variant, used to derive the parser from [`ChangeOperation::as_str`].
    const ALL: [Self; 3] = [Self::Insert, Self::Update, Self::Delete];

    /// Parses the lowercase wire name (`"insert"`, `"update"`, `"delete"`).
    ///
    /// Matching is exact and case-sensitive; any other input yields `None`.
    pub fn from_str(value: &str) -> Option<Self> {
        Self::ALL
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == value)
    }

    /// Returns the lowercase wire name of this operation.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Insert => "insert",
            Self::Update => "update",
            Self::Delete => "delete",
        }
    }
}
38
39#[derive(Debug, Clone)]
41pub struct ChangeEvent {
42 pub lsn: u64,
44 pub timestamp: u64,
46 pub operation: ChangeOperation,
48 pub collection: String,
50 pub entity_id: u64,
52 pub entity_kind: String,
54 pub changed_columns: Option<Vec<String>>,
61 pub kv: Option<KvWatchEvent>,
64}
65
66#[derive(Debug, Clone, PartialEq)]
68pub struct KvWatchEvent {
69 pub collection: String,
70 pub key: String,
71 pub op: ChangeOperation,
72 pub before: Option<JsonValue>,
73 pub after: Option<JsonValue>,
74 pub lsn: u64,
75 pub committed_at: u64,
76 pub dropped_event_count: u64,
77}
78
79impl KvWatchEvent {
80 pub fn to_json_value(&self) -> JsonValue {
81 let mut object = Map::new();
82 object.insert("key".to_string(), JsonValue::String(self.key.clone()));
83 object.insert(
84 "op".to_string(),
85 JsonValue::String(self.op.as_str().to_string()),
86 );
87 object.insert(
88 "before".to_string(),
89 self.before.clone().unwrap_or(JsonValue::Null),
90 );
91 object.insert(
92 "after".to_string(),
93 self.after.clone().unwrap_or(JsonValue::Null),
94 );
95 object.insert("lsn".to_string(), JsonValue::Number(self.lsn as f64));
96 object.insert(
97 "committed_at".to_string(),
98 JsonValue::Number(self.committed_at as f64),
99 );
100 object.insert(
101 "dropped_event_count".to_string(),
102 JsonValue::Number(self.dropped_event_count as f64),
103 );
104 JsonValue::Object(object)
105 }
106}
107
108#[derive(Debug, Clone)]
111pub struct ChangeRecord {
112 pub lsn: u64,
113 pub timestamp: u64,
114 pub operation: ChangeOperation,
115 pub collection: String,
116 pub entity_id: u64,
117 pub entity_kind: String,
118 pub entity_bytes: Option<Vec<u8>>,
119 pub metadata: Option<JsonValue>,
120}
121
122impl ChangeRecord {
123 pub fn from_entity(
124 lsn: u64,
125 timestamp: u64,
126 operation: ChangeOperation,
127 collection: impl Into<String>,
128 entity_kind: impl Into<String>,
129 entity: &crate::storage::UnifiedEntity,
130 format_version: u32,
131 metadata: Option<JsonValue>,
132 ) -> Self {
133 let entity_bytes = match operation {
134 ChangeOperation::Delete => None,
135 ChangeOperation::Insert | ChangeOperation::Update => Some(
136 crate::storage::UnifiedStore::serialize_entity(entity, format_version),
137 ),
138 };
139
140 Self {
141 lsn,
142 timestamp,
143 operation,
144 collection: collection.into(),
145 entity_id: entity.id.raw(),
146 entity_kind: entity_kind.into(),
147 entity_bytes,
148 metadata,
149 }
150 }
151
152 pub fn to_json_value(&self) -> JsonValue {
153 let mut object = Map::new();
154 object.insert("lsn".to_string(), JsonValue::Number(self.lsn as f64));
155 object.insert(
156 "timestamp".to_string(),
157 JsonValue::Number(self.timestamp as f64),
158 );
159 object.insert(
160 "operation".to_string(),
161 JsonValue::String(self.operation.as_str().to_string()),
162 );
163 object.insert(
164 "collection".to_string(),
165 JsonValue::String(self.collection.clone()),
166 );
167 object.insert(
168 "entity_id".to_string(),
169 JsonValue::Number(self.entity_id as f64),
170 );
171 object.insert(
172 "entity_kind".to_string(),
173 JsonValue::String(self.entity_kind.clone()),
174 );
175 if let Some(bytes) = &self.entity_bytes {
176 object.insert(
177 "entity_bytes_hex".to_string(),
178 JsonValue::String(hex::encode(bytes)),
179 );
180 }
181 if let Some(metadata) = &self.metadata {
182 object.insert("metadata".to_string(), metadata.clone());
183 }
184 JsonValue::Object(object)
185 }
186
187 pub fn encode(&self) -> Vec<u8> {
188 crate::json::to_string(&self.to_json_value())
189 .unwrap_or_else(|_| "{}".to_string())
190 .into_bytes()
191 }
192
193 pub fn decode(bytes: &[u8]) -> Result<Self, String> {
194 let text = std::str::from_utf8(bytes).map_err(|err| err.to_string())?;
195 let value = crate::json::from_str::<JsonValue>(text).map_err(|err| err.to_string())?;
196 let operation = value
197 .get("operation")
198 .and_then(JsonValue::as_str)
199 .and_then(ChangeOperation::from_str)
200 .ok_or_else(|| "invalid replication operation".to_string())?;
201 let entity_bytes = value
202 .get("entity_bytes_hex")
203 .and_then(JsonValue::as_str)
204 .map(hex::decode)
205 .transpose()
206 .map_err(|err| err.to_string())?;
207
208 Ok(Self {
209 lsn: value.get("lsn").and_then(JsonValue::as_u64).unwrap_or(0),
210 timestamp: value
211 .get("timestamp")
212 .and_then(JsonValue::as_u64)
213 .unwrap_or(0),
214 operation,
215 collection: value
216 .get("collection")
217 .and_then(JsonValue::as_str)
218 .unwrap_or_default()
219 .to_string(),
220 entity_id: value
221 .get("entity_id")
222 .and_then(JsonValue::as_u64)
223 .unwrap_or(0),
224 entity_kind: value
225 .get("entity_kind")
226 .and_then(JsonValue::as_str)
227 .unwrap_or("entity")
228 .to_string(),
229 entity_bytes,
230 metadata: value.get("metadata").cloned(),
231 })
232 }
233}
234
235pub struct CdcBuffer {
256 next_lsn: AtomicU64,
257 events: parking_lot::Mutex<VecDeque<ChangeEvent>>,
258 max_size: usize,
259}
260
261impl CdcBuffer {
262 pub fn new(max_size: usize) -> Self {
264 Self {
265 next_lsn: AtomicU64::new(0),
266 events: parking_lot::Mutex::new(VecDeque::with_capacity(max_size.min(10_000))),
267 max_size,
268 }
269 }
270
271 pub fn emit(
276 &self,
277 operation: ChangeOperation,
278 collection: &str,
279 entity_id: u64,
280 entity_kind: &str,
281 ) -> u64 {
282 self.emit_with_columns(operation, collection, entity_id, entity_kind, None)
283 }
284
285 pub fn emit_with_columns(
290 &self,
291 operation: ChangeOperation,
292 collection: &str,
293 entity_id: u64,
294 entity_kind: &str,
295 changed_columns: Option<Vec<String>>,
296 ) -> u64 {
297 let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
300
301 let event = ChangeEvent {
302 lsn: event_lsn,
303 timestamp: SystemTime::now()
304 .duration_since(UNIX_EPOCH)
305 .unwrap_or_default()
306 .as_millis() as u64,
307 operation,
308 collection: collection.to_string(),
309 entity_id,
310 entity_kind: entity_kind.to_string(),
311 changed_columns,
312 kv: None,
313 };
314
315 let mut events = self.events.lock();
319 if events.len() >= self.max_size {
320 events.pop_front();
321 }
322 events.push_back(event);
323
324 event_lsn
325 }
326
327 pub fn emit_batch_same_collection<I>(
331 &self,
332 operation: ChangeOperation,
333 collection: &str,
334 entity_kind: &str,
335 entity_ids: I,
336 ) -> Vec<u64>
337 where
338 I: IntoIterator<Item = u64>,
339 I::IntoIter: ExactSizeIterator,
340 {
341 let iter = entity_ids.into_iter();
342 let count = iter.len();
343 if count == 0 {
344 return Vec::new();
345 }
346
347 let first_lsn = self.next_lsn.fetch_add(count as u64, Ordering::AcqRel) + 1;
348 let lsns = (0..count)
349 .map(|idx| first_lsn + idx as u64)
350 .collect::<Vec<_>>();
351 if self.max_size == 0 {
352 return lsns;
353 }
354
355 let timestamp = SystemTime::now()
356 .duration_since(UNIX_EPOCH)
357 .unwrap_or_default()
358 .as_millis() as u64;
359 let collection = collection.to_string();
360 let entity_kind = entity_kind.to_string();
361
362 let skip = count.saturating_sub(self.max_size);
363 let kept = count - skip;
364 let mut events = self.events.lock();
365 let overflow = events
366 .len()
367 .saturating_add(kept)
368 .saturating_sub(self.max_size);
369 for _ in 0..overflow {
370 events.pop_front();
371 }
372
373 for (idx, entity_id) in iter.enumerate().skip(skip) {
374 events.push_back(ChangeEvent {
375 lsn: first_lsn + idx as u64,
376 timestamp,
377 operation,
378 collection: collection.clone(),
379 entity_id,
380 entity_kind: entity_kind.clone(),
381 changed_columns: None,
382 kv: None,
383 });
384 }
385 lsns
386 }
387
388 pub fn emit_kv(
391 &self,
392 operation: ChangeOperation,
393 collection: &str,
394 key: &str,
395 entity_id: u64,
396 before: Option<JsonValue>,
397 after: Option<JsonValue>,
398 ) -> u64 {
399 let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
400 let timestamp = SystemTime::now()
401 .duration_since(UNIX_EPOCH)
402 .unwrap_or_default()
403 .as_millis() as u64;
404 let kv = KvWatchEvent {
405 collection: collection.to_string(),
406 key: key.to_string(),
407 op: operation,
408 before,
409 after,
410 lsn: event_lsn,
411 committed_at: timestamp,
412 dropped_event_count: 0,
413 };
414 let event = ChangeEvent {
415 lsn: event_lsn,
416 timestamp,
417 operation,
418 collection: collection.to_string(),
419 entity_id,
420 entity_kind: "kv".to_string(),
421 changed_columns: Some(vec!["value".to_string()]),
422 kv: Some(kv),
423 };
424
425 let mut events = self.events.lock();
426 if events.len() >= self.max_size {
427 events.pop_front();
428 }
429 events.push_back(event);
430 event_lsn
431 }
432
433 pub fn poll(&self, since_lsn: u64, max_count: usize) -> Vec<ChangeEvent> {
435 let events = self.events.lock();
436 events
437 .iter()
438 .filter(|e| e.lsn > since_lsn)
439 .take(max_count)
440 .cloned()
441 .collect()
442 }
443
444 pub fn current_lsn(&self) -> u64 {
446 self.next_lsn.load(Ordering::Acquire)
447 }
448
449 pub fn set_current_lsn(&self, lsn: u64) {
453 let mut current = self.next_lsn.load(Ordering::Acquire);
454 while lsn > current {
455 match self
456 .next_lsn
457 .compare_exchange(current, lsn, Ordering::AcqRel, Ordering::Acquire)
458 {
459 Ok(_) => break,
460 Err(observed) => current = observed,
461 }
462 }
463 }
464
465 pub fn oldest_lsn(&self) -> Option<u64> {
467 self.events.lock().front().map(|e| e.lsn)
468 }
469
470 pub fn stats(&self) -> CdcStats {
472 let events = self.events.lock();
473 CdcStats {
474 buffered_events: events.len(),
475 current_lsn: self.next_lsn.load(Ordering::Acquire),
476 oldest_lsn: events.front().map(|e| e.lsn),
477 newest_lsn: events.back().map(|e| e.lsn),
478 }
479 }
480}
481
/// Point-in-time summary of a CDC buffer.
#[derive(Debug, Clone)]
pub struct CdcStats {
    /// Number of events currently buffered.
    pub buffered_events: usize,
    /// Most recently assigned LSN at the time of the snapshot.
    pub current_lsn: u64,
    /// LSN of the oldest buffered event, or `None` when the buffer is empty.
    pub oldest_lsn: Option<u64>,
    /// LSN of the newest buffered event, or `None` when the buffer is empty.
    pub newest_lsn: Option<u64>,
}
490
#[cfg(test)]
mod tests {
    use super::*;

    /// Collection name of every event, in buffer order.
    fn collections(events: &[ChangeEvent]) -> Vec<&str> {
        events
            .iter()
            .map(|event| event.collection.as_str())
            .collect()
    }

    /// LSN of every event, in buffer order.
    fn lsns(events: &[ChangeEvent]) -> Vec<u64> {
        events.iter().map(|event| event.lsn).collect()
    }

    #[test]
    fn test_emit_and_poll() {
        let buf = CdcBuffer::new(100);
        let emitted = [
            ChangeOperation::Insert,
            ChangeOperation::Update,
            ChangeOperation::Delete,
        ];
        for operation in emitted {
            buf.emit(operation, "users", 1, "table");
        }

        let events = buf.poll(0, 10);
        let polled: Vec<ChangeOperation> = events.iter().map(|event| event.operation).collect();
        // Same length and same operations, in emission order.
        assert_eq!(polled, emitted.to_vec());
        // Plain `emit` never attaches column information.
        assert!(events[0].changed_columns.is_none());
        assert!(events[1].changed_columns.is_none());
    }

    #[test]
    fn test_emit_with_columns_propagates_damage_vector() {
        let damage = vec!["email".to_string(), "age".to_string()];
        let buf = CdcBuffer::new(100);
        buf.emit_with_columns(
            ChangeOperation::Update,
            "users",
            7,
            "table",
            Some(damage.clone()),
        );
        buf.emit(ChangeOperation::Update, "users", 8, "table");

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].changed_columns, Some(damage));
        assert!(events[1].changed_columns.is_none());
    }

    #[test]
    fn test_poll_with_cursor() {
        let buf = CdcBuffer::new(100);
        for (entity_id, collection) in [(1, "a"), (2, "b"), (3, "c")] {
            buf.emit(ChangeOperation::Insert, collection, entity_id, "table");
        }

        // A cursor at LSN 1 skips the first event only.
        let events = buf.poll(1, 10);
        assert_eq!(collections(&events), ["b", "c"]);
    }

    #[test]
    fn test_emit_batch_same_collection_assigns_contiguous_lsns() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "users", 10, "table");
        buf.emit_batch_same_collection(ChangeOperation::Insert, "users", "table", [11, 12, 13]);

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 4);
        assert_eq!(lsns(&events[1..]), [2, 3, 4]);
        assert_eq!(events[3].entity_id, 13);
        assert_eq!(buf.current_lsn(), 4);
    }

    #[test]
    fn test_emit_batch_same_collection_respects_ring_size() {
        let buf = CdcBuffer::new(3);
        buf.emit_batch_same_collection(ChangeOperation::Insert, "users", "table", [1, 2, 3, 4, 5]);

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 3);
        let entity_ids: Vec<u64> = events.iter().map(|event| event.entity_id).collect();
        assert_eq!(entity_ids, vec![3, 4, 5]);
        assert_eq!(events[0].lsn, 3);
        assert_eq!(events[2].lsn, 5);
        assert_eq!(buf.current_lsn(), 5);
    }

    #[test]
    fn test_circular_eviction() {
        let buf = CdcBuffer::new(3);
        buf.emit(ChangeOperation::Insert, "a", 1, "table");
        buf.emit(ChangeOperation::Insert, "b", 2, "table");
        buf.emit(ChangeOperation::Insert, "c", 3, "table");
        // The fourth event exceeds the capacity of 3 and pushes "a" out.
        buf.emit(ChangeOperation::Insert, "d", 4, "table");

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 3);
        assert_eq!(
            events.first().map(|event| event.collection.as_str()),
            Some("b")
        );
    }

    #[test]
    fn test_stats() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "x", 1, "table");
        buf.emit(ChangeOperation::Insert, "y", 2, "table");

        let CdcStats {
            buffered_events,
            current_lsn,
            oldest_lsn,
            newest_lsn,
        } = buf.stats();
        assert_eq!(buffered_events, 2);
        assert_eq!(current_lsn, 2);
        assert_eq!(oldest_lsn, Some(1));
        assert_eq!(newest_lsn, Some(2));
    }

    #[test]
    fn test_change_record_roundtrip() {
        let original = ChangeRecord {
            lsn: 7,
            timestamp: 1234,
            operation: ChangeOperation::Update,
            collection: "users".to_string(),
            entity_id: 42,
            entity_kind: "row".to_string(),
            entity_bytes: Some(vec![1, 2, 3]),
            metadata: Some(crate::json!({"role": "admin"})),
        };

        let wire = original.encode();
        let decoded = ChangeRecord::decode(&wire).expect("decode");
        assert_eq!(decoded.lsn, original.lsn);
        assert_eq!(decoded.collection, original.collection);
        assert_eq!(decoded.entity_id, original.entity_id);
        assert_eq!(decoded.entity_bytes, original.entity_bytes);
    }
}