Skip to main content

vortex_array/
executor.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Iterative array execution.
5//!
6//! The single-step [`Executable`] implementation for [`ArrayRef`] tries `reduce`,
7//! `reduce_parent`, `execute_parent`, then `execute` once. The matcher-driven
8//! [`ArrayRef::execute_until`] loop interprets [`ExecutionStep::ExecuteSlot`],
9//! [`ExecutionStep::AppendChild`], and [`ExecutionStep::Done`] using an explicit stack plus an
10//! optional builder, so encodings can advance without recursive descent.
11//!
12//! See <https://docs.vortex.dev/developer-guide/internals/execution> for the full execution
13//! narrative, diagrams, and walkthroughs.
14
15use std::env::VarError;
16use std::fmt;
17use std::fmt::Display;
18use std::sync::Arc;
19use std::sync::LazyLock;
20#[cfg(debug_assertions)]
21use std::sync::atomic::AtomicUsize;
22#[cfg(debug_assertions)]
23use std::sync::atomic::Ordering;
24
25use vortex_error::VortexExpect;
26use vortex_error::VortexResult;
27use vortex_error::vortex_bail;
28use vortex_error::vortex_ensure;
29use vortex_error::vortex_panic;
30use vortex_session::VortexSession;
31
32use crate::AnyCanonical;
33use crate::ArrayRef;
34use crate::Canonical;
35use crate::IntoArray;
36use crate::array::ArrayId;
37use crate::builders::ArrayBuilder;
38use crate::builders::builder_with_capacity_in;
39use crate::dtype::DType;
40use crate::matcher::Matcher;
41use crate::memory::HostAllocatorRef;
42use crate::memory::MemorySessionExt;
43use crate::optimizer::ArrayOptimizer;
44use crate::optimizer::kernels::ArrayKernelsExt;
45use crate::optimizer::kernels::ParentExecutionKernels;
46use crate::optimizer::kernels::execute_parent_key;
47use crate::stats::ArrayStats;
48use crate::stats::StatsSet;
49
50/// Returns the maximum number of iterations to attempt when executing an array before giving up and returning
51/// an error, can be by the `VORTEX_MAX_ITERATIONS` env variables, otherwise defaults to 2^22.
52pub(crate) fn max_iterations() -> usize {
53    static MAX_ITERATIONS: LazyLock<usize> =
54        LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
55            Ok(val) => val.parse::<usize>().unwrap_or_else(|e| {
56                vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")
57            }),
58            Err(VarError::NotPresent) => 2 << 21, // 2 ^ 22
59            Err(VarError::NotUnicode(_)) => {
60                vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
61            }
62        });
63    *MAX_ITERATIONS
64}
65
/// Marker trait for types that an [`ArrayRef`] can be executed into.
///
/// Implementors must provide an implementation of `execute` that takes
/// an [`ArrayRef`] and an [`ExecutionCtx`], and produces an instance of the
/// implementor type.
///
/// Users should normally go through the `execute` / `execute_as` convenience methods
/// (for example [`ArrayRef::execute`] and [`ArrayRef::execute_as`]), which dispatch to this
/// trait, rather than naming the trait method directly.
pub trait Executable: Sized {
    /// Execute `array` within `ctx`, producing an instance of `Self`.
    ///
    /// The array is taken by value. Any failure is reported through the returned
    /// [`VortexResult`].
    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
}
76
// NOTE(review): presumably needed because the inherent `execute` below shares its name with
// `Executable::execute` -- confirm against the lint output.
#[expect(clippy::same_name_method)]
impl ArrayRef {
    /// Execute this array to produce an instance of `E`.
    ///
    /// See the [`Executable`] implementation for details on how this execution is performed.
    pub fn execute<E: Executable>(self, ctx: &mut ExecutionCtx) -> VortexResult<E> {
        E::execute(self, ctx)
    }

    /// Execute this array, labeling the execution step with a name for tracing.
    ///
    /// Note: in this implementation `_name` is not read; the call behaves exactly like
    /// [`ArrayRef::execute`].
    pub fn execute_as<E: Executable>(
        self,
        _name: &'static str,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<E> {
        E::execute(self, ctx)
    }

