1mod expr;
5
6use std::collections::BTreeSet;
7use std::fmt::Display;
8use std::fmt::Formatter;
9use std::ops::BitAnd;
10use std::ops::Range;
11use std::sync::Arc;
12
13use Nullability::NonNullable;
14pub use expr::*;
15use futures::FutureExt;
16use futures::future::BoxFuture;
17use vortex_array::ArrayRef;
18use vortex_array::IntoArray;
19use vortex_array::MaskFuture;
20use vortex_array::compute::filter;
21use vortex_array::expr::ExactExpr;
22use vortex_array::expr::Expression;
23use vortex_array::expr::is_root;
24use vortex_array::expr::root;
25use vortex_array::expr::transform::PartitionedExpr;
26use vortex_array::expr::transform::partition;
27use vortex_array::expr::transform::replace;
28use vortex_array::mask::MaskExecutor;
29use vortex_dtype::DType;
30use vortex_dtype::FieldMask;
31use vortex_dtype::FieldName;
32use vortex_dtype::Nullability;
33use vortex_dtype::PType;
34use vortex_error::VortexExpect;
35use vortex_error::VortexResult;
36use vortex_mask::Mask;
37use vortex_scalar::PValue;
38use vortex_sequence::SequenceArray;
39use vortex_session::VortexSession;
40use vortex_utils::aliases::dash_map::DashMap;
41
42use crate::ArrayFuture;
43use crate::LayoutReader;
44use crate::layouts::USE_VORTEX_OPERATORS;
45use crate::layouts::partitioned::PartitionedExprEval;
46
/// A [`LayoutReader`] that wraps a child reader and additionally understands row-index
/// expressions (`row_idx()`), evaluating them over a generated sequence of row indices while
/// delegating everything else to the child.
pub struct RowIdxLayoutReader {
    /// Name reported by this reader; copied from the child in [`RowIdxLayoutReader::new`].
    name: Arc<str>,
    /// Added to the start of each requested row range to produce the row-index values.
    row_offset: u64,
    /// Reader that evaluates every part of an expression that does not involve the row index.
    child: Arc<dyn LayoutReader>,
    /// Memoized routing decision per expression, so each expression is partitioned only once.
    partition_cache: DashMap<ExactExpr, Partitioning>,
    /// Session handed to mask execution and to partitioned-expression evaluation.
    session: VortexSession,
}
54
55impl RowIdxLayoutReader {
56 pub fn new(row_offset: u64, child: Arc<dyn LayoutReader>, session: VortexSession) -> Self {
57 Self {
58 name: child.name().clone(),
59 row_offset,
60 child,
61 partition_cache: DashMap::with_hasher(Default::default()),
62 session,
63 }
64 }
65
66 fn partition_expr(&self, expr: &Expression) -> Partitioning {
67 self.partition_cache
68 .entry(ExactExpr(expr.clone()))
69 .or_insert_with(|| {
70 let mut partitioned = partition(expr.clone(), self.dtype(), |expr| {
72 if expr.is::<RowIdx>() {
73 vec![Partition::RowIdx]
74 } else if is_root(expr) {
75 vec![Partition::Child]
76 } else {
77 vec![]
78 }
79 })
80 .vortex_expect("We should not fail to partition expression over struct fields");
81
82 if partitioned.partitions.len() == 1 {
84 return match &partitioned.partition_annotations[0] {
85 Partition::RowIdx => {
86 Partitioning::RowIdx(replace(expr.clone(), &row_idx(), root()))
87 }
88 Partition::Child => Partitioning::Child(expr.clone()),
89 };
90 }
91
92 partitioned.partitions = partitioned
94 .partitions
95 .into_iter()
96 .map(|p| replace(p, &row_idx(), root()))
97 .collect();
98
99 Partitioning::Partitioned(Arc::new(partitioned))
100 })
101 .clone()
102 }
103}
104
/// How [`RowIdxLayoutReader`] routes an expression; computed once per expression and cached.
#[derive(Clone)]
enum Partitioning {
    /// The expression depends only on the row index. The stored expression has `row_idx()`
    /// replaced by `root()`, so it can be evaluated directly over the `U64` row-index array.
    RowIdx(Expression),
    /// The expression forms a single child partition and is forwarded to the child reader
    /// unchanged.
    Child(Expression),
    /// Any other partitioning result. Each partition has had `row_idx()` replaced by `root()`,
    /// and each is evaluated against the source named by its [`Partition`] annotation.
    Partitioned(Arc<PartitionedExpr<Partition>>),
}
114
/// Source a sub-expression is evaluated against after an expression has been partitioned.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum Partition {
    /// Evaluated over the generated row-index array.
    RowIdx,
    /// Evaluated by the wrapped child reader.
    Child,
}

