Skip to main content

lance_encoding/encodings/logical/
struct.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::{BinaryHeap, VecDeque},
6    ops::Range,
7    sync::Arc,
8};
9
10use super::{
11    fixed_size_list::StructuralFixedSizeListDecoder, list::StructuralListDecoder,
12    map::StructuralMapDecoder, primitive::StructuralPrimitiveFieldDecoder,
13};
14use crate::{
15    decoder::{
16        DecodedArray, FilterExpression, LoadedPageShard, NextDecodeTask, PageEncoding,
17        ScheduledScanLine, SchedulerContext, StructuralDecodeArrayTask, StructuralFieldDecoder,
18        StructuralFieldScheduler, StructuralSchedulingJob,
19    },
20    encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
21    format::pb,
22    repdef::{CompositeRepDefUnraveler, RepDefBuilder},
23};
24use arrow_array::{Array, ArrayRef, StructArray, cast::AsArray};
25use arrow_schema::{DataType, Fields};
26use futures::{
27    FutureExt, StreamExt, TryStreamExt,
28    future::BoxFuture,
29    stream::{FuturesOrdered, FuturesUnordered},
30};
31use itertools::Itertools;
32use lance_arrow::FieldExt;
33use lance_arrow::{deepcopy::deep_copy_nulls, r#struct::StructArrayExt};
34use lance_core::{Error, Result};
35use log::trace;
36
/// Per-child bookkeeping used by the struct scheduling job.
///
/// Wraps a child's scheduling job together with the progress counters that the
/// parent uses to decide which child to schedule next.
#[derive(Debug)]
struct StructuralSchedulingJobWithStatus<'a> {
    /// Position of the child within the struct (enumeration order of the children)
    col_idx: u32,
    /// Name of the child field; passed to `SchedulerContext::push` when the child is scheduled
    col_name: &'a str,
    /// The child's own scheduling job
    job: Box<dyn StructuralSchedulingJob + 'a>,
    /// Rows this child has handed to the parent so far (this is the heap ordering key)
    rows_scheduled: u64,
    /// Rows this child has not yet handed to the parent; starts at the struct's row count
    rows_remaining: u64,
    /// Scan lines already returned by the child job but not yet consumed by the parent
    ready_scan_lines: VecDeque<ScheduledScanLine>,
}
46
47impl PartialEq for StructuralSchedulingJobWithStatus<'_> {
48    fn eq(&self, other: &Self) -> bool {
49        self.col_idx == other.col_idx
50    }
51}
52
53impl Eq for StructuralSchedulingJobWithStatus<'_> {}
54
55impl PartialOrd for StructuralSchedulingJobWithStatus<'_> {
56    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
57        Some(self.cmp(other))
58    }
59}
60
61impl Ord for StructuralSchedulingJobWithStatus<'_> {
62    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
63        // Note this is reversed to make it min-heap
64        other.rows_scheduled.cmp(&self.rows_scheduled)
65    }
66}
67
/// Scheduling job for struct data
///
/// The order in which we schedule the children is important.  We want to schedule the child
/// with the least amount of data first.
///
/// This allows us to decode entire rows as quickly as possible
#[derive(Debug)]
struct RepDefStructSchedulingJob<'a> {
    /// A min-heap whose key is the # of rows currently scheduled
    children: BinaryHeap<StructuralSchedulingJobWithStatus<'a>>,
    /// Number of struct rows scheduled so far.  After each scheduling step this is set to the
    /// smallest `rows_scheduled` among the children still in the heap.
    rows_scheduled: u64,
    /// Total number of rows this job was asked to schedule
    num_rows: u64,
}
81
82impl<'a> RepDefStructSchedulingJob<'a> {
83    fn new(
84        scheduler: &'a StructuralStructScheduler,
85        children: Vec<Box<dyn StructuralSchedulingJob + 'a>>,
86        num_rows: u64,
87    ) -> Self {
88        let children = children
89            .into_iter()
90            .enumerate()
91            .map(|(idx, job)| StructuralSchedulingJobWithStatus {
92                col_idx: idx as u32,
93                col_name: scheduler.child_fields[idx].name(),
94                job,
95                rows_scheduled: 0,
96                rows_remaining: num_rows,
97                ready_scan_lines: VecDeque::new(),
98            })
99            .collect::<BinaryHeap<_>>();
100        Self {
101            children,
102            rows_scheduled: 0,
103            num_rows,
104        }
105    }
106}
107
impl StructuralSchedulingJob for RepDefStructSchedulingJob<'_> {
    /// Schedules child data until at least one more complete struct row is covered.
    ///
    /// Returns a single scan line whose `rows_scheduled` is the number of newly completed
    /// struct rows and whose `decoders` are all the child decoders gathered along the way.
    /// Returns an empty vector when there is nothing (more) to schedule.  Errors from a
    /// child's `schedule_next` are propagated.
    fn schedule_next(
        &mut self,
        mut context: &mut SchedulerContext,
    ) -> Result<Vec<ScheduledScanLine>> {
        if self.children.is_empty() {
            // Special path for empty structs
            //
            // The first call emits one decoder-less scan line covering every row; later
            // calls return nothing.
            // NOTE(review): the heap is also empty once every child has been dropped as
            // exhausted, so this path is not strictly limited to zero-field structs.
            if self.rows_scheduled == self.num_rows {
                return Ok(Vec::new());
            }
            self.rows_scheduled = self.num_rows;
            return Ok(vec![ScheduledScanLine {
                decoders: Vec::new(),
                rows_scheduled: self.num_rows,
            }]);
        }

        let mut decoders = Vec::new();
        let old_rows_scheduled = self.rows_scheduled;
        // Schedule as many children as we need to until we have scheduled at least one
        // complete row
        while old_rows_scheduled == self.rows_scheduled {
            if self.children.is_empty() {
                // Early exit when schedulers are exhausted prematurely (TODO: does this still happen?)
                // NOTE(review): any decoders gathered earlier in this call are dropped here.
                return Ok(Vec::new());
            }
            // The heap is keyed on rows scheduled (reversed), so this is the child that is
            // furthest behind.
            let mut next_child = self.children.pop().unwrap();
            if next_child.ready_scan_lines.is_empty() {
                // Nothing buffered for this child: ask its job for more, inside a context
                // scoped to the child's name / index.
                let scoped = context.push(next_child.col_name, next_child.col_idx);
                let child_scans = next_child.job.schedule_next(scoped.context)?;
                // Leave the child's scope and recover the parent context for the next iteration
                context = scoped.pop();
                if child_scans.is_empty() {
                    // Continue without pushing next_child back onto the heap (it is done)
                    continue;
                }
                next_child.ready_scan_lines.extend(child_scans);
            }
            // Consume exactly one buffered scan line per iteration
            let child_scan = next_child.ready_scan_lines.pop_front().unwrap();
            trace!(
                "Scheduled {} rows for child {}",
                child_scan.rows_scheduled, next_child.col_idx
            );
            next_child.rows_scheduled += child_scan.rows_scheduled;
            next_child.rows_remaining -= child_scan.rows_scheduled;
            decoders.extend(child_scan.decoders);
            self.children.push(next_child);
            // The struct has only scheduled as many rows as its slowest remaining child
            self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
        }
        let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
        Ok(vec![ScheduledScanLine {
            decoders,
            rows_scheduled: struct_rows_scheduled,
        }])
    }
}
163
/// A scheduler for structs
///
/// The implementation is actually a bit more tricky than one might initially think.  We can't just
/// go through and schedule each column one after the other.  This would mean our decode can't start
/// until nearly all the data has arrived (since we need data from each column to yield a batch)
///
/// Instead, we schedule in row-major fashion
///
/// Note: this scheduler is the starting point for all decoding.  This is because we treat the top-level
/// record batch as a non-nullable struct.
#[derive(Debug)]
pub struct StructuralStructScheduler {
    /// One scheduler per child field
    children: Vec<Box<dyn StructuralFieldScheduler>>,
    /// The child fields; indexed by child position to look up each child's name
    child_fields: Fields,
}

