Skip to main content

clickhouse_kit/
flatten.rs

//! Flatten + coerce — where arbitrary input meets a table.
//!
//! A schema is safe-by-construction (see [`crate::table`]), but the rows still
//! arrive as untrusted JSON whose shape rarely lines up with the columns. This
//! module bridges the two: [`flatten_record`] turns a nested document into a
//! bounded flat string map, and [`coerce_to_table`] routes a document's keys to
//! their matching columns, sweeping everything else into the `attrs` catch-all
//! (and the verbatim payload into `raw`). Neither does type validation — that
//! stays with the schema layer; this only decides *where* each value lands.
10
11use crate::safety::DEFAULT_RESERVED_COLUMNS;
12use crate::table::TableSpec;
13use serde_json::Value;
14use std::collections::{BTreeMap, HashSet};
15
/// Bounds for [`flatten_record`]. The caps keep an adversarial document from
/// producing an unbounded key explosion.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlattenOptions {
    /// Number of object levels — counting the document root — that are expanded
    /// into dotted keys. An object reached at this depth is not recursed; it is
    /// kept whole as a JSON-stringified leaf. With `0` even the root is not
    /// expanded, so the flattened result is empty.
    pub max_depth: usize,
    /// Hard ceiling on the number of flattened keys produced. Once it is
    /// reached the rest of the input is skipped; `0` yields an empty map.
    pub max_keys: usize,
    /// Separator joining a nested path into a dotted key (e.g. `a.b.c`). Path
    /// segments are joined verbatim: a key that already contains the delimiter
    /// is not escaped.
    pub delimiter: String,
}

