Skip to main content

heliosdb_proxy/analytics/
metrics.rs

1//! Analytics Metrics
2//!
3//! Track aggregated query metrics and provide snapshots.
4
5use std::collections::HashMap;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::Duration;
8
9use dashmap::DashMap;
10use parking_lot::RwLock;
11
12use super::fingerprinter::{OperationType, QueryFingerprint};
13use super::intent::QueryIntent;
14use super::statistics::QueryExecution;
15
/// Counters aggregated for one bucket of queries.
///
/// The same shape is reused for every breakdown in this module: per operation
/// type, per user, per database and per node.
struct OperationMetrics {
    /// Number of executions recorded into this bucket.
    count: AtomicU64,
    /// Sum of execution durations, in microseconds.
    total_time_us: AtomicU64,
    /// Number of recorded executions that carried an error.
    errors: AtomicU64,
    /// Sum of the row counts reported by recorded executions.
    rows: AtomicU64,
}
27
28impl OperationMetrics {
29    fn new() -> Self {
30        Self {
31            count: AtomicU64::new(0),
32            total_time_us: AtomicU64::new(0),
33            errors: AtomicU64::new(0),
34            rows: AtomicU64::new(0),
35        }
36    }
37
38    fn record(&self, execution: &QueryExecution) {
39        self.count.fetch_add(1, Ordering::Relaxed);
40        self.total_time_us
41            .fetch_add(execution.duration.as_micros() as u64, Ordering::Relaxed);
42        self.rows
43            .fetch_add(execution.rows as u64, Ordering::Relaxed);
44
45        if execution.error.is_some() {
46            self.errors.fetch_add(1, Ordering::Relaxed);
47        }
48    }
49
50    fn snapshot(&self) -> OperationSnapshot {
51        let count = self.count.load(Ordering::Relaxed);
52        let total_time_us = self.total_time_us.load(Ordering::Relaxed);
53        let errors = self.errors.load(Ordering::Relaxed);
54        let rows = self.rows.load(Ordering::Relaxed);
55
56        let avg_time_us = total_time_us.checked_div(count).unwrap_or(0);
57
58        OperationSnapshot {
59            count,
60            total_time: Duration::from_micros(total_time_us),
61            avg_time: Duration::from_micros(avg_time_us),
62            errors,
63            error_rate: if count > 0 {
64                errors as f64 / count as f64
65            } else {
66                0.0
67            },
68            rows,
69        }
70    }
71
72    fn reset(&self) {
73        self.count.store(0, Ordering::Relaxed);
74        self.total_time_us.store(0, Ordering::Relaxed);
75        self.errors.store(0, Ordering::Relaxed);
76        self.rows.store(0, Ordering::Relaxed);
77    }
78}
79
/// Point-in-time copy of one metrics bucket, with derived averages.
#[derive(Debug, Clone)]
pub struct OperationSnapshot {
    /// Number of executions recorded.
    pub count: u64,
    /// Sum of execution durations (microsecond resolution).
    pub total_time: Duration,
    /// `total_time / count`, or zero when `count` is zero.
    pub avg_time: Duration,
    /// Number of executions that carried an error.
    pub errors: u64,
    /// `errors / count` in the range 0.0 - 1.0, or 0.0 when `count` is zero.
    pub error_rate: f64,
    /// Sum of rows reported by the executions.
    pub rows: u64,
}
90
/// Counters aggregated for one classified query intent.
struct IntentMetrics {
    /// Number of executions recorded for this intent.
    count: AtomicU64,
    /// Sum of execution durations, in microseconds.
    total_time_us: AtomicU64,
    /// Number of cache hits reported for this intent (tracked separately
    /// from `count`; mainly meaningful for retrieval intents).
    cache_hits: AtomicU64,
}
100
101impl IntentMetrics {
102    fn new() -> Self {
103        Self {
104            count: AtomicU64::new(0),
105            total_time_us: AtomicU64::new(0),
106            cache_hits: AtomicU64::new(0),
107        }
108    }
109
110    fn record(&self, duration: Duration) {
111        self.count.fetch_add(1, Ordering::Relaxed);
112        self.total_time_us
113            .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
114    }
115
116    fn record_cache_hit(&self) {
117        self.cache_hits.fetch_add(1, Ordering::Relaxed);
118    }
119
120    fn snapshot(&self) -> super::IntentStats {
121        let count = self.count.load(Ordering::Relaxed);
122        let total_us = self.total_time_us.load(Ordering::Relaxed);
123        let cache_hits = self.cache_hits.load(Ordering::Relaxed);
124
125        super::IntentStats {
126            calls: count,
127            total_time_ms: total_us / 1000,
128            avg_time_ms: if count > 0 {
129                (total_us as f64 / count as f64) / 1000.0
130            } else {
131                0.0
132            },
133            cache_hit_ratio: if count > 0 {
134                cache_hits as f64 / count as f64
135            } else {
136                0.0
137            },
138        }
139    }
140
141    fn reset(&self) {
142        self.count.store(0, Ordering::Relaxed);
143        self.total_time_us.store(0, Ordering::Relaxed);
144        self.cache_hits.store(0, Ordering::Relaxed);
145    }
146}
147
148/// Query metric entry (for recent queries tracking)
149#[derive(Debug, Clone)]
150pub struct QueryMetricEntry {
151    pub fingerprint_hash: u64,
152    pub normalized: String,
153    pub duration: Duration,
154    pub timestamp_nanos: u64,
155    pub user: String,
156    pub database: String,
157    pub intent: QueryIntent,
158}
159
160/// Analytics metrics aggregator
161pub struct AnalyticsMetrics {
162    /// Total query count
163    total_queries: AtomicU64,
164
165    /// Total time in microseconds
166    total_time_us: AtomicU64,
167
168    /// Total errors
169    total_errors: AtomicU64,
170
171    /// Per-operation metrics
172    operations: DashMap<OperationType, OperationMetrics>,
173
174    /// Per-intent metrics
175    intents: DashMap<QueryIntent, IntentMetrics>,
176
177    /// Per-user metrics
178    users: DashMap<String, OperationMetrics>,
179
180    /// Per-database metrics
181    databases: DashMap<String, OperationMetrics>,
182
183    /// Per-node metrics
184    nodes: DashMap<String, OperationMetrics>,
185
186    /// Recent query entries (for debugging)
187    recent: RwLock<Vec<QueryMetricEntry>>,
188
189    /// Max recent entries
190    max_recent: usize,
191}
192
193impl AnalyticsMetrics {
194    /// Create new metrics aggregator
195    pub fn new() -> Self {
196        Self::with_max_recent(100)
197    }
198
199    /// Create with custom recent entries limit
200    pub fn with_max_recent(max_recent: usize) -> Self {
201        Self {
202            total_queries: AtomicU64::new(0),
203            total_time_us: AtomicU64::new(0),
204            total_errors: AtomicU64::new(0),
205            operations: DashMap::new(),
206            intents: DashMap::new(),
207            users: DashMap::new(),
208            databases: DashMap::new(),
209            nodes: DashMap::new(),
210            recent: RwLock::new(Vec::new()),
211            max_recent,
212        }
213    }
214
215    /// Record query execution
216    pub fn record(
217        &self,
218        fingerprint: &QueryFingerprint,
219        execution: &QueryExecution,
220        intent: QueryIntent,
221    ) {
222        // Global counters
223        self.total_queries.fetch_add(1, Ordering::Relaxed);
224        self.total_time_us
225            .fetch_add(execution.duration.as_micros() as u64, Ordering::Relaxed);
226
227        if execution.error.is_some() {
228            self.total_errors.fetch_add(1, Ordering::Relaxed);
229        }
230
231        // Per-operation
232        self.operations
233            .entry(fingerprint.operation)
234            .or_insert_with(OperationMetrics::new)
235            .record(execution);
236
237        // Per-intent
238        self.intents
239            .entry(intent)
240            .or_insert_with(IntentMetrics::new)
241            .record(execution.duration);
242
243        // Per-user
244        self.users
245            .entry(execution.user.clone())
246            .or_insert_with(OperationMetrics::new)
247            .record(execution);
248
249        // Per-database
250        self.databases
251            .entry(execution.database.clone())
252            .or_insert_with(OperationMetrics::new)
253            .record(execution);
254
255        // Per-node
256        self.nodes
257            .entry(execution.node.clone())
258            .or_insert_with(OperationMetrics::new)
259            .record(execution);
260
261        // Recent entries
262        {
263            let mut recent = self.recent.write();
264            if recent.len() >= self.max_recent {
265                recent.remove(0);
266            }
267            recent.push(QueryMetricEntry {
268                fingerprint_hash: fingerprint.hash,
269                normalized: fingerprint.normalized.clone(),
270                duration: execution.duration,
271                timestamp_nanos: now_nanos(),
272                user: execution.user.clone(),
273                database: execution.database.clone(),
274                intent,
275            });
276        }
277    }
278
279    /// Record cache hit for an intent
280    pub fn record_cache_hit(&self, intent: QueryIntent) {
281        self.intents
282            .entry(intent)
283            .or_insert_with(IntentMetrics::new)
284            .record_cache_hit();
285    }
286
287    /// Get snapshot of all metrics
288    pub fn snapshot(&self) -> AnalyticsSnapshot {
289        let total_queries = self.total_queries.load(Ordering::Relaxed);
290        let total_time_us = self.total_time_us.load(Ordering::Relaxed);
291        let total_errors = self.total_errors.load(Ordering::Relaxed);
292
293        let operations: HashMap<_, _> = self
294            .operations
295            .iter()
296            .map(|r| (*r.key(), r.value().snapshot()))
297            .collect();
298
299        let users: HashMap<_, _> = self
300            .users
301            .iter()
302            .map(|r| (r.key().clone(), r.value().snapshot()))
303            .collect();
304
305        let databases: HashMap<_, _> = self
306            .databases
307            .iter()
308            .map(|r| (r.key().clone(), r.value().snapshot()))
309            .collect();
310
311        let nodes: HashMap<_, _> = self
312            .nodes
313            .iter()
314            .map(|r| (r.key().clone(), r.value().snapshot()))
315            .collect();
316
317        AnalyticsSnapshot {
318            total_queries,
319            total_time: Duration::from_micros(total_time_us),
320            total_errors,
321            error_rate: if total_queries > 0 {
322                total_errors as f64 / total_queries as f64
323            } else {
324                0.0
325            },
326            qps: 0.0, // Would need time tracking for accurate QPS
327            avg_time: Duration::from_micros(total_time_us.checked_div(total_queries).unwrap_or(0)),
328            by_operation: operations,
329            by_user: users,
330            by_database: databases,
331            by_node: nodes,
332        }
333    }
334
335    /// Get metrics by intent
336    pub fn by_intent(&self) -> HashMap<QueryIntent, super::IntentStats> {
337        self.intents
338            .iter()
339            .map(|r| (*r.key(), r.value().snapshot()))
340            .collect()
341    }
342
343    /// Get recent queries
344    pub fn recent_queries(&self, limit: usize) -> Vec<QueryMetricEntry> {
345        let recent = self.recent.read();
346        recent.iter().rev().take(limit).cloned().collect()
347    }
348
349    /// Reset all metrics
350    pub fn reset(&self) {
351        self.total_queries.store(0, Ordering::Relaxed);
352        self.total_time_us.store(0, Ordering::Relaxed);
353        self.total_errors.store(0, Ordering::Relaxed);
354
355        for entry in self.operations.iter() {
356            entry.value().reset();
357        }
358        for entry in self.intents.iter() {
359            entry.value().reset();
360        }
361        for entry in self.users.iter() {
362            entry.value().reset();
363        }
364        for entry in self.databases.iter() {
365            entry.value().reset();
366        }
367        for entry in self.nodes.iter() {
368            entry.value().reset();
369        }
370
371        self.recent.write().clear();
372    }
373}
374
375impl Default for AnalyticsMetrics {
376    fn default() -> Self {
377        Self::new()
378    }
379}
380
381/// Snapshot of analytics metrics
382#[derive(Debug, Clone)]
383pub struct AnalyticsSnapshot {
384    /// Total queries executed
385    pub total_queries: u64,
386
387    /// Total execution time
388    pub total_time: Duration,
389
390    /// Total errors
391    pub total_errors: u64,
392
393    /// Error rate (0.0 - 1.0)
394    pub error_rate: f64,
395
396    /// Queries per second (approximate)
397    pub qps: f64,
398
399    /// Average query time
400    pub avg_time: Duration,
401
402    /// Metrics by operation type
403    pub by_operation: HashMap<OperationType, OperationSnapshot>,
404
405    /// Metrics by user
406    pub by_user: HashMap<String, OperationSnapshot>,
407
408    /// Metrics by database
409    pub by_database: HashMap<String, OperationSnapshot>,
410
411    /// Metrics by node
412    pub by_node: HashMap<String, OperationSnapshot>,
413}
414
/// Current wall-clock time in nanoseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_nanos() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    }
}
421
#[cfg(test)]
mod tests {
    use super::*;
    use crate::analytics::fingerprinter::QueryFingerprinter;

