sochdb-vector 2.0.6

Streaming elimination vector search engine for SochDB - CPU-first ANN with RDF + BPS
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
// SPDX-License-Identifier: AGPL-3.0-or-later
// SochDB - LLM-Optimized Embedded Database
// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! # Per-Query Telemetry (Task 10)
//!
//! Structured telemetry for every query to enable:
//! - Optimization: falsifiable hypotheses about performance
//! - Regression detection: automatic SLA monitoring
//! - Explainability: "why was this query slow?"
//!
//! ## Metrics Captured
//!
//! - Routing: time, lists considered/scanned
//! - Scan: codes evaluated, RAM bytes read
//! - Rerank: candidates, SSD ops/bytes
//! - Cache: hit ratio
//! - Error: estimated ε envelope used
//! - Stop: termination mode and reason
//!
//! ## Usage
//!
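//! ```rust,ignore
//! use std::time::Duration;
//!
//! use sochdb_vector::guarantee_ladder::StopReason;
//! use sochdb_vector::query_telemetry::{QueryTelemetry, TelemetryCollector};
//!
//! let mut telemetry = QueryTelemetry::new("search_v1");
//! telemetry.record_routing(Duration::from_micros(500), 100, 16);
//! telemetry.record_scan(1024, 16 * 1024 * 1024);
//! // Stop reason, probes completed at stop, and max probes allowed.
//! telemetry.set_stop_reason(StopReason::BoundSatisfied, 16, 32);
//!
//! // Compute the total duration, then emit structured telemetry.
//! telemetry.finalize();
//! let json = telemetry.to_json();
//!
//! // Or hand the record to a collector, which finalizes and retains it.
//! let collector = TelemetryCollector::default();
//! collector.record(telemetry);
//! ```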

use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{Duration, Instant};

use crate::cost_model::CostSummary;
use crate::guarantee_ladder::{GuaranteeMode, StopReason};

// ============================================================================
// Query Telemetry
// ============================================================================

/// Comprehensive per-query telemetry record.
///
/// Created once per query with [`QueryTelemetry::new`] (or
/// [`QueryTelemetry::with_id`]), filled in phase by phase through the
/// `record_*` / `set_*` methods, and closed with [`QueryTelemetry::finalize`].
/// The record serializes with serde; `start_time` is skipped
/// (`#[serde(skip)]`), so it is `None` after deserialization.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryTelemetry {
    /// Query identifier (for correlation); generated in `new`, or supplied
    /// by the caller via `with_id`
    pub query_id: String,

    /// Query class (e.g., "low_latency", "high_recall")
    pub query_class: String,

    /// Instant captured when the telemetry was created; consumed (set to
    /// `None`) by `finalize`
    #[serde(skip)]
    pub start_time: Option<Instant>,

    /// Total query duration in microseconds; 0 until `finalize` computes it
    pub total_duration_us: u64,

    /// Routing phase metrics
    pub routing: RoutingMetrics,

    /// Scan phase metrics
    pub scan: ScanMetrics,

    /// Rerank phase metrics
    pub rerank: RerankMetrics,

    /// Cache hit/miss counters
    pub cache: CacheMetrics,

    /// Error envelope metrics
    pub error_envelope: ErrorEnvelopeMetrics,

    /// Termination metrics
    pub termination: TerminationMetrics,

    /// Cost summary; `None` unless `attach_cost` was called (if budget
    /// tracking enabled)
    pub cost: Option<CostSummaryJson>,

    /// Custom key/value tags for filtering/grouping (see `add_tag`)
    pub tags: HashMap<String, String>,
}

/// Routing phase metrics: how many lists/partitions were considered and
/// scanned, and how long routing took.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RoutingMetrics {
    /// Time spent in the routing phase, in microseconds
    pub duration_us: u64,

    /// Total lists/partitions considered
    pub lists_considered: u32,

    /// Lists actually scanned
    pub lists_scanned: u32,

    /// Centroid comparisons performed (populated by
    /// `QueryTelemetry::record_routing_full`)
    pub centroid_comparisons: u32,

    /// Whether routing used compressed centroids (populated by
    /// `QueryTelemetry::record_routing_full`)
    pub used_compressed_centroids: bool,

    /// Free-form label of the routing strategy used; empty unless set via
    /// `QueryTelemetry::record_routing_full`
    pub strategy: String,
}

/// Scan phase metrics: how much data was evaluated and how long it took.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ScanMetrics {
    /// Time spent in the scan phase, in microseconds. Populated by
    /// `QueryTelemetry::record_scan_full`; `record_scan` leaves it untouched.
    pub duration_us: u64,

    /// Number of codes/vectors evaluated
    pub codes_evaluated: u64,

    /// RAM bytes read
    pub ram_bytes_read: u64,

    /// Number of SIMD operations
    pub simd_ops: u64,

    /// Vectors passing first-stage filter
    pub candidates_after_stage1: u32,

    /// Free-form name of the distance metric used (empty unless set)
    pub distance_metric: String,

    /// Free-form name of the quantization level used (empty unless set)
    pub quant_level: String,
}

/// Rerank phase metrics: candidate counts and SSD I/O spent refining them.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RerankMetrics {
    /// Time spent in the rerank phase, in microseconds
    pub duration_us: u64,

    /// Candidates entering rerank
    pub candidates_in: u32,

    /// Candidates after rerank
    pub candidates_out: u32,

    /// SSD random read operations
    pub ssd_random_reads: u32,

    /// SSD sequential bytes read
    pub ssd_sequential_bytes: u64,

    /// Whether IO was coalesced (set by `QueryTelemetry::record_io_coalescing`)
    pub io_coalesced: bool,

    /// Number of IO ranges after coalescing (set by
    /// `QueryTelemetry::record_io_coalescing`)
    pub coalesced_ranges: u32,

    /// Full-precision distance computations. No `QueryTelemetry` recorder
    /// method in this module writes it; assign the field directly.
    pub full_precision_distances: u32,
}

/// Per-query cache hit/miss counters, incremented through
/// `QueryTelemetry::record_cache_hit` / `QueryTelemetry::record_cache_miss`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CacheMetrics {
    /// Centroid cache hits
    pub centroid_cache_hits: u32,

    /// Centroid cache misses
    pub centroid_cache_misses: u32,

    /// Vector cache hits
    pub vector_cache_hits: u32,

    /// Vector cache misses
    pub vector_cache_misses: u32,

    /// Distance cache hits
    pub distance_cache_hits: u32,

    /// Distance cache misses
    pub distance_cache_misses: u32,
}

impl CacheMetrics {
    /// Compute the overall cache hit ratio across the centroid, vector and
    /// distance caches.
    ///
    /// Returns `hits / (hits + misses)`, a value in `[0.0, 1.0]`. When no
    /// lookups were recorded at all the ratio is defined as `1.0`.
    ///
    /// The six `u32` counters are summed in `u64`. Summing them as `u32` could
    /// overflow for large counts, which panics in debug builds and silently
    /// wraps in release builds.
    pub fn hit_ratio(&self) -> f32 {
        let total_hits = u64::from(self.centroid_cache_hits)
            + u64::from(self.vector_cache_hits)
            + u64::from(self.distance_cache_hits);
        let total_misses = u64::from(self.centroid_cache_misses)
            + u64::from(self.vector_cache_misses)
            + u64::from(self.distance_cache_misses);
        let total = total_hits + total_misses;
        if total == 0 {
            1.0
        } else {
            total_hits as f32 / total as f32
        }
    }
}

/// Error envelope metrics: which guarantee mode was used and the error
/// bounds observed.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ErrorEnvelopeMetrics {
    /// `Debug` rendering of the `GuaranteeMode` (set by
    /// `QueryTelemetry::set_guarantee_mode`)
    pub guarantee_mode: String,

    /// Error quantile reported by `GuaranteeMode::error_quantile` (for
    /// calibrated mode)
    pub error_quantile: Option<f32>,

    /// Maximum error bound observed (set by `QueryTelemetry::record_error_bounds`)
    pub max_error_observed: f32,

    /// Mean error bound (set by `QueryTelemetry::record_error_bounds`)
    pub mean_error: f32,

    /// Number of candidates with tight bounds
    pub tight_bound_candidates: u32,

    /// Number of candidates with loose bounds
    pub loose_bound_candidates: u32,
}

/// Termination metrics: why and when the search stopped.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TerminationMetrics {
    /// `Debug` rendering of the `StopReason` (set by
    /// `QueryTelemetry::set_stop_reason`)
    pub stop_reason: String,

    /// Probes completed when stopped
    pub probes_at_stop: u32,

    /// Max probes allowed
    pub max_probes: u32,

    /// `true` when the recorded stop reason was `StopReason::BudgetExhausted`
    pub budget_exhausted: bool,

    /// Estimated miss probability (for calibrated mode); set by
    /// `QueryTelemetry::set_miss_probability`
    pub miss_probability: Option<f32>,

    /// Final result count (set by `QueryTelemetry::set_result_count`)
    pub result_count: u32,
}

/// JSON-serializable mirror of `CostSummary`.
///
/// Built via `From<CostSummary>`: counters are copied as-is, `Duration`s are
/// flattened to whole microseconds, and the exhaustion reason becomes its
/// `Debug` string.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CostSummaryJson {
    /// Copied from `CostSummary::query_class`
    pub query_class: String,
    /// RAM bytes consumed
    pub ram_bytes_used: u64,
    /// RAM byte limit
    pub ram_bytes_limit: u64,
    /// SSD random reads consumed
    pub ssd_random_reads_used: u32,
    /// SSD random read limit
    pub ssd_random_reads_limit: u32,
    /// SSD sequential bytes consumed
    pub ssd_sequential_bytes_used: u64,
    /// SSD sequential byte limit
    pub ssd_sequential_bytes_limit: u64,
    /// CPU cycles consumed
    pub cpu_cycles_used: u64,
    /// CPU cycle limit
    pub cpu_cycles_limit: u64,
    /// `CostSummary::elapsed` in microseconds
    pub elapsed_us: u64,
    /// `CostSummary::latency_target` in microseconds
    pub latency_target_us: u64,
    /// Copied from `CostSummary::exhausted`
    pub exhausted: bool,
    /// `Debug` rendering of `CostSummary::exhaustion_reason`, if any
    pub exhaustion_reason: Option<String>,
}

impl From<CostSummary> for CostSummaryJson {
    fn from(summary: CostSummary) -> Self {
        Self {
            query_class: summary.query_class,
            ram_bytes_used: summary.ram_bytes_used,
            ram_bytes_limit: summary.ram_bytes_limit,
            ssd_random_reads_used: summary.ssd_random_reads_used,
            ssd_random_reads_limit: summary.ssd_random_reads_limit,
            ssd_sequential_bytes_used: summary.ssd_sequential_bytes_used,
            ssd_sequential_bytes_limit: summary.ssd_sequential_bytes_limit,
            cpu_cycles_used: summary.cpu_cycles_used,
            cpu_cycles_limit: summary.cpu_cycles_limit,
            elapsed_us: summary.elapsed.as_micros() as u64,
            latency_target_us: summary.latency_target.as_micros() as u64,
            exhausted: summary.exhausted,
            exhaustion_reason: summary.exhaustion_reason.map(|r| format!("{:?}", r)),
        }
    }
}

impl QueryTelemetry {
    /// Create new telemetry for a query of the given class.
    ///
    /// A fresh `query_id` is generated and the start instant is captured now,
    /// so [`QueryTelemetry::finalize`] measures elapsed time from this call.
    pub fn new(query_class: &str) -> Self {
        Self {
            query_id: uuid_v4(),
            query_class: query_class.to_string(),
            start_time: Some(Instant::now()),
            total_duration_us: 0,
            routing: RoutingMetrics::default(),
            scan: ScanMetrics::default(),
            rerank: RerankMetrics::default(),
            cache: CacheMetrics::default(),
            error_envelope: ErrorEnvelopeMetrics::default(),
            termination: TerminationMetrics::default(),
            cost: None,
            tags: HashMap::new(),
        }
    }

    /// Create telemetry with a caller-supplied query ID instead of a
    /// generated one (e.g. to correlate with an upstream request).
    pub fn with_id(query_id: &str, query_class: &str) -> Self {
        let mut t = Self::new(query_class);
        t.query_id = query_id.to_string();
        t
    }

    /// Record the routing phase: duration and list counts.
    ///
    /// Only these three fields are written; use
    /// [`QueryTelemetry::record_routing_full`] to fill in the rest.
    pub fn record_routing(
        &mut self,
        duration: Duration,
        lists_considered: u32,
        lists_scanned: u32,
    ) {
        self.routing.duration_us = Self::duration_micros(duration);
        self.routing.lists_considered = lists_considered;
        self.routing.lists_scanned = lists_scanned;
    }

    /// Record every routing field at once.
    pub fn record_routing_full(
        &mut self,
        duration: Duration,
        lists_considered: u32,
        lists_scanned: u32,
        centroid_comparisons: u32,
        used_compressed: bool,
        strategy: &str,
    ) {
        self.routing.duration_us = Self::duration_micros(duration);
        self.routing.lists_considered = lists_considered;
        self.routing.lists_scanned = lists_scanned;
        self.routing.centroid_comparisons = centroid_comparisons;
        self.routing.used_compressed_centroids = used_compressed;
        self.routing.strategy = strategy.to_string();
    }

    /// Record the scan phase volume (codes evaluated and RAM bytes read).
    ///
    /// The scan duration is not touched; use
    /// [`QueryTelemetry::record_scan_full`] to record it.
    pub fn record_scan(&mut self, codes_evaluated: u64, ram_bytes: u64) {
        self.scan.codes_evaluated = codes_evaluated;
        self.scan.ram_bytes_read = ram_bytes;
    }

    /// Record every scan field at once.
    pub fn record_scan_full(
        &mut self,
        duration: Duration,
        codes_evaluated: u64,
        ram_bytes: u64,
        simd_ops: u64,
        candidates_stage1: u32,
        distance_metric: &str,
        quant_level: &str,
    ) {
        self.scan.duration_us = Self::duration_micros(duration);
        self.scan.codes_evaluated = codes_evaluated;
        self.scan.ram_bytes_read = ram_bytes;
        self.scan.simd_ops = simd_ops;
        self.scan.candidates_after_stage1 = candidates_stage1;
        self.scan.distance_metric = distance_metric.to_string();
        self.scan.quant_level = quant_level.to_string();
    }

    /// Record the rerank phase: duration, candidate counts and SSD I/O.
    pub fn record_rerank(
        &mut self,
        duration: Duration,
        candidates_in: u32,
        candidates_out: u32,
        ssd_random_reads: u32,
        ssd_sequential_bytes: u64,
    ) {
        self.rerank.duration_us = Self::duration_micros(duration);
        self.rerank.candidates_in = candidates_in;
        self.rerank.candidates_out = candidates_out;
        self.rerank.ssd_random_reads = ssd_random_reads;
        self.rerank.ssd_sequential_bytes = ssd_sequential_bytes;
    }

    /// Record whether rerank I/O was coalesced and into how many ranges.
    pub fn record_io_coalescing(&mut self, coalesced: bool, ranges: u32) {
        self.rerank.io_coalesced = coalesced;
        self.rerank.coalesced_ranges = ranges;
    }

    /// Record one cache hit for the given cache.
    ///
    /// The counter saturates at `u32::MAX` instead of overflowing (a plain
    /// `+= 1` would panic in debug builds).
    pub fn record_cache_hit(&mut self, cache_type: CacheType) {
        let counter = match cache_type {
            CacheType::Centroid => &mut self.cache.centroid_cache_hits,
            CacheType::Vector => &mut self.cache.vector_cache_hits,
            CacheType::Distance => &mut self.cache.distance_cache_hits,
        };
        *counter = counter.saturating_add(1);
    }

    /// Record one cache miss for the given cache.
    ///
    /// The counter saturates at `u32::MAX` instead of overflowing.
    pub fn record_cache_miss(&mut self, cache_type: CacheType) {
        let counter = match cache_type {
            CacheType::Centroid => &mut self.cache.centroid_cache_misses,
            CacheType::Vector => &mut self.cache.vector_cache_misses,
            CacheType::Distance => &mut self.cache.distance_cache_misses,
        };
        *counter = counter.saturating_add(1);
    }

    /// Record the guarantee mode (as its `Debug` text) and its error quantile.
    pub fn set_guarantee_mode(&mut self, mode: &GuaranteeMode) {
        self.error_envelope.guarantee_mode = format!("{:?}", mode);
        self.error_envelope.error_quantile = mode.error_quantile();
    }

    /// Record the maximum and mean error bounds observed.
    pub fn record_error_bounds(&mut self, max_error: f32, mean_error: f32) {
        self.error_envelope.max_error_observed = max_error;
        self.error_envelope.mean_error = mean_error;
    }

    /// Record why the search stopped and how far it got.
    ///
    /// `budget_exhausted` is derived from `reason` (true only for
    /// `StopReason::BudgetExhausted`).
    pub fn set_stop_reason(&mut self, reason: StopReason, probes: u32, max_probes: u32) {
        self.termination.stop_reason = format!("{:?}", reason);
        self.termination.probes_at_stop = probes;
        self.termination.max_probes = max_probes;
        self.termination.budget_exhausted = matches!(reason, StopReason::BudgetExhausted);
    }

    /// Set the estimated miss probability.
    pub fn set_miss_probability(&mut self, prob: f32) {
        self.termination.miss_probability = Some(prob);
    }

    /// Set the final result count.
    pub fn set_result_count(&mut self, count: u32) {
        self.termination.result_count = count;
    }

    /// Attach a cost summary, converted to its serializable form.
    pub fn attach_cost(&mut self, summary: CostSummary) {
        self.cost = Some(summary.into());
    }

    /// Add (or overwrite) a custom tag.
    pub fn add_tag(&mut self, key: &str, value: &str) {
        self.tags.insert(key.to_string(), value.to_string());
    }

    /// Finalize telemetry by computing `total_duration_us` from `start_time`.
    ///
    /// `start_time` is consumed, so later calls are no-ops. If `start_time`
    /// is already `None` (already finalized, deserialized, or cleared by the
    /// caller), `total_duration_us` is left unchanged.
    pub fn finalize(&mut self) {
        if let Some(start) = self.start_time.take() {
            self.total_duration_us = Self::duration_micros(start.elapsed());
        }
    }

    /// Serialize to compact JSON; best-effort, returning `"{}"` on failure.
    pub fn to_json(&self) -> String {
        serde_json::to_string(self).unwrap_or_else(|_| "{}".to_string())
    }

    /// Serialize to pretty-printed JSON; best-effort, returning `"{}"` on failure.
    pub fn to_json_pretty(&self) -> String {
        serde_json::to_string_pretty(self).unwrap_or_else(|_| "{}".to_string())
    }

    /// Convert a `Duration` to whole microseconds, saturating at `u64::MAX`
    /// rather than silently truncating the `u128` from `Duration::as_micros`.
    fn duration_micros(duration: Duration) -> u64 {
        duration.as_micros().min(u128::from(u64::MAX)) as u64
    }
}

/// Cache selector for `QueryTelemetry::record_cache_hit` /
/// `QueryTelemetry::record_cache_miss`.
///
/// Implements `PartialEq`/`Eq`/`Hash` so callers can compare variants or use
/// them as map/set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CacheType {
    /// Counted in `CacheMetrics::centroid_cache_hits` / `centroid_cache_misses`
    Centroid,
    /// Counted in `CacheMetrics::vector_cache_hits` / `vector_cache_misses`
    Vector,
    /// Counted in `CacheMetrics::distance_cache_hits` / `distance_cache_misses`
    Distance,
}

// ============================================================================
// Telemetry Collector
// ============================================================================

/// Thread-safe, bounded telemetry collector with aggregation.
///
/// Each recorded [`QueryTelemetry`] is finalized, passed to the optional emit
/// callback, and then retained in memory. Once `max_entries` records are held,
/// the oldest one is evicted for every new record. A `max_entries` of 0 keeps
/// nothing in memory; the callback still fires.
pub struct TelemetryCollector {
    /// Retained telemetry entries, oldest at the front.
    ///
    /// A `VecDeque` makes evicting the oldest entry O(1) instead of shifting
    /// every retained entry on each insert.
    entries: parking_lot::RwLock<std::collections::VecDeque<QueryTelemetry>>,

    /// Maximum entries to keep in memory (0 disables retention)
    max_entries: usize,

    /// Optional callback invoked with every recorded entry
    emit_callback: parking_lot::RwLock<Option<Box<dyn Fn(&QueryTelemetry) + Send + Sync>>>,
}

impl TelemetryCollector {
    /// Upper bound on the capacity reserved up front by `new`; larger limits
    /// grow the buffer on demand instead.
    const INITIAL_CAPACITY_LIMIT: usize = 1024;

    /// Create a collector that retains at most `max_entries` records.
    pub fn new(max_entries: usize) -> Self {
        // Cap the up-front reservation: reserving `max_entries` slots would
        // panic for very large limits (e.g. `usize::MAX` meaning "unbounded")
        // and eagerly allocates memory that may never be used.
        let initial_capacity = max_entries.min(Self::INITIAL_CAPACITY_LIMIT);
        Self {
            entries: parking_lot::RwLock::new(std::collections::VecDeque::with_capacity(
                initial_capacity,
            )),
            max_entries,
            emit_callback: parking_lot::RwLock::new(None),
        }
    }

    /// Install the callback used to emit telemetry, replacing any previous one.
    pub fn set_emit_callback<F>(&self, callback: F)
    where
        F: Fn(&QueryTelemetry) + Send + Sync + 'static,
    {
        *self.emit_callback.write() = Some(Box::new(callback));
    }

    /// Finalize `telemetry`, emit it via the callback (if any), and retain it.
    ///
    /// NOTE: the callback runs while the callback read-lock is held, so it
    /// must not call [`TelemetryCollector::set_emit_callback`] on the same
    /// collector (that would deadlock).
    pub fn record(&self, mut telemetry: QueryTelemetry) {
        telemetry.finalize();

        // Emit via callback
        if let Some(callback) = &*self.emit_callback.read() {
            callback(&telemetry);
        }

        // With a zero limit nothing is retained. Without this guard the
        // eviction loop below would never terminate, because `len >= 0`
        // always holds.
        if self.max_entries == 0 {
            return;
        }

        // Store in memory, evicting the oldest entries to stay within the limit.
        let mut entries = self.entries.write();
        while entries.len() >= self.max_entries {
            entries.pop_front();
        }
        entries.push_back(telemetry);
    }

    /// Return clones of up to `count` of the most recent entries, oldest first.
    pub fn recent(&self, count: usize) -> Vec<QueryTelemetry> {
        let entries = self.entries.read();
        let skip = entries.len().saturating_sub(count);
        entries.iter().skip(skip).cloned().collect()
    }

    /// Compute aggregate statistics over all retained entries.
    ///
    /// Returns `TelemetryAggregate::default()` when nothing is retained.
    /// Percentiles index into the sorted durations: p50 at `n / 2`, p99 at
    /// `(n * 99) / 100`. The cache hit ratio is the unweighted mean of the
    /// per-entry ratios.
    pub fn aggregate(&self) -> TelemetryAggregate {
        let entries = self.entries.read();

        if entries.is_empty() {
            return TelemetryAggregate::default();
        }

        let n = entries.len();
        let mut durations: Vec<u64> = entries.iter().map(|e| e.total_duration_us).collect();
        durations.sort_unstable();

        let total_duration: u64 = durations.iter().sum();
        let p50 = durations[n / 2];
        let p99 = durations[(n * 99) / 100];
        let max = durations[n - 1];

        let total_ram_bytes: u64 = entries.iter().map(|e| e.scan.ram_bytes_read).sum();
        let total_codes: u64 = entries.iter().map(|e| e.scan.codes_evaluated).sum();

        let budget_exhausted = entries
            .iter()
            .filter(|e| e.termination.budget_exhausted)
            .count();

        TelemetryAggregate {
            query_count: n,
            mean_duration_us: total_duration / n as u64,
            p50_duration_us: p50,
            p99_duration_us: p99,
            max_duration_us: max,
            total_ram_bytes_read: total_ram_bytes,
            total_codes_evaluated: total_codes,
            budget_exhausted_count: budget_exhausted,
            cache_hit_ratio: entries.iter().map(|e| e.cache.hit_ratio()).sum::<f32>() / n as f32,
        }
    }

    /// Drop all retained entries (the emit callback is kept).
    pub fn clear(&self) {
        self.entries.write().clear();
    }
}

impl Default for TelemetryCollector {
    /// Build a collector that retains the 10 000 most recent queries.
    fn default() -> Self {
        const DEFAULT_MAX_ENTRIES: usize = 10_000;
        Self::new(DEFAULT_MAX_ENTRIES)
    }
}

/// Aggregate statistics over the entries currently retained by a
/// [`TelemetryCollector`], as produced by `TelemetryCollector::aggregate`.
///
/// All fields are zero when the collector holds no entries.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TelemetryAggregate {
    /// Number of entries aggregated
    pub query_count: usize,
    /// Mean `total_duration_us` (integer division, rounds down)
    pub mean_duration_us: u64,
    /// Median duration: element `n / 2` of the sorted durations
    pub p50_duration_us: u64,
    /// 99th-percentile duration: element `(n * 99) / 100` of the sorted durations
    pub p99_duration_us: u64,
    /// Largest `total_duration_us` among the entries
    pub max_duration_us: u64,
    /// Sum of `scan.ram_bytes_read` over all entries
    pub total_ram_bytes_read: u64,
    /// Sum of `scan.codes_evaluated` over all entries
    pub total_codes_evaluated: u64,
    /// Number of entries whose `termination.budget_exhausted` is true
    pub budget_exhausted_count: usize,
    /// Unweighted mean of each entry's `CacheMetrics::hit_ratio`
    pub cache_hit_ratio: f32,
}

// ============================================================================
// Helpers
// ============================================================================

/// Generate a process-unique, UUID-sized (32 lowercase hex digits) identifier.
///
/// Despite the name this is *not* an RFC 4122 version-4 UUID and is not random.
/// - The first 16 hex digits are the wall-clock time in nanoseconds since the
///   Unix epoch, truncated to 64 bits.
/// - The last 16 are a per-process sequence number.
///
/// Two calls in the same process therefore never return the same id, even if
/// the clock does not advance between them (coarse clocks) or steps backwards.
/// A clock set before the epoch yields a timestamp of 0 instead of panicking.
fn uuid_v4() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static SEQUENCE: AtomicU64 = AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos() as u64)
        .unwrap_or(0);
    // Only uniqueness is required. `fetch_add` is atomic under any ordering,
    // so `Relaxed` suffices.
    let seq = SEQUENCE.fetch_add(1, Ordering::Relaxed);
    format!("{:016x}{:016x}", nanos, seq)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_telemetry_creation() {
        let mut telemetry = QueryTelemetry::new("test");

        telemetry.record_routing(Duration::from_micros(500), 100, 16);
        telemetry.record_scan(10000, 16 * 1024 * 1024);
        telemetry.record_rerank(Duration::from_micros(1000), 100, 10, 0, 0);

        telemetry.finalize();

        // The elapsed time may legitimately be 0 on fast machines, so only
        // check that `finalize` consumed the start instant.
        assert!(telemetry.start_time.is_none());
        assert_eq!(telemetry.routing.duration_us, 500);
        assert_eq!(telemetry.routing.lists_considered, 100);
        assert_eq!(telemetry.routing.lists_scanned, 16);
        assert_eq!(telemetry.scan.codes_evaluated, 10000);
        assert_eq!(telemetry.scan.ram_bytes_read, 16 * 1024 * 1024);
        assert_eq!(telemetry.rerank.duration_us, 1000);
        assert_eq!(telemetry.rerank.candidates_in, 100);
        assert_eq!(telemetry.rerank.candidates_out, 10);
    }

    #[test]
    fn test_telemetry_json() {
        let mut telemetry = QueryTelemetry::new("balanced");
        telemetry.record_routing(Duration::from_micros(100), 50, 8);
        telemetry.finalize();

        let json = telemetry.to_json();
        assert!(json.contains("balanced"));
        assert!(json.contains("lists_considered"));

        // Round trip: `start_time` is skipped by serde and comes back as None.
        let back: QueryTelemetry = serde_json::from_str(&json).expect("valid telemetry JSON");
        assert_eq!(back.query_class, "balanced");
        assert_eq!(back.routing.lists_scanned, 8);
        assert!(back.start_time.is_none());
    }

    #[test]
    fn test_stop_reason_budget_flag() {
        let mut telemetry = QueryTelemetry::new("test");
        telemetry.set_stop_reason(StopReason::BudgetExhausted, 3, 8);

        assert!(telemetry.termination.budget_exhausted);
        assert_eq!(telemetry.termination.probes_at_stop, 3);
        assert_eq!(telemetry.termination.max_probes, 8);
    }

    #[test]
    fn test_collector() {
        let collector = TelemetryCollector::new(100);

        for i in 0..10 {
            let mut t = QueryTelemetry::new("test");
            // Clear the start instant so `record`'s call to `finalize` keeps
            // the synthetic duration instead of overwriting it.
            t.start_time = None;
            t.total_duration_us = i * 100;
            collector.record(t);
        }

        let recent = collector.recent(5);
        assert_eq!(recent.len(), 5);
        assert_eq!(recent[0].total_duration_us, 500);
        assert_eq!(recent[4].total_duration_us, 900);

        let agg = collector.aggregate();
        assert_eq!(agg.query_count, 10);
        assert_eq!(agg.mean_duration_us, 450);
        assert_eq!(agg.p50_duration_us, 500);
        assert_eq!(agg.max_duration_us, 900);
    }

    #[test]
    fn test_collector_evicts_oldest() {
        let collector = TelemetryCollector::new(3);
        for i in 0..5 {
            collector.record(QueryTelemetry::with_id(&format!("q{}", i), "test"));
        }

        let ids: Vec<String> = collector
            .recent(10)
            .into_iter()
            .map(|t| t.query_id)
            .collect();
        assert_eq!(ids, vec!["q2", "q3", "q4"]);
    }

    #[test]
    fn test_empty_aggregate() {
        let agg = TelemetryCollector::new(4).aggregate();
        assert_eq!(agg.query_count, 0);
        assert_eq!(agg.max_duration_us, 0);
    }

    #[test]
    fn test_cache_hit_ratio() {
        let mut cache = CacheMetrics::default();
        assert_eq!(cache.hit_ratio(), 1.0);

        cache.centroid_cache_hits = 80;
        cache.centroid_cache_misses = 20;

        assert!((cache.hit_ratio() - 0.8).abs() < 0.01);
    }
}