// heliosdb_proxy/analytics/metrics.rs

//! Analytics Metrics
//!
//! Track aggregated query metrics and provide snapshots.
4
5use std::collections::HashMap;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::Duration;
8
9use dashmap::DashMap;
10use parking_lot::RwLock;
11
12use super::fingerprinter::{QueryFingerprint, OperationType};
13use super::statistics::QueryExecution;
14use super::intent::QueryIntent;
15
/// Lock-free counters accumulated for one bucket of queries.
///
/// The same counter set is used for every keyed breakdown in this module
/// (per operation type, per user, per database and per node).
struct OperationMetrics {
    /// Number of executions recorded.
    count: AtomicU64,
    /// Sum of execution durations, in microseconds.
    total_time_us: AtomicU64,
    /// Number of recorded executions that carried an error.
    errors: AtomicU64,
    /// Sum of the row counts reported by the recorded executions.
    rows: AtomicU64,
}
27
28impl OperationMetrics {
29    fn new() -> Self {
30        Self {
31            count: AtomicU64::new(0),
32            total_time_us: AtomicU64::new(0),
33            errors: AtomicU64::new(0),
34            rows: AtomicU64::new(0),
35        }
36    }
37
38    fn record(&self, execution: &QueryExecution) {
39        self.count.fetch_add(1, Ordering::Relaxed);
40        self.total_time_us
41            .fetch_add(execution.duration.as_micros() as u64, Ordering::Relaxed);
42        self.rows
43            .fetch_add(execution.rows as u64, Ordering::Relaxed);
44
45        if execution.error.is_some() {
46            self.errors.fetch_add(1, Ordering::Relaxed);
47        }
48    }
49
50    fn snapshot(&self) -> OperationSnapshot {
51        let count = self.count.load(Ordering::Relaxed);
52        let total_time_us = self.total_time_us.load(Ordering::Relaxed);
53        let errors = self.errors.load(Ordering::Relaxed);
54        let rows = self.rows.load(Ordering::Relaxed);
55
56        let avg_time_us = if count > 0 {
57            total_time_us / count
58        } else {
59            0
60        };
61
62        OperationSnapshot {
63            count,
64            total_time: Duration::from_micros(total_time_us),
65            avg_time: Duration::from_micros(avg_time_us),
66            errors,
67            error_rate: if count > 0 {
68                errors as f64 / count as f64
69            } else {
70                0.0
71            },
72            rows,
73        }
74    }
75
76    fn reset(&self) {
77        self.count.store(0, Ordering::Relaxed);
78        self.total_time_us.store(0, Ordering::Relaxed);
79        self.errors.store(0, Ordering::Relaxed);
80        self.rows.store(0, Ordering::Relaxed);
81    }
82}
83
/// Point-in-time copy of one set of operation counters, plus derived values.
#[derive(Debug, Clone)]
pub struct OperationSnapshot {
    /// Number of executions recorded.
    pub count: u64,
    /// Sum of the recorded execution durations.
    pub total_time: Duration,
    /// Total time divided by `count` (zero when `count` is zero).
    pub avg_time: Duration,
    /// Number of executions that carried an error.
    pub errors: u64,
    /// `errors / count` as a fraction (0.0 when `count` is zero).
    pub error_rate: f64,
    /// Sum of the row counts of the recorded executions.
    pub rows: u64,
}
94
/// Lock-free counters accumulated for a single query intent.
struct IntentMetrics {
    /// Number of executions recorded for this intent.
    count: AtomicU64,
    /// Sum of execution durations, in microseconds.
    total_time_us: AtomicU64,
    /// Number of cache hits reported for this intent (tracked separately
    /// from `count`).
    cache_hits: AtomicU64,
}
104
105impl IntentMetrics {
106    fn new() -> Self {
107        Self {
108            count: AtomicU64::new(0),
109            total_time_us: AtomicU64::new(0),
110            cache_hits: AtomicU64::new(0),
111        }
112    }
113
114    fn record(&self, duration: Duration) {
115        self.count.fetch_add(1, Ordering::Relaxed);
116        self.total_time_us
117            .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
118    }
119
120    fn record_cache_hit(&self) {
121        self.cache_hits.fetch_add(1, Ordering::Relaxed);
122    }
123
124    fn snapshot(&self) -> super::IntentStats {
125        let count = self.count.load(Ordering::Relaxed);
126        let total_us = self.total_time_us.load(Ordering::Relaxed);
127        let cache_hits = self.cache_hits.load(Ordering::Relaxed);
128
129        super::IntentStats {
130            calls: count,
131            total_time_ms: total_us / 1000,
132            avg_time_ms: if count > 0 {
133                (total_us as f64 / count as f64) / 1000.0
134            } else {
135                0.0
136            },
137            cache_hit_ratio: if count > 0 {
138                cache_hits as f64 / count as f64
139            } else {
140                0.0
141            },
142        }
143    }
144
145    fn reset(&self) {
146        self.count.store(0, Ordering::Relaxed);
147        self.total_time_us.store(0, Ordering::Relaxed);
148        self.cache_hits.store(0, Ordering::Relaxed);
149    }
150}
151
152/// Query metric entry (for recent queries tracking)
153#[derive(Debug, Clone)]
154pub struct QueryMetricEntry {
155    pub fingerprint_hash: u64,
156    pub normalized: String,
157    pub duration: Duration,
158    pub timestamp_nanos: u64,
159    pub user: String,
160    pub database: String,
161    pub intent: QueryIntent,
162}
163
164/// Analytics metrics aggregator
165pub struct AnalyticsMetrics {
166    /// Total query count
167    total_queries: AtomicU64,
168
169    /// Total time in microseconds
170    total_time_us: AtomicU64,
171
172    /// Total errors
173    total_errors: AtomicU64,
174
175    /// Per-operation metrics
176    operations: DashMap<OperationType, OperationMetrics>,
177
178    /// Per-intent metrics
179    intents: DashMap<QueryIntent, IntentMetrics>,
180
181    /// Per-user metrics
182    users: DashMap<String, OperationMetrics>,
183
184    /// Per-database metrics
185    databases: DashMap<String, OperationMetrics>,
186
187    /// Per-node metrics
188    nodes: DashMap<String, OperationMetrics>,
189
190    /// Recent query entries (for debugging)
191    recent: RwLock<Vec<QueryMetricEntry>>,
192
193    /// Max recent entries
194    max_recent: usize,
195}
196
197impl AnalyticsMetrics {
198    /// Create new metrics aggregator
199    pub fn new() -> Self {
200        Self::with_max_recent(100)
201    }
202
203    /// Create with custom recent entries limit
204    pub fn with_max_recent(max_recent: usize) -> Self {
205        Self {
206            total_queries: AtomicU64::new(0),
207            total_time_us: AtomicU64::new(0),
208            total_errors: AtomicU64::new(0),
209            operations: DashMap::new(),
210            intents: DashMap::new(),
211            users: DashMap::new(),
212            databases: DashMap::new(),
213            nodes: DashMap::new(),
214            recent: RwLock::new(Vec::new()),
215            max_recent,
216        }
217    }
218
219    /// Record query execution
220    pub fn record(
221        &self,
222        fingerprint: &QueryFingerprint,
223        execution: &QueryExecution,
224        intent: QueryIntent,
225    ) {
226        // Global counters
227        self.total_queries.fetch_add(1, Ordering::Relaxed);
228        self.total_time_us
229            .fetch_add(execution.duration.as_micros() as u64, Ordering::Relaxed);
230
231        if execution.error.is_some() {
232            self.total_errors.fetch_add(1, Ordering::Relaxed);
233        }
234
235        // Per-operation
236        self.operations
237            .entry(fingerprint.operation)
238            .or_insert_with(OperationMetrics::new)
239            .record(execution);
240
241        // Per-intent
242        self.intents
243            .entry(intent)
244            .or_insert_with(IntentMetrics::new)
245            .record(execution.duration);
246
247        // Per-user
248        self.users
249            .entry(execution.user.clone())
250            .or_insert_with(OperationMetrics::new)
251            .record(execution);
252
253        // Per-database
254        self.databases
255            .entry(execution.database.clone())
256            .or_insert_with(OperationMetrics::new)
257            .record(execution);
258
259        // Per-node
260        self.nodes
261            .entry(execution.node.clone())
262            .or_insert_with(OperationMetrics::new)
263            .record(execution);
264
265        // Recent entries
266        {
267            let mut recent = self.recent.write();
268            if recent.len() >= self.max_recent {
269                recent.remove(0);
270            }
271            recent.push(QueryMetricEntry {
272                fingerprint_hash: fingerprint.hash,
273                normalized: fingerprint.normalized.clone(),
274                duration: execution.duration,
275                timestamp_nanos: now_nanos(),
276                user: execution.user.clone(),
277                database: execution.database.clone(),
278                intent,
279            });
280        }
281    }
282
283    /// Record cache hit for an intent
284    pub fn record_cache_hit(&self, intent: QueryIntent) {
285        self.intents
286            .entry(intent)
287            .or_insert_with(IntentMetrics::new)
288            .record_cache_hit();
289    }
290
291    /// Get snapshot of all metrics
292    pub fn snapshot(&self) -> AnalyticsSnapshot {
293        let total_queries = self.total_queries.load(Ordering::Relaxed);
294        let total_time_us = self.total_time_us.load(Ordering::Relaxed);
295        let total_errors = self.total_errors.load(Ordering::Relaxed);
296
297        let operations: HashMap<_, _> = self
298            .operations
299            .iter()
300            .map(|r| (*r.key(), r.value().snapshot()))
301            .collect();
302
303        let users: HashMap<_, _> = self
304            .users
305            .iter()
306            .map(|r| (r.key().clone(), r.value().snapshot()))
307            .collect();
308
309        let databases: HashMap<_, _> = self
310            .databases
311            .iter()
312            .map(|r| (r.key().clone(), r.value().snapshot()))
313            .collect();
314
315        let nodes: HashMap<_, _> = self
316            .nodes
317            .iter()
318            .map(|r| (r.key().clone(), r.value().snapshot()))
319            .collect();
320
321        AnalyticsSnapshot {
322            total_queries,
323            total_time: Duration::from_micros(total_time_us),
324            total_errors,
325            error_rate: if total_queries > 0 {
326                total_errors as f64 / total_queries as f64
327            } else {
328                0.0
329            },
330            qps: 0.0, // Would need time tracking for accurate QPS
331            avg_time: if total_queries > 0 {
332                Duration::from_micros(total_time_us / total_queries)
333            } else {
334                Duration::ZERO
335            },
336            by_operation: operations,
337            by_user: users,
338            by_database: databases,
339            by_node: nodes,
340        }
341    }
342
343    /// Get metrics by intent
344    pub fn by_intent(&self) -> HashMap<QueryIntent, super::IntentStats> {
345        self.intents
346            .iter()
347            .map(|r| (*r.key(), r.value().snapshot()))
348            .collect()
349    }
350
351    /// Get recent queries
352    pub fn recent_queries(&self, limit: usize) -> Vec<QueryMetricEntry> {
353        let recent = self.recent.read();
354        recent.iter().rev().take(limit).cloned().collect()
355    }
356
357    /// Reset all metrics
358    pub fn reset(&self) {
359        self.total_queries.store(0, Ordering::Relaxed);
360        self.total_time_us.store(0, Ordering::Relaxed);
361        self.total_errors.store(0, Ordering::Relaxed);
362
363        for entry in self.operations.iter() {
364            entry.value().reset();
365        }
366        for entry in self.intents.iter() {
367            entry.value().reset();
368        }
369        for entry in self.users.iter() {
370            entry.value().reset();
371        }
372        for entry in self.databases.iter() {
373            entry.value().reset();
374        }
375        for entry in self.nodes.iter() {
376            entry.value().reset();
377        }
378
379        self.recent.write().clear();
380    }
381}
382
383impl Default for AnalyticsMetrics {
384    fn default() -> Self {
385        Self::new()
386    }
387}
388
389/// Snapshot of analytics metrics
390#[derive(Debug, Clone)]
391pub struct AnalyticsSnapshot {
392    /// Total queries executed
393    pub total_queries: u64,
394
395    /// Total execution time
396    pub total_time: Duration,
397
398    /// Total errors
399    pub total_errors: u64,
400
401    /// Error rate (0.0 - 1.0)
402    pub error_rate: f64,
403
404    /// Queries per second (approximate)
405    pub qps: f64,
406
407    /// Average query time
408    pub avg_time: Duration,
409
410    /// Metrics by operation type
411    pub by_operation: HashMap<OperationType, OperationSnapshot>,
412
413    /// Metrics by user
414    pub by_user: HashMap<String, OperationSnapshot>,
415
416    /// Metrics by database
417    pub by_database: HashMap<String, OperationSnapshot>,
418
419    /// Metrics by node
420    pub by_node: HashMap<String, OperationSnapshot>,
421}
422
/// Current wall-clock time in nanoseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn now_nanos() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    }
}
429
#[cfg(test)]
mod tests {
    use super::*;
    use crate::analytics::fingerprinter::QueryFingerprinter;

