#![allow(dead_code)] use clap::ValueEnum;
use crate::config_file::ConfigExpansionInfo;
/// Aggregate runtime configuration, split into four sections.
///
/// Built from parsed command-line arguments by `KeloraConfig::from_cli`, or
/// with neutral values through the `Default` impl.
#[derive(Debug, Clone)]
pub struct KeloraConfig {
/// Input sources and parsing options.
pub input: InputConfig,
/// Output format, key selection, colour/emoji modes and reporting options.
pub output: OutputConfig,
/// Script stages, filters, verbosity and related processing options.
pub processing: ProcessingConfig,
/// Parallelism and batching options.
pub performance: PerformanceConfig,
}
/// Input-side settings of a [`KeloraConfig`].
#[derive(Debug, Clone)]
pub struct InputConfig {
/// Input paths, copied from `cli.files` by `from_cli`.
pub files: Vec<String>,
pub no_input: bool,
/// `InputFormat::Json` when `cli.json_input` is set, otherwise the result of
/// parsing `cli.format`.
pub format: InputFormat,
pub file_order: FileOrder,
pub merge_ts: bool,
/// `cli.skip_lines`, defaulting to 0 when absent.
pub skip_lines: usize,
/// Taken from `cli.head`.
pub head_lines: Option<usize>,
// `from_cli` leaves the next four fields as `None`; presumably the caller
// fills them in afterwards — TODO confirm against the call site.
pub section: Option<SectionConfig>,
pub ignore_lines: Option<regex::Regex>,
pub keep_lines: Option<regex::Regex>,
pub multiline: Option<MultilineConfig>,
pub ts_field: Option<String>,
pub ts_format: Option<String>,
/// Result of `determine_default_timezone`; `None` when the CLI asked for
/// the "local" input timezone.
pub default_timezone: Option<String>,
pub extract_prefix: Option<String>,
/// Separator used with `extract_prefix`; the `Default` impl uses "|".
pub prefix_sep: String,
pub cols_sep: Option<String>,
}
/// Output-side settings of a [`KeloraConfig`].
#[derive(Debug, Clone)]
pub struct OutputConfig {
pub format: OutputFormat,
/// Explicitly requested keys; see `get_effective_keys` for how `core`
/// extends this list.
pub keys: Vec<String>,
pub exclude_keys: Vec<String>,
/// When true, `get_effective_keys` puts the core field names first.
pub core: bool,
pub brief: bool,
/// Set to `!cli.no_wrap` by `from_cli`.
pub wrap: bool,
/// Set to `cli.expand_nested` by `from_cli`.
pub pretty: bool,
/// Colour mode; passed together with `emoji` to
/// `crate::tty::should_use_emoji_with_mode` by the message formatters.
pub color: ColorMode,
pub emoji: EmojiMode,
/// `None` when stats are disabled (`cli.no_stats`) or not requested.
pub stats: Option<crate::cli::StatsFormat>,
/// Mirrors `cli.with_stats`.
pub stats_with_events: bool,
/// `None` when metrics are disabled (`cli.no_metrics`) or not requested.
pub metrics: Option<crate::cli::MetricsFormat>,
/// Mirrors `cli.with_metrics`.
pub metrics_with_events: bool,
pub metrics_file: Option<String>,
pub drain: Option<crate::cli::DrainFormat>,
/// `cli.discover_fields`, falling back to `cli.discover_final_fields`.
pub discover_fields: Option<crate::cli::DiscoverFieldsFormat>,
/// True when `cli.discover_final_fields` is set.
pub discover_final: bool,
/// `from_cli` leaves this as `None`.
pub mark_gaps: Option<chrono::Duration>,
pub timestamp_formatting: TimestampFormatConfig,
}
/// One entry of `ProcessingConfig::stages`.
///
/// NOTE(review): the `String` payloads are presumably script source text —
/// confirm against the pipeline builder.
#[derive(Debug, Clone)]
pub enum ScriptStageType {
/// Filter stage: a script plus the include files that accompany it.
Filter {
script: String,
includes: Vec<IncludeFile>,
},
Exec(String),
Assert(String),
/// Level-based filter with include and exclude lists.
LevelFilter {
include: Vec<String>,
exclude: Vec<String>,
},
}
/// An include file attached to a `ScriptStageType::Filter` stage.
#[derive(Debug, Clone)]
pub struct IncludeFile {
pub path: String,
// Presumably the text loaded from `path` — TODO confirm where this is filled.
pub content: String,
}
/// Error reporting settings; built by `parse_error_report_config`.
#[derive(Debug, Clone)]
pub struct ErrorReportConfig {
pub style: ErrorReportStyle,
}
/// How errors are reported.
///
/// `parse_error_report_config` selects `Print` in strict mode and `Summary`
/// otherwise; it never produces `Off`.
#[derive(Debug, Clone)]
pub enum ErrorReportStyle {
Off,
Summary,
Print,
}
/// Context-line settings (counts before and after a match).
#[derive(Debug, Clone)]
pub struct ContextConfig {
/// Number of context entries before a match.
pub before_context: usize,
/// Number of context entries after a match.
pub after_context: usize,
/// Set by `ContextConfig::new` when either count is non-zero.
pub enabled: bool,
}
impl ContextConfig {
    /// Creates a context configuration; it is enabled when at least one of
    /// the two counts is non-zero.
    pub fn new(before_context: usize, after_context: usize) -> Self {
        let enabled = !(before_context == 0 && after_context == 0);
        Self {
            before_context,
            after_context,
            enabled,
        }
    }

    /// Creates a configuration with both counts at zero and `enabled` false.
    pub fn disabled() -> Self {
        Self::new(0, 0)
    }

    /// Returns true when the configuration is enabled and requests at least
    /// one context entry.
    pub fn is_active(&self) -> bool {
        if !self.enabled {
            return false;
        }
        self.before_context != 0 || self.after_context != 0
    }

