vortex_layout/
reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::collections::BTreeSet;
5use std::ops::Range;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use once_cell::sync::OnceCell;
10use vortex_array::ArrayRef;
11use vortex_array::stats::Precision;
12use vortex_dtype::{DType, FieldMask};
13use vortex_error::{VortexResult, vortex_bail};
14use vortex_expr::ExprRef;
15use vortex_mask::Mask;
16
17use crate::MaskFuture;
18use crate::children::LayoutChildren;
19use crate::segments::SegmentSource;
20
21pub type LayoutReaderRef = Arc<dyn LayoutReader>;
22
23/// A [`LayoutReader`] is used to read a [`crate::Layout`] in a way that can cache state across multiple
24/// evaluation operations.
25pub trait LayoutReader: 'static + Send + Sync {
26    /// Returns the name of the layout reader for debugging.
27    fn name(&self) -> &Arc<str>;
28
29    /// Returns the un-projected dtype of the layout reader.
30    fn dtype(&self) -> &DType;
31
32    /// Returns the number of rows in the layout reader.
33    /// An inexact count may be larger or smaller than the actual row count.
34    fn row_count(&self) -> Precision<u64>;
35
36    /// Register the splits of this layout reader.
37    // TODO(ngates): this is a temporary API until we make layout readers stream based.
38    fn register_splits(
39        &self,
40        field_mask: &[FieldMask],
41        row_offset: u64,
42        splits: &mut BTreeSet<u64>,
43    ) -> VortexResult<()>;
44
45    /// Performs an approximate evaluation of the expression against the layout reader.
46    fn pruning_evaluation(
47        &self,
48        row_range: &Range<u64>,
49        expr: &ExprRef,
50    ) -> VortexResult<Box<dyn PruningEvaluation>>;
51
52    /// Performs an exact evaluation of the expression against the layout reader.
53    fn filter_evaluation(
54        &self,
55        row_range: &Range<u64>,
56        expr: &ExprRef,
57    ) -> VortexResult<Box<dyn MaskEvaluation>>;
58
59    /// Evaluates the expression against the layout.
60    fn projection_evaluation(
61        &self,
62        row_range: &Range<u64>,
63        expr: &ExprRef,
64    ) -> VortexResult<Box<dyn ArrayEvaluation>>;
65}
66
67/// Returns a mask where all false values are proven to be false in the given expression.
68///
69/// The returned mask **does not** need to have been intersected with the input mask.
70#[async_trait]
71pub trait PruningEvaluation: 'static + Send + Sync {
72    async fn invoke(&self, mask: Mask) -> VortexResult<Mask>;
73}
74
/// A [`PruningEvaluation`] that performs no pruning and hands the input mask back unchanged.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoOpPruningEvaluation;
76
77#[async_trait]
78impl PruningEvaluation for NoOpPruningEvaluation {
79    async fn invoke(&self, mask: Mask) -> VortexResult<Mask> {
80        Ok(mask)
81    }
82}
83
84/// Refines the given mask, returning a mask equal in length to the input mask.
85///
86/// It is recommended to defer awaiting the input mask for as long as possible (ideally, after
87/// all I/O is complete). This allows other conjuncts the opportunity to refine the mask as much
88/// as possible before it is used.
89///
90/// ## Post-conditions
91///
92/// The returned mask **MUST** have been intersected with the input mask.
93#[async_trait]
94pub trait MaskEvaluation: 'static + Send + Sync {
95    async fn invoke(&self, mask: MaskFuture) -> VortexResult<Mask>;
96}
97
/// A [`MaskEvaluation`] that applies no refinement and simply resolves the input mask.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoOpMaskEvaluation;
99
100#[async_trait]
101impl MaskEvaluation for NoOpMaskEvaluation {
102    async fn invoke(&self, mask: MaskFuture) -> VortexResult<Mask> {
103        mask.await
104    }
105}
106
107/// Evaluates an expression against an array.
108///
109/// It is recommended to defer awaiting the input mask for as long as possible (ideally, after
110/// all I/O is complete). This allows other conjuncts the opportunity to refine the mask as much
111/// as possible before it is used.
112///
113/// ## Post-conditions
114///
115/// The returned array **MUST** have length equal to the true count of the input mask.
116#[async_trait]
117pub trait ArrayEvaluation: 'static + Send + Sync {
118    async fn invoke(&self, mask: MaskFuture) -> VortexResult<ArrayRef>;
119}
120
121pub struct LazyReaderChildren {
122    children: Arc<dyn LayoutChildren>,
123    segment_source: Arc<dyn SegmentSource>,
124
125    // TODO(ngates): we may want a hash map of some sort here?
126    cache: Vec<OnceCell<LayoutReaderRef>>,
127}
128
129impl LazyReaderChildren {
130    pub fn new(children: Arc<dyn LayoutChildren>, segment_source: Arc<dyn SegmentSource>) -> Self {
131        let nchildren = children.nchildren();
132        let cache = (0..nchildren).map(|_| OnceCell::new()).collect();
133        Self {
134            children,
135            segment_source,
136            cache,
137        }
138    }
139
140    pub fn get(
141        &self,
142        idx: usize,
143        dtype: &DType,
144        name: &Arc<str>,
145    ) -> VortexResult<&LayoutReaderRef> {
146        if idx >= self.cache.len() {
147            vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
148        }
149
150        self.cache[idx].get_or_try_init(|| {
151            let child = self.children.child(idx, dtype)?;
152            child.new_reader(name.clone(), self.segment_source.clone())
153        })
154    }
155}