1use std::collections::VecDeque;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use crate::json::{Map, Value as JsonValue};
11
12pub use reddb_wire::replication::{public_item_kind, ChangeOperation, ChangeRecord};
13
/// A single change captured by a [`CdcBuffer`].
#[derive(Debug, Clone)]
pub struct ChangeEvent {
    /// Log sequence number assigned by the emitting [`CdcBuffer`].
    pub lsn: u64,
    /// Wall-clock emission time in milliseconds since the Unix epoch.
    pub timestamp: u64,
    /// Kind of change (insert, update, delete, refresh).
    pub operation: ChangeOperation,
    /// Name of the collection the changed entity belongs to.
    pub collection: String,
    /// Identifier of the changed entity (also exposed as [`ChangeEvent::rid`]).
    pub entity_id: u64,
    /// Internal entity kind string (e.g. `"table"`, or `"kv"` for events from
    /// [`CdcBuffer::emit_kv`]); see [`ChangeEvent::kind`] for the public name.
    pub entity_kind: String,
    /// Columns touched by the change when the emitter supplied them;
    /// `None` means no column-level detail is available.
    pub changed_columns: Option<Vec<String>>,
    /// Key-value watch payload. Set by [`CdcBuffer::emit_kv`]; the other emit
    /// paths in this module leave it `None`.
    pub kv: Option<KvWatchEvent>,
}
40
41impl ChangeEvent {
42 pub fn rid(&self) -> u64 {
43 self.entity_id
44 }
45
46 pub fn kind(&self) -> &'static str {
47 public_item_kind(&self.entity_kind)
48 }
49}
50
/// Key-value change payload attached to a [`ChangeEvent`] by
/// [`CdcBuffer::emit_kv`].
#[derive(Debug, Clone, PartialEq)]
pub struct KvWatchEvent {
    /// Collection the key belongs to.
    pub collection: String,
    /// Key whose value changed.
    pub key: String,
    /// Kind of change applied to the key.
    pub op: ChangeOperation,
    /// Value before the change, if the caller supplied one.
    pub before: Option<JsonValue>,
    /// Value after the change, if the caller supplied one.
    pub after: Option<JsonValue>,
    /// LSN of the enclosing change event.
    pub lsn: u64,
    /// Commit time in milliseconds since the Unix epoch; `emit_kv` uses the
    /// enclosing event's timestamp.
    pub committed_at: u64,
    /// Number of events reported as dropped. `emit_kv` always sets 0.
    /// NOTE(review): presumably filled in by watch consumers when they lag — confirm.
    pub dropped_event_count: u64,
}
63
64impl KvWatchEvent {
65 pub fn to_json_value(&self) -> JsonValue {
66 let mut object = Map::new();
67 object.insert("key".to_string(), JsonValue::String(self.key.clone()));
68 object.insert(
69 "op".to_string(),
70 JsonValue::String(self.op.as_str().to_string()),
71 );
72 object.insert(
73 "before".to_string(),
74 self.before.clone().unwrap_or(JsonValue::Null),
75 );
76 object.insert(
77 "after".to_string(),
78 self.after.clone().unwrap_or(JsonValue::Null),
79 );
80 object.insert("lsn".to_string(), JsonValue::Number(self.lsn as f64));
81 object.insert(
82 "committed_at".to_string(),
83 JsonValue::Number(self.committed_at as f64),
84 );
85 object.insert(
86 "dropped_event_count".to_string(),
87 JsonValue::Number(self.dropped_event_count as f64),
88 );
89 JsonValue::Object(object)
90 }
91}
92
93pub fn change_record_from_entity(
94 lsn: u64,
95 timestamp: u64,
96 operation: ChangeOperation,
97 collection: impl Into<String>,
98 entity_kind: impl Into<String>,
99 entity: &crate::storage::UnifiedEntity,
100 format_version: u32,
101 metadata: Option<JsonValue>,
102) -> ChangeRecord {
103 let entity_bytes = match operation {
104 ChangeOperation::Delete | ChangeOperation::Refresh => None,
105 ChangeOperation::Insert | ChangeOperation::Update => Some(
106 crate::storage::UnifiedStore::serialize_entity(entity, format_version),
107 ),
108 };
109
110 ChangeRecord {
111 term: crate::replication::DEFAULT_REPLICATION_TERM,
112 lsn,
113 timestamp,
114 operation,
115 collection: collection.into(),
116 entity_id: entity.id.raw(),
117 entity_kind: entity_kind.into(),
118 entity_bytes,
119 metadata: metadata.map(server_json_to_wire_json),
120 refresh_records: None,
121 }
122}
123
124pub fn server_json_to_wire_json(
125 value: JsonValue,
126) -> reddb_wire::replication::ChangeRecordJsonValue {
127 reddb_wire::replication::parse_change_record_json_value(&value.to_string_compact())
128 .unwrap_or(reddb_wire::replication::ChangeRecordJsonValue::Null)
129}
130
131pub fn wire_json_to_server_json(
132 value: &reddb_wire::replication::ChangeRecordJsonValue,
133) -> JsonValue {
134 crate::json::from_str(&reddb_wire::replication::change_record_json_value_to_string(value))
135 .unwrap_or(JsonValue::Null)
136}
137
/// Bounded in-memory ring of [`ChangeEvent`]s with monotonically increasing LSNs.
///
/// Events are appended at the back. Once `max_size` events are held, the
/// oldest ones are evicted from the front. Consumers read with
/// [`CdcBuffer::poll`], passing the last LSN they have seen as a cursor.
pub struct CdcBuffer {
    /// Last LSN handed out (0 before the first event). Every emit
    /// pre-increments it, so the first event receives LSN 1.
    next_lsn: AtomicU64,
    /// Buffered events, oldest at the front.
    events: parking_lot::Mutex<VecDeque<ChangeEvent>>,
    /// Maximum number of events retained before the oldest is evicted.
    max_size: usize,
}
163
164impl CdcBuffer {
165 pub fn new(max_size: usize) -> Self {
167 Self {
168 next_lsn: AtomicU64::new(0),
169 events: parking_lot::Mutex::new(VecDeque::with_capacity(max_size.min(10_000))),
170 max_size,
171 }
172 }
173
174 pub fn emit(
179 &self,
180 operation: ChangeOperation,
181 collection: &str,
182 entity_id: u64,
183 entity_kind: &str,
184 ) -> u64 {
185 self.emit_with_columns(operation, collection, entity_id, entity_kind, None)
186 }
187
188 pub fn emit_with_columns(
193 &self,
194 operation: ChangeOperation,
195 collection: &str,
196 entity_id: u64,
197 entity_kind: &str,
198 changed_columns: Option<Vec<String>>,
199 ) -> u64 {
200 let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
203
204 let event = ChangeEvent {
205 lsn: event_lsn,
206 timestamp: SystemTime::now()
207 .duration_since(UNIX_EPOCH)
208 .unwrap_or_default()
209 .as_millis() as u64,
210 operation,
211 collection: collection.to_string(),
212 entity_id,
213 entity_kind: entity_kind.to_string(),
214 changed_columns,
215 kv: None,
216 };
217
218 let mut events = self.events.lock();
222 if events.len() >= self.max_size {
223 events.pop_front();
224 }
225 events.push_back(event);
226
227 event_lsn
228 }
229
230 pub fn emit_batch_same_collection<I>(
234 &self,
235 operation: ChangeOperation,
236 collection: &str,
237 entity_kind: &str,
238 entity_ids: I,
239 ) -> Vec<u64>
240 where
241 I: IntoIterator<Item = u64>,
242 I::IntoIter: ExactSizeIterator,
243 {
244 let iter = entity_ids.into_iter();
245 let count = iter.len();
246 if count == 0 {
247 return Vec::new();
248 }
249
250 let first_lsn = self.next_lsn.fetch_add(count as u64, Ordering::AcqRel) + 1;
251 let lsns = (0..count)
252 .map(|idx| first_lsn + idx as u64)
253 .collect::<Vec<_>>();
254 if self.max_size == 0 {
255 return lsns;
256 }
257
258 let timestamp = SystemTime::now()
259 .duration_since(UNIX_EPOCH)
260 .unwrap_or_default()
261 .as_millis() as u64;
262 let collection = collection.to_string();
263 let entity_kind = entity_kind.to_string();
264
265 let skip = count.saturating_sub(self.max_size);
266 let kept = count - skip;
267 let mut events = self.events.lock();
268 let overflow = events
269 .len()
270 .saturating_add(kept)
271 .saturating_sub(self.max_size);
272 for _ in 0..overflow {
273 events.pop_front();
274 }
275
276 for (idx, entity_id) in iter.enumerate().skip(skip) {
277 events.push_back(ChangeEvent {
278 lsn: first_lsn + idx as u64,
279 timestamp,
280 operation,
281 collection: collection.clone(),
282 entity_id,
283 entity_kind: entity_kind.clone(),
284 changed_columns: None,
285 kv: None,
286 });
287 }
288 lsns
289 }
290
291 pub fn emit_kv(
294 &self,
295 operation: ChangeOperation,
296 collection: &str,
297 key: &str,
298 entity_id: u64,
299 before: Option<JsonValue>,
300 after: Option<JsonValue>,
301 ) -> u64 {
302 let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
303 let timestamp = SystemTime::now()
304 .duration_since(UNIX_EPOCH)
305 .unwrap_or_default()
306 .as_millis() as u64;
307 let kv = KvWatchEvent {
308 collection: collection.to_string(),
309 key: key.to_string(),
310 op: operation,
311 before,
312 after,
313 lsn: event_lsn,
314 committed_at: timestamp,
315 dropped_event_count: 0,
316 };
317 let event = ChangeEvent {
318 lsn: event_lsn,
319 timestamp,
320 operation,
321 collection: collection.to_string(),
322 entity_id,
323 entity_kind: "kv".to_string(),
324 changed_columns: Some(vec!["value".to_string()]),
325 kv: Some(kv),
326 };
327
328 let mut events = self.events.lock();
329 if events.len() >= self.max_size {
330 events.pop_front();
331 }
332 events.push_back(event);
333 event_lsn
334 }
335
336 pub fn poll(&self, since_lsn: u64, max_count: usize) -> Vec<ChangeEvent> {
338 let events = self.events.lock();
339 events
340 .iter()
341 .filter(|e| e.lsn > since_lsn)
342 .take(max_count)
343 .cloned()
344 .collect()
345 }
346
347 pub fn current_lsn(&self) -> u64 {
349 self.next_lsn.load(Ordering::Acquire)
350 }
351
352 pub fn set_current_lsn(&self, lsn: u64) {
356 let mut current = self.next_lsn.load(Ordering::Acquire);
357 while lsn > current {
358 match self
359 .next_lsn
360 .compare_exchange(current, lsn, Ordering::AcqRel, Ordering::Acquire)
361 {
362 Ok(_) => break,
363 Err(observed) => current = observed,
364 }
365 }
366 }
367
368 pub fn oldest_lsn(&self) -> Option<u64> {
370 self.events.lock().front().map(|e| e.lsn)
371 }
372
373 pub fn stats(&self) -> CdcStats {
375 let events = self.events.lock();
376 CdcStats {
377 buffered_events: events.len(),
378 current_lsn: self.next_lsn.load(Ordering::Acquire),
379 oldest_lsn: events.front().map(|e| e.lsn),
380 newest_lsn: events.back().map(|e| e.lsn),
381 }
382 }
383}
384
/// Point-in-time snapshot of a [`CdcBuffer`], as returned by [`CdcBuffer::stats`].
#[derive(Debug, Clone)]
pub struct CdcStats {
    /// Number of events currently held in the ring.
    pub buffered_events: usize,
    /// Value of the buffer's LSN counter: the last LSN handed out, or the
    /// value raised via `set_current_lsn`.
    pub current_lsn: u64,
    /// LSN of the event at the front (oldest end) of the ring, if any.
    pub oldest_lsn: Option<u64>,
    /// LSN of the event at the back (newest end) of the ring, if any.
    pub newest_lsn: Option<u64>,
}
393
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_emit_and_poll() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "users", 1, "table");
        buf.emit(ChangeOperation::Update, "users", 1, "table");
        buf.emit(ChangeOperation::Delete, "users", 1, "table");

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].operation, ChangeOperation::Insert);
        assert_eq!(events[1].operation, ChangeOperation::Update);
        assert_eq!(events[2].operation, ChangeOperation::Delete);
        assert!(events[0].changed_columns.is_none());
        assert!(events[1].changed_columns.is_none());
    }

    #[test]
    fn test_emit_with_columns_propagates_damage_vector() {
        let buf = CdcBuffer::new(100);
        buf.emit_with_columns(
            ChangeOperation::Update,
            "users",
            7,
            "table",
            Some(vec!["email".to_string(), "age".to_string()]),
        );
        buf.emit(ChangeOperation::Update, "users", 8, "table");

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 2);
        assert_eq!(
            events[0].changed_columns.as_deref(),
            Some(vec!["email".to_string(), "age".to_string()].as_slice())
        );
        assert!(events[1].changed_columns.is_none());
    }

    #[test]
    fn test_poll_with_cursor() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "a", 1, "table");
        buf.emit(ChangeOperation::Insert, "b", 2, "table");
        buf.emit(ChangeOperation::Insert, "c", 3, "table");

        let events = buf.poll(1, 10);
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].collection, "b");
        assert_eq!(events[1].collection, "c");
    }

    #[test]
    fn test_emit_batch_same_collection_assigns_contiguous_lsns() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "users", 10, "table");
        buf.emit_batch_same_collection(ChangeOperation::Insert, "users", "table", [11, 12, 13]);

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 4);
        assert_eq!(events[1].lsn, 2);
        assert_eq!(events[2].lsn, 3);
        assert_eq!(events[3].lsn, 4);
        assert_eq!(events[3].entity_id, 13);
        assert_eq!(buf.current_lsn(), 4);
    }

    #[test]
    fn test_emit_batch_same_collection_respects_ring_size() {
        let buf = CdcBuffer::new(3);
        buf.emit_batch_same_collection(ChangeOperation::Insert, "users", "table", [1, 2, 3, 4, 5]);

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 3);
        assert_eq!(
            events
                .iter()
                .map(|event| event.entity_id)
                .collect::<Vec<_>>(),
            vec![3, 4, 5]
        );
        assert_eq!(events[0].lsn, 3);
        assert_eq!(events[2].lsn, 5);
        assert_eq!(buf.current_lsn(), 5);
    }

    #[test]
    fn test_emit_batch_same_collection_empty_is_noop() {
        let buf = CdcBuffer::new(10);
        let lsns = buf.emit_batch_same_collection(
            ChangeOperation::Insert,
            "users",
            "table",
            Vec::<u64>::new(),
        );

        assert!(lsns.is_empty());
        assert_eq!(buf.current_lsn(), 0);
        assert!(buf.poll(0, 10).is_empty());
        assert_eq!(buf.oldest_lsn(), None);
    }

    #[test]
    fn test_circular_eviction() {
        let buf = CdcBuffer::new(3);
        buf.emit(ChangeOperation::Insert, "a", 1, "table");
        buf.emit(ChangeOperation::Insert, "b", 2, "table");
        buf.emit(ChangeOperation::Insert, "c", 3, "table");
        buf.emit(ChangeOperation::Insert, "d", 4, "table");

        // The oldest event ("a", LSN 1) is evicted to make room for "d".
        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].collection, "b");
        assert_eq!(buf.oldest_lsn(), Some(2));
    }

    #[test]
    fn test_set_current_lsn_only_moves_forward() {
        let buf = CdcBuffer::new(10);
        buf.set_current_lsn(10);
        assert_eq!(buf.current_lsn(), 10);

        buf.set_current_lsn(5);
        assert_eq!(buf.current_lsn(), 10);

        assert_eq!(buf.emit(ChangeOperation::Insert, "a", 1, "table"), 11);
    }

    #[test]
    fn test_emit_kv_attaches_watch_payload() {
        let buf = CdcBuffer::new(10);
        let lsn = buf.emit_kv(
            ChangeOperation::Update,
            "settings",
            "theme",
            42,
            None,
            Some(crate::json!({"role": "admin"})),
        );

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 1);
        let event = &events[0];
        assert_eq!(event.lsn, lsn);
        assert_eq!(event.rid(), 42);
        assert_eq!(event.entity_kind, "kv");
        assert_eq!(
            event.changed_columns.as_deref(),
            Some(vec!["value".to_string()].as_slice())
        );

        let kv = event.kv.as_ref().expect("kv payload");
        assert_eq!(kv.collection, "settings");
        assert_eq!(kv.key, "theme");
        assert_eq!(kv.op, ChangeOperation::Update);
        assert_eq!(kv.lsn, lsn);
        assert_eq!(kv.committed_at, event.timestamp);
        assert!(kv.before.is_none());
        assert!(kv.after.is_some());
        assert_eq!(kv.dropped_event_count, 0);
    }

    #[test]
    fn test_stats() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "x", 1, "table");
        buf.emit(ChangeOperation::Insert, "y", 2, "table");

        let stats = buf.stats();
        assert_eq!(stats.buffered_events, 2);
        assert_eq!(stats.current_lsn, 2);
        assert_eq!(stats.oldest_lsn, Some(1));
        assert_eq!(stats.newest_lsn, Some(2));
    }

    #[test]
    fn test_change_record_roundtrip() {
        let record = ChangeRecord {
            term: 3,
            lsn: 7,
            timestamp: 1234,
            operation: ChangeOperation::Update,
            collection: "users".to_string(),
            entity_id: 42,
            entity_kind: "row".to_string(),
            entity_bytes: Some(vec![1, 2, 3]),
            metadata: Some(server_json_to_wire_json(crate::json!({"role": "admin"}))),
            refresh_records: None,
        };

        let decoded = ChangeRecord::decode(&record.encode()).expect("decode");
        assert_eq!(decoded.term, record.term);
        assert_eq!(decoded.lsn, record.lsn);
        assert_eq!(decoded.collection, record.collection);
        assert_eq!(decoded.entity_id, record.entity_id);
        assert_eq!(decoded.entity_bytes, record.entity_bytes);
    }

    #[test]
    fn test_change_record_refresh_roundtrip() {
        let records = vec![vec![0x10, 0x20, 0x30], vec![0xAA, 0xBB], Vec::new()];
        let record =
            ChangeRecord::for_refresh(11, 99, "mv_orders_summary", records.clone()).with_term(4);

        let decoded = ChangeRecord::decode(&record.encode()).expect("decode");
        assert_eq!(decoded.term, 4);
        assert_eq!(decoded.operation, ChangeOperation::Refresh);
        assert_eq!(decoded.collection, "mv_orders_summary");
        assert_eq!(decoded.refresh_records.as_deref(), Some(&records[..]));
    }

    #[test]
    fn test_change_record_legacy_payload_defaults_term() {
        let legacy = br#"{"lsn":9,"timestamp":1,"operation":"delete","collection":"users","rid":5,"kind":"row"}"#;
        let decoded = ChangeRecord::decode(legacy).expect("decode legacy record");
        assert_eq!(decoded.term, crate::replication::DEFAULT_REPLICATION_TERM);
        assert_eq!(decoded.lsn, 9);
    }
}