// sources_postgres/cdc/capture.rs

1use std::collections::BTreeSet;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use futures::stream::BoxStream;
6use pgwire_replication::{ReplicationClient, ReplicationConfig};
7use sources_core::cdc::{AckSink, Change, ChangeCapture};
8use sources_core::{
9    CaptureProvisioning, CoverageReport, QualifiedTable, Result, SnapshotTable, SourceError,
10};
11use sqlx::{PgPool, Row};
12use tokio::sync::OnceCell;
13
14use super::ack::{AckShared, WalAckSink};
15use super::{backfill, publication, stream};
16
17/// Postgres change capture over logical replication (pgoutput).
18///
19/// Exposes the two [`ChangeCapture`] capabilities the engine orchestrates:
20///
21/// - [`live`](ChangeCapture::live) connects to a replication slot and streams
22///   committed row changes as thin [`Change`]s. Resume is the slot's: its
23///   `confirmed_flush_lsn` is the durable cursor on the server, advanced as the
24///   engine confirms changes (see [`Ack`](sources_core::cdc::Ack)).
25/// - [`snapshot`](ChangeCapture::snapshot) reads current rows over an ordinary
26///   SQL connection for an initial backfill (see the crate-private `backfill`). The engine calls
27///   it only for tables backing an index the sink reports as unseeded.
28///
29/// # Prerequisites
30///
31/// The server must have `wal_level = logical`. The replication **slot** is
32/// created automatically on first connect if it does not exist yet, and the
33/// **publication** is created/extended to cover every table any index reads —
34/// see [`CaptureProvisioning`] — when the role is privileged enough and
35/// management is not opted out; otherwise flusso warns with the SQL to run.
36#[derive(Debug, Clone)]
37pub struct WalChangeCapture {
38    config: ReplicationConfig,
39    /// Ordinary SQL connection URL, used by [`snapshot`](Self::snapshot) and
40    /// for the automatic slot creation check.
41    connection_url: String,
42    /// A small, lazily-opened SQL pool shared by the slot check and the
43    /// out-of-band [`lag`](Self::lag) polling, so periodic status probes reuse
44    /// connections instead of opening and tearing one down each time. Shared
45    /// across clones (an `Arc`), opened on first use. The bulk snapshot read
46    /// stays on its own connection (see [`snapshot`](Self::snapshot)).
47    admin_pool: Arc<OnceCell<PgPool>>,
48    /// Every table the enabled indexes read — the set the publication must
49    /// cover. Empty unless set via [`with_publication_management`](Self::with_publication_management).
50    required_tables: BTreeSet<QualifiedTable>,
51    /// Whether to auto-create/extend the publication on [`live`](Self::live).
52    /// When false, a coverage gap is only reported, never provisioned.
53    manage_publication: bool,
54}
55
56impl WalChangeCapture {
57    /// Create a capture from a `pgwire-replication` configuration and the
58    /// ordinary SQL connection URL the snapshot reads through (the same URL the
59    /// document builder connects with).
60    ///
61    /// Leave `config.start_lsn` at [`Lsn::ZERO`](pgwire_replication::Lsn::ZERO)
62    /// to resume from the slot's `confirmed_flush_lsn` — the usual choice.
63    pub fn new(config: ReplicationConfig, connection_url: impl Into<String>) -> Self {
64        Self {
65            config,
66            connection_url: connection_url.into(),
67            admin_pool: Arc::new(OnceCell::new()),
68            required_tables: BTreeSet::new(),
69            manage_publication: false,
70        }
71    }
72
73    /// Declare the tables the publication must cover and whether to provision
74    /// the gap automatically on [`live`](Self::live). `required` is typically
75    /// [`SourceSpec::all_tables`](sources_core::SourceSpec::all_tables); the
76    /// composition root supplies it along with the `manage` opt-out.
77    pub fn with_publication_management(
78        mut self,
79        required: BTreeSet<QualifiedTable>,
80        manage: bool,
81    ) -> Self {
82        self.required_tables = required;
83        self.manage_publication = manage;
84        self
85    }
86
87    /// The shared admin pool, opened on first call and reused thereafter. Kept
88    /// deliberately small — it serves only the slot check and lag probes, not
89    /// the change or snapshot paths.
90    async fn admin_pool(&self) -> Result<&PgPool> {
91        self.admin_pool
92            .get_or_try_init(|| async {
93                sqlx::postgres::PgPoolOptions::new()
94                    .max_connections(2)
95                    .connect(&self.connection_url)
96                    .await
97                    .map_err(|e| SourceError::Connection(e.to_string()))
98            })
99            .await
100    }
101
102    /// Ensure the replication slot exists, creating it if it does not.
103    ///
104    /// Runs over the shared admin pool so the check can run before the
105    /// replication connection is opened. If the slot already exists its plugin
106    /// is validated; a slot with the wrong plugin name is an error (it was
107    /// created for a different consumer and we should not clobber it).
108    async fn ensure_slot(&self) -> Result<()> {
109        let pool = self.admin_pool().await?;
110
111        let row = sqlx::query("SELECT plugin FROM pg_replication_slots WHERE slot_name = $1")
112            .bind(&self.config.slot)
113            .fetch_optional(pool)
114            .await
115            .map_err(|e| SourceError::Query(e.to_string()))?;
116
117        match row {
118            Some(row) => {
119                let plugin: String = row
120                    .try_get("plugin")
121                    .map_err(|e| SourceError::Query(e.to_string()))?;
122                if plugin != "pgoutput" {
123                    return Err(SourceError::Connection(format!(
124                        "replication slot '{}' exists but uses plugin '{}', expected 'pgoutput'",
125                        self.config.slot, plugin,
126                    )));
127                }
128                tracing::debug!(slot = %self.config.slot, "replication slot already exists");
129            }
130            None => {
131                sqlx::query("SELECT pg_create_logical_replication_slot($1, 'pgoutput')")
132                    .bind(&self.config.slot)
133                    .execute(pool)
134                    .await
135                    .map_err(|e| {
136                        SourceError::Connection(format!(
137                            "failed to create replication slot '{}': {e}",
138                            self.config.slot,
139                        ))
140                    })?;
141                tracing::info!(slot = %self.config.slot, "created replication slot");
142            }
143        }
144
145        Ok(())
146    }
147}
148
149#[async_trait]
150impl ChangeCapture for WalChangeCapture {
151    #[tracing::instrument(name = "wal.live", skip_all, err)]
152    async fn live(&self) -> Result<BoxStream<'static, Result<Change>>> {
153        self.ensure_slot().await?;
154        self.ensure_coverage(&self.required_tables, self.manage_publication)
155            .await?;
156
157        let client = ReplicationClient::connect(self.config.clone())
158            .await
159            .map_err(|e| SourceError::Connection(e.to_string()))?;
160
161        let ack = Arc::new(AckShared::new(self.config.start_lsn.as_u64()));
162        let sink: Arc<dyn AckSink> = Arc::new(WalAckSink::new(Arc::clone(&ack)));
163        tracing::info!(
164            start_lsn = self.config.start_lsn.as_u64(),
165            "opened replication stream"
166        );
167        Ok(stream::build(client, ack, sink))
168    }
169
170    #[tracing::instrument(name = "wal.snapshot", skip_all, fields(tables = tables.len()), err)]
171    async fn snapshot(
172        &self,
173        tables: &[SnapshotTable],
174    ) -> Result<BoxStream<'static, Result<Change>>> {
175        tracing::info!(tables = tables.len(), "starting snapshot");
176        backfill::snapshot(&self.connection_url, tables).await
177    }
178
179    /// Bytes between the slot's `confirmed_flush_lsn` and the server's current
180    /// WAL LSN — how far behind the destination is. Returns `None` until the
181    /// slot exists (it is created on the first [`live`](Self::live) connect).
182    #[tracing::instrument(name = "wal.lag", skip_all, err)]
183    async fn lag(&self) -> Result<Option<u64>> {
184        let pool = self.admin_pool().await?;
185
186        // `pg_wal_lsn_diff` yields a numeric byte distance; cast to bigint so it
187        // decodes as an integer. A slot whose consumer is fully caught up reads
188        // zero; a never-connected slot has no row, hence `Option`.
189        let row = sqlx::query(
190            "SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)::bigint AS lag \
191             FROM pg_replication_slots WHERE slot_name = $1",
192        )
193        .bind(&self.config.slot)
194        .fetch_optional(pool)
195        .await
196        .map_err(|e| SourceError::Query(e.to_string()))?;
197
198        let lag = match row {
199            Some(row) => {
200                let bytes: i64 = row
201                    .try_get("lag")
202                    .map_err(|e| SourceError::Query(e.to_string()))?;
203                // A negative diff (slot momentarily ahead of the read LSN) clamps
204                // to zero — there is no meaningful "negative lag".
205                Some(bytes.max(0) as u64)
206            }
207            None => None,
208        };
209        Ok(lag)
210    }
211}
212
213#[async_trait]
214impl CaptureProvisioning for WalChangeCapture {
215    async fn inspect_coverage(
216        &self,
217        required: &BTreeSet<QualifiedTable>,
218    ) -> Result<CoverageReport> {
219        let pool = self.admin_pool().await?;
220        publication::inspect_publication(pool, &self.config.publication, required).await
221    }
222
223    #[tracing::instrument(name = "wal.ensure_coverage", skip_all, err)]
224    async fn ensure_coverage(
225        &self,
226        required: &BTreeSet<QualifiedTable>,
227        manage: bool,
228    ) -> Result<CoverageReport> {
229        let pool = self.admin_pool().await?;
230        let report =
231            publication::inspect_publication(pool, &self.config.publication, required).await?;
232
233        if report.satisfied {
234            tracing::debug!(
235                publication = %self.config.publication,
236                "publication covers every required table",
237            );
238            return Ok(report);
239        }
240
241        let missing = report
242            .missing
243            .iter()
244            .map(|table| table.to_string())
245            .collect::<Vec<_>>()
246            .join(", ");
247
248        if manage && report.manageable {
249            publication::apply_publication(pool, &self.config.publication, &report.missing).await?;
250            tracing::info!(
251                publication = %self.config.publication,
252                tables = %missing,
253                "provisioned publication for missing tables",
254            );
255        } else {
256            let reason = if !manage {
257                "automatic publication management is disabled".to_owned()
258            } else {
259                report.blockers.join("; ")
260            };
261            tracing::warn!(
262                publication = %self.config.publication,
263                missing = %missing,
264                reason = %reason,
265                remediation = %report.remediation.join(" "),
266                "publication is missing tables and flusso will not create them automatically; \
267                 run the printed SQL to stream every table (changes to missing tables are dropped)",
268            );
269        }
270
271        Ok(report)
272    }
273}