datafusion_functions_json/
json_get_json.rs1use std::any::Any;
2use std::sync::Arc;
3
4use datafusion::arrow::array::StringArray;
5use datafusion::arrow::datatypes::{DataType, Field, FieldRef};
6use datafusion::common::Result as DataFusionResult;
7use datafusion::logical_expr::{
8 ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
9};
10
11use crate::common::{get_err, invoke, jiter_json_find, return_type_check, GetError, JsonPath};
12use crate::common_macros::make_udf_function;
13use crate::common_union::json_field_metadata;
14
15make_udf_function!(
16 JsonGetJson,
17 json_get_json,
18 json_data path,
19 r#"Get a nested raw JSON string from a JSON string by its "path""#
20);
21
22#[derive(Debug, PartialEq, Eq, Hash)]
23pub(super) struct JsonGetJson {
24 signature: Signature,
25 aliases: [String; 1],
26}
27
28impl Default for JsonGetJson {
29 fn default() -> Self {
30 Self {
31 signature: Signature::variadic_any(Volatility::Immutable),
32 aliases: ["json_get_json".to_string()],
33 }
34 }
35}
36
37impl ScalarUDFImpl for JsonGetJson {
38 fn as_any(&self) -> &dyn Any {
39 self
40 }
41
42 fn name(&self) -> &str {
43 self.aliases[0].as_str()
44 }
45
46 fn signature(&self) -> &Signature {
47 &self.signature
48 }
49
50 fn return_type(&self, arg_types: &[DataType]) -> DataFusionResult<DataType> {
51 return_type_check(arg_types, self.name(), DataType::Utf8)
52 }
53
54 fn return_field_from_args(&self, args: ReturnFieldArgs) -> DataFusionResult<FieldRef> {
55 let arg_types: Vec<DataType> = args.arg_fields.iter().map(|f| f.data_type().clone()).collect();
56 let return_type = self.return_type(&arg_types)?;
57 Ok(Arc::new(
58 Field::new(self.name(), return_type, true).with_metadata(json_field_metadata()),
59 ))
60 }
61
62 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DataFusionResult<ColumnarValue> {
63 invoke::<StringArray>(&args.args, jiter_json_get_json)
64 }
65
66 fn aliases(&self) -> &[String] {
67 &self.aliases
68 }
69
70 fn placement(
71 &self,
72 args: &[datafusion::logical_expr::ExpressionPlacement],
73 ) -> datafusion::logical_expr::ExpressionPlacement {
74 if args.len() >= 2
77 && matches!(args[0], datafusion::logical_expr::ExpressionPlacement::Column)
78 && args[1..]
79 .iter()
80 .all(|arg| matches!(arg, datafusion::logical_expr::ExpressionPlacement::Literal))
81 {
82 datafusion::logical_expr::ExpressionPlacement::MoveTowardsLeafNodes
83 } else {
84 datafusion::logical_expr::ExpressionPlacement::KeepInPlace
85 }
86 }
87}
88
89fn jiter_json_get_json(opt_json: Option<&str>, path: &[JsonPath]) -> Result<String, GetError> {
90 if let Some((mut jiter, peek)) = jiter_json_find(opt_json, path) {
91 let start = jiter.current_index();
92 jiter.known_skip(peek)?;
93 let object_slice = jiter.slice_to_current(start);
94 let object_string = std::str::from_utf8(object_slice)?;
95 Ok(object_string.to_owned())
96 } else {
97 get_err!()
98 }
99}