Skip to main content

heliosdb_proxy/analytics/
patterns.rs

//! Pattern Detection
//!
//! Detects problematic query patterns such as N+1 queries and query bursts.
4
5use std::collections::VecDeque;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::{Duration, Instant};
8
9use dashmap::DashMap;
10use parking_lot::RwLock;
11
12use super::config::PatternConfig;
13use super::fingerprinter::QueryFingerprint;
14use super::statistics::QueryExecution;
15
16/// Pattern alert types
17#[derive(Debug, Clone)]
18pub enum PatternAlert {
19    /// N+1 query detected
20    NplusOne(NplusOnePattern),
21    /// Query burst detected
22    Burst(QueryBurst),
23}
24
25impl PatternAlert {
26    /// Get severity level (1-5)
27    pub fn severity(&self) -> u8 {
28        match self {
29            PatternAlert::NplusOne(p) => {
30                if p.repeat_count > 100 {
31                    5
32                } else if p.repeat_count > 50 {
33                    4
34                } else if p.repeat_count > 20 {
35                    3
36                } else if p.repeat_count > 10 {
37                    2
38                } else {
39                    1
40                }
41            }
42            PatternAlert::Burst(b) => {
43                if b.query_count > 500 {
44                    5
45                } else if b.query_count > 200 {
46                    4
47                } else if b.query_count > 100 {
48                    3
49                } else if b.query_count > 50 {
50                    2
51                } else {
52                    1
53                }
54            }
55        }
56    }
57
58    /// Get description
59    pub fn description(&self) -> String {
60        match self {
61            PatternAlert::NplusOne(p) => {
62                format!(
63                    "N+1 query pattern: {} repeated {} times in session {}",
64                    truncate(&p.fingerprint, 50),
65                    p.repeat_count,
66                    p.session_id
67                )
68            }
69            PatternAlert::Burst(b) => {
70                format!(
71                    "Query burst: {} queries in {:?} from session {}",
72                    b.query_count, b.window, b.session_id
73                )
74            }
75        }
76    }
77}
78
/// Details of a detected N+1 query pattern: one query shape repeated many
/// times by a single session within a short window.
#[derive(Debug, Clone)]
pub struct NplusOnePattern {
    /// Identifier of the session that produced the repeats.
    pub session_id: String,

    /// Normalized text of the query that was repeated.
    pub fingerprint: String,

    /// Hash of the repeated query's fingerprint.
    pub fingerprint_hash: u64,

    /// How many times the query was seen inside `window`.
    pub repeat_count: usize,

    /// Length of the look-back window the repeats were counted in.
    pub window: Duration,

    /// Timestamp of the first occurrence, in nanoseconds.
    pub first_seen_nanos: u64,

    /// Timestamp of the most recent occurrence, in nanoseconds.
    pub last_seen_nanos: u64,

    /// Tables referenced by the repeated query.
    pub tables: Vec<String>,
}
106
/// Details of a detected query burst: a single session issuing many queries
/// inside a short detection window.
#[derive(Debug, Clone)]
pub struct QueryBurst {
    /// Identifier of the session that produced the burst.
    pub session_id: String,

    /// How many queries were counted inside `window`.
    pub query_count: usize,

    /// Length of the detection window.
    pub window: Duration,

    /// Timestamp marking the start of the window, in nanoseconds.
    pub start_nanos: u64,

    /// Timestamp marking the end of the window, in nanoseconds.
    pub end_nanos: u64,

    /// Frequently repeated queries in the burst, as
    /// `(fingerprint hash, occurrence count)` pairs.
    pub top_fingerprints: Vec<(u64, usize)>,
}
128
/// Rolling record of the queries one session has issued recently.
struct SessionHistory {
    /// Arrival time of each recent query, oldest first.
    query_times: VecDeque<Instant>,

    /// One entry per recent query, oldest first:
    /// `(fingerprint hash, arrival time, normalized text, tables)`.
    recent_fingerprints: VecDeque<(u64, Instant, String, Vec<String>)>,

    /// When this session last recorded a query.
    last_activity: Instant,

