forge_runtime/observability/
telemetry.rs1use opentelemetry::{KeyValue, global, trace::TracerProvider as _};
2use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
3use opentelemetry_otlp::{LogExporter, MetricExporter, SpanExporter, WithExportConfig};
4use opentelemetry_sdk::{
5 Resource,
6 logs::LoggerProvider,
7 metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
8 propagation::TraceContextPropagator,
9 resource::{EnvResourceDetector, SdkProvidedResourceDetector},
10 trace::{RandomIdGenerator, Sampler, TracerProvider},
11};
12use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};
13
/// Resource attribute key under which the configured deployment environment
/// (for example `development` or `production`) is reported.
const DEPLOYMENT_ENVIRONMENT_NAME: &str = "deployment.environment.name";
15use std::sync::OnceLock;
16use thiserror::Error;
17use tracing_opentelemetry::OpenTelemetryLayer;
18use tracing_subscriber::{
19 EnvFilter, Layer, Registry,
20 filter::{FilterExt, FilterFn},
21 layer::SubscriberExt,
22 util::SubscriberInitExt,
23};
24
25static TRACER_PROVIDER: OnceLock<TracerProvider> = OnceLock::new();
26static METER_PROVIDER: OnceLock<SdkMeterProvider> = OnceLock::new();
27static LOGGER_PROVIDER: OnceLock<LoggerProvider> = OnceLock::new();
28
29#[derive(Debug, Error)]
30pub enum TelemetryError {
31 #[error("failed to initialize tracer: {0}")]
32 TracerInit(String),
33 #[error("failed to initialize meter: {0}")]
34 MeterInit(String),
35 #[error("failed to initialize logger: {0}")]
36 LoggerInit(String),
37 #[error("telemetry already initialized")]
38 AlreadyInitialized,
39 #[error("tracing subscriber init failed: {0}")]
40 SubscriberInit(String),
41}
42
/// Settings that drive `init_telemetry`: where to export, how the service is
/// identified, and which signals are switched on.
#[derive(Debug, Clone)]
pub struct TelemetryConfig {
    /// OTLP collector endpoint handed to the HTTP exporters.
    pub otlp_endpoint: String,
    /// Value reported as the service name resource attribute.
    pub service_name: String,
    /// Value reported as the service version resource attribute.
    pub service_version: String,
    /// Value reported as the deployment environment resource attribute.
    pub environment: String,
    /// Export spans when `true`.
    pub enable_traces: bool,
    /// Export metrics when `true`.
    pub enable_metrics: bool,
    /// Export log records when `true`.
    pub enable_logs: bool,
    /// Fraction of traces to sample: `>= 1.0` samples everything, `<= 0.0`
    /// samples nothing, anything in between is ratio-based.
    pub sampling_ratio: f64,
}
54
55impl Default for TelemetryConfig {
56 fn default() -> Self {
57 Self {
58 otlp_endpoint: "http://localhost:4318".to_string(),
59 service_name: "forge-service".to_string(),
60 service_version: "0.1.0".to_string(),
61 environment: "development".to_string(),
62 enable_traces: true,
63 enable_metrics: true,
64 enable_logs: true,
65 sampling_ratio: 1.0,
66 }
67 }
68}
69
70impl TelemetryConfig {
71 pub fn new(service_name: impl Into<String>) -> Self {
72 Self {
73 service_name: service_name.into(),
74 ..Default::default()
75 }
76 }
77
78 pub fn from_observability_config(
80 obs: &forge_core::config::ObservabilityConfig,
81 project_name: &str,
82 project_version: &str,
83 ) -> Self {
84 let otlp_enabled = obs.enabled;
87 Self {
88 otlp_endpoint: obs.otlp_endpoint.clone(),
89 service_name: obs
90 .service_name
91 .clone()
92 .unwrap_or_else(|| project_name.to_string()),
93 service_version: project_version.to_string(),
94 environment: "production".to_string(),
95 enable_traces: otlp_enabled && obs.enable_traces,
96 enable_metrics: otlp_enabled && obs.enable_metrics,
97 enable_logs: otlp_enabled && obs.enable_logs,
98 sampling_ratio: obs.sampling_ratio,
99 }
100 }
101
102 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
103 self.otlp_endpoint = endpoint.into();
104 self
105 }
106
107 pub fn with_version(mut self, version: impl Into<String>) -> Self {
108 self.service_version = version.into();
109 self
110 }
111
112 pub fn with_environment(mut self, env: impl Into<String>) -> Self {
113 self.environment = env.into();
114 self
115 }
116
117 pub fn with_traces(mut self, enabled: bool) -> Self {
118 self.enable_traces = enabled;
119 self
120 }
121
122 pub fn with_metrics(mut self, enabled: bool) -> Self {
123 self.enable_metrics = enabled;
124 self
125 }
126
127 pub fn with_logs(mut self, enabled: bool) -> Self {
128 self.enable_logs = enabled;
129 self
130 }
131
132 pub fn with_sampling_ratio(mut self, ratio: f64) -> Self {
133 self.sampling_ratio = ratio.clamp(0.0, 1.0);
134 self
135 }
136}
137
138fn build_resource(config: &TelemetryConfig) -> Resource {
139 let base = Resource::from_detectors(
140 std::time::Duration::from_secs(5),
141 vec![
142 Box::new(SdkProvidedResourceDetector),
143 Box::new(EnvResourceDetector::new()),
144 ],
145 );
146
147 let custom = Resource::new(vec![
148 KeyValue::new(SERVICE_NAME, config.service_name.clone()),
149 KeyValue::new(SERVICE_VERSION, config.service_version.clone()),
150 KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, config.environment.clone()),
151 ]);
152
153 base.merge(&custom)
154}
155
156fn init_tracer(config: &TelemetryConfig) -> Result<TracerProvider, TelemetryError> {
157 let exporter = SpanExporter::builder()
158 .with_http()
159 .with_endpoint(&config.otlp_endpoint)
160 .build()
161 .map_err(|e| TelemetryError::TracerInit(e.to_string()))?;
162
163 let sampler = if config.sampling_ratio >= 1.0 {
164 Sampler::AlwaysOn
165 } else if config.sampling_ratio <= 0.0 {
166 Sampler::AlwaysOff
167 } else {
168 Sampler::TraceIdRatioBased(config.sampling_ratio)
169 };
170
171 let provider = TracerProvider::builder()
172 .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
173 .with_sampler(sampler)
174 .with_id_generator(RandomIdGenerator::default())
175 .with_resource(build_resource(config))
176 .build();
177
178 Ok(provider)
179}
180
181fn init_meter(config: &TelemetryConfig) -> Result<SdkMeterProvider, TelemetryError> {
182 let exporter = MetricExporter::builder()
183 .with_http()
184 .with_endpoint(&config.otlp_endpoint)
185 .build()
186 .map_err(|e| TelemetryError::MeterInit(e.to_string()))?;
187
188 let reader = PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio).build();
189
190 let provider = MeterProviderBuilder::default()
191 .with_reader(reader)
192 .with_resource(build_resource(config))
193 .build();
194
195 Ok(provider)
196}
197
198fn init_logger(config: &TelemetryConfig) -> Result<LoggerProvider, TelemetryError> {
199 let exporter = LogExporter::builder()
200 .with_http()
201 .with_endpoint(&config.otlp_endpoint)
202 .build()
203 .map_err(|e| TelemetryError::LoggerInit(e.to_string()))?;
204
205 let provider = LoggerProvider::builder()
206 .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
207 .with_resource(build_resource(config))
208 .build();
209
210 Ok(provider)
211}
212
213pub fn build_env_filter(project_name: &str, log_level: &str) -> EnvFilter {
218 build_console_filter(project_name, log_level)
219}
220
221fn build_console_filter(project_name: &str, log_level: &str) -> EnvFilter {
222 let crate_name = project_name.replace('-', "_");
223
224 let base = if let Ok(filter) = EnvFilter::try_from_default_env() {
225 filter
226 } else {
227 EnvFilter::new(log_level)
228 };
229
230 let directive = format!("{}={}", crate_name, log_level);
231 match directive.parse() {
232 Ok(d) => base.add_directive(d),
233 Err(_) => base,
234 }
235}
236
237pub fn init_telemetry(
240 config: &TelemetryConfig,
241 project_name: &str,
242 log_level: &str,
243) -> Result<bool, TelemetryError> {
244 global::set_text_map_propagator(TraceContextPropagator::new());
245
246 let fmt_layer = tracing_subscriber::fmt::layer()
249 .with_target(true)
250 .with_thread_ids(false)
251 .with_file(false)
252 .with_line_number(false)
253 .with_filter(build_console_filter(project_name, log_level));
254
255 let otel_trace_layer = if config.enable_traces {
257 let tracer_provider = init_tracer(config)?;
258 let tracer = tracer_provider.tracer(config.service_name.clone());
259
260 TRACER_PROVIDER
261 .set(tracer_provider.clone())
262 .map_err(|_| TelemetryError::AlreadyInitialized)?;
263
264 global::set_tracer_provider(tracer_provider);
265
266 Some(
267 OpenTelemetryLayer::new(tracer)
268 .with_filter(build_console_filter(project_name, log_level)),
269 )
270 } else {
271 None
272 };
273
274 let otel_log_layer = if config.enable_logs {
279 let logger_provider = init_logger(config)?;
280
281 let env_filter = build_console_filter(project_name, log_level);
282 let log_layer = OpenTelemetryTracingBridge::new(&logger_provider).with_filter(
283 env_filter.and(FilterFn::new(|metadata| {
284 let target = metadata.target();
285 !target.starts_with("hyper")
286 && !target.starts_with("reqwest")
287 && !target.starts_with("h2")
288 && !target.starts_with("tonic")
289 && !target.starts_with("tower")
290 && !target.starts_with("opentelemetry")
291 })),
292 );
293
294 LOGGER_PROVIDER
295 .set(logger_provider)
296 .map_err(|_| TelemetryError::AlreadyInitialized)?;
297
298 Some(log_layer)
299 } else {
300 None
301 };
302
303 if Registry::default()
305 .with(fmt_layer)
306 .with(otel_trace_layer)
307 .with(otel_log_layer)
308 .try_init()
309 .is_err()
310 {
311 return Ok(false);
312 }
313
314 if config.enable_metrics {
315 let meter_provider = init_meter(config)?;
316
317 METER_PROVIDER
318 .set(meter_provider.clone())
319 .map_err(|_| TelemetryError::AlreadyInitialized)?;
320
321 global::set_meter_provider(meter_provider);
322 }
323
324 tracing::info!(
325 service = %config.service_name,
326 version = %config.service_version,
327 environment = %config.environment,
328 traces = config.enable_traces,
329 metrics = config.enable_metrics,
330 logs = config.enable_logs,
331 "telemetry initialized"
332 );
333
334 Ok(true)
335}
336
337pub fn shutdown_telemetry() {
338 tracing::info!("shutting down telemetry");
339
340 if let Some(provider) = TRACER_PROVIDER.get()
341 && let Err(e) = provider.shutdown()
342 {
343 tracing::warn!(error = %e, "failed to shutdown tracer provider");
344 }
345
346 if let Some(provider) = METER_PROVIDER.get()
347 && let Err(e) = provider.shutdown()
348 {
349 tracing::warn!(error = %e, "failed to shutdown meter provider");
350 }
351
352 if let Some(provider) = LOGGER_PROVIDER.get()
353 && let Err(e) = provider.shutdown()
354 {
355 tracing::warn!(error = %e, "failed to shutdown logger provider");
356 }
357}
358
#[cfg(test)]
mod tests {
    use super::*;

