Skip to main content

engine/
lib.rs

1#![doc = include_str!("../README.md")]
2// The pipeline benchmark (in `benches/`) pulls a concrete source and sink as
3// dev-dependencies the unit-test build doesn't touch; allow that only under
4// `cfg(test)` — the normal build still enforces unused dependencies.
5#![cfg_attr(test, allow(unused_crate_dependencies))]
6
7mod error;
8mod observer;
9mod pipeline;
10mod policy;
11
12#[cfg(test)]
13mod tests;
14
15pub use error::*;
16pub use observer::*;
17pub use policy::{BatchPolicy, FailurePolicies, FailurePolicy};
18
19use std::sync::Arc;
20
21use sinks_core::Sink;
22use sources_core::cdc::ChangeCapture;
23use sources_core::document::DocumentBuilder;
24
25use crate::pipeline::{Pipeline, run_inner};
26
/// Default bound on how many pending changes may buffer between capture and
/// the worker when no explicit queue capacity is configured.
const DEFAULT_QUEUE_CAPACITY: usize = 1_024;
29
30/// Drives changes from a source through to a sink.
31#[derive(Debug)]
32pub struct Engine {
33    source: Arc<dyn ChangeCapture>,
34    documents: Arc<dyn DocumentBuilder>,
35    sink: Arc<dyn Sink>,
36    observer: Arc<dyn Observer>,
37    queue_capacity: usize,
38    batch: BatchPolicy,
39    skip_backfill: bool,
40    failure_policies: FailurePolicies,
41}
42
43impl Engine {
44    pub fn new(
45        source: Arc<dyn ChangeCapture>,
46        documents: Arc<dyn DocumentBuilder>,
47        sink: Arc<dyn Sink>,
48    ) -> Self {
49        Self {
50            source,
51            documents,
52            sink,
53            observer: Arc::new(NoopObserver),
54            queue_capacity: DEFAULT_QUEUE_CAPACITY,
55            batch: BatchPolicy::default(),
56            skip_backfill: false,
57            failure_policies: FailurePolicies::default(),
58        }
59    }
60
61    /// Report lifecycle and progress events to `observer` (metrics, a live
62    /// status surface, …). Defaults to [`NoopObserver`]. See [`Observer`].
63    pub fn with_observer(mut self, observer: Arc<dyn Observer>) -> Self {
64        self.observer = observer;
65        self
66    }
67
68    /// Set how many changes may buffer between capture and the worker.
69    pub fn with_queue_capacity(mut self, capacity: usize) -> Self {
70        self.queue_capacity = capacity.max(1);
71        self
72    }
73
74    /// Set how the worker groups changes into one sink flush (see
75    /// [`BatchPolicy`]). `max_changes` is clamped to at least 1.
76    pub fn with_batch(mut self, batch: BatchPolicy) -> Self {
77        self.batch = BatchPolicy {
78            max_changes: batch.max_changes.max(1),
79            ..batch
80        };
81        self
82    }
83
84    /// Force-skip the backfill phase entirely, regardless of what the sink
85    /// reports. An escape hatch for sinks that can't persist seeded-state (so
86    /// they would otherwise re-seed every run) or to resume without re-checking.
87    pub fn skip_backfill(mut self, skip: bool) -> Self {
88        self.skip_backfill = skip;
89        self
90    }
91
92    /// Set how the engine resolves the policy for documents a sink rejects at
93    /// the item level (see [`FailurePolicies`]). Defaults to
94    /// [`FailurePolicy::Stop`] for every index.
95    pub fn with_failure_policies(mut self, policies: FailurePolicies) -> Self {
96        self.failure_policies = policies;
97        self
98    }
99
100    /// Run until the live change stream ends or an error stops the pipeline.
101    ///
102    /// First seeds any unseeded index (unless [`skip_backfill`](Self::skip_backfill)
103    /// is set), then follows live changes.
104    #[tracing::instrument(
105        name = "engine.run",
106        skip_all,
107        fields(
108            skip_backfill = self.skip_backfill,
109            queue_capacity = self.queue_capacity,
110            max_changes = self.batch.max_changes,
111            max_delay_ms = self.batch.max_delay.as_millis() as u64,
112        ),
113    )]
114    pub async fn run(self) -> Result<()> {
115        let Engine {
116            source,
117            documents,
118            sink,
119            observer,
120            queue_capacity,
121            batch,
122            skip_backfill,
123            failure_policies,
124        } = self;
125        let pipeline = Pipeline {
126            documents: documents.as_ref(),
127            sink: sink.as_ref(),
128            observer: &observer,
129            queue_capacity,
130            batch,
131            failure_policies: &failure_policies,
132        };
133        let result = run_inner(pipeline, source.as_ref(), skip_backfill).await;
134        if let Err(error) = &result {
135            observer.on_error(&error.to_string());
136        }
137        result
138    }
139}