heliosdb_proxy/analytics/
statistics.rs1use std::sync::atomic::{AtomicU64, Ordering};
6use std::time::Duration;
7
8use dashmap::DashMap;
9
10use super::fingerprinter::{QueryFingerprint, OperationType};
11use super::histogram::LatencyHistogram;
12use super::OrderBy;
13
/// One observed query execution, together with the context it ran in.
///
/// Instances are built with [`QueryExecution::new`] and refined through the
/// chained `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct QueryExecution {
    /// Query text as supplied by the caller.
    pub query: String,

    /// Duration reported for this execution.
    pub duration: Duration,

    /// Row count associated with this execution (`0` unless set).
    pub rows: usize,

    /// Error message if the execution failed, `None` otherwise.
    pub error: Option<String>,

    /// User that issued the query (`"unknown"` unless set).
    pub user: String,

    /// Client address the query came from (`"unknown"` unless set).
    pub client_ip: String,

    /// Database the query ran against (`"default"` unless set).
    pub database: String,

    /// Node that served the query (`"primary"` unless set).
    pub node: String,

    /// Optional session identifier.
    pub session_id: Option<String>,

    /// Optional workflow identifier.
    pub workflow_id: Option<String>,

    /// Optional bound parameter values, rendered as strings.
    pub parameters: Option<Vec<String>>,
}

impl QueryExecution {
    /// Creates an execution record for `query` that took `duration`.
    ///
    /// All other fields start at their defaults: zero rows, no error,
    /// `"unknown"` user and client, the `"default"` database, the `"primary"`
    /// node, and no session, workflow or parameters.
    #[must_use]
    pub fn new(query: impl Into<String>, duration: Duration) -> Self {
        Self {
            query: query.into(),
            duration,
            rows: 0,
            error: None,
            user: "unknown".to_string(),
            client_ip: "unknown".to_string(),
            database: "default".to_string(),
            node: "primary".to_string(),
            session_id: None,
            workflow_id: None,
            parameters: None,
        }
    }

    /// Sets the row count.
    #[must_use]
    pub fn with_rows(mut self, rows: usize) -> Self {
        self.rows = rows;
        self
    }

    /// Marks the execution as failed with the given error message.
    #[must_use]
    pub fn with_error(mut self, error: impl Into<String>) -> Self {
        self.error = Some(error.into());
        self
    }

    /// Sets the user that issued the query.
    #[must_use]
    pub fn with_user(mut self, user: impl Into<String>) -> Self {
        self.user = user.into();
        self
    }

    /// Sets the client address.
    #[must_use]
    pub fn with_client_ip(mut self, ip: impl Into<String>) -> Self {
        self.client_ip = ip.into();
        self
    }

    /// Sets the database name.
    #[must_use]
    pub fn with_database(mut self, db: impl Into<String>) -> Self {
        self.database = db.into();
        self
    }

    /// Sets the node name.
    #[must_use]
    pub fn with_node(mut self, node: impl Into<String>) -> Self {
        self.node = node.into();
        self
    }

    /// Sets the session identifier.
    #[must_use]
    pub fn with_session(mut self, session: impl Into<String>) -> Self {
        self.session_id = Some(session.into());
        self
    }

    /// Sets the workflow identifier.
    #[must_use]
    pub fn with_workflow(mut self, workflow: impl Into<String>) -> Self {
        self.workflow_id = Some(workflow.into());
        self
    }

    /// Sets the bound parameter values.
    ///
    /// Accepts any iterable of string-like items, e.g. `vec!["1", "abc"]`.
    #[must_use]
    pub fn with_parameters<I, S>(mut self, parameters: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        self.parameters = Some(parameters.into_iter().map(Into::into).collect());
        self
    }

