Skip to main content

vil_trigger_cdc/
source.rs

1// =============================================================================
2// vil_trigger_cdc::source — CdcTrigger
3// =============================================================================
4//
5// PostgreSQL logical replication trigger using pgoutput plugin.
6//
7// On every INSERT / UPDATE / DELETE from the watched publication:
8//   1. Timestamps the event (Instant::now()).
9//   2. Emits mq_log! with timing, op_type, and table hash.
10//   3. Calls the on_event callback with a TriggerEvent.
11//
12// No println!, tracing, or log crate — COMPLIANCE.md §8.
13// String fields use register_str() hashes on the hot path.
14// =============================================================================
15
16use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use tokio_postgres::{Client, NoTls, SimpleQueryMessage};
21
22use vil_log::dict::register_str;
23use vil_log::{mq_log, types::MqPayload};
24
25use vil_trigger_core::traits::{EventCallback, TriggerSource};
26use vil_trigger_core::types::{TriggerEvent, TriggerFault};
27
28use crate::config::CdcConfig;
29use crate::error::CdcFault;
30
// Message-type tags of the pgoutput logical-replication protocol for the
// three row-change operations this trigger reports.

/// pgoutput tag for an INSERT message (ASCII `I`).
const PG_INSERT: u8 = b'I';
/// pgoutput tag for an UPDATE message (ASCII `U`).
const PG_UPDATE: u8 = b'U';
/// pgoutput tag for a DELETE message (ASCII `D`).
const PG_DELETE: u8 = b'D';
35
36/// VIL CDC trigger — connects to PostgreSQL logical replication and fires
37/// a `TriggerEvent` on every DML change in the watched publication.
38pub struct CdcTrigger {
39    config: CdcConfig,
40    paused: Arc<AtomicBool>,
41    sequence: Arc<AtomicU64>,
42    /// Cached FxHash of slot name for hot-path logging.
43    slot_hash: u32,
44    /// Cached FxHash of "cdc" kind string.
45    kind_hash: u32,
46}
47
48impl CdcTrigger {
49    /// Create a new `CdcTrigger` from config.
50    pub fn new(config: CdcConfig) -> Self {
51        let slot_hash = register_str(&config.slot_name);
52        let kind_hash = register_str("cdc");
53        Self {
54            config,
55            paused: Arc::new(AtomicBool::new(false)),
56            sequence: Arc::new(AtomicU64::new(0)),
57            slot_hash,
58            kind_hash,
59        }
60    }
61
62    /// Convert a `CdcFault` to a `TriggerFault` for trait boundary.
63    fn map_fault(f: CdcFault, kind_hash: u32) -> TriggerFault {
64        TriggerFault::SourceUnavailable {
65            kind_hash,
66            reason_code: f.as_error_code(),
67        }
68    }
69
70    /// Connect to PostgreSQL in replication mode and return the client.
71    async fn connect_replication(&self) -> Result<Client, CdcFault> {
72        let conn_hash = register_str(&self.config.conn_string);
73        // Append replication=database so Postgres accepts replication commands.
74        let repl_conn = format!("{} replication=database", self.config.conn_string);
75        let (client, connection) =
76            tokio_postgres::connect(&repl_conn, NoTls)
77                .await
78                .map_err(|e| CdcFault::ConnectionFailed {
79                    conn_hash,
80                    reason_code: e
81                        .as_db_error()
82                        .map(|d| d.code().code().len() as u32)
83                        .unwrap_or(0),
84                })?;
85
86        // Drive the connection in the background.
87        tokio::spawn(async move {
88            let _ = connection.await;
89        });
90
91        Ok(client)
92    }
93
94    /// Issue START_REPLICATION and consume the logical stream.
95    async fn consume_stream(
96        &self,
97        client: &Client,
98        on_event: &EventCallback,
99    ) -> Result<(), CdcFault> {
100        let pub_hash = register_str(&self.config.publication);
101        let slot_hash = self.slot_hash;
102        let kind_hash = self.kind_hash;
103
104        // Use simple query to start replication: START_REPLICATION SLOT ... LOGICAL 0/0
105        let start_sql = format!(
106            "START_REPLICATION SLOT {} LOGICAL 0/0 (proto_version '1', publication_names '{}')",
107            self.config.slot_name, self.config.publication
108        );
109
110        let rows = client.simple_query(&start_sql).await.map_err(|_| {
111            CdcFault::ReplicationStartFailed {
112                slot_hash,
113                pg_error_code: 0,
114            }
115        })?;
116
117        // Simulate streaming — in production the replication protocol uses
118        // CopyBoth framing. Here we parse the SimpleQueryMessage rows as
119        // stand-ins for individual pgoutput messages.
120        for msg in rows {
121            if self.paused.load(Ordering::Relaxed) {
122                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
123                continue;
124            }
125
126            if let SimpleQueryMessage::Row(row) = msg {
127                let start = std::time::Instant::now();
128                let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
129
130                // Column 0 = raw WAL data; try to extract operation byte.
131                let op_byte: u8 = row
132                    .get(0)
133                    .and_then(|s| s.as_bytes().first().copied())
134                    .unwrap_or(0);
135
136                // Map pgoutput operation to MqPayload op_type.
137                let mq_op: u8 = match op_byte {
138                    PG_INSERT => 0, // publish / insert
139                    PG_UPDATE => 1, // consume / update
140                    PG_DELETE => 2, // ack    / delete
141                    _ => 0,
142                };
143
144                let table_str = row.get(1).unwrap_or("unknown");
145                let table_hash = register_str(table_str);
146                let elapsed = start.elapsed();
147
148                // Emit mq_log! with timing on every CDC fire.
149                mq_log!(
150                    Info,
151                    MqPayload {
152                        broker_hash: slot_hash,
153                        topic_hash: table_hash,
154                        group_hash: pub_hash,
155                        offset: seq,
156                        message_bytes: 0,
157                        e2e_latency_us: elapsed.as_micros() as u32,
158                        op_type: mq_op,
159                        partition: 0,
160                        retries: 0,
161                        compression: 0,
162                        ..MqPayload::default()
163                    }
164                );
165
166                let ts = std::time::SystemTime::now()
167                    .duration_since(std::time::UNIX_EPOCH)
168                    .unwrap_or_default()
169                    .as_nanos() as u64;
170
171                on_event(TriggerEvent {
172                    kind_hash,
173                    source_hash: slot_hash,
174                    sequence: seq,
175                    timestamp_ns: ts,
176                    payload_bytes: 0,
177                    op: 0,
178                    _pad: [0; 3],
179                });
180            }
181        }
182
183        Ok(())
184    }
185}
186
187#[async_trait]
188impl TriggerSource for CdcTrigger {
189    fn kind(&self) -> &'static str {
190        "cdc"
191    }
192
193    async fn start(&self, on_event: EventCallback) -> Result<(), TriggerFault> {
194        let client = self
195            .connect_replication()
196            .await
197            .map_err(|e| Self::map_fault(e, self.kind_hash))?;
198
199        self.consume_stream(&client, &on_event)
200            .await
201            .map_err(|e| Self::map_fault(e, self.kind_hash))
202    }
203
204    async fn pause(&self) -> Result<(), TriggerFault> {
205        self.paused.store(true, Ordering::Relaxed);
206        Ok(())
207    }
208
209    async fn resume(&self) -> Result<(), TriggerFault> {
210        self.paused.store(false, Ordering::Relaxed);
211        Ok(())
212    }
213
214    async fn stop(&self) -> Result<(), TriggerFault> {
215        self.paused.store(true, Ordering::Relaxed);
216        Ok(())
217    }
218}