use crate::{timestamp_error, LogEntry, LogLevel, PgLogstatsError, Result};
use chrono::{DateTime, Utc};
use regex::Regex;
/// Parser for stderr-format log lines.
///
/// Holds the compiled regular expressions used while parsing, plus the state
/// for a statement whose text may continue over several physical lines.
pub struct StderrParser {
/// Matches one complete log line. Capture groups, in order: timestamp,
/// timezone label, process id, user, database, application name, log level
/// and message.
pub log_line_regex: Regex,
/// Captures the numeric part of a `duration: <number> ms` fragment.
duration_regex: Regex,
/// Matches a `$` followed by digits (e.g. `$1`), capturing the digits.
parameter_regex: Regex,
/// Statement currently being extended by continuation lines, if any.
pending_statement: Option<PendingStatement>,
}
/// A statement whose query text is being accumulated from continuation lines.
///
/// NOTE(review): no code visible in this file constructs a `PendingStatement`;
/// confirm whether something else is expected to populate
/// `StderrParser::pending_statement`.
#[derive(Debug)]
struct PendingStatement {
// Copied into the `LogEntry` emitted when the pending statement is flushed.
timestamp: DateTime<Utc>,
process_id: String,
user: String,
database: String,
application_name: String,
// Query text; each continuation line is appended after a single space.
query: String,
// Incremented once per appended continuation line; not read anywhere in
// the visible code.
line_count: usize,
}
impl StderrParser {
/// Creates a parser with its regular expressions compiled and no pending
/// statement.
///
/// # Panics
///
/// Panics if one of the built-in patterns fails to compile.
pub fn new() -> Self {
    // Capture groups: 1 timestamp, 2 timezone, 3 pid, 4 user, 5 database,
    // 6 application name, 7 log level, 8 message.
    let log_line_regex = Regex::new(
        r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(?:\.\d+)?) (\w+) \[(\d+)\] ([^@]+)@([^ ]+) ([^:]+): (\w+):\s*(.+)$"
    )
    .unwrap();
    let duration_regex = Regex::new(r"duration: ([\d.]+) ms").unwrap();
    let parameter_regex = Regex::new(r"\$(\d+)").unwrap();

