use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use thiserror::Error;
pub mod analytics;
pub mod correlation;
pub mod events;
pub mod findings;
pub mod output;
pub mod parsers;
pub mod sql;
pub use analytics::{QueryAnalyzer, TimingAnalysis, TimingAnalyzer};
pub use correlation::{
correlate_query_executions, CorrelationConfidence, Correlator, ProcessOrderCorrelator,
QueryExecution, QueryFamilyIdentity,
};
pub use events::{
normalize_log_entries, DurationEvent, ErrorEvent, EventKind, EventSourceKind, NormalizedEvent,
SessionIdentity, SourceReference, StatementEvent,
};
pub use findings::{
query_family_findings, slow_query_diff_findings, ComparisonMetrics, DeltaMetrics, Finding,
FindingConfidence, FindingKind, FindingMetrics, FindingSet, QueryFamilyFinding, ReasonCode,
SlowQueryDiffOptions, FINDING_SCHEMA_VERSION,
};
pub use output::{JsonFormatter, TextFormatter};
pub use parsers::StderrParser;
pub use sql::{Query, QueryType};
/// Error type returned by this crate's fallible operations.
///
/// `Display` text comes from each variant's `#[error(...)]` attribute. For the
/// struct-like variants only `message` is interpolated into that text; the other
/// fields are kept for callers to inspect but do not appear in the message.
#[derive(Error, Debug)]
pub enum PgLogstatsError {
/// Underlying I/O failure. `#[from]` lets `?` convert a `std::io::Error` directly.
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
/// Input text could not be parsed.
#[error("Parse error: {message}")]
Parse {
/// Human-readable description of what went wrong.
message: String,
/// Line number of the offending input, when known
/// (NOTE(review): numbering base is chosen by the caller — confirm).
line_number: Option<usize>,
/// Raw text of the offending line, when captured.
line_content: Option<String>,
},
/// A timestamp string could not be interpreted.
#[error("Timestamp parse error: {message}")]
TimestampParse {
/// Human-readable description of what went wrong.
message: String,
/// The timestamp text that failed to parse.
timestamp_string: String,
},
/// Invalid or inconsistent configuration.
#[error("Configuration error: {message}")]
Configuration {
/// Human-readable description of what went wrong.
message: String,
/// Name of the configuration field at fault, if one applies.
field: Option<String>,
},
/// Failure inside an analytics computation.
#[error("Analytics error: {message}")]
Analytics { message: String, operation: String },
/// JSON (de)serialization failure. `#[from]` lets `?` convert a `serde_json::Error`.
#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),
/// Catch-all for conditions that do not fit another variant.
#[error("Unexpected error: {message}")]
Unexpected {
/// Human-readable description of what went wrong.
message: String,
/// Optional free-form context about where the error arose.
context: Option<String>,
},
}
/// Severity or record kind parsed from a log line.
///
/// Built from text with the `From<&str>` impl, which matches case-insensitively.
/// It is rendered back as an upper-case keyword by `Display`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum LogLevel {
Error,
Warning,
Info,
Debug,
Notice,
Log,
Statement,
Duration,
Fatal,
Panic,
/// Any keyword not matched by the variants above. It stores the text exactly
/// as passed to `From<&str>`, with its original casing, so two spellings that
/// differ only in case compare unequal here even though they display identically.
Unknown(String),
}
impl std::fmt::Display for LogLevel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
LogLevel::Error => write!(f, "ERROR"),
LogLevel::Warning => write!(f, "WARNING"),
LogLevel::Info => write!(f, "INFO"),
LogLevel::Debug => write!(f, "DEBUG"),
LogLevel::Notice => write!(f, "NOTICE"),
LogLevel::Log => write!(f, "LOG"),
LogLevel::Statement => write!(f, "STATEMENT"),
LogLevel::Duration => write!(f, "DURATION"),
LogLevel::Fatal => write!(f, "FATAL"),
LogLevel::Panic => write!(f, "PANIC"),
LogLevel::Unknown(s) => write!(f, "{}", s.to_uppercase()),
}
}
}
impl From<&str> for LogLevel {
fn from(s: &str) -> Self {
match s.to_uppercase().as_str() {
"ERROR" => LogLevel::Error,
"WARNING" => LogLevel::Warning,
"INFO" => LogLevel::Info,
"DEBUG" => LogLevel::Debug,
"NOTICE" => LogLevel::Notice,
"LOG" => LogLevel::Log,
"STATEMENT" => LogLevel::Statement,
"DURATION" => LogLevel::Duration,
"FATAL" => LogLevel::Fatal,
"PANIC" => LogLevel::Panic,
_ => LogLevel::Unknown(s.to_string()),
}
}
}
/// One parsed log record.
///
/// Only `timestamp`, `process_id`, `message_type` and `message` are required by
/// `LogEntry::new`; the remaining fields start as `None` and are filled in when
/// the source line provides them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
/// Time the record was written, in UTC.
pub timestamp: DateTime<Utc>,
/// Identifier of the process that emitted the record, kept as text.
pub process_id: String,
/// User name, if present in the record.
pub user: Option<String>,
/// Database name, if present in the record.
pub database: Option<String>,
/// Client host, if present in the record.
pub client_host: Option<String>,
/// Application name, if present in the record.
pub application_name: Option<String>,
/// Level/kind of the record; `is_query`, `is_duration` and `is_error` test this.
pub message_type: LogLevel,
/// The record's message text.
pub message: String,
/// Parsed SQL statements attached to the record, if any.
pub queries: Option<Vec<Query>>,
/// Reported duration, if any (NOTE(review): unit not established here —
/// presumably milliseconds; confirm with the parser).
pub duration: Option<f64>,
}
impl LogEntry {
    /// Creates an entry from its required fields.
    ///
    /// `user`, `database`, `client_host`, `application_name`, `queries` and
    /// `duration` all start as `None`.
    pub fn new(
        timestamp: DateTime<Utc>,
        process_id: String,
        message_type: LogLevel,
        message: String,
    ) -> Self {
        Self {
            timestamp,
            process_id,
            user: None,
            database: None,
            client_host: None,
            application_name: None,
            message_type,
            message,
            queries: None,
            duration: None,
        }
    }

