// rrq_runner/telemetry.rs
//
// Runner telemetry: a pluggable `Telemetry` trait with a no-op default, and an
// OTLP-backed implementation behind the "otel" feature flag.

1use crate::types::ExecutionRequest;
2use crate::types::OutcomeStatus;
3use tracing::Span;
4
/// Hook for attaching tracing to job execution.
///
/// `Send + Sync` lets a single telemetry handle be shared across runner tasks;
/// `clone_box` makes `Box<dyn Telemetry>` cloneable (plain `Clone` is not
/// object safe).
pub trait Telemetry: Send + Sync {
    /// Creates the tracing span that wraps execution of `request`.
    fn runner_span(&self, request: &ExecutionRequest) -> Span;
    /// Returns a boxed clone of this telemetry implementation.
    fn clone_box(&self) -> Box<dyn Telemetry>;
}
9
/// Delegates `Clone` for boxed trait objects to [`Telemetry::clone_box`],
/// since `Clone` itself cannot be a supertrait of an object-safe trait.
impl Clone for Box<dyn Telemetry> {
    fn clone(&self) -> Self {
        self.clone_box()
    }
}
15
/// Telemetry implementation that records nothing; the default when no
/// exporter is configured.
#[derive(Clone, Default)]
pub struct NoopTelemetry;
18
19impl Telemetry for NoopTelemetry {
20    fn runner_span(&self, _request: &ExecutionRequest) -> Span {
21        Span::none()
22    }
23
24    fn clone_box(&self) -> Box<dyn Telemetry> {
25        Box::new(self.clone())
26    }
27}
28
// No-op metric stubs compiled when the "otel" feature is disabled. They keep
// call sites unconditional: the runner always calls `record_*`, and these
// bodies simply discard the data.

/// Records a change in the number of in-flight jobs (no-op without "otel").
#[cfg(not(feature = "otel"))]
pub fn record_runner_inflight_delta(_delta: i64) {}

/// Records the runner channel backlog size (no-op without "otel").
#[cfg(not(feature = "otel"))]
pub fn record_runner_channel_pressure(_pressure: usize) {}

/// Records one job whose deadline expired (no-op without "otel").
#[cfg(not(feature = "otel"))]
pub fn record_deadline_expired() {}

/// Records one cancellation in the given scope (no-op without "otel").
#[cfg(not(feature = "otel"))]
pub fn record_cancellation(_scope: &str) {}

/// Records a finished job's outcome and duration (no-op without "otel").
#[cfg(not(feature = "otel"))]
pub fn record_job_outcome(
    _function_name: &str,
    _outcome: OutcomeStatus,
    _duration: std::time::Duration,
) {
}
48
// Thin forwarders compiled when the "otel" feature is enabled; the real
// implementations live in the `otel` module below.

/// Records a change in the number of in-flight jobs via OTLP metrics.
#[cfg(feature = "otel")]
pub fn record_runner_inflight_delta(delta: i64) {
    otel::record_runner_inflight_delta(delta);
}

/// Records the runner channel backlog size via OTLP metrics.
#[cfg(feature = "otel")]
pub fn record_runner_channel_pressure(pressure: usize) {
    otel::record_runner_channel_pressure(pressure);
}

/// Records one job whose deadline expired via OTLP metrics.
#[cfg(feature = "otel")]
pub fn record_deadline_expired() {
    otel::record_deadline_expired();
}

/// Records one cancellation in the given scope via OTLP metrics.
#[cfg(feature = "otel")]
pub fn record_cancellation(scope: &str) {
    otel::record_cancellation(scope);
}

