Skip to main content

aster/tracing/
otlp_layer.rs

1use opentelemetry::trace::TracerProvider;
2use opentelemetry::{global, KeyValue};
3use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
4use opentelemetry_otlp::WithExportConfig;
5use opentelemetry_sdk::logs::{Logger, LoggerProvider};
6use opentelemetry_sdk::trace::{self, RandomIdGenerator, Sampler};
7use opentelemetry_sdk::{runtime, Resource};
8use std::time::Duration;
9use tracing::{Level, Metadata};
10use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
11use tracing_subscriber::filter::FilterFn;
12
13pub type OtlpTracingLayer =
14    OpenTelemetryLayer<tracing_subscriber::Registry, opentelemetry_sdk::trace::Tracer>;
15pub type OtlpMetricsLayer = MetricsLayer<tracing_subscriber::Registry>;
16pub type OtlpLogsLayer = OpenTelemetryTracingBridge<LoggerProvider, Logger>;
17pub type OtlpLayers = (OtlpTracingLayer, OtlpMetricsLayer, OtlpLogsLayer);
18pub type OtlpResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;
19
/// Connection settings shared by the OTLP exporters built in this module.
///
/// `PartialEq`/`Eq` are derived so whole configurations can be compared
/// directly (for example in tests) instead of field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OtlpConfig {
    /// Endpoint passed to each exporter builder's `with_endpoint`.
    pub endpoint: String,
    /// Timeout passed to each exporter builder's `with_timeout`.
    pub timeout: Duration,
}

