use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{Duration, Instant};
use crate::cost_model::CostSummary;
use crate::guarantee_ladder::{GuaranteeMode, StopReason};
/// Telemetry for a single query: identity, per-stage metrics, termination details, an
/// optional cost summary and free-form tags. Serializable to JSON via `to_json` /
/// `to_json_pretty`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryTelemetry {
    /// Query identifier; generated by `new`, or supplied by the caller through `with_id`.
    pub query_id: String,
    /// Query class label passed to `new` / `with_id`.
    pub query_class: String,
    /// Instant the record was created; taken (reset to `None`) by `finalize`. Skipped by
    /// serde, so it is never serialized and deserializes as `None`.
    #[serde(skip)]
    pub start_time: Option<Instant>,
    /// End-to-end duration in microseconds; `new` sets it to 0 and `finalize` fills it in.
    pub total_duration_us: u64,
    /// Routing-stage metrics (`record_routing` / `record_routing_full`).
    pub routing: RoutingMetrics,
    /// Scan-stage metrics (`record_scan` / `record_scan_full`).
    pub scan: ScanMetrics,
    /// Rerank-stage metrics (`record_rerank` / `record_io_coalescing`).
    pub rerank: RerankMetrics,
    /// Cache hit/miss counters (`record_cache_hit` / `record_cache_miss`).
    pub cache: CacheMetrics,
    /// Guarantee mode and observed error statistics (`set_guarantee_mode` /
    /// `record_error_bounds`).
    pub error_envelope: ErrorEnvelopeMetrics,
    /// Why and when the query stopped (`set_stop_reason`, `set_miss_probability`,
    /// `set_result_count`).
    pub termination: TerminationMetrics,
    /// Cost-model summary, present once `attach_cost` has been called.
    pub cost: Option<CostSummaryJson>,
    /// Free-form annotations added with `add_tag`. This is a `HashMap`, so the key order in
    /// serialized JSON is not stable between runs.
    pub tags: HashMap<String, String>,
}
/// Routing-stage metrics. Populated by `QueryTelemetry::record_routing` (duration and list
/// counts only) or `QueryTelemetry::record_routing_full` (every field).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RoutingMetrics {
    /// Stage duration in microseconds.
    pub duration_us: u64,
    /// Number of lists considered, as reported by the caller.
    pub lists_considered: u32,
    /// Number of lists scanned, as reported by the caller.
    pub lists_scanned: u32,
    /// Number of centroid comparisons, as reported by the caller.
    pub centroid_comparisons: u32,
    /// Whether compressed centroids were used, as reported by the caller.
    pub used_compressed_centroids: bool,
    /// Free-form strategy label; empty unless set by `record_routing_full`.
    pub strategy: String,
}
/// Scan-stage metrics. `QueryTelemetry::record_scan` sets only `codes_evaluated` and
/// `ram_bytes_read`; `QueryTelemetry::record_scan_full` sets every field.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ScanMetrics {
    /// Stage duration in microseconds (only set by `record_scan_full`).
    pub duration_us: u64,
    /// Number of codes evaluated, as reported by the caller.
    pub codes_evaluated: u64,
    /// Bytes read from RAM, as reported by the caller.
    pub ram_bytes_read: u64,
    /// SIMD operation count, as reported by the caller.
    pub simd_ops: u64,
    /// Candidate count after the first stage, as reported by the caller.
    pub candidates_after_stage1: u32,
    /// Free-form distance-metric label; empty unless set by `record_scan_full`.
    pub distance_metric: String,
    /// Free-form quantization-level label; empty unless set by `record_scan_full`.
    pub quant_level: String,
}
/// Rerank-stage metrics. `QueryTelemetry::record_rerank` sets the duration, candidate
/// counts and SSD I/O counters; `QueryTelemetry::record_io_coalescing` sets the two
/// coalescing fields.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RerankMetrics {
    /// Stage duration in microseconds.
    pub duration_us: u64,
    /// Candidates entering rerank, as reported by the caller.
    pub candidates_in: u32,
    /// Candidates leaving rerank, as reported by the caller.
    pub candidates_out: u32,
    /// Random SSD reads, as reported by the caller.
    pub ssd_random_reads: u32,
    /// Bytes read sequentially from SSD, as reported by the caller.
    pub ssd_sequential_bytes: u64,
    /// Whether I/O coalescing was applied.
    pub io_coalesced: bool,
    /// Number of coalesced ranges, as reported by the caller.
    pub coalesced_ranges: u32,
    /// Full-precision distance count. NOTE(review): no `QueryTelemetry` method shown here
    /// writes this field; callers presumably set it directly.
    pub full_precision_distances: u32,
}
/// Per-query cache hit/miss counters, one pair per `CacheType` variant. Incremented by
/// `QueryTelemetry::record_cache_hit` / `QueryTelemetry::record_cache_miss` and summarized
/// by `hit_ratio`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CacheMetrics {
    /// Hits recorded for `CacheType::Centroid`.
    pub centroid_cache_hits: u32,
    /// Misses recorded for `CacheType::Centroid`.
    pub centroid_cache_misses: u32,
    /// Hits recorded for `CacheType::Vector`.
    pub vector_cache_hits: u32,
    /// Misses recorded for `CacheType::Vector`.
    pub vector_cache_misses: u32,
    /// Hits recorded for `CacheType::Distance`.
    pub distance_cache_hits: u32,
    /// Misses recorded for `CacheType::Distance`.
    pub distance_cache_misses: u32,
}
impl CacheMetrics {
    /// Fraction of all cache lookups (centroid, vector and distance caches combined) that
    /// were hits, in `0.0..=1.0`.
    ///
    /// Returns `1.0` when no lookups were recorded at all.
    pub fn hit_ratio(&self) -> f32 {
        // Widen to u64 before summing: three (or six) u32 counters can exceed u32::MAX,
        // which would panic in debug builds and wrap to a bogus ratio in release builds.
        let hits = u64::from(self.centroid_cache_hits)
            + u64::from(self.vector_cache_hits)
            + u64::from(self.distance_cache_hits);
        let misses = u64::from(self.centroid_cache_misses)
            + u64::from(self.vector_cache_misses)
            + u64::from(self.distance_cache_misses);
        let total = hits + misses;
        if total == 0 {
            return 1.0;
        }
        // Divide in f64 so large counts keep their precision; narrow only the final ratio.
        (hits as f64 / total as f64) as f32
    }
}
/// Guarantee mode and observed error statistics for a query. Populated by
/// `QueryTelemetry::set_guarantee_mode` and `QueryTelemetry::record_error_bounds`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ErrorEnvelopeMetrics {
    /// `Debug` rendering of the `GuaranteeMode` given to `set_guarantee_mode`; empty if unset.
    pub guarantee_mode: String,
    /// Result of `GuaranteeMode::error_quantile()` for that mode.
    pub error_quantile: Option<f32>,
    /// Maximum error as passed to `record_error_bounds`.
    pub max_error_observed: f32,
    /// Mean error as passed to `record_error_bounds`.
    pub mean_error: f32,
    /// NOTE(review): no `QueryTelemetry` method shown here writes this field; presumably
    /// set directly by callers.
    pub tight_bound_candidates: u32,
    /// NOTE(review): no `QueryTelemetry` method shown here writes this field; presumably
    /// set directly by callers.
    pub loose_bound_candidates: u32,
}
/// How and when a query terminated. Populated by `QueryTelemetry::set_stop_reason`,
/// `QueryTelemetry::set_miss_probability` and `QueryTelemetry::set_result_count`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TerminationMetrics {
    /// `Debug` rendering of the `StopReason`; empty if unset.
    pub stop_reason: String,
    /// Probe count passed to `set_stop_reason`.
    pub probes_at_stop: u32,
    /// `max_probes` value passed to `set_stop_reason`.
    pub max_probes: u32,
    /// True exactly when `set_stop_reason` received `StopReason::BudgetExhausted`.
    pub budget_exhausted: bool,
    /// Miss probability, present once `set_miss_probability` has been called.
    pub miss_probability: Option<f32>,
    /// Result count passed to `set_result_count`.
    pub result_count: u32,
}
/// Serializable mirror of `CostSummary`, built through `From<CostSummary>`. Each resource's
/// usage sits next to its limit; durations are stored as microseconds and the exhaustion
/// reason as its `Debug` string.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CostSummaryJson {
    /// Query class copied from the summary.
    pub query_class: String,
    /// RAM bytes used.
    pub ram_bytes_used: u64,
    /// RAM byte limit.
    pub ram_bytes_limit: u64,
    /// Random SSD reads used.
    pub ssd_random_reads_used: u32,
    /// Random SSD read limit.
    pub ssd_random_reads_limit: u32,
    /// Sequential SSD bytes used.
    pub ssd_sequential_bytes_used: u64,
    /// Sequential SSD byte limit.
    pub ssd_sequential_bytes_limit: u64,
    /// CPU cycles used.
    pub cpu_cycles_used: u64,
    /// CPU cycle limit.
    pub cpu_cycles_limit: u64,
    /// `CostSummary::elapsed` in microseconds.
    pub elapsed_us: u64,
    /// `CostSummary::latency_target` in microseconds.
    pub latency_target_us: u64,
    /// Copied from `CostSummary::exhausted`.
    pub exhausted: bool,
    /// `Debug` rendering of `CostSummary::exhaustion_reason`, if any.
    pub exhaustion_reason: Option<String>,
}
impl From<CostSummary> for CostSummaryJson {
    /// Flattens a `CostSummary` into its JSON-friendly form.
    ///
    /// Durations become whole microseconds, saturating at `u64::MAX`. A plain `as u64` cast
    /// would keep only the low 64 bits of the `u128` from `as_micros`. So, for example, a
    /// caller using `Duration::MAX` as an "unlimited" target would otherwise serialize an
    /// arbitrary number. The exhaustion reason is rendered with its `Debug` formatting.
    fn from(summary: CostSummary) -> Self {
        let elapsed_us = summary.elapsed.as_micros().min(u128::from(u64::MAX)) as u64;
        let latency_target_us =
            summary.latency_target.as_micros().min(u128::from(u64::MAX)) as u64;
        Self {
            query_class: summary.query_class,
            ram_bytes_used: summary.ram_bytes_used,
            ram_bytes_limit: summary.ram_bytes_limit,
            ssd_random_reads_used: summary.ssd_random_reads_used,
            ssd_random_reads_limit: summary.ssd_random_reads_limit,
            ssd_sequential_bytes_used: summary.ssd_sequential_bytes_used,
            ssd_sequential_bytes_limit: summary.ssd_sequential_bytes_limit,
            cpu_cycles_used: summary.cpu_cycles_used,
            cpu_cycles_limit: summary.cpu_cycles_limit,
            elapsed_us,
            latency_target_us,
            exhausted: summary.exhausted,
            exhaustion_reason: summary.exhaustion_reason.map(|r| format!("{:?}", r)),
        }
    }
}
impl QueryTelemetry {
    /// Converts a `Duration` to whole microseconds, saturating at `u64::MAX`.
    ///
    /// `Duration::as_micros` returns a `u128`; a bare `as u64` cast would silently keep only
    /// the low 64 bits of an out-of-range value instead of clamping it.
    fn duration_to_us(duration: Duration) -> u64 {
        duration.as_micros().min(u128::from(u64::MAX)) as u64
    }