/// Records a finished job's outcome and duration via OTLP metrics.
#[cfg(feature = "otel")]
pub fn record_job_outcome(
    function_name: &str,
    outcome: OutcomeStatus,
    duration: std::time::Duration,
) {
    otel::record_job_outcome(function_name, outcome, duration);
}
77
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ExecutionContext, ExecutionRequest};

    /// Builds a minimal, well-formed `ExecutionRequest` fixture for span tests.
    fn build_request() -> ExecutionRequest {
        ExecutionRequest {
            protocol_version: "2".to_string(),
            request_id: "req-1".to_string(),
            job_id: "job-1".to_string(),
            function_name: "handler".to_string(),
            params: std::collections::HashMap::new(),
            context: ExecutionContext {
                job_id: "job-1".to_string(),
                attempt: 1,
                enqueue_time: "2024-01-01T00:00:00Z".parse().unwrap(),
                queue_name: "default".to_string(),
                deadline: None,
                trace_context: None,
                correlation_context: None,
                worker_id: None,
            },
        }
    }

    /// Cloning through `Box<dyn Telemetry>` must work, and the no-op span
    /// must be safe to enter.
    #[test]
    fn noop_telemetry_clone_box_works() {
        let telemetry: Box<dyn Telemetry> = Box::new(NoopTelemetry);
        let cloned = telemetry.clone();
        let request = build_request();
        let span = cloned.runner_span(&request);
        let _guard = span.enter();
    }
}
112
113#[cfg(feature = "otel")]
114pub mod otel {
115    use std::collections::HashMap;
116    use std::sync::OnceLock;
117
118    use chrono::Utc;
119    use opentelemetry::metrics::{Counter, Histogram, Meter, UpDownCounter};
120    use opentelemetry::propagation::Extractor;
121    use opentelemetry::trace::TraceContextExt;
122    use opentelemetry::trace::TracerProvider as _;
123    use opentelemetry::{KeyValue, global};
124    use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
125    use opentelemetry_otlp::{WithExportConfig, WithHttpConfig};
126    use opentelemetry_sdk::metrics::periodic_reader_with_async_runtime::PeriodicReader as AsyncPeriodicReader;
127    use opentelemetry_sdk::runtime;
128    use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
129    use rrq_config::{OtlpEnvConfig, OtlpGlobalEndpointStyle, OtlpSignal, resolve_otlp_env};
130    use tracing::Span;
131    use tracing::field::Empty;
132    use tracing_opentelemetry::OpenTelemetrySpanExt;
133
134    use crate::types::ExecutionRequest;
135    use crate::types::OutcomeStatus;
136
137    use super::Telemetry;
138
    // Process-wide singletons. The providers are cached so they stay alive for
    // the life of the process (dropping them would stop exporting); instruments
    // and the runner label are cached only once real initialization has run.
    static RUNNER_METRICS: OnceLock<RunnerMetrics> = OnceLock::new();
    // Whether an OTLP metrics endpoint is configured (resolved once from env).
    static METRICS_ENDPOINT_CONFIGURED: OnceLock<bool> = OnceLock::new();
    static TRACE_PROVIDER: OnceLock<opentelemetry_sdk::trace::SdkTracerProvider> = OnceLock::new();
    static LOG_PROVIDER: OnceLock<opentelemetry_sdk::logs::SdkLoggerProvider> = OnceLock::new();
    static METER_PROVIDER: OnceLock<opentelemetry_sdk::metrics::SdkMeterProvider> = OnceLock::new();
    // Service name used as the "runner" attribute; set by init_metrics_provider().
    static RUNNER_LABEL: OnceLock<String> = OnceLock::new();

    /// Handles to every runner metric instrument, all built from one meter.
    #[derive(Clone)]
    struct RunnerMetrics {
        runner_inflight: UpDownCounter<i64>, // jobs currently executing
        runner_channel_pressure: Histogram<f64>, // sampled channel backlog
        deadline_expired_total: Counter<u64>, // jobs past their deadline
        cancellations_total: Counter<u64>, // cancellations, labeled by scope
        runner_jobs_total: Counter<u64>, // finished jobs, labeled by outcome
        runner_job_duration_ms: Histogram<f64>, // job wall time in milliseconds
    }

    impl RunnerMetrics {
        /// Builds all instruments from `meter`. Instrument names are stable
        /// metric identifiers; renaming them would break dashboards/alerts.
        fn new(meter: &Meter) -> Self {
            Self {
                runner_inflight: meter.i64_up_down_counter("rrq_runner_inflight").build(),
                runner_channel_pressure: meter.f64_histogram("rrq_runner_channel_pressure").build(),
                deadline_expired_total: meter.u64_counter("rrq_deadline_expired_total").build(),
                cancellations_total: meter.u64_counter("rrq_cancellations_total").build(),
                runner_jobs_total: meter.u64_counter("rrq_runner_jobs_total").build(),
                runner_job_duration_ms: meter.f64_histogram("rrq_runner_job_duration_ms").build(),
            }
        }
    }
168
169    fn runner_label() -> String {
170        if let Some(value) = RUNNER_LABEL.get() {
171            return value.clone();
172        }
173        match std::env::var("OTEL_SERVICE_NAME") {
174            Ok(value) if !value.is_empty() => value,
175            _ => std::env::var("SERVICE_NAME").unwrap_or_else(|_| "rrq-runner".to_string()),
176        }
177    }
178
    /// Returns whether the environment configures an OTLP metrics endpoint.
    /// The answer is computed once and cached for the process lifetime.
    fn metrics_endpoint_configured() -> bool {
        *METRICS_ENDPOINT_CONFIGURED.get_or_init(|| {
            resolve_otlp_env(OtlpGlobalEndpointStyle::AppendHttpSignalPath)
                .signal(OtlpSignal::Metrics)
                .endpoint
                .is_some()
        })
    }
187
    /// Returns the instruments to record runner metrics with, or `None` when
    /// no metrics endpoint is configured (so recording is skipped entirely).
    fn runner_metrics() -> Option<RunnerMetrics> {
        if let Some(metrics) = RUNNER_METRICS.get() {
            return Some(metrics.clone());
        }
        if !metrics_endpoint_configured() {
            return None;
        }

        // Build transient instruments from the current global provider without
        // caching. This avoids permanently binding RUNNER_METRICS to the
        // process-default no-op provider before init_metrics_provider() runs.
        let meter = global::meter("rrq.runner");
        Some(RunnerMetrics::new(&meter))
    }
202
203    pub fn record_runner_inflight_delta(delta: i64) {
204        if delta == 0 {
205            return;
206        }
207        let Some(metrics) = runner_metrics() else {
208            return;
209        };
210        metrics
211            .runner_inflight
212            .add(delta, &[KeyValue::new("runner", runner_label())]);
213    }
214
215    pub fn record_runner_channel_pressure(pressure: usize) {
216        let Some(metrics) = runner_metrics() else {
217            return;
218        };
219        metrics
220            .runner_channel_pressure
221            .record(pressure as f64, &[KeyValue::new("runner", runner_label())]);
222    }
223
224    pub fn record_deadline_expired() {
225        let Some(metrics) = runner_metrics() else {
226            return;
227        };
228        metrics
229            .deadline_expired_total
230            .add(1, &[KeyValue::new("runner", runner_label())]);
231    }
232
233    pub fn record_cancellation(scope: &str) {
234        let Some(metrics) = runner_metrics() else {
235            return;
236        };
237        metrics.cancellations_total.add(
238            1,
239            &[
240                KeyValue::new("runner", runner_label()),
241                KeyValue::new("scope", scope.to_string()),
242            ],
243        );
244    }
245
246    pub fn record_job_outcome(
247        function_name: &str,
248        outcome: OutcomeStatus,
249        duration: std::time::Duration,
250    ) {
251        let Some(metrics) = runner_metrics() else {
252            return;
253        };
254        let outcome = match outcome {
255            OutcomeStatus::Success => "success",
256            OutcomeStatus::Retry => "retry",
257            OutcomeStatus::Timeout => "timeout",
258            OutcomeStatus::Error => "error",
259        };
260        let attrs = [
261            KeyValue::new("runner", runner_label()),
262            KeyValue::new("outcome", outcome.to_string()),
263            KeyValue::new("function", function_name.to_string()),
264        ];
265        metrics.runner_jobs_total.add(1, &attrs);
266        metrics
267            .runner_job_duration_ms
268            .record(duration.as_secs_f64() * 1000.0, &attrs);
269    }
270
271    pub struct OtelTelemetry;
272
273    impl Default for OtelTelemetry {
274        fn default() -> Self {
275            Self
276        }
277    }
278
    impl Telemetry for OtelTelemetry {
        /// Builds a consumer span for `request` following the messaging
        /// semantic conventions, records queue wait / deadline / worker
        /// metadata, re-attaches the upstream trace parent, and copies
        /// correlation entries onto the underlying OTel span as attributes.
        fn runner_span(&self, request: &ExecutionRequest) -> Span {
            // Time spent queued, clamped at zero to tolerate clock skew.
            let queue_wait_ms = Utc::now()
                .signed_duration_since(request.context.enqueue_time)
                .num_milliseconds()
                .max(0) as f64;
            // Fields only known conditionally (or only after execution) are
            // declared Empty here and recorded later.
            let span = tracing::info_span!(
                "rrq.runner",
                "span.kind" = "consumer",
                "messaging.system" = "redis",
                "messaging.destination.name" = %request.context.queue_name,
                "messaging.destination_kind" = "queue",
                "messaging.operation" = "process",
                "rrq.job_id" = %request.job_id,
                "rrq.function" = %request.function_name,
                "rrq.queue" = %request.context.queue_name,
                "rrq.attempt" = request.context.attempt,
                "rrq.worker_id" = Empty,
                "rrq.deadline" = Empty,
                "rrq.deadline_remaining_ms" = Empty,
                "rrq.queue_wait_ms" = queue_wait_ms,
                "rrq.outcome" = Empty,
                "rrq.duration_ms" = Empty,
                "rrq.retry_delay_ms" = Empty,
                "rrq.error_message" = Empty,
                "rrq.error_type" = Empty,
            );

            if let Some(worker_id) = &request.context.worker_id {
                span.record("rrq.worker_id", worker_id.as_str());
            }
            if let Some(deadline) = &request.context.deadline {
                let deadline_str = deadline.to_rfc3339();
                span.record("rrq.deadline", deadline_str.as_str());
                // Remaining budget, clamped at zero for already-expired deadlines.
                let remaining_ms = deadline
                    .signed_duration_since(Utc::now())
                    .num_milliseconds();
                span.record("rrq.deadline_remaining_ms", (remaining_ms.max(0)) as f64);
            }
            if let Some(trace_context) = &request.context.trace_context {
                // Re-hydrate the trace context carried in the job payload and
                // make it this span's remote parent.
                let parent = opentelemetry::global::get_text_map_propagator(|prop| {
                    prop.extract(&HashMapExtractor(trace_context))
                });
                let _ = span.set_parent(parent);
            }
            if let Some(correlation_context) = &request.context.correlation_context {
                let span_context = span.context();
                let otel_span = span_context.span();
                for (key, value) in correlation_context {
                    // Skip malformed entries rather than emit empty attributes.
                    if key.is_empty() || value.is_empty() {
                        continue;
                    }
                    otel_span.set_attribute(KeyValue::new(key.clone(), value.clone()));
                }
            }

            span
        }

        fn clone_box(&self) -> Box<dyn Telemetry> {
            // Stateless marker type: a fresh value is a valid clone.
            Box::new(Self)
        }
    }
342
343    struct HashMapExtractor<'a>(&'a HashMap<String, String>);
344
345    impl<'a> Extractor for HashMapExtractor<'a> {
346        fn get(&self, key: &str) -> Option<&str> {
347            self.0.get(key).map(|value| value.as_str())
348        }
349
350        fn keys(&self) -> Vec<&str> {
351            self.0.keys().map(|key| key.as_str()).collect()
352        }
353    }
354
    /// Installs the global tracing subscriber with optional OTLP trace/log
    /// layers plus a console fmt layer, and initializes the metrics pipeline.
    ///
    /// Exporter failures are deliberately non-fatal: the subscriber is always
    /// installed, and per-signal initialization errors are logged as warnings
    /// afterwards (so they flow through the freshly installed subscriber).
    ///
    /// # Errors
    /// Fails only when the subscriber itself cannot be installed (e.g. a
    /// global subscriber was already set).
    pub fn init_tracing(service_name: &str) -> Result<(), Box<dyn std::error::Error>> {
        use opentelemetry_sdk::Resource;
        use tracing_subscriber::layer::SubscriberExt;
        use tracing_subscriber::util::SubscriberInitExt;

        let otlp = resolve_otlp_env(OtlpGlobalEndpointStyle::AppendHttpSignalPath);
        opentelemetry::global::set_text_map_propagator(
            opentelemetry_sdk::propagation::TraceContextPropagator::new(),
        );

        let resource = Resource::builder()
            .with_service_name(service_name.to_string())
            .build();
        let (trace_layer, trace_error) = init_trace_layer(service_name, resource.clone(), &otlp);
        let (log_layer, logs_error) = init_logs_layer(resource.clone(), &otlp);
        let metrics_error = init_metrics_provider(service_name, resource, &otlp).err();
        // `.with(...)` changes the subscriber's type, so each layer combination
        // is spelled out explicitly rather than composed dynamically.
        match (trace_layer, log_layer) {
            (Some(trace_layer), Some(log_layer)) => tracing_subscriber::registry()
                .with(trace_layer)
                .with(log_layer)
                .with(tracing_subscriber::fmt::layer())
                .try_init()?,
            (Some(trace_layer), None) => tracing_subscriber::registry()
                .with(trace_layer)
                .with(tracing_subscriber::fmt::layer())
                .try_init()?,
            (None, Some(log_layer)) => tracing_subscriber::registry()
                .with(log_layer)
                .with(tracing_subscriber::fmt::layer())
                .try_init()?,
            (None, None) => tracing_subscriber::registry()
                .with(tracing_subscriber::fmt::layer())
                .try_init()?,
        };
        warn_if_global_otlp_endpoint_only(&otlp);
        if let Some(error) = trace_error {
            tracing::warn!(error = %error, "OpenTelemetry tracing exporter failed to initialize");
        }
        if let Some(error) = metrics_error {
            tracing::warn!(error = %error, "OpenTelemetry metrics exporter failed to initialize");
        }
        if let Some(error) = logs_error {
            tracing::warn!(error = %error, "OpenTelemetry logs exporter failed to initialize");
        }

        Ok(())
    }