    /// Iteratively execute this array until the [`Matcher`] matches, using an explicit work
    /// stack plus an optional builder for `AppendChild`.
    ///
    /// Note: the returned array may not match `M`. If execution converges to a canonical form
    /// that does not match `M`, the canonical array is returned since no further execution
    /// progress is possible.
    ///
    /// For safety, this errors once execution reaches a configurable maximum number of
    /// iterations (default `2^22`, override with `VORTEX_MAX_ITERATIONS`).
    ///
    /// # Loop state
    ///
    /// - `current_array: ArrayRef` -- the array currently in focus.
    /// - `current_builder: Option<Box<dyn ArrayBuilder>>` -- active only for builder-mode
    ///   execution. `AppendChild` appends detached children here. `Done` finishes the builder
    ///   and turns it back into the next `current_array`.
    /// - `stack: Vec<StackFrame>` -- suspended parents from `ExecuteSlot`, including the
    ///   detached slot index, its [`DonePredicate`], and the parent builder that was active
    ///   before focus moved into the child.
    ///
    /// Example after `ExecuteSlot(1, pred)` has focused slot 1 of a parent:
    ///
    /// ```text
    ///   stack[top].parent_array:
    ///     RunEnd                          <-- suspended parent
    ///     +-- slot 0: ends
    ///     +-- slot 1: _  (detached)
    ///
    ///   current_array:
    ///     DictEncoding                    <-- focused child
    ///     +-- slot 0: codes
    ///     +-- slot 1: dictionary
    ///
    ///   current_builder:
    ///     None
    /// ```
    ///
    /// Each loop iteration works like this:
    ///
    /// ```text
    /// loop:
    ///   Step 1: done(current_array)?
    ///     - root activation   -> return current_array
    ///     - ExecuteSlot frame -> pop, reattach child, resume parent
    ///
    ///   Step 2: current_builder active?
    ///     - yes -> skip Step 2a / 2b
    ///     - no  -> try parent kernels
    ///
    ///   Step 2a: if stack.top exists:
    ///               parent = stack.top.parent_array
    ///               child = current_array
    ///               kernels[(parent.encoding_id(), child.encoding_id())]
    ///                 .try_execute_parent(child, parent, stack.top.slot_idx)
    ///
    ///   Step 2b: for child in current_array.children():
    ///               parent = current_array
    ///               kernels[(parent.encoding_id(), child.encoding_id())]
    ///                 .try_execute_parent(child, parent, child.slot_idx)
    ///
    ///   Step 3: match current_array.execute()
    ///     ExecuteSlot(i, pred) -> push parent on stack, focus child `i`
    ///     AppendChild(i)       -> detach child `i`, append it into current_builder,
    ///                             keep parent as current_array
    ///     Done                 -> finish current_builder if present, else use returned array
    /// ```
    ///
    /// Step 2a and Step 2b are skipped while `current_builder` is active. `AppendChild`
    /// partially consumes `current_array`: some slots already live in the builder, so a
    /// parent rewrite would observe inconsistent state and could discard accumulated builder
    /// data.
    ///
    /// # Errors
    ///
    /// Returns an error if any kernel, optimizer pass, encoding step, or slot operation
    /// invoked by the loop fails, or if the iteration limit from [`max_iterations`] is
    /// exhausted before the loop returns.
    pub fn execute_until<M: Matcher>(self, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
        let mut current_array = self;
        let mut current_builder: Option<Box<dyn ArrayBuilder>> = None;
        let mut stack: Vec<StackFrame> = Vec::new();
        // Hold our own `Arc` to the kernel registry so that `kernels` does not keep `ctx`
        // borrowed while `ctx` is handed out mutably inside the loop.
        let execute_parent_kernels = Arc::clone(&ctx.execute_parent_kernels);
        let kernels = execute_parent_kernels.as_ref();
        let max_iterations = max_iterations();

        for _ in 0..max_iterations {
            // Step 1: pick the predicate for the current activation. With an empty stack
            // this is the caller's matcher `M`; otherwise it is the predicate recorded by
            // the `ExecuteSlot` request that pushed the top frame.
            let is_done = stack
                .last()
                .map_or(M::matches as DonePredicate, |frame| frame.done);

            // A canonical array also ends the activation, even if the predicate is not met.
            if is_done(&current_array) || AnyCanonical::matches(&current_array) {
                match stack.pop() {
                    None => {
                        debug_assert!(
                            current_builder.is_none(),
                            "root activation should not retain a builder"
                        );
                        ctx.log(format_args!("-> {}", current_array));
                        return Ok(current_array);
                    }
                    Some(frame) => {
                        // Reattach the finished child and resume the suspended parent,
                        // restoring the builder that was active when it was suspended.
                        (current_array, current_builder) = pop_frame(frame, current_array)?;
                        continue;
                    }
                }
            }

            // Step 2a: execute_parent against the suspended parent from ExecuteSlot.
            //
            // When executing a child for ExecuteSlot, try execute_parent against
            // the suspended parent on the stack. This lets kernels like RunEnd's
            // FilterKernel fire before the child is forced to canonical.
            //
            // Skip when a builder is active: the current array has been partially
            // consumed by AppendChild (some slots are already in the builder), so
            // a parent rewrite would see inconsistent state and the builder data
            // would be lost when we restore frame.parent_builder.
            if current_builder.is_none()
                && let Some(frame) = stack.last()
                && let Some(result) = {
                    execute_parent_for_child(
                        &frame.parent_array,
                        &current_array,
                        frame.slot_idx,
                        kernels,
                        ctx,
                    )?
                }
            {
                ctx.log(format_args!(
                    "execute_parent (stack) rewrote {} -> {}",
                    current_array, result
                ));
                // The rewrite replaces the suspended parent as a whole, so its frame is
                // discarded and only its builder is carried forward.
                // NOTE(review): unlike `try_execute_parent`, this path does not call
                // `inherit_from` on the rewritten array's statistics -- confirm intended.
                let frame = stack.pop().vortex_expect("just peeked");
                current_array = result.optimize_ctx(ctx.session())?;
                current_builder = frame.parent_builder;
                continue;
            }

            // Step 2b: execute_parent against current_array's own children.
            if current_builder.is_none()
                && let Some(rewritten) = try_execute_parent(&current_array, kernels, ctx)?
            {
                ctx.log(format_args!(
                    "execute_parent rewrote {} -> {}",
                    current_array, rewritten
                ));
                current_array = rewritten.optimize_ctx(ctx.session())?;
                continue;
            }

            // execute step
            //
            // Step 3: snapshot the properties that `finalize_done` validates / re-applies
            // before asking the encoding for its next step.
            let expected_len = current_array.len();
            let expected_dtype = current_array.dtype().clone();
            let stats = current_array.statistics().to_array_stats();
            let encoding_id = current_array.encoding_id();
            let result = current_array.execute_encoding_unchecked(ctx)?;
            let (array, step) = result.into_parts();
            match step {
                ExecutionStep::ExecuteSlot(i, done) => {
                    // NOTE(review): the safety contract of `take_slot_unchecked` is not
                    // visible in this file; `i` is whatever slot index the encoding
                    // returned -- confirm it is guaranteed to be a valid, occupied slot.
                    let (parent, child) = unsafe { array.take_slot_unchecked(i) }?;
                    ctx.log(format_args!(
                        "ExecuteSlot({i}): pushing {}, focusing on {}",
                        parent, child
                    ));
                    stack.push(StackFrame {
                        parent_array: parent,
                        parent_builder: current_builder.take(),
                        slot_idx: i,
                        done,
                        original_dtype: child.dtype().clone(),
                        original_len: child.len(),
                    });
                    current_array = child;
                    // Already `None` after the `take()` above; kept explicit.
                    current_builder = None;
                }
                ExecutionStep::AppendChild(i) => {
                    // Lazily create the builder on the first `AppendChild` of this
                    // activation, sized from the parent's dtype and length.
                    if current_builder.is_none() {
                        current_builder = Some(builder_with_capacity_in(
                            ctx.allocator(),
                            array.dtype(),
                            array.len(),
                        ));
                    }
                    // NOTE(review): same unverified safety contract as in the
                    // `ExecuteSlot` arm above.
                    let (parent, child) = unsafe { array.take_slot_unchecked(i) }?;
                    ctx.log(format_args!(
                        "AppendChild({i}): appending {} into builder",
                        child
                    ));
                    // TODO(joe)[7674]: replace with a builder kernel registry so we don't
                    // need to go through the VTable append_to_builder indirection.
                    child.append_to_builder(
                        current_builder
                            .as_deref_mut()
                            .vortex_expect("builder must exist"),
                        ctx,
                    )?;
                    current_array = parent;
                }
                ExecutionStep::Done => {
                    ctx.log(format_args!("Done: {}", array));
                    (current_array, current_builder) = finalize_done(
                        array,
                        current_builder,
                        expected_len,
                        expected_dtype,
                        stats,
                        encoding_id,
                    )?;
                }
            }
        }

        // The loop only exits normally when the iteration budget has been used up.
        vortex_bail!(
            "Exceeded maximum execution iterations ({}) while executing array",
            max_iterations,
        )
    }
}
308
/// A parent suspended by [`ExecutionStep::ExecuteSlot`] while one of its slots is executed.
///
/// Frames are pushed by [`ArrayRef::execute_until`] and consumed by `pop_frame`.
struct StackFrame {
    /// The parent array as returned by `take_slot_unchecked`, i.e. with slot `slot_idx`
    /// detached.
    parent_array: ArrayRef,
    /// The builder that was active for the parent when it was suspended, if any.
    parent_builder: Option<Box<dyn ArrayBuilder>>,
    /// Index of the detached slot within `parent_array`.
    slot_idx: usize,
    /// Predicate that decides when the detached child has been executed far enough.
    done: DonePredicate,
    /// Dtype of the child at the moment it was detached; checked by a debug assertion
    /// when the child is put back.
    original_dtype: DType,
    /// Length of the child at the moment it was detached; checked by a debug assertion
    /// when the child is put back.
    original_len: usize,
}
317
/// Execution context for batch CPU compute.
///
/// In builds with `debug_assertions` enabled the context additionally carries an id and a
/// buffer of logged execution steps (see [`ExecutionCtx::log`]).
#[derive(Debug, Clone)]
pub struct ExecutionCtx {
    /// The session this context executes against.
    session: VortexSession,
    /// Execute-parent kernel registry captured from the session when the context was
    /// created.
    execute_parent_kernels: Arc<ParentExecutionKernels>,
    /// Identifier drawn from a process-wide counter; used to label log output.
    #[cfg(debug_assertions)]
    id: usize,
    /// Formatted log lines recorded by `log`.
    #[cfg(debug_assertions)]
    ops: Vec<String>,
}
328
329impl ExecutionCtx {
330    /// Create a new execution context with the given session.
331    ///
332    /// This captures a snapshot of the session's execute-parent kernel registry. Kernels
333    /// registered after this context is created are not visible to it; create a new
334    /// [`ExecutionCtx`] after registration to use newly registered kernels.
335    pub fn new(session: VortexSession) -> Self {
336        let execute_parent_kernels = session.kernels().execute_parent_snapshot();
337        Self {
338            session,
339            execute_parent_kernels,
340            #[cfg(debug_assertions)]
341            id: {
342                static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
343                EXEC_CTX_ID.fetch_add(1, Ordering::Relaxed)
344            },
345            #[cfg(debug_assertions)]
346            ops: Vec::new(),
347        }
348    }
349
350    /// Get the session associated with this execution context.
351    pub fn session(&self) -> &VortexSession {
352        &self.session
353    }
354
355    /// Get the session-scoped host allocator for this execution context.
356    pub fn allocator(&self) -> HostAllocatorRef {
357        self.session.allocator()
358    }
359
360    /// Log an execution step at the current depth.
361    ///
362    /// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.
363    /// Individual steps are also logged at TRACE level for real-time following.
364    ///
365    /// Use the [`format_args!`] macro to create the `msg` argument.
366    pub fn log(&mut self, msg: fmt::Arguments<'_>) {
367        #[cfg(debug_assertions)]
368        if tracing::enabled!(tracing::Level::TRACE) {
369            let formatted = format!(" - {msg}");
370            tracing::trace!("exec[{}]: {formatted}", self.id);
371            self.ops.push(formatted);
372        }
373        let _ = msg;
374    }
375}
376
377impl Display for ExecutionCtx {
378    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
379        #[cfg(debug_assertions)]
380        return write!(f, "exec[{}]", self.id);
381        #[cfg(not(debug_assertions))]
382        write!(f, "exec")
383    }
384}
385
386#[cfg(debug_assertions)]
387impl Drop for ExecutionCtx {
388    fn drop(&mut self) {
389        if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
390            // Unlike itertools `.format()` (panics in 0.14 on second format)
391            struct FmtOps<'a>(&'a [String]);
392            impl Display for FmtOps<'_> {
393                fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
394                    for (i, op) in self.0.iter().enumerate() {
395                        if i > 0 {
396                            f.write_str("\n")?;
397                        }
398                        f.write_str(op)?;
399                    }
400                    Ok(())
401                }
402            }
403            tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
404        }
405    }
406}
407
408/// Single-step execution: takes one step toward canonical form.
409///
410/// Steps through reduce, reduce_parent, execute_parent, then execute. For `ExecuteSlot`,
411/// only a single child execution step is performed — the child is executed once and put back,
412/// making this a lightweight, bounded operation.
413///
414/// **However**, if `execute_step` returns [`ExecutionStep::AppendChild`], this implementation
415/// drives the *entire* array to completion via [`execute_into_builder`] in a single call.
416/// This can do substantially more work than a normal step because it creates a builder and
417/// fully decodes the array into that builder before returning. Callers should be aware that a
418/// single `.execute::<ArrayRef>(ctx)` call may perform O(n_children * decode_cost) work when
419/// `AppendChild` is returned.
420impl Executable for ArrayRef {
421    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
422        if let Some(canonical) = array.as_opt::<AnyCanonical>() {
423            ctx.log(format_args!("-> canonical {}", array));
424            return Ok(Canonical::from(canonical).into_array());
425        }
426
427        if let Some(reduced) = array.reduce()? {
428            ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
429            reduced.statistics().inherit_from(array.statistics());
430            return Ok(reduced);
431        }
432
433        for (slot_idx, slot) in array.slots().iter().enumerate() {
434            let Some(child) = slot else { continue };
435            if let Some(reduced_parent) = child.reduce_parent(&array, slot_idx)? {
436                ctx.log(format_args!(
437                    "reduce_parent: slot[{}]({}) rewrote {} -> {}",
438                    slot_idx,
439                    child.encoding_id(),
440                    array,
441                    reduced_parent
442                ));
443                reduced_parent.statistics().inherit_from(array.statistics());
444                return Ok(reduced_parent);
445            }
446        }
447
448        let execute_parent_kernels = Arc::clone(&ctx.execute_parent_kernels);
449        let kernels = execute_parent_kernels.as_ref();
450
451        for (slot_idx, slot) in array.slots().iter().enumerate() {
452            let Some(child) = slot else { continue };
453            if let Some(executed_parent) =
454                execute_parent_for_child(&array, child, slot_idx, kernels, ctx)?
455            {
456                ctx.log(format_args!(
457                    "execute_parent: slot[{}]({}) rewrote {} -> {}",
458                    slot_idx,
459                    child.encoding_id(),
460                    array,
461                    executed_parent
462                ));
463                executed_parent
464                    .statistics()
465                    .inherit_from(array.statistics());
466                return Ok(executed_parent);
467            }
468        }
469
470        ctx.log(format_args!("executing {}", array));
471        let result = array.execute_encoding(ctx)?;
472        let (array, step) = result.into_parts();
473        match step {
474            ExecutionStep::Done => {
475                ctx.log(format_args!("-> {}", array));
476                Ok(array)
477            }
478            ExecutionStep::ExecuteSlot(i, _) => {
479                let child = array.slots()[i].clone().vortex_expect("valid slot index");
480                let executed_child = child.execute::<ArrayRef>(ctx)?;
481                // SAFETY: execution of a child slot produces a logically equivalent array in a
482                // different physical representation, preserving parent values and statistics.
483                unsafe { array.with_slot(i, executed_child) }
484            }
485            ExecutionStep::AppendChild(_) => {
486                // Single-step: build the entire parent via the builder path.
487                let builder = builder_with_capacity_in(ctx.allocator(), array.dtype(), array.len());
488                let mut builder = execute_into_builder(array, builder, ctx)?;
489                Ok(builder.finish())
490            }
491        }
492    }
493}
494
495/// Execute `array` into the given `builder`.
496///
497/// This uses the encoding's [`crate::array::VTable::append_to_builder`] implementation. Most
498/// encodings use the default path of `execute::<Canonical>` followed by `builder.extend_from_array`,
499/// while encodings like `Chunked` can override that to append child-by-child without materializing
500/// the entire parent.
501///
502/// The builder must have a [`DType`] that is a nullability-superset of `array.dtype()`.
503pub fn execute_into_builder(
504    array: ArrayRef,
505    mut builder: Box<dyn ArrayBuilder>,
506    ctx: &mut ExecutionCtx,
507) -> VortexResult<Box<dyn ArrayBuilder>> {
508    array.append_to_builder(builder.as_mut(), ctx)?;
509    Ok(builder)
510}
511
512/// Pop a stack frame, restoring the parent with the finished child in its slot.
513fn pop_frame(
514    frame: StackFrame,
515    child: ArrayRef,
516) -> VortexResult<(ArrayRef, Option<Box<dyn ArrayBuilder>>)> {
517    debug_assert_eq!(
518        child.dtype(),
519        &frame.original_dtype,
520        "child dtype changed during execution"
521    );
522    debug_assert_eq!(
523        child.len(),
524        frame.original_len,
525        "child len changed during execution"
526    );
527    let parent_array = unsafe { frame.parent_array.put_slot_unchecked(frame.slot_idx, child) }?;
528    Ok((parent_array, frame.parent_builder))
529}
530
531fn finalize_done(
532    result: ArrayRef,
533    mut builder: Option<Box<dyn ArrayBuilder>>,
534    expected_len: usize,
535    expected_dtype: DType,
536    stats: ArrayStats,
537    encoding_id: ArrayId,
538) -> VortexResult<(ArrayRef, Option<Box<dyn ArrayBuilder>>)> {
539    let output = if let Some(mut builder) = builder.take() {
540        builder.finish()
541    } else {
542        result
543    };
544
545    if cfg!(debug_assertions) {
546        vortex_ensure!(
547            output.len() == expected_len,
548            "Result length mismatch for {:?}",
549            encoding_id
550        );
551        vortex_ensure!(
552            output.dtype() == &expected_dtype,
553            "Executed canonical dtype mismatch for {:?}",
554            encoding_id
555        );
556    }
557
558    output
559        .statistics()
560        .set_iter(StatsSet::from(stats).into_iter());
561    Ok((output, None))
562}
563
564fn execute_parent_for_child(
565    parent: &ArrayRef,
566    child: &ArrayRef,
567    slot_idx: usize,
568    kernels: &ParentExecutionKernels,
569    ctx: &mut ExecutionCtx,
570) -> VortexResult<Option<ArrayRef>> {
571    let key = execute_parent_key(parent.encoding_id(), child.encoding_id());
572    if let Some(plugins) = kernels.get(&key) {
573        for plugin in plugins.as_ref() {
574            if let Some(result) = plugin.execute_parent(child, parent, slot_idx, ctx)? {
575                if cfg!(debug_assertions) {
576                    vortex_ensure!(
577                        result.len() == parent.len(),
578                        "Executed parent canonical length mismatch"
579                    );
580                    vortex_ensure!(
581                        result.dtype() == parent.dtype(),
582                        "Executed parent canonical dtype mismatch"
583                    );
584                }
585                return Ok(Some(result));
586            }
587        }
588    }
589
590    Ok(None)
591}
592
593/// Try execute_parent on each occupied slot of the array.
594fn try_execute_parent(
595    array: &ArrayRef,
596    kernels: &ParentExecutionKernels,
597    ctx: &mut ExecutionCtx,
598) -> VortexResult<Option<ArrayRef>> {
599    for (slot_idx, slot) in array.slots().iter().enumerate() {
600        let Some(child) = slot else { continue };
601        if let Some(executed_parent) =
602            execute_parent_for_child(array, child, slot_idx, kernels, ctx)?
603        {
604            ctx.log(format_args!(
605                "execute_parent: slot[{}]({}) rewrote {} -> {}",
606                slot_idx,
607                child.encoding_id(),
608                array,
609                executed_parent
610            ));
611            executed_parent
612                .statistics()
613                .inherit_from(array.statistics());
614            return Ok(Some(executed_parent));
615        }
616    }
617    Ok(None)
618}
619
/// A predicate that determines when an array has reached a desired form during execution.
///
/// This is a plain function pointer rather than a closure. In this file it is obtained from
/// a [`Matcher`]'s `matches` function (see [`ExecutionResult::execute_slot`]).
pub type DonePredicate = fn(&ArrayRef) -> bool;
622
/// Scheduler step indicator returned alongside an array in [`ExecutionResult`].
///
/// Instead of recursively executing children, encodings return an `ExecutionStep` that tells the
/// scheduler what to do next. This enables the scheduler to manage execution iteratively using
/// an explicit work stack plus an optional builder.
///
/// # Semantics
///
/// Each variant describes a different execution strategy with distinct cost profiles:
///
/// - [`Done`](ExecutionStep::Done): The current activation has finished its work. If no builder
///   is active, the returned array is the result. If a builder is active, the scheduler ignores
///   the placeholder array and finishes the builder instead. The scheduler may continue
///   executing if the target form (e.g. canonical) has not yet been reached.
///
/// - [`ExecuteSlot`](ExecutionStep::ExecuteSlot): The encoding needs one of its children
///   decoded before it can make further progress. The scheduler detaches that child, pushes
///   the parent onto the explicit stack, executes the child until the [`DonePredicate`]
///   matches, puts it back, and re-enters the parent. This is a cooperative yield: the
///   encoding does a bounded amount of work per step while the loop tracks the parent-child
///   relationship explicitly.
///
/// - [`AppendChild`](ExecutionStep::AppendChild): The encoding needs one child executed to
///   canonical form and then appended into a builder owned by the current activation. The
///   scheduler detaches that child, lazily creates `current_builder` if needed, appends the
///   child into it, and keeps the parent as `current_array` for the next iteration. While the
///   builder is active, parent-kernel rewrites are skipped because the parent is partially
///   consumed. **Important:** in the single-step executor ([`Executable`] for [`ArrayRef`]),
///   returning `AppendChild` still causes the executor to drive the *entire* array to
///   completion via [`execute_into_builder`] in one call — this can do significantly more
///   work than a single `ExecuteSlot` step.
///
/// # Debug output
///
/// The manual [`fmt::Debug`] implementation prints the variant name and slot index only;
/// the [`DonePredicate`] of `ExecuteSlot` is omitted.
pub enum ExecutionStep {
    /// Request that the scheduler execute the slot at the given index, using the provided
    /// [`DonePredicate`] to determine when the slot is "done", then replace the slot in this
    /// array and re-enter execution.
    ///
    /// The fields are, in order, the slot index and the predicate.
    ///
    /// Use [`ExecutionResult::execute_slot`] instead of constructing this variant directly.
    ExecuteSlot(usize, DonePredicate),

