1use crate::{
4 normalize_log_entries, AnalysisResult, Correlator, EventSourceKind, LogEntry, NormalizedEvent,
5 ProcessOrderCorrelator, QueryType, Result,
6};
7use chrono::{DateTime, Timelike, Utc};
8use regex::Regex;
9use serde::{Deserialize, Serialize};
10use std::cmp::Reverse;
11use std::collections::HashMap;
12
/// Summary statistics over a set of query durations.
///
/// Produced by `QueryAnalyzer::calculate_metrics`. Every duration field uses
/// the same unit as the input slice; `QueryAnalyzer::analyze_events` feeds it
/// correlated `duration_ms` values, i.e. milliseconds. All fields are zero when
/// no durations were supplied.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryMetrics {
    /// Smallest observed duration.
    pub min_duration: f64,
    /// Largest observed duration.
    pub max_duration: f64,
    /// Arithmetic mean: `total_duration / total_queries`.
    pub average_duration: f64,
    /// 95th-percentile duration.
    pub p95_duration: f64,
    /// 99th-percentile duration.
    pub p99_duration: f64,
    /// Number of durations that were summarised.
    pub total_queries: u64,
    /// Sum of all durations.
    pub total_duration: f64,
}
31
32impl Default for QueryMetrics {
33 fn default() -> Self {
34 Self {
35 min_duration: 0.0,
36 max_duration: 0.0,
37 average_duration: 0.0,
38 p95_duration: 0.0,
39 p99_duration: 0.0,
40 total_queries: 0,
41 total_duration: 0.0,
42 }
43 }
44}
45
/// Aggregate of query executions that fall into one hour-of-day bucket.
///
/// Buckets are keyed only by the hour of the execution timestamp (0-23), so
/// executions from different days that share the same hour are combined.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HourlyStats {
    /// Hour of day (0-23), as returned by chrono's `Timelike::hour`.
    pub hour: u32,
    /// Number of executions counted in this bucket.
    pub query_count: u64,
    /// `query_count` divided by the time between the first and last query event
    /// seen in this hour; stays `0.0` when that span cannot be computed.
    pub queries_per_second: f64,
    /// Sum of execution durations in this bucket.
    pub total_duration: f64,
    /// `total_duration / query_count`.
    pub average_duration: f64,
}
60
/// Aggregates query log events into an `AnalysisResult` and offers helpers for
/// query normalization, classification and error-rate computation.
pub struct QueryAnalyzer {
    /// Executions whose duration is strictly greater than this value are
    /// reported as slow (compared against correlated `duration_ms`, i.e. ms).
    slow_query_threshold: f64,
    /// Maximum number of entries kept in `AnalysisResult::slowest_queries`.
    max_slow_queries: usize,
    /// Maximum number of entries kept in `AnalysisResult::most_frequent_queries`.
    max_frequent_queries: usize,
    /// Matches `$N` bind-parameter placeholders; replaced with `?`.
    literal_regex: Regex,
    /// Matches standalone numeric literals; replaced with `N`.
    numeric_regex: Regex,
    /// Matches single-quoted string literals; replaced with `S`.
    string_regex: Regex,
}
76
77impl QueryAnalyzer {
78 pub fn new() -> Self {
80 Self {
81 slow_query_threshold: 1000.0, max_slow_queries: 10,
83 max_frequent_queries: 20,
84 literal_regex: Regex::new(r"\$(\d+)").unwrap(),
85 numeric_regex: Regex::new(r"\b\d+(?:\.\d+)?\b").unwrap(),
86 string_regex: Regex::new(r"'[^']*'").unwrap(),
87 }
88 }
89
90 pub fn with_settings(
92 slow_query_threshold: f64,
93 max_slow_queries: usize,
94 max_frequent_queries: usize,
95 ) -> Self {
96 Self {
97 slow_query_threshold,
98 max_slow_queries,
99 max_frequent_queries,
100 literal_regex: Regex::new(r"\$(\d+)").unwrap(),
101 numeric_regex: Regex::new(r"\b\d+(?:\.\d+)?\b").unwrap(),
102 string_regex: Regex::new(r"'[^']*'").unwrap(),
103 }
104 }
105
106 pub fn slow_query_threshold(&self) -> f64 {
108 self.slow_query_threshold
109 }
110
111 pub fn max_slow_queries(&self) -> usize {
113 self.max_slow_queries
114 }
115
116 pub fn max_frequent_queries(&self) -> usize {
118 self.max_frequent_queries
119 }
120
121 pub fn analyze(&self, entries: &[LogEntry]) -> Result<AnalysisResult> {
123 let events = normalize_log_entries(entries, EventSourceKind::Stderr);
124 self.analyze_events(&events)
125 }
126
127 pub fn analyze_events(&self, events: &[NormalizedEvent]) -> Result<AnalysisResult> {
129 if events.is_empty() {
130 return Ok(AnalysisResult::new());
131 }
132
133 let mut result = AnalysisResult::new();
134 let mut query_durations = Vec::new();
135 let mut query_counts = HashMap::new();
136 let mut query_type_counts = HashMap::new();
137 let mut hourly_stats = HashMap::new();
138 let mut slow_queries = Vec::new();
139 let mut error_count = 0;
140 let mut connection_count = 0;
141
142 let executions = ProcessOrderCorrelator.correlate(events);
143 for execution in &executions {
144 let duration = execution.duration_ms.unwrap_or(0.0);
145 let normalized_concat = Some(execution.query_family.normalized_sql.clone());
146 for query in &execution.queries {
147 let normalized_query = query.normalized_query.clone();
148 let query_type = &query.query_type;
149
150 *query_counts.entry(normalized_query).or_insert(0) += 1;
152 *query_type_counts.entry(query_type).or_insert(0) += 1;
153 }
154
155 if let Some(ref n) = normalized_concat {
157 if duration > self.slow_query_threshold {
158 slow_queries.push((n.clone(), duration));
159 }
160 }
161
162 let hour = execution.timestamp.hour();
164 let hourly = hourly_stats.entry(hour).or_insert_with(|| HourlyStats {
165 hour,
166 query_count: 0,
167 queries_per_second: 0.0,
168 total_duration: 0.0,
169 average_duration: 0.0,
170 });
171 hourly.query_count += 1;
172 hourly.total_duration += duration;
173 result.total_queries += 1;
174 query_durations.push(duration);
175 result.total_duration += duration;
176 }
177
178 for event in events {
179 if event.is_error() {
180 error_count += 1;
181 } else if event.message().to_lowercase().contains("connection") {
182 connection_count += 1;
183 }
184 }
185
186 let metrics = self.calculate_metrics(&query_durations);
188 result.average_duration = metrics.average_duration;
189 result.p95_duration = metrics.p95_duration;
190 result.p99_duration = metrics.p99_duration;
191
192 result.error_count = error_count;
194 result.connection_count = connection_count;
195
196 slow_queries.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
198 result.slowest_queries = slow_queries
199 .into_iter()
200 .take(self.max_slow_queries)
201 .collect();
202
203 let mut frequent_queries: Vec<_> = query_counts.into_iter().collect();
205 frequent_queries.sort_by_key(|query| Reverse(query.1));
206 result.most_frequent_queries = frequent_queries
207 .into_iter()
208 .take(self.max_frequent_queries)
209 .collect();
210
211 result.query_types = query_type_counts
213 .into_iter()
214 .map(|(query_type, count)| (query_type.to_string(), count))
215 .collect();
216
217 self.calculate_queries_per_second(&mut hourly_stats, events);
219
220 Ok(result)
221 }
222
223 pub fn normalize_query(&self, sql: &str) -> String {
225 let mut normalized = sql.trim().to_string();
226
227 normalized = self.literal_regex.replace_all(&normalized, "?").to_string();
229
230 normalized = self.numeric_regex.replace_all(&normalized, "N").to_string();
232
233 normalized = self.string_regex.replace_all(&normalized, "S").to_string();
235
236 normalized.split_whitespace().collect::<Vec<_>>().join(" ")
238 }
239
240 pub fn classify_query(&self, sql: &str) -> QueryType {
242 let sql_upper = sql.trim().to_uppercase();
243
244 if sql_upper.starts_with("SELECT") {
245 QueryType::Select
246 } else if sql_upper.starts_with("INSERT") {
247 QueryType::Insert
248 } else if sql_upper.starts_with("UPDATE") {
249 QueryType::Update
250 } else if sql_upper.starts_with("DELETE") {
251 QueryType::Delete
252 } else if sql_upper.starts_with("CREATE")
253 || sql_upper.starts_with("DROP")
254 || sql_upper.starts_with("ALTER")
255 || sql_upper.starts_with("TRUNCATE")
256 || sql_upper.starts_with("GRANT")
257 || sql_upper.starts_with("REVOKE")
258 {
259 QueryType::DDL
260 } else {
261 QueryType::Other
262 }
263 }
264
265 pub fn calculate_metrics(&self, durations: &[f64]) -> QueryMetrics {
267 if durations.is_empty() {
268 return QueryMetrics::default();
269 }
270
271 let total_queries = durations.len() as u64;
272 let total_duration = durations.iter().sum::<f64>();
273 let average_duration = total_duration / total_queries as f64;
274
275 let min_duration = durations.iter().fold(f64::INFINITY, |a, &b| a.min(b));
276 let max_duration = durations.iter().fold(0.0_f64, |a, &b| a.max(b));
277
278 let mut sorted_durations = durations.to_vec();
280 sorted_durations.sort_by(|a, b| a.partial_cmp(b).unwrap());
281
282 let p95_index = (sorted_durations.len() as f64 * 0.95) as usize;
283 let p99_index = (sorted_durations.len() as f64 * 0.99) as usize;
284
285 let p95_duration = sorted_durations[p95_index.min(sorted_durations.len() - 1)];
286 let p99_duration = sorted_durations[p99_index.min(sorted_durations.len() - 1)];
287
288 QueryMetrics {
289 min_duration,
290 max_duration,
291 average_duration,
292 p95_duration,
293 p99_duration,
294 total_queries,
295 total_duration,
296 }
297 }
298
299 fn calculate_queries_per_second(
301 &self,
302 hourly_stats: &mut HashMap<u32, HourlyStats>,
303 events: &[NormalizedEvent],
304 ) {
305 let mut hourly_entries: HashMap<u32, Vec<DateTime<Utc>>> = HashMap::new();
307
308 for event in events {
309 if event.is_query() {
310 let hour = event.timestamp.hour();
311 hourly_entries
312 .entry(hour)
313 .or_default()
314 .push(event.timestamp);
315 }
316 }
317
318 for (hour, timestamps) in hourly_entries {
319 if let Some(stats) = hourly_stats.get_mut(&hour) {
320 if timestamps.len() > 1 {
321 let min_time = timestamps.iter().min().unwrap();
322 let max_time = timestamps.iter().max().unwrap();
323 let duration_seconds = (*max_time - *min_time).num_seconds() as f64;
324
325 if duration_seconds > 0.0 {
326 stats.queries_per_second = stats.query_count as f64 / duration_seconds;
327 }
328 }
329
330 if stats.query_count > 0 {
331 stats.average_duration = stats.total_duration / stats.query_count as f64;
332 }
333 }
334 }
335 }
336
337 pub fn find_slow_queries(
339 &self,
340 entries: &[LogEntry],
341 threshold_ms: f64,
342 ) -> Result<Vec<LogEntry>> {
343 let slow_queries: Vec<_> = entries
344 .iter()
345 .filter(|e| e.is_query() && e.duration.unwrap_or(0.0) > threshold_ms)
346 .cloned()
347 .collect();
348
349 Ok(slow_queries)
350 }
351
352 pub fn get_query_type_distribution(&self, entries: &[LogEntry]) -> HashMap<QueryType, u64> {
354 let events = normalize_log_entries(entries, EventSourceKind::Stderr);
355 self.get_query_type_distribution_for_events(&events)
356 }
357
358 pub fn get_query_type_distribution_for_events(
359 &self,
360 events: &[NormalizedEvent],
361 ) -> HashMap<QueryType, u64> {
362 let mut distribution = HashMap::new();
363
364 for event in events {
365 if event.is_query() {
366 if let Some(queries) = event.queries() {
367 for query in queries {
368 *distribution.entry(query.query_type.clone()).or_insert(0) += 1;
369 }
370 }
371 }
372 }
373
374 distribution
375 }
376
377 pub fn calculate_error_rate(&self, entries: &[LogEntry]) -> f64 {
379 let events = normalize_log_entries(entries, EventSourceKind::Stderr);
380 self.calculate_error_rate_for_events(&events)
381 }
382
383 pub fn calculate_error_rate_for_events(&self, events: &[NormalizedEvent]) -> f64 {
384 let total_entries = events.len() as f64;
385 if total_entries == 0.0 {
386 return 0.0;
387 }
388
389 let error_count = events.iter().filter(|event| event.is_error()).count() as f64;
390 error_count / total_entries
391 }
392}
393
394impl Default for QueryAnalyzer {
395 fn default() -> Self {
396 Self::new()
397 }
398}
399
#[cfg(test)]
mod tests {
    use super::*;
    use crate::LogLevel;

