Skip to main content

sources_postgres/cdc/
capture.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use futures::stream::BoxStream;
5use pgwire_replication::{ReplicationClient, ReplicationConfig};
6use sources_core::cdc::{AckSink, Change, ChangeCapture};
7use sources_core::{Result, SnapshotTable, SourceError};
8use sqlx::{PgPool, Row};
9use tokio::sync::OnceCell;
10
11use super::ack::{AckShared, WalAckSink};
12use super::{backfill, stream};
13
14/// Postgres change capture over logical replication (pgoutput).
15///
16/// Exposes the two [`ChangeCapture`] capabilities the engine orchestrates:
17///
18/// - [`live`](ChangeCapture::live) connects to a replication slot and streams
19///   committed row changes as thin [`Change`]s. Resume is the slot's: its
20///   `confirmed_flush_lsn` is the durable cursor on the server, advanced as the
21///   engine confirms changes (see [`Ack`](sources_core::cdc::Ack)).
22/// - [`snapshot`](ChangeCapture::snapshot) reads current rows over an ordinary
23///   SQL connection for an initial backfill (see the crate-private `backfill`). The engine calls
24///   it only for tables backing an index the sink reports as unseeded.
25///
26/// # Prerequisites
27///
28/// The server must have `wal_level = logical` and the configured **publication**
29/// must already exist and cover every table any index reads from. The replication
30/// **slot** is created automatically on first connect if it does not exist yet.
31#[derive(Debug, Clone)]
32pub struct WalChangeCapture {
33    config: ReplicationConfig,
34    /// Ordinary SQL connection URL, used by [`snapshot`](Self::snapshot) and
35    /// for the automatic slot creation check.
36    connection_url: String,
37    /// A small, lazily-opened SQL pool shared by the slot check and the
38    /// out-of-band [`lag`](Self::lag) polling, so periodic status probes reuse
39    /// connections instead of opening and tearing one down each time. Shared
40    /// across clones (an `Arc`), opened on first use. The bulk snapshot read
41    /// stays on its own connection (see [`snapshot`](Self::snapshot)).
42    admin_pool: Arc<OnceCell<PgPool>>,
43}
44
45impl WalChangeCapture {
46    /// Create a capture from a `pgwire-replication` configuration and the
47    /// ordinary SQL connection URL the snapshot reads through (the same URL the
48    /// document builder connects with).
49    ///
50    /// Leave `config.start_lsn` at [`Lsn::ZERO`](pgwire_replication::Lsn::ZERO)
51    /// to resume from the slot's `confirmed_flush_lsn` — the usual choice.
52    pub fn new(config: ReplicationConfig, connection_url: impl Into<String>) -> Self {
53        Self {
54            config,
55            connection_url: connection_url.into(),
56            admin_pool: Arc::new(OnceCell::new()),
57        }
58    }
59
60    /// The shared admin pool, opened on first call and reused thereafter. Kept
61    /// deliberately small — it serves only the slot check and lag probes, not
62    /// the change or snapshot paths.
63    async fn admin_pool(&self) -> Result<&PgPool> {
64        self.admin_pool
65            .get_or_try_init(|| async {
66                sqlx::postgres::PgPoolOptions::new()
67                    .max_connections(2)
68                    .connect(&self.connection_url)
69                    .await
70                    .map_err(|e| SourceError::Connection(e.to_string()))
71            })
72            .await
73    }
74
75    /// Ensure the replication slot exists, creating it if it does not.
76    ///
77    /// Runs over the shared admin pool so the check can run before the
78    /// replication connection is opened. If the slot already exists its plugin
79    /// is validated; a slot with the wrong plugin name is an error (it was
80    /// created for a different consumer and we should not clobber it).
81    async fn ensure_slot(&self) -> Result<()> {
82        let pool = self.admin_pool().await?;
83
84        let row = sqlx::query("SELECT plugin FROM pg_replication_slots WHERE slot_name = $1")
85            .bind(&self.config.slot)
86            .fetch_optional(pool)
87            .await
88            .map_err(|e| SourceError::Query(e.to_string()))?;
89
90        match row {
91            Some(row) => {
92                let plugin: String = row
93                    .try_get("plugin")
94                    .map_err(|e| SourceError::Query(e.to_string()))?;
95                if plugin != "pgoutput" {
96                    return Err(SourceError::Connection(format!(
97                        "replication slot '{}' exists but uses plugin '{}', expected 'pgoutput'",
98                        self.config.slot, plugin,
99                    )));
100                }
101                tracing::debug!(slot = %self.config.slot, "replication slot already exists");
102            }
103            None => {
104                sqlx::query("SELECT pg_create_logical_replication_slot($1, 'pgoutput')")
105                    .bind(&self.config.slot)
106                    .execute(pool)
107                    .await
108                    .map_err(|e| {
109                        SourceError::Connection(format!(
110                            "failed to create replication slot '{}': {e}",
111                            self.config.slot,
112                        ))
113                    })?;
114                tracing::info!(slot = %self.config.slot, "created replication slot");
115            }
116        }
117
118        Ok(())
119    }
120}
121
122#[async_trait]
123impl ChangeCapture for WalChangeCapture {
124    #[tracing::instrument(name = "wal.live", skip_all, err)]
125    async fn live(&self) -> Result<BoxStream<'static, Result<Change>>> {
126        self.ensure_slot().await?;
127
128        let client = ReplicationClient::connect(self.config.clone())
129            .await
130            .map_err(|e| SourceError::Connection(e.to_string()))?;
131
132        let ack = Arc::new(AckShared::new(self.config.start_lsn.as_u64()));
133        let sink: Arc<dyn AckSink> = Arc::new(WalAckSink::new(Arc::clone(&ack)));
134        tracing::info!(
135            start_lsn = self.config.start_lsn.as_u64(),
136            "opened replication stream"
137        );
138        Ok(stream::build(client, ack, sink))
139    }
140
141    #[tracing::instrument(name = "wal.snapshot", skip_all, fields(tables = tables.len()), err)]
142    async fn snapshot(
143        &self,
144        tables: &[SnapshotTable],
145    ) -> Result<BoxStream<'static, Result<Change>>> {
146        tracing::info!(tables = tables.len(), "starting snapshot");
147        backfill::snapshot(&self.connection_url, tables).await
148    }
149
150    /// Bytes between the slot's `confirmed_flush_lsn` and the server's current
151    /// WAL LSN — how far behind the destination is. Returns `None` until the
152    /// slot exists (it is created on the first [`live`](Self::live) connect).
153    #[tracing::instrument(name = "wal.lag", skip_all, err)]
154    async fn lag(&self) -> Result<Option<u64>> {
155        let pool = self.admin_pool().await?;
156
157        // `pg_wal_lsn_diff` yields a numeric byte distance; cast to bigint so it
158        // decodes as an integer. A slot whose consumer is fully caught up reads
159        // zero; a never-connected slot has no row, hence `Option`.
160        let row = sqlx::query(
161            "SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)::bigint AS lag \
162             FROM pg_replication_slots WHERE slot_name = $1",
163        )
164        .bind(&self.config.slot)
165        .fetch_optional(pool)
166        .await
167        .map_err(|e| SourceError::Query(e.to_string()))?;
168
169        let lag = match row {
170            Some(row) => {
171                let bytes: i64 = row
172                    .try_get("lag")
173                    .map_err(|e| SourceError::Query(e.to_string()))?;
174                // A negative diff (slot momentarily ahead of the read LSN) clamps
175                // to zero — there is no meaningful "negative lag".
176                Some(bytes.max(0) as u64)
177            }
178            None => None,
179        };
180        Ok(lag)
181    }
182}