use crate::config::InputFormat as ConfigInputFormat;
use crate::parsers::{CefParser, CombinedParser, LogfmtParser, SyslogParser};
use crate::pipeline::EventParser;
use anyhow::Result;
/// Guesses the input format of a log stream from a single sample line.
///
/// The sample is whitespace-trimmed and then run through the detectors in a
/// fixed order; the first one that accepts the line wins:
///
/// 1. JSON
/// 2. CEF
/// 3. syslog
/// 4. combined access log
/// 5. logfmt
/// 6. CSV / TSV (with or without a header row)
///
/// An empty (or whitespace-only) sample, or one that no detector accepts,
/// yields [`ConfigInputFormat::Line`].
///
/// # Errors
///
/// Every path through the current implementation returns `Ok`; the `Result`
/// wrapper is part of the signature callers already depend on.
pub fn detect_format(sample_line: &str) -> Result<ConfigInputFormat> {
    let line = sample_line.trim();

    // The branch order below is the detection priority; each detector is only
    // evaluated when every earlier one has rejected the line.
    let format = if line.is_empty() {
        ConfigInputFormat::Line
    } else if detect_json(line) {
        ConfigInputFormat::Json
    } else if detect_cef(line) {
        ConfigInputFormat::Cef
    } else if detect_syslog(line) {
        ConfigInputFormat::Syslog
    } else if detect_combined_logs(line) {
        ConfigInputFormat::Combined
    } else if detect_logfmt(line) {
        ConfigInputFormat::Logfmt
    } else {
        // Delimited formats are the last structured guess; plain lines are the fallback.
        detect_csv_variants(line).unwrap_or(ConfigInputFormat::Line)
    };

    Ok(format)
}
/// Returns `true` when `line` begins with `{` and parses as valid JSON.
///
/// The leading-brace check is a cheap pre-filter so that the JSON parser is
/// only invoked on lines that could be an object.
fn detect_json(line: &str) -> bool {
    line.starts_with('{') && serde_json::from_str::<serde_json::Value>(line).is_ok()
}
/// Returns `true` when a [`CefParser`] configured with `with_strict(true)`
/// parses `line` successfully.
fn detect_cef(line: &str) -> bool {
    CefParser::new_without_auto_timestamp()
        .with_strict(true)
        .parse(line)
        .is_ok()
}
/// Returns `true` when a [`SyslogParser`] parses `line` successfully.
///
/// If the parser itself cannot be constructed, the line is reported as not
/// being syslog rather than surfacing the construction error.
fn detect_syslog(line: &str) -> bool {
    match SyslogParser::new_without_auto_timestamp() {
        Ok(syslog) => syslog.parse(line).is_ok(),
        Err(_) => false,
    }
}
/// Returns `true` when a [`CombinedParser`] parses `line` successfully.
///
/// If the parser itself cannot be constructed, the line is reported as not
/// matching rather than surfacing the construction error.
fn detect_combined_logs(line: &str) -> bool {
    match CombinedParser::new_without_auto_timestamp() {
        Ok(combined) => combined.parse(line).is_ok(),
        Err(_) => false,
    }
}
/// Returns `true` when a [`LogfmtParser`] parses `line` successfully.
fn detect_logfmt(line: &str) -> bool {
    LogfmtParser::new_without_auto_timestamp()
        .parse(line)
        .is_ok()
}
/// Heuristically classifies `line` as tab- or comma-delimited data.
///
/// A line counts as delimited when it contains at least two occurrences of
/// the delimiter (i.e. at least three fields). Tabs are checked before
/// commas, so a line containing two or more tabs is always reported as TSV
/// regardless of how many commas it has.
///
/// Whether the line is treated as a header row depends only on its first
/// field: if that field contains any ASCII letter the header variant
/// (`Tsv(None)` / `Csv(None)`) is returned, otherwise the no-header variant
/// (`Tsvnh` / `Csvnh`).
///
/// Returns `None` when neither delimiter occurs at least twice.
fn detect_csv_variants(line: &str) -> Option<ConfigInputFormat> {
    if has_at_least_two(line, '\t') {
        return Some(if first_field_has_letter(line, '\t') {
            ConfigInputFormat::Tsv(None)
        } else {
            ConfigInputFormat::Tsvnh
        });
    }

    if has_at_least_two(line, ',') {
        return Some(if first_field_has_letter(line, ',') {
            ConfigInputFormat::Csv(None)
        } else {
            ConfigInputFormat::Csvnh
        });
    }

    None
}

/// Reports whether `delimiter` occurs two or more times in `line`.
fn has_at_least_two(line: &str, delimiter: char) -> bool {
    // `nth(1)` stops scanning at the second occurrence instead of counting all of them.
    line.matches(delimiter).nth(1).is_some()
}

