Skip to main content

laminar_sql/datafusion/
json_extensions.rs

1//! LaminarDB JSON extension UDFs (F-SCHEMA-013).
2//!
3//! Streaming-specific JSON transformation functions that extend beyond
4//! PostgreSQL standard:
5//!
6//! - **Merge**: `jsonb_merge`, `jsonb_deep_merge`
7//! - **Cleanup**: `jsonb_strip_nulls`
8//! - **Key ops**: `jsonb_rename_keys`, `jsonb_pick`, `jsonb_except`
9//! - **Flatten**: `jsonb_flatten`, `jsonb_unflatten`
10//! - **Schema**: `json_to_columns`, `json_infer_schema`
11
12use std::any::Any;
13#[allow(clippy::disallowed_types)] // cold path: DataFusion integration
14use std::collections::HashSet;
15use std::hash::{Hash, Hasher};
16use std::sync::Arc;
17
18use arrow::datatypes::DataType;
19use arrow_array::{
20    builder::{LargeBinaryBuilder, StringBuilder},
21    Array, LargeBinaryArray, ListArray, MapArray, StringArray,
22};
23use datafusion_common::Result;
24use datafusion_expr::{
25    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
26};
27
28use super::json_types;
29use super::json_udf::expand_args;
30
/// Deepest nesting level that the recursive helpers in this module
/// (`deep_merge`, `flatten_value`) will descend to before returning an error.
const MAX_DEPTH: usize = 64;
33
34// ══════════════════════════════════════════════════════════════════
35// jsonb_merge(jsonb, jsonb) -> jsonb — shallow merge
36// ══════════════════════════════════════════════════════════════════
37
38/// `jsonb_merge(jsonb, jsonb) -> jsonb`
39///
40/// Shallow-merges two JSONB objects. Keys from the second argument
41/// overwrite keys in the first. Non-object inputs: returns second arg.
42#[derive(Debug)]
43pub struct JsonbMerge {
44    signature: Signature,
45}
46
47impl JsonbMerge {
48    /// Creates a new `jsonb_merge` UDF.
49    #[must_use]
50    pub fn new() -> Self {
51        Self {
52            signature: Signature::new(
53                TypeSignature::Exact(vec![DataType::LargeBinary, DataType::LargeBinary]),
54                Volatility::Immutable,
55            ),
56        }
57    }
58}
59
60impl Default for JsonbMerge {
61    fn default() -> Self {
62        Self::new()
63    }
64}
65
66impl PartialEq for JsonbMerge {
67    fn eq(&self, _other: &Self) -> bool {
68        true
69    }
70}
71
72impl Eq for JsonbMerge {}
73
74impl Hash for JsonbMerge {
75    fn hash<H: Hasher>(&self, state: &mut H) {
76        "jsonb_merge".hash(state);
77    }
78}
79
80impl ScalarUDFImpl for JsonbMerge {
81    fn as_any(&self) -> &dyn Any {
82        self
83    }
84
85    fn name(&self) -> &'static str {
86        "jsonb_merge"
87    }
88
89    fn signature(&self) -> &Signature {
90        &self.signature
91    }
92
93    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
94        Ok(DataType::LargeBinary)
95    }
96
97    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
98        let expanded = expand_args(&args.args)?;
99        let left_arr = expanded[0]
100            .as_any()
101            .downcast_ref::<LargeBinaryArray>()
102            .ok_or_else(|| {
103                datafusion_common::DataFusionError::Internal(
104                    "jsonb_merge: first arg must be LargeBinary".into(),
105                )
106            })?;
107        let right_arr = expanded[1]
108            .as_any()
109            .downcast_ref::<LargeBinaryArray>()
110            .ok_or_else(|| {
111                datafusion_common::DataFusionError::Internal(
112                    "jsonb_merge: second arg must be LargeBinary".into(),
113                )
114            })?;
115
116        let mut builder = LargeBinaryBuilder::with_capacity(left_arr.len(), 256);
117        for i in 0..left_arr.len() {
118            if left_arr.is_null(i) || right_arr.is_null(i) {
119                builder.append_null();
120            } else {
121                let left_val = json_types::jsonb_to_value(left_arr.value(i));
122                let right_val = json_types::jsonb_to_value(right_arr.value(i));
123                match (left_val, right_val) {
124                    (
125                        Some(serde_json::Value::Object(mut l)),
126                        Some(serde_json::Value::Object(r)),
127                    ) => {
128                        for (k, v) in r {
129                            l.insert(k, v);
130                        }
131                        builder
132                            .append_value(json_types::encode_jsonb(&serde_json::Value::Object(l)));
133                    }
134                    (_, Some(r)) => {
135                        builder.append_value(json_types::encode_jsonb(&r));
136                    }
137                    _ => builder.append_null(),
138                }
139            }
140        }
141        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
142    }
143}
144
145// ══════════════════════════════════════════════════════════════════
146// jsonb_deep_merge(jsonb, jsonb) -> jsonb — recursive merge
147// ══════════════════════════════════════════════════════════════════
148
149/// `jsonb_deep_merge(jsonb, jsonb) -> jsonb`
150///
151/// Recursively merges two JSONB objects. When both sides have an object
152/// at the same key, the merge recurses. Otherwise second wins.
153#[derive(Debug)]
154pub struct JsonbDeepMerge {
155    signature: Signature,
156}
157
158impl JsonbDeepMerge {
159    /// Creates a new `jsonb_deep_merge` UDF.
160    #[must_use]
161    pub fn new() -> Self {
162        Self {
163            signature: Signature::new(
164                TypeSignature::Exact(vec![DataType::LargeBinary, DataType::LargeBinary]),
165                Volatility::Immutable,
166            ),
167        }
168    }
169}
170
171impl Default for JsonbDeepMerge {
172    fn default() -> Self {
173        Self::new()
174    }
175}
176
177impl PartialEq for JsonbDeepMerge {
178    fn eq(&self, _other: &Self) -> bool {
179        true
180    }
181}
182
183impl Eq for JsonbDeepMerge {}
184
185impl Hash for JsonbDeepMerge {
186    fn hash<H: Hasher>(&self, state: &mut H) {
187        "jsonb_deep_merge".hash(state);
188    }
189}
190
191fn deep_merge(
192    left: serde_json::Value,
193    right: serde_json::Value,
194    depth: usize,
195) -> std::result::Result<serde_json::Value, String> {
196    if depth > MAX_DEPTH {
197        return Err("jsonb_deep_merge: max depth exceeded".into());
198    }
199    match (left, right) {
200        (serde_json::Value::Object(mut l), serde_json::Value::Object(r)) => {
201            for (k, rv) in r {
202                let merged = if let Some(lv) = l.remove(&k) {
203                    deep_merge(lv, rv, depth + 1)?
204                } else {
205                    rv
206                };
207                l.insert(k, merged);
208            }
209            Ok(serde_json::Value::Object(l))
210        }
211        (_, r) => Ok(r),
212    }
213}
214
215impl ScalarUDFImpl for JsonbDeepMerge {
216    fn as_any(&self) -> &dyn Any {
217        self
218    }
219
220    fn name(&self) -> &'static str {
221        "jsonb_deep_merge"
222    }
223
224    fn signature(&self) -> &Signature {
225        &self.signature
226    }
227
228    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
229        Ok(DataType::LargeBinary)
230    }
231
232    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
233        let expanded = expand_args(&args.args)?;
234        let left_arr = expanded[0]
235            .as_any()
236            .downcast_ref::<LargeBinaryArray>()
237            .ok_or_else(|| {
238                datafusion_common::DataFusionError::Internal(
239                    "jsonb_deep_merge: first arg must be LargeBinary".into(),
240                )
241            })?;
242        let right_arr = expanded[1]
243            .as_any()
244            .downcast_ref::<LargeBinaryArray>()
245            .ok_or_else(|| {
246                datafusion_common::DataFusionError::Internal(
247                    "jsonb_deep_merge: second arg must be LargeBinary".into(),
248                )
249            })?;
250
251        let mut builder = LargeBinaryBuilder::with_capacity(left_arr.len(), 256);
252        for i in 0..left_arr.len() {
253            if left_arr.is_null(i) || right_arr.is_null(i) {
254                builder.append_null();
255            } else {
256                let left_val = json_types::jsonb_to_value(left_arr.value(i));
257                let right_val = json_types::jsonb_to_value(right_arr.value(i));
258                match (left_val, right_val) {
259                    (Some(l), Some(r)) => {
260                        let merged = deep_merge(l, r, 0)
261                            .map_err(datafusion_common::DataFusionError::Execution)?;
262                        builder.append_value(json_types::encode_jsonb(&merged));
263                    }
264                    _ => builder.append_null(),
265                }
266            }
267        }
268        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
269    }
270}
271
272// ══════════════════════════════════════════════════════════════════
273// jsonb_strip_nulls(jsonb) -> jsonb
274// ══════════════════════════════════════════════════════════════════
275
276/// `jsonb_strip_nulls(jsonb) -> jsonb`
277///
278/// Recursively removes null-valued object fields. Array elements
279/// are recursed into but null elements are preserved (PostgreSQL semantics).
280#[derive(Debug)]
281pub struct JsonbStripNulls {
282    signature: Signature,
283}
284
285impl JsonbStripNulls {
286    /// Creates a new `jsonb_strip_nulls` UDF.
287    #[must_use]
288    pub fn new() -> Self {
289        Self {
290            signature: Signature::new(
291                TypeSignature::Exact(vec![DataType::LargeBinary]),
292                Volatility::Immutable,
293            ),
294        }
295    }
296}
297
298impl Default for JsonbStripNulls {
299    fn default() -> Self {
300        Self::new()
301    }
302}
303
304impl PartialEq for JsonbStripNulls {
305    fn eq(&self, _other: &Self) -> bool {
306        true
307    }
308}
309
310impl Eq for JsonbStripNulls {}
311
312impl Hash for JsonbStripNulls {
313    fn hash<H: Hasher>(&self, state: &mut H) {
314        "jsonb_strip_nulls".hash(state);
315    }
316}
317
318fn strip_nulls(val: serde_json::Value) -> serde_json::Value {
319    match val {
320        serde_json::Value::Object(obj) => {
321            let filtered: serde_json::Map<String, serde_json::Value> = obj
322                .into_iter()
323                .filter(|(_, v)| !v.is_null())
324                .map(|(k, v)| (k, strip_nulls(v)))
325                .collect();
326            serde_json::Value::Object(filtered)
327        }
328        serde_json::Value::Array(arr) => {
329            serde_json::Value::Array(arr.into_iter().map(strip_nulls).collect())
330        }
331        other => other,
332    }
333}
334
335impl ScalarUDFImpl for JsonbStripNulls {
336    fn as_any(&self) -> &dyn Any {
337        self
338    }
339
340    fn name(&self) -> &'static str {
341        "jsonb_strip_nulls"
342    }
343
344    fn signature(&self) -> &Signature {
345        &self.signature
346    }
347
348    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
349        Ok(DataType::LargeBinary)
350    }
351
352    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
353        let expanded = expand_args(&args.args)?;
354        let jsonb_arr = expanded[0]
355            .as_any()
356            .downcast_ref::<LargeBinaryArray>()
357            .ok_or_else(|| {
358                datafusion_common::DataFusionError::Internal(
359                    "jsonb_strip_nulls: arg must be LargeBinary".into(),
360                )
361            })?;
362
363        let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
364        for i in 0..jsonb_arr.len() {
365            if jsonb_arr.is_null(i) {
366                builder.append_null();
367            } else {
368                match json_types::jsonb_to_value(jsonb_arr.value(i)) {
369                    Some(val) => {
370                        builder.append_value(json_types::encode_jsonb(&strip_nulls(val)));
371                    }
372                    None => builder.append_null(),
373                }
374            }
375        }
376        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
377    }
378}
379
380// ══════════════════════════════════════════════════════════════════
381// jsonb_rename_keys(jsonb, map<text,text>) -> jsonb
382// ══════════════════════════════════════════════════════════════════
383
384/// `jsonb_rename_keys(jsonb, map<text,text>) -> jsonb`
385///
386/// Renames top-level object keys according to the given rename map.
387/// Keys not in the map are preserved unchanged.
388#[derive(Debug)]
389pub struct JsonbRenameKeys {
390    signature: Signature,
391}
392
393impl JsonbRenameKeys {
394    /// Creates a new `jsonb_rename_keys` UDF.
395    #[must_use]
396    pub fn new() -> Self {
397        Self {
398            signature: Signature::new(
399                TypeSignature::Exact(vec![
400                    DataType::LargeBinary,
401                    DataType::Map(
402                        Arc::new(arrow_schema::Field::new(
403                            "entries",
404                            DataType::Struct(
405                                vec![
406                                    arrow_schema::Field::new("key", DataType::Utf8, false),
407                                    arrow_schema::Field::new("value", DataType::Utf8, true),
408                                ]
409                                .into(),
410                            ),
411                            false,
412                        )),
413                        false,
414                    ),
415                ]),
416                Volatility::Immutable,
417            ),
418        }
419    }
420}
421
422impl Default for JsonbRenameKeys {
423    fn default() -> Self {
424        Self::new()
425    }
426}
427
428impl PartialEq for JsonbRenameKeys {
429    fn eq(&self, _other: &Self) -> bool {
430        true
431    }
432}
433
434impl Eq for JsonbRenameKeys {}
435
436impl Hash for JsonbRenameKeys {
437    fn hash<H: Hasher>(&self, state: &mut H) {
438        "jsonb_rename_keys".hash(state);
439    }
440}
441
442impl ScalarUDFImpl for JsonbRenameKeys {
443    fn as_any(&self) -> &dyn Any {
444        self
445    }
446
447    fn name(&self) -> &'static str {
448        "jsonb_rename_keys"
449    }
450
451    fn signature(&self) -> &Signature {
452        &self.signature
453    }
454
455    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
456        Ok(DataType::LargeBinary)
457    }
458
459    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
460        let expanded = expand_args(&args.args)?;
461        let jsonb_arr = expanded[0]
462            .as_any()
463            .downcast_ref::<LargeBinaryArray>()
464            .ok_or_else(|| {
465                datafusion_common::DataFusionError::Internal(
466                    "jsonb_rename_keys: first arg must be LargeBinary".into(),
467                )
468            })?;
469        let map_arr = expanded[1]
470            .as_any()
471            .downcast_ref::<MapArray>()
472            .ok_or_else(|| {
473                datafusion_common::DataFusionError::Internal(
474                    "jsonb_rename_keys: second arg must be Map<Utf8,Utf8>".into(),
475                )
476            })?;
477
478        let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
479        for i in 0..jsonb_arr.len() {
480            if jsonb_arr.is_null(i) || map_arr.is_null(i) {
481                builder.append_null();
482            } else {
483                let rename_map = extract_string_map(map_arr, i);
484                match json_types::jsonb_to_value(jsonb_arr.value(i)) {
485                    Some(serde_json::Value::Object(obj)) => {
486                        let renamed: serde_json::Map<String, serde_json::Value> = obj
487                            .into_iter()
488                            .map(|(k, v)| {
489                                let new_key = rename_map.get(k.as_str()).cloned().unwrap_or(k);
490                                (new_key, v)
491                            })
492                            .collect();
493                        builder.append_value(json_types::encode_jsonb(&serde_json::Value::Object(
494                            renamed,
495                        )));
496                    }
497                    Some(other) => {
498                        builder.append_value(json_types::encode_jsonb(&other));
499                    }
500                    None => builder.append_null(),
501                }
502            }
503        }
504        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
505    }
506}
507
508/// Extract a `HashMap<String, String>` from a `MapArray` at row `i`.
509#[allow(clippy::disallowed_types)] // cold path: DataFusion integration
510fn extract_string_map(map_arr: &MapArray, row: usize) -> std::collections::HashMap<String, String> {
511    let mut result = std::collections::HashMap::new();
512    let entries = map_arr.value(row);
513    let struct_arr = entries
514        .as_any()
515        .downcast_ref::<arrow_array::StructArray>()
516        .unwrap();
517    let keys = struct_arr
518        .column(0)
519        .as_any()
520        .downcast_ref::<StringArray>()
521        .unwrap();
522    let vals = struct_arr
523        .column(1)
524        .as_any()
525        .downcast_ref::<StringArray>()
526        .unwrap();
527    for j in 0..keys.len() {
528        if !keys.is_null(j) && !vals.is_null(j) {
529            result.insert(keys.value(j).to_owned(), vals.value(j).to_owned());
530        }
531    }
532    result
533}
534
535// ══════════════════════════════════════════════════════════════════
536// jsonb_pick(jsonb, text[]) -> jsonb
537// ══════════════════════════════════════════════════════════════════
538
539/// `jsonb_pick(jsonb, text[]) -> jsonb`
540///
541/// Returns a new JSONB object containing only the specified keys.
542#[derive(Debug)]
543pub struct JsonbPick {
544    signature: Signature,
545}
546
547impl JsonbPick {
548    /// Creates a new `jsonb_pick` UDF.
549    #[must_use]
550    pub fn new() -> Self {
551        Self {
552            signature: Signature::new(
553                TypeSignature::Exact(vec![
554                    DataType::LargeBinary,
555                    DataType::List(Arc::new(arrow_schema::Field::new(
556                        "item",
557                        DataType::Utf8,
558                        true,
559                    ))),
560                ]),
561                Volatility::Immutable,
562            ),
563        }
564    }
565}
566
567impl Default for JsonbPick {
568    fn default() -> Self {
569        Self::new()
570    }
571}
572
573impl PartialEq for JsonbPick {
574    fn eq(&self, _other: &Self) -> bool {
575        true
576    }
577}
578
579impl Eq for JsonbPick {}
580
581impl Hash for JsonbPick {
582    fn hash<H: Hasher>(&self, state: &mut H) {
583        "jsonb_pick".hash(state);
584    }
585}
586
587impl ScalarUDFImpl for JsonbPick {
588    fn as_any(&self) -> &dyn Any {
589        self
590    }
591
592    fn name(&self) -> &'static str {
593        "jsonb_pick"
594    }
595
596    fn signature(&self) -> &Signature {
597        &self.signature
598    }
599
600    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
601        Ok(DataType::LargeBinary)
602    }
603
604    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
605        let expanded = expand_args(&args.args)?;
606        let jsonb_arr = expanded[0]
607            .as_any()
608            .downcast_ref::<LargeBinaryArray>()
609            .ok_or_else(|| {
610                datafusion_common::DataFusionError::Internal(
611                    "jsonb_pick: first arg must be LargeBinary".into(),
612                )
613            })?;
614        let keys_arr = expanded[1]
615            .as_any()
616            .downcast_ref::<ListArray>()
617            .ok_or_else(|| {
618                datafusion_common::DataFusionError::Internal(
619                    "jsonb_pick: second arg must be List<Utf8>".into(),
620                )
621            })?;
622
623        let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
624        for i in 0..jsonb_arr.len() {
625            if jsonb_arr.is_null(i) || keys_arr.is_null(i) {
626                builder.append_null();
627            } else {
628                let key_set = extract_string_set(keys_arr, i);
629                match json_types::jsonb_to_value(jsonb_arr.value(i)) {
630                    Some(serde_json::Value::Object(obj)) => {
631                        let picked: serde_json::Map<String, serde_json::Value> = obj
632                            .into_iter()
633                            .filter(|(k, _)| key_set.contains(k.as_str()))
634                            .collect();
635                        builder.append_value(json_types::encode_jsonb(&serde_json::Value::Object(
636                            picked,
637                        )));
638                    }
639                    Some(other) => {
640                        builder.append_value(json_types::encode_jsonb(&other));
641                    }
642                    None => builder.append_null(),
643                }
644            }
645        }
646        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
647    }
648}
649
650/// Extract a `HashSet<String>` from a `ListArray<Utf8>` at row `i`.
651fn extract_string_set(list_arr: &ListArray, row: usize) -> HashSet<String> {
652    let values = list_arr.value(row);
653    let str_arr = values.as_any().downcast_ref::<StringArray>();
654    let mut result = HashSet::new();
655    if let Some(arr) = str_arr {
656        for j in 0..arr.len() {
657            if !arr.is_null(j) {
658                result.insert(arr.value(j).to_owned());
659            }
660        }
661    }
662    result
663}
664
665// ══════════════════════════════════════════════════════════════════
666// jsonb_except(jsonb, text[]) -> jsonb
667// ══════════════════════════════════════════════════════════════════
668
669/// `jsonb_except(jsonb, text[]) -> jsonb`
670///
671/// Returns a new JSONB object excluding the specified keys.
672#[derive(Debug)]
673pub struct JsonbExcept {
674    signature: Signature,
675}
676
677impl JsonbExcept {
678    /// Creates a new `jsonb_except` UDF.
679    #[must_use]
680    pub fn new() -> Self {
681        Self {
682            signature: Signature::new(
683                TypeSignature::Exact(vec![
684                    DataType::LargeBinary,
685                    DataType::List(Arc::new(arrow_schema::Field::new(
686                        "item",
687                        DataType::Utf8,
688                        true,
689                    ))),
690                ]),
691                Volatility::Immutable,
692            ),
693        }
694    }
695}
696
697impl Default for JsonbExcept {
698    fn default() -> Self {
699        Self::new()
700    }
701}
702
703impl PartialEq for JsonbExcept {
704    fn eq(&self, _other: &Self) -> bool {
705        true
706    }
707}
708
709impl Eq for JsonbExcept {}
710
711impl Hash for JsonbExcept {
712    fn hash<H: Hasher>(&self, state: &mut H) {
713        "jsonb_except".hash(state);
714    }
715}
716
717impl ScalarUDFImpl for JsonbExcept {
718    fn as_any(&self) -> &dyn Any {
719        self
720    }
721
722    fn name(&self) -> &'static str {
723        "jsonb_except"
724    }
725
726    fn signature(&self) -> &Signature {
727        &self.signature
728    }
729
730    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
731        Ok(DataType::LargeBinary)
732    }
733
734    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
735        let expanded = expand_args(&args.args)?;
736        let jsonb_arr = expanded[0]
737            .as_any()
738            .downcast_ref::<LargeBinaryArray>()
739            .ok_or_else(|| {
740                datafusion_common::DataFusionError::Internal(
741                    "jsonb_except: first arg must be LargeBinary".into(),
742                )
743            })?;
744        let keys_arr = expanded[1]
745            .as_any()
746            .downcast_ref::<ListArray>()
747            .ok_or_else(|| {
748                datafusion_common::DataFusionError::Internal(
749                    "jsonb_except: second arg must be List<Utf8>".into(),
750                )
751            })?;
752
753        let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
754        for i in 0..jsonb_arr.len() {
755            if jsonb_arr.is_null(i) || keys_arr.is_null(i) {
756                builder.append_null();
757            } else {
758                let exclude_set = extract_string_set(keys_arr, i);
759                match json_types::jsonb_to_value(jsonb_arr.value(i)) {
760                    Some(serde_json::Value::Object(obj)) => {
761                        let filtered: serde_json::Map<String, serde_json::Value> = obj
762                            .into_iter()
763                            .filter(|(k, _)| !exclude_set.contains(k.as_str()))
764                            .collect();
765                        builder.append_value(json_types::encode_jsonb(&serde_json::Value::Object(
766                            filtered,
767                        )));
768                    }
769                    Some(other) => {
770                        builder.append_value(json_types::encode_jsonb(&other));
771                    }
772                    None => builder.append_null(),
773                }
774            }
775        }
776        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
777    }
778}
779
780// ══════════════════════════════════════════════════════════════════
781// jsonb_flatten(jsonb, text) -> jsonb
782// ══════════════════════════════════════════════════════════════════
783
784/// `jsonb_flatten(jsonb, separator) -> jsonb`
785///
786/// Flattens a nested JSONB object into a single-level object with
787/// dot-path keys. Arrays are indexed numerically (e.g. `tags.0`).
788#[derive(Debug)]
789pub struct JsonbFlatten {
790    signature: Signature,
791}
792
793impl JsonbFlatten {
794    /// Creates a new `jsonb_flatten` UDF.
795    #[must_use]
796    pub fn new() -> Self {
797        Self {
798            signature: Signature::new(
799                TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Utf8]),
800                Volatility::Immutable,
801            ),
802        }
803    }
804}
805
806impl Default for JsonbFlatten {
807    fn default() -> Self {
808        Self::new()
809    }
810}
811
812impl PartialEq for JsonbFlatten {
813    fn eq(&self, _other: &Self) -> bool {
814        true
815    }
816}
817
818impl Eq for JsonbFlatten {}
819
820impl Hash for JsonbFlatten {
821    fn hash<H: Hasher>(&self, state: &mut H) {
822        "jsonb_flatten".hash(state);
823    }
824}
825
826fn flatten_value(
827    val: &serde_json::Value,
828    prefix: &str,
829    sep: &str,
830    out: &mut serde_json::Map<String, serde_json::Value>,
831    depth: usize,
832) -> std::result::Result<(), String> {
833    if depth > MAX_DEPTH {
834        return Err("jsonb_flatten: max depth exceeded".into());
835    }
836    match val {
837        serde_json::Value::Object(obj) => {
838            for (k, v) in obj {
839                let new_key = if prefix.is_empty() {
840                    k.clone()
841                } else {
842                    format!("{prefix}{sep}{k}")
843                };
844                flatten_value(v, &new_key, sep, out, depth + 1)?;
845            }
846        }
847        serde_json::Value::Array(arr) => {
848            for (idx, v) in arr.iter().enumerate() {
849                let new_key = if prefix.is_empty() {
850                    idx.to_string()
851                } else {
852                    format!("{prefix}{sep}{idx}")
853                };
854                flatten_value(v, &new_key, sep, out, depth + 1)?;
855            }
856        }
857        _ => {
858            out.insert(prefix.to_owned(), val.clone());
859        }
860    }
861    Ok(())
862}
863
864impl ScalarUDFImpl for JsonbFlatten {
865    fn as_any(&self) -> &dyn Any {
866        self
867    }
868
869    fn name(&self) -> &'static str {
870        "jsonb_flatten"
871    }
872
873    fn signature(&self) -> &Signature {
874        &self.signature
875    }
876
877    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
878        Ok(DataType::LargeBinary)
879    }
880
881    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
882        let expanded = expand_args(&args.args)?;
883        let jsonb_arr = expanded[0]
884            .as_any()
885            .downcast_ref::<LargeBinaryArray>()
886            .ok_or_else(|| {
887                datafusion_common::DataFusionError::Internal(
888                    "jsonb_flatten: first arg must be LargeBinary".into(),
889                )
890            })?;
891        let sep_arr = expanded[1]
892            .as_any()
893            .downcast_ref::<StringArray>()
894            .ok_or_else(|| {
895                datafusion_common::DataFusionError::Internal(
896                    "jsonb_flatten: second arg must be Utf8".into(),
897                )
898            })?;
899
900        let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
901        for i in 0..jsonb_arr.len() {
902            if jsonb_arr.is_null(i) || sep_arr.is_null(i) {
903                builder.append_null();
904            } else {
905                let sep = sep_arr.value(i);
906                match json_types::jsonb_to_value(jsonb_arr.value(i)) {
907                    Some(val) => {
908                        let mut flat = serde_json::Map::new();
909                        flatten_value(&val, "", sep, &mut flat, 0)
910                            .map_err(datafusion_common::DataFusionError::Execution)?;
911                        builder.append_value(json_types::encode_jsonb(&serde_json::Value::Object(
912                            flat,
913                        )));
914                    }
915                    None => builder.append_null(),
916                }
917            }
918        }
919        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
920    }
921}
922
923// ══════════════════════════════════════════════════════════════════
924// jsonb_unflatten(jsonb, text) -> jsonb
925// ══════════════════════════════════════════════════════════════════
926
927/// `jsonb_unflatten(jsonb, separator) -> jsonb`
928///
929/// Rebuilds a nested JSONB object from a flat key-value structure.
930/// Keys are split by separator and nested accordingly. Numeric keys
931/// stay as object keys (not converted to arrays).
932#[derive(Debug)]
933pub struct JsonbUnflatten {
934    signature: Signature,
935}
936
937impl JsonbUnflatten {
938    /// Creates a new `jsonb_unflatten` UDF.
939    #[must_use]
940    pub fn new() -> Self {
941        Self {
942            signature: Signature::new(
943                TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Utf8]),
944                Volatility::Immutable,
945            ),
946        }
947    }
948}
949
950impl Default for JsonbUnflatten {
951    fn default() -> Self {
952        Self::new()
953    }
954}
955
956impl PartialEq for JsonbUnflatten {
957    fn eq(&self, _other: &Self) -> bool {
958        true
959    }
960}
961
962impl Eq for JsonbUnflatten {}
963
964impl Hash for JsonbUnflatten {
965    fn hash<H: Hasher>(&self, state: &mut H) {
966        "jsonb_unflatten".hash(state);
967    }
968}
969
970fn unflatten_insert(root: &mut serde_json::Value, parts: &[&str], value: serde_json::Value) {
971    if parts.is_empty() {
972        return;
973    }
974    if parts.len() == 1 {
975        if let serde_json::Value::Object(obj) = root {
976            obj.insert(parts[0].to_owned(), value);
977        }
978        return;
979    }
980    if let serde_json::Value::Object(obj) = root {
981        let child = obj
982            .entry(parts[0].to_owned())
983            .or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()));
984        unflatten_insert(child, &parts[1..], value);
985    }
986}
987
988impl ScalarUDFImpl for JsonbUnflatten {
989    fn as_any(&self) -> &dyn Any {
990        self
991    }
992
993    fn name(&self) -> &'static str {
994        "jsonb_unflatten"
995    }
996
997    fn signature(&self) -> &Signature {
998        &self.signature
999    }
1000
1001    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1002        Ok(DataType::LargeBinary)
1003    }
1004
1005    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1006        let expanded = expand_args(&args.args)?;
1007        let jsonb_arr = expanded[0]
1008            .as_any()
1009            .downcast_ref::<LargeBinaryArray>()
1010            .ok_or_else(|| {
1011                datafusion_common::DataFusionError::Internal(
1012                    "jsonb_unflatten: first arg must be LargeBinary".into(),
1013                )
1014            })?;
1015        let sep_arr = expanded[1]
1016            .as_any()
1017            .downcast_ref::<StringArray>()
1018            .ok_or_else(|| {
1019                datafusion_common::DataFusionError::Internal(
1020                    "jsonb_unflatten: second arg must be Utf8".into(),
1021                )
1022            })?;
1023
1024        let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
1025        for i in 0..jsonb_arr.len() {
1026            if jsonb_arr.is_null(i) || sep_arr.is_null(i) {
1027                builder.append_null();
1028            } else {
1029                let sep = sep_arr.value(i);
1030                match json_types::jsonb_to_value(jsonb_arr.value(i)) {
1031                    Some(serde_json::Value::Object(flat)) => {
1032                        let mut root = serde_json::Value::Object(serde_json::Map::new());
1033                        for (key, val) in flat {
1034                            let parts: Vec<&str> = key.split(sep).collect();
1035                            unflatten_insert(&mut root, &parts, val);
1036                        }
1037                        builder.append_value(json_types::encode_jsonb(&root));
1038                    }
1039                    Some(other) => {
1040                        builder.append_value(json_types::encode_jsonb(&other));
1041                    }
1042                    None => builder.append_null(),
1043                }
1044            }
1045        }
1046        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
1047    }
1048}
1049
1050// ══════════════════════════════════════════════════════════════════
1051// json_to_columns(jsonb, text) -> jsonb  (runtime fallback)
1052// ══════════════════════════════════════════════════════════════════
1053
1054/// `json_to_columns(jsonb, type_spec) -> jsonb`
1055///
1056/// Runtime fallback for structured extraction. Parses the type_spec
1057/// to determine field names, extracts each from the JSONB object,
1058/// and returns the result as a new JSONB object containing only
1059/// those fields. Full plan-time struct rewriting is deferred.
1060#[derive(Debug)]
1061pub struct JsonToColumns {
1062    signature: Signature,
1063}
1064
1065impl JsonToColumns {
1066    /// Creates a new `json_to_columns` UDF.
1067    #[must_use]
1068    pub fn new() -> Self {
1069        Self {
1070            signature: Signature::new(
1071                TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Utf8]),
1072                Volatility::Immutable,
1073            ),
1074        }
1075    }
1076}
1077
1078impl Default for JsonToColumns {
1079    fn default() -> Self {
1080        Self::new()
1081    }
1082}
1083
1084impl PartialEq for JsonToColumns {
1085    fn eq(&self, _other: &Self) -> bool {
1086        true
1087    }
1088}
1089
1090impl Eq for JsonToColumns {}
1091
1092impl Hash for JsonToColumns {
1093    fn hash<H: Hasher>(&self, state: &mut H) {
1094        "json_to_columns".hash(state);
1095    }
1096}
1097
/// Parses a type spec such as `"name VARCHAR, age BIGINT, active BOOLEAN"`
/// into its list of column names, in order.
///
/// - Column definitions are split only on commas outside `()`, `<>`, `[]`
///   and double quotes. Types like `DECIMAL(10, 2)` or
///   `STRUCT(a INT, b INT)` therefore stay one definition instead of
///   producing bogus names such as `2)`.
/// - The name is the first whitespace-separated token of each definition.
///   A double-quoted identifier (`"first name" VARCHAR`) has its quotes
///   removed, with `""` treated as an escaped quote.
/// - Empty definitions (e.g. from a trailing comma) are skipped.
fn parse_type_spec_fields(spec: &str) -> Vec<String> {
    /// Extracts the column name from one `name TYPE` definition.
    fn column_name(def: &str) -> Option<String> {
        let def = def.trim_start();
        if let Some(quoted) = def.strip_prefix('"') {
            let mut name = String::new();
            let mut chars = quoted.chars();
            while let Some(c) = chars.next() {
                if c != '"' {
                    name.push(c);
                } else if chars.as_str().starts_with('"') {
                    // `""` inside a quoted identifier is an escaped quote.
                    chars.next();
                    name.push('"');
                } else {
                    return Some(name);
                }
            }
            // Unterminated quote: fall back to the plain first-token rule.
        }
        def.split_whitespace().next().map(ToOwned::to_owned)
    }

    let mut fields = Vec::new();
    let mut depth = 0usize;
    let mut in_quotes = false;
    let mut start = 0;
    for (idx, ch) in spec.char_indices() {
        match ch {
            '"' => in_quotes = !in_quotes,
            '(' | '<' | '[' if !in_quotes => depth += 1,
            ')' | '>' | ']' if !in_quotes => depth = depth.saturating_sub(1),
            ',' if !in_quotes && depth == 0 => {
                fields.extend(column_name(&spec[start..idx]));
                // ',' is a single byte, so idx + 1 is a char boundary.
                start = idx + 1;
            }
            _ => {}
        }
    }
    fields.extend(column_name(&spec[start..]));
    fields
}
1108
1109impl ScalarUDFImpl for JsonToColumns {
1110    fn as_any(&self) -> &dyn Any {
1111        self
1112    }
1113
1114    fn name(&self) -> &'static str {
1115        "json_to_columns"
1116    }
1117
1118    fn signature(&self) -> &Signature {
1119        &self.signature
1120    }
1121
1122    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1123        Ok(DataType::LargeBinary)
1124    }
1125
1126    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1127        let expanded = expand_args(&args.args)?;
1128        let jsonb_arr = expanded[0]
1129            .as_any()
1130            .downcast_ref::<LargeBinaryArray>()
1131            .ok_or_else(|| {
1132                datafusion_common::DataFusionError::Internal(
1133                    "json_to_columns: first arg must be LargeBinary".into(),
1134                )
1135            })?;
1136        let spec_arr = expanded[1]
1137            .as_any()
1138            .downcast_ref::<StringArray>()
1139            .ok_or_else(|| {
1140                datafusion_common::DataFusionError::Internal(
1141                    "json_to_columns: second arg must be Utf8".into(),
1142                )
1143            })?;
1144
1145        let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
1146        for i in 0..jsonb_arr.len() {
1147            if jsonb_arr.is_null(i) || spec_arr.is_null(i) {
1148                builder.append_null();
1149            } else {
1150                let fields = parse_type_spec_fields(spec_arr.value(i));
1151                let field_set: HashSet<&str> = fields.iter().map(String::as_str).collect();
1152                match json_types::jsonb_to_value(jsonb_arr.value(i)) {
1153                    Some(serde_json::Value::Object(obj)) => {
1154                        let picked: serde_json::Map<String, serde_json::Value> = obj
1155                            .into_iter()
1156                            .filter(|(k, _)| field_set.contains(k.as_str()))
1157                            .collect();
1158                        builder.append_value(json_types::encode_jsonb(&serde_json::Value::Object(
1159                            picked,
1160                        )));
1161                    }
1162                    Some(other) => {
1163                        builder.append_value(json_types::encode_jsonb(&other));
1164                    }
1165                    None => builder.append_null(),
1166                }
1167            }
1168        }
1169        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
1170    }
1171}
1172
1173// ══════════════════════════════════════════════════════════════════
1174// json_infer_schema(jsonb) -> text
1175// ══════════════════════════════════════════════════════════════════
1176
1177/// `json_infer_schema(jsonb) -> text`
1178///
1179/// Infers the SQL schema of a JSONB value, returning a JSON object
1180/// mapping field names to SQL type names.
1181#[derive(Debug)]
1182pub struct JsonInferSchema {
1183    signature: Signature,
1184}
1185
1186impl JsonInferSchema {
1187    /// Creates a new `json_infer_schema` UDF.
1188    #[must_use]
1189    pub fn new() -> Self {
1190        Self {
1191            signature: Signature::new(
1192                TypeSignature::Exact(vec![DataType::LargeBinary]),
1193                Volatility::Immutable,
1194            ),
1195        }
1196    }
1197}
1198
1199impl Default for JsonInferSchema {
1200    fn default() -> Self {
1201        Self::new()
1202    }
1203}
1204
1205impl PartialEq for JsonInferSchema {
1206    fn eq(&self, _other: &Self) -> bool {
1207        true
1208    }
1209}
1210
1211impl Eq for JsonInferSchema {}
1212
1213impl Hash for JsonInferSchema {
1214    fn hash<H: Hasher>(&self, state: &mut H) {
1215        "json_infer_schema".hash(state);
1216    }
1217}
1218
1219fn infer_type(val: &serde_json::Value) -> String {
1220    match val {
1221        serde_json::Value::Null => "NULL".to_owned(),
1222        serde_json::Value::Bool(_) => "BOOLEAN".to_owned(),
1223        serde_json::Value::Number(n) => {
1224            if n.is_i64() || n.is_u64() {
1225                "BIGINT".to_owned()
1226            } else {
1227                "DOUBLE".to_owned()
1228            }
1229        }
1230        serde_json::Value::String(_) => "VARCHAR".to_owned(),
1231        serde_json::Value::Array(arr) => {
1232            let inner = arr.first().map_or("NULL".to_owned(), infer_type);
1233            format!("ARRAY<{inner}>")
1234        }
1235        serde_json::Value::Object(obj) => {
1236            let fields: Vec<String> = obj
1237                .iter()
1238                .map(|(k, v)| format!("{k} {}", infer_type(v)))
1239                .collect();
1240            format!("STRUCT({})", fields.join(", "))
1241        }
1242    }
1243}
1244
1245impl ScalarUDFImpl for JsonInferSchema {
1246    fn as_any(&self) -> &dyn Any {
1247        self
1248    }
1249
1250    fn name(&self) -> &'static str {
1251        "json_infer_schema"
1252    }
1253
1254    fn signature(&self) -> &Signature {
1255        &self.signature
1256    }
1257
1258    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1259        Ok(DataType::Utf8)
1260    }
1261
1262    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1263        let expanded = expand_args(&args.args)?;
1264        let jsonb_arr = expanded[0]
1265            .as_any()
1266            .downcast_ref::<LargeBinaryArray>()
1267            .ok_or_else(|| {
1268                datafusion_common::DataFusionError::Internal(
1269                    "json_infer_schema: arg must be LargeBinary".into(),
1270                )
1271            })?;
1272
1273        let mut builder = StringBuilder::with_capacity(jsonb_arr.len(), 256);
1274        for i in 0..jsonb_arr.len() {
1275            if jsonb_arr.is_null(i) {
1276                builder.append_null();
1277            } else {
1278                match json_types::jsonb_to_value(jsonb_arr.value(i)) {
1279                    Some(serde_json::Value::Object(obj)) => {
1280                        let schema: serde_json::Map<String, serde_json::Value> = obj
1281                            .iter()
1282                            .map(|(k, v)| (k.clone(), serde_json::Value::String(infer_type(v))))
1283                            .collect();
1284                        builder.append_value(
1285                            serde_json::to_string(&serde_json::Value::Object(schema))
1286                                .unwrap_or_default(),
1287                        );
1288                    }
1289                    Some(val) => {
1290                        builder.append_value(infer_type(&val));
1291                    }
1292                    None => builder.append_null(),
1293                }
1294            }
1295        }
1296        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
1297    }
1298}
1299
1300// ══════════════════════════════════════════════════════════════════
1301// Registration
1302// ══════════════════════════════════════════════════════════════════
1303
1304/// Registers all JSON extension UDFs with the given session context.
1305pub fn register_json_extensions(ctx: &datafusion::prelude::SessionContext) {
1306    use datafusion_expr::ScalarUDF;
1307
1308    ctx.register_udf(ScalarUDF::new_from_impl(JsonbMerge::new()));
1309    ctx.register_udf(ScalarUDF::new_from_impl(JsonbDeepMerge::new()));
1310    ctx.register_udf(ScalarUDF::new_from_impl(JsonbStripNulls::new()));
1311    ctx.register_udf(ScalarUDF::new_from_impl(JsonbRenameKeys::new()));
1312    ctx.register_udf(ScalarUDF::new_from_impl(JsonbPick::new()));
1313    ctx.register_udf(ScalarUDF::new_from_impl(JsonbExcept::new()));
1314    ctx.register_udf(ScalarUDF::new_from_impl(JsonbFlatten::new()));
1315    ctx.register_udf(ScalarUDF::new_from_impl(JsonbUnflatten::new()));
1316    ctx.register_udf(ScalarUDF::new_from_impl(JsonToColumns::new()));
1317    ctx.register_udf(ScalarUDF::new_from_impl(JsonInferSchema::new()));
1318}
1319
1320// ══════════════════════════════════════════════════════════════════
1321// Tests
1322// ══════════════════════════════════════════════════════════════════
1323
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::builder::{MapBuilder, StringBuilder as MapSB};
    use arrow_array::ArrayRef;
    use arrow_schema::Field;
    use datafusion_common::config::ConfigOptions;
    use serde_json::json;

    // ── Helpers ───────────────────────────────────────────────

    /// Encodes a JSON value into the JSONB binary representation.
    fn enc(v: &serde_json::Value) -> Vec<u8> {
        json_types::encode_jsonb(v)
    }

    /// Builds a `LargeBinaryArray` holding one non-null JSONB row per value.
    fn make_jsonb_array(vals: &[serde_json::Value]) -> LargeBinaryArray {
        let encoded: Vec<Vec<u8>> = vals.iter().map(enc).collect();
        let refs: Vec<&[u8]> = encoded.iter().map(Vec::as_slice).collect();
        LargeBinaryArray::from_iter_values(refs)
    }

    /// Builds a `ListArray<Utf8>` with one list per entry of `rows`.
    fn make_string_list(rows: &[&[&str]]) -> ListArray {
        use arrow_array::builder::{ListBuilder, StringBuilder};

        let mut builder = ListBuilder::new(StringBuilder::new());
        for row in rows {
            for &s in *row {
                builder.values().append_value(s);
            }
            builder.append(true);
        }
        builder.finish()
    }

    /// Wraps array arguments in `ScalarFunctionArgs`.
    ///
    /// `number_rows` is the length of the first argument, so it matches the
    /// batch being evaluated instead of a placeholder 0.
    fn make_args(arrays: Vec<ArrayRef>, return_type: DataType) -> ScalarFunctionArgs {
        let number_rows = arrays.first().map_or(0, |a| a.len());
        ScalarFunctionArgs {
            args: arrays.into_iter().map(ColumnarValue::Array).collect(),
            arg_fields: vec![],
            number_rows,
            return_field: Arc::new(Field::new("output", return_type, true)),
            config_options: Arc::new(ConfigOptions::default()),
        }
    }

    /// Two array arguments with a JSONB (`LargeBinary`) result.
    fn make_args_2(a: ArrayRef, b: ArrayRef) -> ScalarFunctionArgs {
        make_args(vec![a, b], DataType::LargeBinary)
    }

    /// One array argument with a JSONB (`LargeBinary`) result.
    fn make_args_1(a: ArrayRef) -> ScalarFunctionArgs {
        make_args(vec![a], DataType::LargeBinary)
    }

    /// One array argument with a text (`Utf8`) result, for `json_infer_schema`.
    fn make_text_args_1(a: ArrayRef) -> ScalarFunctionArgs {
        make_args(vec![a], DataType::Utf8)
    }

    /// Decodes one non-null row of a JSONB result.
    fn decode_jsonb_result(result: ColumnarValue, row: usize) -> serde_json::Value {
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let bin = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
        assert!(!bin.is_null(row), "unexpected null at row {row}");
        json_types::jsonb_to_value(bin.value(row)).expect("invalid jsonb")
    }

    /// Decodes every row of a JSONB result; NULL rows become `None`.
    fn decode_jsonb_rows(result: ColumnarValue) -> Vec<Option<serde_json::Value>> {
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let bin = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
        (0..bin.len())
            .map(|row| {
                (!bin.is_null(row))
                    .then(|| json_types::jsonb_to_value(bin.value(row)).expect("invalid jsonb"))
            })
            .collect()
    }

    /// Reads one non-null row of a text result.
    fn decode_text_result(result: ColumnarValue, row: usize) -> String {
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
        assert!(!str_arr.is_null(row), "unexpected null at row {row}");
        str_arr.value(row).to_owned()
    }

    /// Returns whether row `row` of a result array is NULL.
    fn is_null_result(result: ColumnarValue, row: usize) -> bool {
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        arr.is_null(row)
    }

    // ── jsonb_merge tests ─────────────────────────────────────

    #[test]
    fn test_json_ext_merge_objects() {
        let udf = JsonbMerge::new();
        let left = make_jsonb_array(&[json!({"a": 1, "b": 2})]);
        let right = make_jsonb_array(&[json!({"b": 99, "c": 3})]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(left), Arc::new(right)))
            .unwrap();
        assert_eq!(
            decode_jsonb_result(result, 0),
            json!({"a": 1, "b": 99, "c": 3})
        );
    }

    #[test]
    fn test_json_ext_merge_non_object() {
        let udf = JsonbMerge::new();
        let left = make_jsonb_array(&[json!(42)]);
        let right = make_jsonb_array(&[json!("hello")]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(left), Arc::new(right)))
            .unwrap();
        assert_eq!(decode_jsonb_result(result, 0), json!("hello"));
    }

    // ── jsonb_deep_merge tests ────────────────────────────────

    #[test]
    fn test_json_ext_deep_merge() {
        let udf = JsonbDeepMerge::new();
        let left = make_jsonb_array(&[json!({"a": {"x": 1, "y": 2}, "b": 10})]);
        let right = make_jsonb_array(&[json!({"a": {"y": 99, "z": 3}, "c": 20})]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(left), Arc::new(right)))
            .unwrap();
        assert_eq!(
            decode_jsonb_result(result, 0),
            json!({"a": {"x": 1, "y": 99, "z": 3}, "b": 10, "c": 20})
        );
    }

    #[test]
    fn test_json_ext_deep_merge_non_object_override() {
        let udf = JsonbDeepMerge::new();
        let left = make_jsonb_array(&[json!({"a": {"x": 1}})]);
        let right = make_jsonb_array(&[json!({"a": 42})]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(left), Arc::new(right)))
            .unwrap();
        assert_eq!(decode_jsonb_result(result, 0), json!({"a": 42}));
    }

    // ── jsonb_strip_nulls tests ───────────────────────────────

    #[test]
    fn test_json_ext_strip_nulls() {
        let udf = JsonbStripNulls::new();
        let input = make_jsonb_array(&[json!({"a": 1, "b": null, "c": {"d": null, "e": 2}})]);
        let result = udf.invoke_with_args(make_args_1(Arc::new(input))).unwrap();
        assert_eq!(
            decode_jsonb_result(result, 0),
            json!({"a": 1, "c": {"e": 2}})
        );
    }

    #[test]
    fn test_json_ext_strip_nulls_array_preserved() {
        let udf = JsonbStripNulls::new();
        let input = make_jsonb_array(&[json!({"arr": [1, null, 3]})]);
        let result = udf.invoke_with_args(make_args_1(Arc::new(input))).unwrap();
        assert_eq!(decode_jsonb_result(result, 0), json!({"arr": [1, null, 3]}));
    }

    // ── jsonb_rename_keys tests ───────────────────────────────

    #[test]
    fn test_json_ext_rename_keys() {
        let udf = JsonbRenameKeys::new();
        let input = make_jsonb_array(&[json!({"old_name": 1, "keep": 2})]);

        // Build a MapArray with one row: {"old_name": "new_name"}
        let key_builder = MapSB::new();
        let val_builder = MapSB::new();
        let mut map_builder = MapBuilder::new(None, key_builder, val_builder);
        map_builder.keys().append_value("old_name");
        map_builder.values().append_value("new_name");
        map_builder.append(true).unwrap();
        let map_arr = map_builder.finish();

        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(map_arr)))
            .unwrap();
        assert_eq!(
            decode_jsonb_result(result, 0),
            json!({"keep": 2, "new_name": 1})
        );
    }

    // ── jsonb_pick tests ──────────────────────────────────────

    #[test]
    fn test_json_ext_pick() {
        let udf = JsonbPick::new();
        let input = make_jsonb_array(&[json!({"a": 1, "b": 2, "c": 3})]);
        let keys = make_string_list(&[&["a", "c"]]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(keys)))
            .unwrap();
        assert_eq!(decode_jsonb_result(result, 0), json!({"a": 1, "c": 3}));
    }

    // ── jsonb_except tests ────────────────────────────────────

    #[test]
    fn test_json_ext_except() {
        let udf = JsonbExcept::new();
        let input = make_jsonb_array(&[json!({"a": 1, "b": 2, "c": 3})]);
        let keys = make_string_list(&[&["b"]]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(keys)))
            .unwrap();
        assert_eq!(decode_jsonb_result(result, 0), json!({"a": 1, "c": 3}));
    }

    // ── jsonb_flatten tests ───────────────────────────────────

    #[test]
    fn test_json_ext_flatten() {
        let udf = JsonbFlatten::new();
        let input = make_jsonb_array(&[json!({"a": {"b": 1, "c": [2, 3]}})]);
        let sep = StringArray::from(vec!["."]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(sep)))
            .unwrap();
        assert_eq!(
            decode_jsonb_result(result, 0),
            json!({"a.b": 1, "a.c.0": 2, "a.c.1": 3})
        );
    }

    #[test]
    fn test_json_ext_flatten_custom_sep() {
        let udf = JsonbFlatten::new();
        let input = make_jsonb_array(&[json!({"x": {"y": 42}})]);
        let sep = StringArray::from(vec!["/"]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(sep)))
            .unwrap();
        assert_eq!(decode_jsonb_result(result, 0), json!({"x/y": 42}));
    }

    // ── jsonb_unflatten tests ─────────────────────────────────

    #[test]
    fn test_json_ext_unflatten() {
        let udf = JsonbUnflatten::new();
        let input = make_jsonb_array(&[json!({"a.b": 1, "a.c": 2, "d": 3})]);
        let sep = StringArray::from(vec!["."]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(sep)))
            .unwrap();
        assert_eq!(
            decode_jsonb_result(result, 0),
            json!({"a": {"b": 1, "c": 2}, "d": 3})
        );
    }

    #[test]
    fn test_json_ext_unflatten_non_object_passthrough() {
        let udf = JsonbUnflatten::new();
        let input = make_jsonb_array(&[json!([1, 2])]);
        let sep = StringArray::from(vec!["."]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(sep)))
            .unwrap();
        assert_eq!(decode_jsonb_result(result, 0), json!([1, 2]));
    }

    #[test]
    fn test_json_ext_unflatten_null_separator() {
        let udf = JsonbUnflatten::new();
        let input = make_jsonb_array(&[json!({"a.b": 1})]);
        let sep = StringArray::from(vec![None::<&str>]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(sep)))
            .unwrap();
        assert!(is_null_result(result, 0));
    }

    // ── json_to_columns tests ─────────────────────────────────

    #[test]
    fn test_json_ext_to_columns() {
        let udf = JsonToColumns::new();
        let input = make_jsonb_array(&[json!({"name": "Alice", "age": 30, "active": true})]);
        let spec = StringArray::from(vec!["name VARCHAR, age BIGINT"]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(spec)))
            .unwrap();
        assert_eq!(
            decode_jsonb_result(result, 0),
            json!({"age": 30, "name": "Alice"})
        );
    }

    #[test]
    fn test_json_ext_to_columns_missing_field_omitted() {
        let udf = JsonToColumns::new();
        let input = make_jsonb_array(&[json!({"name": "Alice"})]);
        let spec = StringArray::from(vec!["name VARCHAR, age BIGINT"]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(spec)))
            .unwrap();
        assert_eq!(decode_jsonb_result(result, 0), json!({"name": "Alice"}));
    }

    #[test]
    fn test_json_ext_to_columns_per_row_spec() {
        let udf = JsonToColumns::new();
        let row = json!({"a": 1, "b": 2});
        let input = make_jsonb_array(&[row.clone(), row.clone(), row]);
        let spec = StringArray::from(vec!["a INT", "b INT", "a INT"]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(spec)))
            .unwrap();
        assert_eq!(
            decode_jsonb_rows(result),
            vec![
                Some(json!({"a": 1})),
                Some(json!({"b": 2})),
                Some(json!({"a": 1})),
            ]
        );
    }

    // ── json_infer_schema tests ───────────────────────────────

    #[test]
    fn test_json_ext_infer_schema_object() {
        let udf = JsonInferSchema::new();
        let input = make_jsonb_array(&[json!({"name": "Alice", "age": 30, "active": true})]);
        let result = udf
            .invoke_with_args(make_text_args_1(Arc::new(input)))
            .unwrap();
        let text = decode_text_result(result, 0);
        let parsed: serde_json::Value = serde_json::from_str(&text).unwrap();
        assert_eq!(parsed["name"], "VARCHAR");
        assert_eq!(parsed["age"], "BIGINT");
        assert_eq!(parsed["active"], "BOOLEAN");
    }

    #[test]
    fn test_json_ext_infer_schema_nested() {
        let udf = JsonInferSchema::new();
        let input = make_jsonb_array(&[json!({"tags": [1, 2], "meta": {"x": 1.5}})]);
        let result = udf
            .invoke_with_args(make_text_args_1(Arc::new(input)))
            .unwrap();
        let text = decode_text_result(result, 0);
        let parsed: serde_json::Value = serde_json::from_str(&text).unwrap();
        assert_eq!(parsed["tags"], "ARRAY<BIGINT>");
        assert_eq!(parsed["meta"], "STRUCT(x DOUBLE)");
    }

    #[test]
    fn test_json_ext_infer_schema_scalar() {
        let udf = JsonInferSchema::new();
        let input = make_jsonb_array(&[json!(42)]);
        let result = udf
            .invoke_with_args(make_text_args_1(Arc::new(input)))
            .unwrap();
        assert_eq!(decode_text_result(result, 0), "BIGINT");
    }

    #[test]
    fn test_json_ext_infer_schema_null_row() {
        let udf = JsonInferSchema::new();
        let input = LargeBinaryArray::new_null(1);
        let result = udf
            .invoke_with_args(make_text_args_1(Arc::new(input)))
            .unwrap();
        assert!(is_null_result(result, 0));
    }

    // ── Registration test ─────────────────────────────────────

    #[test]
    fn test_json_ext_all_udfs_register() {
        use datafusion_expr::registry::FunctionRegistry;
        use datafusion_expr::ScalarUDF;

        let names: Vec<String> = vec![
            ScalarUDF::new_from_impl(JsonbMerge::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonbDeepMerge::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonbStripNulls::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonbRenameKeys::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonbPick::new()).name().to_owned(),
            ScalarUDF::new_from_impl(JsonbExcept::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonbFlatten::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonbUnflatten::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonToColumns::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonInferSchema::new())
                .name()
                .to_owned(),
        ];
        for name in &names {
            assert!(!name.is_empty(), "UDF has empty name");
        }
        assert_eq!(names.len(), 10);

        // Duplicate names would let one registration silently replace another.
        let mut unique = names.clone();
        unique.sort();
        unique.dedup();
        assert_eq!(unique.len(), names.len(), "duplicate UDF names: {names:?}");

        // Actually register them and confirm each is resolvable by name.
        let ctx = datafusion::prelude::SessionContext::new();
        register_json_extensions(&ctx);
        for name in &names {
            assert!(ctx.udf(name).is_ok(), "UDF {name} was not registered");
        }
    }

    // ── Null-handling tests ───────────────────────────────────

    #[test]
    fn test_json_ext_merge_null_input() {
        let udf = JsonbMerge::new();
        let left = LargeBinaryArray::new_null(1);
        let right = make_jsonb_array(&[json!({"a": 1})]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(left), Arc::new(right)))
            .unwrap();
        assert!(is_null_result(result, 0));
    }
}