heliosdb_proxy/analytics/
slow_log.rs1use std::collections::VecDeque;
6use std::fs::{File, OpenOptions};
7use std::io::{BufRead, BufReader, Write};
8use std::path::PathBuf;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::time::{Duration, SystemTime};
11
12use parking_lot::RwLock;
13
14use super::config::SlowQueryConfig;
15use super::fingerprinter::QueryFingerprint;
16use super::statistics::QueryExecution;
17
/// One recorded slow query, as kept in memory and written to the slow-query log.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SlowQueryEntry {
    /// Wall-clock time the entry was created, in nanoseconds since the Unix epoch.
    pub timestamp_nanos: u64,

    /// How long the query took to execute.
    pub duration: Duration,

    /// Query text, possibly truncated (with a `"..."` suffix) to the configured limit.
    pub query: String,

    /// Normalized query text from the fingerprinter (empty when parsed back from a log line).
    pub fingerprint: String,

    /// Hash of the normalized query (0 when parsed back from a log line).
    pub fingerprint_hash: u64,

    /// User that issued the query.
    pub user: String,

    /// Database the query ran against.
    pub database: String,

    /// Client address as reported by the execution record.
    pub client_ip: String,

    /// Backend node that executed the query.
    pub node: String,

    /// Row count reported for the execution.
    pub rows: usize,

    /// Error message if the query failed, `None` on success.
    pub error: Option<String>,

    /// Session identifier, if one was attached to the execution.
    pub session_id: Option<String>,

    /// Workflow identifier, if one was attached to the execution.
    pub workflow_id: Option<String>,
}
60
61impl SlowQueryEntry {
62 pub fn from_execution(
64 execution: &QueryExecution,
65 fingerprint: &QueryFingerprint,
66 max_query_length: usize,
67 ) -> Self {
68 let query = if execution.query.len() > max_query_length {
69 format!("{}...", &execution.query[..max_query_length])
70 } else {
71 execution.query.clone()
72 };
73
74 Self {
75 timestamp_nanos: now_nanos(),
76 duration: execution.duration,
77 query,
78 fingerprint: fingerprint.normalized.clone(),
79 fingerprint_hash: fingerprint.hash,
80 user: execution.user.clone(),
81 database: execution.database.clone(),
82 client_ip: execution.client_ip.clone(),
83 node: execution.node.clone(),
84 rows: execution.rows,
85 error: execution.error.clone(),
86 session_id: execution.session_id.clone(),
87 workflow_id: execution.workflow_id.clone(),
88 }
89 }
90
91 pub fn format_log_line(&self) -> String {
93 let timestamp = format_timestamp(self.timestamp_nanos);
94 let duration_ms = self.duration.as_secs_f64() * 1000.0;
95 let status = if self.error.is_some() { "ERROR" } else { "OK" };
96
97 format!(
98 "{} user={} db={} client={} node={} duration={:.3}ms rows={} status={} query={}",
99 timestamp,
100 self.user,
101 self.database,
102 self.client_ip,
103 self.node,
104 duration_ms,
105 self.rows,
106 status,
107 self.query.replace('\n', " ")
108 )
109 }
110
111 pub fn parse_log_line(line: &str) -> Option<Self> {
113 let parts: Vec<&str> = line.splitn(9, ' ').collect();
117 if parts.len() < 9 {
118 return None;
119 }
120
121 let timestamp = parts[0];
122 let timestamp_nanos = parse_timestamp(timestamp)?;
123
124 let mut user = String::new();
125 let mut db = String::new();
126 let mut client = String::new();
127 let mut node = String::new();
128 let mut duration_ms = 0.0f64;
129 let mut rows = 0usize;
130 let mut status = "OK";
131 let mut query = String::new();
132
133 for part in &parts[1..] {
134 if let Some(val) = part.strip_prefix("user=") {
135 user = val.to_string();
136 } else if let Some(val) = part.strip_prefix("db=") {
137 db = val.to_string();
138 } else if let Some(val) = part.strip_prefix("client=") {
139 client = val.to_string();
140 } else if let Some(val) = part.strip_prefix("node=") {
141 node = val.to_string();
142 } else if let Some(val) = part.strip_prefix("duration=") {
143 if let Some(ms_str) = val.strip_suffix("ms") {
144 duration_ms = ms_str.parse().unwrap_or(0.0);
145 }
146 } else if let Some(val) = part.strip_prefix("rows=") {
147 rows = val.parse().unwrap_or(0);
148 } else if let Some(val) = part.strip_prefix("status=") {
149 status = val;
150 } else if let Some(val) = part.strip_prefix("query=") {
151 query = val.to_string();
152 }
153 }
154
155 let error = if status == "ERROR" {
156 Some("Query failed".to_string())
157 } else {
158 None
159 };
160
161 Some(Self {
162 timestamp_nanos,
163 duration: Duration::from_secs_f64(duration_ms / 1000.0),
164 query,
165 fingerprint: String::new(),
166 fingerprint_hash: 0,
167 user,
168 database: db,
169 client_ip: client,
170 node,
171 rows,
172 error,
173 session_id: None,
174 workflow_id: None,
175 })
176 }
177}
178
179pub struct SlowQueryLog {
181 config: SlowQueryConfig,
183
184 recent: RwLock<VecDeque<SlowQueryEntry>>,
186
187 file_writer: RwLock<Option<File>>,
189
190 logged_count: AtomicU64,
192}
193
194impl SlowQueryLog {
195 pub fn new(config: SlowQueryConfig) -> Self {
197 let file_writer = if let Some(ref path) = config.log_file {
198 match OpenOptions::new()
199 .create(true)
200 .append(true)
201 .open(path)
202 {
203 Ok(file) => Some(file),
204 Err(e) => {
205 eprintln!("Failed to open slow query log file {:?}: {}", path, e);
206 None
207 }
208 }
209 } else {
210 None
211 };
212
213 Self {
214 config,
215 recent: RwLock::new(VecDeque::new()),
216 file_writer: RwLock::new(file_writer),
217 logged_count: AtomicU64::new(0),
218 }
219 }
220
221 pub fn log_if_slow(&self, execution: &QueryExecution, fingerprint: &QueryFingerprint) {
223 if !self.config.enabled {
224 return;
225 }
226
227 if execution.duration < self.config.threshold {
228 return;
229 }
230
231 let entry = SlowQueryEntry::from_execution(
232 execution,
233 fingerprint,
234 self.config.max_query_length,
235 );
236
237 self.log_entry(entry);
238 }
239
240 pub fn log_entry(&self, entry: SlowQueryEntry) {
242 self.logged_count.fetch_add(1, Ordering::Relaxed);
243
244 {
246 let mut recent = self.recent.write();
247 recent.push_back(entry.clone());
248
249 while recent.len() > self.config.max_recent_entries {
251 recent.pop_front();
252 }
253 }
254
255 if let Some(ref mut file) = *self.file_writer.write() {
257 let line = entry.format_log_line();
258 if let Err(e) = writeln!(file, "{}", line) {
259 eprintln!("Failed to write slow query log: {}", e);
260 }
261 }
262 }
263
264 pub fn recent(&self, limit: usize) -> Vec<SlowQueryEntry> {
266 let recent = self.recent.read();
267 recent
268 .iter()
269 .rev()
270 .take(limit)
271 .cloned()
272 .collect()
273 }
274
275 pub fn all_recent(&self) -> Vec<SlowQueryEntry> {
277 self.recent.read().iter().cloned().collect()
278 }
279
280 pub fn count(&self) -> u64 {
282 self.logged_count.load(Ordering::Relaxed)
283 }
284
285 pub fn threshold(&self) -> Duration {
287 self.config.threshold
288 }
289
290 pub fn clear(&self) {
292 self.recent.write().clear();
293 }
294
295 pub fn is_enabled(&self) -> bool {
297 self.config.enabled
298 }
299}
300
301pub struct SlowQueryReader {
303 path: PathBuf,
305}
306
307impl SlowQueryReader {
308 pub fn new(path: impl Into<PathBuf>) -> Self {
310 Self { path: path.into() }
311 }
312
313 pub fn read_all(&self) -> std::io::Result<Vec<SlowQueryEntry>> {
315 let file = File::open(&self.path)?;
316 let reader = BufReader::new(file);
317 let mut entries = Vec::new();
318
319 for line in reader.lines() {
320 let line = line?;
321 if let Some(entry) = SlowQueryEntry::parse_log_line(&line) {
322 entries.push(entry);
323 }
324 }
325
326 Ok(entries)
327 }
328
329 pub fn read_range(
331 &self,
332 start_nanos: u64,
333 end_nanos: u64,
334 ) -> std::io::Result<Vec<SlowQueryEntry>> {
335 let all = self.read_all()?;
336 Ok(all
337 .into_iter()
338 .filter(|e| e.timestamp_nanos >= start_nanos && e.timestamp_nanos <= end_nanos)
339 .collect())
340 }
341
342 pub fn read_slower_than(&self, threshold: Duration) -> std::io::Result<Vec<SlowQueryEntry>> {
344 let all = self.read_all()?;
345 Ok(all
346 .into_iter()
347 .filter(|e| e.duration > threshold)
348 .collect())
349 }
350
351 pub fn read_last(&self, n: usize) -> std::io::Result<Vec<SlowQueryEntry>> {
353 let all = self.read_all()?;
354 Ok(all.into_iter().rev().take(n).collect())
355 }
356}
357
/// Returns the current wall-clock time in nanoseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch. The value
/// saturates at `u64::MAX` rather than silently wrapping, which `as u64` on
/// the `u128` nanosecond count would do after about the year 2554.
fn now_nanos() -> u64 {
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map(|d| d.as_nanos().min(u128::from(u64::MAX)) as u64)
        .unwrap_or(0)
}
364
/// Renders an epoch-nanosecond timestamp as `<seconds>:<milliseconds>`.
///
/// The millisecond part is zero-padded to three digits; sub-millisecond
/// precision is dropped.
fn format_timestamp(nanos: u64) -> String {
    const NANOS_PER_SEC: u64 = 1_000_000_000;
    const NANOS_PER_MILLI: u64 = 1_000_000;

    let whole_secs = nanos / NANOS_PER_SEC;
    let millis = nanos % NANOS_PER_SEC / NANOS_PER_MILLI;
    format!("{}:{:03}", whole_secs, millis)
}
373
/// Parses a `<seconds>:<milliseconds>` timestamp into epoch nanoseconds.
///
/// Only the first two `:`-separated fields are read; any further fields are
/// ignored. Returns `None` in three cases:
/// - either field is missing or is not an unsigned integer;
/// - the seconds value is too large to express in nanoseconds as a `u64`;
/// - the combined value does not fit in a `u64`.
///
/// The last two cases previously overflowed, panicking in debug builds and
/// wrapping in release builds.
fn parse_timestamp(s: &str) -> Option<u64> {
    let mut fields = s.split(':');
    let secs: u64 = fields.next()?.parse().ok()?;
    let ms: u64 = fields.next()?.parse().ok()?;
    secs.checked_mul(1_000_000_000)?
        .checked_add(ms.checked_mul(1_000_000)?)
}
384
#[cfg(test)]
mod tests {
    use super::super::fingerprinter::QueryFingerprinter;
    use super::*;

