Skip to main content

sochdb_storage/
cdc.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4
5//! # WAL-Derived Change Data Capture (CDC) Engine
6//!
7//! Provides a log-structured stream of database mutations (inserts, updates,
8//! deletes) that subscribers can consume from any position.
9//!
10//! ## Architecture
11//!
12//! ```text
13//!  Database commit path
14//!         │
15//!         ▼
16//!   ┌───────────┐    emit()     ┌─────────────┐
17//!   │ CdcEmitter│──────────────▶│  CdcLog      │
18//!   └───────────┘               │ (ring buffer) │
19//!                               └──────┬────────┘
20//!                                      │ subscribe(from_seq)
21//!                          ┌───────────┼───────────┐
22//!                          ▼           ▼           ▼
23//!                     Subscriber₁  Subscriber₂  SubscriberN
24//! ```
25//!
26//! ## Design Decisions
27//!
28//! - **After-image only**: Events carry the new value but not the old value.
29//!   The active WAL path (`TxnWalEntry`) doesn't record before-images.
30//!   A future enhancement can bridge the ARIES `WalRecord` path for full
31//!   before/after CDC.
32//!
33//! - **Ring buffer with overflow**: Fixed-capacity ring buffer. When the buffer
34//!   is full, the oldest events are dropped. Slow subscribers must catch up
35//!   from WAL replay (not yet implemented — returns `CdcError::Overrun`).
36//!
37//! - **Sequence numbers**: Events are assigned monotonically increasing sequence
38//!   numbers, independent of WAL LSNs. Subscribers track their position via
39//!   these sequence numbers.
40//!
41
42use std::collections::VecDeque;
43use std::sync::atomic::{AtomicU64, Ordering};
44use std::sync::{Arc, Condvar, Mutex, RwLock};
45use std::time::{Duration, SystemTime, UNIX_EPOCH};
46
47// ============================================================================
48// CDC Event Types
49// ============================================================================
50
51/// A CDC event representing a single mutation.
52#[derive(Debug, Clone, PartialEq)]
53pub struct CdcEvent {
54    /// Monotonically increasing sequence number.
55    pub sequence: u64,
56    /// Timestamp (microseconds since epoch).
57    pub timestamp_us: u64,
58    /// Transaction ID that produced this event.
59    pub txn_id: u64,
60    /// Name of the affected table.
61    pub table: String,
62    /// Primary key (raw bytes).
63    pub key: Vec<u8>,
64    /// The type of operation.
65    pub operation: CdcOperation,
66}
67
68/// The type of mutation.
69#[derive(Debug, Clone, PartialEq, Eq)]
70pub enum CdcOperation {
71    /// Row inserted. `after` contains the new value.
72    Insert { after: Vec<u8> },
73    /// Row updated. `after` contains the new value.
74    /// `before` is `None` in the current implementation (after-image only).
75    Update {
76        before: Option<Vec<u8>>,
77        after: Vec<u8>,
78    },
79    /// Row deleted. `before` is `None` in the current implementation.
80    Delete { before: Option<Vec<u8>> },
81    /// Schema change (CREATE TABLE, ALTER TABLE, DROP TABLE).
82    SchemaChange { ddl: String },
83}
84
85// ============================================================================
86// CDC Errors
87// ============================================================================
88
89/// Errors that can occur in the CDC subsystem.
90#[derive(Debug, Clone, PartialEq, Eq)]
91pub enum CdcError {
92    /// The requested sequence number has been evicted from the ring buffer.
93    /// The subscriber must fall back to WAL replay.
94    Overrun {
95        requested: u64,
96        oldest_available: u64,
97    },
98    /// The CDC engine has been shut down.
99    Shutdown,
100    /// Timed out waiting for new events.
101    Timeout,
102}
103
104impl std::fmt::Display for CdcError {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        match self {
107            CdcError::Overrun {
108                requested,
109                oldest_available,
110            } => write!(
111                f,
112                "CDC overrun: requested seq {} but oldest available is {}",
113                requested, oldest_available
114            ),
115            CdcError::Shutdown => write!(f, "CDC engine shut down"),
116            CdcError::Timeout => write!(f, "Timed out waiting for CDC events"),
117        }
118    }
119}
120
121impl std::error::Error for CdcError {}
122
123pub type CdcResult<T> = Result<T, CdcError>;
124
125// ============================================================================
126// CDC Log (Ring Buffer)
127// ============================================================================
128
129/// Configuration for the CDC engine.
130#[derive(Debug, Clone)]
131pub struct CdcConfig {
132    /// Maximum number of events to keep in the ring buffer.
133    /// Default: 65536 (~64K events). At ~1KB per event, this is ~64MB.
134    pub capacity: usize,
135    /// Whether CDC is enabled. If false, `emit()` is a no-op.
136    pub enabled: bool,
137}
138
139impl Default for CdcConfig {
140    fn default() -> Self {
141        Self {
142            capacity: 65_536,
143            enabled: true,
144        }
145    }
146}
147
148/// The core CDC log — a ring buffer of events with subscriber notification.
149pub struct CdcLog {
150    /// Ring buffer of events.
151    buffer: RwLock<VecDeque<CdcEvent>>,
152    /// Maximum ring buffer capacity.
153    capacity: usize,
154    /// Next sequence number to assign.
155    next_seq: AtomicU64,
156    /// Condition variable for subscriber notification.
157    notify: Arc<(Mutex<bool>, Condvar)>,
158    /// Whether the engine is running.
159    running: AtomicU64, // 0 = stopped, 1 = running
160}
161
162impl CdcLog {
163    /// Create a new CDC log with the given configuration.
164    pub fn new(config: CdcConfig) -> Arc<Self> {
165        Arc::new(Self {
166            buffer: RwLock::new(VecDeque::with_capacity(config.capacity)),
167            capacity: config.capacity,
168            next_seq: AtomicU64::new(1),
169            notify: Arc::new((Mutex::new(false), Condvar::new())),
170            running: AtomicU64::new(1),
171        })
172    }
173
174    /// Emit a batch of CDC events (typically one per row in a transaction).
175    ///
176    /// This is called from the commit path after WAL flush + group commit.
177    /// Must be fast — no I/O, no allocations on the hot path (beyond the
178    /// ring buffer push).
179    pub fn emit(&self, events: Vec<CdcEvent>) {
180        if self.running.load(Ordering::Relaxed) == 0 {
181            return;
182        }
183        if events.is_empty() {
184            return;
185        }
186
187        let mut buf = self.buffer.write().unwrap();
188        for event in events {
189            if buf.len() >= self.capacity {
190                buf.pop_front(); // drop oldest
191            }
192            buf.push_back(event);
193        }
194        drop(buf);
195
196        // Notify subscribers
197        let (lock, cvar) = &*self.notify;
198        let mut ready = lock.lock().unwrap();
199        *ready = true;
200        cvar.notify_all();
201    }
202
203    /// Emit a single event.
204    pub fn emit_one(&self, event: CdcEvent) {
205        if self.running.load(Ordering::Relaxed) == 0 {
206            return;
207        }
208
209        let mut buf = self.buffer.write().unwrap();
210        if buf.len() >= self.capacity {
211            buf.pop_front();
212        }
213        buf.push_back(event);
214        drop(buf);
215
216        let (lock, cvar) = &*self.notify;
217        let mut ready = lock.lock().unwrap();
218        *ready = true;
219        cvar.notify_all();
220    }
221
222    /// Allocate the next sequence number.
223    pub fn next_sequence(&self) -> u64 {
224        self.next_seq.fetch_add(1, Ordering::SeqCst)
225    }
226
227    /// Get the current (latest) sequence number.
228    pub fn current_sequence(&self) -> u64 {
229        self.next_seq.load(Ordering::SeqCst).saturating_sub(1)
230    }
231
232    /// Read events starting from `from_seq` (inclusive).
233    ///
234    /// Returns up to `max_events` events. If `from_seq` has been evicted
235    /// from the ring buffer, returns `CdcError::Overrun`.
236    pub fn read_from(&self, from_seq: u64, max_events: usize) -> CdcResult<Vec<CdcEvent>> {
237        let buf = self.buffer.read().unwrap();
238
239        if buf.is_empty() {
240            return Ok(Vec::new());
241        }
242
243        let oldest_seq = buf.front().map(|e| e.sequence).unwrap_or(0);
244        let newest_seq = buf.back().map(|e| e.sequence).unwrap_or(0);
245
246        if from_seq < oldest_seq {
247            return Err(CdcError::Overrun {
248                requested: from_seq,
249                oldest_available: oldest_seq,
250            });
251        }
252
253        if from_seq > newest_seq {
254            return Ok(Vec::new()); // no new events yet
255        }
256
257        // Binary search for the start position
258        let start_idx = buf
259            .iter()
260            .position(|e| e.sequence >= from_seq)
261            .unwrap_or(buf.len());
262
263        let events: Vec<CdcEvent> = buf
264            .iter()
265            .skip(start_idx)
266            .take(max_events)
267            .cloned()
268            .collect();
269
270        Ok(events)
271    }
272
273    /// Wait for new events after `after_seq`, with timeout.
274    ///
275    /// Returns events with sequence > `after_seq`, blocking until at least
276    /// one event is available or the timeout expires.
277    pub fn wait_for_events(
278        &self,
279        after_seq: u64,
280        max_events: usize,
281        timeout: Duration,
282    ) -> CdcResult<Vec<CdcEvent>> {
283        if self.running.load(Ordering::Relaxed) == 0 {
284            return Err(CdcError::Shutdown);
285        }
286
287        // Fast path: check if events are already available
288        let events = self.read_from(after_seq + 1, max_events)?;
289        if !events.is_empty() {
290            return Ok(events);
291        }
292
293        // Slow path: wait for notification
294        let (lock, cvar) = &*self.notify;
295        let mut ready = lock.lock().unwrap();
296        let start = std::time::Instant::now();
297
298        loop {
299            if self.running.load(Ordering::Relaxed) == 0 {
300                return Err(CdcError::Shutdown);
301            }
302
303            let remaining = timeout.checked_sub(start.elapsed()).unwrap_or(Duration::ZERO);
304            if remaining.is_zero() {
305                return Err(CdcError::Timeout);
306            }
307
308            let result = cvar.wait_timeout(ready, remaining).unwrap();
309            ready = result.0;
310
311            // Check for events
312            let events = self.read_from(after_seq + 1, max_events)?;
313            if !events.is_empty() {
314                *ready = false;
315                return Ok(events);
316            }
317
318            if result.1.timed_out() {
319                return Err(CdcError::Timeout);
320            }
321        }
322    }
323
324    /// Get the oldest available sequence number (or 0 if empty).
325    pub fn oldest_sequence(&self) -> u64 {
326        self.buffer
327            .read()
328            .unwrap()
329            .front()
330            .map(|e| e.sequence)
331            .unwrap_or(0)
332    }
333
334    /// Get the number of events in the buffer.
335    pub fn len(&self) -> usize {
336        self.buffer.read().unwrap().len()
337    }
338
339    /// Check if the buffer is empty.
340    pub fn is_empty(&self) -> bool {
341        self.buffer.read().unwrap().is_empty()
342    }
343
344    /// Shut down the CDC engine, waking all waiting subscribers.
345    pub fn shutdown(&self) {
346        self.running.store(0, Ordering::SeqCst);
347        let (lock, cvar) = &*self.notify;
348        let mut ready = lock.lock().unwrap();
349        *ready = true;
350        cvar.notify_all();
351        drop(ready);
352    }
353}
354
355// ============================================================================
356// CDC Emitter — Helper for producing CDC events from the commit path
357// ============================================================================
358
359/// Helper struct for building CDC events during a transaction.
360///
361/// Usage (in the SQL execution layer):
362/// ```ignore
363/// let mut emitter = CdcEmitter::new(cdc_log.clone(), txn_id);
364/// emitter.insert("users", key, value);
365/// emitter.update("users", key, new_value);
366/// emitter.delete("users", key);
367/// emitter.flush(); // called after successful commit
368/// ```
369pub struct CdcEmitter {
370    log: Arc<CdcLog>,
371    txn_id: u64,
372    pending: Vec<CdcEvent>,
373}
374
375impl CdcEmitter {
376    pub fn new(log: Arc<CdcLog>, txn_id: u64) -> Self {
377        Self {
378            log,
379            txn_id,
380            pending: Vec::new(),
381        }
382    }
383
384    fn now_us() -> u64 {
385        SystemTime::now()
386            .duration_since(UNIX_EPOCH)
387            .unwrap_or_default()
388            .as_micros() as u64
389    }
390
391    /// Record an INSERT event.
392    pub fn insert(&mut self, table: &str, key: Vec<u8>, value: Vec<u8>) {
393        let seq = self.log.next_sequence();
394        self.pending.push(CdcEvent {
395            sequence: seq,
396            timestamp_us: Self::now_us(),
397            txn_id: self.txn_id,
398            table: table.to_string(),
399            key,
400            operation: CdcOperation::Insert { after: value },
401        });
402    }
403
404    /// Record an UPDATE event.
405    pub fn update(&mut self, table: &str, key: Vec<u8>, new_value: Vec<u8>) {
406        let seq = self.log.next_sequence();
407        self.pending.push(CdcEvent {
408            sequence: seq,
409            timestamp_us: Self::now_us(),
410            txn_id: self.txn_id,
411            table: table.to_string(),
412            key,
413            operation: CdcOperation::Update {
414                before: None,
415                after: new_value,
416            },
417        });
418    }
419
420    /// Record a DELETE event.
421    pub fn delete(&mut self, table: &str, key: Vec<u8>) {
422        let seq = self.log.next_sequence();
423        self.pending.push(CdcEvent {
424            sequence: seq,
425            timestamp_us: Self::now_us(),
426            txn_id: self.txn_id,
427            table: table.to_string(),
428            key,
429            operation: CdcOperation::Delete { before: None },
430        });
431    }
432
433    /// Record a schema change event.
434    pub fn schema_change(&mut self, table: &str, ddl: String) {
435        let seq = self.log.next_sequence();
436        self.pending.push(CdcEvent {
437            sequence: seq,
438            timestamp_us: Self::now_us(),
439            txn_id: self.txn_id,
440            table: table.to_string(),
441            key: Vec::new(),
442            operation: CdcOperation::SchemaChange { ddl },
443        });
444    }
445
446    /// Flush all pending events to the CDC log.
447    /// Call this AFTER the transaction has been committed successfully.
448    pub fn flush(self) {
449        if !self.pending.is_empty() {
450            self.log.emit(self.pending);
451        }
452    }
453
454    /// Discard all pending events (e.g., on transaction abort).
455    pub fn discard(self) {
456        // drop self — pending events are lost
457    }
458
459    /// Number of pending events.
460    pub fn pending_count(&self) -> usize {
461        self.pending.len()
462    }
463}
464
465// ============================================================================
466// CDC Subscriber — Convenience wrapper for consuming events
467// ============================================================================
468
469/// A subscriber that tracks its position in the CDC log.
470pub struct CdcSubscriber {
471    log: Arc<CdcLog>,
472    /// The last sequence number that was consumed.
473    last_seq: u64,
474    /// Table filter (if Some, only events for these tables are returned).
475    table_filter: Option<Vec<String>>,
476}
477
478impl CdcSubscriber {
479    /// Create a subscriber starting from the given sequence number.
480    /// Use `0` to start from the beginning of the buffer.
481    pub fn new(log: Arc<CdcLog>, from_seq: u64) -> Self {
482        Self {
483            log,
484            last_seq: from_seq,
485            table_filter: None,
486        }
487    }
488
489    /// Create a subscriber starting from the current (latest) position.
490    pub fn from_latest(log: Arc<CdcLog>) -> Self {
491        let seq = log.current_sequence();
492        Self {
493            log,
494            last_seq: seq,
495            table_filter: None,
496        }
497    }
498
499    /// Filter events to only include the given tables.
500    pub fn with_tables(mut self, tables: Vec<String>) -> Self {
501        self.table_filter = Some(tables);
502        self
503    }
504
505    /// Poll for new events (non-blocking).
506    pub fn poll(&mut self, max_events: usize) -> CdcResult<Vec<CdcEvent>> {
507        let events = self.log.read_from(self.last_seq + 1, max_events)?;
508        let filtered = self.filter_events(events);
509        if let Some(last) = filtered.last() {
510            self.last_seq = last.sequence;
511        }
512        Ok(filtered)
513    }
514
515    /// Wait for new events (blocking with timeout).
516    pub fn next_batch(
517        &mut self,
518        max_events: usize,
519        timeout: Duration,
520    ) -> CdcResult<Vec<CdcEvent>> {
521        let events = self
522            .log
523            .wait_for_events(self.last_seq, max_events, timeout)?;
524        let filtered = self.filter_events(events);
525        if let Some(last) = filtered.last() {
526            self.last_seq = last.sequence;
527        }
528        Ok(filtered)
529    }
530
531    /// Get the subscriber's current position.
532    pub fn position(&self) -> u64 {
533        self.last_seq
534    }
535
536    fn filter_events(&self, events: Vec<CdcEvent>) -> Vec<CdcEvent> {
537        if let Some(ref tables) = self.table_filter {
538            events
539                .into_iter()
540                .filter(|e| tables.iter().any(|t| *t == e.table))
541                .collect()
542        } else {
543            events
544        }
545    }
546}
547
548// ============================================================================
549// Tests
550// ============================================================================
551
552#[cfg(test)]
553mod tests {
554    use super::*;
555    use std::thread;
556
557    fn make_log(cap: usize) -> Arc<CdcLog> {
558        CdcLog::new(CdcConfig {
559            capacity: cap,
560            enabled: true,
561        })
562    }
563
564    #[test]
565    fn test_cdc_emit_and_read() {
566        let log = make_log(100);
567        let mut emitter = CdcEmitter::new(log.clone(), 42);
568
569        emitter.insert("users", b"key1".to_vec(), b"val1".to_vec());
570        emitter.insert("users", b"key2".to_vec(), b"val2".to_vec());
571        assert_eq!(emitter.pending_count(), 2);
572
573        emitter.flush();
574
575        assert_eq!(log.len(), 2);
576        let events = log.read_from(1, 10).unwrap();
577        assert_eq!(events.len(), 2);
578        assert_eq!(events[0].table, "users");
579        assert_eq!(events[0].txn_id, 42);
580        assert_eq!(events[0].sequence, 1);
581        assert_eq!(events[1].sequence, 2);
582    }
583
584    #[test]
585    fn test_cdc_ring_buffer_overflow() {
586        let log = make_log(3);
587
588        for i in 1..=5 {
589            log.emit_one(CdcEvent {
590                sequence: log.next_sequence(),
591                timestamp_us: 0,
592                txn_id: i,
593                table: "t".into(),
594                key: vec![i as u8],
595                operation: CdcOperation::Insert {
596                    after: vec![i as u8],
597                },
598            });
599        }
600
601        // Buffer holds only the last 3 events (seq 3, 4, 5)
602        assert_eq!(log.len(), 3);
603        assert_eq!(log.oldest_sequence(), 3);
604
605        // Reading from seq 1 should return Overrun
606        let err = log.read_from(1, 10).unwrap_err();
607        assert!(matches!(
608            err,
609            CdcError::Overrun {
610                requested: 1,
611                oldest_available: 3
612            }
613        ));
614
615        // Reading from seq 3 should work
616        let events = log.read_from(3, 10).unwrap();
617        assert_eq!(events.len(), 3);
618    }
619
620    #[test]
621    fn test_cdc_subscriber() {
622        let log = make_log(100);
623
624        // Emit some events
625        let mut emitter = CdcEmitter::new(log.clone(), 1);
626        emitter.insert("users", b"u1".to_vec(), b"v1".to_vec());
627        emitter.insert("orders", b"o1".to_vec(), b"v2".to_vec());
628        emitter.flush();
629
630        // Subscribe from beginning
631        let mut sub = CdcSubscriber::new(log.clone(), 0);
632        let events = sub.poll(10).unwrap();
633        assert_eq!(events.len(), 2);
634        assert_eq!(sub.position(), 2);
635
636        // No new events
637        let events = sub.poll(10).unwrap();
638        assert_eq!(events.len(), 0);
639
640        // Emit more
641        let mut emitter = CdcEmitter::new(log.clone(), 2);
642        emitter.update("users", b"u1".to_vec(), b"v1_updated".to_vec());
643        emitter.flush();
644
645        let events = sub.poll(10).unwrap();
646        assert_eq!(events.len(), 1);
647        assert!(matches!(
648            events[0].operation,
649            CdcOperation::Update { .. }
650        ));
651    }
652
653    #[test]
654    fn test_cdc_table_filter() {
655        let log = make_log(100);
656
657        let mut emitter = CdcEmitter::new(log.clone(), 1);
658        emitter.insert("users", b"u1".to_vec(), b"v1".to_vec());
659        emitter.insert("orders", b"o1".to_vec(), b"v2".to_vec());
660        emitter.insert("users", b"u2".to_vec(), b"v3".to_vec());
661        emitter.flush();
662
663        let mut sub =
664            CdcSubscriber::new(log.clone(), 0).with_tables(vec!["users".to_string()]);
665        let events = sub.poll(10).unwrap();
666        assert_eq!(events.len(), 2);
667        assert!(events.iter().all(|e| e.table == "users"));
668    }
669
670    #[test]
671    fn test_cdc_subscriber_from_latest() {
672        let log = make_log(100);
673
674        // Emit events before subscriber
675        log.emit_one(CdcEvent {
676            sequence: log.next_sequence(),
677            timestamp_us: 0,
678            txn_id: 1,
679            table: "old".into(),
680            key: vec![],
681            operation: CdcOperation::Insert { after: vec![] },
682        });
683
684        // Subscribe from latest — should not see old events
685        let mut sub = CdcSubscriber::from_latest(log.clone());
686
687        // Emit new event
688        log.emit_one(CdcEvent {
689            sequence: log.next_sequence(),
690            timestamp_us: 0,
691            txn_id: 2,
692            table: "new".into(),
693            key: vec![],
694            operation: CdcOperation::Insert { after: vec![] },
695        });
696
697        let events = sub.poll(10).unwrap();
698        assert_eq!(events.len(), 1);
699        assert_eq!(events[0].table, "new");
700    }
701
702    #[test]
703    fn test_cdc_wait_for_events() {
704        let log = make_log(100);
705        let log_clone = log.clone();
706
707        // Spawn a thread that emits events after a delay
708        let handle = thread::spawn(move || {
709            thread::sleep(Duration::from_millis(50));
710            log_clone.emit_one(CdcEvent {
711                sequence: log_clone.next_sequence(),
712                timestamp_us: 0,
713                txn_id: 1,
714                table: "t".into(),
715                key: vec![1],
716                operation: CdcOperation::Insert { after: vec![1] },
717            });
718        });
719
720        let events = log.wait_for_events(0, 10, Duration::from_secs(2)).unwrap();
721        assert_eq!(events.len(), 1);
722        handle.join().unwrap();
723    }
724
725    #[test]
726    fn test_cdc_wait_timeout() {
727        let log = make_log(100);
728        let err = log
729            .wait_for_events(0, 10, Duration::from_millis(50))
730            .unwrap_err();
731        assert!(matches!(err, CdcError::Timeout));
732    }
733
734    #[test]
735    fn test_cdc_shutdown() {
736        let log = make_log(100);
737        let log_clone = log.clone();
738
739        let handle = thread::spawn(move || {
740            log_clone.wait_for_events(0, 10, Duration::from_secs(5))
741        });
742
743        thread::sleep(Duration::from_millis(50));
744        log.shutdown();
745
746        let result = handle.join().unwrap();
747        assert!(matches!(result, Err(CdcError::Shutdown)));
748    }
749
750    #[test]
751    fn test_cdc_emitter_discard() {
752        let log = make_log(100);
753        let mut emitter = CdcEmitter::new(log.clone(), 1);
754        emitter.insert("t", b"k".to_vec(), b"v".to_vec());
755        emitter.discard(); // should NOT emit
756
757        assert!(log.is_empty());
758    }
759
760    #[test]
761    fn test_cdc_schema_change() {
762        let log = make_log(100);
763        let mut emitter = CdcEmitter::new(log.clone(), 1);
764        emitter.schema_change("users", "ALTER TABLE users ADD COLUMN age INT".to_string());
765        emitter.flush();
766
767        let events = log.read_from(1, 10).unwrap();
768        assert_eq!(events.len(), 1);
769        assert!(matches!(
770            &events[0].operation,
771            CdcOperation::SchemaChange { ddl } if ddl.contains("ALTER TABLE")
772        ));
773    }
774
775    #[test]
776    fn test_cdc_concurrent_emit_and_read() {
777        let log = make_log(10_000);
778        let log_clone = log.clone();
779
780        // Writer thread
781        let writer = thread::spawn(move || {
782            for i in 0..1000 {
783                log_clone.emit_one(CdcEvent {
784                    sequence: log_clone.next_sequence(),
785                    timestamp_us: 0,
786                    txn_id: i as u64,
787                    table: "t".into(),
788                    key: vec![],
789                    operation: CdcOperation::Insert { after: vec![] },
790                });
791            }
792        });
793
794        writer.join().unwrap();
795
796        let events = log.read_from(1, 10_000).unwrap();
797        assert_eq!(events.len(), 1000);
798        // Verify monotonic sequences
799        for i in 1..events.len() {
800            assert!(events[i].sequence > events[i - 1].sequence);
801        }
802    }
803}