402
    /// Builds the tracing layer that exports spans over OTLP/HTTP.
    ///
    /// Returns `(layer, error)`: the layer is `None` when no traces endpoint
    /// is configured or the exporter failed to build; the error string (if
    /// any) is surfaced by the caller once the subscriber is installed.
    fn init_trace_layer(
        service_name: &str,
        resource: opentelemetry_sdk::Resource,
        otlp: &OtlpEnvConfig,
    ) -> (
        Option<
            tracing_opentelemetry::OpenTelemetryLayer<
                tracing_subscriber::Registry,
                opentelemetry_sdk::trace::Tracer,
            >,
        >,
        Option<String>,
    ) {
        let Some(endpoint) = otlp.signal(OtlpSignal::Traces).endpoint.as_deref() else {
            return (None, None);
        };

        let exporter = match opentelemetry_otlp::SpanExporter::builder()
            .with_http()
            .with_endpoint(endpoint.to_string())
            .with_headers(otlp.signal(OtlpSignal::Traces).headers.clone())
            .build()
        {
            Ok(exporter) => exporter,
            Err(err) => return (None, Some(err.to_string())),
        };
        let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
            .with_resource(resource)
            .with_span_processor(BatchSpanProcessor::builder(exporter, runtime::Tokio).build())
            .build();
        let tracer = provider.tracer(service_name.to_string());
        // Keep the provider alive for the process lifetime, then install it
        // globally so propagation helpers can reach it.
        let _ = TRACE_PROVIDER.set(provider.clone());
        opentelemetry::global::set_tracer_provider(provider);
        // Avoid TLS ContextGuard teardown panics on runtime worker shutdown.
        // We rely on tracing spans for hierarchy and explicit propagation helpers.
        let otel_layer = tracing_opentelemetry::layer()
            .with_tracer(tracer)
            .with_context_activation(false);
        (Some(otel_layer), None)
    }
