vortex_layout/layouts/row_idx/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4mod expr;
5
6use std::collections::BTreeSet;
7use std::fmt::{Display, Formatter};
8use std::ops::{BitAnd, Range};
9use std::sync::Arc;
10
11use Nullability::NonNullable;
12pub use expr::*;
13use futures::FutureExt;
14use futures::future::BoxFuture;
15use vortex_array::compute::filter;
16use vortex_array::stats::Precision;
17use vortex_array::{ArrayRef, IntoArray, MaskFuture};
18use vortex_dtype::{DType, FieldMask, Nullability, PType};
19use vortex_error::{VortexExpect, VortexResult};
20use vortex_expr::transform::{PartitionedExpr, partition, replace};
21use vortex_expr::{ExactExpr, ExprRef, Scope, is_root, root};
22use vortex_mask::Mask;
23use vortex_scalar::PValue;
24use vortex_sequence::SequenceArray;
25use vortex_utils::aliases::dash_map::DashMap;
26
27use crate::layouts::partitioned::PartitionedExprEval;
28use crate::{ArrayFuture, LayoutReader};
29
30pub struct RowIdxLayoutReader {
31    name: Arc<str>,
32    row_offset: u64,
33    child: Arc<dyn LayoutReader>,
34
35    partition_cache: DashMap<ExactExpr, Partitioning>,
36}
37
38impl RowIdxLayoutReader {
39    pub fn new(row_offset: u64, child: Arc<dyn LayoutReader>) -> Self {
40        Self {
41            name: child.name().clone(),
42            row_offset,
43            child,
44            partition_cache: DashMap::with_hasher(Default::default()),
45        }
46    }
47
48    fn partition_expr(&self, expr: &ExprRef) -> Partitioning {
49        self.partition_cache
50            .entry(ExactExpr(expr.clone()))
51            .or_insert_with(|| {
52                // Partition the expression into row idx and child expressions.
53                let mut partitioned = partition(expr.clone(), self.dtype(), |expr| {
54                    if expr.is::<RowIdxVTable>() {
55                        vec![Partition::RowIdx]
56                    } else if is_root(expr) {
57                        vec![Partition::Child]
58                    } else {
59                        vec![]
60                    }
61                })
62                .vortex_expect("We should not fail to partition expression over struct fields");
63
64                // If there's only a single partition, we can directly return the expression.
65                if partitioned.partitions.len() == 1 {
66                    return match &partitioned.partition_annotations[0] {
67                        Partition::RowIdx => {
68                            Partitioning::RowIdx(replace(expr.clone(), &row_idx(), root()))
69                        }
70                        Partition::Child => Partitioning::Child(expr.clone()),
71                    };
72                }
73
74                // Replace the row_idx expression with the root expression in the row_idx partition.
75                partitioned.partitions = partitioned
76                    .partitions
77                    .into_iter()
78                    .map(|p| replace(p, &row_idx(), root()))
79                    .collect();
80
81                Partitioning::Partitioned(Arc::new(partitioned))
82            })
83            .clone()
84    }
85}
86
87#[derive(Clone)]
88enum Partitioning {
89    // An expression that only references the row index (e.g., `row_idx == 5`).
90    RowIdx(ExprRef),
91    // An expression that does not reference the row index.
92    Child(ExprRef),
93    // Contains both the RowIdx and Child expressions, (e.g., `row_idx < child.some_field`).
94    Partitioned(Arc<PartitionedExpr<Partition>>),
95}
96
/// Annotation naming the side of the reader that a sub-expression is evaluated against.
#[derive(Clone, PartialEq, Eq, Hash)]
enum Partition {
    /// The sub-expression reads the row index.
    RowIdx,
    /// The sub-expression reads from the wrapped child reader.
    Child,
}

