1use serde::{Deserialize, Serialize};
49use std::collections::HashMap;
50use std::time::{Duration, Instant};
51
52use crate::cost_model::CostSummary;
53use crate::guarantee_ladder::{GuaranteeMode, StopReason};
54
/// Telemetry for a single query: identity, per-phase metrics (routing, scan,
/// rerank, cache, error envelope, termination), an optional cost summary and
/// free-form tags.
///
/// Created with [`QueryTelemetry::new`] or [`QueryTelemetry::with_id`],
/// populated through the `record_*` / `set_*` methods, and closed with
/// [`QueryTelemetry::finalize`]. Serializable with serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryTelemetry {
    /// Query identifier; `new` fills it from `uuid_v4()`, `with_id` overrides it.
    pub query_id: String,

    /// Query class label supplied at construction.
    pub query_class: String,

    /// Instant captured by `new`; `finalize` takes it to compute
    /// `total_duration_us`. Skipped by serde, so it is `None` after
    /// deserialization.
    #[serde(skip)]
    pub start_time: Option<Instant>,

    /// Microseconds between `start_time` and `finalize`; `new` sets it to `0`.
    pub total_duration_us: u64,

    /// Routing-phase metrics.
    pub routing: RoutingMetrics,

    /// Scan-phase metrics.
    pub scan: ScanMetrics,

    /// Rerank-phase metrics.
    pub rerank: RerankMetrics,

    /// Cache hit/miss counters.
    pub cache: CacheMetrics,

    /// Guarantee mode and observed error statistics.
    pub error_envelope: ErrorEnvelopeMetrics,

    /// Stop reason, probe counts and result count.
    pub termination: TerminationMetrics,

    /// Cost summary set by `attach_cost`; `None` until then.
    pub cost: Option<CostSummaryJson>,

    /// Key/value annotations added via `add_tag`.
    pub tags: HashMap<String, String>,
}
99
/// Routing-phase metrics. `QueryTelemetry::record_routing` writes the first
/// three fields; `QueryTelemetry::record_routing_full` writes all of them.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RoutingMetrics {
    /// Routing time in microseconds.
    pub duration_us: u64,

    /// Number of lists considered, as reported by the caller.
    pub lists_considered: u32,

    /// Number of lists scanned, as reported by the caller.
    pub lists_scanned: u32,

    /// Number of centroid comparisons, as reported by the caller.
    pub centroid_comparisons: u32,

    /// Whether compressed centroids were used.
    pub used_compressed_centroids: bool,

    /// Routing strategy label; empty unless `record_routing_full` sets it.
    pub strategy: String,
}
121
/// Scan-phase metrics. `QueryTelemetry::record_scan` writes only the code and
/// byte counts; `QueryTelemetry::record_scan_full` writes every field.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ScanMetrics {
    /// Scan time in microseconds; only `record_scan_full` sets it.
    pub duration_us: u64,

    /// Number of codes evaluated, as reported by the caller.
    pub codes_evaluated: u64,

    /// Bytes read from RAM, as reported by the caller.
    pub ram_bytes_read: u64,

    /// SIMD operation count, as reported by the caller.
    pub simd_ops: u64,

    /// Candidate count after the first stage, as reported by the caller.
    pub candidates_after_stage1: u32,

    /// Distance-metric label passed to `record_scan_full`.
    pub distance_metric: String,

    /// Quantization-level label passed to `record_scan_full`.
    pub quant_level: String,
}
146
/// Rerank-phase metrics. `QueryTelemetry::record_rerank` sets the duration,
/// candidate counts and SSD fields; `QueryTelemetry::record_io_coalescing`
/// sets the coalescing fields.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RerankMetrics {
    /// Rerank time in microseconds.
    pub duration_us: u64,

    /// Candidates entering rerank.
    pub candidates_in: u32,

    /// Candidates leaving rerank.
    pub candidates_out: u32,

    /// Random SSD reads, as reported by the caller.
    pub ssd_random_reads: u32,

    /// Sequential SSD bytes, as reported by the caller.
    pub ssd_sequential_bytes: u64,

    /// Whether I/O was coalesced.
    pub io_coalesced: bool,

    /// Number of coalesced ranges.
    pub coalesced_ranges: u32,

    /// Full-precision distance count. NOTE(review): no method in this file
    /// writes it; presumably callers set it directly — verify.
    pub full_precision_distances: u32,
}
174
/// Per-query cache counters, incremented by `QueryTelemetry::record_cache_hit`
/// and `QueryTelemetry::record_cache_miss`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CacheMetrics {
    /// Hits recorded for `CacheType::Centroid`.
    pub centroid_cache_hits: u32,

    /// Misses recorded for `CacheType::Centroid`.
    pub centroid_cache_misses: u32,

    /// Hits recorded for `CacheType::Vector`.
    pub vector_cache_hits: u32,

    /// Misses recorded for `CacheType::Vector`.
    pub vector_cache_misses: u32,

    /// Hits recorded for `CacheType::Distance`.
    pub distance_cache_hits: u32,

    /// Misses recorded for `CacheType::Distance`.
    pub distance_cache_misses: u32,
}
196
197impl CacheMetrics {
198 pub fn hit_ratio(&self) -> f32 {
200 let total_hits =
201 self.centroid_cache_hits + self.vector_cache_hits + self.distance_cache_hits;
202 let total_misses =
203 self.centroid_cache_misses + self.vector_cache_misses + self.distance_cache_misses;
204 let total = total_hits + total_misses;
205 if total == 0 {
206 1.0
207 } else {
208 total_hits as f32 / total as f32
209 }
210 }
211}
212
/// Guarantee mode and observed error statistics for a query.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ErrorEnvelopeMetrics {
    /// `Debug` rendering of the `GuaranteeMode` passed to `set_guarantee_mode`.
    pub guarantee_mode: String,

    /// Value returned by that mode's `error_quantile()`.
    pub error_quantile: Option<f32>,

    /// Maximum error, set by `record_error_bounds`.
    pub max_error_observed: f32,

    /// Mean error, set by `record_error_bounds`.
    pub mean_error: f32,

    /// NOTE(review): not written by any method in this file; presumably set
    /// directly by callers — verify.
    pub tight_bound_candidates: u32,

    /// NOTE(review): not written by any method in this file; presumably set
    /// directly by callers — verify.
    pub loose_bound_candidates: u32,
}
234
/// How a query terminated.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TerminationMetrics {
    /// `Debug` rendering of the `StopReason` passed to `set_stop_reason`.
    pub stop_reason: String,

    /// `probes` value passed to `set_stop_reason`.
    pub probes_at_stop: u32,

    /// `max_probes` value passed to `set_stop_reason`.
    pub max_probes: u32,

    /// `true` iff the stop reason was `StopReason::BudgetExhausted`.
    pub budget_exhausted: bool,

    /// Value passed to `set_miss_probability`, if it was called.
    pub miss_probability: Option<f32>,

    /// Value passed to `set_result_count`.
    pub result_count: u32,
}
256
/// Serializable mirror of `CostSummary`, produced by its `From<CostSummary>`
/// conversion. Counters are copied as-is; durations become microseconds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CostSummaryJson {
    pub query_class: String,
    pub ram_bytes_used: u64,
    pub ram_bytes_limit: u64,
    pub ssd_random_reads_used: u32,
    pub ssd_random_reads_limit: u32,
    pub ssd_sequential_bytes_used: u64,
    pub ssd_sequential_bytes_limit: u64,
    pub cpu_cycles_used: u64,
    pub cpu_cycles_limit: u64,
    /// `CostSummary::elapsed` in whole microseconds.
    pub elapsed_us: u64,
    /// `CostSummary::latency_target` in whole microseconds.
    pub latency_target_us: u64,
    pub exhausted: bool,
    /// `Debug` rendering of `CostSummary::exhaustion_reason`, if present.
    pub exhaustion_reason: Option<String>,
}
274
275impl From<CostSummary> for CostSummaryJson {
276 fn from(summary: CostSummary) -> Self {
277 Self {
278 query_class: summary.query_class,
279 ram_bytes_used: summary.ram_bytes_used,
280 ram_bytes_limit: summary.ram_bytes_limit,
281 ssd_random_reads_used: summary.ssd_random_reads_used,
282 ssd_random_reads_limit: summary.ssd_random_reads_limit,
283 ssd_sequential_bytes_used: summary.ssd_sequential_bytes_used,
284 ssd_sequential_bytes_limit: summary.ssd_sequential_bytes_limit,
285 cpu_cycles_used: summary.cpu_cycles_used,
286 cpu_cycles_limit: summary.cpu_cycles_limit,
287 elapsed_us: summary.elapsed.as_micros() as u64,
288 latency_target_us: summary.latency_target.as_micros() as u64,
289 exhausted: summary.exhausted,
290 exhaustion_reason: summary.exhaustion_reason.map(|r| format!("{:?}", r)),
291 }
292 }
293}
294
295impl QueryTelemetry {
296 pub fn new(query_class: &str) -> Self {
298 Self {
299 query_id: uuid_v4(),
300 query_class: query_class.to_string(),
301 start_time: Some(Instant::now()),
302 total_duration_us: 0,
303 routing: RoutingMetrics::default(),
304 scan: ScanMetrics::default(),
305 rerank: RerankMetrics::default(),
306 cache: CacheMetrics::default(),
307 error_envelope: ErrorEnvelopeMetrics::default(),
308 termination: TerminationMetrics::default(),
309 cost: None,
310 tags: HashMap::new(),
311 }
312 }
313
314 pub fn with_id(query_id: &str, query_class: &str) -> Self {
316 let mut t = Self::new(query_class);
317 t.query_id = query_id.to_string();
318 t
319 }
320
321 pub fn record_routing(
323 &mut self,
324 duration: Duration,
325 lists_considered: u32,
326 lists_scanned: u32,
327 ) {
328 self.routing.duration_us = duration.as_micros() as u64;
329 self.routing.lists_considered = lists_considered;
330 self.routing.lists_scanned = lists_scanned;
331 }
332
333 pub fn record_routing_full(
335 &mut self,
336 duration: Duration,
337 lists_considered: u32,
338 lists_scanned: u32,
339 centroid_comparisons: u32,
340 used_compressed: bool,
341 strategy: &str,
342 ) {
343 self.routing.duration_us = duration.as_micros() as u64;
344 self.routing.lists_considered = lists_considered;
345 self.routing.lists_scanned = lists_scanned;
346 self.routing.centroid_comparisons = centroid_comparisons;
347 self.routing.used_compressed_centroids = used_compressed;
348 self.routing.strategy = strategy.to_string();
349 }
350
351 pub fn record_scan(&mut self, codes_evaluated: u64, ram_bytes: u64) {
353 self.scan.codes_evaluated = codes_evaluated;
354 self.scan.ram_bytes_read = ram_bytes;
355 }
356
357 pub fn record_scan_full(
359 &mut self,
360 duration: Duration,
361 codes_evaluated: u64,
362 ram_bytes: u64,
363 simd_ops: u64,
364 candidates_stage1: u32,
365 distance_metric: &str,
366 quant_level: &str,
367 ) {
368 self.scan.duration_us = duration.as_micros() as u64;
369 self.scan.codes_evaluated = codes_evaluated;
370 self.scan.ram_bytes_read = ram_bytes;
371 self.scan.simd_ops = simd_ops;
372 self.scan.candidates_after_stage1 = candidates_stage1;
373 self.scan.distance_metric = distance_metric.to_string();
374 self.scan.quant_level = quant_level.to_string();
375 }
376
377 pub fn record_rerank(
379 &mut self,
380 duration: Duration,
381 candidates_in: u32,
382 candidates_out: u32,
383 ssd_random_reads: u32,
384 ssd_sequential_bytes: u64,
385 ) {
386 self.rerank.duration_us = duration.as_micros() as u64;
387 self.rerank.candidates_in = candidates_in;
388 self.rerank.candidates_out = candidates_out;
389 self.rerank.ssd_random_reads = ssd_random_reads;
390 self.rerank.ssd_sequential_bytes = ssd_sequential_bytes;
391 }
392
393 pub fn record_io_coalescing(&mut self, coalesced: bool, ranges: u32) {
395 self.rerank.io_coalesced = coalesced;
396 self.rerank.coalesced_ranges = ranges;
397 }
398
399 pub fn record_cache_hit(&mut self, cache_type: CacheType) {
401 match cache_type {
402 CacheType::Centroid => self.cache.centroid_cache_hits += 1,
403 CacheType::Vector => self.cache.vector_cache_hits += 1,
404 CacheType::Distance => self.cache.distance_cache_hits += 1,
405 }
406 }
407
408 pub fn record_cache_miss(&mut self, cache_type: CacheType) {
410 match cache_type {
411 CacheType::Centroid => self.cache.centroid_cache_misses += 1,
412 CacheType::Vector => self.cache.vector_cache_misses += 1,
413 CacheType::Distance => self.cache.distance_cache_misses += 1,
414 }
415 }
416
417 pub fn set_guarantee_mode(&mut self, mode: &GuaranteeMode) {
419 self.error_envelope.guarantee_mode = format!("{:?}", mode);
420 self.error_envelope.error_quantile = mode.error_quantile();
421 }
422
423 pub fn record_error_bounds(&mut self, max_error: f32, mean_error: f32) {
425 self.error_envelope.max_error_observed = max_error;
426 self.error_envelope.mean_error = mean_error;
427 }
428
429 pub fn set_stop_reason(&mut self, reason: StopReason, probes: u32, max_probes: u32) {
431 self.termination.stop_reason = format!("{:?}", reason);
432 self.termination.probes_at_stop = probes;
433 self.termination.max_probes = max_probes;
434 self.termination.budget_exhausted = matches!(reason, StopReason::BudgetExhausted);
435 }
436
437 pub fn set_miss_probability(&mut self, prob: f32) {
439 self.termination.miss_probability = Some(prob);
440 }
441
442 pub fn set_result_count(&mut self, count: u32) {
444 self.termination.result_count = count;
445 }
446
447 pub fn attach_cost(&mut self, summary: CostSummary) {
449 self.cost = Some(summary.into());
450 }
451
452 pub fn add_tag(&mut self, key: &str, value: &str) {
454 self.tags.insert(key.to_string(), value.to_string());
455 }
456
457 pub fn finalize(&mut self) {
459 if let Some(start) = self.start_time.take() {
460 self.total_duration_us = start.elapsed().as_micros() as u64;
461 }
462 }
463
464 pub fn to_json(&self) -> String {
466 serde_json::to_string(self).unwrap_or_else(|_| "{}".to_string())
467 }
468
469 pub fn to_json_pretty(&self) -> String {
471 serde_json::to_string_pretty(self).unwrap_or_else(|_| "{}".to_string())
472 }
473}
474
/// Selects which cache counter a hit or miss is recorded against
/// (`QueryTelemetry::record_cache_hit` / `record_cache_miss`).
///
/// Derives `PartialEq`, `Eq` and `Hash` so callers can compare values and
/// use them as map/set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CacheType {
    /// Counted in `centroid_cache_hits` / `centroid_cache_misses`.
    Centroid,
    /// Counted in `vector_cache_hits` / `vector_cache_misses`.
    Vector,
    /// Counted in `distance_cache_hits` / `distance_cache_misses`.
    Distance,
}
482
483pub struct TelemetryCollector {
489 entries: parking_lot::RwLock<Vec<QueryTelemetry>>,
491
492 max_entries: usize,
494
495 emit_callback: parking_lot::RwLock<Option<Box<dyn Fn(&QueryTelemetry) + Send + Sync>>>,
497}
498
499impl TelemetryCollector {
500 pub fn new(max_entries: usize) -> Self {
502 Self {
503 entries: parking_lot::RwLock::new(Vec::with_capacity(max_entries)),
504 max_entries,
505 emit_callback: parking_lot::RwLock::new(None),
506 }
507 }
508
509 pub fn set_emit_callback<F>(&self, callback: F)
511 where
512 F: Fn(&QueryTelemetry) + Send + Sync + 'static,
513 {
514 *self.emit_callback.write() = Some(Box::new(callback));
515 }
516
517 pub fn record(&self, mut telemetry: QueryTelemetry) {
519 telemetry.finalize();
520
521 if let Some(callback) = &*self.emit_callback.read() {
523 callback(&telemetry);
524 }
525
526 let mut entries = self.entries.write();
528 if entries.len() >= self.max_entries {
529 entries.remove(0);
530 }
531 entries.push(telemetry);
532 }
533
534 pub fn recent(&self, count: usize) -> Vec<QueryTelemetry> {
536 let entries = self.entries.read();
537 let start = entries.len().saturating_sub(count);
538 entries[start..].to_vec()
539 }
540
541 pub fn aggregate(&self) -> TelemetryAggregate {
543 let entries = self.entries.read();
544
545 if entries.is_empty() {
546 return TelemetryAggregate::default();
547 }
548
549 let n = entries.len();
550 let mut durations: Vec<u64> = entries.iter().map(|e| e.total_duration_us).collect();
551 durations.sort_unstable();
552
553 let total_duration: u64 = durations.iter().sum();
554 let p50 = durations[n / 2];
555 let p99 = durations[(n * 99) / 100];
556 let max = durations[n - 1];
557
558 let total_ram_bytes: u64 = entries.iter().map(|e| e.scan.ram_bytes_read).sum();
559 let total_codes: u64 = entries.iter().map(|e| e.scan.codes_evaluated).sum();
560
561 let budget_exhausted = entries
562 .iter()
563 .filter(|e| e.termination.budget_exhausted)
564 .count();
565
566 TelemetryAggregate {
567 query_count: n,
568 mean_duration_us: total_duration / n as u64,
569 p50_duration_us: p50,
570 p99_duration_us: p99,
571 max_duration_us: max,
572 total_ram_bytes_read: total_ram_bytes,
573 total_codes_evaluated: total_codes,
574 budget_exhausted_count: budget_exhausted,
575 cache_hit_ratio: entries.iter().map(|e| e.cache.hit_ratio()).sum::<f32>() / n as f32,
576 }
577 }
578
579 pub fn clear(&self) {
581 self.entries.write().clear();
582 }
583}
584
585impl Default for TelemetryCollector {
586 fn default() -> Self {
587 Self::new(10000)
588 }
589}
590
/// Summary over the records held by a `TelemetryCollector`, produced by
/// `TelemetryCollector::aggregate`. All fields are zero when no records exist.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TelemetryAggregate {
    /// Number of records summarized.
    pub query_count: usize,
    /// Integer (truncating) mean of `total_duration_us`.
    pub mean_duration_us: u64,
    /// Element `n / 2` of the sorted durations.
    pub p50_duration_us: u64,
    /// Element `n * 99 / 100` of the sorted durations.
    pub p99_duration_us: u64,
    /// Largest `total_duration_us`.
    pub max_duration_us: u64,
    /// Sum of `scan.ram_bytes_read`.
    pub total_ram_bytes_read: u64,
    /// Sum of `scan.codes_evaluated`.
    pub total_codes_evaluated: u64,
    /// Number of records with `termination.budget_exhausted` set.
    pub budget_exhausted_count: usize,
    /// Unweighted mean of per-record `CacheMetrics::hit_ratio`.
    pub cache_hit_ratio: f32,
}
604
/// Generates a 32-character lowercase hex identifier for a query.
///
/// Despite the name this is **not** an RFC 4122 UUID:
/// - the high 64 bits are wall-clock nanoseconds since the Unix epoch
///   (truncated to 64 bits);
/// - the low 64 bits are a process-wide sequence number.
///
/// The sequence keeps IDs distinct when several queries are created within
/// one clock tick, which a bare timestamp does not guarantee.
///
/// A clock set before the epoch yields a zero timestamp instead of panicking.
fn uuid_v4() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static SEQUENCE: AtomicU64 = AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos() as u64; // truncation only matters after ~year 2554
    // Relaxed suffices: fetch_add is atomic, and only uniqueness is needed.
    let seq = SEQUENCE.fetch_add(1, Ordering::Relaxed);
    format!("{:032x}", (u128::from(nanos) << 64) | u128::from(seq))
}
618
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a record whose `total_duration_us` survives `finalize`.
    /// `finalize` only overwrites the duration while `start_time` is set,
    /// so clearing it makes the value stick.
    fn with_duration(duration_us: u64) -> QueryTelemetry {
        let mut t = QueryTelemetry::new("test");
        t.start_time = None;
        t.total_duration_us = duration_us;
        t
    }

    #[test]
    fn test_telemetry_creation() {
        let mut telemetry = QueryTelemetry::new("test");

        telemetry.record_routing(Duration::from_micros(500), 100, 16);
        telemetry.record_scan(10000, 16 * 1024 * 1024);
        telemetry.record_rerank(Duration::from_micros(1000), 100, 10, 0, 0);

        telemetry.finalize();

        // finalize consumes the start instant, so a second call is a no-op.
        assert!(telemetry.start_time.is_none());
        let first = telemetry.total_duration_us;
        telemetry.finalize();
        assert_eq!(telemetry.total_duration_us, first);

        assert_eq!(telemetry.routing.duration_us, 500);
        assert_eq!(telemetry.routing.lists_considered, 100);
        assert_eq!(telemetry.routing.lists_scanned, 16);
        assert_eq!(telemetry.scan.codes_evaluated, 10000);
        assert_eq!(telemetry.scan.ram_bytes_read, 16 * 1024 * 1024);
        assert_eq!(telemetry.rerank.duration_us, 1000);
        assert_eq!(telemetry.rerank.candidates_out, 10);
    }

    #[test]
    fn test_with_id() {
        let t = QueryTelemetry::with_id("q-1", "balanced");
        assert_eq!(t.query_id, "q-1");
        assert_eq!(t.query_class, "balanced");
    }

    #[test]
    fn test_telemetry_json() {
        let mut telemetry = QueryTelemetry::new("balanced");
        telemetry.record_routing(Duration::from_micros(100), 50, 8);
        telemetry.finalize();

        let json = telemetry.to_json();
        assert!(json.contains("balanced"));
        assert!(json.contains("lists_considered"));
        // start_time is #[serde(skip)].
        assert!(!json.contains("start_time"));

        let back: QueryTelemetry = serde_json::from_str(&json).expect("round-trip");
        assert_eq!(back.query_id, telemetry.query_id);
        assert_eq!(back.routing.lists_scanned, 8);
        assert!(back.start_time.is_none());
    }

    #[test]
    fn test_collector() {
        let collector = TelemetryCollector::new(100);

        for i in 0..10 {
            collector.record(with_duration(i * 100));
        }

        let recent = collector.recent(5);
        assert_eq!(recent.len(), 5);
        let recent_durations: Vec<u64> = recent.iter().map(|t| t.total_duration_us).collect();
        assert_eq!(recent_durations, vec![500, 600, 700, 800, 900]);

        let agg = collector.aggregate();
        assert_eq!(agg.query_count, 10);
        assert_eq!(agg.mean_duration_us, 450);
        assert_eq!(agg.p50_duration_us, 500);
        assert_eq!(agg.p99_duration_us, 900);
        assert_eq!(agg.max_duration_us, 900);
        // No cache activity: every record reports a ratio of 1.0.
        assert!((agg.cache_hit_ratio - 1.0).abs() < 1e-6);
    }

    #[test]
    fn test_collector_evicts_oldest() {
        let collector = TelemetryCollector::new(3);
        for i in 0..5 {
            collector.record(with_duration(i));
        }
        let kept: Vec<u64> = collector
            .recent(10)
            .iter()
            .map(|t| t.total_duration_us)
            .collect();
        assert_eq!(kept, vec![2, 3, 4]);
    }

    #[test]
    fn test_collector_empty_and_clear() {
        let collector = TelemetryCollector::new(4);
        assert_eq!(collector.aggregate().query_count, 0);
        collector.record(with_duration(7));
        assert_eq!(collector.recent(1).len(), 1);
        collector.clear();
        assert!(collector.recent(1).is_empty());
    }

    #[test]
    fn test_emit_callback() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::sync::Arc;

        let collector = TelemetryCollector::new(4);
        let seen = Arc::new(AtomicUsize::new(0));
        let seen_in_cb = Arc::clone(&seen);
        collector.set_emit_callback(move |_| {
            seen_in_cb.fetch_add(1, Ordering::SeqCst);
        });
        collector.record(with_duration(1));
        collector.record(with_duration(2));
        assert_eq!(seen.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn test_cache_hit_ratio() {
        let mut cache = CacheMetrics::default();
        assert!((cache.hit_ratio() - 1.0).abs() < 1e-6);

        cache.centroid_cache_hits = 80;
        cache.centroid_cache_misses = 20;
        assert!((cache.hit_ratio() - 0.8).abs() < 0.01);
    }

    #[test]
    fn test_record_cache_hits_and_misses() {
        let mut t = QueryTelemetry::new("test");
        t.record_cache_hit(CacheType::Vector);
        t.record_cache_hit(CacheType::Vector);
        t.record_cache_miss(CacheType::Distance);
        assert_eq!(t.cache.vector_cache_hits, 2);
        assert_eq!(t.cache.distance_cache_misses, 1);
        assert!((t.cache.hit_ratio() - 2.0 / 3.0).abs() < 1e-5);
    }
}