forge_runtime/observability/
telemetry.rs1use opentelemetry::{KeyValue, global, trace::TracerProvider as _};
2use opentelemetry_otlp::{LogExporter, MetricExporter, SpanExporter, WithExportConfig};
3use opentelemetry_sdk::{
4 Resource,
5 logs::LoggerProvider,
6 metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
7 propagation::TraceContextPropagator,
8 resource::{EnvResourceDetector, SdkProvidedResourceDetector},
9 trace::{RandomIdGenerator, Sampler, TracerProvider},
10};
11use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};
12
/// Resource attribute key under which `build_resource` records
/// `TelemetryConfig::environment` (e.g. "development" or "production").
const DEPLOYMENT_ENVIRONMENT_NAME: &str = "deployment.environment.name";
14use std::sync::OnceLock;
15use thiserror::Error;
16use tracing_opentelemetry::OpenTelemetryLayer;
17use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, util::SubscriberInitExt};
18
/// Tracer provider stored by `init_telemetry` when traces are enabled, so that
/// `shutdown_telemetry` can shut it down. Set at most once; a second attempt is
/// reported as `TelemetryError::AlreadyInitialized`.
static TRACER_PROVIDER: OnceLock<TracerProvider> = OnceLock::new();
/// Meter provider stored by `init_telemetry` when metrics are enabled.
static METER_PROVIDER: OnceLock<SdkMeterProvider> = OnceLock::new();
/// Logger provider stored by `init_telemetry` when logs are enabled.
static LOGGER_PROVIDER: OnceLock<LoggerProvider> = OnceLock::new();
22
/// Errors that can occur while setting up the telemetry pipeline.
#[derive(Debug, Error)]
pub enum TelemetryError {
    /// Building the OTLP span exporter failed; carries the exporter's error text.
    #[error("failed to initialize tracer: {0}")]
    TracerInit(String),
    /// Building the OTLP metric exporter failed; carries the exporter's error text.
    #[error("failed to initialize meter: {0}")]
    MeterInit(String),
    /// Building the OTLP log exporter failed; carries the exporter's error text.
    #[error("failed to initialize logger: {0}")]
    LoggerInit(String),
    /// One of the provider `OnceLock`s was already set (in this file only
    /// `init_telemetry` sets them, so this means it already ran).
    #[error("telemetry already initialized")]
    AlreadyInitialized,
    /// Installing the global `tracing` subscriber failed.
    /// NOTE(review): not constructed in this file — `init_telemetry` reports a
    /// failed subscriber install as `Ok(false)` instead.
    #[error("tracing subscriber init failed: {0}")]
    SubscriberInit(String),
}
36
/// Settings for the OpenTelemetry pipeline installed by `init_telemetry`.
///
/// Construct with `TelemetryConfig::new` or `Default` plus the `with_*` builder
/// methods, or from project settings via `TelemetryConfig::from_observability_config`.
#[derive(Debug, Clone, PartialEq)]
pub struct TelemetryConfig {
    /// OTLP/gRPC collector endpoint used by the span, metric, and log exporters.
    pub otlp_endpoint: String,
    /// Recorded as the `service.name` resource attribute.
    pub service_name: String,
    /// Recorded as the `service.version` resource attribute.
    pub service_version: String,
    /// Recorded as the `deployment.environment.name` resource attribute.
    pub environment: String,
    /// Build a tracer provider and attach the OpenTelemetry `tracing` layer.
    pub enable_traces: bool,
    /// Build a meter provider with a periodic OTLP reader.
    pub enable_metrics: bool,
    /// Build and store an OTLP logger provider.
    pub enable_logs: bool,
    /// Trace sampling ratio, expected in `[0.0, 1.0]`. The `with_sampling_ratio`
    /// builder clamps into that range; assigning the field directly does not.
    pub sampling_ratio: f64,
}
48
49impl Default for TelemetryConfig {
50 fn default() -> Self {
51 Self {
52 otlp_endpoint: "http://localhost:4317".to_string(),
53 service_name: "forge-service".to_string(),
54 service_version: "0.1.0".to_string(),
55 environment: "development".to_string(),
56 enable_traces: true,
57 enable_metrics: true,
58 enable_logs: true,
59 sampling_ratio: 1.0,
60 }
61 }
62}
63
64impl TelemetryConfig {
65 pub fn new(service_name: impl Into<String>) -> Self {
66 Self {
67 service_name: service_name.into(),
68 ..Default::default()
69 }
70 }
71
72 pub fn from_observability_config(
74 obs: &forge_core::config::ObservabilityConfig,
75 project_name: &str,
76 project_version: &str,
77 ) -> Self {
78 Self {
79 otlp_endpoint: obs.otlp_endpoint.clone(),
80 service_name: obs
81 .service_name
82 .clone()
83 .unwrap_or_else(|| project_name.to_string()),
84 service_version: project_version.to_string(),
85 environment: "production".to_string(),
86 enable_traces: obs.enable_traces,
87 enable_metrics: obs.enable_metrics,
88 enable_logs: obs.enable_logs,
89 sampling_ratio: obs.sampling_ratio,
90 }
91 }
92
93 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
94 self.otlp_endpoint = endpoint.into();
95 self
96 }
97
98 pub fn with_version(mut self, version: impl Into<String>) -> Self {
99 self.service_version = version.into();
100 self
101 }
102
103 pub fn with_environment(mut self, env: impl Into<String>) -> Self {
104 self.environment = env.into();
105 self
106 }
107
108 pub fn with_traces(mut self, enabled: bool) -> Self {
109 self.enable_traces = enabled;
110 self
111 }
112
113 pub fn with_metrics(mut self, enabled: bool) -> Self {
114 self.enable_metrics = enabled;
115 self
116 }
117
118 pub fn with_logs(mut self, enabled: bool) -> Self {
119 self.enable_logs = enabled;
120 self
121 }
122
123 pub fn with_sampling_ratio(mut self, ratio: f64) -> Self {
124 self.sampling_ratio = ratio.clamp(0.0, 1.0);
125 self
126 }
127}
128
129fn build_resource(config: &TelemetryConfig) -> Resource {
130 let base = Resource::from_detectors(
131 std::time::Duration::from_secs(5),
132 vec![
133 Box::new(SdkProvidedResourceDetector),
134 Box::new(EnvResourceDetector::new()),
135 ],
136 );
137
138 let custom = Resource::new(vec![
139 KeyValue::new(SERVICE_NAME, config.service_name.clone()),
140 KeyValue::new(SERVICE_VERSION, config.service_version.clone()),
141 KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, config.environment.clone()),
142 ]);
143
144 base.merge(&custom)
145}
146
147fn init_tracer(config: &TelemetryConfig) -> Result<TracerProvider, TelemetryError> {
148 let exporter = SpanExporter::builder()
149 .with_tonic()
150 .with_endpoint(&config.otlp_endpoint)
151 .build()
152 .map_err(|e| TelemetryError::TracerInit(e.to_string()))?;
153
154 let sampler = if config.sampling_ratio >= 1.0 {
155 Sampler::AlwaysOn
156 } else if config.sampling_ratio <= 0.0 {
157 Sampler::AlwaysOff
158 } else {
159 Sampler::TraceIdRatioBased(config.sampling_ratio)
160 };
161
162 let provider = TracerProvider::builder()
163 .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
164 .with_sampler(sampler)
165 .with_id_generator(RandomIdGenerator::default())
166 .with_resource(build_resource(config))
167 .build();
168
169 Ok(provider)
170}
171
172fn init_meter(config: &TelemetryConfig) -> Result<SdkMeterProvider, TelemetryError> {
173 let exporter = MetricExporter::builder()
174 .with_tonic()
175 .with_endpoint(&config.otlp_endpoint)
176 .build()
177 .map_err(|e| TelemetryError::MeterInit(e.to_string()))?;
178
179 let reader = PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio).build();
180
181 let provider = MeterProviderBuilder::default()
182 .with_reader(reader)
183 .with_resource(build_resource(config))
184 .build();
185
186 Ok(provider)
187}
188
189fn init_logger(config: &TelemetryConfig) -> Result<LoggerProvider, TelemetryError> {
190 let exporter = LogExporter::builder()
191 .with_tonic()
192 .with_endpoint(&config.otlp_endpoint)
193 .build()
194 .map_err(|e| TelemetryError::LoggerInit(e.to_string()))?;
195
196 let provider = LoggerProvider::builder()
197 .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
198 .with_resource(build_resource(config))
199 .build();
200
201 Ok(provider)
202}
203
204pub fn build_env_filter(project_name: &str, log_level: &str) -> EnvFilter {
207 let crate_name = project_name.replace('-', "_");
208
209 let base = if let Ok(filter) = EnvFilter::try_from_default_env() {
210 filter
211 } else {
212 EnvFilter::new(log_level)
213 };
214
215 let directive = format!("{}={}", crate_name, log_level);
217 match directive.parse() {
218 Ok(d) => base.add_directive(d),
219 Err(_) => base,
220 }
221}
222
223pub fn init_telemetry(
226 config: &TelemetryConfig,
227 project_name: &str,
228 log_level: &str,
229) -> Result<bool, TelemetryError> {
230 global::set_text_map_propagator(TraceContextPropagator::new());
231
232 let env_filter = build_env_filter(project_name, log_level);
233
234 let registry = Registry::default().with(env_filter);
235
236 if config.enable_traces {
237 let tracer_provider = init_tracer(config)?;
238 let tracer = tracer_provider.tracer(config.service_name.clone());
239
240 TRACER_PROVIDER
241 .set(tracer_provider.clone())
242 .map_err(|_| TelemetryError::AlreadyInitialized)?;
243
244 global::set_tracer_provider(tracer_provider);
245
246 let otel_layer = OpenTelemetryLayer::new(tracer);
247 let fmt_layer = tracing_subscriber::fmt::layer()
248 .with_target(true)
249 .with_thread_ids(false)
250 .with_file(false)
251 .with_line_number(false);
252
253 if registry
254 .with(otel_layer)
255 .with(fmt_layer)
256 .try_init()
257 .is_err()
258 {
259 return Ok(false);
260 }
261 } else {
262 let fmt_layer = tracing_subscriber::fmt::layer()
263 .with_target(true)
264 .with_thread_ids(false)
265 .with_file(false)
266 .with_line_number(false);
267
268 if registry.with(fmt_layer).try_init().is_err() {
269 return Ok(false);
270 }
271 }
272
273 if config.enable_metrics {
274 let meter_provider = init_meter(config)?;
275
276 METER_PROVIDER
277 .set(meter_provider.clone())
278 .map_err(|_| TelemetryError::AlreadyInitialized)?;
279
280 global::set_meter_provider(meter_provider);
281 }
282
283 if config.enable_logs {
284 let logger_provider = init_logger(config)?;
285
286 LOGGER_PROVIDER
287 .set(logger_provider)
288 .map_err(|_| TelemetryError::AlreadyInitialized)?;
289 }
290
291 tracing::info!(
292 service = %config.service_name,
293 version = %config.service_version,
294 environment = %config.environment,
295 traces = config.enable_traces,
296 metrics = config.enable_metrics,
297 logs = config.enable_logs,
298 "telemetry initialized"
299 );
300
301 Ok(true)
302}
303
304pub fn shutdown_telemetry() {
305 tracing::info!("shutting down telemetry");
306
307 if let Some(provider) = TRACER_PROVIDER.get()
308 && let Err(e) = provider.shutdown()
309 {
310 tracing::warn!(error = %e, "failed to shutdown tracer provider");
311 }
312
313 if let Some(provider) = METER_PROVIDER.get()
314 && let Err(e) = provider.shutdown()
315 {
316 tracing::warn!(error = %e, "failed to shutdown meter provider");
317 }
318
319 if let Some(provider) = LOGGER_PROVIDER.get()
320 && let Err(e) = provider.shutdown()
321 {
322 tracing::warn!(error = %e, "failed to shutdown logger provider");
323 }
324}
325
#[cfg(test)]
mod tests {
    use super::*;

