Skip to main content

vortex_layout/layouts/row_idx/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4mod expr;
5
6use std::collections::BTreeSet;
7use std::fmt::Display;
8use std::fmt::Formatter;
9use std::ops::BitAnd;
10use std::ops::Range;
11use std::sync::Arc;
12use std::sync::OnceLock;
13
14use Nullability::NonNullable;
15pub use expr::*;
16use futures::FutureExt;
17use futures::future::BoxFuture;
18use vortex_array::ArrayRef;
19use vortex_array::DynArray;
20use vortex_array::IntoArray;
21use vortex_array::MaskFuture;
22use vortex_array::VortexSessionExecute;
23use vortex_array::dtype::DType;
24use vortex_array::dtype::FieldMask;
25use vortex_array::dtype::FieldName;
26use vortex_array::dtype::Nullability;
27use vortex_array::dtype::PType;
28use vortex_array::expr::ExactExpr;
29use vortex_array::expr::Expression;
30use vortex_array::expr::is_root;
31use vortex_array::expr::root;
32use vortex_array::expr::transform::PartitionedExpr;
33use vortex_array::expr::transform::partition;
34use vortex_array::expr::transform::replace;
35use vortex_array::scalar::PValue;
36use vortex_error::VortexExpect;
37use vortex_error::VortexResult;
38use vortex_mask::Mask;
39use vortex_sequence::SequenceArray;
40use vortex_session::VortexSession;
41use vortex_utils::aliases::dash_map::DashMap;
42
43use crate::ArrayFuture;
44use crate::LayoutReader;
45use crate::layouts::partitioned::PartitionedExprEval;
46
47pub struct RowIdxLayoutReader {
48    name: Arc<str>,
49    row_offset: u64,
50    child: Arc<dyn LayoutReader>,
51    partition_cache: DashMap<ExactExpr, Arc<OnceLock<Partitioning>>>,
52    session: VortexSession,
53}
54
55impl RowIdxLayoutReader {
56    pub fn new(row_offset: u64, child: Arc<dyn LayoutReader>, session: VortexSession) -> Self {
57        Self {
58            name: child.name().clone(),
59            row_offset,
60            child,
61            partition_cache: DashMap::with_hasher(Default::default()),
62            session,
63        }
64    }
65
66    fn partition_expr(&self, expr: &Expression) -> Partitioning {
67        let key = ExactExpr(expr.clone());
68
69        // Check cache first with read-only lock.
70        if let Some(entry) = self.partition_cache.get(&key)
71            && let Some(partitioning) = entry.value().get()
72        {
73            return partitioning.clone();
74        }
75
76        let cell = self
77            .partition_cache
78            .entry(key)
79            .or_insert_with(|| Arc::new(OnceLock::new()))
80            .clone();
81
82        cell.get_or_init(|| self.compute_partitioning(expr)).clone()
83    }
84
85    fn compute_partitioning(&self, expr: &Expression) -> Partitioning {
86        // Partition the expression into row idx and child expressions.
87        let mut partitioned = partition(expr.clone(), self.dtype(), |expr| {
88            if expr.is::<RowIdx>() {
89                vec![Partition::RowIdx]
90            } else if is_root(expr) {
91                vec![Partition::Child]
92            } else {
93                vec![]
94            }
95        })
96        .vortex_expect("We should not fail to partition expression over struct fields");
97
98        // If there's only a single partition, we can directly return the expression.
99        if partitioned.partitions.len() == 1 {
100            return match &partitioned.partition_annotations[0] {
101                Partition::RowIdx => {
102                    Partitioning::RowIdx(replace(expr.clone(), &row_idx(), root()))
103                }
104                Partition::Child => Partitioning::Child(expr.clone()),
105            };
106        }
107
108        // Replace the row_idx expression with the root expression in the row_idx partition.
109        partitioned.partitions = partitioned
110            .partitions
111            .into_iter()
112            .map(|p| replace(p, &row_idx(), root()))
113            .collect();
114
115        Partitioning::Partitioned(Arc::new(partitioned))
116    }
117}
118
119#[derive(Clone)]
120enum Partitioning {
121    // An expression that only references the row index (e.g., `row_idx == 5`).
122    RowIdx(Expression),
123    // An expression that does not reference the row index.
124    Child(Expression),
125    // Contains both the RowIdx and Child expressions, (e.g., `row_idx < child.some_field`).
126    Partitioned(Arc<PartitionedExpr<Partition>>),
127}
128
/// Annotation identifying which side of the reader a sub-expression belongs to.
#[derive(Clone, PartialEq, Eq, Hash)]
enum Partition {
    /// The sub-expression is evaluated against the row index.
    RowIdx,
    /// The sub-expression is evaluated by the child reader.
    Child,
}

