// heliosdb_proxy/analytics/patterns.rs

//! Pattern Detection
//!
//! Detects problematic query patterns such as N+1 queries and query bursts.
4
use std::collections::HashMap;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

use dashmap::DashMap;
use parking_lot::RwLock;

use super::config::PatternConfig;
use super::fingerprinter::QueryFingerprint;
use super::statistics::QueryExecution;
15
16/// Pattern alert types
17#[derive(Debug, Clone)]
18pub enum PatternAlert {
19    /// N+1 query detected
20    NplusOne(NplusOnePattern),
21    /// Query burst detected
22    Burst(QueryBurst),
23}
24
25impl PatternAlert {
26    /// Get severity level (1-5)
27    pub fn severity(&self) -> u8 {
28        match self {
29            PatternAlert::NplusOne(p) => {
30                if p.repeat_count > 100 {
31                    5
32                } else if p.repeat_count > 50 {
33                    4
34                } else if p.repeat_count > 20 {
35                    3
36                } else if p.repeat_count > 10 {
37                    2
38                } else {
39                    1
40                }
41            }
42            PatternAlert::Burst(b) => {
43                if b.query_count > 500 {
44                    5
45                } else if b.query_count > 200 {
46                    4
47                } else if b.query_count > 100 {
48                    3
49                } else if b.query_count > 50 {
50                    2
51                } else {
52                    1
53                }
54            }
55        }
56    }
57
58    /// Get description
59    pub fn description(&self) -> String {
60        match self {
61            PatternAlert::NplusOne(p) => {
62                format!(
63                    "N+1 query pattern: {} repeated {} times in session {}",
64                    truncate(&p.fingerprint, 50),
65                    p.repeat_count,
66                    p.session_id
67                )
68            }
69            PatternAlert::Burst(b) => {
70                format!(
71                    "Query burst: {} queries in {:?} from session {}",
72                    b.query_count, b.window, b.session_id
73                )
74            }
75        }
76    }
77}
78
/// N+1 query pattern: the same query fingerprint repeated many times by one
/// session within a short time window.
#[derive(Debug, Clone)]
pub struct NplusOnePattern {
    /// Session that exhibited the pattern
    pub session_id: String,

    /// Fingerprint (normalized query text) of the repeated query
    pub fingerprint: String,

    /// Hash of the fingerprint
    pub fingerprint_hash: u64,

    /// Number of repetitions counted inside `window`
    pub repeat_count: usize,

    /// Time window in which repetitions occurred
    pub window: Duration,

    /// First seen timestamp, in nanoseconds (the detector fills this from the
    /// system clock, measured from the Unix epoch)
    pub first_seen_nanos: u64,

    /// Last seen timestamp, in nanoseconds (same clock as `first_seen_nanos`)
    pub last_seen_nanos: u64,

    /// Tables involved in the repeated query
    pub tables: Vec<String>,
}
106
/// Query burst (many queries from one session in a short window).
#[derive(Debug, Clone)]
pub struct QueryBurst {
    /// Session that exhibited the burst
    pub session_id: String,

    /// Number of queries counted inside `window`
    pub query_count: usize,

    /// Detection window
    pub window: Duration,

    /// Start timestamp of the window, in nanoseconds (the detector fills this
    /// from the system clock, measured from the Unix epoch)
    pub start_nanos: u64,

    /// End timestamp of the window, in nanoseconds (same clock as
    /// `start_nanos`)
    pub end_nanos: u64,

    /// Top fingerprints in burst, as `(fingerprint hash, occurrence count)`
    pub top_fingerprints: Vec<(u64, usize)>,
}
128
/// Session query history: a bounded, rolling record of what one session has
/// executed recently.
struct SessionHistory {
    /// Recent query timestamps, oldest first
    query_times: VecDeque<Instant>,

    /// Recent queries, oldest first, stored as
    /// `(fingerprint hash, time recorded, normalized query, tables)`
    recent_fingerprints: VecDeque<(u64, Instant, String, Vec<String>)>,

    /// Last activity time (updated on every recorded query)
    last_activity: Instant,

