1use crate::types::ExecutionRequest;
2use crate::types::OutcomeStatus;
3use tracing::Span;
4
5pub trait Telemetry: Send + Sync {
6 fn runner_span(&self, request: &ExecutionRequest) -> Span;
7 fn clone_box(&self) -> Box<dyn Telemetry>;
8}
9
10impl Clone for Box<dyn Telemetry> {
11 fn clone(&self) -> Self {
12 self.clone_box()
13 }
14}
15
/// Telemetry backend that carries no state.
///
/// `Debug` is derived so the type can appear in `{:?}` output and inside
/// other `#[derive(Debug)]` types.
#[derive(Clone, Debug, Default)]
pub struct NoopTelemetry;
18
19impl Telemetry for NoopTelemetry {
20 fn runner_span(&self, _request: &ExecutionRequest) -> Span {
21 Span::none()
22 }
23
24 fn clone_box(&self) -> Box<dyn Telemetry> {
25 Box::new(self.clone())
26 }
27}
28
/// No-op variant compiled when the `otel` feature is disabled.
///
/// The delta is accepted and discarded.
#[cfg(not(feature = "otel"))]
pub fn record_runner_inflight_delta(_delta: i64) {
    // Nothing is recorded without the `otel` feature.
}
31
/// No-op variant compiled when the `otel` feature is disabled.
///
/// The pressure sample is accepted and discarded.
#[cfg(not(feature = "otel"))]
pub fn record_runner_channel_pressure(_pressure: usize) {
    // Nothing is recorded without the `otel` feature.
}
34
/// No-op variant compiled when the `otel` feature is disabled.
#[cfg(not(feature = "otel"))]
pub fn record_deadline_expired() {
    // Nothing is recorded without the `otel` feature.
}
37
/// No-op variant compiled when the `otel` feature is disabled.
///
/// The cancellation scope is accepted and discarded.
#[cfg(not(feature = "otel"))]
pub fn record_cancellation(_scope: &str) {
    // Nothing is recorded without the `otel` feature.
}
40
41#[cfg(not(feature = "otel"))]
42pub fn record_job_outcome(
43 _function_name: &str,
44 _outcome: OutcomeStatus,
45 _duration: std::time::Duration,
46) {
47}
48
/// Forwards the in-flight delta to [`otel::record_runner_inflight_delta`].
///
/// Compiled only when the `otel` feature is enabled.
#[cfg(feature = "otel")]
pub fn record_runner_inflight_delta(delta: i64) {
    self::otel::record_runner_inflight_delta(delta)
}
53
/// Forwards the channel-pressure sample to
/// [`otel::record_runner_channel_pressure`].
///
/// Compiled only when the `otel` feature is enabled.
#[cfg(feature = "otel")]
pub fn record_runner_channel_pressure(pressure: usize) {
    self::otel::record_runner_channel_pressure(pressure)
}
58
/// Forwards to [`otel::record_deadline_expired`].
///
/// Compiled only when the `otel` feature is enabled.
#[cfg(feature = "otel")]
pub fn record_deadline_expired() {
    self::otel::record_deadline_expired()
}
63
/// Forwards the cancellation scope to [`otel::record_cancellation`].
///
/// Compiled only when the `otel` feature is enabled.
#[cfg(feature = "otel")]
pub fn record_cancellation(scope: &str) {
    self::otel::record_cancellation(scope)
}
68
/// Forwards a finished job's function name, outcome and duration to
/// [`otel::record_job_outcome`].
///
/// Compiled only when the `otel` feature is enabled.
#[cfg(feature = "otel")]
pub fn record_job_outcome(
    function_name: &str,
    outcome: OutcomeStatus,
    duration: std::time::Duration,
) {
    self::otel::record_job_outcome(function_name, outcome, duration)
}
77
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ExecutionContext, ExecutionRequest};

    /// Builds a minimal request fixture with every optional context field unset.
    fn build_request() -> ExecutionRequest {
        let context = ExecutionContext {
            job_id: "job-1".to_string(),
            attempt: 1,
            enqueue_time: "2024-01-01T00:00:00Z".parse().unwrap(),
            queue_name: "default".to_string(),
            deadline: None,
            trace_context: None,
            correlation_context: None,
            worker_id: None,
        };

        ExecutionRequest {
            protocol_version: "2".to_string(),
            request_id: "req-1".to_string(),
            job_id: "job-1".to_string(),
            function_name: "handler".to_string(),
            params: std::collections::HashMap::new(),
            context,
        }
    }

    /// A cloned `Box<dyn Telemetry>` must still produce a span that can be entered.
    #[test]
    fn noop_telemetry_clone_box_works() {
        let original: Box<dyn Telemetry> = Box::new(NoopTelemetry);
        let duplicate = original.clone();
        let span = duplicate.runner_span(&build_request());
        let _entered = span.enter();
    }
}
112
113#[cfg(feature = "otel")]
114pub mod otel {
115 use std::collections::HashMap;
116 use std::sync::OnceLock;
117
118 use chrono::Utc;
119 use opentelemetry::metrics::{Counter, Histogram, Meter, UpDownCounter};
120 use opentelemetry::propagation::Extractor;
121 use opentelemetry::trace::TraceContextExt;
122 use opentelemetry::trace::TracerProvider as _;
123 use opentelemetry::{KeyValue, global};
124 use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
125 use opentelemetry_otlp::{WithExportConfig, WithHttpConfig};
126 use opentelemetry_sdk::metrics::periodic_reader_with_async_runtime::PeriodicReader as AsyncPeriodicReader;
127 use opentelemetry_sdk::runtime;
128 use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
129 use rrq_config::{OtlpEnvConfig, OtlpGlobalEndpointStyle, OtlpSignal, resolve_otlp_env};
130 use tracing::Span;
131 use tracing::field::Empty;
132 use tracing_opentelemetry::OpenTelemetrySpanExt;
133
134 use crate::types::ExecutionRequest;
135 use crate::types::OutcomeStatus;
136
137 use super::Telemetry;
138
139 static RUNNER_METRICS: OnceLock<RunnerMetrics> = OnceLock::new();
140 static METRICS_ENDPOINT_CONFIGURED: OnceLock<bool> = OnceLock::new();
141 static TRACE_PROVIDER: OnceLock<opentelemetry_sdk::trace::SdkTracerProvider> = OnceLock::new();
142 static LOG_PROVIDER: OnceLock<opentelemetry_sdk::logs::SdkLoggerProvider> = OnceLock::new();
143 static METER_PROVIDER: OnceLock<opentelemetry_sdk::metrics::SdkMeterProvider> = OnceLock::new();
144 static RUNNER_LABEL: OnceLock<String> = OnceLock::new();
145
146 #[derive(Clone)]
147 struct RunnerMetrics {
148 runner_inflight: UpDownCounter<i64>,
149 runner_channel_pressure: Histogram<f64>,
150 deadline_expired_total: Counter<u64>,
151 cancellations_total: Counter<u64>,
152 runner_jobs_total: Counter<u64>,
153 runner_job_duration_ms: Histogram<f64>,
154 }
155
156 impl RunnerMetrics {
157 fn new(meter: &Meter) -> Self {
158 Self {
159 runner_inflight: meter.i64_up_down_counter("rrq_runner_inflight").build(),
160 runner_channel_pressure: meter.f64_histogram("rrq_runner_channel_pressure").build(),
161 deadline_expired_total: meter.u64_counter("rrq_deadline_expired_total").build(),
162 cancellations_total: meter.u64_counter("rrq_cancellations_total").build(),
163 runner_jobs_total: meter.u64_counter("rrq_runner_jobs_total").build(),
164 runner_job_duration_ms: meter.f64_histogram("rrq_runner_job_duration_ms").build(),
165 }
166 }
167 }
168
169 fn runner_label() -> String {
170 if let Some(value) = RUNNER_LABEL.get() {
171 return value.clone();
172 }
173 match std::env::var("OTEL_SERVICE_NAME") {
174 Ok(value) if !value.is_empty() => value,
175 _ => std::env::var("SERVICE_NAME").unwrap_or_else(|_| "rrq-runner".to_string()),
176 }
177 }
178
179 fn metrics_endpoint_configured() -> bool {
180 *METRICS_ENDPOINT_CONFIGURED.get_or_init(|| {
181 resolve_otlp_env(OtlpGlobalEndpointStyle::AppendHttpSignalPath)
182 .signal(OtlpSignal::Metrics)
183 .endpoint
184 .is_some()
185 })
186 }
187
188 fn runner_metrics() -> Option<RunnerMetrics> {
189 if let Some(metrics) = RUNNER_METRICS.get() {
190 return Some(metrics.clone());
191 }
192 if !metrics_endpoint_configured() {
193 return None;
194 }
195
196 let meter = global::meter("rrq.runner");
200 Some(RunnerMetrics::new(&meter))
201 }
202
203 pub fn record_runner_inflight_delta(delta: i64) {
204 if delta == 0 {
205 return;
206 }
207 let Some(metrics) = runner_metrics() else {
208 return;
209 };
210 metrics
211 .runner_inflight
212 .add(delta, &[KeyValue::new("runner", runner_label())]);
213 }
214
215 pub fn record_runner_channel_pressure(pressure: usize) {
216 let Some(metrics) = runner_metrics() else {
217 return;
218 };
219 metrics
220 .runner_channel_pressure
221 .record(pressure as f64, &[KeyValue::new("runner", runner_label())]);
222 }
223
224 pub fn record_deadline_expired() {
225 let Some(metrics) = runner_metrics() else {
226 return;
227 };
228 metrics
229 .deadline_expired_total
230 .add(1, &[KeyValue::new("runner", runner_label())]);
231 }
232
233 pub fn record_cancellation(scope: &str) {
234 let Some(metrics) = runner_metrics() else {
235 return;
236 };
237 metrics.cancellations_total.add(
238 1,
239 &[
240 KeyValue::new("runner", runner_label()),
241 KeyValue::new("scope", scope.to_string()),
242 ],
243 );
244 }
245
246 pub fn record_job_outcome(
247 function_name: &str,
248 outcome: OutcomeStatus,
249 duration: std::time::Duration,
250 ) {
251 let Some(metrics) = runner_metrics() else {
252 return;
253 };
254 let outcome = match outcome {
255 OutcomeStatus::Success => "success",
256 OutcomeStatus::Retry => "retry",
257 OutcomeStatus::Timeout => "timeout",
258 OutcomeStatus::Error => "error",
259 };
260 let attrs = [
261 KeyValue::new("runner", runner_label()),
262 KeyValue::new("outcome", outcome.to_string()),
263 KeyValue::new("function", function_name.to_string()),
264 ];
265 metrics.runner_jobs_total.add(1, &attrs);
266 metrics
267 .runner_job_duration_ms
268 .record(duration.as_secs_f64() * 1000.0, &attrs);
269 }
270
/// Stateless telemetry backend for the `otel` feature.
///
/// `Default` is derived instead of hand-written, and `Debug`/`Clone` are
/// derived so this public unit struct can be formatted and copied.
#[derive(Clone, Debug, Default)]
pub struct OtelTelemetry;
278
279 impl Telemetry for OtelTelemetry {
280 fn runner_span(&self, request: &ExecutionRequest) -> Span {
281 let queue_wait_ms = Utc::now()
282 .signed_duration_since(request.context.enqueue_time)
283 .num_milliseconds()
284 .max(0) as f64;
285 let span = tracing::info_span!(
286 "rrq.runner",
287 "span.kind" = "consumer",
288 "messaging.system" = "redis",
289 "messaging.destination.name" = %request.context.queue_name,
290 "messaging.destination_kind" = "queue",
291 "messaging.operation" = "process",
292 "rrq.job_id" = %request.job_id,
293 "rrq.function" = %request.function_name,
294 "rrq.queue" = %request.context.queue_name,
295 "rrq.attempt" = request.context.attempt,
296 "rrq.worker_id" = Empty,
297 "rrq.deadline" = Empty,
298 "rrq.deadline_remaining_ms" = Empty,
299 "rrq.queue_wait_ms" = queue_wait_ms,
300 "rrq.outcome" = Empty,
301 "rrq.duration_ms" = Empty,
302 "rrq.retry_delay_ms" = Empty,
303 "rrq.error_message" = Empty,
304 "rrq.error_type" = Empty,
305 );
306
307 if let Some(worker_id) = &request.context.worker_id {
308 span.record("rrq.worker_id", worker_id.as_str());
309 }
310 if let Some(deadline) = &request.context.deadline {
311 let deadline_str = deadline.to_rfc3339();
312 span.record("rrq.deadline", deadline_str.as_str());
313 let remaining_ms = deadline
314 .signed_duration_since(Utc::now())
315 .num_milliseconds();
316 span.record("rrq.deadline_remaining_ms", (remaining_ms.max(0)) as f64);
317 }
318 if let Some(trace_context) = &request.context.trace_context {
319 let parent = opentelemetry::global::get_text_map_propagator(|prop| {
320 prop.extract(&HashMapExtractor(trace_context))
321 });
322 let _ = span.set_parent(parent);
323 }
324 if let Some(correlation_context) = &request.context.correlation_context {
325 let span_context = span.context();
326 let otel_span = span_context.span();
327 for (key, value) in correlation_context {
328 if key.is_empty() || value.is_empty() {
329 continue;
330 }
331 otel_span.set_attribute(KeyValue::new(key.clone(), value.clone()));
332 }
333 }
334
335 span
336 }
337
338 fn clone_box(&self) -> Box<dyn Telemetry> {
339 Box::new(Self)
340 }
341 }
342
343 struct HashMapExtractor<'a>(&'a HashMap<String, String>);
344
345 impl<'a> Extractor for HashMapExtractor<'a> {
346 fn get(&self, key: &str) -> Option<&str> {
347 self.0.get(key).map(|value| value.as_str())
348 }
349
350 fn keys(&self) -> Vec<&str> {
351 self.0.keys().map(|key| key.as_str()).collect()
352 }
353 }
354
355 pub fn init_tracing(service_name: &str) -> Result<(), Box<dyn std::error::Error>> {
356 use opentelemetry_sdk::Resource;
357 use tracing_subscriber::layer::SubscriberExt;
358 use tracing_subscriber::util::SubscriberInitExt;
359
360 let otlp = resolve_otlp_env(OtlpGlobalEndpointStyle::AppendHttpSignalPath);
361 opentelemetry::global::set_text_map_propagator(
362 opentelemetry_sdk::propagation::TraceContextPropagator::new(),
363 );
364
365 let resource = Resource::builder()
366 .with_service_name(service_name.to_string())
367 .build();
368 let (trace_layer, trace_error) = init_trace_layer(service_name, resource.clone(), &otlp);
369 let (log_layer, logs_error) = init_logs_layer(resource.clone(), &otlp);
370 let metrics_error = init_metrics_provider(service_name, resource, &otlp).err();
371 match (trace_layer, log_layer) {
372 (Some(trace_layer), Some(log_layer)) => tracing_subscriber::registry()
373 .with(trace_layer)
374 .with(log_layer)
375 .with(tracing_subscriber::fmt::layer())
376 .try_init()?,
377 (Some(trace_layer), None) => tracing_subscriber::registry()
378 .with(trace_layer)
379 .with(tracing_subscriber::fmt::layer())
380 .try_init()?,
381 (None, Some(log_layer)) => tracing_subscriber::registry()
382 .with(log_layer)
383 .with(tracing_subscriber::fmt::layer())
384 .try_init()?,
385 (None, None) => tracing_subscriber::registry()
386 .with(tracing_subscriber::fmt::layer())
387 .try_init()?,
388 };
389 warn_if_global_otlp_endpoint_only(&otlp);
390 if let Some(error) = trace_error {
391 tracing::warn!(error = %error, "OpenTelemetry tracing exporter failed to initialize");
392 }
393 if let Some(error) = metrics_error {
394 tracing::warn!(error = %error, "OpenTelemetry metrics exporter failed to initialize");
395 }
396 if let Some(error) = logs_error {
397 tracing::warn!(error = %error, "OpenTelemetry logs exporter failed to initialize");
398 }
399
400 Ok(())
401 }
402
403 fn init_trace_layer(
404 service_name: &str,
405 resource: opentelemetry_sdk::Resource,
406 otlp: &OtlpEnvConfig,
407 ) -> (
408 Option<
409 tracing_opentelemetry::OpenTelemetryLayer<
410 tracing_subscriber::Registry,
411 opentelemetry_sdk::trace::Tracer,
412 >,
413 >,
414 Option<String>,
415 ) {
416 let Some(endpoint) = otlp.signal(OtlpSignal::Traces).endpoint.as_deref() else {
417 return (None, None);
418 };
419
420 let exporter = match opentelemetry_otlp::SpanExporter::builder()
421 .with_http()
422 .with_endpoint(endpoint.to_string())
423 .with_headers(otlp.signal(OtlpSignal::Traces).headers.clone())
424 .build()
425 {
426 Ok(exporter) => exporter,
427 Err(err) => return (None, Some(err.to_string())),
428 };
429 let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
430 .with_resource(resource)
431 .with_span_processor(BatchSpanProcessor::builder(exporter, runtime::Tokio).build())
432 .build();
433 let tracer = provider.tracer(service_name.to_string());
434 let _ = TRACE_PROVIDER.set(provider.clone());
435 opentelemetry::global::set_tracer_provider(provider);
436 let otel_layer = tracing_opentelemetry::layer()
439 .with_tracer(tracer)
440 .with_context_activation(false);
441 (Some(otel_layer), None)
442 }
443
444 fn init_metrics_provider(
445 service_name: &str,
446 resource: opentelemetry_sdk::Resource,
447 otlp: &OtlpEnvConfig,
448 ) -> Result<(), Box<dyn std::error::Error>> {
449 let Some(endpoint) = otlp.signal(OtlpSignal::Metrics).endpoint.as_deref() else {
450 return Ok(());
451 };
452 let exporter = opentelemetry_otlp::MetricExporter::builder()
453 .with_http()
454 .with_endpoint(endpoint.to_string())
455 .with_headers(otlp.signal(OtlpSignal::Metrics).headers.clone())
456 .build()?;
457 let reader = AsyncPeriodicReader::builder(exporter, runtime::Tokio).build();
458 let meter_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
459 .with_resource(resource)
460 .with_reader(reader)
461 .build();
462 global::set_meter_provider(meter_provider.clone());
463 let meter = global::meter("rrq.runner");
464 let _ = RUNNER_METRICS.set(RunnerMetrics::new(&meter));
465 let _ = RUNNER_LABEL.set(service_name.to_string());
466 let _ = METER_PROVIDER.set(meter_provider);
467 Ok(())
468 }
469
470 fn init_logs_layer(
471 resource: opentelemetry_sdk::Resource,
472 otlp: &OtlpEnvConfig,
473 ) -> (
474 Option<
475 OpenTelemetryTracingBridge<
476 opentelemetry_sdk::logs::SdkLoggerProvider,
477 opentelemetry_sdk::logs::SdkLogger,
478 >,
479 >,
480 Option<String>,
481 ) {
482 let Some(endpoint) = otlp.signal(OtlpSignal::Logs).endpoint.as_deref() else {
483 return (None, None);
484 };
485
486 let exporter = match opentelemetry_otlp::LogExporter::builder()
487 .with_http()
488 .with_endpoint(endpoint.to_string())
489 .with_headers(otlp.signal(OtlpSignal::Logs).headers.clone())
490 .build()
491 {
492 Ok(exporter) => exporter,
493 Err(err) => return (None, Some(err.to_string())),
494 };
495 let provider = opentelemetry_sdk::logs::SdkLoggerProvider::builder()
496 .with_resource(resource)
497 .with_log_processor(
498 opentelemetry_sdk::logs::log_processor_with_async_runtime::BatchLogProcessor::builder(
499 exporter,
500 opentelemetry_sdk::runtime::Tokio,
501 )
502 .build(),
503 )
504 .build();
505 let _ = LOG_PROVIDER.set(provider);
506 let Some(provider_ref) = LOG_PROVIDER.get() else {
507 return (
508 None,
509 Some("failed to initialize OpenTelemetry logger provider".to_string()),
510 );
511 };
512 let layer = OpenTelemetryTracingBridge::new(provider_ref);
513 (Some(layer), None)
514 }
515
516 fn warn_if_global_otlp_endpoint_only(otlp: &OtlpEnvConfig) {
517 if !otlp.has_global_endpoint() {
518 return;
519 }
520 let signal_overrides = otlp.explicit_signal_endpoint_count();
521 if signal_overrides < 3 {
522 tracing::debug!(
523 signal_overrides,
524 "using OTEL_EXPORTER_OTLP_ENDPOINT fallback for signals without explicit OTEL_EXPORTER_OTLP_{{TRACES|METRICS|LOGS}}_ENDPOINT"
525 );
526 }
527 }
528
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::{Mutex, OnceLock};

    use opentelemetry_sdk::Resource;
    use rrq_config::OtlpSignalConfig;

    use super::*;

    static TEST_MUTEX: OnceLock<Mutex<()>> = OnceLock::new();

    /// Serializes tests that touch the module-level statics.
    fn test_lock() -> std::sync::MutexGuard<'static, ()> {
        let mutex = TEST_MUTEX.get_or_init(|| Mutex::new(()));
        mutex.lock().expect("test mutex poisoned")
    }

    /// Emitting metrics before initialization must leave the caches empty,
    /// and initialization must then populate them.
    #[tokio::test]
    async fn early_metric_emission_does_not_prebind_metrics_cache() {
        let _guard = test_lock();

        // Preconditions: nothing has initialized metrics in this process yet.
        assert!(
            METER_PROVIDER.get().is_none(),
            "test requires uninitialized meter provider"
        );
        assert!(
            RUNNER_METRICS.get().is_none(),
            "test requires empty runner metrics cache"
        );
        assert!(
            RUNNER_LABEL.get().is_none(),
            "test requires empty runner label cache"
        );

        // Reading the label must not populate the label cell.
        let _pre_init = runner_label();
        assert!(
            RUNNER_LABEL.get().is_none(),
            "runner_label must not lock runner label before telemetry init"
        );

        // Emitting a metric before init may cache the endpoint flag only.
        record_deadline_expired();
        assert!(
            RUNNER_METRICS.get().is_none(),
            "early metric emission must not cache no-op instruments"
        );
        assert!(
            METRICS_ENDPOINT_CONFIGURED.get().is_some(),
            "metrics endpoint configuration should be cached after first emission"
        );
        assert!(
            RUNNER_LABEL.get().is_none(),
            "early metric emission must not lock runner label before telemetry init"
        );

        let service = "rrq-runner-test";
        let metrics_signal = OtlpSignalConfig {
            endpoint: Some("http://127.0.0.1:4318/v1/metrics".to_string()),
            headers: HashMap::new(),
            has_explicit_endpoint_env: true,
        };
        let otlp = OtlpEnvConfig {
            metrics: metrics_signal,
            ..OtlpEnvConfig::default()
        };
        let resource = Resource::builder()
            .with_service_name(service.to_string())
            .build();

        init_metrics_provider(service, resource, &otlp)
            .expect("metrics provider should initialize");

        // After init, every cache is populated.
        assert!(
            METER_PROVIDER.get().is_some(),
            "metrics provider should be cached after init"
        );
        assert!(
            RUNNER_METRICS.get().is_some(),
            "runner metrics cache should initialize after provider setup"
        );
        assert_eq!(
            RUNNER_LABEL.get().map(String::as_str),
            Some(service),
            "metrics init should set runner label to configured service name"
        );
    }
}
618}