// datafusion_functions_json/json_get_json.rs

use std::any::Any;
use std::sync::Arc;

use datafusion::arrow::array::StringArray;
use datafusion::arrow::datatypes::{DataType, Field, FieldRef};
use datafusion::common::Result as DataFusionResult;
use datafusion::logical_expr::{
    ColumnarValue, ExpressionPlacement, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl,
    Signature, Volatility,
};

use crate::common::{get_err, invoke, jiter_json_find, return_type_check, GetError, JsonPath};
use crate::common_macros::make_udf_function;
use crate::common_union::json_field_metadata;
14
15make_udf_function!(
16    JsonGetJson,
17    json_get_json,
18    json_data path,
19    r#"Get a nested raw JSON string from a JSON string by its "path""#
20);
21
22#[derive(Debug, PartialEq, Eq, Hash)]
23pub(super) struct JsonGetJson {
24    signature: Signature,
25    aliases: [String; 1],
26}
27
28impl Default for JsonGetJson {
29    fn default() -> Self {
30        Self {
31            signature: Signature::variadic_any(Volatility::Immutable),
32            aliases: ["json_get_json".to_string()],
33        }
34    }
35}
36
37impl ScalarUDFImpl for JsonGetJson {
38    fn as_any(&self) -> &dyn Any {
39        self
40    }
41
42    fn name(&self) -> &str {
43        self.aliases[0].as_str()
44    }
45
46    fn signature(&self) -> &Signature {
47        &self.signature
48    }
49
50    fn return_type(&self, arg_types: &[DataType]) -> DataFusionResult<DataType> {
51        return_type_check(arg_types, self.name(), DataType::Utf8)
52    }
53
54    fn return_field_from_args(&self, args: ReturnFieldArgs) -> DataFusionResult<FieldRef> {
55        let arg_types: Vec<DataType> = args.arg_fields.iter().map(|f| f.data_type().clone()).collect();
56        let return_type = self.return_type(&arg_types)?;
57        Ok(Arc::new(
58            Field::new(self.name(), return_type, true).with_metadata(json_field_metadata()),
59        ))
60    }
61
62    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DataFusionResult<ColumnarValue> {
63        invoke::<StringArray>(&args.args, jiter_json_get_json)
64    }
65
66    fn aliases(&self) -> &[String] {
67        &self.aliases
68    }
69
70    fn placement(
71        &self,
72        args: &[datafusion::logical_expr::ExpressionPlacement],
73    ) -> datafusion::logical_expr::ExpressionPlacement {
74        // If the first argument is a column and the remaining arguments are literals (a path)
75        // then we can push this UDF down to the leaf nodes.
76        if args.len() >= 2
77            && matches!(args[0], datafusion::logical_expr::ExpressionPlacement::Column)
78            && args[1..]
79                .iter()
80                .all(|arg| matches!(arg, datafusion::logical_expr::ExpressionPlacement::Literal))
81        {
82            datafusion::logical_expr::ExpressionPlacement::MoveTowardsLeafNodes
83        } else {
84            datafusion::logical_expr::ExpressionPlacement::KeepInPlace
85        }
86    }
87}
88
89fn jiter_json_get_json(opt_json: Option<&str>, path: &[JsonPath]) -> Result<String, GetError> {
90    if let Some((mut jiter, peek)) = jiter_json_find(opt_json, path) {
91        let start = jiter.current_index();
92        jiter.known_skip(peek)?;
93        let object_slice = jiter.slice_to_current(start);
94        let object_string = std::str::from_utf8(object_slice)?;
95        Ok(object_string.to_owned())
96    } else {
97        get_err!()
98    }
99}