    /// Each builder method must land its value in the matching field.
    #[test]
    fn test_config_builder() {
        let TelemetryConfig {
            otlp_endpoint,
            service_name,
            service_version,
            environment,
            enable_traces,
            enable_metrics,
            enable_logs,
            ..
        } = TelemetryConfig::new("test-service")
            .with_endpoint("http://otel:4318")
            .with_version("1.0.0")
            .with_environment("production")
            .with_traces(true)
            .with_metrics(false)
            .with_logs(true);

        assert_eq!(service_name, "test-service");
        assert_eq!(otlp_endpoint, "http://otel:4318");
        assert_eq!(service_version, "1.0.0");
        assert_eq!(environment, "production");
        assert!(enable_traces);
        assert!(!enable_metrics);
        assert!(enable_logs);
    }

    /// The defaults target a local collector with every signal enabled.
    #[test]
    fn test_default_config() {
        let defaults = TelemetryConfig::default();

        assert_eq!(defaults.otlp_endpoint.as_str(), "http://localhost:4318");
        assert_eq!(defaults.service_name.as_str(), "forge-service");
        assert_eq!(defaults.environment.as_str(), "development");

        let signals = [
            defaults.enable_traces,
            defaults.enable_metrics,
            defaults.enable_logs,
        ];
        for enabled in signals.iter() {
            assert!(*enabled);
        }
    }
}