forge_runtime/observability/
telemetry.rs1use opentelemetry::{KeyValue, global, trace::TracerProvider as _};
2use opentelemetry_otlp::{LogExporter, MetricExporter, SpanExporter, WithExportConfig};
3use opentelemetry_sdk::{
4 Resource,
5 logs::LoggerProvider,
6 metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
7 propagation::TraceContextPropagator,
8 resource::{EnvResourceDetector, SdkProvidedResourceDetector},
9 trace::{RandomIdGenerator, Sampler, TracerProvider},
10};
11use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};
12
/// Resource attribute key under which the deployment environment is reported
/// (OpenTelemetry semantic-convention name `deployment.environment.name`).
/// Declared locally; the semantic-conventions import above only brings in the
/// service name and version keys.
const DEPLOYMENT_ENVIRONMENT_NAME: &str = "deployment.environment.name";
14use std::sync::OnceLock;
15use thiserror::Error;
16use tracing_opentelemetry::OpenTelemetryLayer;
17use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, util::SubscriberInitExt};
18
19static TRACER_PROVIDER: OnceLock<TracerProvider> = OnceLock::new();
20static METER_PROVIDER: OnceLock<SdkMeterProvider> = OnceLock::new();
21static LOGGER_PROVIDER: OnceLock<LoggerProvider> = OnceLock::new();
22
23#[derive(Debug, Error)]
24pub enum TelemetryError {
25 #[error("failed to initialize tracer: {0}")]
26 TracerInit(String),
27 #[error("failed to initialize meter: {0}")]
28 MeterInit(String),
29 #[error("failed to initialize logger: {0}")]
30 LoggerInit(String),
31 #[error("telemetry already initialized")]
32 AlreadyInitialized,
33 #[error("tracing subscriber init failed: {0}")]
34 SubscriberInit(String),
35}
36
/// Settings that drive telemetry initialization.
#[derive(Debug, Clone)]
pub struct TelemetryConfig {
    /// OTLP/HTTP collector endpoint shared by the trace, metric and log exporters.
    pub otlp_endpoint: String,
    /// Reported as the `service.name` resource attribute and used as the tracer name.
    pub service_name: String,
    /// Reported as the `service.version` resource attribute.
    pub service_version: String,
    /// Reported as the `deployment.environment.name` resource attribute.
    pub environment: String,
    /// Whether spans are exported over OTLP.
    pub enable_traces: bool,
    /// Whether a meter provider is created and installed globally.
    pub enable_metrics: bool,
    /// Whether a logger provider is created.
    pub enable_logs: bool,
    /// Fraction of traces to sample: `>= 1.0` samples everything, `<= 0.0` samples nothing.
    pub sampling_ratio: f64,
}

