mod backends;
mod lag;
mod observer;
pub mod status;
pub use backends::{Backends, SourceParts};
pub use observer::StatusObserver;
pub use status::{IndexState, Phase, Status, StatusSnapshot};
pub use engine::{BatchStats, Observer};
pub use schema_core::IndexName;
use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::Context;
use engine::{Engine, FailurePolicies, FanOut};
use schema::Config;
use sources_core::cdc::ChangeCapture;
/// Tunable settings for a daemon run.
///
/// The whole value is lent to the backends when the source and sink are
/// built; a few fields are additionally forwarded to the engine and to the
/// lag poller.
#[derive(Debug, Clone)]
pub struct DaemonOptions {
    /// Slot name, logged at startup.
    /// NOTE(review): presumably the replication slot used for change capture — confirm against the backend.
    pub slot: String,
    /// Publication name, logged at startup.
    /// NOTE(review): presumably the publication the slot streams from — confirm against the backend.
    pub publication: String,
    /// Not read in this module.
    /// NOTE(review): presumably lets the backend manage the publication itself — confirm.
    pub manage_publication: bool,
    /// Forwarded to the engine's `skip_backfill` setting.
    pub skip_backfill: bool,
    /// Forwarded to the engine as its queue capacity.
    pub queue_capacity: usize,
    /// Not read in this module.
    /// NOTE(review): looks like an output-formatting switch — confirm with the consumer.
    pub pretty: bool,
    /// Interval handed to the lag poller that runs next to the engine.
    pub lag_poll_interval: Duration,
}

impl Default for DaemonOptions {
    /// Defaults: slot and publication both named `"flusso"`, publication
    /// management on, backfill not skipped, queue capacity 1024, pretty off,
    /// lag polled every 15 seconds.
    fn default() -> Self {
        // Slot and publication share the same default name.
        const DEFAULT_NAME: &str = "flusso";
        const DEFAULT_QUEUE_CAPACITY: usize = 1024;
        const DEFAULT_LAG_POLL_SECS: u64 = 15;

        Self {
            slot: String::from(DEFAULT_NAME),
            publication: String::from(DEFAULT_NAME),
            manage_publication: true,
            skip_backfill: false,
            queue_capacity: DEFAULT_QUEUE_CAPACITY,
            pretty: false,
            lag_poll_interval: Duration::from_secs(DEFAULT_LAG_POLL_SECS),
        }
    }
}
/// A not-yet-started sync daemon: configuration plus everything needed to
/// build the engine. Assembled with `new` and the `with_*` builder methods,
/// then consumed by `start`.
#[derive(Debug)]
pub struct Daemon {
/// Configuration to run with; `start` reads its `indexes` and `on_error`.
config: Config,
/// Runtime options; defaults to `DaemonOptions::default()` unless replaced.
options: DaemonOptions,
/// Provider of the source and sink that `start` hands to the engine.
backends: Arc<dyn Backends>,
/// Observers added by the caller; fanned out after the built-in status observer.
extra_observers: Vec<Arc<dyn Observer>>,
/// Caller-supplied status handle; when `None`, `start` creates a fresh one.
status: Option<Arc<Status>>,
}
impl Daemon {
    /// Creates a daemon for `config` backed by `backends`, with default
    /// options, no extra observers and no pre-existing status handle.
    pub fn new(config: Config, backends: Arc<dyn Backends>) -> Self {
        let options = DaemonOptions::default();
        let extra_observers = Vec::new();
        Self {
            config,
            options,
            backends,
            extra_observers,
            status: None,
        }
    }

    /// Replaces the daemon's options wholesale.
    pub fn with_options(self, options: DaemonOptions) -> Self {
        Self { options, ..self }
    }

    /// Registers one more observer; observers are notified in the order
    /// they were added, after the built-in status observer.
    pub fn with_observer(self, observer: Arc<dyn Observer>) -> Self {
        let mut daemon = self;
        daemon.extra_observers.push(observer);
        daemon
    }

    /// Uses `status` as the shared status handle instead of letting
    /// `start` create one.
    pub fn with_status(self, status: Arc<Status>) -> Self {
        Self {
            status: Some(status),
            ..self
        }
    }

