1use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use futures::future::BoxFuture;
9use futures::try_join;
10use once_cell::sync::OnceCell;
11use vortex_array::ArrayRef;
12use vortex_array::IntoArray;
13use vortex_array::MaskFuture;
14use vortex_array::builtins::ArrayBuiltins;
15use vortex_array::dtype::DType;
16use vortex_array::dtype::FieldMask;
17use vortex_array::expr::Expression;
18use vortex_error::VortexResult;
19use vortex_error::vortex_bail;
20use vortex_mask::Mask;
21use vortex_session::VortexSession;
22
23use crate::LayoutReaderContext;
24use crate::children::LayoutChildren;
25use crate::segments::SegmentSource;
26
/// Shared, reference-counted handle to a type-erased [`LayoutReader`].
pub type LayoutReaderRef = Arc<dyn LayoutReader>;
28
29#[derive(Clone, Debug, Eq, PartialEq)]
34pub struct SplitRange {
35 row_offset: u64,
36 row_range: Range<u64>,
37}
38
39impl SplitRange {
40 pub fn try_new(row_offset: u64, row_range: Range<u64>) -> VortexResult<Self> {
42 if row_range.start > row_range.end {
43 vortex_bail!("Invalid split range {:?}", row_range);
44 }
45
46 Ok(Self {
47 row_offset,
48 row_range,
49 })
50 }
51
52 pub fn root(row_range: Range<u64>) -> VortexResult<Self> {
54 Self::try_new(0, row_range)
55 }
56
57 pub fn row_offset(&self) -> u64 {
59 self.row_offset
60 }
61
62 pub fn row_range(&self) -> &Range<u64> {
64 &self.row_range
65 }
66
67 pub fn len(&self) -> u64 {
69 self.row_range.end - self.row_range.start
70 }
71
72 pub fn is_empty(&self) -> bool {
74 self.row_range.is_empty()
75 }
76
77 pub fn root_row_range(&self) -> Range<u64> {
79 self.row_offset + self.row_range.start..self.row_offset + self.row_range.end
80 }
81
82 pub fn check_bounds(&self, row_count: u64) -> VortexResult<()> {
84 if self.row_range.end > row_count {
85 vortex_bail!(
86 "Split range {:?} is out of bounds for row count {}",
87 self.row_range,
88 row_count
89 );
90 }
91
92 Ok(())
93 }
94}
95
/// A growable list of `u64` row values.
///
/// Readers receive a `&mut RowSplits` in [`LayoutReader::register_splits`] and append to it;
/// the crate then extracts the values in ascending order without duplicates.
pub struct RowSplits(Vec<u64>);

impl RowSplits {
    /// Creates an empty collection whose backing storage can hold at least `capacity` rows.
    pub(crate) fn new_capacity(capacity: usize) -> Self {
        let rows = Vec::with_capacity(capacity);
        Self(rows)
    }

    /// Reserves room for at least `additional` more rows.
    pub fn reserve(&mut self, additional: usize) {
        let Self(rows) = self;
        rows.reserve(additional);
    }

    /// Appends a single row. Order and duplicates are resolved later.
    pub fn push(&mut self, row: u64) {
        let Self(rows) = self;
        rows.push(row);
    }

