Skip to main content

faucet_source_postgres_cdc/
stream.rs

1//! `PostgresCdcSource` — public `Source` implementation.
2
3use crate::config::PostgresCdcSourceConfig;
4use crate::pgoutput::decoder::decode_message;
5use crate::pgoutput::messages::{
6    Delete, Insert, Message, Relation, Truncate, TupleCell, TupleData, Update,
7};
8use crate::pgoutput::registry::RelationRegistry;
9use crate::pgoutput::values::text_to_json;
10use crate::replication::{
11    self, ReplicationEvent, ReplicationParams, postgres_clock_to_unix_ms, recv, send_status_update,
12};
13use crate::state::{Bookmark, format_lsn, parse_lsn, state_key};
14use async_trait::async_trait;
15use faucet_core::{FaucetError, Source, Stream, StreamPage};
16use serde_json::{Map, Value, json};
17use std::collections::HashMap;
18use std::pin::Pin;
19use std::time::{Duration, Instant};
20use tokio::sync::Mutex;
21
/// Change-data-capture source for Postgres logical replication, exposed to
/// the pipeline through the [`Source`] trait (connector name
/// `"postgres-cdc"`).
pub struct PostgresCdcSource {
    /// Connector configuration; validated once in [`PostgresCdcSource::new`].
    config: PostgresCdcSourceConfig,
    /// State key derived from `config.slot_name` at construction time and
    /// handed back verbatim by `Source::state_key`.
    state_key_value: String,
    /// Bookmark provided by `apply_start_bookmark`, applied at the start of
    /// the next fetch cycle. Becomes the new "confirmed_flush_lsn" we
    /// advertise to Postgres.
    pending_bookmark: Mutex<Option<Bookmark>>,
    /// Last LSN we have told Postgres is durable (advertised as the slot's
    /// `confirmed_flush_lsn`, which authorises WAL recycling up to it).
    ///
    /// Advanced **only** by `apply_start_bookmark` — i.e. after the pipeline
    /// has durably persisted a bookmark — or seeded from `start_lsn` config.
    /// It is never advanced from decoded WAL (commit-decode time) or from
    /// keepalive `wal_end`, because those positions are not yet durable
    /// downstream; doing so would let Postgres discard WAL for unwritten
    /// changes and lose data on a crash (#78/#1).
    confirmed_lsn: Mutex<u64>,
}
40
41impl PostgresCdcSource {
42    pub async fn new(config: PostgresCdcSourceConfig) -> Result<Self, FaucetError> {
43        config.validate()?;
44        let key = state_key(&config.slot_name);
45        let initial_lsn = match config.start_lsn.as_deref() {
46            Some(s) => parse_lsn(s)?,
47            None => 0,
48        };
49        Ok(Self {
50            config,
51            state_key_value: key,
52            pending_bookmark: Mutex::new(None),
53            confirmed_lsn: Mutex::new(initial_lsn),
54        })
55    }
56
57    /// Drop this source's replication slot on the server, freeing the WAL it
58    /// pins. A no-op if the slot doesn't exist; errors if the slot is still
59    /// active (in use by a live replication connection). Call this when
60    /// decommissioning a `permanent` slot so it doesn't leak WAL (#78/#12).
61    pub async fn drop_slot(&self) -> Result<(), FaucetError> {
62        replication::drop_slot(&self.config.connection_url, &self.config.slot_name).await
63    }
64}
65
66#[async_trait]
67impl Source for PostgresCdcSource {
68    async fn fetch_with_context(
69        &self,
70        ctx: &HashMap<String, Value>,
71    ) -> Result<Vec<Value>, FaucetError> {
72        let (records, _bookmark) = self.fetch_with_context_incremental(ctx).await?;
73        Ok(records)
74    }
75
76    /// Drain the replication stream into a single `Vec<Value>` plus the
77    /// bookmark of the most recent COMMIT.
78    ///
79    /// Implemented by collecting [`Source::stream_pages`] with the
80    /// `batch_size = 0` sentinel — that sentinel coalesces every transaction
81    /// in the run window into a single trailing page, which exactly matches
82    /// the historical `fetch_with_context_incremental` contract (one
83    /// aggregated buffer, one max-LSN bookmark). The streaming pipeline
84    /// (`Pipeline::run` / `run_stream`) drives `stream_pages` directly with
85    /// the per-source `batch_size` config field instead so it gets
86    /// per-transaction durability.
87    async fn fetch_with_context_incremental(
88        &self,
89        ctx: &HashMap<String, Value>,
90    ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
91        use futures::StreamExt;
92        let mut pages = self.stream_pages_with_batch_size(ctx, 0);
93        let mut all: Vec<Value> = Vec::new();
94        let mut bookmark: Option<Value> = None;
95        while let Some(page) = pages.next().await {
96            let page = page?;
97            all.extend(page.records);
98            if page.bookmark.is_some() {
99                bookmark = page.bookmark;
100            }
101        }
102        Ok((all, bookmark))
103    }
104
105    /// Per-transaction streaming.
106    ///
107    /// Each committed transaction is emitted as its own
108    /// [`StreamPage`] with `bookmark = Some(commit_lsn)`. Because the
109    /// pipeline flushes the sink and persists the bookmark on every page
110    /// that carries one, a mid-stream crash recovers from the last fully-
111    /// committed transaction with no partial-transaction leakage.
112    ///
113    /// **Atomic transactions.** A transaction is never split across pages.
114    /// If a single transaction's record count exceeds
115    /// [`PostgresCdcSourceConfig::batch_size`] it is still emitted as one
116    /// page; `batch_size` is advisory.
117    ///
118    /// **`batch_size = 0`** is the "no batching" sentinel: every committed
119    /// transaction during the run window is accumulated into a single
120    /// trailing page with `bookmark = max(commit_lsn)`. This negates
121    /// per-transaction durability and is only useful for tests / initial
122    /// snapshot runs.
123    ///
124    /// The trait-level `batch_size` argument is intentionally ignored in
125    /// favour of the config field (matches the convention used by the
126    /// query-mode postgres source and the rest source).
127    fn stream_pages<'a>(
128        &'a self,
129        ctx: &'a HashMap<String, Value>,
130        _batch_size: usize,
131    ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
132        self.stream_pages_with_batch_size(ctx, self.config.batch_size)
133    }
134
135    fn config_schema(&self) -> Value {
136        let schema = schemars::schema_for!(PostgresCdcSourceConfig);
137        serde_json::to_value(&schema).unwrap_or(Value::Null)
138    }
139
140    fn state_key(&self) -> Option<String> {
141        Some(self.state_key_value.clone())
142    }
143
144    async fn apply_start_bookmark(&self, bookmark: Value) -> Result<(), FaucetError> {
145        let parsed = Bookmark::from_value(bookmark)?;
146        // Update confirmed_lsn so the next initial status update (in the next
147        // fetch cycle) advertises the correct position.
148        *self.confirmed_lsn.lock().await = parsed.as_u64()?;
149        *self.pending_bookmark.lock().await = Some(parsed);
150        Ok(())
151    }
152
153    fn connector_name(&self) -> &'static str {
154        "postgres-cdc"
155    }
156
157    /// Preflight probe that does **not** start replication.
158    ///
159    /// The default [`Source::check`] would call `stream_pages`, which opens the
160    /// replication stream and consumes/holds WAL (a side effect that pins server
161    /// resources) — unacceptable as a preflight. Instead we open a *normal*
162    /// (non-replication) SQL connection — the same connection style
163    /// `ensure_slot` uses — and inspect the slot catalog without touching the
164    /// replication protocol:
165    ///
166    /// - connection fails → `auth` probe `Fail` (could not connect / authenticate),
167    /// - connected but the slot row is absent → `slot` probe `Skip`
168    ///   (faucet can create it on the first run),
169    /// - slot present → `slot` probe `Pass`.
170    ///
171    /// The whole call is bounded by `ctx.timeout`.
172    async fn check(
173        &self,
174        ctx: &faucet_core::check::CheckContext,
175    ) -> Result<faucet_core::check::CheckReport, FaucetError> {
176        use faucet_core::check::{CheckReport, Probe};
177        use sqlx::ConnectOptions as _;
178        use sqlx::postgres::{PgConnectOptions, PgConnection};
179
180        let start = std::time::Instant::now();
181
182        // A bad connection URL is a config error, not an unreachable server.
183        let opts: PgConnectOptions = match self.config.connection_url.parse() {
184            Ok(o) => o,
185            Err(e) => {
186                return Ok(CheckReport::single(Probe::fail_hint(
187                    "auth",
188                    start.elapsed(),
189                    format!("invalid connection URL: {e}"),
190                    "connection_url must be a valid postgres:// URL",
191                )));
192            }
193        };
194
195        // Bound the whole connect+query under ctx.timeout so an unreachable
196        // host doesn't hang the probe.
197        let probe = async {
198            let mut conn: PgConnection = opts.connect().await.map_err(|e| {
199                Probe::fail_hint(
200                    "auth",
201                    start.elapsed(),
202                    format!("could not connect: {e}"),
203                    "verify the host is reachable and credentials are valid",
204                )
205            })?;
206
207            let row: Option<(String,)> = sqlx::query_as(
208                "SELECT slot_name::text FROM pg_replication_slots WHERE slot_name = $1",
209            )
210            .bind(&self.config.slot_name)
211            .fetch_optional(&mut conn)
212            .await
213            .map_err(|e| {
214                Probe::fail(
215                    "slot",
216                    start.elapsed(),
217                    format!("could not query pg_replication_slots: {e}"),
218                )
219            })?;
220
221            Ok::<Probe, Probe>(match row {
222                Some(_) => Probe::pass("slot", start.elapsed()),
223                None => Probe::skip(
224                    "slot",
225                    format!(
226                        "replication slot {} does not exist yet (faucet run can create it)",
227                        self.config.slot_name
228                    ),
229                ),
230            })
231        };
232
233        let probe = match tokio::time::timeout(ctx.timeout, probe).await {
234            Ok(Ok(p)) | Ok(Err(p)) => p,
235            Err(_elapsed) => Probe::fail_hint(
236                "auth",
237                start.elapsed(),
238                "connection timed out",
239                "the database did not respond within the check timeout",
240            ),
241        };
242        Ok(CheckReport::single(probe))
243    }
244}
245
impl PostgresCdcSource {
    /// Shared streaming implementation used by both `stream_pages` (per-
    /// transaction page emission when `batch_size > 0`) and the legacy
    /// `fetch_with_context_incremental` (single-page aggregation when
    /// `batch_size == 0`).
    ///
    /// The slot lifecycle (`connect` → `ensure_slot` → `start_replication`)
    /// and Standby Status Update bootstrap are identical to the pre-Plan-15
    /// behaviour. The only difference is when records cross the page
    /// boundary: on every COMMIT (`batch_size > 0`) or once at the end of
    /// the run window (`batch_size == 0`).
    ///
    /// `_ctx` is accepted for signature parity with the `Source` methods but
    /// is not read. `batch_size` is only compared against zero to select the
    /// mode; beyond that it appears only in the final log line.
    ///
    /// The run window ends on ctrl-c, when `recv` has produced no event for
    /// `idle_timeout`, or — checked at commit boundaries only — once the
    /// number of committed records reaches `max_messages`. A `recv` error, a
    /// `handle_event` error, or the replication stream ending (`recv`
    /// returning `None`) terminates the stream with an error instead.
    fn stream_pages_with_batch_size<'a>(
        &'a self,
        _ctx: &'a HashMap<String, Value>,
        batch_size: usize,
    ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
        // `None` means "no record cap"; usize::MAX makes the later
        // `total_records >= max_messages` test effectively never fire.
        let max_messages = self.config.max_messages.unwrap_or(usize::MAX);
        let idle_timeout = self.config.idle_timeout;
        let per_transaction = batch_size != 0;

