vortex_layout/reader.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::collections::BTreeSet;
5use std::ops::Range;
6use std::sync::Arc;
7
8use futures::future::BoxFuture;
9use once_cell::sync::OnceCell;
10use vortex_array::stats::Precision;
11use vortex_array::{ArrayRef, MaskFuture};
12use vortex_dtype::{DType, FieldMask};
13use vortex_error::{VortexResult, vortex_bail};
14use vortex_expr::ExprRef;
15use vortex_mask::Mask;
16
17use crate::children::LayoutChildren;
18use crate::segments::SegmentSource;
19
20pub type LayoutReaderRef = Arc<dyn LayoutReader>;
21
22/// A [`LayoutReader`] is used to read a [`crate::Layout`] in a way that can cache state across multiple
23/// evaluation operations.
24pub trait LayoutReader: 'static + Send + Sync {
25 /// Returns the name of the layout reader for debugging.
26 fn name(&self) -> &Arc<str>;
27
28 /// Returns the un-projected dtype of the layout reader.
29 fn dtype(&self) -> &DType;
30
31 /// Returns the number of rows in the layout reader.
32 /// An inexact count may be larger or smaller than the actual row count.
33 fn row_count(&self) -> Precision<u64>;
34
35 /// Register the splits of this layout reader.
36 // TODO(ngates): this is a temporary API until we make layout readers stream based.
37 fn register_splits(
38 &self,
39 field_mask: &[FieldMask],
40 row_offset: u64,
41 splits: &mut BTreeSet<u64>,
42 ) -> VortexResult<()>;
43
44 /// Returns a mask where all false values are proven to be false in the given expression.
45 ///
46 /// The returned mask **does not** need to have been intersected with the input mask.
47 fn pruning_evaluation(
48 &self,
49 row_range: &Range<u64>,
50 expr: &ExprRef,
51 mask: Mask,
52 ) -> VortexResult<MaskFuture>;
53
54 /// Refines the given mask, returning a mask equal in length to the input mask.
55 ///
56 /// It is recommended to defer awaiting the input mask for as long as possible (ideally, after
57 /// all I/O is complete). This allows other conjuncts the opportunity to refine the mask as much
58 /// as possible before it is used.
59 ///
60 /// ## Post-conditions
61 ///
62 /// The returned mask **MUST** have been intersected with the input mask.
63 fn filter_evaluation(
64 &self,
65 row_range: &Range<u64>,
66 expr: &ExprRef,
67 mask: MaskFuture,
68 ) -> VortexResult<MaskFuture>;
69
70 /// Evaluates an expression against an array.
71 ///
72 /// It is recommended to defer awaiting the input mask for as long as possible (ideally, after
73 /// all I/O is complete). This allows other conjuncts the opportunity to refine the mask as much
74 /// as possible before it is used.
75 ///
76 /// ## Post-conditions
77 ///
78 /// The returned array **MUST** have length equal to the true count of the input mask.
79 fn projection_evaluation(
80 &self,
81 row_range: &Range<u64>,
82 expr: &ExprRef,
83 mask: MaskFuture,
84 ) -> VortexResult<ArrayFuture>;
85}
86
87pub type ArrayFuture = BoxFuture<'static, VortexResult<ArrayRef>>;
88
89pub struct LazyReaderChildren {
90 children: Arc<dyn LayoutChildren>,
91 segment_source: Arc<dyn SegmentSource>,
92
93 // TODO(ngates): we may want a hash map of some sort here?
94 cache: Vec<OnceCell<LayoutReaderRef>>,
95}
96
97impl LazyReaderChildren {
98 pub fn new(children: Arc<dyn LayoutChildren>, segment_source: Arc<dyn SegmentSource>) -> Self {
99 let nchildren = children.nchildren();
100 let cache = (0..nchildren).map(|_| OnceCell::new()).collect();
101 Self {
102 children,
103 segment_source,
104 cache,
105 }
106 }
107
108 pub fn get(
109 &self,
110 idx: usize,
111 dtype: &DType,
112 name: &Arc<str>,
113 ) -> VortexResult<&LayoutReaderRef> {
114 if idx >= self.cache.len() {
115 vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
116 }
117
118 self.cache[idx].get_or_try_init(|| {
119 let child = self.children.child(idx, dtype)?;
120 child.new_reader(name.clone(), self.segment_source.clone())
121 })
122 }
123}