Skip to main content

pg_logstats/
lib.rs

1//! pg-logstats - PostgreSQL log analysis tool
2//!
3//! This library provides tools for parsing and analyzing PostgreSQL log files.
4//! It includes robust error handling, comprehensive data structures, and
5//! production-ready analysis capabilities.
6
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use thiserror::Error;
11
12pub mod analytics;
13pub mod correlation;
14pub mod events;
15pub mod findings;
16pub mod output;
17pub mod parsers;
18pub mod sql;
19
20// Re-export commonly used items
21pub use analytics::{QueryAnalyzer, TimingAnalysis, TimingAnalyzer};
22pub use correlation::{
23    correlate_query_executions, CorrelationConfidence, Correlator, ProcessOrderCorrelator,
24    QueryExecution, QueryFamilyIdentity,
25};
26pub use events::{
27    normalize_log_entries, DurationEvent, ErrorEvent, EventKind, EventSourceKind, NormalizedEvent,
28    SessionIdentity, SourceReference, StatementEvent,
29};
30pub use findings::{
31    query_family_findings, slow_query_diff_findings, ComparisonMetrics, DeltaMetrics, Finding,
32    FindingConfidence, FindingKind, FindingMetrics, FindingSet, QueryFamilyFinding, ReasonCode,
33    SlowQueryDiffOptions, FINDING_SCHEMA_VERSION,
34};
35pub use output::{JsonFormatter, TextFormatter};
36pub use parsers::StderrParser;
37pub use sql::{Query, QueryType};
38
39/// Main error type for pg-logstats operations
40#[derive(Error, Debug)]
41pub enum PgLogstatsError {
42    /// I/O errors when reading files or writing output
43    #[error("I/O error: {0}")]
44    Io(#[from] std::io::Error),
45
46    /// Errors parsing log files or individual log lines
47    #[error("Parse error: {message}")]
48    Parse {
49        message: String,
50        line_number: Option<usize>,
51        line_content: Option<String>,
52    },
53
54    /// Errors parsing timestamps in log entries
55    #[error("Timestamp parse error: {message}")]
56    TimestampParse {
57        message: String,
58        timestamp_string: String,
59    },
60
61    /// Configuration errors from CLI arguments or settings
62    #[error("Configuration error: {message}")]
63    Configuration {
64        message: String,
65        field: Option<String>,
66    },
67
68    /// Errors during analytics computation
69    #[error("Analytics error: {message}")]
70    Analytics { message: String, operation: String },
71
72    /// Errors serializing/deserializing data
73    #[error("Serialization error: {0}")]
74    Serialization(#[from] serde_json::Error),
75
76    /// Generic error for unexpected conditions
77    #[error("Unexpected error: {message}")]
78    Unexpected {
79        message: String,
80        context: Option<String>,
81    },
82}
83
84/// Log level enumeration for PostgreSQL log entries
85#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
86pub enum LogLevel {
87    /// Error messages
88    Error,
89    /// Warning messages
90    Warning,
91    /// Information messages
92    Info,
93    /// Debug messages
94    Debug,
95    /// Notice messages
96    Notice,
97    /// Log messages
98    Log,
99    /// Statement messages
100    Statement,
101    /// Duration messages
102    Duration,
103    /// Fatal error messages
104    Fatal,
105    /// Panic messages
106    Panic,
107    /// Unknown or unrecognized log level
108    Unknown(String),
109}
110
111impl std::fmt::Display for LogLevel {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        match self {
114            LogLevel::Error => write!(f, "ERROR"),
115            LogLevel::Warning => write!(f, "WARNING"),
116            LogLevel::Info => write!(f, "INFO"),
117            LogLevel::Debug => write!(f, "DEBUG"),
118            LogLevel::Notice => write!(f, "NOTICE"),
119            LogLevel::Log => write!(f, "LOG"),
120            LogLevel::Statement => write!(f, "STATEMENT"),
121            LogLevel::Duration => write!(f, "DURATION"),
122            LogLevel::Fatal => write!(f, "FATAL"),
123            LogLevel::Panic => write!(f, "PANIC"),
124            LogLevel::Unknown(s) => write!(f, "{}", s.to_uppercase()),
125        }
126    }
127}
128
129impl From<&str> for LogLevel {
130    fn from(s: &str) -> Self {
131        match s.to_uppercase().as_str() {
132            "ERROR" => LogLevel::Error,
133            "WARNING" => LogLevel::Warning,
134            "INFO" => LogLevel::Info,
135            "DEBUG" => LogLevel::Debug,
136            "NOTICE" => LogLevel::Notice,
137            "LOG" => LogLevel::Log,
138            "STATEMENT" => LogLevel::Statement,
139            "DURATION" => LogLevel::Duration,
140            "FATAL" => LogLevel::Fatal,
141            "PANIC" => LogLevel::Panic,
142            _ => LogLevel::Unknown(s.to_string()),
143        }
144    }
145}
146
147/// Represents a single parsed PostgreSQL log entry
148#[derive(Debug, Clone, Serialize, Deserialize)]
149pub struct LogEntry {
150    /// Timestamp when the log entry was generated
151    pub timestamp: DateTime<Utc>,
152    /// PostgreSQL process ID
153    pub process_id: String,
154    /// Database user (if available)
155    pub user: Option<String>,
156    /// Database name (if available)
157    pub database: Option<String>,
158    /// Client host address (if available)
159    pub client_host: Option<String>,
160    /// Application name (if available)
161    pub application_name: Option<String>,
162    /// Type/level of the log message
163    pub message_type: LogLevel,
164    /// The main log message content
165    pub message: String,
166    /// SQL query (if this is a statement log)
167    pub queries: Option<Vec<Query>>,
168    /// Query duration in milliseconds (if available)
169    pub duration: Option<f64>,
170}
171
172impl LogEntry {
173    /// Create a new LogEntry with required fields
174    pub fn new(
175        timestamp: DateTime<Utc>,
176        process_id: String,
177        message_type: LogLevel,
178        message: String,
179    ) -> Self {
180        Self {
181            timestamp,
182            process_id,
183            user: None,
184            database: None,
185            client_host: None,
186            application_name: None,
187            message_type,
188            message,
189            queries: None,
190            duration: None,
191        }
192    }
193
194    /// Check if this log entry represents a query statement
195    pub fn is_query(&self) -> bool {
196        matches!(self.message_type, LogLevel::Statement)
197    }
198
199    /// Check if this log entry represents a duration measurement
200    pub fn is_duration(&self) -> bool {
201        matches!(self.message_type, LogLevel::Duration)
202    }
203
204    /// Check if this log entry represents an error
205    pub fn is_error(&self) -> bool {
206        matches!(self.message_type, LogLevel::Error)
207    }
208
209    /// Get the normalized query (for deduplication)
210    pub fn normalized_query(&self) -> Option<String> {
211        let mut normalized_query: Option<String> = None;
212        if self.is_query() {
213            if let Some(queries) = &self.queries {
214                for query in queries {
215                    normalized_query = match normalized_query {
216                        Some(ref mut s) => Some(format!("{};{}", s, query.normalized_query)),
217                        None => Some(query.normalized_query.clone()),
218                    };
219                }
220            }
221        }
222        normalized_query
223    }
224}
225
226/// Contains aggregated statistics from log analysis
227#[derive(Debug, Clone, Serialize, Deserialize)]
228pub struct AnalysisResult {
229    /// Total number of queries processed
230    pub total_queries: u64,
231    /// Total duration of all queries in milliseconds
232    pub total_duration: f64,
233    /// Count of queries by type (SELECT, INSERT, UPDATE, DELETE, etc.)
234    pub query_types: HashMap<String, u64>,
235    /// Slowest queries with their durations
236    pub slowest_queries: Vec<(String, f64)>,
237    /// Most frequent queries with their counts
238    pub most_frequent_queries: Vec<(String, u64)>,
239    /// Total number of error messages
240    pub error_count: u64,
241    /// Total number of connection events
242    pub connection_count: u64,
243    /// Average query duration in milliseconds
244    pub average_duration: f64,
245    /// 95th percentile query duration in milliseconds
246    pub p95_duration: f64,
247    /// 99th percentile query duration in milliseconds
248    pub p99_duration: f64,
249}
250
251impl AnalysisResult {
252    /// Create a new empty AnalysisResult
253    pub fn new() -> Self {
254        Self {
255            total_queries: 0,
256            total_duration: 0.0,
257            query_types: HashMap::new(),
258            slowest_queries: Vec::new(),
259            most_frequent_queries: Vec::new(),
260            error_count: 0,
261            connection_count: 0,
262            average_duration: 0.0,
263            p95_duration: 0.0,
264            p99_duration: 0.0,
265        }
266    }
267
268    /// Add a query to the analysis
269    pub fn add_query(&mut self, query: &str, duration: f64) {
270        self.total_queries += 1;
271        self.total_duration += duration;
272
273        // Update query type count
274        let query_type = self.extract_query_type(query);
275        *self.query_types.entry(query_type).or_insert(0) += 1;
276
277        // Update average duration
278        self.average_duration = self.total_duration / self.total_queries as f64;
279    }
280
281    /// Add an error to the count
282    pub fn add_error(&mut self) {
283        self.error_count += 1;
284    }
285
286    /// Add a connection event to the count
287    pub fn add_connection(&mut self) {
288        self.connection_count += 1;
289    }
290
291    /// Extract the query type from a SQL query
292    fn extract_query_type(&self, query: &str) -> String {
293        let query_upper = query.trim().to_uppercase();
294        if query_upper.starts_with("SELECT") {
295            "SELECT".to_string()
296        } else if query_upper.starts_with("INSERT") {
297            "INSERT".to_string()
298        } else if query_upper.starts_with("UPDATE") {
299            "UPDATE".to_string()
300        } else if query_upper.starts_with("DELETE") {
301            "DELETE".to_string()
302        } else if query_upper.starts_with("CREATE") {
303            "CREATE".to_string()
304        } else if query_upper.starts_with("DROP") {
305            "DROP".to_string()
306        } else if query_upper.starts_with("ALTER") {
307            "ALTER".to_string()
308        } else if query_upper.starts_with("BEGIN")
309            || query_upper.starts_with("COMMIT")
310            || query_upper.starts_with("ROLLBACK")
311        {
312            "TRANSACTION".to_string()
313        } else {
314            "OTHER".to_string()
315        }
316    }
317
318    /// Calculate percentiles from a list of durations
319    pub fn calculate_percentiles(&mut self, durations: &[f64]) {
320        if durations.is_empty() {
321            return;
322        }
323
324        let mut sorted_durations = durations.to_vec();
325        sorted_durations.sort_by(|a, b| a.partial_cmp(b).unwrap());
326
327        let len = sorted_durations.len();
328        let p95_index = (len as f64 * 0.95) as usize;
329        let p99_index = (len as f64 * 0.99) as usize;
330
331        self.p95_duration = sorted_durations[p95_index.min(len - 1)];
332        self.p99_duration = sorted_durations[p99_index.min(len - 1)];
333    }
334}
335
336impl Default for AnalysisResult {
337    fn default() -> Self {
338        Self::new()
339    }
340}
341
/// Result type alias for pg-logstats operations
343pub type Result<T> = std::result::Result<T, PgLogstatsError>;
344
345/// Helper function to create parse errors with context
346pub fn parse_error(
347    message: &str,
348    line_number: Option<usize>,
349    line_content: Option<&str>,
350) -> PgLogstatsError {
351    PgLogstatsError::Parse {
352        message: message.to_string(),
353        line_number,
354        line_content: line_content.map(|s| s.to_string()),
355    }
356}
357
358/// Helper function to create timestamp parse errors
359pub fn timestamp_error(message: &str, timestamp_string: &str) -> PgLogstatsError {
360    PgLogstatsError::TimestampParse {
361        message: message.to_string(),
362        timestamp_string: timestamp_string.to_string(),
363    }
364}
365
366/// Helper function to create configuration errors
367pub fn config_error(message: &str, field: Option<&str>) -> PgLogstatsError {
368    PgLogstatsError::Configuration {
369        message: message.to_string(),
370        field: field.map(|s| s.to_string()),
371    }
372}
373
374/// Helper function to create analytics errors
375pub fn analytics_error(message: &str, operation: &str) -> PgLogstatsError {
376    PgLogstatsError::Analytics {
377        message: message.to_string(),
378        operation: operation.to_string(),
379    }
380}