1use std::{ops::Range, sync::Arc};
5
6use arrow_array::{Array, ArrayRef, ListArray, MapArray};
7use arrow_schema::DataType;
8use futures::future::BoxFuture;
9use lance_arrow::deepcopy::deep_copy_nulls;
10use lance_arrow::list::ListArrayExt;
11use lance_core::{Error, Result};
12
13use crate::{
14 decoder::{
15 DecodedArray, FilterExpression, ScheduledScanLine, SchedulerContext,
16 StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
17 StructuralSchedulingJob,
18 },
19 encoder::{EncodeTask, FieldEncoder, OutOfLineBuffers},
20 repdef::RepDefBuilder,
21};
22
23pub struct MapStructuralEncoder {
29 keep_original_array: bool,
30 child: Box<dyn FieldEncoder>,
31}
32
33impl MapStructuralEncoder {
34 pub fn new(keep_original_array: bool, child: Box<dyn FieldEncoder>) -> Self {
35 Self {
36 keep_original_array,
37 child,
38 }
39 }
40}
41
42impl FieldEncoder for MapStructuralEncoder {
43 fn maybe_encode(
44 &mut self,
45 array: ArrayRef,
46 external_buffers: &mut OutOfLineBuffers,
47 mut repdef: RepDefBuilder,
48 row_number: u64,
49 num_rows: u64,
50 ) -> Result<Vec<EncodeTask>> {
51 let map_array = array
52 .as_any()
53 .downcast_ref::<MapArray>()
54 .expect("MapEncoder used for non-map data");
55
56 let has_garbage_values = if self.keep_original_array {
58 repdef.add_offsets(map_array.offsets().clone(), array.nulls().cloned())
59 } else {
60 repdef.add_offsets(map_array.offsets().clone(), deep_copy_nulls(array.nulls()))
61 };
62
63 let list_array: ListArray = map_array.clone().into();
65 let entries = if has_garbage_values {
66 list_array.filter_garbage_nulls().trimmed_values()
67 } else {
68 list_array.trimmed_values()
69 };
70
71 self.child
72 .maybe_encode(entries, external_buffers, repdef, row_number, num_rows)
73 }
74
75 fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
76 self.child.flush(external_buffers)
77 }
78
79 fn num_columns(&self) -> u32 {
80 self.child.num_columns()
81 }
82
83 fn finish(
84 &mut self,
85 external_buffers: &mut OutOfLineBuffers,
86 ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
87 self.child.finish(external_buffers)
88 }
89}
90
91#[derive(Debug)]
92pub struct StructuralMapScheduler {
93 child: Box<dyn StructuralFieldScheduler>,
94}
95
96impl StructuralMapScheduler {
97 pub fn new(child: Box<dyn StructuralFieldScheduler>) -> Self {
98 Self { child }
99 }
100}
101
102impl StructuralFieldScheduler for StructuralMapScheduler {
103 fn schedule_ranges<'a>(
104 &'a self,
105 ranges: &[Range<u64>],
106 filter: &FilterExpression,
107 ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
108 let child = self.child.schedule_ranges(ranges, filter)?;
109
110 Ok(Box::new(StructuralMapSchedulingJob::new(child)))
111 }
112
113 fn initialize<'a>(
114 &'a mut self,
115 filter: &'a FilterExpression,
116 context: &'a SchedulerContext,
117 ) -> BoxFuture<'a, Result<()>> {
118 self.child.initialize(filter, context)
119 }
120}
121
122#[derive(Debug)]
127struct StructuralMapSchedulingJob<'a> {
128 child: Box<dyn StructuralSchedulingJob + 'a>,
129}
130
131impl<'a> StructuralMapSchedulingJob<'a> {
132 fn new(child: Box<dyn StructuralSchedulingJob + 'a>) -> Self {
133 Self { child }
134 }
135}
136
137impl StructuralSchedulingJob for StructuralMapSchedulingJob<'_> {
138 fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>> {
139 self.child.schedule_next(context)
140 }
141}
142
143#[derive(Debug)]
144pub struct StructuralMapDecoder {
145 child: Box<dyn StructuralFieldDecoder>,
146 data_type: DataType,
147}
148
149impl StructuralMapDecoder {
150 pub fn new(child: Box<dyn StructuralFieldDecoder>, data_type: DataType) -> Self {
151 Self { child, data_type }
152 }
153}
154
155impl StructuralFieldDecoder for StructuralMapDecoder {
156 fn accept_page(&mut self, child: crate::decoder::LoadedPageShard) -> Result<()> {
157 self.child.accept_page(child)
158 }
159
160 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
161 let child_task = self.child.drain(num_rows)?;
162 Ok(Box::new(StructuralMapDecodeTask::new(
163 child_task,
164 self.data_type.clone(),
165 )))
166 }
167
168 fn data_type(&self) -> &DataType {
169 &self.data_type
170 }
171}
172
173#[derive(Debug)]
174struct StructuralMapDecodeTask {
175 child_task: Box<dyn StructuralDecodeArrayTask>,
176 data_type: DataType,
177}
178
179impl StructuralMapDecodeTask {
180 fn new(child_task: Box<dyn StructuralDecodeArrayTask>, data_type: DataType) -> Self {
181 Self {
182 child_task,
183 data_type,
184 }
185 }
186}
187
188impl StructuralDecodeArrayTask for StructuralMapDecodeTask {
189 fn decode(self: Box<Self>) -> Result<DecodedArray> {
190 let DecodedArray {
191 array,
192 mut repdef,
193 data_size,
194 } = self.child_task.decode()?;
195
196 let (offsets, validity) = repdef.unravel_offsets::<i32>()?;
198
199 let (entries_field, keys_sorted) = match &self.data_type {
201 DataType::Map(field, keys_sorted) => {
202 if *keys_sorted {
203 return Err(Error::not_supported_source(
204 "Map type decoder does not support keys_sorted=true now"
205 .to_string()
206 .into(),
207 ));
208 }
209 (field.clone(), *keys_sorted)
210 }
211 _ => {
212 return Err(Error::schema(
213 "Map decoder did not have a map field".to_string(),
214 ));
215 }
216 };
217
218 let entries = array
220 .as_any()
221 .downcast_ref::<arrow_array::StructArray>()
222 .ok_or_else(|| Error::schema("Map entries should be a StructArray".to_string()))?
223 .clone();
224
225 let map_array = MapArray::new(entries_field, offsets, entries, validity, keys_sorted);
227
228 Ok(DecodedArray {
229 array: Arc::new(map_array),
230 repdef,
231 data_size,
232 })
233 }
234}
235
#[cfg(test)]
mod tests {
    //! Round-trip tests for the map structural encoder/decoder, plus a check
    //! that Map columns are rejected for file version 2.1.

    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{
        Array, Int32Array, MapArray, StringArray, StructArray,
        builder::{Int32Builder, MapBuilder, StringBuilder},
    };
    use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
    use arrow_schema::{DataType, Field, Fields};

