vortex_array/executor.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Iterative array execution.
5//!
6//! The single-step [`Executable`] implementation for [`ArrayRef`] tries `reduce`,
7//! `reduce_parent`, `execute_parent`, then `execute` once. The matcher-driven
8//! [`ArrayRef::execute_until`] loop interprets [`ExecutionStep::ExecuteSlot`],
9//! [`ExecutionStep::AppendChild`], and [`ExecutionStep::Done`] using an explicit stack plus an
10//! optional builder, so encodings can advance without recursive descent.
11//!
12//! See <https://docs.vortex.dev/developer-guide/internals/execution> for the full execution
13//! narrative, diagrams, and walkthroughs.
14
15use std::env::VarError;
16use std::fmt;
17use std::fmt::Display;
18use std::sync::Arc;
19use std::sync::LazyLock;
20#[cfg(debug_assertions)]
21use std::sync::atomic::AtomicUsize;
22#[cfg(debug_assertions)]
23use std::sync::atomic::Ordering;
24
25use vortex_error::VortexExpect;
26use vortex_error::VortexResult;
27use vortex_error::vortex_bail;
28use vortex_error::vortex_ensure;
29use vortex_error::vortex_panic;
30use vortex_session::VortexSession;
31
32use crate::AnyCanonical;
33use crate::ArrayRef;
34use crate::Canonical;
35use crate::IntoArray;
36use crate::array::ArrayId;
37use crate::builders::ArrayBuilder;
38use crate::builders::builder_with_capacity_in;
39use crate::dtype::DType;
40use crate::matcher::Matcher;
41use crate::memory::HostAllocatorRef;
42use crate::memory::MemorySessionExt;
43use crate::optimizer::ArrayOptimizer;
44use crate::optimizer::kernels::ArrayKernelsExt;
45use crate::optimizer::kernels::ParentExecutionKernels;
46use crate::optimizer::kernels::execute_parent_key;
47use crate::stats::ArrayStats;
48use crate::stats::StatsSet;
49
50/// Returns the maximum number of iterations to attempt when executing an array before giving up and returning
51/// an error, can be by the `VORTEX_MAX_ITERATIONS` env variables, otherwise defaults to 2^22.
52pub(crate) fn max_iterations() -> usize {
53 static MAX_ITERATIONS: LazyLock<usize> =
54 LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
55 Ok(val) => val.parse::<usize>().unwrap_or_else(|e| {
56 vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")
57 }),
58 Err(VarError::NotPresent) => 2 << 21, // 2 ^ 22
59 Err(VarError::NotUnicode(_)) => {
60 vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
61 }
62 });
63 *MAX_ITERATIONS
64}
65
/// Marker trait for types that an [`ArrayRef`] can be executed into.
///
/// Implementors must provide an implementation of `execute` that takes
/// an [`ArrayRef`] and an [`ExecutionCtx`], and produces an instance of the
/// implementor type.
///
/// Users should call the `ArrayRef::execute` or `ArrayRef::execute_as` methods rather than
/// invoking this trait directly; both simply forward to [`Executable::execute`].
pub trait Executable: Sized {
    /// Consume `array` and produce an instance of `Self`, using `ctx` as the execution context.
    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
}
76
77#[expect(clippy::same_name_method)]
78impl ArrayRef {
79 /// Execute this array to produce an instance of `E`.
80 ///
81 /// See the [`Executable`] implementation for details on how this execution is performed.
82 pub fn execute<E: Executable>(self, ctx: &mut ExecutionCtx) -> VortexResult<E> {
83 E::execute(self, ctx)
84 }
85
86 /// Execute this array, labeling the execution step with a name for tracing.
87 pub fn execute_as<E: Executable>(
88 self,
89 _name: &'static str,
90 ctx: &mut ExecutionCtx,
91 ) -> VortexResult<E> {
92 E::execute(self, ctx)
93 }
94
95 /// Iteratively execute this array until the [`Matcher`] matches, using an explicit work
96 /// stack plus an optional builder for `AppendChild`.
97 ///
98 /// Note: the returned array may not match `M`. If execution converges to a canonical form
99 /// that does not match `M`, the canonical array is returned since no further execution
100 /// progress is possible.
101 ///
102 /// For safety, this errors once execution reaches a configurable maximum number of
103 /// iterations (default `2^22`, override with `VORTEX_MAX_ITERATIONS`).
104 ///
105 /// # Loop state
106 ///
107 /// - `current_array: ArrayRef` -- the array currently in focus.
108 /// - `current_builder: Option<Box<dyn ArrayBuilder>>` -- active only for builder-mode
109 /// execution. `AppendChild` appends detached children here. `Done` finishes the builder
110 /// and turns it back into the next `current_array`.
111 /// - `stack: Vec<StackFrame>` -- suspended parents from `ExecuteSlot`, including the
112 /// detached slot index, its [`DonePredicate`], and the parent builder that was active
113 /// before focus moved into the child.
114 ///
115 /// Example after `ExecuteSlot(1, pred)` has focused slot 1 of a parent:
116 ///
117 /// ```text
118 /// stack[top].parent_array:
119 /// RunEnd <-- suspended parent
120 /// +-- slot 0: ends
121 /// +-- slot 1: _ (detached)
122 ///
123 /// current_array:
124 /// DictEncoding <-- focused child
125 /// +-- slot 0: codes
126 /// +-- slot 1: dictionary
127 ///
128 /// current_builder:
129 /// None
130 /// ```
131 ///
132 /// Each loop iteration works like this:
133 ///
134 /// ```text
135 /// loop:
136 /// Step 1: done(current_array)?
137 /// - root activation -> return current_array
138 /// - ExecuteSlot frame -> pop, reattach child, resume parent
139 ///
140 /// Step 2: current_builder active?
141 /// - yes -> skip Step 2a / 2b
142 /// - no -> try parent kernels
143 ///
144 /// Step 2a: if stack.top exists:
145 /// parent = stack.top.parent_array
146 /// child = current_array
147 /// kernels[(parent.encoding_id(), child.encoding_id())]
148 /// .try_execute_parent(child, parent, stack.top.slot_idx)
149 ///
150 /// Step 2b: for child in current_array.children():
151 /// parent = current_array
152 /// kernels[(parent.encoding_id(), child.encoding_id())]
153 /// .try_execute_parent(child, parent, child.slot_idx)
154 ///
155 /// Step 3: match current_array.execute()
156 /// ExecuteSlot(i, pred) -> push parent on stack, focus child `i`
157 /// AppendChild(i) -> detach child `i`, append it into current_builder,
158 /// keep parent as current_array
159 /// Done -> finish current_builder if present, else use returned array
160 /// ```
161 ///
162 /// Step 2a and Step 2b are skipped while `current_builder` is active. `AppendChild`
163 /// partially consumes `current_array`: some slots already live in the builder, so a
164 /// parent rewrite would observe inconsistent state and could discard accumulated builder
165 /// data.
166 pub fn execute_until<M: Matcher>(self, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
167 let mut current_array = self;
168 let mut current_builder: Option<Box<dyn ArrayBuilder>> = None;
169 let mut stack: Vec<StackFrame> = Vec::new();
170 let execute_parent_kernels = Arc::clone(&ctx.execute_parent_kernels);
171 let kernels = execute_parent_kernels.as_ref();
172 let max_iterations = max_iterations();
173
174 for _ in 0..max_iterations {
175 let is_done = stack
176 .last()
177 .map_or(M::matches as DonePredicate, |frame| frame.done);
178
179 if is_done(¤t_array) || AnyCanonical::matches(¤t_array) {
180 match stack.pop() {
181 None => {
182 debug_assert!(
183 current_builder.is_none(),
184 "root activation should not retain a builder"
185 );
186 ctx.log(format_args!("-> {}", current_array));
187 return Ok(current_array);
188 }
189 Some(frame) => {
190 (current_array, current_builder) = pop_frame(frame, current_array)?;
191 continue;
192 }
193 }
194 }
195
196 // Step 2a: execute_parent against the suspended parent from ExecuteSlot.
197 //
198 // When executing a child for ExecuteSlot, try execute_parent against
199 // the suspended parent on the stack. This lets kernels like RunEnd's
200 // FilterKernel fire before the child is forced to canonical.
201 //
202 // Skip when a builder is active: the current array has been partially
203 // consumed by AppendChild (some slots are already in the builder), so
204 // a parent rewrite would see inconsistent state and the builder data
205 // would be lost when we restore frame.parent_builder.
206 if current_builder.is_none()
207 && let Some(frame) = stack.last()
208 && let Some(result) = {
209 execute_parent_for_child(
210 &frame.parent_array,
211 ¤t_array,
212 frame.slot_idx,
213 kernels,
214 ctx,
215 )?
216 }
217 {
218 ctx.log(format_args!(
219 "execute_parent (stack) rewrote {} -> {}",
220 current_array, result
221 ));
222 let frame = stack.pop().vortex_expect("just peeked");
223 current_array = result.optimize_ctx(ctx.session())?;
224 current_builder = frame.parent_builder;
225 continue;
226 }
227
228 // Step 2b: execute_parent against current_array's own children.
229 if current_builder.is_none()
230 && let Some(rewritten) = try_execute_parent(¤t_array, kernels, ctx)?
231 {
232 ctx.log(format_args!(
233 "execute_parent rewrote {} -> {}",
234 current_array, rewritten
235 ));
236 current_array = rewritten.optimize_ctx(ctx.session())?;
237 continue;
238 }
239
240 // execute step
241 let expected_len = current_array.len();
242 let expected_dtype = current_array.dtype().clone();
243 let stats = current_array.statistics().to_array_stats();
244 let encoding_id = current_array.encoding_id();
245 let result = current_array.execute_encoding_unchecked(ctx)?;
246 let (array, step) = result.into_parts();
247 match step {
248 ExecutionStep::ExecuteSlot(i, done) => {
249 let (parent, child) = unsafe { array.take_slot_unchecked(i) }?;
250 ctx.log(format_args!(
251 "ExecuteSlot({i}): pushing {}, focusing on {}",
252 parent, child
253 ));
254 stack.push(StackFrame {
255 parent_array: parent,
256 parent_builder: current_builder.take(),
257 slot_idx: i,
258 done,
259 original_dtype: child.dtype().clone(),
260 original_len: child.len(),
261 });
262 current_array = child;
263 current_builder = None;
264 }
265 ExecutionStep::AppendChild(i) => {
266 if current_builder.is_none() {
267 current_builder = Some(builder_with_capacity_in(
268 ctx.allocator(),
269 array.dtype(),
270 array.len(),
271 ));
272 }
273 let (parent, child) = unsafe { array.take_slot_unchecked(i) }?;
274 ctx.log(format_args!(
275 "AppendChild({i}): appending {} into builder",
276 child
277 ));
278 // TODO(joe)[7674]: replace with a builder kernel registry so we don't
279 // need to go through the VTable append_to_builder indirection.
280 child.append_to_builder(
281 current_builder
282 .as_deref_mut()
283 .vortex_expect("builder must exist"),
284 ctx,
285 )?;
286 current_array = parent;
287 }
288 ExecutionStep::Done => {
289 ctx.log(format_args!("Done: {}", array));
290 (current_array, current_builder) = finalize_done(
291 array,
292 current_builder,
293 expected_len,
294 expected_dtype,
295 stats,
296 encoding_id,
297 )?;
298 }
299 }
300 }
301
302 vortex_bail!(
303 "Exceeded maximum execution iterations ({}) while executing array",
304 max_iterations,
305 )
306 }
307}
308
/// A suspended parent activation, pushed by `ArrayRef::execute_until` when it handles
/// [`ExecutionStep::ExecuteSlot`] and consumed again when the frame is popped.
struct StackFrame {
    /// The parent array, with the focused slot detached.
    parent_array: ArrayRef,
    /// The builder that was active for the parent when focus moved into the child, if any.
    parent_builder: Option<Box<dyn ArrayBuilder>>,
    /// Index of the detached slot; the executed child is put back here when the frame pops.
    slot_idx: usize,
    /// Predicate that decides when the focused child is done.
    done: DonePredicate,
    /// The child's dtype at the time it was detached; compared in a debug assertion on pop.
    original_dtype: DType,
    /// The child's length at the time it was detached; compared in a debug assertion on pop.
    original_len: usize,
}
317
/// Execution context for batch CPU compute.
#[derive(Debug, Clone)]
pub struct ExecutionCtx {
    /// The session this context was created from.
    session: VortexSession,
    /// Snapshot of the session's execute-parent kernel registry, taken in `ExecutionCtx::new`.
    execute_parent_kernels: Arc<ParentExecutionKernels>,
    /// Identifier used in trace output, drawn from a process-wide counter (debug builds only).
    #[cfg(debug_assertions)]
    id: usize,
    /// Log lines accumulated by `ExecutionCtx::log` (debug builds only).
    #[cfg(debug_assertions)]
    ops: Vec<String>,
}
328
329impl ExecutionCtx {
330 /// Create a new execution context with the given session.
331 ///
332 /// This captures a snapshot of the session's execute-parent kernel registry. Kernels
333 /// registered after this context is created are not visible to it; create a new
334 /// [`ExecutionCtx`] after registration to use newly registered kernels.
335 pub fn new(session: VortexSession) -> Self {
336 let execute_parent_kernels = session.kernels().execute_parent_snapshot();
337 Self {
338 session,
339 execute_parent_kernels,
340 #[cfg(debug_assertions)]
341 id: {
342 static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
343 EXEC_CTX_ID.fetch_add(1, Ordering::Relaxed)
344 },
345 #[cfg(debug_assertions)]
346 ops: Vec::new(),
347 }
348 }
349
350 /// Get the session associated with this execution context.
351 pub fn session(&self) -> &VortexSession {
352 &self.session
353 }
354
355 /// Get the session-scoped host allocator for this execution context.
356 pub fn allocator(&self) -> HostAllocatorRef {
357 self.session.allocator()
358 }
359
360 /// Log an execution step at the current depth.
361 ///
362 /// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.
363 /// Individual steps are also logged at TRACE level for real-time following.
364 ///
365 /// Use the [`format_args!`] macro to create the `msg` argument.
366 pub fn log(&mut self, msg: fmt::Arguments<'_>) {
367 #[cfg(debug_assertions)]
368 if tracing::enabled!(tracing::Level::TRACE) {
369 let formatted = format!(" - {msg}");
370 tracing::trace!("exec[{}]: {formatted}", self.id);
371 self.ops.push(formatted);
372 }
373 let _ = msg;
374 }
375}
376
377impl Display for ExecutionCtx {
378 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
379 #[cfg(debug_assertions)]
380 return write!(f, "exec[{}]", self.id);
381 #[cfg(not(debug_assertions))]
382 write!(f, "exec")
383 }
384}
385
386#[cfg(debug_assertions)]
387impl Drop for ExecutionCtx {
388 fn drop(&mut self) {
389 if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
390 // Unlike itertools `.format()` (panics in 0.14 on second format)
391 struct FmtOps<'a>(&'a [String]);
392 impl Display for FmtOps<'_> {
393 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
394 for (i, op) in self.0.iter().enumerate() {
395 if i > 0 {
396 f.write_str("\n")?;
397 }
398 f.write_str(op)?;
399 }
400 Ok(())
401 }
402 }
403 tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
404 }
405 }
406}
407
408/// Single-step execution: takes one step toward canonical form.
409///
410/// Steps through reduce, reduce_parent, execute_parent, then execute. For `ExecuteSlot`,
411/// only a single child execution step is performed — the child is executed once and put back,
412/// making this a lightweight, bounded operation.
413///
414/// **However**, if `execute_step` returns [`ExecutionStep::AppendChild`], this implementation
415/// drives the *entire* array to completion via [`execute_into_builder`] in a single call.
416/// This can do substantially more work than a normal step because it creates a builder and
417/// fully decodes the array into that builder before returning. Callers should be aware that a
418/// single `.execute::<ArrayRef>(ctx)` call may perform O(n_children * decode_cost) work when
419/// `AppendChild` is returned.
420impl Executable for ArrayRef {
421 fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
422 if let Some(canonical) = array.as_opt::<AnyCanonical>() {
423 ctx.log(format_args!("-> canonical {}", array));
424 return Ok(Canonical::from(canonical).into_array());
425 }
426
427 if let Some(reduced) = array.reduce()? {
428 ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
429 reduced.statistics().inherit_from(array.statistics());
430 return Ok(reduced);
431 }
432
433 for (slot_idx, slot) in array.slots().iter().enumerate() {
434 let Some(child) = slot else { continue };
435 if let Some(reduced_parent) = child.reduce_parent(&array, slot_idx)? {
436 ctx.log(format_args!(
437 "reduce_parent: slot[{}]({}) rewrote {} -> {}",
438 slot_idx,
439 child.encoding_id(),
440 array,
441 reduced_parent
442 ));
443 reduced_parent.statistics().inherit_from(array.statistics());
444 return Ok(reduced_parent);
445 }
446 }
447
448 let execute_parent_kernels = Arc::clone(&ctx.execute_parent_kernels);
449 let kernels = execute_parent_kernels.as_ref();
450
451 for (slot_idx, slot) in array.slots().iter().enumerate() {
452 let Some(child) = slot else { continue };
453 if let Some(executed_parent) =
454 execute_parent_for_child(&array, child, slot_idx, kernels, ctx)?
455 {
456 ctx.log(format_args!(
457 "execute_parent: slot[{}]({}) rewrote {} -> {}",
458 slot_idx,
459 child.encoding_id(),
460 array,
461 executed_parent
462 ));
463 executed_parent
464 .statistics()
465 .inherit_from(array.statistics());
466 return Ok(executed_parent);
467 }
468 }
469
470 ctx.log(format_args!("executing {}", array));
471 let result = array.execute_encoding(ctx)?;
472 let (array, step) = result.into_parts();
473 match step {
474 ExecutionStep::Done => {
475 ctx.log(format_args!("-> {}", array));
476 Ok(array)
477 }
478 ExecutionStep::ExecuteSlot(i, _) => {
479 let child = array.slots()[i].clone().vortex_expect("valid slot index");
480 let executed_child = child.execute::<ArrayRef>(ctx)?;
481 // SAFETY: execution of a child slot produces a logically equivalent array in a
482 // different physical representation, preserving parent values and statistics.
483 unsafe { array.with_slot(i, executed_child) }
484 }
485 ExecutionStep::AppendChild(_) => {
486 // Single-step: build the entire parent via the builder path.
487 let builder = builder_with_capacity_in(ctx.allocator(), array.dtype(), array.len());
488 let mut builder = execute_into_builder(array, builder, ctx)?;
489 Ok(builder.finish())
490 }
491 }
492 }
493}
494
495/// Execute `array` into the given `builder`.
496///
497/// This uses the encoding's [`crate::array::VTable::append_to_builder`] implementation. Most
498/// encodings use the default path of `execute::<Canonical>` followed by `builder.extend_from_array`,
499/// while encodings like `Chunked` can override that to append child-by-child without materializing
500/// the entire parent.
501///
502/// The builder must have a [`DType`] that is a nullability-superset of `array.dtype()`.
503pub fn execute_into_builder(
504 array: ArrayRef,
505 mut builder: Box<dyn ArrayBuilder>,
506 ctx: &mut ExecutionCtx,
507) -> VortexResult<Box<dyn ArrayBuilder>> {
508 array.append_to_builder(builder.as_mut(), ctx)?;
509 Ok(builder)
510}
511
512/// Pop a stack frame, restoring the parent with the finished child in its slot.
513fn pop_frame(
514 frame: StackFrame,
515 child: ArrayRef,
516) -> VortexResult<(ArrayRef, Option<Box<dyn ArrayBuilder>>)> {
517 debug_assert_eq!(
518 child.dtype(),
519 &frame.original_dtype,
520 "child dtype changed during execution"
521 );
522 debug_assert_eq!(
523 child.len(),
524 frame.original_len,
525 "child len changed during execution"
526 );
527 let parent_array = unsafe { frame.parent_array.put_slot_unchecked(frame.slot_idx, child) }?;
528 Ok((parent_array, frame.parent_builder))
529}
530
531fn finalize_done(
532 result: ArrayRef,
533 mut builder: Option<Box<dyn ArrayBuilder>>,
534 expected_len: usize,
535 expected_dtype: DType,
536 stats: ArrayStats,
537 encoding_id: ArrayId,
538) -> VortexResult<(ArrayRef, Option<Box<dyn ArrayBuilder>>)> {
539 let output = if let Some(mut builder) = builder.take() {
540 builder.finish()
541 } else {
542 result
543 };
544
545 if cfg!(debug_assertions) {
546 vortex_ensure!(
547 output.len() == expected_len,
548 "Result length mismatch for {:?}",
549 encoding_id
550 );
551 vortex_ensure!(
552 output.dtype() == &expected_dtype,
553 "Executed canonical dtype mismatch for {:?}",
554 encoding_id
555 );
556 }
557
558 output
559 .statistics()
560 .set_iter(StatsSet::from(stats).into_iter());
561 Ok((output, None))
562}
563
564fn execute_parent_for_child(
565 parent: &ArrayRef,
566 child: &ArrayRef,
567 slot_idx: usize,
568 kernels: &ParentExecutionKernels,
569 ctx: &mut ExecutionCtx,
570) -> VortexResult<Option<ArrayRef>> {
571 let key = execute_parent_key(parent.encoding_id(), child.encoding_id());
572 if let Some(plugins) = kernels.get(&key) {
573 for plugin in plugins.as_ref() {
574 if let Some(result) = plugin.execute_parent(child, parent, slot_idx, ctx)? {
575 if cfg!(debug_assertions) {
576 vortex_ensure!(
577 result.len() == parent.len(),
578 "Executed parent canonical length mismatch"
579 );
580 vortex_ensure!(
581 result.dtype() == parent.dtype(),
582 "Executed parent canonical dtype mismatch"
583 );
584 }
585 return Ok(Some(result));
586 }
587 }
588 }
589
590 Ok(None)
591}
592
593/// Try execute_parent on each occupied slot of the array.
594fn try_execute_parent(
595 array: &ArrayRef,
596 kernels: &ParentExecutionKernels,
597 ctx: &mut ExecutionCtx,
598) -> VortexResult<Option<ArrayRef>> {
599 for (slot_idx, slot) in array.slots().iter().enumerate() {
600 let Some(child) = slot else { continue };
601 if let Some(executed_parent) =
602 execute_parent_for_child(array, child, slot_idx, kernels, ctx)?
603 {
604 ctx.log(format_args!(
605 "execute_parent: slot[{}]({}) rewrote {} -> {}",
606 slot_idx,
607 child.encoding_id(),
608 array,
609 executed_parent
610 ));
611 executed_parent
612 .statistics()
613 .inherit_from(array.statistics());
614 return Ok(Some(executed_parent));
615 }
616 }
617 Ok(None)
618}
619
/// A predicate that determines when an array has reached a desired form during execution.
///
/// This is a plain function pointer; [`ExecutionResult::execute_slot`] builds one from a
/// [`Matcher`]'s `matches` function.
pub type DonePredicate = fn(&ArrayRef) -> bool;
622
/// Scheduler step indicator returned alongside an array in [`ExecutionResult`].
///
/// Instead of recursively executing children, encodings return an `ExecutionStep` that tells the
/// scheduler what to do next. This enables the scheduler to manage execution iteratively using
/// an explicit work stack plus an optional builder.
///
/// # Semantics
///
/// Each variant describes a different execution strategy with distinct cost profiles:
///
/// - [`Done`](ExecutionStep::Done): The current activation has finished its work. If no builder
///   is active, the returned array is the result. If a builder is active, the scheduler ignores
///   the placeholder array and finishes the builder instead. The scheduler may continue
///   executing if the target form (e.g. canonical) has not yet been reached.
///
/// - [`ExecuteSlot`](ExecutionStep::ExecuteSlot): The encoding needs one of its children
///   decoded before it can make further progress. The scheduler detaches that child, pushes
///   the parent onto the explicit stack, executes the child until the [`DonePredicate`]
///   matches, puts it back, and re-enters the parent. This is a cooperative yield: the
///   encoding does a bounded amount of work per step while the loop tracks the parent-child
///   relationship explicitly.
///
/// - [`AppendChild`](ExecutionStep::AppendChild): The encoding needs one child executed to
///   canonical form and then appended into a builder owned by the current activation. The
///   scheduler detaches that child, lazily creates `current_builder` if needed, appends the
///   child into it, and keeps the parent as `current_array` for the next iteration. While the
///   builder is active, parent-kernel rewrites are skipped because the parent is partially
///   consumed. **Important:** in the single-step executor ([`Executable`] for [`ArrayRef`]),
///   returning `AppendChild` still causes the executor to drive the *entire* array to
///   completion via [`execute_into_builder`] in one call — this can do significantly more
///   work than a single `ExecuteSlot` step.
pub enum ExecutionStep {
    /// Request that the scheduler execute the slot at the given index, using the provided
    /// [`DonePredicate`] to determine when the slot is "done", then replace the slot in this
    /// array and re-enter execution.
    ///
    /// The fields are the slot index and the done predicate, in that order.
    ///
    /// Use [`ExecutionResult::execute_slot`] instead of constructing this variant directly.
    ExecuteSlot(usize, DonePredicate),

