use serde_json::Value;
const SEVERITY_ERROR_THRESHOLD: i64 = 17;
const STATUS_CODE_ERROR: i64 = 2;
#[derive(Default, Debug)]
pub struct LogAggregates {
pub count: i64,
pub error_count: i64,
}
impl LogAggregates {
pub fn accumulate(&mut self, record: &Value) {
self.count += 1;
if let Some(severity) = record.get("severity_number").and_then(|v| v.as_i64()) {
if severity >= SEVERITY_ERROR_THRESHOLD {
self.error_count += 1;
}
}
}
}
#[derive(Default, Debug)]
pub struct TraceAggregates {
pub count: i64,
pub error_count: i64,
pub latency_sum_us: i64,
pub latency_min_us: Option<i64>,
pub latency_max_us: Option<i64>,
}
impl TraceAggregates {
pub fn accumulate(&mut self, record: &Value) {
self.count += 1;
if let Some(status) = record.get("status_code").and_then(|v| v.as_i64()) {
if status == STATUS_CODE_ERROR {
self.error_count += 1;
}
}
if let Some(duration_ms) = record.get("duration").and_then(|v| v.as_i64()) {
let duration_us = duration_ms * 1000;
self.latency_sum_us += duration_us;
self.latency_min_us = Some(
self.latency_min_us
.map(|min| min.min(duration_us))
.unwrap_or(duration_us),
);
self.latency_max_us = Some(
self.latency_max_us
.map(|max| max.max(duration_us))
.unwrap_or(duration_us),
);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn log_aggregates_counts_records() {
let mut agg = LogAggregates::default();
agg.accumulate(&json!({"severity_number": 9}));
agg.accumulate(&json!({"severity_number": 17}));
agg.accumulate(&json!({"severity_number": 21}));
assert_eq!(agg.count, 3);
assert_eq!(agg.error_count, 2); }
#[test]
fn trace_aggregates_tracks_latency() {
let mut agg = TraceAggregates::default();
agg.accumulate(&json!({"status_code": 0, "duration": 1})); agg.accumulate(&json!({"status_code": 2, "duration": 5})); agg.accumulate(&json!({"status_code": 1, "duration": 2}));
assert_eq!(agg.count, 3);
assert_eq!(agg.error_count, 1);
assert_eq!(agg.latency_sum_us, 8000); assert_eq!(agg.latency_min_us, Some(1000)); assert_eq!(agg.latency_max_us, Some(5000)); }
#[test]
fn log_aggregates_handles_missing_severity() {
let mut agg = LogAggregates::default();
agg.accumulate(&json!({"body": "test"})); assert_eq!(agg.count, 1);
assert_eq!(agg.error_count, 0);
}
#[test]
fn log_aggregates_severity_boundary() {
let mut agg = LogAggregates::default();
agg.accumulate(&json!({"severity_number": 16})); agg.accumulate(&json!({"severity_number": 17})); assert_eq!(agg.error_count, 1);
}
#[test]
fn trace_aggregates_handles_missing_duration() {
let mut agg = TraceAggregates::default();
agg.accumulate(&json!({"status_code": 0})); assert_eq!(agg.count, 1);
assert_eq!(agg.latency_sum_us, 0);
assert_eq!(agg.latency_min_us, None);
}
#[test]
fn trace_aggregates_matches_vrl_output_format() {
let mut agg = TraceAggregates::default();
let vrl_output = json!({
"trace_id": "0af7651916cd43dd8448eb211c80319c",
"span_id": "b7ad6b7169203331",
"span_name": "HTTP GET /api/users",
"duration": 100, "status_code": 1, "service_name": "my-service"
});
agg.accumulate(&vrl_output);
assert_eq!(agg.count, 1);
assert_eq!(agg.error_count, 0);
assert_eq!(agg.latency_sum_us, 100_000); assert_eq!(agg.latency_min_us, Some(100_000));
assert_eq!(agg.latency_max_us, Some(100_000));
}
#[test]
fn trace_aggregates_error_spans() {
let mut agg = TraceAggregates::default();
agg.accumulate(&json!({"status_code": 2, "duration": 50}));
agg.accumulate(&json!({"status_code": 1, "duration": 30}));
agg.accumulate(&json!({"status_code": 2, "duration": 70}));
assert_eq!(agg.count, 3);
assert_eq!(agg.error_count, 2); assert_eq!(agg.latency_sum_us, 150_000); assert_eq!(agg.latency_min_us, Some(30_000));
assert_eq!(agg.latency_max_us, Some(70_000));
}
}