vortex_layout/
reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::collections::BTreeSet;
5use std::ops::Range;
6use std::sync::Arc;
7
8use futures::future::BoxFuture;
9use once_cell::sync::OnceCell;
10use vortex_array::stats::Precision;
11use vortex_array::{ArrayRef, MaskFuture};
12use vortex_dtype::{DType, FieldMask};
13use vortex_error::{VortexResult, vortex_bail};
14use vortex_expr::ExprRef;
15use vortex_mask::Mask;
16
17use crate::children::LayoutChildren;
18use crate::segments::SegmentSource;
19
20pub type LayoutReaderRef = Arc<dyn LayoutReader>;
21
22/// A [`LayoutReader`] is used to read a [`crate::Layout`] in a way that can cache state across multiple
23/// evaluation operations.
24pub trait LayoutReader: 'static + Send + Sync {
25    /// Returns the name of the layout reader for debugging.
26    fn name(&self) -> &Arc<str>;
27
28    /// Returns the un-projected dtype of the layout reader.
29    fn dtype(&self) -> &DType;
30
31    /// Returns the number of rows in the layout reader.
32    /// An inexact count may be larger or smaller than the actual row count.
33    fn row_count(&self) -> Precision<u64>;
34
35    /// Register the splits of this layout reader.
36    // TODO(ngates): this is a temporary API until we make layout readers stream based.
37    fn register_splits(
38        &self,
39        field_mask: &[FieldMask],
40        row_offset: u64,
41        splits: &mut BTreeSet<u64>,
42    ) -> VortexResult<()>;
43
44    /// Returns a mask where all false values are proven to be false in the given expression.
45    ///
46    /// The returned mask **does not** need to have been intersected with the input mask.
47    fn pruning_evaluation(
48        &self,
49        row_range: &Range<u64>,
50        expr: &ExprRef,
51        mask: Mask,
52    ) -> VortexResult<MaskFuture>;
53
54    /// Refines the given mask, returning a mask equal in length to the input mask.
55    ///
56    /// It is recommended to defer awaiting the input mask for as long as possible (ideally, after
57    /// all I/O is complete). This allows other conjuncts the opportunity to refine the mask as much
58    /// as possible before it is used.
59    ///
60    /// ## Post-conditions
61    ///
62    /// The returned mask **MUST** have been intersected with the input mask.
63    fn filter_evaluation(
64        &self,
65        row_range: &Range<u64>,
66        expr: &ExprRef,
67        mask: MaskFuture,
68    ) -> VortexResult<MaskFuture>;
69
70    /// Evaluates an expression against an array.
71    ///
72    /// It is recommended to defer awaiting the input mask for as long as possible (ideally, after
73    /// all I/O is complete). This allows other conjuncts the opportunity to refine the mask as much
74    /// as possible before it is used.
75    ///
76    /// ## Post-conditions
77    ///
78    /// The returned array **MUST** have length equal to the true count of the input mask.
79    fn projection_evaluation(
80        &self,
81        row_range: &Range<u64>,
82        expr: &ExprRef,
83        mask: MaskFuture,
84    ) -> VortexResult<ArrayFuture>;
85}
86
87pub type ArrayFuture = BoxFuture<'static, VortexResult<ArrayRef>>;
88
89pub struct LazyReaderChildren {
90    children: Arc<dyn LayoutChildren>,
91    segment_source: Arc<dyn SegmentSource>,
92
93    // TODO(ngates): we may want a hash map of some sort here?
94    cache: Vec<OnceCell<LayoutReaderRef>>,
95}
96
97impl LazyReaderChildren {
98    pub fn new(children: Arc<dyn LayoutChildren>, segment_source: Arc<dyn SegmentSource>) -> Self {
99        let nchildren = children.nchildren();
100        let cache = (0..nchildren).map(|_| OnceCell::new()).collect();
101        Self {
102            children,
103            segment_source,
104            cache,
105        }
106    }
107
108    pub fn get(
109        &self,
110        idx: usize,
111        dtype: &DType,
112        name: &Arc<str>,
113    ) -> VortexResult<&LayoutReaderRef> {
114        if idx >= self.cache.len() {
115            vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
116        }
117
118        self.cache[idx].get_or_try_init(|| {
119            let child = self.children.child(idx, dtype)?;
120            child.new_reader(name.clone(), self.segment_source.clone())
121        })
122    }
123}