1mod expr;
5
6use std::collections::BTreeSet;
7use std::fmt::Display;
8use std::fmt::Formatter;
9use std::ops::BitAnd;
10use std::ops::Range;
11use std::sync::Arc;
12
13use Nullability::NonNullable;
14pub use expr::*;
15use futures::FutureExt;
16use futures::future::BoxFuture;
17use vortex_array::ArrayRef;
18use vortex_array::IntoArray;
19use vortex_array::MaskFuture;
20use vortex_array::VortexSessionExecute;
21use vortex_array::compute::filter;
22use vortex_array::expr::ExactExpr;
23use vortex_array::expr::Expression;
24use vortex_array::expr::is_root;
25use vortex_array::expr::root;
26use vortex_array::expr::transform::PartitionedExpr;
27use vortex_array::expr::transform::partition;
28use vortex_array::expr::transform::replace;
29use vortex_array::scalar::PValue;
30use vortex_dtype::DType;
31use vortex_dtype::FieldMask;
32use vortex_dtype::FieldName;
33use vortex_dtype::Nullability;
34use vortex_dtype::PType;
35use vortex_error::VortexExpect;
36use vortex_error::VortexResult;
37use vortex_mask::Mask;
38use vortex_sequence::SequenceArray;
39use vortex_session::VortexSession;
40use vortex_utils::aliases::dash_map::DashMap;
41
42use crate::ArrayFuture;
43use crate::LayoutReader;
44use crate::layouts::partitioned::PartitionedExprEval;
45
/// A [`LayoutReader`] that wraps a child reader and adds support for expressions that
/// reference the row index.
///
/// Expressions are split into a part that reads the row index and a part that reads the
/// child; the row-index part is evaluated against a generated array of row indices while
/// the child part is forwarded to the wrapped reader.
pub struct RowIdxLayoutReader {
    /// Name of this reader; cloned from the child's name in `new`.
    name: Arc<str>,
    /// Offset added to the start of a requested row range when generating row indices.
    row_offset: u64,
    /// The wrapped reader that dtype, row count, splits and child expressions are
    /// delegated to.
    child: Arc<dyn LayoutReader>,
    /// Cache of partitioning results, keyed by the expression that was partitioned.
    partition_cache: DashMap<ExactExpr, Partitioning>,
    /// Session used to create execution contexts for row-index predicates and handed to
    /// the partitioned mask evaluation.
    session: VortexSession,
}
53
54impl RowIdxLayoutReader {
55 pub fn new(row_offset: u64, child: Arc<dyn LayoutReader>, session: VortexSession) -> Self {
56 Self {
57 name: child.name().clone(),
58 row_offset,
59 child,
60 partition_cache: DashMap::with_hasher(Default::default()),
61 session,
62 }
63 }
64
65 fn partition_expr(&self, expr: &Expression) -> Partitioning {
66 let key = ExactExpr(expr.clone());
67
68 if let Some(partitioning) = self.partition_cache.get(&key) {
70 return partitioning.clone();
71 }
72
73 self.partition_cache
74 .entry(key)
75 .or_insert_with(|| {
76 let mut partitioned = partition(expr.clone(), self.dtype(), |expr| {
78 if expr.is::<RowIdx>() {
79 vec![Partition::RowIdx]
80 } else if is_root(expr) {
81 vec![Partition::Child]
82 } else {
83 vec![]
84 }
85 })
86 .vortex_expect("We should not fail to partition expression over struct fields");
87
88 if partitioned.partitions.len() == 1 {
90 return match &partitioned.partition_annotations[0] {
91 Partition::RowIdx => {
92 Partitioning::RowIdx(replace(expr.clone(), &row_idx(), root()))
93 }
94 Partition::Child => Partitioning::Child(expr.clone()),
95 };
96 }
97
98 partitioned.partitions = partitioned
100 .partitions
101 .into_iter()
102 .map(|p| replace(p, &row_idx(), root()))
103 .collect();
104
105 Partitioning::Partitioned(Arc::new(partitioned))
106 })
107 .clone()
108 }
109}
110
/// The cached result of partitioning an expression between the row index and the child.
#[derive(Clone)]
enum Partitioning {
    /// Partitioning produced exactly one partition, annotated as row index. Holds the
    /// original expression with `row_idx()` replaced by `root()`.
    RowIdx(Expression),
    /// Partitioning produced exactly one partition, annotated as child. Holds the
    /// original expression unchanged.
    Child(Expression),
    /// Partitioning produced any other number of partitions. Holds the partitioned
    /// expression, with `row_idx()` replaced by `root()` in each partition.
    Partitioned(Arc<PartitionedExpr<Partition>>),
}
120
/// Annotation attached to sub-expressions while partitioning: records whether a
/// sub-expression reads the row index or the child's data.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Partition {
    /// The sub-expression reads the row index.
    RowIdx,
    /// The sub-expression reads the child (the root of the wrapped reader).
    Child,
}