    /// Builds a `LogEntry` attributed to process 12345, `test_user`, `testdb`
    /// and application `psql`.
    ///
    /// With `Some(query)` the message is `statement: <query>` and `queries` is
    /// the parse result of `crate::Query::from_sql(query)`. With `None` the
    /// message is `"test message"` and parsing is attempted on `""`.
    fn create_test_entry(
        timestamp: DateTime<Utc>,
        message_type: LogLevel,
        query: Option<String>,
        duration: Option<f64>,
    ) -> LogEntry {
        LogEntry {
            timestamp,
            process_id: "12345".to_string(),
            user: Some("test_user".to_string()),
            database: Some("testdb".to_string()),
            client_host: None,
            application_name: Some("psql".to_string()),
            message_type,
            message: query
                .as_ref()
                .map_or("test message".to_string(), |q| format!("statement: {}", q)),
            queries: crate::Query::from_sql(query.as_deref().unwrap_or("")).ok(),
            duration,
        }
    }

    /// A `LogLevel::Statement` entry for `sql` with the given duration (ms).
    fn statement(timestamp: DateTime<Utc>, sql: &str, duration_ms: f64) -> LogEntry {
        create_test_entry(
            timestamp,
            LogLevel::Statement,
            Some(sql.to_string()),
            Some(duration_ms),
        )
    }

