use futures::future;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::config::ObservabilityConfig;
use crate::observability::ObsHandle;
pub mod cli;
pub mod config;
pub mod envelope;
pub mod observability;
pub mod pipeline;
pub mod registry;
pub mod retry;
pub mod sinks;
pub mod sources;
pub mod transforms;
pub use registry::{Registry, register_builtin};
pub use retry::{ExhaustedPolicy, RetryPolicy};
pub use sinks::ManagedSink;
use pipeline::{Pipeline, spawn_pipeline};
pub struct Courier {
pipelines: Vec<Pipeline>,
observability: Option<ObservabilityConfig>,
metrics: ObsHandle,
}
impl Courier {
pub fn new(pipelines: Vec<Pipeline>) -> Self {
Self {
pipelines,
observability: None,
metrics: ObsHandle::noop(),
}
}
pub fn with_observability(mut self, observability: Option<ObservabilityConfig>) -> Self {
self.observability = observability;
self
}
pub fn observability(&self) -> Option<&ObservabilityConfig> {
self.observability.as_ref()
}
pub(crate) fn with_metrics(mut self, metrics: ObsHandle) -> Self {
self.metrics = metrics;
self
}
pub fn spawn(self, cancel: CancellationToken) -> Vec<JoinHandle<()>> {
let mut handles = Vec::new();
for p in self.pipelines {
handles.extend(spawn_pipeline(p, cancel.clone()));
}
handles
}
pub async fn run(self) {
let cancel = CancellationToken::new();
let metrics = self.metrics.clone();
let signal_cancel = cancel.clone();
let signal_metrics = metrics.clone();
tokio::spawn(async move {
match tokio::signal::ctrl_c().await {
Ok(_) => {
log::info!("received shutdown signal, cancelling pipelines");
signal_cancel.cancel();
signal_metrics.force_flush();
crate::observability::force_flush_traces();
crate::observability::force_flush_logs();
}
Err(e) => log::error!("failed to listen for shutdown signal: {e}"),
}
});
let handles = self.spawn(cancel);
future::join_all(handles).await;
metrics.shutdown();
crate::observability::shutdown_traces();
crate::observability::shutdown_logs();
}
}