courier/observability/
source_ctx.rs1use std::time::Instant;
4
5use tokio::sync::mpsc::Sender;
6use tokio_util::sync::CancellationToken;
7use tracing::Instrument;
8use tracing_opentelemetry::OpenTelemetrySpanExt;
9
10use crate::config::redact_secret;
11use crate::envelope::Envelope;
12use crate::observability::NodeCtx;
13use crate::observability::trace_context;
14
15#[derive(Clone)]
16pub struct SourceCtx {
17 node_ctx: NodeCtx,
18}
19
20impl SourceCtx {
21 pub fn new(node_id: impl Into<String>) -> Self {
22 let node_id = node_id.into();
23 Self {
24 node_ctx: NodeCtx::for_node(
25 "",
26 &node_id,
27 crate::observability::NodeKind::Source,
28 crate::observability::ObsHandle::noop(),
29 ),
30 }
31 }
32
33 pub fn from_node_ctx(ctx: NodeCtx) -> Self {
34 Self { node_ctx: ctx }
35 }
36
37 pub async fn send(
38 &self,
39 tx: &Sender<Envelope>,
40 mut env: Envelope,
41 cancel: &CancellationToken,
42 ) -> Result<(), SendStopped> {
43 let span = tracing::info_span!(
44 "courier.source",
45 pipeline = %redact_secret(self.node_ctx.pipeline()),
46 node_id = %redact_secret(self.node_ctx.node_id()),
47 node_kind = "source",
48 envelope.source_id = %env.meta.source_id,
49 envelope.key = if self.node_ctx.log_keys() { env.meta.key.as_deref().unwrap_or("") } else { "" },
50 );
51 if let Some(parent) = trace_context::extract(&env.meta.headers) {
52 let _ = span.set_parent(parent);
53 }
54
55 let span_for_context = span.clone();
56 let started = Instant::now();
57 let result = async move {
58 trace_context::inject(&mut env.meta.headers, &span_for_context.context());
59 tokio::select! {
60 _ = cancel.cancelled() => Err(SendStopped::Cancelled),
61 res = tx.send(env) => res.map_err(|_| SendStopped::DownstreamClosed),
62 }
63 }
64 .instrument(span)
65 .await;
66
67 self.node_ctx
68 .record_stage_duration_ms(started.elapsed().as_secs_f64() * 1000.0);
69 if result.is_ok() {
70 self.node_ctx.record_processed();
71 }
72 result
73 }
74}
75
76#[derive(Debug, Clone, Copy, Eq, PartialEq)]
77pub enum SendStopped {
78 Cancelled,
79 DownstreamClosed,
80}