anomalyx_normalize/parsers/
osquery.rs1use crate::infer;
16use crate::parser::{Confidence, FormatParser, STRONG};
17use crate::table::TableBuilder;
18use ax_core::{AxError, Column, Value};
19use serde_json::Value as J;
20use std::collections::BTreeMap;
21
/// Stateless parser for line-delimited osquery result events.
///
/// All behaviour lives in the [`FormatParser`] implementation; the type itself
/// carries no data.
#[derive(Clone, Debug, Default)]
pub struct OsqueryParser;
24
25fn looks_like_osquery(obj: &serde_json::Map<String, J>) -> bool {
26 obj.contains_key("hostIdentifier")
27 && (obj.get("columns").is_some_and(J::is_object)
28 || obj.get("snapshot").is_some_and(J::is_array))
29}
30
31fn add_columns(result: &serde_json::Map<String, J>, row: &mut BTreeMap<String, Value>) {
34 for (key, value) in result {
35 let cell = match value {
36 J::String(s) => infer::infer_scalar(s),
37 other => infer::json_to_value(other),
38 };
39 row.insert(format!("columns.{key}"), cell);
40 }
41}
42
43impl OsqueryParser {
44 fn err(&self, msg: impl std::fmt::Display) -> AxError {
45 AxError::Parse {
46 format: self.id().to_string(),
47 message: msg.to_string(),
48 }
49 }
50}
51
52impl FormatParser for OsqueryParser {
53 fn id(&self) -> &'static str {
54 "osquery"
55 }
56 fn extensions(&self) -> &'static [&'static str] {
57 &[]
58 }
59 fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
60 let text = std::str::from_utf8(bytes).ok()?;
61 let line = text.lines().find(|l| !l.trim().is_empty())?;
62 let value: J = serde_json::from_str(line).ok()?;
63 value
64 .as_object()
65 .is_some_and(looks_like_osquery)
66 .then_some(STRONG)
67 }
68 fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
69 let text = std::str::from_utf8(bytes).map_err(|e| self.err(e))?;
70 let mut builder = TableBuilder::new();
71 for line in text.lines() {
72 if line.trim().is_empty() {
73 continue;
74 }
75 let value: J = serde_json::from_str(line).map_err(|e| self.err(e))?;
76 let obj = value
77 .as_object()
78 .ok_or_else(|| self.err("osquery result is not a JSON object"))?;
79
80 let mut meta: BTreeMap<String, Value> = BTreeMap::new();
82 for (key, val) in obj {
83 if key == "columns" || key == "snapshot" {
84 continue;
85 }
86 meta.insert(key.clone(), infer::json_to_value(val));
87 }
88
89 match (obj.get("columns"), obj.get("snapshot")) {
90 (Some(J::Object(columns)), _) => {
92 let mut row = meta.clone();
93 add_columns(columns, &mut row);
94 builder.push_row(row);
95 }
96 (_, Some(J::Array(snapshot))) => {
98 for element in snapshot {
99 if let Some(result) = element.as_object() {
100 let mut row = meta.clone();
101 add_columns(result, &mut row);
102 builder.push_row(row);
103 }
104 }
105 }
106 _ => builder.push_row(meta),
108 }
109 }
110 Ok(builder.finish())
111 }
112}
113
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    /// Fixture with three events: two `processes` events carrying `columns`
    /// for host1, then one `users` event for host2 whose `snapshot` array has
    /// two elements. Parsing it yields four rows in total.
    const OSQUERY: &str = concat!(
        r#"{"name":"processes","hostIdentifier":"host1","unixTime":1609459200,"action":"added","columns":{"pid":"123","name":"sshd","path":"/usr/sbin/sshd"}}"#,
        "\n",
        r#"{"name":"processes","hostIdentifier":"host1","unixTime":1609459260,"action":"removed","columns":{"pid":"99","name":"bash"}}"#,
        "\n",
        r#"{"name":"users","hostIdentifier":"host2","unixTime":1609459300,"action":"snapshot","snapshot":[{"uid":"0","username":"root"},{"uid":"1000","username":"alice"}]}"#,
        "\n",
    );

    /// Parses `s` with [`OsqueryParser`], panicking on any parse error.
    fn parse(s: &str) -> Vec<Column> {
        let bytes = s.as_bytes();
        OsqueryParser.parse("-", bytes).unwrap()
    }

    /// Looks up a column by name, panicking when it is absent.
    fn col<'a>(cols: &'a [Column], name: &str) -> &'a Column {
        match cols.iter().find(|c| c.name == name) {
            Some(column) => column,
            None => panic!("missing column {name}"),
        }
    }

    /// Shorthand for a string cell.
    fn text(s: &'static str) -> Value {
        Value::Str(s.into())
    }

    #[test]
    fn differential_columns_are_prefixed_and_coerced() {
        let cols = parse(OSQUERY);

        let pid = col(&cols, "columns.pid");
        assert_eq!(pid.ty, ColType::Int, "stringified pid coerced to int");
        assert_eq!(pid.cells[0], Value::Int(123));
        assert_eq!(pid.cells[1], Value::Int(99));

        // The top-level `name` and the nested `columns.name` stay distinct.
        assert_eq!(col(&cols, "name").cells[0], text("processes"));
        assert_eq!(col(&cols, "columns.name").cells[0], text("sshd"));
    }

    #[test]
    fn metadata_is_typed() {
        let cols = parse(OSQUERY);

        let unix_time = col(&cols, "unixTime");
        assert_eq!(unix_time.ty, ColType::Int);
        assert_eq!(unix_time.cells[0], Value::Int(1609459200));

        assert_eq!(col(&cols, "action").cells[1], text("removed"));
        assert_eq!(col(&cols, "hostIdentifier").cells[0], text("host1"));
    }

    #[test]
    fn snapshot_expands_to_one_row_per_element() {
        let cols = parse(OSQUERY);

        // Two differential rows plus two snapshot elements.
        assert_eq!(col(&cols, "name").cells.len(), 4);

        let uid = col(&cols, "columns.uid");
        assert_eq!(uid.cells[2], Value::Int(0));
        assert_eq!(uid.cells[3], Value::Int(1000));
        assert_eq!(col(&cols, "columns.username").cells[3], text("alice"));

        // Event metadata is repeated on the expanded rows.
        assert_eq!(col(&cols, "hostIdentifier").cells[2], text("host2"));
        assert_eq!(col(&cols, "action").cells[2], text("snapshot"));

        // Columns seen only on other rows are null here.
        assert_eq!(col(&cols, "columns.pid").cells[2], Value::Null);

        // The raw payload containers never become columns themselves.
        for container in ["columns", "snapshot"] {
            assert!(cols.iter().all(|c| c.name != container));
        }
    }

    #[test]
    fn add_columns_units() {
        let J::Object(fields) = serde_json::json!({"pid": "5", "name": "x"}) else {
            unreachable!()
        };
        let mut row = BTreeMap::new();
        add_columns(&fields, &mut row);

        assert_eq!(row.get("columns.pid"), Some(&Value::Int(5)));
        assert_eq!(row.get("columns.name"), Some(&text("x")));
    }

    #[test]
    fn malformed_events_error() {
        // Not JSON at all, then valid JSON that is not an object.
        let inputs: [&[u8]; 2] = [b"not json\n", b"[1,2,3]\n"];
        for input in inputs {
            assert!(matches!(
                OsqueryParser.parse("-", input),
                Err(AxError::Parse { .. })
            ));
        }
    }

    #[test]
    fn sniff_keys_on_host_identifier_and_payload() {
        assert_eq!(OsqueryParser.sniff(OSQUERY.as_bytes()), Some(STRONG));
        assert_eq!(
            OsqueryParser.sniff(br#"{"hostIdentifier":"h","snapshot":[{"a":"1"}]}"#),
            Some(STRONG)
        );

        let rejected: [&[u8]; 5] = [
            // hostIdentifier without any payload
            br#"{"hostIdentifier":"h"}"#,
            // payload without hostIdentifier
            br#"{"columns":{"a":"1"}}"#,
            // `columns` present but not an object
            br#"{"hostIdentifier":"h","columns":5}"#,
            // JSON lines without the osquery keys
            b"{\"a\":1}\n{\"a\":2}\n",
            // not JSON
            b"a,b,c\n1,2,3",
        ];
        for input in rejected {
            assert_eq!(OsqueryParser.sniff(input), None);
        }
    }

    #[test]
    fn claims_no_extension() {
        let extensions = OsqueryParser.extensions();
        assert!(extensions.is_empty());
    }

    #[test]
    fn resolves_osquery_over_ndjson_by_content() {
        let reg = crate::parser::ParserRegistry::default();

        let osquery = reg.resolve("-", OSQUERY.as_bytes()).unwrap();
        assert_eq!(osquery.id(), "osquery");

        let plain = reg.resolve("-", b"{\"a\":1}\n{\"a\":2}\n").unwrap();
        assert_eq!(plain.id(), "ndjson");
    }
}