Skip to main content

datafusion_functions_json/
json_get_json.rs

1use std::sync::Arc;
2
3use datafusion::arrow::array::StringArray;
4use datafusion::arrow::datatypes::{DataType, Field, FieldRef};
5use datafusion::common::Result as DataFusionResult;
6use datafusion::logical_expr::{
7    ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
8};
9
10use crate::common::{get_err, invoke, jiter_json_find, return_type_check, GetError, JsonPath};
11use crate::common_macros::make_udf_function;
12use crate::common_union::json_field_metadata;
13
14make_udf_function!(
15    JsonGetJson,
16    json_get_json,
17    json_data path,
18    r#"Get a nested raw JSON string from a JSON string by its "path""#
19);
20
21#[derive(Debug, PartialEq, Eq, Hash)]
22pub(super) struct JsonGetJson {
23    signature: Signature,
24    aliases: [String; 1],
25}
26
27impl Default for JsonGetJson {
28    fn default() -> Self {
29        Self {
30            signature: Signature::variadic_any(Volatility::Immutable),
31            aliases: ["json_get_json".to_string()],
32        }
33    }
34}
35
36impl ScalarUDFImpl for JsonGetJson {
37    fn name(&self) -> &str {
38        self.aliases[0].as_str()
39    }
40
41    fn signature(&self) -> &Signature {
42        &self.signature
43    }
44
45    fn return_type(&self, arg_types: &[DataType]) -> DataFusionResult<DataType> {
46        return_type_check(arg_types, self.name(), DataType::Utf8)
47    }
48
49    fn return_field_from_args(&self, args: ReturnFieldArgs) -> DataFusionResult<FieldRef> {
50        let arg_types: Vec<DataType> = args.arg_fields.iter().map(|f| f.data_type().clone()).collect();
51        let return_type = self.return_type(&arg_types)?;
52        Ok(Arc::new(
53            Field::new(self.name(), return_type, true).with_metadata(json_field_metadata()),
54        ))
55    }
56
57    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DataFusionResult<ColumnarValue> {
58        invoke::<StringArray>(&args.args, jiter_json_get_json)
59    }
60
61    fn aliases(&self) -> &[String] {
62        &self.aliases
63    }
64
65    fn placement(
66        &self,
67        args: &[datafusion::logical_expr::ExpressionPlacement],
68    ) -> datafusion::logical_expr::ExpressionPlacement {
69        // If the first argument is a column and the remaining arguments are literals (a path)
70        // then we can push this UDF down to the leaf nodes.
71        if args.len() >= 2
72            && matches!(args[0], datafusion::logical_expr::ExpressionPlacement::Column)
73            && args[1..]
74                .iter()
75                .all(|arg| matches!(arg, datafusion::logical_expr::ExpressionPlacement::Literal))
76        {
77            datafusion::logical_expr::ExpressionPlacement::MoveTowardsLeafNodes
78        } else {
79            datafusion::logical_expr::ExpressionPlacement::KeepInPlace
80        }
81    }
82}
83
84fn jiter_json_get_json(opt_json: Option<&str>, path: &[JsonPath]) -> Result<String, GetError> {
85    if let Some((mut jiter, peek)) = jiter_json_find(opt_json, path) {
86        let start = jiter.current_index();
87        jiter.known_skip(peek)?;
88        let object_slice = jiter.slice_to_current(start);
89        let object_string = std::str::from_utf8(object_slice)?;
90        Ok(object_string.to_owned())
91    } else {
92        get_err!()
93    }
94}