use crate::infer;
use crate::parser::{Confidence, FormatParser, STRONG};
use crate::table::TableBuilder;
use ax_core::{AxError, Column, Value};
use serde_json::Value as J;
use std::collections::BTreeMap;
/// Parser for journal exports encoded as one JSON object per line.
///
/// The type carries no state; all behaviour lives in its [`FormatParser`]
/// implementation, where it identifies itself as `"journal"`.
#[derive(Clone, Debug, Default)]
pub struct JournalParser;
/// Field names used to recognise a journal entry: a JSON object containing at
/// least one of these keys is treated as journal data.
const SIGNATURE_KEYS: [&str; 3] = [
    "__REALTIME_TIMESTAMP",
    "__CURSOR",
    "_SYSTEMD_UNIT",
];
/// Returns `true` when `obj` contains at least one of [`SIGNATURE_KEYS`].
fn looks_like_journal(obj: &serde_json::Map<String, J>) -> bool {
    for key in SIGNATURE_KEYS {
        if obj.contains_key(key) {
            return true;
        }
    }
    false
}
impl JournalParser {
    /// Wraps `msg` in an [`AxError::Parse`] tagged with this parser's id.
    ///
    /// Accepts anything displayable so both foreign error values and plain
    /// string messages can be passed directly.
    fn err(&self, msg: impl std::fmt::Display) -> AxError {
        let format = self.id().to_string();
        let message = msg.to_string();
        AxError::Parse { format, message }
    }
}
impl FormatParser for JournalParser {
fn id(&self) -> &'static str {
"journal"
}
fn extensions(&self) -> &'static [&'static str] {
&[]
}
fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
let text = std::str::from_utf8(bytes).ok()?;
let line = text.lines().find(|l| !l.trim().is_empty())?;
let value: J = serde_json::from_str(line).ok()?;
value
.as_object()
.is_some_and(looks_like_journal)
.then_some(STRONG)
}
fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
let text = std::str::from_utf8(bytes).map_err(|e| self.err(e))?;
let mut builder = TableBuilder::new();
for line in text.lines() {
if line.trim().is_empty() {
continue;
}
let value: J = serde_json::from_str(line).map_err(|e| self.err(e))?;
let obj = value
.as_object()
.ok_or_else(|| self.err("journal entry is not a JSON object"))?;
let mut row: BTreeMap<String, Value> = BTreeMap::new();
for (key, val) in obj {
let cell = match val {
J::String(s) => infer::infer_scalar(s),
other => infer::json_to_value(other),
};
row.insert(key.clone(), cell);
}
builder.push_row(row);
}
Ok(builder.finish())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    /// Three-entry fixture: a full entry, one with `_PID`, `_HOSTNAME` and
    /// `__MONOTONIC_TIMESTAMP` omitted, and one whose `MESSAGE` is an array.
    const JOURNAL: &str = concat!(
        r#"{"__CURSOR":"s=a","__REALTIME_TIMESTAMP":"1612345678000000","__MONOTONIC_TIMESTAMP":"1000","_SYSTEMD_UNIT":"sshd.service","PRIORITY":"6","_PID":"1234","MESSAGE":"Accepted password","_HOSTNAME":"host"}"#,
        "\n",
        r#"{"__CURSOR":"s=b","__REALTIME_TIMESTAMP":"1612345679000000","_SYSTEMD_UNIT":"cron.service","PRIORITY":"5","MESSAGE":"job done"}"#,
        "\n",
        r#"{"__CURSOR":"s=c","__REALTIME_TIMESTAMP":"1612345680000000","_SYSTEMD_UNIT":"sshd.service","PRIORITY":"3","MESSAGE":[72,73]}"#,
        "\n",
    );

    /// Parses `s` with [`JournalParser`], panicking if parsing fails.
    fn parse(s: &str) -> Vec<Column> {
        let parsed = JournalParser.parse("-", s.as_bytes());
        parsed.unwrap()
    }

    /// Looks up the column called `name`, panicking if it is absent.
    fn col<'a>(cols: &'a [Column], name: &str) -> &'a Column {
        match cols.iter().find(|c| c.name == name) {
            Some(found) => found,
            None => panic!("missing column {name}"),
        }
    }

    #[test]
    fn numeric_string_fields_are_coerced() {
        let table = parse(JOURNAL);

        let realtime = col(&table, "__REALTIME_TIMESTAMP");
        assert_eq!(
            realtime.ty,
            ColType::Int,
            "timestamp coerced for cadence/cusum"
        );
        assert_eq!(realtime.cells[0], Value::Int(1_612_345_678_000_000));

        let priority = col(&table, "PRIORITY");
        assert_eq!(priority.ty, ColType::Int);
        assert_eq!(priority.cells[0], Value::Int(6));
        assert_eq!(priority.cells[2], Value::Int(3));

        let pid = col(&table, "_PID");
        assert_eq!(pid.cells[0], Value::Int(1234));
    }

    #[test]
    fn unit_and_message_stay_strings() {
        let table = parse(JOURNAL);

        let unit = col(&table, "_SYSTEMD_UNIT");
        assert_eq!(unit.ty, ColType::Str, "unit stays categorical for dist");
        assert_eq!(unit.cells[0], Value::Str("sshd.service".into()));

        let message = col(&table, "MESSAGE");
        assert_eq!(message.cells[0], Value::Str("Accepted password".into()));
    }

    #[test]
    fn missing_fields_pad_with_null() {
        let table = parse(JOURNAL);
        // Both fields appear only in the first of the three entries.
        for name in ["_PID", "_HOSTNAME"] {
            assert_eq!(col(&table, name).null_count(), 2);
        }
    }

    #[test]
    fn array_valued_field_is_canonical_json() {
        let table = parse(JOURNAL);
        let message = col(&table, "MESSAGE");
        assert_eq!(message.cells[2], Value::Str("[72,73]".into()));
    }

    #[test]
    fn malformed_entries_error() {
        // Invalid JSON, then valid JSON that is not an object.
        let inputs: [&[u8]; 2] = [b"not json\n", b"[1,2,3]\n"];
        for input in inputs {
            let outcome = JournalParser.parse("-", input);
            assert!(matches!(outcome, Err(AxError::Parse { .. })));
        }
    }

    #[test]
    fn sniff_keys_on_journald_signature() {
        let accepted: [&[u8]; 2] = [
            JOURNAL.as_bytes(),
            br#"{"__CURSOR":"x","MESSAGE":"y"}"#,
        ];
        for input in accepted {
            assert_eq!(JournalParser.sniff(input), Some(STRONG));
        }

        let rejected: [&[u8]; 3] = [
            b"{\"a\":1}\n{\"a\":2}\n",
            br#"{"foo":1}"#,
            b"a,b,c\n1,2,3",
        ];
        for input in rejected {
            assert_eq!(JournalParser.sniff(input), None);
        }
    }

    #[test]
    fn claims_no_extension() {
        let exts = JournalParser.extensions();
        assert!(exts.is_empty());
    }

    #[test]
    fn resolves_journal_over_ndjson_by_content() {
        let registry = crate::parser::ParserRegistry::default();

        let for_journal = registry.resolve("-", JOURNAL.as_bytes()).unwrap();
        assert_eq!(for_journal.id(), "journal");

        let for_plain = registry.resolve("-", b"{\"a\":1}\n{\"a\":2}\n").unwrap();
        assert_eq!(for_plain.id(), "ndjson");
    }
}