1use std::collections::VecDeque;
43use std::sync::atomic::{AtomicU64, Ordering};
44use std::sync::{Arc, Condvar, Mutex, RwLock};
45use std::time::{Duration, SystemTime, UNIX_EPOCH};
46
47#[derive(Debug, Clone, PartialEq)]
53pub struct CdcEvent {
54 pub sequence: u64,
56 pub timestamp_us: u64,
58 pub txn_id: u64,
60 pub table: String,
62 pub key: Vec<u8>,
64 pub operation: CdcOperation,
66}
67
/// The kind of change carried by a `CdcEvent`, together with its payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CdcOperation {
    /// A row was added; `after` holds the new value bytes.
    Insert { after: Vec<u8> },
    /// A row was modified.
    Update {
        /// Previous value bytes, when the producer captured them.
        before: Option<Vec<u8>>,
        /// New value bytes.
        after: Vec<u8>,
    },
    /// A row was removed; `before` holds the previous value bytes when captured.
    Delete { before: Option<Vec<u8>> },
    /// A schema-level change described by the DDL text.
    SchemaChange { ddl: String },
}
84
/// Failures reported by the CDC log and its subscribers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CdcError {
    /// The requested sequence is older than anything still retained.
    Overrun {
        /// Sequence the reader asked for.
        requested: u64,
        /// Sequence of the oldest event still in the buffer.
        oldest_available: u64,
    },
    /// The log has been shut down.
    Shutdown,
    /// No events arrived before the caller's timeout elapsed.
    Timeout,
}

impl std::fmt::Display for CdcError {
    /// Renders a human-readable, single-line description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            CdcError::Shutdown => f.write_str("CDC engine shut down"),
            CdcError::Timeout => f.write_str("Timed out waiting for CDC events"),
            CdcError::Overrun {
                requested: wanted,
                oldest_available: oldest,
            } => write!(
                f,
                "CDC overrun: requested seq {} but oldest available is {}",
                wanted, oldest
            ),
        }
    }
}

impl std::error::Error for CdcError {}

/// Result alias used throughout the CDC API.
pub type CdcResult<T> = Result<T, CdcError>;
124
/// Tunables for constructing a CDC log.
#[derive(Debug, Clone)]
pub struct CdcConfig {
    /// Maximum number of events kept in the ring buffer.
    pub capacity: usize,
    /// Whether CDC is switched on.
    // NOTE(review): no code visible in this file reads this flag — confirm where it is honoured.
    pub enabled: bool,
}

