engine/lib.rs
1//! The `flusso` sync engine.
2//!
3//! Wires the pluggable edges together and runs the pipeline:
4//!
5//! ```text
6//! ChangeCapture ─▶ queue ─▶ resolve ─▶ build ─▶ Sink ─▶ flush ─▶ ack
7//! ```
8//!
9//! A **capture** task drains the source's change stream into a bounded
10//! in-process [`queue`](queue_channel) (back-pressure: capture blocks when the
11//! queue is full). A **worker** pulls changes and, for the row each names,
12//! resolves the affected document ids, assembles each one, and writes it to the
13//! [`Sink`]'s buffer.
14//!
15//! Writes are **batched**: the worker groups up to [`BatchPolicy::max_changes`]
16//! changes (or whatever arrives within [`BatchPolicy::max_delay`], whichever
17//! comes first) into a single [`flush`](Sink::flush), turning N changes into
18//! ⌈N / max_changes⌉ bulk round-trips instead of N. The source acks for a batch
19//! are confirmed **only after** the flush that persisted their documents, so the
20//! replication slot advances past a change exactly when its documents are
21//! durable downstream — preserving at-least-once delivery: a crash before the
22//! flush leaves the whole batch unconfirmed, so it is redelivered on restart and
23//! re-applied idempotently (documents are rebuilt from the current row and
24//! written by deterministic id).
25//!
26//! Stopping on any error is therefore safe: unconfirmed changes are redelivered
27//! when the run restarts.
28//!
29//! Before anything else, the engine asks the [`DocumentBuilder`] for each
30//! index's resolved mapping and tells the sink to create it
31//! ([`ensure_index`](Sink::ensure_index)) — so the destination uses the
32//! configured field types instead of guessing. This is idempotent, so it runs
33//! on every start, including resumes.
34//!
35//! Before live capture, the engine runs an optional **backfill** phase. It asks
36//! the [`DocumentBuilder`] which indexes exist and the sink whether each is
37//! already seeded; for those that aren't, it asks the source to
38//! [`snapshot`](ChangeCapture::snapshot) their root tables and drives that
39//! finite stream through the same queue → resolve → build → sink path (scoped to
40//! just the unseeded indexes), then records each as seeded. So "is a backfill
41//! needed?" is the destination's call, not the source's.
42//!
43//! The queue, source, sink, and document builder are all trait objects, so the
44//! backend choices (WAL vs polling, stdout vs OpenSearch, channel vs a durable
45//! broker) are swappable without touching this loop.
46//!
47//! The implementation is split across modules: `policy` holds the run
48//! configuration ([`BatchPolicy`], [`FailurePolicies`]); `pipeline` holds the
49//! `Pipeline` execution machinery this `Engine` drives; `observer` the progress
50//! trait; `error` the error type.
51
52// The pipeline benchmark (in `benches/`) pulls a concrete source and sink as
53// dev-dependencies the unit-test build doesn't touch; allow that only under
54// `cfg(test)` — the normal build still enforces unused dependencies.
55#![cfg_attr(test, allow(unused_crate_dependencies))]
56
57mod error;
58mod observer;
59mod pipeline;
60mod policy;
61
62#[cfg(test)]
63mod tests;
64
65pub use error::*;
66pub use observer::*;
67pub use policy::{BatchPolicy, FailurePolicies, FailurePolicy};
68
69use std::sync::Arc;
70
71use sinks_core::Sink;
72use sources_core::cdc::ChangeCapture;
73use sources_core::document::DocumentBuilder;
74
75use crate::pipeline::{Pipeline, run_inner};
76
77/// Pending changes buffered between capture and the worker.
78const DEFAULT_QUEUE_CAPACITY: usize = 1024;
79
80/// Drives changes from a source through to a sink.
81#[derive(Debug)]
82pub struct Engine {
83 source: Arc<dyn ChangeCapture>,
84 documents: Arc<dyn DocumentBuilder>,
85 sink: Arc<dyn Sink>,
86 observer: Arc<dyn Observer>,
87 queue_capacity: usize,
88 batch: BatchPolicy,
89 skip_backfill: bool,
90 failure_policies: FailurePolicies,
91}
92
93impl Engine {
94 pub fn new(
95 source: Arc<dyn ChangeCapture>,
96 documents: Arc<dyn DocumentBuilder>,
97 sink: Arc<dyn Sink>,
98 ) -> Self {
99 Self {
100 source,
101 documents,
102 sink,
103 observer: Arc::new(NoopObserver),
104 queue_capacity: DEFAULT_QUEUE_CAPACITY,
105 batch: BatchPolicy::default(),
106 skip_backfill: false,
107 failure_policies: FailurePolicies::default(),
108 }
109 }
110
111 /// Report lifecycle and progress events to `observer` (metrics, a live
112 /// status surface, …). Defaults to [`NoopObserver`]. See [`Observer`].
113 pub fn with_observer(mut self, observer: Arc<dyn Observer>) -> Self {
114 self.observer = observer;
115 self
116 }
117
118 /// Set how many changes may buffer between capture and the worker.
119 pub fn with_queue_capacity(mut self, capacity: usize) -> Self {
120 self.queue_capacity = capacity.max(1);
121 self
122 }
123
124 /// Set how the worker groups changes into one sink flush (see
125 /// [`BatchPolicy`]). `max_changes` is clamped to at least 1.
126 pub fn with_batch(mut self, batch: BatchPolicy) -> Self {
127 self.batch = BatchPolicy {
128 max_changes: batch.max_changes.max(1),
129 ..batch
130 };
131 self
132 }
133
134 /// Force-skip the backfill phase entirely, regardless of what the sink
135 /// reports. An escape hatch for sinks that can't persist seeded-state (so
136 /// they would otherwise re-seed every run) or to resume without re-checking.
137 pub fn skip_backfill(mut self, skip: bool) -> Self {
138 self.skip_backfill = skip;
139 self
140 }
141
142 /// Set how the engine resolves the policy for documents a sink rejects at
143 /// the item level (see [`FailurePolicies`]). Defaults to
144 /// [`FailurePolicy::Stop`] for every index.
145 pub fn with_failure_policies(mut self, policies: FailurePolicies) -> Self {
146 self.failure_policies = policies;
147 self
148 }
149
150 /// Run until the live change stream ends or an error stops the pipeline.
151 ///
152 /// First seeds any unseeded index (unless [`skip_backfill`](Self::skip_backfill)
153 /// is set), then follows live changes.
154 #[tracing::instrument(
155 name = "engine.run",
156 skip_all,
157 fields(
158 skip_backfill = self.skip_backfill,
159 queue_capacity = self.queue_capacity,
160 max_changes = self.batch.max_changes,
161 max_delay_ms = self.batch.max_delay.as_millis() as u64,
162 ),
163 )]
164 pub async fn run(self) -> Result<()> {
165 let Engine {
166 source,
167 documents,
168 sink,
169 observer,
170 queue_capacity,
171 batch,
172 skip_backfill,
173 failure_policies,
174 } = self;
175 let pipeline = Pipeline {
176 documents: documents.as_ref(),
177 sink: sink.as_ref(),
178 observer: &observer,
179 queue_capacity,
180 batch,
181 failure_policies: &failure_policies,
182 };
183 let result = run_inner(pipeline, source.as_ref(), skip_backfill).await;
184 if let Err(error) = &result {
185 observer.on_error(&error.to_string());
186 }
187 result
188 }
189}