1use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use thiserror::Error;
11
12pub mod analytics;
13pub mod correlation;
14pub mod events;
15pub mod findings;
16pub mod output;
17pub mod parsers;
18pub mod sql;
19
20pub use analytics::{QueryAnalyzer, TimingAnalysis, TimingAnalyzer};
22pub use correlation::{
23 correlate_query_executions, CorrelationConfidence, Correlator, ProcessOrderCorrelator,
24 QueryExecution, QueryFamilyIdentity,
25};
26pub use events::{
27 normalize_log_entries, DurationEvent, ErrorEvent, EventKind, EventSourceKind, NormalizedEvent,
28 SessionIdentity, SourceReference, StatementEvent,
29};
30pub use findings::{
31 query_family_findings, slow_query_diff_findings, ComparisonMetrics, DeltaMetrics, Finding,
32 FindingConfidence, FindingKind, FindingMetrics, FindingSet, QueryFamilyFinding, ReasonCode,
33 SlowQueryDiffOptions, FINDING_SCHEMA_VERSION,
34};
35pub use output::{JsonFormatter, TextFormatter};
36pub use parsers::StderrParser;
37pub use sql::{Query, QueryType};
38
/// Error type shared by the fallible operations of this crate.
///
/// The `#[error(...)]` attribute on each variant supplies its `Display` text;
/// only the fields named inside the format string appear in that text.
#[derive(Error, Debug)]
pub enum PgLogstatsError {
    /// An underlying I/O failure. `#[from]` lets `?` convert a
    /// `std::io::Error` into this variant.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// A piece of input could not be parsed.
    #[error("Parse error: {message}")]
    Parse {
        /// Human-readable description of the failure.
        message: String,
        /// Line number associated with the failure, when the caller supplied one.
        line_number: Option<usize>,
        /// Text of the offending line, when the caller supplied it.
        line_content: Option<String>,
    },

    /// A timestamp string could not be parsed.
    #[error("Timestamp parse error: {message}")]
    TimestampParse {
        /// Human-readable description of the failure.
        message: String,
        /// The timestamp text that was rejected.
        timestamp_string: String,
    },

    /// A configuration problem.
    #[error("Configuration error: {message}")]
    Configuration {
        /// Human-readable description of the problem.
        message: String,
        /// Name of the configuration field involved, when known.
        field: Option<String>,
    },

    /// A failure reported by an analytics step; `operation` names the step
    /// and `message` describes what went wrong.
    #[error("Analytics error: {message}")]
    Analytics { message: String, operation: String },