    /// Builds the source, sink and engine and returns a daemon that is
    /// ready to run.
    ///
    /// The status phase is set to `Phase::Starting` before any backend is
    /// contacted. The source is built first, then the sink.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while building the source or the sink.
    #[tracing::instrument(name = "daemon.start", skip_all)]
    pub async fn start(self) -> anyhow::Result<RunningDaemon> {
        let Daemon {
            config,
            options,
            backends,
            extra_observers,
            status,
        } = self;

        tracing::info!(
            slot = %options.slot,
            publication = %options.publication,
            indexes = config.indexes.len(),
            "starting sync",
        );

        // Reuse the caller's status handle when one was supplied; otherwise
        // create one covering every configured index.
        let status = match status {
            Some(existing) => existing,
            None => Arc::new(Status::new(config.indexes.keys().cloned(), Instant::now())),
        };
        status.set_phase(Phase::Starting);

        // The status observer always comes first, followed by the caller's
        // observers in registration order.
        let status_observer: Arc<dyn Observer> =
            Arc::new(StatusObserver::new(Arc::clone(&status)));
        let observers: Vec<Arc<dyn Observer>> = std::iter::once(status_observer)
            .chain(extra_observers)
            .collect();
        let observer: Arc<dyn Observer> = Arc::new(FanOut::new(observers));

        let shared_config = Arc::new(config);
        let SourceParts { capture, documents } = backends
            .source(Arc::clone(&shared_config), &options)
            .await?;
        let sink = backends.sink(&shared_config, &options).await?;

        // Start from the global policy and layer on every per-index override.
        let failure_policies = shared_config
            .indexes
            .iter()
            .filter_map(|(name, index)| index.on_error.map(|policy| (name, policy)))
            .fold(
                FailurePolicies::new(shared_config.on_error),
                |policies, (name, policy)| policies.with_override(name.as_ref(), policy),
            );

        let engine = Engine::new(Arc::clone(&capture), documents, sink)
            .with_observer(Arc::clone(&observer))
            .with_queue_capacity(options.queue_capacity)
            .skip_backfill(options.skip_backfill)
            .with_failure_policies(failure_policies);

        let lag_poll_interval = options.lag_poll_interval;
        Ok(RunningDaemon {
            status,
            engine,
            source: capture,
            observer,
            lag_poll_interval,
        })
    }
}
/// A started daemon: the configured engine plus the handles needed while it
/// runs. Produced by `Daemon::start` and consumed by `run`.
#[derive(Debug)]
pub struct RunningDaemon {
/// Shared status handle; exposed via `status()` and moved to
/// `Phase::Stopped` when `run` finishes.
status: Arc<Status>,
/// Fully configured engine driven by `run`.
engine: Engine,
/// Change-capture handle passed to the lag poller.
source: Arc<dyn ChangeCapture>,
/// Fan-out observer passed to the lag poller.
observer: Arc<dyn Observer>,
/// Interval passed to the lag poller.
lag_poll_interval: Duration,
}
impl RunningDaemon {
    /// Returns another handle to the shared status.
    pub fn status(&self) -> Arc<Status> {
        let shared = &self.status;
        Arc::clone(shared)
    }

    /// Drives the engine until it stops on its own or `shutdown` resolves,
    /// whichever happens first.
    ///
    /// A lag-polling task is spawned for the duration of the call and is
    /// aborted when this function returns. The status phase is set to
    /// `Phase::Stopped` before returning, on success and on error alike.
    ///
    /// # Errors
    ///
    /// Returns the engine's error, wrapped with the context
    /// `"sync engine stopped"`, if the engine fails before `shutdown`
    /// resolves. A shutdown request yields `Ok(())`.
    #[tracing::instrument(name = "daemon.run", skip_all)]
    pub async fn run(self, shutdown: impl Future<Output = ()> + Send) -> anyhow::Result<()> {
        let RunningDaemon {
            status,
            engine,
            source,
            observer,
            lag_poll_interval,
        } = self;

        // Keep the guard bound until the end of the function so the poller
        // stays alive for as long as the engine is being driven.
        let lag_task = tokio::spawn(lag::poll(source, observer, lag_poll_interval));
        let _abort_lag_on_exit = LagGuard(lag_task);

        let outcome = tokio::select! {
            engine_result = engine.run() => engine_result.context("sync engine stopped"),
            () = shutdown => {
                tracing::info!("shutdown requested; stopping pipeline");
                Ok(())
            }
        };

        status.set_phase(Phase::Stopped);
        outcome
    }
}
/// Owns the join handle of a spawned task and aborts that task when the
/// guard is dropped.
#[derive(Debug)]
struct LagGuard(tokio::task::JoinHandle<()>);

impl Drop for LagGuard {
    /// Requests cancellation of the wrapped task.
    fn drop(&mut self) {
        let Self(handle) = self;
        handle.abort();
    }
}
// Unit tests live in the separate `tests` module file and are only compiled
// for test builds. The `clippy::unwrap_used` lint is silenced for that module
// so test code may call `unwrap()` directly.
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests;