lance_encoding/encodings/logical/
struct.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::{BinaryHeap, VecDeque},
6    ops::Range,
7    sync::Arc,
8};
9
10use crate::{
11    decoder::{
12        DecodeArrayTask, DecodedArray, DecoderReady, FieldScheduler, FilterExpression, LoadedPage,
13        LogicalPageDecoder, MessageType, NextDecodeTask, PageEncoding, PriorityRange,
14        ScheduledScanLine, SchedulerContext, SchedulingJob, StructuralDecodeArrayTask,
15        StructuralFieldDecoder, StructuralFieldScheduler, StructuralSchedulingJob,
16    },
17    encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
18    format::pb,
19    repdef::RepDefBuilder,
20};
21use arrow_array::{cast::AsArray, Array, ArrayRef, StructArray};
22use arrow_schema::{DataType, Field, Fields};
23use futures::{
24    future::BoxFuture,
25    stream::{FuturesOrdered, FuturesUnordered},
26    FutureExt, StreamExt, TryStreamExt,
27};
28use itertools::Itertools;
29use lance_arrow::deepcopy::deep_copy_nulls;
30use lance_arrow::FieldExt;
31use lance_core::{Error, Result};
32use log::trace;
33use snafu::location;
34
35use super::{list::StructuralListDecoder, primitive::StructuralPrimitiveFieldDecoder};
36
37#[derive(Debug)]
38struct SchedulingJobWithStatus<'a> {
39    col_idx: u32,
40    col_name: &'a str,
41    job: Box<dyn SchedulingJob + 'a>,
42    rows_scheduled: u64,
43    rows_remaining: u64,
44}
45
46impl PartialEq for SchedulingJobWithStatus<'_> {
47    fn eq(&self, other: &Self) -> bool {
48        self.col_idx == other.col_idx
49    }
50}
51
52impl Eq for SchedulingJobWithStatus<'_> {}
53
54impl PartialOrd for SchedulingJobWithStatus<'_> {
55    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
56        Some(self.cmp(other))
57    }
58}
59
60impl Ord for SchedulingJobWithStatus<'_> {
61    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
62        // Note this is reversed to make it min-heap
63        other.rows_scheduled.cmp(&self.rows_scheduled)
64    }
65}
66
67#[derive(Debug)]
68struct EmptyStructDecodeTask {
69    num_rows: u64,
70}
71
72impl DecodeArrayTask for EmptyStructDecodeTask {
73    fn decode(self: Box<Self>) -> Result<ArrayRef> {
74        Ok(Arc::new(StructArray::new_empty_fields(
75            self.num_rows as usize,
76            None,
77        )))
78    }
79}
80
81#[derive(Debug)]
82struct EmptyStructDecoder {
83    num_rows: u64,
84    rows_drained: u64,
85    data_type: DataType,
86}
87
88impl EmptyStructDecoder {
89    fn new(num_rows: u64) -> Self {
90        Self {
91            num_rows,
92            rows_drained: 0,
93            data_type: DataType::Struct(Fields::from(Vec::<Field>::default())),
94        }
95    }
96}
97
98impl LogicalPageDecoder for EmptyStructDecoder {
99    fn wait_for_loaded(&mut self, _loaded_need: u64) -> BoxFuture<Result<()>> {
100        Box::pin(std::future::ready(Ok(())))
101    }
102    fn rows_loaded(&self) -> u64 {
103        self.num_rows
104    }
105    fn rows_unloaded(&self) -> u64 {
106        0
107    }
108    fn num_rows(&self) -> u64 {
109        self.num_rows
110    }
111    fn rows_drained(&self) -> u64 {
112        self.rows_drained
113    }
114    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
115        self.rows_drained += num_rows;
116        Ok(NextDecodeTask {
117            num_rows,
118            task: Box::new(EmptyStructDecodeTask { num_rows }),
119        })
120    }
121    fn data_type(&self) -> &DataType {
122        &self.data_type
123    }
124}
125
126#[derive(Debug)]
127struct EmptyStructSchedulerJob {
128    num_rows: u64,
129}
130
131impl SchedulingJob for EmptyStructSchedulerJob {
132    fn schedule_next(
133        &mut self,
134        context: &mut SchedulerContext,
135        _priority: &dyn PriorityRange,
136    ) -> Result<ScheduledScanLine> {
137        let empty_decoder = Box::new(EmptyStructDecoder::new(self.num_rows));
138        let struct_decoder = context.locate_decoder(empty_decoder);
139        Ok(ScheduledScanLine {
140            decoders: vec![MessageType::DecoderReady(struct_decoder)],
141            rows_scheduled: self.num_rows,
142        })
143    }
144
145    fn num_rows(&self) -> u64 {
146        self.num_rows
147    }
148}
149
150/// Scheduling job for struct data
151///
152/// The order in which we schedule the children is important.  We want to schedule the child
153/// with the least amount of data first.
154///
155/// This allows us to decode entire rows as quickly as possible
156#[derive(Debug)]
157struct SimpleStructSchedulerJob<'a> {
158    scheduler: &'a SimpleStructScheduler,
159    /// A min-heap whose key is the # of rows currently scheduled
160    children: BinaryHeap<SchedulingJobWithStatus<'a>>,
161    rows_scheduled: u64,
162    num_rows: u64,
163    initialized: bool,
164}
165
166impl<'a> SimpleStructSchedulerJob<'a> {
167    fn new(
168        scheduler: &'a SimpleStructScheduler,
169        children: Vec<Box<dyn SchedulingJob + 'a>>,
170        num_rows: u64,
171    ) -> Self {
172        let children = children
173            .into_iter()
174            .enumerate()
175            .map(|(idx, job)| SchedulingJobWithStatus {
176                col_idx: idx as u32,
177                col_name: scheduler.child_fields[idx].name(),
178                job,
179                rows_scheduled: 0,
180                rows_remaining: num_rows,
181            })
182            .collect::<BinaryHeap<_>>();
183        Self {
184            scheduler,
185            children,
186            rows_scheduled: 0,
187            num_rows,
188            initialized: false,
189        }
190    }
191}
192
193impl SchedulingJob for SimpleStructSchedulerJob<'_> {
194    fn schedule_next(
195        &mut self,
196        mut context: &mut SchedulerContext,
197        priority: &dyn PriorityRange,
198    ) -> Result<ScheduledScanLine> {
199        let mut decoders = Vec::new();
200        if !self.initialized {
201            // Send info to the decoder thread so it knows a struct is here.  In the future we will also
202            // send validity info here.
203            let struct_decoder = Box::new(SimpleStructDecoder::new(
204                self.scheduler.child_fields.clone(),
205                self.num_rows,
206            ));
207            let struct_decoder = context.locate_decoder(struct_decoder);
208            decoders.push(MessageType::DecoderReady(struct_decoder));
209            self.initialized = true;
210        }
211        let old_rows_scheduled = self.rows_scheduled;
212        // Schedule as many children as we need to until we have scheduled at least one
213        // complete row
214        while old_rows_scheduled == self.rows_scheduled {
215            let mut next_child = self.children.pop().unwrap();
216            trace!("Scheduling more rows for child {}", next_child.col_idx);
217            let scoped = context.push(next_child.col_name, next_child.col_idx);
218            let child_scan = next_child.job.schedule_next(scoped.context, priority)?;
219            trace!(
220                "Scheduled {} rows for child {}",
221                child_scan.rows_scheduled,
222                next_child.col_idx
223            );
224            next_child.rows_scheduled += child_scan.rows_scheduled;
225            next_child.rows_remaining -= child_scan.rows_scheduled;
226            decoders.extend(child_scan.decoders);
227            self.children.push(next_child);
228            self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
229            context = scoped.pop();
230        }
231        let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
232        Ok(ScheduledScanLine {
233            decoders,
234            rows_scheduled: struct_rows_scheduled,
235        })
236    }
237
238    fn num_rows(&self) -> u64 {
239        self.num_rows
240    }
241}
242
243/// A scheduler for structs
244///
245/// The implementation is actually a bit more tricky than one might initially think.  We can't just
246/// go through and schedule each column one after the other.  This would mean our decode can't start
247/// until nearly all the data has arrived (since we need data from each column)
248///
249/// Instead, we schedule in row-major fashion
250///
251/// Note: this scheduler is the starting point for all decoding.  This is because we treat the top-level
252/// record batch as a non-nullable struct.
253#[derive(Debug)]
254pub struct SimpleStructScheduler {
255    children: Vec<Arc<dyn FieldScheduler>>,
256    child_fields: Fields,
257    num_rows: u64,
258}
259
260impl SimpleStructScheduler {
261    pub fn new(
262        children: Vec<Arc<dyn FieldScheduler>>,
263        child_fields: Fields,
264        num_rows: u64,
265    ) -> Self {
266        let num_rows = children
267            .first()
268            .map(|child| child.num_rows())
269            .unwrap_or(num_rows);
270        debug_assert!(children.iter().all(|child| child.num_rows() == num_rows));
271        Self {
272            children,
273            child_fields,
274            num_rows,
275        }
276    }
277}
278
279impl FieldScheduler for SimpleStructScheduler {
280    fn schedule_ranges<'a>(
281        &'a self,
282        ranges: &[Range<u64>],
283        filter: &FilterExpression,
284    ) -> Result<Box<dyn SchedulingJob + 'a>> {
285        if self.children.is_empty() {
286            return Ok(Box::new(EmptyStructSchedulerJob {
287                num_rows: ranges.iter().map(|r| r.end - r.start).sum(),
288            }));
289        }
290        let child_schedulers = self
291            .children
292            .iter()
293            .map(|child| child.schedule_ranges(ranges, filter))
294            .collect::<Result<Vec<_>>>()?;
295        let num_rows = child_schedulers[0].num_rows();
296        Ok(Box::new(SimpleStructSchedulerJob::new(
297            self,
298            child_schedulers,
299            num_rows,
300        )))
301    }
302
303    fn num_rows(&self) -> u64 {
304        self.num_rows
305    }
306
307    fn initialize<'a>(
308        &'a self,
309        _filter: &'a FilterExpression,
310        _context: &'a SchedulerContext,
311    ) -> BoxFuture<'a, Result<()>> {
312        let futures = self
313            .children
314            .iter()
315            .map(|child| child.initialize(_filter, _context))
316            .collect::<FuturesUnordered<_>>();
317        async move {
318            futures
319                .map(|res| res.map(|_| ()))
320                .try_collect::<Vec<_>>()
321                .await?;
322            Ok(())
323        }
324        .boxed()
325    }
326}
327
328#[derive(Debug)]
329struct StructuralSchedulingJobWithStatus<'a> {
330    col_idx: u32,
331    col_name: &'a str,
332    job: Box<dyn StructuralSchedulingJob + 'a>,
333    rows_scheduled: u64,
334    rows_remaining: u64,
335}
336
337impl PartialEq for StructuralSchedulingJobWithStatus<'_> {
338    fn eq(&self, other: &Self) -> bool {
339        self.col_idx == other.col_idx
340    }
341}
342
343impl Eq for StructuralSchedulingJobWithStatus<'_> {}
344
345impl PartialOrd for StructuralSchedulingJobWithStatus<'_> {
346    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
347        Some(self.cmp(other))
348    }
349}
350
351impl Ord for StructuralSchedulingJobWithStatus<'_> {
352    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
353        // Note this is reversed to make it min-heap
354        other.rows_scheduled.cmp(&self.rows_scheduled)
355    }
356}
357
358/// Scheduling job for struct data
359///
360/// The order in which we schedule the children is important.  We want to schedule the child
361/// with the least amount of data first.
362///
363/// This allows us to decode entire rows as quickly as possible
364#[derive(Debug)]
365struct RepDefStructSchedulingJob<'a> {
366    /// A min-heap whose key is the # of rows currently scheduled
367    children: BinaryHeap<StructuralSchedulingJobWithStatus<'a>>,
368    rows_scheduled: u64,
369}
370
371impl<'a> RepDefStructSchedulingJob<'a> {
372    fn new(
373        scheduler: &'a StructuralStructScheduler,
374        children: Vec<Box<dyn StructuralSchedulingJob + 'a>>,
375        num_rows: u64,
376    ) -> Self {
377        let children = children
378            .into_iter()
379            .enumerate()
380            .map(|(idx, job)| StructuralSchedulingJobWithStatus {
381                col_idx: idx as u32,
382                col_name: scheduler.child_fields[idx].name(),
383                job,
384                rows_scheduled: 0,
385                rows_remaining: num_rows,
386            })
387            .collect::<BinaryHeap<_>>();
388        Self {
389            children,
390            rows_scheduled: 0,
391        }
392    }
393}
394
395impl StructuralSchedulingJob for RepDefStructSchedulingJob<'_> {
396    fn schedule_next(
397        &mut self,
398        mut context: &mut SchedulerContext,
399    ) -> Result<Option<ScheduledScanLine>> {
400        let mut decoders = Vec::new();
401        let old_rows_scheduled = self.rows_scheduled;
402        // Schedule as many children as we need to until we have scheduled at least one
403        // complete row
404        while old_rows_scheduled == self.rows_scheduled {
405            let mut next_child = self.children.pop().unwrap();
406            let scoped = context.push(next_child.col_name, next_child.col_idx);
407            let child_scan = next_child.job.schedule_next(scoped.context)?;
408            // next_child is the least-scheduled child and, if it's done, that
409            // means we are completely done.
410            if child_scan.is_none() {
411                return Ok(None);
412            }
413            let child_scan = child_scan.unwrap();
414
415            trace!(
416                "Scheduled {} rows for child {}",
417                child_scan.rows_scheduled,
418                next_child.col_idx
419            );
420            next_child.rows_scheduled += child_scan.rows_scheduled;
421            next_child.rows_remaining -= child_scan.rows_scheduled;
422            decoders.extend(child_scan.decoders);
423            self.children.push(next_child);
424            self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
425            context = scoped.pop();
426        }
427        let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
428        Ok(Some(ScheduledScanLine {
429            decoders,
430            rows_scheduled: struct_rows_scheduled,
431        }))
432    }
433}
434
435/// A scheduler for structs
436///
437/// The implementation is actually a bit more tricky than one might initially think.  We can't just
438/// go through and schedule each column one after the other.  This would mean our decode can't start
439/// until nearly all the data has arrived (since we need data from each column to yield a batch)
440///
441/// Instead, we schedule in row-major fashion
442///
443/// Note: this scheduler is the starting point for all decoding.  This is because we treat the top-level
444/// record batch as a non-nullable struct.
445#[derive(Debug)]
446pub struct StructuralStructScheduler {
447    children: Vec<Box<dyn StructuralFieldScheduler>>,
448    child_fields: Fields,
449}
450
451impl StructuralStructScheduler {
452    pub fn new(children: Vec<Box<dyn StructuralFieldScheduler>>, child_fields: Fields) -> Self {
453        debug_assert!(!children.is_empty());
454        Self {
455            children,
456            child_fields,
457        }
458    }
459}
460
461impl StructuralFieldScheduler for StructuralStructScheduler {
462    fn schedule_ranges<'a>(
463        &'a self,
464        ranges: &[Range<u64>],
465        filter: &FilterExpression,
466    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
467        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
468
469        let child_schedulers = self
470            .children
471            .iter()
472            .map(|child| child.schedule_ranges(ranges, filter))
473            .collect::<Result<Vec<_>>>()?;
474
475        Ok(Box::new(RepDefStructSchedulingJob::new(
476            self,
477            child_schedulers,
478            num_rows,
479        )))
480    }
481
482    fn initialize<'a>(
483        &'a mut self,
484        filter: &'a FilterExpression,
485        context: &'a SchedulerContext,
486    ) -> BoxFuture<'a, Result<()>> {
487        let children_initialization = self
488            .children
489            .iter_mut()
490            .map(|child| child.initialize(filter, context))
491            .collect::<FuturesUnordered<_>>();
492        async move {
493            children_initialization
494                .map(|res| res.map(|_| ()))
495                .try_collect::<Vec<_>>()
496                .await?;
497            Ok(())
498        }
499        .boxed()
500    }
501}
502
503#[derive(Debug)]
504struct ChildState {
505    // As child decoders are scheduled they are added to this queue
506    // Once the decoder is fully drained it is popped from this queue
507    //
508    // TODO: It may be a minor perf optimization, in some rare cases, if we have a separate
509    // "fully awaited but not yet drained" queue so we don't loop through fully awaited pages
510    // during each call to wait.
511    //
512    // Note: This queue may have more than one page in it if the batch size is very large
513    // or pages are very small
514    // TODO: Test this case
515    scheduled: VecDeque<Box<dyn LogicalPageDecoder>>,
516    // Rows that have been awaited
517    rows_loaded: u64,
518    // Rows that have drained
519    rows_drained: u64,
520    // Rows that have been popped (the decoder has been completely drained and removed from `scheduled`)
521    rows_popped: u64,
522    // Total number of rows in the struct
523    num_rows: u64,
524    // The field index in the struct (used for debugging / logging)
525    field_index: u32,
526}
527
528struct CompositeDecodeTask {
529    // One per child
530    tasks: Vec<Box<dyn DecodeArrayTask>>,
531    num_rows: u64,
532    has_more: bool,
533}
534
535impl CompositeDecodeTask {
536    fn decode(self) -> Result<ArrayRef> {
537        let arrays = self
538            .tasks
539            .into_iter()
540            .map(|task| task.decode())
541            .collect::<Result<Vec<_>>>()?;
542        let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
543        // TODO: If this is a primitive column we should be able to avoid this
544        // allocation + copy with "page bridging" which could save us a few CPU
545        // cycles.
546        //
547        // This optimization is probably most important for super fast storage like NVME
548        // where the page size can be smaller.
549        Ok(arrow_select::concat::concat(&array_refs)?)
550    }
551}
552
553impl ChildState {
554    fn new(num_rows: u64, field_index: u32) -> Self {
555        Self {
556            scheduled: VecDeque::new(),
557            rows_loaded: 0,
558            rows_drained: 0,
559            rows_popped: 0,
560            num_rows,
561            field_index,
562        }
563    }
564
565    // Wait for the next set of rows to arrive
566    //
567    // Wait until we have at least `loaded_need` loaded and stop as soon as we
568    // go above that limit.
569    async fn wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
570        trace!(
571            "Struct child {} waiting for more than {} rows to be loaded and {} are fully loaded already",
572            self.field_index,
573            loaded_need,
574            self.rows_loaded,
575        );
576        let mut fully_loaded = self.rows_popped;
577        for (page_idx, next_decoder) in self.scheduled.iter_mut().enumerate() {
578            if next_decoder.rows_unloaded() > 0 {
579                let mut current_need = loaded_need;
580                current_need -= fully_loaded;
581                let rows_in_page = next_decoder.num_rows();
582                let need_for_page = (rows_in_page - 1).min(current_need);
583                trace!(
584                    "Struct child {} page {} will wait until more than {} rows loaded from page with {} rows",
585                    self.field_index,
586                    page_idx,
587                    need_for_page,
588                    rows_in_page,
589                );
590                // We might only await part of a page.  This is important for things
591                // like the struct<struct<...>> case where we have one outer page, one
592                // middle page, and then a bunch of inner pages.  If we await the entire
593                // middle page then we will have to wait for all the inner pages to arrive
594                // before we can start decoding.
595                next_decoder.wait_for_loaded(need_for_page).await?;
596                let now_loaded = next_decoder.rows_loaded();
597                fully_loaded += now_loaded;
598                trace!(
599                    "Struct child {} page {} await and now has {} loaded rows and we have {} fully loaded",
600                    self.field_index,
601                    page_idx,
602                    now_loaded,
603                    fully_loaded
604                );
605            } else {
606                fully_loaded += next_decoder.num_rows();
607            }
608            if fully_loaded > loaded_need {
609                break;
610            }
611        }
612        self.rows_loaded = fully_loaded;
613        trace!(
614            "Struct child {} loaded {} new rows and now {} are loaded",
615            self.field_index,
616            fully_loaded,
617            self.rows_loaded
618        );
619        Ok(())
620    }
621
622    fn drain(&mut self, num_rows: u64) -> Result<CompositeDecodeTask> {
623        trace!("Struct draining {} rows", num_rows);
624
625        trace!(
626            "Draining {} rows from struct page with {} rows already drained",
627            num_rows,
628            self.rows_drained
629        );
630        let mut remaining = num_rows;
631        let mut composite = CompositeDecodeTask {
632            tasks: Vec::new(),
633            num_rows: 0,
634            has_more: true,
635        };
636        while remaining > 0 {
637            let next = self.scheduled.front_mut().unwrap();
638            let rows_to_take = remaining.min(next.rows_left());
639            let next_task = next.drain(rows_to_take)?;
640            if next.rows_left() == 0 {
641                trace!("Completely drained page");
642                self.rows_popped += next.num_rows();
643                self.scheduled.pop_front();
644            }
645            remaining -= rows_to_take;
646            composite.tasks.push(next_task.task);
647            composite.num_rows += next_task.num_rows;
648        }
649        self.rows_drained += num_rows;
650        composite.has_more = self.rows_drained != self.num_rows;
651        Ok(composite)
652    }
653}
654
655// Wrapper around ChildState that orders using rows_unawaited
656struct WaitOrder<'a>(&'a mut ChildState);
657
658impl Eq for WaitOrder<'_> {}
659impl PartialEq for WaitOrder<'_> {
660    fn eq(&self, other: &Self) -> bool {
661        self.0.rows_loaded == other.0.rows_loaded
662    }
663}
664impl Ord for WaitOrder<'_> {
665    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
666        // Note: this is inverted so we have a min-heap
667        other.0.rows_loaded.cmp(&self.0.rows_loaded)
668    }
669}
670impl PartialOrd for WaitOrder<'_> {
671    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
672        Some(self.cmp(other))
673    }
674}
675
676#[derive(Debug)]
677pub struct StructuralStructDecoder {
678    children: Vec<Box<dyn StructuralFieldDecoder>>,
679    data_type: DataType,
680    child_fields: Fields,
681    // The root decoder is slightly different because it cannot have nulls
682    is_root: bool,
683}
684
685impl StructuralStructDecoder {
686    pub fn new(fields: Fields, should_validate: bool, is_root: bool) -> Self {
687        let children = fields
688            .iter()
689            .map(|field| Self::field_to_decoder(field, should_validate))
690            .collect();
691        let data_type = DataType::Struct(fields.clone());
692        Self {
693            data_type,
694            children,
695            child_fields: fields,
696            is_root,
697        }
698    }
699
700    fn field_to_decoder(
701        field: &Arc<arrow_schema::Field>,
702        should_validate: bool,
703    ) -> Box<dyn StructuralFieldDecoder> {
704        match field.data_type() {
705            DataType::Struct(fields) => {
706                if field.is_packed_struct() {
707                    let decoder =
708                        StructuralPrimitiveFieldDecoder::new(&field.clone(), should_validate);
709                    Box::new(decoder)
710                } else {
711                    Box::new(Self::new(fields.clone(), should_validate, false))
712                }
713            }
714            DataType::List(child_field) | DataType::LargeList(child_field) => {
715                let child_decoder = Self::field_to_decoder(child_field, should_validate);
716                Box::new(StructuralListDecoder::new(
717                    child_decoder,
718                    field.data_type().clone(),
719                ))
720            }
721            DataType::RunEndEncoded(_, _) => todo!(),
722            DataType::ListView(_) | DataType::LargeListView(_) => todo!(),
723            DataType::Map(_, _) => todo!(),
724            DataType::Union(_, _) => todo!(),
725            _ => Box::new(StructuralPrimitiveFieldDecoder::new(field, should_validate)),
726        }
727    }
728
729    pub fn drain_batch_task(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
730        let array_drain = self.drain(num_rows)?;
731        Ok(NextDecodeTask {
732            num_rows,
733            task: Box::new(array_drain),
734        })
735    }
736}
737
738impl StructuralFieldDecoder for StructuralStructDecoder {
739    fn accept_page(&mut self, mut child: LoadedPage) -> Result<()> {
740        // children with empty path should not be delivered to this method
741        let child_idx = child.path.pop_front().unwrap();
742        // This decoder is intended for one of our children
743        self.children[child_idx as usize].accept_page(child)?;
744        Ok(())
745    }
746
747    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
748        let child_tasks = self
749            .children
750            .iter_mut()
751            .map(|child| child.drain(num_rows))
752            .collect::<Result<Vec<_>>>()?;
753        Ok(Box::new(RepDefStructDecodeTask {
754            children: child_tasks,
755            child_fields: self.child_fields.clone(),
756            is_root: self.is_root,
757        }))
758    }
759
760    fn data_type(&self) -> &DataType {
761        &self.data_type
762    }
763}
764
765#[derive(Debug)]
766struct RepDefStructDecodeTask {
767    children: Vec<Box<dyn StructuralDecodeArrayTask>>,
768    child_fields: Fields,
769    is_root: bool,
770}
771
772impl StructuralDecodeArrayTask for RepDefStructDecodeTask {
773    fn decode(self: Box<Self>) -> Result<DecodedArray> {
774        let arrays = self
775            .children
776            .into_iter()
777            .map(|task| task.decode())
778            .collect::<Result<Vec<_>>>()?;
779        let mut children = Vec::with_capacity(arrays.len());
780        let mut arrays_iter = arrays.into_iter();
781        let first_array = arrays_iter.next().unwrap();
782        let length = first_array.array.len();
783
784        // The repdef should be identical across all children at this point
785        let mut repdef = first_array.repdef;
786        children.push(first_array.array);
787
788        for array in arrays_iter {
789            debug_assert_eq!(length, array.array.len());
790            children.push(array.array);
791        }
792
793        let validity = if self.is_root {
794            None
795        } else {
796            repdef.unravel_validity(length)
797        };
798        let array = StructArray::new(self.child_fields, children, validity);
799        Ok(DecodedArray {
800            array: Arc::new(array),
801            repdef,
802        })
803    }
804}
805
806#[derive(Debug)]
807pub struct SimpleStructDecoder {
808    children: Vec<ChildState>,
809    child_fields: Fields,
810    data_type: DataType,
811    num_rows: u64,
812}
813
814impl SimpleStructDecoder {
815    pub fn new(child_fields: Fields, num_rows: u64) -> Self {
816        let data_type = DataType::Struct(child_fields.clone());
817        Self {
818            children: child_fields
819                .iter()
820                .enumerate()
821                .map(|(idx, _)| ChildState::new(num_rows, idx as u32))
822                .collect(),
823            child_fields,
824            data_type,
825            num_rows,
826        }
827    }
828
829    async fn do_wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
830        let mut wait_orders = self
831            .children
832            .iter_mut()
833            .filter_map(|child| {
834                if child.rows_loaded <= loaded_need {
835                    Some(WaitOrder(child))
836                } else {
837                    None
838                }
839            })
840            .collect::<BinaryHeap<_>>();
841        while !wait_orders.is_empty() {
842            let next_waiter = wait_orders.pop().unwrap();
843            let next_highest = wait_orders
844                .peek()
845                .map(|w| w.0.rows_loaded)
846                .unwrap_or(u64::MAX);
847            // Wait until you have the number of rows needed, or at least more than the
848            // next highest waiter
849            let limit = loaded_need.min(next_highest);
850            next_waiter.0.wait_for_loaded(limit).await?;
851            log::trace!(
852                "Struct child {} finished await pass and now {} are loaded",
853                next_waiter.0.field_index,
854                next_waiter.0.rows_loaded
855            );
856            if next_waiter.0.rows_loaded <= loaded_need {
857                wait_orders.push(next_waiter);
858            }
859        }
860        Ok(())
861    }
862}
863
864impl LogicalPageDecoder for SimpleStructDecoder {
865    fn accept_child(&mut self, mut child: DecoderReady) -> Result<()> {
866        // children with empty path should not be delivered to this method
867        let child_idx = child.path.pop_front().unwrap();
868        if child.path.is_empty() {
869            // This decoder is intended for us
870            self.children[child_idx as usize]
871                .scheduled
872                .push_back(child.decoder);
873        } else {
874            // This decoder is intended for one of our children
875            let intended = self.children[child_idx as usize].scheduled.back_mut().ok_or_else(|| Error::Internal { message: format!("Decoder scheduled for child at index {} but we don't have any child at that index yet", child_idx), location: location!() })?;
876            intended.accept_child(child)?;
877        }
878        Ok(())
879    }
880
881    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
882        self.do_wait_for_loaded(loaded_need).boxed()
883    }
884
885    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
886        let child_tasks = self
887            .children
888            .iter_mut()
889            .map(|child| child.drain(num_rows))
890            .collect::<Result<Vec<_>>>()?;
891        let num_rows = child_tasks[0].num_rows;
892        debug_assert!(child_tasks.iter().all(|task| task.num_rows == num_rows));
893        Ok(NextDecodeTask {
894            task: Box::new(SimpleStructDecodeTask {
895                children: child_tasks,
896                child_fields: self.child_fields.clone(),
897            }),
898            num_rows,
899        })
900    }
901
902    fn rows_loaded(&self) -> u64 {
903        self.children.iter().map(|c| c.rows_loaded).min().unwrap()
904    }
905
906    fn rows_drained(&self) -> u64 {
907        // All children should have the same number of rows drained
908        debug_assert!(self
909            .children
910            .iter()
911            .all(|c| c.rows_drained == self.children[0].rows_drained));
912        self.children[0].rows_drained
913    }
914
915    fn num_rows(&self) -> u64 {
916        self.num_rows
917    }
918
919    fn data_type(&self) -> &DataType {
920        &self.data_type
921    }
922}
923
924struct SimpleStructDecodeTask {
925    children: Vec<CompositeDecodeTask>,
926    child_fields: Fields,
927}
928
929impl DecodeArrayTask for SimpleStructDecodeTask {
930    fn decode(self: Box<Self>) -> Result<ArrayRef> {
931        let child_arrays = self
932            .children
933            .into_iter()
934            .map(|child| child.decode())
935            .collect::<Result<Vec<_>>>()?;
936        Ok(Arc::new(StructArray::try_new(
937            self.child_fields,
938            child_arrays,
939            None,
940        )?))
941    }
942}
943
944/// A structural encoder for struct fields
945///
946/// The struct's validity is added to the rep/def builder
947/// and the builder is cloned to all children.
948pub struct StructStructuralEncoder {
949    keep_original_array: bool,
950    children: Vec<Box<dyn FieldEncoder>>,
951}
952
953impl StructStructuralEncoder {
954    pub fn new(keep_original_array: bool, children: Vec<Box<dyn FieldEncoder>>) -> Self {
955        Self {
956            keep_original_array,
957            children,
958        }
959    }
960}
961
962impl FieldEncoder for StructStructuralEncoder {
963    fn maybe_encode(
964        &mut self,
965        array: ArrayRef,
966        external_buffers: &mut OutOfLineBuffers,
967        mut repdef: RepDefBuilder,
968        row_number: u64,
969        num_rows: u64,
970    ) -> Result<Vec<EncodeTask>> {
971        let struct_array = array.as_struct();
972        if let Some(validity) = struct_array.nulls() {
973            if self.keep_original_array {
974                repdef.add_validity_bitmap(validity.clone())
975            } else {
976                repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap())
977            }
978        } else {
979            repdef.add_no_null(struct_array.len());
980        }
981        let child_tasks = self
982            .children
983            .iter_mut()
984            .zip(struct_array.columns().iter())
985            .map(|(encoder, arr)| {
986                encoder.maybe_encode(
987                    arr.clone(),
988                    external_buffers,
989                    repdef.clone(),
990                    row_number,
991                    num_rows,
992                )
993            })
994            .collect::<Result<Vec<_>>>()?;
995        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
996    }
997
998    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
999        self.children
1000            .iter_mut()
1001            .map(|encoder| encoder.flush(external_buffers))
1002            .flatten_ok()
1003            .collect::<Result<Vec<_>>>()
1004    }
1005
1006    fn num_columns(&self) -> u32 {
1007        self.children
1008            .iter()
1009            .map(|child| child.num_columns())
1010            .sum::<u32>()
1011    }
1012
1013    fn finish(
1014        &mut self,
1015        external_buffers: &mut OutOfLineBuffers,
1016    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
1017        let mut child_columns = self
1018            .children
1019            .iter_mut()
1020            .map(|child| child.finish(external_buffers))
1021            .collect::<FuturesOrdered<_>>();
1022        async move {
1023            let mut encoded_columns = Vec::with_capacity(child_columns.len());
1024            while let Some(child_cols) = child_columns.next().await {
1025                encoded_columns.extend(child_cols?);
1026            }
1027            Ok(encoded_columns)
1028        }
1029        .boxed()
1030    }
1031}
1032
1033pub struct StructFieldEncoder {
1034    children: Vec<Box<dyn FieldEncoder>>,
1035    column_index: u32,
1036    num_rows_seen: u64,
1037}
1038
1039impl StructFieldEncoder {
1040    #[allow(dead_code)]
1041    pub fn new(children: Vec<Box<dyn FieldEncoder>>, column_index: u32) -> Self {
1042        Self {
1043            children,
1044            column_index,
1045            num_rows_seen: 0,
1046        }
1047    }
1048}
1049
1050impl FieldEncoder for StructFieldEncoder {
1051    fn maybe_encode(
1052        &mut self,
1053        array: ArrayRef,
1054        external_buffers: &mut OutOfLineBuffers,
1055        repdef: RepDefBuilder,
1056        row_number: u64,
1057        num_rows: u64,
1058    ) -> Result<Vec<EncodeTask>> {
1059        self.num_rows_seen += array.len() as u64;
1060        let struct_array = array.as_struct();
1061        let child_tasks = self
1062            .children
1063            .iter_mut()
1064            .zip(struct_array.columns().iter())
1065            .map(|(encoder, arr)| {
1066                encoder.maybe_encode(
1067                    arr.clone(),
1068                    external_buffers,
1069                    repdef.clone(),
1070                    row_number,
1071                    num_rows,
1072                )
1073            })
1074            .collect::<Result<Vec<_>>>()?;
1075        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
1076    }
1077
1078    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
1079        let child_tasks = self
1080            .children
1081            .iter_mut()
1082            .map(|encoder| encoder.flush(external_buffers))
1083            .collect::<Result<Vec<_>>>()?;
1084        Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
1085    }
1086
1087    fn num_columns(&self) -> u32 {
1088        self.children
1089            .iter()
1090            .map(|child| child.num_columns())
1091            .sum::<u32>()
1092            + 1
1093    }
1094
1095    fn finish(
1096        &mut self,
1097        external_buffers: &mut OutOfLineBuffers,
1098    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
1099        let mut child_columns = self
1100            .children
1101            .iter_mut()
1102            .map(|child| child.finish(external_buffers))
1103            .collect::<FuturesOrdered<_>>();
1104        let num_rows_seen = self.num_rows_seen;
1105        let column_index = self.column_index;
1106        async move {
1107            let mut columns = Vec::new();
1108            // Add a column for the struct header
1109            let mut header = EncodedColumn::default();
1110            header.final_pages.push(EncodedPage {
1111                data: Vec::new(),
1112                description: PageEncoding::Legacy(pb::ArrayEncoding {
1113                    array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
1114                        pb::SimpleStruct {},
1115                    )),
1116                }),
1117                num_rows: num_rows_seen,
1118                column_idx: column_index,
1119                row_number: 0, // Not used by legacy encoding
1120            });
1121            columns.push(header);
1122            // Now run finish on the children
1123            while let Some(child_cols) = child_columns.next().await {
1124                columns.extend(child_cols?);
1125            }
1126            Ok(columns)
1127        }
1128        .boxed()
1129    }
1130}
1131
#[cfg(test)]
mod tests {

    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{
        builder::{Int32Builder, ListBuilder},
        Array, ArrayRef, Int32Array, StructArray,
    };
    use arrow_buffer::NullBuffer;
    use arrow_schema::{DataType, Field, Fields};

