use crate::{
normalize_log_entries, AnalysisResult, Correlator, EventSourceKind, LogEntry, NormalizedEvent,
ProcessOrderCorrelator, QueryType, Result,
};
use chrono::{DateTime, Timelike, Utc};
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::cmp::Reverse;
use std::collections::HashMap;
/// Summary statistics computed over a collection of query durations.
///
/// All duration fields are expressed in the same unit as the samples they
/// were computed from. The `Default` value has every field set to zero, which
/// is also what `QueryAnalyzer::calculate_metrics` returns for an empty input.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QueryMetrics {
    /// Smallest duration in the sample set.
    pub min_duration: f64,
    /// Largest duration in the sample set.
    pub max_duration: f64,
    /// Arithmetic mean of the durations.
    pub average_duration: f64,
    /// Duration at the 95th percentile position of the sorted samples.
    pub p95_duration: f64,
    /// Duration at the 99th percentile position of the sorted samples.
    pub p99_duration: f64,
    /// Number of samples.
    pub total_queries: u64,
    /// Sum of all durations.
    pub total_duration: f64,
}
/// Aggregate of query activity for one hour bucket.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HourlyStats {
/// Hour component of the timestamps grouped into this bucket.
pub hour: u32,
/// Number of samples added to this bucket.
pub query_count: u64,
/// Throughput for the bucket; `0.0` until a rate has been computed.
pub queries_per_second: f64,
/// Sum of the durations of every sample in this bucket.
pub total_duration: f64,
/// Mean duration per sample (`total_duration / query_count`).
pub average_duration: f64,
}
/// Computes aggregate statistics (durations, frequencies, query types and
/// error counts) from parsed log data.
pub struct QueryAnalyzer {
/// Executions whose duration is strictly greater than this are reported as slow.
slow_query_threshold: f64,
/// Upper bound on the number of entries kept in the slowest-queries list.
max_slow_queries: usize,
/// Upper bound on the number of entries kept in the most-frequent-queries list.
max_frequent_queries: usize,
/// Matches positional placeholders such as `$1`; replaced with `?` when normalizing.
literal_regex: Regex,
/// Matches standalone integer or decimal literals; replaced with `N` when normalizing.
numeric_regex: Regex,
/// Matches single-quoted string literals; replaced with `S` when normalizing.
string_regex: Regex,
}
impl QueryAnalyzer {
pub fn new() -> Self {
Self {
slow_query_threshold: 1000.0, max_slow_queries: 10,
max_frequent_queries: 20,
literal_regex: Regex::new(r"\$(\d+)").unwrap(),
numeric_regex: Regex::new(r"\b\d+(?:\.\d+)?\b").unwrap(),
string_regex: Regex::new(r"'[^']*'").unwrap(),
}
}
/// Creates an analyzer with caller-supplied limits.
///
/// * `slow_query_threshold` - durations strictly above this are treated as slow.
/// * `max_slow_queries` - cap on the number of slow queries reported.
/// * `max_frequent_queries` - cap on the number of frequent queries reported.
///
/// # Panics
///
/// Panics if one of the built-in normalization patterns fails to compile.
pub fn with_settings(
    slow_query_threshold: f64,
    max_slow_queries: usize,
    max_frequent_queries: usize,
) -> Self {
    // The patterns are fixed literals, so a compile failure is a programming error.
    let compile = |pattern: &str| Regex::new(pattern).unwrap();
    let literal_regex = compile(r"\$(\d+)");
    let numeric_regex = compile(r"\b\d+(?:\.\d+)?\b");
    let string_regex = compile(r"'[^']*'");
    Self {
        slow_query_threshold,
        max_slow_queries,
        max_frequent_queries,
        literal_regex,
        numeric_regex,
        string_regex,
    }
}
/// Returns the duration above which an execution is reported as slow.
pub fn slow_query_threshold(&self) -> f64 {
self.slow_query_threshold
}
/// Returns the maximum number of slow queries kept in an analysis result.
pub fn max_slow_queries(&self) -> usize {
self.max_slow_queries
}
/// Returns the maximum number of frequent queries kept in an analysis result.
pub fn max_frequent_queries(&self) -> usize {
self.max_frequent_queries
}
/// Analyzes raw log entries.
///
/// The entries are normalized as stderr-sourced events and then handed to
/// `analyze_events`, whose result is returned unchanged.
pub fn analyze(&self, entries: &[LogEntry]) -> Result<AnalysisResult> {
    self.analyze_events(&normalize_log_entries(entries, EventSourceKind::Stderr))
}
/// Aggregates statistics over normalized log events.
///
/// The events are correlated into executions by `ProcessOrderCorrelator`.
/// Each execution contributes one sample to the totals, the percentiles and
/// the hourly buckets; an execution without a duration counts as `0.0`.
/// Query frequencies and query-type counts are taken from the individual
/// queries of each execution, while error and connection counts are taken
/// from the raw events.
///
/// Returns an empty `AnalysisResult` when `events` is empty.
pub fn analyze_events(&self, events: &[NormalizedEvent]) -> Result<AnalysisResult> {
    if events.is_empty() {
        return Ok(AnalysisResult::new());
    }
    let mut result = AnalysisResult::new();
    let mut query_durations = Vec::new();
    let mut query_counts = HashMap::new();
    let mut query_type_counts = HashMap::new();
    let mut hourly_stats = HashMap::new();
    let mut slow_queries = Vec::new();
    let mut error_count = 0;
    let mut connection_count = 0;

    let executions = ProcessOrderCorrelator.correlate(events);
    for execution in &executions {
        let duration = execution.duration_ms.unwrap_or(0.0);
        for query in &execution.queries {
            *query_counts
                .entry(query.normalized_query.clone())
                .or_insert(0) += 1;
            *query_type_counts.entry(&query.query_type).or_insert(0) += 1;
        }
        // Only slow executions need an owned copy of their normalized SQL.
        if duration > self.slow_query_threshold {
            slow_queries.push((execution.query_family.normalized_sql.clone(), duration));
        }
        let hour = execution.timestamp.hour();
        let hourly = hourly_stats.entry(hour).or_insert_with(|| HourlyStats {
            hour,
            query_count: 0,
            queries_per_second: 0.0,
            total_duration: 0.0,
            average_duration: 0.0,
        });
        hourly.query_count += 1;
        hourly.total_duration += duration;
        result.total_queries += 1;
        query_durations.push(duration);
        result.total_duration += duration;
    }

    // An error event is never also counted as a connection event.
    for event in events {
        if event.is_error() {
            error_count += 1;
        } else if event.message().to_lowercase().contains("connection") {
            connection_count += 1;
        }
    }

    let metrics = self.calculate_metrics(&query_durations);
    result.average_duration = metrics.average_duration;
    result.p95_duration = metrics.p95_duration;
    result.p99_duration = metrics.p99_duration;
    result.error_count = error_count;
    result.connection_count = connection_count;

    // Slowest first. `total_cmp` is a total order, so a NaN duration cannot
    // make the sort panic the way `partial_cmp(..).unwrap()` would.
    slow_queries.sort_by(|a, b| b.1.total_cmp(&a.1));
    result.slowest_queries = slow_queries
        .into_iter()
        .take(self.max_slow_queries)
        .collect();

    let mut frequent_queries: Vec<_> = query_counts.into_iter().collect();
    frequent_queries.sort_by_key(|query| Reverse(query.1));
    result.most_frequent_queries = frequent_queries
        .into_iter()
        .take(self.max_frequent_queries)
        .collect();

    result.query_types = query_type_counts
        .into_iter()
        .map(|(query_type, count)| (query_type.to_string(), count))
        .collect();

    // NOTE(review): the hourly buckets are finalized here but never stored on
    // `result`; confirm whether `AnalysisResult` is meant to expose them.
    self.calculate_queries_per_second(&mut hourly_stats, events);
    Ok(result)
}
/// Reduces a SQL statement to a literal-free shape for grouping.
///
/// After trimming, the following rewrites are applied in order:
/// positional placeholders (`$1`) become `?`, standalone numbers become `N`,
/// single-quoted strings become `S`, and every run of whitespace is
/// collapsed to a single space.
pub fn normalize_query(&self, sql: &str) -> String {
    let without_placeholders = self.literal_regex.replace_all(sql.trim(), "?");
    let without_numbers = self.numeric_regex.replace_all(&without_placeholders, "N");
    let without_strings = self.string_regex.replace_all(&without_numbers, "S");
    without_strings
        .split_whitespace()
        .collect::<Vec<_>>()
        .join(" ")
}
/// Classifies a SQL statement by its leading keyword.
///
/// The comparison ignores surrounding whitespace and letter case. Statements
/// that start with none of the recognized keywords are `QueryType::Other`.
pub fn classify_query(&self, sql: &str) -> QueryType {
    // Leading keywords that are all reported as DDL.
    const DDL_PREFIXES: [&str; 6] = ["CREATE", "DROP", "ALTER", "TRUNCATE", "GRANT", "REVOKE"];

    let upper = sql.trim().to_uppercase();
    let begins_with = |keyword: &str| upper.starts_with(keyword);

    if begins_with("SELECT") {
        return QueryType::Select;
    }
    if begins_with("INSERT") {
        return QueryType::Insert;
    }
    if begins_with("UPDATE") {
        return QueryType::Update;
    }
    if begins_with("DELETE") {
        return QueryType::Delete;
    }
    if DDL_PREFIXES.iter().any(|&keyword| begins_with(keyword)) {
        return QueryType::DDL;
    }
    QueryType::Other
}
/// Computes min, max, mean, p95, p99, count and sum for `durations`.
///
/// Returns `QueryMetrics::default()` (all zeros) for an empty slice.
///
/// A percentile is the element at index `floor(len * fraction)` of the sorted
/// samples, clamped to the last element. Samples are ordered with
/// `f64::total_cmp`, so NaN values are ordered deterministically instead of
/// causing a panic.
pub fn calculate_metrics(&self, durations: &[f64]) -> QueryMetrics {
    if durations.is_empty() {
        return QueryMetrics::default();
    }
    let total_queries = durations.len() as u64;
    let total_duration: f64 = durations.iter().sum();
    let average_duration = total_duration / total_queries as f64;

    // Seed both folds with the opposite infinity so the result is correct for
    // any sign of input; seeding the max with 0.0 would misreport a sample set
    // that contains only negative values.
    let min_duration = durations.iter().copied().fold(f64::INFINITY, f64::min);
    let max_duration = durations
        .iter()
        .copied()
        .fold(f64::NEG_INFINITY, f64::max);

    let mut sorted_durations = durations.to_vec();
    sorted_durations.sort_unstable_by(f64::total_cmp);

    // Non-empty is guaranteed by the early return above.
    let last_index = sorted_durations.len() - 1;
    let percentile = |fraction: f64| {
        let index = (sorted_durations.len() as f64 * fraction) as usize;
        sorted_durations[index.min(last_index)]
    };
    let p95_duration = percentile(0.95);
    let p99_duration = percentile(0.99);

    QueryMetrics {
        min_duration,
        max_duration,
        average_duration,
        p95_duration,
        p99_duration,
        total_queries,
        total_duration,
    }
}
/// Fills in `average_duration` and `queries_per_second` on each hourly bucket.
///
/// * `average_duration` is `total_duration / query_count` for every bucket
///   with at least one sample.
/// * `queries_per_second` is the bucket's `query_count` divided by the time
///   between the earliest and latest query event in that hour. It is left
///   untouched when that span is zero or the hour has no query events.
fn calculate_queries_per_second(
    &self,
    hourly_stats: &mut HashMap<u32, HourlyStats>,
    events: &[NormalizedEvent],
) {
    // The average depends only on the bucket itself, so compute it for every
    // bucket rather than only for hours that also contain a query event.
    for stats in hourly_stats.values_mut() {
        if stats.query_count > 0 {
            stats.average_duration = stats.total_duration / stats.query_count as f64;
        }
    }

    // Track only the earliest and latest query timestamp per hour; the
    // individual timestamps are not needed.
    let mut spans: HashMap<u32, (DateTime<Utc>, DateTime<Utc>)> = HashMap::new();
    for event in events.iter().filter(|event| event.is_query()) {
        let timestamp = event.timestamp;
        spans
            .entry(timestamp.hour())
            .and_modify(|(earliest, latest)| {
                if timestamp < *earliest {
                    *earliest = timestamp;
                }
                if timestamp > *latest {
                    *latest = timestamp;
                }
            })
            .or_insert((timestamp, timestamp));
    }

    for (hour, (earliest, latest)) in spans {
        if let Some(stats) = hourly_stats.get_mut(&hour) {
            // Millisecond resolution: whole seconds would truncate a 1.9 s
            // span to 1 s and treat any sub-second span as empty.
            let span_seconds = (latest - earliest).num_milliseconds() as f64 / 1000.0;
            if span_seconds > 0.0 {
                stats.queries_per_second = stats.query_count as f64 / span_seconds;
            }
        }
    }
}
/// Returns clones of the query entries whose duration exceeds `threshold_ms`.
///
/// Entries that are not queries are skipped, and a missing duration is
/// treated as `0.0`. The comparison is strict, so an entry exactly at the
/// threshold is not returned. Input order is preserved.
pub fn find_slow_queries(
    &self,
    entries: &[LogEntry],
    threshold_ms: f64,
) -> Result<Vec<LogEntry>> {
    let mut matches = Vec::new();
    for entry in entries {
        if !entry.is_query() {
            continue;
        }
        let elapsed = entry.duration.unwrap_or(0.0);
        if elapsed > threshold_ms {
            matches.push(entry.clone());
        }
    }
    Ok(matches)
}
/// Counts queries per `QueryType` across raw log entries.
///
/// The entries are normalized as stderr-sourced events and passed to
/// `get_query_type_distribution_for_events`.
pub fn get_query_type_distribution(&self, entries: &[LogEntry]) -> HashMap<QueryType, u64> {
    let normalized = normalize_log_entries(entries, EventSourceKind::Stderr);
    self.get_query_type_distribution_for_events(&normalized)
}
/// Counts queries per `QueryType` across normalized events.
///
/// Only events reported as queries are considered, and each query attached to
/// such an event is counted once. Types that never occur are absent from the
/// returned map.
pub fn get_query_type_distribution_for_events(
    &self,
    events: &[NormalizedEvent],
) -> HashMap<QueryType, u64> {
    let mut counts = HashMap::new();
    for event in events.iter().filter(|event| event.is_query()) {
        // An event without parsed queries contributes nothing.
        for query in event.queries().into_iter().flatten() {
            *counts.entry(query.query_type.clone()).or_insert(0) += 1;
        }
    }
    counts
}
/// Returns the fraction of raw log entries that are errors.
///
/// The entries are normalized as stderr-sourced events and passed to
/// `calculate_error_rate_for_events`.
pub fn calculate_error_rate(&self, entries: &[LogEntry]) -> f64 {
    let normalized = normalize_log_entries(entries, EventSourceKind::Stderr);
    self.calculate_error_rate_for_events(&normalized)
}
/// Returns the fraction of `events` that are errors, in the range `0.0..=1.0`.
///
/// An empty slice yields `0.0` rather than dividing by zero.
pub fn calculate_error_rate_for_events(&self, events: &[NormalizedEvent]) -> f64 {
    if events.is_empty() {
        return 0.0;
    }
    let errors = events.iter().filter(|event| event.is_error()).count();
    errors as f64 / events.len() as f64
}
}
impl Default for QueryAnalyzer {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::LogLevel;
/// Builds a `LogEntry` fixture with fixed process, user, database and
/// application fields.
///
/// When `query` is present the message is `statement: <query>` and the query
/// is parsed into `queries`; otherwise the message is `test message` and an
/// empty string is parsed instead.
fn create_test_entry(
    timestamp: DateTime<Utc>,
    message_type: LogLevel,
    query: Option<String>,
    duration: Option<f64>,
) -> LogEntry {
    let sql = query.as_deref();
    let message = match sql {
        Some(text) => format!("statement: {}", text),
        None => "test message".to_string(),
    };
    let queries = crate::Query::from_sql(sql.unwrap_or("")).ok();
    LogEntry {
        timestamp,
        process_id: "12345".to_string(),
        user: Some("test_user".to_string()),
        database: Some("testdb".to_string()),
        client_host: None,
        application_name: Some("psql".to_string()),
        message_type,
        message,
        queries,
        duration,
    }
}
/// Placeholders, numbers and string literals are each replaced by their marker.
#[test]
fn test_normalize_query() {
    let analyzer = QueryAnalyzer::new();
    // (input, expected normalized form)
    let cases = [
        (
            "SELECT * FROM users WHERE id = $1 AND name = $2",
            "SELECT * FROM users WHERE id = ? AND name = ?",
        ),
        (
            "SELECT * FROM users WHERE age > 25 AND score < 100.5",
            "SELECT * FROM users WHERE age > N AND score < N",
        ),
        (
            "SELECT * FROM users WHERE name = 'John' AND city = 'New York'",
            "SELECT * FROM users WHERE name = S AND city = S",
        ),
        (
            "SELECT * FROM users WHERE id=1",
            "SELECT * FROM users WHERE id=N",
        ),
    ];
    for &(input, expected) in cases.iter() {
        assert_eq!(analyzer.normalize_query(input), expected);
    }
}
/// Each leading keyword maps to its query type; anything else is `Other`.
#[test]
fn test_classify_query() {
    let analyzer = QueryAnalyzer::new();
    // (statement, expected classification)
    let cases = vec![
        ("SELECT * FROM users", QueryType::Select),
        ("INSERT INTO users VALUES (1, 'John')", QueryType::Insert),
        ("UPDATE users SET name = 'Jane'", QueryType::Update),
        ("DELETE FROM users WHERE id = 1", QueryType::Delete),
        ("CREATE TABLE users (id INT)", QueryType::DDL),
        ("DROP TABLE users", QueryType::DDL),
        ("BEGIN", QueryType::Other),
        ("COMMIT", QueryType::Other),
    ];
    for (sql, expected) in cases {
        assert_eq!(analyzer.classify_query(sql), expected);
    }
}
/// Analyzing no entries yields an all-zero result.
#[test]
fn test_analyze_empty_entries() {
    let outcome = QueryAnalyzer::new().analyze(&[]).unwrap();
    assert_eq!(outcome.total_queries, 0);
    assert_eq!(outcome.total_duration, 0.0);
    assert_eq!(outcome.error_count, 0);
    assert_eq!(outcome.connection_count, 0);
}
/// Totals, average, error count and per-type counts for a small mixed log.
#[test]
fn test_analyze_with_queries() {
    let now = Utc::now();
    // Fixture shorthand for a statement entry with a duration.
    let statement = |sql: &str, millis: f64| {
        create_test_entry(
            now,
            LogLevel::Statement,
            Some(sql.to_string()),
            Some(millis),
        )
    };
    let entries = vec![
        statement("SELECT * FROM users", 100.0),
        statement("SELECT * FROM users", 200.0),
        statement("INSERT INTO users VALUES (1)", 50.0),
        create_test_entry(now, LogLevel::Error, None, None),
    ];

    let result = QueryAnalyzer::new().analyze(&entries).unwrap();

    assert_eq!(result.total_queries, 3);
    assert_eq!(result.total_duration, 350.0);
    assert_eq!(result.average_duration, 116.66666666666667);
    assert_eq!(result.error_count, 1);
    assert_eq!(result.connection_count, 0);
    assert_eq!(result.query_types.get("SELECT"), Some(&2));
    assert_eq!(result.query_types.get("INSERT"), Some(&1));
}
/// `analyze` and `analyze_events` agree when given the same underlying data.
#[test]
fn test_analyze_events_matches_log_entry_analysis() {
    let analyzer = QueryAnalyzer::with_settings(100.0, 5, 5);
    let now = Utc::now();
    // Fixture shorthand for a statement entry with a duration.
    let statement = |sql: &str, millis: f64| {
        create_test_entry(
            now,
            LogLevel::Statement,
            Some(sql.to_string()),
            Some(millis),
        )
    };
    let entries = vec![
        statement("SELECT * FROM users WHERE id = 1", 150.0),
        statement("INSERT INTO users VALUES (1)", 50.0),
        create_test_entry(now, LogLevel::Error, None, None),
        create_test_entry(now, LogLevel::Log, None, None),
    ];
    let events = normalize_log_entries(&entries, EventSourceKind::Stderr);

    let from_entries = analyzer.analyze(&entries).unwrap();
    let from_events = analyzer.analyze_events(&events).unwrap();

    assert_eq!(from_events.total_queries, from_entries.total_queries);
    assert_eq!(from_events.total_duration, from_entries.total_duration);
    assert_eq!(from_events.error_count, from_entries.error_count);
    assert_eq!(from_events.query_types, from_entries.query_types);
    assert_eq!(from_events.slowest_queries, from_entries.slowest_queries);
}
/// A statement line followed by a duration line is analyzed as one execution.
#[test]
fn test_analyze_events_uses_correlated_statement_duration_pairs() {
    let raw_lines = [
        "2024-08-15 10:30:15.123 UTC [12345] postgres@testdb psql: LOG: statement: SELECT * FROM users WHERE id = 1",
        "2024-08-15 10:30:15.456 UTC [12345] postgres@testdb psql: LOG: duration: 150.000 ms",
    ];
    let lines: Vec<String> = raw_lines.iter().map(|line| line.to_string()).collect();
    let entries = crate::StderrParser::new().parse_lines(&lines).unwrap();
    let events = normalize_log_entries(&entries, EventSourceKind::Stderr);

    let result = QueryAnalyzer::with_settings(100.0, 5, 5)
        .analyze_events(&events)
        .unwrap();

    assert_eq!(result.total_queries, 1);
    assert_eq!(result.total_duration, 150.0);
    assert_eq!(result.average_duration, 150.0);
    assert_eq!(result.slowest_queries.len(), 1);
    let expected = ("SELECT * FROM users WHERE id = ?".to_string(), 150.0);
    assert_eq!(result.slowest_queries[0], expected);
}
/// Durations are paired with the statement from the same process even when
/// lines from two processes are interleaved.
#[test]
fn test_analyze_events_correlates_interleaved_processes() {
    let raw_lines = [
        "2024-08-15 10:30:15.000 UTC [11111] postgres@testdb psql: LOG: statement: SELECT * FROM users WHERE id = 1",
        "2024-08-15 10:30:15.001 UTC [22222] postgres@testdb psql: LOG: statement: SELECT * FROM orders WHERE id = 2",
        "2024-08-15 10:30:15.002 UTC [22222] postgres@testdb psql: LOG: duration: 250.000 ms",
        "2024-08-15 10:30:15.003 UTC [11111] postgres@testdb psql: LOG: duration: 150.000 ms",
    ];
    let lines: Vec<String> = raw_lines.iter().map(|line| line.to_string()).collect();
    let entries = crate::StderrParser::new().parse_lines(&lines).unwrap();
    let events = normalize_log_entries(&entries, EventSourceKind::Stderr);

    let result = QueryAnalyzer::with_settings(100.0, 5, 5)
        .analyze_events(&events)
        .unwrap();

    assert_eq!(result.total_queries, 2);
    assert_eq!(result.total_duration, 400.0);
    assert_eq!(result.slowest_queries.len(), 2);
    // Slowest first: the orders query (250 ms) precedes the users query (150 ms).
    let slowest = ("SELECT * FROM orders WHERE id = ?".to_string(), 250.0);
    let runner_up = ("SELECT * FROM users WHERE id = ?".to_string(), 150.0);
    assert_eq!(result.slowest_queries[0], slowest);
    assert_eq!(result.slowest_queries[1], runner_up);
}
/// The event-based distribution and error-rate helpers work on normalized events.
#[test]
fn test_event_native_distribution_and_error_rate() {
    let analyzer = QueryAnalyzer::new();
    let now = Utc::now();
    // Fixture shorthand for a statement entry with a duration.
    let statement = |sql: &str, millis: f64| {
        create_test_entry(
            now,
            LogLevel::Statement,
            Some(sql.to_string()),
            Some(millis),
        )
    };
    let entries = vec![
        statement("SELECT * FROM users", 10.0),
        statement("UPDATE users SET name = 'Jane'", 20.0),
        create_test_entry(now, LogLevel::Error, None, None),
        create_test_entry(now, LogLevel::Warning, None, None),
    ];
    let events = normalize_log_entries(&entries, EventSourceKind::Stderr);

    let distribution = analyzer.get_query_type_distribution_for_events(&events);
    let error_rate = analyzer.calculate_error_rate_for_events(&events);

    assert_eq!(distribution.get(&QueryType::Select), Some(&1));
    assert_eq!(distribution.get(&QueryType::Update), Some(&1));
    assert_eq!(error_rate, 0.25);
}
/// Only executions above the threshold are reported, slowest first.
#[test]
fn test_slow_queries() {
    let now = Utc::now();
    // Fixture shorthand for a statement entry with a duration.
    let statement = |sql: &str, millis: f64| {
        create_test_entry(
            now,
            LogLevel::Statement,
            Some(sql.to_string()),
            Some(millis),
        )
    };
    let entries = vec![
        statement("SELECT * FROM users", 50.0),
        statement("SELECT * FROM posts", 150.0),
        statement("SELECT * FROM comments", 250.0),
    ];

    let result = QueryAnalyzer::with_settings(100.0, 5, 5)
        .analyze(&entries)
        .unwrap();

    assert_eq!(result.slowest_queries.len(), 2);
    assert_eq!(result.slowest_queries[0].1, 250.0);
    assert_eq!(result.slowest_queries[1].1, 150.0);
}
/// Two errors out of four entries give an error rate of one half.
#[test]
fn test_error_rate_calculation() {
    let now = Utc::now();
    // Fixture shorthands for the two kinds of entry used below.
    let statement = |sql: &str, millis: f64| {
        create_test_entry(
            now,
            LogLevel::Statement,
            Some(sql.to_string()),
            Some(millis),
        )
    };
    let error = || create_test_entry(now, LogLevel::Error, None, None);
    let entries = vec![
        statement("SELECT * FROM users", 100.0),
        error(),
        statement("SELECT * FROM posts", 200.0),
        error(),
    ];

    let error_rate = QueryAnalyzer::new().calculate_error_rate(&entries);

    assert_eq!(error_rate, 0.5);
}
/// Queries are counted per type, and types that never occur are absent.
#[test]
fn test_query_type_distribution() {
    let now = Utc::now();
    // Fixture shorthand for a statement entry with a duration.
    let statement = |sql: &str, millis: f64| {
        create_test_entry(
            now,
            LogLevel::Statement,
            Some(sql.to_string()),
            Some(millis),
        )
    };
    let entries = vec![
        statement("SELECT * FROM users", 100.0),
        statement("SELECT * FROM posts", 200.0),
        statement("INSERT INTO users VALUES (1)", 50.0),
        statement("UPDATE users SET name = 'John'", 75.0),
    ];

    let distribution = QueryAnalyzer::new().get_query_type_distribution(&entries);

    assert_eq!(distribution.get(&QueryType::Select), Some(&2));
    assert_eq!(distribution.get(&QueryType::Insert), Some(&1));
    assert_eq!(distribution.get(&QueryType::Update), Some(&1));
    assert_eq!(distribution.get(&QueryType::Delete), None);
}
}