impl StructuralStructScheduler {
    /// Creates a struct scheduler from its child schedulers and the matching child fields.
    ///
    /// `children[i]` is later paired with `child_fields[i]` by position, so the two
    /// collections are expected to be parallel.
    pub fn new(children: Vec<Box<dyn StructuralFieldScheduler>>, child_fields: Fields) -> Self {
        Self {
            children,
            child_fields,
        }
    }
}
188
189impl StructuralFieldScheduler for StructuralStructScheduler {
190    fn schedule_ranges<'a>(
191        &'a self,
192        ranges: &[Range<u64>],
193        filter: &FilterExpression,
194    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
195        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
196
197        let child_schedulers = self
198            .children
199            .iter()
200            .map(|child| child.schedule_ranges(ranges, filter))
201            .collect::<Result<Vec<_>>>()?;
202
203        Ok(Box::new(RepDefStructSchedulingJob::new(
204            self,
205            child_schedulers,
206            num_rows,
207        )))
208    }
209
210    fn initialize<'a>(
211        &'a mut self,
212        filter: &'a FilterExpression,
213        context: &'a SchedulerContext,
214    ) -> BoxFuture<'a, Result<()>> {
215        let children_initialization = self
216            .children
217            .iter_mut()
218            .map(|child| child.initialize(filter, context))
219            .collect::<FuturesUnordered<_>>();
220        async move {
221            children_initialization
222                .map(|res| res.map(|_| ()))
223                .try_collect::<Vec<_>>()
224                .await?;
225            Ok(())
226        }
227        .boxed()
228    }
229}
230
/// Decoder for struct fields; routes loaded pages to its child decoders and assembles
/// their drained output into a struct array.
#[derive(Debug)]
pub struct StructuralStructDecoder {
    /// One decoder per child field, in field order
    children: Vec<Box<dyn StructuralFieldDecoder>>,
    /// `DataType::Struct` built from the child fields
    data_type: DataType,
    /// The child fields, handed to each decode task to build the output struct array
    child_fields: Fields,
    // The root decoder is slightly different because it cannot have nulls
    is_root: bool,
}
239
240impl StructuralStructDecoder {
241    pub fn new(fields: Fields, should_validate: bool, is_root: bool) -> Result<Self> {
242        let children = fields
243            .iter()
244            .map(|field| Self::field_to_decoder(field, should_validate))
245            .collect::<Result<Vec<_>>>()?;
246        let data_type = DataType::Struct(fields.clone());
247        Ok(Self {
248            data_type,
249            children,
250            child_fields: fields,
251            is_root,
252        })
253    }
254
255    fn field_to_decoder(
256        field: &Arc<arrow_schema::Field>,
257        should_validate: bool,
258    ) -> Result<Box<dyn StructuralFieldDecoder>> {
259        match field.data_type() {
260            DataType::Struct(fields) => {
261                if field.is_packed_struct() || field.is_blob() {
262                    let decoder =
263                        StructuralPrimitiveFieldDecoder::new(&field.clone(), should_validate);
264                    Ok(Box::new(decoder))
265                } else {
266                    Ok(Box::new(Self::new(fields.clone(), should_validate, false)?))
267                }
268            }
269            DataType::List(child_field) | DataType::LargeList(child_field) => {
270                let child_decoder = Self::field_to_decoder(child_field, should_validate)?;
271                Ok(Box::new(StructuralListDecoder::new(
272                    child_decoder,
273                    field.data_type().clone(),
274                )))
275            }
276            DataType::FixedSizeList(child_field, _)
277                if matches!(child_field.data_type(), DataType::Struct(_)) =>
278            {
279                // FixedSizeList containing Struct needs structural decoding
280                let child_decoder = Self::field_to_decoder(child_field, should_validate)?;
281                Ok(Box::new(StructuralFixedSizeListDecoder::new(
282                    child_decoder,
283                    field.data_type().clone(),
284                )))
285            }
286            DataType::Map(entries_field, keys_sorted) => {
287                if *keys_sorted {
288                    return Err(Error::not_supported_source(
289                        "Map data type with keys_sorted=true is not supported yet"
290                            .to_string()
291                            .into(),
292                    ));
293                }
294                let child_decoder = Self::field_to_decoder(entries_field, should_validate)?;
295                Ok(Box::new(StructuralMapDecoder::new(
296                    child_decoder,
297                    field.data_type().clone(),
298                )))
299            }
300            DataType::RunEndEncoded(_, _) => todo!(),
301            DataType::ListView(_) | DataType::LargeListView(_) => todo!(),
302            DataType::Union(_, _) => todo!(),
303            _ => Ok(Box::new(StructuralPrimitiveFieldDecoder::new(
304                field,
305                should_validate,
306            ))),
307        }
308    }
309
310    pub fn drain_batch_task(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
311        let array_drain = self.drain(num_rows)?;
312        Ok(NextDecodeTask {
313            num_rows,
314            task: Box::new(array_drain),
315        })
316    }
317}
318
319impl StructuralFieldDecoder for StructuralStructDecoder {
320    fn accept_page(&mut self, mut child: LoadedPageShard) -> Result<()> {
321        // children with empty path should not be delivered to this method
322        let child_idx = child.path.pop_front().unwrap();
323        // This decoder is intended for one of our children
324        self.children[child_idx as usize].accept_page(child)?;
325        Ok(())
326    }
327
328    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
329        let child_tasks = self
330            .children
331            .iter_mut()
332            .map(|child| child.drain(num_rows))
333            .collect::<Result<Vec<_>>>()?;
334        Ok(Box::new(RepDefStructDecodeTask {
335            children: child_tasks,
336            child_fields: self.child_fields.clone(),
337            is_root: self.is_root,
338            num_rows,
339        }))
340    }
341
342    fn data_type(&self) -> &DataType {
343        &self.data_type
344    }
345}
346
/// Decode task that decodes every child task and assembles the results into a struct array.
#[derive(Debug)]
struct RepDefStructDecodeTask {
    /// One decode task per child field, in field order
    children: Vec<Box<dyn StructuralDecodeArrayTask>>,
    /// Fields used to build the output struct array
    child_fields: Fields,
    /// When true no validity is attached to the output struct array
    is_root: bool,
    /// Row count; used as the array length when the struct has no children
    num_rows: u64,
}
354
355impl StructuralDecodeArrayTask for RepDefStructDecodeTask {
356    fn decode(self: Box<Self>) -> Result<DecodedArray> {
357        if self.children.is_empty() {
358            return Ok(DecodedArray {
359                array: Arc::new(StructArray::new_empty_fields(self.num_rows as usize, None)),
360                repdef: CompositeRepDefUnraveler::new(vec![]),
361            });
362        }
363
364        let arrays = self
365            .children
366            .into_iter()
367            .map(|task| task.decode())
368            .collect::<Result<Vec<_>>>()?;
369        let mut children = Vec::with_capacity(arrays.len());
370        let mut arrays_iter = arrays.into_iter();
371        let first_array = arrays_iter.next().unwrap();
372        let length = first_array.array.len();
373
374        // The repdef should be identical across all children at this point
375        let mut repdef = first_array.repdef;
376        children.push(first_array.array);
377
378        for array in arrays_iter {
379            debug_assert_eq!(length, array.array.len());
380            children.push(array.array);
381        }
382
383        let validity = if self.is_root {
384            None
385        } else {
386            repdef.unravel_validity(length)
387        };
388
389        let array = StructArray::try_new(self.child_fields, children, validity)
390            .map_err(|e| Error::invalid_input_source(e.to_string().into()))?;
391        Ok(DecodedArray {
392            array: Arc::new(array),
393            repdef,
394        })
395    }
396}
397
/// A structural encoder for struct fields
///
/// The struct's validity is added to the rep/def builder
/// and the builder is cloned to all children.
pub struct StructStructuralEncoder {
    /// When true the struct's validity buffer is cloned as-is into the rep/def builder;
    /// when false it is deep-copied first
    keep_original_array: bool,
    /// One encoder per child field, in field order
    children: Vec<Box<dyn FieldEncoder>>,
}

