1use crate::types::ExecutionRequest;
2use tracing::Span;
3
4pub trait Telemetry: Send + Sync {
5 fn runner_span(&self, request: &ExecutionRequest) -> Span;
6 fn clone_box(&self) -> Box<dyn Telemetry>;
7}
8
9impl Clone for Box<dyn Telemetry> {
10 fn clone(&self) -> Self {
11 self.clone_box()
12 }
13}
14
/// Telemetry implementation that produces no spans.
///
/// Its `runner_span` returns `Span::none()`, so it can be used wherever a
/// [`Telemetry`] is required but no tracing output is wanted.
#[derive(Clone, Debug, Default)]
pub struct NoopTelemetry;
17
18impl Telemetry for NoopTelemetry {
19 fn runner_span(&self, _request: &ExecutionRequest) -> Span {
20 Span::none()
21 }
22
23 fn clone_box(&self) -> Box<dyn Telemetry> {
24 Box::new(self.clone())
25 }
26}
27
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ExecutionContext, ExecutionRequest};

    /// Builds a minimal request with every optional context field unset.
    fn build_request() -> ExecutionRequest {
        ExecutionRequest {
            protocol_version: "2".to_string(),
            request_id: "req-1".to_string(),
            job_id: "job-1".to_string(),
            function_name: "handler".to_string(),
            params: std::collections::HashMap::new(),
            context: ExecutionContext {
                job_id: "job-1".to_string(),
                attempt: 1,
                enqueue_time: "2024-01-01T00:00:00Z".parse().unwrap(),
                queue_name: "default".to_string(),
                deadline: None,
                trace_context: None,
                worker_id: None,
            },
        }
    }

    /// A boxed `NoopTelemetry` must survive cloning through the trait object
    /// and keep handing out the disabled span.
    #[test]
    fn noop_telemetry_clone_box_works() {
        let telemetry: Box<dyn Telemetry> = Box::new(NoopTelemetry);
        let cloned = telemetry.clone();
        let request = build_request();
        let span = cloned.runner_span(&request);
        // Without this assertion the test would pass for any span at all.
        assert!(
            span.is_none(),
            "cloned NoopTelemetry should return Span::none()"
        );
        // Entering a disabled span must be harmless.
        let _guard = span.enter();
    }
}
61
62#[cfg(feature = "otel")]
63pub mod otel {
64 use std::collections::HashMap;
65
66 use opentelemetry::propagation::Extractor;
67 use tracing::Span;
68 use tracing::field::Empty;
69 use tracing_opentelemetry::OpenTelemetrySpanExt;
70
71 use crate::types::ExecutionRequest;
72
73 use super::Telemetry;
74
75 pub struct OtelTelemetry;
76
77 impl Default for OtelTelemetry {
78 fn default() -> Self {
79 Self
80 }
81 }
82
83 impl Telemetry for OtelTelemetry {
84 fn runner_span(&self, request: &ExecutionRequest) -> Span {
85 let span = tracing::info_span!(
86 "rrq.runner",
87 "span.kind" = "consumer",
88 "messaging.system" = "redis",
89 "messaging.destination.name" = %request.context.queue_name,
90 "messaging.destination_kind" = "queue",
91 "messaging.operation" = "process",
92 "rrq.job_id" = %request.job_id,
93 "rrq.function" = %request.function_name,
94 "rrq.queue" = %request.context.queue_name,
95 "rrq.attempt" = request.context.attempt,
96 "rrq.worker_id" = Empty,
97 "rrq.deadline" = Empty,
98 "rrq.outcome" = Empty,
99 "rrq.duration_ms" = Empty,
100 "rrq.retry_delay_ms" = Empty,
101 "rrq.error_message" = Empty,
102 "rrq.error_type" = Empty,
103 );
104
105 if let Some(worker_id) = &request.context.worker_id {
106 span.record("rrq.worker_id", worker_id.as_str());
107 }
108 if let Some(deadline) = &request.context.deadline {
109 let deadline_str = deadline.to_rfc3339();
110 span.record("rrq.deadline", deadline_str.as_str());
111 }
112 if let Some(trace_context) = &request.context.trace_context {
113 let parent = opentelemetry::global::get_text_map_propagator(|prop| {
114 prop.extract(&HashMapExtractor(trace_context))
115 });
116 let _ = span.set_parent(parent);
117 }
118
119 span
120 }
121
122 fn clone_box(&self) -> Box<dyn Telemetry> {
123 Box::new(Self)
124 }
125 }
126
127 struct HashMapExtractor<'a>(&'a HashMap<String, String>);
128
129 impl<'a> Extractor for HashMapExtractor<'a> {
130 fn get(&self, key: &str) -> Option<&str> {
131 self.0.get(key).map(|value| value.as_str())
132 }
133
134 fn keys(&self) -> Vec<&str> {
135 self.0.keys().map(|key| key.as_str()).collect()
136 }
137 }
138
139 pub fn init_tracing(service_name: &str) -> Result<(), Box<dyn std::error::Error>> {
140 use opentelemetry::global;
141 use opentelemetry::trace::TracerProvider as _;
142 use opentelemetry_sdk::Resource;
143 use tracing_subscriber::layer::SubscriberExt;
144 use tracing_subscriber::util::SubscriberInitExt;
145
146 global::set_text_map_propagator(
147 opentelemetry_sdk::propagation::TraceContextPropagator::new(),
148 );
149
150 let exporter = opentelemetry_otlp::SpanExporter::builder()
151 .with_tonic()
152 .build()?;
153 let resource = Resource::builder()
154 .with_service_name(service_name.to_string())
155 .build();
156 let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
157 .with_resource(resource)
158 .with_batch_exporter(exporter)
159 .build();
160
161 let tracer = provider.tracer(service_name.to_string());
162 opentelemetry::global::set_tracer_provider(provider);
163 let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
164 tracing_subscriber::registry()
165 .with(otel_layer)
166 .with(tracing_subscriber::fmt::layer())
167 .try_init()?;
168
169 Ok(())
170 }
171}