Skip to main content

forge_runtime/observability/
telemetry.rs

1use opentelemetry::{KeyValue, global, trace::TracerProvider as _};
2use opentelemetry_otlp::{LogExporter, MetricExporter, SpanExporter, WithExportConfig};
3use opentelemetry_sdk::{
4    Resource,
5    logs::LoggerProvider,
6    metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
7    propagation::TraceContextPropagator,
8    resource::{EnvResourceDetector, SdkProvidedResourceDetector},
9    trace::{RandomIdGenerator, Sampler, TracerProvider},
10};
11use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};
12
13const DEPLOYMENT_ENVIRONMENT_NAME: &str = "deployment.environment.name";
14use std::sync::OnceLock;
15use thiserror::Error;
16use tracing_opentelemetry::OpenTelemetryLayer;
17use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, util::SubscriberInitExt};
18
19static TRACER_PROVIDER: OnceLock<TracerProvider> = OnceLock::new();
20static METER_PROVIDER: OnceLock<SdkMeterProvider> = OnceLock::new();
21static LOGGER_PROVIDER: OnceLock<LoggerProvider> = OnceLock::new();
22
23#[derive(Debug, Error)]
24pub enum TelemetryError {
25    #[error("failed to initialize tracer: {0}")]
26    TracerInit(String),
27    #[error("failed to initialize meter: {0}")]
28    MeterInit(String),
29    #[error("failed to initialize logger: {0}")]
30    LoggerInit(String),
31    #[error("telemetry already initialized")]
32    AlreadyInitialized,
33    #[error("tracing subscriber init failed: {0}")]
34    SubscriberInit(String),
35}
36
/// Settings consumed by `init_telemetry`: where OTLP data is sent, how the service
/// identifies itself, and which signals are enabled.
#[derive(Debug, Clone)]
pub struct TelemetryConfig {
    /// OTLP collector endpoint shared by the span, metric and log exporters (gRPC via tonic).
    pub otlp_endpoint: String,
    /// Reported as the `service.name` resource attribute and used as the tracer name.
    pub service_name: String,
    /// Reported as the `service.version` resource attribute.
    pub service_version: String,
    /// Reported as the `deployment.environment.name` resource attribute.
    pub environment: String,
    /// Whether to install the OTLP tracer provider and the OpenTelemetry tracing layer.
    pub enable_traces: bool,
    /// Whether to install the OTLP meter provider.
    pub enable_metrics: bool,
    /// Whether to create the OTLP logger provider.
    pub enable_logs: bool,
    /// Trace sampling ratio interpreted by `init_tracer`; `with_sampling_ratio` keeps it
    /// within `[0.0, 1.0]`.
    pub sampling_ratio: f64,
}

