1use std::collections::VecDeque;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use crate::json::{Map, Value as JsonValue};
11
12pub use reddb_wire::replication::{public_item_kind, ChangeOperation, ChangeRecord};
13pub use reddb_wire::replication::{RangeAdmitError, RangeAuthority};
17pub use reddb_wire::replication::{
21 classify_range_record, plan_range_catchup, RangeCatchupPlan, RangeProgressTracker,
22 RangeStreamDecision, RangeStreamPosition, RangeStreamProgress, RangeStreamReject,
23};
24
/// A single change recorded in a [`CdcBuffer`].
#[derive(Debug, Clone)]
pub struct ChangeEvent {
    /// Log sequence number; events emitted through `CdcBuffer` take it from the
    /// buffer's LSN counter.
    pub lsn: u64,
    /// Time of the change. Events emitted through `CdcBuffer` use milliseconds
    /// since the UNIX epoch.
    pub timestamp: u64,
    /// Kind of mutation (insert, update, delete, refresh).
    pub operation: ChangeOperation,
    /// Name of the collection the changed entity belongs to.
    pub collection: String,
    /// Raw id of the changed entity (also exposed via [`ChangeEvent::rid`]).
    pub entity_id: u64,
    /// Internal entity-kind label (this module uses e.g. `"table"` and `"kv"`);
    /// [`ChangeEvent::kind`] maps it to its public name.
    pub entity_kind: String,
    /// Columns touched by the change when the emitter knows them; `None` means
    /// "not reported", not "no columns".
    pub changed_columns: Option<Vec<String>>,
    /// Key-value watch payload; `CdcBuffer::emit_kv` sets this to `Some`, the
    /// other emitters leave it `None`.
    pub kv: Option<KvWatchEvent>,
}
51
52impl ChangeEvent {
53 pub fn rid(&self) -> u64 {
54 self.entity_id
55 }
56
57 pub fn kind(&self) -> &'static str {
58 public_item_kind(&self.entity_kind)
59 }
60}
61
/// Key-value change details carried in [`ChangeEvent::kv`] (populated by
/// `CdcBuffer::emit_kv`).
#[derive(Debug, Clone, PartialEq)]
pub struct KvWatchEvent {
    /// Collection holding the key.
    pub collection: String,
    /// The key that changed.
    pub key: String,
    /// Kind of mutation applied to the key.
    pub op: ChangeOperation,
    /// Value before the change, if the emitter supplied one.
    pub before: Option<JsonValue>,
    /// Value after the change, if the emitter supplied one.
    pub after: Option<JsonValue>,
    /// LSN of the enclosing change event.
    pub lsn: u64,
    /// Commit time; `emit_kv` sets it to the event timestamp (milliseconds
    /// since the UNIX epoch).
    pub committed_at: u64,
    /// NOTE(review): `emit_kv` always writes 0 here; presumably a watch
    /// consumer fills it in to report events it missed — confirm with callers.
    pub dropped_event_count: u64,
}
74
75impl KvWatchEvent {
76 pub fn to_json_value(&self) -> JsonValue {
77 let mut object = Map::new();
78 object.insert("key".to_string(), JsonValue::String(self.key.clone()));
79 object.insert(
80 "op".to_string(),
81 JsonValue::String(self.op.as_str().to_string()),
82 );
83 object.insert(
84 "before".to_string(),
85 self.before.clone().unwrap_or(JsonValue::Null),
86 );
87 object.insert(
88 "after".to_string(),
89 self.after.clone().unwrap_or(JsonValue::Null),
90 );
91 object.insert("lsn".to_string(), JsonValue::Number(self.lsn as f64));
92 object.insert(
93 "committed_at".to_string(),
94 JsonValue::Number(self.committed_at as f64),
95 );
96 object.insert(
97 "dropped_event_count".to_string(),
98 JsonValue::Number(self.dropped_event_count as f64),
99 );
100 JsonValue::Object(object)
101 }
102}
103
104pub fn change_record_from_entity(
105 lsn: u64,
106 timestamp: u64,
107 operation: ChangeOperation,
108 collection: impl Into<String>,
109 entity_kind: impl Into<String>,
110 entity: &crate::storage::UnifiedEntity,
111 format_version: u32,
112 metadata: Option<JsonValue>,
113) -> ChangeRecord {
114 let entity_bytes = match operation {
115 ChangeOperation::Delete | ChangeOperation::Refresh => None,
116 ChangeOperation::Insert | ChangeOperation::Update => Some(
117 crate::storage::UnifiedStore::serialize_entity(entity, format_version),
118 ),
119 };
120
121 ChangeRecord {
122 term: crate::replication::DEFAULT_REPLICATION_TERM,
123 lsn,
124 timestamp,
125 operation,
126 collection: collection.into(),
127 entity_id: entity.id.raw(),
128 entity_kind: entity_kind.into(),
129 entity_bytes,
130 metadata: metadata.map(server_json_to_wire_json),
131 refresh_records: None,
132 range_id: None,
136 ownership_epoch: None,
137 }
138}
139
140pub fn server_json_to_wire_json(
141 value: JsonValue,
142) -> reddb_wire::replication::ChangeRecordJsonValue {
143 reddb_wire::replication::parse_change_record_json_value(&value.to_string_compact())
144 .unwrap_or(reddb_wire::replication::ChangeRecordJsonValue::Null)
145}
146
147pub fn wire_json_to_server_json(
148 value: &reddb_wire::replication::ChangeRecordJsonValue,
149) -> JsonValue {
150 crate::json::from_str(&reddb_wire::replication::change_record_json_value_to_string(value))
151 .unwrap_or(JsonValue::Null)
152}
153
/// Bounded in-memory queue of recent [`ChangeEvent`]s that consumers poll by LSN.
///
/// When an emit finds `max_size` events already buffered, the oldest
/// (front) event is evicted first. All methods take `&self`: the LSN counter
/// is an atomic and the queue sits behind a `parking_lot::Mutex`.
pub struct CdcBuffer {
    /// Despite the name, this holds the most recently allocated LSN (0 before
    /// any emit). Emitters reserve with `fetch_add(n) + 1`, so the next event
    /// receives this value + 1.
    next_lsn: AtomicU64,
    /// Buffered events; pushed at the back, evicted from the front.
    events: parking_lot::Mutex<VecDeque<ChangeEvent>>,
    /// Eviction threshold for `events`.
    max_size: usize,
}
179
180impl CdcBuffer {
181 pub fn new(max_size: usize) -> Self {
183 Self {
184 next_lsn: AtomicU64::new(0),
185 events: parking_lot::Mutex::new(VecDeque::with_capacity(max_size.min(10_000))),
186 max_size,
187 }
188 }
189
190 pub fn emit(
195 &self,
196 operation: ChangeOperation,
197 collection: &str,
198 entity_id: u64,
199 entity_kind: &str,
200 ) -> u64 {
201 self.emit_with_columns(operation, collection, entity_id, entity_kind, None)
202 }
203
204 pub fn emit_with_columns(
209 &self,
210 operation: ChangeOperation,
211 collection: &str,
212 entity_id: u64,
213 entity_kind: &str,
214 changed_columns: Option<Vec<String>>,
215 ) -> u64 {
216 let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
219
220 let event = ChangeEvent {
221 lsn: event_lsn,
222 timestamp: SystemTime::now()
223 .duration_since(UNIX_EPOCH)
224 .unwrap_or_default()
225 .as_millis() as u64,
226 operation,
227 collection: collection.to_string(),
228 entity_id,
229 entity_kind: entity_kind.to_string(),
230 changed_columns,
231 kv: None,
232 };
233
234 let mut events = self.events.lock();
238 if events.len() >= self.max_size {
239 events.pop_front();
240 }
241 events.push_back(event);
242
243 event_lsn
244 }
245
246 pub fn emit_batch_same_collection<I>(
250 &self,
251 operation: ChangeOperation,
252 collection: &str,
253 entity_kind: &str,
254 entity_ids: I,
255 ) -> Vec<u64>
256 where
257 I: IntoIterator<Item = u64>,
258 I::IntoIter: ExactSizeIterator,
259 {
260 let iter = entity_ids.into_iter();
261 let count = iter.len();
262 if count == 0 {
263 return Vec::new();
264 }
265
266 let first_lsn = self.next_lsn.fetch_add(count as u64, Ordering::AcqRel) + 1;
267 let lsns = (0..count)
268 .map(|idx| first_lsn + idx as u64)
269 .collect::<Vec<_>>();
270 if self.max_size == 0 {
271 return lsns;
272 }
273
274 let timestamp = SystemTime::now()
275 .duration_since(UNIX_EPOCH)
276 .unwrap_or_default()
277 .as_millis() as u64;
278 let collection = collection.to_string();
279 let entity_kind = entity_kind.to_string();
280
281 let skip = count.saturating_sub(self.max_size);
282 let kept = count - skip;
283 let mut events = self.events.lock();
284 let overflow = events
285 .len()
286 .saturating_add(kept)
287 .saturating_sub(self.max_size);
288 for _ in 0..overflow {
289 events.pop_front();
290 }
291
292 for (idx, entity_id) in iter.enumerate().skip(skip) {
293 events.push_back(ChangeEvent {
294 lsn: first_lsn + idx as u64,
295 timestamp,
296 operation,
297 collection: collection.clone(),
298 entity_id,
299 entity_kind: entity_kind.clone(),
300 changed_columns: None,
301 kv: None,
302 });
303 }
304 lsns
305 }
306
307 pub fn emit_kv(
310 &self,
311 operation: ChangeOperation,
312 collection: &str,
313 key: &str,
314 entity_id: u64,
315 before: Option<JsonValue>,
316 after: Option<JsonValue>,
317 ) -> u64 {
318 let event_lsn = self.next_lsn.fetch_add(1, Ordering::AcqRel) + 1;
319 let timestamp = SystemTime::now()
320 .duration_since(UNIX_EPOCH)
321 .unwrap_or_default()
322 .as_millis() as u64;
323 let kv = KvWatchEvent {
324 collection: collection.to_string(),
325 key: key.to_string(),
326 op: operation,
327 before,
328 after,
329 lsn: event_lsn,
330 committed_at: timestamp,
331 dropped_event_count: 0,
332 };
333 let event = ChangeEvent {
334 lsn: event_lsn,
335 timestamp,
336 operation,
337 collection: collection.to_string(),
338 entity_id,
339 entity_kind: "kv".to_string(),
340 changed_columns: Some(vec!["value".to_string()]),
341 kv: Some(kv),
342 };
343
344 let mut events = self.events.lock();
345 if events.len() >= self.max_size {
346 events.pop_front();
347 }
348 events.push_back(event);
349 event_lsn
350 }
351
352 pub fn poll(&self, since_lsn: u64, max_count: usize) -> Vec<ChangeEvent> {
354 let events = self.events.lock();
355 events
356 .iter()
357 .filter(|e| e.lsn > since_lsn)
358 .take(max_count)
359 .cloned()
360 .collect()
361 }
362
363 pub fn current_lsn(&self) -> u64 {
365 self.next_lsn.load(Ordering::Acquire)
366 }
367
368 pub fn set_current_lsn(&self, lsn: u64) {
372 let mut current = self.next_lsn.load(Ordering::Acquire);
373 while lsn > current {
374 match self
375 .next_lsn
376 .compare_exchange(current, lsn, Ordering::AcqRel, Ordering::Acquire)
377 {
378 Ok(_) => break,
379 Err(observed) => current = observed,
380 }
381 }
382 }
383
384 pub fn oldest_lsn(&self) -> Option<u64> {
386 self.events.lock().front().map(|e| e.lsn)
387 }
388
389 pub fn stats(&self) -> CdcStats {
391 let events = self.events.lock();
392 CdcStats {
393 buffered_events: events.len(),
394 current_lsn: self.next_lsn.load(Ordering::Acquire),
395 oldest_lsn: events.front().map(|e| e.lsn),
396 newest_lsn: events.back().map(|e| e.lsn),
397 }
398 }
399}
400
/// Point-in-time snapshot of a [`CdcBuffer`], as returned by `CdcBuffer::stats`.
#[derive(Debug, Clone)]
pub struct CdcStats {
    /// Number of events currently buffered.
    pub buffered_events: usize,
    /// Value of the buffer's LSN counter (most recently allocated LSN).
    pub current_lsn: u64,
    /// LSN of the event at the front of the buffer, if any.
    pub oldest_lsn: Option<u64>,
    /// LSN of the event at the back of the buffer, if any.
    pub newest_lsn: Option<u64>,
}
409
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_emit_and_poll() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "users", 1, "table");
        buf.emit(ChangeOperation::Update, "users", 1, "table");
        buf.emit(ChangeOperation::Delete, "users", 1, "table");

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].operation, ChangeOperation::Insert);
        assert_eq!(events[1].operation, ChangeOperation::Update);
        assert_eq!(events[2].operation, ChangeOperation::Delete);
        // Plain `emit` never reports a damage vector.
        assert!(events[0].changed_columns.is_none());
        assert!(events[1].changed_columns.is_none());
    }

    #[test]
    fn test_emit_with_columns_propagates_damage_vector() {
        let buf = CdcBuffer::new(100);
        buf.emit_with_columns(
            ChangeOperation::Update,
            "users",
            7,
            "table",
            Some(vec!["email".to_string(), "age".to_string()]),
        );
        buf.emit(ChangeOperation::Update, "users", 8, "table");

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 2);
        assert_eq!(
            events[0].changed_columns.as_deref(),
            Some(vec!["email".to_string(), "age".to_string()].as_slice())
        );
        assert!(events[1].changed_columns.is_none());
    }

    #[test]
    fn test_poll_with_cursor() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "a", 1, "table");
        buf.emit(ChangeOperation::Insert, "b", 2, "table");
        buf.emit(ChangeOperation::Insert, "c", 3, "table");

        // Cursor 1 excludes the first event (LSN 1).
        let events = buf.poll(1, 10);
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].collection, "b");
        assert_eq!(events[1].collection, "c");
    }

    #[test]
    fn test_poll_respects_max_count() {
        let buf = CdcBuffer::new(100);
        for id in 1..=5 {
            buf.emit(ChangeOperation::Insert, "users", id, "table");
        }

        let events = buf.poll(0, 2);
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].lsn, 1);
        assert_eq!(events[1].lsn, 2);
    }

    #[test]
    fn test_emit_batch_same_collection_assigns_contiguous_lsns() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "users", 10, "table");
        buf.emit_batch_same_collection(ChangeOperation::Insert, "users", "table", [11, 12, 13]);

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 4);
        assert_eq!(events[1].lsn, 2);
        assert_eq!(events[2].lsn, 3);
        assert_eq!(events[3].lsn, 4);
        assert_eq!(events[3].entity_id, 13);
        assert_eq!(buf.current_lsn(), 4);
    }

    #[test]
    fn test_emit_batch_same_collection_respects_ring_size() {
        let buf = CdcBuffer::new(3);
        buf.emit_batch_same_collection(ChangeOperation::Insert, "users", "table", [1, 2, 3, 4, 5]);

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 3);
        assert_eq!(
            events
                .iter()
                .map(|event| event.entity_id)
                .collect::<Vec<_>>(),
            vec![3, 4, 5]
        );
        assert_eq!(events[0].lsn, 3);
        assert_eq!(events[2].lsn, 5);
        assert_eq!(buf.current_lsn(), 5);
    }

    #[test]
    fn test_circular_eviction() {
        let buf = CdcBuffer::new(3);
        buf.emit(ChangeOperation::Insert, "a", 1, "table");
        buf.emit(ChangeOperation::Insert, "b", 2, "table");
        buf.emit(ChangeOperation::Insert, "c", 3, "table");
        // The fourth emit evicts "a".
        buf.emit(ChangeOperation::Insert, "d", 4, "table");

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].collection, "b");
        assert_eq!(buf.oldest_lsn(), Some(2));
    }

    #[test]
    fn test_stats() {
        let buf = CdcBuffer::new(100);
        buf.emit(ChangeOperation::Insert, "x", 1, "table");
        buf.emit(ChangeOperation::Insert, "y", 2, "table");

        let stats = buf.stats();
        assert_eq!(stats.buffered_events, 2);
        assert_eq!(stats.current_lsn, 2);
        assert_eq!(stats.oldest_lsn, Some(1));
        assert_eq!(stats.newest_lsn, Some(2));
    }

    #[test]
    fn test_set_current_lsn_only_moves_forward() {
        let buf = CdcBuffer::new(10);
        buf.set_current_lsn(5);
        assert_eq!(buf.current_lsn(), 5);
        buf.set_current_lsn(3);
        assert_eq!(buf.current_lsn(), 5);
        assert_eq!(buf.emit(ChangeOperation::Insert, "a", 1, "table"), 6);
    }

    #[test]
    fn test_emit_kv_populates_watch_payload() {
        let buf = CdcBuffer::new(10);
        let lsn = buf.emit_kv(ChangeOperation::Update, "settings", "theme", 42, None, None);

        let events = buf.poll(0, 10);
        assert_eq!(events.len(), 1);
        let event = &events[0];
        assert_eq!(event.lsn, lsn);
        assert_eq!(event.entity_id, 42);
        assert_eq!(event.entity_kind, "kv");
        assert_eq!(
            event.changed_columns.as_deref(),
            Some(&["value".to_string()][..])
        );
        let kv = event.kv.as_ref().expect("kv payload");
        assert_eq!(kv.collection, "settings");
        assert_eq!(kv.key, "theme");
        assert_eq!(kv.lsn, lsn);
        assert_eq!(kv.committed_at, event.timestamp);
        assert_eq!(kv.dropped_event_count, 0);
    }

    #[test]
    fn test_change_record_roundtrip() {
        let record = ChangeRecord {
            term: 3,
            lsn: 7,
            timestamp: 1234,
            operation: ChangeOperation::Update,
            collection: "users".to_string(),
            entity_id: 42,
            entity_kind: "row".to_string(),
            entity_bytes: Some(vec![1, 2, 3]),
            metadata: Some(server_json_to_wire_json(crate::json!({"role": "admin"}))),
            refresh_records: None,
            range_id: None,
            ownership_epoch: None,
        };

        let decoded = ChangeRecord::decode(&record.encode()).expect("decode");
        assert_eq!(decoded.term, record.term);
        assert_eq!(decoded.lsn, record.lsn);
        assert_eq!(decoded.collection, record.collection);
        assert_eq!(decoded.entity_id, record.entity_id);
        assert_eq!(decoded.entity_bytes, record.entity_bytes);
    }

    #[test]
    fn test_change_record_refresh_roundtrip() {
        let records = vec![vec![0x10, 0x20, 0x30], vec![0xAA, 0xBB], Vec::new()];
        let record =
            ChangeRecord::for_refresh(11, 99, "mv_orders_summary", records.clone()).with_term(4);

        let decoded = ChangeRecord::decode(&record.encode()).expect("decode");
        assert_eq!(decoded.term, 4);
        assert_eq!(decoded.operation, ChangeOperation::Refresh);
        assert_eq!(decoded.collection, "mv_orders_summary");
        assert_eq!(decoded.refresh_records.as_deref(), Some(&records[..]));
    }

    #[test]
    fn test_change_record_legacy_payload_defaults_term() {
        let legacy = br#"{"lsn":9,"timestamp":1,"operation":"delete","collection":"users","rid":5,"kind":"row"}"#;
        let decoded = ChangeRecord::decode(legacy).expect("decode legacy record");
        assert_eq!(decoded.term, crate::replication::DEFAULT_REPLICATION_TERM);
        assert_eq!(decoded.lsn, 9);
    }
}