impl Partition {
    /// Returns the fixed textual name of this partition: `"row_idx"` or `"child"`.
    pub fn name(&self) -> &str {
        match *self {
            Self::Child => "child",
            Self::RowIdx => "row_idx",
        }
    }
}
143
144impl From<Partition> for FieldName {
145    fn from(value: Partition) -> Self {
146        FieldName::from(value.name())
147    }
148}
149
150impl Display for Partition {
151    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
152        write!(f, "{}", self.name())
153    }
154}
155
156impl LayoutReader for RowIdxLayoutReader {
157    fn name(&self) -> &Arc<str> {
158        &self.name
159    }
160
161    fn dtype(&self) -> &DType {
162        self.child.dtype()
163    }
164
165    fn row_count(&self) -> u64 {
166        self.child.row_count()
167    }
168
169    fn register_splits(
170        &self,
171        field_mask: &[FieldMask],
172        row_range: &Range<u64>,
173        splits: &mut BTreeSet<u64>,
174    ) -> VortexResult<()> {
175        self.child.register_splits(field_mask, row_range, splits)
176    }
177
178    fn pruning_evaluation(
179        &self,
180        row_range: &Range<u64>,
181        expr: &Expression,
182        mask: Mask,
183    ) -> VortexResult<MaskFuture> {
184        Ok(match &self.partition_expr(expr) {
185            Partitioning::RowIdx(expr) => row_idx_mask_future(
186                self.row_offset,
187                row_range,
188                expr,
189                MaskFuture::ready(mask),
190                self.session.clone(),
191            ),
192            Partitioning::Child(expr) => self.child.pruning_evaluation(row_range, expr, mask)?,
193            Partitioning::Partitioned(..) => MaskFuture::ready(mask),
194        })
195    }
196
197    fn filter_evaluation(
198        &self,
199        row_range: &Range<u64>,
200        expr: &Expression,
201        mask: MaskFuture,
202    ) -> VortexResult<MaskFuture> {
203        match &self.partition_expr(expr) {
204            // Since this is run during pruning, we skip re-evaluating the row index expression
205            // during the filter evaluation.
206            Partitioning::RowIdx(_) => Ok(mask),
207            Partitioning::Child(expr) => self.child.filter_evaluation(row_range, expr, mask),
208            Partitioning::Partitioned(p) => p.clone().into_mask_future(
209                mask,
210                |annotation, expr, mask| match annotation {
211                    Partition::RowIdx => Ok(row_idx_mask_future(
212                        self.row_offset,
213                        row_range,
214                        expr,
215                        mask,
216                        self.session.clone(),
217                    )),
218                    Partition::Child => self.child.filter_evaluation(row_range, expr, mask),
219                },
220                |annotation, expr, mask| match annotation {
221                    Partition::RowIdx => {
222                        Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
223                    }
224                    Partition::Child => self.child.projection_evaluation(row_range, expr, mask),
225                },
226                self.session.clone(),
227            ),
228        }
229    }
230
231    fn projection_evaluation(
232        &self,
233        row_range: &Range<u64>,
234        expr: &Expression,
235        mask: MaskFuture,
236    ) -> VortexResult<BoxFuture<'static, VortexResult<ArrayRef>>> {
237        match &self.partition_expr(expr) {
238            Partitioning::RowIdx(expr) => {
239                Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
240            }
241            Partitioning::Child(expr) => self.child.projection_evaluation(row_range, expr, mask),
242            Partitioning::Partitioned(p) => {
243                p.clone()
244                    .into_array_future(mask, |annotation, expr, mask| match annotation {
245                        Partition::RowIdx => {
246                            Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
247                        }
248                        Partition::Child => self.child.projection_evaluation(row_range, expr, mask),
249                    })
250            }
251        }
252    }
253}
254
255// Returns a SequenceArray representing the row indices for the given row range,
256fn idx_array(row_offset: u64, row_range: &Range<u64>) -> SequenceArray {
257    SequenceArray::try_new(
258        PValue::U64(row_offset + row_range.start),
259        PValue::U64(1),
260        PType::U64,
261        NonNullable,
262        usize::try_from(row_range.end - row_range.start)
263            .vortex_expect("Row range length must fit in usize"),
264    )
265    .vortex_expect("Failed to create row index array")
266}
267
268fn row_idx_mask_future(
269    row_offset: u64,
270    row_range: &Range<u64>,
271    expr: &Expression,
272    mask: MaskFuture,
273    session: VortexSession,
274) -> MaskFuture {
275    let row_range = row_range.clone();
276    let expr = expr.clone();
277    MaskFuture::new(mask.len(), async move {
278        let array = idx_array(row_offset, &row_range).into_array();
279
280        let mut ctx = session.create_execution_ctx();
281        let result_mask = array.apply(&expr)?.execute::<Mask>(&mut ctx)?;
282
283        Ok(result_mask.bitand(&mask.await?))
284    })
285}
286
287fn row_idx_array_future(
288    row_offset: u64,
289    row_range: &Range<u64>,
290    expr: &Expression,
291    mask: MaskFuture,
292) -> ArrayFuture {
293    let row_range = row_range.clone();
294    let expr = expr.clone();
295    async move {
296        let array = idx_array(row_offset, &row_range).into_array();
297        let array = array.filter(mask.await?)?.to_canonical()?.into_array();
298        array.apply(&expr)
299    }
300    .boxed()
301}
302
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_array::ArrayContext;
    use vortex_array::IntoArray as _;
    use vortex_array::MaskFuture;
    use vortex_array::arrays::BoolArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::expr::eq;
    use vortex_array::expr::gt;
    use vortex_array::expr::lit;
    use vortex_array::expr::or;
    use vortex_array::expr::root;
    use vortex_buffer::buffer;
    use vortex_io::runtime::single::block_on;

    use crate::LayoutReader;
    use crate::LayoutStrategy;
    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::layouts::row_idx::RowIdxLayoutReader;
    use crate::layouts::row_idx::row_idx;
    use crate::segments::TestSegments;
    use crate::sequence::SequenceId;
    use crate::sequence::SequentialArrayStreamExt;
    use crate::test::SESSION;

    /// An expression over the stored values only is answered by the child reader.
    #[test]
    fn flat_expr_no_row_id() {
        block_on(|handle| async {
            // Write the values 1..=5 as a single flat layout.
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();
            let values = buffer![1..=5].into_array();
            let stream = values.to_array_stream().sequenced(ptr);
            let layout = FlatLayoutStrategy::default()
                .write_stream(ArrayContext::empty(), segments.clone(), stream, eof, handle)
                .await
                .unwrap();

            // `value == 3` over every row.
            let predicate = eq(root(), lit(3i32));
            let all_rows = 0..layout.row_count();
            let reader = RowIdxLayoutReader::new(
                0,
                layout.new_reader("".into(), segments, &SESSION).unwrap(),
                SESSION.clone(),
            );
            let result = reader
                .projection_evaluation(
                    &all_rows,
                    &predicate,
                    MaskFuture::new_true(layout.row_count().try_into().unwrap()),
                )
                .unwrap()
                .await
                .unwrap();

            // Only the third value equals 3.
            assert_arrays_eq!(
                result,
                BoolArray::from_iter([false, false, true, false, false])
            );
        })
    }

    /// An expression over the row index only is answered without consulting the values.
    #[test]
    fn flat_expr_row_id() {
        block_on(|handle| async {
            // Write the values 1..=5 as a single flat layout.
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();
            let values = buffer![1..=5].into_array();
            let stream = values.to_array_stream().sequenced(ptr);
            let layout = FlatLayoutStrategy::default()
                .write_stream(ArrayContext::empty(), segments.clone(), stream, eof, handle)
                .await
                .unwrap();

            // `row_idx > 3` over every row.
            let predicate = gt(row_idx(), lit(3u64));
            let all_rows = 0..layout.row_count();
            let reader = RowIdxLayoutReader::new(
                0,
                layout.new_reader("".into(), segments, &SESSION).unwrap(),
                SESSION.clone(),
            );
            let result = reader
                .projection_evaluation(
                    &all_rows,
                    &predicate,
                    MaskFuture::new_true(layout.row_count().try_into().unwrap()),
                )
                .unwrap()
                .await
                .unwrap();

            // Row indices are 0..=4, so only the last row has an index above 3.
            assert_arrays_eq!(
                result,
                BoolArray::from_iter([false, false, false, false, true])
            );
        })
    }

    /// An expression mixing stored values and the row index combines both sides.
    #[test]
    fn flat_expr_or() {
        block_on(|handle| async {
            // Write the values 1..=5 as a single flat layout.
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();
            let values = buffer![1..=5].into_array();
            let stream = values.to_array_stream().sequenced(ptr);
            let layout = FlatLayoutStrategy::default()
                .write_stream(ArrayContext::empty(), segments.clone(), stream, eof, handle)
                .await
                .unwrap();

            // `value == 3 OR (row_idx > 3 OR value == 1)` over every row.
            let predicate = or(
                eq(root(), lit(3i32)),
                or(gt(row_idx(), lit(3u64)), eq(root(), lit(1i32))),
            );
            let all_rows = 0..layout.row_count();
            let reader = RowIdxLayoutReader::new(
                0,
                layout.new_reader("".into(), segments, &SESSION).unwrap(),
                SESSION.clone(),
            );
            let result = reader
                .projection_evaluation(
                    &all_rows,
                    &predicate,
                    MaskFuture::new_true(layout.row_count().try_into().unwrap()),
                )
                .unwrap()
                .await
                .unwrap();

            // Value 1 (row 0), value 3 (row 2) and row index 4 match.
            assert_arrays_eq!(
                result,
                BoolArray::from_iter([true, false, true, false, true])
            );
        })
    }
}
453}