// heliosdb_proxy/analytics/slow_log.rs

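//! Slow Query Log
//!
//! Tracks queries whose execution time meets a configured threshold, keeps the
//! most recent ones in memory, and optionally appends them to a log file for
//! later analysis.
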
5use std::collections::VecDeque;
6use std::fs::{File, OpenOptions};
7use std::io::{BufRead, BufReader, Write};
8use std::path::PathBuf;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::time::{Duration, SystemTime};
11
12use parking_lot::RwLock;
13
14use super::config::SlowQueryConfig;
15use super::fingerprinter::QueryFingerprint;
16use super::statistics::QueryExecution;
17
/// A single slow-query record.
///
/// This is the shape kept in the in-memory window and the shape produced when
/// a log file line is parsed back.
#[derive(Debug, Clone)]
pub struct SlowQueryEntry {
    /// Wall-clock time the entry was recorded, in nanoseconds since the Unix epoch.
    pub timestamp_nanos: u64,

    /// How long the query took to execute.
    pub duration: Duration,

    /// Query text; may have been cut short and suffixed with `...`.
    pub query: String,

    /// Normalized form of the query produced by the fingerprinter.
    pub fingerprint: String,

    /// Hash of the normalized query.
    pub fingerprint_hash: u64,

    /// Name of the user that ran the query.
    pub user: String,

    /// Target database name.
    pub database: String,

    /// Client address, as reported by the execution record.
    pub client_ip: String,

    /// Backend node that served the query.
    pub node: String,

    /// Number of rows returned or affected.
    pub rows: usize,

    /// Failure message; `None` when the query succeeded.
    pub error: Option<String>,

    /// Session identifier, when known.
    pub session_id: Option<String>,

    /// Workflow identifier used for agent tracing, when known.
    pub workflow_id: Option<String>,
}
60
61impl SlowQueryEntry {
62    /// Create from execution record and fingerprint
63    pub fn from_execution(
64        execution: &QueryExecution,
65        fingerprint: &QueryFingerprint,
66        max_query_length: usize,
67    ) -> Self {
68        let query = if execution.query.len() > max_query_length {
69            format!("{}...", &execution.query[..max_query_length])
70        } else {
71            execution.query.clone()
72        };
73
74        Self {
75            timestamp_nanos: now_nanos(),
76            duration: execution.duration,
77            query,
78            fingerprint: fingerprint.normalized.clone(),
79            fingerprint_hash: fingerprint.hash,
80            user: execution.user.clone(),
81            database: execution.database.clone(),
82            client_ip: execution.client_ip.clone(),
83            node: execution.node.clone(),
84            rows: execution.rows,
85            error: execution.error.clone(),
86            session_id: execution.session_id.clone(),
87            workflow_id: execution.workflow_id.clone(),
88        }
89    }
90
91    /// Format as log line
92    pub fn format_log_line(&self) -> String {
93        let timestamp = format_timestamp(self.timestamp_nanos);
94        let duration_ms = self.duration.as_secs_f64() * 1000.0;
95        let status = if self.error.is_some() { "ERROR" } else { "OK" };
96
97        format!(
98            "{} user={} db={} client={} node={} duration={:.3}ms rows={} status={} query={}",
99            timestamp,
100            self.user,
101            self.database,
102            self.client_ip,
103            self.node,
104            duration_ms,
105            self.rows,
106            status,
107            self.query.replace('\n', " ")
108        )
109    }
110
111    /// Parse from log line
112    pub fn parse_log_line(line: &str) -> Option<Self> {
113        // Basic parser for log lines
114        // Format: TIMESTAMP user=X db=X client=X node=X duration=Xms rows=X status=X query=X
115
116        let parts: Vec<&str> = line.splitn(9, ' ').collect();
117        if parts.len() < 9 {
118            return None;
119        }
120
121        let timestamp = parts[0];
122        let timestamp_nanos = parse_timestamp(timestamp)?;
123
124        let mut user = String::new();
125        let mut db = String::new();
126        let mut client = String::new();
127        let mut node = String::new();
128        let mut duration_ms = 0.0f64;
129        let mut rows = 0usize;
130        let mut status = "OK";
131        let mut query = String::new();
132
133        for part in &parts[1..] {
134            if let Some(val) = part.strip_prefix("user=") {
135                user = val.to_string();
136            } else if let Some(val) = part.strip_prefix("db=") {
137                db = val.to_string();
138            } else if let Some(val) = part.strip_prefix("client=") {
139                client = val.to_string();
140            } else if let Some(val) = part.strip_prefix("node=") {
141                node = val.to_string();
142            } else if let Some(val) = part.strip_prefix("duration=") {
143                if let Some(ms_str) = val.strip_suffix("ms") {
144                    duration_ms = ms_str.parse().unwrap_or(0.0);
145                }
146            } else if let Some(val) = part.strip_prefix("rows=") {
147                rows = val.parse().unwrap_or(0);
148            } else if let Some(val) = part.strip_prefix("status=") {
149                status = val;
150            } else if let Some(val) = part.strip_prefix("query=") {
151                query = val.to_string();
152            }
153        }
154
155        let error = if status == "ERROR" {
156            Some("Query failed".to_string())
157        } else {
158            None
159        };
160
161        Some(Self {
162            timestamp_nanos,
163            duration: Duration::from_secs_f64(duration_ms / 1000.0),
164            query,
165            fingerprint: String::new(),
166            fingerprint_hash: 0,
167            user,
168            database: db,
169            client_ip: client,
170            node,
171            rows,
172            error,
173            session_id: None,
174            workflow_id: None,
175        })
176    }
177}
178
179/// Slow query log
180pub struct SlowQueryLog {
181    /// Configuration
182    config: SlowQueryConfig,
183
184    /// Recent entries (in-memory)
185    recent: RwLock<VecDeque<SlowQueryEntry>>,
186
187    /// Log file writer
188    file_writer: RwLock<Option<File>>,
189
190    /// Total logged count
191    logged_count: AtomicU64,
192}
193
194impl SlowQueryLog {
195    /// Create new slow query log
196    pub fn new(config: SlowQueryConfig) -> Self {
197        let file_writer = if let Some(ref path) = config.log_file {
198            match OpenOptions::new()
199                .create(true)
200                .append(true)
201                .open(path)
202            {
203                Ok(file) => Some(file),
204                Err(e) => {
205                    eprintln!("Failed to open slow query log file {:?}: {}", path, e);
206                    None
207                }
208            }
209        } else {
210            None
211        };
212
213        Self {
214            config,
215            recent: RwLock::new(VecDeque::new()),
216            file_writer: RwLock::new(file_writer),
217            logged_count: AtomicU64::new(0),
218        }
219    }
220
221    /// Log query if it exceeds threshold
222    pub fn log_if_slow(&self, execution: &QueryExecution, fingerprint: &QueryFingerprint) {
223        if !self.config.enabled {
224            return;
225        }
226
227        if execution.duration < self.config.threshold {
228            return;
229        }
230
231        let entry = SlowQueryEntry::from_execution(
232            execution,
233            fingerprint,
234            self.config.max_query_length,
235        );
236
237        self.log_entry(entry);
238    }
239
240    /// Log an entry directly
241    pub fn log_entry(&self, entry: SlowQueryEntry) {
242        self.logged_count.fetch_add(1, Ordering::Relaxed);
243
244        // Add to recent entries
245        {
246            let mut recent = self.recent.write();
247            recent.push_back(entry.clone());
248
249            // Trim if exceeding max
250            while recent.len() > self.config.max_recent_entries {
251                recent.pop_front();
252            }
253        }
254
255        // Write to file if configured
256        if let Some(ref mut file) = *self.file_writer.write() {
257            let line = entry.format_log_line();
258            if let Err(e) = writeln!(file, "{}", line) {
259                eprintln!("Failed to write slow query log: {}", e);
260            }
261        }
262    }
263
264    /// Get recent slow queries
265    pub fn recent(&self, limit: usize) -> Vec<SlowQueryEntry> {
266        let recent = self.recent.read();
267        recent
268            .iter()
269            .rev()
270            .take(limit)
271            .cloned()
272            .collect()
273    }
274
275    /// Get all recent entries
276    pub fn all_recent(&self) -> Vec<SlowQueryEntry> {
277        self.recent.read().iter().cloned().collect()
278    }
279
280    /// Get count of logged slow queries
281    pub fn count(&self) -> u64 {
282        self.logged_count.load(Ordering::Relaxed)
283    }
284
285    /// Get threshold
286    pub fn threshold(&self) -> Duration {
287        self.config.threshold
288    }
289
290    /// Clear recent entries
291    pub fn clear(&self) {
292        self.recent.write().clear();
293    }
294
295    /// Check if enabled
296    pub fn is_enabled(&self) -> bool {
297        self.config.enabled
298    }
299}
300
/// Reads entries back from a slow query log file written by [`SlowQueryLog`].
pub struct SlowQueryReader {
    /// Location of the log file to read.
    path: PathBuf,
}
306
307impl SlowQueryReader {
308    /// Create reader for log file
309    pub fn new(path: impl Into<PathBuf>) -> Self {
310        Self { path: path.into() }
311    }
312
313    /// Read all entries from file
314    pub fn read_all(&self) -> std::io::Result<Vec<SlowQueryEntry>> {
315        let file = File::open(&self.path)?;
316        let reader = BufReader::new(file);
317        let mut entries = Vec::new();
318
319        for line in reader.lines() {
320            let line = line?;
321            if let Some(entry) = SlowQueryEntry::parse_log_line(&line) {
322                entries.push(entry);
323            }
324        }
325
326        Ok(entries)
327    }
328
329    /// Read entries within time range
330    pub fn read_range(
331        &self,
332        start_nanos: u64,
333        end_nanos: u64,
334    ) -> std::io::Result<Vec<SlowQueryEntry>> {
335        let all = self.read_all()?;
336        Ok(all
337            .into_iter()
338            .filter(|e| e.timestamp_nanos >= start_nanos && e.timestamp_nanos <= end_nanos)
339            .collect())
340    }
341
342    /// Read entries slower than threshold
343    pub fn read_slower_than(&self, threshold: Duration) -> std::io::Result<Vec<SlowQueryEntry>> {
344        let all = self.read_all()?;
345        Ok(all
346            .into_iter()
347            .filter(|e| e.duration > threshold)
348            .collect())
349    }
350
351    /// Read last N entries
352    pub fn read_last(&self, n: usize) -> std::io::Result<Vec<SlowQueryEntry>> {
353        let all = self.read_all()?;
354        Ok(all.into_iter().rev().take(n).collect())
355    }
356}
357
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch. The
/// `u128 -> u64` cast truncates, which only matters past roughly the year 2554.
fn now_nanos() -> u64 {
    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    }
}
364
/// Render `nanos` (since the Unix epoch) as `"<seconds>:<milliseconds>"`.
///
/// Milliseconds are zero-padded to three digits, e.g. `1704067200:000`, and
/// sub-millisecond precision is dropped. This is the format `parse_timestamp`
/// reads back.
fn format_timestamp(nanos: u64) -> String {
    const NANOS_PER_SEC: u64 = 1_000_000_000;
    const NANOS_PER_MILLI: u64 = 1_000_000;

    let whole_secs = nanos / NANOS_PER_SEC;
    let millis = (nanos % NANOS_PER_SEC) / NANOS_PER_MILLI;
    format!("{}:{:03}", whole_secs, millis)
}
373
/// Parse a `"<seconds>:<milliseconds>"` timestamp (as written by
/// `format_timestamp`) into nanoseconds since the Unix epoch.
///
/// Only the first two `:`-separated fields are used; any further fields are
/// ignored. Returns `None` if either field is missing or is not an unsigned
/// integer. It also returns `None` if the result does not fit in a `u64`.
/// Checked arithmetic is used so out-of-range values from a corrupt log line
/// cannot overflow (a panic in debug builds, silent wrap-around in release).
fn parse_timestamp(s: &str) -> Option<u64> {
    let mut fields = s.split(':');
    let secs: u64 = fields.next()?.parse().ok()?;
    let ms: u64 = fields.next()?.parse().ok()?;
    secs.checked_mul(1_000_000_000)?
        .checked_add(ms.checked_mul(1_000_000)?)
}
384
#[cfg(test)]
mod tests {
    use super::super::fingerprinter::QueryFingerprinter;
    use super::*;

    /// Enabled config with a 100 ms threshold, no log file, and the given limits.
    fn test_config(max_query_length: usize, max_recent_entries: usize) -> SlowQueryConfig {
        SlowQueryConfig {
            enabled: true,
            threshold: Duration::from_millis(100),
            log_file: None,
            log_parameters: false,
            max_query_length,
            max_recent_entries,
        }
    }

    /// Feed `sql` to `log` as if it had taken `millis` milliseconds to run.
    fn record(log: &SlowQueryLog, sql: &str, millis: u64) {
        let exec = QueryExecution::new(sql, Duration::from_millis(millis));
        let fingerprint = QueryFingerprinter::new().fingerprint(sql);
        log.log_if_slow(&exec, &fingerprint);
    }

    #[test]
    fn test_slow_query_entry_format() {
        let entry = SlowQueryEntry {
            timestamp_nanos: 1704067200_000_000_000,
            duration: Duration::from_millis(1500),
            query: "SELECT * FROM users WHERE id = 1".to_string(),
            fingerprint: "select * from users where id = ?".to_string(),
            fingerprint_hash: 12345,
            user: "alice".to_string(),
            database: "mydb".to_string(),
            client_ip: "192.168.1.100".to_string(),
            node: "primary".to_string(),
            rows: 1,
            error: None,
            session_id: None,
            workflow_id: None,
        };

        let line = entry.format_log_line();
        for needle in ["user=alice", "db=mydb", "duration=1500.000ms", "status=OK"] {
            assert!(line.contains(needle), "missing {:?} in {}", needle, line);
        }
    }

    #[test]
    fn test_slow_query_log_enabled() {
        let log = SlowQueryLog::new(test_config(1000, 10));
        assert!(log.is_enabled());
        assert_eq!(log.threshold(), Duration::from_millis(100));
    }

    #[test]
    fn test_slow_query_log_threshold() {
        let log = SlowQueryLog::new(test_config(1000, 10));

        // Under the threshold: ignored.
        record(&log, "SELECT 1", 50);
        assert_eq!(log.count(), 0);

        // Over the threshold: logged.
        record(&log, "SELECT * FROM users", 150);
        assert_eq!(log.count(), 1);
    }

    #[test]
    fn test_slow_query_log_recent() {
        let log = SlowQueryLog::new(test_config(1000, 5));

        for i in 0..10 {
            record(&log, &format!("SELECT * FROM table_{}", i), 150);
        }

        // Only the newest five survive, most recent first.
        let recent = log.recent(10);
        assert_eq!(recent.len(), 5);
        assert!(recent[0].query.contains("table_9"));
    }

    #[test]
    fn test_slow_query_entry_parse() {
        let line = "1704067200:000 user=alice db=mydb client=127.0.0.1 node=primary duration=1500.000ms rows=1 status=OK query=SELECT 1";
        let entry = SlowQueryEntry::parse_log_line(line).expect("line should parse");

        assert_eq!(entry.user, "alice");
        assert_eq!(entry.database, "mydb");
        assert_eq!(entry.rows, 1);
    }

    #[test]
    fn test_query_truncation() {
        let log = SlowQueryLog::new(test_config(20, 10));

        record(
            &log,
            "SELECT * FROM users WHERE name = 'this is a very long query'",
            150,
        );

        let recent = log.recent(1);
        assert_eq!(recent.len(), 1);
        // At most 20 bytes of query text plus the "..." marker.
        assert!(recent[0].query.len() <= 23);
        assert!(recent[0].query.ends_with("..."));
    }
}