1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
1718//! [`ScalarUDFImpl`] definitions for map_extract functions.
1920use crate::utils::{get_map_entry_field, make_scalar_function};
21use arrow::array::{
22 make_array, Array, ArrayRef, Capacities, ListArray, MapArray, MutableArrayData,
23};
24use arrow::buffer::OffsetBuffer;
25use arrow::datatypes::{DataType, Field};
26use datafusion_common::utils::take_function_args;
27use datafusion_common::{cast::as_map_array, exec_err, Result};
28use datafusion_expr::{
29 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
30};
31use datafusion_macros::user_doc;
32use std::any::Any;
33use std::sync::Arc;
34use std::vec;
3536// Create static instances of ScalarUDFs for each function
// NOTE(review): `make_udf_expr_and_func!` is defined outside this file; the
// per-argument notes below describe how the arguments appear to be used and
// should be confirmed against the macro definition.
make_udf_expr_and_func!(
    // UDF implementation type, declared further down in this file.
    MapExtract,
    // Presumably the name of the generated expression-building function.
    map_extract,
    // Presumably the parameter names of that function: the map and the key.
    map key,
    // Human-readable description passed to the macro.
    "Return a list containing the value for a given key or an empty list if the key is not contained in the map.",
    // Presumably the name of the generated accessor for the shared UDF instance.
    map_extract_udf
);
/// Scalar UDF state for `map_extract(map, key)`.
///
/// The `user_doc` attribute below holds the user-facing documentation; the
/// `ScalarUDFImpl::documentation` implementation in this file returns
/// `self.doc()`, which is presumably generated by that attribute.
///
/// NOTE(review): `general_map_extract_inner` in this file appends one null
/// element when the key is missing, so the list returned for a miss has
/// length one rather than zero. The "empty list" wording and the `[]`
/// example output may reflect how a null element is rendered — confirm.
#[user_doc(
    doc_section(label = "Map Functions"),
    description = "Returns a list containing the value for the given key or an empty list if the key is not present in the map.",
    syntax_example = "map_extract(map, key)",
    sql_example = r#"```sql
SELECT map_extract(MAP {'a': 1, 'b': NULL, 'c': 3}, 'a');
----
[1]

SELECT map_extract(MAP {1: 'one', 2: 'two'}, 2);
----
['two']

SELECT map_extract(MAP {'x': 10, 'y': NULL, 'z': 30}, 'y');
----
[]
```"#,
    argument(
        name = "map",
        description = "Map expression. Can be a constant, column, or function, and any combination of map operators."
    ),
    argument(
        name = "key",
        description = "Key to extract from the map. Can be a constant, column, or function, any combination of arithmetic or string operators, or a named expression of the previously listed."
    )
)]
#[derive(Debug)]
pub struct MapExtract {
    // Signature handed out by `ScalarUDFImpl::signature`; `new` builds it as
    // user-defined and immutable.
    signature: Signature,
    // Alternative function names handed out by `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
7677impl Default for MapExtract {
78fn default() -> Self {
79Self::new()
80 }
81}
8283impl MapExtract {
84pub fn new() -> Self {
85Self {
86 signature: Signature::user_defined(Volatility::Immutable),
87 aliases: vec![String::from("element_at")],
88 }
89 }
90}
9192impl ScalarUDFImpl for MapExtract {
93fn as_any(&self) -> &dyn Any {
94self
95}
96fn name(&self) -> &str {
97"map_extract"
98}
99100fn signature(&self) -> &Signature {
101&self.signature
102 }
103104fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
105let [map_type, _] = take_function_args(self.name(), arg_types)?;
106let map_fields = get_map_entry_field(map_type)?;
107Ok(DataType::List(Arc::new(Field::new_list_field(
108 map_fields.last().unwrap().data_type().clone(),
109true,
110 ))))
111 }
112113fn invoke_with_args(
114&self,
115 args: datafusion_expr::ScalarFunctionArgs,
116 ) -> Result<ColumnarValue> {
117 make_scalar_function(map_extract_inner)(&args.args)
118 }
119120fn aliases(&self) -> &[String] {
121&self.aliases
122 }
123124fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
125let [map_type, _] = take_function_args(self.name(), arg_types)?;
126127let field = get_map_entry_field(map_type)?;
128Ok(vec![
129 map_type.clone(),
130 field.first().unwrap().data_type().clone(),
131 ])
132 }
133134fn documentation(&self) -> Option<&Documentation> {
135self.doc()
136 }
137}
138139fn general_map_extract_inner(
140 map_array: &MapArray,
141 query_keys_array: &dyn Array,
142) -> Result<ArrayRef> {
143let keys = map_array.keys();
144let mut offsets = vec![0_i32];
145146let values = map_array.values();
147let original_data = values.to_data();
148let capacity = Capacities::Array(original_data.len());
149150let mut mutable =
151 MutableArrayData::with_capacities(vec![&original_data], true, capacity);
152153for (row_index, offset_window) in map_array.value_offsets().windows(2).enumerate() {
154let start = offset_window[0] as usize;
155let end = offset_window[1] as usize;
156let len = end - start;
157158let query_key = query_keys_array.slice(row_index, 1);
159160let value_index =
161 (0..len).find(|&i| keys.slice(start + i, 1).as_ref() == query_key.as_ref());
162163match value_index {
164Some(index) => {
165 mutable.extend(0, start + index, start + index + 1);
166 }
167None => {
168 mutable.extend_nulls(1);
169 }
170 }
171 offsets.push(offsets[row_index] + 1);
172 }
173174let data = mutable.freeze();
175176Ok(Arc::new(ListArray::new(
177 Arc::new(Field::new_list_field(map_array.value_type().clone(), true)),
178 OffsetBuffer::<i32>::new(offsets.into()),
179 Arc::new(make_array(data)),
180None,
181 )))
182}
183184fn map_extract_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
185let [map_arg, key_arg] = take_function_args("map_extract", args)?;
186187let map_array = match map_arg.data_type() {
188 DataType::Map(_, _) => as_map_array(&map_arg)?,
189_ => return exec_err!("The first argument in map_extract must be a map"),
190 };
191192let key_type = map_array.key_type();
193194if key_type != key_arg.data_type() {
195return exec_err!(
196"The key type {} does not match the map key type {}",
197 key_arg.data_type(),
198 key_type
199 );
200 }
201202 general_map_extract_inner(map_array, key_arg)
203}