1use std::collections::BTreeSet;
5use std::ops::Range;
6use std::sync::Arc;
7
8use futures::future::BoxFuture;
9use futures::try_join;
10use once_cell::sync::OnceCell;
11use vortex_array::ArrayRef;
12use vortex_array::MaskFuture;
13use vortex_array::expr::Expression;
14use vortex_dtype::DType;
15use vortex_dtype::FieldMask;
16use vortex_error::VortexResult;
17use vortex_error::vortex_bail;
18use vortex_mask::Mask;
19use vortex_session::VortexSession;
20
21use crate::children::LayoutChildren;
22use crate::segments::SegmentSource;
23
/// Shared, reference-counted handle to a type-erased [`LayoutReader`].
pub type LayoutReaderRef = Arc<dyn LayoutReader>;
25
/// Reader interface for a layout, typically used as a trait object via
/// [`LayoutReaderRef`] (`Arc<dyn LayoutReader>`).
///
/// The `'static + Send + Sync` bounds allow a reader to be shared across threads
/// behind an `Arc`.
pub trait LayoutReader: 'static + Send + Sync {
    /// Returns the name of this reader.
    fn name(&self) -> &Arc<str>;

    /// Returns the [`DType`] of the data this reader yields.
    fn dtype(&self) -> &DType;

    /// Returns the row count of this reader.
    fn row_count(&self) -> u64;

    /// Inserts row offsets relevant to `row_range` into `splits`.
    ///
    /// `field_mask` selects the fields of interest. `splits` is a caller-owned
    /// `BTreeSet`, so recorded offsets stay sorted and de-duplicated across calls.
    /// NOTE(review): presumably these offsets are split points for chunked reads —
    /// confirm against implementors.
    fn register_splits(
        &self,
        field_mask: &[FieldMask],
        row_range: &Range<u64>,
        splits: &mut BTreeSet<u64>,
    ) -> VortexResult<()>;

    /// Builds a future resolving to a mask from a pruning evaluation of `expr` over
    /// `row_range`, starting from the already-resolved `mask`.
    ///
    /// The outer `VortexResult` reports failures while setting up the future.
    /// NOTE(review): presumably a cheap, conservative pass (e.g. statistics-based)
    /// that only discards rows known not to match — confirm with implementors.
    fn pruning_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: Mask,
    ) -> VortexResult<MaskFuture>;

    /// Builds a future resolving to a mask from evaluating the filter `expr` over
    /// `row_range`. Unlike [`LayoutReader::pruning_evaluation`], the input `mask`
    /// is itself still a future.
    ///
    /// The outer `VortexResult` reports failures while setting up the future.
    fn filter_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: MaskFuture,
    ) -> VortexResult<MaskFuture>;

    /// Builds an [`ArrayFuture`] resolving to the result of evaluating `expr` over
    /// `row_range`.
    ///
    /// The outer `VortexResult` reports failures while setting up the future; the
    /// future itself may also resolve to an error.
    /// NOTE(review): presumably `mask` selects which rows of `row_range` are
    /// produced — confirm with implementors.
    fn projection_evaluation(
        &self,
        row_range: &Range<u64>,
        expr: &Expression,
        mask: MaskFuture,
    ) -> VortexResult<ArrayFuture>;
}
89
/// Boxed, `Send`, `'static` future resolving to an [`ArrayRef`] or an error.
pub type ArrayFuture = BoxFuture<'static, VortexResult<ArrayRef>>;
91
/// Combinators on array futures; implemented in this module for [`ArrayFuture`].
pub trait ArrayFutureExt {
    /// Combines this future with `mask`, returning a future of the masked result.
    fn masked(self, mask: MaskFuture) -> Self;
}
95
96impl ArrayFutureExt for ArrayFuture {
97 fn masked(self, mask: MaskFuture) -> Self {
99 Box::pin(async move {
100 let (array, mask) = try_join!(self, mask)?;
101 vortex_array::compute::mask(array.as_ref(), &mask)
102 })
103 }
104}
105
/// Per-child [`LayoutReader`]s that are constructed lazily and cached.
///
/// A child's reader is built on first access through `LazyReaderChildren::get` and
/// memoized in that child's [`OnceCell`].
pub struct LazyReaderChildren {
    /// Source of the child layouts, looked up by index.
    children: Arc<dyn LayoutChildren>,
    /// DType used when materializing each child, indexed by child position.
    dtypes: Vec<DType>,
    /// Name given to each child's reader, indexed by child position.
    names: Vec<Arc<str>>,
    /// Segment source handed (as a cloned `Arc`) to every child reader.
    segment_source: Arc<dyn SegmentSource>,
    /// Session passed by reference to each child reader's constructor.
    session: VortexSession,
    /// One cell per child; a cell is filled by the first successful `get` for it.
    cache: Vec<OnceCell<LayoutReaderRef>>,
}
115
116impl LazyReaderChildren {
117 pub fn new(
118 children: Arc<dyn LayoutChildren>,
119 dtypes: Vec<DType>,
120 names: Vec<Arc<str>>,
121 segment_source: Arc<dyn SegmentSource>,
122 session: VortexSession,
123 ) -> Self {
124 let nchildren = children.nchildren();
125 let cache = (0..nchildren).map(|_| OnceCell::new()).collect();
126 Self {
127 children,
128 dtypes,
129 names,
130 segment_source,
131 session,
132 cache,
133 }
134 }
135
136 pub fn get(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
137 if idx >= self.cache.len() {
138 vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
139 }
140
141 self.cache[idx].get_or_try_init(|| {
142 let dtype = &self.dtypes[idx];
143 let child = self.children.child(idx, dtype)?;
144 child.new_reader(
145 Arc::clone(&self.names[idx]),
146 self.segment_source.clone(),
147 &self.session,
148 )
149 })
150 }
151}