vortex_layout/
reader.rs

1use std::ops::{Deref, Range};
2use std::sync::{Arc, OnceLock};
3
4use async_trait::async_trait;
5use futures::FutureExt;
6use futures::future::{BoxFuture, Shared};
7use vortex_array::{ArrayContext, ArrayRef};
8use vortex_dtype::DType;
9use vortex_error::{SharedVortexResult, VortexError, VortexResult, vortex_bail};
10use vortex_expr::ExprRef;
11use vortex_mask::Mask;
12
13use crate::Layout;
14use crate::children::LayoutChildren;
15use crate::segments::SegmentSource;
16
/// A shared, reference-counted handle to a type-erased [`LayoutReader`].
pub type LayoutReaderRef = Arc<dyn LayoutReader>;
18
19/// A [`LayoutReader`] is used to read a [`Layout`] in a way that can cache state across multiple
20/// evaluation operations.
21///
22/// It dereferences into the underlying layout being read.
23pub trait LayoutReader: 'static + Send + Sync + Deref<Target = dyn Layout> {
24    /// Returns the name of the layout reader for debugging.
25    fn name(&self) -> &Arc<str>;
26
27    /// Performs an approximate evaluation of the expression against the layout reader.
28    fn pruning_evaluation(
29        &self,
30        row_range: &Range<u64>,
31        expr: &ExprRef,
32    ) -> VortexResult<Box<dyn PruningEvaluation>>;
33
34    /// Performs an exact evaluation of the expression against the layout reader.
35    fn filter_evaluation(
36        &self,
37        row_range: &Range<u64>,
38        expr: &ExprRef,
39    ) -> VortexResult<Box<dyn MaskEvaluation>>;
40
41    /// Evaluates the expression against the layout.
42    fn projection_evaluation(
43        &self,
44        row_range: &Range<u64>,
45        expr: &ExprRef,
46    ) -> VortexResult<Box<dyn ArrayEvaluation>>;
47}
48
/// A boxed future resolving to a [`Mask`] (or a shared error), wrapped in [`Shared`] so that the
/// same future can be cloned and awaited from more than one place.
pub type MaskFuture = Shared<BoxFuture<'static, SharedVortexResult<Mask>>>;
50
51/// Create a resolved [`MaskFuture`] from a [`Mask`].
52pub fn mask_future_ready(mask: Mask) -> MaskFuture {
53    async move { Ok::<_, Arc<VortexError>>(mask) }
54        .boxed()
55        .shared()
56}
57
58#[async_trait]
59pub trait PruningEvaluation: 'static + Send + Sync {
60    async fn invoke(&self, mask: Mask) -> VortexResult<Mask>;
61}
62
63pub struct NoOpPruningEvaluation;
64
65#[async_trait]
66impl PruningEvaluation for NoOpPruningEvaluation {
67    async fn invoke(&self, mask: Mask) -> VortexResult<Mask> {
68        Ok(mask)
69    }
70}
71
72/// Refines the given mask, returning a mask equal in length to the input mask.
73#[async_trait]
74pub trait MaskEvaluation: 'static + Send + Sync {
75    async fn invoke(&self, mask: Mask) -> VortexResult<Mask>;
76}
77
78/// Evaluates an expression against an array, returning an array equal in length to the true count
79/// of the input mask.
80#[async_trait]
81pub trait ArrayEvaluation: 'static + Send + Sync {
82    async fn invoke(&self, mask: Mask) -> VortexResult<ArrayRef>;
83}
84
85pub struct LazyReaderChildren {
86    children: Arc<dyn LayoutChildren>,
87    segment_source: Arc<dyn SegmentSource>,
88    ctx: ArrayContext,
89
90    // TODO(ngates): we may want a hash map of some sort here?
91    cache: Vec<OnceLock<LayoutReaderRef>>,
92}
93
94impl LazyReaderChildren {
95    pub fn new(
96        children: Arc<dyn LayoutChildren>,
97        segment_source: Arc<dyn SegmentSource>,
98        ctx: ArrayContext,
99    ) -> Self {
100        let nchildren = children.nchildren();
101        let cache = (0..nchildren).map(|_| OnceLock::new()).collect::<Vec<_>>();
102        Self {
103            children,
104            segment_source,
105            ctx,
106            cache,
107        }
108    }
109
110    pub fn get(
111        &self,
112        idx: usize,
113        dtype: &DType,
114        name: &Arc<str>,
115    ) -> VortexResult<&LayoutReaderRef> {
116        if idx >= self.cache.len() {
117            vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
118        }
119        self.cache[idx].get_or_try_init(|| {
120            let child = self.children.child(idx, dtype)?;
121            child.new_reader(name, &self.segment_source, &self.ctx)
122        })
123    }
124}