1mod expr;
5
6use std::collections::BTreeSet;
7use std::fmt::{Display, Formatter};
8use std::ops::{BitAnd, Range};
9use std::sync::Arc;
10
11use Nullability::NonNullable;
12use async_trait::async_trait;
13pub use expr::*;
14use vortex_array::compute::filter;
15use vortex_array::stats::Precision;
16use vortex_array::{ArrayRef, IntoArray};
17use vortex_dtype::{DType, FieldMask, Nullability, PType};
18use vortex_error::{VortexExpect, VortexResult};
19use vortex_expr::transform::{PartitionedExpr, partition, replace};
20use vortex_expr::{ExactExpr, ExprRef, Scope, is_root, root};
21use vortex_mask::Mask;
22use vortex_scalar::PValue;
23use vortex_sequence::SequenceArray;
24use vortex_utils::aliases::dash_map::DashMap;
25
26use crate::layouts::partitioned::{PartitionedArrayEvaluation, PartitionedMaskEvaluation};
27use crate::{
28 ArrayEvaluation, LayoutReader, MaskEvaluation, MaskFuture, NoOpMaskEvaluation,
29 NoOpPruningEvaluation, PruningEvaluation,
30};
31
/// A [`LayoutReader`] that wraps a child reader and can additionally evaluate expressions
/// that reference the row index.
pub struct RowIdxLayoutReader {
    /// Name reported by this reader; cloned from the child in [`RowIdxLayoutReader::new`].
    name: Arc<str>,
    /// Offset added to the start of a row range when row indices are generated.
    row_offset: u64,
    /// The wrapped reader; expressions that do not involve the row index are delegated to it.
    child: Arc<dyn LayoutReader>,

