Skip to main content

laminar_sql/datafusion/
json_extensions.rs

1//! LaminarDB JSON extension UDFs (F-SCHEMA-013).
2//!
3//! Streaming-specific JSON transformation functions that extend beyond
4//! PostgreSQL standard:
5//!
6//! - **Merge**: `jsonb_merge`, `jsonb_deep_merge`
7//! - **Cleanup**: `jsonb_strip_nulls`
8//! - **Key ops**: `jsonb_rename_keys`, `jsonb_pick`, `jsonb_except`
9//! - **Flatten**: `jsonb_flatten`, `jsonb_unflatten`
10//! - **Schema**: `json_to_columns`, `json_infer_schema`
11
12use std::any::Any;
13use std::collections::HashSet;
14use std::hash::{Hash, Hasher};
15use std::sync::Arc;
16
17use arrow::datatypes::DataType;
18use arrow_array::{
19    builder::{LargeBinaryBuilder, StringBuilder},
20    Array, LargeBinaryArray, ListArray, MapArray, StringArray,
21};
22use datafusion_common::Result;
23use datafusion_expr::{
24    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
25};
26
27use super::json_types;
28use super::json_udf::expand_args;
29
/// Upper bound on how many nesting levels the recursive JSON helpers in this
/// module descend into before they give up with a "max depth exceeded" error.
const MAX_DEPTH: usize = 64;
32
33// ══════════════════════════════════════════════════════════════════
34// jsonb_merge(jsonb, jsonb) -> jsonb — shallow merge
35// ══════════════════════════════════════════════════════════════════
36
37/// `jsonb_merge(jsonb, jsonb) -> jsonb`
38///
39/// Shallow-merges two JSONB objects. Keys from the second argument
40/// overwrite keys in the first. Non-object inputs: returns second arg.
41#[derive(Debug)]
42pub struct JsonbMerge {
43    signature: Signature,
44}
45
46impl JsonbMerge {
47    /// Creates a new `jsonb_merge` UDF.
48    #[must_use]
49    pub fn new() -> Self {
50        Self {
51            signature: Signature::new(
52                TypeSignature::Exact(vec![DataType::LargeBinary, DataType::LargeBinary]),
53                Volatility::Immutable,
54            ),
55        }
56    }
57}
58
59impl Default for JsonbMerge {
60    fn default() -> Self {
61        Self::new()
62    }
63}
64
65impl PartialEq for JsonbMerge {
66    fn eq(&self, _other: &Self) -> bool {
67        true
68    }
69}
70
71impl Eq for JsonbMerge {}
72
73impl Hash for JsonbMerge {
74    fn hash<H: Hasher>(&self, state: &mut H) {
75        "jsonb_merge".hash(state);
76    }
77}
78
79impl ScalarUDFImpl for JsonbMerge {
80    fn as_any(&self) -> &dyn Any {
81        self
82    }
83
84    fn name(&self) -> &'static str {
85        "jsonb_merge"
86    }
87
88    fn signature(&self) -> &Signature {
89        &self.signature
90    }
91
92    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
93        Ok(DataType::LargeBinary)
94    }
95
96    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
97        let expanded = expand_args(&args.args)?;
98        let left_arr = expanded[0]
99            .as_any()
100            .downcast_ref::<LargeBinaryArray>()
101            .ok_or_else(|| {
102                datafusion_common::DataFusionError::Internal(
103                    "jsonb_merge: first arg must be LargeBinary".into(),
104                )
105            })?;
106        let right_arr = expanded[1]
107            .as_any()
108            .downcast_ref::<LargeBinaryArray>()
109            .ok_or_else(|| {
110                datafusion_common::DataFusionError::Internal(
111                    "jsonb_merge: second arg must be LargeBinary".into(),
112                )
113            })?;
114
115        let mut builder = LargeBinaryBuilder::with_capacity(left_arr.len(), 256);
116        for i in 0..left_arr.len() {
117            if left_arr.is_null(i) || right_arr.is_null(i) {
118                builder.append_null();
119            } else {
120                let left_val = json_types::jsonb_to_value(left_arr.value(i));
121                let right_val = json_types::jsonb_to_value(right_arr.value(i));
122                match (left_val, right_val) {
123                    (
124                        Some(serde_json::Value::Object(mut l)),
125                        Some(serde_json::Value::Object(r)),
126                    ) => {
127                        for (k, v) in r {
128                            l.insert(k, v);
129                        }
130                        builder
131                            .append_value(json_types::encode_jsonb(&serde_json::Value::Object(l)));
132                    }
133                    (_, Some(r)) => {
134                        builder.append_value(json_types::encode_jsonb(&r));
135                    }
136                    _ => builder.append_null(),
137                }
138            }
139        }
140        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
141    }
142}
143
144// ══════════════════════════════════════════════════════════════════
145// jsonb_deep_merge(jsonb, jsonb) -> jsonb — recursive merge
146// ══════════════════════════════════════════════════════════════════
147
148/// `jsonb_deep_merge(jsonb, jsonb) -> jsonb`
149///
150/// Recursively merges two JSONB objects. When both sides have an object
151/// at the same key, the merge recurses. Otherwise second wins.
152#[derive(Debug)]
153pub struct JsonbDeepMerge {
154    signature: Signature,
155}
156
157impl JsonbDeepMerge {
158    /// Creates a new `jsonb_deep_merge` UDF.
159    #[must_use]
160    pub fn new() -> Self {
161        Self {
162            signature: Signature::new(
163                TypeSignature::Exact(vec![DataType::LargeBinary, DataType::LargeBinary]),
164                Volatility::Immutable,
165            ),
166        }
167    }
168}
169
170impl Default for JsonbDeepMerge {
171    fn default() -> Self {
172        Self::new()
173    }
174}
175
176impl PartialEq for JsonbDeepMerge {
177    fn eq(&self, _other: &Self) -> bool {
178        true
179    }
180}
181
182impl Eq for JsonbDeepMerge {}
183
184impl Hash for JsonbDeepMerge {
185    fn hash<H: Hasher>(&self, state: &mut H) {
186        "jsonb_deep_merge".hash(state);
187    }
188}
189
190fn deep_merge(
191    left: serde_json::Value,
192    right: serde_json::Value,
193    depth: usize,
194) -> std::result::Result<serde_json::Value, String> {
195    if depth > MAX_DEPTH {
196        return Err("jsonb_deep_merge: max depth exceeded".into());
197    }
198    match (left, right) {
199        (serde_json::Value::Object(mut l), serde_json::Value::Object(r)) => {
200            for (k, rv) in r {
201                let merged = if let Some(lv) = l.remove(&k) {
202                    deep_merge(lv, rv, depth + 1)?
203                } else {
204                    rv
205                };
206                l.insert(k, merged);
207            }
208            Ok(serde_json::Value::Object(l))
209        }
210        (_, r) => Ok(r),
211    }
212}
213
214impl ScalarUDFImpl for JsonbDeepMerge {
215    fn as_any(&self) -> &dyn Any {
216        self
217    }
218
219    fn name(&self) -> &'static str {
220        "jsonb_deep_merge"
221    }
222
223    fn signature(&self) -> &Signature {
224        &self.signature
225    }
226
227    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
228        Ok(DataType::LargeBinary)
229    }
230
231    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
232        let expanded = expand_args(&args.args)?;
233        let left_arr = expanded[0]
234            .as_any()
235            .downcast_ref::<LargeBinaryArray>()
236            .ok_or_else(|| {
237                datafusion_common::DataFusionError::Internal(
238                    "jsonb_deep_merge: first arg must be LargeBinary".into(),
239                )
240            })?;
241        let right_arr = expanded[1]
242            .as_any()
243            .downcast_ref::<LargeBinaryArray>()
244            .ok_or_else(|| {
245                datafusion_common::DataFusionError::Internal(
246                    "jsonb_deep_merge: second arg must be LargeBinary".into(),
247                )
248            })?;
249
250        let mut builder = LargeBinaryBuilder::with_capacity(left_arr.len(), 256);
251        for i in 0..left_arr.len() {
252            if left_arr.is_null(i) || right_arr.is_null(i) {
253                builder.append_null();
254            } else {
255                let left_val = json_types::jsonb_to_value(left_arr.value(i));
256                let right_val = json_types::jsonb_to_value(right_arr.value(i));
257                match (left_val, right_val) {
258                    (Some(l), Some(r)) => {
259                        let merged = deep_merge(l, r, 0)
260                            .map_err(datafusion_common::DataFusionError::Execution)?;
261                        builder.append_value(json_types::encode_jsonb(&merged));
262                    }
263                    _ => builder.append_null(),
264                }
265            }
266        }
267        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
268    }
269}
270
271// ══════════════════════════════════════════════════════════════════
272// jsonb_strip_nulls(jsonb) -> jsonb
273// ══════════════════════════════════════════════════════════════════
274
275/// `jsonb_strip_nulls(jsonb) -> jsonb`
276///
277/// Recursively removes null-valued object fields. Array elements
278/// are recursed into but null elements are preserved (PostgreSQL semantics).
279#[derive(Debug)]
280pub struct JsonbStripNulls {
281    signature: Signature,
282}
283
284impl JsonbStripNulls {
285    /// Creates a new `jsonb_strip_nulls` UDF.
286    #[must_use]
287    pub fn new() -> Self {
288        Self {
289            signature: Signature::new(
290                TypeSignature::Exact(vec![DataType::LargeBinary]),
291                Volatility::Immutable,
292            ),
293        }
294    }
295}
296
297impl Default for JsonbStripNulls {
298    fn default() -> Self {
299        Self::new()
300    }
301}
302
303impl PartialEq for JsonbStripNulls {
304    fn eq(&self, _other: &Self) -> bool {
305        true
306    }
307}
308
309impl Eq for JsonbStripNulls {}
310
311impl Hash for JsonbStripNulls {
312    fn hash<H: Hasher>(&self, state: &mut H) {
313        "jsonb_strip_nulls".hash(state);
314    }
315}
316
317fn strip_nulls(val: serde_json::Value) -> serde_json::Value {
318    match val {
319        serde_json::Value::Object(obj) => {
320            let filtered: serde_json::Map<String, serde_json::Value> = obj
321                .into_iter()
322                .filter(|(_, v)| !v.is_null())
323                .map(|(k, v)| (k, strip_nulls(v)))
324                .collect();
325            serde_json::Value::Object(filtered)
326        }
327        serde_json::Value::Array(arr) => {
328            serde_json::Value::Array(arr.into_iter().map(strip_nulls).collect())
329        }
330        other => other,
331    }
332}
333
334impl ScalarUDFImpl for JsonbStripNulls {
335    fn as_any(&self) -> &dyn Any {
336        self
337    }
338
339    fn name(&self) -> &'static str {
340        "jsonb_strip_nulls"
341    }
342
343    fn signature(&self) -> &Signature {
344        &self.signature
345    }
346
347    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
348        Ok(DataType::LargeBinary)
349    }
350
351    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
352        let expanded = expand_args(&args.args)?;
353        let jsonb_arr = expanded[0]
354            .as_any()
355            .downcast_ref::<LargeBinaryArray>()
356            .ok_or_else(|| {
357                datafusion_common::DataFusionError::Internal(
358                    "jsonb_strip_nulls: arg must be LargeBinary".into(),
359                )
360            })?;
361
362        let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
363        for i in 0..jsonb_arr.len() {
364            if jsonb_arr.is_null(i) {
365                builder.append_null();
366            } else {
367                match json_types::jsonb_to_value(jsonb_arr.value(i)) {
368                    Some(val) => {
369                        builder.append_value(json_types::encode_jsonb(&strip_nulls(val)));
370                    }
371                    None => builder.append_null(),
372                }
373            }
374        }
375        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
376    }
377}
378
379// ══════════════════════════════════════════════════════════════════
380// jsonb_rename_keys(jsonb, map<text,text>) -> jsonb
381// ══════════════════════════════════════════════════════════════════
382
383/// `jsonb_rename_keys(jsonb, map<text,text>) -> jsonb`
384///
385/// Renames top-level object keys according to the given rename map.
386/// Keys not in the map are preserved unchanged.
387#[derive(Debug)]
388pub struct JsonbRenameKeys {
389    signature: Signature,
390}
391
392impl JsonbRenameKeys {
393    /// Creates a new `jsonb_rename_keys` UDF.
394    #[must_use]
395    pub fn new() -> Self {
396        Self {
397            signature: Signature::new(
398                TypeSignature::Exact(vec![
399                    DataType::LargeBinary,
400                    DataType::Map(
401                        Arc::new(arrow_schema::Field::new(
402                            "entries",
403                            DataType::Struct(
404                                vec![
405                                    arrow_schema::Field::new("key", DataType::Utf8, false),
406                                    arrow_schema::Field::new("value", DataType::Utf8, true),
407                                ]
408                                .into(),
409                            ),
410                            false,
411                        )),
412                        false,
413                    ),
414                ]),
415                Volatility::Immutable,
416            ),
417        }
418    }
419}
420
421impl Default for JsonbRenameKeys {
422    fn default() -> Self {
423        Self::new()
424    }
425}
426
427impl PartialEq for JsonbRenameKeys {
428    fn eq(&self, _other: &Self) -> bool {
429        true
430    }
431}
432
433impl Eq for JsonbRenameKeys {}
434
435impl Hash for JsonbRenameKeys {
436    fn hash<H: Hasher>(&self, state: &mut H) {
437        "jsonb_rename_keys".hash(state);
438    }
439}
440
441impl ScalarUDFImpl for JsonbRenameKeys {
442    fn as_any(&self) -> &dyn Any {
443        self
444    }
445
446    fn name(&self) -> &'static str {
447        "jsonb_rename_keys"
448    }
449
450    fn signature(&self) -> &Signature {
451        &self.signature
452    }
453
454    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
455        Ok(DataType::LargeBinary)
456    }
457
458    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
459        let expanded = expand_args(&args.args)?;
460        let jsonb_arr = expanded[0]
461            .as_any()
462            .downcast_ref::<LargeBinaryArray>()
463            .ok_or_else(|| {
464                datafusion_common::DataFusionError::Internal(
465                    "jsonb_rename_keys: first arg must be LargeBinary".into(),
466                )
467            })?;
468        let map_arr = expanded[1]
469            .as_any()
470            .downcast_ref::<MapArray>()
471            .ok_or_else(|| {
472                datafusion_common::DataFusionError::Internal(
473                    "jsonb_rename_keys: second arg must be Map<Utf8,Utf8>".into(),
474                )
475            })?;
476
477        let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
478        for i in 0..jsonb_arr.len() {
479            if jsonb_arr.is_null(i) || map_arr.is_null(i) {
480                builder.append_null();
481            } else {
482                let rename_map = extract_string_map(map_arr, i);
483                match json_types::jsonb_to_value(jsonb_arr.value(i)) {
484                    Some(serde_json::Value::Object(obj)) => {
485                        let renamed: serde_json::Map<String, serde_json::Value> = obj
486                            .into_iter()
487                            .map(|(k, v)| {
488                                let new_key = rename_map.get(k.as_str()).cloned().unwrap_or(k);
489                                (new_key, v)
490                            })
491                            .collect();
492                        builder.append_value(json_types::encode_jsonb(&serde_json::Value::Object(
493                            renamed,
494                        )));
495                    }
496                    Some(other) => {
497                        builder.append_value(json_types::encode_jsonb(&other));
498                    }
499                    None => builder.append_null(),
500                }
501            }
502        }
503        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
504    }
505}
506
507/// Extract a `HashMap<String, String>` from a `MapArray` at row `i`.
508fn extract_string_map(map_arr: &MapArray, row: usize) -> std::collections::HashMap<String, String> {
509    let mut result = std::collections::HashMap::new();
510    let entries = map_arr.value(row);
511    let struct_arr = entries
512        .as_any()
513        .downcast_ref::<arrow_array::StructArray>()
514        .unwrap();
515    let keys = struct_arr
516        .column(0)
517        .as_any()
518        .downcast_ref::<StringArray>()
519        .unwrap();
520    let vals = struct_arr
521        .column(1)
522        .as_any()
523        .downcast_ref::<StringArray>()
524        .unwrap();
525    for j in 0..keys.len() {
526        if !keys.is_null(j) && !vals.is_null(j) {
527            result.insert(keys.value(j).to_owned(), vals.value(j).to_owned());
528        }
529    }
530    result
531}
532
533// ══════════════════════════════════════════════════════════════════
534// jsonb_pick(jsonb, text[]) -> jsonb
535// ══════════════════════════════════════════════════════════════════
536
537/// `jsonb_pick(jsonb, text[]) -> jsonb`
538///
539/// Returns a new JSONB object containing only the specified keys.
540#[derive(Debug)]
541pub struct JsonbPick {
542    signature: Signature,
543}
544
545impl JsonbPick {
546    /// Creates a new `jsonb_pick` UDF.
547    #[must_use]
548    pub fn new() -> Self {
549        Self {
550            signature: Signature::new(
551                TypeSignature::Exact(vec![
552                    DataType::LargeBinary,
553                    DataType::List(Arc::new(arrow_schema::Field::new(
554                        "item",
555                        DataType::Utf8,
556                        true,
557                    ))),
558                ]),
559                Volatility::Immutable,
560            ),
561        }
562    }
563}
564
565impl Default for JsonbPick {
566    fn default() -> Self {
567        Self::new()
568    }
569}
570
571impl PartialEq for JsonbPick {
572    fn eq(&self, _other: &Self) -> bool {
573        true
574    }
575}
576
577impl Eq for JsonbPick {}
578
579impl Hash for JsonbPick {
580    fn hash<H: Hasher>(&self, state: &mut H) {
581        "jsonb_pick".hash(state);
582    }
583}
584
585impl ScalarUDFImpl for JsonbPick {
586    fn as_any(&self) -> &dyn Any {
587        self
588    }
589
590    fn name(&self) -> &'static str {
591        "jsonb_pick"
592    }
593
594    fn signature(&self) -> &Signature {
595        &self.signature
596    }
597
598    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
599        Ok(DataType::LargeBinary)
600    }
601
602    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
603        let expanded = expand_args(&args.args)?;
604        let jsonb_arr = expanded[0]
605            .as_any()
606            .downcast_ref::<LargeBinaryArray>()
607            .ok_or_else(|| {
608                datafusion_common::DataFusionError::Internal(
609                    "jsonb_pick: first arg must be LargeBinary".into(),
610                )
611            })?;
612        let keys_arr = expanded[1]
613            .as_any()
614            .downcast_ref::<ListArray>()
615            .ok_or_else(|| {
616                datafusion_common::DataFusionError::Internal(
617                    "jsonb_pick: second arg must be List<Utf8>".into(),
618                )
619            })?;
620
621        let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
622        for i in 0..jsonb_arr.len() {
623            if jsonb_arr.is_null(i) || keys_arr.is_null(i) {
624                builder.append_null();
625            } else {
626                let key_set = extract_string_set(keys_arr, i);
627                match json_types::jsonb_to_value(jsonb_arr.value(i)) {
628                    Some(serde_json::Value::Object(obj)) => {
629                        let picked: serde_json::Map<String, serde_json::Value> = obj
630                            .into_iter()
631                            .filter(|(k, _)| key_set.contains(k.as_str()))
632                            .collect();
633                        builder.append_value(json_types::encode_jsonb(&serde_json::Value::Object(
634                            picked,
635                        )));
636                    }
637                    Some(other) => {
638                        builder.append_value(json_types::encode_jsonb(&other));
639                    }
640                    None => builder.append_null(),
641                }
642            }
643        }
644        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
645    }
646}
647
648/// Extract a `HashSet<String>` from a `ListArray<Utf8>` at row `i`.
649fn extract_string_set(list_arr: &ListArray, row: usize) -> HashSet<String> {
650    let values = list_arr.value(row);
651    let str_arr = values.as_any().downcast_ref::<StringArray>();
652    let mut result = HashSet::new();
653    if let Some(arr) = str_arr {
654        for j in 0..arr.len() {
655            if !arr.is_null(j) {
656                result.insert(arr.value(j).to_owned());
657            }
658        }
659    }
660    result
661}
662
663// ══════════════════════════════════════════════════════════════════
664// jsonb_except(jsonb, text[]) -> jsonb
665// ══════════════════════════════════════════════════════════════════
666
667/// `jsonb_except(jsonb, text[]) -> jsonb`
668///
669/// Returns a new JSONB object excluding the specified keys.
670#[derive(Debug)]
671pub struct JsonbExcept {
672    signature: Signature,
673}
674
675impl JsonbExcept {
676    /// Creates a new `jsonb_except` UDF.
677    #[must_use]
678    pub fn new() -> Self {
679        Self {
680            signature: Signature::new(
681                TypeSignature::Exact(vec![
682                    DataType::LargeBinary,
683                    DataType::List(Arc::new(arrow_schema::Field::new(
684                        "item",
685                        DataType::Utf8,
686                        true,
687                    ))),
688                ]),
689                Volatility::Immutable,
690            ),
691        }
692    }
693}
694
695impl Default for JsonbExcept {
696    fn default() -> Self {
697        Self::new()
698    }
699}
700
701impl PartialEq for JsonbExcept {
702    fn eq(&self, _other: &Self) -> bool {
703        true
704    }
705}
706
707impl Eq for JsonbExcept {}
708
709impl Hash for JsonbExcept {
710    fn hash<H: Hasher>(&self, state: &mut H) {
711        "jsonb_except".hash(state);
712    }
713}
714
715impl ScalarUDFImpl for JsonbExcept {
716    fn as_any(&self) -> &dyn Any {
717        self
718    }
719
720    fn name(&self) -> &'static str {
721        "jsonb_except"
722    }
723
724    fn signature(&self) -> &Signature {
725        &self.signature
726    }
727
728    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
729        Ok(DataType::LargeBinary)
730    }
731
732    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
733        let expanded = expand_args(&args.args)?;
734        let jsonb_arr = expanded[0]
735            .as_any()
736            .downcast_ref::<LargeBinaryArray>()
737            .ok_or_else(|| {
738                datafusion_common::DataFusionError::Internal(
739                    "jsonb_except: first arg must be LargeBinary".into(),
740                )
741            })?;
742        let keys_arr = expanded[1]
743            .as_any()
744            .downcast_ref::<ListArray>()
745            .ok_or_else(|| {
746                datafusion_common::DataFusionError::Internal(
747                    "jsonb_except: second arg must be List<Utf8>".into(),
748                )
749            })?;
750
751        let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
752        for i in 0..jsonb_arr.len() {
753            if jsonb_arr.is_null(i) || keys_arr.is_null(i) {
754                builder.append_null();
755            } else {
756                let exclude_set = extract_string_set(keys_arr, i);
757                match json_types::jsonb_to_value(jsonb_arr.value(i)) {
758                    Some(serde_json::Value::Object(obj)) => {
759                        let filtered: serde_json::Map<String, serde_json::Value> = obj
760                            .into_iter()
761                            .filter(|(k, _)| !exclude_set.contains(k.as_str()))
762                            .collect();
763                        builder.append_value(json_types::encode_jsonb(&serde_json::Value::Object(
764                            filtered,
765                        )));
766                    }
767                    Some(other) => {
768                        builder.append_value(json_types::encode_jsonb(&other));
769                    }
770                    None => builder.append_null(),
771                }
772            }
773        }
774        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
775    }
776}
777
778// ══════════════════════════════════════════════════════════════════
779// jsonb_flatten(jsonb, text) -> jsonb
780// ══════════════════════════════════════════════════════════════════
781
782/// `jsonb_flatten(jsonb, separator) -> jsonb`
783///
784/// Flattens a nested JSONB object into a single-level object with
785/// dot-path keys. Arrays are indexed numerically (e.g. `tags.0`).
786#[derive(Debug)]
787pub struct JsonbFlatten {
788    signature: Signature,
789}
790
791impl JsonbFlatten {
792    /// Creates a new `jsonb_flatten` UDF.
793    #[must_use]
794    pub fn new() -> Self {
795        Self {
796            signature: Signature::new(
797                TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Utf8]),
798                Volatility::Immutable,
799            ),
800        }
801    }
802}
803
804impl Default for JsonbFlatten {
805    fn default() -> Self {
806        Self::new()
807    }
808}
809
810impl PartialEq for JsonbFlatten {
811    fn eq(&self, _other: &Self) -> bool {
812        true
813    }
814}
815
816impl Eq for JsonbFlatten {}
817
818impl Hash for JsonbFlatten {
819    fn hash<H: Hasher>(&self, state: &mut H) {
820        "jsonb_flatten".hash(state);
821    }
822}
823
824fn flatten_value(
825    val: &serde_json::Value,
826    prefix: &str,
827    sep: &str,
828    out: &mut serde_json::Map<String, serde_json::Value>,
829    depth: usize,
830) -> std::result::Result<(), String> {
831    if depth > MAX_DEPTH {
832        return Err("jsonb_flatten: max depth exceeded".into());
833    }
834    match val {
835        serde_json::Value::Object(obj) => {
836            for (k, v) in obj {
837                let new_key = if prefix.is_empty() {
838                    k.clone()
839                } else {
840                    format!("{prefix}{sep}{k}")
841                };
842                flatten_value(v, &new_key, sep, out, depth + 1)?;
843            }
844        }
845        serde_json::Value::Array(arr) => {
846            for (idx, v) in arr.iter().enumerate() {
847                let new_key = if prefix.is_empty() {
848                    idx.to_string()
849                } else {
850                    format!("{prefix}{sep}{idx}")
851                };
852                flatten_value(v, &new_key, sep, out, depth + 1)?;
853            }
854        }
855        _ => {
856            out.insert(prefix.to_owned(), val.clone());
857        }
858    }
859    Ok(())
860}
861
862impl ScalarUDFImpl for JsonbFlatten {
863    fn as_any(&self) -> &dyn Any {
864        self
865    }
866
867    fn name(&self) -> &'static str {
868        "jsonb_flatten"
869    }
870
871    fn signature(&self) -> &Signature {
872        &self.signature
873    }
874
875    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
876        Ok(DataType::LargeBinary)
877    }
878
879    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
880        let expanded = expand_args(&args.args)?;
881        let jsonb_arr = expanded[0]
882            .as_any()
883            .downcast_ref::<LargeBinaryArray>()
884            .ok_or_else(|| {
885                datafusion_common::DataFusionError::Internal(
886                    "jsonb_flatten: first arg must be LargeBinary".into(),
887                )
888            })?;
889        let sep_arr = expanded[1]
890            .as_any()
891            .downcast_ref::<StringArray>()
892            .ok_or_else(|| {
893                datafusion_common::DataFusionError::Internal(
894                    "jsonb_flatten: second arg must be Utf8".into(),
895                )
896            })?;
897
898        let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
899        for i in 0..jsonb_arr.len() {
900            if jsonb_arr.is_null(i) || sep_arr.is_null(i) {
901                builder.append_null();
902            } else {
903                let sep = sep_arr.value(i);
904                match json_types::jsonb_to_value(jsonb_arr.value(i)) {
905                    Some(val) => {
906                        let mut flat = serde_json::Map::new();
907                        flatten_value(&val, "", sep, &mut flat, 0)
908                            .map_err(datafusion_common::DataFusionError::Execution)?;
909                        builder.append_value(json_types::encode_jsonb(&serde_json::Value::Object(
910                            flat,
911                        )));
912                    }
913                    None => builder.append_null(),
914                }
915            }
916        }
917        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
918    }
919}
920
921// ══════════════════════════════════════════════════════════════════
922// jsonb_unflatten(jsonb, text) -> jsonb
923// ══════════════════════════════════════════════════════════════════
924
925/// `jsonb_unflatten(jsonb, separator) -> jsonb`
926///
927/// Rebuilds a nested JSONB object from a flat key-value structure.
928/// Keys are split by separator and nested accordingly. Numeric keys
929/// stay as object keys (not converted to arrays).
930#[derive(Debug)]
931pub struct JsonbUnflatten {
932    signature: Signature,
933}
934
935impl JsonbUnflatten {
936    /// Creates a new `jsonb_unflatten` UDF.
937    #[must_use]
938    pub fn new() -> Self {
939        Self {
940            signature: Signature::new(
941                TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Utf8]),
942                Volatility::Immutable,
943            ),
944        }
945    }
946}
947
948impl Default for JsonbUnflatten {
949    fn default() -> Self {
950        Self::new()
951    }
952}
953
954impl PartialEq for JsonbUnflatten {
955    fn eq(&self, _other: &Self) -> bool {
956        true
957    }
958}
959
960impl Eq for JsonbUnflatten {}
961
962impl Hash for JsonbUnflatten {
963    fn hash<H: Hasher>(&self, state: &mut H) {
964        "jsonb_unflatten".hash(state);
965    }
966}
967
968fn unflatten_insert(root: &mut serde_json::Value, parts: &[&str], value: serde_json::Value) {
969    if parts.is_empty() {
970        return;
971    }
972    if parts.len() == 1 {
973        if let serde_json::Value::Object(obj) = root {
974            obj.insert(parts[0].to_owned(), value);
975        }
976        return;
977    }
978    if let serde_json::Value::Object(obj) = root {
979        let child = obj
980            .entry(parts[0].to_owned())
981            .or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()));
982        unflatten_insert(child, &parts[1..], value);
983    }
984}
985
986impl ScalarUDFImpl for JsonbUnflatten {
987    fn as_any(&self) -> &dyn Any {
988        self
989    }
990
991    fn name(&self) -> &'static str {
992        "jsonb_unflatten"
993    }
994
995    fn signature(&self) -> &Signature {
996        &self.signature
997    }
998
999    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1000        Ok(DataType::LargeBinary)
1001    }
1002
1003    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1004        let expanded = expand_args(&args.args)?;
1005        let jsonb_arr = expanded[0]
1006            .as_any()
1007            .downcast_ref::<LargeBinaryArray>()
1008            .ok_or_else(|| {
1009                datafusion_common::DataFusionError::Internal(
1010                    "jsonb_unflatten: first arg must be LargeBinary".into(),
1011                )
1012            })?;
1013        let sep_arr = expanded[1]
1014            .as_any()
1015            .downcast_ref::<StringArray>()
1016            .ok_or_else(|| {
1017                datafusion_common::DataFusionError::Internal(
1018                    "jsonb_unflatten: second arg must be Utf8".into(),
1019                )
1020            })?;
1021
1022        let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
1023        for i in 0..jsonb_arr.len() {
1024            if jsonb_arr.is_null(i) || sep_arr.is_null(i) {
1025                builder.append_null();
1026            } else {
1027                let sep = sep_arr.value(i);
1028                match json_types::jsonb_to_value(jsonb_arr.value(i)) {
1029                    Some(serde_json::Value::Object(flat)) => {
1030                        let mut root = serde_json::Value::Object(serde_json::Map::new());
1031                        for (key, val) in flat {
1032                            let parts: Vec<&str> = key.split(sep).collect();
1033                            unflatten_insert(&mut root, &parts, val);
1034                        }
1035                        builder.append_value(json_types::encode_jsonb(&root));
1036                    }
1037                    Some(other) => {
1038                        builder.append_value(json_types::encode_jsonb(&other));
1039                    }
1040                    None => builder.append_null(),
1041                }
1042            }
1043        }
1044        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
1045    }
1046}
1047
1048// ══════════════════════════════════════════════════════════════════
1049// json_to_columns(jsonb, text) -> jsonb  (runtime fallback)
1050// ══════════════════════════════════════════════════════════════════
1051
1052/// `json_to_columns(jsonb, type_spec) -> jsonb`
1053///
1054/// Runtime fallback for structured extraction. Parses the type_spec
1055/// to determine field names, extracts each from the JSONB object,
1056/// and returns the result as a new JSONB object containing only
1057/// those fields. Full plan-time struct rewriting is deferred.
1058#[derive(Debug)]
1059pub struct JsonToColumns {
1060    signature: Signature,
1061}
1062
1063impl JsonToColumns {
1064    /// Creates a new `json_to_columns` UDF.
1065    #[must_use]
1066    pub fn new() -> Self {
1067        Self {
1068            signature: Signature::new(
1069                TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Utf8]),
1070                Volatility::Immutable,
1071            ),
1072        }
1073    }
1074}
1075
1076impl Default for JsonToColumns {
1077    fn default() -> Self {
1078        Self::new()
1079    }
1080}
1081
1082impl PartialEq for JsonToColumns {
1083    fn eq(&self, _other: &Self) -> bool {
1084        true
1085    }
1086}
1087
1088impl Eq for JsonToColumns {}
1089
1090impl Hash for JsonToColumns {
1091    fn hash<H: Hasher>(&self, state: &mut H) {
1092        "json_to_columns".hash(state);
1093    }
1094}
1095
/// Parse a type_spec like `"name VARCHAR, age BIGINT, active BOOLEAN"`
/// into the list of field names it declares, in order.
///
/// Entries are separated by commas at nesting depth zero only, so
/// parameterized or nested types such as `DECIMAL(10, 2)`,
/// `STRUCT(x INT, y INT)`, `MAP<VARCHAR, BIGINT>` or `INT[]` do not produce
/// spurious field names. The name is the first whitespace-delimited token of
/// each entry; a double-quoted identifier (`"first name" VARCHAR`) is
/// returned without its quotes, with `""` unescaped to `"`. Blank entries are
/// skipped.
fn parse_type_spec_fields(spec: &str) -> Vec<String> {
    let mut fields = Vec::new();
    let mut depth = 0_usize;
    let mut in_quotes = false;
    let mut entry_start = 0;
    for (idx, ch) in spec.char_indices() {
        match ch {
            // `""` inside a quoted identifier toggles twice, which is harmless.
            '"' => in_quotes = !in_quotes,
            _ if in_quotes => {}
            '(' | '<' | '[' => depth += 1,
            // Saturate so a stray closer cannot underflow and hide later commas.
            ')' | '>' | ']' => depth = depth.saturating_sub(1),
            ',' if depth == 0 => {
                fields.extend(type_spec_field_name(&spec[entry_start..idx]));
                entry_start = idx + ch.len_utf8();
            }
            _ => {}
        }
    }
    fields.extend(type_spec_field_name(&spec[entry_start..]));
    fields
}

