1use std::ops::{Deref, Range};
2use std::sync::{Arc, OnceLock};
3
4use async_trait::async_trait;
5use futures::FutureExt;
6use futures::future::{BoxFuture, Shared};
7use vortex_array::{ArrayContext, ArrayRef};
8use vortex_dtype::DType;
9use vortex_error::{SharedVortexResult, VortexError, VortexResult, vortex_bail};
10use vortex_expr::ExprRef;
11use vortex_mask::Mask;
12
13use crate::Layout;
14use crate::children::LayoutChildren;
15use crate::segments::SegmentSource;
16
/// Reference-counted handle to a type-erased [`LayoutReader`].
pub type LayoutReaderRef = Arc<dyn LayoutReader>;
18
19pub trait LayoutReader: 'static + Send + Sync + Deref<Target = dyn Layout> {
24 fn name(&self) -> &Arc<str>;
26
27 fn pruning_evaluation(
29 &self,
30 row_range: &Range<u64>,
31 expr: &ExprRef,
32 ) -> VortexResult<Box<dyn PruningEvaluation>>;
33
34 fn filter_evaluation(
36 &self,
37 row_range: &Range<u64>,
38 expr: &ExprRef,
39 ) -> VortexResult<Box<dyn MaskEvaluation>>;
40
41 fn projection_evaluation(
43 &self,
44 row_range: &Range<u64>,
45 expr: &ExprRef,
46 ) -> VortexResult<Box<dyn ArrayEvaluation>>;
47}
48
/// A boxed, `'static` future wrapped in [`Shared`] that resolves to a
/// [`Mask`] or to a shared (reference-counted) error.
pub type MaskFuture = Shared<BoxFuture<'static, SharedVortexResult<Mask>>>;
50
51pub fn mask_future_ready(mask: Mask) -> MaskFuture {
53 async move { Ok::<_, Arc<VortexError>>(mask) }
54 .boxed()
55 .shared()
56}
57
58#[async_trait]
59pub trait PruningEvaluation: 'static + Send + Sync {
60 async fn invoke(&self, mask: Mask) -> VortexResult<Mask>;
61}
62
/// A [`PruningEvaluation`] that performs no pruning.
///
/// The type is a stateless unit struct, so it derives the common traits
/// (`Debug`, `Clone`, `Copy`, `Default`) for logging and easy construction.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoOpPruningEvaluation;
64
/// Pass-through implementation: the input mask is handed back untouched.
#[async_trait]
impl PruningEvaluation for NoOpPruningEvaluation {
    /// Returns `Ok(mask)` without inspecting or modifying the mask.
    async fn invoke(&self, mask: Mask) -> VortexResult<Mask> {
        Ok(mask)
    }
}
71
72#[async_trait]
74pub trait MaskEvaluation: 'static + Send + Sync {
75 async fn invoke(&self, mask: Mask) -> VortexResult<Mask>;
76}
77
78#[async_trait]
81pub trait ArrayEvaluation: 'static + Send + Sync {
82 async fn invoke(&self, mask: Mask) -> VortexResult<ArrayRef>;
83}
84
85pub struct LazyReaderChildren {
86 children: Arc<dyn LayoutChildren>,
87 segment_source: Arc<dyn SegmentSource>,
88 ctx: ArrayContext,
89
90 cache: Vec<OnceLock<LayoutReaderRef>>,
92}
93
94impl LazyReaderChildren {
95 pub fn new(
96 children: Arc<dyn LayoutChildren>,
97 segment_source: Arc<dyn SegmentSource>,
98 ctx: ArrayContext,
99 ) -> Self {
100 let nchildren = children.nchildren();
101 let cache = (0..nchildren).map(|_| OnceLock::new()).collect::<Vec<_>>();
102 Self {
103 children,
104 segment_source,
105 ctx,
106 cache,
107 }
108 }
109
110 pub fn get(
111 &self,
112 idx: usize,
113 dtype: &DType,
114 name: &Arc<str>,
115 ) -> VortexResult<&LayoutReaderRef> {
116 if idx >= self.cache.len() {
117 vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
118 }
119 self.cache[idx].get_or_try_init(|| {
120 let child = self.children.child(idx, dtype)?;
121 child.new_reader(name, &self.segment_source, &self.ctx)
122 })
123 }
124}