pub struct PostgresCdcSourceConfig {
pub connection_url: String,
pub slot_name: String,
pub publication_name: String,
pub create_slot_if_missing: bool,
pub slot_type: SlotType,
pub tls: CdcTls,
pub start_lsn: Option<String>,
pub proto_version: u32,
pub idle_timeout: Duration,
pub max_messages: Option<usize>,
pub max_staged_records: Option<usize>,
pub status_update_interval: Duration,
pub tcp_keepalive: Duration,
pub batch_size: usize,
pub slot_acquire_retries: u32,
}
Available on crate features source-postgres-cdc and source-rest only.
Configuration for PostgresCdcSource.
Fields
connection_url: String
Connection URL pointing at the database whose WAL we want to read.
The crate internally upgrades the connection to replication=database,
so callers do not need to add it themselves.
slot_name: String
Logical replication slot name. Must follow the Postgres naming rules: 1–63 characters, using only lowercase letters, digits, and underscores.
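The naming rule above can be checked before a connection is ever opened. The following is a minimal sketch, assuming the rule is exactly as stated (1 to 63 characters, lowercase ASCII letters, digits, underscores); `is_valid_slot_name` is a hypothetical helper and is not claimed to be what `validate` does internally.

```rust
/// Returns true when `name` satisfies the rule quoted above:
/// 1 to 63 characters, each a lowercase ASCII letter, digit, or underscore.
/// Hypothetical helper for illustration; not part of the crate's API.
pub fn is_valid_slot_name(name: &str) -> bool {
    // The allowed characters are all single-byte ASCII, so the byte length
    // equals the character count for any name that passes the second check.
    let len_ok = (1..=63).contains(&name.len());
    let chars_ok = name
        .bytes()
        .all(|b| b.is_ascii_lowercase() || b.is_ascii_digit() || b == b'_');
    len_ok && chars_ok
}
```

Under this sketch a name such as `faucet_orders_1` passes, while `Faucet`, `my-slot`, and the empty string are rejected.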
publication_name: String
Publication name on the server. The publication must already exist: faucet does not create publications, because they are a DBA-level concern that determines which tables are replicated.
create_slot_if_missing: bool
If the slot does not exist, create it as a logical/pgoutput slot
at connection time. Default: true.
slot_type: SlotType
Whether a newly created slot is permanent (survives disconnect) or
temporary (auto-dropped when the replication connection closes).
Default: permanent (backward compatible). A permanent slot pins WAL on
the server until it is consumed or dropped; an abandoned permanent
slot fills pg_wal and can take the whole instance down. Use
temporary for ephemeral or test runs. Note that a temporary slot resets
on reconnect, so bookmark-based resume across runs requires a permanent
slot. Drop an unused permanent slot explicitly with
PostgresCdcSource::drop_slot.
tls: CdcTls
TLS settings for the replication connection. Default: disable
(plaintext) for backward compatibility. With that default, credentials
and all WAL data travel unencrypted, so set require, verify_ca, or
verify_full in production.
start_lsn: Option<String>
Optional starting LSN override (e.g. "0/16A4F88"). Ignored when a
state-store-managed bookmark is present (that bookmark wins).
When neither is set, replication starts from the slot’s
confirmed_flush_lsn.
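To make the precedence above concrete, here is a small sketch. It assumes the usual Postgres text form of an LSN (two hexadecimal numbers separated by `/`, the high and low 32 bits of the WAL position). `parse_lsn`, `StartPoint`, and `resolve_start` are hypothetical names for illustration and are not part of the crate.

```rust
/// Parses a textual LSN such as "0/16A4F88" into a 64-bit WAL position.
/// The text form is `<high 32 bits in hex>/<low 32 bits in hex>`.
pub fn parse_lsn(text: &str) -> Option<u64> {
    let (hi, lo) = text.split_once('/')?;
    let hi = u32::from_str_radix(hi, 16).ok()?;
    let lo = u32::from_str_radix(lo, 16).ok()?;
    Some((u64::from(hi) << 32) | u64::from(lo))
}

/// Where replication starts (illustrative type, not the crate's own).
#[derive(Debug, PartialEq, Eq)]
pub enum StartPoint {
    /// A state-store-managed bookmark exists; it always wins.
    Bookmark(u64),
    /// No bookmark, but `start_lsn` was configured.
    ConfiguredLsn(u64),
    /// Neither is set: fall back to the slot's confirmed_flush_lsn.
    SlotConfirmedFlush,
}

/// Applies the precedence: bookmark, then start_lsn, then the slot default.
/// Returns None only when `start_lsn` is present but malformed.
pub fn resolve_start(bookmark: Option<u64>, start_lsn: Option<&str>) -> Option<StartPoint> {
    if let Some(lsn) = bookmark {
        return Some(StartPoint::Bookmark(lsn));
    }
    match start_lsn {
        Some(text) => parse_lsn(text).map(StartPoint::ConfiguredLsn),
        None => Some(StartPoint::SlotConfirmedFlush),
    }
}
```

In this sketch a configured `start_lsn` only takes effect on a run with no stored bookmark, which matches the rule that the bookmark wins.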
proto_version: u32
pgoutput protocol version. Only 1 is fully exercised in v1; 2 is
accepted, but streaming-transaction messages (S/E/c/A) are not yet
decoded. Default: 1.
idle_timeout: Duration
Maximum time to wait for new replication messages before returning the current batch. Default: 30 s.
max_messages: Option<usize>
Optional cap on the number of change events drained per fetch call.
Acts as a safety bound; idle_timeout is the primary terminator.
Note: the cap is checked after each COMMIT, never mid-transaction.
A single transaction larger than max_messages is still emitted
atomically: the fetch returns only after that transaction’s COMMIT
and may produce more records than max_messages.
To bound the memory a single in-progress transaction can consume,
use max_staged_records instead.
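The effect of checking the cap only at COMMIT can be seen in a toy model. This is a sketch under stated assumptions: `Msg` is a hypothetical stand-in for decoded replication messages and `drain` is not the crate's fetch loop, only an illustration of the rule described above.

```rust
/// Simplified replication message (hypothetical stand-in for pgoutput).
#[derive(Debug, Clone, PartialEq)]
pub enum Msg {
    Begin,
    Change(u32),
    Commit,
}

/// Drains `stream`, checking `max_messages` only after each COMMIT.
/// The result may therefore contain more records than the cap.
pub fn drain(stream: &[Msg], max_messages: Option<usize>) -> Vec<u32> {
    let mut emitted = Vec::new();
    let mut staged = Vec::new();
    for msg in stream {
        match msg {
            Msg::Begin => staged.clear(),
            Msg::Change(id) => staged.push(*id),
            Msg::Commit => {
                // The whole transaction is released at once, never partially.
                emitted.append(&mut staged);
                if let Some(cap) = max_messages {
                    if emitted.len() >= cap {
                        break;
                    }
                }
            }
        }
    }
    // Changes of a transaction that never committed stay in `staged`
    // and are not emitted.
    emitted
}
```

With a cap of 2 and a first transaction of three changes, this model returns all three changes and stops before the next transaction.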
max_staged_records: Option<usize>
Maximum number of change records buffered in memory for a single in-progress transaction before it is aborted.
Logical replication requires a transaction to be buffered until its
COMMIT so it can be emitted atomically (partial transactions must
never leak downstream). A single bulk UPDATE/DELETE/COPY of
millions of rows therefore buffers every decoded row as a
serde_json::Value in RAM, which can OOM the process. This bound is a
safety valve: when an in-progress transaction’s staged record count
exceeds it, the source aborts with a typed FaucetError::Source
rather than being OOM-killed.
None (the default) means unbounded: arbitrarily large transactions
are delivered atomically at the cost of unbounded memory. Set a value
sized to your available memory if you replicate tables subject to
large bulk writes.
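The safety valve described above amounts to a bounded staging buffer. The sketch below is illustrative only: `Staging` and `StagedLimitExceeded` are hypothetical types, records are plain `String`s rather than `serde_json::Value`, and the real source reports the condition as `FaucetError::Source`.

```rust
/// Error returned when a transaction outgrows the bound (illustrative).
#[derive(Debug, PartialEq)]
pub struct StagedLimitExceeded {
    pub limit: usize,
}

/// Buffers the records of one in-progress transaction.
pub struct Staging {
    records: Vec<String>,
    limit: Option<usize>,
}

impl Staging {
    /// `limit = None` mirrors the unbounded default.
    pub fn new(limit: Option<usize>) -> Self {
        Staging { records: Vec::new(), limit }
    }

    /// Stages one record; fails once the staged count exceeds the limit.
    pub fn stage(&mut self, record: String) -> Result<(), StagedLimitExceeded> {
        self.records.push(record);
        match self.limit {
            Some(limit) if self.records.len() > limit => {
                // Abort: drop the partial transaction so nothing leaks downstream.
                self.records.clear();
                Err(StagedLimitExceeded { limit })
            }
            _ => Ok(()),
        }
    }

    /// Called at COMMIT: hands over the whole transaction atomically.
    pub fn commit(&mut self) -> Vec<String> {
        std::mem::take(&mut self.records)
    }
}
```

The design choice here is to fail loudly with a typed error instead of silently truncating, since emitting part of a transaction would break atomic delivery.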
status_update_interval: Duration
Interval at which Standby Status Update keepalives are sent to the
server. Must be shorter than idle_timeout and well under the
server’s wal_sender_timeout (default 60 s). Default: 10 s.
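The two timing constraints above can be expressed as a small pre-flight check. This is a sketch, not the crate's `validate`: `check_timing` is a hypothetical function, and reading "well under" as "at most half of wal_sender_timeout" is an assumption made here purely for illustration.

```rust
use std::time::Duration;

/// Checks the documented timing relationships (illustrative helper).
/// ASSUMPTION: "well under wal_sender_timeout" is read as "at most half".
pub fn check_timing(
    status_update_interval: Duration,
    idle_timeout: Duration,
    wal_sender_timeout: Duration,
) -> Result<(), String> {
    if status_update_interval >= idle_timeout {
        return Err(format!(
            "status_update_interval ({status_update_interval:?}) must be shorter than idle_timeout ({idle_timeout:?})"
        ));
    }
    if status_update_interval > wal_sender_timeout / 2 {
        return Err(format!(
            "status_update_interval ({status_update_interval:?}) is too close to wal_sender_timeout ({wal_sender_timeout:?})"
        ));
    }
    Ok(())
}
```

The documented defaults (10 s interval, 30 s idle timeout, 60 s server timeout) pass this check.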
tcp_keepalive: Duration
TCP keepalive for the replication connection. Default: 60 s.
batch_size: usize
Advisory page size for Source::stream_pages. The CDC
source emits one StreamPage per committed transaction so the
pipeline gets per-transaction durability via its per-page bookmark
persist. Because transactions are atomic units, they are never split
across pages: a single transaction whose record count exceeds
batch_size is still emitted as one page. Defaults to
DEFAULT_BATCH_SIZE.
batch_size = 0 is the “no batching” sentinel: every committed
transaction during the run window is accumulated into a single page
that is emitted at the end with bookmark = max(commit_lsn). This
negates per-transaction durability and is only useful for tests or
initial-snapshot style runs.
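The two paging modes described above can be modelled in a few lines. This is a sketch under stated assumptions: `Txn`, `Page`, and `paginate` are hypothetical stand-ins (the real type is `StreamPage`), and records are reduced to integer ids.

```rust
/// One committed transaction (illustrative model).
pub struct Txn {
    pub commit_lsn: u64,
    pub records: Vec<u32>,
}

/// One emitted page with the bookmark persisted alongside it.
#[derive(Debug, PartialEq)]
pub struct Page {
    pub records: Vec<u32>,
    pub bookmark: u64,
}

/// Models the documented behavior: one page per transaction, or a single
/// trailing page when `batch_size` is the 0 sentinel.
pub fn paginate(txns: Vec<Txn>, batch_size: usize) -> Vec<Page> {
    if batch_size != 0 {
        // Transactions are never split, whatever their size.
        return txns
            .into_iter()
            .map(|t| Page { records: t.records, bookmark: t.commit_lsn })
            .collect();
    }
    if txns.is_empty() {
        return Vec::new();
    }
    // "No batching": everything lands in one page, bookmark = max(commit_lsn).
    let bookmark = txns.iter().map(|t| t.commit_lsn).max().unwrap_or(0);
    let records = txns.into_iter().flat_map(|t| t.records).collect();
    vec![Page { records, bookmark }]
}
```

In the per-transaction mode a crash between pages loses at most the in-flight transaction, whereas the single trailing page only advances the bookmark once, at the end of the run.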
slot_acquire_retries: u32
Number of times to retry acquiring the replication slot when the server
reports it is still active (held by a not-yet-released prior
connection). On a rapid restart, where a scheduler or serve re-runs the
pipeline before the previous backend has dropped the slot, both the
pre-stream pg_replication_slot_advance and START_REPLICATION fail
with “replication slot … is active for PID …”. Each retry waits an
exponentially increasing backoff (250 ms, doubling, capped at 4 s).
0 disables retries (fail fast). Defaults to 10.
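The backoff schedule above (250 ms, doubling, capped at 4 s) can be written down directly. This is a minimal sketch: `backoff_delay` and `total_wait` are hypothetical helpers, and it assumes the first retry waits the 250 ms base delay.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): 250 ms doubled per
/// attempt and capped at 4 s. Illustrative helper, not the crate's code.
pub fn backoff_delay(attempt: u32) -> Duration {
    const BASE_MS: u64 = 250;
    const CAP_MS: u64 = 4_000;
    // 2^attempt, saturating instead of overflowing for very large attempts.
    let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
    Duration::from_millis(BASE_MS.saturating_mul(factor).min(CAP_MS))
}

/// Worst-case total time spent waiting across `retries` attempts.
pub fn total_wait(retries: u32) -> Duration {
    (0..retries).map(backoff_delay).sum()
}
```

Under these assumptions the delays run 250 ms, 500 ms, 1 s, 2 s, then 4 s for every later attempt, so the default of 10 retries waits at most 27.75 s in total before giving up.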
Implementations
impl PostgresCdcSourceConfig
pub fn with_batch_size(self, batch_size: usize) -> PostgresCdcSourceConfig
Override the advisory per-page record count emitted by
Source::stream_pages.
Pass 0 to disable per-transaction emission: every transaction in
the run window will be accumulated into a single trailing page with
bookmark = max(commit_lsn). Transactions are never split regardless
of batch_size.
pub fn validate(&self) -> Result<(), FaucetError>
Validate fail-fast invariants. Called from PostgresCdcSource::new.
Trait Implementations
impl Clone for PostgresCdcSourceConfig
fn clone(&self) -> PostgresCdcSourceConfig
fn clone_from(&mut self, source: &Self)
impl Debug for PostgresCdcSourceConfig
impl<'de> Deserialize<'de> for PostgresCdcSourceConfig
fn deserialize<__D>(
__deserializer: __D,
) -> Result<PostgresCdcSourceConfig, <__D as Deserializer<'de>>::Error>
where
__D: Deserializer<'de>,
impl JsonSchema for PostgresCdcSourceConfig
fn schema_id() -> Cow<'static, str>
fn json_schema(generator: &mut SchemaGenerator) -> Schema
fn inline_schema() -> bool
impl Serialize for PostgresCdcSourceConfig
fn serialize<__S>(
&self,
__serializer: __S,
) -> Result<<__S as Serializer>::Ok, <__S as Serializer>::Error>
where
__S: Serializer,
Auto Trait Implementations
impl Freeze for PostgresCdcSourceConfig
impl RefUnwindSafe for PostgresCdcSourceConfig
impl Send for PostgresCdcSourceConfig
impl Sync for PostgresCdcSourceConfig
impl Unpin for PostgresCdcSourceConfig
impl UnsafeUnpin for PostgresCdcSourceConfig
impl UnwindSafe for PostgresCdcSourceConfig