Skip to main content

faucet_stream/
schema.rs

1//! JSON Schema inference from record samples.
2//!
3//! Given a slice of JSON values (records from a REST API), produces a JSON Schema
4//! that is valid for all of them.  The algorithm:
5//!
6//! * Each field type is inferred independently per record then **merged** across records.
7//! * A field absent from some records gets `"null"` added to its type.
8//! * `"integer"` widens to `"number"` when the same field is an integer in some records
9//!   and a float in others.
10//! * Nested objects are recursively inferred and merged.
11
12use serde_json::{Map, Value, json};
13use std::collections::HashSet;
14
15/// Infer a JSON Schema `object` descriptor from a slice of record values.
16///
17/// Non-object top-level values are ignored.  Returns an empty-properties
18/// object schema when `records` is empty or contains no objects.
19pub fn infer_schema(records: &[Value]) -> Value {
20    let objects: Vec<&Map<String, Value>> = records.iter().filter_map(|r| r.as_object()).collect();
21
22    if objects.is_empty() {
23        return json!({"type": "object", "properties": {}});
24    }
25
26    // Collect all field names across all records.
27    let all_keys: HashSet<&String> = objects.iter().flat_map(|o| o.keys()).collect();
28
29    let mut properties = Map::new();
30
31    for key in all_keys {
32        let values: Vec<&Value> = objects.iter().filter_map(|o| o.get(key)).collect();
33        let records_with_key = values.len();
34
35        let mut field_schema = values
36            .into_iter()
37            .map(infer_value_schema)
38            .reduce(merge_schemas)
39            .unwrap_or_else(|| json!({}));
40
41        // Fields absent from some records are implicitly nullable.
42        if records_with_key < objects.len() {
43            add_null_type(&mut field_schema);
44        }
45
46        properties.insert(key.clone(), field_schema);
47    }
48
49    json!({
50        "type": "object",
51        "properties": Value::Object(properties)
52    })
53}
54
55// ── Internal helpers ──────────────────────────────────────────────────────────
56
57fn infer_value_schema(v: &Value) -> Value {
58    match v {
59        Value::Null => json!({"type": "null"}),
60        Value::Bool(_) => json!({"type": "boolean"}),
61        Value::Number(n) => {
62            if n.is_i64() || n.is_u64() {
63                json!({"type": "integer"})
64            } else {
65                json!({"type": "number"})
66            }
67        }
68        Value::String(_) => json!({"type": "string"}),
69        Value::Array(arr) => {
70            let items = if arr.is_empty() {
71                json!({})
72            } else {
73                arr.iter()
74                    .map(infer_value_schema)
75                    .reduce(merge_schemas)
76                    .unwrap_or_else(|| json!({}))
77            };
78            json!({"type": "array", "items": items})
79        }
80        Value::Object(map) => {
81            let props: Map<String, Value> = map
82                .iter()
83                .map(|(k, v)| (k.clone(), infer_value_schema(v)))
84                .collect();
85            json!({"type": "object", "properties": Value::Object(props)})
86        }
87    }
88}
89
90/// Merge two schemas into one that is valid for both.
91fn merge_schemas(a: Value, b: Value) -> Value {
92    let mut types = collect_types(&a)
93        .union(&collect_types(&b))
94        .cloned()
95        .collect::<Vec<_>>();
96
97    // Numeric widening: integer + number → number.
98    if types.contains(&"integer".to_string()) && types.contains(&"number".to_string()) {
99        types.retain(|t| t != "integer");
100    }
101    types.sort();
102    types.dedup();
103
104    // Merge object properties when both schemas are (or include) objects.
105    if types.contains(&"object".to_string()) {
106        let props = merge_properties(extract_properties(&a), extract_properties(&b));
107        return json!({
108            "type": make_type_value(types),
109            "properties": Value::Object(props)
110        });
111    }
112
113    // Merge array item schemas.
114    if types == ["array"] {
115        let items_a = a.get("items").cloned().unwrap_or_else(|| json!({}));
116        let items_b = b.get("items").cloned().unwrap_or_else(|| json!({}));
117        return json!({
118            "type": "array",
119            "items": merge_schemas(items_a, items_b)
120        });
121    }
122
123    json!({"type": make_type_value(types)})
124}
125
126fn merge_properties(a: Map<String, Value>, b: Map<String, Value>) -> Map<String, Value> {
127    let keys_a: HashSet<String> = a.keys().cloned().collect();
128    let keys_b: HashSet<String> = b.keys().cloned().collect();
129    let mut result = Map::new();
130
131    // Keys in both: merge.
132    for key in keys_a.intersection(&keys_b) {
133        result.insert(key.clone(), merge_schemas(a[key].clone(), b[key].clone()));
134    }
135    // Keys only in A: field can be absent → nullable.
136    for key in keys_a.difference(&keys_b) {
137        let mut s = a[key].clone();
138        add_null_type(&mut s);
139        result.insert(key.clone(), s);
140    }
141    // Keys only in B: field can be absent → nullable.
142    for key in keys_b.difference(&keys_a) {
143        let mut s = b[key].clone();
144        add_null_type(&mut s);
145        result.insert(key.clone(), s);
146    }
147
148    result
149}
150
151fn collect_types(schema: &Value) -> HashSet<String> {
152    match schema.get("type") {
153        Some(Value::String(t)) => std::iter::once(t.clone()).collect(),
154        Some(Value::Array(arr)) => arr
155            .iter()
156            .filter_map(|v| v.as_str().map(String::from))
157            .collect(),
158        _ => HashSet::new(),
159    }
160}
161
162fn extract_properties(schema: &Value) -> Map<String, Value> {
163    schema
164        .get("properties")
165        .and_then(|p| p.as_object())
166        .cloned()
167        .unwrap_or_default()
168}
169
170/// Add `"null"` to the type of `schema` if not already present.
171fn add_null_type(schema: &mut Value) {
172    let mut types = collect_types(schema);
173    if types.contains("null") {
174        return;
175    }
176    types.insert("null".to_string());
177    let new_type = make_type_value(types.into_iter().collect());
178    if let Some(t) = schema.get_mut("type") {
179        *t = new_type;
180    }
181}
182
183fn make_type_value(mut types: Vec<String>) -> Value {
184    types.sort();
185    types.dedup();
186    if types.len() == 1 {
187        Value::String(types.remove(0))
188    } else {
189        Value::Array(types.into_iter().map(Value::String).collect())
190    }
191}
192
193// ── Tests ─────────────────────────────────────────────────────────────────────
194
#[cfg(test)]
mod tests {
    use super::*;

