datafusion_spark/function/map/
map_from_entries.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19
20use crate::function::map::utils::{
21    get_element_type, get_list_offsets, get_list_values,
22    map_from_keys_values_offsets_nulls, map_type_from_key_value_types,
23};
24use arrow::array::{Array, ArrayRef, NullBufferBuilder, StructArray};
25use arrow::buffer::NullBuffer;
26use arrow::datatypes::DataType;
27use datafusion_common::utils::take_function_args;
28use datafusion_common::{exec_err, Result};
29use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
30use datafusion_functions::utils::make_scalar_function;
31
32/// Spark-compatible `map_from_entries` expression
33/// <https://spark.apache.org/docs/latest/api/sql/index.html#map_from_entries>
34#[derive(Debug, PartialEq, Eq, Hash)]
35pub struct MapFromEntries {
36    signature: Signature,
37}
38
39impl Default for MapFromEntries {
40    fn default() -> Self {
41        Self::new()
42    }
43}
44
45impl MapFromEntries {
46    pub fn new() -> Self {
47        Self {
48            signature: Signature::array(Volatility::Immutable),
49        }
50    }
51}
52
53impl ScalarUDFImpl for MapFromEntries {
54    fn as_any(&self) -> &dyn Any {
55        self
56    }
57
58    fn name(&self) -> &str {
59        "map_from_entries"
60    }
61
62    fn signature(&self) -> &Signature {
63        &self.signature
64    }
65
66    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
67        let [entries_type] = take_function_args("map_from_entries", arg_types)?;
68        let entries_element_type = get_element_type(entries_type)?;
69        let (keys_type, values_type) = match entries_element_type {
70            DataType::Struct(fields) if fields.len() == 2 => {
71                Ok((fields[0].data_type(), fields[1].data_type()))
72            }
73            wrong_type => exec_err!(
74                "map_from_entries: expected array<struct<key, value>>, got {:?}",
75                wrong_type
76            ),
77        }?;
78        Ok(map_type_from_key_value_types(keys_type, values_type))
79    }
80
81    fn invoke_with_args(
82        &self,
83        args: datafusion_expr::ScalarFunctionArgs,
84    ) -> Result<ColumnarValue> {
85        make_scalar_function(map_from_entries_inner, vec![])(&args.args)
86    }
87}
88
89fn map_from_entries_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
90    let [entries] = take_function_args("map_from_entries", args)?;
91    let entries_offsets = get_list_offsets(entries)?;
92    let entries_values = get_list_values(entries)?;
93
94    let (flat_keys, flat_values) =
95        match entries_values.as_any().downcast_ref::<StructArray>() {
96            Some(a) => Ok((a.column(0), a.column(1))),
97            None => exec_err!(
98                "map_from_entries: expected array<struct<key, value>>, got {:?}",
99                entries_values.data_type()
100            ),
101        }?;
102
103    let entries_with_nulls = entries_values.nulls().and_then(|entries_inner_nulls| {
104        let mut builder = NullBufferBuilder::new_with_len(0);
105        let mut cur_offset = entries_offsets
106            .first()
107            .map(|offset| *offset as usize)
108            .unwrap_or(0);
109
110        for next_offset in entries_offsets.iter().skip(1) {
111            let num_entries = *next_offset as usize - cur_offset;
112            builder.append(
113                entries_inner_nulls
114                    .slice(cur_offset, num_entries)
115                    .null_count()
116                    == 0,
117            );
118            cur_offset = *next_offset as usize;
119        }
120        builder.finish()
121    });
122
123    let res_nulls = NullBuffer::union(entries.nulls(), entries_with_nulls.as_ref());
124
125    map_from_keys_values_offsets_nulls(
126        flat_keys,
127        flat_values,
128        &entries_offsets,
129        &entries_offsets,
130        None,
131        res_nulls.as_ref(),
132    )
133}