// heliosdb_proxy/analytics/statistics.rs

//! Query Statistics
//!
//! Track execution statistics per query fingerprint.

5use std::sync::atomic::{AtomicU64, Ordering};
6use std::time::Duration;
7
8use dashmap::DashMap;
9
10use super::fingerprinter::{QueryFingerprint, OperationType};
11use super::histogram::LatencyHistogram;
12use super::OrderBy;
13
/// Query execution record
///
/// Describes one observed run of a query: its text, how long it took, how
/// many rows it touched, whether it failed, and who/where it came from.
/// Create one with [`QueryExecution::new`] and refine it with the chained
/// `with_*` setters; all fields are also public.
#[derive(Debug, Clone)]
pub struct QueryExecution {
    /// Query text
    pub query: String,

    /// Execution duration
    pub duration: Duration,

    /// Rows returned/affected
    pub rows: usize,

    /// Error message (if failed)
    pub error: Option<String>,

    /// User who executed the query
    pub user: String,

    /// Client IP address
    pub client_ip: String,

    /// Database name
    pub database: String,

    /// Node that executed the query
    pub node: String,

    /// Session ID (for pattern detection)
    pub session_id: Option<String>,

    /// Workflow ID (for tracing)
    pub workflow_id: Option<String>,

    /// Query parameters (if tracking enabled)
    pub parameters: Option<Vec<String>>,
}

impl QueryExecution {
    /// Create a new execution record
    ///
    /// Only the query text and duration are required. Everything else starts
    /// at a placeholder: zero rows, no error, user and client IP `"unknown"`,
    /// database `"default"`, node `"primary"`, and no session, workflow or
    /// parameters.
    pub fn new(query: impl Into<String>, duration: Duration) -> Self {
        Self {
            query: query.into(),
            duration,
            rows: 0,
            error: None,
            user: "unknown".to_string(),
            client_ip: "unknown".to_string(),
            database: "default".to_string(),
            node: "primary".to_string(),
            session_id: None,
            workflow_id: None,
            parameters: None,
        }
    }

    /// Set the number of rows returned/affected
    #[must_use]
    pub fn with_rows(mut self, rows: usize) -> Self {
        self.rows = rows;
        self
    }

    /// Attach an error message, which makes [`is_error`](Self::is_error) return `true`
    #[must_use]
    pub fn with_error(mut self, error: impl Into<String>) -> Self {
        self.error = Some(error.into());
        self
    }

    /// Set the user who executed the query
    #[must_use]
    pub fn with_user(mut self, user: impl Into<String>) -> Self {
        self.user = user.into();
        self
    }

    /// Set the client IP address
    #[must_use]
    pub fn with_client_ip(mut self, ip: impl Into<String>) -> Self {
        self.client_ip = ip.into();
        self
    }

    /// Set the database name
    #[must_use]
    pub fn with_database(mut self, db: impl Into<String>) -> Self {
        self.database = db.into();
        self
    }

    /// Set the node that executed the query
    #[must_use]
    pub fn with_node(mut self, node: impl Into<String>) -> Self {
        self.node = node.into();
        self
    }

    /// Set the session ID
    #[must_use]
    pub fn with_session(mut self, session: impl Into<String>) -> Self {
        self.session_id = Some(session.into());
        self
    }

    /// Set the workflow ID
    #[must_use]
    pub fn with_workflow(mut self, workflow: impl Into<String>) -> Self {
        self.workflow_id = Some(workflow.into());
        self
    }

    /// Set the query parameters
    ///
    /// Accepts anything iterable whose items convert into `String`; the
    /// values are stored in iteration order.
    #[must_use]
    pub fn with_parameters<I, S>(mut self, parameters: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        self.parameters = Some(parameters.into_iter().map(Into::into).collect());
        self
    }

