anomalyx_normalize/parsers/
cloudtrail.rs1use crate::infer;
21use crate::parser::{Confidence, FormatParser, STRONG};
22use crate::table::TableBuilder;
23use ax_core::{AxError, Column, Value};
24use chrono::DateTime;
25use serde_json::Value as J;
26use std::collections::BTreeMap;
27
/// Parser for AWS CloudTrail log files: a JSON document whose top-level
/// `Records` array holds one object per logged event.
///
/// The type carries no state; the unit value `CloudTrailParser` is used
/// directly as the parser instance.
#[derive(Clone, Debug, Default)]
pub struct CloudTrailParser;
30
31fn flatten_record(record: &serde_json::Map<String, J>, row: &mut BTreeMap<String, Value>) {
35 for (key, value) in record {
36 match value {
37 J::Object(inner) => {
38 for (child, child_value) in inner {
39 row.insert(format!("{key}.{child}"), infer::json_to_value(child_value));
40 }
41 }
42 scalar_or_array => {
43 row.insert(key.clone(), infer::json_to_value(scalar_or_array));
44 }
45 }
46 }
47}
48
49impl CloudTrailParser {
50 fn err(&self, msg: impl std::fmt::Display) -> AxError {
51 AxError::Parse {
52 format: self.id().to_string(),
53 message: msg.to_string(),
54 }
55 }
56}
57
58impl FormatParser for CloudTrailParser {
59 fn id(&self) -> &'static str {
60 "cloudtrail"
61 }
62 fn extensions(&self) -> &'static [&'static str] {
63 &[]
64 }
65 fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
66 let value: J = serde_json::from_slice(bytes).ok()?;
67 let records = value.get("Records").and_then(J::as_array)?;
68 records
71 .first()
72 .is_some_and(|r| r.get("eventName").is_some())
73 .then_some(STRONG)
74 }
75 fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
76 let root: J = serde_json::from_slice(bytes).map_err(|e| self.err(e))?;
77 let records = root
78 .get("Records")
79 .and_then(J::as_array)
80 .ok_or_else(|| self.err("not CloudTrail: missing 'Records' array"))?;
81
82 let mut builder = TableBuilder::new();
83 for record in records {
84 let obj = record
85 .as_object()
86 .ok_or_else(|| self.err("CloudTrail record is not an object"))?;
87 let mut row: BTreeMap<String, Value> = BTreeMap::new();
88 flatten_record(obj, &mut row);
89 if let Some(time) = obj.get("eventTime").and_then(J::as_str) {
91 if let Ok(dt) = DateTime::parse_from_rfc3339(time) {
92 row.insert("eventEpoch".into(), Value::Int(dt.timestamp()));
93 }
94 }
95 builder.push_row(row);
96 }
97 Ok(builder.finish())
98 }
99}
100
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    /// Two records: an S3 read by an IAM user, then an IAM write by an
    /// assumed role whose `userIdentity` nests a further object.
    const TRAIL: &str = r#"{
        "Records": [
            {
                "eventVersion": "1.08",
                "eventTime": "2021-01-01T00:00:00Z",
                "eventSource": "s3.amazonaws.com",
                "eventName": "GetObject",
                "awsRegion": "us-east-1",
                "sourceIPAddress": "1.2.3.4",
                "readOnly": true,
                "userIdentity": {"type": "IAMUser", "userName": "alice", "accountId": "111"},
                "requestParameters": {"bucketName": "logs", "key": "a/b"},
                "responseElements": null
            },
            {
                "eventVersion": "1.08",
                "eventTime": "2021-01-01T03:30:00Z",
                "eventSource": "iam.amazonaws.com",
                "eventName": "CreateUser",
                "awsRegion": "us-east-1",
                "sourceIPAddress": "9.9.9.9",
                "readOnly": false,
                "userIdentity": {"type": "AssumedRole", "sessionContext": {"mfa": true}}
            }
        ]
    }"#;

    /// Parses `text` with the CloudTrail parser, panicking on failure.
    fn parse_trail(text: &str) -> Vec<Column> {
        CloudTrailParser.parse("-", text.as_bytes()).unwrap()
    }

    /// Looks up a column by name, panicking with the name if it is absent.
    fn column<'a>(cols: &'a [Column], name: &str) -> &'a Column {
        match cols.iter().find(|c| c.name == name) {
            Some(found) => found,
            None => panic!("missing column {name}"),
        }
    }

    #[test]
    fn one_row_per_record_with_typed_top_level_fields() {
        let cols = parse_trail(TRAIL);

        let event_name = column(&cols, "eventName");
        assert_eq!(event_name.cells.len(), 2);
        assert_eq!(event_name.cells[0], Value::Str("GetObject".into()));
        assert_eq!(event_name.cells[1], Value::Str("CreateUser".into()));

        let source = column(&cols, "eventSource");
        assert_eq!(source.cells[1], Value::Str("iam.amazonaws.com".into()));

        let read_only = column(&cols, "readOnly");
        assert_eq!(read_only.ty, ColType::Bool);
        assert_eq!(read_only.cells[0], Value::Bool(true));
    }

    #[test]
    fn event_time_is_parsed_to_epoch_seconds() {
        let cols = parse_trail(TRAIL);
        let epoch = column(&cols, "eventEpoch");
        assert_eq!(epoch.ty, ColType::Int);
        // 2021-01-01T00:00:00Z, then three and a half hours later.
        assert_eq!(epoch.cells[0], Value::Int(1_609_459_200));
        assert_eq!(epoch.cells[1], Value::Int(1_609_471_800));
    }

    #[test]
    fn nested_objects_flatten_one_level() {
        let cols = parse_trail(TRAIL);
        let user_name = column(&cols, "userIdentity.userName");
        assert_eq!(user_name.cells[0], Value::Str("alice".into()));
        assert_eq!(
            column(&cols, "userIdentity.type").cells[1],
            Value::Str("AssumedRole".into())
        );
        assert_eq!(
            column(&cols, "requestParameters.bucketName").cells[0],
            Value::Str("logs".into())
        );
        // The second record has no `userName`; its cell must come back Null.
        assert_eq!(user_name.cells[1], Value::Null);
    }

    #[test]
    fn deeper_nesting_is_kept_as_canonical_json() {
        let cols = parse_trail(TRAIL);
        let session = column(&cols, "userIdentity.sessionContext");
        assert_eq!(session.cells[1], Value::Str("{\"mfa\":true}".into()));
    }

    #[test]
    fn flatten_record_units() {
        let serde_json::Value::Object(obj) =
            serde_json::json!({"a": 1, "b": {"c": "x", "d": {"e": 2}}})
        else {
            unreachable!()
        };
        let mut row = BTreeMap::new();
        flatten_record(&obj, &mut row);
        assert_eq!(row.get("a"), Some(&Value::Int(1)));
        assert_eq!(row.get("b.c"), Some(&Value::Str("x".into())));
        assert_eq!(row.get("b.d"), Some(&Value::Str("{\"e\":2}".into())));
        assert_eq!(row.get("b.d.e"), None);
    }

    #[test]
    fn malformed_and_non_cloudtrail_error() {
        let bad_inputs: [&[u8]; 3] = [
            b"{not json",
            br#"{"foo": 1}"#,
            br#"{"Records": [1, 2]}"#,
        ];
        for input in bad_inputs {
            let result = CloudTrailParser.parse("-", input);
            assert!(
                matches!(result, Err(AxError::Parse { .. })),
                "expected a parse error for {:?}",
                String::from_utf8_lossy(input)
            );
        }
    }

    #[test]
    fn sniff_keys_on_records_with_event_name() {
        assert_eq!(CloudTrailParser.sniff(TRAIL.as_bytes()), Some(STRONG));
        let rejected: [&[u8]; 5] = [
            br#"{"Records": [{"x": 1}]}"#,
            br#"{"Records": []}"#,
            br#"{"Records": 5}"#,
            br#"{"foo": 1}"#,
            b"a,b,c\n1,2,3",
        ];
        for input in rejected {
            assert_eq!(
                CloudTrailParser.sniff(input),
                None,
                "unexpected match for {:?}",
                String::from_utf8_lossy(input)
            );
        }
    }

    #[test]
    fn claims_no_extension() {
        assert!(CloudTrailParser.extensions().is_empty());
    }

    #[test]
    fn resolves_by_content() {
        let registry = crate::parser::ParserRegistry::default();
        let resolved = registry.resolve("-", TRAIL.as_bytes()).unwrap();
        assert_eq!(resolved.id(), "cloudtrail");
    }
}