daemon/lib.rs
1//! The `flusso` daemon — the supervisor around the [`engine`].
2//!
3//! It builds the pluggable parts from a validated [`Config`], wires a
4//! [`StatusObserver`] that updates a shared [`Status`], runs the engine, and
5//! polls source lag out of band.
6//!
7//! It owns the **domain**: the pipeline and its observable state, and it is
8//! telemetry-agnostic — it depends only on the engine's [`Observer`] trait, not
9//! on any metrics backend. It does *not* own **transport**: the HTTP surface,
10//! process signals, the telemetry exporter, *and the metrics recording itself*
11//! live in the binary (the CLI), which installs a meter provider, attaches its
12//! own metrics observer via [`Daemon::with_observer`], reads the [`Status`]
13//! handle this exposes, serves it, and drives shutdown:
14//!
//! ```text
//! CLI ── install meter provider ─▶ Daemon::start ──▶ RunningDaemon
//! │                                                  │ .status() ─▶ Arc<Status> (CLI serves it)
//! └── shutdown future (signals) ─▶ RunningDaemon::run(shutdown)
//! ```
20
21mod backends;
22mod lag;
23mod observer;
24pub mod status;
25
26pub use backends::{Backends, SourceParts};
27pub use observer::StatusObserver;
28pub use status::{IndexState, Phase, Status, StatusSnapshot};
29
30// Re-exported so a binary can attach its own observer (e.g. a metrics recorder)
31// without depending on `engine`/`schema-core` directly — these are part of the
32// daemon's observe-the-pipeline surface.
33pub use engine::{BatchStats, Observer};
34pub use schema_core::IndexName;
35
36use std::future::Future;
37use std::sync::Arc;
38use std::time::{Duration, Instant};
39
40use anyhow::Context;
41use engine::{Engine, FailurePolicies, FanOut};
42use schema::Config;
43use sources_core::cdc::ChangeCapture;
44
/// How a [`Daemon`] run is parameterized — the pipeline knobs the CLI exposes as
/// flags. Transport settings (HTTP address, …) are the binary's concern, not the
/// daemon's, so they are not here.
///
/// Options compare by value, so a caller can check a set against
/// [`DaemonOptions::default`] or against a previous run's options.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DaemonOptions {
    /// Logical replication slot to consume. Must already exist or be creatable.
    pub slot: String,
    /// Publication to subscribe to.
    pub publication: String,
    /// Auto-create/extend the publication to cover every table the indexes read
    /// when the source role is privileged enough. When false, a coverage gap is
    /// only reported (the source still streams whatever the publication covers).
    pub manage_publication: bool,
    /// Skip the initial backfill and resume live capture only.
    pub skip_backfill: bool,
    /// Changes buffered between capture and processing.
    pub queue_capacity: usize,
    /// Pretty-print documents on the stdout fallback sink (no sink configured).
    pub pretty: bool,
    /// How often to sample source capture lag.
    pub lag_poll_interval: Duration,
}
67
68impl Default for DaemonOptions {
69 fn default() -> Self {
70 Self {
71 slot: "flusso".to_owned(),
72 publication: "flusso".to_owned(),
73 manage_publication: true,
74 skip_backfill: false,
75 queue_capacity: 1024,
76 pretty: false,
77 lag_poll_interval: Duration::from_secs(15),
78 }
79 }
80}
81
82/// A configured-but-not-yet-running sync daemon over one [`Config`].
83#[derive(Debug)]
84pub struct Daemon {
85 config: Config,
86 options: DaemonOptions,
87 backends: Arc<dyn Backends>,
88 extra_observers: Vec<Arc<dyn Observer>>,
89 status: Option<Arc<Status>>,
90}
91
92impl Daemon {
93 /// Create a daemon for `config` with default [`DaemonOptions`].
94 ///
95 /// `backends` builds the concrete source/sink the engine drives; the daemon
96 /// itself never names a backend (see [`Backends`]). The composition root
97 /// supplies it.
98 pub fn new(config: Config, backends: Arc<dyn Backends>) -> Self {
99 Self {
100 config,
101 options: DaemonOptions::default(),
102 backends,
103 extra_observers: Vec::new(),
104 status: None,
105 }
106 }
107
108 pub fn with_options(mut self, options: DaemonOptions) -> Self {
109 self.options = options;
110 self
111 }
112
113 /// Attach an additional [`Observer`] alongside the daemon's own status
114 /// observer — e.g. a metrics recorder the binary owns. All attached
115 /// observers receive every event (the engine drives a [`FanOut`]).
116 pub fn with_observer(mut self, observer: Arc<dyn Observer>) -> Self {
117 self.extra_observers.push(observer);
118 self
119 }
120
121 /// Provide the [`Status`] handle to update instead of minting a fresh one.
122 ///
123 /// The binary uses this to keep **one** process-lifetime status across
124 /// pipeline restarts (e.g. an on-demand reindex): the long-lived HTTP surface
125 /// and metrics keep reading the same handle, and its counters and uptime
126 /// survive the restart rather than resetting. Without it, [`start`](Self::start)
127 /// creates a new status each time.
128 pub fn with_status(mut self, status: Arc<Status>) -> Self {
129 self.status = Some(status);
130 self
131 }
132
133 /// Build the pipeline and its observable state, returning a [`RunningDaemon`]
134 /// whose [`status`](RunningDaemon::status) can be read (e.g. served over HTTP)
135 /// while it runs.
136 ///
137 /// If an attached observer (via [`with_observer`](Self::with_observer)) records
138 /// to the global OpenTelemetry meter, install a meter provider *before* calling
139 /// this; otherwise its instruments are no-ops.
140 #[tracing::instrument(name = "daemon.start", skip_all)]
141 pub async fn start(self) -> anyhow::Result<RunningDaemon> {
142 let Daemon {
143 config,
144 options,
145 backends,
146 extra_observers,
147 status,
148 } = self;
149
150 tracing::info!(
151 slot = %options.slot,
152 publication = %options.publication,
153 indexes = config.indexes.len(),
154 "starting sync",
155 );
156
157 // Reset the phase to `Starting`: a reused status may
158 // have been left `Stopped` by a previous run.
159 let status = status.unwrap_or_else(|| {
160 Arc::new(Status::new(config.indexes.keys().cloned(), Instant::now()))
161 });
162 status.set_phase(Phase::Starting);
163 let mut observers: Vec<Arc<dyn Observer>> =
164 vec![Arc::new(StatusObserver::new(Arc::clone(&status)))];
165 observers.extend(extra_observers);
166 let observer: Arc<dyn Observer> = Arc::new(FanOut::new(observers));
167
168 let config = Arc::new(config);
169 let SourceParts { capture, documents } =
170 backends.source(Arc::clone(&config), &options).await?;
171 let sink = backends.sink(&config, &options).await?;
172
173 let mut failure_policies = FailurePolicies::new(config.on_error);
174 for (name, index) in &config.indexes {
175 if let Some(policy) = index.on_error {
176 failure_policies = failure_policies.with_override(name.as_ref(), policy);
177 }
178 }
179
180 let engine = Engine::new(Arc::clone(&capture), documents, sink)
181 .with_observer(Arc::clone(&observer))
182 .with_queue_capacity(options.queue_capacity)
183 .skip_backfill(options.skip_backfill)
184 .with_failure_policies(failure_policies);
185
186 Ok(RunningDaemon {
187 status,
188 engine,
189 source: capture,
190 observer,
191 lag_poll_interval: options.lag_poll_interval,
192 })
193 }
194}
195
196/// A built sync daemon, ready to run. Exposes its live [`Status`] so a transport
197/// the binary owns can serve it concurrently with the run.
198#[derive(Debug)]
199pub struct RunningDaemon {
200 status: Arc<Status>,
201 engine: Engine,
202 source: Arc<dyn ChangeCapture>,
203 observer: Arc<dyn Observer>,
204 lag_poll_interval: Duration,
205}
206
207impl RunningDaemon {
208 /// A handle to the live operational status, for a transport (HTTP, a TUI, …)
209 /// to read while the daemon runs. Cheap to clone.
210 pub fn status(&self) -> Arc<Status> {
211 Arc::clone(&self.status)
212 }
213
214 /// Run until the live stream ends, an error stops the pipeline, or `shutdown`
215 /// resolves — typically a signal future the binary owns. A pending batch on
216 /// shutdown is simply redelivered on the next run (at-least-once), so
217 /// dropping the run mid-flight is safe.
218 #[tracing::instrument(name = "daemon.run", skip_all)]
219 pub async fn run(self, shutdown: impl Future<Output = ()> + Send) -> anyhow::Result<()> {
220 let RunningDaemon {
221 status,
222 engine,
223 source,
224 observer,
225 lag_poll_interval,
226 } = self;
227
228 // Held in a guard so it's aborted however this returns — a normal stop
229 // *or* the future being cancelled (e.g. the binary dropping the run for a
230 // reindex restart) — rather than detaching onto the shared status.
231 let _lag = LagGuard(tokio::spawn(lag::poll(source, observer, lag_poll_interval)));
232
233 let result = tokio::select! {
234 res = engine.run() => res.context("sync engine stopped"),
235 () = shutdown => {
236 tracing::info!("shutdown requested; stopping pipeline");
237 Ok(())
238 }
239 };
240
241 status.set_phase(Phase::Stopped);
242 result
243 }
244}
245
246/// Aborts the lag poller when dropped — on a normal stop or on cancellation
247/// (the run future being dropped for a restart) alike. Its result is discarded,
248/// so there's nothing to join.
249#[derive(Debug)]
250struct LagGuard(tokio::task::JoinHandle<()>);
251
252impl Drop for LagGuard {
253 fn drop(&mut self) {
254 self.0.abort();
255 }
256}
257
258#[cfg(test)]
259#[allow(clippy::unwrap_used)]
260mod tests;