forge_runtime/observability/
collector.rs

1use std::collections::VecDeque;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use tokio::sync::{RwLock, mpsc};
6
7use forge_core::LogLevel;
8use forge_core::observability::{LogEntry, Metric, Span};
9
10use super::config::{LogsConfig, MetricsConfig, TracesConfig};
11
12/// Metrics collector for buffering and batching metrics.
13pub struct MetricsCollector {
14    config: MetricsConfig,
15    buffer: Arc<RwLock<VecDeque<Metric>>>,
16    sender: mpsc::Sender<Vec<Metric>>,
17    #[allow(dead_code)]
18    receiver: Arc<RwLock<mpsc::Receiver<Vec<Metric>>>>,
19    counter: AtomicU64,
20}
21
22impl MetricsCollector {
23    /// Create a new metrics collector.
24    pub fn new(config: MetricsConfig) -> Self {
25        let (sender, receiver) = mpsc::channel(1024);
26        Self {
27            config,
28            buffer: Arc::new(RwLock::new(VecDeque::new())),
29            sender,
30            receiver: Arc::new(RwLock::new(receiver)),
31            counter: AtomicU64::new(0),
32        }
33    }
34
35    /// Record a metric.
36    pub async fn record(&self, metric: Metric) {
37        let mut buffer = self.buffer.write().await;
38        buffer.push_back(metric);
39        self.counter.fetch_add(1, Ordering::Relaxed);
40
41        // Flush if buffer is full
42        if buffer.len() >= self.config.buffer_size {
43            let batch: Vec<Metric> = buffer.drain(..).collect();
44            let _ = self.sender.send(batch).await;
45        }
46    }
47
48    /// Record a counter increment.
49    pub async fn increment_counter(&self, name: impl Into<String>, value: f64) {
50        self.record(Metric::counter(name, value)).await;
51    }
52
53    /// Record a gauge value.
54    pub async fn set_gauge(&self, name: impl Into<String>, value: f64) {
55        self.record(Metric::gauge(name, value)).await;
56    }
57
58    /// Flush the buffer.
59    pub async fn flush(&self) {
60        let mut buffer = self.buffer.write().await;
61        if !buffer.is_empty() {
62            let batch: Vec<Metric> = buffer.drain(..).collect();
63            let _ = self.sender.send(batch).await;
64        }
65    }
66
67    /// Drain the buffer and return all metrics.
68    ///
69    /// This is used by the flush loop to get metrics for persistence.
70    pub async fn drain(&self) -> Vec<Metric> {
71        let mut buffer = self.buffer.write().await;
72        buffer.drain(..).collect()
73    }
74
75    /// Get the flush receiver for consuming batches.
76    pub fn subscribe(&self) -> mpsc::Receiver<Vec<Metric>> {
77        let (_tx, rx) = mpsc::channel(1024);
78        // Note: In a real implementation, this would clone the sender
79        // For simplicity, we're creating a new channel here
80        rx
81    }
82
83    /// Get collected metrics count.
84    pub fn count(&self) -> u64 {
85        self.counter.load(Ordering::Relaxed)
86    }
87
88    /// Get current buffer size.
89    pub async fn buffer_size(&self) -> usize {
90        self.buffer.read().await.len()
91    }
92
93    /// Run the flush loop.
94    pub async fn run(&self) {
95        let mut interval = tokio::time::interval(self.config.flush_interval);
96        loop {
97            interval.tick().await;
98            self.flush().await;
99        }
100    }
101}
102
/// Log collector for buffering and filtering logs.
///
/// Entries that do not match the configured level are rejected on arrival;
/// accepted entries wait in `buffer` until they are flushed or drained.
pub struct LogCollector {
    // Supplies the level filter (`level`) and the auto-flush threshold (`buffer_size`).
    config: LogsConfig,
    // Accepted entries that have not been flushed or drained yet.
    buffer: Arc<RwLock<VecDeque<LogEntry>>>,
    // Producer half of the batch channel.
    // NOTE(review): `new` drops the matching receiver before it returns, so
    // sends on this channel fail and the flushed batch is discarded.
    sender: mpsc::Sender<Vec<LogEntry>>,
    // Number of entries accepted so far; flushing does not reset it.
    counter: AtomicU64,
}
110
111impl LogCollector {
112    /// Create a new log collector.
113    pub fn new(config: LogsConfig) -> Self {
114        let (sender, _receiver) = mpsc::channel(1024);
115        Self {
116            config,
117            buffer: Arc::new(RwLock::new(VecDeque::new())),
118            sender,
119            counter: AtomicU64::new(0),
120        }
121    }
122
123    /// Record a log entry.
124    pub async fn record(&self, entry: LogEntry) {
125        // Filter by log level
126        if !entry.matches_level(self.config.level) {
127            return;
128        }
129
130        let mut buffer = self.buffer.write().await;
131        buffer.push_back(entry);
132        self.counter.fetch_add(1, Ordering::Relaxed);
133
134        // Flush if buffer is full
135        if buffer.len() >= self.config.buffer_size {
136            let batch: Vec<LogEntry> = buffer.drain(..).collect();
137            let _ = self.sender.send(batch).await;
138        }
139    }
140
141    /// Log at trace level.
142    pub async fn trace(&self, message: impl Into<String>) {
143        self.record(LogEntry::trace(message)).await;
144    }
145
146    /// Log at debug level.
147    pub async fn debug(&self, message: impl Into<String>) {
148        self.record(LogEntry::debug(message)).await;
149    }
150
151    /// Log at info level.
152    pub async fn info(&self, message: impl Into<String>) {
153        self.record(LogEntry::info(message)).await;
154    }
155
156    /// Log at warn level.
157    pub async fn warn(&self, message: impl Into<String>) {
158        self.record(LogEntry::warn(message)).await;
159    }
160
161    /// Log at error level.
162    pub async fn error(&self, message: impl Into<String>) {
163        self.record(LogEntry::error(message)).await;
164    }
165
166    /// Flush the buffer.
167    pub async fn flush(&self) {
168        let mut buffer = self.buffer.write().await;
169        if !buffer.is_empty() {
170            let batch: Vec<LogEntry> = buffer.drain(..).collect();
171            let _ = self.sender.send(batch).await;
172        }
173    }
174
175    /// Drain the buffer and return all logs.
176    ///
177    /// This is used by the flush loop to get logs for persistence.
178    pub async fn drain(&self) -> Vec<LogEntry> {
179        let mut buffer = self.buffer.write().await;
180        buffer.drain(..).collect()
181    }
182
183    /// Get collected log count.
184    pub fn count(&self) -> u64 {
185        self.counter.load(Ordering::Relaxed)
186    }
187
188    /// Get current buffer size.
189    pub async fn buffer_size(&self) -> usize {
190        self.buffer.read().await.len()
191    }
192
193    /// Get the minimum log level.
194    pub fn min_level(&self) -> LogLevel {
195        self.config.level
196    }
197}
198
/// Trace collector for sampling and batching traces.
///
/// Every recorded span is counted; only spans that pass the sampling decision
/// are kept in `buffer` until they are flushed or drained.
pub struct TraceCollector {
    // Supplies `sample_rate` and `always_trace_errors` for the sampling decision.
    config: TracesConfig,
    // Sampled spans that have not been flushed or drained yet.
    buffer: Arc<RwLock<VecDeque<Span>>>,
    // Producer half of the batch channel.
    // NOTE(review): `new` drops the matching receiver before it returns, so
    // sends on this channel fail and the flushed batch is discarded.
    sender: mpsc::Sender<Vec<Span>>,
    // Number of spans offered to `record`, sampled or not.
    counter: AtomicU64,
    // Number of spans that passed sampling and were buffered.
    sampled_counter: AtomicU64,
}
207
208impl TraceCollector {
209    /// Create a new trace collector.
210    pub fn new(config: TracesConfig) -> Self {
211        let (sender, _receiver) = mpsc::channel(1024);
212        Self {
213            config,
214            buffer: Arc::new(RwLock::new(VecDeque::new())),
215            sender,
216            counter: AtomicU64::new(0),
217            sampled_counter: AtomicU64::new(0),
218        }
219    }
220
221    /// Record a span.
222    pub async fn record(&self, span: Span) {
223        self.counter.fetch_add(1, Ordering::Relaxed);
224
225        // Sample decision
226        let should_sample = self.should_sample(&span);
227        if !should_sample {
228            return;
229        }
230
231        self.sampled_counter.fetch_add(1, Ordering::Relaxed);
232
233        let mut buffer = self.buffer.write().await;
234        buffer.push_back(span);
235    }
236
237    /// Check if a span should be sampled.
238    fn should_sample(&self, span: &Span) -> bool {
239        // Always sample errors if configured
240        if self.config.always_trace_errors && span.status == forge_core::SpanStatus::Error {
241            return true;
242        }
243
244        // Check if context is sampled
245        if !span.context.is_sampled() {
246            return false;
247        }
248
249        // Apply sample rate
250        if self.config.sample_rate >= 1.0 {
251            return true;
252        }
253
254        // Simple probabilistic sampling
255        let hash = span
256            .context
257            .trace_id
258            .as_str()
259            .as_bytes()
260            .iter()
261            .fold(0u64, |acc, b| acc.wrapping_mul(31).wrapping_add(*b as u64));
262        let threshold = (self.config.sample_rate * u64::MAX as f64) as u64;
263        hash < threshold
264    }
265
266    /// Flush the buffer.
267    pub async fn flush(&self) {
268        let mut buffer = self.buffer.write().await;
269        if !buffer.is_empty() {
270            let batch: Vec<Span> = buffer.drain(..).collect();
271            let _ = self.sender.send(batch).await;
272        }
273    }
274
275    /// Drain the buffer and return all spans.
276    ///
277    /// This is used by the flush loop to get spans for persistence.
278    pub async fn drain(&self) -> Vec<Span> {
279        let mut buffer = self.buffer.write().await;
280        buffer.drain(..).collect()
281    }
282
283    /// Get total span count.
284    pub fn count(&self) -> u64 {
285        self.counter.load(Ordering::Relaxed)
286    }
287
288    /// Get sampled span count.
289    pub fn sampled_count(&self) -> u64 {
290        self.sampled_counter.load(Ordering::Relaxed)
291    }
292
293    /// Get current buffer size.
294    pub async fn buffer_size(&self) -> usize {
295        self.buffer.read().await.len()
296    }
297
298    /// Get the sample rate.
299    pub fn sample_rate(&self) -> f64 {
300        self.config.sample_rate
301    }
302}
303
/// System metrics collector for CPU, memory, disk, and network stats.
///
/// This collector periodically samples system metrics and records them
/// to a MetricsCollector.
///
/// NOTE(review): the code in this file samples CPU, memory, swap, disk and
/// (on Unix) load average; no network statistics are collected here.
pub struct SystemMetricsCollector {
    // Sampler used by `snapshot`; the background task spawned by `start`
    // builds its own separate `System`.
    system: RwLock<sysinfo::System>,
    // Stop flag shared with the background task; `stop` sets it to `true`.
    shutdown: Arc<RwLock<bool>>,
}
312
313impl SystemMetricsCollector {
314    /// Create a new system metrics collector.
315    pub fn new() -> Self {
316        Self {
317            system: RwLock::new(sysinfo::System::new_all()),
318            shutdown: Arc::new(RwLock::new(false)),
319        }
320    }
321
322    /// Start collecting system metrics.
323    ///
324    /// This spawns a background task that periodically collects system metrics
325    /// and records them to the provided MetricsCollector.
326    pub fn start(
327        &self,
328        metrics: Arc<MetricsCollector>,
329        interval: std::time::Duration,
330    ) -> tokio::task::JoinHandle<()> {
331        let shutdown = self.shutdown.clone();
332        let system = RwLock::new(sysinfo::System::new_all());
333
334        tokio::spawn(async move {
335            let mut ticker = tokio::time::interval(interval);
336            loop {
337                ticker.tick().await;
338
339                if *shutdown.read().await {
340                    break;
341                }
342
343                // Refresh system info
344                {
345                    let mut sys = system.write().await;
346                    sys.refresh_all();
347
348                    // CPU usage (overall)
349                    let cpu_usage = sys.global_cpu_usage();
350                    metrics
351                        .set_gauge("forge_system_cpu_usage_percent", cpu_usage as f64)
352                        .await;
353
354                    // Memory
355                    let total_memory = sys.total_memory();
356                    let used_memory = sys.used_memory();
357                    let memory_usage_percent = if total_memory > 0 {
358                        (used_memory as f64 / total_memory as f64) * 100.0
359                    } else {
360                        0.0
361                    };
362                    metrics
363                        .set_gauge("forge_system_memory_total_bytes", total_memory as f64)
364                        .await;
365                    metrics
366                        .set_gauge("forge_system_memory_used_bytes", used_memory as f64)
367                        .await;
368                    metrics
369                        .set_gauge("forge_system_memory_usage_percent", memory_usage_percent)
370                        .await;
371
372                    // Swap
373                    let total_swap = sys.total_swap();
374                    let used_swap = sys.used_swap();
375                    metrics
376                        .set_gauge("forge_system_swap_total_bytes", total_swap as f64)
377                        .await;
378                    metrics
379                        .set_gauge("forge_system_swap_used_bytes", used_swap as f64)
380                        .await;
381
382                    // Per-CPU usage
383                    for (i, cpu) in sys.cpus().iter().enumerate() {
384                        let label = format!("cpu{}", i);
385                        let mut metric = Metric::gauge(
386                            "forge_system_cpu_core_usage_percent",
387                            cpu.cpu_usage() as f64,
388                        );
389                        metric.labels.insert("core".to_string(), label);
390                        metrics.record(metric).await;
391                    }
392                }
393
394                // Disk usage
395                let disks = sysinfo::Disks::new_with_refreshed_list();
396                for disk in disks.list() {
397                    let mount = disk.mount_point().to_string_lossy().to_string();
398                    let total = disk.total_space();
399                    let available = disk.available_space();
400                    let used = total.saturating_sub(available);
401                    let usage_percent = if total > 0 {
402                        (used as f64 / total as f64) * 100.0
403                    } else {
404                        0.0
405                    };
406
407                    let mut metric = Metric::gauge("forge_system_disk_total_bytes", total as f64);
408                    metric.labels.insert("mount".to_string(), mount.clone());
409                    metrics.record(metric).await;
410
411                    let mut metric = Metric::gauge("forge_system_disk_used_bytes", used as f64);
412                    metric.labels.insert("mount".to_string(), mount.clone());
413                    metrics.record(metric).await;
414
415                    let mut metric =
416                        Metric::gauge("forge_system_disk_usage_percent", usage_percent);
417                    metric.labels.insert("mount".to_string(), mount);
418                    metrics.record(metric).await;
419                }
420
421                // Load average (Unix only)
422                #[cfg(unix)]
423                {
424                    let load_avg = sysinfo::System::load_average();
425                    metrics
426                        .set_gauge("forge_system_load_1m", load_avg.one)
427                        .await;
428                    metrics
429                        .set_gauge("forge_system_load_5m", load_avg.five)
430                        .await;
431                    metrics
432                        .set_gauge("forge_system_load_15m", load_avg.fifteen)
433                        .await;
434                }
435            }
436
437            tracing::info!("System metrics collector stopped");
438        })
439    }
440
441    /// Stop the collector.
442    pub async fn stop(&self) {
443        let mut shutdown = self.shutdown.write().await;
444        *shutdown = true;
445    }
446
447    /// Get current system metrics snapshot.
448    pub async fn snapshot(&self) -> SystemMetricsSnapshot {
449        let mut sys = self.system.write().await;
450        sys.refresh_all();
451
452        let total_memory = sys.total_memory();
453        let used_memory = sys.used_memory();
454
455        SystemMetricsSnapshot {
456            cpu_usage_percent: sys.global_cpu_usage() as f64,
457            memory_total_bytes: total_memory,
458            memory_used_bytes: used_memory,
459            memory_usage_percent: if total_memory > 0 {
460                (used_memory as f64 / total_memory as f64) * 100.0
461            } else {
462                0.0
463            },
464            swap_total_bytes: sys.total_swap(),
465            swap_used_bytes: sys.used_swap(),
466        }
467    }
468}
469
470impl Default for SystemMetricsCollector {
471    fn default() -> Self {
472        Self::new()
473    }
474}
475
/// Snapshot of system metrics.
///
/// A plain value type holding the CPU, memory and swap readings produced by
/// `SystemMetricsCollector::snapshot`.
#[derive(Debug, Clone)]
pub struct SystemMetricsSnapshot {
    /// CPU usage percentage (0-100), taken from the sampler's global CPU usage.
    pub cpu_usage_percent: f64,
    /// Total memory in bytes.
    pub memory_total_bytes: u64,
    /// Used memory in bytes.
    pub memory_used_bytes: u64,
    /// Memory usage percentage (0-100): used / total * 100, or 0.0 when the
    /// reported total is zero.
    pub memory_usage_percent: f64,
    /// Total swap in bytes.
    pub swap_total_bytes: u64,
    /// Used swap in bytes.
    pub swap_used_bytes: u64,
}
492
#[cfg(test)]
mod tests {
    use super::*;