impl Default for OtlpConfig {
    /// Returns a config targeting `http://localhost:4318` with a 10 second timeout.
    fn default() -> Self {
        Self {
            endpoint: String::from("http://localhost:4318"),
            timeout: Duration::from_secs(10),
        }
    }
}
34
35impl OtlpConfig {
36    pub fn from_config() -> Option<Self> {
37        let config = crate::config::Config::global();
38
39        // Try to get the endpoint from config (checks OTEL_EXPORTER_OTLP_ENDPOINT env var first)
40        let endpoint = config
41            .get_param::<String>("otel_exporter_otlp_endpoint")
42            .ok()?;
43
44        let mut otlp_config = Self {
45            endpoint,
46            timeout: Duration::from_secs(10),
47        };
48
49        // Try to get timeout from config (checks OTEL_EXPORTER_OTLP_TIMEOUT env var first)
50        if let Ok(timeout_ms) = config.get_param::<u64>("otel_exporter_otlp_timeout") {
51            otlp_config.timeout = Duration::from_millis(timeout_ms);
52        }
53
54        Some(otlp_config)
55    }
56}
57
58pub fn init_otlp_tracing(config: &OtlpConfig) -> OtlpResult<()> {
59    let resource = Resource::new(vec![
60        KeyValue::new("service.name", "aster"),
61        KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
62        KeyValue::new("service.namespace", "aster"),
63    ]);
64
65    let exporter = opentelemetry_otlp::SpanExporter::builder()
66        .with_http()
67        .with_endpoint(&config.endpoint)
68        .with_timeout(config.timeout)
69        .build()?;
70
71    let tracer_provider = trace::TracerProvider::builder()
72        .with_batch_exporter(exporter, runtime::Tokio)
73        .with_resource(resource.clone())
74        .with_id_generator(RandomIdGenerator::default())
75        .with_sampler(Sampler::AlwaysOn)
76        .build();
77
78    global::set_tracer_provider(tracer_provider);
79
80    Ok(())
81}
82
83pub fn init_otlp_metrics(config: &OtlpConfig) -> OtlpResult<()> {
84    let resource = Resource::new(vec![
85        KeyValue::new("service.name", "aster"),
86        KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
87        KeyValue::new("service.namespace", "aster"),
88    ]);
89
90    let exporter = opentelemetry_otlp::MetricExporter::builder()
91        .with_http()
92        .with_endpoint(&config.endpoint)
93        .with_timeout(config.timeout)
94        .build()?;
95
96    let meter_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
97        .with_resource(resource)
98        .with_reader(
99            opentelemetry_sdk::metrics::PeriodicReader::builder(exporter, runtime::Tokio)
100                .with_interval(Duration::from_secs(3))
101                .build(),
102        )
103        .build();
104
105    global::set_meter_provider(meter_provider);
106
107    Ok(())
108}
109
110pub fn create_otlp_tracing_layer() -> OtlpResult<OtlpTracingLayer> {
111    let config = OtlpConfig::from_config().ok_or("OTEL_EXPORTER_OTLP_ENDPOINT not configured")?;
112
113    let resource = Resource::new(vec![
114        KeyValue::new("service.name", "aster"),
115        KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
116        KeyValue::new("service.namespace", "aster"),
117    ]);
118
119    let exporter = opentelemetry_otlp::SpanExporter::builder()
120        .with_http()
121        .with_endpoint(&config.endpoint)
122        .with_timeout(config.timeout)
123        .build()?;
124
125    let tracer_provider = trace::TracerProvider::builder()
126        .with_batch_exporter(exporter, runtime::Tokio)
127        .with_max_events_per_span(2048)
128        .with_max_attributes_per_span(512)
129        .with_max_links_per_span(512)
130        .with_resource(resource)
131        .with_id_generator(RandomIdGenerator::default())
132        .with_sampler(Sampler::TraceIdRatioBased(0.1))
133        .build();
134
135    let tracer = tracer_provider.tracer("aster");
136    Ok(tracing_opentelemetry::layer().with_tracer(tracer))
137}
138
139pub fn create_otlp_metrics_layer() -> OtlpResult<OtlpMetricsLayer> {
140    let config = OtlpConfig::from_config().ok_or("OTEL_EXPORTER_OTLP_ENDPOINT not configured")?;
141
142    let resource = Resource::new(vec![
143        KeyValue::new("service.name", "aster"),
144        KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
145        KeyValue::new("service.namespace", "aster"),
146    ]);
147
148    let exporter = opentelemetry_otlp::MetricExporter::builder()
149        .with_http()
150        .with_endpoint(&config.endpoint)
151        .with_timeout(config.timeout)
152        .build()?;
153
154    let meter_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
155        .with_resource(resource)
156        .with_reader(
157            opentelemetry_sdk::metrics::PeriodicReader::builder(exporter, runtime::Tokio)
158                .with_interval(Duration::from_millis(2000))
159                .build(),
160        )
161        .build();
162
163    global::set_meter_provider(meter_provider.clone());
164
165    Ok(tracing_opentelemetry::MetricsLayer::new(meter_provider))
166}
167
168pub fn create_otlp_logs_layer() -> OtlpResult<OtlpLogsLayer> {
169    let config = OtlpConfig::from_config().ok_or("OTEL_EXPORTER_OTLP_ENDPOINT not configured")?;
170
171    let resource = Resource::new(vec![
172        KeyValue::new("service.name", "aster"),
173        KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
174        KeyValue::new("service.namespace", "aster"),
175    ]);
176
177    let exporter = opentelemetry_otlp::LogExporter::builder()
178        .with_http()
179        .with_endpoint(&config.endpoint)
180        .with_timeout(config.timeout)
181        .build()?;
182
183    let logger_provider = LoggerProvider::builder()
184        .with_batch_exporter(exporter, runtime::Tokio)
185        .with_resource(resource)
186        .build();
187
188    Ok(OpenTelemetryTracingBridge::new(&logger_provider))
189}
190
191pub fn init_otlp() -> OtlpResult<OtlpLayers> {
192    let tracing_layer = create_otlp_tracing_layer()?;
193    let metrics_layer = create_otlp_metrics_layer()?;
194    let logs_layer = create_otlp_logs_layer()?;
195    Ok((tracing_layer, metrics_layer, logs_layer))
196}
197
198pub fn init_otlp_tracing_only() -> OtlpResult<OtlpTracingLayer> {
199    create_otlp_tracing_layer()
200}
201
202/// Creates a custom filter for OTLP tracing that captures:
203/// - All spans at INFO level and above
204/// - Specific spans marked with "otel.trace" field
205/// - Events from specific modules related to telemetry
206pub fn create_otlp_tracing_filter() -> FilterFn<impl Fn(&Metadata<'_>) -> bool> {
207    FilterFn::new(|metadata: &Metadata<'_>| {
208        if metadata.level() <= &Level::INFO {
209            return true;
210        }
211
212        if metadata.level() == &Level::DEBUG {
213            let target = metadata.target();
214            if target.starts_with("aster::")
215                || target.starts_with("opentelemetry")
216                || target.starts_with("tracing_opentelemetry")
217            {
218                return true;
219            }
220        }
221
222        false
223    })
224}
225
226/// Creates a custom filter for OTLP metrics that captures:
227/// - All events at INFO level and above
228/// - Specific events marked with "otel.metric" field
229/// - Events that should be converted to metrics
230pub fn create_otlp_metrics_filter() -> FilterFn<impl Fn(&Metadata<'_>) -> bool> {
231    FilterFn::new(|metadata: &Metadata<'_>| {
232        if metadata.level() <= &Level::INFO {
233            return true;
234        }
235
236        if metadata.level() == &Level::DEBUG {
237            let target = metadata.target();
238            if target.starts_with("aster::telemetry")
239                || target.starts_with("aster::metrics")
240                || target.contains("metric")
241            {
242                return true;
243            }
244        }
245
246        false
247    })
248}
249
250/// Creates a custom filter for OTLP metrics that captures:
251/// - All events at WARN level and above
252pub fn create_otlp_logs_filter() -> FilterFn<impl Fn(&Metadata<'_>) -> bool> {
253    FilterFn::new(|metadata: &Metadata<'_>| {
254        if metadata.level() <= &Level::WARN {
255            return true;
256        }
257
258        false
259    })
260}
261
262/// Shutdown OTLP providers gracefully
263pub fn shutdown_otlp() {
264    // Shutdown the tracer provider and flush any pending spans
265    global::shutdown_tracer_provider();
266
267    // Force flush of metrics by waiting a bit
268    // The meter provider doesn't have a direct shutdown method in the current SDK,
269    // but we can give it time to export any pending metrics
270    std::thread::sleep(std::time::Duration::from_millis(500));
271}
272
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    /// Clears an environment variable for the lifetime of the guard and puts
    /// the previous value back on drop.
    ///
    /// Restoring in `Drop` means the environment is also repaired when an
    /// assertion panics part-way through a test, so a failure here cannot leak
    /// a modified environment into other tests.
    struct EnvVarGuard {
        key: &'static str,
        original: Option<String>,
    }

    impl EnvVarGuard {
        /// Remembers the current value of `key` (if any) and removes the variable.
        fn unset(key: &'static str) -> Self {
            let original = env::var(key).ok();
            env::remove_var(key);
            Self { key, original }
        }
    }

    impl Drop for EnvVarGuard {
        fn drop(&mut self) {
            match self.original.take() {
                Some(value) => env::set_var(self.key, value),
                None => env::remove_var(self.key),
            }
        }
    }

    /// The default config targets the local OTLP/HTTP port with a 10 s timeout.
    #[test]
    fn test_otlp_config_default() {
        let config = OtlpConfig::default();
        assert_eq!(config.endpoint, "http://localhost:4318");
        assert_eq!(config.timeout, Duration::from_secs(10));
    }

    /// Checks that the config keys used by `OtlpConfig::from_config` round-trip
    /// through a config file and that the endpoint env var overrides the file.
    ///
    /// The environment is process-wide state, so this test can still interfere
    /// with other tests that read the same variables concurrently.
    #[test]
    fn test_otlp_config_from_config() {
        use tempfile::NamedTempFile;

        // Clear env vars to ensure we're testing the config file; the guards
        // restore the original values when they go out of scope.
        let _endpoint_guard = EnvVarGuard::unset("OTEL_EXPORTER_OTLP_ENDPOINT");
        let _timeout_guard = EnvVarGuard::unset("OTEL_EXPORTER_OTLP_TIMEOUT");

        // Create a test config file
        let temp_file = NamedTempFile::new().unwrap();
        let test_config = crate::config::Config::new(temp_file.path(), "test-otlp").unwrap();

        // Set values in config
        test_config
            .set_param("otel_exporter_otlp_endpoint", "http://config:4318")
            .unwrap();
        test_config
            .set_param("otel_exporter_otlp_timeout", 3000)
            .unwrap();

        // `from_config()` itself reads `Config::global()`, which this test
        // cannot swap out, so the keys are exercised on a local config instead.
        let endpoint: String = test_config
            .get_param("otel_exporter_otlp_endpoint")
            .unwrap();
        assert_eq!(endpoint, "http://config:4318");

        let timeout: u64 = test_config.get_param("otel_exporter_otlp_timeout").unwrap();
        assert_eq!(timeout, 3000);

        // Test env var override still works
        env::set_var("OTEL_EXPORTER_OTLP_ENDPOINT", "http://env:4317");
        let endpoint: String = test_config
            .get_param("otel_exporter_otlp_endpoint")
            .unwrap();
        assert_eq!(endpoint, "http://env:4317");
    }
}
337}