Skip to main content

forge_runtime/observability/
telemetry.rs

1use opentelemetry::{KeyValue, global, trace::TracerProvider as _};
2use opentelemetry_otlp::{LogExporter, MetricExporter, SpanExporter, WithExportConfig};
3use opentelemetry_sdk::{
4    Resource,
5    logs::LoggerProvider,
6    metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
7    propagation::TraceContextPropagator,
8    resource::{EnvResourceDetector, SdkProvidedResourceDetector},
9    trace::{RandomIdGenerator, Sampler, TracerProvider},
10};
11use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};
12
/// Resource attribute key for the deployment environment.
///
/// Defined locally instead of being imported alongside `SERVICE_NAME` /
/// `SERVICE_VERSION` from `opentelemetry_semantic_conventions::resource`.
const DEPLOYMENT_ENVIRONMENT_NAME: &str = "deployment.environment.name";
14use std::sync::OnceLock;
15use thiserror::Error;
16use tracing_opentelemetry::OpenTelemetryLayer;
17use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, util::SubscriberInitExt};
18
19static TRACER_PROVIDER: OnceLock<TracerProvider> = OnceLock::new();
20static METER_PROVIDER: OnceLock<SdkMeterProvider> = OnceLock::new();
21static LOGGER_PROVIDER: OnceLock<LoggerProvider> = OnceLock::new();
22
23#[derive(Debug, Error)]
24pub enum TelemetryError {
25    #[error("failed to initialize tracer: {0}")]
26    TracerInit(String),
27    #[error("failed to initialize meter: {0}")]
28    MeterInit(String),
29    #[error("failed to initialize logger: {0}")]
30    LoggerInit(String),
31    #[error("telemetry already initialized")]
32    AlreadyInitialized,
33    #[error("tracing subscriber init failed: {0}")]
34    SubscriberInit(String),
35}
36
/// Settings for telemetry initialization: where to export, how the service
/// identifies itself, and which signals are enabled.
#[derive(Debug, Clone, PartialEq)]
pub struct TelemetryConfig {
    /// OTLP endpoint shared by the trace, metric and log exporters
    /// (e.g. `http://localhost:4317`).
    pub otlp_endpoint: String,
    /// Reported as the `service.name` resource attribute; also used as the
    /// tracer name.
    pub service_name: String,
    /// Reported as the `service.version` resource attribute.
    pub service_version: String,
    /// Reported as the `deployment.environment.name` resource attribute.
    pub environment: String,
    /// Whether spans are exported and `tracing` spans are bridged to OpenTelemetry.
    pub enable_traces: bool,
    /// Whether a meter provider is created and registered globally.
    pub enable_metrics: bool,
    /// Whether a logger provider is created.
    pub enable_logs: bool,
    /// Fraction of traces to sample; expected to lie in `0.0..=1.0`.
    pub sampling_ratio: f64,
}
48
49impl Default for TelemetryConfig {
50    fn default() -> Self {
51        Self {
52            otlp_endpoint: "http://localhost:4317".to_string(),
53            service_name: "forge-service".to_string(),
54            service_version: "0.1.0".to_string(),
55            environment: "development".to_string(),
56            enable_traces: true,
57            enable_metrics: true,
58            enable_logs: true,
59            sampling_ratio: 1.0,
60        }
61    }
62}
63
64impl TelemetryConfig {
65    pub fn new(service_name: impl Into<String>) -> Self {
66        Self {
67            service_name: service_name.into(),
68            ..Default::default()
69        }
70    }
71
72    /// Create config from ForgeConfig's observability settings.
73    pub fn from_observability_config(
74        obs: &forge_core::config::ObservabilityConfig,
75        project_name: &str,
76        project_version: &str,
77    ) -> Self {
78        Self {
79            otlp_endpoint: obs.otlp_endpoint.clone(),
80            service_name: obs
81                .service_name
82                .clone()
83                .unwrap_or_else(|| project_name.to_string()),
84            service_version: project_version.to_string(),
85            environment: "production".to_string(),
86            enable_traces: obs.enable_traces,
87            enable_metrics: obs.enable_metrics,
88            enable_logs: obs.enable_logs,
89            sampling_ratio: obs.sampling_ratio,
90        }
91    }
92
93    pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
94        self.otlp_endpoint = endpoint.into();
95        self
96    }
97
98    pub fn with_version(mut self, version: impl Into<String>) -> Self {
99        self.service_version = version.into();
100        self
101    }
102
103    pub fn with_environment(mut self, env: impl Into<String>) -> Self {
104        self.environment = env.into();
105        self
106    }
107
108    pub fn with_traces(mut self, enabled: bool) -> Self {
109        self.enable_traces = enabled;
110        self
111    }
112
113    pub fn with_metrics(mut self, enabled: bool) -> Self {
114        self.enable_metrics = enabled;
115        self
116    }
117
118    pub fn with_logs(mut self, enabled: bool) -> Self {
119        self.enable_logs = enabled;
120        self
121    }
122
123    pub fn with_sampling_ratio(mut self, ratio: f64) -> Self {
124        self.sampling_ratio = ratio.clamp(0.0, 1.0);
125        self
126    }
127}
128
129fn build_resource(config: &TelemetryConfig) -> Resource {
130    let base = Resource::from_detectors(
131        std::time::Duration::from_secs(5),
132        vec![
133            Box::new(SdkProvidedResourceDetector),
134            Box::new(EnvResourceDetector::new()),
135        ],
136    );
137
138    let custom = Resource::new(vec![
139        KeyValue::new(SERVICE_NAME, config.service_name.clone()),
140        KeyValue::new(SERVICE_VERSION, config.service_version.clone()),
141        KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, config.environment.clone()),
142    ]);
143
144    base.merge(&custom)
145}
146
147fn init_tracer(config: &TelemetryConfig) -> Result<TracerProvider, TelemetryError> {
148    let exporter = SpanExporter::builder()
149        .with_tonic()
150        .with_endpoint(&config.otlp_endpoint)
151        .build()
152        .map_err(|e| TelemetryError::TracerInit(e.to_string()))?;
153
154    let sampler = if config.sampling_ratio >= 1.0 {
155        Sampler::AlwaysOn
156    } else if config.sampling_ratio <= 0.0 {
157        Sampler::AlwaysOff
158    } else {
159        Sampler::TraceIdRatioBased(config.sampling_ratio)
160    };
161
162    let provider = TracerProvider::builder()
163        .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
164        .with_sampler(sampler)
165        .with_id_generator(RandomIdGenerator::default())
166        .with_resource(build_resource(config))
167        .build();
168
169    Ok(provider)
170}
171
172fn init_meter(config: &TelemetryConfig) -> Result<SdkMeterProvider, TelemetryError> {
173    let exporter = MetricExporter::builder()
174        .with_tonic()
175        .with_endpoint(&config.otlp_endpoint)
176        .build()
177        .map_err(|e| TelemetryError::MeterInit(e.to_string()))?;
178
179    let reader = PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio).build();
180
181    let provider = MeterProviderBuilder::default()
182        .with_reader(reader)
183        .with_resource(build_resource(config))
184        .build();
185
186    Ok(provider)
187}
188
189fn init_logger(config: &TelemetryConfig) -> Result<LoggerProvider, TelemetryError> {
190    let exporter = LogExporter::builder()
191        .with_tonic()
192        .with_endpoint(&config.otlp_endpoint)
193        .build()
194        .map_err(|e| TelemetryError::LoggerInit(e.to_string()))?;
195
196    let provider = LoggerProvider::builder()
197        .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
198        .with_resource(build_resource(config))
199        .build();
200
201    Ok(provider)
202}
203
204pub fn init_telemetry(config: &TelemetryConfig) -> Result<(), TelemetryError> {
205    global::set_text_map_propagator(TraceContextPropagator::new());
206
207    let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
208
209    let registry = Registry::default().with(env_filter);
210
211    if config.enable_traces {
212        let tracer_provider = init_tracer(config)?;
213        let tracer = tracer_provider.tracer(config.service_name.clone());
214
215        TRACER_PROVIDER
216            .set(tracer_provider.clone())
217            .map_err(|_| TelemetryError::AlreadyInitialized)?;
218
219        global::set_tracer_provider(tracer_provider);
220
221        let otel_layer = OpenTelemetryLayer::new(tracer);
222
223        let fmt_layer = tracing_subscriber::fmt::layer()
224            .with_target(true)
225            .with_thread_ids(false)
226            .with_file(false)
227            .with_line_number(false);
228
229        registry
230            .with(otel_layer)
231            .with(fmt_layer)
232            .try_init()
233            .map_err(|e| TelemetryError::SubscriberInit(e.to_string()))?;
234    } else {
235        let fmt_layer = tracing_subscriber::fmt::layer()
236            .with_target(true)
237            .with_thread_ids(false)
238            .with_file(false)
239            .with_line_number(false);
240
241        registry
242            .with(fmt_layer)
243            .try_init()
244            .map_err(|e| TelemetryError::SubscriberInit(e.to_string()))?;
245    }
246
247    if config.enable_metrics {
248        let meter_provider = init_meter(config)?;
249
250        METER_PROVIDER
251            .set(meter_provider.clone())
252            .map_err(|_| TelemetryError::AlreadyInitialized)?;
253
254        global::set_meter_provider(meter_provider);
255    }
256
257    if config.enable_logs {
258        let logger_provider = init_logger(config)?;
259
260        LOGGER_PROVIDER
261            .set(logger_provider)
262            .map_err(|_| TelemetryError::AlreadyInitialized)?;
263    }
264
265    tracing::info!(
266        service = %config.service_name,
267        version = %config.service_version,
268        environment = %config.environment,
269        traces = config.enable_traces,
270        metrics = config.enable_metrics,
271        logs = config.enable_logs,
272        "telemetry initialized"
273    );
274
275    Ok(())
276}
277
278pub fn shutdown_telemetry() {
279    tracing::info!("shutting down telemetry");
280
281    if let Some(provider) = TRACER_PROVIDER.get()
282        && let Err(e) = provider.shutdown()
283    {
284        tracing::warn!(error = %e, "failed to shutdown tracer provider");
285    }
286
287    if let Some(provider) = METER_PROVIDER.get()
288        && let Err(e) = provider.shutdown()
289    {
290        tracing::warn!(error = %e, "failed to shutdown meter provider");
291    }
292
293    if let Some(provider) = LOGGER_PROVIDER.get()
294        && let Err(e) = provider.shutdown()
295    {
296        tracing::warn!(error = %e, "failed to shutdown logger provider");
297    }
298}
299
#[cfg(test)]
mod tests {
    use super::*;

