courier/transforms/
mod.rs1use std::time::Instant;
2
3use anyhow::Result;
4use async_trait::async_trait;
5use tokio::sync::mpsc::{Receiver, Sender};
6use tokio_util::sync::CancellationToken;
7use tracing::Instrument;
8use tracing_opentelemetry::OpenTelemetrySpanExt;
9
10use crate::config::redact_secret;
11use crate::envelope::Envelope;
12use crate::observability::NodeCtx;
13use crate::observability::trace_context;
14use crate::pipeline::ErrorPolicy;
15
16pub mod batch;
17pub mod filter;
18pub mod mutate;
19pub mod script;
20pub mod set_key;
21
22#[async_trait]
28pub trait Transform: Send + Sync {
29 fn id(&self) -> &str;
30
31 fn set_node_ctx(&mut self, _ctx: NodeCtx) {}
37
38 async fn run(
39 self: Box<Self>,
40 rx: Receiver<Envelope>,
41 tx: Sender<Envelope>,
42 cancel: CancellationToken,
43 );
44}
45
46#[async_trait]
51pub trait MapOne: Send + Sync {
52 fn id(&self) -> &str;
53
54 async fn map(&self, env: Envelope) -> Result<Option<Envelope>>;
55}
56
57pub struct BasicTransform<M: MapOne> {
59 pub inner: M,
60 pub on_error: ErrorPolicy,
61 node_ctx: NodeCtx,
62}
63
64impl<M: MapOne> BasicTransform<M> {
65 pub fn new(inner: M) -> Self {
66 Self {
67 inner,
68 on_error: ErrorPolicy::Drop,
69 node_ctx: NodeCtx::noop(),
70 }
71 }
72
73 pub fn with_error_policy(mut self, policy: ErrorPolicy) -> Self {
74 self.on_error = policy;
75 self
76 }
77}
78
79#[async_trait]
80impl<M: MapOne + 'static> Transform for BasicTransform<M> {
81 fn id(&self) -> &str {
82 self.inner.id()
83 }
84
85 fn set_node_ctx(&mut self, ctx: NodeCtx) {
86 self.node_ctx = ctx;
87 }
88
89 async fn run(
90 self: Box<Self>,
91 mut rx: Receiver<Envelope>,
92 tx: Sender<Envelope>,
93 cancel: CancellationToken,
94 ) {
95 let id = self.inner.id().to_string();
96 let ctx = self.node_ctx;
97 loop {
98 tokio::select! {
99 _ = cancel.cancelled() => break,
100 maybe = rx.recv() => {
101 let Some(env) = maybe else { break };
102 let span = tracing::info_span!(
103 "courier.transform",
104 pipeline = %redact_secret(ctx.pipeline()),
105 node_id = %redact_secret(ctx.node_id()),
106 node_kind = %ctx.node_kind_str(),
107 envelope.source_id = %env.meta.source_id,
108 envelope.key = if ctx.log_keys() { env.meta.key.as_deref().unwrap_or("") } else { "" },
109 );
110 if let Some(parent) = trace_context::extract(&env.meta.headers) {
111 let _ = span.set_parent(parent);
112 }
113 let span_context = span.context();
114 let started = Instant::now();
115 let result = self.inner.map(env).instrument(span.clone()).await;
116 ctx.record_stage_duration_ms(started.elapsed().as_secs_f64() * 1000.0);
117 match result {
118 Ok(Some(mut out)) => {
119 ctx.record_processed();
120 trace_context::inject(&mut out.meta.headers, &span_context);
121 if tx.send(out).await.is_err() {
122 tracing::debug!(node_id = %redact_secret(&id), "downstream closed");
123 return;
124 }
125 }
126 Ok(None) => {
127 ctx.record_filtered();
128 }
129 Err(e) => {
130 ctx.record_failed();
131 match &self.on_error {
132 ErrorPolicy::Drop => {
133 tracing::error!(node_id = %redact_secret(&id), error = %e, "map failed, dropping");
134 }
135 ErrorPolicy::FailPipeline => {
136 tracing::error!(node_id = %redact_secret(&id), error = %e, "map failed, failing pipeline");
137 cancel.cancel();
138 break;
139 }
140 }
141 }
142 }
143 }
144 }
145 }
146 }
147}