impl StructStructuralEncoder {
    /// Creates a struct encoder that delegates to `children`, one per struct column.
    pub fn new(keep_original_array: bool, children: Vec<Box<dyn FieldEncoder>>) -> Self {
        Self {
            keep_original_array,
            children,
        }
    }
}
415
416impl FieldEncoder for StructStructuralEncoder {
417    fn maybe_encode(
418        &mut self,
419        array: ArrayRef,
420        external_buffers: &mut OutOfLineBuffers,
421        mut repdef: RepDefBuilder,
422        row_number: u64,
423        num_rows: u64,
424    ) -> Result<Vec<EncodeTask>> {
425        let struct_array = array.as_struct();
426        let mut struct_array = struct_array.normalize_slicing()?;
427        if let Some(validity) = struct_array.nulls() {
428            if self.keep_original_array {
429                repdef.add_validity_bitmap(validity.clone())
430            } else {
431                repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap())
432            }
433            struct_array = struct_array.pushdown_nulls()?;
434        } else {
435            repdef.add_no_null(struct_array.len());
436        }
437        let child_tasks = self
438            .children
439            .iter_mut()
440            .zip(struct_array.columns().iter())
441            .map(|(encoder, arr)| {
442                encoder.maybe_encode(
443                    arr.clone(),
444                    external_buffers,
445                    repdef.clone(),
446                    row_number,
447                    num_rows,
448                )
449            })
450            .collect::<Result<Vec<_>>>()?;
451        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
452    }
453
454    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
455        self.children
456            .iter_mut()
457            .map(|encoder| encoder.flush(external_buffers))
458            .flatten_ok()
459            .collect::<Result<Vec<_>>>()
460    }
461
462    fn num_columns(&self) -> u32 {
463        self.children
464            .iter()
465            .map(|child| child.num_columns())
466            .sum::<u32>()
467    }
468
469    fn finish(
470        &mut self,
471        external_buffers: &mut OutOfLineBuffers,
472    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
473        let mut child_columns = self
474            .children
475            .iter_mut()
476            .map(|child| child.finish(external_buffers))
477            .collect::<FuturesOrdered<_>>();
478        async move {
479            let mut encoded_columns = Vec::with_capacity(child_columns.len());
480            while let Some(child_cols) = child_columns.next().await {
481                encoded_columns.extend(child_cols?);
482            }
483            Ok(encoded_columns)
484        }
485        .boxed()
486    }
487}
488
/// Struct encoder that emits a header column for the struct (a legacy `SimpleStruct`
/// page) followed by the columns of its children.
pub struct StructFieldEncoder {
    /// One encoder per child field, in field order
    children: Vec<Box<dyn FieldEncoder>>,
    /// Column index recorded on the struct's header page
    column_index: u32,
    /// Running total of rows passed to `maybe_encode`; recorded on the header page
    num_rows_seen: u64,
}

