faucet-source-postgres-cdc 1.0.0

PostgreSQL logical replication (CDC) source for the faucet-stream ecosystem

faucet-source-postgres-cdc

PostgreSQL CDC (Change Data Capture) source for faucet-stream. Subscribes to a Postgres logical replication slot using the pgoutput plugin and emits each row-level change (INSERT / UPDATE / DELETE / TRUNCATE) as a JSON record. The replication position is persisted via any faucet-core StateStore, so pipelines resume from the last durably persisted position with no gap. Delivery is at-least-once: a crash at the wrong moment can replay the most recent batch (see At-least-once semantics).


Quick start

```yaml
# pipeline.yaml
version: 1
source:
  type: postgres-cdc
  config:
    connection_url: postgres://user:pass@localhost:5432/appdb
    slot_name: faucet_slot
    publication_name: faucet_pub
    create_slot_if_missing: true
    idle_timeout: 30
sink:
  type: jsonl
  config:
    path: ./changes.jsonl
state:
  type: file
  config: { path: ./state }
```

```sh
faucet run pipeline.yaml
```

Postgres setup (one-time)

```sql
-- 1. Database needs logical replication enabled.
ALTER SYSTEM SET wal_level = 'logical';
ALTER SYSTEM SET max_replication_slots = 4;
ALTER SYSTEM SET max_wal_senders = 4;
-- restart Postgres after this

-- 2. The role that connects must have REPLICATION.
ALTER ROLE faucet_user WITH REPLICATION;

-- 3. Create a publication for the tables you want to capture.
CREATE PUBLICATION faucet_pub FOR TABLE public.users, public.orders;
-- or FOR ALL TABLES if you really want everything.

-- 4. (Optional but recommended) get a full pre-image on UPDATE/DELETE.
ALTER TABLE public.users REPLICA IDENTITY FULL;
```

The replication slot is created automatically on first run when create_slot_if_missing = true (the default).


Output record schema

Every change event is one JSON object:

```text
{
  "op":     "insert | update | delete | truncate",
  "schema": "public",
  "table":  "users",
  "lsn":    "0/16A4F88",
  "ts_ms":  1779019200000,
  "before": null | { ...row... },
  "after":  null | { ...row..., "__unchanged_toast__": ["col1", "col2"] }
}
```
  • lsn is the commit_lsn of the enclosing transaction.
  • ts_ms is Unix-epoch milliseconds, derived from the COMMIT's timestamp.
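  • before is always populated on delete (under the default replica identity Postgres sends only the primary-key columns of the old row; REPLICA IDENTITY FULL sends every column), and on update only when the table is REPLICA IDENTITY FULL. Otherwise it is null.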
  • after is populated on insert and update, and null on delete / truncate.
  • If any column arrived as "unchanged TOAST" (Postgres elides large out-of-line values whose stored copy was not rewritten), the column is dropped from before / after and its name is recorded in before.__unchanged_toast__ / after.__unchanged_toast__.
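The lsn and ts_ms bullets each hide a small conversion. pgoutput carries positions as 64-bit WAL pointers and commit timestamps as microseconds since the Postgres epoch (2000-01-01 UTC). The sketch below is illustrative only (format_lsn and commit_ts_to_unix_ms are hypothetical names, not this crate's API): it renders an LSN in Postgres's textual hi/lo hex form and shifts a commit timestamp onto the Unix epoch in milliseconds.

```rust
/// Offset between the Postgres epoch (2000-01-01T00:00:00Z) and the
/// Unix epoch (1970-01-01T00:00:00Z), in milliseconds.
const PG_EPOCH_OFFSET_MS: i64 = 946_684_800_000;

/// Render a 64-bit WAL position as Postgres prints it: the high and low
/// 32-bit halves in upper-case hex, separated by '/'.
fn format_lsn(lsn: u64) -> String {
    format!("{:X}/{:X}", lsn >> 32, lsn & 0xFFFF_FFFF)
}

/// Convert a commit timestamp (microseconds since the Postgres epoch) to
/// Unix-epoch milliseconds, rounding toward negative infinity.
fn commit_ts_to_unix_ms(pg_micros: i64) -> i64 {
    pg_micros.div_euclid(1_000) + PG_EPOCH_OFFSET_MS
}
```

For example, a commit timestamp of 832_334_400_000_000 µs maps to the 1779019200000 shown in the sample record, and the position 0x16A4F88 renders as "0/16A4F88".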

truncate emits one record per truncated relation with before = after = null.

Column type mapping

Values arrive in Postgres text form and are decoded by column type OID:

| Postgres type | JSON |
|---|---|
| bool | true / false |
| int2 / int4 / int8 | number |
| float4 / float8 (finite) | number |
| float4 / float8 NaN / ±Infinity | "NaN" / "Infinity" / "-Infinity" (string, to stay distinct from a SQL NULL, which maps to null) |
| numeric | string (exact precision) |
| bytea | base64 string |
| json / jsonb | parsed JSON value |
| one-dimensional arrays of the above scalar types (int4[], text[], uuid[], …) | JSON array, decoded element-by-element (NULL elements → null) |
| uuid, date, time, timestamp, timestamptz, and anything else | string (raw Postgres text, stable under the default DateStyle ISO) |

Multi-dimensional arrays, ranges, composites, and enums fall back to the raw Postgres array/text string.
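The mapping rules above can be sketched as follows. This is a hypothetical illustration, not the crate's decoder: Json is a stand-in enum (a real implementation would more likely target serde_json::Value), only a few OIDs are covered, bytea/json decoding is omitted, and the caller is assumed to have already resolved an array column's type to its element OID. It shows the two subtle points: non-finite floats stay strings, and only an unquoted NULL inside an array literal is SQL NULL.

```rust
// Type OIDs as assigned in the Postgres catalog (pg_type).
const BOOL: u32 = 16;
const INT8: u32 = 20;
const INT2: u32 = 21;
const INT4: u32 = 23;
const FLOAT4: u32 = 700;
const FLOAT8: u32 = 701;

/// Hypothetical stand-in for a JSON value.
#[derive(Debug, PartialEq)]
enum Json {
    Null,
    Bool(bool),
    Int(i64),
    Float(f64),
    Str(String),
    Array(Vec<Json>),
}

/// Decode one scalar from Postgres text form by type OID.
fn decode_scalar(oid: u32, text: &str) -> Json {
    let raw = || Json::Str(text.to_string());
    match oid {
        BOOL => Json::Bool(text == "t"),
        INT2 | INT4 | INT8 => text.parse::<i64>().map(Json::Int).unwrap_or_else(|_| raw()),
        FLOAT4 | FLOAT8 => match text {
            // Non-finite floats stay strings so they cannot be mistaken for SQL NULL.
            "NaN" | "Infinity" | "-Infinity" => raw(),
            _ => text.parse::<f64>().map(Json::Float).unwrap_or_else(|_| raw()),
        },
        // numeric (exact precision) and every other type: raw Postgres text.
        _ => raw(),
    }
}

/// Split a one-dimensional array literal such as {1,"a,b",NULL} into raw
/// elements (None = SQL NULL). Returns None for nested (multi-dimensional)
/// or otherwise unexpected input so the caller can fall back to the raw string.
fn split_array(text: &str) -> Option<Vec<Option<String>>> {
    let inner = text.strip_prefix('{')?.strip_suffix('}')?;
    let mut out = Vec::new();
    if inner.is_empty() {
        return Some(out);
    }
    let mut chars = inner.chars().peekable();
    loop {
        let mut elem = String::new();
        let quoted = chars.peek() == Some(&'"');
        if quoted {
            chars.next(); // opening quote
            loop {
                match chars.next()? {
                    '\\' => elem.push(chars.next()?), // backslash escapes the next char
                    '"' => break,
                    c => elem.push(c),
                }
            }
        } else {
            while let Some(&c) = chars.peek() {
                match c {
                    ',' => break,
                    '{' | '"' => return None, // nested array or stray quote
                    _ => {
                        elem.push(c);
                        chars.next();
                    }
                }
            }
        }
        // Only an unquoted NULL is SQL NULL; "NULL" in quotes is a string.
        out.push(if !quoted && elem == "NULL" { None } else { Some(elem) });
        match chars.next() {
            None => return Some(out),
            Some(',') => {}
            Some(_) => return None,
        }
    }
}

/// Decode an array column element-by-element, or keep the raw text.
fn decode_array(elem_oid: u32, text: &str) -> Json {
    match split_array(text) {
        Some(elems) => Json::Array(
            elems
                .into_iter()
                .map(|e| e.map_or(Json::Null, |s| decode_scalar(elem_oid, &s)))
                .collect(),
        ),
        None => Json::Str(text.to_string()),
    }
}
```

Falling back to the raw string on anything unexpected (including the [lower:upper]= prefix Postgres prints for arrays with non-default bounds) keeps the value lossless instead of guessing at its structure.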

Stream robustness

The decoder fails fast with a FaucetError::Source (the pipeline then restarts from the durable bookmark) rather than silently dropping data on a protocol desync: a COMMIT without a BEGIN, a second BEGIN while a transaction is still staged, or a ReplicationEvent variant the decoder doesn't recognise. A relation whose column set changes mid-stream (an ALTER TABLE) is logged at warn, and subsequent rows decode against the new descriptor.
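The fail-fast rule amounts to a small state machine over message tags. The sketch below is hypothetical (Msg and Desync are illustrative types, not this crate's API) and models only BEGIN/COMMIT bracketing, so any tag outside its tiny enum counts as unrecognised; a real decoder also understands messages such as Relation.

```rust
/// Hypothetical subset of decoded pgoutput messages.
#[derive(Debug)]
enum Msg {
    Begin,
    Change,
    Commit,
    Other(u8), // a message tag this sketch does not recognise
}

#[derive(Debug, PartialEq)]
enum Desync {
    CommitWithoutBegin,
    NestedBegin,
    Unrecognised(u8),
}

/// Walk a message stream and return the first protocol desync instead of
/// skipping the offending message.
fn check_sequence(msgs: &[Msg]) -> Result<(), Desync> {
    let mut in_txn = false;
    for msg in msgs {
        match msg {
            Msg::Begin if in_txn => return Err(Desync::NestedBegin),
            Msg::Begin => in_txn = true,
            Msg::Change => {}
            Msg::Commit if !in_txn => return Err(Desync::CommitWithoutBegin),
            Msg::Commit => in_txn = false,
            Msg::Other(tag) => return Err(Desync::Unrecognised(*tag)),
        }
    }
    // A transaction still open here is merely partial, not a desync.
    Ok(())
}
```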


Configuration

| Field | Type | Default | Description |
|---|---|---|---|
| connection_url | string | | Postgres connection URL. |
| slot_name | string | | Logical replication slot. Must match [a-z0-9_]{1,63}. |
| publication_name | string | | Publication that selects which tables are replicated. |
| create_slot_if_missing | bool | true | Create the slot on first run if it does not exist. |
| slot_type | enum | permanent | permanent (survives disconnect, pins WAL until consumed/dropped) or temporary (auto-dropped on disconnect; resets the resume position, so not for cross-run bookmark resume). Permanent-slot creation logs a loud warning. Drop an unused permanent slot via PostgresCdcSource::drop_slot(). |
| tls | object | {mode: disable} | Replication-connection TLS: {mode: disable} (plaintext, default), {mode: require}, {mode: verify_ca, ca_path?}, or {mode: verify_full, ca_path?}. Use require/verify_* in production: disable sends credentials and WAL in plaintext. |
| start_lsn | string? | null | One-time override (e.g. "0/16A4F88"); ignored if a state-store bookmark exists. |
| proto_version | u32 | 1 | pgoutput protocol version. Only 1 is supported in this release. |
| idle_timeout | seconds | 30 | Stop the current fetch cycle after this long without a new replication message. |
| max_messages | usize? | null | Optional cap on change events per fetch call. The cap is checked after each COMMIT; a transaction larger than max_messages still emits atomically. |
| max_staged_records | usize? | null | Max change records buffered for a single in-progress transaction before the run aborts with a typed error. null = unbounded. A safety valve against OOM on huge bulk transactions; see Transactional consistency. |
| status_update_interval | seconds | 10 | Standby Status Update cadence. Must be < idle_timeout and well under the server's wal_sender_timeout. |
| tcp_keepalive | seconds | 60 | TCP keepalive on the replication connection. |
| slot_acquire_retries | u32 | 10 | Retries when the slot is still active (held by a not-yet-released prior connection) on a rapid restart. Both the pre-stream slot advance and START_REPLICATION are retried with exponential backoff (250 ms, doubling, capped at 4 s). 0 = fail fast. |
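A few of the table's rules are mechanical enough to sketch. The helpers below are hypothetical (not this crate's API): they check the slot-name pattern and the status_update_interval < idle_timeout constraint, and compute the slot_acquire_retries backoff schedule, assuming "250 ms, doubling" means the first retry waits 250 ms.

```rust
use std::time::Duration;

/// True if `name` matches [a-z0-9_]{1,63}.
fn valid_slot_name(name: &str) -> bool {
    (1..=63).contains(&name.len())
        && name
            .bytes()
            .all(|b| b.is_ascii_lowercase() || b.is_ascii_digit() || b == b'_')
}

/// status_update_interval must be strictly shorter than idle_timeout.
fn valid_intervals(status_update: Duration, idle_timeout: Duration) -> bool {
    status_update < idle_timeout
}

/// One delay per retry: 250 ms, doubling, capped at 4 s.
/// Empty when retries == 0 (fail fast).
fn slot_retry_delays(retries: u32) -> Vec<Duration> {
    let cap = Duration::from_secs(4);
    (0..retries)
        .map(|i| {
            // 250 ms * 2^i, saturating so a large retry count cannot overflow.
            let factor = 1u64.checked_shl(i).unwrap_or(u64::MAX);
            Duration::from_millis(250u64.saturating_mul(factor)).min(cap)
        })
        .collect()
}
```

With the default of 10 retries this schedule waits 250 ms, 500 ms, 1 s, 2 s and then 4 s six times, 27.75 s of backoff in total.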

Transactional consistency

The connector buffers each transaction in memory and only flushes its records to the sink on COMMIT. Partial transactions (BEGIN seen, but no COMMIT before the fetch cycle ends on idle_timeout) are dropped and redelivered after the next START_REPLICATION; max_messages never splits a transaction because it is only checked after a COMMIT. This makes the output transactionally consistent (sinks never see half a transaction), at the cost of needing each transaction to fit within one fetch cycle.

Because a transaction is buffered in full until its COMMIT, a single bulk UPDATE/DELETE/COPY of millions of rows holds every decoded row in memory at once. Set max_staged_records to bound that: when an in-progress transaction exceeds it the run aborts with a typed FaucetError::Source instead of risking an OOM-kill. Leave it null only if you are sure your source transactions fit comfortably in memory.
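A minimal sketch of one fetch cycle under these rules, assuming a simplified, hypothetical Event type in place of the real replication stream (fetch_cycle and FetchError are illustrative names, not this crate's API). Records are staged per transaction and released only on COMMIT, tagged with the commit LSN; max_messages is checked after each COMMIT; max_staged_records aborts an oversized transaction; whatever is still staged when the input ends is discarded for redelivery.

```rust
/// Hypothetical decoded event; `Change` carries an opaque record.
enum Event {
    Begin,
    Change(String),
    Commit { lsn: u64 },
}

#[derive(Debug, PartialEq)]
enum FetchError {
    TooManyStaged { limit: usize },
}

fn fetch_cycle(
    events: impl IntoIterator<Item = Event>,
    max_messages: Option<usize>,
    max_staged_records: Option<usize>,
) -> Result<Vec<(u64, String)>, FetchError> {
    let mut emitted = Vec::new();
    let mut staged: Vec<String> = Vec::new();
    for event in events {
        match event {
            // Staging is always empty after a COMMIT, so BEGIN has nothing to do.
            Event::Begin => {}
            Event::Change(rec) => {
                staged.push(rec);
                if let Some(limit) = max_staged_records {
                    if staged.len() > limit {
                        return Err(FetchError::TooManyStaged { limit });
                    }
                }
            }
            Event::Commit { lsn } => {
                // Release the whole transaction at once, tagged with its commit LSN.
                emitted.extend(staged.drain(..).map(|r| (lsn, r)));
                if max_messages.map_or(false, |cap| emitted.len() >= cap) {
                    break;
                }
            }
        }
    }
    // Anything left in `staged` is a partial transaction: it is discarded here
    // and redelivered by Postgres on the next START_REPLICATION.
    Ok(emitted)
}
```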


At-least-once semantics

  • Postgres redelivers everything after the most recent confirmed_flush_lsn on every START_REPLICATION, with no duplicates and no gap.
  • faucet-stream's pipeline only writes the new bookmark to the state store after the sink confirms the batch was flushed.
  • On the next run, apply_start_bookmark is called with that bookmark and the connector advances confirmed_flush_lsn accordingly.
  • The advertised confirmed_flush_lsn is advanced only from a durable bookmark (via apply_start_bookmark) — never from decoded WAL at commit-decode time, and never from a keepalive's wal_end. This guarantees Postgres is never told to recycle WAL for changes the consumer has not durably persisted, so a crash can never lose committed data (it replays instead).
  • If the process crashes between the sink flush and the state-store write, the next run replays every transaction after the last persisted bookmark, i.e. the most recently flushed batch. Sinks should be idempotent or accept duplicates at transaction boundaries.
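One way to make a sink tolerate those replays, sketched under assumptions: DedupSink is a hypothetical type, not part of faucet-stream, and the approach relies on transactions arriving whole and in commit order, so a transaction whose commit LSN is at or below the highest one already written must be a replay. In a real sink the rows and the watermark would have to be persisted atomically.

```rust
/// Hypothetical sink that skips transactions it has already written,
/// keyed on the commit LSN carried in every record's "lsn" field.
struct DedupSink {
    watermark: Option<u64>, // highest commit LSN fully written so far
    rows: Vec<String>,
}

impl DedupSink {
    fn new() -> Self {
        DedupSink { watermark: None, rows: Vec::new() }
    }

    /// Write one complete transaction; returns false if it was a replay.
    fn write_txn(&mut self, commit_lsn: u64, records: &[&str]) -> bool {
        if self.watermark.map_or(false, |w| commit_lsn <= w) {
            return false; // duplicate delivered by an at-least-once replay
        }
        self.rows.extend(records.iter().map(|r| r.to_string()));
        // A durable sink must store rows and watermark in one atomic write.
        self.watermark = Some(commit_lsn);
        true
    }
}
```

In the crash scenario above, the replayed transaction carries the same commit LSN as the one already written, so write_txn returns false and the sink's contents are unchanged.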

Operational caveats

  • Slot bloat / WAL retention: the connector advances confirmed_flush_lsn only at the start of a run, from the durable bookmark. Within a single long-running fetch cycle it does not advance it, so Postgres retains all WAL produced during that cycle until the next run resumes from the persisted bookmark. Without a state store it never advances at all and WAL is retained indefinitely. Always configure a state store in production, and run frequently enough (or keep fetch cycles short enough) that WAL retention stays within your disk budget. A permanent slot (the default) keeps pinning WAL even when no consumer is connected, so decommission an unused pipeline by calling PostgresCdcSource::drop_slot() (or set slot_type: temporary for ephemeral runs that should self-clean on disconnect).
  • Encryption: the replication connection is plaintext unless you set tls to require / verify_ca / verify_full — credentials and WAL data travel unencrypted under the default disable.
  • Heartbeats: if the network is silent for longer than the server's wal_sender_timeout (default 60 s), Postgres kills the replication connection. The default status_update_interval of 10 s leaves ample margin.
  • DDL is invisible. Logical replication does not replicate schema changes. After an ALTER TABLE, the next change event from that table will arrive with a new Relation message — the connector picks it up automatically.
  • Reconnection on drop: v1 surfaces a connection drop as FaucetError::Source. Re-run the pipeline to reconnect; the persisted LSN bookmark ensures no data loss.
  • Slot creation timing: the slot is created on the first fetch call. Changes applied before the slot exists are not replicated. To avoid losing initial changes, either create the slot before applying writes (e.g. via psql) or do a warm-up fetch and then begin writes.
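To watch slot bloat, compare the server's current WAL position (pg_current_wal_lsn()) with the slot's restart_lsn or confirmed_flush_lsn from pg_replication_slots; restart_lsn approximates the oldest WAL the slot pins, while confirmed_flush_lsn measures consumer lag. Postgres can compute the difference server-side with pg_wal_lsn_diff(); the hypothetical helper below does the same client-side from the textual hi/lo form, for example to alert on a disk budget.

```rust
/// Parse Postgres's textual LSN ("hi/lo", both hex) into a 64-bit position.
fn parse_lsn(s: &str) -> Option<u64> {
    let (hi, lo) = s.split_once('/')?;
    let hi = u32::from_str_radix(hi, 16).ok()?;
    let lo = u32::from_str_radix(lo, 16).ok()?;
    Some((u64::from(hi) << 32) | u64::from(lo))
}

/// Bytes of WAL between a slot position and the current WAL position
/// (0 if the slot is somehow ahead). None if either LSN fails to parse.
fn wal_lag_bytes(current_wal_lsn: &str, slot_lsn: &str) -> Option<u64> {
    Some(parse_lsn(current_wal_lsn)?.saturating_sub(parse_lsn(slot_lsn)?))
}
```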

Implementation notes

Built on top of the pgwire-replication crate for the Postgres logical-replication wire protocol; the pgoutput payload bytes are decoded by a hand-rolled decoder in this crate so that the output record shape stays under our control.
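For a flavour of what a hand-rolled pgoutput decoder deals with, here is a hypothetical sketch (not this crate's code) that decodes the two transaction-bracketing protocol-v1 messages, using the layouts from the Postgres "Logical Replication Message Formats" documentation: Begin is 'B', Int64 final LSN, Int64 commit timestamp, Int32 xid; Commit is 'C', Int8 flags, Int64 commit LSN, Int64 end LSN, Int64 commit timestamp, all big-endian. The replication framing around the payload is assumed to have been stripped already by the transport layer.

```rust
#[derive(Debug, PartialEq)]
enum TxnMsg {
    Begin { final_lsn: u64, commit_ts_us: i64, xid: u32 },
    Commit { commit_lsn: u64, end_lsn: u64, commit_ts_us: i64 },
}

/// Read a big-endian u64 at byte offset `at`, or None if out of bounds.
fn be_u64(buf: &[u8], at: usize) -> Option<u64> {
    let mut b = [0u8; 8];
    b.copy_from_slice(buf.get(at..at + 8)?);
    Some(u64::from_be_bytes(b))
}

/// Read a big-endian u32 at byte offset `at`, or None if out of bounds.
fn be_u32(buf: &[u8], at: usize) -> Option<u32> {
    let mut b = [0u8; 4];
    b.copy_from_slice(buf.get(at..at + 4)?);
    Some(u32::from_be_bytes(b))
}

/// Decode a Begin ('B') or Commit ('C') payload; None for other message
/// types or truncated input. Timestamps are microseconds since 2000-01-01.
fn decode_txn_msg(buf: &[u8]) -> Option<TxnMsg> {
    match *buf.first()? {
        b'B' => Some(TxnMsg::Begin {
            final_lsn: be_u64(buf, 1)?,
            commit_ts_us: be_u64(buf, 9)? as i64,
            xid: be_u32(buf, 17)?,
        }),
        // Byte 1 holds the (currently unused) flags.
        b'C' => Some(TxnMsg::Commit {
            commit_lsn: be_u64(buf, 2)?,
            end_lsn: be_u64(buf, 10)?,
            commit_ts_us: be_u64(buf, 18)? as i64,
        }),
        _ => None,
    }
}
```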


See also

  • crates/source/postgres/ — query-mode Postgres source (snapshots, not CDC).
  • crates/state/postgres/ — Postgres-backed StateStore you can pair with this source.
  • cli/examples/postgres_cdc_to_jsonl.yaml — end-to-end demo configuration.