Skip to main content

sochdb_storage/
cdc.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4
5//! # WAL-Derived Change Data Capture (CDC) Engine
6//!
7//! Provides a log-structured stream of database mutations (inserts, updates,
8//! deletes) that subscribers can consume from any position.
9//!
10//! ## Architecture
11//!
12//! ```text
13//!  Database commit path
14//!         │
15//!         ▼
16//!   ┌───────────┐    emit()     ┌─────────────┐
17//!   │ CdcEmitter│──────────────▶│  CdcLog      │
18//!   └───────────┘               │ (ring buffer) │
19//!                               └──────┬────────┘
20//!                                      │ subscribe(from_seq)
21//!                          ┌───────────┼───────────┐
22//!                          ▼           ▼           ▼
23//!                     Subscriber₁  Subscriber₂  SubscriberN
24//! ```
25//!
26//! ## Design Decisions
27//!
28//! - **After-image only**: Events carry the new value but not the old value.
29//!   The active WAL path (`TxnWalEntry`) doesn't record before-images.
30//!   A future enhancement can bridge the ARIES `WalRecord` path for full
31//!   before/after CDC.
32//!
33//! - **Ring buffer with overflow**: Fixed-capacity ring buffer. When the buffer
34//!   is full, the oldest events are dropped. Slow subscribers must catch up
35//!   from WAL replay (not yet implemented — returns `CdcError::Overrun`).
36//!
37//! - **Sequence numbers**: Events are assigned monotonically increasing sequence
38//!   numbers, independent of WAL LSNs. Subscribers track their position via
39//!   these sequence numbers.
40//!
41
42use std::collections::VecDeque;
43use std::sync::atomic::{AtomicU64, Ordering};
44use std::sync::{Arc, Condvar, Mutex, RwLock};
45use std::time::{Duration, SystemTime, UNIX_EPOCH};
46
47// ============================================================================
48// CDC Event Types
49// ============================================================================
50
51/// A CDC event representing a single mutation.
52#[derive(Debug, Clone, PartialEq)]
53pub struct CdcEvent {
54    /// Monotonically increasing sequence number.
55    pub sequence: u64,
56    /// Timestamp (microseconds since epoch).
57    pub timestamp_us: u64,
58    /// Transaction ID that produced this event.
59    pub txn_id: u64,
60    /// Name of the affected table.
61    pub table: String,
62    /// Primary key (raw bytes).
63    pub key: Vec<u8>,
64    /// The type of operation.
65    pub operation: CdcOperation,
66}
67
/// What a [`CdcEvent`] did to its target.
///
/// The current implementation is after-image only: wherever a `before`
/// field exists it is populated with `None`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum CdcOperation {
    /// A row was inserted; `after` holds the new value.
    Insert { after: Vec<u8> },

    /// A row was updated; `after` holds the new value.
    /// `before` is `None` in the current implementation (after-image only).
    Update { before: Option<Vec<u8>>, after: Vec<u8> },

    /// A row was deleted. `before` is `None` in the current implementation.
    Delete { before: Option<Vec<u8>> },

    /// Schema change (CREATE TABLE, ALTER TABLE, DROP TABLE).
    SchemaChange { ddl: String },
}
84
85// ============================================================================
86// CDC Errors
87// ============================================================================
88
/// Failure modes reported by the CDC subsystem.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum CdcError {
    /// The requested sequence number has been evicted from the ring buffer.
    /// The subscriber must fall back to WAL replay.
    Overrun { requested: u64, oldest_available: u64 },
    /// The CDC engine has been shut down.
    Shutdown,
    /// No new events arrived before the wait timed out.
    Timeout,
}

impl std::fmt::Display for CdcError {
    /// Renders a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            // Fixed messages need no formatting machinery.
            CdcError::Shutdown => f.write_str("CDC engine shut down"),
            CdcError::Timeout => f.write_str("Timed out waiting for CDC events"),
            CdcError::Overrun {
                requested,
                oldest_available,
            } => write!(
                f,
                "CDC overrun: requested seq {req} but oldest available is {oldest}",
                req = requested,
                oldest = oldest_available
            ),
        }
    }
}

impl std::error::Error for CdcError {}