    /// Session ID
    session_id: String,
}
143
144impl SessionHistory {
145    fn new(session_id: String) -> Self {
146        Self {
147            query_times: VecDeque::new(),
148            recent_fingerprints: VecDeque::new(),
149            last_activity: Instant::now(),
150            session_id,
151        }
152    }
153
154    fn record_query(&mut self, fingerprint: &QueryFingerprint, max_history: usize) {
155        let now = Instant::now();
156        self.last_activity = now;
157
158        // Record timestamp
159        self.query_times.push_back(now);
160        while self.query_times.len() > max_history {
161            self.query_times.pop_front();
162        }
163
164        // Record fingerprint
165        self.recent_fingerprints.push_back((
166            fingerprint.hash,
167            now,
168            fingerprint.normalized.clone(),
169            fingerprint.tables.clone(),
170        ));
171        while self.recent_fingerprints.len() > max_history {
172            self.recent_fingerprints.pop_front();
173        }
174    }
175
176    fn count_in_window(&self, window: Duration) -> usize {
177        let cutoff = Instant::now() - window;
178        self.query_times
179            .iter()
180            .filter(|t| **t > cutoff)
181            .count()
182    }
183
184    fn count_fingerprint_in_window(&self, hash: u64, window: Duration) -> usize {
185        let cutoff = Instant::now() - window;
186        self.recent_fingerprints
187            .iter()
188            .filter(|(h, t, _, _)| *h == hash && *t > cutoff)
189            .count()
190    }
191
192    fn get_repeated_fingerprints(&self, threshold: usize) -> Vec<(u64, usize, String, Vec<String>)> {
193        let mut counts: std::collections::HashMap<u64, (usize, String, Vec<String>)> =
194            std::collections::HashMap::new();
195
196        for (hash, _, normalized, tables) in &self.recent_fingerprints {
197            let entry = counts
198                .entry(*hash)
199                .or_insert((0, normalized.clone(), tables.clone()));
200            entry.0 += 1;
201        }
202
203        counts
204            .into_iter()
205            .filter(|(_, (count, _, _))| *count >= threshold)
206            .map(|(hash, (count, normalized, tables))| (hash, count, normalized, tables))
207            .collect()
208    }
209}
210
211/// Pattern detector
212pub struct PatternDetector {
213    /// Configuration
214    config: PatternConfig,
215
216    /// Per-session history
217    sessions: DashMap<String, SessionHistory>,
218
219    /// Detected alerts
220    alerts: RwLock<VecDeque<PatternAlert>>,
221
222    /// Alert counter
223    alert_count: AtomicU64,
224
225    /// Last cleanup time
226    last_cleanup: RwLock<Instant>,
227}
228
229impl PatternDetector {
230    /// Create new pattern detector
231    pub fn new(config: PatternConfig) -> Self {
232        Self {
233            config,
234            sessions: DashMap::new(),
235            alerts: RwLock::new(VecDeque::new()),
236            alert_count: AtomicU64::new(0),
237            last_cleanup: RwLock::new(Instant::now()),
238        }
239    }
240
241    /// Record a query execution
242    pub fn record_query(
243        &self,
244        session_id: &str,
245        _execution: &QueryExecution,
246        fingerprint: &QueryFingerprint,
247    ) {
248        // Periodic cleanup
249        self.maybe_cleanup();
250
251        // Get or create session history
252        let mut session = self
253            .sessions
254            .entry(session_id.to_string())
255            .or_insert_with(|| SessionHistory::new(session_id.to_string()));
256
257        // Record the query
258        session.record_query(fingerprint, self.config.session_history_size);
259
260        // Check for N+1 pattern
261        if self.config.n_plus_one_detection {
262            self.check_n_plus_one(&session, fingerprint);
263        }
264
265        // Check for burst pattern
266        if self.config.burst_detection {
267            self.check_burst(&session);
268        }
269    }
270
271    /// Check for N+1 query pattern
272    fn check_n_plus_one(&self, session: &SessionHistory, fingerprint: &QueryFingerprint) {
273        let count = session.count_fingerprint_in_window(fingerprint.hash, Duration::from_secs(5));
274
275        if count >= self.config.n_plus_one_threshold {
276            let pattern = NplusOnePattern {
277                session_id: session.session_id.clone(),
278                fingerprint: fingerprint.normalized.clone(),
279                fingerprint_hash: fingerprint.hash,
280                repeat_count: count,
281                window: Duration::from_secs(5),
282                first_seen_nanos: now_nanos(),
283                last_seen_nanos: now_nanos(),
284                tables: fingerprint.tables.clone(),
285            };
286
287            self.add_alert(PatternAlert::NplusOne(pattern));
288        }
289    }
290
291    /// Check for query burst
292    fn check_burst(&self, session: &SessionHistory) {
293        let count = session.count_in_window(self.config.burst_window);
294
295        if count >= self.config.burst_threshold {
296            // Get top fingerprints in the burst
297            let repeated = session.get_repeated_fingerprints(3);
298            let top_fingerprints: Vec<_> = repeated
299                .iter()
300                .take(5)
301                .map(|(hash, count, _, _)| (*hash, *count))
302                .collect();
303
304            let burst = QueryBurst {
305                session_id: session.session_id.clone(),
306                query_count: count,
307                window: self.config.burst_window,
308                start_nanos: now_nanos() - self.config.burst_window.as_nanos() as u64,
309                end_nanos: now_nanos(),
310                top_fingerprints,
311            };
312
313            self.add_alert(PatternAlert::Burst(burst));
314        }
315    }
316
317    /// Add an alert
318    fn add_alert(&self, alert: PatternAlert) {
319        self.alert_count.fetch_add(1, Ordering::Relaxed);
320
321        let mut alerts = self.alerts.write();
322        alerts.push_back(alert);
323
324        // Keep only recent alerts (max 1000)
325        while alerts.len() > 1000 {
326            alerts.pop_front();
327        }
328    }
329
330    /// Get recent alerts
331    pub fn get_alerts(&self) -> Vec<PatternAlert> {
332        self.alerts.read().iter().cloned().collect()
333    }
334
335    /// Get alerts by type
336    pub fn get_n_plus_one_alerts(&self) -> Vec<NplusOnePattern> {
337        self.alerts
338            .read()
339            .iter()
340            .filter_map(|a| match a {
341                PatternAlert::NplusOne(p) => Some(p.clone()),
342                _ => None,
343            })
344            .collect()
345    }
346
347    /// Get burst alerts
348    pub fn get_burst_alerts(&self) -> Vec<QueryBurst> {
349        self.alerts
350            .read()
351            .iter()
352            .filter_map(|a| match a {
353                PatternAlert::Burst(b) => Some(b.clone()),
354                _ => None,
355            })
356            .collect()
357    }
358
359    /// Get alert count
360    pub fn alert_count(&self) -> u64 {
361        self.alert_count.load(Ordering::Relaxed)
362    }
363
364    /// Clear alerts
365    pub fn clear_alerts(&self) {
366        self.alerts.write().clear();
367    }
368
369    /// Cleanup inactive sessions
370    fn maybe_cleanup(&self) {
371        let now = Instant::now();
372        let mut last_cleanup = self.last_cleanup.write();
373
374        // Cleanup every minute
375        if now.duration_since(*last_cleanup) < Duration::from_secs(60) {
376            return;
377        }
378        *last_cleanup = now;
379        drop(last_cleanup);
380
381        // Remove inactive sessions
382        let timeout = self.config.session_timeout;
383        self.sessions.retain(|_, session| {
384            now.duration_since(session.last_activity) < timeout
385        });
386
387        // Enforce max sessions
388        while self.sessions.len() > self.config.max_sessions {
389            // Remove oldest session
390            let oldest = self
391                .sessions
392                .iter()
393                .min_by_key(|s| s.last_activity)
394                .map(|s| s.key().clone());
395
396            if let Some(key) = oldest {
397                self.sessions.remove(&key);
398            } else {
399                break;
400            }
401        }
402    }
403
404    /// Get session count
405    pub fn session_count(&self) -> usize {
406        self.sessions.len()
407    }
408
409    /// Reset detector
410    pub fn reset(&self) {
411        self.sessions.clear();
412        self.alerts.write().clear();
413        self.alert_count.store(0, Ordering::Relaxed);
414    }
415}
416
/// Current wall-clock time in nanoseconds since the Unix epoch.
///
/// Returns 0 when the system clock reads earlier than the epoch, and clamps
/// at `u64::MAX` instead of silently wrapping when the value does not fit.
fn now_nanos() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::SystemTime::UNIX_EPOCH)
        // Lossless: the value is clamped to the u64 range before the cast.
        .map(|elapsed| elapsed.as_nanos().min(u128::from(u64::MAX)) as u64)
        .unwrap_or(0)
}
423
/// Shorten `s` to at most `max` bytes, appending `...` when anything was cut.
///
/// Strings of `max` bytes or fewer are returned unchanged. When `max` falls
/// inside a multi-byte UTF-8 character the cut moves back to the previous
/// character boundary, so slicing can never panic.
fn truncate(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_string();
    }

    // Here `max < s.len()`. Index 0 is always a boundary, so the loop ends.
    let mut end = max;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}...", &s[..end])
}
431
#[cfg(test)]
mod tests {
    use super::*;
    use crate::analytics::fingerprinter::QueryFingerprinter;

