1use std::{
5 collections::{BinaryHeap, VecDeque},
6 ops::Range,
7 sync::Arc,
8};
9
10use super::{
11 fixed_size_list::StructuralFixedSizeListDecoder, list::StructuralListDecoder,
12 map::StructuralMapDecoder, primitive::StructuralPrimitiveFieldDecoder,
13};
14use crate::{
15 decoder::{
16 DecodedArray, FilterExpression, LoadedPageShard, NextDecodeTask, PageEncoding,
17 ScheduledScanLine, SchedulerContext, StructuralDecodeArrayTask, StructuralFieldDecoder,
18 StructuralFieldScheduler, StructuralSchedulingJob,
19 },
20 encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
21 format::pb,
22 repdef::{CompositeRepDefUnraveler, RepDefBuilder},
23};
24use arrow_array::{Array, ArrayRef, StructArray, cast::AsArray};
25use arrow_schema::{DataType, Fields};
26use futures::{
27 FutureExt, StreamExt, TryStreamExt,
28 future::BoxFuture,
29 stream::{FuturesOrdered, FuturesUnordered},
30};
31use itertools::Itertools;
32use lance_arrow::FieldExt;
33use lance_arrow::{deepcopy::deep_copy_nulls, r#struct::StructArrayExt};
34use lance_core::{Error, Result};
35use log::trace;
36
/// A child scheduling job plus the bookkeeping needed to drive all children
/// of a struct forward at roughly the same pace.
#[derive(Debug)]
struct StructuralSchedulingJobWithStatus<'a> {
    /// Index of the child column within the struct
    col_idx: u32,
    /// Name of the child column (used when pushing scheduler context paths)
    col_name: &'a str,
    /// The child's own scheduling job
    job: Box<dyn StructuralSchedulingJob + 'a>,
    /// Rows this child has scheduled so far
    rows_scheduled: u64,
    /// Rows this child still needs to schedule
    rows_remaining: u64,
    /// Scan lines already produced by the child but not yet consumed
    ready_scan_lines: VecDeque<ScheduledScanLine>,
}
46
// These impls exist so a `BinaryHeap<StructuralSchedulingJobWithStatus>` acts
// as a *min*-heap on `rows_scheduled`: popping always yields the child that
// is furthest behind.
//
// NOTE(review): `eq` compares `col_idx` while `cmp` compares `rows_scheduled`,
// so `a == b` does not imply `a.cmp(&b) == Equal`.  That is harmless for heap
// ordering (entries are only compared, never checked for equality) but would
// be surprising anywhere else — confirm this is intentional.
impl PartialEq for StructuralSchedulingJobWithStatus<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.col_idx == other.col_idx
    }
}

impl Eq for StructuralSchedulingJobWithStatus<'_> {}

