lance_encoding/encodings/logical/
struct.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::{BinaryHeap, VecDeque},
6    ops::Range,
7    sync::Arc,
8};
9
10use crate::{
11    decoder::{
12        DecodedArray, FilterExpression, LoadedPageShard, NextDecodeTask, PageEncoding,
13        ScheduledScanLine, SchedulerContext, StructuralDecodeArrayTask, StructuralFieldDecoder,
14        StructuralFieldScheduler, StructuralSchedulingJob,
15    },
16    encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
17    format::pb,
18    repdef::{CompositeRepDefUnraveler, RepDefBuilder},
19};
20use arrow_array::{cast::AsArray, Array, ArrayRef, StructArray};
21use arrow_schema::{DataType, Fields};
22use futures::{
23    future::BoxFuture,
24    stream::{FuturesOrdered, FuturesUnordered},
25    FutureExt, StreamExt, TryStreamExt,
26};
27use itertools::Itertools;
28use lance_arrow::FieldExt;
29use lance_arrow::{deepcopy::deep_copy_nulls, r#struct::StructArrayExt};
30use lance_core::Result;
31use log::trace;
32
33use super::{list::StructuralListDecoder, primitive::StructuralPrimitiveFieldDecoder};
34
35#[derive(Debug)]
36struct StructuralSchedulingJobWithStatus<'a> {
37    col_idx: u32,
38    col_name: &'a str,
39    job: Box<dyn StructuralSchedulingJob + 'a>,
40    rows_scheduled: u64,
41    rows_remaining: u64,
42    ready_scan_lines: VecDeque<ScheduledScanLine>,
43}
44
45impl PartialEq for StructuralSchedulingJobWithStatus<'_> {
46    fn eq(&self, other: &Self) -> bool {
47        self.col_idx == other.col_idx
48    }
49}
50
51impl Eq for StructuralSchedulingJobWithStatus<'_> {}
52
53impl PartialOrd for StructuralSchedulingJobWithStatus<'_> {
54    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
55        Some(self.cmp(other))
56    }
57}
58
59impl Ord for StructuralSchedulingJobWithStatus<'_> {
60    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
61        // Note this is reversed to make it min-heap
62        other.rows_scheduled.cmp(&self.rows_scheduled)
63    }
64}
65
66/// Scheduling job for struct data
67///
68/// The order in which we schedule the children is important.  We want to schedule the child
69/// with the least amount of data first.
70///
71/// This allows us to decode entire rows as quickly as possible
72#[derive(Debug)]
73struct RepDefStructSchedulingJob<'a> {
74    /// A min-heap whose key is the # of rows currently scheduled
75    children: BinaryHeap<StructuralSchedulingJobWithStatus<'a>>,
76    rows_scheduled: u64,
77    num_rows: u64,
78}
79
80impl<'a> RepDefStructSchedulingJob<'a> {
81    fn new(
82        scheduler: &'a StructuralStructScheduler,
83        children: Vec<Box<dyn StructuralSchedulingJob + 'a>>,
84        num_rows: u64,
85    ) -> Self {
86        let children = children
87            .into_iter()
88            .enumerate()
89            .map(|(idx, job)| StructuralSchedulingJobWithStatus {
90                col_idx: idx as u32,
91                col_name: scheduler.child_fields[idx].name(),
92                job,
93                rows_scheduled: 0,
94                rows_remaining: num_rows,
95                ready_scan_lines: VecDeque::new(),
96            })
97            .collect::<BinaryHeap<_>>();
98        Self {
99            children,
100            rows_scheduled: 0,
101            num_rows,
102        }
103    }
104}
105
106impl StructuralSchedulingJob for RepDefStructSchedulingJob<'_> {
107    fn schedule_next(
108        &mut self,
109        mut context: &mut SchedulerContext,
110    ) -> Result<Vec<ScheduledScanLine>> {
111        if self.children.is_empty() {
112            // Special path for empty structs
113            if self.rows_scheduled == self.num_rows {
114                return Ok(Vec::new());
115            }
116            self.rows_scheduled = self.num_rows;
117            return Ok(vec![ScheduledScanLine {
118                decoders: Vec::new(),
119                rows_scheduled: self.num_rows,
120            }]);
121        }
122
123        let mut decoders = Vec::new();
124        let old_rows_scheduled = self.rows_scheduled;
125        // Schedule as many children as we need to until we have scheduled at least one
126        // complete row
127        while old_rows_scheduled == self.rows_scheduled {
128            if self.children.is_empty() {
129                // Early exit when schedulers are exhausted prematurely (TODO: does this still happen?)
130                return Ok(Vec::new());
131            }
132            let mut next_child = self.children.pop().unwrap();
133            if next_child.ready_scan_lines.is_empty() {
134                let scoped = context.push(next_child.col_name, next_child.col_idx);
135                let child_scans = next_child.job.schedule_next(scoped.context)?;
136                context = scoped.pop();
137                if child_scans.is_empty() {
138                    // Continue without pushing next_child back onto the heap (it is done)
139                    continue;
140                }
141                next_child.ready_scan_lines.extend(child_scans);
142            }
143            let child_scan = next_child.ready_scan_lines.pop_front().unwrap();
144            trace!(
145                "Scheduled {} rows for child {}",
146                child_scan.rows_scheduled,
147                next_child.col_idx
148            );
149            next_child.rows_scheduled += child_scan.rows_scheduled;
150            next_child.rows_remaining -= child_scan.rows_scheduled;
151            decoders.extend(child_scan.decoders);
152            self.children.push(next_child);
153            self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
154        }
155        let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
156        Ok(vec![ScheduledScanLine {
157            decoders,
158            rows_scheduled: struct_rows_scheduled,
159        }])
160    }
161}
162
163/// A scheduler for structs
164///
165/// The implementation is actually a bit more tricky than one might initially think.  We can't just
166/// go through and schedule each column one after the other.  This would mean our decode can't start
167/// until nearly all the data has arrived (since we need data from each column to yield a batch)
168///
169/// Instead, we schedule in row-major fashion
170///
171/// Note: this scheduler is the starting point for all decoding.  This is because we treat the top-level
172/// record batch as a non-nullable struct.
173#[derive(Debug)]
174pub struct StructuralStructScheduler {
175    children: Vec<Box<dyn StructuralFieldScheduler>>,
176    child_fields: Fields,
177}
178
179impl StructuralStructScheduler {
180    pub fn new(children: Vec<Box<dyn StructuralFieldScheduler>>, child_fields: Fields) -> Self {
181        Self {
182            children,
183            child_fields,
184        }
185    }
186}
187
188impl StructuralFieldScheduler for StructuralStructScheduler {
189    fn schedule_ranges<'a>(
190        &'a self,
191        ranges: &[Range<u64>],
192        filter: &FilterExpression,
193    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
194        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
195
196        let child_schedulers = self
197            .children
198            .iter()
199            .map(|child| child.schedule_ranges(ranges, filter))
200            .collect::<Result<Vec<_>>>()?;
201
202        Ok(Box::new(RepDefStructSchedulingJob::new(
203            self,
204            child_schedulers,
205            num_rows,
206        )))
207    }
208
209    fn initialize<'a>(
210        &'a mut self,
211        filter: &'a FilterExpression,
212        context: &'a SchedulerContext,
213    ) -> BoxFuture<'a, Result<()>> {
214        let children_initialization = self
215            .children
216            .iter_mut()
217            .map(|child| child.initialize(filter, context))
218            .collect::<FuturesUnordered<_>>();
219        async move {
220            children_initialization
221                .map(|res| res.map(|_| ()))
222                .try_collect::<Vec<_>>()
223                .await?;
224            Ok(())
225        }
226        .boxed()
227    }
228}
229
230#[derive(Debug)]
231pub struct StructuralStructDecoder {
232    children: Vec<Box<dyn StructuralFieldDecoder>>,
233    data_type: DataType,
234    child_fields: Fields,
235    // The root decoder is slightly different because it cannot have nulls
236    is_root: bool,
237}
238
239impl StructuralStructDecoder {
240    pub fn new(fields: Fields, should_validate: bool, is_root: bool) -> Self {
241        let children = fields
242            .iter()
243            .map(|field| Self::field_to_decoder(field, should_validate))
244            .collect();
245        let data_type = DataType::Struct(fields.clone());
246        Self {
247            data_type,
248            children,
249            child_fields: fields,
250            is_root,
251        }
252    }
253
254    fn field_to_decoder(
255        field: &Arc<arrow_schema::Field>,
256        should_validate: bool,
257    ) -> Box<dyn StructuralFieldDecoder> {
258        match field.data_type() {
259            DataType::Struct(fields) => {
260                if field.is_packed_struct() || field.is_blob() {
261                    let decoder =
262                        StructuralPrimitiveFieldDecoder::new(&field.clone(), should_validate);
263                    Box::new(decoder)
264                } else {
265                    Box::new(Self::new(fields.clone(), should_validate, false))
266                }
267            }
268            DataType::List(child_field) | DataType::LargeList(child_field) => {
269                let child_decoder = Self::field_to_decoder(child_field, should_validate);
270                Box::new(StructuralListDecoder::new(
271                    child_decoder,
272                    field.data_type().clone(),
273                ))
274            }
275            DataType::RunEndEncoded(_, _) => todo!(),
276            DataType::ListView(_) | DataType::LargeListView(_) => todo!(),
277            DataType::Map(_, _) => todo!(),
278            DataType::Union(_, _) => todo!(),
279            _ => Box::new(StructuralPrimitiveFieldDecoder::new(field, should_validate)),
280        }
281    }
282
283    pub fn drain_batch_task(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
284        let array_drain = self.drain(num_rows)?;
285        Ok(NextDecodeTask {
286            num_rows,
287            task: Box::new(array_drain),
288        })
289    }
290}
291
292impl StructuralFieldDecoder for StructuralStructDecoder {
293    fn accept_page(&mut self, mut child: LoadedPageShard) -> Result<()> {
294        // children with empty path should not be delivered to this method
295        let child_idx = child.path.pop_front().unwrap();
296        // This decoder is intended for one of our children
297        self.children[child_idx as usize].accept_page(child)?;
298        Ok(())
299    }
300
301    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
302        let child_tasks = self
303            .children
304            .iter_mut()
305            .map(|child| child.drain(num_rows))
306            .collect::<Result<Vec<_>>>()?;
307        Ok(Box::new(RepDefStructDecodeTask {
308            children: child_tasks,
309            child_fields: self.child_fields.clone(),
310            is_root: self.is_root,
311            num_rows,
312        }))
313    }
314
315    fn data_type(&self) -> &DataType {
316        &self.data_type
317    }
318}
319
320#[derive(Debug)]
321struct RepDefStructDecodeTask {
322    children: Vec<Box<dyn StructuralDecodeArrayTask>>,
323    child_fields: Fields,
324    is_root: bool,
325    num_rows: u64,
326}
327
328impl StructuralDecodeArrayTask for RepDefStructDecodeTask {
329    fn decode(self: Box<Self>) -> Result<DecodedArray> {
330        if self.children.is_empty() {
331            return Ok(DecodedArray {
332                array: Arc::new(StructArray::new_empty_fields(self.num_rows as usize, None)),
333                repdef: CompositeRepDefUnraveler::new(vec![]),
334            });
335        }
336
337        let arrays = self
338            .children
339            .into_iter()
340            .map(|task| task.decode())
341            .collect::<Result<Vec<_>>>()?;
342        let mut children = Vec::with_capacity(arrays.len());
343        let mut arrays_iter = arrays.into_iter();
344        let first_array = arrays_iter.next().unwrap();
345        let length = first_array.array.len();
346
347        // The repdef should be identical across all children at this point
348        let mut repdef = first_array.repdef;
349        children.push(first_array.array);
350
351        for array in arrays_iter {
352            debug_assert_eq!(length, array.array.len());
353            children.push(array.array);
354        }
355
356        let validity = if self.is_root {
357            None
358        } else {
359            repdef.unravel_validity(length)
360        };
361
362        let array = StructArray::new(self.child_fields, children, validity);
363        Ok(DecodedArray {
364            array: Arc::new(array),
365            repdef,
366        })
367    }
368}
369
370/// A structural encoder for struct fields
371///
372/// The struct's validity is added to the rep/def builder
373/// and the builder is cloned to all children.
374pub struct StructStructuralEncoder {
375    keep_original_array: bool,
376    children: Vec<Box<dyn FieldEncoder>>,
377}
378
379impl StructStructuralEncoder {
380    pub fn new(keep_original_array: bool, children: Vec<Box<dyn FieldEncoder>>) -> Self {
381        Self {
382            keep_original_array,
383            children,
384        }
385    }
386}
387
388impl FieldEncoder for StructStructuralEncoder {
389    fn maybe_encode(
390        &mut self,
391        array: ArrayRef,
392        external_buffers: &mut OutOfLineBuffers,
393        mut repdef: RepDefBuilder,
394        row_number: u64,
395        num_rows: u64,
396    ) -> Result<Vec<EncodeTask>> {
397        let struct_array = array.as_struct();
398        let mut struct_array = struct_array.normalize_slicing()?;
399        if let Some(validity) = struct_array.nulls() {
400            if self.keep_original_array {
401                repdef.add_validity_bitmap(validity.clone())
402            } else {
403                repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap())
404            }
405            struct_array = struct_array.pushdown_nulls()?;
406        } else {
407            repdef.add_no_null(struct_array.len());
408        }
409        let child_tasks = self
410            .children
411            .iter_mut()
412            .zip(struct_array.columns().iter())
413            .map(|(encoder, arr)| {
414                encoder.maybe_encode(
415                    arr.clone(),
416                    external_buffers,
417                    repdef.clone(),
418                    row_number,
419                    num_rows,
420                )
421            })
422            .collect::<Result<Vec<_>>>()?;
423        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
424    }
425
426    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
427        self.children
428            .iter_mut()
429            .map(|encoder| encoder.flush(external_buffers))
430            .flatten_ok()
431            .collect::<Result<Vec<_>>>()
432    }
433
434    fn num_columns(&self) -> u32 {
435        self.children
436            .iter()
437            .map(|child| child.num_columns())
438            .sum::<u32>()
439    }
440
441    fn finish(
442        &mut self,
443        external_buffers: &mut OutOfLineBuffers,
444    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
445        let mut child_columns = self
446            .children
447            .iter_mut()
448            .map(|child| child.finish(external_buffers))
449            .collect::<FuturesOrdered<_>>();
450        async move {
451            let mut encoded_columns = Vec::with_capacity(child_columns.len());
452            while let Some(child_cols) = child_columns.next().await {
453                encoded_columns.extend(child_cols?);
454            }
455            Ok(encoded_columns)
456        }
457        .boxed()
458    }
459}
460
461pub struct StructFieldEncoder {
462    children: Vec<Box<dyn FieldEncoder>>,
463    column_index: u32,
464    num_rows_seen: u64,
465}
466
467impl StructFieldEncoder {
468    #[allow(dead_code)]
469    pub fn new(children: Vec<Box<dyn FieldEncoder>>, column_index: u32) -> Self {
470        Self {
471            children,
472            column_index,
473            num_rows_seen: 0,
474        }
475    }
476}
477
478impl FieldEncoder for StructFieldEncoder {
479    fn maybe_encode(
480        &mut self,
481        array: ArrayRef,
482        external_buffers: &mut OutOfLineBuffers,
483        repdef: RepDefBuilder,
484        row_number: u64,
485        num_rows: u64,
486    ) -> Result<Vec<EncodeTask>> {
487        self.num_rows_seen += array.len() as u64;
488        let struct_array = array.as_struct();
489        let child_tasks = self
490            .children
491            .iter_mut()
492            .zip(struct_array.columns().iter())
493            .map(|(encoder, arr)| {
494                encoder.maybe_encode(
495                    arr.clone(),
496                    external_buffers,
497                    repdef.clone(),
498                    row_number,
499                    num_rows,
500                )
501            })
502            .collect::<Result<Vec<_>>>()?;
503        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
504    }
505
506    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
507        let child_tasks = self
508            .children
509            .iter_mut()
510            .map(|encoder| encoder.flush(external_buffers))
511            .collect::<Result<Vec<_>>>()?;
512        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
513    }
514
515    fn num_columns(&self) -> u32 {
516        self.children
517            .iter()
518            .map(|child| child.num_columns())
519            .sum::<u32>()
520            + 1
521    }
522
523    fn finish(
524        &mut self,
525        external_buffers: &mut OutOfLineBuffers,
526    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
527        let mut child_columns = self
528            .children
529            .iter_mut()
530            .map(|child| child.finish(external_buffers))
531            .collect::<FuturesOrdered<_>>();
532        let num_rows_seen = self.num_rows_seen;
533        let column_index = self.column_index;
534        async move {
535            let mut columns = Vec::new();
536            // Add a column for the struct header
537            let mut header = EncodedColumn::default();
538            header.final_pages.push(EncodedPage {
539                data: Vec::new(),
540                description: PageEncoding::Legacy(pb::ArrayEncoding {
541                    array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
542                        pb::SimpleStruct {},
543                    )),
544                }),
545                num_rows: num_rows_seen,
546                column_idx: column_index,
547                row_number: 0, // Not used by legacy encoding
548            });
549            columns.push(header);
550            // Now run finish on the children
551            while let Some(child_cols) = child_columns.next().await {
552                columns.extend(child_cols?);
553            }
554            Ok(columns)
555        }
556        .boxed()
557    }
558}
559
#[cfg(test)]
mod tests {

    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{
        builder::{Int32Builder, ListBuilder},
        Array, ArrayRef, Int32Array, ListArray, StructArray,
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
    use arrow_schema::{DataType, Field, Fields};

    use crate::{
        testing::{check_basic_random, check_round_trip_encoding_of_data, TestCases},
        version::LanceFileVersion,
    };

    /// Builds a six-row nullable struct with a single nullable `inner_list` child.
    ///
    /// The fifth list is null and the sixth struct is null; `items` and `offsets`
    /// determine what the list child physically contains.
    fn struct_with_inner_list(items: Int32Array, offsets: Vec<i32>) -> StructArray {
        let list_validity = BooleanBuffer::from(vec![true, true, true, true, false, true]);
        let list_array = ListArray::new(
            Arc::new(Field::new("item", DataType::Int32, true)),
            OffsetBuffer::new(ScalarBuffer::<i32>::from(offsets)),
            Arc::new(items),
            Some(NullBuffer::new(list_validity)),
        );
        let struct_validity = BooleanBuffer::from(vec![true, true, true, true, true, false]);
        let child_field = Field::new("inner_list", list_array.data_type().clone(), true);
        StructArray::new(
            Fields::from(vec![child_field]),
            vec![Arc::new(list_array)],
            Some(NullBuffer::new(struct_validity)),
        )
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_struct() {
        let children = vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ];
        let field = Field::new("", DataType::Struct(Fields::from(children)), false);
        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_nullable_struct() {
        // Data under test: struct<score: int32, location: struct<x: int32, y: int32>>
        //
        //   row 0: score = null, location = { x: 1, y: 6 }
        //   row 1: score = 12,   location = { x: 2, y: null }
        //   row 2: score = 13,   location = { x: 3, y: 8 }
        //   row 3: score = 14,   location = null
        //   row 4: null
        let inner_fields = Fields::from(vec![
            Field::new("x", DataType::Int32, false),
            Field::new("y", DataType::Int32, true),
        ]);
        let outer_fields = Fields::from(vec![
            Field::new("score", DataType::Int32, true),
            Field::new("location", DataType::Struct(inner_fields.clone()), true),
        ]);

        let xs = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
        let ys = Int32Array::from(vec![Some(6), None, Some(8), Some(9), Some(10)]);
        let locations = StructArray::new(
            inner_fields,
            vec![Arc::new(xs), Arc::new(ys)],
            Some(NullBuffer::from(vec![true, true, true, false, true])),
        );

        let scores = Int32Array::from(vec![None, Some(12), Some(13), Some(14), Some(15)]);
        let rows = StructArray::new(
            outer_fields,
            vec![Arc::new(scores), Arc::new(locations)],
            Some(NullBuffer::from(vec![true, true, true, true, false])),
        );

        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![Arc::new(rows)], &test_cases, HashMap::new()).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_masked_nonempty_list() {
        // [[1, 2], [NULL], [4], [], NULL, NULL-STRUCT]
        //
        // Unlike `test_simple_struct_list`, the items buffer carries extra values and
        // the last (null-struct) row spans one of them.
        let items = Int32Array::from(vec![Some(1), Some(2), None, Some(4), Some(5), Some(6)]);
        let struct_array = struct_with_inner_list(items, vec![0, 2, 3, 4, 4, 4, 5]);
        check_round_trip_encoding_of_data(
            vec![Arc::new(struct_array)],
            &TestCases::default().with_min_file_version(LanceFileVersion::V2_1),
            HashMap::new(),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_struct_list() {
        // [[1, 2], [NULL], [4], [], NULL, NULL-STRUCT]
        let items = Int32Array::from(vec![Some(1), Some(2), None, Some(4)]);
        let struct_array = struct_with_inner_list(items, vec![0, 2, 3, 4, 4, 4, 4]);
        check_round_trip_encoding_of_data(
            vec![Arc::new(struct_array)],
            &TestCases::default().with_min_file_version(LanceFileVersion::V2_1),
            HashMap::new(),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_struct_list() {
        let list_type = DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
        let children = vec![
            Field::new("inner_list", list_type, true),
            Field::new("outer_int", DataType::Int32, true),
        ];
        let field = Field::new("row", DataType::Struct(Fields::from(children)), false);
        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_empty_struct() {
        // A struct with zero children is technically legal, so make sure it
        // round-trips
        let no_children = Fields::from(Vec::<Field>::default());
        let field = Field::new("row", DataType::Struct(no_children), false);
        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_complicated_struct() {
        let list_type = DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
        let inner_type = DataType::Struct(Fields::from(vec![
            Field::new("inner_int", DataType::Int32, true),
            Field::new("inner_list", list_type, true),
        ]));
        let outer_type = DataType::Struct(Fields::from(vec![
            Field::new("int", DataType::Int32, true),
            Field::new("inner", inner_type, true),
            Field::new("outer_binary", DataType::Binary, true),
        ]));
        let field = Field::new("row", outer_type, false);
        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_ragged_scheduling() {
        // This test covers scheduling when batches straddle page boundaries
        const NUM_ROWS: usize = 10000;
        // An odd slice length makes the scheduling more ragged
        const SLICE_LEN: usize = 437;

        // A list column consisting of 10k nulls
        let mut list_builder = ListBuilder::new(Int32Builder::new());
        for _ in 0..NUM_ROWS {
            list_builder.append_null();
        }
        let list_array = Arc::new(list_builder.finish());
        let int_array = Arc::new(Int32Array::from_iter_values(0..NUM_ROWS as i32));

        let fields = vec![
            Field::new("", list_array.data_type().clone(), true),
            Field::new("", int_array.data_type().clone(), true),
        ];
        let struct_array = Arc::new(StructArray::new(
            Fields::from(fields),
            vec![list_array, int_array],
            None,
        )) as ArrayRef;

        let mut struct_arrays = Vec::new();
        for offset in (0..NUM_ROWS).step_by(SLICE_LEN) {
            let len = SLICE_LEN.min(NUM_ROWS - offset);
            struct_arrays.push(struct_array.slice(offset, len));
        }
        check_round_trip_encoding_of_data(struct_arrays, &TestCases::default(), HashMap::new())
            .await;
    }
}