1use crate::safety::DEFAULT_RESERVED_COLUMNS;
12use crate::table::TableSpec;
13use serde_json::Value;
14use std::collections::{BTreeMap, HashSet};
15
/// Tuning knobs for flattening a nested JSON record into a flat string map.
#[derive(Clone, Debug)]
pub struct FlattenOptions {
    /// Number of object levels that may be expanded. An object reached at
    /// this depth is stored whole as one stringified leaf instead of being
    /// descended into.
    pub max_depth: usize,
    /// Upper bound on the number of entries written to the flattened map.
    /// Once the map holds this many entries, remaining values are skipped.
    pub max_keys: usize,
    /// Separator placed between parent and child key segments.
    pub delimiter: String,
}

impl Default for FlattenOptions {
    /// Defaults: expand up to 8 levels, emit at most 1024 keys, join
    /// segments with `"."`.
    fn default() -> Self {
        FlattenOptions {
            delimiter: String::from("."),
            max_keys: 1024,
            max_depth: 8,
        }
    }
}
38
39fn stringify_leaf(value: &Value) -> String {
43 match value {
44 Value::String(s) => s.clone(),
45 other => other.to_string(),
46 }
47}
48
49fn flatten_into(
50 prefix: &str,
51 value: &Value,
52 depth: usize,
53 opts: &FlattenOptions,
54 out: &mut BTreeMap<String, String>,
55) {
56 if out.len() >= opts.max_keys {
57 return;
58 }
59 match value {
60 Value::Object(map) if depth < opts.max_depth => {
62 for (k, v) in map {
63 if out.len() >= opts.max_keys {
64 break;
65 }
66 let key = if prefix.is_empty() {
67 k.clone()
68 } else {
69 format!("{prefix}{}{k}", opts.delimiter)
70 };
71 flatten_into(&key, v, depth + 1, opts, out);
72 }
73 }
74 _ => {
77 if !prefix.is_empty() {
78 out.insert(prefix.to_string(), stringify_leaf(value));
79 }
80 }
81 }
82}
83
84pub fn flatten_record(value: &Value, opts: &FlattenOptions) -> BTreeMap<String, String> {
91 let mut out = BTreeMap::new();
92 flatten_into("", value, 0, opts, &mut out);
93 out
94}
95
96#[derive(Debug, Clone)]
98pub struct CoerceResult {
99 pub row: BTreeMap<String, Value>,
102 pub overflow_keys: Vec<String>,
104}
105
106pub fn coerce_to_table(input: Value, table: &TableSpec, opts: &FlattenOptions) -> CoerceResult {
115 let reserved: HashSet<&str> = DEFAULT_RESERVED_COLUMNS.iter().copied().collect();
117 let matchable: HashSet<&str> = table
118 .columns
119 .iter()
120 .map(|c| c.name.as_str())
121 .filter(|n| !reserved.contains(n))
122 .collect();
123 let has_attrs = table.columns.iter().any(|c| c.name == "attrs");
124 let has_raw = table.columns.iter().any(|c| c.name == "raw");
125
126 let mut row: BTreeMap<String, Value> = BTreeMap::new();
127 let mut overflow_keys: Vec<String> = Vec::new();
128
129 if has_raw {
130 row.insert("raw".to_string(), Value::String(input.to_string()));
131 }
132
133 if let Value::Object(map) = input {
134 let mut leftover = serde_json::Map::new();
135 for (k, v) in map {
136 if matchable.contains(k.as_str()) {
137 row.insert(k, v);
138 } else {
139 overflow_keys.push(k.clone());
140 leftover.insert(k, v);
141 }
142 }
143 if has_attrs && !leftover.is_empty() {
144 let flat = flatten_record(&Value::Object(leftover), opts);
145 let attrs: serde_json::Map<String, Value> = flat
146 .into_iter()
147 .map(|(k, v)| (k, Value::String(v)))
148 .collect();
149 row.insert("attrs".to_string(), Value::Object(attrs));
150 }
151 }
152
153 CoerceResult { row, overflow_keys }
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use crate::safety::{ColumnTypeSpec, ScalarType, StringOnly};
    use crate::table::{ColumnSpec, TableSpec};
    use serde_json::json;

    /// Column with the given name and type and no default.
    fn col(name: &str, type_spec: ColumnTypeSpec) -> ColumnSpec {
        ColumnSpec {
            name: name.into(),
            type_spec,
            default: None,
        }
    }

    /// MergeTree table ordered by `id` with the given columns.
    fn table_with(name: &'static str, columns: Vec<ColumnSpec>) -> TableSpec {
        TableSpec {
            name: name.into(),
            columns,
            engine: "MergeTree()".into(),
            order_by: vec!["id".into()],
            partition_by: None,
            ttl: None,
            indexes: vec![],
            settings: vec![],
        }
    }

    /// Table with two regular columns plus both catch-alls (`attrs`, `raw`).
    fn fixture() -> TableSpec {
        let attrs_type = ColumnTypeSpec::Map {
            map: (StringOnly::String, StringOnly::String),
        };
        table_with(
            "events",
            vec![
                col("id", ColumnTypeSpec::Scalar(ScalarType::Uuid)),
                col("name", ColumnTypeSpec::Scalar(ScalarType::String)),
                col("attrs", attrs_type),
                col("raw", ColumnTypeSpec::Scalar(ScalarType::String)),
            ],
        )
    }

    /// Borrowed lookup into a flattened map.
    fn lookup<'a>(flat: &'a BTreeMap<String, String>, key: &str) -> Option<&'a str> {
        flat.get(key).map(String::as_str)
    }

    #[test]
    fn flattens_nested_objects_to_dotted_keys() {
        let record = json!({ "a": { "b": { "c": 1 } }, "d": "x" });
        let flat = flatten_record(&record, &FlattenOptions::default());
        assert_eq!(lookup(&flat, "a.b.c"), Some("1"));
        assert_eq!(lookup(&flat, "d"), Some("x"));
    }

    #[test]
    fn stringifies_arrays_and_primitives_without_recursing() {
        let record = json!({
            "tags": ["x", "y"],
            "n": 42,
            "f": 1.5,
            "b": true,
            "z": null,
            "s": "hello",
        });
        let flat = flatten_record(&record, &FlattenOptions::default());

        let expected = [
            ("tags", r#"["x","y"]"#),
            ("s", "hello"),
            ("n", "42"),
            ("f", "1.5"),
            ("b", "true"),
            ("z", "null"),
        ];
        for (key, want) in expected {
            assert_eq!(lookup(&flat, key), Some(want), "key {key}");
        }
        // Arrays are kept as one leaf, never expanded into `tags.*` keys.
        assert!(flat.keys().all(|k| !k.starts_with("tags.")));
    }

    #[test]
    fn enforces_depth_cap() {
        let opts = FlattenOptions {
            max_depth: 2,
            ..Default::default()
        };
        let flat = flatten_record(&json!({ "a": { "b": { "c": 1 } } }), &opts);
        assert_eq!(lookup(&flat, "a.b"), Some(r#"{"c":1}"#));
        assert!(!flat.contains_key("a.b.c"));
    }

    #[test]
    fn enforces_key_cap() {
        let opts = FlattenOptions {
            max_keys: 2,
            ..Default::default()
        };
        let flat = flatten_record(&json!({ "a": 1, "b": 2, "c": 3, "d": 4 }), &opts);
        assert_eq!(flat.len(), 2);
    }

    #[test]
    fn honors_custom_delimiter() {
        let opts = FlattenOptions {
            delimiter: String::from("__"),
            ..Default::default()
        };
        let flat = flatten_record(&json!({ "a": { "b": 1 } }), &opts);
        assert_eq!(lookup(&flat, "a__b"), Some("1"));
    }

    #[test]
    fn coerce_routes_known_keys_to_columns() {
        let CoerceResult { row, overflow_keys } = coerce_to_table(
            json!({ "id": "abc", "name": "widget" }),
            &fixture(),
            &FlattenOptions::default(),
        );
        assert_eq!(row.get("id"), Some(&json!("abc")));
        assert_eq!(row.get("name"), Some(&json!("widget")));
        assert!(overflow_keys.is_empty());
        assert!(!row.contains_key("attrs"));
    }

    #[test]
    fn coerce_sweeps_unknown_keys_into_attrs() {
        let payload = json!({
            "id": "abc",
            "extra": { "nested": "v" },
            "color": "blue",
        });
        let CoerceResult {
            row,
            mut overflow_keys,
        } = coerce_to_table(payload, &fixture(), &FlattenOptions::default());

        assert_eq!(row.get("id"), Some(&json!("abc")));

        // Overflow order follows the input map; sort for a stable comparison.
        overflow_keys.sort();
        assert_eq!(overflow_keys, vec!["color".to_string(), "extra".to_string()]);

        let attrs = row.get("attrs").expect("attrs populated");
        assert_eq!(attrs, &json!({ "extra.nested": "v", "color": "blue" }));
    }

    #[test]
    fn coerce_sets_raw_payload() {
        let input = json!({ "id": "abc", "color": "blue" });
        let expected_raw = Value::String(input.to_string());
        let res = coerce_to_table(input, &fixture(), &FlattenOptions::default());
        assert_eq!(res.row.get("raw"), Some(&expected_raw));
    }

    #[test]
    fn coerce_without_catch_all_columns_drops_overflow_to_keys_only() {
        let table = table_with(
            "plain",
            vec![col("id", ColumnTypeSpec::Scalar(ScalarType::String))],
        );
        let res = coerce_to_table(
            json!({ "id": "abc", "extra": "x" }),
            &table,
            &FlattenOptions::default(),
        );
        assert_eq!(res.row.get("id"), Some(&json!("abc")));
        assert_eq!(res.overflow_keys, vec!["extra".to_string()]);
        assert!(!res.row.contains_key("attrs"));
        assert!(!res.row.contains_key("raw"));
        assert_eq!(res.row.len(), 1);
    }
}