Skip to main content

daemon/
lib.rs

1//! The `flusso` daemon — the supervisor around the [`engine`].
2//!
3//! It builds the pluggable parts from a validated [`Config`], wires a
4//! [`StatusObserver`] that updates a shared [`Status`], runs the engine, and
5//! polls source lag out of band.
6//!
7//! It owns the **domain**: the pipeline and its observable state, and it is
8//! telemetry-agnostic — it depends only on the engine's [`Observer`] trait, not
9//! on any metrics backend. It does *not* own **transport**: the HTTP surface,
10//! process signals, the telemetry exporter, *and the metrics recording itself*
11//! live in the binary (the CLI), which installs a meter provider, attaches its
12//! own metrics observer via [`Daemon::with_observer`], reads the [`Status`]
13//! handle this exposes, serves it, and drives shutdown:
14//!
15//! ```text
16//!   CLI ── install meter provider ─▶ Daemon::start ──▶ RunningDaemon
17//!    │                                                   │  .status() ─▶ Arc<Status>  (CLI serves it)
18//!    └── shutdown future (signals) ─▶ RunningDaemon::run(shutdown)
19//! ```
20
21mod backends;
22mod lag;
23mod observer;
24pub mod status;
25
26pub use backends::{Backends, SourceParts};
27pub use observer::StatusObserver;
28pub use status::{IndexState, Phase, Status, StatusSnapshot};
29
30// Re-exported so a binary can attach its own observer (e.g. a metrics recorder)
31// without depending on `engine`/`schema-core` directly — these are part of the
32// daemon's observe-the-pipeline surface.
33pub use engine::{BatchStats, Observer};
34pub use schema_core::IndexName;
35
36use std::future::Future;
37use std::sync::Arc;
38use std::time::{Duration, Instant};
39
40use anyhow::Context;
41use engine::{Engine, FailurePolicies, FanOut};
42use schema::Config;
43use sources_core::cdc::ChangeCapture;
44
/// How a [`Daemon`] run is parameterized — the pipeline knobs the CLI exposes as
/// flags. Transport settings (HTTP address, …) are the binary's concern, not the
/// daemon's, so they are not here.
///
/// The [`Default`] impl supplies the value noted on each field. Options compare
/// by value, so two option sets can be checked for equality (e.g. in tests, or
/// to decide whether a restart would change anything).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DaemonOptions {
    /// Logical replication slot to consume. Must already exist or be creatable.
    /// Defaults to `"flusso"`.
    pub slot: String,
    /// Publication to subscribe to. Defaults to `"flusso"`.
    pub publication: String,
    /// Auto-create/extend the publication to cover every table the indexes read
    /// when the source role is privileged enough. When false, a coverage gap is
    /// only reported (the source still streams whatever the publication covers).
    /// Defaults to `true`.
    pub manage_publication: bool,
    /// Skip the initial backfill and resume live capture only. Defaults to
    /// `false`.
    pub skip_backfill: bool,
    /// Changes buffered between capture and processing. Defaults to `1024`.
    pub queue_capacity: usize,
    /// Pretty-print documents on the stdout fallback sink (no sink configured).
    /// Defaults to `false`.
    pub pretty: bool,
    /// How often to sample source capture lag. Defaults to 15 seconds.
    pub lag_poll_interval: Duration,
}

