Skip to main content

heliosdb_proxy/analytics/
slow_log.rs

1//! Slow Query Log
2//!
3//! Track and persist slow queries for analysis.
4
5use std::collections::VecDeque;
6use std::fs::{File, OpenOptions};
7use std::io::{BufRead, BufReader, Write};
8use std::path::PathBuf;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::time::{Duration, SystemTime};
11
12use parking_lot::RwLock;
13
14use super::config::SlowQueryConfig;
15use super::fingerprinter::QueryFingerprint;
16use super::statistics::QueryExecution;
17
/// One record in the slow query log.
#[derive(Debug, Clone)]
pub struct SlowQueryEntry {
    /// Timestamp of the entry, in nanoseconds since the Unix epoch.
    pub timestamp_nanos: u64,

    /// How long the query took.
    pub duration: Duration,

    /// Text of the query; may have been shortened before being stored.
    pub query: String,

    /// Normalized (fingerprinted) form of the query.
    pub fingerprint: String,

    /// Hash of the fingerprint.
    pub fingerprint_hash: u64,

    /// Name of the user who ran the query.
    pub user: String,

    /// Name of the database the query ran against.
    pub database: String,

    /// IP address of the client.
    pub client_ip: String,

    /// Node on which the query was executed.
    pub node: String,

    /// Number of rows returned or affected.
    pub rows: usize,

    /// Error message; `None` unless the query failed.
    pub error: Option<String>,

    /// Identifier of the session, when known.
    pub session_id: Option<String>,

