1#![cfg_attr(feature = "fail-on-warnings", deny(warnings))]
2#![cfg_attr(feature = "fail-on-warnings", deny(clippy::all))]
3
4use opentelemetry::{trace::TracerProvider, KeyValue};
5use opentelemetry_sdk::{
6 resource::{EnvResourceDetector, SdkProvidedResourceDetector},
7 trace::Config,
8 Resource,
9};
10use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_NAMESPACE};
11use serde::{Deserialize, Serialize};
12use tracing_subscriber::{filter::EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
13
14pub use tracing::*;
15
16use std::time::Duration;
17
/// Settings consumed by [`init_tracer`].
///
/// Derives `Serialize`/`Deserialize` so it can be embedded in a serde-based
/// configuration structure.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TracingConfig {
    /// Name of the service; reported as the `SERVICE_NAME` resource attribute
    /// and also used as the name of the tracer obtained from the provider.
    service_name: String,
}
22
23impl Default for TracingConfig {
24 fn default() -> Self {
25 Self {
26 service_name: "cala-dev".to_string(),
27 }
28 }
29}
30
31pub fn init_tracer(config: TracingConfig) -> anyhow::Result<()> {
32 let provider = opentelemetry_otlp::new_pipeline()
33 .tracing()
34 .with_exporter(opentelemetry_otlp::new_exporter().tonic())
35 .with_trace_config(Config::default().with_resource(telemetry_resource(&config)))
36 .install_batch(opentelemetry_sdk::runtime::Tokio)?;
37 let telemetry =
38 tracing_opentelemetry::layer().with_tracer(provider.tracer(config.service_name));
39
40 let fmt_layer = fmt::layer().json();
41 let filter_layer = EnvFilter::try_from_default_env()
42 .or_else(|_| EnvFilter::try_new("info,otel::tracing=trace,sqlx=warn,sqlx_ledger=info"))
43 .unwrap();
44 tracing_subscriber::registry()
45 .with(filter_layer)
46 .with(fmt_layer)
47 .with(telemetry)
48 .init();
49
50 Ok(())
51}
52
53fn telemetry_resource(config: &TracingConfig) -> Resource {
54 Resource::from_detectors(
55 Duration::from_secs(3),
56 vec![
57 Box::new(EnvResourceDetector::new()),
58 Box::new(SdkProvidedResourceDetector),
59 ],
60 )
61 .merge(&Resource::new(vec![
62 KeyValue::new(SERVICE_NAME, config.service_name.clone()),
63 KeyValue::new(SERVICE_NAMESPACE, "lava"),
64 ]))
65}
66
67pub fn insert_error_fields(level: tracing::Level, error: impl std::fmt::Display) {
68 Span::current().record("error", tracing::field::display("true"));
69 Span::current().record("error.level", tracing::field::display(level));
70 Span::current().record("error.message", tracing::field::display(error));
71}
72
/// Trace-context propagation helpers for HTTP handlers.
#[cfg(feature = "http")]
pub mod http {
    use opentelemetry::propagation::text_map_propagator::TextMapPropagator;
    use opentelemetry_http::HeaderExtractor;
    use opentelemetry_sdk::propagation::TraceContextPropagator;
    use tracing_opentelemetry::OpenTelemetrySpanExt;

    /// Extracts a trace context from `headers` using the
    /// `TraceContextPropagator` and sets it as the parent of the current span.
    pub fn extract_tracing(headers: &axum_extra::headers::HeaderMap) {
        let parent_cx = TraceContextPropagator::new().extract(&HeaderExtractor(headers));
        tracing::Span::current().set_parent(parent_cx)
    }
}
86
/// Trace-context propagation helpers for tonic gRPC handlers.
#[cfg(feature = "grpc")]
pub mod grpc {
    use opentelemetry::propagation::{Extractor, TextMapPropagator};
    use opentelemetry_sdk::propagation::TraceContextPropagator;
    use tracing_opentelemetry::OpenTelemetrySpanExt;

    /// Extracts a trace context from the metadata of `request` using the
    /// `TraceContextPropagator` and sets it as the parent of the current span.
    pub fn extract_tracing<T>(request: &tonic::Request<T>) {
        let carrier = RequestContextExtractor(request);
        let parent_cx = TraceContextPropagator::new().extract(&carrier);
        tracing::Span::current().set_parent(parent_cx)
    }

    /// Adapter exposing a request's metadata through the `Extractor` trait.
    struct RequestContextExtractor<'a, T>(&'a tonic::Request<T>);

    impl<T> Extractor for RequestContextExtractor<'_, T> {
        /// Looks up `key` in the request metadata; yields `None` when the key
        /// is absent or its value cannot be converted with `to_str`.
        fn get(&self, key: &str) -> Option<&str> {
            let value = self.0.metadata().get(key)?;
            value.to_str().ok()
        }

        /// Lists the ASCII metadata keys; any other kind of key is skipped.
        fn keys(&self) -> Vec<&str> {
            self.0
                .metadata()
                .keys()
                .filter_map(|entry| match entry {
                    tonic::metadata::KeyRef::Ascii(name) => Some(name.as_str()),
                    _ => None,
                })
                .collect()
        }
    }
}