impl Partition {
    /// Returns the fixed label of this partition: `"row_idx"` or `"child"`.
    ///
    /// The label is a string literal, so the returned reference is `'static` and does
    /// not borrow from `self`.
    pub fn name(&self) -> &'static str {
        match self {
            Partition::RowIdx => "row_idx",
            Partition::Child => "child",
        }
    }
}
135
136impl From<Partition> for FieldName {
137 fn from(value: Partition) -> Self {
138 FieldName::from(value.name())
139 }
140}
141
142impl Display for Partition {
143 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
144 write!(f, "{}", self.name())
145 }
146}
147
148impl LayoutReader for RowIdxLayoutReader {
149 fn name(&self) -> &Arc<str> {
150 &self.name
151 }
152
153 fn dtype(&self) -> &DType {
154 self.child.dtype()
155 }
156
157 fn row_count(&self) -> u64 {
158 self.child.row_count()
159 }
160
161 fn register_splits(
162 &self,
163 field_mask: &[FieldMask],
164 row_range: &Range<u64>,
165 splits: &mut BTreeSet<u64>,
166 ) -> VortexResult<()> {
167 self.child.register_splits(field_mask, row_range, splits)
168 }
169
170 fn pruning_evaluation(
171 &self,
172 row_range: &Range<u64>,
173 expr: &Expression,
174 mask: Mask,
175 ) -> VortexResult<MaskFuture> {
176 Ok(match &self.partition_expr(expr) {
177 Partitioning::RowIdx(expr) => row_idx_mask_future(
178 self.row_offset,
179 row_range,
180 expr,
181 MaskFuture::ready(mask),
182 self.session.clone(),
183 ),
184 Partitioning::Child(expr) => self.child.pruning_evaluation(row_range, expr, mask)?,
185 Partitioning::Partitioned(..) => MaskFuture::ready(mask),
186 })
187 }
188
189 fn filter_evaluation(
190 &self,
191 row_range: &Range<u64>,
192 expr: &Expression,
193 mask: MaskFuture,
194 ) -> VortexResult<MaskFuture> {
195 match &self.partition_expr(expr) {
196 Partitioning::RowIdx(_) => Ok(mask),
199 Partitioning::Child(expr) => self.child.filter_evaluation(row_range, expr, mask),
200 Partitioning::Partitioned(p) => p.clone().into_mask_future(
201 mask,
202 |annotation, expr, mask| match annotation {
203 Partition::RowIdx => Ok(row_idx_mask_future(
204 self.row_offset,
205 row_range,
206 expr,
207 mask,
208 self.session.clone(),
209 )),
210 Partition::Child => self.child.filter_evaluation(row_range, expr, mask),
211 },
212 |annotation, expr, mask| match annotation {
213 Partition::RowIdx => {
214 Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
215 }
216 Partition::Child => self.child.projection_evaluation(row_range, expr, mask),
217 },
218 self.session.clone(),
219 ),
220 }
221 }
222
223 fn projection_evaluation(
224 &self,
225 row_range: &Range<u64>,
226 expr: &Expression,
227 mask: MaskFuture,
228 ) -> VortexResult<BoxFuture<'static, VortexResult<ArrayRef>>> {
229 match &self.partition_expr(expr) {
230 Partitioning::RowIdx(expr) => {
231 Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
232 }
233 Partitioning::Child(expr) => self.child.projection_evaluation(row_range, expr, mask),
234 Partitioning::Partitioned(p) => {
235 p.clone()
236 .into_array_future(mask, |annotation, expr, mask| match annotation {
237 Partition::RowIdx => {
238 Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
239 }
240 Partition::Child => self.child.projection_evaluation(row_range, expr, mask),
241 })
242 }
243 }
244 }
245}
246
247fn idx_array(row_offset: u64, row_range: &Range<u64>) -> SequenceArray {
249 SequenceArray::new(
250 PValue::U64(row_offset + row_range.start),
251 PValue::U64(1),
252 PType::U64,
253 NonNullable,
254 usize::try_from(row_range.end - row_range.start)
255 .vortex_expect("Row range length must fit in usize"),
256 )
257 .vortex_expect("Failed to create row index array")
258}
259
260fn row_idx_mask_future(
261 row_offset: u64,
262 row_range: &Range<u64>,
263 expr: &Expression,
264 mask: MaskFuture,
265 session: VortexSession,
266) -> MaskFuture {
267 let row_range = row_range.clone();
268 let expr = expr.clone();
269 MaskFuture::new(mask.len(), async move {
270 let array = idx_array(row_offset, &row_range).into_array();
271
272 let mut ctx = session.create_execution_ctx();
273 let result_mask = array.apply(&expr)?.execute::<Mask>(&mut ctx)?;
274
275 Ok(result_mask.bitand(&mask.await?))
276 })
277}
278
279fn row_idx_array_future(
280 row_offset: u64,
281 row_range: &Range<u64>,
282 expr: &Expression,
283 mask: MaskFuture,
284) -> ArrayFuture {
285 let row_range = row_range.clone();
286 let expr = expr.clone();
287 async move {
288 let array = idx_array(row_offset, &row_range).into_array();
289 let array = filter(&array, &mask.await?)?;
290 array.apply(&expr)
291 }
292 .boxed()
293}
294
/// Tests for `RowIdxLayoutReader`, each wrapping a flat layout written from the values
/// `1..=5` with a row offset of 0.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_array::ArrayContext;
    use vortex_array::IntoArray as _;
    use vortex_array::MaskFuture;
    use vortex_array::arrays::BoolArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::expr::eq;
    use vortex_array::expr::gt;
    use vortex_array::expr::lit;
    use vortex_array::expr::or;
    use vortex_array::expr::root;
    use vortex_buffer::buffer;
    use vortex_io::runtime::single::block_on;

    use crate::LayoutReader;
    use crate::LayoutStrategy;
    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::layouts::row_idx::RowIdxLayoutReader;
    use crate::layouts::row_idx::row_idx;
    use crate::segments::TestSegments;
    use crate::sequence::SequenceId;
    use crate::sequence::SequentialArrayStreamExt;
    use crate::test::SESSION;

    // An expression that only reads the child (`root == 3`): only the third value matches.
    #[test]
    fn flat_expr_no_row_id() {
        block_on(|handle| async {
            // Write the values 1..=5 as a flat layout into in-memory test segments.
            let ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();
            let array = buffer![1..=5].into_array();
            let layout = FlatLayoutStrategy::default()
                .write_stream(
                    ctx,
                    segments.clone(),
                    array.to_array_stream().sequenced(ptr),
                    eof,
                    handle,
                )
                .await
                .unwrap();

            // Project the expression over all rows with an all-true mask.
            let expr = eq(root(), lit(3i32));
            let result = RowIdxLayoutReader::new(
                0,
                layout.new_reader("".into(), segments, &SESSION).unwrap(),
                SESSION.clone(),
            )
            .projection_evaluation(
                &(0..layout.row_count()),
                &expr,
                MaskFuture::new_true(layout.row_count().try_into().unwrap()),
            )
            .unwrap()
            .await
            .unwrap();

            assert_arrays_eq!(
                result,
                BoolArray::from_iter([false, false, true, false, false])
            );
        })
    }

    // An expression that only reads the row index (`row_idx > 3`): with five rows and a
    // row offset of 0, only the last row (index 4) matches.
    #[test]
    fn flat_expr_row_id() {
        block_on(|handle| async {
            // Write the values 1..=5 as a flat layout into in-memory test segments.
            let ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();
            let array = buffer![1..=5].into_array();
            let layout = FlatLayoutStrategy::default()
                .write_stream(
                    ctx,
                    segments.clone(),
                    array.to_array_stream().sequenced(ptr),
                    eof,
                    handle,
                )
                .await
                .unwrap();

            // Project the expression over all rows with an all-true mask.
            let expr = gt(row_idx(), lit(3u64));
            let result = RowIdxLayoutReader::new(
                0,
                layout.new_reader("".into(), segments, &SESSION).unwrap(),
                SESSION.clone(),
            )
            .projection_evaluation(
                &(0..layout.row_count()),
                &expr,
                MaskFuture::new_true(layout.row_count().try_into().unwrap()),
            )
            .unwrap()
            .await
            .unwrap();

            assert_arrays_eq!(
                result,
                BoolArray::from_iter([false, false, false, false, true])
            );
        })
    }

    // An expression mixing child and row-index terms:
    // `root == 3 OR (row_idx > 3 OR root == 1)` matches rows 0, 2 and 4.
    #[test]
    fn flat_expr_or() {
        block_on(|handle| async {
            // Write the values 1..=5 as a flat layout into in-memory test segments.
            let ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();
            let array = buffer![1..=5].into_array();
            let layout = FlatLayoutStrategy::default()
                .write_stream(
                    ctx,
                    segments.clone(),
                    array.to_array_stream().sequenced(ptr),
                    eof,
                    handle,
                )
                .await
                .unwrap();

            let expr = or(
                eq(root(), lit(3i32)),
                or(gt(row_idx(), lit(3u64)), eq(root(), lit(1i32))),
            );

            // Project the expression over all rows with an all-true mask.
            let result = RowIdxLayoutReader::new(
                0,
                layout.new_reader("".into(), segments, &SESSION).unwrap(),
                SESSION.clone(),
            )
            .projection_evaluation(
                &(0..layout.row_count()),
                &expr,
                MaskFuture::new_true(layout.row_count().try_into().unwrap()),
            )
            .unwrap()
            .await
            .unwrap();

            assert_arrays_eq!(
                result,
                BoolArray::from_iter([true, false, true, false, true])
            );
        })
    }
}