Skip to main content

vortex_layout/layouts/row_idx/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4mod expr;
5
6use std::collections::BTreeSet;
7use std::fmt::Display;
8use std::fmt::Formatter;
9use std::ops::BitAnd;
10use std::ops::Range;
11use std::sync::Arc;
12
13use Nullability::NonNullable;
14pub use expr::*;
15use futures::FutureExt;
16use futures::future::BoxFuture;
17use vortex_array::ArrayRef;
18use vortex_array::IntoArray;
19use vortex_array::MaskFuture;
20use vortex_array::VortexSessionExecute;
21use vortex_array::compute::filter;
22use vortex_array::expr::ExactExpr;
23use vortex_array::expr::Expression;
24use vortex_array::expr::is_root;
25use vortex_array::expr::root;
26use vortex_array::expr::transform::PartitionedExpr;
27use vortex_array::expr::transform::partition;
28use vortex_array::expr::transform::replace;
29use vortex_array::scalar::PValue;
30use vortex_dtype::DType;
31use vortex_dtype::FieldMask;
32use vortex_dtype::FieldName;
33use vortex_dtype::Nullability;
34use vortex_dtype::PType;
35use vortex_error::VortexExpect;
36use vortex_error::VortexResult;
37use vortex_mask::Mask;
38use vortex_sequence::SequenceArray;
39use vortex_session::VortexSession;
40use vortex_utils::aliases::dash_map::DashMap;
41
42use crate::ArrayFuture;
43use crate::LayoutReader;
44use crate::layouts::partitioned::PartitionedExprEval;
45
46pub struct RowIdxLayoutReader {
47    name: Arc<str>,
48    row_offset: u64,
49    child: Arc<dyn LayoutReader>,
50    partition_cache: DashMap<ExactExpr, Partitioning>,
51    session: VortexSession,
52}
53
54impl RowIdxLayoutReader {
55    pub fn new(row_offset: u64, child: Arc<dyn LayoutReader>, session: VortexSession) -> Self {
56        Self {
57            name: child.name().clone(),
58            row_offset,
59            child,
60            partition_cache: DashMap::with_hasher(Default::default()),
61            session,
62        }
63    }
64
65    fn partition_expr(&self, expr: &Expression) -> Partitioning {
66        let key = ExactExpr(expr.clone());
67
68        // Check cache first with read-only lock.
69        if let Some(partitioning) = self.partition_cache.get(&key) {
70            return partitioning.clone();
71        }
72
73        self.partition_cache
74            .entry(key)
75            .or_insert_with(|| {
76                // Partition the expression into row idx and child expressions.
77                let mut partitioned = partition(expr.clone(), self.dtype(), |expr| {
78                    if expr.is::<RowIdx>() {
79                        vec![Partition::RowIdx]
80                    } else if is_root(expr) {
81                        vec![Partition::Child]
82                    } else {
83                        vec![]
84                    }
85                })
86                .vortex_expect("We should not fail to partition expression over struct fields");
87
88                // If there's only a single partition, we can directly return the expression.
89                if partitioned.partitions.len() == 1 {
90                    return match &partitioned.partition_annotations[0] {
91                        Partition::RowIdx => {
92                            Partitioning::RowIdx(replace(expr.clone(), &row_idx(), root()))
93                        }
94                        Partition::Child => Partitioning::Child(expr.clone()),
95                    };
96                }
97
98                // Replace the row_idx expression with the root expression in the row_idx partition.
99                partitioned.partitions = partitioned
100                    .partitions
101                    .into_iter()
102                    .map(|p| replace(p, &row_idx(), root()))
103                    .collect();
104
105                Partitioning::Partitioned(Arc::new(partitioned))
106            })
107            .clone()
108    }
109}
110
111#[derive(Clone)]
112enum Partitioning {
113    // An expression that only references the row index (e.g., `row_idx == 5`).
114    RowIdx(Expression),
115    // An expression that does not reference the row index.
116    Child(Expression),
117    // Contains both the RowIdx and Child expressions, (e.g., `row_idx < child.some_field`).
118    Partitioned(Arc<PartitionedExpr<Partition>>),
119}
120
/// Label for the side an annotated sub-expression belongs to.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum Partition {
    /// Sub-expressions that reference the row index.
    RowIdx,
    /// Sub-expressions that reference the child reader's data.
    Child,
}

