Skip to main content

anomalyx_normalize/parsers/
ndjson.rs

//! Newline-delimited JSON (NDJSON / JSON Lines) parser: one JSON value per line.
//! Lines holding a scalar or an array are placed under the synthetic `value` column.
3
4use crate::parser::{Confidence, FormatParser, STRONG};
5use crate::table::TableBuilder;
6use ax_core::{AxError, Column};
7
/// Parser for newline-delimited JSON input (one JSON value per line).
///
/// The type is a stateless unit struct; it can be created with
/// `NdjsonParser` or `NdjsonParser::default()`.
#[derive(Clone, Debug, Default)]
pub struct NdjsonParser;
10
11impl FormatParser for NdjsonParser {
12    fn id(&self) -> &'static str {
13        "ndjson"
14    }
15    fn extensions(&self) -> &'static [&'static str] {
16        &["ndjson", "jsonl"]
17    }
18    fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
19        let text = std::str::from_utf8(bytes).ok()?;
20        let trimmed = text.trim_start();
21        if !trimmed.starts_with('{') {
22            return None;
23        }
24        // Two or more object-leading lines distinguishes NDJSON from a single
25        // JSON object; this outranks JsonParser's TEXT confidence.
26        let object_lines = trimmed
27            .lines()
28            .filter(|l| !l.trim().is_empty())
29            .take(3)
30            .filter(|l| l.trim_start().starts_with('{'))
31            .count();
32        (object_lines >= 2).then_some(STRONG)
33    }
34    fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
35        let text = std::str::from_utf8(bytes).map_err(|e| AxError::Parse {
36            format: self.id().to_string(),
37            message: e.to_string(),
38        })?;
39        let mut builder = TableBuilder::new();
40        for (lineno, line) in text.lines().enumerate() {
41            if line.trim().is_empty() {
42                continue;
43            }
44            let val: serde_json::Value =
45                serde_json::from_str(line).map_err(|e| AxError::Parse {
46                    format: self.id().to_string(),
47                    message: format!("line {}: {e}", lineno + 1),
48                })?;
49            builder.push_value(val);
50        }
51        Ok(builder.finish())
52    }
53}
54
#[cfg(test)]
mod tests {
    use super::*;

    /// Two rows with different key sets: column `b` must report one null and
    /// column `a` must hold two entries.
    #[test]
    fn key_union_pads_missing() {
        let input: &[u8] = b"{\"a\":1}\n{\"a\":2,\"b\":9}\n";
        let columns = NdjsonParser.parse("-", input).unwrap();

        let b = columns.iter().find(|col| col.name == "b").unwrap();
        assert_eq!(b.null_count(), 1);

        let a = columns.iter().find(|col| col.name == "a").unwrap();
        assert_eq!(a.len(), 2);
    }

    /// An empty line between two records does not produce a row.
    #[test]
    fn blank_lines_skipped() {
        let input: &[u8] = b"{\"a\":1}\n\n{\"a\":2}\n";
        let columns = NdjsonParser.parse("-", input).unwrap();
        assert_eq!(columns[0].len(), 2);
    }

    /// Sniffing only succeeds when more than one line starts with `{`.
    #[test]
    fn sniff_needs_repeated_object_lines() {
        let cases: [(&[u8], Option<Confidence>); 3] = [
            (&b"{\"a\":1}\n{\"a\":2}\n"[..], Some(STRONG)),
            (&b"{\"a\":1}"[..], None), // single object → JsonParser's job
            (&b"[1,2]"[..], None),
        ];
        for (input, expected) in cases {
            assert_eq!(NdjsonParser.sniff(input), expected);
        }
    }

    /// A broken second line yields a parse error that names line 2.
    #[test]
    fn malformed_line_errors_with_line_number() {
        let outcome = NdjsonParser.parse("-", b"{\"a\":1}\n{bad}\n");
        let err = outcome.unwrap_err();
        assert!(matches!(err, AxError::Parse { .. }));
        assert!(err.to_string().contains("line 2"));
    }
}