    /// Detach the slot at the given index, append that child into the current activation's
    /// canonical builder, and keep the returned parent as `current_array`.
    ///
    /// `Done` finalizes that builder and turns it into the result of the activation.
    ///
    /// **Note:** In the single-step executor ([`Executable`] for [`ArrayRef`]), this variant
    /// drives the entire parent to completion in one call via [`execute_into_builder`], which
    /// may perform substantially more work than a single `ExecuteSlot` step.
    AppendChild(usize),

    /// Execution is complete. If no builder is active, the array in the accompanying
    /// [`ExecutionResult`] is the result. Otherwise, the scheduler finalizes the active
    /// builder and uses that finished array instead.
    ///
    /// The scheduler will continue executing if it has not yet reached the target form.
    Done,
}
679
680impl fmt::Debug for ExecutionStep {
681 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
682 match self {
683 ExecutionStep::ExecuteSlot(idx, _) => f.debug_tuple("ExecuteSlot").field(idx).finish(),
684 ExecutionStep::AppendChild(idx) => f.debug_tuple("AppendChild").field(idx).finish(),
685 ExecutionStep::Done => write!(f, "Done"),
686 }
687 }
688}
689
/// The result of a single execution step on an array encoding.
///
/// Combines an [`ArrayRef`] with an [`ExecutionStep`] to tell the scheduler both what to do next
/// and what array to work with.
pub struct ExecutionResult {
    /// The array the scheduler should continue with.
    array: ArrayRef,
    /// What the scheduler should do with `array` next.
    step: ExecutionStep,
}
698
699impl ExecutionResult {
700 /// Signal that execution is complete with the given result array.
701 pub fn done(result: impl IntoArray) -> Self {
702 Self {
703 array: result.into_array(),
704 step: ExecutionStep::Done,
705 }
706 }
707
708 /// Request execution of slot at `slot_idx` until it matches the given [`Matcher`].
709 ///
710 /// The provided array is the (possibly modified) parent that still needs its slot executed.
711 pub fn execute_slot<M: Matcher>(array: impl IntoArray, slot_idx: usize) -> Self {
712 Self {
713 array: array.into_array(),
714 step: ExecutionStep::ExecuteSlot(slot_idx, M::matches),
715 }
716 }
717
718 /// Request that the child slot at `slot_idx` be detached, appended into the current
719 /// activation's canonical builder, and leave the returned parent as the next
720 /// `current_array`.
721 pub fn append_child(array: impl IntoArray, slot_idx: usize) -> Self {
722 Self {
723 array: array.into_array(),
724 step: ExecutionStep::AppendChild(slot_idx),
725 }
726 }
727
728 /// Returns a reference to the array.
729 pub fn array(&self) -> &ArrayRef {
730 &self.array
731 }
732
733 /// Returns a reference to the step.
734 pub fn step(&self) -> &ExecutionStep {
735 &self.step
736 }
737
738 /// Decompose into parts.
739 pub fn into_parts(self) -> (ArrayRef, ExecutionStep) {
740 (self.array, self.step)
741 }
742}
743
744impl fmt::Debug for ExecutionResult {
745 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
746 f.debug_struct("ExecutionResult")
747 .field("array", &self.array)
748 .field("step", &self.step)
749 .finish()
750 }
751}
752
/// Require that a child array matches `$M`. If the child already matches, evaluates to
/// `$parent` unchanged. Otherwise, early-returns an [`ExecutionResult`] requesting execution of
/// child `$idx` until it matches `$M`.
///
/// On the early-return path `$parent` is cloned, so the expression passed as `$parent` must
/// support `.clone()`. The early return is `return Ok(..)`, so the macro can only be used
/// inside a function whose return type accepts that.
///
/// ```ignore
/// let array = require_child!(array, array.codes(), 0 => Primitive);
/// let array = require_child!(array, array.values(), 1 => AnyCanonical);
/// ```
#[macro_export]
macro_rules! require_child {
    ($parent:expr, $child:expr, $idx:expr => $M:ty) => {{
        if !$child.is::<$M>() {
            return Ok($crate::ExecutionResult::execute_slot::<$M>(
                $parent.clone(),
                $idx,
            ));
        }
        $parent
    }};
}
773
/// Like [`require_child!`], but for optional children. If the child is `None`, this is a no-op.
/// If the child is `Some` but does not match `$M`, early-returns an [`ExecutionResult`] requesting
/// execution of child `$idx`.
///
/// Unlike `require_child!`, this is a statement macro (no value produced) and does not clone
/// `$parent` - it is moved into the early-return path.
///
/// `$child_opt` must be an `Option`-like expression supporting `is_some_and`, whose payload
/// supports `.is::<$M>()`.
///
/// ```ignore
/// require_opt_child!(array, array.patches().map(|p| p.indices()), 1 => Primitive);
/// ```
#[macro_export]
macro_rules! require_opt_child {
    ($parent:expr, $child_opt:expr, $idx:expr => $M:ty) => {
        if $child_opt.is_some_and(|child| !child.is::<$M>()) {
            return Ok($crate::ExecutionResult::execute_slot::<$M>($parent, $idx));
        }
    };
}
792
/// Require that patch slots (indices, values, and optionally chunk_offsets) are `Primitive`.
/// If no patches are present (slots are `None`), this is a no-op.
///
/// The slots are checked in the order indices, values, chunk_offsets; the first one that is
/// occupied but not `Primitive` triggers the early return for that slot.
///
/// Like [`require_opt_child!`], `$parent` is moved (not cloned) into the early-return path.
///
/// ```ignore
/// require_patches!(array, PATCH_INDICES_SLOT, PATCH_VALUES_SLOT, PATCH_CHUNK_OFFSETS_SLOT);
/// ```
#[macro_export]
macro_rules! require_patches {
    ($parent:expr, $indices_slot:expr, $values_slot:expr, $chunk_offsets_slot:expr) => {
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$indices_slot].as_ref(),
            $indices_slot => $crate::arrays::Primitive
        );
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$values_slot].as_ref(),
            $values_slot => $crate::arrays::Primitive
        );
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$chunk_offsets_slot].as_ref(),
            $chunk_offsets_slot => $crate::arrays::Primitive
        );
    };
}
821
/// Require that the validity slot is a [`Bool`](crate::arrays::Bool) array. If validity is not
/// array-backed (e.g. `NonNullable` or `AllValid`), this is a no-op. If it is array-backed but
/// not `Bool`, early-returns an [`ExecutionResult`] requesting execution of the validity slot.
///
/// "Not array-backed" here means the slot at `$idx` is `None`; the macro only inspects
/// `$parent.slots()[$idx]`.
///
/// Like [`require_opt_child!`], `$parent` is moved (not cloned) into the early-return path.
///
/// ```ignore
/// require_validity!(array, VALIDITY_SLOT);
/// ```
#[macro_export]
macro_rules! require_validity {
    ($parent:expr, $idx:expr) => {
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$idx].as_ref(),
            $idx => $crate::arrays::Bool
        );
    };
}
841
/// Extension trait for creating an execution context from a session.
pub trait VortexSessionExecute {
    /// Create a new execution context from this session.
    ///
    /// Each call produces a fresh [`ExecutionCtx`]; see [`ExecutionCtx::new`] for what the
    /// context captures at creation time.
    fn create_execution_ctx(&self) -> ExecutionCtx;
}
847
848impl VortexSessionExecute for VortexSession {
849 fn create_execution_ctx(&self) -> ExecutionCtx {
850 ExecutionCtx::new(self.clone())
851 }
852}
853
#[cfg(test)]
mod tests {
    use vortex_session::VortexSession;

