rivet-cli 0.16.3

Rivet: PostgreSQL/MySQL/SQL Server → Parquet/CSV (local, S3, GCS, Azure). Crate name rivet-cli; binary rivet.
# CDC Reference

Rivet's change-data-capture (CDC) reads a source's **transaction log** — not the
tables — and emits each `INSERT` / `UPDATE` / `DELETE` as a row change. Because it
tails the log that the database already writes for replication and durability, it
adds almost no load to the OLTP path: no table scan, no locks, no read snapshot
(see [Why CDC is gentle on the source](#why-cdc-is-gentle-on-the-source)).

> **Status.** All three engines support NDJSON streaming and typed Parquet/CSV
> `--output` (real `Timestamp` / `Date32` / `Decimal128` columns). This page
> documents all three so the **permissions** are clear up front — they are the
> part operators most need to get right.

## The command

```bash
# stream changes as NDJSON to stdout (no schema resolution, fewest privileges)
rivet cdc --source 'mysql://rivet_cdc:***@db:3306/app' --table orders

# write typed Parquet files (one row per change, after-image / upsert shape)
rivet cdc --source 'mysql://rivet_cdc:***@db:3306/app' \
          --table orders --output ./cdc-out --format parquet \
          --checkpoint ./orders.ckpt --rollover 10000
```

Prefer `--source-env VAR` or `--source-file path` over an inline URL outside local
dev — the URL is otherwise visible in `ps` / shell history.

| flag | meaning |
| ------ | --------- |
| `--server-id` | replica id for the binlog connection (MySQL). **Must be unique** — distinct from the source and every real replica. Default `4271`. |
| `--checkpoint PATH` | persist/resume the log position. Omit to tail from the current position without checkpointing. |
| `--table NAME` | only emit this table (repeatable for NDJSON; **exactly one** required for `--output`, whose schema is resolved from the source). |
| `--output DIR` | write typed Parquet/CSV files instead of NDJSON. |
| `--max-events N` | stop after N changes (otherwise stream until interrupted; the per-event checkpoint makes an interrupted run resumable). |
| `--slot NAME` | PostgreSQL logical slot (default `rivet_slot`; created if absent). |
| `--capture-instance NAME` | SQL Server CDC capture instance (e.g. `dbo_orders`) — required for `sqlserver://`. |
| `--until-current` | Catch up to the source's current log end, then **exit** instead of streaming. For MySQL this sets `BINLOG_DUMP_NON_BLOCK`; PostgreSQL / SQL Server already drain-and-exit. With `--max-events N`, the run stops at the smaller of "N events" or "end of log" — so it never blocks waiting for the N-th event. Ideal for a scheduler. |
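
For a scheduler, the flags in the table can be assembled programmatically. The following is a minimal sketch, assuming a Python wrapper around the binary; `build_cdc_argv` is a hypothetical helper, not part of rivet, and it uses only flags documented on this page.

```python
from typing import List, Optional


def build_cdc_argv(
    source_env: str,
    table: str,
    output: str,
    checkpoint: str,
    fmt: str = "parquet",
    max_events: Optional[int] = None,
) -> List[str]:
    """Build a bounded `rivet cdc` invocation (hypothetical helper, not rivet API)."""
    argv = [
        "rivet", "cdc",
        "--source-env", source_env,   # keeps the URL out of `ps` / shell history
        "--table", table,             # exactly one table is required with --output
        "--output", output,
        "--format", fmt,
        "--checkpoint", checkpoint,   # resume from the last persisted position
        "--until-current",            # drain to the current log end, then exit
    ]
    if max_events is not None:
        # Combined with --until-current, the run stops at the smaller of
        # "N events" or "end of log".
        argv += ["--max-events", str(max_events)]
    return argv


argv = build_cdc_argv(
    "DATABASE_URL", "orders", "./cdc-out", "./orders.ckpt", max_events=10000
)
print(" ".join(argv))
```

The resulting list can be handed to `subprocess.run(argv, check=True)`, so a non-zero exit from a failed run surfaces as an exception in the scheduler.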

The engine is chosen from the URL scheme (`mysql://` / `postgresql://` /
`sqlserver://`) by `create_change_stream`, the CDC sibling of the batch
`create_source`. With `--output`, each part is uploaded through the same commit
path the batch export uses (destination + content-MD5 + transit-integrity check,
ADR-0004) and a `manifest.json` + `_SUCCESS` is written at clean end — so a
`--output gs://…` / `s3://…` works via the same `DestinationConfig`. Typed columns
(real `Timestamp` / `Date32` / `Decimal128`, not strings) flow through `RivetValue`
structural typing — for all three engines (MySQL binlog values, PostgreSQL
test_decoding parse, SQL Server change-table `ColumnData`).
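
The scheme-based dispatch described above can be pictured with a short sketch. This is an illustration in Python, not rivet's `create_change_stream` (which is Rust); `engine_for` and the mapping are hypothetical, and only the three schemes named on this page are assumed to be accepted.

```python
from urllib.parse import urlsplit

# Schemes named on this page. Whether rivet accepts aliases is not covered
# here, so anything else is rejected in this sketch.
SCHEME_TO_ENGINE = {
    "mysql": "MySQL binlog",
    "postgresql": "PostgreSQL logical slot",
    "sqlserver": "SQL Server change tables",
}


def engine_for(url: str) -> str:
    """Pick the CDC mechanism from the URL scheme (hypothetical helper)."""
    scheme = urlsplit(url).scheme.lower()
    try:
        return SCHEME_TO_ENGINE[scheme]
    except KeyError:
        raise ValueError(f"unsupported CDC source scheme: {scheme!r}") from None


print(engine_for("mysql://rivet_cdc:secret@db:3306/app"))
```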

## From config (`rivet run`)

CDC also runs as an export in a config, so a scheduled `rivet run` captures
changes alongside batch exports and records the run the same way:

```yaml
source:
  type: mysql
  url_env: DATABASE_URL          # credentials out of the file
  tls: { mode: verify-full }     # required for a remote host (see below)
exports:
  - name: orders_cdc
    table: orders
    mode: cdc
    format: parquet
    cdc:
      checkpoint: /var/lib/rivet/orders.ckpt
      until_current: true        # drain to now and exit — for a scheduler
      # per-engine, all optional:
      server_id: 4271            # MySQL replica id
      slot: rivet_orders         # PostgreSQL logical slot
      capture_instance: dbo_orders  # SQL Server (required for sqlserver://)
    destination: { type: gcs, bucket: my-bucket, prefix: cdc/orders }
```

```bash
rivet run --config cdc.yaml      # captures, writes typed Parquet, records the run
rivet metrics                    # the CDC run appears with mode=cdc, like a batch
```

A `mode: cdc` export reuses the export's `table`, `destination`, and `format`; the
`cdc:` block carries only the CDC-specific knobs. Each run produces the standard
per-export summary block and an `export_metrics` row (rows / files / bytes /
duration / status), so CDC shows up in `rivet metrics` and the run aggregate
exactly like a batch export. **TLS:** unlike the CLI (which is loopback-only),
the config path passes `source.tls` to the change stream — so a remote source over
TLS requires the `tls:` block, and a remote host without it is refused before any
connection (the same gate the batch path uses).

## The three models

Rivet normalises three different source mechanisms behind one `ChangeStream`:

| engine | mechanism | model |
| -------- | ----------- | ------- |
| **MySQL** | binlog (ROW) streamed as a replica | push — the client reads the log directly |
| **PostgreSQL** | logical replication slot (`test_decoding`) | poll the slot via `pg_logical_slot_get_changes()` |
| **SQL Server** | `cdc.*` change tables the capture Agent extracts | poll the change function by LSN window |

MySQL and PostgreSQL expose the log to the client; SQL Server does not — there a
server-side Agent extracts the log into change tables that rivet polls.

---

## Permissions & prerequisites

### MySQL — the binlog grants

Rivet registers as a **replica** and streams the binlog. Least privilege:

```sql
CREATE USER 'rivet_cdc'@'%' IDENTIFIED BY '***';

-- read the binlog stream (register as replica, COM_BINLOG_DUMP).
-- Server-wide: REPLICATION SLAVE cannot be scoped to a database/table.
GRANT REPLICATION SLAVE  ON *.* TO 'rivet_cdc'@'%';

-- read the current binlog coordinate (SHOW MASTER STATUS) when starting
-- without a checkpoint.
GRANT REPLICATION CLIENT ON *.* TO 'rivet_cdc'@'%';

-- ONLY for `--output`: rivet resolves the table's column types with
-- `SELECT * FROM <table> LIMIT 0` (metadata only, no rows). Not needed for NDJSON.
GRANT SELECT ON `app`.`orders` TO 'rivet_cdc'@'%';

FLUSH PRIVILEGES;
```

Server configuration (`my.cnf` `[mysqld]`; `binlog_format` and `binlog_row_image` can
also be changed at runtime with `SET GLOBAL`, but `log_bin` only takes effect at
server start):

```ini
log_bin           = ON       # binary logging on (often already on for replication/PITR)
binlog_format     = ROW      # rivet needs row images, not statements — MIXED/STATEMENT will not work
binlog_row_image  = FULL     # full before/after image — REQUIRED for the after-image / MERGE shape;
                             # MINIMAL drops unchanged columns and breaks "overwrite all columns"
server_id         = 1        # any unique id for the source; rivet uses a DIFFERENT --server-id
```

Notes:

- **`REPLICATION SLAVE` is server-wide by design.** You cannot grant binlog
  access for one database only — the binlog is a single server-wide stream. Scope
  data exposure with `--table` (rivet filters client-side) and the `SELECT` grant.
- **A stale `server_id` collision silently kills the stream.** Give rivet a
  `--server-id` no other replica uses.
- **Connect CDC *directly* to MySQL — not through ProxySQL / MaxScale.** The binlog
  stream is `COM_BINLOG_DUMP`, a replication protocol query proxies don't carry; the
  batch path can go through a pooler, CDC cannot. Rivet probes the connection and
  fails fast with this exact reason if it sees a proxy, so point the source at the
  MySQL host (the replication endpoint), not the proxy port.
- `binlog_row_image = FULL` is MySQL's default; the risk is a source that has set
  it to `MINIMAL` to shrink the binlog — that path needs the column-mask MERGE,
  not the simple overwrite (see [Output shape](#output-shape)).

### PostgreSQL — the logical slot

Rivet's PostgreSQL reader consumes a logical slot through the **normal SQL
connection** (`pg_logical_slot_get_changes()`), not the streaming-replication
protocol. That changes what you must grant:

```sql
-- REPLICATION attribute: required to create and read a logical slot, even via
-- the SQL functions (pg_create_logical_replication_slot / _get_changes).
ALTER ROLE rivet_cdc WITH LOGIN REPLICATION PASSWORD '***';

-- ONLY for `--output`: schema resolution (SELECT ... LIMIT 0).
GRANT SELECT ON app.orders TO rivet_cdc;
```

Server configuration (`postgresql.conf`, needs a **restart**):

```ini
wal_level            = logical   # log enough to decode row changes (default is 'replica')
max_replication_slots = 10       # >= 1 (defaults are usually fine)
max_wal_senders       = 10       # >= 1
```

Notes:

- **No `pg_hba.conf` `replication` line is required.** That entry is for the
  streaming walsender protocol; rivet's poll model uses an ordinary connection, so
  the normal `host app rivet_cdc ...` rule suffices. (This is the main way the
  poll model is operationally lighter than streaming CDC tools.)
- **A logical slot pins WAL** until it is consumed/advanced. An abandoned slot
  prevents WAL recycling and **fills the disk** — the number-one PostgreSQL CDC
  foot-gun. Drop unused slots with
  `SELECT pg_drop_replication_slot('rivet_slot');`.
- `wal2json` / `pgoutput` are alternatives to `test_decoding`; `test_decoding` is
  always built in and needs no extension.

### SQL Server — CDC change tables

SQL Server has **no client-streamable log**. A server-side Agent job extracts the
log into `cdc.*` change tables, which rivet polls. Two distinct privilege levels:

```sql
-- ONE-TIME ENABLE (sp_cdc_enable_db requires sysadmin; sp_cdc_enable_table requires db_owner):
EXEC sys.sp_cdc_enable_db;                         -- creates the cdc schema + capture job
EXEC sys.sp_cdc_enable_table
     @source_schema = N'dbo', @source_name = N'orders',
     @role_name = N'cdc_reader',                   -- gating role for readers (or NULL = no gate)
     @capture_instance = N'dbo_orders',
     @supports_net_changes = 0;

-- RUNTIME READER (what rivet connects as — least privilege):
CREATE USER rivet_cdc FOR LOGIN rivet_cdc;
GRANT SELECT ON SCHEMA::cdc TO rivet_cdc;          -- read the change tables + functions
ALTER ROLE cdc_reader ADD MEMBER rivet_cdc;        -- if a gating role was set above
```

Notes:

- **SQL Server Agent must be running.** The capture job (default ~5 s scan cycle)
  is what populates the change tables. If the Agent stops, the change tables
  silently freeze **and the transaction log can't truncate** — disk pressure. A
  production reader should watch for a non-advancing `sys.fn_cdc_get_max_lsn()`,
  not read "no rows" as "no changes".
- **Edition gate:** CDC is on Enterprise / Standard / Developer — **not Express or
  Web**. On Express, use Change Tracking instead (different, lighter, but only
  tells you *which* rows changed, not the data).
- Enabling CDC needs `sysadmin`/`db_owner`; the **runtime reader needs only the
  `SELECT` grant** above.
- **Retention:** the cleanup job keeps ~3 days by default. If rivet is offline
  longer than retention, the saved LSN falls below `sys.fn_cdc_get_min_lsn()` and
  the read errors — fall back to a full re-snapshot.
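
The two LSN checks in the notes above (retention and a frozen capture job) reduce to byte comparisons, because SQL Server LSNs are `binary(10)` values that order bytewise. The following is a minimal sketch under that assumption; `resume_action` and `capture_stalled` are hypothetical names, not rivet functions, and the LSN values are made up for illustration.

```python
LSN_LEN = 10  # sys.fn_cdc_get_min_lsn / sys.fn_cdc_get_max_lsn return binary(10)


def _check(lsn: bytes) -> bytes:
    if len(lsn) != LSN_LEN:
        raise ValueError(f"expected a {LSN_LEN}-byte LSN, got {len(lsn)} bytes")
    return lsn


def resume_action(saved_lsn: bytes, min_lsn: bytes) -> str:
    """Decide whether a saved checkpoint is still covered by the change tables."""
    # Below the retention floor: the cleanup job has removed changes after the
    # checkpoint, so resuming would silently skip a gap.
    if _check(saved_lsn) < _check(min_lsn):
        return "resnapshot"
    return "resume"


def capture_stalled(previous_max_lsn: bytes, current_max_lsn: bytes) -> bool:
    """True when the max LSN did not advance between two polls."""
    return _check(current_max_lsn) <= _check(previous_max_lsn)


min_lsn = bytes.fromhex("0000002a000001b80003")      # illustrative values
old_checkpoint = bytes.fromhex("0000002900000fff0001")
fresh_checkpoint = bytes.fromhex("0000002a000002000001")

print(resume_action(old_checkpoint, min_lsn))
print(resume_action(fresh_checkpoint, min_lsn))
print(capture_stalled(fresh_checkpoint, fresh_checkpoint))
```

How long a flat max LSN should be tolerated before alerting is a judgment call that depends on the capture job's scan interval.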

---

## Reading from a replica (no primary access)

A common real-world constraint: you're handed a database but **only a read
replica**, never the master. Rivet reads the **log of whatever host you point
`source.url` at** — it never needs the primary *specifically*. Whether a replica
can serve that log is an engine + replica-config question, not a rivet limitation:

| engine | from a replica? | what the replica needs |
| --- | --- | --- |
| **MySQL** | ✅ yes | `log_bin = ON` **and `log_replica_updates = ON`** (`log_slave_updates` before 8.0.26) so the replica re-logs replicated changes into its *own* binlog. This is on by default in MySQL 8.0 but **off by default in 5.7 and earlier**, where a replica applies changes but does not re-log them without it. Plus the `REPLICATION SLAVE` / `REPLICATION CLIENT` grant and a `server_id` distinct from both the primary and the replica. |
| **PostgreSQL** | ⚠️ PG **16+** only | Logical decoding on a standby is a PostgreSQL 16 feature. On < 16 a logical slot can only be created on the primary — rivet's slot creation fails (PostgreSQL refuses logical decoding *"while in recovery"*). On 16+, set `hot_standby_feedback = on` on the standby (or a physical `primary_slot_name`) so the primary doesn't recycle WAL the standby's slot still needs. |
| **SQL Server** | ✅ yes (readable secondary) | CDC is enabled on the **primary**; the `cdc.*` change tables are ordinary tables in the database, so they **replicate to an Always On readable secondary**. Point rivet at the secondary — the reads are plain `SELECT`s on the replicated change tables. The capture job still runs on the primary; LSNs are consistent across the availability group. |

**MySQL caveat — the checkpoint is replica-local.** Rivet resumes by binlog
`{file, pos}` (not GTID), and a replica's binlog coordinates are its *own*, not the
primary's. A checkpoint taken against one replica does **not** transfer to another
host: if you fail over (to a different replica, or to the primary), re-snapshot
(`mode: full`) and restart CDC from a fresh checkpoint there.

So the answer to "can I read the log from a slave?" is **yes for MySQL (with
`log_replica_updates`) and SQL Server (readable secondary), and PostgreSQL 16+** —
point `source.url` at the replica; everything else (grants, `mode: cdc`, output) is
identical to running against a primary.

---

## Output shape

`--output` writes one row per change in the **typed after-image (upsert)** shape:

```
__op     __pos                              id   name    amount
insert   {"file":"binlog.000046","pos":681} 1    alice   100
update   {"file":"binlog.000046","pos":682} 1    alice   150
delete   {"file":"binlog.000046","pos":683} 2    bob     200
```

- `__op` — `insert` / `update` / `delete`.
- `__pos` — the engine resume position (the same value rivet checkpoints).
- the source columns, **typed** (resolved from the source schema), carrying the
  **after-image** for insert/update and the **key (before-image)** for delete.

Downstream applies it by primary key:

```sql
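MERGE target t USING staged s ON t.id = s.id            -- staged: one row per key (highest __pos)
WHEN MATCHED AND s.__op = 'delete' THEN DELETE
WHEN MATCHED THEN UPDATE SET name = s.name, amount = s.amount   -- overwrite all non-key columns
WHEN NOT MATCHED AND s.__op <> 'delete' THEN
  INSERT (id, name, amount) VALUES (s.id, s.name, s.amount);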
```

With a full row image, *which* columns changed is irrelevant — the latest image
per key (highest `__pos`) already contains every prior change, so dedup-by-key +
overwrite is correct. This is why `binlog_row_image = FULL` matters.
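
The dedup-by-key + overwrite rule can be checked on the sample rows above. This is a minimal in-memory sketch, assuming MySQL-style `__pos` objects and a single-column key `id`; `pos_key`, `latest_per_key`, and `apply_changes` are illustrative names, and a real load would do this in the warehouse.

```python
from typing import Dict, Iterable, Tuple


def pos_key(pos: dict) -> Tuple[int, int]:
    """Order positions by (binlog file sequence, offset): 'binlog.000046' -> 46."""
    file_seq = int(pos["file"].rsplit(".", 1)[1])
    return (file_seq, pos["pos"])


def latest_per_key(changes: Iterable[dict], key: str = "id") -> Dict[object, dict]:
    """Keep only the change with the highest __pos for each key."""
    latest: Dict[object, dict] = {}
    for row in changes:
        k = row[key]
        if k not in latest or pos_key(row["__pos"]) > pos_key(latest[k]["__pos"]):
            latest[k] = row
    return latest


def apply_changes(target: Dict[object, dict], changes: Iterable[dict], key: str = "id") -> None:
    """Upsert the latest image per key; a delete removes the key."""
    for k, row in latest_per_key(changes, key).items():
        if row["__op"] == "delete":
            target.pop(k, None)
        else:
            # Overwrite every column; metadata columns are not stored.
            target[k] = {c: v for c, v in row.items() if not c.startswith("__")}


changes = [
    {"__op": "insert", "__pos": {"file": "binlog.000046", "pos": 681}, "id": 1, "name": "alice", "amount": 100},
    {"__op": "update", "__pos": {"file": "binlog.000046", "pos": 682}, "id": 1, "name": "alice", "amount": 150},
    {"__op": "delete", "__pos": {"file": "binlog.000046", "pos": 683}, "id": 2, "name": "bob", "amount": 200},
]
target = {2: {"id": 2, "name": "bob", "amount": 200}}

apply_changes(target, changes)
apply_changes(target, changes)  # replaying the same part changes nothing
print(target)
```

Applying the same part twice leaves the target unchanged, which is what makes replayed parts harmless.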

### Downstream loading

CDC output is the **same typed Parquet** the batch export writes (same
`build_arrow_field` pipeline), so the warehouse-loading recipes apply unchanged —
the engine-specific `MERGE` and the JSON-as-`BYTES` / naive-timestamp autoload
recovery are in
[recipes/idempotent-warehouse-load.md](../recipes/idempotent-warehouse-load.md)
(BigQuery) and [recipes/snowflake-load.md](../recipes/snowflake-load.md), keyed on
the PK + `__op` above.

Verified cross-engine on a CDC part: **DuckDB** reads `json` natively, **ClickHouse**
as `String` (`JSONExtract*` parses it), **BigQuery** as `BYTES` (`PARSE_JSON` after
`SAFE_CONVERT_BYTES_TO_STRING`); integers keep their width (`INT32`/`INT64`) and
timestamps their microseconds. The JSON text round-trips losslessly in all three — the
type that auto-detects differs, the data does not.

Without `--output`, rivet emits the same information as NDJSON (one JSON object
per change) to stdout.

## Why CDC is gentle on the source

| | batch (`SELECT`) | CDC (log) |
| --- | --- | --- |
| touches | the **table** | the **log only** |
| locks / read snapshot | yes | **no** |
| buffer-pool eviction | yes (scans cold pages) | **no** |
| cost scales with | **table size** (re-scan) | **change rate** (deltas) |
| when it costs | actively, every run | latently (log retention / disk) |

The log is written anyway (WAL for durability, binlog for replication/PITR), so on
MySQL/PostgreSQL CDC mostly *reads what already exists* — near-zero incremental
OLTP cost. The one real CDC cost is **disk via log retention** if the consumer
lags (PG slots pin WAL until consumed; MySQL does not hold binlog for a reader, so
its retention must be sized to cover the lag). SQL Server is the
exception: its Agent **writes** changes into change tables (extra write volume +
storage), so CDC there trades read-contention for an ongoing write/storage
overhead.

## Failure modes & recovery

Every CDC run is **bounded and resumable**, and the durable sequence is
flush → checkpoint → ack: the resume position only advances **after** the part is
durably written. So on *any* failure — a dropped connection, a query error, a full
source disk — the run **fails loudly** (non-zero exit, with the per-engine setup
hint), the checkpoint/slot is **not advanced**, and the **next run re-reads** from
the last good position. Rivet never silently loses a change; the trade-off is
at-least-once, so a failed run's already-uploaded parts can reappear. **Dedupe
downstream by primary key, keeping the row with the highest `__pos`, then apply its
`__op`** (the output is the upsert / after-image shape).
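
The ordering guarantee can be illustrated with a toy simulation. This is a sketch of the flush-then-checkpoint idea only, not rivet's implementation: `run_once` is a hypothetical function, the "log" is a Python list, and the "checkpoint" is an integer index held in a dict.

```python
from typing import Dict, List, Optional


def run_once(
    log: List[str],
    state: Dict[str, int],
    sink: List[List[str]],
    batch_size: int,
    fail_after_flush_of: Optional[int] = None,
) -> None:
    """Drain `log` from state['pos'], advancing the checkpoint only after each flush."""
    batch_no = 0
    while state["pos"] < len(log):
        start = state["pos"]
        part = log[start:start + batch_size]
        sink.append(list(part))               # 1. flush: the part is durably written
        if fail_after_flush_of == batch_no:
            # Simulated crash between flush and checkpoint: position not advanced.
            raise RuntimeError("failed before checkpoint")
        state["pos"] = start + len(part)      # 2. checkpoint: only after the flush
        batch_no += 1


log = ["e1", "e2", "e3", "e4", "e5"]
state = {"pos": 0}
sink: List[List[str]] = []

try:
    run_once(log, state, sink, batch_size=2, fail_after_flush_of=1)
except RuntimeError:
    pass
pos_after_failure = state["pos"]

run_once(log, state, sink, batch_size=2)      # the next run re-reads from the checkpoint
delivered = [event for part in sink for event in part]
print(pos_after_failure, state["pos"], delivered)
```

The second part is delivered twice and nothing is lost, which is the at-least-once trade-off described above.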

A failed run leaves its durable parts in the destination but **no `manifest.json` /
`_SUCCESS`** — that pair marks a *clean* end, so a missing `_SUCCESS` is how you (and
`rivet validate`) tell a partial run from a complete one.
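
A consumer can apply the same test before loading a local output directory. This is a minimal sketch, assuming both marker files sit at the top level of the directory (the exact layout is an assumption here); `is_complete_run` and the part file name are hypothetical, and this does not replace `rivet validate`.

```python
import tempfile
from pathlib import Path


def is_complete_run(output_dir) -> bool:
    """True only when both clean-end markers are present (assumed top-level)."""
    d = Path(output_dir)
    return (d / "manifest.json").is_file() and (d / "_SUCCESS").is_file()


with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp)
    (out / "part-0.parquet").write_bytes(b"")   # hypothetical part name
    partial = is_complete_run(out)              # parts only: a failed or running export

    (out / "manifest.json").write_text("{}")
    (out / "_SUCCESS").write_text("")
    complete = is_complete_run(out)             # both markers: a clean end

print(partial, complete)
```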

### PostgreSQL — the slot fills / the source disk fills

A logical slot pins WAL until rivet advances it (`confirmed_flush_lsn`). The
behaviour depends on whether rivet is **running**:

- **Running + advancing** — each successful run reads the changes, writes them
  durably, then advances the slot, so PostgreSQL **releases the WAL** up to that
  point. The slot only ever holds the WAL *since the last advance* — it does not
  grow unbounded while rivet keeps the slot moving.
- **Stopped (abandoned slot)** — rivet does nothing (it isn't running); the slot
  keeps pinning WAL and the **source disk fills**. This is the number-one
  PostgreSQL CDC foot-gun, and it is *operator* responsibility:
  `SELECT pg_drop_replication_slot('rivet_slot');` when you stop capturing for good.
- **Source disk already full** — run rivet (it *reads* WAL to advance the slot,
  which **releases** WAL and relieves the pressure) or drop the slot. If PostgreSQL
  is too degraded to answer, rivet's query fails → the run fails → re-read next run.

**Bound the blast radius:** set `max_slot_wal_keep_size` (PG 13+). PostgreSQL then
**invalidates the slot** rather than fill the disk; rivet's next run fails with a
slot-invalidated error and you re-snapshot. **Monitor** `pg_replication_slots`
(`active`, and `restart_lsn` vs the current LSN = how much WAL the slot is holding).
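
The "how much WAL is the slot holding" figure is the byte distance between two LSNs. A PostgreSQL LSN prints as two hexadecimal numbers separated by a slash, the high and low 32 bits of a 64-bit position. A minimal sketch of the arithmetic, with hypothetical helper names and made-up LSN values:

```python
def parse_lsn(lsn: str) -> int:
    """Convert 'HI/LO' (hex) into a 64-bit byte position."""
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)


def retained_wal_bytes(current_lsn: str, restart_lsn: str) -> int:
    """Bytes of WAL between the slot's restart_lsn and the current LSN."""
    return parse_lsn(current_lsn) - parse_lsn(restart_lsn)


held = retained_wal_bytes("0/3000000", "0/1000000")
print(f"{held / (1024 * 1024):.0f} MiB held by the slot")
```

On the server, `pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)` computes the same difference directly in SQL.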

### MySQL — the binlog was purged

If rivet is offline long enough that the saved binlog position is **purged**
(`binlog_expire_logs_seconds` / `PURGE BINARY LOGS`), the resume read fails with
MySQL **ERROR 1236** (the requested binlog file is gone). The position is
unrecoverable — **re-snapshot** (`mode: full`) and restart CDC from a fresh
checkpoint. Size binlog retention comfortably above your CDC cadence.

### SQL Server — the checkpoint fell below retention

If the saved LSN falls **below** `sys.fn_cdc_get_min_lsn()` (the cleanup job — ~3
days by default — removed the changes after it), rivet **fails loudly** — *"the
resume position is older than the change-table retention … re-snapshot"* — rather
than resume from the new min and **silently skip the gap**. Re-snapshot and restart
from a fresh checkpoint. Also watch for a **non-advancing `sys.fn_cdc_get_max_lsn()`**:
that means the **Agent capture job stopped**, so the change tables are frozen — read
"no rows" as "the job is down", not "no changes".

### Recovery, in one line

Re-run to resume from the last checkpoint (the common case). If the run reports the
position is unrecoverable (PostgreSQL slot invalidated, MySQL binlog purged, SQL
Server retention exceeded), **re-snapshot the table with `mode: full` and restart
CDC from a new checkpoint** — the only safe recovery once the source log no longer
covers the gap.

## Limitations (current)

Typed output (real `Timestamp`/`Date32`/`Decimal128`), commit-boundary
checkpointing, cloud destinations + `manifest.json`/`_SUCCESS`, and the
config-driven `rivet run` path with a recorded run are all in place for all three
engines. What remains:

- **Continuous capture for PostgreSQL / SQL Server** is poll-once-and-exit (they
  drain their backlog and stop); a long-running daemon reconstructs the stream each
  cycle. The supported continuous model today is a scheduler running
  `--until-current` (or `rivet run` with `cdc.until_current`) on an interval, each
  run resuming from the checkpoint. MySQL streams continuously without `--until-current`.
- **Schema drift:** the sink schema is frozen at the first flush — a column added
  mid-run is not picked up until the next run re-resolves the table.
- **No lag metric:** the run records rows / files / bytes / duration / status, but
  not replication lag ("how far behind the source is") — the next observability step.
- **Pre-image completeness** depends on the source config: full UPDATE/DELETE
  before-images need `binlog_row_image=FULL` (MySQL) / `REPLICA IDENTITY FULL`
  (PostgreSQL); otherwise only key columns are carried.