1mod backends;
22mod lag;
23mod observer;
24pub mod status;
25
26pub use backends::{Backends, SourceParts};
27pub use observer::StatusObserver;
28pub use status::{IndexState, Phase, Status, StatusSnapshot};
29
30pub use engine::{BatchStats, Observer};
34pub use schema_core::IndexName;
35
36use std::future::Future;
37use std::sync::Arc;
38use std::time::{Duration, Instant};
39
40use anyhow::Context;
41use engine::{Engine, FailurePolicies, FanOut};
42use schema::Config;
43use sources_core::cdc::ChangeCapture;
44
/// Tunables for a [`Daemon`], applied with [`Daemon::with_options`].
///
/// The whole struct is handed to the backends when the source and sink are
/// built, and several fields are also forwarded to the engine and the lag
/// poller by [`Daemon::start`] / [`RunningDaemon::run`].
#[derive(Debug, Clone)]
pub struct DaemonOptions {
    /// Replication slot name; logged at startup and passed to the backends.
    /// NOTE(review): presumably a Postgres logical replication slot — confirm
    /// against the source backend.
    pub slot: String,
    /// Publication name; logged at startup and passed to the backends.
    /// NOTE(review): presumably a Postgres publication — confirm.
    pub publication: String,
    /// Passed to the backends; not read in this module.
    /// NOTE(review): presumably lets the daemon create/alter the publication
    /// itself — confirm against the source backend.
    pub manage_publication: bool,
    /// Forwarded to `Engine::skip_backfill`.
    pub skip_backfill: bool,
    /// Forwarded to `Engine::with_queue_capacity`.
    pub queue_capacity: usize,
    /// Not read in this module.
    /// NOTE(review): presumably selects human-readable output — confirm.
    pub pretty: bool,
    /// How often the background task started by [`RunningDaemon::run`]
    /// samples source lag.
    pub lag_poll_interval: Duration,
}
67
68impl Default for DaemonOptions {
69 fn default() -> Self {
70 Self {
71 slot: "flusso".to_owned(),
72 publication: "flusso".to_owned(),
73 manage_publication: true,
74 skip_backfill: false,
75 queue_capacity: 1024,
76 pretty: false,
77 lag_poll_interval: Duration::from_secs(15),
78 }
79 }
80}
81
/// A configured but not yet started sync daemon.
///
/// Built with [`Daemon::new`] and the `with_*` builder methods, then turned
/// into a [`RunningDaemon`] by [`Daemon::start`].
#[derive(Debug)]
pub struct Daemon {
    // Parsed configuration; wrapped in an `Arc` and shared with the source
    // backend during `start`.
    config: Config,
    options: DaemonOptions,
    // Factory for the source (change capture + document builder) and sink.
    backends: Arc<dyn Backends>,
    // Observers added via `with_observer`, fanned out after the status observer.
    extra_observers: Vec<Arc<dyn Observer>>,
    // Caller-supplied status handle; `None` means `start` creates a fresh one.
    status: Option<Arc<Status>>,
}
91
92impl Daemon {
93 pub fn new(config: Config, backends: Arc<dyn Backends>) -> Self {
99 Self {
100 config,
101 options: DaemonOptions::default(),
102 backends,
103 extra_observers: Vec::new(),
104 status: None,
105 }
106 }
107
108 pub fn with_options(mut self, options: DaemonOptions) -> Self {
109 self.options = options;
110 self
111 }
112
113 pub fn with_observer(mut self, observer: Arc<dyn Observer>) -> Self {
117 self.extra_observers.push(observer);
118 self
119 }
120
121 pub fn with_status(mut self, status: Arc<Status>) -> Self {
129 self.status = Some(status);
130 self
131 }
132
133 #[tracing::instrument(name = "daemon.start", skip_all)]
141 pub async fn start(self) -> anyhow::Result<RunningDaemon> {
142 let Daemon {
143 config,
144 options,
145 backends,
146 extra_observers,
147 status,
148 } = self;
149
150 tracing::info!(
151 slot = %options.slot,
152 publication = %options.publication,
153 indexes = config.indexes.len(),
154 "starting sync",
155 );
156
157 let status = status.unwrap_or_else(|| {
160 Arc::new(Status::new(config.indexes.keys().cloned(), Instant::now()))
161 });
162 status.set_phase(Phase::Starting);
163 let mut observers: Vec<Arc<dyn Observer>> =
164 vec![Arc::new(StatusObserver::new(Arc::clone(&status)))];
165 observers.extend(extra_observers);
166 let observer: Arc<dyn Observer> = Arc::new(FanOut::new(observers));
167
168 let config = Arc::new(config);
169 let SourceParts { capture, documents } =
170 backends.source(Arc::clone(&config), &options).await?;
171 let sink = backends.sink(&config, &options).await?;
172
173 let mut failure_policies = FailurePolicies::new(config.on_error);
174 for (name, index) in &config.indexes {
175 if let Some(policy) = index.on_error {
176 failure_policies = failure_policies.with_override(name.as_ref(), policy);
177 }
178 }
179
180 let engine = Engine::new(Arc::clone(&capture), documents, sink)
181 .with_observer(Arc::clone(&observer))
182 .with_queue_capacity(options.queue_capacity)
183 .skip_backfill(options.skip_backfill)
184 .with_failure_policies(failure_policies);
185
186 Ok(RunningDaemon {
187 status,
188 engine,
189 source: capture,
190 observer,
191 lag_poll_interval: options.lag_poll_interval,
192 })
193 }
194}
195
/// A started daemon: backends are built and the engine is assembled.
///
/// Produced by [`Daemon::start`]. Call [`RunningDaemon::run`] to drive the
/// engine until it stops or shutdown is requested.
#[derive(Debug)]
pub struct RunningDaemon {
    // Status handle also reachable through `RunningDaemon::status`.
    status: Arc<Status>,
    engine: Engine,
    // The same change capture the engine reads from. It is also sampled for
    // lag by the background poller in `run`.
    source: Arc<dyn ChangeCapture>,
    // Fan-out observer shared with the engine; the lag poller reports to it too.
    observer: Arc<dyn Observer>,
    lag_poll_interval: Duration,
}
206
207impl RunningDaemon {
208 pub fn status(&self) -> Arc<Status> {
211 Arc::clone(&self.status)
212 }
213
214 #[tracing::instrument(name = "daemon.run", skip_all)]
219 pub async fn run(self, shutdown: impl Future<Output = ()> + Send) -> anyhow::Result<()> {
220 let RunningDaemon {
221 status,
222 engine,
223 source,
224 observer,
225 lag_poll_interval,
226 } = self;
227
228 let _lag = LagGuard(tokio::spawn(lag::poll(source, observer, lag_poll_interval)));
232
233 let result = tokio::select! {
234 res = engine.run() => res.context("sync engine stopped"),
235 () = shutdown => {
236 tracing::info!("shutdown requested; stopping pipeline");
237 Ok(())
238 }
239 };
240
241 status.set_phase(Phase::Stopped);
242 result
243 }
244}
245
246#[derive(Debug)]
250struct LagGuard(tokio::task::JoinHandle<()>);
251
252impl Drop for LagGuard {
253 fn drop(&mut self) {
254 self.0.abort();
255 }
256}
257
#[cfg(test)]
// Tests unwrap freely: a failed unwrap here means broken fixture setup, and a
// panic is exactly how a test should report that.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    use std::collections::BTreeMap;
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Duration;

    use async_trait::async_trait;
    use engine::BatchStats;
    use futures::stream::{self, BoxStream};
    use schema::{Source, SourceType};
    use schema_core::{ColumnName, DatabaseSchema, GenericValue, IndexName, TableName};
    use sinks_core::{FlushReport, Sink};
    use sources_core::cdc::{Ack, AckSink, Change, ChangeEvent};
    use sources_core::document::{Document, DocumentBuilder, DocumentId, IndexScope};
    use sources_core::{RowKey, SnapshotTable};
    use tokio::sync::Notify;

    use crate::observer::StatusObserver;
    use crate::status::{IndexState, Phase};

    /// The single index name every fixture in this module uses.
    fn users() -> IndexName {
        IndexName::try_new("users").unwrap()
    }

    /// Walks a `StatusObserver` through backfill and into live sync, then
    /// checks the snapshot counters and their JSON serialization.
    #[test]
    fn observer_drives_status_through_its_lifecycle() {
        let status = Arc::new(Status::new([users()], Instant::now()));
        let observer = StatusObserver::new(Arc::clone(&status));

        let snap = status.snapshot();
        // Before any events: `Starting`, with every index pending.
        assert_eq!(snap.phase, Phase::Starting);
        assert_eq!(snap.indexes.get("users"), Some(&IndexState::Pending));

        observer.on_indexes_ensured(1);
        observer.on_backfill_started(&[users()]);
        let snap = status.snapshot();
        assert_eq!(snap.phase, Phase::Backfilling);
        assert_eq!(snap.indexes.get("users"), Some(&IndexState::Backfilling));

        observer.on_index_seeded(&users());
        observer.on_backfill_completed();
        observer.on_live_started();

        // Three captured changes, all committed by the single batch below, so
        // nothing should remain in flight.
        observer.on_change_captured();
        observer.on_change_captured();
        observer.on_change_captured();
        observer.on_batch_committed(BatchStats {
            changes: 3,
            documents: 2,
            documents_by_index: vec![(users(), 2)],
            flush: Duration::from_millis(5),
        });
        observer.on_slot_lag(4096);

        let snap = status.snapshot();
        assert_eq!(snap.phase, Phase::Live);
        assert_eq!(snap.indexes.get("users"), Some(&IndexState::Seeded));
        assert_eq!(snap.changes_captured, 3);
        assert_eq!(snap.changes_committed, 3);
        assert_eq!(snap.changes_in_flight, 0);
        assert_eq!(snap.documents_built, 2);
        assert_eq!(snap.batches, 1);
        assert_eq!(snap.slot_lag_bytes, Some(4096));
        assert_eq!(snap.errors, 0);

        let json = serde_json::to_value(&snap).unwrap();
        // Pins the serialized field names and enum spellings.
        assert_eq!(json["phase"], "live");
        assert_eq!(json["indexes"]["users"], "seeded");
        assert_eq!(json["changes_in_flight"], 0);
        assert_eq!(json["slot_lag_bytes"], 4096);
    }

    /// Going live without a backfill marks indexes seeded, and an error moves
    /// the status to `Stopped` while recording the message.
    #[test]
    fn already_seeded_index_and_error_phase() {
        let status = Arc::new(Status::new([users()], Instant::now()));
        let observer = StatusObserver::new(Arc::clone(&status));

        observer.on_live_started();
        // No `on_backfill_started` was sent for this index.
        assert_eq!(
            status.snapshot().indexes.get("users"),
            Some(&IndexState::Seeded),
            "an index live without a backfill this run is reported seeded",
        );

        observer.on_error("boom");
        let snap = status.snapshot();
        assert_eq!(snap.phase, Phase::Stopped);
        assert_eq!(snap.errors, 1);
        assert_eq!(snap.last_error.as_deref(), Some("boom"));
    }

    /// A change capture with an empty live stream that always reports the
    /// wrapped lag value.
    #[derive(Debug)]
    struct LaggySource(Option<u64>);

    #[async_trait]
    impl ChangeCapture for LaggySource {
        async fn live(
            &self,
        ) -> sources_core::Result<BoxStream<'static, sources_core::Result<Change>>> {
            Ok(Box::pin(stream::empty()))
        }

        async fn lag(&self) -> sources_core::Result<Option<u64>> {
            Ok(self.0)
        }
    }

    /// Records the most recent lag sample and wakes a waiter on each one.
    #[derive(Debug, Default)]
    struct LagObserver {
        last: Mutex<Option<u64>>,
        reported: Notify,
    }

    impl Observer for LagObserver {
        fn on_slot_lag(&self, bytes: u64) {
            *self.last.lock().unwrap() = Some(bytes);
            // `notify_one` stores a permit if nobody is waiting yet, so a
            // sample taken before the test starts waiting is not lost.
            self.reported.notify_one();
        }
    }

    /// The lag poller forwards the source's lag to the observer.
    #[tokio::test]
    async fn lag_poller_reports_each_sampled_value() {
        let observer = Arc::new(LagObserver::default());
        let source: Arc<dyn ChangeCapture> = Arc::new(LaggySource(Some(8192)));

        let handle = tokio::spawn(lag::poll(
            source,
            Arc::clone(&observer) as Arc<dyn Observer>,
            Duration::from_millis(5),
        ));
        // Generous timeout; the 5 ms poll interval normally reports almost at once.
        tokio::time::timeout(Duration::from_secs(5), observer.reported.notified())
            .await
            .expect("the poller should report a lag sample");
        handle.abort();

        assert_eq!(*observer.last.lock().unwrap(), Some(8192));
    }

    /// A `Backends` implementation that hands out pre-built parts and counts
    /// how many times the source and sink are requested.
    #[derive(Debug)]
    struct MockBackends {
        capture: Arc<dyn ChangeCapture>,
        documents: Arc<dyn DocumentBuilder>,
        sink: Arc<dyn Sink>,
        source_built: Arc<AtomicU64>,
        sink_built: Arc<AtomicU64>,
    }

    #[async_trait]
    impl Backends for MockBackends {
        async fn source(
            &self,
            _config: Arc<Config>,
            _options: &DaemonOptions,
        ) -> anyhow::Result<SourceParts> {
            self.source_built.fetch_add(1, Ordering::SeqCst);
            Ok(SourceParts {
                capture: Arc::clone(&self.capture),
                documents: Arc::clone(&self.documents),
            })
        }

        async fn sink(
            &self,
            _config: &Config,
            _options: &DaemonOptions,
        ) -> anyhow::Result<Arc<dyn Sink>> {
            self.sink_built.fetch_add(1, Ordering::SeqCst);
            Ok(Arc::clone(&self.sink))
        }
    }

    /// A change capture that yields its scripted changes on the first `live()`
    /// call and an empty stream after that. It never reports lag.
    #[derive(Debug)]
    struct ScriptedSource {
        changes: Mutex<Option<Vec<Change>>>,
    }

    #[async_trait]
    impl ChangeCapture for ScriptedSource {
        async fn live(
            &self,
        ) -> sources_core::Result<BoxStream<'static, sources_core::Result<Change>>> {
            // `take` leaves `None`, so later calls get an empty stream.
            let changes = self.changes.lock().unwrap().take().unwrap_or_default();
            Ok(Box::pin(stream::iter(
                changes
                    .into_iter()
                    .map(Ok::<Change, sources_core::SourceError>),
            )))
        }

        async fn lag(&self) -> sources_core::Result<Option<u64>> {
            Ok(None)
        }
    }

    /// Maps every row to one `users` document keyed by the row key.
    ///
    /// A row whose first key column is `Int(2)` builds as a delete; every other
    /// row builds as an upsert with an empty map body.
    #[derive(Debug)]
    struct ScriptedDocuments;

    #[async_trait]
    impl DocumentBuilder for ScriptedDocuments {
        async fn resolve(
            &self,
            _table: &TableName,
            key: &RowKey,
        ) -> sources_core::Result<Vec<DocumentId>> {
            Ok(vec![DocumentId {
                index: users(),
                key: key.clone(),
            }])
        }

        async fn build(&self, id: &DocumentId) -> sources_core::Result<Document> {
            let deleted = matches!(id.key.0.first(), Some((_, GenericValue::Int(2))));
            Ok(if deleted {
                Document::Delete { id: id.clone() }
            } else {
                Document::Upsert {
                    id: id.clone(),
                    body: GenericValue::Map(Default::default()),
                }
            })
        }

        fn backfill_scopes(&self) -> Vec<IndexScope> {
            vec![IndexScope {
                index: users(),
                root: SnapshotTable {
                    db_schema: DatabaseSchema::try_new("public").unwrap(),
                    table: TableName::try_new("users").unwrap(),
                },
            }]
        }
    }

    /// A sink that appends "upsert <index> <id>" / "delete <index> <id>" to a
    /// shared log. Every flush reports clean.
    #[derive(Debug, Default)]
    struct RecordingSink {
        ops: Arc<Mutex<Vec<String>>>,
    }

    #[async_trait]
    impl Sink for RecordingSink {
        async fn upsert(
            &self,
            index: &IndexName,
            id: &str,
            _document: &GenericValue,
        ) -> sinks_core::Result<()> {
            self.ops
                .lock()
                .unwrap()
                .push(format!("upsert {} {id}", index.as_ref()));
            Ok(())
        }

        async fn delete(&self, index: &IndexName, id: &str) -> sinks_core::Result<()> {
            self.ops
                .lock()
                .unwrap()
                .push(format!("delete {} {id}", index.as_ref()));
            Ok(())
        }

        async fn flush(&self, _caught_up: bool) -> sinks_core::Result<FlushReport> {
            Ok(FlushReport::clean())
        }
    }

    /// Counts `confirm` calls on a shared counter, ignoring the sequence number.
    #[derive(Debug)]
    struct CountingAck(Arc<AtomicU64>);

    impl AckSink for CountingAck {
        fn confirm(&self, _seq: u64) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Builds an upsert (or, when `delete` is set, a delete) change for
    /// `users` row `id`. Its ack at `seq` bumps `acks` when confirmed.
    fn row_change(delete: bool, id: i64, seq: u64, acks: &Arc<AtomicU64>) -> Change {
        let table = TableName::try_new("users").unwrap();
        let key = RowKey(vec![(
            ColumnName::try_new("id").unwrap(),
            GenericValue::Int(id),
        )]);
        let event = if delete {
            ChangeEvent::Delete { table, key }
        } else {
            ChangeEvent::Upsert { table, key }
        };
        Change {
            event,
            ack: Ack::new(seq, Arc::new(CountingAck(Arc::clone(acks)))),
        }
    }

    /// A minimal config with no connection, sinks or indexes. The tests
    /// supply everything through `MockBackends` instead.
    fn backendless_config() -> Config {
        Config {
            source: Source {
                source_type: SourceType::Postgres,
                connection: None,
                manage_publication: true,
            },
            sinks: BTreeMap::new(),
            indexes: BTreeMap::new(),
            on_error: Default::default(),
            server: Default::default(),
        }
    }

    /// A daemon over the given mock backends, with backfill skipped.
    fn daemon_over(backends: Arc<MockBackends>) -> Daemon {
        Daemon::new(backendless_config(), backends).with_options(DaemonOptions {
            // NOTE(review): presumably skipped because the mocks provide no
            // snapshot reader for `backfill_scopes` — confirm.
            skip_backfill: true,
            ..DaemonOptions::default()
        })
    }

    /// `start` builds the source and the sink exactly once each through the
    /// `Backends` seam. An empty change stream then runs to a clean stop.
    #[tokio::test]
    async fn start_builds_backends_through_the_seam() {
        let source_built = Arc::new(AtomicU64::new(0));
        let sink_built = Arc::new(AtomicU64::new(0));

        let backends = Arc::new(MockBackends {
            capture: Arc::new(ScriptedSource {
                changes: Mutex::new(Some(Vec::new())),
            }),
            documents: Arc::new(ScriptedDocuments),
            sink: Arc::new(RecordingSink::default()),
            source_built: Arc::clone(&source_built),
            sink_built: Arc::clone(&sink_built),
        });

        let running = daemon_over(backends).start().await.unwrap();
        let status = running.status();

        assert_eq!(source_built.load(Ordering::SeqCst), 1);
        assert_eq!(sink_built.load(Ordering::SeqCst), 1);

        // Shutdown never fires; `run` returns once the engine stops on its own.
        running.run(std::future::pending::<()>()).await.unwrap();

        let snap = status.snapshot();
        assert_eq!(snap.phase, Phase::Stopped);
        assert_eq!(snap.changes_committed, 0);
    }

    /// Scripted changes flow through document building into the sink in order.
    /// Each change is acked, and the status counters reflect them.
    #[tokio::test]
    async fn drives_changes_through_injected_backends() {
        let acks = Arc::new(AtomicU64::new(0));
        let ops = Arc::new(Mutex::new(Vec::new()));

        // Row 1 builds as an upsert. Row 2 builds as a delete (see
        // `ScriptedDocuments::build`).
        let changes = vec![
            row_change(false, 1, 0, &acks),
            row_change(true, 2, 1, &acks),
        ];

        let backends = Arc::new(MockBackends {
            capture: Arc::new(ScriptedSource {
                changes: Mutex::new(Some(changes)),
            }),
            documents: Arc::new(ScriptedDocuments),
            sink: Arc::new(RecordingSink {
                ops: Arc::clone(&ops),
            }),
            source_built: Arc::new(AtomicU64::new(0)),
            sink_built: Arc::new(AtomicU64::new(0)),
        });

        let running = daemon_over(backends).start().await.unwrap();
        let status = running.status();
        running.run(std::future::pending::<()>()).await.unwrap();

        assert_eq!(
            *ops.lock().unwrap(),
            vec!["upsert users 1".to_owned(), "delete users 2".to_owned()],
        );
        assert_eq!(acks.load(Ordering::SeqCst), 2, "both changes acked");

        let snap = status.snapshot();
        assert_eq!(snap.changes_captured, 2);
        assert_eq!(snap.changes_committed, 2);
        assert_eq!(snap.changes_in_flight, 0);
        assert_eq!(snap.phase, Phase::Stopped);
    }
}