    /// Fingerprint `sql` and record it against `session_id`.
    fn record(detector: &PatternDetector, fp: &QueryFingerprinter, session_id: &str, sql: &str) {
        let fingerprint = fp.fingerprint(sql);
        let execution = QueryExecution::new(sql.to_string(), Duration::from_millis(1));
        detector.record_query(session_id, &execution, &fingerprint);
    }

    /// Build an N+1 pattern with the given repeat count for alert tests.
    fn n_plus_one(session_id: &str, repeat_count: usize) -> NplusOnePattern {
        NplusOnePattern {
            session_id: session_id.to_string(),
            fingerprint: "select * from users where id = ?".to_string(),
            fingerprint_hash: 12345,
            repeat_count,
            window: Duration::from_secs(5),
            first_seen_nanos: 0,
            last_seen_nanos: 0,
            tables: vec!["users".to_string()],
        }
    }

    #[test]
    fn test_pattern_detector_new() {
        let config = PatternConfig::default();
        let detector = PatternDetector::new(config);
        assert_eq!(detector.session_count(), 0);
        assert_eq!(detector.alert_count(), 0);
    }

    #[test]
    fn test_n_plus_one_detection() {
        let mut config = PatternConfig::default();
        config.n_plus_one_threshold = 3;
        config.burst_detection = false;

        let detector = PatternDetector::new(config);
        let fp = QueryFingerprinter::new();

        // Record the same query shape multiple times; only the literal differs
        for i in 0..5 {
            let query = format!("SELECT * FROM users WHERE id = {}", i);
            record(&detector, &fp, "session-1", &query);
        }

        // Should have detected N+1 pattern
        let alerts = detector.get_n_plus_one_alerts();
        assert!(!alerts.is_empty(), "Should detect N+1 pattern");
    }