    /// Returns `true` when an error message has been attached.
    pub fn is_error(&self) -> bool {
        self.error.is_some()
    }
}
114
115pub struct QueryStatistics {
117 fingerprint: QueryFingerprint,
119
120 calls: AtomicU64,
122
123 total_time_us: AtomicU64,
125
126 min_time_us: AtomicU64,
128
129 max_time_us: AtomicU64,
131
132 rows: AtomicU64,
134
135 errors: AtomicU64,
137
138 histogram: LatencyHistogram,
140
141 first_seen: AtomicU64,
143
144 last_seen: AtomicU64,
146
147 users: DashMap<String, AtomicU64>,
149
150 clients: DashMap<String, AtomicU64>,
152
153 databases: DashMap<String, AtomicU64>,
155}
156
157impl QueryStatistics {
158 pub fn new(fingerprint: QueryFingerprint) -> Self {
160 let now = now_nanos();
161 Self {
162 fingerprint,
163 calls: AtomicU64::new(0),
164 total_time_us: AtomicU64::new(0),
165 min_time_us: AtomicU64::new(u64::MAX),
166 max_time_us: AtomicU64::new(0),
167 rows: AtomicU64::new(0),
168 errors: AtomicU64::new(0),
169 histogram: LatencyHistogram::new(),
170 first_seen: AtomicU64::new(now),
171 last_seen: AtomicU64::new(now),
172 users: DashMap::new(),
173 clients: DashMap::new(),
174 databases: DashMap::new(),
175 }
176 }
177
178 pub fn record(&self, execution: &QueryExecution) {
180 self.calls.fetch_add(1, Ordering::Relaxed);
181
182 let duration_us = execution.duration.as_micros() as u64;
183 self.total_time_us.fetch_add(duration_us, Ordering::Relaxed);
184 self.rows.fetch_add(execution.rows as u64, Ordering::Relaxed);
185
186 if execution.error.is_some() {
187 self.errors.fetch_add(1, Ordering::Relaxed);
188 }
189
190 self.update_min(duration_us);
192 self.update_max(duration_us);
193
194 self.histogram.record(execution.duration);
196
197 self.last_seen.store(now_nanos(), Ordering::Relaxed);
199
200 self.users
202 .entry(execution.user.clone())
203 .or_insert_with(|| AtomicU64::new(0))
204 .fetch_add(1, Ordering::Relaxed);
205
206 self.clients
208 .entry(execution.client_ip.clone())
209 .or_insert_with(|| AtomicU64::new(0))
210 .fetch_add(1, Ordering::Relaxed);
211
212 self.databases
214 .entry(execution.database.clone())
215 .or_insert_with(|| AtomicU64::new(0))
216 .fetch_add(1, Ordering::Relaxed);
217 }
218
219 fn update_min(&self, value: u64) {
220 let mut current = self.min_time_us.load(Ordering::Relaxed);
221 while value < current {
222 match self.min_time_us.compare_exchange_weak(
223 current,
224 value,
225 Ordering::SeqCst,
226 Ordering::Relaxed,
227 ) {
228 Ok(_) => break,
229 Err(c) => current = c,
230 }
231 }
232 }
233
234 fn update_max(&self, value: u64) {
235 let mut current = self.max_time_us.load(Ordering::Relaxed);
236 while value > current {
237 match self.max_time_us.compare_exchange_weak(
238 current,
239 value,
240 Ordering::SeqCst,
241 Ordering::Relaxed,
242 ) {
243 Ok(_) => break,
244 Err(c) => current = c,
245 }
246 }
247 }
248
249 pub fn fingerprint(&self) -> &QueryFingerprint {
251 &self.fingerprint
252 }
253
254 pub fn calls(&self) -> u64 {
256 self.calls.load(Ordering::Relaxed)
257 }
258
259 pub fn avg_time(&self) -> Duration {
261 let total = self.total_time_us.load(Ordering::Relaxed);
262 let calls = self.calls.load(Ordering::Relaxed);
263 Duration::from_micros(total / calls.max(1))
264 }
265
266 pub fn total_time(&self) -> Duration {
268 Duration::from_micros(self.total_time_us.load(Ordering::Relaxed))
269 }
270
271 pub fn min_time(&self) -> Duration {
273 let min = self.min_time_us.load(Ordering::Relaxed);
274 if min == u64::MAX {
275 Duration::ZERO
276 } else {
277 Duration::from_micros(min)
278 }
279 }
280
281 pub fn max_time(&self) -> Duration {
283 Duration::from_micros(self.max_time_us.load(Ordering::Relaxed))
284 }
285
286 pub fn rows(&self) -> u64 {
288 self.rows.load(Ordering::Relaxed)
289 }
290
291 pub fn errors(&self) -> u64 {
293 self.errors.load(Ordering::Relaxed)
294 }
295
296 pub fn p50(&self) -> Duration {
298 self.histogram.percentile(0.50)
299 }
300
301 pub fn p90(&self) -> Duration {
303 self.histogram.percentile(0.90)
304 }
305
306 pub fn p99(&self) -> Duration {
308 self.histogram.percentile(0.99)
309 }
310
311 pub fn error_rate(&self) -> f64 {
313 let calls = self.calls() as f64;
314 if calls == 0.0 {
315 return 0.0;
316 }
317 self.errors() as f64 / calls
318 }
319
320 pub fn to_stats(&self) -> QueryStats {
322 QueryStats {
323 fingerprint_hash: self.fingerprint.hash,
324 normalized: self.fingerprint.normalized.clone(),
325 tables: self.fingerprint.tables.clone(),
326 operation: self.fingerprint.operation,
327 calls: self.calls(),
328 total_time: self.total_time(),
329 avg_time: self.avg_time(),
330 min_time: self.min_time(),
331 max_time: self.max_time(),
332 rows: self.rows(),
333 errors: self.errors(),
334 error_rate: self.error_rate(),
335 p50: self.p50(),
336 p90: self.p90(),
337 p99: self.p99(),
338 first_seen_nanos: self.first_seen.load(Ordering::Relaxed),
339 last_seen_nanos: self.last_seen.load(Ordering::Relaxed),
340 }
341 }
342}
343
344#[derive(Debug, Clone)]
346pub struct QueryStats {
347 pub fingerprint_hash: u64,
348 pub normalized: String,
349 pub tables: Vec<String>,
350 pub operation: OperationType,
351 pub calls: u64,
352 pub total_time: Duration,
353 pub avg_time: Duration,
354 pub min_time: Duration,
355 pub max_time: Duration,
356 pub rows: u64,
357 pub errors: u64,
358 pub error_rate: f64,
359 pub p50: Duration,
360 pub p90: Duration,
361 pub p99: Duration,
362 pub first_seen_nanos: u64,
363 pub last_seen_nanos: u64,
364}
365
366impl QueryStats {
367 pub fn short_id(&self) -> String {
369 format!("{:016x}", self.fingerprint_hash)
370 }
371}
372
373pub struct StatisticsStore {
375 stats: DashMap<u64, QueryStatistics>,
377
378 max_fingerprints: usize,
380}
381
382impl StatisticsStore {
383 pub fn new(max_fingerprints: usize) -> Self {
385 Self {
386 stats: DashMap::new(),
387 max_fingerprints,
388 }
389 }
390
391 pub fn record(&self, fingerprint: &QueryFingerprint, execution: &QueryExecution) {
393 if !self.stats.contains_key(&fingerprint.hash)
396 && self.stats.len() >= self.max_fingerprints
397 {
398 self.evict_oldest();
399 }
400
401 let stats = self.stats.entry(fingerprint.hash)
402 .or_insert_with(|| QueryStatistics::new(fingerprint.clone()));
403
404 stats.record(execution);
405 }
406
407 pub fn get(&self, fingerprint_hash: u64) -> Option<QueryStats> {
409 self.stats.get(&fingerprint_hash).map(|s| s.to_stats())
410 }
411
412 pub fn top(&self, order_by: OrderBy, limit: usize) -> Vec<QueryStats> {
414 let mut all: Vec<_> = self.stats.iter().map(|r| r.to_stats()).collect();
415
416 match order_by {
417 OrderBy::TotalTime => all.sort_by(|a, b| b.total_time.cmp(&a.total_time)),
418 OrderBy::AvgTime => all.sort_by(|a, b| b.avg_time.cmp(&a.avg_time)),
419 OrderBy::Calls => all.sort_by(|a, b| b.calls.cmp(&a.calls)),
420 OrderBy::Errors => all.sort_by(|a, b| b.errors.cmp(&a.errors)),
421 OrderBy::P99Time => all.sort_by(|a, b| b.p99.cmp(&a.p99)),
422 OrderBy::Rows => all.sort_by(|a, b| b.rows.cmp(&a.rows)),
423 }
424
425 all.truncate(limit);
426 all
427 }
428
429 pub fn all(&self) -> Vec<QueryStats> {
431 self.stats.iter().map(|r| r.to_stats()).collect()
432 }
433
434 pub fn count(&self) -> usize {
436 self.stats.len()
437 }
438
439 pub fn reset(&self) {
441 self.stats.clear();
442 }
443
444 fn evict_oldest(&self) {
446 let oldest = self
447 .stats
448 .iter()
449 .min_by_key(|r| r.last_seen.load(Ordering::Relaxed))
450 .map(|r| *r.key());
451
452 if let Some(hash) = oldest {
453 self.stats.remove(&hash);
454 }
455 }
456}
457
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch, and
/// saturates at `u64::MAX` instead of wrapping if the count no longer fits.
fn now_nanos() -> u64 {
    use std::time::SystemTime;
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        // `as_nanos` is a u128; clamp before narrowing so the cast cannot wrap.
        .map(|elapsed| elapsed.as_nanos().min(u128::from(u64::MAX)) as u64)
        .unwrap_or(0)
}
465
#[cfg(test)]
mod tests {
    use super::*;
    use crate::analytics::fingerprinter::QueryFingerprinter;

