use anyhow::Result;
use clap::{ArgMatches, CommandFactory, FromArgMatches};
use rhai::Dynamic;
use std::collections::HashMap;
use std::sync::atomic::Ordering;
mod cli;
mod colors;
mod config;
mod config_file;
mod decompression;
mod engine;
mod event;
mod formatters;
mod parallel;
mod parsers;
mod pipeline;
mod platform;
mod readers;
mod rhai_functions;
mod stats;
mod timestamp;
mod tty;
use config::KeloraConfig;
use config_file::ConfigFile;
use platform::{
ExitCode, ProcessCleanup, SafeFileOut, SafeStderr, SafeStdout, SignalHandler, SHOULD_TERMINATE,
TERMINATED_BY_SIGNAL,
};
use cli::{Cli, FileOrder, InputFormat, OutputFormat};
use config::{MultilineConfig, TimestampFilterConfig};
/// Guesses the input format from the first line of `reader` without consuming it.
///
/// Trailing `\r` / `\n` characters are trimmed before the line is handed to
/// `parsers::detect_format`. Input with no first line at all is treated as `InputFormat::Line`.
fn detect_format_from_peekable_reader<R: std::io::BufRead>(
    reader: &mut readers::PeekableLineReader<R>,
) -> Result<config::InputFormat> {
    if let Some(first_line) = reader.peek_first_line()? {
        let without_terminator = first_line.trim_end_matches(&['\r', '\n'][..]);
        let format = parsers::detect_format(without_terminator)?;
        return Ok(format);
    }
    // Nothing to sniff: fall back to plain line input.
    Ok(config::InputFormat::Line)
}
/// Sniffs the input format for parallel mode from the first available line.
///
/// With no files, the first line of stdin is peeked. Otherwise the files are ordered with
/// `FileOrder::Cli` and the first one is opened through `DecompressionReader`. An empty ordered
/// file list yields `InputFormat::Line`. So does a detection result of `InputFormat::Auto`.
fn detect_format_for_parallel_mode(files: &[String]) -> Result<config::InputFormat> {
    let sniffed = if files.is_empty() {
        let stdin = std::io::stdin();
        let mut peekable = readers::PeekableLineReader::new(stdin.lock());
        detect_format_from_peekable_reader(&mut peekable)?
    } else {
        let ordered = pipeline::builders::sort_files(files, &config::FileOrder::Cli)?;
        match ordered.first() {
            None => return Ok(config::InputFormat::Line),
            Some(first_file) => {
                let source = decompression::DecompressionReader::new(first_file)?;
                let mut peekable = readers::PeekableLineReader::new(source);
                detect_format_from_peekable_reader(&mut peekable)?
            }
        }
    };
    // Workers need a concrete format; an undecided result degrades to plain lines.
    Ok(match sniffed {
        config::InputFormat::Auto => config::InputFormat::Line,
        concrete => concrete,
    })
}
use parallel::{ParallelConfig, ParallelProcessor};
use pipeline::{
create_input_reader, create_pipeline_builder_from_config, create_pipeline_from_config,
};
use platform::check_termination;
use stats::{
get_thread_stats, stats_add_error, stats_add_line_filtered, stats_add_line_output,
stats_add_line_read, stats_finish_processing, stats_start_timer, ProcessingStats,
};
use std::io::{self, BufRead, Write};
/// What a complete pipeline run hands back to `main` for reporting and exit-code decisions.
#[derive(Debug)]
struct PipelineResult {
    /// Processing statistics; both the sequential and the parallel runner in this file fill
    /// this with `Some`.
    pub stats: Option<ProcessingStats>,
    /// Tracked key/value state gathered during the run. `main` uses it for `--metrics` output,
    /// the metrics file, the error summary and deciding whether errors occurred.
    pub tracking_data: HashMap<String, Dynamic>,
}
/// Runs the configured pipeline, writing events to `output`, on either the parallel or the
/// sequential engine.
///
/// The stats timer is started first when `--stats` is on. The parallel engine builds its own
/// `PipelineResult`. For the sequential engine this function then:
/// - reads the thread's tracking state,
/// - finishes stats processing,
/// - merges discovered values from the tracking data into the thread's stats.
fn run_pipeline_with_kelora_config<W: Write + Send + 'static>(
    config: &KeloraConfig,
    mut output: W,
) -> Result<PipelineResult> {
    if config.output.stats {
        stats_start_timer();
    }
    if config.should_use_parallel() {
        return run_pipeline_parallel(config, output);
    }

    run_pipeline_sequential(config, &mut output)?;

    let tracking_data = rhai_functions::tracking::get_thread_tracking_state();
    stats_finish_processing();
    let mut stats = get_thread_stats();
    stats.extract_discovered_from_tracking(&tracking_data);
    Ok(PipelineResult {
        stats: Some(stats),
        tracking_data,
    })
}
/// Runs the pipeline across worker threads using `ParallelProcessor`.
///
/// Steps:
/// 1. `--format auto` is resolved once, up front, by sniffing the first input line. Every worker
///    therefore builds its pipeline for the same concrete format. Unless quiet, the detected
///    format is reported on stderr.
/// 2. The begin stage runs on this thread before processing.
/// 3. After processing, tracked values collected from the workers (minus Kelora's own bookkeeping
///    keys) are copied into the context's tracker.
/// 4. The end stage runs against that context.
///
/// # Errors
/// Fails if format detection, pipeline construction, the begin or end stage, input setup, or
/// parallel processing fails.
fn run_pipeline_parallel<W: Write + Send + 'static>(
    config: &KeloraConfig,
    output: W,
) -> Result<PipelineResult> {
    // Tracker keys with these prefixes are kept out of the end stage's `tracker`. They presumably
    // carry Kelora's internal stats/error bookkeeping rather than user-tracked values.
    const INTERNAL_TRACKING_PREFIXES: [&str; 5] = [
        "__internal_",
        "__kelora_stats_",
        "__op___kelora_stats_",
        "__kelora_error_",
        "__op___kelora_error_",
    ];

    let final_config = if matches!(config.input.format, config::InputFormat::Auto) {
        let detected_format = detect_format_for_parallel_mode(&config.input.files)?;
        if config.processing.quiet_level == 0 {
            let format_name = format!("{:?}", detected_format).to_lowercase();
            let message =
                config.format_error_message(&format!("auto-detected format: {}", format_name));
            eprintln!("{}", message);
        }
        let mut new_config = config.clone();
        new_config.input.format = detected_format;
        new_config
    } else {
        config.clone()
    };
    let config = &final_config;

    let parallel_config = ParallelConfig {
        num_workers: config.effective_threads(),
        batch_size: config.effective_batch_size(),
        batch_timeout_ms: config.performance.batch_timeout,
        preserve_order: !config.performance.no_preserve_order,
        buffer_size: Some(10000),
    };
    let processor =
        ParallelProcessor::new(parallel_config).with_take_limit(config.processing.take_limit);

    // This thread's pipeline is only used for its begin/end stages and context; the workers
    // build their own from `pipeline_builder`.
    let pipeline_builder = create_pipeline_builder_from_config(config);
    let (_pipeline, begin_stage, end_stage, mut ctx) = pipeline_builder
        .clone()
        .build(config.processing.stages.clone())?;
    if let Err(e) = begin_stage.execute(&mut ctx) {
        return Err(anyhow::anyhow!("Begin stage error: {}", e));
    }

    let reader = create_input_reader(config)?;
    processor.process_with_pipeline(
        reader,
        pipeline_builder,
        config.processing.stages.clone(),
        config,
        output,
    )?;

    let parallel_tracked = processor.get_final_tracked_state();
    // Best-effort: a failure to extract stats from the tracked state is deliberately ignored.
    processor
        .extract_final_stats_from_tracking(&parallel_tracked)
        .unwrap_or(());

    for (key, dynamic_value) in &parallel_tracked {
        let is_internal = INTERNAL_TRACKING_PREFIXES
            .iter()
            .any(|prefix| key.starts_with(*prefix));
        if !is_internal {
            ctx.tracker.insert(key.clone(), dynamic_value.clone());
        }
    }

    if let Err(e) = end_stage.execute(&ctx) {
        return Err(anyhow::anyhow!("End stage error: {}", e));
    }
    Ok(PipelineResult {
        stats: Some(processor.get_final_stats()),
        tracking_data: parallel_tracked,
    })
}
/// Runs the pipeline on the calling thread when the input format is already known.
///
/// `--format auto` is delegated to `run_pipeline_sequential_with_auto_detection`. Otherwise:
/// 1. The begin stage runs.
/// 2. Input is read from stdin (no files) or from the files sorted by `config.input.file_order`.
/// 3. Each line goes through `process_line_sequential`.
/// 4. Remaining pipeline output is flushed to `output`.
/// 5. The end stage runs.
/// 6. Thread-local tracking is merged into the context.
///
/// Reading stops early when the take limit is exhausted. The process exits immediately with
/// the script's code when a script requests exit.
///
/// NOTE(review): on a termination request this returns `Ok(())` at once, skipping the flush,
/// the end stage and the tracking merge. The auto-detection path instead continues to its end
/// stage after its read helper returns. Confirm which behaviour is intended.
/// NOTE(review): the stdin branch uses `BufRead::lines()`, which strips line terminators. The
/// file branch uses `MultiFileReader::read_line`, which presumably keeps them like
/// `BufRead::read_line`. Confirm that downstream filtering and parsing treat both the same.
fn run_pipeline_sequential<W: Write>(config: &KeloraConfig, output: &mut W) -> Result<()> {
    if matches!(config.input.format, config::InputFormat::Auto) {
        return run_pipeline_sequential_with_auto_detection(config, output);
    }
    let (mut pipeline, begin_stage, end_stage, mut ctx) = create_pipeline_from_config(config)?;
    if let Err(e) = begin_stage.execute(&mut ctx) {
        return Err(anyhow::anyhow!("Begin stage error: {}", e));
    }
    // Per-run state threaded through every `process_line_sequential` call.
    let mut current_csv_headers: Option<Vec<String>> = None;
    let mut last_filename: Option<String> = None;
    let mut line_num = 0;
    let mut skipped_lines = 0;
    if config.input.files.is_empty() {
        let stdin = io::stdin();
        let reader = stdin.lock();
        for line_result in reader.lines() {
            // Termination requested: stop immediately (no flush / end stage; see NOTE above).
            if check_termination().is_err() {
                return Ok(());
            }
            match process_line_sequential(
                line_result,
                &mut line_num,
                &mut skipped_lines,
                &mut pipeline,
                &mut ctx,
                config,
                output,
                None,
                &mut current_csv_headers,
                &mut last_filename,
            )? {
                ProcessingResult::Continue => {}
                ProcessingResult::TakeLimitExhausted => break,
            }
            // A script asked to exit: terminate the process with its exit code right away.
            if rhai_functions::process::is_exit_requested() {
                let exit_code = rhai_functions::process::get_exit_code();
                std::process::exit(exit_code);
            }
        }
    } else {
        let sorted_files =
            pipeline::builders::sort_files(&config.input.files, &config.input.file_order)?;
        let mut multi_reader = readers::MultiFileReader::new(sorted_files)?;
        // Reused across iterations; cleared before every read.
        let mut line_buf = String::new();
        loop {
            if check_termination().is_err() {
                return Ok(());
            }
            line_buf.clear();
            let bytes_read = match multi_reader.read_line(&mut line_buf) {
                Ok(0) => break,
                Ok(n) => n,
                Err(e) => {
                    // `process_line_sequential` begins with `line_result?`, so the `?` on the
                    // call below returns this I/O error; the code after the call in this arm
                    // is not reached for an `Err`.
                    let line_result = Err(e);
                    let current_filename = multi_reader.current_filename().map(|s| s.to_string());
                    match process_line_sequential(
                        line_result,
                        &mut line_num,
                        &mut skipped_lines,
                        &mut pipeline,
                        &mut ctx,
                        config,
                        output,
                        current_filename,
                        &mut current_csv_headers,
                        &mut last_filename,
                    )? {
                        ProcessingResult::Continue => {}
                        ProcessingResult::TakeLimitExhausted => break,
                    }
                    if rhai_functions::process::is_exit_requested() {
                        let exit_code = rhai_functions::process::get_exit_code();
                        std::process::exit(exit_code);
                    }
                    continue;
                }
            };
            if bytes_read > 0 {
                // File name is taken after the read so it names the file the line came from.
                let current_filename = multi_reader.current_filename().map(|s| s.to_string());
                match process_line_sequential(
                    Ok(line_buf.clone()),
                    &mut line_num,
                    &mut skipped_lines,
                    &mut pipeline,
                    &mut ctx,
                    config,
                    output,
                    current_filename,
                    &mut current_csv_headers,
                    &mut last_filename,
                )? {
                    ProcessingResult::Continue => {}
                    ProcessingResult::TakeLimitExhausted => break,
                }
                if rhai_functions::process::is_exit_requested() {
                    let exit_code = rhai_functions::process::get_exit_code();
                    std::process::exit(exit_code);
                }
            }
        }
    }
    // Emit anything the pipeline still holds; empty results are not written.
    let results = pipeline.flush(&mut ctx)?;
    for result in results {
        if !result.is_empty() {
            writeln!(output, "{}", result)?;
        }
    }
    if let Err(e) = end_stage.execute(&ctx) {
        return Err(anyhow::anyhow!("End stage error: {}", e));
    }
    rhai_functions::tracking::merge_thread_tracking_to_context(&mut ctx);
    Ok(())
}
fn run_pipeline_sequential_with_auto_detection<W: Write>(
config: &KeloraConfig,
output: &mut W,
) -> Result<()> {
if config.input.files.is_empty() {
let stdin = io::stdin();
let stdin_lock = stdin.lock();
let mut peekable_reader = readers::PeekableLineReader::new(stdin_lock);
let detected_format = detect_format_from_peekable_reader(&mut peekable_reader)?;
if config.processing.quiet_level == 0 {
let format_name = format!("{:?}", detected_format).to_lowercase();
let message =
config.format_error_message(&format!("auto-detected format: {}", format_name));
eprintln!("{}", message);
}
let mut final_config = config.clone();
final_config.input.format = detected_format;
let (mut pipeline, begin_stage, end_stage, mut ctx) =
create_pipeline_from_config(&final_config)?;
if let Err(e) = begin_stage.execute(&mut ctx) {
return Err(anyhow::anyhow!("Begin stage error: {}", e));
}
run_sequential_with_reader(
&mut peekable_reader,
&mut pipeline,
&mut ctx,
&final_config,
output,
None, )?;
if let Err(e) = end_stage.execute(&ctx) {
return Err(anyhow::anyhow!("End stage error: {}", e));
}
rhai_functions::tracking::merge_thread_tracking_to_context(&mut ctx);
} else {
let sorted_files =
pipeline::builders::sort_files(&config.input.files, &config.input.file_order)?;
if sorted_files.is_empty() {
return Ok(());
}
let first_file = &sorted_files[0];
let detected_format = {
let decompressed = decompression::DecompressionReader::new(first_file)?;
let mut peekable_reader = readers::PeekableLineReader::new(decompressed);
detect_format_from_peekable_reader(&mut peekable_reader)?
};
if config.processing.quiet_level == 0 {
let format_name = format!("{:?}", detected_format).to_lowercase();
let message =
config.format_error_message(&format!("auto-detected format: {}", format_name));
eprintln!("{}", message);
}
let mut final_config = config.clone();
final_config.input.format = detected_format;
let (mut pipeline, begin_stage, end_stage, mut ctx) =
create_pipeline_from_config(&final_config)?;
if let Err(e) = begin_stage.execute(&mut ctx) {
return Err(anyhow::anyhow!("Begin stage error: {}", e));
}
let mut multi_reader = readers::MultiFileReader::new(sorted_files)?;
run_sequential_with_multi_reader(
&mut multi_reader,
&mut pipeline,
&mut ctx,
&final_config,
output,
)?;
if let Err(e) = end_stage.execute(&ctx) {
return Err(anyhow::anyhow!("End stage error: {}", e));
}
rhai_functions::tracking::merge_thread_tracking_to_context(&mut ctx);
}
Ok(())
}
/// Pushes every line of `reader` through `process_line_sequential`, then flushes the pipeline
/// into `output`.
///
/// Lines always come from `reader`. `multi_reader`, when present, is only asked for the current
/// file name attached to each line.
///
/// Early exits:
/// - A termination request returns `Ok(())` at once, without flushing.
/// - An exhausted take limit stops reading.
/// - A script exit request terminates the process with the script's exit code.
fn run_sequential_with_reader<W: Write, R: BufRead>(
    reader: &mut R,
    pipeline: &mut pipeline::Pipeline,
    ctx: &mut pipeline::PipelineContext,
    config: &KeloraConfig,
    output: &mut W,
    multi_reader: Option<&mut readers::MultiFileReader>,
) -> Result<()> {
    let mut csv_headers: Option<Vec<String>> = None;
    let mut previous_file: Option<String> = None;
    let mut lines_seen = 0;
    let mut lines_skipped = 0;
    let mut buffer = String::new();

    loop {
        if check_termination().is_err() {
            return Ok(());
        }
        buffer.clear();
        // Read errors are forwarded as-is; `process_line_sequential` decides their fate.
        let next_line = match reader.read_line(&mut buffer) {
            Ok(0) => break,
            Ok(_) => Ok(buffer.clone()),
            Err(err) => Err(err),
        };
        let file_name = multi_reader
            .as_ref()
            .and_then(|mr| mr.current_filename().map(|name| name.to_string()));
        let verdict = process_line_sequential(
            next_line,
            &mut lines_seen,
            &mut lines_skipped,
            pipeline,
            ctx,
            config,
            output,
            file_name,
            &mut csv_headers,
            &mut previous_file,
        )?;
        if let ProcessingResult::TakeLimitExhausted = verdict {
            break;
        }
        if rhai_functions::process::is_exit_requested() {
            std::process::exit(rhai_functions::process::get_exit_code());
        }
    }

    for flushed in pipeline.flush(ctx)? {
        if !flushed.is_empty() {
            writeln!(output, "{}", flushed)?;
        }
    }
    Ok(())
}
/// Pushes every line from `multi_reader` through `process_line_sequential`, then flushes the
/// pipeline into `output`.
///
/// Each line is tagged with the file name `multi_reader` reports after that line was read.
///
/// Early exits:
/// - A termination request returns `Ok(())` at once, without flushing.
/// - An exhausted take limit stops reading.
/// - A script exit request terminates the process with the script's exit code.
fn run_sequential_with_multi_reader<W: Write>(
    multi_reader: &mut readers::MultiFileReader,
    pipeline: &mut pipeline::Pipeline,
    ctx: &mut pipeline::PipelineContext,
    config: &KeloraConfig,
    output: &mut W,
) -> Result<()> {
    let mut csv_headers: Option<Vec<String>> = None;
    let mut previous_file: Option<String> = None;
    let mut lines_seen = 0;
    let mut lines_skipped = 0;
    let mut buffer = String::new();

    loop {
        if check_termination().is_err() {
            return Ok(());
        }
        buffer.clear();
        let next_line = match multi_reader.read_line(&mut buffer) {
            Ok(0) => break,
            Ok(_) => Ok(buffer.clone()),
            Err(err) => Err(err),
        };
        // Queried after the read so it names the file this line came from.
        let file_name = multi_reader.current_filename().map(|name| name.to_string());
        let verdict = process_line_sequential(
            next_line,
            &mut lines_seen,
            &mut lines_skipped,
            pipeline,
            ctx,
            config,
            output,
            file_name,
            &mut csv_headers,
            &mut previous_file,
        )?;
        if let ProcessingResult::TakeLimitExhausted = verdict {
            break;
        }
        if rhai_functions::process::is_exit_requested() {
            std::process::exit(rhai_functions::process::get_exit_code());
        }
    }

    for flushed in pipeline.flush(ctx)? {
        if !flushed.is_empty() {
            writeln!(output, "{}", flushed)?;
        }
    }
    Ok(())
}
/// Per-line verdict from `process_line_sequential`, telling the read loop whether to go on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProcessingResult {
    /// Keep reading input.
    Continue,
    /// The pipeline reports its take limit is exhausted; stop reading.
    TakeLimitExhausted,
}
#[allow(clippy::too_many_arguments)]
fn process_line_sequential<W: Write>(
line_result: io::Result<String>,
line_num: &mut usize,
skipped_lines: &mut usize,
pipeline: &mut pipeline::Pipeline,
ctx: &mut pipeline::PipelineContext,
config: &KeloraConfig,
output: &mut W,
current_filename: Option<String>,
current_csv_headers: &mut Option<Vec<String>>,
last_filename: &mut Option<String>,
) -> Result<ProcessingResult> {
let line = line_result?;
*line_num += 1;
if config.output.stats {
stats_add_line_read();
}
if *skipped_lines < config.input.skip_lines {
*skipped_lines += 1;
if config.output.stats {
stats_add_line_filtered();
}
return Ok(ProcessingResult::Continue);
}
if let Some(ref ignore_regex) = config.input.ignore_lines {
if ignore_regex.is_match(&line) {
if config.output.stats {
stats_add_line_filtered();
}
return Ok(ProcessingResult::Continue);
}
}
if line.trim().is_empty() {
if !matches!(config.input.format, config::InputFormat::Line) {
return Ok(ProcessingResult::Continue);
}
}
if matches!(
config.input.format,
config::InputFormat::Csv
| config::InputFormat::Tsv
| config::InputFormat::Csvnh
| config::InputFormat::Tsvnh
) && (current_filename != *last_filename
|| (current_filename.is_none() && current_csv_headers.is_none()))
{
let mut temp_parser = match config.input.format {
config::InputFormat::Csv => parsers::CsvParser::new_csv(),
config::InputFormat::Tsv => parsers::CsvParser::new_tsv(),
config::InputFormat::Csvnh => parsers::CsvParser::new_csv_no_headers(),
config::InputFormat::Tsvnh => parsers::CsvParser::new_tsv_no_headers(),
_ => unreachable!(),
};
let was_consumed = temp_parser.initialize_headers_from_line(&line)?;
let headers = temp_parser.get_headers();
*current_csv_headers = Some(headers.clone());
*last_filename = current_filename.clone();
let mut pipeline_builder = create_pipeline_builder_from_config(config);
pipeline_builder = pipeline_builder.with_csv_headers(headers);
let (new_pipeline, _unused_begin_stage, _unused_end_stage, new_ctx) =
pipeline_builder.build(config.processing.stages.clone())?;
*pipeline = new_pipeline;
ctx.rhai = new_ctx.rhai;
if was_consumed {
return Ok(ProcessingResult::Continue);
}
}
ctx.meta.line_num = Some(*line_num);
ctx.meta.filename = current_filename;
match pipeline.process_line(line, ctx) {
Ok(results) => {
if config.output.stats && !results.is_empty() {
stats_add_line_output();
}
for result in results {
if !result.is_empty() {
writeln!(output, "{}", result)?;
}
}
if pipeline.is_take_limit_exhausted() {
return Ok(ProcessingResult::TakeLimitExhausted);
}
}
Err(e) => {
if config.output.stats {
stats_add_error();
}
if config.processing.strict {
return Err(e);
}
}
}
Ok(ProcessingResult::Continue)
}
fn main() -> Result<()> {
let _signal_handler = match SignalHandler::new() {
Ok(handler) => handler,
Err(e) => {
eprintln!("Failed to initialize signal handling: {}", e);
ExitCode::GeneralError.exit();
}
};
let _cleanup = ProcessCleanup::new();
let mut stderr = SafeStderr::new();
let (matches, cli) = process_args_with_config(&mut stderr);
if let Err(e) = validate_cli_args(&cli) {
stderr
.writeln(&format!("kelora: Error: {}", e))
.unwrap_or(());
ExitCode::InvalidUsage.exit();
}
let ordered_stages = match cli.get_ordered_script_stages(&matches) {
Ok(stages) => stages,
Err(e) => {
stderr
.writeln(&format!("kelora: Error: {}", e))
.unwrap_or(());
ExitCode::InvalidUsage.exit();
}
};
let mut config = KeloraConfig::from_cli(&cli);
config.processing.stages = ordered_stages;
if cli.since.is_some() || cli.until.is_some() {
let cli_timezone = config.input.default_timezone.as_deref();
let since = if let Some(ref since_str) = cli.since {
match crate::timestamp::parse_timestamp_arg_with_timezone(since_str, cli_timezone) {
Ok(dt) => Some(dt),
Err(e) => {
stderr
.writeln(&config.format_error_message(&format!(
"Invalid --since timestamp '{}': {}",
since_str, e
)))
.unwrap_or(());
ExitCode::InvalidUsage.exit();
}
}
} else {
None
};
let until = if let Some(ref until_str) = cli.until {
match crate::timestamp::parse_timestamp_arg_with_timezone(until_str, cli_timezone) {
Ok(dt) => Some(dt),
Err(e) => {
stderr
.writeln(&config.format_error_message(&format!(
"Invalid --until timestamp '{}': {}",
until_str, e
)))
.unwrap_or(());
ExitCode::InvalidUsage.exit();
}
}
} else {
None
};
config.processing.timestamp_filter = Some(TimestampFilterConfig { since, until });
}
if let Some(ignore_pattern) = &cli.ignore_lines {
match regex::Regex::new(ignore_pattern) {
Ok(regex) => {
config.input.ignore_lines = Some(regex);
}
Err(e) => {
stderr
.writeln(&config.format_error_message(&format!(
"Invalid ignore-lines regex pattern '{}': {}",
ignore_pattern, e
)))
.unwrap_or(());
ExitCode::InvalidUsage.exit();
}
}
}
if let Some(multiline_str) = &cli.multiline {
match MultilineConfig::parse(multiline_str) {
Ok(multiline_config) => {
config.input.multiline = Some(multiline_config);
}
Err(e) => {
stderr
.writeln(&config.format_error_message(&format!(
"Invalid multiline configuration '{}': {}",
multiline_str, e
)))
.unwrap_or(());
ExitCode::InvalidUsage.exit();
}
}
} else {
config.input.multiline = config.input.format.default_multiline();
}
if let Err(e) = validate_cli_args(&cli) {
stderr
.writeln(&config.format_error_message(&format!("Error: {}", e)))
.unwrap_or(());
ExitCode::InvalidUsage.exit();
}
let result = if let Some(ref output_file_path) = cli.output_file {
let file_output = match SafeFileOut::new(output_file_path) {
Ok(file) => file,
Err(e) => {
stderr
.writeln(&config.format_error_message(&e.to_string()))
.unwrap_or(());
ExitCode::GeneralError.exit();
}
};
run_pipeline_with_kelora_config(&config, file_output)
} else {
let stdout_output = SafeStdout::new();
run_pipeline_with_kelora_config(&config, stdout_output)
};
let (final_stats, tracking_data) = match result {
Ok(pipeline_result) => {
if config.output.metrics && !SHOULD_TERMINATE.load(Ordering::Relaxed) {
let metrics_output = crate::rhai_functions::tracking::format_metrics_output(
&pipeline_result.tracking_data,
);
if !metrics_output.is_empty() && metrics_output != "No metrics tracked" {
stderr
.writeln(&config.format_metrics_message(&metrics_output))
.unwrap_or(());
}
}
if let Some(ref metrics_file) = config.output.metrics_file {
if let Ok(json_output) = crate::rhai_functions::tracking::format_metrics_json(
&pipeline_result.tracking_data,
) {
if let Err(e) = std::fs::write(metrics_file, json_output) {
stderr
.writeln(&config.format_error_message(&format!(
"Failed to write metrics file: {}",
e
)))
.unwrap_or(());
}
}
}
if !SHOULD_TERMINATE.load(Ordering::Relaxed) {
if let Some(ref s) = pipeline_result.stats {
if config.output.stats && config.processing.quiet_level == 0 {
stderr
.writeln(&config.format_stats_message(
&s.format_stats(config.input.multiline.is_some()),
))
.unwrap_or(());
} else if config.processing.quiet_level == 0 {
if let Some(error_summary) =
crate::rhai_functions::tracking::extract_error_summary_from_tracking(
&pipeline_result.tracking_data,
config.processing.verbose,
Some(&config),
)
{
stderr
.writeln(&config.format_error_message(&error_summary))
.unwrap_or(());
}
}
}
}
(pipeline_result.stats, Some(pipeline_result.tracking_data))
}
Err(e) => {
stderr
.writeln(&config.format_error_message(&format!("Pipeline error: {}", e)))
.unwrap_or(());
ExitCode::GeneralError.exit();
}
};
if TERMINATED_BY_SIGNAL.load(Ordering::Relaxed) {
if let Some(stats) = final_stats {
if config.output.stats && config.processing.quiet_level == 0 {
stderr
.writeln(&config.format_stats_message(
&stats.format_stats(config.input.multiline.is_some()),
))
.unwrap_or(());
} else if stats.has_errors() && config.processing.quiet_level == 0 {
stderr
.writeln(&config.format_error_message(&stats.format_error_summary()))
.unwrap_or(());
}
} else if config.output.stats && config.processing.quiet_level == 0 {
stderr
.writeln(&config.format_stats_message("Processing interrupted"))
.unwrap_or(());
}
ExitCode::SignalInt.exit();
}
let had_errors = if let Some(ref tracking) = tracking_data {
crate::rhai_functions::tracking::has_errors_in_tracking(tracking)
} else if let Some(ref stats) = final_stats {
stats.has_errors()
} else {
false
};
if had_errors {
ExitCode::GeneralError.exit();
} else {
ExitCode::Success.exit();
}
}
/// Rejects CLI argument combinations that can never work, before any processing starts.
///
/// Checks, in this order:
/// 1. `-` (stdin) appears at most once, and every other input file exists.
/// 2. Every exec file exists.
/// 3. An explicit batch size is non-zero.
/// 4. The thread count is at most 1000.
/// 5. `--core` is not combined with a CSV/TSV-family output format.
fn validate_cli_args(cli: &Cli) -> Result<()> {
    let mut seen_stdin = false;
    for path in &cli.files {
        if path == "-" {
            if seen_stdin {
                return Err(anyhow::anyhow!("stdin (\"-\") can only be specified once"));
            }
            seen_stdin = true;
        } else if !std::path::Path::new(path).exists() {
            return Err(anyhow::anyhow!("File not found: {}", path));
        }
    }

    if let Some(missing) = cli
        .exec_files
        .iter()
        .find(|exec_file| !std::path::Path::new(exec_file).exists())
    {
        return Err(anyhow::anyhow!("Exec file not found: {}", missing));
    }

    if cli.batch_size == Some(0) {
        return Err(anyhow::anyhow!("Batch size must be greater than 0"));
    }

    if cli.threads > 1000 {
        return Err(anyhow::anyhow!("Thread count too high (max 1000)"));
    }

    if cli.core {
        // Delimited outputs need explicit column names, which --core cannot provide.
        let delimited_name = match cli.output_format {
            OutputFormat::Csv => Some("csv"),
            OutputFormat::Tsv => Some("tsv"),
            OutputFormat::Csvnh => Some("csvnh"),
            OutputFormat::Tsvnh => Some("tsvnh"),
            _ => None,
        };
        if let Some(name) = delimited_name {
            return Err(anyhow::anyhow!(
                "{} output format does not support --core flag. Use --keys to specify field names",
                name
            ));
        }
    }

    Ok(())
}
/// Validates an already-built `KeloraConfig`: input files, batch size and thread count.
///
/// Treats `-` as stdin (allowed at most once) rather than as a missing file, matching the
/// file checks in `validate_cli_args`.
// dead_code allowed: there is no caller of this function in this file at time of review.
#[allow(dead_code)]
fn validate_config(config: &KeloraConfig) -> Result<()> {
    let mut seen_stdin = false;
    for file_path in &config.input.files {
        if file_path == "-" {
            if seen_stdin {
                return Err(anyhow::anyhow!("stdin (\"-\") can only be specified once"));
            }
            seen_stdin = true;
        } else if !std::path::Path::new(file_path).exists() {
            return Err(anyhow::anyhow!("File not found: {}", file_path));
        }
    }
    if let Some(batch_size) = config.performance.batch_size {
        if batch_size == 0 {
            return Err(anyhow::anyhow!("Batch size must be greater than 0"));
        }
    }
    if config.performance.threads > 1000 {
        return Err(anyhow::anyhow!("Thread count too high (max 1000)"));
    }
    Ok(())
}
/// Pre-scans the raw process arguments for the config file path, before clap parses anything.
/// This lets the config file be loaded and merged into the argument list first.
///
/// Accepts both `--config-file PATH` and `--config-file=PATH` (clap accepts both forms), and
/// the first occurrence wins. A trailing `--config-file` with no value is ignored.
fn extract_config_file_arg(args: &[String]) -> Option<String> {
    const FLAG: &str = "--config-file";
    args.iter().enumerate().find_map(|(index, arg)| {
        if arg == FLAG {
            args.get(index + 1).cloned()
        } else {
            arg.strip_prefix(FLAG)
                .and_then(|rest| rest.strip_prefix('='))
                .map(str::to_string)
        }
    })
}
fn process_args_with_config(stderr: &mut SafeStderr) -> (ArgMatches, Cli) {
let raw_args: Vec<String> = std::env::args().collect();
if raw_args.iter().any(|arg| arg == "--show-config") {
ConfigFile::show_config();
std::process::exit(0);
}
if raw_args.iter().any(|arg| arg == "--help-time") {
print_time_format_help();
std::process::exit(0);
}
if raw_args.iter().any(|arg| arg == "--help-functions") {
print_functions_help();
std::process::exit(0);
}
if raw_args.iter().any(|arg| arg == "--help-rhai") {
print_rhai_help();
std::process::exit(0);
}
if raw_args.iter().any(|arg| arg == "--help-multiline") {
print_multiline_help();
std::process::exit(0);
}
let ignore_config = raw_args.iter().any(|arg| arg == "--ignore-config");
let config_file_path = extract_config_file_arg(&raw_args);
let processed_args = if ignore_config {
raw_args
} else {
match ConfigFile::load_with_custom_path(config_file_path.as_deref()) {
Ok(config_file) => match config_file.process_args(raw_args) {
Ok(processed) => processed,
Err(e) => {
stderr
.writeln(&format!("kelora: Config error: {}", e))
.unwrap_or(());
std::process::exit(1);
}
},
Err(e) => {
stderr
.writeln(&format!("kelora: Config file error: {}", e))
.unwrap_or(());
std::process::exit(1);
}
}
};
let matches = Cli::command().get_matches_from(processed_args);
let mut cli = Cli::from_arg_matches(&matches).unwrap_or_else(|e| {
stderr
.writeln(&format!("kelora: Error: {}", e))
.unwrap_or(());
std::process::exit(1);
});
cli.resolve_boolean_flags();
if crate::tty::is_stdin_tty() && cli.files.is_empty() {
println!("{}", Cli::command().render_usage());
println!("A command-line log analysis tool with embedded Rhai scripting");
println!("Try 'kelora --help' for more information.");
std::process::exit(0);
}
(matches, cli)
}
/// Prints the `--ts-format` reference shown for `--help-time`.
///
/// The specifiers follow chrono's strftime syntax, which the text links at the end. `%f` is
/// chrono's nanosecond field. The auto-precision fractional specifier is `%.f`; there is no
/// `%.` specifier.
fn print_time_format_help() {
    let help_text = r#"
Time Format Reference for --ts-format:
Basic Date/Time Components:
%Y - Year with century (e.g., 2024)
%y - Year without century (00-99)
%m - Month as zero-padded decimal (01-12)
%b - Month as abbreviated name (Jan, Feb, ..., Dec)
%B - Month as full name (January, February, ..., December)
%d - Day of month as zero-padded decimal (01-31)
%j - Day of year as zero-padded decimal (001-366)
%H - Hour (24-hour) as zero-padded decimal (00-23)
%I - Hour (12-hour) as zero-padded decimal (01-12)
%p - AM/PM indicator
%M - Minute as zero-padded decimal (00-59)
%S - Second as zero-padded decimal (00-59)
Subsecond Precision:
%f - Nanoseconds since the last whole second (000000000-999999999)
%3f - Milliseconds (000-999)
%6f - Microseconds (000000-999999)
%9f - Nanoseconds (000000000-999999999)
%.f - Fractional seconds with leading dot, automatic precision (e.g. .123456)
Time Zone:
%z - UTC offset (+HHMM or -HHMM)
%Z - Time zone name (if available)
%:z - UTC offset with colon (+HH:MM or -HH:MM)
Weekday:
%w - Weekday as decimal (0=Sunday, 6=Saturday)
%a - Weekday as abbreviated name (Sun, Mon, ..., Sat)
%A - Weekday as full name (Sunday, Monday, ..., Saturday)
Week Numbers:
%W - Week number (Monday as first day of week)
%U - Week number (Sunday as first day of week)
Common Examples:
%Y-%m-%d %H:%M:%S # 2024-01-15 14:30:45
%Y-%m-%dT%H:%M:%S%z # 2024-01-15T14:30:45+0000
%Y-%m-%d %H:%M:%S%.f # 2024-01-15 14:30:45.123456
%b %d %H:%M:%S # Jan 15 14:30:45 (syslog format)
%d/%b/%Y:%H:%M:%S %z # 15/Jan/2024:14:30:45 +0000 (Apache format)
%Y-%m-%d %H:%M:%S,%3f # 2024-01-15 14:30:45,123 (Python logging)
For complete format reference, see:
https://docs.rs/chrono/latest/chrono/format/strftime/index.html
"#;
    println!("{}", help_text);
}
/// Prints the generated reference for Kelora's Rhai helper functions (`--help-functions`).
fn print_functions_help() {
    println!("{}", rhai_functions::docs::generate_help_text());
}
/// Prints the Rhai scripting guide shown for `--help-rhai`.
///
/// The COMMON PATTERNS examples are Rhai code, so their explanatory comments use Rhai's `//`
/// syntax; `#` does not start a comment in Rhai.
fn print_rhai_help() {
    let help_text = r#"
Rhai Scripting Guide for Kelora:
For complete Rhai language documentation, visit: https://rhai.rs
BASIC CONCEPTS:
e Current event (renamed from 'event')
e.field Access field directly
e.nested.field Access nested fields
e.scores[1] Array access (supports negative indexing)
e.headers["user-agent"] Field access with special characters
VARIABLE DECLARATION:
let myfield = e.col("1,2") Always use 'let' for new variables
let result = e.user.name.lower() Chain operations and store result
FIELD EXISTENCE AND SAFETY:
"field" in e Check if field exists
e.has_path("user.role") Check nested field existence
e.scores.len() > 0 Check if array has elements
type_of(e.field) != "()" Check if field has a value
FIELD AND EVENT REMOVAL:
e.field = () Remove individual field
e = () Remove entire event (filters out)
KELORA-SPECIFIC FUNCTIONS:
Use --help-functions to see all available functions for log processing:
regex operations, IP handling, text manipulation, JSON parsing,
key-value extraction, array processing, safe field access, and utilities.
METHOD CHAINING EXAMPLES:
e.message.extract_re("user=(\\w+)").upper()
e.client_ip.mask_ip(2)
e.url.extract_domain().lower()
e.timestamp.parse_ts().format("%H:%M")
FUNCTION VS METHOD SYNTAX:
extract_re(e.line, "\\d+") Function style (avoids conflicts)
e.line.extract_re("\\d+") Method style (better for chaining)
Both syntaxes work identically. Use method syntax for readability and chaining,
function syntax when method names conflict with field names.
COMMON PATTERNS:
// Safe field access with defaults
let user_role = e.get_path("user.role", "guest");
// Process arrays safely
if e.events.len() > 0 {
e.latest_event = e.events[-1];
e.event_types = unique(e.events.map(|event| event.type));
}
// Conditional event removal
if e.level != "ERROR" { e = (); }
// Field cleanup and transformation
e.password = (); e.ssn = (); // Remove sensitive fields
e.summary = e.method + " " + e.status;
ARRAY PROCESSING:
sorted(e.scores) Sort array numerically/lexicographically
reversed(e.items) Reverse array order
unique(e.tags) Remove duplicate elements
sorted_by(e.users, "age") Sort array of objects by field
e.tags.join(", ") Join array elements
JSON ARRAY HANDLING:
JSON arrays are automatically converted to native Rhai arrays with full
functionality (sorted, reversed, unique, etc.) while maintaining proper
JSON types in output formats.
SIDE EFFECTS IN QUIET MODE:
print("debug info") Levels -q/-qq: visible, -qqq: suppressed
eprint("error details") Levels -q/-qq: visible, -qqq: suppressed
# File operations preserved at all quiet levels
ERROR HANDLING:
Kelora uses resilient processing by default:
• Parse errors: Skip malformed lines, continue processing
• Filter errors: Evaluate to false, skip event
• Transform errors: Return original event unchanged
Use --strict for fail-fast behavior on any error.
For complete function reference: kelora --help-functions
For usage examples: kelora --help (see examples section)
For time format help: kelora --help-time
For multiline strategy help: kelora --help-multiline
"#;
    println!("{}", help_text);
}
/// Prints the `--multiline` strategy reference shown for `--help-multiline`.
fn print_multiline_help() {
    const MULTILINE_HELP: &str = r#"
Multiline Strategy Reference for --multiline:
Kelora supports various strategies for detecting multi-line event boundaries.
By default, multiline processing is disabled for all formats to avoid unexpected
buffering behavior in streaming scenarios.
AVAILABLE STRATEGIES:
timestamp[:pattern=REGEX]
Events start with timestamp pattern (anchored to line beginning)
Default pattern matches ISO dates and syslog timestamps
Examples:
-M timestamp # Use default timestamp patterns
-M timestamp:pattern=^\d{4} # Lines starting with 4 digits
-M timestamp:pattern=^\[.*\] # Lines starting with bracketed content
indent[:spaces=N|tabs|mixed]
Continuation lines are indented, new events start at column 1
Options:
spaces=N - Require exactly N spaces minimum
tabs - Only tabs count as indentation
mixed - Any whitespace counts (default)
Examples:
-M indent # Any whitespace indentation
-M indent:spaces=4 # Minimum 4 spaces
-M indent:tabs # Only tab indentation
start:REGEX
Events start when line matches pattern
Pattern is a regular expression
Examples:
-M start:^ERROR # Events start with "ERROR"
-M start:^\d{4}-\d{2}-\d{2} # Events start with date format
-M start:^[A-Z]+: # Events start with UPPERCASE:
end:REGEX
Events end when line matches pattern
Current event completes when pattern is found
Examples:
-M end:^$ # Events end at blank lines
-M end:END_OF_EVENT # Events end with specific marker
-M end:^---+$ # Events end with dashed separator
boundary:start=START_REGEX:end=END_REGEX
Events have both start and end boundaries
New events start at start pattern, current events end at end pattern
Note: End markers become part of the next event's start
Examples:
-M boundary:start=^BEGIN:end=^END # BEGIN...END blocks
-M boundary:start=^START:end=^STOP # START...STOP blocks
-M boundary:start=^<log:end=^</log> # XML-like boundaries
backslash[:char=C]
Lines ending with continuation character continue the event
Default continuation character is backslash (\)
Examples:
-M backslash # Lines ending with \ continue
-M backslash:char=, # Lines ending with comma continue
-M backslash:char=^ # Lines ending with caret continue
whole
Read entire input as a single event
Useful for processing complete files as single records
Examples:
-M whole # Entire input becomes one event
COMMON USE CASES:
Stack Traces (Java/Python):
-M timestamp # New events start with timestamps
-M indent # Continuation lines are indented
JSON Objects:
-M whole # Single large JSON file
-M timestamp # JSON logs with timestamps per entry
Log Entries with Continuation:
-M backslash # Lines ending with \ continue
-M indent # Indented lines continue previous
Docker/Container Logs:
-M timestamp --extract-prefix container # Container-prefixed with timestamps
SQL Statements:
-M end:;$ # Statements end with semicolon
-M backslash # Line continuation with backslash
PERFORMANCE NOTES:
- Multiline mode buffers events in memory until boundaries are detected
- Use --batch-size to control memory usage in parallel mode
- --take N applies after multiline reconstruction, not to input lines
- Whole strategy loads entire input into memory
For complete CLI reference: kelora --help
For Rhai scripting help: kelora --help-rhai
"#;
    println!("{}", MULTILINE_HELP);
}