use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use clap::Parser;
use std::collections::HashMap;
use std::fs::File;
use std::io::{self, BufRead, BufReader, Write};
use std::path::PathBuf;
mod event;
mod formatters;
mod parsers;
use formatters::{DefaultFormatter, Formatter, JsonlFormatter};
use parsers::{JsonlParser, LogParser, LogfmtParser, SyslogParser};
#[derive(Parser)]
#[command(name = "kelora")]
#[command(about = "A fast, extensible log parser")]
#[command(version = "0.1.0")]
#[command(author = "Dirk Loss <mail@dirk-loss.de>")]
pub struct Cli {
pub files: Vec<PathBuf>,
#[arg(short = 'f', long = "format", value_enum, default_value = "logfmt")]
pub input_format: InputFormat,
#[arg(
short = 'F',
long = "output-format",
value_enum,
default_value = "default"
)]
pub output_format: OutputFormat,
#[arg(short = 'k', long = "keys", value_delimiter = ',')]
pub keys: Vec<String>,
#[arg(short = 'l', long = "level", value_delimiter = ',')]
pub levels: Vec<String>,
#[arg(short = 'S', long = "stats-only")]
pub stats_only: bool,
#[arg(short = 's', long = "stats")]
pub stats: bool,
#[arg(long)]
pub debug: bool,
#[arg(short = 'c', long = "common")]
pub common: bool,
}
#[derive(clap::ValueEnum, Clone, Debug)]
pub enum InputFormat {
Logfmt,
Jsonl,
Syslog,
}
#[derive(clap::ValueEnum, Clone, Debug)]
pub enum OutputFormat {
Default,
Jsonl,
}
#[derive(Debug, Default)]
pub struct Stats {
pub lines_seen: usize,
pub events_shown: usize,
pub parse_errors: usize,
pub filtered_out: usize,
pub start_time: Option<DateTime<Utc>>,
pub end_time: Option<DateTime<Utc>>,
pub levels_seen: HashMap<String, usize>,
}
impl Stats {
pub fn new() -> Self {
Self::default()
}
pub fn record_event(&mut self, event: &event::Event) {
self.events_shown += 1;
if let Some(timestamp) = &event.timestamp {
if self.start_time.is_none() || Some(*timestamp) < self.start_time {
self.start_time = Some(*timestamp);
}
if self.end_time.is_none() || Some(*timestamp) > self.end_time {
self.end_time = Some(*timestamp);
}
}
if let Some(level) = &event.level {
*self.levels_seen.entry(level.clone()).or_insert(0) += 1;
}
}
pub fn print_stats(&self) {
eprintln!(
"Events shown: {} (parse errors: {}, lines seen: {}, filtered: {})",
self.events_shown, self.parse_errors, self.lines_seen, self.filtered_out
);
if let (Some(start), Some(end)) = (&self.start_time, &self.end_time) {
let duration = end.signed_duration_since(*start);
eprintln!(
"Time span: {} to {} (duration: {})",
start.format("%Y-%m-%dT%H:%M:%S%.3fZ"),
end.format("%Y-%m-%dT%H:%M:%S%.3fZ"),
format_duration(duration)
);
}
if !self.levels_seen.is_empty() {
let mut levels: Vec<_> = self.levels_seen.iter().collect();
levels.sort_by_key(|(level, _)| level.as_str());
eprintln!(
"Log levels: {}",
levels
.iter()
.map(|(level, count)| format!("{}({})", level, count))
.collect::<Vec<_>>()
.join(", ")
);
}
}
}
fn format_duration(duration: chrono::Duration) -> String {
let seconds = duration.num_seconds();
let hours = seconds / 3600;
let minutes = (seconds % 3600) / 60;
let secs = seconds % 60;
if hours > 0 {
format!("{}h{}m{}s", hours, minutes, secs)
} else if minutes > 0 {
format!("{}m{}s", minutes, secs)
} else {
format!("{}s", secs)
}
}
fn main() -> Result<()> {
let cli = Cli::parse();
let parser = create_parser(&cli.input_format);
let formatter = create_formatter(&cli.output_format);
let readers: Vec<Box<dyn BufRead>> = if cli.files.is_empty() {
vec![Box::new(io::stdin().lock())]
} else {
cli.files
.iter()
.map(|path| open_input_file(path))
.collect::<Result<Vec<_>>>()?
};
let mut stats = Stats::new();
let levels_filter = prepare_levels_filter(&cli.levels);
let keys_filter = prepare_keys_filter(&cli);
for reader in readers {
process_reader(
reader,
&*parser,
&*formatter,
&mut stats,
&levels_filter,
&keys_filter,
&cli,
)?;
}
if cli.stats_only || cli.stats {
stats.print_stats();
}
Ok(())
}
fn create_parser(format: &InputFormat) -> Box<dyn LogParser> {
match format {
InputFormat::Logfmt => Box::new(LogfmtParser::new()),
InputFormat::Jsonl => Box::new(JsonlParser::new()),
InputFormat::Syslog => Box::new(SyslogParser::new()),
}
}
fn create_formatter(format: &OutputFormat) -> Box<dyn Formatter> {
match format {
OutputFormat::Default => Box::new(DefaultFormatter::new()),
OutputFormat::Jsonl => Box::new(JsonlFormatter::new()),
}
}
fn open_input_file(path: &PathBuf) -> Result<Box<dyn BufRead>> {
let file =
File::open(path).with_context(|| format!("Failed to open file: {}", path.display()))?;
Ok(Box::new(BufReader::new(file)))
}
fn prepare_levels_filter(levels: &[String]) -> Option<Vec<String>> {
if levels.is_empty() {
None
} else {
Some(levels.iter().map(|level| level.to_uppercase()).collect())
}
}
fn prepare_keys_filter(cli: &Cli) -> Option<Vec<String>> {
if cli.common {
Some(vec![
"timestamp".to_string(),
"level".to_string(),
"message".to_string(),
])
} else if !cli.keys.is_empty() {
Some(cli.keys.clone())
} else {
None
}
}
fn process_reader(
reader: Box<dyn BufRead>,
parser: &dyn LogParser,
formatter: &dyn Formatter,
stats: &mut Stats,
levels_filter: &Option<Vec<String>>,
keys_filter: &Option<Vec<String>>,
cli: &Cli,
) -> Result<()> {
for (line_num, line_result) in reader.lines().enumerate() {
let line = line_result.with_context(|| format!("Failed to read line {}", line_num + 1))?;
stats.lines_seen += 1;
if line.trim().is_empty() {
continue;
}
match parser.parse(&line) {
Ok(mut event) => {
if let Some(ref levels) = levels_filter {
if let Some(ref level) = event.level {
if !levels.contains(&level.to_uppercase()) {
stats.filtered_out += 1;
continue;
}
} else {
stats.filtered_out += 1;
continue;
}
}
if let Some(ref keys) = keys_filter {
event.filter_keys(keys);
if !event.has_displayable_content() {
stats.filtered_out += 1;
continue;
}
}
stats.record_event(&event);
if !cli.stats_only {
if let Err(e) = writeln!(io::stdout(), "{}", formatter.format(&event)) {
if e.kind() == std::io::ErrorKind::BrokenPipe {
break;
} else {
return Err(anyhow::Error::from(e));
}
}
}
}
Err(e) => {
stats.parse_errors += 1;
if cli.debug {
eprintln!("Parse error on line {}: {}", line_num + 1, e);
}
}
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_format_duration() {
let duration = chrono::Duration::seconds(3661); assert_eq!(format_duration(duration), "1h1m1s");
let duration = chrono::Duration::seconds(61); assert_eq!(format_duration(duration), "1m1s");
let duration = chrono::Duration::seconds(30); assert_eq!(format_duration(duration), "30s");
}
#[test]
fn test_prepare_levels_filter() {
let levels = vec!["error".to_string(), "warn".to_string()];
let result = prepare_levels_filter(&levels);
assert_eq!(result, Some(vec!["ERROR".to_string(), "WARN".to_string()]));
let empty_levels: Vec<String> = vec![];
let result = prepare_levels_filter(&empty_levels);
assert_eq!(result, None);
}
}