vil_trigger_cdc/
source.rs1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use tokio_postgres::{Client, NoTls, SimpleQueryMessage};
21
22use vil_log::dict::register_str;
23use vil_log::{mq_log, types::MqPayload};
24
25use vil_trigger_core::traits::{EventCallback, TriggerSource};
26use vil_trigger_core::types::{TriggerEvent, TriggerFault};
27
28use crate::config::CdcConfig;
29use crate::error::CdcFault;
30
/// Tag byte for an insert: compared against the first byte of column 0 of each row.
const PG_INSERT: u8 = b'I';
/// Tag byte for an update: compared against the first byte of column 0 of each row.
const PG_UPDATE: u8 = b'U';
/// Tag byte for a delete: compared against the first byte of column 0 of each row.
const PG_DELETE: u8 = b'D';
35
/// Trigger source that issues `START_REPLICATION ... LOGICAL` against a
/// PostgreSQL replication slot and reports each returned row as a `TriggerEvent`.
pub struct CdcTrigger {
    /// Supplies the connection string, slot name and publication name.
    config: CdcConfig,
    /// Set by `pause` and `stop`, cleared by `resume`; checked once per
    /// message inside the consume loop.
    paused: Arc<AtomicBool>,
    /// Per-row counter; the value before each increment becomes the event's
    /// sequence number and the logged offset.
    sequence: Arc<AtomicU64>,
    /// Value returned by `register_str` for `config.slot_name`.
    slot_hash: u32,
    /// Value returned by `register_str` for the literal `"cdc"`.
    kind_hash: u32,
}
47
48impl CdcTrigger {
49 pub fn new(config: CdcConfig) -> Self {
51 let slot_hash = register_str(&config.slot_name);
52 let kind_hash = register_str("cdc");
53 Self {
54 config,
55 paused: Arc::new(AtomicBool::new(false)),
56 sequence: Arc::new(AtomicU64::new(0)),
57 slot_hash,
58 kind_hash,
59 }
60 }
61
62 fn map_fault(f: CdcFault, kind_hash: u32) -> TriggerFault {
64 TriggerFault::SourceUnavailable {
65 kind_hash,
66 reason_code: f.as_error_code(),
67 }
68 }
69
70 async fn connect_replication(&self) -> Result<Client, CdcFault> {
72 let conn_hash = register_str(&self.config.conn_string);
73 let repl_conn = format!("{} replication=database", self.config.conn_string);
75 let (client, connection) =
76 tokio_postgres::connect(&repl_conn, NoTls)
77 .await
78 .map_err(|e| CdcFault::ConnectionFailed {
79 conn_hash,
80 reason_code: e
81 .as_db_error()
82 .map(|d| d.code().code().len() as u32)
83 .unwrap_or(0),
84 })?;
85
86 tokio::spawn(async move {
88 let _ = connection.await;
89 });
90
91 Ok(client)
92 }
93
94 async fn consume_stream(
96 &self,
97 client: &Client,
98 on_event: &EventCallback,
99 ) -> Result<(), CdcFault> {
100 let pub_hash = register_str(&self.config.publication);
101 let slot_hash = self.slot_hash;
102 let kind_hash = self.kind_hash;
103
104 let start_sql = format!(
106 "START_REPLICATION SLOT {} LOGICAL 0/0 (proto_version '1', publication_names '{}')",
107 self.config.slot_name, self.config.publication
108 );
109
110 let rows = client.simple_query(&start_sql).await.map_err(|_| {
111 CdcFault::ReplicationStartFailed {
112 slot_hash,
113 pg_error_code: 0,
114 }
115 })?;
116
117 for msg in rows {
121 if self.paused.load(Ordering::Relaxed) {
122 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
123 continue;
124 }
125
126 if let SimpleQueryMessage::Row(row) = msg {
127 let start = std::time::Instant::now();
128 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
129
130 let op_byte: u8 = row
132 .get(0)
133 .and_then(|s| s.as_bytes().first().copied())
134 .unwrap_or(0);
135
136 let mq_op: u8 = match op_byte {
138 PG_INSERT => 0, PG_UPDATE => 1, PG_DELETE => 2, _ => 0,
142 };
143
144 let table_str = row.get(1).unwrap_or("unknown");
145 let table_hash = register_str(table_str);
146 let elapsed = start.elapsed();
147
148 mq_log!(
150 Info,
151 MqPayload {
152 broker_hash: slot_hash,
153 topic_hash: table_hash,
154 group_hash: pub_hash,
155 offset: seq,
156 message_bytes: 0,
157 e2e_latency_us: elapsed.as_micros() as u32,
158 op_type: mq_op,
159 partition: 0,
160 retries: 0,
161 compression: 0,
162 ..MqPayload::default()
163 }
164 );
165
166 let ts = std::time::SystemTime::now()
167 .duration_since(std::time::UNIX_EPOCH)
168 .unwrap_or_default()
169 .as_nanos() as u64;
170
171 on_event(TriggerEvent {
172 kind_hash,
173 source_hash: slot_hash,
174 sequence: seq,
175 timestamp_ns: ts,
176 payload_bytes: 0,
177 op: 0,
178 _pad: [0; 3],
179 });
180 }
181 }
182
183 Ok(())
184 }
185}
186
187#[async_trait]
188impl TriggerSource for CdcTrigger {
189 fn kind(&self) -> &'static str {
190 "cdc"
191 }
192
193 async fn start(&self, on_event: EventCallback) -> Result<(), TriggerFault> {
194 let client = self
195 .connect_replication()
196 .await
197 .map_err(|e| Self::map_fault(e, self.kind_hash))?;
198
199 self.consume_stream(&client, &on_event)
200 .await
201 .map_err(|e| Self::map_fault(e, self.kind_hash))
202 }
203
204 async fn pause(&self) -> Result<(), TriggerFault> {
205 self.paused.store(true, Ordering::Relaxed);
206 Ok(())
207 }
208
209 async fn resume(&self) -> Result<(), TriggerFault> {
210 self.paused.store(false, Ordering::Relaxed);
211 Ok(())
212 }
213
214 async fn stop(&self) -> Result<(), TriggerFault> {
215 self.paused.store(true, Ordering::Relaxed);
216 Ok(())
217 }
218}