use super::types::{MetricEvent, MetricQuery};
use anyhow::{Context, Result};
use serde_json::Value;
use std::collections::HashMap;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use tokio::fs::{create_dir_all, OpenOptions};
use tokio::io::AsyncWriteExt;
/// Appends metric events to a JSON-lines log file and reads them back.
///
/// The collector only stores the path of the log file; no file handle is
/// kept open between calls, so cloning it is cheap.
#[derive(Debug, Clone)]
pub struct MetricsCollector {
    /// Full path of the JSON-lines file that events are appended to.
    log_path: PathBuf,
}
impl MetricsCollector {
pub fn new(base_dir: impl AsRef<Path>) -> Self {
let log_path = base_dir.as_ref().join("logs").join("metrics.jsonl");
Self { log_path }
}
async fn ensure_log_dir(&self) -> Result<()> {
if let Some(parent) = self.log_path.parent() {
create_dir_all(parent)
.await
.context("Failed to create log directory")?;
}
Ok(())
}
pub async fn record_event(
&self,
event_type: &str,
data: Value,
session_id: Option<&str>,
) -> Result<()> {
self.ensure_log_dir().await?;
let event = MetricEvent::new(event_type, data, session_id);
let mut line = serde_json::to_string(&event).context("Failed to serialize metric event")?;
line.push('\n');
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(&self.log_path)
.await
.context("Failed to open metrics log file")?;
file.write_all(line.as_bytes())
.await
.context("Failed to write metric event")?;
Ok(())
}
pub async fn query(&self, query: MetricQuery) -> Result<Vec<MetricEvent>> {
if !self.log_path.exists() {
return Ok(Vec::new());
}
let log_path = self.log_path.clone();
let events = tokio::task::spawn_blocking(move || -> Result<Vec<MetricEvent>> {
let file = std::fs::File::open(&log_path).context("Failed to open metrics log file")?;
let reader = BufReader::new(file);
let mut events = Vec::new();
let mut skipped = 0;
for line in reader.lines() {
let line = line.context("Failed to read line from metrics log")?;
if line.trim().is_empty() {
continue;
}
let event: MetricEvent =
serde_json::from_str(&line).context("Failed to parse metric event")?;
if query.matches(&event) {
if let Some(offset) = query.offset {
if skipped < offset {
skipped += 1;
continue;
}
}
events.push(event);
if let Some(limit) = query.limit {
if events.len() >= limit {
break;
}
}
}
}
Ok(events)
})
.await
.context("Failed to execute blocking query task")??;
Ok(events)
}
pub async fn aggregate<F>(&self, query: MetricQuery, aggregator: F) -> Result<Value>
where
F: FnOnce(&[MetricEvent]) -> Value,
{
let events = self.query(query).await?;
Ok(aggregator(&events))
}
}
/// Ready-made aggregation closures for use with `MetricsCollector::aggregate`.
///
/// Each function captures an owned copy of the field name and returns a
/// closure that maps a slice of events to a JSON value.
pub mod aggregators {
    use super::*;

    /// Collects the value of `field` from every event for which
    /// `get_numeric_field` returns `Some`; other events are ignored.
    fn numeric_values(events: &[MetricEvent], field: &str) -> Vec<f64> {
        events
            .iter()
            .filter_map(|e| e.get_numeric_field(field))
            .collect()
    }

    /// Sums `field` over all events that have a numeric value for it.
    ///
    /// Produces `{"sum": <total>}`.
    pub fn sum(field: &str) -> impl Fn(&[MetricEvent]) -> Value {
        let field = field.to_string();
        move |events: &[MetricEvent]| {
            let total: f64 = events
                .iter()
                .filter_map(|e| e.get_numeric_field(&field))
                .sum();
            serde_json::json!({ "sum": total })
        }
    }

    /// Averages `field` over all events that have a numeric value for it.
    ///
    /// Produces `{"avg": <mean>, "count": <n>}`, where `count` is the number
    /// of events that contributed a value. With no contributing events the
    /// result is `{"avg": 0.0, "count": 0}`.
    pub fn avg(field: &str) -> impl Fn(&[MetricEvent]) -> Value {
        let field = field.to_string();
        move |events: &[MetricEvent]| {
            let values = numeric_values(events, &field);
            if values.is_empty() {
                return serde_json::json!({ "avg": 0.0, "count": 0 });
            }
            let total: f64 = values.iter().sum();
            let avg = total / values.len() as f64;
            serde_json::json!({ "avg": avg, "count": values.len() })
        }
    }

