Skip to main content

daemon/
lib.rs

1//! The `flusso` daemon — the supervisor around the [`engine`].
2//!
3//! It builds the pluggable parts from a validated [`Config`], wires a
4//! [`StatusObserver`] that updates a shared [`Status`], runs the engine, and
5//! polls source lag out of band.
6//!
7//! It owns the **domain**: the pipeline and its observable state, and it is
8//! telemetry-agnostic — it depends only on the engine's [`Observer`] trait, not
9//! on any metrics backend. It does *not* own **transport**: the HTTP surface,
10//! process signals, the telemetry exporter, *and the metrics recording itself*
11//! live in the binary (the CLI), which installs a meter provider, attaches its
12//! own metrics observer via [`Daemon::with_observer`], reads the [`Status`]
13//! handle this exposes, serves it, and drives shutdown:
14//!
15//! ```text
16//!   CLI ── install meter provider ─▶ Daemon::start ──▶ RunningDaemon
17//!    │                                                   │  .status() ─▶ Arc<Status>  (CLI serves it)
18//!    └── shutdown future (signals) ─▶ RunningDaemon::run(shutdown)
19//! ```
20
21mod backends;
22mod lag;
23mod observer;
24pub mod status;
25
26pub use backends::{Backends, SourceParts};
27pub use observer::StatusObserver;
28pub use status::{IndexState, Phase, Status, StatusSnapshot};
29
30// Re-exported so a binary can attach its own observer (e.g. a metrics recorder)
31// without depending on `engine`/`schema-core` directly — these are part of the
32// daemon's observe-the-pipeline surface.
33pub use engine::{BatchStats, Observer};
34pub use schema_core::IndexName;
35
36use std::future::Future;
37use std::sync::Arc;
38use std::time::{Duration, Instant};
39
40use anyhow::Context;
41use engine::{Engine, FailurePolicies, FanOut};
42use schema::Config;
43use sources_core::cdc::ChangeCapture;
44
/// The pipeline knobs for a [`Daemon`] run, which the CLI surfaces as flags.
///
/// Transport settings (the HTTP address, …) belong to the binary rather than
/// the daemon, so none of them appear here.
#[derive(Debug, Clone)]
pub struct DaemonOptions {
    /// Name of the logical replication slot to consume. It must already exist
    /// or be creatable.
    pub slot: String,
    /// Name of the publication to subscribe to.
    pub publication: String,
    /// When true, auto-create/extend the publication so it covers every table
    /// the indexes read, provided the source role is privileged enough. When
    /// false, a coverage gap is only reported and the source still streams
    /// whatever the publication covers.
    pub manage_publication: bool,
    /// When true, skip the initial backfill and resume live capture only.
    pub skip_backfill: bool,
    /// Number of changes buffered between capture and processing.
    pub queue_capacity: usize,
    /// Pretty-print documents on the stdout fallback sink (used when no sink
    /// is configured).
    pub pretty: bool,
    /// Interval between samples of the source's capture lag.
    pub lag_poll_interval: Duration,
}

