Skip to main content

endpoint_libs/libs/log/
otel.rs

1//! OpenTelemetry log and trace forwarding layer
2//!
3//! This module provides a separate tracing layer that forwards all log events and span lifecycle
4//! to an OpenTelemetry collector via OTLP. It operates independently from the existing stdout
5//! and file logging layers.
6//!
7//! # Configuration
8//! OTel can be configured via [`OtelConfig`] or through standard environment variables:
9//! - `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` - OTLP collector endpoint for traces
10//! - `OTEL_EXPORTER_OTLP_LOGS_ENDPOINT` - OTLP collector endpoint for logs
11//! - `OTEL_SERVICE_NAME` - Service name for traces
12//! - `OTEL_EXPORTER_OTLP_HEADERS` - Additional headers (e.g., auth tokens)
13//!
14//! # Graceful Degradation
15//! If the OTel layer fails to initialize (e.g., invalid endpoint, network issues), a warning
16//! is logged and the application continues with stdout/file logging only.
17
18use std::collections::HashMap;
19
20use opentelemetry::propagation::{TextMapCompositePropagator, TextMapPropagator};
21use opentelemetry_otlp::WithHttpConfig;
22use opentelemetry_sdk::{
23    Resource,
24    logs::SdkLoggerProvider,
25    propagation::{BaggagePropagator, TraceContextPropagator},
26    trace::{SdkTracerProvider, Tracer},
27};
28use opentelemetry_semantic_conventions::resource::SERVICE_VERSION;
29
30/// Configuration for OpenTelemetry integration
31#[derive(Debug, Clone)]
32pub struct OtelConfig {
33    /// Whether to enable OTel log/trace forwarding
34    pub enabled: bool,
35    /// Service name to identify this application
36    pub service_name: Option<String>,
37    /// OTLP collector endpoint (e.g., "http://localhost:4317")
38    pub endpoint: Option<String>,
39    /// Additional headers to include in OTLP requests (e.g., authentication)
40    pub headers: HashMap<String, String>,
41}
42
43impl Default for OtelConfig {
44    fn default() -> Self {
45        Self {
46            enabled: false,
47            service_name: None,
48            endpoint: None,
49            headers: HashMap::new(),
50        }
51    }
52}
53
54/// Guards for OpenTelemetry providers to ensure traces and logs are flushed on drop
55pub struct OtelGuards {
56    /// The tracer provider guard
57    pub tracer_provider: SdkTracerProvider,
58    /// The logger provider guard
59    pub logger_provider: SdkLoggerProvider,
60}
61
62impl Drop for OtelGuards {
63    fn drop(&mut self) {
64        tracing::debug!(target: "otel::setup", "OTel layer shutting down - flushing pending traces and logs");
65    }
66}
67
68/// Result of building the OTel layer
69pub struct OtelLayerResult {
70    /// The guards that must be kept alive to ensure traces/logs are flushed on drop
71    pub guards: Option<OtelGuards>,
72    /// The tracer for OpenTelemetryLayer
73    pub tracer: Option<Tracer>,
74}
75
76/// Build OpenTelemetry tracer and logger providers for forwarding logs and spans
77pub fn build_otel_layer(config: &OtelConfig) -> OtelLayerResult {
78    if !config.enabled {
79        tracing::debug!(target: "otel::setup", "OTel layer disabled by config");
80        return OtelLayerResult {
81            guards: None,
82            tracer: None,
83        };
84    }
85
86    match build_otel_layer_inner(config) {
87        Ok(result) => {
88            tracing::debug!(
89                target: "otel::setup",
90                service_name = result.service_name,
91                endpoint = ?config.endpoint,
92                "OTel layer initialized successfully (Traces + Logs)"
93            );
94            tracing::debug!(
95                target: "otel::setup",
96                endpoint = config.endpoint.as_deref().unwrap_or("SDK default"),
97                header_keys = ?config.headers.keys().collect::<Vec<_>>(),
98                "OTel exporter config"
99            );
100            OtelLayerResult {
101                guards: Some(OtelGuards {
102                    tracer_provider: result.tracer_provider,
103                    logger_provider: result.logger_provider,
104                }),
105                tracer: Some(result.tracer),
106            }
107        }
108        Err(e) => {
109            tracing::warn!(
110                target: "otel::setup",
111                error = %e,
112                "Failed to initialize OTel layer - continuing without OTel forwarding"
113            );
114            OtelLayerResult {
115                guards: None,
116                tracer: None,
117            }
118        }
119    }
120}
121
122struct OtelLayerBuild {
123    tracer_provider: SdkTracerProvider,
124    logger_provider: SdkLoggerProvider,
125    tracer: Tracer,
126    service_name: String,
127}
128
129fn build_otel_layer_inner(
130    config: &OtelConfig,
131) -> Result<OtelLayerBuild, Box<dyn std::error::Error + Send + Sync>> {
132    let service_name = config
133        .service_name
134        .clone()
135        .or_else(|| std::env::var("OTEL_SERVICE_NAME").ok())
136        .unwrap_or_else(|| env!("CARGO_PKG_NAME").to_string());
137
138    let resource = Resource::builder()
139        .with_service_name(service_name.clone())
140        .with_attribute(opentelemetry::KeyValue::new(
141            SERVICE_VERSION,
142            env!("CARGO_PKG_VERSION"),
143        ))
144        .build();
145
146    let tracer_provider = build_tracer_provider(&resource, config)?;
147    let logger_provider = build_logger_provider(&resource, config)?;
148
149    init_propagator();
150
151    let tracer =
152        opentelemetry::trace::TracerProvider::tracer(&tracer_provider, service_name.clone());
153
154    Ok(OtelLayerBuild {
155        tracer_provider,
156        logger_provider,
157        tracer,
158        service_name,
159    })
160}
161
162fn build_tracer_provider(
163    resource: &Resource,
164    config: &OtelConfig,
165) -> Result<SdkTracerProvider, Box<dyn std::error::Error + Send + Sync>> {
166    use opentelemetry_otlp::{SpanExporter, WithExportConfig};
167
168    let endpoint = config
169        .endpoint
170        .clone()
171        .or_else(|| std::env::var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT").ok());
172
173    tracing::debug!(
174        target: "otel::setup",
175        "Building traces exporter: endpoint={:?}, headers_count={}",
176        endpoint,
177        config.headers.len(),
178    );
179
180    let mut builder = SdkTracerProvider::builder().with_resource(resource.clone());
181
182    let mut exporter_builder = SpanExporter::builder().with_http();
183    if let Some(ref ep) = endpoint {
184        exporter_builder = exporter_builder.with_endpoint(ep);
185    }
186    if !config.headers.is_empty() {
187        exporter_builder = exporter_builder.with_headers(config.headers.clone());
188    }
189    builder = builder.with_batch_exporter(exporter_builder.build()?);
190
191    Ok(builder.build())
192}
193
194fn build_logger_provider(
195    resource: &Resource,
196    config: &OtelConfig,
197) -> Result<SdkLoggerProvider, Box<dyn std::error::Error + Send + Sync>> {
198    use opentelemetry_otlp::{LogExporter, WithExportConfig};
199
200    let endpoint = config
201        .endpoint
202        .clone()
203        .or_else(|| std::env::var("OTEL_EXPORTER_OTLP_LOGS_ENDPOINT").ok())
204        .or_else(|| config.endpoint.clone())
205        .or_else(|| std::env::var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT").ok());
206
207    tracing::debug!(
208        target: "otel::setup",
209        "Building logs exporter: endpoint={:?}, headers_count={}",
210        endpoint,
211        config.headers.len(),
212    );
213
214    let mut builder = SdkLoggerProvider::builder().with_resource(resource.clone());
215
216    let mut exporter_builder = LogExporter::builder().with_http();
217    if let Some(ref ep) = endpoint {
218        exporter_builder = exporter_builder.with_endpoint(ep);
219    }
220    if !config.headers.is_empty() {
221        exporter_builder = exporter_builder.with_headers(config.headers.clone());
222    }
223    builder = builder.with_batch_exporter(exporter_builder.build()?);
224
225    Ok(builder.build())
226}
227
228fn init_propagator() {
229    let value =
230        std::env::var("OTEL_PROPAGATORS").unwrap_or_else(|_| "tracecontext,baggage".to_string());
231    let mut propagators: Vec<(Box<dyn TextMapPropagator + Send + Sync>, String)> = Vec::new();
232
233    for name in value.split(',').map(|s| s.trim().to_lowercase()) {
234        match name.as_str() {
235            "tracecontext" => propagators.push((Box::new(TraceContextPropagator::new()), name)),
236            "baggage" => propagators.push((Box::new(BaggagePropagator::new()), name)),
237            _ => {}
238        }
239    }
240
241    if !propagators.is_empty() {
242        let (propagators_impl, _): (Vec<_>, Vec<_>) = propagators.into_iter().unzip();
243        opentelemetry::global::set_text_map_propagator(TextMapCompositePropagator::new(
244            propagators_impl,
245        ));
246    }
247}