1use std::any::Any;
19use std::collections::VecDeque;
20use std::sync::Arc;
21
22use arrow::array::{Array, ArrayData, ArrayRef, MapArray, OffsetSizeTrait, StructArray};
23use arrow::buffer::Buffer;
24use arrow::datatypes::{DataType, Field, SchemaBuilder, ToByteSlice};
25
26use datafusion_common::utils::{fixed_size_list_to_arrays, list_to_arrays};
27use datafusion_common::{
28 HashSet, Result, ScalarValue, exec_err, utils::take_function_args,
29};
30use datafusion_expr::expr::ScalarFunction;
31use datafusion_expr::{
32 ColumnarValue, Documentation, Expr, ScalarUDFImpl, Signature, Volatility,
33};
34use datafusion_macros::user_doc;
35
36use crate::make_array::make_array;
37
38pub fn map(keys: Vec<Expr>, values: Vec<Expr>) -> Expr {
40 let keys = make_array(keys);
41 let values = make_array(values);
42 Expr::ScalarFunction(ScalarFunction::new_udf(map_udf(), vec![keys, values]))
43}
44
// Expands the crate's `create_func!` macro for `MapFunc`; the `map` expression
// builder in this file obtains the UDF through the `map_udf` name passed here.
// NOTE(review): the macro body is not visible in this file — presumably it
// defines a shared `map_udf()` accessor; confirm against the macro definition.
create_func!(MapFunc, map_udf);
46
47fn can_evaluate_to_const(args: &[ColumnarValue]) -> bool {
55 args.iter()
56 .all(|arg| matches!(arg, ColumnarValue::Scalar(_)))
57}
58
59fn make_map_batch(args: &[ColumnarValue]) -> Result<ColumnarValue> {
60 let [keys_arg, values_arg] = take_function_args("make_map", args)?;
61
62 let can_evaluate_to_const = can_evaluate_to_const(args);
63
64 let keys = get_first_array_ref(keys_arg)?;
65 let key_array = keys.as_ref();
66
67 match keys_arg {
68 ColumnarValue::Array(_) => {
69 let row_keys = match key_array.data_type() {
70 DataType::List(_) => list_to_arrays::<i32>(&keys),
71 DataType::LargeList(_) => list_to_arrays::<i64>(&keys),
72 DataType::FixedSizeList(_, _) => fixed_size_list_to_arrays(&keys),
73 data_type => {
74 return exec_err!(
75 "Expected list, large_list or fixed_size_list, got {:?}",
76 data_type
77 );
78 }
79 };
80
81 row_keys
82 .iter()
83 .try_for_each(|key| validate_map_keys(key.as_ref()))?;
84 }
85 ColumnarValue::Scalar(_) => {
86 validate_map_keys(key_array)?;
87 }
88 }
89
90 let values = get_first_array_ref(values_arg)?;
91
92 make_map_batch_internal(&keys, &values, can_evaluate_to_const, &keys_arg.data_type())
93}
94
95fn validate_map_keys(array: &dyn Array) -> Result<()> {
97 let mut seen_keys = HashSet::with_capacity(array.len());
98
99 for i in 0..array.len() {
100 let key = ScalarValue::try_from_array(array, i)?;
101
102 if key.is_null() {
104 return exec_err!("map key cannot be null");
105 }
106
107 if seen_keys.contains(&key) {
109 return exec_err!("map key must be unique, duplicate key found: {}", key);
110 }
111 seen_keys.insert(key);
112 }
113 Ok(())
114}
115
116fn get_first_array_ref(columnar_value: &ColumnarValue) -> Result<ArrayRef> {
117 match columnar_value {
118 ColumnarValue::Scalar(value) => match value {
119 ScalarValue::List(array) => Ok(array.value(0)),
120 ScalarValue::LargeList(array) => Ok(array.value(0)),
121 ScalarValue::FixedSizeList(array) => Ok(array.value(0)),
122 _ => exec_err!("Expected array, got {:?}", value),
123 },
124 ColumnarValue::Array(array) => Ok(array.to_owned()),
125 }
126}
127
128fn make_map_batch_internal(
129 keys: &ArrayRef,
130 values: &ArrayRef,
131 can_evaluate_to_const: bool,
132 data_type: &DataType,
133) -> Result<ColumnarValue> {
134 if keys.len() != values.len() {
135 return exec_err!("map requires key and value lists to have the same length");
136 }
137
138 if !can_evaluate_to_const || keys.null_count() > 0 {
142 return match data_type {
143 DataType::LargeList(..) => make_map_array_internal::<i64>(keys, values),
144 DataType::List(..) => make_map_array_internal::<i32>(keys, values),
145 DataType::FixedSizeList(..) => {
146 make_map_array_from_fixed_size_list(keys, values)
148 }
149 _ => exec_err!(
150 "Expected List, LargeList, or FixedSizeList, got {:?}",
151 data_type
152 ),
153 };
154 }
155
156 let key_field = Arc::new(Field::new("key", keys.data_type().clone(), false));
157 let value_field = Arc::new(Field::new("value", values.data_type().clone(), true));
158 let mut entry_struct_buffer: VecDeque<(Arc<Field>, ArrayRef)> = VecDeque::new();
159 let mut entry_offsets_buffer = VecDeque::new();
160 entry_offsets_buffer.push_back(0);
161
162 entry_struct_buffer.push_back((Arc::clone(&key_field), Arc::clone(keys)));
163 entry_struct_buffer.push_back((Arc::clone(&value_field), Arc::clone(values)));
164 entry_offsets_buffer.push_back(keys.len() as u32);
165
166 let entry_struct: Vec<(Arc<Field>, ArrayRef)> = entry_struct_buffer.into();
167 let entry_struct = StructArray::from(entry_struct);
168
169 let map_data_type = DataType::Map(
170 Arc::new(Field::new(
171 "entries",
172 entry_struct.data_type().clone(),
173 false,
174 )),
175 false,
176 );
177
178 let entry_offsets: Vec<u32> = entry_offsets_buffer.into();
179 let entry_offsets_buffer = Buffer::from(entry_offsets.to_byte_slice());
180
181 let map_data = ArrayData::builder(map_data_type)
182 .len(entry_offsets.len() - 1)
183 .add_buffer(entry_offsets_buffer)
184 .add_child_data(entry_struct.to_data())
185 .build()?;
186 let map_array = Arc::new(MapArray::from(map_data));
187
188 Ok(if can_evaluate_to_const {
189 ColumnarValue::Scalar(ScalarValue::try_from_array(map_array.as_ref(), 0)?)
190 } else {
191 ColumnarValue::Array(map_array)
192 })
193}
194
#[user_doc(
    doc_section(label = "Map Functions"),
    description = "Returns an Arrow map with the specified key-value pairs.\n\n\
    The `make_map` function creates a map from two lists: one for keys and one for values. Each key must be unique and non-null.",
    syntax_example = "map(key, value)\nmap(key: value)\nmake_map(['key1', 'key2'], ['value1', 'value2'])",
    sql_example = r#"
