// ipfrs_semantic/stats.rs

1//! Index statistics and monitoring
2//!
3//! This module provides comprehensive statistics collection and monitoring
4//! for vector indexes, enabling performance analysis and optimization.
5
6use serde::{Deserialize, Serialize};
7use std::collections::VecDeque;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Arc, RwLock};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12/// Index statistics collector
13#[derive(Default)]
14pub struct IndexStats {
15    /// Total number of inserts
16    insert_count: AtomicU64,
17    /// Total number of deletes
18    delete_count: AtomicU64,
19    /// Total number of searches
20    search_count: AtomicU64,
21    /// Search latency histogram
22    search_latencies: Arc<RwLock<LatencyHistogram>>,
23    /// Insert latency histogram
24    insert_latencies: Arc<RwLock<LatencyHistogram>>,
25    /// Cache hit count
26    cache_hits: AtomicU64,
27    /// Cache miss count
28    cache_misses: AtomicU64,
29    /// Timestamp when stats started collecting
30    start_time: u64,
31    /// Recent query log for analysis
32    recent_queries: Arc<RwLock<VecDeque<QueryRecord>>>,
33    /// Maximum recent queries to keep
34    max_recent_queries: usize,
35}
36
37impl IndexStats {
38    /// Create a new stats collector
39    pub fn new() -> Self {
40        Self {
41            insert_count: AtomicU64::new(0),
42            delete_count: AtomicU64::new(0),
43            search_count: AtomicU64::new(0),
44            search_latencies: Arc::new(RwLock::new(LatencyHistogram::new())),
45            insert_latencies: Arc::new(RwLock::new(LatencyHistogram::new())),
46            cache_hits: AtomicU64::new(0),
47            cache_misses: AtomicU64::new(0),
48            start_time: SystemTime::now()
49                .duration_since(UNIX_EPOCH)
50                .unwrap_or_default()
51                .as_secs(),
52            recent_queries: Arc::new(RwLock::new(VecDeque::new())),
53            max_recent_queries: 1000,
54        }
55    }
56
57    /// Record an insert operation
58    pub fn record_insert(&self, duration: Duration) {
59        self.insert_count.fetch_add(1, Ordering::Relaxed);
60        self.insert_latencies
61            .write()
62            .unwrap()
63            .record(duration.as_micros() as u64);
64    }
65
66    /// Record a delete operation
67    pub fn record_delete(&self) {
68        self.delete_count.fetch_add(1, Ordering::Relaxed);
69    }
70
71    /// Record a search operation
72    pub fn record_search(&self, duration: Duration, k: usize, result_count: usize) {
73        self.search_count.fetch_add(1, Ordering::Relaxed);
74        self.search_latencies
75            .write()
76            .unwrap()
77            .record(duration.as_micros() as u64);
78
79        // Record query details
80        let mut queries = self.recent_queries.write().unwrap();
81        if queries.len() >= self.max_recent_queries {
82            queries.pop_front();
83        }
84        queries.push_back(QueryRecord {
85            timestamp: SystemTime::now()
86                .duration_since(UNIX_EPOCH)
87                .unwrap_or_default()
88                .as_secs(),
89            latency_us: duration.as_micros() as u64,
90            k,
91            result_count,
92        });
93    }
94
95    /// Record a cache hit
96    pub fn record_cache_hit(&self) {
97        self.cache_hits.fetch_add(1, Ordering::Relaxed);
98    }
99
100    /// Record a cache miss
101    pub fn record_cache_miss(&self) {
102        self.cache_misses.fetch_add(1, Ordering::Relaxed);
103    }
104
105    /// Get a snapshot of current statistics
106    pub fn snapshot(&self) -> StatsSnapshot {
107        let search_latencies = self.search_latencies.read().unwrap();
108        let insert_latencies = self.insert_latencies.read().unwrap();
109
110        let cache_hits = self.cache_hits.load(Ordering::Relaxed);
111        let cache_misses = self.cache_misses.load(Ordering::Relaxed);
112        let total_cache = cache_hits + cache_misses;
113
114        StatsSnapshot {
115            insert_count: self.insert_count.load(Ordering::Relaxed),
116            delete_count: self.delete_count.load(Ordering::Relaxed),
117            search_count: self.search_count.load(Ordering::Relaxed),
118            search_latency_p50: search_latencies.percentile(50),
119            search_latency_p90: search_latencies.percentile(90),
120            search_latency_p99: search_latencies.percentile(99),
121            search_latency_avg: search_latencies.average(),
122            insert_latency_avg: insert_latencies.average(),
123            cache_hit_rate: if total_cache > 0 {
124                cache_hits as f64 / total_cache as f64
125            } else {
126                0.0
127            },
128            uptime_seconds: SystemTime::now()
129                .duration_since(UNIX_EPOCH)
130                .unwrap_or_default()
131                .as_secs()
132                - self.start_time,
133        }
134    }
135
136    /// Reset all statistics
137    pub fn reset(&self) {
138        self.insert_count.store(0, Ordering::Relaxed);
139        self.delete_count.store(0, Ordering::Relaxed);
140        self.search_count.store(0, Ordering::Relaxed);
141        self.search_latencies.write().unwrap().reset();
142        self.insert_latencies.write().unwrap().reset();
143        self.cache_hits.store(0, Ordering::Relaxed);
144        self.cache_misses.store(0, Ordering::Relaxed);
145        self.recent_queries.write().unwrap().clear();
146    }
147
148    /// Get recent query records
149    pub fn recent_queries(&self) -> Vec<QueryRecord> {
150        self.recent_queries
151            .read()
152            .unwrap()
153            .iter()
154            .cloned()
155            .collect()
156    }
157
158    /// Calculate queries per second (QPS)
159    pub fn qps(&self) -> f64 {
160        let uptime = SystemTime::now()
161            .duration_since(UNIX_EPOCH)
162            .unwrap_or_default()
163            .as_secs()
164            - self.start_time;
165
166        if uptime > 0 {
167            self.search_count.load(Ordering::Relaxed) as f64 / uptime as f64
168        } else {
169            0.0
170        }
171    }
172}
173
/// Statistics snapshot at a point in time.
///
/// Produced by `IndexStats::snapshot`. All latency figures are in
/// microseconds; percentiles come from a bounded sample buffer, so they
/// are approximate for long-running collectors.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatsSnapshot {
    /// Total insert operations
    pub insert_count: u64,
    /// Total delete operations
    pub delete_count: u64,
    /// Total search operations
    pub search_count: u64,
    /// Search latency P50 / median (microseconds)
    pub search_latency_p50: u64,
    /// Search latency P90 (microseconds)
    pub search_latency_p90: u64,
    /// Search latency P99 (microseconds)
    pub search_latency_p99: u64,
    /// Average search latency (microseconds)
    pub search_latency_avg: u64,
    /// Average insert latency (microseconds)
    pub insert_latency_avg: u64,
    /// Cache hit rate (0.0 to 1.0; 0.0 when no cache accesses were recorded)
    pub cache_hit_rate: f64,
    /// Seconds since the stats collector was created
    pub uptime_seconds: u64,
}
198
199impl StatsSnapshot {
200    /// Format latency as human-readable string
201    pub fn format_latency(us: u64) -> String {
202        if us < 1000 {
203            format!("{}µs", us)
204        } else if us < 1_000_000 {
205            format!("{:.2}ms", us as f64 / 1000.0)
206        } else {
207            format!("{:.2}s", us as f64 / 1_000_000.0)
208        }
209    }
210
211    /// Get a summary string
212    pub fn summary(&self) -> String {
213        format!(
214            "Searches: {} (P50: {}, P99: {}), Inserts: {}, Cache: {:.1}%",
215            self.search_count,
216            Self::format_latency(self.search_latency_p50),
217            Self::format_latency(self.search_latency_p99),
218            self.insert_count,
219            self.cache_hit_rate * 100.0
220        )
221    }
222}
223
/// Record of a single search query, kept in the bounded recent-query log
/// for offline analysis.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryRecord {
    /// Unix timestamp (seconds) when the query was recorded
    pub timestamp: u64,
    /// Query latency in microseconds
    pub latency_us: u64,
    /// K parameter (number of results requested)
    pub k: usize,
    /// Actual result count returned (may be fewer than `k`)
    pub result_count: usize,
}
236
/// Latency histogram for percentile calculations.
///
/// Stores individual samples (microseconds) in a sorted vector so exact
/// percentiles can be read by index. The sample buffer is bounded by
/// uniform downsampling; `sum` and `count` track every sample ever
/// recorded, so [`LatencyHistogram::average`] reflects the full history.
#[derive(Default)]
pub struct LatencyHistogram {
    /// Sorted latencies (in microseconds); bounded, see `record`
    values: Vec<u64>,
    /// Sum of all recorded values, for average calculation
    sum: u64,
    /// Total number of values ever recorded
    count: u64,
}