    /// Memoized result of `partition_expr`, keyed by the exact expression.
    partition_cache: DashMap<ExactExpr, Partitioning>,
}
39
40impl RowIdxLayoutReader {
41 pub fn new(row_offset: u64, child: Arc<dyn LayoutReader>) -> Self {
42 Self {
43 name: child.name().clone(),
44 row_offset,
45 child,
46 partition_cache: DashMap::with_hasher(Default::default()),
47 }
48 }
49
50 fn partition_expr(&self, expr: &ExprRef) -> Partitioning {
51 self.partition_cache
52 .entry(ExactExpr(expr.clone()))
53 .or_insert_with(|| {
54 let mut partitioned = partition(expr.clone(), self.dtype(), |expr| {
56 if expr.is::<RowIdxVTable>() {
57 vec![Partition::RowIdx]
58 } else if is_root(expr) {
59 vec![Partition::Child]
60 } else {
61 vec![]
62 }
63 })
64 .vortex_expect("We should not fail to partition expression over struct fields");
65
66 if partitioned.partitions.len() == 1 {
68 return match &partitioned.partition_annotations[0] {
69 Partition::RowIdx => {
70 Partitioning::RowIdx(replace(expr.clone(), &row_idx(), root()))
71 }
72 Partition::Child => Partitioning::Child(expr.clone()),
73 };
74 }
75
76 partitioned.partitions = partitioned
78 .partitions
79 .into_iter()
80 .map(|p| replace(p, &row_idx(), root()))
81 .collect();
82
83 Partitioning::Partitioned(Arc::new(partitioned))
84 })
85 .clone()
86 }
87}
88
/// How an expression is split between the row index and the child reader.
#[derive(Clone)]
enum Partitioning {
    /// The single partition is annotated as row index. The stored expression has `row_idx()`
    /// replaced by `root()`.
    RowIdx(ExprRef),
    /// The single partition is annotated as child. The stored expression is the original,
    /// unmodified expression.
    Child(ExprRef),
    /// The expression did not reduce to exactly one partition. Holds the partitioned
    /// expression, with `row_idx()` replaced by `root()` in each partition.
    Partitioned(Arc<PartitionedExpr<Partition>>),
}
98
/// Annotation that says which input a sub-expression is evaluated against.
///
/// Derives `Debug` so values can appear in diagnostics and assertion output, and `Copy`
/// because the enum carries no data.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Partition {
    /// The sub-expression is evaluated over generated row indices.
    RowIdx,
    /// The sub-expression is evaluated by the child reader.
    Child,
}
104
105impl Display for Partition {
106 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
107 match self {
108 Partition::RowIdx => write!(f, "row_idx"),
109 Partition::Child => write!(f, "child"),
110 }
111 }
112}
113
114impl LayoutReader for RowIdxLayoutReader {
115 fn name(&self) -> &Arc<str> {
116 &self.name
117 }
118
119 fn dtype(&self) -> &DType {
120 self.child.dtype()
121 }
122
123 fn row_count(&self) -> Precision<u64> {
124 self.child.row_count()
125 }
126
127 fn register_splits(
128 &self,
129 field_mask: &[FieldMask],
130 row_offset: u64,
131 splits: &mut BTreeSet<u64>,
132 ) -> VortexResult<()> {
133 self.child.register_splits(field_mask, row_offset, splits)
134 }
135
136 fn pruning_evaluation(
137 &self,
138 row_range: &Range<u64>,
139 expr: &ExprRef,
140 ) -> VortexResult<Box<dyn PruningEvaluation>> {
141 match &self.partition_expr(expr) {
142 Partitioning::RowIdx(expr) => Ok(Box::new(RowIdxEvaluation::new(
143 self.row_offset,
144 row_range,
145 expr,
146 ))),
147 Partitioning::Child(expr) => self.child.pruning_evaluation(row_range, expr),
148 Partitioning::Partitioned(..) => Ok(Box::new(NoOpPruningEvaluation)),
149 }
150 }
151
152 fn filter_evaluation(
153 &self,
154 row_range: &Range<u64>,
155 expr: &ExprRef,
156 ) -> VortexResult<Box<dyn MaskEvaluation>> {
157 match &self.partition_expr(expr) {
158 Partitioning::RowIdx(_) => Ok(Box::new(NoOpMaskEvaluation)),
161 Partitioning::Child(expr) => self.child.filter_evaluation(row_range, expr),
162 Partitioning::Partitioned(p) => Ok(Box::new(PartitionedMaskEvaluation::try_new(
163 p.clone(),
164 |annotation, expr| match annotation {
165 Partition::RowIdx => Ok(Box::new(RowIdxEvaluation::new(
166 self.row_offset,
167 row_range,
168 expr,
169 ))),
170 Partition::Child => self.child.filter_evaluation(row_range, expr),
171 },
172 |annotation, expr| match annotation {
173 Partition::RowIdx => Ok(Box::new(RowIdxEvaluation::new(
174 self.row_offset,
175 row_range,
176 expr,
177 ))),
178 Partition::Child => self.child.projection_evaluation(row_range, expr),
179 },
180 )?)),
181 }
182 }
183
184 fn projection_evaluation(
185 &self,
186 row_range: &Range<u64>,
187 expr: &ExprRef,
188 ) -> VortexResult<Box<dyn ArrayEvaluation>> {
189 match &self.partition_expr(expr) {
190 Partitioning::RowIdx(expr) => Ok(Box::new(RowIdxEvaluation::new(
191 self.row_offset,
192 row_range,
193 expr,
194 ))),
195 Partitioning::Child(expr) => self.child.projection_evaluation(row_range, expr),
196 Partitioning::Partitioned(p) => Ok(Box::new(PartitionedArrayEvaluation::try_new(
197 p.clone(),
198 |annotation, expr| match annotation {
199 Partition::RowIdx => Ok(Box::new(RowIdxEvaluation::new(
200 self.row_offset,
201 row_range,
202 expr,
203 ))),
204 Partition::Child => self.child.projection_evaluation(row_range, expr),
205 },
206 )?)),
207 }
208 }
209}
210
/// Evaluates an expression against the row indices of one row range.
struct RowIdxEvaluation {
    /// The row indices of the row range, built as a `SequenceArray` with step 1 in
    /// [`RowIdxEvaluation::new`].
    array: ArrayRef,
    /// The expression evaluated with `array` (or a filtered subset of it) as its scope.
    expr: ExprRef,
}
218
219impl RowIdxEvaluation {
220 fn new(row_offset: u64, row_range: &Range<u64>, expr: &ExprRef) -> Self {
221 let array = SequenceArray::new(
222 PValue::U64(row_offset + row_range.start),
223 PValue::U64(1),
224 PType::U64,
225 NonNullable,
226 usize::try_from(row_range.end - row_range.start)
227 .vortex_expect("Row range length must fit in usize"),
228 )
229 .vortex_expect("Failed to create row index array");
230
231 Self {
232 array: array.into_array(),
233 expr: expr.clone(),
234 }
235 }
236}
237
238#[async_trait]
239impl PruningEvaluation for RowIdxEvaluation {
240 async fn invoke(&self, _mask: Mask) -> VortexResult<Mask> {
241 self.expr
244 .evaluate(&Scope::new(self.array.clone()))?
245 .try_to_mask_fill_null_false()
246 }
247}
248
249#[async_trait]
250impl MaskEvaluation for RowIdxEvaluation {
251 async fn invoke(&self, mask: MaskFuture) -> VortexResult<Mask> {
252 let result = self
255 .expr
256 .evaluate(&Scope::new(self.array.clone()))?
257 .try_to_mask_fill_null_false()?;
258
259 Ok(result.bitand(&mask.await?))
262 }
263}
264
265#[async_trait]
266impl ArrayEvaluation for RowIdxEvaluation {
267 async fn invoke(&self, mask: MaskFuture) -> VortexResult<ArrayRef> {
268 let array = filter(&self.array, &mask.await?)?;
269 self.expr.evaluate(&Scope::new(array))
270 }
271}
272
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures::executor::block_on;
    use futures::stream;
    use itertools::Itertools;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::{ArrayContext, ToCanonical};
    use vortex_expr::{ExprRef, eq, gt, lit, or, root};

    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::layouts::row_idx::{RowIdxLayoutReader, row_idx};
    use crate::segments::{SegmentSource, SequenceWriter, TestSegments};
    use crate::sequence::SequenceId;
    use crate::{
        LayoutReader, LayoutStrategy, MaskFuture, SequentialStreamAdapter, SequentialStreamExt,
    };

    /// Writes the values `1..=5` with the flat layout strategy, wraps the resulting reader in a
    /// [`RowIdxLayoutReader`] with a row offset of zero, and projects `expr` over every row
    /// with an all-true mask. Returns the boolean result, one entry per row.
    fn project_over_one_to_five(expr: &ExprRef) -> Vec<bool> {
        block_on(async {
            let ctx = ArrayContext::empty();
            let segments = TestSegments::default();
            let sequence_writer = SequenceWriter::new(Box::new(segments.clone()));
            let values = PrimitiveArray::from_iter(1..=5).to_array();
            let dtype = values.dtype().clone();
            let layout = FlatLayoutStrategy::default()
                .write_stream(
                    &ctx,
                    sequence_writer.clone(),
                    SequentialStreamAdapter::new(
                        dtype,
                        stream::once(async { Ok((SequenceId::root().downgrade(), values)) }),
                    )
                    .sendable(),
                )
                .await
                .unwrap();
            let segments: Arc<dyn SegmentSource> = Arc::new(segments);

            let row_count = layout.row_count();
            let reader =
                RowIdxLayoutReader::new(0, layout.new_reader("".into(), segments).unwrap());

            reader
                .projection_evaluation(&(0..row_count), expr)
                .unwrap()
                .invoke(MaskFuture::new_true(row_count.try_into().unwrap()))
                .await
                .unwrap()
                .to_bool()
                .boolean_buffer()
                .iter()
                .collect_vec()
        })
    }

    /// An expression that only touches the stored values goes through the child reader.
    #[test]
    fn flat_expr_no_row_id() {
        let expr = eq(root(), lit(3i32));

        assert_eq!(
            vec![false, false, true, false, false],
            project_over_one_to_five(&expr)
        );
    }

    /// An expression that only touches the row index: only index 4 is greater than 3.
    #[test]
    fn flat_expr_row_id() {
        let expr = gt(row_idx(), lit(3u64));

        assert_eq!(
            vec![false, false, false, false, true],
            project_over_one_to_five(&expr)
        );
    }

    /// An expression that mixes stored values and the row index.
    #[test]
    fn flat_expr_or() {
        let expr = or(
            eq(root(), lit(3i32)),
            or(gt(row_idx(), lit(3u64)), eq(root(), lit(1i32))),
        );

        assert_eq!(
            vec![true, false, true, false, true],
            project_over_one_to_five(&expr)
        );
    }
}