pub struct PostgresCdcSource { /* private fields */ }
Implementations
impl PostgresCdcSource
pub async fn new(config: PostgresCdcSourceConfig) -> Result<Self, FaucetError>
pub async fn drop_slot(&self) -> Result<(), FaucetError>
Drop this source’s replication slot on the server, freeing the WAL it
pins. A no-op if the slot doesn’t exist; errors if the slot is still
active (in use by a live replication connection). Call this when
decommissioning a permanent slot so it doesn’t leak WAL (#78/#12).
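The three outcomes described above (absent slot, active slot, idle slot) can be sketched with a toy in-memory model. This is only an illustration of the documented contract, not the crate's implementation: `ToyServer`, `create_slot`, and `has_slot` are hypothetical names, and errors are plain strings rather than `FaucetError`.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for a server's slot catalog.
/// Maps slot name -> whether a live replication connection holds it.
#[derive(Default)]
pub struct ToyServer {
    slots: HashMap<String, bool>,
}

impl ToyServer {
    /// Register a slot; `active` marks it as in use by a live connection.
    pub fn create_slot(&mut self, name: &str, active: bool) {
        self.slots.insert(name.to_string(), active);
    }

    pub fn has_slot(&self, name: &str) -> bool {
        self.slots.contains_key(name)
    }

    /// Mirrors the documented contract: no-op when the slot is absent,
    /// error when it is active, removal when it is idle.
    pub fn drop_slot(&mut self, name: &str) -> Result<(), String> {
        match self.slots.get(name).copied() {
            None => Ok(()),
            Some(true) => Err(format!("replication slot \"{name}\" is active")),
            Some(false) => {
                self.slots.remove(name);
                Ok(())
            }
        }
    }
}
```

In this model an active slot survives a failed drop, so a caller would first stop the replication connection and then retry.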
Trait Implementations
impl Source for PostgresCdcSource
fn fetch_with_context_incremental<'life0, 'life1, 'async_trait>(
&'life0 self,
ctx: &'life1 HashMap<String, Value>,
) -> Pin<Box<dyn Future<Output = Result<(Vec<Value>, Option<Value>), FaucetError>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Drain the replication stream into a single Vec<Value> plus the
bookmark of the most recent COMMIT.
Implemented by collecting Source::stream_pages with the
batch_size = 0 sentinel — that sentinel coalesces every transaction
in the run window into a single trailing page, which exactly matches
the historical fetch_with_context_incremental contract (one
aggregated buffer, one max-LSN bookmark). The streaming pipeline
(Pipeline::run / run_stream) drives stream_pages directly with
the per-source batch_size config field instead so it gets
per-transaction durability.
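The "collect the page stream into one buffer plus one bookmark" step can be sketched as a simple fold. This is a minimal illustration under stated assumptions: `Page` is a hypothetical stand-in for `StreamPage` (its field names are guesses), records are strings rather than `Value`, LSNs are plain `u64`, and a synchronous iterator replaces the async stream.

```rust
/// Hypothetical stand-in for the crate's `StreamPage`; field names are assumptions.
#[derive(Debug, Clone, PartialEq)]
pub struct Page {
    pub records: Vec<String>,
    pub bookmark: Option<u64>,
}

/// Fold a sequence of pages into one record buffer and the last bookmark seen.
/// The first page error aborts the fold and is returned to the caller.
pub fn collect_pages(
    pages: impl IntoIterator<Item = Result<Page, String>>,
) -> Result<(Vec<String>, Option<u64>), String> {
    let mut records = Vec::new();
    let mut bookmark = None;
    for page in pages {
        let page = page?;
        records.extend(page.records);
        // Keep the most recent bookmark; pages without one leave it unchanged.
        if page.bookmark.is_some() {
            bookmark = page.bookmark;
        }
    }
    Ok((records, bookmark))
}
```

With the batch_size = 0 sentinel the stream is expected to yield a single trailing page, so the fold degenerates to unwrapping that page; the loop form simply keeps the sketch correct for any number of pages.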
fn stream_pages<'a>(
&'a self,
ctx: &'a HashMap<String, Value>,
_batch_size: usize,
) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>>
Per-transaction streaming.
Each committed transaction is emitted as its own
StreamPage with bookmark = Some(commit_lsn). Because the
pipeline flushes the sink and persists the bookmark on every page
that carries one, a mid-stream crash recovers from the last fully
committed transaction with no partial-transaction leakage.
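The flush-and-persist rule in the paragraph above can be illustrated with a toy pipeline. Everything here is an assumption made for the sketch: `Page` stands in for `StreamPage`, `ToyPipeline` is hypothetical, `sink` and `saved_bookmark` model durable storage, and `buffered` models in-memory state that a crash would lose.

```rust
/// Hypothetical stand-in for `StreamPage`; field names are assumptions.
pub struct Page {
    pub records: Vec<String>,
    pub bookmark: Option<u64>,
}

/// Toy pipeline: `buffered` lives in memory, while `sink` and
/// `saved_bookmark` model state that survives a crash.
#[derive(Default)]
pub struct ToyPipeline {
    pub buffered: Vec<String>,
    pub sink: Vec<String>,
    pub saved_bookmark: Option<u64>,
}

impl ToyPipeline {
    /// Buffer the page's records; if the page carries a bookmark,
    /// flush the buffer to the sink and persist the bookmark together.
    pub fn handle_page(&mut self, page: Page) {
        self.buffered.extend(page.records);
        if let Some(lsn) = page.bookmark {
            self.sink.append(&mut self.buffered);
            self.saved_bookmark = Some(lsn);
        }
    }

    /// Simulate a crash: only the in-memory buffer is lost.
    pub fn crash(&mut self) {
        self.buffered.clear();
    }
}
```

After a crash in this model, the sink holds only records from pages whose bookmark was persisted, and a restart would resume from `saved_bookmark`.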
Atomic transactions. A transaction is never split across pages.
If a single transaction’s record count exceeds
PostgresCdcSourceConfig::batch_size it is still emitted as one
page; batch_size is advisory.
batch_size = 0 is the “no batching” sentinel: every committed
transaction during the run window is accumulated into a single
trailing page with bookmark = max(commit_lsn). This negates
per-transaction durability and is only useful for tests / initial
snapshot runs.
The trait-level batch_size argument is intentionally ignored in
favour of the config field (matches the convention used by the
query-mode postgres source and the rest source).
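The paging rules above can be sketched as a pure function over already-decoded transactions. This is an illustration under stated assumptions, not the crate's code: `Txn` and `Page` are hypothetical types, LSNs are plain `u64`, any non-zero batch_size is treated identically (one page per transaction), and an empty run window is assumed to produce no page at all.

```rust
/// Hypothetical decoded transaction: its commit LSN and its change records.
#[derive(Debug, Clone, PartialEq)]
pub struct Txn {
    pub commit_lsn: u64,
    pub records: Vec<String>,
}

/// Hypothetical stand-in for `StreamPage`; field names are assumptions.
#[derive(Debug, Clone, PartialEq)]
pub struct Page {
    pub records: Vec<String>,
    pub bookmark: Option<u64>,
}

/// Turn committed transactions into pages.
/// - batch_size == 0: one trailing page with bookmark = max(commit_lsn).
/// - otherwise: one page per transaction, never split, even when the
///   transaction holds more records than batch_size.
pub fn paginate(txns: Vec<Txn>, batch_size: usize) -> Vec<Page> {
    if batch_size == 0 {
        // Assumption: an empty run window yields no page.
        if txns.is_empty() {
            return Vec::new();
        }
        let bookmark = txns.iter().map(|t| t.commit_lsn).max();
        let records = txns.into_iter().flat_map(|t| t.records).collect();
        return vec![Page { records, bookmark }];
    }
    txns.into_iter()
        .map(|t| Page {
            records: t.records,
            bookmark: Some(t.commit_lsn),
        })
        .collect()
}
```

Keeping each transaction whole means a page boundary always coincides with a COMMIT, which is what lets the bookmark on every page be a safe restart point.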
fn check<'life0, 'life1, 'async_trait>(
&'life0 self,
ctx: &'life1 CheckContext,
) -> Pin<Box<dyn Future<Output = Result<CheckReport, FaucetError>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Preflight probe that does not start replication.
The default Source::check would call stream_pages, which opens the
replication stream and consumes/holds WAL (a side effect that pins server
resources) — unacceptable as a preflight. Instead we open a normal
(non-replication) SQL connection — the same connection style
ensure_slot uses — and inspect the slot catalog without touching the
replication protocol:
- connection fails → auth probe Fail (could not connect / authenticate),
- connected but the slot row is absent → slot probe Skip (faucet can create it on the first run),
- slot present → slot probe Pass.
The whole call is bounded by ctx.timeout.
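The three-way mapping above can be sketched as a pure classification function. This is a minimal sketch under assumptions: `Probe` and `ProbeStatus` are hypothetical types (the real `CheckReport` shape is not shown here), the probe names "auth" and "slot" are read off the list above, and the connection attempt plus catalog lookup are abstracted into a single `Result<bool, String>`.

```rust
/// Hypothetical probe outcome; the crate's real report types may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProbeStatus {
    Pass,
    Skip,
    Fail,
}

/// Hypothetical single entry of a preflight report.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Probe {
    pub name: &'static str,
    pub status: ProbeStatus,
    pub detail: String,
}

/// Classify the result of the preflight.
/// `outcome` is `Err(msg)` when the plain SQL connection failed, and
/// `Ok(slot_exists)` when it succeeded and the slot catalog was inspected
/// (for example via the `pg_replication_slots` view; the exact query the
/// crate runs is an assumption).
pub fn classify(outcome: Result<bool, String>) -> Probe {
    match outcome {
        Err(e) => Probe {
            name: "auth",
            status: ProbeStatus::Fail,
            detail: format!("could not connect / authenticate: {e}"),
        },
        Ok(false) => Probe {
            name: "slot",
            status: ProbeStatus::Skip,
            detail: "slot absent; it can be created on the first run".to_string(),
        },
        Ok(true) => Probe {
            name: "slot",
            status: ProbeStatus::Pass,
            detail: "slot present".to_string(),
        },
    }
}
```

Separating classification from I/O keeps the mapping easy to test; the ctx.timeout bound would wrap the connection and lookup step, which this sketch leaves out.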
fn fetch_with_context<'life0, 'life1, 'async_trait>(
&'life0 self,
ctx: &'life1 HashMap<String, Value>,
) -> Pin<Box<dyn Future<Output = Result<Vec<Value>, FaucetError>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn config_schema(&self) -> Value
fn state_key(&self) -> Option<String>
StateStore.
fn apply_start_bookmark<'life0, 'async_trait>(
&'life0 self,
bookmark: Value,
) -> Pin<Box<dyn Future<Output = Result<(), FaucetError>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
StateStore as this run’s starting point.
fn connector_name(&self) -> &'static str
connector label on metrics and the
connector attribute on spans. Defaults to the final segment of
std::any::type_name::<Self>(), e.g. "RestSource". Built-in
connectors override with a short, friendly snake_case name (e.g.
"rest"). Must return a non-empty string; observability decorators
fall back to "unknown" in release builds if it is empty (and
debug_assert! in debug builds).
Auto Trait Implementations
impl !Freeze for PostgresCdcSource
impl !RefUnwindSafe for PostgresCdcSource
impl Send for PostgresCdcSource
impl Sync for PostgresCdcSource
impl Unpin for PostgresCdcSource
impl UnsafeUnpin for PostgresCdcSource
impl UnwindSafe for PostgresCdcSource
Blanket Implementations
impl<T> BorrowMut<T> for T
where
T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise.