lance_encoding/encodings/logical/
struct.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::{BinaryHeap, VecDeque},
6    ops::Range,
7    sync::Arc,
8};
9
10use arrow_array::{cast::AsArray, Array, ArrayRef, StructArray};
11use arrow_schema::{DataType, Fields};
12use futures::{
13    future::BoxFuture,
14    stream::{FuturesOrdered, FuturesUnordered},
15    FutureExt, StreamExt, TryStreamExt,
16};
17use itertools::Itertools;
18use lance_arrow::FieldExt;
19use log::trace;
20use snafu::location;
21
22use crate::{
23    decoder::{
24        DecodeArrayTask, DecodedArray, DecoderReady, FieldScheduler, FilterExpression, LoadedPage,
25        LogicalPageDecoder, MessageType, NextDecodeTask, PageEncoding, PriorityRange,
26        ScheduledScanLine, SchedulerContext, SchedulingJob, StructuralDecodeArrayTask,
27        StructuralFieldDecoder, StructuralFieldScheduler, StructuralSchedulingJob,
28    },
29    encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
30    format::pb,
31    repdef::RepDefBuilder,
32};
33use lance_core::{Error, Result};
34
35use super::{list::StructuralListDecoder, primitive::StructuralPrimitiveFieldDecoder};
36
/// A child scheduling job plus the bookkeeping `SimpleStructSchedulerJob` needs in order
/// to interleave it with its sibling columns.
#[derive(Debug)]
struct SchedulingJobWithStatus<'a> {
    // Position of the child within the struct; passed to `SchedulerContext::push`
    col_idx: u32,
    // Name of the child field; passed to `SchedulerContext::push`
    col_name: &'a str,
    // The child's own scheduling job
    job: Box<dyn SchedulingJob + 'a>,
    // Rows scheduled for this child so far (this is the heap key)
    rows_scheduled: u64,
    // Rows not yet scheduled for this child
    rows_remaining: u64,
}
45
46impl PartialEq for SchedulingJobWithStatus<'_> {
47    fn eq(&self, other: &Self) -> bool {
48        self.col_idx == other.col_idx
49    }
50}
51
52impl Eq for SchedulingJobWithStatus<'_> {}
53
54impl PartialOrd for SchedulingJobWithStatus<'_> {
55    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
56        Some(self.cmp(other))
57    }
58}
59
60impl Ord for SchedulingJobWithStatus<'_> {
61    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
62        // Note this is reversed to make it min-heap
63        other.rows_scheduled.cmp(&self.rows_scheduled)
64    }
65}
66
/// Scheduling job for struct data
///
/// The order in which we schedule the children is important.  We want to schedule the child
/// with the least amount of data first.
///
/// This allows us to decode entire rows as quickly as possible
#[derive(Debug)]
struct SimpleStructSchedulerJob<'a> {
    // The scheduler that created this job; supplies the child fields
    scheduler: &'a SimpleStructScheduler,
    /// A min-heap whose key is the # of rows currently scheduled
    children: BinaryHeap<SchedulingJobWithStatus<'a>>,
    // Complete struct rows scheduled so far (the minimum over all children)
    rows_scheduled: u64,
    // Total rows this job covers
    num_rows: u64,
    // Whether the `SimpleStructDecoder` message has already been emitted
    initialized: bool,
}
82
83impl<'a> SimpleStructSchedulerJob<'a> {
84    fn new(
85        scheduler: &'a SimpleStructScheduler,
86        children: Vec<Box<dyn SchedulingJob + 'a>>,
87        num_rows: u64,
88    ) -> Self {
89        let children = children
90            .into_iter()
91            .enumerate()
92            .map(|(idx, job)| SchedulingJobWithStatus {
93                col_idx: idx as u32,
94                col_name: scheduler.child_fields[idx].name(),
95                job,
96                rows_scheduled: 0,
97                rows_remaining: num_rows,
98            })
99            .collect::<BinaryHeap<_>>();
100        Self {
101            scheduler,
102            children,
103            rows_scheduled: 0,
104            num_rows,
105            initialized: false,
106        }
107    }
108}
109
impl SchedulingJob for SimpleStructSchedulerJob<'_> {
    /// Schedules children until at least one more complete struct row has been scheduled.
    ///
    /// On the first call this also emits a `DecoderReady` message carrying a new
    /// `SimpleStructDecoder`, ahead of any child decoders. Children are popped from a
    /// min-heap keyed on `rows_scheduled`, so the child that is furthest behind is always
    /// advanced next. The struct's own `rows_scheduled` is the heap top, i.e. the minimum
    /// over all children.
    ///
    /// Returns every decoder produced during this call, along with the number of new
    /// complete struct rows.
    ///
    /// NOTE(review): this panics if there are no children (`pop().unwrap()`). The loop also
    /// never ends if the least-scheduled child keeps scheduling 0 rows. Presumably callers
    /// stop once `num_rows` rows have been scheduled; confirm.
    fn schedule_next(
        &mut self,
        mut context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        let mut decoders = Vec::new();
        if !self.initialized {
            // Send info to the decoder thread so it knows a struct is here.  In the future we will also
            // send validity info here.
            let struct_decoder = Box::new(SimpleStructDecoder::new(
                self.scheduler.child_fields.clone(),
                self.num_rows,
            ));
            let struct_decoder = context.locate_decoder(struct_decoder);
            decoders.push(MessageType::DecoderReady(struct_decoder));
            self.initialized = true;
        }
        let old_rows_scheduled = self.rows_scheduled;
        // Schedule as many children as we need to until we have scheduled at least one
        // complete row
        while old_rows_scheduled == self.rows_scheduled {
            // The child with the fewest rows scheduled (top of the min-heap)
            let mut next_child = self.children.pop().unwrap();
            trace!("Scheduling more rows for child {}", next_child.col_idx);
            // Push this child's name/index onto the context while it schedules
            let scoped = context.push(next_child.col_name, next_child.col_idx);
            let child_scan = next_child.job.schedule_next(scoped.context, priority)?;
            trace!(
                "Scheduled {} rows for child {}",
                child_scan.rows_scheduled,
                next_child.col_idx
            );
            next_child.rows_scheduled += child_scan.rows_scheduled;
            next_child.rows_remaining -= child_scan.rows_scheduled;
            decoders.extend(child_scan.decoders);
            // Re-insert under its new key; the struct has only scheduled as many complete
            // rows as its least-scheduled child
            self.children.push(next_child);
            self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
            context = scoped.pop();
        }
        let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
        Ok(ScheduledScanLine {
            decoders,
            rows_scheduled: struct_rows_scheduled,
        })
    }

    /// Total number of rows this job covers.
    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}
