Skip to main content

datafusion_functions_json/
json_get_array.rs

1use std::any::Any;
2use std::sync::Arc;
3
4use datafusion::arrow::array::{ArrayRef, ListBuilder, StringBuilder};
5use datafusion::arrow::datatypes::{DataType, Field};
6use datafusion::common::{Result as DataFusionResult, ScalarValue};
7use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility};
8use jiter::Peek;
9
10use crate::common::{get_err, invoke, jiter_json_find, return_type_check, GetError, InvokeResult, JsonPath};
11use crate::common_macros::make_udf_function;
12use crate::common_union::json_field_metadata;
13
14fn list_item_field() -> Field {
15    Field::new("item", DataType::Utf8, true).with_metadata(json_field_metadata())
16}
17
// Registers the UDF through the crate's `make_udf_function!` macro, passing the
// implementing type, the function name, the argument names (`json_data`, `path`)
// and the user-facing description string.
// NOTE(review): the macro is defined in `crate::common_macros`; presumably it
// generates the expression helper and UDF accessor — confirm there.
make_udf_function!(
    JsonGetArray,
    json_get_array,
    json_data path,
    r#"Get an arrow array from a JSON string by its "path""#
);
24
/// Scalar UDF state for `json_get_array`; see its `ScalarUDFImpl` implementation
/// in this file for the behaviour.
#[derive(Debug, PartialEq, Eq, Hash)]
pub(super) struct JsonGetArray {
    /// Signature handed back from `ScalarUDFImpl::signature`.
    signature: Signature,
    /// Names of the function; the single entry doubles as the value of `name()`.
    aliases: [String; 1],
}
30
31impl Default for JsonGetArray {
32    fn default() -> Self {
33        Self {
34            signature: Signature::variadic_any(Volatility::Immutable),
35            aliases: ["json_get_array".to_string()],
36        }
37    }
38}
39
40impl ScalarUDFImpl for JsonGetArray {
41    fn as_any(&self) -> &dyn Any {
42        self
43    }
44
45    fn name(&self) -> &str {
46        self.aliases[0].as_str()
47    }
48
49    fn signature(&self) -> &Signature {
50        &self.signature
51    }
52
53    fn return_type(&self, arg_types: &[DataType]) -> DataFusionResult<DataType> {
54        return_type_check(arg_types, self.name(), DataType::List(Arc::new(list_item_field())))
55    }
56
57    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DataFusionResult<ColumnarValue> {
58        invoke::<BuildArrayList>(&args.args, jiter_json_get_array)
59    }
60
61    fn aliases(&self) -> &[String] {
62        &self.aliases
63    }
64
65    fn placement(
66        &self,
67        args: &[datafusion::logical_expr::ExpressionPlacement],
68    ) -> datafusion::logical_expr::ExpressionPlacement {
69        // If the first argument is a column and the remaining arguments are literals (a path)
70        // then we can push this UDF down to the leaf nodes.
71        if args.len() >= 2
72            && matches!(args[0], datafusion::logical_expr::ExpressionPlacement::Column)
73            && args[1..]
74                .iter()
75                .all(|arg| matches!(arg, datafusion::logical_expr::ExpressionPlacement::Literal))
76        {
77            datafusion::logical_expr::ExpressionPlacement::MoveTowardsLeafNodes
78        } else {
79            datafusion::logical_expr::ExpressionPlacement::KeepInPlace
80        }
81    }
82}
83
/// Marker type whose `InvokeResult` implementation collects per-row
/// `Vec<String>` results into a list-of-strings array or scalar.
#[derive(Debug)]
struct BuildArrayList;
86
87impl InvokeResult for BuildArrayList {
88    type Item = Vec<String>;
89
90    type Builder = ListBuilder<StringBuilder>;
91
92    const ACCEPT_DICT_RETURN: bool = false;
93
94    fn builder(capacity: usize) -> Self::Builder {
95        let values_builder = StringBuilder::new();
96        ListBuilder::with_capacity(values_builder, capacity).with_field(list_item_field())
97    }
98
99    fn append_value(builder: &mut Self::Builder, value: Option<Self::Item>) {
100        builder.append_option(value.map(|v| v.into_iter().map(Some)));
101    }
102
103    fn finish(mut builder: Self::Builder) -> DataFusionResult<ArrayRef> {
104        Ok(Arc::new(builder.finish()))
105    }
106
107    fn scalar(value: Option<Self::Item>) -> ScalarValue {
108        let mut builder = ListBuilder::new(StringBuilder::new()).with_field(list_item_field());
109
110        if let Some(array_items) = value {
111            for item in array_items {
112                builder.values().append_value(item);
113            }
114
115            builder.append(true);
116        } else {
117            builder.append(false);
118        }
119        let array = builder.finish();
120        ScalarValue::List(Arc::new(array))
121    }
122}
123
124fn jiter_json_get_array(opt_json: Option<&str>, path: &[JsonPath]) -> Result<Vec<String>, GetError> {
125    if let Some((mut jiter, peek)) = jiter_json_find(opt_json, path) {
126        match peek {
127            Peek::Array => {
128                let mut peek_opt = jiter.known_array()?;
129                let mut array_items: Vec<String> = Vec::new();
130
131                while let Some(element_peek) = peek_opt {
132                    // Get the raw JSON slice for each array element
133                    let start = jiter.current_index();
134                    jiter.known_skip(element_peek)?;
135                    let slice = jiter.slice_to_current(start);
136                    let element_str = std::str::from_utf8(slice)?.to_string();
137
138                    array_items.push(element_str);
139                    peek_opt = jiter.array_step()?;
140                }
141
142                Ok(array_items)
143            }
144            _ => get_err!(),
145        }
146    } else {
147        get_err!()
148    }
149}