    /// Returns `before + after + 1` when active, otherwise 0.
    pub fn required_window_size(&self) -> usize {
        match self.is_active() {
            true => self.before_context + self.after_context + 1,
            false => 0,
        }
    }
}
/// Processing-side settings of a [`KeloraConfig`].
#[derive(Debug, Clone)]
pub struct ProcessingConfig {
pub begin: Option<String>,
/// `from_cli` starts with an empty list.
pub stages: Vec<ScriptStageType>,
pub end: Option<String>,
pub error_report: ErrorReportConfig,
/// Level names; `from_cli` splits each CLI value on commas, trims the
/// parts and drops empty ones. Same for `exclude_levels`.
pub levels: Vec<String>,
pub exclude_levels: Vec<String>,
/// `cli.window_size`, defaulting to 0.
pub window_size: usize,
/// `from_cli` leaves this as `None`.
pub timestamp_filter: Option<TimestampFilterConfig>,
pub normalize_timestamps: bool,
pub take_limit: Option<usize>,
pub strict: bool,
/// Span settings; when set, `should_use_parallel` returns false.
pub span: Option<SpanConfig>,
/// `cli.verbose`, forced to 0 when diagnostics are suppressed or `silent`
/// is set.
pub verbose: u8,
/// True for `cli.quiet`, for `silent`, and when stats/metrics/drain/
/// discover output replaces the events.
pub quiet_events: bool,
pub suppress_diagnostics: bool,
pub silent: bool,
pub suppress_script_output: bool,
/// Derived by `from_cli`: 3 when script output is suppressed, else 1 when
/// diagnostics are suppressed or `silent` is set, else 0.
pub quiet_level: u8,
pub context: ContextConfig,
pub allow_fs_writes: bool,
}
/// Parallelism and batching settings of a [`KeloraConfig`].
#[derive(Debug, Clone)]
pub struct PerformanceConfig {
pub parallel: bool,
/// 0 means "not set"; `effective_threads` then uses `num_cpus::get()`.
pub threads: usize,
/// `None` means "not set"; `effective_batch_size` then uses 1000.
pub batch_size: Option<usize>,
/// The `Default` impl uses 200. NOTE(review): unit is presumably
/// milliseconds — confirm.
pub batch_timeout: u64,
pub no_preserve_order: bool,
}
/// How spans are delimited; chosen by `parse_span_config`.
#[derive(Debug, Clone)]
pub enum SpanMode {
/// Chosen when the span spec parses as a positive integer.
Count { events_per_span: usize },
/// Chosen when the span spec parses as a non-zero duration.
Time { duration_ms: i64 },
/// Chosen when the span spec is a valid field name.
Field { field_name: String },
/// Chosen for the idle-span option; the timeout is a non-zero duration.
Idle { timeout_ms: i64 },
}
/// Span settings produced by `parse_span_config`.
#[derive(Debug, Clone)]
pub struct SpanConfig {
pub mode: SpanMode,
/// Copied from `cli.span_close`.
pub close_script: Option<String>,
}
/// Input format after parsing the CLI format specification.
///
/// Variants with a payload keep the text that followed the format name in
/// the spec (see `parse_input_format_spec`).
#[derive(Clone, Debug, PartialEq)]
pub enum InputFormat {
Auto,
AutoPerFile,
Json,
Line,
Raw,
Logfmt,
Syslog,
Cef,
// `Csv`/`Tsv` carry an optional field spec taken from "csv:<spec>" or
// "csv <spec>" (same for tsv); a bare "csv"/"tsv" yields `None`.
Csv(Option<String>), Tsv(Option<String>), Csvnh, Tsvnh, Combined,
// `Cols` and `Regex` carry the text after "cols:" / "regex:"; `Cascade`
// holds the list of formats from a comma-separated spec.
Cols(String), Regex(String), Cascade(Vec<InputFormat>),
}
impl InputFormat {
    /// Returns the display name of the format.
    ///
    /// Simple formats use their short name; a cascade renders as
    /// `cascade(a,b,...)` using the display names of its members.
    pub fn to_display_string(&self) -> String {
        if let InputFormat::Cascade(members) = self {
            let joined = members
                .iter()
                .map(InputFormat::to_display_string)
                .collect::<Vec<String>>()
                .join(",");
            return format!("cascade({})", joined);
        }
        self.cascade_name().to_string()
    }

    /// Returns true for the `Cascade` variant.
    pub fn is_cascade(&self) -> bool {
        match self {
            InputFormat::Cascade(_) => true,
            _ => false,
        }
    }

    /// Returns the static short name of the format; payloads are ignored.
    pub fn cascade_name(&self) -> &'static str {
        match self {
            // Payload-carrying variants.
            InputFormat::Csv(_) => "csv",
            InputFormat::Tsv(_) => "tsv",
            InputFormat::Cols(_) => "cols",
            InputFormat::Regex(_) => "regex",
            InputFormat::Cascade(_) => "cascade",
            // Unit variants.
            InputFormat::Auto => "auto",
            InputFormat::AutoPerFile => "auto-per-file",
            InputFormat::Json => "json",
            InputFormat::Line => "line",
            InputFormat::Raw => "raw",
            InputFormat::Logfmt => "logfmt",
            InputFormat::Syslog => "syslog",
            InputFormat::Cef => "cef",
            InputFormat::Csvnh => "csvnh",
            InputFormat::Tsvnh => "tsvnh",
            InputFormat::Combined => "combined",
        }
    }
}
// Output format selectable on the command line (derives clap's `ValueEnum`).
// `Default` is the default variant. Plain `//` comments are used here on
// purpose: clap may surface `///` doc comments in generated help text.
#[derive(ValueEnum, Clone, Debug, Default, PartialEq)]
pub enum OutputFormat {
Json,
#[default]
Default,
Logfmt,
Inspect,
Levelmap,
Keymap,
Tailmap,
Csv,
Tsv,
Csvnh,
Tsvnh,
}
// Ordering applied to input files (derives clap's `ValueEnum`).
// NOTE(review): presumably `Cli` = order given on the command line, `Name` =
// sorted by name, `Mtime` = sorted by modification time — confirm at the
// use site. Plain `//` comments keep clap's generated help unchanged.
#[derive(ValueEnum, Clone, Debug)]
pub enum FileOrder {
Cli,
Name,
Mtime,
}
// Colour mode (derives clap's `ValueEnum`). `from_cli` selects `Never` when
// `cli.no_color` is set, `Always` when `cli.force_color` is set, and `Auto`
// otherwise. Plain `//` comments keep clap's generated help unchanged.
#[derive(ValueEnum, Clone, Debug)]
pub enum ColorMode {
Auto,
Always,
Never,
}
/// Emoji mode for message prefixes.
///
/// `from_cli` selects `Never` when `cli.no_emoji` is set, `Always` when
/// `cli.force_emoji` is set, and `Auto` otherwise.
#[derive(Clone, Debug)]
pub enum EmojiMode {
Auto,
Always,
Never,
}
/// Optional time bounds, both expressed as UTC date-times.
///
/// NOTE(review): whether the bounds are inclusive is not visible here —
/// confirm at the filter implementation.
#[derive(Debug, Clone)]
pub struct TimestampFilterConfig {
pub since: Option<chrono::DateTime<chrono::Utc>>,
pub until: Option<chrono::DateTime<chrono::Utc>>,
}
/// Timestamp formatting settings; built by `create_timestamp_format_config`.
#[derive(Debug, Clone, Default)]
pub struct TimestampFormatConfig {
/// Holds the trimmed `cli.ts_field` when `auto_format_all` is set and the
/// field name is non-empty; otherwise empty.
pub format_fields: Vec<String>,
/// True when either the local or the UTC formatting flag is set.
pub auto_format_all: bool,
/// Mirrors `cli.format_timestamps_utc`.
pub format_as_utc: bool,
/// Copied from `cli.ts_format`.
pub parse_format_hint: Option<String>,
/// The default timezone passed in by `from_cli`.
pub parse_timezone_hint: Option<String>,
}
/// Multiline settings: a strategy plus the join mode.
///
/// `MultilineConfig::parse` builds one from a "strategy[:option...]" string.
#[derive(Debug, Clone)]
pub struct MultilineConfig {
pub strategy: MultilineStrategy,
/// `parse` and `Default` both use `MultilineJoin::Space`.
pub join: MultilineJoin,
}
/// Multiline strategy, selected by the first segment of the string given to
/// `MultilineConfig::parse` ("timestamp", "indent", "regex" or "all").
#[derive(Debug, Clone)]
pub enum MultilineStrategy {
/// "timestamp", with the optional `format=` value.
Timestamp { chrono_format: Option<String> },
Indent,
/// "regex": `start` comes from the required `match=` option, `end` from
/// the optional `end=` option.
Regex { start: String, end: Option<String> },
All,
}
// Join mode for multiline handling (derives clap's `ValueEnum`); `Space` is
// the default variant.
// NOTE(review): presumably the separator placed between joined lines —
// confirm at the use site. Plain `//` comments keep clap's help unchanged.
#[derive(ValueEnum, Clone, Copy, Debug, Default, PartialEq)]
pub enum MultilineJoin {
#[default]
Space,
Newline,
Empty,
}
/// Section selection settings: optional start and end markers plus a limit.
#[derive(Debug, Clone)]
pub struct SectionConfig {
pub start: Option<SectionStart>,
pub end: Option<SectionEnd>,
// NOTE(review): signed on purpose? Possibly a negative value means
// "unlimited" — confirm at the use site.
pub max_sections: i64,
}
/// Start marker of a section, identified by a regex.
///
/// NOTE(review): presumably `From` includes the matching line and `After`
/// starts with the following one — confirm at the use site.
#[derive(Debug, Clone)]
pub enum SectionStart {
From(regex::Regex),
After(regex::Regex),
}
/// End marker of a section, identified by a regex.
///
/// NOTE(review): presumably `Before` excludes the matching line and
/// `Through` includes it — confirm at the use site.
#[derive(Debug, Clone)]
pub enum SectionEnd {
Before(regex::Regex),
Through(regex::Regex),
}
impl MultilineConfig {
    /// Parses a multiline specification of the form `strategy[:option...]`.
    ///
    /// Supported strategies:
    /// * `timestamp[:format=FMT]`
    /// * `indent` (no options)
    /// * `regex:match=REGEX[:end=REGEX]`
    /// * `all` (no options)
    ///
    /// Option values may contain `:`. A segment that does not start with a
    /// known option key is treated as the continuation of the previous
    /// option's value, so `timestamp:format=%H:%M:%S` yields the format
    /// `%H:%M:%S` and `regex:match=^\d+:\d+` keeps its full pattern. The
    /// first segment after the strategy name must still be a known option.
    ///
    /// The resulting `join` mode is always `MultilineJoin::Space`.
    ///
    /// # Errors
    ///
    /// Returns a message when the input is blank, the strategy is unknown,
    /// an option is unknown or repeated, options are given to `indent`/`all`,
    /// or `regex` lacks `match=`.
    pub fn parse(value: &str) -> Result<Self, String> {
        /// Splits `rest` on ':' into `(key index, value)` pairs.
        ///
        /// A segment starting with one of `keys` opens a new option; any
        /// other segment is appended (with its ':') to the previous value.
        /// A leading segment matching no key is returned as the error.
        fn split_options<'a>(
            rest: Option<&'a str>,
            keys: &[&str],
        ) -> Result<Vec<(usize, String)>, &'a str> {
            let mut options: Vec<(usize, String)> = Vec::new();
            let rest = match rest {
                Some(rest) => rest,
                None => return Ok(options),
            };
            for segment in rest.split(':') {
                let opened = keys
                    .iter()
                    .enumerate()
                    .find_map(|(idx, key)| segment.strip_prefix(*key).map(|v| (idx, v)));
                if let Some((idx, option_value)) = opened {
                    options.push((idx, option_value.to_string()));
                } else if let Some((_, current)) = options.last_mut() {
                    // Re-attach a ':' that belonged to the option value.
                    current.push(':');
                    current.push_str(segment);
                } else {
                    return Err(segment);
                }
            }
            Ok(options)
        }

        if value.trim().is_empty() {
            return Err("Empty multiline configuration".to_string());
        }

        // `rest` is `Some` whenever a ':' follows the strategy name, even if
        // nothing comes after it.
        let (strategy_name, rest) = match value.split_once(':') {
            Some((name, rest)) => (name, Some(rest)),
            None => (value, None),
        };

        let strategy = match strategy_name {
            "timestamp" => {
                let options = split_options(rest, &["format="]).map_err(|segment| {
                    format!(
                        "Unknown timestamp option: {} (supported: format=...)",
                        segment
                    )
                })?;
                let mut chrono_format: Option<String> = None;
                for (_, format) in options {
                    if chrono_format.replace(format).is_some() {
                        return Err("timestamp:format specified more than once".to_string());
                    }
                }
                MultilineStrategy::Timestamp { chrono_format }
            }
            "indent" => {
                if rest.is_some() {
                    return Err("indent does not accept options".to_string());
                }
                MultilineStrategy::Indent
            }
            "regex" => {
                let options = split_options(rest, &["match=", "end="]).map_err(|segment| {
                    format!(
                        "Unknown regex option: {} (supported: match=..., end=...)",
                        segment
                    )
                })?;
                let mut start_pattern: Option<String> = None;
                let mut end_pattern: Option<String> = None;
                for (key, pattern) in options {
                    // Key index 0 is "match=", index 1 is "end=".
                    let (slot, label) = if key == 0 {
                        (&mut start_pattern, "match")
                    } else {
                        (&mut end_pattern, "end")
                    };
                    if slot.replace(pattern).is_some() {
                        return Err(format!("regex:{} specified more than once", label));
                    }
                }
                let start = start_pattern.ok_or_else(|| {
                    "regex strategy requires match=REGEX (e.g. regex:match=^PID=)".to_string()
                })?;
                MultilineStrategy::Regex {
                    start,
                    end: end_pattern,
                }
            }
            "all" => {
                if rest.is_some() {
                    return Err("all does not accept options".to_string());
                }
                MultilineStrategy::All
            }
            other => {
                return Err(format!(
                    "Unknown multiline strategy: {} (supported: timestamp, indent, regex, all)",
                    other
                ));
            }
        };

        Ok(MultilineConfig {
            strategy,
            join: MultilineJoin::Space,
        })
    }
}
impl Default for MultilineConfig {
fn default() -> Self {
Self {
strategy: MultilineStrategy::Timestamp {
chrono_format: None,
},
join: MultilineJoin::Space,
}
}
}
impl KeloraConfig {
    /// Returns the timestamp, level and message field names (in that order)
    /// as owned strings.
    pub fn get_core_field_names() -> Vec<String> {
        crate::event::TIMESTAMP_FIELD_NAMES
            .iter()
            .chain(crate::event::LEVEL_FIELD_NAMES.iter())
            .chain(crate::event::MESSAGE_FIELD_NAMES.iter())
            .map(|name| name.to_string())
            .collect()
    }