    /// An entry of the given level that carries no SQL and no duration.
    fn non_query(timestamp: DateTime<Utc>, level: LogLevel) -> LogEntry {
        create_test_entry(timestamp, level, None, None)
    }

    #[test]
    fn test_normalize_query() {
        let analyzer = QueryAnalyzer::new();

        assert_eq!(
            analyzer.normalize_query("SELECT * FROM users WHERE id = $1 AND name = $2"),
            "SELECT * FROM users WHERE id = ? AND name = ?"
        );
        assert_eq!(
            analyzer.normalize_query("SELECT * FROM users WHERE age > 25 AND score < 100.5"),
            "SELECT * FROM users WHERE age > N AND score < N"
        );
        assert_eq!(
            analyzer
                .normalize_query("SELECT * FROM users WHERE name = 'John' AND city = 'New York'"),
            "SELECT * FROM users WHERE name = S AND city = S"
        );
        assert_eq!(
            analyzer.normalize_query("SELECT * FROM users WHERE id=1"),
            "SELECT * FROM users WHERE id=N"
        );
    }

    #[test]
    fn test_classify_query() {
        let analyzer = QueryAnalyzer::new();

        assert_eq!(analyzer.classify_query("SELECT * FROM users"), QueryType::Select);
        assert_eq!(
            analyzer.classify_query("INSERT INTO users VALUES (1, 'John')"),
            QueryType::Insert
        );
        assert_eq!(
            analyzer.classify_query("UPDATE users SET name = 'Jane'"),
            QueryType::Update
        );
        assert_eq!(
            analyzer.classify_query("DELETE FROM users WHERE id = 1"),
            QueryType::Delete
        );
        assert_eq!(
            analyzer.classify_query("CREATE TABLE users (id INT)"),
            QueryType::DDL
        );
        assert_eq!(analyzer.classify_query("DROP TABLE users"), QueryType::DDL);
        assert_eq!(analyzer.classify_query("BEGIN"), QueryType::Other);
        assert_eq!(analyzer.classify_query("COMMIT"), QueryType::Other);
    }