    /// Returns `true` when this entry is a [`LogLevel::Statement`] record.
    pub fn is_query(&self) -> bool {
        matches!(self.message_type, LogLevel::Statement)
    }

    /// Returns `true` when this entry is a [`LogLevel::Duration`] record.
    pub fn is_duration(&self) -> bool {
        matches!(self.message_type, LogLevel::Duration)
    }

    /// Returns `true` only for [`LogLevel::Error`].
    ///
    /// `Fatal` and `Panic` entries are deliberately not included.
    pub fn is_error(&self) -> bool {
        matches!(self.message_type, LogLevel::Error)
    }

    /// Returns the normalized text of every parsed query, joined with `;`.
    ///
    /// Returns `None` in three cases:
    /// - the entry is not a statement (see [`LogEntry::is_query`]);
    /// - `queries` is `None`;
    /// - the query list is empty.
    pub fn normalized_query(&self) -> Option<String> {
        if !self.is_query() {
            return None;
        }
        let queries = self.queries.as_ref()?;
        if queries.is_empty() {
            return None;
        }
        // Join once instead of re-formatting the accumulator per query, which
        // made the original loop quadratic in the total text length.
        let parts: Vec<&str> = queries
            .iter()
            .map(|query| query.normalized_query.as_str())
            .collect();
        Some(parts.join(";"))
    }
}
/// Aggregate statistics accumulated over a set of log entries.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnalysisResult {
/// Number of queries recorded via `add_query`.
pub total_queries: u64,
/// Sum of all durations passed to `add_query`.
pub total_duration: f64,
/// Query count per coarse category (`SELECT`, `INSERT`, …, `TRANSACTION`, `OTHER`).
pub query_types: HashMap<String, u64>,
/// Presumably (query text, duration) pairs for the slowest queries — confirm with the producer.
pub slowest_queries: Vec<(String, f64)>,
/// Presumably (query text, occurrence count) pairs — confirm with the producer.
pub most_frequent_queries: Vec<(String, u64)>,
/// Number of `add_error` calls.
pub error_count: u64,
/// Number of `add_connection` calls.
pub connection_count: u64,
/// `total_duration / total_queries`, refreshed on every `add_query`.
pub average_duration: f64,
/// 95th-percentile duration, set by `calculate_percentiles`.
pub p95_duration: f64,
/// 99th-percentile duration, set by `calculate_percentiles`.
pub p99_duration: f64,
}
impl AnalysisResult {
    /// Creates an empty result.
    ///
    /// All counters and statistics are zero and all collections are empty.
    pub fn new() -> Self {
        Self {
            total_queries: 0,
            total_duration: 0.0,
            query_types: HashMap::new(),
            slowest_queries: Vec::new(),
            most_frequent_queries: Vec::new(),
            error_count: 0,
            connection_count: 0,
            average_duration: 0.0,
            p95_duration: 0.0,
            p99_duration: 0.0,
        }
    }

