heliosdb_proxy/analytics/
statistics.rs1use std::sync::atomic::{AtomicU64, Ordering};
6use std::time::Duration;
7
8use dashmap::DashMap;
9
10use super::fingerprinter::{OperationType, QueryFingerprint};
11use super::histogram::LatencyHistogram;
12use super::OrderBy;
13
/// One observed query execution, as fed into [`QueryStatistics::record`] and
/// [`StatisticsStore::record`].
///
/// Build it with [`QueryExecution::new`] and the `with_*` methods; anything not
/// set explicitly keeps the default documented on `new`. Only `duration`, `rows`,
/// `error`, `user`, `client_ip` and `database` are read by
/// [`QueryStatistics::record`]; the remaining fields are not consumed there.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueryExecution {
    /// Query text for this execution.
    pub query: String,
    /// How long the execution took.
    pub duration: Duration,
    /// Row count reported for the execution (defaults to `0`).
    pub rows: usize,
    /// Error message if the execution failed; `None` on success.
    pub error: Option<String>,
    /// User that ran the query (defaults to `"unknown"`).
    pub user: String,
    /// Client address (defaults to `"unknown"`).
    pub client_ip: String,
    /// Target database (defaults to `"default"`).
    pub database: String,
    /// Node the query was routed to (defaults to `"primary"`).
    pub node: String,
    /// Optional session identifier.
    pub session_id: Option<String>,
    /// Optional workflow identifier.
    pub workflow_id: Option<String>,
    /// Optional query parameter values, rendered as strings.
    pub parameters: Option<Vec<String>>,
}

impl QueryExecution {
    /// Creates an execution record for `query` that took `duration`.
    ///
    /// Defaults: `rows = 0`, no error, `user = "unknown"`,
    /// `client_ip = "unknown"`, `database = "default"`, `node = "primary"`,
    /// and no session, workflow or parameters.
    pub fn new(query: impl Into<String>, duration: Duration) -> Self {
        Self {
            query: query.into(),
            duration,
            rows: 0,
            error: None,
            user: "unknown".to_string(),
            client_ip: "unknown".to_string(),
            database: "default".to_string(),
            node: "primary".to_string(),
            session_id: None,
            workflow_id: None,
            parameters: None,
        }
    }

    /// Sets the row count.
    #[must_use]
    pub fn with_rows(mut self, rows: usize) -> Self {
        self.rows = rows;
        self
    }

    /// Marks the execution as failed with the given error message.
    #[must_use]
    pub fn with_error(mut self, error: impl Into<String>) -> Self {
        self.error = Some(error.into());
        self
    }

    /// Sets the user that ran the query.
    #[must_use]
    pub fn with_user(mut self, user: impl Into<String>) -> Self {
        self.user = user.into();
        self
    }

    /// Sets the client address.
    #[must_use]
    pub fn with_client_ip(mut self, ip: impl Into<String>) -> Self {
        self.client_ip = ip.into();
        self
    }

    /// Sets the target database.
    #[must_use]
    pub fn with_database(mut self, db: impl Into<String>) -> Self {
        self.database = db.into();
        self
    }

    /// Sets the node the query was routed to.
    #[must_use]
    pub fn with_node(mut self, node: impl Into<String>) -> Self {
        self.node = node.into();
        self
    }

    /// Attaches a session identifier.
    #[must_use]
    pub fn with_session(mut self, session: impl Into<String>) -> Self {
        self.session_id = Some(session.into());
        self
    }

    /// Attaches a workflow identifier.
    #[must_use]
    pub fn with_workflow(mut self, workflow: impl Into<String>) -> Self {
        self.workflow_id = Some(workflow.into());
        self
    }

    /// Attaches parameter values. This is the builder counterpart of the public
    /// `parameters` field, which previously had no `with_*` method.
    #[must_use]
    pub fn with_parameters<I, S>(mut self, parameters: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        self.parameters = Some(parameters.into_iter().map(Into::into).collect());
        self
    }

