# faucet-source-postgres-cdc
PostgreSQL CDC (Change Data Capture) source for [`faucet-stream`](https://github.com/PawanSikawat/faucet-stream). Subscribes to a Postgres logical replication slot using the `pgoutput` plugin and emits each row-level change (INSERT / UPDATE / DELETE / TRUNCATE) as a JSON record. Replication position is persisted via any `faucet-core` `StateStore`, so pipelines resume from the last durably committed position with no gaps. Delivery is at-least-once: a crash between sink flush and state-store write replays the most recent transaction (see [At-least-once semantics](#at-least-once-semantics)).
---
## Quick start
```yaml
# pipeline.yaml
version: 1
source:
  type: postgres-cdc
  config:
    connection_url: postgres://user:pass@localhost:5432/appdb
    slot_name: faucet_slot
    publication_name: faucet_pub
    create_slot_if_missing: true
    idle_timeout: 30
sink:
  type: jsonl
  config:
    path: ./changes.jsonl
state:
  type: file
  config: { path: ./state }
```
```bash
faucet run pipeline.yaml
```
---
## Postgres setup (one-time)
```sql
-- 1. Database needs logical replication enabled.
ALTER SYSTEM SET wal_level = 'logical';
ALTER SYSTEM SET max_replication_slots = 4;
ALTER SYSTEM SET max_wal_senders = 4;
-- restart Postgres after this
-- 2. The role that connects must have REPLICATION.
ALTER ROLE faucet_user WITH REPLICATION;
-- 3. Create a publication for the tables you want to capture.
CREATE PUBLICATION faucet_pub FOR TABLE public.users, public.orders;
-- or FOR ALL TABLES if you really want everything.
-- 4. (Optional but recommended) get a full pre-image on UPDATE/DELETE.
ALTER TABLE public.users REPLICA IDENTITY FULL;
```
The replication slot is created automatically on first run when
`create_slot_if_missing` is `true` (the default).
---
## Output record schema
Every change event is one JSON object:
```json
{
  "table": "users",
  "lsn": "0/16A4F88",
  "ts_ms": 1779019200000,
  "before": null | { ...row... },
  "after": null | { ...row..., "__unchanged_toast__": ["col1", "col2"] }
}
```
- `lsn` is the `commit_lsn` of the enclosing transaction.
- `ts_ms` is Unix-epoch milliseconds, derived from the COMMIT's timestamp.
- `before` is populated on `delete` always, and on `update` only when the
table is `REPLICA IDENTITY FULL`. Otherwise it is `null`.
- `after` is populated on `insert` and `update`, and `null` on `delete` /
`truncate`.
- If any column arrived as "unchanged TOAST" (Postgres elides large
out-of-line values whose stored copy was not rewritten), the column is
dropped from `before` / `after` and its name is recorded in
`before.__unchanged_toast__` / `after.__unchanged_toast__`.
`truncate` emits one record per truncated relation with `before = after = null`.
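
A consumer that keeps the latest row image per key can restore elided columns from the `__unchanged_toast__` list. The sketch below is a minimal illustration, assuming the consumer already holds the previous full row; `merge_after` is a hypothetical helper name, not part of this crate.

```python
TOAST_KEY = "__unchanged_toast__"


def merge_after(previous_row, after):
    """Rebuild a full row from an `after` image plus the previously known row.

    Columns listed under `__unchanged_toast__` were not rewritten, so their
    values are copied from `previous_row` (an assumption: the consumer has it).
    """
    # Start from the columns that did arrive, minus the marker key itself.
    merged = {name: value for name, value in after.items() if name != TOAST_KEY}
    for column in after.get(TOAST_KEY, []):
        if previous_row is None or column not in previous_row:
            raise KeyError(f"no previous value available for column {column!r}")
        merged[column] = previous_row[column]
    return merged
```

If no previous image is available (for example, the consumer started mid-stream), the helper raises rather than inventing a value; setting `REPLICA IDENTITY FULL` does not change this, because unchanged TOAST values can be elided from the new tuple regardless.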
### Column type mapping
Values arrive in Postgres text form and are decoded by column type OID:

| Postgres type | JSON value |
| --- | --- |
| `bool` | `true` / `false` |
| `int2` / `int4` / `int8` | number |
| `float4` / `float8` (finite) | number |
| `float4` / `float8` `NaN` / `±Infinity` | `"NaN"` / `"Infinity"` / `"-Infinity"` (string, to stay distinct from a SQL `NULL` → `null`) |
| `numeric` | string (exact precision) |
| `bytea` | base64 string |
| `json` / `jsonb` | parsed JSON value |
| one-dimensional arrays of the above scalar types (`int4[]`, `text[]`, `uuid[]`, …) | JSON array, decoded element-by-element (`NULL` elements → `null`) |
| `uuid`, `date`, `time`, `timestamp`, `timestamptz`, and anything else | string (raw Postgres text — stable under the default `DateStyle ISO`) |
Multi-dimensional arrays, ranges, composites, and enums fall back to the raw Postgres array/text string.
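
The scalar rows of the mapping can be illustrated with a small Python sketch. This is not the crate's decoder (which is written in Rust and dispatches on type OID); it dispatches on a type name for readability, assumes `bool` arrives as `t` / `f` and `bytea` in the default hex form (`\x...`), and leaves arrays out.

```python
import base64
import json
import math


def decode_value(type_name, text):
    """Map one Postgres text-format value to its JSON representation."""
    if text is None:  # SQL NULL
        return None
    if type_name == "bool":
        return text == "t"
    if type_name in ("int2", "int4", "int8"):
        return int(text)
    if type_name in ("float4", "float8"):
        value = float(text)  # accepts "NaN", "Infinity", "-Infinity"
        if math.isfinite(value):
            return value
        if math.isnan(value):
            return "NaN"
        return "Infinity" if value > 0 else "-Infinity"
    if type_name == "bytea":
        if not text.startswith("\\x"):
            raise ValueError("expected hex-format bytea")
        return base64.b64encode(bytes.fromhex(text[2:])).decode("ascii")
    if type_name in ("json", "jsonb"):
        return json.loads(text)
    # numeric, uuid, date/time types and everything else stay as raw text.
    return text
```

Keeping `numeric` as a string avoids the precision loss a JSON number would suffer in consumers that parse numbers as 64-bit floats.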
### Stream robustness
The decoder fails fast with `FaucetError::Source` rather than silently dropping data when the protocol desynchronises: a `COMMIT` without a `BEGIN`, a second `BEGIN` while a transaction is still staged, or a `ReplicationEvent` variant the decoder doesn't recognise. A restart then resumes from the durable bookmark. A relation whose column set changes mid-stream (after an `ALTER TABLE`) is logged at `warn`, and subsequent rows decode against the new descriptor.
---
## Configuration
| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `connection_url` | string | — | Postgres connection URL. |
| `slot_name` | string | — | Logical replication slot. Must match `[a-z0-9_]{1,63}`. |
| `publication_name` | string | — | Publication that selects which tables are replicated. |
| `create_slot_if_missing` | bool | `true` | Create the slot on first run if it does not exist. |
| `slot_type` | enum | `permanent` | `permanent` (survives disconnect, pins WAL until consumed/dropped) or `temporary` (auto-dropped on disconnect; resets resume position, so not for cross-run bookmark resume). Permanent-slot creation logs a loud warning. Drop an unused permanent slot via `PostgresCdcSource::drop_slot()`. |
| `tls` | object | `{mode: disable}` | Replication-connection TLS: `{mode: disable}` (plaintext, default), `{mode: require}`, `{mode: verify_ca, ca_path?}`, or `{mode: verify_full, ca_path?}`. Use `require`/`verify_*` in production — `disable` sends credentials and WAL in plaintext. |
| `start_lsn` | string? | `null` | One-time override (e.g. `"0/16A4F88"`); ignored if a state-store bookmark exists. |
| `proto_version` | u32 | `1` | pgoutput protocol version. Only `1` is supported in this release. |
| `idle_timeout` | seconds | `30` | Stop the current fetch cycle after this long without a new replication message. |
| `max_messages` | usize? | `null` | Optional cap on change events per fetch call. The cap is checked **after each COMMIT**; a transaction larger than `max_messages` still emits atomically. |
| `max_staged_records` | usize? | `null` | Max change records buffered for a **single in-progress transaction** before the run aborts with a typed error. `null` = unbounded. A safety valve against OOM on huge bulk transactions — see [Transactional consistency](#transactional-consistency). |
| `status_update_interval` | seconds | `10` | Standby Status Update cadence. Must be `< idle_timeout` and well under the server's `wal_sender_timeout`. |
| `tcp_keepalive` | seconds | `60` | TCP keepalive on the replication connection. |
| `slot_acquire_retries` | u32 | `10` | Retries when the slot is still **active** (held by a not-yet-released prior connection) on a rapid restart. Both the pre-stream slot advance and `START_REPLICATION` are retried with exponential backoff (250 ms, doubling, capped at 4 s). `0` = fail fast. |
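
A few of the constraints in the table can be checked before starting a run. The sketch below is an illustrative pre-flight check, not the crate's own validation: `validate_config` and `slot_acquire_delays_ms` are hypothetical names, and the backoff helper assumes one delay per retry with no jitter.

```python
import re

SLOT_NAME_RE = re.compile(r"[a-z0-9_]{1,63}")


def validate_config(config):
    """Return a list of human-readable problems; empty means the checks passed."""
    problems = []
    if not SLOT_NAME_RE.fullmatch(config.get("slot_name", "")):
        problems.append("slot_name must match [a-z0-9_]{1,63}")
    idle = config.get("idle_timeout", 30)
    status = config.get("status_update_interval", 10)
    if not status < idle:
        problems.append("status_update_interval must be < idle_timeout")
    if config.get("proto_version", 1) != 1:
        problems.append("only proto_version 1 is supported")
    return problems


def slot_acquire_delays_ms(retries=10, first_ms=250, cap_ms=4000):
    """Exponential backoff: 250 ms, doubling, capped at 4 s."""
    return [min(first_ms * 2 ** attempt, cap_ms) for attempt in range(retries)]
```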
---
## Transactional consistency
The connector buffers each transaction in memory and only flushes its records
to the sink on `COMMIT`. Partial transactions (a `BEGIN` seen but no `COMMIT`
before the fetch cycle ends) are dropped and redelivered after the
next `START_REPLICATION`. This makes the output transactionally consistent:
sinks never see half a transaction. The cost is that each
transaction must fit within one fetch cycle.
Because a transaction is buffered in full until its `COMMIT`, a single bulk
`UPDATE`/`DELETE`/`COPY` of millions of rows holds every decoded row in
memory at once. Set `max_staged_records` to bound that: when an in-progress
transaction exceeds it the run aborts with a typed `FaucetError::Source`
instead of risking an OOM-kill. Leave it `null` only if you are sure your
source transactions fit comfortably in memory.
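
The staging behaviour described above can be sketched as a small buffer. This is a simplified Python model, not the crate's Rust implementation: the class and exception names are hypothetical, and treating a change outside a transaction as a desync is an assumption of the sketch.

```python
class ProtocolDesync(Exception):
    """The message order violates BEGIN ... COMMIT framing."""


class StagedLimitExceeded(Exception):
    """An in-progress transaction grew past max_staged_records."""


class TransactionStager:
    def __init__(self, max_staged_records=None):
        self.max_staged_records = max_staged_records  # None = unbounded
        self._staged = None  # None means no transaction is open

    def begin(self):
        if self._staged is not None:
            raise ProtocolDesync("BEGIN while a transaction is still staged")
        self._staged = []

    def change(self, record):
        if self._staged is None:
            raise ProtocolDesync("change outside a transaction")
        self._staged.append(record)
        limit = self.max_staged_records
        if limit is not None and len(self._staged) > limit:
            raise StagedLimitExceeded(f"more than {limit} staged records")

    def commit(self):
        """Return the complete transaction; only now may it reach the sink."""
        if self._staged is None:
            raise ProtocolDesync("COMMIT without BEGIN")
        records, self._staged = self._staged, None
        return records

    def discard_partial(self):
        """Drop an unfinished transaction at the end of a fetch cycle."""
        dropped = len(self._staged or [])
        self._staged = None
        return dropped
```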
---
## At-least-once semantics
- Postgres redelivers everything after the most recent `confirmed_flush_lsn`
on every `START_REPLICATION`, with no duplicates and no gap.
- `faucet-stream`'s pipeline only writes the new bookmark to the state store
**after the sink confirms** the batch was flushed.
- On the next run, `apply_start_bookmark` is called with that bookmark and
the connector advances `confirmed_flush_lsn` accordingly.
- **The advertised `confirmed_flush_lsn` is advanced _only_ from a durable
bookmark** (via `apply_start_bookmark`) — never from decoded WAL at
commit-decode time, and never from a keepalive's `wal_end`. This guarantees
Postgres is never told to recycle WAL for changes the consumer has not
durably persisted, so a crash can never lose committed data (it replays
instead).
- If the process crashes between sink flush and state-store write, the next
run will replay the most recent transaction. Sinks should be idempotent or
accept duplicates at transaction boundaries.
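
One way for a sink to tolerate the replayed transaction is to remember the commit LSN it last applied and skip anything at or below it. The sketch below assumes the sink stores that LSN atomically with the data it writes; `parse_lsn` and `skip_replayed` are hypothetical helper names. LSNs are compared as 64-bit integers because comparing the text form is wrong (`"0/9"` sorts after `"0/10"` as a string).

```python
def parse_lsn(text):
    """Convert an LSN such as "0/16A4F88" to a 64-bit integer."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def skip_replayed(records, last_applied_lsn):
    """Drop records whose transaction was already applied by the sink.

    Every record of a transaction carries the same commit LSN, so a
    transaction is either kept whole or skipped whole.
    """
    if last_applied_lsn is None:  # nothing applied yet
        return list(records)
    floor = parse_lsn(last_applied_lsn)
    return [r for r in records if parse_lsn(r["lsn"]) > floor]
```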
---
## Operational caveats
- **Slot bloat / WAL retention:** the connector advances
`confirmed_flush_lsn` only at the *start* of a run, from the durable
bookmark. Within a single long-running fetch cycle it does not advance it,
so Postgres retains all WAL produced during that cycle until the next run
resumes from the persisted bookmark. Without a state store it never
advances at all and WAL is retained indefinitely. Always configure a state
store in production, and run frequently enough (or keep fetch cycles short
enough) that WAL retention stays within your disk budget. A **permanent**
slot (the default) keeps pinning WAL even when no consumer is connected, so
decommission an unused pipeline by calling `PostgresCdcSource::drop_slot()`
(or set `slot_type: temporary` for ephemeral runs that should self-clean on
disconnect).
- **Encryption:** the replication connection is plaintext unless you set
`tls` to `require` / `verify_ca` / `verify_full` — credentials and WAL data
travel unencrypted under the default `disable`.
- **Heartbeats:** if the network is silent for longer than the server's
`wal_sender_timeout` (default 60 s), Postgres kills the replication
connection. The default `status_update_interval` of 10 s leaves ample
margin.
- **DDL is invisible.** Logical replication does not replicate schema
changes. After an `ALTER TABLE`, the next change event from that table
will arrive with a new `Relation` message — the connector picks it up
automatically.
- **Reconnection on drop:** v1 surfaces a connection drop as
`FaucetError::Source`. Re-run the pipeline to reconnect; the persisted
LSN bookmark ensures no data loss.
- **Slot creation timing:** the slot is created on the first fetch call.
Changes applied **before** the slot exists are not replicated. To avoid
losing initial changes, either create the slot before applying writes
(e.g. via `psql`) or do a warm-up fetch and then begin writes.
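
Since a dropped connection ends the run, re-running is usually delegated to a supervisor. The sketch below is one minimal way to do that from Python; it assumes `faucet run` exits non-zero when the run fails with `FaucetError::Source`, which this README does not state, and the function names are hypothetical.

```python
import subprocess
import time


def supervise(run_once, max_restarts=5, base_delay=1.0, max_delay=30.0, sleep=time.sleep):
    """Call run_once() until it returns 0 or the restart budget is spent.

    Returns the last exit code. The delay doubles after each failure.
    """
    delay = base_delay
    for attempt in range(max_restarts + 1):
        code = run_once()
        if code == 0 or attempt == max_restarts:
            return code
        sleep(delay)
        delay = min(delay * 2, max_delay)


def run_pipeline():
    """Run the pipeline once and report its exit code (assumed semantics)."""
    return subprocess.run(["faucet", "run", "pipeline.yaml"]).returncode


# Usage: supervise(run_pipeline)
```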
---
## Implementation notes
Built on top of the [`pgwire-replication`](https://crates.io/crates/pgwire-replication)
crate for the Postgres logical-replication wire protocol; the `pgoutput`
payload bytes are decoded by a hand-rolled decoder in this crate so that the
output record shape stays under our control.
---
## See also
- `crates/source/postgres/` — query-mode Postgres source (snapshots, not CDC).
- `crates/state/postgres/` — Postgres-backed `StateStore` you can pair with this source.
- `cli/examples/postgres_cdc_to_jsonl.yaml` — end-to-end demo configuration.