1use std::collections::HashMap;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::Duration;
8
9use dashmap::DashMap;
10use parking_lot::RwLock;
11
12use super::fingerprinter::{OperationType, QueryFingerprint};
13use super::intent::QueryIntent;
14use super::statistics::QueryExecution;
15
/// Atomic counters accumulated for one group of query executions
/// (one operation type, user, database, or node).
struct OperationMetrics {
    /// Number of executions recorded.
    count: AtomicU64,
    /// Sum of the recorded execution durations, in microseconds.
    total_time_us: AtomicU64,
    /// Number of recorded executions that carried an error.
    errors: AtomicU64,
    /// Sum of the `rows` values of the recorded executions.
    rows: AtomicU64,
}
27
28impl OperationMetrics {
29 fn new() -> Self {
30 Self {
31 count: AtomicU64::new(0),
32 total_time_us: AtomicU64::new(0),
33 errors: AtomicU64::new(0),
34 rows: AtomicU64::new(0),
35 }
36 }
37
38 fn record(&self, execution: &QueryExecution) {
39 self.count.fetch_add(1, Ordering::Relaxed);
40 self.total_time_us
41 .fetch_add(execution.duration.as_micros() as u64, Ordering::Relaxed);
42 self.rows
43 .fetch_add(execution.rows as u64, Ordering::Relaxed);
44
45 if execution.error.is_some() {
46 self.errors.fetch_add(1, Ordering::Relaxed);
47 }
48 }
49
50 fn snapshot(&self) -> OperationSnapshot {
51 let count = self.count.load(Ordering::Relaxed);
52 let total_time_us = self.total_time_us.load(Ordering::Relaxed);
53 let errors = self.errors.load(Ordering::Relaxed);
54 let rows = self.rows.load(Ordering::Relaxed);
55
56 let avg_time_us = total_time_us.checked_div(count).unwrap_or(0);
57
58 OperationSnapshot {
59 count,
60 total_time: Duration::from_micros(total_time_us),
61 avg_time: Duration::from_micros(avg_time_us),
62 errors,
63 error_rate: if count > 0 {
64 errors as f64 / count as f64
65 } else {
66 0.0
67 },
68 rows,
69 }
70 }
71
72 fn reset(&self) {
73 self.count.store(0, Ordering::Relaxed);
74 self.total_time_us.store(0, Ordering::Relaxed);
75 self.errors.store(0, Ordering::Relaxed);
76 self.rows.store(0, Ordering::Relaxed);
77 }
78}
79
/// Point-in-time copy of one set of operation counters, with derived values.
#[derive(Debug, Clone)]
pub struct OperationSnapshot {
    /// Number of executions recorded.
    pub count: u64,
    /// Sum of the recorded execution durations.
    pub total_time: Duration,
    /// `total_time` divided by `count` at microsecond resolution; zero when `count` is 0.
    pub avg_time: Duration,
    /// Number of executions that carried an error.
    pub errors: u64,
    /// `errors / count`; `0.0` when `count` is 0.
    pub error_rate: f64,
    /// Sum of the row counts of the recorded executions.
    pub rows: u64,
}
90
/// Atomic counters accumulated for one query intent.
struct IntentMetrics {
    /// Number of executions recorded for this intent.
    count: AtomicU64,
    /// Sum of the recorded execution durations, in microseconds.
    total_time_us: AtomicU64,
    /// Number of cache hits recorded for this intent (tracked separately from `count`).
    cache_hits: AtomicU64,
}
100
101impl IntentMetrics {
102 fn new() -> Self {
103 Self {
104 count: AtomicU64::new(0),
105 total_time_us: AtomicU64::new(0),
106 cache_hits: AtomicU64::new(0),
107 }
108 }
109
110 fn record(&self, duration: Duration) {
111 self.count.fetch_add(1, Ordering::Relaxed);
112 self.total_time_us
113 .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
114 }
115
116 fn record_cache_hit(&self) {
117 self.cache_hits.fetch_add(1, Ordering::Relaxed);
118 }
119
120 fn snapshot(&self) -> super::IntentStats {
121 let count = self.count.load(Ordering::Relaxed);
122 let total_us = self.total_time_us.load(Ordering::Relaxed);
123 let cache_hits = self.cache_hits.load(Ordering::Relaxed);
124
125 super::IntentStats {
126 calls: count,
127 total_time_ms: total_us / 1000,
128 avg_time_ms: if count > 0 {
129 (total_us as f64 / count as f64) / 1000.0
130 } else {
131 0.0
132 },
133 cache_hit_ratio: if count > 0 {
134 cache_hits as f64 / count as f64
135 } else {
136 0.0
137 },
138 }
139 }
140
141 fn reset(&self) {
142 self.count.store(0, Ordering::Relaxed);
143 self.total_time_us.store(0, Ordering::Relaxed);
144 self.cache_hits.store(0, Ordering::Relaxed);
145 }
146}
147
148#[derive(Debug, Clone)]
150pub struct QueryMetricEntry {
151 pub fingerprint_hash: u64,
152 pub normalized: String,
153 pub duration: Duration,
154 pub timestamp_nanos: u64,
155 pub user: String,
156 pub database: String,
157 pub intent: QueryIntent,
158}
159
160pub struct AnalyticsMetrics {
162 total_queries: AtomicU64,
164
165 total_time_us: AtomicU64,
167
168 total_errors: AtomicU64,
170
171 operations: DashMap<OperationType, OperationMetrics>,
173
174 intents: DashMap<QueryIntent, IntentMetrics>,
176
177 users: DashMap<String, OperationMetrics>,
179
180 databases: DashMap<String, OperationMetrics>,
182
183 nodes: DashMap<String, OperationMetrics>,
185
186 recent: RwLock<Vec<QueryMetricEntry>>,
188
189 max_recent: usize,
191}
192
193impl AnalyticsMetrics {
194 pub fn new() -> Self {
196 Self::with_max_recent(100)
197 }
198
199 pub fn with_max_recent(max_recent: usize) -> Self {
201 Self {
202 total_queries: AtomicU64::new(0),
203 total_time_us: AtomicU64::new(0),
204 total_errors: AtomicU64::new(0),
205 operations: DashMap::new(),
206 intents: DashMap::new(),
207 users: DashMap::new(),
208 databases: DashMap::new(),
209 nodes: DashMap::new(),
210 recent: RwLock::new(Vec::new()),
211 max_recent,
212 }
213 }
214
215 pub fn record(
217 &self,
218 fingerprint: &QueryFingerprint,
219 execution: &QueryExecution,
220 intent: QueryIntent,
221 ) {
222 self.total_queries.fetch_add(1, Ordering::Relaxed);
224 self.total_time_us
225 .fetch_add(execution.duration.as_micros() as u64, Ordering::Relaxed);
226
227 if execution.error.is_some() {
228 self.total_errors.fetch_add(1, Ordering::Relaxed);
229 }
230
231 self.operations
233 .entry(fingerprint.operation)
234 .or_insert_with(OperationMetrics::new)
235 .record(execution);
236
237 self.intents
239 .entry(intent)
240 .or_insert_with(IntentMetrics::new)
241 .record(execution.duration);
242
243 self.users
245 .entry(execution.user.clone())
246 .or_insert_with(OperationMetrics::new)
247 .record(execution);
248
249 self.databases
251 .entry(execution.database.clone())
252 .or_insert_with(OperationMetrics::new)
253 .record(execution);
254
255 self.nodes
257 .entry(execution.node.clone())
258 .or_insert_with(OperationMetrics::new)
259 .record(execution);
260
261 {
263 let mut recent = self.recent.write();
264 if recent.len() >= self.max_recent {
265 recent.remove(0);
266 }
267 recent.push(QueryMetricEntry {
268 fingerprint_hash: fingerprint.hash,
269 normalized: fingerprint.normalized.clone(),
270 duration: execution.duration,
271 timestamp_nanos: now_nanos(),
272 user: execution.user.clone(),
273 database: execution.database.clone(),
274 intent,
275 });
276 }
277 }
278
279 pub fn record_cache_hit(&self, intent: QueryIntent) {
281 self.intents
282 .entry(intent)
283 .or_insert_with(IntentMetrics::new)
284 .record_cache_hit();
285 }
286
287 pub fn snapshot(&self) -> AnalyticsSnapshot {
289 let total_queries = self.total_queries.load(Ordering::Relaxed);
290 let total_time_us = self.total_time_us.load(Ordering::Relaxed);
291 let total_errors = self.total_errors.load(Ordering::Relaxed);
292
293 let operations: HashMap<_, _> = self
294 .operations
295 .iter()
296 .map(|r| (*r.key(), r.value().snapshot()))
297 .collect();
298
299 let users: HashMap<_, _> = self
300 .users
301 .iter()
302 .map(|r| (r.key().clone(), r.value().snapshot()))
303 .collect();
304
305 let databases: HashMap<_, _> = self
306 .databases
307 .iter()
308 .map(|r| (r.key().clone(), r.value().snapshot()))
309 .collect();
310
311 let nodes: HashMap<_, _> = self
312 .nodes
313 .iter()
314 .map(|r| (r.key().clone(), r.value().snapshot()))
315 .collect();
316
317 AnalyticsSnapshot {
318 total_queries,
319 total_time: Duration::from_micros(total_time_us),
320 total_errors,
321 error_rate: if total_queries > 0 {
322 total_errors as f64 / total_queries as f64
323 } else {
324 0.0
325 },
326 qps: 0.0, avg_time: Duration::from_micros(total_time_us.checked_div(total_queries).unwrap_or(0)),
328 by_operation: operations,
329 by_user: users,
330 by_database: databases,
331 by_node: nodes,
332 }
333 }
334
335 pub fn by_intent(&self) -> HashMap<QueryIntent, super::IntentStats> {
337 self.intents
338 .iter()
339 .map(|r| (*r.key(), r.value().snapshot()))
340 .collect()
341 }
342
343 pub fn recent_queries(&self, limit: usize) -> Vec<QueryMetricEntry> {
345 let recent = self.recent.read();
346 recent.iter().rev().take(limit).cloned().collect()
347 }
348
349 pub fn reset(&self) {
351 self.total_queries.store(0, Ordering::Relaxed);
352 self.total_time_us.store(0, Ordering::Relaxed);
353 self.total_errors.store(0, Ordering::Relaxed);
354
355 for entry in self.operations.iter() {
356 entry.value().reset();
357 }
358 for entry in self.intents.iter() {
359 entry.value().reset();
360 }
361 for entry in self.users.iter() {
362 entry.value().reset();
363 }
364 for entry in self.databases.iter() {
365 entry.value().reset();
366 }
367 for entry in self.nodes.iter() {
368 entry.value().reset();
369 }
370
371 self.recent.write().clear();
372 }
373}
374
375impl Default for AnalyticsMetrics {
376 fn default() -> Self {
377 Self::new()
378 }
379}
380
381#[derive(Debug, Clone)]
383pub struct AnalyticsSnapshot {
384 pub total_queries: u64,
386
387 pub total_time: Duration,
389
390 pub total_errors: u64,
392
393 pub error_rate: f64,
395
396 pub qps: f64,
398
399 pub avg_time: Duration,
401
402 pub by_operation: HashMap<OperationType, OperationSnapshot>,
404
405 pub by_user: HashMap<String, OperationSnapshot>,
407
408 pub by_database: HashMap<String, OperationSnapshot>,
410
411 pub by_node: HashMap<String, OperationSnapshot>,
413}
414
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// Yields 0 when the system clock reports a time before the epoch.
fn now_nanos() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    }
}
421
#[cfg(test)]
mod tests {
    use super::*;
    use crate::analytics::fingerprinter::QueryFingerprinter;

