Module replication

Available on crate features source-postgres-cdc and source-rest only.

Low-level replication-connection wrapper.

This module wraps pgwire_replication to provide the slot lifecycle (ensure_slot) and streaming helpers (start_replication, recv, send_status_update) used by the rest of the CDC source.

§Design

pgwire_replication handles the whole connection lifecycle internally: TCP connect, authentication, START_REPLICATION, keepalive replies, and StandbyStatusUpdate messages. The library delivers events as a typed enum. recv surfaces the full ReplicationEvent to callers, absorbing only KeepAlive and StoppedAt itself, so that Tasks 9+ can observe transaction boundaries.
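
The filtering described for recv can be illustrated with a minimal, synchronous sketch. Everything here is an assumption made for illustration: the Event enum and its variants are hypothetical stand-ins for ReplicationEvent, the VecDeque stands in for the live stream, and treating StoppedAt as end-of-stream is a guess at what "absorbing" means, not the module's confirmed behaviour.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the crate's event enum; real variants may differ.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Event {
    KeepAlive,
    Begin { xid: u32 },
    XLogData { lsn: u64, payload: Vec<u8> },
    Commit { end_lsn: u64 },
    StoppedAt { lsn: u64 },
}

/// Pop events until one is meaningful to the caller.
/// KeepAlive is skipped silently; StoppedAt is treated as end-of-stream
/// (an assumption). Every other event, including transaction boundaries,
/// is handed to the caller unchanged.
fn recv_meaningful(source: &mut VecDeque<Event>) -> Option<Event> {
    while let Some(event) = source.pop_front() {
        match event {
            Event::KeepAlive => continue,
            Event::StoppedAt { .. } => return None,
            other => return Some(other),
        }
    }
    None
}
```

Because Begin and Commit pass through untouched, a caller looping on such a function sees each transaction boundary in order and never has to handle keepalive traffic.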

Slot creation (CREATE_REPLICATION_SLOT) is a control-plane operation that requires an ordinary (non-replication) SQL connection, so ensure_slot uses sqlx for that single query.

§Type aliases

The plan requires stable names Client and Duplex so that Tasks 9+ can refer to concrete types. We define:

  • Client — a lightweight holder of ReplicationParams used to verify connectivity and create the replication slot, before the stream is opened.
  • Duplex — the live replication stream; a thin wrapper around pgwire_replication::ReplicationClient.

Structs§

Client
Pre-stream handle returned by connect once the connection URL has been validated. The actual connection is opened by ensure_slot / start_replication from the borrowed ReplicationParams, so this is a lightweight marker. It deliberately holds no owned copy of the connection string: an earlier version kept that string, password included, in leaked (Box::leak) heap memory for the process lifetime (#78/#13).
Duplex
Live replication stream. Wraps pgwire_replication::ReplicationClient. Obtained from start_replication.
ReplicationParams
All parameters required to establish a logical replication connection.

Enums§

ReplicationEvent
Events emitted by the replication worker.

Constants§

POSTGRES_EPOCH_MICROS
Microseconds between the Unix epoch (1970-01-01) and the Postgres epoch (2000-01-01). Used for converting between Postgres timestamps and Unix time.
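
As a worked example of the epoch arithmetic: 1970-01-01 and 2000-01-01 are 10,957 days apart (30 years including 7 leap days), which is 946,684,800 seconds. The sketch below is illustrative only. The helper names, the i64 representation, and the rounding choice are assumptions, not the crate's actual signatures.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// 10_957 days * 86_400 s/day = 946_684_800 s, expressed in microseconds.
const POSTGRES_EPOCH_MICROS: i64 = 946_684_800_000_000;

/// Postgres-epoch microseconds -> Unix milliseconds.
/// div_euclid rounds toward negative infinity, so pre-1970 values
/// do not round toward zero (a design choice of this sketch).
fn pg_micros_to_unix_ms(pg_micros: i64) -> i64 {
    (pg_micros + POSTGRES_EPOCH_MICROS).div_euclid(1_000)
}

/// Unix microseconds -> Postgres-epoch microseconds.
fn unix_micros_to_pg_micros(unix_micros: i64) -> i64 {
    unix_micros - POSTGRES_EPOCH_MICROS
}

/// Current wall-clock time as Postgres-epoch microseconds.
fn pg_now_micros() -> i64 {
    let since_unix = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before 1970");
    unix_micros_to_pg_micros(since_unix.as_micros() as i64)
}
```

A Postgres timestamp of 0 therefore maps to Unix millisecond 946,684,800,000, which is midnight UTC on 2000-01-01.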

Functions§

advance_slot
Advance the slot’s confirmed_flush_lsn to lsn via a control-plane SQL call (pg_replication_slot_advance), before the replication stream is opened.
connect
Validate connectivity and return a Client handle.
drop_slot
Drop a logical replication slot via a control-plane SQL call (pg_drop_replication_slot). A missing slot is treated as success (no-op); an active slot (currently in use by another connection) surfaces an error.
ensure_slot
Ensure the replication slot exists.
is_slot_active_error
Returns true if err is Postgres reporting that the replication slot is still active — held by a backend that has not yet released it. Postgres raises “replication slot "…" is active for PID …” (SQLSTATE 55006).
postgres_clock_now
Current time as a Postgres-epoch timestamp (µs since 2000-01-01 UTC).
postgres_clock_to_unix_ms
Convert a Postgres-epoch timestamp (µs since 2000-01-01) to Unix milliseconds (ms since 1970-01-01).
recv
Receive the next meaningful replication event from the server.
retry_on_slot_active
Run op, retrying up to max_retries times with exponential backoff while it fails because the replication slot is still active (#146 M12). Any other error — and the final attempt’s error after exhausting retries — is returned immediately. max_retries = 0 preserves the previous fail-fast behaviour.
send_status_update
Report progress to the server (Standby Status Update).
start_replication
Open a logical replication stream and return a Duplex handle.
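
The retry policy described for retry_on_slot_active and is_slot_active_error can be sketched in a few lines. This is a synchronous illustration under stated assumptions: DbError, is_slot_active, retry_while_slot_active, and the base_delay parameter are hypothetical names, and the real functions may be async and may inspect the database error differently. SQLSTATE 55006 is Postgres's object_in_use code, so the sketch also checks the message text to narrow the match to the slot case.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical error type; real code would inspect the driver's database error.
#[derive(Debug, Clone, PartialEq)]
struct DbError {
    sqlstate: String,
    message: String,
}

/// True when the error looks like "replication slot ... is active for PID ...".
fn is_slot_active(err: &DbError) -> bool {
    err.sqlstate == "55006" && err.message.contains("is active for PID")
}

/// Run `op`, retrying up to `max_retries` times while the slot is still active.
/// The delay doubles on each retry: base, 2 * base, 4 * base, and so on.
/// Any other error, or the last slot-active error, is returned without sleeping.
fn retry_while_slot_active<T>(
    max_retries: u32,
    base_delay: Duration,
    mut op: impl FnMut() -> Result<T, DbError>,
) -> Result<T, DbError> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(err) if is_slot_active(&err) && attempt < max_retries => {
                sleep(base_delay.saturating_mul(2u32.saturating_pow(attempt)));
                attempt += 1;
            }
            Err(err) => return Err(err),
        }
    }
}
```

With max_retries = 0 the guard never matches, so op runs exactly once and its error is returned directly, which corresponds to the fail-fast behaviour mentioned above.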