sources_core/provisioning.rs
//! Provisioning the prerequisites a source needs to stream a set of tables.
//!
//! A change-capture mechanism only sees a row change if the source is configured
//! to stream that row's table. For Postgres logical replication that means a
//! *publication* covering the table; another mechanism would have its own
//! prerequisite. This module abstracts that uniformly: given the set of tables an
//! index reads (see [`SourceSpec::all_tables`](crate::SourceSpec::all_tables)), a
//! source can **inspect** whether they're covered and, when it has the privilege,
//! **ensure** they are.
//!
//! The contract is deliberately mechanism-neutral — the trait and the
//! [`CoverageReport`] never name "publication". A backend that *can* provision
//! the gap describes how to in [`CoverageReport::remediation`] (Postgres puts the
//! `CREATE`/`ALTER PUBLICATION` SQL there as opaque strings); a caller prints
//! those steps but never interprets them. This keeps the daemon, the CLI, and the
//! shared printer free of any Postgres specifics.
17
18use std::collections::BTreeSet;
19
20use async_trait::async_trait;
21
22use crate::{QualifiedTable, Result};
23
24/// What a source found when asked whether it can stream a set of tables.
25///
26/// `present` + `missing` partition the requested set. When `missing` is
27/// non-empty the source is not yet streaming every table an index reads (live
28/// changes to a `missing` table would be silently dropped); `manageable` says
29/// whether *this* source — with its current credentials — can close the gap
30/// itself, and `remediation` carries the steps to do so (the operator can run
31/// them by hand regardless of `manageable`). `blockers` explains a
32/// `manageable == false` verdict in human terms.
33#[derive(Debug, Clone, Default)]
34pub struct CoverageReport {
35 /// Every requested table is already streamable — nothing to do.
36 pub satisfied: bool,
37 /// Requested tables the source already streams.
38 pub present: Vec<QualifiedTable>,
39 /// Requested tables not yet streamed.
40 pub missing: Vec<QualifiedTable>,
41 /// Whether the source can provision the `missing` tables with its current
42 /// privileges. Meaningless when `missing` is empty.
43 pub manageable: bool,
44 /// Why `manageable` is false (e.g. "role does not own table public.orders").
45 pub blockers: Vec<String>,
46 /// Backend-specific steps that would close the gap — opaque to callers,
47 /// meant to be shown verbatim (for Postgres, the `CREATE`/`ALTER PUBLICATION`
48 /// statements). Empty when `satisfied`.
49 pub remediation: Vec<String>,
50}
51
52/// A source's ability to report and provision the prerequisites for streaming a
53/// set of tables. Implemented per mechanism (Postgres backs it with a
54/// publication); consumed by the CLI (`check` reports, `run` ensures) only
55/// through this neutral surface.
56#[async_trait]
57pub trait CaptureProvisioning: Send + Sync {
58 /// Read-only: report coverage of `required` against what the source streams,
59 /// and whether the gap (if any) is `manageable`. Never mutates anything.
60 async fn inspect_coverage(&self, required: &BTreeSet<QualifiedTable>)
61 -> Result<CoverageReport>;
62
63 /// Provision any missing tables when `manage` is set and the gap is
64 /// manageable; otherwise a no-op. Returns the report as observed *before*
65 /// acting (so a caller can log what was — or would have been — done). A
66 /// no-op for an already-satisfied set.
67 async fn ensure_coverage(
68 &self,
69 required: &BTreeSet<QualifiedTable>,
70 manage: bool,
71 ) -> Result<CoverageReport>;
72}