use clap::{Args, Parser, Subcommand, ValueEnum};
use indicatif::{ProgressBar, ProgressStyle};
use log::{debug, error, info, warn};
use pg_logstats::{
normalize_log_entries, query_family_findings, slow_query_diff_findings, Correlator,
EventSourceKind, Finding, FindingSet, JsonFormatter, PgLogstatsError, ProcessOrderCorrelator,
Result, SlowQueryDiffOptions, StderrParser, TextFormatter,
};
use serde_json::json;
use std::fs;
use std::path::{Path, PathBuf};
use std::process;
use std::time::Instant;
// Top-level command line: one subcommand plus output options that every
// subcommand accepts (`global = true`).
#[derive(Debug, Parser)]
#[clap(
    name = "pg-logstats",
    version,
    about = "A PostgreSQL log investigation CLI for top query families, slow-query diffs, and follow-up SQL"
)]
struct Arguments {
    // Subcommand to run; dispatched by `run_command`.
    #[clap(subcommand)]
    command: Command,
    /// Output format for findings and suggested SQL
    #[clap(long, global = true, value_enum, default_value = "text")]
    output_format: OutputFormat,
    /// Write output to PATH instead of stdout ("-" writes to stdout)
    #[clap(short = 'o', long, global = true, value_name = "PATH")]
    outfile: Option<String>,
    /// Directory that --outfile is written into (ignored without --outfile)
    #[clap(short = 'O', long, global = true, value_name = "DIR")]
    outdir: Option<String>,
    /// Suppress the progress bar and the timing summary
    #[clap(short = 'q', long, global = true)]
    quiet: bool,
}
// Log inputs for subcommands that read PostgreSQL stderr logs. All sources
// given here are combined (and de-duplicated) by `discover_log_files`.
#[derive(Debug, Args)]
struct LogInputArgs {
    /// Directory to scan (non-recursively) for log files: *.log or *.txt, or
    /// extension-less names containing "postgres" or "pg"
    #[clap(long, value_name = "DIR")]
    log_dir: Option<PathBuf>,
    /// Only analyze the first N lines of each log file
    #[clap(long, value_name = "N")]
    sample_size: Option<usize>,
    /// File listing one log file path per line; blank lines and lines starting
    /// with '#' are ignored
    #[clap(short = 'L', long, value_name = "logfile-list")]
    logfile_list: Option<String>,
    /// Log files to analyze
    #[clap(value_name = "LOG_FILES")]
    log_files: Vec<String>,
}
// Subcommands of `pg-logstats`.
#[derive(Debug, Subcommand)]
enum Command {
    /// Report top-ranked findings from PostgreSQL log files
    Top {
        // Which ranking to produce.
        #[clap(subcommand)]
        command: TopCommand,
    },
    /// Compare slow queries between two sets of logs
    SlowQueries {
        // Which comparison to run.
        #[clap(subcommand)]
        command: SlowQueriesCommand,
    },
    /// Print the suggested follow-up SQL for one finding from a findings file
    SuggestSql {
        /// JSON findings file to read
        #[clap(long, value_name = "PATH")]
        findings_file: PathBuf,
        /// Select the finding with this id (one of --finding-id or --rank is required)
        #[clap(long, value_name = "FINDING_ID", conflicts_with = "rank")]
        finding_id: Option<String>,
        /// Select the finding with this rank (one of --finding-id or --rank is required)
        #[clap(long, value_name = "N", conflicts_with = "finding_id")]
        rank: Option<usize>,
    },
}
// Subcommands of `top`.
#[derive(Debug, Subcommand)]
enum TopCommand {
    /// Report the top query families found in the logs
    QueryFamilies {
        /// Maximum number of query families to report
        #[clap(long, default_value_t = 10)]
        limit: usize,
        // Log files to read; see `LogInputArgs`.
        #[clap(flatten)]
        input: LogInputArgs,
    },
}
// Subcommands of `slow-queries`.
#[derive(Debug, Subcommand)]
enum SlowQueriesCommand {
    /// Compare slow queries between a baseline and a target log set
    Diff {
        /// Baseline log file, or directory of log files
        #[clap(long, value_name = "PATH")]
        baseline: PathBuf,
        /// Target log file, or directory of log files
        #[clap(long, value_name = "PATH")]
        target: PathBuf,
        /// Only analyze the first N lines of each log file
        #[clap(long, value_name = "N")]
        sample_size: Option<usize>,
        /// Maximum number of findings to report
        #[clap(long, default_value_t = 10)]
        limit: usize,
        /// Minimum number of executions in the target logs
        #[clap(long, default_value_t = 1)]
        min_target_count: u64,
        /// Minimum total execution time in the target logs, in milliseconds
        #[clap(long, default_value_t = 0.0)]
        min_target_total_ms: f64,
        /// Minimum p95 duration change versus the baseline, in milliseconds
        #[clap(long, default_value_t = 0.0)]
        min_p95_delta_ms: f64,
    },
}
// Value of the global `--output-format` flag.
#[derive(Debug, ValueEnum, Clone, Copy)]
enum OutputFormat {
    /// Human-readable text
    Text,
    /// Pretty-printed JSON
    Json,
}
// Log input formats.
// NOTE(review): nothing in this file references `Format`; parsing always uses
// `StderrParser` (see `initialize_parser`). Kept for a future `--format` flag.
#[derive(Debug, ValueEnum, Clone, Copy, Default)]
enum Format {
    #[default]
    Syslog,
    Syslog2,
    Stderr,
    Jsonlog,
    // Previously misspelled `Cvs`, which would have surfaced as `cvs` on the CLI.
    Csv,
    Pgbouncer,
    Logplex,
    Rds,
    Redshift,
}
/// Entry point: parses CLI arguments, validates them, runs the selected
/// subcommand, and reports the elapsed time.
///
/// The timing summary is written to stderr, never stdout. Findings and SQL go
/// to stdout by default, and stdout output such as
/// `--output-format json > findings.json` is later parsed back by
/// `suggest-sql`. A trailing timing line there would corrupt the JSON.
fn main() -> Result<()> {
    env_logger::init();
    let args = Arguments::parse();
    let start_time = Instant::now();
    validate_arguments(&args)?;
    let parser = initialize_parser(&args)?;
    run_command(&args, &parser)?;
    if !args.quiet {
        eprintln!(
            "Analysis completed in {:.2}s",
            start_time.elapsed().as_secs_f64()
        );
    }
    Ok(())
}
/// Routes the parsed subcommand to its handler.
///
/// `parser` is only needed by the log-reading subcommands. `suggest-sql`
/// works from a previously written findings file.
fn run_command(args: &Arguments, parser: &StderrParser) -> Result<()> {
    match &args.command {
        Command::Top { command } => match command {
            TopCommand::QueryFamilies { limit, input } => {
                run_top_query_families_command(args, parser, input, *limit)
            }
        },
        Command::SlowQueries { command } => match command {
            SlowQueriesCommand::Diff {
                baseline,
                target,
                sample_size,
                limit,
                min_target_count,
                min_target_total_ms,
                min_p95_delta_ms,
            } => {
                let diff_options = SlowQueryDiffOptions {
                    limit: *limit,
                    min_target_count: *min_target_count,
                    min_target_total_ms: *min_target_total_ms,
                    min_p95_delta_ms: *min_p95_delta_ms,
                };
                run_slow_queries_diff_command(
                    args,
                    parser,
                    baseline,
                    target,
                    *sample_size,
                    diff_options,
                )
            }
        },
        Command::SuggestSql {
            findings_file,
            finding_id,
            rank,
        } => run_suggest_sql_command(args, findings_file, finding_id.as_deref(), *rank),
    }
}
fn load_default_log_entries(
args: &Arguments,
input: &LogInputArgs,
parser: &StderrParser,
) -> Result<Vec<pg_logstats::LogEntry>> {
let progress_bar = if !args.quiet {
Some(create_progress_bar())
} else {
None
};
let log_files = discover_log_files(input)?;
if log_files.is_empty() {
error!("No log files found to process");
process::exit(1);
}
info!("Found {} log files to process", log_files.len());
let mut all_entries = Vec::new();
for (index, log_file) in log_files.iter().enumerate() {
if let Some(pb) = &progress_bar {
pb.set_message(format!("Processing {}", log_file.display()));
pb.set_position(index as u64);
}
match process_log_file(log_file, parser, input.sample_size) {
Ok(mut entries) => {
info!(
"Processed {} entries from {}",
entries.len(),
log_file.display()
);
all_entries.append(&mut entries);
}
Err(e) => {
warn!("Failed to process {}: {}", log_file.display(), e);
continue;
}
}
}
if let Some(pb) = &progress_bar {
pb.finish_with_message("File processing complete");
}
if all_entries.is_empty() {
warn!("No log entries were successfully parsed");
process::exit(1);
}
info!("Total entries parsed: {}", all_entries.len());
Ok(all_entries)
}
/// Handler for `top query-families`. Loads the log entries selected by `input`,
/// ranks up to `limit` query families, and emits them in the requested output
/// format.
fn run_top_query_families_command(
    args: &Arguments,
    parser: &StderrParser,
    input: &LogInputArgs,
    limit: usize,
) -> Result<()> {
    let entries = load_default_log_entries(args, input, parser)?;
    run_top_query_families(&entries, limit)
        .and_then(|ranked| output_findings(&ranked, args, &entries))
}
/// Handler for `slow-queries diff`. Builds the baseline-vs-target findings and
/// emits them. The reported entry count is the sum of both sides.
fn run_slow_queries_diff_command(
    args: &Arguments,
    parser: &StderrParser,
    baseline: &Path,
    target: &Path,
    sample_size: Option<usize>,
    options: SlowQueryDiffOptions,
) -> Result<()> {
    run_slow_queries_diff(baseline, target, parser, sample_size, options).and_then(
        |(diff_findings, entry_count)| {
            output_findings_with_entry_count(&diff_findings, args, entry_count)
        },
    )
}
/// Checks the parsed arguments before any work starts.
///
/// First runs the subcommand-specific validation. Then rejects an `--outdir`
/// that exists but is not a directory. A missing outdir is allowed here.
fn validate_arguments(args: &Arguments) -> Result<()> {
    let subcommand_check = match &args.command {
        Command::Top {
            command: TopCommand::QueryFamilies { input, .. },
        } => validate_log_input_args(input),
        Command::SlowQueries {
            command: SlowQueriesCommand::Diff { sample_size, .. },
        } => validate_sample_size(*sample_size),
        Command::SuggestSql {
            findings_file,
            finding_id,
            rank,
        } => validate_suggest_sql_args(findings_file, finding_id.as_deref(), *rank),
    };
    subcommand_check?;

    if let Some(outdir) = args.outdir.as_deref() {
        let outdir_path = Path::new(outdir);
        let clashes_with_non_directory = outdir_path.exists() && !outdir_path.is_dir();
        if clashes_with_non_directory {
            return Err(PgLogstatsError::Configuration {
                message: format!(
                    "Output directory path exists but is not a directory: {}",
                    outdir
                ),
                field: Some("outdir".to_string()),
            });
        }
    }
    Ok(())
}
/// Validates the log inputs of `top query-families`.
///
/// If `--log-dir` is set, it must exist, be a directory, and be readable. The
/// optional sample size is then checked by `validate_sample_size`.
fn validate_log_input_args(input: &LogInputArgs) -> Result<()> {
    if let Some(dir) = input.log_dir.as_deref() {
        let bad_log_dir = |message: String| PgLogstatsError::Configuration {
            message,
            field: Some("log_dir".to_string()),
        };
        if !dir.exists() {
            return Err(bad_log_dir(format!(
                "Log directory does not exist: {}",
                dir.display()
            )));
        }
        if !dir.is_dir() {
            return Err(bad_log_dir(format!(
                "Log directory path is not a directory: {}",
                dir.display()
            )));
        }
        // Probe readability now so permission problems surface before parsing.
        if let Err(e) = fs::read_dir(dir) {
            return Err(bad_log_dir(format!(
                "Cannot read log directory {}: {}",
                dir.display(),
                e
            )));
        }
    }
    validate_sample_size(input.sample_size)
}
/// Rejects an explicit `--sample-size 0`. An absent sample size is accepted.
fn validate_sample_size(sample_size: Option<usize>) -> Result<()> {
    match sample_size {
        Some(0) => Err(PgLogstatsError::Configuration {
            message: "Sample size must be greater than 0".to_string(),
            field: Some("sample_size".to_string()),
        }),
        _ => Ok(()),
    }
}
/// Validates `suggest-sql` arguments.
///
/// Checks run in this order:
/// 1. The findings file exists.
/// 2. The findings path is a regular file.
/// 3. At least one selector (`--finding-id` / `--rank`) was given.
/// 4. The rank is not zero.
fn validate_suggest_sql_args(
    findings_file: &Path,
    finding_id: Option<&str>,
    rank: Option<usize>,
) -> Result<()> {
    let invalid = |message: String, field: &str| PgLogstatsError::Configuration {
        message,
        field: Some(field.to_string()),
    };

    if !findings_file.exists() {
        return Err(invalid(
            format!("Findings file does not exist: {}", findings_file.display()),
            "findings_file",
        ));
    }
    if !findings_file.is_file() {
        return Err(invalid(
            format!("Findings path is not a file: {}", findings_file.display()),
            "findings_file",
        ));
    }

    let has_selector = finding_id.is_some() || rank.is_some();
    if !has_selector {
        return Err(invalid(
            "Specify either --finding-id or --rank".to_string(),
            "finding_selector",
        ));
    }
    if rank == Some(0) {
        return Err(invalid("Rank must be greater than 0".to_string(), "rank"));
    }
    Ok(())
}
/// Collects the log files named by `--log-dir`, the positional `LOG_FILES`, and
/// `--logfile-list`, and returns a sorted, de-duplicated list of non-empty files.
///
/// Every path is canonicalized when possible. This ensures that the same file
/// reached through different spellings (relative vs. absolute, or via both
/// `--log-dir` and an explicit argument) is processed only once. Explicitly
/// named paths that are not existing regular files are skipped with a warning
/// rather than dropped silently. Relative paths in the list file are resolved
/// against the current working directory.
///
/// # Errors
/// Returns an error if the log directory or the logfile list cannot be read.
fn discover_log_files(input: &LogInputArgs) -> Result<Vec<PathBuf>> {
    let mut candidates = Vec::new();
    if let Some(log_dir) = &input.log_dir {
        discover_files_in_directory(log_dir, &mut candidates)?;
    }

    // Explicitly requested paths: keep regular files, report anything else.
    let mut add_explicit = |name: &str| {
        let path = Path::new(name);
        if path.is_file() {
            candidates.push(path.to_path_buf());
        } else {
            warn!("Skipping {}: not an existing regular file", name);
        }
    };
    for file_name in &input.log_files {
        add_explicit(file_name.as_str());
    }
    if let Some(logfile_list) = &input.logfile_list {
        let list_content = fs::read_to_string(logfile_list).map_err(PgLogstatsError::Io)?;
        for line in list_content.lines().map(str::trim) {
            if !line.is_empty() && !line.starts_with('#') {
                add_explicit(line);
            }
        }
    }

    // Canonicalize uniformly so `dedup` catches aliases; fall back to the path
    // as given if canonicalization fails.
    let mut log_files: Vec<PathBuf> = candidates
        .into_iter()
        .map(|path| path.canonicalize().unwrap_or(path))
        .collect();
    log_files.sort();
    log_files.dedup();
    log_files.retain(|path| match fs::metadata(path) {
        Ok(metadata) => {
            if metadata.len() == 0 {
                warn!("Skipping empty log file: {}", path.display());
                false
            } else {
                true
            }
        }
        Err(e) => {
            warn!("Cannot read metadata for {}: {}", path.display(), e);
            false
        }
    });
    Ok(log_files)
}
/// Appends to `log_files` each regular file directly inside `dir` (not
/// recursive) that looks like a log, in `read_dir` order. A file qualifies if
/// either:
/// - its extension is `log` or `txt` (case-insensitive), or
/// - it has no extension and its name contains `postgres` or `pg`
///   (case-insensitive).
///
/// # Errors
/// Returns an error if the directory or one of its entries cannot be read.
fn discover_files_in_directory(dir: &Path, log_files: &mut Vec<PathBuf>) -> Result<()> {
    for entry in fs::read_dir(dir)? {
        let path = entry?.path();
        if !path.is_file() {
            continue;
        }
        let looks_like_log = match path.extension() {
            Some(extension) => {
                let extension = extension.to_string_lossy().to_lowercase();
                extension == "log" || extension == "txt"
            }
            None => path.file_name().map_or(false, |name| {
                let name = name.to_string_lossy().to_lowercase();
                name.contains("postgres") || name.contains("pg")
            }),
        };
        if looks_like_log {
            log_files.push(path);
        }
    }
    Ok(())
}
/// Expands a `--baseline`/`--target` path into a sorted, de-duplicated list of
/// log files.
///
/// A file is used as-is. A directory is scanned with
/// `discover_files_in_directory`.
///
/// # Errors
/// Returns a configuration error if the path does not exist or is neither a
/// file nor a directory. Directory read errors are propagated.
fn discover_log_files_for_path(path: &Path) -> Result<Vec<PathBuf>> {
    let bad_path = |message: String| PgLogstatsError::Configuration {
        message,
        field: Some("path".to_string()),
    };
    if !path.exists() {
        return Err(bad_path(format!(
            "Log path does not exist: {}",
            path.display()
        )));
    }

    let mut found = if path.is_file() {
        vec![path.to_path_buf()]
    } else if path.is_dir() {
        let mut scanned = Vec::new();
        discover_files_in_directory(path, &mut scanned)?;
        scanned
    } else {
        return Err(bad_path(format!(
            "Log path is neither file nor directory: {}",
            path.display()
        )));
    };

    found.sort();
    found.dedup();
    Ok(found)
}
/// Builds the log parser. Only the stderr format is supported, so the
/// arguments are currently unused.
fn initialize_parser(_args: &Arguments) -> Result<StderrParser> {
    debug!("Initializing stderr parser");
    let stderr_parser = StderrParser::new();
    Ok(stderr_parser)
}
/// Reads `log_file` and parses its lines with `parser`.
///
/// When `sample_size` is `Some(n)`, only the first `n` lines are parsed, and an
/// info message is logged if the file has more lines than that. Only the
/// sampled lines are copied into owned `String`s, so sampling a large file no
/// longer allocates one `String` per line of the whole file.
///
/// # Errors
/// Returns an error if the file cannot be read as UTF-8 text, or if the parser
/// fails.
fn process_log_file(
    log_file: &Path,
    parser: &StderrParser,
    sample_size: Option<usize>,
) -> Result<Vec<pg_logstats::LogEntry>> {
    let content = fs::read_to_string(log_file)?;
    let mut remaining = content.lines();
    let lines: Vec<String> = match sample_size {
        Some(limit) => {
            let sampled: Vec<String> = remaining.by_ref().take(limit).map(String::from).collect();
            // Any line left over means the file was longer than the sample.
            if remaining.next().is_some() {
                info!(
                    "Limiting analysis to first {} lines of {}",
                    limit,
                    log_file.display()
                );
            }
            sampled
        }
        None => remaining.map(String::from).collect(),
    };
    parser.parse_lines(&lines[..])
}
/// Parses every log file under `path` (a file or a directory) and concatenates
/// the entries in sorted file order.
///
/// Unlike the `top` input path, any file that fails to read or parse aborts the
/// whole call.
///
/// # Errors
/// Returns a configuration error if no log files are found under `path`.
/// Discovery and parse errors are propagated.
fn process_log_paths(
    path: &Path,
    parser: &StderrParser,
    sample_size: Option<usize>,
) -> Result<Vec<pg_logstats::LogEntry>> {
    let files = discover_log_files_for_path(path)?;
    if files.is_empty() {
        return Err(PgLogstatsError::Configuration {
            message: format!("No log files found under {}", path.display()),
            field: Some("path".to_string()),
        });
    }
    let mut collected = Vec::new();
    for file in &files {
        collected.extend(process_log_file(file, parser, sample_size)?);
    }
    Ok(collected)
}
/// Turns parsed stderr log entries into top query-family findings.
///
/// The entries are normalized into events, correlated into executions with
/// `ProcessOrderCorrelator`, and then ranked, keeping up to `limit` families.
fn run_top_query_families(
    entries: &[pg_logstats::LogEntry],
    limit: usize,
) -> Result<pg_logstats::FindingSet> {
    info!(
        "Building top query-family findings from {} entries",
        entries.len()
    );
    let normalized = normalize_log_entries(entries, EventSourceKind::Stderr);
    let correlated = ProcessOrderCorrelator.correlate(&normalized);
    let ranked = query_family_findings(&correlated, limit);
    Ok(ranked)
}
/// Parses the baseline and target log sets, correlates each into executions,
/// and diffs them.
///
/// Returns the findings together with the combined number of parsed entries
/// from both sides.
fn run_slow_queries_diff(
    baseline: &Path,
    target: &Path,
    parser: &StderrParser,
    sample_size: Option<usize>,
    options: SlowQueryDiffOptions,
) -> Result<(pg_logstats::FindingSet, usize)> {
    info!(
        "Building slow-query diff findings from baseline {} and target {}",
        baseline.display(),
        target.display()
    );
    let before = process_log_paths(baseline, parser, sample_size)?;
    let after = process_log_paths(target, parser, sample_size)?;
    let entry_count = before.len() + after.len();

    let before_events = normalize_log_entries(&before, EventSourceKind::Stderr);
    let after_events = normalize_log_entries(&after, EventSourceKind::Stderr);
    let before_executions = ProcessOrderCorrelator.correlate(&before_events);
    let after_executions = ProcessOrderCorrelator.correlate(&after_events);

    let diff = slow_query_diff_findings(&before_executions, &after_executions, options);
    Ok((diff, entry_count))
}
/// Handler for `suggest-sql`. Loads the findings file, selects one finding, and
/// prints its follow-up SQL.
///
/// # Errors
/// Returns a configuration error if the selected finding carries no suggested
/// SQL. Load and selection errors are propagated.
fn run_suggest_sql_command(
    args: &Arguments,
    findings_file: &Path,
    finding_id: Option<&str>,
    rank: Option<usize>,
) -> Result<()> {
    let loaded = load_findings_file(findings_file)?;
    let chosen = select_finding(&loaded, finding_id, rank)?;
    if !chosen.next_sql.is_empty() {
        return output_suggested_sql(args, chosen);
    }
    Err(PgLogstatsError::Configuration {
        message: format!("No suggested SQL is available for {}", chosen.finding_id),
        field: Some("finding_id".to_string()),
    })
}
/// Reads a findings file and deserializes it from JSON into a `FindingSet`.
///
/// # Errors
/// Returns an I/O error if the file cannot be read, and a serialization error
/// if the contents are not a valid `FindingSet`.
fn load_findings_file(path: &Path) -> Result<FindingSet> {
    let raw_json = fs::read_to_string(path)?;
    let parsed: FindingSet =
        serde_json::from_str(&raw_json).map_err(PgLogstatsError::Serialization)?;
    Ok(parsed)
}
/// Picks one finding from `findings`. `finding_id` takes precedence over
/// `rank` when both are given.
///
/// # Errors
/// Returns a configuration error if the requested id or rank is not present,
/// or if neither selector was supplied.
fn select_finding<'a>(
    findings: &'a FindingSet,
    finding_id: Option<&str>,
    rank: Option<usize>,
) -> Result<&'a Finding> {
    let mut candidates = findings.findings.iter();
    match (finding_id, rank) {
        (Some(wanted_id), _) => candidates
            .find(|candidate| candidate.finding_id == wanted_id)
            .ok_or_else(|| PgLogstatsError::Configuration {
                message: format!("Finding id not found: {}", wanted_id),
                field: Some("finding_id".to_string()),
            }),
        (None, Some(wanted_rank)) => candidates
            .find(|candidate| candidate.rank == wanted_rank)
            .ok_or_else(|| PgLogstatsError::Configuration {
                message: format!("Finding rank not found: {}", wanted_rank),
                field: Some("rank".to_string()),
            }),
        (None, None) => Err(PgLogstatsError::Configuration {
            message: "Specify either --finding-id or --rank".to_string(),
            field: Some("finding_selector".to_string()),
        }),
    }
}
/// Renders a finding's suggested SQL and writes it via `write_or_print_output`.
///
/// The output depends on the format:
/// - JSON: a pretty-printed object with the id, rank, kind, title and
///   statements.
/// - Text: a `#rank [id] title` header line followed by one statement per line.
fn output_suggested_sql(args: &Arguments, finding: &Finding) -> Result<()> {
    let rendered = match args.output_format {
        OutputFormat::Json => serde_json::to_string_pretty(&json!({
            "finding_id": finding.finding_id,
            "rank": finding.rank,
            "kind": finding.kind,
            "title": finding.title,
            "next_sql": finding.next_sql,
        }))
        .map_err(PgLogstatsError::Serialization)?,
        OutputFormat::Text => {
            let mut text = format!(
                "#{} [{}] {}\n",
                finding.rank, finding.finding_id, finding.title
            );
            for sql in &finding.next_sql {
                text.push_str(sql);
                text.push('\n');
            }
            text
        }
    };
    write_or_print_output(rendered, args)
}
/// Emits `findings`, reporting `entries.len()` as the number of parsed log
/// entries.
fn output_findings(
    findings: &pg_logstats::FindingSet,
    args: &Arguments,
    entries: &[pg_logstats::LogEntry],
) -> Result<()> {
    let entry_count = entries.len();
    output_findings_with_entry_count(findings, args, entry_count)
}
/// Formats `findings` in the requested output format and writes the result.
///
/// JSON output is pretty-printed and carries metadata: the crate version, an
/// empty input list, and `total_log_entries`.
fn output_findings_with_entry_count(
    findings: &pg_logstats::FindingSet,
    args: &Arguments,
    total_log_entries: usize,
) -> Result<()> {
    let rendered = match args.output_format {
        OutputFormat::Json => {
            let json_formatter = JsonFormatter::new().with_pretty(true).with_metadata(
                env!("CARGO_PKG_VERSION"),
                vec![],
                total_log_entries,
            );
            json_formatter.format_findings(findings)?
        }
        OutputFormat::Text => {
            let text_formatter = TextFormatter::new();
            text_formatter.format_findings(findings)?
        }
    };
    write_or_print_output(rendered, args)
}
fn write_or_print_output(output: String, args: &Arguments) -> Result<()> {
if let Some(outfile) = &args.outfile {
if outfile == "-" {
println!("{}", output);
} else {
let output_path = if let Some(outdir) = &args.outdir {
Path::new(outdir).join(outfile)
} else {
PathBuf::from(outfile)
};
fs::write(&output_path, output)?;
info!("Results written to {}", output_path.display());
}
} else {
println!("{}", output);
}
Ok(())
}
/// Builds the file-processing progress bar.
///
/// The bar starts with a placeholder length of 100. Callers that know the real
/// total should adjust it with `set_length`.
fn create_progress_bar() -> ProgressBar {
    let pb = ProgressBar::new(100);
    pb.set_style(
        ProgressStyle::default_bar()
            .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} {msg}")
            // The template is a compile-time constant; failure is a programming error.
            .expect("progress bar template is a valid static string")
            .progress_chars("#>-"),
    );
    pb
}