1use crate::types::ExecutionRequest;
2use crate::types::OutcomeStatus;
3use tracing::Span;
4
5pub trait Telemetry: Send + Sync {
6 fn runner_span(&self, request: &ExecutionRequest) -> Span;
7 fn clone_box(&self) -> Box<dyn Telemetry>;
8}
9
10impl Clone for Box<dyn Telemetry> {
11 fn clone(&self) -> Self {
12 self.clone_box()
13 }
14}
15
16#[derive(Clone, Default)]
17pub struct NoopTelemetry;
18
19impl Telemetry for NoopTelemetry {
20 fn runner_span(&self, _request: &ExecutionRequest) -> Span {
21 Span::none()
22 }
23
24 fn clone_box(&self) -> Box<dyn Telemetry> {
25 Box::new(self.clone())
26 }
27}
28
29#[cfg(not(feature = "otel"))]
30pub fn record_runner_inflight_delta(_delta: i64) {}
31
32#[cfg(not(feature = "otel"))]
33pub fn record_runner_channel_pressure(_pressure: usize) {}
34
35#[cfg(not(feature = "otel"))]
36pub fn record_deadline_expired() {}
37
38#[cfg(not(feature = "otel"))]
39pub fn record_cancellation(_scope: &str) {}
40
41#[cfg(not(feature = "otel"))]
42pub fn record_job_outcome(
43 _function_name: &str,
44 _outcome: OutcomeStatus,
45 _duration: std::time::Duration,
46) {
47}
48
49#[cfg(feature = "otel")]
50pub fn record_runner_inflight_delta(delta: i64) {
51 otel::record_runner_inflight_delta(delta);
52}
53
54#[cfg(feature = "otel")]
55pub fn record_runner_channel_pressure(pressure: usize) {
56 otel::record_runner_channel_pressure(pressure);
57}
58
59#[cfg(feature = "otel")]
60pub fn record_deadline_expired() {
61 otel::record_deadline_expired();
62}
63
64#[cfg(feature = "otel")]
65pub fn record_cancellation(scope: &str) {
66 otel::record_cancellation(scope);
67}
68
69#[cfg(feature = "otel")]
70pub fn record_job_outcome(
71 function_name: &str,
72 outcome: OutcomeStatus,
73 duration: std::time::Duration,
74) {
75 otel::record_job_outcome(function_name, outcome, duration);
76}
77
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ExecutionContext, ExecutionRequest};

    /// Builds a minimal protocol-v2 request with no deadline, worker id, trace
    /// context, or correlation context.
    fn build_request() -> ExecutionRequest {
        let job_id = "job-1".to_string();
        let context = ExecutionContext {
            job_id: job_id.clone(),
            attempt: 1,
            enqueue_time: "2024-01-01T00:00:00Z".parse().unwrap(),
            queue_name: "default".to_string(),
            deadline: None,
            trace_context: None,
            correlation_context: None,
            worker_id: None,
        };
        ExecutionRequest {
            protocol_version: "2".to_string(),
            request_id: "req-1".to_string(),
            job_id,
            function_name: "handler".to_string(),
            params: std::collections::HashMap::new(),
            context,
        }
    }

    /// A boxed `NoopTelemetry` survives cloning and still yields an enterable span.
    #[test]
    fn noop_telemetry_clone_box_works() {
        let request = build_request();
        let original: Box<dyn Telemetry> = Box::new(NoopTelemetry);
        let span = original.clone().runner_span(&request);
        let _entered = span.enter();
    }
}
112
113#[cfg(feature = "otel")]
114pub mod otel {
115 use std::collections::HashMap;
116 use std::sync::OnceLock;
117
118 use chrono::Utc;
119 use opentelemetry::metrics::{Counter, Histogram, Meter, UpDownCounter};
120 use opentelemetry::propagation::Extractor;
121 use opentelemetry::trace::TraceContextExt;
122 use opentelemetry::trace::TracerProvider as _;
123 use opentelemetry::{KeyValue, global};
124 use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
125 use opentelemetry_otlp::{WithExportConfig, WithHttpConfig};
126 use opentelemetry_sdk::metrics::periodic_reader_with_async_runtime::PeriodicReader as AsyncPeriodicReader;
127 use opentelemetry_sdk::runtime;
128 use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
129 use rrq_config::{OtlpEnvConfig, OtlpGlobalEndpointStyle, OtlpSignal, resolve_otlp_env};
130 use tracing::Span;
131 use tracing::field::Empty;
132 use tracing_opentelemetry::OpenTelemetrySpanExt;
133
134 use crate::types::ExecutionRequest;
135 use crate::types::OutcomeStatus;
136
137 use super::Telemetry;
138
    /// Runner instruments, cached only by `init_metrics_provider`; until then
    /// `runner_metrics` builds uncached instruments on demand.
    static RUNNER_METRICS: OnceLock<RunnerMetrics> = OnceLock::new();
    /// Whether the OTLP environment configures a metrics endpoint; resolved once,
    /// on the first metric emission.
    static METRICS_ENDPOINT_CONFIGURED: OnceLock<bool> = OnceLock::new();
    /// SDK tracer provider stored by `init_trace_layer` (presumably kept for a later
    /// flush/shutdown — TODO confirm the consumer).
    static TRACE_PROVIDER: OnceLock<opentelemetry_sdk::trace::SdkTracerProvider> = OnceLock::new();
    /// SDK logger provider stored by `init_logs_layer`; the logs bridge is built from it.
    static LOG_PROVIDER: OnceLock<opentelemetry_sdk::logs::SdkLoggerProvider> = OnceLock::new();
    /// SDK meter provider stored by `init_metrics_provider`.
    static METER_PROVIDER: OnceLock<opentelemetry_sdk::metrics::SdkMeterProvider> = OnceLock::new();
    /// Value of the `runner` metric attribute, set to the service name during
    /// metrics initialisation.
    static RUNNER_LABEL: OnceLock<String> = OnceLock::new();
145
146 #[derive(Clone)]
147 struct RunnerMetrics {
148 runner_inflight: UpDownCounter<i64>,
149 runner_channel_pressure: Histogram<f64>,
150 deadline_expired_total: Counter<u64>,
151 cancellations_total: Counter<u64>,
152 runner_jobs_total: Counter<u64>,
153 runner_job_duration_ms: Histogram<f64>,
154 }
155
156 impl RunnerMetrics {
157 fn new(meter: &Meter) -> Self {
158 Self {
159 runner_inflight: meter.i64_up_down_counter("rrq_runner_inflight").build(),
160 runner_channel_pressure: meter.f64_histogram("rrq_runner_channel_pressure").build(),
161 deadline_expired_total: meter.u64_counter("rrq_deadline_expired_total").build(),
162 cancellations_total: meter.u64_counter("rrq_cancellations_total").build(),
163 runner_jobs_total: meter.u64_counter("rrq_runner_jobs_total").build(),
164 runner_job_duration_ms: meter.f64_histogram("rrq_runner_job_duration_ms").build(),
165 }
166 }
167 }
168
169 fn runner_label() -> String {
170 if let Some(value) = RUNNER_LABEL.get() {
171 return value.clone();
172 }
173 match std::env::var("OTEL_SERVICE_NAME") {
174 Ok(value) if !value.is_empty() => value,
175 _ => std::env::var("SERVICE_NAME").unwrap_or_else(|_| "rrq-runner".to_string()),
176 }
177 }
178
179 fn metrics_endpoint_configured() -> bool {
180 *METRICS_ENDPOINT_CONFIGURED.get_or_init(|| {
181 resolve_otlp_env(OtlpGlobalEndpointStyle::AppendHttpSignalPath)
182 .signal(OtlpSignal::Metrics)
183 .endpoint
184 .is_some()
185 })
186 }
187
188 fn runner_metrics() -> Option<RunnerMetrics> {
189 if let Some(metrics) = RUNNER_METRICS.get() {
190 return Some(metrics.clone());
191 }
192 if !metrics_endpoint_configured() {
193 return None;
194 }
195
196 let meter = global::meter("rrq.runner");
200 Some(RunnerMetrics::new(&meter))
201 }
202
203 pub fn record_runner_inflight_delta(delta: i64) {
204 if delta == 0 {
205 return;
206 }
207 let Some(metrics) = runner_metrics() else {
208 return;
209 };
210 metrics
211 .runner_inflight
212 .add(delta, &[KeyValue::new("runner", runner_label())]);
213 }
214
215 pub fn record_runner_channel_pressure(pressure: usize) {
216 let Some(metrics) = runner_metrics() else {
217 return;
218 };
219 metrics
220 .runner_channel_pressure
221 .record(pressure as f64, &[KeyValue::new("runner", runner_label())]);
222 }
223
224 pub fn record_deadline_expired() {
225 let Some(metrics) = runner_metrics() else {
226 return;
227 };
228 metrics
229 .deadline_expired_total
230 .add(1, &[KeyValue::new("runner", runner_label())]);
231 }
232
233 pub fn record_cancellation(scope: &str) {
234 let Some(metrics) = runner_metrics() else {
235 return;
236 };
237 metrics.cancellations_total.add(
238 1,
239 &[
240 KeyValue::new("runner", runner_label()),
241 KeyValue::new("scope", scope.to_string()),
242 ],
243 );
244 }
245
246 pub fn record_job_outcome(
247 function_name: &str,
248 outcome: OutcomeStatus,
249 duration: std::time::Duration,
250 ) {
251 let Some(metrics) = runner_metrics() else {
252 return;
253 };
254 let outcome = match outcome {
255 OutcomeStatus::Success => "success",
256 OutcomeStatus::Retry => "retry",
257 OutcomeStatus::Timeout => "timeout",
258 OutcomeStatus::Error => "error",
259 };
260 let attrs = [
261 KeyValue::new("runner", runner_label()),
262 KeyValue::new("outcome", outcome.to_string()),
263 KeyValue::new("function", function_name.to_string()),
264 ];
265 metrics.runner_jobs_total.add(1, &attrs);
266 metrics
267 .runner_job_duration_ms
268 .record(duration.as_secs_f64() * 1000.0, &attrs);
269 }
270
271 pub struct OtelTelemetry;
272
273 impl Default for OtelTelemetry {
274 fn default() -> Self {
275 Self
276 }
277 }
278
279 impl Telemetry for OtelTelemetry {
280 fn runner_span(&self, request: &ExecutionRequest) -> Span {
281 let queue_wait_ms = Utc::now()
282 .signed_duration_since(request.context.enqueue_time)
283 .num_milliseconds()
284 .max(0) as f64;
285 let span = tracing::info_span!(
286 "rrq.runner",
287 "span.kind" = "consumer",
288 "messaging.system" = "redis",
289 "messaging.destination.name" = %request.context.queue_name,
290 "messaging.destination_kind" = "queue",
291 "messaging.operation" = "process",
292 "rrq.job_id" = %request.job_id,
293 "rrq.function" = %request.function_name,
294 "rrq.queue" = %request.context.queue_name,
295 "rrq.attempt" = request.context.attempt,
296 "rrq.worker_id" = Empty,
297 "rrq.deadline" = Empty,
298 "rrq.deadline_remaining_ms" = Empty,
299 "rrq.queue_wait_ms" = queue_wait_ms,
300 "rrq.outcome" = Empty,
301 "rrq.duration_ms" = Empty,
302 "rrq.retry_delay_ms" = Empty,
303 "rrq.error_message" = Empty,
304 "rrq.error_type" = Empty,
305 );
306
307 if let Some(worker_id) = &request.context.worker_id {
308 span.record("rrq.worker_id", worker_id.as_str());
309 }
310 if let Some(deadline) = &request.context.deadline {
311 let deadline_str = deadline.to_rfc3339();
312 span.record("rrq.deadline", deadline_str.as_str());
313 let remaining_ms = deadline
314 .signed_duration_since(Utc::now())
315 .num_milliseconds();
316 span.record("rrq.deadline_remaining_ms", (remaining_ms.max(0)) as f64);
317 }
318 if let Some(trace_context) = &request.context.trace_context {
319 let parent = opentelemetry::global::get_text_map_propagator(|prop| {
320 prop.extract(&HashMapExtractor(trace_context))
321 });
322 let _ = span.set_parent(parent);
323 }
324 if let Some(correlation_context) = &request.context.correlation_context {
325 let span_context = span.context();
326 let otel_span = span_context.span();
327 for (key, value) in correlation_context {
328 if key.is_empty() || value.is_empty() {
329 continue;
330 }
331 otel_span.set_attribute(KeyValue::new(key.clone(), value.clone()));
332 }
333 }
334
335 span
336 }
337
338 fn clone_box(&self) -> Box<dyn Telemetry> {
339 Box::new(Self)
340 }
341 }
342
343 struct HashMapExtractor<'a>(&'a HashMap<String, String>);
344
345 impl<'a> Extractor for HashMapExtractor<'a> {
346 fn get(&self, key: &str) -> Option<&str> {
347 self.0.get(key).map(|value| value.as_str())
348 }
349
350 fn keys(&self) -> Vec<&str> {
351 self.0.keys().map(|key| key.as_str()).collect()
352 }
353 }
354
355 pub fn init_tracing(service_name: &str) -> Result<(), Box<dyn std::error::Error>> {
356 use opentelemetry_sdk::Resource;
357 use tracing_subscriber::layer::SubscriberExt;
358 use tracing_subscriber::util::SubscriberInitExt;
359
360 let otlp = resolve_otlp_env(OtlpGlobalEndpointStyle::AppendHttpSignalPath);
361 opentelemetry::global::set_text_map_propagator(
362 opentelemetry_sdk::propagation::TraceContextPropagator::new(),
363 );
364
365 let resource = Resource::builder()
366 .with_service_name(service_name.to_string())
367 .build();
368 let (trace_layer, trace_error) = init_trace_layer(service_name, resource.clone(), &otlp);
369 let (log_layer, logs_error) = init_logs_layer(resource.clone(), &otlp);
370 let metrics_error = init_metrics_provider(service_name, resource, &otlp).err();
371 match (trace_layer, log_layer) {
372 (Some(trace_layer), Some(log_layer)) => tracing_subscriber::registry()
373 .with(trace_layer)
374 .with(log_layer)
375 .with(tracing_subscriber::fmt::layer())
376 .try_init()?,
377 (Some(trace_layer), None) => tracing_subscriber::registry()
378 .with(trace_layer)
379 .with(tracing_subscriber::fmt::layer())
380 .try_init()?,
381 (None, Some(log_layer)) => tracing_subscriber::registry()
382 .with(log_layer)
383 .with(tracing_subscriber::fmt::layer())
384 .try_init()?,
385 (None, None) => tracing_subscriber::registry()
386 .with(tracing_subscriber::fmt::layer())
387 .try_init()?,
388 };
389 warn_if_global_otlp_endpoint_only(&otlp);
390 if let Some(error) = trace_error {
391 tracing::warn!(error = %error, "OpenTelemetry tracing exporter failed to initialize");
392 }
393 if let Some(error) = metrics_error {
394 tracing::warn!(error = %error, "OpenTelemetry metrics exporter failed to initialize");
395 }
396 if let Some(error) = logs_error {
397 tracing::warn!(error = %error, "OpenTelemetry logs exporter failed to initialize");
398 }
399
400 Ok(())
401 }
402
403 fn init_trace_layer(
404 service_name: &str,
405 resource: opentelemetry_sdk::Resource,
406 otlp: &OtlpEnvConfig,
407 ) -> (
408 Option<
409 tracing_opentelemetry::OpenTelemetryLayer<
410 tracing_subscriber::Registry,
411 opentelemetry_sdk::trace::Tracer,
412 >,
413 >,
414 Option<String>,
415 ) {
416 let Some(endpoint) = otlp.signal(OtlpSignal::Traces).endpoint.as_deref() else {
417 return (None, None);
418 };
419
420 let exporter = match opentelemetry_otlp::SpanExporter::builder()
421 .with_http()
422 .with_endpoint(endpoint.to_string())
423 .with_headers(otlp.signal(OtlpSignal::Traces).headers.clone())
424 .build()
425 {
426 Ok(exporter) => exporter,
427 Err(err) => return (None, Some(err.to_string())),
428 };
429 let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
430 .with_resource(resource)
431 .with_span_processor(BatchSpanProcessor::builder(exporter, runtime::Tokio).build())
432 .build();
433 let tracer = provider.tracer(service_name.to_string());
434 let _ = TRACE_PROVIDER.set(provider.clone());
435 opentelemetry::global::set_tracer_provider(provider);
436 let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
437 (Some(otel_layer), None)
438 }
439
440 fn init_metrics_provider(
441 service_name: &str,
442 resource: opentelemetry_sdk::Resource,
443 otlp: &OtlpEnvConfig,
444 ) -> Result<(), Box<dyn std::error::Error>> {
445 let Some(endpoint) = otlp.signal(OtlpSignal::Metrics).endpoint.as_deref() else {
446 return Ok(());
447 };
448 let exporter = opentelemetry_otlp::MetricExporter::builder()
449 .with_http()
450 .with_endpoint(endpoint.to_string())
451 .with_headers(otlp.signal(OtlpSignal::Metrics).headers.clone())
452 .build()?;
453 let reader = AsyncPeriodicReader::builder(exporter, runtime::Tokio).build();
454 let meter_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
455 .with_resource(resource)
456 .with_reader(reader)
457 .build();
458 global::set_meter_provider(meter_provider.clone());
459 let meter = global::meter("rrq.runner");
460 let _ = RUNNER_METRICS.set(RunnerMetrics::new(&meter));
461 let _ = RUNNER_LABEL.set(service_name.to_string());
462 let _ = METER_PROVIDER.set(meter_provider);
463 Ok(())
464 }
465
466 fn init_logs_layer(
467 resource: opentelemetry_sdk::Resource,
468 otlp: &OtlpEnvConfig,
469 ) -> (
470 Option<
471 OpenTelemetryTracingBridge<
472 opentelemetry_sdk::logs::SdkLoggerProvider,
473 opentelemetry_sdk::logs::SdkLogger,
474 >,
475 >,
476 Option<String>,
477 ) {
478 let Some(endpoint) = otlp.signal(OtlpSignal::Logs).endpoint.as_deref() else {
479 return (None, None);
480 };
481
482 let exporter = match opentelemetry_otlp::LogExporter::builder()
483 .with_http()
484 .with_endpoint(endpoint.to_string())
485 .with_headers(otlp.signal(OtlpSignal::Logs).headers.clone())
486 .build()
487 {
488 Ok(exporter) => exporter,
489 Err(err) => return (None, Some(err.to_string())),
490 };
491 let provider = opentelemetry_sdk::logs::SdkLoggerProvider::builder()
492 .with_resource(resource)
493 .with_log_processor(
494 opentelemetry_sdk::logs::log_processor_with_async_runtime::BatchLogProcessor::builder(
495 exporter,
496 opentelemetry_sdk::runtime::Tokio,
497 )
498 .build(),
499 )
500 .build();
501 let _ = LOG_PROVIDER.set(provider);
502 let Some(provider_ref) = LOG_PROVIDER.get() else {
503 return (
504 None,
505 Some("failed to initialize OpenTelemetry logger provider".to_string()),
506 );
507 };
508 let layer = OpenTelemetryTracingBridge::new(provider_ref);
509 (Some(layer), None)
510 }
511
512 fn warn_if_global_otlp_endpoint_only(otlp: &OtlpEnvConfig) {
513 if !otlp.has_global_endpoint() {
514 return;
515 }
516 let signal_overrides = otlp.explicit_signal_endpoint_count();
517 if signal_overrides < 3 {
518 tracing::debug!(
519 signal_overrides,
520 "using OTEL_EXPORTER_OTLP_ENDPOINT fallback for signals without explicit OTEL_EXPORTER_OTLP_{{TRACES|METRICS|LOGS}}_ENDPOINT"
521 );
522 }
523 }
524
    #[cfg(test)]
    mod tests {
        use std::collections::HashMap;
        use std::sync::{Mutex, OnceLock};

        use opentelemetry_sdk::Resource;
        use rrq_config::OtlpSignalConfig;

        use super::*;

        // Serialises the tests in this module, since they inspect and mutate the
        // process-wide `OnceLock` caches of the parent module.
        static TEST_MUTEX: OnceLock<Mutex<()>> = OnceLock::new();

        // Acquires the module-wide test lock; panics if an earlier holder panicked
        // (poisoned mutex).
        fn test_lock() -> std::sync::MutexGuard<'static, ()> {
            TEST_MUTEX
                .get_or_init(|| Mutex::new(()))
                .lock()
                .expect("test mutex poisoned")
        }

        // Checks the metrics caching contract end to end:
        // - metric calls made before `init_metrics_provider` neither cache instruments
        //   nor pin the runner label;
        // - a later init populates both.
        // The opening assertions require that no telemetry cache has been set yet in
        // this process. Runs under Tokio presumably because `init_metrics_provider`
        // builds its periodic reader on `runtime::Tokio` — TODO confirm.
        #[tokio::test]
        async fn early_metric_emission_does_not_prebind_metrics_cache() {
            let _guard = test_lock();
            assert!(
                METER_PROVIDER.get().is_none(),
                "test requires uninitialized meter provider"
            );
            assert!(
                RUNNER_METRICS.get().is_none(),
                "test requires empty runner metrics cache"
            );
            assert!(
                RUNNER_LABEL.get().is_none(),
                "test requires empty runner label cache"
            );

            // Resolving the label before init must not populate RUNNER_LABEL.
            let _pre_init = runner_label();
            assert!(
                RUNNER_LABEL.get().is_none(),
                "runner_label must not lock runner label before telemetry init"
            );

            // An early emission resolves (and caches) endpoint configuration but
            // must leave the instrument and label caches empty.
            record_deadline_expired();
            assert!(
                RUNNER_METRICS.get().is_none(),
                "early metric emission must not cache no-op instruments"
            );
            assert!(
                METRICS_ENDPOINT_CONFIGURED.get().is_some(),
                "metrics endpoint configuration should be cached after first emission"
            );
            assert!(
                RUNNER_LABEL.get().is_none(),
                "early metric emission must not lock runner label before telemetry init"
            );

            // Explicit config with only a metrics endpoint, independent of the
            // process environment.
            let otlp = OtlpEnvConfig {
                metrics: OtlpSignalConfig {
                    endpoint: Some("http://127.0.0.1:4318/v1/metrics".to_string()),
                    headers: HashMap::new(),
                    has_explicit_endpoint_env: true,
                },
                ..OtlpEnvConfig::default()
            };
            let resource = Resource::builder()
                .with_service_name("rrq-runner-test".to_string())
                .build();

            init_metrics_provider("rrq-runner-test", resource, &otlp)
                .expect("metrics provider should initialize");

            // After init, all three caches hold the configured values.
            assert!(
                METER_PROVIDER.get().is_some(),
                "metrics provider should be cached after init"
            );
            assert!(
                RUNNER_METRICS.get().is_some(),
                "runner metrics cache should initialize after provider setup"
            );
            assert_eq!(
                RUNNER_LABEL.get().map(String::as_str),
                Some("rrq-runner-test"),
                "metrics init should set runner label to configured service name"
            );
        }
    }
614}