    /// Recording metrics raises both the lifetime count and the buffer length.
    #[tokio::test]
    async fn test_metrics_collector_record() {
        let collector = MetricsCollector::new(MetricsConfig::default());

        collector.increment_counter("test_counter", 1.0).await;
        collector.set_gauge("test_gauge", 42.0).await;

        assert_eq!(collector.count(), 2);
        assert_eq!(collector.buffer_size().await, 2);
    }

    /// Reaching `buffer_size` flushes automatically and empties the buffer.
    #[tokio::test]
    async fn test_metrics_collector_flush() {
        let config = MetricsConfig {
            buffer_size: 2,
            ..Default::default()
        };
        let collector = MetricsCollector::new(config);

        collector.increment_counter("test1", 1.0).await;
        collector.increment_counter("test2", 2.0).await;
        // Buffer should auto-flush at 2

        assert_eq!(collector.count(), 2);
        // The count alone cannot show that a flush happened; the emptied
        // buffer can.
        assert_eq!(collector.buffer_size().await, 0);
    }

    /// Entries below the configured level are neither counted nor buffered.
    #[tokio::test]
    async fn test_log_collector_level_filter() {
        let config = LogsConfig {
            level: LogLevel::Warn,
            ..Default::default()
        };
        let collector = LogCollector::new(config);

        collector.debug("Debug message").await;
        collector.info("Info message").await;
        collector.warn("Warn message").await;
        collector.error("Error message").await;

        // Only warn and error should be collected
        assert_eq!(collector.count(), 2);
    }

