use rhai::Dynamic;
use std::collections::HashMap;
pub use config::{KeloraConfig, MultilineConfig, ScriptStageType, TimestampFilterConfig};
/// Pipeline-runner view of kelora's configuration: the input, processing and
/// performance sections plus the selected output format.
///
/// Built from a [`KeloraConfig`] by [`PipelineConfig::from_kelora_config`]; the
/// `run_pipeline_*_with_config` entry points turn it back into a `KeloraConfig`
/// with fixed values for every output option other than the format.
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Where input comes from and how it is parsed.
    pub input: PipelineInputConfig,
    /// Scripts, filters, limits and error handling.
    pub processing: PipelineProcessingConfig,
    /// Parallel-execution tuning.
    pub performance: PipelinePerformanceConfig,
    /// Output formatter selection.
    pub output_format: OutputFormat,
}
/// Input-side settings: which files to read, in what order, and how to parse them.
#[derive(Debug, Clone)]
pub struct PipelineInputConfig {
    /// Input paths; when empty, the runners read stdin instead.
    pub files: Vec<String>,
    /// Input format; `InputFormat::Auto` makes the runners sniff the first line.
    pub format: InputFormat,
    /// Order in which `files` are processed (applied via `sort_files`).
    pub file_order: FileOrder,
    /// Number of leading lines to discard (the sequential runner counts these
    /// across all files combined, not per file).
    pub skip_lines: usize,
    /// Lines matching this regex are dropped before parsing.
    pub ignore_lines: Option<regex::Regex>,
    /// Optional multiline settings (see `config::MultilineConfig`).
    pub multiline: Option<config::MultilineConfig>,
    /// Timestamp field override — semantics defined outside this file; confirm.
    pub ts_field: Option<String>,
    /// Timestamp format override — semantics defined outside this file; confirm.
    pub ts_format: Option<String>,
    /// Default timezone setting — presumably applied to timestamps lacking one; confirm.
    pub default_timezone: Option<String>,
}
/// Processing-side settings: scripts, filters, limits and error handling.
#[derive(Debug, Clone)]
pub struct PipelineProcessingConfig {
    /// Begin script, if any — presumably compiled into the begin stage that the
    /// runners execute once before reading input.
    pub begin: Option<String>,
    /// Ordered script stages handed to the pipeline builder's `build`.
    pub stages: Vec<ScriptStageType>,
    /// End script, if any — presumably compiled into the end stage that the
    /// runners execute after input is exhausted.
    pub end: Option<String>,
    /// Error-report settings; when `file` is set, an error summary is written to
    /// that path after processing.
    pub error_report: config::ErrorReportConfig,
    /// Level include list (applied by the pipeline builder; not visible here).
    pub levels: Vec<String>,
    /// Level exclude list (applied by the pipeline builder; not visible here).
    pub exclude_levels: Vec<String>,
    /// Window size forwarded to the pipeline builder — meaning defined elsewhere.
    pub window_size: usize,
    /// Optional timestamp filter forwarded to the pipeline builder.
    pub timestamp_filter: Option<config::TimestampFilterConfig>,
    /// Optional take limit. The parallel processor receives it via
    /// `with_take_limit`; the sequential loops stop reading once the pipeline
    /// reports it exhausted.
    pub take_limit: Option<usize>,
    /// When true, a per-line processing error aborts the sequential run instead
    /// of being skipped.
    pub strict: bool,
    /// Verbose-diagnostics flag (consumed outside this file).
    pub verbose: bool,
    /// Suppresses the "auto-detected format" notice (and possibly other output).
    pub quiet: bool,
}
/// Parallel-execution tuning. Any of `parallel`, a non-zero `threads`, or a set
/// `batch_size` switches the runner into parallel mode.
#[derive(Debug, Clone)]
pub struct PipelinePerformanceConfig {
    /// Explicitly request parallel mode.
    pub parallel: bool,
    /// Worker count; `0` means "one per CPU" (`num_cpus::get()`).
    pub threads: usize,
    /// Lines per batch; `None` falls back to 1000.
    pub batch_size: Option<usize>,
    /// Batch timeout, forwarded as `ParallelConfig::batch_timeout_ms` (milliseconds).
    pub batch_timeout: u64,
    /// When true, parallel output is not required to follow input order.
    pub no_preserve_order: bool,
}
impl PipelineConfig {
    /// Copies the input, processing and performance sections and the output
    /// format out of a full [`KeloraConfig`].
    pub fn from_kelora_config(config: &KeloraConfig) -> Self {
        let src_input = &config.input;
        let src_processing = &config.processing;
        let src_perf = &config.performance;
        Self {
            input: PipelineInputConfig {
                files: src_input.files.clone(),
                format: src_input.format.clone().into(),
                file_order: src_input.file_order.clone().into(),
                skip_lines: src_input.skip_lines,
                ignore_lines: src_input.ignore_lines.clone(),
                multiline: src_input.multiline.clone(),
                ts_field: src_input.ts_field.clone(),
                ts_format: src_input.ts_format.clone(),
                default_timezone: src_input.default_timezone.clone(),
            },
            processing: PipelineProcessingConfig {
                begin: src_processing.begin.clone(),
                stages: src_processing.stages.clone(),
                end: src_processing.end.clone(),
                error_report: src_processing.error_report.clone(),
                levels: src_processing.levels.clone(),
                exclude_levels: src_processing.exclude_levels.clone(),
                window_size: src_processing.window_size,
                timestamp_filter: src_processing.timestamp_filter.clone(),
                take_limit: src_processing.take_limit,
                strict: src_processing.strict,
                verbose: src_processing.verbose,
                quiet: src_processing.quiet,
            },
            performance: PipelinePerformanceConfig {
                parallel: src_perf.parallel,
                threads: src_perf.threads,
                batch_size: src_perf.batch_size,
                batch_timeout: src_perf.batch_timeout,
                no_preserve_order: src_perf.no_preserve_order,
            },
            output_format: config.output.format.clone().into(),
        }
    }