    /// Enabled config with a 100ms threshold and no log file; only the
    /// truncation length and history size vary between tests.
    fn enabled_config(max_query_length: usize, max_recent_entries: usize) -> SlowQueryConfig {
        SlowQueryConfig {
            enabled: true,
            threshold: Duration::from_millis(100),
            log_file: None,
            log_parameters: false,
            max_query_length,
            max_recent_entries,
        }
    }

    #[test]
    fn test_slow_query_entry_format() {
        let entry = SlowQueryEntry {
            timestamp_nanos: 1704067200_000_000_000,
            duration: Duration::from_millis(1500),
            query: "SELECT * FROM users WHERE id = 1".to_string(),
            fingerprint: "select * from users where id = ?".to_string(),
            fingerprint_hash: 12345,
            user: "alice".to_string(),
            database: "mydb".to_string(),
            client_ip: "192.168.1.100".to_string(),
            node: "primary".to_string(),
            rows: 1,
            error: None,
            session_id: None,
            workflow_id: None,
        };

        let rendered = entry.format_log_line();
        for needle in ["user=alice", "db=mydb", "duration=1500.000ms", "status=OK"].iter() {
            assert!(rendered.contains(*needle));
        }
    }

    #[test]
    fn test_slow_query_log_enabled() {
        let log = SlowQueryLog::new(enabled_config(1000, 10));

        assert!(log.is_enabled());
        assert_eq!(log.threshold(), Duration::from_millis(100));
    }

