Skip to main content

datafusion_functions_nested/
map_extract.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for map_extract functions.
19
20use crate::utils::{get_map_entry_field, make_scalar_function};
21use arrow::array::{
22    Array, ArrayRef, Capacities, ListArray, MapArray, MutableArrayData, make_array,
23};
24use arrow::buffer::OffsetBuffer;
25use arrow::datatypes::{DataType, Field};
26use datafusion_common::utils::take_function_args;
27use datafusion_common::{Result, cast::as_map_array, exec_err};
28use datafusion_expr::{
29    ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
30    Volatility,
31};
32use datafusion_macros::user_doc;
33use std::sync::Arc;
34use std::vec;
35
36// Create static instances of ScalarUDFs for each function
37make_udf_expr_and_func!(
38    MapExtract,
39    map_extract,
40    map key,
41    "Return a list containing the value for a given key or an empty list if the key is not contained in the map.",
42    map_extract_udf
43);
44
45#[user_doc(
46    doc_section(label = "Map Functions"),
47    description = "Returns a list containing the value for the given key or an empty list if the key is not present in the map.",
48    syntax_example = "map_extract(map, key)",
49    sql_example = r#"```sql
50SELECT map_extract(MAP {'a': 1, 'b': NULL, 'c': 3}, 'a');
51----
52[1]
53
54SELECT map_extract(MAP {1: 'one', 2: 'two'}, 2);
55----
56['two']
57
58SELECT map_extract(MAP {'x': 10, 'y': NULL, 'z': 30}, 'y');
59----
60[NULL]
61
62-- non-existing key
63SELECT map_extract(MAP {'x': 10, 'y': NULL, 'z': 30}, 'a');
64----
65[]
66```"#,
67    argument(
68        name = "map",
69        description = "Map expression. Can be a constant, column, or function, and any combination of map operators."
70    ),
71    argument(
72        name = "key",
73        description = "Key to extract from the map. Can be a constant, column, or function, any combination of arithmetic or string operators, or a named expression of the previously listed."
74    )
75)]
76#[derive(Debug, PartialEq, Eq, Hash)]
77pub struct MapExtract {
78    signature: Signature,
79    aliases: Vec<String>,
80}
81
82impl Default for MapExtract {
83    fn default() -> Self {
84        Self::new()
85    }
86}
87
88impl MapExtract {
89    pub fn new() -> Self {
90        Self {
91            signature: Signature::user_defined(Volatility::Immutable),
92            aliases: vec![String::from("element_at")],
93        }
94    }
95}
96
97impl ScalarUDFImpl for MapExtract {
98    fn name(&self) -> &str {
99        "map_extract"
100    }
101
102    fn signature(&self) -> &Signature {
103        &self.signature
104    }
105
106    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
107        let [map_type, _] = take_function_args(self.name(), arg_types)?;
108        let map_fields = get_map_entry_field(map_type)?;
109        Ok(DataType::List(Arc::new(Field::new_list_field(
110            map_fields.last().unwrap().data_type().clone(),
111            true,
112        ))))
113    }
114
115    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
116        make_scalar_function(map_extract_inner)(&args.args)
117    }
118
119    fn aliases(&self) -> &[String] {
120        &self.aliases
121    }
122
123    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
124        let [map_type, _] = take_function_args(self.name(), arg_types)?;
125
126        let field = get_map_entry_field(map_type)?;
127        Ok(vec![
128            map_type.clone(),
129            field.first().unwrap().data_type().clone(),
130        ])
131    }
132
133    fn documentation(&self) -> Option<&Documentation> {
134        self.doc()
135    }
136}
137
138fn general_map_extract_inner(
139    map_array: &MapArray,
140    query_keys_array: &dyn Array,
141) -> Result<ArrayRef> {
142    let keys = map_array.keys();
143    let mut offsets = vec![0_i32];
144
145    let values = map_array.values();
146    let original_data = values.to_data();
147    let capacity = Capacities::Array(original_data.len());
148
149    let mut mutable =
150        MutableArrayData::with_capacities(vec![&original_data], true, capacity);
151
152    for (row_index, offset_window) in map_array.value_offsets().windows(2).enumerate() {
153        let start = offset_window[0] as usize;
154        let end = offset_window[1] as usize;
155        let len = end - start;
156
157        let query_key = query_keys_array.slice(row_index, 1);
158
159        let value_index =
160            (0..len).find(|&i| keys.slice(start + i, 1).as_ref() == query_key.as_ref());
161
162        match value_index {
163            Some(index) => {
164                mutable.extend(0, start + index, start + index + 1);
165            }
166            None => {
167                mutable.extend_nulls(1);
168            }
169        }
170        offsets.push(offsets[row_index] + 1);
171    }
172
173    let data = mutable.freeze();
174
175    Ok(Arc::new(ListArray::new(
176        Arc::new(Field::new_list_field(map_array.value_type().clone(), true)),
177        OffsetBuffer::<i32>::new(offsets.into()),
178        Arc::new(make_array(data)),
179        None,
180    )))
181}
182
183fn map_extract_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
184    let [map_arg, key_arg] = take_function_args("map_extract", args)?;
185
186    let map_array = match map_arg.data_type() {
187        DataType::Map(_, _) => as_map_array(&map_arg)?,
188        _ => return exec_err!("The first argument in map_extract must be a map"),
189    };
190
191    let key_type = map_array.key_type();
192
193    if key_type != key_arg.data_type() {
194        return exec_err!(
195            "The key type {} does not match the map key type {}",
196            key_arg.data_type(),
197            key_type
198        );
199    }
200
201    general_map_extract_inner(map_array, key_arg)
202}