    /// Consumes the collection, returning the rows sorted ascending with duplicates removed
    /// and any excess capacity released.
    pub(crate) fn into_sorted_deduped(self) -> Vec<u64> {
        let Self(mut rows) = self;
        rows.sort_unstable();
        // `dedup` only removes adjacent repeats, which is sufficient after sorting.
        rows.dedup();
        rows.shrink_to_fit();
        rows
    }
}
122
/// Interface for reading from a layout, used as a trait object via [`LayoutReaderRef`].
///
/// The `'static + Send + Sync` supertraits allow an `Arc<dyn LayoutReader>` to be shared
/// across threads.
pub trait LayoutReader: 'static + Send + Sync {
    /// Returns the name of this reader.
    fn name(&self) -> &Arc<str>;

    /// Returns this reader as `&dyn Any`, which allows downcasting to the concrete type.
    fn as_any(&self) -> &dyn Any;

    /// Returns the [`DType`] associated with this reader.
    // NOTE(review): presumably the dtype of the data this reader produces — confirm.
    fn dtype(&self) -> &DType;

    /// Returns the row count reported by this reader.
    fn row_count(&self) -> u64;

    /// Lets the reader append rows to `splits`.
    ///
    /// `field_mask` and `split_range` are inputs; `splits` is the output accumulator.
    // NOTE(review): presumably `field_mask` selects the fields of interest and `split_range`
    // restricts which rows are considered — confirm against implementations.
    fn register_splits(
        &self,
        field_mask: &[FieldMask],
        split_range: &SplitRange,
        splits: &mut RowSplits,
    ) -> VortexResult<()>;

    /// Starts a pruning evaluation of `expr` over `row_range`, returning a future mask.
    ///
    /// Unlike the other evaluation methods, `mask` is passed as an already-resolved [`Mask`].
    // NOTE(review): the name suggests a cheap pre-pass that narrows `mask` before
    // `filter_evaluation` — confirm against implementations.
    fn pruning_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: Mask,
    ) -> VortexResult<MaskFuture>;

    /// Starts a filter evaluation of `expr` over `row_range`, returning a future mask.
    ///
    /// `mask` is itself a future, so it may not be resolved yet when this is called.
    // NOTE(review): presumably the result refines `mask` with the rows matching `expr` —
    // confirm against implementations.
    fn filter_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: MaskFuture,
    ) -> VortexResult<MaskFuture>;

    /// Starts a projection evaluation of `expr` over `row_range`, returning a future array.
    ///
    /// `mask` is itself a future, so it may not be resolved yet when this is called.
    // NOTE(review): presumably only rows selected by `mask` are materialized — confirm
    // against implementations.
    fn projection_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: MaskFuture,
    ) -> VortexResult<ArrayFuture>;
}
188
/// A boxed `'static` future that resolves to a `VortexResult<ArrayRef>`.
pub type ArrayFuture = BoxFuture<'static, VortexResult<ArrayRef>>;
190
191pub trait ArrayFutureExt {
192 fn masked(self, mask: MaskFuture) -> Self;
193}
194
195impl ArrayFutureExt for ArrayFuture {
196 fn masked(self, mask: MaskFuture) -> Self {
198 Box::pin(async move {
199 let (array, mask) = try_join!(self, mask)?;
200 array.mask(mask.into_array())
201 })
202 }
203}
204
205pub struct LazyReaderChildren {
206 children: Arc<dyn LayoutChildren>,
207 dtypes: Vec<DType>,
208 names: Vec<Arc<str>>,
209 segment_source: Arc<dyn SegmentSource>,
210 session: VortexSession,
211 ctx: LayoutReaderContext,
212 cache: Vec<OnceCell<LayoutReaderRef>>,
214}
215
216impl LazyReaderChildren {
217 pub fn new(
218 children: Arc<dyn LayoutChildren>,
219 dtypes: Vec<DType>,
220 names: Vec<Arc<str>>,
221 segment_source: Arc<dyn SegmentSource>,
222 session: VortexSession,
223 ctx: LayoutReaderContext,
224 ) -> Self {
225 let nchildren = children.nchildren();
226 let cache = (0..nchildren).map(|_| OnceCell::new()).collect();
227 Self {
228 children,
229 dtypes,
230 names,
231 segment_source,
232 session,
233 ctx,
234 cache,
235 }
236 }
237
238 pub fn get(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
239 if idx >= self.cache.len() {
240 vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
241 }
242
243 self.cache[idx].get_or_try_init(|| {
244 let dtype = &self.dtypes[idx];
245 let child = self.children.child(idx, dtype)?;
246 child.new_reader(
247 Arc::clone(&self.names[idx]),
248 Arc::clone(&self.segment_source),
249 &self.session,
250 &self.ctx,
251 )
252 })
253 }
254}