impl Default for FlattenOptions {
    /// Defaults: descend 8 levels, emit at most 1024 keys, join paths with `.`.
    fn default() -> Self {
        Self {
            max_depth: 8,
            max_keys: 1024,
            delimiter: String::from("."),
        }
    }
}
38
39/// Stringify a non-recursable JSON value: strings pass through unquoted, every
40/// other shape (numbers, bools, null, arrays, and depth-capped objects) is its
41/// compact JSON form.
42fn stringify_leaf(value: &Value) -> String {
43    match value {
44        Value::String(s) => s.clone(),
45        other => other.to_string(),
46    }
47}
48
49fn flatten_into(
50    prefix: &str,
51    value: &Value,
52    depth: usize,
53    opts: &FlattenOptions,
54    out: &mut BTreeMap<String, String>,
55) {
56    if out.len() >= opts.max_keys {
57        return;
58    }
59    match value {
60        // Descend into objects until the depth cap; arrays are never recursed.
61        Value::Object(map) if depth < opts.max_depth => {
62            for (k, v) in map {
63                if out.len() >= opts.max_keys {
64                    break;
65                }
66                let key = if prefix.is_empty() {
67                    k.clone()
68                } else {
69                    format!("{prefix}{}{k}", opts.delimiter)
70                };
71                flatten_into(&key, v, depth + 1, opts, out);
72            }
73        }
74        // Leaf: a primitive, an array, or an object at/over the depth cap. A
75        // top-level leaf (empty prefix) has no key to live under, so it is dropped.
76        _ => {
77            if !prefix.is_empty() {
78                out.insert(prefix.to_string(), stringify_leaf(value));
79            }
80        }
81    }
82}
83
84/// Flatten a JSON document into a bounded `dotted.key -> string` map.
85///
86/// Nested objects become dotted keys; arrays are JSON-stringified (not recursed);
87/// primitives are stringified. Recursion stops at `max_depth` (deeper objects are
88/// kept whole as a JSON string) and the result never exceeds `max_keys` entries.
89/// Pure — no allocation beyond the returned map.
90pub fn flatten_record(value: &Value, opts: &FlattenOptions) -> BTreeMap<String, String> {
91    let mut out = BTreeMap::new();
92    flatten_into("", value, 0, opts, &mut out);
93    out
94}
95
96/// The outcome of coercing a document onto a [`TableSpec`].
97#[derive(Debug, Clone)]
98pub struct CoerceResult {
99    /// Column-name → value, ready to bind. Matched keys pass through verbatim;
100    /// `attrs`/`raw` (when present on the table) hold the catch-all/payload.
101    pub row: BTreeMap<String, Value>,
102    /// Input keys that matched no column and were swept into `attrs`.
103    pub overflow_keys: Vec<String>,
104}
105
106/// Coerce an untrusted document onto a table's columns.
107///
108/// Each input key whose name matches a (non-reserved) column is placed in `row`
109/// under that column, value passed through untouched — no type validation here.
110/// Every unmatched key is recorded in `overflow_keys` and, if the table has an
111/// `attrs` catch-all column, swept into it as a flattened string map (stored as a
112/// JSON object `Value`). If the table has a `raw` String column, the verbatim
113/// document is stored there as a JSON string.
114pub fn coerce_to_table(input: Value, table: &TableSpec, opts: &FlattenOptions) -> CoerceResult {
115    // `attrs`/`raw` are managed columns — input keys never land in them directly.
116    let reserved: HashSet<&str> = DEFAULT_RESERVED_COLUMNS.iter().copied().collect();
117    let matchable: HashSet<&str> = table
118        .columns
119        .iter()
120        .map(|c| c.name.as_str())
121        .filter(|n| !reserved.contains(n))
122        .collect();
123    let has_attrs = table.columns.iter().any(|c| c.name == "attrs");
124    let has_raw = table.columns.iter().any(|c| c.name == "raw");
125
126    let mut row: BTreeMap<String, Value> = BTreeMap::new();
127    let mut overflow_keys: Vec<String> = Vec::new();
128
129    if has_raw {
130        row.insert("raw".to_string(), Value::String(input.to_string()));
131    }
132
133    if let Value::Object(map) = input {
134        let mut leftover = serde_json::Map::new();
135        for (k, v) in map {
136            if matchable.contains(k.as_str()) {
137                row.insert(k, v);
138            } else {
139                overflow_keys.push(k.clone());
140                leftover.insert(k, v);
141            }
142        }
143        if has_attrs && !leftover.is_empty() {
144            let flat = flatten_record(&Value::Object(leftover), opts);
145            let attrs: serde_json::Map<String, Value> = flat
146                .into_iter()
147                .map(|(k, v)| (k, Value::String(v)))
148                .collect();
149            row.insert("attrs".to_string(), Value::Object(attrs));
150        }
151    }
152
153    CoerceResult { row, overflow_keys }
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use crate::safety::{ColumnTypeSpec, ScalarType, StringOnly};
    use crate::table::{ColumnSpec, TableSpec};
    use serde_json::json;

    /// Build a column with no default.
    fn col(name: &str, type_spec: ColumnTypeSpec) -> ColumnSpec {
        ColumnSpec {
            name: name.into(),
            type_spec,
            default: None,
        }
    }

    /// Table with two real columns plus the `attrs` Map catch-all and `raw` String.
    fn fixture() -> TableSpec {
        TableSpec {
            name: "events".into(),
            columns: vec![
                col("id", ColumnTypeSpec::Scalar(ScalarType::Uuid)),
                col("name", ColumnTypeSpec::Scalar(ScalarType::String)),
                col(
                    "attrs",
                    ColumnTypeSpec::Map {
                        map: (StringOnly::String, StringOnly::String),
                    },
                ),
                col("raw", ColumnTypeSpec::Scalar(ScalarType::String)),
            ],
            engine: "MergeTree()".into(),
            order_by: vec!["id".into()],
            partition_by: None,
            ttl: None,
            indexes: vec![],
            settings: vec![],
        }
    }

    #[test]
    fn flattens_nested_objects_to_dotted_keys() {
        let v = json!({ "a": { "b": { "c": 1 } }, "d": "x" });
        let flat = flatten_record(&v, &FlattenOptions::default());
        assert_eq!(flat.get("a.b.c").map(String::as_str), Some("1"));
        assert_eq!(flat.get("d").map(String::as_str), Some("x"));
    }

    #[test]
    fn stringifies_arrays_and_primitives_without_recursing() {
        let v = json!({
            "tags": ["x", "y"],
            "n": 42,
            "f": 1.5,
            "b": true,
            "z": null,
            "s": "hello",
        });
        let flat = flatten_record(&v, &FlattenOptions::default());
        // Array kept whole as a JSON string, not flattened into tags.0 / tags.1.
        assert_eq!(flat.get("tags").map(String::as_str), Some(r#"["x","y"]"#));
        assert!(!flat.keys().any(|k| k.starts_with("tags.")));
        // Strings pass through unquoted; everything else is its JSON form.
        assert_eq!(flat.get("s").map(String::as_str), Some("hello"));
        assert_eq!(flat.get("n").map(String::as_str), Some("42"));
        assert_eq!(flat.get("f").map(String::as_str), Some("1.5"));
        assert_eq!(flat.get("b").map(String::as_str), Some("true"));
        assert_eq!(flat.get("z").map(String::as_str), Some("null"));
    }

    #[test]
    fn top_level_non_object_flattens_to_nothing() {
        // A root that is not an object has no key to live under.
        let opts = FlattenOptions::default();
        assert!(flatten_record(&json!(42), &opts).is_empty());
        assert!(flatten_record(&json!("text"), &opts).is_empty());
        assert!(flatten_record(&json!([1, 2, 3]), &opts).is_empty());
        assert!(flatten_record(&json!(null), &opts).is_empty());
    }

    #[test]
    fn enforces_depth_cap() {
        let v = json!({ "a": { "b": { "c": 1 } } });
        let opts = FlattenOptions {
            max_depth: 2,
            ..FlattenOptions::default()
        };
        let flat = flatten_record(&v, &opts);
        // Recurses a -> b, then keeps the depth-capped object whole.
        assert_eq!(flat.get("a.b").map(String::as_str), Some(r#"{"c":1}"#));
        assert!(!flat.keys().any(|k| k == "a.b.c"));
    }

    #[test]
    fn enforces_key_cap() {
        let v = json!({ "a": 1, "b": 2, "c": 3, "d": 4 });
        let opts = FlattenOptions {
            max_keys: 2,
            ..FlattenOptions::default()
        };
        let flat = flatten_record(&v, &opts);
        assert_eq!(flat.len(), 2);
    }

    #[test]
    fn honors_custom_delimiter() {
        let v = json!({ "a": { "b": 1 } });
        let opts = FlattenOptions {
            delimiter: "__".to_string(),
            ..FlattenOptions::default()
        };
        let flat = flatten_record(&v, &opts);
        assert_eq!(flat.get("a__b").map(String::as_str), Some("1"));
    }

    #[test]
    fn coerce_routes_known_keys_to_columns() {
        let input = json!({ "id": "abc", "name": "widget" });
        let res = coerce_to_table(input, &fixture(), &FlattenOptions::default());
        assert_eq!(res.row.get("id"), Some(&json!("abc")));
        assert_eq!(res.row.get("name"), Some(&json!("widget")));
        assert!(res.overflow_keys.is_empty());
        // No unmatched keys → no attrs entry.
        assert!(!res.row.contains_key("attrs"));
    }

    #[test]
    fn coerce_sweeps_unknown_keys_into_attrs() {
        let input = json!({
            "id": "abc",
            "extra": { "nested": "v" },
            "color": "blue",
        });
        let res = coerce_to_table(input, &fixture(), &FlattenOptions::default());

        // Known key passed through.
        assert_eq!(res.row.get("id"), Some(&json!("abc")));

        // Unknown keys recorded as overflow.
        let mut overflow = res.overflow_keys.clone();
        overflow.sort();
        assert_eq!(overflow, vec!["color".to_string(), "extra".to_string()]);

        // attrs holds the flattened leftover as a JSON object of strings.
        let attrs = res.row.get("attrs").expect("attrs populated");
        assert_eq!(attrs, &json!({ "extra.nested": "v", "color": "blue" }));
    }

    #[test]
    fn coerce_sets_raw_payload() {
        let input = json!({ "id": "abc", "color": "blue" });
        let res = coerce_to_table(input.clone(), &fixture(), &FlattenOptions::default());
        assert_eq!(res.row.get("raw"), Some(&Value::String(input.to_string())));
    }

    #[test]
    fn coerce_non_object_input_only_sets_raw() {
        // Nothing to route when the payload is not an object; `raw` still
        // captures it verbatim.
        let input = json!(["a", 1]);
        let res = coerce_to_table(input, &fixture(), &FlattenOptions::default());
        assert_eq!(
            res.row.get("raw"),
            Some(&Value::String(r#"["a",1]"#.to_string()))
        );
        assert_eq!(res.row.len(), 1);
        assert!(res.overflow_keys.is_empty());
    }

    #[test]
    fn coerce_without_catch_all_columns_drops_overflow_to_keys_only() {
        let table = TableSpec {
            name: "plain".into(),
            columns: vec![col("id", ColumnTypeSpec::Scalar(ScalarType::String))],
            engine: "MergeTree()".into(),
            order_by: vec!["id".into()],
            partition_by: None,
            ttl: None,
            indexes: vec![],
            settings: vec![],
        };
        let input = json!({ "id": "abc", "extra": "x" });
        let res = coerce_to_table(input, &table, &FlattenOptions::default());
        assert_eq!(res.row.get("id"), Some(&json!("abc")));
        assert_eq!(res.overflow_keys, vec!["extra".to_string()]);
        // No attrs/raw columns → nothing extra in the row.
        assert!(!res.row.contains_key("attrs"));
        assert!(!res.row.contains_key("raw"));
        assert_eq!(res.row.len(), 1);
    }
}