// daemon/lib.rs

1#![doc = include_str!("../README.md")]
2
3mod backends;
4mod lag;
5mod observer;
6pub mod status;
7
8pub use backends::{Backends, SourceParts};
9pub use observer::StatusObserver;
10pub use status::{IndexState, Phase, Status, StatusSnapshot};
11
12// Re-exported so a binary can attach its own observer (e.g. a metrics recorder)
13// without depending on `engine`/`schema-core` directly — these are part of the
14// daemon's observe-the-pipeline surface.
15pub use engine::{BatchStats, Observer};
16pub use schema_core::IndexName;
17
18use std::future::Future;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21
22use anyhow::Context;
23use engine::{Engine, FailurePolicies, FanOut};
24use schema::Config;
25use sources_core::cdc::ChangeCapture;
26
/// Pipeline-level settings for one [`Daemon`] run — the knobs the CLI surfaces
/// as flags.
///
/// Transport settings (HTTP address, …) are deliberately absent: they are the
/// binary's concern, not the daemon's.
///
/// Options compare by value (`PartialEq`/`Eq`), so two configurations can be
/// checked for equality directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DaemonOptions {
    /// Logical replication slot to consume. Must already exist or be creatable.
    pub slot: String,
    /// Publication to subscribe to.
    pub publication: String,
    /// Auto-create/extend the publication to cover every table the indexes read
    /// when the source role is privileged enough. When false, a coverage gap is
    /// only reported (the source still streams whatever the publication covers).
    pub manage_publication: bool,
    /// Skip the initial backfill and resume live capture only.
    pub skip_backfill: bool,
    /// Number of changes buffered between capture and processing.
    pub queue_capacity: usize,
    /// Pretty-print documents on the stdout fallback sink (used when no sink is
    /// configured).
    pub pretty: bool,
    /// How often to sample source capture lag.
    pub lag_poll_interval: Duration,
}

