aster/tracing/
otlp_layer.rs1use opentelemetry::trace::TracerProvider;
2use opentelemetry::{global, KeyValue};
3use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
4use opentelemetry_otlp::WithExportConfig;
5use opentelemetry_sdk::logs::{Logger, LoggerProvider};
6use opentelemetry_sdk::trace::{self, RandomIdGenerator, Sampler};
7use opentelemetry_sdk::{runtime, Resource};
8use std::time::Duration;
9use tracing::{Level, Metadata};
10use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
11use tracing_subscriber::filter::FilterFn;
12
13pub type OtlpTracingLayer =
14 OpenTelemetryLayer<tracing_subscriber::Registry, opentelemetry_sdk::trace::Tracer>;
15pub type OtlpMetricsLayer = MetricsLayer<tracing_subscriber::Registry>;
16pub type OtlpLogsLayer = OpenTelemetryTracingBridge<LoggerProvider, Logger>;
17pub type OtlpLayers = (OtlpTracingLayer, OtlpMetricsLayer, OtlpLogsLayer);
18pub type OtlpResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;
19
20#[derive(Debug, Clone)]
21pub struct OtlpConfig {
22 pub endpoint: String,
23 pub timeout: Duration,
24}
25
26impl Default for OtlpConfig {
27 fn default() -> Self {
28 Self {
29 endpoint: "http://localhost:4318".to_string(),
30 timeout: Duration::from_secs(10),
31 }
32 }
33}
34
35impl OtlpConfig {
36 pub fn from_config() -> Option<Self> {
37 let config = crate::config::Config::global();
38
39 let endpoint = config
41 .get_param::<String>("otel_exporter_otlp_endpoint")
42 .ok()?;
43
44 let mut otlp_config = Self {
45 endpoint,
46 timeout: Duration::from_secs(10),
47 };
48
49 if let Ok(timeout_ms) = config.get_param::<u64>("otel_exporter_otlp_timeout") {
51 otlp_config.timeout = Duration::from_millis(timeout_ms);
52 }
53
54 Some(otlp_config)
55 }
56}
57
58pub fn init_otlp_tracing(config: &OtlpConfig) -> OtlpResult<()> {
59 let resource = Resource::new(vec![
60 KeyValue::new("service.name", "aster"),
61 KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
62 KeyValue::new("service.namespace", "aster"),
63 ]);
64
65 let exporter = opentelemetry_otlp::SpanExporter::builder()
66 .with_http()
67 .with_endpoint(&config.endpoint)
68 .with_timeout(config.timeout)
69 .build()?;
70
71 let tracer_provider = trace::TracerProvider::builder()
72 .with_batch_exporter(exporter, runtime::Tokio)
73 .with_resource(resource.clone())
74 .with_id_generator(RandomIdGenerator::default())
75 .with_sampler(Sampler::AlwaysOn)
76 .build();
77
78 global::set_tracer_provider(tracer_provider);
79
80 Ok(())
81}
82
83pub fn init_otlp_metrics(config: &OtlpConfig) -> OtlpResult<()> {
84 let resource = Resource::new(vec![
85 KeyValue::new("service.name", "aster"),
86 KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
87 KeyValue::new("service.namespace", "aster"),
88 ]);
89
90 let exporter = opentelemetry_otlp::MetricExporter::builder()
91 .with_http()
92 .with_endpoint(&config.endpoint)
93 .with_timeout(config.timeout)
94 .build()?;
95
96 let meter_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
97 .with_resource(resource)
98 .with_reader(
99 opentelemetry_sdk::metrics::PeriodicReader::builder(exporter, runtime::Tokio)
100 .with_interval(Duration::from_secs(3))
101 .build(),
102 )
103 .build();
104
105 global::set_meter_provider(meter_provider);
106
107 Ok(())
108}
109
110pub fn create_otlp_tracing_layer() -> OtlpResult<OtlpTracingLayer> {
111 let config = OtlpConfig::from_config().ok_or("OTEL_EXPORTER_OTLP_ENDPOINT not configured")?;
112
113 let resource = Resource::new(vec![
114 KeyValue::new("service.name", "aster"),
115 KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
116 KeyValue::new("service.namespace", "aster"),
117 ]);
118
119 let exporter = opentelemetry_otlp::SpanExporter::builder()
120 .with_http()
121 .with_endpoint(&config.endpoint)
122 .with_timeout(config.timeout)
123 .build()?;
124
125 let tracer_provider = trace::TracerProvider::builder()
126 .with_batch_exporter(exporter, runtime::Tokio)
127 .with_max_events_per_span(2048)
128 .with_max_attributes_per_span(512)
129 .with_max_links_per_span(512)
130 .with_resource(resource)
131 .with_id_generator(RandomIdGenerator::default())
132 .with_sampler(Sampler::TraceIdRatioBased(0.1))
133 .build();
134
135 let tracer = tracer_provider.tracer("aster");
136 Ok(tracing_opentelemetry::layer().with_tracer(tracer))
137}
138
139pub fn create_otlp_metrics_layer() -> OtlpResult<OtlpMetricsLayer> {
140 let config = OtlpConfig::from_config().ok_or("OTEL_EXPORTER_OTLP_ENDPOINT not configured")?;
141
142 let resource = Resource::new(vec![
143 KeyValue::new("service.name", "aster"),
144 KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
145 KeyValue::new("service.namespace", "aster"),
146 ]);
147
148 let exporter = opentelemetry_otlp::MetricExporter::builder()
149 .with_http()
150 .with_endpoint(&config.endpoint)
151 .with_timeout(config.timeout)
152 .build()?;
153
154 let meter_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
155 .with_resource(resource)
156 .with_reader(
157 opentelemetry_sdk::metrics::PeriodicReader::builder(exporter, runtime::Tokio)
158 .with_interval(Duration::from_millis(2000))
159 .build(),
160 )
161 .build();
162
163 global::set_meter_provider(meter_provider.clone());
164
165 Ok(tracing_opentelemetry::MetricsLayer::new(meter_provider))
166}
167
168pub fn create_otlp_logs_layer() -> OtlpResult<OtlpLogsLayer> {
169 let config = OtlpConfig::from_config().ok_or("OTEL_EXPORTER_OTLP_ENDPOINT not configured")?;
170
171 let resource = Resource::new(vec![
172 KeyValue::new("service.name", "aster"),
173 KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
174 KeyValue::new("service.namespace", "aster"),
175 ]);
176
177 let exporter = opentelemetry_otlp::LogExporter::builder()
178 .with_http()
179 .with_endpoint(&config.endpoint)
180 .with_timeout(config.timeout)
181 .build()?;
182
183 let logger_provider = LoggerProvider::builder()
184 .with_batch_exporter(exporter, runtime::Tokio)
185 .with_resource(resource)
186 .build();
187
188 Ok(OpenTelemetryTracingBridge::new(&logger_provider))
189}
190
191pub fn init_otlp() -> OtlpResult<OtlpLayers> {
192 let tracing_layer = create_otlp_tracing_layer()?;
193 let metrics_layer = create_otlp_metrics_layer()?;
194 let logs_layer = create_otlp_logs_layer()?;
195 Ok((tracing_layer, metrics_layer, logs_layer))
196}
197
198pub fn init_otlp_tracing_only() -> OtlpResult<OtlpTracingLayer> {
199 create_otlp_tracing_layer()
200}
201
202pub fn create_otlp_tracing_filter() -> FilterFn<impl Fn(&Metadata<'_>) -> bool> {
207 FilterFn::new(|metadata: &Metadata<'_>| {
208 if metadata.level() <= &Level::INFO {
209 return true;
210 }
211
212 if metadata.level() == &Level::DEBUG {
213 let target = metadata.target();
214 if target.starts_with("aster::")
215 || target.starts_with("opentelemetry")
216 || target.starts_with("tracing_opentelemetry")
217 {
218 return true;
219 }
220 }
221
222 false
223 })
224}
225
226pub fn create_otlp_metrics_filter() -> FilterFn<impl Fn(&Metadata<'_>) -> bool> {
231 FilterFn::new(|metadata: &Metadata<'_>| {
232 if metadata.level() <= &Level::INFO {
233 return true;
234 }
235
236 if metadata.level() == &Level::DEBUG {
237 let target = metadata.target();
238 if target.starts_with("aster::telemetry")
239 || target.starts_with("aster::metrics")
240 || target.contains("metric")
241 {
242 return true;
243 }
244 }
245
246 false
247 })
248}
249
250pub fn create_otlp_logs_filter() -> FilterFn<impl Fn(&Metadata<'_>) -> bool> {
253 FilterFn::new(|metadata: &Metadata<'_>| {
254 if metadata.level() <= &Level::WARN {
255 return true;
256 }
257
258 false
259 })
260}
261
262pub fn shutdown_otlp() {
264 global::shutdown_tracer_provider();
266
267 std::thread::sleep(std::time::Duration::from_millis(500));
271}
272
273#[cfg(test)]
274mod tests {
275 use super::*;
276 use std::env;
277
278 #[test]
279 fn test_otlp_config_default() {
280 let config = OtlpConfig::default();
281 assert_eq!(config.endpoint, "http://localhost:4318");
282 assert_eq!(config.timeout, Duration::from_secs(10));
283 }
284
285 #[test]
286 fn test_otlp_config_from_config() {
287 use tempfile::NamedTempFile;
288
289 let original_endpoint = env::var("OTEL_EXPORTER_OTLP_ENDPOINT").ok();
291 let original_timeout = env::var("OTEL_EXPORTER_OTLP_TIMEOUT").ok();
292
293 env::remove_var("OTEL_EXPORTER_OTLP_ENDPOINT");
295 env::remove_var("OTEL_EXPORTER_OTLP_TIMEOUT");
296
297 let temp_file = NamedTempFile::new().unwrap();
299 let test_config = crate::config::Config::new(temp_file.path(), "test-otlp").unwrap();
300
301 test_config
303 .set_param("otel_exporter_otlp_endpoint", "http://config:4318")
304 .unwrap();
305 test_config
306 .set_param("otel_exporter_otlp_timeout", 3000)
307 .unwrap();
308
309 let endpoint: String = test_config
313 .get_param("otel_exporter_otlp_endpoint")
314 .unwrap();
315 assert_eq!(endpoint, "http://config:4318");
316
317 let timeout: u64 = test_config.get_param("otel_exporter_otlp_timeout").unwrap();
318 assert_eq!(timeout, 3000);
319
320 env::set_var("OTEL_EXPORTER_OTLP_ENDPOINT", "http://env:4317");
322 let endpoint: String = test_config
323 .get_param("otel_exporter_otlp_endpoint")
324 .unwrap();
325 assert_eq!(endpoint, "http://env:4317");
326
327 match original_endpoint {
329 Some(val) => env::set_var("OTEL_EXPORTER_OTLP_ENDPOINT", val),
330 None => env::remove_var("OTEL_EXPORTER_OTLP_ENDPOINT"),
331 }
332 match original_timeout {
333 Some(val) => env::set_var("OTEL_EXPORTER_OTLP_TIMEOUT", val),
334 None => env::remove_var("OTEL_EXPORTER_OTLP_TIMEOUT"),
335 }
336 }
337}