Skip to main content

anomalyx_normalize/parsers/
cloudtrail.rs

//! AWS CloudTrail parser — the audit trail for AWS API activity.
//!
//! A CloudTrail file is a single JSON document with a top-level `Records` array;
//! each record is one API call. We flatten each record to **one row**, depth-1:
//! top-level scalars become columns (`eventName`, `eventSource`,
//! `sourceIPAddress`, …), and one level into nested objects
//! (`userIdentity.userName`, `requestParameters.bucketName`); anything deeper
//! (`userIdentity.sessionContext`) or array-valued is kept as canonical JSON,
//! so a varied per-API payload doesn't explode the schema.
//!
//! We also synthesize **`eventEpoch`** — the RFC 3339 `eventTime` parsed to Unix
//! seconds (deterministic; no wall clock) — so `--cadence eventEpoch` and the
//! `contextual` detector (`--period 24`) can read the call series for off-hours /
//! automated-pattern anomalies, while `eventName` feeds rare-API `dist` drift.
//!
//! Detected by a `Records` array whose first entry carries `eventName`; claims no
//! extension (CloudTrail is delivered as `*.json`, owned by the JSON parser —
//! pipe it on stdin for CloudTrail-aware flattening).
19
20use crate::infer;
21use crate::parser::{Confidence, FormatParser, STRONG};
22use crate::table::TableBuilder;
23use ax_core::{AxError, Column, Value};
24use chrono::DateTime;
25use serde_json::Value as J;
26use std::collections::BTreeMap;
27
/// Parser for AWS CloudTrail log documents.
///
/// A stateless unit type: it carries no configuration, and all behaviour
/// lives in its [`FormatParser`] implementation.
#[derive(Clone, Debug, Default)]
pub struct CloudTrailParser;
30
31/// Flattens one record depth-1: top-level scalars keep their name, nested
32/// objects contribute `parent.child` columns, and anything deeper (or an array)
33/// is lowered to its canonical JSON string by [`infer::json_to_value`].
34fn flatten_record(record: &serde_json::Map<String, J>, row: &mut BTreeMap<String, Value>) {
35    for (key, value) in record {
36        match value {
37            J::Object(inner) => {
38                for (child, child_value) in inner {
39                    row.insert(format!("{key}.{child}"), infer::json_to_value(child_value));
40                }
41            }
42            scalar_or_array => {
43                row.insert(key.clone(), infer::json_to_value(scalar_or_array));
44            }
45        }
46    }
47}
48
49impl CloudTrailParser {
50    fn err(&self, msg: impl std::fmt::Display) -> AxError {
51        AxError::Parse {
52            format: self.id().to_string(),
53            message: msg.to_string(),
54        }
55    }
56}
57
58impl FormatParser for CloudTrailParser {
59    fn id(&self) -> &'static str {
60        "cloudtrail"
61    }
62    fn extensions(&self) -> &'static [&'static str] {
63        &[]
64    }
65    fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
66        let value: J = serde_json::from_slice(bytes).ok()?;
67        let records = value.get("Records").and_then(J::as_array)?;
68        // A `Records` array whose entries carry `eventName` is CloudTrail (and not
69        // just any JSON that happens to have a "Records" key).
70        records
71            .first()
72            .is_some_and(|r| r.get("eventName").is_some())
73            .then_some(STRONG)
74    }
75    fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
76        let root: J = serde_json::from_slice(bytes).map_err(|e| self.err(e))?;
77        let records = root
78            .get("Records")
79            .and_then(J::as_array)
80            .ok_or_else(|| self.err("not CloudTrail: missing 'Records' array"))?;
81
82        let mut builder = TableBuilder::new();
83        for record in records {
84            let obj = record
85                .as_object()
86                .ok_or_else(|| self.err("CloudTrail record is not an object"))?;
87            let mut row: BTreeMap<String, Value> = BTreeMap::new();
88            flatten_record(obj, &mut row);
89            // Numeric event time for cadence / contextual analysis.
90            if let Some(time) = obj.get("eventTime").and_then(J::as_str) {
91                if let Ok(dt) = DateTime::parse_from_rfc3339(time) {
92                    row.insert("eventEpoch".into(), Value::Int(dt.timestamp()));
93                }
94            }
95            builder.push_row(row);
96        }
97        Ok(builder.finish())
98    }
99}
100
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    /// Two-record fixture: an S3 `GetObject` by an IAM user and an IAM
    /// `CreateUser` by an assumed role whose identity nests `sessionContext`.
    const TRAIL: &str = r#"{
      "Records": [
        {
          "eventVersion": "1.08",
          "eventTime": "2021-01-01T00:00:00Z",
          "eventSource": "s3.amazonaws.com",
          "eventName": "GetObject",
          "awsRegion": "us-east-1",
          "sourceIPAddress": "1.2.3.4",
          "readOnly": true,
          "userIdentity": {"type": "IAMUser", "userName": "alice", "accountId": "111"},
          "requestParameters": {"bucketName": "logs", "key": "a/b"},
          "responseElements": null
        },
        {
          "eventVersion": "1.08",
          "eventTime": "2021-01-01T03:30:00Z",
          "eventSource": "iam.amazonaws.com",
          "eventName": "CreateUser",
          "awsRegion": "us-east-1",
          "sourceIPAddress": "9.9.9.9",
          "readOnly": false,
          "userIdentity": {"type": "AssumedRole", "sessionContext": {"mfa": true}}
        }
      ]
    }"#;

    /// Parses `input` as CloudTrail, panicking if the parser rejects it.
    fn parse(input: &str) -> Vec<Column> {
        CloudTrailParser.parse("-", input.as_bytes()).unwrap()
    }

    /// Looks a column up by name, panicking with that name when it is absent.
    fn col<'a>(cols: &'a [Column], name: &str) -> &'a Column {
        match cols.iter().find(|c| c.name == name) {
            Some(found) => found,
            None => panic!("missing column {name}"),
        }
    }

    #[test]
    fn one_row_per_record_with_typed_top_level_fields() {
        let table = parse(TRAIL);

        let event_name = col(&table, "eventName");
        assert_eq!(event_name.cells.len(), 2);
        assert_eq!(event_name.cells[0], Value::Str("GetObject".into()));
        assert_eq!(event_name.cells[1], Value::Str("CreateUser".into()));

        let event_source = col(&table, "eventSource");
        assert_eq!(
            event_source.cells[1],
            Value::Str("iam.amazonaws.com".into())
        );

        let read_only = col(&table, "readOnly");
        assert_eq!(read_only.ty, ColType::Bool);
        assert_eq!(read_only.cells[0], Value::Bool(true));
    }

    #[test]
    fn event_time_is_parsed_to_epoch_seconds() {
        let table = parse(TRAIL);
        let epoch = col(&table, "eventEpoch");
        assert_eq!(epoch.ty, ColType::Int);

        // 2021-01-01T00:00:00Z, and the second record 3h30m (12 600 s) later.
        let first_call = 1_609_459_200;
        assert_eq!(epoch.cells[0], Value::Int(first_call));
        assert_eq!(epoch.cells[1], Value::Int(first_call + 12_600));
    }

    #[test]
    fn nested_objects_flatten_one_level() {
        let table = parse(TRAIL);

        let user_name = col(&table, "userIdentity.userName");
        assert_eq!(user_name.cells[0], Value::Str("alice".into()));
        // userName absent on the second record → padded null.
        assert_eq!(user_name.cells[1], Value::Null);

        let identity_type = col(&table, "userIdentity.type");
        assert_eq!(identity_type.cells[1], Value::Str("AssumedRole".into()));

        // requestParameters flattens one level too.
        let bucket = col(&table, "requestParameters.bucketName");
        assert_eq!(bucket.cells[0], Value::Str("logs".into()));
    }

    #[test]
    fn deeper_nesting_is_kept_as_canonical_json() {
        let table = parse(TRAIL);
        // sessionContext is an object nested inside userIdentity → stringified.
        let session = col(&table, "userIdentity.sessionContext");
        assert_eq!(session.cells[1], Value::Str("{\"mfa\":true}".into()));
    }

    #[test]
    fn flatten_record_units() {
        let J::Object(record) = serde_json::json!({"a": 1, "b": {"c": "x", "d": {"e": 2}}})
        else {
            unreachable!()
        };
        let mut row = BTreeMap::new();
        flatten_record(&record, &mut row);

        // Top-level scalar keeps its name.
        assert_eq!(row.get("a"), Some(&Value::Int(1)));
        // One level in becomes `parent.child`.
        assert_eq!(row.get("b.c"), Some(&Value::Str("x".into())));
        // b.d is an object → canonical JSON string, not b.d.e.
        assert_eq!(row.get("b.d"), Some(&Value::Str("{\"e\":2}".into())));
        assert_eq!(row.get("b.d.e"), None);
    }

    #[test]
    fn malformed_and_non_cloudtrail_error() {
        let rejected: [&[u8]; 3] = [
            // Not JSON at all.
            b"{not json",
            // Valid JSON without a `Records` array.
            br#"{"foo": 1}"#,
            // A record that is not an object.
            br#"{"Records": [1, 2]}"#,
        ];
        for input in rejected {
            assert!(matches!(
                CloudTrailParser.parse("-", input),
                Err(AxError::Parse { .. })
            ));
        }
    }

    #[test]
    fn sniff_keys_on_records_with_event_name() {
        assert_eq!(CloudTrailParser.sniff(TRAIL.as_bytes()), Some(STRONG));

        let not_cloudtrail: [&[u8]; 5] = [
            // A Records array without eventName entries is not CloudTrail.
            br#"{"Records": [{"x": 1}]}"#,
            // Empty array.
            br#"{"Records": []}"#,
            // Not an array.
            br#"{"Records": 5}"#,
            // No `Records` key.
            br#"{"foo": 1}"#,
            // Not JSON.
            b"a,b,c\n1,2,3",
        ];
        for input in not_cloudtrail {
            assert_eq!(CloudTrailParser.sniff(input), None);
        }
    }

    #[test]
    fn claims_no_extension() {
        let claimed = CloudTrailParser.extensions();
        assert!(claimed.is_empty());
    }

    #[test]
    fn resolves_by_content() {
        let registry = crate::parser::ParserRegistry::default();
        let chosen = registry.resolve("-", TRAIL.as_bytes()).unwrap();
        assert_eq!(chosen.id(), "cloudtrail");
    }
}