    /// Every builder method writes its field.
    #[test]
    fn test_config_builder() {
        let config = TelemetryConfig::new("test-service")
            .with_endpoint("http://otel:4317")
            .with_version("1.0.0")
            .with_environment("production")
            .with_traces(true)
            .with_metrics(false)
            .with_logs(true);

        assert_eq!(config.service_name, "test-service");
        assert_eq!(config.otlp_endpoint, "http://otel:4317");
        assert_eq!(config.service_version, "1.0.0");
        assert_eq!(config.environment, "production");
        assert!(config.enable_traces);
        assert!(!config.enable_metrics);
        assert!(config.enable_logs);
    }

    /// All default fields, including version and sampling ratio.
    #[test]
    fn test_default_config() {
        let config = TelemetryConfig::default();

        assert_eq!(config.otlp_endpoint, "http://localhost:4317");
        assert_eq!(config.service_name, "forge-service");
        assert_eq!(config.service_version, "0.1.0");
        assert_eq!(config.environment, "development");
        assert!(config.enable_traces);
        assert!(config.enable_metrics);
        assert!(config.enable_logs);
        assert_eq!(config.sampling_ratio, 1.0);
    }

    /// `new` overrides only the service name.
    #[test]
    fn test_new_keeps_other_defaults() {
        let config = TelemetryConfig::new("svc");
        let defaults = TelemetryConfig::default();

        assert_eq!(config.service_name, "svc");
        assert_eq!(config.otlp_endpoint, defaults.otlp_endpoint);
        assert_eq!(config.service_version, defaults.service_version);
        assert_eq!(config.environment, defaults.environment);
        assert_eq!(config.sampling_ratio, defaults.sampling_ratio);
    }

    /// In-range ratios pass through; out-of-range ratios are clamped.
    #[test]
    fn test_sampling_ratio_is_clamped() {
        let ratio = |r: f64| TelemetryConfig::new("svc").with_sampling_ratio(r).sampling_ratio;

        assert_eq!(ratio(0.25), 0.25);
        assert_eq!(ratio(0.0), 0.0);
        assert_eq!(ratio(1.0), 1.0);
        assert_eq!(ratio(1.5), 1.0);
        assert_eq!(ratio(-0.5), 0.0);
    }
}