use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
/// A single captured change as stored in a [`CdcLog`].
///
/// Every field type is `Eq`, so the event derives `Eq` in addition to `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CdcEvent {
    /// Log sequence number; readers resume from it (see `CdcLog::read_from`).
    pub sequence: u64,
    /// Event time in microseconds since the Unix epoch (as set by `CdcEmitter`).
    pub timestamp_us: u64,
    /// Identifier of the transaction that produced the change.
    pub txn_id: u64,
    /// Name of the affected table.
    pub table: String,
    /// Key of the affected row; empty for schema changes.
    pub key: Vec<u8>,
    /// What happened to the row (or table).
    pub operation: CdcOperation,
}

/// The kind of change an event describes, with the associated value bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CdcOperation {
    /// Row inserted; `after` holds the inserted value.
    Insert { after: Vec<u8> },
    /// Row modified; `before` is the previous value when known, `after` the new one.
    Update {
        before: Option<Vec<u8>>,
        after: Vec<u8>,
    },
    /// Row removed; `before` is the removed value when known.
    Delete { before: Option<Vec<u8>> },
    /// DDL statement applied to the table.
    SchemaChange { ddl: String },
}
/// Errors reported by the CDC log and its readers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CdcError {
    /// The requested sequence is older than the oldest event still retained.
    Overrun {
        /// Sequence the caller asked for.
        requested: u64,
        /// Smallest sequence still held by the log.
        oldest_available: u64,
    },
    /// The log has been shut down.
    Shutdown,
    /// No event arrived before the wait's timeout elapsed.
    Timeout,
}

impl std::fmt::Display for CdcError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::Overrun {
                requested,
                oldest_available,
            } => write!(
                f,
                "CDC overrun: requested seq {} but oldest available is {}",
                requested, oldest_available
            ),
            // Fixed messages need no formatting machinery.
            Self::Shutdown => f.write_str("CDC engine shut down"),
            Self::Timeout => f.write_str("Timed out waiting for CDC events"),
        }
    }
}

impl std::error::Error for CdcError {}

/// Result alias used by all fallible CDC operations.
pub type CdcResult<T> = Result<T, CdcError>;
/// Construction options for a CDC log.
#[derive(Debug, Clone)]
pub struct CdcConfig {
    /// Maximum number of events retained; the oldest are evicted first.
    pub capacity: usize,
    /// Enable flag for change capture.
    pub enabled: bool,
}

