base_d/encoders/algorithms/schema/parsers/
json.rs1use crate::encoders::algorithms::schema::parsers::InputParser;
2use crate::encoders::algorithms::schema::types::*;
3use serde_json::{Map, Value};
4use std::collections::HashMap;
5
6pub struct JsonParser;
7
8impl InputParser for JsonParser {
9 type Error = SchemaError;
10
11 fn parse(input: &str) -> Result<IntermediateRepresentation, Self::Error> {
12 let parsed: Value = serde_json::from_str(input).map_err(|e| {
13 SchemaError::InvalidInput(format!(
14 "Invalid JSON syntax: {}\n\
15 Ensure the input is valid JSON.",
16 e
17 ))
18 })?;
19
20 match parsed {
21 Value::Array(arr) => parse_array(arr),
22 Value::Object(obj) => parse_object(obj),
23 _ => Err(SchemaError::InvalidInput(
24 "Expected JSON object or array at root level.\n\
25 Schema encoding works with:\n\
26 - Single object: {\"name\": \"value\"}\n\
27 - Array of objects: [{\"id\": 1}, {\"id\": 2}]\n\
28 - Object with array: {\"users\": [{\"id\": 1}]}"
29 .to_string(),
30 )),
31 }
32 }
33}
34
35fn parse_array(arr: Vec<Value>) -> Result<IntermediateRepresentation, SchemaError> {
37 if arr.is_empty() {
38 return Err(SchemaError::InvalidInput(
39 "Empty array - cannot infer schema from zero rows.\n\
40 Provide at least one object in the array."
41 .to_string(),
42 ));
43 }
44
45 let row_count = arr.len();
46 let mut all_rows: Vec<Map<String, Value>> = Vec::new();
47
48 for (idx, item) in arr.into_iter().enumerate() {
50 match item {
51 Value::Object(obj) => all_rows.push(obj),
52 other => {
53 let type_name = match other {
54 Value::Null => "null",
55 Value::Bool(_) => "boolean",
56 Value::Number(_) => "number",
57 Value::String(_) => "string",
58 Value::Array(_) => "array",
59 Value::Object(_) => unreachable!(),
60 };
61 return Err(SchemaError::InvalidInput(format!(
62 "Array must contain only objects (tabular data). Found {} at index {}.\n\
63 Schema encoding expects arrays of objects like: [{{\"id\": 1}}, {{\"id\": 2}}]",
64 type_name, idx
65 )));
66 }
67 }
68 }
69
70 let mut flattened_rows: Vec<HashMap<String, Value>> = Vec::new();
72 let mut all_field_names = std::collections::BTreeSet::new();
73
74 for obj in &all_rows {
75 let flattened = flatten_object(obj, "");
76 for key in flattened.keys() {
77 all_field_names.insert(key.clone());
78 }
79 flattened_rows.push(flattened);
80 }
81
82 let field_names: Vec<String> = all_field_names.into_iter().collect();
83
84 let mut fields = Vec::new();
86 let mut has_nulls = false;
87
88 for field_name in &field_names {
89 let field_type = infer_field_type(&flattened_rows, field_name, &mut has_nulls)?;
90 fields.push(FieldDef::new(field_name.clone(), field_type));
91 }
92
93 let mut values = Vec::new();
95 let total_values = row_count * fields.len();
96 let bitmap_bytes = total_values.div_ceil(8);
97 let mut null_bitmap = vec![0u8; bitmap_bytes];
98
99 for (row_idx, row) in flattened_rows.iter().enumerate() {
100 for (field_idx, field) in fields.iter().enumerate() {
101 let value_idx = row_idx * fields.len() + field_idx;
102
103 if let Some(json_value) = row.get(&field.name)
104 && json_value.is_null()
105 {
106 values.push(SchemaValue::Null);
107 set_null_bit(&mut null_bitmap, value_idx);
108 has_nulls = true;
109 } else if let Some(json_value) = row.get(&field.name) {
110 values.push(json_to_schema_value(json_value, &field.field_type)?);
111 } else {
112 values.push(SchemaValue::Null);
114 set_null_bit(&mut null_bitmap, value_idx);
115 has_nulls = true;
116 }
117 }
118 }
119
120 let mut header = SchemaHeader::new(row_count, fields);
122 if has_nulls {
123 header.null_bitmap = Some(null_bitmap);
124 header.set_flag(FLAG_HAS_NULLS);
125 }
126
127 IntermediateRepresentation::new(header, values)
128}
129
130fn parse_object(obj: Map<String, Value>) -> Result<IntermediateRepresentation, SchemaError> {
132 const WRAPPER_KEYS: &[&str] = &["results", "data", "items", "records"];
134
135 if obj.len() == 1 {
137 let is_root_key_pattern = obj
139 .values()
140 .next()
141 .map(|v| {
142 if let Value::Array(arr) = v {
143 !arr.is_empty() && arr.iter().all(|item| matches!(item, Value::Object(_)))
145 } else {
146 false
147 }
148 })
149 .unwrap_or(false);
150
151 if is_root_key_pattern {
152 let (key, value) = obj.into_iter().next().unwrap();
154 let arr = match value {
156 Value::Array(a) => a,
157 _ => unreachable!(),
158 };
159
160 let mut ir = parse_array(arr)?;
162 ir.header.root_key = Some(key);
163 ir.header.set_flag(FLAG_HAS_ROOT_KEY);
164 return Ok(ir);
165 }
166 }
167
168 for wrapper_key in WRAPPER_KEYS {
170 if let Some(Value::Array(arr)) = obj.get(*wrapper_key)
171 && !arr.is_empty()
172 && arr.iter().all(|item| matches!(item, Value::Object(_)))
173 {
174 let arr = arr.clone();
176 let mut ir = parse_array(arr)?;
177 ir.header.root_key = Some((*wrapper_key).to_string());
178 ir.header.set_flag(FLAG_HAS_ROOT_KEY);
179 return Ok(ir);
180 }
181 }
182
183 let flattened = flatten_object(&obj, "");
185 let mut field_names = Vec::new();
187 collect_field_names_ordered(&obj, "", &mut field_names);
188
189 let mut fields = Vec::new();
190 let mut has_nulls = false;
191
192 for field_name in &field_names {
193 let value = &flattened[field_name];
194 let field_type = infer_type(value);
195 if value.is_null() {
196 has_nulls = true;
197 }
198 fields.push(FieldDef::new(field_name.clone(), field_type));
199 }
200
201 let mut values = Vec::new();
203 let total_values = fields.len();
204 let bitmap_bytes = total_values.div_ceil(8);
205 let mut null_bitmap = vec![0u8; bitmap_bytes];
206
207 for (field_idx, field) in fields.iter().enumerate() {
208 let json_value = &flattened[&field.name];
209 if json_value.is_null() {
210 values.push(SchemaValue::Null);
211 set_null_bit(&mut null_bitmap, field_idx);
212 } else {
213 values.push(json_to_schema_value(json_value, &field.field_type)?);
214 }
215 }
216
217 let mut header = SchemaHeader::new(1, fields);
219 if has_nulls {
220 header.null_bitmap = Some(null_bitmap);
221 header.set_flag(FLAG_HAS_NULLS);
222 }
223
224 IntermediateRepresentation::new(header, values)
225}
226
227fn collect_field_names_ordered(obj: &Map<String, Value>, prefix: &str, names: &mut Vec<String>) {
229 for (key, value) in obj {
230 let full_key = if prefix.is_empty() {
231 key.clone()
232 } else {
233 format!("{}.{}", prefix, key)
234 };
235
236 match value {
237 Value::Object(nested) => {
238 collect_field_names_ordered(nested, &full_key, names);
239 }
240 _ => {
241 names.push(full_key);
242 }
243 }
244 }
245}
246
247fn flatten_object(obj: &Map<String, Value>, prefix: &str) -> HashMap<String, Value> {
249 let mut result = HashMap::new();
250
251 for (key, value) in obj {
252 let full_key = if prefix.is_empty() {
253 key.clone()
254 } else {
255 format!("{}.{}", prefix, key)
256 };
257
258 match value {
259 Value::Object(nested) => {
260 result.extend(flatten_object(nested, &full_key));
261 }
262 _ => {
263 result.insert(full_key, value.clone());
264 }
265 }
266 }
267
268 result
269}
270
271fn infer_type(value: &Value) -> FieldType {
273 match value {
274 Value::Null => FieldType::Null,
275 Value::Bool(_) => FieldType::Bool,
276 Value::Number(n) => {
277 if n.is_f64() {
278 if let Some(f) = n.as_f64()
280 && (f.fract() != 0.0 || f.is_infinite() || f.is_nan())
281 {
282 return FieldType::F64;
283 }
284 }
285
286 if let Some(i) = n.as_i64() {
287 if i < 0 {
288 FieldType::I64
289 } else {
290 FieldType::U64
291 }
292 } else if n.as_u64().is_some() {
293 FieldType::U64
294 } else {
295 FieldType::F64
296 }
297 }
298 Value::String(_) => FieldType::String,
299 Value::Array(arr) => {
300 if arr.is_empty() {
301 FieldType::Array(Box::new(FieldType::Null))
302 } else {
303 let element_type = arr
305 .iter()
306 .find(|v| !v.is_null())
307 .map(infer_type)
308 .unwrap_or(FieldType::Null);
309 FieldType::Array(Box::new(element_type))
310 }
311 }
312 Value::Object(_) => {
313 FieldType::String
315 }
316 }
317}
318
319fn infer_field_type(
321 rows: &[HashMap<String, Value>],
322 field_name: &str,
323 has_nulls: &mut bool,
324) -> Result<FieldType, SchemaError> {
325 let mut inferred_type: Option<FieldType> = None;
326
327 for row in rows {
328 if let Some(value) = row.get(field_name) {
329 if value.is_null() {
330 *has_nulls = true;
331 continue;
332 }
333
334 let current_type = infer_type(value);
335
336 if let Some(ref existing_type) = inferred_type {
337 if let (FieldType::Array(existing_inner), FieldType::Array(current_inner)) =
339 (existing_type, ¤t_type)
340 {
341 if **existing_inner == FieldType::Null && **current_inner != FieldType::Null {
342 inferred_type = Some(current_type.clone());
344 continue;
345 } else if **current_inner == FieldType::Null
346 && **existing_inner != FieldType::Null
347 {
348 continue;
350 }
351 }
352
353 if *existing_type != current_type {
354 return Ok(FieldType::Any);
356 }
357 } else {
358 inferred_type = Some(current_type);
359 }
360 } else {
361 *has_nulls = true;
362 }
363 }
364
365 Ok(inferred_type.unwrap_or(FieldType::Null))
366}
367
368fn json_to_schema_value(
370 value: &Value,
371 expected_type: &FieldType,
372) -> Result<SchemaValue, SchemaError> {
373 match value {
374 Value::Null => Ok(SchemaValue::Null),
375 Value::Bool(b) => Ok(SchemaValue::Bool(*b)),
376 Value::Number(n) => match expected_type {
377 FieldType::U64 | FieldType::Any => {
378 if let Some(u) = n.as_u64() {
379 Ok(SchemaValue::U64(u))
380 } else if let Some(i) = n.as_i64() {
381 Ok(SchemaValue::I64(i))
382 } else {
383 Ok(SchemaValue::F64(n.as_f64().unwrap()))
384 }
385 }
386 FieldType::I64 => {
387 if let Some(i) = n.as_i64() {
388 Ok(SchemaValue::I64(i))
389 } else {
390 Ok(SchemaValue::I64(n.as_f64().unwrap() as i64))
391 }
392 }
393 FieldType::F64 => Ok(SchemaValue::F64(n.as_f64().unwrap())),
394 _ => Err(SchemaError::InvalidInput(format!(
395 "Type mismatch: expected {}, but found number.\n\
396 The field type was inferred or specified as {}, which doesn't accept numeric values.",
397 expected_type.display_name(),
398 expected_type.display_name()
399 ))),
400 },
401 Value::String(s) => Ok(SchemaValue::String(s.clone())),
402 Value::Array(arr) => {
403 let element_type = if let FieldType::Array(et) = expected_type {
404 et.as_ref()
405 } else {
406 return Err(SchemaError::InvalidInput(format!(
407 "Internal error: Expected array type but found {}. This is a bug in type inference.",
408 expected_type.display_name()
409 )));
410 };
411
412 let mut schema_values = Vec::new();
413 for item in arr {
414 schema_values.push(json_to_schema_value(item, element_type)?);
415 }
416 Ok(SchemaValue::Array(schema_values))
417 }
418 Value::Object(_) => Err(SchemaError::InvalidInput(
419 "Internal error: Encountered nested object that wasn't flattened. This is a bug in the JSON parser."
420 .to_string(),
421 )),
422 }
423}
424
/// Marks position `index` in a packed null bitmap.
///
/// Position `index` lives in byte `index / 8`, at bit `index % 8` counted from
/// the least-significant bit. Setting an already-set bit is a no-op.
///
/// # Panics
///
/// Panics if `index / 8` is out of bounds for `bitmap`.
fn set_null_bit(bitmap: &mut [u8], index: usize) {
    let mask = 1u8 << (index % 8);
    bitmap[index / 8] |= mask;
}
431
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `input` with [`JsonParser`], failing the test if it is rejected.
    fn parse_ok(input: &str) -> IntermediateRepresentation {
        JsonParser::parse(input).expect("test input should parse")
    }

    #[test]
    fn test_simple_object() {
        let ir = parse_ok(r#"{"id":1,"name":"alice"}"#);

        assert_eq!(ir.header.row_count, 1);
        assert_eq!(ir.header.fields.len(), 2);
        assert_eq!(ir.values.len(), 2);
    }

    #[test]
    fn test_array_of_objects() {
        let ir = parse_ok(r#"[{"id":1,"name":"alice"},{"id":2,"name":"bob"}]"#);

        assert_eq!(ir.header.row_count, 2);
        assert_eq!(ir.header.fields.len(), 2);
        assert_eq!(ir.values.len(), 4);
    }

    #[test]
    fn test_nested_object() {
        let ir = parse_ok(r#"{"user":{"profile":{"name":"alice"}}}"#);

        assert_eq!(ir.header.row_count, 1);
        assert_eq!(ir.header.fields.len(), 1);
        assert_eq!(ir.header.fields[0].name, "user.profile.name");
    }

    #[test]
    fn test_root_key() {
        let ir = parse_ok(r#"{"users":[{"id":1}]}"#);

        assert_eq!(ir.header.root_key, Some("users".to_string()));
        assert!(ir.header.has_flag(FLAG_HAS_ROOT_KEY));
    }

    #[test]
    fn test_all_types() {
        let ir = parse_ok(r#"{"u":1,"i":-1,"f":3.14,"s":"test","b":true,"n":null}"#);

        assert_eq!(ir.header.fields.len(), 6);
        assert!(ir.header.has_flag(FLAG_HAS_NULLS));
    }

    #[test]
    fn test_null_handling() {
        let ir = parse_ok(r#"{"name":"alice","age":null}"#);
        assert!(ir.header.has_flag(FLAG_HAS_NULLS));

        let age_idx = ir
            .header
            .fields
            .iter()
            .position(|field| field.name == "age")
            .expect("schema should contain an `age` field");
        assert!(ir.is_null(0, age_idx));
    }

    #[test]
    fn test_homogeneous_array() {
        let ir = parse_ok(r#"{"scores":[1,2,3]}"#);

        let expected = FieldType::Array(Box::new(FieldType::U64));
        assert_eq!(ir.header.fields[0].field_type, expected);
    }

    #[test]
    fn test_empty_array() {
        let ir = parse_ok(r#"{"items":[]}"#);

        let expected = FieldType::Array(Box::new(FieldType::Null));
        assert_eq!(ir.header.fields[0].field_type, expected);
    }

    #[test]
    fn test_deep_nesting() {
        let ir = parse_ok(r#"{"a":{"b":{"c":{"d":1}}}}"#);

        assert_eq!(ir.header.fields[0].name, "a.b.c.d");
    }

    #[test]
    fn test_flatten_object() {
        let obj: Map<String, Value> = serde_json::from_str(r#"{"a":{"b":1}}"#).unwrap();
        let flat = flatten_object(&obj, "");

        assert_eq!(flat.len(), 1);
        assert!(flat.contains_key("a.b"));
    }
}