engine/lib.rs
1//! The `flusso` sync engine.
2//!
3//! Wires the pluggable edges together and runs the pipeline:
4//!
5//! ```text
6//! ChangeCapture ─▶ queue ─▶ resolve ─▶ build ─▶ Sink ─▶ flush ─▶ ack
7//! ```
8//!
9//! A **capture** task drains the source's change stream into a bounded
10//! in-process [`queue`](queue_channel) (back-pressure: capture blocks when the
11//! queue is full). A **worker** pulls changes and, for the row each names,
12//! resolves the affected document ids, assembles each one, and writes it to the
13//! [`Sink`]'s buffer.
14//!
15//! Writes are **batched**: the worker groups up to [`BatchPolicy::max_changes`]
16//! changes (or whatever arrives within [`BatchPolicy::max_delay`], whichever
//! comes first) into a single [`flush`](Sink::flush), turning N changes into as
//! few as ⌈N / max_changes⌉ bulk round-trips instead of N. The source acks for a batch
19//! are confirmed **only after** the flush that persisted their documents, so the
20//! replication slot advances past a change exactly when its documents are
21//! durable downstream — preserving at-least-once delivery: a crash before the
22//! flush leaves the whole batch unconfirmed, so it is redelivered on restart and
23//! re-applied idempotently (documents are rebuilt from the current row and
24//! written by deterministic id).
25//!
26//! Stopping on any error is therefore safe: unconfirmed changes are redelivered
27//! when the run restarts.
28//!
29//! Before anything else, the engine asks the [`DocumentBuilder`] for each
30//! index's resolved mapping and tells the sink to create it
31//! ([`ensure_index`](Sink::ensure_index)) — so the destination uses the
32//! configured field types instead of guessing. This is idempotent, so it runs
33//! on every start, including resumes.
34//!
35//! Before live capture, the engine runs an optional **backfill** phase. It asks
36//! the [`DocumentBuilder`] which indexes exist and the sink whether each is
37//! already seeded; for those that aren't, it asks the source to
38//! [`snapshot`](ChangeCapture::snapshot) their root tables and drives that
39//! finite stream through the same queue → resolve → build → sink path (scoped to
40//! just the unseeded indexes), then records each as seeded. So "is a backfill
41//! needed?" is the destination's call, not the source's.
42//!
43//! The queue, source, sink, and document builder are all trait objects, so the
44//! backend choices (WAL vs polling, stdout vs OpenSearch, channel vs a durable
45//! broker) are swappable without touching this loop.
46//!
47//! The implementation is split across modules: `policy` holds the run
48//! configuration ([`BatchPolicy`], [`FailurePolicies`]); `pipeline` holds the
49//! `Pipeline` execution machinery this `Engine` drives; `observer` the progress
50//! trait; `error` the error type.
51
52// The pipeline benchmark (in `benches/`) pulls a concrete source and sink as
53// dev-dependencies the unit-test build doesn't touch; allow that only under
54// `cfg(test)` — the normal build still enforces unused dependencies.
55#![cfg_attr(test, allow(unused_crate_dependencies))]
56
57mod error;
58mod observer;
59mod pipeline;
60mod policy;
61
62#[cfg(test)]
63mod tests;
64
65pub use error::*;
66pub use observer::*;
67pub use policy::{BatchPolicy, FailurePolicies, FailurePolicy};
68
69use std::sync::Arc;
70
71use sinks_core::Sink;
72use sources_core::cdc::ChangeCapture;
73use sources_core::document::DocumentBuilder;
74
75use crate::pipeline::{Pipeline, run_inner};
76
/// Default bound on the capture → worker queue: how many pending changes may
/// wait between the two before capture has to block.
const DEFAULT_QUEUE_CAPACITY: usize = 1024;
79
80/// Drives changes from a source through to a sink.
81#[derive(Debug)]
82pub struct Engine {
83 source: Arc<dyn ChangeCapture>,
84 documents: Arc<dyn DocumentBuilder>,
85 sink: Arc<dyn Sink>,
86 observer: Arc<dyn Observer>,
87 queue_capacity: usize,
88 batch: BatchPolicy,
89 skip_backfill: bool,
90 failure_policies: FailurePolicies,
91}
92
93impl Engine {
94 /// Assemble an engine from its pluggable parts.
95 pub fn new(
96 source: Arc<dyn ChangeCapture>,
97 documents: Arc<dyn DocumentBuilder>,
98 sink: Arc<dyn Sink>,
99 ) -> Self {
100 Self {
101 source,
102 documents,
103 sink,
104 observer: Arc::new(NoopObserver),
105 queue_capacity: DEFAULT_QUEUE_CAPACITY,
106 batch: BatchPolicy::default(),
107 skip_backfill: false,
108 failure_policies: FailurePolicies::default(),
109 }
110 }
111
112 /// Report lifecycle and progress events to `observer` (metrics, a live
113 /// status surface, …). Defaults to [`NoopObserver`]. See [`Observer`].
114 pub fn with_observer(mut self, observer: Arc<dyn Observer>) -> Self {
115 self.observer = observer;
116 self
117 }
118
119 /// Set how many changes may buffer between capture and the worker.
120 pub fn with_queue_capacity(mut self, capacity: usize) -> Self {
121 self.queue_capacity = capacity.max(1);
122 self
123 }
124
125 /// Set how the worker groups changes into one sink flush (see
126 /// [`BatchPolicy`]). `max_changes` is clamped to at least 1.
127 pub fn with_batch(mut self, batch: BatchPolicy) -> Self {
128 self.batch = BatchPolicy {
129 max_changes: batch.max_changes.max(1),
130 ..batch
131 };
132 self
133 }
134
135 /// Force-skip the backfill phase entirely, regardless of what the sink
136 /// reports. An escape hatch for sinks that can't persist seeded-state (so
137 /// they would otherwise re-seed every run) or to resume without re-checking.
138 pub fn skip_backfill(mut self, skip: bool) -> Self {
139 self.skip_backfill = skip;
140 self
141 }
142
143 /// Set how the engine resolves the policy for documents a sink rejects at
144 /// the item level (see [`FailurePolicies`]). Defaults to
145 /// [`FailurePolicy::Stop`] for every index.
146 pub fn with_failure_policies(mut self, policies: FailurePolicies) -> Self {
147 self.failure_policies = policies;
148 self
149 }
150
151 /// Run until the live change stream ends or an error stops the pipeline.
152 ///
153 /// First seeds any unseeded index (unless [`skip_backfill`](Self::skip_backfill)
154 /// is set), then follows live changes.
155 #[tracing::instrument(
156 name = "engine.run",
157 skip_all,
158 fields(
159 skip_backfill = self.skip_backfill,
160 queue_capacity = self.queue_capacity,
161 max_changes = self.batch.max_changes,
162 max_delay_ms = self.batch.max_delay.as_millis() as u64,
163 ),
164 )]
165 pub async fn run(self) -> Result<()> {
166 let Engine {
167 source,
168 documents,
169 sink,
170 observer,
171 queue_capacity,
172 batch,
173 skip_backfill,
174 failure_policies,
175 } = self;
176 let pipeline = Pipeline {
177 documents: documents.as_ref(),
178 sink: sink.as_ref(),
179 observer: &observer,
180 queue_capacity,
181 batch,
182 failure_policies: &failure_policies,
183 };
184 let result = run_inner(pipeline, source.as_ref(), skip_backfill).await;
185 if let Err(error) = &result {
186 observer.on_error(&error.to_string());
187 }
188 result
189 }
190}