159
/// A scheduler for structs
///
/// The implementation is actually a bit more tricky than one might initially think.  We can't just
/// go through and schedule each column one after the other.  This would mean our decode can't start
/// until nearly all the data has arrived (since we need data from each column to yield a batch).
///
/// Instead, we schedule in row-major fashion
///
/// Note: this scheduler is the starting point for all decoding.  This is because we treat the top-level
/// record batch as a non-nullable struct.
#[derive(Debug)]
pub struct SimpleStructScheduler {
    // One scheduler per child field, in field order
    children: Vec<Arc<dyn FieldScheduler>>,
    // The struct's child fields (names are used when scoping the scheduler context)
    child_fields: Fields,
    // Row count of the struct, taken from the first child in `new`
    num_rows: u64,
}
176
177impl SimpleStructScheduler {
178    pub fn new(children: Vec<Arc<dyn FieldScheduler>>, child_fields: Fields) -> Self {
179        debug_assert!(!children.is_empty());
180        let num_rows = children[0].num_rows();
181        debug_assert!(children.iter().all(|child| child.num_rows() == num_rows));
182        Self {
183            children,
184            child_fields,
185            num_rows,
186        }
187    }
188}
189
190impl FieldScheduler for SimpleStructScheduler {
191    fn schedule_ranges<'a>(
192        &'a self,
193        ranges: &[Range<u64>],
194        filter: &FilterExpression,
195    ) -> Result<Box<dyn SchedulingJob + 'a>> {
196        let child_schedulers = self
197            .children
198            .iter()
199            .map(|child| child.schedule_ranges(ranges, filter))
200            .collect::<Result<Vec<_>>>()?;
201        let num_rows = child_schedulers[0].num_rows();
202        Ok(Box::new(SimpleStructSchedulerJob::new(
203            self,
204            child_schedulers,
205            num_rows,
206        )))
207    }
208
209    fn num_rows(&self) -> u64 {
210        self.num_rows
211    }
212
213    fn initialize<'a>(
214        &'a self,
215        _filter: &'a FilterExpression,
216        _context: &'a SchedulerContext,
217    ) -> BoxFuture<'a, Result<()>> {
218        let futures = self
219            .children
220            .iter()
221            .map(|child| child.initialize(_filter, _context))
222            .collect::<FuturesUnordered<_>>();
223        async move {
224            futures
225                .map(|res| res.map(|_| ()))
226                .try_collect::<Vec<_>>()
227                .await?;
228            Ok(())
229        }
230        .boxed()
231    }
232}
233
/// A structural child scheduling job plus the bookkeeping `RepDefStructSchedulingJob`
/// needs in order to interleave it with its sibling columns.
#[derive(Debug)]
struct StructuralSchedulingJobWithStatus<'a> {
    // Position of the child within the struct; passed to `SchedulerContext::push`
    col_idx: u32,
    // Name of the child field; passed to `SchedulerContext::push`
    col_name: &'a str,
    // The child's own scheduling job
    job: Box<dyn StructuralSchedulingJob + 'a>,
    // Rows scheduled for this child so far (this is the heap key)
    rows_scheduled: u64,
    // Rows not yet scheduled for this child
    rows_remaining: u64,
}
242
243impl PartialEq for StructuralSchedulingJobWithStatus<'_> {
244    fn eq(&self, other: &Self) -> bool {
245        self.col_idx == other.col_idx
246    }
247}
248
249impl Eq for StructuralSchedulingJobWithStatus<'_> {}
250
251impl PartialOrd for StructuralSchedulingJobWithStatus<'_> {
252    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
253        Some(self.cmp(other))
254    }
255}
256
257impl Ord for StructuralSchedulingJobWithStatus<'_> {
258    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
259        // Note this is reversed to make it min-heap
260        other.rows_scheduled.cmp(&self.rows_scheduled)
261    }
262}
263
/// Scheduling job for struct data
///
/// The order in which we schedule the children is important.  We want to schedule the child
/// with the least amount of data first.
///
/// This allows us to decode entire rows as quickly as possible
#[derive(Debug)]
struct RepDefStructSchedulingJob<'a> {
    /// A min-heap whose key is the # of rows currently scheduled
    children: BinaryHeap<StructuralSchedulingJobWithStatus<'a>>,
    // Complete struct rows scheduled so far (the minimum over all children)
    rows_scheduled: u64,
}
276
277impl<'a> RepDefStructSchedulingJob<'a> {
278    fn new(
279        scheduler: &'a StructuralStructScheduler,
280        children: Vec<Box<dyn StructuralSchedulingJob + 'a>>,
281        num_rows: u64,
282    ) -> Self {
283        let children = children
284            .into_iter()
285            .enumerate()
286            .map(|(idx, job)| StructuralSchedulingJobWithStatus {
287                col_idx: idx as u32,
288                col_name: scheduler.child_fields[idx].name(),
289                job,
290                rows_scheduled: 0,
291                rows_remaining: num_rows,
292            })
293            .collect::<BinaryHeap<_>>();
294        Self {
295            children,
296            rows_scheduled: 0,
297        }
298    }
299}
300
301impl StructuralSchedulingJob for RepDefStructSchedulingJob<'_> {
302    fn schedule_next(
303        &mut self,
304        mut context: &mut SchedulerContext,
305    ) -> Result<Option<ScheduledScanLine>> {
306        let mut decoders = Vec::new();
307        let old_rows_scheduled = self.rows_scheduled;
308        // Schedule as many children as we need to until we have scheduled at least one
309        // complete row
310        while old_rows_scheduled == self.rows_scheduled {
311            let mut next_child = self.children.pop().unwrap();
312            let scoped = context.push(next_child.col_name, next_child.col_idx);
313            let child_scan = next_child.job.schedule_next(scoped.context)?;
314            // next_child is the least-scheduled child and, if it's done, that
315            // means we are completely done.
316            if child_scan.is_none() {
317                return Ok(None);
318            }
319            let child_scan = child_scan.unwrap();
320
321            trace!(
322                "Scheduled {} rows for child {}",
323                child_scan.rows_scheduled,
324                next_child.col_idx
325            );
326            next_child.rows_scheduled += child_scan.rows_scheduled;
327            next_child.rows_remaining -= child_scan.rows_scheduled;
328            decoders.extend(child_scan.decoders);
329            self.children.push(next_child);
330            self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
331            context = scoped.pop();
332        }
333        let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
334        Ok(Some(ScheduledScanLine {
335            decoders,
336            rows_scheduled: struct_rows_scheduled,
337        }))
338    }
339}
340
/// A scheduler for structs
///
/// The implementation is actually a bit more tricky than one might initially think.  We can't just
/// go through and schedule each column one after the other.  This would mean our decode can't start
/// until nearly all the data has arrived (since we need data from each column to yield a batch)
///
/// Instead, we schedule in row-major fashion
///
/// Note: this scheduler is the starting point for all decoding.  This is because we treat the top-level
/// record batch as a non-nullable struct.
#[derive(Debug)]
pub struct StructuralStructScheduler {
    // One structural scheduler per child field, in field order
    children: Vec<Box<dyn StructuralFieldScheduler>>,
    // The struct's child fields (names are used when scoping the scheduler context)
    child_fields: Fields,
}
356
357impl StructuralStructScheduler {
358    pub fn new(children: Vec<Box<dyn StructuralFieldScheduler>>, child_fields: Fields) -> Self {
359        debug_assert!(!children.is_empty());
360        Self {
361            children,
362            child_fields,
363        }
364    }
365}
366
367impl StructuralFieldScheduler for StructuralStructScheduler {
368    fn schedule_ranges<'a>(
369        &'a self,
370        ranges: &[Range<u64>],
371        filter: &FilterExpression,
372    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
373        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
374
375        let child_schedulers = self
376            .children
377            .iter()
378            .map(|child| child.schedule_ranges(ranges, filter))
379            .collect::<Result<Vec<_>>>()?;
380
381        Ok(Box::new(RepDefStructSchedulingJob::new(
382            self,
383            child_schedulers,
384            num_rows,
385        )))
386    }
387
388    fn initialize<'a>(
389        &'a mut self,
390        filter: &'a FilterExpression,
391        context: &'a SchedulerContext,
392    ) -> BoxFuture<'a, Result<()>> {
393        let children_initialization = self
394            .children
395            .iter_mut()
396            .map(|child| child.initialize(filter, context))
397            .collect::<FuturesUnordered<_>>();
398        async move {
399            children_initialization
400                .map(|res| res.map(|_| ()))
401                .try_collect::<Vec<_>>()
402                .await?;
403            Ok(())
404        }
405        .boxed()
406    }
407}
408
/// Per-child bookkeeping for `SimpleStructDecoder`.
///
/// Holds the queue of page decoders scheduled for one child, along with counters for how
/// many of that child's rows have been loaded, drained and popped.
#[derive(Debug)]
struct ChildState {
    // As child decoders are scheduled they are added to this queue
    // Once the decoder is fully drained it is popped from this queue
    //
    // TODO: It may be a minor perf optimization, in some rare cases, if we have a separate
    // "fully awaited but not yet drained" queue so we don't loop through fully awaited pages
    // during each call to wait.
    //
    // Note: This queue may have more than one page in it if the batch size is very large
    // or pages are very small
    // TODO: Test this case
    scheduled: VecDeque<Box<dyn LogicalPageDecoder>>,
    // Rows that have been awaited (loaded)
    rows_loaded: u64,
    // Rows that have drained
    rows_drained: u64,
    // Rows that have been popped (the decoder has been completely drained and removed from `scheduled`)
    rows_popped: u64,
    // Total number of rows in the struct
    num_rows: u64,
    // The field index in the struct (used for debugging / logging)
    field_index: u32,
}
433
/// The drained rows of a single struct child, possibly spanning several pages.
///
/// Produced by `ChildState::drain`; `decode` concatenates the per-page results.
struct CompositeDecodeTask {
    // One task per page decoder drained by `ChildState::drain`, in page order
    tasks: Vec<Box<dyn DecodeArrayTask>>,
    // Total rows across `tasks`
    num_rows: u64,
    // False once the child has drained all of its rows
    has_more: bool,
}
440
441impl CompositeDecodeTask {
442    fn decode(self) -> Result<ArrayRef> {
443        let arrays = self
444            .tasks
445            .into_iter()
446            .map(|task| task.decode())
447            .collect::<Result<Vec<_>>>()?;
448        let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
449        // TODO: If this is a primitive column we should be able to avoid this
450        // allocation + copy with "page bridging" which could save us a few CPU
451        // cycles.
452        //
453        // This optimization is probably most important for super fast storage like NVME
454        // where the page size can be smaller.
455        Ok(arrow_select::concat::concat(&array_refs)?)
456    }
457}
458
459impl ChildState {
460    fn new(num_rows: u64, field_index: u32) -> Self {
461        Self {
462            scheduled: VecDeque::new(),
463            rows_loaded: 0,
464            rows_drained: 0,
465            rows_popped: 0,
466            num_rows,
467            field_index,
468        }
469    }
470
471    // Wait for the next set of rows to arrive
472    //
473    // Wait until we have at least `loaded_need` loaded and stop as soon as we
474    // go above that limit.
475    async fn wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
476        trace!(
477            "Struct child {} waiting for more than {} rows to be loaded and {} are fully loaded already",
478            self.field_index,
479            loaded_need,
480            self.rows_loaded,
481        );
482        let mut fully_loaded = self.rows_popped;
483        for (page_idx, next_decoder) in self.scheduled.iter_mut().enumerate() {
484            if next_decoder.rows_unloaded() > 0 {
485                let mut current_need = loaded_need;
486                current_need -= fully_loaded;
487                let rows_in_page = next_decoder.num_rows();
488                let need_for_page = (rows_in_page - 1).min(current_need);
489                trace!(
490                    "Struct child {} page {} will wait until more than {} rows loaded from page with {} rows",
491                    self.field_index,
492                    page_idx,
493                    need_for_page,
494                    rows_in_page,
495                );
496                // We might only await part of a page.  This is important for things
497                // like the struct<struct<...>> case where we have one outer page, one
498                // middle page, and then a bunch of inner pages.  If we await the entire
499                // middle page then we will have to wait for all the inner pages to arrive
500                // before we can start decoding.
501                next_decoder.wait_for_loaded(need_for_page).await?;
502                let now_loaded = next_decoder.rows_loaded();
503                fully_loaded += now_loaded;
504                trace!(
505                    "Struct child {} page {} await and now has {} loaded rows and we have {} fully loaded",
506                    self.field_index,
507                    page_idx,
508                    now_loaded,
509                    fully_loaded
510                );
511            } else {
512                fully_loaded += next_decoder.num_rows();
513            }
514            if fully_loaded > loaded_need {
515                break;
516            }
517        }
518        self.rows_loaded = fully_loaded;
519        trace!(
520            "Struct child {} loaded {} new rows and now {} are loaded",
521            self.field_index,
522            fully_loaded,
523            self.rows_loaded
524        );
525        Ok(())
526    }
527
528    fn drain(&mut self, num_rows: u64) -> Result<CompositeDecodeTask> {
529        trace!("Struct draining {} rows", num_rows);
530
531        trace!(
532            "Draining {} rows from struct page with {} rows already drained",
533            num_rows,
534            self.rows_drained
535        );
536        let mut remaining = num_rows;
537        let mut composite = CompositeDecodeTask {
538            tasks: Vec::new(),
539            num_rows: 0,
540            has_more: true,
541        };
542        while remaining > 0 {
543            let next = self.scheduled.front_mut().unwrap();
544            let rows_to_take = remaining.min(next.rows_left());
545            let next_task = next.drain(rows_to_take)?;
546            if next.rows_left() == 0 {
547                trace!("Completely drained page");
548                self.rows_popped += next.num_rows();
549                self.scheduled.pop_front();
550            }
551            remaining -= rows_to_take;
552            composite.tasks.push(next_task.task);
553            composite.num_rows += next_task.num_rows;
554        }
555        self.rows_drained += num_rows;
556        composite.has_more = self.rows_drained != self.num_rows;
557        Ok(composite)
558    }
559}
560
561// Wrapper around ChildState that orders using rows_unawaited
562struct WaitOrder<'a>(&'a mut ChildState);
563
564impl Eq for WaitOrder<'_> {}
565impl PartialEq for WaitOrder<'_> {
566    fn eq(&self, other: &Self) -> bool {
567        self.0.rows_loaded == other.0.rows_loaded
568    }
569}
570impl Ord for WaitOrder<'_> {
571    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
572        // Note: this is inverted so we have a min-heap
573        other.0.rows_loaded.cmp(&self.0.rows_loaded)
574    }
575}
576impl PartialOrd for WaitOrder<'_> {
577    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
578        Some(self.cmp(other))
579    }
580}
581
/// Structural decoder for a struct field.
///
/// Routes loaded pages to the child decoders using the first element of each page's path.
/// On drain, it produces a task that combines the child arrays into a `StructArray`.
#[derive(Debug)]
pub struct StructuralStructDecoder {
    // One decoder per child field, in field order
    children: Vec<Box<dyn StructuralFieldDecoder>>,
    // `DataType::Struct` of the child fields
    data_type: DataType,
    // The struct's child fields
    child_fields: Fields,
    // The root decoder is slightly different because it cannot have nulls
    is_root: bool,
}
590
591impl StructuralStructDecoder {
592    pub fn new(fields: Fields, should_validate: bool, is_root: bool) -> Self {
593        let children = fields
594            .iter()
595            .map(|field| Self::field_to_decoder(field, should_validate))
596            .collect();
597        let data_type = DataType::Struct(fields.clone());
598        Self {
599            data_type,
600            children,
601            child_fields: fields,
602            is_root,
603        }
604    }
605
606    fn field_to_decoder(
607        field: &Arc<arrow_schema::Field>,
608        should_validate: bool,
609    ) -> Box<dyn StructuralFieldDecoder> {
610        match field.data_type() {
611            DataType::Struct(fields) => {
612                if field.is_packed_struct() {
613                    let decoder =
614                        StructuralPrimitiveFieldDecoder::new(&field.clone(), should_validate);
615                    Box::new(decoder)
616                } else {
617                    Box::new(Self::new(fields.clone(), should_validate, false))
618                }
619            }
620            DataType::List(child_field) | DataType::LargeList(child_field) => {
621                let child_decoder = Self::field_to_decoder(child_field, should_validate);
622                Box::new(StructuralListDecoder::new(
623                    child_decoder,
624                    field.data_type().clone(),
625                ))
626            }
627            DataType::RunEndEncoded(_, _) => todo!(),
628            DataType::ListView(_) | DataType::LargeListView(_) => todo!(),
629            DataType::Map(_, _) => todo!(),
630            DataType::Union(_, _) => todo!(),
631            _ => Box::new(StructuralPrimitiveFieldDecoder::new(field, should_validate)),
632        }
633    }
634
635    pub fn drain_batch_task(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
636        let array_drain = self.drain(num_rows)?;
637        Ok(NextDecodeTask {
638            num_rows,
639            task: Box::new(array_drain),
640        })
641    }
642}
643
644impl StructuralFieldDecoder for StructuralStructDecoder {
645    fn accept_page(&mut self, mut child: LoadedPage) -> Result<()> {
646        // children with empty path should not be delivered to this method
647        let child_idx = child.path.pop_front().unwrap();
648        // This decoder is intended for one of our children
649        self.children[child_idx as usize].accept_page(child)?;
650        Ok(())
651    }
652
653    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
654        let child_tasks = self
655            .children
656            .iter_mut()
657            .map(|child| child.drain(num_rows))
658            .collect::<Result<Vec<_>>>()?;
659        Ok(Box::new(RepDefStructDecodeTask {
660            children: child_tasks,
661            child_fields: self.child_fields.clone(),
662            is_root: self.is_root,
663        }))
664    }
665
666    fn data_type(&self) -> &DataType {
667        &self.data_type
668    }
669}
670
/// Deferred decode of one drained batch of a `StructuralStructDecoder`.
#[derive(Debug)]
struct RepDefStructDecodeTask {
    // One task per child field, in field order
    children: Vec<Box<dyn StructuralDecodeArrayTask>>,
    // Fields of the output `StructArray`
    child_fields: Fields,
    // When true the output struct gets no validity buffer (the root cannot be null)
    is_root: bool,
}
677
678impl StructuralDecodeArrayTask for RepDefStructDecodeTask {
679    fn decode(self: Box<Self>) -> Result<DecodedArray> {
680        let arrays = self
681            .children
682            .into_iter()
683            .map(|task| task.decode())
684            .collect::<Result<Vec<_>>>()?;
685        let mut children = Vec::with_capacity(arrays.len());
686        let mut arrays_iter = arrays.into_iter();
687        let first_array = arrays_iter.next().unwrap();
688        let length = first_array.array.len();
689
690        // The repdef should be identical across all children at this point
691        let mut repdef = first_array.repdef;
692        children.push(first_array.array);
693
694        for array in arrays_iter {
695            debug_assert_eq!(length, array.array.len());
696            children.push(array.array);
697        }
698
699        let validity = if self.is_root {
700            None
701        } else {
702            repdef.unravel_validity(length)
703        };
704        let array = StructArray::new(self.child_fields, children, validity);
705        Ok(DecodedArray {
706            array: Arc::new(array),
707            repdef,
708        })
709    }
710}
711
/// Struct decoder for the `LogicalPageDecoder` decode path.
///
/// Tracks one `ChildState` per child field and assembles the drained child arrays into a
/// `StructArray`. `SimpleStructSchedulerJob` creates it on the first `schedule_next` call.
#[derive(Debug)]
pub struct SimpleStructDecoder {
    // One state per child field, in field order
    children: Vec<ChildState>,
    // Fields of the output `StructArray`
    child_fields: Fields,
    // `DataType::Struct(child_fields)`
    data_type: DataType,
    // Total number of rows in the struct
    num_rows: u64,
}
719
720impl SimpleStructDecoder {
721    pub fn new(child_fields: Fields, num_rows: u64) -> Self {
722        let data_type = DataType::Struct(child_fields.clone());
723        Self {
724            children: child_fields
725                .iter()
726                .enumerate()
727                .map(|(idx, _)| ChildState::new(num_rows, idx as u32))
728                .collect(),
729            child_fields,
730            data_type,
731            num_rows,
732        }
733    }
734
735    async fn do_wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
736        let mut wait_orders = self
737            .children
738            .iter_mut()
739            .filter_map(|child| {
740                if child.rows_loaded <= loaded_need {
741                    Some(WaitOrder(child))
742                } else {
743                    None
744                }
745            })
746            .collect::<BinaryHeap<_>>();
747        while !wait_orders.is_empty() {
748            let next_waiter = wait_orders.pop().unwrap();
749            let next_highest = wait_orders
750                .peek()
751                .map(|w| w.0.rows_loaded)
752                .unwrap_or(u64::MAX);
753            // Wait until you have the number of rows needed, or at least more than the
754            // next highest waiter
755            let limit = loaded_need.min(next_highest);
756            next_waiter.0.wait_for_loaded(limit).await?;
757            log::trace!(
758                "Struct child {} finished await pass and now {} are loaded",
759                next_waiter.0.field_index,
760                next_waiter.0.rows_loaded
761            );
762            if next_waiter.0.rows_loaded <= loaded_need {
763                wait_orders.push(next_waiter);
764            }
765        }
766        Ok(())
767    }
768}
769
770impl LogicalPageDecoder for SimpleStructDecoder {
771    fn accept_child(&mut self, mut child: DecoderReady) -> Result<()> {
772        // children with empty path should not be delivered to this method
773        let child_idx = child.path.pop_front().unwrap();
774        if child.path.is_empty() {
775            // This decoder is intended for us
776            self.children[child_idx as usize]
777                .scheduled
778                .push_back(child.decoder);
779        } else {
780            // This decoder is intended for one of our children
781            let intended = self.children[child_idx as usize].scheduled.back_mut().ok_or_else(|| Error::Internal { message: format!("Decoder scheduled for child at index {} but we don't have any child at that index yet", child_idx), location: location!() })?;
782            intended.accept_child(child)?;
783        }
784        Ok(())
785    }
786
787    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
788        self.do_wait_for_loaded(loaded_need).boxed()
789    }
790
791    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
792        let child_tasks = self
793            .children
794            .iter_mut()
795            .map(|child| child.drain(num_rows))
796            .collect::<Result<Vec<_>>>()?;
797        let num_rows = child_tasks[0].num_rows;
798        debug_assert!(child_tasks.iter().all(|task| task.num_rows == num_rows));
799        Ok(NextDecodeTask {
800            task: Box::new(SimpleStructDecodeTask {
801                children: child_tasks,
802                child_fields: self.child_fields.clone(),
803            }),
804            num_rows,
805        })
806    }
807
808    fn rows_loaded(&self) -> u64 {
809        self.children.iter().map(|c| c.rows_loaded).min().unwrap()
810    }
811
812    fn rows_drained(&self) -> u64 {
813        // All children should have the same number of rows drained
814        debug_assert!(self
815            .children
816            .iter()
817            .all(|c| c.rows_drained == self.children[0].rows_drained));
818        self.children[0].rows_drained
819    }
820
821    fn num_rows(&self) -> u64 {
822        self.num_rows
823    }
824
825    fn data_type(&self) -> &DataType {
826        &self.data_type
827    }
828}
829
/// Deferred decode of one drained batch of a `SimpleStructDecoder`.
struct SimpleStructDecodeTask {
    // One (possibly multi-page) task per child field, in field order
    children: Vec<CompositeDecodeTask>,
    // Fields of the output `StructArray`
    child_fields: Fields,
}
834
835impl DecodeArrayTask for SimpleStructDecodeTask {
836    fn decode(self: Box<Self>) -> Result<ArrayRef> {
837        let child_arrays = self
838            .children
839            .into_iter()
840            .map(|child| child.decode())
841            .collect::<Result<Vec<_>>>()?;
842        Ok(Arc::new(StructArray::try_new(
843            self.child_fields,
844            child_arrays,
845            None,
846        )?))
847    }
848}
849
/// A structural encoder for struct fields
///
/// The struct's validity is added to the rep/def builder
/// and the builder is cloned to all children.
pub struct StructStructuralEncoder {
    // One encoder per struct column, matched to the columns by position
    children: Vec<Box<dyn FieldEncoder>>,
}
857
impl StructStructuralEncoder {
    /// Creates an encoder that forwards each struct column to the entry of `children` at the
    /// same position.
    pub fn new(children: Vec<Box<dyn FieldEncoder>>) -> Self {
        Self { children }
    }
}
863
864impl FieldEncoder for StructStructuralEncoder {
865    fn maybe_encode(
866        &mut self,
867        array: ArrayRef,
868        external_buffers: &mut OutOfLineBuffers,
869        mut repdef: RepDefBuilder,
870        row_number: u64,
871        num_rows: u64,
872    ) -> Result<Vec<EncodeTask>> {
873        let struct_array = array.as_struct();
874        if let Some(validity) = struct_array.nulls() {
875            repdef.add_validity_bitmap(validity.clone());
876        } else {
877            repdef.add_no_null(struct_array.len());
878        }
879        let child_tasks = self
880            .children
881            .iter_mut()
882            .zip(struct_array.columns().iter())
883            .map(|(encoder, arr)| {
884                encoder.maybe_encode(
885                    arr.clone(),
886                    external_buffers,
887                    repdef.clone(),
888                    row_number,
889                    num_rows,
890                )
891            })
892            .collect::<Result<Vec<_>>>()?;
893        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
894    }
895
896    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
897        self.children
898            .iter_mut()
899            .map(|encoder| encoder.flush(external_buffers))
900            .flatten_ok()
901            .collect::<Result<Vec<_>>>()
902    }
903
904    fn num_columns(&self) -> u32 {
905        self.children
906            .iter()
907            .map(|child| child.num_columns())
908            .sum::<u32>()
909    }
910
911    fn finish(
912        &mut self,
913        external_buffers: &mut OutOfLineBuffers,
914    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
915        let mut child_columns = self
916            .children
917            .iter_mut()
918            .map(|child| child.finish(external_buffers))
919            .collect::<FuturesOrdered<_>>();
920        async move {
921            let mut encoded_columns = Vec::with_capacity(child_columns.len());
922            while let Some(child_cols) = child_columns.next().await {
923                encoded_columns.extend(child_cols?);
924            }
925            Ok(encoded_columns)
926        }
927        .boxed()
928    }
929}
930
931pub struct StructFieldEncoder {
932    children: Vec<Box<dyn FieldEncoder>>,
933    column_index: u32,
934    num_rows_seen: u64,
935}
936
937impl StructFieldEncoder {
938    #[allow(dead_code)]
939    pub fn new(children: Vec<Box<dyn FieldEncoder>>, column_index: u32) -> Self {
940        Self {
941            children,
942            column_index,
943            num_rows_seen: 0,
944        }
945    }
946}
947
948impl FieldEncoder for StructFieldEncoder {
949    fn maybe_encode(
950        &mut self,
951        array: ArrayRef,
952        external_buffers: &mut OutOfLineBuffers,
953        repdef: RepDefBuilder,
954        row_number: u64,
955        num_rows: u64,
956    ) -> Result<Vec<EncodeTask>> {
957        self.num_rows_seen += array.len() as u64;
958        let struct_array = array.as_struct();
959        let child_tasks = self
960            .children
961            .iter_mut()
962            .zip(struct_array.columns().iter())
963            .map(|(encoder, arr)| {
964                encoder.maybe_encode(
965                    arr.clone(),
966                    external_buffers,
967                    repdef.clone(),
968                    row_number,
969                    num_rows,
970                )
971            })
972            .collect::<Result<Vec<_>>>()?;
973        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
974    }
975
976    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
977        let child_tasks = self
978            .children
979            .iter_mut()
980            .map(|encoder| encoder.flush(external_buffers))
981            .collect::<Result<Vec<_>>>()?;
982        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
983    }
984
985    fn num_columns(&self) -> u32 {
986        self.children
987            .iter()
988            .map(|child| child.num_columns())
989            .sum::<u32>()
990            + 1
991    }
992
993    fn finish(
994        &mut self,
995        external_buffers: &mut OutOfLineBuffers,
996    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
997        let mut child_columns = self
998            .children
999            .iter_mut()
1000            .map(|child| child.finish(external_buffers))
1001            .collect::<FuturesOrdered<_>>();
1002        let num_rows_seen = self.num_rows_seen;
1003        let column_index = self.column_index;
1004        async move {
1005            let mut columns = Vec::new();
1006            // Add a column for the struct header
1007            let mut header = EncodedColumn::default();
1008            header.final_pages.push(EncodedPage {
1009                data: Vec::new(),
1010                description: PageEncoding::Legacy(pb::ArrayEncoding {
1011                    array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
1012                        pb::SimpleStruct {},
1013                    )),
1014                }),
1015                num_rows: num_rows_seen,
1016                column_idx: column_index,
1017                row_number: 0, // Not used by legacy encoding
1018            });
1019            columns.push(header);
1020            // Now run finish on the children
1021            while let Some(child_cols) = child_columns.next().await {
1022                columns.extend(child_cols?);
1023            }
1024            Ok(columns)
1025        }
1026        .boxed()
1027    }
1028}
1029
#[cfg(test)]
mod tests {
    // Round-trip tests for struct encoding, covering random data, hand-built
    // nullable nested structs, structs containing lists, and batches that are
    // sliced into irregular chunks.

    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{
        builder::{Int32Builder, ListBuilder},
        Array, ArrayRef, Int32Array, StructArray,
    };
    use arrow_buffer::NullBuffer;
    use arrow_schema::{DataType, Field, Fields};

