use std::{
    collections::{BinaryHeap, VecDeque},
    ops::Range,
    sync::Arc,
};

use crate::{
    decoder::{
        DecodeArrayTask, DecodedArray, DecoderReady, FieldScheduler, FilterExpression, LoadedPage,
        LogicalPageDecoder, MessageType, NextDecodeTask, PageEncoding, PriorityRange,
        ScheduledScanLine, SchedulerContext, SchedulingJob, StructuralDecodeArrayTask,
        StructuralFieldDecoder, StructuralFieldScheduler, StructuralSchedulingJob,
    },
    encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
    format::pb,
    repdef::RepDefBuilder,
};
use arrow_array::{cast::AsArray, Array, ArrayRef, StructArray};
use arrow_schema::{DataType, Field, Fields};
use futures::{
    future::BoxFuture,
    stream::{FuturesOrdered, FuturesUnordered},
    FutureExt, StreamExt, TryStreamExt,
};
use itertools::Itertools;
use lance_arrow::deepcopy::deep_copy_nulls;
use lance_arrow::FieldExt;
use lance_core::{Error, Result};
use log::trace;
use snafu::location;

use super::{list::StructuralListDecoder, primitive::StructuralPrimitiveFieldDecoder};

#[derive(Debug)]
struct SchedulingJobWithStatus<'a> {
    col_idx: u32,
    col_name: &'a str,
    job: Box<dyn SchedulingJob + 'a>,
    rows_scheduled: u64,
    rows_remaining: u64,
}

impl PartialEq for SchedulingJobWithStatus<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.col_idx == other.col_idx
    }
}

impl Eq for SchedulingJobWithStatus<'_> {}

impl PartialOrd for SchedulingJobWithStatus<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for SchedulingJobWithStatus<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
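        // Note this comparison is inverted: BinaryHeap is a max-heap, so comparing
        // `other` to `self` makes the heap yield the child with the fewest rows
        // scheduled so far.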
        other.rows_scheduled.cmp(&self.rows_scheduled)
    }
}

#[derive(Debug)]
struct EmptyStructDecodeTask {
    num_rows: u64,
}

impl DecodeArrayTask for EmptyStructDecodeTask {
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        Ok(Arc::new(StructArray::new_empty_fields(
            self.num_rows as usize,
            None,
        )))
    }
}

#[derive(Debug)]
struct EmptyStructDecoder {
    num_rows: u64,
    rows_drained: u64,
    data_type: DataType,
}

impl EmptyStructDecoder {
    fn new(num_rows: u64) -> Self {
        Self {
            num_rows,
            rows_drained: 0,
            data_type: DataType::Struct(Fields::from(Vec::<Field>::default())),
        }
    }
}

impl LogicalPageDecoder for EmptyStructDecoder {
    fn wait_for_loaded(&mut self, _loaded_need: u64) -> BoxFuture<Result<()>> {
        Box::pin(std::future::ready(Ok(())))
    }
    fn rows_loaded(&self) -> u64 {
        self.num_rows
    }
    fn rows_unloaded(&self) -> u64 {
        0
    }
    fn num_rows(&self) -> u64 {
        self.num_rows
    }
    fn rows_drained(&self) -> u64 {
        self.rows_drained
    }
    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        self.rows_drained += num_rows;
        Ok(NextDecodeTask {
            num_rows,
            task: Box::new(EmptyStructDecodeTask { num_rows }),
        })
    }
    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}

#[derive(Debug)]
struct EmptyStructSchedulerJob {
    num_rows: u64,
}

impl SchedulingJob for EmptyStructSchedulerJob {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        _priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        let empty_decoder = Box::new(EmptyStructDecoder::new(self.num_rows));
        let struct_decoder = context.locate_decoder(empty_decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(struct_decoder)],
            rows_scheduled: self.num_rows,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug)]
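/// A scheduling job for a struct column
///
/// The children are kept in a priority queue ordered by how many rows each child has
/// scheduled so far; the least-scheduled child is always scheduled next so that all
/// children advance through the scan together.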
struct SimpleStructSchedulerJob<'a> {
    scheduler: &'a SimpleStructScheduler,
    children: BinaryHeap<SchedulingJobWithStatus<'a>>,
    rows_scheduled: u64,
    num_rows: u64,
    initialized: bool,
}

impl<'a> SimpleStructSchedulerJob<'a> {
    fn new(
        scheduler: &'a SimpleStructScheduler,
        children: Vec<Box<dyn SchedulingJob + 'a>>,
        num_rows: u64,
    ) -> Self {
        let children = children
            .into_iter()
            .enumerate()
            .map(|(idx, job)| SchedulingJobWithStatus {
                col_idx: idx as u32,
                col_name: scheduler.child_fields[idx].name(),
                job,
                rows_scheduled: 0,
                rows_remaining: num_rows,
            })
            .collect::<BinaryHeap<_>>();
        Self {
            scheduler,
            children,
            rows_scheduled: 0,
            num_rows,
            initialized: false,
        }
    }
}

impl SchedulingJob for SimpleStructSchedulerJob<'_> {
    fn schedule_next(
        &mut self,
        mut context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        let mut decoders = Vec::new();
        if !self.initialized {
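            // Send the struct decoder itself before any child decoders so that it is
            // already in place to receive them.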
            let struct_decoder = Box::new(SimpleStructDecoder::new(
                self.scheduler.child_fields.clone(),
                self.num_rows,
            ));
            let struct_decoder = context.locate_decoder(struct_decoder);
            decoders.push(MessageType::DecoderReady(struct_decoder));
            self.initialized = true;
        }
        let old_rows_scheduled = self.rows_scheduled;
        while old_rows_scheduled == self.rows_scheduled {
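            // The struct's overall progress is the minimum progress across its children,
            // so keep scheduling the least-scheduled child until that minimum advances.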
            let mut next_child = self.children.pop().unwrap();
            trace!("Scheduling more rows for child {}", next_child.col_idx);
            let scoped = context.push(next_child.col_name, next_child.col_idx);
            let child_scan = next_child.job.schedule_next(scoped.context, priority)?;
            trace!(
                "Scheduled {} rows for child {}",
                child_scan.rows_scheduled,
                next_child.col_idx
            );
            next_child.rows_scheduled += child_scan.rows_scheduled;
            next_child.rows_remaining -= child_scan.rows_scheduled;
            decoders.extend(child_scan.decoders);
            self.children.push(next_child);
            self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
            context = scoped.pop();
        }
        let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
        Ok(ScheduledScanLine {
            decoders,
            rows_scheduled: struct_rows_scheduled,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug)]
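/// A scheduler for struct fields
///
/// The struct column itself holds no row data; this scheduler fans scheduling out to
/// the child field schedulers and keeps them in step via [`SimpleStructSchedulerJob`].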
pub struct SimpleStructScheduler {
    children: Vec<Arc<dyn FieldScheduler>>,
    child_fields: Fields,
    num_rows: u64,
}

impl SimpleStructScheduler {
    pub fn new(
        children: Vec<Arc<dyn FieldScheduler>>,
        child_fields: Fields,
        num_rows: u64,
    ) -> Self {
        let num_rows = children
            .first()
            .map(|child| child.num_rows())
            .unwrap_or(num_rows);
        debug_assert!(children.iter().all(|child| child.num_rows() == num_rows));
        Self {
            children,
            child_fields,
            num_rows,
        }
    }
}

impl FieldScheduler for SimpleStructScheduler {
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>> {
        if self.children.is_empty() {
            return Ok(Box::new(EmptyStructSchedulerJob {
                num_rows: ranges.iter().map(|r| r.end - r.start).sum(),
            }));
        }
        let child_schedulers = self
            .children
            .iter()
            .map(|child| child.schedule_ranges(ranges, filter))
            .collect::<Result<Vec<_>>>()?;
        let num_rows = child_schedulers[0].num_rows();
        Ok(Box::new(SimpleStructSchedulerJob::new(
            self,
            child_schedulers,
            num_rows,
        )))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn initialize<'a>(
        &'a self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        let futures = self
            .children
            .iter()
            .map(|child| child.initialize(filter, context))
            .collect::<FuturesUnordered<_>>();
        async move {
            futures
                .map(|res| res.map(|_| ()))
                .try_collect::<Vec<_>>()
                .await?;
            Ok(())
        }
        .boxed()
    }
}

#[derive(Debug)]
struct StructuralSchedulingJobWithStatus<'a> {
    col_idx: u32,
    col_name: &'a str,
    job: Box<dyn StructuralSchedulingJob + 'a>,
    rows_scheduled: u64,
    rows_remaining: u64,
}

impl PartialEq for StructuralSchedulingJobWithStatus<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.col_idx == other.col_idx
    }
}

impl Eq for StructuralSchedulingJobWithStatus<'_> {}

impl PartialOrd for StructuralSchedulingJobWithStatus<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for StructuralSchedulingJobWithStatus<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
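        // Inverted for the same reason as `SchedulingJobWithStatus`: the max-heap then
        // behaves as a min-heap over `rows_scheduled`.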
        other.rows_scheduled.cmp(&self.rows_scheduled)
    }
}

#[derive(Debug)]
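/// A scheduling job for structs in the structural (repetition/definition based) encoding
///
/// As with [`SimpleStructSchedulerJob`], a priority queue keeps the child columns
/// advancing together, always scheduling the least-scheduled child next.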
struct RepDefStructSchedulingJob<'a> {
    children: BinaryHeap<StructuralSchedulingJobWithStatus<'a>>,
    rows_scheduled: u64,
}

impl<'a> RepDefStructSchedulingJob<'a> {
    fn new(
        scheduler: &'a StructuralStructScheduler,
        children: Vec<Box<dyn StructuralSchedulingJob + 'a>>,
        num_rows: u64,
    ) -> Self {
        let children = children
            .into_iter()
            .enumerate()
            .map(|(idx, job)| StructuralSchedulingJobWithStatus {
                col_idx: idx as u32,
                col_name: scheduler.child_fields[idx].name(),
                job,
                rows_scheduled: 0,
                rows_remaining: num_rows,
            })
            .collect::<BinaryHeap<_>>();
        Self {
            children,
            rows_scheduled: 0,
        }
    }
}

impl StructuralSchedulingJob for RepDefStructSchedulingJob<'_> {
    fn schedule_next(
        &mut self,
        mut context: &mut SchedulerContext,
    ) -> Result<Option<ScheduledScanLine>> {
        let mut decoders = Vec::new();
        let old_rows_scheduled = self.rows_scheduled;
        while old_rows_scheduled == self.rows_scheduled {
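            // Keep scheduling the least-scheduled child until the minimum across all
            // children (the struct's own progress) moves forward.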
            let mut next_child = self.children.pop().unwrap();
            let scoped = context.push(next_child.col_name, next_child.col_idx);
            let child_scan = next_child.job.schedule_next(scoped.context)?;
            if child_scan.is_none() {
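                // If any child has nothing left to schedule then the struct as a whole
                // is finished scheduling.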
                return Ok(None);
            }
            let child_scan = child_scan.unwrap();

            trace!(
                "Scheduled {} rows for child {}",
                child_scan.rows_scheduled,
                next_child.col_idx
            );
            next_child.rows_scheduled += child_scan.rows_scheduled;
            next_child.rows_remaining -= child_scan.rows_scheduled;
            decoders.extend(child_scan.decoders);
            self.children.push(next_child);
            self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
            context = scoped.pop();
        }
        let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
        Ok(Some(ScheduledScanLine {
            decoders,
            rows_scheduled: struct_rows_scheduled,
        }))
    }
}

#[derive(Debug)]
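/// A structural scheduler for struct fields
///
/// Creates a [`RepDefStructSchedulingJob`] over the child schedulers.  Structs with no
/// children are not supported on this path (see the `debug_assert` in `new`).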
pub struct StructuralStructScheduler {
    children: Vec<Box<dyn StructuralFieldScheduler>>,
    child_fields: Fields,
}

impl StructuralStructScheduler {
    pub fn new(children: Vec<Box<dyn StructuralFieldScheduler>>, child_fields: Fields) -> Self {
        debug_assert!(!children.is_empty());
        Self {
            children,
            child_fields,
        }
    }
}

impl StructuralFieldScheduler for StructuralStructScheduler {
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();

        let child_schedulers = self
            .children
            .iter()
            .map(|child| child.schedule_ranges(ranges, filter))
            .collect::<Result<Vec<_>>>()?;

        Ok(Box::new(RepDefStructSchedulingJob::new(
            self,
            child_schedulers,
            num_rows,
        )))
    }

    fn initialize<'a>(
        &'a mut self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        let children_initialization = self
            .children
            .iter_mut()
            .map(|child| child.initialize(filter, context))
            .collect::<FuturesUnordered<_>>();
        async move {
            children_initialization
                .map(|res| res.map(|_| ()))
                .try_collect::<Vec<_>>()
                .await?;
            Ok(())
        }
        .boxed()
    }
}

#[derive(Debug)]
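/// Decode state for a single child of a struct
///
/// Tracks the queue of page decoders scheduled for the child along with how many of its
/// rows have been loaded, drained, and popped so far.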
struct ChildState {
    scheduled: VecDeque<Box<dyn LogicalPageDecoder>>,
    rows_loaded: u64,
    rows_drained: u64,
    rows_popped: u64,
    num_rows: u64,
    field_index: u32,
}

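/// A decode task assembled from the tasks of one or more pages of a single child
/// (a single drain may cross page boundaries)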
struct CompositeDecodeTask {
    tasks: Vec<Box<dyn DecodeArrayTask>>,
    num_rows: u64,
    has_more: bool,
}

impl CompositeDecodeTask {
    fn decode(self) -> Result<ArrayRef> {
        let arrays = self
            .tasks
            .into_iter()
            .map(|task| task.decode())
            .collect::<Result<Vec<_>>>()?;
        let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
        Ok(arrow_select::concat::concat(&array_refs)?)
    }
}

impl ChildState {
    fn new(num_rows: u64, field_index: u32) -> Self {
        Self {
            scheduled: VecDeque::new(),
            rows_loaded: 0,
            rows_drained: 0,
            rows_popped: 0,
            num_rows,
            field_index,
        }
    }

    async fn wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
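        // `loaded_need` is an exclusive threshold: we wait until strictly more than
        // `loaded_need` rows are fully loaded for this child.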
        trace!(
            "Struct child {} waiting for more than {} rows to be loaded and {} are fully loaded already",
            self.field_index,
            loaded_need,
            self.rows_loaded,
        );
        let mut fully_loaded = self.rows_popped;
        for (page_idx, next_decoder) in self.scheduled.iter_mut().enumerate() {
            if next_decoder.rows_unloaded() > 0 {
                let mut current_need = loaded_need;
                current_need -= fully_loaded;
                let rows_in_page = next_decoder.num_rows();
                let need_for_page = (rows_in_page - 1).min(current_need);
                trace!(
                    "Struct child {} page {} will wait until more than {} rows loaded from page with {} rows",
                    self.field_index,
                    page_idx,
                    need_for_page,
                    rows_in_page,
                );
                next_decoder.wait_for_loaded(need_for_page).await?;
                let now_loaded = next_decoder.rows_loaded();
                fully_loaded += now_loaded;
                trace!(
                    "Struct child {} page {} await and now has {} loaded rows and we have {} fully loaded",
                    self.field_index,
                    page_idx,
                    now_loaded,
                    fully_loaded
                );
            } else {
                fully_loaded += next_decoder.num_rows();
            }
            if fully_loaded > loaded_need {
                break;
            }
        }
        self.rows_loaded = fully_loaded;
        trace!(
            "Struct child {} loaded {} new rows and now {} are loaded",
            self.field_index,
            fully_loaded,
            self.rows_loaded
        );
        Ok(())
    }

    fn drain(&mut self, num_rows: u64) -> Result<CompositeDecodeTask> {
        trace!("Struct draining {} rows", num_rows);

        trace!(
            "Draining {} rows from struct page with {} rows already drained",
            num_rows,
            self.rows_drained
        );
        let mut remaining = num_rows;
        let mut composite = CompositeDecodeTask {
            tasks: Vec::new(),
            num_rows: 0,
            has_more: true,
        };
        while remaining > 0 {
            let next = self.scheduled.front_mut().unwrap();
            let rows_to_take = remaining.min(next.rows_left());
            let next_task = next.drain(rows_to_take)?;
            if next.rows_left() == 0 {
                trace!("Completely drained page");
                self.rows_popped += next.num_rows();
                self.scheduled.pop_front();
            }
            remaining -= rows_to_take;
            composite.tasks.push(next_task.task);
            composite.num_rows += next_task.num_rows;
        }
        self.rows_drained += num_rows;
        composite.has_more = self.rows_drained != self.num_rows;
        Ok(composite)
    }
}

struct WaitOrder<'a>(&'a mut ChildState);

impl Eq for WaitOrder<'_> {}
impl PartialEq for WaitOrder<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.0.rows_loaded == other.0.rows_loaded
    }
}
impl Ord for WaitOrder<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        other.0.rows_loaded.cmp(&self.0.rows_loaded)
    }
}
impl PartialOrd for WaitOrder<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

#[derive(Debug)]
pub struct StructuralStructDecoder {
    children: Vec<Box<dyn StructuralFieldDecoder>>,
    data_type: DataType,
    child_fields: Fields,
    is_root: bool,
}

impl StructuralStructDecoder {
    pub fn new(fields: Fields, should_validate: bool, is_root: bool) -> Self {
        let children = fields
            .iter()
            .map(|field| Self::field_to_decoder(field, should_validate))
            .collect();
        let data_type = DataType::Struct(fields.clone());
        Self {
            data_type,
            children,
            child_fields: fields,
            is_root,
        }
    }

    fn field_to_decoder(
        field: &Arc<arrow_schema::Field>,
        should_validate: bool,
    ) -> Box<dyn StructuralFieldDecoder> {
        match field.data_type() {
            DataType::Struct(fields) => {
                if field.is_packed_struct() {
                    let decoder = StructuralPrimitiveFieldDecoder::new(field, should_validate);
                    Box::new(decoder)
                } else {
                    Box::new(Self::new(fields.clone(), should_validate, false))
                }
            }
            DataType::List(child_field) | DataType::LargeList(child_field) => {
                let child_decoder = Self::field_to_decoder(child_field, should_validate);
                Box::new(StructuralListDecoder::new(
                    child_decoder,
                    field.data_type().clone(),
                ))
            }
            DataType::RunEndEncoded(_, _) => todo!(),
            DataType::ListView(_) | DataType::LargeListView(_) => todo!(),
            DataType::Map(_, _) => todo!(),
            DataType::Union(_, _) => todo!(),
            _ => Box::new(StructuralPrimitiveFieldDecoder::new(field, should_validate)),
        }
    }

    pub fn drain_batch_task(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        let array_drain = self.drain(num_rows)?;
        Ok(NextDecodeTask {
            num_rows,
            task: Box::new(array_drain),
        })
    }
}

impl StructuralFieldDecoder for StructuralStructDecoder {
    fn accept_page(&mut self, mut child: LoadedPage) -> Result<()> {
        let child_idx = child.path.pop_front().unwrap();
        self.children[child_idx as usize].accept_page(child)?;
        Ok(())
    }

    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
        let child_tasks = self
            .children
            .iter_mut()
            .map(|child| child.drain(num_rows))
            .collect::<Result<Vec<_>>>()?;
        Ok(Box::new(RepDefStructDecodeTask {
            children: child_tasks,
            child_fields: self.child_fields.clone(),
            is_root: self.is_root,
        }))
    }

    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}

#[derive(Debug)]
struct RepDefStructDecodeTask {
    children: Vec<Box<dyn StructuralDecodeArrayTask>>,
    child_fields: Fields,
    is_root: bool,
}

impl StructuralDecodeArrayTask for RepDefStructDecodeTask {
    fn decode(self: Box<Self>) -> Result<DecodedArray> {
        let arrays = self
            .children
            .into_iter()
            .map(|task| task.decode())
            .collect::<Result<Vec<_>>>()?;
        let mut children = Vec::with_capacity(arrays.len());
        let mut arrays_iter = arrays.into_iter();
        let first_array = arrays_iter.next().unwrap();
        let length = first_array.array.len();

        let mut repdef = first_array.repdef;
        children.push(first_array.array);

        for array in arrays_iter {
            debug_assert_eq!(length, array.array.len());
            children.push(array.array);
        }

        let validity = if self.is_root {
            None
        } else {
            repdef.unravel_validity(length)
        };
        let array = StructArray::new(self.child_fields, children, validity);
        Ok(DecodedArray {
            array: Arc::new(array),
            repdef,
        })
    }
}

#[derive(Debug)]
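/// A logical decoder for struct fields that stitches the child decoders back together
/// into struct rows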
pub struct SimpleStructDecoder {
    children: Vec<ChildState>,
    child_fields: Fields,
    data_type: DataType,
    num_rows: u64,
}

impl SimpleStructDecoder {
    pub fn new(child_fields: Fields, num_rows: u64) -> Self {
        let data_type = DataType::Struct(child_fields.clone());
        Self {
            children: child_fields
                .iter()
                .enumerate()
                .map(|(idx, _)| ChildState::new(num_rows, idx as u32))
                .collect(),
            child_fields,
            data_type,
            num_rows,
        }
    }

    async fn do_wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
        let mut wait_orders = self
            .children
            .iter_mut()
            .filter_map(|child| {
                if child.rows_loaded <= loaded_need {
                    Some(WaitOrder(child))
                } else {
                    None
                }
            })
            .collect::<BinaryHeap<_>>();
        while !wait_orders.is_empty() {
            let next_waiter = wait_orders.pop().unwrap();
            let next_highest = wait_orders
                .peek()
                .map(|w| w.0.rows_loaded)
                .unwrap_or(u64::MAX);
            let limit = loaded_need.min(next_highest);
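            // Wait on the neediest child, but only until it catches up to the overall
            // goal or to the next-neediest child, then re-evaluate the heap.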
            next_waiter.0.wait_for_loaded(limit).await?;
            log::trace!(
                "Struct child {} finished await pass and now {} are loaded",
                next_waiter.0.field_index,
                next_waiter.0.rows_loaded
            );
            if next_waiter.0.rows_loaded <= loaded_need {
                wait_orders.push(next_waiter);
            }
        }
        Ok(())
    }
}

impl LogicalPageDecoder for SimpleStructDecoder {
    fn accept_child(&mut self, mut child: DecoderReady) -> Result<()> {
        let child_idx = child.path.pop_front().unwrap();
        if child.path.is_empty() {
            self.children[child_idx as usize]
                .scheduled
                .push_back(child.decoder);
        } else {
            let intended = self.children[child_idx as usize]
                .scheduled
                .back_mut()
                .ok_or_else(|| Error::Internal {
                    message: format!(
                        "Decoder scheduled for child at index {} but we don't have any child at that index yet",
                        child_idx
                    ),
                    location: location!(),
                })?;
            intended.accept_child(child)?;
        }
        Ok(())
    }

    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
        self.do_wait_for_loaded(loaded_need).boxed()
    }

    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        let child_tasks = self
            .children
            .iter_mut()
            .map(|child| child.drain(num_rows))
            .collect::<Result<Vec<_>>>()?;
        let num_rows = child_tasks[0].num_rows;
        debug_assert!(child_tasks.iter().all(|task| task.num_rows == num_rows));
        Ok(NextDecodeTask {
            task: Box::new(SimpleStructDecodeTask {
                children: child_tasks,
                child_fields: self.child_fields.clone(),
            }),
            num_rows,
        })
    }

    fn rows_loaded(&self) -> u64 {
        self.children.iter().map(|c| c.rows_loaded).min().unwrap()
    }

    fn rows_drained(&self) -> u64 {
        debug_assert!(self
            .children
            .iter()
            .all(|c| c.rows_drained == self.children[0].rows_drained));
        self.children[0].rows_drained
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}

struct SimpleStructDecodeTask {
    children: Vec<CompositeDecodeTask>,
    child_fields: Fields,
}

impl DecodeArrayTask for SimpleStructDecodeTask {
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        let child_arrays = self
            .children
            .into_iter()
            .map(|child| child.decode())
            .collect::<Result<Vec<_>>>()?;
        Ok(Arc::new(StructArray::try_new(
            self.child_fields,
            child_arrays,
            None,
        )?))
    }
}

pub struct StructStructuralEncoder {
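    // If true, validity buffers from incoming arrays are referenced directly; otherwise
    // they are deep-copied (see `maybe_encode`).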
    keep_original_array: bool,
    children: Vec<Box<dyn FieldEncoder>>,
}

impl StructStructuralEncoder {
    pub fn new(keep_original_array: bool, children: Vec<Box<dyn FieldEncoder>>) -> Self {
        Self {
            keep_original_array,
            children,
        }
    }
}

impl FieldEncoder for StructStructuralEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        mut repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        let struct_array = array.as_struct();
        if let Some(validity) = struct_array.nulls() {
            if self.keep_original_array {
                repdef.add_validity_bitmap(validity.clone())
            } else {
                repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap())
            }
        } else {
            repdef.add_no_null(struct_array.len());
        }
        let child_tasks = self
            .children
            .iter_mut()
            .zip(struct_array.columns().iter())
            .map(|(encoder, arr)| {
                encoder.maybe_encode(
                    arr.clone(),
                    external_buffers,
                    repdef.clone(),
                    row_number,
                    num_rows,
                )
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        self.children
            .iter_mut()
            .map(|encoder| encoder.flush(external_buffers))
            .flatten_ok()
            .collect::<Result<Vec<_>>>()
    }

    fn num_columns(&self) -> u32 {
        self.children
            .iter()
            .map(|child| child.num_columns())
            .sum::<u32>()
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        let mut child_columns = self
            .children
            .iter_mut()
            .map(|child| child.finish(external_buffers))
            .collect::<FuturesOrdered<_>>();
        async move {
            let mut encoded_columns = Vec::with_capacity(child_columns.len());
            while let Some(child_cols) = child_columns.next().await {
                encoded_columns.extend(child_cols?);
            }
            Ok(encoded_columns)
        }
        .boxed()
    }
}

pub struct StructFieldEncoder {
    children: Vec<Box<dyn FieldEncoder>>,
    column_index: u32,
    num_rows_seen: u64,
}

impl StructFieldEncoder {
    #[allow(dead_code)]
    pub fn new(children: Vec<Box<dyn FieldEncoder>>, column_index: u32) -> Self {
        Self {
            children,
            column_index,
            num_rows_seen: 0,
        }
    }
}

impl FieldEncoder for StructFieldEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        self.num_rows_seen += array.len() as u64;
        let struct_array = array.as_struct();
        let child_tasks = self
            .children
            .iter_mut()
            .zip(struct_array.columns().iter())
            .map(|(encoder, arr)| {
                encoder.maybe_encode(
                    arr.clone(),
                    external_buffers,
                    repdef.clone(),
                    row_number,
                    num_rows,
                )
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        let child_tasks = self
            .children
            .iter_mut()
            .map(|encoder| encoder.flush(external_buffers))
            .collect::<Result<Vec<_>>>()?;
        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
    }

    fn num_columns(&self) -> u32 {
        self.children
            .iter()
            .map(|child| child.num_columns())
            .sum::<u32>()
            + 1
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        let mut child_columns = self
            .children
            .iter_mut()
            .map(|child| child.finish(external_buffers))
            .collect::<FuturesOrdered<_>>();
        let num_rows_seen = self.num_rows_seen;
        let column_index = self.column_index;
        async move {
            let mut columns = Vec::new();
            let mut header = EncodedColumn::default();
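            // The struct itself contributes a header column: an empty legacy page whose
            // encoding is `SimpleStruct`.  The encoded child columns follow it.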
            header.final_pages.push(EncodedPage {
                data: Vec::new(),
                description: PageEncoding::Legacy(pb::ArrayEncoding {
                    array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
                        pb::SimpleStruct {},
                    )),
                }),
                num_rows: num_rows_seen,
                column_idx: column_index,
                row_number: 0,
            });
            columns.push(header);
            while let Some(child_cols) = child_columns.next().await {
                columns.extend(child_cols?);
            }
            Ok(columns)
        }
        .boxed()
    }
}

#[cfg(test)]
mod tests {

    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{
        builder::{Int32Builder, ListBuilder},
        Array, ArrayRef, Int32Array, StructArray,
    };
    use arrow_buffer::NullBuffer;
    use arrow_schema::{DataType, Field, Fields};

    use crate::{
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    #[test_log::test(tokio::test)]
    async fn test_simple_struct() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]));
        let field = Field::new("", data_type, false);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_nullable_struct() {
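        // A struct-of-struct where the outer rows, the inner struct, and a leaf column
        // all contain nulls; verify it round-trips with the 2.1 format.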
        let inner_fields = Fields::from(vec![
            Field::new("x", DataType::Int32, false),
            Field::new("y", DataType::Int32, true),
        ]);
        let inner_struct = DataType::Struct(inner_fields.clone());
        let outer_fields = Fields::from(vec![
            Field::new("score", DataType::Int32, true),
            Field::new("location", inner_struct, true),
        ]);

        let x_vals = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
        let y_vals = Int32Array::from(vec![Some(6), None, Some(8), Some(9), Some(10)]);
        let scores = Int32Array::from(vec![None, Some(12), Some(13), Some(14), Some(15)]);

        let location_validity = NullBuffer::from(vec![true, true, true, false, true]);
        let locations = StructArray::new(
            inner_fields,
            vec![Arc::new(x_vals), Arc::new(y_vals)],
            Some(location_validity),
        );

        let rows_validity = NullBuffer::from(vec![true, true, true, true, false]);
        let rows = StructArray::new(
            outer_fields,
            vec![Arc::new(scores), Arc::new(locations)],
            Some(rows_validity),
        );

        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![Arc::new(rows)], &test_cases, HashMap::new()).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_struct_list() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new(
                "inner_list",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                true,
            ),
            Field::new("outer_int", DataType::Int32, true),
        ]));
        let field = Field::new("row", data_type, false);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_empty_struct() {
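        // It is legal to have a struct with no child fields; only the row count needs to
        // round-trip.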
        let data_type = DataType::Struct(Fields::from(Vec::<Field>::default()));
        let field = Field::new("row", data_type, false);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_complicated_struct() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("int", DataType::Int32, true),
            Field::new(
                "inner",
                DataType::Struct(Fields::from(vec![
                    Field::new("inner_int", DataType::Int32, true),
                    Field::new(
                        "inner_list",
                        DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                        true,
                    ),
                ])),
                true,
            ),
            Field::new("outer_binary", DataType::Binary, true),
        ]));
        let field = Field::new("row", data_type, false);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_ragged_scheduling() {
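        // Write the data in 437-row slices so batch boundaries do not line up with the
        // underlying arrays, exercising ragged scheduling.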
        let items_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        for _ in 0..10000 {
            list_builder.append_null();
        }
        let list_array = Arc::new(list_builder.finish());
        let int_array = Arc::new(Int32Array::from_iter_values(0..10000));
        let fields = vec![
            Field::new("", list_array.data_type().clone(), true),
            Field::new("", int_array.data_type().clone(), true),
        ];
        let struct_array = Arc::new(StructArray::new(
            Fields::from(fields),
            vec![list_array, int_array],
            None,
        )) as ArrayRef;
        let struct_arrays = (0..10000)
            .step_by(437)
            .map(|offset| struct_array.slice(offset, 437.min(10000 - offset)))
            .collect::<Vec<_>>();
        check_round_trip_encoding_of_data(struct_arrays, &TestCases::default(), HashMap::new())
            .await;
    }
}