Skip to main content

vortex_layout/
reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::collections::BTreeSet;
5use std::ops::Range;
6use std::sync::Arc;
7
8use futures::future::BoxFuture;
9use futures::try_join;
10use once_cell::sync::OnceCell;
11use vortex_array::ArrayRef;
12use vortex_array::IntoArray;
13use vortex_array::MaskFuture;
14use vortex_array::builtins::ArrayBuiltins;
15use vortex_array::expr::Expression;
16use vortex_dtype::DType;
17use vortex_dtype::FieldMask;
18use vortex_error::VortexResult;
19use vortex_error::vortex_bail;
20use vortex_mask::Mask;
21use vortex_session::VortexSession;
22
23use crate::children::LayoutChildren;
24use crate::segments::SegmentSource;
25
26pub type LayoutReaderRef = Arc<dyn LayoutReader>;
27
28/// A [`LayoutReader`] is used to read a [`crate::Layout`] in a way that can cache state across multiple
29/// evaluation operations.
30pub trait LayoutReader: 'static + Send + Sync {
31    /// Returns the name of the layout reader for debugging.
32    fn name(&self) -> &Arc<str>;
33
34    /// Returns the un-projected dtype of the layout reader.
35    fn dtype(&self) -> &DType;
36
37    /// Returns the number of rows in the layout.
38    fn row_count(&self) -> u64;
39
40    /// Register the splits of this layout reader.
41    // TODO(ngates): this is a temporary API until we make layout readers stream based.
42    fn register_splits(
43        &self,
44        field_mask: &[FieldMask],
45        row_range: &Range<u64>,
46        splits: &mut BTreeSet<u64>,
47    ) -> VortexResult<()>;
48
49    /// Returns a mask where all false values are proven to be false in the given expression.
50    ///
51    /// The returned mask **does not** need to have been intersected with the input mask.
52    fn pruning_evaluation(
53        &self,
54        row_range: &Range<u64>,
55        expr: &Expression,
56        mask: Mask,
57    ) -> VortexResult<MaskFuture>;
58
59    /// Refines the given mask, returning a mask equal in length to the input mask.
60    ///
61    /// It is recommended to defer awaiting the input mask for as long as possible (ideally, after
62    /// all I/O is complete). This allows other conjuncts the opportunity to refine the mask as much
63    /// as possible before it is used.
64    ///
65    /// ## Post-conditions
66    ///
67    /// The returned mask **MUST** have been intersected with the input mask.
68    fn filter_evaluation(
69        &self,
70        row_range: &Range<u64>,
71        expr: &Expression,
72        mask: MaskFuture,
73    ) -> VortexResult<MaskFuture>;
74
75    /// Evaluates an expression against an array.
76    ///
77    /// It is recommended to defer awaiting the input mask for as long as possible (ideally, after
78    /// all I/O is complete). This allows other conjuncts the opportunity to refine the mask as much
79    /// as possible before it is used.
80    ///
81    /// ## Post-conditions
82    ///
83    /// The returned array **MUST** have length equal to the true count of the input mask.
84    fn projection_evaluation(
85        &self,
86        row_range: &Range<u64>,
87        expr: &Expression,
88        mask: MaskFuture,
89    ) -> VortexResult<ArrayFuture>;
90}
91
92pub type ArrayFuture = BoxFuture<'static, VortexResult<ArrayRef>>;
93
94pub trait ArrayFutureExt {
95    fn masked(self, mask: MaskFuture) -> Self;
96}
97
98impl ArrayFutureExt for ArrayFuture {
99    /// Returns a new `ArrayFuture` that masks the output with a mask
100    fn masked(self, mask: MaskFuture) -> Self {
101        Box::pin(async move {
102            let (array, mask) = try_join!(self, mask)?;
103            array.mask(mask.into_array())
104        })
105    }
106}
107
108pub struct LazyReaderChildren {
109    children: Arc<dyn LayoutChildren>,
110    dtypes: Vec<DType>,
111    names: Vec<Arc<str>>,
112    segment_source: Arc<dyn SegmentSource>,
113    session: VortexSession,
114    // TODO(ngates): we may want a hash map of some sort here?
115    cache: Vec<OnceCell<LayoutReaderRef>>,
116}
117
118impl LazyReaderChildren {
119    pub fn new(
120        children: Arc<dyn LayoutChildren>,
121        dtypes: Vec<DType>,
122        names: Vec<Arc<str>>,
123        segment_source: Arc<dyn SegmentSource>,
124        session: VortexSession,
125    ) -> Self {
126        let nchildren = children.nchildren();
127        let cache = (0..nchildren).map(|_| OnceCell::new()).collect();
128        Self {
129            children,
130            dtypes,
131            names,
132            segment_source,
133            session,
134            cache,
135        }
136    }
137
138    pub fn get(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
139        if idx >= self.cache.len() {
140            vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
141        }
142
143        self.cache[idx].get_or_try_init(|| {
144            let dtype = &self.dtypes[idx];
145            let child = self.children.child(idx, dtype)?;
146            child.new_reader(
147                Arc::clone(&self.names[idx]),
148                self.segment_source.clone(),
149                &self.session,
150            )
151        })
152    }
153}