Skip to main content

heliosdb_proxy/analytics/
statistics.rs

//! Query Statistics
//!
//! Track execution statistics per query fingerprint.
4
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::time::Duration;
7
8use dashmap::DashMap;
9
10use super::fingerprinter::{OperationType, QueryFingerprint};
11use super::histogram::LatencyHistogram;
12use super::OrderBy;
13
/// Record of a single query execution.
///
/// Start from [`QueryExecution::new`] and refine the record with the
/// chainable `with_*` methods. All fields are public, so they can also be
/// assigned directly.
#[derive(Debug, Clone)]
pub struct QueryExecution {
    /// Query text
    pub query: String,

    /// Execution duration
    pub duration: Duration,

    /// Rows returned/affected
    pub rows: usize,

    /// Error message (if failed)
    pub error: Option<String>,

    /// User who executed the query
    pub user: String,

    /// Client IP address
    pub client_ip: String,

    /// Database name
    pub database: String,

    /// Node that executed the query
    pub node: String,

    /// Session ID (for pattern detection)
    pub session_id: Option<String>,

    /// Workflow ID (for tracing)
    pub workflow_id: Option<String>,

    /// Query parameters (if tracking enabled)
    pub parameters: Option<Vec<String>>,
}

impl QueryExecution {
    /// Create a new execution record for `query` that took `duration`.
    ///
    /// Everything else starts at a placeholder: zero rows, no error,
    /// `"unknown"` user and client IP, the `"default"` database, the
    /// `"primary"` node, and no session, workflow or parameters.
    pub fn new(query: impl Into<String>, duration: Duration) -> Self {
        Self {
            query: query.into(),
            duration,
            rows: 0,
            error: None,
            user: "unknown".to_string(),
            client_ip: "unknown".to_string(),
            database: "default".to_string(),
            node: "primary".to_string(),
            session_id: None,
            workflow_id: None,
            parameters: None,
        }
    }

    /// Set the number of rows returned/affected.
    pub fn with_rows(mut self, rows: usize) -> Self {
        self.rows = rows;
        self
    }

    /// Mark the execution as failed with the given error message.
    pub fn with_error(mut self, error: impl Into<String>) -> Self {
        self.error = Some(error.into());
        self
    }

    /// Set the user who executed the query.
    pub fn with_user(mut self, user: impl Into<String>) -> Self {
        self.user = user.into();
        self
    }

    /// Set the client IP address.
    pub fn with_client_ip(mut self, ip: impl Into<String>) -> Self {
        self.client_ip = ip.into();
        self
    }

    /// Set the database name.
    pub fn with_database(mut self, db: impl Into<String>) -> Self {
        self.database = db.into();
        self
    }

    /// Set the node that executed the query.
    pub fn with_node(mut self, node: impl Into<String>) -> Self {
        self.node = node.into();
        self
    }

    /// Set the session ID.
    pub fn with_session(mut self, session: impl Into<String>) -> Self {
        self.session_id = Some(session.into());
        self
    }

    /// Set the workflow ID.
    pub fn with_workflow(mut self, workflow: impl Into<String>) -> Self {
        self.workflow_id = Some(workflow.into());
        self
    }

    /// Set the query parameters.
    ///
    /// Accepts any iterable of string-like values, e.g. `vec!["1", "abc"]`
    /// or a `Vec<String>`. An empty iterable stores `Some(vec![])`, which is
    /// distinct from the default `None`.
    pub fn with_parameters<I, S>(mut self, parameters: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        self.parameters = Some(parameters.into_iter().map(Into::into).collect());
        self
    }