        Box::pin(async_stream::try_stream! {
            // 1. Resolve start_lsn for THIS fetch cycle. A pending bookmark
            //    (consumed here, so it applies to one cycle only) wins over
            //    the `start_lsn` config override.
            let pending = {
                let mut g = self.pending_bookmark.lock().await;
                g.take()
            };
            let start_lsn = if let Some(b) = pending.as_ref() {
                let lsn = b.as_u64()?;
                *self.confirmed_lsn.lock().await = lsn;
                Some(lsn)
            } else {
                self.config
                    .start_lsn
                    .as_deref()
                    .map(parse_lsn)
                    .transpose()?
            };

            // 2. Open replication connection + ensure slot + START_REPLICATION.
            let params = ReplicationParams {
                connection_url: &self.config.connection_url,
                slot_name: &self.config.slot_name,
                publication_name: &self.config.publication_name,
                proto_version: self.config.proto_version,
                create_slot_if_missing: self.config.create_slot_if_missing,
                start_lsn,
                status_update_interval: self.config.status_update_interval,
                tcp_keepalive: self.config.tcp_keepalive,
                slot_type: self.config.slot_type,
                tls: &self.config.tls,
            };
            let client = replication::connect(&params).await?;
            replication::ensure_slot(
                &client,
                &self.config.connection_url,
                &self.config.slot_name,
                self.config.create_slot_if_missing,
                self.config.slot_type,
            )
            .await?;

            // Advance the slot's confirmed_flush_lsn to the durable resume
            // point BEFORE opening the stream. For a logical slot,
            // START_REPLICATION resumes decoding from confirmed_flush_lsn (the
            // client-supplied start LSN does not skip already-committed
            // transactions), so this is the only way to skip changes we have
            // already consumed and durably persisted. `start_lsn` is set only
            // from a durable bookmark (apply_start_bookmark) or the start_lsn
            // config override; when it is `None` (no durable resume point) we
            // do NOT advance, so an interrupted run with no persisted bookmark
            // redelivers rather than loses data (#78/#1).
            if let Some(lsn) = start_lsn {
                // Both the slot advance and START_REPLICATION require the slot
                // to be inactive; on a rapid re-run the prior connection may not
                // have released it yet ("slot is active"). Retry with bounded
                // backoff instead of failing the whole run (#146 M12).
                replication::retry_on_slot_active(self.config.slot_acquire_retries, || {
                    replication::advance_slot(
                        &self.config.connection_url,
                        &self.config.slot_name,
                        lsn,
                    )
                })
                .await?;
            }

            let mut duplex = replication::retry_on_slot_active(
                self.config.slot_acquire_retries,
                || replication::start_replication(&client, &params),
            )
            .await?;

            // 3. Send an initial Standby Status Update advertising the same
            //    durable position so the server's bookkeeping is consistent.
            let initial_confirmed = *self.confirmed_lsn.lock().await;
            send_status_update(&mut duplex, initial_confirmed, false).await?;

            // 4. Drain the replication stream until idle_timeout, ctrl_c, or
            //    max_messages. Per-transaction mode (`batch_size > 0`) emits
            //    one page per COMMIT carrying `bookmark = Some(lsn)`, where
            //    `lsn` is `state.last_committed` — the COMMIT's `end_lsn` as
            //    recorded by `handle_event`. Aggregated mode
            //    (`batch_size == 0`) accumulates every transaction's records
            //    into one buffer and emits a single trailing page whose
            //    bookmark is the most recent commit's `end_lsn`.
            let mut registry = RelationRegistry::new();
            let mut state = TxnState {
                max_staged_records: self.config.max_staged_records,
                ..TxnState::default()
            };
            let mut agg_records: Vec<Value> = Vec::new();
            // Counts records of committed transactions only; staged records of
            // an in-progress transaction are not included.
            let mut total_records: usize = 0;
            let mut last_message_at = Instant::now();

            loop {
                // Remaining idle allowance; zero once the deadline has passed,
                // which makes the timeout below fire immediately.
                let idle_deadline = last_message_at + idle_timeout;
                let budget = idle_deadline
                    .checked_duration_since(Instant::now())
                    .unwrap_or(Duration::ZERO);

                // Tracks per-iteration outcomes that we cannot propagate
                // directly from inside `tokio::select!` arms while remaining
                // inside `try_stream!`.
                let mut stop = false;
                let mut just_committed: Option<(u64, Vec<Value>)> = None;
                let mut fatal: Option<FaucetError> = None;
                let mut unexpected_end = false;
                // NOTE(review): a fresh `ctrl_c()` future is created on every
                // iteration and none is being polled while a yielded page is
                // consumed downstream — confirm a signal arriving in that gap
                // is handled elsewhere.
                tokio::select! {
                    // `biased` polls the arms in order, so ctrl_c is checked
                    // before the replication stream.
                    biased;
                    _ = tokio::signal::ctrl_c() => {
                        tracing::info!("postgres-cdc: ctrl_c received, stopping cleanly");
                        stop = true;
                    }
                    ev = tokio::time::timeout(budget, recv(&mut duplex)) => {
                        match ev {
                            Ok(Ok(Some(event))) => {
                                // Reset on every event `recv` hands back, not
                                // just committed output, so idle_timeout never
                                // fires mid-transaction while WAL is still
                                // flowing.
                                last_message_at = Instant::now();
                                let was_in_txn = state.in_txn;
                                let pre_commit_count = state.last_committed;
                                let mut committed_records: Vec<Value> = Vec::new();
                                if let Err(e) = handle_event(
                                    event,
                                    &mut registry,
                                    &mut state,
                                    &mut committed_records,
                                ) {
                                    fatal = Some(e);
                                } else if was_in_txn
                                    && !state.in_txn
                                    && state.last_committed != pre_commit_count
                                {
                                    // A COMMIT was just processed and
                                    // `committed_records` holds the drained
                                    // staged records.
                                    let lsn = state.last_committed
                                        .expect("last_committed set on commit");
                                    total_records += committed_records.len();
                                    just_committed = Some((lsn, committed_records));
                                }
                            }
                            Ok(Ok(None)) => {
                                // `recv` reported end-of-stream; surfaced as an
                                // error below rather than a clean stop.
                                unexpected_end = true;
                            }
                            Ok(Err(e)) => {
                                fatal = Some(e);
                            }
                            Err(_timeout) => {
                                tracing::debug!(
                                    "postgres-cdc: idle_timeout reached, stopping"
                                );
                                stop = true;
                            }
                        }
                    }
                }

                // Errors are raised here, outside `select!`, so `?` can end
                // the `try_stream!` with the error.
                if let Some(e) = fatal {
                    Err(e)?;
                }
                if unexpected_end {
                    Err(FaucetError::Source(
                        "postgres-cdc: replication stream ended unexpectedly".into(),
                    ))?;
                }
                if let Some((lsn, drained)) = just_committed {
                    // NOTE: we deliberately do NOT advance `confirmed_lsn` here.
                    // `confirmed_lsn` is the position advertised to Postgres as
                    // `confirmed_flush_lsn` (which lets the server recycle WAL),
                    // and it must only ever reflect data the consumer has
                    // *durably* persisted. The only durable signal is the
                    // bookmark the pipeline persists after the sink flush, which
                    // arrives back via `apply_start_bookmark` at the start of the
                    // next run. Advancing here at commit-decode time would tell
                    // Postgres to discard WAL for changes that were never written
                    // downstream — a crash in that window loses data (#78/#1).
                    if per_transaction {
                        let bookmark = Some(Bookmark::from_u64(lsn).to_value()?);
                        yield StreamPage {
                            records: drained,
                            bookmark,
                        };
                    } else {
                        agg_records.extend(drained);
                    }
                    // The cap is only evaluated at commit boundaries, so a
                    // transaction is never cut short; the total may overshoot
                    // `max_messages` by the tail of the last transaction.
                    if total_records >= max_messages {
                        stop = true;
                    }
                }

                if stop {
                    break;
                }
            }

            // 5. In aggregated mode emit the single trailing page (carrying
            //    the most recent commit position); nothing is emitted if no
            //    COMMIT was seen. In per-transaction mode the trailing
            //    `state.staged` is an *uncommitted* partial transaction and
            //    must be dropped — Postgres will redeliver after the next
            //    START_REPLICATION.
            if !per_transaction
                && let Some(lsn) = state.last_committed
            {
                let bookmark = Some(Bookmark::from_u64(lsn).to_value()?);
                yield StreamPage {
                    records: agg_records,
                    bookmark,
                };
            }

            tracing::info!(
                records = total_records,
                batch_size,
                "postgres-cdc: stream complete",
            );
        })
    }
}
484
/// One decoded tuple, split into its values and the names of any unchanged
/// (large/TOAST) columns whose value the server didn't re-send.
struct TupleRow {
    /// Column name → JSON value for every cell that carried data or an
    /// explicit NULL. Unchanged-TOAST columns have no entry here.
    values: Map<String, Value>,
    /// Names of the columns whose cell was `TupleCell::UnchangedToast`, in
    /// relation column order.
    unchanged_toast: Vec<String>,
}
491
/// In-flight transaction state while draining the replication stream.
#[derive(Default)]
struct TxnState {
    /// Records produced inside the current BEGIN..COMMIT, buffered until
    /// COMMIT is seen so partial transactions never leak into the output.
    /// On COMMIT, `handle_event` drains this into the caller-supplied
    /// `out: &mut Vec<Value>`.
    staged: Vec<Value>,
    /// `end_lsn` of the most recently committed transaction, as recorded by
    /// `handle_event` when it processes a COMMIT (note: the commit's
    /// `end_lsn`, not its `commit_lsn`). `None` until the first COMMIT.
    last_committed: Option<u64>,
    /// commit_ts (Postgres epoch micros) of the in-progress transaction,
    /// set by BEGIN.
    in_progress_ts: i64,
    /// commit_lsn announced by the in-progress BEGIN (== final_lsn).
    in_progress_lsn: u64,
    /// Whether we are currently inside a BEGIN..COMMIT pair.
    in_txn: bool,
    /// Optional cap on `staged.len()` for a single transaction. `None` means
    /// unbounded. Exceeding it aborts the run with a typed error instead of
    /// risking an OOM-kill on a huge bulk transaction (see
    /// [`PostgresCdcSourceConfig::max_staged_records`]).
    max_staged_records: Option<usize>,
}
515
516impl TxnState {
517    /// Stage one decoded change record, enforcing
518    /// [`max_staged_records`](Self::max_staged_records).
519    fn push_staged(&mut self, record: Value) -> Result<(), FaucetError> {
520        if let Some(max) = self.max_staged_records
521            && self.staged.len() >= max
522        {
523            return Err(FaucetError::Source(format!(
524                "postgres-cdc: in-progress transaction exceeded max_staged_records ({max}); \
525                 aborting to avoid unbounded memory growth. Raise max_staged_records or \
526                 reduce the size of the source transaction."
527            )));
528        }
529        self.staged.push(record);
530        Ok(())
531    }
532}
533
534fn handle_event(
535    event: ReplicationEvent,
536    registry: &mut RelationRegistry,
537    state: &mut TxnState,
538    out: &mut Vec<Value>,
539) -> Result<(), FaucetError> {
540    match event {
541        ReplicationEvent::Begin {
542            final_lsn,
543            commit_time_micros,
544            xid: _,
545        } => {
546            if state.in_txn {
547                // A second BEGIN without an intervening COMMIT is a protocol
548                // desync: silently discarding the staged records would drop
549                // committed-but-unemitted changes. Fail fast so the run
550                // restarts cleanly from the durable bookmark (#78/#46).
551                return Err(FaucetError::Source(format!(
552                    "postgres-cdc: BEGIN received while a previous transaction was still \
553                     in progress ({} records staged) — replication stream desync",
554                    state.staged.len()
555                )));
556            }
557            state.in_txn = true;
558            state.in_progress_lsn = final_lsn.as_u64();
559            state.in_progress_ts = commit_time_micros;
560            state.staged.clear();
561        }
562        ReplicationEvent::Commit {
563            lsn: _,
564            commit_time_micros: _,
565            end_lsn,
566        } => {
567            if !state.in_txn {
568                return Err(FaucetError::Source(
569                    "postgres-cdc: COMMIT without BEGIN".into(),
570                ));
571            }
572            // Drain staged records into the caller-supplied output buffer.
573            // The caller is responsible for emitting a StreamPage with these
574            // records and the bookmark.
575            //
576            // The bookmark is the commit's `end_lsn` — the WAL position
577            // immediately AFTER the commit record — not the commit_lsn. To
578            // resume *past* a consumed transaction the slot's
579            // confirmed_flush_lsn must be set to a position beyond the commit
580            // record; advancing only to commit_lsn leaves the commit record
581            // unconfirmed and Postgres redelivers the whole transaction
582            // (#78/#1). `end_lsn` is the position a standby reports as flushed.
583            //
584            // The exactly-once-across-resume boundary (a transaction whose
585            // commit lands exactly at the persisted bookmark is delivered once,
586            // not skipped or duplicated) is exercised by the Docker integration
587            // tests `resume_from_bookmark_skips_already_consumed` and
588            // `lsn_not_advanced_without_durable_bookmark_redelivers` (#78 LOW).
589            out.append(&mut state.staged);
590            state.last_committed = Some(end_lsn.as_u64());
591            state.in_txn = false;
592        }
593        ReplicationEvent::XLogData { data, .. } => {
594            let msg = decode_message(&data)?;
595            handle_pgoutput(msg, registry, state)?;
596        }
597        ReplicationEvent::Message { .. } => {
598            // pg_logical_emit_message — a user-emitted logical message, never a
599            // table change. Intentionally ignored by this row-oriented source.
600        }
601        // KeepAlive and StoppedAt are filtered inside recv(). Any other variant
602        // is one this decoder doesn't understand — it may carry table data, so
603        // dropping it silently risks data loss. Fail fast instead (#78/#46).
604        other => {
605            return Err(FaucetError::Source(format!(
606                "postgres-cdc: unhandled ReplicationEvent variant {other:?} — refusing to \
607                 continue rather than risk silently dropping change data"
608            )));
609        }
610    }
611    Ok(())
612}
613
614fn handle_pgoutput(
615    msg: Message,
616    registry: &mut RelationRegistry,
617    state: &mut TxnState,
618) -> Result<(), FaucetError> {
619    match msg {
620        Message::Relation(r) => registry.insert(r),
621        Message::Origin | Message::Type => {} // ignored
622        Message::Insert(i) => stage_insert(state, registry, i)?,
623        Message::Update(u) => stage_update(state, registry, u)?,
624        Message::Delete(d) => stage_delete(state, registry, d)?,
625        Message::Truncate(t) => stage_truncate(state, registry, t)?,
626        // Begin/Commit pgoutput messages should never arrive here — the
627        // pgwire-replication library decodes them into structured
628        // ReplicationEvent::Begin / Commit variants, handled in handle_event.
629        // If we see one, log a warning and ignore.
630        Message::Begin(_) | Message::Commit(_) => {
631            tracing::warn!(
632                "postgres-cdc: pgoutput Begin/Commit reached pgoutput decoder; \
633                 pgwire-replication should have intercepted it"
634            );
635        }
636    }
637    Ok(())
638}
639
640fn stage_insert(
641    state: &mut TxnState,
642    registry: &RelationRegistry,
643    i: Insert,
644) -> Result<(), FaucetError> {
645    let rel = registry.get(i.relation_oid)?;
646    let after = tuple_to_object(rel, &i.new)?;
647    let r = record(rel, "insert", state, None, Some(after));
648    state.push_staged(r)
649}
650
651fn stage_update(
652    state: &mut TxnState,
653    registry: &RelationRegistry,
654    u: Update,
655) -> Result<(), FaucetError> {
656    let rel = registry.get(u.relation_oid)?;
657    let before = match &u.old {
658        Some(t) => Some(tuple_to_object(rel, t)?),
659        None => None,
660    };
661    let after = tuple_to_object(rel, &u.new)?;
662    let r = record(rel, "update", state, before, Some(after));
663    state.push_staged(r)
664}
665
666fn stage_delete(
667    state: &mut TxnState,
668    registry: &RelationRegistry,
669    d: Delete,
670) -> Result<(), FaucetError> {
671    let rel = registry.get(d.relation_oid)?;
672    let before = Some(tuple_to_object(rel, &d.old)?);
673    let r = record(rel, "delete", state, before, None);
674    state.push_staged(r)
675}
676
677fn stage_truncate(
678    state: &mut TxnState,
679    registry: &RelationRegistry,
680    t: Truncate,
681) -> Result<(), FaucetError> {
682    for oid in &t.relation_oids {
683        let rel = registry.get(*oid)?;
684        let r = record(rel, "truncate", state, None, None);
685        state.push_staged(r)?;
686    }
687    Ok(())
688}
689
690fn record(
691    rel: &Relation,
692    op: &str,
693    state: &TxnState,
694    before: Option<TupleRow>,
695    after: Option<TupleRow>,
696) -> Value {
697    fn to_value(row: TupleRow) -> Value {
698        let mut o = row.values;
699        if !row.unchanged_toast.is_empty() {
700            o.insert("__unchanged_toast__".into(), json!(row.unchanged_toast));
701        }
702        Value::Object(o)
703    }
704    let mut obj = Map::new();
705    obj.insert("op".into(), json!(op));
706    obj.insert("schema".into(), json!(rel.namespace));
707    obj.insert("table".into(), json!(rel.name));
708    obj.insert("lsn".into(), json!(format_lsn(state.in_progress_lsn)));
709    obj.insert(
710        "ts_ms".into(),
711        json!(postgres_clock_to_unix_ms(state.in_progress_ts)),
712    );
713    obj.insert("before".into(), before.map(to_value).unwrap_or(Value::Null));
714    obj.insert("after".into(), after.map(to_value).unwrap_or(Value::Null));
715    Value::Object(obj)
716}
717
718/// Convert a tuple's text cells to a [`TupleRow`].
719fn tuple_to_object(rel: &Relation, tup: &TupleData) -> Result<TupleRow, FaucetError> {
720    if tup.cells.len() != rel.columns.len() {
721        return Err(FaucetError::Source(format!(
722            "postgres-cdc: tuple has {} cells but relation {}.{} has {} columns",
723            tup.cells.len(),
724            rel.namespace,
725            rel.name,
726            rel.columns.len()
727        )));
728    }
729    let mut values = Map::with_capacity(rel.columns.len());
730    let mut unchanged_toast = Vec::new();
731    for (col, cell) in rel.columns.iter().zip(&tup.cells) {
732        match cell {
733            TupleCell::Null => {
734                values.insert(col.name.clone(), Value::Null);
735            }
736            TupleCell::UnchangedToast => {
737                unchanged_toast.push(col.name.clone());
738            }
739            TupleCell::Text(s) => {
740                values.insert(col.name.clone(), text_to_json(col.type_oid, s)?);
741            }
742        }
743    }
744    Ok(TupleRow {
745        values,
746        unchanged_toast,
747    })
748}
749
750#[cfg(test)]
751mod tests {
752    use super::*;
753    use crate::pgoutput::messages::{ColumnDesc, ReplicaIdentity};
754    use crate::replication::ReplicationEvent;
755    use pgwire_replication::Lsn;
756
757    fn rel_users() -> Relation {
758        Relation {
759            oid: 16384,
760            namespace: "public".into(),
761            name: "users".into(),
762            replica_identity: ReplicaIdentity::Default,
763            columns: vec![
764                ColumnDesc {
765                    flags: 1,
766                    name: "id".into(),
767                    type_oid: 23,
768                    type_modifier: -1,
769                },
770                ColumnDesc {
771                    flags: 0,
772                    name: "name".into(),
773                    type_oid: 25,
774                    type_modifier: -1,
775                },
776            ],
777        }
778    }
779
780    fn xlogdata(payload: Vec<u8>) -> ReplicationEvent {
781        ReplicationEvent::XLogData {
782            wal_start: Lsn::from_u64(0),
783            wal_end: Lsn::from_u64(0x16A_4F88),
784            server_time_micros: 0,
785            data: bytes::Bytes::from(payload),
786        }
787    }
788
789    fn insert_payload(relation_oid: u32, cells: &[(&str, &str)]) -> Vec<u8> {
790        let mut buf: Vec<u8> = Vec::new();
791        buf.push(b'I');
792        buf.extend_from_slice(&relation_oid.to_be_bytes());
793        buf.push(b'N');
794        buf.extend_from_slice(&(cells.len() as u16).to_be_bytes());
795        for (_, val) in cells {
796            text_cell(&mut buf, val);
797        }
798        buf
799    }
800
801    /// 'U' relation 'O' fullold 'N' new — exercises REPLICA IDENTITY FULL.
802    fn update_full_payload(
803        relation_oid: u32,
804        old_cells: &[(&str, &str)],
805        new_cells: &[(&str, &str)],
806    ) -> Vec<u8> {
807        let mut buf: Vec<u8> = Vec::new();
808        buf.push(b'U');
809        buf.extend_from_slice(&relation_oid.to_be_bytes());
810        buf.push(b'O');
811        buf.extend_from_slice(&(old_cells.len() as u16).to_be_bytes());
812        for (_, val) in old_cells {
813            text_cell(&mut buf, val);
814        }
815        buf.push(b'N');
816        buf.extend_from_slice(&(new_cells.len() as u16).to_be_bytes());
817        for (_, val) in new_cells {
818            text_cell(&mut buf, val);
819        }
820        buf
821    }
822
823    /// 'D' relation 'O' fullold — REPLICA IDENTITY FULL delete.
824    fn delete_full_payload(relation_oid: u32, old_cells: &[(&str, &str)]) -> Vec<u8> {
825        let mut buf: Vec<u8> = Vec::new();
826        buf.push(b'D');
827        buf.extend_from_slice(&relation_oid.to_be_bytes());
828        buf.push(b'O');
829        buf.extend_from_slice(&(old_cells.len() as u16).to_be_bytes());
830        for (_, val) in old_cells {
831            text_cell(&mut buf, val);
832        }
833        buf
834    }
835
836    /// 'T' flags=0 oids... — truncate without cascade/restart_identity.
837    fn truncate_payload(relation_oids: &[u32]) -> Vec<u8> {
838        let mut buf: Vec<u8> = Vec::new();
839        buf.push(b'T');
840        buf.extend_from_slice(&(relation_oids.len() as u32).to_be_bytes());
841        buf.push(0u8); // flags
842        for oid in relation_oids {
843            buf.extend_from_slice(&oid.to_be_bytes());
844        }
845        buf
846    }
847
848    fn text_cell(buf: &mut Vec<u8>, val: &str) {
849        buf.push(b't');
850        buf.extend_from_slice(&(val.len() as u32).to_be_bytes());
851        buf.extend_from_slice(val.as_bytes());
852    }
853
854    fn begin_event(final_lsn: u64) -> ReplicationEvent {
855        ReplicationEvent::Begin {
856            final_lsn: Lsn::from_u64(final_lsn),
857            xid: 1,
858            commit_time_micros: 0,
859        }
860    }
861
862    fn commit_event(lsn: u64) -> ReplicationEvent {
863        ReplicationEvent::Commit {
864            lsn: Lsn::from_u64(lsn),
865            end_lsn: Lsn::from_u64(lsn + 0x10),
866            commit_time_micros: 0,
867        }
868    }
869
870    #[test]
871    fn full_transaction_promotes_to_output_on_commit() {
872        let mut registry = RelationRegistry::new();
873        registry.insert(rel_users());
874        let mut state = TxnState::default();
875        let mut out = vec![];
876
877        handle_event(begin_event(0x16A_4F88), &mut registry, &mut state, &mut out).unwrap();
878        assert!(out.is_empty());
879
880        handle_event(
881            xlogdata(insert_payload(16384, &[("id", "1"), ("name", "alice")])),
882            &mut registry,
883            &mut state,
884            &mut out,
885        )
886        .unwrap();
887        assert!(out.is_empty(), "records stay staged until COMMIT");
888
889        handle_event(
890            commit_event(0x16A_4F88),
891            &mut registry,
892            &mut state,
893            &mut out,
894        )
895        .unwrap();
896
897        assert_eq!(out.len(), 1);
898        assert_eq!(out[0]["op"], "insert");
899        assert_eq!(out[0]["schema"], "public");
900        assert_eq!(out[0]["table"], "users");
901        assert_eq!(out[0]["lsn"], "0/16A4F88");
902        assert_eq!(out[0]["after"]["id"], 1);
903        assert_eq!(out[0]["after"]["name"], "alice");
904        assert_eq!(out[0]["before"], Value::Null);
905
906        // The bookmark is the commit's `end_lsn` (the resume position just
907        // past the commit record), not the commit_lsn. `commit_event` sets
908        // end_lsn = commit_lsn + 0x10. The record's own "lsn" field above is
909        // still the commit_lsn (display/identity), which is intentional.
910        assert_eq!(state.last_committed, Some(0x16A_4F88 + 0x10));
911    }
912
913    #[test]
914    fn staging_beyond_max_staged_records_aborts() {
915        // Regression for #78/#2: a single transaction that stages more records
916        // than `max_staged_records` must abort with a typed error rather than
917        // buffering unboundedly (OOM risk).
918        let mut registry = RelationRegistry::new();
919        registry.insert(rel_users());
920        let mut state = TxnState {
921            max_staged_records: Some(2),
922            ..TxnState::default()
923        };
924        let mut out = vec![];
925
926        handle_event(begin_event(0x16A_4F88), &mut registry, &mut state, &mut out).unwrap();
927        // First two inserts stage fine.
928        for id in ["1", "2"] {
929            handle_event(
930                xlogdata(insert_payload(16384, &[("id", id), ("name", "x")])),
931                &mut registry,
932                &mut state,
933                &mut out,
934            )
935            .unwrap();
936        }
937        // The third exceeds the cap and must error.
938        let err = handle_event(
939            xlogdata(insert_payload(16384, &[("id", "3"), ("name", "x")])),
940            &mut registry,
941            &mut state,
942            &mut out,
943        )
944        .unwrap_err();
945        assert!(
946            format!("{err}").contains("max_staged_records"),
947            "error must name the cap: {err}"
948        );
949        assert!(matches!(err, FaucetError::Source(_)));
950    }
951
952    #[test]
953    fn no_cap_allows_large_transactions() {
954        // With max_staged_records = None (default) an arbitrarily large
955        // transaction stages without error.
956        let mut registry = RelationRegistry::new();
957        registry.insert(rel_users());
958        let mut state = TxnState::default();
959        let mut out = vec![];
960
961        handle_event(begin_event(0x16A_4F88), &mut registry, &mut state, &mut out).unwrap();
962        for id in 0..50 {
963            handle_event(
964                xlogdata(insert_payload(
965                    16384,
966                    &[("id", &id.to_string()), ("name", "x")],
967                )),
968                &mut registry,
969                &mut state,
970                &mut out,
971            )
972            .unwrap();
973        }
974        handle_event(
975            commit_event(0x16A_4F88),
976            &mut registry,
977            &mut state,
978            &mut out,
979        )
980        .unwrap();
981        assert_eq!(out.len(), 50);
982    }
983
984    #[test]
985    fn commit_without_begin_errors() {
986        let mut registry = RelationRegistry::new();
987        let mut state = TxnState::default();
988        let mut out = vec![];
989
990        let err = handle_event(
991            ReplicationEvent::Commit {
992                lsn: Lsn::from_u64(1),
993                end_lsn: Lsn::from_u64(2),
994                commit_time_micros: 0,
995            },
996            &mut registry,
997            &mut state,
998            &mut out,
999        )
1000        .unwrap_err();
1001        assert!(format!("{err}").contains("COMMIT without BEGIN"));
1002    }
1003
1004    #[test]
1005    fn double_begin_errors() {
1006        // Regression for #78/#46: a second BEGIN without an intervening COMMIT
1007        // is a stream desync and must abort, not silently discard staged rows.
1008        let mut registry = RelationRegistry::new();
1009        registry.insert(rel_users());
1010        let mut state = TxnState::default();
1011        let mut out = vec![];
1012
1013        handle_event(begin_event(0x100), &mut registry, &mut state, &mut out).unwrap();
1014        handle_event(
1015            xlogdata(insert_payload(16384, &[("id", "1"), ("name", "alice")])),
1016            &mut registry,
1017            &mut state,
1018            &mut out,
1019        )
1020        .unwrap();
1021
1022        let err =
1023            handle_event(begin_event(0x200), &mut registry, &mut state, &mut out).unwrap_err();
1024        assert!(format!("{err}").contains("desync"), "{err}");
1025    }
1026
1027    #[test]
1028    fn unknown_relation_in_insert_errors() {
1029        let mut registry = RelationRegistry::new();
1030        let mut state = TxnState::default();
1031        let mut out = vec![];
1032
1033        handle_event(begin_event(1), &mut registry, &mut state, &mut out).unwrap();
1034        // Insert references relation 99999 which is not in the registry.
1035        let err = handle_event(
1036            xlogdata(insert_payload(99999, &[("id", "1"), ("name", "alice")])),
1037            &mut registry,
1038            &mut state,
1039            &mut out,
1040        )
1041        .unwrap_err();
1042        assert!(format!("{err}").contains("99999"));
1043    }
1044
1045    #[test]
1046    fn update_with_replica_identity_full_emits_before_and_after() {
1047        let mut registry = RelationRegistry::new();
1048        registry.insert(rel_users());
1049        let mut state = TxnState::default();
1050        let mut out = vec![];
1051
1052        handle_event(begin_event(0x16A_4F88), &mut registry, &mut state, &mut out).unwrap();
1053        handle_event(
1054            xlogdata(update_full_payload(
1055                16384,
1056                &[("id", "1"), ("name", "alice")],
1057                &[("id", "1"), ("name", "alice2")],
1058            )),
1059            &mut registry,
1060            &mut state,
1061            &mut out,
1062        )
1063        .unwrap();
1064        handle_event(
1065            commit_event(0x16A_4F88),
1066            &mut registry,
1067            &mut state,
1068            &mut out,
1069        )
1070        .unwrap();
1071
1072        assert_eq!(out.len(), 1);
1073        assert_eq!(out[0]["op"], "update");
1074        assert_eq!(out[0]["before"]["id"], 1);
1075        assert_eq!(out[0]["before"]["name"], "alice");
1076        assert_eq!(out[0]["after"]["name"], "alice2");
1077    }
1078
1079    #[test]
1080    fn delete_with_replica_identity_full_emits_before_only() {
1081        let mut registry = RelationRegistry::new();
1082        registry.insert(rel_users());
1083        let mut state = TxnState::default();
1084        let mut out = vec![];
1085
1086        handle_event(begin_event(0x16A_4F88), &mut registry, &mut state, &mut out).unwrap();
1087        handle_event(
1088            xlogdata(delete_full_payload(
1089                16384,
1090                &[("id", "1"), ("name", "alice")],
1091            )),
1092            &mut registry,
1093            &mut state,
1094            &mut out,
1095        )
1096        .unwrap();
1097        handle_event(
1098            commit_event(0x16A_4F88),
1099            &mut registry,
1100            &mut state,
1101            &mut out,
1102        )
1103        .unwrap();
1104
1105        assert_eq!(out.len(), 1);
1106        assert_eq!(out[0]["op"], "delete");
1107        assert_eq!(out[0]["before"]["id"], 1);
1108        assert_eq!(out[0]["before"]["name"], "alice");
1109        assert_eq!(out[0]["after"], Value::Null);
1110    }
1111
1112    #[test]
1113    fn truncate_emits_one_record_per_relation() {
1114        let mut registry = RelationRegistry::new();
1115        registry.insert(rel_users());
1116        // Build a second relation so the truncate-list has two known OIDs.
1117        let mut second = rel_users();
1118        second.oid = 16385;
1119        second.name = "orders".into();
1120        registry.insert(second);
1121
1122        let mut state = TxnState::default();
1123        let mut out = vec![];
1124
1125        handle_event(begin_event(0x16A_4F88), &mut registry, &mut state, &mut out).unwrap();
1126        handle_event(
1127            xlogdata(truncate_payload(&[16384, 16385])),
1128            &mut registry,
1129            &mut state,
1130            &mut out,
1131        )
1132        .unwrap();
1133        handle_event(
1134            commit_event(0x16A_4F88),
1135            &mut registry,
1136            &mut state,
1137            &mut out,
1138        )
1139        .unwrap();
1140
1141        assert_eq!(out.len(), 2);
1142        assert!(out.iter().all(|r| r["op"] == "truncate"));
1143        let tables: Vec<_> = out.iter().map(|r| r["table"].as_str().unwrap()).collect();
1144        assert!(tables.contains(&"users"));
1145        assert!(tables.contains(&"orders"));
1146    }
1147
1148    #[test]
1149    fn unchanged_toast_in_before_surfaces_via_metadata() {
1150        // Exercise Fix 1: a REPLICA IDENTITY FULL update with an UnchangedToast
1151        // cell in the old tuple must record the column name in
1152        // before.__unchanged_toast__.
1153        let mut registry = RelationRegistry::new();
1154        registry.insert(rel_users());
1155        let mut state = TxnState::default();
1156        let mut out = vec![];
1157
1158        handle_event(begin_event(0x16A_4F88), &mut registry, &mut state, &mut out).unwrap();
1159        // Hand-build an UPDATE where the OLD tuple's `name` cell is 'u' (unchanged TOAST).
1160        let mut buf: Vec<u8> = Vec::new();
1161        buf.push(b'U');
1162        buf.extend_from_slice(&16384u32.to_be_bytes());
1163        buf.push(b'O');
1164        buf.extend_from_slice(&2u16.to_be_bytes());
1165        // id = text "1"
1166        text_cell(&mut buf, "1");
1167        // name = unchanged TOAST
1168        buf.push(b'u');
1169        // New tuple: id=1, name="alice2"
1170        buf.push(b'N');
1171        buf.extend_from_slice(&2u16.to_be_bytes());
1172        text_cell(&mut buf, "1");
1173        text_cell(&mut buf, "alice2");
1174        handle_event(xlogdata(buf), &mut registry, &mut state, &mut out).unwrap();
1175        handle_event(
1176            commit_event(0x16A_4F88),
1177            &mut registry,
1178            &mut state,
1179            &mut out,
1180        )
1181        .unwrap();
1182
1183        assert_eq!(out.len(), 1);
1184        assert_eq!(out[0]["before"]["__unchanged_toast__"], json!(["name"]));
1185        assert!(out[0]["before"].get("name").is_none());
1186        assert_eq!(out[0]["before"]["id"], 1);
1187        assert_eq!(out[0]["after"]["name"], "alice2");
1188    }
1189}