impl LatencyHistogram {
    /// Create a new, empty histogram.
    pub fn new() -> Self {
        Self::default()
    }

    /// Record a latency value in microseconds.
    pub fn record(&mut self, value_us: u64) {
        // Insert at the sorted position so percentile lookup stays O(1).
        let pos = self.values.binary_search(&value_us).unwrap_or_else(|i| i);
        self.values.insert(pos, value_us);

        // Saturate rather than overflow; a u64 microsecond sum overflowing
        // is astronomically unlikely, but wrapping would corrupt averages.
        self.sum = self.sum.saturating_add(value_us);
        self.count += 1;

        // Bound memory by dropping every 10th element uniformly across the
        // sorted buffer, which preserves the distribution's shape. (Draining
        // the front would remove only the *smallest* samples — the array is
        // sorted by value, not by age — biasing every percentile upward.)
        if self.values.len() > 10_000 {
            let mut i = 0usize;
            self.values.retain(|_| {
                i += 1;
                i % 10 != 0
            });
        }
    }

    /// Get the p-th percentile (0-100) in microseconds.
    ///
    /// Returns 0 when no samples have been recorded; approximate once the
    /// buffer has been downsampled.
    pub fn percentile(&self, p: u8) -> u64 {
        if self.values.is_empty() {
            return 0;
        }

        let idx = ((p as usize) * self.values.len() / 100).min(self.values.len() - 1);
        self.values[idx]
    }

