Skip to main content

vortex_layout/
reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use futures::future::BoxFuture;
9use futures::try_join;
10use once_cell::sync::OnceCell;
11use vortex_array::ArrayRef;
12use vortex_array::IntoArray;
13use vortex_array::MaskFuture;
14use vortex_array::builtins::ArrayBuiltins;
15use vortex_array::dtype::DType;
16use vortex_array::dtype::FieldMask;
17use vortex_array::expr::Expression;
18use vortex_error::VortexResult;
19use vortex_error::vortex_bail;
20use vortex_mask::Mask;
21use vortex_session::VortexSession;
22
23use crate::LayoutReaderContext;
24use crate::children::LayoutChildren;
25use crate::segments::SegmentSource;
26
/// A shared, reference-counted handle to a type-erased [`LayoutReader`].
pub type LayoutReaderRef = Arc<dyn LayoutReader>;
28
29/// A row range used when registering natural scan splits.
30///
31/// Row range is relative to the reader that receives it. Offset is the offset
32/// that the local row range needs to be shifted by to get the global row range.
33#[derive(Clone, Debug, Eq, PartialEq)]
34pub struct SplitRange {
35    row_offset: u64,
36    row_range: Range<u64>,
37}
38
39impl SplitRange {
40    /// Constructs a split range, returning an error if the local row range is invalid.
41    pub fn try_new(row_offset: u64, row_range: Range<u64>) -> VortexResult<Self> {
42        if row_range.start > row_range.end {
43            vortex_bail!("Invalid split range {:?}", row_range);
44        }
45
46        Ok(Self {
47            row_offset,
48            row_range,
49        })
50    }
51
52    /// Constructs a split range for the root layout.
53    pub fn root(row_range: Range<u64>) -> VortexResult<Self> {
54        Self::try_new(0, row_range)
55    }
56
57    /// The root-layout row offset of this reader's local row zero.
58    pub fn row_offset(&self) -> u64 {
59        self.row_offset
60    }
61
62    /// The local row range within this reader.
63    pub fn row_range(&self) -> &Range<u64> {
64        &self.row_range
65    }
66
67    /// The length of the local row range.
68    pub fn len(&self) -> u64 {
69        self.row_range.end - self.row_range.start
70    }
71
72    /// Returns `true` if the local row range is empty.
73    pub fn is_empty(&self) -> bool {
74        self.row_range.is_empty()
75    }
76
77    /// Returns the equivalent row range in the root layout's coordinate space.
78    pub fn root_row_range(&self) -> Range<u64> {
79        self.row_offset + self.row_range.start..self.row_offset + self.row_range.end
80    }
81
82    /// Returns an error if the local row range is outside the given row count.
83    pub fn check_bounds(&self, row_count: u64) -> VortexResult<()> {
84        if self.row_range.end > row_count {
85            vortex_bail!(
86                "Split range {:?} is out of bounds for row count {}",
87                self.row_range,
88                row_count
89            );
90        }
91
92        Ok(())
93    }
94}
95
96/// A collection of row split points
97pub struct RowSplits(Vec<u64>);
98
99impl RowSplits {
100    /// Add row to splits
101    pub fn push(&mut self, row: u64) {
102        self.0.push(row);
103    }
104
105    /// Reserve space for "additional" elements
106    pub fn reserve(&mut self, additional: usize) {
107        self.0.reserve(additional);
108    }
109
110    /// Create a new RowSplits with preallocated "capacity"
111    pub(crate) fn new_capacity(capacity: usize) -> Self {
112        Self(Vec::with_capacity(capacity))
113    }
114
115    pub(crate) fn into_sorted_deduped(mut self) -> Vec<u64> {
116        self.0.sort_unstable();
117        self.0.dedup();
118        self.0.shrink_to_fit();
119        self.0
120    }
121}
122
123/// A [`LayoutReader`] is used to read a [`crate::Layout`] in a way that can cache state across multiple
124/// evaluation operations.
125pub trait LayoutReader: 'static + Send + Sync {
126    /// Returns the name of the layout reader for debugging.
127    fn name(&self) -> &Arc<str>;
128
129    fn as_any(&self) -> &dyn Any;
130
131    /// Returns the un-projected dtype of the layout reader.
132    fn dtype(&self) -> &DType;
133
134    /// Returns the number of rows in the layout.
135    fn row_count(&self) -> u64;
136
137    /// Register the splits of this layout reader.
138    // TODO(ngates): this is a temporary API until we make layout readers stream based.
139    fn register_splits(
140        &self,
141        field_mask: &[FieldMask],
142        split_range: &SplitRange,
143        splits: &mut RowSplits,
144    ) -> VortexResult<()>;
145
146    /// Returns a mask where all false values are proven to be false in the given expression.
147    ///
148    /// The returned mask **does not** need to have been intersected with the input mask.
149    fn pruning_evaluation(
150        &self,
151        row_range: &Range<u64>,
152        expr: &Expression,
153        mask: Mask,
154    ) -> VortexResult<MaskFuture>;
155
156    /// Refines the given mask, returning a mask equal in length to the input mask.
157    ///
158    /// It is recommended to defer awaiting the input mask for as long as possible (ideally, after
159    /// all I/O is complete). This allows other conjuncts the opportunity to refine the mask as much
160    /// as possible before it is used.
161    ///
162    /// ## Post-conditions
163    ///
164    /// The returned mask **MUST** have been intersected with the input mask.
165    fn filter_evaluation(
166        &self,
167        row_range: &Range<u64>,
168        expr: &Expression,
169        mask: MaskFuture,
170    ) -> VortexResult<MaskFuture>;
171
172    /// Evaluates an expression against an array.
173    ///
174    /// It is recommended to defer awaiting the input mask for as long as possible (ideally, after
175    /// all I/O is complete). This allows other conjuncts the opportunity to refine the mask as much
176    /// as possible before it is used.
177    ///
178    /// ## Post-conditions
179    ///
180    /// The returned array **MUST** have length equal to the true count of the input mask.
181    fn projection_evaluation(
182        &self,
183        row_range: &Range<u64>,
184        expr: &Expression,
185        mask: MaskFuture,
186    ) -> VortexResult<ArrayFuture>;
187}
188
189pub type ArrayFuture = BoxFuture<'static, VortexResult<ArrayRef>>;
190
191pub trait ArrayFutureExt {
192    fn masked(self, mask: MaskFuture) -> Self;
193}
194
195impl ArrayFutureExt for ArrayFuture {
196    /// Returns a new `ArrayFuture` that masks the output with a mask
197    fn masked(self, mask: MaskFuture) -> Self {
198        Box::pin(async move {
199            let (array, mask) = try_join!(self, mask)?;
200            array.mask(mask.into_array())
201        })
202    }
203}
204
205pub struct LazyReaderChildren {
206    children: Arc<dyn LayoutChildren>,
207    dtypes: Vec<DType>,
208    names: Vec<Arc<str>>,
209    segment_source: Arc<dyn SegmentSource>,
210    session: VortexSession,
211    ctx: LayoutReaderContext,
212    // TODO(ngates): we may want a hash map of some sort here?
213    cache: Vec<OnceCell<LayoutReaderRef>>,
214}
215
216impl LazyReaderChildren {
217    pub fn new(
218        children: Arc<dyn LayoutChildren>,
219        dtypes: Vec<DType>,
220        names: Vec<Arc<str>>,
221        segment_source: Arc<dyn SegmentSource>,
222        session: VortexSession,
223        ctx: LayoutReaderContext,
224    ) -> Self {
225        let nchildren = children.nchildren();
226        let cache = (0..nchildren).map(|_| OnceCell::new()).collect();
227        Self {
228            children,
229            dtypes,
230            names,
231            segment_source,
232            session,
233            ctx,
234            cache,
235        }
236    }
237
238    pub fn get(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
239        if idx >= self.cache.len() {
240            vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
241        }
242
243        self.cache[idx].get_or_try_init(|| {
244            let dtype = &self.dtypes[idx];
245            let child = self.children.child(idx, dtype)?;
246            child.new_reader(
247                Arc::clone(&self.names[idx]),
248                Arc::clone(&self.segment_source),
249                &self.session,
250                &self.ctx,
251            )
252        })
253    }
254}