    /// A fresh collector reports zeroed totals and holds no data.
    #[test]
    fn test_metrics_new() {
        let metrics = AnalyticsMetrics::new();
        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total_queries, 0);
        assert_eq!(snapshot.total_errors, 0);
        assert!(snapshot.by_operation.is_empty());
        assert!(metrics.recent_queries(10).is_empty());
    }

    /// Recording one execution populates the totals and every breakdown map.
    #[test]
    fn test_metrics_record() {
        let metrics = AnalyticsMetrics::new();
        let fp = QueryFingerprinter::new();

        let fingerprint = fp.fingerprint("SELECT * FROM users WHERE id = 1");
        let execution = QueryExecution::new(
            "SELECT * FROM users WHERE id = 1",
            Duration::from_millis(10),
        )
        .with_user("alice")
        .with_database("mydb")
        .with_node("primary")
        .with_rows(1);

        metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total_queries, 1);
        assert!(snapshot.by_operation.contains_key(&OperationType::Select));
        assert!(snapshot.by_user.contains_key("alice"));
        assert!(snapshot.by_database.contains_key("mydb"));
        assert!(snapshot.by_node.contains_key("primary"));
        assert_eq!(metrics.recent_queries(10).len(), 1);
    }

    /// Each intent gets its own statistics entry with its own call count.
    #[test]
    fn test_metrics_by_intent() {
        let metrics = AnalyticsMetrics::new();
        let fp = QueryFingerprinter::new();

        let fingerprint = fp.fingerprint("SELECT * FROM users");
        let execution = QueryExecution::new("SELECT * FROM users", Duration::from_millis(5));
        metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);

        let fingerprint = fp.fingerprint("INSERT INTO users VALUES (1, 'Alice')");
        let execution = QueryExecution::new(
            "INSERT INTO users VALUES (1, 'Alice')",
            Duration::from_millis(10),
        );
        metrics.record(&fingerprint, &execution, QueryIntent::Storage);

        let by_intent = metrics.by_intent();
        assert_eq!(
            by_intent.get(&QueryIntent::Retrieval).map(|s| s.calls),
            Some(1)
        );
        assert_eq!(
            by_intent.get(&QueryIntent::Storage).map(|s| s.calls),
            Some(1)
        );
    }

    /// Executions carrying an error are counted in the error total and rate.
    #[test]
    fn test_metrics_error_tracking() {
        let metrics = AnalyticsMetrics::new();
        let fp = QueryFingerprinter::new();

        let fingerprint = fp.fingerprint("SELECT 1");
        let execution = QueryExecution::new("SELECT 1", Duration::from_millis(1));
        metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);

        let execution = QueryExecution::new("SELECT 1", Duration::from_millis(1))
            .with_error("Connection refused");
        metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total_queries, 2);
        assert_eq!(snapshot.total_errors, 1);
        assert!((snapshot.error_rate - 0.5).abs() < 0.001);
    }

    /// `reset` zeroes the totals and empties the recent-query buffer.
    #[test]
    fn test_metrics_reset() {
        let metrics = AnalyticsMetrics::new();
        let fp = QueryFingerprinter::new();

        let fingerprint = fp.fingerprint("SELECT 1");
        let execution = QueryExecution::new("SELECT 1", Duration::from_millis(1));
        metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);

        metrics.reset();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total_queries, 0);
        assert_eq!(snapshot.total_errors, 0);
        assert!(metrics.recent_queries(10).is_empty());
    }

    /// The recent-query buffer is capped at `max_recent` and honours `limit`.
    #[test]
    fn test_recent_queries() {
        let metrics = AnalyticsMetrics::with_max_recent(5);
        let fp = QueryFingerprinter::new();

        for i in 0..10 {
            let query = format!("SELECT {}", i);
            let fingerprint = fp.fingerprint(&query);
            let execution = QueryExecution::new(query, Duration::from_millis(1));
            metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);
        }

        // Ten recorded, but only the configured five are retained.
        let recent = metrics.recent_queries(10);
        assert_eq!(recent.len(), 5);
        assert_eq!(metrics.recent_queries(3).len(), 3);
    }

    /// Cache hits create the intent entry but do not count as calls.
    #[test]
    fn test_cache_hit_recording() {
        let metrics = AnalyticsMetrics::new();

        for _ in 0..5 {
            metrics.record_cache_hit(QueryIntent::Retrieval);
        }

        let by_intent = metrics.by_intent();
        // Fail loudly if the entry is missing instead of skipping the assertions.
        let stats = by_intent
            .get(&QueryIntent::Retrieval)
            .expect("record_cache_hit should create the intent entry");
        assert_eq!(stats.calls, 0);
        // With zero calls the ratio is reported as 0.0 rather than dividing by zero.
        assert_eq!(stats.cache_hit_ratio, 0.0);
    }
}