anomalyx_normalize/parsers/
vpcflow.rs1use crate::infer;
16use crate::parser::{Confidence, FormatParser, STRONG};
17use crate::table::TableBuilder;
18use ax_core::{AxError, Column, Value};
19use std::collections::BTreeMap;
20
/// Format parser for VPC flow logs in their whitespace-separated text form.
///
/// The parser is a stateless unit struct: recognition (`sniff`) and parsing are
/// driven entirely by the bytes handed to it.
#[derive(Clone, Debug, Default)]
pub struct VpcFlowParser;
23
/// Header fields that must all be present (in any order) for a line to be
/// recognised as a VPC flow log header.
const SIGNATURE: &[&str] = &[
    "srcaddr",
    "dstaddr",
    "dstport",
];

/// Returns `true` when every field in [`SIGNATURE`] appears as a whole
/// whitespace-separated token of `line`.
///
/// Matching is exact and case-sensitive; extra fields and field order are
/// irrelevant.
fn is_vpc_header(line: &str) -> bool {
    let present: Vec<&str> = line.split_whitespace().collect();
    for required in SIGNATURE {
        if !present.contains(required) {
            return false;
        }
    }
    true
}
32
/// Maps a raw VPC flow log header field to the column name used downstream.
///
/// Only the fields listed in the table are renamed; every other field
/// (including other hyphenated ones such as `tcp-flags`) is returned unchanged.
fn canonical(field: &str) -> &str {
    const RENAMES: [(&str, &str); 8] = [
        ("srcaddr", "src_addr"),
        ("dstaddr", "dst_addr"),
        ("srcport", "src_port"),
        ("dstport", "dst_port"),
        ("protocol", "proto"),
        ("interface-id", "interface_id"),
        ("account-id", "account_id"),
        ("log-status", "log_status"),
    ];
    RENAMES
        .iter()
        .find(|(raw, _)| *raw == field)
        .map_or(field, |&(_, renamed)| renamed)
}
48
49impl VpcFlowParser {
50 fn err(&self, msg: impl std::fmt::Display) -> AxError {
51 AxError::Parse {
52 format: self.id().to_string(),
53 message: msg.to_string(),
54 }
55 }
56}
57
58impl FormatParser for VpcFlowParser {
59 fn id(&self) -> &'static str {
60 "vpcflow"
61 }
62 fn extensions(&self) -> &'static [&'static str] {
63 &[]
64 }
65 fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
66 let text = std::str::from_utf8(bytes).ok()?;
67 let line = text.lines().find(|l| !l.trim().is_empty())?;
68 is_vpc_header(line).then_some(STRONG)
69 }
70 fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
71 let text = std::str::from_utf8(bytes).map_err(|e| self.err(e))?;
72 let mut lines = text.lines().filter(|l| !l.trim().is_empty());
73
74 let header = lines.next().ok_or_else(|| self.err("empty VPC flow log"))?;
75 if !is_vpc_header(header) {
76 return Err(self.err("not a VPC flow log: header is missing srcaddr/dstaddr/dstport"));
77 }
78 let names: Vec<String> = header
79 .split_whitespace()
80 .map(|f| canonical(f).to_string())
81 .collect();
82
83 let mut builder = TableBuilder::new();
84 for line in lines {
85 let mut row: BTreeMap<String, Value> = BTreeMap::new();
86 for (name, value) in names.iter().zip(line.split_whitespace()) {
87 let cell = if value == "-" {
89 Value::Null
90 } else {
91 infer::infer_scalar(value)
92 };
93 row.insert(name.clone(), cell);
94 }
95 if let (Some(Value::Int(start)), Some(Value::Int(end))) =
97 (row.get("start"), row.get("end"))
98 {
99 if let Some(d) = end.checked_sub(*start) {
100 row.insert("duration".to_string(), Value::Int(d));
101 }
102 }
103 builder.push_row(row);
104 }
105 Ok(builder.finish())
106 }
107}
108
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    /// Sample log: a header line, then one ACCEPT record and one NODATA record
    /// whose protocol, packets, bytes and action fields are `-` placeholders.
    const VPC: &str = "\
version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status
2 123456789010 eni-abc 172.31.16.139 172.31.16.21 20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK
2 123456789010 eni-abc 172.31.9.69 172.31.9.12 0 0 - - - 1431280876 1431280934 - NODATA
";

    /// Parses `input`, panicking if the parser rejects it.
    fn parse_ok(input: &str) -> Vec<Column> {
        VpcFlowParser.parse("-", input.as_bytes()).unwrap()
    }

    /// Looks up a column by name, panicking with its name when absent.
    fn column<'a>(cols: &'a [Column], name: &str) -> &'a Column {
        match cols.iter().find(|c| c.name == name) {
            Some(found) => found,
            None => panic!("missing column {name}"),
        }
    }

    #[test]
    fn canonical_flow_columns_are_typed() {
        let cols = parse_ok(VPC);

        let src = column(&cols, "src_addr");
        assert_eq!(src.cells[0], Value::Str("172.31.16.139".into()));

        let dport = column(&cols, "dst_port");
        assert_eq!(dport.ty, ColType::Int);
        assert_eq!(dport.cells, vec![Value::Int(22), Value::Int(0)]);

        assert_eq!(column(&cols, "proto").cells[0], Value::Int(6));
        assert_eq!(column(&cols, "bytes").cells[0], Value::Int(4249));

        // Raw field names must not survive next to their canonical renames.
        let leaked = cols
            .iter()
            .any(|c| c.name == "srcaddr" || c.name == "dstport");
        assert!(!leaked);
    }

    #[test]
    fn duration_is_synthesized_from_start_end() {
        let cols = parse_ok(VPC);
        let duration = column(&cols, "duration");
        assert_eq!(duration.ty, ColType::Int);
        assert_eq!(duration.cells, vec![Value::Int(60), Value::Int(58)]);
    }

    #[test]
    fn dash_placeholder_is_null() {
        let cols = parse_ok(VPC);
        for field in ["packets", "bytes", "action"] {
            assert_eq!(column(&cols, field).cells[1], Value::Null);
        }
        assert_eq!(column(&cols, "action").cells[0], Value::Str("ACCEPT".into()));
        assert_eq!(
            column(&cols, "log_status").cells[1],
            Value::Str("NODATA".into())
        );
    }

    #[test]
    fn canonical_units() {
        let cases = [
            ("srcaddr", "src_addr"),
            ("dstport", "dst_port"),
            ("protocol", "proto"),
            ("log-status", "log_status"),
            ("packets", "packets"),
            ("tcp-flags", "tcp-flags"),
        ];
        for (raw, expected) in cases {
            assert_eq!(canonical(raw), expected);
        }
    }

    #[test]
    fn malformed_header_errors() {
        let no_signature = VpcFlowParser.parse("-", b"a b c\n1 2 3\n");
        assert!(matches!(no_signature, Err(AxError::Parse { .. })));

        let empty = VpcFlowParser.parse("-", b"");
        assert!(matches!(empty, Err(AxError::Parse { .. })));
    }

    #[test]
    fn sniff_keys_on_the_vpc_header() {
        assert_eq!(VpcFlowParser.sniff(VPC.as_bytes()), Some(STRONG));

        // First: missing dstport. Second: unrelated header. Third: comma-separated.
        let rejected: [&[u8]; 3] = [
            b"srcaddr dstaddr action\n1 2 3",
            b"a b c\n1 2 3",
            b"ts,te,td,sa\n",
        ];
        for input in rejected {
            assert_eq!(VpcFlowParser.sniff(input), None);
        }
    }

    #[test]
    fn claims_no_extension() {
        let exts = VpcFlowParser.extensions();
        assert!(exts.is_empty());
    }

    #[test]
    fn resolves_vpcflow_by_content() {
        let registry = crate::parser::ParserRegistry::default();
        let parser = registry.resolve("-", VPC.as_bytes()).unwrap();
        assert_eq!(parser.id(), "vpcflow");
    }
}