1use std::{
5 collections::{BinaryHeap, VecDeque},
6 ops::Range,
7 sync::Arc,
8};
9
10use super::{
11 fixed_size_list::StructuralFixedSizeListDecoder, list::StructuralListDecoder,
12 map::StructuralMapDecoder, primitive::StructuralPrimitiveFieldDecoder,
13};
14use crate::{
15 decoder::{
16 DecodedArray, FilterExpression, LoadedPageShard, NextDecodeTask, PageEncoding,
17 ScheduledScanLine, SchedulerContext, StructuralDecodeArrayTask, StructuralFieldDecoder,
18 StructuralFieldScheduler, StructuralSchedulingJob,
19 },
20 encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
21 format::pb,
22 repdef::{CompositeRepDefUnraveler, RepDefBuilder},
23};
24use arrow_array::{Array, ArrayRef, StructArray, cast::AsArray};
25use arrow_schema::{DataType, Fields};
26use futures::{
27 FutureExt, StreamExt, TryStreamExt,
28 future::BoxFuture,
29 stream::{FuturesOrdered, FuturesUnordered},
30};
31use itertools::Itertools;
32use lance_arrow::FieldExt;
33use lance_arrow::{deepcopy::deep_copy_nulls, r#struct::StructArrayExt};
34use lance_core::{Error, Result};
35use log::trace;
36
/// A child scheduling job paired with progress bookkeeping so the parent
/// struct scheduler can keep all of its children advancing in lockstep.
#[derive(Debug)]
struct StructuralSchedulingJobWithStatus<'a> {
    /// Index of the column within the struct (also used for equality/ordering)
    col_idx: u32,
    /// Name of the column, used when pushing a scope onto the scheduler context
    col_name: &'a str,
    /// The wrapped child job
    job: Box<dyn StructuralSchedulingJob + 'a>,
    /// Rows this child has scheduled so far
    rows_scheduled: u64,
    /// Rows this child still has to schedule
    rows_remaining: u64,
    /// Scan lines produced by the child but not yet consumed by the parent
    ready_scan_lines: VecDeque<ScheduledScanLine>,
}
46
47impl PartialEq for StructuralSchedulingJobWithStatus<'_> {
48 fn eq(&self, other: &Self) -> bool {
49 self.col_idx == other.col_idx
50 }
51}
52
// Equality is fully determined by `col_idx` (see `PartialEq`), so the
// reflexivity requirement of `Eq` holds.
impl Eq for StructuralSchedulingJobWithStatus<'_> {}
54
impl PartialOrd for StructuralSchedulingJobWithStatus<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // Delegate to the total order defined by `Ord`.
        Some(self.cmp(other))
    }
}
60
impl Ord for StructuralSchedulingJobWithStatus<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Intentionally inverted: `BinaryHeap` is a max-heap, so reversing
        // the comparison makes the child with the FEWEST rows scheduled pop
        // first (i.e. the heap behaves as a min-heap on `rows_scheduled`).
        other.rows_scheduled.cmp(&self.rows_scheduled)
    }
}
67
/// Scheduling job for a struct column: drives one child job per field,
/// always advancing the least-progressed child so columns stay in lockstep.
#[derive(Debug)]
struct RepDefStructSchedulingJob<'a> {
    /// Child jobs ordered so the least-scheduled child pops first (see the
    /// `Ord` impl on `StructuralSchedulingJobWithStatus`)
    children: BinaryHeap<StructuralSchedulingJobWithStatus<'a>>,
    /// Rows scheduled for the struct as a whole (the minimum across children)
    rows_scheduled: u64,
    /// Total number of rows this job must schedule
    num_rows: u64,
}
81
82impl<'a> RepDefStructSchedulingJob<'a> {
83 fn new(
84 scheduler: &'a StructuralStructScheduler,
85 children: Vec<Box<dyn StructuralSchedulingJob + 'a>>,
86 num_rows: u64,
87 ) -> Self {
88 let children = children
89 .into_iter()
90 .enumerate()
91 .map(|(idx, job)| StructuralSchedulingJobWithStatus {
92 col_idx: idx as u32,
93 col_name: scheduler.child_fields[idx].name(),
94 job,
95 rows_scheduled: 0,
96 rows_remaining: num_rows,
97 ready_scan_lines: VecDeque::new(),
98 })
99 .collect::<BinaryHeap<_>>();
100 Self {
101 children,
102 rows_scheduled: 0,
103 num_rows,
104 }
105 }
106}
107
impl StructuralSchedulingJob for RepDefStructSchedulingJob<'_> {
    fn schedule_next(
        &mut self,
        mut context: &mut SchedulerContext,
    ) -> Result<Vec<ScheduledScanLine>> {
        // A zero-column struct has no children to drive; the first call
        // reports all rows scheduled in one decoder-less scan line and every
        // later call reports completion.
        if self.children.is_empty() {
            if self.rows_scheduled == self.num_rows {
                return Ok(Vec::new());
            }
            self.rows_scheduled = self.num_rows;
            return Ok(vec![ScheduledScanLine {
                decoders: Vec::new(),
                rows_scheduled: self.num_rows,
            }]);
        }

        let mut decoders = Vec::new();
        let old_rows_scheduled = self.rows_scheduled;
        // Keep pulling work from the least-advanced child (heap order) until
        // the struct-level progress (the minimum across children) moves.
        while old_rows_scheduled == self.rows_scheduled {
            if self.children.is_empty() {
                return Ok(Vec::new());
            }
            let mut next_child = self.children.pop().unwrap();
            if next_child.ready_scan_lines.is_empty() {
                // No buffered scan lines; ask the child to schedule more,
                // within a context scoped to its column.
                let scoped = context.push(next_child.col_name, next_child.col_idx);
                let child_scans = next_child.job.schedule_next(scoped.context)?;
                context = scoped.pop();
                if child_scans.is_empty() {
                    // Child produced nothing; it is not pushed back onto the
                    // heap and thus drops out of future scheduling.
                    continue;
                }
                next_child.ready_scan_lines.extend(child_scans);
            }
            let child_scan = next_child.ready_scan_lines.pop_front().unwrap();
            trace!(
                "Scheduled {} rows for child {}",
                child_scan.rows_scheduled, next_child.col_idx
            );
            next_child.rows_scheduled += child_scan.rows_scheduled;
            next_child.rows_remaining -= child_scan.rows_scheduled;
            decoders.extend(child_scan.decoders);
            self.children.push(next_child);
            // The struct's progress is bounded by its least-advanced child,
            // which is whatever now sits at the top of the heap.
            self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
        }
        let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
        Ok(vec![ScheduledScanLine {
            decoders,
            rows_scheduled: struct_rows_scheduled,
        }])
    }
}
163
/// Scheduler for a struct column: holds one child scheduler per field and
/// combines their jobs into a single struct-level scheduling job.
#[derive(Debug)]
pub struct StructuralStructScheduler {
    children: Vec<Box<dyn StructuralFieldScheduler>>,
    /// Arrow fields matching `children`, used for context scope names
    child_fields: Fields,
}
179
180impl StructuralStructScheduler {
181 pub fn new(children: Vec<Box<dyn StructuralFieldScheduler>>, child_fields: Fields) -> Self {
182 Self {
183 children,
184 child_fields,
185 }
186 }
187}
188
impl StructuralFieldScheduler for StructuralStructScheduler {
    /// Schedules the given row ranges by delegating to every child scheduler
    /// and combining the resulting jobs into one struct-level job.
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
        // Total rows requested across all (half-open) ranges
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();

