lance_encoding/encodings/logical/struct.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{collections::BinaryHeap, ops::Range, sync::Arc};

use crate::{
    decoder::{
        DecodedArray, FilterExpression, LoadedPage, NextDecodeTask, PageEncoding,
        ScheduledScanLine, SchedulerContext, StructuralDecodeArrayTask, StructuralFieldDecoder,
        StructuralFieldScheduler, StructuralSchedulingJob,
    },
    encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
    format::pb,
    repdef::{CompositeRepDefUnraveler, RepDefBuilder},
};
use arrow_array::{cast::AsArray, Array, ArrayRef, StructArray};
use arrow_schema::{DataType, Fields};
use futures::{
    future::BoxFuture,
    stream::{FuturesOrdered, FuturesUnordered},
    FutureExt, StreamExt, TryStreamExt,
};
use itertools::Itertools;
use lance_arrow::FieldExt;
use lance_arrow::{deepcopy::deep_copy_nulls, r#struct::StructArrayExt};
use lance_core::Result;
use log::trace;

use super::{list::StructuralListDecoder, primitive::StructuralPrimitiveFieldDecoder};

#[derive(Debug)]
struct StructuralSchedulingJobWithStatus<'a> {
    col_idx: u32,
    col_name: &'a str,
    job: Box<dyn StructuralSchedulingJob + 'a>,
    rows_scheduled: u64,
    rows_remaining: u64,
}

impl PartialEq for StructuralSchedulingJobWithStatus<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.col_idx == other.col_idx
    }
}

impl Eq for StructuralSchedulingJobWithStatus<'_> {}

impl PartialOrd for StructuralSchedulingJobWithStatus<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for StructuralSchedulingJobWithStatus<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Note: the comparison is reversed so the BinaryHeap behaves as a min-heap
        other.rows_scheduled.cmp(&self.rows_scheduled)
    }
}

/// Scheduling job for struct data
///
/// The order in which we schedule the children is important.  We always schedule the child
/// that has the least data scheduled so far.
///
/// This allows us to decode entire rows as quickly as possible.
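///
/// For example, if child 0 has already scheduled 300 rows but child 1 has only scheduled
/// 100, child 1 sits at the top of the min-heap and is advanced next.  The struct counts a
/// row as scheduled only once every child has scheduled it, so its `rows_scheduled` is
/// always the minimum across the children (the value at the top of the heap).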
#[derive(Debug)]
struct RepDefStructSchedulingJob<'a> {
    /// A min-heap whose key is the # of rows currently scheduled
    children: BinaryHeap<StructuralSchedulingJobWithStatus<'a>>,
    rows_scheduled: u64,
    num_rows: u64,
}

impl<'a> RepDefStructSchedulingJob<'a> {
    fn new(
        scheduler: &'a StructuralStructScheduler,
        children: Vec<Box<dyn StructuralSchedulingJob + 'a>>,
        num_rows: u64,
    ) -> Self {
        let children = children
            .into_iter()
            .enumerate()
            .map(|(idx, job)| StructuralSchedulingJobWithStatus {
                col_idx: idx as u32,
                col_name: scheduler.child_fields[idx].name(),
                job,
                rows_scheduled: 0,
                rows_remaining: num_rows,
            })
            .collect::<BinaryHeap<_>>();
        Self {
            children,
            rows_scheduled: 0,
            num_rows,
        }
    }
}

impl StructuralSchedulingJob for RepDefStructSchedulingJob<'_> {
    fn schedule_next(
        &mut self,
        mut context: &mut SchedulerContext,
    ) -> Result<Option<ScheduledScanLine>> {
        if self.children.is_empty() {
            // Special path for empty structs
            if self.rows_scheduled == self.num_rows {
                return Ok(None);
            }
            self.rows_scheduled = self.num_rows;
            return Ok(Some(ScheduledScanLine {
                decoders: Vec::new(),
                rows_scheduled: self.num_rows,
            }));
        }

        let mut decoders = Vec::new();
        let old_rows_scheduled = self.rows_scheduled;
        // Keep scheduling children until we have scheduled at least one complete row
        while old_rows_scheduled == self.rows_scheduled {
            let mut next_child = self.children.pop().unwrap();
            let scoped = context.push(next_child.col_name, next_child.col_idx);
            let child_scan = next_child.job.schedule_next(scoped.context)?;
            // next_child is the least-scheduled child and, if it's done, that
            // means we are completely done.
            if child_scan.is_none() {
                return Ok(None);
            }
            let child_scan = child_scan.unwrap();

            trace!(
                "Scheduled {} rows for child {}",
                child_scan.rows_scheduled,
                next_child.col_idx
            );
            next_child.rows_scheduled += child_scan.rows_scheduled;
            next_child.rows_remaining -= child_scan.rows_scheduled;
            decoders.extend(child_scan.decoders);
            self.children.push(next_child);
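            // The top of the heap is now the least-scheduled child; the struct has fully
            // scheduled exactly as many rows as that child has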
            self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
            context = scoped.pop();
        }
        let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
        Ok(Some(ScheduledScanLine {
            decoders,
            rows_scheduled: struct_rows_scheduled,
        }))
    }
}

/// A scheduler for structs
///
/// The implementation is a bit trickier than one might initially think.  We can't simply
/// schedule each column one after the other.  That would mean the decode can't start until
/// nearly all the data has arrived (since we need data from every column to yield a batch).
///
/// Instead, we schedule in row-major fashion.
///
/// Note: this scheduler is the starting point for all decoding.  This is because we treat
/// the top-level record batch as a non-nullable struct.
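///
/// "Row-major" here means the scheduling job interleaves I/O across the children, a slice
/// of rows at a time, rather than scheduling all of one column before moving on to the
/// next.  Each `ScheduledScanLine` it emits covers rows that every child has scheduled, so
/// decoding can begin as soon as the first slice arrives.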
#[derive(Debug)]
pub struct StructuralStructScheduler {
    children: Vec<Box<dyn StructuralFieldScheduler>>,
    child_fields: Fields,
}

impl StructuralStructScheduler {
    pub fn new(children: Vec<Box<dyn StructuralFieldScheduler>>, child_fields: Fields) -> Self {
        Self {
            children,
            child_fields,
        }
    }
}

impl StructuralFieldScheduler for StructuralStructScheduler {
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();

        let child_schedulers = self
            .children
            .iter()
            .map(|child| child.schedule_ranges(ranges, filter))
            .collect::<Result<Vec<_>>>()?;

        Ok(Box::new(RepDefStructSchedulingJob::new(
            self,
            child_schedulers,
            num_rows,
        )))
    }

    fn initialize<'a>(
        &'a mut self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
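        // Each child initializes independently, so kick off all of the futures at once and
        // wait for them in any order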
        let children_initialization = self
            .children
            .iter_mut()
            .map(|child| child.initialize(filter, context))
            .collect::<FuturesUnordered<_>>();
        async move {
            children_initialization
                .map(|res| res.map(|_| ()))
                .try_collect::<Vec<_>>()
                .await?;
            Ok(())
        }
        .boxed()
    }
}

#[derive(Debug)]
pub struct StructuralStructDecoder {
    children: Vec<Box<dyn StructuralFieldDecoder>>,
    data_type: DataType,
    child_fields: Fields,
    // The root decoder is slightly different because it cannot have nulls
    is_root: bool,
}

impl StructuralStructDecoder {
    pub fn new(fields: Fields, should_validate: bool, is_root: bool) -> Self {
        let children = fields
            .iter()
            .map(|field| Self::field_to_decoder(field, should_validate))
            .collect();
        let data_type = DataType::Struct(fields.clone());
        Self {
            data_type,
            children,
            child_fields: fields,
            is_root,
        }
    }

    fn field_to_decoder(
        field: &Arc<arrow_schema::Field>,
        should_validate: bool,
    ) -> Box<dyn StructuralFieldDecoder> {
        match field.data_type() {
            DataType::Struct(fields) => {
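                // Packed structs are decoded as a single primitive-style column rather
                // than by recursing into the children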
                if field.is_packed_struct() {
                    let decoder =
                        StructuralPrimitiveFieldDecoder::new(&field.clone(), should_validate);
                    Box::new(decoder)
                } else {
                    Box::new(Self::new(fields.clone(), should_validate, false))
                }
            }
            DataType::List(child_field) | DataType::LargeList(child_field) => {
                let child_decoder = Self::field_to_decoder(child_field, should_validate);
                Box::new(StructuralListDecoder::new(
                    child_decoder,
                    field.data_type().clone(),
                ))
            }
            DataType::RunEndEncoded(_, _) => todo!(),
            DataType::ListView(_) | DataType::LargeListView(_) => todo!(),
            DataType::Map(_, _) => todo!(),
            DataType::Union(_, _) => todo!(),
            _ => Box::new(StructuralPrimitiveFieldDecoder::new(field, should_validate)),
        }
    }

    pub fn drain_batch_task(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        let array_drain = self.drain(num_rows)?;
        Ok(NextDecodeTask {
            num_rows,
            task: Box::new(array_drain),
        })
    }
}

impl StructuralFieldDecoder for StructuralStructDecoder {
    fn accept_page(&mut self, mut child: LoadedPage) -> Result<()> {
        // A page with an empty path is never delivered to this method
        let child_idx = child.path.pop_front().unwrap();
        // The page is intended for one of our children; the first path element tells us which
        self.children[child_idx as usize].accept_page(child)?;
        Ok(())
    }

    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
        let child_tasks = self
            .children
            .iter_mut()
            .map(|child| child.drain(num_rows))
            .collect::<Result<Vec<_>>>()?;
        Ok(Box::new(RepDefStructDecodeTask {
            children: child_tasks,
            child_fields: self.child_fields.clone(),
            is_root: self.is_root,
            num_rows,
        }))
    }

    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}

#[derive(Debug)]
struct RepDefStructDecodeTask {
    children: Vec<Box<dyn StructuralDecodeArrayTask>>,
    child_fields: Fields,
    is_root: bool,
    num_rows: u64,
}

impl StructuralDecodeArrayTask for RepDefStructDecodeTask {
    fn decode(self: Box<Self>) -> Result<DecodedArray> {
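        // A struct with no children still carries a row count, so materialize an
        // empty-fields StructArray of the right length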
        if self.children.is_empty() {
            return Ok(DecodedArray {
                array: Arc::new(StructArray::new_empty_fields(self.num_rows as usize, None)),
                repdef: CompositeRepDefUnraveler::new(vec![]),
            });
        }

        let arrays = self
            .children
            .into_iter()
            .map(|task| task.decode())
            .collect::<Result<Vec<_>>>()?;
        let mut children = Vec::with_capacity(arrays.len());
        let mut arrays_iter = arrays.into_iter();
        let first_array = arrays_iter.next().unwrap();
        let length = first_array.array.len();

        // The repdef should be identical across all children at this point
        let mut repdef = first_array.repdef;
        children.push(first_array.array);

        for array in arrays_iter {
            debug_assert_eq!(length, array.array.len());
            children.push(array.array);
        }

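        // The root struct (the record batch itself) is never nullable, so there is no
        // validity to unravel at the top level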
        let validity = if self.is_root {
            None
        } else {
            repdef.unravel_validity(length)
        };
        let array = StructArray::new(self.child_fields, children, validity);
        Ok(DecodedArray {
            array: Arc::new(array),
            repdef,
        })
    }
}

/// A structural encoder for struct fields
///
/// The struct's validity is added to the rep/def builder
/// and the builder is cloned to all children.
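///
/// When `keep_original_array` is true the validity buffer is shared by reference; otherwise
/// it is deep-copied so the original array does not need to stay alive.  The struct's nulls
/// are then pushed down into the children before the children are encoded.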
pub struct StructStructuralEncoder {
    keep_original_array: bool,
    children: Vec<Box<dyn FieldEncoder>>,
}

impl StructStructuralEncoder {
    pub fn new(keep_original_array: bool, children: Vec<Box<dyn FieldEncoder>>) -> Self {
        Self {
            keep_original_array,
            children,
        }
    }
}

impl FieldEncoder for StructStructuralEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        mut repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        let struct_array = array.as_struct();
        let mut struct_array = struct_array.normalize_slicing()?;
        if let Some(validity) = struct_array.nulls() {
            if self.keep_original_array {
                repdef.add_validity_bitmap(validity.clone())
            } else {
                repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap())
            }
            struct_array = struct_array.pushdown_nulls()?;
        } else {
            repdef.add_no_null(struct_array.len());
        }
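        // Each child encoder gets a clone of the rep/def builder that already contains this
        // struct's validity layer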
        let child_tasks = self
            .children
            .iter_mut()
            .zip(struct_array.columns().iter())
            .map(|(encoder, arr)| {
                encoder.maybe_encode(
                    arr.clone(),
                    external_buffers,
                    repdef.clone(),
                    row_number,
                    num_rows,
                )
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        self.children
            .iter_mut()
            .map(|encoder| encoder.flush(external_buffers))
            .flatten_ok()
            .collect::<Result<Vec<_>>>()
    }

    fn num_columns(&self) -> u32 {
        self.children
            .iter()
            .map(|child| child.num_columns())
            .sum::<u32>()
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        let mut child_columns = self
            .children
            .iter_mut()
            .map(|child| child.finish(external_buffers))
            .collect::<FuturesOrdered<_>>();
        async move {
            let mut encoded_columns = Vec::with_capacity(child_columns.len());
            while let Some(child_cols) = child_columns.next().await {
                encoded_columns.extend(child_cols?);
            }
            Ok(encoded_columns)
        }
        .boxed()
    }
}

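/// A struct encoder that writes a header column for the struct itself
///
/// The header column contains a single page describing the struct (a legacy `SimpleStruct`
/// encoding) and is followed by the columns produced by the child encoders.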
pub struct StructFieldEncoder {
    children: Vec<Box<dyn FieldEncoder>>,
    column_index: u32,
    num_rows_seen: u64,
}

impl StructFieldEncoder {
    #[allow(dead_code)]
    pub fn new(children: Vec<Box<dyn FieldEncoder>>, column_index: u32) -> Self {
        Self {
            children,
            column_index,
            num_rows_seen: 0,
        }
    }
}

impl FieldEncoder for StructFieldEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        self.num_rows_seen += array.len() as u64;
        let struct_array = array.as_struct();
        let child_tasks = self
            .children
            .iter_mut()
            .zip(struct_array.columns().iter())
            .map(|(encoder, arr)| {
                encoder.maybe_encode(
                    arr.clone(),
                    external_buffers,
                    repdef.clone(),
                    row_number,
                    num_rows,
                )
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        let child_tasks = self
            .children
            .iter_mut()
            .map(|encoder| encoder.flush(external_buffers))
            .collect::<Result<Vec<_>>>()?;
        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
    }

    fn num_columns(&self) -> u32 {
        self.children
            .iter()
            .map(|child| child.num_columns())
            .sum::<u32>()
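            // Plus one for the struct header column emitted in `finish`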
            + 1
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        let mut child_columns = self
            .children
            .iter_mut()
            .map(|child| child.finish(external_buffers))
            .collect::<FuturesOrdered<_>>();
        let num_rows_seen = self.num_rows_seen;
        let column_index = self.column_index;
        async move {
            let mut columns = Vec::new();
            // Add a column for the struct header
            let mut header = EncodedColumn::default();
            header.final_pages.push(EncodedPage {
                data: Vec::new(),
                description: PageEncoding::Legacy(pb::ArrayEncoding {
                    array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
                        pb::SimpleStruct {},
                    )),
                }),
                num_rows: num_rows_seen,
                column_idx: column_index,
                row_number: 0, // Not used by legacy encoding
            });
            columns.push(header);
            // Now run finish on the children
            while let Some(child_cols) = child_columns.next().await {
                columns.extend(child_cols?);
            }
            Ok(columns)
        }
        .boxed()
    }
}

#[cfg(test)]
mod tests {

    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{
        builder::{Int32Builder, ListBuilder},
        Array, ArrayRef, Int32Array, ListArray, StructArray,
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
    use arrow_schema::{DataType, Field, Fields};

    use crate::{
        testing::{check_basic_random, check_round_trip_encoding_of_data, TestCases},
        version::LanceFileVersion,
    };

    #[test_log::test(tokio::test)]
    async fn test_simple_struct() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]));
        let field = Field::new("", data_type, false);
        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_nullable_struct() {
        // Test data struct<score: int32, location: struct<x: int32, y: int32>>
        // - score: null
        //   location:
        //     x: 1
        //     y: 6
        // - score: 12
        //   location:
        //     x: 2
        //     y: null
        // - score: 13
        //   location:
        //     x: 3
        //     y: 8
        // - score: 14
        //   location: null
        // - null
        //
        let inner_fields = Fields::from(vec![
            Field::new("x", DataType::Int32, false),
            Field::new("y", DataType::Int32, true),
        ]);
        let inner_struct = DataType::Struct(inner_fields.clone());
        let outer_fields = Fields::from(vec![
            Field::new("score", DataType::Int32, true),
            Field::new("location", inner_struct, true),
        ]);

        let x_vals = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
        let y_vals = Int32Array::from(vec![Some(6), None, Some(8), Some(9), Some(10)]);
        let scores = Int32Array::from(vec![None, Some(12), Some(13), Some(14), Some(15)]);

        let location_validity = NullBuffer::from(vec![true, true, true, false, true]);
        let locations = StructArray::new(
            inner_fields,
            vec![Arc::new(x_vals), Arc::new(y_vals)],
            Some(location_validity),
        );

        let rows_validity = NullBuffer::from(vec![true, true, true, true, false]);
        let rows = StructArray::new(
            outer_fields,
            vec![Arc::new(scores), Arc::new(locations)],
            Some(rows_validity),
        );

        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![Arc::new(rows)], &test_cases, HashMap::new()).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_masked_nonempty_list() {
        // [[1, 2], [NULL], [4], [], NULL, NULL-STRUCT]
        //
        let items = Int32Array::from(vec![Some(1), Some(2), None, Some(4), Some(5), Some(6)]);
        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 2, 3, 4, 4, 4, 5]));
        let list_validity = BooleanBuffer::from(vec![true, true, true, true, false, true]);
        let list_array = ListArray::new(
            Arc::new(Field::new("item", DataType::Int32, true)),
            offsets,
            Arc::new(items),
            Some(NullBuffer::new(list_validity)),
        );
        let struct_validity = BooleanBuffer::from(vec![true, true, true, true, true, false]);
        let struct_array = StructArray::new(
            Fields::from(vec![Field::new(
                "inner_list",
                list_array.data_type().clone(),
                true,
            )]),
            vec![Arc::new(list_array)],
            Some(NullBuffer::new(struct_validity)),
        );
        check_round_trip_encoding_of_data(
            vec![Arc::new(struct_array)],
            &TestCases::default().with_min_file_version(LanceFileVersion::V2_1),
            HashMap::new(),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_struct_list() {
        // [[1, 2], [NULL], [4], [], NULL, NULL-STRUCT]
        //
        let items = Int32Array::from(vec![Some(1), Some(2), None, Some(4)]);
        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 2, 3, 4, 4, 4, 4]));
        let list_validity = BooleanBuffer::from(vec![true, true, true, true, false, true]);
        let list_array = ListArray::new(
            Arc::new(Field::new("item", DataType::Int32, true)),
            offsets,
            Arc::new(items),
            Some(NullBuffer::new(list_validity)),
        );
        let struct_validity = BooleanBuffer::from(vec![true, true, true, true, true, false]);
        let struct_array = StructArray::new(
            Fields::from(vec![Field::new(
                "inner_list",
                list_array.data_type().clone(),
                true,
            )]),
            vec![Arc::new(list_array)],
            Some(NullBuffer::new(struct_validity)),
        );
        check_round_trip_encoding_of_data(
            vec![Arc::new(struct_array)],
            &TestCases::default().with_min_file_version(LanceFileVersion::V2_1),
            HashMap::new(),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_struct_list() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new(
                "inner_list",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                true,
            ),
            Field::new("outer_int", DataType::Int32, true),
        ]));
        let field = Field::new("row", data_type, false);
        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_empty_struct() {
        // It's technically legal for a struct to have 0 children; we need to make sure we
        // support that
        let data_type = DataType::Struct(Fields::from(Vec::<Field>::default()));
        let field = Field::new("row", data_type, false);
        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_complicated_struct() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("int", DataType::Int32, true),
            Field::new(
                "inner",
                DataType::Struct(Fields::from(vec![
                    Field::new("inner_int", DataType::Int32, true),
                    Field::new(
                        "inner_list",
                        DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                        true,
                    ),
                ])),
                true,
            ),
            Field::new("outer_binary", DataType::Binary, true),
        ]));
        let field = Field::new("row", data_type, false);
        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_ragged_scheduling() {
        // This test covers scheduling when batches straddle page boundaries

        // Create a list with 10k nulls
        let items_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        for _ in 0..10000 {
            list_builder.append_null();
        }
        let list_array = Arc::new(list_builder.finish());
        let int_array = Arc::new(Int32Array::from_iter_values(0..10000));
        let fields = vec![
            Field::new("", list_array.data_type().clone(), true),
            Field::new("", int_array.data_type().clone(), true),
        ];
        let struct_array = Arc::new(StructArray::new(
            Fields::from(fields),
            vec![list_array, int_array],
            None,
        )) as ArrayRef;
        let struct_arrays = (0..10000)
            // Intentionally step by a random-ish amount to create more ragged scheduling
            .step_by(437)
            .map(|offset| struct_array.slice(offset, 437.min(10000 - offset)))
            .collect::<Vec<_>>();
        check_round_trip_encoding_of_data(struct_arrays, &TestCases::default(), HashMap::new())
            .await;
    }
}