1#![cfg_attr(feature = "fail-on-warnings", deny(warnings))]
2#![cfg_attr(feature = "fail-on-warnings", deny(clippy::all))]
3
4use opentelemetry::{global, trace::TracerProvider, KeyValue};
5use opentelemetry_otlp::WithExportConfig;
6use opentelemetry_sdk::{
7 propagation::TraceContextPropagator,
8 trace::{Sampler, SdkTracerProvider},
9 Resource,
10};
11use opentelemetry_semantic_conventions::resource::SERVICE_NAMESPACE;
12use serde::{Deserialize, Serialize};
13use tracing_subscriber::{filter::EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
14
15pub use tracing::*;
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct TracingConfig {
19 service_name: String,
20}
21
22impl Default for TracingConfig {
23 fn default() -> Self {
24 Self {
25 service_name: "cala-dev".to_string(),
26 }
27 }
28}
29
30pub fn init_tracer(config: TracingConfig) -> anyhow::Result<()> {
31 global::set_text_map_propagator(TraceContextPropagator::new());
32
33 let endpoint = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT")
34 .unwrap_or_else(|_| "http://localhost:4317".to_string());
35
36 let exporter = opentelemetry_otlp::SpanExporter::builder()
37 .with_tonic()
38 .with_endpoint(endpoint)
39 .build()?;
40
41 let provider = SdkTracerProvider::builder()
42 .with_resource(telemetry_resource(&config))
43 .with_batch_exporter(exporter)
44 .with_sampler(Sampler::AlwaysOn)
45 .build();
46
47 global::set_tracer_provider(provider.clone());
48 let tracer = provider.tracer("cala-tracer");
49 let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
50
51 let fmt_layer = fmt::layer().json();
52 let filter_layer = EnvFilter::try_from_default_env()
53 .or_else(|_| EnvFilter::try_new("info,otel::tracing=trace,sqlx=warn"))
54 .unwrap();
55 tracing_subscriber::registry()
56 .with(filter_layer)
57 .with(fmt_layer)
58 .with(telemetry)
59 .init();
60
61 Ok(())
62}
63
64fn telemetry_resource(config: &TracingConfig) -> Resource {
65 Resource::builder()
66 .with_service_name(config.service_name.clone())
67 .with_attributes([KeyValue::new(SERVICE_NAMESPACE, "cala")])
68 .build()
69}
70
71pub fn insert_error_fields(level: tracing::Level, error: impl std::fmt::Display) {
72 Span::current().record("error", tracing::field::display("true"));
73 Span::current().record("error.level", tracing::field::display(level));
74 Span::current().record("error.message", tracing::field::display(error));
75}
76
/// Trace-context extraction for HTTP requests (enabled by the `http` feature).
#[cfg(feature = "http")]
pub mod http {
    use opentelemetry::propagation::text_map_propagator::TextMapPropagator;
    use opentelemetry_http::HeaderExtractor;
    use opentelemetry_sdk::propagation::TraceContextPropagator;
    use tracing_opentelemetry::OpenTelemetrySpanExt;

    /// Extracts a trace context from `headers` and sets it as the parent of
    /// the current span.
    ///
    /// Whatever `set_parent` returns is deliberately discarded.
    pub fn extract_tracing(headers: &axum_extra::headers::HeaderMap) {
        let parent_cx = TraceContextPropagator::new().extract(&HeaderExtractor(headers));
        let _ = tracing::Span::current().set_parent(parent_cx);
    }
}
90
/// Trace-context extraction for gRPC requests (enabled by the `grpc` feature).
#[cfg(feature = "grpc")]
pub mod grpc {
    use opentelemetry::propagation::{Extractor, TextMapPropagator};
    use opentelemetry_sdk::propagation::TraceContextPropagator;
    use tracing_opentelemetry::OpenTelemetrySpanExt;

    /// Extracts a trace context from the metadata of `request` and sets it as
    /// the parent of the current span.
    ///
    /// Whatever `set_parent` returns is deliberately discarded.
    pub fn extract_tracing<T>(request: &tonic::Request<T>) {
        let carrier = RequestContextExtractor(request);
        let parent_cx = TraceContextPropagator::new().extract(&carrier);
        let _ = tracing::Span::current().set_parent(parent_cx);
    }

    /// Adapter exposing a request's metadata through the [`Extractor`] trait.
    struct RequestContextExtractor<'a, T>(&'a tonic::Request<T>);

    impl<T> Extractor for RequestContextExtractor<'_, T> {
        /// Looks up `key` in the request metadata, yielding `None` when the
        /// entry is missing or its value cannot be viewed as a string.
        fn get(&self, key: &str) -> Option<&str> {
            let value = self.0.metadata().get(key)?;
            value.to_str().ok()
        }

        /// Lists the names of all ASCII metadata keys; other key kinds are
        /// skipped.
        fn keys(&self) -> Vec<&str> {
            let mut names = Vec::new();
            for key_ref in self.0.metadata().keys() {
                match key_ref {
                    tonic::metadata::KeyRef::Ascii(key) => names.push(key.as_str()),
                    _ => {}
                }
            }
            names
        }
    }
}