Skip to main content

vortex_layout/layouts/row_idx/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4mod expr;
5
6use std::collections::BTreeSet;
7use std::fmt::Display;
8use std::fmt::Formatter;
9use std::ops::BitAnd;
10use std::ops::Range;
11use std::sync::Arc;
12use std::sync::OnceLock;
13
14use Nullability::NonNullable;
15pub use expr::*;
16use futures::FutureExt;
17use futures::future::BoxFuture;
18use vortex_array::ArrayRef;
19use vortex_array::IntoArray;
20use vortex_array::MaskFuture;
21use vortex_array::VortexSessionExecute;
22use vortex_array::dtype::DType;
23use vortex_array::dtype::FieldMask;
24use vortex_array::dtype::FieldName;
25use vortex_array::dtype::Nullability;
26use vortex_array::dtype::PType;
27use vortex_array::expr::ExactExpr;
28use vortex_array::expr::Expression;
29use vortex_array::expr::is_root;
30use vortex_array::expr::root;
31use vortex_array::expr::transform::PartitionedExpr;
32use vortex_array::expr::transform::partition;
33use vortex_array::expr::transform::replace;
34use vortex_array::scalar::PValue;
35use vortex_error::VortexExpect;
36use vortex_error::VortexResult;
37use vortex_mask::Mask;
38use vortex_sequence::Sequence;
39use vortex_sequence::SequenceArray;
40use vortex_session::VortexSession;
41use vortex_utils::aliases::dash_map::DashMap;
42
43use crate::ArrayFuture;
44use crate::LayoutReader;
45use crate::layouts::partitioned::PartitionedExprEval;
46
47pub struct RowIdxLayoutReader {
48    name: Arc<str>,
49    row_offset: u64,
50    child: Arc<dyn LayoutReader>,
51    partition_cache: DashMap<ExactExpr, Arc<OnceLock<Partitioning>>>,
52    session: VortexSession,
53}
54
55impl RowIdxLayoutReader {
56    pub fn new(row_offset: u64, child: Arc<dyn LayoutReader>, session: VortexSession) -> Self {
57        Self {
58            name: Arc::clone(child.name()),
59            row_offset,
60            child,
61            partition_cache: DashMap::with_hasher(Default::default()),
62            session,
63        }
64    }
65
66    fn partition_expr(&self, expr: &Expression) -> Partitioning {
67        let key = ExactExpr(expr.clone());
68
69        // Check cache first with read-only lock.
70        if let Some(entry) = self.partition_cache.get(&key)
71            && let Some(partitioning) = entry.value().get()
72        {
73            return partitioning.clone();
74        }
75
76        let cell = self
77            .partition_cache
78            .entry(key)
79            .or_insert_with(|| Arc::new(OnceLock::new()))
80            .clone();
81
82        cell.get_or_init(|| self.compute_partitioning(expr)).clone()
83    }
84
85    fn compute_partitioning(&self, expr: &Expression) -> Partitioning {
86        // Partition the expression into row idx and child expressions.
87        let mut partitioned = partition(expr.clone(), self.dtype(), |expr| {
88            if expr.is::<RowIdx>() {
89                vec![Partition::RowIdx]
90            } else if is_root(expr) {
91                vec![Partition::Child]
92            } else {
93                vec![]
94            }
95        })
96        .vortex_expect("We should not fail to partition expression over struct fields");
97
98        // If there's only a single partition, we can directly return the expression.
99        if partitioned.partitions.len() == 1 {
100            return match &partitioned.partition_annotations[0] {
101                Partition::RowIdx => {
102                    Partitioning::RowIdx(replace(expr.clone(), &row_idx(), root()))
103                }
104                Partition::Child => Partitioning::Child(expr.clone()),
105            };
106        }
107
108        // Replace the row_idx expression with the root expression in the row_idx partition.
109        partitioned.partitions = partitioned
110            .partitions
111            .into_iter()
112            .map(|p| replace(p, &row_idx(), root()))
113            .collect();
114
115        Partitioning::Partitioned(Arc::new(partitioned))
116    }
117}
118
119#[derive(Clone)]
120enum Partitioning {
121    // An expression that only references the row index (e.g., `row_idx == 5`).
122    RowIdx(Expression),
123    // An expression that does not reference the row index.
124    Child(Expression),
125    // Contains both the RowIdx and Child expressions, (e.g., `row_idx < child.some_field`).
126    Partitioned(Arc<PartitionedExpr<Partition>>),
127}
128
/// Tags a sub-expression of a partitioned expression with the side that evaluates it.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Partition {
    /// Evaluated against the synthesised row-index array.
    RowIdx,
    /// Evaluated by the child layout reader.
    Child,
}
134
135impl Partition {
136    pub fn name(&self) -> &str {
137        match self {
138            Partition::RowIdx => "row_idx",
139            Partition::Child => "child",
140        }
141    }
142}
143
144impl From<Partition> for FieldName {
145    fn from(value: Partition) -> Self {
146        FieldName::from(value.name())
147    }
148}
149
150impl Display for Partition {
151    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
152        write!(f, "{}", self.name())
153    }
154}
155
156impl LayoutReader for RowIdxLayoutReader {
157    fn name(&self) -> &Arc<str> {
158        &self.name
159    }
160
161    fn dtype(&self) -> &DType {
162        self.child.dtype()
163    }
164
165    fn row_count(&self) -> u64 {
166        self.child.row_count()
167    }
168
169    fn register_splits(
170        &self,
171        field_mask: &[FieldMask],
172        row_range: &Range<u64>,
173        splits: &mut BTreeSet<u64>,
174    ) -> VortexResult<()> {
175        self.child.register_splits(field_mask, row_range, splits)
176    }
177
178    fn pruning_evaluation(
179        &self,
180        row_range: &Range<u64>,
181        expr: &Expression,
182        mask: Mask,
183    ) -> VortexResult<MaskFuture> {
184        Ok(match &self.partition_expr(expr) {
185            Partitioning::RowIdx(expr) => row_idx_mask_future(
186                self.row_offset,
187                row_range,
188                expr,
189                MaskFuture::ready(mask),
190                self.session.clone(),
191            ),
192            Partitioning::Child(expr) => self.child.pruning_evaluation(row_range, expr, mask)?,
193            Partitioning::Partitioned(..) => MaskFuture::ready(mask),
194        })
195    }
196
197    fn filter_evaluation(
198        &self,
199        row_range: &Range<u64>,
200        expr: &Expression,
201        mask: MaskFuture,
202    ) -> VortexResult<MaskFuture> {
203        match &self.partition_expr(expr) {
204            // Since this is run during pruning, we skip re-evaluating the row index expression
205            // during the filter evaluation.
206            Partitioning::RowIdx(_) => Ok(mask),
207            Partitioning::Child(expr) => self.child.filter_evaluation(row_range, expr, mask),
208            Partitioning::Partitioned(p) => Arc::clone(p).into_mask_future(
209                mask,
210                |annotation, expr, mask| match annotation {
211                    Partition::RowIdx => Ok(row_idx_mask_future(
212                        self.row_offset,
213                        row_range,
214                        expr,
215                        mask,
216                        self.session.clone(),
217                    )),
218                    Partition::Child => self.child.filter_evaluation(row_range, expr, mask),
219                },
220                |annotation, expr, mask| match annotation {
221                    Partition::RowIdx => {
222                        Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
223                    }
224                    Partition::Child => self.child.projection_evaluation(row_range, expr, mask),
225                },
226                self.session.clone(),
227            ),
228        }
229    }
230
231    fn projection_evaluation(
232        &self,
233        row_range: &Range<u64>,
234        expr: &Expression,
235        mask: MaskFuture,
236    ) -> VortexResult<BoxFuture<'static, VortexResult<ArrayRef>>> {
237        match &self.partition_expr(expr) {
238            Partitioning::RowIdx(expr) => {
239                Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
240            }
241            Partitioning::Child(expr) => self.child.projection_evaluation(row_range, expr, mask),
242            Partitioning::Partitioned(p) => {
243                Arc::clone(p).into_array_future(mask, |annotation, expr, mask| match annotation {
244                    Partition::RowIdx => {
245                        Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
246                    }
247                    Partition::Child => self.child.projection_evaluation(row_range, expr, mask),
248                })
249            }
250        }
251    }
252}
253
254// Returns a SequenceArray representing the row indices for the given row range,
255fn idx_array(row_offset: u64, row_range: &Range<u64>) -> SequenceArray {
256    Sequence::try_new(
257        PValue::U64(row_offset + row_range.start),
258        PValue::U64(1),
259        PType::U64,
260        NonNullable,
261        usize::try_from(row_range.end - row_range.start)
262            .vortex_expect("Row range length must fit in usize"),
263    )
264    .vortex_expect("Failed to create row index array")
265}
266
267fn row_idx_mask_future(
268    row_offset: u64,
269    row_range: &Range<u64>,
270    expr: &Expression,
271    mask: MaskFuture,
272    session: VortexSession,
273) -> MaskFuture {
274    let row_range = row_range.clone();
275    let expr = expr.clone();
276    MaskFuture::new(mask.len(), async move {
277        let array = idx_array(row_offset, &row_range).into_array();
278
279        let mut ctx = session.create_execution_ctx();
280        let result_mask = array.apply(&expr)?.execute::<Mask>(&mut ctx)?;
281
282        Ok(result_mask.bitand(&mask.await?))
283    })
284}
285
286fn row_idx_array_future(
287    row_offset: u64,
288    row_range: &Range<u64>,
289    expr: &Expression,
290    mask: MaskFuture,
291) -> ArrayFuture {
292    let row_range = row_range.clone();
293    let expr = expr.clone();
294    async move {
295        let array = idx_array(row_offset, &row_range).into_array();
296        let array = array.filter(mask.await?)?.to_canonical()?.into_array();
297        array.apply(&expr)
298    }
299    .boxed()
300}
301
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_array::ArrayContext;
    use vortex_array::ArrayRef;
    use vortex_array::IntoArray as _;
    use vortex_array::MaskFuture;
    use vortex_array::arrays::BoolArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::expr::Expression;
    use vortex_array::expr::eq;
    use vortex_array::expr::gt;
    use vortex_array::expr::lit;
    use vortex_array::expr::or;
    use vortex_array::expr::root;
    use vortex_buffer::buffer;
    use vortex_io::runtime::single::block_on;
    use vortex_io::session::RuntimeSessionExt;
    use vortex_session::VortexSession;

    use crate::LayoutReader;
    use crate::LayoutStrategy;
    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::layouts::row_idx::RowIdxLayoutReader;
    use crate::layouts::row_idx::row_idx;
    use crate::segments::TestSegments;
    use crate::sequence::SequenceId;
    use crate::sequence::SequentialArrayStreamExt;
    use crate::test::SESSION;

    /// Writes the values `1..=5` as a flat layout and wraps its reader in a
    /// [`RowIdxLayoutReader`] with a row offset of zero. Then projects `expr` over all rows.
    async fn project_over_flat_layout(session: &VortexSession, expr: &Expression) -> ArrayRef {
        let ctx = ArrayContext::empty();
        let segments = Arc::new(TestSegments::default());
        let (ptr, eof) = SequenceId::root().split();
        let array = buffer![1..=5].into_array();
        let layout = FlatLayoutStrategy::default()
            .write_stream(
                ctx,
                Arc::<TestSegments>::clone(&segments),
                array.to_array_stream().sequenced(ptr),
                eof,
                session,
            )
            .await
            .unwrap();

        RowIdxLayoutReader::new(
            0,
            layout.new_reader("".into(), segments, &SESSION).unwrap(),
            SESSION.clone(),
        )
        .projection_evaluation(
            &(0..layout.row_count()),
            expr,
            MaskFuture::new_true(layout.row_count().try_into().unwrap()),
        )
        .unwrap()
        .await
        .unwrap()
    }

    #[test]
    fn flat_expr_no_row_id() {
        block_on(|handle| async {
            let session = SESSION.clone().with_handle(handle);
            let expr = eq(root(), lit(3i32));
            let result = project_over_flat_layout(&session, &expr).await;

            assert_arrays_eq!(
                result,
                BoolArray::from_iter([false, false, true, false, false])
            );
        })
    }

    #[test]
    fn flat_expr_row_id() {
        block_on(|handle| async {
            let session = SESSION.clone().with_handle(handle);
            let expr = gt(row_idx(), lit(3u64));
            let result = project_over_flat_layout(&session, &expr).await;

            assert_arrays_eq!(
                result,
                BoolArray::from_iter([false, false, false, false, true])
            );
        })
    }

    #[test]
    fn flat_expr_or() {
        block_on(|handle| async {
            let session = SESSION.clone().with_handle(handle);
            // Mixes child-only and row-index-only predicates, forcing a partitioned plan.
            let expr = or(
                eq(root(), lit(3i32)),
                or(gt(row_idx(), lit(3u64)), eq(root(), lit(1i32))),
            );
            let result = project_over_flat_layout(&session, &expr).await;

            assert_arrays_eq!(
                result,
                BoolArray::from_iter([true, false, true, false, true])
            );
        })
    }
}