datafusion_functions_json/
json_get_array.rs1use std::any::Any;
2use std::sync::Arc;
3
4use datafusion::arrow::array::{ArrayRef, ListBuilder, StringBuilder};
5use datafusion::arrow::datatypes::{DataType, Field};
6use datafusion::common::{Result as DataFusionResult, ScalarValue};
7use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility};
8use jiter::Peek;
9
10use crate::common::{get_err, invoke, jiter_json_find, return_type_check, GetError, InvokeResult, JsonPath};
11use crate::common_macros::make_udf_function;
12use crate::common_union::json_field_metadata;
13
14fn list_item_field() -> Field {
15 Field::new("item", DataType::Utf8, true).with_metadata(json_field_metadata())
16}
17
// Registers the `JsonGetArray` UDF under the name `json_get_array`, with arguments
// `json_data` and `path`. What the macro generates is defined in `common_macros`
// (presumably an expression helper plus a UDF accessor — TODO confirm there).
// NOTE(review): despite the wording "arrow array", the result is a `List<Utf8>` whose
// entries are the raw JSON text of each element of the array found at `path`.
make_udf_function!(
    JsonGetArray,
    json_get_array,
    json_data path,
    r#"Get an arrow array from a JSON string by its "path""#
);
24
25#[derive(Debug, PartialEq, Eq, Hash)]
26pub(super) struct JsonGetArray {
27 signature: Signature,
28 aliases: [String; 1],
29}
30
31impl Default for JsonGetArray {
32 fn default() -> Self {
33 Self {
34 signature: Signature::variadic_any(Volatility::Immutable),
35 aliases: ["json_get_array".to_string()],
36 }
37 }
38}
39
40impl ScalarUDFImpl for JsonGetArray {
41 fn as_any(&self) -> &dyn Any {
42 self
43 }
44
45 fn name(&self) -> &str {
46 self.aliases[0].as_str()
47 }
48
49 fn signature(&self) -> &Signature {
50 &self.signature
51 }
52
53 fn return_type(&self, arg_types: &[DataType]) -> DataFusionResult<DataType> {
54 return_type_check(arg_types, self.name(), DataType::List(Arc::new(list_item_field())))
55 }
56
57 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DataFusionResult<ColumnarValue> {
58 invoke::<BuildArrayList>(&args.args, jiter_json_get_array)
59 }
60
61 fn aliases(&self) -> &[String] {
62 &self.aliases
63 }
64
65 fn placement(
66 &self,
67 args: &[datafusion::logical_expr::ExpressionPlacement],
68 ) -> datafusion::logical_expr::ExpressionPlacement {
69 if args.len() >= 2
72 && matches!(args[0], datafusion::logical_expr::ExpressionPlacement::Column)
73 && args[1..]
74 .iter()
75 .all(|arg| matches!(arg, datafusion::logical_expr::ExpressionPlacement::Literal))
76 {
77 datafusion::logical_expr::ExpressionPlacement::MoveTowardsLeafNodes
78 } else {
79 datafusion::logical_expr::ExpressionPlacement::KeepInPlace
80 }
81 }
82}
83
84#[derive(Debug)]
85struct BuildArrayList;
86
87impl InvokeResult for BuildArrayList {
88 type Item = Vec<String>;
89
90 type Builder = ListBuilder<StringBuilder>;
91
92 const ACCEPT_DICT_RETURN: bool = false;
93
94 fn builder(capacity: usize) -> Self::Builder {
95 let values_builder = StringBuilder::new();
96 ListBuilder::with_capacity(values_builder, capacity).with_field(list_item_field())
97 }
98
99 fn append_value(builder: &mut Self::Builder, value: Option<Self::Item>) {
100 builder.append_option(value.map(|v| v.into_iter().map(Some)));
101 }
102
103 fn finish(mut builder: Self::Builder) -> DataFusionResult<ArrayRef> {
104 Ok(Arc::new(builder.finish()))
105 }
106
107 fn scalar(value: Option<Self::Item>) -> ScalarValue {
108 let mut builder = ListBuilder::new(StringBuilder::new()).with_field(list_item_field());
109
110 if let Some(array_items) = value {
111 for item in array_items {
112 builder.values().append_value(item);
113 }
114
115 builder.append(true);
116 } else {
117 builder.append(false);
118 }
119 let array = builder.finish();
120 ScalarValue::List(Arc::new(array))
121 }
122}
123
124fn jiter_json_get_array(opt_json: Option<&str>, path: &[JsonPath]) -> Result<Vec<String>, GetError> {
125 if let Some((mut jiter, peek)) = jiter_json_find(opt_json, path) {
126 match peek {
127 Peek::Array => {
128 let mut peek_opt = jiter.known_array()?;
129 let mut array_items: Vec<String> = Vec::new();
130
131 while let Some(element_peek) = peek_opt {
132 let start = jiter.current_index();
134 jiter.known_skip(element_peek)?;
135 let slice = jiter.slice_to_current(start);
136 let element_str = std::str::from_utf8(slice)?.to_string();
137
138 array_items.push(element_str);
139 peek_opt = jiter.array_step()?;
140 }
141
142 Ok(array_items)
143 }
144 _ => get_err!(),
145 }
146 } else {
147 get_err!()
148 }
149}