forge_runtime/observability/
telemetry.rs1use opentelemetry::{KeyValue, global, trace::TracerProvider as _};
2use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
3use opentelemetry_otlp::{LogExporter, MetricExporter, SpanExporter, WithExportConfig};
4use opentelemetry_sdk::{
5 Resource,
6 logs::LoggerProvider,
7 metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
8 propagation::TraceContextPropagator,
9 resource::{EnvResourceDetector, SdkProvidedResourceDetector},
10 trace::{RandomIdGenerator, Sampler, TracerProvider},
11};
12use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};
13
/// Resource attribute key under which `build_resource` records
/// `TelemetryConfig::environment`.
const DEPLOYMENT_ENVIRONMENT_NAME: &str = "deployment.environment.name";
15use std::sync::OnceLock;
16use thiserror::Error;
17use tracing_opentelemetry::OpenTelemetryLayer;
18use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, util::SubscriberInitExt};
19
// Providers created by `init_telemetry`, retained so `shutdown_telemetry` can shut them
// down later. Each cell is written at most once; `init_telemetry` maps a failed `set`
// (already populated) to `TelemetryError::AlreadyInitialized`.
/// Tracer provider, set when traces are enabled.
static TRACER_PROVIDER: OnceLock<TracerProvider> = OnceLock::new();
/// Meter provider, set when metrics are enabled.
static METER_PROVIDER: OnceLock<SdkMeterProvider> = OnceLock::new();
/// Logger provider, set when logs are enabled.
static LOGGER_PROVIDER: OnceLock<LoggerProvider> = OnceLock::new();
23
/// Errors that can occur while setting up telemetry.
#[derive(Debug, Error)]
pub enum TelemetryError {
    /// The OTLP span exporter could not be built; carries the exporter's error text.
    #[error("failed to initialize tracer: {0}")]
    TracerInit(String),
    /// The OTLP metric exporter could not be built; carries the exporter's error text.
    #[error("failed to initialize meter: {0}")]
    MeterInit(String),
    /// The OTLP log exporter could not be built; carries the exporter's error text.
    #[error("failed to initialize logger: {0}")]
    LoggerInit(String),
    /// A provider had already been stored by an earlier initialization call.
    #[error("telemetry already initialized")]
    AlreadyInitialized,
    /// Installing the global `tracing` subscriber failed.
    // NOTE(review): not constructed in the code visible here — `init_telemetry` reports an
    // already-installed subscriber as `Ok(false)` instead. Confirm whether it is still used.
    #[error("tracing subscriber init failed: {0}")]
    SubscriberInit(String),
}
37
/// Settings for the telemetry pipeline installed by `init_telemetry`.
#[derive(Debug, Clone)]
pub struct TelemetryConfig {
    /// OTLP/HTTP collector endpoint shared by the span, metric, and log exporters.
    pub otlp_endpoint: String,
    /// Reported as the `service.name` resource attribute; also used as the tracer name.
    pub service_name: String,
    /// Reported as the `service.version` resource attribute.
    pub service_version: String,
    /// Reported as the `deployment.environment.name` resource attribute.
    pub environment: String,
    /// Whether spans are exported over OTLP.
    pub enable_traces: bool,
    /// Whether metrics are exported over OTLP.
    pub enable_metrics: bool,
    /// Whether `tracing` events are exported as OTLP logs.
    pub enable_logs: bool,
    /// Trace sampling ratio: `>= 1.0` samples every trace, `<= 0.0` none, and values in
    /// between sample by trace-ID ratio.
    pub sampling_ratio: f64,
}
49
50impl Default for TelemetryConfig {
51 fn default() -> Self {
52 Self {
53 otlp_endpoint: "http://localhost:4318".to_string(),
54 service_name: "forge-service".to_string(),
55 service_version: "0.1.0".to_string(),
56 environment: "development".to_string(),
57 enable_traces: true,
58 enable_metrics: true,
59 enable_logs: true,
60 sampling_ratio: 1.0,
61 }
62 }
63}
64
65impl TelemetryConfig {
66 pub fn new(service_name: impl Into<String>) -> Self {
67 Self {
68 service_name: service_name.into(),
69 ..Default::default()
70 }
71 }
72
73 pub fn from_observability_config(
75 obs: &forge_core::config::ObservabilityConfig,
76 project_name: &str,
77 project_version: &str,
78 ) -> Self {
79 let otlp_enabled = obs.enabled;
82 Self {
83 otlp_endpoint: obs.otlp_endpoint.clone(),
84 service_name: obs
85 .service_name
86 .clone()
87 .unwrap_or_else(|| project_name.to_string()),
88 service_version: project_version.to_string(),
89 environment: "production".to_string(),
90 enable_traces: otlp_enabled && obs.enable_traces,
91 enable_metrics: otlp_enabled && obs.enable_metrics,
92 enable_logs: otlp_enabled && obs.enable_logs,
93 sampling_ratio: obs.sampling_ratio,
94 }
95 }
96
97 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
98 self.otlp_endpoint = endpoint.into();
99 self
100 }
101
102 pub fn with_version(mut self, version: impl Into<String>) -> Self {
103 self.service_version = version.into();
104 self
105 }
106
107 pub fn with_environment(mut self, env: impl Into<String>) -> Self {
108 self.environment = env.into();
109 self
110 }
111
112 pub fn with_traces(mut self, enabled: bool) -> Self {
113 self.enable_traces = enabled;
114 self
115 }
116
117 pub fn with_metrics(mut self, enabled: bool) -> Self {
118 self.enable_metrics = enabled;
119 self
120 }
121
122 pub fn with_logs(mut self, enabled: bool) -> Self {
123 self.enable_logs = enabled;
124 self
125 }
126
127 pub fn with_sampling_ratio(mut self, ratio: f64) -> Self {
128 self.sampling_ratio = ratio.clamp(0.0, 1.0);
129 self
130 }
131}
132
133fn build_resource(config: &TelemetryConfig) -> Resource {
134 let base = Resource::from_detectors(
135 std::time::Duration::from_secs(5),
136 vec![
137 Box::new(SdkProvidedResourceDetector),
138 Box::new(EnvResourceDetector::new()),
139 ],
140 );
141
142 let custom = Resource::new(vec![
143 KeyValue::new(SERVICE_NAME, config.service_name.clone()),
144 KeyValue::new(SERVICE_VERSION, config.service_version.clone()),
145 KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, config.environment.clone()),
146 ]);
147
148 base.merge(&custom)
149}
150
151fn init_tracer(config: &TelemetryConfig) -> Result<TracerProvider, TelemetryError> {
152 let exporter = SpanExporter::builder()
153 .with_http()
154 .with_endpoint(&config.otlp_endpoint)
155 .build()
156 .map_err(|e| TelemetryError::TracerInit(e.to_string()))?;
157
158 let sampler = if config.sampling_ratio >= 1.0 {
159 Sampler::AlwaysOn
160 } else if config.sampling_ratio <= 0.0 {
161 Sampler::AlwaysOff
162 } else {
163 Sampler::TraceIdRatioBased(config.sampling_ratio)
164 };
165
166 let provider = TracerProvider::builder()
167 .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
168 .with_sampler(sampler)
169 .with_id_generator(RandomIdGenerator::default())
170 .with_resource(build_resource(config))
171 .build();
172
173 Ok(provider)
174}
175
176fn init_meter(config: &TelemetryConfig) -> Result<SdkMeterProvider, TelemetryError> {
177 let exporter = MetricExporter::builder()
178 .with_http()
179 .with_endpoint(&config.otlp_endpoint)
180 .build()
181 .map_err(|e| TelemetryError::MeterInit(e.to_string()))?;
182
183 let reader = PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio).build();
184
185 let provider = MeterProviderBuilder::default()
186 .with_reader(reader)
187 .with_resource(build_resource(config))
188 .build();
189
190 Ok(provider)
191}
192
193fn init_logger(config: &TelemetryConfig) -> Result<LoggerProvider, TelemetryError> {
194 let exporter = LogExporter::builder()
195 .with_http()
196 .with_endpoint(&config.otlp_endpoint)
197 .build()
198 .map_err(|e| TelemetryError::LoggerInit(e.to_string()))?;
199
200 let provider = LoggerProvider::builder()
201 .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
202 .with_resource(build_resource(config))
203 .build();
204
205 Ok(provider)
206}
207
208pub fn build_env_filter(project_name: &str, log_level: &str) -> EnvFilter {
211 let crate_name = project_name.replace('-', "_");
212
213 let base = if let Ok(filter) = EnvFilter::try_from_default_env() {
214 filter
215 } else {
216 EnvFilter::new(log_level)
217 };
218
219 let directive = format!("{}={}", crate_name, log_level);
221 match directive.parse() {
222 Ok(d) => base.add_directive(d),
223 Err(_) => base,
224 }
225}
226
227pub fn init_telemetry(
230 config: &TelemetryConfig,
231 project_name: &str,
232 log_level: &str,
233) -> Result<bool, TelemetryError> {
234 global::set_text_map_propagator(TraceContextPropagator::new());
235
236 let env_filter = build_env_filter(project_name, log_level);
237
238 let fmt_layer = tracing_subscriber::fmt::layer()
239 .with_target(true)
240 .with_thread_ids(false)
241 .with_file(false)
242 .with_line_number(false);
243
244 let otel_trace_layer = if config.enable_traces {
246 let tracer_provider = init_tracer(config)?;
247 let tracer = tracer_provider.tracer(config.service_name.clone());
248
249 TRACER_PROVIDER
250 .set(tracer_provider.clone())
251 .map_err(|_| TelemetryError::AlreadyInitialized)?;
252
253 global::set_tracer_provider(tracer_provider);
254
255 Some(OpenTelemetryLayer::new(tracer))
256 } else {
257 None
258 };
259
260 let otel_log_layer = if config.enable_logs {
262 let logger_provider = init_logger(config)?;
263
264 let log_layer = OpenTelemetryTracingBridge::new(&logger_provider);
265
266 LOGGER_PROVIDER
267 .set(logger_provider)
268 .map_err(|_| TelemetryError::AlreadyInitialized)?;
269
270 Some(log_layer)
271 } else {
272 None
273 };
274
275 if Registry::default()
277 .with(env_filter)
278 .with(fmt_layer)
279 .with(otel_trace_layer)
280 .with(otel_log_layer)
281 .try_init()
282 .is_err()
283 {
284 return Ok(false);
285 }
286
287 if config.enable_metrics {
288 let meter_provider = init_meter(config)?;
289
290 METER_PROVIDER
291 .set(meter_provider.clone())
292 .map_err(|_| TelemetryError::AlreadyInitialized)?;
293
294 global::set_meter_provider(meter_provider);
295 }
296
297 tracing::info!(
298 service = %config.service_name,
299 version = %config.service_version,
300 environment = %config.environment,
301 traces = config.enable_traces,
302 metrics = config.enable_metrics,
303 logs = config.enable_logs,
304 "telemetry initialized"
305 );
306
307 Ok(true)
308}
309
310pub fn shutdown_telemetry() {
311 tracing::info!("shutting down telemetry");
312
313 if let Some(provider) = TRACER_PROVIDER.get()
314 && let Err(e) = provider.shutdown()
315 {
316 tracing::warn!(error = %e, "failed to shutdown tracer provider");
317 }
318
319 if let Some(provider) = METER_PROVIDER.get()
320 && let Err(e) = provider.shutdown()
321 {
322 tracing::warn!(error = %e, "failed to shutdown meter provider");
323 }
324
325 if let Some(provider) = LOGGER_PROVIDER.get()
326 && let Err(e) = provider.shutdown()
327 {
328 tracing::warn!(error = %e, "failed to shutdown logger provider");
329 }
330}
331
#[cfg(test)]
mod tests {
    use super::*;

