use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use forge_core::observability::{LogEntry, Metric, Span};
use forge_core::LogLevel;
use super::config::{LogsConfig, MetricsConfig, TracesConfig};
/// Buffers [`Metric`] samples and forwards them in batches over an mpsc channel.
pub struct MetricsCollector {
    /// Buffering settings; `buffer_size` triggers batch sends, `flush_interval` drives `run()`.
    config: MetricsConfig,
    /// Metrics recorded but not yet flushed into a batch.
    buffer: Arc<RwLock<VecDeque<Metric>>>,
    /// Sending half of the batch channel; flushed batches go here.
    sender: mpsc::Sender<Vec<Metric>>,
    // Receiving half, retained so the channel stays open (a dropped receiver
    // would make every send fail).
    #[allow(dead_code)]
    receiver: Arc<RwLock<mpsc::Receiver<Vec<Metric>>>>,
    /// Total metrics recorded since construction (monotonic, never reset by flushes).
    counter: AtomicU64,
}
impl MetricsCollector {
    /// Creates a collector with an internal batch channel of capacity 1024.
    pub fn new(config: MetricsConfig) -> Self {
        let (sender, receiver) = mpsc::channel(1024);
        Self {
            config,
            buffer: Arc::new(RwLock::new(VecDeque::new())),
            sender,
            receiver: Arc::new(RwLock::new(receiver)),
            counter: AtomicU64::new(0),
        }
    }

    /// Records one metric. When the buffer reaches `config.buffer_size`, all
    /// buffered metrics are drained and sent as a single batch.
    pub async fn record(&self, metric: Metric) {
        let mut buffer = self.buffer.write().await;
        buffer.push_back(metric);
        self.counter.fetch_add(1, Ordering::Relaxed);
        if buffer.len() >= self.config.buffer_size {
            let batch: Vec<Metric> = buffer.drain(..).collect();
            // Best-effort delivery: if the receiver is gone, drop the batch.
            let _ = self.sender.send(batch).await;
        }
    }

    /// Convenience wrapper recording a counter metric.
    pub async fn increment_counter(&self, name: impl Into<String>, value: f64) {
        self.record(Metric::counter(name, value)).await;
    }

    /// Convenience wrapper recording a gauge metric.
    pub async fn set_gauge(&self, name: impl Into<String>, value: f64) {
        self.record(Metric::gauge(name, value)).await;
    }

    /// Sends everything currently buffered as one batch (no-op when empty).
    pub async fn flush(&self) {
        let mut buffer = self.buffer.write().await;
        if !buffer.is_empty() {
            let batch: Vec<Metric> = buffer.drain(..).collect();
            let _ = self.sender.send(batch).await;
        }
    }

    /// Removes and returns all buffered metrics without sending them.
    pub async fn drain(&self) -> Vec<Metric> {
        let mut buffer = self.buffer.write().await;
        buffer.drain(..).collect()
    }

    /// Returns the receiver on which flushed batches arrive.
    ///
    /// BUGFIX: the previous implementation created a brand-new channel, let
    /// its sender drop immediately, and returned the orphaned receiver — a
    /// subscriber could never observe a single batch. The first call now
    /// claims the collector's real receiver (swapping a closed placeholder
    /// into the stored slot). Later calls — or a call racing a concurrent
    /// borrow of the slot — get a closed receiver, which matches the old
    /// "never yields" behavior for extra subscribers.
    pub fn subscribe(&self) -> mpsc::Receiver<Vec<Metric>> {
        let (_closed_tx, closed_rx) = mpsc::channel(1);
        match self.receiver.try_write() {
            Ok(mut slot) => std::mem::replace(&mut *slot, closed_rx),
            Err(_) => closed_rx,
        }
    }

    /// Total metrics recorded since construction (not reduced by flushes).
    pub fn count(&self) -> u64 {
        self.counter.load(Ordering::Relaxed)
    }

    /// Number of metrics currently buffered and awaiting a flush.
    pub async fn buffer_size(&self) -> usize {
        self.buffer.read().await.len()
    }

