1mod backends;
22mod lag;
23mod observer;
24pub mod status;
25
26pub use backends::{Backends, SourceParts};
27pub use observer::StatusObserver;
28pub use status::{IndexState, Phase, Status, StatusSnapshot};
29
30pub use engine::{BatchStats, Observer};
34pub use schema_core::IndexName;
35
36use std::future::Future;
37use std::sync::Arc;
38use std::time::{Duration, Instant};
39
40use anyhow::Context;
41use engine::{Engine, FailurePolicies, FanOut};
42use schema::Config;
43use sources_core::cdc::ChangeCapture;
44
/// Tunable settings for a daemon run.
///
/// Start from [`DaemonOptions::default`] and override individual fields with
/// struct-update syntax (`DaemonOptions { skip_backfill: true, ..Default::default() }`).
#[derive(Debug, Clone)]
pub struct DaemonOptions {
    /// Slot name; logged at start-up and handed to the backends. Defaults to `"flusso"`.
    pub slot: String,
    /// Publication name; logged at start-up and handed to the backends. Defaults to `"flusso"`.
    pub publication: String,
    /// Forwarded to the engine's `skip_backfill` switch. Defaults to `false`.
    pub skip_backfill: bool,
    /// Forwarded to the engine's queue capacity. Defaults to `1024`.
    pub queue_capacity: usize,
    /// Defaults to `false`. Not read in this file — presumably consumed by a
    /// backend or the binary; TODO confirm.
    pub pretty: bool,
    /// Interval handed to the lag poller. Defaults to 15 seconds.
    pub lag_poll_interval: Duration,
}