    /// Check if this execution failed
    pub fn is_error(&self) -> bool {
        self.error.is_some()
    }
}
114
115/// Statistics for a single query fingerprint
116pub struct QueryStatistics {
117    /// Query fingerprint
118    fingerprint: QueryFingerprint,
119
120    /// Call count
121    calls: AtomicU64,
122
123    /// Total execution time (microseconds)
124    total_time_us: AtomicU64,
125
126    /// Minimum execution time (microseconds)
127    min_time_us: AtomicU64,
128
129    /// Maximum execution time (microseconds)
130    max_time_us: AtomicU64,
131
132    /// Total rows returned
133    rows: AtomicU64,
134
135    /// Error count
136    errors: AtomicU64,
137
138    /// Latency histogram
139    histogram: LatencyHistogram,
140
141    /// First seen timestamp (nanos since epoch)
142    first_seen: AtomicU64,
143
144    /// Last seen timestamp (nanos since epoch)
145    last_seen: AtomicU64,
146
147    /// Per-user call counts
148    users: DashMap<String, AtomicU64>,
149
150    /// Per-client call counts
151    clients: DashMap<String, AtomicU64>,
152
153    /// Per-database call counts
154    databases: DashMap<String, AtomicU64>,
155}
156
157impl QueryStatistics {
158    /// Create new statistics for a fingerprint
159    pub fn new(fingerprint: QueryFingerprint) -> Self {
160        let now = now_nanos();
161        Self {
162            fingerprint,
163            calls: AtomicU64::new(0),
164            total_time_us: AtomicU64::new(0),
165            min_time_us: AtomicU64::new(u64::MAX),
166            max_time_us: AtomicU64::new(0),
167            rows: AtomicU64::new(0),
168            errors: AtomicU64::new(0),
169            histogram: LatencyHistogram::new(),
170            first_seen: AtomicU64::new(now),
171            last_seen: AtomicU64::new(now),
172            users: DashMap::new(),
173            clients: DashMap::new(),
174            databases: DashMap::new(),
175        }
176    }
177
178    /// Record an execution
179    pub fn record(&self, execution: &QueryExecution) {
180        self.calls.fetch_add(1, Ordering::Relaxed);
181
182        let duration_us = execution.duration.as_micros() as u64;
183        self.total_time_us.fetch_add(duration_us, Ordering::Relaxed);
184        self.rows.fetch_add(execution.rows as u64, Ordering::Relaxed);
185
186        if execution.error.is_some() {
187            self.errors.fetch_add(1, Ordering::Relaxed);
188        }
189
190        // Update min/max
191        self.update_min(duration_us);
192        self.update_max(duration_us);
193
194        // Record in histogram
195        self.histogram.record(execution.duration);
196
197        // Update last seen
198        self.last_seen.store(now_nanos(), Ordering::Relaxed);
199
200        // User attribution
201        self.users
202            .entry(execution.user.clone())
203            .or_insert_with(|| AtomicU64::new(0))
204            .fetch_add(1, Ordering::Relaxed);
205
206        // Client attribution
207        self.clients
208            .entry(execution.client_ip.clone())
209            .or_insert_with(|| AtomicU64::new(0))
210            .fetch_add(1, Ordering::Relaxed);
211
212        // Database attribution
213        self.databases
214            .entry(execution.database.clone())
215            .or_insert_with(|| AtomicU64::new(0))
216            .fetch_add(1, Ordering::Relaxed);
217    }
218
219    fn update_min(&self, value: u64) {
220        let mut current = self.min_time_us.load(Ordering::Relaxed);
221        while value < current {
222            match self.min_time_us.compare_exchange_weak(
223                current,
224                value,
225                Ordering::SeqCst,
226                Ordering::Relaxed,
227            ) {
228                Ok(_) => break,
229                Err(c) => current = c,
230            }
231        }
232    }
233
234    fn update_max(&self, value: u64) {
235        let mut current = self.max_time_us.load(Ordering::Relaxed);
236        while value > current {
237            match self.max_time_us.compare_exchange_weak(
238                current,
239                value,
240                Ordering::SeqCst,
241                Ordering::Relaxed,
242            ) {
243                Ok(_) => break,
244                Err(c) => current = c,
245            }
246        }
247    }
248
249    /// Get fingerprint
250    pub fn fingerprint(&self) -> &QueryFingerprint {
251        &self.fingerprint
252    }
253
254    /// Get call count
255    pub fn calls(&self) -> u64 {
256        self.calls.load(Ordering::Relaxed)
257    }
258
259    /// Get average execution time
260    pub fn avg_time(&self) -> Duration {
261        let total = self.total_time_us.load(Ordering::Relaxed);
262        let calls = self.calls.load(Ordering::Relaxed);
263        Duration::from_micros(total / calls.max(1))
264    }
265
266    /// Get total execution time
267    pub fn total_time(&self) -> Duration {
268        Duration::from_micros(self.total_time_us.load(Ordering::Relaxed))
269    }
270
271    /// Get min execution time
272    pub fn min_time(&self) -> Duration {
273        let min = self.min_time_us.load(Ordering::Relaxed);
274        if min == u64::MAX {
275            Duration::ZERO
276        } else {
277            Duration::from_micros(min)
278        }
279    }
280
281    /// Get max execution time
282    pub fn max_time(&self) -> Duration {
283        Duration::from_micros(self.max_time_us.load(Ordering::Relaxed))
284    }
285
286    /// Get total rows
287    pub fn rows(&self) -> u64 {
288        self.rows.load(Ordering::Relaxed)
289    }
290
291    /// Get error count
292    pub fn errors(&self) -> u64 {
293        self.errors.load(Ordering::Relaxed)
294    }
295
296    /// Get P50 latency
297    pub fn p50(&self) -> Duration {
298        self.histogram.percentile(0.50)
299    }
300
301    /// Get P90 latency
302    pub fn p90(&self) -> Duration {
303        self.histogram.percentile(0.90)
304    }
305
306    /// Get P99 latency
307    pub fn p99(&self) -> Duration {
308        self.histogram.percentile(0.99)
309    }
310
311    /// Get error rate
312    pub fn error_rate(&self) -> f64 {
313        let calls = self.calls() as f64;
314        if calls == 0.0 {
315            return 0.0;
316        }
317        self.errors() as f64 / calls
318    }
319
320    /// Convert to QueryStats
321    pub fn to_stats(&self) -> QueryStats {
322        QueryStats {
323            fingerprint_hash: self.fingerprint.hash,
324            normalized: self.fingerprint.normalized.clone(),
325            tables: self.fingerprint.tables.clone(),
326            operation: self.fingerprint.operation,
327            calls: self.calls(),
328            total_time: self.total_time(),
329            avg_time: self.avg_time(),
330            min_time: self.min_time(),
331            max_time: self.max_time(),
332            rows: self.rows(),
333            errors: self.errors(),
334            error_rate: self.error_rate(),
335            p50: self.p50(),
336            p90: self.p90(),
337            p99: self.p99(),
338            first_seen_nanos: self.first_seen.load(Ordering::Relaxed),
339            last_seen_nanos: self.last_seen.load(Ordering::Relaxed),
340        }
341    }
342}
343
344/// Query stats (snapshot of statistics)
345#[derive(Debug, Clone)]
346pub struct QueryStats {
347    pub fingerprint_hash: u64,
348    pub normalized: String,
349    pub tables: Vec<String>,
350    pub operation: OperationType,
351    pub calls: u64,
352    pub total_time: Duration,
353    pub avg_time: Duration,
354    pub min_time: Duration,
355    pub max_time: Duration,
356    pub rows: u64,
357    pub errors: u64,
358    pub error_rate: f64,
359    pub p50: Duration,
360    pub p90: Duration,
361    pub p99: Duration,
362    pub first_seen_nanos: u64,
363    pub last_seen_nanos: u64,
364}
365
366impl QueryStats {
367    /// Get fingerprint short ID
368    pub fn short_id(&self) -> String {
369        format!("{:016x}", self.fingerprint_hash)
370    }
371}
372
373/// Statistics store (all fingerprints)
374pub struct StatisticsStore {
375    /// Statistics by fingerprint hash
376    stats: DashMap<u64, QueryStatistics>,
377
378    /// Maximum fingerprints to track
379    max_fingerprints: usize,
380}
381
382impl StatisticsStore {
383    /// Create new statistics store
384    pub fn new(max_fingerprints: usize) -> Self {
385        Self {
386            stats: DashMap::new(),
387            max_fingerprints,
388        }
389    }
390
391    /// Record execution for a fingerprint
392    pub fn record(&self, fingerprint: &QueryFingerprint, execution: &QueryExecution) {
393        // Enforce max fingerprints before entering the entry API
394        // (reading len() inside or_insert_with would deadlock on DashMap)
395        if !self.stats.contains_key(&fingerprint.hash)
396            && self.stats.len() >= self.max_fingerprints
397        {
398            self.evict_oldest();
399        }
400
401        let stats = self.stats.entry(fingerprint.hash)
402            .or_insert_with(|| QueryStatistics::new(fingerprint.clone()));
403
404        stats.record(execution);
405    }
406
407    /// Get statistics for a fingerprint
408    pub fn get(&self, fingerprint_hash: u64) -> Option<QueryStats> {
409        self.stats.get(&fingerprint_hash).map(|s| s.to_stats())
410    }
411
412    /// Get top queries by metric
413    pub fn top(&self, order_by: OrderBy, limit: usize) -> Vec<QueryStats> {
414        let mut all: Vec<_> = self.stats.iter().map(|r| r.to_stats()).collect();
415
416        match order_by {
417            OrderBy::TotalTime => all.sort_by(|a, b| b.total_time.cmp(&a.total_time)),
418            OrderBy::AvgTime => all.sort_by(|a, b| b.avg_time.cmp(&a.avg_time)),
419            OrderBy::Calls => all.sort_by(|a, b| b.calls.cmp(&a.calls)),
420            OrderBy::Errors => all.sort_by(|a, b| b.errors.cmp(&a.errors)),
421            OrderBy::P99Time => all.sort_by(|a, b| b.p99.cmp(&a.p99)),
422            OrderBy::Rows => all.sort_by(|a, b| b.rows.cmp(&a.rows)),
423        }
424
425        all.truncate(limit);
426        all
427    }
428
429    /// Get all statistics
430    pub fn all(&self) -> Vec<QueryStats> {
431        self.stats.iter().map(|r| r.to_stats()).collect()
432    }
433
434    /// Get count of tracked fingerprints
435    pub fn count(&self) -> usize {
436        self.stats.len()
437    }
438
439    /// Reset all statistics
440    pub fn reset(&self) {
441        self.stats.clear();
442    }
443
444    /// Evict oldest fingerprint
445    fn evict_oldest(&self) {
446        let oldest = self
447            .stats
448            .iter()
449            .min_by_key(|r| r.last_seen.load(Ordering::Relaxed))
450            .map(|r| *r.key());
451
452        if let Some(hash) = oldest {
453            self.stats.remove(&hash);
454        }
455    }
456}
457
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch, and
/// saturates at `u64::MAX` instead of wrapping if the nanosecond count no
/// longer fits in 64 bits.
fn now_nanos() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_nanos` is u128; clamp before narrowing so the cast cannot wrap.
        Ok(elapsed) => elapsed.as_nanos().min(u128::from(u64::MAX)) as u64,
        Err(_) => 0,
    }
}
465
#[cfg(test)]
mod tests {
    use super::*;
    use crate::analytics::fingerprinter::QueryFingerprinter;

    /// Query text shared by the single-fingerprint tests.
    const USERS_BY_ID: &str = "SELECT * FROM users WHERE id = 1";

    /// Chained setters land in the matching public fields.
    #[test]
    fn test_query_execution_builder() {
        let built = QueryExecution::new("SELECT 1", Duration::from_millis(5))
            .with_rows(1)
            .with_user("alice")
            .with_database("test");

        assert_eq!(built.rows, 1);
        assert_eq!(built.user, "alice");
        assert_eq!(built.database, "test");
    }

    /// Recording the same execution twice doubles calls and rows.
    #[test]
    fn test_query_statistics_record() {
        let fingerprint = QueryFingerprinter::new().fingerprint(USERS_BY_ID);
        let stats = QueryStatistics::new(fingerprint);

        let exec = QueryExecution::new(USERS_BY_ID, Duration::from_millis(5)).with_rows(1);
        for _ in 0..2 {
            stats.record(&exec);
        }

        assert_eq!(stats.calls(), 2);
        assert_eq!(stats.rows(), 2);
    }

    /// The store accumulates repeated recordings under one fingerprint hash.
    #[test]
    fn test_statistics_store() {
        let store = StatisticsStore::new(100);
        let fingerprint = QueryFingerprinter::new().fingerprint(USERS_BY_ID);
        let exec = QueryExecution::new(USERS_BY_ID, Duration::from_millis(5));

        for _ in 0..2 {
            store.record(&fingerprint, &exec);
        }

        let snapshot = store.get(fingerprint.hash).unwrap();
        assert_eq!(snapshot.calls, 2);
    }

    /// `top(OrderBy::Calls, ..)` lists the most-called fingerprint first.
    #[test]
    fn test_top_queries() {
        let store = StatisticsStore::new(100);
        let fingerprinter = QueryFingerprinter::new();

        // (query text, number of calls): users first with 10, then orders with 5.
        let workload = [("SELECT * FROM users", 10), ("SELECT * FROM orders", 5)];
        for &(sql, repeats) in workload.iter() {
            let fingerprint = fingerprinter.fingerprint(sql);
            for _ in 0..repeats {
                let exec = QueryExecution::new(sql, Duration::from_millis(1));
                store.record(&fingerprint, &exec);
            }
        }

        let top = store.top(OrderBy::Calls, 10);
        assert_eq!(top.len(), 2);
        assert_eq!(top[0].calls, 10);
        assert_eq!(top[1].calls, 5);
    }
}
543}