    /// Identifier of the workflow (used for agent tracing), when known.
    pub workflow_id: Option<String>,
}
60
61impl SlowQueryEntry {
62    /// Create from execution record and fingerprint
63    pub fn from_execution(
64        execution: &QueryExecution,
65        fingerprint: &QueryFingerprint,
66        max_query_length: usize,
67    ) -> Self {
68        let query = if execution.query.len() > max_query_length {
69            format!("{}...", &execution.query[..max_query_length])
70        } else {
71            execution.query.clone()
72        };
73
74        Self {
75            timestamp_nanos: now_nanos(),
76            duration: execution.duration,
77            query,
78            fingerprint: fingerprint.normalized.clone(),
79            fingerprint_hash: fingerprint.hash,
80            user: execution.user.clone(),
81            database: execution.database.clone(),
82            client_ip: execution.client_ip.clone(),
83            node: execution.node.clone(),
84            rows: execution.rows,
85            error: execution.error.clone(),
86            session_id: execution.session_id.clone(),
87            workflow_id: execution.workflow_id.clone(),
88        }
89    }
90
91    /// Format as log line
92    pub fn format_log_line(&self) -> String {
93        let timestamp = format_timestamp(self.timestamp_nanos);
94        let duration_ms = self.duration.as_secs_f64() * 1000.0;
95        let status = if self.error.is_some() { "ERROR" } else { "OK" };
96
97        format!(
98            "{} user={} db={} client={} node={} duration={:.3}ms rows={} status={} query={}",
99            timestamp,
100            self.user,
101            self.database,
102            self.client_ip,
103            self.node,
104            duration_ms,
105            self.rows,
106            status,
107            self.query.replace('\n', " ")
108        )
109    }
110
111    /// Parse from log line
112    pub fn parse_log_line(line: &str) -> Option<Self> {
113        // Basic parser for log lines
114        // Format: TIMESTAMP user=X db=X client=X node=X duration=Xms rows=X status=X query=X
115
116        let parts: Vec<&str> = line.splitn(9, ' ').collect();
117        if parts.len() < 9 {
118            return None;
119        }
120
121        let timestamp = parts[0];
122        let timestamp_nanos = parse_timestamp(timestamp)?;
123
124        let mut user = String::new();
125        let mut db = String::new();
126        let mut client = String::new();
127        let mut node = String::new();
128        let mut duration_ms = 0.0f64;
129        let mut rows = 0usize;
130        let mut status = "OK";
131        let mut query = String::new();
132
133        for part in &parts[1..] {
134            if let Some(val) = part.strip_prefix("user=") {
135                user = val.to_string();
136            } else if let Some(val) = part.strip_prefix("db=") {
137                db = val.to_string();
138            } else if let Some(val) = part.strip_prefix("client=") {
139                client = val.to_string();
140            } else if let Some(val) = part.strip_prefix("node=") {
141                node = val.to_string();
142            } else if let Some(val) = part.strip_prefix("duration=") {
143                if let Some(ms_str) = val.strip_suffix("ms") {
144                    duration_ms = ms_str.parse().unwrap_or(0.0);
145                }
146            } else if let Some(val) = part.strip_prefix("rows=") {
147                rows = val.parse().unwrap_or(0);
148            } else if let Some(val) = part.strip_prefix("status=") {
149                status = val;
150            } else if let Some(val) = part.strip_prefix("query=") {
151                query = val.to_string();
152            }
153        }
154
155        let error = if status == "ERROR" {
156            Some("Query failed".to_string())
157        } else {
158            None
159        };
160
161        Some(Self {
162            timestamp_nanos,
163            duration: Duration::from_secs_f64(duration_ms / 1000.0),
164            query,
165            fingerprint: String::new(),
166            fingerprint_hash: 0,
167            user,
168            database: db,
169            client_ip: client,
170            node,
171            rows,
172            error,
173            session_id: None,
174            workflow_id: None,
175        })
176    }
177}
178
179/// Slow query log
180pub struct SlowQueryLog {
181    /// Configuration
182    config: SlowQueryConfig,
183
184    /// Recent entries (in-memory)
185    recent: RwLock<VecDeque<SlowQueryEntry>>,
186
187    /// Log file writer
188    file_writer: RwLock<Option<File>>,
189
190    /// Total logged count
191    logged_count: AtomicU64,
192}
193
194impl SlowQueryLog {
195    /// Create new slow query log
196    pub fn new(config: SlowQueryConfig) -> Self {
197        let file_writer = if let Some(ref path) = config.log_file {
198            match OpenOptions::new().create(true).append(true).open(path) {
199                Ok(file) => Some(file),
200                Err(e) => {
201                    eprintln!("Failed to open slow query log file {:?}: {}", path, e);
202                    None
203                }
204            }
205        } else {
206            None
207        };
208
209        Self {
210            config,
211            recent: RwLock::new(VecDeque::new()),
212            file_writer: RwLock::new(file_writer),
213            logged_count: AtomicU64::new(0),
214        }
215    }
216
217    /// Log query if it exceeds threshold
218    pub fn log_if_slow(&self, execution: &QueryExecution, fingerprint: &QueryFingerprint) {
219        if !self.config.enabled {
220            return;
221        }
222
223        if execution.duration < self.config.threshold {
224            return;
225        }
226
227        let entry =
228            SlowQueryEntry::from_execution(execution, fingerprint, self.config.max_query_length);
229
230        self.log_entry(entry);
231    }
232
233    /// Log an entry directly
234    pub fn log_entry(&self, entry: SlowQueryEntry) {
235        self.logged_count.fetch_add(1, Ordering::Relaxed);
236
237        // Add to recent entries
238        {
239            let mut recent = self.recent.write();
240            recent.push_back(entry.clone());
241
242            // Trim if exceeding max
243            while recent.len() > self.config.max_recent_entries {
244                recent.pop_front();
245            }
246        }
247
248        // Write to file if configured
249        if let Some(ref mut file) = *self.file_writer.write() {
250            let line = entry.format_log_line();
251            if let Err(e) = writeln!(file, "{}", line) {
252                eprintln!("Failed to write slow query log: {}", e);
253            }
254        }
255    }
256
257    /// Get recent slow queries
258    pub fn recent(&self, limit: usize) -> Vec<SlowQueryEntry> {
259        let recent = self.recent.read();
260        recent.iter().rev().take(limit).cloned().collect()
261    }
262
263    /// Get all recent entries
264    pub fn all_recent(&self) -> Vec<SlowQueryEntry> {
265        self.recent.read().iter().cloned().collect()
266    }
267
268    /// Get count of logged slow queries
269    pub fn count(&self) -> u64 {
270        self.logged_count.load(Ordering::Relaxed)
271    }
272
273    /// Get threshold
274    pub fn threshold(&self) -> Duration {
275        self.config.threshold
276    }
277
278    /// Clear recent entries
279    pub fn clear(&self) {
280        self.recent.write().clear();
281    }
282
283    /// Check if enabled
284    pub fn is_enabled(&self) -> bool {
285        self.config.enabled
286    }
287}
288
/// Reads entries back from a slow query log file.
pub struct SlowQueryReader {
    /// Location of the log file to read.
    path: PathBuf,
}
294
295impl SlowQueryReader {
296    /// Create reader for log file
297    pub fn new(path: impl Into<PathBuf>) -> Self {
298        Self { path: path.into() }
299    }
300
301    /// Read all entries from file
302    pub fn read_all(&self) -> std::io::Result<Vec<SlowQueryEntry>> {
303        let file = File::open(&self.path)?;
304        let reader = BufReader::new(file);
305        let mut entries = Vec::new();
306
307        for line in reader.lines() {
308            let line = line?;
309            if let Some(entry) = SlowQueryEntry::parse_log_line(&line) {
310                entries.push(entry);
311            }
312        }
313
314        Ok(entries)
315    }
316
317    /// Read entries within time range
318    pub fn read_range(
319        &self,
320        start_nanos: u64,
321        end_nanos: u64,
322    ) -> std::io::Result<Vec<SlowQueryEntry>> {
323        let all = self.read_all()?;
324        Ok(all
325            .into_iter()
326            .filter(|e| e.timestamp_nanos >= start_nanos && e.timestamp_nanos <= end_nanos)
327            .collect())
328    }
329
330    /// Read entries slower than threshold
331    pub fn read_slower_than(&self, threshold: Duration) -> std::io::Result<Vec<SlowQueryEntry>> {
332        let all = self.read_all()?;
333        Ok(all.into_iter().filter(|e| e.duration > threshold).collect())
334    }
335
336    /// Read last N entries
337    pub fn read_last(&self, n: usize) -> std::io::Result<Vec<SlowQueryEntry>> {
338        let all = self.read_all()?;
339        Ok(all.into_iter().rev().take(n).collect())
340    }
341}
342
/// Current wall-clock time as nanoseconds since the Unix epoch, or `0` when
/// the system clock reports a time before the epoch.
fn now_nanos() -> u64 {
    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    }
}
349
/// Render a nanosecond Unix timestamp as `SECONDS:MILLIS`, with the
/// millisecond part zero-padded to three digits (e.g. `1704067200:000`).
/// Precision below one millisecond is discarded.
fn format_timestamp(nanos: u64) -> String {
    let since_epoch = Duration::from_nanos(nanos);
    format!("{}:{:03}", since_epoch.as_secs(), since_epoch.subsec_millis())
}
358
/// Parse a `SECONDS:MILLIS` timestamp into nanoseconds since the Unix epoch.
///
/// Returns `None` when the separator is missing, when either of the first
/// two components is not an unsigned integer, or when the resulting
/// nanosecond value does not fit in a `u64`. Components after the second
/// are ignored.
fn parse_timestamp(s: &str) -> Option<u64> {
    let mut fields = s.split(':');
    let secs: u64 = fields.next()?.parse().ok()?;
    let ms: u64 = fields.next()?.parse().ok()?;

    // The input comes from a log file, so guard against values large enough
    // to overflow instead of panicking (debug) or wrapping (release).
    let secs_as_nanos = secs.checked_mul(1_000_000_000)?;
    let ms_as_nanos = ms.checked_mul(1_000_000)?;
    secs_as_nanos.checked_add(ms_as_nanos)
}
369
#[cfg(test)]
mod tests {
    use super::super::fingerprinter::QueryFingerprinter;
    use super::*;