impl PartialOrd for StructuralSchedulingJobWithStatus<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for StructuralSchedulingJobWithStatus<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Reversed operand order turns `BinaryHeap` (a max-heap) into a
        // min-heap on rows scheduled.
        other.rows_scheduled.cmp(&self.rows_scheduled)
    }
}
67
/// Scheduling job for a struct column: drives every child's job forward,
/// keeping them balanced via a min-heap keyed on rows scheduled.
#[derive(Debug)]
struct RepDefStructSchedulingJob<'a> {
    /// Child jobs; the heap top is always the least-scheduled child
    children: BinaryHeap<StructuralSchedulingJobWithStatus<'a>>,
    /// Rows fully scheduled for the struct (the minimum over all children)
    rows_scheduled: u64,
    /// Total rows this job must schedule
    num_rows: u64,
}
81
82impl<'a> RepDefStructSchedulingJob<'a> {
83 fn new(
84 scheduler: &'a StructuralStructScheduler,
85 children: Vec<Box<dyn StructuralSchedulingJob + 'a>>,
86 num_rows: u64,
87 ) -> Self {
88 let children = children
89 .into_iter()
90 .enumerate()
91 .map(|(idx, job)| StructuralSchedulingJobWithStatus {
92 col_idx: idx as u32,
93 col_name: scheduler.child_fields[idx].name(),
94 job,
95 rows_scheduled: 0,
96 rows_remaining: num_rows,
97 ready_scan_lines: VecDeque::new(),
98 })
99 .collect::<BinaryHeap<_>>();
100 Self {
101 children,
102 rows_scheduled: 0,
103 num_rows,
104 }
105 }
106}
107
impl StructuralSchedulingJob for RepDefStructSchedulingJob<'_> {
    /// Schedules the next batch of rows for the struct.
    ///
    /// Repeatedly pops the least-scheduled child from the heap, pulls its
    /// next scan line, and collects the resulting decoders, until the
    /// minimum number of rows scheduled across all children advances.
    fn schedule_next(
        &mut self,
        mut context: &mut SchedulerContext,
    ) -> Result<Vec<ScheduledScanLine>> {
        // An empty struct has no child pages; schedule everything in one
        // decoder-less scan line.
        if self.children.is_empty() {
            if self.rows_scheduled == self.num_rows {
                // Already finished
                return Ok(Vec::new());
            }
            self.rows_scheduled = self.num_rows;
            return Ok(vec![ScheduledScanLine {
                decoders: Vec::new(),
                rows_scheduled: self.num_rows,
            }]);
        }

        let mut decoders = Vec::new();
        let old_rows_scheduled = self.rows_scheduled;
        // Loop until the struct-wide minimum of scheduled rows moves forward
        while old_rows_scheduled == self.rows_scheduled {
            if self.children.is_empty() {
                // All child jobs were exhausted (dropped below)
                return Ok(Vec::new());
            }
            // Heap top is the least-scheduled child
            let mut next_child = self.children.pop().unwrap();
            if next_child.ready_scan_lines.is_empty() {
                let scoped = context.push(next_child.col_name, next_child.col_idx);
                let child_scans = next_child.job.schedule_next(scoped.context)?;
                context = scoped.pop();
                if child_scans.is_empty() {
                    // Child has nothing more to schedule; it is intentionally
                    // not pushed back and so leaves the heap for good.
                    continue;
                }
                next_child.ready_scan_lines.extend(child_scans);
            }
            let child_scan = next_child.ready_scan_lines.pop_front().unwrap();
            trace!(
                "Scheduled {} rows for child {}",
                child_scan.rows_scheduled, next_child.col_idx
            );
            next_child.rows_scheduled += child_scan.rows_scheduled;
            next_child.rows_remaining -= child_scan.rows_scheduled;
            decoders.extend(child_scan.decoders);
            self.children.push(next_child);
            // The new heap top's count is how many rows of the struct are
            // now fully scheduled (every child has reached at least that).
            self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
        }
        let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
        Ok(vec![ScheduledScanLine {
            decoders,
            rows_scheduled: struct_rows_scheduled,
        }])
    }
}
163
/// Scheduler for a struct column in the structural (2.1+) path.
///
/// The struct itself stores no pages of its own here; scheduling simply fans
/// out to the child field schedulers (struct validity is reconstructed from
/// the children's rep/def information at decode time).
#[derive(Debug)]
pub struct StructuralStructScheduler {
    children: Vec<Box<dyn StructuralFieldScheduler>>,
    child_fields: Fields,
}
179
180impl StructuralStructScheduler {
181 pub fn new(children: Vec<Box<dyn StructuralFieldScheduler>>, child_fields: Fields) -> Self {
182 Self {
183 children,
184 child_fields,
185 }
186 }
187}
188
189impl StructuralFieldScheduler for StructuralStructScheduler {
190 fn schedule_ranges<'a>(
191 &'a self,
192 ranges: &[Range<u64>],
193 filter: &FilterExpression,
194 ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
195 let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
196
197 let child_schedulers = self
198 .children
199 .iter()
200 .map(|child| child.schedule_ranges(ranges, filter))
201 .collect::<Result<Vec<_>>>()?;
202
203 Ok(Box::new(RepDefStructSchedulingJob::new(
204 self,
205 child_schedulers,
206 num_rows,
207 )))
208 }
209
210 fn initialize<'a>(
211 &'a mut self,
212 filter: &'a FilterExpression,
213 context: &'a SchedulerContext,
214 ) -> BoxFuture<'a, Result<()>> {
215 let children_initialization = self
216 .children
217 .iter_mut()
218 .map(|child| child.initialize(filter, context))
219 .collect::<FuturesUnordered<_>>();
220 async move {
221 children_initialization
222 .map(|res| res.map(|_| ()))
223 .try_collect::<Vec<_>>()
224 .await?;
225 Ok(())
226 }
227 .boxed()
228 }
229}
230
/// Decoder for a struct column: routes incoming pages to per-child decoders
/// and reassembles a `StructArray` when drained.
#[derive(Debug)]
pub struct StructuralStructDecoder {
    children: Vec<Box<dyn StructuralFieldDecoder>>,
    /// Always `DataType::Struct(child_fields)`
    data_type: DataType,
    child_fields: Fields,
    /// The file's top-level struct never carries its own validity
    is_root: bool,
}
239
240impl StructuralStructDecoder {
241 pub fn new(fields: Fields, should_validate: bool, is_root: bool) -> Result<Self> {
242 let children = fields
243 .iter()
244 .map(|field| Self::field_to_decoder(field, should_validate))
245 .collect::<Result<Vec<_>>>()?;
246 let data_type = DataType::Struct(fields.clone());
247 Ok(Self {
248 data_type,
249 children,
250 child_fields: fields,
251 is_root,
252 })
253 }
254
255 fn field_to_decoder(
256 field: &Arc<arrow_schema::Field>,
257 should_validate: bool,
258 ) -> Result<Box<dyn StructuralFieldDecoder>> {
259 match field.data_type() {
260 DataType::Struct(fields) => {
261 if field.is_packed_struct() || field.is_blob() {
262 let decoder =
263 StructuralPrimitiveFieldDecoder::new(&field.clone(), should_validate);
264 Ok(Box::new(decoder))
265 } else {
266 Ok(Box::new(Self::new(fields.clone(), should_validate, false)?))
267 }
268 }
269 DataType::List(child_field) | DataType::LargeList(child_field) => {
270 let child_decoder = Self::field_to_decoder(child_field, should_validate)?;
271 Ok(Box::new(StructuralListDecoder::new(
272 child_decoder,
273 field.data_type().clone(),
274 )))
275 }
276 DataType::FixedSizeList(child_field, _)
277 if matches!(child_field.data_type(), DataType::Struct(_)) =>
278 {
279 let child_decoder = Self::field_to_decoder(child_field, should_validate)?;
281 Ok(Box::new(StructuralFixedSizeListDecoder::new(
282 child_decoder,
283 field.data_type().clone(),
284 )))
285 }
286 DataType::Map(entries_field, keys_sorted) => {
287 if *keys_sorted {
288 return Err(Error::not_supported_source(
289 "Map data type with keys_sorted=true is not supported yet"
290 .to_string()
291 .into(),
292 ));
293 }
294 let child_decoder = Self::field_to_decoder(entries_field, should_validate)?;
295 Ok(Box::new(StructuralMapDecoder::new(
296 child_decoder,
297 field.data_type().clone(),
298 )))
299 }
300 DataType::RunEndEncoded(_, _) => todo!(),
301 DataType::ListView(_) | DataType::LargeListView(_) => todo!(),
302 DataType::Union(_, _) => todo!(),
303 _ => Ok(Box::new(StructuralPrimitiveFieldDecoder::new(
304 field,
305 should_validate,
306 ))),
307 }
308 }
309
310 pub fn drain_batch_task(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
311 let array_drain = self.drain(num_rows)?;
312 Ok(NextDecodeTask {
313 num_rows,
314 task: Box::new(array_drain),
315 })
316 }
317}
318
319impl StructuralFieldDecoder for StructuralStructDecoder {
320 fn accept_page(&mut self, mut child: LoadedPageShard) -> Result<()> {
321 let child_idx = child.path.pop_front().unwrap();
323 self.children[child_idx as usize].accept_page(child)?;
325 Ok(())
326 }
327
328 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
329 let child_tasks = self
330 .children
331 .iter_mut()
332 .map(|child| child.drain(num_rows))
333 .collect::<Result<Vec<_>>>()?;
334 Ok(Box::new(RepDefStructDecodeTask {
335 children: child_tasks,
336 child_fields: self.child_fields.clone(),
337 is_root: self.is_root,
338 num_rows,
339 }))
340 }
341
342 fn data_type(&self) -> &DataType {
343 &self.data_type
344 }
345}
346
/// Decode task that decodes every child task and stitches the resulting
/// arrays into a `StructArray`.
#[derive(Debug)]
struct RepDefStructDecodeTask {
    children: Vec<Box<dyn StructuralDecodeArrayTask>>,
    child_fields: Fields,
    /// Root structs never unravel validity of their own
    is_root: bool,
    /// Row count to produce; only needed for the zero-children case
    num_rows: u64,
}
354
355impl StructuralDecodeArrayTask for RepDefStructDecodeTask {
356 fn decode(self: Box<Self>) -> Result<DecodedArray> {
357 if self.children.is_empty() {
358 return Ok(DecodedArray {
359 array: Arc::new(StructArray::new_empty_fields(self.num_rows as usize, None)),
360 repdef: CompositeRepDefUnraveler::new(vec![]),
361 data_size: 0,
362 });
363 }
364
365 let arrays = self
366 .children
367 .into_iter()
368 .map(|task| task.decode())
369 .collect::<Result<Vec<_>>>()?;
370 let mut children = Vec::with_capacity(arrays.len());
371 let mut data_size = 0u64;
372 let mut arrays_iter = arrays.into_iter();
373 let first_array = arrays_iter.next().unwrap();
374 let length = first_array.array.len();
375
376 let mut repdef = first_array.repdef;
378 data_size += first_array.data_size;
379 children.push(first_array.array);
380
381 for array in arrays_iter {
382 debug_assert_eq!(length, array.array.len());
383 data_size += array.data_size;
384 children.push(array.array);
385 }
386
387 let validity = if self.is_root {
388 None
389 } else {
390 repdef.unravel_validity(length)
391 };
392
393 let array = StructArray::try_new(self.child_fields, children, validity)
394 .map_err(|e| Error::invalid_input_source(e.to_string().into()))?;
395 Ok(DecodedArray {
396 array: Arc::new(array),
397 repdef,
398 data_size,
399 })
400 }
401}
402
/// Encoder for struct columns in the structural (2.1+) encoding.
pub struct StructStructuralEncoder {
    /// If true, validity buffers are shared by reference; otherwise they are
    /// deep-copied before being handed to the rep/def builder
    keep_original_array: bool,
    children: Vec<Box<dyn FieldEncoder>>,
}
411
412impl StructStructuralEncoder {
413 pub fn new(keep_original_array: bool, children: Vec<Box<dyn FieldEncoder>>) -> Self {
414 Self {
415 keep_original_array,
416 children,
417 }
418 }
419}
420
impl FieldEncoder for StructStructuralEncoder {
    /// Records the struct's validity into `repdef` and forwards each child
    /// column (with the struct's nulls pushed down) to its child encoder.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        mut repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        let struct_array = array.as_struct();
        // presumably resolves any slice offset so children and validity line
        // up — confirm against StructArrayExt::normalize_slicing
        let mut struct_array = struct_array.normalize_slicing()?;
        if let Some(validity) = struct_array.nulls() {
            if self.keep_original_array {
                repdef.add_validity_bitmap(validity.clone())
            } else {
                // Deep copy so we do not keep a reference to the caller's
                // buffers; unwrap is safe since the input is `Some`
                repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap())
            }
            struct_array = struct_array.pushdown_nulls()?;
        } else {
            repdef.add_no_null(struct_array.len());
        }
        // Each child gets its own clone of the repdef built so far
        let child_tasks = self
            .children
            .iter_mut()
            .zip(struct_array.columns().iter())
            .map(|(encoder, arr)| {
                encoder.maybe_encode(
                    arr.clone(),
                    external_buffers,
                    repdef.clone(),
                    row_number,
                    num_rows,
                )
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
    }

    /// Flushes all children, concatenating their pending tasks.
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        self.children
            .iter_mut()
            .map(|encoder| encoder.flush(external_buffers))
            .flatten_ok()
            .collect::<Result<Vec<_>>>()
    }

    /// The structural struct occupies no column slot of its own; only
    /// children contribute columns.
    fn num_columns(&self) -> u32 {
        self.children
            .iter()
            .map(|child| child.num_columns())
            .sum::<u32>()
    }

    /// Finishes all children in declaration order and concatenates their
    /// encoded columns.
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        let mut child_columns = self
            .children
            .iter_mut()
            .map(|child| child.finish(external_buffers))
            .collect::<FuturesOrdered<_>>();
        async move {
            let mut encoded_columns = Vec::with_capacity(child_columns.len());
            while let Some(child_cols) = child_columns.next().await {
                encoded_columns.extend(child_cols?);
            }
            Ok(encoded_columns)
        }
        .boxed()
    }
}
493
/// Encoder for struct columns in the legacy layout, where the struct itself
/// occupies a header column in addition to its children.
pub struct StructFieldEncoder {
    children: Vec<Box<dyn FieldEncoder>>,
    /// Column index assigned to the struct's header column
    column_index: u32,
    /// Total rows encoded so far; recorded in the header page at `finish`
    num_rows_seen: u64,
}
499
500impl StructFieldEncoder {
501 pub fn new(children: Vec<Box<dyn FieldEncoder>>, column_index: u32) -> Self {
502 Self {
503 children,
504 column_index,
505 num_rows_seen: 0,
506 }
507 }
508}
509
impl FieldEncoder for StructFieldEncoder {
    /// Forwards each child column to its child encoder.
    ///
    /// Unlike the structural encoder, struct validity is not recorded here;
    /// only the running row count is tracked for the header page.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        self.num_rows_seen += array.len() as u64;
        let struct_array = array.as_struct();
        let child_tasks = self
            .children
            .iter_mut()
            .zip(struct_array.columns().iter())
            .map(|(encoder, arr)| {
                encoder.maybe_encode(
                    arr.clone(),
                    external_buffers,
                    repdef.clone(),
                    row_number,
                    num_rows,
                )
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
    }

    /// Flushes all children, concatenating their pending tasks.
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        let child_tasks = self
            .children
            .iter_mut()
            .map(|encoder| encoder.flush(external_buffers))
            .collect::<Result<Vec<_>>>()?;
        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
    }