    /// Starts a telemetry record for a query of class `query_class`.
    ///
    /// The record gets a freshly generated `query_id`, captures the current instant as its
    /// start time (consumed later by `finalize`), and starts every stage's metrics at their
    /// defaults.
    pub fn new(query_class: &str) -> Self {
        Self {
            query_id: uuid_v4(),
            query_class: query_class.to_string(),
            start_time: Some(Instant::now()),
            total_duration_us: 0,
            routing: RoutingMetrics::default(),
            scan: ScanMetrics::default(),
            rerank: RerankMetrics::default(),
            cache: CacheMetrics::default(),
            error_envelope: ErrorEnvelopeMetrics::default(),
            termination: TerminationMetrics::default(),
            cost: None,
            tags: HashMap::new(),
        }
    }

    /// Like `new`, but uses the caller-supplied `query_id` instead of a generated one.
    pub fn with_id(query_id: &str, query_class: &str) -> Self {
        let mut telemetry = Self::new(query_class);
        telemetry.query_id = query_id.to_string();
        telemetry
    }

    /// Records routing duration and list counts; other routing fields are left untouched.
    pub fn record_routing(
        &mut self,
        duration: Duration,
        lists_considered: u32,
        lists_scanned: u32,
    ) {
        self.routing.duration_us = Self::duration_to_us(duration);
        self.routing.lists_considered = lists_considered;
        self.routing.lists_scanned = lists_scanned;
    }

