vortex_layout/
reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::collections::BTreeSet;
5use std::ops::Range;
6use std::sync::Arc;
7
8use futures::future::BoxFuture;
9use futures::try_join;
10use once_cell::sync::OnceCell;
11use vortex_array::ArrayRef;
12use vortex_array::MaskFuture;
13use vortex_array::expr::Expression;
14use vortex_dtype::DType;
15use vortex_dtype::FieldMask;
16use vortex_error::VortexResult;
17use vortex_error::vortex_bail;
18use vortex_mask::Mask;
19use vortex_session::VortexSession;
20
21use crate::children::LayoutChildren;
22use crate::segments::SegmentSource;
23
24pub type LayoutReaderRef = Arc<dyn LayoutReader>;
25
26/// A [`LayoutReader`] is used to read a [`crate::Layout`] in a way that can cache state across multiple
27/// evaluation operations.
28pub trait LayoutReader: 'static + Send + Sync {
29    /// Returns the name of the layout reader for debugging.
30    fn name(&self) -> &Arc<str>;
31
32    /// Returns the un-projected dtype of the layout reader.
33    fn dtype(&self) -> &DType;
34
35    /// Returns the number of rows in the layout.
36    fn row_count(&self) -> u64;
37
38    /// Register the splits of this layout reader.
39    // TODO(ngates): this is a temporary API until we make layout readers stream based.
40    fn register_splits(
41        &self,
42        field_mask: &[FieldMask],
43        row_range: &Range<u64>,
44        splits: &mut BTreeSet<u64>,
45    ) -> VortexResult<()>;
46
47    /// Returns a mask where all false values are proven to be false in the given expression.
48    ///
49    /// The returned mask **does not** need to have been intersected with the input mask.
50    fn pruning_evaluation(
51        &self,
52        row_range: &Range<u64>,
53        expr: &Expression,
54        mask: Mask,
55    ) -> VortexResult<MaskFuture>;
56
57    /// Refines the given mask, returning a mask equal in length to the input mask.
58    ///
59    /// It is recommended to defer awaiting the input mask for as long as possible (ideally, after
60    /// all I/O is complete). This allows other conjuncts the opportunity to refine the mask as much
61    /// as possible before it is used.
62    ///
63    /// ## Post-conditions
64    ///
65    /// The returned mask **MUST** have been intersected with the input mask.
66    fn filter_evaluation(
67        &self,
68        row_range: &Range<u64>,
69        expr: &Expression,
70        mask: MaskFuture,
71    ) -> VortexResult<MaskFuture>;
72
73    /// Evaluates an expression against an array.
74    ///
75    /// It is recommended to defer awaiting the input mask for as long as possible (ideally, after
76    /// all I/O is complete). This allows other conjuncts the opportunity to refine the mask as much
77    /// as possible before it is used.
78    ///
79    /// ## Post-conditions
80    ///
81    /// The returned array **MUST** have length equal to the true count of the input mask.
82    fn projection_evaluation(
83        &self,
84        row_range: &Range<u64>,
85        expr: &Expression,
86        mask: MaskFuture,
87    ) -> VortexResult<ArrayFuture>;
88}
89
90pub type ArrayFuture = BoxFuture<'static, VortexResult<ArrayRef>>;
91
92pub trait ArrayFutureExt {
93    fn masked(self, mask: MaskFuture) -> Self;
94}
95
96impl ArrayFutureExt for ArrayFuture {
97    /// Returns a new `ArrayFuture` that masks the output with a mask
98    fn masked(self, mask: MaskFuture) -> Self {
99        Box::pin(async move {
100            let (array, mask) = try_join!(self, mask)?;
101            vortex_array::compute::mask(array.as_ref(), &mask)
102        })
103    }
104}
105
106pub struct LazyReaderChildren {
107    children: Arc<dyn LayoutChildren>,
108    dtypes: Vec<DType>,
109    names: Vec<Arc<str>>,
110    segment_source: Arc<dyn SegmentSource>,
111    session: VortexSession,
112    // TODO(ngates): we may want a hash map of some sort here?
113    cache: Vec<OnceCell<LayoutReaderRef>>,
114}
115
116impl LazyReaderChildren {
117    pub fn new(
118        children: Arc<dyn LayoutChildren>,
119        dtypes: Vec<DType>,
120        names: Vec<Arc<str>>,
121        segment_source: Arc<dyn SegmentSource>,
122        session: VortexSession,
123    ) -> Self {
124        let nchildren = children.nchildren();
125        let cache = (0..nchildren).map(|_| OnceCell::new()).collect();
126        Self {
127            children,
128            dtypes,
129            names,
130            segment_source,
131            session,
132            cache,
133        }
134    }
135
136    pub fn get(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
137        if idx >= self.cache.len() {
138            vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
139        }
140
141        self.cache[idx].get_or_try_init(|| {
142            let dtype = &self.dtypes[idx];
143            let child = self.children.child(idx, dtype)?;
144            child.new_reader(
145                Arc::clone(&self.names[idx]),
146                self.segment_source.clone(),
147                &self.session,
148            )
149        })
150    }
151}