    /// Enabled, in-memory-only configuration with a 100 ms threshold.
    fn test_config(max_query_length: usize, max_recent_entries: usize) -> SlowQueryConfig {
        SlowQueryConfig {
            enabled: true,
            threshold: Duration::from_millis(100),
            log_file: None,
            log_parameters: false,
            max_query_length,
            max_recent_entries,
        }
    }

    /// Fixed entry shared by the formatting and round-trip tests.
    fn sample_entry() -> SlowQueryEntry {
        SlowQueryEntry {
            timestamp_nanos: 1704067200_000_000_000,
            duration: Duration::from_millis(1500),
            query: "SELECT * FROM users WHERE id = 1".to_string(),
            fingerprint: "select * from users where id = ?".to_string(),
            fingerprint_hash: 12345,
            user: "alice".to_string(),
            database: "mydb".to_string(),
            client_ip: "192.168.1.100".to_string(),
            node: "primary".to_string(),
            rows: 1,
            error: None,
            session_id: None,
            workflow_id: None,
        }
    }

    #[test]
    fn test_slow_query_entry_format() {
        let line = sample_entry().format_log_line();
        assert!(line.contains("user=alice"));
        assert!(line.contains("db=mydb"));
        assert!(line.contains("duration=1500.000ms"));
        assert!(line.contains("status=OK"));
        assert_eq!(
            line,
            "1704067200:000 user=alice db=mydb client=192.168.1.100 node=primary \
             duration=1500.000ms rows=1 status=OK query=SELECT * FROM users WHERE id = 1"
        );
    }

    #[test]
    fn test_slow_query_log_enabled() {
        let log = SlowQueryLog::new(test_config(1000, 10));
        assert!(log.is_enabled());
        assert_eq!(log.threshold(), Duration::from_millis(100));
    }

