1use std::collections::HashMap;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::Duration;
8
9use dashmap::DashMap;
10use parking_lot::RwLock;
11
12use super::fingerprinter::{QueryFingerprint, OperationType};
13use super::statistics::QueryExecution;
14use super::intent::QueryIntent;
15
/// Atomic counters accumulated for a single grouping key.
///
/// `AnalyticsMetrics` keeps one of these per operation type, user, database
/// and node. Every field is an [`AtomicU64`], so the counters are updated
/// through a shared reference.
struct OperationMetrics {
    /// Number of executions recorded.
    count: AtomicU64,
    /// Sum of the recorded execution durations, in microseconds.
    total_time_us: AtomicU64,
    /// Number of recorded executions that carried an error.
    errors: AtomicU64,
    /// Sum of the row counts of the recorded executions.
    rows: AtomicU64,
}
27
28impl OperationMetrics {
29 fn new() -> Self {
30 Self {
31 count: AtomicU64::new(0),
32 total_time_us: AtomicU64::new(0),
33 errors: AtomicU64::new(0),
34 rows: AtomicU64::new(0),
35 }
36 }
37
38 fn record(&self, execution: &QueryExecution) {
39 self.count.fetch_add(1, Ordering::Relaxed);
40 self.total_time_us
41 .fetch_add(execution.duration.as_micros() as u64, Ordering::Relaxed);
42 self.rows
43 .fetch_add(execution.rows as u64, Ordering::Relaxed);
44
45 if execution.error.is_some() {
46 self.errors.fetch_add(1, Ordering::Relaxed);
47 }
48 }
49
50 fn snapshot(&self) -> OperationSnapshot {
51 let count = self.count.load(Ordering::Relaxed);
52 let total_time_us = self.total_time_us.load(Ordering::Relaxed);
53 let errors = self.errors.load(Ordering::Relaxed);
54 let rows = self.rows.load(Ordering::Relaxed);
55
56 let avg_time_us = if count > 0 {
57 total_time_us / count
58 } else {
59 0
60 };
61
62 OperationSnapshot {
63 count,
64 total_time: Duration::from_micros(total_time_us),
65 avg_time: Duration::from_micros(avg_time_us),
66 errors,
67 error_rate: if count > 0 {
68 errors as f64 / count as f64
69 } else {
70 0.0
71 },
72 rows,
73 }
74 }
75
76 fn reset(&self) {
77 self.count.store(0, Ordering::Relaxed);
78 self.total_time_us.store(0, Ordering::Relaxed);
79 self.errors.store(0, Ordering::Relaxed);
80 self.rows.store(0, Ordering::Relaxed);
81 }
82}
83
/// Point-in-time copy of the counters of one metrics bucket, plus the values
/// derived from them.
#[derive(Clone, Debug)]
pub struct OperationSnapshot {
    /// Number of executions recorded.
    pub count: u64,
    /// Sum of the recorded execution durations.
    pub total_time: Duration,
    /// `total_time` divided by `count`; zero when `count` is zero.
    pub avg_time: Duration,
    /// Number of recorded executions that carried an error.
    pub errors: u64,
    /// `errors / count`; `0.0` when `count` is zero.
    pub error_rate: f64,
    /// Sum of the row counts of the recorded executions.
    pub rows: u64,
}
94
/// Atomic counters accumulated for a single query intent.
struct IntentMetrics {
    /// Number of executions recorded for the intent.
    count: AtomicU64,
    /// Sum of the recorded execution durations, in microseconds.
    total_time_us: AtomicU64,
    /// Number of cache hits reported for the intent.
    cache_hits: AtomicU64,
}
104
105impl IntentMetrics {
106 fn new() -> Self {
107 Self {
108 count: AtomicU64::new(0),
109 total_time_us: AtomicU64::new(0),
110 cache_hits: AtomicU64::new(0),
111 }
112 }
113
114 fn record(&self, duration: Duration) {
115 self.count.fetch_add(1, Ordering::Relaxed);
116 self.total_time_us
117 .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
118 }
119
120 fn record_cache_hit(&self) {
121 self.cache_hits.fetch_add(1, Ordering::Relaxed);
122 }
123
124 fn snapshot(&self) -> super::IntentStats {
125 let count = self.count.load(Ordering::Relaxed);
126 let total_us = self.total_time_us.load(Ordering::Relaxed);
127 let cache_hits = self.cache_hits.load(Ordering::Relaxed);
128
129 super::IntentStats {
130 calls: count,
131 total_time_ms: total_us / 1000,
132 avg_time_ms: if count > 0 {
133 (total_us as f64 / count as f64) / 1000.0
134 } else {
135 0.0
136 },
137 cache_hit_ratio: if count > 0 {
138 cache_hits as f64 / count as f64
139 } else {
140 0.0
141 },
142 }
143 }
144
145 fn reset(&self) {
146 self.count.store(0, Ordering::Relaxed);
147 self.total_time_us.store(0, Ordering::Relaxed);
148 self.cache_hits.store(0, Ordering::Relaxed);
149 }
150}
151
152#[derive(Debug, Clone)]
154pub struct QueryMetricEntry {
155 pub fingerprint_hash: u64,
156 pub normalized: String,
157 pub duration: Duration,
158 pub timestamp_nanos: u64,
159 pub user: String,
160 pub database: String,
161 pub intent: QueryIntent,
162}
163
164pub struct AnalyticsMetrics {
166 total_queries: AtomicU64,
168
169 total_time_us: AtomicU64,
171
172 total_errors: AtomicU64,
174
175 operations: DashMap<OperationType, OperationMetrics>,
177
178 intents: DashMap<QueryIntent, IntentMetrics>,
180
181 users: DashMap<String, OperationMetrics>,
183
184 databases: DashMap<String, OperationMetrics>,
186
187 nodes: DashMap<String, OperationMetrics>,
189
190 recent: RwLock<Vec<QueryMetricEntry>>,
192
193 max_recent: usize,
195}
196
197impl AnalyticsMetrics {
198 pub fn new() -> Self {
200 Self::with_max_recent(100)
201 }
202
203 pub fn with_max_recent(max_recent: usize) -> Self {
205 Self {
206 total_queries: AtomicU64::new(0),
207 total_time_us: AtomicU64::new(0),
208 total_errors: AtomicU64::new(0),
209 operations: DashMap::new(),
210 intents: DashMap::new(),
211 users: DashMap::new(),
212 databases: DashMap::new(),
213 nodes: DashMap::new(),
214 recent: RwLock::new(Vec::new()),
215 max_recent,
216 }
217 }
218
219 pub fn record(
221 &self,
222 fingerprint: &QueryFingerprint,
223 execution: &QueryExecution,
224 intent: QueryIntent,
225 ) {
226 self.total_queries.fetch_add(1, Ordering::Relaxed);
228 self.total_time_us
229 .fetch_add(execution.duration.as_micros() as u64, Ordering::Relaxed);
230
231 if execution.error.is_some() {
232 self.total_errors.fetch_add(1, Ordering::Relaxed);
233 }
234
235 self.operations
237 .entry(fingerprint.operation)
238 .or_insert_with(OperationMetrics::new)
239 .record(execution);
240
241 self.intents
243 .entry(intent)
244 .or_insert_with(IntentMetrics::new)
245 .record(execution.duration);
246
247 self.users
249 .entry(execution.user.clone())
250 .or_insert_with(OperationMetrics::new)
251 .record(execution);
252
253 self.databases
255 .entry(execution.database.clone())
256 .or_insert_with(OperationMetrics::new)
257 .record(execution);
258
259 self.nodes
261 .entry(execution.node.clone())
262 .or_insert_with(OperationMetrics::new)
263 .record(execution);
264
265 {
267 let mut recent = self.recent.write();
268 if recent.len() >= self.max_recent {
269 recent.remove(0);
270 }
271 recent.push(QueryMetricEntry {
272 fingerprint_hash: fingerprint.hash,
273 normalized: fingerprint.normalized.clone(),
274 duration: execution.duration,
275 timestamp_nanos: now_nanos(),
276 user: execution.user.clone(),
277 database: execution.database.clone(),
278 intent,
279 });
280 }
281 }
282
283 pub fn record_cache_hit(&self, intent: QueryIntent) {
285 self.intents
286 .entry(intent)
287 .or_insert_with(IntentMetrics::new)
288 .record_cache_hit();
289 }
290
291 pub fn snapshot(&self) -> AnalyticsSnapshot {
293 let total_queries = self.total_queries.load(Ordering::Relaxed);
294 let total_time_us = self.total_time_us.load(Ordering::Relaxed);
295 let total_errors = self.total_errors.load(Ordering::Relaxed);
296
297 let operations: HashMap<_, _> = self
298 .operations
299 .iter()
300 .map(|r| (*r.key(), r.value().snapshot()))
301 .collect();
302
303 let users: HashMap<_, _> = self
304 .users
305 .iter()
306 .map(|r| (r.key().clone(), r.value().snapshot()))
307 .collect();
308
309 let databases: HashMap<_, _> = self
310 .databases
311 .iter()
312 .map(|r| (r.key().clone(), r.value().snapshot()))
313 .collect();
314
315 let nodes: HashMap<_, _> = self
316 .nodes
317 .iter()
318 .map(|r| (r.key().clone(), r.value().snapshot()))
319 .collect();
320
321 AnalyticsSnapshot {
322 total_queries,
323 total_time: Duration::from_micros(total_time_us),
324 total_errors,
325 error_rate: if total_queries > 0 {
326 total_errors as f64 / total_queries as f64
327 } else {
328 0.0
329 },
330 qps: 0.0, avg_time: if total_queries > 0 {
332 Duration::from_micros(total_time_us / total_queries)
333 } else {
334 Duration::ZERO
335 },
336 by_operation: operations,
337 by_user: users,
338 by_database: databases,
339 by_node: nodes,
340 }
341 }
342
343 pub fn by_intent(&self) -> HashMap<QueryIntent, super::IntentStats> {
345 self.intents
346 .iter()
347 .map(|r| (*r.key(), r.value().snapshot()))
348 .collect()
349 }
350
351 pub fn recent_queries(&self, limit: usize) -> Vec<QueryMetricEntry> {
353 let recent = self.recent.read();
354 recent.iter().rev().take(limit).cloned().collect()
355 }
356
357 pub fn reset(&self) {
359 self.total_queries.store(0, Ordering::Relaxed);
360 self.total_time_us.store(0, Ordering::Relaxed);
361 self.total_errors.store(0, Ordering::Relaxed);
362
363 for entry in self.operations.iter() {
364 entry.value().reset();
365 }
366 for entry in self.intents.iter() {
367 entry.value().reset();
368 }
369 for entry in self.users.iter() {
370 entry.value().reset();
371 }
372 for entry in self.databases.iter() {
373 entry.value().reset();
374 }
375 for entry in self.nodes.iter() {
376 entry.value().reset();
377 }
378
379 self.recent.write().clear();
380 }
381}
382
383impl Default for AnalyticsMetrics {
384 fn default() -> Self {
385 Self::new()
386 }
387}
388
389#[derive(Debug, Clone)]
391pub struct AnalyticsSnapshot {
392 pub total_queries: u64,
394
395 pub total_time: Duration,
397
398 pub total_errors: u64,
400
401 pub error_rate: f64,
403
404 pub qps: f64,
406
407 pub avg_time: Duration,
409
410 pub by_operation: HashMap<OperationType, OperationSnapshot>,
412
413 pub by_user: HashMap<String, OperationSnapshot>,
415
416 pub by_database: HashMap<String, OperationSnapshot>,
418
419 pub by_node: HashMap<String, OperationSnapshot>,
421}
422
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_nanos() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    }
}
429
#[cfg(test)]
mod tests {
    use super::*;
    use crate::analytics::fingerprinter::QueryFingerprinter;