    /// Parallel mode is implied by an explicit `parallel` flag, a non-zero
    /// thread count, or an explicit batch size.
    pub fn should_use_parallel(&self) -> bool {
        let perf = &self.performance;
        perf.parallel || perf.threads > 0 || perf.batch_size.is_some()
    }

    /// Batch size to use, defaulting to 1000 lines when none was configured.
    pub fn effective_batch_size(&self) -> usize {
        const DEFAULT_BATCH_SIZE: usize = 1000;
        self.performance.batch_size.unwrap_or(DEFAULT_BATCH_SIZE)
    }

    /// Worker count to use; a configured `0` means one worker per CPU.
    pub fn effective_threads(&self) -> usize {
        match self.performance.threads {
            0 => num_cpus::get(),
            explicit => explicit,
        }
    }
}
pub mod cli;
mod colors;
mod config;
mod config_file;
mod decompression;
mod engine;
mod event;
mod formatters;
mod parallel;
mod parsers;
pub mod pipeline;
mod platform;
mod readers;
mod rhai_functions;
mod stats;
mod timestamp;
mod tty;
use crate::decompression::DecompressionReader;
use anyhow::Result;
pub use cli::{Cli, FileOrder, InputFormat, OutputFormat};
/// Sniffs the input format from the first line of `reader` without consuming it.
///
/// An empty stream yields `InputFormat::Line`. Otherwise the first line, with
/// trailing `\r`/`\n` characters removed, is handed to `parsers::detect_format`.
///
/// # Errors
/// Propagates I/O errors from peeking and any error from `detect_format`.
fn detect_format_from_peekable_reader<R: BufRead>(
    reader: &mut crate::readers::PeekableLineReader<R>,
) -> Result<config::InputFormat> {
    let first_line = reader.peek_first_line()?;
    let format = match first_line {
        Some(raw) => {
            let sample = raw.trim_end_matches(&['\r', '\n'][..]);
            crate::parsers::detect_format(sample)?
        }
        // Nothing to sniff: fall back to plain line mode.
        None => config::InputFormat::Line,
    };
    Ok(format)
}
/// Resolves an `Auto` input format before the parallel runner starts.
///
/// Sniffs stdin when `files` is empty, otherwise the first file after
/// `sort_files(files, FileOrder::None)`. A no-file list or an undecided
/// (`Auto`) detection result both fall back to `InputFormat::Line`.
///
/// NOTE(review): detection always uses `FileOrder::None`; it does not consult
/// any configured file order, so it may sniff a different file than the one
/// processed first.
///
/// NOTE(review): the stdin `PeekableLineReader` is dropped after sniffing. If
/// it buffered the first line, that line may never reach the reader created
/// later by `create_input_reader`. Confirm against `PeekableLineReader`.
fn detect_format_for_parallel_mode(files: &[String]) -> Result<InputFormat> {
    // `Auto` from the sniffer means "undecided"; parallel mode then uses plain lines.
    let to_cli_format = |detected: config::InputFormat| match detected {
        config::InputFormat::Auto => InputFormat::Line,
        concrete => concrete.into(),
    };

    if files.is_empty() {
        let stdin = io::stdin();
        let mut peekable = crate::readers::PeekableLineReader::new(stdin.lock());
        return detect_format_from_peekable_reader(&mut peekable).map(to_cli_format);
    }

    let ordered = pipeline::builders::sort_files(files, &config::FileOrder::None)?;
    match ordered.first() {
        None => Ok(InputFormat::Line),
        Some(first_path) => {
            let decompressed = DecompressionReader::new(first_path)?;
            let mut peekable = crate::readers::PeekableLineReader::new(decompressed);
            detect_format_from_peekable_reader(&mut peekable).map(to_cli_format)
        }
    }
}
use parallel::{ParallelConfig, ParallelProcessor};
use pipeline::{
create_input_reader, create_pipeline_builder_from_config, create_pipeline_from_config,
};
use platform::{check_termination, SHOULD_TERMINATE};
use stats::{
get_thread_stats, stats_add_error, stats_add_line_filtered, stats_add_line_output,
stats_add_line_read, stats_finish_processing, stats_start_timer, ProcessingStats,
};
use std::io::{self, BufRead, Write};
use std::sync::atomic::Ordering;
/// Outcome of a pipeline run.
#[derive(Debug)]
pub struct PipelineResult {
    /// Processing statistics. The parallel runner always fills this in;
    /// `run_pipeline` leaves it `None` in sequential mode when `collect_stats`
    /// is false.
    pub stats: Option<ProcessingStats>,
    /// `false` if the global `SHOULD_TERMINATE` flag was set when the run ended.
    pub success: bool,
    /// Final tracking state gathered from the run (thread-local state in
    /// sequential mode, the processor's merged state in parallel mode).
    pub tracking_data: HashMap<String, Dynamic>,
}
/// Runs the pipeline described by `config`, writing formatted output to `output`.
///
/// Parallel mode is chosen by [`PipelineConfig::should_use_parallel`]; otherwise
/// the pipeline runs on the calling thread. When `collect_stats` is true the
/// stats timer is started first. The sequential path then finishes and returns
/// the stats; the parallel path always returns its processor's stats.
///
/// # Errors
/// Propagates any error from the selected runner.
pub fn run_pipeline<W: Write + Send + 'static>(
    config: &PipelineConfig,
    output: W,
    collect_stats: bool,
) -> Result<PipelineResult> {
    if collect_stats {
        stats_start_timer();
    }
    if config.should_use_parallel() {
        return run_pipeline_parallel_with_config(config, output);
    }

    let mut sink = output;
    run_pipeline_sequential_with_config(config, &mut sink)?;
    let tracking_data = crate::rhai_functions::tracking::get_thread_tracking_state();
    let stats = collect_stats.then(|| {
        stats_finish_processing();
        let mut snapshot = get_thread_stats();
        snapshot.extract_discovered_from_tracking(&tracking_data);
        snapshot
    });
    Ok(PipelineResult {
        stats,
        success: !SHOULD_TERMINATE.load(Ordering::Relaxed),
        tracking_data,
    })
}
/// Runs the pipeline directly from a full [`KeloraConfig`].
///
/// The stats timer starts only when `config.output.stats` is set. In sequential
/// mode, however, stats are always finished and returned as `Some`, unlike
/// [`run_pipeline`], which honours `collect_stats`. Parallel mode converts to a
/// [`PipelineConfig`] first.
///
/// # Errors
/// Propagates any error from the selected runner.
pub fn run_pipeline_with_kelora_config<W: Write + Send + 'static>(
    config: &KeloraConfig,
    output: W,
) -> Result<PipelineResult> {
    if config.output.stats {
        stats_start_timer();
    }
    if config.should_use_parallel() {
        let as_pipeline_config = PipelineConfig::from_kelora_config(config);
        return run_pipeline_parallel_with_config(&as_pipeline_config, output);
    }

    let mut sink = output;
    run_pipeline_sequential(config, &mut sink)?;
    let tracking_data = crate::rhai_functions::tracking::get_thread_tracking_state();
    stats_finish_processing();
    let mut collected = get_thread_stats();
    collected.extract_discovered_from_tracking(&tracking_data);
    Ok(PipelineResult {
        stats: Some(collected),
        success: !SHOULD_TERMINATE.load(Ordering::Relaxed),
        tracking_data,
    })
}
pub fn run_pipeline_parallel_with_config<W: Write + Send + 'static>(
config: &PipelineConfig,
output: W,
) -> Result<PipelineResult> {
let final_config = if matches!(config.input.format, InputFormat::Auto) {
let detected_format = detect_format_for_parallel_mode(&config.input.files)?;
if !config.processing.quiet {
let format_name = format!("{:?}", detected_format).to_lowercase();
let use_colors =
crate::tty::should_use_colors_with_mode(&crate::config::ColorMode::Auto);
let no_emoji = std::env::var("NO_EMOJI").is_ok();
let use_emoji = use_colors && !no_emoji;
let prefix = if use_emoji { "🧱" } else { "kelora:" };
eprintln!("{} auto-detected format: {}", prefix, format_name);
}
let mut new_config = config.clone();
new_config.input.format = detected_format;
new_config
} else {
config.clone()
};
let kelora_config = KeloraConfig {
input: config::InputConfig {
files: final_config.input.files.clone(),
format: final_config.input.format.clone().into(),
file_order: final_config.input.file_order.clone().into(),
skip_lines: final_config.input.skip_lines,
ignore_lines: final_config.input.ignore_lines.clone(),
multiline: final_config.input.multiline.clone(),
ts_field: final_config.input.ts_field.clone(),
ts_format: final_config.input.ts_format.clone(),
default_timezone: final_config.input.default_timezone.clone(),
},
output: config::OutputConfig {
format: final_config.output_format.clone().into(),
keys: Vec::new(),
exclude_keys: Vec::new(),
core: false,
brief: false,
color: config::ColorMode::Auto,
no_emoji: false,
no_section_headers: false,
stats: false, metrics: false,
metrics_file: None,
timestamp_formatting: config::TimestampFormatConfig::default(),
},
processing: config::ProcessingConfig {
begin: final_config.processing.begin.clone(),
stages: final_config.processing.stages.clone(),
end: final_config.processing.end.clone(),
error_report: final_config.processing.error_report.clone(),
levels: final_config.processing.levels.clone(),
exclude_levels: final_config.processing.exclude_levels.clone(),
window_size: final_config.processing.window_size,
timestamp_filter: final_config.processing.timestamp_filter.clone(),
take_limit: final_config.processing.take_limit,
strict: final_config.processing.strict,
verbose: final_config.processing.verbose,
quiet: final_config.processing.quiet,
},
performance: config::PerformanceConfig {
parallel: final_config.performance.parallel,
threads: final_config.performance.threads,
batch_size: final_config.performance.batch_size,
batch_timeout: final_config.performance.batch_timeout,
no_preserve_order: final_config.performance.no_preserve_order,
},
};
run_pipeline_parallel(&kelora_config, output)
}
/// Runs the pipeline with [`ParallelProcessor`] from a `KeloraConfig`.
///
/// The only caller in this file resolves an `Auto` input format first.
///
/// Sequence:
/// 1. Build a pipeline and run its begin stage on this thread.
/// 2. Process the input with the parallel processor.
/// 3. Write the optional error-summary file.
/// 4. Copy the processor's tracked state into the end-stage context, skipping
///    kelora's internal bookkeeping keys.
/// 5. Run the end stage.
///
/// # Errors
/// Pipeline construction, begin/end stage failures, reader creation, and any
/// error returned by `process_with_pipeline`.
fn run_pipeline_parallel<W: Write + Send + 'static>(
    config: &KeloraConfig,
    output: W,
) -> Result<PipelineResult> {
    // Tracking keys with these prefixes hold kelora's own stats/error records;
    // they are not copied into the end-stage context.
    const INTERNAL_TRACKING_PREFIXES: [&str; 5] = [
        "__internal_",
        "__kelora_stats_",
        "__op___kelora_stats_",
        "__kelora_error_",
        "__op___kelora_error_",
    ];

    let batch_size = config.effective_batch_size();
    let parallel_config = ParallelConfig {
        num_workers: config.effective_threads(),
        batch_size,
        batch_timeout_ms: config.performance.batch_timeout,
        preserve_order: !config.performance.no_preserve_order,
        buffer_size: Some(10000),
    };
    let processor =
        ParallelProcessor::new(parallel_config).with_take_limit(config.processing.take_limit);

    let pipeline_builder = create_pipeline_builder_from_config(config);
    // Only the begin/end stages and the context of this build are used here;
    // the builder itself is handed to the processor below.
    let (_pipeline, begin_stage, end_stage, mut ctx) = pipeline_builder
        .clone()
        .build(config.processing.stages.clone())?;
    if let Err(e) = begin_stage.execute(&mut ctx) {
        return Err(anyhow::anyhow!("Begin stage error: {}", e));
    }

    let reader = create_input_reader(config)?;
    processor.process_with_pipeline(
        reader,
        pipeline_builder,
        config.processing.stages.clone(),
        config,
        output,
    )?;

    let parallel_tracked = processor.get_final_tracked_state();
    if let Some(ref file_path) = config.processing.error_report.file {
        crate::rhai_functions::tracking::write_error_summary_to_file(&parallel_tracked, file_path)
            .unwrap_or_else(|e| eprintln!("Failed to write error summary to file: {}", e));
    }
    // Deliberately best-effort: an error from stats extraction is ignored.
    processor
        .extract_final_stats_from_tracking(&parallel_tracked)
        .unwrap_or(());

    for (key, value) in &parallel_tracked {
        let is_internal = INTERNAL_TRACKING_PREFIXES
            .iter()
            .any(|prefix| key.starts_with(*prefix));
        if !is_internal {
            ctx.tracker.insert(key.clone(), value.clone());
        }
    }

    if let Err(e) = end_stage.execute(&ctx) {
        return Err(anyhow::anyhow!("End stage error: {}", e));
    }
    Ok(PipelineResult {
        stats: Some(processor.get_final_stats()),
        success: !SHOULD_TERMINATE.load(Ordering::Relaxed),
        tracking_data: parallel_tracked,
    })
}
pub fn run_pipeline_sequential_with_config<W: Write>(
config: &PipelineConfig,
output: &mut W,
) -> Result<()> {
let kelora_config = KeloraConfig {
input: config::InputConfig {
files: config.input.files.clone(),
format: config.input.format.clone().into(),
file_order: config.input.file_order.clone().into(),
skip_lines: config.input.skip_lines,
ignore_lines: config.input.ignore_lines.clone(),
multiline: config.input.multiline.clone(),
ts_field: config.input.ts_field.clone(),
ts_format: config.input.ts_format.clone(),
default_timezone: config.input.default_timezone.clone(),
},
output: config::OutputConfig {
format: config.output_format.clone().into(),
keys: Vec::new(),
exclude_keys: Vec::new(),
core: false,
brief: false,
color: config::ColorMode::Auto,
no_emoji: false,
no_section_headers: false,
stats: false, metrics: false,
metrics_file: None,
timestamp_formatting: config::TimestampFormatConfig::default(),
},
processing: config::ProcessingConfig {
begin: config.processing.begin.clone(),
stages: config.processing.stages.clone(),
end: config.processing.end.clone(),
error_report: config.processing.error_report.clone(),
levels: config.processing.levels.clone(),
exclude_levels: config.processing.exclude_levels.clone(),
window_size: config.processing.window_size,
timestamp_filter: config.processing.timestamp_filter.clone(),
take_limit: config.processing.take_limit,
strict: config.processing.strict,
verbose: config.processing.verbose,
quiet: config.processing.quiet,
},
performance: config::PerformanceConfig {
parallel: config.performance.parallel,
threads: config.performance.threads,
batch_size: config.performance.batch_size,
batch_timeout: config.performance.batch_timeout,
no_preserve_order: config.performance.no_preserve_order,
},
};
run_pipeline_sequential(&kelora_config, output)
}
/// Runs the pipeline on the calling thread, writing formatted events to `output`.
///
/// Input comes from stdin when `config.input.files` is empty. Otherwise it comes
/// from the files sorted by `config.input.file_order`. An `Auto` input format is
/// delegated to `run_pipeline_sequential_with_auto_detection`.
///
/// Lifecycle:
/// 1. Begin stage.
/// 2. Per-line processing.
/// 3. Pipeline flush.
/// 4. End stage.
/// 5. Tracking merge.
/// 6. Optional error-summary file.
///
/// NOTE(review): stdin lines come from `BufRead::lines` (terminator stripped).
/// File lines come from `read_line` and presumably keep their terminator.
///
/// # Early exits
/// * On a termination signal, returns `Ok(())` at once, skipping the flush, the
///   end stage and the error summary.
/// * When a script requests process exit, `output` is flushed (best effort) and
///   the process exits with the requested code.
///
/// # Errors
/// Pipeline construction, begin/end stage failures, read/write errors, and
/// per-line errors when `strict` is set.
pub fn run_pipeline_sequential<W: Write>(config: &KeloraConfig, output: &mut W) -> Result<()> {
    if matches!(config.input.format, config::InputFormat::Auto) {
        return run_pipeline_sequential_with_auto_detection(config, output);
    }
    let (mut pipeline, begin_stage, end_stage, mut ctx) = create_pipeline_from_config(config)?;
    if let Err(e) = begin_stage.execute(&mut ctx) {
        return Err(anyhow::anyhow!("Begin stage error: {}", e));
    }
    let mut current_csv_headers: Option<Vec<String>> = None;
    let mut last_filename: Option<String> = None;
    let mut line_num = 0;
    let mut skipped_lines = 0;
    if config.input.files.is_empty() {
        let stdin = io::stdin();
        let reader = stdin.lock();
        for line_result in reader.lines() {
            if check_termination().is_err() {
                return Ok(());
            }
            let outcome = process_line_sequential(
                line_result,
                &mut line_num,
                &mut skipped_lines,
                &mut pipeline,
                &mut ctx,
                config,
                output,
                None,
                &mut current_csv_headers,
                &mut last_filename,
            )?;
            if matches!(outcome, ProcessingResult::TakeLimitExhausted) {
                break;
            }
            if crate::rhai_functions::process::is_exit_requested() {
                let exit_code = crate::rhai_functions::process::get_exit_code();
                // `process::exit` runs no destructors, so anything still buffered
                // in `output` would be lost; flush it first (best effort).
                let _ = output.flush();
                std::process::exit(exit_code);
            }
        }
    } else {
        let sorted_files =
            pipeline::builders::sort_files(&config.input.files, &config.input.file_order)?;
        let mut multi_reader = crate::readers::MultiFileReader::new(sorted_files)?;
        let mut line_buf = String::new();
        loop {
            if check_termination().is_err() {
                return Ok(());
            }
            line_buf.clear();
            // A read error aborts the run; `process_line_sequential` would only
            // re-raise it, so propagate it directly.
            if multi_reader.read_line(&mut line_buf)? == 0 {
                break;
            }
            let current_filename = multi_reader.current_filename().map(|s| s.to_string());
            let outcome = process_line_sequential(
                Ok(line_buf.clone()),
                &mut line_num,
                &mut skipped_lines,
                &mut pipeline,
                &mut ctx,
                config,
                output,
                current_filename,
                &mut current_csv_headers,
                &mut last_filename,
            )?;
            if matches!(outcome, ProcessingResult::TakeLimitExhausted) {
                break;
            }
            if crate::rhai_functions::process::is_exit_requested() {
                let exit_code = crate::rhai_functions::process::get_exit_code();
                // See the stdin branch: flush before exiting without destructors.
                let _ = output.flush();
                std::process::exit(exit_code);
            }
        }
    }
    let results = pipeline.flush(&mut ctx)?;
    for result in results {
        if !result.is_empty() {
            writeln!(output, "{}", result)?;
        }
    }
    if let Err(e) = end_stage.execute(&ctx) {
        return Err(anyhow::anyhow!("End stage error: {}", e));
    }
    crate::rhai_functions::tracking::merge_thread_tracking_to_context(&mut ctx);
    if let Some(ref file_path) = config.processing.error_report.file {
        crate::rhai_functions::tracking::write_error_summary_to_file(&ctx.tracker, file_path)
            .unwrap_or_else(|e| eprintln!("Failed to write error summary to file: {}", e));
    }
    Ok(())
}
fn run_pipeline_sequential_with_auto_detection<W: Write>(
config: &KeloraConfig,
output: &mut W,
) -> Result<()> {
if config.input.files.is_empty() {
let stdin = io::stdin();
let stdin_lock = stdin.lock();
let mut peekable_reader = crate::readers::PeekableLineReader::new(stdin_lock);
let detected_format = detect_format_from_peekable_reader(&mut peekable_reader)?;
if !config.processing.quiet {
let format_name = format!("{:?}", detected_format).to_lowercase();
let message =
config.format_error_message(&format!("auto-detected format: {}", format_name));
eprintln!("{}", message);
}
let mut final_config = config.clone();
final_config.input.format = detected_format;
let (mut pipeline, begin_stage, end_stage, mut ctx) =
create_pipeline_from_config(&final_config)?;
if let Err(e) = begin_stage.execute(&mut ctx) {
return Err(anyhow::anyhow!("Begin stage error: {}", e));
}
run_sequential_with_reader(
&mut peekable_reader,
&mut pipeline,
&mut ctx,
&final_config,
output,
None, )?;
if let Err(e) = end_stage.execute(&ctx) {
return Err(anyhow::anyhow!("End stage error: {}", e));
}
crate::rhai_functions::tracking::merge_thread_tracking_to_context(&mut ctx);
if let Some(ref file_path) = final_config.processing.error_report.file {
crate::rhai_functions::tracking::write_error_summary_to_file(&ctx.tracker, file_path)
.unwrap_or_else(|e| eprintln!("Failed to write error summary to file: {}", e));
}
} else {
let sorted_files =
pipeline::builders::sort_files(&config.input.files, &config.input.file_order)?;
if sorted_files.is_empty() {
return Ok(());
}
let first_file = &sorted_files[0];
let detected_format = {
let decompressed = DecompressionReader::new(first_file)?;
let mut peekable_reader = crate::readers::PeekableLineReader::new(decompressed);
detect_format_from_peekable_reader(&mut peekable_reader)?
};
if !config.processing.quiet {
let format_name = format!("{:?}", detected_format).to_lowercase();
let message =
config.format_error_message(&format!("auto-detected format: {}", format_name));
eprintln!("{}", message);
}
let mut final_config = config.clone();
final_config.input.format = detected_format;
let (mut pipeline, begin_stage, end_stage, mut ctx) =
create_pipeline_from_config(&final_config)?;
if let Err(e) = begin_stage.execute(&mut ctx) {
return Err(anyhow::anyhow!("Begin stage error: {}", e));
}
let mut multi_reader = crate::readers::MultiFileReader::new(sorted_files)?;
run_sequential_with_multi_reader(
&mut multi_reader,
&mut pipeline,
&mut ctx,
&final_config,
output,
)?;
if let Err(e) = end_stage.execute(&ctx) {
return Err(anyhow::anyhow!("End stage error: {}", e));
}
crate::rhai_functions::tracking::merge_thread_tracking_to_context(&mut ctx);
if let Some(ref file_path) = final_config.processing.error_report.file {
crate::rhai_functions::tracking::write_error_summary_to_file(&ctx.tracker, file_path)
.unwrap_or_else(|e| eprintln!("Failed to write error summary to file: {}", e));
}
}
Ok(())
}
/// Drives `process_line_sequential` over every line of `reader`, then flushes
/// the pipeline and writes its remaining results.
///
/// Lines are read with `BufRead::read_line`, so each keeps its line terminator.
/// When `multi_reader` is given, its current filename is attached to each line.
///
/// Stops at EOF or when the take limit is exhausted. On a termination signal it
/// returns `Ok(())` immediately, without flushing the pipeline. When a script
/// requests process exit, `output` is flushed (best effort) and the process
/// exits with the requested code.
///
/// # Errors
/// Read errors, write errors, and any error from `process_line_sequential` or
/// the pipeline flush.
fn run_sequential_with_reader<W: Write, R: BufRead>(
    reader: &mut R,
    pipeline: &mut pipeline::Pipeline,
    ctx: &mut pipeline::PipelineContext,
    config: &KeloraConfig,
    output: &mut W,
    multi_reader: Option<&mut crate::readers::MultiFileReader>,
) -> Result<()> {
    let mut current_csv_headers: Option<Vec<String>> = None;
    let mut last_filename: Option<String> = None;
    let mut line_num = 0;
    let mut skipped_lines = 0;
    let mut line_buf = String::new();
    loop {
        if check_termination().is_err() {
            return Ok(());
        }
        line_buf.clear();
        // A read error aborts the run; `process_line_sequential` would only
        // re-raise it, so propagate it directly.
        if reader.read_line(&mut line_buf)? == 0 {
            break;
        }
        let current_filename = multi_reader
            .as_ref()
            .and_then(|mr| mr.current_filename().map(|s| s.to_string()));
        let outcome = process_line_sequential(
            Ok(line_buf.clone()),
            &mut line_num,
            &mut skipped_lines,
            pipeline,
            ctx,
            config,
            output,
            current_filename,
            &mut current_csv_headers,
            &mut last_filename,
        )?;
        if matches!(outcome, ProcessingResult::TakeLimitExhausted) {
            break;
        }
        if crate::rhai_functions::process::is_exit_requested() {
            let exit_code = crate::rhai_functions::process::get_exit_code();
            // `process::exit` runs no destructors, so anything still buffered in
            // `output` would be lost; flush it first (best effort).
            let _ = output.flush();
            std::process::exit(exit_code);
        }
    }
    let results = pipeline.flush(ctx)?;
    for result in results {
        if !result.is_empty() {
            writeln!(output, "{}", result)?;
        }
    }
    Ok(())
}
/// Drives `process_line_sequential` over every line of `multi_reader`, tagging
/// each line with the reader's current filename. It then flushes the pipeline
/// and writes its remaining results.
///
/// Stops at EOF or when the take limit is exhausted. On a termination signal it
/// returns `Ok(())` immediately, without flushing the pipeline. When a script
/// requests process exit, `output` is flushed (best effort) and the process
/// exits with the requested code.
///
/// # Errors
/// Read errors, write errors, and any error from `process_line_sequential` or
/// the pipeline flush.
fn run_sequential_with_multi_reader<W: Write>(
    multi_reader: &mut crate::readers::MultiFileReader,
    pipeline: &mut pipeline::Pipeline,
    ctx: &mut pipeline::PipelineContext,
    config: &KeloraConfig,
    output: &mut W,
) -> Result<()> {
    let mut current_csv_headers: Option<Vec<String>> = None;
    let mut last_filename: Option<String> = None;
    let mut line_num = 0;
    let mut skipped_lines = 0;
    let mut line_buf = String::new();
    loop {
        if check_termination().is_err() {
            return Ok(());
        }
        line_buf.clear();
        // A read error aborts the run; `process_line_sequential` would only
        // re-raise it, so propagate it directly.
        if multi_reader.read_line(&mut line_buf)? == 0 {
            break;
        }
        let current_filename = multi_reader.current_filename().map(|s| s.to_string());
        let outcome = process_line_sequential(
            Ok(line_buf.clone()),
            &mut line_num,
            &mut skipped_lines,
            pipeline,
            ctx,
            config,
            output,
            current_filename,
            &mut current_csv_headers,
            &mut last_filename,
        )?;
        if matches!(outcome, ProcessingResult::TakeLimitExhausted) {
            break;
        }
        if crate::rhai_functions::process::is_exit_requested() {
            let exit_code = crate::rhai_functions::process::get_exit_code();
            // `process::exit` runs no destructors, so anything still buffered in
            // `output` would be lost; flush it first (best effort).
            let _ = output.flush();
            std::process::exit(exit_code);
        }
    }
    let results = pipeline.flush(ctx)?;
    for result in results {
        if !result.is_empty() {
            writeln!(output, "{}", result)?;
        }
    }
    Ok(())
}
/// Outcome of feeding one input line through `process_line_sequential`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProcessingResult {
    /// Keep reading input.
    Continue,
    /// The pipeline reported its take limit exhausted; stop reading input.
    TakeLimitExhausted,
}
/// Feeds one input line through the sequential pipeline and writes any output.
///
/// Steps, in order:
/// 1. Propagates a read error from `line_result` unchanged.
/// 2. Increments `line_num` (every line read counts, including dropped ones)
///    and, with stats enabled, records a line read.
/// 3. Drops the line, counting it as filtered when stats are on, in two cases:
///    - while fewer than `skip_lines` lines have been skipped so far (the
///      counter spans all files);
///    - when it matches `ignore_lines`.
/// 4. Drops blank lines for every input format except `Line`. These are not
///    counted as filtered.
/// 5. CSV/TSV formats only: runs when the filename differs from the last one,
///    or on a stdin line before any headers exist. It:
///    - derives headers from this line;
///    - rebuilds the pipeline with them and replaces `*pipeline`;
///    - takes only `rhai` from the rebuilt context (its begin/end stages are
///      discarded);
///    - stops here if the header line was consumed.
/// 6. Sets `ctx.meta` line number and filename, runs the pipeline, and writes
///    each non-empty result on its own line.
///
/// Returns `TakeLimitExhausted` once the pipeline reports its take limit is
/// exhausted, otherwise `Continue`.
///
/// # Errors
/// Read errors, header/pipeline-build errors, write errors, and per-line
/// processing errors when `strict` is set. Without `strict`, those are counted
/// in stats (if enabled) and skipped.
// The sequential read loops thread all their per-run state through explicitly.
#[allow(clippy::too_many_arguments)]
fn process_line_sequential<W: Write>(
    line_result: io::Result<String>,
    line_num: &mut usize,
    skipped_lines: &mut usize,
    pipeline: &mut pipeline::Pipeline,
    ctx: &mut pipeline::PipelineContext,
    config: &KeloraConfig,
    output: &mut W,
    current_filename: Option<String>,
    current_csv_headers: &mut Option<Vec<String>>,
    last_filename: &mut Option<String>,
) -> Result<ProcessingResult> {
    let line = line_result?;
    *line_num += 1;
    if config.output.stats {
        stats_add_line_read();
    }
    if *skipped_lines < config.input.skip_lines {
        *skipped_lines += 1;
        if config.output.stats {
            stats_add_line_filtered();
        }
        return Ok(ProcessingResult::Continue);
    }
    if let Some(ref ignore_regex) = config.input.ignore_lines {
        if ignore_regex.is_match(&line) {
            if config.output.stats {
                stats_add_line_filtered();
            }
            return Ok(ProcessingResult::Continue);
        }
    }
    // Blank lines carry no event, except in raw `Line` mode where they are kept.
    if line.trim().is_empty() && !matches!(config.input.format, config::InputFormat::Line) {
        return Ok(ProcessingResult::Continue);
    }
    if matches!(
        config.input.format,
        config::InputFormat::Csv
            | config::InputFormat::Tsv
            | config::InputFormat::Csvnh
            | config::InputFormat::Tsvnh
    ) && (current_filename != *last_filename
        || (current_filename.is_none() && current_csv_headers.is_none()))
    {
        let mut temp_parser = match config.input.format {
            config::InputFormat::Csv => crate::parsers::CsvParser::new_csv(),
            config::InputFormat::Tsv => crate::parsers::CsvParser::new_tsv(),
            config::InputFormat::Csvnh => crate::parsers::CsvParser::new_csv_no_headers(),
            config::InputFormat::Tsvnh => crate::parsers::CsvParser::new_tsv_no_headers(),
            _ => unreachable!(),
        };
        let was_consumed = temp_parser.initialize_headers_from_line(&line)?;
        let headers = temp_parser.get_headers();
        *current_csv_headers = Some(headers.clone());
        *last_filename = current_filename.clone();
        let mut pipeline_builder = create_pipeline_builder_from_config(config);
        pipeline_builder = pipeline_builder.with_csv_headers(headers);
        let (new_pipeline, _unused_begin_stage, _unused_end_stage, new_ctx) =
            pipeline_builder.build(config.processing.stages.clone())?;
        *pipeline = new_pipeline;
        // Only the script engine is swapped; the rest of `ctx` is kept.
        ctx.rhai = new_ctx.rhai;
        if was_consumed {
            return Ok(ProcessingResult::Continue);
        }
    }
    ctx.meta.line_number = Some(*line_num);
    ctx.meta.filename = current_filename;
    match pipeline.process_line(line, ctx) {
        Ok(results) => {
            if config.output.stats && !results.is_empty() {
                stats_add_line_output();
            }
            for result in results {
                if !result.is_empty() {
                    writeln!(output, "{}", result)?;
                }
            }
            if pipeline.is_take_limit_exhausted() {
                return Ok(ProcessingResult::TakeLimitExhausted);
            }
        }
        Err(e) => {
            if config.output.stats {
                stats_add_error();
            }
            if config.processing.strict {
                return Err(e);
            }
        }
    }
    Ok(ProcessingResult::Continue)
}