    #[test]
    fn test_analyze_empty_entries() {
        let analyzer = QueryAnalyzer::new();
        let result = analyzer.analyze(&[]).unwrap();

        assert_eq!(result.total_queries, 0);
        assert_eq!(result.total_duration, 0.0);
        assert_eq!(result.error_count, 0);
        assert_eq!(result.connection_count, 0);
    }

    #[test]
    fn test_analyze_with_queries() {
        let analyzer = QueryAnalyzer::new();
        let now = Utc::now();

        let entries = vec![
            statement(now, "SELECT * FROM users", 100.0),
            statement(now, "SELECT * FROM users", 200.0),
            statement(now, "INSERT INTO users VALUES (1)", 50.0),
            non_query(now, LogLevel::Error),
        ];

        let result = analyzer.analyze(&entries).unwrap();

        assert_eq!(result.total_queries, 3);
        assert_eq!(result.total_duration, 350.0);
        // Compare the computed mean with a tolerance rather than a hard-coded
        // decimal expansion of 350 / 3.
        assert!((result.average_duration - 350.0 / 3.0).abs() < 1e-9);
        assert_eq!(result.error_count, 1);
        assert_eq!(result.connection_count, 0);

        assert_eq!(result.query_types.get("SELECT"), Some(&2));
        assert_eq!(result.query_types.get("INSERT"), Some(&1));
    }