    /// Records every routing field, including the strategy label.
    pub fn record_routing_full(
        &mut self,
        duration: Duration,
        lists_considered: u32,
        lists_scanned: u32,
        centroid_comparisons: u32,
        used_compressed: bool,
        strategy: &str,
    ) {
        self.routing.duration_us = Self::duration_to_us(duration);
        self.routing.lists_considered = lists_considered;
        self.routing.lists_scanned = lists_scanned;
        self.routing.centroid_comparisons = centroid_comparisons;
        self.routing.used_compressed_centroids = used_compressed;
        self.routing.strategy = strategy.to_string();
    }

    /// Records scan counters only; `scan.duration_us` is set solely by `record_scan_full`.
    pub fn record_scan(&mut self, codes_evaluated: u64, ram_bytes: u64) {
        self.scan.codes_evaluated = codes_evaluated;
        self.scan.ram_bytes_read = ram_bytes;
    }

    /// Records every scan field, including duration and the metric/quantization labels.
    pub fn record_scan_full(
        &mut self,
        duration: Duration,
        codes_evaluated: u64,
        ram_bytes: u64,
        simd_ops: u64,
        candidates_stage1: u32,
        distance_metric: &str,
        quant_level: &str,
    ) {
        self.scan.duration_us = Self::duration_to_us(duration);
        self.scan.codes_evaluated = codes_evaluated;
        self.scan.ram_bytes_read = ram_bytes;
        self.scan.simd_ops = simd_ops;
        self.scan.candidates_after_stage1 = candidates_stage1;
        self.scan.distance_metric = distance_metric.to_string();
        self.scan.quant_level = quant_level.to_string();
    }

