// anomalyx_normalize/parsers/vpcflow.rs

//! AWS VPC Flow Logs parser — cloud-native network telemetry (space-delimited).
//!
//! A VPC flow log file is whitespace-delimited, with a header line naming the
//! fields (the default v2 layout, or a custom field order). We use the header to
//! map columns, renaming AWS's names to the same canonical schema as the NetFlow
//! parser — `srcaddr`→`src_addr`, `dstport`→`dst_port`, `protocol`→`proto`, … —
//! and synthesize `duration` = `end - start`. As a result, the same flow anomalies
//! apply with no new infrastructure: `mv.mahalanobis` over
//! `(bytes, packets, duration)` and rare-port `dist` on `dst_port`. The AWS `-`
//! placeholder (e.g. in a `NODATA` record) becomes `Null` (honest absence).
//!
//! Detected by the VPC header signature (`srcaddr` + `dstaddr` + `dstport`);
//! claims no extension (flow log files are generically `*.log`).

15use crate::infer;
16use crate::parser::{Confidence, FormatParser, STRONG};
17use crate::table::TableBuilder;
18use ax_core::{AxError, Column, Value};
19use std::collections::BTreeMap;
20
21#[derive(Debug, Default, Clone)]
22pub struct VpcFlowParser;
23
/// Header field names that together identify a VPC flow log (regardless of the
/// custom field order AWS allows).
const SIGNATURE: &[&str] = &["srcaddr", "dstaddr", "dstport"];

/// Returns `true` when `line` is a VPC flow log header.
///
/// The line is split on whitespace, and every name in [`SIGNATURE`] must appear
/// as a whole, case-sensitive token. Token order does not matter, and extra
/// fields are allowed, so custom field layouts are recognized as well.
fn is_vpc_header(line: &str) -> bool {
    // Scan the tokens once per signature field instead of collecting them into
    // a Vec: the signature is tiny, so this avoids a per-call allocation.
    SIGNATURE
        .iter()
        .all(|field| line.split_whitespace().any(|token| token == *field))
}

/// Maps a VPC flow log field name to the canonical NetFlow-style column name.
///
/// Only the address/port/protocol fields and the hyphenated fields of the
/// default v2 layout (`interface-id`, `account-id`, `log-status`) are renamed.
/// Every other field, including custom ones such as `tcp-flags`, passes through
/// unchanged.
fn canonical(field: &str) -> &str {
    match field {
        "srcaddr" => "src_addr",
        "dstaddr" => "dst_addr",
        "srcport" => "src_port",
        "dstport" => "dst_port",
        "protocol" => "proto",
        "interface-id" => "interface_id",
        "account-id" => "account_id",
        "log-status" => "log_status",
        other => other,
    }
}

