anomalyx_normalize/parsers/
journal.rs1use crate::infer;
18use crate::parser::{Confidence, FormatParser, STRONG};
19use crate::table::TableBuilder;
20use ax_core::{AxError, Column, Value};
21use serde_json::Value as J;
22use std::collections::BTreeMap;
23
/// Stateless parser for line-delimited journald-style JSON entries.
///
/// The type carries no configuration; all behavior lives in its
/// [`FormatParser`] implementation.
#[derive(Clone, Debug, Default)]
pub struct JournalParser;
26
/// Field names whose presence marks a JSON object as a journal entry.
///
/// Content sniffing treats an object as a journal record as soon as it
/// contains any one of these keys.
const SIGNATURE_KEYS: [&str; 3] = [
    "__REALTIME_TIMESTAMP",
    "__CURSOR",
    "_SYSTEMD_UNIT",
];
30
31fn looks_like_journal(obj: &serde_json::Map<String, J>) -> bool {
32 SIGNATURE_KEYS.iter().any(|k| obj.contains_key(*k))
33}
34
35impl JournalParser {
36 fn err(&self, msg: impl std::fmt::Display) -> AxError {
37 AxError::Parse {
38 format: self.id().to_string(),
39 message: msg.to_string(),
40 }
41 }
42}
43
44impl FormatParser for JournalParser {
45 fn id(&self) -> &'static str {
46 "journal"
47 }
48 fn extensions(&self) -> &'static [&'static str] {
49 &[]
50 }
51 fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
52 let text = std::str::from_utf8(bytes).ok()?;
53 let line = text.lines().find(|l| !l.trim().is_empty())?;
54 let value: J = serde_json::from_str(line).ok()?;
55 value
56 .as_object()
57 .is_some_and(looks_like_journal)
58 .then_some(STRONG)
59 }
60 fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
61 let text = std::str::from_utf8(bytes).map_err(|e| self.err(e))?;
62 let mut builder = TableBuilder::new();
63 for line in text.lines() {
64 if line.trim().is_empty() {
65 continue;
66 }
67 let value: J = serde_json::from_str(line).map_err(|e| self.err(e))?;
68 let obj = value
69 .as_object()
70 .ok_or_else(|| self.err("journal entry is not a JSON object"))?;
71 let mut row: BTreeMap<String, Value> = BTreeMap::new();
72 for (key, val) in obj {
73 let cell = match val {
77 J::String(s) => infer::infer_scalar(s),
78 other => infer::json_to_value(other),
79 };
80 row.insert(key.clone(), cell);
81 }
82 builder.push_row(row);
83 }
84 Ok(builder.finish())
85 }
86}
87
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    // Three-entry fixture: the first entry carries every field, the second
    // omits `_PID`/`_HOSTNAME`/`__MONOTONIC_TIMESTAMP`, and the third has an
    // array-valued `MESSAGE`.
    const JOURNAL: &str = concat!(
        r#"{"__CURSOR":"s=a","__REALTIME_TIMESTAMP":"1612345678000000","__MONOTONIC_TIMESTAMP":"1000","_SYSTEMD_UNIT":"sshd.service","PRIORITY":"6","_PID":"1234","MESSAGE":"Accepted password","_HOSTNAME":"host"}"#,
        "\n",
        r#"{"__CURSOR":"s=b","__REALTIME_TIMESTAMP":"1612345679000000","_SYSTEMD_UNIT":"cron.service","PRIORITY":"5","MESSAGE":"job done"}"#,
        "\n",
        r#"{"__CURSOR":"s=c","__REALTIME_TIMESTAMP":"1612345680000000","_SYSTEMD_UNIT":"sshd.service","PRIORITY":"3","MESSAGE":[72,73]}"#,
        "\n",
    );

    // Parses `input` with the journal parser, panicking on any error.
    fn parse_ok(input: &str) -> Vec<Column> {
        let parsed = JournalParser.parse("-", input.as_bytes());
        parsed.unwrap()
    }

    // Looks up a column by name, panicking with the name when it is absent.
    fn column<'a>(table: &'a [Column], name: &str) -> &'a Column {
        match table.iter().find(|c| c.name == name) {
            Some(found) => found,
            None => panic!("missing column {name}"),
        }
    }

    #[test]
    fn numeric_string_fields_are_coerced() {
        let table = parse_ok(JOURNAL);

        let realtime = column(&table, "__REALTIME_TIMESTAMP");
        assert_eq!(
            realtime.ty,
            ColType::Int,
            "timestamp coerced for cadence/cusum"
        );
        assert_eq!(realtime.cells[0], Value::Int(1_612_345_678_000_000));

        let priority = column(&table, "PRIORITY");
        assert_eq!(priority.ty, ColType::Int);
        assert_eq!(priority.cells[0], Value::Int(6));
        assert_eq!(priority.cells[2], Value::Int(3));

        let pid = column(&table, "_PID");
        assert_eq!(pid.cells[0], Value::Int(1234));
    }

    #[test]
    fn unit_and_message_stay_strings() {
        let table = parse_ok(JOURNAL);

        let unit = column(&table, "_SYSTEMD_UNIT");
        assert_eq!(unit.ty, ColType::Str, "unit stays categorical for dist");
        assert_eq!(unit.cells[0], Value::Str("sshd.service".into()));

        let message = column(&table, "MESSAGE");
        assert_eq!(message.cells[0], Value::Str("Accepted password".into()));
    }

    #[test]
    fn missing_fields_pad_with_null() {
        let table = parse_ok(JOURNAL);
        // Both fields appear only in the first of the three entries.
        assert_eq!(column(&table, "_PID").null_count(), 2);
        assert_eq!(column(&table, "_HOSTNAME").null_count(), 2);
    }

    #[test]
    fn array_valued_field_is_canonical_json() {
        let table = parse_ok(JOURNAL);
        let message = column(&table, "MESSAGE");
        assert_eq!(message.cells[2], Value::Str("[72,73]".into()));
    }

    #[test]
    fn malformed_entries_error() {
        // First input is not JSON at all; second is JSON but not an object.
        let bad_inputs: [&[u8]; 2] = [b"not json\n", b"[1,2,3]\n"];
        for input in bad_inputs.iter() {
            let outcome = JournalParser.parse("-", input);
            assert!(matches!(outcome, Err(AxError::Parse { .. })));
        }
    }

    #[test]
    fn sniff_keys_on_journald_signature() {
        // Inputs whose first entry carries at least one signature key.
        let journal_like: [&[u8]; 2] = [
            JOURNAL.as_bytes(),
            br#"{"__CURSOR":"x","MESSAGE":"y"}"#,
        ];
        for input in journal_like.iter() {
            assert_eq!(JournalParser.sniff(input), Some(STRONG));
        }

        // Plain NDJSON, a lone JSON object, and CSV must not be claimed.
        let not_journal: [&[u8]; 3] = [
            b"{\"a\":1}\n{\"a\":2}\n",
            br#"{"foo":1}"#,
            b"a,b,c\n1,2,3",
        ];
        for input in not_journal.iter() {
            assert_eq!(JournalParser.sniff(input), None);
        }
    }

    #[test]
    fn claims_no_extension() {
        let claimed = JournalParser.extensions();
        assert!(claimed.is_empty());
    }

    #[test]
    fn resolves_journal_over_ndjson_by_content() {
        let registry = crate::parser::ParserRegistry::default();

        let for_journal = registry.resolve("-", JOURNAL.as_bytes()).unwrap();
        assert_eq!(for_journal.id(), "journal");

        let for_ndjson = registry
            .resolve("-", b"{\"a\":1}\n{\"a\":2}\n")
            .unwrap();
        assert_eq!(for_ndjson.id(), "ndjson");
    }
}