cala_tracing/
lib.rs

1#![cfg_attr(feature = "fail-on-warnings", deny(warnings))]
2#![cfg_attr(feature = "fail-on-warnings", deny(clippy::all))]
3
4use opentelemetry::{trace::TracerProvider, KeyValue};
5use opentelemetry_sdk::{
6    resource::{EnvResourceDetector, SdkProvidedResourceDetector},
7    trace::Config,
8    Resource,
9};
10use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_NAMESPACE};
11use serde::{Deserialize, Serialize};
12use tracing_subscriber::{filter::EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
13
14pub use tracing::*;
15
16use std::time::Duration;
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct TracingConfig {
20    service_name: String,
21}
22
23impl Default for TracingConfig {
24    fn default() -> Self {
25        Self {
26            service_name: "cala-dev".to_string(),
27        }
28    }
29}
30
31pub fn init_tracer(config: TracingConfig) -> anyhow::Result<()> {
32    let provider = opentelemetry_otlp::new_pipeline()
33        .tracing()
34        .with_exporter(opentelemetry_otlp::new_exporter().tonic())
35        .with_trace_config(Config::default().with_resource(telemetry_resource(&config)))
36        .install_batch(opentelemetry_sdk::runtime::Tokio)?;
37    let telemetry =
38        tracing_opentelemetry::layer().with_tracer(provider.tracer(config.service_name));
39
40    let fmt_layer = fmt::layer().json();
41    let filter_layer = EnvFilter::try_from_default_env()
42        .or_else(|_| EnvFilter::try_new("info,otel::tracing=trace,sqlx=warn,sqlx_ledger=info"))
43        .unwrap();
44    tracing_subscriber::registry()
45        .with(filter_layer)
46        .with(fmt_layer)
47        .with(telemetry)
48        .init();
49
50    Ok(())
51}
52
53fn telemetry_resource(config: &TracingConfig) -> Resource {
54    Resource::from_detectors(
55        Duration::from_secs(3),
56        vec![
57            Box::new(EnvResourceDetector::new()),
58            Box::new(SdkProvidedResourceDetector),
59        ],
60    )
61    .merge(&Resource::new(vec![
62        KeyValue::new(SERVICE_NAME, config.service_name.clone()),
63        KeyValue::new(SERVICE_NAMESPACE, "lava"),
64    ]))
65}
66
67pub fn insert_error_fields(level: tracing::Level, error: impl std::fmt::Display) {
68    Span::current().record("error", tracing::field::display("true"));
69    Span::current().record("error.level", tracing::field::display(level));
70    Span::current().record("error.message", tracing::field::display(error));
71}
72
/// Trace-context extraction for HTTP requests (enabled by the `http` feature).
#[cfg(feature = "http")]
pub mod http {
    use opentelemetry::propagation::text_map_propagator::TextMapPropagator;
    use opentelemetry_http::HeaderExtractor;
    use opentelemetry_sdk::propagation::TraceContextPropagator;
    use tracing_opentelemetry::OpenTelemetrySpanExt;

    /// Extracts a trace context from `headers` with a
    /// `TraceContextPropagator` and sets it as the parent of the current
    /// `tracing` span.
    pub fn extract_tracing(headers: &axum_extra::headers::HeaderMap) {
        let parent_cx = TraceContextPropagator::new().extract(&HeaderExtractor(headers));
        tracing::Span::current().set_parent(parent_cx)
    }
}
86
/// Trace-context extraction for gRPC requests (enabled by the `grpc` feature).
#[cfg(feature = "grpc")]
pub mod grpc {
    use opentelemetry::propagation::{Extractor, TextMapPropagator};
    use opentelemetry_sdk::propagation::TraceContextPropagator;
    use tracing_opentelemetry::OpenTelemetrySpanExt;

    /// Extracts a trace context from the metadata of `request` with a
    /// `TraceContextPropagator` and sets it as the parent of the current
    /// `tracing` span.
    pub fn extract_tracing<T>(request: &tonic::Request<T>) {
        let carrier = RequestContextExtractor(request);
        let parent_cx = TraceContextPropagator::new().extract(&carrier);
        tracing::Span::current().set_parent(parent_cx)
    }

    /// Adapter exposing a request's metadata through the propagation
    /// [`Extractor`] interface.
    struct RequestContextExtractor<'a, T>(&'a tonic::Request<T>);

    impl<T> Extractor for RequestContextExtractor<'_, T> {
        /// Returns the metadata value stored under `key`, or `None` when the
        /// key is absent or `to_str` fails on the value.
        fn get(&self, key: &str) -> Option<&str> {
            let value = self.0.metadata().get(key)?;
            value.to_str().ok()
        }

        /// Lists the names of all ASCII metadata keys; other key kinds are
        /// skipped.
        fn keys(&self) -> Vec<&str> {
            let metadata = self.0.metadata();
            metadata
                .keys()
                .filter_map(|key_ref| match key_ref {
                    tonic::metadata::KeyRef::Ascii(name) => Some(name.as_str()),
                    _ => None,
                })
                .collect()
        }
    }
}