    /// Prefixes an error message with the warning emoji or `kelora: `.
    pub fn format_error_message(&self, message: &str) -> String {
        let prefix =
            match crate::tty::should_use_emoji_with_mode(&self.output.emoji, &self.output.color) {
                true => "⚠️ ",
                false => "kelora: ",
            };
        format!("{}{}", prefix, message)
    }

    /// Prefixes an informational message with an emoji or `kelora: `.
    pub fn format_info_message(&self, message: &str) -> String {
        let prefix =
            match crate::tty::should_use_emoji_with_mode(&self.output.emoji, &self.output.color) {
                true => "🔹 ",
                false => "kelora: ",
            };
        format!("{}{}", prefix, message)
    }

    /// Prefixes a hint with an emoji or `kelora hint: `.
    pub fn format_hint_message(&self, message: &str) -> String {
        let prefix =
            match crate::tty::should_use_emoji_with_mode(&self.output.emoji, &self.output.color) {
                true => "💡 ",
                false => "kelora hint: ",
            };
        format!("{}{}", prefix, message)
    }

    /// Prefixes a warning with an emoji or `kelora warning: `.
    pub fn format_warning_message(&self, message: &str) -> String {
        let prefix =
            match crate::tty::should_use_emoji_with_mode(&self.output.emoji, &self.output.color) {
                true => "🔸 ",
                false => "kelora warning: ",
            };
        format!("{}{}", prefix, message)
    }

    /// Formats a stats block, preceded by a blank line and, when
    /// `with_header` is true, by a header line.
    pub fn format_stats_message(&self, message: &str, with_header: bool) -> String {
        let use_emoji =
            crate::tty::should_use_emoji_with_mode(&self.output.emoji, &self.output.color);
        if !with_header {
            return format!("\n{}", message);
        }
        let header = if use_emoji {
            "📈 Stats:"
        } else {
            "kelora: Stats:"
        };
        format!("\n{}\n{}", header, message)
    }

    /// Formats a metrics block, preceded by a blank line and, when
    /// `with_header` is true, by a header line.
    pub fn format_metrics_message(&self, message: &str, with_header: bool) -> String {
        let use_emoji =
            crate::tty::should_use_emoji_with_mode(&self.output.emoji, &self.output.color);
        if !with_header {
            return format!("\n{}", message);
        }
        let header = if use_emoji {
            "📊 Tracked metrics:"
        } else {
            "kelora: Tracked metrics:"
        };
        format!("\n{}\n{}", header, message)
    }