impl Default for CdcConfig {
    /// 65 536 retained events (64 Ki), capture enabled.
    fn default() -> Self {
        const DEFAULT_CAPACITY: usize = 1 << 16;
        CdcConfig {
            capacity: DEFAULT_CAPACITY,
            enabled: true,
        }
    }
}
pub struct CdcLog {
buffer: RwLock<VecDeque<CdcEvent>>,
capacity: usize,
next_seq: AtomicU64,
notify: Arc<(Mutex<bool>, Condvar)>,
running: AtomicU64, }
impl CdcLog {
pub fn new(config: CdcConfig) -> Arc<Self> {
Arc::new(Self {
buffer: RwLock::new(VecDeque::with_capacity(config.capacity)),
capacity: config.capacity,
next_seq: AtomicU64::new(1),
notify: Arc::new((Mutex::new(false), Condvar::new())),
running: AtomicU64::new(1),
})
}
pub fn emit(&self, events: Vec<CdcEvent>) {
if self.running.load(Ordering::Relaxed) == 0 {
return;
}
if events.is_empty() {
return;
}
let mut buf = self.buffer.write().unwrap();
for event in events {
if buf.len() >= self.capacity {
buf.pop_front(); }
buf.push_back(event);
}
drop(buf);
let (lock, cvar) = &*self.notify;
let mut ready = lock.lock().unwrap();
*ready = true;
cvar.notify_all();
}
pub fn emit_one(&self, event: CdcEvent) {
if self.running.load(Ordering::Relaxed) == 0 {
return;
}
let mut buf = self.buffer.write().unwrap();
if buf.len() >= self.capacity {
buf.pop_front();
}
buf.push_back(event);
drop(buf);
let (lock, cvar) = &*self.notify;
let mut ready = lock.lock().unwrap();
*ready = true;
cvar.notify_all();
}
pub fn next_sequence(&self) -> u64 {
self.next_seq.fetch_add(1, Ordering::SeqCst)
}
pub fn current_sequence(&self) -> u64 {
self.next_seq.load(Ordering::SeqCst).saturating_sub(1)
}
pub fn read_from(&self, from_seq: u64, max_events: usize) -> CdcResult<Vec<CdcEvent>> {
let buf = self.buffer.read().unwrap();
if buf.is_empty() {
return Ok(Vec::new());
}
let oldest_seq = buf.front().map(|e| e.sequence).unwrap_or(0);
let newest_seq = buf.back().map(|e| e.sequence).unwrap_or(0);
if from_seq < oldest_seq {
return Err(CdcError::Overrun {
requested: from_seq,
oldest_available: oldest_seq,
});
}
if from_seq > newest_seq {
return Ok(Vec::new()); }
let start_idx = buf
.iter()
.position(|e| e.sequence >= from_seq)
.unwrap_or(buf.len());
let events: Vec<CdcEvent> = buf
.iter()
.skip(start_idx)
.take(max_events)
.cloned()
.collect();
Ok(events)
}
pub fn wait_for_events(
&self,
after_seq: u64,
max_events: usize,
timeout: Duration,
) -> CdcResult<Vec<CdcEvent>> {
if self.running.load(Ordering::Relaxed) == 0 {
return Err(CdcError::Shutdown);
}
let events = self.read_from(after_seq + 1, max_events)?;
if !events.is_empty() {
return Ok(events);
}
let (lock, cvar) = &*self.notify;
let mut ready = lock.lock().unwrap();
let start = std::time::Instant::now();
loop {
if self.running.load(Ordering::Relaxed) == 0 {
return Err(CdcError::Shutdown);
}
let remaining = timeout.checked_sub(start.elapsed()).unwrap_or(Duration::ZERO);
if remaining.is_zero() {
return Err(CdcError::Timeout);
}
let result = cvar.wait_timeout(ready, remaining).unwrap();
ready = result.0;
let events = self.read_from(after_seq + 1, max_events)?;
if !events.is_empty() {
*ready = false;
return Ok(events);
}
if result.1.timed_out() {
return Err(CdcError::Timeout);
}
}
}
pub fn oldest_sequence(&self) -> u64 {
self.buffer
.read()
.unwrap()
.front()
.map(|e| e.sequence)
.unwrap_or(0)
}
pub fn len(&self) -> usize {
self.buffer.read().unwrap().len()
}
pub fn is_empty(&self) -> bool {
self.buffer.read().unwrap().is_empty()
}
pub fn shutdown(&self) {
self.running.store(0, Ordering::SeqCst);
let (lock, cvar) = &*self.notify;
let mut ready = lock.lock().unwrap();
*ready = true;
cvar.notify_all();
drop(ready);
}
}
pub struct CdcEmitter {
log: Arc<CdcLog>,
txn_id: u64,
pending: Vec<CdcEvent>,
}
impl CdcEmitter {
pub fn new(log: Arc<CdcLog>, txn_id: u64) -> Self {
Self {
log,
txn_id,
pending: Vec::new(),
}
}
fn now_us() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_micros() as u64
}
pub fn insert(&mut self, table: &str, key: Vec<u8>, value: Vec<u8>) {
let seq = self.log.next_sequence();
self.pending.push(CdcEvent {
sequence: seq,
timestamp_us: Self::now_us(),
txn_id: self.txn_id,
table: table.to_string(),
key,
operation: CdcOperation::Insert { after: value },
});
}
pub fn update(&mut self, table: &str, key: Vec<u8>, new_value: Vec<u8>) {
let seq = self.log.next_sequence();
self.pending.push(CdcEvent {
sequence: seq,
timestamp_us: Self::now_us(),
txn_id: self.txn_id,
table: table.to_string(),
key,
operation: CdcOperation::Update {
before: None,
after: new_value,
},
});
}
pub fn delete(&mut self, table: &str, key: Vec<u8>) {
let seq = self.log.next_sequence();
self.pending.push(CdcEvent {
sequence: seq,
timestamp_us: Self::now_us(),
txn_id: self.txn_id,
table: table.to_string(),
key,
operation: CdcOperation::Delete { before: None },
});
}
pub fn schema_change(&mut self, table: &str, ddl: String) {
let seq = self.log.next_sequence();
self.pending.push(CdcEvent {
sequence: seq,
timestamp_us: Self::now_us(),
txn_id: self.txn_id,
table: table.to_string(),
key: Vec::new(),
operation: CdcOperation::SchemaChange { ddl },
});
}
pub fn flush(self) {
if !self.pending.is_empty() {
self.log.emit(self.pending);
}
}
pub fn discard(self) {
}
pub fn pending_count(&self) -> usize {
self.pending.len()
}
}
/// Cursor that follows a `CdcLog`. It remembers the last sequence consumed
/// and can keep only events for selected tables.
pub struct CdcSubscriber {
    /// Log being followed.
    log: Arc<CdcLog>,
    /// Sequence of the last event consumed; reads resume at `last_seq + 1`.
    last_seq: u64,
    /// When set, only events whose `table` is listed are returned.
    table_filter: Option<Vec<String>>,
}