    /// Runs forever, flushing on every `config.flush_interval` tick.
    /// Intended to be driven inside a spawned task.
    pub async fn run(&self) {
        let mut interval = tokio::time::interval(self.config.flush_interval);
        loop {
            interval.tick().await;
            self.flush().await;
        }
    }
}
/// Buffers [`LogEntry`] values at or above a configured minimum level and
/// forwards them in batches over an mpsc channel.
pub struct LogCollector {
    /// Level filter plus buffering settings (`level`, `buffer_size`).
    config: LogsConfig,
    /// Entries accepted by the level filter, awaiting a flush.
    buffer: Arc<RwLock<VecDeque<LogEntry>>>,
    /// Batch sender. Note: `new()` drops the matching receiver, so sends fail
    /// and are ignored — batches are discarded unless this is rewired.
    sender: mpsc::Sender<Vec<LogEntry>>,
    /// Total entries accepted (after level filtering); monotonic.
    counter: AtomicU64,
}
impl LogCollector {
    /// Builds a collector for `config`. The channel receiver is dropped right
    /// away, so batch sends error out and are ignored (best-effort).
    pub fn new(config: LogsConfig) -> Self {
        let (sender, _receiver) = mpsc::channel(1024);
        Self {
            config,
            buffer: Arc::new(RwLock::new(VecDeque::new())),
            sender,
            counter: AtomicU64::new(0),
        }
    }

    /// Buffers `entry` if it passes the configured level filter; once the
    /// buffer reaches `config.buffer_size`, ships everything as one batch.
    pub async fn record(&self, entry: LogEntry) {
        if entry.matches_level(self.config.level) {
            let mut pending = self.buffer.write().await;
            pending.push_back(entry);
            self.counter.fetch_add(1, Ordering::Relaxed);
            if pending.len() >= self.config.buffer_size {
                let batch = pending.drain(..).collect::<Vec<LogEntry>>();
                let _ = self.sender.send(batch).await;
            }
        }
    }

    /// Records a trace-level message.
    pub async fn trace(&self, message: impl Into<String>) {
        self.record(LogEntry::trace(message)).await;
    }

    /// Records a debug-level message.
    pub async fn debug(&self, message: impl Into<String>) {
        self.record(LogEntry::debug(message)).await;
    }

    /// Records an info-level message.
    pub async fn info(&self, message: impl Into<String>) {
        self.record(LogEntry::info(message)).await;
    }

    /// Records a warn-level message.
    pub async fn warn(&self, message: impl Into<String>) {
        self.record(LogEntry::warn(message)).await;
    }

    /// Records an error-level message.
    pub async fn error(&self, message: impl Into<String>) {
        self.record(LogEntry::error(message)).await;
    }

    /// Ships everything currently buffered as a single batch; no-op if empty.
    pub async fn flush(&self) {
        let mut pending = self.buffer.write().await;
        if pending.is_empty() {
            return;
        }
        let batch = pending.drain(..).collect::<Vec<LogEntry>>();
        let _ = self.sender.send(batch).await;
    }

    /// Removes and returns every buffered entry without sending it.
    pub async fn drain(&self) -> Vec<LogEntry> {
        self.buffer.write().await.drain(..).collect()
    }

    /// Total entries accepted since construction.
    pub fn count(&self) -> u64 {
        self.counter.load(Ordering::Relaxed)
    }

    /// Number of entries currently buffered.
    pub async fn buffer_size(&self) -> usize {
        self.buffer.read().await.len()
    }

    /// The configured minimum log level.
    pub fn min_level(&self) -> LogLevel {
        self.config.level
    }
}
/// Buffers sampled [`Span`]s and forwards them in batches over an mpsc channel.
pub struct TraceCollector {
    /// Sampling settings (`sample_rate`, `always_trace_errors`).
    config: TracesConfig,
    /// Spans that passed sampling, awaiting a flush.
    buffer: Arc<RwLock<VecDeque<Span>>>,
    /// Batch sender. Note: `new()` drops the matching receiver, so sends fail
    /// and are ignored — batches are discarded unless this is rewired.
    sender: mpsc::Sender<Vec<Span>>,
    /// Total spans offered to the collector (sampled or not); monotonic.
    counter: AtomicU64,
    /// Subset of `counter` that passed the sampling decision; monotonic.
    sampled_counter: AtomicU64,
}
impl TraceCollector {
    /// Builds a collector for `config`. The channel receiver is dropped right
    /// away, so batch sends error out and are ignored (best-effort).
    pub fn new(config: TracesConfig) -> Self {
        let (sender, _receiver) = mpsc::channel(1024);
        Self {
            config,
            buffer: Arc::new(RwLock::new(VecDeque::new())),
            sender,
            counter: AtomicU64::new(0),
            sampled_counter: AtomicU64::new(0),
        }
    }