    /// A JSON (de)serialization failure. `#[from]` lets `?` convert a
    /// `serde_json::Error` into this variant.
    #[error("Serialization error: {0}")]
    Serialization(#[from] serde_json::Error),

    /// Any failure that does not fit the other variants.
    #[error("Unexpected error: {message}")]
    Unexpected {
        /// Human-readable description of the failure.
        message: String,
        /// Extra context about where or why it happened, when available.
        context: Option<String>,
    },
}
83
/// Level / message-type keyword attached to a log entry.
///
/// Each named variant corresponds to the upper-case keyword of the same name
/// (see the `Display` and `From<&str>` implementations). Any other keyword is
/// preserved verbatim in [`LogLevel::Unknown`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum LogLevel {
    /// `ERROR`
    Error,
    /// `WARNING`
    Warning,
    /// `INFO`
    Info,
    /// `DEBUG`
    Debug,
    /// `NOTICE`
    Notice,
    /// `LOG`
    Log,
    /// `STATEMENT`; entries with this level are reported by `LogEntry::is_query`.
    Statement,
    /// `DURATION`; entries with this level are reported by `LogEntry::is_duration`.
    Duration,
    /// `FATAL`
    Fatal,
    /// `PANIC`
    Panic,
    /// A keyword that matched none of the above; holds the text exactly as
    /// it was given to `LogLevel::from`.
    Unknown(String),
}
110
111impl std::fmt::Display for LogLevel {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 match self {
114 LogLevel::Error => write!(f, "ERROR"),
115 LogLevel::Warning => write!(f, "WARNING"),
116 LogLevel::Info => write!(f, "INFO"),
117 LogLevel::Debug => write!(f, "DEBUG"),
118 LogLevel::Notice => write!(f, "NOTICE"),
119 LogLevel::Log => write!(f, "LOG"),
120 LogLevel::Statement => write!(f, "STATEMENT"),
121 LogLevel::Duration => write!(f, "DURATION"),
122 LogLevel::Fatal => write!(f, "FATAL"),
123 LogLevel::Panic => write!(f, "PANIC"),
124 LogLevel::Unknown(s) => write!(f, "{}", s.to_uppercase()),
125 }
126 }
127}
128
129impl From<&str> for LogLevel {
130 fn from(s: &str) -> Self {
131 match s.to_uppercase().as_str() {
132 "ERROR" => LogLevel::Error,
133 "WARNING" => LogLevel::Warning,
134 "INFO" => LogLevel::Info,
135 "DEBUG" => LogLevel::Debug,
136 "NOTICE" => LogLevel::Notice,
137 "LOG" => LogLevel::Log,
138 "STATEMENT" => LogLevel::Statement,
139 "DURATION" => LogLevel::Duration,
140 "FATAL" => LogLevel::Fatal,
141 "PANIC" => LogLevel::Panic,
142 _ => LogLevel::Unknown(s.to_string()),
143 }
144 }
145}
146
/// One log record together with the metadata extracted for it.
///
/// Only `timestamp`, `process_id`, `message_type` and `message` are
/// mandatory; `LogEntry::new` leaves every `Option` field as `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    /// Timestamp of the record, expressed in UTC.
    pub timestamp: DateTime<Utc>,
    /// Process identifier recorded for the entry, stored as text.
    pub process_id: String,
    /// User name associated with the entry, if known.
    pub user: Option<String>,
    /// Database name associated with the entry, if known.
    pub database: Option<String>,
    /// Client host associated with the entry, if known.
    pub client_host: Option<String>,
    /// Application name associated with the entry, if known.
    pub application_name: Option<String>,
    /// Level / message-type keyword of the entry.
    pub message_type: LogLevel,
    /// Message text of the entry.
    pub message: String,
    /// Queries attached to the entry, if any; `normalized_query` joins their
    /// normalized forms.
    pub queries: Option<Vec<Query>>,
    /// Duration attached to the entry, if any.
    /// NOTE(review): the unit is not visible here — presumably milliseconds;
    /// confirm against the parser.
    pub duration: Option<f64>,
}
171
172impl LogEntry {
173 pub fn new(
175 timestamp: DateTime<Utc>,
176 process_id: String,
177 message_type: LogLevel,
178 message: String,
179 ) -> Self {
180 Self {
181 timestamp,
182 process_id,
183 user: None,
184 database: None,
185 client_host: None,
186 application_name: None,
187 message_type,
188 message,
189 queries: None,
190 duration: None,
191 }
192 }
193
194 pub fn is_query(&self) -> bool {
196 matches!(self.message_type, LogLevel::Statement)
197 }
198
199 pub fn is_duration(&self) -> bool {
201 matches!(self.message_type, LogLevel::Duration)
202 }
203
204 pub fn is_error(&self) -> bool {
206 matches!(self.message_type, LogLevel::Error)
207 }
208
209 pub fn normalized_query(&self) -> Option<String> {
211 let mut normalized_query: Option<String> = None;
212 if self.is_query() {
213 if let Some(queries) = &self.queries {
214 for query in queries {
215 normalized_query = match normalized_query {
216 Some(ref mut s) => Some(format!("{};{}", s, query.normalized_query)),
217 None => Some(query.normalized_query.clone()),
218 };
219 }
220 }
221 }
222 normalized_query
223 }
224}
225
/// Aggregated statistics over a set of queries.
///
/// The counters and the running average are maintained by the `add_*`
/// methods; the percentile fields are filled in by `calculate_percentiles`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnalysisResult {
    /// Number of queries recorded through `add_query`.
    pub total_queries: u64,
    /// Sum of the durations passed to `add_query`.
    pub total_duration: f64,
    /// Number of recorded queries per leading-keyword category
    /// (for example `SELECT`, `TRANSACTION` or `OTHER`).
    pub query_types: HashMap<String, u64>,
    /// `(query, duration)` pairs. Not populated by the methods in this file —
    /// presumably filled in by the analyzer; confirm against the caller.
    pub slowest_queries: Vec<(String, f64)>,
    /// `(query, count)` pairs. Not populated by the methods in this file —
    /// presumably filled in by the analyzer; confirm against the caller.
    pub most_frequent_queries: Vec<(String, u64)>,
    /// Number of errors recorded through `add_error`.
    pub error_count: u64,
    /// Number of connections recorded through `add_connection`.
    pub connection_count: u64,
    /// `total_duration / total_queries`, refreshed on every `add_query`.
    pub average_duration: f64,
    /// 95th-percentile duration as computed by `calculate_percentiles`.
    pub p95_duration: f64,
    /// 99th-percentile duration as computed by `calculate_percentiles`.
    pub p99_duration: f64,
}
250
251impl AnalysisResult {
252 pub fn new() -> Self {
254 Self {
255 total_queries: 0,
256 total_duration: 0.0,
257 query_types: HashMap::new(),
258 slowest_queries: Vec::new(),
259 most_frequent_queries: Vec::new(),
260 error_count: 0,
261 connection_count: 0,
262 average_duration: 0.0,
263 p95_duration: 0.0,
264 p99_duration: 0.0,
265 }
266 }
267
268 pub fn add_query(&mut self, query: &str, duration: f64) {
270 self.total_queries += 1;
271 self.total_duration += duration;
272
273 let query_type = self.extract_query_type(query);
275 *self.query_types.entry(query_type).or_insert(0) += 1;
276
277 self.average_duration = self.total_duration / self.total_queries as f64;
279 }
280
281 pub fn add_error(&mut self) {
283 self.error_count += 1;
284 }
285
286 pub fn add_connection(&mut self) {
288 self.connection_count += 1;
289 }
290
291 fn extract_query_type(&self, query: &str) -> String {
293 let query_upper = query.trim().to_uppercase();
294 if query_upper.starts_with("SELECT") {
295 "SELECT".to_string()
296 } else if query_upper.starts_with("INSERT") {
297 "INSERT".to_string()
298 } else if query_upper.starts_with("UPDATE") {
299 "UPDATE".to_string()
300 } else if query_upper.starts_with("DELETE") {
301 "DELETE".to_string()
302 } else if query_upper.starts_with("CREATE") {
303 "CREATE".to_string()
304 } else if query_upper.starts_with("DROP") {
305 "DROP".to_string()
306 } else if query_upper.starts_with("ALTER") {
307 "ALTER".to_string()
308 } else if query_upper.starts_with("BEGIN")
309 || query_upper.starts_with("COMMIT")
310 || query_upper.starts_with("ROLLBACK")
311 {
312 "TRANSACTION".to_string()
313 } else {
314 "OTHER".to_string()
315 }
316 }
317
318 pub fn calculate_percentiles(&mut self, durations: &[f64]) {
320 if durations.is_empty() {
321 return;
322 }
323
324 let mut sorted_durations = durations.to_vec();
325 sorted_durations.sort_by(|a, b| a.partial_cmp(b).unwrap());
326
327 let len = sorted_durations.len();
328 let p95_index = (len as f64 * 0.95) as usize;
329 let p99_index = (len as f64 * 0.99) as usize;
330
331 self.p95_duration = sorted_durations[p95_index.min(len - 1)];
332 self.p99_duration = sorted_durations[p99_index.min(len - 1)];
333 }
334}
335
336impl Default for AnalysisResult {
337 fn default() -> Self {
338 Self::new()
339 }
340}
341
/// Crate-wide shorthand for `std::result::Result` with [`PgLogstatsError`]
/// as the error type.
pub type Result<T> = std::result::Result<T, PgLogstatsError>;
344
345pub fn parse_error(
347 message: &str,
348 line_number: Option<usize>,
349 line_content: Option<&str>,
350) -> PgLogstatsError {
351 PgLogstatsError::Parse {
352 message: message.to_string(),
353 line_number,
354 line_content: line_content.map(|s| s.to_string()),
355 }
356}
357
358pub fn timestamp_error(message: &str, timestamp_string: &str) -> PgLogstatsError {
360 PgLogstatsError::TimestampParse {
361 message: message.to_string(),
362 timestamp_string: timestamp_string.to_string(),
363 }
364}
365
366pub fn config_error(message: &str, field: Option<&str>) -> PgLogstatsError {
368 PgLogstatsError::Configuration {
369 message: message.to_string(),
370 field: field.map(|s| s.to_string()),
371 }
372}
373
374pub fn analytics_error(message: &str, operation: &str) -> PgLogstatsError {
376 PgLogstatsError::Analytics {
377 message: message.to_string(),
378 operation: operation.to_string(),
379 }
380}