Skip to main content

vortex_array/
executor.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Iterative array execution.
5//!
6//! The single-step [`Executable`] implementation for [`ArrayRef`] tries `reduce`,
7//! `reduce_parent`, `execute_parent`, then `execute` once. The matcher-driven
8//! [`ArrayRef::execute_until`] loop interprets [`ExecutionStep::ExecuteSlot`],
9//! [`ExecutionStep::AppendChild`], and [`ExecutionStep::Done`] using an explicit stack plus an
10//! optional builder, so encodings can advance without recursive descent.
11//!
12//! See <https://docs.vortex.dev/developer-guide/internals/execution> for the full execution
13//! narrative, diagrams, and walkthroughs.
14
15use std::env::VarError;
16use std::fmt;
17use std::fmt::Display;
18use std::sync::LazyLock;
19
20use vortex_error::VortexExpect;
21use vortex_error::VortexResult;
22use vortex_error::vortex_bail;
23use vortex_error::vortex_ensure;
24use vortex_error::vortex_panic;
25use vortex_session::VortexSession;
26
27use crate::AnyCanonical;
28use crate::ArrayRef;
29use crate::Canonical;
30use crate::IntoArray;
31use crate::array::ArrayId;
32use crate::builders::ArrayBuilder;
33use crate::builders::builder_with_capacity_in;
34use crate::dtype::DType;
35use crate::matcher::Matcher;
36use crate::memory::HostAllocatorRef;
37use crate::memory::MemorySessionExt;
38use crate::optimizer::ArrayOptimizer;
39use crate::stats::ArrayStats;
40use crate::stats::StatsSet;
41
42/// Returns the maximum number of iterations to attempt when executing an array before giving up and returning
43/// an error, can be by the `VORTEX_MAX_ITERATIONS` env variables, otherwise defaults to 2^22.
44pub(crate) fn max_iterations() -> usize {
45    static MAX_ITERATIONS: LazyLock<usize> =
46        LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
47            Ok(val) => val.parse::<usize>().unwrap_or_else(|e| {
48                vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")
49            }),
50            Err(VarError::NotPresent) => 2 << 21, // 2 ^ 22
51            Err(VarError::NotUnicode(_)) => {
52                vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
53            }
54        });
55    *MAX_ITERATIONS
56}
57
58/// Marker trait for types that an [`ArrayRef`] can be executed into.
59///
60/// Implementors must provide an implementation of `execute` that takes
61/// an [`ArrayRef`] and an [`ExecutionCtx`], and produces an instance of the
62/// implementor type.
63///
64/// Users should use the `Array::execute` or `Array::execute_as` methods
65pub trait Executable: Sized {
66    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
67}
68
69#[expect(clippy::same_name_method)]
70impl ArrayRef {
71    /// Execute this array to produce an instance of `E`.
72    ///
73    /// See the [`Executable`] implementation for details on how this execution is performed.
74    pub fn execute<E: Executable>(self, ctx: &mut ExecutionCtx) -> VortexResult<E> {
75        E::execute(self, ctx)
76    }
77
78    /// Execute this array, labeling the execution step with a name for tracing.
79    pub fn execute_as<E: Executable>(
80        self,
81        _name: &'static str,
82        ctx: &mut ExecutionCtx,
83    ) -> VortexResult<E> {
84        E::execute(self, ctx)
85    }
86
87    /// Iteratively execute this array until the [`Matcher`] matches, using an explicit work
88    /// stack plus an optional builder for `AppendChild`.
89    ///
90    /// Note: the returned array may not match `M`. If execution converges to a canonical form
91    /// that does not match `M`, the canonical array is returned since no further execution
92    /// progress is possible.
93    ///
94    /// For safety, this errors once execution reaches a configurable maximum number of
95    /// iterations (default `2^22`, override with `VORTEX_MAX_ITERATIONS`).
96    ///
97    /// # Loop state
98    ///
99    /// - `current_array: ArrayRef` -- the array currently in focus.
100    /// - `current_builder: Option<Box<dyn ArrayBuilder>>` -- active only for builder-mode
101    ///   execution. `AppendChild` appends detached children here. `Done` finishes the builder
102    ///   and turns it back into the next `current_array`.
103    /// - `stack: Vec<StackFrame>` -- suspended parents from `ExecuteSlot`, including the
104    ///   detached slot index, its [`DonePredicate`], and the parent builder that was active
105    ///   before focus moved into the child.
106    ///
107    /// Example after `ExecuteSlot(1, pred)` has focused slot 1 of a parent:
108    ///
109    /// ```text
110    ///   stack[top].parent_array:
111    ///     RunEnd                          <-- suspended parent
112    ///     +-- slot 0: ends
113    ///     +-- slot 1: _  (detached)
114    ///
115    ///   current_array:
116    ///     DictEncoding                    <-- focused child
117    ///     +-- slot 0: codes
118    ///     +-- slot 1: dictionary
119    ///
120    ///   current_builder:
121    ///     None
122    /// ```
123    ///
124    /// Each loop iteration works like this:
125    ///
126    /// ```text
127    /// loop:
128    ///   Step 1: done(current_array)?
129    ///     - root activation   -> return current_array
130    ///     - ExecuteSlot frame -> pop, reattach child, resume parent
131    ///
132    ///   Step 2: current_builder active?
133    ///     - yes -> skip Step 2a / 2b
134    ///     - no  -> try parent kernels
135    ///
136    ///   Step 2a: current_array.execute_parent(stack.top.parent_array)
137    ///     child looks up at the suspended parent from ExecuteSlot
138    ///
139    ///   Step 2b: for child in current_array.children():
140    ///               child.execute_parent(current_array)
141    ///     each child looks up at current_array
142    ///
143    ///   Step 3: match current_array.execute()
144    ///     ExecuteSlot(i, pred) -> push parent on stack, focus child `i`
145    ///     AppendChild(i)       -> detach child `i`, append it into current_builder,
146    ///                             keep parent as current_array
147    ///     Done                 -> finish current_builder if present, else use returned array
148    /// ```
149    ///
150    /// Step 2a and Step 2b are skipped while `current_builder` is active. `AppendChild`
151    /// partially consumes `current_array`: some slots already live in the builder, so a
152    /// parent rewrite would observe inconsistent state and could discard accumulated builder
153    /// data.
154    pub fn execute_until<M: Matcher>(self, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
155        let mut current_array = self;
156        let mut current_builder: Option<Box<dyn ArrayBuilder>> = None;
157        let mut stack: Vec<StackFrame> = Vec::new();
158        let max_iterations = max_iterations();
159
160        for _ in 0..max_iterations {
161            let is_done = stack
162                .last()
163                .map_or(M::matches as DonePredicate, |frame| frame.done);
164
165            if is_done(&current_array) || AnyCanonical::matches(&current_array) {
166                match stack.pop() {
167                    None => {
168                        debug_assert!(
169                            current_builder.is_none(),
170                            "root activation should not retain a builder"
171                        );
172                        ctx.log(format_args!("-> {}", current_array));
173                        return Ok(current_array);
174                    }
175                    Some(frame) => {
176                        (current_array, current_builder) = pop_frame(frame, current_array)?;
177                        continue;
178                    }
179                }
180            }
181
182            // Step 2a: execute_parent against the suspended parent from ExecuteSlot.
183            //
184            // When executing a child for ExecuteSlot, try execute_parent against
185            // the suspended parent on the stack. This lets kernels like RunEnd's
186            // FilterKernel fire before the child is forced to canonical.
187            //
188            // Skip when a builder is active: the current array has been partially
189            // consumed by AppendChild (some slots are already in the builder), so
190            // a parent rewrite would see inconsistent state and the builder data
191            // would be lost when we restore frame.parent_builder.
192            if current_builder.is_none()
193                && let Some(frame) = stack.last()
194                && let Some(result) =
195                    current_array.execute_parent(&frame.parent_array, frame.slot_idx, ctx)?
196            {
197                ctx.log(format_args!(
198                    "execute_parent (stack) rewrote {} -> {}",
199                    current_array, result
200                ));
201                let frame = stack.pop().vortex_expect("just peeked");
202                current_array = result.optimize_ctx(ctx.session())?;
203                current_builder = frame.parent_builder;
204                continue;
205            }
206
207            // Step 2b: execute_parent against current_array's own children.
208            if current_builder.is_none()
209                && let Some(rewritten) = try_execute_parent(&current_array, ctx)?
210            {
211                ctx.log(format_args!(
212                    "execute_parent rewrote {} -> {}",
213                    current_array, rewritten
214                ));
215                current_array = rewritten.optimize_ctx(ctx.session())?;
216                continue;
217            }
218
219            // execute step
220            let expected_len = current_array.len();
221            let expected_dtype = current_array.dtype().clone();
222            let stats = current_array.statistics().to_array_stats();
223            let encoding_id = current_array.encoding_id();
224            let result = current_array.execute_encoding_unchecked(ctx)?;
225            let (array, step) = result.into_parts();
226            match step {
227                ExecutionStep::ExecuteSlot(i, done) => {
228                    let (parent, child) = unsafe { array.take_slot_unchecked(i) }?;
229                    ctx.log(format_args!(
230                        "ExecuteSlot({i}): pushing {}, focusing on {}",
231                        parent, child
232                    ));
233                    stack.push(StackFrame {
234                        parent_array: parent,
235                        parent_builder: current_builder.take(),
236                        slot_idx: i,
237                        done,
238                        original_dtype: child.dtype().clone(),
239                        original_len: child.len(),
240                    });
241                    current_array = child;
242                    current_builder = None;
243                }
244                ExecutionStep::AppendChild(i) => {
245                    if current_builder.is_none() {
246                        current_builder = Some(builder_with_capacity_in(
247                            ctx.allocator(),
248                            array.dtype(),
249                            array.len(),
250                        ));
251                    }
252                    let (parent, child) = unsafe { array.take_slot_unchecked(i) }?;
253                    ctx.log(format_args!(
254                        "AppendChild({i}): appending {} into builder",
255                        child
256                    ));
257                    // TODO(joe)[7674]: replace with a builder kernel registry so we don't
258                    // need to go through the VTable append_to_builder indirection.
259                    child.append_to_builder(
260                        current_builder
261                            .as_deref_mut()
262                            .vortex_expect("builder must exist"),
263                        ctx,
264                    )?;
265                    current_array = parent;
266                }
267                ExecutionStep::Done => {
268                    ctx.log(format_args!("Done: {}", array));
269                    (current_array, current_builder) = finalize_done(
270                        array,
271                        current_builder,
272                        expected_len,
273                        expected_dtype,
274                        stats,
275                        encoding_id,
276                    )?;
277                }
278            }
279        }
280
281        vortex_bail!(
282            "Exceeded maximum execution iterations ({}) while executing array",
283            max_iterations,
284        )
285    }
286}
287
288struct StackFrame {
289    parent_array: ArrayRef,
290    parent_builder: Option<Box<dyn ArrayBuilder>>,
291    slot_idx: usize,
292    done: DonePredicate,
293    original_dtype: DType,
294    original_len: usize,
295}
296
/// Execution context for batch CPU compute.
///
/// Carries the [`VortexSession`] that execution runs against. The session supplies, for example,
/// the host allocator and the session passed to the optimizer. In debug builds the context also
/// carries an id and an accumulated trace of logged steps. The trace is emitted at DEBUG level
/// when the context is dropped.
///
/// NOTE(review): `Clone` copies `id` and `ops` in debug builds, so a clone shares its id with
/// the original, and both will dump their (overlapping) traces on drop.
#[derive(Debug, Clone)]
pub struct ExecutionCtx {
    /// The session this context executes against.
    session: VortexSession,
    /// Id taken from a process-wide counter in `new`; labels trace output (debug builds only).
    #[cfg(debug_assertions)]
    id: usize,
    /// Steps recorded by `log` while DEBUG tracing is enabled (debug builds only).
    #[cfg(debug_assertions)]
    ops: Vec<String>,
}
306
307impl ExecutionCtx {
308    /// Create a new execution context with the given session.
309    pub fn new(session: VortexSession) -> Self {
310        Self {
311            session,
312            #[cfg(debug_assertions)]
313            id: {
314                static EXEC_CTX_ID: std::sync::atomic::AtomicUsize =
315                    std::sync::atomic::AtomicUsize::new(0);
316                EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
317            },
318            #[cfg(debug_assertions)]
319            ops: Vec::new(),
320        }
321    }
322
323    /// Get the session associated with this execution context.
324    pub fn session(&self) -> &VortexSession {
325        &self.session
326    }
327
328    /// Get the session-scoped host allocator for this execution context.
329    pub fn allocator(&self) -> HostAllocatorRef {
330        self.session.allocator()
331    }
332
333    /// Log an execution step at the current depth.
334    ///
335    /// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.
336    /// Individual steps are also logged at TRACE level for real-time following.
337    ///
338    /// Use the [`format_args!`] macro to create the `msg` argument.
339    pub fn log(&mut self, msg: fmt::Arguments<'_>) {
340        #[cfg(debug_assertions)]
341        if tracing::enabled!(tracing::Level::DEBUG) {
342            let formatted = format!(" - {msg}");
343            tracing::trace!("exec[{}]: {formatted}", self.id);
344            self.ops.push(formatted);
345        }
346        let _ = msg;
347    }
348}
349
350impl Display for ExecutionCtx {
351    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
352        #[cfg(debug_assertions)]
353        return write!(f, "exec[{}]", self.id);
354        #[cfg(not(debug_assertions))]
355        write!(f, "exec")
356    }
357}
358
359#[cfg(debug_assertions)]
360impl Drop for ExecutionCtx {
361    fn drop(&mut self) {
362        if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
363            // Unlike itertools `.format()` (panics in 0.14 on second format)
364            struct FmtOps<'a>(&'a [String]);
365            impl Display for FmtOps<'_> {
366                fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
367                    for (i, op) in self.0.iter().enumerate() {
368                        if i > 0 {
369                            f.write_str("\n")?;
370                        }
371                        f.write_str(op)?;
372                    }
373                    Ok(())
374                }
375            }
376            tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
377        }
378    }
379}
380
381/// Single-step execution: takes one step toward canonical form.
382///
383/// Steps through reduce, reduce_parent, execute_parent, then execute. For `ExecuteSlot`,
384/// only a single child execution step is performed — the child is executed once and put back,
385/// making this a lightweight, bounded operation.
386///
387/// **However**, if `execute_step` returns [`ExecutionStep::AppendChild`], this implementation
388/// drives the *entire* array to completion via [`execute_into_builder`] in a single call.
389/// This can do substantially more work than a normal step because it creates a builder and
390/// fully decodes the array into that builder before returning. Callers should be aware that a
391/// single `.execute::<ArrayRef>(ctx)` call may perform O(n_children * decode_cost) work when
392/// `AppendChild` is returned.
393impl Executable for ArrayRef {
394    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
395        if let Some(canonical) = array.as_opt::<AnyCanonical>() {
396            ctx.log(format_args!("-> canonical {}", array));
397            return Ok(Canonical::from(canonical).into_array());
398        }
399
400        if let Some(reduced) = array.reduce()? {
401            ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
402            reduced.statistics().inherit_from(array.statistics());
403            return Ok(reduced);
404        }
405
406        for (slot_idx, slot) in array.slots().iter().enumerate() {
407            let Some(child) = slot else { continue };
408            if let Some(reduced_parent) = child.reduce_parent(&array, slot_idx)? {
409                ctx.log(format_args!(
410                    "reduce_parent: slot[{}]({}) rewrote {} -> {}",
411                    slot_idx,
412                    child.encoding_id(),
413                    array,
414                    reduced_parent
415                ));
416                reduced_parent.statistics().inherit_from(array.statistics());
417                return Ok(reduced_parent);
418            }
419        }
420
421        for (slot_idx, slot) in array.slots().iter().enumerate() {
422            let Some(child) = slot else { continue };
423            if let Some(executed_parent) = child.execute_parent(&array, slot_idx, ctx)? {
424                ctx.log(format_args!(
425                    "execute_parent: slot[{}]({}) rewrote {} -> {}",
426                    slot_idx,
427                    child.encoding_id(),
428                    array,
429                    executed_parent
430                ));
431                executed_parent
432                    .statistics()
433                    .inherit_from(array.statistics());
434                return Ok(executed_parent);
435            }
436        }
437
438        ctx.log(format_args!("executing {}", array));
439        let result = array.execute_encoding(ctx)?;
440        let (array, step) = result.into_parts();
441        match step {
442            ExecutionStep::Done => {
443                ctx.log(format_args!("-> {}", array));
444                Ok(array)
445            }
446            ExecutionStep::ExecuteSlot(i, _) => {
447                let child = array.slots()[i].clone().vortex_expect("valid slot index");
448                let executed_child = child.execute::<ArrayRef>(ctx)?;
449                array.with_slot(i, executed_child)
450            }
451            ExecutionStep::AppendChild(_) => {
452                // Single-step: build the entire parent via the builder path.
453                let builder = builder_with_capacity_in(ctx.allocator(), array.dtype(), array.len());
454                let mut builder = execute_into_builder(array, builder, ctx)?;
455                Ok(builder.finish())
456            }
457        }
458    }
459}
460
461/// Execute `array` into the given `builder`.
462///
463/// This uses the encoding's [`crate::array::VTable::append_to_builder`] implementation. Most
464/// encodings use the default path of `execute::<Canonical>` followed by `builder.extend_from_array`,
465/// while encodings like `Chunked` can override that to append child-by-child without materializing
466/// the entire parent.
467///
468/// The builder must have a [`DType`] that is a nullability-superset of `array.dtype()`.
469pub fn execute_into_builder(
470    array: ArrayRef,
471    mut builder: Box<dyn ArrayBuilder>,
472    ctx: &mut ExecutionCtx,
473) -> VortexResult<Box<dyn ArrayBuilder>> {
474    array.append_to_builder(builder.as_mut(), ctx)?;
475    Ok(builder)
476}
477
478/// Pop a stack frame, restoring the parent with the finished child in its slot.
479fn pop_frame(
480    frame: StackFrame,
481    child: ArrayRef,
482) -> VortexResult<(ArrayRef, Option<Box<dyn ArrayBuilder>>)> {
483    debug_assert_eq!(
484        child.dtype(),
485        &frame.original_dtype,
486        "child dtype changed during execution"
487    );
488    debug_assert_eq!(
489        child.len(),
490        frame.original_len,
491        "child len changed during execution"
492    );
493    let parent_array = unsafe { frame.parent_array.put_slot_unchecked(frame.slot_idx, child) }?;
494    Ok((parent_array, frame.parent_builder))
495}
496
497fn finalize_done(
498    result: ArrayRef,
499    mut builder: Option<Box<dyn ArrayBuilder>>,
500    expected_len: usize,
501    expected_dtype: DType,
502    stats: ArrayStats,
503    encoding_id: ArrayId,
504) -> VortexResult<(ArrayRef, Option<Box<dyn ArrayBuilder>>)> {
505    let output = if let Some(mut builder) = builder.take() {
506        builder.finish()
507    } else {
508        result
509    };
510
511    if cfg!(debug_assertions) {
512        vortex_ensure!(
513            output.len() == expected_len,
514            "Result length mismatch for {:?}",
515            encoding_id
516        );
517        vortex_ensure!(
518            output.dtype() == &expected_dtype,
519            "Executed canonical dtype mismatch for {:?}",
520            encoding_id
521        );
522    }
523
524    output
525        .statistics()
526        .set_iter(StatsSet::from(stats).into_iter());
527    Ok((output, None))
528}
529
530/// Try execute_parent on each occupied slot of the array.
531fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<ArrayRef>> {
532    for (slot_idx, slot) in array.slots().iter().enumerate() {
533        let Some(child) = slot else {
534            continue;
535        };
536        if let Some(result) = child.execute_parent(array, slot_idx, ctx)? {
537            result.statistics().inherit_from(array.statistics());
538            return Ok(Some(result));
539        }
540    }
541    Ok(None)
542}
543
/// A predicate that determines when an array has reached a desired form during execution.
///
/// [`ExecutionStep::ExecuteSlot`] carries one for the detached child. It is typically a
/// matcher's `matches` function, as set up by [`ExecutionResult::execute_slot`]. The root
/// activation of [`ArrayRef::execute_until`] uses `M::matches` for its matcher `M`.
pub type DonePredicate = fn(&ArrayRef) -> bool;
546
/// Scheduler step indicator returned alongside an array in [`ExecutionResult`].
///
/// Instead of recursively executing children, encodings return an `ExecutionStep` that tells the
/// scheduler what to do next. This lets the scheduler ([`ArrayRef::execute_until`]) manage
/// execution iteratively, using an explicit work stack plus an optional builder.
///
/// # Semantics
///
/// Each variant describes a different execution strategy with distinct cost profiles:
///
/// - [`Done`](ExecutionStep::Done): The current activation has finished its work. If no builder
///   is active, the returned array is the result. If a builder is active, the scheduler ignores
///   the placeholder array and finishes the builder instead. The scheduler may continue
///   executing if the target form (e.g. canonical) has not yet been reached.
///
/// - [`ExecuteSlot`](ExecutionStep::ExecuteSlot): The encoding needs one of its children
///   decoded before it can make further progress. The scheduler:
///   1. detaches that child,
///   2. pushes the parent onto the explicit stack,
///   3. executes the child until the [`DonePredicate`] matches,
///   4. puts the child back and re-enters the parent.
///
///   This is a cooperative yield: the encoding does a bounded amount of work per step, while the
///   loop tracks the parent-child relationship explicitly.
///
/// - [`AppendChild`](ExecutionStep::AppendChild): The encoding needs one child appended into a
///   builder owned by the current activation. The append goes through the child's
///   `append_to_builder`, which for most encodings executes it to canonical form first. The
///   scheduler detaches that child, lazily creates `current_builder` if needed, appends the
///   child into it, and keeps the parent as `current_array` for the next iteration. While the
///   builder is active, parent-kernel rewrites are skipped because the parent is partially
///   consumed.
///
///   **Important:** in the single-step executor ([`Executable`] for [`ArrayRef`]), returning
///   `AppendChild` still causes the executor to drive the *entire* array to completion via
///   [`execute_into_builder`] in one call. This can do significantly more work than a single
///   `ExecuteSlot` step.
pub enum ExecutionStep {
    /// Request that the scheduler execute the slot at the given index (first field). The
    /// provided [`DonePredicate`] (second field) determines when the slot is "done"; the
    /// scheduler then replaces the slot in this array and re-enters execution.
    ///
    /// Use [`ExecutionResult::execute_slot`] instead of constructing this variant directly.
    ExecuteSlot(usize, DonePredicate),