    use crate::encoder::{ColumnIndexSequence, EncodingOptions, default_encoding_strategy};
    use crate::{
        testing::{TestCases, check_round_trip_encoding_of_data},
        version::LanceFileVersion,
    };
    use arrow_schema::Field as ArrowField;
    use lance_core::datatypes::Field as LanceField;

    /// Builds a `DataType::Map` whose non-nullable `entries` struct holds a
    /// non-nullable `keys` child of `key_type` and a nullable `values` child of
    /// `value_type`, with `keys_sorted = false`. Presumably chosen to match the
    /// type produced by `MapBuilder::new(None, ..)` — confirm if field names change.
    fn make_map_type(key_type: DataType, value_type: DataType) -> DataType {
        let entries = Field::new(
            "entries",
            DataType::Struct(Fields::from(vec![
                Field::new("keys", key_type, false),
                Field::new("values", value_type, true),
            ])),
            false,
        );
        DataType::Map(Arc::new(entries), false)
    }

    /// Two non-empty maps with string keys and int values.
    #[test_log::test(tokio::test)]
    async fn test_simple_map() {
        let string_builder = StringBuilder::new();
        let int_builder = Int32Builder::new();
        let mut map_builder = MapBuilder::new(None, string_builder, int_builder);

        // Row 0: {key1: 10, key2: 20}
        map_builder.keys().append_value("key1");
        map_builder.values().append_value(10);
        map_builder.keys().append_value("key2");
        map_builder.values().append_value(20);
        map_builder.append(true).unwrap();

        // Row 1: {key3: 30}
        map_builder.keys().append_value("key3");
        map_builder.values().append_value(30);
        map_builder.append(true).unwrap();

        let map_array = map_builder.finish();

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_min_file_version(LanceFileVersion::V2_2);

        check_round_trip_encoding_of_data(vec![Arc::new(map_array)], &test_cases, HashMap::new())
            .await;
    }

