Skip to main content

anomalyx_normalize/parsers/
syslog.rs

1//! Syslog parser — RFC 3164 (BSD) and RFC 5424 wire messages.
2//!
3//! Both variants begin with a `<PRI>` priority header (`PRI = facility*8 +
4//! severity`, 0–191). We derive the numeric `facility`/`severity` ourselves from
5//! that header (clean and deterministic) and delegate the harder dual-RFC field
6//! parsing — BSD vs ISO timestamps, app/host/proc IDs, RFC 5424 structured data
7//! — to `syslog_loose`. One row per message, with columns the detectors want:
8//! `severity`/`facility` for event-rate `dist` drift, `hostname` for rare-host
9//! `structural`/`dist`, and rows-as-an-ordered-series for off-hours `contextual`
10//! (`--period 24`).
11//!
12//! Determinism: `syslog_loose`'s default entry point fills a year-less RFC 3164
13//! timestamp from the wall clock and the local time zone. We instead pin a fixed
14//! year and UTC, so the same bytes always normalize identically (the real
15//! month/day/time are preserved; only the absent RFC 3164 year is a sentinel).
16//!
17//! Detected by the `<PRI>` header; claims `.syslog` (a plain `.log` is too
18//! generic). A line without a valid `<PRI>` is a clean parse error.
19
20use crate::parser::{Confidence, FormatParser, STRONG};
21use crate::table::TableBuilder;
22use ax_core::{AxError, Column, Value};
23use chrono::Utc;
24use std::collections::BTreeMap;
25use syslog_loose::{parse_message_with_year_tz, ProcId, Protocol, Variant};
26
/// Stateless parser for RFC 3164 (BSD) and RFC 5424 syslog text.
///
/// Carries no configuration; all behavior lives in its `FormatParser` impl.
#[derive(Clone, Debug, Default)]
pub struct SyslogParser;
29
/// Placeholder year substituted for RFC 3164 timestamps, which carry month, day
/// and time on the wire but never a year. Fixing it here keeps parsing
/// reproducible; only the year component of such a timestamp is synthetic.
///
/// Note: 1970 is not a leap year, so `Feb 29` is not a valid calendar date in it.
const SENTINEL_YEAR: i32 = 1970;
33
/// Parses the leading `<PRI>` header into `(facility, severity)`.
///
/// `PRI = facility * 8 + severity` and is written as one to three ASCII decimal
/// digits between `<` and `>`, with a value of 0–191.
///
/// Returns `None` when:
/// - the line does not start with `<`,
/// - there is no closing `>`,
/// - the text between the brackets is not 1–3 plain ASCII digits, or
/// - the numeric value exceeds 191.
///
/// The digits are validated explicitly rather than relying on `str::parse`
/// alone, because integer `FromStr` also accepts a leading `+` (so `<+34>` would
/// otherwise be taken for a priority) and any number of leading zeros. Leading
/// zeros that still fit in three digits (e.g. `<034>`) remain accepted.
fn parse_pri(line: &str) -> Option<(i64, i64)> {
    /// Highest valid priority: facility 23, severity 7.
    const MAX_PRI: u16 = 191;

    let (digits, _rest) = line.strip_prefix('<')?.split_once('>')?;
    let well_formed =
        (1..=3).contains(&digits.len()) && digits.bytes().all(|b| b.is_ascii_digit());
    if !well_formed {
        return None;
    }
    let pri: u16 = digits.parse().ok()?;
    (pri <= MAX_PRI).then_some((i64::from(pri / 8), i64::from(pri % 8)))
}
42
43impl SyslogParser {
44    fn err(&self, msg: impl std::fmt::Display) -> AxError {
45        AxError::Parse {
46            format: self.id().to_string(),
47            message: msg.to_string(),
48        }
49    }
50}
51
52impl FormatParser for SyslogParser {
53    fn id(&self) -> &'static str {
54        "syslog"
55    }
56    fn extensions(&self) -> &'static [&'static str] {
57        &["syslog"]
58    }
59    fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
60        let text = std::str::from_utf8(bytes).ok()?;
61        let line = text.lines().find(|l| !l.trim().is_empty())?;
62        parse_pri(line).map(|_| STRONG)
63    }
64    fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
65        let text = std::str::from_utf8(bytes).map_err(|e| self.err(e))?;
66        let mut builder = TableBuilder::new();
67        for line in text.lines() {
68            if line.trim().is_empty() {
69                continue;
70            }
71            let (facility, severity) = parse_pri(line)
72                .ok_or_else(|| self.err("not a syslog line: missing or invalid <PRI> header"))?;
73            let msg =
74                parse_message_with_year_tz(line, |_| SENTINEL_YEAR, Some(Utc), Variant::Either);
75
76            let mut row: BTreeMap<String, Value> = BTreeMap::new();
77            row.insert("facility".into(), Value::Int(facility));
78            row.insert("severity".into(), Value::Int(severity));
79            row.insert(
80                "protocol".into(),
81                Value::Str(
82                    match msg.protocol {
83                        Protocol::RFC3164 => "RFC3164",
84                        Protocol::RFC5424(_) => "RFC5424",
85                    }
86                    .to_string(),
87                ),
88            );
89            if let Some(ts) = msg.timestamp {
90                row.insert("timestamp".into(), Value::Str(ts.to_string()));
91            }
92            if let Some(host) = msg.hostname {
93                row.insert("hostname".into(), Value::Str(host.to_string()));
94            }
95            if let Some(app) = msg.appname {
96                row.insert("appname".into(), Value::Str(app.to_string()));
97            }
98            if let Some(procid) = msg.procid {
99                let v = match procid {
100                    ProcId::PID(pid) => Value::Int(pid as i64),
101                    ProcId::Name(name) => Value::Str(name.to_string()),
102                };
103                row.insert("procid".into(), v);
104            }
105            if let Some(msgid) = msg.msgid {
106                row.insert("msgid".into(), Value::Str(msgid.to_string()));
107            }
108            for element in &msg.structured_data {
109                for (key, value) in &element.params {
110                    row.insert(
111                        format!("sd.{}.{}", element.id, key),
112                        Value::Str(value.to_string()),
113                    );
114                }
115            }
116            row.insert("message".into(), Value::Str(msg.msg.to_string()));
117            builder.push_row(row);
118        }
119        Ok(builder.finish())
120    }
121}
122
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    /// Fixture: one RFC 5424 line followed by one RFC 3164 line.
    const SYSLOG: &str = concat!(
        r#"<165>1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog 1234 ID47 [exampleSDID@32473 iut="3" eventID="1011"] App event log entry"#,
        "\n",
        "<34>Oct 11 22:14:15 mymachine su[567]: 'su root' failed for lonvick\n",
    );

    /// Parses `input` with the syslog parser, panicking on any error.
    fn parse(input: &str) -> Vec<Column> {
        SyslogParser.parse("-", input.as_bytes()).unwrap()
    }

    /// Looks up a column by name, panicking if it is absent.
    fn col<'a>(cols: &'a [Column], name: &str) -> &'a Column {
        match cols.iter().find(|c| c.name == name) {
            Some(column) => column,
            None => panic!("missing column {name}"),
        }
    }

    /// Shorthand for a string cell.
    fn text(s: &str) -> Value {
        Value::Str(s.to_string())
    }

    /// Shorthand for a run of string cells.
    fn texts(items: &[&str]) -> Vec<Value> {
        items.iter().map(|s| text(s)).collect()
    }

    /// Shorthand for a run of integer cells.
    fn ints(items: &[i64]) -> Vec<Value> {
        items.iter().copied().map(Value::Int).collect()
    }

    #[test]
    fn priority_decodes_to_facility_and_severity() {
        let cols = parse(SYSLOG);
        let (fac, sev) = (col(&cols, "facility"), col(&cols, "severity"));
        assert_eq!(fac.ty, ColType::Int);
        assert_eq!(sev.ty, ColType::Int);
        // PRI 165 and 34: facility is PRI / 8, severity is PRI % 8.
        assert_eq!(fac.cells, ints(&[20, 4]));
        assert_eq!(sev.cells, ints(&[5, 2]));
    }

    #[test]
    fn both_rfc_variants_parse_their_fields() {
        let cols = parse(SYSLOG);
        assert_eq!(col(&cols, "protocol").cells, texts(&["RFC5424", "RFC3164"]));
        assert_eq!(
            col(&cols, "hostname").cells,
            texts(&["mymachine.example.com", "mymachine"])
        );
        assert_eq!(col(&cols, "appname").cells, texts(&["evntslog", "su"]));
        assert_eq!(col(&cols, "procid").cells, ints(&[1234, 567]));
    }

    #[test]
    fn rfc5424_only_fields_pad_with_null() {
        let cols = parse(SYSLOG);
        // Only the RFC 5424 row (index 0) has a msgid and structured data.
        let msgid = col(&cols, "msgid");
        assert_eq!(msgid.cells[0], text("ID47"));
        assert_eq!(msgid.cells[1], Value::Null);

        let iut = col(&cols, "sd.exampleSDID@32473.iut");
        assert_eq!(iut.cells[0], text("3"));
        assert_eq!(iut.cells[1], Value::Null);

        let event_id = col(&cols, "sd.exampleSDID@32473.eventID");
        assert_eq!(event_id.cells[0], text("1011"));
    }

    #[test]
    fn message_body_is_captured() {
        let cols = parse(SYSLOG);
        let body = col(&cols, "message");
        assert_eq!(body.cells[0], text("App event log entry"));
        assert_eq!(body.cells[1], text("'su root' failed for lonvick"));
    }

    #[test]
    fn deterministic_across_calls() {
        // Two parses of the same bytes must render identically, even though the
        // RFC 3164 line has no year on the wire.
        let first = format!("{:?}", parse(SYSLOG));
        let second = format!("{:?}", parse(SYSLOG));
        assert_eq!(first, second);

        // The RFC 3164 row's timestamp carries the pinned sentinel year.
        let cols = parse(SYSLOG);
        let stamps = col(&cols, "timestamp");
        match &stamps.cells[1] {
            Value::Str(s) => assert!(s.starts_with("1970-"), "sentinel year, got {s}"),
            other => panic!("expected Str timestamp, got {other:?}"),
        }
    }

    #[test]
    fn parse_pri_units() {
        let cases: [(&str, Option<(i64, i64)>); 8] = [
            ("<0>x", Some((0, 0))),
            ("<34>x", Some((4, 2))),
            ("<165>x", Some((20, 5))),
            ("<191>x", Some((23, 7))), // max valid
            ("<192>x", None),          // out of range
            ("<abc>x", None),          // not a number
            ("<34", None),             // unterminated
            ("no bracket", None),
        ];
        for (line, expected) in cases.iter() {
            assert_eq!(parse_pri(line), *expected);
        }
    }

    #[test]
    fn malformed_lines_error() {
        let rejected: [&[u8]; 2] = [b"this is not syslog\n", b"<192>priority out of range\n"];
        for &bytes in rejected.iter() {
            assert!(matches!(
                SyslogParser.parse("-", bytes),
                Err(AxError::Parse { .. })
            ));
        }
    }

    #[test]
    fn sniff_keys_on_pri_header() {
        let accepted: [&[u8]; 2] = [SYSLOG.as_bytes(), b"<13>Feb  5 17:32:18 host app: msg\n"];
        for &bytes in accepted.iter() {
            assert_eq!(SyslogParser.sniff(bytes), Some(STRONG));
        }

        let rejected: [&[u8]; 5] = [
            b"<999>bad pri\n",           // > 191
            b"<?xml version=\"1.0\"?>", // XML, not PRI
            b"plain text line\n",
            b"{\"a\":1}",
            b"a,b,c\n1,2,3",
        ];
        for &bytes in rejected.iter() {
            assert_eq!(SyslogParser.sniff(bytes), None);
        }
    }

    #[test]
    fn claims_syslog_extension() {
        let claimed = SyslogParser.extensions();
        assert_eq!(claimed, &["syslog"]);
    }

    #[test]
    fn resolves_by_extension_and_content() {
        let reg = crate::parser::ParserRegistry::default();

        let by_extension = reg
            .resolve("app.syslog", b"<34>Oct 11 22:14:15 h a: m")
            .unwrap();
        assert_eq!(by_extension.id(), "syslog");

        let by_content = reg.resolve("-", SYSLOG.as_bytes()).unwrap();
        assert_eq!(by_content.id(), "syslog");
    }
}