// forge_runtime/observability/telemetry.rs

1use opentelemetry::{KeyValue, global, trace::TracerProvider as _};
2use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
3use opentelemetry_otlp::{LogExporter, MetricExporter, SpanExporter, WithExportConfig};
4use opentelemetry_sdk::{
5    Resource,
6    logs::LoggerProvider,
7    metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
8    propagation::TraceContextPropagator,
9    resource::{EnvResourceDetector, SdkProvidedResourceDetector},
10    trace::{RandomIdGenerator, Sampler, TracerProvider},
11};
12use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};
13
/// Resource attribute key under which `build_resource` records the
/// configured deployment environment.
const DEPLOYMENT_ENVIRONMENT_NAME: &str = "deployment.environment.name";
15use std::sync::OnceLock;
16use thiserror::Error;
17use tracing_opentelemetry::OpenTelemetryLayer;
18use tracing_subscriber::{
19    EnvFilter, Layer, Registry,
20    filter::{FilterExt, FilterFn},
21    layer::SubscriberExt,
22    util::SubscriberInitExt,
23};
24
25static TRACER_PROVIDER: OnceLock<TracerProvider> = OnceLock::new();
26static METER_PROVIDER: OnceLock<SdkMeterProvider> = OnceLock::new();
27static LOGGER_PROVIDER: OnceLock<LoggerProvider> = OnceLock::new();
28
29#[derive(Debug, Error)]
30pub enum TelemetryError {
31    #[error("failed to initialize tracer: {0}")]
32    TracerInit(String),
33    #[error("failed to initialize meter: {0}")]
34    MeterInit(String),
35    #[error("failed to initialize logger: {0}")]
36    LoggerInit(String),
37    #[error("telemetry already initialized")]
38    AlreadyInitialized,
39    #[error("tracing subscriber init failed: {0}")]
40    SubscriberInit(String),
41}
42
/// Settings that drive `init_telemetry`.
#[derive(Debug, Clone)]
pub struct TelemetryConfig {
    /// Endpoint handed to the OTLP/HTTP span, metric and log exporters.
    pub otlp_endpoint: String,

    /// Reported as the `service.name` resource attribute and used as the
    /// tracer name.
    pub service_name: String,

    /// Reported as the `service.version` resource attribute.
    pub service_version: String,

    /// Reported as the `deployment.environment.name` resource attribute.
    pub environment: String,

    /// Export spans over OTLP when `true`.
    pub enable_traces: bool,

    /// Export metrics over OTLP when `true`.
    pub enable_metrics: bool,

    /// Bridge `tracing` events to OTLP logs when `true`.
    pub enable_logs: bool,