    /// Writes a summary of the config-file expansion to `stderr`.
    ///
    /// Nothing is written when diagnostics are suppressed or `silent` is
    /// set. The loaded path and the applied defaults are shown only when
    /// verbose output is on or the config path was given explicitly;
    /// expanded aliases are always shown. Write errors are ignored.
    pub fn display_config_expansion(
        info: &ConfigExpansionInfo,
        config: &KeloraConfig,
        stderr: &mut crate::platform::SafeStderr,
    ) {
        let processing = &config.processing;
        if processing.suppress_diagnostics || processing.silent {
            return;
        }

        let verbose_details = processing.verbose > 0 || info.explicit_config_path;
        let mut report: Vec<String> = Vec::new();

        if verbose_details {
            if let Some(path) = &info.loaded_config_path {
                report.push(config.format_info_message(&format!("Config: {}", path.display())));
            }
            if let Some(defaults) = &info.applied_defaults {
                report.push(config.format_info_message(&format!(" Defaults: {}", defaults)));
            }
        }

        // Aliases are reported whenever any were expanded, verbose or not.
        for (alias_name, expansion) in &info.expanded_aliases {
            report.push(
                config.format_info_message(&format!(" Alias: -a {} → {}", alias_name, expansion)),
            );
        }

        for line in report {
            stderr.writeln(&line).unwrap_or(());
        }
    }
}
/// Prefixes an error message for stderr: the warning emoji when
/// `crate::tty::should_use_emoji_for_stderr()` says so, `kelora: ` otherwise.
pub fn format_error_message_auto(message: &str) -> String {
    let prefix = match crate::tty::should_use_emoji_for_stderr() {
        true => "⚠️ ",
        false => "kelora: ",
    };
    format!("{}{}", prefix, message)
}
/// Prefixes a warning for stderr: an emoji when
/// `crate::tty::should_use_emoji_for_stderr()` says so, `kelora warning: `
/// otherwise.
pub fn format_warning_message_auto(message: &str) -> String {
    let prefix = match crate::tty::should_use_emoji_for_stderr() {
        true => "🔸 ",
        false => "kelora warning: ",
    };
    format!("{}{}", prefix, message)
}
/// Prefixes a hint for stderr: an emoji when
/// `crate::tty::should_use_emoji_for_stderr()` says so, `kelora hint: `
/// otherwise.
pub fn format_hint_message_auto(message: &str) -> String {
    let prefix = match crate::tty::should_use_emoji_for_stderr() {
        true => "💡 ",
        false => "kelora hint: ",
    };
    format!("{}{}", prefix, message)
}
/// Builds the message for a failed attempt to open an input file.
///
/// When `path` contains a glob metacharacter (`*`, `?` or `[`) and `err`
/// reads like a missing-file error, a note about shell glob expansion is
/// appended.
pub fn format_input_open_error(path: &str, err: &str) -> String {
    const GLOB_NOTE: &str = ". Shell glob patterns must be expanded by the shell; remove the quotes or use interactive mode for glob expansion";

    let base = format!("Failed to open file '{}': {}", path, err);

    let looks_like_glob = path.chars().any(|c| matches!(c, '*' | '?' | '['));
    if !looks_like_glob {
        return base;
    }

    let missing_file = ["No such file", "not found", "cannot find the path"]
        .iter()
        .any(|needle| err.contains(*needle));
    if missing_file {
        base + GLOB_NOTE
    } else {
        base
    }
}
/// Formats a verbose error line without a `KeloraConfig`.
///
/// Delegates to `format_verbose_error_with_config` with `config` set to
/// `None`, so the emoji decision falls back to the stderr-based check.
pub fn format_verbose_error(line_num: Option<usize>, error_type: &str, message: &str) -> String {
format_verbose_error_with_config(line_num, error_type, message, None)
}
/// Formats a verbose error line as `<prefix>[line N: ]<type> - <message>`.
///
/// The prefix is the warning emoji or `kelora: `. The choice uses the
/// emoji/colour modes of `config` when given, and the stderr-based check
/// otherwise.
pub fn format_verbose_error_with_config(
    line_num: Option<usize>,
    error_type: &str,
    message: &str,
    config: Option<&KeloraConfig>,
) -> String {
    let use_emoji = match config {
        Some(cfg) => crate::tty::should_use_emoji_with_mode(&cfg.output.emoji, &cfg.output.color),
        None => crate::tty::should_use_emoji_for_stderr(),
    };
    let prefix = match use_emoji {
        true => "⚠️ ",
        false => "kelora: ",
    };
    match line_num {
        Some(line) => format!("{}line {}: {} - {}", prefix, line, error_type, message),
        None => format!("{}{} - {}", prefix, error_type, message),
    }
}
/// Prints a verbose error line to stderr.
///
/// Nothing is printed when a config is given and it is `silent` or has
/// diagnostics suppressed.
pub fn print_verbose_error_to_stderr(
    line_num: Option<usize>,
    error_type: &str,
    message: &str,
    config: Option<&KeloraConfig>,
) {
    let muted = config.map_or(false, |cfg| {
        cfg.processing.silent || cfg.processing.suppress_diagnostics
    });
    if muted {
        return;
    }
    eprintln!(
        "{}",
        format_verbose_error_with_config(line_num, error_type, message, config)
    );
}
/// Prints a verbose error line to stderr, using a pipeline config.
///
/// Nothing is printed when a config is given and it is `silent` or has
/// diagnostics suppressed.
pub fn print_verbose_error_to_stderr_pipeline(
    line_num: Option<usize>,
    error_type: &str,
    message: &str,
    config: Option<&crate::pipeline::PipelineConfig>,
) {
    let muted = config.map_or(false, |cfg| cfg.silent || cfg.suppress_diagnostics);
    if muted {
        return;
    }
    eprintln!(
        "{}",
        format_verbose_error_with_pipeline_config(line_num, error_type, message, config)
    );
}
/// Formats a verbose error line as `<prefix>[line N: ]<type> - <message>`.
///
/// The prefix is the warning emoji or `kelora: `. The choice uses the
/// emoji/colour modes of the pipeline config when given, and the
/// stderr-based check otherwise.
pub fn format_verbose_error_with_pipeline_config(
    line_num: Option<usize>,
    error_type: &str,
    message: &str,
    config: Option<&crate::pipeline::PipelineConfig>,
) -> String {
    let use_emoji = match config {
        Some(cfg) => crate::tty::should_use_emoji_with_mode(&cfg.emoji_mode, &cfg.color_mode),
        None => crate::tty::should_use_emoji_for_stderr(),
    };
    let prefix = match use_emoji {
        true => "⚠️ ",
        false => "kelora: ",
    };
    match line_num {
        Some(line) => format!("{}line {}: {} - {}", prefix, line, error_type, message),
        None => format!("{}{} - {}", prefix, error_type, message),
    }
}
/// Renders an input line for inclusion in an error message.
///
/// * A line containing any control character other than `\n` is returned in
///   its `Debug` form (quoted and escaped).
/// * Otherwise, a line ending in `\n` has all trailing whitespace removed.
/// * Any other line is returned unchanged.
pub fn format_error_line(line: &str) -> String {
    let has_control = line
        .chars()
        .filter(|&c| c != '\n')
        .any(char::is_control);
    if has_control {
        return format!("{:?}", line);
    }
    match line.strip_suffix('\n') {
        Some(_) => line.trim_end().to_string(),
        None => line.to_string(),
    }
}
impl OutputConfig {
    /// Returns the keys to output.
    ///
    /// Without `core`, this is a copy of `keys`. With `core`, the core field
    /// names come first, followed by each entry of `keys` that is not
    /// already in the list (so duplicates are dropped).
    pub fn get_effective_keys(&self) -> Vec<String> {
        if !self.core {
            return self.keys.clone();
        }
        let mut effective = KeloraConfig::get_core_field_names();
        for candidate in &self.keys {
            let already_listed = effective.iter().any(|existing| existing == candidate);
            if !already_listed {
                effective.push(candidate.clone());
            }
        }
        effective
    }
}
impl KeloraConfig {
    /// Builds the configuration from parsed command-line arguments.
    ///
    /// # Errors
    ///
    /// Returns the first error from parsing the input format (skipped when
    /// `cli.json_input` is set), the span options, or the context options —
    /// checked in that order.
    pub fn from_cli(cli: &crate::Cli) -> anyhow::Result<Self> {
        /// Splits every value on commas, trims the parts and drops empty ones.
        fn split_level_lists(values: &[String]) -> Vec<String> {
            values
                .iter()
                .flat_map(|value| value.split(','))
                .map(str::trim)
                .filter(|part| !part.is_empty())
                .map(String::from)
                .collect()
        }

        // The "no_*" flag wins over the "force_*" flag for both modes.
        let color = match (cli.no_color, cli.force_color) {
            (true, _) => ColorMode::Never,
            (false, true) => ColorMode::Always,
            (false, false) => ColorMode::Auto,
        };
        let emoji = match (cli.no_emoji, cli.force_emoji) {
            (true, _) => EmojiMode::Never,
            (false, true) => EmojiMode::Always,
            (false, false) => EmojiMode::Auto,
        };

        let default_timezone = determine_default_timezone(cli);

        let levels = split_level_lists(&cli.levels);
        let exclude_levels = split_level_lists(&cli.exclude_levels);

        // Stats: disabled outright, explicit format, or table via with_stats.
        let stats = if cli.no_stats {
            None
        } else {
            cli.stats.clone().or(if cli.with_stats {
                Some(crate::cli::StatsFormat::Table)
            } else {
                None
            })
        };
        let stats_with_events = cli.with_stats;

        // Metrics: disabled outright, explicit format, or full via with_metrics.
        let metrics = if cli.no_metrics {
            None
        } else {
            cli.metrics.clone().or(if cli.with_metrics {
                Some(crate::cli::MetricsFormat::Full)
            } else {
                None
            })
        };
        let metrics_with_events = cli.with_metrics;

        let discover_fields = cli
            .discover_fields
            .clone()
            .or_else(|| cli.discover_final_fields.clone());

        // Output modes that replace the regular event output.
        let stats_only = stats.is_some() && !stats_with_events;
        let metrics_only = metrics.is_some() && !metrics_with_events;
        let drain_only = cli.drain.is_some();
        let discover_only = discover_fields.is_some();
        let summary_only = metrics_only || drain_only || discover_only;

        let silent = cli.silent && !cli.no_silent;
        // An explicit "diagnostics"/"script_output" flag overrides its "no_*"
        // counterpart; summary-only modes then force suppression regardless.
        let suppress_diagnostics = (!cli.diagnostics && cli.no_diagnostics) || summary_only;
        let suppress_script_output =
            (!cli.script_output && cli.no_script_output) || stats_only || summary_only;
        let quiet_events = cli.quiet || stats_only || summary_only || silent;

        let output_format = match cli.json_output {
            true => OutputFormat::Json,
            false => cli.output_format.clone().into(),
        };

        let quiet_level = match (suppress_script_output, suppress_diagnostics || silent) {
            (true, _) => 3,
            (false, true) => 1,
            (false, false) => 0,
        };
        let verbose = match suppress_diagnostics || silent {
            true => 0,
            false => cli.verbose,
        };

        // Fallible pieces, evaluated in the same order as the struct fields
        // they feed: input format, then span, then context.
        let input_format = match cli.json_input {
            true => InputFormat::Json,
            false => parse_input_format_from_cli(cli)?,
        };
        let span = parse_span_config(cli)?;
        let context = create_context_config(cli)?;

        let timestamp_formatting = create_timestamp_format_config(cli, default_timezone.clone());

        let input = InputConfig {
            files: cli.files.clone(),
            no_input: cli.no_input,
            format: input_format,
            file_order: cli.file_order.clone().into(),
            merge_ts: cli.merge_ts,
            skip_lines: cli.skip_lines.unwrap_or(0),
            head_lines: cli.head,
            section: None,
            ignore_lines: None,
            keep_lines: None,
            multiline: None,
            ts_field: cli.ts_field.clone(),
            ts_format: cli.ts_format.clone(),
            default_timezone,
            extract_prefix: cli.extract_prefix.clone(),
            prefix_sep: cli.prefix_sep.clone(),
            cols_sep: cli.cols_sep.clone(),
        };

        let output = OutputConfig {
            format: output_format,
            keys: cli.keys.clone(),
            exclude_keys: cli.exclude_keys.clone(),
            core: cli.core,
            brief: cli.brief,
            wrap: !cli.no_wrap,
            pretty: cli.expand_nested,
            color,
            emoji,
            stats,
            stats_with_events,
            metrics,
            metrics_with_events,
            metrics_file: cli.metrics_file.clone(),
            drain: cli.drain.clone(),
            discover_fields,
            discover_final: cli.discover_final_fields.is_some(),
            mark_gaps: None,
            timestamp_formatting,
        };

        let processing = ProcessingConfig {
            begin: cli.begin.clone(),
            stages: Vec::new(),
            end: cli.end.clone(),
            error_report: parse_error_report_config(cli),
            levels,
            exclude_levels,
            span,
            window_size: cli.window_size.unwrap_or(0),
            timestamp_filter: None,
            normalize_timestamps: cli.normalize_ts,
            take_limit: cli.take,
            strict: cli.strict,
            verbose,
            quiet_events,
            suppress_diagnostics,
            silent,
            suppress_script_output,
            quiet_level,
            context,
            allow_fs_writes: cli.allow_fs_writes,
        };

        let performance = PerformanceConfig {
            parallel: cli.parallel,
            threads: cli.threads,
            batch_size: cli.batch_size,
            batch_timeout: cli.batch_timeout,
            no_preserve_order: cli.no_preserve_order,
        };

        Ok(Self {
            input,
            output,
            processing,
            performance,
        })
    }

