1use std::time::{Instant, SystemTime, UNIX_EPOCH};
2
3use anyhow::Result;
4use async_trait::async_trait;
5use tokio::sync::mpsc::Receiver;
6use tokio_util::sync::CancellationToken;
7use tracing::Instrument;
8use tracing_opentelemetry::OpenTelemetrySpanExt;
9
10use self::retry::WriteOutcome;
11use crate::config::redact_secret;
12use crate::envelope::Envelope;
13use crate::observability::NodeCtx;
14use crate::observability::trace_context;
15use crate::pipeline::ErrorPolicy;
16use crate::retry::RetryPolicy;
17
18pub mod api;
19pub mod file;
20pub mod kafka;
21mod retry;
22pub mod sql;
23
/// A terminal pipeline node that consumes [`Envelope`]s from a channel.
///
/// Implementors must be `Send + Sync`. `run` takes the sink by `Box<Self>`,
/// so a sink is consumed by running it.
#[async_trait]
pub trait Sink: Send + Sync {
    /// Returns the identifier of this sink.
    fn id(&self) -> &str;

    /// Hands the sink an observability context for its node.
    ///
    /// The default implementation ignores `_ctx`; implementors that want to
    /// use the context override this method (as `ManagedSink` does).
    fn set_node_ctx(&mut self, _ctx: NodeCtx) {}

    /// Runs the sink to completion, consuming it.
    ///
    /// `rx` is the channel envelopes arrive on and `cancel` is a cancellation
    /// token supplied by the caller. How an implementation reacts to either is
    /// up to the implementor; `ManagedSink` stops when the token is cancelled
    /// or when `rx` yields `None`.
    async fn run(self: Box<Self>, rx: Receiver<Envelope>, cancel: CancellationToken);
}
41
/// A writer that handles a single [`Envelope`] per call.
///
/// `ManagedSink` wraps a `WriteOne` and supplies the receive loop, optional
/// retry, and error-policy handling around it.
#[async_trait]
pub trait WriteOne: Send + Sync {
    /// Returns the identifier of this writer; `ManagedSink` reports it as its
    /// own sink id.
    fn id(&self) -> &str;

    /// Writes one envelope, returning `Err` when the write failed.
    async fn write(&self, env: &Envelope) -> Result<()>;
}
54
/// Adapts a [`WriteOne`] into a [`Sink`] by pairing it with an error policy,
/// an optional retry policy, and a node context.
pub struct ManagedSink<W: WriteOne> {
    /// The wrapped writer that performs each individual write.
    pub inner: W,
    /// Policy consulted when a write ultimately returns an error.
    pub on_error: ErrorPolicy,
    /// Retry policy for writes; when `None`, `inner.write` is called once per
    /// envelope.
    pub retry: Option<RetryPolicy>,
    /// Observability context; `new` sets it to `NodeCtx::noop()` and
    /// `set_node_ctx` replaces it.
    node_ctx: NodeCtx,
}
65
66impl<W: WriteOne> ManagedSink<W> {
67 pub fn new(inner: W) -> Self {
68 Self {
69 inner,
70 on_error: ErrorPolicy::Drop,
71 retry: None,
72 node_ctx: NodeCtx::noop(),
73 }
74 }
75
76 pub fn with_error_policy(mut self, policy: ErrorPolicy) -> Self {
77 self.on_error = policy;
78 self
79 }
80
81 pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
82 self.retry = Some(policy);
83 self
84 }
85}
86
87#[async_trait]
88impl<W: WriteOne + 'static> Sink for ManagedSink<W> {
89 fn id(&self) -> &str {
90 self.inner.id()
91 }
92
93 fn set_node_ctx(&mut self, ctx: NodeCtx) {
94 self.node_ctx = ctx;
95 }
96
97 async fn run(self: Box<Self>, mut rx: Receiver<Envelope>, cancel: CancellationToken) {
98 let id = self.inner.id().to_string();
99 let ctx = self.node_ctx;
100 loop {
101 tokio::select! {
102 _ = cancel.cancelled() => {
103 tracing::debug!(node_id = %redact_secret(&id), reason = "cancel", "sink drained on cancellation");
104 break;
105 }
106 maybe = rx.recv() => {
107 let Some(mut env) = maybe else {
108 tracing::debug!(node_id = %redact_secret(&id), reason = "upstream_closed", "sink loop ending");
109 break;
110 };
111 let span = tracing::info_span!(
112 "courier.sink",
113 pipeline = %redact_secret(ctx.pipeline()),
114 node_id = %redact_secret(ctx.node_id()),
115 node_kind = %ctx.node_kind_str(),
116 envelope.source_id = %env.meta.source_id,
117 envelope.key = if ctx.log_keys() { env.meta.key.as_deref().unwrap_or("") } else { "" },
118 );
119 if let Some(parent) = trace_context::extract(&env.meta.headers) {
120 let _ = span.set_parent(parent);
121 }
122 trace_context::inject(&mut env.meta.headers, &span.context());
123 let started = Instant::now();
124 let result = async {
125 match &self.retry {
126 Some(policy) => retry::write_with_retry(&self.inner, &env, policy, &ctx, &cancel).await,
127 None => self.inner.write(&env).await.map(|()| WriteOutcome::Written),
128 }
129 }
130 .instrument(span)
131 .await;
132 ctx.record_stage_duration_ms(started.elapsed().as_secs_f64() * 1000.0);
133
134 match result {
135 Ok(WriteOutcome::Written) => {
136 ctx.record_processed();
137 if let Some(latency_ms) = end_to_end_latency_ms(&env) {
138 ctx.record_end_to_end_latency_ms(latency_ms);
139 }
140 }
141 Ok(WriteOutcome::DeadLettered) => {
142 ctx.record_failed();
143 }
144 Ok(WriteOutcome::Cancelled) => {
145 tracing::debug!(node_id = %redact_secret(&id), reason = "cancel", "sink retry interrupted by cancellation");
146 break;
147 }
148 Err(e) => {
149 ctx.record_failed();
150 match &self.on_error {
151 ErrorPolicy::Drop => {
152 tracing::error!(node_id = %redact_secret(&id), error = %e, "write failed, dropping");
153 }
154 ErrorPolicy::FailPipeline => {
155 tracing::error!(node_id = %redact_secret(&id), error = %e, "write failed, failing pipeline");
156 cancel.cancel();
157 break;
158 }
159 }
160 }
161 }
162 }
163 }
164 }
165 }
166}
167
168fn end_to_end_latency_ms(env: &Envelope) -> Option<f64> {
173 let now_ms = SystemTime::now()
174 .duration_since(UNIX_EPOCH)
175 .ok()?
176 .as_millis() as u64;
177 let started_ms = env.meta.timestamp_ms;
178 if started_ms == 0 || started_ms > now_ms {
179 return None;
180 }
181 Some((now_ms - started_ms) as f64)
182}
183
#[cfg(test)]
mod tests {
    use anyhow::{Result, anyhow};
    use async_trait::async_trait;
    use serde_json::json;
    use tokio::sync::mpsc;
    use tokio_util::sync::CancellationToken;

