#![doc = include_str!("../README.md")]
#![cfg_attr(test, allow(unused_crate_dependencies))]
mod error;
mod observer;
mod pipeline;
mod policy;
#[cfg(test)]
mod tests;
pub use error::*;
pub use observer::*;
pub use policy::{BatchPolicy, FailurePolicies, FailurePolicy};
use std::sync::Arc;
use sinks_core::Sink;
use sources_core::cdc::ChangeCapture;
use sources_core::document::DocumentBuilder;
use crate::pipeline::{Pipeline, run_inner};
/// Queue capacity that `Engine::new` assigns until the caller overrides it
/// through `Engine::with_queue_capacity`.
const DEFAULT_QUEUE_CAPACITY: usize = 1024;
/// Bundles a change-capture source, a document builder, a sink and the
/// tuning knobs for one pipeline run.
///
/// Build it with `Engine::new`, adjust it with the by-value builder methods,
/// then consume it with `Engine::run`.
#[derive(Debug)]
pub struct Engine {
/// Change-capture source; `run` passes it to `run_inner`.
source: Arc<dyn ChangeCapture>,
/// Document builder lent to the pipeline for the duration of `run`.
documents: Arc<dyn DocumentBuilder>,
/// Sink lent to the pipeline for the duration of `run`.
sink: Arc<dyn Sink>,
/// Observer lent to the pipeline; `run` also reports a failed run's error
/// message to it through `on_error`.
observer: Arc<dyn Observer>,
/// Queue capacity handed to the pipeline; `with_queue_capacity` keeps it at
/// 1 or more.
queue_capacity: usize,
/// Batching policy handed to the pipeline; `with_batch` keeps its
/// `max_changes` at 1 or more.
batch: BatchPolicy,
/// Forwarded unchanged to `run_inner`. Presumably suppresses an initial
/// backfill phase -- NOTE(review): confirm against the `pipeline` module.
skip_backfill: bool,
/// Failure policies lent to the pipeline for the duration of `run`.
failure_policies: FailurePolicies,
}
impl Engine {
pub fn new(
source: Arc<dyn ChangeCapture>,
documents: Arc<dyn DocumentBuilder>,
sink: Arc<dyn Sink>,
) -> Self {
Self {
source,
documents,
sink,
observer: Arc::new(NoopObserver),
queue_capacity: DEFAULT_QUEUE_CAPACITY,
batch: BatchPolicy::default(),
skip_backfill: false,
failure_policies: FailurePolicies::default(),
}
}
pub fn with_observer(mut self, observer: Arc<dyn Observer>) -> Self {
self.observer = observer;
self
}
pub fn with_queue_capacity(mut self, capacity: usize) -> Self {
self.queue_capacity = capacity.max(1);
self
}
pub fn with_batch(mut self, batch: BatchPolicy) -> Self {
self.batch = BatchPolicy {
max_changes: batch.max_changes.max(1),
..batch
};
self
}
pub fn skip_backfill(mut self, skip: bool) -> Self {
self.skip_backfill = skip;
self
}
pub fn with_failure_policies(mut self, policies: FailurePolicies) -> Self {
self.failure_policies = policies;
self
}
#[tracing::instrument(
name = "engine.run",
skip_all,
fields(
skip_backfill = self.skip_backfill,
queue_capacity = self.queue_capacity,
max_changes = self.batch.max_changes,
max_delay_ms = self.batch.max_delay.as_millis() as u64,
),
)]
pub async fn run(self) -> Result<()> {
let Engine {
source,
documents,
sink,
observer,
queue_capacity,
batch,
skip_backfill,
failure_policies,
} = self;
let pipeline = Pipeline {
documents: documents.as_ref(),
sink: sink.as_ref(),
observer: &observer,
queue_capacity,
batch,
failure_policies: &failure_policies,
};
let result = run_inner(pipeline, source.as_ref(), skip_backfill).await;
if let Err(error) = &result {
observer.on_error(&error.to_string());
}
result
}
}