    #[test]
    fn test_metrics_new() {
        let metrics = AnalyticsMetrics::new();
        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total_queries, 0);
        assert_eq!(snapshot.total_errors, 0);
        // No division by zero: the average is reported as zero.
        assert_eq!(snapshot.avg_time, Duration::ZERO);
        assert!(metrics.recent_queries(10).is_empty());
    }

    #[test]
    fn test_metrics_record() {
        let metrics = AnalyticsMetrics::new();
        let fp = QueryFingerprinter::new();

        let fingerprint = fp.fingerprint("SELECT * FROM users WHERE id = 1");
        let execution = QueryExecution::new(
            "SELECT * FROM users WHERE id = 1",
            Duration::from_millis(10),
        )
        .with_user("alice")
        .with_database("mydb")
        .with_node("primary")
        .with_rows(1);

        metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total_queries, 1);
        assert_eq!(snapshot.total_time, Duration::from_millis(10));
        assert!(snapshot.by_operation.contains_key(&OperationType::Select));
        assert!(snapshot.by_user.contains_key("alice"));
        assert!(snapshot.by_database.contains_key("mydb"));
        assert!(snapshot.by_node.contains_key("primary"));
        assert_eq!(snapshot.by_user["alice"].count, 1);
        assert_eq!(metrics.recent_queries(10).len(), 1);
    }

    #[test]
    fn test_metrics_by_intent() {
        let metrics = AnalyticsMetrics::new();
        let fp = QueryFingerprinter::new();

        // Record retrieval query
        let fingerprint = fp.fingerprint("SELECT * FROM users");
        let execution = QueryExecution::new("SELECT * FROM users", Duration::from_millis(5));
        metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);

        // Record storage query
        let fingerprint = fp.fingerprint("INSERT INTO users VALUES (1, 'Alice')");
        let execution = QueryExecution::new(
            "INSERT INTO users VALUES (1, 'Alice')",
            Duration::from_millis(10),
        );
        metrics.record(&fingerprint, &execution, QueryIntent::Storage);

        let by_intent = metrics.by_intent();
        assert!(by_intent.contains_key(&QueryIntent::Retrieval));
        assert!(by_intent.contains_key(&QueryIntent::Storage));
        assert_eq!(by_intent[&QueryIntent::Retrieval].calls, 1);
        assert_eq!(by_intent[&QueryIntent::Storage].calls, 1);
    }

    #[test]
    fn test_metrics_error_tracking() {
        let metrics = AnalyticsMetrics::new();
        let fp = QueryFingerprinter::new();

        // Record successful query
        let fingerprint = fp.fingerprint("SELECT 1");
        let execution = QueryExecution::new("SELECT 1", Duration::from_millis(1));
        metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);

        // Record failed query
        let execution = QueryExecution::new("SELECT 1", Duration::from_millis(1))
            .with_error("Connection refused");
        metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total_queries, 2);
        assert_eq!(snapshot.total_errors, 1);
        assert!((snapshot.error_rate - 0.5).abs() < 0.001);
    }

    #[test]
    fn test_metrics_reset() {
        let metrics = AnalyticsMetrics::new();
        let fp = QueryFingerprinter::new();

        let fingerprint = fp.fingerprint("SELECT 1");
        let execution = QueryExecution::new("SELECT 1", Duration::from_millis(1))
            .with_error("boom");
        metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);

        metrics.reset();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total_queries, 0);
        assert_eq!(snapshot.total_errors, 0);
        assert_eq!(snapshot.total_time, Duration::ZERO);
        // Buckets are kept but zeroed.
        assert!(snapshot.by_operation.values().all(|op| op.count == 0));
        assert!(metrics.recent_queries(10).is_empty());
    }

    #[test]
    fn test_recent_queries() {
        let metrics = AnalyticsMetrics::with_max_recent(5);
        let fp = QueryFingerprinter::new();

        // Record 10 queries
        for i in 0..10 {
            let query = format!("SELECT {}", i);
            let fingerprint = fp.fingerprint(&query);
            let execution = QueryExecution::new(query, Duration::from_millis(1));
            metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);
        }

        // Should only keep last 5
        let recent = metrics.recent_queries(10);
        assert_eq!(recent.len(), 5);
        // `limit` caps the result below the buffer size.
        assert_eq!(metrics.recent_queries(3).len(), 3);
    }

    #[test]
    fn test_cache_hit_recording() {
        let metrics = AnalyticsMetrics::new();

        // Record cache hits
        for _ in 0..5 {
            metrics.record_cache_hit(QueryIntent::Retrieval);
        }

        let by_intent = metrics.by_intent();
        // Recording a cache hit must create the intent bucket; the previous
        // `if let Some` form silently passed when the bucket was missing.
        let stats = by_intent
            .get(&QueryIntent::Retrieval)
            .expect("record_cache_hit should create the intent bucket");
        assert_eq!(stats.calls, 0); // Only recorded cache hits, no queries
    }
}