1use std::any::Any;
5use std::collections::BTreeSet;
6use std::ops::Range;
7use std::sync::Arc;
8
9use futures::future::BoxFuture;
10use futures::try_join;
11use once_cell::sync::OnceCell;
12use vortex_array::ArrayRef;
13use vortex_array::IntoArray;
14use vortex_array::MaskFuture;
15use vortex_array::builtins::ArrayBuiltins;
16use vortex_array::dtype::DType;
17use vortex_array::dtype::FieldMask;
18use vortex_array::expr::Expression;
19use vortex_error::VortexResult;
20use vortex_error::vortex_bail;
21use vortex_mask::Mask;
22use vortex_session::VortexSession;
23
24use crate::children::LayoutChildren;
25use crate::segments::SegmentSource;
26
27pub type LayoutReaderRef = Arc<dyn LayoutReader>;
28
/// A row range paired with the offset that translates it into root row coordinates.
///
/// `row_range` is stored relative to `row_offset`; adding the offset to both bounds
/// (see [`SplitRange::root_row_range`]) yields the corresponding root range.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SplitRange {
    /// Offset added to both bounds of `row_range` to obtain root row coordinates.
    row_offset: u64,
    /// Half-open range of rows, expressed relative to `row_offset`.
    row_range: Range<u64>,
}
38
39impl SplitRange {
40 pub fn try_new(row_offset: u64, row_range: Range<u64>) -> VortexResult<Self> {
42 if row_range.start > row_range.end {
43 vortex_bail!("Invalid split range {:?}", row_range);
44 }
45
46 Ok(Self {
47 row_offset,
48 row_range,
49 })
50 }
51
52 pub fn root(row_range: Range<u64>) -> VortexResult<Self> {
54 Self::try_new(0, row_range)
55 }
56
57 pub fn row_offset(&self) -> u64 {
59 self.row_offset
60 }
61
62 pub fn row_range(&self) -> &Range<u64> {
64 &self.row_range
65 }
66
67 pub fn len(&self) -> u64 {
69 self.row_range.end - self.row_range.start
70 }
71
72 pub fn is_empty(&self) -> bool {
74 self.row_range.is_empty()
75 }
76
77 pub fn root_row_range(&self) -> Range<u64> {
79 self.row_offset + self.row_range.start..self.row_offset + self.row_range.end
80 }
81
82 pub fn check_bounds(&self, row_count: u64) -> VortexResult<()> {
84 if self.row_range.end > row_count {
85 vortex_bail!(
86 "Split range {:?} is out of bounds for row count {}",
87 self.row_range,
88 row_count
89 );
90 }
91
92 Ok(())
93 }
94}
95
96pub trait LayoutReader: 'static + Send + Sync {
99 fn name(&self) -> &Arc<str>;
101
102 fn as_any(&self) -> &dyn Any;
103
104 fn dtype(&self) -> &DType;
106
107 fn row_count(&self) -> u64;
109
110 fn register_splits(
113 &self,
114 field_mask: &[FieldMask],
115 split_range: &SplitRange,
116 splits: &mut BTreeSet<u64>,
117 ) -> VortexResult<()>;
118
119 fn pruning_evaluation(
123 &self,
124 row_range: &Range<u64>,
125 expr: &Expression,
126 mask: Mask,
127 ) -> VortexResult<MaskFuture>;
128
129 fn filter_evaluation(
139 &self,
140 row_range: &Range<u64>,
141 expr: &Expression,
142 mask: MaskFuture,
143 ) -> VortexResult<MaskFuture>;
144
145 fn projection_evaluation(
155 &self,
156 row_range: &Range<u64>,
157 expr: &Expression,
158 mask: MaskFuture,
159 ) -> VortexResult<ArrayFuture>;
160}
161
162pub type ArrayFuture = BoxFuture<'static, VortexResult<ArrayRef>>;
163
164pub trait ArrayFutureExt {
165 fn masked(self, mask: MaskFuture) -> Self;
166}
167
168impl ArrayFutureExt for ArrayFuture {
169 fn masked(self, mask: MaskFuture) -> Self {
171 Box::pin(async move {
172 let (array, mask) = try_join!(self, mask)?;
173 array.mask(mask.into_array())
174 })
175 }
176}
177
178pub struct LazyReaderChildren {
179 children: Arc<dyn LayoutChildren>,
180 dtypes: Vec<DType>,
181 names: Vec<Arc<str>>,
182 segment_source: Arc<dyn SegmentSource>,
183 session: VortexSession,
184 cache: Vec<OnceCell<LayoutReaderRef>>,
186}
187
188impl LazyReaderChildren {
189 pub fn new(
190 children: Arc<dyn LayoutChildren>,
191 dtypes: Vec<DType>,
192 names: Vec<Arc<str>>,
193 segment_source: Arc<dyn SegmentSource>,
194 session: VortexSession,
195 ) -> Self {
196 let nchildren = children.nchildren();
197 let cache = (0..nchildren).map(|_| OnceCell::new()).collect();
198 Self {
199 children,
200 dtypes,
201 names,
202 segment_source,
203 session,
204 cache,
205 }
206 }
207
208 pub fn get(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
209 if idx >= self.cache.len() {
210 vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
211 }
212
213 self.cache[idx].get_or_try_init(|| {
214 let dtype = &self.dtypes[idx];
215 let child = self.children.child(idx, dtype)?;
216 child.new_reader(
217 Arc::clone(&self.names[idx]),
218 Arc::clone(&self.segment_source),
219 &self.session,
220 )
221 })
222 }
223}