heliosdb_proxy/analytics/
slow_log.rs1use std::collections::VecDeque;
6use std::fs::{File, OpenOptions};
7use std::io::{BufRead, BufReader, Write};
8use std::path::PathBuf;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::time::{Duration, SystemTime};
11
12use parking_lot::RwLock;
13
14use super::config::SlowQueryConfig;
15use super::fingerprinter::QueryFingerprint;
16use super::statistics::QueryExecution;
17
18#[derive(Debug, Clone)]
20pub struct SlowQueryEntry {
21 pub timestamp_nanos: u64,
23
24 pub duration: Duration,
26
27 pub query: String,
29
30 pub fingerprint: String,
32
33 pub fingerprint_hash: u64,
35
36 pub user: String,
38
39 pub database: String,
41
42 pub client_ip: String,
44
45 pub node: String,
47
48 pub rows: usize,
50
51 pub error: Option<String>,
53
54 pub session_id: Option<String>,
56
57 pub workflow_id: Option<String>,
59}
60
61impl SlowQueryEntry {
62 pub fn from_execution(
64 execution: &QueryExecution,
65 fingerprint: &QueryFingerprint,
66 max_query_length: usize,
67 ) -> Self {
68 let query = if execution.query.len() > max_query_length {
69 format!("{}...", &execution.query[..max_query_length])
70 } else {
71 execution.query.clone()
72 };
73
74 Self {
75 timestamp_nanos: now_nanos(),
76 duration: execution.duration,
77 query,
78 fingerprint: fingerprint.normalized.clone(),
79 fingerprint_hash: fingerprint.hash,
80 user: execution.user.clone(),
81 database: execution.database.clone(),
82 client_ip: execution.client_ip.clone(),
83 node: execution.node.clone(),
84 rows: execution.rows,
85 error: execution.error.clone(),
86 session_id: execution.session_id.clone(),
87 workflow_id: execution.workflow_id.clone(),
88 }
89 }
90
91 pub fn format_log_line(&self) -> String {
93 let timestamp = format_timestamp(self.timestamp_nanos);
94 let duration_ms = self.duration.as_secs_f64() * 1000.0;
95 let status = if self.error.is_some() { "ERROR" } else { "OK" };
96
97 format!(
98 "{} user={} db={} client={} node={} duration={:.3}ms rows={} status={} query={}",
99 timestamp,
100 self.user,
101 self.database,
102 self.client_ip,
103 self.node,
104 duration_ms,
105 self.rows,
106 status,
107 self.query.replace('\n', " ")
108 )
109 }
110
111 pub fn parse_log_line(line: &str) -> Option<Self> {
113 let parts: Vec<&str> = line.splitn(9, ' ').collect();
117 if parts.len() < 9 {
118 return None;
119 }
120
121 let timestamp = parts[0];
122 let timestamp_nanos = parse_timestamp(timestamp)?;
123
124 let mut user = String::new();
125 let mut db = String::new();
126 let mut client = String::new();
127 let mut node = String::new();
128 let mut duration_ms = 0.0f64;
129 let mut rows = 0usize;
130 let mut status = "OK";
131 let mut query = String::new();
132
133 for part in &parts[1..] {
134 if let Some(val) = part.strip_prefix("user=") {
135 user = val.to_string();
136 } else if let Some(val) = part.strip_prefix("db=") {
137 db = val.to_string();
138 } else if let Some(val) = part.strip_prefix("client=") {
139 client = val.to_string();
140 } else if let Some(val) = part.strip_prefix("node=") {
141 node = val.to_string();
142 } else if let Some(val) = part.strip_prefix("duration=") {
143 if let Some(ms_str) = val.strip_suffix("ms") {
144 duration_ms = ms_str.parse().unwrap_or(0.0);
145 }
146 } else if let Some(val) = part.strip_prefix("rows=") {
147 rows = val.parse().unwrap_or(0);
148 } else if let Some(val) = part.strip_prefix("status=") {
149 status = val;
150 } else if let Some(val) = part.strip_prefix("query=") {
151 query = val.to_string();
152 }
153 }
154
155 let error = if status == "ERROR" {
156 Some("Query failed".to_string())
157 } else {
158 None
159 };
160
161 Some(Self {
162 timestamp_nanos,
163 duration: Duration::from_secs_f64(duration_ms / 1000.0),
164 query,
165 fingerprint: String::new(),
166 fingerprint_hash: 0,
167 user,
168 database: db,
169 client_ip: client,
170 node,
171 rows,
172 error,
173 session_id: None,
174 workflow_id: None,
175 })
176 }
177}
178
179pub struct SlowQueryLog {
181 config: SlowQueryConfig,
183
184 recent: RwLock<VecDeque<SlowQueryEntry>>,
186
187 file_writer: RwLock<Option<File>>,
189
190 logged_count: AtomicU64,
192}
193
194impl SlowQueryLog {
195 pub fn new(config: SlowQueryConfig) -> Self {
197 let file_writer = if let Some(ref path) = config.log_file {
198 match OpenOptions::new().create(true).append(true).open(path) {
199 Ok(file) => Some(file),
200 Err(e) => {
201 eprintln!("Failed to open slow query log file {:?}: {}", path, e);
202 None
203 }
204 }
205 } else {
206 None
207 };
208
209 Self {
210 config,
211 recent: RwLock::new(VecDeque::new()),
212 file_writer: RwLock::new(file_writer),
213 logged_count: AtomicU64::new(0),
214 }
215 }
216
217 pub fn log_if_slow(&self, execution: &QueryExecution, fingerprint: &QueryFingerprint) {
219 if !self.config.enabled {
220 return;
221 }
222
223 if execution.duration < self.config.threshold {
224 return;
225 }
226
227 let entry =
228 SlowQueryEntry::from_execution(execution, fingerprint, self.config.max_query_length);
229
230 self.log_entry(entry);
231 }
232
233 pub fn log_entry(&self, entry: SlowQueryEntry) {
235 self.logged_count.fetch_add(1, Ordering::Relaxed);
236
237 {
239 let mut recent = self.recent.write();
240 recent.push_back(entry.clone());
241
242 while recent.len() > self.config.max_recent_entries {
244 recent.pop_front();
245 }
246 }
247
248 if let Some(ref mut file) = *self.file_writer.write() {
250 let line = entry.format_log_line();
251 if let Err(e) = writeln!(file, "{}", line) {
252 eprintln!("Failed to write slow query log: {}", e);
253 }
254 }
255 }
256
257 pub fn recent(&self, limit: usize) -> Vec<SlowQueryEntry> {
259 let recent = self.recent.read();
260 recent.iter().rev().take(limit).cloned().collect()
261 }
262
263 pub fn all_recent(&self) -> Vec<SlowQueryEntry> {
265 self.recent.read().iter().cloned().collect()
266 }
267
268 pub fn count(&self) -> u64 {
270 self.logged_count.load(Ordering::Relaxed)
271 }
272
273 pub fn threshold(&self) -> Duration {
275 self.config.threshold
276 }
277
278 pub fn clear(&self) {
280 self.recent.write().clear();
281 }
282
283 pub fn is_enabled(&self) -> bool {
285 self.config.enabled
286 }
287}
288
289pub struct SlowQueryReader {
291 path: PathBuf,
293}
294
295impl SlowQueryReader {
296 pub fn new(path: impl Into<PathBuf>) -> Self {
298 Self { path: path.into() }
299 }
300
301 pub fn read_all(&self) -> std::io::Result<Vec<SlowQueryEntry>> {
303 let file = File::open(&self.path)?;
304 let reader = BufReader::new(file);
305 let mut entries = Vec::new();
306
307 for line in reader.lines() {
308 let line = line?;
309 if let Some(entry) = SlowQueryEntry::parse_log_line(&line) {
310 entries.push(entry);
311 }
312 }
313
314 Ok(entries)
315 }
316
317 pub fn read_range(
319 &self,
320 start_nanos: u64,
321 end_nanos: u64,
322 ) -> std::io::Result<Vec<SlowQueryEntry>> {
323 let all = self.read_all()?;
324 Ok(all
325 .into_iter()
326 .filter(|e| e.timestamp_nanos >= start_nanos && e.timestamp_nanos <= end_nanos)
327 .collect())
328 }
329
330 pub fn read_slower_than(&self, threshold: Duration) -> std::io::Result<Vec<SlowQueryEntry>> {
332 let all = self.read_all()?;
333 Ok(all.into_iter().filter(|e| e.duration > threshold).collect())
334 }
335
336 pub fn read_last(&self, n: usize) -> std::io::Result<Vec<SlowQueryEntry>> {
338 let all = self.read_all()?;
339 Ok(all.into_iter().rev().take(n).collect())
340 }
341}
342
343fn now_nanos() -> u64 {
344 SystemTime::now()
345 .duration_since(SystemTime::UNIX_EPOCH)
346 .map(|d| d.as_nanos() as u64)
347 .unwrap_or(0)
348}
349
350fn format_timestamp(nanos: u64) -> String {
351 let secs = nanos / 1_000_000_000;
353 let ms = (nanos % 1_000_000_000) / 1_000_000;
354
355 format!("{}:{:03}", secs, ms)
357}
358
359fn parse_timestamp(s: &str) -> Option<u64> {
360 let parts: Vec<&str> = s.split(':').collect();
361 if parts.len() >= 2 {
362 let secs: u64 = parts[0].parse().ok()?;
363 let ms: u64 = parts[1].parse().ok()?;
364 Some(secs * 1_000_000_000 + ms * 1_000_000)
365 } else {
366 None
367 }
368}
369
370#[cfg(test)]
371mod tests {
372 use super::*;
373
374 #[test]
375 fn test_slow_query_entry_format() {
376 let entry = SlowQueryEntry {
377 timestamp_nanos: 1704067200_000_000_000,
378 duration: Duration::from_millis(1500),
379 query: "SELECT * FROM users WHERE id = 1".to_string(),
380 fingerprint: "select * from users where id = ?".to_string(),
381 fingerprint_hash: 12345,
382 user: "alice".to_string(),
383 database: "mydb".to_string(),
384 client_ip: "192.168.1.100".to_string(),
385 node: "primary".to_string(),
386 rows: 1,
387 error: None,
388 session_id: None,
389 workflow_id: None,
390 };
391
392 let line = entry.format_log_line();
393 assert!(line.contains("user=alice"));
394 assert!(line.contains("db=mydb"));
395 assert!(line.contains("duration=1500.000ms"));
396 assert!(line.contains("status=OK"));
397 }
398
399 #[test]
400 fn test_slow_query_log_enabled() {
401 let config = SlowQueryConfig {
402 enabled: true,
403 threshold: Duration::from_millis(100),
404 log_file: None,
405 log_parameters: false,
406 max_query_length: 1000,
407 max_recent_entries: 10,
408 };
409
410 let log = SlowQueryLog::new(config);
411 assert!(log.is_enabled());
412 assert_eq!(log.threshold(), Duration::from_millis(100));
413 }
414
415 #[test]
416 fn test_slow_query_log_threshold() {
417 let config = SlowQueryConfig {
418 enabled: true,
419 threshold: Duration::from_millis(100),
420 log_file: None,
421 log_parameters: false,
422 max_query_length: 1000,
423 max_recent_entries: 10,
424 };
425
426 let log = SlowQueryLog::new(config);
427
428 let fast_exec = QueryExecution::new("SELECT 1", Duration::from_millis(50));
430 let fingerprint =
431 super::super::fingerprinter::QueryFingerprinter::new().fingerprint("SELECT 1");
432 log.log_if_slow(&fast_exec, &fingerprint);
433 assert_eq!(log.count(), 0);
434
435 let slow_exec = QueryExecution::new("SELECT * FROM users", Duration::from_millis(150));
437 let fingerprint = super::super::fingerprinter::QueryFingerprinter::new()
438 .fingerprint("SELECT * FROM users");
439 log.log_if_slow(&slow_exec, &fingerprint);
440 assert_eq!(log.count(), 1);
441 }
442
443 #[test]
444 fn test_slow_query_log_recent() {
445 let config = SlowQueryConfig {
446 enabled: true,
447 threshold: Duration::from_millis(100),
448 log_file: None,
449 log_parameters: false,
450 max_query_length: 1000,
451 max_recent_entries: 5,
452 };
453
454 let log = SlowQueryLog::new(config);
455 let fp = super::super::fingerprinter::QueryFingerprinter::new();
456
457 for i in 0..10 {
459 let exec = QueryExecution::new(
460 format!("SELECT * FROM table_{}", i),
461 Duration::from_millis(150),
462 );
463 let fingerprint = fp.fingerprint(&exec.query);
464 log.log_if_slow(&exec, &fingerprint);
465 }
466
467 let recent = log.recent(10);
469 assert_eq!(recent.len(), 5);
470 assert!(recent[0].query.contains("table_9")); }
472
473 #[test]
474 fn test_slow_query_entry_parse() {
475 let line = "1704067200:000 user=alice db=mydb client=127.0.0.1 node=primary duration=1500.000ms rows=1 status=OK query=SELECT 1";
476 let entry = SlowQueryEntry::parse_log_line(line);
477
478 assert!(entry.is_some());
479 let entry = entry.unwrap();
480 assert_eq!(entry.user, "alice");
481 assert_eq!(entry.database, "mydb");
482 assert_eq!(entry.rows, 1);
483 }
484
485 #[test]
486 fn test_query_truncation() {
487 let config = SlowQueryConfig {
488 enabled: true,
489 threshold: Duration::from_millis(100),
490 log_file: None,
491 log_parameters: false,
492 max_query_length: 20,
493 max_recent_entries: 10,
494 };
495
496 let log = SlowQueryLog::new(config);
497 let fp = super::super::fingerprinter::QueryFingerprinter::new();
498
499 let long_query = "SELECT * FROM users WHERE name = 'this is a very long query'";
500 let exec = QueryExecution::new(long_query, Duration::from_millis(150));
501 let fingerprint = fp.fingerprint(long_query);
502 log.log_if_slow(&exec, &fingerprint);
503
504 let recent = log.recent(1);
505 assert_eq!(recent.len(), 1);
506 assert!(recent[0].query.len() <= 23); assert!(recent[0].query.ends_with("..."));
508 }
509}