    use super::*;
    use crate::observability::NodeKind;
    use crate::observability::metrics::testing::{
        counter_sum, histogram_count, obs_handle_in_memory,
    };
    use crate::retry::{ExhaustedPolicy, RetryPolicy};

    /// Writer whose `write` always returns an error.
    struct AlwaysFailSink;

    #[async_trait]
    impl WriteOne for AlwaysFailSink {
        fn id(&self) -> &str {
            "fail"
        }

        async fn write(&self, _env: &Envelope) -> Result<()> {
            Err(anyhow!("simulated failure"))
        }
    }

    /// Builds a retry policy with a single attempt, 1 ms delays, and a
    /// dead-letter exhaustion policy targeting `path`.
    fn no_delay_dead_letter_policy(path: std::path::PathBuf) -> RetryPolicy {
        RetryPolicy {
            max_attempts: 1,
            initial_delay_ms: 1,
            backoff_multiplier: 1.0,
            max_delay_ms: 1,
            on_exhausted: ExhaustedPolicy::DeadLetter { path },
        }
    }

    /// Sends one envelope through a sink whose writer always fails under a
    /// dead-letter retry policy, then checks the recorded metrics: nothing
    /// processed, one failure, one dead-lettered envelope, no end-to-end
    /// latency sample, and exactly one stage-duration sample.
    #[tokio::test]
    async fn dead_lettered_sink_write_is_not_recorded_as_processed() {
        let (handle, exporter) = obs_handle_in_memory();
        let ctx = NodeCtx::for_node("p", "p/sink0", NodeKind::Sink, handle.clone());
        let dir = tempfile::tempdir().unwrap();
        let policy = no_delay_dead_letter_policy(dir.path().join("dead.jsonl"));
        let mut sink = ManagedSink::new(AlwaysFailSink).with_retry(policy);
        sink.set_node_ctx(ctx);

        let (tx, rx) = mpsc::channel(1);
        tx.send(Envelope::new("src", json!({"x": 1})))
            .await
            .unwrap();
        // Dropping the sender closes the channel so `run` returns after the
        // single envelope has been handled.
        drop(tx);

        Box::new(sink).run(rx, CancellationToken::new()).await;
        // NOTE(review): presumably flushes pending metrics to the in-memory
        // exporter before the assertions read it; confirm against `shutdown`.
        handle.shutdown();

        let attrs = &[("pipeline", "p"), ("node_id", "p/sink0")];
        assert_eq!(
            counter_sum(&exporter, "courier_envelopes_processed_total", attrs),
            0
        );
        assert_eq!(
            counter_sum(&exporter, "courier_envelopes_failed_total", attrs),
            1
        );
        assert_eq!(
            counter_sum(&exporter, "courier_dead_lettered_total", attrs),
            1
        );
        assert_eq!(
            histogram_count(&exporter, "courier_end_to_end_latency_milliseconds", attrs),
            0
        );
        assert_eq!(
            histogram_count(&exporter, "courier_stage_duration_milliseconds", attrs),
            1
        );
    }
}