    /// A fresh collector reports no queries and no errors.
    #[test]
    fn test_metrics_new() {
        let metrics = AnalyticsMetrics::new();
        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total_queries, 0);
        assert_eq!(snapshot.total_errors, 0);
    }

    /// Recording one query creates the operation, user and database buckets.
    #[test]
    fn test_metrics_record() {
        let metrics = AnalyticsMetrics::new();
        let fp = QueryFingerprinter::new();

        let fingerprint = fp.fingerprint("SELECT * FROM users WHERE id = 1");
        let execution =
            QueryExecution::new("SELECT * FROM users WHERE id = 1", Duration::from_millis(10))
                .with_user("alice")
                .with_database("mydb")
                .with_node("primary")
                .with_rows(1);

        metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total_queries, 1);
        assert!(snapshot.by_operation.contains_key(&OperationType::Select));
        assert!(snapshot.by_user.contains_key("alice"));
        assert!(snapshot.by_database.contains_key("mydb"));
    }

    /// Each intent passed to `record` gets its own statistics entry.
    #[test]
    fn test_metrics_by_intent() {
        let metrics = AnalyticsMetrics::new();
        let fp = QueryFingerprinter::new();

        let fingerprint = fp.fingerprint("SELECT * FROM users");
        let execution = QueryExecution::new("SELECT * FROM users", Duration::from_millis(5));
        metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);

        let fingerprint = fp.fingerprint("INSERT INTO users VALUES (1, 'Alice')");
        let execution = QueryExecution::new(
            "INSERT INTO users VALUES (1, 'Alice')",
            Duration::from_millis(10),
        );
        metrics.record(&fingerprint, &execution, QueryIntent::Storage);

        let by_intent = metrics.by_intent();
        assert!(by_intent.contains_key(&QueryIntent::Retrieval));
        assert!(by_intent.contains_key(&QueryIntent::Storage));
    }

    /// One failed execution out of two yields an error rate of 0.5.
    #[test]
    fn test_metrics_error_tracking() {
        let metrics = AnalyticsMetrics::new();
        let fp = QueryFingerprinter::new();

        let fingerprint = fp.fingerprint("SELECT 1");
        let execution = QueryExecution::new("SELECT 1", Duration::from_millis(1));
        metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);

        let execution = QueryExecution::new("SELECT 1", Duration::from_millis(1))
            .with_error("Connection refused");
        metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total_queries, 2);
        assert_eq!(snapshot.total_errors, 1);
        assert!((snapshot.error_rate - 0.5).abs() < 0.001);
    }

    /// `reset` zeroes the totals and empties the recent-query log.
    #[test]
    fn test_metrics_reset() {
        let metrics = AnalyticsMetrics::new();
        let fp = QueryFingerprinter::new();

        let fingerprint = fp.fingerprint("SELECT 1");
        let execution = QueryExecution::new("SELECT 1", Duration::from_millis(1));
        metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);

        metrics.reset();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total_queries, 0);
        assert_eq!(snapshot.total_errors, 0);
        assert!(metrics.recent_queries(10).is_empty());
    }

    /// The recent-query log is capped at `max_recent`, and `limit` caps the
    /// number of entries returned.
    #[test]
    fn test_recent_queries() {
        let metrics = AnalyticsMetrics::with_max_recent(5);
        let fp = QueryFingerprinter::new();

        for i in 0..10 {
            let query = format!("SELECT {}", i);
            let fingerprint = fp.fingerprint(&query);
            let execution = QueryExecution::new(query, Duration::from_millis(1));
            metrics.record(&fingerprint, &execution, QueryIntent::Retrieval);
        }

        // Ten were recorded, but only the configured five are retained.
        let recent = metrics.recent_queries(10);
        assert_eq!(recent.len(), 5);

        let limited = metrics.recent_queries(3);
        assert_eq!(limited.len(), 3);
    }

    /// Cache hits create the intent entry but do not count as calls.
    #[test]
    fn test_cache_hit_recording() {
        let metrics = AnalyticsMetrics::new();

        for _ in 0..5 {
            metrics.record_cache_hit(QueryIntent::Retrieval);
        }

        let by_intent = metrics.by_intent();
        // Require the entry, so the assertion below cannot be skipped.
        let stats = by_intent
            .get(&QueryIntent::Retrieval)
            .expect("recording a cache hit should create the intent entry");
        assert_eq!(stats.calls, 0);
    }
}