sources_postgres/cdc/capture.rs
1use std::sync::Arc;
2
3use async_trait::async_trait;
4use futures::stream::BoxStream;
5use pgwire_replication::{ReplicationClient, ReplicationConfig};
6use sources_core::cdc::{AckSink, Change, ChangeCapture};
7use sources_core::{Result, SnapshotTable, SourceError};
8use sqlx::{PgPool, Row};
9use tokio::sync::OnceCell;
10
11use super::ack::{AckShared, WalAckSink};
12use super::{backfill, stream};
13
14/// Postgres change capture over logical replication (pgoutput).
15///
16/// Exposes the two [`ChangeCapture`] capabilities the engine orchestrates:
17///
18/// - [`live`](ChangeCapture::live) connects to a replication slot and streams
19/// committed row changes as thin [`Change`]s. Resume is the slot's: its
20/// `confirmed_flush_lsn` is the durable cursor on the server, advanced as the
21/// engine confirms changes (see [`Ack`](sources_core::cdc::Ack)).
22/// - [`snapshot`](ChangeCapture::snapshot) reads current rows over an ordinary
23/// SQL connection for an initial backfill (see the crate-private `backfill`). The engine calls
24/// it only for tables backing an index the sink reports as unseeded.
25///
26/// # Prerequisites
27///
28/// The server must have `wal_level = logical` and the configured **publication**
29/// must already exist and cover every table any index reads from. The replication
30/// **slot** is created automatically on first connect if it does not exist yet.
31#[derive(Debug, Clone)]
32pub struct WalChangeCapture {
33 config: ReplicationConfig,
34 /// Ordinary SQL connection URL, used by [`snapshot`](Self::snapshot) and
35 /// for the automatic slot creation check.
36 connection_url: String,
37 /// A small, lazily-opened SQL pool shared by the slot check and the
38 /// out-of-band [`lag`](Self::lag) polling, so periodic status probes reuse
39 /// connections instead of opening and tearing one down each time. Shared
40 /// across clones (an `Arc`), opened on first use. The bulk snapshot read
41 /// stays on its own connection (see [`snapshot`](Self::snapshot)).
42 admin_pool: Arc<OnceCell<PgPool>>,
43}
44
45impl WalChangeCapture {
46 /// Create a capture from a `pgwire-replication` configuration and the
47 /// ordinary SQL connection URL the snapshot reads through (the same URL the
48 /// document builder connects with).
49 ///
50 /// Leave `config.start_lsn` at [`Lsn::ZERO`](pgwire_replication::Lsn::ZERO)
51 /// to resume from the slot's `confirmed_flush_lsn` — the usual choice.
52 pub fn new(config: ReplicationConfig, connection_url: impl Into<String>) -> Self {
53 Self {
54 config,
55 connection_url: connection_url.into(),
56 admin_pool: Arc::new(OnceCell::new()),
57 }
58 }
59
60 /// The shared admin pool, opened on first call and reused thereafter. Kept
61 /// deliberately small — it serves only the slot check and lag probes, not
62 /// the change or snapshot paths.
63 async fn admin_pool(&self) -> Result<&PgPool> {
64 self.admin_pool
65 .get_or_try_init(|| async {
66 sqlx::postgres::PgPoolOptions::new()
67 .max_connections(2)
68 .connect(&self.connection_url)
69 .await
70 .map_err(|e| SourceError::Connection(e.to_string()))
71 })
72 .await
73 }
74
75 /// Ensure the replication slot exists, creating it if it does not.
76 ///
77 /// Runs over the shared admin pool so the check can run before the
78 /// replication connection is opened. If the slot already exists its plugin
79 /// is validated; a slot with the wrong plugin name is an error (it was
80 /// created for a different consumer and we should not clobber it).
81 async fn ensure_slot(&self) -> Result<()> {
82 let pool = self.admin_pool().await?;
83
84 let row = sqlx::query("SELECT plugin FROM pg_replication_slots WHERE slot_name = $1")
85 .bind(&self.config.slot)
86 .fetch_optional(pool)
87 .await
88 .map_err(|e| SourceError::Query(e.to_string()))?;
89
90 match row {
91 Some(row) => {
92 let plugin: String = row
93 .try_get("plugin")
94 .map_err(|e| SourceError::Query(e.to_string()))?;
95 if plugin != "pgoutput" {
96 return Err(SourceError::Connection(format!(
97 "replication slot '{}' exists but uses plugin '{}', expected 'pgoutput'",
98 self.config.slot, plugin,
99 )));
100 }
101 tracing::debug!(slot = %self.config.slot, "replication slot already exists");
102 }
103 None => {
104 sqlx::query("SELECT pg_create_logical_replication_slot($1, 'pgoutput')")
105 .bind(&self.config.slot)
106 .execute(pool)
107 .await
108 .map_err(|e| {
109 SourceError::Connection(format!(
110 "failed to create replication slot '{}': {e}",
111 self.config.slot,
112 ))
113 })?;
114 tracing::info!(slot = %self.config.slot, "created replication slot");
115 }
116 }
117
118 Ok(())
119 }
120}
121
122#[async_trait]
123impl ChangeCapture for WalChangeCapture {
124 #[tracing::instrument(name = "wal.live", skip_all, err)]
125 async fn live(&self) -> Result<BoxStream<'static, Result<Change>>> {
126 self.ensure_slot().await?;
127
128 let client = ReplicationClient::connect(self.config.clone())
129 .await
130 .map_err(|e| SourceError::Connection(e.to_string()))?;
131
132 let ack = Arc::new(AckShared::new(self.config.start_lsn.as_u64()));
133 let sink: Arc<dyn AckSink> = Arc::new(WalAckSink::new(Arc::clone(&ack)));
134 tracing::info!(
135 start_lsn = self.config.start_lsn.as_u64(),
136 "opened replication stream"
137 );
138 Ok(stream::build(client, ack, sink))
139 }
140
141 #[tracing::instrument(name = "wal.snapshot", skip_all, fields(tables = tables.len()), err)]
142 async fn snapshot(
143 &self,
144 tables: &[SnapshotTable],
145 ) -> Result<BoxStream<'static, Result<Change>>> {
146 tracing::info!(tables = tables.len(), "starting snapshot");
147 backfill::snapshot(&self.connection_url, tables).await
148 }
149
150 /// Bytes between the slot's `confirmed_flush_lsn` and the server's current
151 /// WAL LSN — how far behind the destination is. Returns `None` until the
152 /// slot exists (it is created on the first [`live`](Self::live) connect).
153 #[tracing::instrument(name = "wal.lag", skip_all, err)]
154 async fn lag(&self) -> Result<Option<u64>> {
155 let pool = self.admin_pool().await?;
156
157 // `pg_wal_lsn_diff` yields a numeric byte distance; cast to bigint so it
158 // decodes as an integer. A slot whose consumer is fully caught up reads
159 // zero; a never-connected slot has no row, hence `Option`.
160 let row = sqlx::query(
161 "SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)::bigint AS lag \
162 FROM pg_replication_slots WHERE slot_name = $1",
163 )
164 .bind(&self.config.slot)
165 .fetch_optional(pool)
166 .await
167 .map_err(|e| SourceError::Query(e.to_string()))?;
168
169 let lag = match row {
170 Some(row) => {
171 let bytes: i64 = row
172 .try_get("lag")
173 .map_err(|e| SourceError::Query(e.to_string()))?;
174 // A negative diff (slot momentarily ahead of the read LSN) clamps
175 // to zero — there is no meaningful "negative lag".
176 Some(bytes.max(0) as u64)
177 }
178 None => None,
179 };
180 Ok(lag)
181 }
182}