Skip to main content

vortex_layout/
reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use futures::future::BoxFuture;
9use futures::try_join;
10use once_cell::sync::OnceCell;
11use vortex_array::ArrayRef;
12use vortex_array::IntoArray;
13use vortex_array::MaskFuture;
14use vortex_array::builtins::ArrayBuiltins;
15use vortex_array::dtype::DType;
16use vortex_array::dtype::FieldMask;
17use vortex_array::expr::Expression;
18use vortex_error::VortexResult;
19use vortex_error::vortex_bail;
20use vortex_mask::Mask;
21use vortex_session::VortexSession;
22
23use crate::children::LayoutChildren;
24use crate::segments::SegmentSource;
25
/// A shared, reference-counted handle to a type-erased [`LayoutReader`].
pub type LayoutReaderRef = Arc<dyn LayoutReader>;
27
28/// A row range used when registering natural scan splits.
29///
30/// Row range is relative to the reader that receives it. Offset is the offset
31/// that the local row range needs to be shifted by to get the global row range.
32#[derive(Clone, Debug, Eq, PartialEq)]
33pub struct SplitRange {
34    row_offset: u64,
35    row_range: Range<u64>,
36}
37
38impl SplitRange {
39    /// Constructs a split range, returning an error if the local row range is invalid.
40    pub fn try_new(row_offset: u64, row_range: Range<u64>) -> VortexResult<Self> {
41        if row_range.start > row_range.end {
42            vortex_bail!("Invalid split range {:?}", row_range);
43        }
44
45        Ok(Self {
46            row_offset,
47            row_range,
48        })
49    }
50
51    /// Constructs a split range for the root layout.
52    pub fn root(row_range: Range<u64>) -> VortexResult<Self> {
53        Self::try_new(0, row_range)
54    }
55
56    /// The root-layout row offset of this reader's local row zero.
57    pub fn row_offset(&self) -> u64 {
58        self.row_offset
59    }
60
61    /// The local row range within this reader.
62    pub fn row_range(&self) -> &Range<u64> {
63        &self.row_range
64    }
65
66    /// The length of the local row range.
67    pub fn len(&self) -> u64 {
68        self.row_range.end - self.row_range.start
69    }
70
71    /// Returns `true` if the local row range is empty.
72    pub fn is_empty(&self) -> bool {
73        self.row_range.is_empty()
74    }
75
76    /// Returns the equivalent row range in the root layout's coordinate space.
77    pub fn root_row_range(&self) -> Range<u64> {
78        self.row_offset + self.row_range.start..self.row_offset + self.row_range.end
79    }
80
81    /// Returns an error if the local row range is outside the given row count.
82    pub fn check_bounds(&self, row_count: u64) -> VortexResult<()> {
83        if self.row_range.end > row_count {
84            vortex_bail!(
85                "Split range {:?} is out of bounds for row count {}",
86                self.row_range,
87                row_count
88            );
89        }
90
91        Ok(())
92    }
93}
94
/// A collection of row split points.
///
/// Split points may be pushed in any order and may contain duplicates; they are normalized when
/// the collection is consumed with [`RowSplits::into_sorted_deduped`].
#[derive(Debug)]
pub struct RowSplits(Vec<u64>);

impl RowSplits {
    /// Adds a row split point.
    pub fn push(&mut self, row: u64) {
        self.0.push(row);
    }

    /// Reserves capacity for at least `additional` more split points.
    pub fn reserve(&mut self, additional: usize) {
        self.0.reserve(additional);
    }

    /// Creates an empty `RowSplits` with room for at least `capacity` split points.
    pub(crate) fn new_capacity(capacity: usize) -> Self {
        Self(Vec::with_capacity(capacity))
    }