impl CdcSubscriber {
    /// Creates a subscriber positioned at `from_seq`.
    ///
    /// The first event delivered is the one with sequence `from_seq + 1`;
    /// pass 0 to start at sequence 1.
    pub fn new(log: Arc<CdcLog>, from_seq: u64) -> Self {
        Self {
            log,
            last_seq: from_seq,
            table_filter: None,
        }
    }

    /// Creates a subscriber that only sees events sequenced after the log's
    /// current sequence.
    pub fn from_latest(log: Arc<CdcLog>) -> Self {
        let last_seq = log.current_sequence();
        Self::new(log, last_seq)
    }

    /// Restricts delivery to events whose table is in `tables`.
    pub fn with_tables(mut self, tables: Vec<String>) -> Self {
        self.table_filter = Some(tables);
        self
    }

    /// Returns up to `max_events` new events that pass the filter, without blocking.
    ///
    /// The cursor advances past every event read, including those removed by
    /// the table filter. Non-matching events are therefore never re-read, and
    /// a run of them cannot pin the subscriber in place.
    ///
    /// # Errors
    /// Propagates `CdcError::Overrun` from the log when the subscriber fell behind.
    pub fn poll(&mut self, max_events: usize) -> CdcResult<Vec<CdcEvent>> {
        let events = self
            .log
            .read_from(self.last_seq.saturating_add(1), max_events)?;
        Ok(self.consume(events))
    }

    /// Waits up to `timeout` for new events that pass the table filter.
    ///
    /// A batch made up only of filtered-out events advances the cursor, and
    /// the wait continues with the remaining time.
    ///
    /// # Errors
    /// `CdcError::Timeout` when no matching event arrives in time. `Shutdown`
    /// and `Overrun` are propagated from the log.
    pub fn next_batch(
        &mut self,
        max_events: usize,
        timeout: Duration,
    ) -> CdcResult<Vec<CdcEvent>> {
        let start = std::time::Instant::now();
        loop {
            let remaining = timeout
                .checked_sub(start.elapsed())
                .unwrap_or(Duration::ZERO);
            // `wait_for_events` returns only non-empty batches or an error.
            // Each pass therefore either advances the cursor or ends the loop.
            let events = self
                .log
                .wait_for_events(self.last_seq, max_events, remaining)?;
            let matched = self.consume(events);
            if !matched.is_empty() {
                return Ok(matched);
            }
        }
    }

    /// Sequence of the last event consumed.
    pub fn position(&self) -> u64 {
        self.last_seq
    }