impl Default for TelemetryConfig {
    /// Local-development defaults: a collector on `localhost:4317`, the
    /// `development` environment, every signal enabled and a sampling ratio of 1.0.
    fn default() -> Self {
        Self {
            otlp_endpoint: String::from("http://localhost:4317"),
            service_name: String::from("forge-service"),
            service_version: String::from("0.1.0"),
            environment: String::from("development"),
            enable_traces: true,
            enable_metrics: true,
            enable_logs: true,
            sampling_ratio: 1.0,
        }
    }
}
63
64impl TelemetryConfig {
65    pub fn new(service_name: impl Into<String>) -> Self {
66        Self {
67            service_name: service_name.into(),
68            ..Default::default()
69        }
70    }
71
72    /// Create config from ForgeConfig's observability settings.
73    pub fn from_observability_config(
74        obs: &forge_core::config::ObservabilityConfig,
75        project_name: &str,
76        project_version: &str,
77    ) -> Self {
78        Self {
79            otlp_endpoint: obs.otlp_endpoint.clone(),
80            service_name: obs
81                .service_name
82                .clone()
83                .unwrap_or_else(|| project_name.to_string()),
84            service_version: project_version.to_string(),
85            environment: "production".to_string(),
86            enable_traces: obs.enable_traces,
87            enable_metrics: obs.enable_metrics,
88            enable_logs: obs.enable_logs,
89            sampling_ratio: obs.sampling_ratio,
90        }
91    }
92
93    pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
94        self.otlp_endpoint = endpoint.into();
95        self
96    }
97
98    pub fn with_version(mut self, version: impl Into<String>) -> Self {
99        self.service_version = version.into();
100        self
101    }
102
103    pub fn with_environment(mut self, env: impl Into<String>) -> Self {
104        self.environment = env.into();
105        self
106    }
107
108    pub fn with_traces(mut self, enabled: bool) -> Self {
109        self.enable_traces = enabled;
110        self
111    }
112
113    pub fn with_metrics(mut self, enabled: bool) -> Self {
114        self.enable_metrics = enabled;
115        self
116    }
117
118    pub fn with_logs(mut self, enabled: bool) -> Self {
119        self.enable_logs = enabled;
120        self
121    }
122
123    pub fn with_sampling_ratio(mut self, ratio: f64) -> Self {
124        self.sampling_ratio = ratio.clamp(0.0, 1.0);
125        self
126    }
127}
128
129fn build_resource(config: &TelemetryConfig) -> Resource {
130    let base = Resource::from_detectors(
131        std::time::Duration::from_secs(5),
132        vec![
133            Box::new(SdkProvidedResourceDetector),
134            Box::new(EnvResourceDetector::new()),
135        ],
136    );
137
138    let custom = Resource::new(vec![
139        KeyValue::new(SERVICE_NAME, config.service_name.clone()),
140        KeyValue::new(SERVICE_VERSION, config.service_version.clone()),
141        KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, config.environment.clone()),
142    ]);
143
144    base.merge(&custom)
145}
146
147fn init_tracer(config: &TelemetryConfig) -> Result<TracerProvider, TelemetryError> {
148    let exporter = SpanExporter::builder()
149        .with_tonic()
150        .with_endpoint(&config.otlp_endpoint)
151        .build()
152        .map_err(|e| TelemetryError::TracerInit(e.to_string()))?;
153
154    let sampler = if config.sampling_ratio >= 1.0 {
155        Sampler::AlwaysOn
156    } else if config.sampling_ratio <= 0.0 {
157        Sampler::AlwaysOff
158    } else {
159        Sampler::TraceIdRatioBased(config.sampling_ratio)
160    };
161
162    let provider = TracerProvider::builder()
163        .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
164        .with_sampler(sampler)
165        .with_id_generator(RandomIdGenerator::default())
166        .with_resource(build_resource(config))
167        .build();
168
169    Ok(provider)
170}
171
172fn init_meter(config: &TelemetryConfig) -> Result<SdkMeterProvider, TelemetryError> {
173    let exporter = MetricExporter::builder()
174        .with_tonic()
175        .with_endpoint(&config.otlp_endpoint)
176        .build()
177        .map_err(|e| TelemetryError::MeterInit(e.to_string()))?;
178
179    let reader = PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio).build();
180
181    let provider = MeterProviderBuilder::default()
182        .with_reader(reader)
183        .with_resource(build_resource(config))
184        .build();
185
186    Ok(provider)
187}
188
189fn init_logger(config: &TelemetryConfig) -> Result<LoggerProvider, TelemetryError> {
190    let exporter = LogExporter::builder()
191        .with_tonic()
192        .with_endpoint(&config.otlp_endpoint)
193        .build()
194        .map_err(|e| TelemetryError::LoggerInit(e.to_string()))?;
195
196    let provider = LoggerProvider::builder()
197        .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
198        .with_resource(build_resource(config))
199        .build();
200
201    Ok(provider)
202}
203
204/// `forge dev` sets RUST_LOG=warn,forge=info which silences user crate logs.
205/// This ensures the user's crate is always visible at the configured level.
206pub fn build_env_filter(project_name: &str, log_level: &str) -> EnvFilter {
207    let crate_name = project_name.replace('-', "_");
208
209    let base = if let Ok(filter) = EnvFilter::try_from_default_env() {
210        filter
211    } else {
212        EnvFilter::new(log_level)
213    };
214
215    // Always ensure the user crate is visible at the configured level
216    let directive = format!("{}={}", crate_name, log_level);
217    match directive.parse() {
218        Ok(d) => base.add_directive(d),
219        Err(_) => base,
220    }
221}
222
223/// Set up tracing so logs work without any user boilerplate.
224/// Returns `Ok(false)` if a subscriber already exists (user configured their own).
225pub fn init_telemetry(
226    config: &TelemetryConfig,
227    project_name: &str,
228    log_level: &str,
229) -> Result<bool, TelemetryError> {
230    global::set_text_map_propagator(TraceContextPropagator::new());
231
232    let env_filter = build_env_filter(project_name, log_level);
233
234    let registry = Registry::default().with(env_filter);
235
236    if config.enable_traces {
237        let tracer_provider = init_tracer(config)?;
238        let tracer = tracer_provider.tracer(config.service_name.clone());
239
240        TRACER_PROVIDER
241            .set(tracer_provider.clone())
242            .map_err(|_| TelemetryError::AlreadyInitialized)?;
243
244        global::set_tracer_provider(tracer_provider);
245
246        let otel_layer = OpenTelemetryLayer::new(tracer);
247        let fmt_layer = tracing_subscriber::fmt::layer()
248            .with_target(true)
249            .with_thread_ids(false)
250            .with_file(false)
251            .with_line_number(false);
252
253        if registry
254            .with(otel_layer)
255            .with(fmt_layer)
256            .try_init()
257            .is_err()
258        {
259            return Ok(false);
260        }
261    } else {
262        let fmt_layer = tracing_subscriber::fmt::layer()
263            .with_target(true)
264            .with_thread_ids(false)
265            .with_file(false)
266            .with_line_number(false);
267
268        if registry.with(fmt_layer).try_init().is_err() {
269            return Ok(false);
270        }
271    }
272
273    if config.enable_metrics {
274        let meter_provider = init_meter(config)?;
275
276        METER_PROVIDER
277            .set(meter_provider.clone())
278            .map_err(|_| TelemetryError::AlreadyInitialized)?;
279
280        global::set_meter_provider(meter_provider);
281    }
282
283    if config.enable_logs {
284        let logger_provider = init_logger(config)?;
285
286        LOGGER_PROVIDER
287            .set(logger_provider)
288            .map_err(|_| TelemetryError::AlreadyInitialized)?;
289    }
290
291    tracing::info!(
292        service = %config.service_name,
293        version = %config.service_version,
294        environment = %config.environment,
295        traces = config.enable_traces,
296        metrics = config.enable_metrics,
297        logs = config.enable_logs,
298        "telemetry initialized"
299    );
300
301    Ok(true)
302}
303
304pub fn shutdown_telemetry() {
305    tracing::info!("shutting down telemetry");
306
307    if let Some(provider) = TRACER_PROVIDER.get()
308        && let Err(e) = provider.shutdown()
309    {
310        tracing::warn!(error = %e, "failed to shutdown tracer provider");
311    }
312
313    if let Some(provider) = METER_PROVIDER.get()
314        && let Err(e) = provider.shutdown()
315    {
316        tracing::warn!(error = %e, "failed to shutdown meter provider");
317    }
318
319    if let Some(provider) = LOGGER_PROVIDER.get()
320        && let Err(e) = provider.shutdown()
321    {
322        tracing::warn!(error = %e, "failed to shutdown logger provider");
323    }
324}
325
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_config_builder() {
        let config = TelemetryConfig::new("test-service")
            .with_endpoint("http://otel:4317")
            .with_version("1.0.0")
            .with_environment("production")
            .with_traces(true)
            .with_metrics(false)
            .with_logs(true);

        assert_eq!(config.service_name, "test-service");
        assert_eq!(config.otlp_endpoint, "http://otel:4317");
        assert_eq!(config.service_version, "1.0.0");
        assert_eq!(config.environment, "production");
        assert!(config.enable_traces);
        assert!(!config.enable_metrics);
        assert!(config.enable_logs);
    }

    #[test]
    fn test_default_config() {
        let config = TelemetryConfig::default();

        assert_eq!(config.otlp_endpoint, "http://localhost:4317");
        assert_eq!(config.service_name, "forge-service");
        assert_eq!(config.service_version, "0.1.0");
        assert_eq!(config.environment, "development");
        assert!(config.enable_traces);
        assert!(config.enable_metrics);
        assert!(config.enable_logs);
        assert_eq!(config.sampling_ratio, 1.0);
    }

    /// `new` only overrides the service name; everything else matches `default()`.
    #[test]
    fn test_new_keeps_other_defaults() {
        let config = TelemetryConfig::new("svc");
        let defaults = TelemetryConfig::default();

        assert_eq!(config.service_name, "svc");
        assert_eq!(config.otlp_endpoint, defaults.otlp_endpoint);
        assert_eq!(config.service_version, defaults.service_version);
        assert_eq!(config.environment, defaults.environment);
        assert_eq!(config.sampling_ratio, defaults.sampling_ratio);
    }

    /// Out-of-range ratios are clamped to the nearest bound; in-range values pass through.
    #[test]
    fn test_sampling_ratio_is_clamped() {
        let ratio = |r: f64| TelemetryConfig::new("svc").with_sampling_ratio(r).sampling_ratio;

        assert_eq!(ratio(1.5), 1.0);
        assert_eq!(ratio(-0.5), 0.0);
        assert_eq!(ratio(0.25), 0.25);
        assert_eq!(ratio(0.0), 0.0);
        assert_eq!(ratio(1.0), 1.0);
    }
}