1use std::collections::BTreeSet;
5use std::ops::Range;
6use std::sync::Arc;
7
8use futures::future::BoxFuture;
9use futures::try_join;
10use once_cell::sync::OnceCell;
11use vortex_array::expr::Expression;
12use vortex_array::{ArrayRef, MaskFuture};
13use vortex_dtype::{DType, FieldMask};
14use vortex_error::{VortexResult, vortex_bail};
15use vortex_mask::Mask;
16
17use crate::children::LayoutChildren;
18use crate::segments::SegmentSource;
19
/// Shared, reference-counted handle to a [`LayoutReader`] trait object.
pub type LayoutReaderRef = Arc<dyn LayoutReader>;
21
/// Reads data for a layout, producing row masks and arrays over row ranges.
///
/// Readers are created per layout child (see `new_reader` in [`LazyReaderChildren::get`]) and
/// are usually held as a [`LayoutReaderRef`]. The trait requires `'static + Send + Sync`, so a
/// reader can be shared across threads behind that `Arc`.
pub trait LayoutReader: 'static + Send + Sync {
    /// Returns the name assigned to this reader.
    fn name(&self) -> &Arc<str>;

    /// Returns the [`DType`] of the data this reader produces.
    fn dtype(&self) -> &DType;

    /// Returns the number of rows covered by this reader.
    fn row_count(&self) -> u64;

    /// Inserts row offsets into `splits` for the fields selected by `field_mask` within
    /// `row_range`.
    ///
    /// `splits` is an ordered set shared across callers, so offsets registered more than once
    /// collapse into a single entry.
    /// NOTE(review): presumably these offsets are natural boundaries (e.g. chunk edges) at which
    /// a scan may be partitioned. Confirm against implementations.
    fn register_splits(
        &self,
        field_mask: &[FieldMask],
        row_range: &Range<u64>,
        splits: &mut BTreeSet<u64>,
    ) -> VortexResult<()>;

    /// Evaluates `expr` over `row_range`, starting from an already-resolved input `mask`, and
    /// returns a future for the resulting mask.
    ///
    /// The outer `VortexResult` reports failures that occur while setting up the evaluation.
    /// NOTE(review): by its name this is a pruning pass that may only rule rows out, rather
    /// than an exact filter. Confirm against implementations.
    fn pruning_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: Mask,
    ) -> VortexResult<MaskFuture>;

    /// Evaluates the predicate `expr` over `row_range`, given a not-yet-resolved input `mask`,
    /// and returns a future for the resulting mask.
    ///
    /// The outer `VortexResult` reports failures that occur while setting up the evaluation.
    /// NOTE(review): presumably the result is an exact filter result, as opposed to
    /// [`Self::pruning_evaluation`]. Confirm.
    fn filter_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: MaskFuture,
    ) -> VortexResult<MaskFuture>;

    /// Evaluates the projection `expr` over `row_range` and returns an [`ArrayFuture`] that
    /// resolves to the projected array.
    ///
    /// `mask` is a future mask for the same range. NOTE(review): presumably it selects the rows
    /// to materialize. Confirm against implementations. The outer `VortexResult` reports setup
    /// failures; evaluation errors surface through the returned future.
    fn projection_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: MaskFuture,
    ) -> VortexResult<ArrayFuture>;
}
85
/// A boxed, `Send`, `'static` future that resolves to an [`ArrayRef`] or a Vortex error.
pub type ArrayFuture = BoxFuture<'static, VortexResult<ArrayRef>>;
87
/// Extension methods for array-producing futures such as [`ArrayFuture`].
pub trait ArrayFutureExt {
    /// Returns a future that combines `self` with `mask`.
    ///
    /// For [`ArrayFuture`], both futures are awaited together, and the resolved array is then
    /// passed to `vortex_array::compute::mask` along with the resolved mask.
    fn masked(self, mask: MaskFuture) -> Self;
}
91
92impl ArrayFutureExt for ArrayFuture {
93 fn masked(self, mask: MaskFuture) -> Self {
95 Box::pin(async move {
96 let (array, mask) = try_join!(self, mask)?;
97 vortex_array::compute::mask(array.as_ref(), &mask)
98 })
99 }
100}
101
/// Lazily constructs and caches one [`LayoutReaderRef`] per child of a layout.
pub struct LazyReaderChildren {
    /// Source of the child layouts; `child(idx, dtype)` is called on first access to `idx`.
    children: Arc<dyn LayoutChildren>,
    /// Dtype passed to `LayoutChildren::child` for each child index.
    dtypes: Vec<DType>,
    /// Name given to each child's reader.
    names: Vec<Arc<str>>,
    /// Segment source handed to every child reader.
    segment_source: Arc<dyn SegmentSource>,
    /// One slot per child (sized from `children.nchildren()` in `new`).
    /// A slot is filled the first time that child's reader is successfully built.
    cache: Vec<OnceCell<LayoutReaderRef>>,
}
110
111impl LazyReaderChildren {
112 pub fn new(
113 children: Arc<dyn LayoutChildren>,
114 dtypes: Vec<DType>,
115 names: Vec<Arc<str>>,
116 segment_source: Arc<dyn SegmentSource>,
117 ) -> Self {
118 let nchildren = children.nchildren();
119 let cache = (0..nchildren).map(|_| OnceCell::new()).collect();
120 Self {
121 children,
122 dtypes,
123 names,
124 segment_source,
125 cache,
126 }
127 }
128
129 pub fn get(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
130 if idx >= self.cache.len() {
131 vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
132 }
133
134 self.cache[idx].get_or_try_init(|| {
135 let dtype = &self.dtypes[idx];
136 let child = self.children.child(idx, dtype)?;
137 child.new_reader(Arc::clone(&self.names[idx]), self.segment_source.clone())
138 })
139 }
140}