Skip to main content

vortex_layout/
reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::collections::BTreeSet;
6use std::ops::Range;
7use std::sync::Arc;
8
9use futures::future::BoxFuture;
10use futures::try_join;
11use once_cell::sync::OnceCell;
12use vortex_array::ArrayRef;
13use vortex_array::IntoArray;
14use vortex_array::MaskFuture;
15use vortex_array::builtins::ArrayBuiltins;
16use vortex_array::dtype::DType;
17use vortex_array::dtype::FieldMask;
18use vortex_array::expr::Expression;
19use vortex_error::VortexResult;
20use vortex_error::vortex_bail;
21use vortex_mask::Mask;
22use vortex_session::VortexSession;
23
24use crate::children::LayoutChildren;
25use crate::segments::SegmentSource;
26
27pub type LayoutReaderRef = Arc<dyn LayoutReader>;
28
/// A row range used when registering natural scan splits.
///
/// The row range is relative to the reader that receives it. The row offset is the amount the
/// local row range must be shifted by to obtain the equivalent range in the root layout.
///
/// Instances are only created through [`SplitRange::try_new`] / [`SplitRange::root`], which
/// guarantee `row_range.start <= row_range.end`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SplitRange {
    /// Root-layout row index corresponding to this reader's local row zero.
    row_offset: u64,
    /// Row range local to the receiving reader; `start <= end` is enforced at construction.
    row_range: Range<u64>,
}
38
39impl SplitRange {
40    /// Constructs a split range, returning an error if the local row range is invalid.
41    pub fn try_new(row_offset: u64, row_range: Range<u64>) -> VortexResult<Self> {
42        if row_range.start > row_range.end {
43            vortex_bail!("Invalid split range {:?}", row_range);
44        }
45
46        Ok(Self {
47            row_offset,
48            row_range,
49        })
50    }
51
52    /// Constructs a split range for the root layout.
53    pub fn root(row_range: Range<u64>) -> VortexResult<Self> {
54        Self::try_new(0, row_range)
55    }
56
57    /// The root-layout row offset of this reader's local row zero.
58    pub fn row_offset(&self) -> u64 {
59        self.row_offset
60    }
61
62    /// The local row range within this reader.
63    pub fn row_range(&self) -> &Range<u64> {
64        &self.row_range
65    }
66
67    /// The length of the local row range.
68    pub fn len(&self) -> u64 {
69        self.row_range.end - self.row_range.start
70    }
71
72    /// Returns `true` if the local row range is empty.
73    pub fn is_empty(&self) -> bool {
74        self.row_range.is_empty()
75    }
76
77    /// Returns the equivalent row range in the root layout's coordinate space.
78    pub fn root_row_range(&self) -> Range<u64> {
79        self.row_offset + self.row_range.start..self.row_offset + self.row_range.end
80    }
81
82    /// Returns an error if the local row range is outside the given row count.
83    pub fn check_bounds(&self, row_count: u64) -> VortexResult<()> {
84        if self.row_range.end > row_count {
85            vortex_bail!(
86                "Split range {:?} is out of bounds for row count {}",
87                self.row_range,
88                row_count
89            );
90        }
91
92        Ok(())
93    }
94}
95
96/// A [`LayoutReader`] is used to read a [`crate::Layout`] in a way that can cache state across multiple
97/// evaluation operations.
98pub trait LayoutReader: 'static + Send + Sync {
99    /// Returns the name of the layout reader for debugging.
100    fn name(&self) -> &Arc<str>;
101
102    fn as_any(&self) -> &dyn Any;
103
104    /// Returns the un-projected dtype of the layout reader.
105    fn dtype(&self) -> &DType;
106
107    /// Returns the number of rows in the layout.
108    fn row_count(&self) -> u64;
109
110    /// Register the splits of this layout reader.
111    // TODO(ngates): this is a temporary API until we make layout readers stream based.
112    fn register_splits(
113        &self,
114        field_mask: &[FieldMask],
115        split_range: &SplitRange,
116        splits: &mut BTreeSet<u64>,
117    ) -> VortexResult<()>;
118
119    /// Returns a mask where all false values are proven to be false in the given expression.
120    ///
121    /// The returned mask **does not** need to have been intersected with the input mask.
122    fn pruning_evaluation(
123        &self,
124        row_range: &Range<u64>,
125        expr: &Expression,
126        mask: Mask,
127    ) -> VortexResult<MaskFuture>;
128
129    /// Refines the given mask, returning a mask equal in length to the input mask.
130    ///
131    /// It is recommended to defer awaiting the input mask for as long as possible (ideally, after
132    /// all I/O is complete). This allows other conjuncts the opportunity to refine the mask as much
133    /// as possible before it is used.
134    ///
135    /// ## Post-conditions
136    ///
137    /// The returned mask **MUST** have been intersected with the input mask.
138    fn filter_evaluation(
139        &self,
140        row_range: &Range<u64>,
141        expr: &Expression,
142        mask: MaskFuture,
143    ) -> VortexResult<MaskFuture>;
144
145    /// Evaluates an expression against an array.
146    ///
147    /// It is recommended to defer awaiting the input mask for as long as possible (ideally, after
148    /// all I/O is complete). This allows other conjuncts the opportunity to refine the mask as much
149    /// as possible before it is used.
150    ///
151    /// ## Post-conditions
152    ///
153    /// The returned array **MUST** have length equal to the true count of the input mask.
154    fn projection_evaluation(
155        &self,
156        row_range: &Range<u64>,
157        expr: &Expression,
158        mask: MaskFuture,
159    ) -> VortexResult<ArrayFuture>;
160}
161
162pub type ArrayFuture = BoxFuture<'static, VortexResult<ArrayRef>>;
163
164pub trait ArrayFutureExt {
165    fn masked(self, mask: MaskFuture) -> Self;
166}
167
168impl ArrayFutureExt for ArrayFuture {
169    /// Returns a new `ArrayFuture` that masks the output with a mask
170    fn masked(self, mask: MaskFuture) -> Self {
171        Box::pin(async move {
172            let (array, mask) = try_join!(self, mask)?;
173            array.mask(mask.into_array())
174        })
175    }
176}
177
178pub struct LazyReaderChildren {
179    children: Arc<dyn LayoutChildren>,
180    dtypes: Vec<DType>,
181    names: Vec<Arc<str>>,
182    segment_source: Arc<dyn SegmentSource>,
183    session: VortexSession,
184    // TODO(ngates): we may want a hash map of some sort here?
185    cache: Vec<OnceCell<LayoutReaderRef>>,
186}
187
188impl LazyReaderChildren {
189    pub fn new(
190        children: Arc<dyn LayoutChildren>,
191        dtypes: Vec<DType>,
192        names: Vec<Arc<str>>,
193        segment_source: Arc<dyn SegmentSource>,
194        session: VortexSession,
195    ) -> Self {
196        let nchildren = children.nchildren();
197        let cache = (0..nchildren).map(|_| OnceCell::new()).collect();
198        Self {
199            children,
200            dtypes,
201            names,
202            segment_source,
203            session,
204            cache,
205        }
206    }
207
208    pub fn get(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
209        if idx >= self.cache.len() {
210            vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
211        }
212
213        self.cache[idx].get_or_try_init(|| {
214            let dtype = &self.dtypes[idx];
215            let child = self.children.child(idx, dtype)?;
216            child.new_reader(
217                Arc::clone(&self.names[idx]),
218                Arc::clone(&self.segment_source),
219                &self.session,
220            )
221        })
222    }
223}