1mod expr;
5
6use std::collections::BTreeSet;
7use std::fmt::Display;
8use std::fmt::Formatter;
9use std::ops::BitAnd;
10use std::ops::Range;
11use std::sync::Arc;
12use std::sync::OnceLock;
13
14use Nullability::NonNullable;
15pub use expr::*;
16use futures::FutureExt;
17use futures::future::BoxFuture;
18use vortex_array::ArrayRef;
19use vortex_array::Canonical;
20use vortex_array::IntoArray;
21use vortex_array::MaskFuture;
22use vortex_array::VortexSessionExecute;
23use vortex_array::dtype::DType;
24use vortex_array::dtype::FieldMask;
25use vortex_array::dtype::FieldName;
26use vortex_array::dtype::Nullability;
27use vortex_array::dtype::PType;
28use vortex_array::expr::ExactExpr;
29use vortex_array::expr::Expression;
30use vortex_array::expr::is_root;
31use vortex_array::expr::root;
32use vortex_array::expr::transform::PartitionedExpr;
33use vortex_array::expr::transform::partition;
34use vortex_array::expr::transform::replace;
35use vortex_array::scalar::PValue;
36use vortex_error::VortexExpect;
37use vortex_error::VortexResult;
38use vortex_mask::Mask;
39use vortex_sequence::Sequence;
40use vortex_sequence::SequenceArray;
41use vortex_session::VortexSession;
42use vortex_utils::aliases::dash_map::DashMap;
43
44use crate::ArrayFuture;
45use crate::LayoutReader;
46use crate::SplitRange;
47use crate::layouts::partitioned::PartitionedExprEval;
48
49pub struct RowIdxLayoutReader {
50 name: Arc<str>,
51 row_offset: u64,
52 child: Arc<dyn LayoutReader>,
53 partition_cache: DashMap<ExactExpr, Arc<OnceLock<Partitioning>>>,
54 session: VortexSession,
55}
56
57impl RowIdxLayoutReader {
58 pub fn new(row_offset: u64, child: Arc<dyn LayoutReader>, session: VortexSession) -> Self {
59 Self {
60 name: Arc::clone(child.name()),
61 row_offset,
62 child,
63 partition_cache: DashMap::with_hasher(Default::default()),
64 session,
65 }
66 }
67
68 fn partition_expr(&self, expr: &Expression) -> VortexResult<Partitioning> {
69 let key = ExactExpr(expr.clone());
70
71 if let Some(entry) = self.partition_cache.get(&key)
73 && let Some(partitioning) = entry.value().get()
74 {
75 return Ok(partitioning.clone());
76 }
77
78 let result = self.compute_partitioning(expr)?;
79
80 self.partition_cache
81 .entry(key)
82 .or_insert_with(|| Arc::new(OnceLock::new()))
83 .get_or_init(|| result.clone());
84
85 Ok(result)
86 }
87
88 fn compute_partitioning(&self, expr: &Expression) -> VortexResult<Partitioning> {
89 let mut partitioned = partition(expr.clone(), self.dtype(), |expr| {
91 if expr.is::<RowIdx>() {
92 vec![Partition::RowIdx]
93 } else if is_root(expr) {
94 vec![Partition::Child]
95 } else {
96 vec![]
97 }
98 })?;
99
100 if partitioned.partitions.len() == 1 {
102 return Ok(match &partitioned.partition_annotations[0] {
103 Partition::RowIdx => {
104 Partitioning::RowIdx(replace(expr.clone(), &row_idx(), root()))
105 }
106 Partition::Child => Partitioning::Child(expr.clone()),
107 });
108 }
109
110 partitioned.partitions = partitioned
112 .partitions
113 .into_iter()
114 .map(|p| replace(p, &row_idx(), root()))
115 .collect();
116
117 Ok(Partitioning::Partitioned(Arc::new(partitioned)))
118 }
119}
120
121#[derive(Clone)]
122enum Partitioning {
123 RowIdx(Expression),
125 Child(Expression),
127 Partitioned(Arc<PartitionedExpr<Partition>>),
129}
130
/// Annotation that tags which side of the reader a partition of an expression belongs to.
#[derive(Clone, PartialEq, Eq, Hash)]
enum Partition {
    /// The partition reads the row index.
    RowIdx,
    /// The partition reads the wrapped child reader.
    Child,
}
136
137impl Partition {
138 pub fn name(&self) -> &str {
139 match self {
140 Partition::RowIdx => "row_idx",
141 Partition::Child => "child",
142 }
143 }
144}
145
146impl From<Partition> for FieldName {
147 fn from(value: Partition) -> Self {
148 FieldName::from(value.name())
149 }
150}
151
152impl Display for Partition {
153 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
154 write!(f, "{}", self.name())
155 }
156}
157
158impl LayoutReader for RowIdxLayoutReader {
159 fn name(&self) -> &Arc<str> {
160 &self.name
161 }
162
163 fn dtype(&self) -> &DType {
164 self.child.dtype()
165 }
166
167 fn row_count(&self) -> u64 {
168 self.child.row_count()
169 }
170
171 fn register_splits(
172 &self,
173 field_mask: &[FieldMask],
174 split_range: &SplitRange,
175 splits: &mut BTreeSet<u64>,
176 ) -> VortexResult<()> {
177 self.child.register_splits(field_mask, split_range, splits)
178 }
179
180 fn pruning_evaluation(
181 &self,
182 row_range: &Range<u64>,
183 expr: &Expression,
184 mask: Mask,
185 ) -> VortexResult<MaskFuture> {
186 Ok(match &self.partition_expr(expr)? {
187 Partitioning::RowIdx(expr) => row_idx_mask_future(
188 self.row_offset,
189 row_range,
190 expr,
191 MaskFuture::ready(mask),
192 self.session.clone(),
193 ),
194 Partitioning::Child(expr) => self.child.pruning_evaluation(row_range, expr, mask)?,
195 Partitioning::Partitioned(..) => MaskFuture::ready(mask),
196 })
197 }
198
199 fn filter_evaluation(
200 &self,
201 row_range: &Range<u64>,
202 expr: &Expression,
203 mask: MaskFuture,
204 ) -> VortexResult<MaskFuture> {
205 match &self.partition_expr(expr)? {
206 Partitioning::RowIdx(_) => Ok(mask),
209 Partitioning::Child(expr) => self.child.filter_evaluation(row_range, expr, mask),
210 Partitioning::Partitioned(p) => Arc::clone(p).into_mask_future(
211 mask,
212 |annotation, expr, mask| match annotation {
213 Partition::RowIdx => Ok(row_idx_mask_future(
214 self.row_offset,
215 row_range,
216 expr,
217 mask,
218 self.session.clone(),
219 )),
220 Partition::Child => self.child.filter_evaluation(row_range, expr, mask),
221 },
222 |annotation, expr, mask| match annotation {
223 Partition::RowIdx => Ok(row_idx_array_future(
224 self.row_offset,
225 row_range,
226 expr,
227 mask,
228 self.session.clone(),
229 )),
230 Partition::Child => self.child.projection_evaluation(row_range, expr, mask),
231 },
232 self.session.clone(),
233 ),
234 }
235 }
236
237 fn projection_evaluation(
238 &self,
239 row_range: &Range<u64>,
240 expr: &Expression,
241 mask: MaskFuture,
242 ) -> VortexResult<BoxFuture<'static, VortexResult<ArrayRef>>> {
243 match &self.partition_expr(expr)? {
244 Partitioning::RowIdx(expr) => Ok(row_idx_array_future(
245 self.row_offset,
246 row_range,
247 expr,
248 mask,
249 self.session.clone(),
250 )),
251 Partitioning::Child(expr) => self.child.projection_evaluation(row_range, expr, mask),
252 Partitioning::Partitioned(p) => {
253 Arc::clone(p).into_array_future(mask, |annotation, expr, mask| match annotation {
254 Partition::RowIdx => Ok(row_idx_array_future(
255 self.row_offset,
256 row_range,
257 expr,
258 mask,
259 self.session.clone(),
260 )),
261 Partition::Child => self.child.projection_evaluation(row_range, expr, mask),
262 })
263 }
264 }
265 }
266
267 fn as_any(&self) -> &dyn std::any::Any {
268 self
269 }
270}
271
272fn idx_array(row_offset: u64, row_range: &Range<u64>) -> SequenceArray {
274 Sequence::try_new(
275 PValue::U64(row_offset + row_range.start),
276 PValue::U64(1),
277 PType::U64,
278 NonNullable,
279 usize::try_from(row_range.end - row_range.start)
280 .vortex_expect("Row range length must fit in usize"),
281 )
282 .vortex_expect("Failed to create row index array")
283}
284
285fn row_idx_mask_future(
286 row_offset: u64,
287 row_range: &Range<u64>,
288 expr: &Expression,
289 mask: MaskFuture,
290 session: VortexSession,
291) -> MaskFuture {
292 let row_range = row_range.clone();
293 let expr = expr.clone();
294 MaskFuture::new(mask.len(), async move {
295 let array = idx_array(row_offset, &row_range).into_array();
296
297 let mut ctx = session.create_execution_ctx();
298 let result_mask = array.apply(&expr)?.execute::<Mask>(&mut ctx)?;
299
300 Ok(result_mask.bitand(&mask.await?))
301 })
302}
303
304fn row_idx_array_future(
305 row_offset: u64,
306 row_range: &Range<u64>,
307 expr: &Expression,
308 mask: MaskFuture,
309 session: VortexSession,
310) -> ArrayFuture {
311 let row_range = row_range.clone();
312 let expr = expr.clone();
313 async move {
314 let array = idx_array(row_offset, &row_range).into_array();
315 let filtered = array.filter(mask.await?)?;
316 let mut ctx = session.create_execution_ctx();
317 let array = filtered.execute::<Canonical>(&mut ctx)?.into_array();
318 array.apply(&expr)
319 }
320 .boxed()
321}
322
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_array::ArrayContext;
    use vortex_array::IntoArray as _;
    use vortex_array::MaskFuture;
    use vortex_array::arrays::BoolArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::expr::Expression;
    use vortex_array::expr::eq;
    use vortex_array::expr::gt;
    use vortex_array::expr::lit;
    use vortex_array::expr::or;
    use vortex_array::expr::root;
    use vortex_buffer::buffer;
    use vortex_io::runtime::single::block_on;
    use vortex_io::session::RuntimeSessionExt;

    use crate::LayoutReader;
    use crate::LayoutStrategy;
    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::layouts::row_idx::RowIdxLayoutReader;
    use crate::layouts::row_idx::row_idx;
    use crate::segments::TestSegments;
    use crate::sequence::SequenceId;
    use crate::sequence::SequentialArrayStreamExt;
    use crate::test::SESSION;

    /// Writes `buffer![1..=5]` as a flat layout, wraps its reader in a [`RowIdxLayoutReader`]
    /// with row offset 0, projects `expr` over all rows with an all-true mask and asserts
    /// that the result equals `expected`.
    fn assert_projection(expr: Expression, expected: [bool; 5]) {
        block_on(|handle| async move {
            let session = SESSION.clone().with_handle(handle);
            let ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();
            let array = buffer![1..=5].into_array();
            let layout = FlatLayoutStrategy::default()
                .write_stream(
                    ctx,
                    Arc::<TestSegments>::clone(&segments),
                    array.to_array_stream().sequenced(ptr),
                    eof,
                    &session,
                )
                .await
                .unwrap();

            let reader = RowIdxLayoutReader::new(
                0,
                layout.new_reader("".into(), segments, &SESSION).unwrap(),
                SESSION.clone(),
            );
            let result = reader
                .projection_evaluation(
                    &(0..layout.row_count()),
                    &expr,
                    MaskFuture::new_true(layout.row_count().try_into().unwrap()),
                )
                .unwrap()
                .await
                .unwrap();

            assert_arrays_eq!(result, BoolArray::from_iter(expected));
        })
    }

    /// An expression that only reads the child data is evaluated against the stored values.
    #[test]
    fn flat_expr_no_row_id() {
        assert_projection(
            eq(root(), lit(3i32)),
            [false, false, true, false, false],
        );
    }

    /// An expression that only reads the row index is evaluated against the indices 0..5.
    #[test]
    fn flat_expr_row_id() {
        assert_projection(
            gt(row_idx(), lit(3u64)),
            [false, false, false, false, true],
        );
    }

    /// An expression mixing child data and the row index combines both sides.
    #[test]
    fn flat_expr_or() {
        let expr = or(
            eq(root(), lit(3i32)),
            or(gt(row_idx(), lit(3u64)), eq(root(), lit(1i32))),
        );

        assert_projection(expr, [true, false, true, false, true]);
    }
}