forge_runtime/observability/
mod.rs1mod alerts;
2mod collector;
3mod config;
4mod partitions;
5mod storage;
6mod tracing_layer;
7
8pub use alerts::{
9 Alert, AlertCondition, AlertEvaluator, AlertRule, AlertSeverity, AlertStatus, AlertStore,
10};
11pub use collector::{
12 LogCollector, MetricsCollector, SystemMetricsCollector, SystemMetricsSnapshot, TraceCollector,
13};
14pub use config::{LogsConfig, MetricsConfig, ObservabilityConfig, TracesConfig};
15pub use partitions::{PartitionConfig, PartitionGranularity, PartitionManager};
16pub use storage::{LogStore, MetricsStore, TraceStore, TraceSummary};
17pub use tracing_layer::ForgeTracingLayer;
18
19use std::sync::Arc;
20use std::time::Duration;
21
22use forge_core::Result;
23use forge_core::observability::{LogEntry, Metric, Span};
24use tokio::sync::RwLock;
25
26#[derive(Clone)]
31pub struct ObservabilityState {
32 pub metrics_collector: Arc<MetricsCollector>,
34 pub log_collector: Arc<LogCollector>,
36 pub trace_collector: Arc<TraceCollector>,
38 pub system_metrics: Arc<SystemMetricsCollector>,
40 pub metrics_store: Arc<MetricsStore>,
42 pub log_store: Arc<LogStore>,
44 pub trace_store: Arc<TraceStore>,
46 pub alert_store: Arc<AlertStore>,
48 config: ObservabilityConfig,
50 enabled: bool,
52 shutdown: Arc<RwLock<bool>>,
54}
55
56impl ObservabilityState {
57 pub fn new(config: ObservabilityConfig, pool: sqlx::PgPool) -> Self {
59 let enabled = config.enabled;
60
61 let metrics_collector = Arc::new(MetricsCollector::new(config.metrics.clone()));
63 let log_collector = Arc::new(LogCollector::new(config.logs.clone()));
64 let trace_collector = Arc::new(TraceCollector::new(config.traces.clone()));
65 let system_metrics = Arc::new(SystemMetricsCollector::new());
66
67 let metrics_store = Arc::new(MetricsStore::new(pool.clone()));
69 let log_store = Arc::new(LogStore::new(pool.clone()));
70 let trace_store = Arc::new(TraceStore::new(pool.clone()));
71 let alert_store = Arc::new(AlertStore::new(pool));
72
73 Self {
74 metrics_collector,
75 log_collector,
76 trace_collector,
77 system_metrics,
78 metrics_store,
79 log_store,
80 trace_store,
81 alert_store,
82 config,
83 enabled,
84 shutdown: Arc::new(RwLock::new(false)),
85 }
86 }
87
88 pub fn is_enabled(&self) -> bool {
90 self.enabled
91 }
92
93 pub async fn record_metric(&self, metric: Metric) {
95 if self.enabled {
96 self.metrics_collector.record(metric).await;
97 }
98 }
99
100 pub async fn increment_counter(&self, name: impl Into<String>, value: f64) {
102 if self.enabled {
103 self.metrics_collector.increment_counter(name, value).await;
104 }
105 }
106
107 pub async fn set_gauge(&self, name: impl Into<String>, value: f64) {
109 if self.enabled {
110 self.metrics_collector.set_gauge(name, value).await;
111 }
112 }
113
114 pub async fn record_log(&self, log: LogEntry) {
116 if self.enabled {
117 self.log_collector.record(log).await;
118 }
119 }
120
121 pub async fn info(&self, message: impl Into<String>) {
123 if self.enabled {
124 self.log_collector.info(message).await;
125 }
126 }
127
128 pub async fn warn(&self, message: impl Into<String>) {
130 if self.enabled {
131 self.log_collector.warn(message).await;
132 }
133 }
134
135 pub async fn error(&self, message: impl Into<String>) {
137 if self.enabled {
138 self.log_collector.error(message).await;
139 }
140 }
141
142 pub async fn record_span(&self, span: Span) {
144 if self.enabled {
145 self.trace_collector.record(span).await;
146 }
147 }
148
149 pub async fn flush(&self) -> Result<()> {
151 if !self.enabled {
152 return Ok(());
153 }
154
155 self.metrics_collector.flush().await;
157
158 self.log_collector.flush().await;
160
161 self.trace_collector.flush().await;
163
164 Ok(())
165 }
166
167 pub fn start_background_tasks(&self) -> Vec<tokio::task::JoinHandle<()>> {
172 let mut handles = Vec::new();
173
174 if !self.enabled {
175 return handles;
176 }
177
178 {
180 let collector = self.metrics_collector.clone();
181 let store = self.metrics_store.clone();
182 let interval = self.config.metrics.flush_interval;
183 let shutdown = self.shutdown.clone();
184
185 handles.push(tokio::spawn(async move {
186 let mut ticker = tokio::time::interval(interval);
187 loop {
188 ticker.tick().await;
189
190 if *shutdown.read().await {
191 break;
192 }
193
194 let metrics = collector.drain().await;
196 if !metrics.is_empty() {
197 if let Err(e) = store.store(metrics).await {
198 tracing::warn!("Failed to persist metrics: {}", e);
199 }
200 }
201 }
202 }));
203 }
204
205 {
207 let collector = self.log_collector.clone();
208 let store = self.log_store.clone();
209 let shutdown = self.shutdown.clone();
210 let interval = Duration::from_secs(10); handles.push(tokio::spawn(async move {
213 let mut ticker = tokio::time::interval(interval);
214 loop {
215 ticker.tick().await;
216
217 if *shutdown.read().await {
218 break;
219 }
220
221 let logs = collector.drain().await;
223 if !logs.is_empty() {
224 if let Err(e) = store.store(logs).await {
225 tracing::warn!("Failed to persist logs: {}", e);
226 }
227 }
228 }
229 }));
230 }
231
232 {
234 let collector = self.trace_collector.clone();
235 let store = self.trace_store.clone();
236 let shutdown = self.shutdown.clone();
237 let interval = Duration::from_secs(10); handles.push(tokio::spawn(async move {
240 let mut ticker = tokio::time::interval(interval);
241 loop {
242 ticker.tick().await;
243
244 if *shutdown.read().await {
245 break;
246 }
247
248 let spans = collector.drain().await;
250 if !spans.is_empty() {
251 if let Err(e) = store.store(spans).await {
252 tracing::warn!("Failed to persist traces: {}", e);
253 }
254 }
255 }
256 }));
257 }
258
259 {
261 let handle = self
262 .system_metrics
263 .start(self.metrics_collector.clone(), Duration::from_secs(15));
264 handles.push(handle);
265 }
266
267 {
269 let metrics_store = self.metrics_store.clone();
270 let log_store = self.log_store.clone();
271 let trace_store = self.trace_store.clone();
272 let metrics_retention = self.config.metrics.raw_retention;
273 let logs_retention = self.config.logs.retention;
274 let traces_retention = self.config.traces.retention;
275 let shutdown = self.shutdown.clone();
276
277 handles.push(tokio::spawn(async move {
278 let mut ticker = tokio::time::interval(Duration::from_secs(3600)); loop {
280 ticker.tick().await;
281
282 if *shutdown.read().await {
283 break;
284 }
285
286 if let Err(e) = metrics_store.cleanup(metrics_retention).await {
288 tracing::warn!("Metrics cleanup error: {}", e);
289 }
290
291 if let Err(e) = log_store.cleanup(logs_retention).await {
292 tracing::warn!("Logs cleanup error: {}", e);
293 }
294
295 if let Err(e) = trace_store.cleanup(traces_retention).await {
296 tracing::warn!("Traces cleanup error: {}", e);
297 }
298 }
299 }));
300 }
301
302 handles
303 }
304
305 pub async fn shutdown(&self) {
307 let mut shutdown = self.shutdown.write().await;
308 *shutdown = true;
309
310 self.system_metrics.stop().await;
312
313 let _ = self.flush().await;
315 }
316
317 pub fn tracing_layer(&self) -> ForgeTracingLayer {
330 ForgeTracingLayer::new(self.log_collector.clone())
331 }
332}