Skip to main content

courier/
lib.rs

//! Courier — async pipelines composed as `Source → Transform* → Sink[]`.
//!
//! Nodes communicate via `tokio::sync::mpsc` channels of `Envelope`. Each
//! node runs as its own task; the shared `CancellationToken` triggers a
//! graceful drain on SIGINT/Ctrl+C.
6
7use futures::future;
8use tokio::task::JoinHandle;
9use tokio_util::sync::CancellationToken;
10
11use crate::config::ObservabilityConfig;
12use crate::observability::ObsHandle;
13
14pub mod cli;
15pub mod config;
16pub mod envelope;
17pub mod observability;
18pub mod pipeline;
19pub mod registry;
20pub mod retry;
21pub mod sinks;
22pub mod sources;
23pub mod transforms;
24
25pub use registry::{Registry, register_builtin};
26pub use retry::{ExhaustedPolicy, RetryPolicy};
27pub use sinks::ManagedSink;
28
29use pipeline::{Pipeline, spawn_pipeline};
30
31/// Top-level runtime. Owns every pipeline; `run` blocks until all of them
32/// exit (cancellation, upstream closure, or unrecoverable error).
33pub struct Courier {
34    pipelines: Vec<Pipeline>,
35    /// Parsed observability config carried through from `Config`. Stored
36    /// here so subsequent PRs can wire OTLP exporters and force-flush
37    /// providers on shutdown without changing this signature again.
38    /// `None` means "use built-in defaults".
39    observability: Option<ObservabilityConfig>,
40    metrics: ObsHandle,
41}
42
43impl Courier {
44    pub fn new(pipelines: Vec<Pipeline>) -> Self {
45        Self {
46            pipelines,
47            observability: None,
48            metrics: ObsHandle::noop(),
49        }
50    }
51
52    /// Attach the observability config parsed from `[observability]`.
53    /// Builder shape so tests and `Registry::build_courier` keep using
54    /// `Courier::new(...)` without a forced extra argument.
55    pub fn with_observability(mut self, observability: Option<ObservabilityConfig>) -> Self {
56        self.observability = observability;
57        self
58    }
59
60    pub fn observability(&self) -> Option<&ObservabilityConfig> {
61        self.observability.as_ref()
62    }
63
64    pub(crate) fn with_metrics(mut self, metrics: ObsHandle) -> Self {
65        self.metrics = metrics;
66        self
67    }
68
69    /// Spawn every pipeline as tokio tasks under the given cancel token.
70    /// Caller is responsible for awaiting the returned handles and firing
71    /// the token on shutdown. `run` wraps this with a SIGINT handler.
72    pub fn spawn(self, cancel: CancellationToken) -> Vec<JoinHandle<()>> {
73        let mut handles = Vec::new();
74        for p in self.pipelines {
75            handles.extend(spawn_pipeline(p, cancel.clone()));
76        }
77        handles
78    }
79
80    pub async fn run(self) {
81        let cancel = CancellationToken::new();
82        let metrics = self.metrics.clone();
83
84        let signal_cancel = cancel.clone();
85        let signal_metrics = metrics.clone();
86        tokio::spawn(async move {
87            match tokio::signal::ctrl_c().await {
88                Ok(_) => {
89                    log::info!("received shutdown signal, cancelling pipelines");
90                    signal_cancel.cancel();
91                    signal_metrics.force_flush();
92                    crate::observability::force_flush_traces();
93                    crate::observability::force_flush_logs();
94                }
95                Err(e) => log::error!("failed to listen for shutdown signal: {e}"),
96            }
97        });
98
99        let handles = self.spawn(cancel);
100        future::join_all(handles).await;
101        metrics.shutdown();
102        crate::observability::shutdown_traces();
103        crate::observability::shutdown_logs();
104    }
105}