    /// Trace sampling ratio: `>= 1.0` samples everything, `<= 0.0` samples
    /// nothing, anything in between is trace-id ratio based.
    pub sampling_ratio: f64,
}
54
55impl Default for TelemetryConfig {
56    fn default() -> Self {
57        Self {
58            otlp_endpoint: "http://localhost:4318".to_string(),
59            service_name: "forge-service".to_string(),
60            service_version: "0.1.0".to_string(),
61            environment: "development".to_string(),
62            enable_traces: true,
63            enable_metrics: true,
64            enable_logs: true,
65            sampling_ratio: 1.0,
66        }
67    }
68}
69
70impl TelemetryConfig {
71    pub fn new(service_name: impl Into<String>) -> Self {
72        Self {
73            service_name: service_name.into(),
74            ..Default::default()
75        }
76    }
77
78    /// Create config from ForgeConfig's observability settings.
79    pub fn from_observability_config(
80        obs: &forge_core::config::ObservabilityConfig,
81        project_name: &str,
82        project_version: &str,
83    ) -> Self {
84        // When observability is disabled, turn off OTLP export but keep the
85        // fmt subscriber so console logs still work.
86        let otlp_enabled = obs.enabled;
87        Self {
88            otlp_endpoint: obs.otlp_endpoint.clone(),
89            service_name: obs
90                .service_name
91                .clone()
92                .unwrap_or_else(|| project_name.to_string()),
93            service_version: project_version.to_string(),
94            environment: "production".to_string(),
95            enable_traces: otlp_enabled && obs.enable_traces,
96            enable_metrics: otlp_enabled && obs.enable_metrics,
97            enable_logs: otlp_enabled && obs.enable_logs,
98            sampling_ratio: obs.sampling_ratio,
99        }
100    }
101
102    pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
103        self.otlp_endpoint = endpoint.into();
104        self
105    }
106
107    pub fn with_version(mut self, version: impl Into<String>) -> Self {
108        self.service_version = version.into();
109        self
110    }
111
112    pub fn with_environment(mut self, env: impl Into<String>) -> Self {
113        self.environment = env.into();
114        self
115    }
116
117    pub fn with_traces(mut self, enabled: bool) -> Self {
118        self.enable_traces = enabled;
119        self
120    }
121
122    pub fn with_metrics(mut self, enabled: bool) -> Self {
123        self.enable_metrics = enabled;
124        self
125    }
126
127    pub fn with_logs(mut self, enabled: bool) -> Self {
128        self.enable_logs = enabled;
129        self
130    }
131
132    pub fn with_sampling_ratio(mut self, ratio: f64) -> Self {
133        self.sampling_ratio = ratio.clamp(0.0, 1.0);
134        self
135    }
136}
137
138fn build_resource(config: &TelemetryConfig) -> Resource {
139    let base = Resource::from_detectors(
140        std::time::Duration::from_secs(5),
141        vec![
142            Box::new(SdkProvidedResourceDetector),
143            Box::new(EnvResourceDetector::new()),
144        ],
145    );
146
147    let custom = Resource::new(vec![
148        KeyValue::new(SERVICE_NAME, config.service_name.clone()),
149        KeyValue::new(SERVICE_VERSION, config.service_version.clone()),
150        KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, config.environment.clone()),
151    ]);
152
153    base.merge(&custom)
154}
155
156fn init_tracer(config: &TelemetryConfig) -> Result<TracerProvider, TelemetryError> {
157    let exporter = SpanExporter::builder()
158        .with_http()
159        .with_endpoint(&config.otlp_endpoint)
160        .build()
161        .map_err(|e| TelemetryError::TracerInit(e.to_string()))?;
162
163    let sampler = if config.sampling_ratio >= 1.0 {
164        Sampler::AlwaysOn
165    } else if config.sampling_ratio <= 0.0 {
166        Sampler::AlwaysOff
167    } else {
168        Sampler::TraceIdRatioBased(config.sampling_ratio)
169    };
170
171    let provider = TracerProvider::builder()
172        .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
173        .with_sampler(sampler)
174        .with_id_generator(RandomIdGenerator::default())
175        .with_resource(build_resource(config))
176        .build();
177
178    Ok(provider)
179}
180
181fn init_meter(config: &TelemetryConfig) -> Result<SdkMeterProvider, TelemetryError> {
182    let exporter = MetricExporter::builder()
183        .with_http()
184        .with_endpoint(&config.otlp_endpoint)
185        .build()
186        .map_err(|e| TelemetryError::MeterInit(e.to_string()))?;
187
188    let reader = PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio).build();
189
190    let provider = MeterProviderBuilder::default()
191        .with_reader(reader)
192        .with_resource(build_resource(config))
193        .build();
194
195    Ok(provider)
196}
197
198fn init_logger(config: &TelemetryConfig) -> Result<LoggerProvider, TelemetryError> {
199    let exporter = LogExporter::builder()
200        .with_http()
201        .with_endpoint(&config.otlp_endpoint)
202        .build()
203        .map_err(|e| TelemetryError::LoggerInit(e.to_string()))?;
204
205    let provider = LoggerProvider::builder()
206        .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
207        .with_resource(build_resource(config))
208        .build();
209
210    Ok(provider)
211}
212
213/// Build an `EnvFilter` for console output.
214///
215/// Ensures the user's crate is visible at the configured level even when
216/// `RUST_LOG` is set to something restrictive (e.g. `warn`).
217pub fn build_env_filter(project_name: &str, log_level: &str) -> EnvFilter {
218    build_console_filter(project_name, log_level)
219}
220
221fn build_console_filter(project_name: &str, log_level: &str) -> EnvFilter {
222    let crate_name = project_name.replace('-', "_");
223
224    let base = if let Ok(filter) = EnvFilter::try_from_default_env() {
225        filter
226    } else {
227        EnvFilter::new(log_level)
228    };
229
230    let directive = format!("{}={}", crate_name, log_level);
231    match directive.parse() {
232        Ok(d) => base.add_directive(d),
233        Err(_) => base,
234    }
235}
236
237/// Set up tracing so logs work without any user boilerplate.
238/// Returns `Ok(false)` if a subscriber already exists (user configured their own).
239pub fn init_telemetry(
240    config: &TelemetryConfig,
241    project_name: &str,
242    log_level: &str,
243) -> Result<bool, TelemetryError> {
244    global::set_text_map_propagator(TraceContextPropagator::new());
245
246    // Per-layer filters avoid the global EnvFilter + per-layer filter conflict
247    // that causes the OTel log bridge to silently drop events.
248    let fmt_layer = tracing_subscriber::fmt::layer()
249        .with_target(true)
250        .with_thread_ids(false)
251        .with_file(false)
252        .with_line_number(false)
253        .with_filter(build_console_filter(project_name, log_level));
254
255    // Build optional trace layer (includes sqlx debug for DB-level spans)
256    let otel_trace_layer = if config.enable_traces {
257        let tracer_provider = init_tracer(config)?;
258        let tracer = tracer_provider.tracer(config.service_name.clone());
259
260        TRACER_PROVIDER
261            .set(tracer_provider.clone())
262            .map_err(|_| TelemetryError::AlreadyInitialized)?;
263
264        global::set_tracer_provider(tracer_provider);
265
266        Some(
267            OpenTelemetryLayer::new(tracer)
268                .with_filter(build_console_filter(project_name, log_level)),
269        )
270    } else {
271        None
272    };
273
274    // Build optional log bridge layer.
275    // Uses the OTel filter (includes sqlx debug) plus an anti-recursion guard
276    // that blocks events from the OTLP transport stack (hyper, reqwest, h2,
277    // etc.) to prevent infinite feedback through the HTTP exporter.
278    let otel_log_layer = if config.enable_logs {
279        let logger_provider = init_logger(config)?;
280
281        let env_filter = build_console_filter(project_name, log_level);
282        let log_layer = OpenTelemetryTracingBridge::new(&logger_provider).with_filter(
283            env_filter.and(FilterFn::new(|metadata| {
284                let target = metadata.target();
285                !target.starts_with("hyper")
286                    && !target.starts_with("reqwest")
287                    && !target.starts_with("h2")
288                    && !target.starts_with("tonic")
289                    && !target.starts_with("tower")
290                    && !target.starts_with("opentelemetry")
291            })),
292        );
293
294        LOGGER_PROVIDER
295            .set(logger_provider)
296            .map_err(|_| TelemetryError::AlreadyInitialized)?;
297
298        Some(log_layer)
299    } else {
300        None
301    };
302
303    // No global filter: each layer carries its own per-layer filter.
304    if Registry::default()
305        .with(fmt_layer)
306        .with(otel_trace_layer)
307        .with(otel_log_layer)
308        .try_init()
309        .is_err()
310    {
311        return Ok(false);
312    }
313
314    if config.enable_metrics {
315        let meter_provider = init_meter(config)?;
316
317        METER_PROVIDER
318            .set(meter_provider.clone())
319            .map_err(|_| TelemetryError::AlreadyInitialized)?;
320
321        global::set_meter_provider(meter_provider);
322    }
323
324    tracing::info!(
325        service = %config.service_name,
326        version = %config.service_version,
327        environment = %config.environment,
328        traces = config.enable_traces,
329        metrics = config.enable_metrics,
330        logs = config.enable_logs,
331        "telemetry initialized"
332    );
333
334    Ok(true)
335}
336
337pub fn shutdown_telemetry() {
338    tracing::info!("shutting down telemetry");
339
340    if let Some(provider) = TRACER_PROVIDER.get()
341        && let Err(e) = provider.shutdown()
342    {
343        tracing::warn!(error = %e, "failed to shutdown tracer provider");
344    }
345
346    if let Some(provider) = METER_PROVIDER.get()
347        && let Err(e) = provider.shutdown()
348    {
349        tracing::warn!(error = %e, "failed to shutdown meter provider");
350    }
351
352    if let Some(provider) = LOGGER_PROVIDER.get()
353        && let Err(e) = provider.shutdown()
354    {
355        tracing::warn!(error = %e, "failed to shutdown logger provider");
356    }
357}
358
#[cfg(test)]
mod tests {
    use super::*;

