1use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use futures::future::BoxFuture;
9use futures::try_join;
10use once_cell::sync::OnceCell;
11use vortex_array::ArrayRef;
12use vortex_array::IntoArray;
13use vortex_array::MaskFuture;
14use vortex_array::builtins::ArrayBuiltins;
15use vortex_array::dtype::DType;
16use vortex_array::dtype::FieldMask;
17use vortex_array::expr::Expression;
18use vortex_error::VortexResult;
19use vortex_error::vortex_bail;
20use vortex_mask::Mask;
21use vortex_session::VortexSession;
22
23use crate::children::LayoutChildren;
24use crate::segments::SegmentSource;
25
26pub type LayoutReaderRef = Arc<dyn LayoutReader>;
27
/// A range of rows paired with a row offset.
///
/// Adding `row_offset` to both bounds of `row_range` yields the range returned by
/// `root_row_range`; a range built with `root` uses an offset of zero.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SplitRange {
    /// Amount added to `row_range` to obtain the root row range.
    row_offset: u64,
    /// Half-open row range, before `row_offset` is applied.
    row_range: Range<u64>,
}
37
38impl SplitRange {
39 pub fn try_new(row_offset: u64, row_range: Range<u64>) -> VortexResult<Self> {
41 if row_range.start > row_range.end {
42 vortex_bail!("Invalid split range {:?}", row_range);
43 }
44
45 Ok(Self {
46 row_offset,
47 row_range,
48 })
49 }
50
51 pub fn root(row_range: Range<u64>) -> VortexResult<Self> {
53 Self::try_new(0, row_range)
54 }
55
56 pub fn row_offset(&self) -> u64 {
58 self.row_offset
59 }
60
61 pub fn row_range(&self) -> &Range<u64> {
63 &self.row_range
64 }
65
66 pub fn len(&self) -> u64 {
68 self.row_range.end - self.row_range.start
69 }
70
71 pub fn is_empty(&self) -> bool {
73 self.row_range.is_empty()
74 }
75
76 pub fn root_row_range(&self) -> Range<u64> {
78 self.row_offset + self.row_range.start..self.row_offset + self.row_range.end
79 }
80
81 pub fn check_bounds(&self, row_count: u64) -> VortexResult<()> {
83 if self.row_range.end > row_count {
84 vortex_bail!(
85 "Split range {:?} is out of bounds for row count {}",
86 self.row_range,
87 row_count
88 );
89 }
90
91 Ok(())
92 }
93}
94
/// A growable collection of `u64` row values, filled through [`RowSplits::push`].
///
/// Values may be pushed in any order and may repeat; `into_sorted_deduped` produces the
/// sorted, duplicate-free result.
pub struct RowSplits(Vec<u64>);

impl RowSplits {
    /// Appends `row` to the collection.
    pub fn push(&mut self, row: u64) {
        let Self(rows) = self;
        rows.push(row);
    }

    /// Reserves room for at least `additional` more values.
    pub fn reserve(&mut self, additional: usize) {
        let Self(rows) = self;
        rows.reserve(additional);
    }

    /// Creates an empty collection with room for `capacity` values.
    pub(crate) fn new_capacity(capacity: usize) -> Self {
        let rows = Vec::with_capacity(capacity);
        Self(rows)
    }

    /// Consumes the collection and returns its values in ascending order without duplicates.
    pub(crate) fn into_sorted_deduped(self) -> Vec<u64> {
        let Self(mut rows) = self;
        // `dedup` only removes adjacent duplicates, so the values must be sorted first.
        rows.sort_unstable();
        rows.dedup();
        rows.shrink_to_fit();
        rows
    }
}
121
122pub trait LayoutReader: 'static + Send + Sync {
125 fn name(&self) -> &Arc<str>;
127
128 fn as_any(&self) -> &dyn Any;
129
130 fn dtype(&self) -> &DType;
132
133 fn row_count(&self) -> u64;
135
136 fn register_splits(
139 &self,
140 field_mask: &[FieldMask],
141 split_range: &SplitRange,
142 splits: &mut RowSplits,
143 ) -> VortexResult<()>;
144
145 fn pruning_evaluation(
149 &self,
150 row_range: &Range<u64>,
151 expr: &Expression,
152 mask: Mask,
153 ) -> VortexResult<MaskFuture>;
154
155 fn filter_evaluation(
165 &self,
166 row_range: &Range<u64>,
167 expr: &Expression,
168 mask: MaskFuture,
169 ) -> VortexResult<MaskFuture>;
170
171 fn projection_evaluation(
181 &self,
182 row_range: &Range<u64>,
183 expr: &Expression,
184 mask: MaskFuture,
185 ) -> VortexResult<ArrayFuture>;
186}
187
188pub type ArrayFuture = BoxFuture<'static, VortexResult<ArrayRef>>;
189
190pub trait ArrayFutureExt {
191 fn masked(self, mask: MaskFuture) -> Self;
192}
193
194impl ArrayFutureExt for ArrayFuture {
195 fn masked(self, mask: MaskFuture) -> Self {
197 Box::pin(async move {
198 let (array, mask) = try_join!(self, mask)?;
199 array.mask(mask.into_array())
200 })
201 }
202}
203
204pub struct LazyReaderChildren {
205 children: Arc<dyn LayoutChildren>,
206 dtypes: Vec<DType>,
207 names: Vec<Arc<str>>,
208 segment_source: Arc<dyn SegmentSource>,
209 session: VortexSession,
210 cache: Vec<OnceCell<LayoutReaderRef>>,
212}
213
214impl LazyReaderChildren {
215 pub fn new(
216 children: Arc<dyn LayoutChildren>,
217 dtypes: Vec<DType>,
218 names: Vec<Arc<str>>,
219 segment_source: Arc<dyn SegmentSource>,
220 session: VortexSession,
221 ) -> Self {
222 let nchildren = children.nchildren();
223 let cache = (0..nchildren).map(|_| OnceCell::new()).collect();
224 Self {
225 children,
226 dtypes,
227 names,
228 segment_source,
229 session,
230 cache,
231 }
232 }
233
234 pub fn get(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
235 if idx >= self.cache.len() {
236 vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
237 }
238
239 self.cache[idx].get_or_try_init(|| {
240 let dtype = &self.dtypes[idx];
241 let child = self.children.child(idx, dtype)?;
242 child.new_reader(
243 Arc::clone(&self.names[idx]),
244 Arc::clone(&self.segment_source),
245 &self.session,
246 )
247 })
248 }
249}