forge_runtime/observability/
collector.rs1use std::collections::VecDeque;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use tokio::sync::{RwLock, mpsc};
6
7use forge_core::LogLevel;
8use forge_core::observability::{LogEntry, Metric, Span};
9
10use super::config::{LogsConfig, MetricsConfig, TracesConfig};
11
12pub struct MetricsCollector {
14 config: MetricsConfig,
15 buffer: Arc<RwLock<VecDeque<Metric>>>,
16 sender: mpsc::Sender<Vec<Metric>>,
17 #[allow(dead_code)]
18 receiver: Arc<RwLock<mpsc::Receiver<Vec<Metric>>>>,
19 counter: AtomicU64,
20}
21
22impl MetricsCollector {
23 pub fn new(config: MetricsConfig) -> Self {
25 let (sender, receiver) = mpsc::channel(1024);
26 Self {
27 config,
28 buffer: Arc::new(RwLock::new(VecDeque::new())),
29 sender,
30 receiver: Arc::new(RwLock::new(receiver)),
31 counter: AtomicU64::new(0),
32 }
33 }
34
35 pub async fn record(&self, metric: Metric) {
37 let mut buffer = self.buffer.write().await;
38 buffer.push_back(metric);
39 self.counter.fetch_add(1, Ordering::Relaxed);
40
41 if buffer.len() >= self.config.buffer_size {
43 let batch: Vec<Metric> = buffer.drain(..).collect();
44 let _ = self.sender.send(batch).await;
45 }
46 }
47
48 pub async fn increment_counter(&self, name: impl Into<String>, value: f64) {
50 self.record(Metric::counter(name, value)).await;
51 }
52
53 pub async fn set_gauge(&self, name: impl Into<String>, value: f64) {
55 self.record(Metric::gauge(name, value)).await;
56 }
57
58 pub async fn flush(&self) {
60 let mut buffer = self.buffer.write().await;
61 if !buffer.is_empty() {
62 let batch: Vec<Metric> = buffer.drain(..).collect();
63 let _ = self.sender.send(batch).await;
64 }
65 }
66
67 pub async fn drain(&self) -> Vec<Metric> {
71 let mut buffer = self.buffer.write().await;
72 buffer.drain(..).collect()
73 }
74
75 pub fn subscribe(&self) -> mpsc::Receiver<Vec<Metric>> {
77 let (_tx, rx) = mpsc::channel(1024);
78 rx
81 }
82
83 pub fn count(&self) -> u64 {
85 self.counter.load(Ordering::Relaxed)
86 }
87
88 pub async fn buffer_size(&self) -> usize {
90 self.buffer.read().await.len()
91 }
92
93 pub async fn run(&self) {
95 let mut interval = tokio::time::interval(self.config.flush_interval);
96 loop {
97 interval.tick().await;
98 self.flush().await;
99 }
100 }
101}
102
103pub struct LogCollector {
105 config: LogsConfig,
106 buffer: Arc<RwLock<VecDeque<LogEntry>>>,
107 sender: mpsc::Sender<Vec<LogEntry>>,
108 counter: AtomicU64,
109}
110
111impl LogCollector {
112 pub fn new(config: LogsConfig) -> Self {
114 let (sender, _receiver) = mpsc::channel(1024);
115 Self {
116 config,
117 buffer: Arc::new(RwLock::new(VecDeque::new())),
118 sender,
119 counter: AtomicU64::new(0),
120 }
121 }
122
123 pub async fn record(&self, entry: LogEntry) {
125 if !entry.matches_level(self.config.level) {
127 return;
128 }
129
130 let mut buffer = self.buffer.write().await;
131 buffer.push_back(entry);
132 self.counter.fetch_add(1, Ordering::Relaxed);
133
134 if buffer.len() >= self.config.buffer_size {
136 let batch: Vec<LogEntry> = buffer.drain(..).collect();
137 let _ = self.sender.send(batch).await;
138 }
139 }
140
141 pub async fn trace(&self, message: impl Into<String>) {
143 self.record(LogEntry::trace(message)).await;
144 }
145
146 pub async fn debug(&self, message: impl Into<String>) {
148 self.record(LogEntry::debug(message)).await;
149 }
150
151 pub async fn info(&self, message: impl Into<String>) {
153 self.record(LogEntry::info(message)).await;
154 }
155
156 pub async fn warn(&self, message: impl Into<String>) {
158 self.record(LogEntry::warn(message)).await;
159 }
160
161 pub async fn error(&self, message: impl Into<String>) {
163 self.record(LogEntry::error(message)).await;
164 }
165
166 pub async fn flush(&self) {
168 let mut buffer = self.buffer.write().await;
169 if !buffer.is_empty() {
170 let batch: Vec<LogEntry> = buffer.drain(..).collect();
171 let _ = self.sender.send(batch).await;
172 }
173 }
174
175 pub async fn drain(&self) -> Vec<LogEntry> {
179 let mut buffer = self.buffer.write().await;
180 buffer.drain(..).collect()
181 }
182
183 pub fn count(&self) -> u64 {
185 self.counter.load(Ordering::Relaxed)
186 }
187
188 pub async fn buffer_size(&self) -> usize {
190 self.buffer.read().await.len()
191 }
192
193 pub fn min_level(&self) -> LogLevel {
195 self.config.level
196 }
197}
198
199pub struct TraceCollector {
201 config: TracesConfig,
202 buffer: Arc<RwLock<VecDeque<Span>>>,
203 sender: mpsc::Sender<Vec<Span>>,
204 counter: AtomicU64,
205 sampled_counter: AtomicU64,
206}
207
208impl TraceCollector {
209 pub fn new(config: TracesConfig) -> Self {
211 let (sender, _receiver) = mpsc::channel(1024);
212 Self {
213 config,
214 buffer: Arc::new(RwLock::new(VecDeque::new())),
215 sender,
216 counter: AtomicU64::new(0),
217 sampled_counter: AtomicU64::new(0),
218 }
219 }
220
221 pub async fn record(&self, span: Span) {
223 self.counter.fetch_add(1, Ordering::Relaxed);
224
225 let should_sample = self.should_sample(&span);
227 if !should_sample {
228 return;
229 }
230
231 self.sampled_counter.fetch_add(1, Ordering::Relaxed);
232
233 let mut buffer = self.buffer.write().await;
234 buffer.push_back(span);
235 }
236
237 fn should_sample(&self, span: &Span) -> bool {
239 if self.config.always_trace_errors && span.status == forge_core::SpanStatus::Error {
241 return true;
242 }
243
244 if !span.context.is_sampled() {
246 return false;
247 }
248
249 if self.config.sample_rate >= 1.0 {
251 return true;
252 }
253
254 let hash = span
256 .context
257 .trace_id
258 .as_str()
259 .as_bytes()
260 .iter()
261 .fold(0u64, |acc, b| acc.wrapping_mul(31).wrapping_add(*b as u64));
262 let threshold = (self.config.sample_rate * u64::MAX as f64) as u64;
263 hash < threshold
264 }
265
266 pub async fn flush(&self) {
268 let mut buffer = self.buffer.write().await;
269 if !buffer.is_empty() {
270 let batch: Vec<Span> = buffer.drain(..).collect();
271 let _ = self.sender.send(batch).await;
272 }
273 }
274
275 pub async fn drain(&self) -> Vec<Span> {
279 let mut buffer = self.buffer.write().await;
280 buffer.drain(..).collect()
281 }
282
283 pub fn count(&self) -> u64 {
285 self.counter.load(Ordering::Relaxed)
286 }
287
288 pub fn sampled_count(&self) -> u64 {
290 self.sampled_counter.load(Ordering::Relaxed)
291 }
292
293 pub async fn buffer_size(&self) -> usize {
295 self.buffer.read().await.len()
296 }
297
298 pub fn sample_rate(&self) -> f64 {
300 self.config.sample_rate
301 }
302}
303
304pub struct SystemMetricsCollector {
309 system: RwLock<sysinfo::System>,
310 shutdown: Arc<RwLock<bool>>,
311}
312
313impl SystemMetricsCollector {
314 pub fn new() -> Self {
316 Self {
317 system: RwLock::new(sysinfo::System::new_all()),
318 shutdown: Arc::new(RwLock::new(false)),
319 }
320 }
321
322 pub fn start(
327 &self,
328 metrics: Arc<MetricsCollector>,
329 interval: std::time::Duration,
330 ) -> tokio::task::JoinHandle<()> {
331 let shutdown = self.shutdown.clone();
332 let system = RwLock::new(sysinfo::System::new_all());
333
334 tokio::spawn(async move {
335 let mut ticker = tokio::time::interval(interval);
336 loop {
337 ticker.tick().await;
338
339 if *shutdown.read().await {
340 break;
341 }
342
343 {
345 let mut sys = system.write().await;
346 sys.refresh_all();
347
348 let cpu_usage = sys.global_cpu_usage();
350 metrics
351 .set_gauge("forge_system_cpu_usage_percent", cpu_usage as f64)
352 .await;
353
354 let total_memory = sys.total_memory();
356 let used_memory = sys.used_memory();
357 let memory_usage_percent = if total_memory > 0 {
358 (used_memory as f64 / total_memory as f64) * 100.0
359 } else {
360 0.0
361 };
362 metrics
363 .set_gauge("forge_system_memory_total_bytes", total_memory as f64)
364 .await;
365 metrics
366 .set_gauge("forge_system_memory_used_bytes", used_memory as f64)
367 .await;
368 metrics
369 .set_gauge("forge_system_memory_usage_percent", memory_usage_percent)
370 .await;
371
372 let total_swap = sys.total_swap();
374 let used_swap = sys.used_swap();
375 metrics
376 .set_gauge("forge_system_swap_total_bytes", total_swap as f64)
377 .await;
378 metrics
379 .set_gauge("forge_system_swap_used_bytes", used_swap as f64)
380 .await;
381
382 for (i, cpu) in sys.cpus().iter().enumerate() {
384 let label = format!("cpu{}", i);
385 let mut metric = Metric::gauge(
386 "forge_system_cpu_core_usage_percent",
387 cpu.cpu_usage() as f64,
388 );
389 metric.labels.insert("core".to_string(), label);
390 metrics.record(metric).await;
391 }
392 }
393
394 let disks = sysinfo::Disks::new_with_refreshed_list();
396 for disk in disks.list() {
397 let mount = disk.mount_point().to_string_lossy().to_string();
398 let total = disk.total_space();
399 let available = disk.available_space();
400 let used = total.saturating_sub(available);
401 let usage_percent = if total > 0 {
402 (used as f64 / total as f64) * 100.0
403 } else {
404 0.0
405 };
406
407 let mut metric = Metric::gauge("forge_system_disk_total_bytes", total as f64);
408 metric.labels.insert("mount".to_string(), mount.clone());
409 metrics.record(metric).await;
410
411 let mut metric = Metric::gauge("forge_system_disk_used_bytes", used as f64);
412 metric.labels.insert("mount".to_string(), mount.clone());
413 metrics.record(metric).await;
414
415 let mut metric =
416 Metric::gauge("forge_system_disk_usage_percent", usage_percent);
417 metric.labels.insert("mount".to_string(), mount);
418 metrics.record(metric).await;
419 }
420
421 #[cfg(unix)]
423 {
424 let load_avg = sysinfo::System::load_average();
425 metrics
426 .set_gauge("forge_system_load_1m", load_avg.one)
427 .await;
428 metrics
429 .set_gauge("forge_system_load_5m", load_avg.five)
430 .await;
431 metrics
432 .set_gauge("forge_system_load_15m", load_avg.fifteen)
433 .await;
434 }
435 }
436
437 tracing::info!("System metrics collector stopped");
438 })
439 }
440
441 pub async fn stop(&self) {
443 let mut shutdown = self.shutdown.write().await;
444 *shutdown = true;
445 }
446
447 pub async fn snapshot(&self) -> SystemMetricsSnapshot {
449 let mut sys = self.system.write().await;
450 sys.refresh_all();
451
452 let total_memory = sys.total_memory();
453 let used_memory = sys.used_memory();
454
455 SystemMetricsSnapshot {
456 cpu_usage_percent: sys.global_cpu_usage() as f64,
457 memory_total_bytes: total_memory,
458 memory_used_bytes: used_memory,
459 memory_usage_percent: if total_memory > 0 {
460 (used_memory as f64 / total_memory as f64) * 100.0
461 } else {
462 0.0
463 },
464 swap_total_bytes: sys.total_swap(),
465 swap_used_bytes: sys.used_swap(),
466 }
467 }
468}
469
470impl Default for SystemMetricsCollector {
471 fn default() -> Self {
472 Self::new()
473 }
474}
475
/// Point-in-time host statistics returned by `SystemMetricsCollector::snapshot`.
#[derive(Clone, Debug)]
pub struct SystemMetricsSnapshot {
    /// Global CPU usage reported by `sysinfo`, in percent.
    pub cpu_usage_percent: f64,
    /// Total physical memory, in bytes.
    pub memory_total_bytes: u64,
    /// Physical memory in use, in bytes.
    pub memory_used_bytes: u64,
    /// `memory_used_bytes / memory_total_bytes * 100`, or `0.0` when the total is zero.
    pub memory_usage_percent: f64,
    /// Total swap space, in bytes.
    pub swap_total_bytes: u64,
    /// Swap space in use, in bytes.
    pub swap_used_bytes: u64,
}
492
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_metrics_collector_record() {
        let collector = MetricsCollector::new(MetricsConfig::default());

        collector.increment_counter("test_counter", 1.0).await;
        collector.set_gauge("test_gauge", 42.0).await;

        assert_eq!(collector.count(), 2);
        assert_eq!(collector.buffer_size().await, 2);
    }

    #[tokio::test]
    async fn test_metrics_collector_flush() {
        let config = MetricsConfig {
            buffer_size: 2,
            ..Default::default()
        };
        let collector = MetricsCollector::new(config);

        collector.increment_counter("test1", 1.0).await;
        assert_eq!(collector.buffer_size().await, 1);
        collector.increment_counter("test2", 2.0).await;

        assert_eq!(collector.count(), 2);
        // Reaching `buffer_size` hands the whole queue off as one batch.
        assert_eq!(collector.buffer_size().await, 0);
    }

    #[tokio::test]
    async fn test_metrics_collector_explicit_flush_and_drain() {
        let config = MetricsConfig {
            buffer_size: 100,
            ..Default::default()
        };
        let collector = MetricsCollector::new(config);

        collector.set_gauge("g1", 1.0).await;
        collector.flush().await;
        assert_eq!(collector.buffer_size().await, 0);

        collector.set_gauge("g2", 2.0).await;
        collector.set_gauge("g3", 3.0).await;
        assert_eq!(collector.drain().await.len(), 2);
        assert_eq!(collector.buffer_size().await, 0);
        // The counter tracks every recorded metric regardless of flushing or draining.
        assert_eq!(collector.count(), 3);
    }

    #[tokio::test]
    async fn test_log_collector_level_filter() {
        let config = LogsConfig {
            level: LogLevel::Warn,
            // Large enough that the accepted entries stay buffered.
            buffer_size: 100,
            ..Default::default()
        };
        let collector = LogCollector::new(config);

        collector.debug("Debug message").await;
        collector.info("Info message").await;
        collector.warn("Warn message").await;
        collector.error("Error message").await;

        assert_eq!(collector.count(), 2);
        assert_eq!(collector.buffer_size().await, 2);
    }

    #[tokio::test]
    async fn test_log_collector_record() {
        let collector = LogCollector::new(LogsConfig::default());

        collector.info("Test message").await;
        assert_eq!(collector.count(), 1);
        assert_eq!(collector.buffer_size().await, 1);
    }

    #[tokio::test]
    async fn test_trace_collector_sampling() {
        let config = TracesConfig {
            sample_rate: 1.0,
            ..Default::default()
        };
        let collector = TraceCollector::new(config);

        let span = Span::new("test_span");
        collector.record(span).await;

        assert_eq!(collector.count(), 1);
        assert_eq!(collector.sampled_count(), 1);
        assert_eq!(collector.drain().await.len(), 1);
        assert_eq!(collector.buffer_size().await, 0);
    }

    #[tokio::test]
    async fn test_trace_collector_zero_rate_drops_non_errors() {
        let config = TracesConfig {
            sample_rate: 0.0,
            always_trace_errors: false,
            ..Default::default()
        };
        let collector = TraceCollector::new(config);

        collector.record(Span::new("dropped_span")).await;

        assert_eq!(collector.count(), 1);
        assert_eq!(collector.sampled_count(), 0);
        assert_eq!(collector.buffer_size().await, 0);
    }

    #[tokio::test]
    async fn test_trace_collector_always_trace_errors() {
        let config = TracesConfig {
            sample_rate: 0.0,
            always_trace_errors: true,
            ..Default::default()
        };
        let collector = TraceCollector::new(config);

        let mut span = Span::new("error_span");
        span.end_error("Test error");
        collector.record(span).await;

        assert_eq!(collector.sampled_count(), 1);
        assert_eq!(collector.buffer_size().await, 1);
    }
}