```sql
-- Using map function
SELECT MAP('type', 'test');
----
{type: test}

SELECT MAP(['POST', 'HEAD', 'PATCH'], [41, 33, null]);
----
{POST: 41, HEAD: 33, PATCH: NULL}

SELECT MAP([[1,2], [3,4]], ['a', 'b']);
----
{[1, 2]: a, [3, 4]: b}

SELECT MAP { 'a': 1, 'b': 2 };
----
{a: 1, b: 2}

-- Using make_map function
SELECT MAKE_MAP(['POST', 'HEAD'], [41, 33]);
----
{POST: 41, HEAD: 33}

SELECT MAKE_MAP(['key1', 'key2'], ['value1', null]);
----
{key1: value1, key2: }
```"#,
    argument(
        name = "key",
        description = "For `map`: Expression to be used for key. Can be a constant, column, function, or any combination of arithmetic or string operators.\n\
                        For `make_map`: The list of keys to be used in the map. Each key must be unique and non-null."
    ),
    argument(
        name = "value",
        description = "For `map`: Expression to be used for value. Can be a constant, column, function, or any combination of arithmetic or string operators.\n\
                        For `make_map`: The list of values to be mapped to the corresponding keys."
    )
)]
/// Scalar UDF registered under the name `map`.
///
/// Its `ScalarUDFImpl` takes a list of keys and a list of values and returns
/// an Arrow `Map` whose entries are a non-nullable `key` field and a nullable
/// `value` field.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct MapFunc {
    /// Signature reported to the planner; set in `MapFunc::new`.
    signature: Signature,
}
243
244impl Default for MapFunc {
245 fn default() -> Self {
246 Self::new()
247 }
248}
249
250impl MapFunc {
251 pub fn new() -> Self {
252 Self {
253 signature: Signature::variadic_any(Volatility::Immutable),
254 }
255 }
256}
257
258impl ScalarUDFImpl for MapFunc {
259 fn as_any(&self) -> &dyn Any {
260 self
261 }
262
263 fn name(&self) -> &str {
264 "map"
265 }
266
267 fn signature(&self) -> &Signature {
268 &self.signature
269 }
270
271 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
272 let [keys_arg, values_arg] = take_function_args(self.name(), arg_types)?;
273 let mut builder = SchemaBuilder::new();
274 builder.push(Field::new(
275 "key",
276 get_element_type(keys_arg)?.clone(),
277 false,
278 ));
279 builder.push(Field::new(
280 "value",
281 get_element_type(values_arg)?.clone(),
282 true,
283 ));
284 let fields = builder.finish().fields;
285 Ok(DataType::Map(
286 Arc::new(Field::new("entries", DataType::Struct(fields), false)),
287 false,
288 ))
289 }
290
291 fn invoke_with_args(
292 &self,
293 args: datafusion_expr::ScalarFunctionArgs,
294 ) -> Result<ColumnarValue> {
295 make_map_batch(&args.args)
296 }
297
298 fn documentation(&self) -> Option<&Documentation> {
299 self.doc()
300 }
301}
302
303fn get_element_type(data_type: &DataType) -> Result<&DataType> {
304 match data_type {
305 DataType::List(element) => Ok(element.data_type()),
306 DataType::LargeList(element) => Ok(element.data_type()),
307 DataType::FixedSizeList(element, _) => Ok(element.data_type()),
308 _ => exec_err!(
309 "Expected list, large_list or fixed_size_list, got {:?}",
310 data_type
311 ),
312 }
313}
314
315fn make_map_array_internal<O: OffsetSizeTrait>(
371 keys: &ArrayRef,
372 values: &ArrayRef,
373) -> Result<ColumnarValue> {
374 let keys_data_type = keys.data_type().clone();
376 let values_data_type = values.data_type().clone();
377 let original_len = keys.len(); let nulls_bitmap = keys.nulls().cloned();
382
383 let keys = list_to_arrays::<O>(keys);
384 let values = list_to_arrays::<O>(values);
385
386 build_map_array(
387 &keys,
388 &values,
389 &keys_data_type,
390 &values_data_type,
391 original_len,
392 nulls_bitmap,
393 )
394}
395
396fn make_map_array_from_fixed_size_list(
399 keys: &ArrayRef,
400 values: &ArrayRef,
401) -> Result<ColumnarValue> {
402 let keys_data_type = keys.data_type().clone();
404 let values_data_type = values.data_type().clone();
405 let original_len = keys.len();
406
407 let nulls_bitmap = keys.nulls().cloned();
409
410 let keys = fixed_size_list_to_arrays(keys);
411 let values = fixed_size_list_to_arrays(values);
412
413 build_map_array(
414 &keys,
415 &values,
416 &keys_data_type,
417 &values_data_type,
418 original_len,
419 nulls_bitmap,
420 )
421}
422
423fn build_map_array(
425 keys: &[ArrayRef],
426 values: &[ArrayRef],
427 keys_data_type: &DataType,
428 values_data_type: &DataType,
429 original_len: usize,
430 nulls_bitmap: Option<arrow::buffer::NullBuffer>,
431) -> Result<ColumnarValue> {
432 let mut key_array_vec = vec![];
433 let mut value_array_vec = vec![];
434 for (k, v) in keys.iter().zip(values.iter()) {
435 key_array_vec.push(k.as_ref());
436 value_array_vec.push(v.as_ref());
437 }
438
439 let mut running_offset = 0i32;
444 let mut offset_buffer = vec![running_offset];
445 let mut non_null_idx = 0;
446 for i in 0..original_len {
447 let is_null = nulls_bitmap.as_ref().is_some_and(|nulls| nulls.is_null(i));
448 if !is_null {
449 let entry_count = keys[non_null_idx].len();
450 let entry_count_i32 = i32::try_from(entry_count).map_err(|_| {
452 datafusion_common::DataFusionError::Execution(format!(
453 "Map offset overflow: entry count {entry_count} at index {i} exceeds i32::MAX",
454 ))
455 })?;
456 running_offset =
457 running_offset.checked_add(entry_count_i32).ok_or_else(|| {
458 datafusion_common::DataFusionError::Execution(format!(
459 "Map offset overflow: cumulative offset exceeds i32::MAX at index {i}",
460 ))
461 })?;
462 non_null_idx += 1;
463 }
464 offset_buffer.push(running_offset);
465 }
466
467 let (flattened_keys, flattened_values) = if key_array_vec.is_empty() {
471 let key_type = get_element_type(keys_data_type)?;
474 let value_type = get_element_type(values_data_type)?;
475
476 (
477 arrow::array::new_empty_array(key_type),
478 arrow::array::new_empty_array(value_type),
479 )
480 } else {
481 let flattened_keys = arrow::compute::concat(key_array_vec.as_ref())?;
482 if flattened_keys.null_count() > 0 {
483 return exec_err!("keys cannot be null");
484 }
485 let flattened_values = arrow::compute::concat(value_array_vec.as_ref())?;
486 (flattened_keys, flattened_values)
487 };
488
489 let fields = vec![
490 Arc::new(Field::new("key", flattened_keys.data_type().clone(), false)),
491 Arc::new(Field::new(
492 "value",
493 flattened_values.data_type().clone(),
494 true,
495 )),
496 ];
497
498 let struct_data = ArrayData::builder(DataType::Struct(fields.into()))
499 .len(flattened_keys.len())
500 .add_child_data(flattened_keys.to_data())
501 .add_child_data(flattened_values.to_data())
502 .build()?;
503
504 let mut map_data_builder = ArrayData::builder(DataType::Map(
505 Arc::new(Field::new(
506 "entries",
507 struct_data.data_type().clone(),
508 false,
509 )),
510 false,
511 ))
512 .len(original_len) .add_child_data(struct_data)
514 .add_buffer(Buffer::from_slice_ref(offset_buffer.as_slice()));
515
516 if let Some(nulls) = nulls_bitmap {
518 map_data_builder = map_data_builder.nulls(Some(nulls));
519 }
520
521 let map_data = map_data_builder.build()?;
522 Ok(ColumnarValue::Array(Arc::new(MapArray::from(map_data))))
523}
524
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        FixedSizeListBuilder, Int32Builder, LargeListBuilder, ListBuilder,
        StringBuilder,
    };

    /// Unwraps a `make_map_batch` result that is expected to be an array.
    fn expect_array(result: Result<ColumnarValue>) -> ArrayRef {
        match result.unwrap() {
            ColumnarValue::Array(arr) => arr,
            _ => panic!("Expected Array result"),
        }
    }

    /// A NULL key list in the middle of the batch yields a NULL map for that
    /// row while its neighbours stay valid.
    #[test]
    fn test_make_map_with_null_maps() {
        // Key rows: ["a"], NULL, ["b"].
        let mut key_builder = ListBuilder::new(StringBuilder::new());
        for row in [Some("a"), None, Some("b")] {
            if let Some(key) = row {
                key_builder.values().append_value(key);
            }
            key_builder.append(row.is_some());
        }
        let keys_array = Arc::new(key_builder.finish());

        // Value rows: [1], [2], [3].
        let mut value_builder = ListBuilder::new(Int32Builder::new());
        for value in [1, 2, 3] {
            value_builder.values().append_value(value);
            value_builder.append(true);
        }
        let values_array = Arc::new(value_builder.finish());

        let result = make_map_batch(&[
            ColumnarValue::Array(keys_array),
            ColumnarValue::Array(values_array),
        ]);

        assert!(result.is_ok(), "Should handle NULL maps correctly");

        let map_array = expect_array(result);

        assert_eq!(map_array.len(), 3, "Should have 3 maps");
        assert!(!map_array.is_null(0), "First map should not be NULL");
        assert!(map_array.is_null(1), "Second map should be NULL");
        assert!(!map_array.is_null(2), "Third map should not be NULL");
    }

    /// A NULL key inside an otherwise valid key list is rejected.
    #[test]
    fn test_make_map_with_null_key_within_map_should_fail() {
        // Single key row: ["a", NULL, "b"].
        let mut key_builder = ListBuilder::new(StringBuilder::new());
        for key in [Some("a"), None, Some("b")] {
            match key {
                Some(k) => key_builder.values().append_value(k),
                None => key_builder.values().append_null(),
            }
        }
        key_builder.append(true);
        let keys_array = Arc::new(key_builder.finish());

        // Single value row: [1, 2, 3].
        let mut value_builder = ListBuilder::new(Int32Builder::new());
        for value in [1, 2, 3] {
            value_builder.values().append_value(value);
        }
        value_builder.append(true);
        let values_array = Arc::new(value_builder.finish());

        let result = make_map_batch(&[
            ColumnarValue::Array(keys_array),
            ColumnarValue::Array(values_array),
        ]);

        assert!(result.is_err(), "Should reject null keys within maps");

        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("cannot be null"),
            "Error should mention null keys, got: {err_msg}"
        );
    }

    /// `LargeList` inputs are accepted and produce one map per row.
    #[test]
    fn test_make_map_with_large_list() {
        // Key rows: ["a", "b"], ["c"].
        let key_rows: [&[&str]; 2] = [&["a", "b"], &["c"]];
        let mut key_builder = LargeListBuilder::new(StringBuilder::new());
        for row in key_rows {
            for &key in row {
                key_builder.values().append_value(key);
            }
            key_builder.append(true);
        }
        let keys_array = Arc::new(key_builder.finish());

        // Value rows: [1, 2], [3].
        let value_rows: [&[i32]; 2] = [&[1, 2], &[3]];
        let mut value_builder = LargeListBuilder::new(Int32Builder::new());
        for row in value_rows {
            for &value in row {
                value_builder.values().append_value(value);
            }
            value_builder.append(true);
        }
        let values_array = Arc::new(value_builder.finish());

        let result = make_map_batch(&[
            ColumnarValue::Array(keys_array),
            ColumnarValue::Array(values_array),
        ]);

        assert!(
            result.is_ok(),
            "Should handle LargeList inputs correctly: {:?}",
            result.err()
        );

        let map_array = expect_array(result);

        assert_eq!(map_array.len(), 2, "Should have 2 maps");
        assert!(!map_array.is_null(0), "First map should not be NULL");
        assert!(!map_array.is_null(1), "Second map should not be NULL");
    }

    /// `FixedSizeList` inputs are accepted and produce one map per row.
    #[test]
    fn test_make_map_with_fixed_size_list() {
        // Key rows (size 2): ["a", "b"], ["c", "d"].
        let mut key_builder = FixedSizeListBuilder::new(StringBuilder::new(), 2);
        for row in [["a", "b"], ["c", "d"]] {
            for key in row {
                key_builder.values().append_value(key);
            }
            key_builder.append(true);
        }
        let keys_array = Arc::new(key_builder.finish());

        // Value rows (size 2): [1, 2], [3, 4].
        let mut value_builder = FixedSizeListBuilder::new(Int32Builder::new(), 2);
        for row in [[1, 2], [3, 4]] {
            for value in row {
                value_builder.values().append_value(value);
            }
            value_builder.append(true);
        }
        let values_array = Arc::new(value_builder.finish());

        let result = make_map_batch(&[
            ColumnarValue::Array(keys_array),
            ColumnarValue::Array(values_array),
        ]);

        assert!(
            result.is_ok(),
            "Should handle FixedSizeList inputs correctly: {:?}",
            result.err()
        );

        let map_array = expect_array(result);

        assert_eq!(map_array.len(), 2, "Should have 2 maps");
        assert!(!map_array.is_null(0), "First map should not be NULL");
        assert!(!map_array.is_null(1), "Second map should not be NULL");
    }
}