    /// Returns true when parallel processing was requested (flag, thread
    /// count or batch size) and no span configuration is present.
    pub fn should_use_parallel(&self) -> bool {
        let perf = &self.performance;
        let requested = perf.parallel || perf.threads > 0 || perf.batch_size.is_some();
        self.processing.span.is_none() && requested
    }

    /// Returns the configured batch size, or 1000 when none is set.
    pub fn effective_batch_size(&self) -> usize {
        match self.performance.batch_size {
            Some(size) => size,
            None => 1000,
        }
    }

    /// Returns the configured thread count, or `num_cpus::get()` when it is 0.
    pub fn effective_threads(&self) -> usize {
        match self.performance.threads {
            0 => num_cpus::get(),
            configured => configured,
        }
    }
}
impl Default for KeloraConfig {
fn default() -> Self {
Self {
input: InputConfig {
files: Vec::new(),
no_input: false,
format: InputFormat::Auto,
file_order: FileOrder::Cli,
merge_ts: false,
skip_lines: 0,
head_lines: None,
section: None,
ignore_lines: None,
keep_lines: None,
multiline: None,
ts_field: None,
ts_format: None,
default_timezone: None,
extract_prefix: None,
prefix_sep: "|".to_string(),
cols_sep: None,
},
output: OutputConfig {
format: OutputFormat::Default,
keys: Vec::new(),
exclude_keys: Vec::new(),
core: false,
brief: false,
wrap: true, pretty: false,
color: ColorMode::Auto,
emoji: EmojiMode::Auto,
stats: None,
stats_with_events: false,
metrics: None,
metrics_with_events: false,
metrics_file: None,
drain: None,
discover_fields: None,
discover_final: false,
mark_gaps: None,
timestamp_formatting: TimestampFormatConfig::default(),
},
processing: ProcessingConfig {
begin: None,
stages: Vec::new(),
end: None,
error_report: ErrorReportConfig {
style: ErrorReportStyle::Summary,
},
span: None,
levels: Vec::new(),
exclude_levels: Vec::new(),
window_size: 0,
timestamp_filter: None,
normalize_timestamps: false,
take_limit: None,
strict: false,
verbose: 0,
quiet_events: false,
suppress_diagnostics: false,
silent: false,
suppress_script_output: false,
quiet_level: 0,
context: ContextConfig::disabled(),
allow_fs_writes: false,
},
performance: PerformanceConfig {
parallel: false,
threads: 0,
batch_size: None,
batch_timeout: 200,
no_preserve_order: false,
},
}
}
}
/// Parses the input format from `cli.format`.
///
/// Thin wrapper around `parse_input_format_spec`; errors are passed through.
fn parse_input_format_from_cli(cli: &crate::Cli) -> anyhow::Result<InputFormat> {
parse_input_format_spec(&cli.format)
}
/// Parses an input format specification.
///
/// Accepted forms:
/// * a comma-separated cascade list (handled by `parse_cascade_spec`), unless
///   the spec starts with `regex:`, `cols:`, `csv:`, `csv `, `tsv:` or `tsv `;
/// * `regex:<pattern>` and `cols:<spec>` (both must be non-blank);
/// * `csv:<fields>` / `csv <fields>` and the `tsv` equivalents (fields are
///   trimmed);
/// * a plain format name, matched case-insensitively.
///
/// # Errors
///
/// Returns an error for a blank regex/cols payload, an unknown format name,
/// or an invalid cascade list.
pub(crate) fn parse_input_format_spec(spec: &str) -> anyhow::Result<InputFormat> {
    // Specs with these prefixes may legitimately contain commas.
    const PAYLOAD_PREFIXES: [&str; 6] = ["regex:", "cols:", "csv:", "csv ", "tsv:", "tsv "];

    let carries_payload = PAYLOAD_PREFIXES
        .iter()
        .any(|prefix| spec.starts_with(*prefix));
    if spec.contains(',') && !carries_payload {
        return parse_cascade_spec(spec);
    }

    if let Some(pattern) = spec.strip_prefix("regex:") {
        return if pattern.trim().is_empty() {
            Err(anyhow::anyhow!(
                "regex format requires a pattern, e.g., 'regex:(?P<field>\\d+)'"
            ))
        } else {
            Ok(InputFormat::Regex(pattern.to_string()))
        };
    }

    if let Some(columns) = spec.strip_prefix("cols:") {
        return if columns.trim().is_empty() {
            Err(anyhow::anyhow!(
                "cols format requires a specification, e.g., 'cols:ts level *msg'"
            ))
        } else {
            Ok(InputFormat::Cols(columns.to_string()))
        };
    }

    // Extracts the trimmed text after "<name>:" or "<name> ", if present.
    let field_spec_for = |name: &str| -> Option<String> {
        spec.strip_prefix(name)
            .and_then(|rest| rest.strip_prefix(':').or_else(|| rest.strip_prefix(' ')))
            .map(|fields| fields.trim().to_string())
    };
    if let Some(fields) = field_spec_for("csv") {
        return Ok(InputFormat::Csv(Some(fields)));
    }
    if let Some(fields) = field_spec_for("tsv") {
        return Ok(InputFormat::Tsv(Some(fields)));
    }

    let lowered = spec.to_lowercase();
    let format = match lowered.as_str() {
        "auto" => InputFormat::Auto,
        "auto-per-file" => InputFormat::AutoPerFile,
        "json" => InputFormat::Json,
        "line" => InputFormat::Line,
        "raw" => InputFormat::Raw,
        "logfmt" => InputFormat::Logfmt,
        "syslog" => InputFormat::Syslog,
        "cef" => InputFormat::Cef,
        "csv" => InputFormat::Csv(None),
        "tsv" => InputFormat::Tsv(None),
        "csvnh" => InputFormat::Csvnh,
        "tsvnh" => InputFormat::Tsvnh,
        "combined" => InputFormat::Combined,
        _ => {
            return Err(anyhow::anyhow!("Unknown input format: '{}'. Supported formats: json, line, csv, syslog, cef, logfmt, raw, tsv, csvnh, tsvnh, combined, auto, auto-per-file, cols:<spec>, and regex:<pattern>", spec));
        }
    };
    Ok(format)
}
/// Parses a comma-separated cascade list such as `json,logfmt,line`.
///
/// Entries are trimmed and matched case-insensitively. Only `json`, `line`,
/// `raw`, `logfmt`, `syslog`, `cef` and `combined` are allowed; each may
/// appear once, and `line`/`raw` may only appear in the last position.
///
/// # Errors
///
/// Returns an error for fewer than two entries, an empty entry, a
/// disallowed or unknown format, a duplicate, or a misplaced `line`/`raw`.
fn parse_cascade_spec(spec: &str) -> anyhow::Result<InputFormat> {
    let entries: Vec<&str> = spec.split(',').map(str::trim).collect();
    if entries.len() < 2 {
        return Err(anyhow::anyhow!(
            "cascade format requires at least two formats, e.g., 'json,line'"
        ));
    }

    let mut formats: Vec<InputFormat> = Vec::with_capacity(entries.len());
    for entry in entries {
        if entry.is_empty() {
            return Err(anyhow::anyhow!(
                "cascade format contains an empty entry in '{}'",
                spec
            ));
        }

        let lowered = entry.to_lowercase();
        let format = match lowered.as_str() {
            "json" => InputFormat::Json,
            "line" => InputFormat::Line,
            "raw" => InputFormat::Raw,
            "logfmt" => InputFormat::Logfmt,
            "syslog" => InputFormat::Syslog,
            "cef" => InputFormat::Cef,
            "combined" => InputFormat::Combined,
            "auto" => {
                return Err(anyhow::anyhow!(
                    "'auto' is not allowed inside a cascade list; list the formats explicitly"
                ));
            }
            "auto-per-file" => {
                return Err(anyhow::anyhow!(
                    "'auto-per-file' is not allowed inside a cascade list; list the formats explicitly"
                ));
            }
            "csv" | "tsv" | "csvnh" | "tsvnh" => {
                return Err(anyhow::anyhow!(
                    "'{}' is not allowed inside a cascade list (schema-based formats cannot be mixed per-line)",
                    entry
                ));
            }
            "cols" | "regex" | "cascade" => {
                return Err(anyhow::anyhow!(
                    "'{}' is not allowed inside a cascade list",
                    entry
                ));
            }
            _ => {
                return Err(anyhow::anyhow!(
                    "Unknown format '{}' in cascade list. Allowed: json, line, raw, logfmt, syslog, cef, combined",
                    entry
                ));
            }
        };

        // Only unit variants reach this point, so equality means "same name".
        if formats.contains(&format) {
            return Err(anyhow::anyhow!(
                "cascade list contains duplicate format '{}'",
                format.cascade_name()
            ));
        }
        formats.push(format);
    }

    // `line` and `raw` are only accepted in the final position.
    let misplaced = formats.split_last().and_then(|(_, leading)| {
        leading
            .iter()
            .find(|f| matches!(**f, InputFormat::Line | InputFormat::Raw))
    });
    if let Some(format) = misplaced {
        return Err(anyhow::anyhow!(
            "'{}' must be the last format in a cascade list; later formats would never run",
            format.cascade_name()
        ));
    }

    Ok(InputFormat::Cascade(formats))
}
fn create_timestamp_format_config(
cli: &crate::Cli,
default_timezone: Option<String>,
) -> TimestampFormatConfig {
let auto_format_all = cli.format_timestamps_local || cli.format_timestamps_utc;
let mut format_fields = Vec::new();
if auto_format_all {
if let Some(ref ts_field) = cli.ts_field {
let trimmed = ts_field.trim();
if !trimmed.is_empty() {
format_fields.push(trimmed.to_string());
}
}
}
let format_as_utc = cli.format_timestamps_utc;
TimestampFormatConfig {
format_fields,
auto_format_all,
format_as_utc,
parse_format_hint: cli.ts_format.clone(),
parse_timezone_hint: default_timezone,
}
}
/// Chooses the error report style: `Print` in strict mode, `Summary`
/// otherwise.
fn parse_error_report_config(cli: &crate::Cli) -> ErrorReportConfig {
    ErrorReportConfig {
        style: match cli.strict {
            true => ErrorReportStyle::Print,
            false => ErrorReportStyle::Summary,
        },
    }
}
/// Builds the context configuration from the CLI options.
///
/// `cli.context` sets both counts; otherwise the before/after options are
/// used individually, each defaulting to 0.
///
/// # Errors
///
/// Returns an error when context is requested but no filter, level
/// selection, `since` or `until` is active.
fn create_context_config(cli: &crate::Cli) -> anyhow::Result<ContextConfig> {
    let (before_context, after_context) = match cli.context {
        Some(both) => (both, both),
        None => (
            cli.before_context.unwrap_or(0),
            cli.after_context.unwrap_or(0),
        ),
    };

    let wants_context = before_context > 0 || after_context > 0;
    let no_filtering = cli.filters.is_empty()
        && cli.levels.is_empty()
        && cli.exclude_levels.is_empty()
        && cli.since.is_none()
        && cli.until.is_none();

    if wants_context && no_filtering {
        return Err(anyhow::anyhow!(
            "Context options (-A, -B, -C) require active filtering because context is shown around matches. Add --filter, --levels, --since, or --until."
        ));
    }

    Ok(ContextConfig::new(before_context, after_context))
}
fn determine_default_timezone(cli: &crate::Cli) -> Option<String> {
if let Some(ref input_tz) = cli.input_tz {
if input_tz == "local" {
return None; } else {
return Some(input_tz.clone());
}
}
if let Ok(tz) = std::env::var("TZ") {
if !tz.is_empty() {
return Some(tz);
}
}
Some("UTC".to_string())
}
fn parse_span_config(cli: &crate::Cli) -> anyhow::Result<Option<SpanConfig>> {
let span_spec = cli
.span
.as_ref()
.map(|s| s.trim())
.filter(|s| !s.is_empty());
let idle_spec = cli
.span_idle
.as_ref()
.map(|s| s.trim())
.filter(|s| !s.is_empty());
if span_spec.is_none() && idle_spec.is_none() {
if cli.span_close.is_some() {
return Err(anyhow::anyhow!(
"--span-close requires --span or --span-idle. Use --span N for fixed-size spans or --span-idle 30s for inactivity-based spans."
));
}
return Ok(None);
}
if span_spec.is_some() && idle_spec.is_some() {
return Err(anyhow::anyhow!(
"--span and --span-idle cannot be used together. Use --span N for fixed-size spans or --span-idle 30s for inactivity-based spans."
));
}
if let Some(spec) = idle_spec {
let duration = humantime::parse_duration(spec).map_err(|e| {
anyhow::anyhow!(
"Invalid --span-idle duration '{}': {}. Use formats like 30s, 5m, 1h.",
spec,
e
)
})?;
if duration.is_zero() {
return Err(anyhow::anyhow!(
"--span-idle duration must be greater than zero"
));
}
let timeout_ms: i64 = duration
.as_millis()
.try_into()
.map_err(|_| anyhow::anyhow!("--span-idle duration is too large"))?;
return Ok(Some(SpanConfig {
mode: SpanMode::Idle { timeout_ms },
close_script: cli.span_close.clone(),
}));
}
let span_spec = span_spec.expect("span presence checked above");
if let Ok(count) = span_spec.parse::<usize>() {
if count == 0 {
return Err(anyhow::anyhow!(
"--span <N> must be a positive integer greater than zero"
));
}
return Ok(Some(SpanConfig {
mode: SpanMode::Count {
events_per_span: count,
},
close_script: cli.span_close.clone(),
}));
}
if let Ok(duration) = humantime::parse_duration(span_spec) {
if duration.is_zero() {
return Err(anyhow::anyhow!("--span duration must be greater than zero"));
}
let duration_ms: i64 = duration
.as_millis()
.try_into()
.map_err(|_| anyhow::anyhow!("--span duration is too large"))?;
return Ok(Some(SpanConfig {
mode: SpanMode::Time { duration_ms },
close_script: cli.span_close.clone(),
}));
}
if !is_valid_field_name(span_spec) {
return Err(anyhow::anyhow!(
"Invalid --span field name '{}': must start with a letter and contain only letters, digits, or underscores",
span_spec
));
}
Ok(Some(SpanConfig {
mode: SpanMode::Field {
field_name: span_spec.to_string(),
},
close_script: cli.span_close.clone(),
}))
}
/// Returns true when `name` starts with an ASCII letter and continues with
/// ASCII letters, digits or underscores only. The empty string is invalid.
fn is_valid_field_name(name: &str) -> bool {
    let mut rest = name.chars();
    let starts_with_letter = rest
        .next()
        .map_or(false, |first| first.is_ascii_alphabetic());
    starts_with_letter && rest.all(|c| c == '_' || c.is_ascii_alphanumeric())
}
impl From<crate::InputFormat> for InputFormat {
fn from(format: crate::InputFormat) -> Self {
match format {
crate::InputFormat::Auto => InputFormat::Auto,
crate::InputFormat::AutoPerFile => InputFormat::AutoPerFile,
crate::InputFormat::Json => InputFormat::Json,
crate::InputFormat::Line => InputFormat::Line,
crate::InputFormat::Raw => InputFormat::Raw,
crate::InputFormat::Logfmt => InputFormat::Logfmt,
crate::InputFormat::Syslog => InputFormat::Syslog,
crate::InputFormat::Cef => InputFormat::Cef,
crate::InputFormat::Csv => InputFormat::Csv(None),
crate::InputFormat::Tsv => InputFormat::Tsv(None),
crate::InputFormat::Csvnh => InputFormat::Csvnh,
crate::InputFormat::Tsvnh => InputFormat::Tsvnh,
crate::InputFormat::Combined => InputFormat::Combined,
crate::InputFormat::Cols => {
InputFormat::Cols(String::new())
}
crate::InputFormat::Regex => {
InputFormat::Regex(String::new())
}
}
}
}
impl From<InputFormat> for crate::InputFormat {
fn from(format: InputFormat) -> Self {
match format {
InputFormat::Auto => crate::InputFormat::Auto,
InputFormat::AutoPerFile => crate::InputFormat::AutoPerFile,
InputFormat::Json => crate::InputFormat::Json,
InputFormat::Line => crate::InputFormat::Line,
InputFormat::Raw => crate::InputFormat::Raw,
InputFormat::Logfmt => crate::InputFormat::Logfmt,
InputFormat::Syslog => crate::InputFormat::Syslog,
InputFormat::Cef => crate::InputFormat::Cef,
InputFormat::Csv(_) => crate::InputFormat::Csv,
InputFormat::Tsv(_) => crate::InputFormat::Tsv,
InputFormat::Csvnh => crate::InputFormat::Csvnh,
InputFormat::Tsvnh => crate::InputFormat::Tsvnh,
InputFormat::Combined => crate::InputFormat::Combined,
InputFormat::Cols(_) => crate::InputFormat::Cols,
InputFormat::Regex(_) => crate::InputFormat::Regex,
InputFormat::Cascade(_) => crate::InputFormat::Auto,
}
}
}
/// Converts the crate-level output format selector into this module's
/// [`OutputFormat`].
///
/// Every variant maps to the variant of the same name.
impl From<crate::OutputFormat> for OutputFormat {
    fn from(format: crate::OutputFormat) -> Self {
        match format {
            crate::OutputFormat::Json => Self::Json,
            crate::OutputFormat::Default => Self::Default,
            crate::OutputFormat::Logfmt => Self::Logfmt,
            crate::OutputFormat::Inspect => Self::Inspect,
            crate::OutputFormat::Levelmap => Self::Levelmap,
            crate::OutputFormat::Keymap => Self::Keymap,
            crate::OutputFormat::Tailmap => Self::Tailmap,
            crate::OutputFormat::Csv => Self::Csv,
            crate::OutputFormat::Tsv => Self::Tsv,
            crate::OutputFormat::Csvnh => Self::Csvnh,
            crate::OutputFormat::Tsvnh => Self::Tsvnh,
        }
    }
}
/// Converts this module's [`OutputFormat`] back into the crate-level selector.
///
/// Every variant maps to the variant of the same name.
impl From<OutputFormat> for crate::OutputFormat {
    fn from(format: OutputFormat) -> Self {
        match format {
            OutputFormat::Json => Self::Json,
            OutputFormat::Default => Self::Default,
            OutputFormat::Logfmt => Self::Logfmt,
            OutputFormat::Inspect => Self::Inspect,
            OutputFormat::Levelmap => Self::Levelmap,
            OutputFormat::Keymap => Self::Keymap,
            OutputFormat::Tailmap => Self::Tailmap,
            OutputFormat::Csv => Self::Csv,
            OutputFormat::Tsv => Self::Tsv,
            OutputFormat::Csvnh => Self::Csvnh,
            OutputFormat::Tsvnh => Self::Tsvnh,
        }
    }
}
/// Converts the crate-level file ordering selector into this module's
/// [`FileOrder`]; each variant maps to the variant of the same name.
impl From<crate::FileOrder> for FileOrder {
    fn from(order: crate::FileOrder) -> Self {
        match order {
            crate::FileOrder::Cli => Self::Cli,
            crate::FileOrder::Name => Self::Name,
            crate::FileOrder::Mtime => Self::Mtime,
        }
    }
}
/// Converts this module's [`FileOrder`] back into the crate-level selector;
/// each variant maps to the variant of the same name.
impl From<FileOrder> for crate::FileOrder {
    fn from(order: FileOrder) -> Self {
        match order {
            FileOrder::Cli => Self::Cli,
            FileOrder::Name => Self::Name,
            FileOrder::Mtime => Self::Mtime,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cli::Cli;
    use clap::Parser;
    use once_cell::sync::Lazy;
    use std::ffi::OsString;
    use std::sync::Mutex;

    /// Serializes tests that read or mutate process environment variables,
    /// since the environment is shared by all test threads.
    static ENV_LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));