    #[test]
    fn test_analyze_events_matches_log_entry_analysis() {
        let analyzer = QueryAnalyzer::with_settings(100.0, 5, 5);
        let now = Utc::now();

        let entries = vec![
            statement(now, "SELECT * FROM users WHERE id = 1", 150.0),
            statement(now, "INSERT INTO users VALUES (1)", 50.0),
            non_query(now, LogLevel::Error),
            non_query(now, LogLevel::Log),
        ];

        let events = normalize_log_entries(&entries, EventSourceKind::Stderr);
        let entry_result = analyzer.analyze(&entries).unwrap();
        let event_result = analyzer.analyze_events(&events).unwrap();

        assert_eq!(event_result.total_queries, entry_result.total_queries);
        assert_eq!(event_result.total_duration, entry_result.total_duration);
        assert_eq!(event_result.error_count, entry_result.error_count);
        assert_eq!(event_result.query_types, entry_result.query_types);
        assert_eq!(event_result.slowest_queries, entry_result.slowest_queries);
    }

    #[test]
    fn test_analyze_events_uses_correlated_statement_duration_pairs() {
        let analyzer = QueryAnalyzer::with_settings(100.0, 5, 5);
        let parser = crate::StderrParser::new();
        let lines = vec![
            "2024-08-15 10:30:15.123 UTC [12345] postgres@testdb psql: LOG: statement: SELECT * FROM users WHERE id = 1".to_string(),
            "2024-08-15 10:30:15.456 UTC [12345] postgres@testdb psql: LOG: duration: 150.000 ms".to_string(),
        ];
        let entries = parser.parse_lines(&lines).unwrap();
        let events = normalize_log_entries(&entries, EventSourceKind::Stderr);

        let result = analyzer.analyze_events(&events).unwrap();

        assert_eq!(result.total_queries, 1);
        assert_eq!(result.total_duration, 150.0);
        assert_eq!(result.average_duration, 150.0);
        assert_eq!(result.slowest_queries.len(), 1);
        assert_eq!(
            result.slowest_queries[0],
            ("SELECT * FROM users WHERE id = ?".to_string(), 150.0)
        );
    }

    #[test]
    fn test_analyze_events_correlates_interleaved_processes() {
        let analyzer = QueryAnalyzer::with_settings(100.0, 5, 5);
        let parser = crate::StderrParser::new();
        let lines = vec![
            "2024-08-15 10:30:15.000 UTC [11111] postgres@testdb psql: LOG: statement: SELECT * FROM users WHERE id = 1".to_string(),
            "2024-08-15 10:30:15.001 UTC [22222] postgres@testdb psql: LOG: statement: SELECT * FROM orders WHERE id = 2".to_string(),
            "2024-08-15 10:30:15.002 UTC [22222] postgres@testdb psql: LOG: duration: 250.000 ms".to_string(),
            "2024-08-15 10:30:15.003 UTC [11111] postgres@testdb psql: LOG: duration: 150.000 ms".to_string(),
        ];
        let entries = parser.parse_lines(&lines).unwrap();
        let events = normalize_log_entries(&entries, EventSourceKind::Stderr);

        let result = analyzer.analyze_events(&events).unwrap();

        assert_eq!(result.total_queries, 2);
        assert_eq!(result.total_duration, 400.0);
        assert_eq!(result.slowest_queries.len(), 2);
        assert_eq!(
            result.slowest_queries[0],
            ("SELECT * FROM orders WHERE id = ?".to_string(), 250.0)
        );
        assert_eq!(
            result.slowest_queries[1],
            ("SELECT * FROM users WHERE id = ?".to_string(), 150.0)
        );
    }

