forge_runtime/observability/
mod.rs1#[cfg(feature = "otel")]
12mod db;
13#[cfg(feature = "otel")]
14mod metrics;
15#[cfg(feature = "otel")]
16mod telemetry;
17
18#[cfg(feature = "otel")]
19pub use db::{extract_table_name, instrumented_query, record_pool_metrics, record_query_duration};
20#[cfg(feature = "otel")]
21pub use metrics::{
22 ActiveConnectionsGauge, FnCacheMetrics, FnMetrics, HttpMetrics, JobMetrics, NotifyMetrics,
23 SubscriptionMetrics, WorkflowSchedulerMetrics, record_fn_cache, record_fn_execution,
24 record_http_request, record_job_execution, record_lost_claim, record_notify_payload_bytes,
25 record_subscription_counts, record_workflow_scheduler_duration, set_active_connections,
26};
27#[cfg(feature = "otel")]
28pub use telemetry::{
29 TelemetryConfig, TelemetryError, build_env_filter, init_telemetry, shutdown_telemetry,
30};
31
32#[cfg(not(feature = "otel"))]
35mod stub {
36 use forge_core::config::ObservabilityConfig;
37 use sqlx::PgPool;
38 use std::time::Duration;
39
40 #[derive(Debug, Clone, Default)]
43 pub struct TelemetryConfig {
44 pub otlp_endpoint: String,
45 pub enable_traces: bool,
46 pub enable_metrics: bool,
47 pub enable_logs: bool,
48 pub sampling_ratio: f64,
49 }
50
51 impl TelemetryConfig {
52 pub fn from_observability_config(
53 _cfg: &ObservabilityConfig,
54 _service_name: &str,
55 _service_version: &str,
56 ) -> Self {
57 Self::default()
58 }
59 }
60
61 #[derive(Debug, thiserror::Error)]
62 pub enum TelemetryError {
63 #[error("telemetry disabled: build with the `otel` feature to enable OpenTelemetry")]
64 Disabled,
65 }
66
67 pub fn init_telemetry(
71 _cfg: &TelemetryConfig,
72 _service_name: &str,
73 log_level: &str,
74 ) -> Result<bool, TelemetryError> {
75 use tracing_subscriber::EnvFilter;
76 use tracing_subscriber::fmt;
77 use tracing_subscriber::prelude::*;
78
79 let filter =
80 EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(log_level));
81
82 let _ = tracing_subscriber::registry()
85 .with(filter)
86 .with(fmt::layer())
87 .try_init();
88
89 Ok(false)
90 }
91
92 pub fn shutdown_telemetry() {}
93
94 pub fn build_env_filter(_log_level: &str) -> tracing_subscriber::EnvFilter {
95 tracing_subscriber::EnvFilter::new("info")
96 }
97
98 #[inline]
99 pub fn record_pool_metrics(_pool: &PgPool) {}
100
101 #[inline]
102 pub fn record_query_duration(_operation: &str, _duration: Duration) {}
103
104 #[inline]
105 pub fn record_fn_execution(
106 _function: &str,
107 _kind: &str,
108 _success: bool,
109 _cached: bool,
110 _duration_secs: f64,
111 ) {
112 }
113
114 #[inline]
115 pub fn record_fn_cache(_function: &str, _hit: bool) {}
116
117 #[inline]
118 pub fn record_http_request(_method: &str, _path: &str, _status: u16, _duration_secs: f64) {}
119
120 #[inline]
121 pub fn record_job_execution(_job_type: &str, _status: &'static str, _duration_secs: f64) {}
122
123 #[inline]
124 pub fn record_lost_claim(_job_type: &str) {}
125
126 #[inline]
127 pub fn set_active_connections(_connection_type: &'static str, _delta: i64) {}
128
129 #[inline]
130 pub fn record_notify_payload_bytes(_channel: &str, _bytes: usize) {}
131
132 #[inline]
133 pub fn record_subscription_counts(_subscribers: usize, _groups: usize, _tables: usize) {}
134
135 #[inline]
136 pub fn record_workflow_scheduler_duration(_duration_secs: f64) {}
137
138 pub fn extract_table_name(_sql: &str) -> Option<&str> {
139 None
140 }
141
142 #[inline]
143 pub async fn instrumented_query<F, T, E>(
144 _operation: &str,
145 _table: Option<&str>,
146 f: F,
147 ) -> Result<T, E>
148 where
149 F: std::future::Future<Output = Result<T, E>>,
150 {
151 f.await
152 }
153}
154
155#[cfg(not(feature = "otel"))]
156pub use stub::{
157 TelemetryConfig, TelemetryError, build_env_filter, extract_table_name, init_telemetry,
158 instrumented_query, record_fn_cache, record_fn_execution, record_http_request,
159 record_job_execution, record_lost_claim, record_notify_payload_bytes, record_pool_metrics,
160 record_query_duration, record_subscription_counts, record_workflow_scheduler_duration,
161 set_active_connections, shutdown_telemetry,
162};