vortex_layout/
reader.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::collections::BTreeSet;
5use std::ops::Range;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use futures::FutureExt;
10use futures::future::{BoxFuture, Shared};
11use once_cell::sync::OnceCell;
12use vortex_array::ArrayRef;
13use vortex_array::stats::Precision;
14use vortex_dtype::{DType, FieldMask};
15use vortex_error::{SharedVortexResult, VortexError, VortexResult, vortex_bail};
16use vortex_expr::ExprRef;
17use vortex_mask::Mask;
18
19use crate::children::LayoutChildren;
20use crate::segments::SegmentSource;
21
22pub type LayoutReaderRef = Arc<dyn LayoutReader>;
23
24/// A [`LayoutReader`] is used to read a [`crate::Layout`] in a way that can cache state across multiple
25/// evaluation operations.
26pub trait LayoutReader: 'static + Send + Sync {
27    /// Returns the name of the layout reader for debugging.
28    fn name(&self) -> &Arc<str>;
29
30    /// Returns the un-projected dtype of the layout reader.
31    fn dtype(&self) -> &DType;
32
33    /// Returns the number of rows in the layout reader.
34    /// An inexact count may be larger or smaller than the actual row count.
35    fn row_count(&self) -> Precision<u64>;
36
37    /// Register the splits of this layout reader.
38    // TODO(ngates): this is a temporary API until we make layout readers stream based.
39    fn register_splits(
40        &self,
41        field_mask: &[FieldMask],
42        row_offset: u64,
43        splits: &mut BTreeSet<u64>,
44    ) -> VortexResult<()>;
45
46    /// Performs an approximate evaluation of the expression against the layout reader.
47    fn pruning_evaluation(
48        &self,
49        row_range: &Range<u64>,
50        expr: &ExprRef,
51    ) -> VortexResult<Box<dyn PruningEvaluation>>;
52
53    /// Performs an exact evaluation of the expression against the layout reader.
54    fn filter_evaluation(
55        &self,
56        row_range: &Range<u64>,
57        expr: &ExprRef,
58    ) -> VortexResult<Box<dyn MaskEvaluation>>;
59
60    /// Evaluates the expression against the layout.
61    fn projection_evaluation(
62        &self,
63        row_range: &Range<u64>,
64        expr: &ExprRef,
65    ) -> VortexResult<Box<dyn ArrayEvaluation>>;
66}
67
68pub type MaskFuture = Shared<BoxFuture<'static, SharedVortexResult<Mask>>>;
69
70/// Create a resolved [`MaskFuture`] from a [`Mask`].
71pub fn mask_future_ready(mask: Mask) -> MaskFuture {
72    async move { Ok::<_, Arc<VortexError>>(mask) }
73        .boxed()
74        .shared()
75}
76
77/// Returns a mask where all false values are proven to be false in the given expression.
78///
79/// The returned mask **does not** need to have been intersected with the input mask.
80#[async_trait]
81pub trait PruningEvaluation: 'static + Send + Sync {
82    async fn invoke(&self, mask: Mask) -> VortexResult<Mask>;
83}
84
/// A stateless [`PruningEvaluation`] whose `invoke` returns the input mask unchanged.
///
/// The type is a unit struct, so it derives the common traits: it can be printed with
/// `{:?}`, copied freely, and obtained through `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoOpPruningEvaluation;
86
87#[async_trait]
88impl PruningEvaluation for NoOpPruningEvaluation {
89    async fn invoke(&self, mask: Mask) -> VortexResult<Mask> {
90        Ok(mask)
91    }
92}
93
94/// Refines the given mask, returning a mask equal in length to the input mask.
95///
96/// ## Post-conditions
97///
98/// The returned mask **MUST** have been intersected with the input mask.
99#[async_trait]
100pub trait MaskEvaluation: 'static + Send + Sync {
101    async fn invoke(&self, mask: Mask) -> VortexResult<Mask>;
102}
103
/// A stateless [`MaskEvaluation`] whose `invoke` returns the input mask unchanged.
///
/// The type is a unit struct, so it derives the common traits: it can be printed with
/// `{:?}`, copied freely, and obtained through `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoOpMaskEvaluation;
105
106#[async_trait]
107impl MaskEvaluation for NoOpMaskEvaluation {
108    async fn invoke(&self, mask: Mask) -> VortexResult<Mask> {
109        Ok(mask)
110    }
111}
112
113/// Evaluates an expression against an array, returning an array equal in length to the true count
114/// of the input mask.
115#[async_trait]
116pub trait ArrayEvaluation: 'static + Send + Sync {
117    async fn invoke(&self, mask: Mask) -> VortexResult<ArrayRef>;
118}
119
120pub struct LazyReaderChildren {
121    children: Arc<dyn LayoutChildren>,
122    segment_source: Arc<dyn SegmentSource>,
123
124    // TODO(ngates): we may want a hash map of some sort here?
125    cache: Vec<OnceCell<LayoutReaderRef>>,
126}
127
128impl LazyReaderChildren {
129    pub fn new(children: Arc<dyn LayoutChildren>, segment_source: Arc<dyn SegmentSource>) -> Self {
130        let nchildren = children.nchildren();
131        let cache = (0..nchildren).map(|_| OnceCell::new()).collect();
132        Self {
133            children,
134            segment_source,
135            cache,
136        }
137    }
138
139    pub fn get(
140        &self,
141        idx: usize,
142        dtype: &DType,
143        name: &Arc<str>,
144    ) -> VortexResult<&LayoutReaderRef> {
145        if idx >= self.cache.len() {
146            vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
147        }
148
149        self.cache[idx].get_or_try_init(|| {
150            let child = self.children.child(idx, dtype)?;
151            child.new_reader(name.clone(), self.segment_source.clone())
152        })
153    }
154}