vortex_array/executor.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Iterative array execution.
5//!
6//! The single-step [`Executable`] implementation for [`ArrayRef`] tries `reduce`,
7//! `reduce_parent`, `execute_parent`, then `execute` once. The matcher-driven
8//! [`ArrayRef::execute_until`] loop interprets [`ExecutionStep::ExecuteSlot`],
9//! [`ExecutionStep::AppendChild`], and [`ExecutionStep::Done`] using an explicit stack plus an
10//! optional builder, so encodings can advance without recursive descent.
11//!
12//! See <https://docs.vortex.dev/developer-guide/internals/execution> for the full execution
13//! narrative, diagrams, and walkthroughs.
14
15use std::env::VarError;
16use std::fmt;
17use std::fmt::Display;
18use std::sync::LazyLock;
19#[cfg(debug_assertions)]
20use std::sync::atomic::AtomicUsize;
21#[cfg(debug_assertions)]
22use std::sync::atomic::Ordering;
23
24use vortex_error::VortexExpect;
25use vortex_error::VortexResult;
26use vortex_error::vortex_bail;
27use vortex_error::vortex_ensure;
28use vortex_error::vortex_panic;
29use vortex_session::Ref;
30use vortex_session::SessionExt;
31use vortex_session::VortexSession;
32
33use crate::AnyCanonical;
34use crate::ArrayRef;
35use crate::Canonical;
36use crate::IntoArray;
37use crate::array::ArrayId;
38use crate::builders::ArrayBuilder;
39use crate::builders::builder_with_capacity_in;
40use crate::dtype::DType;
41use crate::matcher::Matcher;
42use crate::memory::HostAllocatorRef;
43use crate::memory::MemorySessionExt;
44use crate::optimizer::ArrayOptimizer;
45use crate::optimizer::kernels::ArrayKernels;
46use crate::stats::ArrayStats;
47use crate::stats::StatsSet;
48
49/// Returns the maximum number of iterations to attempt when executing an array before giving up and returning
50/// an error, can be by the `VORTEX_MAX_ITERATIONS` env variables, otherwise defaults to 2^22.
51pub(crate) fn max_iterations() -> usize {
52 static MAX_ITERATIONS: LazyLock<usize> =
53 LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
54 Ok(val) => val.parse::<usize>().unwrap_or_else(|e| {
55 vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")
56 }),
57 Err(VarError::NotPresent) => 2 << 21, // 2 ^ 22
58 Err(VarError::NotUnicode(_)) => {
59 vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
60 }
61 });
62 *MAX_ITERATIONS
63}
64
65/// Marker trait for types that an [`ArrayRef`] can be executed into.
66///
67/// Implementors must provide an implementation of `execute` that takes
68/// an [`ArrayRef`] and an [`ExecutionCtx`], and produces an instance of the
69/// implementor type.
70///
71/// Users should use the `Array::execute` or `Array::execute_as` methods
72pub trait Executable: Sized {
73 fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
74}
75
76#[expect(clippy::same_name_method)]
77impl ArrayRef {
78 /// Execute this array to produce an instance of `E`.
79 ///
80 /// See the [`Executable`] implementation for details on how this execution is performed.
81 pub fn execute<E: Executable>(self, ctx: &mut ExecutionCtx) -> VortexResult<E> {
82 E::execute(self, ctx)
83 }
84
85 /// Execute this array, labeling the execution step with a name for tracing.
86 pub fn execute_as<E: Executable>(
87 self,
88 _name: &'static str,
89 ctx: &mut ExecutionCtx,
90 ) -> VortexResult<E> {
91 E::execute(self, ctx)
92 }
93
94 /// Iteratively execute this array until the [`Matcher`] matches, using an explicit work
95 /// stack plus an optional builder for `AppendChild`.
96 ///
97 /// Note: the returned array may not match `M`. If execution converges to a canonical form
98 /// that does not match `M`, the canonical array is returned since no further execution
99 /// progress is possible.
100 ///
101 /// For safety, this errors once execution reaches a configurable maximum number of
102 /// iterations (default `2^22`, override with `VORTEX_MAX_ITERATIONS`).
103 ///
104 /// # Loop state
105 ///
106 /// - `current_array: ArrayRef` -- the array currently in focus.
107 /// - `current_builder: Option<Box<dyn ArrayBuilder>>` -- active only for builder-mode
108 /// execution. `AppendChild` appends detached children here. `Done` finishes the builder
109 /// and turns it back into the next `current_array`.
110 /// - `stack: Vec<StackFrame>` -- suspended parents from `ExecuteSlot`, including the
111 /// detached slot index, its [`DonePredicate`], and the parent builder that was active
112 /// before focus moved into the child.
113 ///
114 /// Example after `ExecuteSlot(1, pred)` has focused slot 1 of a parent:
115 ///
116 /// ```text
117 /// stack[top].parent_array:
118 /// RunEnd <-- suspended parent
119 /// +-- slot 0: ends
120 /// +-- slot 1: _ (detached)
121 ///
122 /// current_array:
123 /// DictEncoding <-- focused child
124 /// +-- slot 0: codes
125 /// +-- slot 1: dictionary
126 ///
127 /// current_builder:
128 /// None
129 /// ```
130 ///
131 /// Each loop iteration works like this:
132 ///
133 /// ```text
134 /// loop:
135 /// Step 1: done(current_array)?
136 /// - root activation -> return current_array
137 /// - ExecuteSlot frame -> pop, reattach child, resume parent
138 ///
139 /// Step 2: current_builder active?
140 /// - yes -> skip Step 2a / 2b
141 /// - no -> try parent kernels
142 ///
143 /// Step 2a: current_array.execute_parent(stack.top.parent_array)
144 /// child looks up at the suspended parent from ExecuteSlot
145 ///
146 /// Step 2b: for child in current_array.children():
147 /// child.execute_parent(current_array)
148 /// each child looks up at current_array
149 ///
150 /// Step 3: match current_array.execute()
151 /// ExecuteSlot(i, pred) -> push parent on stack, focus child `i`
152 /// AppendChild(i) -> detach child `i`, append it into current_builder,
153 /// keep parent as current_array
154 /// Done -> finish current_builder if present, else use returned array
155 /// ```
156 ///
157 /// Step 2a and Step 2b are skipped while `current_builder` is active. `AppendChild`
158 /// partially consumes `current_array`: some slots already live in the builder, so a
159 /// parent rewrite would observe inconsistent state and could discard accumulated builder
160 /// data.
161 pub fn execute_until<M: Matcher>(self, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
162 let mut current_array = self;
163 let mut current_builder: Option<Box<dyn ArrayBuilder>> = None;
164 let mut stack: Vec<StackFrame> = Vec::new();
165 let max_iterations = max_iterations();
166
167 for _ in 0..max_iterations {
168 let is_done = stack
169 .last()
170 .map_or(M::matches as DonePredicate, |frame| frame.done);
171
172 if is_done(¤t_array) || AnyCanonical::matches(¤t_array) {
173 match stack.pop() {
174 None => {
175 debug_assert!(
176 current_builder.is_none(),
177 "root activation should not retain a builder"
178 );
179 ctx.log(format_args!("-> {}", current_array));
180 return Ok(current_array);
181 }
182 Some(frame) => {
183 (current_array, current_builder) = pop_frame(frame, current_array)?;
184 continue;
185 }
186 }
187 }
188
189 // Step 2a: execute_parent against the suspended parent from ExecuteSlot.
190 //
191 // When executing a child for ExecuteSlot, try execute_parent against
192 // the suspended parent on the stack. This lets kernels like RunEnd's
193 // FilterKernel fire before the child is forced to canonical.
194 //
195 // Skip when a builder is active: the current array has been partially
196 // consumed by AppendChild (some slots are already in the builder), so
197 // a parent rewrite would see inconsistent state and the builder data
198 // would be lost when we restore frame.parent_builder.
199 if current_builder.is_none()
200 && let Some(frame) = stack.last()
201 && let Some(result) =
202 current_array.execute_parent(&frame.parent_array, frame.slot_idx, ctx)?
203 {
204 ctx.log(format_args!(
205 "execute_parent (stack) rewrote {} -> {}",
206 current_array, result
207 ));
208 let frame = stack.pop().vortex_expect("just peeked");
209 current_array = result.optimize_ctx(ctx.session())?;
210 current_builder = frame.parent_builder;
211 continue;
212 }
213
214 // Step 2b: execute_parent against current_array's own children.
215 if current_builder.is_none()
216 && let Some(rewritten) = try_execute_parent(¤t_array, ctx)?
217 {
218 ctx.log(format_args!(
219 "execute_parent rewrote {} -> {}",
220 current_array, rewritten
221 ));
222 current_array = rewritten.optimize_ctx(ctx.session())?;
223 continue;
224 }
225
226 // execute step
227 let expected_len = current_array.len();
228 let expected_dtype = current_array.dtype().clone();
229 let stats = current_array.statistics().to_array_stats();
230 let encoding_id = current_array.encoding_id();
231 let result = current_array.execute_encoding_unchecked(ctx)?;
232 let (array, step) = result.into_parts();
233 match step {
234 ExecutionStep::ExecuteSlot(i, done) => {
235 let (parent, child) = unsafe { array.take_slot_unchecked(i) }?;
236 ctx.log(format_args!(
237 "ExecuteSlot({i}): pushing {}, focusing on {}",
238 parent, child
239 ));
240 stack.push(StackFrame {
241 parent_array: parent,
242 parent_builder: current_builder.take(),
243 slot_idx: i,
244 done,
245 original_dtype: child.dtype().clone(),
246 original_len: child.len(),
247 });
248 current_array = child;
249 current_builder = None;
250 }
251 ExecutionStep::AppendChild(i) => {
252 if current_builder.is_none() {
253 current_builder = Some(builder_with_capacity_in(
254 ctx.allocator(),
255 array.dtype(),
256 array.len(),
257 ));
258 }
259 let (parent, child) = unsafe { array.take_slot_unchecked(i) }?;
260 ctx.log(format_args!(
261 "AppendChild({i}): appending {} into builder",
262 child
263 ));
264 // TODO(joe)[7674]: replace with a builder kernel registry so we don't
265 // need to go through the VTable append_to_builder indirection.
266 child.append_to_builder(
267 current_builder
268 .as_deref_mut()
269 .vortex_expect("builder must exist"),
270 ctx,
271 )?;
272 current_array = parent;
273 }
274 ExecutionStep::Done => {
275 ctx.log(format_args!("Done: {}", array));
276 (current_array, current_builder) = finalize_done(
277 array,
278 current_builder,
279 expected_len,
280 expected_dtype,
281 stats,
282 encoding_id,
283 )?;
284 }
285 }
286 }
287
288 vortex_bail!(
289 "Exceeded maximum execution iterations ({}) while executing array",
290 max_iterations,
291 )
292 }
293}
294
295struct StackFrame {
296 parent_array: ArrayRef,
297 parent_builder: Option<Box<dyn ArrayBuilder>>,
298 slot_idx: usize,
299 done: DonePredicate,
300 original_dtype: DType,
301 original_len: usize,
302}
303
304/// Execution context for batch CPU compute.
305#[derive(Debug, Clone)]
306pub struct ExecutionCtx {
307 session: VortexSession,
308 #[cfg(debug_assertions)]
309 id: usize,
310 #[cfg(debug_assertions)]
311 ops: Vec<String>,
312}
313
314impl ExecutionCtx {
315 /// Create a new execution context with the given session.
316 pub fn new(session: VortexSession) -> Self {
317 Self {
318 session,
319 #[cfg(debug_assertions)]
320 id: {
321 static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
322 EXEC_CTX_ID.fetch_add(1, Ordering::Relaxed)
323 },
324 #[cfg(debug_assertions)]
325 ops: Vec::new(),
326 }
327 }
328
329 /// Get the session associated with this execution context.
330 pub fn session(&self) -> &VortexSession {
331 &self.session
332 }
333
334 /// Get the session-scoped host allocator for this execution context.
335 pub fn allocator(&self) -> HostAllocatorRef {
336 self.session.allocator()
337 }
338
339 /// Log an execution step at the current depth.
340 ///
341 /// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.
342 /// Individual steps are also logged at TRACE level for real-time following.
343 ///
344 /// Use the [`format_args!`] macro to create the `msg` argument.
345 pub fn log(&mut self, msg: fmt::Arguments<'_>) {
346 #[cfg(debug_assertions)]
347 if tracing::enabled!(tracing::Level::DEBUG) {
348 let formatted = format!(" - {msg}");
349 tracing::trace!("exec[{}]: {formatted}", self.id);
350 self.ops.push(formatted);
351 }
352 let _ = msg;
353 }
354}
355
356impl Display for ExecutionCtx {
357 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
358 #[cfg(debug_assertions)]
359 return write!(f, "exec[{}]", self.id);
360 #[cfg(not(debug_assertions))]
361 write!(f, "exec")
362 }
363}
364
365#[cfg(debug_assertions)]
366impl Drop for ExecutionCtx {
367 fn drop(&mut self) {
368 if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
369 // Unlike itertools `.format()` (panics in 0.14 on second format)
370 struct FmtOps<'a>(&'a [String]);
371 impl Display for FmtOps<'_> {
372 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
373 for (i, op) in self.0.iter().enumerate() {
374 if i > 0 {
375 f.write_str("\n")?;
376 }
377 f.write_str(op)?;
378 }
379 Ok(())
380 }
381 }
382 tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
383 }
384 }
385}
386
387/// Single-step execution: takes one step toward canonical form.
388///
389/// Steps through reduce, reduce_parent, execute_parent, then execute. For `ExecuteSlot`,
390/// only a single child execution step is performed — the child is executed once and put back,
391/// making this a lightweight, bounded operation.
392///
393/// **However**, if `execute_step` returns [`ExecutionStep::AppendChild`], this implementation
394/// drives the *entire* array to completion via [`execute_into_builder`] in a single call.
395/// This can do substantially more work than a normal step because it creates a builder and
396/// fully decodes the array into that builder before returning. Callers should be aware that a
397/// single `.execute::<ArrayRef>(ctx)` call may perform O(n_children * decode_cost) work when
398/// `AppendChild` is returned.
399impl Executable for ArrayRef {
400 fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
401 if let Some(canonical) = array.as_opt::<AnyCanonical>() {
402 ctx.log(format_args!("-> canonical {}", array));
403 return Ok(Canonical::from(canonical).into_array());
404 }
405
406 if let Some(reduced) = array.reduce()? {
407 ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
408 reduced.statistics().inherit_from(array.statistics());
409 return Ok(reduced);
410 }
411
412 for (slot_idx, slot) in array.slots().iter().enumerate() {
413 let Some(child) = slot else { continue };
414 if let Some(reduced_parent) = child.reduce_parent(&array, slot_idx)? {
415 ctx.log(format_args!(
416 "reduce_parent: slot[{}]({}) rewrote {} -> {}",
417 slot_idx,
418 child.encoding_id(),
419 array,
420 reduced_parent
421 ));
422 reduced_parent.statistics().inherit_from(array.statistics());
423 return Ok(reduced_parent);
424 }
425 }
426
427 let tmp_session = ctx.session().clone();
428 let kernels = tmp_session.get_opt::<ArrayKernels>();
429
430 for (slot_idx, slot) in array.slots().iter().enumerate() {
431 let Some(child) = slot else { continue };
432 if let Some(executed_parent) =
433 execute_parent_for_child(&array, child, slot_idx, kernels.as_ref(), ctx)?
434 {
435 ctx.log(format_args!(
436 "execute_parent: slot[{}]({}) rewrote {} -> {}",
437 slot_idx,
438 child.encoding_id(),
439 array,
440 executed_parent
441 ));
442 executed_parent
443 .statistics()
444 .inherit_from(array.statistics());
445 return Ok(executed_parent);
446 }
447 }
448
449 ctx.log(format_args!("executing {}", array));
450 let result = array.execute_encoding(ctx)?;
451 let (array, step) = result.into_parts();
452 match step {
453 ExecutionStep::Done => {
454 ctx.log(format_args!("-> {}", array));
455 Ok(array)
456 }
457 ExecutionStep::ExecuteSlot(i, _) => {
458 let child = array.slots()[i].clone().vortex_expect("valid slot index");
459 let executed_child = child.execute::<ArrayRef>(ctx)?;
460 array.with_slot(i, executed_child)
461 }
462 ExecutionStep::AppendChild(_) => {
463 // Single-step: build the entire parent via the builder path.
464 let builder = builder_with_capacity_in(ctx.allocator(), array.dtype(), array.len());
465 let mut builder = execute_into_builder(array, builder, ctx)?;
466 Ok(builder.finish())
467 }
468 }
469 }
470}
471
472/// Execute `array` into the given `builder`.
473///
474/// This uses the encoding's [`crate::array::VTable::append_to_builder`] implementation. Most
475/// encodings use the default path of `execute::<Canonical>` followed by `builder.extend_from_array`,
476/// while encodings like `Chunked` can override that to append child-by-child without materializing
477/// the entire parent.
478///
479/// The builder must have a [`DType`] that is a nullability-superset of `array.dtype()`.
480pub fn execute_into_builder(
481 array: ArrayRef,
482 mut builder: Box<dyn ArrayBuilder>,
483 ctx: &mut ExecutionCtx,
484) -> VortexResult<Box<dyn ArrayBuilder>> {
485 array.append_to_builder(builder.as_mut(), ctx)?;
486 Ok(builder)
487}
488
489/// Pop a stack frame, restoring the parent with the finished child in its slot.
490fn pop_frame(
491 frame: StackFrame,
492 child: ArrayRef,
493) -> VortexResult<(ArrayRef, Option<Box<dyn ArrayBuilder>>)> {
494 debug_assert_eq!(
495 child.dtype(),
496 &frame.original_dtype,
497 "child dtype changed during execution"
498 );
499 debug_assert_eq!(
500 child.len(),
501 frame.original_len,
502 "child len changed during execution"
503 );
504 let parent_array = unsafe { frame.parent_array.put_slot_unchecked(frame.slot_idx, child) }?;
505 Ok((parent_array, frame.parent_builder))
506}
507
508fn finalize_done(
509 result: ArrayRef,
510 mut builder: Option<Box<dyn ArrayBuilder>>,
511 expected_len: usize,
512 expected_dtype: DType,
513 stats: ArrayStats,
514 encoding_id: ArrayId,
515) -> VortexResult<(ArrayRef, Option<Box<dyn ArrayBuilder>>)> {
516 let output = if let Some(mut builder) = builder.take() {
517 builder.finish()
518 } else {
519 result
520 };
521
522 if cfg!(debug_assertions) {
523 vortex_ensure!(
524 output.len() == expected_len,
525 "Result length mismatch for {:?}",
526 encoding_id
527 );
528 vortex_ensure!(
529 output.dtype() == &expected_dtype,
530 "Executed canonical dtype mismatch for {:?}",
531 encoding_id
532 );
533 }
534
535 output
536 .statistics()
537 .set_iter(StatsSet::from(stats).into_iter());
538 Ok((output, None))
539}
540
541fn execute_parent_for_child(
542 parent: &ArrayRef,
543 child: &ArrayRef,
544 slot_idx: usize,
545 kernels: Option<&Ref<ArrayKernels>>,
546 ctx: &mut ExecutionCtx,
547) -> VortexResult<Option<ArrayRef>> {
548 if let Some(kernels) = kernels
549 && let Some(plugins) =
550 kernels.find_execute_parent(parent.encoding_id(), child.encoding_id())
551 {
552 for plugin in plugins.as_ref() {
553 if let Some(result) = plugin(child, parent, slot_idx, ctx)? {
554 return Ok(Some(result));
555 }
556 }
557 }
558
559 child.execute_parent(parent, slot_idx, ctx)
560}
561
562/// Try execute_parent on each occupied slot of the array.
563fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<ArrayRef>> {
564 let tmp_session = ctx.session().clone();
565 let kernels = tmp_session.get_opt::<ArrayKernels>();
566
567 for (slot_idx, slot) in array.slots().iter().enumerate() {
568 let Some(child) = slot else { continue };
569 if let Some(executed_parent) =
570 execute_parent_for_child(array, child, slot_idx, kernels.as_ref(), ctx)?
571 {
572 ctx.log(format_args!(
573 "execute_parent: slot[{}]({}) rewrote {} -> {}",
574 slot_idx,
575 child.encoding_id(),
576 array,
577 executed_parent
578 ));
579 executed_parent
580 .statistics()
581 .inherit_from(array.statistics());
582 return Ok(Some(executed_parent));
583 }
584 }
585 Ok(None)
586}
587
588/// A predicate that determines when an array has reached a desired form during execution.
589pub type DonePredicate = fn(&ArrayRef) -> bool;
590
591/// Scheduler step indicator returned alongside an array in [`ExecutionResult`].
592///
593/// Instead of recursively executing children, encodings return an `ExecutionStep` that tells the
594/// scheduler what to do next. This enables the scheduler to manage execution iteratively using
595/// an explicit work stack plus an optional builder.
596///
597/// # Semantics
598///
599/// Each variant describes a different execution strategy with distinct cost profiles:
600///
601/// - [`Done`](ExecutionStep::Done): The current activation has finished its work. If no builder
602/// is active, the returned array is the result. If a builder is active, the scheduler ignores
603/// the placeholder array and finishes the builder instead. The scheduler may continue
604/// executing if the target form (e.g. canonical) has not yet been reached.
605///
606/// - [`ExecuteSlot`](ExecutionStep::ExecuteSlot): The encoding needs one of its children
607/// decoded before it can make further progress. The scheduler detaches that child, pushes
608/// the parent onto the explicit stack, executes the child until the [`DonePredicate`]
609/// matches, puts it back, and re-enters the parent. This is a cooperative yield: the
610/// encoding does a bounded amount of work per step while the loop tracks the parent-child
611/// relationship explicitly.
612///
613/// - [`AppendChild`](ExecutionStep::AppendChild): The encoding needs one child executed to
614/// canonical form and then appended into a builder owned by the current activation. The
615/// scheduler detaches that child, lazily creates `current_builder` if needed, appends the
616/// child into it, and keeps the parent as `current_array` for the next iteration. While the
617/// builder is active, parent-kernel rewrites are skipped because the parent is partially
618/// consumed. **Important:** in the single-step executor ([`Executable`] for [`ArrayRef`]),
619/// returning `AppendChild` still causes the executor to drive the *entire* array to
620/// completion via [`execute_into_builder`] in one call — this can do significantly more
621/// work than a single `ExecuteSlot` step.
622pub enum ExecutionStep {
623 /// Request that the scheduler execute the slot at the given index, using the provided
624 /// [`DonePredicate`] to determine when the slot is "done", then replace the slot in this
625 /// array and re-enter execution.
626 ///
627 /// Use [`ExecutionResult::execute_slot`] instead of constructing this variant directly.
628 ExecuteSlot(usize, DonePredicate),
629
630 /// Detach the slot at the given index, append that child into the current activation's
631 /// canonical builder, and keep the returned parent as `current_array`.
632 ///
633 /// `Done` finalizes that builder and turns it into the result of the activation.
634 ///
635 /// **Note:** In the single-step executor ([`Executable`] for [`ArrayRef`]), this variant
636 /// drives the entire parent to completion in one call via [`execute_into_builder`], which
637 /// may perform substantially more work than a single `ExecuteSlot` step.
638 AppendChild(usize),
639
640 /// Execution is complete. If no builder is active, the array in the accompanying
641 /// [`ExecutionResult`] is the result. Otherwise, the scheduler finalizes the active
642 /// builder and uses that finished array instead.
643 ///
644 /// The scheduler will continue executing if it has not yet reached the target form.
645 Done,
646}
647
648impl fmt::Debug for ExecutionStep {
649 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
650 match self {
651 ExecutionStep::ExecuteSlot(idx, _) => f.debug_tuple("ExecuteSlot").field(idx).finish(),
652 ExecutionStep::AppendChild(idx) => f.debug_tuple("AppendChild").field(idx).finish(),
653 ExecutionStep::Done => write!(f, "Done"),
654 }
655 }
656}
657
658/// The result of a single execution step on an array encoding.
659///
660/// Combines an [`ArrayRef`] with an [`ExecutionStep`] to tell the scheduler both what to do next
661/// and what array to work with.
662pub struct ExecutionResult {
663 array: ArrayRef,
664 step: ExecutionStep,
665}
666
667impl ExecutionResult {
668 /// Signal that execution is complete with the given result array.
669 pub fn done(result: impl IntoArray) -> Self {
670 Self {
671 array: result.into_array(),
672 step: ExecutionStep::Done,
673 }
674 }
675
676 /// Request execution of slot at `slot_idx` until it matches the given [`Matcher`].
677 ///
678 /// The provided array is the (possibly modified) parent that still needs its slot executed.
679 pub fn execute_slot<M: Matcher>(array: impl IntoArray, slot_idx: usize) -> Self {
680 Self {
681 array: array.into_array(),
682 step: ExecutionStep::ExecuteSlot(slot_idx, M::matches),
683 }
684 }
685
686 /// Request that the child slot at `slot_idx` be detached, appended into the current
687 /// activation's canonical builder, and leave the returned parent as the next
688 /// `current_array`.
689 pub fn append_child(array: impl IntoArray, slot_idx: usize) -> Self {
690 Self {
691 array: array.into_array(),
692 step: ExecutionStep::AppendChild(slot_idx),
693 }
694 }
695
696 /// Returns a reference to the array.
697 pub fn array(&self) -> &ArrayRef {
698 &self.array
699 }
700
701 /// Returns a reference to the step.
702 pub fn step(&self) -> &ExecutionStep {
703 &self.step
704 }
705
706 /// Decompose into parts.
707 pub fn into_parts(self) -> (ArrayRef, ExecutionStep) {
708 (self.array, self.step)
709 }
710}
711
712impl fmt::Debug for ExecutionResult {
713 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
714 f.debug_struct("ExecutionResult")
715 .field("array", &self.array)
716 .field("step", &self.step)
717 .finish()
718 }
719}
720
721/// Require that a child array matches `$M`. If the child already matches, returns the same
722/// array unchanged. Otherwise, early-returns an [`ExecutionResult`] requesting execution of
723/// child `$idx` until it matches `$M`.
724///
725/// ```ignore
726/// let array = require_child!(array, array.codes(), 0 => Primitive);
727/// let array = require_child!(array, array.values(), 1 => AnyCanonical);
728/// ```
729#[macro_export]
730macro_rules! require_child {
731 ($parent:expr, $child:expr, $idx:expr => $M:ty) => {{
732 if !$child.is::<$M>() {
733 return Ok($crate::ExecutionResult::execute_slot::<$M>(
734 $parent.clone(),
735 $idx,
736 ));
737 }
738 $parent
739 }};
740}
741
742/// Like [`require_child!`], but for optional children. If the child is `None`, this is a no-op.
743/// If the child is `Some` but does not match `$M`, early-returns an [`ExecutionResult`] requesting
744/// execution of child `$idx`.
745///
746/// Unlike `require_child!`, this is a statement macro (no value produced) and does not clone
747/// `$parent` - it is moved into the early-return path.
748///
749/// ```ignore
750/// require_opt_child!(array, array.patches().map(|p| p.indices()), 1 => Primitive);
751/// ```
752#[macro_export]
753macro_rules! require_opt_child {
754 ($parent:expr, $child_opt:expr, $idx:expr => $M:ty) => {
755 if $child_opt.is_some_and(|child| !child.is::<$M>()) {
756 return Ok($crate::ExecutionResult::execute_slot::<$M>($parent, $idx));
757 }
758 };
759}
760
761/// Require that patch slots (indices, values, and optionally chunk_offsets) are `Primitive`.
762/// If no patches are present (slots are `None`), this is a no-op.
763///
764/// Like [`require_opt_child!`], `$parent` is moved (not cloned) into the early-return path.
765///
766/// ```ignore
767/// require_patches!(array, PATCH_INDICES_SLOT, PATCH_VALUES_SLOT, PATCH_CHUNK_OFFSETS_SLOT);
768/// ```
769#[macro_export]
770macro_rules! require_patches {
771 ($parent:expr, $indices_slot:expr, $values_slot:expr, $chunk_offsets_slot:expr) => {
772 $crate::require_opt_child!(
773 $parent,
774 $parent.slots()[$indices_slot].as_ref(),
775 $indices_slot => $crate::arrays::Primitive
776 );
777 $crate::require_opt_child!(
778 $parent,
779 $parent.slots()[$values_slot].as_ref(),
780 $values_slot => $crate::arrays::Primitive
781 );
782 $crate::require_opt_child!(
783 $parent,
784 $parent.slots()[$chunk_offsets_slot].as_ref(),
785 $chunk_offsets_slot => $crate::arrays::Primitive
786 );
787 };
788}
789
790/// Require that the validity slot is a [`Bool`](crate::arrays::Bool) array. If validity is not
791/// array-backed (e.g. `NonNullable` or `AllValid`), this is a no-op. If it is array-backed but
792/// not `Bool`, early-returns an [`ExecutionResult`] requesting execution of the validity slot.
793///
794/// Like [`require_opt_child!`], `$parent` is moved (not cloned) into the early-return path.
795///
796/// ```ignore
797/// require_validity!(array, VALIDITY_SLOT);
798/// ```
799#[macro_export]
800macro_rules! require_validity {
801 ($parent:expr, $idx:expr) => {
802 $crate::require_opt_child!(
803 $parent,
804 $parent.slots()[$idx].as_ref(),
805 $idx => $crate::arrays::Bool
806 );
807 };
808}
809
810/// Extension trait for creating an execution context from a session.
811pub trait VortexSessionExecute {
812 /// Create a new execution context from this session.
813 fn create_execution_ctx(&self) -> ExecutionCtx;
814}
815
816impl VortexSessionExecute for VortexSession {
817 fn create_execution_ctx(&self) -> ExecutionCtx {
818 ExecutionCtx::new(self.clone())
819 }
820}