1use std::collections::VecDeque;
43use std::sync::atomic::{AtomicU64, Ordering};
44use std::sync::{Arc, Condvar, Mutex, RwLock};
45use std::time::{Duration, SystemTime, UNIX_EPOCH};
46
/// A single change-data-capture record describing one change to one table.
#[derive(Debug, Clone, PartialEq)]
pub struct CdcEvent {
    /// Position of the event in the log. Readers (`CdcLog::read_from`,
    /// `CdcSubscriber`) use this value as their cursor.
    pub sequence: u64,
    /// Capture time in microseconds since the Unix epoch when the event is
    /// produced through `CdcEmitter`; callers building events by hand may
    /// store any value.
    pub timestamp_us: u64,
    /// Identifier of the transaction that produced the change.
    pub txn_id: u64,
    /// Name of the table the change applies to.
    pub table: String,
    /// Raw key bytes of the affected row (empty for events created by
    /// `CdcEmitter::schema_change`).
    pub key: Vec<u8>,
    /// What happened, together with its payload.
    pub operation: CdcOperation,
}
67
/// The kind of change carried by a `CdcEvent`, with its payload bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CdcOperation {
    /// A row was inserted; `after` holds the new value bytes.
    Insert { after: Vec<u8> },
    /// A row was updated.
    Update {
        /// Previous value, if the producer captured it. `CdcEmitter::update`
        /// always stores `None`.
        before: Option<Vec<u8>>,
        /// New value bytes.
        after: Vec<u8>,
    },
    /// A row was deleted; `before` holds the previous value if the producer
    /// captured it (`CdcEmitter::delete` always stores `None`).
    Delete { before: Option<Vec<u8>> },
    /// A schema change, carried as the text of the DDL statement.
    SchemaChange { ddl: String },
}
84
/// Errors reported by the CDC log and its subscribers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CdcError {
    /// The requested sequence is lower than the sequence of the oldest event
    /// still held in the buffer.
    Overrun {
        /// Sequence number the reader asked for.
        requested: u64,
        /// Sequence number of the event currently at the front of the buffer.
        oldest_available: u64,
    },
    /// The log has been shut down; returned by `CdcLog::wait_for_events`.
    Shutdown,
    /// No event became available before the caller's timeout elapsed.
    Timeout,
}
103
104impl std::fmt::Display for CdcError {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 match self {
107 CdcError::Overrun {
108 requested,
109 oldest_available,
110 } => write!(
111 f,
112 "CDC overrun: requested seq {} but oldest available is {}",
113 requested, oldest_available
114 ),
115 CdcError::Shutdown => write!(f, "CDC engine shut down"),
116 CdcError::Timeout => write!(f, "Timed out waiting for CDC events"),
117 }
118 }
119}
120
// Marker impl: lets `CdcError` be used wherever a `std::error::Error` is
// expected (for example boxed as `Box<dyn std::error::Error>`).
impl std::error::Error for CdcError {}
122
/// Convenience alias for results whose error type is `CdcError`.
pub type CdcResult<T> = Result<T, CdcError>;
124
/// Construction-time settings for a `CdcLog`.
#[derive(Debug, Clone)]
pub struct CdcConfig {
    /// Maximum number of events the log retains. Once the buffer holds this
    /// many events, the oldest one is evicted for every new event appended.
    pub capacity: usize,
    /// Whether change capture is enabled.
    /// NOTE(review): nothing in this file reads this flag (`CdcLog::new`
    /// ignores it) — confirm where, if anywhere, it is enforced.
    pub enabled: bool,
}
138
139impl Default for CdcConfig {
140 fn default() -> Self {
141 Self {
142 capacity: 65_536,
143 enabled: true,
144 }
145 }
146}
147
/// Bounded in-memory log of `CdcEvent`s shared between producers and readers.
pub struct CdcLog {
    /// Buffered events; new events are pushed at the back and evicted from
    /// the front.
    buffer: RwLock<VecDeque<CdcEvent>>,
    /// Number of buffered events at which the front is evicted before a push.
    capacity: usize,
    /// Next sequence number handed out by `next_sequence`; starts at 1.
    next_seq: AtomicU64,
    /// Mutex/condvar pair used to wake threads blocked in `wait_for_events`.
    /// The bool is set to `true` by `emit`/`emit_one`/`shutdown` and reset to
    /// `false` by a waiter that returns events after being woken.
    notify: Arc<(Mutex<bool>, Condvar)>,
    /// Run flag: 1 while the log accepts events, 0 after `shutdown`.
    running: AtomicU64,
}
161
162impl CdcLog {
163 pub fn new(config: CdcConfig) -> Arc<Self> {
165 Arc::new(Self {
166 buffer: RwLock::new(VecDeque::with_capacity(config.capacity)),
167 capacity: config.capacity,
168 next_seq: AtomicU64::new(1),
169 notify: Arc::new((Mutex::new(false), Condvar::new())),
170 running: AtomicU64::new(1),
171 })
172 }
173
174 pub fn emit(&self, events: Vec<CdcEvent>) {
180 if self.running.load(Ordering::Relaxed) == 0 {
181 return;
182 }
183 if events.is_empty() {
184 return;
185 }
186
187 let mut buf = self.buffer.write().unwrap();
188 for event in events {
189 if buf.len() >= self.capacity {
190 buf.pop_front(); }
192 buf.push_back(event);
193 }
194 drop(buf);
195
196 let (lock, cvar) = &*self.notify;
198 let mut ready = lock.lock().unwrap();
199 *ready = true;
200 cvar.notify_all();
201 }
202
203 pub fn emit_one(&self, event: CdcEvent) {
205 if self.running.load(Ordering::Relaxed) == 0 {
206 return;
207 }
208
209 let mut buf = self.buffer.write().unwrap();
210 if buf.len() >= self.capacity {
211 buf.pop_front();
212 }
213 buf.push_back(event);
214 drop(buf);
215
216 let (lock, cvar) = &*self.notify;
217 let mut ready = lock.lock().unwrap();
218 *ready = true;
219 cvar.notify_all();
220 }
221
222 pub fn next_sequence(&self) -> u64 {
224 self.next_seq.fetch_add(1, Ordering::SeqCst)
225 }
226
227 pub fn current_sequence(&self) -> u64 {
229 self.next_seq.load(Ordering::SeqCst).saturating_sub(1)
230 }
231
232 pub fn read_from(&self, from_seq: u64, max_events: usize) -> CdcResult<Vec<CdcEvent>> {
237 let buf = self.buffer.read().unwrap();
238
239 if buf.is_empty() {
240 return Ok(Vec::new());
241 }
242
243 let oldest_seq = buf.front().map(|e| e.sequence).unwrap_or(0);
244 let newest_seq = buf.back().map(|e| e.sequence).unwrap_or(0);
245
246 if from_seq < oldest_seq {
247 return Err(CdcError::Overrun {
248 requested: from_seq,
249 oldest_available: oldest_seq,
250 });
251 }
252
253 if from_seq > newest_seq {
254 return Ok(Vec::new()); }
256
257 let start_idx = buf
259 .iter()
260 .position(|e| e.sequence >= from_seq)
261 .unwrap_or(buf.len());
262
263 let events: Vec<CdcEvent> = buf
264 .iter()
265 .skip(start_idx)
266 .take(max_events)
267 .cloned()
268 .collect();
269
270 Ok(events)
271 }
272
273 pub fn wait_for_events(
278 &self,
279 after_seq: u64,
280 max_events: usize,
281 timeout: Duration,
282 ) -> CdcResult<Vec<CdcEvent>> {
283 if self.running.load(Ordering::Relaxed) == 0 {
284 return Err(CdcError::Shutdown);
285 }
286
287 let events = self.read_from(after_seq + 1, max_events)?;
289 if !events.is_empty() {
290 return Ok(events);
291 }
292
293 let (lock, cvar) = &*self.notify;
295 let mut ready = lock.lock().unwrap();
296 let start = std::time::Instant::now();
297
298 loop {
299 if self.running.load(Ordering::Relaxed) == 0 {
300 return Err(CdcError::Shutdown);
301 }
302
303 let remaining = timeout.checked_sub(start.elapsed()).unwrap_or(Duration::ZERO);
304 if remaining.is_zero() {
305 return Err(CdcError::Timeout);
306 }
307
308 let result = cvar.wait_timeout(ready, remaining).unwrap();
309 ready = result.0;
310
311 let events = self.read_from(after_seq + 1, max_events)?;
313 if !events.is_empty() {
314 *ready = false;
315 return Ok(events);
316 }
317
318 if result.1.timed_out() {
319 return Err(CdcError::Timeout);
320 }
321 }
322 }
323
324 pub fn oldest_sequence(&self) -> u64 {
326 self.buffer
327 .read()
328 .unwrap()
329 .front()
330 .map(|e| e.sequence)
331 .unwrap_or(0)
332 }
333
334 pub fn len(&self) -> usize {
336 self.buffer.read().unwrap().len()
337 }
338
339 pub fn is_empty(&self) -> bool {
341 self.buffer.read().unwrap().is_empty()
342 }
343
344 pub fn shutdown(&self) {
346 self.running.store(0, Ordering::SeqCst);
347 let (lock, cvar) = &*self.notify;
348 let mut ready = lock.lock().unwrap();
349 *ready = true;
350 cvar.notify_all();
351 drop(ready);
352 }
353}
354
/// Collects the change events of one transaction and hands them to the log
/// in a single batch on `flush`, or drops them on `discard`.
pub struct CdcEmitter {
    /// Log that receives the events on `flush` and supplies sequence numbers.
    log: Arc<CdcLog>,
    /// Transaction id stamped on every event recorded through this emitter.
    txn_id: u64,
    /// Events recorded so far and not yet flushed.
    pending: Vec<CdcEvent>,
}
374
375impl CdcEmitter {
376 pub fn new(log: Arc<CdcLog>, txn_id: u64) -> Self {
377 Self {
378 log,
379 txn_id,
380 pending: Vec::new(),
381 }
382 }
383
384 fn now_us() -> u64 {
385 SystemTime::now()
386 .duration_since(UNIX_EPOCH)
387 .unwrap_or_default()
388 .as_micros() as u64
389 }
390
391 pub fn insert(&mut self, table: &str, key: Vec<u8>, value: Vec<u8>) {
393 let seq = self.log.next_sequence();
394 self.pending.push(CdcEvent {
395 sequence: seq,
396 timestamp_us: Self::now_us(),
397 txn_id: self.txn_id,
398 table: table.to_string(),
399 key,
400 operation: CdcOperation::Insert { after: value },
401 });
402 }
403
404 pub fn update(&mut self, table: &str, key: Vec<u8>, new_value: Vec<u8>) {
406 let seq = self.log.next_sequence();
407 self.pending.push(CdcEvent {
408 sequence: seq,
409 timestamp_us: Self::now_us(),
410 txn_id: self.txn_id,
411 table: table.to_string(),
412 key,
413 operation: CdcOperation::Update {
414 before: None,
415 after: new_value,
416 },
417 });
418 }
419
420 pub fn delete(&mut self, table: &str, key: Vec<u8>) {
422 let seq = self.log.next_sequence();
423 self.pending.push(CdcEvent {
424 sequence: seq,
425 timestamp_us: Self::now_us(),
426 txn_id: self.txn_id,
427 table: table.to_string(),
428 key,
429 operation: CdcOperation::Delete { before: None },
430 });
431 }
432
433 pub fn schema_change(&mut self, table: &str, ddl: String) {
435 let seq = self.log.next_sequence();
436 self.pending.push(CdcEvent {
437 sequence: seq,
438 timestamp_us: Self::now_us(),
439 txn_id: self.txn_id,
440 table: table.to_string(),
441 key: Vec::new(),
442 operation: CdcOperation::SchemaChange { ddl },
443 });
444 }
445
446 pub fn flush(self) {
449 if !self.pending.is_empty() {
450 self.log.emit(self.pending);
451 }
452 }
453
454 pub fn discard(self) {
456 }
458
459 pub fn pending_count(&self) -> usize {
461 self.pending.len()
462 }
463}
464
/// A cursor over a `CdcLog`, optionally restricted to a set of tables.
pub struct CdcSubscriber {
    /// Log this subscriber reads from.
    log: Arc<CdcLog>,
    /// Cursor position: the next read asks the log for events starting at
    /// `last_seq + 1`.
    last_seq: u64,
    /// When `Some`, only events whose `table` appears in the list are
    /// returned; `None` returns events for every table.
    table_filter: Option<Vec<String>>,
}
477
478impl CdcSubscriber {
479 pub fn new(log: Arc<CdcLog>, from_seq: u64) -> Self {
482 Self {
483 log,
484 last_seq: from_seq,
485 table_filter: None,
486 }
487 }
488
489 pub fn from_latest(log: Arc<CdcLog>) -> Self {
491 let seq = log.current_sequence();
492 Self {
493 log,
494 last_seq: seq,
495 table_filter: None,
496 }
497 }
498
499 pub fn with_tables(mut self, tables: Vec<String>) -> Self {
501 self.table_filter = Some(tables);
502 self
503 }
504
505 pub fn poll(&mut self, max_events: usize) -> CdcResult<Vec<CdcEvent>> {
507 let events = self.log.read_from(self.last_seq + 1, max_events)?;
508 let filtered = self.filter_events(events);
509 if let Some(last) = filtered.last() {
510 self.last_seq = last.sequence;
511 }
512 Ok(filtered)
513 }
514
515 pub fn next_batch(
517 &mut self,
518 max_events: usize,
519 timeout: Duration,
520 ) -> CdcResult<Vec<CdcEvent>> {
521 let events = self
522 .log
523 .wait_for_events(self.last_seq, max_events, timeout)?;
524 let filtered = self.filter_events(events);
525 if let Some(last) = filtered.last() {
526 self.last_seq = last.sequence;
527 }
528 Ok(filtered)
529 }
530
531 pub fn position(&self) -> u64 {
533 self.last_seq
534 }
535
536 fn filter_events(&self, events: Vec<CdcEvent>) -> Vec<CdcEvent> {
537 if let Some(ref tables) = self.table_filter {
538 events
539 .into_iter()
540 .filter(|e| tables.iter().any(|t| *t == e.table))
541 .collect()
542 } else {
543 events
544 }
545 }
546}
547
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Builds a log with the given capacity.
    fn make_log(cap: usize) -> Arc<CdcLog> {
        let config = CdcConfig {
            capacity: cap,
            enabled: true,
        };
        CdcLog::new(config)
    }

    /// Emits one `Insert` event straight into `log`, bypassing `CdcEmitter`.
    /// `bytes` is used both as the key and as the inserted value; the
    /// timestamp is fixed at 0.
    fn emit_insert(log: &CdcLog, txn_id: u64, table: &str, bytes: Vec<u8>) {
        log.emit_one(CdcEvent {
            sequence: log.next_sequence(),
            timestamp_us: 0,
            txn_id,
            table: table.into(),
            key: bytes.clone(),
            operation: CdcOperation::Insert { after: bytes },
        });
    }

    #[test]
    fn test_cdc_emit_and_read() {
        let log = make_log(100);
        let mut emitter = CdcEmitter::new(Arc::clone(&log), 42);

        emitter.insert("users", b"key1".to_vec(), b"val1".to_vec());
        emitter.insert("users", b"key2".to_vec(), b"val2".to_vec());
        assert_eq!(emitter.pending_count(), 2);

        emitter.flush();
        assert_eq!(log.len(), 2);

        let events = log.read_from(1, 10).unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].table, "users");
        assert_eq!(events[0].txn_id, 42);

        let sequences: Vec<u64> = events.iter().map(|e| e.sequence).collect();
        assert_eq!(sequences, vec![1, 2]);
    }

    #[test]
    fn test_cdc_ring_buffer_overflow() {
        let log = make_log(3);

        // Five events into a three-slot buffer: sequences 1 and 2 are evicted.
        for i in 1..=5u64 {
            emit_insert(&log, i, "t", vec![i as u8]);
        }

        assert_eq!(log.len(), 3);
        assert_eq!(log.oldest_sequence(), 3);

        // Reading from an evicted position reports the overrun.
        let err = log.read_from(1, 10).unwrap_err();
        assert_eq!(
            err,
            CdcError::Overrun {
                requested: 1,
                oldest_available: 3
            }
        );

        // Reading from the oldest surviving position returns everything left.
        let survivors = log.read_from(3, 10).unwrap();
        assert_eq!(survivors.len(), 3);
    }

    #[test]
    fn test_cdc_subscriber() {
        let log = make_log(100);

        let mut first_txn = CdcEmitter::new(Arc::clone(&log), 1);
        first_txn.insert("users", b"u1".to_vec(), b"v1".to_vec());
        first_txn.insert("orders", b"o1".to_vec(), b"v2".to_vec());
        first_txn.flush();

        let mut sub = CdcSubscriber::new(Arc::clone(&log), 0);

        let batch = sub.poll(10).unwrap();
        assert_eq!(batch.len(), 2);
        assert_eq!(sub.position(), 2);

        // Nothing new yet.
        let batch = sub.poll(10).unwrap();
        assert_eq!(batch.len(), 0);

        let mut second_txn = CdcEmitter::new(Arc::clone(&log), 2);
        second_txn.update("users", b"u1".to_vec(), b"v1_updated".to_vec());
        second_txn.flush();

        let batch = sub.poll(10).unwrap();
        assert_eq!(batch.len(), 1);
        assert!(matches!(batch[0].operation, CdcOperation::Update { .. }));
    }

    #[test]
    fn test_cdc_table_filter() {
        let log = make_log(100);

        let mut emitter = CdcEmitter::new(Arc::clone(&log), 1);
        emitter.insert("users", b"u1".to_vec(), b"v1".to_vec());
        emitter.insert("orders", b"o1".to_vec(), b"v2".to_vec());
        emitter.insert("users", b"u2".to_vec(), b"v3".to_vec());
        emitter.flush();

        let wanted = vec!["users".to_string()];
        let mut sub = CdcSubscriber::new(Arc::clone(&log), 0).with_tables(wanted);

        let batch = sub.poll(10).unwrap();
        assert_eq!(batch.len(), 2);
        assert!(batch.iter().all(|e| e.table == "users"));
    }

    #[test]
    fn test_cdc_subscriber_from_latest() {
        let log = make_log(100);

        // Emitted before the subscriber exists: must not be delivered.
        emit_insert(&log, 1, "old", Vec::new());

        let mut sub = CdcSubscriber::from_latest(Arc::clone(&log));

        // Emitted after subscribing: must be delivered.
        emit_insert(&log, 2, "new", Vec::new());

        let batch = sub.poll(10).unwrap();
        assert_eq!(batch.len(), 1);
        assert_eq!(batch[0].table, "new");
    }

    #[test]
    fn test_cdc_wait_for_events() {
        let log = make_log(100);
        let producer_log = Arc::clone(&log);

        let producer = thread::spawn(move || {
            // Give the main thread time to start waiting.
            thread::sleep(Duration::from_millis(50));
            emit_insert(&producer_log, 1, "t", vec![1]);
        });

        let batch = log.wait_for_events(0, 10, Duration::from_secs(2)).unwrap();
        assert_eq!(batch.len(), 1);
        producer.join().unwrap();
    }

    #[test]
    fn test_cdc_wait_timeout() {
        let log = make_log(100);
        let outcome = log.wait_for_events(0, 10, Duration::from_millis(50));
        assert!(matches!(outcome.unwrap_err(), CdcError::Timeout));
    }

    #[test]
    fn test_cdc_shutdown() {
        let log = make_log(100);
        let waiter_log = Arc::clone(&log);

        let waiter =
            thread::spawn(move || waiter_log.wait_for_events(0, 10, Duration::from_secs(5)));

        // Let the waiter block, then shut the log down underneath it.
        thread::sleep(Duration::from_millis(50));
        log.shutdown();

        let outcome = waiter.join().unwrap();
        assert!(matches!(outcome, Err(CdcError::Shutdown)));
    }

    #[test]
    fn test_cdc_emitter_discard() {
        let log = make_log(100);
        let mut emitter = CdcEmitter::new(Arc::clone(&log), 1);
        emitter.insert("t", b"k".to_vec(), b"v".to_vec());

        // Discarded events never reach the log.
        emitter.discard();
        assert!(log.is_empty());
    }

    #[test]
    fn test_cdc_schema_change() {
        let log = make_log(100);
        let mut emitter = CdcEmitter::new(Arc::clone(&log), 1);
        emitter.schema_change("users", "ALTER TABLE users ADD COLUMN age INT".to_string());
        emitter.flush();

        let events = log.read_from(1, 10).unwrap();
        assert_eq!(events.len(), 1);
        assert!(matches!(
            &events[0].operation,
            CdcOperation::SchemaChange { ddl } if ddl.contains("ALTER TABLE")
        ));
    }

    #[test]
    fn test_cdc_concurrent_emit_and_read() {
        let log = make_log(10_000);
        let writer_log = Arc::clone(&log);

        let writer = thread::spawn(move || {
            for txn in 0..1000u64 {
                emit_insert(&writer_log, txn, "t", Vec::new());
            }
        });
        writer.join().unwrap();

        let events = log.read_from(1, 10_000).unwrap();
        assert_eq!(events.len(), 1000);

        // Sequences must be strictly increasing in buffer order.
        assert!(events
            .windows(2)
            .all(|pair| pair[1].sequence > pair[0].sequence));
    }
}