Skip to main content

forge_runtime/observability/
telemetry.rs

1use opentelemetry::{KeyValue, global, trace::TracerProvider as _};
2use opentelemetry_otlp::{LogExporter, MetricExporter, SpanExporter, WithExportConfig};
3use opentelemetry_sdk::{
4    Resource,
5    logs::LoggerProvider,
6    metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
7    propagation::TraceContextPropagator,
8    resource::{EnvResourceDetector, SdkProvidedResourceDetector},
9    trace::{RandomIdGenerator, Sampler, TracerProvider},
10};
11use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};
12
/// Resource attribute key for the deployment environment
/// (`deployment.environment.name` in the OpenTelemetry semantic conventions).
/// NOTE(review): defined locally instead of imported from
/// `opentelemetry_semantic_conventions`, presumably because the version or
/// feature set in use does not export it from `resource` — confirm.
const DEPLOYMENT_ENVIRONMENT_NAME: &str = "deployment.environment.name";
14use std::sync::OnceLock;
15use thiserror::Error;
16use tracing_opentelemetry::OpenTelemetryLayer;
17use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, util::SubscriberInitExt};
18
// Providers created by `init_telemetry`, kept so `shutdown_telemetry` can shut
// them down. Each slot can be filled only once per process; `init_telemetry`
// maps a second `set` to `TelemetryError::AlreadyInitialized`.
static TRACER_PROVIDER: OnceLock<TracerProvider> = OnceLock::new();
static METER_PROVIDER: OnceLock<SdkMeterProvider> = OnceLock::new();
static LOGGER_PROVIDER: OnceLock<LoggerProvider> = OnceLock::new();
22
/// Errors returned while setting up telemetry in this module.
#[derive(Debug, Error)]
pub enum TelemetryError {
    /// Building the OTLP span exporter failed; carries the exporter error text.
    #[error("failed to initialize tracer: {0}")]
    TracerInit(String),
    /// Building the OTLP metric exporter failed; carries the exporter error text.
    #[error("failed to initialize meter: {0}")]
    MeterInit(String),
    /// Building the OTLP log exporter failed; carries the exporter error text.
    #[error("failed to initialize logger: {0}")]
    LoggerInit(String),
    /// A provider slot was already filled by an earlier initialization.
    #[error("telemetry already initialized")]
    AlreadyInitialized,
    /// Installing the global `tracing` subscriber failed.
    /// Not produced by `init_telemetry`, which reports an already-installed
    /// subscriber as `Ok(false)` instead.
    #[error("tracing subscriber init failed: {0}")]
    SubscriberInit(String),
}
36
/// Settings consumed by [`init_telemetry`]: where OTLP data is sent, how the
/// service identifies itself, and which signals are exported.
#[derive(Debug, Clone)]
pub struct TelemetryConfig {
    /// OTLP/HTTP collector endpoint handed to the span, metric and log exporters.
    pub otlp_endpoint: String,
    /// Value of the `service.name` resource attribute.
    pub service_name: String,
    /// Value of the `service.version` resource attribute.
    pub service_version: String,
    /// Value of the `deployment.environment.name` resource attribute.
    pub environment: String,
    /// Export spans and attach the OpenTelemetry layer to the subscriber.
    pub enable_traces: bool,
    /// Create and register the meter provider.
    pub enable_metrics: bool,
    /// Create the logger provider.
    pub enable_logs: bool,
    /// Fraction of traces to sample; `>= 1.0` samples all, `<= 0.0` none.
    pub sampling_ratio: f64,
}

