Skip to main content

engine/
lib.rs

1//! The `flusso` sync engine.
2//!
3//! Wires the pluggable edges together and runs the pipeline:
4//!
5//! ```text
6//! ChangeCapture ─▶ queue ─▶ resolve ─▶ build ─▶ Sink ─▶ flush ─▶ ack
7//! ```
8//!
9//! A **capture** task drains the source's change stream into a bounded
10//! in-process [`queue`](queue_channel) (back-pressure: capture blocks when the
11//! queue is full). A **worker** pulls changes and, for the row each names,
12//! resolves the affected document ids, assembles each one, and writes it to the
13//! [`Sink`]'s buffer.
14//!
15//! Writes are **batched**: the worker groups up to [`BatchPolicy::max_changes`]
16//! changes (or whatever arrives within [`BatchPolicy::max_delay`], whichever
17//! comes first) into a single [`flush`](Sink::flush), turning N changes into
18//! ⌈N / max_changes⌉ bulk round-trips instead of N. The source acks for a batch
19//! are confirmed **only after** the flush that persisted their documents, so the
20//! replication slot advances past a change exactly when its documents are
21//! durable downstream — preserving at-least-once delivery: a crash before the
22//! flush leaves the whole batch unconfirmed, so it is redelivered on restart and
23//! re-applied idempotently (documents are rebuilt from the current row and
24//! written by deterministic id).
25//!
26//! Stopping on any error is therefore safe: unconfirmed changes are redelivered
27//! when the run restarts.
28//!
29//! Before anything else, the engine asks the [`DocumentBuilder`] for each
30//! index's resolved mapping and tells the sink to create it
31//! ([`ensure_index`](Sink::ensure_index)) — so the destination uses the
32//! configured field types instead of guessing. This is idempotent, so it runs
33//! on every start, including resumes.
34//!
35//! Before live capture, the engine runs an optional **backfill** phase. It asks
36//! the [`DocumentBuilder`] which indexes exist and the sink whether each is
37//! already seeded; for those that aren't, it asks the source to
38//! [`snapshot`](ChangeCapture::snapshot) their root tables and drives that
39//! finite stream through the same queue → resolve → build → sink path (scoped to
40//! just the unseeded indexes), then records each as seeded. So "is a backfill
41//! needed?" is the destination's call, not the source's.
42//!
43//! The queue, source, sink, and document builder are all trait objects, so the
44//! backend choices (WAL vs polling, stdout vs OpenSearch, channel vs a durable
45//! broker) are swappable without touching this loop.
46//!
47//! The implementation is split across modules: `policy` holds the run
48//! configuration ([`BatchPolicy`], [`FailurePolicies`]); `pipeline` holds the
49//! `Pipeline` execution machinery this `Engine` drives; `observer` the progress
50//! trait; `error` the error type.
51
52// The pipeline benchmark (in `benches/`) pulls a concrete source and sink as
53// dev-dependencies the unit-test build doesn't touch; allow that only under
54// `cfg(test)` — the normal build still enforces unused dependencies.
55#![cfg_attr(test, allow(unused_crate_dependencies))]
56
57mod error;
58mod observer;
59mod pipeline;
60mod policy;
61
62#[cfg(test)]
63mod tests;
64
65pub use error::*;
66pub use observer::*;
67pub use policy::{BatchPolicy, FailurePolicies, FailurePolicy};
68
69use std::sync::Arc;
70
71use sinks_core::Sink;
72use sources_core::cdc::ChangeCapture;
73use sources_core::document::DocumentBuilder;
74
75use crate::pipeline::{Pipeline, run_inner};
76
/// Default capacity of the queue between capture and the worker: the number
/// of pending changes that may be buffered when the caller does not override
/// it via `Engine::with_queue_capacity`.
const DEFAULT_QUEUE_CAPACITY: usize = 1024;
79
/// Drives changes from a source through to a sink.
///
/// Built with [`Engine::new`], optionally tuned through the builder methods,
/// and consumed by [`Engine::run`].
#[derive(Debug)]
pub struct Engine {
    /// Change-capture source handed to the pipeline runner in [`Engine::run`].
    source: Arc<dyn ChangeCapture>,
    /// Document builder lent to the pipeline for the duration of a run.
    documents: Arc<dyn DocumentBuilder>,
    /// Destination lent to the pipeline for the duration of a run.
    sink: Arc<dyn Sink>,
    /// Receives lifecycle and progress events, and the error message when a
    /// run fails. Defaults to [`NoopObserver`].
    observer: Arc<dyn Observer>,
    /// How many changes may buffer between capture and the worker. The
    /// builder clamps this to at least 1.
    queue_capacity: usize,
    /// How the worker groups changes into one sink flush. The builder clamps
    /// `max_changes` to at least 1.
    batch: BatchPolicy,
    /// When `true`, the backfill phase is skipped regardless of what the sink
    /// reports.
    skip_backfill: bool,
    /// How the engine resolves the policy for documents a sink rejects at the
    /// item level.
    failure_policies: FailurePolicies,
}
92
93impl Engine {
94    /// Assemble an engine from its pluggable parts.
95    pub fn new(
96        source: Arc<dyn ChangeCapture>,
97        documents: Arc<dyn DocumentBuilder>,
98        sink: Arc<dyn Sink>,
99    ) -> Self {
100        Self {
101            source,
102            documents,
103            sink,
104            observer: Arc::new(NoopObserver),
105            queue_capacity: DEFAULT_QUEUE_CAPACITY,
106            batch: BatchPolicy::default(),
107            skip_backfill: false,
108            failure_policies: FailurePolicies::default(),
109        }
110    }
111
112    /// Report lifecycle and progress events to `observer` (metrics, a live
113    /// status surface, …). Defaults to [`NoopObserver`]. See [`Observer`].
114    pub fn with_observer(mut self, observer: Arc<dyn Observer>) -> Self {
115        self.observer = observer;
116        self
117    }
118
119    /// Set how many changes may buffer between capture and the worker.
120    pub fn with_queue_capacity(mut self, capacity: usize) -> Self {
121        self.queue_capacity = capacity.max(1);
122        self
123    }
124
125    /// Set how the worker groups changes into one sink flush (see
126    /// [`BatchPolicy`]). `max_changes` is clamped to at least 1.
127    pub fn with_batch(mut self, batch: BatchPolicy) -> Self {
128        self.batch = BatchPolicy {
129            max_changes: batch.max_changes.max(1),
130            ..batch
131        };
132        self
133    }
134
135    /// Force-skip the backfill phase entirely, regardless of what the sink
136    /// reports. An escape hatch for sinks that can't persist seeded-state (so
137    /// they would otherwise re-seed every run) or to resume without re-checking.
138    pub fn skip_backfill(mut self, skip: bool) -> Self {
139        self.skip_backfill = skip;
140        self
141    }
142
143    /// Set how the engine resolves the policy for documents a sink rejects at
144    /// the item level (see [`FailurePolicies`]). Defaults to
145    /// [`FailurePolicy::Stop`] for every index.
146    pub fn with_failure_policies(mut self, policies: FailurePolicies) -> Self {
147        self.failure_policies = policies;
148        self
149    }
150
151    /// Run until the live change stream ends or an error stops the pipeline.
152    ///
153    /// First seeds any unseeded index (unless [`skip_backfill`](Self::skip_backfill)
154    /// is set), then follows live changes.
155    #[tracing::instrument(
156        name = "engine.run",
157        skip_all,
158        fields(
159            skip_backfill = self.skip_backfill,
160            queue_capacity = self.queue_capacity,
161            max_changes = self.batch.max_changes,
162            max_delay_ms = self.batch.max_delay.as_millis() as u64,
163        ),
164    )]
165    pub async fn run(self) -> Result<()> {
166        let Engine {
167            source,
168            documents,
169            sink,
170            observer,
171            queue_capacity,
172            batch,
173            skip_backfill,
174            failure_policies,
175        } = self;
176        let pipeline = Pipeline {
177            documents: documents.as_ref(),
178            sink: sink.as_ref(),
179            observer: &observer,
180            queue_capacity,
181            batch,
182            failure_policies: &failure_policies,
183        };
184        let result = run_inner(pipeline, source.as_ref(), skip_backfill).await;
185        if let Err(error) = &result {
186            observer.on_error(&error.to_string());
187        }
188        result
189    }
190}