Skip to main content

vortex_layout/layouts/row_idx/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4mod expr;
5
6use std::fmt::Display;
7use std::fmt::Formatter;
8use std::ops::BitAnd;
9use std::ops::Range;
10use std::sync::Arc;
11use std::sync::OnceLock;
12
13use Nullability::NonNullable;
14pub use expr::*;
15use futures::FutureExt;
16use futures::future::BoxFuture;
17use vortex_array::ArrayRef;
18use vortex_array::Canonical;
19use vortex_array::IntoArray;
20use vortex_array::MaskFuture;
21use vortex_array::VortexSessionExecute;
22use vortex_array::dtype::DType;
23use vortex_array::dtype::FieldMask;
24use vortex_array::dtype::FieldName;
25use vortex_array::dtype::Nullability;
26use vortex_array::dtype::PType;
27use vortex_array::expr::ExactExpr;
28use vortex_array::expr::Expression;
29use vortex_array::expr::is_root;
30use vortex_array::expr::root;
31use vortex_array::expr::transform::PartitionedExpr;
32use vortex_array::expr::transform::partition;
33use vortex_array::expr::transform::replace;
34use vortex_array::scalar::PValue;
35use vortex_error::VortexExpect;
36use vortex_error::VortexResult;
37use vortex_mask::Mask;
38use vortex_sequence::Sequence;
39use vortex_sequence::SequenceArray;
40use vortex_session::VortexSession;
41use vortex_utils::aliases::dash_map::DashMap;
42
43use crate::ArrayFuture;
44use crate::LayoutReader;
45use crate::RowSplits;
46use crate::SplitRange;
47use crate::layouts::partitioned::PartitionedExprEval;
48
49pub struct RowIdxLayoutReader {
50    name: Arc<str>,
51    row_offset: u64,
52    child: Arc<dyn LayoutReader>,
53    partition_cache: DashMap<ExactExpr, Arc<OnceLock<Partitioning>>>,
54    session: VortexSession,
55}
56
57impl RowIdxLayoutReader {
58    pub fn new(row_offset: u64, child: Arc<dyn LayoutReader>, session: VortexSession) -> Self {
59        Self {
60            name: Arc::clone(child.name()),
61            row_offset,
62            child,
63            partition_cache: DashMap::with_hasher(Default::default()),
64            session,
65        }
66    }
67
68    fn partition_expr(&self, expr: &Expression) -> VortexResult<Partitioning> {
69        let key = ExactExpr(expr.clone());
70
71        // Check cache first with read-only lock.
72        if let Some(entry) = self.partition_cache.get(&key)
73            && let Some(partitioning) = entry.value().get()
74        {
75            return Ok(partitioning.clone());
76        }
77
78        let result = self.compute_partitioning(expr)?;
79
80        self.partition_cache
81            .entry(key)
82            .or_insert_with(|| Arc::new(OnceLock::new()))
83            .get_or_init(|| result.clone());
84
85        Ok(result)
86    }
87
88    fn compute_partitioning(&self, expr: &Expression) -> VortexResult<Partitioning> {
89        // Partition the expression into row idx and child expressions.
90        let mut partitioned = partition(expr.clone(), self.dtype(), |expr| {
91            if expr.is::<RowIdx>() {
92                vec![Partition::RowIdx]
93            } else if is_root(expr) {
94                vec![Partition::Child]
95            } else {
96                vec![]
97            }
98        })?;
99
100        // If there's only a single partition, we can directly return the expression.
101        if partitioned.partitions.len() == 1 {
102            return Ok(match &partitioned.partition_annotations[0] {
103                Partition::RowIdx => {
104                    Partitioning::RowIdx(replace(expr.clone(), &row_idx(), root()))
105                }
106                Partition::Child => Partitioning::Child(expr.clone()),
107            });
108        }
109
110        // Replace the row_idx expression with the root expression in the row_idx partition.
111        partitioned.partitions = partitioned
112            .partitions
113            .into_iter()
114            .map(|p| replace(p, &row_idx(), root()))
115            .collect();
116
117        Ok(Partitioning::Partitioned(Arc::new(partitioned)))
118    }
119}
120
121#[derive(Clone)]
122enum Partitioning {
123    // An expression that only references the row index (e.g., `row_idx == 5`).
124    RowIdx(Expression),
125    // An expression that does not reference the row index.
126    Child(Expression),
127    // Contains both the RowIdx and Child expressions, (e.g., `row_idx < child.some_field`).
128    Partitioned(Arc<PartitionedExpr<Partition>>),
129}
130
/// Annotation identifying which side of a partitioned expression a sub-expression belongs to.
#[derive(Clone, PartialEq, Eq, Hash)]
enum Partition {
    /// The sub-expression references the row index.
    RowIdx,
    /// The sub-expression references the child reader's data.
    Child,
}
136
137impl Partition {
138    pub fn name(&self) -> &str {
139        match self {
140            Partition::RowIdx => "row_idx",
141            Partition::Child => "child",
142        }
143    }
144}
145
146impl From<Partition> for FieldName {
147    fn from(value: Partition) -> Self {
148        FieldName::from(value.name())
149    }
150}
151
152impl Display for Partition {
153    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
154        write!(f, "{}", self.name())
155    }
156}
157
158impl LayoutReader for RowIdxLayoutReader {
159    fn name(&self) -> &Arc<str> {
160        &self.name
161    }
162
163    fn dtype(&self) -> &DType {
164        self.child.dtype()
165    }
166
167    fn row_count(&self) -> u64 {
168        self.child.row_count()
169    }
170
171    fn register_splits(
172        &self,
173        field_mask: &[FieldMask],
174        split_range: &SplitRange,
175        splits: &mut RowSplits,
176    ) -> VortexResult<()> {
177        self.child.register_splits(field_mask, split_range, splits)
178    }
179
180    fn pruning_evaluation(
181        &self,
182        row_range: &Range<u64>,
183        expr: &Expression,
184        mask: Mask,
185    ) -> VortexResult<MaskFuture> {
186        Ok(match &self.partition_expr(expr)? {
187            Partitioning::RowIdx(expr) => row_idx_mask_future(
188                self.row_offset,
189                row_range,
190                expr,
191                MaskFuture::ready(mask),
192                self.session.clone(),
193            ),
194            Partitioning::Child(expr) => self.child.pruning_evaluation(row_range, expr, mask)?,
195            Partitioning::Partitioned(..) => MaskFuture::ready(mask),
196        })
197    }
198
199    fn filter_evaluation(
200        &self,
201        row_range: &Range<u64>,
202        expr: &Expression,
203        mask: MaskFuture,
204    ) -> VortexResult<MaskFuture> {
205        match &self.partition_expr(expr)? {
206            // Since this is run during pruning, we skip re-evaluating the row index expression
207            // during the filter evaluation.
208            Partitioning::RowIdx(_) => Ok(mask),
209            Partitioning::Child(expr) => self.child.filter_evaluation(row_range, expr, mask),
210            Partitioning::Partitioned(p) => Arc::clone(p).into_mask_future(
211                mask,
212                |annotation, expr, mask| match annotation {
213                    Partition::RowIdx => Ok(row_idx_mask_future(
214                        self.row_offset,
215                        row_range,
216                        expr,
217                        mask,
218                        self.session.clone(),
219                    )),
220                    Partition::Child => self.child.filter_evaluation(row_range, expr, mask),
221                },
222                |annotation, expr, mask| match annotation {
223                    Partition::RowIdx => Ok(row_idx_array_future(
224                        self.row_offset,
225                        row_range,
226                        expr,
227                        mask,
228                        self.session.clone(),
229                    )),
230                    Partition::Child => self.child.projection_evaluation(row_range, expr, mask),
231                },
232                self.session.clone(),
233            ),
234        }
235    }
236
237    fn projection_evaluation(
238        &self,
239        row_range: &Range<u64>,
240        expr: &Expression,
241        mask: MaskFuture,
242    ) -> VortexResult<BoxFuture<'static, VortexResult<ArrayRef>>> {
243        match &self.partition_expr(expr)? {
244            Partitioning::RowIdx(expr) => Ok(row_idx_array_future(
245                self.row_offset,
246                row_range,
247                expr,
248                mask,
249                self.session.clone(),
250            )),
251            Partitioning::Child(expr) => self.child.projection_evaluation(row_range, expr, mask),
252            Partitioning::Partitioned(p) => {
253                Arc::clone(p).into_array_future(mask, |annotation, expr, mask| match annotation {
254                    Partition::RowIdx => Ok(row_idx_array_future(
255                        self.row_offset,
256                        row_range,
257                        expr,
258                        mask,
259                        self.session.clone(),
260                    )),
261                    Partition::Child => self.child.projection_evaluation(row_range, expr, mask),
262                })
263            }
264        }
265    }
266
267    fn as_any(&self) -> &dyn std::any::Any {
268        self
269    }
270}
271
272// Returns a SequenceArray representing the row indices for the given row range,
273fn idx_array(row_offset: u64, row_range: &Range<u64>) -> SequenceArray {
274    Sequence::try_new(
275        PValue::U64(row_offset + row_range.start),
276        PValue::U64(1),
277        PType::U64,
278        NonNullable,
279        usize::try_from(row_range.end - row_range.start)
280            .vortex_expect("Row range length must fit in usize"),
281    )
282    .vortex_expect("Failed to create row index array")
283}
284
285fn row_idx_mask_future(
286    row_offset: u64,
287    row_range: &Range<u64>,
288    expr: &Expression,
289    mask: MaskFuture,
290    session: VortexSession,
291) -> MaskFuture {
292    let row_range = row_range.clone();
293    let expr = expr.clone();
294    MaskFuture::new(mask.len(), async move {
295        let array = idx_array(row_offset, &row_range).into_array();
296
297        let mut ctx = session.create_execution_ctx();
298        let result_mask = array.apply(&expr)?.execute::<Mask>(&mut ctx)?;
299
300        Ok(result_mask.bitand(&mask.await?))
301    })
302}
303
304fn row_idx_array_future(
305    row_offset: u64,
306    row_range: &Range<u64>,
307    expr: &Expression,
308    mask: MaskFuture,
309    session: VortexSession,
310) -> ArrayFuture {
311    let row_range = row_range.clone();
312    let expr = expr.clone();
313    async move {
314        let array = idx_array(row_offset, &row_range).into_array();
315        let filtered = array.filter(mask.await?)?;
316        let mut ctx = session.create_execution_ctx();
317        let array = filtered.execute::<Canonical>(&mut ctx)?.into_array();
318        array.apply(&expr)
319    }
320    .boxed()
321}
322
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_array::ArrayContext;
    use vortex_array::IntoArray as _;
    use vortex_array::MaskFuture;
    use vortex_array::arrays::BoolArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::expr::eq;
    use vortex_array::expr::gt;
    use vortex_array::expr::lit;
    use vortex_array::expr::or;
    use vortex_array::expr::root;
    use vortex_buffer::buffer;
    use vortex_io::runtime::single::block_on;
    use vortex_io::session::RuntimeSessionExt;

    use crate::LayoutReader;
    use crate::LayoutStrategy;
    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::layouts::row_idx::RowIdxLayoutReader;
    use crate::layouts::row_idx::row_idx;
    use crate::segments::TestSegments;
    use crate::sequence::SequenceId;
    use crate::sequence::SequentialArrayStreamExt;
    use crate::test::SESSION;

    /// A predicate over the child values only is answered by the child reader.
    #[test]
    fn flat_expr_no_row_id() {
        block_on(|handle| async {
            let session = SESSION.clone().with_handle(handle);
            let ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());
            let (seq_ptr, seq_eof) = SequenceId::root().split();
            let values = buffer![1..=5].into_array();
            let layout = FlatLayoutStrategy::default()
                .write_stream(
                    ctx,
                    Arc::<TestSegments>::clone(&segments),
                    values.to_array_stream().sequenced(seq_ptr),
                    seq_eof,
                    &session,
                )
                .await
                .unwrap();

            let row_count = layout.row_count();
            let predicate = eq(root(), lit(3i32));
            let reader = RowIdxLayoutReader::new(
                0,
                layout
                    .new_reader("".into(), segments, &SESSION, &Default::default())
                    .unwrap(),
                SESSION.clone(),
            );

            let actual = reader
                .projection_evaluation(
                    &(0..row_count),
                    &predicate,
                    MaskFuture::new_true(row_count.try_into().unwrap()),
                )
                .unwrap()
                .await
                .unwrap();

            assert_arrays_eq!(
                actual,
                BoolArray::from_iter([false, false, true, false, false])
            );
        })
    }

    /// A predicate over the row index only is answered from the generated indices.
    #[test]
    fn flat_expr_row_id() {
        block_on(|handle| async {
            let session = SESSION.clone().with_handle(handle);
            let ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());
            let (seq_ptr, seq_eof) = SequenceId::root().split();
            let values = buffer![1..=5].into_array();
            let layout = FlatLayoutStrategy::default()
                .write_stream(
                    ctx,
                    Arc::<TestSegments>::clone(&segments),
                    values.to_array_stream().sequenced(seq_ptr),
                    seq_eof,
                    &session,
                )
                .await
                .unwrap();

            let row_count = layout.row_count();
            let predicate = gt(row_idx(), lit(3u64));
            let reader = RowIdxLayoutReader::new(
                0,
                layout
                    .new_reader("".into(), segments, &SESSION, &Default::default())
                    .unwrap(),
                SESSION.clone(),
            );

            let actual = reader
                .projection_evaluation(
                    &(0..row_count),
                    &predicate,
                    MaskFuture::new_true(row_count.try_into().unwrap()),
                )
                .unwrap()
                .await
                .unwrap();

            assert_arrays_eq!(
                actual,
                BoolArray::from_iter([false, false, false, false, true])
            );
        })
    }

    /// A predicate mixing child values and the row index combines both partitions.
    #[test]
    fn flat_expr_or() {
        block_on(|handle| async {
            let session = SESSION.clone().with_handle(handle);
            let ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());
            let (seq_ptr, seq_eof) = SequenceId::root().split();
            let values = buffer![1..=5].into_array();
            let layout = FlatLayoutStrategy::default()
                .write_stream(
                    ctx,
                    Arc::<TestSegments>::clone(&segments),
                    values.to_array_stream().sequenced(seq_ptr),
                    seq_eof,
                    &session,
                )
                .await
                .unwrap();

            let row_count = layout.row_count();
            let predicate = or(
                eq(root(), lit(3i32)),
                or(gt(row_idx(), lit(3u64)), eq(root(), lit(1i32))),
            );
            let reader = RowIdxLayoutReader::new(
                0,
                layout
                    .new_reader("".into(), segments, &SESSION, &Default::default())
                    .unwrap(),
                SESSION.clone(),
            );

            let actual = reader
                .projection_evaluation(
                    &(0..row_count),
                    &predicate,
                    MaskFuture::new_true(row_count.try_into().unwrap()),
                )
                .unwrap()
                .await
                .unwrap();

            assert_arrays_eq!(
                actual,
                BoolArray::from_iter([true, false, true, false, true])
            );
        })
    }
}