1use std::collections::BTreeSet;
5use std::ops::Range;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use futures::FutureExt;
10use futures::future::{BoxFuture, Shared};
11use once_cell::sync::OnceCell;
12use vortex_array::ArrayRef;
13use vortex_array::stats::Precision;
14use vortex_dtype::{DType, FieldMask};
15use vortex_error::{SharedVortexResult, VortexError, VortexResult, vortex_bail};
16use vortex_expr::ExprRef;
17use vortex_mask::Mask;
18
19use crate::children::LayoutChildren;
20use crate::segments::SegmentSource;
21
22pub type LayoutReaderRef = Arc<dyn LayoutReader>;
23
24pub trait LayoutReader: 'static + Send + Sync {
27 fn name(&self) -> &Arc<str>;
29
30 fn dtype(&self) -> &DType;
32
33 fn row_count(&self) -> Precision<u64>;
36
37 fn register_splits(
40 &self,
41 field_mask: &[FieldMask],
42 row_offset: u64,
43 splits: &mut BTreeSet<u64>,
44 ) -> VortexResult<()>;
45
46 fn pruning_evaluation(
48 &self,
49 row_range: &Range<u64>,
50 expr: &ExprRef,
51 ) -> VortexResult<Box<dyn PruningEvaluation>>;
52
53 fn filter_evaluation(
55 &self,
56 row_range: &Range<u64>,
57 expr: &ExprRef,
58 ) -> VortexResult<Box<dyn MaskEvaluation>>;
59
60 fn projection_evaluation(
62 &self,
63 row_range: &Range<u64>,
64 expr: &ExprRef,
65 ) -> VortexResult<Box<dyn ArrayEvaluation>>;
66}
67
68pub type MaskFuture = Shared<BoxFuture<'static, SharedVortexResult<Mask>>>;
69
70pub fn mask_future_ready(mask: Mask) -> MaskFuture {
72 async move { Ok::<_, Arc<VortexError>>(mask) }
73 .boxed()
74 .shared()
75}
76
77#[async_trait]
81pub trait PruningEvaluation: 'static + Send + Sync {
82 async fn invoke(&self, mask: Mask) -> VortexResult<Mask>;
83}
84
/// A [`PruningEvaluation`] that passes its input mask through unchanged.
///
/// This is a stateless unit struct, so it is trivially `Copy` and `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoOpPruningEvaluation;
86
87#[async_trait]
88impl PruningEvaluation for NoOpPruningEvaluation {
89 async fn invoke(&self, mask: Mask) -> VortexResult<Mask> {
90 Ok(mask)
91 }
92}
93
94#[async_trait]
100pub trait MaskEvaluation: 'static + Send + Sync {
101 async fn invoke(&self, mask: Mask) -> VortexResult<Mask>;
102}
103
/// A [`MaskEvaluation`] that passes its input mask through unchanged.
///
/// This is a stateless unit struct, so it is trivially `Copy` and `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoOpMaskEvaluation;
105
106#[async_trait]
107impl MaskEvaluation for NoOpMaskEvaluation {
108 async fn invoke(&self, mask: Mask) -> VortexResult<Mask> {
109 Ok(mask)
110 }
111}
112
113#[async_trait]
116pub trait ArrayEvaluation: 'static + Send + Sync {
117 async fn invoke(&self, mask: Mask) -> VortexResult<ArrayRef>;
118}
119
120pub struct LazyReaderChildren {
121 children: Arc<dyn LayoutChildren>,
122 segment_source: Arc<dyn SegmentSource>,
123
124 cache: Vec<OnceCell<LayoutReaderRef>>,
126}
127
128impl LazyReaderChildren {
129 pub fn new(children: Arc<dyn LayoutChildren>, segment_source: Arc<dyn SegmentSource>) -> Self {
130 let nchildren = children.nchildren();
131 let cache = (0..nchildren).map(|_| OnceCell::new()).collect();
132 Self {
133 children,
134 segment_source,
135 cache,
136 }
137 }
138
139 pub fn get(
140 &self,
141 idx: usize,
142 dtype: &DType,
143 name: &Arc<str>,
144 ) -> VortexResult<&LayoutReaderRef> {
145 if idx >= self.cache.len() {
146 vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
147 }
148
149 self.cache[idx].get_or_try_init(|| {
150 let child = self.children.child(idx, dtype)?;
151 child.new_reader(name.clone(), self.segment_source.clone())
152 })
153 }
154}