443
    /// Initializes the global OTLP metrics pipeline when a metrics endpoint is
    /// configured, then caches the runner instruments and service-name label.
    ///
    /// # Errors
    /// Fails when the OTLP metric exporter cannot be built. A missing endpoint
    /// is not an error — metrics simply stay disabled.
    fn init_metrics_provider(
        service_name: &str,
        resource: opentelemetry_sdk::Resource,
        otlp: &OtlpEnvConfig,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let Some(endpoint) = otlp.signal(OtlpSignal::Metrics).endpoint.as_deref() else {
            return Ok(());
        };
        let exporter = opentelemetry_otlp::MetricExporter::builder()
            .with_http()
            .with_endpoint(endpoint.to_string())
            .with_headers(otlp.signal(OtlpSignal::Metrics).headers.clone())
            .build()?;
        let reader = AsyncPeriodicReader::builder(exporter, runtime::Tokio).build();
        let meter_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
            .with_resource(resource)
            .with_reader(reader)
            .build();
        global::set_meter_provider(meter_provider.clone());
        // Only now that the real provider is global is it safe to cache the
        // instruments (runner_metrics() deliberately avoids caching earlier).
        let meter = global::meter("rrq.runner");
        let _ = RUNNER_METRICS.set(RunnerMetrics::new(&meter));
        let _ = RUNNER_LABEL.set(service_name.to_string());
        let _ = METER_PROVIDER.set(meter_provider);
        Ok(())
    }