    /// Detach the slot at the given index, append that child into the current activation's
    /// canonical builder, and keep the returned parent as `current_array`.
    ///
    /// `Done` finalizes that builder and turns it into the result of the activation.
    ///
    /// **Note:** In the single-step executor ([`Executable`] for [`ArrayRef`]), this variant
    /// drives the entire parent to completion in one call via [`execute_into_builder`], which
    /// may perform substantially more work than a single `ExecuteSlot` step.
    AppendChild(usize),

    /// Execution is complete. If no builder is active, the array in the accompanying
    /// [`ExecutionResult`] is the result. Otherwise, the scheduler finalizes the active
    /// builder and uses that finished array instead.
    ///
    /// The scheduler will continue executing if it has not yet reached the target form.
    Done,
}
603
604impl fmt::Debug for ExecutionStep {
605    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
606        match self {
607            ExecutionStep::ExecuteSlot(idx, _) => f.debug_tuple("ExecuteSlot").field(idx).finish(),
608            ExecutionStep::AppendChild(idx) => f.debug_tuple("AppendChild").field(idx).finish(),
609            ExecutionStep::Done => write!(f, "Done"),
610        }
611    }
612}
613
614/// The result of a single execution step on an array encoding.
615///
616/// Combines an [`ArrayRef`] with an [`ExecutionStep`] to tell the scheduler both what to do next
617/// and what array to work with.
618pub struct ExecutionResult {
619    array: ArrayRef,
620    step: ExecutionStep,
621}
622
623impl ExecutionResult {
624    /// Signal that execution is complete with the given result array.
625    pub fn done(result: impl IntoArray) -> Self {
626        Self {
627            array: result.into_array(),
628            step: ExecutionStep::Done,
629        }
630    }
631
632    /// Request execution of slot at `slot_idx` until it matches the given [`Matcher`].
633    ///
634    /// The provided array is the (possibly modified) parent that still needs its slot executed.
635    pub fn execute_slot<M: Matcher>(array: impl IntoArray, slot_idx: usize) -> Self {
636        Self {
637            array: array.into_array(),
638            step: ExecutionStep::ExecuteSlot(slot_idx, M::matches),
639        }
640    }
641
642    /// Request that the child slot at `slot_idx` be detached, appended into the current
643    /// activation's canonical builder, and leave the returned parent as the next
644    /// `current_array`.
645    pub fn append_child(array: impl IntoArray, slot_idx: usize) -> Self {
646        Self {
647            array: array.into_array(),
648            step: ExecutionStep::AppendChild(slot_idx),
649        }
650    }
651
652    /// Returns a reference to the array.
653    pub fn array(&self) -> &ArrayRef {
654        &self.array
655    }
656
657    /// Returns a reference to the step.
658    pub fn step(&self) -> &ExecutionStep {
659        &self.step
660    }
661
662    /// Decompose into parts.
663    pub fn into_parts(self) -> (ArrayRef, ExecutionStep) {
664        (self.array, self.step)
665    }
666}
667
668impl fmt::Debug for ExecutionResult {
669    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
670        f.debug_struct("ExecutionResult")
671            .field("array", &self.array)
672            .field("step", &self.step)
673            .finish()
674    }
675}
676
/// Require that a child array matches `$M`.
///
/// If `$child` already matches, the macro evaluates to `$parent` unchanged. Otherwise it
/// early-returns `Ok(`[`ExecutionResult`]`)` from the enclosing function, requesting execution of
/// child slot `$idx` until it matches `$M`.
///
/// On the early-return path `$parent` is cloned rather than moved. The enclosing function must
/// return a `Result` whose `Ok` type is [`ExecutionResult`].
///
/// ```ignore
/// let array = require_child!(array, array.codes(), 0 => Primitive);
/// let array = require_child!(array, array.values(), 1 => AnyCanonical);
/// ```
#[macro_export]
macro_rules! require_child {
    ($parent:expr, $child:expr, $idx:expr => $M:ty) => {{
        if !$child.is::<$M>() {
            return Ok($crate::ExecutionResult::execute_slot::<$M>(
                $parent.clone(),
                $idx,
            ));
        }
        $parent
    }};
}
697
/// Like [`require_child!`], but for optional children.
///
/// If `$child_opt` is `None`, this is a no-op. If it is `Some` but does not match `$M`, the macro
/// early-returns an [`ExecutionResult`] requesting execution of child `$idx`.
///
/// Unlike `require_child!`, this is a statement macro (no value produced). It does not clone
/// `$parent`; `$parent` is moved into the early-return path. `$child_opt` is consumed by
/// `Option::is_some_and`, so an `Option<&T>` (which is `Copy`) is the natural argument.
///
/// ```ignore
/// require_opt_child!(array, array.patches().map(|p| p.indices()), 1 => Primitive);
/// ```
#[macro_export]
macro_rules! require_opt_child {
    ($parent:expr, $child_opt:expr, $idx:expr => $M:ty) => {
        if $child_opt.is_some_and(|child| !child.is::<$M>()) {
            return Ok($crate::ExecutionResult::execute_slot::<$M>($parent, $idx));
        }
    };
}
716
/// Require that the patch slots (indices, values, and optionally chunk_offsets) are `Primitive`.
///
/// Empty (`None`) slots are skipped, so this is a no-op when no patches are present. The slots
/// are checked in the order indices, values, chunk_offsets, and the first non-`Primitive` slot
/// triggers the early return.
///
/// Like [`require_opt_child!`], `$parent` is moved (not cloned) into the early-return path.
/// `$parent` is also expanded once per check (`$parent.slots()[...]`), so pass a plain variable.
/// The slot indices index `$parent.slots()` directly, so an out-of-range index panics.
///
/// ```ignore
/// require_patches!(array, PATCH_INDICES_SLOT, PATCH_VALUES_SLOT, PATCH_CHUNK_OFFSETS_SLOT);
/// ```
#[macro_export]
macro_rules! require_patches {
    ($parent:expr, $indices_slot:expr, $values_slot:expr, $chunk_offsets_slot:expr) => {
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$indices_slot].as_ref(),
            $indices_slot => $crate::arrays::Primitive
        );
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$values_slot].as_ref(),
            $values_slot => $crate::arrays::Primitive
        );
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$chunk_offsets_slot].as_ref(),
            $chunk_offsets_slot => $crate::arrays::Primitive
        );
    };
}
745
/// Require that the validity slot is a [`Bool`](crate::arrays::Bool) array.
///
/// If the validity slot is empty (`None`), this is a no-op. Presumably that is the case for
/// validity that is not array-backed, such as `NonNullable` or `AllValid`. If the slot holds an
/// array that is not `Bool`, the macro early-returns an [`ExecutionResult`] requesting execution
/// of the validity slot.
///
/// Like [`require_opt_child!`], `$parent` is moved (not cloned) into the early-return path.
/// `$idx` indexes `$parent.slots()` directly, so an out-of-range index panics.
///
/// ```ignore
/// require_validity!(array, VALIDITY_SLOT);
/// ```
#[macro_export]
macro_rules! require_validity {
    ($parent:expr, $idx:expr) => {
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$idx].as_ref(),
            $idx => $crate::arrays::Bool
        );
    };
}
765
/// Extension trait for creating an execution context from a session.
///
/// Implemented for [`VortexSession`].
pub trait VortexSessionExecute {
    /// Create a new [`ExecutionCtx`] from this session.
    fn create_execution_ctx(&self) -> ExecutionCtx;
}
771
772impl VortexSessionExecute for VortexSession {
773    fn create_execution_ctx(&self) -> ExecutionCtx {
774        ExecutionCtx::new(self.clone())
775    }
776}