/// Shorthand for results whose error type is [`CdcError`].
pub type CdcResult<T> = Result<T, CdcError>;
124
125// ============================================================================
126// CDC Log (Ring Buffer)
127// ============================================================================
128
/// Tunables used when constructing the CDC engine.
#[derive(Clone, Debug)]
pub struct CdcConfig {
    /// Maximum number of events to keep in the ring buffer.
    /// Default: 65536 (~64K events). At ~1KB per event, this is ~64MB.
    pub capacity: usize,
    /// Whether CDC is enabled. When `false`, `emit()` is meant to be a no-op.
    pub enabled: bool,
}

impl Default for CdcConfig {
    /// CDC switched on, with room for 65 536 events.
    fn default() -> Self {
        let capacity = 65_536;
        let enabled = true;
        Self { capacity, enabled }
    }
}
147
148/// The core CDC log — a ring buffer of events with subscriber notification.
149pub struct CdcLog {
150    /// Ring buffer of events.
151    buffer: RwLock<VecDeque<CdcEvent>>,
152    /// Maximum ring buffer capacity.
153    capacity: usize,
154    /// Next sequence number to assign.
155    next_seq: AtomicU64,
156    /// Condition variable for subscriber notification.
157    notify: Arc<(Mutex<bool>, Condvar)>,
158    /// Whether the engine is running.
159    running: AtomicU64, // 0 = stopped, 1 = running
160}
161
162impl CdcLog {
163    /// Create a new CDC log with the given configuration.
164    pub fn new(config: CdcConfig) -> Arc<Self> {
165        Arc::new(Self {
166            buffer: RwLock::new(VecDeque::with_capacity(config.capacity)),
167            capacity: config.capacity,
168            next_seq: AtomicU64::new(1),
169            notify: Arc::new((Mutex::new(false), Condvar::new())),
170            running: AtomicU64::new(1),
171        })
172    }
173
174    /// Emit a batch of CDC events (typically one per row in a transaction).
175    ///
176    /// This is called from the commit path after WAL flush + group commit.
177    /// Must be fast — no I/O, no allocations on the hot path (beyond the
178    /// ring buffer push).
179    pub fn emit(&self, events: Vec<CdcEvent>) {
180        if self.running.load(Ordering::Relaxed) == 0 {
181            return;
182        }
183        if events.is_empty() {
184            return;
185        }
186
187        let mut buf = self.buffer.write().unwrap();
188        for event in events {
189            if buf.len() >= self.capacity {
190                buf.pop_front(); // drop oldest
191            }
192            buf.push_back(event);
193        }
194        drop(buf);
195
196        // Notify subscribers
197        let (lock, cvar) = &*self.notify;
198        let mut ready = lock.lock().unwrap();
199        *ready = true;
200        cvar.notify_all();
201    }
202
203    /// Emit a single event.
204    pub fn emit_one(&self, event: CdcEvent) {
205        if self.running.load(Ordering::Relaxed) == 0 {
206            return;
207        }
208
209        let mut buf = self.buffer.write().unwrap();
210        if buf.len() >= self.capacity {
211            buf.pop_front();
212        }
213        buf.push_back(event);
214        drop(buf);
215
216        let (lock, cvar) = &*self.notify;
217        let mut ready = lock.lock().unwrap();
218        *ready = true;
219        cvar.notify_all();
220    }
221
222    /// Allocate the next sequence number.
223    pub fn next_sequence(&self) -> u64 {
224        self.next_seq.fetch_add(1, Ordering::SeqCst)
225    }
226
227    /// Get the current (latest) sequence number.
228    pub fn current_sequence(&self) -> u64 {
229        self.next_seq.load(Ordering::SeqCst).saturating_sub(1)
230    }
231
232    /// Read events starting from `from_seq` (inclusive).
233    ///
234    /// Returns up to `max_events` events. If `from_seq` has been evicted
235    /// from the ring buffer, returns `CdcError::Overrun`.
236    pub fn read_from(&self, from_seq: u64, max_events: usize) -> CdcResult<Vec<CdcEvent>> {
237        let buf = self.buffer.read().unwrap();
238
239        if buf.is_empty() {
240            return Ok(Vec::new());
241        }
242
243        let oldest_seq = buf.front().map(|e| e.sequence).unwrap_or(0);
244        let newest_seq = buf.back().map(|e| e.sequence).unwrap_or(0);
245
246        if from_seq < oldest_seq {
247            return Err(CdcError::Overrun {
248                requested: from_seq,
249                oldest_available: oldest_seq,
250            });
251        }
252
253        if from_seq > newest_seq {
254            return Ok(Vec::new()); // no new events yet
255        }
256
257        // Binary search for the start position
258        let start_idx = buf
259            .iter()
260            .position(|e| e.sequence >= from_seq)
261            .unwrap_or(buf.len());
262
263        let events: Vec<CdcEvent> = buf
264            .iter()
265            .skip(start_idx)
266            .take(max_events)
267            .cloned()
268            .collect();
269
270        Ok(events)
271    }
272
273    /// Wait for new events after `after_seq`, with timeout.
274    ///
275    /// Returns events with sequence > `after_seq`, blocking until at least
276    /// one event is available or the timeout expires.
277    pub fn wait_for_events(
278        &self,
279        after_seq: u64,
280        max_events: usize,
281        timeout: Duration,
282    ) -> CdcResult<Vec<CdcEvent>> {
283        if self.running.load(Ordering::Relaxed) == 0 {
284            return Err(CdcError::Shutdown);
285        }
286
287        // Fast path: check if events are already available
288        let events = self.read_from(after_seq + 1, max_events)?;
289        if !events.is_empty() {
290            return Ok(events);
291        }
292
293        // Slow path: wait for notification
294        let (lock, cvar) = &*self.notify;
295        let mut ready = lock.lock().unwrap();
296        let start = std::time::Instant::now();
297
298        loop {
299            if self.running.load(Ordering::Relaxed) == 0 {
300                return Err(CdcError::Shutdown);
301            }
302
303            let remaining = timeout
304                .checked_sub(start.elapsed())
305                .unwrap_or(Duration::ZERO);
306            if remaining.is_zero() {
307                return Err(CdcError::Timeout);
308            }
309
310            let result = cvar.wait_timeout(ready, remaining).unwrap();
311            ready = result.0;
312
313            // Check for events
314            let events = self.read_from(after_seq + 1, max_events)?;
315            if !events.is_empty() {
316                *ready = false;
317                return Ok(events);
318            }
319
320            if result.1.timed_out() {
321                return Err(CdcError::Timeout);
322            }
323        }
324    }
325
326    /// Get the oldest available sequence number (or 0 if empty).
327    pub fn oldest_sequence(&self) -> u64 {
328        self.buffer
329            .read()
330            .unwrap()
331            .front()
332            .map(|e| e.sequence)
333            .unwrap_or(0)
334    }
335
336    /// Get the number of events in the buffer.
337    pub fn len(&self) -> usize {
338        self.buffer.read().unwrap().len()
339    }
340
341    /// Check if the buffer is empty.
342    pub fn is_empty(&self) -> bool {
343        self.buffer.read().unwrap().is_empty()
344    }
345
346    /// Shut down the CDC engine, waking all waiting subscribers.
347    pub fn shutdown(&self) {
348        self.running.store(0, Ordering::SeqCst);
349        let (lock, cvar) = &*self.notify;
350        let mut ready = lock.lock().unwrap();
351        *ready = true;
352        cvar.notify_all();
353        drop(ready);
354    }
355}
356
357// ============================================================================
358// CDC Emitter — Helper for producing CDC events from the commit path
359// ============================================================================
360
361/// Helper struct for building CDC events during a transaction.
362///
363/// Usage (in the SQL execution layer):
364/// ```ignore
365/// let mut emitter = CdcEmitter::new(cdc_log.clone(), txn_id);
366/// emitter.insert("users", key, value);
367/// emitter.update("users", key, new_value);
368/// emitter.delete("users", key);
369/// emitter.flush(); // called after successful commit
370/// ```
371pub struct CdcEmitter {
372    log: Arc<CdcLog>,
373    txn_id: u64,
374    pending: Vec<CdcEvent>,
375}
376
377impl CdcEmitter {
378    pub fn new(log: Arc<CdcLog>, txn_id: u64) -> Self {
379        Self {
380            log,
381            txn_id,
382            pending: Vec::new(),
383        }
384    }
385
386    fn now_us() -> u64 {
387        SystemTime::now()
388            .duration_since(UNIX_EPOCH)
389            .unwrap_or_default()
390            .as_micros() as u64
391    }
392
393    /// Record an INSERT event.
394    pub fn insert(&mut self, table: &str, key: Vec<u8>, value: Vec<u8>) {
395        let seq = self.log.next_sequence();
396        self.pending.push(CdcEvent {
397            sequence: seq,
398            timestamp_us: Self::now_us(),
399            txn_id: self.txn_id,
400            table: table.to_string(),
401            key,
402            operation: CdcOperation::Insert { after: value },
403        });
404    }
405
406    /// Record an UPDATE event.
407    pub fn update(&mut self, table: &str, key: Vec<u8>, new_value: Vec<u8>) {
408        let seq = self.log.next_sequence();
409        self.pending.push(CdcEvent {
410            sequence: seq,
411            timestamp_us: Self::now_us(),
412            txn_id: self.txn_id,
413            table: table.to_string(),
414            key,
415            operation: CdcOperation::Update {
416                before: None,
417                after: new_value,
418            },
419        });
420    }
421
422    /// Record a DELETE event.
423    pub fn delete(&mut self, table: &str, key: Vec<u8>) {
424        let seq = self.log.next_sequence();
425        self.pending.push(CdcEvent {
426            sequence: seq,
427            timestamp_us: Self::now_us(),
428            txn_id: self.txn_id,
429            table: table.to_string(),
430            key,
431            operation: CdcOperation::Delete { before: None },
432        });
433    }
434
435    /// Record a schema change event.
436    pub fn schema_change(&mut self, table: &str, ddl: String) {
437        let seq = self.log.next_sequence();
438        self.pending.push(CdcEvent {
439            sequence: seq,
440            timestamp_us: Self::now_us(),
441            txn_id: self.txn_id,
442            table: table.to_string(),
443            key: Vec::new(),
444            operation: CdcOperation::SchemaChange { ddl },
445        });
446    }
447
448    /// Flush all pending events to the CDC log.
449    /// Call this AFTER the transaction has been committed successfully.
450    pub fn flush(self) {
451        if !self.pending.is_empty() {
452            self.log.emit(self.pending);
453        }
454    }
455
456    /// Discard all pending events (e.g., on transaction abort).
457    pub fn discard(self) {
458        // drop self — pending events are lost
459    }
460
461    /// Number of pending events.
462    pub fn pending_count(&self) -> usize {
463        self.pending.len()
464    }
465}
466
467// ============================================================================
468// CDC Subscriber — Convenience wrapper for consuming events
469// ============================================================================
470
471/// A subscriber that tracks its position in the CDC log.
472pub struct CdcSubscriber {
473    log: Arc<CdcLog>,
474    /// The last sequence number that was consumed.
475    last_seq: u64,
476    /// Table filter (if Some, only events for these tables are returned).
477    table_filter: Option<Vec<String>>,
478}
479
480impl CdcSubscriber {
481    /// Create a subscriber starting from the given sequence number.
482    /// Use `0` to start from the beginning of the buffer.
483    pub fn new(log: Arc<CdcLog>, from_seq: u64) -> Self {
484        Self {
485            log,
486            last_seq: from_seq,
487            table_filter: None,
488        }
489    }
490
491    /// Create a subscriber starting from the current (latest) position.
492    pub fn from_latest(log: Arc<CdcLog>) -> Self {
493        let seq = log.current_sequence();
494        Self {
495            log,
496            last_seq: seq,
497            table_filter: None,
498        }
499    }
500
501    /// Filter events to only include the given tables.
502    pub fn with_tables(mut self, tables: Vec<String>) -> Self {
503        self.table_filter = Some(tables);
504        self
505    }
506
507    /// Poll for new events (non-blocking).
508    pub fn poll(&mut self, max_events: usize) -> CdcResult<Vec<CdcEvent>> {
509        let events = self.log.read_from(self.last_seq + 1, max_events)?;
510        let filtered = self.filter_events(events);
511        if let Some(last) = filtered.last() {
512            self.last_seq = last.sequence;
513        }
514        Ok(filtered)
515    }
516
517    /// Wait for new events (blocking with timeout).
518    pub fn next_batch(&mut self, max_events: usize, timeout: Duration) -> CdcResult<Vec<CdcEvent>> {
519        let events = self
520            .log
521            .wait_for_events(self.last_seq, max_events, timeout)?;
522        let filtered = self.filter_events(events);
523        if let Some(last) = filtered.last() {
524            self.last_seq = last.sequence;
525        }
526        Ok(filtered)
527    }
528
529    /// Get the subscriber's current position.
530    pub fn position(&self) -> u64 {
531        self.last_seq
532    }
533
534    fn filter_events(&self, events: Vec<CdcEvent>) -> Vec<CdcEvent> {
535        if let Some(ref tables) = self.table_filter {
536            events
537                .into_iter()
538                .filter(|e| tables.iter().any(|t| *t == e.table))
539                .collect()
540        } else {
541            events
542        }
543    }
544}
545
546// ============================================================================
547// Tests
548// ============================================================================
549
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Builds an enabled log that retains at most `cap` events.
    fn make_log(cap: usize) -> Arc<CdcLog> {
        let config = CdcConfig {
            capacity: cap,
            enabled: true,
        };
        CdcLog::new(config)
    }

    /// Emits one hand-built INSERT directly through `emit_one`, bypassing
    /// `CdcEmitter`. `payload` is used as both the key and the after-image.
    fn emit_raw_insert(log: &CdcLog, txn_id: u64, table: &str, payload: Vec<u8>) {
        let sequence = log.next_sequence();
        log.emit_one(CdcEvent {
            sequence,
            timestamp_us: 0,
            txn_id,
            table: table.into(),
            key: payload.clone(),
            operation: CdcOperation::Insert { after: payload },
        });
    }

    #[test]
    fn test_cdc_emit_and_read() {
        let log = make_log(100);

        let mut emitter = CdcEmitter::new(log.clone(), 42);
        emitter.insert("users", b"key1".to_vec(), b"val1".to_vec());
        emitter.insert("users", b"key2".to_vec(), b"val2".to_vec());
        assert_eq!(emitter.pending_count(), 2);
        emitter.flush();

        assert_eq!(log.len(), 2);

        let events = log.read_from(1, 10).unwrap();
        assert_eq!(events.len(), 2);

        let first = &events[0];
        assert_eq!(first.table, "users");
        assert_eq!(first.txn_id, 42);
        assert_eq!(first.sequence, 1);
        assert_eq!(events[1].sequence, 2);
    }

    #[test]
    fn test_cdc_ring_buffer_overflow() {
        let log = make_log(3);

        for i in 1..=5u64 {
            emit_raw_insert(&log, i, "t", vec![i as u8]);
        }

        // Only the three newest events (seq 3, 4, 5) survive.
        assert_eq!(log.len(), 3);
        assert_eq!(log.oldest_sequence(), 3);

        // Seq 1 is gone, so asking for it must report an overrun.
        let err = log.read_from(1, 10).unwrap_err();
        assert!(matches!(
            err,
            CdcError::Overrun {
                requested: 1,
                oldest_available: 3
            }
        ));

        // Starting at the oldest retained event works.
        let events = log.read_from(3, 10).unwrap();
        assert_eq!(events.len(), 3);
    }

    #[test]
    fn test_cdc_subscriber() {
        let log = make_log(100);

        // First transaction: two inserts.
        let mut emitter = CdcEmitter::new(log.clone(), 1);
        emitter.insert("users", b"u1".to_vec(), b"v1".to_vec());
        emitter.insert("orders", b"o1".to_vec(), b"v2".to_vec());
        emitter.flush();

        // A subscriber starting at 0 sees both.
        let mut sub = CdcSubscriber::new(log.clone(), 0);
        let events = sub.poll(10).unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(sub.position(), 2);

        // Nothing new yet.
        let events = sub.poll(10).unwrap();
        assert_eq!(events.len(), 0);

        // Second transaction: one update.
        let mut emitter = CdcEmitter::new(log.clone(), 2);
        emitter.update("users", b"u1".to_vec(), b"v1_updated".to_vec());
        emitter.flush();

        let events = sub.poll(10).unwrap();
        assert_eq!(events.len(), 1);
        assert!(matches!(events[0].operation, CdcOperation::Update { .. }));
    }

    #[test]
    fn test_cdc_table_filter() {
        let log = make_log(100);

        let mut emitter = CdcEmitter::new(log.clone(), 1);
        emitter.insert("users", b"u1".to_vec(), b"v1".to_vec());
        emitter.insert("orders", b"o1".to_vec(), b"v2".to_vec());
        emitter.insert("users", b"u2".to_vec(), b"v3".to_vec());
        emitter.flush();

        let wanted = vec!["users".to_string()];
        let mut sub = CdcSubscriber::new(log.clone(), 0).with_tables(wanted);
        let events = sub.poll(10).unwrap();
        assert_eq!(events.len(), 2);
        assert!(events.iter().all(|e| e.table == "users"));
    }

    #[test]
    fn test_cdc_subscriber_from_latest() {
        let log = make_log(100);

        // This event predates the subscriber and must stay invisible to it.
        emit_raw_insert(&log, 1, "old", vec![]);

        let mut sub = CdcSubscriber::from_latest(log.clone());

        // This one arrives after subscribing.
        emit_raw_insert(&log, 2, "new", vec![]);

        let events = sub.poll(10).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].table, "new");
    }

    #[test]
    fn test_cdc_wait_for_events() {
        let log = make_log(100);
        let producer_log = log.clone();

        // Producer emits after a short delay while the main thread waits.
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            emit_raw_insert(&producer_log, 1, "t", vec![1]);
        });

        let events = log.wait_for_events(0, 10, Duration::from_secs(2)).unwrap();
        assert_eq!(events.len(), 1);
        handle.join().unwrap();
    }

    #[test]
    fn test_cdc_wait_timeout() {
        let log = make_log(100);
        let outcome = log.wait_for_events(0, 10, Duration::from_millis(50));
        let err = outcome.unwrap_err();
        assert!(matches!(err, CdcError::Timeout));
    }

    #[test]
    fn test_cdc_shutdown() {
        let log = make_log(100);
        let waiter_log = log.clone();

        let handle =
            thread::spawn(move || waiter_log.wait_for_events(0, 10, Duration::from_secs(5)));

        // Give the waiter time to block, then shut the log down under it.
        thread::sleep(Duration::from_millis(50));
        log.shutdown();

        let result = handle.join().unwrap();
        assert!(matches!(result, Err(CdcError::Shutdown)));
    }

    #[test]
    fn test_cdc_emitter_discard() {
        let log = make_log(100);

        let mut emitter = CdcEmitter::new(log.clone(), 1);
        emitter.insert("t", b"k".to_vec(), b"v".to_vec());
        emitter.discard(); // must NOT reach the log

        assert!(log.is_empty());
    }

    #[test]
    fn test_cdc_schema_change() {
        let log = make_log(100);

        let mut emitter = CdcEmitter::new(log.clone(), 1);
        emitter.schema_change("users", "ALTER TABLE users ADD COLUMN age INT".to_string());
        emitter.flush();

        let events = log.read_from(1, 10).unwrap();
        assert_eq!(events.len(), 1);
        assert!(matches!(
            &events[0].operation,
            CdcOperation::SchemaChange { ddl } if ddl.contains("ALTER TABLE")
        ));
    }

    #[test]
    fn test_cdc_concurrent_emit_and_read() {
        let log = make_log(10_000);
        let writer_log = log.clone();

        // Writer runs on its own thread; the read below happens only after
        // it has been joined.
        let writer = thread::spawn(move || {
            for i in 0..1000 {
                emit_raw_insert(&writer_log, i as u64, "t", vec![]);
            }
        });
        writer.join().unwrap();

        let events = log.read_from(1, 10_000).unwrap();
        assert_eq!(events.len(), 1000);

        // Sequences must be strictly increasing.
        for pair in events.windows(2) {
            assert!(pair[1].sequence > pair[0].sequence);
        }
    }
}