    /// Counts the events in the slice, producing `{"count": <n>}`.
    pub fn count() -> impl Fn(&[MetricEvent]) -> Value {
        |events: &[MetricEvent]| serde_json::json!({ "count": events.len() })
    }

    /// Counts events per distinct string value of `field`.
    ///
    /// Produces a JSON object mapping each value to its event count. Events
    /// for which `get_string_field` returns `None` are counted under the key
    /// `"unknown"`.
    pub fn group_by(field: &str) -> impl Fn(&[MetricEvent]) -> Value {
        let field = field.to_string();
        move |events: &[MetricEvent]| {
            // Only the group sizes are reported, so tally directly instead of
            // first gathering the events of every group into a vector.
            let mut counts: HashMap<String, usize> = HashMap::new();
            for event in events {
                let key = event
                    .get_string_field(&field)
                    .unwrap_or("unknown")
                    .to_string();
                *counts.entry(key).or_insert(0) += 1;
            }
            serde_json::to_value(counts).unwrap_or(Value::Null)
        }
    }

    /// Computes count, min, max, mean and sum of `field` over all events that
    /// have a numeric value for it.
    ///
    /// Produces `{"count", "min", "max", "avg", "sum"}`. With no contributing
    /// events, `count` is 0, `sum` is 0.0 and the other three are `null`.
    pub fn stats(field: &str) -> impl Fn(&[MetricEvent]) -> Value {
        let field = field.to_string();
        move |events: &[MetricEvent]| {
            let values = numeric_values(events, &field);
            if values.is_empty() {
                return serde_json::json!({
                    "count": 0,
                    "min": null,
                    "max": null,
                    "avg": null,
                    "sum": 0.0
                });
            }
            let min = values.iter().fold(f64::INFINITY, |a, &b| a.min(b));
            let max = values.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
            let sum: f64 = values.iter().sum();
            let avg = sum / values.len() as f64;
            serde_json::json!({
                "count": values.len(),
                "min": min,
                "max": max,
                "avg": avg,
                "sum": sum
            })
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Records three events across two sessions, then checks that filtering
    /// by event type and by session id each return the expected subset.
    #[tokio::test]
    async fn test_record_and_query() {
        let scratch = TempDir::new().unwrap();
        let collector = MetricsCollector::new(scratch.path());

        // (event type, payload value, session id)
        let fixtures = [
            ("test_event", 42, "session1"),
            ("test_event", 100, "session1"),
            ("other_event", 200, "session2"),
        ];
        for (event_type, value, session) in fixtures {
            let payload = serde_json::json!({ "value": value });
            collector
                .record_event(event_type, payload, Some(session))
                .await
                .unwrap();
        }

        let by_type = collector
            .query(MetricQuery::new().with_event_type("test_event"))
            .await
            .unwrap();
        assert_eq!(by_type.len(), 2);

        let by_session = collector
            .query(MetricQuery::new().with_session_id("session2"))
            .await
            .unwrap();
        assert_eq!(by_session.len(), 1);
        assert_eq!(by_session[0].get_numeric_field("value"), Some(200.0));
    }

    /// Records the values 10, 20, ..., 50 and checks the `sum`, `avg` and
    /// `count` aggregators against them.
    #[tokio::test]
    async fn test_aggregators() {
        let scratch = TempDir::new().unwrap();
        let collector = MetricsCollector::new(scratch.path());

        for step in 1..=5 {
            let payload = serde_json::json!({ "value": step * 10 });
            collector
                .record_event("metric", payload, None)
                .await
                .unwrap();
        }

        let everything = MetricQuery::new();

        let summed = collector
            .aggregate(everything.clone(), aggregators::sum("value"))
            .await
            .unwrap();
        assert_eq!(summed["sum"], 150.0);

        let averaged = collector
            .aggregate(everything.clone(), aggregators::avg("value"))
            .await
            .unwrap();
        assert_eq!(averaged["avg"], 30.0);
        assert_eq!(averaged["count"], 5);

        let counted = collector
            .aggregate(everything, aggregators::count())
            .await
            .unwrap();
        assert_eq!(counted["count"], 5);
    }
}