1use std::any::Any;
5use std::collections::BTreeSet;
6use std::ops::Range;
7use std::sync::Arc;
8
9use futures::future::BoxFuture;
10use futures::try_join;
11use once_cell::sync::OnceCell;
12use vortex_array::ArrayRef;
13use vortex_array::IntoArray;
14use vortex_array::MaskFuture;
15use vortex_array::builtins::ArrayBuiltins;
16use vortex_array::dtype::DType;
17use vortex_array::dtype::FieldMask;
18use vortex_array::expr::Expression;
19use vortex_error::VortexResult;
20use vortex_error::vortex_bail;
21use vortex_mask::Mask;
22use vortex_session::VortexSession;
23
24use crate::children::LayoutChildren;
25use crate::segments::SegmentSource;
26
/// A shared, reference-counted handle to a type-erased [`LayoutReader`].
pub type LayoutReaderRef = Arc<dyn LayoutReader>;
28
/// A reader over a layout, usable as a shared trait object across threads
/// (note the `'static + Send + Sync` bounds).
///
/// The three `*_evaluation` methods each take a row range, an expression and a
/// mask, and hand back a future rather than an already-computed value.
pub trait LayoutReader: 'static + Send + Sync {
    /// Returns this reader's name as a shared string.
    fn name(&self) -> &Arc<str>;

    /// Returns `self` as [`Any`], so callers can attempt a downcast to the
    /// concrete reader type.
    fn as_any(&self) -> &dyn Any;

    /// Returns the [`DType`] associated with this reader.
    ///
    /// NOTE(review): presumably the dtype of the data before any projection
    /// expression is applied — confirm against implementors.
    fn dtype(&self) -> &DType;

    /// Returns the row count reported by this reader.
    fn row_count(&self) -> u64;

    /// Adds split points for this reader to `splits`.
    ///
    /// * `field_mask` - field masks supplied by the caller; presumably they
    ///   restrict which fields contribute splits — TODO confirm.
    /// * `row_range` - the range of rows under consideration.
    /// * `splits` - ordered set that receives the registered row offsets.
    ///
    /// # Errors
    ///
    /// Returns an error if the implementation fails to register its splits.
    fn register_splits(
        &self,
        field_mask: &[FieldMask],
        row_range: &Range<u64>,
        splits: &mut BTreeSet<u64>,
    ) -> VortexResult<()>;

    /// Builds a [`MaskFuture`] for a pruning pass of `expr` over `row_range`.
    ///
    /// Unlike the other evaluation methods, `mask` is passed here as an
    /// already-resolved [`Mask`] rather than as a future.
    ///
    /// NOTE(review): presumably `false` entries in the result mark rows proven
    /// not to match `expr`; whether the result is already intersected with
    /// `mask` is not visible from this signature — confirm.
    ///
    /// # Errors
    ///
    /// Returns an error if the evaluation cannot be constructed.
    fn pruning_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: Mask,
    ) -> VortexResult<MaskFuture>;

    /// Builds a [`MaskFuture`] for evaluating the filter `expr` over
    /// `row_range`, given the input selection `mask`.
    ///
    /// `mask` is itself a future, so an implementation is free to await it
    /// only at the point where it is actually needed.
    ///
    /// NOTE(review): presumably the result has the same length as `mask` and
    /// is a refinement of it — confirm the post-conditions with implementors.
    ///
    /// # Errors
    ///
    /// Returns an error if the evaluation cannot be constructed.
    fn filter_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: MaskFuture,
    ) -> VortexResult<MaskFuture>;

    /// Builds an [`ArrayFuture`] for evaluating the projection `expr` over
    /// `row_range`, given the input selection `mask`.
    ///
    /// `mask` is itself a future, so an implementation is free to await it
    /// only at the point where it is actually needed.
    ///
    /// NOTE(review): presumably the resulting array holds one element per
    /// selected row of `mask` — confirm the post-conditions with implementors.
    ///
    /// # Errors
    ///
    /// Returns an error if the evaluation cannot be constructed.
    fn projection_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: MaskFuture,
    ) -> VortexResult<ArrayFuture>;
}
94
/// A boxed, `'static` future that resolves to a `VortexResult<ArrayRef>`.
pub type ArrayFuture = BoxFuture<'static, VortexResult<ArrayRef>>;
96
/// Extension trait that adds mask-combining behavior to array futures.
pub trait ArrayFutureExt {
    /// Consumes `self` and `mask`, returning a value of the same type as
    /// `self` that incorporates the mask.
    fn masked(self, mask: MaskFuture) -> Self;
}
100
101impl ArrayFutureExt for ArrayFuture {
102 fn masked(self, mask: MaskFuture) -> Self {
104 Box::pin(async move {
105 let (array, mask) = try_join!(self, mask)?;
106 array.mask(mask.into_array())
107 })
108 }
109}
110
/// Holds everything needed to build child [`LayoutReader`]s, together with a
/// per-child cache of the readers built so far.
///
/// A child reader is constructed the first time it is requested through `get`
/// and is then served from `cache` on later requests.
pub struct LazyReaderChildren {
    /// Source of child layouts, looked up by index.
    children: Arc<dyn LayoutChildren>,
    /// Dtype passed to `children.child` for each child, indexed by child position.
    dtypes: Vec<DType>,
    /// Name handed to each child's reader, indexed by child position.
    names: Vec<Arc<str>>,
    /// Segment source shared (via `Arc`) with every child reader.
    segment_source: Arc<dyn SegmentSource>,
    /// Session passed by reference when a child reader is constructed.
    session: VortexSession,
    /// One initialize-once slot per child; a slot stays empty until its reader
    /// is first requested.
    cache: Vec<OnceCell<LayoutReaderRef>>,
}
120
121impl LazyReaderChildren {
122 pub fn new(
123 children: Arc<dyn LayoutChildren>,
124 dtypes: Vec<DType>,
125 names: Vec<Arc<str>>,
126 segment_source: Arc<dyn SegmentSource>,
127 session: VortexSession,
128 ) -> Self {
129 let nchildren = children.nchildren();
130 let cache = (0..nchildren).map(|_| OnceCell::new()).collect();
131 Self {
132 children,
133 dtypes,
134 names,
135 segment_source,
136 session,
137 cache,
138 }
139 }
140
141 pub fn get(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
142 if idx >= self.cache.len() {
143 vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
144 }
145
146 self.cache[idx].get_or_try_init(|| {
147 let dtype = &self.dtypes[idx];
148 let child = self.children.child(idx, dtype)?;
149 child.new_reader(
150 Arc::clone(&self.names[idx]),
151 Arc::clone(&self.segment_source),
152 &self.session,
153 )
154 })
155 }
156}