    /// Consumes the collection, returning the split points sorted ascending with duplicates
    /// removed and excess capacity released.
    pub(crate) fn into_sorted_deduped(mut self) -> Vec<u64> {
        self.0.sort_unstable();
        // `dedup` only removes consecutive duplicates, which is sufficient after sorting.
        self.0.dedup();
        self.0.shrink_to_fit();
        self.0
    }
}
121
/// A [`LayoutReader`] is used to read a [`crate::Layout`] in a way that can cache state across multiple
/// evaluation operations.
///
/// Implementors must be `'static + Send + Sync`, so a reader can be held behind an `Arc` (see
/// [`LayoutReaderRef`]) and shared across threads.
pub trait LayoutReader: 'static + Send + Sync {
    /// Returns the name of the layout reader for debugging.
    fn name(&self) -> &Arc<str>;

    /// Returns this reader as `&dyn Any`, which lets callers downcast to a concrete reader type.
    fn as_any(&self) -> &dyn Any;

    /// Returns the un-projected dtype of the layout reader.
    fn dtype(&self) -> &DType;

    /// Returns the number of rows in the layout.
    fn row_count(&self) -> u64;

    /// Registers the natural split points of this layout reader into `splits`.
    ///
    /// * `field_mask` - the fields being read. NOTE(review): presumably implementations only
    ///   consider splits relevant to these fields — confirm.
    /// * `split_range` - the local row range to register splits for, together with the offset
    ///   of this reader's local row zero in the root layout.
    /// * `splits` - the collector that split points are pushed into.
    ///
    /// NOTE(review): this signature does not show whether pushed split points are local or
    /// root-layout row indices; `SplitRange::root_row_range` suggests root indices — confirm.
    // TODO(ngates): this is a temporary API until we make layout readers stream based.
    fn register_splits(
        &self,
        field_mask: &[FieldMask],
        split_range: &SplitRange,
        splits: &mut RowSplits,
    ) -> VortexResult<()>;

    /// Returns a mask where all false values are proven to be false in the given expression.
    ///
    /// Unlike the other evaluation methods, this takes an already-resolved [`Mask`] rather than
    /// a [`MaskFuture`].
    ///
    /// The returned mask **does not** need to have been intersected with the input mask.
    ///
    /// The outer `VortexResult` reports errors raised while setting up the evaluation; errors
    /// during evaluation surface when the returned future is awaited.
    fn pruning_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: Mask,
    ) -> VortexResult<MaskFuture>;

    /// Refines the given mask, returning a mask equal in length to the input mask.
    ///
    /// It is recommended to defer awaiting the input mask for as long as possible (ideally, after
    /// all I/O is complete). This allows other conjuncts the opportunity to refine the mask as much
    /// as possible before it is used.
    ///
    /// ## Post-conditions
    ///
    /// The returned mask **MUST** have been intersected with the input mask.
    fn filter_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: MaskFuture,
    ) -> VortexResult<MaskFuture>;

    /// Evaluates `expr` against the rows in `row_range` that are selected by `mask`, returning a
    /// future that resolves to the resulting array.
    ///
    /// It is recommended to defer awaiting the input mask for as long as possible (ideally, after
    /// all I/O is complete). This allows other conjuncts the opportunity to refine the mask as much
    /// as possible before it is used.
    ///
    /// ## Post-conditions
    ///
    /// The returned array **MUST** have length equal to the true count of the input mask.
    fn projection_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: MaskFuture,
    ) -> VortexResult<ArrayFuture>;
}
187
188pub type ArrayFuture = BoxFuture<'static, VortexResult<ArrayRef>>;
189
190pub trait ArrayFutureExt {
191    fn masked(self, mask: MaskFuture) -> Self;
192}
193
194impl ArrayFutureExt for ArrayFuture {
195    /// Returns a new `ArrayFuture` that masks the output with a mask
196    fn masked(self, mask: MaskFuture) -> Self {
197        Box::pin(async move {
198            let (array, mask) = try_join!(self, mask)?;
199            array.mask(mask.into_array())
200        })
201    }
202}
203
204pub struct LazyReaderChildren {
205    children: Arc<dyn LayoutChildren>,
206    dtypes: Vec<DType>,
207    names: Vec<Arc<str>>,
208    segment_source: Arc<dyn SegmentSource>,
209    session: VortexSession,
210    // TODO(ngates): we may want a hash map of some sort here?
211    cache: Vec<OnceCell<LayoutReaderRef>>,
212}
213
214impl LazyReaderChildren {
215    pub fn new(
216        children: Arc<dyn LayoutChildren>,
217        dtypes: Vec<DType>,
218        names: Vec<Arc<str>>,
219        segment_source: Arc<dyn SegmentSource>,
220        session: VortexSession,
221    ) -> Self {
222        let nchildren = children.nchildren();
223        let cache = (0..nchildren).map(|_| OnceCell::new()).collect();
224        Self {
225            children,
226            dtypes,
227            names,
228            segment_source,
229            session,
230            cache,
231        }
232    }
233
234    pub fn get(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
235        if idx >= self.cache.len() {
236            vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
237        }
238
239        self.cache[idx].get_or_try_init(|| {
240            let dtype = &self.dtypes[idx];
241            let child = self.children.child(idx, dtype)?;
242            child.new_reader(
243                Arc::clone(&self.names[idx]),
244                Arc::clone(&self.segment_source),
245                &self.session,
246            )
247        })
248    }
249}