use std::{
    collections::{BinaryHeap, VecDeque},
    ops::Range,
    sync::Arc,
};

use super::{
    fixed_size_list::StructuralFixedSizeListDecoder, list::StructuralListDecoder,
    map::StructuralMapDecoder, primitive::StructuralPrimitiveFieldDecoder,
};
use crate::{
    decoder::{
        DecodedArray, FilterExpression, LoadedPageShard, NextDecodeTask, PageEncoding,
        ScheduledScanLine, SchedulerContext, StructuralDecodeArrayTask, StructuralFieldDecoder,
        StructuralFieldScheduler, StructuralSchedulingJob,
    },
    encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
    format::pb,
    repdef::{CompositeRepDefUnraveler, RepDefBuilder},
};
use arrow_array::{cast::AsArray, Array, ArrayRef, StructArray};
use arrow_schema::{DataType, Fields};
use futures::{
    future::BoxFuture,
    stream::{FuturesOrdered, FuturesUnordered},
    FutureExt, StreamExt, TryStreamExt,
};
use itertools::Itertools;
use lance_arrow::FieldExt;
use lance_arrow::{deepcopy::deep_copy_nulls, r#struct::StructArrayExt};
use lance_core::{Error, Result};
use log::trace;
use snafu::location;

#[derive(Debug)]
struct StructuralSchedulingJobWithStatus<'a> {
    col_idx: u32,
    col_name: &'a str,
    job: Box<dyn StructuralSchedulingJob + 'a>,
    rows_scheduled: u64,
    rows_remaining: u64,
    ready_scan_lines: VecDeque<ScheduledScanLine>,
}

impl PartialEq for StructuralSchedulingJobWithStatus<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.col_idx == other.col_idx
    }
}

impl Eq for StructuralSchedulingJobWithStatus<'_> {}

impl PartialOrd for StructuralSchedulingJobWithStatus<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for StructuralSchedulingJobWithStatus<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // The comparison is reversed so that the BinaryHeap behaves as a min-heap,
        // popping the child with the fewest rows scheduled so far.
        other.rows_scheduled.cmp(&self.rows_scheduled)
    }
}

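/// Scheduling job for a struct column.
///
/// Child jobs are kept in a min-heap keyed on the number of rows each has scheduled so
/// far; every pass schedules more rows for the least-advanced child and reports the
/// struct's overall progress as the minimum progress across all children.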
#[derive(Debug)]
struct RepDefStructSchedulingJob<'a> {
    children: BinaryHeap<StructuralSchedulingJobWithStatus<'a>>,
    rows_scheduled: u64,
    num_rows: u64,
}

impl<'a> RepDefStructSchedulingJob<'a> {
    fn new(
        scheduler: &'a StructuralStructScheduler,
        children: Vec<Box<dyn StructuralSchedulingJob + 'a>>,
        num_rows: u64,
    ) -> Self {
        let children = children
            .into_iter()
            .enumerate()
            .map(|(idx, job)| StructuralSchedulingJobWithStatus {
                col_idx: idx as u32,
                col_name: scheduler.child_fields[idx].name(),
                job,
                rows_scheduled: 0,
                rows_remaining: num_rows,
                ready_scan_lines: VecDeque::new(),
            })
            .collect::<BinaryHeap<_>>();
        Self {
            children,
            rows_scheduled: 0,
            num_rows,
        }
    }
}

impl StructuralSchedulingJob for RepDefStructSchedulingJob<'_> {
    fn schedule_next(
        &mut self,
        mut context: &mut SchedulerContext,
    ) -> Result<Vec<ScheduledScanLine>> {
        if self.children.is_empty() {
            // A struct with no children is scheduled in a single pass
            if self.rows_scheduled == self.num_rows {
                return Ok(Vec::new());
            }
            self.rows_scheduled = self.num_rows;
            return Ok(vec![ScheduledScanLine {
                decoders: Vec::new(),
                rows_scheduled: self.num_rows,
            }]);
        }

        let mut decoders = Vec::new();
        let old_rows_scheduled = self.rows_scheduled;
        // Keep scheduling child data until the overall (minimum) progress advances
        while old_rows_scheduled == self.rows_scheduled {
            if self.children.is_empty() {
                return Ok(Vec::new());
            }
            let mut next_child = self.children.pop().unwrap();
            if next_child.ready_scan_lines.is_empty() {
                let scoped = context.push(next_child.col_name, next_child.col_idx);
                let child_scans = next_child.job.schedule_next(scoped.context)?;
                context = scoped.pop();
                if child_scans.is_empty() {
                    continue;
                }
                next_child.ready_scan_lines.extend(child_scans);
            }
            let child_scan = next_child.ready_scan_lines.pop_front().unwrap();
            trace!(
                "Scheduled {} rows for child {}",
                child_scan.rows_scheduled,
                next_child.col_idx
            );
            next_child.rows_scheduled += child_scan.rows_scheduled;
            next_child.rows_remaining -= child_scan.rows_scheduled;
            decoders.extend(child_scan.decoders);
            self.children.push(next_child);
            // The struct's progress is the progress of its least-scheduled child
            self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
        }
        let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
        Ok(vec![ScheduledScanLine {
            decoders,
            rows_scheduled: struct_rows_scheduled,
        }])
    }
}

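/// Scheduler for a struct column.
///
/// Delegates range scheduling to the child field schedulers and wraps the resulting
/// child jobs in a single `RepDefStructSchedulingJob`.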
#[derive(Debug)]
pub struct StructuralStructScheduler {
    children: Vec<Box<dyn StructuralFieldScheduler>>,
    child_fields: Fields,
}

impl StructuralStructScheduler {
    pub fn new(children: Vec<Box<dyn StructuralFieldScheduler>>, child_fields: Fields) -> Self {
        Self {
            children,
            child_fields,
        }
    }
}

impl StructuralFieldScheduler for StructuralStructScheduler {
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();

        let child_schedulers = self
            .children
            .iter()
            .map(|child| child.schedule_ranges(ranges, filter))
            .collect::<Result<Vec<_>>>()?;

        Ok(Box::new(RepDefStructSchedulingJob::new(
            self,
            child_schedulers,
            num_rows,
        )))
    }

    fn initialize<'a>(
        &'a mut self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        let children_initialization = self
            .children
            .iter_mut()
            .map(|child| child.initialize(filter, context))
            .collect::<FuturesUnordered<_>>();
        async move {
            children_initialization
                .map(|res| res.map(|_| ()))
                .try_collect::<Vec<_>>()
                .await?;
            Ok(())
        }
        .boxed()
    }
}

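/// Decoder for a struct column.
///
/// Routes each loaded page to the appropriate child decoder (based on the page's path)
/// and, when drained, reassembles the child arrays into a `StructArray`.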
#[derive(Debug)]
pub struct StructuralStructDecoder {
    children: Vec<Box<dyn StructuralFieldDecoder>>,
    data_type: DataType,
    child_fields: Fields,
    is_root: bool,
}

impl StructuralStructDecoder {
    pub fn new(fields: Fields, should_validate: bool, is_root: bool) -> Result<Self> {
        let children = fields
            .iter()
            .map(|field| Self::field_to_decoder(field, should_validate))
            .collect::<Result<Vec<_>>>()?;
        let data_type = DataType::Struct(fields.clone());
        Ok(Self {
            data_type,
            children,
            child_fields: fields,
            is_root,
        })
    }

    fn field_to_decoder(
        field: &Arc<arrow_schema::Field>,
        should_validate: bool,
    ) -> Result<Box<dyn StructuralFieldDecoder>> {
        match field.data_type() {
            DataType::Struct(fields) => {
                if field.is_packed_struct() || field.is_blob() {
                    let decoder =
                        StructuralPrimitiveFieldDecoder::new(&field.clone(), should_validate);
                    Ok(Box::new(decoder))
                } else {
                    Ok(Box::new(Self::new(fields.clone(), should_validate, false)?))
                }
            }
            DataType::List(child_field) | DataType::LargeList(child_field) => {
                let child_decoder = Self::field_to_decoder(child_field, should_validate)?;
                Ok(Box::new(StructuralListDecoder::new(
                    child_decoder,
                    field.data_type().clone(),
                )))
            }
            DataType::FixedSizeList(child_field, _)
                if matches!(child_field.data_type(), DataType::Struct(_)) =>
            {
                let child_decoder = Self::field_to_decoder(child_field, should_validate)?;
                Ok(Box::new(StructuralFixedSizeListDecoder::new(
                    child_decoder,
                    field.data_type().clone(),
                )))
            }
            DataType::Map(entries_field, keys_sorted) => {
                if *keys_sorted {
                    return Err(Error::NotSupported {
                        source: "Map data type with keys_sorted=true is not supported yet"
                            .to_string()
                            .into(),
                        location: location!(),
                    });
                }
                let child_decoder = Self::field_to_decoder(entries_field, should_validate)?;
                Ok(Box::new(StructuralMapDecoder::new(
                    child_decoder,
                    field.data_type().clone(),
                )))
            }
            DataType::RunEndEncoded(_, _) => todo!(),
            DataType::ListView(_) | DataType::LargeListView(_) => todo!(),
            DataType::Union(_, _) => todo!(),
            _ => Ok(Box::new(StructuralPrimitiveFieldDecoder::new(
                field,
                should_validate,
            ))),
        }
    }

    pub fn drain_batch_task(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        let array_drain = self.drain(num_rows)?;
        Ok(NextDecodeTask {
            num_rows,
            task: Box::new(array_drain),
        })
    }
}

impl StructuralFieldDecoder for StructuralStructDecoder {
    fn accept_page(&mut self, mut child: LoadedPageShard) -> Result<()> {
        let child_idx = child.path.pop_front().unwrap();
        self.children[child_idx as usize].accept_page(child)?;
        Ok(())
    }

    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
        let child_tasks = self
            .children
            .iter_mut()
            .map(|child| child.drain(num_rows))
            .collect::<Result<Vec<_>>>()?;
        Ok(Box::new(RepDefStructDecodeTask {
            children: child_tasks,
            child_fields: self.child_fields.clone(),
            is_root: self.is_root,
            num_rows,
        }))
    }

    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}

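/// Decode task that stitches the decoded child arrays back into a struct array,
/// unraveling the struct's validity from the repetition/definition levels (the root
/// struct is always non-nullable, so its validity is skipped).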
#[derive(Debug)]
struct RepDefStructDecodeTask {
    children: Vec<Box<dyn StructuralDecodeArrayTask>>,
    child_fields: Fields,
    is_root: bool,
    num_rows: u64,
}

impl StructuralDecodeArrayTask for RepDefStructDecodeTask {
    fn decode(self: Box<Self>) -> Result<DecodedArray> {
        if self.children.is_empty() {
            return Ok(DecodedArray {
                array: Arc::new(StructArray::new_empty_fields(self.num_rows as usize, None)),
                repdef: CompositeRepDefUnraveler::new(vec![]),
            });
        }

        let arrays = self
            .children
            .into_iter()
            .map(|task| task.decode())
            .collect::<Result<Vec<_>>>()?;
        let mut children = Vec::with_capacity(arrays.len());
        let mut arrays_iter = arrays.into_iter();
        let first_array = arrays_iter.next().unwrap();
        let length = first_array.array.len();

        let mut repdef = first_array.repdef;
        children.push(first_array.array);

        for array in arrays_iter {
            debug_assert_eq!(length, array.array.len());
            children.push(array.array);
        }

        let validity = if self.is_root {
            None
        } else {
            repdef.unravel_validity(length)
        };

        let array = StructArray::new(self.child_fields, children, validity);
        Ok(DecodedArray {
            array: Arc::new(array),
            repdef,
        })
    }
}

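/// Encoder for struct columns in the structural encoding path.
///
/// Records the struct's validity in the `RepDefBuilder`, pushes nulls down into the
/// children, and forwards each child column to its own child encoder.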
pub struct StructStructuralEncoder {
    keep_original_array: bool,
    children: Vec<Box<dyn FieldEncoder>>,
}

impl StructStructuralEncoder {
    pub fn new(keep_original_array: bool, children: Vec<Box<dyn FieldEncoder>>) -> Self {
        Self {
            keep_original_array,
            children,
        }
    }
}

impl FieldEncoder for StructStructuralEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        mut repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        let struct_array = array.as_struct();
        let mut struct_array = struct_array.normalize_slicing()?;
        if let Some(validity) = struct_array.nulls() {
            if self.keep_original_array {
                repdef.add_validity_bitmap(validity.clone())
            } else {
                repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap())
            }
            struct_array = struct_array.pushdown_nulls()?;
        } else {
            repdef.add_no_null(struct_array.len());
        }
        let child_tasks = self
            .children
            .iter_mut()
            .zip(struct_array.columns().iter())
            .map(|(encoder, arr)| {
                encoder.maybe_encode(
                    arr.clone(),
                    external_buffers,
                    repdef.clone(),
                    row_number,
                    num_rows,
                )
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        self.children
            .iter_mut()
            .map(|encoder| encoder.flush(external_buffers))
            .flatten_ok()
            .collect::<Result<Vec<_>>>()
    }

    fn num_columns(&self) -> u32 {
        self.children
            .iter()
            .map(|child| child.num_columns())
            .sum::<u32>()
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        let mut child_columns = self
            .children
            .iter_mut()
            .map(|child| child.finish(external_buffers))
            .collect::<FuturesOrdered<_>>();
        async move {
            let mut encoded_columns = Vec::with_capacity(child_columns.len());
            while let Some(child_cols) = child_columns.next().await {
                encoded_columns.extend(child_cols?);
            }
            Ok(encoded_columns)
        }
        .boxed()
    }
}

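/// Encoder for struct columns in the legacy page layout; the header page it produces is
/// described with `PageEncoding::Legacy`.
///
/// Emits one header column encoded as `pb::SimpleStruct` followed by the columns
/// produced by the child encoders, which is why `num_columns` is the children's sum
/// plus one.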
pub struct StructFieldEncoder {
    children: Vec<Box<dyn FieldEncoder>>,
    column_index: u32,
    num_rows_seen: u64,
}

impl StructFieldEncoder {
    #[allow(dead_code)]
    pub fn new(children: Vec<Box<dyn FieldEncoder>>, column_index: u32) -> Self {
        Self {
            children,
            column_index,
            num_rows_seen: 0,
        }
    }
}

impl FieldEncoder for StructFieldEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        self.num_rows_seen += array.len() as u64;
        let struct_array = array.as_struct();
        let child_tasks = self
            .children
            .iter_mut()
            .zip(struct_array.columns().iter())
            .map(|(encoder, arr)| {
                encoder.maybe_encode(
                    arr.clone(),
                    external_buffers,
                    repdef.clone(),
                    row_number,
                    num_rows,
                )
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        let child_tasks = self
            .children
            .iter_mut()
            .map(|encoder| encoder.flush(external_buffers))
            .collect::<Result<Vec<_>>>()?;
        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
    }

    fn num_columns(&self) -> u32 {
        self.children
            .iter()
            .map(|child| child.num_columns())
            .sum::<u32>()
            + 1
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        let mut child_columns = self
            .children
            .iter_mut()
            .map(|child| child.finish(external_buffers))
            .collect::<FuturesOrdered<_>>();
        let num_rows_seen = self.num_rows_seen;
        let column_index = self.column_index;
        async move {
            let mut columns = Vec::new();
            let mut header = EncodedColumn::default();
            header.final_pages.push(EncodedPage {
                data: Vec::new(),
                description: PageEncoding::Legacy(pb::ArrayEncoding {
                    array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
                        pb::SimpleStruct {},
                    )),
                }),
                num_rows: num_rows_seen,
                column_idx: column_index,
                row_number: 0,
            });
            columns.push(header);
            while let Some(child_cols) = child_columns.next().await {
                columns.extend(child_cols?);
            }
            Ok(columns)
        }
        .boxed()
    }
}

#[cfg(test)]
mod tests {

    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{
        builder::{Int32Builder, ListBuilder},
        Array, ArrayRef, Int32Array, ListArray, StructArray,
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
    use arrow_schema::{DataType, Field, Fields};

    use crate::{
        testing::{check_basic_random, check_round_trip_encoding_of_data, TestCases},
        version::LanceFileVersion,
    };

    #[test_log::test(tokio::test)]
    async fn test_simple_struct() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]));
        let field = Field::new("", data_type, false);
        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_nullable_struct() {
        let inner_fields = Fields::from(vec![
            Field::new("x", DataType::Int32, false),
            Field::new("y", DataType::Int32, true),
        ]);
        let inner_struct = DataType::Struct(inner_fields.clone());
        let outer_fields = Fields::from(vec![
            Field::new("score", DataType::Int32, true),
            Field::new("location", inner_struct, true),
        ]);

        let x_vals = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
        let y_vals = Int32Array::from(vec![Some(6), None, Some(8), Some(9), Some(10)]);
        let scores = Int32Array::from(vec![None, Some(12), Some(13), Some(14), Some(15)]);

        let location_validity = NullBuffer::from(vec![true, true, true, false, true]);
        let locations = StructArray::new(
            inner_fields,
            vec![Arc::new(x_vals), Arc::new(y_vals)],
            Some(location_validity),
        );

        let rows_validity = NullBuffer::from(vec![true, true, true, true, false]);
        let rows = StructArray::new(
            outer_fields,
            vec![Arc::new(scores), Arc::new(locations)],
            Some(rows_validity),
        );

        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![Arc::new(rows)], &test_cases, HashMap::new()).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_masked_nonempty_list() {
        // The last struct row is null even though its child list is non-empty
        let items = Int32Array::from(vec![Some(1), Some(2), None, Some(4), Some(5), Some(6)]);
        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 2, 3, 4, 4, 4, 5]));
        let list_validity = BooleanBuffer::from(vec![true, true, true, true, false, true]);
        let list_array = ListArray::new(
            Arc::new(Field::new("item", DataType::Int32, true)),
            offsets,
            Arc::new(items),
            Some(NullBuffer::new(list_validity)),
        );
        let struct_validity = BooleanBuffer::from(vec![true, true, true, true, true, false]);
        let struct_array = StructArray::new(
            Fields::from(vec![Field::new(
                "inner_list",
                list_array.data_type().clone(),
                true,
            )]),
            vec![Arc::new(list_array)],
            Some(NullBuffer::new(struct_validity)),
        );
        check_round_trip_encoding_of_data(
            vec![Arc::new(struct_array)],
            &TestCases::default().with_min_file_version(LanceFileVersion::V2_1),
            HashMap::new(),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_struct_list() {
        let items = Int32Array::from(vec![Some(1), Some(2), None, Some(4)]);
        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 2, 3, 4, 4, 4, 4]));
        let list_validity = BooleanBuffer::from(vec![true, true, true, true, false, true]);
        let list_array = ListArray::new(
            Arc::new(Field::new("item", DataType::Int32, true)),
            offsets,
            Arc::new(items),
            Some(NullBuffer::new(list_validity)),
        );
        let struct_validity = BooleanBuffer::from(vec![true, true, true, true, true, false]);
        let struct_array = StructArray::new(
            Fields::from(vec![Field::new(
                "inner_list",
                list_array.data_type().clone(),
                true,
            )]),
            vec![Arc::new(list_array)],
            Some(NullBuffer::new(struct_validity)),
        );
        check_round_trip_encoding_of_data(
            vec![Arc::new(struct_array)],
            &TestCases::default().with_min_file_version(LanceFileVersion::V2_1),
            HashMap::new(),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_struct_list() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new(
                "inner_list",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                true,
            ),
            Field::new("outer_int", DataType::Int32, true),
        ]));
        let field = Field::new("row", data_type, false);
        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_empty_struct() {
        let data_type = DataType::Struct(Fields::from(Vec::<Field>::default()));
        let field = Field::new("row", data_type, false);
        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_complicated_struct() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("int", DataType::Int32, true),
            Field::new(
                "inner",
                DataType::Struct(Fields::from(vec![
                    Field::new("inner_int", DataType::Int32, true),
                    Field::new(
                        "inner_list",
                        DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                        true,
                    ),
                ])),
                true,
            ),
            Field::new("outer_binary", DataType::Binary, true),
        ]));
        let field = Field::new("row", data_type, false);
        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_ragged_scheduling() {
        // Slice a 10k-row struct into uneven chunks so write boundaries don't line up
        let items_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        for _ in 0..10000 {
            list_builder.append_null();
        }
        let list_array = Arc::new(list_builder.finish());
        let int_array = Arc::new(Int32Array::from_iter_values(0..10000));
        let fields = vec![
            Field::new("", list_array.data_type().clone(), true),
            Field::new("", int_array.data_type().clone(), true),
        ];
        let struct_array = Arc::new(StructArray::new(
            Fields::from(fields),
            vec![list_array, int_array],
            None,
        )) as ArrayRef;
        let struct_arrays = (0..10000)
            .step_by(437)
            .map(|offset| struct_array.slice(offset, 437.min(10000 - offset)))
            .collect::<Vec<_>>();
        check_round_trip_encoding_of_data(struct_arrays, &TestCases::default(), HashMap::new())
            .await;
    }
}
805}