    /// Counts the span, then buffers it only if it passes sampling.
    /// NOTE(review): unlike the metrics/log collectors, nothing here bounds
    /// the buffer — spans accumulate until `flush()`/`drain()` is called.
    pub async fn record(&self, span: Span) {
        self.counter.fetch_add(1, Ordering::Relaxed);
        if self.should_sample(&span) {
            self.sampled_counter.fetch_add(1, Ordering::Relaxed);
            self.buffer.write().await.push_back(span);
        }
    }

    /// Sampling decision, evaluated in priority order:
    /// 1. error spans are always kept when `always_trace_errors` is set;
    /// 2. spans whose context is marked unsampled are dropped;
    /// 3. a rate of 1.0 (or higher) keeps everything;
    /// 4. otherwise the trace id is hashed deterministically, so every span
    ///    of a given trace receives the same verdict.
    fn should_sample(&self, span: &Span) -> bool {
        if self.config.always_trace_errors && span.status == forge_core::SpanStatus::Error {
            return true;
        }
        if !span.context.is_sampled() {
            return false;
        }
        let rate = self.config.sample_rate;
        if rate >= 1.0 {
            return true;
        }
        // Multiplicative 31-hash over the trace id bytes, wrapping on overflow.
        let hash = span
            .context
            .trace_id
            .as_str()
            .bytes()
            .fold(0u64, |h, b| h.wrapping_mul(31).wrapping_add(u64::from(b)));
        hash < (rate * u64::MAX as f64) as u64
    }

    /// Ships everything currently buffered as a single batch; no-op if empty.
    pub async fn flush(&self) {
        let mut pending = self.buffer.write().await;
        if pending.is_empty() {
            return;
        }
        let batch = pending.drain(..).collect::<Vec<Span>>();
        let _ = self.sender.send(batch).await;
    }

    /// Removes and returns every buffered span without sending it.
    pub async fn drain(&self) -> Vec<Span> {
        self.buffer.write().await.drain(..).collect()
    }

    /// Total spans offered since construction (sampled or not).
    pub fn count(&self) -> u64 {
        self.counter.load(Ordering::Relaxed)
    }

    /// Spans that passed the sampling decision since construction.
    pub fn sampled_count(&self) -> u64 {
        self.sampled_counter.load(Ordering::Relaxed)
    }

    /// Number of spans currently buffered.
    pub async fn buffer_size(&self) -> usize {
        self.buffer.read().await.len()
    }

    /// The configured sampling rate.
    pub fn sample_rate(&self) -> f64 {
        self.config.sample_rate
    }
}
/// Periodically samples host-level metrics (CPU, memory, swap, disks, load
/// averages) via `sysinfo` and publishes them through a [`MetricsCollector`].
pub struct SystemMetricsCollector {
    /// System handle used only by `snapshot()`; the background task spawned
    /// by `start()` owns a separate handle of its own.
    system: RwLock<sysinfo::System>,
    /// Cooperative shutdown flag polled by the background task each tick.
    shutdown: Arc<RwLock<bool>>,
}
impl SystemMetricsCollector {
    /// Creates a collector with an initialized `sysinfo::System` and a
    /// cleared shutdown flag.
    pub fn new() -> Self {
        Self {
            system: RwLock::new(sysinfo::System::new_all()),
            shutdown: Arc::new(RwLock::new(false)),
        }
    }

