Skip to main content

rrq_runner/
telemetry.rs

1use crate::types::ExecutionRequest;
2use tracing::Span;
3
4pub trait Telemetry: Send + Sync {
5    fn runner_span(&self, request: &ExecutionRequest) -> Span;
6    fn clone_box(&self) -> Box<dyn Telemetry>;
7}
8
9impl Clone for Box<dyn Telemetry> {
10    fn clone(&self) -> Self {
11        self.clone_box()
12    }
13}
14
/// Stateless telemetry implementation that carries no configuration.
///
/// Derives `Debug` in addition to `Clone` and `Default` so the public type can
/// be logged and embedded in other `Debug` types.
#[derive(Clone, Debug, Default)]
pub struct NoopTelemetry;
17
18impl Telemetry for NoopTelemetry {
19    fn runner_span(&self, _request: &ExecutionRequest) -> Span {
20        Span::none()
21    }
22
23    fn clone_box(&self) -> Box<dyn Telemetry> {
24        Box::new(self.clone())
25    }
26}
27
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ExecutionContext, ExecutionRequest};

    /// Builds a minimal request with every optional context field unset.
    fn build_request() -> ExecutionRequest {
        ExecutionRequest {
            protocol_version: "2".to_string(),
            request_id: "req-1".to_string(),
            job_id: "job-1".to_string(),
            function_name: "handler".to_string(),
            params: std::collections::HashMap::new(),
            context: ExecutionContext {
                job_id: "job-1".to_string(),
                attempt: 1,
                enqueue_time: "2024-01-01T00:00:00Z".parse().unwrap(),
                queue_name: "default".to_string(),
                deadline: None,
                trace_context: None,
                worker_id: None,
            },
        }
    }

    /// Cloning a boxed `NoopTelemetry` must yield a telemetry that still
    /// produces the no-op span, and that span must be enterable.
    #[test]
    fn noop_telemetry_clone_box_works() {
        let telemetry: Box<dyn Telemetry> = Box::new(NoopTelemetry);
        let cloned = telemetry.clone();
        let request = build_request();
        let span = cloned.runner_span(&request);
        // Pin the observable behaviour instead of only checking that nothing panics.
        assert!(span.is_none());
        let _guard = span.enter();
    }
}
61
62#[cfg(feature = "otel")]
63pub mod otel {
64    use std::collections::HashMap;
65
66    use opentelemetry::propagation::Extractor;
67    use tracing::Span;
68    use tracing::field::Empty;
69    use tracing_opentelemetry::OpenTelemetrySpanExt;
70
71    use crate::types::ExecutionRequest;
72
73    use super::Telemetry;
74
75    pub struct OtelTelemetry;
76
77    impl Default for OtelTelemetry {
78        fn default() -> Self {
79            Self
80        }
81    }
82
83    impl Telemetry for OtelTelemetry {
84        fn runner_span(&self, request: &ExecutionRequest) -> Span {
85            let span = tracing::info_span!(
86                "rrq.runner",
87                "span.kind" = "consumer",
88                "messaging.system" = "redis",
89                "messaging.destination.name" = %request.context.queue_name,
90                "messaging.destination_kind" = "queue",
91                "messaging.operation" = "process",
92                "rrq.job_id" = %request.job_id,
93                "rrq.function" = %request.function_name,
94                "rrq.queue" = %request.context.queue_name,
95                "rrq.attempt" = request.context.attempt,
96                "rrq.worker_id" = Empty,
97                "rrq.deadline" = Empty,
98                "rrq.outcome" = Empty,
99                "rrq.duration_ms" = Empty,
100                "rrq.retry_delay_ms" = Empty,
101                "rrq.error_message" = Empty,
102                "rrq.error_type" = Empty,
103            );
104
105            if let Some(worker_id) = &request.context.worker_id {
106                span.record("rrq.worker_id", worker_id.as_str());
107            }
108            if let Some(deadline) = &request.context.deadline {
109                let deadline_str = deadline.to_rfc3339();
110                span.record("rrq.deadline", deadline_str.as_str());
111            }
112            if let Some(trace_context) = &request.context.trace_context {
113                let parent = opentelemetry::global::get_text_map_propagator(|prop| {
114                    prop.extract(&HashMapExtractor(trace_context))
115                });
116                let _ = span.set_parent(parent);
117            }
118
119            span
120        }
121
122        fn clone_box(&self) -> Box<dyn Telemetry> {
123            Box::new(Self)
124        }
125    }
126
127    struct HashMapExtractor<'a>(&'a HashMap<String, String>);
128
129    impl<'a> Extractor for HashMapExtractor<'a> {
130        fn get(&self, key: &str) -> Option<&str> {
131            self.0.get(key).map(|value| value.as_str())
132        }
133
134        fn keys(&self) -> Vec<&str> {
135            self.0.keys().map(|key| key.as_str()).collect()
136        }
137    }
138
139    pub fn init_tracing(service_name: &str) -> Result<(), Box<dyn std::error::Error>> {
140        use opentelemetry::global;
141        use opentelemetry::trace::TracerProvider as _;
142        use opentelemetry_sdk::Resource;
143        use tracing_subscriber::layer::SubscriberExt;
144        use tracing_subscriber::util::SubscriberInitExt;
145
146        global::set_text_map_propagator(
147            opentelemetry_sdk::propagation::TraceContextPropagator::new(),
148        );
149
150        let exporter = opentelemetry_otlp::SpanExporter::builder()
151            .with_tonic()
152            .build()?;
153        let resource = Resource::builder()
154            .with_service_name(service_name.to_string())
155            .build();
156        let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
157            .with_resource(resource)
158            .with_batch_exporter(exporter)
159            .build();
160
161        let tracer = provider.tracer(service_name.to_string());
162        opentelemetry::global::set_tracer_provider(provider);
163        let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
164        tracing_subscriber::registry()
165            .with(otel_layer)
166            .with(tracing_subscriber::fmt::layer())
167            .try_init()?;
168
169        Ok(())
170    }
171}