    /// Identifier of the session this history belongs to.
    session_id: String,
}
143
144impl SessionHistory {
145    fn new(session_id: String) -> Self {
146        Self {
147            query_times: VecDeque::new(),
148            recent_fingerprints: VecDeque::new(),
149            last_activity: Instant::now(),
150            session_id,
151        }
152    }
153
154    fn record_query(&mut self, fingerprint: &QueryFingerprint, max_history: usize) {
155        let now = Instant::now();
156        self.last_activity = now;
157
158        // Record timestamp
159        self.query_times.push_back(now);
160        while self.query_times.len() > max_history {
161            self.query_times.pop_front();
162        }
163
164        // Record fingerprint
165        self.recent_fingerprints.push_back((
166            fingerprint.hash,
167            now,
168            fingerprint.normalized.clone(),
169            fingerprint.tables.clone(),
170        ));
171        while self.recent_fingerprints.len() > max_history {
172            self.recent_fingerprints.pop_front();
173        }
174    }
175
176    fn count_in_window(&self, window: Duration) -> usize {
177        let cutoff = Instant::now() - window;
178        self.query_times.iter().filter(|t| **t > cutoff).count()
179    }
180
181    fn count_fingerprint_in_window(&self, hash: u64, window: Duration) -> usize {
182        let cutoff = Instant::now() - window;
183        self.recent_fingerprints
184            .iter()
185            .filter(|(h, t, _, _)| *h == hash && *t > cutoff)
186            .count()
187    }
188
189    fn get_repeated_fingerprints(
190        &self,
191        threshold: usize,
192    ) -> Vec<(u64, usize, String, Vec<String>)> {
193        let mut counts: std::collections::HashMap<u64, (usize, String, Vec<String>)> =
194            std::collections::HashMap::new();
195
196        for (hash, _, normalized, tables) in &self.recent_fingerprints {
197            let entry = counts
198                .entry(*hash)
199                .or_insert((0, normalized.clone(), tables.clone()));
200            entry.0 += 1;
201        }
202
203        counts
204            .into_iter()
205            .filter(|(_, (count, _, _))| *count >= threshold)
206            .map(|(hash, (count, normalized, tables))| (hash, count, normalized, tables))
207            .collect()
208    }
209}
210
211/// Pattern detector
212pub struct PatternDetector {
213    /// Configuration
214    config: PatternConfig,
215
216    /// Per-session history
217    sessions: DashMap<String, SessionHistory>,
218
219    /// Detected alerts
220    alerts: RwLock<VecDeque<PatternAlert>>,
221
222    /// Alert counter
223    alert_count: AtomicU64,
224
225    /// Last cleanup time
226    last_cleanup: RwLock<Instant>,
227}
228
229impl PatternDetector {
230    /// Create new pattern detector
231    pub fn new(config: PatternConfig) -> Self {
232        Self {
233            config,
234            sessions: DashMap::new(),
235            alerts: RwLock::new(VecDeque::new()),
236            alert_count: AtomicU64::new(0),
237            last_cleanup: RwLock::new(Instant::now()),
238        }
239    }
240
241    /// Record a query execution
242    pub fn record_query(
243        &self,
244        session_id: &str,
245        _execution: &QueryExecution,
246        fingerprint: &QueryFingerprint,
247    ) {
248        // Periodic cleanup
249        self.maybe_cleanup();
250
251        // Get or create session history
252        let mut session = self
253            .sessions
254            .entry(session_id.to_string())
255            .or_insert_with(|| SessionHistory::new(session_id.to_string()));
256
257        // Record the query
258        session.record_query(fingerprint, self.config.session_history_size);
259
260        // Check for N+1 pattern
261        if self.config.n_plus_one_detection {
262            self.check_n_plus_one(&session, fingerprint);
263        }
264
265        // Check for burst pattern
266        if self.config.burst_detection {
267            self.check_burst(&session);
268        }
269    }
270
271    /// Check for N+1 query pattern
272    fn check_n_plus_one(&self, session: &SessionHistory, fingerprint: &QueryFingerprint) {
273        let count = session.count_fingerprint_in_window(fingerprint.hash, Duration::from_secs(5));
274
275        if count >= self.config.n_plus_one_threshold {
276            let pattern = NplusOnePattern {
277                session_id: session.session_id.clone(),
278                fingerprint: fingerprint.normalized.clone(),
279                fingerprint_hash: fingerprint.hash,
280                repeat_count: count,
281                window: Duration::from_secs(5),
282                first_seen_nanos: now_nanos(),
283                last_seen_nanos: now_nanos(),
284                tables: fingerprint.tables.clone(),
285            };
286
287            self.add_alert(PatternAlert::NplusOne(pattern));
288        }
289    }
290
291    /// Check for query burst
292    fn check_burst(&self, session: &SessionHistory) {
293        let count = session.count_in_window(self.config.burst_window);
294
295        if count >= self.config.burst_threshold {
296            // Get top fingerprints in the burst
297            let repeated = session.get_repeated_fingerprints(3);
298            let top_fingerprints: Vec<_> = repeated
299                .iter()
300                .take(5)
301                .map(|(hash, count, _, _)| (*hash, *count))
302                .collect();
303
304            let burst = QueryBurst {
305                session_id: session.session_id.clone(),
306                query_count: count,
307                window: self.config.burst_window,
308                start_nanos: now_nanos() - self.config.burst_window.as_nanos() as u64,
309                end_nanos: now_nanos(),
310                top_fingerprints,
311            };
312
313            self.add_alert(PatternAlert::Burst(burst));
314        }
315    }
316
317    /// Add an alert
318    fn add_alert(&self, alert: PatternAlert) {
319        self.alert_count.fetch_add(1, Ordering::Relaxed);
320
321        let mut alerts = self.alerts.write();
322        alerts.push_back(alert);
323
324        // Keep only recent alerts (max 1000)
325        while alerts.len() > 1000 {
326            alerts.pop_front();
327        }
328    }
329
330    /// Get recent alerts
331    pub fn get_alerts(&self) -> Vec<PatternAlert> {
332        self.alerts.read().iter().cloned().collect()
333    }
334
335    /// Get alerts by type
336    pub fn get_n_plus_one_alerts(&self) -> Vec<NplusOnePattern> {
337        self.alerts
338            .read()
339            .iter()
340            .filter_map(|a| match a {
341                PatternAlert::NplusOne(p) => Some(p.clone()),
342                _ => None,
343            })
344            .collect()
345    }
346
347    /// Get burst alerts
348    pub fn get_burst_alerts(&self) -> Vec<QueryBurst> {
349        self.alerts
350            .read()
351            .iter()
352            .filter_map(|a| match a {
353                PatternAlert::Burst(b) => Some(b.clone()),
354                _ => None,
355            })
356            .collect()
357    }
358
359    /// Get alert count
360    pub fn alert_count(&self) -> u64 {
361        self.alert_count.load(Ordering::Relaxed)
362    }
363
364    /// Clear alerts
365    pub fn clear_alerts(&self) {
366        self.alerts.write().clear();
367    }
368
369    /// Cleanup inactive sessions
370    fn maybe_cleanup(&self) {
371        let now = Instant::now();
372        let mut last_cleanup = self.last_cleanup.write();
373
374        // Cleanup every minute
375        if now.duration_since(*last_cleanup) < Duration::from_secs(60) {
376            return;
377        }
378        *last_cleanup = now;
379        drop(last_cleanup);
380
381        // Remove inactive sessions
382        let timeout = self.config.session_timeout;
383        self.sessions
384            .retain(|_, session| now.duration_since(session.last_activity) < timeout);
385
386        // Enforce max sessions
387        while self.sessions.len() > self.config.max_sessions {
388            // Remove oldest session
389            let oldest = self
390                .sessions
391                .iter()
392                .min_by_key(|s| s.last_activity)
393                .map(|s| s.key().clone());
394
395            if let Some(key) = oldest {
396                self.sessions.remove(&key);
397            } else {
398                break;
399            }
400        }
401    }
402
403    /// Get session count
404    pub fn session_count(&self) -> usize {
405        self.sessions.len()
406    }
407
408    /// Reset detector
409    pub fn reset(&self) {
410        self.sessions.clear();
411        self.alerts.write().clear();
412        self.alert_count.store(0, Ordering::Relaxed);
413    }
414}
415
/// Current wall-clock time as nanoseconds since the UNIX epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn now_nanos() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    }
}
422
/// Shorten `s` to at most `max` bytes, appending `...` when text was cut.
///
/// Strings no longer than `max` bytes are returned unchanged. When `max`
/// falls inside a multi-byte UTF-8 character, the cut moves back to the
/// start of that character so the slice is always valid (slicing at a
/// non-boundary index would panic).
fn truncate(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_string();
    }

    // Index 0 is always a char boundary, so this loop terminates.
    let mut cut = max;
    while !s.is_char_boundary(cut) {
        cut -= 1;
    }

    format!("{}...", &s[..cut])
}
430
#[cfg(test)]
mod tests {
    use super::*;
    use crate::analytics::fingerprinter::QueryFingerprinter;

