Skip to main content

anomalyx_normalize/parsers/
zeek.rs

1//! Zeek (Bro) TSV log parser — `conn.log`, `dns.log`, and the rest of the family.
2//!
3//! Zeek logs are tab-separated with a `#`-prefixed metadata header that declares
4//! the field separator (`#separator`), the column names (`#fields`), and the
5//! token used for unset values (`#unset_field`, default `-`). We honor those
6//! directives and map the unset token to `Null`. Detected by its unmistakable
7//! `#separator` header, so it claims no file extension (Zeek logs are generically
8//! named `*.log`, which we don't want to hijack).
9
10use crate::infer;
11use crate::parser::{Confidence, FormatParser, STRONG};
12use ax_core::{AxError, Column, Value};
13
14#[derive(Debug, Default, Clone)]
15pub struct ZeekParser;
16
17/// Decodes a Zeek `#separator` value such as `\x09` into its character. A plain
18/// character (or unescaped byte) is returned as-is.
19fn decode_separator(token: &str) -> Option<char> {
20    match token.strip_prefix("\\x") {
21        Some(hex) => u8::from_str_radix(hex, 16).ok().map(|b| b as char),
22        None => token.chars().next(),
23    }
24}
25
26impl ZeekParser {
27    fn err(&self, msg: impl std::fmt::Display) -> AxError {
28        AxError::Parse {
29            format: self.id().to_string(),
30            message: msg.to_string(),
31        }
32    }
33}
34
35impl FormatParser for ZeekParser {
36    fn id(&self) -> &'static str {
37        "zeek"
38    }
39    fn extensions(&self) -> &'static [&'static str] {
40        // No extension: Zeek logs are `*.log`, too generic to claim. Detection
41        // is by the `#separator` header (see `sniff`).
42        &[]
43    }
44    fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
45        let text = std::str::from_utf8(bytes).ok()?;
46        text.trim_start()
47            .starts_with("#separator")
48            .then_some(STRONG)
49    }
50    fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
51        let text = std::str::from_utf8(bytes).map_err(|e| self.err(e))?;
52        let mut sep = '\t';
53        let mut unset = "-".to_string();
54        let mut empty = "(empty)".to_string();
55        let mut fields: Option<Vec<String>> = None;
56        let mut cols: Vec<Vec<Value>> = Vec::new();
57
58        for line in text.lines() {
59            if line.is_empty() {
60                continue;
61            }
62            if let Some(rest) = line.strip_prefix('#') {
63                // `#separator` is space-delimited and comes first; it sets the
64                // separator the rest of the header and the data rows use.
65                if let Some(val) = rest.strip_prefix("separator") {
66                    if let Some(c) = decode_separator(val.trim()) {
67                        sep = c;
68                    }
69                    continue;
70                }
71                let mut parts = rest.split(sep);
72                match parts.next().unwrap_or("") {
73                    "fields" => {
74                        let names: Vec<String> = parts.map(str::to_string).collect();
75                        cols = vec![Vec::new(); names.len()];
76                        fields = Some(names);
77                    }
78                    "unset_field" => {
79                        if let Some(v) = parts.next() {
80                            unset = v.to_string();
81                        }
82                    }
83                    "empty_field" => {
84                        if let Some(v) = parts.next() {
85                            empty = v.to_string();
86                        }
87                    }
88                    _ => {} // set_separator, types, path, open, close — ignored
89                }
90                continue;
91            }
92
93            // A data row. It must come after the `#fields` header.
94            if fields.is_none() {
95                return Err(self.err("data row before #fields header"));
96            }
97            let mut values = line.split(sep);
98            for col in cols.iter_mut() {
99                col.push(match values.next() {
100                    Some(v) if v == unset => Value::Null,
101                    Some(v) if v == empty => Value::Str(String::new()),
102                    Some(v) => infer::infer_scalar(v),
103                    None => Value::Null,
104                });
105            }
106        }
107
108        let names = fields.ok_or_else(|| self.err("missing #fields header"))?;
109        Ok(names
110            .into_iter()
111            .zip(cols)
112            .map(|(name, cells)| Column::new(name, cells))
113            .collect())
114    }
115}
116
117#[cfg(test)]
118mod tests {
119    use super::*;
120    use ax_core::ColType;
121
122    // A realistic conn.log slice: literal `\x09` separator directive, real tabs
123    // elsewhere, and `-` for unset fields (service/duration on the second row).
124    const CONN: &str = "#separator \\x09\n\
125#set_separator\t,\n\
126#empty_field\t(empty)\n\
127#unset_field\t-\n\
128#path\tconn\n\
129#fields\tts\tuid\tid.orig_h\tid.orig_p\tproto\tservice\tduration\torig_bytes\n\
130#types\ttime\tstring\taddr\tport\tenum\tstring\tinterval\tcount\n\
1311300475167.096535\tCwx1\t192.168.1.1\t80\ttcp\thttp\t0.512\t1024\n\
1321300475168.000000\tCwy2\t10.0.0.2\t443\ttcp\t-\t-\t-\n\
133#close\t2011-03-18-19-06-08\n";
134
135    fn parse(s: &str) -> Vec<Column> {
136        ZeekParser.parse("conn.log", s.as_bytes()).unwrap()
137    }
138
139    #[test]
140    fn separator_directive_decodes() {
141        assert_eq!(decode_separator("\\x09"), Some('\t'));
142        assert_eq!(decode_separator("\\x2c"), Some(','));
143        assert_eq!(decode_separator(","), Some(','));
144        assert_eq!(decode_separator(""), None);
145    }
146
147    #[test]
148    fn parses_fields_and_typed_columns() {
149        let cols = parse(CONN);
150        let names: Vec<&str> = cols.iter().map(|c| c.name.as_str()).collect();
151        assert_eq!(
152            names,
153            vec![
154                "ts",
155                "uid",
156                "id.orig_h",
157                "id.orig_p",
158                "proto",
159                "service",
160                "duration",
161                "orig_bytes"
162            ]
163        );
164        assert_eq!(cols[0].len(), 2, "two data rows");
165        assert_eq!(cols[0].ty, ColType::Float, "ts is epoch float");
166        assert_eq!(cols[3].ty, ColType::Int, "ports are ints");
167        assert_eq!(cols[1].ty, ColType::Str, "uid is string");
168    }
169
170    #[test]
171    fn unset_token_maps_to_null() {
172        let cols = parse(CONN);
173        let service = cols.iter().find(|c| c.name == "service").unwrap();
174        // row 0 "http", row 1 "-" (unset) → exactly one null
175        assert_eq!(service.null_count(), 1);
176        assert_eq!(service.cells[0], Value::Str("http".into()));
177        assert_eq!(service.cells[1], Value::Null);
178        // the comment header lines are not data
179        assert_eq!(
180            cols.iter()
181                .find(|c| c.name == "duration")
182                .unwrap()
183                .null_count(),
184            1
185        );
186    }
187
188    #[test]
189    fn respects_a_custom_unset_field() {
190        let s = "#separator \\x09\n#unset_field\tNULL\n#fields\ta\tb\n1\tNULL\n";
191        let cols = ZeekParser.parse("-", s.as_bytes()).unwrap();
192        assert_eq!(cols[1].cells[0], Value::Null);
193    }
194
195    #[test]
196    fn respects_a_custom_empty_field() {
197        // A non-default empty token maps to an empty string (distinct from unset
198        // → Null). Uses a custom token so the directive is actually observed.
199        let s = "#separator \\x09\n#empty_field\tEMPTY\n#unset_field\t-\n#fields\ta\tb\n1\tEMPTY\n2\t-\n";
200        let cols = ZeekParser.parse("-", s.as_bytes()).unwrap();
201        assert_eq!(
202            cols[1].cells[0],
203            Value::Str(String::new()),
204            "empty token → empty string"
205        );
206        assert_eq!(cols[1].cells[1], Value::Null, "unset token → null");
207    }
208
209    #[test]
210    fn sniff_recognizes_zeek_header_only() {
211        assert_eq!(ZeekParser.sniff(CONN.as_bytes()), Some(STRONG));
212        assert_eq!(ZeekParser.sniff(b"a,b\n1,2"), None);
213        assert_eq!(ZeekParser.sniff(b"ts\tuid\n1\tx"), None); // plain TSV is not Zeek
214    }
215
216    #[test]
217    fn claims_no_extension() {
218        assert!(ZeekParser.extensions().is_empty());
219    }
220
221    #[test]
222    fn data_before_fields_errors() {
223        assert!(matches!(
224            ZeekParser.parse("-", b"1\t2\t3\n"),
225            Err(AxError::Parse { .. })
226        ));
227    }
228
229    #[test]
230    fn header_without_fields_errors() {
231        assert!(matches!(
232            ZeekParser.parse("-", b"#separator \\x09\n#path\tconn\n"),
233            Err(AxError::Parse { .. })
234        ));
235    }
236
237    #[test]
238    fn resolves_by_content_not_extension() {
239        let reg = crate::parser::ParserRegistry::default();
240        // a Zeek body wins by sniff even though `.log` claims no parser
241        assert_eq!(
242            reg.resolve("conn.log", CONN.as_bytes()).unwrap().id(),
243            "zeek"
244        );
245        // a non-Zeek `.log` is NOT hijacked by the zeek parser
246        assert_eq!(reg.resolve("app.log", b"a,b\n1,2").unwrap().id(), "csv");
247    }
248}