    /// Builder methods overwrite the corresponding fields.
    #[test]
    fn test_query_execution_builder() {
        let built = QueryExecution::new("SELECT 1", Duration::from_millis(5))
            .with_rows(1)
            .with_user("alice")
            .with_database("test");

        assert_eq!(built.rows, 1);
        assert_eq!(built.user, "alice");
        assert_eq!(built.database, "test");
    }

    /// Recording the same execution twice doubles the call and row counters.
    #[test]
    fn test_query_statistics_record() {
        let sql = "SELECT * FROM users WHERE id = 1";
        let fingerprinter = QueryFingerprinter::new();
        let stats = QueryStatistics::new(fingerprinter.fingerprint(sql));

        let exec = QueryExecution::new(sql, Duration::from_millis(5)).with_rows(1);
        for _ in 0..2 {
            stats.record(&exec);
        }

        assert_eq!(stats.calls(), 2);
        assert_eq!(stats.rows(), 2);
    }

    /// The store aggregates repeated executions under one fingerprint hash.
    #[test]
    fn test_statistics_store() {
        let sql = "SELECT * FROM users WHERE id = 1";
        let store = StatisticsStore::new(100);
        let fingerprinter = QueryFingerprinter::new();

        let fingerprint = fingerprinter.fingerprint(sql);
        let exec = QueryExecution::new(sql, Duration::from_millis(5));

        for _ in 0..2 {
            store.record(&fingerprint, &exec);
        }

        let snapshot = store.get(fingerprint.hash).unwrap();
        assert_eq!(snapshot.calls, 2);
    }

    /// `top` orders fingerprints by call count, highest first.
    #[test]
    fn test_top_queries() {
        let store = StatisticsStore::new(100);
        let fingerprinter = QueryFingerprinter::new();

        let users_fp = fingerprinter.fingerprint("SELECT * FROM users");
        for _ in 0..10 {
            let exec = QueryExecution::new("SELECT * FROM users", Duration::from_millis(1));
            store.record(&users_fp, &exec);
        }

        let orders_fp = fingerprinter.fingerprint("SELECT * FROM orders");
        for _ in 0..5 {
            let exec = QueryExecution::new("SELECT * FROM orders", Duration::from_millis(1));
            store.record(&orders_fp, &exec);
        }

        let ranked = store.top(OrderBy::Calls, 10);
        assert_eq!(ranked.len(), 2);
        assert_eq!(ranked[0].calls, 10);
        assert_eq!(ranked[1].calls, 5);
    }
}