    /// Records rerank duration, candidate counts and SSD I/O counters.
    pub fn record_rerank(
        &mut self,
        duration: Duration,
        candidates_in: u32,
        candidates_out: u32,
        ssd_random_reads: u32,
        ssd_sequential_bytes: u64,
    ) {
        self.rerank.duration_us = Self::duration_to_us(duration);
        self.rerank.candidates_in = candidates_in;
        self.rerank.candidates_out = candidates_out;
        self.rerank.ssd_random_reads = ssd_random_reads;
        self.rerank.ssd_sequential_bytes = ssd_sequential_bytes;
    }

    /// Records whether rerank I/O was coalesced and into how many ranges.
    pub fn record_io_coalescing(&mut self, coalesced: bool, ranges: u32) {
        self.rerank.io_coalesced = coalesced;
        self.rerank.coalesced_ranges = ranges;
    }

    /// Counts one hit for `cache_type`.
    ///
    /// The counter saturates at `u32::MAX`: telemetry must never abort a query, and a
    /// plain `+= 1` would panic on overflow in debug builds.
    pub fn record_cache_hit(&mut self, cache_type: CacheType) {
        let counter = match cache_type {
            CacheType::Centroid => &mut self.cache.centroid_cache_hits,
            CacheType::Vector => &mut self.cache.vector_cache_hits,
            CacheType::Distance => &mut self.cache.distance_cache_hits,
        };
        *counter = counter.saturating_add(1);
    }

    /// Counts one miss for `cache_type` (saturating at `u32::MAX`, like `record_cache_hit`).
    pub fn record_cache_miss(&mut self, cache_type: CacheType) {
        let counter = match cache_type {
            CacheType::Centroid => &mut self.cache.centroid_cache_misses,
            CacheType::Vector => &mut self.cache.vector_cache_misses,
            CacheType::Distance => &mut self.cache.distance_cache_misses,
        };
        *counter = counter.saturating_add(1);
    }

