// forge_runtime/observability/mod.rs

//! Observability: traces, metrics, and structured logging.
//!
//! With the `otel` feature: a full OpenTelemetry pipeline (OTLP exporters for
//! traces, metrics, and logs; instrumented DB queries; function, job, and HTTP
//! metrics).
//!
//! Without `otel`: every recording function is a no-op, so callers can record
//! events unconditionally instead of scattering `cfg` gates everywhere.
//! `tracing-subscriber` log output still works via `init_telemetry`'s
//! fallback path.
10
11#[cfg(feature = "otel")]
12mod db;
13#[cfg(feature = "otel")]
14mod metrics;
15#[cfg(feature = "otel")]
16mod telemetry;
17
18#[cfg(feature = "otel")]
19pub use db::{extract_table_name, instrumented_query, record_pool_metrics, record_query_duration};
20#[cfg(feature = "otel")]
21pub use metrics::{
22    ActiveConnectionsGauge, FnCacheMetrics, FnMetrics, HttpMetrics, JobMetrics, NotifyMetrics,
23    SubscriptionMetrics, WorkflowSchedulerMetrics, record_fn_cache, record_fn_execution,
24    record_http_request, record_job_execution, record_lost_claim, record_notify_payload_bytes,
25    record_subscription_counts, record_workflow_scheduler_duration, set_active_connections,
26};
27#[cfg(feature = "otel")]
28pub use telemetry::{
29    TelemetryConfig, TelemetryError, build_env_filter, init_telemetry, shutdown_telemetry,
30};
31
32// --- No-op stubs when otel is disabled ---
33
34#[cfg(not(feature = "otel"))]
35mod stub {
36    use forge_core::config::ObservabilityConfig;
37    use sqlx::PgPool;
38    use std::time::Duration;
39
40    /// Minimal telemetry config that mirrors the real one's surface so
41    /// callers don't need cfg gates.
42    #[derive(Debug, Clone, Default)]
43    pub struct TelemetryConfig {
44        pub otlp_endpoint: String,
45        pub enable_traces: bool,
46        pub enable_metrics: bool,
47        pub enable_logs: bool,
48        pub sampling_ratio: f64,
49    }
50
51    impl TelemetryConfig {
52        pub fn from_observability_config(
53            _cfg: &ObservabilityConfig,
54            _service_name: &str,
55            _service_version: &str,
56        ) -> Self {
57            Self::default()
58        }
59    }
60
61    #[derive(Debug, thiserror::Error)]
62    pub enum TelemetryError {
63        #[error("telemetry disabled: build with the `otel` feature to enable OpenTelemetry")]
64        Disabled,
65    }
66
67    /// When `otel` is off, install a basic `tracing-subscriber` formatter so
68    /// log output still reaches stderr. Returns `Ok(false)` to indicate that
69    /// no exporters were installed.
70    pub fn init_telemetry(
71        _cfg: &TelemetryConfig,
72        _service_name: &str,
73        log_level: &str,
74    ) -> Result<bool, TelemetryError> {
75        use tracing_subscriber::EnvFilter;
76        use tracing_subscriber::fmt;
77        use tracing_subscriber::prelude::*;
78
79        let filter =
80            EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(log_level));
81
82        // Idempotent install: ignore "already set" errors so tests and
83        // multi-init paths don't panic.
84        let _ = tracing_subscriber::registry()
85            .with(filter)
86            .with(fmt::layer())
87            .try_init();
88
89        Ok(false)
90    }
91
92    pub fn shutdown_telemetry() {}
93
94    pub fn build_env_filter(_log_level: &str) -> tracing_subscriber::EnvFilter {
95        tracing_subscriber::EnvFilter::new("info")
96    }
97
98    #[inline]
99    pub fn record_pool_metrics(_pool: &PgPool) {}
100
101    #[inline]
102    pub fn record_query_duration(_operation: &str, _duration: Duration) {}
103
104    #[inline]
105    pub fn record_fn_execution(
106        _function: &str,
107        _kind: &str,
108        _success: bool,
109        _cached: bool,
110        _duration_secs: f64,
111    ) {
112    }
113
114    #[inline]
115    pub fn record_fn_cache(_function: &str, _hit: bool) {}
116
117    #[inline]
118    pub fn record_http_request(_method: &str, _path: &str, _status: u16, _duration_secs: f64) {}
119
120    #[inline]
121    pub fn record_job_execution(_job_type: &str, _status: &'static str, _duration_secs: f64) {}
122
123    #[inline]
124    pub fn record_lost_claim(_job_type: &str) {}
125
126    #[inline]
127    pub fn set_active_connections(_connection_type: &'static str, _delta: i64) {}
128
129    #[inline]
130    pub fn record_notify_payload_bytes(_channel: &str, _bytes: usize) {}
131
132    #[inline]
133    pub fn record_subscription_counts(_subscribers: usize, _groups: usize, _tables: usize) {}
134
135    #[inline]
136    pub fn record_workflow_scheduler_duration(_duration_secs: f64) {}
137
138    pub fn extract_table_name(_sql: &str) -> Option<&str> {
139        None
140    }
141
142    #[inline]
143    pub async fn instrumented_query<F, T, E>(
144        _operation: &str,
145        _table: Option<&str>,
146        f: F,
147    ) -> Result<T, E>
148    where
149        F: std::future::Future<Output = Result<T, E>>,
150    {
151        f.await
152    }
153}
154
155#[cfg(not(feature = "otel"))]
156pub use stub::{
157    TelemetryConfig, TelemetryError, build_env_filter, extract_table_name, init_telemetry,
158    instrumented_query, record_fn_cache, record_fn_execution, record_http_request,
159    record_job_execution, record_lost_claim, record_notify_payload_bytes, record_pool_metrics,
160    record_query_duration, record_subscription_counts, record_workflow_scheduler_duration,
161    set_active_connections, shutdown_telemetry,
162};