    /// Returns `true` if an error message was attached to this execution.
    pub fn is_error(&self) -> bool {
        self.error.is_some()
    }
}
114
115pub struct QueryStatistics {
117 fingerprint: QueryFingerprint,
119
120 calls: AtomicU64,
122
123 total_time_us: AtomicU64,
125
126 min_time_us: AtomicU64,
128
129 max_time_us: AtomicU64,
131
132 rows: AtomicU64,
134
135 errors: AtomicU64,
137
138 histogram: LatencyHistogram,
140
141 first_seen: AtomicU64,
143
144 last_seen: AtomicU64,
146
147 users: DashMap<String, AtomicU64>,
149
150 clients: DashMap<String, AtomicU64>,
152
153 databases: DashMap<String, AtomicU64>,
155}
156
157impl QueryStatistics {
158 pub fn new(fingerprint: QueryFingerprint) -> Self {
160 let now = now_nanos();
161 Self {
162 fingerprint,
163 calls: AtomicU64::new(0),
164 total_time_us: AtomicU64::new(0),
165 min_time_us: AtomicU64::new(u64::MAX),
166 max_time_us: AtomicU64::new(0),
167 rows: AtomicU64::new(0),
168 errors: AtomicU64::new(0),
169 histogram: LatencyHistogram::new(),
170 first_seen: AtomicU64::new(now),
171 last_seen: AtomicU64::new(now),
172 users: DashMap::new(),
173 clients: DashMap::new(),
174 databases: DashMap::new(),
175 }
176 }
177
178 pub fn record(&self, execution: &QueryExecution) {
180 self.calls.fetch_add(1, Ordering::Relaxed);
181
182 let duration_us = execution.duration.as_micros() as u64;
183 self.total_time_us.fetch_add(duration_us, Ordering::Relaxed);
184 self.rows
185 .fetch_add(execution.rows as u64, Ordering::Relaxed);
186
187 if execution.error.is_some() {
188 self.errors.fetch_add(1, Ordering::Relaxed);
189 }
190
191 self.update_min(duration_us);
193 self.update_max(duration_us);
194
195 self.histogram.record(execution.duration);
197
198 self.last_seen.store(now_nanos(), Ordering::Relaxed);
200
201 self.users
203 .entry(execution.user.clone())
204 .or_insert_with(|| AtomicU64::new(0))
205 .fetch_add(1, Ordering::Relaxed);
206
207 self.clients
209 .entry(execution.client_ip.clone())
210 .or_insert_with(|| AtomicU64::new(0))
211 .fetch_add(1, Ordering::Relaxed);
212
213 self.databases
215 .entry(execution.database.clone())
216 .or_insert_with(|| AtomicU64::new(0))
217 .fetch_add(1, Ordering::Relaxed);
218 }
219
220 fn update_min(&self, value: u64) {
221 let mut current = self.min_time_us.load(Ordering::Relaxed);
222 while value < current {
223 match self.min_time_us.compare_exchange_weak(
224 current,
225 value,
226 Ordering::SeqCst,
227 Ordering::Relaxed,
228 ) {
229 Ok(_) => break,
230 Err(c) => current = c,
231 }
232 }
233 }
234
235 fn update_max(&self, value: u64) {
236 let mut current = self.max_time_us.load(Ordering::Relaxed);
237 while value > current {
238 match self.max_time_us.compare_exchange_weak(
239 current,
240 value,
241 Ordering::SeqCst,
242 Ordering::Relaxed,
243 ) {
244 Ok(_) => break,
245 Err(c) => current = c,
246 }
247 }
248 }
249
250 pub fn fingerprint(&self) -> &QueryFingerprint {
252 &self.fingerprint
253 }
254
255 pub fn calls(&self) -> u64 {
257 self.calls.load(Ordering::Relaxed)
258 }
259
260 pub fn avg_time(&self) -> Duration {
262 let total = self.total_time_us.load(Ordering::Relaxed);
263 let calls = self.calls.load(Ordering::Relaxed);
264 Duration::from_micros(total / calls.max(1))
265 }
266
267 pub fn total_time(&self) -> Duration {
269 Duration::from_micros(self.total_time_us.load(Ordering::Relaxed))
270 }
271
272 pub fn min_time(&self) -> Duration {
274 let min = self.min_time_us.load(Ordering::Relaxed);
275 if min == u64::MAX {
276 Duration::ZERO
277 } else {
278 Duration::from_micros(min)
279 }
280 }
281
282 pub fn max_time(&self) -> Duration {
284 Duration::from_micros(self.max_time_us.load(Ordering::Relaxed))
285 }
286
287 pub fn rows(&self) -> u64 {
289 self.rows.load(Ordering::Relaxed)
290 }
291
292 pub fn errors(&self) -> u64 {
294 self.errors.load(Ordering::Relaxed)
295 }
296
297 pub fn p50(&self) -> Duration {
299 self.histogram.percentile(0.50)
300 }
301
302 pub fn p90(&self) -> Duration {
304 self.histogram.percentile(0.90)
305 }
306
307 pub fn p99(&self) -> Duration {
309 self.histogram.percentile(0.99)
310 }
311
312 pub fn error_rate(&self) -> f64 {
314 let calls = self.calls() as f64;
315 if calls == 0.0 {
316 return 0.0;
317 }
318 self.errors() as f64 / calls
319 }
320
321 pub fn to_stats(&self) -> QueryStats {
323 QueryStats {
324 fingerprint_hash: self.fingerprint.hash,
325 normalized: self.fingerprint.normalized.clone(),
326 tables: self.fingerprint.tables.clone(),
327 operation: self.fingerprint.operation,
328 calls: self.calls(),
329 total_time: self.total_time(),
330 avg_time: self.avg_time(),
331 min_time: self.min_time(),
332 max_time: self.max_time(),
333 rows: self.rows(),
334 errors: self.errors(),
335 error_rate: self.error_rate(),
336 p50: self.p50(),
337 p90: self.p90(),
338 p99: self.p99(),
339 first_seen_nanos: self.first_seen.load(Ordering::Relaxed),
340 last_seen_nanos: self.last_seen.load(Ordering::Relaxed),
341 }
342 }
343}
344
345#[derive(Debug, Clone)]
347pub struct QueryStats {
348 pub fingerprint_hash: u64,
349 pub normalized: String,
350 pub tables: Vec<String>,
351 pub operation: OperationType,
352 pub calls: u64,
353 pub total_time: Duration,
354 pub avg_time: Duration,
355 pub min_time: Duration,
356 pub max_time: Duration,
357 pub rows: u64,
358 pub errors: u64,
359 pub error_rate: f64,
360 pub p50: Duration,
361 pub p90: Duration,
362 pub p99: Duration,
363 pub first_seen_nanos: u64,
364 pub last_seen_nanos: u64,
365}
366
367impl QueryStats {
368 pub fn short_id(&self) -> String {
370 format!("{:016x}", self.fingerprint_hash)
371 }
372}
373
374pub struct StatisticsStore {
376 stats: DashMap<u64, QueryStatistics>,
378
379 max_fingerprints: usize,
381}
382
383impl StatisticsStore {
384 pub fn new(max_fingerprints: usize) -> Self {
386 Self {
387 stats: DashMap::new(),
388 max_fingerprints,
389 }
390 }
391
392 pub fn record(&self, fingerprint: &QueryFingerprint, execution: &QueryExecution) {
394 if !self.stats.contains_key(&fingerprint.hash) && self.stats.len() >= self.max_fingerprints
397 {
398 self.evict_oldest();
399 }
400
401 let stats = self
402 .stats
403 .entry(fingerprint.hash)
404 .or_insert_with(|| QueryStatistics::new(fingerprint.clone()));
405
406 stats.record(execution);
407 }
408
409 pub fn get(&self, fingerprint_hash: u64) -> Option<QueryStats> {
411 self.stats.get(&fingerprint_hash).map(|s| s.to_stats())
412 }
413
414 pub fn top(&self, order_by: OrderBy, limit: usize) -> Vec<QueryStats> {
416 let mut all: Vec<_> = self.stats.iter().map(|r| r.to_stats()).collect();
417
418 match order_by {
419 OrderBy::TotalTime => all.sort_by_key(|b| std::cmp::Reverse(b.total_time)),
420 OrderBy::AvgTime => all.sort_by_key(|b| std::cmp::Reverse(b.avg_time)),
421 OrderBy::Calls => all.sort_by_key(|b| std::cmp::Reverse(b.calls)),
422 OrderBy::Errors => all.sort_by_key(|b| std::cmp::Reverse(b.errors)),
423 OrderBy::P99Time => all.sort_by_key(|b| std::cmp::Reverse(b.p99)),
424 OrderBy::Rows => all.sort_by_key(|b| std::cmp::Reverse(b.rows)),
425 }
426
427 all.truncate(limit);
428 all
429 }
430
431 pub fn all(&self) -> Vec<QueryStats> {
433 self.stats.iter().map(|r| r.to_stats()).collect()
434 }
435
436 pub fn count(&self) -> usize {
438 self.stats.len()
439 }
440
441 pub fn reset(&self) {
443 self.stats.clear();
444 }
445
446 fn evict_oldest(&self) {
448 let oldest = self
449 .stats
450 .iter()
451 .min_by_key(|r| r.last_seen.load(Ordering::Relaxed))
452 .map(|r| *r.key());
453
454 if let Some(hash) = oldest {
455 self.stats.remove(&hash);
456 }
457 }
458}
459
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Returns `0` if the system clock reads earlier than the epoch. It saturates
/// at `u64::MAX` rather than silently truncating the `u128` nanosecond count,
/// which would only matter after the year 2554.
fn now_nanos() -> u64 {
    use std::time::SystemTime;
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        // The clamp makes the narrowing cast lossless.
        .map(|elapsed| elapsed.as_nanos().min(u128::from(u64::MAX)) as u64)
        .unwrap_or(0)
}
467
#[cfg(test)]
mod tests {
    use super::*;
    use crate::analytics::fingerprinter::QueryFingerprinter;