    /// Detach the slot at the given index, append that child into the current activation's
    /// canonical builder, and keep the returned parent as `current_array`.
    ///
    /// `Done` finalizes that builder and turns it into the result of the activation.
    ///
    /// **Note:** In the single-step executor ([`Executable`] for [`ArrayRef`]), this variant
    /// drives the entire parent to completion in one call via [`execute_into_builder`], which
    /// may perform substantially more work than a single `ExecuteSlot` step.
    ///
    /// Use [`ExecutionResult::append_child`] to construct a result carrying this variant.
    AppendChild(usize),

    /// Execution is complete. If no builder is active, the array in the accompanying
    /// [`ExecutionResult`] is the result. Otherwise, the scheduler finalizes the active
    /// builder and uses that finished array instead.
    ///
    /// The scheduler will continue executing if it has not yet reached the target form.
    Done,
}
679
680impl fmt::Debug for ExecutionStep {
681    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
682        match self {
683            ExecutionStep::ExecuteSlot(idx, _) => f.debug_tuple("ExecuteSlot").field(idx).finish(),
684            ExecutionStep::AppendChild(idx) => f.debug_tuple("AppendChild").field(idx).finish(),
685            ExecutionStep::Done => write!(f, "Done"),
686        }
687    }
688}
689
/// The result of a single execution step on an array encoding.
///
/// Combines an [`ArrayRef`] with an [`ExecutionStep`] to tell the scheduler both what to do next
/// and what array to work with.
///
/// Construct values with [`ExecutionResult::done`], [`ExecutionResult::execute_slot`] or
/// [`ExecutionResult::append_child`].
pub struct ExecutionResult {
    /// The array the step applies to.
    array: ArrayRef,
    /// What the scheduler should do next with `array`.
    step: ExecutionStep,
}
698
699impl ExecutionResult {
700    /// Signal that execution is complete with the given result array.
701    pub fn done(result: impl IntoArray) -> Self {
702        Self {
703            array: result.into_array(),
704            step: ExecutionStep::Done,
705        }
706    }
707
708    /// Request execution of slot at `slot_idx` until it matches the given [`Matcher`].
709    ///
710    /// The provided array is the (possibly modified) parent that still needs its slot executed.
711    pub fn execute_slot<M: Matcher>(array: impl IntoArray, slot_idx: usize) -> Self {
712        Self {
713            array: array.into_array(),
714            step: ExecutionStep::ExecuteSlot(slot_idx, M::matches),
715        }
716    }
717
718    /// Request that the child slot at `slot_idx` be detached, appended into the current
719    /// activation's canonical builder, and leave the returned parent as the next
720    /// `current_array`.
721    pub fn append_child(array: impl IntoArray, slot_idx: usize) -> Self {
722        Self {
723            array: array.into_array(),
724            step: ExecutionStep::AppendChild(slot_idx),
725        }
726    }
727
728    /// Returns a reference to the array.
729    pub fn array(&self) -> &ArrayRef {
730        &self.array
731    }
732
733    /// Returns a reference to the step.
734    pub fn step(&self) -> &ExecutionStep {
735        &self.step
736    }
737
738    /// Decompose into parts.
739    pub fn into_parts(self) -> (ArrayRef, ExecutionStep) {
740        (self.array, self.step)
741    }
742}
743
744impl fmt::Debug for ExecutionResult {
745    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
746        f.debug_struct("ExecutionResult")
747            .field("array", &self.array)
748            .field("step", &self.step)
749            .finish()
750    }
751}
752
/// Ensure that a child array matches `$M` before the caller carries on.
///
/// When `$child` already matches `$M`, the macro evaluates to `$parent` unchanged. Otherwise it
/// returns early from the enclosing function with an [`ExecutionResult`] that asks for child
/// `$idx` of (a clone of) `$parent` to be executed until it matches `$M`.
///
/// ```ignore
/// let array = require_child!(array, array.codes(), 0 => Primitive);
/// let array = require_child!(array, array.values(), 1 => AnyCanonical);
/// ```
#[macro_export]
macro_rules! require_child {
    ($parent:expr, $child:expr, $idx:expr => $M:ty) => {{
        if $child.is::<$M>() {
            // Already in the required form: hand the parent straight back.
            $parent
        } else {
            let request = $crate::ExecutionResult::execute_slot::<$M>($parent.clone(), $idx);
            return Ok(request);
        }
    }};
}
773
/// Optional-child counterpart of [`require_child!`]. A `None` child is a no-op, and so is a
/// `Some` child that already matches `$M`. A `Some` child that does not match `$M` makes the
/// enclosing function return early with an [`ExecutionResult`] requesting execution of child
/// `$idx`.
///
/// In contrast to `require_child!`, this macro is used as a statement (it yields no value) and
/// never clones `$parent`: the parent is moved into the early-return path.
///
/// ```ignore
/// require_opt_child!(array, array.patches().map(|p| p.indices()), 1 => Primitive);
/// ```
#[macro_export]
macro_rules! require_opt_child {
    ($parent:expr, $child_opt:expr, $idx:expr => $M:ty) => {
        if $child_opt.is_some_and(|slot| !slot.is::<$M>()) {
            let request = $crate::ExecutionResult::execute_slot::<$M>($parent, $idx);
            return Ok(request);
        }
    };
}
792
/// Require that patch slots (indices, values, and optionally chunk_offsets) are `Primitive`.
/// If no patches are present (slots are `None`), this is a no-op.
///
/// Expands to three consecutive [`require_opt_child!`] checks, in the order indices, values,
/// chunk offsets. The first slot that is `Some` but not `Primitive` causes an early return of an
/// [`ExecutionResult`] requesting execution of that slot, so the remaining slots are not checked
/// in that pass.
///
/// Like [`require_opt_child!`], `$parent` is moved (not cloned) into the early-return path.
/// `$parent` and every slot expression appear more than once in the expansion, so pass plain
/// bindings and constants rather than expressions with side effects.
///
/// ```ignore
/// require_patches!(array, PATCH_INDICES_SLOT, PATCH_VALUES_SLOT, PATCH_CHUNK_OFFSETS_SLOT);
/// ```
#[macro_export]
macro_rules! require_patches {
    ($parent:expr, $indices_slot:expr, $values_slot:expr, $chunk_offsets_slot:expr) => {
        // Patch indices.
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$indices_slot].as_ref(),
            $indices_slot => $crate::arrays::Primitive
        );
        // Patch values.
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$values_slot].as_ref(),
            $values_slot => $crate::arrays::Primitive
        );
        // Patch chunk offsets.
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$chunk_offsets_slot].as_ref(),
            $chunk_offsets_slot => $crate::arrays::Primitive
        );
    };
}
821
/// Require that the validity slot is a [`Bool`](crate::arrays::Bool) array. If validity is not
/// array-backed (e.g. `NonNullable` or `AllValid`), this is a no-op. If it is array-backed but
/// not `Bool`, early-returns an [`ExecutionResult`] requesting execution of the validity slot.
///
/// This is a thin wrapper: it forwards slot `$idx` of `$parent.slots()` to
/// [`require_opt_child!`] with `Bool` as the required matcher.
///
/// Like [`require_opt_child!`], `$parent` is moved (not cloned) into the early-return path.
/// `$parent` and `$idx` each appear more than once in the expansion, so pass a plain binding and
/// a constant rather than expressions with side effects.
///
/// ```ignore
/// require_validity!(array, VALIDITY_SLOT);
/// ```
#[macro_export]
macro_rules! require_validity {
    ($parent:expr, $idx:expr) => {
        // The slot is looked up by index; `None` makes the underlying check a no-op.
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$idx].as_ref(),
            $idx => $crate::arrays::Bool
        );
    };
}
841
/// Extension trait for creating an execution context from a session.
///
/// Implemented in this file for [`VortexSession`].
pub trait VortexSessionExecute {
    /// Create a new execution context from this session.
    ///
    /// Takes the session by shared reference, so the caller keeps ownership of it.
    fn create_execution_ctx(&self) -> ExecutionCtx;
}
847
848impl VortexSessionExecute for VortexSession {
849    fn create_execution_ctx(&self) -> ExecutionCtx {
850        ExecutionCtx::new(self.clone())
851    }
852}
853
#[cfg(test)]
mod tests {
    use vortex_session::VortexSession;