    /// Stores the mode's `Debug` rendering and its `error_quantile()`.
    pub fn set_guarantee_mode(&mut self, mode: &GuaranteeMode) {
        self.error_envelope.guarantee_mode = format!("{:?}", mode);
        self.error_envelope.error_quantile = mode.error_quantile();
    }

    /// Records the maximum and mean observed error.
    pub fn record_error_bounds(&mut self, max_error: f32, mean_error: f32) {
        self.error_envelope.max_error_observed = max_error;
        self.error_envelope.mean_error = mean_error;
    }

    /// Records why the query stopped and at what probe count.
    ///
    /// `termination.budget_exhausted` is derived from the reason: it is true exactly for
    /// `StopReason::BudgetExhausted`.
    pub fn set_stop_reason(&mut self, reason: StopReason, probes: u32, max_probes: u32) {
        self.termination.stop_reason = format!("{:?}", reason);
        self.termination.probes_at_stop = probes;
        self.termination.max_probes = max_probes;
        self.termination.budget_exhausted = matches!(reason, StopReason::BudgetExhausted);
    }

    /// Records the estimated miss probability.
    pub fn set_miss_probability(&mut self, prob: f32) {
        self.termination.miss_probability = Some(prob);
    }

    /// Records how many results the query returned.
    pub fn set_result_count(&mut self, count: u32) {
        self.termination.result_count = count;
    }

    /// Attaches a cost-model summary, converted to its serializable form.
    pub fn attach_cost(&mut self, summary: CostSummary) {
        self.cost = Some(summary.into());
    }

    /// Adds (or overwrites) a free-form tag.
    pub fn add_tag(&mut self, key: &str, value: &str) {
        self.tags.insert(key.to_string(), value.to_string());
    }

    /// Sets `total_duration_us` to the time elapsed since the record was created.
    ///
    /// The start time is consumed, so only the first call has an effect. Later calls, or
    /// calls on a record without a start time (e.g. one that was deserialized), leave
    /// `total_duration_us` unchanged.
    pub fn finalize(&mut self) {
        if let Some(start) = self.start_time.take() {
            self.total_duration_us = Self::duration_to_us(start.elapsed());
        }
    }

    /// Serializes to compact JSON; returns `"{}"` if serialization fails.
    pub fn to_json(&self) -> String {
        serde_json::to_string(self).unwrap_or_else(|_| "{}".to_string())
    }

    /// Serializes to pretty-printed JSON; returns `"{}"` if serialization fails.
    pub fn to_json_pretty(&self) -> String {
        serde_json::to_string_pretty(self).unwrap_or_else(|_| "{}".to_string())
    }
}
/// Selects which cache a hit or miss is counted against in
/// `QueryTelemetry::record_cache_hit` / `QueryTelemetry::record_cache_miss`.
///
/// Derives `PartialEq`, `Eq` and `Hash` so callers can compare variants and use them as
/// map/set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CacheType {
    /// Counted in `CacheMetrics::centroid_cache_hits` / `centroid_cache_misses`.
    Centroid,
    /// Counted in `CacheMetrics::vector_cache_hits` / `vector_cache_misses`.
    Vector,
    /// Counted in `CacheMetrics::distance_cache_hits` / `distance_cache_misses`.
    Distance,
}
/// Thread-safe, bounded in-memory store of finalized `QueryTelemetry` records, with an
/// optional callback invoked for every recorded query.
///
/// Only the most recent `max_entries` records are kept; the oldest is evicted first.
pub struct TelemetryCollector {
    /// Retained records, oldest at the front. A `VecDeque` makes evicting the oldest record
    /// O(1); the previous `Vec::remove(0)` shifted the whole buffer on every insert.
    entries: parking_lot::RwLock<std::collections::VecDeque<QueryTelemetry>>,
    /// Maximum number of retained records; 0 disables retention entirely.
    max_entries: usize,
    /// Optional sink for every recorded query. Held in an `Arc` so `record` can clone the
    /// handle and release the lock before running user code.
    emit_callback:
        parking_lot::RwLock<Option<std::sync::Arc<dyn Fn(&QueryTelemetry) + Send + Sync>>>,
}