    use crate::{
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    /// Round trips random data for a struct of two non-nullable int32 children
    #[test_log::test(tokio::test)]
    async fn test_simple_struct() {
        let children = vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ];
        let struct_type = DataType::Struct(Fields::from(children));
        let field = Field::new("", struct_type, false);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    /// Round trips a hand-built nested struct with nulls at every level
    #[test_log::test(tokio::test)]
    async fn test_nullable_struct() {
        // The data is struct<score: int32, location: struct<x: int32, y: int32>>
        // with the following five rows:
        //
        // - score: null
        //   location:
        //     x: 1
        //     y: 6
        // - score: 12
        //   location:
        //     x: 2
        //     y: null
        // - score: 13
        //   location:
        //     x: 3
        //     y: 8
        // - score: 14
        //   location: null
        // - null
        //
        let location_fields = Fields::from(vec![
            Field::new("x", DataType::Int32, false),
            Field::new("y", DataType::Int32, true),
        ]);
        let location_type = DataType::Struct(location_fields.clone());
        let row_fields = Fields::from(vec![
            Field::new("score", DataType::Int32, true),
            Field::new("location", location_type, true),
        ]);

        // Inner struct: the fourth location is null
        let xs = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
        let ys = Int32Array::from(vec![Some(6), None, Some(8), Some(9), Some(10)]);
        let locations = StructArray::new(
            location_fields,
            vec![Arc::new(xs), Arc::new(ys)],
            Some(NullBuffer::from(vec![true, true, true, false, true])),
        );

        // Outer struct: the fifth row is null
        let scores = Int32Array::from(vec![None, Some(12), Some(13), Some(14), Some(15)]);
        let rows = StructArray::new(
            row_fields,
            vec![Arc::new(scores), Arc::new(locations)],
            Some(NullBuffer::from(vec![true, true, true, true, false])),
        );

        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![Arc::new(rows)], &test_cases, HashMap::new()).await;
    }