    /// Mixes a non-empty map, valid empty maps and a null map, and reads the
    /// empty (index 1) and null (index 2) rows individually.
    #[test_log::test(tokio::test)]
    async fn test_empty_maps() {
        let string_builder = StringBuilder::new();
        let int_builder = Int32Builder::new();
        let mut map_builder = MapBuilder::new(None, string_builder, int_builder);

        // Row 0: {a: 1}
        map_builder.keys().append_value("a");
        map_builder.values().append_value(1);
        map_builder.append(true).unwrap();

        // Row 1: valid but empty map
        map_builder.append(true).unwrap();

        // Row 2: null map
        map_builder.append(false).unwrap();

        // Row 3: valid but empty map
        map_builder.append(true).unwrap();

        let map_array = map_builder.finish();

        let test_cases = TestCases::default()
            .with_range(0..4)
            .with_indices(vec![1])
            .with_indices(vec![2])
            .with_min_file_version(LanceFileVersion::V2_2);

        check_round_trip_encoding_of_data(vec![Arc::new(map_array)], &test_cases, HashMap::new())
            .await;
    }

    /// Maps whose values (not keys) contain nulls.
    #[test_log::test(tokio::test)]
    async fn test_map_with_null_values() {
        let string_builder = StringBuilder::new();
        let int_builder = Int32Builder::new();
        let mut map_builder = MapBuilder::new(None, string_builder, int_builder);

        // Row 0: {key1: 10, key2: null}
        map_builder.keys().append_value("key1");
        map_builder.values().append_value(10);
        map_builder.keys().append_value("key2");
        map_builder.values().append_null();
        map_builder.append(true).unwrap();

        // Row 1: {key3: null}
        map_builder.keys().append_value("key3");
        map_builder.values().append_null();
        map_builder.append(true).unwrap();

        let map_array = map_builder.finish();

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_indices(vec![0])
            .with_indices(vec![1])
            .with_min_file_version(LanceFileVersion::V2_2);

        check_round_trip_encoding_of_data(vec![Arc::new(map_array)], &test_cases, HashMap::new())
            .await;
    }