    /// Average over all samples ever recorded (integer microseconds,
    /// truncated). Returns 0 when empty.
    pub fn average(&self) -> u64 {
        if self.count == 0 {
            return 0;
        }
        self.sum / self.count
    }

    /// Reset the histogram to its empty state.
    pub fn reset(&mut self) {
        self.values.clear();
        self.sum = 0;
        self.count = 0;
    }

    /// Total number of samples ever recorded (including downsampled ones).
    pub fn count(&self) -> u64 {
        self.count
    }
}
300
/// Index health metrics.
///
/// Produced by `IndexHealth::analyze`; combines size and memory figures
/// with a multiplicative `health_score` and a list of detected issues.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexHealth {
    /// Index size (number of vectors)
    pub size: usize,
    /// Estimated memory usage (bytes) — a heuristic, not a measurement
    pub memory_bytes: usize,
    /// Vector dimension
    pub dimension: usize,
    /// Average connectivity (HNSW specific); `None` when not computed
    pub avg_connectivity: Option<f32>,
    /// Search recall estimate, if available; `None` when not computed
    pub recall_estimate: Option<f32>,
    /// Overall health score (0.0 to 1.0; 1.0 means no issues detected)
    pub health_score: f32,
    /// Issues detected during analysis
    pub issues: Vec<HealthIssue>,
}
319
/// A single issue detected by health analysis.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthIssue {
    /// Issue severity (0 = info, 1 = warning, 2 = error)
    pub severity: u8,
    /// Human-readable description of the issue
    pub message: String,
    /// Suggested action to resolve or mitigate the issue
    pub recommendation: String,
}
330
331impl IndexHealth {
332    /// Create health metrics for an index
333    pub fn analyze(size: usize, dimension: usize, stats: Option<&StatsSnapshot>) -> Self {
334        let mut issues = Vec::new();
335        let mut health_score = 1.0;
336
337        // Estimate memory usage (HNSW overhead ~= 4 * dimension * M bytes per vector)
338        let memory_bytes = size * dimension * 4 + size * dimension * 4 * 16;
339
340        // Check for potential issues
341        if size == 0 {
342            issues.push(HealthIssue {
343                severity: 0,
344                message: "Index is empty".to_string(),
345                recommendation: "Add vectors to enable semantic search".to_string(),
346            });
347            health_score *= 0.9;
348        }
349
350        if let Some(s) = stats {
351            // Check latency
352            if s.search_latency_p99 > 100_000 {
353                // > 100ms
354                issues.push(HealthIssue {
355                    severity: 2,
356                    message: format!(
357                        "High P99 search latency: {}",
358                        StatsSnapshot::format_latency(s.search_latency_p99)
359                    ),
360                    recommendation: "Consider reducing ef_search or optimizing index parameters"
361                        .to_string(),
362                });
363                health_score *= 0.7;
364            } else if s.search_latency_p99 > 10_000 {
365                // > 10ms
366                issues.push(HealthIssue {
367                    severity: 1,
368                    message: format!(
369                        "Elevated P99 search latency: {}",
370                        StatsSnapshot::format_latency(s.search_latency_p99)
371                    ),
372                    recommendation: "Monitor latency trends".to_string(),
373                });
374                health_score *= 0.9;
375            }
376
377            // Check cache hit rate
378            if s.cache_hit_rate < 0.5 && s.search_count > 100 {
379                issues.push(HealthIssue {
380                    severity: 1,
381                    message: format!("Low cache hit rate: {:.1}%", s.cache_hit_rate * 100.0),
382                    recommendation: "Consider increasing cache size".to_string(),
383                });
384                health_score *= 0.95;
385            }
386        }
387
388        // Check size for performance
389        if size > 1_000_000 {
390            issues.push(HealthIssue {
391                severity: 1,
392                message: format!("Large index size: {} vectors", size),
393                recommendation:
394                    "Consider using DiskANN or quantization for better memory efficiency"
395                        .to_string(),
396            });
397        }
398
399        Self {
400            size,
401            memory_bytes,
402            dimension,
403            avg_connectivity: None,
404            recall_estimate: None,
405            health_score,
406            issues,
407        }
408    }
409}
410
/// Performance timer for measuring operation latencies.
pub struct PerfTimer {
    /// Moment the timer was started.
    start: Instant,
}

