Skip to main content

engine/
lib.rs

1//! The `flusso` sync engine.
2//!
3//! Wires the pluggable edges together and runs the pipeline:
4//!
5//! ```text
6//! ChangeCapture ─▶ queue ─▶ resolve ─▶ build ─▶ Sink ─▶ flush ─▶ ack
7//! ```
8//!
9//! A **capture** task drains the source's change stream into a bounded
10//! in-process [`queue`](queue_channel) (back-pressure: capture blocks when the
11//! queue is full). A **worker** pulls changes and, for the row each names,
12//! resolves the affected document ids, assembles each one, and writes it to the
13//! [`Sink`]'s buffer.
14//!
15//! Writes are **batched**: the worker groups up to [`BatchPolicy::max_changes`]
16//! changes (or whatever arrives within [`BatchPolicy::max_delay`], whichever
//! comes first) into a single [`flush`](Sink::flush), turning N changes into as
//! few as ⌈N / max_changes⌉ bulk round-trips instead of N. The source acks for a batch
19//! are confirmed **only after** the flush that persisted their documents, so the
20//! replication slot advances past a change exactly when its documents are
21//! durable downstream — preserving at-least-once delivery: a crash before the
22//! flush leaves the whole batch unconfirmed, so it is redelivered on restart and
23//! re-applied idempotently (documents are rebuilt from the current row and
24//! written by deterministic id).
25//!
26//! Stopping on any error is therefore safe: unconfirmed changes are redelivered
27//! when the run restarts.
28//!
29//! Before anything else, the engine asks the [`DocumentBuilder`] for each
30//! index's resolved mapping and tells the sink to create it
31//! ([`ensure_index`](Sink::ensure_index)) — so the destination uses the
32//! configured field types instead of guessing. This is idempotent, so it runs
33//! on every start, including resumes.
34//!
35//! Before live capture, the engine runs an optional **backfill** phase. It asks
36//! the [`DocumentBuilder`] which indexes exist and the sink whether each is
37//! already seeded; for those that aren't, it asks the source to
38//! [`snapshot`](ChangeCapture::snapshot) their root tables and drives that
39//! finite stream through the same queue → resolve → build → sink path (scoped to
40//! just the unseeded indexes), then records each as seeded. So "is a backfill
41//! needed?" is the destination's call, not the source's.
42//!
43//! The queue, source, sink, and document builder are all trait objects, so the
44//! backend choices (WAL vs polling, stdout vs OpenSearch, channel vs a durable
45//! broker) are swappable without touching this loop.
46//!
47//! The implementation is split across modules: `policy` holds the run
48//! configuration ([`BatchPolicy`], [`FailurePolicies`]); `pipeline` holds the
49//! `Pipeline` execution machinery this `Engine` drives; `observer` the progress
50//! trait; `error` the error type.
51
52// The pipeline benchmark (in `benches/`) pulls a concrete source and sink as
53// dev-dependencies the unit-test build doesn't touch; allow that only under
54// `cfg(test)` — the normal build still enforces unused dependencies.
55#![cfg_attr(test, allow(unused_crate_dependencies))]
56
57mod error;
58mod observer;
59mod pipeline;
60mod policy;
61
62#[cfg(test)]
63mod tests;
64
65pub use error::*;
66pub use observer::*;
67pub use policy::{BatchPolicy, FailurePolicies, FailurePolicy};
68
69use std::sync::Arc;
70
71use sinks_core::Sink;
72use sources_core::cdc::ChangeCapture;
73use sources_core::document::DocumentBuilder;
74
75use crate::pipeline::{Pipeline, run_inner};
76
77/// Pending changes buffered between capture and the worker.
78const DEFAULT_QUEUE_CAPACITY: usize = 1024;
79
80/// Drives changes from a source through to a sink.
81#[derive(Debug)]
82pub struct Engine {
83    source: Arc<dyn ChangeCapture>,
84    documents: Arc<dyn DocumentBuilder>,
85    sink: Arc<dyn Sink>,
86    observer: Arc<dyn Observer>,
87    queue_capacity: usize,
88    batch: BatchPolicy,
89    skip_backfill: bool,
90    failure_policies: FailurePolicies,
91}
92
93impl Engine {
94    pub fn new(
95        source: Arc<dyn ChangeCapture>,
96        documents: Arc<dyn DocumentBuilder>,
97        sink: Arc<dyn Sink>,
98    ) -> Self {
99        Self {
100            source,
101            documents,
102            sink,
103            observer: Arc::new(NoopObserver),
104            queue_capacity: DEFAULT_QUEUE_CAPACITY,
105            batch: BatchPolicy::default(),
106            skip_backfill: false,
107            failure_policies: FailurePolicies::default(),
108        }
109    }
110
111    /// Report lifecycle and progress events to `observer` (metrics, a live
112    /// status surface, …). Defaults to [`NoopObserver`]. See [`Observer`].
113    pub fn with_observer(mut self, observer: Arc<dyn Observer>) -> Self {
114        self.observer = observer;
115        self
116    }
117
118    /// Set how many changes may buffer between capture and the worker.
119    pub fn with_queue_capacity(mut self, capacity: usize) -> Self {
120        self.queue_capacity = capacity.max(1);
121        self
122    }
123
124    /// Set how the worker groups changes into one sink flush (see
125    /// [`BatchPolicy`]). `max_changes` is clamped to at least 1.
126    pub fn with_batch(mut self, batch: BatchPolicy) -> Self {
127        self.batch = BatchPolicy {
128            max_changes: batch.max_changes.max(1),
129            ..batch
130        };
131        self
132    }
133
134    /// Force-skip the backfill phase entirely, regardless of what the sink
135    /// reports. An escape hatch for sinks that can't persist seeded-state (so
136    /// they would otherwise re-seed every run) or to resume without re-checking.
137    pub fn skip_backfill(mut self, skip: bool) -> Self {
138        self.skip_backfill = skip;
139        self
140    }
141
142    /// Set how the engine resolves the policy for documents a sink rejects at
143    /// the item level (see [`FailurePolicies`]). Defaults to
144    /// [`FailurePolicy::Stop`] for every index.
145    pub fn with_failure_policies(mut self, policies: FailurePolicies) -> Self {
146        self.failure_policies = policies;
147        self
148    }
149
150    /// Run until the live change stream ends or an error stops the pipeline.
151    ///
152    /// First seeds any unseeded index (unless [`skip_backfill`](Self::skip_backfill)
153    /// is set), then follows live changes.
154    #[tracing::instrument(
155        name = "engine.run",
156        skip_all,
157        fields(
158            skip_backfill = self.skip_backfill,
159            queue_capacity = self.queue_capacity,
160            max_changes = self.batch.max_changes,
161            max_delay_ms = self.batch.max_delay.as_millis() as u64,
162        ),
163    )]
164    pub async fn run(self) -> Result<()> {
165        let Engine {
166            source,
167            documents,
168            sink,
169            observer,
170            queue_capacity,
171            batch,
172            skip_backfill,
173            failure_policies,
174        } = self;
175        let pipeline = Pipeline {
176            documents: documents.as_ref(),
177            sink: sink.as_ref(),
178            observer: &observer,
179            queue_capacity,
180            batch,
181            failure_policies: &failure_policies,
182        };
183        let result = run_inner(pipeline, source.as_ref(), skip_backfill).await;
184        if let Err(error) = &result {
185            observer.on_error(&error.to_string());
186        }
187        result
188    }
189}