    /// A nullable map column (string -> string) nested inside a non-nullable struct.
    #[test_log::test(tokio::test)]
    async fn test_map_in_struct() {
        let string_key_builder = StringBuilder::new();
        let string_val_builder = StringBuilder::new();
        let mut map_builder = MapBuilder::new(None, string_key_builder, string_val_builder);

        // Row 0: {name: Alice, city: NYC}
        map_builder.keys().append_value("name");
        map_builder.values().append_value("Alice");
        map_builder.keys().append_value("city");
        map_builder.values().append_value("NYC");
        map_builder.append(true).unwrap();

        // Row 1: {name: Bob}
        map_builder.keys().append_value("name");
        map_builder.values().append_value("Bob");
        map_builder.append(true).unwrap();

        // Row 2: null map
        map_builder.append(false).unwrap();

        let map_array = Arc::new(map_builder.finish());
        let id_array = Arc::new(Int32Array::from(vec![1, 2, 3]));

        let struct_array = StructArray::new(
            Fields::from(vec![
                Field::new("id", DataType::Int32, false),
                Field::new(
                    "properties",
                    make_map_type(DataType::Utf8, DataType::Utf8),
                    true,
                ),
            ]),
            vec![id_array, map_array],
            None,
        );

        let test_cases = TestCases::default()
            .with_range(0..3)
            .with_indices(vec![0, 2])
            .with_min_file_version(LanceFileVersion::V2_2);

        check_round_trip_encoding_of_data(
            vec![Arc::new(struct_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;
    }

    /// A map inside a struct whose middle row is null. The map itself has no
    /// null buffer, so the middle map slot still references an entry
    /// ("garbage", 999) that lies under a null parent row. This presumably
    /// exercises the encoder's garbage-value filtering.
    #[test_log::test(tokio::test)]
    async fn test_map_in_nullable_struct() {
        let entries_fields = Fields::from(vec![
            Field::new("keys", DataType::Utf8, false),
            Field::new("values", DataType::Int32, true),
        ]);
        let entries_field = Arc::new(Field::new(
            "entries",
            DataType::Struct(entries_fields.clone()),
            false,
        ));
        let map_entries = StructArray::new(
            entries_fields,
            vec![
                Arc::new(StringArray::from(vec!["a", "garbage", "b"])),
                Arc::new(Int32Array::from(vec![1, 999, 2])),
            ],
            None,
        );
        // Three maps with one entry each; no map-level null buffer.
        let map_array: Arc<dyn Array> = Arc::new(MapArray::new(
            entries_field,
            OffsetBuffer::new(ScalarBuffer::from(vec![0, 1, 2, 3])),
            map_entries,
            None,
            false,
        ));

        let struct_array = StructArray::new(
            Fields::from(vec![
                Field::new("id", DataType::Int32, true),
                Field::new("props", map_array.data_type().clone(), true),
            ]),
            vec![
                Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
                map_array,
            ],
            // Row 1 of the struct is null, masking the "garbage" entry.
            Some(NullBuffer::from(vec![true, false, true])),
        );

        let test_cases = TestCases::default()
            .with_range(0..3)
            .with_min_file_version(LanceFileVersion::V2_2);

        check_round_trip_encoding_of_data(
            vec![Arc::new(struct_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;
    }

    /// A list column whose items are maps, including an empty list.
    #[test_log::test(tokio::test)]
    async fn test_list_of_maps() {
        use arrow_array::builder::ListBuilder;

        let string_builder = StringBuilder::new();
        let int_builder = Int32Builder::new();
        let map_builder = MapBuilder::new(None, string_builder, int_builder);
        let mut list_builder = ListBuilder::new(map_builder);

        // List row 0: [{a: 1}, {b: 2}]
        list_builder.values().keys().append_value("a");
        list_builder.values().values().append_value(1);
        list_builder.values().append(true).unwrap();

        list_builder.values().keys().append_value("b");
        list_builder.values().values().append_value(2);
        list_builder.values().append(true).unwrap();

        list_builder.append(true);

        // List row 1: [{c: 3}]
        list_builder.values().keys().append_value("c");
        list_builder.values().values().append_value(3);
        list_builder.values().append(true).unwrap();

        list_builder.append(true);

        // List row 2: [] (valid, no maps)
        list_builder.append(true);

        let list_array = list_builder.finish();

        let test_cases = TestCases::default()
            .with_range(0..3)
            .with_indices(vec![0, 2])
            .with_min_file_version(LanceFileVersion::V2_2);

        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
            .await;
    }

    /// A map whose values are themselves maps:
    /// one row {key1: {x: 10}, key2: {y: 20, z: 30}}.
    #[test_log::test(tokio::test)]
    async fn test_nested_map() {
        let inner_string_builder = StringBuilder::new();
        let inner_int_builder = Int32Builder::new();
        let mut inner_map_builder1 = MapBuilder::new(None, inner_string_builder, inner_int_builder);

        // Inner map 0: {x: 10}
        inner_map_builder1.keys().append_value("x");
        inner_map_builder1.values().append_value(10);
        inner_map_builder1.append(true).unwrap();

        // Inner map 1: {y: 20, z: 30}
        inner_map_builder1.keys().append_value("y");
        inner_map_builder1.values().append_value(20);
        inner_map_builder1.keys().append_value("z");
        inner_map_builder1.values().append_value(30);
        inner_map_builder1.append(true).unwrap();

        let inner_maps = Arc::new(inner_map_builder1.finish());

        let outer_keys = Arc::new(StringArray::from(vec!["key1", "key2"]));

        // Outer entries: key1 -> inner map 0, key2 -> inner map 1.
        let entries_struct = StructArray::new(
            Fields::from(vec![
                Field::new("key", DataType::Utf8, false),
                Field::new(
                    "value",
                    make_map_type(DataType::Utf8, DataType::Int32),
                    true,
                ),
            ]),
            vec![outer_keys, inner_maps],
            None,
        );

        // A single outer row spanning both entries.
        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 2]));
        let entries_field = Field::new("entries", entries_struct.data_type().clone(), false);

        let outer_map = MapArray::new(
            Arc::new(entries_field),
            offsets,
            entries_struct,
            None,
            false,
        );

        let test_cases = TestCases::default()
            .with_range(0..1)
            .with_min_file_version(LanceFileVersion::V2_2);

        check_round_trip_encoding_of_data(vec![Arc::new(outer_map)], &test_cases, HashMap::new())
            .await;
    }

    /// Int32 keys with string values (the reverse of the other tests).
    #[test_log::test(tokio::test)]
    async fn test_map_different_key_types() {
        let int_builder = Int32Builder::new();
        let string_builder = StringBuilder::new();
        let mut map_builder = MapBuilder::new(None, int_builder, string_builder);

        // Row 0: {1: one, 2: two}
        map_builder.keys().append_value(1);
        map_builder.values().append_value("one");
        map_builder.keys().append_value(2);
        map_builder.values().append_value("two");
        map_builder.append(true).unwrap();

        // Row 1: {3: three}
        map_builder.keys().append_value(3);
        map_builder.values().append_value("three");
        map_builder.append(true).unwrap();

        let map_array = map_builder.finish();

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_indices(vec![0, 1])
            .with_min_file_version(LanceFileVersion::V2_2);

        check_round_trip_encoding_of_data(vec![Arc::new(map_array)], &test_cases, HashMap::new())
            .await;
    }

    /// One map with 100 entries followed by an empty map.
    #[test_log::test(tokio::test)]
    async fn test_map_with_extreme_sizes() {
        let string_builder = StringBuilder::new();
        let int_builder = Int32Builder::new();
        let mut map_builder = MapBuilder::new(None, string_builder, int_builder);

        // Row 0: {key0: 0, key1: 1, ..., key99: 99}
        for i in 0..100 {
            map_builder.keys().append_value(format!("key{}", i));
            map_builder.values().append_value(i);
        }
        map_builder.append(true).unwrap();

        // Row 1: valid but empty map
        map_builder.append(true).unwrap();

        let map_array = map_builder.finish();

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_min_file_version(LanceFileVersion::V2_2);

        check_round_trip_encoding_of_data(vec![Arc::new(map_array)], &test_cases, HashMap::new())
            .await;
    }

    /// Every row is a null map, so there are no entries at all.
    #[test_log::test(tokio::test)]
    async fn test_map_all_null() {
        let string_builder = StringBuilder::new();
        let int_builder = Int32Builder::new();
        let mut map_builder = MapBuilder::new(None, string_builder, int_builder);

        map_builder.append(false).unwrap();
        map_builder.append(false).unwrap();
        let map_array = map_builder.finish();

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_min_file_version(LanceFileVersion::V2_2);

        check_round_trip_encoding_of_data(vec![Arc::new(map_array)], &test_cases, HashMap::new())
            .await;
    }

    /// Valid maps (one with a null value) around a null map, read both as a
    /// range and by index. This test does not itself toggle
    /// `keep_original_array`. Whether the round-trip harness exercises both
    /// settings is not visible here — NOTE(review): confirm.
    #[test_log::test(tokio::test)]
    async fn test_map_encoder_keep_original_array_scenarios() {
        let string_builder = StringBuilder::new();
        let int_builder = Int32Builder::new();
        let mut map_builder = MapBuilder::new(None, string_builder, int_builder);

        // Row 0: {key1: 10, key2: null}
        map_builder.keys().append_value("key1");
        map_builder.values().append_value(10);
        map_builder.keys().append_value("key2");
        map_builder.values().append_null();
        map_builder.append(true).unwrap();

        // Row 1: null map
        map_builder.append(false).unwrap();

        // Row 2: {key3: 30}
        map_builder.keys().append_value("key3");
        map_builder.values().append_value(30);
        map_builder.append(true).unwrap();

        let map_array = map_builder.finish();

        let test_cases = TestCases::default()
            .with_range(0..3)
            .with_indices(vec![0, 1, 2])
            .with_min_file_version(LanceFileVersion::V2_2);

        check_round_trip_encoding_of_data(vec![Arc::new(map_array)], &test_cases, HashMap::new())
            .await;
    }

    /// Creating a field encoder for a Map column with the V2_1 strategy must
    /// fail. The error message must mention "2.2" and "Map data type".
    #[test]
    fn test_map_not_supported_write_in_v2_1() {
        let map_arrow_field = ArrowField::new(
            "map_field",
            make_map_type(DataType::Utf8, DataType::Int32),
            true,
        );
        let map_field = LanceField::try_from(&map_arrow_field).unwrap();

        let encoder_strategy = default_encoding_strategy(LanceFileVersion::V2_1);
        let mut column_index = ColumnIndexSequence::default();
        let options = EncodingOptions::default();

        let encoder_result = encoder_strategy.create_field_encoder(
            encoder_strategy.as_ref(),
            &map_field,
            &mut column_index,
            &options,
        );

        assert!(
            encoder_result.is_err(),
            "Map type should not be supported in V2_1 for encoder"
        );
        let Err(encoder_err) = encoder_result else {
            panic!("Expected error but got Ok")
        };

        let encoder_err_msg = format!("{}", encoder_err);
        assert!(
            encoder_err_msg.contains("2.2"),
            "Encoder error message should mention version 2.2, got: {}",
            encoder_err_msg
        );
        assert!(
            encoder_err_msg.contains("Map data type"),
            "Encoder error message should mention Map data type, got: {}",
            encoder_err_msg
        );
    }
}