impl StructFieldEncoder {
    /// Creates a struct encoder with the given children; the row counter starts at zero.
    #[allow(dead_code)]
    pub fn new(children: Vec<Box<dyn FieldEncoder>>, column_index: u32) -> Self {
        Self {
            children,
            column_index,
            num_rows_seen: 0,
        }
    }
}
505
506impl FieldEncoder for StructFieldEncoder {
507    fn maybe_encode(
508        &mut self,
509        array: ArrayRef,
510        external_buffers: &mut OutOfLineBuffers,
511        repdef: RepDefBuilder,
512        row_number: u64,
513        num_rows: u64,
514    ) -> Result<Vec<EncodeTask>> {
515        self.num_rows_seen += array.len() as u64;
516        let struct_array = array.as_struct();
517        let child_tasks = self
518            .children
519            .iter_mut()
520            .zip(struct_array.columns().iter())
521            .map(|(encoder, arr)| {
522                encoder.maybe_encode(
523                    arr.clone(),
524                    external_buffers,
525                    repdef.clone(),
526                    row_number,
527                    num_rows,
528                )
529            })
530            .collect::<Result<Vec<_>>>()?;
531        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
532    }
533
534    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
535        let child_tasks = self
536            .children
537            .iter_mut()
538            .map(|encoder| encoder.flush(external_buffers))
539            .collect::<Result<Vec<_>>>()?;
540        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
541    }
542
543    fn num_columns(&self) -> u32 {
544        self.children
545            .iter()
546            .map(|child| child.num_columns())
547            .sum::<u32>()
548            + 1
549    }
550
551    fn finish(
552        &mut self,
553        external_buffers: &mut OutOfLineBuffers,
554    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
555        let mut child_columns = self
556            .children
557            .iter_mut()
558            .map(|child| child.finish(external_buffers))
559            .collect::<FuturesOrdered<_>>();
560        let num_rows_seen = self.num_rows_seen;
561        let column_index = self.column_index;
562        async move {
563            let mut columns = Vec::new();
564            // Add a column for the struct header
565            let mut header = EncodedColumn::default();
566            header.final_pages.push(EncodedPage {
567                data: Vec::new(),
568                description: PageEncoding::Legacy(pb::ArrayEncoding {
569                    array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
570                        pb::SimpleStruct {},
571                    )),
572                }),
573                num_rows: num_rows_seen,
574                column_idx: column_index,
575                row_number: 0, // Not used by legacy encoding
576            });
577            columns.push(header);
578            // Now run finish on the children
579            while let Some(child_cols) = child_columns.next().await {
580                columns.extend(child_cols?);
581            }
582            Ok(columns)
583        }
584        .boxed()
585    }
586}
587
/// Round-trip encoding tests for struct fields.
#[cfg(test)]
mod tests {

    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{
        Array, ArrayRef, Int32Array, ListArray, StructArray,
        builder::{Int32Builder, ListBuilder},
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
    use arrow_schema::{DataType, Field, Fields};

    use crate::{
        testing::{TestCases, check_basic_random, check_round_trip_encoding_of_data},
        version::LanceFileVersion,
    };

    // Struct with two non-nullable int32 children
    #[test_log::test(tokio::test)]
    async fn test_simple_struct() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]));
        let field = Field::new("", data_type, false);
        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_nullable_struct() {
        // Test data struct<score: int32, location: struct<x: int32, y: int32>>
        // - score: null
        //   location:
        //     x: 1
        //     y: 6
        // - score: 12
        //   location:
        //     x: 2
        //     y: null
        // - score: 13
        //   location:
        //     x: 3
        //     y: 8
        // - score: 14
        //   location: null
        // - null
        //
        // Note that the child arrays still hold values underneath the null location (row 3)
        // and the null outer struct (row 4).
        let inner_fields = Fields::from(vec![
            Field::new("x", DataType::Int32, false),
            Field::new("y", DataType::Int32, true),
        ]);
        let inner_struct = DataType::Struct(inner_fields.clone());
        let outer_fields = Fields::from(vec![
            Field::new("score", DataType::Int32, true),
            Field::new("location", inner_struct, true),
        ]);

        let x_vals = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
        let y_vals = Int32Array::from(vec![Some(6), None, Some(8), Some(9), Some(10)]);
        let scores = Int32Array::from(vec![None, Some(12), Some(13), Some(14), Some(15)]);

        let location_validity = NullBuffer::from(vec![true, true, true, false, true]);
        let locations = StructArray::new(
            inner_fields,
            vec![Arc::new(x_vals), Arc::new(y_vals)],
            Some(location_validity),
        );

        let rows_validity = NullBuffer::from(vec![true, true, true, true, false]);
        let rows = StructArray::new(
            outer_fields,
            vec![Arc::new(scores), Arc::new(locations)],
            Some(rows_validity),
        );

        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![Arc::new(rows)], &test_cases, HashMap::new()).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_masked_nonempty_list() {
        // [[1, 2], [NULL], [4], [], NULL, NULL-STRUCT]
        //
        // Unlike `test_simple_struct_list`, the list slot underneath the null struct (the
        // last row) is non-empty: its offsets span one item that is masked by the struct's
        // validity.
        let items = Int32Array::from(vec![Some(1), Some(2), None, Some(4), Some(5), Some(6)]);
        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 2, 3, 4, 4, 4, 5]));
        let list_validity = BooleanBuffer::from(vec![true, true, true, true, false, true]);
        let list_array = ListArray::new(
            Arc::new(Field::new("item", DataType::Int32, true)),
            offsets,
            Arc::new(items),
            Some(NullBuffer::new(list_validity)),
        );
        let struct_validity = BooleanBuffer::from(vec![true, true, true, true, true, false]);
        let struct_array = StructArray::new(
            Fields::from(vec![Field::new(
                "inner_list",
                list_array.data_type().clone(),
                true,
            )]),
            vec![Arc::new(list_array)],
            Some(NullBuffer::new(struct_validity)),
        );
        check_round_trip_encoding_of_data(
            vec![Arc::new(struct_array)],
            &TestCases::default().with_min_file_version(LanceFileVersion::V2_1),
            HashMap::new(),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_struct_list() {
        // [[1, 2], [NULL], [4], [], NULL, NULL-STRUCT]
        //
        // Here the list slot underneath the null struct (the last row) is empty.
        let items = Int32Array::from(vec![Some(1), Some(2), None, Some(4)]);
        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 2, 3, 4, 4, 4, 4]));
        let list_validity = BooleanBuffer::from(vec![true, true, true, true, false, true]);
        let list_array = ListArray::new(
            Arc::new(Field::new("item", DataType::Int32, true)),
            offsets,
            Arc::new(items),
            Some(NullBuffer::new(list_validity)),
        );
        let struct_validity = BooleanBuffer::from(vec![true, true, true, true, true, false]);
        let struct_array = StructArray::new(
            Fields::from(vec![Field::new(
                "inner_list",
                list_array.data_type().clone(),
                true,
            )]),
            vec![Arc::new(list_array)],
            Some(NullBuffer::new(struct_validity)),
        );
        check_round_trip_encoding_of_data(
            vec![Arc::new(struct_array)],
            &TestCases::default().with_min_file_version(LanceFileVersion::V2_1),
            HashMap::new(),
        )
        .await;
    }

    // Struct with a nullable list child and a nullable int child, random data
    #[test_log::test(tokio::test)]
    async fn test_struct_list() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new(
                "inner_list",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                true,
            ),
            Field::new("outer_int", DataType::Int32, true),
        ]));
        let field = Field::new("row", data_type, false);
        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_empty_struct() {
        // It's technically legal for a struct to have 0 children, need to
        // make sure we support that
        let data_type = DataType::Struct(Fields::from(Vec::<Field>::default()));
        let field = Field::new("row", data_type, false);
        check_basic_random(field).await;
    }

    // Struct mixing an int, a nested struct (with its own int and list) and a binary column
    #[test_log::test(tokio::test)]
    async fn test_complicated_struct() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("int", DataType::Int32, true),
            Field::new(
                "inner",
                DataType::Struct(Fields::from(vec![
                    Field::new("inner_int", DataType::Int32, true),
                    Field::new(
                        "inner_list",
                        DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                        true,
                    ),
                ])),
                true,
            ),
            Field::new("outer_binary", DataType::Binary, true),
        ]));
        let field = Field::new("row", data_type, false);
        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_list_of_struct_with_null_struct_element() {
        // Regression: a list containing structs where most struct elements are null
        // causes a length mismatch during decoding with V2_2 encoding.
        use arrow_array::StringArray;

        let tag_array = StringArray::from(vec![
            Some("valid"),
            Some("null_struct"),
            Some("valid"),
            Some("valid"),
        ]);
        let struct_fields = Fields::from(vec![Field::new("tag", DataType::Utf8, true)]);
        // 3 out of 4 struct elements are null
        let struct_validity = NullBuffer::from(vec![false, true, false, false]);
        let struct_array = StructArray::new(
            struct_fields.clone(),
            vec![Arc::new(tag_array)],
            Some(struct_validity),
        );

        // A single list row containing all four struct elements
        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 4]));
        let list_field = Field::new("item", DataType::Struct(struct_fields), true);
        let list_array =
            ListArray::new(Arc::new(list_field), offsets, Arc::new(struct_array), None);

        check_round_trip_encoding_of_data(
            vec![Arc::new(list_array)],
            &TestCases::default().with_min_file_version(LanceFileVersion::V2_2),
            HashMap::new(),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_ragged_scheduling() {
        // This test covers scheduling when batches straddle page boundaries

        // Create a list with 10k nulls
        let items_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        for _ in 0..10000 {
            list_builder.append_null();
        }
        let list_array = Arc::new(list_builder.finish());
        let int_array = Arc::new(Int32Array::from_iter_values(0..10000));
        let fields = vec![
            Field::new("", list_array.data_type().clone(), true),
            Field::new("", int_array.data_type().clone(), true),
        ];
        let struct_array = Arc::new(StructArray::new(
            Fields::from(fields),
            vec![list_array, int_array],
            None,
        )) as ArrayRef;
        // Split the 10k rows into slices of 437 rows (the last slice is shorter)
        let struct_arrays = (0..10000)
            // Intentionally skip in some randomish amount to create more ragged scheduling
            .step_by(437)
            .map(|offset| struct_array.slice(offset, 437.min(10000 - offset)))
            .collect::<Vec<_>>();
        check_round_trip_encoding_of_data(struct_arrays, &TestCases::default(), HashMap::new())
            .await;
    }
}