1use crate::hnsw::DistanceMetric;
7use parking_lot::RwLock;
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11
12#[derive(Debug, Clone)]
14pub struct QueryMetrics {
15 pub duration: Duration,
17 pub result_count: usize,
19 pub cache_hit: bool,
21 pub metric: DistanceMetric,
23 pub ef_search: usize,
25 pub k: usize,
27}
28
/// Aggregated statistics over a set of recorded queries.
///
/// Produced by `AnalyticsTracker::get_summary`.
#[derive(Debug, Clone)]
pub struct AnalyticsSummary {
    /// Number of queries included in this summary.
    pub total_queries: usize,

    /// Number of included queries that were flagged as cache hits.
    pub cache_hits: usize,

    /// `cache_hits / total_queries`, or `0.0` when no queries are included.
    pub cache_hit_rate: f32,

    /// Mean query duration.
    pub avg_duration: Duration,

    /// 50th-percentile query duration.
    pub p50_latency: Duration,

    /// 90th-percentile query duration.
    pub p90_latency: Duration,

    /// 99th-percentile query duration.
    pub p99_latency: Duration,

    /// `(k, occurrences)` pairs for the most frequently requested `k` values,
    /// ordered from most to least frequent.
    pub top_k_values: Vec<(usize, usize)>,

    /// Queries per second, as computed by `AnalyticsTracker::get_summary`.
    pub qps: f32,
}
51
/// A recurring query, identified by the hash of its embedding.
#[derive(Debug, Clone)]
pub struct DetectedPattern {
    /// Hash that identifies the query embedding.
    pub embedding_hash: u64,

    /// How many times a query with this hash has been recorded.
    pub frequency: usize,

