datafusion_functions_json/
json_get_json.rs1use std::sync::Arc;
2
3use datafusion::arrow::array::StringArray;
4use datafusion::arrow::datatypes::{DataType, Field, FieldRef};
5use datafusion::common::Result as DataFusionResult;
6use datafusion::logical_expr::{
7 ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
8};
9
10use crate::common::{get_err, invoke, jiter_json_find, return_type_check, GetError, JsonPath};
11use crate::common_macros::make_udf_function;
12use crate::common_union::json_field_metadata;
13
14make_udf_function!(
15 JsonGetJson,
16 json_get_json,
17 json_data path,
18 r#"Get a nested raw JSON string from a JSON string by its "path""#
19);
20
21#[derive(Debug, PartialEq, Eq, Hash)]
22pub(super) struct JsonGetJson {
23 signature: Signature,
24 aliases: [String; 1],
25}
26
27impl Default for JsonGetJson {
28 fn default() -> Self {
29 Self {
30 signature: Signature::variadic_any(Volatility::Immutable),
31 aliases: ["json_get_json".to_string()],
32 }
33 }
34}
35
36impl ScalarUDFImpl for JsonGetJson {
37 fn name(&self) -> &str {
38 self.aliases[0].as_str()
39 }
40
41 fn signature(&self) -> &Signature {
42 &self.signature
43 }
44
45 fn return_type(&self, arg_types: &[DataType]) -> DataFusionResult<DataType> {
46 return_type_check(arg_types, self.name(), DataType::Utf8)
47 }
48
49 fn return_field_from_args(&self, args: ReturnFieldArgs) -> DataFusionResult<FieldRef> {
50 let arg_types: Vec<DataType> = args.arg_fields.iter().map(|f| f.data_type().clone()).collect();
51 let return_type = self.return_type(&arg_types)?;
52 Ok(Arc::new(
53 Field::new(self.name(), return_type, true).with_metadata(json_field_metadata()),
54 ))
55 }
56
57 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DataFusionResult<ColumnarValue> {
58 invoke::<StringArray>(&args.args, jiter_json_get_json)
59 }
60
61 fn aliases(&self) -> &[String] {
62 &self.aliases
63 }
64
65 fn placement(
66 &self,
67 args: &[datafusion::logical_expr::ExpressionPlacement],
68 ) -> datafusion::logical_expr::ExpressionPlacement {
69 if args.len() >= 2
72 && matches!(args[0], datafusion::logical_expr::ExpressionPlacement::Column)
73 && args[1..]
74 .iter()
75 .all(|arg| matches!(arg, datafusion::logical_expr::ExpressionPlacement::Literal))
76 {
77 datafusion::logical_expr::ExpressionPlacement::MoveTowardsLeafNodes
78 } else {
79 datafusion::logical_expr::ExpressionPlacement::KeepInPlace
80 }
81 }
82}
83
84fn jiter_json_get_json(opt_json: Option<&str>, path: &[JsonPath]) -> Result<String, GetError> {
85 if let Some((mut jiter, peek)) = jiter_json_find(opt_json, path) {
86 let start = jiter.current_index();
87 jiter.known_skip(peek)?;
88 let object_slice = jiter.slice_to_current(start);
89 let object_string = std::str::from_utf8(object_slice)?;
90 Ok(object_string.to_owned())
91 } else {
92 get_err!()
93 }
94}