    /// A fresh detector tracks no sessions and has raised no alerts.
    #[test]
    fn test_pattern_detector_new() {
        let config = PatternConfig::default();
        let detector = PatternDetector::new(config);
        assert_eq!(detector.session_count(), 0);
        assert_eq!(detector.alert_count(), 0);
    }

    /// Repeating one query shape past the threshold raises an N+1 alert.
    #[test]
    fn test_n_plus_one_detection() {
        let mut config = PatternConfig::default();
        config.n_plus_one_threshold = 3;
        config.burst_detection = false;

        let detector = PatternDetector::new(config);
        let fp = QueryFingerprinter::new();

        let session_id = "session-1";

        // Record same query multiple times
        for i in 0..5 {
            let query = format!("SELECT * FROM users WHERE id = {}", i);
            let fingerprint = fp.fingerprint(&query);
            let execution =
                super::super::statistics::QueryExecution::new(query, Duration::from_millis(5));
            detector.record_query(session_id, &execution, &fingerprint);
        }

        // Should have detected N+1 pattern
        let alerts = detector.get_n_plus_one_alerts();
        assert!(!alerts.is_empty(), "Should detect N+1 pattern");
    }

    /// Many queries inside the burst window raise a burst alert.
    #[test]
    fn test_burst_detection() {
        let mut config = PatternConfig::default();
        config.burst_threshold = 5;
        config.burst_window = Duration::from_secs(1);
        config.n_plus_one_detection = false;

        let detector = PatternDetector::new(config);
        let fp = QueryFingerprinter::new();

        let session_id = "session-1";

        // Record many queries quickly
        for i in 0..10 {
            let query = format!("SELECT * FROM table_{}", i);
            let fingerprint = fp.fingerprint(&query);
            let execution =
                super::super::statistics::QueryExecution::new(query, Duration::from_millis(1));
            detector.record_query(session_id, &execution, &fingerprint);
        }

        // Should have detected burst
        let alerts = detector.get_burst_alerts();
        assert!(!alerts.is_empty(), "Should detect burst pattern");
    }

