lance_encoding/encodings/logical/
list.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{ops::Range, sync::Arc};
5
6use arrow_array::{cast::AsArray, make_array, Array, ArrayRef, LargeListArray, ListArray};
7use arrow_schema::DataType;
8use futures::future::BoxFuture;
9use lance_arrow::deepcopy::deep_copy_nulls;
10use lance_arrow::list::ListArrayExt;
11use lance_core::Result;
12
13use crate::{
14    decoder::{
15        DecodedArray, FilterExpression, ScheduledScanLine, SchedulerContext,
16        StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
17        StructuralSchedulingJob,
18    },
19    encoder::{EncodeTask, FieldEncoder, OutOfLineBuffers},
20    repdef::RepDefBuilder,
21};
22
23/// A structural encoder for list fields
24///
25/// The list's offsets are added to the rep/def builder
26/// and the list array's values are passed to the child encoder
27///
28/// The values will have any garbage values removed and will be trimmed
29/// to only include the values that are actually used.
30pub struct ListStructuralEncoder {
31    keep_original_array: bool,
32    child: Box<dyn FieldEncoder>,
33}
34
35impl ListStructuralEncoder {
36    pub fn new(keep_original_array: bool, child: Box<dyn FieldEncoder>) -> Self {
37        Self {
38            keep_original_array,
39            child,
40        }
41    }
42}
43
44impl FieldEncoder for ListStructuralEncoder {
45    fn maybe_encode(
46        &mut self,
47        array: ArrayRef,
48        external_buffers: &mut OutOfLineBuffers,
49        mut repdef: RepDefBuilder,
50        row_number: u64,
51        num_rows: u64,
52    ) -> Result<Vec<EncodeTask>> {
53        let values = if let Some(list_arr) = array.as_list_opt::<i32>() {
54            let has_garbage_values = if self.keep_original_array {
55                repdef.add_offsets(list_arr.offsets().clone(), array.nulls().cloned())
56            } else {
57                // there is no need to deep copy offsets, because offset buffers will be cast to a common type (i64).
58                repdef.add_offsets(list_arr.offsets().clone(), deep_copy_nulls(array.nulls()))
59            };
60            if has_garbage_values {
61                list_arr.filter_garbage_nulls().trimmed_values()
62            } else {
63                list_arr.trimmed_values()
64            }
65        } else if let Some(list_arr) = array.as_list_opt::<i64>() {
66            let has_garbage_values = if self.keep_original_array {
67                repdef.add_offsets(list_arr.offsets().clone(), array.nulls().cloned())
68            } else {
69                repdef.add_offsets(list_arr.offsets().clone(), deep_copy_nulls(array.nulls()))
70            };
71            if has_garbage_values {
72                list_arr.filter_garbage_nulls().trimmed_values()
73            } else {
74                list_arr.trimmed_values()
75            }
76        } else {
77            panic!("List encoder used for non-list data")
78        };
79        self.child
80            .maybe_encode(values, external_buffers, repdef, row_number, num_rows)
81    }
82
83    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
84        self.child.flush(external_buffers)
85    }
86
87    fn num_columns(&self) -> u32 {
88        self.child.num_columns()
89    }
90
91    fn finish(
92        &mut self,
93        external_buffers: &mut OutOfLineBuffers,
94    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
95        self.child.finish(external_buffers)
96    }
97}
98
99#[derive(Debug)]
100pub struct StructuralListScheduler {
101    child: Box<dyn StructuralFieldScheduler>,
102}
103
104impl StructuralListScheduler {
105    pub fn new(child: Box<dyn StructuralFieldScheduler>) -> Self {
106        Self { child }
107    }
108}
109
110impl StructuralFieldScheduler for StructuralListScheduler {
111    fn schedule_ranges<'a>(
112        &'a self,
113        ranges: &[Range<u64>],
114        filter: &FilterExpression,
115    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
116        let child = self.child.schedule_ranges(ranges, filter)?;
117
118        Ok(Box::new(StructuralListSchedulingJob::new(child)))
119    }
120
121    fn initialize<'a>(
122        &'a mut self,
123        filter: &'a FilterExpression,
124        context: &'a SchedulerContext,
125    ) -> BoxFuture<'a, Result<()>> {
126        self.child.initialize(filter, context)
127    }
128}
129
130/// Scheduling job for list data
131///
132/// Scheduling is handled by the primitive encoder and nothing special
133/// happens here.
134#[derive(Debug)]
135struct StructuralListSchedulingJob<'a> {
136    child: Box<dyn StructuralSchedulingJob + 'a>,
137}
138
139impl<'a> StructuralListSchedulingJob<'a> {
140    fn new(child: Box<dyn StructuralSchedulingJob + 'a>) -> Self {
141        Self { child }
142    }
143}
144
145impl StructuralSchedulingJob for StructuralListSchedulingJob<'_> {
146    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>> {
147        self.child.schedule_next(context)
148    }
149}
150
151#[derive(Debug)]
152pub struct StructuralListDecoder {
153    child: Box<dyn StructuralFieldDecoder>,
154    data_type: DataType,
155}
156
157impl StructuralListDecoder {
158    pub fn new(child: Box<dyn StructuralFieldDecoder>, data_type: DataType) -> Self {
159        Self { child, data_type }
160    }
161}
162
163impl StructuralFieldDecoder for StructuralListDecoder {
164    fn accept_page(&mut self, child: crate::decoder::LoadedPageShard) -> Result<()> {
165        self.child.accept_page(child)
166    }
167
168    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
169        let child_task = self.child.drain(num_rows)?;
170        Ok(Box::new(StructuralListDecodeTask::new(
171            child_task,
172            self.data_type.clone(),
173        )))
174    }
175
176    fn data_type(&self) -> &DataType {
177        &self.data_type
178    }
179}
180
181#[derive(Debug)]
182struct StructuralListDecodeTask {
183    child_task: Box<dyn StructuralDecodeArrayTask>,
184    data_type: DataType,
185}
186
187impl StructuralListDecodeTask {
188    fn new(child_task: Box<dyn StructuralDecodeArrayTask>, data_type: DataType) -> Self {
189        Self {
190            child_task,
191            data_type,
192        }
193    }
194}
195
196impl StructuralDecodeArrayTask for StructuralListDecodeTask {
197    fn decode(self: Box<Self>) -> Result<DecodedArray> {
198        let DecodedArray { array, mut repdef } = self.child_task.decode()?;
199        match &self.data_type {
200            DataType::List(child_field) => {
201                let (offsets, validity) = repdef.unravel_offsets::<i32>()?;
202                let array = if !child_field.is_nullable() && array.null_count() == array.len() {
203                    make_array(array.into_data().into_builder().nulls(None).build()?)
204                } else {
205                    array
206                };
207                let list_array = ListArray::try_new(child_field.clone(), offsets, array, validity)?;
208
209                Ok(DecodedArray {
210                    array: Arc::new(list_array),
211                    repdef,
212                })
213            }
214            DataType::LargeList(child_field) => {
215                let (offsets, validity) = repdef.unravel_offsets::<i64>()?;
216                let list_array =
217                    LargeListArray::try_new(child_field.clone(), offsets, array, validity)?;
218                Ok(DecodedArray {
219                    array: Arc::new(list_array),
220                    repdef,
221                })
222            }
223            _ => panic!("List decoder did not have a list field"),
224        }
225    }
226}
227
228#[cfg(test)]
229mod tests {
230
231    use std::{collections::HashMap, sync::Arc};
232
233    use crate::constants::{
234        STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
235    };
236    use arrow_array::{
237        builder::{Int32Builder, Int64Builder, LargeListBuilder, ListBuilder, StringBuilder},
238        Array, ArrayRef, BooleanArray, DictionaryArray, LargeStringArray, ListArray, StructArray,
239        UInt64Array, UInt8Array,
240    };
241
242    use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
243    use arrow_schema::{DataType, Field, Fields};
244    use rstest::rstest;
245
246    use crate::{
247        testing::{check_basic_random, check_round_trip_encoding_of_data, TestCases},
248        version::LanceFileVersion,
249    };
250
251    fn make_list_type(inner_type: DataType) -> DataType {
252        DataType::List(Arc::new(Field::new("item", inner_type, true)))
253    }
254
255    fn make_large_list_type(inner_type: DataType) -> DataType {
256        DataType::LargeList(Arc::new(Field::new("item", inner_type, true)))
257    }
258
259    #[rstest]
260    #[test_log::test(tokio::test)]
261    async fn test_list(
262        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
263        structural_encoding: &str,
264    ) {
265        let mut field_metadata = HashMap::new();
266        field_metadata.insert(
267            STRUCTURAL_ENCODING_META_KEY.to_string(),
268            structural_encoding.into(),
269        );
270        let field =
271            Field::new("", make_list_type(DataType::Int32), true).with_metadata(field_metadata);
272        check_basic_random(field).await;
273    }
274
275    #[rstest]
276    #[test_log::test(tokio::test)]
277    async fn test_deeply_nested_lists(
278        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
279        structural_encoding: &str,
280    ) {
281        let mut field_metadata = HashMap::new();
282        field_metadata.insert(
283            STRUCTURAL_ENCODING_META_KEY.to_string(),
284            structural_encoding.into(),
285        );
286        let field = Field::new("item", DataType::Int32, true).with_metadata(field_metadata);
287        for _ in 0..5 {
288            let field = Field::new("", make_list_type(field.data_type().clone()), true);
289            check_basic_random(field).await;
290        }
291    }
292
293    #[test_log::test(tokio::test)]
294    async fn test_large_list() {
295        let field = Field::new("", make_large_list_type(DataType::Int32), true);
296        check_basic_random(field).await;
297    }
298
299    #[test_log::test(tokio::test)]
300    async fn test_nested_strings() {
301        let field = Field::new("", make_list_type(DataType::Utf8), true);
302        check_basic_random(field).await;
303    }
304
305    #[test_log::test(tokio::test)]
306    async fn test_nested_list() {
307        let field = Field::new("", make_list_type(make_list_type(DataType::Int32)), true);
308        check_basic_random(field).await;
309    }
310
311    #[test_log::test(tokio::test)]
312    async fn test_list_struct_list() {
313        let struct_type = DataType::Struct(Fields::from(vec![Field::new(
314            "inner_str",
315            DataType::Utf8,
316            false,
317        )]));
318
319        let field = Field::new("", make_list_type(struct_type), true);
320        check_basic_random(field).await;
321    }
322
323    #[test_log::test(tokio::test)]
324    async fn test_list_struct_empty() {
325        let fields = Fields::from(vec![Field::new("inner", DataType::UInt64, true)]);
326        let items = UInt64Array::from(Vec::<u64>::new());
327        let structs = StructArray::new(fields, vec![Arc::new(items)], None);
328        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0; 2 * 1024 * 1024 + 1]));
329        let lists = ListArray::new(
330            Arc::new(Field::new("item", structs.data_type().clone(), true)),
331            offsets,
332            Arc::new(structs),
333            None,
334        );
335
336        check_round_trip_encoding_of_data(
337            vec![Arc::new(lists)],
338            &TestCases::default(),
339            HashMap::new(),
340        )
341        .await;
342    }
343
344    #[rstest]
345    #[test_log::test(tokio::test)]
346    async fn test_simple_list(
347        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
348        structural_encoding: &str,
349    ) {
350        let items_builder = Int32Builder::new();
351        let mut list_builder = ListBuilder::new(items_builder);
352        list_builder.append_value([Some(1), Some(2), Some(3)]);
353        list_builder.append_value([Some(4), Some(5)]);
354        list_builder.append_null();
355        list_builder.append_value([Some(6), Some(7), Some(8)]);
356        let list_array = list_builder.finish();
357
358        let mut field_metadata = HashMap::new();
359        field_metadata.insert(
360            STRUCTURAL_ENCODING_META_KEY.to_string(),
361            structural_encoding.into(),
362        );
363
364        let test_cases = TestCases::default()
365            .with_range(0..2)
366            .with_range(0..3)
367            .with_range(1..3)
368            .with_indices(vec![1, 3])
369            .with_indices(vec![2]);
370        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
371            .await;
372    }
373
374    #[rstest]
375    #[test_log::test(tokio::test)]
376    async fn test_simple_nested_list_ends_with_null(
377        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
378        structural_encoding: &str,
379    ) {
380        use arrow_array::Int32Array;
381
382        let values = Int32Array::from(vec![1, 2, 3, 4, 5]);
383        let inner_offsets = ScalarBuffer::<i32>::from(vec![0, 1, 2, 3, 4, 5, 5]);
384        let inner_validity = BooleanBuffer::from(vec![true, true, true, true, true, false]);
385        let outer_offsets = ScalarBuffer::<i32>::from(vec![0, 1, 2, 3, 4, 5, 6, 6]);
386        let outer_validity = BooleanBuffer::from(vec![true, true, true, true, true, true, false]);
387
388        let inner_list = ListArray::new(
389            Arc::new(Field::new("item", DataType::Int32, true)),
390            OffsetBuffer::new(inner_offsets),
391            Arc::new(values),
392            Some(NullBuffer::new(inner_validity)),
393        );
394        let outer_list = ListArray::new(
395            Arc::new(Field::new(
396                "item",
397                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
398                true,
399            )),
400            OffsetBuffer::new(outer_offsets),
401            Arc::new(inner_list),
402            Some(NullBuffer::new(outer_validity)),
403        );
404
405        let mut field_metadata = HashMap::new();
406        field_metadata.insert(
407            STRUCTURAL_ENCODING_META_KEY.to_string(),
408            structural_encoding.into(),
409        );
410
411        let test_cases = TestCases::default()
412            .with_range(0..2)
413            .with_range(0..3)
414            .with_range(5..7)
415            .with_indices(vec![1, 6])
416            .with_indices(vec![6])
417            .with_min_file_version(LanceFileVersion::V2_1);
418        check_round_trip_encoding_of_data(vec![Arc::new(outer_list)], &test_cases, field_metadata)
419            .await;
420    }
421
422    #[rstest]
423    #[test_log::test(tokio::test)]
424    async fn test_simple_string_list(
425        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
426        structural_encoding: &str,
427    ) {
428        let items_builder = StringBuilder::new();
429        let mut list_builder = ListBuilder::new(items_builder);
430        list_builder.append_value([Some("a"), Some("bc"), Some("def")]);
431        list_builder.append_value([Some("gh"), None]);
432        list_builder.append_null();
433        list_builder.append_value([Some("ijk"), Some("lmnop"), Some("qrs")]);
434        let list_array = list_builder.finish();
435
436        let mut field_metadata = HashMap::new();
437        field_metadata.insert(
438            STRUCTURAL_ENCODING_META_KEY.to_string(),
439            structural_encoding.into(),
440        );
441
442        let test_cases = TestCases::default()
443            .with_range(0..2)
444            .with_range(0..3)
445            .with_range(1..3)
446            .with_indices(vec![1, 3])
447            .with_indices(vec![2])
448            .with_min_file_version(LanceFileVersion::V2_1);
449        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
450            .await;
451    }
452
453    #[rstest]
454    #[test_log::test(tokio::test)]
455    async fn test_simple_string_list_no_null(
456        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
457        structural_encoding: &str,
458    ) {
459        let items_builder = StringBuilder::new();
460        let mut list_builder = ListBuilder::new(items_builder);
461        list_builder.append_value([Some("a"), Some("bc"), Some("def")]);
462        list_builder.append_value([Some("gh"), Some("zxy")]);
463        list_builder.append_value([Some("gh"), Some("z")]);
464        list_builder.append_value([Some("ijk"), Some("lmnop"), Some("qrs")]);
465        let list_array = list_builder.finish();
466
467        let mut field_metadata = HashMap::new();
468        field_metadata.insert(
469            STRUCTURAL_ENCODING_META_KEY.to_string(),
470            structural_encoding.into(),
471        );
472
473        let test_cases = TestCases::default()
474            .with_range(0..2)
475            .with_range(0..3)
476            .with_range(1..3)
477            .with_indices(vec![1, 3])
478            .with_indices(vec![2])
479            .with_min_file_version(LanceFileVersion::V2_1);
480        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
481            .await;
482    }
483
484    #[rstest]
485    #[test_log::test(tokio::test)]
486    async fn test_simple_sliced_list(
487        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
488        structural_encoding: &str,
489    ) {
490        let items_builder = Int32Builder::new();
491        let mut list_builder = ListBuilder::new(items_builder);
492        list_builder.append_value([Some(1), Some(2), Some(3)]);
493        list_builder.append_value([Some(4), Some(5)]);
494        list_builder.append_null();
495        list_builder.append_value([Some(6), Some(7), Some(8)]);
496        let list_array = list_builder.finish();
497
498        let list_array = list_array.slice(1, 2);
499
500        let mut field_metadata = HashMap::new();
501        field_metadata.insert(
502            STRUCTURAL_ENCODING_META_KEY.to_string(),
503            structural_encoding.into(),
504        );
505
506        let test_cases = TestCases::default()
507            .with_range(0..2)
508            .with_range(1..2)
509            .with_indices(vec![0])
510            .with_indices(vec![1])
511            .with_min_file_version(LanceFileVersion::V2_1);
512        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
513            .await;
514    }
515
516    #[test_log::test(tokio::test)]
517    async fn test_simple_list_dict() {
518        let values = LargeStringArray::from_iter_values(["a", "bb", "ccc"]);
519        let indices = UInt8Array::from(vec![0, 1, 2, 0, 1, 2, 0, 1, 2]);
520        let dict_array = DictionaryArray::new(indices, Arc::new(values));
521        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 3, 5, 6, 9]));
522        let list_array = ListArray::new(
523            Arc::new(Field::new("item", dict_array.data_type().clone(), true)),
524            offsets,
525            Arc::new(dict_array),
526            None,
527        );
528
529        let test_cases = TestCases::default()
530            .with_range(0..2)
531            .with_range(1..3)
532            .with_range(2..4)
533            .with_indices(vec![1])
534            .with_indices(vec![2]);
535        check_round_trip_encoding_of_data(
536            vec![Arc::new(list_array)],
537            &test_cases,
538            HashMap::default(),
539        )
540        .await;
541    }
542
543    #[test_log::test(tokio::test)]
544    async fn test_simple_list_all_null() {
545        let items = UInt64Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
546        let offsets = ScalarBuffer::<i32>::from(vec![0, 5, 8, 10]);
547        let offsets = OffsetBuffer::new(offsets);
548        let list_validity = NullBuffer::new(BooleanBuffer::from(vec![false, false, false]));
549
550        // The list array is nullable but the items are not.  Then, all lists are null.
551        let list_arr = ListArray::new(
552            Arc::new(Field::new("item", DataType::UInt64, false)),
553            offsets,
554            Arc::new(items),
555            Some(list_validity),
556        );
557
558        let test_cases = TestCases::default()
559            .with_range(0..3)
560            .with_range(1..2)
561            .with_indices(vec![1])
562            .with_indices(vec![2])
563            .with_min_file_version(LanceFileVersion::V2_1);
564        check_round_trip_encoding_of_data(
565            vec![Arc::new(list_arr)],
566            &test_cases,
567            HashMap::default(),
568        )
569        .await;
570    }
571
572    #[rstest]
573    #[test_log::test(tokio::test)]
574    async fn test_list_with_garbage_nulls(
575        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
576        structural_encoding: &str,
577    ) {
578        // In Arrow, list nulls are allowed to be non-empty, with masked garbage values
579        // Here we make a list with a null row in the middle with 3 garbage values
580        let items = UInt64Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
581        let offsets = ScalarBuffer::<i32>::from(vec![0, 5, 8, 10]);
582        let offsets = OffsetBuffer::new(offsets);
583        let list_validity = NullBuffer::new(BooleanBuffer::from(vec![true, false, true]));
584        let list_arr = ListArray::new(
585            Arc::new(Field::new("item", DataType::UInt64, true)),
586            offsets,
587            Arc::new(items),
588            Some(list_validity),
589        );
590
591        let mut field_metadata = HashMap::new();
592        field_metadata.insert(
593            STRUCTURAL_ENCODING_META_KEY.to_string(),
594            structural_encoding.into(),
595        );
596
597        let test_cases = TestCases::default()
598            .with_range(0..3)
599            .with_range(1..2)
600            .with_indices(vec![1])
601            .with_indices(vec![2])
602            .with_min_file_version(LanceFileVersion::V2_1);
603        check_round_trip_encoding_of_data(vec![Arc::new(list_arr)], &test_cases, field_metadata)
604            .await;
605    }
606
607    #[rstest]
608    #[test_log::test(tokio::test)]
609    async fn test_simple_two_page_list(
610        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
611        structural_encoding: &str,
612    ) {
613        // This is a simple pre-defined list that spans two pages.  This test is useful for
614        // debugging the repetition index
615
616        let items_builder = Int64Builder::new();
617        let mut list_builder = ListBuilder::new(items_builder);
618        for i in 0..512 {
619            list_builder.append_value([Some(i), Some(i * 2)]);
620        }
621        let list_array_1 = list_builder.finish();
622
623        let items_builder = Int64Builder::new();
624        let mut list_builder = ListBuilder::new(items_builder);
625        for i in 0..512 {
626            let i = i + 512;
627            list_builder.append_value([Some(i), Some(i * 2)]);
628        }
629        let list_array_2 = list_builder.finish();
630
631        let mut metadata = HashMap::new();
632        metadata.insert(
633            STRUCTURAL_ENCODING_META_KEY.to_string(),
634            structural_encoding.into(),
635        );
636
637        let test_cases = TestCases::default()
638            .with_min_file_version(LanceFileVersion::V2_1)
639            .with_page_sizes(vec![100])
640            .with_range(800..900);
641        check_round_trip_encoding_of_data(
642            vec![Arc::new(list_array_1), Arc::new(list_array_2)],
643            &test_cases,
644            metadata,
645        )
646        .await;
647    }
648
649    #[test_log::test(tokio::test)]
650    async fn test_simple_large_list() {
651        let items_builder = Int32Builder::new();
652        let mut list_builder = LargeListBuilder::new(items_builder);
653        list_builder.append_value([Some(1), Some(2), Some(3)]);
654        list_builder.append_value([Some(4), Some(5)]);
655        list_builder.append_null();
656        list_builder.append_value([Some(6), Some(7), Some(8)]);
657        let list_array = list_builder.finish();
658
659        let test_cases = TestCases::default()
660            .with_range(0..2)
661            .with_range(0..3)
662            .with_range(1..3)
663            .with_indices(vec![1, 3]);
664        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
665            .await;
666    }
667
668    #[rstest]
669    #[test_log::test(tokio::test)]
670    async fn test_empty_lists(
671        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
672        structural_encoding: &str,
673    ) {
674        let mut field_metadata = HashMap::new();
675        field_metadata.insert(
676            STRUCTURAL_ENCODING_META_KEY.to_string(),
677            structural_encoding.into(),
678        );
679
680        // Scenario 1: Some lists are empty
681
682        let values = [vec![Some(1), Some(2), Some(3)], vec![], vec![None]];
683        // Test empty list at beginning, middle, and end
684        for order in [[0, 1, 2], [1, 0, 2], [2, 0, 1]] {
685            let items_builder = Int32Builder::new();
686            let mut list_builder = ListBuilder::new(items_builder);
687            for idx in order {
688                list_builder.append_value(values[idx].clone());
689            }
690            let list_array = Arc::new(list_builder.finish());
691            let test_cases = TestCases::default()
692                .with_indices(vec![1])
693                .with_indices(vec![0])
694                .with_indices(vec![2])
695                .with_indices(vec![0, 1]);
696            check_round_trip_encoding_of_data(
697                vec![list_array.clone()],
698                &test_cases,
699                field_metadata.clone(),
700            )
701            .await;
702            let test_cases = test_cases.with_batch_size(1);
703            check_round_trip_encoding_of_data(
704                vec![list_array],
705                &test_cases,
706                field_metadata.clone(),
707            )
708            .await;
709        }
710
711        // Scenario 2: All lists are empty
712
713        // When encoding a list of empty lists there are no items to encode
714        // which is strange and we want to ensure we handle it
715        let items_builder = Int32Builder::new();
716        let mut list_builder = ListBuilder::new(items_builder);
717        list_builder.append(true);
718        list_builder.append_null();
719        list_builder.append(true);
720        let list_array = Arc::new(list_builder.finish());
721
722        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
723        check_round_trip_encoding_of_data(
724            vec![list_array.clone()],
725            &test_cases,
726            field_metadata.clone(),
727        )
728        .await;
729        let test_cases = test_cases.with_batch_size(1);
730        check_round_trip_encoding_of_data(vec![list_array], &test_cases, field_metadata.clone())
731            .await;
732
733        // Scenario 2B: All lists are empty (but now with strings)
734
735        // When encoding a list of empty lists there are no items to encode
736        // which is strange and we want to ensure we handle it
737        let items_builder = StringBuilder::new();
738        let mut list_builder = ListBuilder::new(items_builder);
739        list_builder.append(true);
740        list_builder.append_null();
741        list_builder.append(true);
742        let list_array = Arc::new(list_builder.finish());
743
744        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
745        check_round_trip_encoding_of_data(
746            vec![list_array.clone()],
747            &test_cases,
748            field_metadata.clone(),
749        )
750        .await;
751        let test_cases = test_cases.with_batch_size(1);
752        check_round_trip_encoding_of_data(vec![list_array], &test_cases, field_metadata.clone())
753            .await;
754
755        // Scenario 3: All lists are null
756
757        let items_builder = Int32Builder::new();
758        let mut list_builder = ListBuilder::new(items_builder);
759        list_builder.append_null();
760        list_builder.append_null();
761        list_builder.append_null();
762        let list_array = Arc::new(list_builder.finish());
763
764        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
765        check_round_trip_encoding_of_data(
766            vec![list_array.clone()],
767            &test_cases,
768            field_metadata.clone(),
769        )
770        .await;
771        let test_cases = test_cases.with_batch_size(1);
772        check_round_trip_encoding_of_data(vec![list_array], &test_cases, field_metadata.clone())
773            .await;
774
775        // Scenario 4: All lists are null and inside a struct (only valid for 2.1 since 2.0 doesn't
776        // support null structs)
777        let items_builder = Int32Builder::new();
778        let mut list_builder = ListBuilder::new(items_builder);
779        list_builder.append_null();
780        list_builder.append_null();
781        list_builder.append_null();
782        let list_array = Arc::new(list_builder.finish());
783
784        let struct_validity = NullBuffer::new(BooleanBuffer::from(vec![true, false, true]));
785        let struct_array = Arc::new(StructArray::new(
786            Fields::from(vec![Field::new(
787                "lists",
788                list_array.data_type().clone(),
789                true,
790            )]),
791            vec![list_array],
792            Some(struct_validity),
793        ));
794
795        let test_cases = TestCases::default()
796            .with_range(0..2)
797            .with_indices(vec![1])
798            .with_min_file_version(LanceFileVersion::V2_1);
799        check_round_trip_encoding_of_data(
800            vec![struct_array.clone()],
801            &test_cases,
802            field_metadata.clone(),
803        )
804        .await;
805        let test_cases = test_cases.with_batch_size(1);
806        check_round_trip_encoding_of_data(vec![struct_array], &test_cases, field_metadata.clone())
807            .await;
808    }
809
810    #[test_log::test(tokio::test)]
811    async fn test_empty_list_list() {
812        let items_builder = Int32Builder::new();
813        let list_builder = ListBuilder::new(items_builder);
814        let mut outer_list_builder = ListBuilder::new(list_builder);
815        outer_list_builder.append_null();
816        outer_list_builder.append_null();
817        outer_list_builder.append_null();
818        let list_array = Arc::new(outer_list_builder.finish());
819
820        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
821        check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
822    }
823
824    #[test_log::test(tokio::test)]
825    #[ignore] // This test is quite slow in debug mode
826    async fn test_jumbo_list() {
827        // This is an overflow test.  We have a list of lists where each list
828        // has 1Mi items.  We encode 5000 of these lists and so we have over 4Gi in the
829        // offsets range
830        let items = BooleanArray::new_null(1024 * 1024);
831        let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1024 * 1024]));
832        let list_arr = Arc::new(ListArray::new(
833            Arc::new(Field::new("item", DataType::Boolean, true)),
834            offsets,
835            Arc::new(items),
836            None,
837        )) as ArrayRef;
838        let arrs = vec![list_arr; 5000];
839
840        // We can't validate because our validation relies on concatenating all input arrays
841        let test_cases = TestCases::default().without_validation();
842        check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
843    }
844
845    // Regression test for issue with ListArray encoding when crossing 1024 value boundary
846    // This test reproduces the bug where rows_avail assertion fails in schedule_instructions
847    // when encoding a ListArray with specific size patterns that cross the 1024 value boundary
848    #[tokio::test]
849    async fn test_fuzz_issue_4466() {
850        // This specific pattern of list sizes triggers the bug when total values cross 1024
851        // 94 lists total 1009 values (passes), 95 lists total 1025 values (fails)
852        let list_sizes = vec![
853            13, 18, 12, 7, 14, 12, 6, 13, 18, 8, // 0-9: 119 values
854            6, 11, 17, 12, 8, 19, 5, 6, 10, 13, // 10-19: 107 values
855            8, 6, 10, 4, 8, 16, 14, 12, 18, 9, // 20-29: 105 values
856            17, 8, 14, 18, 15, 3, 2, 4, 5, 1, // 30-39: 82 values
857            3, 13, 1, 2, 10, 4, 10, 18, 7, 14, // 40-49: 75 values
858            18, 13, 9, 17, 3, 13, 10, 14, 8, 19, // 50-59: 125 values
859            17, 10, 5, 11, 6, 15, 10, 18, 18, 20, // 60-69: 130 values
860            16, 11, 12, 15, 7, 9, 3, 10, 20, 5, // 70-79: 102 values
861            2, 3, 17, 4, 8, 12, 15, 6, 3, 20, // 80-89: 90 values
862            15, 20, 1, 19, 16, // 90-94: 71 values
863        ];
864
865        // Build the ListArray
866        let mut list_builder = ListBuilder::new(Int32Builder::new());
867        let mut total_values = 0;
868
869        for size in &list_sizes {
870            for i in 0..*size {
871                list_builder.values().append_value(i);
872            }
873            list_builder.append(true);
874            total_values += size;
875        }
876
877        let list_array = Arc::new(list_builder.finish());
878
879        // Verify we have the expected number of values
880        assert_eq!(list_array.len(), 95);
881        assert_eq!(total_values, 1025);
882
883        // This should trigger the assertion failure at primitive.rs:1362
884        // debug_assert!(rows_avail > 0)
885        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
886
887        // The bug manifests when encoding this specific pattern
888        // Expected: successful round-trip encoding
889        // Actual: panic at primitive.rs:1362 - assertion failed: rows_avail > 0
890        check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
891    }
892}