1#![doc = include_str!("../README.md")]
2#![cfg_attr(test, allow(unused_crate_dependencies))]
6
7mod error;
8mod observer;
9mod pipeline;
10mod policy;
11
12#[cfg(test)]
13mod tests;
14
15pub use error::*;
16pub use observer::*;
17pub use policy::{BatchPolicy, FailurePolicies, FailurePolicy};
18
19use std::sync::Arc;
20
21use sinks_core::Sink;
22use sources_core::cdc::ChangeCapture;
23use sources_core::document::DocumentBuilder;
24
25use crate::pipeline::{Pipeline, run_inner};
26
/// Queue capacity that `Engine::new` assigns before any call to
/// `Engine::with_queue_capacity` overrides it (1024).
const DEFAULT_QUEUE_CAPACITY: usize = 1 << 10;
29
30#[derive(Debug)]
32pub struct Engine {
33 source: Arc<dyn ChangeCapture>,
34 documents: Arc<dyn DocumentBuilder>,
35 sink: Arc<dyn Sink>,
36 observer: Arc<dyn Observer>,
37 queue_capacity: usize,
38 batch: BatchPolicy,
39 skip_backfill: bool,
40 failure_policies: FailurePolicies,
41}
42
43impl Engine {
44 pub fn new(
45 source: Arc<dyn ChangeCapture>,
46 documents: Arc<dyn DocumentBuilder>,
47 sink: Arc<dyn Sink>,
48 ) -> Self {
49 Self {
50 source,
51 documents,
52 sink,
53 observer: Arc::new(NoopObserver),
54 queue_capacity: DEFAULT_QUEUE_CAPACITY,
55 batch: BatchPolicy::default(),
56 skip_backfill: false,
57 failure_policies: FailurePolicies::default(),
58 }
59 }
60
61 pub fn with_observer(mut self, observer: Arc<dyn Observer>) -> Self {
64 self.observer = observer;
65 self
66 }
67
68 pub fn with_queue_capacity(mut self, capacity: usize) -> Self {
70 self.queue_capacity = capacity.max(1);
71 self
72 }
73
74 pub fn with_batch(mut self, batch: BatchPolicy) -> Self {
77 self.batch = BatchPolicy {
78 max_changes: batch.max_changes.max(1),
79 ..batch
80 };
81 self
82 }
83
84 pub fn skip_backfill(mut self, skip: bool) -> Self {
88 self.skip_backfill = skip;
89 self
90 }
91
92 pub fn with_failure_policies(mut self, policies: FailurePolicies) -> Self {
96 self.failure_policies = policies;
97 self
98 }
99
100 #[tracing::instrument(
105 name = "engine.run",
106 skip_all,
107 fields(
108 skip_backfill = self.skip_backfill,
109 queue_capacity = self.queue_capacity,
110 max_changes = self.batch.max_changes,
111 max_delay_ms = self.batch.max_delay.as_millis() as u64,
112 ),
113 )]
114 pub async fn run(self) -> Result<()> {
115 let Engine {
116 source,
117 documents,
118 sink,
119 observer,
120 queue_capacity,
121 batch,
122 skip_backfill,
123 failure_policies,
124 } = self;
125 let pipeline = Pipeline {
126 documents: documents.as_ref(),
127 sink: sink.as_ref(),
128 observer: &observer,
129 queue_capacity,
130 batch,
131 failure_policies: &failure_policies,
132 };
133 let result = run_inner(pipeline, source.as_ref(), skip_backfill).await;
134 if let Err(error) = &result {
135 observer.on_error(&error.to_string());
136 }
137 result
138 }
139}