use std::{
    collections::{BinaryHeap, VecDeque},
    ops::Range,
    sync::Arc,
};

use arrow_array::{cast::AsArray, Array, ArrayRef, StructArray};
use arrow_schema::{DataType, Fields};
use futures::{
    future::BoxFuture,
    stream::{FuturesOrdered, FuturesUnordered},
    FutureExt, StreamExt, TryStreamExt,
};
use itertools::Itertools;
use lance_arrow::FieldExt;
use log::trace;
use snafu::location;

use crate::{
    decoder::{
        DecodeArrayTask, DecodedArray, DecoderReady, FieldScheduler, FilterExpression, LoadedPage,
        LogicalPageDecoder, MessageType, NextDecodeTask, PageEncoding, PriorityRange,
        ScheduledScanLine, SchedulerContext, SchedulingJob, StructuralDecodeArrayTask,
        StructuralFieldDecoder, StructuralFieldScheduler, StructuralSchedulingJob,
    },
    encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
    format::pb,
    repdef::RepDefBuilder,
};
use lance_core::{Error, Result};

use super::{list::StructuralListDecoder, primitive::StructuralPrimitiveFieldDecoder};

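/// Tracks the scheduling progress of a single child column so children can be
/// prioritized in a `BinaryHeap`.  The ordering is reversed (fewest rows
/// scheduled sorts first) so the heap behaves as a min-heap and the
/// least-scheduled child is always popped next.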
#[derive(Debug)]
struct SchedulingJobWithStatus<'a> {
    col_idx: u32,
    col_name: &'a str,
    job: Box<dyn SchedulingJob + 'a>,
    rows_scheduled: u64,
    rows_remaining: u64,
}

impl PartialEq for SchedulingJobWithStatus<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.col_idx == other.col_idx
    }
}

impl Eq for SchedulingJobWithStatus<'_> {}

impl PartialOrd for SchedulingJobWithStatus<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for SchedulingJobWithStatus<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        other.rows_scheduled.cmp(&self.rows_scheduled)
    }
}

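/// Scheduling job for a struct field.
///
/// Child jobs are scheduled in lock-step: on each pass the child with the
/// fewest rows scheduled goes next, and the struct's own progress is the
/// minimum progress across all of its children.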
#[derive(Debug)]
struct SimpleStructSchedulerJob<'a> {
    scheduler: &'a SimpleStructScheduler,
    children: BinaryHeap<SchedulingJobWithStatus<'a>>,
    rows_scheduled: u64,
    num_rows: u64,
    initialized: bool,
}

impl<'a> SimpleStructSchedulerJob<'a> {
    fn new(
        scheduler: &'a SimpleStructScheduler,
        children: Vec<Box<dyn SchedulingJob + 'a>>,
        num_rows: u64,
    ) -> Self {
        let children = children
            .into_iter()
            .enumerate()
            .map(|(idx, job)| SchedulingJobWithStatus {
                col_idx: idx as u32,
                col_name: scheduler.child_fields[idx].name(),
                job,
                rows_scheduled: 0,
                rows_remaining: num_rows,
            })
            .collect::<BinaryHeap<_>>();
        Self {
            scheduler,
            children,
            rows_scheduled: 0,
            num_rows,
            initialized: false,
        }
    }
}

impl SchedulingJob for SimpleStructSchedulerJob<'_> {
    fn schedule_next(
        &mut self,
        mut context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        let mut decoders = Vec::new();
        if !self.initialized {
            let struct_decoder = Box::new(SimpleStructDecoder::new(
                self.scheduler.child_fields.clone(),
                self.num_rows,
            ));
            let struct_decoder = context.locate_decoder(struct_decoder);
            decoders.push(MessageType::DecoderReady(struct_decoder));
            self.initialized = true;
        }
        let old_rows_scheduled = self.rows_scheduled;
        while old_rows_scheduled == self.rows_scheduled {
            let mut next_child = self.children.pop().unwrap();
            trace!("Scheduling more rows for child {}", next_child.col_idx);
            let scoped = context.push(next_child.col_name, next_child.col_idx);
            let child_scan = next_child.job.schedule_next(scoped.context, priority)?;
            trace!(
                "Scheduled {} rows for child {}",
                child_scan.rows_scheduled,
                next_child.col_idx
            );
            next_child.rows_scheduled += child_scan.rows_scheduled;
            next_child.rows_remaining -= child_scan.rows_scheduled;
            decoders.extend(child_scan.decoders);
            self.children.push(next_child);
            self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
            context = scoped.pop();
        }
        let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
        Ok(ScheduledScanLine {
            decoders,
            rows_scheduled: struct_rows_scheduled,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

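/// A field scheduler for struct fields.
///
/// The struct field itself has nothing to schedule; requests simply fan out
/// to the child field schedulers, which the scheduling job drives in
/// lock-step.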
#[derive(Debug)]
pub struct SimpleStructScheduler {
    children: Vec<Arc<dyn FieldScheduler>>,
    child_fields: Fields,
    num_rows: u64,
}

impl SimpleStructScheduler {
    pub fn new(children: Vec<Arc<dyn FieldScheduler>>, child_fields: Fields) -> Self {
        debug_assert!(!children.is_empty());
        let num_rows = children[0].num_rows();
        debug_assert!(children.iter().all(|child| child.num_rows() == num_rows));
        Self {
            children,
            child_fields,
            num_rows,
        }
    }
}

impl FieldScheduler for SimpleStructScheduler {
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>> {
        let child_schedulers = self
            .children
            .iter()
            .map(|child| child.schedule_ranges(ranges, filter))
            .collect::<Result<Vec<_>>>()?;
        let num_rows = child_schedulers[0].num_rows();
        Ok(Box::new(SimpleStructSchedulerJob::new(
            self,
            child_schedulers,
            num_rows,
        )))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn initialize<'a>(
        &'a self,
        _filter: &'a FilterExpression,
        _context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        let futures = self
            .children
            .iter()
            .map(|child| child.initialize(_filter, _context))
            .collect::<FuturesUnordered<_>>();
        async move {
            futures
                .map(|res| res.map(|_| ()))
                .try_collect::<Vec<_>>()
                .await?;
            Ok(())
        }
        .boxed()
    }
}

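/// Structural counterpart of `SchedulingJobWithStatus`: tracks per-child
/// scheduling progress so children can be prioritized in a min-heap.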
#[derive(Debug)]
struct StructuralSchedulingJobWithStatus<'a> {
    col_idx: u32,
    col_name: &'a str,
    job: Box<dyn StructuralSchedulingJob + 'a>,
    rows_scheduled: u64,
    rows_remaining: u64,
}

impl PartialEq for StructuralSchedulingJobWithStatus<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.col_idx == other.col_idx
    }
}

impl Eq for StructuralSchedulingJobWithStatus<'_> {}

impl PartialOrd for StructuralSchedulingJobWithStatus<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for StructuralSchedulingJobWithStatus<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        other.rows_scheduled.cmp(&self.rows_scheduled)
    }
}

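/// Scheduling job for a struct field in the structural encoding path.
///
/// As with `SimpleStructSchedulerJob`, children are scheduled in lock-step
/// via a min-heap keyed on rows scheduled.  If any child reports that it is
/// exhausted (returns `None`) the struct reports exhaustion as well.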
#[derive(Debug)]
struct RepDefStructSchedulingJob<'a> {
    children: BinaryHeap<StructuralSchedulingJobWithStatus<'a>>,
    rows_scheduled: u64,
}

impl<'a> RepDefStructSchedulingJob<'a> {
    fn new(
        scheduler: &'a StructuralStructScheduler,
        children: Vec<Box<dyn StructuralSchedulingJob + 'a>>,
        num_rows: u64,
    ) -> Self {
        let children = children
            .into_iter()
            .enumerate()
            .map(|(idx, job)| StructuralSchedulingJobWithStatus {
                col_idx: idx as u32,
                col_name: scheduler.child_fields[idx].name(),
                job,
                rows_scheduled: 0,
                rows_remaining: num_rows,
            })
            .collect::<BinaryHeap<_>>();
        Self {
            children,
            rows_scheduled: 0,
        }
    }
}

impl StructuralSchedulingJob for RepDefStructSchedulingJob<'_> {
    fn schedule_next(
        &mut self,
        mut context: &mut SchedulerContext,
    ) -> Result<Option<ScheduledScanLine>> {
        let mut decoders = Vec::new();
        let old_rows_scheduled = self.rows_scheduled;
        while old_rows_scheduled == self.rows_scheduled {
            let mut next_child = self.children.pop().unwrap();
            let scoped = context.push(next_child.col_name, next_child.col_idx);
            let child_scan = next_child.job.schedule_next(scoped.context)?;
            if child_scan.is_none() {
                return Ok(None);
            }
            let child_scan = child_scan.unwrap();

            trace!(
                "Scheduled {} rows for child {}",
                child_scan.rows_scheduled,
                next_child.col_idx
            );
            next_child.rows_scheduled += child_scan.rows_scheduled;
            next_child.rows_remaining -= child_scan.rows_scheduled;
            decoders.extend(child_scan.decoders);
            self.children.push(next_child);
            self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
            context = scoped.pop();
        }
        let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
        Ok(Some(ScheduledScanLine {
            decoders,
            rows_scheduled: struct_rows_scheduled,
        }))
    }
}

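/// A structural scheduler for struct fields.
///
/// In the structural path the struct's validity travels with its children's
/// rep/def levels, so this scheduler only needs to delegate to the child
/// schedulers.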
#[derive(Debug)]
pub struct StructuralStructScheduler {
    children: Vec<Box<dyn StructuralFieldScheduler>>,
    child_fields: Fields,
}

impl StructuralStructScheduler {
    pub fn new(children: Vec<Box<dyn StructuralFieldScheduler>>, child_fields: Fields) -> Self {
        debug_assert!(!children.is_empty());
        Self {
            children,
            child_fields,
        }
    }
}

impl StructuralFieldScheduler for StructuralStructScheduler {
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();

        let child_schedulers = self
            .children
            .iter()
            .map(|child| child.schedule_ranges(ranges, filter))
            .collect::<Result<Vec<_>>>()?;

        Ok(Box::new(RepDefStructSchedulingJob::new(
            self,
            child_schedulers,
            num_rows,
        )))
    }

    fn initialize<'a>(
        &'a mut self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        let children_initialization = self
            .children
            .iter_mut()
            .map(|child| child.initialize(filter, context))
            .collect::<FuturesUnordered<_>>();
        async move {
            children_initialization
                .map(|res| res.map(|_| ()))
                .try_collect::<Vec<_>>()
                .await?;
            Ok(())
        }
        .boxed()
    }
}

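/// Tracks the decode state of a single child column of a struct.
///
/// `rows_loaded` counts rows that are fully loaded and ready to be drained,
/// `rows_drained` counts rows that have already been handed out as decode
/// tasks, and `rows_popped` counts rows belonging to pages that were fully
/// drained and removed from the `scheduled` queue (used to translate the
/// struct-level loaded need into a per-page need).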
#[derive(Debug)]
struct ChildState {
    scheduled: VecDeque<Box<dyn LogicalPageDecoder>>,
    rows_loaded: u64,
    rows_drained: u64,
    rows_popped: u64,
    num_rows: u64,
    field_index: u32,
}

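/// A decode task that combines the decode tasks from one or more pages and
/// concatenates the resulting arrays into a single array.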
struct CompositeDecodeTask {
    tasks: Vec<Box<dyn DecodeArrayTask>>,
    num_rows: u64,
    has_more: bool,
}

impl CompositeDecodeTask {
    fn decode(self) -> Result<ArrayRef> {
        let arrays = self
            .tasks
            .into_iter()
            .map(|task| task.decode())
            .collect::<Result<Vec<_>>>()?;
        let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
        Ok(arrow_select::concat::concat(&array_refs)?)
    }
}

impl ChildState {
    fn new(num_rows: u64, field_index: u32) -> Self {
        Self {
            scheduled: VecDeque::new(),
            rows_loaded: 0,
            rows_drained: 0,
            rows_popped: 0,
            num_rows,
            field_index,
        }
    }

    async fn wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
        trace!(
            "Struct child {} waiting for more than {} rows to be loaded and {} are fully loaded already",
            self.field_index,
            loaded_need,
            self.rows_loaded,
        );
        let mut fully_loaded = self.rows_popped;
        for (page_idx, next_decoder) in self.scheduled.iter_mut().enumerate() {
            if next_decoder.rows_unloaded() > 0 {
                let mut current_need = loaded_need;
                current_need -= fully_loaded;
                let rows_in_page = next_decoder.num_rows();
                let need_for_page = (rows_in_page - 1).min(current_need);
                trace!(
                    "Struct child {} page {} will wait until more than {} rows loaded from page with {} rows",
                    self.field_index,
                    page_idx,
                    need_for_page,
                    rows_in_page,
                );
                next_decoder.wait_for_loaded(need_for_page).await?;
                let now_loaded = next_decoder.rows_loaded();
                fully_loaded += now_loaded;
                trace!(
                    "Struct child {} page {} await and now has {} loaded rows and we have {} fully loaded",
                    self.field_index,
                    page_idx,
                    now_loaded,
                    fully_loaded
                );
            } else {
                fully_loaded += next_decoder.num_rows();
            }
            if fully_loaded > loaded_need {
                break;
            }
        }
        self.rows_loaded = fully_loaded;
        trace!(
            "Struct child {} finished waiting and now has {} rows loaded",
            self.field_index,
            self.rows_loaded
        );
        Ok(())
    }

    fn drain(&mut self, num_rows: u64) -> Result<CompositeDecodeTask> {
        trace!("Struct draining {} rows", num_rows);

        trace!(
            "Draining {} rows from struct page with {} rows already drained",
            num_rows,
            self.rows_drained
        );
        let mut remaining = num_rows;
        let mut composite = CompositeDecodeTask {
            tasks: Vec::new(),
            num_rows: 0,
            has_more: true,
        };
        while remaining > 0 {
            let next = self.scheduled.front_mut().unwrap();
            let rows_to_take = remaining.min(next.rows_left());
            let next_task = next.drain(rows_to_take)?;
            if next.rows_left() == 0 {
                trace!("Completely drained page");
                self.rows_popped += next.num_rows();
                self.scheduled.pop_front();
            }
            remaining -= rows_to_take;
            composite.tasks.push(next_task.task);
            composite.num_rows += next_task.num_rows;
        }
        self.rows_drained += num_rows;
        composite.has_more = self.rows_drained != self.num_rows;
        Ok(composite)
    }
}

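/// Orders children by how many rows they have loaded, reversed so that a
/// `BinaryHeap` pops the least-loaded child first.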
struct WaitOrder<'a>(&'a mut ChildState);

impl Eq for WaitOrder<'_> {}
impl PartialEq for WaitOrder<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.0.rows_loaded == other.0.rows_loaded
    }
}
impl Ord for WaitOrder<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        other.0.rows_loaded.cmp(&self.0.rows_loaded)
    }
}
impl PartialOrd for WaitOrder<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

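/// A structural decoder for struct fields.
///
/// Routes each loaded page to the appropriate child decoder and, on drain,
/// zips the child arrays back into a `StructArray`.  When this decoder is the
/// root of the field tree no validity buffer is produced for the struct
/// itself.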
#[derive(Debug)]
pub struct StructuralStructDecoder {
    children: Vec<Box<dyn StructuralFieldDecoder>>,
    data_type: DataType,
    child_fields: Fields,
    is_root: bool,
}

impl StructuralStructDecoder {
    pub fn new(fields: Fields, should_validate: bool, is_root: bool) -> Self {
        let children = fields
            .iter()
            .map(|field| Self::field_to_decoder(field, should_validate))
            .collect();
        let data_type = DataType::Struct(fields.clone());
        Self {
            data_type,
            children,
            child_fields: fields,
            is_root,
        }
    }

    fn field_to_decoder(
        field: &Arc<arrow_schema::Field>,
        should_validate: bool,
    ) -> Box<dyn StructuralFieldDecoder> {
        match field.data_type() {
            DataType::Struct(fields) => {
                if field.is_packed_struct() {
                    let decoder =
                        StructuralPrimitiveFieldDecoder::new(&field.clone(), should_validate);
                    Box::new(decoder)
                } else {
                    Box::new(Self::new(fields.clone(), should_validate, false))
                }
            }
            DataType::List(child_field) | DataType::LargeList(child_field) => {
                let child_decoder = Self::field_to_decoder(child_field, should_validate);
                Box::new(StructuralListDecoder::new(
                    child_decoder,
                    field.data_type().clone(),
                ))
            }
            DataType::RunEndEncoded(_, _) => todo!(),
            DataType::ListView(_) | DataType::LargeListView(_) => todo!(),
            DataType::Map(_, _) => todo!(),
            DataType::Union(_, _) => todo!(),
            _ => Box::new(StructuralPrimitiveFieldDecoder::new(field, should_validate)),
        }
    }

    pub fn drain_batch_task(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        let array_drain = self.drain(num_rows)?;
        Ok(NextDecodeTask {
            num_rows,
            task: Box::new(array_drain),
        })
    }
}

impl StructuralFieldDecoder for StructuralStructDecoder {
    fn accept_page(&mut self, mut child: LoadedPage) -> Result<()> {
        let child_idx = child.path.pop_front().unwrap();
        self.children[child_idx as usize].accept_page(child)?;
        Ok(())
    }

    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
        let child_tasks = self
            .children
            .iter_mut()
            .map(|child| child.drain(num_rows))
            .collect::<Result<Vec<_>>>()?;
        Ok(Box::new(RepDefStructDecodeTask {
            children: child_tasks,
            child_fields: self.child_fields.clone(),
            is_root: self.is_root,
        }))
    }

    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}

#[derive(Debug)]
struct RepDefStructDecodeTask {
    children: Vec<Box<dyn StructuralDecodeArrayTask>>,
    child_fields: Fields,
    is_root: bool,
}

impl StructuralDecodeArrayTask for RepDefStructDecodeTask {
    fn decode(self: Box<Self>) -> Result<DecodedArray> {
        let arrays = self
            .children
            .into_iter()
            .map(|task| task.decode())
            .collect::<Result<Vec<_>>>()?;
        let mut children = Vec::with_capacity(arrays.len());
        let mut arrays_iter = arrays.into_iter();
        let first_array = arrays_iter.next().unwrap();
        let length = first_array.array.len();

        let mut repdef = first_array.repdef;
        children.push(first_array.array);

        for array in arrays_iter {
            debug_assert_eq!(length, array.array.len());
            children.push(array.array);
        }

        let validity = if self.is_root {
            None
        } else {
            repdef.unravel_validity(length)
        };
        let array = StructArray::new(self.child_fields, children, validity);
        Ok(DecodedArray {
            array: Arc::new(array),
            repdef,
        })
    }
}

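/// A decoder for struct fields.
///
/// Each child column keeps its own queue of page decoders.  Waiting for rows
/// is coordinated through a min-heap of `WaitOrder`s so the least-loaded
/// child is always awaited first, and draining pulls the same number of rows
/// from every child.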
#[derive(Debug)]
pub struct SimpleStructDecoder {
    children: Vec<ChildState>,
    child_fields: Fields,
    data_type: DataType,
    num_rows: u64,
}

impl SimpleStructDecoder {
    pub fn new(child_fields: Fields, num_rows: u64) -> Self {
        let data_type = DataType::Struct(child_fields.clone());
        Self {
            children: child_fields
                .iter()
                .enumerate()
                .map(|(idx, _)| ChildState::new(num_rows, idx as u32))
                .collect(),
            child_fields,
            data_type,
            num_rows,
        }
    }

    async fn do_wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
        let mut wait_orders = self
            .children
            .iter_mut()
            .filter_map(|child| {
                if child.rows_loaded <= loaded_need {
                    Some(WaitOrder(child))
                } else {
                    None
                }
            })
            .collect::<BinaryHeap<_>>();
        while !wait_orders.is_empty() {
            let next_waiter = wait_orders.pop().unwrap();
            let next_highest = wait_orders
                .peek()
                .map(|w| w.0.rows_loaded)
                .unwrap_or(u64::MAX);
            let limit = loaded_need.min(next_highest);
            next_waiter.0.wait_for_loaded(limit).await?;
            log::trace!(
                "Struct child {} finished await pass and now {} are loaded",
                next_waiter.0.field_index,
                next_waiter.0.rows_loaded
            );
            if next_waiter.0.rows_loaded <= loaded_need {
                wait_orders.push(next_waiter);
            }
        }
        Ok(())
    }
}

impl LogicalPageDecoder for SimpleStructDecoder {
    fn accept_child(&mut self, mut child: DecoderReady) -> Result<()> {
        let child_idx = child.path.pop_front().unwrap();
        if child.path.is_empty() {
            self.children[child_idx as usize]
                .scheduled
                .push_back(child.decoder);
        } else {
            let intended = self.children[child_idx as usize]
                .scheduled
                .back_mut()
                .ok_or_else(|| Error::Internal {
                    message: format!(
                        "Decoder scheduled for child at index {} but we don't have any child at that index yet",
                        child_idx
                    ),
                    location: location!(),
                })?;
            intended.accept_child(child)?;
        }
        Ok(())
    }

    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
        self.do_wait_for_loaded(loaded_need).boxed()
    }

    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        let child_tasks = self
            .children
            .iter_mut()
            .map(|child| child.drain(num_rows))
            .collect::<Result<Vec<_>>>()?;
        let num_rows = child_tasks[0].num_rows;
        debug_assert!(child_tasks.iter().all(|task| task.num_rows == num_rows));
        Ok(NextDecodeTask {
            task: Box::new(SimpleStructDecodeTask {
                children: child_tasks,
                child_fields: self.child_fields.clone(),
            }),
            num_rows,
        })
    }

    fn rows_loaded(&self) -> u64 {
        self.children.iter().map(|c| c.rows_loaded).min().unwrap()
    }

    fn rows_drained(&self) -> u64 {
        debug_assert!(self
            .children
            .iter()
            .all(|c| c.rows_drained == self.children[0].rows_drained));
        self.children[0].rows_drained
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}

struct SimpleStructDecodeTask {
    children: Vec<CompositeDecodeTask>,
    child_fields: Fields,
}

impl DecodeArrayTask for SimpleStructDecodeTask {
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        let child_arrays = self
            .children
            .into_iter()
            .map(|child| child.decode())
            .collect::<Result<Vec<_>>>()?;
        Ok(Arc::new(StructArray::try_new(
            self.child_fields,
            child_arrays,
            None,
        )?))
    }
}

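/// A structural encoder for struct fields.
///
/// The struct's validity is folded into the rep/def levels and each incoming
/// batch is forwarded, column by column, to the child encoders; the struct
/// itself emits no column of its own.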
pub struct StructStructuralEncoder {
    children: Vec<Box<dyn FieldEncoder>>,
}

impl StructStructuralEncoder {
    pub fn new(children: Vec<Box<dyn FieldEncoder>>) -> Self {
        Self { children }
    }
}

impl FieldEncoder for StructStructuralEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        mut repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        let struct_array = array.as_struct();
        if let Some(validity) = struct_array.nulls() {
            repdef.add_validity_bitmap(validity.clone());
        } else {
            repdef.add_no_null(struct_array.len());
        }
        let child_tasks = self
            .children
            .iter_mut()
            .zip(struct_array.columns().iter())
            .map(|(encoder, arr)| {
                encoder.maybe_encode(
                    arr.clone(),
                    external_buffers,
                    repdef.clone(),
                    row_number,
                    num_rows,
                )
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        self.children
            .iter_mut()
            .map(|encoder| encoder.flush(external_buffers))
            .flatten_ok()
            .collect::<Result<Vec<_>>>()
    }

    fn num_columns(&self) -> u32 {
        self.children
            .iter()
            .map(|child| child.num_columns())
            .sum::<u32>()
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        let mut child_columns = self
            .children
            .iter_mut()
            .map(|child| child.finish(external_buffers))
            .collect::<FuturesOrdered<_>>();
        async move {
            let mut encoded_columns = Vec::with_capacity(child_columns.len());
            while let Some(child_cols) = child_columns.next().await {
                encoded_columns.extend(child_cols?);
            }
            Ok(encoded_columns)
        }
        .boxed()
    }
}

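/// A legacy encoder for struct fields.
///
/// Forwards batches to the child encoders and, on finish, emits a header
/// column for the struct containing a single `SimpleStruct` page.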
pub struct StructFieldEncoder {
    children: Vec<Box<dyn FieldEncoder>>,
    column_index: u32,
    num_rows_seen: u64,
}

impl StructFieldEncoder {
    #[allow(dead_code)]
    pub fn new(children: Vec<Box<dyn FieldEncoder>>, column_index: u32) -> Self {
        Self {
            children,
            column_index,
            num_rows_seen: 0,
        }
    }
}

impl FieldEncoder for StructFieldEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        self.num_rows_seen += array.len() as u64;
        let struct_array = array.as_struct();
        let child_tasks = self
            .children
            .iter_mut()
            .zip(struct_array.columns().iter())
            .map(|(encoder, arr)| {
                encoder.maybe_encode(
                    arr.clone(),
                    external_buffers,
                    repdef.clone(),
                    row_number,
                    num_rows,
                )
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        let child_tasks = self
            .children
            .iter_mut()
            .map(|encoder| encoder.flush(external_buffers))
            .collect::<Result<Vec<_>>>()?;
        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
    }

    fn num_columns(&self) -> u32 {
        self.children
            .iter()
            .map(|child| child.num_columns())
            .sum::<u32>()
            + 1
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        let mut child_columns = self
            .children
            .iter_mut()
            .map(|child| child.finish(external_buffers))
            .collect::<FuturesOrdered<_>>();
        let num_rows_seen = self.num_rows_seen;
        let column_index = self.column_index;
        async move {
            let mut columns = Vec::new();
            let mut header = EncodedColumn::default();
            header.final_pages.push(EncodedPage {
                data: Vec::new(),
                description: PageEncoding::Legacy(pb::ArrayEncoding {
                    array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
                        pb::SimpleStruct {},
                    )),
                }),
                num_rows: num_rows_seen,
                column_idx: column_index,
                row_number: 0,
            });
            columns.push(header);
            while let Some(child_cols) = child_columns.next().await {
                columns.extend(child_cols?);
            }
            Ok(columns)
        }
        .boxed()
    }
}

#[cfg(test)]
mod tests {

    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{
        builder::{Int32Builder, ListBuilder},
        Array, ArrayRef, Int32Array, StructArray,
    };
    use arrow_buffer::NullBuffer;
    use arrow_schema::{DataType, Field, Fields};

    use crate::{
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    #[test_log::test(tokio::test)]
    async fn test_simple_struct() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]));
        let field = Field::new("", data_type, false);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_nullable_struct() {
        // A nullable outer struct ("score", "location") whose "location"
        // child is itself a nullable struct, exercising validity at two
        // levels of nesting.
        let inner_fields = Fields::from(vec![
            Field::new("x", DataType::Int32, false),
            Field::new("y", DataType::Int32, true),
        ]);
        let inner_struct = DataType::Struct(inner_fields.clone());
        let outer_fields = Fields::from(vec![
            Field::new("score", DataType::Int32, true),
            Field::new("location", inner_struct, true),
        ]);

        let x_vals = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
        let y_vals = Int32Array::from(vec![Some(6), None, Some(8), Some(9), Some(10)]);
        let scores = Int32Array::from(vec![None, Some(12), Some(13), Some(14), Some(15)]);

        let location_validity = NullBuffer::from(vec![true, true, true, false, true]);
        let locations = StructArray::new(
            inner_fields,
            vec![Arc::new(x_vals), Arc::new(y_vals)],
            Some(location_validity),
        );

        let rows_validity = NullBuffer::from(vec![true, true, true, true, false]);
        let rows = StructArray::new(
            outer_fields,
            vec![Arc::new(scores), Arc::new(locations)],
            Some(rows_validity),
        );

        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![Arc::new(rows)], &test_cases, HashMap::new()).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_struct_list() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new(
                "inner_list",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                true,
            ),
            Field::new("outer_int", DataType::Int32, true),
        ]));
        let field = Field::new("row", data_type, false);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_complicated_struct() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("int", DataType::Int32, true),
            Field::new(
                "inner",
                DataType::Struct(Fields::from(vec![
                    Field::new("inner_int", DataType::Int32, true),
                    Field::new(
                        "inner_list",
                        DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                        true,
                    ),
                ])),
                true,
            ),
            Field::new("outer_binary", DataType::Binary, true),
        ]));
        let field = Field::new("row", data_type, false);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_ragged_scheduling() {
        // Slices a 10k-row struct of (list, int) columns into uneven 437-row
        // batches (the last slice is shorter) so that scheduling happens on
        // ragged, unaligned boundaries.
        let items_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        for _ in 0..10000 {
            list_builder.append_null();
        }
        let list_array = Arc::new(list_builder.finish());
        let int_array = Arc::new(Int32Array::from_iter_values(0..10000));
        let fields = vec![
            Field::new("", list_array.data_type().clone(), true),
            Field::new("", int_array.data_type().clone(), true),
        ];
        let struct_array = Arc::new(StructArray::new(
            Fields::from(fields),
            vec![list_array, int_array],
            None,
        )) as ArrayRef;
        let struct_arrays = (0..10000)
            .step_by(437)
            .map(|offset| struct_array.slice(offset, 437.min(10000 - offset)))
            .collect::<Vec<_>>();
        check_round_trip_encoding_of_data(struct_arrays, &TestCases::default(), HashMap::new())
            .await;
    }
}