// anomalyx_normalize/parsers/netflow.rs
use crate::infer;
use crate::parser::{Confidence, FormatParser, STRONG};
use crate::table::TableBuilder;
use ax_core::{AxError, Column, Value};
use std::collections::BTreeMap;
22
/// Parser for nfdump-style CSV flow exports.
///
/// The type has no fields, so every instance is interchangeable.
#[derive(Clone, Debug, Default)]
pub struct NetflowParser;
25
/// Header fields, in order, that the first non-blank line must begin with for
/// the input to be accepted as nfdump CSV.
const SIGNATURE: &[&str] = &["ts", "te", "td", "sa", "da", "sp", "dp", "pr"];
29
/// Maps an nfdump CSV header field to the column name used in the output.
///
/// A field with no entry in the rename table is returned unchanged.
fn canonical(field: &str) -> &str {
    // (header field, output column name) pairs.
    const RENAMES: &[(&str, &str)] = &[
        ("ts", "start"),
        ("te", "end"),
        ("td", "duration"),
        ("sa", "src_addr"),
        ("da", "dst_addr"),
        ("sp", "src_port"),
        ("dp", "dst_port"),
        ("pr", "proto"),
        ("flg", "flags"),
        ("ipkt", "packets"),
        ("ibyt", "bytes"),
        ("opkt", "out_packets"),
        ("obyt", "out_bytes"),
    ];

    RENAMES
        .iter()
        .find(|(raw, _)| *raw == field)
        .map_or(field, |&(_, renamed)| renamed)
}
50
51impl NetflowParser {
52 fn err(&self, msg: impl std::fmt::Display) -> AxError {
53 AxError::Parse {
54 format: self.id().to_string(),
55 message: msg.to_string(),
56 }
57 }
58}
59
60impl FormatParser for NetflowParser {
61 fn id(&self) -> &'static str {
62 "netflow"
63 }
64 fn extensions(&self) -> &'static [&'static str] {
65 &[]
66 }
67 fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
68 let text = std::str::from_utf8(bytes).ok()?;
69 let line = text.lines().find(|l| !l.trim().is_empty())?;
70 let fields: Vec<&str> = line.split(',').collect();
71 fields.starts_with(SIGNATURE).then_some(STRONG)
72 }
73 fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
74 let text = std::str::from_utf8(bytes).map_err(|e| self.err(e))?;
75 let mut lines = text.lines().filter(|l| !l.trim().is_empty());
76
77 let header = lines.next().ok_or_else(|| self.err("empty nfdump CSV"))?;
78 let fields: Vec<&str> = header.split(',').collect();
79 if !fields.starts_with(SIGNATURE) {
80 return Err(self.err("not nfdump CSV: unexpected header"));
81 }
82 let names: Vec<String> = fields.iter().map(|f| canonical(f).to_string()).collect();
83
84 let mut builder = TableBuilder::new();
85 for line in lines {
86 if line.trim() == "Summary" {
88 break;
89 }
90 let mut row: BTreeMap<String, Value> = BTreeMap::new();
91 for (name, value) in names.iter().zip(line.split(',')) {
92 row.insert(name.clone(), infer::infer_scalar(value.trim()));
93 }
94 builder.push_row(row);
95 }
96 Ok(builder.finish())
97 }
98}
99
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    /// Fixture: a header, two flow rows, then the trailing summary section.
    const NFDUMP: &str = "\
ts,te,td,sa,da,sp,dp,pr,flg,ipkt,ibyt
2020-01-01 00:00:00.000,2020-01-01 00:00:01.000,1.000,10.0.0.1,8.8.8.8,12345,53,UDP,......,2,120
2020-01-01 00:00:02.000,2020-01-01 00:00:30.000,28.000,10.0.0.1,5.6.7.8,40000,443,TCP,.AP.SF,5000,9000000
Summary
flows,bytes,packets
2,9000120,5002
";

    /// Parses `input` with `NetflowParser`, panicking if parsing fails.
    fn load(input: &str) -> Vec<Column> {
        let outcome = NetflowParser.parse("-", input.as_bytes());
        outcome.unwrap()
    }

    /// Looks up a column by name, panicking with the name if it is absent.
    fn column<'a>(columns: &'a [Column], name: &str) -> &'a Column {
        match columns.iter().find(|candidate| candidate.name == name) {
            Some(found) => found,
            None => panic!("missing column {name}"),
        }
    }

    #[test]
    fn canonical_renames_the_flow_features() {
        let table = load(NFDUMP);

        let byte_col = column(&table, "bytes");
        assert_eq!(byte_col.ty, ColType::Int);
        assert_eq!(
            byte_col.cells,
            vec![Value::Int(120), Value::Int(9_000_000)]
        );

        let packet_col = column(&table, "packets");
        assert_eq!(packet_col.cells, vec![Value::Int(2), Value::Int(5000)]);

        let duration_col = column(&table, "duration");
        assert_eq!(duration_col.ty, ColType::Float);
        assert_eq!(
            duration_col.cells,
            vec![Value::Float(1.0), Value::Float(28.0)]
        );

        // The raw nfdump names must not survive next to their renamed forms.
        assert!(!table.iter().any(|c| c.name == "ibyt" || c.name == "td"));
    }

    #[test]
    fn addresses_ports_and_proto_are_typed() {
        let table = load(NFDUMP);

        let first_src = &column(&table, "src_addr").cells[0];
        assert_eq!(*first_src, Value::Str("10.0.0.1".into()));

        let second_dst = &column(&table, "dst_addr").cells[1];
        assert_eq!(*second_dst, Value::Str("5.6.7.8".into()));

        let dst_port = column(&table, "dst_port");
        assert_eq!(dst_port.ty, ColType::Int);
        assert_eq!(dst_port.cells, vec![Value::Int(53), Value::Int(443)]);

        let second_proto = &column(&table, "proto").cells[1];
        assert_eq!(*second_proto, Value::Str("TCP".into()));

        let second_flags = &column(&table, "flags").cells[1];
        assert_eq!(*second_flags, Value::Str(".AP.SF".into()));
    }

    #[test]
    fn summary_section_is_not_parsed_as_flows() {
        let table = load(NFDUMP);
        let row_count = column(&table, "bytes").cells.len();
        assert_eq!(row_count, 2);
    }

    #[test]
    fn canonical_units() {
        let expectations = [
            ("ibyt", "bytes"),
            ("ipkt", "packets"),
            ("td", "duration"),
            ("sa", "src_addr"),
            ("dp", "dst_port"),
            ("pr", "proto"),
            ("unknown_field", "unknown_field"),
        ];
        for (raw, expected) in expectations {
            assert_eq!(canonical(raw), expected);
        }
    }

    #[test]
    fn malformed_header_errors() {
        let bad_inputs: [&[u8]; 2] = [b"a,b,c\n1,2,3\n", b""];
        for input in bad_inputs {
            let outcome = NetflowParser.parse("-", input);
            assert!(matches!(outcome, Err(AxError::Parse { .. })));
        }
    }

    #[test]
    fn sniff_keys_on_the_nfdump_header() {
        assert_eq!(NetflowParser.sniff(NFDUMP.as_bytes()), Some(STRONG));

        let rejected: [&[u8]; 3] = [
            b"ts,te,td,sa\n1,2,3,4",
            b"a,b,c\n1,2,3",
            b"{\"a\":1}",
        ];
        for input in rejected {
            assert_eq!(NetflowParser.sniff(input), None);
        }
    }

    #[test]
    fn claims_no_extension() {
        let claimed = NetflowParser.extensions();
        assert!(claimed.is_empty());
    }

    #[test]
    fn resolves_netflow_over_csv_by_content() {
        let registry = crate::parser::ParserRegistry::default();

        let for_nfdump = registry.resolve("-", NFDUMP.as_bytes()).unwrap().id();
        assert_eq!(for_nfdump, "netflow");

        let for_plain_csv = registry.resolve("-", b"a,b,c\n1,2,3").unwrap().id();
        assert_eq!(for_plain_csv, "csv");
    }
}