    use super::*;
    use crate::VTable as _;
    use crate::VortexSessionExecute;
    use crate::arrays::Bool;
    use crate::arrays::Primitive;
    use crate::optimizer::kernels::ExecuteParentFn;
    use crate::optimizer::kernels::KernelSession;
    use crate::optimizer::kernels::execute_parent_key;

    /// Execute-parent kernel that ignores all of its inputs and always reports `Ok(None)`.
    fn noop_execute_parent(
        _child: &ArrayRef,
        _parent: &ArrayRef,
        _child_idx: usize,
        _ctx: &mut ExecutionCtx,
    ) -> VortexResult<Option<ArrayRef>> {
        Ok(None)
    }

    /// A context only contains the execute-parent kernels that were registered on the session
    /// at the moment the context was created.
    #[test]
    fn execution_ctx_snapshots_execute_parent_kernels_at_creation() {
        let session = VortexSession::empty().with_some(KernelSession::empty());
        let key = execute_parent_key(Bool.id(), Primitive.id());
        let sees_kernel = |ctx: &ExecutionCtx| ctx.execute_parent_kernels.contains_key(&key);

        // Context taken before anything is registered: the key must be absent.
        let early_ctx = session.create_execution_ctx();
        assert!(!sees_kernel(&early_ctx));

        let registry = session.kernels();
        let noop = noop_execute_parent as ExecuteParentFn;
        registry.register_execute_parent(Bool.id(), Primitive.id(), &[noop]);

        // Registering afterwards must not leak into the already-created context.
        assert!(!sees_kernel(&early_ctx));

        // A context created after registration does contain the key.
        let late_ctx = session.create_execution_ctx();
        assert!(sees_kernel(&late_ctx));
    }
}