impl TelemetryCollector {
    /// Creates a collector that retains at most `max_entries` records and has no callback.
    ///
    /// Storage for `max_entries` records is reserved up front.
    pub fn new(max_entries: usize) -> Self {
        Self {
            entries: parking_lot::RwLock::new(std::collections::VecDeque::with_capacity(
                max_entries,
            )),
            max_entries,
            emit_callback: parking_lot::RwLock::new(None),
        }
    }

    /// Installs (or replaces) the callback that `record` invokes for every query.
    pub fn set_emit_callback<F>(&self, callback: F)
    where
        F: Fn(&QueryTelemetry) + Send + Sync + 'static,
    {
        *self.emit_callback.write() = Some(std::sync::Arc::new(callback));
    }

    /// Finalizes `telemetry`, passes it to the emit callback (if any), then stores it,
    /// evicting the oldest record when the collector is full.
    ///
    /// With `max_entries == 0` the callback still runs but nothing is stored. The original
    /// code panicked here, calling `remove(0)` on an empty buffer.
    pub fn record(&self, mut telemetry: QueryTelemetry) {
        telemetry.finalize();

        // Clone the callback handle so the read guard is dropped before user code runs.
        // A callback that calls back into this collector (e.g. `set_emit_callback`)
        // therefore cannot deadlock on `emit_callback`.
        let callback = self.emit_callback.read().clone();
        if let Some(callback) = callback {
            callback(&telemetry);
        }

        if self.max_entries == 0 {
            return;
        }
        let mut entries = self.entries.write();
        while entries.len() >= self.max_entries {
            entries.pop_front();
        }
        entries.push_back(telemetry);
    }

    /// Returns clones of the (up to) `count` most recent records, oldest first.
    pub fn recent(&self, count: usize) -> Vec<QueryTelemetry> {
        let entries = self.entries.read();
        let first = entries.len().saturating_sub(count);
        entries.range(first..).cloned().collect()
    }

    /// Computes summary statistics over all retained records.
    ///
    /// Returns `TelemetryAggregate::default()` (all zeros) when nothing is retained.
    /// Percentiles use the nearest-rank definition: p50 is the lower median for an even
    /// count, and p99 equals the maximum only when fewer than 100 records are retained.
    /// The mean is an integer (truncating) mean. Byte/code totals saturate instead of
    /// overflowing. `cache_hit_ratio` is the unweighted mean of the per-record
    /// `CacheMetrics::hit_ratio`.
    pub fn aggregate(&self) -> TelemetryAggregate {
        let entries = self.entries.read();
        let n = entries.len();
        if n == 0 {
            return TelemetryAggregate::default();
        }

        let mut durations: Vec<u64> = entries.iter().map(|e| e.total_duration_us).collect();
        durations.sort_unstable();
        // Sum in u128 so the total cannot overflow; the mean is <= max, so it fits in u64.
        let total_duration: u128 = durations.iter().map(|&d| u128::from(d)).sum();
        let mean_duration_us = (total_duration / n as u128) as u64;

        let total_ram_bytes_read = entries
            .iter()
            .fold(0u64, |acc, e| acc.saturating_add(e.scan.ram_bytes_read));
        let total_codes_evaluated = entries
            .iter()
            .fold(0u64, |acc, e| acc.saturating_add(e.scan.codes_evaluated));
        let budget_exhausted_count = entries
            .iter()
            .filter(|e| e.termination.budget_exhausted)
            .count();
        let cache_hit_ratio =
            entries.iter().map(|e| e.cache.hit_ratio()).sum::<f32>() / n as f32;

        TelemetryAggregate {
            query_count: n,
            mean_duration_us,
            p50_duration_us: Self::nearest_rank(&durations, 50),
            p99_duration_us: Self::nearest_rank(&durations, 99),
            max_duration_us: durations[n - 1],
            total_ram_bytes_read,
            total_codes_evaluated,
            budget_exhausted_count,
            cache_hit_ratio,
        }
    }

    /// Nearest-rank percentile: the smallest element such that at least `pct` percent of
    /// `sorted` is <= it. `sorted` must be non-empty and ascending; `pct` is in `1..=100`.
    fn nearest_rank(sorted: &[u64], pct: usize) -> u64 {
        let n = sorted.len();
        // 1-based rank = ceil(pct * n / 100), clamped into 1..=n.
        let rank = ((pct * n + 99) / 100).max(1).min(n);
        sorted[rank - 1]
    }

