use anyhow::Result;
use std::fs;
use std::io::BufRead;
use crate::config::{self, KeloraConfig};
use crate::decompression;
use crate::parsers;
use crate::pipeline;
use crate::readers;
use crate::stats;
use crate::stats::ProcessingStats;
/// Marker error meaning that none of the requested input paths could be opened.
///
/// It carries no payload. `detect_format_for_parallel_mode` returns it after
/// it has already printed one diagnostic per failed path to stderr.
#[derive(Debug)]
pub struct AllInputsUnopenable;

impl std::error::Error for AllInputsUnopenable {}

impl std::fmt::Display for AllInputsUnopenable {
    /// Writes the fixed, human-readable summary of the failure.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        out.write_str("no input files could be opened")
    }
}
/// Result of input-format auto-detection.
#[derive(Debug, Clone)]
pub struct DetectedFormat {
    /// Format selected for the input. `InputFormat::Line` is also the value
    /// used when nothing more specific was detected.
    pub format: config::InputFormat,
    /// Whether any input was seen while detecting.
    pub had_input: bool,
}

impl DetectedFormat {
    /// Returns `true` when input was seen and the selected format is anything
    /// other than `InputFormat::Line`.
    pub fn detected_non_line(&self) -> bool {
        match self.format {
            config::InputFormat::Line => false,
            _ => self.had_input,
        }
    }

