1use std::sync::Arc;
7
8use tokio::sync::mpsc;
9use tracing::{debug, info};
10
11use ormdb_proto::replication::ChangeLogEntry;
12
13use crate::pubsub::PubSubManager;
14
15pub struct CDCProcessor {
20 rx: mpsc::Receiver<ChangeLogEntry>,
22 pubsub: Arc<PubSubManager>,
24}
25
26impl CDCProcessor {
27 pub fn new(rx: mpsc::Receiver<ChangeLogEntry>, pubsub: Arc<PubSubManager>) -> Self {
29 Self { rx, pubsub }
30 }
31
32 pub async fn run(mut self) {
36 info!("CDC processor started");
37
38 while let Some(entry) = self.rx.recv().await {
39 self.process_entry(&entry).await;
40 }
41
42 info!("CDC processor stopped (channel closed)");
43 }
44
45 async fn process_entry(&self, entry: &ChangeLogEntry) {
47 debug!(
48 lsn = entry.lsn,
49 entity = %entry.entity_type,
50 change_type = ?entry.change_type,
51 "processing CDC entry"
52 );
53
54 self.pubsub
55 .publish_event(
56 &entry.entity_type,
57 entry.entity_id,
58 entry.change_type,
59 entry.changed_fields.clone(),
60 entry.schema_version,
61 )
62 .await;
63 }
64}
65
66pub type CDCSender = mpsc::Sender<ChangeLogEntry>;
68
69pub type CDCReceiver = mpsc::Receiver<ChangeLogEntry>;
71
72pub fn channel(buffer_size: usize) -> (CDCSender, CDCReceiver) {
74 mpsc::channel(buffer_size)
75}
76
77pub struct CDCHandle {
79 tx: CDCSender,
80}
81
82impl CDCHandle {
83 pub fn new(tx: CDCSender) -> Self {
85 Self { tx }
86 }
87
88 pub fn try_send(&self, entry: ChangeLogEntry) -> Result<(), mpsc::error::TrySendError<ChangeLogEntry>> {
92 self.tx.try_send(entry)
93 }
94
95 pub async fn send(&self, entry: ChangeLogEntry) -> Result<(), mpsc::error::SendError<ChangeLogEntry>> {
97 self.tx.send(entry).await
98 }
99
100 pub fn sender(&self) -> &CDCSender {
102 &self.tx
103 }
104
105 pub fn clone_sender(&self) -> CDCSender {
107 self.tx.clone()
108 }
109}
110
111impl Clone for CDCHandle {
112 fn clone(&self) -> Self {
113 Self {
114 tx: self.tx.clone(),
115 }
116 }
117}
118
119pub fn start_processor(pubsub: Arc<PubSubManager>, buffer_size: usize) -> CDCHandle {
123 let (tx, rx) = channel(buffer_size);
124 let processor = CDCProcessor::new(rx, pubsub);
125
126 tokio::spawn(async move {
127 processor.run().await;
128 });
129
130 CDCHandle::new(tx)
131}
132
133#[cfg(test)]
134mod tests {
135 use super::*;
136 use ormdb_proto::ChangeType;
137
138 fn create_test_entry(entity: &str, id: [u8; 16], lsn: u64) -> ChangeLogEntry {
139 ChangeLogEntry {
140 lsn,
141 timestamp: lsn * 1000,
142 entity_type: entity.to_string(),
143 entity_id: id,
144 change_type: ChangeType::Insert,
145 changed_fields: vec!["name".to_string()],
146 before_data: None,
147 after_data: Some(vec![1, 2, 3, 4]),
148 schema_version: 1,
149 }
150 }
151
152 #[tokio::test]
153 async fn test_cdc_processor_processes_entries() {
154 let pubsub = Arc::new(PubSubManager::new());
155 let (tx, rx) = channel(10);
156
157 let processor = CDCProcessor::new(rx, pubsub.clone());
159 let handle = tokio::spawn(async move {
160 processor.run().await;
161 });
162
163 let entry = create_test_entry("User", [1u8; 16], 1);
165 tx.send(entry).await.unwrap();
166
167 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
169
170 drop(tx);
172
173 handle.await.unwrap();
175 }
176
177 #[tokio::test]
178 async fn test_cdc_handle_try_send() {
179 let pubsub = Arc::new(PubSubManager::new());
180 let handle = start_processor(pubsub, 10);
181
182 let entry = create_test_entry("User", [1u8; 16], 1);
183 assert!(handle.try_send(entry).is_ok());
184 }
185
186 #[tokio::test]
187 async fn test_cdc_handle_clone() {
188 let pubsub = Arc::new(PubSubManager::new());
189 let handle1 = start_processor(pubsub, 10);
190 let handle2 = handle1.clone();
191
192 let entry1 = create_test_entry("User", [1u8; 16], 1);
193 let entry2 = create_test_entry("Post", [2u8; 16], 2);
194
195 assert!(handle1.try_send(entry1).is_ok());
196 assert!(handle2.try_send(entry2).is_ok());
197 }
198}