//! Performance monitoring: lock-free atomic counters for hot-path events,
//! HDR-histogram latency percentiles, and sliding-window throughput, with
//! the mutable state guarded by an async `RwLock`.

use hdrhistogram::Histogram;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use tokio::sync::RwLock;
// Histogram bounds: latencies are tracked in microseconds, from 1 µs up to
// one hour, at 3 significant decimal digits.
const HISTOGRAM_MIN_MICROS: u64 = 1;
const HISTOGRAM_MAX_MICROS: u64 = 3_600_000_000;
const HISTOGRAM_PRECISION: u8 = 3;
// Sliding window over which read/write throughput is measured.
const THROUGHPUT_WINDOW: Duration = Duration::from_secs(60);
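/// Point-in-time view of index health: storage footprint, latency
/// percentiles, throughput, and bloom-filter effectiveness.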
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DetailedIndexStats {
pub total_shards: usize,
pub total_postings: usize,
pub pending_operations: usize,
pub memory_usage: usize,
pub disk_usage: usize,
pub active_postings: usize,
pub deleted_postings: usize,
pub average_shard_utilization: f32,
pub vector_dimension: usize,
pub search_latency_p50: Duration,
pub search_latency_p95: Duration,
pub search_latency_p99: Duration,
pub write_throughput: f64,
pub read_throughput: f64,
pub bloom_filter_hit_rate: f64,
pub bloom_filter_false_positive_rate: f64,
pub bloom_filter_memory_usage: usize,
pub file_descriptor_count: usize,
pub memory_mapped_regions: usize,
pub wal_segment_count: usize,
pub active_connections: usize,
pub uptime: Duration,
pub total_operations: u64,
pub last_updated: SystemTime,
}
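/// Central metrics collector. Cheap counters live in [`AtomicCounters`] and
/// are updated lock-free; histogram and sliding-window state lives in
/// [`ComplexMetrics`] behind a `tokio::sync::RwLock`, which is why the
/// recording methods are `async`.
///
/// Illustrative sketch (import paths elided; assumes a Tokio runtime):
///
/// ```ignore
/// let monitor = PerformanceMonitor::new();
/// monitor.record_search(Duration::from_millis(3), 10, true).await;
/// monitor.record_write(Duration::from_millis(7), 1024, true).await;
/// let stats = monitor.get_detailed_stats().await;
/// assert_eq!(stats.total_operations, 2);
/// ```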
pub struct PerformanceMonitor {
counters: Arc<AtomicCounters>,
complex_metrics: Arc<RwLock<ComplexMetrics>>,
start_time: Instant,
}
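/// Hot-path event counters. All updates use `Ordering::Relaxed`: each
/// counter is independent and values are only read for reporting, so no
/// cross-counter ordering is needed.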
#[derive(Debug, Default)]
pub struct AtomicCounters {
pub total_searches: std::sync::atomic::AtomicU64,
pub successful_searches: std::sync::atomic::AtomicU64,
pub failed_searches: std::sync::atomic::AtomicU64,
pub total_writes: std::sync::atomic::AtomicU64,
pub successful_writes: std::sync::atomic::AtomicU64,
pub failed_writes: std::sync::atomic::AtomicU64,
pub bytes_written: std::sync::atomic::AtomicU64,
pub bloom_filter_hits: std::sync::atomic::AtomicU64,
pub bloom_filter_misses: std::sync::atomic::AtomicU64,
pub bloom_filter_false_positives: std::sync::atomic::AtomicU64,
pub file_descriptor_count: std::sync::atomic::AtomicUsize,
pub active_connections: std::sync::atomic::AtomicUsize,
pub total_operations: std::sync::atomic::AtomicU64,
}
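/// Metrics that need `&mut` access (latency histograms, sliding windows of
/// recent operation timestamps); guarded by the monitor's `RwLock`.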
#[derive(Debug)]
pub struct ComplexMetrics {
pub search_latency_calculator: PercentileCalculator,
pub recent_search_times: std::collections::VecDeque<Instant>,
pub write_latency_calculator: PercentileCalculator,
pub recent_write_times: std::collections::VecDeque<Instant>,
pub last_write_time: Option<Instant>,
pub bloom_filter_memory_usage: usize,
pub text_storage_size: usize,
pub text_retrieval_cache_hits: u64,
pub text_retrieval_cache_misses: u64,
pub memory_usage: usize,
pub memory_mapped_regions: usize,
pub wal_segment_count: usize,
pub snapshots: std::collections::VecDeque<MetricsSnapshot>,
pub last_updated: SystemTime,
}
impl Default for ComplexMetrics {
fn default() -> Self {
Self {
search_latency_calculator: PercentileCalculator::new(),
recent_search_times: std::collections::VecDeque::new(),
write_latency_calculator: PercentileCalculator::new(),
recent_write_times: std::collections::VecDeque::new(),
last_write_time: None,
bloom_filter_memory_usage: 0,
text_storage_size: 0,
text_retrieval_cache_hits: 0,
text_retrieval_cache_misses: 0,
memory_usage: 0,
memory_mapped_regions: 0,
wal_segment_count: 0,
snapshots: std::collections::VecDeque::new(),
last_updated: SystemTime::UNIX_EPOCH,
}
}
}
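/// Periodic snapshot of headline throughput and memory figures.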
#[derive(Debug, Clone)]
pub struct MetricsSnapshot {
pub timestamp: SystemTime,
pub total_operations: u64,
pub memory_usage: usize,
pub search_throughput: f64,
pub write_throughput: f64,
pub bloom_filter_hit_rate: f64,
}
impl Default for MetricsSnapshot {
fn default() -> Self {
Self {
timestamp: SystemTime::now(),
total_operations: 0,
memory_usage: 0,
search_throughput: 0.0,
write_throughput: 0.0,
bloom_filter_hit_rate: 0.0,
}
}
}
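/// Aggregated write-path statistics, including WAL activity.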
#[derive(Debug, Clone, Default)]
pub struct WriteMetrics {
pub total_writes: u64,
pub successful_writes: u64,
pub failed_writes: u64,
pub average_write_latency_ms: f64,
pub write_throughput_ops_per_sec: f64,
pub bytes_written: u64,
pub last_write_time: Option<Instant>,
pub wal_writes: u64,
pub wal_flushes: u64,
pub average_wal_flush_latency_ms: f64,
}
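/// Bloom-filter effectiveness and cost statistics.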
#[derive(Debug, Clone, Default)]
pub struct BloomFilterMetrics {
pub total_lookups: u64,
pub hits: u64,
pub misses: u64,
pub false_positives: u64,
pub hit_rate: f64,
pub false_positive_rate: f64,
pub average_lookup_time_ns: f64,
pub memory_usage_bytes: usize,
}
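/// Process- and storage-level resource gauges.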
#[derive(Debug, Clone, Default)]
pub struct ResourceMetrics {
pub memory_usage_bytes: usize,
pub disk_usage_bytes: usize,
pub file_descriptor_count: usize,
pub memory_mapped_regions: usize,
pub wal_segment_count: usize,
pub shard_file_count: usize,
pub active_connections: usize,
}
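/// Statistics for stored document text and its retrieval.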
#[derive(Debug, Clone, Default)]
pub struct DocumentTextMetrics {
pub total_documents: u64,
pub total_text_size: u64,
pub average_document_size: f64,
pub document_storage_operations: u64,
pub document_retrieval_operations: u64,
}
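/// Bounded series of periodically collected data points for trend analysis.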
#[derive(Debug, Clone)]
pub struct HistoricalData {
pub data_points: Vec<HistoricalDataPoint>,
pub max_data_points: usize,
pub collection_interval: Duration,
pub last_collection: Option<Instant>,
}
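/// One periodic sample of the headline metrics.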
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HistoricalDataPoint {
pub timestamp: SystemTime,
pub search_latency_p95: Duration,
pub write_throughput: f64,
pub memory_usage: usize,
pub disk_usage: usize,
pub bloom_filter_hit_rate: f64,
pub active_shards: usize,
pub pending_operations: usize,
}
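/// Result of fitting a trend line to a series of historical data points.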
#[derive(Debug, Clone)]
pub struct TrendAnalysis {
pub slope: f64,
pub correlation: f64,
pub trend_direction: TrendDirection,
pub confidence: f64,
}
#[derive(Debug, Clone, PartialEq)]
pub enum TrendDirection {
Increasing,
Decreasing,
Stable,
}
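/// Latency percentile estimator backed by an HDR histogram with microsecond
/// resolution (1 µs to 1 hour, 3 significant digits). Out-of-range samples
/// are clamped rather than dropped.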
pub struct PercentileCalculator {
histogram: Histogram<u64>,
sample_count: usize,
}
impl std::fmt::Debug for PercentileCalculator {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PercentileCalculator")
.field("sample_count", &self.sample_count)
.field("min_us", &self.histogram.min())
.field("max_us", &self.histogram.max())
.field("mean_us", &(self.histogram.mean() as u64))
.finish()
}
}
impl PerformanceMonitor {
pub fn new() -> Self {
Self {
counters: Arc::new(AtomicCounters::default()),
complex_metrics: Arc::new(RwLock::new(ComplexMetrics::default())),
start_time: Instant::now(),
}
}
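/// Records one search: bumps the atomic counters, feeds the latency
/// histogram, and appends to the sliding window that backs read throughput.
/// `_result_count` is accepted but not recorded yet.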
pub async fn record_search(&self, latency: Duration, _result_count: usize, success: bool) {
self.counters
.total_searches
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if success {
self.counters
.successful_searches
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
} else {
self.counters
.failed_searches
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
self.counters
.total_operations
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let mut complex = self.complex_metrics.write().await;
complex.search_latency_calculator.add_sample(latency);
let now = Instant::now();
complex.recent_search_times.push_back(now);
// Drop samples that have aged out of the throughput window.
while let Some(&front_time) = complex.recent_search_times.front() {
if now.duration_since(front_time) > THROUGHPUT_WINDOW {
complex.recent_search_times.pop_front();
} else {
break;
}
}
complex.last_updated = SystemTime::now();
}
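/// Records one write, including the bytes written, mirroring
/// [`Self::record_search`] for the write path.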
pub async fn record_write(&self, latency: Duration, bytes_written: u64, success: bool) {
self.counters
.total_writes
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if success {
self.counters
.successful_writes
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
} else {
self.counters
.failed_writes
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
self.counters
.bytes_written
.fetch_add(bytes_written, std::sync::atomic::Ordering::Relaxed);
self.counters
.total_operations
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let mut complex = self.complex_metrics.write().await;
complex.write_latency_calculator.add_sample(latency);
let now = Instant::now();
complex.recent_write_times.push_back(now);
complex.last_write_time = Some(now);
// Drop samples that have aged out of the throughput window.
while let Some(&front_time) = complex.recent_write_times.front() {
if now.duration_since(front_time) > THROUGHPUT_WINDOW {
complex.recent_write_times.pop_front();
} else {
break;
}
}
complex.last_updated = SystemTime::now();
}
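/// Records one bloom-filter lookup. A false positive is a lookup that was
/// reported as a hit but whose candidate turned out to be absent, so callers
/// pass `hit = true` together with `false_positive = true` (see the tests).
/// `_lookup_time` is accepted but not recorded yet.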
pub async fn record_bloom_filter_lookup(
&self,
hit: bool,
_lookup_time: Duration,
false_positive: bool,
) {
if hit {
self.counters
.bloom_filter_hits
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
} else {
self.counters
.bloom_filter_misses
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
if false_positive {
self.counters
.bloom_filter_false_positives
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
self.counters
.total_operations
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
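/// Updates resource gauges. `_disk_usage` is accepted but not stored yet.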
pub async fn update_resource_metrics(
&self,
memory_usage: usize,
_disk_usage: usize,
fd_count: usize,
) {
let mut complex = self.complex_metrics.write().await;
complex.memory_usage = memory_usage;
self.counters
.file_descriptor_count
.store(fd_count, std::sync::atomic::Ordering::Relaxed);
}
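/// Bumps the global operation counter without touching latency state.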
pub fn increment_operations_counter(&self) {
self.counters
.total_operations
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
pub fn add_operations_count(&self, count: u64) {
self.counters
.total_operations
.fetch_add(count, std::sync::atomic::Ordering::Relaxed);
}
pub fn increment_successful_searches(&self) {
self.counters
.successful_searches
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
pub fn increment_failed_searches(&self) {
self.counters
.failed_searches
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
pub fn increment_successful_writes(&self) {
self.counters
.successful_writes
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
pub fn increment_failed_writes(&self) {
self.counters
.failed_writes
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
pub fn add_bytes_written(&self, bytes: u64) {
self.counters
.bytes_written
.fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
}
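/// Assembles a [`DetailedIndexStats`] from the live counters and histogram
/// state. Shard- and posting-level fields are not tracked by this monitor
/// and are returned as placeholder values.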
pub async fn get_detailed_stats(&self) -> DetailedIndexStats {
let complex = self.complex_metrics.read().await;
let search_p50 = complex.search_latency_calculator.percentile(0.5);
let search_p95 = complex.search_latency_calculator.percentile(0.95);
let search_p99 = complex.search_latency_calculator.percentile(0.99);
// Throughput: operations observed inside the sliding window, per second.
let search_throughput = complex.recent_search_times.len() as f64 / THROUGHPUT_WINDOW.as_secs_f64();
let write_throughput = complex.recent_write_times.len() as f64 / THROUGHPUT_WINDOW.as_secs_f64();
let bloom_hits = self
.counters
.bloom_filter_hits
.load(std::sync::atomic::Ordering::Relaxed);
let bloom_misses = self
.counters
.bloom_filter_misses
.load(std::sync::atomic::Ordering::Relaxed);
let bloom_total = bloom_hits + bloom_misses;
let bloom_hit_rate = if bloom_total > 0 {
bloom_hits as f64 / bloom_total as f64
} else {
0.0
};
let bloom_false_positives = self
.counters
.bloom_filter_false_positives
.load(std::sync::atomic::Ordering::Relaxed);
let bloom_false_positive_rate = if bloom_total > 0 {
bloom_false_positives as f64 / bloom_total as f64
} else {
0.0
};
DetailedIndexStats {
total_shards: 1,
total_postings: 0,
pending_operations: 0,
memory_usage: complex.memory_usage,
disk_usage: 0,
active_postings: 0,
deleted_postings: 0,
average_shard_utilization: 0.0,
vector_dimension: 0,
search_latency_p50: search_p50,
search_latency_p95: search_p95,
search_latency_p99: search_p99,
write_throughput,
read_throughput: search_throughput,
bloom_filter_hit_rate: bloom_hit_rate,
bloom_filter_false_positive_rate: bloom_false_positive_rate,
bloom_filter_memory_usage: complex.bloom_filter_memory_usage,
file_descriptor_count: self
.counters
.file_descriptor_count
.load(std::sync::atomic::Ordering::Relaxed),
memory_mapped_regions: complex.memory_mapped_regions,
wal_segment_count: complex.wal_segment_count,
active_connections: self
.counters
.active_connections
.load(std::sync::atomic::Ordering::Relaxed),
uptime: self.start_time.elapsed(),
total_operations: self
.counters
.total_operations
.load(std::sync::atomic::Ordering::Relaxed),
last_updated: complex.last_updated,
}
}
}
impl Default for PerformanceMonitor {
fn default() -> Self {
Self::new()
}
}
impl Default for PercentileCalculator {
fn default() -> Self {
Self::new()
}
}
impl PercentileCalculator {
pub fn new() -> Self {
let histogram = Histogram::new_with_bounds(
HISTOGRAM_MIN_MICROS,
HISTOGRAM_MAX_MICROS,
HISTOGRAM_PRECISION,
)
.expect("Failed to create HDRHistogram with valid bounds");
Self {
histogram,
sample_count: 0,
}
}
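/// Records a latency sample, clamping it into the histogram bounds so that
/// outliers saturate instead of being rejected.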
pub fn add_sample(&mut self, duration: Duration) {
let micros = duration.as_micros() as u64;
let clamped_micros = micros.clamp(HISTOGRAM_MIN_MICROS, HISTOGRAM_MAX_MICROS);
if let Err(e) = self.histogram.record(clamped_micros) {
tracing::warn!("Failed to record histogram sample {}: {}", clamped_micros, e);
} else {
self.sample_count += 1;
}
}
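/// Returns the latency at quantile `p` in `[0.0, 1.0]` (e.g. `0.95` for
/// P95), or `Duration::ZERO` when no samples have been recorded.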
pub fn percentile(&self, p: f64) -> Duration {
if self.sample_count == 0 {
return Duration::ZERO;
}
let percentile_value = (p * 100.0).clamp(0.0, 100.0);
let micros = self.histogram.value_at_percentile(percentile_value);
Duration::from_micros(micros)
}
pub fn clear(&mut self) {
self.histogram.clear();
self.sample_count = 0;
}
pub fn sample_count(&self) -> usize {
self.sample_count
}
pub fn min(&self) -> Duration {
if self.sample_count == 0 {
Duration::ZERO
} else {
Duration::from_micros(self.histogram.min())
}
}
pub fn max(&self) -> Duration {
if self.sample_count == 0 {
Duration::ZERO
} else {
Duration::from_micros(self.histogram.max())
}
}
pub fn mean(&self) -> Duration {
if self.sample_count == 0 {
Duration::ZERO
} else {
Duration::from_micros(self.histogram.mean() as u64)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
const PERCENTILE_TEST_TOLERANCE_MS: u64 = 5;
#[tokio::test]
async fn test_performance_monitor() {
let monitor = PerformanceMonitor::new();
monitor
.record_write(Duration::from_millis(10), 1024, true)
.await;
monitor
.record_write(Duration::from_millis(15), 2048, true)
.await;
let stats = monitor.get_detailed_stats().await;
assert!(stats.write_throughput >= 0.0);
assert!(stats.uptime > Duration::ZERO);
}
#[tokio::test]
async fn test_bloom_filter_metrics() {
let monitor = PerformanceMonitor::new();
monitor
.record_bloom_filter_lookup(true, Duration::from_nanos(100), false)
.await;
monitor
.record_bloom_filter_lookup(false, Duration::from_nanos(150), false)
.await;
monitor
.record_bloom_filter_lookup(true, Duration::from_nanos(120), true)
.await;
let stats = monitor.get_detailed_stats().await;
assert!(stats.bloom_filter_hit_rate > 0.0);
assert!(stats.bloom_filter_false_positive_rate >= 0.0);
}
#[test]
fn test_percentile_calculator() {
let mut calc = PercentileCalculator::new();
for i in 1..=100 {
calc.add_sample(Duration::from_millis(i));
}
assert_eq!(calc.sample_count(), 100);
let p50 = calc.percentile(0.5);
let p95 = calc.percentile(0.95);
let p99 = calc.percentile(0.99);
assert!(
p50 >= Duration::from_millis(50 - PERCENTILE_TEST_TOLERANCE_MS)
&& p50 <= Duration::from_millis(50 + PERCENTILE_TEST_TOLERANCE_MS),
"P50 was {:?}, expected around 50ms",
p50
);
assert!(
p95 >= Duration::from_millis(95 - PERCENTILE_TEST_TOLERANCE_MS)
&& p95 <= Duration::from_millis(95 + PERCENTILE_TEST_TOLERANCE_MS),
"P95 was {:?}, expected around 95ms",
p95
);
assert!(
p99 >= Duration::from_millis(99 - PERCENTILE_TEST_TOLERANCE_MS)
&& p99 <= Duration::from_millis(99 + PERCENTILE_TEST_TOLERANCE_MS),
"P99 was {:?}, expected around 99ms",
p99
);
assert_eq!(calc.min(), Duration::from_millis(1));
let max_val = calc.max();
assert!(
max_val >= Duration::from_millis(100)
&& max_val <= Duration::from_millis(100 + PERCENTILE_TEST_TOLERANCE_MS),
"Max was {:?}, expected around 100ms",
max_val
);
calc.clear();
assert_eq!(calc.sample_count(), 0);
assert_eq!(calc.percentile(0.5), Duration::ZERO);
assert_eq!(calc.min(), Duration::ZERO);
assert_eq!(calc.max(), Duration::ZERO);
let mut fresh_calc = PercentileCalculator::new();
fresh_calc.add_sample(Duration::from_millis(42));
assert_eq!(fresh_calc.sample_count(), 1);
let p50 = fresh_calc.percentile(0.5);
let p95 = fresh_calc.percentile(0.95);
let min_val = fresh_calc.min();
let max_val = fresh_calc.max();
assert!(
p50 >= Duration::from_millis(35) && p50 <= Duration::from_millis(50),
"P50 was {:?}, expected around 42ms",
p50
);
assert!(
p95 >= Duration::from_millis(35) && p95 <= Duration::from_millis(50),
"P95 was {:?}, expected around 42ms",
p95
);
assert!(
min_val >= Duration::from_millis(35) && min_val <= Duration::from_millis(50),
"Min was {:?}, expected around 42ms",
min_val
);
assert!(
max_val >= Duration::from_millis(35) && max_val <= Duration::from_millis(50),
"Max was {:?}, expected around 42ms",
max_val
);
}
#[test]
fn test_percentile_calculator_with_microsecond_precision() {
let mut calc = PercentileCalculator::new();
for i in 1..=1000 {
calc.add_sample(Duration::from_micros(i));
}
assert_eq!(calc.sample_count(), 1000);
let p50 = calc.percentile(0.5);
let p95 = calc.percentile(0.95);
assert!(p50.as_micros() > 400 && p50.as_micros() < 600);
assert!(p95.as_micros() > 900 && p95.as_micros() < 1000);
let mean = calc.mean();
assert!(mean.as_micros() > 400 && mean.as_micros() < 600);
}
#[test]
fn test_percentile_calculator_extreme_values() {
let mut calc = PercentileCalculator::new();
calc.add_sample(Duration::from_nanos(1));
calc.add_sample(Duration::from_nanos(500));
calc.add_sample(Duration::from_micros(10));
calc.add_sample(Duration::from_millis(1));
assert_eq!(calc.sample_count(), 4);
let min_val = calc.min();
assert!(min_val >= Duration::from_micros(1));
let max_val = calc.max();
assert!(
max_val >= Duration::from_millis(1) && max_val <= Duration::from_millis(2),
"Max was {:?}, expected around 1ms",
max_val
);
}
}