impl Display for Partition {
    /// Writes the lowercase label of the partition: `row_idx` or `child`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Partition::RowIdx => "row_idx",
            Partition::Child => "child",
        };
        f.write_str(label)
    }
}
111
112impl LayoutReader for RowIdxLayoutReader {
113    fn name(&self) -> &Arc<str> {
114        &self.name
115    }
116
117    fn dtype(&self) -> &DType {
118        self.child.dtype()
119    }
120
121    fn row_count(&self) -> Precision<u64> {
122        self.child.row_count()
123    }
124
125    fn register_splits(
126        &self,
127        field_mask: &[FieldMask],
128        row_offset: u64,
129        splits: &mut BTreeSet<u64>,
130    ) -> VortexResult<()> {
131        self.child.register_splits(field_mask, row_offset, splits)
132    }
133
134    fn pruning_evaluation(
135        &self,
136        row_range: &Range<u64>,
137        expr: &ExprRef,
138        mask: Mask,
139    ) -> VortexResult<MaskFuture> {
140        Ok(match &self.partition_expr(expr) {
141            Partitioning::RowIdx(expr) => {
142                row_idx_mask_future(self.row_offset, row_range, expr, MaskFuture::ready(mask))
143            }
144            Partitioning::Child(expr) => self.child.pruning_evaluation(row_range, expr, mask)?,
145            Partitioning::Partitioned(..) => MaskFuture::ready(mask),
146        })
147    }
148
149    fn filter_evaluation(
150        &self,
151        row_range: &Range<u64>,
152        expr: &ExprRef,
153        mask: MaskFuture,
154    ) -> VortexResult<MaskFuture> {
155        match &self.partition_expr(expr) {
156            // Since this is run during pruning, we skip re-evaluating the row index expression
157            // during the filter evaluation.
158            Partitioning::RowIdx(_) => Ok(mask),
159            Partitioning::Child(expr) => self.child.filter_evaluation(row_range, expr, mask),
160            Partitioning::Partitioned(p) => p.clone().into_mask_future(
161                mask,
162                |annotation, expr, mask| match annotation {
163                    Partition::RowIdx => {
164                        Ok(row_idx_mask_future(self.row_offset, row_range, expr, mask))
165                    }
166                    Partition::Child => self.child.filter_evaluation(row_range, expr, mask),
167                },
168                |annotation, expr, mask| match annotation {
169                    Partition::RowIdx => {
170                        Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
171                    }
172                    Partition::Child => self.child.projection_evaluation(row_range, expr, mask),
173                },
174            ),
175        }
176    }
177
178    fn projection_evaluation(
179        &self,
180        row_range: &Range<u64>,
181        expr: &ExprRef,
182        mask: MaskFuture,
183    ) -> VortexResult<BoxFuture<'static, VortexResult<ArrayRef>>> {
184        match &self.partition_expr(expr) {
185            Partitioning::RowIdx(expr) => {
186                Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
187            }
188            Partitioning::Child(expr) => self.child.projection_evaluation(row_range, expr, mask),
189            Partitioning::Partitioned(p) => {
190                p.clone()
191                    .into_array_future(mask, |annotation, expr, mask| match annotation {
192                        Partition::RowIdx => {
193                            Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
194                        }
195                        Partition::Child => self.child.projection_evaluation(row_range, expr, mask),
196                    })
197            }
198        }
199    }
200}
201
202// Returns a SequenceArray representing the row indices for the given row range,
203fn idx_array(row_offset: u64, row_range: &Range<u64>) -> SequenceArray {
204    SequenceArray::new(
205        PValue::U64(row_offset + row_range.start),
206        PValue::U64(1),
207        PType::U64,
208        NonNullable,
209        usize::try_from(row_range.end - row_range.start)
210            .vortex_expect("Row range length must fit in usize"),
211    )
212    .vortex_expect("Failed to create row index array")
213}
214
215fn row_idx_mask_future(
216    row_offset: u64,
217    row_range: &Range<u64>,
218    expr: &ExprRef,
219    mask: MaskFuture,
220) -> MaskFuture {
221    let row_range = row_range.clone();
222    let expr = expr.clone();
223    MaskFuture::new(mask.len(), async move {
224        let array = idx_array(row_offset, &row_range).into_array();
225        let result_mask = expr
226            .evaluate(&Scope::new(array))?
227            .try_to_mask_fill_null_false()?;
228        Ok(result_mask.bitand(&mask.await?))
229    })
230}
231
232fn row_idx_array_future(
233    row_offset: u64,
234    row_range: &Range<u64>,
235    expr: &ExprRef,
236    mask: MaskFuture,
237) -> ArrayFuture {
238    let row_range = row_range.clone();
239    let expr = expr.clone();
240    async move {
241        let array = idx_array(row_offset, &row_range).into_array();
242        let array = filter(&array, &mask.await?)?;
243        expr.evaluate(&Scope::new(array))
244    }
245    .boxed()
246}
247
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_buffer::BooleanBuffer;
    use itertools::Itertools;
    use vortex_array::{ArrayContext, IntoArray as _, MaskFuture, ToCanonical};
    use vortex_buffer::buffer;
    use vortex_expr::{eq, gt, lit, or, root};
    use vortex_io::runtime::single::block_on;

    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::layouts::row_idx::{RowIdxLayoutReader, row_idx};
    use crate::segments::TestSegments;
    use crate::sequence::{SequenceId, SequentialArrayStreamExt};
    use crate::{LayoutReader, LayoutStrategy};

    /// An expression that never mentions the row index is evaluated against the child values.
    #[test]
    fn flat_expr_no_row_id() {
        block_on(|handle| async {
            let ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();
            let values = buffer![1..=5].into_array();
            let layout = FlatLayoutStrategy::default()
                .write_stream(
                    ctx,
                    segments.clone(),
                    values.to_array_stream().sequenced(ptr),
                    eof,
                    handle,
                )
                .await
                .unwrap();

            let reader =
                RowIdxLayoutReader::new(0, layout.new_reader("".into(), segments).unwrap());
            let row_count = layout.row_count();
            let predicate = eq(root(), lit(3i32));

            let matches = reader
                .projection_evaluation(
                    &(0..row_count),
                    &predicate,
                    MaskFuture::new_true(row_count.try_into().unwrap()),
                )
                .unwrap()
                .await
                .unwrap()
                .to_bool();

            assert_eq!(
                &BooleanBuffer::from_iter([false, false, true, false, false]),
                matches.boolean_buffer()
            );
        })
    }

    /// An expression over the row index alone is evaluated against the generated indices.
    #[test]
    fn flat_expr_row_id() {
        block_on(|handle| async {
            let ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();
            let values = buffer![1..=5].into_array();
            let layout = FlatLayoutStrategy::default()
                .write_stream(
                    ctx,
                    segments.clone(),
                    values.to_array_stream().sequenced(ptr),
                    eof,
                    handle,
                )
                .await
                .unwrap();

            let reader =
                RowIdxLayoutReader::new(0, layout.new_reader("".into(), segments).unwrap());
            let row_count = layout.row_count();
            let predicate = gt(row_idx(), lit(3u64));

            let matches = reader
                .projection_evaluation(
                    &(0..row_count),
                    &predicate,
                    MaskFuture::new_true(row_count.try_into().unwrap()),
                )
                .unwrap()
                .await
                .unwrap()
                .to_bool();

            assert_eq!(
                &BooleanBuffer::from_iter([false, false, false, false, true]),
                matches.boolean_buffer()
            );
        })
    }

    /// An expression mixing child values and the row index combines both sides.
    #[test]
    fn flat_expr_or() {
        block_on(|handle| async {
            let ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();
            let values = buffer![1..=5].into_array();
            let layout = FlatLayoutStrategy::default()
                .write_stream(
                    ctx,
                    segments.clone(),
                    values.to_array_stream().sequenced(ptr),
                    eof,
                    handle,
                )
                .await
                .unwrap();

            let reader =
                RowIdxLayoutReader::new(0, layout.new_reader("".into(), segments).unwrap());
            let row_count = layout.row_count();
            let predicate = or(
                eq(root(), lit(3i32)),
                or(gt(row_idx(), lit(3u64)), eq(root(), lit(1i32))),
            );

            let matches = reader
                .projection_evaluation(
                    &(0..row_count),
                    &predicate,
                    MaskFuture::new_true(row_count.try_into().unwrap()),
                )
                .unwrap()
                .await
                .unwrap()
                .to_bool();

            assert_eq!(
                vec![true, false, true, false, true],
                matches.boolean_buffer().iter().collect_vec()
            );
        })
    }
}