    #[test]
    fn test_query_execution_builder() {
        let exec = QueryExecution::new("SELECT 1", Duration::from_millis(5))
            .with_rows(1)
            .with_user("alice")
            .with_database("test");

        assert_eq!(exec.rows, 1);
        assert_eq!(exec.user, "alice");
        assert_eq!(exec.database, "test");
    }

    #[test]
    fn test_query_statistics_record() {
        let fp = QueryFingerprinter::new();
        let fingerprint = fp.fingerprint("SELECT * FROM users WHERE id = 1");
        let stats = QueryStatistics::new(fingerprint);

        let exec =
            QueryExecution::new("SELECT * FROM users WHERE id = 1", Duration::from_millis(5))
                .with_rows(1);

        stats.record(&exec);
        stats.record(&exec);

        assert_eq!(stats.calls(), 2);
        assert_eq!(stats.rows(), 2);
    }

    #[test]
    fn test_query_statistics_min_max_avg_and_errors() {
        let fp = QueryFingerprinter::new();
        let query = "SELECT * FROM users WHERE id = 1";
        let stats = QueryStatistics::new(fp.fingerprint(query));

        // Nothing recorded yet: sentinel-backed values must report zero.
        assert_eq!(stats.calls(), 0);
        assert_eq!(stats.min_time(), Duration::ZERO);
        assert_eq!(stats.max_time(), Duration::ZERO);
        assert_eq!(stats.avg_time(), Duration::ZERO);
        assert!(stats.error_rate().abs() < f64::EPSILON);

        stats.record(&QueryExecution::new(query, Duration::from_micros(300)));
        stats.record(&QueryExecution::new(query, Duration::from_micros(100)).with_error("boom"));

        assert_eq!(stats.min_time(), Duration::from_micros(100));
        assert_eq!(stats.max_time(), Duration::from_micros(300));
        assert_eq!(stats.total_time(), Duration::from_micros(400));
        assert_eq!(stats.avg_time(), Duration::from_micros(200));
        assert_eq!(stats.errors(), 1);
        assert!((stats.error_rate() - 0.5).abs() < f64::EPSILON);
    }

