use std::collections::HashMap;
use std::sync::RwLock;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
#[derive(Debug, Clone, Default)]
pub struct PipelineMetrics {
pub query_count: u64,
pub success_count: u64,
pub error_count: u64,
pub avg_latency_ms: f64,
pub min_latency_ms: u64,
pub max_latency_ms: u64,
pub layer_timings: HashMap<String, LayerTiming>,
pub fast_path_hits: u64,
pub documents_indexed: u64,
}
#[derive(Debug, Clone, Default)]
pub struct LayerTiming {
pub invocations: u64,
pub total_time_ms: u64,
pub avg_time_ms: f64,
pub min_time_ms: u64,
pub max_time_ms: u64,
}
impl LayerTiming {
pub fn record(&mut self, duration_ms: u64) {
self.invocations += 1;
self.total_time_ms += duration_ms;
if self.invocations == 1 {
self.min_time_ms = duration_ms;
self.max_time_ms = duration_ms;
} else {
self.min_time_ms = self.min_time_ms.min(duration_ms);
self.max_time_ms = self.max_time_ms.max(duration_ms);
}
#[allow(clippy::cast_precision_loss)]
{
self.avg_time_ms = self.total_time_ms as f64 / self.invocations as f64;
}
}
}
pub struct MetricsCollector {
query_count: AtomicU64,
success_count: AtomicU64,
error_count: AtomicU64,
fast_path_hits: AtomicU64,
documents_indexed: AtomicU64,
total_latency_ms: AtomicU64,
min_latency_ms: AtomicU64,
max_latency_ms: AtomicU64,
layer_timings: RwLock<HashMap<String, LayerTiming>>,
}
impl Default for MetricsCollector {
fn default() -> Self {
Self::new()
}
}
impl MetricsCollector {
#[must_use]
pub fn new() -> Self {
Self {
query_count: AtomicU64::new(0),
success_count: AtomicU64::new(0),
error_count: AtomicU64::new(0),
fast_path_hits: AtomicU64::new(0),
documents_indexed: AtomicU64::new(0),
total_latency_ms: AtomicU64::new(0),
min_latency_ms: AtomicU64::new(u64::MAX),
max_latency_ms: AtomicU64::new(0),
layer_timings: RwLock::new(HashMap::new()),
}
}
pub fn record_query_success(&self, duration: Duration) {
let duration_ms = u64::try_from(duration.as_millis()).unwrap_or(u64::MAX);
self.query_count.fetch_add(1, Ordering::Relaxed);
self.success_count.fetch_add(1, Ordering::Relaxed);
self.total_latency_ms
.fetch_add(duration_ms, Ordering::Relaxed);
let mut current_min = self.min_latency_ms.load(Ordering::Relaxed);
while duration_ms < current_min {
match self.min_latency_ms.compare_exchange_weak(
current_min,
duration_ms,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(x) => current_min = x,
}
}
let mut current_max = self.max_latency_ms.load(Ordering::Relaxed);
while duration_ms > current_max {
match self.max_latency_ms.compare_exchange_weak(
current_max,
duration_ms,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(x) => current_max = x,
}
}
}
pub fn record_query_error(&self) {
self.query_count.fetch_add(1, Ordering::Relaxed);
self.error_count.fetch_add(1, Ordering::Relaxed);
}
pub fn record_fast_path_hit(&self) {
self.fast_path_hits.fetch_add(1, Ordering::Relaxed);
}
pub fn record_document_indexed(&self) {
self.documents_indexed.fetch_add(1, Ordering::Relaxed);
}
pub fn record_layer_timing(&self, layer: &str, duration: Duration) {
let duration_ms = u64::try_from(duration.as_millis()).unwrap_or(u64::MAX);
if let Ok(mut timings) = self.layer_timings.write() {
timings
.entry(layer.to_string())
.or_default()
.record(duration_ms);
}
}
#[must_use]
pub fn snapshot(&self) -> PipelineMetrics {
let query_count = self.query_count.load(Ordering::Relaxed);
let total_latency = self.total_latency_ms.load(Ordering::Relaxed);
let success_count = self.success_count.load(Ordering::Relaxed);
#[allow(clippy::cast_precision_loss)]
let avg_latency_ms = if success_count > 0 {
total_latency as f64 / success_count as f64
} else {
0.0
};
let min_latency = self.min_latency_ms.load(Ordering::Relaxed);
let max_latency = self.max_latency_ms.load(Ordering::Relaxed);
let layer_timings = self
.layer_timings
.read()
.map(|t| t.clone())
.unwrap_or_default();
PipelineMetrics {
query_count,
success_count,
error_count: self.error_count.load(Ordering::Relaxed),
avg_latency_ms,
min_latency_ms: if min_latency == u64::MAX {
0
} else {
min_latency
},
max_latency_ms: if max_latency == 0 && query_count == 0 {
0
} else {
max_latency
},
layer_timings,
fast_path_hits: self.fast_path_hits.load(Ordering::Relaxed),
documents_indexed: self.documents_indexed.load(Ordering::Relaxed),
}
}
pub fn reset(&self) {
self.query_count.store(0, Ordering::Relaxed);
self.success_count.store(0, Ordering::Relaxed);
self.error_count.store(0, Ordering::Relaxed);
self.fast_path_hits.store(0, Ordering::Relaxed);
self.documents_indexed.store(0, Ordering::Relaxed);
self.total_latency_ms.store(0, Ordering::Relaxed);
self.min_latency_ms.store(u64::MAX, Ordering::Relaxed);
self.max_latency_ms.store(0, Ordering::Relaxed);
if let Ok(mut timings) = self.layer_timings.write() {
timings.clear();
}
}
}
pub struct TimingGuard<'a> {
collector: &'a MetricsCollector,
layer: String,
start: Instant,
}
impl<'a> TimingGuard<'a> {
#[must_use]
pub fn new(collector: &'a MetricsCollector, layer: impl Into<String>) -> Self {
Self {
collector,
layer: layer.into(),
start: Instant::now(),
}
}
}
impl Drop for TimingGuard<'_> {
fn drop(&mut self) {
self.collector
.record_layer_timing(&self.layer, self.start.elapsed());
}
}
pub trait TimedOperation {
fn time_layer(&self, layer: impl Into<String>) -> TimingGuard<'_>;
}
impl TimedOperation for MetricsCollector {
fn time_layer(&self, layer: impl Into<String>) -> TimingGuard<'_> {
TimingGuard::new(self, layer)
}
}
#[cfg(test)]
#[allow(clippy::float_cmp)]
mod tests {
use super::*;
use std::thread;
#[test]
fn test_metrics_collector_basic() {
let collector = MetricsCollector::new();
collector.record_query_success(Duration::from_millis(100));
collector.record_query_success(Duration::from_millis(200));
collector.record_query_error();
let metrics = collector.snapshot();
assert_eq!(metrics.query_count, 3);
assert_eq!(metrics.success_count, 2);
assert_eq!(metrics.error_count, 1);
assert_eq!(metrics.min_latency_ms, 100);
assert_eq!(metrics.max_latency_ms, 200);
assert!((metrics.avg_latency_ms - 150.0).abs() < 0.1);
}
#[test]
fn test_layer_timing() {
let collector = MetricsCollector::new();
collector.record_layer_timing("Echo", Duration::from_millis(50));
collector.record_layer_timing("Echo", Duration::from_millis(100));
collector.record_layer_timing("Speculator", Duration::from_millis(200));
let metrics = collector.snapshot();
let echo_timing = metrics.layer_timings.get("Echo").unwrap();
assert_eq!(echo_timing.invocations, 2);
assert_eq!(echo_timing.min_time_ms, 50);
assert_eq!(echo_timing.max_time_ms, 100);
let spec_timing = metrics.layer_timings.get("Speculator").unwrap();
assert_eq!(spec_timing.invocations, 1);
}
#[test]
fn test_fast_path_tracking() {
let collector = MetricsCollector::new();
collector.record_fast_path_hit();
collector.record_fast_path_hit();
let metrics = collector.snapshot();
assert_eq!(metrics.fast_path_hits, 2);
}
#[test]
fn test_documents_indexed() {
let collector = MetricsCollector::new();
collector.record_document_indexed();
collector.record_document_indexed();
collector.record_document_indexed();
let metrics = collector.snapshot();
assert_eq!(metrics.documents_indexed, 3);
}
#[test]
fn test_reset() {
let collector = MetricsCollector::new();
collector.record_query_success(Duration::from_millis(100));
collector.record_layer_timing("Echo", Duration::from_millis(50));
collector.reset();
let metrics = collector.snapshot();
assert_eq!(metrics.query_count, 0);
assert_eq!(metrics.success_count, 0);
assert!(metrics.layer_timings.is_empty());
}
#[test]
fn test_timing_guard() {
let collector = MetricsCollector::new();
{
let _guard = collector.time_layer("TestLayer");
thread::sleep(Duration::from_millis(10));
}
let metrics = collector.snapshot();
let timing = metrics.layer_timings.get("TestLayer").unwrap();
assert_eq!(timing.invocations, 1);
assert!(timing.total_time_ms >= 10);
}
#[test]
fn test_concurrent_access() {
use std::sync::Arc;
let collector = Arc::new(MetricsCollector::new());
let mut handles = vec![];
for _ in 0..10 {
let c = Arc::clone(&collector);
handles.push(thread::spawn(move || {
for _ in 0..100 {
c.record_query_success(Duration::from_millis(10));
}
}));
}
for handle in handles {
handle.join().unwrap();
}
let metrics = collector.snapshot();
assert_eq!(metrics.query_count, 1000);
assert_eq!(metrics.success_count, 1000);
}
#[test]
fn test_empty_metrics() {
let collector = MetricsCollector::new();
let metrics = collector.snapshot();
assert_eq!(metrics.query_count, 0);
assert_eq!(metrics.success_count, 0);
assert_eq!(metrics.avg_latency_ms, 0.0);
assert_eq!(metrics.min_latency_ms, 0);
assert_eq!(metrics.max_latency_ms, 0);
}
}