sources_core/cdc/capture.rs
1use async_trait::async_trait;
2use futures::stream::{self, BoxStream};
3
4use crate::{Result, SnapshotTable};
5
6use super::Change;
7
8/// A pluggable change-capture mechanism — logical replication (WAL) today,
9/// polling or trigger-based capture later.
10///
11/// The mechanism exposes two independent capabilities; the engine decides when
12/// to use each:
13///
14/// - [`live`](Self::live) streams ongoing changes, resuming from the
15/// mechanism's own durable position (a replication slot's
16/// `confirmed_flush_lsn`, a poll cursor, …). No position is threaded through
17/// this API — resume state is the mechanism's to own.
18/// - [`snapshot`](Self::snapshot) reads the *current* rows of a set of tables as
19/// a finite stream — the data an initial backfill needs. Whether a backfill
20/// is *needed* is not the mechanism's call: the engine asks the **sink**
21/// whether a target is already seeded and only then requests a snapshot. A
22/// mechanism that cannot snapshot keeps the default (an empty stream).
23///
24/// Each emitted [`Change`] carries an [`Ack`](super::Ack); for `live`, the
25/// mechanism only advances its durable resume point once changes are confirmed,
26/// which makes delivery at-least-once across restarts. Snapshot changes are not
27/// resumable (a crashed backfill simply re-runs, idempotently), so their acks
28/// need not move any cursor.
29///
30/// Returned streams are `'static` and `Send`: an implementation moves whatever
31/// it needs (its connection, its [`AckSink`](super::AckSink)) into the stream
32/// rather than borrowing from `self`.
33#[async_trait]
34pub trait ChangeCapture: std::fmt::Debug + Send + Sync {
35 /// Connect, ensure setup, resume from the last confirmed point, and stream
36 /// live changes.
37 async fn live(&self) -> Result<BoxStream<'static, Result<Change>>>;
38
39 /// Snapshot the current rows of `tables` as a finite stream of
40 /// [`Upsert`](super::ChangeEvent::Upsert) changes — the rows to seed an
41 /// index with. The stream ends when the snapshot is complete; there is no
42 /// in-band boundary marker.
43 ///
44 /// The default is an empty stream, for mechanisms that cannot snapshot.
45 async fn snapshot(
46 &self,
47 tables: &[SnapshotTable],
48 ) -> Result<BoxStream<'static, Result<Change>>> {
49 let _ = tables;
50 Ok(Box::pin(stream::empty()))
51 }
52
53 /// How far the mechanism's durable resume point trails the source's latest
54 /// position, in bytes — e.g. a replication slot's distance from the server's
55 /// current WAL LSN. A growing value means the consumer is falling behind the
56 /// source; it is the single best signal of pipeline health.
57 ///
58 /// This is sampled out-of-band (by a supervisor, on a timer), not on the
59 /// change path, so it opens its own short-lived connection rather than
60 /// borrowing the live stream's. The default is `Ok(None)` — for mechanisms
61 /// that have no notion of lag (e.g. a finite snapshot-only source).
62 async fn lag(&self) -> Result<Option<u64>> {
63 Ok(None)
64 }
65}