faucet-source-postgres-cdc 1.0.0

PostgreSQL logical replication (CDC) source for the faucet-stream ecosystem
# faucet-source-postgres-cdc

PostgreSQL CDC (Change Data Capture) source for [`faucet-stream`](https://github.com/PawanSikawat/faucet-stream). Subscribes to a Postgres logical replication slot using the `pgoutput` plugin and emits each row-level change (INSERT / UPDATE / DELETE / TRUNCATE) as a JSON record. Replication position is persisted via any `faucet-core` `StateStore`, so pipelines resume from the last durably persisted position. No committed change is skipped, but a crash between sink flush and state write can replay already-delivered transactions (see [At-least-once semantics](#at-least-once-semantics)).

---

## Quick start

```yaml
# pipeline.yaml
version: 1
source:
  type: postgres-cdc
  config:
    connection_url: postgres://user:pass@localhost:5432/appdb
    slot_name: faucet_slot
    publication_name: faucet_pub
    create_slot_if_missing: true
    idle_timeout: 30
sink:
  type: jsonl
  config:
    path: ./changes.jsonl
state:
  type: file
  config: { path: ./state }
```

```bash
faucet run pipeline.yaml
```

---

## Postgres setup (one-time)

```sql
-- 1. Database needs logical replication enabled.
ALTER SYSTEM SET wal_level = 'logical';
ALTER SYSTEM SET max_replication_slots = 4;
ALTER SYSTEM SET max_wal_senders = 4;
-- restart Postgres after this

-- 2. The role that connects must have REPLICATION.
ALTER ROLE faucet_user WITH REPLICATION;

-- 3. Create a publication for the tables you want to capture.
CREATE PUBLICATION faucet_pub FOR TABLE public.users, public.orders;
-- or FOR ALL TABLES if you really want everything.

-- 4. (Optional but recommended) get a full pre-image on UPDATE/DELETE.
ALTER TABLE public.users REPLICA IDENTITY FULL;
```

The replication slot is created automatically on the first fetch when
`create_slot_if_missing` is `true` (the default).

---

## Output record schema

Every change event is one JSON object:

```json
{
  "op":     "insert | update | delete | truncate",
  "schema": "public",
  "table":  "users",
  "lsn":    "0/16A4F88",
  "ts_ms":  1779019200000,
  "before": null | { ...row... },
  "after":  null | { ...row..., "__unchanged_toast__": ["col1", "col2"] }
}
```

- `lsn` is the `commit_lsn` of the enclosing transaction.
- `ts_ms` is Unix-epoch milliseconds, derived from the COMMIT's timestamp.
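- `before` is populated on `delete` always, and on `update` only when the
  table is `REPLICA IDENTITY FULL`; otherwise it is `null`. Under the default
  replica identity Postgres ships only the key columns of a deleted row, so a
  full `delete` pre-image also requires `REPLICA IDENTITY FULL`.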
- `after` is populated on `insert` and `update`, and `null` on `delete` /
  `truncate`.
- If any column arrived as "unchanged TOAST" (Postgres elides large
  out-of-line values whose stored copy was not rewritten), the column is
  dropped from `before` / `after` and its name is recorded in
  `before.__unchanged_toast__` / `after.__unchanged_toast__`.

`truncate` emits one record per truncated relation with `before = after = null`.
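
A downstream consumer that keeps its own copy of each row has to treat `__unchanged_toast__` specially on `update`: the listed columns were elided, not set to `NULL`, so their previous value should survive. The Rust sketch below is a hypothetical helper (`apply_update` and the `Row` map are illustrative names, not part of this crate) that rebuilds the full row from the prior copy plus the `after` image, and refuses when no prior copy of an elided column exists.

```rust
use std::collections::BTreeMap;

/// Hypothetical row representation: column name -> text value (`None` = SQL NULL).
/// The connector's real records are JSON objects; this is only for illustration.
type Row = BTreeMap<String, Option<String>>;

/// Apply an `update` record's `after` image to the row a consumer already holds.
/// Columns named in `unchanged_toast` were dropped from `after` by the connector,
/// so their previous value is carried over instead of being treated as missing.
fn apply_update(previous: &Row, after: &Row, unchanged_toast: &[String]) -> Result<Row, String> {
    let mut merged = after.clone();
    for col in unchanged_toast {
        match previous.get(col) {
            Some(value) => {
                merged.insert(col.clone(), value.clone());
            }
            // Without a prior copy, the elided value cannot be reconstructed from this event.
            None => return Err(format!("no previous value for unchanged TOAST column `{col}`")),
        }
    }
    Ok(merged)
}
```

Returning an error rather than silently writing `NULL` keeps a consumer that started mid-stream (with no prior copy) from corrupting large columns.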

### Column type mapping

Values arrive in Postgres text form and are decoded by column type OID:

| Postgres type | JSON |
|---|---|
| `bool` | `true` / `false` |
| `int2` / `int4` / `int8` | number |
| `float4` / `float8` (finite) | number |
| `float4` / `float8` `NaN` / `±Infinity` | `"NaN"` / `"Infinity"` / `"-Infinity"` (strings: JSON has no number form for these, and `null` is reserved for SQL `NULL`) |
| `numeric` | string (exact precision) |
| `bytea` | base64 string |
| `json` / `jsonb` | parsed JSON value |
| one-dimensional arrays of the scalar types in this table (`int4[]`, `text[]`, `uuid[]`, …) | JSON array, decoded element-by-element (`NULL` elements → `null`) |
| `uuid`, `date`, `time`, `timestamp`, `timestamptz`, and anything else | string (raw Postgres text — stable under the default `DateStyle ISO`) |

Multi-dimensional arrays, ranges, composites, and enums fall back to the raw Postgres array/text string.
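
To make the scalar rows of the table concrete, here is a minimal, std-only Rust sketch. It dispatches on the type *name* for readability where the connector dispatches on OID, uses a tiny hypothetical `Json` enum instead of a real JSON type, and omits `bytea`, `json`/`jsonb`, and arrays. It relies on Postgres's text output (`t`/`f` for `bool`, `NaN`/`Infinity`/`-Infinity` for non-finite floats).

```rust
/// Minimal stand-in for a JSON value (hypothetical; not this crate's type).
#[derive(Debug, PartialEq)]
enum Json {
    Null,
    Bool(bool),
    Int(i64),
    Float(f64),
    Str(String),
}

/// Decode one column from Postgres text form, keyed by type name.
/// `None` means the column arrived as SQL NULL.
fn decode_text(type_name: &str, text: Option<&str>) -> Result<Json, String> {
    let Some(text) = text else { return Ok(Json::Null) };
    match type_name {
        "bool" => match text {
            "t" => Ok(Json::Bool(true)),
            "f" => Ok(Json::Bool(false)),
            other => Err(format!("bad bool literal `{other}`")),
        },
        "int2" | "int4" | "int8" => text.parse::<i64>().map(Json::Int).map_err(|e| e.to_string()),
        "float4" | "float8" => match text {
            // Non-finite values have no JSON number form, so keep them as strings.
            "NaN" | "Infinity" | "-Infinity" => Ok(Json::Str(text.to_string())),
            _ => text.parse::<f64>().map(Json::Float).map_err(|e| e.to_string()),
        },
        // `numeric` stays a string to keep exact precision; everything else is raw text.
        _ => Ok(Json::Str(text.to_string())),
    }
}
```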

### Stream robustness

The decoder fails fast with `FaucetError::Source` instead of silently dropping data when it detects a protocol desync; re-running the pipeline resumes from the durable bookmark. Each of these counts as a desync:

- a `COMMIT` without a `BEGIN`;
- a second `BEGIN` while a transaction is still staged;
- a `ReplicationEvent` variant the decoder doesn't recognise.

A relation whose column set changes mid-stream (an `ALTER TABLE`) is logged at `warn`; subsequent rows decode against the new descriptor.
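
The desync rules amount to a small state machine around the staged transaction. The sketch below models it with a hypothetical `Event` enum and `TxnStager` type (illustrative names, not the crate's API; the real decoder consumes `pgwire-replication` events). Rejecting a change that arrives outside any transaction is an extra check assumed for this sketch.

```rust
/// Simplified replication events (hypothetical).
enum Event {
    Begin,
    Change(String),
    Commit { commit_lsn: u64 },
}

#[derive(Debug, PartialEq)]
enum DesyncError {
    CommitWithoutBegin,
    NestedBegin,
    /// Assumed extra check: a row change with no open transaction.
    ChangeOutsideTransaction,
}

/// Stages changes between BEGIN and COMMIT; `staged` is `Some` while a transaction is open.
#[derive(Default)]
struct TxnStager {
    staged: Option<Vec<String>>,
}

impl TxnStager {
    /// Returns `(commit_lsn, changes)` when a transaction commits, `None` otherwise,
    /// and an error on any protocol desync instead of dropping data.
    fn on_event(&mut self, event: Event) -> Result<Option<(u64, Vec<String>)>, DesyncError> {
        match event {
            Event::Begin if self.staged.is_some() => Err(DesyncError::NestedBegin),
            Event::Begin => {
                self.staged = Some(Vec::new());
                Ok(None)
            }
            Event::Change(row) => match self.staged.as_mut() {
                Some(buf) => {
                    buf.push(row);
                    Ok(None)
                }
                None => Err(DesyncError::ChangeOutsideTransaction),
            },
            // `take()` closes the transaction and hands its rows out atomically.
            Event::Commit { commit_lsn } => match self.staged.take() {
                Some(buf) => Ok(Some((commit_lsn, buf))),
                None => Err(DesyncError::CommitWithoutBegin),
            },
        }
    }
}
```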

---

## Configuration

| Field                       | Type     | Default | Description |
|-----------------------------|----------|---------|-------------|
| `connection_url`            | string   | (required) | Postgres connection URL. |
| `slot_name`                 | string   | (required) | Logical replication slot. Must match `[a-z0-9_]{1,63}`. |
| `publication_name`          | string   | (required) | Publication that selects which tables are replicated. |
| `create_slot_if_missing`    | bool     | `true`  | Create the slot on first run if it does not exist. |
| `slot_type`                 | enum     | `permanent` | `permanent` (survives disconnect, pins WAL until consumed/dropped) or `temporary` (auto-dropped on disconnect; resets resume position, so not for cross-run bookmark resume). Permanent-slot creation logs a loud warning. Drop an unused permanent slot via `PostgresCdcSource::drop_slot()`. |
| `tls`                       | object   | `{mode: disable}` | Replication-connection TLS: `{mode: disable}` (plaintext, default), `{mode: require}`, `{mode: verify_ca, ca_path?}`, or `{mode: verify_full, ca_path?}`. Use `require`/`verify_*` in production — `disable` sends credentials and WAL in plaintext. |
| `start_lsn`                 | string?  | `null`  | One-time override (e.g. `"0/16A4F88"`); ignored if a state-store bookmark exists. |
| `proto_version`             | u32      | `1`     | pgoutput protocol version. Only `1` is supported in this release. |
| `idle_timeout`              | seconds  | `30`    | Stop the current fetch cycle after this long without a new replication message. |
| `max_messages`              | usize?   | `null`  | Optional cap on change events per fetch call. The cap is checked **after each COMMIT**; a transaction larger than `max_messages` still emits atomically. |
| `max_staged_records`        | usize?   | `null`  | Max change records buffered for a **single in-progress transaction** before the run aborts with a typed error. `null` = unbounded. A safety valve against OOM on huge bulk transactions; see [Transactional consistency](#transactional-consistency). |
| `status_update_interval`    | seconds  | `10`    | Standby Status Update cadence. Must be `< idle_timeout` and well under the server's `wal_sender_timeout`. |
| `tcp_keepalive`             | seconds  | `60`    | TCP keepalive on the replication connection. |
| `slot_acquire_retries`      | u32      | `10`    | Retries when the slot is still **active** (held by a not-yet-released prior connection) on a rapid restart. Both the pre-stream slot advance and `START_REPLICATION` are retried with exponential backoff (250 ms, doubling, capped at 4 s). `0` = fail fast. |
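
Several of these constraints are simple enough to check before connecting. This sketch encodes three of them as hypothetical helpers (`valid_slot_name`, `valid_intervals`, and `slot_retry_delay` are illustrative names, not this crate's API): the `slot_name` pattern, `status_update_interval < idle_timeout`, and the `slot_acquire_retries` backoff schedule. Treating the first retry as attempt 0 with a 250 ms delay is an assumption.

```rust
use std::time::Duration;

/// Documented `slot_name` rule: `[a-z0-9_]{1,63}`.
fn valid_slot_name(name: &str) -> bool {
    (1..=63).contains(&name.len())
        && name.bytes().all(|b| b.is_ascii_lowercase() || b.is_ascii_digit() || b == b'_')
}

/// Documented rule: `status_update_interval` must be below `idle_timeout` (both in seconds).
fn valid_intervals(status_update_interval: u64, idle_timeout: u64) -> bool {
    status_update_interval < idle_timeout
}

/// Delay before retry `attempt` (0-based) while the slot is still active:
/// 250 ms, doubling, capped at 4 s. The shift is clamped so it cannot overflow.
fn slot_retry_delay(attempt: u32) -> Duration {
    let ms = 250u64.saturating_mul(1u64 << attempt.min(16));
    Duration::from_millis(ms.min(4_000))
}
```

With the default `slot_acquire_retries: 10`, this schedule waits roughly 250 + 500 + 1000 + 2000 + 6 × 4000 ms, about 28 s in total, before giving up.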

---

## Transactional consistency

The connector buffers each transaction in memory and only flushes its records
to the sink on `COMMIT`. A partial transaction (`BEGIN` seen, but the fetch
cycle ends before its `COMMIT` arrives, e.g. because `idle_timeout` elapsed)
is dropped and redelivered after the next `START_REPLICATION`. This makes the
output transactionally consistent, since sinks never see half a transaction,
at the cost of needing each transaction to fit within one fetch cycle.

Because a transaction is buffered in full until its `COMMIT`, a single bulk
`UPDATE`/`DELETE`/`COPY` of millions of rows holds every decoded row in
memory at once. Set `max_staged_records` to bound that: when an in-progress
transaction exceeds it the run aborts with a typed `FaucetError::Source`
instead of risking an OOM-kill. Leave it `null` only if you are sure your
source transactions fit comfortably in memory.
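
The interplay of `COMMIT`-only flushing, the post-`COMMIT` `max_messages` check, and `max_staged_records` can be sketched as a single fetch cycle over a hypothetical event stream. Here the input simply running out stands in for `idle_timeout`; `Msg`, `CycleError`, and `fetch_cycle` are illustrative names, and desync checks are omitted.

```rust
/// Simplified replication events (hypothetical).
enum Msg {
    Begin,
    Row(String),
    Commit,
}

#[derive(Debug, PartialEq)]
enum CycleError {
    StagedLimitExceeded { limit: usize },
}

/// One fetch cycle: stage rows per transaction, emit them only on COMMIT,
/// stop after a COMMIT once `max_messages` is reached, and abort if a single
/// in-progress transaction grows past `max_staged_records`.
fn fetch_cycle(
    events: impl IntoIterator<Item = Msg>,
    max_messages: Option<usize>,
    max_staged_records: Option<usize>,
) -> Result<Vec<String>, CycleError> {
    let mut emitted = Vec::new();
    let mut staged: Vec<String> = Vec::new();
    for msg in events {
        match msg {
            // A new transaction starts with an empty buffer.
            Msg::Begin => staged.clear(),
            Msg::Row(r) => {
                staged.push(r);
                if let Some(limit) = max_staged_records {
                    if staged.len() > limit {
                        return Err(CycleError::StagedLimitExceeded { limit });
                    }
                }
            }
            Msg::Commit => {
                // Move the whole transaction out at once.
                emitted.append(&mut staged);
                // The cap is only checked at transaction boundaries,
                // so an oversized transaction is still emitted whole.
                if matches!(max_messages, Some(cap) if emitted.len() >= cap) {
                    break;
                }
            }
        }
    }
    // Anything still in `staged` is a partial transaction: it is dropped here,
    // and Postgres redelivers it next run because its commit was never confirmed.
    Ok(emitted)
}
```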

---

## At-least-once semantics

- Postgres redelivers everything after the most recent `confirmed_flush_lsn`
  on every `START_REPLICATION`, with no duplicates and no gap.
- `faucet-stream`'s pipeline only writes the new bookmark to the state store
  **after the sink confirms** the batch was flushed.
- On the next run, `apply_start_bookmark` is called with that bookmark and
  the connector advances `confirmed_flush_lsn` accordingly.
- **The advertised `confirmed_flush_lsn` is advanced _only_ from a durable
  bookmark** (via `apply_start_bookmark`) — never from decoded WAL at
  commit-decode time, and never from a keepalive's `wal_end`. This guarantees
  Postgres is never told to recycle WAL for changes the consumer has not
  durably persisted, so a crash can never lose committed data (it replays
  instead).
- If the process crashes between sink flush and state-store write, the next
  run replays everything since the last persisted bookmark, i.e. every
  transaction in the batch whose bookmark was not recorded. Sinks should be
  idempotent or accept duplicates at transaction boundaries.
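
One way to make a sink tolerate these replays is an LSN watermark. Every record carries its transaction's commit LSN, and transactions arrive in commit order, so a consumer that durably remembers the highest commit LSN it has fully applied can skip anything at or below it. The sketch assumes a hypothetical `LsnWatermark` type and a consumer that calls `mark_applied` only after a whole transaction is persisted; storing the watermark atomically with the data is left to the sink.

```rust
/// Parse Postgres's textual LSN (`X/Y`, two hexadecimal 32-bit halves) into a 64-bit position.
fn parse_lsn(s: &str) -> Option<u64> {
    let (hi, lo) = s.split_once('/')?;
    let hi = u32::from_str_radix(hi, 16).ok()?;
    let lo = u32::from_str_radix(lo, 16).ok()?;
    Some((u64::from(hi) << 32) | u64::from(lo))
}

/// Hypothetical idempotent-consumer helper: commit LSN of the last fully applied transaction.
struct LsnWatermark {
    applied_through: u64,
}

impl LsnWatermark {
    /// `Some(true)` if a record with this `lsn` is new; `None` if the LSN is malformed.
    fn should_apply(&self, record_lsn: &str) -> Option<bool> {
        Some(parse_lsn(record_lsn)? > self.applied_through)
    }

    /// Call once every record of the transaction with `commit_lsn` is durably applied.
    fn mark_applied(&mut self, commit_lsn: &str) -> Option<()> {
        self.applied_through = self.applied_through.max(parse_lsn(commit_lsn)?);
        Some(())
    }
}
```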

---

## Operational caveats

- **Slot bloat / WAL retention:** the connector advances
  `confirmed_flush_lsn` only at the *start* of a run, from the durable
  bookmark. Within a single long-running fetch cycle it does not advance it,
  so Postgres retains all WAL produced during that cycle until the next run
  resumes from the persisted bookmark. Without a state store it never
  advances at all and WAL is retained indefinitely. Always configure a state
  store in production, and run frequently enough (or keep fetch cycles short
  enough) that WAL retention stays within your disk budget. A **permanent**
  slot (the default) keeps pinning WAL even when no consumer is connected, so
  decommission an unused pipeline by calling `PostgresCdcSource::drop_slot()`
  (or set `slot_type: temporary` for ephemeral runs that should self-clean on
  disconnect).
- **Encryption:** the replication connection is plaintext unless you set
  `tls` to `require` / `verify_ca` / `verify_full` — credentials and WAL data
  travel unencrypted under the default `disable`.
- **Heartbeats:** if the network is silent for longer than the server's
  `wal_sender_timeout` (default 60 s), Postgres kills the replication
  connection. The default `status_update_interval` of 10 s leaves ample
  margin.
- **DDL is invisible.** Logical replication does not replicate schema
  changes. After an `ALTER TABLE`, the next change event from that table is
  preceded by a fresh `Relation` message, which the connector picks up
  automatically.
- **Reconnection on drop:** v1 surfaces a connection drop as
  `FaucetError::Source`. Re-run the pipeline to reconnect; the persisted
  LSN bookmark ensures no data loss.
- **Slot creation timing:** the slot is created on the first fetch call.
  Changes applied **before** the slot exists are not replicated. To avoid
  losing initial changes, either create the slot before applying writes
  (e.g. via `psql`) or do a warm-up fetch and then begin writes.
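
To keep an eye on slot bloat, compare the server's `pg_current_wal_lsn()` with the slot's `restart_lsn` (the oldest WAL the slot still pins) and `confirmed_flush_lsn` (how far the consumer is confirmed), both exposed in the `pg_replication_slots` view. The LSN arithmetic is plain 64-bit subtraction, as this sketch shows; `lsn_diff` and `slot_over_budget` are hypothetical helpers, and fetching the LSNs from the server is left out.

```rust
/// Parse Postgres's textual LSN (`X/Y`, two hexadecimal 32-bit halves) into a 64-bit position.
fn parse_lsn(s: &str) -> Option<u64> {
    let (hi, lo) = s.split_once('/')?;
    let hi = u32::from_str_radix(hi, 16).ok()?;
    let lo = u32::from_str_radix(lo, 16).ok()?;
    Some((u64::from(hi) << 32) | u64::from(lo))
}

/// Byte distance `newer - older`, like Postgres's `pg_wal_lsn_diff(newer, older)`,
/// except that it saturates at 0 instead of going negative.
fn lsn_diff(newer: &str, older: &str) -> Option<u64> {
    Some(parse_lsn(newer)?.saturating_sub(parse_lsn(older)?))
}

/// Hypothetical alert: does the WAL pinned by the slot (current write position
/// minus the slot's `restart_lsn`) exceed a disk budget in bytes?
fn slot_over_budget(current_wal_lsn: &str, restart_lsn: &str, budget_bytes: u64) -> Option<bool> {
    Some(lsn_diff(current_wal_lsn, restart_lsn)? > budget_bytes)
}
```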

---

## Implementation notes

Built on top of the [`pgwire-replication`](https://crates.io/crates/pgwire-replication)
crate for the Postgres logical-replication wire protocol; the `pgoutput`
payload bytes are decoded by a hand-rolled decoder in this crate so that the
output record shape stays under our control.

---

## See also

- `crates/source/postgres/` — query-mode Postgres source (snapshots, not CDC).
- `crates/state/postgres/` — Postgres-backed `StateStore` you can pair with this source.
- `cli/examples/postgres_cdc_to_jsonl.yaml` — end-to-end demo configuration.