    /// Every builder method writes its own field.
    #[test]
    fn test_config_builder() {
        let config = TelemetryConfig::new("test-service")
            .with_endpoint("http://otel:4317")
            .with_version("1.0.0")
            .with_environment("production")
            .with_traces(true)
            .with_metrics(false)
            .with_logs(true);

        assert_eq!(config.service_name, "test-service");
        assert_eq!(config.otlp_endpoint, "http://otel:4317");
        assert_eq!(config.service_version, "1.0.0");
        assert_eq!(config.environment, "production");
        assert!(config.enable_traces);
        assert!(!config.enable_metrics);
        assert!(config.enable_logs);
    }

    /// `Default` yields the documented local-development settings.
    #[test]
    fn test_default_config() {
        let config = TelemetryConfig::default();

        assert_eq!(config.otlp_endpoint, "http://localhost:4317");
        assert_eq!(config.service_name, "forge-service");
        assert_eq!(config.service_version, "0.1.0");
        assert_eq!(config.environment, "development");
        assert!(config.enable_traces);
        assert!(config.enable_metrics);
        assert!(config.enable_logs);
        assert_eq!(config.sampling_ratio, 1.0);
    }

    /// `with_sampling_ratio` clamps out-of-range values into `0.0..=1.0` and
    /// keeps in-range values unchanged.
    #[test]
    fn test_sampling_ratio_is_clamped() {
        let ratio = |r: f64| TelemetryConfig::new("s").with_sampling_ratio(r).sampling_ratio;

        assert_eq!(ratio(1.5), 1.0);
        assert_eq!(ratio(-0.5), 0.0);
        assert_eq!(ratio(0.25), 0.25);
        assert_eq!(ratio(0.0), 0.0);
        assert_eq!(ratio(1.0), 1.0);
    }
}