sources_postgres/cdc/
capture.rs1use std::collections::BTreeSet;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use futures::stream::BoxStream;
6use pgwire_replication::{ReplicationClient, ReplicationConfig};
7use sources_core::cdc::{AckSink, Change, ChangeCapture};
8use sources_core::{
9 CaptureProvisioning, CoverageReport, QualifiedTable, Result, SnapshotTable, SourceError,
10};
11use sqlx::{PgPool, Row};
12use tokio::sync::OnceCell;
13
14use super::ack::{AckShared, WalAckSink};
15use super::{backfill, publication, stream};
16
17#[derive(Debug, Clone)]
37pub struct WalChangeCapture {
38 config: ReplicationConfig,
39 connection_url: String,
42 admin_pool: Arc<OnceCell<PgPool>>,
48 required_tables: BTreeSet<QualifiedTable>,
51 manage_publication: bool,
54}
55
56impl WalChangeCapture {
57 pub fn new(config: ReplicationConfig, connection_url: impl Into<String>) -> Self {
64 Self {
65 config,
66 connection_url: connection_url.into(),
67 admin_pool: Arc::new(OnceCell::new()),
68 required_tables: BTreeSet::new(),
69 manage_publication: false,
70 }
71 }
72
73 pub fn with_publication_management(
78 mut self,
79 required: BTreeSet<QualifiedTable>,
80 manage: bool,
81 ) -> Self {
82 self.required_tables = required;
83 self.manage_publication = manage;
84 self
85 }
86
87 async fn admin_pool(&self) -> Result<&PgPool> {
91 self.admin_pool
92 .get_or_try_init(|| async {
93 sqlx::postgres::PgPoolOptions::new()
94 .max_connections(2)
95 .connect(&self.connection_url)
96 .await
97 .map_err(|e| SourceError::Connection(e.to_string()))
98 })
99 .await
100 }
101
102 async fn ensure_slot(&self) -> Result<()> {
109 let pool = self.admin_pool().await?;
110
111 let row = sqlx::query("SELECT plugin FROM pg_replication_slots WHERE slot_name = $1")
112 .bind(&self.config.slot)
113 .fetch_optional(pool)
114 .await
115 .map_err(|e| SourceError::Query(e.to_string()))?;
116
117 match row {
118 Some(row) => {
119 let plugin: String = row
120 .try_get("plugin")
121 .map_err(|e| SourceError::Query(e.to_string()))?;
122 if plugin != "pgoutput" {
123 return Err(SourceError::Connection(format!(
124 "replication slot '{}' exists but uses plugin '{}', expected 'pgoutput'",
125 self.config.slot, plugin,
126 )));
127 }
128 tracing::debug!(slot = %self.config.slot, "replication slot already exists");
129 }
130 None => {
131 sqlx::query("SELECT pg_create_logical_replication_slot($1, 'pgoutput')")
132 .bind(&self.config.slot)
133 .execute(pool)
134 .await
135 .map_err(|e| {
136 SourceError::Connection(format!(
137 "failed to create replication slot '{}': {e}",
138 self.config.slot,
139 ))
140 })?;
141 tracing::info!(slot = %self.config.slot, "created replication slot");
142 }
143 }
144
145 Ok(())
146 }
147}
148
149#[async_trait]
150impl ChangeCapture for WalChangeCapture {
151 #[tracing::instrument(name = "wal.live", skip_all, err)]
152 async fn live(&self) -> Result<BoxStream<'static, Result<Change>>> {
153 self.ensure_slot().await?;
154 self.ensure_coverage(&self.required_tables, self.manage_publication)
155 .await?;
156
157 let client = ReplicationClient::connect(self.config.clone())
158 .await
159 .map_err(|e| SourceError::Connection(e.to_string()))?;
160
161 let ack = Arc::new(AckShared::new(self.config.start_lsn.as_u64()));
162 let sink: Arc<dyn AckSink> = Arc::new(WalAckSink::new(Arc::clone(&ack)));
163 tracing::info!(
164 start_lsn = self.config.start_lsn.as_u64(),
165 "opened replication stream"
166 );
167 Ok(stream::build(client, ack, sink))
168 }
169
170 #[tracing::instrument(name = "wal.snapshot", skip_all, fields(tables = tables.len()), err)]
171 async fn snapshot(
172 &self,
173 tables: &[SnapshotTable],
174 ) -> Result<BoxStream<'static, Result<Change>>> {
175 tracing::info!(tables = tables.len(), "starting snapshot");
176 backfill::snapshot(&self.connection_url, tables).await
177 }
178
179 #[tracing::instrument(name = "wal.lag", skip_all, err)]
183 async fn lag(&self) -> Result<Option<u64>> {
184 let pool = self.admin_pool().await?;
185
186 let row = sqlx::query(
190 "SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)::bigint AS lag \
191 FROM pg_replication_slots WHERE slot_name = $1",
192 )
193 .bind(&self.config.slot)
194 .fetch_optional(pool)
195 .await
196 .map_err(|e| SourceError::Query(e.to_string()))?;
197
198 let lag = match row {
199 Some(row) => {
200 let bytes: i64 = row
201 .try_get("lag")
202 .map_err(|e| SourceError::Query(e.to_string()))?;
203 Some(bytes.max(0) as u64)
206 }
207 None => None,
208 };
209 Ok(lag)
210 }
211}
212
213#[async_trait]
214impl CaptureProvisioning for WalChangeCapture {
215 async fn inspect_coverage(
216 &self,
217 required: &BTreeSet<QualifiedTable>,
218 ) -> Result<CoverageReport> {
219 let pool = self.admin_pool().await?;
220 publication::inspect_publication(pool, &self.config.publication, required).await
221 }
222
223 #[tracing::instrument(name = "wal.ensure_coverage", skip_all, err)]
224 async fn ensure_coverage(
225 &self,
226 required: &BTreeSet<QualifiedTable>,
227 manage: bool,
228 ) -> Result<CoverageReport> {
229 let pool = self.admin_pool().await?;
230 let report =
231 publication::inspect_publication(pool, &self.config.publication, required).await?;
232
233 if report.satisfied {
234 tracing::debug!(
235 publication = %self.config.publication,
236 "publication covers every required table",
237 );
238 return Ok(report);
239 }
240
241 let missing = report
242 .missing
243 .iter()
244 .map(|table| table.to_string())
245 .collect::<Vec<_>>()
246 .join(", ");
247
248 if manage && report.manageable {
249 publication::apply_publication(pool, &self.config.publication, &report.missing).await?;
250 tracing::info!(
251 publication = %self.config.publication,
252 tables = %missing,
253 "provisioned publication for missing tables",
254 );
255 } else {
256 let reason = if !manage {
257 "automatic publication management is disabled".to_owned()
258 } else {
259 report.blockers.join("; ")
260 };
261 tracing::warn!(
262 publication = %self.config.publication,
263 missing = %missing,
264 reason = %reason,
265 remediation = %report.remediation.join(" "),
266 "publication is missing tables and flusso will not create them automatically; \
267 run the printed SQL to stream every table (changes to missing tables are dropped)",
268 );
269 }
270
271 Ok(report)
272 }
273}