    /// Spawns a background task that samples host metrics every `interval`
    /// and records them on `metrics` as gauges.
    ///
    /// The task owns its own fresh `sysinfo::System` (created below) rather
    /// than `self.system`, since the `'static` task cannot borrow `self`.
    /// The shutdown flag is only checked after each tick, so `stop()` can
    /// take up to one full `interval` to take effect.
    pub fn start(
        &self,
        metrics: Arc<MetricsCollector>,
        interval: std::time::Duration,
    ) -> tokio::task::JoinHandle<()> {
        let shutdown = self.shutdown.clone();
        // Dedicated System handle moved into the spawned task.
        let system = RwLock::new(sysinfo::System::new_all());
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(interval);
            loop {
                ticker.tick().await;
                // Cooperative shutdown: exit on the first tick after stop().
                if *shutdown.read().await {
                    break;
                }
                {
                    let mut sys = system.write().await;
                    sys.refresh_all();
                    // Overall CPU usage in percent.
                    // NOTE(review): sysinfo derives CPU usage from the delta
                    // between refreshes, so the very first sample may read
                    // 0 — confirm against the sysinfo docs.
                    let cpu_usage = sys.global_cpu_usage();
                    metrics
                        .set_gauge("forge_system_cpu_usage_percent", cpu_usage as f64)
                        .await;
                    // Memory: absolute bytes plus derived percentage.
                    let total_memory = sys.total_memory();
                    let used_memory = sys.used_memory();
                    let memory_usage_percent = if total_memory > 0 {
                        (used_memory as f64 / total_memory as f64) * 100.0
                    } else {
                        0.0 // guard against division by zero
                    };
                    metrics
                        .set_gauge("forge_system_memory_total_bytes", total_memory as f64)
                        .await;
                    metrics
                        .set_gauge("forge_system_memory_used_bytes", used_memory as f64)
                        .await;
                    metrics
                        .set_gauge("forge_system_memory_usage_percent", memory_usage_percent)
                        .await;
                    // Swap usage in bytes.
                    let total_swap = sys.total_swap();
                    let used_swap = sys.used_swap();
                    metrics
                        .set_gauge("forge_system_swap_total_bytes", total_swap as f64)
                        .await;
                    metrics
                        .set_gauge("forge_system_swap_used_bytes", used_swap as f64)
                        .await;
                    // Per-core usage, one gauge per core, distinguished by a
                    // `core` label ("cpu0", "cpu1", ...).
                    for (i, cpu) in sys.cpus().iter().enumerate() {
                        let label = format!("cpu{}", i);
                        let mut metric = Metric::gauge(
                            "forge_system_cpu_core_usage_percent",
                            cpu.cpu_usage() as f64,
                        );
                        metric.labels.insert("core".to_string(), label);
                        metrics.record(metric).await;
                    }
                }
                // Disk metrics use a freshly refreshed disk list each
                // iteration; each gauge carries a `mount` label.
                let disks = sysinfo::Disks::new_with_refreshed_list();
                for disk in disks.list() {
                    let mount = disk.mount_point().to_string_lossy().to_string();
                    let total = disk.total_space();
                    let available = disk.available_space();
                    // saturating_sub guards against available > total.
                    let used = total.saturating_sub(available);
                    let usage_percent = if total > 0 {
                        (used as f64 / total as f64) * 100.0
                    } else {
                        0.0
                    };
                    let mut metric = Metric::gauge("forge_system_disk_total_bytes", total as f64);
                    metric.labels.insert("mount".to_string(), mount.clone());
                    metrics.record(metric).await;
                    let mut metric = Metric::gauge("forge_system_disk_used_bytes", used as f64);
                    metric.labels.insert("mount".to_string(), mount.clone());
                    metrics.record(metric).await;
                    let mut metric =
                        Metric::gauge("forge_system_disk_usage_percent", usage_percent);
                    metric.labels.insert("mount".to_string(), mount);
                    metrics.record(metric).await;
                }
                // Load averages are only meaningful on Unix-like systems.
                #[cfg(unix)]
                {
                    let load_avg = sysinfo::System::load_average();
                    metrics
                        .set_gauge("forge_system_load_1m", load_avg.one)
                        .await;
                    metrics
                        .set_gauge("forge_system_load_5m", load_avg.five)
                        .await;
                    metrics
                        .set_gauge("forge_system_load_15m", load_avg.fifteen)
                        .await;
                }
            }
            tracing::info!("System metrics collector stopped");
        })
    }

    /// Signals the background task to stop; it exits on its next tick
    /// (up to one `interval` later).
    pub async fn stop(&self) {
        let mut shutdown = self.shutdown.write().await;
        *shutdown = true;
    }

    /// Refreshes `self.system` and returns a one-shot reading of CPU, memory,
    /// and swap usage.
    pub async fn snapshot(&self) -> SystemMetricsSnapshot {
        let mut sys = self.system.write().await;
        sys.refresh_all();
        let total_memory = sys.total_memory();
        let used_memory = sys.used_memory();
        SystemMetricsSnapshot {
            cpu_usage_percent: sys.global_cpu_usage() as f64,
            memory_total_bytes: total_memory,
            memory_used_bytes: used_memory,
            memory_usage_percent: if total_memory > 0 {
                (used_memory as f64 / total_memory as f64) * 100.0
            } else {
                0.0 // guard against division by zero
            },
            swap_total_bytes: sys.total_swap(),
            swap_used_bytes: sys.used_swap(),
        }
    }
}
impl Default for SystemMetricsCollector {
fn default() -> Self {
Self::new()
}
}
/// A point-in-time reading of host CPU, memory, and swap usage, as produced
/// by `SystemMetricsCollector::snapshot()`.
#[derive(Debug, Clone)]
pub struct SystemMetricsSnapshot {
    /// Global CPU usage in percent.
    pub cpu_usage_percent: f64,
    /// Total physical memory, in bytes.
    pub memory_total_bytes: u64,
    /// Used physical memory, in bytes.
    pub memory_used_bytes: u64,
    /// used/total memory as a percentage (0.0 when total is unknown/zero).
    pub memory_usage_percent: f64,
    /// Total swap space, in bytes.
    pub swap_total_bytes: u64,
    /// Used swap space, in bytes.
    pub swap_used_bytes: u64,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Recording via the convenience helpers both counts and buffers.
    #[tokio::test]
    async fn test_metrics_collector_record() {
        let collector = MetricsCollector::new(MetricsConfig::default());
        collector.increment_counter("test_counter", 1.0).await;
        collector.set_gauge("test_gauge", 42.0).await;
        assert_eq!(collector.count(), 2);
        assert_eq!(collector.buffer_size().await, 2);
    }

