datafusion_functions_nested/
map_entries.rs1use crate::utils::{get_map_entry_field, make_scalar_function};
21use arrow::array::{Array, ArrayRef, ListArray};
22use arrow::datatypes::{DataType, Field, Fields};
23use datafusion_common::utils::take_function_args;
24use datafusion_common::{Result, cast::as_map_array, exec_err};
25use datafusion_expr::{
26 ArrayFunctionSignature, ColumnarValue, Documentation, ScalarFunctionArgs,
27 ScalarUDFImpl, Signature, TypeSignature, Volatility,
28};
29use datafusion_macros::user_doc;
30use std::sync::Arc;
31
// Invokes the crate's `make_udf_expr_and_func!` macro for `MapEntriesFunc`.
// NOTE(review): the macro is defined outside this file; judging by its
// arguments it presumably generates a `map_entries(map)` expression helper
// carrying the doc string below, plus a `map_entries_udf` accessor — confirm
// against the macro definition.
make_udf_expr_and_func!(
    MapEntriesFunc,
    map_entries,
    map,
    "Return a list of all entries in the map.",
    map_entries_udf
);
39
// User-facing documentation metadata for `map_entries`; the `documentation`
// method of the `ScalarUDFImpl` impl returns it via `self.doc()`.
#[user_doc(
    doc_section(label = "Map Functions"),
    description = "Returns a list of all entries in the map.",
    syntax_example = "map_entries(map)",
    sql_example = r#"```sql
SELECT map_entries(MAP {'a': 1, 'b': NULL, 'c': 3});
----
[{'key': a, 'value': 1}, {'key': b, 'value': NULL}, {'key': c, 'value': 3}]

SELECT map_entries(map([100, 5], [42, 43]));
----
[{'key': 100, 'value': 42}, {'key': 5, 'value': 43}]
```"#,
    argument(
        name = "map",
        description = "Map expression. Can be a constant, column, or function, and any combination of map operators."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
// Scalar UDF implementation registered under the name "map_entries".
pub struct MapEntriesFunc {
    // Signature handed out by `ScalarUDFImpl::signature`; built once in `new`.
    signature: Signature,
}
62
63impl Default for MapEntriesFunc {
64 fn default() -> Self {
65 Self::new()
66 }
67}
68
69impl MapEntriesFunc {
70 pub fn new() -> Self {
71 Self {
72 signature: Signature::new(
73 TypeSignature::ArraySignature(ArrayFunctionSignature::MapArray),
74 Volatility::Immutable,
75 ),
76 }
77 }
78}
79
80impl ScalarUDFImpl for MapEntriesFunc {
81 fn name(&self) -> &str {
82 "map_entries"
83 }
84
85 fn signature(&self) -> &Signature {
86 &self.signature
87 }
88
89 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
90 let [map_type] = take_function_args(self.name(), arg_types)?;
91 let map_fields = get_map_entry_field(map_type)?;
92 Ok(DataType::List(Arc::new(Field::new_list_field(
93 DataType::Struct(Fields::from(vec![
94 Field::new(
95 "key",
96 map_fields.first().unwrap().data_type().clone(),
97 false,
98 ),
99 Field::new(
100 "value",
101 map_fields.get(1).unwrap().data_type().clone(),
102 true,
103 ),
104 ])),
105 false,
106 ))))
107 }
108
109 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
110 make_scalar_function(map_entries_inner)(&args.args)
111 }
112
113 fn documentation(&self) -> Option<&Documentation> {
114 self.doc()
115 }
116}
117
118fn map_entries_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
119 let [map_arg] = take_function_args("map_entries", args)?;
120
121 let map_array = match map_arg.data_type() {
122 DataType::Map(_, _) => as_map_array(&map_arg)?,
123 _ => return exec_err!("Argument for map_entries should be a map"),
124 };
125
126 Ok(Arc::new(ListArray::new(
127 Arc::new(Field::new_list_field(
128 DataType::Struct(Fields::from(vec![
129 Field::new("key", map_array.key_type().clone(), false),
130 Field::new("value", map_array.value_type().clone(), true),
131 ])),
132 false,
133 )),
134 map_array.offsets().clone(),
135 Arc::new(map_array.entries().clone()),
136 map_array.nulls().cloned(),
137 )))
138}