    /// All child columns plus one for the struct header column itself.
    fn num_columns(&self) -> u32 {
        self.children
            .iter()
            .map(|child| child.num_columns())
            .sum::<u32>()
            + 1
    }

    /// Emits the struct header column (a single legacy `SimpleStruct` page
    /// with no data buffers) followed by all child columns in order.
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        let mut child_columns = self
            .children
            .iter_mut()
            .map(|child| child.finish(external_buffers))
            .collect::<FuturesOrdered<_>>();
        let num_rows_seen = self.num_rows_seen;
        let column_index = self.column_index;
        async move {
            let mut columns = Vec::new();
            // Header page: no data, just the legacy struct encoding marker
            // and the total row count.
            let mut header = EncodedColumn::default();
            header.final_pages.push(EncodedPage {
                data: Vec::new(),
                description: PageEncoding::Legacy(pb::ArrayEncoding {
                    array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
                        pb::SimpleStruct {},
                    )),
                }),
                num_rows: num_rows_seen,
                column_idx: column_index,
                row_number: 0, });
            columns.push(header);
            while let Some(child_cols) = child_columns.next().await {
                columns.extend(child_cols?);
            }
            Ok(columns)
        }
        .boxed()
    }
}
591
#[cfg(test)]
mod tests {

    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{
        Array, ArrayRef, Int32Array, ListArray, StructArray,
        builder::{Int32Builder, ListBuilder},
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
    use arrow_schema::{DataType, Field, Fields};

    use crate::{
        testing::{TestCases, check_basic_random, check_round_trip_encoding_of_data},
        version::LanceFileVersion,
    };

    // Round trip of a flat struct with two non-nullable int children over
    // randomly generated data.
    #[test_log::test(tokio::test)]
    async fn test_simple_struct() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]));
        let field = Field::new("", data_type, false);
        check_basic_random(field).await;
    }

    // Nested nullable structs: outer rows, an inner "location" struct, and a
    // nullable leaf each carry their own validity.
    #[test_log::test(tokio::test)]
    async fn test_nullable_struct() {
        let inner_fields = Fields::from(vec![
            Field::new("x", DataType::Int32, false),
            Field::new("y", DataType::Int32, true),
        ]);
        let inner_struct = DataType::Struct(inner_fields.clone());
        let outer_fields = Fields::from(vec![
            Field::new("score", DataType::Int32, true),
            Field::new("location", inner_struct, true),
        ]);

        let x_vals = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
        let y_vals = Int32Array::from(vec![Some(6), None, Some(8), Some(9), Some(10)]);
        let scores = Int32Array::from(vec![None, Some(12), Some(13), Some(14), Some(15)]);

        // Row 3's location is null at the inner level
        let location_validity = NullBuffer::from(vec![true, true, true, false, true]);
        let locations = StructArray::new(
            inner_fields,
            vec![Arc::new(x_vals), Arc::new(y_vals)],
            Some(location_validity),
        );

        // Row 4 is null at the outer level
        let rows_validity = NullBuffer::from(vec![true, true, true, true, false]);
        let rows = StructArray::new(
            outer_fields,
            vec![Arc::new(scores), Arc::new(locations)],
            Some(rows_validity),
        );

        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![Arc::new(rows)], &test_cases, HashMap::new()).await;
    }

    // Struct-of-list where a masked (struct-null) row still references items
    // via its offsets (list 5 spans item 4 but the struct row is null).
    #[test_log::test(tokio::test)]
    async fn test_simple_masked_nonempty_list() {
        let items = Int32Array::from(vec![Some(1), Some(2), None, Some(4), Some(5), Some(6)]);
        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 2, 3, 4, 4, 4, 5]));
        let list_validity = BooleanBuffer::from(vec![true, true, true, true, false, true]);
        let list_array = ListArray::new(
            Arc::new(Field::new("item", DataType::Int32, true)),
            offsets,
            Arc::new(items),
            Some(NullBuffer::new(list_validity)),
        );
        let struct_validity = BooleanBuffer::from(vec![true, true, true, true, true, false]);
        let struct_array = StructArray::new(
            Fields::from(vec![Field::new(
                "inner_list",
                list_array.data_type().clone(),
                true,
            )]),
            vec![Arc::new(list_array)],
            Some(NullBuffer::new(struct_validity)),
        );
        check_round_trip_encoding_of_data(
            vec![Arc::new(struct_array)],
            &TestCases::default().with_min_file_version(LanceFileVersion::V2_1),
            HashMap::new(),
        )
        .await;
    }

    // Struct-of-list with a null list row and a null struct row, where the
    // trailing lists are empty.
    #[test_log::test(tokio::test)]
    async fn test_simple_struct_list() {
        let items = Int32Array::from(vec![Some(1), Some(2), None, Some(4)]);
        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 2, 3, 4, 4, 4, 4]));
        let list_validity = BooleanBuffer::from(vec![true, true, true, true, false, true]);
        let list_array = ListArray::new(
            Arc::new(Field::new("item", DataType::Int32, true)),
            offsets,
            Arc::new(items),
            Some(NullBuffer::new(list_validity)),
        );
        let struct_validity = BooleanBuffer::from(vec![true, true, true, true, true, false]);
        let struct_array = StructArray::new(
            Fields::from(vec![Field::new(
                "inner_list",
                list_array.data_type().clone(),
                true,
            )]),
            vec![Arc::new(list_array)],
            Some(NullBuffer::new(struct_validity)),
        );
        check_round_trip_encoding_of_data(
            vec![Arc::new(struct_array)],
            &TestCases::default().with_min_file_version(LanceFileVersion::V2_1),
            HashMap::new(),
        )
        .await;
    }

    // Random round trip of a struct mixing a list child and an int child.
    #[test_log::test(tokio::test)]
    async fn test_struct_list() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new(
                "inner_list",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                true,
            ),
            Field::new("outer_int", DataType::Int32, true),
        ]));
        let field = Field::new("row", data_type, false);
        check_basic_random(field).await;
    }

    // A struct with zero children must still round trip (row-count only).
    #[test_log::test(tokio::test)]
    async fn test_empty_struct() {
        let data_type = DataType::Struct(Fields::from(Vec::<Field>::default()));
        let field = Field::new("row", data_type, false);
        check_basic_random(field).await;
    }

    // Deeply nested mix: int + (struct of int + list) + binary, random data.
    #[test_log::test(tokio::test)]
    async fn test_complicated_struct() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("int", DataType::Int32, true),
            Field::new(
                "inner",
                DataType::Struct(Fields::from(vec![
                    Field::new("inner_int", DataType::Int32, true),
                    Field::new(
                        "inner_list",
                        DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                        true,
                    ),
                ])),
                true,
            ),
            Field::new("outer_binary", DataType::Binary, true),
        ]));
        let field = Field::new("row", data_type, false);
        check_basic_random(field).await;
    }

    // A list whose struct elements are mostly null must survive the round
    // trip (requires 2.2's handling of null struct elements inside lists).
    //
    // NOTE(review): the validity vector marks only element 1 as valid, yet
    // its tag string is "null_struct" while the invalid elements are tagged
    // "valid" — the tags read as if the mask were inverted; confirm intent.
    #[test_log::test(tokio::test)]
    async fn test_list_of_struct_with_null_struct_element() {
        use arrow_array::StringArray;

        let tag_array = StringArray::from(vec![
            Some("valid"),
            Some("null_struct"),
            Some("valid"),
            Some("valid"),
        ]);
        let struct_fields = Fields::from(vec![Field::new("tag", DataType::Utf8, true)]);
        let struct_validity = NullBuffer::from(vec![false, true, false, false]);
        let struct_array = StructArray::new(
            struct_fields.clone(),
            vec![Arc::new(tag_array)],
            Some(struct_validity),
        );

        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 4]));
        let list_field = Field::new("item", DataType::Struct(struct_fields), true);
        let list_array =
            ListArray::new(Arc::new(list_field), offsets, Arc::new(struct_array), None);

        check_round_trip_encoding_of_data(
            vec![Arc::new(list_array)],
            &TestCases::default().with_min_file_version(LanceFileVersion::V2_2),
            HashMap::new(),
        )
        .await;
    }

    // Writes the same struct in uneven 437-row slices so child columns reach
    // page boundaries at different points, exercising ragged scheduling.
    #[test_log::test(tokio::test)]
    async fn test_ragged_scheduling() {
        let items_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        for _ in 0..10000 {
            list_builder.append_null();
        }
        let list_array = Arc::new(list_builder.finish());
        let int_array = Arc::new(Int32Array::from_iter_values(0..10000));
        let fields = vec![
            Field::new("", list_array.data_type().clone(), true),
            Field::new("", int_array.data_type().clone(), true),
        ];
        let struct_array = Arc::new(StructArray::new(
            Fields::from(fields),
            vec![list_array, int_array],
            None,
        )) as ArrayRef;
        let struct_arrays = (0..10000)
            .step_by(437)
            .map(|offset| struct_array.slice(offset, 437.min(10000 - offset)))
            .collect::<Vec<_>>();
        check_round_trip_encoding_of_data(struct_arrays, &TestCases::default(), HashMap::new())
            .await;
    }
}