Skip to main content

lance_encoding/encodings/logical/
list.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{ops::Range, sync::Arc};
5
6use arrow_array::{Array, ArrayRef, LargeListArray, ListArray, cast::AsArray, make_array};
7use arrow_schema::DataType;
8use futures::future::BoxFuture;
9use lance_arrow::deepcopy::deep_copy_nulls;
10use lance_arrow::list::ListArrayExt;
11use lance_core::Result;
12
13use crate::{
14    decoder::{
15        DecodedArray, FilterExpression, ScheduledScanLine, SchedulerContext,
16        StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
17        StructuralSchedulingJob,
18    },
19    encoder::{EncodeTask, FieldEncoder, OutOfLineBuffers},
20    repdef::RepDefBuilder,
21};
22
23/// A structural encoder for list fields
24///
25/// The list's offsets are added to the rep/def builder
26/// and the list array's values are passed to the child encoder
27///
28/// The values will have any garbage values removed and will be trimmed
29/// to only include the values that are actually used.
30pub struct ListStructuralEncoder {
31    keep_original_array: bool,
32    child: Box<dyn FieldEncoder>,
33}
34
35impl ListStructuralEncoder {
36    pub fn new(keep_original_array: bool, child: Box<dyn FieldEncoder>) -> Self {
37        Self {
38            keep_original_array,
39            child,
40        }
41    }
42}
43
44impl FieldEncoder for ListStructuralEncoder {
45    fn maybe_encode(
46        &mut self,
47        array: ArrayRef,
48        external_buffers: &mut OutOfLineBuffers,
49        mut repdef: RepDefBuilder,
50        row_number: u64,
51        num_rows: u64,
52    ) -> Result<Vec<EncodeTask>> {
53        let values = if let Some(list_arr) = array.as_list_opt::<i32>() {
54            let has_garbage_values = if self.keep_original_array {
55                repdef.add_offsets(list_arr.offsets().clone(), array.nulls().cloned())
56            } else {
57                // there is no need to deep copy offsets, because offset buffers will be cast to a common type (i64).
58                repdef.add_offsets(list_arr.offsets().clone(), deep_copy_nulls(array.nulls()))
59            };
60            if has_garbage_values {
61                list_arr.filter_garbage_nulls().trimmed_values()
62            } else {
63                list_arr.trimmed_values()
64            }
65        } else if let Some(list_arr) = array.as_list_opt::<i64>() {
66            let has_garbage_values = if self.keep_original_array {
67                repdef.add_offsets(list_arr.offsets().clone(), array.nulls().cloned())
68            } else {
69                repdef.add_offsets(list_arr.offsets().clone(), deep_copy_nulls(array.nulls()))
70            };
71            if has_garbage_values {
72                list_arr.filter_garbage_nulls().trimmed_values()
73            } else {
74                list_arr.trimmed_values()
75            }
76        } else {
77            panic!("List encoder used for non-list data")
78        };
79        self.child
80            .maybe_encode(values, external_buffers, repdef, row_number, num_rows)
81    }
82
83    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
84        self.child.flush(external_buffers)
85    }
86
87    fn num_columns(&self) -> u32 {
88        self.child.num_columns()
89    }
90
91    fn finish(
92        &mut self,
93        external_buffers: &mut OutOfLineBuffers,
94    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
95        self.child.finish(external_buffers)
96    }
97}
98
99#[derive(Debug)]
100pub struct StructuralListScheduler {
101    child: Box<dyn StructuralFieldScheduler>,
102}
103
104impl StructuralListScheduler {
105    pub fn new(child: Box<dyn StructuralFieldScheduler>) -> Self {
106        Self { child }
107    }
108}
109
110impl StructuralFieldScheduler for StructuralListScheduler {
111    fn schedule_ranges<'a>(
112        &'a self,
113        ranges: &[Range<u64>],
114        filter: &FilterExpression,
115    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
116        let child = self.child.schedule_ranges(ranges, filter)?;
117
118        Ok(Box::new(StructuralListSchedulingJob::new(child)))
119    }
120
121    fn initialize<'a>(
122        &'a mut self,
123        filter: &'a FilterExpression,
124        context: &'a SchedulerContext,
125    ) -> BoxFuture<'a, Result<()>> {
126        self.child.initialize(filter, context)
127    }
128}
129
130/// Scheduling job for list data
131///
132/// Scheduling is handled by the primitive encoder and nothing special
133/// happens here.
134#[derive(Debug)]
135struct StructuralListSchedulingJob<'a> {
136    child: Box<dyn StructuralSchedulingJob + 'a>,
137}
138
139impl<'a> StructuralListSchedulingJob<'a> {
140    fn new(child: Box<dyn StructuralSchedulingJob + 'a>) -> Self {
141        Self { child }
142    }
143}
144
145impl StructuralSchedulingJob for StructuralListSchedulingJob<'_> {
146    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>> {
147        self.child.schedule_next(context)
148    }
149}
150
151#[derive(Debug)]
152pub struct StructuralListDecoder {
153    child: Box<dyn StructuralFieldDecoder>,
154    data_type: DataType,
155}
156
157impl StructuralListDecoder {
158    pub fn new(child: Box<dyn StructuralFieldDecoder>, data_type: DataType) -> Self {
159        Self { child, data_type }
160    }
161}
162
163impl StructuralFieldDecoder for StructuralListDecoder {
164    fn accept_page(&mut self, child: crate::decoder::LoadedPageShard) -> Result<()> {
165        self.child.accept_page(child)
166    }
167
168    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
169        let child_task = self.child.drain(num_rows)?;
170        Ok(Box::new(StructuralListDecodeTask::new(
171            child_task,
172            self.data_type.clone(),
173        )))
174    }
175
176    fn data_type(&self) -> &DataType {
177        &self.data_type
178    }
179}
180
181#[derive(Debug)]
182struct StructuralListDecodeTask {
183    child_task: Box<dyn StructuralDecodeArrayTask>,
184    data_type: DataType,
185}
186
187impl StructuralListDecodeTask {
188    fn new(child_task: Box<dyn StructuralDecodeArrayTask>, data_type: DataType) -> Self {
189        Self {
190            child_task,
191            data_type,
192        }
193    }
194}
195
196impl StructuralDecodeArrayTask for StructuralListDecodeTask {
197    fn decode(self: Box<Self>) -> Result<DecodedArray> {
198        let DecodedArray {
199            array,
200            mut repdef,
201            data_size,
202        } = self.child_task.decode()?;
203        match &self.data_type {
204            DataType::List(child_field) => {
205                let (offsets, validity) = repdef.unravel_offsets::<i32>()?;
206                let array = if !child_field.is_nullable() && array.null_count() == array.len() {
207                    make_array(array.into_data().into_builder().nulls(None).build()?)
208                } else {
209                    array
210                };
211                let list_array = ListArray::try_new(child_field.clone(), offsets, array, validity)?;
212
213                Ok(DecodedArray {
214                    array: Arc::new(list_array),
215                    repdef,
216                    data_size,
217                })
218            }
219            DataType::LargeList(child_field) => {
220                let (offsets, validity) = repdef.unravel_offsets::<i64>()?;
221                let list_array =
222                    LargeListArray::try_new(child_field.clone(), offsets, array, validity)?;
223                Ok(DecodedArray {
224                    array: Arc::new(list_array),
225                    repdef,
226                    data_size,
227                })
228            }
229            _ => panic!("List decoder did not have a list field"),
230        }
231    }
232}
233
234#[cfg(test)]
235mod tests {
236
237    use std::{collections::HashMap, sync::Arc};
238
239    use crate::constants::{
240        STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
241    };
242    use arrow_array::{
243        Array, ArrayRef, BooleanArray, DictionaryArray, LargeStringArray, ListArray, StructArray,
244        UInt8Array, UInt64Array,
245        builder::{
246            Int32Builder, Int64Builder, LargeListBuilder, ListBuilder, StringBuilder, UInt32Builder,
247        },
248    };
249
250    use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
251    use arrow_schema::{DataType, Field, Fields};
252    use rstest::rstest;
253
254    use crate::{
255        testing::{TestCases, check_basic_random, check_round_trip_encoding_of_data},
256        version::LanceFileVersion,
257    };
258
259    fn make_list_type(inner_type: DataType) -> DataType {
260        DataType::List(Arc::new(Field::new("item", inner_type, true)))
261    }
262
263    fn make_large_list_type(inner_type: DataType) -> DataType {
264        DataType::LargeList(Arc::new(Field::new("item", inner_type, true)))
265    }
266
267    async fn try_encode_v22_pages(
268        array: ArrayRef,
269    ) -> lance_core::Result<Vec<crate::encoder::EncodedPage>> {
270        try_encode_v22_pages_with_metadata(array, HashMap::new()).await
271    }
272
273    async fn try_encode_v22_pages_with_metadata(
274        array: ArrayRef,
275        field_metadata: HashMap<String, String>,
276    ) -> lance_core::Result<Vec<crate::encoder::EncodedPage>> {
277        let arrow_field =
278            Field::new("", array.data_type().clone(), true).with_metadata(field_metadata);
279        let lance_field = lance_core::datatypes::Field::try_from(&arrow_field).unwrap();
280        let encoding_strategy = crate::encoder::default_encoding_strategy(LanceFileVersion::V2_2);
281        let mut column_index_seq = crate::encoder::ColumnIndexSequence::default();
282        let encoding_options = crate::encoder::EncodingOptions {
283            version: LanceFileVersion::V2_2,
284            ..Default::default()
285        };
286        let mut encoder = encoding_strategy
287            .create_field_encoder(
288                encoding_strategy.as_ref(),
289                &lance_field,
290                &mut column_index_seq,
291                &encoding_options,
292            )
293            .unwrap();
294        let mut external_buffers =
295            crate::encoder::OutOfLineBuffers::new(0, crate::encoder::MIN_PAGE_BUFFER_ALIGNMENT);
296        let num_rows = array.len() as u64;
297        let mut pages = Vec::new();
298        for task in encoder
299            .maybe_encode(
300                array,
301                &mut external_buffers,
302                crate::repdef::RepDefBuilder::default(),
303                0,
304                num_rows,
305            )
306            .unwrap()
307        {
308            pages.push(task.await?);
309        }
310        for task in encoder.flush(&mut external_buffers).unwrap() {
311            pages.push(task.await?);
312        }
313        Ok(pages)
314    }
315
316    async fn encode_v22_pages(array: ArrayRef) -> Vec<crate::encoder::EncodedPage> {
317        try_encode_v22_pages(array).await.unwrap()
318    }
319
320    fn assert_split_miniblock_layout(
321        pages: &[crate::encoder::EncodedPage],
322        expect_structural_only_page: bool,
323    ) {
324        let mut miniblock_pages = 0;
325        let mut fullzip_pages = 0;
326        let mut structural_only_pages = 0;
327
328        for page in pages {
329            let crate::decoder::PageEncoding::Structural(layout) = &page.description else {
330                continue;
331            };
332            match layout.layout.as_ref().unwrap() {
333                crate::format::pb21::page_layout::Layout::MiniBlockLayout(_) => {
334                    miniblock_pages += 1;
335                }
336                crate::format::pb21::page_layout::Layout::FullZipLayout(_) => {
337                    fullzip_pages += 1;
338                }
339                crate::format::pb21::page_layout::Layout::ConstantLayout(layout) => {
340                    if layout.inline_value.is_none()
341                        && (layout.num_rep_values > 0 || layout.num_def_values > 0)
342                    {
343                        structural_only_pages += 1;
344                    }
345                }
346                crate::format::pb21::page_layout::Layout::BlobLayout(_) => {}
347            }
348        }
349
350        assert!(
351            miniblock_pages > 0,
352            "expected leaf values to remain on mini-block pages"
353        );
354        assert_eq!(
355            fullzip_pages, 0,
356            "split list pages should not fall back to full-zip"
357        );
358        if expect_structural_only_page {
359            assert!(
360                structural_only_pages > 0,
361                "expected at least one structural-only page"
362            );
363        }
364    }
365
366    fn assert_has_fullzip_layout(pages: &[crate::encoder::EncodedPage]) {
367        let has_fullzip = pages.iter().any(|page| {
368            let crate::decoder::PageEncoding::Structural(layout) = &page.description else {
369                return false;
370            };
371            matches!(
372                layout.layout.as_ref().unwrap(),
373                crate::format::pb21::page_layout::Layout::FullZipLayout(_)
374            )
375        });
376        assert!(has_fullzip, "expected at least one full-zip page");
377    }
378
379    #[rstest]
380    #[test_log::test(tokio::test)]
381    async fn test_list(
382        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
383        structural_encoding: &str,
384    ) {
385        let mut field_metadata = HashMap::new();
386        field_metadata.insert(
387            STRUCTURAL_ENCODING_META_KEY.to_string(),
388            structural_encoding.into(),
389        );
390        let field =
391            Field::new("", make_list_type(DataType::Int32), true).with_metadata(field_metadata);
392        check_basic_random(field).await;
393    }
394
395    #[rstest]
396    #[test_log::test(tokio::test)]
397    async fn test_deeply_nested_lists(
398        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
399        structural_encoding: &str,
400    ) {
401        let mut field_metadata = HashMap::new();
402        field_metadata.insert(
403            STRUCTURAL_ENCODING_META_KEY.to_string(),
404            structural_encoding.into(),
405        );
406        let field = Field::new("item", DataType::Int32, true).with_metadata(field_metadata);
407        for _ in 0..5 {
408            let field = Field::new("", make_list_type(field.data_type().clone()), true);
409            check_basic_random(field).await;
410        }
411    }
412
413    #[test_log::test(tokio::test)]
414    async fn test_large_list() {
415        let field = Field::new("", make_large_list_type(DataType::Int32), true);
416        check_basic_random(field).await;
417    }
418
419    #[test_log::test(tokio::test)]
420    async fn test_nested_strings() {
421        let field = Field::new("", make_list_type(DataType::Utf8), true);
422        check_basic_random(field).await;
423    }
424
425    #[test_log::test(tokio::test)]
426    async fn test_nested_list() {
427        let field = Field::new("", make_list_type(make_list_type(DataType::Int32)), true);
428        check_basic_random(field).await;
429    }
430
431    #[test_log::test(tokio::test)]
432    async fn test_list_struct_list() {
433        let struct_type = DataType::Struct(Fields::from(vec![Field::new(
434            "inner_str",
435            DataType::Utf8,
436            false,
437        )]));
438
439        let field = Field::new("", make_list_type(struct_type), true);
440        check_basic_random(field).await;
441    }
442
443    #[test_log::test(tokio::test)]
444    async fn test_list_struct_empty() {
445        let fields = Fields::from(vec![Field::new("inner", DataType::UInt64, true)]);
446        let items = UInt64Array::from(Vec::<u64>::new());
447        let structs = StructArray::new(fields, vec![Arc::new(items)], None);
448        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0; 2 * 1024 * 1024 + 1]));
449        let lists = ListArray::new(
450            Arc::new(Field::new("item", structs.data_type().clone(), true)),
451            offsets,
452            Arc::new(structs),
453            None,
454        );
455
456        check_round_trip_encoding_of_data(
457            vec![Arc::new(lists)],
458            &TestCases::default(),
459            HashMap::new(),
460        )
461        .await;
462    }
463
464    #[rstest]
465    #[test_log::test(tokio::test)]
466    async fn test_simple_list(
467        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
468        structural_encoding: &str,
469    ) {
470        let items_builder = Int32Builder::new();
471        let mut list_builder = ListBuilder::new(items_builder);
472        list_builder.append_value([Some(1), Some(2), Some(3)]);
473        list_builder.append_value([Some(4), Some(5)]);
474        list_builder.append_null();
475        list_builder.append_value([Some(6), Some(7), Some(8)]);
476        let list_array = list_builder.finish();
477
478        let mut field_metadata = HashMap::new();
479        field_metadata.insert(
480            STRUCTURAL_ENCODING_META_KEY.to_string(),
481            structural_encoding.into(),
482        );
483
484        let test_cases = TestCases::default()
485            .with_range(0..2)
486            .with_range(0..3)
487            .with_range(1..3)
488            .with_indices(vec![1, 3])
489            .with_indices(vec![2]);
490        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
491            .await;
492    }
493
494    #[rstest]
495    #[test_log::test(tokio::test)]
496    async fn test_simple_nested_list_ends_with_null(
497        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
498        structural_encoding: &str,
499    ) {
500        use arrow_array::Int32Array;
501
502        let values = Int32Array::from(vec![1, 2, 3, 4, 5]);
503        let inner_offsets = ScalarBuffer::<i32>::from(vec![0, 1, 2, 3, 4, 5, 5]);
504        let inner_validity = BooleanBuffer::from(vec![true, true, true, true, true, false]);
505        let outer_offsets = ScalarBuffer::<i32>::from(vec![0, 1, 2, 3, 4, 5, 6, 6]);
506        let outer_validity = BooleanBuffer::from(vec![true, true, true, true, true, true, false]);
507
508        let inner_list = ListArray::new(
509            Arc::new(Field::new("item", DataType::Int32, true)),
510            OffsetBuffer::new(inner_offsets),
511            Arc::new(values),
512            Some(NullBuffer::new(inner_validity)),
513        );
514        let outer_list = ListArray::new(
515            Arc::new(Field::new(
516                "item",
517                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
518                true,
519            )),
520            OffsetBuffer::new(outer_offsets),
521            Arc::new(inner_list),
522            Some(NullBuffer::new(outer_validity)),
523        );
524
525        let mut field_metadata = HashMap::new();
526        field_metadata.insert(
527            STRUCTURAL_ENCODING_META_KEY.to_string(),
528            structural_encoding.into(),
529        );
530
531        let test_cases = TestCases::default()
532            .with_range(0..2)
533            .with_range(0..3)
534            .with_range(5..7)
535            .with_indices(vec![1, 6])
536            .with_indices(vec![6])
537            .with_min_file_version(LanceFileVersion::V2_1);
538        check_round_trip_encoding_of_data(vec![Arc::new(outer_list)], &test_cases, field_metadata)
539            .await;
540    }
541
542    #[rstest]
543    #[test_log::test(tokio::test)]
544    async fn test_simple_string_list(
545        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
546        structural_encoding: &str,
547    ) {
548        let items_builder = StringBuilder::new();
549        let mut list_builder = ListBuilder::new(items_builder);
550        list_builder.append_value([Some("a"), Some("bc"), Some("def")]);
551        list_builder.append_value([Some("gh"), None]);
552        list_builder.append_null();
553        list_builder.append_value([Some("ijk"), Some("lmnop"), Some("qrs")]);
554        let list_array = list_builder.finish();
555
556        let mut field_metadata = HashMap::new();
557        field_metadata.insert(
558            STRUCTURAL_ENCODING_META_KEY.to_string(),
559            structural_encoding.into(),
560        );
561
562        let test_cases = TestCases::default()
563            .with_range(0..2)
564            .with_range(0..3)
565            .with_range(1..3)
566            .with_indices(vec![1, 3])
567            .with_indices(vec![2])
568            .with_min_file_version(LanceFileVersion::V2_1);
569        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
570            .await;
571    }
572
573    #[rstest]
574    #[test_log::test(tokio::test)]
575    async fn test_simple_string_list_no_null(
576        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
577        structural_encoding: &str,
578    ) {
579        let items_builder = StringBuilder::new();
580        let mut list_builder = ListBuilder::new(items_builder);
581        list_builder.append_value([Some("a"), Some("bc"), Some("def")]);
582        list_builder.append_value([Some("gh"), Some("zxy")]);
583        list_builder.append_value([Some("gh"), Some("z")]);
584        list_builder.append_value([Some("ijk"), Some("lmnop"), Some("qrs")]);
585        let list_array = list_builder.finish();
586
587        let mut field_metadata = HashMap::new();
588        field_metadata.insert(
589            STRUCTURAL_ENCODING_META_KEY.to_string(),
590            structural_encoding.into(),
591        );
592
593        let test_cases = TestCases::default()
594            .with_range(0..2)
595            .with_range(0..3)
596            .with_range(1..3)
597            .with_indices(vec![1, 3])
598            .with_indices(vec![2])
599            .with_min_file_version(LanceFileVersion::V2_1);
600        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
601            .await;
602    }
603
604    #[rstest]
605    #[test_log::test(tokio::test)]
606    async fn test_simple_sliced_list(
607        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
608        structural_encoding: &str,
609    ) {
610        let items_builder = Int32Builder::new();
611        let mut list_builder = ListBuilder::new(items_builder);
612        list_builder.append_value([Some(1), Some(2), Some(3)]);
613        list_builder.append_value([Some(4), Some(5)]);
614        list_builder.append_null();
615        list_builder.append_value([Some(6), Some(7), Some(8)]);
616        let list_array = list_builder.finish();
617
618        let list_array = list_array.slice(1, 2);
619
620        let mut field_metadata = HashMap::new();
621        field_metadata.insert(
622            STRUCTURAL_ENCODING_META_KEY.to_string(),
623            structural_encoding.into(),
624        );
625
626        let test_cases = TestCases::default()
627            .with_range(0..2)
628            .with_range(1..2)
629            .with_indices(vec![0])
630            .with_indices(vec![1])
631            .with_min_file_version(LanceFileVersion::V2_1);
632        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
633            .await;
634    }
635
636    #[test_log::test(tokio::test)]
637    async fn test_simple_list_dict() {
638        let values = LargeStringArray::from_iter_values(["a", "bb", "ccc"]);
639        let indices = UInt8Array::from(vec![0, 1, 2, 0, 1, 2, 0, 1, 2]);
640        let dict_array = DictionaryArray::new(indices, Arc::new(values));
641        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 3, 5, 6, 9]));
642        let list_array = ListArray::new(
643            Arc::new(Field::new("item", dict_array.data_type().clone(), true)),
644            offsets,
645            Arc::new(dict_array),
646            None,
647        );
648
649        let test_cases = TestCases::default()
650            .with_range(0..2)
651            .with_range(1..3)
652            .with_range(2..4)
653            .with_indices(vec![1])
654            .with_indices(vec![2]);
655        check_round_trip_encoding_of_data(
656            vec![Arc::new(list_array)],
657            &test_cases,
658            HashMap::default(),
659        )
660        .await;
661    }
662
663    #[test_log::test(tokio::test)]
664    async fn test_simple_list_all_null() {
665        let items = UInt64Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
666        let offsets = ScalarBuffer::<i32>::from(vec![0, 5, 8, 10]);
667        let offsets = OffsetBuffer::new(offsets);
668        let list_validity = NullBuffer::new(BooleanBuffer::from(vec![false, false, false]));
669
670        // The list array is nullable but the items are not.  Then, all lists are null.
671        let list_arr = ListArray::new(
672            Arc::new(Field::new("item", DataType::UInt64, false)),
673            offsets,
674            Arc::new(items),
675            Some(list_validity),
676        );
677
678        let test_cases = TestCases::default()
679            .with_range(0..3)
680            .with_range(1..2)
681            .with_indices(vec![1])
682            .with_indices(vec![2])
683            .with_min_file_version(LanceFileVersion::V2_1);
684        check_round_trip_encoding_of_data(
685            vec![Arc::new(list_arr)],
686            &test_cases,
687            HashMap::default(),
688        )
689        .await;
690    }
691
692    #[rstest]
693    #[test_log::test(tokio::test)]
694    async fn test_list_with_garbage_nulls(
695        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
696        structural_encoding: &str,
697    ) {
698        // In Arrow, list nulls are allowed to be non-empty, with masked garbage values
699        // Here we make a list with a null row in the middle with 3 garbage values
700        let items = UInt64Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
701        let offsets = ScalarBuffer::<i32>::from(vec![0, 5, 8, 10]);
702        let offsets = OffsetBuffer::new(offsets);
703        let list_validity = NullBuffer::new(BooleanBuffer::from(vec![true, false, true]));
704        let list_arr = ListArray::new(
705            Arc::new(Field::new("item", DataType::UInt64, true)),
706            offsets,
707            Arc::new(items),
708            Some(list_validity),
709        );
710
711        let mut field_metadata = HashMap::new();
712        field_metadata.insert(
713            STRUCTURAL_ENCODING_META_KEY.to_string(),
714            structural_encoding.into(),
715        );
716
717        let test_cases = TestCases::default()
718            .with_range(0..3)
719            .with_range(1..2)
720            .with_indices(vec![1])
721            .with_indices(vec![2])
722            .with_min_file_version(LanceFileVersion::V2_1);
723        check_round_trip_encoding_of_data(vec![Arc::new(list_arr)], &test_cases, field_metadata)
724            .await;
725    }
726
727    #[rstest]
728    #[test_log::test(tokio::test)]
729    async fn test_simple_two_page_list(
730        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
731        structural_encoding: &str,
732    ) {
733        // This is a simple pre-defined list that spans two pages.  This test is useful for
734        // debugging the repetition index
735
736        let items_builder = Int64Builder::new();
737        let mut list_builder = ListBuilder::new(items_builder);
738        for i in 0..512 {
739            list_builder.append_value([Some(i), Some(i * 2)]);
740        }
741        let list_array_1 = list_builder.finish();
742
743        let items_builder = Int64Builder::new();
744        let mut list_builder = ListBuilder::new(items_builder);
745        for i in 0..512 {
746            let i = i + 512;
747            list_builder.append_value([Some(i), Some(i * 2)]);
748        }
749        let list_array_2 = list_builder.finish();
750
751        let mut metadata = HashMap::new();
752        metadata.insert(
753            STRUCTURAL_ENCODING_META_KEY.to_string(),
754            structural_encoding.into(),
755        );
756
757        let test_cases = TestCases::default()
758            .with_min_file_version(LanceFileVersion::V2_1)
759            .with_page_sizes(vec![100])
760            .with_range(800..900);
761        check_round_trip_encoding_of_data(
762            vec![Arc::new(list_array_1), Arc::new(list_array_2)],
763            &test_cases,
764            metadata,
765        )
766        .await;
767    }
768
769    #[test_log::test(tokio::test)]
770    async fn test_simple_large_list() {
771        let items_builder = Int32Builder::new();
772        let mut list_builder = LargeListBuilder::new(items_builder);
773        list_builder.append_value([Some(1), Some(2), Some(3)]);
774        list_builder.append_value([Some(4), Some(5)]);
775        list_builder.append_null();
776        list_builder.append_value([Some(6), Some(7), Some(8)]);
777        let list_array = list_builder.finish();
778
779        let test_cases = TestCases::default()
780            .with_range(0..2)
781            .with_range(0..3)
782            .with_range(1..3)
783            .with_indices(vec![1, 3]);
784        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
785            .await;
786    }
787
788    #[rstest]
789    #[test_log::test(tokio::test)]
790    async fn test_empty_lists(
791        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
792        structural_encoding: &str,
793    ) {
794        let mut field_metadata = HashMap::new();
795        field_metadata.insert(
796            STRUCTURAL_ENCODING_META_KEY.to_string(),
797            structural_encoding.into(),
798        );
799
800        // Scenario 1: Some lists are empty
801
802        let values = [vec![Some(1), Some(2), Some(3)], vec![], vec![None]];
803        // Test empty list at beginning, middle, and end
804        for order in [[0, 1, 2], [1, 0, 2], [2, 0, 1]] {
805            let items_builder = Int32Builder::new();
806            let mut list_builder = ListBuilder::new(items_builder);
807            for idx in order {
808                list_builder.append_value(values[idx].clone());
809            }
810            let list_array = Arc::new(list_builder.finish());
811            let test_cases = TestCases::default()
812                .with_indices(vec![1])
813                .with_indices(vec![0])
814                .with_indices(vec![2])
815                .with_indices(vec![0, 1]);
816            check_round_trip_encoding_of_data(
817                vec![list_array.clone()],
818                &test_cases,
819                field_metadata.clone(),
820            )
821            .await;
822            let test_cases = test_cases.with_batch_size(1);
823            check_round_trip_encoding_of_data(
824                vec![list_array],
825                &test_cases,
826                field_metadata.clone(),
827            )
828            .await;
829        }
830
831        // Scenario 2: All lists are empty
832
833        // When encoding a list of empty lists there are no items to encode
834        // which is strange and we want to ensure we handle it
835        let items_builder = Int32Builder::new();
836        let mut list_builder = ListBuilder::new(items_builder);
837        list_builder.append(true);
838        list_builder.append_null();
839        list_builder.append(true);
840        let list_array = Arc::new(list_builder.finish());
841
842        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
843        check_round_trip_encoding_of_data(
844            vec![list_array.clone()],
845            &test_cases,
846            field_metadata.clone(),
847        )
848        .await;
849        let test_cases = test_cases.with_batch_size(1);
850        check_round_trip_encoding_of_data(vec![list_array], &test_cases, field_metadata.clone())
851            .await;
852
853        // Scenario 2B: All lists are empty (but now with strings)
854
855        // When encoding a list of empty lists there are no items to encode
856        // which is strange and we want to ensure we handle it
857        let items_builder = StringBuilder::new();
858        let mut list_builder = ListBuilder::new(items_builder);
859        list_builder.append(true);
860        list_builder.append_null();
861        list_builder.append(true);
862        let list_array = Arc::new(list_builder.finish());
863
864        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
865        check_round_trip_encoding_of_data(
866            vec![list_array.clone()],
867            &test_cases,
868            field_metadata.clone(),
869        )
870        .await;
871        let test_cases = test_cases.with_batch_size(1);
872        check_round_trip_encoding_of_data(vec![list_array], &test_cases, field_metadata.clone())
873            .await;
874
875        // Scenario 3: All lists are null
876
877        let items_builder = Int32Builder::new();
878        let mut list_builder = ListBuilder::new(items_builder);
879        list_builder.append_null();
880        list_builder.append_null();
881        list_builder.append_null();
882        let list_array = Arc::new(list_builder.finish());
883
884        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
885        check_round_trip_encoding_of_data(
886            vec![list_array.clone()],
887            &test_cases,
888            field_metadata.clone(),
889        )
890        .await;
891        let test_cases = test_cases.with_batch_size(1);
892        check_round_trip_encoding_of_data(vec![list_array], &test_cases, field_metadata.clone())
893            .await;
894
895        // Scenario 4: All lists are null and inside a struct (only valid for 2.1 since 2.0 doesn't
896        // support null structs)
897        let items_builder = Int32Builder::new();
898        let mut list_builder = ListBuilder::new(items_builder);
899        list_builder.append_null();
900        list_builder.append_null();
901        list_builder.append_null();
902        let list_array = Arc::new(list_builder.finish());
903
904        let struct_validity = NullBuffer::new(BooleanBuffer::from(vec![true, false, true]));
905        let struct_array = Arc::new(StructArray::new(
906            Fields::from(vec![Field::new(
907                "lists",
908                list_array.data_type().clone(),
909                true,
910            )]),
911            vec![list_array],
912            Some(struct_validity),
913        ));
914
915        let test_cases = TestCases::default()
916            .with_range(0..2)
917            .with_indices(vec![1])
918            .with_min_file_version(LanceFileVersion::V2_1);
919        check_round_trip_encoding_of_data(
920            vec![struct_array.clone()],
921            &test_cases,
922            field_metadata.clone(),
923        )
924        .await;
925        let test_cases = test_cases.with_batch_size(1);
926        check_round_trip_encoding_of_data(vec![struct_array], &test_cases, field_metadata.clone())
927            .await;
928    }
929
930    #[test_log::test(tokio::test)]
931    async fn test_empty_list_list() {
932        let items_builder = Int32Builder::new();
933        let list_builder = ListBuilder::new(items_builder);
934        let mut outer_list_builder = ListBuilder::new(list_builder);
935        outer_list_builder.append_null();
936        outer_list_builder.append_null();
937        outer_list_builder.append_null();
938        let list_array = Arc::new(outer_list_builder.finish());
939
940        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
941        check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
942    }
943
944    #[test_log::test(tokio::test)]
945    #[ignore] // This test is quite slow in debug mode
946    async fn test_jumbo_list() {
947        // This is an overflow test.  We have a list of lists where each list
948        // has 1Mi items.  We encode 5000 of these lists and so we have over 4Gi in the
949        // offsets range
950        let items = BooleanArray::new_null(1024 * 1024);
951        let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1024 * 1024]));
952        let list_arr = Arc::new(ListArray::new(
953            Arc::new(Field::new("item", DataType::Boolean, true)),
954            offsets,
955            Arc::new(items),
956            None,
957        )) as ArrayRef;
958        let arrs = vec![list_arr; 5000];
959
960        // We can't validate because our validation relies on concatenating all input arrays
961        let test_cases = TestCases::default().without_validation();
962        check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
963    }
964
965    // Regression test for issue with ListArray encoding when crossing 1024 value boundary
966    // This test reproduces the bug where rows_avail assertion fails in schedule_instructions
967    // when encoding a ListArray with specific size patterns that cross the 1024 value boundary
968    #[tokio::test]
969    async fn test_fuzz_issue_4466() {
970        // This specific pattern of list sizes triggers the bug when total values cross 1024
971        // 94 lists total 1009 values (passes), 95 lists total 1025 values (fails)
972        let list_sizes = vec![
973            13, 18, 12, 7, 14, 12, 6, 13, 18, 8, // 0-9: 119 values
974            6, 11, 17, 12, 8, 19, 5, 6, 10, 13, // 10-19: 107 values
975            8, 6, 10, 4, 8, 16, 14, 12, 18, 9, // 20-29: 105 values
976            17, 8, 14, 18, 15, 3, 2, 4, 5, 1, // 30-39: 82 values
977            3, 13, 1, 2, 10, 4, 10, 18, 7, 14, // 40-49: 75 values
978            18, 13, 9, 17, 3, 13, 10, 14, 8, 19, // 50-59: 125 values
979            17, 10, 5, 11, 6, 15, 10, 18, 18, 20, // 60-69: 130 values
980            16, 11, 12, 15, 7, 9, 3, 10, 20, 5, // 70-79: 102 values
981            2, 3, 17, 4, 8, 12, 15, 6, 3, 20, // 80-89: 90 values
982            15, 20, 1, 19, 16, // 90-94: 71 values
983        ];
984
985        // Build the ListArray
986        let mut list_builder = ListBuilder::new(Int32Builder::new());
987        let mut total_values = 0;
988
989        for size in &list_sizes {
990            for i in 0..*size {
991                list_builder.values().append_value(i);
992            }
993            list_builder.append(true);
994            total_values += size;
995        }
996
997        let list_array = Arc::new(list_builder.finish());
998
999        // Verify we have the expected number of values
1000        assert_eq!(list_array.len(), 95);
1001        assert_eq!(total_values, 1025);
1002
1003        // This should trigger the assertion failure at primitive.rs:1362
1004        // debug_assert!(rows_avail > 0)
1005        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
1006
1007        // The bug manifests when encoding this specific pattern
1008        // Expected: successful round-trip encoding
1009        // Actual: panic at primitive.rs:1362 - assertion failed: rows_avail > 0
1010        check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
1011    }
1012
1013    #[rstest]
1014    #[test_log::test(tokio::test)]
1015    async fn test_sparse_large_string_list(
1016        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
1017        structural_encoding: &str,
1018    ) {
1019        // 2.5 million rows, mostly empty lists. ~100 lists have 10 short strings each.
1020        let num_rows = 2_500_000u32;
1021        let num_non_empty = 100u32;
1022        let strings_per_list = 10;
1023
1024        let items_builder = StringBuilder::new();
1025        let mut list_builder = ListBuilder::new(items_builder);
1026
1027        // Spread non-empty lists evenly across the range
1028        let step = num_rows / num_non_empty;
1029        let mut next_non_empty = step / 2;
1030
1031        for i in 0..num_rows {
1032            if i == next_non_empty {
1033                let vals: Vec<Option<&str>> = (0..strings_per_list)
1034                    .map(|j| match j % 4 {
1035                        0 => Some("a"),
1036                        1 => Some("bb"),
1037                        2 => Some("ccc"),
1038                        _ => Some("d"),
1039                    })
1040                    .collect();
1041                list_builder.append_value(vals);
1042                next_non_empty = next_non_empty.saturating_add(step);
1043            } else {
1044                list_builder.append_value([] as [Option<&str>; 0]);
1045            }
1046        }
1047        let list_array = list_builder.finish();
1048
1049        let mut field_metadata = HashMap::new();
1050        field_metadata.insert(
1051            STRUCTURAL_ENCODING_META_KEY.to_string(),
1052            structural_encoding.into(),
1053        );
1054
1055        let test_cases = TestCases::default()
1056            .with_range(0..1000)
1057            .with_range(0..num_rows as u64)
1058            .with_indices(vec![0, (step / 2) as u64, num_rows as u64 - 1])
1059            .with_max_file_version(LanceFileVersion::V2_2);
1060        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
1061            .await;
1062    }
1063
1064    #[test_log::test(tokio::test)]
1065    async fn test_sparse_boolean_list_uses_miniblock() {
1066        // Redacted reproduction from a production schema shape containing ARRAY(BOOLEAN).
1067        // The field names are not relevant; the failure requires sparse list structure
1068        // with a 1-bit Boolean leaf value.
1069        let num_rows = 200_000usize;
1070        let num_non_empty = 10usize;
1071        let booleans_per_list = 8usize;
1072        let step = num_rows / num_non_empty;
1073
1074        let mut offsets = Vec::with_capacity(num_rows + 1);
1075        let mut values = Vec::with_capacity(num_non_empty * booleans_per_list);
1076        offsets.push(0i32);
1077
1078        let mut next_non_empty = step / 2;
1079        for row in 0..num_rows {
1080            if row == next_non_empty {
1081                values.extend((0..booleans_per_list).map(|idx| idx % 2 == 0));
1082                next_non_empty += step;
1083            }
1084            offsets.push(values.len() as i32);
1085        }
1086
1087        let items = BooleanArray::from(values);
1088        let list_array = ListArray::new(
1089            Arc::new(Field::new("item", DataType::Boolean, true)),
1090            OffsetBuffer::new(ScalarBuffer::from(offsets)),
1091            Arc::new(items),
1092            None,
1093        );
1094
1095        let test_cases = TestCases::default()
1096            .with_range(0..1000)
1097            .with_range(0..num_rows as u64)
1098            .with_indices(vec![0, (step / 2) as u64, num_rows as u64 - 1])
1099            .with_max_file_version(LanceFileVersion::V2_2);
1100        let list_array = Arc::new(list_array) as ArrayRef;
1101        let pages = encode_v22_pages(list_array.clone()).await;
1102        assert_split_miniblock_layout(&pages, false);
1103        check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
1104    }
1105
1106    #[test_log::test(tokio::test)]
1107    async fn test_sparse_boolean_list_with_long_empty_prefix() {
1108        let empty_prefix_rows = 70_000usize;
1109        let trailing_empty_rows = 9usize;
1110        let booleans_per_list = 8usize;
1111        let num_rows = empty_prefix_rows + 1 + trailing_empty_rows;
1112
1113        let mut offsets = Vec::with_capacity(num_rows + 1);
1114        offsets.extend(std::iter::repeat_n(0i32, empty_prefix_rows + 1));
1115        let values = (0..booleans_per_list)
1116            .map(|idx| idx % 2 == 0)
1117            .collect::<Vec<_>>();
1118        offsets.push(values.len() as i32);
1119        offsets.extend(std::iter::repeat_n(
1120            values.len() as i32,
1121            trailing_empty_rows,
1122        ));
1123
1124        let items = BooleanArray::from(values);
1125        let list_array = ListArray::new(
1126            Arc::new(Field::new("item", DataType::Boolean, true)),
1127            OffsetBuffer::new(ScalarBuffer::from(offsets)),
1128            Arc::new(items),
1129            None,
1130        );
1131
1132        let test_cases = TestCases::default()
1133            .with_range(0..num_rows as u64)
1134            .with_indices(vec![0, empty_prefix_rows as u64, num_rows as u64 - 1])
1135            .with_max_file_version(LanceFileVersion::V2_2);
1136        let list_array = Arc::new(list_array) as ArrayRef;
1137        let pages = encode_v22_pages(list_array.clone()).await;
1138        assert_split_miniblock_layout(&pages, true);
1139        check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
1140    }
1141
1142    #[test_log::test(tokio::test)]
1143    async fn test_sparse_boolean_list_with_long_null_prefix() {
1144        let null_prefix_rows = 70_000usize;
1145        let trailing_empty_rows = 9usize;
1146        let booleans_per_list = 8usize;
1147        let num_rows = null_prefix_rows + 1 + trailing_empty_rows;
1148
1149        let mut offsets = Vec::with_capacity(num_rows + 1);
1150        offsets.extend(std::iter::repeat_n(0i32, null_prefix_rows + 1));
1151        let values = (0..booleans_per_list)
1152            .map(|idx| idx % 2 == 0)
1153            .collect::<Vec<_>>();
1154        offsets.push(values.len() as i32);
1155        offsets.extend(std::iter::repeat_n(
1156            values.len() as i32,
1157            trailing_empty_rows,
1158        ));
1159        let validity = BooleanBuffer::from_iter((0..num_rows).map(|row| row >= null_prefix_rows));
1160
1161        let items = BooleanArray::from(values);
1162        let list_array = ListArray::new(
1163            Arc::new(Field::new("item", DataType::Boolean, true)),
1164            OffsetBuffer::new(ScalarBuffer::from(offsets)),
1165            Arc::new(items),
1166            Some(NullBuffer::new(validity)),
1167        );
1168
1169        let test_cases = TestCases::default()
1170            .with_range(0..num_rows as u64)
1171            .with_indices(vec![0, null_prefix_rows as u64, num_rows as u64 - 1])
1172            .with_max_file_version(LanceFileVersion::V2_2);
1173        let list_array = Arc::new(list_array) as ArrayRef;
1174        let pages = encode_v22_pages(list_array.clone()).await;
1175        assert_split_miniblock_layout(&pages, true);
1176        check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
1177    }
1178
1179    #[test_log::test(tokio::test)]
1180    async fn test_sparse_boolean_list_with_amortized_long_empty_prefix() {
1181        let empty_prefix_rows = 62_000usize;
1182        let booleans_per_list = 8_192usize;
1183        let num_rows = empty_prefix_rows + 1;
1184
1185        let mut offsets = Vec::with_capacity(num_rows + 1);
1186        offsets.extend(std::iter::repeat_n(0i32, empty_prefix_rows + 1));
1187        let values = (0..booleans_per_list)
1188            .map(|idx| idx % 2 == 0)
1189            .collect::<Vec<_>>();
1190        offsets.push(values.len() as i32);
1191
1192        let items = BooleanArray::from(values);
1193        let list_array = ListArray::new(
1194            Arc::new(Field::new("item", DataType::Boolean, true)),
1195            OffsetBuffer::new(ScalarBuffer::from(offsets)),
1196            Arc::new(items),
1197            None,
1198        );
1199
1200        let test_cases = TestCases::default()
1201            .with_range(0..num_rows as u64)
1202            .with_indices(vec![0, empty_prefix_rows as u64])
1203            .with_max_file_version(LanceFileVersion::V2_2);
1204        let list_array = Arc::new(list_array) as ArrayRef;
1205        let pages = encode_v22_pages(list_array.clone()).await;
1206        assert_split_miniblock_layout(&pages, true);
1207        check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
1208    }
1209
1210    #[test_log::test(tokio::test)]
1211    async fn test_nested_sparse_boolean_list_fails_without_panic() {
1212        let empty_inner_lists = 70_000usize;
1213        let booleans_per_list = 8usize;
1214
1215        let mut inner_offsets = vec![0i32; empty_inner_lists + 1];
1216        let values = (0..booleans_per_list)
1217            .map(|idx| idx % 2 == 0)
1218            .collect::<Vec<_>>();
1219        inner_offsets.push(values.len() as i32);
1220
1221        let inner_items = BooleanArray::from(values);
1222        let inner_list = ListArray::new(
1223            Arc::new(Field::new("item", DataType::Boolean, true)),
1224            OffsetBuffer::new(ScalarBuffer::from(inner_offsets)),
1225            Arc::new(inner_items),
1226            None,
1227        );
1228        let outer_list = ListArray::new(
1229            Arc::new(Field::new("item", inner_list.data_type().clone(), true)),
1230            OffsetBuffer::new(ScalarBuffer::from(vec![0i32, empty_inner_lists as i32 + 1])),
1231            Arc::new(inner_list),
1232            None,
1233        );
1234
1235        let err = try_encode_v22_pages(Arc::new(outer_list))
1236            .await
1237            .unwrap_err();
1238        assert!(
1239            err.to_string().contains("Mini-block cannot encode"),
1240            "unexpected error: {err}"
1241        );
1242    }
1243
1244    #[test_log::test(tokio::test)]
1245    async fn test_nested_sparse_string_single_row_falls_back_to_fullzip() {
1246        let empty_inner_lists = 70_000usize;
1247
1248        let mut inner_offsets = vec![0i32; empty_inner_lists + 1];
1249        inner_offsets.push(1);
1250        inner_offsets.push(2);
1251
1252        let mut strings = StringBuilder::new();
1253        strings.append_value("value");
1254        strings.append_value("other");
1255        let inner_items = strings.finish();
1256        let inner_list = ListArray::new(
1257            Arc::new(Field::new("item", DataType::Utf8, true)),
1258            OffsetBuffer::new(ScalarBuffer::from(inner_offsets)),
1259            Arc::new(inner_items),
1260            None,
1261        );
1262        let outer_list = ListArray::new(
1263            Arc::new(Field::new("item", inner_list.data_type().clone(), true)),
1264            OffsetBuffer::new(ScalarBuffer::from(vec![0i32, empty_inner_lists as i32 + 2])),
1265            Arc::new(inner_list),
1266            None,
1267        );
1268
1269        let outer_list = Arc::new(outer_list) as ArrayRef;
1270        let pages = encode_v22_pages(outer_list.clone()).await;
1271        assert_has_fullzip_layout(&pages);
1272
1273        let test_cases = TestCases::default()
1274            .with_range(0..1)
1275            .with_indices(vec![0])
1276            .with_max_file_version(LanceFileVersion::V2_2);
1277        check_round_trip_encoding_of_data(vec![outer_list], &test_cases, HashMap::new()).await;
1278    }
1279
1280    /// Builds the HNSW-flush repro shape: a dense prefix where every row has
1281    /// `NEIGHBORS_PER_ROW` distinct values, followed by a long tail of empty
1282    /// lists. Mirrors `HNSW::schema()` `__neighbors` / `__dists` columns:
1283    /// dense level-0 lists, then ~6x as many mostly-empty higher-level rows.
1284    fn make_hnsw_shaped_list_u32() -> ListArray {
1285        const DENSE_ROWS: u32 = 40_000;
1286        const NEIGHBORS_PER_ROW: u32 = 32;
1287        const EMPTY_TAIL_ROWS: u32 = 240_000;
1288
1289        let mut list_builder = ListBuilder::new(UInt32Builder::new());
1290        let mut next_val: u32 = 0;
1291        for _ in 0..DENSE_ROWS {
1292            for _ in 0..NEIGHBORS_PER_ROW {
1293                list_builder.values().append_value(next_val);
1294                next_val = next_val.wrapping_add(1);
1295            }
1296            list_builder.append(true);
1297        }
1298        for _ in 0..EMPTY_TAIL_ROWS {
1299            list_builder.append(true);
1300        }
1301        list_builder.finish()
1302    }
1303
1304    /// Reproduces the HNSW-flush shape at v2.2 on the auto path (no
1305    /// `STRUCTURAL_ENCODING` metadata): a dense level-0 prefix followed by a
1306    /// long tail of empty lists. The global levels/values ratio looks dense,
1307    /// so this used to encode as a single mini-block page whose final chunk
1308    /// absorbed every trailing empty list and overflowed the per-chunk `u16`
1309    /// `num_levels`, corrupting the read. The structural page planner now
1310    /// splits on top-level row boundaries: the dense prefix stays on
1311    /// mini-block pages and the empty tail becomes structural-only pages, so
1312    /// the round-trip is lossless without falling back to full-zip.
1313    #[test_log::test(tokio::test)]
1314    async fn test_list_hnsw_shape_splits_to_miniblock_v2_2() {
1315        let list_array = make_hnsw_shaped_list_u32();
1316        let dense_rows: u64 = 40_000;
1317        let total_rows = list_array.len() as u64;
1318
1319        let test_cases = TestCases::default()
1320            .with_range(0..1000)
1321            .with_range(dense_rows.saturating_sub(8)..(dense_rows + 8))
1322            .with_range(0..total_rows)
1323            .with_indices(vec![0, dense_rows - 1, dense_rows, total_rows - 1])
1324            .with_min_file_version(LanceFileVersion::V2_2)
1325            .with_max_file_version(LanceFileVersion::V2_2);
1326        let list_array = Arc::new(list_array) as ArrayRef;
1327        let pages = encode_v22_pages(list_array.clone()).await;
1328        assert_split_miniblock_layout(&pages, true);
1329        check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
1330    }
1331
1332    /// Companion to the auto-path test: even when the user explicitly requests
1333    /// `STRUCTURAL_ENCODING_MINIBLOCK`, the structural page planner splits the
1334    /// HNSW shape so every emitted page fits the mini-block per-chunk budget.
1335    /// The request is honored (the dense prefix stays on mini-block pages
1336    /// rather than being forced to full-zip) and the round-trip is lossless.
1337    #[test_log::test(tokio::test)]
1338    async fn test_forced_miniblock_hnsw_shape_splits_to_miniblock_v2_2() {
1339        let list_array = make_hnsw_shaped_list_u32();
1340        let total_rows = list_array.len() as u64;
1341
1342        let mut field_metadata = HashMap::new();
1343        field_metadata.insert(
1344            STRUCTURAL_ENCODING_META_KEY.to_string(),
1345            STRUCTURAL_ENCODING_MINIBLOCK.into(),
1346        );
1347
1348        let test_cases = TestCases::default()
1349            .with_range(0..total_rows)
1350            .with_min_file_version(LanceFileVersion::V2_2)
1351            .with_max_file_version(LanceFileVersion::V2_2);
1352        let list_array = Arc::new(list_array) as ArrayRef;
1353        let pages = try_encode_v22_pages_with_metadata(list_array.clone(), field_metadata.clone())
1354            .await
1355            .unwrap();
1356        assert_split_miniblock_layout(&pages, true);
1357        check_round_trip_encoding_of_data(vec![list_array], &test_cases, field_metadata).await;
1358    }
1359}