1#![doc = include_str!("../README.md")]
2
3mod backends;
4mod lag;
5mod observer;
6pub mod status;
7
8pub use backends::{Backends, SourceParts};
9pub use observer::StatusObserver;
10pub use status::{IndexState, Phase, Status, StatusSnapshot};
11
12pub use engine::{BatchStats, Observer};
16pub use schema_core::IndexName;
17
18use std::future::Future;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21
22use anyhow::Context;
23use engine::{Engine, FailurePolicies, FanOut};
24use schema::Config;
25use sources_core::cdc::ChangeCapture;
26
27#[derive(Debug, Clone)]
31pub struct DaemonOptions {
32 pub slot: String,
34 pub publication: String,
36 pub manage_publication: bool,
40 pub skip_backfill: bool,
42 pub queue_capacity: usize,
44 pub pretty: bool,
46 pub lag_poll_interval: Duration,
48}
49
50impl Default for DaemonOptions {
51 fn default() -> Self {
52 Self {
53 slot: "flusso".to_owned(),
54 publication: "flusso".to_owned(),
55 manage_publication: true,
56 skip_backfill: false,
57 queue_capacity: 1024,
58 pretty: false,
59 lag_poll_interval: Duration::from_secs(15),
60 }
61 }
62}
63
64#[derive(Debug)]
66pub struct Daemon {
67 config: Config,
68 options: DaemonOptions,
69 backends: Arc<dyn Backends>,
70 extra_observers: Vec<Arc<dyn Observer>>,
71 status: Option<Arc<Status>>,
72}
73
74impl Daemon {
75 pub fn new(config: Config, backends: Arc<dyn Backends>) -> Self {
81 Self {
82 config,
83 options: DaemonOptions::default(),
84 backends,
85 extra_observers: Vec::new(),
86 status: None,
87 }
88 }
89
90 pub fn with_options(mut self, options: DaemonOptions) -> Self {
91 self.options = options;
92 self
93 }
94
95 pub fn with_observer(mut self, observer: Arc<dyn Observer>) -> Self {
99 self.extra_observers.push(observer);
100 self
101 }
102
103 pub fn with_status(mut self, status: Arc<Status>) -> Self {
111 self.status = Some(status);
112 self
113 }
114
115 #[tracing::instrument(name = "daemon.start", skip_all)]
123 pub async fn start(self) -> anyhow::Result<RunningDaemon> {
124 let Daemon {
125 config,
126 options,
127 backends,
128 extra_observers,
129 status,
130 } = self;
131
132 tracing::info!(
133 slot = %options.slot,
134 publication = %options.publication,
135 indexes = config.indexes.len(),
136 "starting sync",
137 );
138
139 let status = status.unwrap_or_else(|| {
142 Arc::new(Status::new(config.indexes.keys().cloned(), Instant::now()))
143 });
144 status.set_phase(Phase::Starting);
145 let mut observers: Vec<Arc<dyn Observer>> =
146 vec![Arc::new(StatusObserver::new(Arc::clone(&status)))];
147 observers.extend(extra_observers);
148 let observer: Arc<dyn Observer> = Arc::new(FanOut::new(observers));
149
150 let config = Arc::new(config);
151 let SourceParts { capture, documents } =
152 backends.source(Arc::clone(&config), &options).await?;
153 let sink = backends.sink(&config, &options).await?;
154
155 let mut failure_policies = FailurePolicies::new(config.on_error);
156 for (name, index) in &config.indexes {
157 if let Some(policy) = index.on_error {
158 failure_policies = failure_policies.with_override(name.as_ref(), policy);
159 }
160 }
161
162 let engine = Engine::new(Arc::clone(&capture), documents, sink)
163 .with_observer(Arc::clone(&observer))
164 .with_queue_capacity(options.queue_capacity)
165 .skip_backfill(options.skip_backfill)
166 .with_failure_policies(failure_policies);
167
168 Ok(RunningDaemon {
169 status,
170 engine,
171 source: capture,
172 observer,
173 lag_poll_interval: options.lag_poll_interval,
174 })
175 }
176}
177
178#[derive(Debug)]
181pub struct RunningDaemon {
182 status: Arc<Status>,
183 engine: Engine,
184 source: Arc<dyn ChangeCapture>,
185 observer: Arc<dyn Observer>,
186 lag_poll_interval: Duration,
187}
188
189impl RunningDaemon {
190 pub fn status(&self) -> Arc<Status> {
193 Arc::clone(&self.status)
194 }
195
196 #[tracing::instrument(name = "daemon.run", skip_all)]
201 pub async fn run(self, shutdown: impl Future<Output = ()> + Send) -> anyhow::Result<()> {
202 let RunningDaemon {
203 status,
204 engine,
205 source,
206 observer,
207 lag_poll_interval,
208 } = self;
209
210 let _lag = LagGuard(tokio::spawn(lag::poll(source, observer, lag_poll_interval)));
214
215 let result = tokio::select! {
216 res = engine.run() => res.context("sync engine stopped"),
217 () = shutdown => {
218 tracing::info!("shutdown requested; stopping pipeline");
219 Ok(())
220 }
221 };
222
223 status.set_phase(Phase::Stopped);
224 result
225 }
226}
227
228#[derive(Debug)]
232struct LagGuard(tokio::task::JoinHandle<()>);
233
234impl Drop for LagGuard {
235 fn drop(&mut self) {
236 self.0.abort();
237 }
238}
239
240#[cfg(test)]
241#[allow(clippy::unwrap_used)]
242mod tests;