datafusion_spark/function/map/
map_from_entries.rs1use std::any::Any;
19
20use crate::function::map::utils::{
21 get_element_type, get_list_offsets, get_list_values,
22 map_from_keys_values_offsets_nulls, map_type_from_key_value_types,
23};
24use arrow::array::{Array, ArrayRef, NullBufferBuilder, StructArray};
25use arrow::buffer::NullBuffer;
26use arrow::datatypes::DataType;
27use datafusion_common::utils::take_function_args;
28use datafusion_common::{exec_err, Result};
29use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
30use datafusion_functions::utils::make_scalar_function;
31
32#[derive(Debug, PartialEq, Eq, Hash)]
35pub struct MapFromEntries {
36 signature: Signature,
37}
38
39impl Default for MapFromEntries {
40 fn default() -> Self {
41 Self::new()
42 }
43}
44
45impl MapFromEntries {
46 pub fn new() -> Self {
47 Self {
48 signature: Signature::array(Volatility::Immutable),
49 }
50 }
51}
52
53impl ScalarUDFImpl for MapFromEntries {
54 fn as_any(&self) -> &dyn Any {
55 self
56 }
57
58 fn name(&self) -> &str {
59 "map_from_entries"
60 }
61
62 fn signature(&self) -> &Signature {
63 &self.signature
64 }
65
66 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
67 let [entries_type] = take_function_args("map_from_entries", arg_types)?;
68 let entries_element_type = get_element_type(entries_type)?;
69 let (keys_type, values_type) = match entries_element_type {
70 DataType::Struct(fields) if fields.len() == 2 => {
71 Ok((fields[0].data_type(), fields[1].data_type()))
72 }
73 wrong_type => exec_err!(
74 "map_from_entries: expected array<struct<key, value>>, got {:?}",
75 wrong_type
76 ),
77 }?;
78 Ok(map_type_from_key_value_types(keys_type, values_type))
79 }
80
81 fn invoke_with_args(
82 &self,
83 args: datafusion_expr::ScalarFunctionArgs,
84 ) -> Result<ColumnarValue> {
85 make_scalar_function(map_from_entries_inner, vec![])(&args.args)
86 }
87}
88
89fn map_from_entries_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
90 let [entries] = take_function_args("map_from_entries", args)?;
91 let entries_offsets = get_list_offsets(entries)?;
92 let entries_values = get_list_values(entries)?;
93
94 let (flat_keys, flat_values) =
95 match entries_values.as_any().downcast_ref::<StructArray>() {
96 Some(a) => Ok((a.column(0), a.column(1))),
97 None => exec_err!(
98 "map_from_entries: expected array<struct<key, value>>, got {:?}",
99 entries_values.data_type()
100 ),
101 }?;
102
103 let entries_with_nulls = entries_values.nulls().and_then(|entries_inner_nulls| {
104 let mut builder = NullBufferBuilder::new_with_len(0);
105 let mut cur_offset = entries_offsets
106 .first()
107 .map(|offset| *offset as usize)
108 .unwrap_or(0);
109
110 for next_offset in entries_offsets.iter().skip(1) {
111 let num_entries = *next_offset as usize - cur_offset;
112 builder.append(
113 entries_inner_nulls
114 .slice(cur_offset, num_entries)
115 .null_count()
116 == 0,
117 );
118 cur_offset = *next_offset as usize;
119 }
120 builder.finish()
121 });
122
123 let res_nulls = NullBuffer::union(entries.nulls(), entries_with_nulls.as_ref());
124
125 map_from_keys_values_offsets_nulls(
126 flat_keys,
127 flat_values,
128 &entries_offsets,
129 &entries_offsets,
130 None,
131 res_nulls.as_ref(),
132 )
133}