    #[test]
    fn test_slow_query_log_threshold() {
        let log = SlowQueryLog::new(enabled_config(1000, 10));
        let fingerprinter = QueryFingerprinter::new();

        // Below the 100ms threshold: ignored.
        let fast = QueryExecution::new("SELECT 1", Duration::from_millis(50));
        log.log_if_slow(&fast, &fingerprinter.fingerprint("SELECT 1"));
        assert_eq!(log.count(), 0);

        // Above the threshold: recorded.
        let slow = QueryExecution::new("SELECT * FROM users", Duration::from_millis(150));
        log.log_if_slow(&slow, &fingerprinter.fingerprint("SELECT * FROM users"));
        assert_eq!(log.count(), 1);
    }

    #[test]
    fn test_slow_query_log_recent() {
        let log = SlowQueryLog::new(enabled_config(1000, 5));
        let fingerprinter = QueryFingerprinter::new();

        for idx in 0..10 {
            let exec = QueryExecution::new(
                format!("SELECT * FROM table_{}", idx),
                Duration::from_millis(150),
            );
            let fingerprint = fingerprinter.fingerprint(&exec.query);
            log.log_if_slow(&exec, &fingerprint);
        }

        // Only the newest five survive, returned newest first.
        let newest = log.recent(10);
        assert_eq!(newest.len(), 5);
        assert!(newest[0].query.contains("table_9"));
    }

    #[test]
    fn test_slow_query_entry_parse() {
        let line = "1704067200:000 user=alice db=mydb client=127.0.0.1 node=primary duration=1500.000ms rows=1 status=OK query=SELECT 1";

        let parsed = SlowQueryEntry::parse_log_line(line).expect("well-formed line should parse");
        assert_eq!(parsed.user, "alice");
        assert_eq!(parsed.database, "mydb");
        assert_eq!(parsed.rows, 1);
    }

    #[test]
    fn test_query_truncation() {
        let log = SlowQueryLog::new(enabled_config(20, 10));
        let fingerprinter = QueryFingerprinter::new();

        let long_query = "SELECT * FROM users WHERE name = 'this is a very long query'";
        let exec = QueryExecution::new(long_query, Duration::from_millis(150));
        log.log_if_slow(&exec, &fingerprinter.fingerprint(long_query));

        let latest = log.recent(1);
        assert_eq!(latest.len(), 1);
        // 20 bytes of query text plus the "..." marker.
        assert!(latest[0].query.len() <= 23);
        assert!(latest[0].query.ends_with("..."));
    }
}