469
    /// Builds the tracing-to-OTel bridge layer that exports log records over
    /// OTLP/HTTP.
    ///
    /// Returns `(layer, error)`: the layer is `None` when no logs endpoint is
    /// configured or the exporter/provider failed to initialize; the error
    /// string (if any) is surfaced by the caller after subscriber install.
    fn init_logs_layer(
        resource: opentelemetry_sdk::Resource,
        otlp: &OtlpEnvConfig,
    ) -> (
        Option<
            OpenTelemetryTracingBridge<
                opentelemetry_sdk::logs::SdkLoggerProvider,
                opentelemetry_sdk::logs::SdkLogger,
            >,
        >,
        Option<String>,
    ) {
        let Some(endpoint) = otlp.signal(OtlpSignal::Logs).endpoint.as_deref() else {
            return (None, None);
        };

        let exporter = match opentelemetry_otlp::LogExporter::builder()
            .with_http()
            .with_endpoint(endpoint.to_string())
            .with_headers(otlp.signal(OtlpSignal::Logs).headers.clone())
            .build()
        {
            Ok(exporter) => exporter,
            Err(err) => return (None, Some(err.to_string())),
        };
        let provider = opentelemetry_sdk::logs::SdkLoggerProvider::builder()
            .with_resource(resource)
            .with_log_processor(
                opentelemetry_sdk::logs::log_processor_with_async_runtime::BatchLogProcessor::builder(
                    exporter,
                    opentelemetry_sdk::runtime::Tokio,
                )
                .build(),
            )
            .build();
        // Park the provider in the static so the bridge can borrow a
        // process-lifetime reference to it.
        let _ = LOG_PROVIDER.set(provider);
        let Some(provider_ref) = LOG_PROVIDER.get() else {
            return (
                None,
                Some("failed to initialize OpenTelemetry logger provider".to_string()),
            );
        };
        let layer = OpenTelemetryTracingBridge::new(provider_ref);
        (Some(layer), None)
    }
