Skip to main content

anomalyx_normalize/parsers/
osquery.rs

1//! osquery results parser — endpoint fleet telemetry (JSON event log).
2//!
3//! osquery's results log is NDJSON. Each line carries query metadata (`name`,
4//! `hostIdentifier`, `unixTime`, `action`, …) plus the actual result data in
5//! either a **differential** `columns` object (one result row) or a **snapshot**
6//! array (many result rows). We emit **one row per result**: the metadata
7//! columns plus the result fields under a `columns.<key>` prefix (so a query
8//! column named `name` doesn't collide with the query `name`). osquery stringifies
9//! every column value, so those are type-coerced (`pid` `"123"` → `Int`) — which
10//! lets `structural`/`dist` read fleet-posture drift against a baseline snapshot.
11//!
12//! Detected by the `hostIdentifier` + (`columns` | `snapshot`) signature; claims
13//! no extension (the results log is generically `*.log`).
14
15use crate::infer;
16use crate::parser::{Confidence, FormatParser, STRONG};
17use crate::table::TableBuilder;
18use ax_core::{AxError, Column, Value};
19use serde_json::Value as J;
20use std::collections::BTreeMap;
21
/// Parser for osquery's NDJSON results log.
///
/// A fieldless unit type: all behavior lives in its [`FormatParser`]
/// implementation, so any two values are interchangeable.
#[derive(Debug, Default, Clone)]
pub struct OsqueryParser;
24
25fn looks_like_osquery(obj: &serde_json::Map<String, J>) -> bool {
26    obj.contains_key("hostIdentifier")
27        && (obj.get("columns").is_some_and(J::is_object)
28            || obj.get("snapshot").is_some_and(J::is_array))
29}
30
31/// Adds a result object's fields as `columns.<key>` cells, type-coercing the
32/// stringified values osquery emits.
33fn add_columns(result: &serde_json::Map<String, J>, row: &mut BTreeMap<String, Value>) {
34    for (key, value) in result {
35        let cell = match value {
36            J::String(s) => infer::infer_scalar(s),
37            other => infer::json_to_value(other),
38        };
39        row.insert(format!("columns.{key}"), cell);
40    }
41}
42
43impl OsqueryParser {
44    fn err(&self, msg: impl std::fmt::Display) -> AxError {
45        AxError::Parse {
46            format: self.id().to_string(),
47            message: msg.to_string(),
48        }
49    }
50}
51
52impl FormatParser for OsqueryParser {
53    fn id(&self) -> &'static str {
54        "osquery"
55    }
56    fn extensions(&self) -> &'static [&'static str] {
57        &[]
58    }
59    fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
60        let text = std::str::from_utf8(bytes).ok()?;
61        let line = text.lines().find(|l| !l.trim().is_empty())?;
62        let value: J = serde_json::from_str(line).ok()?;
63        value
64            .as_object()
65            .is_some_and(looks_like_osquery)
66            .then_some(STRONG)
67    }
68    fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
69        let text = std::str::from_utf8(bytes).map_err(|e| self.err(e))?;
70        let mut builder = TableBuilder::new();
71        for line in text.lines() {
72            if line.trim().is_empty() {
73                continue;
74            }
75            let value: J = serde_json::from_str(line).map_err(|e| self.err(e))?;
76            let obj = value
77                .as_object()
78                .ok_or_else(|| self.err("osquery result is not a JSON object"))?;
79
80            // Metadata: every top-level scalar except the result payloads.
81            let mut meta: BTreeMap<String, Value> = BTreeMap::new();
82            for (key, val) in obj {
83                if key == "columns" || key == "snapshot" {
84                    continue;
85                }
86                meta.insert(key.clone(), infer::json_to_value(val));
87            }
88
89            match (obj.get("columns"), obj.get("snapshot")) {
90                // Differential: one result object → one row.
91                (Some(J::Object(columns)), _) => {
92                    let mut row = meta.clone();
93                    add_columns(columns, &mut row);
94                    builder.push_row(row);
95                }
96                // Snapshot: one row per array element (metadata replicated).
97                (_, Some(J::Array(snapshot))) => {
98                    for element in snapshot {
99                        if let Some(result) = element.as_object() {
100                            let mut row = meta.clone();
101                            add_columns(result, &mut row);
102                            builder.push_row(row);
103                        }
104                    }
105                }
106                // Neither payload: emit the metadata row as-is.
107                _ => builder.push_row(meta),
108            }
109        }
110        Ok(builder.finish())
111    }
112}
113
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    /// Three-event fixture: two differential `processes` events from `host1`
    /// followed by one two-element `users` snapshot from `host2`.
    const OSQUERY: &str = concat!(
        r#"{"name":"processes","hostIdentifier":"host1","unixTime":1609459200,"action":"added","columns":{"pid":"123","name":"sshd","path":"/usr/sbin/sshd"}}"#,
        "\n",
        r#"{"name":"processes","hostIdentifier":"host1","unixTime":1609459260,"action":"removed","columns":{"pid":"99","name":"bash"}}"#,
        "\n",
        r#"{"name":"users","hostIdentifier":"host2","unixTime":1609459300,"action":"snapshot","snapshot":[{"uid":"0","username":"root"},{"uid":"1000","username":"alice"}]}"#,
        "\n",
    );

    /// Runs the parser over `s`, panicking if it reports an error.
    fn parse(s: &str) -> Vec<Column> {
        let outcome = OsqueryParser.parse("-", s.as_bytes());
        outcome.unwrap()
    }

    /// Finds a column by name, panicking when it is absent.
    fn col<'a>(cols: &'a [Column], name: &str) -> &'a Column {
        match cols.iter().find(|c| c.name == name) {
            Some(found) => found,
            None => panic!("missing column {name}"),
        }
    }

    #[test]
    fn differential_columns_are_prefixed_and_coerced() {
        let table = parse(OSQUERY);

        let pid = col(&table, "columns.pid");
        assert_eq!(pid.ty, ColType::Int, "stringified pid coerced to int");
        assert_eq!(pid.cells[0], Value::Int(123));
        assert_eq!(pid.cells[1], Value::Int(99));

        // The top-level query name and the result column of the same name
        // must land in separate columns.
        let query_name = col(&table, "name");
        let process_name = col(&table, "columns.name");
        assert_eq!(query_name.cells[0], Value::Str("processes".into()));
        assert_eq!(process_name.cells[0], Value::Str("sshd".into()));
    }

    #[test]
    fn metadata_is_typed() {
        let table = parse(OSQUERY);

        let unix_time = col(&table, "unixTime");
        assert_eq!(unix_time.ty, ColType::Int);
        assert_eq!(unix_time.cells[0], Value::Int(1609459200));

        let action = col(&table, "action");
        assert_eq!(action.cells[1], Value::Str("removed".into()));

        let host = col(&table, "hostIdentifier");
        assert_eq!(host.cells[0], Value::Str("host1".into()));
    }

    #[test]
    fn snapshot_expands_to_one_row_per_element() {
        let table = parse(OSQUERY);

        // Two differential rows plus two snapshot rows.
        assert_eq!(col(&table, "name").cells.len(), 4);

        let uid = col(&table, "columns.uid");
        assert_eq!(uid.cells[2], Value::Int(0)); // root
        assert_eq!(uid.cells[3], Value::Int(1000)); // alice

        let username = col(&table, "columns.username");
        assert_eq!(username.cells[3], Value::Str("alice".into()));

        // Each snapshot row carries its own copy of the event metadata.
        let host = col(&table, "hostIdentifier");
        assert_eq!(host.cells[2], Value::Str("host2".into()));
        let action = col(&table, "action");
        assert_eq!(action.cells[2], Value::Str("snapshot".into()));

        // A column that only differential rows populate is null here.
        assert_eq!(col(&table, "columns.pid").cells[2], Value::Null);

        // The raw payload keys never surface as columns of their own.
        assert!(!table.iter().any(|c| c.name == "columns"));
        assert!(!table.iter().any(|c| c.name == "snapshot"));
    }

    #[test]
    fn add_columns_units() {
        let payload = serde_json::json!({"pid": "5", "name": "x"});
        let fields = payload.as_object().unwrap();

        let mut row = BTreeMap::new();
        add_columns(fields, &mut row);

        assert_eq!(row.get("columns.pid"), Some(&Value::Int(5)));
        assert_eq!(row.get("columns.name"), Some(&Value::Str("x".into())));
    }

    #[test]
    fn malformed_events_error() {
        /// True when parsing `bytes` fails with a parse error.
        fn rejected(bytes: &[u8]) -> bool {
            matches!(
                OsqueryParser.parse("-", bytes),
                Err(AxError::Parse { .. })
            )
        }

        assert!(rejected(b"not json\n"));
        assert!(rejected(b"[1,2,3]\n"));
    }

    #[test]
    fn sniff_keys_on_host_identifier_and_payload() {
        /// Shorthand for sniffing a byte slice with a fresh parser value.
        fn sniff(bytes: &[u8]) -> Option<Confidence> {
            OsqueryParser.sniff(bytes)
        }

        // Both payload shapes are recognized.
        assert_eq!(sniff(OSQUERY.as_bytes()), Some(STRONG));
        assert_eq!(
            sniff(br#"{"hostIdentifier":"h","snapshot":[{"a":"1"}]}"#),
            Some(STRONG)
        );

        // The host key without a payload is not enough.
        assert_eq!(sniff(br#"{"hostIdentifier":"h"}"#), None);
        // A payload without the host key is not enough.
        assert_eq!(sniff(br#"{"columns":{"a":"1"}}"#), None);
        // The payload must have the right JSON type.
        assert_eq!(sniff(br#"{"hostIdentifier":"h","columns":5}"#), None);
        // Generic NDJSON.
        assert_eq!(sniff(b"{\"a\":1}\n{\"a\":2}\n"), None);
        // Not JSON at all.
        assert_eq!(sniff(b"a,b,c\n1,2,3"), None);
    }

    #[test]
    fn claims_no_extension() {
        let claimed = OsqueryParser.extensions();
        assert!(claimed.is_empty());
    }

    #[test]
    fn resolves_osquery_over_ndjson_by_content() {
        let registry = crate::parser::ParserRegistry::default();

        let for_osquery = registry.resolve("-", OSQUERY.as_bytes()).unwrap();
        assert_eq!(for_osquery.id(), "osquery");

        let for_generic = registry.resolve("-", b"{\"a\":1}\n{\"a\":2}\n").unwrap();
        assert_eq!(for_generic.id(), "ndjson");
    }
}