Skip to main content

sources_core/cdc/
capture.rs

1use async_trait::async_trait;
2use futures::stream::{self, BoxStream};
3
4use crate::{Result, SnapshotTable};
5
6use super::Change;
7
8/// A pluggable change-capture mechanism — logical replication (WAL) today,
9/// polling or trigger-based capture later.
10///
11/// The mechanism exposes two independent capabilities; the engine decides when
12/// to use each:
13///
14/// - [`live`](Self::live) streams ongoing changes, resuming from the
15///   mechanism's own durable position (a replication slot's
16///   `confirmed_flush_lsn`, a poll cursor, …). No position is threaded through
17///   this API — resume state is the mechanism's to own.
18/// - [`snapshot`](Self::snapshot) reads the *current* rows of a set of tables as
19///   a finite stream — the data an initial backfill needs. Whether a backfill
20///   is *needed* is not the mechanism's call: the engine asks the **sink**
21///   whether a target is already seeded and only then requests a snapshot. A
22///   mechanism that cannot snapshot keeps the default (an empty stream).
23///
24/// Each emitted [`Change`] carries an [`Ack`](super::Ack); for `live`, the
25/// mechanism only advances its durable resume point once changes are confirmed,
26/// which makes delivery at-least-once across restarts. Snapshot changes are not
27/// resumable (a crashed backfill simply re-runs, idempotently), so their acks
28/// need not move any cursor.
29///
30/// Returned streams are `'static` and `Send`: an implementation moves whatever
31/// it needs (its connection, its [`AckSink`](super::AckSink)) into the stream
32/// rather than borrowing from `self`.
33#[async_trait]
34pub trait ChangeCapture: std::fmt::Debug + Send + Sync {
35    /// Connect, ensure setup, resume from the last confirmed point, and stream
36    /// live changes.
37    async fn live(&self) -> Result<BoxStream<'static, Result<Change>>>;
38
39    /// Snapshot the current rows of `tables` as a finite stream of
40    /// [`Upsert`](super::ChangeEvent::Upsert) changes — the rows to seed an
41    /// index with. The stream ends when the snapshot is complete; there is no
42    /// in-band boundary marker.
43    ///
44    /// The default is an empty stream, for mechanisms that cannot snapshot.
45    async fn snapshot(
46        &self,
47        tables: &[SnapshotTable],
48    ) -> Result<BoxStream<'static, Result<Change>>> {
49        let _ = tables;
50        Ok(Box::pin(stream::empty()))
51    }
52
53    /// How far the mechanism's durable resume point trails the source's latest
54    /// position, in bytes — e.g. a replication slot's distance from the server's
55    /// current WAL LSN. A growing value means the consumer is falling behind the
56    /// source; it is the single best signal of pipeline health.
57    ///
58    /// This is sampled out-of-band (by a supervisor, on a timer), not on the
59    /// change path, so it opens its own short-lived connection rather than
60    /// borrowing the live stream's. The default is `Ok(None)` — for mechanisms
61    /// that have no notion of lag (e.g. a finite snapshot-only source).
62    async fn lag(&self) -> Result<Option<u64>> {
63        Ok(None)
64    }
65}