    #[test]
    fn test_slow_query_log_threshold() {
        let log = SlowQueryLog::new(test_config(1000, 10));
        let fp = QueryFingerprinter::new();

        // Fast query - should not be logged
        let fast_exec = QueryExecution::new("SELECT 1", Duration::from_millis(50));
        let fingerprint = fp.fingerprint("SELECT 1");
        log.log_if_slow(&fast_exec, &fingerprint);
        assert_eq!(log.count(), 0);

        // Slow query - should be logged
        let slow_exec = QueryExecution::new("SELECT * FROM users", Duration::from_millis(150));
        let fingerprint = fp.fingerprint("SELECT * FROM users");
        log.log_if_slow(&slow_exec, &fingerprint);
        assert_eq!(log.count(), 1);

        // A query taking exactly the threshold is logged as well
        let edge_exec = QueryExecution::new("SELECT 2", Duration::from_millis(100));
        let fingerprint = fp.fingerprint("SELECT 2");
        log.log_if_slow(&edge_exec, &fingerprint);
        assert_eq!(log.count(), 2);
    }

    #[test]
    fn test_slow_query_log_recent() {
        let log = SlowQueryLog::new(test_config(1000, 5));
        let fp = QueryFingerprinter::new();

        // Log 10 slow queries
        for i in 0..10 {
            let exec = QueryExecution::new(
                format!("SELECT * FROM table_{}", i),
                Duration::from_millis(150),
            );
            let fingerprint = fp.fingerprint(&exec.query);
            log.log_if_slow(&exec, &fingerprint);
        }

        // Every query is counted, but only the last 5 are kept
        assert_eq!(log.count(), 10);
        let recent = log.recent(10);
        assert_eq!(recent.len(), 5);
        assert!(recent[0].query.contains("table_9")); // Most recent first
        assert!(recent[4].query.contains("table_5")); // Oldest retained last

        // `all_recent` returns the same entries, oldest first
        let all = log.all_recent();
        assert_eq!(all.len(), 5);
        assert!(all[0].query.contains("table_5"));

        log.clear();
        assert!(log.recent(10).is_empty());
    }

    #[test]
    fn test_slow_query_entry_parse() {
        let line = "1704067200:000 user=alice db=mydb client=127.0.0.1 node=primary duration=1500.000ms rows=1 status=OK query=SELECT 1";
        let entry = SlowQueryEntry::parse_log_line(line);

        assert!(entry.is_some());
        let entry = entry.unwrap();
        assert_eq!(entry.timestamp_nanos, 1704067200_000_000_000);
        assert_eq!(entry.user, "alice");
        assert_eq!(entry.database, "mydb");
        assert_eq!(entry.client_ip, "127.0.0.1");
        assert_eq!(entry.node, "primary");
        assert_eq!(entry.duration, Duration::from_millis(1500));
        assert_eq!(entry.rows, 1);
        assert_eq!(entry.query, "SELECT 1");
        assert!(entry.error.is_none());
    }

    #[test]
    fn test_slow_query_entry_parse_error_status() {
        let line = "1704067200:000 user=alice db=mydb client=127.0.0.1 node=primary duration=2.000ms rows=0 status=ERROR query=SELECT 1";
        let entry = SlowQueryEntry::parse_log_line(line).unwrap();
        assert_eq!(entry.error.as_deref(), Some("Query failed"));
    }

    #[test]
    fn test_slow_query_entry_parse_rejects_malformed() {
        // Too few fields
        assert!(SlowQueryEntry::parse_log_line("").is_none());
        assert!(SlowQueryEntry::parse_log_line("1704067200:000 user=alice").is_none());

        // Enough fields, but the timestamp is not valid
        let line = "yesterday user=alice db=mydb client=127.0.0.1 node=primary duration=1.000ms rows=1 status=OK query=SELECT 1";
        assert!(SlowQueryEntry::parse_log_line(line).is_none());
    }

    #[test]
    fn test_slow_query_entry_round_trip() {
        let original = sample_entry();
        let parsed = SlowQueryEntry::parse_log_line(&original.format_log_line()).unwrap();

        assert_eq!(parsed.timestamp_nanos, original.timestamp_nanos);
        assert_eq!(parsed.duration, original.duration);
        assert_eq!(parsed.query, original.query);
        assert_eq!(parsed.user, original.user);
        assert_eq!(parsed.database, original.database);
        assert_eq!(parsed.client_ip, original.client_ip);
        assert_eq!(parsed.node, original.node);
        assert_eq!(parsed.rows, original.rows);
        assert!(parsed.error.is_none());
    }

    #[test]
    fn test_query_truncation() {
        let log = SlowQueryLog::new(test_config(20, 10));
        let fp = QueryFingerprinter::new();

        let long_query = "SELECT * FROM users WHERE name = 'this is a very long query'";
        let exec = QueryExecution::new(long_query, Duration::from_millis(150));
        let fingerprint = fp.fingerprint(long_query);
        log.log_if_slow(&exec, &fingerprint);

        let recent = log.recent(1);
        assert_eq!(recent.len(), 1);
        assert!(recent[0].query.len() <= 23); // 20 + "..."
        assert!(recent[0].query.ends_with("..."));
        assert!(recent[0].query.starts_with("SELECT * FROM users"));
    }
}