Skip to main content

sochdb_vector/
query_telemetry.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! # Per-Query Telemetry (Task 10)
19//!
20//! Structured telemetry for every query to enable:
21//! - Optimization: falsifiable hypotheses about performance
22//! - Regression detection: automatic SLA monitoring
23//! - Explainability: "why was this query slow?"
24//!
25//! ## Metrics Captured
26//!
27//! - Routing: time, lists considered/scanned
28//! - Scan: codes evaluated, RAM bytes read
29//! - Rerank: candidates, SSD ops/bytes
30//! - Cache: hit ratio
31//! - Error: estimated ε envelope used
32//! - Stop: termination mode and reason
33//!
34//! ## Usage
35//!
36//! ```rust,ignore
//! use std::time::Duration;
//!
//! use sochdb_vector::guarantee_ladder::StopReason;
//! use sochdb_vector::query_telemetry::QueryTelemetry;
//!
//! let mut telemetry = QueryTelemetry::new("search_v1");
//! telemetry.record_routing(Duration::from_micros(500), 100, 16);
//! telemetry.record_scan(1024, 16 * 1024 * 1024);
//! // Stop reason, probes completed, probes allowed.
//! telemetry.set_stop_reason(StopReason::BoundSatisfied, 12, 16);
//! telemetry.finalize();
43//!
44//! // Emit structured telemetry
45//! let json = telemetry.to_json();
46//! ```
47
48use serde::{Deserialize, Serialize};
49use std::collections::HashMap;
50use std::time::{Duration, Instant};
51
52use crate::cost_model::CostSummary;
53use crate::guarantee_ladder::{GuaranteeMode, StopReason};
54
55// ============================================================================
56// Query Telemetry
57// ============================================================================
58
59/// Comprehensive per-query telemetry
60#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct QueryTelemetry {
62    /// Query identifier (for correlation)
63    pub query_id: String,
64
65    /// Query class (e.g., "low_latency", "high_recall")
66    pub query_class: String,
67
68    /// Timestamp when query started
69    #[serde(skip)]
70    pub start_time: Option<Instant>,
71
72    /// Total query duration
73    pub total_duration_us: u64,
74
75    /// Routing phase metrics
76    pub routing: RoutingMetrics,
77
78    /// Scan phase metrics
79    pub scan: ScanMetrics,
80
81    /// Rerank phase metrics
82    pub rerank: RerankMetrics,
83
84    /// Cache metrics
85    pub cache: CacheMetrics,
86
87    /// Error envelope metrics
88    pub error_envelope: ErrorEnvelopeMetrics,
89
90    /// Termination metrics
91    pub termination: TerminationMetrics,
92
93    /// Cost summary (if budget tracking enabled)
94    pub cost: Option<CostSummaryJson>,
95
96    /// Custom tags for filtering/grouping
97    pub tags: HashMap<String, String>,
98}
99
100/// Routing phase metrics
101#[derive(Debug, Clone, Default, Serialize, Deserialize)]
102pub struct RoutingMetrics {
103    /// Time spent in routing phase
104    pub duration_us: u64,
105
106    /// Total lists/partitions considered
107    pub lists_considered: u32,
108
109    /// Lists actually scanned
110    pub lists_scanned: u32,
111
112    /// Centroid comparisons performed
113    pub centroid_comparisons: u32,
114
115    /// Whether routing used compressed centroids
116    pub used_compressed_centroids: bool,
117
118    /// Routing strategy used
119    pub strategy: String,
120}
121
122/// Scan phase metrics
123#[derive(Debug, Clone, Default, Serialize, Deserialize)]
124pub struct ScanMetrics {
125    /// Time spent in scan phase
126    pub duration_us: u64,
127
128    /// Number of codes/vectors evaluated
129    pub codes_evaluated: u64,
130
131    /// RAM bytes read
132    pub ram_bytes_read: u64,
133
134    /// Number of SIMD operations
135    pub simd_ops: u64,
136
137    /// Vectors passing first-stage filter
138    pub candidates_after_stage1: u32,
139
140    /// Distance metric used
141    pub distance_metric: String,
142
143    /// Quantization level used
144    pub quant_level: String,
145}
146
147/// Rerank phase metrics
148#[derive(Debug, Clone, Default, Serialize, Deserialize)]
149pub struct RerankMetrics {
150    /// Time spent in rerank phase
151    pub duration_us: u64,
152
153    /// Candidates entering rerank
154    pub candidates_in: u32,
155
156    /// Candidates after rerank
157    pub candidates_out: u32,
158
159    /// SSD random read operations
160    pub ssd_random_reads: u32,
161
162    /// SSD sequential bytes read
163    pub ssd_sequential_bytes: u64,
164
165    /// Whether IO was coalesced
166    pub io_coalesced: bool,
167
168    /// Number of IO ranges after coalescing
169    pub coalesced_ranges: u32,
170
171    /// Full-precision distance computations
172    pub full_precision_distances: u32,
173}
174
175/// Cache metrics
176#[derive(Debug, Clone, Default, Serialize, Deserialize)]
177pub struct CacheMetrics {
178    /// Centroid cache hits
179    pub centroid_cache_hits: u32,
180
181    /// Centroid cache misses
182    pub centroid_cache_misses: u32,
183
184    /// Vector cache hits
185    pub vector_cache_hits: u32,
186
187    /// Vector cache misses
188    pub vector_cache_misses: u32,
189
190    /// Distance cache hits
191    pub distance_cache_hits: u32,
192
193    /// Distance cache misses
194    pub distance_cache_misses: u32,
195}
196
197impl CacheMetrics {
198    /// Compute overall cache hit ratio
199    pub fn hit_ratio(&self) -> f32 {
200        let total_hits =
201            self.centroid_cache_hits + self.vector_cache_hits + self.distance_cache_hits;
202        let total_misses =
203            self.centroid_cache_misses + self.vector_cache_misses + self.distance_cache_misses;
204        let total = total_hits + total_misses;
205        if total == 0 {
206            1.0
207        } else {
208            total_hits as f32 / total as f32
209        }
210    }
211}
212
213/// Error envelope metrics
214#[derive(Debug, Clone, Default, Serialize, Deserialize)]
215pub struct ErrorEnvelopeMetrics {
216    /// Guarantee mode used
217    pub guarantee_mode: String,
218
219    /// Error quantile used (for calibrated mode)
220    pub error_quantile: Option<f32>,
221
222    /// Maximum error bound observed
223    pub max_error_observed: f32,
224
225    /// Mean error bound
226    pub mean_error: f32,
227
228    /// Number of candidates with tight bounds
229    pub tight_bound_candidates: u32,
230
231    /// Number of candidates with loose bounds
232    pub loose_bound_candidates: u32,
233}
234
235/// Termination metrics
236#[derive(Debug, Clone, Default, Serialize, Deserialize)]
237pub struct TerminationMetrics {
238    /// Stop reason code
239    pub stop_reason: String,
240
241    /// Probes completed when stopped
242    pub probes_at_stop: u32,
243
244    /// Max probes allowed
245    pub max_probes: u32,
246
247    /// Whether budget was exhausted
248    pub budget_exhausted: bool,
249
250    /// Estimated miss probability (for calibrated mode)
251    pub miss_probability: Option<f32>,
252
253    /// Final result count
254    pub result_count: u32,
255}
256
257/// JSON-serializable cost summary
258#[derive(Debug, Clone, Serialize, Deserialize)]
259pub struct CostSummaryJson {
260    pub query_class: String,
261    pub ram_bytes_used: u64,
262    pub ram_bytes_limit: u64,
263    pub ssd_random_reads_used: u32,
264    pub ssd_random_reads_limit: u32,
265    pub ssd_sequential_bytes_used: u64,
266    pub ssd_sequential_bytes_limit: u64,
267    pub cpu_cycles_used: u64,
268    pub cpu_cycles_limit: u64,
269    pub elapsed_us: u64,
270    pub latency_target_us: u64,
271    pub exhausted: bool,
272    pub exhaustion_reason: Option<String>,
273}
274
275impl From<CostSummary> for CostSummaryJson {
276    fn from(summary: CostSummary) -> Self {
277        Self {
278            query_class: summary.query_class,
279            ram_bytes_used: summary.ram_bytes_used,
280            ram_bytes_limit: summary.ram_bytes_limit,
281            ssd_random_reads_used: summary.ssd_random_reads_used,
282            ssd_random_reads_limit: summary.ssd_random_reads_limit,
283            ssd_sequential_bytes_used: summary.ssd_sequential_bytes_used,
284            ssd_sequential_bytes_limit: summary.ssd_sequential_bytes_limit,
285            cpu_cycles_used: summary.cpu_cycles_used,
286            cpu_cycles_limit: summary.cpu_cycles_limit,
287            elapsed_us: summary.elapsed.as_micros() as u64,
288            latency_target_us: summary.latency_target.as_micros() as u64,
289            exhausted: summary.exhausted,
290            exhaustion_reason: summary.exhaustion_reason.map(|r| format!("{:?}", r)),
291        }
292    }
293}
294
295impl QueryTelemetry {
296    /// Create new telemetry for a query
297    pub fn new(query_class: &str) -> Self {
298        Self {
299            query_id: uuid_v4(),
300            query_class: query_class.to_string(),
301            start_time: Some(Instant::now()),
302            total_duration_us: 0,
303            routing: RoutingMetrics::default(),
304            scan: ScanMetrics::default(),
305            rerank: RerankMetrics::default(),
306            cache: CacheMetrics::default(),
307            error_envelope: ErrorEnvelopeMetrics::default(),
308            termination: TerminationMetrics::default(),
309            cost: None,
310            tags: HashMap::new(),
311        }
312    }
313
314    /// Create with specific query ID
315    pub fn with_id(query_id: &str, query_class: &str) -> Self {
316        let mut t = Self::new(query_class);
317        t.query_id = query_id.to_string();
318        t
319    }
320
321    /// Record routing phase
322    pub fn record_routing(
323        &mut self,
324        duration: Duration,
325        lists_considered: u32,
326        lists_scanned: u32,
327    ) {
328        self.routing.duration_us = duration.as_micros() as u64;
329        self.routing.lists_considered = lists_considered;
330        self.routing.lists_scanned = lists_scanned;
331    }
332
333    /// Record routing with full details
334    pub fn record_routing_full(
335        &mut self,
336        duration: Duration,
337        lists_considered: u32,
338        lists_scanned: u32,
339        centroid_comparisons: u32,
340        used_compressed: bool,
341        strategy: &str,
342    ) {
343        self.routing.duration_us = duration.as_micros() as u64;
344        self.routing.lists_considered = lists_considered;
345        self.routing.lists_scanned = lists_scanned;
346        self.routing.centroid_comparisons = centroid_comparisons;
347        self.routing.used_compressed_centroids = used_compressed;
348        self.routing.strategy = strategy.to_string();
349    }
350
351    /// Record scan phase
352    pub fn record_scan(&mut self, codes_evaluated: u64, ram_bytes: u64) {
353        self.scan.codes_evaluated = codes_evaluated;
354        self.scan.ram_bytes_read = ram_bytes;
355    }
356
357    /// Record scan with full details
358    pub fn record_scan_full(
359        &mut self,
360        duration: Duration,
361        codes_evaluated: u64,
362        ram_bytes: u64,
363        simd_ops: u64,
364        candidates_stage1: u32,
365        distance_metric: &str,
366        quant_level: &str,
367    ) {
368        self.scan.duration_us = duration.as_micros() as u64;
369        self.scan.codes_evaluated = codes_evaluated;
370        self.scan.ram_bytes_read = ram_bytes;
371        self.scan.simd_ops = simd_ops;
372        self.scan.candidates_after_stage1 = candidates_stage1;
373        self.scan.distance_metric = distance_metric.to_string();
374        self.scan.quant_level = quant_level.to_string();
375    }
376
377    /// Record rerank phase
378    pub fn record_rerank(
379        &mut self,
380        duration: Duration,
381        candidates_in: u32,
382        candidates_out: u32,
383        ssd_random_reads: u32,
384        ssd_sequential_bytes: u64,
385    ) {
386        self.rerank.duration_us = duration.as_micros() as u64;
387        self.rerank.candidates_in = candidates_in;
388        self.rerank.candidates_out = candidates_out;
389        self.rerank.ssd_random_reads = ssd_random_reads;
390        self.rerank.ssd_sequential_bytes = ssd_sequential_bytes;
391    }
392
393    /// Record IO coalescing details
394    pub fn record_io_coalescing(&mut self, coalesced: bool, ranges: u32) {
395        self.rerank.io_coalesced = coalesced;
396        self.rerank.coalesced_ranges = ranges;
397    }
398
399    /// Record cache hits/misses
400    pub fn record_cache_hit(&mut self, cache_type: CacheType) {
401        match cache_type {
402            CacheType::Centroid => self.cache.centroid_cache_hits += 1,
403            CacheType::Vector => self.cache.vector_cache_hits += 1,
404            CacheType::Distance => self.cache.distance_cache_hits += 1,
405        }
406    }
407
408    /// Record cache miss
409    pub fn record_cache_miss(&mut self, cache_type: CacheType) {
410        match cache_type {
411            CacheType::Centroid => self.cache.centroid_cache_misses += 1,
412            CacheType::Vector => self.cache.vector_cache_misses += 1,
413            CacheType::Distance => self.cache.distance_cache_misses += 1,
414        }
415    }
416
417    /// Set guarantee mode
418    pub fn set_guarantee_mode(&mut self, mode: &GuaranteeMode) {
419        self.error_envelope.guarantee_mode = format!("{:?}", mode);
420        self.error_envelope.error_quantile = mode.error_quantile();
421    }
422
423    /// Record error bounds observed
424    pub fn record_error_bounds(&mut self, max_error: f32, mean_error: f32) {
425        self.error_envelope.max_error_observed = max_error;
426        self.error_envelope.mean_error = mean_error;
427    }
428
429    /// Set stop reason
430    pub fn set_stop_reason(&mut self, reason: StopReason, probes: u32, max_probes: u32) {
431        self.termination.stop_reason = format!("{:?}", reason);
432        self.termination.probes_at_stop = probes;
433        self.termination.max_probes = max_probes;
434        self.termination.budget_exhausted = matches!(reason, StopReason::BudgetExhausted);
435    }
436
437    /// Set miss probability
438    pub fn set_miss_probability(&mut self, prob: f32) {
439        self.termination.miss_probability = Some(prob);
440    }
441
442    /// Set result count
443    pub fn set_result_count(&mut self, count: u32) {
444        self.termination.result_count = count;
445    }
446
447    /// Attach cost summary
448    pub fn attach_cost(&mut self, summary: CostSummary) {
449        self.cost = Some(summary.into());
450    }
451
452    /// Add a custom tag
453    pub fn add_tag(&mut self, key: &str, value: &str) {
454        self.tags.insert(key.to_string(), value.to_string());
455    }
456
457    /// Finalize telemetry (compute total duration)
458    pub fn finalize(&mut self) {
459        if let Some(start) = self.start_time.take() {
460            self.total_duration_us = start.elapsed().as_micros() as u64;
461        }
462    }
463
464    /// Serialize to JSON
465    pub fn to_json(&self) -> String {
466        serde_json::to_string(self).unwrap_or_else(|_| "{}".to_string())
467    }
468
469    /// Serialize to pretty JSON
470    pub fn to_json_pretty(&self) -> String {
471        serde_json::to_string_pretty(self).unwrap_or_else(|_| "{}".to_string())
472    }
473}
474
/// Identifies which cache a hit or miss is recorded against.
///
/// Passed to `QueryTelemetry::record_cache_hit` / `record_cache_miss`. It is
/// `Eq + Hash`, so callers can compare values or use them as map keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CacheType {
    /// The centroid cache.
    Centroid,
    /// The vector cache.
    Vector,
    /// The distance cache.
    Distance,
}
482
483// ============================================================================
484// Telemetry Collector
485// ============================================================================
486
487/// Thread-safe telemetry collector with aggregation
488pub struct TelemetryCollector {
489    /// Collected telemetry entries
490    entries: parking_lot::RwLock<Vec<QueryTelemetry>>,
491
492    /// Maximum entries to keep in memory
493    max_entries: usize,
494
495    /// Callback for emitting telemetry
496    emit_callback: parking_lot::RwLock<Option<Box<dyn Fn(&QueryTelemetry) + Send + Sync>>>,
497}
498
499impl TelemetryCollector {
500    /// Create new collector
501    pub fn new(max_entries: usize) -> Self {
502        Self {
503            entries: parking_lot::RwLock::new(Vec::with_capacity(max_entries)),
504            max_entries,
505            emit_callback: parking_lot::RwLock::new(None),
506        }
507    }
508
509    /// Set callback for emitting telemetry
510    pub fn set_emit_callback<F>(&self, callback: F)
511    where
512        F: Fn(&QueryTelemetry) + Send + Sync + 'static,
513    {
514        *self.emit_callback.write() = Some(Box::new(callback));
515    }
516
517    /// Record telemetry
518    pub fn record(&self, mut telemetry: QueryTelemetry) {
519        telemetry.finalize();
520
521        // Emit via callback
522        if let Some(callback) = &*self.emit_callback.read() {
523            callback(&telemetry);
524        }
525
526        // Store in memory
527        let mut entries = self.entries.write();
528        if entries.len() >= self.max_entries {
529            entries.remove(0);
530        }
531        entries.push(telemetry);
532    }
533
534    /// Get recent entries
535    pub fn recent(&self, count: usize) -> Vec<QueryTelemetry> {
536        let entries = self.entries.read();
537        let start = entries.len().saturating_sub(count);
538        entries[start..].to_vec()
539    }
540
541    /// Compute aggregate statistics
542    pub fn aggregate(&self) -> TelemetryAggregate {
543        let entries = self.entries.read();
544
545        if entries.is_empty() {
546            return TelemetryAggregate::default();
547        }
548
549        let n = entries.len();
550        let mut durations: Vec<u64> = entries.iter().map(|e| e.total_duration_us).collect();
551        durations.sort_unstable();
552
553        let total_duration: u64 = durations.iter().sum();
554        let p50 = durations[n / 2];
555        let p99 = durations[(n * 99) / 100];
556        let max = durations[n - 1];
557
558        let total_ram_bytes: u64 = entries.iter().map(|e| e.scan.ram_bytes_read).sum();
559        let total_codes: u64 = entries.iter().map(|e| e.scan.codes_evaluated).sum();
560
561        let budget_exhausted = entries
562            .iter()
563            .filter(|e| e.termination.budget_exhausted)
564            .count();
565
566        TelemetryAggregate {
567            query_count: n,
568            mean_duration_us: total_duration / n as u64,
569            p50_duration_us: p50,
570            p99_duration_us: p99,
571            max_duration_us: max,
572            total_ram_bytes_read: total_ram_bytes,
573            total_codes_evaluated: total_codes,
574            budget_exhausted_count: budget_exhausted,
575            cache_hit_ratio: entries.iter().map(|e| e.cache.hit_ratio()).sum::<f32>() / n as f32,
576        }
577    }
578
579    /// Clear all entries
580    pub fn clear(&self) {
581        self.entries.write().clear();
582    }
583}
584
585impl Default for TelemetryCollector {
586    fn default() -> Self {
587        Self::new(10000)
588    }
589}
590
591/// Aggregate telemetry statistics
592#[derive(Debug, Clone, Default, Serialize, Deserialize)]
593pub struct TelemetryAggregate {
594    pub query_count: usize,
595    pub mean_duration_us: u64,
596    pub p50_duration_us: u64,
597    pub p99_duration_us: u64,
598    pub max_duration_us: u64,
599    pub total_ram_bytes_read: u64,
600    pub total_codes_evaluated: u64,
601    pub budget_exhausted_count: usize,
602    pub cache_hit_ratio: f32,
603}
604
605// ============================================================================
606// Helpers
607// ============================================================================
608
/// Generates a UUID-shaped query identifier: 32 lowercase hex digits.
///
/// This is **not** an RFC 4122 UUID. The high bits hold the wall-clock time in
/// nanoseconds since the Unix epoch, and the low 32 bits hold a process-wide
/// sequence number. IDs minted within the same clock tick therefore still
/// differ. If the system clock reports a time before the epoch, the time part
/// is zero instead of panicking.
fn uuid_v4() -> String {
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static SEQUENCE: AtomicU32 = AtomicU32::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos())
        .unwrap_or(0);
    // Only atomicity of the increment matters for uniqueness, so Relaxed suffices;
    // fetch_add wraps on overflow.
    let sequence = SEQUENCE.fetch_add(1, Ordering::Relaxed);
    format!("{:032x}", (nanos << 32) | u128::from(sequence))
}
618
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_telemetry_creation() {
        let mut telemetry = QueryTelemetry::new("test");

        telemetry.record_routing(Duration::from_micros(500), 100, 16);
        telemetry.record_scan(10000, 16 * 1024 * 1024);
        telemetry.record_rerank(Duration::from_micros(1000), 100, 10, 0, 0);

        telemetry.finalize();

        // The elapsed time may legitimately be 0 µs on fast machines, so check
        // that finalize consumed the start instant rather than the value itself.
        assert!(telemetry.start_time.is_none());
        assert_eq!(telemetry.routing.duration_us, 500);
        assert_eq!(telemetry.routing.lists_considered, 100);
        assert_eq!(telemetry.routing.lists_scanned, 16);
        assert_eq!(telemetry.scan.codes_evaluated, 10000);
        assert_eq!(telemetry.scan.ram_bytes_read, 16 * 1024 * 1024);
        assert_eq!(telemetry.rerank.duration_us, 1000);
        assert_eq!(telemetry.rerank.candidates_out, 10);
    }

    #[test]
    fn test_finalize_is_one_shot() {
        let mut telemetry = QueryTelemetry::new("test");
        telemetry.finalize();
        let first = telemetry.total_duration_us;
        telemetry.finalize();
        assert_eq!(telemetry.total_duration_us, first);
    }

    #[test]
    fn test_telemetry_json() {
        let mut telemetry = QueryTelemetry::new("balanced");
        telemetry.record_routing(Duration::from_micros(100), 50, 8);
        telemetry.finalize();

        let json = telemetry.to_json();
        assert!(json.contains("\"query_class\":\"balanced\""));
        assert!(json.contains("lists_considered"));
    }

    #[test]
    fn test_collector() {
        let collector = TelemetryCollector::new(100);

        // `record` finalizes each entry (overwriting total_duration_us), so
        // ordering is checked through query IDs instead.
        for i in 0..10 {
            collector.record(QueryTelemetry::with_id(&format!("q{}", i), "test"));
        }

        let recent = collector.recent(5);
        let ids: Vec<&str> = recent.iter().map(|t| t.query_id.as_str()).collect();
        assert_eq!(ids, ["q5", "q6", "q7", "q8", "q9"]);

        let agg = collector.aggregate();
        assert_eq!(agg.query_count, 10);
        assert!(agg.p50_duration_us <= agg.p99_duration_us);
        assert!(agg.p99_duration_us <= agg.max_duration_us);
    }

    #[test]
    fn test_collector_evicts_oldest_and_clears() {
        let collector = TelemetryCollector::new(3);
        for i in 0..5 {
            collector.record(QueryTelemetry::with_id(&format!("q{}", i), "test"));
        }

        let ids: Vec<String> = collector
            .recent(10)
            .into_iter()
            .map(|t| t.query_id)
            .collect();
        assert_eq!(ids, ["q2", "q3", "q4"]);

        collector.clear();
        assert!(collector.recent(10).is_empty());
        assert_eq!(collector.aggregate().query_count, 0);
    }

    #[test]
    fn test_emit_callback_sees_every_record() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::sync::Arc;

        let seen = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&seen);
        let collector = TelemetryCollector::new(10);
        collector.set_emit_callback(move |_| {
            counter.fetch_add(1, Ordering::SeqCst);
        });

        collector.record(QueryTelemetry::new("a"));
        collector.record(QueryTelemetry::new("b"));
        assert_eq!(seen.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn test_cache_hit_ratio() {
        let mut cache = CacheMetrics::default();
        assert_eq!(cache.hit_ratio(), 1.0);

        cache.centroid_cache_hits = 80;
        cache.centroid_cache_misses = 20;
        assert!((cache.hit_ratio() - 0.8).abs() < 0.01);

        // Misses in another cache lower the pooled ratio: 80 / 200.
        cache.vector_cache_misses = 100;
        assert!((cache.hit_ratio() - 0.4).abs() < 0.01);
    }
}