vortex_layout/
reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::collections::BTreeSet;
5use std::ops::Range;
6use std::sync::Arc;
7
8use futures::future::BoxFuture;
9use futures::try_join;
10use once_cell::sync::OnceCell;
11use vortex_array::expr::Expression;
12use vortex_array::{ArrayRef, MaskFuture};
13use vortex_dtype::{DType, FieldMask};
14use vortex_error::{VortexResult, vortex_bail};
15use vortex_mask::Mask;
16
17use crate::children::LayoutChildren;
18use crate::segments::SegmentSource;
19
/// A shared handle to a [`LayoutReader`] trait object.
///
/// Cloning it is a reference-count bump. Because [`LayoutReader`] requires `Send + Sync`, the
/// handle can be shared across threads.
pub type LayoutReaderRef = Arc<dyn LayoutReader>;
21
/// A [`LayoutReader`] is used to read a [`crate::Layout`] in a way that can cache state across
/// multiple evaluation operations.
///
/// Each evaluation method is scoped to a `row_range` of the layout:
/// - [`pruning_evaluation`](Self::pruning_evaluation) produces a mask whose `false` positions are
///   proven not to satisfy an expression.
/// - [`filter_evaluation`](Self::filter_evaluation) refines a row mask against an expression.
/// - [`projection_evaluation`](Self::projection_evaluation) produces the values of an expression
///   for the rows selected by a mask.
pub trait LayoutReader: 'static + Send + Sync {
    /// Returns the name of the layout reader for debugging.
    fn name(&self) -> &Arc<str>;

    /// Returns the un-projected dtype of the layout reader.
    fn dtype(&self) -> &DType;

    /// Returns the number of rows in the layout.
    fn row_count(&self) -> u64;

    /// Register the splits of this layout reader.
    ///
    /// Implementations add split points to `splits`.
    /// NOTE(review): `field_mask` and `row_range` presumably restrict which fields and rows are
    /// considered, and split points are presumably row offsets in the same coordinate space as
    /// `row_range`. Confirm against implementations.
    // TODO(ngates): this is a temporary API until we make layout readers stream based.
    fn register_splits(
        &self,
        field_mask: &[FieldMask],
        row_range: &Range<u64>,
        splits: &mut BTreeSet<u64>,
    ) -> VortexResult<()>;

    /// Returns a mask where all false values are proven to be false in the given expression.
    ///
    /// A `true` position in the returned mask only means the row has not been ruled out. It does
    /// not mean the row satisfies `expr`. Unlike the other evaluation methods, the input `mask` is
    /// an already-resolved [`Mask`] rather than a [`MaskFuture`].
    ///
    /// The returned mask **does not** need to have been intersected with the input mask. Callers
    /// that need that property must intersect it themselves.
    fn pruning_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: Mask,
    ) -> VortexResult<MaskFuture>;

    /// Refines the given mask, returning a mask equal in length to the input mask.
    ///
    /// It is recommended to defer awaiting the input mask for as long as possible (ideally, after
    /// all I/O is complete). This allows other conjuncts the opportunity to refine the mask as much
    /// as possible before it is used.
    ///
    /// ## Post-conditions
    ///
    /// The returned mask **MUST** have been intersected with the input mask.
    fn filter_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: MaskFuture,
    ) -> VortexResult<MaskFuture>;

    /// Evaluates `expr` for the rows within `row_range` that are selected by `mask`, returning a
    /// future that resolves to the resulting array.
    ///
    /// It is recommended to defer awaiting the input mask for as long as possible (ideally, after
    /// all I/O is complete). This allows other conjuncts the opportunity to refine the mask as much
    /// as possible before it is used.
    ///
    /// ## Post-conditions
    ///
    /// The returned array **MUST** have length equal to the true count of the input mask.
    fn projection_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: MaskFuture,
    ) -> VortexResult<ArrayFuture>;
}
85
/// A boxed `'static` future (see `futures::future::BoxFuture`) that resolves to an [`ArrayRef`]
/// or a Vortex error.
pub type ArrayFuture = BoxFuture<'static, VortexResult<ArrayRef>>;
87
88pub trait ArrayFutureExt {
89    fn masked(self, mask: MaskFuture) -> Self;
90}
91
92impl ArrayFutureExt for ArrayFuture {
93    /// Returns a new `ArrayFuture` that masks the output with a mask
94    fn masked(self, mask: MaskFuture) -> Self {
95        Box::pin(async move {
96            let (array, mask) = try_join!(self, mask)?;
97            vortex_array::compute::mask(array.as_ref(), &mask)
98        })
99    }
100}
101
/// Lazily constructs and caches one [`LayoutReader`] per child of a layout.
///
/// A child's reader is built the first time it is requested and is reused afterwards.
pub struct LazyReaderChildren {
    /// Provider of the child layouts, queried by index.
    children: Arc<dyn LayoutChildren>,
    /// Dtype used to resolve each child, indexed by child position.
    dtypes: Vec<DType>,
    /// Name given to each child's reader, indexed by child position.
    names: Vec<Arc<str>>,
    /// Segment source handed to every child reader that gets constructed.
    segment_source: Arc<dyn SegmentSource>,
    /// One lazily-initialised reader slot per child.
    // TODO(ngates): we may want a hash map of some sort here?
    cache: Vec<OnceCell<LayoutReaderRef>>,
}
110
111impl LazyReaderChildren {
112    pub fn new(
113        children: Arc<dyn LayoutChildren>,
114        dtypes: Vec<DType>,
115        names: Vec<Arc<str>>,
116        segment_source: Arc<dyn SegmentSource>,
117    ) -> Self {
118        let nchildren = children.nchildren();
119        let cache = (0..nchildren).map(|_| OnceCell::new()).collect();
120        Self {
121            children,
122            dtypes,
123            names,
124            segment_source,
125            cache,
126        }
127    }
128
129    pub fn get(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
130        if idx >= self.cache.len() {
131            vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
132        }
133
134        self.cache[idx].get_or_try_init(|| {
135            let dtype = &self.dtypes[idx];
136            let child = self.children.child(idx, dtype)?;
137            child.new_reader(Arc::clone(&self.names[idx]), self.segment_source.clone())
138        })
139    }
140}