datafusion_functions_nested/
map_entries.rs1use crate::utils::{get_map_entry_field, make_scalar_function};
21use arrow::array::{Array, ArrayRef, ListArray};
22use arrow::datatypes::{DataType, Field, Fields};
23use datafusion_common::utils::take_function_args;
24use datafusion_common::{cast::as_map_array, exec_err, Result};
25use datafusion_expr::{
26 ArrayFunctionSignature, ColumnarValue, Documentation, ScalarUDFImpl, Signature,
27 TypeSignature, Volatility,
28};
29use datafusion_macros::user_doc;
30use std::any::Any;
31use std::sync::Arc;
32
33make_udf_expr_and_func!(
34 MapEntriesFunc,
35 map_entries,
36 map,
37 "Return a list of all entries in the map.",
38 map_entries_udf
39);
40
41#[user_doc(
42 doc_section(label = "Map Functions"),
43 description = "Returns a list of all entries in the map.",
44 syntax_example = "map_entries(map)",
45 sql_example = r#"```sql
46SELECT map_entries(MAP {'a': 1, 'b': NULL, 'c': 3});
47----
48[{'key': a, 'value': 1}, {'key': b, 'value': NULL}, {'key': c, 'value': 3}]
49
50SELECT map_entries(map([100, 5], [42, 43]));
51----
52[{'key': 100, 'value': 42}, {'key': 5, 'value': 43}]
53```"#,
54 argument(
55 name = "map",
56 description = "Map expression. Can be a constant, column, or function, and any combination of map operators."
57 )
58)]
59#[derive(Debug)]
60pub struct MapEntriesFunc {
61 signature: Signature,
62}
63
64impl Default for MapEntriesFunc {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70impl MapEntriesFunc {
71 pub fn new() -> Self {
72 Self {
73 signature: Signature::new(
74 TypeSignature::ArraySignature(ArrayFunctionSignature::MapArray),
75 Volatility::Immutable,
76 ),
77 }
78 }
79}
80
81impl ScalarUDFImpl for MapEntriesFunc {
82 fn as_any(&self) -> &dyn Any {
83 self
84 }
85
86 fn name(&self) -> &str {
87 "map_entries"
88 }
89
90 fn signature(&self) -> &Signature {
91 &self.signature
92 }
93
94 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
95 let [map_type] = take_function_args(self.name(), arg_types)?;
96 let map_fields = get_map_entry_field(map_type)?;
97 Ok(DataType::List(Arc::new(Field::new_list_field(
98 DataType::Struct(Fields::from(vec![
99 Field::new(
100 "key",
101 map_fields.first().unwrap().data_type().clone(),
102 false,
103 ),
104 Field::new(
105 "value",
106 map_fields.get(1).unwrap().data_type().clone(),
107 true,
108 ),
109 ])),
110 false,
111 ))))
112 }
113
114 fn invoke_with_args(
115 &self,
116 args: datafusion_expr::ScalarFunctionArgs,
117 ) -> Result<ColumnarValue> {
118 make_scalar_function(map_entries_inner)(&args.args)
119 }
120
121 fn documentation(&self) -> Option<&Documentation> {
122 self.doc()
123 }
124}
125
126fn map_entries_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
127 let [map_arg] = take_function_args("map_entries", args)?;
128
129 let map_array = match map_arg.data_type() {
130 DataType::Map(_, _) => as_map_array(&map_arg)?,
131 _ => return exec_err!("Argument for map_entries should be a map"),
132 };
133
134 Ok(Arc::new(ListArray::new(
135 Arc::new(Field::new_list_field(
136 DataType::Struct(Fields::from(vec![
137 Field::new("key", map_array.key_type().clone(), false),
138 Field::new("value", map_array.value_type().clone(), true),
139 ])),
140 false,
141 )),
142 map_array.offsets().clone(),
143 Arc::new(map_array.entries().clone()),
144 map_array.nulls().cloned(),
145 )))
146}