impl Default for TelemetryConfig {
    /// Local-development defaults: a collector on `localhost:4318`, every
    /// signal enabled, and every trace sampled.
    fn default() -> Self {
        Self {
            otlp_endpoint: String::from("http://localhost:4318"),
            service_name: String::from("forge-service"),
            service_version: String::from("0.1.0"),
            environment: String::from("development"),
            enable_traces: true,
            enable_metrics: true,
            enable_logs: true,
            sampling_ratio: 1.0,
        }
    }
}
63
64impl TelemetryConfig {
65    pub fn new(service_name: impl Into<String>) -> Self {
66        Self {
67            service_name: service_name.into(),
68            ..Default::default()
69        }
70    }
71
72    /// Create config from ForgeConfig's observability settings.
73    pub fn from_observability_config(
74        obs: &forge_core::config::ObservabilityConfig,
75        project_name: &str,
76        project_version: &str,
77    ) -> Self {
78        // When observability is disabled, turn off OTLP export but keep the
79        // fmt subscriber so console logs still work.
80        let otlp_enabled = obs.enabled;
81        Self {
82            otlp_endpoint: obs.otlp_endpoint.clone(),
83            service_name: obs
84                .service_name
85                .clone()
86                .unwrap_or_else(|| project_name.to_string()),
87            service_version: project_version.to_string(),
88            environment: "production".to_string(),
89            enable_traces: otlp_enabled && obs.enable_traces,
90            enable_metrics: otlp_enabled && obs.enable_metrics,
91            enable_logs: otlp_enabled && obs.enable_logs,
92            sampling_ratio: obs.sampling_ratio,
93        }
94    }
95
96    pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
97        self.otlp_endpoint = endpoint.into();
98        self
99    }
100
101    pub fn with_version(mut self, version: impl Into<String>) -> Self {
102        self.service_version = version.into();
103        self
104    }
105
106    pub fn with_environment(mut self, env: impl Into<String>) -> Self {
107        self.environment = env.into();
108        self
109    }
110
111    pub fn with_traces(mut self, enabled: bool) -> Self {
112        self.enable_traces = enabled;
113        self
114    }
115
116    pub fn with_metrics(mut self, enabled: bool) -> Self {
117        self.enable_metrics = enabled;
118        self
119    }
120
121    pub fn with_logs(mut self, enabled: bool) -> Self {
122        self.enable_logs = enabled;
123        self
124    }
125
126    pub fn with_sampling_ratio(mut self, ratio: f64) -> Self {
127        self.sampling_ratio = ratio.clamp(0.0, 1.0);
128        self
129    }
130}
131
132fn build_resource(config: &TelemetryConfig) -> Resource {
133    let base = Resource::from_detectors(
134        std::time::Duration::from_secs(5),
135        vec![
136            Box::new(SdkProvidedResourceDetector),
137            Box::new(EnvResourceDetector::new()),
138        ],
139    );
140
141    let custom = Resource::new(vec![
142        KeyValue::new(SERVICE_NAME, config.service_name.clone()),
143        KeyValue::new(SERVICE_VERSION, config.service_version.clone()),
144        KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, config.environment.clone()),
145    ]);
146
147    base.merge(&custom)
148}
149
150fn init_tracer(config: &TelemetryConfig) -> Result<TracerProvider, TelemetryError> {
151    let exporter = SpanExporter::builder()
152        .with_http()
153        .with_endpoint(&config.otlp_endpoint)
154        .build()
155        .map_err(|e| TelemetryError::TracerInit(e.to_string()))?;
156
157    let sampler = if config.sampling_ratio >= 1.0 {
158        Sampler::AlwaysOn
159    } else if config.sampling_ratio <= 0.0 {
160        Sampler::AlwaysOff
161    } else {
162        Sampler::TraceIdRatioBased(config.sampling_ratio)
163    };
164
165    let provider = TracerProvider::builder()
166        .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
167        .with_sampler(sampler)
168        .with_id_generator(RandomIdGenerator::default())
169        .with_resource(build_resource(config))
170        .build();
171
172    Ok(provider)
173}
174
175fn init_meter(config: &TelemetryConfig) -> Result<SdkMeterProvider, TelemetryError> {
176    let exporter = MetricExporter::builder()
177        .with_http()
178        .with_endpoint(&config.otlp_endpoint)
179        .build()
180        .map_err(|e| TelemetryError::MeterInit(e.to_string()))?;
181
182    let reader = PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio).build();
183
184    let provider = MeterProviderBuilder::default()
185        .with_reader(reader)
186        .with_resource(build_resource(config))
187        .build();
188
189    Ok(provider)
190}
191
192fn init_logger(config: &TelemetryConfig) -> Result<LoggerProvider, TelemetryError> {
193    let exporter = LogExporter::builder()
194        .with_http()
195        .with_endpoint(&config.otlp_endpoint)
196        .build()
197        .map_err(|e| TelemetryError::LoggerInit(e.to_string()))?;
198
199    let provider = LoggerProvider::builder()
200        .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
201        .with_resource(build_resource(config))
202        .build();
203
204    Ok(provider)
205}
206
207/// `forge dev` sets RUST_LOG=warn,forge=info which silences user crate logs.
208/// This ensures the user's crate is always visible at the configured level.
209pub fn build_env_filter(project_name: &str, log_level: &str) -> EnvFilter {
210    let crate_name = project_name.replace('-', "_");
211
212    let base = if let Ok(filter) = EnvFilter::try_from_default_env() {
213        filter
214    } else {
215        EnvFilter::new(log_level)
216    };
217
218    // Always ensure the user crate is visible at the configured level
219    let directive = format!("{}={}", crate_name, log_level);
220    match directive.parse() {
221        Ok(d) => base.add_directive(d),
222        Err(_) => base,
223    }
224}
225
226/// Set up tracing so logs work without any user boilerplate.
227/// Returns `Ok(false)` if a subscriber already exists (user configured their own).
228pub fn init_telemetry(
229    config: &TelemetryConfig,
230    project_name: &str,
231    log_level: &str,
232) -> Result<bool, TelemetryError> {
233    global::set_text_map_propagator(TraceContextPropagator::new());
234
235    let env_filter = build_env_filter(project_name, log_level);
236
237    let registry = Registry::default().with(env_filter);
238
239    if config.enable_traces {
240        let tracer_provider = init_tracer(config)?;
241        let tracer = tracer_provider.tracer(config.service_name.clone());
242
243        TRACER_PROVIDER
244            .set(tracer_provider.clone())
245            .map_err(|_| TelemetryError::AlreadyInitialized)?;
246
247        global::set_tracer_provider(tracer_provider);
248
249        let otel_layer = OpenTelemetryLayer::new(tracer);
250        let fmt_layer = tracing_subscriber::fmt::layer()
251            .with_target(true)
252            .with_thread_ids(false)
253            .with_file(false)
254            .with_line_number(false);
255
256        if registry
257            .with(otel_layer)
258            .with(fmt_layer)
259            .try_init()
260            .is_err()
261        {
262            return Ok(false);
263        }
264    } else {
265        let fmt_layer = tracing_subscriber::fmt::layer()
266            .with_target(true)
267            .with_thread_ids(false)
268            .with_file(false)
269            .with_line_number(false);
270
271        if registry.with(fmt_layer).try_init().is_err() {
272            return Ok(false);
273        }
274    }
275
276    if config.enable_metrics {
277        let meter_provider = init_meter(config)?;
278
279        METER_PROVIDER
280            .set(meter_provider.clone())
281            .map_err(|_| TelemetryError::AlreadyInitialized)?;
282
283        global::set_meter_provider(meter_provider);
284    }
285
286    if config.enable_logs {
287        let logger_provider = init_logger(config)?;
288
289        LOGGER_PROVIDER
290            .set(logger_provider)
291            .map_err(|_| TelemetryError::AlreadyInitialized)?;
292    }
293
294    tracing::info!(
295        service = %config.service_name,
296        version = %config.service_version,
297        environment = %config.environment,
298        traces = config.enable_traces,
299        metrics = config.enable_metrics,
300        logs = config.enable_logs,
301        "telemetry initialized"
302    );
303
304    Ok(true)
305}
306
307pub fn shutdown_telemetry() {
308    tracing::info!("shutting down telemetry");
309
310    if let Some(provider) = TRACER_PROVIDER.get()
311        && let Err(e) = provider.shutdown()
312    {
313        tracing::warn!(error = %e, "failed to shutdown tracer provider");
314    }
315
316    if let Some(provider) = METER_PROVIDER.get()
317        && let Err(e) = provider.shutdown()
318    {
319        tracing::warn!(error = %e, "failed to shutdown meter provider");
320    }
321
322    if let Some(provider) = LOGGER_PROVIDER.get()
323        && let Err(e) = provider.shutdown()
324    {
325        tracing::warn!(error = %e, "failed to shutdown logger provider");
326    }
327}
328
#[cfg(test)]
mod tests {
    use super::*;