    /// Records one query and its duration.
    ///
    /// This performs three updates:
    /// - bumps `total_queries` and `total_duration`;
    /// - increments the category count in `query_types`;
    /// - recomputes `average_duration`.
    pub fn add_query(&mut self, query: &str, duration: f64) {
        self.total_queries += 1;
        self.total_duration += duration;
        let query_type = self.extract_query_type(query);
        *self.query_types.entry(query_type).or_insert(0) += 1;
        // total_queries was just incremented, so this never divides by zero.
        self.average_duration = self.total_duration / self.total_queries as f64;
    }

    /// Increments `error_count` by one.
    pub fn add_error(&mut self) {
        self.error_count += 1;
    }

    /// Increments `connection_count` by one.
    pub fn add_connection(&mut self) {
        self.connection_count += 1;
    }

    /// Classifies a query by its leading keyword, after trimming and upper-casing.
    ///
    /// `BEGIN`/`COMMIT`/`ROLLBACK` map to `TRANSACTION`; anything unrecognised
    /// (including queries starting with `WITH` or a comment) maps to `OTHER`.
    fn extract_query_type(&self, query: &str) -> String {
        let query_upper = query.trim().to_uppercase();
        if query_upper.starts_with("SELECT") {
            "SELECT".to_string()
        } else if query_upper.starts_with("INSERT") {
            "INSERT".to_string()
        } else if query_upper.starts_with("UPDATE") {
            "UPDATE".to_string()
        } else if query_upper.starts_with("DELETE") {
            "DELETE".to_string()
        } else if query_upper.starts_with("CREATE") {
            "CREATE".to_string()
        } else if query_upper.starts_with("DROP") {
            "DROP".to_string()
        } else if query_upper.starts_with("ALTER") {
            "ALTER".to_string()
        } else if query_upper.starts_with("BEGIN")
            || query_upper.starts_with("COMMIT")
            || query_upper.starts_with("ROLLBACK")
        {
            "TRANSACTION".to_string()
        } else {
            "OTHER".to_string()
        }
    }

    /// Sets `p95_duration` and `p99_duration` from `durations`.
    ///
    /// NaN values are ignored; previously a single NaN made the sort panic. If
    /// no non-NaN values remain (including empty input), both fields are left
    /// unchanged.
    ///
    /// The percentile for fraction `p` is the sorted element at index
    /// `floor(len * p)`, clamped to the last element. Small samples therefore
    /// tend to report the maximum.
    pub fn calculate_percentiles(&mut self, durations: &[f64]) {
        let mut sorted: Vec<f64> = durations.iter().copied().filter(|d| !d.is_nan()).collect();
        if sorted.is_empty() {
            return;
        }
        // total_cmp is a total order, so sorting cannot panic.
        sorted.sort_unstable_by(f64::total_cmp);
        let last = sorted.len() - 1;
        let percentile = |p: f64| sorted[((sorted.len() as f64 * p) as usize).min(last)];
        self.p95_duration = percentile(0.95);
        self.p99_duration = percentile(0.99);
    }
}
impl Default for AnalysisResult {
fn default() -> Self {
Self::new()
}
}
/// Result type used throughout this crate.
///
/// The error type defaults to [`PgLogstatsError`], so `Result<T>` works as
/// before. A second parameter may be given to pick another error type, which
/// also keeps `Result<T, E>` usable for code that glob-imports this crate.
pub type Result<T, E = PgLogstatsError> = std::result::Result<T, E>;
/// Builds a [`PgLogstatsError::Parse`] from borrowed parts.
///
/// `message` and, when present, `line_content` are copied into owned strings.
/// `line_number` is stored unchanged.
pub fn parse_error(
    message: &str,
    line_number: Option<usize>,
    line_content: Option<&str>,
) -> PgLogstatsError {
    let owned_content = line_content.map(String::from);
    PgLogstatsError::Parse {
        line_content: owned_content,
        line_number,
        message: String::from(message),
    }
}
/// Builds a [`PgLogstatsError::TimestampParse`] by copying both arguments.
pub fn timestamp_error(message: &str, timestamp_string: &str) -> PgLogstatsError {
    let (message, timestamp_string) = (message.to_owned(), timestamp_string.to_owned());
    PgLogstatsError::TimestampParse {
        message,
        timestamp_string,
    }
}
/// Builds a [`PgLogstatsError::Configuration`].
///
/// `message` is always copied; `field` is copied only when present.
pub fn config_error(message: &str, field: Option<&str>) -> PgLogstatsError {
    let field = field.map(str::to_owned);
    PgLogstatsError::Configuration {
        message: message.into(),
        field,
    }
}
/// Builds a [`PgLogstatsError::Analytics`], naming the failed operation.
pub fn analytics_error(message: &str, operation: &str) -> PgLogstatsError {
    PgLogstatsError::Analytics {
        operation: String::from(operation),
        message: String::from(message),
    }
}