Skip to main content

vortex_layout/
reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::collections::BTreeSet;
6use std::ops::Range;
7use std::sync::Arc;
8
9use futures::future::BoxFuture;
10use futures::try_join;
11use once_cell::sync::OnceCell;
12use vortex_array::ArrayRef;
13use vortex_array::IntoArray;
14use vortex_array::MaskFuture;
15use vortex_array::builtins::ArrayBuiltins;
16use vortex_array::dtype::DType;
17use vortex_array::dtype::FieldMask;
18use vortex_array::expr::Expression;
19use vortex_error::VortexResult;
20use vortex_error::vortex_bail;
21use vortex_mask::Mask;
22use vortex_session::VortexSession;
23
24use crate::children::LayoutChildren;
25use crate::segments::SegmentSource;
26
27pub type LayoutReaderRef = Arc<dyn LayoutReader>;
28
29/// A [`LayoutReader`] is used to read a [`crate::Layout`] in a way that can cache state across multiple
30/// evaluation operations.
31pub trait LayoutReader: 'static + Send + Sync {
32    /// Returns the name of the layout reader for debugging.
33    fn name(&self) -> &Arc<str>;
34
35    fn as_any(&self) -> &dyn Any;
36
37    /// Returns the un-projected dtype of the layout reader.
38    fn dtype(&self) -> &DType;
39
40    /// Returns the number of rows in the layout.
41    fn row_count(&self) -> u64;
42
43    /// Register the splits of this layout reader.
44    // TODO(ngates): this is a temporary API until we make layout readers stream based.
45    fn register_splits(
46        &self,
47        field_mask: &[FieldMask],
48        row_range: &Range<u64>,
49        splits: &mut BTreeSet<u64>,
50    ) -> VortexResult<()>;
51
52    /// Returns a mask where all false values are proven to be false in the given expression.
53    ///
54    /// The returned mask **does not** need to have been intersected with the input mask.
55    fn pruning_evaluation(
56        &self,
57        row_range: &Range<u64>,
58        expr: &Expression,
59        mask: Mask,
60    ) -> VortexResult<MaskFuture>;
61
62    /// Refines the given mask, returning a mask equal in length to the input mask.
63    ///
64    /// It is recommended to defer awaiting the input mask for as long as possible (ideally, after
65    /// all I/O is complete). This allows other conjuncts the opportunity to refine the mask as much
66    /// as possible before it is used.
67    ///
68    /// ## Post-conditions
69    ///
70    /// The returned mask **MUST** have been intersected with the input mask.
71    fn filter_evaluation(
72        &self,
73        row_range: &Range<u64>,
74        expr: &Expression,
75        mask: MaskFuture,
76    ) -> VortexResult<MaskFuture>;
77
78    /// Evaluates an expression against an array.
79    ///
80    /// It is recommended to defer awaiting the input mask for as long as possible (ideally, after
81    /// all I/O is complete). This allows other conjuncts the opportunity to refine the mask as much
82    /// as possible before it is used.
83    ///
84    /// ## Post-conditions
85    ///
86    /// The returned array **MUST** have length equal to the true count of the input mask.
87    fn projection_evaluation(
88        &self,
89        row_range: &Range<u64>,
90        expr: &Expression,
91        mask: MaskFuture,
92    ) -> VortexResult<ArrayFuture>;
93}
94
95pub type ArrayFuture = BoxFuture<'static, VortexResult<ArrayRef>>;
96
97pub trait ArrayFutureExt {
98    fn masked(self, mask: MaskFuture) -> Self;
99}
100
101impl ArrayFutureExt for ArrayFuture {
102    /// Returns a new `ArrayFuture` that masks the output with a mask
103    fn masked(self, mask: MaskFuture) -> Self {
104        Box::pin(async move {
105            let (array, mask) = try_join!(self, mask)?;
106            array.mask(mask.into_array())
107        })
108    }
109}
110
111pub struct LazyReaderChildren {
112    children: Arc<dyn LayoutChildren>,
113    dtypes: Vec<DType>,
114    names: Vec<Arc<str>>,
115    segment_source: Arc<dyn SegmentSource>,
116    session: VortexSession,
117    // TODO(ngates): we may want a hash map of some sort here?
118    cache: Vec<OnceCell<LayoutReaderRef>>,
119}
120
121impl LazyReaderChildren {
122    pub fn new(
123        children: Arc<dyn LayoutChildren>,
124        dtypes: Vec<DType>,
125        names: Vec<Arc<str>>,
126        segment_source: Arc<dyn SegmentSource>,
127        session: VortexSession,
128    ) -> Self {
129        let nchildren = children.nchildren();
130        let cache = (0..nchildren).map(|_| OnceCell::new()).collect();
131        Self {
132            children,
133            dtypes,
134            names,
135            segment_source,
136            session,
137            cache,
138        }
139    }
140
141    pub fn get(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
142        if idx >= self.cache.len() {
143            vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
144        }
145
146        self.cache[idx].get_or_try_init(|| {
147            let dtype = &self.dtypes[idx];
148            let child = self.children.child(idx, dtype)?;
149            child.new_reader(
150                Arc::clone(&self.names[idx]),
151                Arc::clone(&self.segment_source),
152                &self.session,
153            )
154        })
155    }
156}