source-postgres-cdc and source-rest only.Expand description
Low-level replication-connection wrapper.
This module wraps pgwire_replication to provide the slot lifecycle
(ensure_slot) and streaming helpers (start_replication, recv,
send_status_update) used by the rest of the CDC source.
§Design
pgwire_replication handles everything from TCP connect through auth,
START_REPLICATION, keepalive replies, and StandbyStatusUpdate — all
internally. The library delivers events as a typed enum; recv surfaces
the full ReplicationEvent to callers (absorbing only KeepAlive and
StoppedAt internally) so Tasks 9+ can observe transaction boundaries.
Slot creation (CREATE_REPLICATION_SLOT) is a control-plane operation that
requires an ordinary (non-replication) SQL connection, so ensure_slot
uses sqlx for that single query.
§Type aliases
The plan requires stable names Client and Duplex so that Tasks 9+ can
refer to concrete types. We define:
Client— a lightweight holder ofReplicationParamsused to verify connectivity and create the replication slot, before the stream is opened.Duplex— the live replication stream; a thin wrapper aroundpgwire_replication::ReplicationClient.
Structs§
- Client
- Pre-stream handle returned by
connectonce the connection URL has been validated. The actual connection is opened byensure_slot/start_replicationfrom the borrowedReplicationParams, so this is a lightweight marker — it deliberately holds no owned copy of the connection string, which previously sat in leaked (Box::leak) heap for the process lifetime, including the password (#78/#13). - Duplex
- Live replication stream. Wraps
pgwire_replication::ReplicationClient. Obtained fromstart_replication. - Replication
Params - All parameters required to establish a logical replication connection.
Enums§
- Replication
Event - Events emitted by the replication worker.
Constants§
- POSTGRES_
EPOCH_ MICROS - Microseconds between the Unix epoch (1970-01-01) and the Postgres epoch (2000-01-01). Used for converting between Postgres timestamps and Unix time.
Functions§
- advance_
slot - Advance the slot’s
confirmed_flush_lsntolsnvia a control-plane SQL call (pg_replication_slot_advance), before the replication stream is opened. - connect
- Validate connectivity and return a
Clienthandle. - drop_
slot - Drop a logical replication slot via a control-plane SQL call
(
pg_drop_replication_slot). A missing slot is treated as success (no-op); an active slot (currently in use by another connection) surfaces an error. - ensure_
slot - Ensure the replication slot exists.
- is_
slot_ active_ error - Returns
trueiferris Postgres reporting that the replication slot is still active — held by a backend that has not yet released it. Postgres raises “replication slot "…" is active for PID …” (SQLSTATE55006). - postgres_
clock_ now - Current time as a Postgres-epoch timestamp (µs since 2000-01-01 UTC).
- postgres_
clock_ to_ unix_ ms - Convert a Postgres-epoch timestamp (µs since 2000-01-01) to Unix milliseconds (ms since 1970-01-01).
- recv
- Receive the next meaningful replication event from the server.
- retry_
on_ slot_ active - Run
op, retrying up tomax_retriestimes with exponential backoff while it fails because the replication slot is still active (#146 M12). Any other error — and the final attempt’s error after exhausting retries — is returned immediately.max_retries = 0preserves the previous fail-fast behaviour. - send_
status_ update - Report progress to the server (Standby Status Update).
- start_
replication - Open a logical replication stream and return a
Duplexhandle.