    /// Check if this execution failed
    pub fn is_error(&self) -> bool {
        self.error.is_some()
    }
}
114
115/// Statistics for a single query fingerprint
116pub struct QueryStatistics {
117    /// Query fingerprint
118    fingerprint: QueryFingerprint,
119
120    /// Call count
121    calls: AtomicU64,
122
123    /// Total execution time (microseconds)
124    total_time_us: AtomicU64,
125
126    /// Minimum execution time (microseconds)
127    min_time_us: AtomicU64,
128
129    /// Maximum execution time (microseconds)
130    max_time_us: AtomicU64,
131
132    /// Total rows returned
133    rows: AtomicU64,
134
135    /// Error count
136    errors: AtomicU64,
137
138    /// Latency histogram
139    histogram: LatencyHistogram,
140
141    /// First seen timestamp (nanos since epoch)
142    first_seen: AtomicU64,
143
144    /// Last seen timestamp (nanos since epoch)
145    last_seen: AtomicU64,
146
147    /// Per-user call counts
148    users: DashMap<String, AtomicU64>,
149
150    /// Per-client call counts
151    clients: DashMap<String, AtomicU64>,
152
153    /// Per-database call counts
154    databases: DashMap<String, AtomicU64>,
155}
156
157impl QueryStatistics {
158    /// Create new statistics for a fingerprint
159    pub fn new(fingerprint: QueryFingerprint) -> Self {
160        let now = now_nanos();
161        Self {
162            fingerprint,
163            calls: AtomicU64::new(0),
164            total_time_us: AtomicU64::new(0),
165            min_time_us: AtomicU64::new(u64::MAX),
166            max_time_us: AtomicU64::new(0),
167            rows: AtomicU64::new(0),
168            errors: AtomicU64::new(0),
169            histogram: LatencyHistogram::new(),
170            first_seen: AtomicU64::new(now),
171            last_seen: AtomicU64::new(now),
172            users: DashMap::new(),
173            clients: DashMap::new(),
174            databases: DashMap::new(),
175        }
176    }
177
178    /// Record an execution
179    pub fn record(&self, execution: &QueryExecution) {
180        self.calls.fetch_add(1, Ordering::Relaxed);
181
182        let duration_us = execution.duration.as_micros() as u64;
183        self.total_time_us.fetch_add(duration_us, Ordering::Relaxed);
184        self.rows
185            .fetch_add(execution.rows as u64, Ordering::Relaxed);
186
187        if execution.error.is_some() {
188            self.errors.fetch_add(1, Ordering::Relaxed);
189        }
190
191        // Update min/max
192        self.update_min(duration_us);
193        self.update_max(duration_us);
194
195        // Record in histogram
196        self.histogram.record(execution.duration);
197
198        // Update last seen
199        self.last_seen.store(now_nanos(), Ordering::Relaxed);
200
201        // User attribution
202        self.users
203            .entry(execution.user.clone())
204            .or_insert_with(|| AtomicU64::new(0))
205            .fetch_add(1, Ordering::Relaxed);
206
207        // Client attribution
208        self.clients
209            .entry(execution.client_ip.clone())
210            .or_insert_with(|| AtomicU64::new(0))
211            .fetch_add(1, Ordering::Relaxed);
212
213        // Database attribution
214        self.databases
215            .entry(execution.database.clone())
216            .or_insert_with(|| AtomicU64::new(0))
217            .fetch_add(1, Ordering::Relaxed);
218    }
219
220    fn update_min(&self, value: u64) {
221        let mut current = self.min_time_us.load(Ordering::Relaxed);
222        while value < current {
223            match self.min_time_us.compare_exchange_weak(
224                current,
225                value,
226                Ordering::SeqCst,
227                Ordering::Relaxed,
228            ) {
229                Ok(_) => break,
230                Err(c) => current = c,
231            }
232        }
233    }
234
235    fn update_max(&self, value: u64) {
236        let mut current = self.max_time_us.load(Ordering::Relaxed);
237        while value > current {
238            match self.max_time_us.compare_exchange_weak(
239                current,
240                value,
241                Ordering::SeqCst,
242                Ordering::Relaxed,
243            ) {
244                Ok(_) => break,
245                Err(c) => current = c,
246            }
247        }
248    }
249
250    /// Get fingerprint
251    pub fn fingerprint(&self) -> &QueryFingerprint {
252        &self.fingerprint
253    }
254
255    /// Get call count
256    pub fn calls(&self) -> u64 {
257        self.calls.load(Ordering::Relaxed)
258    }
259
260    /// Get average execution time
261    pub fn avg_time(&self) -> Duration {
262        let total = self.total_time_us.load(Ordering::Relaxed);
263        let calls = self.calls.load(Ordering::Relaxed);
264        Duration::from_micros(total / calls.max(1))
265    }
266
267    /// Get total execution time
268    pub fn total_time(&self) -> Duration {
269        Duration::from_micros(self.total_time_us.load(Ordering::Relaxed))
270    }
271
272    /// Get min execution time
273    pub fn min_time(&self) -> Duration {
274        let min = self.min_time_us.load(Ordering::Relaxed);
275        if min == u64::MAX {
276            Duration::ZERO
277        } else {
278            Duration::from_micros(min)
279        }
280    }
281
282    /// Get max execution time
283    pub fn max_time(&self) -> Duration {
284        Duration::from_micros(self.max_time_us.load(Ordering::Relaxed))
285    }
286
287    /// Get total rows
288    pub fn rows(&self) -> u64 {
289        self.rows.load(Ordering::Relaxed)
290    }
291
292    /// Get error count
293    pub fn errors(&self) -> u64 {
294        self.errors.load(Ordering::Relaxed)
295    }
296
297    /// Get P50 latency
298    pub fn p50(&self) -> Duration {
299        self.histogram.percentile(0.50)
300    }
301
302    /// Get P90 latency
303    pub fn p90(&self) -> Duration {
304        self.histogram.percentile(0.90)
305    }
306
307    /// Get P99 latency
308    pub fn p99(&self) -> Duration {
309        self.histogram.percentile(0.99)
310    }
311
312    /// Get error rate
313    pub fn error_rate(&self) -> f64 {
314        let calls = self.calls() as f64;
315        if calls == 0.0 {
316            return 0.0;
317        }
318        self.errors() as f64 / calls
319    }
320
321    /// Convert to QueryStats
322    pub fn to_stats(&self) -> QueryStats {
323        QueryStats {
324            fingerprint_hash: self.fingerprint.hash,
325            normalized: self.fingerprint.normalized.clone(),
326            tables: self.fingerprint.tables.clone(),
327            operation: self.fingerprint.operation,
328            calls: self.calls(),
329            total_time: self.total_time(),
330            avg_time: self.avg_time(),
331            min_time: self.min_time(),
332            max_time: self.max_time(),
333            rows: self.rows(),
334            errors: self.errors(),
335            error_rate: self.error_rate(),
336            p50: self.p50(),
337            p90: self.p90(),
338            p99: self.p99(),
339            first_seen_nanos: self.first_seen.load(Ordering::Relaxed),
340            last_seen_nanos: self.last_seen.load(Ordering::Relaxed),
341        }
342    }
343}
344
345/// Query stats (snapshot of statistics)
346#[derive(Debug, Clone)]
347pub struct QueryStats {
348    pub fingerprint_hash: u64,
349    pub normalized: String,
350    pub tables: Vec<String>,
351    pub operation: OperationType,
352    pub calls: u64,
353    pub total_time: Duration,
354    pub avg_time: Duration,
355    pub min_time: Duration,
356    pub max_time: Duration,
357    pub rows: u64,
358    pub errors: u64,
359    pub error_rate: f64,
360    pub p50: Duration,
361    pub p90: Duration,
362    pub p99: Duration,
363    pub first_seen_nanos: u64,
364    pub last_seen_nanos: u64,
365}
366
367impl QueryStats {
368    /// Get fingerprint short ID
369    pub fn short_id(&self) -> String {
370        format!("{:016x}", self.fingerprint_hash)
371    }
372}
373
374/// Statistics store (all fingerprints)
375pub struct StatisticsStore {
376    /// Statistics by fingerprint hash
377    stats: DashMap<u64, QueryStatistics>,
378
379    /// Maximum fingerprints to track
380    max_fingerprints: usize,
381}
382
383impl StatisticsStore {
384    /// Create new statistics store
385    pub fn new(max_fingerprints: usize) -> Self {
386        Self {
387            stats: DashMap::new(),
388            max_fingerprints,
389        }
390    }
391
392    /// Record execution for a fingerprint
393    pub fn record(&self, fingerprint: &QueryFingerprint, execution: &QueryExecution) {
394        // Enforce max fingerprints before entering the entry API
395        // (reading len() inside or_insert_with would deadlock on DashMap)
396        if !self.stats.contains_key(&fingerprint.hash) && self.stats.len() >= self.max_fingerprints
397        {
398            self.evict_oldest();
399        }
400
401        let stats = self
402            .stats
403            .entry(fingerprint.hash)
404            .or_insert_with(|| QueryStatistics::new(fingerprint.clone()));
405
406        stats.record(execution);
407    }
408
409    /// Get statistics for a fingerprint
410    pub fn get(&self, fingerprint_hash: u64) -> Option<QueryStats> {
411        self.stats.get(&fingerprint_hash).map(|s| s.to_stats())
412    }
413
414    /// Get top queries by metric
415    pub fn top(&self, order_by: OrderBy, limit: usize) -> Vec<QueryStats> {
416        let mut all: Vec<_> = self.stats.iter().map(|r| r.to_stats()).collect();
417
418        match order_by {
419            OrderBy::TotalTime => all.sort_by_key(|b| std::cmp::Reverse(b.total_time)),
420            OrderBy::AvgTime => all.sort_by_key(|b| std::cmp::Reverse(b.avg_time)),
421            OrderBy::Calls => all.sort_by_key(|b| std::cmp::Reverse(b.calls)),
422            OrderBy::Errors => all.sort_by_key(|b| std::cmp::Reverse(b.errors)),
423            OrderBy::P99Time => all.sort_by_key(|b| std::cmp::Reverse(b.p99)),
424            OrderBy::Rows => all.sort_by_key(|b| std::cmp::Reverse(b.rows)),
425        }
426
427        all.truncate(limit);
428        all
429    }
430
431    /// Get all statistics
432    pub fn all(&self) -> Vec<QueryStats> {
433        self.stats.iter().map(|r| r.to_stats()).collect()
434    }
435
436    /// Get count of tracked fingerprints
437    pub fn count(&self) -> usize {
438        self.stats.len()
439    }
440
441    /// Reset all statistics
442    pub fn reset(&self) {
443        self.stats.clear();
444    }
445
446    /// Evict oldest fingerprint
447    fn evict_oldest(&self) {
448        let oldest = self
449            .stats
450            .iter()
451            .min_by_key(|r| r.last_seen.load(Ordering::Relaxed))
452            .map(|r| *r.key());
453
454        if let Some(hash) = oldest {
455            self.stats.remove(&hash);
456        }
457    }
458}
459
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Falls back to 0 if the system clock reports a time before the epoch.
fn now_nanos() -> u64 {
    use std::time::SystemTime;

    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    }
}
467
#[cfg(test)]
mod tests {
    use super::*;
    use crate::analytics::fingerprinter::QueryFingerprinter;

