1use futures::future;
8use tokio::task::JoinHandle;
9use tokio_util::sync::CancellationToken;
10
11use crate::config::ObservabilityConfig;
12use crate::observability::ObsHandle;
13
14pub mod cli;
15pub mod config;
16pub mod envelope;
17pub mod observability;
18pub mod pipeline;
19pub mod registry;
20pub mod retry;
21pub mod sinks;
22pub mod sources;
23pub mod transforms;
24
25pub use registry::{Registry, register_builtin};
26pub use retry::{ExhaustedPolicy, RetryPolicy};
27pub use sinks::ManagedSink;
28
29use pipeline::{Pipeline, spawn_pipeline};
30
31pub struct Courier {
34 pipelines: Vec<Pipeline>,
35 observability: Option<ObservabilityConfig>,
40 metrics: ObsHandle,
41}
42
43impl Courier {
44 pub fn new(pipelines: Vec<Pipeline>) -> Self {
45 Self {
46 pipelines,
47 observability: None,
48 metrics: ObsHandle::noop(),
49 }
50 }
51
52 pub fn with_observability(mut self, observability: Option<ObservabilityConfig>) -> Self {
56 self.observability = observability;
57 self
58 }
59
60 pub fn observability(&self) -> Option<&ObservabilityConfig> {
61 self.observability.as_ref()
62 }
63
64 pub(crate) fn with_metrics(mut self, metrics: ObsHandle) -> Self {
65 self.metrics = metrics;
66 self
67 }
68
69 pub fn spawn(self, cancel: CancellationToken) -> Vec<JoinHandle<()>> {
73 let mut handles = Vec::new();
74 for p in self.pipelines {
75 handles.extend(spawn_pipeline(p, cancel.clone()));
76 }
77 handles
78 }
79
80 pub async fn run(self) {
81 let cancel = CancellationToken::new();
82 let metrics = self.metrics.clone();
83
84 let signal_cancel = cancel.clone();
85 let signal_metrics = metrics.clone();
86 tokio::spawn(async move {
87 match tokio::signal::ctrl_c().await {
88 Ok(_) => {
89 log::info!("received shutdown signal, cancelling pipelines");
90 signal_cancel.cancel();
91 signal_metrics.force_flush();
92 crate::observability::force_flush_traces();
93 crate::observability::force_flush_logs();
94 }
95 Err(e) => log::error!("failed to listen for shutdown signal: {e}"),
96 }
97 });
98
99 let handles = self.spawn(cancel);
100 future::join_all(handles).await;
101 metrics.shutdown();
102 crate::observability::shutdown_traces();
103 crate::observability::shutdown_logs();
104 }
105}