impl Default for TelemetryConfig {
    /// Local-development defaults: a collector on `localhost:4318`, every signal enabled,
    /// and every trace sampled.
    fn default() -> Self {
        TelemetryConfig {
            otlp_endpoint: String::from("http://localhost:4318"),
            service_name: String::from("forge-service"),
            service_version: String::from("0.1.0"),
            environment: String::from("development"),
            enable_traces: true,
            enable_metrics: true,
            enable_logs: true,
            sampling_ratio: 1.0,
        }
    }
}
63
64impl TelemetryConfig {
65 pub fn new(service_name: impl Into<String>) -> Self {
66 Self {
67 service_name: service_name.into(),
68 ..Default::default()
69 }
70 }
71
72 pub fn from_observability_config(
74 obs: &forge_core::config::ObservabilityConfig,
75 project_name: &str,
76 project_version: &str,
77 ) -> Self {
78 let otlp_enabled = obs.enabled;
81 Self {
82 otlp_endpoint: obs.otlp_endpoint.clone(),
83 service_name: obs
84 .service_name
85 .clone()
86 .unwrap_or_else(|| project_name.to_string()),
87 service_version: project_version.to_string(),
88 environment: "production".to_string(),
89 enable_traces: otlp_enabled && obs.enable_traces,
90 enable_metrics: otlp_enabled && obs.enable_metrics,
91 enable_logs: otlp_enabled && obs.enable_logs,
92 sampling_ratio: obs.sampling_ratio,
93 }
94 }
95
96 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
97 self.otlp_endpoint = endpoint.into();
98 self
99 }
100
101 pub fn with_version(mut self, version: impl Into<String>) -> Self {
102 self.service_version = version.into();
103 self
104 }
105
106 pub fn with_environment(mut self, env: impl Into<String>) -> Self {
107 self.environment = env.into();
108 self
109 }
110
111 pub fn with_traces(mut self, enabled: bool) -> Self {
112 self.enable_traces = enabled;
113 self
114 }
115
116 pub fn with_metrics(mut self, enabled: bool) -> Self {
117 self.enable_metrics = enabled;
118 self
119 }
120
121 pub fn with_logs(mut self, enabled: bool) -> Self {
122 self.enable_logs = enabled;
123 self
124 }
125
126 pub fn with_sampling_ratio(mut self, ratio: f64) -> Self {
127 self.sampling_ratio = ratio.clamp(0.0, 1.0);
128 self
129 }
130}
131
132fn build_resource(config: &TelemetryConfig) -> Resource {
133 let base = Resource::from_detectors(
134 std::time::Duration::from_secs(5),
135 vec![
136 Box::new(SdkProvidedResourceDetector),
137 Box::new(EnvResourceDetector::new()),
138 ],
139 );
140
141 let custom = Resource::new(vec![
142 KeyValue::new(SERVICE_NAME, config.service_name.clone()),
143 KeyValue::new(SERVICE_VERSION, config.service_version.clone()),
144 KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, config.environment.clone()),
145 ]);
146
147 base.merge(&custom)
148}
149
150fn init_tracer(config: &TelemetryConfig) -> Result<TracerProvider, TelemetryError> {
151 let exporter = SpanExporter::builder()
152 .with_http()
153 .with_endpoint(&config.otlp_endpoint)
154 .build()
155 .map_err(|e| TelemetryError::TracerInit(e.to_string()))?;
156
157 let sampler = if config.sampling_ratio >= 1.0 {
158 Sampler::AlwaysOn
159 } else if config.sampling_ratio <= 0.0 {
160 Sampler::AlwaysOff
161 } else {
162 Sampler::TraceIdRatioBased(config.sampling_ratio)
163 };
164
165 let provider = TracerProvider::builder()
166 .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
167 .with_sampler(sampler)
168 .with_id_generator(RandomIdGenerator::default())
169 .with_resource(build_resource(config))
170 .build();
171
172 Ok(provider)
173}
174
175fn init_meter(config: &TelemetryConfig) -> Result<SdkMeterProvider, TelemetryError> {
176 let exporter = MetricExporter::builder()
177 .with_http()
178 .with_endpoint(&config.otlp_endpoint)
179 .build()
180 .map_err(|e| TelemetryError::MeterInit(e.to_string()))?;
181
182 let reader = PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio).build();
183
184 let provider = MeterProviderBuilder::default()
185 .with_reader(reader)
186 .with_resource(build_resource(config))
187 .build();
188
189 Ok(provider)
190}
191
192fn init_logger(config: &TelemetryConfig) -> Result<LoggerProvider, TelemetryError> {
193 let exporter = LogExporter::builder()
194 .with_http()
195 .with_endpoint(&config.otlp_endpoint)
196 .build()
197 .map_err(|e| TelemetryError::LoggerInit(e.to_string()))?;
198
199 let provider = LoggerProvider::builder()
200 .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
201 .with_resource(build_resource(config))
202 .build();
203
204 Ok(provider)
205}
206
207pub fn build_env_filter(project_name: &str, log_level: &str) -> EnvFilter {
210 let crate_name = project_name.replace('-', "_");
211
212 let base = if let Ok(filter) = EnvFilter::try_from_default_env() {
213 filter
214 } else {
215 EnvFilter::new(log_level)
216 };
217
218 let directive = format!("{}={}", crate_name, log_level);
220 match directive.parse() {
221 Ok(d) => base.add_directive(d),
222 Err(_) => base,
223 }
224}
225
226pub fn init_telemetry(
229 config: &TelemetryConfig,
230 project_name: &str,
231 log_level: &str,
232) -> Result<bool, TelemetryError> {
233 global::set_text_map_propagator(TraceContextPropagator::new());
234
235 let env_filter = build_env_filter(project_name, log_level);
236
237 let registry = Registry::default().with(env_filter);
238
239 if config.enable_traces {
240 let tracer_provider = init_tracer(config)?;
241 let tracer = tracer_provider.tracer(config.service_name.clone());
242
243 TRACER_PROVIDER
244 .set(tracer_provider.clone())
245 .map_err(|_| TelemetryError::AlreadyInitialized)?;
246
247 global::set_tracer_provider(tracer_provider);
248
249 let otel_layer = OpenTelemetryLayer::new(tracer);
250 let fmt_layer = tracing_subscriber::fmt::layer()
251 .with_target(true)
252 .with_thread_ids(false)
253 .with_file(false)
254 .with_line_number(false);
255
256 if registry
257 .with(otel_layer)
258 .with(fmt_layer)
259 .try_init()
260 .is_err()
261 {
262 return Ok(false);
263 }
264 } else {
265 let fmt_layer = tracing_subscriber::fmt::layer()
266 .with_target(true)
267 .with_thread_ids(false)
268 .with_file(false)
269 .with_line_number(false);
270
271 if registry.with(fmt_layer).try_init().is_err() {
272 return Ok(false);
273 }
274 }
275
276 if config.enable_metrics {
277 let meter_provider = init_meter(config)?;
278
279 METER_PROVIDER
280 .set(meter_provider.clone())
281 .map_err(|_| TelemetryError::AlreadyInitialized)?;
282
283 global::set_meter_provider(meter_provider);
284 }
285
286 if config.enable_logs {
287 let logger_provider = init_logger(config)?;
288
289 LOGGER_PROVIDER
290 .set(logger_provider)
291 .map_err(|_| TelemetryError::AlreadyInitialized)?;
292 }
293
294 tracing::info!(
295 service = %config.service_name,
296 version = %config.service_version,
297 environment = %config.environment,
298 traces = config.enable_traces,
299 metrics = config.enable_metrics,
300 logs = config.enable_logs,
301 "telemetry initialized"
302 );
303
304 Ok(true)
305}
306
307pub fn shutdown_telemetry() {
308 tracing::info!("shutting down telemetry");
309
310 if let Some(provider) = TRACER_PROVIDER.get()
311 && let Err(e) = provider.shutdown()
312 {
313 tracing::warn!(error = %e, "failed to shutdown tracer provider");
314 }
315
316 if let Some(provider) = METER_PROVIDER.get()
317 && let Err(e) = provider.shutdown()
318 {
319 tracing::warn!(error = %e, "failed to shutdown meter provider");
320 }
321
322 if let Some(provider) = LOGGER_PROVIDER.get()
323 && let Err(e) = provider.shutdown()
324 {
325 tracing::warn!(error = %e, "failed to shutdown logger provider");
326 }
327}
328
#[cfg(test)]
mod tests {
    use super::*;