    /// Builder methods overwrite the defaults set by `new`.
    #[test]
    fn test_query_execution_builder() {
        let execution = QueryExecution::new("SELECT 1", Duration::from_millis(5))
            .with_rows(1)
            .with_user("alice")
            .with_database("test");

        assert_eq!(execution.rows, 1);
        assert_eq!(execution.user, "alice");
        assert_eq!(execution.database, "test");
    }

    /// Recording the same execution twice doubles calls and rows.
    #[test]
    fn test_query_statistics_record() {
        let sql = "SELECT * FROM users WHERE id = 1";
        let statistics = QueryStatistics::new(QueryFingerprinter::new().fingerprint(sql));
        let execution = QueryExecution::new(sql, Duration::from_millis(5)).with_rows(1);

        for _ in 0..2 {
            statistics.record(&execution);
        }

        assert_eq!(statistics.calls(), 2);
        assert_eq!(statistics.rows(), 2);
    }

    /// The store accumulates executions under the fingerprint hash.
    #[test]
    fn test_statistics_store() {
        let sql = "SELECT * FROM users WHERE id = 1";
        let store = StatisticsStore::new(100);
        let fingerprint = QueryFingerprinter::new().fingerprint(sql);
        let execution = QueryExecution::new(sql, Duration::from_millis(5));

        for _ in 0..2 {
            store.record(&fingerprint, &execution);
        }

        let snapshot = store.get(fingerprint.hash).unwrap();
        assert_eq!(snapshot.calls, 2);
    }

    /// `top` orders fingerprints by the requested metric, largest first.
    #[test]
    fn test_top_queries() {
        let store = StatisticsStore::new(100);
        let fingerprinter = QueryFingerprinter::new();

        // Query 1: 10 calls
        let users_fp = fingerprinter.fingerprint("SELECT * FROM users");
        for _ in 0..10 {
            let execution = QueryExecution::new("SELECT * FROM users", Duration::from_millis(1));
            store.record(&users_fp, &execution);
        }

        // Query 2: 5 calls
        let orders_fp = fingerprinter.fingerprint("SELECT * FROM orders");
        for _ in 0..5 {
            let execution = QueryExecution::new("SELECT * FROM orders", Duration::from_millis(1));
            store.record(&orders_fp, &execution);
        }

        let ranked = store.top(OrderBy::Calls, 10);
        assert_eq!(ranked.len(), 2);
        assert_eq!(ranked[0].calls, 10);
        assert_eq!(ranked[1].calls, 5);
    }
}