Skip to main content

courier/transforms/
mod.rs

1use std::time::Instant;
2
3use anyhow::Result;
4use async_trait::async_trait;
5use tokio::sync::mpsc::{Receiver, Sender};
6use tokio_util::sync::CancellationToken;
7use tracing::Instrument;
8use tracing_opentelemetry::OpenTelemetrySpanExt;
9
10use crate::config::redact_secret;
11use crate::envelope::Envelope;
12use crate::observability::NodeCtx;
13use crate::observability::trace_context;
14use crate::pipeline::ErrorPolicy;
15
16pub mod batch;
17pub mod filter;
18pub mod mutate;
19pub mod script;
20pub mod set_key;
21
22/// Full-control transform: owns both channels.
23///
24/// Implement directly for cardinality changes or stateful work:
25/// batching (`N -> 1`), flat-map (`1 -> N`), joins, windowed aggregation.
26/// For plain `1 -> 0-or-1` transforms, implement `MapOne` instead.
27#[async_trait]
28pub trait Transform: Send + Sync {
29    fn id(&self) -> &str;
30
31    /// Attach the per-node observability context. Called by
32    /// `spawn_pipeline` after the transform is built but before it
33    /// runs. Default no-op — full-control transforms that want
34    /// metrics override this and store the ctx; `BasicTransform`
35    /// already does so for the common path.
36    fn set_node_ctx(&mut self, _ctx: NodeCtx) {}
37
38    async fn run(
39        self: Box<Self>,
40        rx: Receiver<Envelope>,
41        tx: Sender<Envelope>,
42        cancel: CancellationToken,
43    );
44}
45
46/// Ergonomic transform: "receive one, return at most one".
47///
48/// Returning `Ok(None)` filters the envelope out. `BasicTransform` wraps
49/// a `MapOne` into a `Transform` with the standard loop and error policy.
50#[async_trait]
51pub trait MapOne: Send + Sync {
52    fn id(&self) -> &str;
53
54    async fn map(&self, env: Envelope) -> Result<Option<Envelope>>;
55}
56
57/// Adapter that turns any `MapOne` into a `Transform`.
58pub struct BasicTransform<M: MapOne> {
59    pub inner: M,
60    pub on_error: ErrorPolicy,
61    node_ctx: NodeCtx,
62}
63
64impl<M: MapOne> BasicTransform<M> {
65    pub fn new(inner: M) -> Self {
66        Self {
67            inner,
68            on_error: ErrorPolicy::Drop,
69            node_ctx: NodeCtx::noop(),
70        }
71    }
72
73    pub fn with_error_policy(mut self, policy: ErrorPolicy) -> Self {
74        self.on_error = policy;
75        self
76    }
77}
78
79#[async_trait]
80impl<M: MapOne + 'static> Transform for BasicTransform<M> {
81    fn id(&self) -> &str {
82        self.inner.id()
83    }
84
85    fn set_node_ctx(&mut self, ctx: NodeCtx) {
86        self.node_ctx = ctx;
87    }
88
89    async fn run(
90        self: Box<Self>,
91        mut rx: Receiver<Envelope>,
92        tx: Sender<Envelope>,
93        cancel: CancellationToken,
94    ) {
95        let id = self.inner.id().to_string();
96        let ctx = self.node_ctx;
97        loop {
98            tokio::select! {
99                _ = cancel.cancelled() => break,
100                maybe = rx.recv() => {
101                    let Some(env) = maybe else { break };
102                    let span = tracing::info_span!(
103                        "courier.transform",
104                        pipeline = %redact_secret(ctx.pipeline()),
105                        node_id = %redact_secret(ctx.node_id()),
106                        node_kind = %ctx.node_kind_str(),
107                        envelope.source_id = %env.meta.source_id,
108                        envelope.key = if ctx.log_keys() { env.meta.key.as_deref().unwrap_or("") } else { "" },
109                    );
110                    if let Some(parent) = trace_context::extract(&env.meta.headers) {
111                        let _ = span.set_parent(parent);
112                    }
113                    let span_context = span.context();
114                    let started = Instant::now();
115                    let result = self.inner.map(env).instrument(span.clone()).await;
116                    ctx.record_stage_duration_ms(started.elapsed().as_secs_f64() * 1000.0);
117                    match result {
118                        Ok(Some(mut out)) => {
119                            ctx.record_processed();
120                            trace_context::inject(&mut out.meta.headers, &span_context);
121                            if tx.send(out).await.is_err() {
122                                tracing::debug!(node_id = %redact_secret(&id), "downstream closed");
123                                return;
124                            }
125                        }
126                        Ok(None) => {
127                            ctx.record_filtered();
128                        }
129                        Err(e) => {
130                            ctx.record_failed();
131                            match &self.on_error {
132                                ErrorPolicy::Drop => {
133                                    tracing::error!(node_id = %redact_secret(&id), error = %e, "map failed, dropping");
134                                }
135                                ErrorPolicy::FailPipeline => {
136                                    tracing::error!(node_id = %redact_secret(&id), error = %e, "map failed, failing pipeline");
137                                    cancel.cancel();
138                                    break;
139                                }
140                            }
141                        }
142                    }
143                }
144            }
145        }
146    }
147}