forge_runtime/observability/
telemetry.rs1use opentelemetry::{KeyValue, global, trace::TracerProvider as _};
2use opentelemetry_otlp::{LogExporter, MetricExporter, SpanExporter, WithExportConfig};
3use opentelemetry_sdk::{
4 Resource,
5 logs::LoggerProvider,
6 metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
7 propagation::TraceContextPropagator,
8 resource::{EnvResourceDetector, SdkProvidedResourceDetector},
9 trace::{RandomIdGenerator, Sampler, TracerProvider},
10};
11use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};
12
/// Resource attribute key under which the configured deployment environment
/// is attached to the telemetry resource.
const DEPLOYMENT_ENVIRONMENT_NAME: &str = "deployment.environment.name";
14use std::sync::OnceLock;
15use thiserror::Error;
16use tracing_opentelemetry::OpenTelemetryLayer;
17use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, util::SubscriberInitExt};
18
19static TRACER_PROVIDER: OnceLock<TracerProvider> = OnceLock::new();
20static METER_PROVIDER: OnceLock<SdkMeterProvider> = OnceLock::new();
21static LOGGER_PROVIDER: OnceLock<LoggerProvider> = OnceLock::new();
22
23#[derive(Debug, Error)]
24pub enum TelemetryError {
25 #[error("failed to initialize tracer: {0}")]
26 TracerInit(String),
27 #[error("failed to initialize meter: {0}")]
28 MeterInit(String),
29 #[error("failed to initialize logger: {0}")]
30 LoggerInit(String),
31 #[error("telemetry already initialized")]
32 AlreadyInitialized,
33 #[error("tracing subscriber init failed: {0}")]
34 SubscriberInit(String),
35}
36
/// Settings that drive telemetry initialization.
#[derive(Debug, Clone)]
pub struct TelemetryConfig {
    /// Endpoint handed to the OTLP span, metric and log exporters.
    pub otlp_endpoint: String,
    /// Reported as the service-name resource attribute and used as the tracer name.
    pub service_name: String,
    /// Reported as the service-version resource attribute.
    pub service_version: String,
    /// Reported under the `deployment.environment.name` resource attribute.
    pub environment: String,
    /// Whether a tracer provider and the OpenTelemetry tracing layer are set up.
    pub enable_traces: bool,
    /// Whether a meter provider is set up.
    pub enable_metrics: bool,
    /// Whether a logger provider is set up.
    pub enable_logs: bool,
    /// Trace sampling ratio: `>= 1.0` samples everything, `<= 0.0` samples
    /// nothing, anything in between selects ratio-based sampling.
    pub sampling_ratio: f64,
}
48
49impl Default for TelemetryConfig {
50 fn default() -> Self {
51 Self {
52 otlp_endpoint: "http://localhost:4317".to_string(),
53 service_name: "forge-service".to_string(),
54 service_version: "0.1.0".to_string(),
55 environment: "development".to_string(),
56 enable_traces: true,
57 enable_metrics: true,
58 enable_logs: true,
59 sampling_ratio: 1.0,
60 }
61 }
62}
63
64impl TelemetryConfig {
65 pub fn new(service_name: impl Into<String>) -> Self {
66 Self {
67 service_name: service_name.into(),
68 ..Default::default()
69 }
70 }
71
72 pub fn from_observability_config(
74 obs: &forge_core::config::ObservabilityConfig,
75 project_name: &str,
76 project_version: &str,
77 ) -> Self {
78 Self {
79 otlp_endpoint: obs.otlp_endpoint.clone(),
80 service_name: obs
81 .service_name
82 .clone()
83 .unwrap_or_else(|| project_name.to_string()),
84 service_version: project_version.to_string(),
85 environment: "production".to_string(),
86 enable_traces: obs.enable_traces,
87 enable_metrics: obs.enable_metrics,
88 enable_logs: obs.enable_logs,
89 sampling_ratio: obs.sampling_ratio,
90 }
91 }
92
93 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
94 self.otlp_endpoint = endpoint.into();
95 self
96 }
97
98 pub fn with_version(mut self, version: impl Into<String>) -> Self {
99 self.service_version = version.into();
100 self
101 }
102
103 pub fn with_environment(mut self, env: impl Into<String>) -> Self {
104 self.environment = env.into();
105 self
106 }
107
108 pub fn with_traces(mut self, enabled: bool) -> Self {
109 self.enable_traces = enabled;
110 self
111 }
112
113 pub fn with_metrics(mut self, enabled: bool) -> Self {
114 self.enable_metrics = enabled;
115 self
116 }
117
118 pub fn with_logs(mut self, enabled: bool) -> Self {
119 self.enable_logs = enabled;
120 self
121 }
122
123 pub fn with_sampling_ratio(mut self, ratio: f64) -> Self {
124 self.sampling_ratio = ratio.clamp(0.0, 1.0);
125 self
126 }
127}
128
129fn build_resource(config: &TelemetryConfig) -> Resource {
130 let base = Resource::from_detectors(
131 std::time::Duration::from_secs(5),
132 vec![
133 Box::new(SdkProvidedResourceDetector),
134 Box::new(EnvResourceDetector::new()),
135 ],
136 );
137
138 let custom = Resource::new(vec![
139 KeyValue::new(SERVICE_NAME, config.service_name.clone()),
140 KeyValue::new(SERVICE_VERSION, config.service_version.clone()),
141 KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, config.environment.clone()),
142 ]);
143
144 base.merge(&custom)
145}
146
147fn init_tracer(config: &TelemetryConfig) -> Result<TracerProvider, TelemetryError> {
148 let exporter = SpanExporter::builder()
149 .with_tonic()
150 .with_endpoint(&config.otlp_endpoint)
151 .build()
152 .map_err(|e| TelemetryError::TracerInit(e.to_string()))?;
153
154 let sampler = if config.sampling_ratio >= 1.0 {
155 Sampler::AlwaysOn
156 } else if config.sampling_ratio <= 0.0 {
157 Sampler::AlwaysOff
158 } else {
159 Sampler::TraceIdRatioBased(config.sampling_ratio)
160 };
161
162 let provider = TracerProvider::builder()
163 .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
164 .with_sampler(sampler)
165 .with_id_generator(RandomIdGenerator::default())
166 .with_resource(build_resource(config))
167 .build();
168
169 Ok(provider)
170}
171
172fn init_meter(config: &TelemetryConfig) -> Result<SdkMeterProvider, TelemetryError> {
173 let exporter = MetricExporter::builder()
174 .with_tonic()
175 .with_endpoint(&config.otlp_endpoint)
176 .build()
177 .map_err(|e| TelemetryError::MeterInit(e.to_string()))?;
178
179 let reader = PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio).build();
180
181 let provider = MeterProviderBuilder::default()
182 .with_reader(reader)
183 .with_resource(build_resource(config))
184 .build();
185
186 Ok(provider)
187}
188
189fn init_logger(config: &TelemetryConfig) -> Result<LoggerProvider, TelemetryError> {
190 let exporter = LogExporter::builder()
191 .with_tonic()
192 .with_endpoint(&config.otlp_endpoint)
193 .build()
194 .map_err(|e| TelemetryError::LoggerInit(e.to_string()))?;
195
196 let provider = LoggerProvider::builder()
197 .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
198 .with_resource(build_resource(config))
199 .build();
200
201 Ok(provider)
202}
203
204pub fn init_telemetry(config: &TelemetryConfig) -> Result<(), TelemetryError> {
205 global::set_text_map_propagator(TraceContextPropagator::new());
206
207 let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
208
209 let registry = Registry::default().with(env_filter);
210
211 if config.enable_traces {
212 let tracer_provider = init_tracer(config)?;
213 let tracer = tracer_provider.tracer(config.service_name.clone());
214
215 TRACER_PROVIDER
216 .set(tracer_provider.clone())
217 .map_err(|_| TelemetryError::AlreadyInitialized)?;
218
219 global::set_tracer_provider(tracer_provider);
220
221 let otel_layer = OpenTelemetryLayer::new(tracer);
222
223 let fmt_layer = tracing_subscriber::fmt::layer()
224 .with_target(true)
225 .with_thread_ids(false)
226 .with_file(false)
227 .with_line_number(false);
228
229 registry
230 .with(otel_layer)
231 .with(fmt_layer)
232 .try_init()
233 .map_err(|e| TelemetryError::SubscriberInit(e.to_string()))?;
234 } else {
235 let fmt_layer = tracing_subscriber::fmt::layer()
236 .with_target(true)
237 .with_thread_ids(false)
238 .with_file(false)
239 .with_line_number(false);
240
241 registry
242 .with(fmt_layer)
243 .try_init()
244 .map_err(|e| TelemetryError::SubscriberInit(e.to_string()))?;
245 }
246
247 if config.enable_metrics {
248 let meter_provider = init_meter(config)?;
249
250 METER_PROVIDER
251 .set(meter_provider.clone())
252 .map_err(|_| TelemetryError::AlreadyInitialized)?;
253
254 global::set_meter_provider(meter_provider);
255 }
256
257 if config.enable_logs {
258 let logger_provider = init_logger(config)?;
259
260 LOGGER_PROVIDER
261 .set(logger_provider)
262 .map_err(|_| TelemetryError::AlreadyInitialized)?;
263 }
264
265 tracing::info!(
266 service = %config.service_name,
267 version = %config.service_version,
268 environment = %config.environment,
269 traces = config.enable_traces,
270 metrics = config.enable_metrics,
271 logs = config.enable_logs,
272 "telemetry initialized"
273 );
274
275 Ok(())
276}
277
278pub fn shutdown_telemetry() {
279 tracing::info!("shutting down telemetry");
280
281 if let Some(provider) = TRACER_PROVIDER.get()
282 && let Err(e) = provider.shutdown()
283 {
284 tracing::warn!(error = %e, "failed to shutdown tracer provider");
285 }
286
287 if let Some(provider) = METER_PROVIDER.get()
288 && let Err(e) = provider.shutdown()
289 {
290 tracing::warn!(error = %e, "failed to shutdown meter provider");
291 }
292
293 if let Some(provider) = LOGGER_PROVIDER.get()
294 && let Err(e) = provider.shutdown()
295 {
296 tracing::warn!(error = %e, "failed to shutdown logger provider");
297 }
298}
299
#[cfg(test)]
mod tests {
    use super::*;

