base_d/encoders/algorithms/schema/parsers/
json.rs

1use crate::encoders::algorithms::schema::parsers::InputParser;
2use crate::encoders::algorithms::schema::types::*;
3use serde_json::{Map, Value};
4use std::collections::HashMap;
5
/// Stateless parser for JSON input.
///
/// Carries no data; its `InputParser` implementation turns JSON text into an
/// `IntermediateRepresentation`.
pub struct JsonParser;
7
8impl InputParser for JsonParser {
9    type Error = SchemaError;
10
11    fn parse(input: &str) -> Result<IntermediateRepresentation, Self::Error> {
12        let parsed: Value = serde_json::from_str(input).map_err(|e| {
13            SchemaError::InvalidInput(format!(
14                "Invalid JSON syntax: {}\n\
15                 Ensure the input is valid JSON.",
16                e
17            ))
18        })?;
19
20        match parsed {
21            Value::Array(arr) => parse_array(arr),
22            Value::Object(obj) => parse_object(obj),
23            _ => Err(SchemaError::InvalidInput(
24                "Expected JSON object or array at root level.\n\
25                 Schema encoding works with:\n\
26                 - Single object: {\"name\": \"value\"}\n\
27                 - Array of objects: [{\"id\": 1}, {\"id\": 2}]\n\
28                 - Object with array: {\"users\": [{\"id\": 1}]}"
29                    .to_string(),
30            )),
31        }
32    }
33}
34
35/// Parse array of objects (tabular data)
36fn parse_array(arr: Vec<Value>) -> Result<IntermediateRepresentation, SchemaError> {
37    if arr.is_empty() {
38        return Err(SchemaError::InvalidInput(
39            "Empty array - cannot infer schema from zero rows.\n\
40             Provide at least one object in the array."
41                .to_string(),
42        ));
43    }
44
45    let row_count = arr.len();
46    let mut all_rows: Vec<Map<String, Value>> = Vec::new();
47
48    // Extract objects from array
49    for (idx, item) in arr.into_iter().enumerate() {
50        match item {
51            Value::Object(obj) => all_rows.push(obj),
52            other => {
53                let type_name = match other {
54                    Value::Null => "null",
55                    Value::Bool(_) => "boolean",
56                    Value::Number(_) => "number",
57                    Value::String(_) => "string",
58                    Value::Array(_) => "array",
59                    Value::Object(_) => unreachable!(),
60                };
61                return Err(SchemaError::InvalidInput(format!(
62                    "Array must contain only objects (tabular data). Found {} at index {}.\n\
63                     Schema encoding expects arrays of objects like: [{{\"id\": 1}}, {{\"id\": 2}}]",
64                    type_name, idx
65                )));
66            }
67        }
68    }
69
70    // Flatten all objects and collect field names
71    let mut flattened_rows: Vec<HashMap<String, Value>> = Vec::new();
72    let mut all_field_names = std::collections::BTreeSet::new();
73
74    for obj in &all_rows {
75        let flattened = flatten_object(obj, "");
76        for key in flattened.keys() {
77            all_field_names.insert(key.clone());
78        }
79        flattened_rows.push(flattened);
80    }
81
82    let field_names: Vec<String> = all_field_names.into_iter().collect();
83
84    // Infer types and build fields
85    let mut fields = Vec::new();
86    let mut has_nulls = false;
87
88    for field_name in &field_names {
89        let field_type = infer_field_type(&flattened_rows, field_name, &mut has_nulls)?;
90        fields.push(FieldDef::new(field_name.clone(), field_type));
91    }
92
93    // Build values and null bitmap
94    let mut values = Vec::new();
95    let total_values = row_count * fields.len();
96    let bitmap_bytes = total_values.div_ceil(8);
97    let mut null_bitmap = vec![0u8; bitmap_bytes];
98
99    for (row_idx, row) in flattened_rows.iter().enumerate() {
100        for (field_idx, field) in fields.iter().enumerate() {
101            let value_idx = row_idx * fields.len() + field_idx;
102
103            if let Some(json_value) = row.get(&field.name)
104                && json_value.is_null()
105            {
106                values.push(SchemaValue::Null);
107                set_null_bit(&mut null_bitmap, value_idx);
108                has_nulls = true;
109            } else if let Some(json_value) = row.get(&field.name) {
110                values.push(json_to_schema_value(json_value, &field.field_type)?);
111            } else {
112                // Missing field = null
113                values.push(SchemaValue::Null);
114                set_null_bit(&mut null_bitmap, value_idx);
115                has_nulls = true;
116            }
117        }
118    }
119
120    // Build header
121    let mut header = SchemaHeader::new(row_count, fields);
122    if has_nulls {
123        header.null_bitmap = Some(null_bitmap);
124        header.set_flag(FLAG_HAS_NULLS);
125    }
126
127    IntermediateRepresentation::new(header, values)
128}
129
130/// Parse single object (may have root key)
131fn parse_object(obj: Map<String, Value>) -> Result<IntermediateRepresentation, SchemaError> {
132    // Check for common pagination wrapper keys
133    const WRAPPER_KEYS: &[&str] = &["results", "data", "items", "records"];
134
135    // Check if this is a wrapper object with one of the known keys
136    if obj.len() == 1 {
137        // Check if value is an array of objects before consuming
138        let is_root_key_pattern = obj
139            .values()
140            .next()
141            .map(|v| {
142                if let Value::Array(arr) = v {
143                    // Only treat as root key if array contains objects (tabular data)
144                    !arr.is_empty() && arr.iter().all(|item| matches!(item, Value::Object(_)))
145                } else {
146                    false
147                }
148            })
149            .unwrap_or(false);
150
151        if is_root_key_pattern {
152            // Extract key and value by consuming the map
153            let (key, value) = obj.into_iter().next().unwrap();
154            // We already checked it's an array
155            let arr = match value {
156                Value::Array(a) => a,
157                _ => unreachable!(),
158            };
159
160            // Parse as array with root key
161            let mut ir = parse_array(arr)?;
162            ir.header.root_key = Some(key);
163            ir.header.set_flag(FLAG_HAS_ROOT_KEY);
164            return Ok(ir);
165        }
166    }
167
168    // Check for known wrapper patterns and unwrap them
169    for wrapper_key in WRAPPER_KEYS {
170        if let Some(Value::Array(arr)) = obj.get(*wrapper_key)
171            && !arr.is_empty()
172            && arr.iter().all(|item| matches!(item, Value::Object(_)))
173        {
174            // Found a wrapper key - unwrap and parse the array
175            let arr = arr.clone();
176            let mut ir = parse_array(arr)?;
177            ir.header.root_key = Some((*wrapper_key).to_string());
178            ir.header.set_flag(FLAG_HAS_ROOT_KEY);
179            return Ok(ir);
180        }
181    }
182
183    // Single object - treat as single row
184    let flattened = flatten_object(&obj, "");
185    // Preserve field order from original object (serde_json preserves insertion order)
186    let mut field_names = Vec::new();
187    collect_field_names_ordered(&obj, "", &mut field_names);
188
189    let mut fields = Vec::new();
190    let mut has_nulls = false;
191
192    for field_name in &field_names {
193        let value = &flattened[field_name];
194        let field_type = infer_type(value);
195        if value.is_null() {
196            has_nulls = true;
197        }
198        fields.push(FieldDef::new(field_name.clone(), field_type));
199    }
200
201    // Build values and null bitmap
202    let mut values = Vec::new();
203    let total_values = fields.len();
204    let bitmap_bytes = total_values.div_ceil(8);
205    let mut null_bitmap = vec![0u8; bitmap_bytes];
206
207    for (field_idx, field) in fields.iter().enumerate() {
208        let json_value = &flattened[&field.name];
209        if json_value.is_null() {
210            values.push(SchemaValue::Null);
211            set_null_bit(&mut null_bitmap, field_idx);
212        } else {
213            values.push(json_to_schema_value(json_value, &field.field_type)?);
214        }
215    }
216
217    // Build header
218    let mut header = SchemaHeader::new(1, fields);
219    if has_nulls {
220        header.null_bitmap = Some(null_bitmap);
221        header.set_flag(FLAG_HAS_NULLS);
222    }
223
224    IntermediateRepresentation::new(header, values)
225}
226
227/// Collect field names in order from nested object
228fn collect_field_names_ordered(obj: &Map<String, Value>, prefix: &str, names: &mut Vec<String>) {
229    for (key, value) in obj {
230        let full_key = if prefix.is_empty() {
231            key.clone()
232        } else {
233            format!("{}.{}", prefix, key)
234        };
235
236        match value {
237            Value::Object(nested) => {
238                collect_field_names_ordered(nested, &full_key, names);
239            }
240            _ => {
241                names.push(full_key);
242            }
243        }
244    }
245}
246
247/// Flatten nested object into dotted keys
248fn flatten_object(obj: &Map<String, Value>, prefix: &str) -> HashMap<String, Value> {
249    let mut result = HashMap::new();
250
251    for (key, value) in obj {
252        let full_key = if prefix.is_empty() {
253            key.clone()
254        } else {
255            format!("{}.{}", prefix, key)
256        };
257
258        match value {
259            Value::Object(nested) => {
260                result.extend(flatten_object(nested, &full_key));
261            }
262            _ => {
263                result.insert(full_key, value.clone());
264            }
265        }
266    }
267
268    result
269}
270
271/// Infer type from a single JSON value
272fn infer_type(value: &Value) -> FieldType {
273    match value {
274        Value::Null => FieldType::Null,
275        Value::Bool(_) => FieldType::Bool,
276        Value::Number(n) => {
277            if n.is_f64() {
278                // Check if it has a fractional part
279                if let Some(f) = n.as_f64()
280                    && (f.fract() != 0.0 || f.is_infinite() || f.is_nan())
281                {
282                    return FieldType::F64;
283                }
284            }
285
286            if let Some(i) = n.as_i64() {
287                if i < 0 {
288                    FieldType::I64
289                } else {
290                    FieldType::U64
291                }
292            } else if n.as_u64().is_some() {
293                FieldType::U64
294            } else {
295                FieldType::F64
296            }
297        }
298        Value::String(_) => FieldType::String,
299        Value::Array(arr) => {
300            if arr.is_empty() {
301                FieldType::Array(Box::new(FieldType::Null))
302            } else {
303                // Infer from first non-null element
304                let element_type = arr
305                    .iter()
306                    .find(|v| !v.is_null())
307                    .map(infer_type)
308                    .unwrap_or(FieldType::Null);
309                FieldType::Array(Box::new(element_type))
310            }
311        }
312        Value::Object(_) => {
313            // This shouldn't happen after flattening
314            FieldType::String
315        }
316    }
317}
318
319/// Infer field type across multiple rows
320fn infer_field_type(
321    rows: &[HashMap<String, Value>],
322    field_name: &str,
323    has_nulls: &mut bool,
324) -> Result<FieldType, SchemaError> {
325    let mut inferred_type: Option<FieldType> = None;
326
327    for row in rows {
328        if let Some(value) = row.get(field_name) {
329            if value.is_null() {
330                *has_nulls = true;
331                continue;
332            }
333
334            let current_type = infer_type(value);
335
336            if let Some(ref existing_type) = inferred_type {
337                // Special case: Array(Null) unifies with Array(T) → Array(T)
338                if let (FieldType::Array(existing_inner), FieldType::Array(current_inner)) =
339                    (existing_type, &current_type)
340                {
341                    if **existing_inner == FieldType::Null && **current_inner != FieldType::Null {
342                        // Upgrade from Array(Null) to Array(T)
343                        inferred_type = Some(current_type.clone());
344                        continue;
345                    } else if **current_inner == FieldType::Null
346                        && **existing_inner != FieldType::Null
347                    {
348                        // Keep existing Array(T), ignore Array(Null)
349                        continue;
350                    }
351                }
352
353                if *existing_type != current_type {
354                    // Type conflict - use Any
355                    return Ok(FieldType::Any);
356                }
357            } else {
358                inferred_type = Some(current_type);
359            }
360        } else {
361            *has_nulls = true;
362        }
363    }
364
365    Ok(inferred_type.unwrap_or(FieldType::Null))
366}
367
368/// Convert JSON value to SchemaValue
369fn json_to_schema_value(
370    value: &Value,
371    expected_type: &FieldType,
372) -> Result<SchemaValue, SchemaError> {
373    match value {
374        Value::Null => Ok(SchemaValue::Null),
375        Value::Bool(b) => Ok(SchemaValue::Bool(*b)),
376        Value::Number(n) => match expected_type {
377            FieldType::U64 | FieldType::Any => {
378                if let Some(u) = n.as_u64() {
379                    Ok(SchemaValue::U64(u))
380                } else if let Some(i) = n.as_i64() {
381                    Ok(SchemaValue::I64(i))
382                } else {
383                    Ok(SchemaValue::F64(n.as_f64().unwrap()))
384                }
385            }
386            FieldType::I64 => {
387                if let Some(i) = n.as_i64() {
388                    Ok(SchemaValue::I64(i))
389                } else {
390                    Ok(SchemaValue::I64(n.as_f64().unwrap() as i64))
391                }
392            }
393            FieldType::F64 => Ok(SchemaValue::F64(n.as_f64().unwrap())),
394            _ => Err(SchemaError::InvalidInput(format!(
395                "Type mismatch: expected {}, but found number.\n\
396                 The field type was inferred or specified as {}, which doesn't accept numeric values.",
397                expected_type.display_name(),
398                expected_type.display_name()
399            ))),
400        },
401        Value::String(s) => Ok(SchemaValue::String(s.clone())),
402        Value::Array(arr) => {
403            let element_type = if let FieldType::Array(et) = expected_type {
404                et.as_ref()
405            } else {
406                return Err(SchemaError::InvalidInput(format!(
407                    "Internal error: Expected array type but found {}. This is a bug in type inference.",
408                    expected_type.display_name()
409                )));
410            };
411
412            let mut schema_values = Vec::new();
413            for item in arr {
414                schema_values.push(json_to_schema_value(item, element_type)?);
415            }
416            Ok(SchemaValue::Array(schema_values))
417        }
418        Value::Object(_) => Err(SchemaError::InvalidInput(
419            "Internal error: Encountered nested object that wasn't flattened. This is a bug in the JSON parser."
420                .to_string(),
421        )),
422    }
423}
424
/// Mark position `index` as null in `bitmap`.
///
/// Bits are packed eight per byte, least-significant bit first: bit `index`
/// lives in byte `index / 8` at position `index % 8`.
///
/// # Panics
///
/// Panics if `index / 8` is outside `bitmap`.
fn set_null_bit(bitmap: &mut [u8], index: usize) {
    let mask = 1u8 << (index & 7);
    bitmap[index >> 3] |= mask;
}
431
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `json`, panicking if the parser rejects it.
    fn parse_ok(json: &str) -> IntermediateRepresentation {
        JsonParser::parse(json).unwrap()
    }

    // A flat object is one row with one field per key.
    #[test]
    fn test_simple_object() {
        let ir = parse_ok(r#"{"id":1,"name":"alice"}"#);

        assert_eq!(ir.header.row_count, 1);
        assert_eq!(ir.header.fields.len(), 2);
        assert_eq!(ir.values.len(), 2);
    }

    // An array of objects is one row per element.
    #[test]
    fn test_array_of_objects() {
        let ir = parse_ok(r#"[{"id":1,"name":"alice"},{"id":2,"name":"bob"}]"#);

        assert_eq!(ir.header.row_count, 2);
        assert_eq!(ir.header.fields.len(), 2);
        assert_eq!(ir.values.len(), 4);
    }

    // Nested objects collapse into a single dotted field name.
    #[test]
    fn test_nested_object() {
        let ir = parse_ok(r#"{"user":{"profile":{"name":"alice"}}}"#);

        assert_eq!(ir.header.row_count, 1);
        assert_eq!(ir.header.fields.len(), 1);
        assert_eq!(ir.header.fields[0].name, "user.profile.name");
    }

    // A lone key wrapping an array of objects is recorded as the root key.
    #[test]
    fn test_root_key() {
        let ir = parse_ok(r#"{"users":[{"id":1}]}"#);

        assert_eq!(ir.header.root_key, Some("users".to_string()));
        assert!(ir.header.has_flag(FLAG_HAS_ROOT_KEY));
    }

    // Every scalar kind is accepted, and the null sets the null flag.
    #[test]
    fn test_all_types() {
        let ir = parse_ok(r#"{"u":1,"i":-1,"f":3.14,"s":"test","b":true,"n":null}"#);

        assert_eq!(ir.header.fields.len(), 6);
        assert!(ir.header.has_flag(FLAG_HAS_NULLS));
    }

    // A null value is flagged in the header and marked at its cell.
    #[test]
    fn test_null_handling() {
        let ir = parse_ok(r#"{"name":"alice","age":null}"#);

        assert!(ir.header.has_flag(FLAG_HAS_NULLS));

        // Field order is not assumed, so look the column up by name.
        let age_column = ir
            .header
            .fields
            .iter()
            .position(|field| field.name == "age")
            .unwrap();
        assert!(ir.is_null(0, age_column));
    }

    // An array of non-negative integers is typed Array(U64).
    #[test]
    fn test_homogeneous_array() {
        let ir = parse_ok(r#"{"scores":[1,2,3]}"#);
        let expected = FieldType::Array(Box::new(FieldType::U64));

        assert_eq!(ir.header.fields[0].field_type, expected);
    }

    // An empty array has no element to learn from and is typed Array(Null).
    #[test]
    fn test_empty_array() {
        let ir = parse_ok(r#"{"items":[]}"#);
        let expected = FieldType::Array(Box::new(FieldType::Null));

        assert_eq!(ir.header.fields[0].field_type, expected);
    }

    // Flattening follows arbitrarily deep nesting.
    #[test]
    fn test_deep_nesting() {
        let ir = parse_ok(r#"{"a":{"b":{"c":{"d":1}}}}"#);

        assert_eq!(ir.header.fields[0].name, "a.b.c.d");
    }

    // The flattening helper joins nested keys with a dot.
    #[test]
    fn test_flatten_object() {
        let nested: Map<String, Value> = serde_json::from_str(r#"{"a":{"b":1}}"#).unwrap();
        let flat = flatten_object(&nested, "");

        assert_eq!(flat.len(), 1);
        assert!(flat.contains_key("a.b"));
    }
}
539}