Skip to main content

vortex_array/
executor.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
//! Iterative array execution.
//!
//! The single-step [`Executable`] implementation for [`ArrayRef`] tries `reduce`,
//! `reduce_parent`, `execute_parent`, then `execute` once. The matcher-driven
//! [`ArrayRef::execute_until`] loop interprets [`ExecutionStep::ExecuteSlot`],
//! [`ExecutionStep::AppendChild`], and [`ExecutionStep::Done`] using an explicit stack plus an
//! optional builder, so encodings can advance without recursive descent.
//!
//! See <https://docs.vortex.dev/developer-guide/internals/execution> for the full execution
//! narrative, diagrams, and walkthroughs.
14
15use std::env::VarError;
16use std::fmt;
17use std::fmt::Display;
18use std::sync::LazyLock;
19#[cfg(debug_assertions)]
20use std::sync::atomic::AtomicUsize;
21#[cfg(debug_assertions)]
22use std::sync::atomic::Ordering;
23
24use vortex_error::VortexExpect;
25use vortex_error::VortexResult;
26use vortex_error::vortex_bail;
27use vortex_error::vortex_ensure;
28use vortex_error::vortex_panic;
29use vortex_session::Ref;
30use vortex_session::SessionExt;
31use vortex_session::VortexSession;
32
33use crate::AnyCanonical;
34use crate::ArrayRef;
35use crate::Canonical;
36use crate::IntoArray;
37use crate::array::ArrayId;
38use crate::builders::ArrayBuilder;
39use crate::builders::builder_with_capacity_in;
40use crate::dtype::DType;
41use crate::matcher::Matcher;
42use crate::memory::HostAllocatorRef;
43use crate::memory::MemorySessionExt;
44use crate::optimizer::ArrayOptimizer;
45use crate::optimizer::kernels::ArrayKernels;
46use crate::stats::ArrayStats;
47use crate::stats::StatsSet;
48
49/// Returns the maximum number of iterations to attempt when executing an array before giving up and returning
50/// an error, can be by the `VORTEX_MAX_ITERATIONS` env variables, otherwise defaults to 2^22.
51pub(crate) fn max_iterations() -> usize {
52    static MAX_ITERATIONS: LazyLock<usize> =
53        LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
54            Ok(val) => val.parse::<usize>().unwrap_or_else(|e| {
55                vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")
56            }),
57            Err(VarError::NotPresent) => 2 << 21, // 2 ^ 22
58            Err(VarError::NotUnicode(_)) => {
59                vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
60            }
61        });
62    *MAX_ITERATIONS
63}
64
65/// Marker trait for types that an [`ArrayRef`] can be executed into.
66///
67/// Implementors must provide an implementation of `execute` that takes
68/// an [`ArrayRef`] and an [`ExecutionCtx`], and produces an instance of the
69/// implementor type.
70///
71/// Users should use the `Array::execute` or `Array::execute_as` methods
72pub trait Executable: Sized {
73    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
74}
75
76#[expect(clippy::same_name_method)]
77impl ArrayRef {
78    /// Execute this array to produce an instance of `E`.
79    ///
80    /// See the [`Executable`] implementation for details on how this execution is performed.
81    pub fn execute<E: Executable>(self, ctx: &mut ExecutionCtx) -> VortexResult<E> {
82        E::execute(self, ctx)
83    }
84
85    /// Execute this array, labeling the execution step with a name for tracing.
86    pub fn execute_as<E: Executable>(
87        self,
88        _name: &'static str,
89        ctx: &mut ExecutionCtx,
90    ) -> VortexResult<E> {
91        E::execute(self, ctx)
92    }
93
94    /// Iteratively execute this array until the [`Matcher`] matches, using an explicit work
95    /// stack plus an optional builder for `AppendChild`.
96    ///
97    /// Note: the returned array may not match `M`. If execution converges to a canonical form
98    /// that does not match `M`, the canonical array is returned since no further execution
99    /// progress is possible.
100    ///
101    /// For safety, this errors once execution reaches a configurable maximum number of
102    /// iterations (default `2^22`, override with `VORTEX_MAX_ITERATIONS`).
103    ///
104    /// # Loop state
105    ///
106    /// - `current_array: ArrayRef` -- the array currently in focus.
107    /// - `current_builder: Option<Box<dyn ArrayBuilder>>` -- active only for builder-mode
108    ///   execution. `AppendChild` appends detached children here. `Done` finishes the builder
109    ///   and turns it back into the next `current_array`.
110    /// - `stack: Vec<StackFrame>` -- suspended parents from `ExecuteSlot`, including the
111    ///   detached slot index, its [`DonePredicate`], and the parent builder that was active
112    ///   before focus moved into the child.
113    ///
114    /// Example after `ExecuteSlot(1, pred)` has focused slot 1 of a parent:
115    ///
116    /// ```text
117    ///   stack[top].parent_array:
118    ///     RunEnd                          <-- suspended parent
119    ///     +-- slot 0: ends
120    ///     +-- slot 1: _  (detached)
121    ///
122    ///   current_array:
123    ///     DictEncoding                    <-- focused child
124    ///     +-- slot 0: codes
125    ///     +-- slot 1: dictionary
126    ///
127    ///   current_builder:
128    ///     None
129    /// ```
130    ///
131    /// Each loop iteration works like this:
132    ///
133    /// ```text
134    /// loop:
135    ///   Step 1: done(current_array)?
136    ///     - root activation   -> return current_array
137    ///     - ExecuteSlot frame -> pop, reattach child, resume parent
138    ///
139    ///   Step 2: current_builder active?
140    ///     - yes -> skip Step 2a / 2b
141    ///     - no  -> try parent kernels
142    ///
143    ///   Step 2a: current_array.execute_parent(stack.top.parent_array)
144    ///     child looks up at the suspended parent from ExecuteSlot
145    ///
146    ///   Step 2b: for child in current_array.children():
147    ///               child.execute_parent(current_array)
148    ///     each child looks up at current_array
149    ///
150    ///   Step 3: match current_array.execute()
151    ///     ExecuteSlot(i, pred) -> push parent on stack, focus child `i`
152    ///     AppendChild(i)       -> detach child `i`, append it into current_builder,
153    ///                             keep parent as current_array
154    ///     Done                 -> finish current_builder if present, else use returned array
155    /// ```
156    ///
157    /// Step 2a and Step 2b are skipped while `current_builder` is active. `AppendChild`
158    /// partially consumes `current_array`: some slots already live in the builder, so a
159    /// parent rewrite would observe inconsistent state and could discard accumulated builder
160    /// data.
161    pub fn execute_until<M: Matcher>(self, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
162        let mut current_array = self;
163        let mut current_builder: Option<Box<dyn ArrayBuilder>> = None;
164        let mut stack: Vec<StackFrame> = Vec::new();
165        let max_iterations = max_iterations();
166
167        for _ in 0..max_iterations {
168            let is_done = stack
169                .last()
170                .map_or(M::matches as DonePredicate, |frame| frame.done);
171
172            if is_done(&current_array) || AnyCanonical::matches(&current_array) {
173                match stack.pop() {
174                    None => {
175                        debug_assert!(
176                            current_builder.is_none(),
177                            "root activation should not retain a builder"
178                        );
179                        ctx.log(format_args!("-> {}", current_array));
180                        return Ok(current_array);
181                    }
182                    Some(frame) => {
183                        (current_array, current_builder) = pop_frame(frame, current_array)?;
184                        continue;
185                    }
186                }
187            }
188
189            // Step 2a: execute_parent against the suspended parent from ExecuteSlot.
190            //
191            // When executing a child for ExecuteSlot, try execute_parent against
192            // the suspended parent on the stack. This lets kernels like RunEnd's
193            // FilterKernel fire before the child is forced to canonical.
194            //
195            // Skip when a builder is active: the current array has been partially
196            // consumed by AppendChild (some slots are already in the builder), so
197            // a parent rewrite would see inconsistent state and the builder data
198            // would be lost when we restore frame.parent_builder.
199            if current_builder.is_none()
200                && let Some(frame) = stack.last()
201                && let Some(result) =
202                    current_array.execute_parent(&frame.parent_array, frame.slot_idx, ctx)?
203            {
204                ctx.log(format_args!(
205                    "execute_parent (stack) rewrote {} -> {}",
206                    current_array, result
207                ));
208                let frame = stack.pop().vortex_expect("just peeked");
209                current_array = result.optimize_ctx(ctx.session())?;
210                current_builder = frame.parent_builder;
211                continue;
212            }
213
214            // Step 2b: execute_parent against current_array's own children.
215            if current_builder.is_none()
216                && let Some(rewritten) = try_execute_parent(&current_array, ctx)?
217            {
218                ctx.log(format_args!(
219                    "execute_parent rewrote {} -> {}",
220                    current_array, rewritten
221                ));
222                current_array = rewritten.optimize_ctx(ctx.session())?;
223                continue;
224            }
225
226            // execute step
227            let expected_len = current_array.len();
228            let expected_dtype = current_array.dtype().clone();
229            let stats = current_array.statistics().to_array_stats();
230            let encoding_id = current_array.encoding_id();
231            let result = current_array.execute_encoding_unchecked(ctx)?;
232            let (array, step) = result.into_parts();
233            match step {
234                ExecutionStep::ExecuteSlot(i, done) => {
235                    let (parent, child) = unsafe { array.take_slot_unchecked(i) }?;
236                    ctx.log(format_args!(
237                        "ExecuteSlot({i}): pushing {}, focusing on {}",
238                        parent, child
239                    ));
240                    stack.push(StackFrame {
241                        parent_array: parent,
242                        parent_builder: current_builder.take(),
243                        slot_idx: i,
244                        done,
245                        original_dtype: child.dtype().clone(),
246                        original_len: child.len(),
247                    });
248                    current_array = child;
249                    current_builder = None;
250                }
251                ExecutionStep::AppendChild(i) => {
252                    if current_builder.is_none() {
253                        current_builder = Some(builder_with_capacity_in(
254                            ctx.allocator(),
255                            array.dtype(),
256                            array.len(),
257                        ));
258                    }
259                    let (parent, child) = unsafe { array.take_slot_unchecked(i) }?;
260                    ctx.log(format_args!(
261                        "AppendChild({i}): appending {} into builder",
262                        child
263                    ));
264                    // TODO(joe)[7674]: replace with a builder kernel registry so we don't
265                    // need to go through the VTable append_to_builder indirection.
266                    child.append_to_builder(
267                        current_builder
268                            .as_deref_mut()
269                            .vortex_expect("builder must exist"),
270                        ctx,
271                    )?;
272                    current_array = parent;
273                }
274                ExecutionStep::Done => {
275                    ctx.log(format_args!("Done: {}", array));
276                    (current_array, current_builder) = finalize_done(
277                        array,
278                        current_builder,
279                        expected_len,
280                        expected_dtype,
281                        stats,
282                        encoding_id,
283                    )?;
284                }
285            }
286        }
287
288        vortex_bail!(
289            "Exceeded maximum execution iterations ({}) while executing array",
290            max_iterations,
291        )
292    }
293}
294
295struct StackFrame {
296    parent_array: ArrayRef,
297    parent_builder: Option<Box<dyn ArrayBuilder>>,
298    slot_idx: usize,
299    done: DonePredicate,
300    original_dtype: DType,
301    original_len: usize,
302}
303
304/// Execution context for batch CPU compute.
305#[derive(Debug, Clone)]
306pub struct ExecutionCtx {
307    session: VortexSession,
308    #[cfg(debug_assertions)]
309    id: usize,
310    #[cfg(debug_assertions)]
311    ops: Vec<String>,
312}
313
314impl ExecutionCtx {
315    /// Create a new execution context with the given session.
316    pub fn new(session: VortexSession) -> Self {
317        Self {
318            session,
319            #[cfg(debug_assertions)]
320            id: {
321                static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
322                EXEC_CTX_ID.fetch_add(1, Ordering::Relaxed)
323            },
324            #[cfg(debug_assertions)]
325            ops: Vec::new(),
326        }
327    }
328
329    /// Get the session associated with this execution context.
330    pub fn session(&self) -> &VortexSession {
331        &self.session
332    }
333
334    /// Get the session-scoped host allocator for this execution context.
335    pub fn allocator(&self) -> HostAllocatorRef {
336        self.session.allocator()
337    }
338
339    /// Log an execution step at the current depth.
340    ///
341    /// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.
342    /// Individual steps are also logged at TRACE level for real-time following.
343    ///
344    /// Use the [`format_args!`] macro to create the `msg` argument.
345    pub fn log(&mut self, msg: fmt::Arguments<'_>) {
346        #[cfg(debug_assertions)]
347        if tracing::enabled!(tracing::Level::DEBUG) {
348            let formatted = format!(" - {msg}");
349            tracing::trace!("exec[{}]: {formatted}", self.id);
350            self.ops.push(formatted);
351        }
352        let _ = msg;
353    }
354}
355
356impl Display for ExecutionCtx {
357    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
358        #[cfg(debug_assertions)]
359        return write!(f, "exec[{}]", self.id);
360        #[cfg(not(debug_assertions))]
361        write!(f, "exec")
362    }
363}
364
365#[cfg(debug_assertions)]
366impl Drop for ExecutionCtx {
367    fn drop(&mut self) {
368        if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
369            // Unlike itertools `.format()` (panics in 0.14 on second format)
370            struct FmtOps<'a>(&'a [String]);
371            impl Display for FmtOps<'_> {
372                fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
373                    for (i, op) in self.0.iter().enumerate() {
374                        if i > 0 {
375                            f.write_str("\n")?;
376                        }
377                        f.write_str(op)?;
378                    }
379                    Ok(())
380                }
381            }
382            tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
383        }
384    }
385}
386
387/// Single-step execution: takes one step toward canonical form.
388///
389/// Steps through reduce, reduce_parent, execute_parent, then execute. For `ExecuteSlot`,
390/// only a single child execution step is performed — the child is executed once and put back,
391/// making this a lightweight, bounded operation.
392///
393/// **However**, if `execute_step` returns [`ExecutionStep::AppendChild`], this implementation
394/// drives the *entire* array to completion via [`execute_into_builder`] in a single call.
395/// This can do substantially more work than a normal step because it creates a builder and
396/// fully decodes the array into that builder before returning. Callers should be aware that a
397/// single `.execute::<ArrayRef>(ctx)` call may perform O(n_children * decode_cost) work when
398/// `AppendChild` is returned.
399impl Executable for ArrayRef {
400    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
401        if let Some(canonical) = array.as_opt::<AnyCanonical>() {
402            ctx.log(format_args!("-> canonical {}", array));
403            return Ok(Canonical::from(canonical).into_array());
404        }
405
406        if let Some(reduced) = array.reduce()? {
407            ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
408            reduced.statistics().inherit_from(array.statistics());
409            return Ok(reduced);
410        }
411
412        for (slot_idx, slot) in array.slots().iter().enumerate() {
413            let Some(child) = slot else { continue };
414            if let Some(reduced_parent) = child.reduce_parent(&array, slot_idx)? {
415                ctx.log(format_args!(
416                    "reduce_parent: slot[{}]({}) rewrote {} -> {}",
417                    slot_idx,
418                    child.encoding_id(),
419                    array,
420                    reduced_parent
421                ));
422                reduced_parent.statistics().inherit_from(array.statistics());
423                return Ok(reduced_parent);
424            }
425        }
426
427        let tmp_session = ctx.session().clone();
428        let kernels = tmp_session.get_opt::<ArrayKernels>();
429
430        for (slot_idx, slot) in array.slots().iter().enumerate() {
431            let Some(child) = slot else { continue };
432            if let Some(executed_parent) =
433                execute_parent_for_child(&array, child, slot_idx, kernels.as_ref(), ctx)?
434            {
435                ctx.log(format_args!(
436                    "execute_parent: slot[{}]({}) rewrote {} -> {}",
437                    slot_idx,
438                    child.encoding_id(),
439                    array,
440                    executed_parent
441                ));
442                executed_parent
443                    .statistics()
444                    .inherit_from(array.statistics());
445                return Ok(executed_parent);
446            }
447        }
448
449        ctx.log(format_args!("executing {}", array));
450        let result = array.execute_encoding(ctx)?;
451        let (array, step) = result.into_parts();
452        match step {
453            ExecutionStep::Done => {
454                ctx.log(format_args!("-> {}", array));
455                Ok(array)
456            }
457            ExecutionStep::ExecuteSlot(i, _) => {
458                let child = array.slots()[i].clone().vortex_expect("valid slot index");
459                let executed_child = child.execute::<ArrayRef>(ctx)?;
460                array.with_slot(i, executed_child)
461            }
462            ExecutionStep::AppendChild(_) => {
463                // Single-step: build the entire parent via the builder path.
464                let builder = builder_with_capacity_in(ctx.allocator(), array.dtype(), array.len());
465                let mut builder = execute_into_builder(array, builder, ctx)?;
466                Ok(builder.finish())
467            }
468        }
469    }
470}
471
472/// Execute `array` into the given `builder`.
473///
474/// This uses the encoding's [`crate::array::VTable::append_to_builder`] implementation. Most
475/// encodings use the default path of `execute::<Canonical>` followed by `builder.extend_from_array`,
476/// while encodings like `Chunked` can override that to append child-by-child without materializing
477/// the entire parent.
478///
479/// The builder must have a [`DType`] that is a nullability-superset of `array.dtype()`.
480pub fn execute_into_builder(
481    array: ArrayRef,
482    mut builder: Box<dyn ArrayBuilder>,
483    ctx: &mut ExecutionCtx,
484) -> VortexResult<Box<dyn ArrayBuilder>> {
485    array.append_to_builder(builder.as_mut(), ctx)?;
486    Ok(builder)
487}
488
489/// Pop a stack frame, restoring the parent with the finished child in its slot.
490fn pop_frame(
491    frame: StackFrame,
492    child: ArrayRef,
493) -> VortexResult<(ArrayRef, Option<Box<dyn ArrayBuilder>>)> {
494    debug_assert_eq!(
495        child.dtype(),
496        &frame.original_dtype,
497        "child dtype changed during execution"
498    );
499    debug_assert_eq!(
500        child.len(),
501        frame.original_len,
502        "child len changed during execution"
503    );
504    let parent_array = unsafe { frame.parent_array.put_slot_unchecked(frame.slot_idx, child) }?;
505    Ok((parent_array, frame.parent_builder))
506}
507
508fn finalize_done(
509    result: ArrayRef,
510    mut builder: Option<Box<dyn ArrayBuilder>>,
511    expected_len: usize,
512    expected_dtype: DType,
513    stats: ArrayStats,
514    encoding_id: ArrayId,
515) -> VortexResult<(ArrayRef, Option<Box<dyn ArrayBuilder>>)> {
516    let output = if let Some(mut builder) = builder.take() {
517        builder.finish()
518    } else {
519        result
520    };
521
522    if cfg!(debug_assertions) {
523        vortex_ensure!(
524            output.len() == expected_len,
525            "Result length mismatch for {:?}",
526            encoding_id
527        );
528        vortex_ensure!(
529            output.dtype() == &expected_dtype,
530            "Executed canonical dtype mismatch for {:?}",
531            encoding_id
532        );
533    }
534
535    output
536        .statistics()
537        .set_iter(StatsSet::from(stats).into_iter());
538    Ok((output, None))
539}
540
541fn execute_parent_for_child(
542    parent: &ArrayRef,
543    child: &ArrayRef,
544    slot_idx: usize,
545    kernels: Option<&Ref<ArrayKernels>>,
546    ctx: &mut ExecutionCtx,
547) -> VortexResult<Option<ArrayRef>> {
548    if let Some(kernels) = kernels
549        && let Some(plugins) =
550            kernels.find_execute_parent(parent.encoding_id(), child.encoding_id())
551    {
552        for plugin in plugins.as_ref() {
553            if let Some(result) = plugin(child, parent, slot_idx, ctx)? {
554                return Ok(Some(result));
555            }
556        }
557    }
558
559    child.execute_parent(parent, slot_idx, ctx)
560}
561
562/// Try execute_parent on each occupied slot of the array.
563fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<ArrayRef>> {
564    let tmp_session = ctx.session().clone();
565    let kernels = tmp_session.get_opt::<ArrayKernels>();
566
567    for (slot_idx, slot) in array.slots().iter().enumerate() {
568        let Some(child) = slot else { continue };
569        if let Some(executed_parent) =
570            execute_parent_for_child(array, child, slot_idx, kernels.as_ref(), ctx)?
571        {
572            ctx.log(format_args!(
573                "execute_parent: slot[{}]({}) rewrote {} -> {}",
574                slot_idx,
575                child.encoding_id(),
576                array,
577                executed_parent
578            ));
579            executed_parent
580                .statistics()
581                .inherit_from(array.statistics());
582            return Ok(Some(executed_parent));
583        }
584    }
585    Ok(None)
586}
587
588/// A predicate that determines when an array has reached a desired form during execution.
589pub type DonePredicate = fn(&ArrayRef) -> bool;
590
591/// Scheduler step indicator returned alongside an array in [`ExecutionResult`].
592///
593/// Instead of recursively executing children, encodings return an `ExecutionStep` that tells the
594/// scheduler what to do next. This enables the scheduler to manage execution iteratively using
595/// an explicit work stack plus an optional builder.
596///
597/// # Semantics
598///
599/// Each variant describes a different execution strategy with distinct cost profiles:
600///
601/// - [`Done`](ExecutionStep::Done): The current activation has finished its work. If no builder
602///   is active, the returned array is the result. If a builder is active, the scheduler ignores
603///   the placeholder array and finishes the builder instead. The scheduler may continue
604///   executing if the target form (e.g. canonical) has not yet been reached.
605///
606/// - [`ExecuteSlot`](ExecutionStep::ExecuteSlot): The encoding needs one of its children
607///   decoded before it can make further progress. The scheduler detaches that child, pushes
608///   the parent onto the explicit stack, executes the child until the [`DonePredicate`]
609///   matches, puts it back, and re-enters the parent. This is a cooperative yield: the
610///   encoding does a bounded amount of work per step while the loop tracks the parent-child
611///   relationship explicitly.
612///
613/// - [`AppendChild`](ExecutionStep::AppendChild): The encoding needs one child executed to
614///   canonical form and then appended into a builder owned by the current activation. The
615///   scheduler detaches that child, lazily creates `current_builder` if needed, appends the
616///   child into it, and keeps the parent as `current_array` for the next iteration. While the
617///   builder is active, parent-kernel rewrites are skipped because the parent is partially
618///   consumed. **Important:** in the single-step executor ([`Executable`] for [`ArrayRef`]),
619///   returning `AppendChild` still causes the executor to drive the *entire* array to
620///   completion via [`execute_into_builder`] in one call — this can do significantly more
621///   work than a single `ExecuteSlot` step.
622pub enum ExecutionStep {
623    /// Request that the scheduler execute the slot at the given index, using the provided
624    /// [`DonePredicate`] to determine when the slot is "done", then replace the slot in this
625    /// array and re-enter execution.
626    ///
627    /// Use [`ExecutionResult::execute_slot`] instead of constructing this variant directly.
628    ExecuteSlot(usize, DonePredicate),
629
630    /// Detach the slot at the given index, append that child into the current activation's
631    /// canonical builder, and keep the returned parent as `current_array`.
632    ///
633    /// `Done` finalizes that builder and turns it into the result of the activation.
634    ///
635    /// **Note:** In the single-step executor ([`Executable`] for [`ArrayRef`]), this variant
636    /// drives the entire parent to completion in one call via [`execute_into_builder`], which
637    /// may perform substantially more work than a single `ExecuteSlot` step.
638    AppendChild(usize),
639
640    /// Execution is complete. If no builder is active, the array in the accompanying
641    /// [`ExecutionResult`] is the result. Otherwise, the scheduler finalizes the active
642    /// builder and uses that finished array instead.
643    ///
644    /// The scheduler will continue executing if it has not yet reached the target form.
645    Done,
646}
647
648impl fmt::Debug for ExecutionStep {
649    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
650        match self {
651            ExecutionStep::ExecuteSlot(idx, _) => f.debug_tuple("ExecuteSlot").field(idx).finish(),
652            ExecutionStep::AppendChild(idx) => f.debug_tuple("AppendChild").field(idx).finish(),
653            ExecutionStep::Done => write!(f, "Done"),
654        }
655    }
656}
657
658/// The result of a single execution step on an array encoding.
659///
660/// Combines an [`ArrayRef`] with an [`ExecutionStep`] to tell the scheduler both what to do next
661/// and what array to work with.
662pub struct ExecutionResult {
663    array: ArrayRef,
664    step: ExecutionStep,
665}
666
667impl ExecutionResult {
668    /// Signal that execution is complete with the given result array.
669    pub fn done(result: impl IntoArray) -> Self {
670        Self {
671            array: result.into_array(),
672            step: ExecutionStep::Done,
673        }
674    }
675
676    /// Request execution of slot at `slot_idx` until it matches the given [`Matcher`].
677    ///
678    /// The provided array is the (possibly modified) parent that still needs its slot executed.
679    pub fn execute_slot<M: Matcher>(array: impl IntoArray, slot_idx: usize) -> Self {
680        Self {
681            array: array.into_array(),
682            step: ExecutionStep::ExecuteSlot(slot_idx, M::matches),
683        }
684    }
685
686    /// Request that the child slot at `slot_idx` be detached, appended into the current
687    /// activation's canonical builder, and leave the returned parent as the next
688    /// `current_array`.
689    pub fn append_child(array: impl IntoArray, slot_idx: usize) -> Self {
690        Self {
691            array: array.into_array(),
692            step: ExecutionStep::AppendChild(slot_idx),
693        }
694    }
695
696    /// Returns a reference to the array.
697    pub fn array(&self) -> &ArrayRef {
698        &self.array
699    }
700
701    /// Returns a reference to the step.
702    pub fn step(&self) -> &ExecutionStep {
703        &self.step
704    }
705
706    /// Decompose into parts.
707    pub fn into_parts(self) -> (ArrayRef, ExecutionStep) {
708        (self.array, self.step)
709    }
710}
711
712impl fmt::Debug for ExecutionResult {
713    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
714        f.debug_struct("ExecutionResult")
715            .field("array", &self.array)
716            .field("step", &self.step)
717            .finish()
718    }
719}
720
/// Ensures that a child array matches `$M` before the caller continues.
///
/// When `$child` already matches, the macro evaluates to `$parent` unchanged. Otherwise it
/// returns early from the enclosing function with an [`ExecutionResult`] that asks for child
/// `$idx` to be executed until it matches `$M`; a clone of `$parent` is handed to that result.
///
/// ```ignore
/// let array = require_child!(array, array.codes(), 0 => Primitive);
/// let array = require_child!(array, array.values(), 1 => AnyCanonical);
/// ```
#[macro_export]
macro_rules! require_child {
    ($parent:expr, $child:expr, $idx:expr => $M:ty) => {{
        if $child.is::<$M>() {
            $parent
        } else {
            return Ok($crate::ExecutionResult::execute_slot::<$M>(
                $parent.clone(),
                $idx,
            ));
        }
    }};
}
741
/// Variant of [`require_child!`] for optional children.
///
/// A `None` child is a no-op, as is a `Some` child that already matches `$M`. A `Some` child
/// that does not match `$M` makes the enclosing function return early with an
/// [`ExecutionResult`] requesting execution of child `$idx`.
///
/// Unlike `require_child!`, this is a statement macro (no value produced) and does not clone
/// `$parent` - it is moved into the early-return path.
///
/// ```ignore
/// require_opt_child!(array, array.patches().map(|p| p.indices()), 1 => Primitive);
/// ```
#[macro_export]
macro_rules! require_opt_child {
    ($parent:expr, $child_opt:expr, $idx:expr => $M:ty) => {
        if matches!($child_opt, Some(child) if !child.is::<$M>()) {
            return Ok($crate::ExecutionResult::execute_slot::<$M>($parent, $idx));
        }
    };
}
760
/// Ensures that the patch slots of `$parent` are `Primitive` arrays.
///
/// The indices, values and chunk-offsets slots are checked in that order with
/// [`require_opt_child!`]. A slot that is `None` is skipped, so an array without patches passes
/// through untouched. The first occupied slot that is not `Primitive` causes an early return
/// requesting its execution.
///
/// Like [`require_opt_child!`], `$parent` is moved (not cloned) into the early-return path.
///
/// ```ignore
/// require_patches!(array, PATCH_INDICES_SLOT, PATCH_VALUES_SLOT, PATCH_CHUNK_OFFSETS_SLOT);
/// ```
#[macro_export]
macro_rules! require_patches {
    ($parent:expr, $indices_slot:expr, $values_slot:expr, $chunk_offsets_slot:expr) => {
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$indices_slot].as_ref(),
            $indices_slot => $crate::arrays::Primitive
        );
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$values_slot].as_ref(),
            $values_slot => $crate::arrays::Primitive
        );
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$chunk_offsets_slot].as_ref(),
            $chunk_offsets_slot => $crate::arrays::Primitive
        );
    };
}
789
/// Ensures that the validity slot at `$idx` is a [`Bool`](crate::arrays::Bool) array.
///
/// When validity is not array-backed (e.g. `NonNullable` or `AllValid`) the slot is empty and
/// the macro does nothing. When the slot holds an array that is not `Bool`, the enclosing
/// function returns early with an [`ExecutionResult`] requesting execution of that slot.
///
/// Like [`require_opt_child!`], `$parent` is moved (not cloned) into the early-return path.
///
/// ```ignore
/// require_validity!(array, VALIDITY_SLOT);
/// ```
#[macro_export]
macro_rules! require_validity {
    ($parent:expr, $idx:expr) => {
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$idx].as_ref(),
            $idx => $crate::arrays::Bool
        );
    };
}
809
810/// Extension trait for creating an execution context from a session.
811pub trait VortexSessionExecute {
812    /// Create a new execution context from this session.
813    fn create_execution_ctx(&self) -> ExecutionCtx;
814}
815
816impl VortexSessionExecute for VortexSession {
817    fn create_execution_ctx(&self) -> ExecutionCtx {
818        ExecutionCtx::new(self.clone())
819    }
820}