    /// Every builder method overrides exactly its own field.
    #[test]
    fn test_config_builder() {
        let config = TelemetryConfig::new("test-service")
            .with_endpoint("http://otel:4318")
            .with_version("1.0.0")
            .with_environment("production")
            .with_traces(true)
            .with_metrics(false)
            .with_logs(true);

        assert_eq!(config.service_name, "test-service");
        assert_eq!(config.otlp_endpoint, "http://otel:4318");
        assert_eq!(config.service_version, "1.0.0");
        assert_eq!(config.environment, "production");
        assert!(config.enable_traces);
        assert!(!config.enable_metrics);
        assert!(config.enable_logs);
    }

    /// The documented defaults for local development.
    #[test]
    fn test_default_config() {
        let config = TelemetryConfig::default();

        assert_eq!(config.otlp_endpoint, "http://localhost:4318");
        assert_eq!(config.service_name, "forge-service");
        assert_eq!(config.service_version, "0.1.0");
        assert_eq!(config.environment, "development");
        assert!(config.enable_traces);
        assert!(config.enable_metrics);
        assert!(config.enable_logs);
        assert_eq!(config.sampling_ratio, 1.0);
    }

    /// `new` only sets the service name; everything else matches `default`.
    #[test]
    fn test_new_keeps_other_defaults() {
        let config = TelemetryConfig::new("svc");
        let defaults = TelemetryConfig::default();

        assert_eq!(config.service_name, "svc");
        assert_eq!(config.otlp_endpoint, defaults.otlp_endpoint);
        assert_eq!(config.service_version, defaults.service_version);
        assert_eq!(config.environment, defaults.environment);
        assert_eq!(config.enable_traces, defaults.enable_traces);
        assert_eq!(config.enable_metrics, defaults.enable_metrics);
        assert_eq!(config.enable_logs, defaults.enable_logs);
        assert_eq!(config.sampling_ratio, defaults.sampling_ratio);
    }

    /// Out-of-range sampling ratios are clamped into `0.0..=1.0`.
    #[test]
    fn test_sampling_ratio_is_clamped() {
        let ratio = |r: f64| TelemetryConfig::default().with_sampling_ratio(r).sampling_ratio;

        assert_eq!(ratio(1.5), 1.0);
        assert_eq!(ratio(-0.25), 0.0);
        assert_eq!(ratio(0.25), 0.25);
        assert_eq!(ratio(0.0), 0.0);
        assert_eq!(ratio(1.0), 1.0);
    }
}