    #[test]
    fn test_burst_detection() {
        let mut config = PatternConfig::default();
        config.burst_threshold = 5;
        config.burst_window = Duration::from_secs(1);
        config.n_plus_one_detection = false;

        let detector = PatternDetector::new(config);
        let fp = QueryFingerprinter::new();

        // Record many queries quickly
        for i in 0..10 {
            let query = format!("SELECT * FROM table_{}", i);
            record(&detector, &fp, "session-1", &query);
        }

        // Should have detected burst
        let alerts = detector.get_burst_alerts();
        assert!(!alerts.is_empty(), "Should detect burst pattern");
    }

    #[test]
    fn test_alert_severity() {
        let alert = PatternAlert::NplusOne(n_plus_one("session-1", 25));
        assert_eq!(alert.severity(), 3);
    }

    #[test]
    fn test_session_cleanup() {
        let mut config = PatternConfig::default();
        config.session_timeout = Duration::from_millis(100);

        let detector = PatternDetector::new(config);
        let fp = QueryFingerprinter::new();

        // Record query in session
        record(&detector, &fp, "session-1", "SELECT 1");
        assert_eq!(detector.session_count(), 1);

        // Wait for session-1 to exceed its inactivity timeout
        std::thread::sleep(Duration::from_millis(150));

        // Cleanup is rate-limited to once a minute, so backdate the last pass
        // to make the next recorded query trigger it. `checked_sub` guards
        // against platforms whose `Instant` cannot represent a point that far
        // back; in that case cleanup cannot be forced and only the weaker
        // check applies.
        if let Some(stale) = Instant::now().checked_sub(Duration::from_secs(61)) {
            *detector.last_cleanup.write() = stale;

            // Record in new session to trigger cleanup
            record(&detector, &fp, "session-2", "SELECT 1");

            assert_eq!(
                detector.session_count(),
                1,
                "Expired session should be removed, leaving only session-2"
            );
        } else {
            record(&detector, &fp, "session-2", "SELECT 1");
            assert!(detector.session_count() >= 1);
        }
    }

    #[test]
    fn test_reset() {
        let config = PatternConfig::default();
        let detector = PatternDetector::new(config);
        let fp = QueryFingerprinter::new();

        record(&detector, &fp, "session-1", "SELECT 1");

        detector.reset();

        assert_eq!(detector.session_count(), 0);
        assert_eq!(detector.alert_count(), 0);
    }

    #[test]
    fn test_alert_description() {
        let alert = PatternAlert::NplusOne(n_plus_one("sess-123", 10));
        let desc = alert.description();

        assert!(desc.contains("N+1"));
        assert!(desc.contains("10 times"));
        assert!(desc.contains("sess-123"));
    }
}