impl Partition {
    /// Stable name of this partition (`"row_idx"` or `"child"`).
    ///
    /// The name is a string literal, so it is returned as `&'static str` and is not tied to the
    /// lifetime of the `&self` borrow.
    pub fn name(&self) -> &'static str {
        match self {
            Partition::RowIdx => "row_idx",
            Partition::Child => "child",
        }
    }
}
129
130impl From<Partition> for FieldName {
131 fn from(value: Partition) -> Self {
132 FieldName::from(value.name())
133 }
134}
135
136impl Display for Partition {
137 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
138 write!(f, "{}", self.name())
139 }
140}
141
impl LayoutReader for RowIdxLayoutReader {
    /// Returns the name copied from the child reader at construction time.
    fn name(&self) -> &Arc<str> {
        &self.name
    }

    /// Delegates to the child reader's dtype.
    fn dtype(&self) -> &DType {
        self.child.dtype()
    }

    /// Delegates to the child reader's row count.
    fn row_count(&self) -> u64 {
        self.child.row_count()
    }

    /// Split points come entirely from the child; this reader registers none of its own.
    fn register_splits(
        &self,
        field_mask: &[FieldMask],
        row_range: &Range<u64>,
        splits: &mut BTreeSet<u64>,
    ) -> VortexResult<()> {
        self.child.register_splits(field_mask, row_range, splits)
    }