    #[test]
    fn test_event_native_distribution_and_error_rate() {
        let analyzer = QueryAnalyzer::new();
        let now = Utc::now();
        let entries = vec![
            statement(now, "SELECT * FROM users", 10.0),
            statement(now, "UPDATE users SET name = 'Jane'", 20.0),
            non_query(now, LogLevel::Error),
            non_query(now, LogLevel::Warning),
        ];
        let events = normalize_log_entries(&entries, EventSourceKind::Stderr);

        let distribution = analyzer.get_query_type_distribution_for_events(&events);
        let error_rate = analyzer.calculate_error_rate_for_events(&events);

        assert_eq!(distribution.get(&QueryType::Select), Some(&1));
        assert_eq!(distribution.get(&QueryType::Update), Some(&1));
        assert_eq!(error_rate, 0.25);
    }

    #[test]
    fn test_slow_queries() {
        let analyzer = QueryAnalyzer::with_settings(100.0, 5, 5);
        let now = Utc::now();

        let entries = vec![
            statement(now, "SELECT * FROM users", 50.0),
            statement(now, "SELECT * FROM posts", 150.0),
            statement(now, "SELECT * FROM comments", 250.0),
        ];

        let result = analyzer.analyze(&entries).unwrap();

        // Only the two executions above the 100 ms threshold, slowest first.
        assert_eq!(result.slowest_queries.len(), 2);
        assert_eq!(result.slowest_queries[0].1, 250.0);
        assert_eq!(result.slowest_queries[1].1, 150.0);
    }

    #[test]
    fn test_error_rate_calculation() {
        let analyzer = QueryAnalyzer::new();
        let now = Utc::now();

        let entries = vec![
            statement(now, "SELECT * FROM users", 100.0),
            non_query(now, LogLevel::Error),
            statement(now, "SELECT * FROM posts", 200.0),
            non_query(now, LogLevel::Error),
        ];

        let error_rate = analyzer.calculate_error_rate(&entries);
        assert_eq!(error_rate, 0.5);
    }

    #[test]
    fn test_query_type_distribution() {
        let analyzer = QueryAnalyzer::new();
        let now = Utc::now();

        let entries = vec![
            statement(now, "SELECT * FROM users", 100.0),
            statement(now, "SELECT * FROM posts", 200.0),
            statement(now, "INSERT INTO users VALUES (1)", 50.0),
            statement(now, "UPDATE users SET name = 'John'", 75.0),
        ];

        let distribution = analyzer.get_query_type_distribution(&entries);

        assert_eq!(distribution.get(&QueryType::Select), Some(&2));
        assert_eq!(distribution.get(&QueryType::Insert), Some(&1));
        assert_eq!(distribution.get(&QueryType::Update), Some(&1));
        assert_eq!(distribution.get(&QueryType::Delete), None);
    }

    #[test]
    fn test_calculate_metrics_empty_input() {
        let metrics = QueryAnalyzer::new().calculate_metrics(&[]);

        assert_eq!(metrics.total_queries, 0);
        assert_eq!(metrics.total_duration, 0.0);
        assert_eq!(metrics.average_duration, 0.0);
        assert_eq!(metrics.p95_duration, 0.0);
        assert_eq!(metrics.p99_duration, 0.0);
    }

    #[test]
    fn test_calculate_metrics_summary() {
        // Deliberately unsorted input: 10, 9, ..., 1.
        let durations: Vec<f64> = (1..=10).rev().map(f64::from).collect();
        let metrics = QueryAnalyzer::new().calculate_metrics(&durations);

        assert_eq!(metrics.total_queries, 10);
        assert_eq!(metrics.total_duration, 55.0);
        assert_eq!(metrics.average_duration, 5.5);
        assert_eq!(metrics.min_duration, 1.0);
        assert_eq!(metrics.max_duration, 10.0);
        assert_eq!(metrics.p95_duration, 10.0);
        assert_eq!(metrics.p99_duration, 10.0);
    }
}