anomalyx_normalize/parsers/
zeek.rs1use crate::infer;
11use crate::parser::{Confidence, FormatParser, STRONG};
12use ax_core::{AxError, Column, Value};
13
/// Stateless parser for Zeek's `#`-headed, separator-delimited log files.
///
/// The type carries no data; all behavior lives in its `FormatParser` impl.
#[derive(Clone, Debug, Default)]
pub struct ZeekParser;
16
/// Decodes the argument of a `#separator` directive into a single character.
///
/// A token of the form `\xNN` is read as a hexadecimal byte value and turned
/// into the character with that code point. Any other token yields its first
/// character.
///
/// Returns `None` when the token is empty or when the text after `\x` is not
/// a valid hexadecimal `u8`.
fn decode_separator(token: &str) -> Option<char> {
    if let Some(digits) = token.strip_prefix("\\x") {
        // An escape that fails to parse is rejected outright rather than
        // falling back to the literal backslash.
        let byte = u8::from_str_radix(digits, 16).ok()?;
        return Some(char::from(byte));
    }
    token.chars().next()
}
25
26impl ZeekParser {
27 fn err(&self, msg: impl std::fmt::Display) -> AxError {
28 AxError::Parse {
29 format: self.id().to_string(),
30 message: msg.to_string(),
31 }
32 }
33}
34
35impl FormatParser for ZeekParser {
36 fn id(&self) -> &'static str {
37 "zeek"
38 }
39 fn extensions(&self) -> &'static [&'static str] {
40 &[]
43 }
44 fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
45 let text = std::str::from_utf8(bytes).ok()?;
46 text.trim_start()
47 .starts_with("#separator")
48 .then_some(STRONG)
49 }
50 fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
51 let text = std::str::from_utf8(bytes).map_err(|e| self.err(e))?;
52 let mut sep = '\t';
53 let mut unset = "-".to_string();
54 let mut empty = "(empty)".to_string();
55 let mut fields: Option<Vec<String>> = None;
56 let mut cols: Vec<Vec<Value>> = Vec::new();
57
58 for line in text.lines() {
59 if line.is_empty() {
60 continue;
61 }
62 if let Some(rest) = line.strip_prefix('#') {
63 if let Some(val) = rest.strip_prefix("separator") {
66 if let Some(c) = decode_separator(val.trim()) {
67 sep = c;
68 }
69 continue;
70 }
71 let mut parts = rest.split(sep);
72 match parts.next().unwrap_or("") {
73 "fields" => {
74 let names: Vec<String> = parts.map(str::to_string).collect();
75 cols = vec![Vec::new(); names.len()];
76 fields = Some(names);
77 }
78 "unset_field" => {
79 if let Some(v) = parts.next() {
80 unset = v.to_string();
81 }
82 }
83 "empty_field" => {
84 if let Some(v) = parts.next() {
85 empty = v.to_string();
86 }
87 }
88 _ => {} }
90 continue;
91 }
92
93 if fields.is_none() {
95 return Err(self.err("data row before #fields header"));
96 }
97 let mut values = line.split(sep);
98 for col in cols.iter_mut() {
99 col.push(match values.next() {
100 Some(v) if v == unset => Value::Null,
101 Some(v) if v == empty => Value::Str(String::new()),
102 Some(v) => infer::infer_scalar(v),
103 None => Value::Null,
104 });
105 }
106 }
107
108 let names = fields.ok_or_else(|| self.err("missing #fields header"))?;
109 Ok(names
110 .into_iter()
111 .zip(cols)
112 .map(|(name, cells)| Column::new(name, cells))
113 .collect())
114 }
115}
116
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    /// Small `conn.log` sample: full header block, two data rows, and a
    /// trailing `#close` line.
    const CONN: &str = "#separator \\x09\n\
#set_separator\t,\n\
#empty_field\t(empty)\n\
#unset_field\t-\n\
#path\tconn\n\
#fields\tts\tuid\tid.orig_h\tid.orig_p\tproto\tservice\tduration\torig_bytes\n\
#types\ttime\tstring\taddr\tport\tenum\tstring\tinterval\tcount\n\
1300475167.096535\tCwx1\t192.168.1.1\t80\ttcp\thttp\t0.512\t1024\n\
1300475168.000000\tCwy2\t10.0.0.2\t443\ttcp\t-\t-\t-\n\
#close\t2011-03-18-19-06-08\n";

    /// Parses `input` as `conn.log`, panicking on any parse error.
    fn parse_ok(input: &str) -> Vec<Column> {
        let parsed = ZeekParser.parse("conn.log", input.as_bytes());
        parsed.unwrap()
    }

    /// Looks a column up by name, panicking when it is absent.
    fn column<'a>(columns: &'a [Column], wanted: &str) -> &'a Column {
        columns
            .iter()
            .find(|c| c.name.as_str() == wanted)
            .unwrap()
    }

    #[test]
    fn separator_directive_decodes() {
        let cases = [
            ("\\x09", Some('\t')),
            ("\\x2c", Some(',')),
            (",", Some(',')),
            ("", None),
        ];
        for (token, expected) in cases {
            assert_eq!(decode_separator(token), expected);
        }
    }

    #[test]
    fn parses_fields_and_typed_columns() {
        let columns = parse_ok(CONN);

        let expected = [
            "ts",
            "uid",
            "id.orig_h",
            "id.orig_p",
            "proto",
            "service",
            "duration",
            "orig_bytes",
        ];
        let actual: Vec<&str> = columns.iter().map(|c| c.name.as_str()).collect();
        assert_eq!(actual, expected);

        let ts = &columns[0];
        assert_eq!(ts.len(), 2, "two data rows");
        assert_eq!(ts.ty, ColType::Float, "ts is epoch float");
        assert_eq!(columns[3].ty, ColType::Int, "ports are ints");
        assert_eq!(columns[1].ty, ColType::Str, "uid is string");
    }

    #[test]
    fn unset_token_maps_to_null() {
        let columns = parse_ok(CONN);

        let service = column(&columns, "service");
        assert_eq!(service.null_count(), 1);
        assert_eq!(service.cells[0], Value::Str("http".into()));
        assert_eq!(service.cells[1], Value::Null);

        let duration = column(&columns, "duration");
        assert_eq!(duration.null_count(), 1);
    }

    #[test]
    fn respects_a_custom_unset_field() {
        let input = "#separator \\x09\n#unset_field\tNULL\n#fields\ta\tb\n1\tNULL\n";
        let columns = ZeekParser.parse("-", input.as_bytes()).unwrap();
        let b = &columns[1];
        assert_eq!(b.cells[0], Value::Null);
    }

    #[test]
    fn respects_a_custom_empty_field() {
        let input = "#separator \\x09\n#empty_field\tEMPTY\n#unset_field\t-\n#fields\ta\tb\n1\tEMPTY\n2\t-\n";
        let columns = ZeekParser.parse("-", input.as_bytes()).unwrap();
        let b = &columns[1];
        assert_eq!(
            b.cells[0],
            Value::Str(String::new()),
            "empty token → empty string"
        );
        assert_eq!(b.cells[1], Value::Null, "unset token → null");
    }

    #[test]
    fn sniff_recognizes_zeek_header_only() {
        let zeek = ZeekParser.sniff(CONN.as_bytes());
        assert_eq!(zeek, Some(STRONG));

        let csv = ZeekParser.sniff(b"a,b\n1,2");
        assert_eq!(csv, None);

        let headerless_tsv = ZeekParser.sniff(b"ts\tuid\n1\tx");
        assert_eq!(headerless_tsv, None);
    }

    #[test]
    fn claims_no_extension() {
        let claimed = ZeekParser.extensions();
        assert!(claimed.is_empty());
    }

    #[test]
    fn data_before_fields_errors() {
        let outcome = ZeekParser.parse("-", b"1\t2\t3\n");
        assert!(matches!(outcome, Err(AxError::Parse { .. })));
    }

    #[test]
    fn header_without_fields_errors() {
        let outcome = ZeekParser.parse("-", b"#separator \\x09\n#path\tconn\n");
        assert!(matches!(outcome, Err(AxError::Parse { .. })));
    }

    #[test]
    fn resolves_by_content_not_extension() {
        let registry = crate::parser::ParserRegistry::default();

        // Same `.log` extension, different content: the content decides.
        let for_zeek = registry.resolve("conn.log", CONN.as_bytes()).unwrap();
        assert_eq!(for_zeek.id(), "zeek");

        let for_csv = registry.resolve("app.log", b"a,b\n1,2").unwrap();
        assert_eq!(for_csv.id(), "csv");
    }
}