        let child_schedulers = self
            .children
            .iter()
            .map(|child| child.schedule_ranges(ranges, filter))
            .collect::<Result<Vec<_>>>()?;

        Ok(Box::new(RepDefStructSchedulingJob::new(
            self,
            child_schedulers,
            num_rows,
        )))
    }

    /// Initializes all child schedulers concurrently (completion order does
    /// not matter), failing on the first error.
    fn initialize<'a>(
        &'a mut self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        let children_initialization = self
            .children
            .iter_mut()
            .map(|child| child.initialize(filter, context))
            .collect::<FuturesUnordered<_>>();
        async move {
            children_initialization
                .map(|res| res.map(|_| ()))
                .try_collect::<Vec<_>>()
                .await?;
            Ok(())
        }
        .boxed()
    }
}
230
/// Decoder for struct columns: routes incoming pages to the correct child
/// decoder and reassembles child arrays into a `StructArray` on drain.
#[derive(Debug)]
pub struct StructuralStructDecoder {
    children: Vec<Box<dyn StructuralFieldDecoder>>,
    /// Always `DataType::Struct(child_fields)`
    data_type: DataType,
    child_fields: Fields,
    /// True for the implicit top-level struct; the root skips unraveling its
    /// own validity when decoding (see `RepDefStructDecodeTask::decode`)
    is_root: bool,
}
239
240impl StructuralStructDecoder {
241 pub fn new(fields: Fields, should_validate: bool, is_root: bool) -> Result<Self> {
242 let children = fields
243 .iter()
244 .map(|field| Self::field_to_decoder(field, should_validate))
245 .collect::<Result<Vec<_>>>()?;
246 let data_type = DataType::Struct(fields.clone());
247 Ok(Self {
248 data_type,
249 children,
250 child_fields: fields,
251 is_root,
252 })
253 }
254
255 fn field_to_decoder(
256 field: &Arc<arrow_schema::Field>,
257 should_validate: bool,
258 ) -> Result<Box<dyn StructuralFieldDecoder>> {
259 match field.data_type() {
260 DataType::Struct(fields) => {
261 if field.is_packed_struct() || field.is_blob() {
262 let decoder =
263 StructuralPrimitiveFieldDecoder::new(&field.clone(), should_validate);
264 Ok(Box::new(decoder))
265 } else {
266 Ok(Box::new(Self::new(fields.clone(), should_validate, false)?))
267 }
268 }
269 DataType::List(child_field) | DataType::LargeList(child_field) => {
270 let child_decoder = Self::field_to_decoder(child_field, should_validate)?;
271 Ok(Box::new(StructuralListDecoder::new(
272 child_decoder,
273 field.data_type().clone(),
274 )))
275 }
276 DataType::FixedSizeList(child_field, _)
277 if matches!(child_field.data_type(), DataType::Struct(_)) =>
278 {
279 let child_decoder = Self::field_to_decoder(child_field, should_validate)?;
281 Ok(Box::new(StructuralFixedSizeListDecoder::new(
282 child_decoder,
283 field.data_type().clone(),
284 )))
285 }
286 DataType::Map(entries_field, keys_sorted) => {
287 if *keys_sorted {
288 return Err(Error::not_supported_source(
289 "Map data type with keys_sorted=true is not supported yet"
290 .to_string()
291 .into(),
292 ));
293 }
294 let child_decoder = Self::field_to_decoder(entries_field, should_validate)?;
295 Ok(Box::new(StructuralMapDecoder::new(
296 child_decoder,
297 field.data_type().clone(),
298 )))
299 }
300 DataType::RunEndEncoded(_, _) => todo!(),
301 DataType::ListView(_) | DataType::LargeListView(_) => todo!(),
302 DataType::Union(_, _) => todo!(),
303 _ => Ok(Box::new(StructuralPrimitiveFieldDecoder::new(
304 field,
305 should_validate,
306 ))),
307 }
308 }
309
310 pub fn drain_batch_task(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
311 let array_drain = self.drain(num_rows)?;
312 Ok(NextDecodeTask {
313 num_rows,
314 task: Box::new(array_drain),
315 })
316 }
317}
318
319impl StructuralFieldDecoder for StructuralStructDecoder {
320 fn accept_page(&mut self, mut child: LoadedPageShard) -> Result<()> {
321 let child_idx = child.path.pop_front().unwrap();
323 self.children[child_idx as usize].accept_page(child)?;
325 Ok(())
326 }
327
328 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
329 let child_tasks = self
330 .children
331 .iter_mut()
332 .map(|child| child.drain(num_rows))
333 .collect::<Result<Vec<_>>>()?;
334 Ok(Box::new(RepDefStructDecodeTask {
335 children: child_tasks,
336 child_fields: self.child_fields.clone(),
337 is_root: self.is_root,
338 num_rows,
339 }))
340 }
341
342 fn data_type(&self) -> &DataType {
343 &self.data_type
344 }
345}
346
/// Decode task that decodes every child task and stitches the results into
/// a single `StructArray`.
#[derive(Debug)]
struct RepDefStructDecodeTask {
    children: Vec<Box<dyn StructuralDecodeArrayTask>>,
    child_fields: Fields,
    /// Root structs do not apply their own validity (see `decode`)
    is_root: bool,
    /// Used to size the result when there are no children
    num_rows: u64,
}
354
impl StructuralDecodeArrayTask for RepDefStructDecodeTask {
    fn decode(self: Box<Self>) -> Result<DecodedArray> {
        // Zero-column struct: synthesize an empty-fields array of the right
        // length with no repdef information.
        if self.children.is_empty() {
            return Ok(DecodedArray {
                array: Arc::new(StructArray::new_empty_fields(self.num_rows as usize, None)),
                repdef: CompositeRepDefUnraveler::new(vec![]),
            });
        }

        let arrays = self
            .children
            .into_iter()
            .map(|task| task.decode())
            .collect::<Result<Vec<_>>>()?;
        let mut children = Vec::with_capacity(arrays.len());
        let mut arrays_iter = arrays.into_iter();
        let first_array = arrays_iter.next().unwrap();
        let length = first_array.array.len();

        // The first child's unraveler is used for the struct's own
        // repetition/definition info; all children must agree on length
        // (checked only in debug builds below).
        let mut repdef = first_array.repdef;
        children.push(first_array.array);

        for array in arrays_iter {
            debug_assert_eq!(length, array.array.len());
            children.push(array.array);
        }

        // The root struct never applies its own validity bitmap; non-root
        // structs unravel one level of validity from the repdef info.
        let validity = if self.is_root {
            None
        } else {
            repdef.unravel_validity(length)
        };

        let array = StructArray::try_new(self.child_fields, children, validity)
            .map_err(|e| Error::invalid_input_source(e.to_string().into()))?;
        Ok(DecodedArray {
            array: Arc::new(array),
            repdef,
        })
    }
}
397
/// Structural encoder for struct columns: records struct validity in the
/// repdef builder and fans the child arrays out to per-field child encoders.
pub struct StructStructuralEncoder {
    /// If true, validity buffers may be held by reference; otherwise they
    /// are deep-copied before being stored (see `maybe_encode`)
    keep_original_array: bool,
    children: Vec<Box<dyn FieldEncoder>>,
}
406
407impl StructStructuralEncoder {
408 pub fn new(keep_original_array: bool, children: Vec<Box<dyn FieldEncoder>>) -> Self {
409 Self {
410 keep_original_array,
411 children,
412 }
413 }
414}
415
impl FieldEncoder for StructStructuralEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        mut repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        let struct_array = array.as_struct();
        // Normalize away any slice offset so the children line up with the
        // struct's validity
        let mut struct_array = struct_array.normalize_slicing()?;
        if let Some(validity) = struct_array.nulls() {
            if self.keep_original_array {
                repdef.add_validity_bitmap(validity.clone())
            } else {
                // Deep-copy the null bitmap so we do not hold on to the
                // caller's buffers
                repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap())
            }
            // Push the struct's nulls down into the children so each child
            // column carries the combined validity
            struct_array = struct_array.pushdown_nulls()?;
        } else {
            repdef.add_no_null(struct_array.len());
        }
        // Fan out to the child encoders, each receiving its own clone of the
        // repdef built so far
        let child_tasks = self
            .children
            .iter_mut()
            .zip(struct_array.columns().iter())
            .map(|(encoder, arr)| {
                encoder.maybe_encode(
                    arr.clone(),
                    external_buffers,
                    repdef.clone(),
                    row_number,
                    num_rows,
                )
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        // Flush every child and flatten their task lists into one
        self.children
            .iter_mut()
            .map(|encoder| encoder.flush(external_buffers))
            .flatten_ok()
            .collect::<Result<Vec<_>>>()
    }

    fn num_columns(&self) -> u32 {
        // A structural struct adds no column of its own; it is purely the
        // sum of its children
        self.children
            .iter()
            .map(|child| child.num_columns())
            .sum::<u32>()
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        // Finish the children concurrently but collect their columns in
        // declaration order (FuturesOrdered)
        let mut child_columns = self
            .children
            .iter_mut()
            .map(|child| child.finish(external_buffers))
            .collect::<FuturesOrdered<_>>();
        async move {
            let mut encoded_columns = Vec::with_capacity(child_columns.len());
            while let Some(child_cols) = child_columns.next().await {
                encoded_columns.extend(child_cols?);
            }
            Ok(encoded_columns)
        }
        .boxed()
    }
}
488
/// Legacy struct field encoder: emits a header column describing the struct
/// followed by the child encoders' columns.
pub struct StructFieldEncoder {
    children: Vec<Box<dyn FieldEncoder>>,
    /// Column index assigned to the struct's header column
    column_index: u32,
    /// Total rows passed through `maybe_encode`, recorded in the header page
    num_rows_seen: u64,
}
494
495impl StructFieldEncoder {
496 #[allow(dead_code)]
497 pub fn new(children: Vec<Box<dyn FieldEncoder>>, column_index: u32) -> Self {
498 Self {
499 children,
500 column_index,
501 num_rows_seen: 0,
502 }
503 }
504}
505
impl FieldEncoder for StructFieldEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        // Track total rows so the header page can record them in `finish`
        self.num_rows_seen += array.len() as u64;
        let struct_array = array.as_struct();
        // Fan the child arrays out to the child encoders
        let child_tasks = self
            .children
            .iter_mut()
            .zip(struct_array.columns().iter())
            .map(|(encoder, arr)| {
                encoder.maybe_encode(
                    arr.clone(),
                    external_buffers,
                    repdef.clone(),
                    row_number,
                    num_rows,
                )
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        let child_tasks = self
            .children
            .iter_mut()
            .map(|encoder| encoder.flush(external_buffers))
            .collect::<Result<Vec<_>>>()?;
        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
    }

    fn num_columns(&self) -> u32 {
        // +1 for the struct's own header column (written in `finish`)
        self.children
            .iter()
            .map(|child| child.num_columns())
            .sum::<u32>()
            + 1
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        let mut child_columns = self
            .children
            .iter_mut()
            .map(|child| child.finish(external_buffers))
            .collect::<FuturesOrdered<_>>();
        // Copy the fields needed inside the future so it does not borrow self
        let num_rows_seen = self.num_rows_seen;
        let column_index = self.column_index;
        async move {
            let mut columns = Vec::new();
            // The struct's header column: a single legacy `SimpleStruct`
            // page with no data buffers, followed by the child columns in
            // declaration order
            let mut header = EncodedColumn::default();
            header.final_pages.push(EncodedPage {
                data: Vec::new(),
                description: PageEncoding::Legacy(pb::ArrayEncoding {
                    array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
                        pb::SimpleStruct {},
                    )),
                }),
                num_rows: num_rows_seen,
                column_idx: column_index,
                row_number: 0,
            });
            columns.push(header);
            while let Some(child_cols) = child_columns.next().await {
                columns.extend(child_cols?);
            }
            Ok(columns)
        }
        .boxed()
    }
}
587
#[cfg(test)]
mod tests {

    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{
        Array, ArrayRef, Int32Array, ListArray, StructArray,
        builder::{Int32Builder, ListBuilder},
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
    use arrow_schema::{DataType, Field, Fields};

    use crate::{
        testing::{TestCases, check_basic_random, check_round_trip_encoding_of_data},
        version::LanceFileVersion,
    };

    /// Round-trips a flat struct of two non-nullable int columns with random data.
    #[test_log::test(tokio::test)]
    async fn test_simple_struct() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]));
        let field = Field::new("", data_type, false);
        check_basic_random(field).await;
    }

    /// Nested nullable structs: validity at the value level, the inner-struct
    /// level, and the outer-struct level must all survive a round trip.
    #[test_log::test(tokio::test)]
    async fn test_nullable_struct() {
        let inner_fields = Fields::from(vec![
            Field::new("x", DataType::Int32, false),
            Field::new("y", DataType::Int32, true),
        ]);
        let inner_struct = DataType::Struct(inner_fields.clone());
        let outer_fields = Fields::from(vec![
            Field::new("score", DataType::Int32, true),
            Field::new("location", inner_struct, true),
        ]);

        let x_vals = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
        let y_vals = Int32Array::from(vec![Some(6), None, Some(8), Some(9), Some(10)]);
        let scores = Int32Array::from(vec![None, Some(12), Some(13), Some(14), Some(15)]);

        // Row 3's location is null even though its child values are present
        let location_validity = NullBuffer::from(vec![true, true, true, false, true]);
        let locations = StructArray::new(
            inner_fields,
            vec![Arc::new(x_vals), Arc::new(y_vals)],
            Some(location_validity),
        );

        // Row 4 of the outer struct is null
        let rows_validity = NullBuffer::from(vec![true, true, true, true, false]);
        let rows = StructArray::new(
            outer_fields,
            vec![Arc::new(scores), Arc::new(locations)],
            Some(rows_validity),
        );

        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![Arc::new(rows)], &test_cases, HashMap::new()).await;
    }

    /// A null struct row whose inner list slot is NON-empty (the final list
    /// slot covers offsets 4..5 while the final struct row is null).
    #[test_log::test(tokio::test)]
    async fn test_simple_masked_nonempty_list() {
        let items = Int32Array::from(vec![Some(1), Some(2), None, Some(4), Some(5), Some(6)]);
        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 2, 3, 4, 4, 4, 5]));
        let list_validity = BooleanBuffer::from(vec![true, true, true, true, false, true]);
        let list_array = ListArray::new(
            Arc::new(Field::new("item", DataType::Int32, true)),
            offsets,
            Arc::new(items),
            Some(NullBuffer::new(list_validity)),
        );
        let struct_validity = BooleanBuffer::from(vec![true, true, true, true, true, false]);
        let struct_array = StructArray::new(
            Fields::from(vec![Field::new(
                "inner_list",
                list_array.data_type().clone(),
                true,
            )]),
            vec![Arc::new(list_array)],
            Some(NullBuffer::new(struct_validity)),
        );
        check_round_trip_encoding_of_data(
            vec![Arc::new(struct_array)],
            &TestCases::default().with_min_file_version(LanceFileVersion::V2_1),
            HashMap::new(),
        )
        .await;
    }

    /// A struct wrapping a list where the null struct row coincides with an
    /// empty list slot (offsets end with repeated 4s).
    #[test_log::test(tokio::test)]
    async fn test_simple_struct_list() {
        let items = Int32Array::from(vec![Some(1), Some(2), None, Some(4)]);
        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 2, 3, 4, 4, 4, 4]));
        let list_validity = BooleanBuffer::from(vec![true, true, true, true, false, true]);
        let list_array = ListArray::new(
            Arc::new(Field::new("item", DataType::Int32, true)),
            offsets,
            Arc::new(items),
            Some(NullBuffer::new(list_validity)),
        );
        let struct_validity = BooleanBuffer::from(vec![true, true, true, true, true, false]);
        let struct_array = StructArray::new(
            Fields::from(vec![Field::new(
                "inner_list",
                list_array.data_type().clone(),
                true,
            )]),
            vec![Arc::new(list_array)],
            Some(NullBuffer::new(struct_validity)),
        );
        check_round_trip_encoding_of_data(
            vec![Arc::new(struct_array)],
            &TestCases::default().with_min_file_version(LanceFileVersion::V2_1),
            HashMap::new(),
        )
        .await;
    }

    /// Random round trip of a struct containing a list column and an int column.
    #[test_log::test(tokio::test)]
    async fn test_struct_list() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new(
                "inner_list",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                true,
            ),
            Field::new("outer_int", DataType::Int32, true),
        ]));
        let field = Field::new("row", data_type, false);
        check_basic_random(field).await;
    }

    /// A struct with zero fields still round-trips (exercises the empty
    /// children paths in scheduler and decoder).
    #[test_log::test(tokio::test)]
    async fn test_empty_struct() {
        let data_type = DataType::Struct(Fields::from(Vec::<Field>::default()));
        let field = Field::new("row", data_type, false);
        check_basic_random(field).await;
    }

    /// Deeply nested shape: struct containing another struct (with an int and
    /// a list) plus a binary column.
    #[test_log::test(tokio::test)]
    async fn test_complicated_struct() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("int", DataType::Int32, true),
            Field::new(
                "inner",
                DataType::Struct(Fields::from(vec![
                    Field::new("inner_int", DataType::Int32, true),
                    Field::new(
                        "inner_list",
                        DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                        true,
                    ),
                ])),
                true,
            ),
            Field::new("outer_binary", DataType::Binary, true),
        ]));
        let field = Field::new("row", data_type, false);
        check_basic_random(field).await;
    }

    /// A single list containing struct elements, some of which are null
    /// according to the struct validity buffer.
    #[test_log::test(tokio::test)]
    async fn test_list_of_struct_with_null_struct_element() {
        use arrow_array::StringArray;

        let tag_array = StringArray::from(vec![
            Some("valid"),
            Some("null_struct"),
            Some("valid"),
            Some("valid"),
        ]);
        let struct_fields = Fields::from(vec![Field::new("tag", DataType::Utf8, true)]);
        let struct_validity = NullBuffer::from(vec![false, true, false, false]);
        let struct_array = StructArray::new(
            struct_fields.clone(),
            vec![Arc::new(tag_array)],
            Some(struct_validity),
        );

        // One list row covering all four struct elements
        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 4]));
        let list_field = Field::new("item", DataType::Struct(struct_fields), true);
        let list_array =
            ListArray::new(Arc::new(list_field), offsets, Arc::new(struct_array), None);

        check_round_trip_encoding_of_data(
            vec![Arc::new(list_array)],
            &TestCases::default().with_min_file_version(LanceFileVersion::V2_2),
            HashMap::new(),
        )
        .await;
    }

    /// Slices the data into uneven 437-row batches so the struct's children
    /// schedule at different rates, exercising the lockstep scheduling loop.
    #[test_log::test(tokio::test)]
    async fn test_ragged_scheduling() {
        let items_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        for _ in 0..10000 {
            list_builder.append_null();
        }
        let list_array = Arc::new(list_builder.finish());
        let int_array = Arc::new(Int32Array::from_iter_values(0..10000));
        let fields = vec![
            Field::new("", list_array.data_type().clone(), true),
            Field::new("", int_array.data_type().clone(), true),
        ];
        let struct_array = Arc::new(StructArray::new(
            Fields::from(fields),
            vec![list_array, int_array],
            None,
        )) as ArrayRef;
        let struct_arrays = (0..10000)
            .step_by(437)
            .map(|offset| struct_array.slice(offset, 437.min(10000 - offset)))
            .collect::<Vec<_>>();
        check_round_trip_encoding_of_data(struct_arrays, &TestCases::default(), HashMap::new())
            .await;
    }
}