use serde::Deserialize;
use serde_json::Value;
use std::collections::HashSet;
use std::io::BufRead;
use std::path::Path;
use crate::core::errors::DataProfilerError;
use crate::core::profile_builder;
use crate::core::report_assembler::ReportAssembler;
use crate::core::streaming_stats::StreamingColumnCollection;
use crate::types::{
ColumnProfile, DataSource, ExecutionMetadata, FileFormat, ProfileReport, QualityDimension,
};
/// Physical layout of the JSON input.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JsonFormat {
    /// A single top-level array of records: `[{...}, {...}]`.
    JsonArray,
    /// Concatenated / line-delimited JSON: one value after another.
    Jsonl,
}
/// Options controlling JSON parsing.
#[derive(Debug, Clone, Default)]
pub struct JsonParserConfig {
    // Forced input layout; `None` auto-detects from the first non-whitespace byte.
    pub format: Option<JsonFormat>,
    // Stop after this many object records; `None` reads to EOF.
    pub max_rows: Option<usize>,
}
impl JsonParserConfig {
    /// Builder-style setter: caps the number of records to read.
    pub fn with_max_rows(self, max_rows: usize) -> Self {
        Self {
            max_rows: Some(max_rows),
            ..self
        }
    }

    /// Config that forces line-delimited (JSONL) parsing.
    pub fn jsonl() -> Self {
        Self {
            format: Some(JsonFormat::Jsonl),
            max_rows: None,
        }
    }

    /// Config that forces JSON-array parsing.
    pub fn json_array() -> Self {
        Self {
            format: Some(JsonFormat::JsonArray),
            max_rows: None,
        }
    }
}
/// Renders a JSON value as the string fed into column statistics.
///
/// Null becomes the empty string, scalars use their natural text form, and
/// arrays/objects are re-serialized to compact JSON (empty string if
/// serialization fails).
fn json_value_to_string(value: &Value) -> String {
    match value {
        Value::Null => String::new(),
        Value::String(text) => text.clone(),
        Value::Bool(flag) => flag.to_string(),
        Value::Number(number) => number.to_string(),
        composite => serde_json::to_string(composite).unwrap_or_default(),
    }
}
fn feed_json_object(
obj: &serde_json::Map<String, Value>,
known_columns: &mut Vec<String>,
known_columns_set: &mut HashSet<String>,
column_stats: &mut StreamingColumnCollection,
) {
for key in obj.keys() {
if known_columns_set.insert(key.clone()) {
known_columns.push(key.clone());
}
}
let values: Vec<String> = known_columns
.iter()
.map(|col| obj.get(col).map(json_value_to_string).unwrap_or_default())
.collect();
column_stats.process_record(known_columns, values);
}
/// Streams JSON records from `reader` and builds per-column streaming stats.
///
/// Format selection: an explicit `config.format` wins; otherwise the first
/// non-whitespace byte decides — `[` means a JSON array, anything else is
/// treated as JSONL (concatenated / line-delimited values).
///
/// Returns `(profiles, column_stats, rows_read, malformed_lines, format)`.
/// Only top-level *objects* count as rows; other top-level values are
/// consumed but ignored.
///
/// # Errors
/// Propagates I/O failures from the reader, and returns `JsonParsingError`
/// when an explicit array format was requested but no `[` exists in the input.
pub fn analyze_json_from_reader<R: BufRead>(
    mut reader: R,
    config: &JsonParserConfig,
) -> Result<
    (
        Vec<ColumnProfile>,
        StreamingColumnCollection,
        usize,
        usize,
        FileFormat,
    ),
    DataProfilerError,