    /// Snapshot of selected environment variables that is written back when
    /// the guard is dropped.
    struct EnvGuard {
        /// `(name, value at snapshot time)`; `None` means the variable was unset.
        vars: Vec<(&'static str, Option<OsString>)>,
    }

    impl EnvGuard {
        /// Records the current value of every variable in `keys`.
        fn new(keys: &[&'static str]) -> Self {
            // `var_os` keeps values that are not valid UTF-8. `var(..).ok()`
            // would report them as unset and the restore step would then
            // delete them.
            let vars = keys
                .iter()
                .map(|key| (*key, std::env::var_os(key)))
                .collect();
            Self { vars }
        }
    }

    impl Drop for EnvGuard {
        /// Restores each recorded variable to its snapshot value, removing
        /// variables that were unset when the snapshot was taken.
        fn drop(&mut self) {
            for (key, value) in &self.vars {
                if let Some(v) = value {
                    std::env::set_var(key, v);
                } else {
                    std::env::remove_var(key);
                }
            }
        }
    }

    /// Runs `f` while holding [`ENV_LOCK`], restoring the variables named in
    /// `keys` afterwards (also when `f` panics, because the guard is dropped
    /// during unwinding).
    fn with_env_lock<F: FnOnce()>(keys: &[&'static str], f: F) {
        // A panicking test poisons the mutex. The lock protects no data and
        // the guard below has already restored the environment by then, so
        // recover the lock instead of failing every later env test.
        let _lock = ENV_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        // Declared after `_lock`, so it is dropped first: the environment is
        // restored before the lock is released.
        let _guard = EnvGuard::new(keys);
        f();
    }

    // --- determine_default_timezone -------------------------------------

    #[test]
    fn determine_default_timezone_defaults_to_utc() {
        with_env_lock(&["TZ"], || {
            std::env::remove_var("TZ");
            let cli = Cli::parse_from(["kelora"]);
            let tz = super::determine_default_timezone(&cli);
            assert_eq!(tz.as_deref(), Some("UTC"));
        });
    }

    #[test]
    fn determine_default_timezone_respects_cli_local() {
        with_env_lock(&["TZ"], || {
            std::env::remove_var("TZ");
            let cli = Cli::parse_from(["kelora", "--input-tz", "local"]);
            let tz = super::determine_default_timezone(&cli);
            assert_eq!(tz, None);
        });
    }

    #[test]
    fn determine_default_timezone_prefers_cli_over_env() {
        with_env_lock(&["TZ"], || {
            std::env::set_var("TZ", "America/New_York");
            let cli = Cli::parse_from(["kelora", "--input-tz", "Europe/Berlin"]);
            let tz = super::determine_default_timezone(&cli);
            assert_eq!(tz.as_deref(), Some("Europe/Berlin"));
        });
    }

    #[test]
    fn determine_default_timezone_uses_environment_when_present() {
        with_env_lock(&["TZ"], || {
            std::env::set_var("TZ", "Asia/Tokyo");
            let cli = Cli::parse_from(["kelora"]);
            let tz = super::determine_default_timezone(&cli);
            assert_eq!(tz.as_deref(), Some("Asia/Tokyo"));
        });
    }

    // --- format_error_message -------------------------------------------

    #[test]
    fn format_error_message_respects_color_settings() {
        with_env_lock(&["NO_COLOR", "NO_EMOJI", "FORCE_COLOR"], || {
            // Clear the override variables so only the config decides.
            std::env::remove_var("NO_COLOR");
            std::env::remove_var("NO_EMOJI");
            std::env::remove_var("FORCE_COLOR");
            let mut config = KeloraConfig::default();
            config.output.color = ColorMode::Always;
            config.output.emoji = EmojiMode::Always;
            let message = config.format_error_message("problem");
            assert!(message.starts_with("⚠️"));
            assert!(message.ends_with("problem"));
        });
    }

    #[test]
    fn format_error_message_without_colors_falls_back_to_plain_prefix() {
        let mut config = KeloraConfig::default();
        config.output.color = ColorMode::Never;
        config.output.emoji = EmojiMode::Never;
        let message = config.format_error_message("issue");
        assert_eq!(message, "kelora: issue");
    }

    // --- OutputConfig::get_effective_keys -------------------------------

    #[test]
    fn output_config_get_effective_keys_includes_core_fields() {
        let mut config = KeloraConfig::default();
        config.output.core = true;
        config.output.keys = vec!["custom".to_string(), "ts".to_string()];
        let keys = config.output.get_effective_keys();
        let core = KeloraConfig::get_core_field_names();
        for required in &core {
            assert!(keys.contains(required), "missing core key {required}");
        }
        assert!(keys.contains(&"custom".to_string()));
        // Sorting and deduplicating a copy must not shrink it if the keys
        // are already unique.
        let mut unique = keys.clone();
        unique.sort();
        unique.dedup();
        assert_eq!(
            unique.len(),
            keys.len(),
            "keys should not contain duplicates"
        );
    }

    #[test]
    fn output_config_get_effective_keys_respects_non_core_mode() {
        let mut config = KeloraConfig::default();
        config.output.core = false;
        config.output.keys = vec!["alpha".to_string(), "beta".to_string()];
        let keys = config.output.get_effective_keys();
        assert_eq!(keys, vec!["alpha".to_string(), "beta".to_string()]);
    }

    // --- parse_input_format_spec (cascade specs) ------------------------

    #[test]
    fn parse_cascade_spec_rejects_line_before_last_position() {
        let err = parse_input_format_spec("json,line,logfmt")
            .expect_err("line before the last position should be rejected");
        let message = err.to_string();
        assert!(message.contains("line"));
        assert!(message.contains("must be the last format"));
    }

    #[test]
    fn parse_cascade_spec_rejects_raw_before_last_position() {
        let err = parse_input_format_spec("json,raw,logfmt")
            .expect_err("raw before the last position should be rejected");
        let message = err.to_string();
        assert!(message.contains("raw"));
        assert!(message.contains("must be the last format"));
    }

    #[test]
    fn parse_cascade_spec_allows_catch_all_last() {
        let parsed = parse_input_format_spec("json,logfmt,line")
            .expect("line should be allowed as the final fallback");
        assert!(matches!(parsed, InputFormat::Cascade(_)));
        let parsed = parse_input_format_spec("json,logfmt,raw")
            .expect("raw should be allowed as the final fallback");
        assert!(matches!(parsed, InputFormat::Cascade(_)));
    }
}