1use std::collections::BTreeSet;
5use std::ops::Range;
6use std::sync::Arc;
7
8use futures::future::BoxFuture;
9use futures::try_join;
10use once_cell::sync::OnceCell;
11use vortex_array::ArrayRef;
12use vortex_array::IntoArray;
13use vortex_array::MaskFuture;
14use vortex_array::builtins::ArrayBuiltins;
15use vortex_array::expr::Expression;
16use vortex_dtype::DType;
17use vortex_dtype::FieldMask;
18use vortex_error::VortexResult;
19use vortex_error::vortex_bail;
20use vortex_mask::Mask;
21use vortex_session::VortexSession;
22
23use crate::children::LayoutChildren;
24use crate::segments::SegmentSource;
25
/// Shared, reference-counted handle to a type-erased [`LayoutReader`].
pub type LayoutReaderRef = Arc<dyn LayoutReader>;

/// A reader over a single layout.
///
/// The `'static + Send + Sync` bounds allow readers to be shared across threads behind a
/// [`LayoutReaderRef`].
pub trait LayoutReader: 'static + Send + Sync {
    /// Returns the name this reader was created with.
    fn name(&self) -> &Arc<str>;

    /// Returns the [`DType`] of the data this reader produces.
    fn dtype(&self) -> &DType;

    /// Returns the number of rows covered by this reader.
    fn row_count(&self) -> u64;

    /// Records split points for the fields selected by `field_mask` within `row_range`
    /// into `splits`.
    ///
    /// `splits` is an ordered set, so duplicate points contributed by different readers
    /// collapse into one.
    // NOTE(review): presumably the inserted values are row offsets at which callers may
    // partition work — confirm against implementations.
    fn register_splits(
        &self,
        field_mask: &[FieldMask],
        row_range: &Range<u64>,
        splits: &mut BTreeSet<u64>,
    ) -> VortexResult<()>;

    /// Evaluates `expr` over `row_range` for pruning, starting from an already-resolved
    /// `mask`, and returns a future for the resulting mask.
    ///
    /// # Errors
    ///
    /// Returns an error if the evaluation cannot be set up.
    // NOTE(review): presumably a cheap check (e.g. statistics-based) that may only narrow
    // `mask` without reading full data — confirm.
    fn pruning_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: Mask,
    ) -> VortexResult<MaskFuture>;

    /// Evaluates the filter `expr` over `row_range`, given a future for the input `mask`,
    /// and returns a future for the resulting mask.
    ///
    /// Unlike [`LayoutReader::pruning_evaluation`], the input mask is itself a future, so it
    /// need not be resolved before this method is called.
    ///
    /// # Errors
    ///
    /// Returns an error if the evaluation cannot be set up.
    fn filter_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: MaskFuture,
    ) -> VortexResult<MaskFuture>;

    /// Evaluates the projection `expr` over `row_range` for the rows selected by `mask`,
    /// and returns an [`ArrayFuture`] for the projected array.
    ///
    /// # Errors
    ///
    /// Returns an error if the evaluation cannot be set up. The returned future can also
    /// resolve to an error.
    fn projection_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: MaskFuture,
    ) -> VortexResult<ArrayFuture>;
}
91
92pub type ArrayFuture = BoxFuture<'static, VortexResult<ArrayRef>>;
93
94pub trait ArrayFutureExt {
95 fn masked(self, mask: MaskFuture) -> Self;
96}
97
98impl ArrayFutureExt for ArrayFuture {
99 fn masked(self, mask: MaskFuture) -> Self {
101 Box::pin(async move {
102 let (array, mask) = try_join!(self, mask)?;
103 array.mask(mask.into_array())
104 })
105 }
106}
107
108pub struct LazyReaderChildren {
109 children: Arc<dyn LayoutChildren>,
110 dtypes: Vec<DType>,
111 names: Vec<Arc<str>>,
112 segment_source: Arc<dyn SegmentSource>,
113 session: VortexSession,
114 cache: Vec<OnceCell<LayoutReaderRef>>,
116}
117
118impl LazyReaderChildren {
119 pub fn new(
120 children: Arc<dyn LayoutChildren>,
121 dtypes: Vec<DType>,
122 names: Vec<Arc<str>>,
123 segment_source: Arc<dyn SegmentSource>,
124 session: VortexSession,
125 ) -> Self {
126 let nchildren = children.nchildren();
127 let cache = (0..nchildren).map(|_| OnceCell::new()).collect();
128 Self {
129 children,
130 dtypes,
131 names,
132 segment_source,
133 session,
134 cache,
135 }
136 }
137
138 pub fn get(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
139 if idx >= self.cache.len() {
140 vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
141 }
142
143 self.cache[idx].get_or_try_init(|| {
144 let dtype = &self.dtypes[idx];
145 let child = self.children.child(idx, dtype)?;
146 child.new_reader(
147 Arc::clone(&self.names[idx]),
148 self.segment_source.clone(),
149 &self.session,
150 )
151 })
152 }
153}