Skip to main content

courier/sinks/
mod.rs

1use std::time::{Instant, SystemTime, UNIX_EPOCH};
2
3use anyhow::Result;
4use async_trait::async_trait;
5use tokio::sync::mpsc::Receiver;
6use tokio_util::sync::CancellationToken;
7use tracing::Instrument;
8use tracing_opentelemetry::OpenTelemetrySpanExt;
9
10use self::retry::WriteOutcome;
11use crate::config::redact_secret;
12use crate::envelope::Envelope;
13use crate::observability::NodeCtx;
14use crate::observability::trace_context;
15use crate::pipeline::ErrorPolicy;
16use crate::retry::RetryPolicy;
17
18pub mod api;
19pub mod file;
20pub mod kafka;
21mod retry;
22pub mod sql;
23
24/// Full-control sink: owns the receiver loop.
25///
26/// Implement directly when the sink needs buffering, background work, or
27/// a custom shutdown drain. Most sinks should implement `WriteOne` instead
28/// and wrap themselves in `ManagedSink`.
29#[async_trait]
30pub trait Sink: Send + Sync {
31    fn id(&self) -> &str;
32
33    /// Attach the per-node observability context. Called by
34    /// `spawn_pipeline` after the sink is built but before it runs.
35    /// Default no-op — full-control sinks that want metrics override
36    /// this and store the ctx; `ManagedSink` already does so.
37    fn set_node_ctx(&mut self, _ctx: NodeCtx) {}
38
39    async fn run(self: Box<Self>, rx: Receiver<Envelope>, cancel: CancellationToken);
40}
41
42/// Ergonomic sink: "write one envelope, report result".
43///
44/// `ManagedSink` turns a `WriteOne` into a `Sink` with the standard recv
45/// loop, cancellation, error policy, and optional retry with exponential
46/// back-off. Cross-cutting wrappers (rate limiting, batching) compose over
47/// `WriteOne` the same way and plug into `ManagedSink` unchanged.
48#[async_trait]
49pub trait WriteOne: Send + Sync {
50    fn id(&self) -> &str;
51
52    async fn write(&self, env: &Envelope) -> Result<()>;
53}
54
55/// Adapter that turns any [`WriteOne`] into a [`Sink`].
56///
57/// Manages the recv loop, graceful cancellation, error policy, and optional
58/// retry with exponential back-off and a configurable exhaustion policy.
59pub struct ManagedSink<W: WriteOne> {
60    pub inner: W,
61    pub on_error: ErrorPolicy,
62    pub retry: Option<RetryPolicy>,
63    node_ctx: NodeCtx,
64}
65
66impl<W: WriteOne> ManagedSink<W> {
67    pub fn new(inner: W) -> Self {
68        Self {
69            inner,
70            on_error: ErrorPolicy::Drop,
71            retry: None,
72            node_ctx: NodeCtx::noop(),
73        }
74    }
75
76    pub fn with_error_policy(mut self, policy: ErrorPolicy) -> Self {
77        self.on_error = policy;
78        self
79    }
80
81    pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
82        self.retry = Some(policy);
83        self
84    }
85}
86
87#[async_trait]
88impl<W: WriteOne + 'static> Sink for ManagedSink<W> {
89    fn id(&self) -> &str {
90        self.inner.id()
91    }
92
93    fn set_node_ctx(&mut self, ctx: NodeCtx) {
94        self.node_ctx = ctx;
95    }
96
97    async fn run(self: Box<Self>, mut rx: Receiver<Envelope>, cancel: CancellationToken) {
98        let id = self.inner.id().to_string();
99        let ctx = self.node_ctx;
100        loop {
101            tokio::select! {
102                _ = cancel.cancelled() => {
103                    tracing::debug!(node_id = %redact_secret(&id), reason = "cancel", "sink drained on cancellation");
104                    break;
105                }
106                maybe = rx.recv() => {
107                    let Some(mut env) = maybe else {
108                        tracing::debug!(node_id = %redact_secret(&id), reason = "upstream_closed", "sink loop ending");
109                        break;
110                    };
111                    let span = tracing::info_span!(
112                        "courier.sink",
113                        pipeline = %redact_secret(ctx.pipeline()),
114                        node_id = %redact_secret(ctx.node_id()),
115                        node_kind = %ctx.node_kind_str(),
116                        envelope.source_id = %env.meta.source_id,
117                        envelope.key = if ctx.log_keys() { env.meta.key.as_deref().unwrap_or("") } else { "" },
118                    );
119                    if let Some(parent) = trace_context::extract(&env.meta.headers) {
120                        let _ = span.set_parent(parent);
121                    }
122                    trace_context::inject(&mut env.meta.headers, &span.context());
123                    let started = Instant::now();
124                    let result = async {
125                        match &self.retry {
126                            Some(policy) => retry::write_with_retry(&self.inner, &env, policy, &ctx, &cancel).await,
127                            None => self.inner.write(&env).await.map(|()| WriteOutcome::Written),
128                        }
129                    }
130                    .instrument(span)
131                    .await;
132                    ctx.record_stage_duration_ms(started.elapsed().as_secs_f64() * 1000.0);
133
134                    match result {
135                        Ok(WriteOutcome::Written) => {
136                            ctx.record_processed();
137                            if let Some(latency_ms) = end_to_end_latency_ms(&env) {
138                                ctx.record_end_to_end_latency_ms(latency_ms);
139                            }
140                        }
141                        Ok(WriteOutcome::DeadLettered) => {
142                            ctx.record_failed();
143                        }
144                        Ok(WriteOutcome::Cancelled) => {
145                            tracing::debug!(node_id = %redact_secret(&id), reason = "cancel", "sink retry interrupted by cancellation");
146                            break;
147                        }
148                        Err(e) => {
149                            ctx.record_failed();
150                            match &self.on_error {
151                                ErrorPolicy::Drop => {
152                                    tracing::error!(node_id = %redact_secret(&id), error = %e, "write failed, dropping");
153                                }
154                                ErrorPolicy::FailPipeline => {
155                                    tracing::error!(node_id = %redact_secret(&id), error = %e, "write failed, failing pipeline");
156                                    cancel.cancel();
157                                    break;
158                                }
159                            }
160                        }
161                    }
162                }
163            }
164        }
165    }
166}
167
168/// Best-effort `now - env.meta.timestamp_ms`. `Envelope::new` always
169/// stamps a timestamp, but a custom source could build an envelope
170/// directly from `Meta::default()` (where `timestamp_ms` is 0) — that
171/// case and any future-skewed clock are skipped.
172fn end_to_end_latency_ms(env: &Envelope) -> Option<f64> {
173    let now_ms = SystemTime::now()
174        .duration_since(UNIX_EPOCH)
175        .ok()?
176        .as_millis() as u64;
177    let started_ms = env.meta.timestamp_ms;
178    if started_ms == 0 || started_ms > now_ms {
179        return None;
180    }
181    Some((now_ms - started_ms) as f64)
182}
183
#[cfg(test)]
mod tests {
    use anyhow::{Result, anyhow};
    use async_trait::async_trait;
    use serde_json::json;
    use tokio::sync::mpsc;
    use tokio_util::sync::CancellationToken;

