Skip to main content

lance_encoding/encodings/logical/
list.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{ops::Range, sync::Arc};
5
6use arrow_array::{Array, ArrayRef, LargeListArray, ListArray, cast::AsArray, make_array};
7use arrow_schema::DataType;
8use futures::future::BoxFuture;
9use lance_arrow::deepcopy::deep_copy_nulls;
10use lance_arrow::list::ListArrayExt;
11use lance_core::Result;
12
13use crate::{
14    decoder::{
15        DecodedArray, FilterExpression, ScheduledScanLine, SchedulerContext,
16        StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
17        StructuralSchedulingJob,
18    },
19    encoder::{EncodeTask, FieldEncoder, OutOfLineBuffers},
20    repdef::RepDefBuilder,
21};
22
23/// A structural encoder for list fields
24///
25/// The list's offsets are added to the rep/def builder
26/// and the list array's values are passed to the child encoder
27///
28/// The values will have any garbage values removed and will be trimmed
29/// to only include the values that are actually used.
30pub struct ListStructuralEncoder {
31    keep_original_array: bool,
32    child: Box<dyn FieldEncoder>,
33}
34
35impl ListStructuralEncoder {
36    pub fn new(keep_original_array: bool, child: Box<dyn FieldEncoder>) -> Self {
37        Self {
38            keep_original_array,
39            child,
40        }
41    }
42}
43
44impl FieldEncoder for ListStructuralEncoder {
45    fn maybe_encode(
46        &mut self,
47        array: ArrayRef,
48        external_buffers: &mut OutOfLineBuffers,
49        mut repdef: RepDefBuilder,
50        row_number: u64,
51        num_rows: u64,
52    ) -> Result<Vec<EncodeTask>> {
53        let values = if let Some(list_arr) = array.as_list_opt::<i32>() {
54            let has_garbage_values = if self.keep_original_array {
55                repdef.add_offsets(list_arr.offsets().clone(), array.nulls().cloned())
56            } else {
57                // there is no need to deep copy offsets, because offset buffers will be cast to a common type (i64).
58                repdef.add_offsets(list_arr.offsets().clone(), deep_copy_nulls(array.nulls()))
59            };
60            if has_garbage_values {
61                list_arr.filter_garbage_nulls().trimmed_values()
62            } else {
63                list_arr.trimmed_values()
64            }
65        } else if let Some(list_arr) = array.as_list_opt::<i64>() {
66            let has_garbage_values = if self.keep_original_array {
67                repdef.add_offsets(list_arr.offsets().clone(), array.nulls().cloned())
68            } else {
69                repdef.add_offsets(list_arr.offsets().clone(), deep_copy_nulls(array.nulls()))
70            };
71            if has_garbage_values {
72                list_arr.filter_garbage_nulls().trimmed_values()
73            } else {
74                list_arr.trimmed_values()
75            }
76        } else {
77            panic!("List encoder used for non-list data")
78        };
79        self.child
80            .maybe_encode(values, external_buffers, repdef, row_number, num_rows)
81    }
82
83    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
84        self.child.flush(external_buffers)
85    }
86
87    fn num_columns(&self) -> u32 {
88        self.child.num_columns()
89    }
90
91    fn finish(
92        &mut self,
93        external_buffers: &mut OutOfLineBuffers,
94    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
95        self.child.finish(external_buffers)
96    }
97}
98
99#[derive(Debug)]
100pub struct StructuralListScheduler {
101    child: Box<dyn StructuralFieldScheduler>,
102}
103
104impl StructuralListScheduler {
105    pub fn new(child: Box<dyn StructuralFieldScheduler>) -> Self {
106        Self { child }
107    }
108}
109
110impl StructuralFieldScheduler for StructuralListScheduler {
111    fn schedule_ranges<'a>(
112        &'a self,
113        ranges: &[Range<u64>],
114        filter: &FilterExpression,
115    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
116        let child = self.child.schedule_ranges(ranges, filter)?;
117
118        Ok(Box::new(StructuralListSchedulingJob::new(child)))
119    }
120
121    fn initialize<'a>(
122        &'a mut self,
123        filter: &'a FilterExpression,
124        context: &'a SchedulerContext,
125    ) -> BoxFuture<'a, Result<()>> {
126        self.child.initialize(filter, context)
127    }
128}
129
130/// Scheduling job for list data
131///
132/// Scheduling is handled by the primitive encoder and nothing special
133/// happens here.
134#[derive(Debug)]
135struct StructuralListSchedulingJob<'a> {
136    child: Box<dyn StructuralSchedulingJob + 'a>,
137}
138
139impl<'a> StructuralListSchedulingJob<'a> {
140    fn new(child: Box<dyn StructuralSchedulingJob + 'a>) -> Self {
141        Self { child }
142    }
143}
144
145impl StructuralSchedulingJob for StructuralListSchedulingJob<'_> {
146    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>> {
147        self.child.schedule_next(context)
148    }
149}
150
151#[derive(Debug)]
152pub struct StructuralListDecoder {
153    child: Box<dyn StructuralFieldDecoder>,
154    data_type: DataType,
155}
156
157impl StructuralListDecoder {
158    pub fn new(child: Box<dyn StructuralFieldDecoder>, data_type: DataType) -> Self {
159        Self { child, data_type }
160    }
161}
162
163impl StructuralFieldDecoder for StructuralListDecoder {
164    fn accept_page(&mut self, child: crate::decoder::LoadedPageShard) -> Result<()> {
165        self.child.accept_page(child)
166    }
167
168    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
169        let child_task = self.child.drain(num_rows)?;
170        Ok(Box::new(StructuralListDecodeTask::new(
171            child_task,
172            self.data_type.clone(),
173        )))
174    }
175
176    fn data_type(&self) -> &DataType {
177        &self.data_type
178    }
179}
180
181#[derive(Debug)]
182struct StructuralListDecodeTask {
183    child_task: Box<dyn StructuralDecodeArrayTask>,
184    data_type: DataType,
185}
186
187impl StructuralListDecodeTask {
188    fn new(child_task: Box<dyn StructuralDecodeArrayTask>, data_type: DataType) -> Self {
189        Self {
190            child_task,
191            data_type,
192        }
193    }
194}
195
196impl StructuralDecodeArrayTask for StructuralListDecodeTask {
197    fn decode(self: Box<Self>) -> Result<DecodedArray> {
198        let DecodedArray {
199            array,
200            mut repdef,
201            data_size,
202        } = self.child_task.decode()?;
203        match &self.data_type {
204            DataType::List(child_field) => {
205                let (offsets, validity) = repdef.unravel_offsets::<i32>()?;
206                let array = if !child_field.is_nullable() && array.null_count() == array.len() {
207                    make_array(array.into_data().into_builder().nulls(None).build()?)
208                } else {
209                    array
210                };
211                let list_array = ListArray::try_new(child_field.clone(), offsets, array, validity)?;
212
213                Ok(DecodedArray {
214                    array: Arc::new(list_array),
215                    repdef,
216                    data_size,
217                })
218            }
219            DataType::LargeList(child_field) => {
220                let (offsets, validity) = repdef.unravel_offsets::<i64>()?;
221                let list_array =
222                    LargeListArray::try_new(child_field.clone(), offsets, array, validity)?;
223                Ok(DecodedArray {
224                    array: Arc::new(list_array),
225                    repdef,
226                    data_size,
227                })
228            }
229            _ => panic!("List decoder did not have a list field"),
230        }
231    }
232}
233
234#[cfg(test)]
235mod tests {
236
237    use std::{collections::HashMap, sync::Arc};
238
239    use crate::constants::{
240        STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
241    };
242    use arrow_array::{
243        Array, ArrayRef, BooleanArray, DictionaryArray, LargeStringArray, ListArray, StructArray,
244        UInt8Array, UInt64Array,
245        builder::{Int32Builder, Int64Builder, LargeListBuilder, ListBuilder, StringBuilder},
246    };
247
248    use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
249    use arrow_schema::{DataType, Field, Fields};
250    use rstest::rstest;
251
252    use crate::{
253        testing::{TestCases, check_basic_random, check_round_trip_encoding_of_data},
254        version::LanceFileVersion,
255    };
256
257    fn make_list_type(inner_type: DataType) -> DataType {
258        DataType::List(Arc::new(Field::new("item", inner_type, true)))
259    }
260
261    fn make_large_list_type(inner_type: DataType) -> DataType {
262        DataType::LargeList(Arc::new(Field::new("item", inner_type, true)))
263    }
264
265    #[rstest]
266    #[test_log::test(tokio::test)]
267    async fn test_list(
268        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
269        structural_encoding: &str,
270    ) {
271        let mut field_metadata = HashMap::new();
272        field_metadata.insert(
273            STRUCTURAL_ENCODING_META_KEY.to_string(),
274            structural_encoding.into(),
275        );
276        let field =
277            Field::new("", make_list_type(DataType::Int32), true).with_metadata(field_metadata);
278        check_basic_random(field).await;
279    }
280
281    #[rstest]
282    #[test_log::test(tokio::test)]
283    async fn test_deeply_nested_lists(
284        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
285        structural_encoding: &str,
286    ) {
287        let mut field_metadata = HashMap::new();
288        field_metadata.insert(
289            STRUCTURAL_ENCODING_META_KEY.to_string(),
290            structural_encoding.into(),
291        );
292        let field = Field::new("item", DataType::Int32, true).with_metadata(field_metadata);
293        for _ in 0..5 {
294            let field = Field::new("", make_list_type(field.data_type().clone()), true);
295            check_basic_random(field).await;
296        }
297    }
298
299    #[test_log::test(tokio::test)]
300    async fn test_large_list() {
301        let field = Field::new("", make_large_list_type(DataType::Int32), true);
302        check_basic_random(field).await;
303    }
304
305    #[test_log::test(tokio::test)]
306    async fn test_nested_strings() {
307        let field = Field::new("", make_list_type(DataType::Utf8), true);
308        check_basic_random(field).await;
309    }
310
311    #[test_log::test(tokio::test)]
312    async fn test_nested_list() {
313        let field = Field::new("", make_list_type(make_list_type(DataType::Int32)), true);
314        check_basic_random(field).await;
315    }
316
317    #[test_log::test(tokio::test)]
318    async fn test_list_struct_list() {
319        let struct_type = DataType::Struct(Fields::from(vec![Field::new(
320            "inner_str",
321            DataType::Utf8,
322            false,
323        )]));
324
325        let field = Field::new("", make_list_type(struct_type), true);
326        check_basic_random(field).await;
327    }
328
329    #[test_log::test(tokio::test)]
330    async fn test_list_struct_empty() {
331        let fields = Fields::from(vec![Field::new("inner", DataType::UInt64, true)]);
332        let items = UInt64Array::from(Vec::<u64>::new());
333        let structs = StructArray::new(fields, vec![Arc::new(items)], None);
334        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0; 2 * 1024 * 1024 + 1]));
335        let lists = ListArray::new(
336            Arc::new(Field::new("item", structs.data_type().clone(), true)),
337            offsets,
338            Arc::new(structs),
339            None,
340        );
341
342        check_round_trip_encoding_of_data(
343            vec![Arc::new(lists)],
344            &TestCases::default(),
345            HashMap::new(),
346        )
347        .await;
348    }
349
350    #[rstest]
351    #[test_log::test(tokio::test)]
352    async fn test_simple_list(
353        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
354        structural_encoding: &str,
355    ) {
356        let items_builder = Int32Builder::new();
357        let mut list_builder = ListBuilder::new(items_builder);
358        list_builder.append_value([Some(1), Some(2), Some(3)]);
359        list_builder.append_value([Some(4), Some(5)]);
360        list_builder.append_null();
361        list_builder.append_value([Some(6), Some(7), Some(8)]);
362        let list_array = list_builder.finish();
363
364        let mut field_metadata = HashMap::new();
365        field_metadata.insert(
366            STRUCTURAL_ENCODING_META_KEY.to_string(),
367            structural_encoding.into(),
368        );
369
370        let test_cases = TestCases::default()
371            .with_range(0..2)
372            .with_range(0..3)
373            .with_range(1..3)
374            .with_indices(vec![1, 3])
375            .with_indices(vec![2]);
376        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
377            .await;
378    }
379
380    #[rstest]
381    #[test_log::test(tokio::test)]
382    async fn test_simple_nested_list_ends_with_null(
383        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
384        structural_encoding: &str,
385    ) {
386        use arrow_array::Int32Array;
387
388        let values = Int32Array::from(vec![1, 2, 3, 4, 5]);
389        let inner_offsets = ScalarBuffer::<i32>::from(vec![0, 1, 2, 3, 4, 5, 5]);
390        let inner_validity = BooleanBuffer::from(vec![true, true, true, true, true, false]);
391        let outer_offsets = ScalarBuffer::<i32>::from(vec![0, 1, 2, 3, 4, 5, 6, 6]);
392        let outer_validity = BooleanBuffer::from(vec![true, true, true, true, true, true, false]);
393
394        let inner_list = ListArray::new(
395            Arc::new(Field::new("item", DataType::Int32, true)),
396            OffsetBuffer::new(inner_offsets),
397            Arc::new(values),
398            Some(NullBuffer::new(inner_validity)),
399        );
400        let outer_list = ListArray::new(
401            Arc::new(Field::new(
402                "item",
403                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
404                true,
405            )),
406            OffsetBuffer::new(outer_offsets),
407            Arc::new(inner_list),
408            Some(NullBuffer::new(outer_validity)),
409        );
410
411        let mut field_metadata = HashMap::new();
412        field_metadata.insert(
413            STRUCTURAL_ENCODING_META_KEY.to_string(),
414            structural_encoding.into(),
415        );
416
417        let test_cases = TestCases::default()
418            .with_range(0..2)
419            .with_range(0..3)
420            .with_range(5..7)
421            .with_indices(vec![1, 6])
422            .with_indices(vec![6])
423            .with_min_file_version(LanceFileVersion::V2_1);
424        check_round_trip_encoding_of_data(vec![Arc::new(outer_list)], &test_cases, field_metadata)
425            .await;
426    }
427
428    #[rstest]
429    #[test_log::test(tokio::test)]
430    async fn test_simple_string_list(
431        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
432        structural_encoding: &str,
433    ) {
434        let items_builder = StringBuilder::new();
435        let mut list_builder = ListBuilder::new(items_builder);
436        list_builder.append_value([Some("a"), Some("bc"), Some("def")]);
437        list_builder.append_value([Some("gh"), None]);
438        list_builder.append_null();
439        list_builder.append_value([Some("ijk"), Some("lmnop"), Some("qrs")]);
440        let list_array = list_builder.finish();
441
442        let mut field_metadata = HashMap::new();
443        field_metadata.insert(
444            STRUCTURAL_ENCODING_META_KEY.to_string(),
445            structural_encoding.into(),
446        );
447
448        let test_cases = TestCases::default()
449            .with_range(0..2)
450            .with_range(0..3)
451            .with_range(1..3)
452            .with_indices(vec![1, 3])
453            .with_indices(vec![2])
454            .with_min_file_version(LanceFileVersion::V2_1);
455        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
456            .await;
457    }
458
459    #[rstest]
460    #[test_log::test(tokio::test)]
461    async fn test_simple_string_list_no_null(
462        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
463        structural_encoding: &str,
464    ) {
465        let items_builder = StringBuilder::new();
466        let mut list_builder = ListBuilder::new(items_builder);
467        list_builder.append_value([Some("a"), Some("bc"), Some("def")]);
468        list_builder.append_value([Some("gh"), Some("zxy")]);
469        list_builder.append_value([Some("gh"), Some("z")]);
470        list_builder.append_value([Some("ijk"), Some("lmnop"), Some("qrs")]);
471        let list_array = list_builder.finish();
472
473        let mut field_metadata = HashMap::new();
474        field_metadata.insert(
475            STRUCTURAL_ENCODING_META_KEY.to_string(),
476            structural_encoding.into(),
477        );
478
479        let test_cases = TestCases::default()
480            .with_range(0..2)
481            .with_range(0..3)
482            .with_range(1..3)
483            .with_indices(vec![1, 3])
484            .with_indices(vec![2])
485            .with_min_file_version(LanceFileVersion::V2_1);
486        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
487            .await;
488    }
489
490    #[rstest]
491    #[test_log::test(tokio::test)]
492    async fn test_simple_sliced_list(
493        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
494        structural_encoding: &str,
495    ) {
496        let items_builder = Int32Builder::new();
497        let mut list_builder = ListBuilder::new(items_builder);
498        list_builder.append_value([Some(1), Some(2), Some(3)]);
499        list_builder.append_value([Some(4), Some(5)]);
500        list_builder.append_null();
501        list_builder.append_value([Some(6), Some(7), Some(8)]);
502        let list_array = list_builder.finish();
503
504        let list_array = list_array.slice(1, 2);
505
506        let mut field_metadata = HashMap::new();
507        field_metadata.insert(
508            STRUCTURAL_ENCODING_META_KEY.to_string(),
509            structural_encoding.into(),
510        );
511
512        let test_cases = TestCases::default()
513            .with_range(0..2)
514            .with_range(1..2)
515            .with_indices(vec![0])
516            .with_indices(vec![1])
517            .with_min_file_version(LanceFileVersion::V2_1);
518        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
519            .await;
520    }
521
522    #[test_log::test(tokio::test)]
523    async fn test_simple_list_dict() {
524        let values = LargeStringArray::from_iter_values(["a", "bb", "ccc"]);
525        let indices = UInt8Array::from(vec![0, 1, 2, 0, 1, 2, 0, 1, 2]);
526        let dict_array = DictionaryArray::new(indices, Arc::new(values));
527        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 3, 5, 6, 9]));
528        let list_array = ListArray::new(
529            Arc::new(Field::new("item", dict_array.data_type().clone(), true)),
530            offsets,
531            Arc::new(dict_array),
532            None,
533        );
534
535        let test_cases = TestCases::default()
536            .with_range(0..2)
537            .with_range(1..3)
538            .with_range(2..4)
539            .with_indices(vec![1])
540            .with_indices(vec![2]);
541        check_round_trip_encoding_of_data(
542            vec![Arc::new(list_array)],
543            &test_cases,
544            HashMap::default(),
545        )
546        .await;
547    }
548
549    #[test_log::test(tokio::test)]
550    async fn test_simple_list_all_null() {
551        let items = UInt64Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
552        let offsets = ScalarBuffer::<i32>::from(vec![0, 5, 8, 10]);
553        let offsets = OffsetBuffer::new(offsets);
554        let list_validity = NullBuffer::new(BooleanBuffer::from(vec![false, false, false]));
555
556        // The list array is nullable but the items are not.  Then, all lists are null.
557        let list_arr = ListArray::new(
558            Arc::new(Field::new("item", DataType::UInt64, false)),
559            offsets,
560            Arc::new(items),
561            Some(list_validity),
562        );
563
564        let test_cases = TestCases::default()
565            .with_range(0..3)
566            .with_range(1..2)
567            .with_indices(vec![1])
568            .with_indices(vec![2])
569            .with_min_file_version(LanceFileVersion::V2_1);
570        check_round_trip_encoding_of_data(
571            vec![Arc::new(list_arr)],
572            &test_cases,
573            HashMap::default(),
574        )
575        .await;
576    }
577
578    #[rstest]
579    #[test_log::test(tokio::test)]
580    async fn test_list_with_garbage_nulls(
581        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
582        structural_encoding: &str,
583    ) {
584        // In Arrow, list nulls are allowed to be non-empty, with masked garbage values
585        // Here we make a list with a null row in the middle with 3 garbage values
586        let items = UInt64Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
587        let offsets = ScalarBuffer::<i32>::from(vec![0, 5, 8, 10]);
588        let offsets = OffsetBuffer::new(offsets);
589        let list_validity = NullBuffer::new(BooleanBuffer::from(vec![true, false, true]));
590        let list_arr = ListArray::new(
591            Arc::new(Field::new("item", DataType::UInt64, true)),
592            offsets,
593            Arc::new(items),
594            Some(list_validity),
595        );
596
597        let mut field_metadata = HashMap::new();
598        field_metadata.insert(
599            STRUCTURAL_ENCODING_META_KEY.to_string(),
600            structural_encoding.into(),
601        );
602
603        let test_cases = TestCases::default()
604            .with_range(0..3)
605            .with_range(1..2)
606            .with_indices(vec![1])
607            .with_indices(vec![2])
608            .with_min_file_version(LanceFileVersion::V2_1);
609        check_round_trip_encoding_of_data(vec![Arc::new(list_arr)], &test_cases, field_metadata)
610            .await;
611    }
612
613    #[rstest]
614    #[test_log::test(tokio::test)]
615    async fn test_simple_two_page_list(
616        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
617        structural_encoding: &str,
618    ) {
619        // This is a simple pre-defined list that spans two pages.  This test is useful for
620        // debugging the repetition index
621
622        let items_builder = Int64Builder::new();
623        let mut list_builder = ListBuilder::new(items_builder);
624        for i in 0..512 {
625            list_builder.append_value([Some(i), Some(i * 2)]);
626        }
627        let list_array_1 = list_builder.finish();
628
629        let items_builder = Int64Builder::new();
630        let mut list_builder = ListBuilder::new(items_builder);
631        for i in 0..512 {
632            let i = i + 512;
633            list_builder.append_value([Some(i), Some(i * 2)]);
634        }
635        let list_array_2 = list_builder.finish();
636
637        let mut metadata = HashMap::new();
638        metadata.insert(
639            STRUCTURAL_ENCODING_META_KEY.to_string(),
640            structural_encoding.into(),
641        );
642
643        let test_cases = TestCases::default()
644            .with_min_file_version(LanceFileVersion::V2_1)
645            .with_page_sizes(vec![100])
646            .with_range(800..900);
647        check_round_trip_encoding_of_data(
648            vec![Arc::new(list_array_1), Arc::new(list_array_2)],
649            &test_cases,
650            metadata,
651        )
652        .await;
653    }
654
655    #[test_log::test(tokio::test)]
656    async fn test_simple_large_list() {
657        let items_builder = Int32Builder::new();
658        let mut list_builder = LargeListBuilder::new(items_builder);
659        list_builder.append_value([Some(1), Some(2), Some(3)]);
660        list_builder.append_value([Some(4), Some(5)]);
661        list_builder.append_null();
662        list_builder.append_value([Some(6), Some(7), Some(8)]);
663        let list_array = list_builder.finish();
664
665        let test_cases = TestCases::default()
666            .with_range(0..2)
667            .with_range(0..3)
668            .with_range(1..3)
669            .with_indices(vec![1, 3]);
670        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
671            .await;
672    }
673
674    #[rstest]
675    #[test_log::test(tokio::test)]
676    async fn test_empty_lists(
677        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
678        structural_encoding: &str,
679    ) {
680        let mut field_metadata = HashMap::new();
681        field_metadata.insert(
682            STRUCTURAL_ENCODING_META_KEY.to_string(),
683            structural_encoding.into(),
684        );
685
686        // Scenario 1: Some lists are empty
687
688        let values = [vec![Some(1), Some(2), Some(3)], vec![], vec![None]];
689        // Test empty list at beginning, middle, and end
690        for order in [[0, 1, 2], [1, 0, 2], [2, 0, 1]] {
691            let items_builder = Int32Builder::new();
692            let mut list_builder = ListBuilder::new(items_builder);
693            for idx in order {
694                list_builder.append_value(values[idx].clone());
695            }
696            let list_array = Arc::new(list_builder.finish());
697            let test_cases = TestCases::default()
698                .with_indices(vec![1])
699                .with_indices(vec![0])
700                .with_indices(vec![2])
701                .with_indices(vec![0, 1]);
702            check_round_trip_encoding_of_data(
703                vec![list_array.clone()],
704                &test_cases,
705                field_metadata.clone(),
706            )
707            .await;
708            let test_cases = test_cases.with_batch_size(1);
709            check_round_trip_encoding_of_data(
710                vec![list_array],
711                &test_cases,
712                field_metadata.clone(),
713            )
714            .await;
715        }
716
717        // Scenario 2: All lists are empty
718
719        // When encoding a list of empty lists there are no items to encode
720        // which is strange and we want to ensure we handle it
721        let items_builder = Int32Builder::new();
722        let mut list_builder = ListBuilder::new(items_builder);
723        list_builder.append(true);
724        list_builder.append_null();
725        list_builder.append(true);
726        let list_array = Arc::new(list_builder.finish());
727
728        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
729        check_round_trip_encoding_of_data(
730            vec![list_array.clone()],
731            &test_cases,
732            field_metadata.clone(),
733        )
734        .await;
735        let test_cases = test_cases.with_batch_size(1);
736        check_round_trip_encoding_of_data(vec![list_array], &test_cases, field_metadata.clone())
737            .await;
738
739        // Scenario 2B: All lists are empty (but now with strings)
740
741        // When encoding a list of empty lists there are no items to encode
742        // which is strange and we want to ensure we handle it
743        let items_builder = StringBuilder::new();
744        let mut list_builder = ListBuilder::new(items_builder);
745        list_builder.append(true);
746        list_builder.append_null();
747        list_builder.append(true);
748        let list_array = Arc::new(list_builder.finish());
749
750        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
751        check_round_trip_encoding_of_data(
752            vec![list_array.clone()],
753            &test_cases,
754            field_metadata.clone(),
755        )
756        .await;
757        let test_cases = test_cases.with_batch_size(1);
758        check_round_trip_encoding_of_data(vec![list_array], &test_cases, field_metadata.clone())
759            .await;
760
761        // Scenario 3: All lists are null
762
763        let items_builder = Int32Builder::new();
764        let mut list_builder = ListBuilder::new(items_builder);
765        list_builder.append_null();
766        list_builder.append_null();
767        list_builder.append_null();
768        let list_array = Arc::new(list_builder.finish());
769
770        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
771        check_round_trip_encoding_of_data(
772            vec![list_array.clone()],
773            &test_cases,
774            field_metadata.clone(),
775        )
776        .await;
777        let test_cases = test_cases.with_batch_size(1);
778        check_round_trip_encoding_of_data(vec![list_array], &test_cases, field_metadata.clone())
779            .await;
780
781        // Scenario 4: All lists are null and inside a struct (only valid for 2.1 since 2.0 doesn't
782        // support null structs)
783        let items_builder = Int32Builder::new();
784        let mut list_builder = ListBuilder::new(items_builder);
785        list_builder.append_null();
786        list_builder.append_null();
787        list_builder.append_null();
788        let list_array = Arc::new(list_builder.finish());
789
790        let struct_validity = NullBuffer::new(BooleanBuffer::from(vec![true, false, true]));
791        let struct_array = Arc::new(StructArray::new(
792            Fields::from(vec![Field::new(
793                "lists",
794                list_array.data_type().clone(),
795                true,
796            )]),
797            vec![list_array],
798            Some(struct_validity),
799        ));
800
801        let test_cases = TestCases::default()
802            .with_range(0..2)
803            .with_indices(vec![1])
804            .with_min_file_version(LanceFileVersion::V2_1);
805        check_round_trip_encoding_of_data(
806            vec![struct_array.clone()],
807            &test_cases,
808            field_metadata.clone(),
809        )
810        .await;
811        let test_cases = test_cases.with_batch_size(1);
812        check_round_trip_encoding_of_data(vec![struct_array], &test_cases, field_metadata.clone())
813            .await;
814    }
815
816    #[test_log::test(tokio::test)]
817    async fn test_empty_list_list() {
818        let items_builder = Int32Builder::new();
819        let list_builder = ListBuilder::new(items_builder);
820        let mut outer_list_builder = ListBuilder::new(list_builder);
821        outer_list_builder.append_null();
822        outer_list_builder.append_null();
823        outer_list_builder.append_null();
824        let list_array = Arc::new(outer_list_builder.finish());
825
826        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
827        check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
828    }
829
830    #[test_log::test(tokio::test)]
831    #[ignore] // This test is quite slow in debug mode
832    async fn test_jumbo_list() {
833        // This is an overflow test.  We have a list of lists where each list
834        // has 1Mi items.  We encode 5000 of these lists and so we have over 4Gi in the
835        // offsets range
836        let items = BooleanArray::new_null(1024 * 1024);
837        let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1024 * 1024]));
838        let list_arr = Arc::new(ListArray::new(
839            Arc::new(Field::new("item", DataType::Boolean, true)),
840            offsets,
841            Arc::new(items),
842            None,
843        )) as ArrayRef;
844        let arrs = vec![list_arr; 5000];
845
846        // We can't validate because our validation relies on concatenating all input arrays
847        let test_cases = TestCases::default().without_validation();
848        check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
849    }
850
851    // Regression test for issue with ListArray encoding when crossing 1024 value boundary
852    // This test reproduces the bug where rows_avail assertion fails in schedule_instructions
853    // when encoding a ListArray with specific size patterns that cross the 1024 value boundary
854    #[tokio::test]
855    async fn test_fuzz_issue_4466() {
856        // This specific pattern of list sizes triggers the bug when total values cross 1024
857        // 94 lists total 1009 values (passes), 95 lists total 1025 values (fails)
858        let list_sizes = vec![
859            13, 18, 12, 7, 14, 12, 6, 13, 18, 8, // 0-9: 119 values
860            6, 11, 17, 12, 8, 19, 5, 6, 10, 13, // 10-19: 107 values
861            8, 6, 10, 4, 8, 16, 14, 12, 18, 9, // 20-29: 105 values
862            17, 8, 14, 18, 15, 3, 2, 4, 5, 1, // 30-39: 82 values
863            3, 13, 1, 2, 10, 4, 10, 18, 7, 14, // 40-49: 75 values
864            18, 13, 9, 17, 3, 13, 10, 14, 8, 19, // 50-59: 125 values
865            17, 10, 5, 11, 6, 15, 10, 18, 18, 20, // 60-69: 130 values
866            16, 11, 12, 15, 7, 9, 3, 10, 20, 5, // 70-79: 102 values
867            2, 3, 17, 4, 8, 12, 15, 6, 3, 20, // 80-89: 90 values
868            15, 20, 1, 19, 16, // 90-94: 71 values
869        ];
870
871        // Build the ListArray
872        let mut list_builder = ListBuilder::new(Int32Builder::new());
873        let mut total_values = 0;
874
875        for size in &list_sizes {
876            for i in 0..*size {
877                list_builder.values().append_value(i);
878            }
879            list_builder.append(true);
880            total_values += size;
881        }
882
883        let list_array = Arc::new(list_builder.finish());
884
885        // Verify we have the expected number of values
886        assert_eq!(list_array.len(), 95);
887        assert_eq!(total_values, 1025);
888
889        // This should trigger the assertion failure at primitive.rs:1362
890        // debug_assert!(rows_avail > 0)
891        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
892
893        // The bug manifests when encoding this specific pattern
894        // Expected: successful round-trip encoding
895        // Actual: panic at primitive.rs:1362 - assertion failed: rows_avail > 0
896        check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
897    }
898
899    #[rstest]
900    #[test_log::test(tokio::test)]
901    async fn test_sparse_large_string_list(
902        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
903        structural_encoding: &str,
904    ) {
905        // 2.5 million rows, mostly empty lists. ~100 lists have 10 short strings each.
906        let num_rows = 2_500_000u32;
907        let num_non_empty = 100u32;
908        let strings_per_list = 10;
909
910        let items_builder = StringBuilder::new();
911        let mut list_builder = ListBuilder::new(items_builder);
912
913        // Spread non-empty lists evenly across the range
914        let step = num_rows / num_non_empty;
915        let mut next_non_empty = step / 2;
916
917        for i in 0..num_rows {
918            if i == next_non_empty {
919                let vals: Vec<Option<&str>> = (0..strings_per_list)
920                    .map(|j| match j % 4 {
921                        0 => Some("a"),
922                        1 => Some("bb"),
923                        2 => Some("ccc"),
924                        _ => Some("d"),
925                    })
926                    .collect();
927                list_builder.append_value(vals);
928                next_non_empty = next_non_empty.saturating_add(step);
929            } else {
930                list_builder.append_value([] as [Option<&str>; 0]);
931            }
932        }
933        let list_array = list_builder.finish();
934
935        let mut field_metadata = HashMap::new();
936        field_metadata.insert(
937            STRUCTURAL_ENCODING_META_KEY.to_string(),
938            structural_encoding.into(),
939        );
940
941        let test_cases = TestCases::default()
942            .with_range(0..1000)
943            .with_range(0..num_rows as u64)
944            .with_indices(vec![0, (step / 2) as u64, num_rows as u64 - 1])
945            .with_max_file_version(LanceFileVersion::V2_2);
946        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
947            .await;
948    }
949}