Skip to main content

ormdb_server/cdc/
mod.rs

1//! Change Data Capture (CDC) processing.
2//!
3//! This module provides a background task that processes CDC events from the
4//! changelog and publishes them to the PubSubManager for subscriber delivery.
5
6use std::sync::Arc;
7
8use tokio::sync::mpsc;
9use tracing::{debug, info};
10
11use ormdb_proto::replication::ChangeLogEntry;
12
13use crate::pubsub::PubSubManager;
14
15/// CDC event processor that bridges the changelog to the PubSubManager.
16///
17/// This processor receives change log entries from a channel and publishes
18/// them as events to matching subscribers via the PubSubManager.
19pub struct CDCProcessor {
20    /// Receiver for changelog entries.
21    rx: mpsc::Receiver<ChangeLogEntry>,
22    /// Pub-sub manager for event distribution.
23    pubsub: Arc<PubSubManager>,
24}
25
26impl CDCProcessor {
27    /// Create a new CDC processor.
28    pub fn new(rx: mpsc::Receiver<ChangeLogEntry>, pubsub: Arc<PubSubManager>) -> Self {
29        Self { rx, pubsub }
30    }
31
32    /// Run the CDC processor as a background task.
33    ///
34    /// This will process events until the channel is closed.
35    pub async fn run(mut self) {
36        info!("CDC processor started");
37
38        while let Some(entry) = self.rx.recv().await {
39            self.process_entry(&entry).await;
40        }
41
42        info!("CDC processor stopped (channel closed)");
43    }
44
45    /// Process a single changelog entry.
46    async fn process_entry(&self, entry: &ChangeLogEntry) {
47        debug!(
48            lsn = entry.lsn,
49            entity = %entry.entity_type,
50            change_type = ?entry.change_type,
51            "processing CDC entry"
52        );
53
54        self.pubsub
55            .publish_event(
56                &entry.entity_type,
57                entry.entity_id,
58                entry.change_type,
59                entry.changed_fields.clone(),
60                entry.schema_version,
61            )
62            .await;
63    }
64}
65
66/// CDC channel sender for submitting changelog entries.
67pub type CDCSender = mpsc::Sender<ChangeLogEntry>;
68
69/// CDC channel receiver for processing changelog entries.
70pub type CDCReceiver = mpsc::Receiver<ChangeLogEntry>;
71
72/// Create a new CDC channel with the given buffer size.
73pub fn channel(buffer_size: usize) -> (CDCSender, CDCReceiver) {
74    mpsc::channel(buffer_size)
75}
76
77/// Handle for a running CDC processor task.
78pub struct CDCHandle {
79    tx: CDCSender,
80}
81
82impl CDCHandle {
83    /// Create a new CDC handle.
84    pub fn new(tx: CDCSender) -> Self {
85        Self { tx }
86    }
87
88    /// Submit a changelog entry for processing.
89    ///
90    /// This is non-blocking and will return an error if the channel is full.
91    pub fn try_send(&self, entry: ChangeLogEntry) -> Result<(), mpsc::error::TrySendError<ChangeLogEntry>> {
92        self.tx.try_send(entry)
93    }
94
95    /// Submit a changelog entry, waiting if the channel is full.
96    pub async fn send(&self, entry: ChangeLogEntry) -> Result<(), mpsc::error::SendError<ChangeLogEntry>> {
97        self.tx.send(entry).await
98    }
99
100    /// Get a reference to the sender.
101    pub fn sender(&self) -> &CDCSender {
102        &self.tx
103    }
104
105    /// Clone the sender for use in another context.
106    pub fn clone_sender(&self) -> CDCSender {
107        self.tx.clone()
108    }
109}
110
111impl Clone for CDCHandle {
112    fn clone(&self) -> Self {
113        Self {
114            tx: self.tx.clone(),
115        }
116    }
117}
118
119/// Start a CDC processor and return a handle for sending events.
120///
121/// This spawns a background task that processes CDC events.
122pub fn start_processor(pubsub: Arc<PubSubManager>, buffer_size: usize) -> CDCHandle {
123    let (tx, rx) = channel(buffer_size);
124    let processor = CDCProcessor::new(rx, pubsub);
125
126    tokio::spawn(async move {
127        processor.run().await;
128    });
129
130    CDCHandle::new(tx)
131}
132
133#[cfg(test)]
134mod tests {
135    use super::*;
136    use ormdb_proto::ChangeType;
137
138    fn create_test_entry(entity: &str, id: [u8; 16], lsn: u64) -> ChangeLogEntry {
139        ChangeLogEntry {
140            lsn,
141            timestamp: lsn * 1000,
142            entity_type: entity.to_string(),
143            entity_id: id,
144            change_type: ChangeType::Insert,
145            changed_fields: vec!["name".to_string()],
146            before_data: None,
147            after_data: Some(vec![1, 2, 3, 4]),
148            schema_version: 1,
149        }
150    }
151
152    #[tokio::test]
153    async fn test_cdc_processor_processes_entries() {
154        let pubsub = Arc::new(PubSubManager::new());
155        let (tx, rx) = channel(10);
156
157        // Spawn processor
158        let processor = CDCProcessor::new(rx, pubsub.clone());
159        let handle = tokio::spawn(async move {
160            processor.run().await;
161        });
162
163        // Send an entry
164        let entry = create_test_entry("User", [1u8; 16], 1);
165        tx.send(entry).await.unwrap();
166
167        // Give it time to process
168        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
169
170        // Close the channel
171        drop(tx);
172
173        // Wait for processor to finish
174        handle.await.unwrap();
175    }
176
177    #[tokio::test]
178    async fn test_cdc_handle_try_send() {
179        let pubsub = Arc::new(PubSubManager::new());
180        let handle = start_processor(pubsub, 10);
181
182        let entry = create_test_entry("User", [1u8; 16], 1);
183        assert!(handle.try_send(entry).is_ok());
184    }
185
186    #[tokio::test]
187    async fn test_cdc_handle_clone() {
188        let pubsub = Arc::new(PubSubManager::new());
189        let handle1 = start_processor(pubsub, 10);
190        let handle2 = handle1.clone();
191
192        let entry1 = create_test_entry("User", [1u8; 16], 1);
193        let entry2 = create_test_entry("Post", [2u8; 16], 2);
194
195        assert!(handle1.try_send(entry1).is_ok());
196        assert!(handle2.try_send(entry2).is_ok());
197    }
198}