impl Default for DaemonOptions {
    /// Returns the stock settings: slot and publication `"flusso"`, backfill
    /// enabled, a queue of 1024, non-pretty output, and a 15-second lag poll.
    fn default() -> Self {
        // Slot and publication share the same default name.
        const DEFAULT_NAME: &str = "flusso";

        DaemonOptions {
            slot: String::from(DEFAULT_NAME),
            publication: String::from(DEFAULT_NAME),
            skip_backfill: false,
            queue_capacity: 1024,
            pretty: false,
            lag_poll_interval: Duration::from_secs(15),
        }
    }
}
76
77#[derive(Debug)]
79pub struct Daemon {
80 config: Config,
81 options: DaemonOptions,
82 backends: Arc<dyn Backends>,
83 extra_observers: Vec<Arc<dyn Observer>>,
84 status: Option<Arc<Status>>,
85}
86
87impl Daemon {
88 pub fn new(config: Config, backends: Arc<dyn Backends>) -> Self {
94 Self {
95 config,
96 options: DaemonOptions::default(),
97 backends,
98 extra_observers: Vec::new(),
99 status: None,
100 }
101 }
102
103 pub fn with_options(mut self, options: DaemonOptions) -> Self {
105 self.options = options;
106 self
107 }
108
109 pub fn with_observer(mut self, observer: Arc<dyn Observer>) -> Self {
113 self.extra_observers.push(observer);
114 self
115 }
116
117 pub fn with_status(mut self, status: Arc<Status>) -> Self {
125 self.status = Some(status);
126 self
127 }
128
129 #[tracing::instrument(name = "daemon.start", skip_all)]
137 pub async fn start(self) -> anyhow::Result<RunningDaemon> {
138 let Daemon {
139 config,
140 options,
141 backends,
142 extra_observers,
143 status,
144 } = self;
145
146 tracing::info!(
147 slot = %options.slot,
148 publication = %options.publication,
149 indexes = config.indexes.len(),
150 "starting sync",
151 );
152
153 let status = status.unwrap_or_else(|| {
157 Arc::new(Status::new(config.indexes.keys().cloned(), Instant::now()))
158 });
159 status.set_phase(Phase::Starting);
160 let mut observers: Vec<Arc<dyn Observer>> =
163 vec![Arc::new(StatusObserver::new(Arc::clone(&status)))];
164 observers.extend(extra_observers);
165 let observer: Arc<dyn Observer> = Arc::new(FanOut::new(observers));
166
167 let config = Arc::new(config);
171 let SourceParts { capture, documents } =
172 backends.source(Arc::clone(&config), &options).await?;
173 let sink = backends.sink(&config, &options).await?;
174
175 let mut failure_policies = FailurePolicies::new(config.on_error);
178 for (name, index) in &config.indexes {
179 if let Some(policy) = index.on_error {
180 failure_policies = failure_policies.with_override(name.as_ref(), policy);
181 }
182 }
183
184 let engine = Engine::new(Arc::clone(&capture), documents, sink)
185 .with_observer(Arc::clone(&observer))
186 .with_queue_capacity(options.queue_capacity)
187 .skip_backfill(options.skip_backfill)
188 .with_failure_policies(failure_policies);
189
190 Ok(RunningDaemon {
191 status,
192 engine,
193 source: capture,
194 observer,
195 lag_poll_interval: options.lag_poll_interval,
196 })
197 }
198}
199
200#[derive(Debug)]
203pub struct RunningDaemon {
204 status: Arc<Status>,
205 engine: Engine,
206 source: Arc<dyn ChangeCapture>,
207 observer: Arc<dyn Observer>,
208 lag_poll_interval: Duration,
209}
210
211impl RunningDaemon {
212 pub fn status(&self) -> Arc<Status> {
215 Arc::clone(&self.status)
216 }
217
218 #[tracing::instrument(name = "daemon.run", skip_all)]
223 pub async fn run(self, shutdown: impl Future<Output = ()> + Send) -> anyhow::Result<()> {
224 let RunningDaemon {
225 status,
226 engine,
227 source,
228 observer,
229 lag_poll_interval,
230 } = self;
231
232 let _lag = LagGuard(tokio::spawn(lag::poll(source, observer, lag_poll_interval)));
237
238 let result = tokio::select! {
239 res = engine.run() => res.context("sync engine stopped"),
240 () = shutdown => {
241 tracing::info!("shutdown requested; stopping pipeline");
242 Ok(())
243 }
244 };
245
246 status.set_phase(Phase::Stopped);
247 result
248 }
249}
250
251#[derive(Debug)]
255struct LagGuard(tokio::task::JoinHandle<()>);
256
257impl Drop for LagGuard {
258 fn drop(&mut self) {
259 self.0.abort();
260 }
261}
262
263#[cfg(test)]
264#[allow(clippy::unwrap_used)]
265mod tests {
266 use super::*;
267
268 use std::collections::BTreeMap;
269 use std::sync::Mutex;
270 use std::sync::atomic::{AtomicU64, Ordering};
271 use std::time::Duration;
272
273 use async_trait::async_trait;
274 use engine::BatchStats;
275 use futures::stream::{self, BoxStream};
276 use schema::{Source, SourceType};
277 use schema_core::{ColumnName, DatabaseSchema, GenericValue, IndexName, TableName};
278 use sinks_core::{FlushReport, Sink};
279 use sources_core::cdc::{Ack, AckSink, Change, ChangeEvent};
280 use sources_core::document::{Document, DocumentBuilder, DocumentId, IndexScope};
281 use sources_core::{RowKey, SnapshotTable};
282 use tokio::sync::Notify;
283
284 use crate::observer::StatusObserver;
285 use crate::status::{IndexState, Phase};
286
287 fn users() -> IndexName {
288 IndexName::try_new("users").unwrap()
289 }
290
291 #[test]
294 fn observer_drives_status_through_its_lifecycle() {
295 let status = Arc::new(Status::new([users()], Instant::now()));
296 let observer = StatusObserver::new(Arc::clone(&status));
297
298 let snap = status.snapshot();
300 assert_eq!(snap.phase, Phase::Starting);
301 assert_eq!(snap.indexes.get("users"), Some(&IndexState::Pending));
302
303 observer.on_indexes_ensured(1);
304 observer.on_backfill_started(&[users()]);
305 let snap = status.snapshot();
306 assert_eq!(snap.phase, Phase::Backfilling);
307 assert_eq!(snap.indexes.get("users"), Some(&IndexState::Backfilling));
308
309 observer.on_index_seeded(&users());
310 observer.on_backfill_completed();
311 observer.on_live_started();
312
313 observer.on_change_captured();
315 observer.on_change_captured();
316 observer.on_change_captured();
317 observer.on_batch_committed(BatchStats {
318 changes: 3,
319 documents: 2,
320 documents_by_index: vec![(users(), 2)],
321 flush: Duration::from_millis(5),
322 });
323 observer.on_slot_lag(4096);
324
325 let snap = status.snapshot();
326 assert_eq!(snap.phase, Phase::Live);
327 assert_eq!(snap.indexes.get("users"), Some(&IndexState::Seeded));
328 assert_eq!(snap.changes_captured, 3);
329 assert_eq!(snap.changes_committed, 3);
330 assert_eq!(snap.changes_in_flight, 0);
331 assert_eq!(snap.documents_built, 2);
332 assert_eq!(snap.batches, 1);
333 assert_eq!(snap.slot_lag_bytes, Some(4096));
334 assert_eq!(snap.errors, 0);
335
336 let json = serde_json::to_value(&snap).unwrap();
338 assert_eq!(json["phase"], "live");
339 assert_eq!(json["indexes"]["users"], "seeded");
340 assert_eq!(json["changes_in_flight"], 0);
341 assert_eq!(json["slot_lag_bytes"], 4096);
342 }
343
344 #[test]
347 fn already_seeded_index_and_error_phase() {
348 let status = Arc::new(Status::new([users()], Instant::now()));
349 let observer = StatusObserver::new(Arc::clone(&status));
350
351 observer.on_live_started();
353 assert_eq!(
354 status.snapshot().indexes.get("users"),
355 Some(&IndexState::Seeded),
356 "an index live without a backfill this run is reported seeded",
357 );
358
359 observer.on_error("boom");
360 let snap = status.snapshot();
361 assert_eq!(snap.phase, Phase::Stopped);
362 assert_eq!(snap.errors, 1);
363 assert_eq!(snap.last_error.as_deref(), Some("boom"));
364 }
365
366 #[derive(Debug)]
368 struct LaggySource(Option<u64>);
369
370 #[async_trait]
371 impl ChangeCapture for LaggySource {
372 async fn live(
373 &self,
374 ) -> sources_core::Result<BoxStream<'static, sources_core::Result<Change>>> {
375 Ok(Box::pin(stream::empty()))
376 }
377
378 async fn lag(&self) -> sources_core::Result<Option<u64>> {
379 Ok(self.0)
380 }
381 }
382
383 #[derive(Debug, Default)]
386 struct LagObserver {
387 last: Mutex<Option<u64>>,
388 reported: Notify,
389 }
390
391 impl Observer for LagObserver {
392 fn on_slot_lag(&self, bytes: u64) {
393 *self.last.lock().unwrap() = Some(bytes);
394 self.reported.notify_one();
395 }
396 }
397
398 #[tokio::test]
402 async fn lag_poller_reports_each_sampled_value() {
403 let observer = Arc::new(LagObserver::default());
404 let source: Arc<dyn ChangeCapture> = Arc::new(LaggySource(Some(8192)));
405
406 let handle = tokio::spawn(lag::poll(
407 source,
408 Arc::clone(&observer) as Arc<dyn Observer>,
409 Duration::from_millis(5),
410 ));
411 tokio::time::timeout(Duration::from_secs(5), observer.reported.notified())
412 .await
413 .expect("the poller should report a lag sample");
414 handle.abort();
415
416 assert_eq!(*observer.last.lock().unwrap(), Some(8192));
417 }
418
419 #[derive(Debug)]
429 struct MockBackends {
430 capture: Arc<dyn ChangeCapture>,
431 documents: Arc<dyn DocumentBuilder>,
432 sink: Arc<dyn Sink>,
433 source_built: Arc<AtomicU64>,
434 sink_built: Arc<AtomicU64>,
435 }
436
437 #[async_trait]
438 impl Backends for MockBackends {
439 async fn source(
440 &self,
441 _config: Arc<Config>,
442 _options: &DaemonOptions,
443 ) -> anyhow::Result<SourceParts> {
444 self.source_built.fetch_add(1, Ordering::SeqCst);
445 Ok(SourceParts {
446 capture: Arc::clone(&self.capture),
447 documents: Arc::clone(&self.documents),
448 })
449 }
450
451 async fn sink(
452 &self,
453 _config: &Config,
454 _options: &DaemonOptions,
455 ) -> anyhow::Result<Arc<dyn Sink>> {
456 self.sink_built.fetch_add(1, Ordering::SeqCst);
457 Ok(Arc::clone(&self.sink))
458 }
459 }
460
461 #[derive(Debug)]
464 struct ScriptedSource {
465 changes: Mutex<Option<Vec<Change>>>,
466 }
467
468 #[async_trait]
469 impl ChangeCapture for ScriptedSource {
470 async fn live(
471 &self,
472 ) -> sources_core::Result<BoxStream<'static, sources_core::Result<Change>>> {
473 let changes = self.changes.lock().unwrap().take().unwrap_or_default();
474 Ok(Box::pin(stream::iter(
475 changes
476 .into_iter()
477 .map(Ok::<Change, sources_core::SourceError>),
478 )))
479 }
480
481 async fn lag(&self) -> sources_core::Result<Option<u64>> {
482 Ok(None)
483 }
484 }
485
486 #[derive(Debug)]
488 struct ScriptedDocuments;
489
490 #[async_trait]
491 impl DocumentBuilder for ScriptedDocuments {
492 async fn resolve(
493 &self,
494 _table: &TableName,
495 key: &RowKey,
496 ) -> sources_core::Result<Vec<DocumentId>> {
497 Ok(vec![DocumentId {
498 index: users(),
499 key: key.clone(),
500 }])
501 }
502
503 async fn build(&self, id: &DocumentId) -> sources_core::Result<Document> {
504 let deleted = matches!(id.key.0.first(), Some((_, GenericValue::Int(2))));
505 Ok(if deleted {
506 Document::Delete { id: id.clone() }
507 } else {
508 Document::Upsert {
509 id: id.clone(),
510 body: GenericValue::Map(Default::default()),
511 }
512 })
513 }
514
515 fn backfill_scopes(&self) -> Vec<IndexScope> {
516 vec![IndexScope {
517 index: users(),
518 root: SnapshotTable {
519 db_schema: DatabaseSchema::try_new("public").unwrap(),
520 table: TableName::try_new("users").unwrap(),
521 },
522 }]
523 }
524 }
525
526 #[derive(Debug, Default)]
528 struct RecordingSink {
529 ops: Arc<Mutex<Vec<String>>>,
530 }
531
532 #[async_trait]
533 impl Sink for RecordingSink {
534 async fn upsert(
535 &self,
536 index: &IndexName,
537 id: &str,
538 _document: &GenericValue,
539 ) -> sinks_core::Result<()> {
540 self.ops
541 .lock()
542 .unwrap()
543 .push(format!("upsert {} {id}", index.as_ref()));
544 Ok(())
545 }
546
547 async fn delete(&self, index: &IndexName, id: &str) -> sinks_core::Result<()> {
548 self.ops
549 .lock()
550 .unwrap()
551 .push(format!("delete {} {id}", index.as_ref()));
552 Ok(())
553 }
554
555 async fn flush(&self, _caught_up: bool) -> sinks_core::Result<FlushReport> {
556 Ok(FlushReport::clean())
557 }
558 }
559
560 #[derive(Debug)]
562 struct CountingAck(Arc<AtomicU64>);
563
564 impl AckSink for CountingAck {
565 fn confirm(&self, _seq: u64) {
566 self.0.fetch_add(1, Ordering::SeqCst);
567 }
568 }
569
570 fn row_change(delete: bool, id: i64, seq: u64, acks: &Arc<AtomicU64>) -> Change {
571 let table = TableName::try_new("users").unwrap();
572 let key = RowKey(vec![(
573 ColumnName::try_new("id").unwrap(),
574 GenericValue::Int(id),
575 )]);
576 let event = if delete {
577 ChangeEvent::Delete { table, key }
578 } else {
579 ChangeEvent::Upsert { table, key }
580 };
581 Change {
582 event,
583 ack: Ack::new(seq, Arc::new(CountingAck(Arc::clone(acks)))),
584 }
585 }
586
587 fn backendless_config() -> Config {
590 Config {
591 source: Source {
592 source_type: SourceType::Postgres,
593 connection: None,
594 },
595 sinks: BTreeMap::new(),
596 indexes: BTreeMap::new(),
597 on_error: Default::default(),
598 server: Default::default(),
599 }
600 }
601
602 fn daemon_over(backends: Arc<MockBackends>) -> Daemon {
603 Daemon::new(backendless_config(), backends).with_options(DaemonOptions {
604 skip_backfill: true,
606 ..DaemonOptions::default()
607 })
608 }
609
610 #[tokio::test]
614 async fn start_builds_backends_through_the_seam() {
615 let source_built = Arc::new(AtomicU64::new(0));
616 let sink_built = Arc::new(AtomicU64::new(0));
617
618 let backends = Arc::new(MockBackends {
619 capture: Arc::new(ScriptedSource {
620 changes: Mutex::new(Some(Vec::new())),
621 }),
622 documents: Arc::new(ScriptedDocuments),
623 sink: Arc::new(RecordingSink::default()),
624 source_built: Arc::clone(&source_built),
625 sink_built: Arc::clone(&sink_built),
626 });
627
628 let running = daemon_over(backends).start().await.unwrap();
629 let status = running.status();
630
631 assert_eq!(source_built.load(Ordering::SeqCst), 1);
633 assert_eq!(sink_built.load(Ordering::SeqCst), 1);
634
635 running.run(std::future::pending::<()>()).await.unwrap();
637
638 let snap = status.snapshot();
639 assert_eq!(snap.phase, Phase::Stopped);
640 assert_eq!(snap.changes_committed, 0);
641 }
642
643 #[tokio::test]
647 async fn drives_changes_through_injected_backends() {
648 let acks = Arc::new(AtomicU64::new(0));
649 let ops = Arc::new(Mutex::new(Vec::new()));
650
651 let changes = vec![
652 row_change(false, 1, 0, &acks),
653 row_change(true, 2, 1, &acks),
654 ];
655
656 let backends = Arc::new(MockBackends {
657 capture: Arc::new(ScriptedSource {
658 changes: Mutex::new(Some(changes)),
659 }),
660 documents: Arc::new(ScriptedDocuments),
661 sink: Arc::new(RecordingSink {
662 ops: Arc::clone(&ops),
663 }),
664 source_built: Arc::new(AtomicU64::new(0)),
665 sink_built: Arc::new(AtomicU64::new(0)),
666 });
667
668 let running = daemon_over(backends).start().await.unwrap();
669 let status = running.status();
670 running.run(std::future::pending::<()>()).await.unwrap();
671
672 assert_eq!(
673 *ops.lock().unwrap(),
674 vec!["upsert users 1".to_owned(), "delete users 2".to_owned()],
675 );
676 assert_eq!(acks.load(Ordering::SeqCst), 2, "both changes acked");
677
678 let snap = status.snapshot();
679 assert_eq!(snap.changes_captured, 2);
680 assert_eq!(snap.changes_committed, 2);
681 assert_eq!(snap.changes_in_flight, 0);
682 assert_eq!(snap.phase, Phase::Stopped);
683 }
684}