    use crate::{
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    /// Round-trips random data for a non-nullable struct with two non-nullable
    /// int32 children, using the 2.0 file version.
    #[test_log::test(tokio::test)]
    async fn test_simple_struct() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]));
        let field = Field::new("", data_type, false);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    /// Round-trips a hand-built struct with nulls at several levels, using the
    /// 2.1 file version: a null top-level row, a null nested struct, and null
    /// leaf values in both the outer and inner structs.
    #[test_log::test(tokio::test)]
    async fn test_nullable_struct() {
        // Test data struct<score: int32, location: struct<x: int32, y: int32>>
        // - score: null
        //   location:
        //     x: 1
        //     y: 6
        // - score: 12
        //   location:
        //     x: 2
        //     y: null
        // - score: 13
        //   location:
        //     x: 3
        //     y: 8
        // - score: 14
        //   location: null
        // - null
        //
        // Rows 4 and 5 still carry child values underneath their nulls
        // (x=4/y=9 and score=15/x=5/y=10); those values are masked by the
        // struct validity buffers below.
        let inner_fields = Fields::from(vec![
            Field::new("x", DataType::Int32, false),
            Field::new("y", DataType::Int32, true),
        ]);
        let inner_struct = DataType::Struct(inner_fields.clone());
        let outer_fields = Fields::from(vec![
            Field::new("score", DataType::Int32, true),
            Field::new("location", inner_struct, true),
        ]);

        let x_vals = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
        let y_vals = Int32Array::from(vec![Some(6), None, Some(8), Some(9), Some(10)]);
        let scores = Int32Array::from(vec![None, Some(12), Some(13), Some(14), Some(15)]);

        // The 4th location is null
        let location_validity = NullBuffer::from(vec![true, true, true, false, true]);
        let locations = StructArray::new(
            inner_fields,
            vec![Arc::new(x_vals), Arc::new(y_vals)],
            Some(location_validity),
        );

        // The 5th top-level row is null
        let rows_validity = NullBuffer::from(vec![true, true, true, true, false]);
        let rows = StructArray::new(
            outer_fields,
            vec![Arc::new(scores), Arc::new(locations)],
            Some(rows_validity),
        );

        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![Arc::new(rows)], &test_cases, HashMap::new()).await;
    }

    /// Round-trips random data for a struct whose children are a nullable list
    /// of int32 and a nullable int32, using the 2.0 file version.
    #[test_log::test(tokio::test)]
    async fn test_struct_list() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new(
                "inner_list",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                true,
            ),
            Field::new("outer_int", DataType::Int32, true),
        ]));
        let field = Field::new("row", data_type, false);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    /// Round-trips random data for a struct mixing a primitive, a nested struct
    /// (which itself contains a list), and a binary field, using the 2.0 file
    /// version.
    #[test_log::test(tokio::test)]
    async fn test_complicated_struct() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("int", DataType::Int32, true),
            Field::new(
                "inner",
                DataType::Struct(Fields::from(vec![
                    Field::new("inner_int", DataType::Int32, true),
                    Field::new(
                        "inner_list",
                        DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                        true,
                    ),
                ])),
                true,
            ),
            Field::new("outer_binary", DataType::Binary, true),
        ]));
        let field = Field::new("row", data_type, false);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    /// Round-trips a 10k-row struct<list<int32>, int32> that is fed to the
    /// encoder as a series of irregularly sized slices.
    #[test_log::test(tokio::test)]
    async fn test_ragged_scheduling() {
        // This test covers scheduling when batches straddle page boundaries

        // Create a list column with 10k null entries
        let items_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        for _ in 0..10000 {
            list_builder.append_null();
        }
        let list_array = Arc::new(list_builder.finish());
        let int_array = Arc::new(Int32Array::from_iter_values(0..10000));
        let fields = vec![
            Field::new("", list_array.data_type().clone(), true),
            Field::new("", int_array.data_type().clone(), true),
        ];
        let struct_array = Arc::new(StructArray::new(
            Fields::from(fields),
            vec![list_array, int_array],
            None,
        )) as ArrayRef;
        let struct_arrays = (0..10000)
            // Slice into 437-row chunks (an arbitrary, non-round size) so batch
            // boundaries don't line up with page boundaries. No rows are
            // skipped; `min` clamps the final, shorter chunk.
            .step_by(437)
            .map(|offset| struct_array.slice(offset, 437.min(10000 - offset)))
            .collect::<Vec<_>>();
        check_round_trip_encoding_of_data(struct_arrays, &TestCases::default(), HashMap::new())
            .await;
    }
}