1use std::collections::VecDeque;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::{Duration, Instant};
8
9use dashmap::DashMap;
10use parking_lot::RwLock;
11
12use super::config::PatternConfig;
13use super::fingerprinter::QueryFingerprint;
14use super::statistics::QueryExecution;
15
16#[derive(Debug, Clone)]
18pub enum PatternAlert {
19 NplusOne(NplusOnePattern),
21 Burst(QueryBurst),
23}
24
25impl PatternAlert {
26 pub fn severity(&self) -> u8 {
28 match self {
29 PatternAlert::NplusOne(p) => {
30 if p.repeat_count > 100 {
31 5
32 } else if p.repeat_count > 50 {
33 4
34 } else if p.repeat_count > 20 {
35 3
36 } else if p.repeat_count > 10 {
37 2
38 } else {
39 1
40 }
41 }
42 PatternAlert::Burst(b) => {
43 if b.query_count > 500 {
44 5
45 } else if b.query_count > 200 {
46 4
47 } else if b.query_count > 100 {
48 3
49 } else if b.query_count > 50 {
50 2
51 } else {
52 1
53 }
54 }
55 }
56 }
57
58 pub fn description(&self) -> String {
60 match self {
61 PatternAlert::NplusOne(p) => {
62 format!(
63 "N+1 query pattern: {} repeated {} times in session {}",
64 truncate(&p.fingerprint, 50),
65 p.repeat_count,
66 p.session_id
67 )
68 }
69 PatternAlert::Burst(b) => {
70 format!(
71 "Query burst: {} queries in {:?} from session {}",
72 b.query_count, b.window, b.session_id
73 )
74 }
75 }
76 }
77}
78
79#[derive(Debug, Clone)]
81pub struct NplusOnePattern {
82 pub session_id: String,
84
85 pub fingerprint: String,
87
88 pub fingerprint_hash: u64,
90
91 pub repeat_count: usize,
93
94 pub window: Duration,
96
97 pub first_seen_nanos: u64,
99
100 pub last_seen_nanos: u64,
102
103 pub tables: Vec<String>,
105}
106
107#[derive(Debug, Clone)]
109pub struct QueryBurst {
110 pub session_id: String,
112
113 pub query_count: usize,
115
116 pub window: Duration,
118
119 pub start_nanos: u64,
121
122 pub end_nanos: u64,
124
125 pub top_fingerprints: Vec<(u64, usize)>,
127}
128
129struct SessionHistory {
131 query_times: VecDeque<Instant>,
133
134 recent_fingerprints: VecDeque<(u64, Instant, String, Vec<String>)>,
136
137 last_activity: Instant,
139
140 session_id: String,
142}
143
144impl SessionHistory {
145 fn new(session_id: String) -> Self {
146 Self {
147 query_times: VecDeque::new(),
148 recent_fingerprints: VecDeque::new(),
149 last_activity: Instant::now(),
150 session_id,
151 }
152 }
153
154 fn record_query(&mut self, fingerprint: &QueryFingerprint, max_history: usize) {
155 let now = Instant::now();
156 self.last_activity = now;
157
158 self.query_times.push_back(now);
160 while self.query_times.len() > max_history {
161 self.query_times.pop_front();
162 }
163
164 self.recent_fingerprints.push_back((
166 fingerprint.hash,
167 now,
168 fingerprint.normalized.clone(),
169 fingerprint.tables.clone(),
170 ));
171 while self.recent_fingerprints.len() > max_history {
172 self.recent_fingerprints.pop_front();
173 }
174 }
175
176 fn count_in_window(&self, window: Duration) -> usize {
177 let cutoff = Instant::now() - window;
178 self.query_times
179 .iter()
180 .filter(|t| **t > cutoff)
181 .count()
182 }
183
184 fn count_fingerprint_in_window(&self, hash: u64, window: Duration) -> usize {
185 let cutoff = Instant::now() - window;
186 self.recent_fingerprints
187 .iter()
188 .filter(|(h, t, _, _)| *h == hash && *t > cutoff)
189 .count()
190 }
191
192 fn get_repeated_fingerprints(&self, threshold: usize) -> Vec<(u64, usize, String, Vec<String>)> {
193 let mut counts: std::collections::HashMap<u64, (usize, String, Vec<String>)> =
194 std::collections::HashMap::new();
195
196 for (hash, _, normalized, tables) in &self.recent_fingerprints {
197 let entry = counts
198 .entry(*hash)
199 .or_insert((0, normalized.clone(), tables.clone()));
200 entry.0 += 1;
201 }
202
203 counts
204 .into_iter()
205 .filter(|(_, (count, _, _))| *count >= threshold)
206 .map(|(hash, (count, normalized, tables))| (hash, count, normalized, tables))
207 .collect()
208 }
209}
210
211pub struct PatternDetector {
213 config: PatternConfig,
215
216 sessions: DashMap<String, SessionHistory>,
218
219 alerts: RwLock<VecDeque<PatternAlert>>,
221
222 alert_count: AtomicU64,
224
225 last_cleanup: RwLock<Instant>,
227}
228
229impl PatternDetector {
230 pub fn new(config: PatternConfig) -> Self {
232 Self {
233 config,
234 sessions: DashMap::new(),
235 alerts: RwLock::new(VecDeque::new()),
236 alert_count: AtomicU64::new(0),
237 last_cleanup: RwLock::new(Instant::now()),
238 }
239 }
240
241 pub fn record_query(
243 &self,
244 session_id: &str,
245 _execution: &QueryExecution,
246 fingerprint: &QueryFingerprint,
247 ) {
248 self.maybe_cleanup();
250
251 let mut session = self
253 .sessions
254 .entry(session_id.to_string())
255 .or_insert_with(|| SessionHistory::new(session_id.to_string()));
256
257 session.record_query(fingerprint, self.config.session_history_size);
259
260 if self.config.n_plus_one_detection {
262 self.check_n_plus_one(&session, fingerprint);
263 }
264
265 if self.config.burst_detection {
267 self.check_burst(&session);
268 }
269 }
270
271 fn check_n_plus_one(&self, session: &SessionHistory, fingerprint: &QueryFingerprint) {
273 let count = session.count_fingerprint_in_window(fingerprint.hash, Duration::from_secs(5));
274
275 if count >= self.config.n_plus_one_threshold {
276 let pattern = NplusOnePattern {
277 session_id: session.session_id.clone(),
278 fingerprint: fingerprint.normalized.clone(),
279 fingerprint_hash: fingerprint.hash,
280 repeat_count: count,
281 window: Duration::from_secs(5),
282 first_seen_nanos: now_nanos(),
283 last_seen_nanos: now_nanos(),
284 tables: fingerprint.tables.clone(),
285 };
286
287 self.add_alert(PatternAlert::NplusOne(pattern));
288 }
289 }
290
291 fn check_burst(&self, session: &SessionHistory) {
293 let count = session.count_in_window(self.config.burst_window);
294
295 if count >= self.config.burst_threshold {
296 let repeated = session.get_repeated_fingerprints(3);
298 let top_fingerprints: Vec<_> = repeated
299 .iter()
300 .take(5)
301 .map(|(hash, count, _, _)| (*hash, *count))
302 .collect();
303
304 let burst = QueryBurst {
305 session_id: session.session_id.clone(),
306 query_count: count,
307 window: self.config.burst_window,
308 start_nanos: now_nanos() - self.config.burst_window.as_nanos() as u64,
309 end_nanos: now_nanos(),
310 top_fingerprints,
311 };
312
313 self.add_alert(PatternAlert::Burst(burst));
314 }
315 }
316
317 fn add_alert(&self, alert: PatternAlert) {
319 self.alert_count.fetch_add(1, Ordering::Relaxed);
320
321 let mut alerts = self.alerts.write();
322 alerts.push_back(alert);
323
324 while alerts.len() > 1000 {
326 alerts.pop_front();
327 }
328 }
329
330 pub fn get_alerts(&self) -> Vec<PatternAlert> {
332 self.alerts.read().iter().cloned().collect()
333 }
334
335 pub fn get_n_plus_one_alerts(&self) -> Vec<NplusOnePattern> {
337 self.alerts
338 .read()
339 .iter()
340 .filter_map(|a| match a {
341 PatternAlert::NplusOne(p) => Some(p.clone()),
342 _ => None,
343 })
344 .collect()
345 }
346
347 pub fn get_burst_alerts(&self) -> Vec<QueryBurst> {
349 self.alerts
350 .read()
351 .iter()
352 .filter_map(|a| match a {
353 PatternAlert::Burst(b) => Some(b.clone()),
354 _ => None,
355 })
356 .collect()
357 }
358
359 pub fn alert_count(&self) -> u64 {
361 self.alert_count.load(Ordering::Relaxed)
362 }
363
364 pub fn clear_alerts(&self) {
366 self.alerts.write().clear();
367 }
368
369 fn maybe_cleanup(&self) {
371 let now = Instant::now();
372 let mut last_cleanup = self.last_cleanup.write();
373
374 if now.duration_since(*last_cleanup) < Duration::from_secs(60) {
376 return;
377 }
378 *last_cleanup = now;
379 drop(last_cleanup);
380
381 let timeout = self.config.session_timeout;
383 self.sessions.retain(|_, session| {
384 now.duration_since(session.last_activity) < timeout
385 });
386
387 while self.sessions.len() > self.config.max_sessions {
389 let oldest = self
391 .sessions
392 .iter()
393 .min_by_key(|s| s.last_activity)
394 .map(|s| s.key().clone());
395
396 if let Some(key) = oldest {
397 self.sessions.remove(&key);
398 } else {
399 break;
400 }
401 }
402 }
403
404 pub fn session_count(&self) -> usize {
406 self.sessions.len()
407 }
408
409 pub fn reset(&self) {
411 self.sessions.clear();
412 self.alerts.write().clear();
413 self.alert_count.store(0, Ordering::Relaxed);
414 }
415}
416
417fn now_nanos() -> u64 {
418 std::time::SystemTime::now()
419 .duration_since(std::time::SystemTime::UNIX_EPOCH)
420 .map(|d| d.as_nanos() as u64)
421 .unwrap_or(0)
422}
423
424fn truncate(s: &str, max: usize) -> String {
425 if s.len() > max {
426 format!("{}...", &s[..max])
427 } else {
428 s.to_string()
429 }
430}
431
432#[cfg(test)]
433mod tests {
434 use super::*;
435 use crate::analytics::fingerprinter::QueryFingerprinter;
436
437 #[test]
438 fn test_pattern_detector_new() {
439 let config = PatternConfig::default();
440 let detector = PatternDetector::new(config);
441 assert_eq!(detector.session_count(), 0);
442 assert_eq!(detector.alert_count(), 0);
443 }
444
445 #[test]
446 fn test_n_plus_one_detection() {
447 let mut config = PatternConfig::default();
448 config.n_plus_one_threshold = 3;
449 config.burst_detection = false;
450
451 let detector = PatternDetector::new(config);
452 let fp = QueryFingerprinter::new();
453
454 let session_id = "session-1";
455
456 for i in 0..5 {
458 let query = format!("SELECT * FROM users WHERE id = {}", i);
459 let fingerprint = fp.fingerprint(&query);
460 let execution = super::super::statistics::QueryExecution::new(
461 query,
462 Duration::from_millis(5),
463 );
464 detector.record_query(session_id, &execution, &fingerprint);
465 }
466
467 let alerts = detector.get_n_plus_one_alerts();
469 assert!(!alerts.is_empty(), "Should detect N+1 pattern");
470 }
471
472 #[test]
473 fn test_burst_detection() {
474 let mut config = PatternConfig::default();
475 config.burst_threshold = 5;
476 config.burst_window = Duration::from_secs(1);
477 config.n_plus_one_detection = false;
478
479 let detector = PatternDetector::new(config);
480 let fp = QueryFingerprinter::new();
481
482 let session_id = "session-1";
483
484 for i in 0..10 {
486 let query = format!("SELECT * FROM table_{}", i);
487 let fingerprint = fp.fingerprint(&query);
488 let execution = super::super::statistics::QueryExecution::new(
489 query,
490 Duration::from_millis(1),
491 );
492 detector.record_query(session_id, &execution, &fingerprint);
493 }
494
495 let alerts = detector.get_burst_alerts();
497 assert!(!alerts.is_empty(), "Should detect burst pattern");
498 }
499
500 #[test]
501 fn test_alert_severity() {
502 let pattern = NplusOnePattern {
503 session_id: "session-1".to_string(),
504 fingerprint: "select * from users where id = ?".to_string(),
505 fingerprint_hash: 12345,
506 repeat_count: 25,
507 window: Duration::from_secs(5),
508 first_seen_nanos: 0,
509 last_seen_nanos: 0,
510 tables: vec!["users".to_string()],
511 };
512
513 let alert = PatternAlert::NplusOne(pattern);
514 assert_eq!(alert.severity(), 3);
515 }
516
517 #[test]
518 fn test_session_cleanup() {
519 let mut config = PatternConfig::default();
520 config.session_timeout = Duration::from_millis(100);
521
522 let detector = PatternDetector::new(config);
523 let fp = QueryFingerprinter::new();
524
525 let fingerprint = fp.fingerprint("SELECT 1");
527 let execution = super::super::statistics::QueryExecution::new(
528 "SELECT 1",
529 Duration::from_millis(1),
530 );
531 detector.record_query("session-1", &execution, &fingerprint);
532
533 assert_eq!(detector.session_count(), 1);
534
535 std::thread::sleep(Duration::from_millis(150));
537
538 detector.record_query("session-2", &execution, &fingerprint);
540
541 }
544
545 #[test]
546 fn test_reset() {
547 let config = PatternConfig::default();
548 let detector = PatternDetector::new(config);
549 let fp = QueryFingerprinter::new();
550
551 let fingerprint = fp.fingerprint("SELECT 1");
552 let execution = super::super::statistics::QueryExecution::new(
553 "SELECT 1",
554 Duration::from_millis(1),
555 );
556 detector.record_query("session-1", &execution, &fingerprint);
557
558 detector.reset();
559
560 assert_eq!(detector.session_count(), 0);
561 assert_eq!(detector.alert_count(), 0);
562 }
563
564 #[test]
565 fn test_alert_description() {
566 let pattern = NplusOnePattern {
567 session_id: "sess-123".to_string(),
568 fingerprint: "select * from users where id = ?".to_string(),
569 fingerprint_hash: 12345,
570 repeat_count: 10,
571 window: Duration::from_secs(5),
572 first_seen_nanos: 0,
573 last_seen_nanos: 0,
574 tables: vec!["users".to_string()],
575 };
576
577 let alert = PatternAlert::NplusOne(pattern);
578 let desc = alert.description();
579
580 assert!(desc.contains("N+1"));
581 assert!(desc.contains("10 times"));
582 assert!(desc.contains("sess-123"));
583 }
584}