/// Extracts the field name from a single `name TYPE` entry of a type_spec,
/// or `None` for a blank entry.
fn type_spec_field_name(entry: &str) -> Option<String> {
    let entry = entry.trim();
    if let Some(quoted) = entry.strip_prefix('"') {
        let mut name = String::new();
        let mut chars = quoted.chars().peekable();
        while let Some(c) = chars.next() {
            if c != '"' {
                name.push(c);
            } else if chars.next_if_eq(&'"').is_some() {
                name.push('"');
            } else {
                return Some(name);
            }
        }
        // Unterminated quote: fall back to the plain-token rule below.
    }
    entry.split_whitespace().next().map(ToOwned::to_owned)
}
1106
1107impl ScalarUDFImpl for JsonToColumns {
1108    fn as_any(&self) -> &dyn Any {
1109        self
1110    }
1111
1112    fn name(&self) -> &'static str {
1113        "json_to_columns"
1114    }
1115
1116    fn signature(&self) -> &Signature {
1117        &self.signature
1118    }
1119
1120    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1121        Ok(DataType::LargeBinary)
1122    }
1123
1124    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1125        let expanded = expand_args(&args.args)?;
1126        let jsonb_arr = expanded[0]
1127            .as_any()
1128            .downcast_ref::<LargeBinaryArray>()
1129            .ok_or_else(|| {
1130                datafusion_common::DataFusionError::Internal(
1131                    "json_to_columns: first arg must be LargeBinary".into(),
1132                )
1133            })?;
1134        let spec_arr = expanded[1]
1135            .as_any()
1136            .downcast_ref::<StringArray>()
1137            .ok_or_else(|| {
1138                datafusion_common::DataFusionError::Internal(
1139                    "json_to_columns: second arg must be Utf8".into(),
1140                )
1141            })?;
1142
1143        let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
1144        for i in 0..jsonb_arr.len() {
1145            if jsonb_arr.is_null(i) || spec_arr.is_null(i) {
1146                builder.append_null();
1147            } else {
1148                let fields = parse_type_spec_fields(spec_arr.value(i));
1149                let field_set: HashSet<&str> = fields.iter().map(String::as_str).collect();
1150                match json_types::jsonb_to_value(jsonb_arr.value(i)) {
1151                    Some(serde_json::Value::Object(obj)) => {
1152                        let picked: serde_json::Map<String, serde_json::Value> = obj
1153                            .into_iter()
1154                            .filter(|(k, _)| field_set.contains(k.as_str()))
1155                            .collect();
1156                        builder.append_value(json_types::encode_jsonb(&serde_json::Value::Object(
1157                            picked,
1158                        )));
1159                    }
1160                    Some(other) => {
1161                        builder.append_value(json_types::encode_jsonb(&other));
1162                    }
1163                    None => builder.append_null(),
1164                }
1165            }
1166        }
1167        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
1168    }
1169}
1170
1171// ══════════════════════════════════════════════════════════════════
1172// json_infer_schema(jsonb) -> text
1173// ══════════════════════════════════════════════════════════════════
1174
1175/// `json_infer_schema(jsonb) -> text`
1176///
1177/// Infers the SQL schema of a JSONB value, returning a JSON object
1178/// mapping field names to SQL type names.
1179#[derive(Debug)]
1180pub struct JsonInferSchema {
1181    signature: Signature,
1182}
1183
1184impl JsonInferSchema {
1185    /// Creates a new `json_infer_schema` UDF.
1186    #[must_use]
1187    pub fn new() -> Self {
1188        Self {
1189            signature: Signature::new(
1190                TypeSignature::Exact(vec![DataType::LargeBinary]),
1191                Volatility::Immutable,
1192            ),
1193        }
1194    }
1195}
1196
1197impl Default for JsonInferSchema {
1198    fn default() -> Self {
1199        Self::new()
1200    }
1201}
1202
1203impl PartialEq for JsonInferSchema {
1204    fn eq(&self, _other: &Self) -> bool {
1205        true
1206    }
1207}
1208
1209impl Eq for JsonInferSchema {}
1210
1211impl Hash for JsonInferSchema {
1212    fn hash<H: Hasher>(&self, state: &mut H) {
1213        "json_infer_schema".hash(state);
1214    }
1215}
1216
1217fn infer_type(val: &serde_json::Value) -> String {
1218    match val {
1219        serde_json::Value::Null => "NULL".to_owned(),
1220        serde_json::Value::Bool(_) => "BOOLEAN".to_owned(),
1221        serde_json::Value::Number(n) => {
1222            if n.is_i64() || n.is_u64() {
1223                "BIGINT".to_owned()
1224            } else {
1225                "DOUBLE".to_owned()
1226            }
1227        }
1228        serde_json::Value::String(_) => "VARCHAR".to_owned(),
1229        serde_json::Value::Array(arr) => {
1230            let inner = arr.first().map_or("NULL".to_owned(), infer_type);
1231            format!("ARRAY<{inner}>")
1232        }
1233        serde_json::Value::Object(obj) => {
1234            let fields: Vec<String> = obj
1235                .iter()
1236                .map(|(k, v)| format!("{k} {}", infer_type(v)))
1237                .collect();
1238            format!("STRUCT({})", fields.join(", "))
1239        }
1240    }
1241}
1242
1243impl ScalarUDFImpl for JsonInferSchema {
1244    fn as_any(&self) -> &dyn Any {
1245        self
1246    }
1247
1248    fn name(&self) -> &'static str {
1249        "json_infer_schema"
1250    }
1251
1252    fn signature(&self) -> &Signature {
1253        &self.signature
1254    }
1255
1256    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1257        Ok(DataType::Utf8)
1258    }
1259
1260    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1261        let expanded = expand_args(&args.args)?;
1262        let jsonb_arr = expanded[0]
1263            .as_any()
1264            .downcast_ref::<LargeBinaryArray>()
1265            .ok_or_else(|| {
1266                datafusion_common::DataFusionError::Internal(
1267                    "json_infer_schema: arg must be LargeBinary".into(),
1268                )
1269            })?;
1270
1271        let mut builder = StringBuilder::with_capacity(jsonb_arr.len(), 256);
1272        for i in 0..jsonb_arr.len() {
1273            if jsonb_arr.is_null(i) {
1274                builder.append_null();
1275            } else {
1276                match json_types::jsonb_to_value(jsonb_arr.value(i)) {
1277                    Some(serde_json::Value::Object(obj)) => {
1278                        let schema: serde_json::Map<String, serde_json::Value> = obj
1279                            .iter()
1280                            .map(|(k, v)| (k.clone(), serde_json::Value::String(infer_type(v))))
1281                            .collect();
1282                        builder.append_value(
1283                            serde_json::to_string(&serde_json::Value::Object(schema))
1284                                .unwrap_or_default(),
1285                        );
1286                    }
1287                    Some(val) => {
1288                        builder.append_value(infer_type(&val));
1289                    }
1290                    None => builder.append_null(),
1291                }
1292            }
1293        }
1294        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
1295    }
1296}
1297
1298// ══════════════════════════════════════════════════════════════════
1299// Registration
1300// ══════════════════════════════════════════════════════════════════
1301
1302/// Registers all JSON extension UDFs with the given session context.
1303pub fn register_json_extensions(ctx: &datafusion::prelude::SessionContext) {
1304    use datafusion_expr::ScalarUDF;
1305
1306    ctx.register_udf(ScalarUDF::new_from_impl(JsonbMerge::new()));
1307    ctx.register_udf(ScalarUDF::new_from_impl(JsonbDeepMerge::new()));
1308    ctx.register_udf(ScalarUDF::new_from_impl(JsonbStripNulls::new()));
1309    ctx.register_udf(ScalarUDF::new_from_impl(JsonbRenameKeys::new()));
1310    ctx.register_udf(ScalarUDF::new_from_impl(JsonbPick::new()));
1311    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExcept::new()));
1312    ctx.register_udf(ScalarUDF::new_from_impl(JsonbFlatten::new()));
1313    ctx.register_udf(ScalarUDF::new_from_impl(JsonbUnflatten::new()));
1314    ctx.register_udf(ScalarUDF::new_from_impl(JsonToColumns::new()));
1315    ctx.register_udf(ScalarUDF::new_from_impl(JsonInferSchema::new()));
1316}
1317
1318// ══════════════════════════════════════════════════════════════════
1319// Tests
1320// ══════════════════════════════════════════════════════════════════
1321
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::builder::{MapBuilder, StringBuilder as MapSB};
    use arrow_array::ArrayRef;
    use arrow_schema::Field;
    use datafusion_common::config::ConfigOptions;
    use serde_json::json;

    /// Encodes a JSON value with the crate's JSONB encoder.
    fn enc(v: &serde_json::Value) -> Vec<u8> {
        json_types::encode_jsonb(v)
    }

    /// Builds a non-null `LargeBinaryArray` with one JSONB row per value.
    fn make_jsonb_array(vals: &[serde_json::Value]) -> LargeBinaryArray {
        let encoded: Vec<Vec<u8>> = vals.iter().map(enc).collect();
        let refs: Vec<&[u8]> = encoded.iter().map(Vec::as_slice).collect();
        LargeBinaryArray::from_iter_values(refs)
    }

    /// Wraps two array arguments for a UDF returning `LargeBinary`.
    ///
    /// `number_rows` is taken from the first argument so that it agrees with
    /// the batch actually passed in.
    fn make_args_2(a: ArrayRef, b: ArrayRef) -> ScalarFunctionArgs {
        let number_rows = a.len();
        ScalarFunctionArgs {
            args: vec![ColumnarValue::Array(a), ColumnarValue::Array(b)],
            arg_fields: vec![],
            number_rows,
            return_field: Arc::new(Field::new("output", DataType::LargeBinary, true)),
            config_options: Arc::new(ConfigOptions::default()),
        }
    }

    /// Wraps a single array argument for a UDF returning `LargeBinary`.
    fn make_args_1(a: ArrayRef) -> ScalarFunctionArgs {
        let number_rows = a.len();
        ScalarFunctionArgs {
            args: vec![ColumnarValue::Array(a)],
            arg_fields: vec![],
            number_rows,
            return_field: Arc::new(Field::new("output", DataType::LargeBinary, true)),
            config_options: Arc::new(ConfigOptions::default()),
        }
    }

    /// Decodes row `row` of a JSONB UDF result, failing the test on NULL.
    fn decode_jsonb_result(result: ColumnarValue, row: usize) -> serde_json::Value {
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let bin = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
        assert!(!bin.is_null(row), "unexpected null at row {row}");
        json_types::jsonb_to_value(bin.value(row)).expect("invalid jsonb")
    }

    /// Reads row `row` of a `Utf8` UDF result, failing the test on NULL
    /// (a NULL slot would otherwise read back as an empty string).
    fn decode_text_result(result: ColumnarValue, row: usize) -> String {
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
        assert!(!str_arr.is_null(row), "unexpected null at row {row}");
        str_arr.value(row).to_owned()
    }

    // ── jsonb_merge tests ─────────────────────────────────────

    #[test]
    fn test_json_ext_merge_objects() {
        let udf = JsonbMerge::new();
        let left = make_jsonb_array(&[json!({"a": 1, "b": 2})]);
        let right = make_jsonb_array(&[json!({"b": 99, "c": 3})]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(left), Arc::new(right)))
            .unwrap();
        assert_eq!(
            decode_jsonb_result(result, 0),
            json!({"a": 1, "b": 99, "c": 3})
        );
    }

    #[test]
    fn test_json_ext_merge_non_object() {
        let udf = JsonbMerge::new();
        let left = make_jsonb_array(&[json!(42)]);
        let right = make_jsonb_array(&[json!("hello")]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(left), Arc::new(right)))
            .unwrap();
        assert_eq!(decode_jsonb_result(result, 0), json!("hello"));
    }

    // ── jsonb_deep_merge tests ────────────────────────────────

    #[test]
    fn test_json_ext_deep_merge() {
        let udf = JsonbDeepMerge::new();
        let left = make_jsonb_array(&[json!({"a": {"x": 1, "y": 2}, "b": 10})]);
        let right = make_jsonb_array(&[json!({"a": {"y": 99, "z": 3}, "c": 20})]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(left), Arc::new(right)))
            .unwrap();
        assert_eq!(
            decode_jsonb_result(result, 0),
            json!({"a": {"x": 1, "y": 99, "z": 3}, "b": 10, "c": 20})
        );
    }

    #[test]
    fn test_json_ext_deep_merge_non_object_override() {
        let udf = JsonbDeepMerge::new();
        let left = make_jsonb_array(&[json!({"a": {"x": 1}})]);
        let right = make_jsonb_array(&[json!({"a": 42})]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(left), Arc::new(right)))
            .unwrap();
        assert_eq!(decode_jsonb_result(result, 0), json!({"a": 42}));
    }

    // ── jsonb_strip_nulls tests ───────────────────────────────

    #[test]
    fn test_json_ext_strip_nulls() {
        let udf = JsonbStripNulls::new();
        let input = make_jsonb_array(&[json!({"a": 1, "b": null, "c": {"d": null, "e": 2}})]);
        let result = udf.invoke_with_args(make_args_1(Arc::new(input))).unwrap();
        assert_eq!(
            decode_jsonb_result(result, 0),
            json!({"a": 1, "c": {"e": 2}})
        );
    }

    #[test]
    fn test_json_ext_strip_nulls_array_preserved() {
        let udf = JsonbStripNulls::new();
        let input = make_jsonb_array(&[json!({"arr": [1, null, 3]})]);
        let result = udf.invoke_with_args(make_args_1(Arc::new(input))).unwrap();
        assert_eq!(decode_jsonb_result(result, 0), json!({"arr": [1, null, 3]}));
    }

    // ── jsonb_rename_keys tests ───────────────────────────────

    #[test]
    fn test_json_ext_rename_keys() {
        let udf = JsonbRenameKeys::new();
        let input = make_jsonb_array(&[json!({"old_name": 1, "keep": 2})]);

        // Build a MapArray with one row: {"old_name": "new_name"}
        let key_builder = MapSB::new();
        let val_builder = MapSB::new();
        let mut map_builder = MapBuilder::new(None, key_builder, val_builder);
        map_builder.keys().append_value("old_name");
        map_builder.values().append_value("new_name");
        map_builder.append(true).unwrap();
        let map_arr = map_builder.finish();

        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(map_arr)))
            .unwrap();
        assert_eq!(
            decode_jsonb_result(result, 0),
            json!({"keep": 2, "new_name": 1})
        );
    }

    // ── jsonb_pick tests ──────────────────────────────────────

    #[test]
    fn test_json_ext_pick() {
        let udf = JsonbPick::new();
        let input = make_jsonb_array(&[json!({"a": 1, "b": 2, "c": 3})]);
        let keys = make_string_list(&[&["a", "c"]]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(keys)))
            .unwrap();
        assert_eq!(decode_jsonb_result(result, 0), json!({"a": 1, "c": 3}));
    }

    // ── jsonb_except tests ────────────────────────────────────

    #[test]
    fn test_json_ext_except() {
        let udf = JsonbExcept::new();
        let input = make_jsonb_array(&[json!({"a": 1, "b": 2, "c": 3})]);
        let keys = make_string_list(&[&["b"]]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(keys)))
            .unwrap();
        assert_eq!(decode_jsonb_result(result, 0), json!({"a": 1, "c": 3}));
    }

    // ── jsonb_flatten tests ───────────────────────────────────

    #[test]
    fn test_json_ext_flatten() {
        let udf = JsonbFlatten::new();
        let input = make_jsonb_array(&[json!({"a": {"b": 1, "c": [2, 3]}})]);
        let sep = StringArray::from(vec!["."]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(sep)))
            .unwrap();
        assert_eq!(
            decode_jsonb_result(result, 0),
            json!({"a.b": 1, "a.c.0": 2, "a.c.1": 3})
        );
    }

    #[test]
    fn test_json_ext_flatten_custom_sep() {
        let udf = JsonbFlatten::new();
        let input = make_jsonb_array(&[json!({"x": {"y": 42}})]);
        let sep = StringArray::from(vec!["/"]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(sep)))
            .unwrap();
        assert_eq!(decode_jsonb_result(result, 0), json!({"x/y": 42}));
    }

    // ── jsonb_unflatten tests ─────────────────────────────────

    #[test]
    fn test_json_ext_unflatten() {
        let udf = JsonbUnflatten::new();
        let input = make_jsonb_array(&[json!({"a.b": 1, "a.c": 2, "d": 3})]);
        let sep = StringArray::from(vec!["."]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(sep)))
            .unwrap();
        assert_eq!(
            decode_jsonb_result(result, 0),
            json!({"a": {"b": 1, "c": 2}, "d": 3})
        );
    }

    // ── json_to_columns tests ─────────────────────────────────

    #[test]
    fn test_json_ext_to_columns() {
        let udf = JsonToColumns::new();
        let input = make_jsonb_array(&[json!({"name": "Alice", "age": 30, "active": true})]);
        let spec = StringArray::from(vec!["name VARCHAR, age BIGINT"]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(spec)))
            .unwrap();
        assert_eq!(
            decode_jsonb_result(result, 0),
            json!({"age": 30, "name": "Alice"})
        );
    }

    // ── json_infer_schema tests ───────────────────────────────

    #[test]
    fn test_json_ext_infer_schema_object() {
        let udf = JsonInferSchema::new();
        let input = make_jsonb_array(&[json!({"name": "Alice", "age": 30, "active": true})]);
        let args = ScalarFunctionArgs {
            args: vec![ColumnarValue::Array(Arc::new(input))],
            arg_fields: vec![],
            number_rows: 1,
            return_field: Arc::new(Field::new("output", DataType::Utf8, true)),
            config_options: Arc::new(ConfigOptions::default()),
        };
        let result = udf.invoke_with_args(args).unwrap();
        let text = decode_text_result(result, 0);
        let parsed: serde_json::Value = serde_json::from_str(&text).unwrap();
        assert_eq!(parsed["name"], "VARCHAR");
        assert_eq!(parsed["age"], "BIGINT");
        assert_eq!(parsed["active"], "BOOLEAN");
    }

    #[test]
    fn test_json_ext_infer_schema_nested() {
        let udf = JsonInferSchema::new();
        let input = make_jsonb_array(&[json!({"tags": [1, 2], "meta": {"x": 1.5}})]);
        let args = ScalarFunctionArgs {
            args: vec![ColumnarValue::Array(Arc::new(input))],
            arg_fields: vec![],
            number_rows: 1,
            return_field: Arc::new(Field::new("output", DataType::Utf8, true)),
            config_options: Arc::new(ConfigOptions::default()),
        };
        let result = udf.invoke_with_args(args).unwrap();
        let text = decode_text_result(result, 0);
        let parsed: serde_json::Value = serde_json::from_str(&text).unwrap();
        assert_eq!(parsed["tags"], "ARRAY<BIGINT>");
        assert_eq!(parsed["meta"], "STRUCT(x DOUBLE)");
    }

    #[test]
    fn test_json_ext_infer_schema_scalar() {
        let udf = JsonInferSchema::new();
        let input = make_jsonb_array(&[json!(42)]);
        let args = ScalarFunctionArgs {
            args: vec![ColumnarValue::Array(Arc::new(input))],
            arg_fields: vec![],
            number_rows: 1,
            return_field: Arc::new(Field::new("output", DataType::Utf8, true)),
            config_options: Arc::new(ConfigOptions::default()),
        };
        let result = udf.invoke_with_args(args).unwrap();
        assert_eq!(decode_text_result(result, 0), "BIGINT");
    }

    // ── Registration test ─────────────────────────────────────

    #[test]
    fn test_json_ext_all_udfs_register() {
        use datafusion_expr::registry::FunctionRegistry;
        use datafusion_expr::ScalarUDF;

        let names: Vec<String> = vec![
            ScalarUDF::new_from_impl(JsonbMerge::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonbDeepMerge::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonbStripNulls::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonbRenameKeys::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonbPick::new()).name().to_owned(),
            ScalarUDF::new_from_impl(JsonbExcept::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonbFlatten::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonbUnflatten::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonToColumns::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonInferSchema::new())
                .name()
                .to_owned(),
        ];
        for name in &names {
            assert!(!name.is_empty(), "UDF has empty name");
        }
        // Duplicate names would make one registration silently shadow another.
        let unique: HashSet<&str> = names.iter().map(String::as_str).collect();
        assert_eq!(unique.len(), names.len(), "duplicate UDF names: {names:?}");

        // Actually register them and confirm each one is resolvable by name.
        let ctx = datafusion::prelude::SessionContext::new();
        register_json_extensions(&ctx);
        for name in &names {
            assert!(ctx.udf(name).is_ok(), "{name} was not registered");
        }
    }

    // ── Null-handling tests ───────────────────────────────────

    #[test]
    fn test_json_ext_merge_null_input() {
        let udf = JsonbMerge::new();
        let left = LargeBinaryArray::new_null(1);
        let right = make_jsonb_array(&[json!({"a": 1})]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(left), Arc::new(right)))
            .unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let bin = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
        assert!(bin.is_null(0));
    }

    // ── Helper: build ListArray<Utf8> ─────────────────────────

    /// Builds a `ListArray` of `Utf8` with one non-null list per row.
    fn make_string_list(rows: &[&[&str]]) -> ListArray {
        use arrow_array::builder::{ListBuilder, StringBuilder};

        let mut builder = ListBuilder::new(StringBuilder::new());
        for row in rows {
            for &s in *row {
                builder.values().append_value(s);
            }
            builder.append(true);
        }
        builder.finish()
    }
}