// anomalyx_normalize/parsers/netflow.rs
//! NetFlow / IPFIX parser — flow records via the nfdump CSV export.
//!
//! Raw NetFlow v9 / IPFIX is a stateful binary wire format (templates arrive in
//! separate packets), which works against both determinism and reliable file
//! detection. The canonical *analyzable* representation is `nfdump -o csv`, whose
//! header is an unmistakable signature. We parse that, renaming nfdump's cryptic
//! short names to canonical columns — `ibyt`→`bytes`, `ipkt`→`packets`,
//! `td`→`duration`, `sa`→`src_addr`, `dp`→`dst_port`, … — so the flow features
//! the detectors want are directly usable: `mv.mahalanobis` over
//! `(bytes, packets, duration)` catches exfiltration where each axis on its own
//! looks normal, and `dst_port` feeds rare-port `dist` drift. The trailing
//! `Summary` section is not a flow record and is skipped.
//!
//! Detected by the nfdump header signature. Claims no file extension, because
//! nfdump CSV is typically saved under generic `*.csv`/`*.txt` names.
16
17use crate::infer;
18use crate::parser::{Confidence, FormatParser, STRONG};
19use crate::table::TableBuilder;
20use ax_core::{AxError, Column, Value};
21use std::collections::BTreeMap;
22
/// Parser for nfdump `-o csv` flow exports (NetFlow v9 / IPFIX records).
///
/// A fieldless unit type: all behavior lives in its [`FormatParser`] impl.
#[derive(Clone, Debug, Default)]
pub struct NetflowParser;
25
26/// The first fields of an nfdump CSV header, in their canonical order — the
27/// signature that distinguishes a flow export from any other CSV.
28const SIGNATURE: &[&str] = &["ts", "te", "td", "sa", "da", "sp", "dp", "pr"];
29
/// Translates an nfdump short field name into its canonical column name.
///
/// Names without an entry in the rename table are returned unchanged, so
/// fields this parser does not know about still surface under their nfdump name.
fn canonical(field: &str) -> &str {
    // (nfdump short name, canonical column name)
    const RENAMES: &[(&str, &str)] = &[
        ("ts", "start"),
        ("te", "end"),
        ("td", "duration"),
        ("sa", "src_addr"),
        ("da", "dst_addr"),
        ("sp", "src_port"),
        ("dp", "dst_port"),
        ("pr", "proto"),
        ("flg", "flags"),
        ("ipkt", "packets"),
        ("ibyt", "bytes"),
        ("opkt", "out_packets"),
        ("obyt", "out_bytes"),
    ];
    RENAMES
        .iter()
        .find(|&&(short, _)| short == field)
        .map_or(field, |&(_, long)| long)
}
50
51impl NetflowParser {
52    fn err(&self, msg: impl std::fmt::Display) -> AxError {
53        AxError::Parse {
54            format: self.id().to_string(),
55            message: msg.to_string(),
56        }
57    }
58}
59
60impl FormatParser for NetflowParser {
61    fn id(&self) -> &'static str {
62        "netflow"
63    }
64    fn extensions(&self) -> &'static [&'static str] {
65        &[]
66    }
67    fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
68        let text = std::str::from_utf8(bytes).ok()?;
69        let line = text.lines().find(|l| !l.trim().is_empty())?;
70        let fields: Vec<&str> = line.split(',').collect();
71        fields.starts_with(SIGNATURE).then_some(STRONG)
72    }
73    fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
74        let text = std::str::from_utf8(bytes).map_err(|e| self.err(e))?;
75        let mut lines = text.lines().filter(|l| !l.trim().is_empty());
76
77        let header = lines.next().ok_or_else(|| self.err("empty nfdump CSV"))?;
78        let fields: Vec<&str> = header.split(',').collect();
79        if !fields.starts_with(SIGNATURE) {
80            return Err(self.err("not nfdump CSV: unexpected header"));
81        }
82        let names: Vec<String> = fields.iter().map(|f| canonical(f).to_string()).collect();
83
84        let mut builder = TableBuilder::new();
85        for line in lines {
86            // The CSV ends with a separate `Summary` stats section, not flows.
87            if line.trim() == "Summary" {
88                break;
89            }
90            let mut row: BTreeMap<String, Value> = BTreeMap::new();
91            for (name, value) in names.iter().zip(line.split(',')) {
92                row.insert(name.clone(), infer::infer_scalar(value.trim()));
93            }
94            builder.push_row(row);
95        }
96        Ok(builder.finish())
97    }
98}
99
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    /// Two flow records followed by nfdump's trailing `Summary` block.
    const NFDUMP: &str = "\
ts,te,td,sa,da,sp,dp,pr,flg,ipkt,ibyt
2020-01-01 00:00:00.000,2020-01-01 00:00:01.000,1.000,10.0.0.1,8.8.8.8,12345,53,UDP,......,2,120
2020-01-01 00:00:02.000,2020-01-01 00:00:30.000,28.000,10.0.0.1,5.6.7.8,40000,443,TCP,.AP.SF,5000,9000000
Summary
flows,bytes,packets
2,9000120,5002
";

    fn parse(s: &str) -> Vec<Column> {
        NetflowParser.parse("-", s.as_bytes()).unwrap()
    }
    fn col<'a>(cols: &'a [Column], name: &str) -> &'a Column {
        cols.iter()
            .find(|c| c.name == name)
            .unwrap_or_else(|| panic!("missing column {name}"))
    }

    #[test]
    fn canonical_renames_the_flow_features() {
        let cols = parse(NFDUMP);
        // The mahalanobis triple, under canonical names (not ibyt/ipkt/td).
        let bytes = col(&cols, "bytes");
        assert_eq!(bytes.ty, ColType::Int);
        assert_eq!(bytes.cells, vec![Value::Int(120), Value::Int(9_000_000)]);
        assert_eq!(
            col(&cols, "packets").cells,
            vec![Value::Int(2), Value::Int(5000)]
        );
        let duration = col(&cols, "duration");
        assert_eq!(duration.ty, ColType::Float);
        assert_eq!(duration.cells, vec![Value::Float(1.0), Value::Float(28.0)]);
        // The cryptic nfdump names are gone.
        assert!(cols.iter().all(|c| c.name != "ibyt" && c.name != "td"));
    }

    #[test]
    fn addresses_ports_and_proto_are_typed() {
        let cols = parse(NFDUMP);
        assert_eq!(
            col(&cols, "src_addr").cells[0],
            Value::Str("10.0.0.1".into())
        );
        assert_eq!(
            col(&cols, "dst_addr").cells[1],
            Value::Str("5.6.7.8".into())
        );
        let dport = col(&cols, "dst_port");
        assert_eq!(dport.ty, ColType::Int);
        assert_eq!(dport.cells, vec![Value::Int(53), Value::Int(443)]);
        assert_eq!(col(&cols, "proto").cells[1], Value::Str("TCP".into()));
        assert_eq!(col(&cols, "flags").cells[1], Value::Str(".AP.SF".into()));
    }

    #[test]
    fn summary_section_is_not_parsed_as_flows() {
        // Two flow rows only; the Summary stats block is skipped.
        assert_eq!(col(&parse(NFDUMP), "bytes").cells.len(), 2);
    }

    #[test]
    fn canonical_units() {
        assert_eq!(canonical("ibyt"), "bytes");
        assert_eq!(canonical("ipkt"), "packets");
        assert_eq!(canonical("td"), "duration");
        assert_eq!(canonical("sa"), "src_addr");
        assert_eq!(canonical("dp"), "dst_port");
        assert_eq!(canonical("pr"), "proto");
        assert_eq!(canonical("unknown_field"), "unknown_field"); // pass-through
    }

    #[test]
    fn malformed_header_errors() {
        assert!(matches!(
            NetflowParser.parse("-", b"a,b,c\n1,2,3\n"),
            Err(AxError::Parse { .. })
        ));
        assert!(matches!(
            NetflowParser.parse("-", b""),
            Err(AxError::Parse { .. })
        ));
    }

    #[test]
    fn sniff_keys_on_the_nfdump_header() {
        assert_eq!(NetflowParser.sniff(NFDUMP.as_bytes()), Some(STRONG));
        // A header missing the full signature is not nfdump.
        assert_eq!(NetflowParser.sniff(b"ts,te,td,sa\n1,2,3,4"), None);
        assert_eq!(NetflowParser.sniff(b"a,b,c\n1,2,3"), None); // generic CSV
        assert_eq!(NetflowParser.sniff(b"{\"a\":1}"), None);
    }

    #[test]
    fn claims_no_extension() {
        assert!(NetflowParser.extensions().is_empty());
    }

    #[test]
    fn resolves_netflow_over_csv_by_content() {
        let reg = crate::parser::ParserRegistry::default();
        assert_eq!(reg.resolve("-", NFDUMP.as_bytes()).unwrap().id(), "netflow");
        // A generic CSV stays CSV.
        assert_eq!(reg.resolve("-", b"a,b,c\n1,2,3").unwrap().id(), "csv");
    }

    #[test]
    fn crlf_line_endings_and_leading_blank_lines_are_tolerated() {
        // Exports that went through Windows tooling end lines with `\r\n`;
        // `str::lines` strips the `\r`, and leading blank lines are skipped
        // by both sniff and parse.
        let crlf = format!("\r\n{}", NFDUMP.replace('\n', "\r\n"));
        assert_eq!(NetflowParser.sniff(crlf.as_bytes()), Some(STRONG));
        let cols = parse(&crlf);
        assert_eq!(
            col(&cols, "bytes").cells,
            vec![Value::Int(120), Value::Int(9_000_000)]
        );
        assert_eq!(col(&cols, "flags").cells[1], Value::Str(".AP.SF".into()));
    }
}