anomalyx_normalize/parsers/
ndjson.rs1use crate::parser::{Confidence, FormatParser, STRONG};
5use crate::table::TableBuilder;
6use ax_core::{AxError, Column};
7
/// Stateless parser for newline-delimited JSON input (`.ndjson` / `.jsonl`).
///
/// The type carries no data; all behavior lives in its `FormatParser`
/// implementation.
#[derive(Clone, Debug, Default)]
pub struct NdjsonParser;
10
11impl FormatParser for NdjsonParser {
12 fn id(&self) -> &'static str {
13 "ndjson"
14 }
15 fn extensions(&self) -> &'static [&'static str] {
16 &["ndjson", "jsonl"]
17 }
18 fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
19 let text = std::str::from_utf8(bytes).ok()?;
20 let trimmed = text.trim_start();
21 if !trimmed.starts_with('{') {
22 return None;
23 }
24 let object_lines = trimmed
27 .lines()
28 .filter(|l| !l.trim().is_empty())
29 .take(3)
30 .filter(|l| l.trim_start().starts_with('{'))
31 .count();
32 (object_lines >= 2).then_some(STRONG)
33 }
34 fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
35 let text = std::str::from_utf8(bytes).map_err(|e| AxError::Parse {
36 format: self.id().to_string(),
37 message: e.to_string(),
38 })?;
39 let mut builder = TableBuilder::new();
40 for (lineno, line) in text.lines().enumerate() {
41 if line.trim().is_empty() {
42 continue;
43 }
44 let val: serde_json::Value =
45 serde_json::from_str(line).map_err(|e| AxError::Parse {
46 format: self.id().to_string(),
47 message: format!("line {}: {e}", lineno + 1),
48 })?;
49 builder.push_value(val);
50 }
51 Ok(builder.finish())
52 }
53}
54
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `input` with a placeholder source name, panicking on failure.
    fn parse_ok(input: &[u8]) -> Vec<Column> {
        NdjsonParser.parse("-", input).unwrap()
    }

    /// Looks up a column by name, panicking when it is absent.
    fn column<'a>(columns: &'a [Column], name: &str) -> &'a Column {
        columns.iter().find(|c| c.name == name).unwrap()
    }

    // A key that appears only in later rows still yields a full-length
    // column, with nulls for the rows that lacked it.
    #[test]
    fn key_union_pads_missing() {
        let columns = parse_ok(b"{\"a\":1}\n{\"a\":2,\"b\":9}\n");
        assert_eq!(column(&columns, "b").null_count(), 1);
        assert_eq!(column(&columns, "a").len(), 2);
    }

    // Empty lines between records do not produce rows.
    #[test]
    fn blank_lines_skipped() {
        let columns = parse_ok(b"{\"a\":1}\n\n{\"a\":2}\n");
        assert_eq!(columns[0].len(), 2);
    }

    // Sniffing claims the input only when several lines open an object.
    #[test]
    fn sniff_needs_repeated_object_lines() {
        let parser = NdjsonParser;
        assert_eq!(parser.sniff(b"{\"a\":1}\n{\"a\":2}\n"), Some(STRONG));
        assert_eq!(parser.sniff(b"{\"a\":1}"), None);
        assert_eq!(parser.sniff(b"[1,2]"), None);
    }

    // A bad record is reported as a parse error naming its 1-based line.
    #[test]
    fn malformed_line_errors_with_line_number() {
        let failure = NdjsonParser.parse("-", b"{\"a\":1}\n{bad}\n").unwrap_err();
        assert!(matches!(failure, AxError::Parse { .. }));
        let rendered = format!("{failure}");
        assert!(rendered.contains("line 2"));
    }
}