515
516    fn warn_if_global_otlp_endpoint_only(otlp: &OtlpEnvConfig) {
517        if !otlp.has_global_endpoint() {
518            return;
519        }
520        let signal_overrides = otlp.explicit_signal_endpoint_count();
521        if signal_overrides < 3 {
522            tracing::debug!(
523                signal_overrides,
524                "using OTEL_EXPORTER_OTLP_ENDPOINT fallback for signals without explicit OTEL_EXPORTER_OTLP_{{TRACES|METRICS|LOGS}}_ENDPOINT"
525            );
526        }
527    }
528
    #[cfg(test)]
    mod tests {
        use std::collections::HashMap;
        use std::sync::{Mutex, OnceLock};

        use opentelemetry_sdk::Resource;
        use rrq_config::OtlpSignalConfig;

        use super::*;

        // Serializes tests that touch the module's process-wide OnceLock state.
        static TEST_MUTEX: OnceLock<Mutex<()>> = OnceLock::new();

        fn test_lock() -> std::sync::MutexGuard<'static, ()> {
            TEST_MUTEX
                .get_or_init(|| Mutex::new(()))
                .lock()
                .expect("test mutex poisoned")
        }

        /// Regression test: metric helpers called before init_metrics_provider()
        /// must not prematurely cache no-op instruments or the runner label.
        #[tokio::test]
        async fn early_metric_emission_does_not_prebind_metrics_cache() {
            let _guard = test_lock();
            // Preconditions: the module-level caches must be untouched.
            assert!(
                METER_PROVIDER.get().is_none(),
                "test requires uninitialized meter provider"
            );
            assert!(
                RUNNER_METRICS.get().is_none(),
                "test requires empty runner metrics cache"
            );
            assert!(
                RUNNER_LABEL.get().is_none(),
                "test requires empty runner label cache"
            );

            // Regression: ensure helper reads do not lock the configured runner label before
            // init_metrics_provider() gets a chance to set it.
            let _pre_init = runner_label();
            assert!(
                RUNNER_LABEL.get().is_none(),
                "runner_label must not lock runner label before telemetry init"
            );

            // Emit before metrics provider init to model startup ordering where
            // runtime counters may fire before init_tracing().
            record_deadline_expired();
            assert!(
                RUNNER_METRICS.get().is_none(),
                "early metric emission must not cache no-op instruments"
            );
            assert!(
                METRICS_ENDPOINT_CONFIGURED.get().is_some(),
                "metrics endpoint configuration should be cached after first emission"
            );
            assert!(
                RUNNER_LABEL.get().is_none(),
                "early metric emission must not lock runner label before telemetry init"
            );

            // Now perform real initialization with an explicit metrics endpoint.
            let otlp = OtlpEnvConfig {
                metrics: OtlpSignalConfig {
                    endpoint: Some("http://127.0.0.1:4318/v1/metrics".to_string()),
                    headers: HashMap::new(),
                    has_explicit_endpoint_env: true,
                },
                ..OtlpEnvConfig::default()
            };
            let resource = Resource::builder()
                .with_service_name("rrq-runner-test".to_string())
                .build();

            init_metrics_provider("rrq-runner-test", resource, &otlp)
                .expect("metrics provider should initialize");

            // Postconditions: init populated all three caches.
            assert!(
                METER_PROVIDER.get().is_some(),
                "metrics provider should be cached after init"
            );
            assert!(
                RUNNER_METRICS.get().is_some(),
                "runner metrics cache should initialize after provider setup"
            );
            assert_eq!(
                RUNNER_LABEL.get().map(String::as_str),
                Some("rrq-runner-test"),
                "metrics init should set runner label to configured service name"
            );
        }
    }
618}