faucet-source-postgres-cdc
PostgreSQL CDC (Change Data Capture) source for faucet-stream. Subscribes to a Postgres logical replication slot using the pgoutput plugin and emits each row-level change (INSERT / UPDATE / DELETE / TRUNCATE) as a JSON record. Replication position is persisted via any faucet-core StateStore, so pipelines resume from the last durably recorded position with no gap. Delivery is at-least-once: a crash between the sink flush and the bookmark write replays the most recent transaction (see At-least-once semantics).
Quick start
# pipeline.yaml
version: 1
source:
  type: postgres-cdc
  config:
    connection_url: postgres://user:pass@localhost:5432/appdb
    slot_name: faucet_slot
    publication_name: faucet_pub
    create_slot_if_missing: true
    idle_timeout: 30
sink:
  type: jsonl
  config:
    path: ./changes.jsonl
state:
  type: file
  config:
Postgres setup (one-time)
-- 1. Database needs logical replication enabled.
ALTER SYSTEM SET wal_level = 'logical';
ALTER SYSTEM SET max_replication_slots = 4;
ALTER SYSTEM SET max_wal_senders = 4;
-- restart Postgres after this
-- 2. The role that connects must have REPLICATION.
ALTER ROLE faucet_user WITH REPLICATION;
-- 3. Create a publication for the tables you want to capture.
CREATE PUBLICATION faucet_pub FOR TABLE public.users, public.orders;
-- or FOR ALL TABLES if you really want everything.
-- 4. (Optional but recommended) get a full pre-image on UPDATE/DELETE.
ALTER TABLE public.users REPLICA IDENTITY FULL;
The replication slot is created automatically on first run when
create_slot_if_missing = true (the default).
Output record schema
Every change event is one JSON object:
,
"after": null |
}
- lsn is the commit_lsn of the enclosing transaction.
- ts_ms is Unix-epoch milliseconds, derived from the COMMIT's timestamp.
- before is populated on delete always, and on update only when the table is REPLICA IDENTITY FULL. Otherwise it is null.
- after is populated on insert and update, and null on delete / truncate.
- If any column arrived as "unchanged TOAST" (Postgres elides large out-of-line values whose stored copy was not rewritten), the column is dropped from before / after and its name is recorded in before.__unchanged_toast__ / after.__unchanged_toast__.
- truncate emits one record per truncated relation with before = after = null.
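The population rules above can be written down as a small decision table. The following is a minimal sketch, not the crate's actual code: the names Op, Col, image_presence and split_unchanged_toast are hypothetical and exist only for illustration.

```rust
/// Kinds of row-level change described above (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Op {
    Insert,
    Update,
    Delete,
    Truncate,
}

/// Returns (before_present, after_present) for one change record.
/// `replica_identity_full` says whether the table is REPLICA IDENTITY FULL.
fn image_presence(op: Op, replica_identity_full: bool) -> (bool, bool) {
    match op {
        Op::Insert => (false, true),
        Op::Update => (replica_identity_full, true),
        Op::Delete => (true, false),
        Op::Truncate => (false, false),
    }
}

/// One decoded column value (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
enum Col {
    Null,
    Text(String),
    UnchangedToast,
}

/// Splits a row into the columns that are kept and the names of the
/// columns that arrived as "unchanged TOAST" and are therefore dropped.
fn split_unchanged_toast(cols: Vec<(String, Col)>) -> (Vec<(String, Col)>, Vec<String>) {
    let mut kept = Vec::new();
    let mut toast_names = Vec::new();
    for (name, value) in cols {
        if value == Col::UnchangedToast {
            toast_names.push(name);
        } else {
            kept.push((name, value));
        }
    }
    (kept, toast_names)
}
```

In this sketch, an update on a table without REPLICA IDENTITY FULL yields (false, true): before is null and only after carries data.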
Column type mapping
Values arrive in Postgres text form and are decoded by column type OID:
| Postgres type | JSON |
|---|---|
| bool | true / false |
| int2 / int4 / int8 | number |
| float4 / float8 (finite) | number |
| float4 / float8 NaN / ±Infinity | "NaN" / "Infinity" / "-Infinity" (string, to stay distinct from a SQL NULL → null) |
| numeric | string (exact precision) |
| bytea | base64 string |
| json / jsonb | parsed JSON value |
| one-dimensional arrays of the above scalar types (int4[], text[], uuid[], …) | JSON array, decoded element-by-element (NULL elements → null) |
| uuid, date, time, timestamp, timestamptz, and anything else | string (raw Postgres text; stable under the default DateStyle ISO) |
Multi-dimensional arrays, ranges, composites, and enums fall back to the raw Postgres array/text string.
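To make a few rows of the mapping concrete, here is a minimal sketch of text-form decoding for bool, integer and float columns. It is not the crate's decoder: the Json and Kind types are hypothetical stand-ins (a real implementation would likely use a JSON library and dispatch on type OIDs), and it assumes the usual Postgres text output of t / f for booleans.

```rust
/// Tiny stand-in for a JSON value (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
enum Json {
    Null,
    Bool(bool),
    Int(i64),
    Float(f64),
    Str(String),
}

/// Column kinds covered by this sketch (not real type OIDs).
#[derive(Debug, Clone, Copy)]
enum Kind {
    Bool,
    Int,
    Float,
    Other,
}

/// Decodes one column from Postgres text form. `None` means SQL NULL.
fn decode_text(kind: Kind, text: Option<&str>) -> Result<Json, String> {
    let raw = match text {
        Some(raw) => raw,
        None => return Ok(Json::Null),
    };
    match kind {
        // Assumption: booleans arrive as "t" / "f" in text form.
        Kind::Bool => match raw {
            "t" => Ok(Json::Bool(true)),
            "f" => Ok(Json::Bool(false)),
            other => Err(format!("invalid bool text: {}", other)),
        },
        Kind::Int => raw.parse::<i64>().map(Json::Int).map_err(|e| e.to_string()),
        Kind::Float => {
            let value = raw.parse::<f64>().map_err(|e| e.to_string())?;
            if value.is_finite() {
                Ok(Json::Float(value))
            } else {
                // NaN / Infinity / -Infinity stay strings, distinct from null.
                Ok(Json::Str(raw.to_string()))
            }
        }
        // Everything else is passed through as raw Postgres text.
        Kind::Other => Ok(Json::Str(raw.to_string())),
    }
}
```

Keeping non-finite floats as strings means a consumer can still tell a stored NaN apart from a SQL NULL, which JSON numbers alone cannot express.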
Stream robustness
The decoder fails fast (FaucetError::Source, which the pipeline restarts from the durable bookmark) rather than silently dropping data on a protocol desync: a COMMIT without a BEGIN, a second BEGIN while a transaction is still staged, or a ReplicationEvent variant the decoder doesn't recognise. A relation whose column set changes mid-stream (an ALTER TABLE) is logged at warn — subsequent rows decode against the new descriptor.
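The fail-fast behaviour can be pictured as a small state machine over BEGIN / change / COMMIT events. This is an illustrative sketch only: Event, DesyncError and TxnTracker are hypothetical names, and rejecting a change that arrives outside a transaction is an assumption of the sketch, not something stated above.

```rust
/// Simplified replication events (hypothetical type).
#[derive(Debug)]
enum Event {
    Begin,
    Change(String),
    Commit,
}

/// Stand-in for the error the decoder would raise on a protocol desync.
#[derive(Debug, PartialEq)]
struct DesyncError(String);

/// Tracks the single transaction currently being staged, if any.
#[derive(Default)]
struct TxnTracker {
    staged: Option<Vec<String>>,
}

impl TxnTracker {
    /// Feeds one event. Returns Ok(Some(records)) when a COMMIT completes
    /// a transaction, Ok(None) otherwise, and Err on a desync.
    fn feed(&mut self, event: Event) -> Result<Option<Vec<String>>, DesyncError> {
        match event {
            Event::Begin => {
                if self.staged.is_some() {
                    return Err(DesyncError("BEGIN while a transaction is still staged".into()));
                }
                self.staged = Some(Vec::new());
                Ok(None)
            }
            Event::Change(row) => match self.staged.as_mut() {
                Some(buffer) => {
                    buffer.push(row);
                    Ok(None)
                }
                // Assumption of this sketch: a stray change is also a desync.
                None => Err(DesyncError("change outside a transaction".into())),
            },
            Event::Commit => match self.staged.take() {
                Some(buffer) => Ok(Some(buffer)),
                None => Err(DesyncError("COMMIT without a BEGIN".into())),
            },
        }
    }
}
```

Returning an error instead of skipping the event is what lets the pipeline restart from the durable bookmark rather than continue with a half-understood stream.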
Configuration
| Field | Type | Default | Description |
|---|---|---|---|
| connection_url | string | (none) | Postgres connection URL. |
| slot_name | string | (none) | Logical replication slot. Must match [a-z0-9_]{1,63}. |
| publication_name | string | (none) | Publication that selects which tables are replicated. |
| create_slot_if_missing | bool | true | Create the slot on first run if it does not exist. |
| slot_type | enum | permanent | permanent (survives disconnect, pins WAL until consumed/dropped) or temporary (auto-dropped on disconnect; resets resume position, so not for cross-run bookmark resume). Permanent-slot creation logs a loud warning. Drop an unused permanent slot via PostgresCdcSource::drop_slot(). |
| tls | object | {mode: disable} | Replication-connection TLS: {mode: disable} (plaintext, default), {mode: require}, {mode: verify_ca, ca_path?}, or {mode: verify_full, ca_path?}. Use require/verify_* in production; disable sends credentials and WAL in plaintext. |
| start_lsn | string? | null | One-time override (e.g. "0/16A4F88"); ignored if a state-store bookmark exists. |
| proto_version | u32 | 1 | pgoutput protocol version. Only 1 is supported in this release. |
| idle_timeout | seconds | 30 | Stop the current fetch cycle after this long without a new replication message. |
| max_messages | usize? | null | Optional cap on change events per fetch call. The cap is checked after each COMMIT; a transaction larger than max_messages still emits atomically. |
| max_staged_records | usize? | null | Max change records buffered for a single in-progress transaction before the run aborts with a typed error. null = unbounded. A safety valve against OOM on huge bulk transactions. See Transactional consistency. |
| status_update_interval | seconds | 10 | Standby Status Update cadence. Must be < idle_timeout and well under the server's wal_sender_timeout. |
| tcp_keepalive | seconds | 60 | TCP keepalive on the replication connection. |
| slot_acquire_retries | u32 | 10 | Retries when the slot is still active (held by a not-yet-released prior connection) on a rapid restart. Both the pre-stream slot advance and START_REPLICATION are retried with exponential backoff (250 ms, doubling, capped at 4 s). 0 = fail fast. |
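Three of the constraints in the table are easy to check mechanically: the slot_name pattern, the status_update_interval < idle_timeout rule, and the retry backoff schedule. The sketch below uses hypothetical helper names and assumes one delay is slept before each retry; it is not the crate's validation code.

```rust
use std::time::Duration;

/// True if `name` matches [a-z0-9_]{1,63}.
fn is_valid_slot_name(name: &str) -> bool {
    (1..=63).contains(&name.len())
        && name
            .bytes()
            .all(|b| b.is_ascii_lowercase() || b.is_ascii_digit() || b == b'_')
}

/// True if the status update cadence is strictly below the idle timeout.
fn intervals_ok(status_update_interval: Duration, idle_timeout: Duration) -> bool {
    status_update_interval < idle_timeout
}

/// Delay before each retry: 250 ms, doubling, capped at 4 s.
/// Assumption: exactly one delay per retry, so 0 retries means no delays.
fn backoff_delays(retries: u32) -> Vec<Duration> {
    let mut delays = Vec::new();
    let mut millis: u64 = 250;
    for _ in 0..retries {
        delays.push(Duration::from_millis(millis));
        millis = (millis * 2).min(4_000);
    }
    delays
}
```

Under these assumptions the default of 10 retries waits 250 + 500 + 1000 + 2000 + 6 × 4000 = 27 750 ms in total before giving up.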
Transactional consistency
The connector buffers each transaction in memory and only flushes its records
to the sink on COMMIT. Partial transactions (BEGIN seen, no COMMIT yet
within idle_timeout / max_messages) are dropped and redelivered after the
next START_REPLICATION. This makes the output transactionally consistent —
sinks never see half a transaction — at the cost of needing each
transaction to fit within one fetch cycle.
Because a transaction is buffered in full until its COMMIT, a single bulk
UPDATE/DELETE/COPY of millions of rows holds every decoded row in
memory at once. Set max_staged_records to bound that: when an in-progress
transaction exceeds it the run aborts with a typed FaucetError::Source
instead of risking an OOM-kill. Leave it null only if you are sure your
source transactions fit comfortably in memory.
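The buffering behaviour described in this section can be sketched as a staging buffer with an optional cap. StagingBuffer and StageError are hypothetical names; the real connector reports the overflow as FaucetError::Source, which this sketch does not model.

```rust
/// Stand-in for the typed error raised when the cap is exceeded.
#[derive(Debug, PartialEq)]
enum StageError {
    TooManyStagedRecords { limit: usize },
}

/// Holds the records of one in-progress transaction.
struct StagingBuffer {
    max_staged_records: Option<usize>,
    staged: Vec<String>,
}

impl StagingBuffer {
    fn new(max_staged_records: Option<usize>) -> Self {
        StagingBuffer { max_staged_records, staged: Vec::new() }
    }

    /// Buffers one decoded change; fails once the transaction would
    /// hold more than `max_staged_records` records.
    fn stage(&mut self, record: String) -> Result<(), StageError> {
        if let Some(limit) = self.max_staged_records {
            if self.staged.len() >= limit {
                return Err(StageError::TooManyStagedRecords { limit });
            }
        }
        self.staged.push(record);
        Ok(())
    }

    /// COMMIT seen: hand the whole transaction over at once.
    fn commit(&mut self) -> Vec<String> {
        std::mem::take(&mut self.staged)
    }

    /// Fetch cycle ended before COMMIT: drop the partial transaction
    /// and report how many records were discarded.
    fn discard_partial(&mut self) -> usize {
        let dropped = self.staged.len();
        self.staged.clear();
        dropped
    }
}
```

Because nothing leaves the buffer before commit, a sink fed from it never observes half a transaction; the price is that memory use grows with the largest transaction unless a cap is set.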
At-least-once semantics
- Postgres redelivers everything after the most recent confirmed_flush_lsn on every START_REPLICATION, with no duplicates and no gap. faucet-stream's pipeline only writes the new bookmark to the state store after the sink confirms the batch was flushed.
- On the next run, apply_start_bookmark is called with that bookmark and the connector advances confirmed_flush_lsn accordingly.
- The advertised confirmed_flush_lsn is advanced only from a durable bookmark (via apply_start_bookmark), never from decoded WAL at commit-decode time, and never from a keepalive's wal_end. This guarantees Postgres is never told to recycle WAL for changes the consumer has not durably persisted, so a crash can never lose committed data (it replays instead).
- If the process crashes between sink flush and state-store write, the next run will replay the most recent transaction. Sinks should be idempotent or accept duplicates at transaction boundaries.
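The ordering "flush to the sink first, write the bookmark second" is what produces replay rather than loss. The toy simulation below illustrates it under simplifying assumptions: Txn and run_once are hypothetical, LSNs are plain integers, and the sink and state store are in-memory values.

```rust
/// One committed transaction: its commit LSN and its records.
struct Txn {
    commit_lsn: u64,
    records: Vec<&'static str>,
}

/// Runs one fetch cycle over `wal`, resuming after `bookmark`.
/// If `crash_after_flush_of` names a commit LSN, the run stops right after
/// that transaction reached the sink but before its bookmark was written.
fn run_once(
    wal: &[Txn],
    bookmark: &mut Option<u64>,
    sink: &mut Vec<&'static str>,
    crash_after_flush_of: Option<u64>,
) {
    let start = (*bookmark).unwrap_or(0);
    for txn in wal.iter().filter(|t| t.commit_lsn > start) {
        // Step 1: the sink receives and flushes the whole transaction.
        sink.extend(txn.records.iter().copied());
        if crash_after_flush_of == Some(txn.commit_lsn) {
            return; // simulated crash between sink flush and bookmark write
        }
        // Step 2: only now is the bookmark made durable.
        *bookmark = Some(txn.commit_lsn);
    }
}
```

With two transactions at LSN 10 and 20 and a simulated crash after flushing the second, the bookmark stays at 10; the next run replays the transaction at 20, so the sink sees its records twice but never misses them.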
Operational caveats
- Slot bloat / WAL retention: the connector advances confirmed_flush_lsn only at the start of a run, from the durable bookmark. Within a single long-running fetch cycle it does not advance it, so Postgres retains all WAL produced during that cycle until the next run resumes from the persisted bookmark. Without a state store it never advances at all and WAL is retained indefinitely. Always configure a state store in production, and run frequently enough (or keep fetch cycles short enough) that WAL retention stays within your disk budget. A permanent slot (the default) keeps pinning WAL even when no consumer is connected, so decommission an unused pipeline by calling PostgresCdcSource::drop_slot() (or set slot_type: temporary for ephemeral runs that should self-clean on disconnect).
- Encryption: the replication connection is plaintext unless you set tls to require / verify_ca / verify_full; credentials and WAL data travel unencrypted under the default disable.
- Heartbeats: if the network is silent for longer than the server's wal_sender_timeout (default 60 s), Postgres kills the replication connection. The default status_update_interval of 10 s leaves ample margin.
- DDL is invisible. Logical replication does not replicate schema changes. After an ALTER TABLE, the next change event from that table will arrive with a new Relation message, which the connector picks up automatically.
- Reconnection on drop: v1 surfaces a connection drop as FaucetError::Source. Re-run the pipeline to reconnect; the persisted LSN bookmark ensures no data loss.
- Slot creation timing: the slot is created on the first fetch call. Changes applied before the slot exists are not replicated. To avoid losing initial changes, either create the slot before applying writes (e.g. via psql) or do a warm-up fetch and then begin writes.
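For the WAL-retention caveat, it can help to turn two LSNs into a byte distance. Postgres prints an LSN as two hexadecimal halves separated by a slash (the high and low 32 bits of a 64-bit position), as in the start_lsn example "0/16A4F88". The helpers below are a hypothetical monitoring sketch, not part of the connector; the inputs would typically come from the server, for instance the slot's confirmed_flush_lsn and the current WAL position.

```rust
/// Parses a textual LSN such as "0/16A4F88" into a 64-bit position.
fn parse_lsn(text: &str) -> Option<u64> {
    let (high, low) = text.split_once('/')?;
    let high = u32::from_str_radix(high, 16).ok()?;
    let low = u32::from_str_radix(low, 16).ok()?;
    Some(((high as u64) << 32) | low as u64)
}

/// Byte distance between the current WAL position and a slot's confirmed
/// flush position. This is only a rough indicator of retained WAL.
fn lsn_lag_bytes(current: &str, confirmed_flush: &str) -> Option<u64> {
    let current = parse_lsn(current)?;
    let confirmed = parse_lsn(confirmed_flush)?;
    Some(current.saturating_sub(confirmed))
}
```

A lag that keeps growing between runs is a hint that fetch cycles are too long or too infrequent for the available disk budget.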
Implementation notes
Built on top of the pgwire-replication
crate for the Postgres logical-replication wire protocol; the pgoutput
payload bytes are decoded by a hand-rolled decoder in this crate so that the
output record shape stays under our control.
See also
- crates/source/postgres/: query-mode Postgres source (snapshots, not CDC).
- crates/state/postgres/: Postgres-backed StateStore you can pair with this source.
- cli/examples/postgres_cdc_to_jsonl.yaml: end-to-end demo configuration.