    /// An accepted entry is counted, buffered, and removed again by a flush.
    #[tokio::test]
    async fn test_log_collector_record() {
        let collector = LogCollector::new(LogsConfig::default());

        collector.info("Test message").await;
        assert_eq!(collector.count(), 1);
        assert_eq!(collector.buffer_size().await, 1);

        collector.flush().await;
        assert_eq!(collector.buffer_size().await, 0);
        // Flushing empties the buffer but keeps the lifetime count.
        assert_eq!(collector.count(), 1);
    }

    /// With a sample rate of 1.0 a span is counted, sampled and buffered.
    #[tokio::test]
    async fn test_trace_collector_sampling() {
        let config = TracesConfig {
            sample_rate: 1.0, // 100% sampling
            ..Default::default()
        };
        let collector = TraceCollector::new(config);

        let span = Span::new("test_span");
        collector.record(span).await;

        assert_eq!(collector.count(), 1);
        assert_eq!(collector.sampled_count(), 1);
        assert_eq!(collector.buffer_size().await, 1);
    }

    /// Error spans bypass a zero sample rate when `always_trace_errors` is set.
    #[tokio::test]
    async fn test_trace_collector_always_trace_errors() {
        let config = TracesConfig {
            sample_rate: 0.0, // No sampling
            always_trace_errors: true,
            ..Default::default()
        };
        let collector = TraceCollector::new(config);

        let mut span = Span::new("error_span");
        span.end_error("Test error");
        collector.record(span).await;

        // Error should still be recorded
        assert_eq!(collector.count(), 1);
        assert_eq!(collector.sampled_count(), 1);
        assert_eq!(collector.buffer_size().await, 1);
    }
}