    /// The total count is unaffected by a size-triggered flush.
    #[tokio::test]
    async fn test_metrics_collector_flush() {
        let config = MetricsConfig {
            buffer_size: 2,
            ..Default::default()
        };
        let collector = MetricsCollector::new(config);
        collector.increment_counter("test1", 1.0).await;
        collector.increment_counter("test2", 2.0).await;
        assert_eq!(collector.count(), 2);
    }

    /// Entries below the configured level are silently discarded.
    #[tokio::test]
    async fn test_log_collector_level_filter() {
        let config = LogsConfig {
            level: LogLevel::Warn,
            ..Default::default()
        };
        let collector = LogCollector::new(config);
        collector.debug("Debug message").await;
        collector.info("Info message").await;
        collector.warn("Warn message").await;
        collector.error("Error message").await;
        // Only the warn and error entries survive the filter.
        assert_eq!(collector.count(), 2);
    }

    /// A single accepted entry is both counted and buffered.
    #[tokio::test]
    async fn test_log_collector_record() {
        let collector = LogCollector::new(LogsConfig::default());
        collector.info("Test message").await;
        assert_eq!(collector.count(), 1);
        assert_eq!(collector.buffer_size().await, 1);
    }

    /// With a sample rate of 1.0 every span is kept.
    #[tokio::test]
    async fn test_trace_collector_sampling() {
        let config = TracesConfig {
            sample_rate: 1.0,
            ..Default::default()
        };
        let collector = TraceCollector::new(config);
        let span = Span::new("test_span");
        collector.record(span).await;
        assert_eq!(collector.count(), 1);
        assert_eq!(collector.sampled_count(), 1);
    }

    /// Error spans bypass a zero sample rate when `always_trace_errors` is on.
    #[tokio::test]
    async fn test_trace_collector_always_trace_errors() {
        let config = TracesConfig {
            sample_rate: 0.0,
            always_trace_errors: true,
            ..Default::default()
        };
        let collector = TraceCollector::new(config);
        let mut span = Span::new("error_span");
        span.end_error("Test error");
        collector.record(span).await;
        assert_eq!(collector.sampled_count(), 1);
    }
}