    #[test]
    fn test_statistics_store() {
        let store = StatisticsStore::new(100);
        let fp = QueryFingerprinter::new();

        let fingerprint = fp.fingerprint("SELECT * FROM users WHERE id = 1");
        let exec =
            QueryExecution::new("SELECT * FROM users WHERE id = 1", Duration::from_millis(5));

        store.record(&fingerprint, &exec);
        store.record(&fingerprint, &exec);

        let stats = store.get(fingerprint.hash).unwrap();
        assert_eq!(stats.calls, 2);
    }

    #[test]
    fn test_store_evicts_when_full_and_resets() {
        let store = StatisticsStore::new(1);
        let fp = QueryFingerprinter::new();

        let users = fp.fingerprint("SELECT * FROM users");
        let orders = fp.fingerprint("SELECT * FROM orders");

        store.record(
            &users,
            &QueryExecution::new("SELECT * FROM users", Duration::from_millis(1)),
        );
        store.record(
            &orders,
            &QueryExecution::new("SELECT * FROM orders", Duration::from_millis(1)),
        );

        // Capacity 1: the older fingerprint made way for the new one.
        assert_eq!(store.count(), 1);
        assert!(store.get(users.hash).is_none());
        assert!(store.get(orders.hash).is_some());

        store.reset();
        assert_eq!(store.count(), 0);
        assert!(store.all().is_empty());
    }

    #[test]
    fn test_top_queries() {
        let store = StatisticsStore::new(100);
        let fp = QueryFingerprinter::new();

        let fp1 = fp.fingerprint("SELECT * FROM users");
        for _ in 0..10 {
            let exec = QueryExecution::new("SELECT * FROM users", Duration::from_millis(1));
            store.record(&fp1, &exec);
        }

        let fp2 = fp.fingerprint("SELECT * FROM orders");
        for _ in 0..5 {
            let exec = QueryExecution::new("SELECT * FROM orders", Duration::from_millis(1));
            store.record(&fp2, &exec);
        }

        let top = store.top(OrderBy::Calls, 10);
        assert_eq!(top.len(), 2);
        assert_eq!(top[0].calls, 10);
        assert_eq!(top[1].calls, 5);

        assert_eq!(store.top(OrderBy::Calls, 1).len(), 1);
    }
}