1mod expr;
5
6use std::collections::BTreeSet;
7use std::fmt::{Display, Formatter};
8use std::ops::{BitAnd, Range};
9use std::sync::Arc;
10
11use Nullability::NonNullable;
12pub use expr::*;
13use futures::FutureExt;
14use futures::future::BoxFuture;
15use vortex_array::compute::filter;
16use vortex_array::stats::Precision;
17use vortex_array::{ArrayRef, IntoArray, MaskFuture};
18use vortex_dtype::{DType, FieldMask, Nullability, PType};
19use vortex_error::{VortexExpect, VortexResult};
20use vortex_expr::transform::{PartitionedExpr, partition, replace};
21use vortex_expr::{ExactExpr, ExprRef, Scope, is_root, root};
22use vortex_mask::Mask;
23use vortex_scalar::PValue;
24use vortex_sequence::SequenceArray;
25use vortex_utils::aliases::dash_map::DashMap;
26
27use crate::layouts::partitioned::PartitionedExprEval;
28use crate::{ArrayFuture, LayoutReader};
29
/// A [`LayoutReader`] that wraps a child reader and adds support for expressions that
/// reference the row index.
///
/// Expressions that do not reference the row index are forwarded to the child; expressions
/// that do are evaluated against a generated row-index array.
pub struct RowIdxLayoutReader {
    /// Reader name; `new` clones it from the child reader.
    name: Arc<str>,
    /// Offset added to the start of a requested row range when generating row indices.
    row_offset: u64,
    /// The wrapped reader that all non-row-index work is delegated to.
    child: Arc<dyn LayoutReader>,

    /// Cache of previously computed partitionings, keyed by expression.
    partition_cache: DashMap<ExactExpr, Partitioning>,
}
37
38impl RowIdxLayoutReader {
39 pub fn new(row_offset: u64, child: Arc<dyn LayoutReader>) -> Self {
40 Self {
41 name: child.name().clone(),
42 row_offset,
43 child,
44 partition_cache: DashMap::with_hasher(Default::default()),
45 }
46 }
47
48 fn partition_expr(&self, expr: &ExprRef) -> Partitioning {
49 self.partition_cache
50 .entry(ExactExpr(expr.clone()))
51 .or_insert_with(|| {
52 let mut partitioned = partition(expr.clone(), self.dtype(), |expr| {
54 if expr.is::<RowIdxVTable>() {
55 vec![Partition::RowIdx]
56 } else if is_root(expr) {
57 vec![Partition::Child]
58 } else {
59 vec![]
60 }
61 })
62 .vortex_expect("We should not fail to partition expression over struct fields");
63
64 if partitioned.partitions.len() == 1 {
66 return match &partitioned.partition_annotations[0] {
67 Partition::RowIdx => {
68 Partitioning::RowIdx(replace(expr.clone(), &row_idx(), root()))
69 }
70 Partition::Child => Partitioning::Child(expr.clone()),
71 };
72 }
73
74 partitioned.partitions = partitioned
76 .partitions
77 .into_iter()
78 .map(|p| replace(p, &row_idx(), root()))
79 .collect();
80
81 Partitioning::Partitioned(Arc::new(partitioned))
82 })
83 .clone()
84 }
85}
86
/// The cached outcome of partitioning an expression in `RowIdxLayoutReader::partition_expr`.
#[derive(Clone)]
enum Partitioning {
    /// The expression produced a single partition annotated `Partition::RowIdx`. Holds the
    /// expression with `row_idx()` replaced by `root()`.
    RowIdx(ExprRef),
    /// The expression produced a single partition annotated `Partition::Child`. Holds the
    /// original expression unchanged.
    Child(ExprRef),
    /// The expression did not produce exactly one partition. Holds the full partitioned
    /// expression, with `row_idx()` replaced by `root()` in each partition.
    Partitioned(Arc<PartitionedExpr<Partition>>),
}
96
/// Annotation identifying which side a piece of a partitioned expression belongs to.
#[derive(Clone, PartialEq, Eq, Hash)]
enum Partition {
    /// The piece references the row index.
    RowIdx,
    /// The piece references the child (root) array.
    Child,
}