    use super::*;
    use crate::observability::NodeKind;
    use crate::observability::metrics::testing::{
        counter_sum, histogram_count, obs_handle_in_memory,
    };
    use crate::retry::{ExhaustedPolicy, RetryPolicy};

    /// Writer whose every `write` call returns an error.
    struct AlwaysFailSink;

    #[async_trait]
    impl WriteOne for AlwaysFailSink {
        fn id(&self) -> &str {
            "fail"
        }

        async fn write(&self, _env: &Envelope) -> Result<()> {
            Err(anyhow!("simulated failure"))
        }
    }

    /// Retry policy with a single attempt and 1 ms delays that dead-letters
    /// to `path` once attempts are exhausted.
    fn no_delay_dead_letter_policy(path: std::path::PathBuf) -> RetryPolicy {
        let on_exhausted = ExhaustedPolicy::DeadLetter { path };
        RetryPolicy {
            max_attempts: 1,
            initial_delay_ms: 1,
            backoff_multiplier: 1.0,
            max_delay_ms: 1,
            on_exhausted,
        }
    }

    #[tokio::test]
    async fn dead_lettered_sink_write_is_not_recorded_as_processed() {
        let (handle, exporter) = obs_handle_in_memory();

        // Keep the temp dir bound so the dead-letter file outlives the run.
        let dead_letter_dir = tempfile::tempdir().unwrap();
        let dead_letter_file = dead_letter_dir.path().join("dead.jsonl");

        let mut sink = ManagedSink::new(AlwaysFailSink)
            .with_retry(no_delay_dead_letter_policy(dead_letter_file));
        sink.set_node_ctx(NodeCtx::for_node(
            "p",
            "p/sink0",
            NodeKind::Sink,
            handle.clone(),
        ));

        // Queue one envelope, then let the sender drop so the sink sees the
        // upstream close after handling it.
        let rx = {
            let (tx, rx) = mpsc::channel(1);
            tx.send(Envelope::new("src", json!({"x": 1})))
                .await
                .unwrap();
            rx
        };

        Box::new(sink).run(rx, CancellationToken::new()).await;
        handle.shutdown();

        let attrs = &[("pipeline", "p"), ("node_id", "p/sink0")];

        for (metric, expected) in [
            ("courier_envelopes_processed_total", 0),
            ("courier_envelopes_failed_total", 1),
            ("courier_dead_lettered_total", 1),
        ] {
            assert_eq!(
                counter_sum(&exporter, metric, attrs),
                expected,
                "counter {metric}"
            );
        }

        for (metric, expected) in [
            ("courier_end_to_end_latency_milliseconds", 0),
            ("courier_stage_duration_milliseconds", 1),
        ] {
            assert_eq!(
                histogram_count(&exporter, metric, attrs),
                expected,
                "histogram {metric}"
            );
        }
    }
}