    /// A fresh aggregator reports zero queries and zero errors.
    #[test]
    fn test_metrics_new() {
        let metrics = AnalyticsMetrics::new();
        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total_queries, 0);
        assert_eq!(snapshot.total_errors, 0);
    }

    /// Recording one query populates the operation, user and database buckets.
    #[test]
    fn test_metrics_record() {
        let metrics = AnalyticsMetrics::new();
        let fp = QueryFingerprinter::new();

        let fingerprint = fp.fingerprint("SELECT * FROM users WHERE id = 1");
        let execution =
            QueryExecution::new("SELECT * FROM users WHERE id = 1", Duration::from_millis(10))
                .with_user("alice")
                .with_database("mydb")
                .with_node("primary")
                .with_rows(1);

        metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total_queries, 1);
        assert!(snapshot.by_operation.contains_key(&OperationType::Select));
        assert!(snapshot.by_user.contains_key("alice"));
        assert!(snapshot.by_database.contains_key("mydb"));
    }

    /// Each intent gets its own bucket.
    #[test]
    fn test_metrics_by_intent() {
        let metrics = AnalyticsMetrics::new();
        let fp = QueryFingerprinter::new();

        // Record retrieval query
        let fingerprint = fp.fingerprint("SELECT * FROM users");
        let execution = QueryExecution::new("SELECT * FROM users", Duration::from_millis(5));
        metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);

        // Record storage query
        let fingerprint = fp.fingerprint("INSERT INTO users VALUES (1, 'Alice')");
        let execution = QueryExecution::new(
            "INSERT INTO users VALUES (1, 'Alice')",
            Duration::from_millis(10),
        );
        metrics.record(&fingerprint, &execution, QueryIntent::Storage);

        let by_intent = metrics.by_intent();
        assert!(by_intent.contains_key(&QueryIntent::Retrieval));
        assert!(by_intent.contains_key(&QueryIntent::Storage));
    }

    /// Failed executions are counted and reflected in the error rate.
    #[test]
    fn test_metrics_error_tracking() {
        let metrics = AnalyticsMetrics::new();
        let fp = QueryFingerprinter::new();

        // Record successful query
        let fingerprint = fp.fingerprint("SELECT 1");
        let execution = QueryExecution::new("SELECT 1", Duration::from_millis(1));
        metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);

        // Record failed query
        let execution = QueryExecution::new("SELECT 1", Duration::from_millis(1))
            .with_error("Connection refused");
        metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total_queries, 2);
        assert_eq!(snapshot.total_errors, 1);
        assert!((snapshot.error_rate - 0.5).abs() < 0.001);
    }

    /// Reset zeroes the global and per-key counters and empties the recent buffer.
    #[test]
    fn test_metrics_reset() {
        let metrics = AnalyticsMetrics::new();
        let fp = QueryFingerprinter::new();

        let fingerprint = fp.fingerprint("SELECT 1");
        let execution = QueryExecution::new("SELECT 1", Duration::from_millis(1));
        metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);

        metrics.reset();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total_queries, 0);
        assert_eq!(snapshot.total_errors, 0);
        // Buckets survive a reset, but their counters must be zero.
        assert!(snapshot.by_operation.values().all(|op| op.count == 0));
        assert!(snapshot.by_user.values().all(|op| op.count == 0));
        assert!(metrics.recent_queries(10).is_empty());
    }

    /// The recent buffer is capped at `max_recent` and honours `limit`.
    #[test]
    fn test_recent_queries() {
        let metrics = AnalyticsMetrics::with_max_recent(5);
        let fp = QueryFingerprinter::new();

        // Record 10 queries
        for i in 0..10 {
            let query = format!("SELECT {}", i);
            let fingerprint = fp.fingerprint(&query);
            let execution = QueryExecution::new(query, Duration::from_millis(1));
            metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);
        }

        // Should only keep last 5
        let recent = metrics.recent_queries(10);
        assert_eq!(recent.len(), 5);

        // A smaller limit truncates the result.
        assert_eq!(metrics.recent_queries(3).len(), 3);
    }

    /// Cache hits create the intent bucket without counting as executions.
    #[test]
    fn test_cache_hit_recording() {
        let metrics = AnalyticsMetrics::new();

        // Record cache hits
        for _ in 0..5 {
            metrics.record_cache_hit(QueryIntent::Retrieval);
        }

        let by_intent = metrics.by_intent();
        // The bucket must exist; a silent `if let` would let this test pass
        // without checking anything.
        let stats = by_intent
            .get(&QueryIntent::Retrieval)
            .expect("recording a cache hit should create the intent bucket");
        assert_eq!(stats.calls, 0); // Only recorded cache hits, no queries
        // With no recorded executions the ratio is reported as zero.
        assert_eq!(stats.cache_hit_ratio, 0.0);
    }
}