1use std::{ops::Range, sync::Arc};
5
6use arrow_array::{Array, ArrayRef, ListArray, MapArray};
7use arrow_schema::DataType;
8use futures::future::BoxFuture;
9use lance_arrow::deepcopy::deep_copy_nulls;
10use lance_arrow::list::ListArrayExt;
11use lance_core::{Error, Result};
12use snafu::location;
13
14use crate::{
15 decoder::{
16 DecodedArray, FilterExpression, ScheduledScanLine, SchedulerContext,
17 StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
18 StructuralSchedulingJob,
19 },
20 encoder::{EncodeTask, FieldEncoder, OutOfLineBuffers},
21 repdef::RepDefBuilder,
22};
23
24pub struct MapStructuralEncoder {
30 keep_original_array: bool,
31 child: Box<dyn FieldEncoder>,
32}
33
34impl MapStructuralEncoder {
35 pub fn new(keep_original_array: bool, child: Box<dyn FieldEncoder>) -> Self {
36 Self {
37 keep_original_array,
38 child,
39 }
40 }
41}
42
43impl FieldEncoder for MapStructuralEncoder {
44 fn maybe_encode(
45 &mut self,
46 array: ArrayRef,
47 external_buffers: &mut OutOfLineBuffers,
48 mut repdef: RepDefBuilder,
49 row_number: u64,
50 num_rows: u64,
51 ) -> Result<Vec<EncodeTask>> {
52 let map_array = array
53 .as_any()
54 .downcast_ref::<MapArray>()
55 .expect("MapEncoder used for non-map data");
56
57 let has_garbage_values = if self.keep_original_array {
59 repdef.add_offsets(map_array.offsets().clone(), array.nulls().cloned())
60 } else {
61 repdef.add_offsets(map_array.offsets().clone(), deep_copy_nulls(array.nulls()))
62 };
63
64 let list_array: ListArray = map_array.clone().into();
66 let entries = if has_garbage_values {
67 list_array.filter_garbage_nulls().trimmed_values()
68 } else {
69 list_array.trimmed_values()
70 };
71
72 self.child
73 .maybe_encode(entries, external_buffers, repdef, row_number, num_rows)
74 }
75
76 fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
77 self.child.flush(external_buffers)
78 }
79
80 fn num_columns(&self) -> u32 {
81 self.child.num_columns()
82 }
83
84 fn finish(
85 &mut self,
86 external_buffers: &mut OutOfLineBuffers,
87 ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
88 self.child.finish(external_buffers)
89 }
90}
91
92#[derive(Debug)]
93pub struct StructuralMapScheduler {
94 child: Box<dyn StructuralFieldScheduler>,
95}
96
97impl StructuralMapScheduler {
98 pub fn new(child: Box<dyn StructuralFieldScheduler>) -> Self {
99 Self { child }
100 }
101}
102
103impl StructuralFieldScheduler for StructuralMapScheduler {
104 fn schedule_ranges<'a>(
105 &'a self,
106 ranges: &[Range<u64>],
107 filter: &FilterExpression,
108 ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
109 let child = self.child.schedule_ranges(ranges, filter)?;
110
111 Ok(Box::new(StructuralMapSchedulingJob::new(child)))
112 }
113
114 fn initialize<'a>(
115 &'a mut self,
116 filter: &'a FilterExpression,
117 context: &'a SchedulerContext,
118 ) -> BoxFuture<'a, Result<()>> {
119 self.child.initialize(filter, context)
120 }
121}
122
123#[derive(Debug)]
128struct StructuralMapSchedulingJob<'a> {
129 child: Box<dyn StructuralSchedulingJob + 'a>,
130}
131
132impl<'a> StructuralMapSchedulingJob<'a> {
133 fn new(child: Box<dyn StructuralSchedulingJob + 'a>) -> Self {
134 Self { child }
135 }
136}
137
138impl StructuralSchedulingJob for StructuralMapSchedulingJob<'_> {
139 fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>> {
140 self.child.schedule_next(context)
141 }
142}
143
144#[derive(Debug)]
145pub struct StructuralMapDecoder {
146 child: Box<dyn StructuralFieldDecoder>,
147 data_type: DataType,
148}
149
150impl StructuralMapDecoder {
151 pub fn new(child: Box<dyn StructuralFieldDecoder>, data_type: DataType) -> Self {
152 Self { child, data_type }
153 }
154}
155
156impl StructuralFieldDecoder for StructuralMapDecoder {
157 fn accept_page(&mut self, child: crate::decoder::LoadedPageShard) -> Result<()> {
158 self.child.accept_page(child)
159 }
160
161 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
162 let child_task = self.child.drain(num_rows)?;
163 Ok(Box::new(StructuralMapDecodeTask::new(
164 child_task,
165 self.data_type.clone(),
166 )))
167 }
168
169 fn data_type(&self) -> &DataType {
170 &self.data_type
171 }
172}
173
174#[derive(Debug)]
175struct StructuralMapDecodeTask {
176 child_task: Box<dyn StructuralDecodeArrayTask>,
177 data_type: DataType,
178}
179
180impl StructuralMapDecodeTask {
181 fn new(child_task: Box<dyn StructuralDecodeArrayTask>, data_type: DataType) -> Self {
182 Self {
183 child_task,
184 data_type,
185 }
186 }
187}
188
189impl StructuralDecodeArrayTask for StructuralMapDecodeTask {
190 fn decode(self: Box<Self>) -> Result<DecodedArray> {
191 let DecodedArray { array, mut repdef } = self.child_task.decode()?;
192
193 let (offsets, validity) = repdef.unravel_offsets::<i32>()?;
195
196 let (entries_field, keys_sorted) = match &self.data_type {
198 DataType::Map(field, keys_sorted) => {
199 if *keys_sorted {
200 return Err(Error::NotSupported {
201 source: "Map type decoder does not support keys_sorted=true now"
202 .to_string()
203 .into(),
204 location: location!(),
205 });
206 }
207 (field.clone(), *keys_sorted)
208 }
209 _ => {
210 return Err(Error::Schema {
211 message: "Map decoder did not have a map field".to_string(),
212 location: location!(),
213 });
214 }
215 };
216
217 let entries = array
219 .as_any()
220 .downcast_ref::<arrow_array::StructArray>()
221 .ok_or_else(|| Error::Schema {
222 message: "Map entries should be a StructArray".to_string(),
223 location: location!(),
224 })?
225 .clone();
226
227 let map_array = MapArray::new(entries_field, offsets, entries, validity, keys_sorted);
229
230 Ok(DecodedArray {
231 array: Arc::new(map_array),
232 repdef,
233 })
234 }
235}
236
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{
        builder::{Int32Builder, MapBuilder, StringBuilder},
        Array, Int32Array, MapArray, StringArray, StructArray,
    };
    use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
    use arrow_schema::{DataType, Field, Fields};

    use crate::encoder::{default_encoding_strategy, ColumnIndexSequence, EncodingOptions};
    use crate::{
        testing::{check_round_trip_encoding_of_data, TestCases},
        version::LanceFileVersion,
    };
    use arrow_schema::Field as ArrowField;
    use lance_core::datatypes::Field as LanceField;

    /// Builds `Map<key_type, value_type>` with unsorted keys.
    ///
    /// The layout is a non-nullable `entries` struct holding a non-nullable
    /// `keys` field and a nullable `values` field.
    fn make_map_type(key_type: DataType, value_type: DataType) -> DataType {
        let entries = Field::new(
            "entries",
            DataType::Struct(Fields::from(vec![
                Field::new("keys", key_type, false),
                Field::new("values", value_type, true),
            ])),
            false,
        );
        DataType::Map(Arc::new(entries), false)
    }

    /// Two non-empty maps with string keys and int values round-trip.
    #[test_log::test(tokio::test)]
    async fn test_simple_map() {
        let string_builder = StringBuilder::new();
        let int_builder = Int32Builder::new();
        let mut map_builder = MapBuilder::new(None, string_builder, int_builder);

        map_builder.keys().append_value("key1");
        map_builder.values().append_value(10);
        map_builder.keys().append_value("key2");
        map_builder.values().append_value(20);
        map_builder.append(true).unwrap();

        map_builder.keys().append_value("key3");
        map_builder.values().append_value(30);
        map_builder.append(true).unwrap();

        let map_array = map_builder.finish();

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_min_file_version(LanceFileVersion::V2_2);

        check_round_trip_encoding_of_data(vec![Arc::new(map_array)], &test_cases, HashMap::new())
            .await;
    }

    /// Mix of non-empty, empty and null maps round-trips.
    #[test_log::test(tokio::test)]
    async fn test_empty_maps() {
        let string_builder = StringBuilder::new();
        let int_builder = Int32Builder::new();
        let mut map_builder = MapBuilder::new(None, string_builder, int_builder);

        // Row 0: {"a": 1}
        map_builder.keys().append_value("a");
        map_builder.values().append_value(1);
        map_builder.append(true).unwrap();

        // Row 1: {}
        map_builder.append(true).unwrap();

        // Row 2: null
        map_builder.append(false).unwrap();

        // Row 3: {}
        map_builder.append(true).unwrap();

        let map_array = map_builder.finish();

        let test_cases = TestCases::default()
            .with_range(0..4)
            .with_indices(vec![1])
            .with_indices(vec![2])
            .with_min_file_version(LanceFileVersion::V2_2);

        check_round_trip_encoding_of_data(vec![Arc::new(map_array)], &test_cases, HashMap::new())
            .await;
    }

    /// Maps whose values contain nulls round-trip.
    #[test_log::test(tokio::test)]
    async fn test_map_with_null_values() {
        let string_builder = StringBuilder::new();
        let int_builder = Int32Builder::new();
        let mut map_builder = MapBuilder::new(None, string_builder, int_builder);

        map_builder.keys().append_value("key1");
        map_builder.values().append_value(10);
        map_builder.keys().append_value("key2");
        map_builder.values().append_null();
        map_builder.append(true).unwrap();

        map_builder.keys().append_value("key3");
        map_builder.values().append_null();
        map_builder.append(true).unwrap();

        let map_array = map_builder.finish();

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_indices(vec![0])
            .with_indices(vec![1])
            .with_min_file_version(LanceFileVersion::V2_2);

        check_round_trip_encoding_of_data(vec![Arc::new(map_array)], &test_cases, HashMap::new())
            .await;
    }

    /// A nullable map nested as a struct field round-trips.
    #[test_log::test(tokio::test)]
    async fn test_map_in_struct() {
        let string_key_builder = StringBuilder::new();
        let string_val_builder = StringBuilder::new();
        let mut map_builder = MapBuilder::new(None, string_key_builder, string_val_builder);

        map_builder.keys().append_value("name");
        map_builder.values().append_value("Alice");
        map_builder.keys().append_value("city");
        map_builder.values().append_value("NYC");
        map_builder.append(true).unwrap();

        map_builder.keys().append_value("name");
        map_builder.values().append_value("Bob");
        map_builder.append(true).unwrap();

        map_builder.append(false).unwrap();

        let map_array = Arc::new(map_builder.finish());
        let id_array = Arc::new(Int32Array::from(vec![1, 2, 3]));

        let struct_array = StructArray::new(
            Fields::from(vec![
                Field::new("id", DataType::Int32, false),
                Field::new(
                    "properties",
                    make_map_type(DataType::Utf8, DataType::Utf8),
                    true,
                ),
            ]),
            vec![id_array, map_array],
            None,
        );

        let test_cases = TestCases::default()
            .with_range(0..3)
            .with_indices(vec![0, 2])
            .with_min_file_version(LanceFileVersion::V2_2);

        check_round_trip_encoding_of_data(
            vec![Arc::new(struct_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;
    }

    /// A map under a nullable struct round-trips. The map entry of the null
    /// struct row holds "garbage" values that must not leak into the output.
    #[test_log::test(tokio::test)]
    async fn test_map_in_nullable_struct() {
        let entries_fields = Fields::from(vec![
            Field::new("keys", DataType::Utf8, false),
            Field::new("values", DataType::Int32, true),
        ]);
        let entries_field = Arc::new(Field::new(
            "entries",
            DataType::Struct(entries_fields.clone()),
            false,
        ));
        let map_entries = StructArray::new(
            entries_fields,
            vec![
                Arc::new(StringArray::from(vec!["a", "garbage", "b"])),
                Arc::new(Int32Array::from(vec![1, 999, 2])),
            ],
            None,
        );
        // One entry per row; the map itself has no nulls.
        let map_array: Arc<dyn Array> = Arc::new(MapArray::new(
            entries_field,
            OffsetBuffer::new(ScalarBuffer::from(vec![0, 1, 2, 3])),
            map_entries,
            None,
            false,
        ));

        let struct_array = StructArray::new(
            Fields::from(vec![
                Field::new("id", DataType::Int32, true),
                Field::new("props", map_array.data_type().clone(), true),
            ]),
            vec![
                Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
                map_array,
            ],
            // Row 1 is null at the struct level, so its map entry is garbage.
            Some(NullBuffer::from(vec![true, false, true])),
        );

        let test_cases = TestCases::default()
            .with_range(0..3)
            .with_min_file_version(LanceFileVersion::V2_2);

        check_round_trip_encoding_of_data(
            vec![Arc::new(struct_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;
    }

    /// A list whose items are maps round-trips, including an empty list.
    #[test_log::test(tokio::test)]
    async fn test_list_of_maps() {
        use arrow_array::builder::ListBuilder;

        let string_builder = StringBuilder::new();
        let int_builder = Int32Builder::new();
        let map_builder = MapBuilder::new(None, string_builder, int_builder);
        let mut list_builder = ListBuilder::new(map_builder);

        // List 0: [{"a": 1}, {"b": 2}]
        list_builder.values().keys().append_value("a");
        list_builder.values().values().append_value(1);
        list_builder.values().append(true).unwrap();

        list_builder.values().keys().append_value("b");
        list_builder.values().values().append_value(2);
        list_builder.values().append(true).unwrap();

        list_builder.append(true);

        // List 1: [{"c": 3}]
        list_builder.values().keys().append_value("c");
        list_builder.values().values().append_value(3);
        list_builder.values().append(true).unwrap();

        list_builder.append(true);

        // List 2: []
        list_builder.append(true);

        let list_array = list_builder.finish();

        let test_cases = TestCases::default()
            .with_range(0..3)
            .with_indices(vec![0, 2])
            .with_min_file_version(LanceFileVersion::V2_2);

        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
            .await;
    }

    /// A map whose values are themselves maps round-trips.
    #[test_log::test(tokio::test)]
    async fn test_nested_map() {
        let inner_string_builder = StringBuilder::new();
        let inner_int_builder = Int32Builder::new();
        let mut inner_map_builder1 = MapBuilder::new(None, inner_string_builder, inner_int_builder);

        inner_map_builder1.keys().append_value("x");
        inner_map_builder1.values().append_value(10);
        inner_map_builder1.append(true).unwrap();

        inner_map_builder1.keys().append_value("y");
        inner_map_builder1.values().append_value(20);
        inner_map_builder1.keys().append_value("z");
        inner_map_builder1.values().append_value(30);
        inner_map_builder1.append(true).unwrap();

        let inner_maps = Arc::new(inner_map_builder1.finish());

        let outer_keys = Arc::new(StringArray::from(vec!["key1", "key2"]));

        let entries_struct = StructArray::new(
            Fields::from(vec![
                Field::new("key", DataType::Utf8, false),
                Field::new(
                    "value",
                    make_map_type(DataType::Utf8, DataType::Int32),
                    true,
                ),
            ]),
            vec![outer_keys, inner_maps],
            None,
        );

        // A single outer map row holding both entries.
        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 2]));
        let entries_field = Field::new("entries", entries_struct.data_type().clone(), false);

        let outer_map = MapArray::new(
            Arc::new(entries_field),
            offsets,
            entries_struct,
            None,
            false,
        );

        let test_cases = TestCases::default()
            .with_range(0..1)
            .with_min_file_version(LanceFileVersion::V2_2);

        check_round_trip_encoding_of_data(vec![Arc::new(outer_map)], &test_cases, HashMap::new())
            .await;
    }

    /// Integer keys with string values round-trip.
    #[test_log::test(tokio::test)]
    async fn test_map_different_key_types() {
        let int_builder = Int32Builder::new();
        let string_builder = StringBuilder::new();
        let mut map_builder = MapBuilder::new(None, int_builder, string_builder);

        map_builder.keys().append_value(1);
        map_builder.values().append_value("one");
        map_builder.keys().append_value(2);
        map_builder.values().append_value("two");
        map_builder.append(true).unwrap();

        map_builder.keys().append_value(3);
        map_builder.values().append_value("three");
        map_builder.append(true).unwrap();

        let map_array = map_builder.finish();

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_indices(vec![0, 1])
            .with_min_file_version(LanceFileVersion::V2_2);

        check_round_trip_encoding_of_data(vec![Arc::new(map_array)], &test_cases, HashMap::new())
            .await;
    }

    /// A 100-entry map followed by an empty map round-trips.
    #[test_log::test(tokio::test)]
    async fn test_map_with_extreme_sizes() {
        let string_builder = StringBuilder::new();
        let int_builder = Int32Builder::new();
        let mut map_builder = MapBuilder::new(None, string_builder, int_builder);

        for i in 0..100 {
            map_builder.keys().append_value(format!("key{i}"));
            map_builder.values().append_value(i);
        }
        map_builder.append(true).unwrap();

        map_builder.append(true).unwrap();

        let map_array = map_builder.finish();

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_min_file_version(LanceFileVersion::V2_2);

        check_round_trip_encoding_of_data(vec![Arc::new(map_array)], &test_cases, HashMap::new())
            .await;
    }

    /// A column containing only null maps round-trips.
    #[test_log::test(tokio::test)]
    async fn test_map_all_null() {
        let string_builder = StringBuilder::new();
        let int_builder = Int32Builder::new();
        let mut map_builder = MapBuilder::new(None, string_builder, int_builder);

        map_builder.append(false).unwrap();
        map_builder.append(false).unwrap();
        let map_array = map_builder.finish();

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_min_file_version(LanceFileVersion::V2_2);

        check_round_trip_encoding_of_data(vec![Arc::new(map_array)], &test_cases, HashMap::new())
            .await;
    }

    /// Non-empty, null and non-empty maps (with a null value) round-trip.
    ///
    /// NOTE(review): the name suggests both `keep_original_array` modes are
    /// covered. Nothing here toggles it explicitly; presumably the round-trip
    /// harness exercises both — confirm.
    #[test_log::test(tokio::test)]
    async fn test_map_encoder_keep_original_array_scenarios() {
        let string_builder = StringBuilder::new();
        let int_builder = Int32Builder::new();
        let mut map_builder = MapBuilder::new(None, string_builder, int_builder);

        map_builder.keys().append_value("key1");
        map_builder.values().append_value(10);
        map_builder.keys().append_value("key2");
        map_builder.values().append_null();
        map_builder.append(true).unwrap();

        map_builder.append(false).unwrap();

        map_builder.keys().append_value("key3");
        map_builder.values().append_value(30);
        map_builder.append(true).unwrap();

        let map_array = map_builder.finish();

        let test_cases = TestCases::default()
            .with_range(0..3)
            .with_indices(vec![0, 1, 2])
            .with_min_file_version(LanceFileVersion::V2_2);

        check_round_trip_encoding_of_data(vec![Arc::new(map_array)], &test_cases, HashMap::new())
            .await;
    }

    /// Creating a map encoder for file version 2.1 fails with an error that
    /// names the Map data type and the required version 2.2.
    #[test]
    fn test_map_not_supported_write_in_v2_1() {
        let map_arrow_field = ArrowField::new(
            "map_field",
            make_map_type(DataType::Utf8, DataType::Int32),
            true,
        );
        let map_field = LanceField::try_from(&map_arrow_field).unwrap();

        let encoder_strategy = default_encoding_strategy(LanceFileVersion::V2_1);
        let mut column_index = ColumnIndexSequence::default();
        let options = EncodingOptions::default();

        let encoder_result = encoder_strategy.create_field_encoder(
            encoder_strategy.as_ref(),
            &map_field,
            &mut column_index,
            &options,
        );

        // The Ok payload is a boxed encoder, so `unwrap_err` is avoided and the
        // result is matched instead.
        let Err(encoder_err) = encoder_result else {
            panic!("Map type should not be supported in V2_1 for encoder");
        };

        let encoder_err_msg = encoder_err.to_string();
        assert!(
            encoder_err_msg.contains("2.2"),
            "Encoder error message should mention version 2.2, got: {encoder_err_msg}"
        );
        assert!(
            encoder_err_msg.contains("Map data type"),
            "Encoder error message should mention Map data type, got: {encoder_err_msg}"
        );
    }
}