    /// True when `actual` is a two-element array holding `first` and `second`,
    /// in either order.
    fn is_pair(actual: &Value, first: &str, second: &str) -> bool {
        *actual == json!([first, second]) || *actual == json!([second, first])
    }

    #[test]
    fn test_infer_schema_basic_types() {
        let schema =
            infer_schema(&[json!({"id": 1, "name": "Alice", "score": 9.5, "active": true})]);
        let props = &schema["properties"];
        for (field, expected) in [
            ("id", "integer"),
            ("name", "string"),
            ("score", "number"),
            ("active", "boolean"),
        ] {
            assert_eq!(props[field]["type"], expected);
        }
    }

    #[test]
    fn test_infer_schema_nullable_absent_field() {
        let schema = infer_schema(&[json!({"id": 1, "email": "a@example.com"}), json!({"id": 2})]);
        let props = &schema["properties"];
        assert_eq!(props["id"]["type"], "integer");
        // `email` is missing from the second record, so it must admit null.
        let email_type = &props["email"]["type"];
        assert!(
            is_pair(email_type, "null", "string"),
            "expected nullable string, got {email_type}"
        );
    }

    #[test]
    fn test_infer_schema_explicit_null_value() {
        let schema = infer_schema(&[json!({"tag": "foo"}), json!({"tag": null})]);
        let tag_type = &schema["properties"]["tag"]["type"];
        assert!(
            is_pair(tag_type, "null", "string"),
            "expected nullable string, got {tag_type}"
        );
    }

    #[test]
    fn test_infer_schema_integer_widens_to_number() {
        let schema = infer_schema(&[json!({"val": 42}), json!({"val": 3.15})]);
        assert_eq!(schema["properties"]["val"]["type"], "number");
    }

    #[test]
    fn test_infer_schema_array_field() {
        let schema = infer_schema(&[json!({"tags": ["rust", "api"]})]);
        let tags = &schema["properties"]["tags"];
        assert_eq!(tags["type"], "array");
        assert_eq!(tags["items"]["type"], "string");
    }

    #[test]
    fn test_infer_schema_nested_object() {
        let schema = infer_schema(&[
            json!({"address": {"city": "NYC", "zip": "10001"}}),
            json!({"address": {"city": "LA"}}),
        ]);
        let addr = &schema["properties"]["address"];
        assert_eq!(addr["type"], "object");
        assert_eq!(addr["properties"]["city"]["type"], "string");
        // `zip` is missing from the second address, so it must admit null.
        let zip_type = &addr["properties"]["zip"]["type"];
        assert!(
            is_pair(zip_type, "null", "string"),
            "expected nullable string, got {zip_type}"
        );
    }

    #[test]
    fn test_infer_schema_empty_records() {
        let schema = infer_schema(&[]);
        assert_eq!(schema["type"], "object");
        assert_eq!(schema["properties"], json!({}));
    }

    #[test]
    fn test_infer_schema_skips_non_objects() {
        // Top-level strings and numbers contribute nothing.
        let schema = infer_schema(&[json!("string"), json!(42), json!({"id": 1})]);
        assert_eq!(schema["properties"]["id"]["type"], "integer");
    }

    #[test]
    fn test_add_null_type_idempotent() {
        let mut schema = json!({"type": ["null", "string"]});
        add_null_type(&mut schema);
        // "null" must not be duplicated.
        assert_eq!(schema["type"], json!(["null", "string"]));
    }

    #[test]
    fn test_merge_schemas_object_merges_properties() {
        let merged = merge_schemas(
            json!({"type": "object", "properties": {"x": {"type": "integer"}}}),
            json!({"type": "object", "properties": {"y": {"type": "string"}}}),
        );
        assert_eq!(merged["type"], "object");
        // Each property is missing from one side, so both become nullable.
        let x_type = &merged["properties"]["x"]["type"];
        assert!(is_pair(x_type, "integer", "null"), "got {x_type}");
        let y_type = &merged["properties"]["y"]["type"];
        assert!(is_pair(y_type, "null", "string"), "got {y_type}");
    }

    #[test]
    fn test_merge_schemas_array_items_merged() {
        let merged = merge_schemas(
            json!({"type": "array", "items": {"type": "integer"}}),
            json!({"type": "array", "items": {"type": "string"}}),
        );
        assert_eq!(merged["type"], "array");
        let items_type = &merged["items"]["type"];
        assert!(is_pair(items_type, "integer", "string"), "got {items_type}");
    }
}