    /// Pruning pass.
    ///
    /// - Row-index-only expressions are evaluated exactly, over the row-index sequence for
    ///   `row_range`, and ANDed with `mask`.
    /// - Child-only expressions are delegated to the child reader.
    /// - Mixed expressions are not pruned here: `mask` is returned unchanged.
    fn pruning_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: Mask,
    ) -> VortexResult<MaskFuture> {
        Ok(match &self.partition_expr(expr) {
            Partitioning::RowIdx(expr) => row_idx_mask_future(
                self.row_offset,
                row_range,
                expr,
                MaskFuture::ready(mask),
                self.session.clone(),
            ),
            Partitioning::Child(expr) => self.child.pruning_evaluation(row_range, expr, mask)?,
            Partitioning::Partitioned(..) => MaskFuture::ready(mask),
        })
    }

    /// Filter pass.
    ///
    /// Child-only expressions are delegated to the child reader. Mixed expressions are evaluated
    /// partition by partition via [`PartitionedExpr::into_mask_future`]. That call takes two
    /// callbacks: the first produces a mask future for a partition, and the second produces an
    /// array future.
    fn filter_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: MaskFuture,
    ) -> VortexResult<MaskFuture> {
        match &self.partition_expr(expr) {
            // Row-index-only predicates are already applied exactly by `pruning_evaluation`,
            // so the incoming mask is passed through instead of re-evaluating the predicate.
            // NOTE(review): this relies on callers always running `pruning_evaluation` with the
            // same expression before `filter_evaluation` — confirm against the scan driver.
            Partitioning::RowIdx(_) => Ok(mask),
            Partitioning::Child(expr) => self.child.filter_evaluation(row_range, expr, mask),
            Partitioning::Partitioned(p) => p.clone().into_mask_future(
                mask,
                // Per-partition mask evaluation: row-index partitions run over the index array,
                // child partitions are filtered by the child reader.
                |annotation, expr, mask| match annotation {
                    Partition::RowIdx => Ok(row_idx_mask_future(
                        self.row_offset,
                        row_range,
                        expr,
                        mask,
                        self.session.clone(),
                    )),
                    Partition::Child => self.child.filter_evaluation(row_range, expr, mask),
                },
                // Per-partition array evaluation, routed the same way.
                |annotation, expr, mask| match annotation {
                    Partition::RowIdx => {
                        Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
                    }
                    Partition::Child => self.child.projection_evaluation(row_range, expr, mask),
                },
                self.session.clone(),
            ),
        }
    }

    /// Projection pass.
    ///
    /// Row-index-only expressions are evaluated over the row indices selected by `mask`.
    /// Child-only expressions are delegated to the child reader. Mixed expressions have each
    /// partition projected against its own source; the results are combined by
    /// [`PartitionedExpr::into_array_future`].
    fn projection_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: MaskFuture,
    ) -> VortexResult<BoxFuture<'static, VortexResult<ArrayRef>>> {
        match &self.partition_expr(expr) {
            Partitioning::RowIdx(expr) => {
                Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
            }
            Partitioning::Child(expr) => self.child.projection_evaluation(row_range, expr, mask),
            Partitioning::Partitioned(p) => {
                p.clone()
                    .into_array_future(mask, |annotation, expr, mask| match annotation {
                        Partition::RowIdx => {
                            Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
                        }
                        Partition::Child => self.child.projection_evaluation(row_range, expr, mask),
                    })
            }
        }
    }
}
240
241fn idx_array(row_offset: u64, row_range: &Range<u64>) -> SequenceArray {
243 SequenceArray::new(
244 PValue::U64(row_offset + row_range.start),
245 PValue::U64(1),
246 PType::U64,
247 NonNullable,
248 usize::try_from(row_range.end - row_range.start)
249 .vortex_expect("Row range length must fit in usize"),
250 )
251 .vortex_expect("Failed to create row index array")
252}
253
254fn row_idx_mask_future(
255 row_offset: u64,
256 row_range: &Range<u64>,
257 expr: &Expression,
258 mask: MaskFuture,
259 session: VortexSession,
260) -> MaskFuture {
261 let row_range = row_range.clone();
262 let expr = expr.clone();
263 MaskFuture::new(mask.len(), async move {
264 let array = idx_array(row_offset, &row_range).into_array();
265
266 let result_mask = if *USE_VORTEX_OPERATORS {
267 array.apply(&expr)?.execute_mask(&session)
268 } else {
269 expr.evaluate(&array)?.try_to_mask_fill_null_false()
270 }?;
271
272 Ok(result_mask.bitand(&mask.await?))
273 })
274}
275
276fn row_idx_array_future(
277 row_offset: u64,
278 row_range: &Range<u64>,
279 expr: &Expression,
280 mask: MaskFuture,
281) -> ArrayFuture {
282 let row_range = row_range.clone();
283 let expr = expr.clone();
284 async move {
285 let array = idx_array(row_offset, &row_range).into_array();
286 let array = filter(&array, &mask.await?)?;
287 if *USE_VORTEX_OPERATORS {
288 array.apply(&expr)
289 } else {
290 expr.evaluate(&array)
291 }
292 }
293 .boxed()
294}
295
// End-to-end checks of `RowIdxLayoutReader::projection_evaluation` over a single flat layout
// built from `buffer![1..=5]`, with a row offset of 0. The tests cover a child-only
// expression, a row-index-only expression, and a mixed expression.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use itertools::Itertools;
    use vortex_array::ArrayContext;
    use vortex_array::IntoArray as _;
    use vortex_array::MaskFuture;
    use vortex_array::ToCanonical;
    use vortex_array::expr::eq;
    use vortex_array::expr::gt;
    use vortex_array::expr::lit;
    use vortex_array::expr::or;
    use vortex_array::expr::root;
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_io::runtime::single::block_on;

    use crate::LayoutReader;
    use crate::LayoutStrategy;
    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::layouts::row_idx::RowIdxLayoutReader;
    use crate::layouts::row_idx::row_idx;
    use crate::segments::TestSegments;
    use crate::sequence::SequenceId;
    use crate::sequence::SequentialArrayStreamExt;
    use crate::test::SESSION;

    /// A child-only predicate (`root == 3`) is forwarded to the child; only the third row matches.
    #[test]
    fn flat_expr_no_row_id() {
        block_on(|handle| async {
            let ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();
            let array = buffer![1..=5].into_array();
            let layout = FlatLayoutStrategy::default()
                .write_stream(
                    ctx,
                    segments.clone(),
                    array.to_array_stream().sequenced(ptr),
                    eof,
                    handle,
                )
                .await
                .unwrap();

            let expr = eq(root(), lit(3i32));
            let result = RowIdxLayoutReader::new(
                0,
                layout.new_reader("".into(), segments, &SESSION).unwrap(),
                SESSION.clone(),
            )
            .projection_evaluation(
                &(0..layout.row_count()),
                &expr,
                MaskFuture::new_true(layout.row_count().try_into().unwrap()),
            )
            .unwrap()
            .await
            .unwrap()
            .to_bool();

            assert_eq!(
                &BitBuffer::from_iter([false, false, true, false, false]),
                result.bit_buffer()
            );
        })
    }

    /// A row-index-only predicate (`row_idx > 3`) is evaluated over indices 0..5; only the
    /// last row matches.
    #[test]
    fn flat_expr_row_id() {
        block_on(|handle| async {
            let ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();
            let array = buffer![1..=5].into_array();
            let layout = FlatLayoutStrategy::default()
                .write_stream(
                    ctx,
                    segments.clone(),
                    array.to_array_stream().sequenced(ptr),
                    eof,
                    handle,
                )
                .await
                .unwrap();

            let expr = gt(row_idx(), lit(3u64));
            let result = RowIdxLayoutReader::new(
                0,
                layout.new_reader("".into(), segments, &SESSION).unwrap(),
                SESSION.clone(),
            )
            .projection_evaluation(
                &(0..layout.row_count()),
                &expr,
                MaskFuture::new_true(layout.row_count().try_into().unwrap()),
            )
            .unwrap()
            .await
            .unwrap()
            .to_bool();

            assert_eq!(
                &BitBuffer::from_iter([false, false, false, false, true]),
                result.bit_buffer()
            );
        })
    }

    /// A mixed predicate (`root == 3 || row_idx > 3 || root == 1`) exercises the partitioned
    /// path. Rows 0 (value 1), 2 (value 3) and 4 (index 4) match.
    #[test]
    fn flat_expr_or() {
        block_on(|handle| async {
            let ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();
            let array = buffer![1..=5].into_array();
            let layout = FlatLayoutStrategy::default()
                .write_stream(
                    ctx,
                    segments.clone(),
                    array.to_array_stream().sequenced(ptr),
                    eof,
                    handle,
                )
                .await
                .unwrap();

            let expr = or(
                eq(root(), lit(3i32)),
                or(gt(row_idx(), lit(3u64)), eq(root(), lit(1i32))),
            );

            let result = RowIdxLayoutReader::new(
                0,
                layout.new_reader("".into(), segments, &SESSION).unwrap(),
                SESSION.clone(),
            )
            .projection_evaluation(
                &(0..layout.row_count()),
                &expr,
                MaskFuture::new_true(layout.row_count().try_into().unwrap()),
            )
            .unwrap()
            .await
            .unwrap()
            .to_bool();

            assert_eq!(
                vec![true, false, true, false, true],
                result.bit_buffer().iter().collect_vec()
            );
        })
    }
}