    /// Each builder method overwrites exactly the field it is named after.
    #[test]
    fn test_config_builder() {
        let config = TelemetryConfig::new("test-service")
            .with_endpoint("http://otel:4317")
            .with_version("1.0.0")
            .with_environment("production")
            .with_traces(true)
            .with_metrics(false)
            .with_logs(true);

        assert_eq!(config.service_name, "test-service");
        assert_eq!(config.otlp_endpoint, "http://otel:4317");
        assert_eq!(config.service_version, "1.0.0");
        assert_eq!(config.environment, "production");
        assert!(config.enable_traces);
        assert!(!config.enable_metrics);
        assert!(config.enable_logs);
    }

    /// The defaults target a local collector with every signal enabled.
    #[test]
    fn test_default_config() {
        let config = TelemetryConfig::default();

        assert_eq!(config.otlp_endpoint, "http://localhost:4317");
        assert_eq!(config.service_name, "forge-service");
        assert_eq!(config.service_version, "0.1.0");
        assert_eq!(config.environment, "development");
        assert!(config.enable_traces);
        assert!(config.enable_metrics);
        assert!(config.enable_logs);
        assert!((config.sampling_ratio - 1.0).abs() < f64::EPSILON);
    }

    /// `new` changes only the service name; everything else stays at default.
    #[test]
    fn test_new_keeps_other_defaults() {
        let config = TelemetryConfig::new("custom");
        let defaults = TelemetryConfig::default();

        assert_eq!(config.service_name, "custom");
        assert_eq!(config.otlp_endpoint, defaults.otlp_endpoint);
        assert_eq!(config.service_version, defaults.service_version);
        assert_eq!(config.environment, defaults.environment);
    }

    /// Out-of-range sampling ratios are clamped into `[0.0, 1.0]`.
    #[test]
    fn test_sampling_ratio_is_clamped() {
        let too_high = TelemetryConfig::default().with_sampling_ratio(1.5);
        let too_low = TelemetryConfig::default().with_sampling_ratio(-0.5);
        let in_range = TelemetryConfig::default().with_sampling_ratio(0.25);

        assert!((too_high.sampling_ratio - 1.0).abs() < f64::EPSILON);
        assert!(too_low.sampling_ratio.abs() < f64::EPSILON);
        assert!((in_range.sampling_ratio - 0.25).abs() < f64::EPSILON);
    }
}