    /// Round trips random data for a struct holding a list child and an int child
    #[test_log::test(tokio::test)]
    async fn test_struct_list() {
        let list_type = DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
        let children = vec![
            Field::new("inner_list", list_type, true),
            Field::new("outer_int", DataType::Int32, true),
        ];
        let field = Field::new("row", DataType::Struct(Fields::from(children)), false);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    /// Round trips random data for a struct without any children
    #[test_log::test(tokio::test)]
    async fn test_empty_struct() {
        // A struct with 0 children is technically legal, so we need to make
        // sure it is supported
        let no_children = Vec::<Field>::default();
        let field = Field::new("row", DataType::Struct(Fields::from(no_children)), false);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    /// Round trips random data for a struct that nests another struct with a list
    #[test_log::test(tokio::test)]
    async fn test_complicated_struct() {
        let inner_list_type = DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
        let inner_type = DataType::Struct(Fields::from(vec![
            Field::new("inner_int", DataType::Int32, true),
            Field::new("inner_list", inner_list_type, true),
        ]));
        let outer_type = DataType::Struct(Fields::from(vec![
            Field::new("int", DataType::Int32, true),
            Field::new("inner", inner_type, true),
            Field::new("outer_binary", DataType::Binary, true),
        ]));
        let field = Field::new("row", outer_type, false);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    /// Covers scheduling when batches straddle page boundaries
    #[test_log::test(tokio::test)]
    async fn test_ragged_scheduling() {
        const TOTAL_ROWS: usize = 10000;
        // Intentionally an odd, randomish size to create more ragged scheduling
        const BATCH_ROWS: usize = 437;

        // A list column made of 10k nulls
        let mut list_builder = ListBuilder::new(Int32Builder::new());
        for _ in 0..TOTAL_ROWS {
            list_builder.append_null();
        }
        let list_array = Arc::new(list_builder.finish());
        let int_array = Arc::new(Int32Array::from_iter_values(0..TOTAL_ROWS as i32));

        let fields = vec![
            Field::new("", list_array.data_type().clone(), true),
            Field::new("", int_array.data_type().clone(), true),
        ];
        let struct_array = Arc::new(StructArray::new(
            Fields::from(fields),
            vec![list_array, int_array],
            None,
        )) as ArrayRef;

        // Cut the array into consecutive batches, the last one is shorter
        let mut struct_arrays = Vec::new();
        for offset in (0..TOTAL_ROWS).step_by(BATCH_ROWS) {
            let length = BATCH_ROWS.min(TOTAL_ROWS - offset);
            struct_arrays.push(struct_array.slice(offset, length));
        }

        check_round_trip_encoding_of_data(struct_arrays, &TestCases::default(), HashMap::new())
            .await;
    }
}