vortex_layout/layouts/row_idx/mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4mod expr;
5
6use std::collections::BTreeSet;
7use std::fmt::Display;
8use std::fmt::Formatter;
9use std::ops::BitAnd;
10use std::ops::Range;
11use std::sync::Arc;
12
13use Nullability::NonNullable;
14pub use expr::*;
15use futures::FutureExt;
16use futures::future::BoxFuture;
17use vortex_array::ArrayRef;
18use vortex_array::IntoArray;
19use vortex_array::MaskFuture;
20use vortex_array::compute::filter;
21use vortex_array::expr::ExactExpr;
22use vortex_array::expr::Expression;
23use vortex_array::expr::is_root;
24use vortex_array::expr::root;
25use vortex_array::expr::transform::PartitionedExpr;
26use vortex_array::expr::transform::partition;
27use vortex_array::expr::transform::replace;
28use vortex_array::mask::MaskExecutor;
29use vortex_dtype::DType;
30use vortex_dtype::FieldMask;
31use vortex_dtype::FieldName;
32use vortex_dtype::Nullability;
33use vortex_dtype::PType;
34use vortex_error::VortexExpect;
35use vortex_error::VortexResult;
36use vortex_mask::Mask;
37use vortex_scalar::PValue;
38use vortex_sequence::SequenceArray;
39use vortex_session::VortexSession;
40use vortex_utils::aliases::dash_map::DashMap;
41
42use crate::ArrayFuture;
43use crate::LayoutReader;
44use crate::layouts::USE_VORTEX_OPERATORS;
45use crate::layouts::partitioned::PartitionedExprEval;
46
47pub struct RowIdxLayoutReader {
48    name: Arc<str>,
49    row_offset: u64,
50    child: Arc<dyn LayoutReader>,
51    partition_cache: DashMap<ExactExpr, Partitioning>,
52    session: VortexSession,
53}
54
55impl RowIdxLayoutReader {
56    pub fn new(row_offset: u64, child: Arc<dyn LayoutReader>, session: VortexSession) -> Self {
57        Self {
58            name: child.name().clone(),
59            row_offset,
60            child,
61            partition_cache: DashMap::with_hasher(Default::default()),
62            session,
63        }
64    }
65
66    fn partition_expr(&self, expr: &Expression) -> Partitioning {
67        self.partition_cache
68            .entry(ExactExpr(expr.clone()))
69            .or_insert_with(|| {
70                // Partition the expression into row idx and child expressions.
71                let mut partitioned = partition(expr.clone(), self.dtype(), |expr| {
72                    if expr.is::<RowIdx>() {
73                        vec![Partition::RowIdx]
74                    } else if is_root(expr) {
75                        vec![Partition::Child]
76                    } else {
77                        vec![]
78                    }
79                })
80                .vortex_expect("We should not fail to partition expression over struct fields");
81
82                // If there's only a single partition, we can directly return the expression.
83                if partitioned.partitions.len() == 1 {
84                    return match &partitioned.partition_annotations[0] {
85                        Partition::RowIdx => {
86                            Partitioning::RowIdx(replace(expr.clone(), &row_idx(), root()))
87                        }
88                        Partition::Child => Partitioning::Child(expr.clone()),
89                    };
90                }
91
92                // Replace the row_idx expression with the root expression in the row_idx partition.
93                partitioned.partitions = partitioned
94                    .partitions
95                    .into_iter()
96                    .map(|p| replace(p, &row_idx(), root()))
97                    .collect();
98
99                Partitioning::Partitioned(Arc::new(partitioned))
100            })
101            .clone()
102    }
103}
104
105#[derive(Clone)]
106enum Partitioning {
107    // An expression that only references the row index (e.g., `row_idx == 5`).
108    RowIdx(Expression),
109    // An expression that does not reference the row index.
110    Child(Expression),
111    // Contains both the RowIdx and Child expressions, (e.g., `row_idx < child.some_field`).
112    Partitioned(Arc<PartitionedExpr<Partition>>),
113}
114
/// Which side of the row index reader a sub-expression is evaluated on.
///
/// A fieldless tag, so it is `Copy`; `Debug` is derived for diagnostics.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum Partition {
    /// Evaluated over the synthesized row index array.
    RowIdx,
    /// Evaluated by the child layout reader.
    Child,
}
120
121impl Partition {
122    pub fn name(&self) -> &str {
123        match self {
124            Partition::RowIdx => "row_idx",
125            Partition::Child => "child",
126        }
127    }
128}
129
130impl From<Partition> for FieldName {
131    fn from(value: Partition) -> Self {
132        FieldName::from(value.name())
133    }
134}
135
136impl Display for Partition {
137    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
138        write!(f, "{}", self.name())
139    }
140}
141
142impl LayoutReader for RowIdxLayoutReader {
143    fn name(&self) -> &Arc<str> {
144        &self.name
145    }
146
147    fn dtype(&self) -> &DType {
148        self.child.dtype()
149    }
150
151    fn row_count(&self) -> u64 {
152        self.child.row_count()
153    }
154
155    fn register_splits(
156        &self,
157        field_mask: &[FieldMask],
158        row_range: &Range<u64>,
159        splits: &mut BTreeSet<u64>,
160    ) -> VortexResult<()> {
161        self.child.register_splits(field_mask, row_range, splits)
162    }
163
164    fn pruning_evaluation(
165        &self,
166        row_range: &Range<u64>,
167        expr: &Expression,
168        mask: Mask,
169    ) -> VortexResult<MaskFuture> {
170        Ok(match &self.partition_expr(expr) {
171            Partitioning::RowIdx(expr) => row_idx_mask_future(
172                self.row_offset,
173                row_range,
174                expr,
175                MaskFuture::ready(mask),
176                self.session.clone(),
177            ),
178            Partitioning::Child(expr) => self.child.pruning_evaluation(row_range, expr, mask)?,
179            Partitioning::Partitioned(..) => MaskFuture::ready(mask),
180        })
181    }
182
183    fn filter_evaluation(
184        &self,
185        row_range: &Range<u64>,
186        expr: &Expression,
187        mask: MaskFuture,
188    ) -> VortexResult<MaskFuture> {
189        match &self.partition_expr(expr) {
190            // Since this is run during pruning, we skip re-evaluating the row index expression
191            // during the filter evaluation.
192            Partitioning::RowIdx(_) => Ok(mask),
193            Partitioning::Child(expr) => self.child.filter_evaluation(row_range, expr, mask),
194            Partitioning::Partitioned(p) => p.clone().into_mask_future(
195                mask,
196                |annotation, expr, mask| match annotation {
197                    Partition::RowIdx => Ok(row_idx_mask_future(
198                        self.row_offset,
199                        row_range,
200                        expr,
201                        mask,
202                        self.session.clone(),
203                    )),
204                    Partition::Child => self.child.filter_evaluation(row_range, expr, mask),
205                },
206                |annotation, expr, mask| match annotation {
207                    Partition::RowIdx => {
208                        Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
209                    }
210                    Partition::Child => self.child.projection_evaluation(row_range, expr, mask),
211                },
212                self.session.clone(),
213            ),
214        }
215    }
216
217    fn projection_evaluation(
218        &self,
219        row_range: &Range<u64>,
220        expr: &Expression,
221        mask: MaskFuture,
222    ) -> VortexResult<BoxFuture<'static, VortexResult<ArrayRef>>> {
223        match &self.partition_expr(expr) {
224            Partitioning::RowIdx(expr) => {
225                Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
226            }
227            Partitioning::Child(expr) => self.child.projection_evaluation(row_range, expr, mask),
228            Partitioning::Partitioned(p) => {
229                p.clone()
230                    .into_array_future(mask, |annotation, expr, mask| match annotation {
231                        Partition::RowIdx => {
232                            Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
233                        }
234                        Partition::Child => self.child.projection_evaluation(row_range, expr, mask),
235                    })
236            }
237        }
238    }
239}
240
241// Returns a SequenceArray representing the row indices for the given row range,
242fn idx_array(row_offset: u64, row_range: &Range<u64>) -> SequenceArray {
243    SequenceArray::new(
244        PValue::U64(row_offset + row_range.start),
245        PValue::U64(1),
246        PType::U64,
247        NonNullable,
248        usize::try_from(row_range.end - row_range.start)
249            .vortex_expect("Row range length must fit in usize"),
250    )
251    .vortex_expect("Failed to create row index array")
252}
253
254fn row_idx_mask_future(
255    row_offset: u64,
256    row_range: &Range<u64>,
257    expr: &Expression,
258    mask: MaskFuture,
259    session: VortexSession,
260) -> MaskFuture {
261    let row_range = row_range.clone();
262    let expr = expr.clone();
263    MaskFuture::new(mask.len(), async move {
264        let array = idx_array(row_offset, &row_range).into_array();
265
266        let result_mask = if *USE_VORTEX_OPERATORS {
267            array.apply(&expr)?.execute_mask(&session)
268        } else {
269            expr.evaluate(&array)?.try_to_mask_fill_null_false()
270        }?;
271
272        Ok(result_mask.bitand(&mask.await?))
273    })
274}
275
276fn row_idx_array_future(
277    row_offset: u64,
278    row_range: &Range<u64>,
279    expr: &Expression,
280    mask: MaskFuture,
281) -> ArrayFuture {
282    let row_range = row_range.clone();
283    let expr = expr.clone();
284    async move {
285        let array = idx_array(row_offset, &row_range).into_array();
286        let array = filter(&array, &mask.await?)?;
287        if *USE_VORTEX_OPERATORS {
288            array.apply(&expr)
289        } else {
290            expr.evaluate(&array)
291        }
292    }
293    .boxed()
294}
295
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use itertools::Itertools;
    use vortex_array::ArrayContext;
    use vortex_array::IntoArray as _;
    use vortex_array::MaskFuture;
    use vortex_array::ToCanonical;
    use vortex_array::expr::Expression;
    use vortex_array::expr::eq;
    use vortex_array::expr::gt;
    use vortex_array::expr::lit;
    use vortex_array::expr::or;
    use vortex_array::expr::root;
    use vortex_buffer::buffer;
    use vortex_io::runtime::single::block_on;

    use crate::LayoutReader;
    use crate::LayoutStrategy;
    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::layouts::row_idx::RowIdxLayoutReader;
    use crate::layouts::row_idx::row_idx;
    use crate::segments::TestSegments;
    use crate::sequence::SequenceId;
    use crate::sequence::SequentialArrayStreamExt;
    use crate::test::SESSION;

    /// Writes the values `1..=5` into a flat layout, projects `expr` through a
    /// `RowIdxLayoutReader` with a zero row offset, and asserts the resulting bits.
    fn assert_projection(expr: Expression, expected: [bool; 5]) {
        block_on(move |handle| async move {
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();
            let values = buffer![1..=5].into_array();
            let layout = FlatLayoutStrategy::default()
                .write_stream(
                    ArrayContext::empty(),
                    segments.clone(),
                    values.to_array_stream().sequenced(ptr),
                    eof,
                    handle,
                )
                .await
                .unwrap();

            let row_count = layout.row_count();
            let reader = RowIdxLayoutReader::new(
                0,
                layout.new_reader("".into(), segments, &SESSION).unwrap(),
                SESSION.clone(),
            );
            let projected = reader
                .projection_evaluation(
                    &(0..row_count),
                    &expr,
                    MaskFuture::new_true(row_count.try_into().unwrap()),
                )
                .unwrap()
                .await
                .unwrap()
                .to_bool();

            assert_eq!(projected.bit_buffer().iter().collect_vec(), expected);
        })
    }

    #[test]
    fn flat_expr_no_row_id() {
        assert_projection(eq(root(), lit(3i32)), [false, false, true, false, false]);
    }

    #[test]
    fn flat_expr_row_id() {
        assert_projection(
            gt(row_idx(), lit(3u64)),
            [false, false, false, false, true],
        );
    }

    #[test]
    fn flat_expr_or() {
        let expr = or(
            eq(root(), lit(3i32)),
            or(gt(row_idx(), lit(3u64)), eq(root(), lit(1i32))),
        );
        assert_projection(expr, [true, false, true, false, true]);
    }
}