/// Reports whether the text before the first `delimiter` contains an ASCII letter.
fn first_field_has_letter(line: &str, delimiter: char) -> bool {
    // Surrounding quotes or whitespace are never letters, so the field needs
    // no trimming before this check.
    line.chars()
        .take_while(|&c| c != delimiter)
        .any(|c| c.is_ascii_alphabetic())
}
// Tests for `detect_format`: hand-written example cases first, followed by
// proptest strategies that generate lines of each format and properties that
// check the detector classifies them as expected.
#[cfg(test)]
mod tests {
use super::*;
use proptest::prelude::*;
use proptest::strategy::{BoxedStrategy, Strategy};
// Lines that are JSON objects must be detected as `Json`.
#[test]
fn test_detect_json() {
assert_eq!(
detect_format(r#"{"key": "value", "num": 42}"#).unwrap(),
ConfigInputFormat::Json
);
assert_eq!(
detect_format(r#"{"timestamp": "2023-04-15T10:00:00Z"}"#).unwrap(),
ConfigInputFormat::Json
);
}
// A pipe-delimited line with the `CEF:0` prefix must be detected as `Cef`.
#[test]
fn test_detect_cef() {
assert_eq!(
detect_format("CEF:0|Vendor|Product|Version|EventID|Name|Severity|Extension").unwrap(),
ConfigInputFormat::Cef
);
}
// Syslog samples: two with a leading `<priority>` tag and two that start
// directly with a `Mon DD HH:MM:SS` timestamp. All must be detected as `Syslog`.
#[test]
fn test_detect_syslog() {
assert_eq!(
detect_format("<34>1 2023-04-15T10:00:00.000Z hostname app - - - message").unwrap(),
ConfigInputFormat::Syslog
);
assert_eq!(
detect_format("<13>Apr 15 10:00:00 hostname program: message").unwrap(),
ConfigInputFormat::Syslog
);
assert_eq!(
detect_format("Jan 15 10:30:45 server1 sshd[1234]: Accepted publickey for user")
.unwrap(),
ConfigInputFormat::Syslog
);
assert_eq!(
detect_format("Dec 25 23:59:59 hostname kernel: USB disconnect").unwrap(),
ConfigInputFormat::Syslog
);
}
// A web-server access-log style line must be detected as `Combined`.
#[test]
fn test_detect_combined() {
assert_eq!(
detect_format(
r#"192.168.1.1 - - [15/Apr/2023:10:00:00 +0000] "GET /path HTTP/1.1" 200 1234"#
)
.unwrap(),
ConfigInputFormat::Combined
);
}
// Space-separated `key=value` pairs must be detected as `Logfmt`.
#[test]
fn test_detect_logfmt() {
assert_eq!(
detect_format("time=2023-04-15T10:00:00Z level=info msg=test").unwrap(),
ConfigInputFormat::Logfmt
);
assert_eq!(
detect_format("key1=value1 key2=value2 key3=value3").unwrap(),
ConfigInputFormat::Logfmt
);
}
// Delimited lines: a first field containing letters selects the header
// variant (`Csv` / `Tsv`), an all-numeric first field selects the no-header
// variant (`Csvnh` / `Tsvnh`). Note that "john\t25\tnyc" is asserted as `Tsv`
// because only the first field is inspected.
#[test]
fn test_detect_csv() {
assert!(matches!(
detect_format("name,age,city").unwrap(),
ConfigInputFormat::Csv(_)
));
assert!(matches!(
detect_format("1,2,3").unwrap(),
ConfigInputFormat::Csvnh
));
assert!(matches!(
detect_format("john\t25\tnyc").unwrap(),
ConfigInputFormat::Tsv(_)
)); assert!(matches!(
detect_format("name\tage\tcity").unwrap(),
ConfigInputFormat::Tsv(_)
));
assert!(matches!(
detect_format("1\t2\t3").unwrap(),
ConfigInputFormat::Tsvnh
));
}
// Free text and the empty string must fall back to `Line`.
#[test]
fn test_detect_line_fallback() {
assert_eq!(
detect_format("just some random text").unwrap(),
ConfigInputFormat::Line
);
assert_eq!(detect_format("").unwrap(), ConfigInputFormat::Line);
assert_eq!(
detect_format("a single word").unwrap(),
ConfigInputFormat::Line
);
}
// Strategy: a string whose length lies in `len`, built from characters drawn
// from `proptest::char::range('a', 'z')`.
fn lower_ascii(len: std::ops::RangeInclusive<usize>) -> BoxedStrategy<String> {
prop::collection::vec(proptest::char::range('a', 'z'), len)
.prop_map(|chars| chars.into_iter().collect())
.boxed()
}
// Strategy: a non-empty lowercase identifier of 1 to 8 characters.
fn identifier() -> BoxedStrategy<String> {
lower_ascii(1..=8)
}
// Strategy: a scalar JSON value that is a short lowercase string, an `i64`
// number, or a boolean.
fn json_value() -> BoxedStrategy<serde_json::Value> {
let string_val = lower_ascii(0..=8)
.prop_map(serde_json::Value::String)
.boxed();
let number_val = any::<i64>()
.prop_map(|v| serde_json::Value::Number(serde_json::Number::from(v)))
.boxed();
let bool_val = any::<bool>().prop_map(serde_json::Value::Bool).boxed();
prop_oneof![string_val, number_val, bool_val].boxed()
}
// Strategy: a serialized JSON object built from 1 to 4 generated
// (identifier, value) pairs. The pairs are inserted into a map, so repeated
// keys presumably collapse to a single entry.
fn json_line() -> BoxedStrategy<String> {
prop::collection::vec((identifier(), json_value()), 1..=4)
.prop_map(|entries| {
let mut map = serde_json::Map::new();
for (k, v) in entries {
map.insert(k, v);
}
serde_json::Value::Object(map).to_string()
})
.boxed()
}
// Strategy: a `CEF:0|...` line with five identifier header fields, a severity
// in 0..=10, and a single `key=value` extension.
fn cef_line() -> BoxedStrategy<String> {
(
identifier(),
identifier(),
identifier(),
identifier(),
identifier(),
0u8..=10,
identifier(),
identifier(),
)
.prop_map(|(vendor, product, version, signature, name, severity, ext_key, ext_value)| {
format!(
"CEF:0|{vendor}|{product}|{version}|{signature}|{name}|{severity}|{ext_key}={ext_value}"
)
})
.boxed()
}
// Strategy: 3 to 5 identifiers joined by commas (looks like a CSV header row).
fn csv_with_headers() -> BoxedStrategy<String> {
prop::collection::vec(identifier(), 3..=5)
.prop_map(|fields| fields.join(","))
.boxed()
}
// Strategy: 3 to 5 integers in 0..=999 joined by commas (CSV data, no header).
fn csv_without_headers() -> BoxedStrategy<String> {
prop::collection::vec(0u16..=999, 3..=5)
.prop_map(|nums| {
nums.into_iter()
.map(|n| n.to_string())
.collect::<Vec<_>>()
.join(",")
})
.boxed()
}
// Strategy: 3 to 5 identifiers joined by tabs (looks like a TSV header row).
fn tsv_with_headers() -> BoxedStrategy<String> {
prop::collection::vec(identifier(), 3..=5)
.prop_map(|fields| fields.join("\t"))
.boxed()
}
// Strategy: 3 to 5 integers in 0..=999 joined by tabs (TSV data, no header).
fn tsv_without_headers() -> BoxedStrategy<String> {
prop::collection::vec(0u16..=999, 3..=5)
.prop_map(|nums| {
nums.into_iter()
.map(|n| n.to_string())
.collect::<Vec<_>>()
.join("\t")
})
.boxed()
}
// Strategy: a single run of 5 to 30 lowercase letters with no delimiters,
// used as input that should not match any structured format.
fn plain_line() -> BoxedStrategy<String> {
lower_ascii(5..=30)
}
// Properties: every line produced by a format-specific strategy above must be
// classified by `detect_format` as exactly that format.
proptest! {
#[test]
fn prop_detects_json(line in json_line()) {
prop_assert_eq!(detect_format(&line).unwrap(), ConfigInputFormat::Json);
}
#[test]
fn prop_detects_cef(line in cef_line()) {
prop_assert_eq!(detect_format(&line).unwrap(), ConfigInputFormat::Cef);
}
#[test]
fn prop_detects_csv_headers(line in csv_with_headers()) {
prop_assert_eq!(detect_format(&line).unwrap(), ConfigInputFormat::Csv(None));
}
#[test]
fn prop_detects_csv_no_headers(line in csv_without_headers()) {
prop_assert_eq!(detect_format(&line).unwrap(), ConfigInputFormat::Csvnh);
}
#[test]
fn prop_detects_tsv_headers(line in tsv_with_headers()) {
prop_assert_eq!(detect_format(&line).unwrap(), ConfigInputFormat::Tsv(None));
}
#[test]
fn prop_detects_tsv_no_headers(line in tsv_without_headers()) {
prop_assert_eq!(detect_format(&line).unwrap(), ConfigInputFormat::Tsvnh);
}
#[test]
fn prop_detects_line_fallback(line in plain_line()) {
prop_assert_eq!(detect_format(&line).unwrap(), ConfigInputFormat::Line);
}
}
}