Skip to main content

anomalyx_normalize/parsers/
journal.rs

1//! systemd journal export parser — `journalctl -o json`.
2//!
3//! The export is NDJSON (one JSON object per log entry), but journald encodes
4//! *every* value as a string — including numeric fields like
5//! `__REALTIME_TIMESTAMP` (microseconds since epoch), `PRIORITY`, and `_PID`.
6//! Generic NDJSON would leave those as `Str`, where the numeric detectors can't
7//! reach them. So this parser type-**coerces** each string scalar (the same
8//! inference CSV uses): `__REALTIME_TIMESTAMP` → `Int` for `--cadence` /
9//! `coll.cusum` event-rate analysis, `PRIORITY` → `Int`, while `_SYSTEMD_UNIT`
10//! stays a `Str` for rare-unit `dist` drift. A field that appears as an array
11//! (multi-value, or a non-UTF-8 byte field) is kept as its canonical JSON string.
12//!
13//! Detected by journald's signature trusted fields (`__REALTIME_TIMESTAMP` /
14//! `__CURSOR` / `_SYSTEMD_UNIT`); claims no extension (the JSON export is
15//! generically `*.json`, and `.journal` is the unrelated binary format).
16
17use crate::infer;
18use crate::parser::{Confidence, FormatParser, STRONG};
19use crate::table::TableBuilder;
20use ax_core::{AxError, Column, Value};
21use serde_json::Value as J;
22use std::collections::BTreeMap;
23
/// Stateless parser for `journalctl -o json` exports.
///
/// Carries no configuration, so `JournalParser` (or `JournalParser::default()`)
/// is all that is needed to obtain one.
#[derive(Clone, Debug, Default)]
pub struct JournalParser;
26
/// Field names characteristic of journald's JSON export. Finding any one of
/// them in an entry is treated as the journal signature that sets the input
/// apart from generic NDJSON.
const SIGNATURE_KEYS: [&str; 3] = [
    "__REALTIME_TIMESTAMP",
    "__CURSOR",
    "_SYSTEMD_UNIT",
];
30
31fn looks_like_journal(obj: &serde_json::Map<String, J>) -> bool {
32    SIGNATURE_KEYS.iter().any(|k| obj.contains_key(*k))
33}
34
35impl JournalParser {
36    fn err(&self, msg: impl std::fmt::Display) -> AxError {
37        AxError::Parse {
38            format: self.id().to_string(),
39            message: msg.to_string(),
40        }
41    }
42}
43
44impl FormatParser for JournalParser {
45    fn id(&self) -> &'static str {
46        "journal"
47    }
48    fn extensions(&self) -> &'static [&'static str] {
49        &[]
50    }
51    fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
52        let text = std::str::from_utf8(bytes).ok()?;
53        let line = text.lines().find(|l| !l.trim().is_empty())?;
54        let value: J = serde_json::from_str(line).ok()?;
55        value
56            .as_object()
57            .is_some_and(looks_like_journal)
58            .then_some(STRONG)
59    }
60    fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
61        let text = std::str::from_utf8(bytes).map_err(|e| self.err(e))?;
62        let mut builder = TableBuilder::new();
63        for line in text.lines() {
64            if line.trim().is_empty() {
65                continue;
66            }
67            let value: J = serde_json::from_str(line).map_err(|e| self.err(e))?;
68            let obj = value
69                .as_object()
70                .ok_or_else(|| self.err("journal entry is not a JSON object"))?;
71            let mut row: BTreeMap<String, Value> = BTreeMap::new();
72            for (key, val) in obj {
73                // journald stores everything as a string: infer the type so
74                // numeric fields become numeric columns. Arrays/objects keep the
75                // existing JSON lowering (canonical string for non-scalars).
76                let cell = match val {
77                    J::String(s) => infer::infer_scalar(s),
78                    other => infer::json_to_value(other),
79                };
80                row.insert(key.clone(), cell);
81            }
82            builder.push_row(row);
83        }
84        Ok(builder.finish())
85    }
86}
87
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    /// Three-entry fixture: a full entry, a sparse one (no `_PID` /
    /// `_HOSTNAME`), and one whose `MESSAGE` is exported as a byte array.
    const JOURNAL: &str = concat!(
        r#"{"__CURSOR":"s=a","__REALTIME_TIMESTAMP":"1612345678000000","__MONOTONIC_TIMESTAMP":"1000","_SYSTEMD_UNIT":"sshd.service","PRIORITY":"6","_PID":"1234","MESSAGE":"Accepted password","_HOSTNAME":"host"}"#,
        "\n",
        r#"{"__CURSOR":"s=b","__REALTIME_TIMESTAMP":"1612345679000000","_SYSTEMD_UNIT":"cron.service","PRIORITY":"5","MESSAGE":"job done"}"#,
        "\n",
        r#"{"__CURSOR":"s=c","__REALTIME_TIMESTAMP":"1612345680000000","_SYSTEMD_UNIT":"sshd.service","PRIORITY":"3","MESSAGE":[72,73]}"#,
        "\n",
    );

    /// Parses `s` with `JournalParser`, panicking if parsing fails.
    fn parse(s: &str) -> Vec<Column> {
        let parsed = JournalParser.parse("-", s.as_bytes());
        parsed.unwrap()
    }

    /// Looks up a column by name, panicking with the name when it is absent.
    fn col<'a>(cols: &'a [Column], name: &str) -> &'a Column {
        match cols.iter().find(|c| c.name == name) {
            Some(found) => found,
            None => panic!("missing column {name}"),
        }
    }

    #[test]
    fn numeric_string_fields_are_coerced() {
        let table = parse(JOURNAL);

        let timestamps = col(&table, "__REALTIME_TIMESTAMP");
        assert_eq!(
            timestamps.ty,
            ColType::Int,
            "timestamp coerced for cadence/cusum"
        );
        assert_eq!(timestamps.cells[0], Value::Int(1_612_345_678_000_000));

        let priority = col(&table, "PRIORITY");
        assert_eq!(priority.ty, ColType::Int);
        assert_eq!(priority.cells[0], Value::Int(6));
        assert_eq!(priority.cells[2], Value::Int(3));

        let pid = col(&table, "_PID");
        assert_eq!(pid.cells[0], Value::Int(1234));
    }

    #[test]
    fn unit_and_message_stay_strings() {
        let table = parse(JOURNAL);

        let unit = col(&table, "_SYSTEMD_UNIT");
        assert_eq!(unit.ty, ColType::Str, "unit stays categorical for dist");
        assert_eq!(unit.cells[0], Value::Str("sshd.service".into()));

        let message = col(&table, "MESSAGE");
        assert_eq!(message.cells[0], Value::Str("Accepted password".into()));
    }

    #[test]
    fn missing_fields_pad_with_null() {
        let table = parse(JOURNAL);
        // Both fields are present on the first entry only, so the other two
        // rows must have been padded.
        for sparse in ["_PID", "_HOSTNAME"] {
            assert_eq!(col(&table, sparse).null_count(), 2);
        }
    }

    #[test]
    fn array_valued_field_is_canonical_json() {
        // The third entry's MESSAGE is a byte array (how a non-UTF-8 message
        // is exported); it must survive as its JSON text.
        let table = parse(JOURNAL);
        let message = col(&table, "MESSAGE");
        assert_eq!(message.cells[2], Value::Str("[72,73]".into()));
    }

    #[test]
    fn malformed_entries_error() {
        // Not JSON at all.
        let not_json = JournalParser.parse("-", b"not json\n");
        assert!(matches!(not_json, Err(AxError::Parse { .. })));

        // Well-formed JSON, but an array rather than the object every journald
        // entry is.
        let not_object = JournalParser.parse("-", b"[1,2,3]\n");
        assert!(matches!(not_object, Err(AxError::Parse { .. })));
    }

    #[test]
    fn sniff_keys_on_journald_signature() {
        fn sniff(input: &[u8]) -> Option<Confidence> {
            JournalParser.sniff(input)
        }

        assert_eq!(sniff(JOURNAL.as_bytes()), Some(STRONG));
        // One signature key alone suffices (pins any-vs-all).
        assert_eq!(sniff(br#"{"__CURSOR":"x","MESSAGE":"y"}"#), Some(STRONG));
        // NDJSON without any journald field must not be claimed.
        assert_eq!(sniff(b"{\"a\":1}\n{\"a\":2}\n"), None);
        assert_eq!(sniff(br#"{"foo":1}"#), None);
        // Input that is not JSON at all.
        assert_eq!(sniff(b"a,b,c\n1,2,3"), None);
    }

    #[test]
    fn claims_no_extension() {
        let claimed = JournalParser.extensions();
        assert!(claimed.is_empty());
    }

    #[test]
    fn resolves_journal_over_ndjson_by_content() {
        let registry = crate::parser::ParserRegistry::default();

        // Content carrying the journald signature resolves to this parser
        // rather than to generic NDJSON.
        let for_journal = registry.resolve("-", JOURNAL.as_bytes()).unwrap();
        assert_eq!(for_journal.id(), "journal");

        // Signature-free NDJSON keeps resolving to the NDJSON parser.
        let for_generic = registry
            .resolve("-", b"{\"a\":1}\n{\"a\":2}\n")
            .unwrap();
        assert_eq!(for_generic.id(), "ndjson");
    }
}