    /// Mean duration of the queries recorded under this hash.
    pub avg_duration: Duration,
}
62
63pub struct AnalyticsTracker {
65 query_history: Arc<RwLock<Vec<(Instant, QueryMetrics)>>>,
67 query_patterns: Arc<RwLock<HashMap<u64, DetectedPattern>>>,
69 max_history_size: usize,
71 start_time: Instant,
73}
74
75impl AnalyticsTracker {
76 pub fn new(max_history_size: usize) -> Self {
78 Self {
79 query_history: Arc::new(RwLock::new(Vec::new())),
80 query_patterns: Arc::new(RwLock::new(HashMap::new())),
81 max_history_size,
82 start_time: Instant::now(),
83 }
84 }
85
86 pub fn with_defaults() -> Self {
88 Self::new(10000) }
90
91 pub fn record_query(&self, embedding: &[f32], metrics: QueryMetrics) {
93 let now = Instant::now();
94 let hash = Self::hash_embedding(embedding);
95
96 {
98 let mut history = self.query_history.write();
99 history.push((now, metrics.clone()));
100
101 if history.len() > self.max_history_size {
103 let remove_count = history.len() - self.max_history_size;
104 history.drain(0..remove_count);
105 }
106 }
107
108 {
110 let mut patterns = self.query_patterns.write();
111 patterns
112 .entry(hash)
113 .and_modify(|pattern| {
114 pattern.frequency += 1;
115 let total = pattern.avg_duration.as_nanos() as f64
117 * (pattern.frequency - 1) as f64
118 + metrics.duration.as_nanos() as f64;
119 pattern.avg_duration =
120 Duration::from_nanos((total / pattern.frequency as f64) as u64);
121 })
122 .or_insert(DetectedPattern {
123 embedding_hash: hash,
124 frequency: 1,
125 avg_duration: metrics.duration,
126 });
127 }
128 }
129
130 pub fn get_summary(&self, window: Option<Duration>) -> AnalyticsSummary {
132 let history = self.query_history.read();
133
134 let now = Instant::now();
136 let filtered: Vec<&QueryMetrics> = if let Some(duration) = window {
137 history
138 .iter()
139 .filter(|(timestamp, _)| now.duration_since(*timestamp) <= duration)
140 .map(|(_, metrics)| metrics)
141 .collect()
142 } else {
143 history.iter().map(|(_, metrics)| metrics).collect()
144 };
145
146 if filtered.is_empty() {
147 return AnalyticsSummary {
148 total_queries: 0,
149 cache_hits: 0,
150 cache_hit_rate: 0.0,
151 avg_duration: Duration::from_secs(0),
152 p50_latency: Duration::from_secs(0),
153 p90_latency: Duration::from_secs(0),
154 p99_latency: Duration::from_secs(0),
155 top_k_values: Vec::new(),
156 qps: 0.0,
157 };
158 }
159
160 let total_queries = filtered.len();
161 let cache_hits = filtered.iter().filter(|m| m.cache_hit).count();
162 let cache_hit_rate = cache_hits as f32 / total_queries as f32;
163
164 let total_duration: u128 = filtered.iter().map(|m| m.duration.as_nanos()).sum();
166 let avg_duration = Duration::from_nanos((total_duration / total_queries as u128) as u64);
167
168 let mut durations: Vec<Duration> = filtered.iter().map(|m| m.duration).collect();
170 durations.sort();
171
172 let p50_latency = durations[total_queries * 50 / 100];
173 let p90_latency = durations[total_queries * 90 / 100];
174 let p99_latency = durations[total_queries * 99 / 100];
175
176 let mut k_counts: HashMap<usize, usize> = HashMap::new();
178 for metrics in &filtered {
179 *k_counts.entry(metrics.k).or_insert(0) += 1;
180 }
181 let mut top_k_values: Vec<(usize, usize)> = k_counts.into_iter().collect();
182 top_k_values.sort_by(|a, b| b.1.cmp(&a.1)); top_k_values.truncate(5); let elapsed = self.start_time.elapsed().as_secs_f32();
187 let qps = if elapsed > 0.0 {
188 total_queries as f32 / elapsed
189 } else {
190 0.0
191 };
192
193 AnalyticsSummary {
194 total_queries,
195 cache_hits,
196 cache_hit_rate,
197 avg_duration,
198 p50_latency,
199 p90_latency,
200 p99_latency,
201 top_k_values,
202 qps,
203 }
204 }
205
206 pub fn get_top_patterns(&self, limit: usize) -> Vec<DetectedPattern> {
208 let patterns = self.query_patterns.read();
209 let mut sorted: Vec<DetectedPattern> = patterns.values().cloned().collect();
210 sorted.sort_by(|a, b| b.frequency.cmp(&a.frequency));
211 sorted.truncate(limit);
212 sorted
213 }
214
215 pub fn clear(&self) {
217 self.query_history.write().clear();
218 self.query_patterns.write().clear();
219 }
220
221 pub fn total_queries(&self) -> usize {
223 self.query_history.read().len()
224 }
225
226 fn hash_embedding(embedding: &[f32]) -> u64 {
228 use std::collections::hash_map::DefaultHasher;
229 use std::hash::{Hash, Hasher};
230
231 let mut hasher = DefaultHasher::new();
232 for (i, &val) in embedding.iter().enumerate().step_by(8) {
234 (i, (val * 1000.0) as i32).hash(&mut hasher);
235 }
236 hasher.finish()
237 }
238}
239
240pub struct QueryTimer {
242 start: Instant,
243 embedding: Vec<f32>,
244 k: usize,
245 ef_search: usize,
246 metric: DistanceMetric,
247 cache_hit: bool,
248}
249
250impl QueryTimer {
251 pub fn start(embedding: Vec<f32>, k: usize, ef_search: usize, metric: DistanceMetric) -> Self {
253 Self {
254 start: Instant::now(),
255 embedding,
256 k,
257 ef_search,
258 metric,
259 cache_hit: false,
260 }
261 }
262
263 pub fn set_cache_hit(&mut self, hit: bool) {
265 self.cache_hit = hit;
266 }
267
268 pub fn finish(self, tracker: &AnalyticsTracker, result_count: usize) {
270 let duration = self.start.elapsed();
271 let metrics = QueryMetrics {
272 duration,
273 result_count,
274 cache_hit: self.cache_hit,
275 metric: self.metric,
276 ef_search: self.ef_search,
277 k: self.k,
278 };
279 tracker.record_query(&self.embedding, metrics);
280 }
281}
282
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `QueryMetrics` with the values every test shares
    /// (`result_count` 5, cosine metric, `ef_search` 50).
    fn sample_metrics(duration_ms: u64, cache_hit: bool, k: usize) -> QueryMetrics {
        QueryMetrics {
            duration: Duration::from_millis(duration_ms),
            result_count: 5,
            cache_hit,
            metric: DistanceMetric::Cosine,
            ef_search: 50,
            k,
        }
    }

    /// Returns a 128-dimensional embedding with every component set to `value`.
    fn uniform_embedding(value: f32) -> Vec<f32> {
        vec![value; 128]
    }

    /// A fresh tracker holds no queries.
    #[test]
    fn test_tracker_creation() {
        let tracker = AnalyticsTracker::with_defaults();
        assert_eq!(tracker.total_queries(), 0);
    }

    /// Recording one query makes it visible through `total_queries`.
    #[test]
    fn test_record_query() {
        let tracker = AnalyticsTracker::with_defaults();
        let embedding = uniform_embedding(0.5);

        tracker.record_query(&embedding, sample_metrics(10, false, 10));

        assert_eq!(tracker.total_queries(), 1);
    }

    /// Ten queries, every other one a cache hit, give a 50% hit rate.
    #[test]
    fn test_analytics_summary() {
        let tracker = AnalyticsTracker::with_defaults();
        let embedding = uniform_embedding(0.5);

        for i in 0..10u64 {
            tracker.record_query(&embedding, sample_metrics(i * 10, i % 2 == 0, 10));
        }

        let summary = tracker.get_summary(None);
        assert_eq!(summary.total_queries, 10);
        assert_eq!(summary.cache_hits, 5);
        assert!((summary.cache_hit_rate - 0.5).abs() < 0.01);
    }

    /// The most frequently repeated embedding is ranked first.
    #[test]
    fn test_query_patterns() {
        let tracker = AnalyticsTracker::with_defaults();

        let embedding1 = uniform_embedding(0.5);
        for _ in 0..5 {
            tracker.record_query(&embedding1, sample_metrics(10, false, 10));
        }

        let embedding2 = uniform_embedding(0.8);
        for _ in 0..3 {
            tracker.record_query(&embedding2, sample_metrics(20, false, 10));
        }

        let patterns = tracker.get_top_patterns(2);
        assert_eq!(patterns.len(), 2);
        assert_eq!(patterns[0].frequency, 5);
    }

    /// A timer reports at least the time that elapsed between `start` and `finish`.
    #[test]
    fn test_query_timer() {
        let tracker = AnalyticsTracker::with_defaults();
        let embedding = uniform_embedding(0.5);

        let timer = QueryTimer::start(embedding, 10, 50, DistanceMetric::Cosine);
        std::thread::sleep(Duration::from_millis(10));
        timer.finish(&tracker, 5);

        assert_eq!(tracker.total_queries(), 1);
        let summary = tracker.get_summary(None);
        assert!(summary.avg_duration >= Duration::from_millis(10));
    }

    /// The most common `k` and its count come first in `top_k_values`.
    #[test]
    fn test_top_k_values() {
        let tracker = AnalyticsTracker::with_defaults();
        let embedding = uniform_embedding(0.5);

        for k in [5usize, 10, 10, 10, 20].iter().copied() {
            tracker.record_query(&embedding, sample_metrics(10, false, k));
        }

        let summary = tracker.get_summary(None);
        assert_eq!(summary.top_k_values[0].0, 10);
        assert_eq!(summary.top_k_values[0].1, 3);
    }

    /// `clear` empties the recorded history.
    #[test]
    fn test_clear_analytics() {
        let tracker = AnalyticsTracker::with_defaults();
        let embedding = uniform_embedding(0.5);

        tracker.record_query(&embedding, sample_metrics(10, false, 10));
        assert_eq!(tracker.total_queries(), 1);

        tracker.clear();
        assert_eq!(tracker.total_queries(), 0);
    }

    /// Queries older than the requested window are left out of the summary.
    #[test]
    fn test_time_window_filtering() {
        let tracker = AnalyticsTracker::with_defaults();
        let embedding = uniform_embedding(0.5);

        tracker.record_query(&embedding, sample_metrics(10, false, 10));

        // Recorded just now, so it is inside a one-second window.
        let summary = tracker.get_summary(Some(Duration::from_secs(1)));
        assert_eq!(summary.total_queries, 1);

        // After waiting 100 ms the query is older than a 10 ms window.
        std::thread::sleep(Duration::from_millis(100));
        let summary = tracker.get_summary(Some(Duration::from_millis(10)));
        assert_eq!(summary.total_queries, 0);
    }
}