daemon/backends.rs
1//! The backend-assembler seam.
2//!
3//! The daemon owns the pipeline but not the concrete backends behind it. A
4//! [`Backends`] implementation — supplied by the composition root (the CLI) —
5//! turns a validated [`Config`] into the source capture, its document builder,
6//! and the sink, all as trait objects. This keeps the daemon depending only on
7//! the source/sink *abstractions* (`sources-core`/`sinks-core`), never on a
8//! concrete Postgres/OpenSearch crate.
9
10use std::sync::Arc;
11
12use schema::Config;
13use sinks_core::Sink;
14use sources_core::cdc::ChangeCapture;
15use sources_core::document::DocumentBuilder;
16
17use crate::DaemonOptions;
18
19/// A source capture paired with the document builder over the same source.
20///
21/// They are built together because both read from one source and share its
22/// connection/config — the capture tails changes and reports lag; the builder
23/// resolves changed rows to documents.
24#[derive(Debug)]
25pub struct SourceParts {
26 /// Streams row changes (live tail + backfill snapshot) and reports lag.
27 pub capture: Arc<dyn ChangeCapture>,
28 /// Resolves changed rows to [`DocumentId`](sources_core::document)s and
29 /// assembles the documents.
30 pub documents: Arc<dyn DocumentBuilder>,
31}
32
33/// Builds the concrete pipeline backends from a validated [`Config`].
34///
35/// The daemon depends only on this trait; the composition root (the CLI) is the
36/// single place that names concrete backends. Connection and credentials are
37/// resolved by the implementation, in the environment that *runs* the pipeline —
38/// so a compiled `flusso.lock` carries no secret it wasn't given literally.
39#[async_trait::async_trait]
40pub trait Backends: std::fmt::Debug + Send + Sync {
41 async fn source(
42 &self,
43 config: Arc<Config>,
44 options: &DaemonOptions,
45 ) -> anyhow::Result<SourceParts>;
46
47 /// Build the sink the engine writes to — a single configured sink, a
48 /// fan-out over several, or a stdout fallback when none are configured.
49 async fn sink(&self, config: &Config, options: &DaemonOptions)
50 -> anyhow::Result<Arc<dyn Sink>>;
51}