impl Partition {
    /// Returns the lowercase name of this partition (`"row_idx"` or `"child"`).
    ///
    /// The name is a string literal, so it is returned as `&'static str` and callers may
    /// keep it independently of `self`.
    pub fn name(&self) -> &'static str {
        match self {
            Partition::RowIdx => "row_idx",
            Partition::Child => "child",
        }
    }
}
135
136impl From<Partition> for FieldName {
137    fn from(value: Partition) -> Self {
138        FieldName::from(value.name())
139    }
140}
141
142impl Display for Partition {
143    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
144        write!(f, "{}", self.name())
145    }
146}
147
148impl LayoutReader for RowIdxLayoutReader {
149    fn name(&self) -> &Arc<str> {
150        &self.name
151    }
152
153    fn dtype(&self) -> &DType {
154        self.child.dtype()
155    }
156
157    fn row_count(&self) -> u64 {
158        self.child.row_count()
159    }
160
161    fn register_splits(
162        &self,
163        field_mask: &[FieldMask],
164        row_range: &Range<u64>,
165        splits: &mut BTreeSet<u64>,
166    ) -> VortexResult<()> {
167        self.child.register_splits(field_mask, row_range, splits)
168    }
169
170    fn pruning_evaluation(
171        &self,
172        row_range: &Range<u64>,
173        expr: &Expression,
174        mask: Mask,
175    ) -> VortexResult<MaskFuture> {
176        Ok(match &self.partition_expr(expr) {
177            Partitioning::RowIdx(expr) => row_idx_mask_future(
178                self.row_offset,
179                row_range,
180                expr,
181                MaskFuture::ready(mask),
182                self.session.clone(),
183            ),
184            Partitioning::Child(expr) => self.child.pruning_evaluation(row_range, expr, mask)?,
185            Partitioning::Partitioned(..) => MaskFuture::ready(mask),
186        })
187    }
188
189    fn filter_evaluation(
190        &self,
191        row_range: &Range<u64>,
192        expr: &Expression,
193        mask: MaskFuture,
194    ) -> VortexResult<MaskFuture> {
195        match &self.partition_expr(expr) {
196            // Since this is run during pruning, we skip re-evaluating the row index expression
197            // during the filter evaluation.
198            Partitioning::RowIdx(_) => Ok(mask),
199            Partitioning::Child(expr) => self.child.filter_evaluation(row_range, expr, mask),
200            Partitioning::Partitioned(p) => p.clone().into_mask_future(
201                mask,
202                |annotation, expr, mask| match annotation {
203                    Partition::RowIdx => Ok(row_idx_mask_future(
204                        self.row_offset,
205                        row_range,
206                        expr,
207                        mask,
208                        self.session.clone(),
209                    )),
210                    Partition::Child => self.child.filter_evaluation(row_range, expr, mask),
211                },
212                |annotation, expr, mask| match annotation {
213                    Partition::RowIdx => {
214                        Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
215                    }
216                    Partition::Child => self.child.projection_evaluation(row_range, expr, mask),
217                },
218                self.session.clone(),
219            ),
220        }
221    }
222
223    fn projection_evaluation(
224        &self,
225        row_range: &Range<u64>,
226        expr: &Expression,
227        mask: MaskFuture,
228    ) -> VortexResult<BoxFuture<'static, VortexResult<ArrayRef>>> {
229        match &self.partition_expr(expr) {
230            Partitioning::RowIdx(expr) => {
231                Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
232            }
233            Partitioning::Child(expr) => self.child.projection_evaluation(row_range, expr, mask),
234            Partitioning::Partitioned(p) => {
235                p.clone()
236                    .into_array_future(mask, |annotation, expr, mask| match annotation {
237                        Partition::RowIdx => {
238                            Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
239                        }
240                        Partition::Child => self.child.projection_evaluation(row_range, expr, mask),
241                    })
242            }
243        }
244    }
245}
246
247// Returns a SequenceArray representing the row indices for the given row range,
248fn idx_array(row_offset: u64, row_range: &Range<u64>) -> SequenceArray {
249    SequenceArray::new(
250        PValue::U64(row_offset + row_range.start),
251        PValue::U64(1),
252        PType::U64,
253        NonNullable,
254        usize::try_from(row_range.end - row_range.start)
255            .vortex_expect("Row range length must fit in usize"),
256    )
257    .vortex_expect("Failed to create row index array")
258}
259
260fn row_idx_mask_future(
261    row_offset: u64,
262    row_range: &Range<u64>,
263    expr: &Expression,
264    mask: MaskFuture,
265    session: VortexSession,
266) -> MaskFuture {
267    let row_range = row_range.clone();
268    let expr = expr.clone();
269    MaskFuture::new(mask.len(), async move {
270        let array = idx_array(row_offset, &row_range).into_array();
271
272        let mut ctx = session.create_execution_ctx();
273        let result_mask = array.apply(&expr)?.execute::<Mask>(&mut ctx)?;
274
275        Ok(result_mask.bitand(&mask.await?))
276    })
277}
278
279fn row_idx_array_future(
280    row_offset: u64,
281    row_range: &Range<u64>,
282    expr: &Expression,
283    mask: MaskFuture,
284) -> ArrayFuture {
285    let row_range = row_range.clone();
286    let expr = expr.clone();
287    async move {
288        let array = idx_array(row_offset, &row_range).into_array();
289        let array = filter(&array, &mask.await?)?;
290        array.apply(&expr)
291    }
292    .boxed()
293}
294
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_array::ArrayContext;
    use vortex_array::IntoArray as _;
    use vortex_array::MaskFuture;
    use vortex_array::arrays::BoolArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::expr::eq;
    use vortex_array::expr::gt;
    use vortex_array::expr::lit;
    use vortex_array::expr::or;
    use vortex_array::expr::root;
    use vortex_buffer::buffer;
    use vortex_io::runtime::single::block_on;

    use crate::LayoutReader;
    use crate::LayoutStrategy;
    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::layouts::row_idx::RowIdxLayoutReader;
    use crate::layouts::row_idx::row_idx;
    use crate::segments::TestSegments;
    use crate::sequence::SequenceId;
    use crate::sequence::SequentialArrayStreamExt;
    use crate::test::SESSION;

    #[test]
    fn flat_expr_no_row_id() {
        block_on(|handle| async {
            let ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();
            let array = buffer![1..=5].into_array();
            let layout = FlatLayoutStrategy::default()
                .write_stream(
                    ctx,
                    segments.clone(),
                    array.to_array_stream().sequenced(ptr),
                    eof,
                    handle,
                )
                .await
                .unwrap();

            let expr = eq(root(), lit(3i32));
            let result = RowIdxLayoutReader::new(
                0,
                layout.new_reader("".into(), segments, &SESSION).unwrap(),
                SESSION.clone(),
            )
            .projection_evaluation(
                &(0..layout.row_count()),
                &expr,
                MaskFuture::new_true(layout.row_count().try_into().unwrap()),
            )
            .unwrap()
            .await
            .unwrap();

            assert_arrays_eq!(
                result,
                BoolArray::from_iter([false, false, true, false, false])
            );
        })
    }

    #[test]
    fn flat_expr_row_id() {
        block_on(|handle| async {
            let ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();
            let array = buffer![1..=5].into_array();
            let layout = FlatLayoutStrategy::default()
                .write_stream(
                    ctx,
                    segments.clone(),
                    array.to_array_stream().sequenced(ptr),
                    eof,
                    handle,
                )
                .await
                .unwrap();

            let expr = gt(row_idx(), lit(3u64));
            let result = RowIdxLayoutReader::new(
                0,
                layout.new_reader("".into(), segments, &SESSION).unwrap(),
                SESSION.clone(),
            )
            .projection_evaluation(
                &(0..layout.row_count()),
                &expr,
                MaskFuture::new_true(layout.row_count().try_into().unwrap()),
            )
            .unwrap()
            .await
            .unwrap();

            assert_arrays_eq!(
                result,
                BoolArray::from_iter([false, false, false, false, true])
            );
        })
    }

    /// Row indices must be shifted by the reader's `row_offset`.
    #[test]
    fn flat_expr_row_id_with_offset() {
        block_on(|handle| async {
            let ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();
            let array = buffer![1..=5].into_array();
            let layout = FlatLayoutStrategy::default()
                .write_stream(
                    ctx,
                    segments.clone(),
                    array.to_array_stream().sequenced(ptr),
                    eof,
                    handle,
                )
                .await
                .unwrap();

            // With an offset of 10 the rows are numbered 10, 11, 12, 13, 14.
            let expr = gt(row_idx(), lit(12u64));
            let result = RowIdxLayoutReader::new(
                10,
                layout.new_reader("".into(), segments, &SESSION).unwrap(),
                SESSION.clone(),
            )
            .projection_evaluation(
                &(0..layout.row_count()),
                &expr,
                MaskFuture::new_true(layout.row_count().try_into().unwrap()),
            )
            .unwrap()
            .await
            .unwrap();

            assert_arrays_eq!(
                result,
                BoolArray::from_iter([false, false, false, true, true])
            );
        })
    }

    /// A row range that does not start at zero must contribute its start to the indices.
    #[test]
    fn flat_expr_row_id_sub_range() {
        block_on(|handle| async {
            let ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();
            let array = buffer![1..=5].into_array();
            let layout = FlatLayoutStrategy::default()
                .write_stream(
                    ctx,
                    segments.clone(),
                    array.to_array_stream().sequenced(ptr),
                    eof,
                    handle,
                )
                .await
                .unwrap();

            // Offset 10 over rows 1..4 yields indices 11, 12, 13.
            let expr = gt(row_idx(), lit(11u64));
            let result = RowIdxLayoutReader::new(
                10,
                layout.new_reader("".into(), segments, &SESSION).unwrap(),
                SESSION.clone(),
            )
            .projection_evaluation(&(1..4), &expr, MaskFuture::new_true(3))
            .unwrap()
            .await
            .unwrap();

            assert_arrays_eq!(result, BoolArray::from_iter([false, true, true]));
        })
    }

    #[test]
    fn flat_expr_or() {
        block_on(|handle| async {
            let ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();
            let array = buffer![1..=5].into_array();
            let layout = FlatLayoutStrategy::default()
                .write_stream(
                    ctx,
                    segments.clone(),
                    array.to_array_stream().sequenced(ptr),
                    eof,
                    handle,
                )
                .await
                .unwrap();

            let expr = or(
                eq(root(), lit(3i32)),
                or(gt(row_idx(), lit(3u64)), eq(root(), lit(1i32))),
            );

            let result = RowIdxLayoutReader::new(
                0,
                layout.new_reader("".into(), segments, &SESSION).unwrap(),
                SESSION.clone(),
            )
            .projection_evaluation(
                &(0..layout.row_count()),
                &expr,
                MaskFuture::new_true(layout.row_count().try_into().unwrap()),
            )
            .unwrap()
            .await
            .unwrap();

            assert_arrays_eq!(
                result,
                BoolArray::from_iter([true, false, true, false, true])
            );
        })
    }
}