impl Display for Partition {
    /// Writes the lowercase label of the partition (`row_idx` or `child`).
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Partition::RowIdx => "row_idx",
            Partition::Child => "child",
        };
        // `write_str` emits the label verbatim, just like `write!` with a bare literal.
        f.write_str(label)
    }
}
111
112impl LayoutReader for RowIdxLayoutReader {
113 fn name(&self) -> &Arc<str> {
114 &self.name
115 }
116
117 fn dtype(&self) -> &DType {
118 self.child.dtype()
119 }
120
121 fn row_count(&self) -> Precision<u64> {
122 self.child.row_count()
123 }
124
125 fn register_splits(
126 &self,
127 field_mask: &[FieldMask],
128 row_offset: u64,
129 splits: &mut BTreeSet<u64>,
130 ) -> VortexResult<()> {
131 self.child.register_splits(field_mask, row_offset, splits)
132 }
133
134 fn pruning_evaluation(
135 &self,
136 row_range: &Range<u64>,
137 expr: &ExprRef,
138 mask: Mask,
139 ) -> VortexResult<MaskFuture> {
140 Ok(match &self.partition_expr(expr) {
141 Partitioning::RowIdx(expr) => {
142 row_idx_mask_future(self.row_offset, row_range, expr, MaskFuture::ready(mask))
143 }
144 Partitioning::Child(expr) => self.child.pruning_evaluation(row_range, expr, mask)?,
145 Partitioning::Partitioned(..) => MaskFuture::ready(mask),
146 })
147 }
148
149 fn filter_evaluation(
150 &self,
151 row_range: &Range<u64>,
152 expr: &ExprRef,
153 mask: MaskFuture,
154 ) -> VortexResult<MaskFuture> {
155 match &self.partition_expr(expr) {
156 Partitioning::RowIdx(_) => Ok(mask),
159 Partitioning::Child(expr) => self.child.filter_evaluation(row_range, expr, mask),
160 Partitioning::Partitioned(p) => p.clone().into_mask_future(
161 mask,
162 |annotation, expr, mask| match annotation {
163 Partition::RowIdx => {
164 Ok(row_idx_mask_future(self.row_offset, row_range, expr, mask))
165 }
166 Partition::Child => self.child.filter_evaluation(row_range, expr, mask),
167 },
168 |annotation, expr, mask| match annotation {
169 Partition::RowIdx => {
170 Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
171 }
172 Partition::Child => self.child.projection_evaluation(row_range, expr, mask),
173 },
174 ),
175 }
176 }
177
178 fn projection_evaluation(
179 &self,
180 row_range: &Range<u64>,
181 expr: &ExprRef,
182 mask: MaskFuture,
183 ) -> VortexResult<BoxFuture<'static, VortexResult<ArrayRef>>> {
184 match &self.partition_expr(expr) {
185 Partitioning::RowIdx(expr) => {
186 Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
187 }
188 Partitioning::Child(expr) => self.child.projection_evaluation(row_range, expr, mask),
189 Partitioning::Partitioned(p) => {
190 p.clone()
191 .into_array_future(mask, |annotation, expr, mask| match annotation {
192 Partition::RowIdx => {
193 Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
194 }
195 Partition::Child => self.child.projection_evaluation(row_range, expr, mask),
196 })
197 }
198 }
199 }
200}
201
202fn idx_array(row_offset: u64, row_range: &Range<u64>) -> SequenceArray {
204 SequenceArray::new(
205 PValue::U64(row_offset + row_range.start),
206 PValue::U64(1),
207 PType::U64,
208 NonNullable,
209 usize::try_from(row_range.end - row_range.start)
210 .vortex_expect("Row range length must fit in usize"),
211 )
212 .vortex_expect("Failed to create row index array")
213}
214
215fn row_idx_mask_future(
216 row_offset: u64,
217 row_range: &Range<u64>,
218 expr: &ExprRef,
219 mask: MaskFuture,
220) -> MaskFuture {
221 let row_range = row_range.clone();
222 let expr = expr.clone();
223 MaskFuture::new(mask.len(), async move {
224 let array = idx_array(row_offset, &row_range).into_array();
225 let result_mask = expr
226 .evaluate(&Scope::new(array))?
227 .try_to_mask_fill_null_false()?;
228 Ok(result_mask.bitand(&mask.await?))
229 })
230}
231
232fn row_idx_array_future(
233 row_offset: u64,
234 row_range: &Range<u64>,
235 expr: &ExprRef,
236 mask: MaskFuture,
237) -> ArrayFuture {
238 let row_range = row_range.clone();
239 let expr = expr.clone();
240 async move {
241 let array = idx_array(row_offset, &row_range).into_array();
242 let array = filter(&array, &mask.await?)?;
243 expr.evaluate(&Scope::new(array))
244 }
245 .boxed()
246}
247
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_buffer::BooleanBuffer;
    use itertools::Itertools;
    use vortex_array::{ArrayContext, IntoArray as _, MaskFuture, ToCanonical};
    use vortex_buffer::buffer;
    use vortex_expr::{eq, gt, lit, or, root};
    use vortex_io::runtime::single::block_on;

    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::layouts::row_idx::{RowIdxLayoutReader, row_idx};
    use crate::segments::TestSegments;
    use crate::sequence::{SequenceId, SequentialArrayStreamExt};
    use crate::{LayoutReader, LayoutStrategy};

    /// An expression that never mentions the row index is evaluated against the child data.
    #[test]
    fn flat_expr_no_row_id() {
        block_on(|handle| async {
            let ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();
            let values = buffer![1..=5].into_array();
            let layout = FlatLayoutStrategy::default()
                .write_stream(
                    ctx,
                    segments.clone(),
                    values.to_array_stream().sequenced(ptr),
                    eof,
                    handle,
                )
                .await
                .unwrap();

            let expr = eq(root(), lit(3i32));
            let row_count = layout.row_count();
            let all_rows = MaskFuture::new_true(row_count.try_into().unwrap());
            let reader =
                RowIdxLayoutReader::new(0, layout.new_reader("".into(), segments).unwrap());

            let result = reader
                .projection_evaluation(&(0..row_count), &expr, all_rows)
                .unwrap()
                .await
                .unwrap()
                .to_bool();

            // Only the third value equals 3.
            let expected = BooleanBuffer::from_iter([false, false, true, false, false]);
            assert_eq!(&expected, result.boolean_buffer());
        })
    }

    /// An expression that only mentions the row index is evaluated against generated indices.
    #[test]
    fn flat_expr_row_id() {
        block_on(|handle| async {
            let ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();
            let values = buffer![1..=5].into_array();
            let layout = FlatLayoutStrategy::default()
                .write_stream(
                    ctx,
                    segments.clone(),
                    values.to_array_stream().sequenced(ptr),
                    eof,
                    handle,
                )
                .await
                .unwrap();

            let expr = gt(row_idx(), lit(3u64));
            let row_count = layout.row_count();
            let all_rows = MaskFuture::new_true(row_count.try_into().unwrap());
            let reader =
                RowIdxLayoutReader::new(0, layout.new_reader("".into(), segments).unwrap());

            let result = reader
                .projection_evaluation(&(0..row_count), &expr, all_rows)
                .unwrap()
                .await
                .unwrap()
                .to_bool();

            // Row indices are 0..=4, so only the last one exceeds 3.
            let expected = BooleanBuffer::from_iter([false, false, false, false, true]);
            assert_eq!(&expected, result.boolean_buffer());
        })
    }

    /// An expression mixing child values and the row index is partitioned and recombined.
    #[test]
    fn flat_expr_or() {
        block_on(|handle| async {
            let ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();
            let values = buffer![1..=5].into_array();
            let layout = FlatLayoutStrategy::default()
                .write_stream(
                    ctx,
                    segments.clone(),
                    values.to_array_stream().sequenced(ptr),
                    eof,
                    handle,
                )
                .await
                .unwrap();

            let value_is_three = eq(root(), lit(3i32));
            let idx_above_three = gt(row_idx(), lit(3u64));
            let value_is_one = eq(root(), lit(1i32));
            let expr = or(value_is_three, or(idx_above_three, value_is_one));

            let row_count = layout.row_count();
            let all_rows = MaskFuture::new_true(row_count.try_into().unwrap());
            let reader =
                RowIdxLayoutReader::new(0, layout.new_reader("".into(), segments).unwrap());

            let result = reader
                .projection_evaluation(&(0..row_count), &expr, all_rows)
                .unwrap()
                .await
                .unwrap()
                .to_bool();

            // value == 1 (row 0), value == 3 (row 2), row index > 3 (row 4).
            let expected = vec![true, false, true, false, true];
            assert_eq!(expected, result.boolean_buffer().iter().collect_vec());
        })
    }
}