    /// Every builder method writes its own field.
    #[test]
    fn test_config_builder() {
        let config = TelemetryConfig::new("test-service")
            .with_endpoint("http://otel:4318")
            .with_version("1.0.0")
            .with_environment("production")
            .with_traces(true)
            .with_metrics(false)
            .with_logs(true);

        assert_eq!(config.service_name, "test-service");
        assert_eq!(config.otlp_endpoint, "http://otel:4318");
        assert_eq!(config.service_version, "1.0.0");
        assert_eq!(config.environment, "production");
        assert!(config.enable_traces);
        assert!(!config.enable_metrics);
        assert!(config.enable_logs);
    }

    /// The defaults target a local collector with every signal on.
    #[test]
    fn test_default_config() {
        let config = TelemetryConfig::default();

        assert_eq!(config.otlp_endpoint, "http://localhost:4318");
        assert_eq!(config.service_name, "forge-service");
        assert_eq!(config.service_version, "0.1.0");
        assert_eq!(config.environment, "development");
        assert!(config.enable_traces);
        assert!(config.enable_metrics);
        assert!(config.enable_logs);
        assert_eq!(config.sampling_ratio, 1.0);
    }

    /// `new` only overrides the service name.
    #[test]
    fn test_new_keeps_other_defaults() {
        let config = TelemetryConfig::new("svc");

        assert_eq!(config.service_name, "svc");
        assert_eq!(config.otlp_endpoint, "http://localhost:4318");
        assert_eq!(config.environment, "development");
        assert_eq!(config.sampling_ratio, 1.0);
    }

    /// Out-of-range sampling ratios are clamped into `[0.0, 1.0]`.
    #[test]
    fn test_sampling_ratio_is_clamped() {
        let ratio = |r: f64| TelemetryConfig::default().with_sampling_ratio(r).sampling_ratio;

        assert_eq!(ratio(1.5), 1.0);
        assert_eq!(ratio(-0.5), 0.0);
        assert_eq!(ratio(0.25), 0.25);
        assert_eq!(ratio(0.0), 0.0);
        assert_eq!(ratio(1.0), 1.0);
    }
}