1mod expr;
5
6use std::collections::BTreeSet;
7use std::fmt::Display;
8use std::fmt::Formatter;
9use std::ops::BitAnd;
10use std::ops::Range;
11use std::sync::Arc;
12use std::sync::OnceLock;
13
14use Nullability::NonNullable;
15pub use expr::*;
16use futures::FutureExt;
17use futures::future::BoxFuture;
18use vortex_array::ArrayRef;
19use vortex_array::IntoArray;
20use vortex_array::MaskFuture;
21use vortex_array::VortexSessionExecute;
22use vortex_array::dtype::DType;
23use vortex_array::dtype::FieldMask;
24use vortex_array::dtype::FieldName;
25use vortex_array::dtype::Nullability;
26use vortex_array::dtype::PType;
27use vortex_array::expr::ExactExpr;
28use vortex_array::expr::Expression;
29use vortex_array::expr::is_root;
30use vortex_array::expr::root;
31use vortex_array::expr::transform::PartitionedExpr;
32use vortex_array::expr::transform::partition;
33use vortex_array::expr::transform::replace;
34use vortex_array::scalar::PValue;
35use vortex_error::VortexExpect;
36use vortex_error::VortexResult;
37use vortex_mask::Mask;
38use vortex_sequence::Sequence;
39use vortex_sequence::SequenceArray;
40use vortex_session::VortexSession;
41use vortex_utils::aliases::dash_map::DashMap;
42
43use crate::ArrayFuture;
44use crate::LayoutReader;
45use crate::layouts::partitioned::PartitionedExprEval;
46
47pub struct RowIdxLayoutReader {
48 name: Arc<str>,
49 row_offset: u64,
50 child: Arc<dyn LayoutReader>,
51 partition_cache: DashMap<ExactExpr, Arc<OnceLock<Partitioning>>>,
52 session: VortexSession,
53}
54
55impl RowIdxLayoutReader {
56 pub fn new(row_offset: u64, child: Arc<dyn LayoutReader>, session: VortexSession) -> Self {
57 Self {
58 name: Arc::clone(child.name()),
59 row_offset,
60 child,
61 partition_cache: DashMap::with_hasher(Default::default()),
62 session,
63 }
64 }
65
66 fn partition_expr(&self, expr: &Expression) -> Partitioning {
67 let key = ExactExpr(expr.clone());
68
69 if let Some(entry) = self.partition_cache.get(&key)
71 && let Some(partitioning) = entry.value().get()
72 {
73 return partitioning.clone();
74 }
75
76 let cell = self
77 .partition_cache
78 .entry(key)
79 .or_insert_with(|| Arc::new(OnceLock::new()))
80 .clone();
81
82 cell.get_or_init(|| self.compute_partitioning(expr)).clone()
83 }
84
85 fn compute_partitioning(&self, expr: &Expression) -> Partitioning {
86 let mut partitioned = partition(expr.clone(), self.dtype(), |expr| {
88 if expr.is::<RowIdx>() {
89 vec![Partition::RowIdx]
90 } else if is_root(expr) {
91 vec![Partition::Child]
92 } else {
93 vec![]
94 }
95 })
96 .vortex_expect("We should not fail to partition expression over struct fields");
97
98 if partitioned.partitions.len() == 1 {
100 return match &partitioned.partition_annotations[0] {
101 Partition::RowIdx => {
102 Partitioning::RowIdx(replace(expr.clone(), &row_idx(), root()))
103 }
104 Partition::Child => Partitioning::Child(expr.clone()),
105 };
106 }
107
108 partitioned.partitions = partitioned
110 .partitions
111 .into_iter()
112 .map(|p| replace(p, &row_idx(), root()))
113 .collect();
114
115 Partitioning::Partitioned(Arc::new(partitioned))
116 }
117}
118
119#[derive(Clone)]
120enum Partitioning {
121 RowIdx(Expression),
123 Child(Expression),
125 Partitioned(Arc<PartitionedExpr<Partition>>),
127}
128
/// Annotation identifying which source a sub-expression reads from.
///
/// `Debug` is derived so partition annotations (and containers of them) can be printed in
/// diagnostics.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum Partition {
    /// Reads the synthesized row-index sequence.
    RowIdx,
    /// Reads the wrapped child layout.
    Child,
}
134
135impl Partition {
136 pub fn name(&self) -> &str {
137 match self {
138 Partition::RowIdx => "row_idx",
139 Partition::Child => "child",
140 }
141 }
142}
143
144impl From<Partition> for FieldName {
145 fn from(value: Partition) -> Self {
146 FieldName::from(value.name())
147 }
148}
149
150impl Display for Partition {
151 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
152 write!(f, "{}", self.name())
153 }
154}
155
156impl LayoutReader for RowIdxLayoutReader {
157 fn name(&self) -> &Arc<str> {
158 &self.name
159 }
160
161 fn dtype(&self) -> &DType {
162 self.child.dtype()
163 }
164
165 fn row_count(&self) -> u64 {
166 self.child.row_count()
167 }
168
169 fn register_splits(
170 &self,
171 field_mask: &[FieldMask],
172 row_range: &Range<u64>,
173 splits: &mut BTreeSet<u64>,
174 ) -> VortexResult<()> {
175 self.child.register_splits(field_mask, row_range, splits)
176 }
177
178 fn pruning_evaluation(
179 &self,
180 row_range: &Range<u64>,
181 expr: &Expression,
182 mask: Mask,
183 ) -> VortexResult<MaskFuture> {
184 Ok(match &self.partition_expr(expr) {
185 Partitioning::RowIdx(expr) => row_idx_mask_future(
186 self.row_offset,
187 row_range,
188 expr,
189 MaskFuture::ready(mask),
190 self.session.clone(),
191 ),
192 Partitioning::Child(expr) => self.child.pruning_evaluation(row_range, expr, mask)?,
193 Partitioning::Partitioned(..) => MaskFuture::ready(mask),
194 })
195 }
196
197 fn filter_evaluation(
198 &self,
199 row_range: &Range<u64>,
200 expr: &Expression,
201 mask: MaskFuture,
202 ) -> VortexResult<MaskFuture> {
203 match &self.partition_expr(expr) {
204 Partitioning::RowIdx(_) => Ok(mask),
207 Partitioning::Child(expr) => self.child.filter_evaluation(row_range, expr, mask),
208 Partitioning::Partitioned(p) => Arc::clone(p).into_mask_future(
209 mask,
210 |annotation, expr, mask| match annotation {
211 Partition::RowIdx => Ok(row_idx_mask_future(
212 self.row_offset,
213 row_range,
214 expr,
215 mask,
216 self.session.clone(),
217 )),
218 Partition::Child => self.child.filter_evaluation(row_range, expr, mask),
219 },
220 |annotation, expr, mask| match annotation {
221 Partition::RowIdx => {
222 Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
223 }
224 Partition::Child => self.child.projection_evaluation(row_range, expr, mask),
225 },
226 self.session.clone(),
227 ),
228 }
229 }
230
231 fn projection_evaluation(
232 &self,
233 row_range: &Range<u64>,
234 expr: &Expression,
235 mask: MaskFuture,
236 ) -> VortexResult<BoxFuture<'static, VortexResult<ArrayRef>>> {
237 match &self.partition_expr(expr) {
238 Partitioning::RowIdx(expr) => {
239 Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
240 }
241 Partitioning::Child(expr) => self.child.projection_evaluation(row_range, expr, mask),
242 Partitioning::Partitioned(p) => {
243 Arc::clone(p).into_array_future(mask, |annotation, expr, mask| match annotation {
244 Partition::RowIdx => {
245 Ok(row_idx_array_future(self.row_offset, row_range, expr, mask))
246 }
247 Partition::Child => self.child.projection_evaluation(row_range, expr, mask),
248 })
249 }
250 }
251 }
252}
253
254fn idx_array(row_offset: u64, row_range: &Range<u64>) -> SequenceArray {
256 Sequence::try_new(
257 PValue::U64(row_offset + row_range.start),
258 PValue::U64(1),
259 PType::U64,
260 NonNullable,
261 usize::try_from(row_range.end - row_range.start)
262 .vortex_expect("Row range length must fit in usize"),
263 )
264 .vortex_expect("Failed to create row index array")
265}
266
267fn row_idx_mask_future(
268 row_offset: u64,
269 row_range: &Range<u64>,
270 expr: &Expression,
271 mask: MaskFuture,
272 session: VortexSession,
273) -> MaskFuture {
274 let row_range = row_range.clone();
275 let expr = expr.clone();
276 MaskFuture::new(mask.len(), async move {
277 let array = idx_array(row_offset, &row_range).into_array();
278
279 let mut ctx = session.create_execution_ctx();
280 let result_mask = array.apply(&expr)?.execute::<Mask>(&mut ctx)?;
281
282 Ok(result_mask.bitand(&mask.await?))
283 })
284}
285
286fn row_idx_array_future(
287 row_offset: u64,
288 row_range: &Range<u64>,
289 expr: &Expression,
290 mask: MaskFuture,
291) -> ArrayFuture {
292 let row_range = row_range.clone();
293 let expr = expr.clone();
294 async move {
295 let array = idx_array(row_offset, &row_range).into_array();
296 let array = array.filter(mask.await?)?.to_canonical()?.into_array();
297 array.apply(&expr)
298 }
299 .boxed()
300}
301
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_array::ArrayContext;
    use vortex_array::ArrayRef;
    use vortex_array::IntoArray as _;
    use vortex_array::MaskFuture;
    use vortex_array::arrays::BoolArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::expr::Expression;
    use vortex_array::expr::eq;
    use vortex_array::expr::gt;
    use vortex_array::expr::lit;
    use vortex_array::expr::or;
    use vortex_array::expr::root;
    use vortex_buffer::buffer;
    use vortex_io::runtime::single::block_on;
    use vortex_io::session::RuntimeSessionExt;
    use vortex_session::VortexSession;

    use crate::LayoutReader;
    use crate::LayoutStrategy;
    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::layouts::row_idx::RowIdxLayoutReader;
    use crate::layouts::row_idx::row_idx;
    use crate::segments::TestSegments;
    use crate::sequence::SequenceId;
    use crate::sequence::SequentialArrayStreamExt;
    use crate::test::SESSION;

    /// Writes `buffer![1..=5]` as a flat layout and wraps its reader in a
    /// [`RowIdxLayoutReader`] with row offset 0. It then projects `expr` over every row with
    /// an all-true mask.
    async fn project_over_fixture(session: &VortexSession, expr: &Expression) -> ArrayRef {
        let segments = Arc::new(TestSegments::default());
        let (ptr, eof) = SequenceId::root().split();
        let values = buffer![1..=5].into_array();
        let layout = FlatLayoutStrategy::default()
            .write_stream(
                ArrayContext::empty(),
                Arc::<TestSegments>::clone(&segments),
                values.to_array_stream().sequenced(ptr),
                eof,
                session,
            )
            .await
            .unwrap();

        let row_count = layout.row_count();
        let reader = RowIdxLayoutReader::new(
            0,
            layout.new_reader("".into(), segments, &SESSION).unwrap(),
            SESSION.clone(),
        );
        reader
            .projection_evaluation(
                &(0..row_count),
                expr,
                MaskFuture::new_true(row_count.try_into().unwrap()),
            )
            .unwrap()
            .await
            .unwrap()
    }

    #[test]
    fn flat_expr_no_row_id() {
        block_on(|handle| async {
            let session = SESSION.clone().with_handle(handle);
            let result = project_over_fixture(&session, &eq(root(), lit(3i32))).await;
            assert_arrays_eq!(
                result,
                BoolArray::from_iter([false, false, true, false, false])
            );
        })
    }

    #[test]
    fn flat_expr_row_id() {
        block_on(|handle| async {
            let session = SESSION.clone().with_handle(handle);
            let result = project_over_fixture(&session, &gt(row_idx(), lit(3u64))).await;
            assert_arrays_eq!(
                result,
                BoolArray::from_iter([false, false, false, false, true])
            );
        })
    }

    #[test]
    fn flat_expr_or() {
        block_on(|handle| async {
            let session = SESSION.clone().with_handle(handle);
            let expr = or(
                eq(root(), lit(3i32)),
                or(gt(row_idx(), lit(3u64)), eq(root(), lit(1i32))),
            );
            let result = project_over_fixture(&session, &expr).await;
            assert_arrays_eq!(
                result,
                BoolArray::from_iter([true, false, true, false, true])
            );
        })
    }
}