    /// Returns `true` when input was seen but the selected format is
    /// `InputFormat::Line`.
    pub fn fell_back_to_line(&self) -> bool {
        match self.format {
            config::InputFormat::Line => self.had_input,
            _ => false,
        }
    }
}
/// Detects the input format from the first non-empty line of `reader`.
///
/// The line is obtained through `peek_first_non_empty_line`; trailing `\r` and
/// `\n` characters are removed before it is handed to
/// `parsers::detect_format`.
///
/// When no non-empty line is available the result is `InputFormat::Line`, with
/// `had_input` taken from `reader.saw_any_input()`.
///
/// # Errors
///
/// Propagates any error from peeking the reader or from the detector.
pub fn detect_format_from_peekable_reader<R: std::io::BufRead>(
    reader: &mut readers::PeekableLineReader<R>,
) -> Result<DetectedFormat> {
    let outcome = match reader.peek_first_non_empty_line()? {
        Some(line) => {
            let content = line.trim_end_matches(|c: char| c == '\r' || c == '\n');
            DetectedFormat {
                format: parsers::detect_format(content)?,
                had_input: true,
            }
        }
        None => DetectedFormat {
            format: config::InputFormat::Line,
            had_input: reader.saw_any_input(),
        },
    };
    Ok(outcome)
}
pub fn detect_format_for_parallel_mode(
files: &[String],
no_input: bool,
strict: bool,
) -> Result<(DetectedFormat, Option<Box<dyn BufRead + Send>>)> {
use std::io;
if no_input {
return Ok((
DetectedFormat {
format: config::InputFormat::Line,
had_input: false,
},
None,
));
}
if files.is_empty() {
let stdin_reader = readers::ChannelStdinReader::new()?;
let processed_stdin = decompression::maybe_decompress(stdin_reader)?;
let mut peekable_reader =
readers::PeekableLineReader::new(io::BufReader::new(processed_stdin));
let detected = detect_format_from_peekable_reader(&mut peekable_reader)?;
Ok((detected, Some(Box::new(peekable_reader))))
} else {
let sorted_files = pipeline::builders::sort_files(files, &config::FileOrder::Cli)?;
let mut failed_opens: Vec<(String, String)> = Vec::new();
let mut failed_dirs: Vec<String> = Vec::new();
let mut detected: Option<DetectedFormat> = None;
for file_path in &sorted_files {
if let Ok(metadata) = fs::metadata(file_path) {
if metadata.is_dir() {
if strict {
return Err(anyhow::anyhow!(
"Input path '{}' is a directory; only files are supported",
file_path
));
}
failed_dirs.push(file_path.clone());
continue;
}
}
match decompression::DecompressionReader::new(file_path) {
Ok(decompressed) => {
let mut peekable_reader = readers::PeekableLineReader::new(decompressed);
detected = Some(detect_format_from_peekable_reader(&mut peekable_reader)?);
break;
}
Err(e) => {
if strict {
return Err(anyhow::anyhow!(config::format_input_open_error(
file_path,
&e.to_string()
)));
}
failed_opens.push((file_path.clone(), e.to_string()));
}
}
}
let detected = match detected {
Some(detected) => detected,
None => {
let printed_detail = !failed_dirs.is_empty() || !failed_opens.is_empty();
for path in failed_dirs {
eprintln!(
"{}",
config::format_error_message_auto(&format!(
"Input path '{}' is a directory; skipping (input files only)",
path
))
);
stats::stats_file_open_failed(&path);
}
for (path, err) in failed_opens {
eprintln!(
"{}",
config::format_error_message_auto(&config::format_input_open_error(
&path, &err
))
);
stats::stats_file_open_failed(&path);
}
if printed_detail {
return Err(anyhow::Error::new(AllInputsUnopenable));
}
return Err(anyhow::anyhow!(
"Failed to open any input files for detection"
));
}
};
Ok((detected, None))
}
}
/// Builds the optional notice describing the outcome of format auto-detection.
///
/// * A non-line format was detected: returns an info message naming the
///   format, but only when `config.processing.verbose` is non-zero.
/// * Detection fell back to `line`: returns a hint on how to extract fields,
///   but only when `config.hints_allowed()`.
/// * Otherwise (for example, no input was seen): returns `None`.
pub fn format_detected_format_notice(
    config: &KeloraConfig,
    detected: &DetectedFormat,
) -> Option<String> {
    if detected.detected_non_line() {
        let verbose = config.processing.verbose != 0;
        if !verbose {
            return None;
        }
        let info = format!(
            "Auto-detected format: {} (from first line)",
            detected.format.to_display_string()
        );
        return Some(config.format_info_message(&info));
    }

    if !detected.fell_back_to_line() || !config.hints_allowed() {
        return None;
    }

    Some(config.format_hint_message(
        "No input format detected; keeping whole lines as 'line'. For 'timestamp LEVEL message' app logs, extract fields with -f 'cols:ts(2) level *msg' (or a regex:). Mixed file? Cascade with repeated -f, e.g. -f json -f 'cols:ts(2) level *msg'. See --help-formats.",
    ))
}
/// Prints the auto-detection notice to stderr, if
/// `format_detected_format_notice` produces one for this configuration.
pub fn emit_detected_format_notice(config: &KeloraConfig, detected: &DetectedFormat) {
    match format_detected_format_notice(config, detected) {
        Some(notice) => eprintln!("{notice}"),
        None => {}
    }
}
/// Builds the warning shown when an auto-detected format mostly fails to parse.
///
/// Returns `None` unless all of the following hold:
///
/// * the format was auto-detected as something other than `line`
///   (`auto_detected_non_line`) and `config.warnings_allowed()`;
/// * `stats` is available;
/// * either at least 10 lines failed and they make up at least a third of all
///   lines seen, or no event was created and at least 3 lines failed.
///
/// The text comes from `mixed_format_suggestion` when that yields one, and is
/// a generic hint otherwise. When no events were output, leading newlines are
/// stripped from the formatted message.
pub fn parse_failure_warning_message(
    config: &KeloraConfig,
    stats: Option<&ProcessingStats>,
    auto_detected_non_line: bool,
    events_were_output: bool,
) -> Option<String> {
    if !(auto_detected_non_line && config.warnings_allowed()) {
        return None;
    }
    let stats = stats?;

    let parse_errors = stats.lines_errors as i64;
    let events_created = stats.events_created as i64;
    let seen = (events_created + parse_errors).max(1);

    let mostly_failing = parse_errors >= 10 && parse_errors * 3 >= seen;
    let nothing_parsed = events_created == 0 && parse_errors >= 3;
    if !(mostly_failing || nothing_parsed) {
        return None;
    }

    let text = match mixed_format_suggestion(stats) {
        Some(specific) => specific,
        None => "Parsing mostly failed. The input may use the wrong format, contain mixed formats, or require multiline parsing. Try -f line, specify -f <fmt>, or see --help-formats / --help-multiline.".to_string(),
    };

    let message = config.format_warning_message(&text);
    Some(if events_were_output {
        message
    } else {
        message.trim_start_matches('\n').to_string()
    })
}
/// Suggests a format combination when the first failed line looks like a
/// different format than the one that was detected.
///
/// Returns `None` when the detected format or the error sample is missing,
/// when the sample cannot be classified, or when the sample is classified as
/// the same format as the detected one.
///
/// Otherwise the message proposes a comma cascade (`-f a,b`). If the primary
/// format is one of `csv`/`tsv`/`csvnh`/`tsvnh`, or the secondary format is not
/// cascade-eligible, it proposes repeated flags (`-f a -f b`) instead.
fn mixed_format_suggestion(stats: &ProcessingStats) -> Option<String> {
    let (primary, sample) = stats
        .detected_format
        .as_deref()
        .zip(stats.first_parse_error_sample.as_deref())?;

    let secondary_fmt = match parsers::detect_format(sample) {
        Ok(fmt) => fmt,
        Err(_) => return None,
    };
    let secondary = secondary_fmt.cascade_name();
    if secondary == primary {
        return None;
    }

    let primary_is_schema_based = matches!(primary, "csv" | "tsv" | "csvnh" | "tsvnh");
    let suggestion = if primary_is_schema_based || !secondary_fmt.is_cascade_eligible() {
        format!(
            "Detected mixed formats ({primary} + {secondary}). These can't share a comma list; use repeated flags: -f {primary} -f {secondary} (see --help-formats)."
        )
    } else {
        format!(
            "Detected mixed formats ({primary} + {secondary}). Try: -f {primary},{secondary} (see --help-formats)."
        )
    };
    Some(suggestion)
}
/// Prints the parse-failure warning to stderr, if
/// `parse_failure_warning_message` produces one for these inputs.
pub fn emit_parse_failure_warning(
    config: &KeloraConfig,
    stats: Option<&ProcessingStats>,
    auto_detected_non_line: bool,
    events_were_output: bool,
) {
    let warning =
        parse_failure_warning_message(config, stats, auto_detected_non_line, events_were_output);
    match warning {
        Some(text) => eprintln!("{text}"),
        None => {}
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{ColorMode, EmojiMode};

    /// Default config with emoji and color off, and the quiet/silent/suppress
    /// switches explicitly disabled.
    fn base_config() -> KeloraConfig {
        let mut plain = KeloraConfig::default();
        plain.output.emoji = EmojiMode::Never;
        plain.output.color = ColorMode::Never;
        plain.processing.quiet_events = false;
        plain.processing.silent = false;
        plain.processing.suppress_warnings = false;
        plain.processing.suppress_hints = false;
        plain
    }

    /// Builds the warning for `stats` under the base config, as if a non-line
    /// format had been auto-detected and no events had been output.
    fn warning_for(stats: &ProcessingStats) -> Option<String> {
        parse_failure_warning_message(&base_config(), Some(stats), true, false)
    }

    #[test]
    fn detected_format_notice_is_verbose_only() {
        let detected = DetectedFormat {
            format: config::InputFormat::Json,
            had_input: true,
        };

        let quiet_cfg = base_config();
        assert!(
            format_detected_format_notice(&quiet_cfg, &detected).is_none(),
            "confident auto-detect must stay silent without -v"
        );

        let verbose_cfg = {
            let mut cfg = base_config();
            cfg.processing.verbose = 1;
            cfg
        };
        let message =
            format_detected_format_notice(&verbose_cfg, &detected).expect("expected info notice");
        assert!(
            message.contains("Auto-detected format: json"),
            "message was {message}"
        );
    }

    #[test]
    fn parse_failure_warning_triggers_on_heavy_errors() {
        let stats = ProcessingStats {
            lines_errors: 10,
            events_created: 0,
            ..Default::default()
        };
        let message = warning_for(&stats).expect("expected warning");
        assert!(
            message.contains("Parsing mostly failed"),
            "message was {message}"
        );
        assert!(
            message.contains("--help-multiline"),
            "message should point to multiline help: {message}"
        );
    }

    #[test]
    fn parse_failure_warning_names_mixed_cascade_formats() {
        let stats = ProcessingStats {
            lines_errors: 10,
            events_created: 5,
            detected_format: Some("json".to_string()),
            first_parse_error_sample: Some("just a plain text line".to_string()),
            ..Default::default()
        };
        let message = warning_for(&stats).expect("expected warning");
        assert!(
            message.contains("Detected mixed formats (json + line)"),
            "message was {message}"
        );
        assert!(
            message.contains("-f json,line"),
            "should suggest the comma cascade: {message}"
        );
    }

    #[test]
    fn parse_failure_warning_suggests_repeated_flags_for_schema_formats() {
        let stats = ProcessingStats {
            lines_errors: 10,
            events_created: 5,
            detected_format: Some("json".to_string()),
            first_parse_error_sample: Some("name,age,city".to_string()),
            ..Default::default()
        };
        let message = warning_for(&stats).expect("expected warning");
        assert!(
            message.contains("Detected mixed formats (json + csv)"),
            "message was {message}"
        );
        assert!(
            message.contains("-f json -f csv"),
            "should suggest repeated flags: {message}"
        );
    }

    #[test]
    fn parse_failure_warning_uses_generic_message_without_a_sample() {
        let stats = ProcessingStats {
            lines_errors: 10,
            events_created: 0,
            detected_format: Some("json".to_string()),
            first_parse_error_sample: None,
            ..Default::default()
        };
        let message = warning_for(&stats).expect("expected warning");
        assert!(
            message.contains("Parsing mostly failed"),
            "should fall back to generic message: {message}"
        );
    }

    #[test]
    fn mixed_format_suggestion_skips_same_format_secondary() {
        let stats = ProcessingStats {
            detected_format: Some("line".to_string()),
            first_parse_error_sample: Some("just a plain text line".to_string()),
            ..Default::default()
        };
        assert!(
            mixed_format_suggestion(&stats).is_none(),
            "same primary/secondary format should yield no specific suggestion"
        );
    }

    #[test]
    fn parse_failure_warning_skips_light_error_rates() {
        let stats = ProcessingStats {
            lines_errors: 2,
            events_created: 10,
            ..Default::default()
        };
        assert!(
            warning_for(&stats).is_none(),
            "should not warn on low error rate"
        );
    }
}