49impl VpcFlowParser {
50    fn err(&self, msg: impl std::fmt::Display) -> AxError {
51        AxError::Parse {
52            format: self.id().to_string(),
53            message: msg.to_string(),
54        }
55    }
56}
57
58impl FormatParser for VpcFlowParser {
59    fn id(&self) -> &'static str {
60        "vpcflow"
61    }
62    fn extensions(&self) -> &'static [&'static str] {
63        &[]
64    }
65    fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
66        let text = std::str::from_utf8(bytes).ok()?;
67        let line = text.lines().find(|l| !l.trim().is_empty())?;
68        is_vpc_header(line).then_some(STRONG)
69    }
70    fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
71        let text = std::str::from_utf8(bytes).map_err(|e| self.err(e))?;
72        let mut lines = text.lines().filter(|l| !l.trim().is_empty());
73
74        let header = lines.next().ok_or_else(|| self.err("empty VPC flow log"))?;
75        if !is_vpc_header(header) {
76            return Err(self.err("not a VPC flow log: header is missing srcaddr/dstaddr/dstport"));
77        }
78        let names: Vec<String> = header
79            .split_whitespace()
80            .map(|f| canonical(f).to_string())
81            .collect();
82
83        let mut builder = TableBuilder::new();
84        for line in lines {
85            let mut row: BTreeMap<String, Value> = BTreeMap::new();
86            for (name, value) in names.iter().zip(line.split_whitespace()) {
87                // AWS uses `-` for a field with no value (e.g. a NODATA record).
88                let cell = if value == "-" {
89                    Value::Null
90                } else {
91                    infer::infer_scalar(value)
92                };
93                row.insert(name.clone(), cell);
94            }
95            // Synthesize flow duration from the epoch start/end when both present.
96            if let (Some(Value::Int(start)), Some(Value::Int(end))) =
97                (row.get("start"), row.get("end"))
98            {
99                if let Some(d) = end.checked_sub(*start) {
100                    row.insert("duration".to_string(), Value::Int(d));
101                }
102            }
103            builder.push_row(row);
104        }
105        Ok(builder.finish())
106    }
107}
108
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    const VPC: &str = "\
version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status
2 123456789010 eni-abc 172.31.16.139 172.31.16.21 20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK
2 123456789010 eni-abc 172.31.9.69 172.31.9.12 0 0 - - - 1431280876 1431280934 - NODATA
";

    fn parse(s: &str) -> Vec<Column> {
        VpcFlowParser.parse("-", s.as_bytes()).unwrap()
    }
    fn col<'a>(cols: &'a [Column], name: &str) -> &'a Column {
        cols.iter()
            .find(|c| c.name == name)
            .unwrap_or_else(|| panic!("missing column {name}"))
    }

    #[test]
    fn canonical_flow_columns_are_typed() {
        let cols = parse(VPC);
        assert_eq!(
            col(&cols, "src_addr").cells[0],
            Value::Str("172.31.16.139".into())
        );
        let dport = col(&cols, "dst_port");
        assert_eq!(dport.ty, ColType::Int);
        assert_eq!(dport.cells, vec![Value::Int(22), Value::Int(0)]);
        assert_eq!(col(&cols, "proto").cells[0], Value::Int(6)); // protocol → proto
        assert_eq!(col(&cols, "bytes").cells[0], Value::Int(4249));
        // The AWS short names are gone.
        assert!(cols
            .iter()
            .all(|c| c.name != "srcaddr" && c.name != "dstport"));
    }

    #[test]
    fn duration_is_synthesized_from_start_end() {
        let cols = parse(VPC);
        let dur = col(&cols, "duration");
        assert_eq!(dur.ty, ColType::Int);
        assert_eq!(dur.cells, vec![Value::Int(60), Value::Int(58)]);
    }

    #[test]
    fn dash_placeholder_is_null() {
        let cols = parse(VPC);
        // The second record is NODATA: packets/bytes/action are `-`.
        assert_eq!(col(&cols, "packets").cells[1], Value::Null);
        assert_eq!(col(&cols, "bytes").cells[1], Value::Null);
        assert_eq!(col(&cols, "action").cells[1], Value::Null);
        assert_eq!(col(&cols, "action").cells[0], Value::Str("ACCEPT".into()));
        assert_eq!(
            col(&cols, "log_status").cells[1],
            Value::Str("NODATA".into())
        );
    }

    #[test]
    fn canonical_units() {
        assert_eq!(canonical("srcaddr"), "src_addr");
        assert_eq!(canonical("dstport"), "dst_port");
        assert_eq!(canonical("protocol"), "proto");
        assert_eq!(canonical("log-status"), "log_status");
        assert_eq!(canonical("packets"), "packets"); // pass-through
        assert_eq!(canonical("tcp-flags"), "tcp-flags"); // custom field pass-through
    }

    #[test]
    fn malformed_header_errors() {
        assert!(matches!(
            VpcFlowParser.parse("-", b"a b c\n1 2 3\n"),
            Err(AxError::Parse { .. })
        ));
        assert!(matches!(
            VpcFlowParser.parse("-", b""),
            Err(AxError::Parse { .. })
        ));
    }

    #[test]
    fn sniff_keys_on_the_vpc_header() {
        assert_eq!(VpcFlowParser.sniff(VPC.as_bytes()), Some(STRONG));
        // A header missing one signature field is not VPC.
        assert_eq!(VpcFlowParser.sniff(b"srcaddr dstaddr action\n1 2 3"), None);
        assert_eq!(VpcFlowParser.sniff(b"a b c\n1 2 3"), None);
        assert_eq!(VpcFlowParser.sniff(b"ts,te,td,sa\n"), None); // nfdump (comma)
    }

    #[test]
    fn sniff_accepts_custom_field_order() {
        // AWS allows a custom field order; only the signature fields matter.
        let custom = b"dstport bytes srcaddr start dstaddr\n22 100 10.0.0.1 1 10.0.0.2\n";
        assert_eq!(VpcFlowParser.sniff(custom), Some(STRONG));
    }

    #[test]
    fn claims_no_extension() {
        assert!(VpcFlowParser.extensions().is_empty());
    }

    #[test]
    fn resolves_vpcflow_by_content() {
        let reg = crate::parser::ParserRegistry::default();
        assert_eq!(reg.resolve("-", VPC.as_bytes()).unwrap().id(), "vpcflow");
    }
}