    use super::*;
    use crate::VTable as _;
    use crate::VortexSessionExecute;
    use crate::arrays::Bool;
    use crate::arrays::Primitive;
    use crate::optimizer::kernels::ExecuteParentFn;
    use crate::optimizer::kernels::KernelSession;
    use crate::optimizer::kernels::execute_parent_key;

    /// Execute-parent kernel that never rewrites anything.
    fn noop_execute_parent(
        _child: &ArrayRef,
        _parent: &ArrayRef,
        _child_idx: usize,
        _ctx: &mut ExecutionCtx,
    ) -> VortexResult<Option<ArrayRef>> {
        Ok(None)
    }

    /// A context only sees the execute-parent kernels that were registered before it was
    /// created.
    #[test]
    fn execution_ctx_snapshots_execute_parent_kernels_at_creation() {
        let session = VortexSession::empty().with_some(KernelSession::empty());
        let key = execute_parent_key(Bool.id(), Primitive.id());
        let sees_kernel = |ctx: &ExecutionCtx| ctx.execute_parent_kernels.contains_key(&key);

        // Created before registration: the kernel is not there yet.
        let stale_ctx = session.create_execution_ctx();
        assert!(!sees_kernel(&stale_ctx));

        session.kernels().register_execute_parent(
            Bool.id(),
            Primitive.id(),
            &[noop_execute_parent as ExecuteParentFn],
        );

        // The old context keeps its snapshot and still does not see the kernel.
        assert!(!sees_kernel(&stale_ctx));

        // A context created after registration does.
        let fresh_ctx = session.create_execution_ctx();
        assert!(sees_kernel(&fresh_ctx));
    }
}