Skip to main content

courier/observability/
source_ctx.rs

1//! Source-side helpers for creating root spans and stamping trace context.
2
3use std::time::Instant;
4
5use tokio::sync::mpsc::Sender;
6use tokio_util::sync::CancellationToken;
7use tracing::Instrument;
8use tracing_opentelemetry::OpenTelemetrySpanExt;
9
10use crate::config::redact_secret;
11use crate::envelope::Envelope;
12use crate::observability::NodeCtx;
13use crate::observability::trace_context;
14
15#[derive(Clone)]
16pub struct SourceCtx {
17    node_ctx: NodeCtx,
18}
19
20impl SourceCtx {
21    pub fn new(node_id: impl Into<String>) -> Self {
22        let node_id = node_id.into();
23        Self {
24            node_ctx: NodeCtx::for_node(
25                "",
26                &node_id,
27                crate::observability::NodeKind::Source,
28                crate::observability::ObsHandle::noop(),
29            ),
30        }
31    }
32
33    pub fn from_node_ctx(ctx: NodeCtx) -> Self {
34        Self { node_ctx: ctx }
35    }
36
37    pub async fn send(
38        &self,
39        tx: &Sender<Envelope>,
40        mut env: Envelope,
41        cancel: &CancellationToken,
42    ) -> Result<(), SendStopped> {
43        let span = tracing::info_span!(
44            "courier.source",
45            pipeline = %redact_secret(self.node_ctx.pipeline()),
46            node_id = %redact_secret(self.node_ctx.node_id()),
47            node_kind = "source",
48            envelope.source_id = %env.meta.source_id,
49            envelope.key = if self.node_ctx.log_keys() { env.meta.key.as_deref().unwrap_or("") } else { "" },
50        );
51        if let Some(parent) = trace_context::extract(&env.meta.headers) {
52            let _ = span.set_parent(parent);
53        }
54
55        let span_for_context = span.clone();
56        let started = Instant::now();
57        let result = async move {
58            trace_context::inject(&mut env.meta.headers, &span_for_context.context());
59            tokio::select! {
60                _ = cancel.cancelled() => Err(SendStopped::Cancelled),
61                res = tx.send(env) => res.map_err(|_| SendStopped::DownstreamClosed),
62            }
63        }
64        .instrument(span)
65        .await;
66
67        self.node_ctx
68            .record_stage_duration_ms(started.elapsed().as_secs_f64() * 1000.0);
69        if result.is_ok() {
70            self.node_ctx.record_processed();
71        }
72        result
73    }
74}
75
76#[derive(Debug, Clone, Copy, Eq, PartialEq)]
77pub enum SendStopped {
78    Cancelled,
79    DownstreamClosed,
80}