impl Default for DaemonOptions {
    /// Defaults: slot and publication both named `flusso`, publication
    /// management on, backfill enabled, a 1024-change queue, compact output,
    /// and a 15-second lag poll.
    fn default() -> Self {
        // The slot and the publication share one default name.
        const DEFAULT_NAME: &str = "flusso";

        let slot = String::from(DEFAULT_NAME);
        let publication = String::from(DEFAULT_NAME);
        let lag_poll_interval = Duration::from_secs(15);

        Self {
            slot,
            publication,
            manage_publication: true,
            skip_backfill: false,
            queue_capacity: 1024,
            pretty: false,
            lag_poll_interval,
        }
    }
}
81
82/// A configured-but-not-yet-running sync daemon over one [`Config`].
83#[derive(Debug)]
84pub struct Daemon {
85    config: Config,
86    options: DaemonOptions,
87    backends: Arc<dyn Backends>,
88    extra_observers: Vec<Arc<dyn Observer>>,
89    status: Option<Arc<Status>>,
90}
91
92impl Daemon {
93    /// Create a daemon for `config` with default [`DaemonOptions`].
94    ///
95    /// `backends` builds the concrete source/sink the engine drives; the daemon
96    /// itself never names a backend (see [`Backends`]). The composition root
97    /// supplies it.
98    pub fn new(config: Config, backends: Arc<dyn Backends>) -> Self {
99        Self {
100            config,
101            options: DaemonOptions::default(),
102            backends,
103            extra_observers: Vec::new(),
104            status: None,
105        }
106    }
107
108    pub fn with_options(mut self, options: DaemonOptions) -> Self {
109        self.options = options;
110        self
111    }
112
113    /// Attach an additional [`Observer`] alongside the daemon's own status
114    /// observer — e.g. a metrics recorder the binary owns. All attached
115    /// observers receive every event (the engine drives a [`FanOut`]).
116    pub fn with_observer(mut self, observer: Arc<dyn Observer>) -> Self {
117        self.extra_observers.push(observer);
118        self
119    }
120
121    /// Provide the [`Status`] handle to update instead of minting a fresh one.
122    ///
123    /// The binary uses this to keep **one** process-lifetime status across
124    /// pipeline restarts (e.g. an on-demand reindex): the long-lived HTTP surface
125    /// and metrics keep reading the same handle, and its counters and uptime
126    /// survive the restart rather than resetting. Without it, [`start`](Self::start)
127    /// creates a new status each time.
128    pub fn with_status(mut self, status: Arc<Status>) -> Self {
129        self.status = Some(status);
130        self
131    }
132
133    /// Build the pipeline and its observable state, returning a [`RunningDaemon`]
134    /// whose [`status`](RunningDaemon::status) can be read (e.g. served over HTTP)
135    /// while it runs.
136    ///
137    /// If an attached observer (via [`with_observer`](Self::with_observer)) records
138    /// to the global OpenTelemetry meter, install a meter provider *before* calling
139    /// this; otherwise its instruments are no-ops.
140    #[tracing::instrument(name = "daemon.start", skip_all)]
141    pub async fn start(self) -> anyhow::Result<RunningDaemon> {
142        let Daemon {
143            config,
144            options,
145            backends,
146            extra_observers,
147            status,
148        } = self;
149
150        tracing::info!(
151            slot = %options.slot,
152            publication = %options.publication,
153            indexes = config.indexes.len(),
154            "starting sync",
155        );
156
157        // Reset the phase to `Starting`: a reused status may
158        // have been left `Stopped` by a previous run.
159        let status = status.unwrap_or_else(|| {
160            Arc::new(Status::new(config.indexes.keys().cloned(), Instant::now()))
161        });
162        status.set_phase(Phase::Starting);
163        let mut observers: Vec<Arc<dyn Observer>> =
164            vec![Arc::new(StatusObserver::new(Arc::clone(&status)))];
165        observers.extend(extra_observers);
166        let observer: Arc<dyn Observer> = Arc::new(FanOut::new(observers));
167
168        let config = Arc::new(config);
169        let SourceParts { capture, documents } =
170            backends.source(Arc::clone(&config), &options).await?;
171        let sink = backends.sink(&config, &options).await?;
172
173        let mut failure_policies = FailurePolicies::new(config.on_error);
174        for (name, index) in &config.indexes {
175            if let Some(policy) = index.on_error {
176                failure_policies = failure_policies.with_override(name.as_ref(), policy);
177            }
178        }
179
180        let engine = Engine::new(Arc::clone(&capture), documents, sink)
181            .with_observer(Arc::clone(&observer))
182            .with_queue_capacity(options.queue_capacity)
183            .skip_backfill(options.skip_backfill)
184            .with_failure_policies(failure_policies);
185
186        Ok(RunningDaemon {
187            status,
188            engine,
189            source: capture,
190            observer,
191            lag_poll_interval: options.lag_poll_interval,
192        })
193    }
194}
195
196/// A built sync daemon, ready to run. Exposes its live [`Status`] so a transport
197/// the binary owns can serve it concurrently with the run.
198#[derive(Debug)]
199pub struct RunningDaemon {
200    status: Arc<Status>,
201    engine: Engine,
202    source: Arc<dyn ChangeCapture>,
203    observer: Arc<dyn Observer>,
204    lag_poll_interval: Duration,
205}
206
207impl RunningDaemon {
208    /// A handle to the live operational status, for a transport (HTTP, a TUI, …)
209    /// to read while the daemon runs. Cheap to clone.
210    pub fn status(&self) -> Arc<Status> {
211        Arc::clone(&self.status)
212    }
213
214    /// Run until the live stream ends, an error stops the pipeline, or `shutdown`
215    /// resolves — typically a signal future the binary owns. A pending batch on
216    /// shutdown is simply redelivered on the next run (at-least-once), so
217    /// dropping the run mid-flight is safe.
218    #[tracing::instrument(name = "daemon.run", skip_all)]
219    pub async fn run(self, shutdown: impl Future<Output = ()> + Send) -> anyhow::Result<()> {
220        let RunningDaemon {
221            status,
222            engine,
223            source,
224            observer,
225            lag_poll_interval,
226        } = self;
227
228        // Held in a guard so it's aborted however this returns — a normal stop
229        // *or* the future being cancelled (e.g. the binary dropping the run for a
230        // reindex restart) — rather than detaching onto the shared status.
231        let _lag = LagGuard(tokio::spawn(lag::poll(source, observer, lag_poll_interval)));
232
233        let result = tokio::select! {
234            res = engine.run() => res.context("sync engine stopped"),
235            () = shutdown => {
236                tracing::info!("shutdown requested; stopping pipeline");
237                Ok(())
238            }
239        };
240
241        status.set_phase(Phase::Stopped);
242        result
243    }
244}
245
246/// Aborts the lag poller when dropped — on a normal stop or on cancellation
247/// (the run future being dropped for a restart) alike. Its result is discarded,
248/// so there's nothing to join.
249#[derive(Debug)]
250struct LagGuard(tokio::task::JoinHandle<()>);
251
252impl Drop for LagGuard {
253    fn drop(&mut self) {
254        self.0.abort();
255    }
256}
257
258#[cfg(test)]
259#[allow(clippy::unwrap_used)]
260mod tests {
261    use super::*;
262
263    use std::collections::BTreeMap;
264    use std::sync::Mutex;
265    use std::sync::atomic::{AtomicU64, Ordering};
266    use std::time::Duration;
267
268    use async_trait::async_trait;
269    use engine::BatchStats;
270    use futures::stream::{self, BoxStream};
271    use schema::{Source, SourceType};
272    use schema_core::{ColumnName, DatabaseSchema, GenericValue, IndexName, TableName};
273    use sinks_core::{FlushReport, Sink};
274    use sources_core::cdc::{Ack, AckSink, Change, ChangeEvent};
275    use sources_core::document::{Document, DocumentBuilder, DocumentId, IndexScope};
276    use sources_core::{RowKey, SnapshotTable};
277    use tokio::sync::Notify;
278
279    use crate::observer::StatusObserver;
280    use crate::status::{IndexState, Phase};
281
282    fn users() -> IndexName {
283        IndexName::try_new("users").unwrap()
284    }
285
286    /// The observer drives the status surface through a full lifecycle, and the
287    /// snapshot serializes to the expected JSON shape.
288    #[test]
289    fn observer_drives_status_through_its_lifecycle() {
290        let status = Arc::new(Status::new([users()], Instant::now()));
291        let observer = StatusObserver::new(Arc::clone(&status));
292
293        // Starts pending, before any events.
294        let snap = status.snapshot();
295        assert_eq!(snap.phase, Phase::Starting);
296        assert_eq!(snap.indexes.get("users"), Some(&IndexState::Pending));
297
298        observer.on_indexes_ensured(1);
299        observer.on_backfill_started(&[users()]);
300        let snap = status.snapshot();
301        assert_eq!(snap.phase, Phase::Backfilling);
302        assert_eq!(snap.indexes.get("users"), Some(&IndexState::Backfilling));
303
304        observer.on_index_seeded(&users());
305        observer.on_backfill_completed();
306        observer.on_live_started();
307
308        // Three changes captured, two distinct documents built in one batch.
309        observer.on_change_captured();
310        observer.on_change_captured();
311        observer.on_change_captured();
312        observer.on_batch_committed(BatchStats {
313            changes: 3,
314            documents: 2,
315            documents_by_index: vec![(users(), 2)],
316            flush: Duration::from_millis(5),
317        });
318        observer.on_slot_lag(4096);
319
320        let snap = status.snapshot();
321        assert_eq!(snap.phase, Phase::Live);
322        assert_eq!(snap.indexes.get("users"), Some(&IndexState::Seeded));
323        assert_eq!(snap.changes_captured, 3);
324        assert_eq!(snap.changes_committed, 3);
325        assert_eq!(snap.changes_in_flight, 0);
326        assert_eq!(snap.documents_built, 2);
327        assert_eq!(snap.batches, 1);
328        assert_eq!(snap.slot_lag_bytes, Some(4096));
329        assert_eq!(snap.errors, 0);
330
331        // The JSON the `/status` endpoint returns.
332        let json = serde_json::to_value(&snap).unwrap();
333        assert_eq!(json["phase"], "live");
334        assert_eq!(json["indexes"]["users"], "seeded");
335        assert_eq!(json["changes_in_flight"], 0);
336        assert_eq!(json["slot_lag_bytes"], 4096);
337    }
338
339    /// Reaching live with a never-backfilled index (already seeded on start)
340    /// still reports it seeded, and an error moves the phase to `Stopped`.
341    #[test]
342    fn already_seeded_index_and_error_phase() {
343        let status = Arc::new(Status::new([users()], Instant::now()));
344        let observer = StatusObserver::new(Arc::clone(&status));
345
346        // No backfill_started for `users` — it was already seeded.
347        observer.on_live_started();
348        assert_eq!(
349            status.snapshot().indexes.get("users"),
350            Some(&IndexState::Seeded),
351            "an index live without a backfill this run is reported seeded",
352        );
353
354        observer.on_error("boom");
355        let snap = status.snapshot();
356        assert_eq!(snap.phase, Phase::Stopped);
357        assert_eq!(snap.errors, 1);
358        assert_eq!(snap.last_error.as_deref(), Some("boom"));
359    }
360
361    /// A source that reports a fixed lag and an empty live stream.
362    #[derive(Debug)]
363    struct LaggySource(Option<u64>);
364
365    #[async_trait]
366    impl ChangeCapture for LaggySource {
367        async fn live(
368            &self,
369        ) -> sources_core::Result<BoxStream<'static, sources_core::Result<Change>>> {
370            Ok(Box::pin(stream::empty()))
371        }
372
373        async fn lag(&self) -> sources_core::Result<Option<u64>> {
374            Ok(self.0)
375        }
376    }
377
378    /// Records the slot lag it's told and signals each report, so the poller
379    /// test can await a real report instead of sleeping a fixed duration.
380    #[derive(Debug, Default)]
381    struct LagObserver {
382        last: Mutex<Option<u64>>,
383        reported: Notify,
384    }
385
386    impl Observer for LagObserver {
387        fn on_slot_lag(&self, bytes: u64) {
388            *self.last.lock().unwrap() = Some(bytes);
389            self.reported.notify_one();
390        }
391    }
392
393    /// The lag poller samples the source and reports each known value to the
394    /// observer. Deterministic: it awaits an actual report (the poller's first
395    /// interval tick fires immediately), with a generous timeout as a backstop.
396    #[tokio::test]
397    async fn lag_poller_reports_each_sampled_value() {
398        let observer = Arc::new(LagObserver::default());
399        let source: Arc<dyn ChangeCapture> = Arc::new(LaggySource(Some(8192)));
400
401        let handle = tokio::spawn(lag::poll(
402            source,
403            Arc::clone(&observer) as Arc<dyn Observer>,
404            Duration::from_millis(5),
405        ));
406        tokio::time::timeout(Duration::from_secs(5), observer.reported.notified())
407            .await
408            .expect("the poller should report a lag sample");
409        handle.abort();
410
411        assert_eq!(*observer.last.lock().unwrap(), Some(8192));
412    }
413
414    // --- The daemon driven end-to-end over injected backends -----------------
415    //
416    // These exercise `Daemon::start`/`run` with no Postgres/OpenSearch by
417    // supplying a `Backends` that hands back test doubles — the seam the
418    // pluggable-backends refactor exists to enable.
419
420    /// A `Backends` that returns pre-built test doubles, ignoring the `Config`.
421    /// Counts how often each edge was asked for, to prove the daemon builds its
422    /// backends *through* the seam rather than naming any concrete one.
423    #[derive(Debug)]
424    struct MockBackends {
425        capture: Arc<dyn ChangeCapture>,
426        documents: Arc<dyn DocumentBuilder>,
427        sink: Arc<dyn Sink>,
428        source_built: Arc<AtomicU64>,
429        sink_built: Arc<AtomicU64>,
430    }
431
432    #[async_trait]
433    impl Backends for MockBackends {
434        async fn source(
435            &self,
436            _config: Arc<Config>,
437            _options: &DaemonOptions,
438        ) -> anyhow::Result<SourceParts> {
439            self.source_built.fetch_add(1, Ordering::SeqCst);
440            Ok(SourceParts {
441                capture: Arc::clone(&self.capture),
442                documents: Arc::clone(&self.documents),
443            })
444        }
445
446        async fn sink(
447            &self,
448            _config: &Config,
449            _options: &DaemonOptions,
450        ) -> anyhow::Result<Arc<dyn Sink>> {
451            self.sink_built.fetch_add(1, Ordering::SeqCst);
452            Ok(Arc::clone(&self.sink))
453        }
454    }
455
456    /// Replays a fixed list of changes on the live stream, once, then ends — so
457    /// `engine.run()` completes on its own without a shutdown signal.
458    #[derive(Debug)]
459    struct ScriptedSource {
460        changes: Mutex<Option<Vec<Change>>>,
461    }
462
463    #[async_trait]
464    impl ChangeCapture for ScriptedSource {
465        async fn live(
466            &self,
467        ) -> sources_core::Result<BoxStream<'static, sources_core::Result<Change>>> {
468            let changes = self.changes.lock().unwrap().take().unwrap_or_default();
469            Ok(Box::pin(stream::iter(
470                changes
471                    .into_iter()
472                    .map(Ok::<Change, sources_core::SourceError>),
473            )))
474        }
475
476        async fn lag(&self) -> sources_core::Result<Option<u64>> {
477            Ok(None)
478        }
479    }
480
481    /// Resolves each change to one `users` document; key value `2` is a delete.
482    #[derive(Debug)]
483    struct ScriptedDocuments;
484
485    #[async_trait]
486    impl DocumentBuilder for ScriptedDocuments {
487        async fn resolve(
488            &self,
489            _table: &TableName,
490            key: &RowKey,
491        ) -> sources_core::Result<Vec<DocumentId>> {
492            Ok(vec![DocumentId {
493                index: users(),
494                key: key.clone(),
495            }])
496        }
497
498        async fn build(&self, id: &DocumentId) -> sources_core::Result<Document> {
499            let deleted = matches!(id.key.0.first(), Some((_, GenericValue::Int(2))));
500            Ok(if deleted {
501                Document::Delete { id: id.clone() }
502            } else {
503                Document::Upsert {
504                    id: id.clone(),
505                    body: GenericValue::Map(Default::default()),
506                }
507            })
508        }
509
510        fn backfill_scopes(&self) -> Vec<IndexScope> {
511            vec![IndexScope {
512                index: users(),
513                root: SnapshotTable {
514                    db_schema: DatabaseSchema::try_new("public").unwrap(),
515                    table: TableName::try_new("users").unwrap(),
516                },
517            }]
518        }
519    }
520
521    /// Records the sink ops it receives, in order.
522    #[derive(Debug, Default)]
523    struct RecordingSink {
524        ops: Arc<Mutex<Vec<String>>>,
525    }
526
527    #[async_trait]
528    impl Sink for RecordingSink {
529        async fn upsert(
530            &self,
531            index: &IndexName,
532            id: &str,
533            _document: &GenericValue,
534        ) -> sinks_core::Result<()> {
535            self.ops
536                .lock()
537                .unwrap()
538                .push(format!("upsert {} {id}", index.as_ref()));
539            Ok(())
540        }
541
542        async fn delete(&self, index: &IndexName, id: &str) -> sinks_core::Result<()> {
543            self.ops
544                .lock()
545                .unwrap()
546                .push(format!("delete {} {id}", index.as_ref()));
547            Ok(())
548        }
549
550        async fn flush(&self, _caught_up: bool) -> sinks_core::Result<FlushReport> {
551            Ok(FlushReport::clean())
552        }
553    }
554
555    /// Counts the changes confirmed back to the source.
556    #[derive(Debug)]
557    struct CountingAck(Arc<AtomicU64>);
558
559    impl AckSink for CountingAck {
560        fn confirm(&self, _seq: u64) {
561            self.0.fetch_add(1, Ordering::SeqCst);
562        }
563    }
564
565    fn row_change(delete: bool, id: i64, seq: u64, acks: &Arc<AtomicU64>) -> Change {
566        let table = TableName::try_new("users").unwrap();
567        let key = RowKey(vec![(
568            ColumnName::try_new("id").unwrap(),
569            GenericValue::Int(id),
570        )]);
571        let event = if delete {
572            ChangeEvent::Delete { table, key }
573        } else {
574            ChangeEvent::Upsert { table, key }
575        };
576        Change {
577            event,
578            ack: Ack::new(seq, Arc::new(CountingAck(Arc::clone(acks)))),
579        }
580    }
581
582    /// A config the `MockBackends` ignores — only `indexes` is read by the
583    /// daemon (for the status surface), and it's intentionally empty.
584    fn backendless_config() -> Config {
585        Config {
586            source: Source {
587                source_type: SourceType::Postgres,
588                connection: None,
589                manage_publication: true,
590            },
591            sinks: BTreeMap::new(),
592            indexes: BTreeMap::new(),
593            on_error: Default::default(),
594            server: Default::default(),
595        }
596    }
597
598    fn daemon_over(backends: Arc<MockBackends>) -> Daemon {
599        Daemon::new(backendless_config(), backends).with_options(DaemonOptions {
600            // No backfill: the test drives the live path directly.
601            skip_backfill: true,
602            ..DaemonOptions::default()
603        })
604    }
605
606    /// `Daemon::start` builds both edges **through** the injected `Backends`
607    /// (never naming a concrete backend itself), and a run over an empty live
608    /// stream returns cleanly with the status surface left at `Stopped`.
609    #[tokio::test]
610    async fn start_builds_backends_through_the_seam() {
611        let source_built = Arc::new(AtomicU64::new(0));
612        let sink_built = Arc::new(AtomicU64::new(0));
613
614        let backends = Arc::new(MockBackends {
615            capture: Arc::new(ScriptedSource {
616                changes: Mutex::new(Some(Vec::new())),
617            }),
618            documents: Arc::new(ScriptedDocuments),
619            sink: Arc::new(RecordingSink::default()),
620            source_built: Arc::clone(&source_built),
621            sink_built: Arc::clone(&sink_built),
622        });
623
624        let running = daemon_over(backends).start().await.unwrap();
625        let status = running.status();
626
627        // The daemon asked the seam — not a hardcoded backend — for each edge.
628        assert_eq!(source_built.load(Ordering::SeqCst), 1);
629        assert_eq!(sink_built.load(Ordering::SeqCst), 1);
630
631        // An empty live stream completes on its own; the shutdown never fires.
632        running.run(std::future::pending::<()>()).await.unwrap();
633
634        let snap = status.snapshot();
635        assert_eq!(snap.phase, Phase::Stopped);
636        assert_eq!(snap.changes_committed, 0);
637    }
638
639    /// A run over a non-empty live stream drives changes all the way through the
640    /// injected document builder and sink — capture, build, write, flush, ack —
641    /// with no real source or sink, and the status counters reflect it.
642    #[tokio::test]
643    async fn drives_changes_through_injected_backends() {
644        let acks = Arc::new(AtomicU64::new(0));
645        let ops = Arc::new(Mutex::new(Vec::new()));
646
647        let changes = vec![
648            row_change(false, 1, 0, &acks),
649            row_change(true, 2, 1, &acks),
650        ];
651
652        let backends = Arc::new(MockBackends {
653            capture: Arc::new(ScriptedSource {
654                changes: Mutex::new(Some(changes)),
655            }),
656            documents: Arc::new(ScriptedDocuments),
657            sink: Arc::new(RecordingSink {
658                ops: Arc::clone(&ops),
659            }),
660            source_built: Arc::new(AtomicU64::new(0)),
661            sink_built: Arc::new(AtomicU64::new(0)),
662        });
663
664        let running = daemon_over(backends).start().await.unwrap();
665        let status = running.status();
666        running.run(std::future::pending::<()>()).await.unwrap();
667
668        assert_eq!(
669            *ops.lock().unwrap(),
670            vec!["upsert users 1".to_owned(), "delete users 2".to_owned()],
671        );
672        assert_eq!(acks.load(Ordering::SeqCst), 2, "both changes acked");
673
674        let snap = status.snapshot();
675        assert_eq!(snap.changes_captured, 2);
676        assert_eq!(snap.changes_committed, 2);
677        assert_eq!(snap.changes_in_flight, 0);
678        assert_eq!(snap.phase, Phase::Stopped);
679    }
680}