    /// Discards all retained records (the callback is kept).
    pub fn clear(&self) {
        self.entries.write().clear();
    }
}
impl Default for TelemetryCollector {
fn default() -> Self {
Self::new(10000)
}
}
/// Summary statistics over the records retained by a `TelemetryCollector`, produced by
/// `TelemetryCollector::aggregate`. All fields are zero when the collector is empty.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TelemetryAggregate {
    /// Number of records aggregated.
    pub query_count: usize,
    /// Integer (truncating) mean of `total_duration_us`.
    pub mean_duration_us: u64,
    /// 50th-percentile `total_duration_us`.
    pub p50_duration_us: u64,
    /// 99th-percentile `total_duration_us`.
    pub p99_duration_us: u64,
    /// Largest `total_duration_us`.
    pub max_duration_us: u64,
    /// Sum of `scan.ram_bytes_read` over all records.
    pub total_ram_bytes_read: u64,
    /// Sum of `scan.codes_evaluated` over all records.
    pub total_codes_evaluated: u64,
    /// Number of records with `termination.budget_exhausted` set.
    pub budget_exhausted_count: usize,
    /// Unweighted mean of the per-record `CacheMetrics::hit_ratio`; a record with no cache
    /// lookups contributes 1.0.
    pub cache_hit_ratio: f32,
}
/// Generates a query identifier: 32 lowercase hexadecimal digits.
///
/// Despite the name this is not an RFC 4122 UUID. The value packs the current Unix time in
/// nanoseconds (high bits) with a process-wide sequence number (low 32 bits). The previous
/// timestamp-only version returned identical ids for two calls that observed the same
/// clock reading. That happens with concurrent threads or a coarse system clock. Here two
/// ids from one process can only collide if 2^32 of them share a clock reading. A system
/// clock set before the Unix epoch yields timestamp 0 instead of panicking.
fn uuid_v4() -> String {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Only uniqueness matters for this counter, not ordering with other memory, so
    // Relaxed is sufficient.
    static SEQUENCE: AtomicUsize = AtomicUsize::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    let seq = (SEQUENCE.fetch_add(1, Ordering::Relaxed) as u128) & 0xFFFF_FFFF;
    let id = (nanos << 32) | seq;
    format!("{:032x}", id)
}
#[cfg(test)]
mod tests {
    use super::*;

    // A record filled through the basic recorders keeps the values it was given.
    #[test]
    fn test_telemetry_creation() {
        let mut record = QueryTelemetry::new("test");
        record.record_routing(Duration::from_micros(500), 100, 16);
        record.record_scan(10_000, 16 << 20);
        record.record_rerank(Duration::from_micros(1_000), 100, 10, 0, 0);
        record.finalize();

        assert_eq!(record.routing.lists_considered, 100);
        assert_eq!(record.scan.codes_evaluated, 10_000);
    }

    // Compact JSON carries both the query class and nested routing field names.
    #[test]
    fn test_telemetry_json() {
        let json = {
            let mut record = QueryTelemetry::new("balanced");
            record.record_routing(Duration::from_micros(100), 50, 8);
            record.finalize();
            record.to_json()
        };
        for needle in &["balanced", "lists_considered"] {
            assert!(json.contains(*needle));
        }
    }

    // The collector retains everything below its capacity and `recent` honours `count`.
    #[test]
    fn test_collector() {
        let collector = TelemetryCollector::new(100);
        (0..10u64).for_each(|i| {
            let mut record = QueryTelemetry::new("test");
            record.total_duration_us = i * 100;
            collector.record(record);
        });

        assert_eq!(collector.recent(5).len(), 5);
        assert_eq!(collector.aggregate().query_count, 10);
    }

    // 80 hits out of 100 centroid lookups gives a ratio of about 0.8.
    #[test]
    fn test_cache_hit_ratio() {
        let cache = CacheMetrics {
            centroid_cache_hits: 80,
            centroid_cache_misses: 20,
            ..CacheMetrics::default()
        };
        assert!((cache.hit_ratio() - 0.8).abs() < 0.01);
    }
}