> {
    let format = match config.format {
        Some(JsonFormat::JsonArray) => FileFormat::Json,
        Some(JsonFormat::Jsonl) => FileFormat::Jsonl,
        // Auto-detect: peek the first non-whitespace byte without consuming it.
        None => match consume_leading_whitespace(&mut reader)? {
            Some(b'[') => FileFormat::Json,
            _ => FileFormat::Jsonl,
        },
    };
    let mut column_stats = StreamingColumnCollection::new();
    // Column order is first-seen; the HashSet is the membership index.
    let mut known_columns: Vec<String> = Vec::new();
    let mut known_columns_set: HashSet<String> = HashSet::new();
    let mut rows_read = 0;
    let mut malformed_lines = 0;
    match format {
        FileFormat::Jsonl => {
            loop {
                // Row cap applies before parsing the next value.
                if let Some(max) = config.max_rows
                    && rows_read >= max
                {
                    break;
                }
                // Pull exactly one whitespace-separated JSON value off the stream.
                let mut deserializer = serde_json::Deserializer::from_reader(&mut reader);
                let value: Value = match Value::deserialize(&mut deserializer) {
                    Ok(v) => v,
                    // Clean end of input. NOTE(review): a record truncated at
                    // EOF also classifies as `Eof` and is dropped silently
                    // rather than counted as malformed — confirm intended.
                    Err(err) if err.classify() == serde_json::error::Category::Eof => break,
                    Err(_) => {
                        // Malformed record: count it and resynchronize at the
                        // next newline boundary.
                        malformed_lines += 1;
                        skip_to_next_line(&mut reader)?;
                        continue;
                    }
                };
                // Only objects contribute rows; scalars/arrays are skipped
                // without incrementing rows_read or malformed_lines.
                if let Value::Object(ref obj) = value {
                    feed_json_object(
                        obj,
                        &mut known_columns,
                        &mut known_columns_set,
                        &mut column_stats,
                    );
                    rows_read += 1;
                }
            }
        }
        _ => {
            // JSON-array mode: first scan forward to (and consume) the opening '['.
            let mut found_array = false;
            loop {
                let mut consume = 0;
                {
                    let buf = reader.fill_buf().map_err(DataProfilerError::from)?;
                    if buf.is_empty() {
                        break;
                    }
                    for &b in buf {
                        consume += 1;
                        if b == b'[' {
                            found_array = true;
                            break;
                        } else if !b.is_ascii_whitespace() {
                            // NOTE(review): this consumes the non-'[' byte and
                            // keeps scanning, so with an explicit JsonArray
                            // config any leading garbage before the first '['
                            // is skipped byte-by-byte — confirm intended.
                            break;
                        }
                    }
                }
                reader.consume(consume);
                if found_array || consume == 0 {
                    break;
                }
            }
            if !found_array {
                // Only an explicitly requested array format is an error here;
                // the auto-detected path only picks Json after seeing '['.
                if config.format.is_some() {
                    return Err(DataProfilerError::JsonParsingError {
                        message: "Expected JSON array (starts with '[') but input does not match"
                            .to_string(),
                    });
                }
            }
            if found_array {
                // Element loop: skip whitespace/commas, stop at ']', otherwise
                // deserialize one element.
                loop {
                    let mut consume = 0;
                    let mut found_value = false;
                    let mut end_of_array = false;
                    {
                        let buf = reader.fill_buf().map_err(DataProfilerError::from)?;
                        if buf.is_empty() {
                            // EOF before ']' — tolerate an unterminated array.
                            break;
                        }
                        for &b in buf {
                            if b.is_ascii_whitespace() || b == b',' {
                                consume += 1;
                            } else if b == b']' {
                                end_of_array = true;
                                consume += 1;
                                break;
                            } else {
                                // Leave the byte unconsumed for the deserializer.
                                found_value = true;
                                break;
                            }
                        }
                    }
                    reader.consume(consume);
                    if end_of_array {
                        break;
                    }
                    if found_value {
                        // Row cap checked before parsing the next element.
                        if let Some(max) = config.max_rows
                            && rows_read >= max
                        {
                            break;
                        }
                        let mut de = serde_json::Deserializer::from_reader(&mut reader);
                        match serde::Deserialize::deserialize(&mut de) {
                            Ok(Value::Object(obj)) => {
                                feed_json_object(
                                    &obj,
                                    &mut known_columns,
                                    &mut known_columns_set,
                                    &mut column_stats,
                                );
                                rows_read += 1;
                            }
                            // Non-object element: consumed and ignored.
                            Ok(_) => { }
                            Err(_) => {
                                // A malformed element leaves the stream position
                                // unreliable, so abort the rest of the array.
                                malformed_lines += 1;
                                break;
                            }
                        }
                    }
                }
            }
        }
    }
    // The `false, false, None` arguments are pass-throughs whose semantics are
    // defined by profiles_from_streaming (inference/sampling knobs — see there).
    let profiles = profile_builder::profiles_from_streaming(&column_stats, false, false, None);
    Ok((profiles, column_stats, rows_read, malformed_lines, format))
}
/// Skips leading ASCII whitespace and peeks the next byte.
///
/// Consumes whole-whitespace buffer chunks from `reader`, and returns the
/// first non-whitespace byte WITHOUT consuming it (so the caller can still
/// parse from it), or `Ok(None)` at end of input.
///
/// # Errors
/// Propagates any I/O error from the underlying reader.
fn consume_leading_whitespace<R: BufRead>(reader: &mut R) -> Result<Option<u8>, DataProfilerError> {
    loop {
        let (peeked, whitespace_len) = {
            let buf = reader.fill_buf().map_err(DataProfilerError::from)?;
            if buf.is_empty() {
                return Ok(None);
            }
            match buf.iter().position(|b| !b.is_ascii_whitespace()) {
                // Found a real byte: report it, consume nothing.
                Some(idx) => (Some(buf[idx]), 0),
                // Entire chunk is whitespace: discard it and refill.
                None => (None, buf.len()),
            }
        };
        if let Some(byte) = peeked {
            return Ok(Some(byte));
        }
        reader.consume(whitespace_len);
    }
}
/// Discards input up to and including the next `\n` (or to EOF).
///
/// Used to resynchronize the JSONL stream after a malformed record. Unlike
/// `read_until` into a scratch `Vec`, this skips in place via
/// `fill_buf`/`consume`, so an arbitrarily long malformed line costs no
/// allocation proportional to its length.
///
/// # Errors
/// Propagates any I/O error from the underlying reader.
fn skip_to_next_line<R: BufRead>(reader: &mut R) -> Result<(), DataProfilerError> {
    loop {
        let (found_newline, skip_len) = {
            let buf = reader.fill_buf().map_err(DataProfilerError::from)?;
            if buf.is_empty() {
                // EOF: nothing left to skip.
                return Ok(());
            }
            match buf.iter().position(|&b| b == b'\n') {
                // Consume through the newline so the next read starts fresh.
                Some(idx) => (true, idx + 1),
                None => (false, buf.len()),
            }
        };
        reader.consume(skip_len);
        if found_newline {
            return Ok(());
        }
    }
}
/// Profiles a JSON / JSONL file with the default quality dimensions.
///
/// Convenience wrapper that forwards to [`analyze_json_file_with_dimensions`]
/// with no dimension filter.
pub fn analyze_json_file(
    file_path: &Path,
    config: &JsonParserConfig,
) -> Result<ProfileReport, DataProfilerError> {
    analyze_json_file_with_dimensions(file_path, config, None)
}
/// Profiles a JSON / JSONL file, optionally restricting quality dimensions.
///
/// Reads file metadata for the report's size field, streams the file through
/// [`analyze_json_from_reader`], then assembles a [`ProfileReport`].
///
/// # Errors
/// `FileNotFound` when the path does not exist, other I/O errors on read
/// failure, and `JsonParsingError` when the file produced zero valid records
/// but contained malformed JSON.
pub fn analyze_json_file_with_dimensions(
    file_path: &Path,
    config: &JsonParserConfig,
    quality_dimensions: Option<&[QualityDimension]>,
) -> Result<ProfileReport, DataProfilerError> {
    let file_metadata = std::fs::metadata(file_path).map_err(|e| map_io_error(file_path, e))?;
    let timer = std::time::Instant::now();
    let file = std::fs::File::open(file_path).map_err(|e| map_io_error(file_path, e))?;
    let reader = std::io::BufReader::new(file);
    let (column_profiles, column_stats, rows_read, malformed_lines, format) =
        analyze_json_from_reader(reader, config)?;
    let file_source = DataSource::File {
        path: file_path.display().to_string(),
        format,
        size_bytes: file_metadata.len(),
        modified_at: None,
        parquet_metadata: None,
    };
    // Zero usable rows: malformed input is an error; a genuinely empty file
    // yields an empty report with quality checks skipped.
    if rows_read == 0 {
        return if malformed_lines > 0 {
            Err(DataProfilerError::JsonParsingError {
                message: "No valid JSON records found in file (malformed JSON encountered)"
                    .to_string(),
            })
        } else {
            Ok(ReportAssembler::new(
                file_source,
                ExecutionMetadata::new(0, 0, timer.elapsed().as_millis()),
            )
            .skip_quality()
            .build())
        };
    }
    let sample_columns = profile_builder::quality_check_samples(&column_stats);
    let elapsed_ms = timer.elapsed().as_millis();
    let column_count = column_profiles.len();
    let mut report_builder = ReportAssembler::new(
        file_source,
        ExecutionMetadata::new(rows_read, column_count, elapsed_ms),
    )
    .columns(column_profiles)
    .with_quality_data(sample_columns);
    if let Some(dims) = quality_dimensions {
        report_builder = report_builder.with_requested_dimensions(dims.to_vec());
    }
    Ok(report_builder.build())
}
/// Translates an I/O error into the profiler's error type.
///
/// `NotFound` gets the dedicated `FileNotFound` variant (carrying the path);
/// everything else goes through the generic `From<io::Error>` conversion.
fn map_io_error(file_path: &Path, e: std::io::Error) -> DataProfilerError {
    match e.kind() {
        std::io::ErrorKind::NotFound => DataProfilerError::FileNotFound {
            path: file_path.display().to_string(),
        },
        _ => DataProfilerError::from(e),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Cursor, Write};
    use tempfile::NamedTempFile;

    // Writes `content` to a fresh temp file and returns the handle (the file
    // is removed when the handle drops).
    fn write_file(content: &str) -> NamedTempFile {
        let mut f = NamedTempFile::new().unwrap();
        write!(f, "{}", content).unwrap();
        f.flush().unwrap();
        f
    }

    // JSONL auto-detection: three two-field objects → 3 rows, 2 columns.
    #[test]
    fn test_analyze_json_from_reader_jsonl_streaming() {
        let data = b"{\"x\":1,\"y\":\"a\"}\n{\"x\":2,\"y\":\"b\"}\n{\"x\":3,\"y\":\"c\"}\n";
        let cursor = Cursor::new(data.as_ref());
        let config = JsonParserConfig::default();
        let (profiles, _stats, rows, _malformed, format) =
            analyze_json_from_reader(cursor, &config).unwrap();
        assert_eq!(format, FileFormat::Jsonl);
        assert_eq!(rows, 3);
        assert_eq!(profiles.len(), 2);
    }

    // Array auto-detection: leading '[' selects FileFormat::Json.
    #[test]
    fn test_analyze_json_from_reader_json_array() {
        let data = br#"[{"name":"Alice","age":25},{"name":"Bob","age":30}]"#;
        let cursor = Cursor::new(data.as_ref());
        let config = JsonParserConfig::default();
        let (profiles, _stats, rows, _malformed, format) =
            analyze_json_from_reader(cursor, &config).unwrap();
        assert_eq!(format, FileFormat::Json);
        assert_eq!(rows, 2);
        assert_eq!(profiles.len(), 2);
    }

    // max_rows caps the record count even with more input available.
    #[test]
    fn test_analyze_json_from_reader_max_rows() {
        let data = b"{\"x\":1}\n{\"x\":2}\n{\"x\":3}\n{\"x\":4}\n{\"x\":5}\n";
        let cursor = Cursor::new(data.as_ref());
        let config = JsonParserConfig::default().with_max_rows(3);
        let (_profiles, _stats, rows, _malformed, _format) =
            analyze_json_from_reader(cursor, &config).unwrap();
        assert_eq!(rows, 3);
    }

    // A field absent from a later record still counts toward its total_count
    // (missing values are recorded as empty cells).
    #[test]
    fn test_analyze_json_from_reader_missing_fields() {
        let data = b"{\"a\":1,\"b\":2}\n{\"a\":3}\n";
        let cursor = Cursor::new(data.as_ref());
        let config = JsonParserConfig::jsonl();
        let (profiles, _stats, rows, _malformed, _format) =
            analyze_json_from_reader(cursor, &config).unwrap();
        assert_eq!(rows, 2);
        let col_b = profiles.iter().find(|p| p.name == "b").unwrap();
        assert_eq!(col_b.total_count, 2);
    }

    // End-to-end file path: report carries row count and a quality score.
    #[test]
    fn test_analyze_json_file_quality_report() {
        let f = write_file(r#"[{"x":1},{"x":2}]"#);
        let config = JsonParserConfig::default();
        let report = analyze_json_file(f.path(), &config).unwrap();
        assert_eq!(report.execution.rows_processed, 2);
        assert_eq!(report.column_profiles.len(), 1);
        assert!(report.quality_score().unwrap() >= 0.0);
    }

    // A malformed middle line is skipped (via skip_to_next_line) and the
    // remaining valid lines are still profiled.
    #[test]
    fn test_jsonl_skips_malformed_lines() {
        let data = b"{\"x\":1}\n{\"x\":,malformed}\n{\"x\":3}\n";
        let cursor = Cursor::new(data.as_ref());
        let config = JsonParserConfig::jsonl();
        let (profiles, _stats, rows, _malformed, format) =
            analyze_json_from_reader(cursor, &config).unwrap();
        assert_eq!(format, FileFormat::Jsonl);
        assert_eq!(rows, 2);
        assert_eq!(profiles[0].total_count, 2);
    }

    // Whitespace runs larger than one BufReader chunk must not break the
    // '[' detection (exercises the chunked consume loops).
    #[test]
    fn test_analyze_json_with_large_leading_whitespace() {
        let data = format!("{}[{{\"x\":1}}]", " ".repeat(10_000));
        let cursor = Cursor::new(data.into_bytes());
        let config = JsonParserConfig::default();
        let (_profiles, _stats, rows, malformed, format) =
            analyze_json_from_reader(cursor, &config).unwrap();
        assert_eq!(format, FileFormat::Json);
        assert_eq!(rows, 1);
        assert_eq!(malformed, 0);
    }

    // Array of objects via the file API: column names and counts.
    #[test]
    fn test_analyze_json_array() {
        let json = write_file(r#"[{"name":"Alice","age":25},{"name":"Bob","age":30}]"#);
        let config = JsonParserConfig::default();
        let report = analyze_json_file(json.path(), &config).unwrap();
        let profiles = &report.column_profiles;
        assert_eq!(profiles.len(), 2);
        let names: Vec<&str> = profiles.iter().map(|p| p.name.as_str()).collect();
        assert!(names.contains(&"name"));
        assert!(names.contains(&"age"));
        let age = profiles.iter().find(|p| p.name == "age").unwrap();
        assert_eq!(age.total_count, 2);
        assert_eq!(age.null_count, 0);
    }

    // JSONL via the file API.
    #[test]
    fn test_analyze_jsonl() {
        let jsonl = write_file("{\"x\":1}\n{\"x\":2}\n{\"x\":3}\n");
        let config = JsonParserConfig::default();
        let report = analyze_json_file(jsonl.path(), &config).unwrap();
        let profiles = &report.column_profiles;
        assert_eq!(profiles.len(), 1);
        assert_eq!(profiles[0].name, "x");
        assert_eq!(profiles[0].total_count, 3);
    }

    // JSON null maps to an empty cell, which is counted as a null.
    #[test]
    fn test_analyze_json_with_nulls() {
        let json = write_file(r#"[{"a":"hello","b":1},{"a":null,"b":2},{"a":"world","b":null}]"#);
        let config = JsonParserConfig::default();
        let report = analyze_json_file(json.path(), &config).unwrap();
        let profiles = &report.column_profiles;
        let col_a = profiles.iter().find(|p| p.name == "a").unwrap();
        assert_eq!(col_a.null_count, 1);
        let col_b = profiles.iter().find(|p| p.name == "b").unwrap();
        assert_eq!(col_b.null_count, 1);
    }

    // Missing-field handling in array mode, end-to-end.
    #[test]
    fn test_analyze_json_with_missing_fields() {
        let json = write_file(r#"[{"a":1,"b":2},{"a":3}]"#);
        let config = JsonParserConfig::default();
        let report = analyze_json_file(json.path(), &config).unwrap();
        let profiles = &report.column_profiles;
        let col_b = profiles.iter().find(|p| p.name == "b").unwrap();
        assert_eq!(col_b.total_count, 2);
    }

    // "[]" is valid input producing an empty profile set.
    #[test]
    fn test_analyze_json_empty_array() {
        let json = write_file("[]");
        let config = JsonParserConfig::default();
        let report = analyze_json_file(json.path(), &config).unwrap();
        assert!(report.column_profiles.is_empty());
    }

    // Zero rows + malformed input must surface an error, not an empty report.
    #[test]
    fn test_analyze_json_malformed_returns_error() {
        let json = write_file("this is entirely invalid json");
        let config = JsonParserConfig::default();
        let err = analyze_json_file(json.path(), &config)
            .expect_err("malformed JSON should return an error");
        let message = err.to_string().to_lowercase();
        assert!(
            message.contains("malformed") && message.contains("json"),
            "expected parsing error mentioning malformed json, got: {err}"
        );
    }

    // Detected format is recorded in the report's DataSource.
    #[test]
    fn test_analyze_json_file_detects_format() {
        let json_array = write_file(r#"[{"x":1}]"#);
        let config = JsonParserConfig::default();
        let report = analyze_json_file(json_array.path(), &config).unwrap();
        assert!(matches!(
            report.data_source,
            DataSource::File {
                format: FileFormat::Json,
                ..
            }
        ));
        let jsonl = write_file("{\"x\":1}\n{\"x\":2}\n");
        let report = analyze_json_file(jsonl.path(), &config).unwrap();
        assert!(matches!(
            report.data_source,
            DataSource::File {
                format: FileFormat::Jsonl,
                ..
            }
        ));
    }

    // Empty file: zero rows, no malformed lines, so an empty Ok report.
    #[test]
    fn test_analyze_json_file_empty() {
        let json = write_file("");
        let config = JsonParserConfig::default();
        let report = analyze_json_file(json.path(), &config).unwrap();
        assert_eq!(report.execution.rows_processed, 0);
        assert!(report.column_profiles.is_empty());
    }

    // Booleans and nested objects are both stringified into cells.
    #[test]
    fn test_analyze_json_boolean_and_nested() {
        let json =
            write_file(r#"[{"flag":true,"nested":{"a":1}},{"flag":false,"nested":{"b":2}}]"#);
        let config = JsonParserConfig::default();
        let report = analyze_json_file(json.path(), &config).unwrap();
        let profiles = &report.column_profiles;
        let flag = profiles.iter().find(|p| p.name == "flag").unwrap();
        assert_eq!(flag.total_count, 2);
        let nested = profiles.iter().find(|p| p.name == "nested").unwrap();
        assert_eq!(nested.total_count, 2);
    }

    // A single root object (not in an array) counts as exactly one row in
    // the JSONL (fallback) path.
    #[test]
    fn test_single_root_object_compact_yields_one_row() {
        let data = br#"{"type":"FeatureCollection","features":[1,2,3]}"#;
        let cursor = Cursor::new(data.as_ref());
        let config = JsonParserConfig::default();
        let (profiles, _stats, rows, malformed, _format) =
            analyze_json_from_reader(cursor, &config).unwrap();
        assert_eq!(rows, 1);
        assert_eq!(malformed, 0);
        assert_eq!(profiles.len(), 2);
        let names: Vec<&str> = profiles.iter().map(|p| p.name.as_str()).collect();
        assert!(names.contains(&"type"));
        assert!(names.contains(&"features"));
    }

    // Regression guard: the single-root-object support must not break normal
    // multi-line JSONL.
    #[test]
    fn test_jsonl_multi_object_still_works() {
        let data = b"{\"x\":1}\n{\"x\":2}\n{\"x\":3}\n";
        let cursor = Cursor::new(data.as_ref());
        let config = JsonParserConfig::default();
        let (_profiles, _stats, rows, _malformed, format) =
            analyze_json_from_reader(cursor, &config).unwrap();
        assert_eq!(format, FileFormat::Jsonl);
        assert_eq!(rows, 3);
    }

    // Single root object through the file API.
    #[test]
    fn test_single_root_object_via_analyze_json_file() {
        let f = write_file(r#"{"type":"FeatureCollection","features":[{"id":1},{"id":2}]}"#);
        let config = JsonParserConfig::default();
        let report = analyze_json_file(f.path(), &config).unwrap();
        assert_eq!(report.execution.rows_processed, 1);
        assert_eq!(report.column_profiles.len(), 2);
    }

    // Pretty-printed root object spans many lines; the streaming deserializer
    // must still read it as one value (one row), classified as Jsonl.
    #[test]
    fn test_single_root_object_pretty_printed_yields_one_row() {
        let data = br#"{
    "type": "FeatureCollection",
    "features": [
        {"id": 1},
        {"id": 2}
    ]
}"#;
        let cursor = Cursor::new(data.as_ref());
        let config = JsonParserConfig::default();
        let (profiles, _stats, rows, malformed, format) =
            analyze_json_from_reader(cursor, &config).unwrap();
        assert_eq!(format, FileFormat::Jsonl);
        assert_eq!(rows, 1);
        assert_eq!(malformed, 0);
        assert_eq!(profiles.len(), 2);
    }

    // Same pretty-printed scenario through the file API.
    #[test]
    fn test_single_root_object_pretty_printed_via_analyze_json_file() {
        let f = write_file(
            "{\n  \"type\": \"FeatureCollection\",\n  \"features\": [\n    {\"id\": 1},\n    {\"id\": 2}\n  ]\n}\n",
        );
        let config = JsonParserConfig::default();
        let report = analyze_json_file(f.path(), &config).unwrap();
        assert_eq!(report.execution.rows_processed, 1);
        assert_eq!(report.column_profiles.len(), 2);
    }
}