forge_runtime/observability/mod.rs

1mod alerts;
2mod collector;
3mod config;
4mod partitions;
5mod storage;
6mod tracing_layer;
7
8pub use alerts::{
9    Alert, AlertCondition, AlertEvaluator, AlertRule, AlertSeverity, AlertStatus, AlertStore,
10};
11pub use collector::{
12    LogCollector, MetricsCollector, SystemMetricsCollector, SystemMetricsSnapshot, TraceCollector,
13};
14pub use config::{LogsConfig, MetricsConfig, ObservabilityConfig, TracesConfig};
15pub use partitions::{PartitionConfig, PartitionGranularity, PartitionManager};
16pub use storage::{LogStore, MetricsStore, TraceStore, TraceSummary};
17pub use tracing_layer::ForgeTracingLayer;
18
19use std::sync::Arc;
20use std::time::Duration;
21
22use forge_core::Result;
23use forge_core::observability::{LogEntry, Metric, Span};
24use tokio::sync::RwLock;
25
26/// Shared observability state for the runtime.
27///
28/// This struct encapsulates all observability components (collectors and stores)
29/// and provides a unified interface for recording and querying observability data.
30#[derive(Clone)]
31pub struct ObservabilityState {
32    /// Metrics collector for buffering metrics.
33    pub metrics_collector: Arc<MetricsCollector>,
34    /// Log collector for buffering logs.
35    pub log_collector: Arc<LogCollector>,
36    /// Trace collector for buffering traces.
37    pub trace_collector: Arc<TraceCollector>,
38    /// System metrics collector.
39    pub system_metrics: Arc<SystemMetricsCollector>,
40    /// Metrics store for persistence.
41    pub metrics_store: Arc<MetricsStore>,
42    /// Log store for persistence.
43    pub log_store: Arc<LogStore>,
44    /// Trace store for persistence.
45    pub trace_store: Arc<TraceStore>,
46    /// Alert store for persistence.
47    pub alert_store: Arc<AlertStore>,
48    /// Configuration.
49    config: ObservabilityConfig,
50    /// Whether observability is enabled.
51    enabled: bool,
52    /// Shutdown flag.
53    shutdown: Arc<RwLock<bool>>,
54}
55
56impl ObservabilityState {
57    /// Create a new observability state from config and database pool.
58    pub fn new(config: ObservabilityConfig, pool: sqlx::PgPool) -> Self {
59        let enabled = config.enabled;
60
61        // Create collectors
62        let metrics_collector = Arc::new(MetricsCollector::new(config.metrics.clone()));
63        let log_collector = Arc::new(LogCollector::new(config.logs.clone()));
64        let trace_collector = Arc::new(TraceCollector::new(config.traces.clone()));
65        let system_metrics = Arc::new(SystemMetricsCollector::new());
66
67        // Create stores
68        let metrics_store = Arc::new(MetricsStore::new(pool.clone()));
69        let log_store = Arc::new(LogStore::new(pool.clone()));
70        let trace_store = Arc::new(TraceStore::new(pool.clone()));
71        let alert_store = Arc::new(AlertStore::new(pool));
72
73        Self {
74            metrics_collector,
75            log_collector,
76            trace_collector,
77            system_metrics,
78            metrics_store,
79            log_store,
80            trace_store,
81            alert_store,
82            config,
83            enabled,
84            shutdown: Arc::new(RwLock::new(false)),
85        }
86    }
87
88    /// Check if observability is enabled.
89    pub fn is_enabled(&self) -> bool {
90        self.enabled
91    }
92
93    /// Record a metric.
94    pub async fn record_metric(&self, metric: Metric) {
95        if self.enabled {
96            self.metrics_collector.record(metric).await;
97        }
98    }
99
100    /// Increment a counter metric.
101    pub async fn increment_counter(&self, name: impl Into<String>, value: f64) {
102        if self.enabled {
103            self.metrics_collector.increment_counter(name, value).await;
104        }
105    }
106
107    /// Set a gauge metric.
108    pub async fn set_gauge(&self, name: impl Into<String>, value: f64) {
109        if self.enabled {
110            self.metrics_collector.set_gauge(name, value).await;
111        }
112    }
113
114    /// Record a log entry.
115    pub async fn record_log(&self, log: LogEntry) {
116        if self.enabled {
117            self.log_collector.record(log).await;
118        }
119    }
120
121    /// Log at info level.
122    pub async fn info(&self, message: impl Into<String>) {
123        if self.enabled {
124            self.log_collector.info(message).await;
125        }
126    }
127
128    /// Log at warn level.
129    pub async fn warn(&self, message: impl Into<String>) {
130        if self.enabled {
131            self.log_collector.warn(message).await;
132        }
133    }
134
135    /// Log at error level.
136    pub async fn error(&self, message: impl Into<String>) {
137        if self.enabled {
138            self.log_collector.error(message).await;
139        }
140    }
141
142    /// Record a span.
143    pub async fn record_span(&self, span: Span) {
144        if self.enabled {
145            self.trace_collector.record(span).await;
146        }
147    }
148
149    /// Flush all collectors and persist to stores.
150    pub async fn flush(&self) -> Result<()> {
151        if !self.enabled {
152            return Ok(());
153        }
154
155        // Flush metrics
156        self.metrics_collector.flush().await;
157
158        // Flush logs
159        self.log_collector.flush().await;
160
161        // Flush traces
162        self.trace_collector.flush().await;
163
164        Ok(())
165    }
166
167    /// Start background flush loops.
168    ///
169    /// This spawns tasks that periodically flush collectors to stores
170    /// and run cleanup based on retention policies.
171    pub fn start_background_tasks(&self) -> Vec<tokio::task::JoinHandle<()>> {
172        let mut handles = Vec::new();
173
174        if !self.enabled {
175            return handles;
176        }
177
178        // Metrics flush loop
179        {
180            let collector = self.metrics_collector.clone();
181            let store = self.metrics_store.clone();
182            let interval = self.config.metrics.flush_interval;
183            let shutdown = self.shutdown.clone();
184
185            handles.push(tokio::spawn(async move {
186                let mut ticker = tokio::time::interval(interval);
187                loop {
188                    ticker.tick().await;
189
190                    if *shutdown.read().await {
191                        break;
192                    }
193
194                    // Drain collector buffer and persist to store
195                    let metrics = collector.drain().await;
196                    if !metrics.is_empty() {
197                        if let Err(e) = store.store(metrics).await {
198                            tracing::warn!("Failed to persist metrics: {}", e);
199                        }
200                    }
201                }
202            }));
203        }
204
205        // Logs flush loop
206        {
207            let collector = self.log_collector.clone();
208            let store = self.log_store.clone();
209            let shutdown = self.shutdown.clone();
210            let interval = Duration::from_secs(10); // Logs flush every 10s
211
212            handles.push(tokio::spawn(async move {
213                let mut ticker = tokio::time::interval(interval);
214                loop {
215                    ticker.tick().await;
216
217                    if *shutdown.read().await {
218                        break;
219                    }
220
221                    // Drain collector buffer and persist to store
222                    let logs = collector.drain().await;
223                    if !logs.is_empty() {
224                        if let Err(e) = store.store(logs).await {
225                            tracing::warn!("Failed to persist logs: {}", e);
226                        }
227                    }
228                }
229            }));
230        }
231
232        // Traces flush loop
233        {
234            let collector = self.trace_collector.clone();
235            let store = self.trace_store.clone();
236            let shutdown = self.shutdown.clone();
237            let interval = Duration::from_secs(10); // Traces flush every 10s
238
239            handles.push(tokio::spawn(async move {
240                let mut ticker = tokio::time::interval(interval);
241                loop {
242                    ticker.tick().await;
243
244                    if *shutdown.read().await {
245                        break;
246                    }
247
248                    // Drain collector buffer and persist to store
249                    let spans = collector.drain().await;
250                    if !spans.is_empty() {
251                        if let Err(e) = store.store(spans).await {
252                            tracing::warn!("Failed to persist traces: {}", e);
253                        }
254                    }
255                }
256            }));
257        }
258
259        // System metrics collection loop (every 15 seconds)
260        {
261            let handle = self
262                .system_metrics
263                .start(self.metrics_collector.clone(), Duration::from_secs(15));
264            handles.push(handle);
265        }
266
267        // Cleanup loop (runs less frequently)
268        {
269            let metrics_store = self.metrics_store.clone();
270            let log_store = self.log_store.clone();
271            let trace_store = self.trace_store.clone();
272            let metrics_retention = self.config.metrics.raw_retention;
273            let logs_retention = self.config.logs.retention;
274            let traces_retention = self.config.traces.retention;
275            let shutdown = self.shutdown.clone();
276
277            handles.push(tokio::spawn(async move {
278                let mut ticker = tokio::time::interval(Duration::from_secs(3600)); // Cleanup every hour
279                loop {
280                    ticker.tick().await;
281
282                    if *shutdown.read().await {
283                        break;
284                    }
285
286                    // Run cleanup for each store
287                    if let Err(e) = metrics_store.cleanup(metrics_retention).await {
288                        tracing::warn!("Metrics cleanup error: {}", e);
289                    }
290
291                    if let Err(e) = log_store.cleanup(logs_retention).await {
292                        tracing::warn!("Logs cleanup error: {}", e);
293                    }
294
295                    if let Err(e) = trace_store.cleanup(traces_retention).await {
296                        tracing::warn!("Traces cleanup error: {}", e);
297                    }
298                }
299            }));
300        }
301
302        handles
303    }
304
305    /// Signal shutdown to background tasks.
306    pub async fn shutdown(&self) {
307        let mut shutdown = self.shutdown.write().await;
308        *shutdown = true;
309
310        // Stop system metrics collector
311        self.system_metrics.stop().await;
312
313        // Final flush
314        let _ = self.flush().await;
315    }
316
317    /// Get a tracing layer that forwards logs to the LogCollector.
318    ///
319    /// Use this to add log collection to your tracing subscriber:
320    /// ```ignore
321    /// use tracing_subscriber::layer::SubscriberExt;
322    /// use tracing_subscriber::util::SubscriberInitExt;
323    ///
324    /// tracing_subscriber::registry()
325    ///     .with(observability.tracing_layer())
326    ///     .with(tracing_subscriber::fmt::layer())
327    ///     .init();
328    /// ```
329    pub fn tracing_layer(&self) -> ForgeTracingLayer {
330        ForgeTracingLayer::new(self.log_collector.clone())
331    }
332}