    /// 25 repeats falls in the 21..=50 bucket, which is severity 3.
    #[test]
    fn test_alert_severity() {
        let pattern = NplusOnePattern {
            session_id: "session-1".to_string(),
            fingerprint: "select * from users where id = ?".to_string(),
            fingerprint_hash: 12345,
            repeat_count: 25,
            window: Duration::from_secs(5),
            first_seen_nanos: 0,
            last_seen_nanos: 0,
            tables: vec!["users".to_string()],
        };

        let alert = PatternAlert::NplusOne(pattern);
        assert_eq!(alert.severity(), 3);
    }

    /// A session idle past `session_timeout` is evicted by the next cleanup
    /// pass.
    #[test]
    fn test_session_cleanup() {
        let mut config = PatternConfig::default();
        config.session_timeout = Duration::from_millis(100);

        let detector = PatternDetector::new(config);
        let fp = QueryFingerprinter::new();

        // Record query in session
        let fingerprint = fp.fingerprint("SELECT 1");
        let execution =
            super::super::statistics::QueryExecution::new("SELECT 1", Duration::from_millis(1));
        detector.record_query("session-1", &execution, &fingerprint);

        assert_eq!(detector.session_count(), 1);

        // Wait for timeout
        std::thread::sleep(Duration::from_millis(150));

        // Cleanup is throttled to once a minute, so backdate the last pass
        // to make the next `record_query` run it. `checked_sub` can fail on
        // platforms whose `Instant` cannot represent the earlier time; in
        // that case the eviction assertions are skipped.
        let cleanup_forced = match Instant::now().checked_sub(Duration::from_secs(61)) {
            Some(past) => {
                *detector.last_cleanup.write() = past;
                true
            }
            None => false,
        };

        // Record in new session to trigger cleanup
        detector.record_query("session-2", &execution, &fingerprint);

        if cleanup_forced {
            assert!(
                !detector.sessions.contains_key("session-1"),
                "Timed-out session should be evicted"
            );
            assert_eq!(detector.session_count(), 1);
        }
    }

    /// `reset` clears sessions and zeroes the alert total.
    #[test]
    fn test_reset() {
        let config = PatternConfig::default();
        let detector = PatternDetector::new(config);
        let fp = QueryFingerprinter::new();

        let fingerprint = fp.fingerprint("SELECT 1");
        let execution =
            super::super::statistics::QueryExecution::new("SELECT 1", Duration::from_millis(1));
        detector.record_query("session-1", &execution, &fingerprint);

        detector.reset();

        assert_eq!(detector.session_count(), 0);
        assert_eq!(detector.alert_count(), 0);
    }

    /// The N+1 description mentions the pattern name, count, and session.
    #[test]
    fn test_alert_description() {
        let pattern = NplusOnePattern {
            session_id: "sess-123".to_string(),
            fingerprint: "select * from users where id = ?".to_string(),
            fingerprint_hash: 12345,
            repeat_count: 10,
            window: Duration::from_secs(5),
            first_seen_nanos: 0,
            last_seen_nanos: 0,
            tables: vec!["users".to_string()],
        };

        let alert = PatternAlert::NplusOne(pattern);
        let desc = alert.description();

        assert!(desc.contains("N+1"));
        assert!(desc.contains("10 times"));
        assert!(desc.contains("sess-123"));
    }
}