impl Default for CdcConfig {
    /// Returns a configuration with a 65 536-event buffer and CDC enabled.
    fn default() -> Self {
        const DEFAULT_CAPACITY: usize = 65_536;

        CdcConfig {
            enabled: true,
            capacity: DEFAULT_CAPACITY,
        }
    }
}
147
148pub struct CdcLog {
150 buffer: RwLock<VecDeque<CdcEvent>>,
152 capacity: usize,
154 next_seq: AtomicU64,
156 notify: Arc<(Mutex<bool>, Condvar)>,
158 running: AtomicU64, }
161
162impl CdcLog {
163 pub fn new(config: CdcConfig) -> Arc<Self> {
165 Arc::new(Self {
166 buffer: RwLock::new(VecDeque::with_capacity(config.capacity)),
167 capacity: config.capacity,
168 next_seq: AtomicU64::new(1),
169 notify: Arc::new((Mutex::new(false), Condvar::new())),
170 running: AtomicU64::new(1),
171 })
172 }
173
174 pub fn emit(&self, events: Vec<CdcEvent>) {
180 if self.running.load(Ordering::Relaxed) == 0 {
181 return;
182 }
183 if events.is_empty() {
184 return;
185 }
186
187 let mut buf = self.buffer.write().unwrap();
188 for event in events {
189 if buf.len() >= self.capacity {
190 buf.pop_front(); }
192 buf.push_back(event);
193 }
194 drop(buf);
195
196 let (lock, cvar) = &*self.notify;
198 let mut ready = lock.lock().unwrap();
199 *ready = true;
200 cvar.notify_all();
201 }
202
203 pub fn emit_one(&self, event: CdcEvent) {
205 if self.running.load(Ordering::Relaxed) == 0 {
206 return;
207 }
208
209 let mut buf = self.buffer.write().unwrap();
210 if buf.len() >= self.capacity {
211 buf.pop_front();
212 }
213 buf.push_back(event);
214 drop(buf);
215
216 let (lock, cvar) = &*self.notify;
217 let mut ready = lock.lock().unwrap();
218 *ready = true;
219 cvar.notify_all();
220 }
221
222 pub fn next_sequence(&self) -> u64 {
224 self.next_seq.fetch_add(1, Ordering::SeqCst)
225 }
226
227 pub fn current_sequence(&self) -> u64 {
229 self.next_seq.load(Ordering::SeqCst).saturating_sub(1)
230 }
231
232 pub fn read_from(&self, from_seq: u64, max_events: usize) -> CdcResult<Vec<CdcEvent>> {
237 let buf = self.buffer.read().unwrap();
238
239 if buf.is_empty() {
240 return Ok(Vec::new());
241 }
242
243 let oldest_seq = buf.front().map(|e| e.sequence).unwrap_or(0);
244 let newest_seq = buf.back().map(|e| e.sequence).unwrap_or(0);
245
246 if from_seq < oldest_seq {
247 return Err(CdcError::Overrun {
248 requested: from_seq,
249 oldest_available: oldest_seq,
250 });
251 }
252
253 if from_seq > newest_seq {
254 return Ok(Vec::new()); }
256
257 let start_idx = buf
259 .iter()
260 .position(|e| e.sequence >= from_seq)
261 .unwrap_or(buf.len());
262
263 let events: Vec<CdcEvent> = buf
264 .iter()
265 .skip(start_idx)
266 .take(max_events)
267 .cloned()
268 .collect();
269
270 Ok(events)
271 }
272
273 pub fn wait_for_events(
278 &self,
279 after_seq: u64,
280 max_events: usize,
281 timeout: Duration,
282 ) -> CdcResult<Vec<CdcEvent>> {
283 if self.running.load(Ordering::Relaxed) == 0 {
284 return Err(CdcError::Shutdown);
285 }
286
287 let events = self.read_from(after_seq + 1, max_events)?;
289 if !events.is_empty() {
290 return Ok(events);
291 }
292
293 let (lock, cvar) = &*self.notify;
295 let mut ready = lock.lock().unwrap();
296 let start = std::time::Instant::now();
297
298 loop {
299 if self.running.load(Ordering::Relaxed) == 0 {
300 return Err(CdcError::Shutdown);
301 }
302
303 let remaining = timeout
304 .checked_sub(start.elapsed())
305 .unwrap_or(Duration::ZERO);
306 if remaining.is_zero() {
307 return Err(CdcError::Timeout);
308 }
309
310 let result = cvar.wait_timeout(ready, remaining).unwrap();
311 ready = result.0;
312
313 let events = self.read_from(after_seq + 1, max_events)?;
315 if !events.is_empty() {
316 *ready = false;
317 return Ok(events);
318 }
319
320 if result.1.timed_out() {
321 return Err(CdcError::Timeout);
322 }
323 }
324 }
325
326 pub fn oldest_sequence(&self) -> u64 {
328 self.buffer
329 .read()
330 .unwrap()
331 .front()
332 .map(|e| e.sequence)
333 .unwrap_or(0)
334 }
335
336 pub fn len(&self) -> usize {
338 self.buffer.read().unwrap().len()
339 }
340
341 pub fn is_empty(&self) -> bool {
343 self.buffer.read().unwrap().is_empty()
344 }
345
346 pub fn shutdown(&self) {
348 self.running.store(0, Ordering::SeqCst);
349 let (lock, cvar) = &*self.notify;
350 let mut ready = lock.lock().unwrap();
351 *ready = true;
352 cvar.notify_all();
353 drop(ready);
354 }
355}
356
357pub struct CdcEmitter {
372 log: Arc<CdcLog>,
373 txn_id: u64,
374 pending: Vec<CdcEvent>,
375}
376
377impl CdcEmitter {
378 pub fn new(log: Arc<CdcLog>, txn_id: u64) -> Self {
379 Self {
380 log,
381 txn_id,
382 pending: Vec::new(),
383 }
384 }
385
386 fn now_us() -> u64 {
387 SystemTime::now()
388 .duration_since(UNIX_EPOCH)
389 .unwrap_or_default()
390 .as_micros() as u64
391 }
392
393 pub fn insert(&mut self, table: &str, key: Vec<u8>, value: Vec<u8>) {
395 let seq = self.log.next_sequence();
396 self.pending.push(CdcEvent {
397 sequence: seq,
398 timestamp_us: Self::now_us(),
399 txn_id: self.txn_id,
400 table: table.to_string(),
401 key,
402 operation: CdcOperation::Insert { after: value },
403 });
404 }
405
406 pub fn update(&mut self, table: &str, key: Vec<u8>, new_value: Vec<u8>) {
408 let seq = self.log.next_sequence();
409 self.pending.push(CdcEvent {
410 sequence: seq,
411 timestamp_us: Self::now_us(),
412 txn_id: self.txn_id,
413 table: table.to_string(),
414 key,
415 operation: CdcOperation::Update {
416 before: None,
417 after: new_value,
418 },
419 });
420 }
421
422 pub fn delete(&mut self, table: &str, key: Vec<u8>) {
424 let seq = self.log.next_sequence();
425 self.pending.push(CdcEvent {
426 sequence: seq,
427 timestamp_us: Self::now_us(),
428 txn_id: self.txn_id,
429 table: table.to_string(),
430 key,
431 operation: CdcOperation::Delete { before: None },
432 });
433 }
434
435 pub fn schema_change(&mut self, table: &str, ddl: String) {
437 let seq = self.log.next_sequence();
438 self.pending.push(CdcEvent {
439 sequence: seq,
440 timestamp_us: Self::now_us(),
441 txn_id: self.txn_id,
442 table: table.to_string(),
443 key: Vec::new(),
444 operation: CdcOperation::SchemaChange { ddl },
445 });
446 }
447
448 pub fn flush(self) {
451 if !self.pending.is_empty() {
452 self.log.emit(self.pending);
453 }
454 }
455
456 pub fn discard(self) {
458 }
460
461 pub fn pending_count(&self) -> usize {
463 self.pending.len()
464 }
465}
466
467pub struct CdcSubscriber {
473 log: Arc<CdcLog>,
474 last_seq: u64,
476 table_filter: Option<Vec<String>>,
478}
479
480impl CdcSubscriber {
481 pub fn new(log: Arc<CdcLog>, from_seq: u64) -> Self {
484 Self {
485 log,
486 last_seq: from_seq,
487 table_filter: None,
488 }
489 }
490
491 pub fn from_latest(log: Arc<CdcLog>) -> Self {
493 let seq = log.current_sequence();
494 Self {
495 log,
496 last_seq: seq,
497 table_filter: None,
498 }
499 }
500
501 pub fn with_tables(mut self, tables: Vec<String>) -> Self {
503 self.table_filter = Some(tables);
504 self
505 }
506
507 pub fn poll(&mut self, max_events: usize) -> CdcResult<Vec<CdcEvent>> {
509 let events = self.log.read_from(self.last_seq + 1, max_events)?;
510 let filtered = self.filter_events(events);
511 if let Some(last) = filtered.last() {
512 self.last_seq = last.sequence;
513 }
514 Ok(filtered)
515 }
516
517 pub fn next_batch(&mut self, max_events: usize, timeout: Duration) -> CdcResult<Vec<CdcEvent>> {
519 let events = self
520 .log
521 .wait_for_events(self.last_seq, max_events, timeout)?;
522 let filtered = self.filter_events(events);
523 if let Some(last) = filtered.last() {
524 self.last_seq = last.sequence;
525 }
526 Ok(filtered)
527 }
528
529 pub fn position(&self) -> u64 {
531 self.last_seq
532 }
533
534 fn filter_events(&self, events: Vec<CdcEvent>) -> Vec<CdcEvent> {
535 if let Some(ref tables) = self.table_filter {
536 events
537 .into_iter()
538 .filter(|e| tables.iter().any(|t| *t == e.table))
539 .collect()
540 } else {
541 events
542 }
543 }
544}
545
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    // Builds a log whose ring retains at most `cap` events.
    fn make_log(cap: usize) -> Arc<CdcLog> {
        let config = CdcConfig {
            capacity: cap,
            enabled: true,
        };
        CdcLog::new(config)
    }

    // Emits one insert straight onto the log (bypassing `CdcEmitter`), using
    // `payload` as both key and value and a zero timestamp.
    fn emit_insert(log: &CdcLog, txn_id: u64, table: &str, payload: Vec<u8>) {
        let sequence = log.next_sequence();
        log.emit_one(CdcEvent {
            sequence,
            timestamp_us: 0,
            txn_id,
            table: table.into(),
            key: payload.clone(),
            operation: CdcOperation::Insert { after: payload },
        });
    }

    #[test]
    fn test_cdc_emit_and_read() {
        let log = make_log(100);
        let mut emitter = CdcEmitter::new(log.clone(), 42);

        for (key, value) in [("key1", "val1"), ("key2", "val2")].iter() {
            emitter.insert("users", key.as_bytes().to_vec(), value.as_bytes().to_vec());
        }
        assert_eq!(emitter.pending_count(), 2);

        emitter.flush();
        assert_eq!(log.len(), 2);

        let events = log.read_from(1, 10).unwrap();
        assert_eq!(events.len(), 2);

        let first = &events[0];
        assert_eq!(first.table, "users");
        assert_eq!(first.txn_id, 42);
        assert_eq!(first.sequence, 1);
        assert_eq!(events[1].sequence, 2);
    }

    #[test]
    fn test_cdc_ring_buffer_overflow() {
        let log = make_log(3);

        // Five events into a three-slot ring: sequences 1 and 2 are evicted.
        for i in 1..=5u64 {
            emit_insert(&log, i, "t", vec![i as u8]);
        }

        assert_eq!(log.len(), 3);
        assert_eq!(log.oldest_sequence(), 3);

        // Asking for an evicted sequence reports the overrun.
        assert_eq!(
            log.read_from(1, 10).unwrap_err(),
            CdcError::Overrun {
                requested: 1,
                oldest_available: 3
            }
        );

        // Reading from the oldest retained sequence returns everything left.
        let retained = log.read_from(3, 10).unwrap();
        assert_eq!(retained.len(), 3);
    }

    #[test]
    fn test_cdc_subscriber() {
        let log = make_log(100);

        let mut first_txn = CdcEmitter::new(log.clone(), 1);
        first_txn.insert("users", b"u1".to_vec(), b"v1".to_vec());
        first_txn.insert("orders", b"o1".to_vec(), b"v2".to_vec());
        first_txn.flush();

        let mut sub = CdcSubscriber::new(log.clone(), 0);

        // First poll drains both events and moves the cursor.
        assert_eq!(sub.poll(10).unwrap().len(), 2);
        assert_eq!(sub.position(), 2);

        // Nothing new yet.
        assert_eq!(sub.poll(10).unwrap().len(), 0);

        let mut second_txn = CdcEmitter::new(log.clone(), 2);
        second_txn.update("users", b"u1".to_vec(), b"v1_updated".to_vec());
        second_txn.flush();

        let fresh = sub.poll(10).unwrap();
        assert_eq!(fresh.len(), 1);
        assert!(matches!(fresh[0].operation, CdcOperation::Update { .. }));
    }

    #[test]
    fn test_cdc_table_filter() {
        let log = make_log(100);

        let mut emitter = CdcEmitter::new(log.clone(), 1);
        emitter.insert("users", b"u1".to_vec(), b"v1".to_vec());
        emitter.insert("orders", b"o1".to_vec(), b"v2".to_vec());
        emitter.insert("users", b"u2".to_vec(), b"v3".to_vec());
        emitter.flush();

        let wanted = vec!["users".to_string()];
        let mut sub = CdcSubscriber::new(log.clone(), 0).with_tables(wanted);

        let delivered = sub.poll(10).unwrap();
        assert_eq!(delivered.len(), 2);
        assert!(delivered.iter().all(|e| e.table == "users"));
    }

    #[test]
    fn test_cdc_subscriber_from_latest() {
        let log = make_log(100);

        // Emitted before the subscriber exists: must not be delivered.
        emit_insert(&log, 1, "old", vec![]);

        let mut sub = CdcSubscriber::from_latest(log.clone());

        // Emitted afterwards: must be delivered.
        emit_insert(&log, 2, "new", vec![]);

        let delivered = sub.poll(10).unwrap();
        assert_eq!(delivered.len(), 1);
        assert_eq!(delivered[0].table, "new");
    }

    #[test]
    fn test_cdc_wait_for_events() {
        let log = make_log(100);
        let producer_log = log.clone();

        // The producer emits after a short delay so the waiter has to block.
        let producer = thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            emit_insert(&producer_log, 1, "t", vec![1]);
        });

        let received = log.wait_for_events(0, 10, Duration::from_secs(2)).unwrap();
        assert_eq!(received.len(), 1);
        producer.join().unwrap();
    }

    #[test]
    fn test_cdc_wait_timeout() {
        let log = make_log(100);
        let outcome = log.wait_for_events(0, 10, Duration::from_millis(50));
        assert!(matches!(outcome.unwrap_err(), CdcError::Timeout));
    }

    #[test]
    fn test_cdc_shutdown() {
        let log = make_log(100);
        let waiter_log = log.clone();

        let waiter =
            thread::spawn(move || waiter_log.wait_for_events(0, 10, Duration::from_secs(5)));

        // Give the waiter time to block, then shut the log down under it.
        thread::sleep(Duration::from_millis(50));
        log.shutdown();

        let outcome = waiter.join().unwrap();
        assert!(matches!(outcome, Err(CdcError::Shutdown)));
    }

    #[test]
    fn test_cdc_emitter_discard() {
        let log = make_log(100);
        let mut emitter = CdcEmitter::new(log.clone(), 1);
        emitter.insert("t", b"k".to_vec(), b"v".to_vec());
        emitter.discard();

        // Discarded changes never reach the log.
        assert!(log.is_empty());
    }

    #[test]
    fn test_cdc_schema_change() {
        let log = make_log(100);
        let mut emitter = CdcEmitter::new(log.clone(), 1);
        let ddl_text = "ALTER TABLE users ADD COLUMN age INT".to_string();
        emitter.schema_change("users", ddl_text);
        emitter.flush();

        let events = log.read_from(1, 10).unwrap();
        assert_eq!(events.len(), 1);
        assert!(matches!(
            &events[0].operation,
            CdcOperation::SchemaChange { ddl } if ddl.contains("ALTER TABLE")
        ));
    }

    #[test]
    fn test_cdc_concurrent_emit_and_read() {
        let log = make_log(10_000);
        let writer_log = log.clone();

        let writer = thread::spawn(move || {
            for txn in 0..1000u64 {
                emit_insert(&writer_log, txn, "t", vec![]);
            }
        });

        // The read happens only after the writer thread has finished.
        writer.join().unwrap();

        let events = log.read_from(1, 10_000).unwrap();
        assert_eq!(events.len(), 1000);

        // Sequences must be strictly increasing in buffer order.
        for pair in events.windows(2) {
            assert!(pair[1].sequence > pair[0].sequence);
        }
    }
}