Skip to main content

daemon/
backends.rs

1//! The backend-assembler seam.
2//!
3//! The daemon owns the pipeline but not the concrete backends behind it. A
4//! [`Backends`] implementation — supplied by the composition root (the CLI) —
5//! turns a validated [`Config`] into the source capture, its document builder,
6//! and the sink, all as trait objects. This keeps the daemon depending only on
7//! the source/sink *abstractions* (`sources-core`/`sinks-core`), never on a
8//! concrete Postgres/OpenSearch crate.
9
10use std::sync::Arc;
11
12use schema::Config;
13use sinks_core::Sink;
14use sources_core::cdc::ChangeCapture;
15use sources_core::document::DocumentBuilder;
16
17use crate::DaemonOptions;
18
19/// A source capture paired with the document builder over the same source.
20///
21/// They are built together because both read from one source and share its
22/// connection/config — the capture tails changes and reports lag; the builder
23/// resolves changed rows to documents.
24#[derive(Debug)]
25pub struct SourceParts {
26    /// Streams row changes (live tail + backfill snapshot) and reports lag.
27    pub capture: Arc<dyn ChangeCapture>,
28    /// Resolves changed rows to [`DocumentId`](sources_core::document)s and
29    /// assembles the documents.
30    pub documents: Arc<dyn DocumentBuilder>,
31}
32
33/// Builds the concrete pipeline backends from a validated [`Config`].
34///
35/// The daemon depends only on this trait; the composition root (the CLI) is the
36/// single place that names concrete backends. Connection and credentials are
37/// resolved by the implementation, in the environment that *runs* the pipeline —
38/// so a compiled `flusso.lock` carries no secret it wasn't given literally.
39#[async_trait::async_trait]
40pub trait Backends: std::fmt::Debug + Send + Sync {
41    /// Build the source capture and its document builder.
42    async fn source(
43        &self,
44        config: Arc<Config>,
45        options: &DaemonOptions,
46    ) -> anyhow::Result<SourceParts>;
47
48    /// Build the sink the engine writes to — a single configured sink, a
49    /// fan-out over several, or a stdout fallback when none are configured.
50    async fn sink(&self, config: &Config, options: &DaemonOptions)
51    -> anyhow::Result<Arc<dyn Sink>>;
52}