    /// Every builder method sets its field and leaves the service name intact.
    #[test]
    fn test_config_builder() {
        let config = TelemetryConfig::new("test-service")
            .with_endpoint("http://otel:4318")
            .with_version("1.0.0")
            .with_environment("production")
            .with_traces(true)
            .with_metrics(false)
            .with_logs(true);

        assert_eq!(config.service_name, "test-service");
        assert_eq!(config.otlp_endpoint, "http://otel:4318");
        assert_eq!(config.service_version, "1.0.0");
        assert_eq!(config.environment, "production");
        assert!(config.enable_traces);
        assert!(!config.enable_metrics);
        assert!(config.enable_logs);
    }

    /// All default values, including version and sampling ratio.
    #[test]
    fn test_default_config() {
        let config = TelemetryConfig::default();

        assert_eq!(config.otlp_endpoint, "http://localhost:4318");
        assert_eq!(config.service_name, "forge-service");
        assert_eq!(config.service_version, "0.1.0");
        assert_eq!(config.environment, "development");
        assert!(config.enable_traces);
        assert!(config.enable_metrics);
        assert!(config.enable_logs);
        assert_eq!(config.sampling_ratio, 1.0);
    }

    /// `with_sampling_ratio` keeps the ratio inside `[0.0, 1.0]`.
    #[test]
    fn test_sampling_ratio_is_clamped() {
        let ratio = |r: f64| {
            TelemetryConfig::new("svc")
                .with_sampling_ratio(r)
                .sampling_ratio
        };

        assert_eq!(ratio(1.5), 1.0);
        assert_eq!(ratio(-0.5), 0.0);
        assert_eq!(ratio(0.25), 0.25);
        assert_eq!(ratio(0.0), 0.0);
        assert_eq!(ratio(1.0), 1.0);
    }
}