    /// Moves the cursor past every event in `events` (never backwards) and
    /// returns those passing the table filter.
    fn consume(&mut self, events: Vec<CdcEvent>) -> Vec<CdcEvent> {
        if let Some(max_seq) = events.iter().map(|e| e.sequence).max() {
            self.last_seq = self.last_seq.max(max_seq);
        }
        self.filter_events(events)
    }

    /// Keeps only events for tables in the filter (all events when unfiltered).
    fn filter_events(&self, mut events: Vec<CdcEvent>) -> Vec<CdcEvent> {
        if let Some(tables) = &self.table_filter {
            events.retain(|e| tables.contains(&e.table));
        }
        events
    }
}
// Unit tests for CdcLog, CdcEmitter and CdcSubscriber.
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
// Builds an enabled log that retains at most `cap` events.
fn make_log(cap: usize) -> Arc<CdcLog> {
CdcLog::new(CdcConfig {
capacity: cap,
enabled: true,
})
}
// Events stay pending until flush, then are readable from sequence 1.
#[test]
fn test_cdc_emit_and_read() {
let log = make_log(100);
let mut emitter = CdcEmitter::new(log.clone(), 42);
emitter.insert("users", b"key1".to_vec(), b"val1".to_vec());
emitter.insert("users", b"key2".to_vec(), b"val2".to_vec());
assert_eq!(emitter.pending_count(), 2);
emitter.flush();
assert_eq!(log.len(), 2);
let events = log.read_from(1, 10).unwrap();
assert_eq!(events.len(), 2);
assert_eq!(events[0].table, "users");
assert_eq!(events[0].txn_id, 42);
assert_eq!(events[0].sequence, 1);
assert_eq!(events[1].sequence, 2);
}
// Capacity 3 with 5 emits: sequences 1 and 2 are evicted, so reading from 1 overruns.
#[test]
fn test_cdc_ring_buffer_overflow() {
let log = make_log(3);
for i in 1..=5 {
log.emit_one(CdcEvent {
sequence: log.next_sequence(),
timestamp_us: 0,
txn_id: i,
table: "t".into(),
key: vec![i as u8],
operation: CdcOperation::Insert {
after: vec![i as u8],
},
});
}
assert_eq!(log.len(), 3);
assert_eq!(log.oldest_sequence(), 3);
let err = log.read_from(1, 10).unwrap_err();
assert!(matches!(
err,
CdcError::Overrun {
requested: 1,
oldest_available: 3
}
));
let events = log.read_from(3, 10).unwrap();
assert_eq!(events.len(), 3);
}
// The subscriber cursor advances after each poll and picks up later flushes.
#[test]
fn test_cdc_subscriber() {
let log = make_log(100);
let mut emitter = CdcEmitter::new(log.clone(), 1);
emitter.insert("users", b"u1".to_vec(), b"v1".to_vec());
emitter.insert("orders", b"o1".to_vec(), b"v2".to_vec());
emitter.flush();
let mut sub = CdcSubscriber::new(log.clone(), 0);
let events = sub.poll(10).unwrap();
assert_eq!(events.len(), 2);
assert_eq!(sub.position(), 2);
let events = sub.poll(10).unwrap();
assert_eq!(events.len(), 0);
let mut emitter = CdcEmitter::new(log.clone(), 2);
emitter.update("users", b"u1".to_vec(), b"v1_updated".to_vec());
emitter.flush();
let events = sub.poll(10).unwrap();
assert_eq!(events.len(), 1);
assert!(matches!(
events[0].operation,
CdcOperation::Update { .. }
));
}
// Only events for the listed tables are returned.
#[test]
fn test_cdc_table_filter() {
let log = make_log(100);
let mut emitter = CdcEmitter::new(log.clone(), 1);
emitter.insert("users", b"u1".to_vec(), b"v1".to_vec());
emitter.insert("orders", b"o1".to_vec(), b"v2".to_vec());
emitter.insert("users", b"u2".to_vec(), b"v3".to_vec());
emitter.flush();
let mut sub =
CdcSubscriber::new(log.clone(), 0).with_tables(vec!["users".to_string()]);
let events = sub.poll(10).unwrap();
assert_eq!(events.len(), 2);
assert!(events.iter().all(|e| e.table == "users"));
}
// A subscriber created via from_latest skips events emitted before it existed.
#[test]
fn test_cdc_subscriber_from_latest() {
let log = make_log(100);
log.emit_one(CdcEvent {
sequence: log.next_sequence(),
timestamp_us: 0,
txn_id: 1,
table: "old".into(),
key: vec![],
operation: CdcOperation::Insert { after: vec![] },
});
let mut sub = CdcSubscriber::from_latest(log.clone());
log.emit_one(CdcEvent {
sequence: log.next_sequence(),
timestamp_us: 0,
txn_id: 2,
table: "new".into(),
key: vec![],
operation: CdcOperation::Insert { after: vec![] },
});
let events = sub.poll(10).unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0].table, "new");
}
// A background thread emits one event after ~50 ms; wait_for_events must return it, not Timeout.
#[test]
fn test_cdc_wait_for_events() {
let log = make_log(100);
let log_clone = log.clone();
let handle = thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
log_clone.emit_one(CdcEvent {
sequence: log_clone.next_sequence(),
timestamp_us: 0,
txn_id: 1,
table: "t".into(),
key: vec![1],
operation: CdcOperation::Insert { after: vec![1] },
});
});
let events = log.wait_for_events(0, 10, Duration::from_secs(2)).unwrap();
assert_eq!(events.len(), 1);
handle.join().unwrap();
}
// With nothing emitted, the wait ends with CdcError::Timeout.
#[test]
fn test_cdc_wait_timeout() {
let log = make_log(100);
let err = log
.wait_for_events(0, 10, Duration::from_millis(50))
.unwrap_err();
assert!(matches!(err, CdcError::Timeout));
}
// shutdown() must wake a blocked waiter, which then reports CdcError::Shutdown.
#[test]
fn test_cdc_shutdown() {
let log = make_log(100);
let log_clone = log.clone();
let handle = thread::spawn(move || {
log_clone.wait_for_events(0, 10, Duration::from_secs(5))
});
thread::sleep(Duration::from_millis(50));
log.shutdown();
let result = handle.join().unwrap();
assert!(matches!(result, Err(CdcError::Shutdown)));
}
// Discarded events never reach the log.
#[test]
fn test_cdc_emitter_discard() {
let log = make_log(100);
let mut emitter = CdcEmitter::new(log.clone(), 1);
emitter.insert("t", b"k".to_vec(), b"v".to_vec());
emitter.discard();
assert!(log.is_empty());
}
// Schema changes are published as SchemaChange events carrying the DDL text.
#[test]
fn test_cdc_schema_change() {
let log = make_log(100);
let mut emitter = CdcEmitter::new(log.clone(), 1);
emitter.schema_change("users", "ALTER TABLE users ADD COLUMN age INT".to_string());
emitter.flush();
let events = log.read_from(1, 10).unwrap();
assert_eq!(events.len(), 1);
assert!(matches!(
&events[0].operation,
CdcOperation::SchemaChange { ddl } if ddl.contains("ALTER TABLE")
));
}
// NOTE: despite the name, the writer thread is joined before reading. This test
// checks that events emitted from another thread are visible and ordered by
// sequence; it does not exercise interleaved reads and writes.
#[test]
fn test_cdc_concurrent_emit_and_read() {
let log = make_log(10_000);
let log_clone = log.clone();
let writer = thread::spawn(move || {
for i in 0..1000 {
log_clone.emit_one(CdcEvent {
sequence: log_clone.next_sequence(),
timestamp_us: 0,
txn_id: i as u64,
table: "t".into(),
key: vec![],
operation: CdcOperation::Insert { after: vec![] },
});
}
});
writer.join().unwrap();
let events = log.read_from(1, 10_000).unwrap();
assert_eq!(events.len(), 1000);
for i in 1..events.len() {
assert!(events[i].sequence > events[i - 1].sequence);
}
}
}