    Self {
        log_line_regex,
        duration_regex,
        parameter_regex,
        pending_statement: None,
    }
}
/// Parses a single physical log line.
///
/// The line is trimmed first. Returns `Ok(None)` for blank lines, for
/// continuation lines (anything not starting with an ASCII digit) and for
/// digit-led lines that do not match the log line pattern. Otherwise the
/// result of parsing the matched line is returned.
pub fn parse_line(&mut self, line: &str) -> Result<Option<LogEntry>> {
    let trimmed = line.trim();

    match trimmed.chars().next() {
        // Nothing left after trimming.
        None => Ok(None),
        // Regular log lines start with the timestamp's year; anything else
        // is treated as a continuation of the previous message.
        Some(first) if !first.is_ascii_digit() => self.handle_continuation_line(trimmed),
        Some(_) => match self.log_line_regex.captures(trimmed) {
            Some(caps) => self.parse_standard_format(&caps, trimmed),
            None => Ok(None),
        },
    }
}
pub fn parse_lines(&self, lines: &[String]) -> Result<Vec<LogEntry>> {
let mut parser = StderrParser::new();
let mut entries = Vec::new();
let mut errors = Vec::new();
for (line_number, line) in lines.iter().enumerate() {
match parser.parse_line(line) {
Ok(Some(entry)) => entries.push(entry),
Ok(None) => {
}
Err(e) => {
errors.push(format!("Line {}: {}", line_number + 1, e));
}
}
}
if let Some(pending) = parser.pending_statement.take() {
entries.push(LogEntry {
timestamp: pending.timestamp,
process_id: pending.process_id,
user: Some(pending.user),
database: Some(pending.database),
client_host: None,
application_name: Some(pending.application_name),
message_type: LogLevel::Statement,
message: format!("statement: {}", pending.query),
queries: crate::Query::from_sql(&pending.query).ok(),
duration: None,
});
}
if !errors.is_empty() {
return Err(PgLogstatsError::Parse {
message: format!(
"Failed to parse {} lines: {}",
errors.len(),
errors.join("; ")
),
line_number: None,
line_content: None,
});
}
Ok(entries)
}
fn parse_standard_format(
&mut self,
captures: ®ex::Captures,
_original_line: &str,
) -> Result<Option<LogEntry>> {
let timestamp_str = captures.get(1).unwrap().as_str();
let timezone = captures.get(2).unwrap().as_str();
let process_id = captures.get(3).unwrap().as_str();
let user = captures.get(4).unwrap().as_str();
let database = captures.get(5).unwrap().as_str();
let app_name = captures.get(6).unwrap().as_str();
let log_level = captures.get(7).unwrap().as_str();
let message = captures.get(8).unwrap().as_str();
let timestamp = self.parse_timestamp(timestamp_str, timezone)?;
let message_type = LogLevel::from(log_level);
let actual_message_type = if message.starts_with("statement: ") {
LogLevel::Statement
} else if message.starts_with("duration: ") {
LogLevel::Duration
} else {
message_type
};
match actual_message_type {
LogLevel::Statement => self
.handle_statement_message(timestamp, process_id, user, database, app_name, message),
LogLevel::Duration => self
.handle_duration_message(timestamp, process_id, user, database, app_name, message),
_ => {
let entry = LogEntry {
timestamp,
process_id: process_id.to_string(),
user: Some(user.to_string()),
database: Some(database.to_string()),
client_host: None,
application_name: Some(app_name.to_string()),
message_type: actual_message_type,
message: message.to_string(),
queries: None,
duration: None,
};
Ok(Some(entry))
}
}
}
/// Builds a `Statement` entry from a statement message.
///
/// The `statement: ` prefix, when present, is removed to obtain the SQL
/// text; a message without it is used as-is. The SQL is passed to
/// `crate::Query::from_sql`, and a failure there is recorded as
/// `queries: None` rather than an error. The entry is returned immediately.
fn handle_statement_message(
    &mut self,
    timestamp: DateTime<Utc>,
    process_id: &str,
    user: &str,
    database: &str,
    app_name: &str,
    message: &str,
) -> Result<Option<LogEntry>> {
    // `strip_prefix` removes the whole 11-byte prefix, trailing space
    // included, so the query has no leading blank and the rebuilt message
    // below has a single space after the colon.
    let query = message.strip_prefix("statement: ").unwrap_or(message);
    let queries = crate::Query::from_sql(query).ok();

    Ok(Some(LogEntry {
        timestamp,
        process_id: process_id.to_string(),
        user: Some(user.to_string()),
        database: Some(database.to_string()),
        client_host: None,
        application_name: Some(app_name.to_string()),
        message_type: LogLevel::Statement,
        message: format!("statement: {}", query),
        queries,
        duration: None,
    }))
}
/// Builds a `Duration` entry from a duration message.
///
/// The entry's `duration` is the value extracted by `extract_duration`, or
/// `None` when the message holds no parseable `duration: <number> ms`
/// fragment. An entry is produced in both cases.
fn handle_duration_message(
    &mut self,
    timestamp: DateTime<Utc>,
    process_id: &str,
    user: &str,
    database: &str,
    app_name: &str,
    message: &str,
) -> Result<Option<LogEntry>> {
    // `extract_duration` already returns an `Option`, so a single struct
    // literal covers both the "found" and "not found" cases.
    let duration = self.extract_duration(message);

    Ok(Some(LogEntry {
        timestamp,
        process_id: process_id.to_string(),
        user: Some(user.to_string()),
        database: Some(database.to_string()),
        client_host: None,
        application_name: Some(app_name.to_string()),
        message_type: LogLevel::Duration,
        message: message.to_string(),
        queries: None,
        duration,
    }))
}
/// Folds a continuation line into the pending statement, if one exists.
///
/// The line is appended to the pending query after a single space and the
/// pending line counter is incremented. Without a pending statement the
/// line is dropped. Always returns `Ok(None)`.
fn handle_continuation_line(&mut self, line: &str) -> Result<Option<LogEntry>> {
    if let Some(stmt) = self.pending_statement.as_mut() {
        stmt.query.push(' ');
        stmt.query.push_str(line);
        stmt.line_count += 1;
    }
    Ok(None)
}
/// Parses a `YYYY-MM-DD HH:MM:SS[.fraction]` timestamp as UTC.
///
/// NOTE(review): `_timezone` is ignored, so the timestamp is interpreted as
/// UTC whatever zone label the log line carries. Confirm this is acceptable
/// for logs written with a non-UTC `log_timezone`.
///
/// # Errors
///
/// Returns the error built by `timestamp_error` when the text matches
/// neither accepted layout.
pub fn parse_timestamp(&self, timestamp_str: &str, _timezone: &str) -> Result<DateTime<Utc>> {
    // Only naive parsing is attempted. Parsing through
    // `DateTime::parse_from_str` with a `%Z` item cannot succeed: `%Z` merely
    // skips the zone name and supplies no offset, which that function
    // requires. Trying it would cost an allocation per line for nothing.
    const FORMATS: [&str; 2] = ["%Y-%m-%d %H:%M:%S%.f", "%Y-%m-%d %H:%M:%S"];

    FORMATS
        .iter()
        .find_map(|fmt| chrono::NaiveDateTime::parse_from_str(timestamp_str, fmt).ok())
        .map(|naive| DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc))
        .ok_or_else(|| timestamp_error("Failed to parse timestamp", timestamp_str))
}
/// Extracts the number from a `duration: <number> ms` fragment.
///
/// Returns `None` when the fragment is absent or when the captured text is
/// not a valid `f64` (for example `1.2.3`).
pub fn extract_duration(&self, message: &str) -> Option<f64> {
    let caps = self.duration_regex.captures(message)?;
    let number = caps.get(1)?.as_str();
    number.parse::<f64>().ok()
}
pub fn duration_regex(&self) -> &Regex {
&self.duration_regex
}
pub fn parameter_regex(&self) -> &Regex {
&self.parameter_regex
}
}
impl Default for StderrParser {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
/// A single-line statement yields a `Statement` entry with one normalized query.
#[test]
fn test_parse_simple_statement() {
    let input = "2024-08-14 10:30:15.123 UTC [12345] postgres@testdb psql: LOG: statement: SELECT * FROM users WHERE active = true;";
    let mut parser = StderrParser::new();

    let parsed = parser.parse_line(input).unwrap();
    assert!(parsed.is_some());
    let entry = parsed.unwrap();

    assert_eq!(entry.process_id, "12345");
    assert_eq!(entry.user.as_deref(), Some("postgres"));
    assert_eq!(entry.database.as_deref(), Some("testdb"));
    assert_eq!(entry.application_name.as_deref(), Some("psql"));
    assert_eq!(entry.message_type, LogLevel::Statement);

    assert!(entry.queries.is_some());
    let queries = entry.queries.as_ref().unwrap();
    assert_eq!(queries.len(), 1);
    assert_eq!(
        queries[0].normalized_query,
        "SELECT * FROM users WHERE active = ?"
    );
}
/// A duration line yields a `Duration` entry carrying the millisecond value.
#[test]
fn test_parse_duration() {
    let input =
        "2024-08-14 10:30:15.456 UTC [12345] postgres@testdb psql: LOG: duration: 45.123 ms";
    let mut parser = StderrParser::new();

    let parsed = parser.parse_line(input).unwrap();
    assert!(parsed.is_some());

    let entry = parsed.unwrap();
    assert_eq!(entry.message_type, LogLevel::Duration);
    assert_eq!(entry.duration, Some(45.123));
}
/// An `ERROR` line keeps its level, connection details and message text.
#[test]
fn test_parse_error() {
    let input = "2024-08-14 10:30:16.789 UTC [12346] admin@analytics pgbench: ERROR: relation \"missing_table\" does not exist";
    let mut parser = StderrParser::new();

    let parsed = parser.parse_line(input).unwrap();
    assert!(parsed.is_some());

    let entry = parsed.unwrap();
    assert_eq!(entry.message_type, LogLevel::Error);
    assert_eq!(entry.user.as_deref(), Some("admin"));
    assert_eq!(entry.database.as_deref(), Some("analytics"));
    assert_eq!(entry.application_name.as_deref(), Some("pgbench"));

    let expected_fragment = "relation \"missing_table\" does not exist";
    assert!(entry.message.contains(expected_fragment));
}
/// Placeholders such as `$1` are preserved in the parsed query's SQL.
#[test]
fn test_parse_parameterized_query() {
    let input = "2024-08-14 10:30:17.012 UTC [12347] postgres@testdb psql: LOG: statement: UPDATE products SET price = $1 WHERE id = $2";
    let mut parser = StderrParser::new();

    let parsed = parser.parse_line(input).unwrap();
    assert!(parsed.is_some());

    let entry = parsed.unwrap();
    assert_eq!(entry.message_type, LogLevel::Statement);

    assert!(entry.queries.is_some());
    let queries = entry.queries.as_ref().unwrap();
    assert_eq!(queries.len(), 1);
    assert_eq!(
        queries[0].sql,
        "UPDATE products SET price = $1 WHERE id = $2"
    );
}
/// A statement followed by continuation lines and a duration line yields
/// exactly two entries: the statement, then the duration.
#[test]
fn test_parse_multi_line_statement() {
    let raw_lines = [
        "2024-08-14 10:30:18.000 UTC [12348] postgres@testdb psql: LOG: statement: SELECT u.name, p.title",
        " FROM users u",
        " JOIN posts p ON u.id = p.user_id",
        " WHERE u.active = true",
        " ORDER BY p.created_at DESC;",
        "2024-08-14 10:30:18.123 UTC [12348] postgres@testdb psql: LOG: duration: 12.345 ms",
    ];
    let owned: Vec<String> = raw_lines.iter().map(|s| s.to_string()).collect();

    let parser = StderrParser::new();
    let outcome = parser.parse_lines(&owned);
    assert!(outcome.is_ok());

    let entries = outcome.unwrap();
    assert_eq!(entries.len(), 2);

    let statement_entry = &entries[0];
    let duration_entry = &entries[1];
    assert_eq!(statement_entry.message_type, LogLevel::Statement);
    assert_eq!(duration_entry.message_type, LogLevel::Duration);
    assert_eq!(duration_entry.duration, Some(12.345));

    assert!(statement_entry.queries.is_some());
    let queries = statement_entry.queries.as_ref().unwrap();
    assert_eq!(queries.len(), 1);
    assert!(queries[0]
        .normalized_query
        .contains("SELECT u.name, p.title"));
}
/// An empty line produces no entry and no error.
#[test]
fn test_parse_empty_line() {
    let mut parser = StderrParser::new();
    let parsed = parser.parse_line("").unwrap();
    assert!(parsed.is_none());
}
/// Text that is not a log line is skipped rather than reported as an error.
#[test]
fn test_parse_unparseable_line() {
    let input = "This is not a PostgreSQL log line";
    let mut parser = StderrParser::new();
    let parsed = parser.parse_line(input).unwrap();
    assert!(parsed.is_none());
}
/// A non-log line between two valid lines is skipped; both valid lines
/// still produce entries and the batch succeeds.
#[test]
fn test_parse_lines_with_errors() {
    let raw_lines = [
        "2024-08-14 10:30:15.123 UTC [12345] postgres@testdb psql: LOG: statement: SELECT * FROM users;",
        "This is not a valid log line",
        "2024-08-14 10:30:15.456 UTC [12345] postgres@testdb psql: LOG: duration: 45.123 ms",
    ];
    let owned: Vec<String> = raw_lines.iter().map(|s| s.to_string()).collect();

    let parser = StderrParser::new();
    let outcome = parser.parse_lines(&owned);
    assert!(outcome.is_ok());

    let entries = outcome.unwrap();
    assert_eq!(entries.len(), 2);
}
/// Timestamps parse to the expected instant, with or without fractional
/// seconds, and malformed input is rejected.
#[test]
fn test_timestamp_parsing() {
    let parser = StderrParser::new();
    let layout = "%Y-%m-%d %H:%M:%S%.3f";

    // Round-trip through formatting to check the value, not just success.
    let with_fraction = parser
        .parse_timestamp("2024-08-14 10:30:15.123", "UTC")
        .expect("timestamp with fractional seconds should parse");
    assert_eq!(
        with_fraction.format(layout).to_string(),
        "2024-08-14 10:30:15.123"
    );

    let whole_seconds = parser
        .parse_timestamp("2024-08-14 10:30:15", "UTC")
        .expect("timestamp without fractional seconds should parse");
    assert_eq!(
        whole_seconds.format(layout).to_string(),
        "2024-08-14 10:30:15.000"
    );

    assert!(parser.parse_timestamp("not a timestamp", "UTC").is_err());
}
/// The log line pattern matches a well-formed line and splits it into the
/// expected eight capture groups.
#[test]
fn test_regex_matching() {
    let parser = StderrParser::new();
    let line = "2024-08-14 10:30:15.123 UTC [12345] postgres@testdb psql: LOG: statement: SELECT * FROM users WHERE active = true;";

    // Fail loudly on a non-match instead of printing and passing.
    let captures = parser
        .log_line_regex
        .captures(line)
        .expect("log line regex should match a well-formed statement line");

    // Expected text for capture groups 1 through 8, in order.
    let expected = [
        "2024-08-14 10:30:15.123",
        "UTC",
        "12345",
        "postgres",
        "testdb",
        "psql",
        "LOG",
        "statement: SELECT * FROM users WHERE active = true;",
    ];
    for (offset, want) in expected.iter().enumerate() {
        let group = offset + 1;
        assert_eq!(
            captures.get(group).map(|m| m.as_str()),
            Some(*want),
            "capture group {}",
            group
        );
    }
}
}