impl Default for DaemonOptions {
    fn default() -> Self {
        Self {
            slot: "flusso".to_owned(),
            publication: "flusso".to_owned(),
            manage_publication: true,
            skip_backfill: false,
            queue_capacity: 1024,
            pretty: false,
            lag_poll_interval: Duration::from_secs(15),
        }
    }
}
81
82/// A configured-but-not-yet-running sync daemon over one [`Config`].
83#[derive(Debug)]
84pub struct Daemon {
85    config: Config,
86    options: DaemonOptions,
87    backends: Arc<dyn Backends>,
88    extra_observers: Vec<Arc<dyn Observer>>,
89    status: Option<Arc<Status>>,
90}
91
92impl Daemon {
93    /// Create a daemon for `config` with default [`DaemonOptions`].
94    ///
95    /// `backends` builds the concrete source/sink the engine drives; the daemon
96    /// itself never names a backend (see [`Backends`]). The composition root
97    /// supplies it.
98    pub fn new(config: Config, backends: Arc<dyn Backends>) -> Self {
99        Self {
100            config,
101            options: DaemonOptions::default(),
102            backends,
103            extra_observers: Vec::new(),
104            status: None,
105        }
106    }
107
108    pub fn with_options(mut self, options: DaemonOptions) -> Self {
109        self.options = options;
110        self
111    }
112
113    /// Attach an additional [`Observer`] alongside the daemon's own status
114    /// observer — e.g. a metrics recorder the binary owns. All attached
115    /// observers receive every event (the engine drives a [`FanOut`]).
116    pub fn with_observer(mut self, observer: Arc<dyn Observer>) -> Self {
117        self.extra_observers.push(observer);
118        self
119    }
120
121    /// Provide the [`Status`] handle to update instead of minting a fresh one.
122    ///
123    /// The binary uses this to keep **one** process-lifetime status across
124    /// pipeline restarts (e.g. an on-demand reindex): the long-lived HTTP surface
125    /// and metrics keep reading the same handle, and its counters and uptime
126    /// survive the restart rather than resetting. Without it, [`start`](Self::start)
127    /// creates a new status each time.
128    pub fn with_status(mut self, status: Arc<Status>) -> Self {
129        self.status = Some(status);
130        self
131    }
132
133    /// Build the pipeline and its observable state, returning a [`RunningDaemon`]
134    /// whose [`status`](RunningDaemon::status) can be read (e.g. served over HTTP)
135    /// while it runs.
136    ///
137    /// If an attached observer (via [`with_observer`](Self::with_observer)) records
138    /// to the global OpenTelemetry meter, install a meter provider *before* calling
139    /// this; otherwise its instruments are no-ops.
140    #[tracing::instrument(name = "daemon.start", skip_all)]
141    pub async fn start(self) -> anyhow::Result<RunningDaemon> {
142        let Daemon {
143            config,
144            options,
145            backends,
146            extra_observers,
147            status,
148        } = self;
149
150        tracing::info!(
151            slot = %options.slot,
152            publication = %options.publication,
153            indexes = config.indexes.len(),
154            "starting sync",
155        );
156
157        // Reset the phase to `Starting`: a reused status may
158        // have been left `Stopped` by a previous run.
159        let status = status.unwrap_or_else(|| {
160            Arc::new(Status::new(config.indexes.keys().cloned(), Instant::now()))
161        });
162        status.set_phase(Phase::Starting);
163        let mut observers: Vec<Arc<dyn Observer>> =
164            vec![Arc::new(StatusObserver::new(Arc::clone(&status)))];
165        observers.extend(extra_observers);
166        let observer: Arc<dyn Observer> = Arc::new(FanOut::new(observers));
167
168        let config = Arc::new(config);
169        let SourceParts { capture, documents } =
170            backends.source(Arc::clone(&config), &options).await?;
171        let sink = backends.sink(&config, &options).await?;
172
173        let mut failure_policies = FailurePolicies::new(config.on_error);
174        for (name, index) in &config.indexes {
175            if let Some(policy) = index.on_error {
176                failure_policies = failure_policies.with_override(name.as_ref(), policy);
177            }
178        }
179
180        let engine = Engine::new(Arc::clone(&capture), documents, sink)
181            .with_observer(Arc::clone(&observer))
182            .with_queue_capacity(options.queue_capacity)
183            .skip_backfill(options.skip_backfill)
184            .with_failure_policies(failure_policies);
185
186        Ok(RunningDaemon {
187            status,
188            engine,
189            source: capture,
190            observer,
191            lag_poll_interval: options.lag_poll_interval,
192        })
193    }
194}
195
196/// A built sync daemon, ready to run. Exposes its live [`Status`] so a transport
197/// the binary owns can serve it concurrently with the run.
198#[derive(Debug)]
199pub struct RunningDaemon {
200    status: Arc<Status>,
201    engine: Engine,
202    source: Arc<dyn ChangeCapture>,
203    observer: Arc<dyn Observer>,
204    lag_poll_interval: Duration,
205}
206
207impl RunningDaemon {
208    /// A handle to the live operational status, for a transport (HTTP, a TUI, …)
209    /// to read while the daemon runs. Cheap to clone.
210    pub fn status(&self) -> Arc<Status> {
211        Arc::clone(&self.status)
212    }
213
214    /// Run until the live stream ends, an error stops the pipeline, or `shutdown`
215    /// resolves — typically a signal future the binary owns. A pending batch on
216    /// shutdown is simply redelivered on the next run (at-least-once), so
217    /// dropping the run mid-flight is safe.
218    #[tracing::instrument(name = "daemon.run", skip_all)]
219    pub async fn run(self, shutdown: impl Future<Output = ()> + Send) -> anyhow::Result<()> {
220        let RunningDaemon {
221            status,
222            engine,
223            source,
224            observer,
225            lag_poll_interval,
226        } = self;
227
228        // Held in a guard so it's aborted however this returns — a normal stop
229        // *or* the future being cancelled (e.g. the binary dropping the run for a
230        // reindex restart) — rather than detaching onto the shared status.
231        let _lag = LagGuard(tokio::spawn(lag::poll(source, observer, lag_poll_interval)));
232
233        let result = tokio::select! {
234            res = engine.run() => res.context("sync engine stopped"),
235            () = shutdown => {
236                tracing::info!("shutdown requested; stopping pipeline");
237                Ok(())
238            }
239        };
240
241        status.set_phase(Phase::Stopped);
242        result
243    }
244}
245
246/// Aborts the lag poller when dropped — on a normal stop or on cancellation
247/// (the run future being dropped for a restart) alike. Its result is discarded,
248/// so there's nothing to join.
249#[derive(Debug)]
250struct LagGuard(tokio::task::JoinHandle<()>);
251
252impl Drop for LagGuard {
253    fn drop(&mut self) {
254        self.0.abort();
255    }
256}
257
258#[cfg(test)]
259#[allow(clippy::unwrap_used)]
260mod tests;