impl PerfTimer {
    /// Start a new timer at the current instant.
    pub fn start() -> Self {
        let start = Instant::now();
        Self { start }
    }

    /// Time elapsed since the timer was started, without consuming it.
    pub fn elapsed(&self) -> Duration {
        self.start.elapsed()
    }

    /// Consume the timer and return the total elapsed time.
    pub fn stop(self) -> Duration {
        self.elapsed()
    }
}
434
/// Memory usage breakdown for an index, all figures in bytes.
///
/// Produced by `MemoryUsage::estimate`; values are heuristics, not
/// measurements.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryUsage {
    /// Raw f32 vector data (bytes)
    pub vectors_bytes: usize,
    /// Index graph/structure overhead (bytes)
    pub index_bytes: usize,
    /// Metadata memory (bytes)
    pub metadata_bytes: usize,
    /// Cache memory (bytes)
    pub cache_bytes: usize,
    /// Sum of all components above (bytes)
    pub total_bytes: usize,
}
449
450impl MemoryUsage {
451    /// Estimate memory usage
452    pub fn estimate(
453        num_vectors: usize,
454        dimension: usize,
455        metadata_count: usize,
456        cache_size: usize,
457    ) -> Self {
458        // Vector storage: num_vectors * dimension * 4 bytes (f32)
459        let vectors_bytes = num_vectors * dimension * 4;
460
461        // HNSW index overhead: approximately M * num_vectors * 4 * 2 bytes for graph
462        // Assuming M = 16
463        let index_bytes = 16 * num_vectors * 4 * 2;
464
465        // Metadata: rough estimate of 200 bytes per entry
466        let metadata_bytes = metadata_count * 200;
467
468        // Cache: cached vectors + overhead
469        let cache_bytes = cache_size * dimension * 4 * 2;
470
471        let total_bytes = vectors_bytes + index_bytes + metadata_bytes + cache_bytes;
472
473        Self {
474            vectors_bytes,
475            index_bytes,
476            metadata_bytes,
477            cache_bytes,
478            total_bytes,
479        }
480    }
481
482    /// Format as human-readable string
483    pub fn format_bytes(bytes: usize) -> String {
484        if bytes < 1024 {
485            format!("{} B", bytes)
486        } else if bytes < 1024 * 1024 {
487            format!("{:.2} KB", bytes as f64 / 1024.0)
488        } else if bytes < 1024 * 1024 * 1024 {
489            format!("{:.2} MB", bytes as f64 / (1024.0 * 1024.0))
490        } else {
491            format!("{:.2} GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0))
492        }
493    }
494
495    /// Get formatted summary
496    pub fn summary(&self) -> String {
497        format!(
498            "Total: {} (Vectors: {}, Index: {}, Metadata: {}, Cache: {})",
499            Self::format_bytes(self.total_bytes),
500            Self::format_bytes(self.vectors_bytes),
501            Self::format_bytes(self.index_bytes),
502            Self::format_bytes(self.metadata_bytes),
503            Self::format_bytes(self.cache_bytes),
504        )
505    }
506}
507
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_stats_recording() {
        let stats = IndexStats::new();

        // Two inserts, two searches, and a 50/50 cache split.
        for us in [100u64, 200] {
            stats.record_insert(Duration::from_micros(us));
        }
        stats.record_search(Duration::from_micros(50), 10, 10);
        stats.record_search(Duration::from_micros(150), 10, 8);
        stats.record_cache_hit();
        stats.record_cache_miss();

        let snap = stats.snapshot();
        assert_eq!(snap.insert_count, 2);
        assert_eq!(snap.search_count, 2);
        assert!(snap.cache_hit_rate > 0.4 && snap.cache_hit_rate < 0.6);
    }

    #[test]
    fn test_latency_histogram() {
        let mut hist = LatencyHistogram::new();
        for v in 1..=100u64 {
            hist.record(v);
        }

        assert_eq!(hist.count(), 100);
        // Index 50 of the sorted 1..=100 samples is 51; allow a small window.
        let p50 = hist.percentile(50);
        assert!((50..=52).contains(&p50), "P50 was {}", p50);
        assert!(hist.percentile(99) >= 99);
        // Mean of 1..=100 is 50.5; integer division truncates to 50.
        let avg = hist.average();
        assert!((50..=51).contains(&avg));
    }

    #[test]
    fn test_index_health() {
        let health = IndexHealth::analyze(1000, 768, None);

        assert!(health.health_score > 0.0);
        assert_eq!(health.size, 1000);
        assert_eq!(health.dimension, 768);
    }

    #[test]
    fn test_memory_usage() {
        let usage = MemoryUsage::estimate(10000, 768, 10000, 1000);

        // Should land in the MB range for this configuration.
        assert!(usage.total_bytes > 1024 * 1024);
        assert!(usage.vectors_bytes > 0);
    }

    #[test]
    fn test_perf_timer() {
        let timer = PerfTimer::start();
        std::thread::sleep(Duration::from_millis(10));
        assert!(timer.stop() >= Duration::from_millis(10));
    }
}