Skip to main content

lance_encoding/encodings/logical/
struct.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::{BinaryHeap, VecDeque},
6    ops::Range,
7    sync::Arc,
8};
9
10use super::{
11    fixed_size_list::StructuralFixedSizeListDecoder, list::StructuralListDecoder,
12    map::StructuralMapDecoder, primitive::StructuralPrimitiveFieldDecoder,
13};
14use crate::{
15    decoder::{
16        DecodedArray, FilterExpression, LoadedPageShard, NextDecodeTask, PageEncoding,
17        ScheduledScanLine, SchedulerContext, StructuralDecodeArrayTask, StructuralFieldDecoder,
18        StructuralFieldScheduler, StructuralSchedulingJob,
19    },
20    encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
21    format::pb,
22    repdef::{CompositeRepDefUnraveler, RepDefBuilder},
23};
24use arrow_array::{cast::AsArray, Array, ArrayRef, StructArray};
25use arrow_schema::{DataType, Fields};
26use futures::{
27    future::BoxFuture,
28    stream::{FuturesOrdered, FuturesUnordered},
29    FutureExt, StreamExt, TryStreamExt,
30};
31use itertools::Itertools;
32use lance_arrow::FieldExt;
33use lance_arrow::{deepcopy::deep_copy_nulls, r#struct::StructArrayExt};
34use lance_core::{Error, Result};
35use log::trace;
36use snafu::location;
37
38#[derive(Debug)]
39struct StructuralSchedulingJobWithStatus<'a> {
40    col_idx: u32,
41    col_name: &'a str,
42    job: Box<dyn StructuralSchedulingJob + 'a>,
43    rows_scheduled: u64,
44    rows_remaining: u64,
45    ready_scan_lines: VecDeque<ScheduledScanLine>,
46}
47
48impl PartialEq for StructuralSchedulingJobWithStatus<'_> {
49    fn eq(&self, other: &Self) -> bool {
50        self.col_idx == other.col_idx
51    }
52}
53
54impl Eq for StructuralSchedulingJobWithStatus<'_> {}
55
56impl PartialOrd for StructuralSchedulingJobWithStatus<'_> {
57    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
58        Some(self.cmp(other))
59    }
60}
61
62impl Ord for StructuralSchedulingJobWithStatus<'_> {
63    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
64        // Note this is reversed to make it min-heap
65        other.rows_scheduled.cmp(&self.rows_scheduled)
66    }
67}
68
69/// Scheduling job for struct data
70///
71/// The order in which we schedule the children is important.  We want to schedule the child
72/// with the least amount of data first.
73///
74/// This allows us to decode entire rows as quickly as possible
75#[derive(Debug)]
76struct RepDefStructSchedulingJob<'a> {
77    /// A min-heap whose key is the # of rows currently scheduled
78    children: BinaryHeap<StructuralSchedulingJobWithStatus<'a>>,
79    rows_scheduled: u64,
80    num_rows: u64,
81}
82
83impl<'a> RepDefStructSchedulingJob<'a> {
84    fn new(
85        scheduler: &'a StructuralStructScheduler,
86        children: Vec<Box<dyn StructuralSchedulingJob + 'a>>,
87        num_rows: u64,
88    ) -> Self {
89        let children = children
90            .into_iter()
91            .enumerate()
92            .map(|(idx, job)| StructuralSchedulingJobWithStatus {
93                col_idx: idx as u32,
94                col_name: scheduler.child_fields[idx].name(),
95                job,
96                rows_scheduled: 0,
97                rows_remaining: num_rows,
98                ready_scan_lines: VecDeque::new(),
99            })
100            .collect::<BinaryHeap<_>>();
101        Self {
102            children,
103            rows_scheduled: 0,
104            num_rows,
105        }
106    }
107}
108
109impl StructuralSchedulingJob for RepDefStructSchedulingJob<'_> {
110    fn schedule_next(
111        &mut self,
112        mut context: &mut SchedulerContext,
113    ) -> Result<Vec<ScheduledScanLine>> {
114        if self.children.is_empty() {
115            // Special path for empty structs
116            if self.rows_scheduled == self.num_rows {
117                return Ok(Vec::new());
118            }
119            self.rows_scheduled = self.num_rows;
120            return Ok(vec![ScheduledScanLine {
121                decoders: Vec::new(),
122                rows_scheduled: self.num_rows,
123            }]);
124        }
125
126        let mut decoders = Vec::new();
127        let old_rows_scheduled = self.rows_scheduled;
128        // Schedule as many children as we need to until we have scheduled at least one
129        // complete row
130        while old_rows_scheduled == self.rows_scheduled {
131            if self.children.is_empty() {
132                // Early exit when schedulers are exhausted prematurely (TODO: does this still happen?)
133                return Ok(Vec::new());
134            }
135            let mut next_child = self.children.pop().unwrap();
136            if next_child.ready_scan_lines.is_empty() {
137                let scoped = context.push(next_child.col_name, next_child.col_idx);
138                let child_scans = next_child.job.schedule_next(scoped.context)?;
139                context = scoped.pop();
140                if child_scans.is_empty() {
141                    // Continue without pushing next_child back onto the heap (it is done)
142                    continue;
143                }
144                next_child.ready_scan_lines.extend(child_scans);
145            }
146            let child_scan = next_child.ready_scan_lines.pop_front().unwrap();
147            trace!(
148                "Scheduled {} rows for child {}",
149                child_scan.rows_scheduled,
150                next_child.col_idx
151            );
152            next_child.rows_scheduled += child_scan.rows_scheduled;
153            next_child.rows_remaining -= child_scan.rows_scheduled;
154            decoders.extend(child_scan.decoders);
155            self.children.push(next_child);
156            self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
157        }
158        let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
159        Ok(vec![ScheduledScanLine {
160            decoders,
161            rows_scheduled: struct_rows_scheduled,
162        }])
163    }
164}
165
166/// A scheduler for structs
167///
168/// The implementation is actually a bit more tricky than one might initially think.  We can't just
169/// go through and schedule each column one after the other.  This would mean our decode can't start
170/// until nearly all the data has arrived (since we need data from each column to yield a batch)
171///
172/// Instead, we schedule in row-major fashion
173///
174/// Note: this scheduler is the starting point for all decoding.  This is because we treat the top-level
175/// record batch as a non-nullable struct.
176#[derive(Debug)]
177pub struct StructuralStructScheduler {
178    children: Vec<Box<dyn StructuralFieldScheduler>>,
179    child_fields: Fields,
180}
181
182impl StructuralStructScheduler {
183    pub fn new(children: Vec<Box<dyn StructuralFieldScheduler>>, child_fields: Fields) -> Self {
184        Self {
185            children,
186            child_fields,
187        }
188    }
189}
190
191impl StructuralFieldScheduler for StructuralStructScheduler {
192    fn schedule_ranges<'a>(
193        &'a self,
194        ranges: &[Range<u64>],
195        filter: &FilterExpression,
196    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
197        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
198
199        let child_schedulers = self
200            .children
201            .iter()
202            .map(|child| child.schedule_ranges(ranges, filter))
203            .collect::<Result<Vec<_>>>()?;
204
205        Ok(Box::new(RepDefStructSchedulingJob::new(
206            self,
207            child_schedulers,
208            num_rows,
209        )))
210    }
211
212    fn initialize<'a>(
213        &'a mut self,
214        filter: &'a FilterExpression,
215        context: &'a SchedulerContext,
216    ) -> BoxFuture<'a, Result<()>> {
217        let children_initialization = self
218            .children
219            .iter_mut()
220            .map(|child| child.initialize(filter, context))
221            .collect::<FuturesUnordered<_>>();
222        async move {
223            children_initialization
224                .map(|res| res.map(|_| ()))
225                .try_collect::<Vec<_>>()
226                .await?;
227            Ok(())
228        }
229        .boxed()
230    }
231}
232
233#[derive(Debug)]
234pub struct StructuralStructDecoder {
235    children: Vec<Box<dyn StructuralFieldDecoder>>,
236    data_type: DataType,
237    child_fields: Fields,
238    // The root decoder is slightly different because it cannot have nulls
239    is_root: bool,
240}
241
242impl StructuralStructDecoder {
243    pub fn new(fields: Fields, should_validate: bool, is_root: bool) -> Result<Self> {
244        let children = fields
245            .iter()
246            .map(|field| Self::field_to_decoder(field, should_validate))
247            .collect::<Result<Vec<_>>>()?;
248        let data_type = DataType::Struct(fields.clone());
249        Ok(Self {
250            data_type,
251            children,
252            child_fields: fields,
253            is_root,
254        })
255    }
256
257    fn field_to_decoder(
258        field: &Arc<arrow_schema::Field>,
259        should_validate: bool,
260    ) -> Result<Box<dyn StructuralFieldDecoder>> {
261        match field.data_type() {
262            DataType::Struct(fields) => {
263                if field.is_packed_struct() || field.is_blob() {
264                    let decoder =
265                        StructuralPrimitiveFieldDecoder::new(&field.clone(), should_validate);
266                    Ok(Box::new(decoder))
267                } else {
268                    Ok(Box::new(Self::new(fields.clone(), should_validate, false)?))
269                }
270            }
271            DataType::List(child_field) | DataType::LargeList(child_field) => {
272                let child_decoder = Self::field_to_decoder(child_field, should_validate)?;
273                Ok(Box::new(StructuralListDecoder::new(
274                    child_decoder,
275                    field.data_type().clone(),
276                )))
277            }
278            DataType::FixedSizeList(child_field, _)
279                if matches!(child_field.data_type(), DataType::Struct(_)) =>
280            {
281                // FixedSizeList containing Struct needs structural decoding
282                let child_decoder = Self::field_to_decoder(child_field, should_validate)?;
283                Ok(Box::new(StructuralFixedSizeListDecoder::new(
284                    child_decoder,
285                    field.data_type().clone(),
286                )))
287            }
288            DataType::Map(entries_field, keys_sorted) => {
289                if *keys_sorted {
290                    return Err(Error::NotSupported {
291                        source: "Map data type with keys_sorted=true is not supported yet"
292                            .to_string()
293                            .into(),
294                        location: location!(),
295                    });
296                }
297                let child_decoder = Self::field_to_decoder(entries_field, should_validate)?;
298                Ok(Box::new(StructuralMapDecoder::new(
299                    child_decoder,
300                    field.data_type().clone(),
301                )))
302            }
303            DataType::RunEndEncoded(_, _) => todo!(),
304            DataType::ListView(_) | DataType::LargeListView(_) => todo!(),
305            DataType::Union(_, _) => todo!(),
306            _ => Ok(Box::new(StructuralPrimitiveFieldDecoder::new(
307                field,
308                should_validate,
309            ))),
310        }
311    }
312
313    pub fn drain_batch_task(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
314        let array_drain = self.drain(num_rows)?;
315        Ok(NextDecodeTask {
316            num_rows,
317            task: Box::new(array_drain),
318        })
319    }
320}
321
322impl StructuralFieldDecoder for StructuralStructDecoder {
323    fn accept_page(&mut self, mut child: LoadedPageShard) -> Result<()> {
324        // children with empty path should not be delivered to this method
325        let child_idx = child.path.pop_front().unwrap();
326        // This decoder is intended for one of our children
327        self.children[child_idx as usize].accept_page(child)?;
328        Ok(())
329    }
330
331    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
332        let child_tasks = self
333            .children
334            .iter_mut()
335            .map(|child| child.drain(num_rows))
336            .collect::<Result<Vec<_>>>()?;
337        Ok(Box::new(RepDefStructDecodeTask {
338            children: child_tasks,
339            child_fields: self.child_fields.clone(),
340            is_root: self.is_root,
341            num_rows,
342        }))
343    }
344
345    fn data_type(&self) -> &DataType {
346        &self.data_type
347    }
348}
349
350#[derive(Debug)]
351struct RepDefStructDecodeTask {
352    children: Vec<Box<dyn StructuralDecodeArrayTask>>,
353    child_fields: Fields,
354    is_root: bool,
355    num_rows: u64,
356}
357
358impl StructuralDecodeArrayTask for RepDefStructDecodeTask {
359    fn decode(self: Box<Self>) -> Result<DecodedArray> {
360        if self.children.is_empty() {
361            return Ok(DecodedArray {
362                array: Arc::new(StructArray::new_empty_fields(self.num_rows as usize, None)),
363                repdef: CompositeRepDefUnraveler::new(vec![]),
364            });
365        }
366
367        let arrays = self
368            .children
369            .into_iter()
370            .map(|task| task.decode())
371            .collect::<Result<Vec<_>>>()?;
372        let mut children = Vec::with_capacity(arrays.len());
373        let mut arrays_iter = arrays.into_iter();
374        let first_array = arrays_iter.next().unwrap();
375        let length = first_array.array.len();
376
377        // The repdef should be identical across all children at this point
378        let mut repdef = first_array.repdef;
379        children.push(first_array.array);
380
381        for array in arrays_iter {
382            debug_assert_eq!(length, array.array.len());
383            children.push(array.array);
384        }
385
386        let validity = if self.is_root {
387            None
388        } else {
389            repdef.unravel_validity(length)
390        };
391
392        let array = StructArray::new(self.child_fields, children, validity);
393        Ok(DecodedArray {
394            array: Arc::new(array),
395            repdef,
396        })
397    }
398}
399
400/// A structural encoder for struct fields
401///
402/// The struct's validity is added to the rep/def builder
403/// and the builder is cloned to all children.
404pub struct StructStructuralEncoder {
405    keep_original_array: bool,
406    children: Vec<Box<dyn FieldEncoder>>,
407}
408
409impl StructStructuralEncoder {
410    pub fn new(keep_original_array: bool, children: Vec<Box<dyn FieldEncoder>>) -> Self {
411        Self {
412            keep_original_array,
413            children,
414        }
415    }
416}
417
418impl FieldEncoder for StructStructuralEncoder {
419    fn maybe_encode(
420        &mut self,
421        array: ArrayRef,
422        external_buffers: &mut OutOfLineBuffers,
423        mut repdef: RepDefBuilder,
424        row_number: u64,
425        num_rows: u64,
426    ) -> Result<Vec<EncodeTask>> {
427        let struct_array = array.as_struct();
428        let mut struct_array = struct_array.normalize_slicing()?;
429        if let Some(validity) = struct_array.nulls() {
430            if self.keep_original_array {
431                repdef.add_validity_bitmap(validity.clone())
432            } else {
433                repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap())
434            }
435            struct_array = struct_array.pushdown_nulls()?;
436        } else {
437            repdef.add_no_null(struct_array.len());
438        }
439        let child_tasks = self
440            .children
441            .iter_mut()
442            .zip(struct_array.columns().iter())
443            .map(|(encoder, arr)| {
444                encoder.maybe_encode(
445                    arr.clone(),
446                    external_buffers,
447                    repdef.clone(),
448                    row_number,
449                    num_rows,
450                )
451            })
452            .collect::<Result<Vec<_>>>()?;
453        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
454    }
455
456    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
457        self.children
458            .iter_mut()
459            .map(|encoder| encoder.flush(external_buffers))
460            .flatten_ok()
461            .collect::<Result<Vec<_>>>()
462    }
463
464    fn num_columns(&self) -> u32 {
465        self.children
466            .iter()
467            .map(|child| child.num_columns())
468            .sum::<u32>()
469    }
470
471    fn finish(
472        &mut self,
473        external_buffers: &mut OutOfLineBuffers,
474    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
475        let mut child_columns = self
476            .children
477            .iter_mut()
478            .map(|child| child.finish(external_buffers))
479            .collect::<FuturesOrdered<_>>();
480        async move {
481            let mut encoded_columns = Vec::with_capacity(child_columns.len());
482            while let Some(child_cols) = child_columns.next().await {
483                encoded_columns.extend(child_cols?);
484            }
485            Ok(encoded_columns)
486        }
487        .boxed()
488    }
489}
490
491pub struct StructFieldEncoder {
492    children: Vec<Box<dyn FieldEncoder>>,
493    column_index: u32,
494    num_rows_seen: u64,
495}
496
497impl StructFieldEncoder {
498    #[allow(dead_code)]
499    pub fn new(children: Vec<Box<dyn FieldEncoder>>, column_index: u32) -> Self {
500        Self {
501            children,
502            column_index,
503            num_rows_seen: 0,
504        }
505    }
506}
507
508impl FieldEncoder for StructFieldEncoder {
509    fn maybe_encode(
510        &mut self,
511        array: ArrayRef,
512        external_buffers: &mut OutOfLineBuffers,
513        repdef: RepDefBuilder,
514        row_number: u64,
515        num_rows: u64,
516    ) -> Result<Vec<EncodeTask>> {
517        self.num_rows_seen += array.len() as u64;
518        let struct_array = array.as_struct();
519        let child_tasks = self
520            .children
521            .iter_mut()
522            .zip(struct_array.columns().iter())
523            .map(|(encoder, arr)| {
524                encoder.maybe_encode(
525                    arr.clone(),
526                    external_buffers,
527                    repdef.clone(),
528                    row_number,
529                    num_rows,
530                )
531            })
532            .collect::<Result<Vec<_>>>()?;
533        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
534    }
535
536    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
537        let child_tasks = self
538            .children
539            .iter_mut()
540            .map(|encoder| encoder.flush(external_buffers))
541            .collect::<Result<Vec<_>>>()?;
542        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
543    }
544
545    fn num_columns(&self) -> u32 {
546        self.children
547            .iter()
548            .map(|child| child.num_columns())
549            .sum::<u32>()
550            + 1
551    }
552
553    fn finish(
554        &mut self,
555        external_buffers: &mut OutOfLineBuffers,
556    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
557        let mut child_columns = self
558            .children
559            .iter_mut()
560            .map(|child| child.finish(external_buffers))
561            .collect::<FuturesOrdered<_>>();
562        let num_rows_seen = self.num_rows_seen;
563        let column_index = self.column_index;
564        async move {
565            let mut columns = Vec::new();
566            // Add a column for the struct header
567            let mut header = EncodedColumn::default();
568            header.final_pages.push(EncodedPage {
569                data: Vec::new(),
570                description: PageEncoding::Legacy(pb::ArrayEncoding {
571                    array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
572                        pb::SimpleStruct {},
573                    )),
574                }),
575                num_rows: num_rows_seen,
576                column_idx: column_index,
577                row_number: 0, // Not used by legacy encoding
578            });
579            columns.push(header);
580            // Now run finish on the children
581            while let Some(child_cols) = child_columns.next().await {
582                columns.extend(child_cols?);
583            }
584            Ok(columns)
585        }
586        .boxed()
587    }
588}
589
#[cfg(test)]
mod tests {

    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{
        builder::{Int32Builder, ListBuilder},
        Array, ArrayRef, Int32Array, ListArray, StructArray,
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
    use arrow_schema::{DataType, Field, Fields};

    use crate::{
        testing::{check_basic_random, check_round_trip_encoding_of_data, TestCases},
        version::LanceFileVersion,
    };

    /// Test cases that only run against file version 2.1 and later
    fn v2_1_test_cases() -> TestCases {
        TestCases::default().with_min_file_version(LanceFileVersion::V2_1)
    }

    /// Wraps a nullable `list<int32>` column named `inner_list` in a nullable struct.
    ///
    /// The list validity is `[valid, valid, valid, valid, NULL, valid]` and the struct validity
    /// is `[valid, valid, valid, valid, valid, NULL]`.
    fn struct_of_nullable_list(items: Int32Array, offsets: Vec<i32>) -> StructArray {
        let list_validity = BooleanBuffer::from(vec![true, true, true, true, false, true]);
        let list_array = ListArray::new(
            Arc::new(Field::new("item", DataType::Int32, true)),
            OffsetBuffer::new(ScalarBuffer::<i32>::from(offsets)),
            Arc::new(items),
            Some(NullBuffer::new(list_validity)),
        );
        let struct_validity = BooleanBuffer::from(vec![true, true, true, true, true, false]);
        let inner_field = Field::new("inner_list", list_array.data_type().clone(), true);
        StructArray::new(
            Fields::from(vec![inner_field]),
            vec![Arc::new(list_array)],
            Some(NullBuffer::new(struct_validity)),
        )
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_struct() {
        let child_fields = Fields::from(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        check_basic_random(Field::new("", DataType::Struct(child_fields), false)).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_nullable_struct() {
        // Test data struct<score: int32, location: struct<x: int32, y: int32>>
        // - score: null
        //   location:
        //     x: 1
        //     y: 6
        // - score: 12
        //   location:
        //     x: 2
        //     y: null
        // - score: 13
        //   location:
        //     x: 3
        //     y: 8
        // - score: 14
        //   location: null
        // - null
        //
        let location_fields = Fields::from(vec![
            Field::new("x", DataType::Int32, false),
            Field::new("y", DataType::Int32, true),
        ]);
        let row_fields = Fields::from(vec![
            Field::new("score", DataType::Int32, true),
            Field::new("location", DataType::Struct(location_fields.clone()), true),
        ]);

        let xs = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
        let ys = Int32Array::from(vec![Some(6), None, Some(8), Some(9), Some(10)]);
        let scores = Int32Array::from(vec![None, Some(12), Some(13), Some(14), Some(15)]);

        let locations = StructArray::new(
            location_fields,
            vec![Arc::new(xs), Arc::new(ys)],
            Some(NullBuffer::from(vec![true, true, true, false, true])),
        );
        let rows = StructArray::new(
            row_fields,
            vec![Arc::new(scores), Arc::new(locations)],
            Some(NullBuffer::from(vec![true, true, true, true, false])),
        );

        check_round_trip_encoding_of_data(vec![Arc::new(rows)], &v2_1_test_cases(), HashMap::new())
            .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_masked_nonempty_list() {
        // [[1, 2], [NULL], [4], [], NULL, NULL-STRUCT]
        //
        // The NULL-STRUCT row still has one (masked) item underneath it
        let items = Int32Array::from(vec![Some(1), Some(2), None, Some(4), Some(5), Some(6)]);
        let struct_array = struct_of_nullable_list(items, vec![0, 2, 3, 4, 4, 4, 5]);
        check_round_trip_encoding_of_data(
            vec![Arc::new(struct_array)],
            &v2_1_test_cases(),
            HashMap::new(),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_struct_list() {
        // [[1, 2], [NULL], [4], [], NULL, NULL-STRUCT]
        //
        let items = Int32Array::from(vec![Some(1), Some(2), None, Some(4)]);
        let struct_array = struct_of_nullable_list(items, vec![0, 2, 3, 4, 4, 4, 4]);
        check_round_trip_encoding_of_data(
            vec![Arc::new(struct_array)],
            &v2_1_test_cases(),
            HashMap::new(),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_struct_list() {
        let list_type = DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
        let child_fields = Fields::from(vec![
            Field::new("inner_list", list_type, true),
            Field::new("outer_int", DataType::Int32, true),
        ]);
        check_basic_random(Field::new("row", DataType::Struct(child_fields), false)).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_empty_struct() {
        // It's technically legal for a struct to have 0 children, need to
        // make sure we support that
        let no_children = Fields::from(Vec::<Field>::default());
        check_basic_random(Field::new("row", DataType::Struct(no_children), false)).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_complicated_struct() {
        let inner_list_type = DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
        let inner_type = DataType::Struct(Fields::from(vec![
            Field::new("inner_int", DataType::Int32, true),
            Field::new("inner_list", inner_list_type, true),
        ]));
        let row_type = DataType::Struct(Fields::from(vec![
            Field::new("int", DataType::Int32, true),
            Field::new("inner", inner_type, true),
            Field::new("outer_binary", DataType::Binary, true),
        ]));
        check_basic_random(Field::new("row", row_type, false)).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_ragged_scheduling() {
        // This test covers scheduling when batches straddle page boundaries
        const NUM_ROWS: usize = 10_000;
        // Intentionally odd chunk size to create more ragged scheduling
        const CHUNK_LEN: usize = 437;

        // Create a list with 10k nulls
        let mut list_builder = ListBuilder::new(Int32Builder::new());
        for _ in 0..NUM_ROWS {
            list_builder.append_null();
        }
        let list_array = Arc::new(list_builder.finish());
        let int_array = Arc::new(Int32Array::from_iter_values(0..NUM_ROWS as i32));
        let fields = Fields::from(vec![
            Field::new("", list_array.data_type().clone(), true),
            Field::new("", int_array.data_type().clone(), true),
        ]);
        let struct_array: ArrayRef = Arc::new(StructArray::new(
            fields,
            vec![list_array, int_array],
            None,
        ));

        let chunks = (0..NUM_ROWS)
            .step_by(CHUNK_LEN)
            .map(|offset| struct_array.slice(offset, CHUNK_LEN.min(NUM_ROWS - offset)))
            .collect::<Vec<_>>();
        check_round_trip_encoding_of_data(chunks, &TestCases::default(), HashMap::new()).await;
    }
}