    /// Every builder method should overwrite exactly the field it names.
    #[test]
    fn test_config_builder() {
        let built = TelemetryConfig::new("test-service")
            .with_endpoint("http://otel:4318")
            .with_version("1.0.0")
            .with_environment("production")
            .with_traces(true)
            .with_metrics(false)
            .with_logs(true);

        let TelemetryConfig {
            otlp_endpoint,
            service_name,
            service_version,
            environment,
            enable_traces,
            enable_metrics,
            enable_logs,
            ..
        } = built;

        assert_eq!(service_name, "test-service");
        assert_eq!(otlp_endpoint, "http://otel:4318");
        assert_eq!(service_version, "1.0.0");
        assert_eq!(environment, "production");
        assert!(enable_traces, "traces were explicitly enabled");
        assert!(!enable_metrics, "metrics were explicitly disabled");
        assert!(enable_logs, "logs were explicitly enabled");
    }

    /// The default config targets a local collector with all signals on.
    #[test]
    fn test_default_config() {
        let defaults = TelemetryConfig::default();

        assert_eq!(defaults.otlp_endpoint, "http://localhost:4318");
        assert_eq!(defaults.service_name, "forge-service");
        assert_eq!(defaults.environment, "development");

        let signals = [
            defaults.enable_traces,
            defaults.enable_metrics,
            defaults.enable_logs,
        ];
        for enabled in signals {
            assert!(enabled);
        }
    }
}