impl Default for DaemonOptions {
    /// The out-of-the-box configuration: slot and publication both named
    /// `flusso`, publication management on, backfill enabled, a 1024-change
    /// queue, compact output, and lag sampled every 15 seconds.
    fn default() -> Self {
        Self {
            slot: "flusso".to_owned(),
            publication: "flusso".to_owned(),
            manage_publication: true,
            skip_backfill: false,
            queue_capacity: 1024,
            pretty: false,
            lag_poll_interval: Duration::from_secs(15),
        }
    }
}
63
64/// A configured-but-not-yet-running sync daemon over one [`Config`].
65#[derive(Debug)]
66pub struct Daemon {
67    config: Config,
68    options: DaemonOptions,
69    backends: Arc<dyn Backends>,
70    extra_observers: Vec<Arc<dyn Observer>>,
71    status: Option<Arc<Status>>,
72}
73
74impl Daemon {
75    /// Create a daemon for `config` with default [`DaemonOptions`].
76    ///
77    /// `backends` builds the concrete source/sink the engine drives; the daemon
78    /// itself never names a backend (see [`Backends`]). The composition root
79    /// supplies it.
80    pub fn new(config: Config, backends: Arc<dyn Backends>) -> Self {
81        Self {
82            config,
83            options: DaemonOptions::default(),
84            backends,
85            extra_observers: Vec::new(),
86            status: None,
87        }
88    }
89
90    pub fn with_options(mut self, options: DaemonOptions) -> Self {
91        self.options = options;
92        self
93    }
94
95    /// Attach an additional [`Observer`] alongside the daemon's own status
96    /// observer — e.g. a metrics recorder the binary owns. All attached
97    /// observers receive every event (the engine drives a [`FanOut`]).
98    pub fn with_observer(mut self, observer: Arc<dyn Observer>) -> Self {
99        self.extra_observers.push(observer);
100        self
101    }
102
103    /// Provide the [`Status`] handle to update instead of minting a fresh one.
104    ///
105    /// The binary uses this to keep **one** process-lifetime status across
106    /// pipeline restarts (e.g. an on-demand reindex): the long-lived HTTP surface
107    /// and metrics keep reading the same handle, and its counters and uptime
108    /// survive the restart rather than resetting. Without it, [`start`](Self::start)
109    /// creates a new status each time.
110    pub fn with_status(mut self, status: Arc<Status>) -> Self {
111        self.status = Some(status);
112        self
113    }
114
115    /// Build the pipeline and its observable state, returning a [`RunningDaemon`]
116    /// whose [`status`](RunningDaemon::status) can be read (e.g. served over HTTP)
117    /// while it runs.
118    ///
119    /// If an attached observer (via [`with_observer`](Self::with_observer)) records
120    /// to the global OpenTelemetry meter, install a meter provider *before* calling
121    /// this; otherwise its instruments are no-ops.
122    #[tracing::instrument(name = "daemon.start", skip_all)]
123    pub async fn start(self) -> anyhow::Result<RunningDaemon> {
124        let Daemon {
125            config,
126            options,
127            backends,
128            extra_observers,
129            status,
130        } = self;
131
132        tracing::info!(
133            slot = %options.slot,
134            publication = %options.publication,
135            indexes = config.indexes.len(),
136            "starting sync",
137        );
138
139        // Reset the phase to `Starting`: a reused status may
140        // have been left `Stopped` by a previous run.
141        let status = status.unwrap_or_else(|| {
142            Arc::new(Status::new(config.indexes.keys().cloned(), Instant::now()))
143        });
144        status.set_phase(Phase::Starting);
145        let mut observers: Vec<Arc<dyn Observer>> =
146            vec![Arc::new(StatusObserver::new(Arc::clone(&status)))];
147        observers.extend(extra_observers);
148        let observer: Arc<dyn Observer> = Arc::new(FanOut::new(observers));
149
150        let config = Arc::new(config);
151        let SourceParts { capture, documents } =
152            backends.source(Arc::clone(&config), &options).await?;
153        let sink = backends.sink(&config, &options).await?;
154
155        let mut failure_policies = FailurePolicies::new(config.on_error);
156        for (name, index) in &config.indexes {
157            if let Some(policy) = index.on_error {
158                failure_policies = failure_policies.with_override(name.as_ref(), policy);
159            }
160        }
161
162        let engine = Engine::new(Arc::clone(&capture), documents, sink)
163            .with_observer(Arc::clone(&observer))
164            .with_queue_capacity(options.queue_capacity)
165            .skip_backfill(options.skip_backfill)
166            .with_failure_policies(failure_policies);
167
168        Ok(RunningDaemon {
169            status,
170            engine,
171            source: capture,
172            observer,
173            lag_poll_interval: options.lag_poll_interval,
174        })
175    }
176}
177
178/// A built sync daemon, ready to run. Exposes its live [`Status`] so a transport
179/// the binary owns can serve it concurrently with the run.
180#[derive(Debug)]
181pub struct RunningDaemon {
182    status: Arc<Status>,
183    engine: Engine,
184    source: Arc<dyn ChangeCapture>,
185    observer: Arc<dyn Observer>,
186    lag_poll_interval: Duration,
187}
188
189impl RunningDaemon {
190    /// A handle to the live operational status, for a transport (HTTP, a TUI, …)
191    /// to read while the daemon runs. Cheap to clone.
192    pub fn status(&self) -> Arc<Status> {
193        Arc::clone(&self.status)
194    }
195
196    /// Run until the live stream ends, an error stops the pipeline, or `shutdown`
197    /// resolves — typically a signal future the binary owns. A pending batch on
198    /// shutdown is simply redelivered on the next run (at-least-once), so
199    /// dropping the run mid-flight is safe.
200    #[tracing::instrument(name = "daemon.run", skip_all)]
201    pub async fn run(self, shutdown: impl Future<Output = ()> + Send) -> anyhow::Result<()> {
202        let RunningDaemon {
203            status,
204            engine,
205            source,
206            observer,
207            lag_poll_interval,
208        } = self;
209
210        // Held in a guard so it's aborted however this returns — a normal stop
211        // *or* the future being cancelled (e.g. the binary dropping the run for a
212        // reindex restart) — rather than detaching onto the shared status.
213        let _lag = LagGuard(tokio::spawn(lag::poll(source, observer, lag_poll_interval)));
214
215        let result = tokio::select! {
216            res = engine.run() => res.context("sync engine stopped"),
217            () = shutdown => {
218                tracing::info!("shutdown requested; stopping pipeline");
219                Ok(())
220            }
221        };
222
223        status.set_phase(Phase::Stopped);
224        result
225    }
226}
227
228/// Aborts the lag poller when dropped — on a normal stop or on cancellation
229/// (the run future being dropped for a restart) alike. Its result is discarded,
230/// so there's nothing to join.
231#[derive(Debug)]
232struct LagGuard(tokio::task::JoinHandle<()>);
233
234impl Drop for LagGuard {
235    fn drop(&mut self) {
236        self.0.abort();
237    }
238}
239
240#[cfg(test)]
241#[allow(clippy::unwrap_used)]
242mod tests;