Skip to main content

vortex_array/
executor.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! The execution engine: iteratively transforms arrays toward canonical form.
//!
//! Execution proceeds through four layers, tried in order on each iteration:
//!
//! 1. **`reduce`** -- metadata-only self-rewrite (cheapest).
//! 2. **`reduce_parent`** -- metadata-only child-driven parent rewrite.
//! 3. **`execute_parent`** -- child-driven fused execution (may read buffers).
//! 4. **`execute`** -- the encoding's own decode step (most expensive).
//!
//! The main entry point is [`DynArray::execute_until`], which uses an explicit work stack
//! to drive execution iteratively without recursion. Between steps, the optimizer runs
//! reduce/reduce_parent rules to fixpoint.
//!
//! See <https://docs.vortex.dev/developer-guide/internals/execution> for a full description
//! of the model.
19
20use std::env::VarError;
21use std::fmt;
22use std::fmt::Display;
23use std::sync::Arc;
24use std::sync::LazyLock;
25use std::sync::atomic::AtomicUsize;
26
27use vortex_error::VortexExpect;
28use vortex_error::VortexResult;
29use vortex_error::vortex_bail;
30use vortex_error::vortex_panic;
31use vortex_session::VortexSession;
32
33use crate::AnyCanonical;
34use crate::ArrayRef;
35use crate::Canonical;
36use crate::DynArray;
37use crate::IntoArray;
38use crate::matcher::Matcher;
39use crate::optimizer::ArrayOptimizer;
40use crate::vtable::VTable;
41use crate::vtable::upcast_array;
42
43/// Maximum number of iterations to attempt when executing an array before giving up and returning
44/// an error.
45pub(crate) static MAX_ITERATIONS: LazyLock<usize> =
46    LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
47        Ok(val) => val
48            .parse::<usize>()
49            .unwrap_or_else(|e| vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")),
50        Err(VarError::NotPresent) => 128,
51        Err(VarError::NotUnicode(_)) => {
52            vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
53        }
54    });
55
/// Marker trait for types that an [`ArrayRef`] can be executed into.
///
/// Implementors must provide an implementation of `execute` that takes
/// an [`ArrayRef`] and an [`ExecutionCtx`], and produces an instance of the
/// implementor type.
///
/// Users should normally go through the `execute` or `execute_as` methods defined on
/// `dyn DynArray` rather than calling this trait directly.
pub trait Executable: Sized {
    /// Consume `array` and execute it into an instance of `Self`.
    ///
    /// `ctx` is the execution context passed through the whole execution; it exposes the
    /// session and collects the execution trace.
    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
}
66
67impl dyn DynArray + '_ {
68    /// Execute this array to produce an instance of `E`.
69    ///
70    /// See the [`Executable`] implementation for details on how this execution is performed.
71    pub fn execute<E: Executable>(self: Arc<Self>, ctx: &mut ExecutionCtx) -> VortexResult<E> {
72        E::execute(self, ctx)
73    }
74
75    /// Execute this array, labeling the execution step with a name for tracing.
76    pub fn execute_as<E: Executable>(
77        self: Arc<Self>,
78        _name: &'static str,
79        ctx: &mut ExecutionCtx,
80    ) -> VortexResult<E> {
81        E::execute(self, ctx)
82    }
83
84    /// Iteratively execute this array until the [`Matcher`] matches, using an explicit work
85    /// stack.
86    ///
87    /// The scheduler repeatedly:
88    /// 1. Checks if the current array matches `M` — if so, pops the stack or returns.
89    /// 2. Runs `execute_parent` on each child for child-driven optimizations.
90    /// 3. Calls `execute` which returns an [`ExecutionStep`].
91    ///
92    /// Note: the returned array may not match `M`. If execution converges to a canonical form
93    /// that does not match `M`, the canonical array is returned since no further execution
94    /// progress is possible.
95    ///
96    /// For safety, we will error when the number of execution iterations reaches a configurable
97    /// maximum (default 128, override with `VORTEX_MAX_ITERATIONS`).
98    pub fn execute_until<M: Matcher>(
99        self: Arc<Self>,
100        ctx: &mut ExecutionCtx,
101    ) -> VortexResult<ArrayRef> {
102        static MAX_ITERATIONS: LazyLock<usize> =
103            LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
104                Ok(val) => val.parse::<usize>().unwrap_or_else(|e| {
105                    vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")
106                }),
107                Err(VarError::NotPresent) => 128,
108                Err(VarError::NotUnicode(_)) => {
109                    vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
110                }
111            });
112
113        let mut current = self.optimize()?;
114        // Stack frames: (parent, child_idx, done_predicate_for_child)
115        let mut stack: Vec<(ArrayRef, usize, DonePredicate)> = Vec::new();
116
117        for _ in 0..*MAX_ITERATIONS {
118            // Check for termination: use the stack frame's done predicate, or the root matcher.
119            let is_done = stack
120                .last()
121                .map_or(M::matches as DonePredicate, |frame| frame.2);
122            if is_done(current.as_ref()) {
123                match stack.pop() {
124                    None => {
125                        ctx.log(format_args!("-> {}", current));
126                        return Ok(current);
127                    }
128                    Some((parent, child_idx, _)) => {
129                        current = parent.with_child(child_idx, current)?;
130                        current = current.optimize()?;
131                        continue;
132                    }
133                }
134            }
135
136            // If we've reached canonical form, we can't execute any further regardless
137            // of whether the matcher matched.
138            if AnyCanonical::matches(current.as_ref()) {
139                match stack.pop() {
140                    None => {
141                        ctx.log(format_args!("-> canonical (unmatched) {}", current));
142                        return Ok(current);
143                    }
144                    Some((parent, child_idx, _)) => {
145                        current = parent.with_child(child_idx, current)?;
146                        current = current.optimize()?;
147                        continue;
148                    }
149                }
150            }
151
152            // Try execute_parent (child-driven optimized execution)
153            if let Some(rewritten) = try_execute_parent(&current, ctx)? {
154                ctx.log(format_args!(
155                    "execute_parent rewrote {} -> {}",
156                    current, rewritten
157                ));
158                current = rewritten.optimize()?;
159                continue;
160            }
161
162            // Execute the array itself.
163            let result = execute_step(current, ctx)?;
164            let (array, step) = result.into_parts();
165            match step {
166                ExecutionStep::ExecuteChild(i, done) => {
167                    let child = array
168                        .nth_child(i)
169                        .vortex_expect("ExecuteChild index in bounds");
170                    ctx.log(format_args!(
171                        "ExecuteChild({i}): pushing {}, focusing on {}",
172                        array, child
173                    ));
174                    stack.push((array, i, done));
175                    current = child.optimize()?;
176                }
177                ExecutionStep::Done => {
178                    ctx.log(format_args!("Done: {}", array));
179                    current = array;
180                }
181            }
182        }
183
184        vortex_bail!(
185            "Exceeded maximum execution iterations ({}) while executing array",
186            *MAX_ITERATIONS,
187        )
188    }
189}
190
/// Execution context for batch CPU compute.
///
/// Accumulates a trace of execution steps. Individual steps are logged at TRACE level for
/// real-time following, and the full trace is dumped at DEBUG level when the context is dropped.
pub struct ExecutionCtx {
    /// Identifier taken from a static counter at construction; used to tag log output.
    id: usize,
    /// The session this context was created with.
    session: VortexSession,
    /// Formatted trace entries recorded by `log`, in the order they were logged.
    ops: Vec<String>,
}
200
201impl ExecutionCtx {
202    /// Create a new execution context with the given session.
203    pub fn new(session: VortexSession) -> Self {
204        static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
205        let id = EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
206        Self {
207            id,
208            session,
209            ops: Vec::new(),
210        }
211    }
212
213    /// Get the session associated with this execution context.
214    pub fn session(&self) -> &VortexSession {
215        &self.session
216    }
217
218    /// Log an execution step at the current depth.
219    ///
220    /// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.
221    /// Individual steps are also logged at TRACE level for real-time following.
222    ///
223    /// Use the [`format_args!`] macro to create the `msg` argument.
224    pub fn log(&mut self, msg: fmt::Arguments<'_>) {
225        if tracing::enabled!(tracing::Level::DEBUG) {
226            let formatted = format!(" - {msg}");
227            tracing::trace!("exec[{}]: {formatted}", self.id);
228            self.ops.push(formatted);
229        }
230    }
231}
232
233impl Display for ExecutionCtx {
234    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
235        write!(f, "exec[{}]", self.id)
236    }
237}
238
239impl Drop for ExecutionCtx {
240    fn drop(&mut self) {
241        if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
242            // Unlike itertools `.format()` (panics in 0.14 on second format)
243            struct FmtOps<'a>(&'a [String]);
244            impl Display for FmtOps<'_> {
245                fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246                    for (i, op) in self.0.iter().enumerate() {
247                        if i > 0 {
248                            f.write_str("\n")?;
249                        }
250                        f.write_str(op)?;
251                    }
252                    Ok(())
253                }
254            }
255            tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
256        }
257    }
258}
259
260/// Executing an [`ArrayRef`] into an [`ArrayRef`] is the atomic execution loop within Vortex.
261///
262/// It attempts to take the smallest possible step of execution such that the returned array
263/// is incrementally more "executed" than the input array. In other words, it is closer to becoming
264/// a canonical array.
265///
266/// The execution steps are as follows:
267/// 0. Check for canonical.
268/// 1. Attempt to `reduce` the array with metadata-only optimizations.
269/// 2. Attempt to call `reduce_parent` on each child.
270/// 3. Attempt to call `execute_parent` on each child.
271/// 4. Call `execute` on the array itself (which returns an [`ExecutionStep`]).
272///
273/// Most users will not call this method directly, instead preferring to specify an executable
274/// target such as [`crate::Columnar`], [`Canonical`], or any of the canonical array types (such as
275/// [`crate::arrays::PrimitiveArray`]).
276impl Executable for ArrayRef {
277    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
278        // 0. Check for canonical
279        if let Some(canonical) = array.as_opt::<AnyCanonical>() {
280            ctx.log(format_args!("-> canonical {}", array));
281            return Ok(Canonical::from(canonical).into_array());
282        }
283
284        // 1. reduce (metadata-only rewrites)
285        if let Some(reduced) = array.vtable().reduce(&array)? {
286            ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
287            reduced.statistics().inherit_from(array.statistics());
288            return Ok(reduced);
289        }
290
291        // 2. reduce_parent (child-driven metadata-only rewrites)
292        for child_idx in 0..array.nchildren() {
293            let child = array.nth_child(child_idx).vortex_expect("checked length");
294            if let Some(reduced_parent) = child.vtable().reduce_parent(&child, &array, child_idx)? {
295                ctx.log(format_args!(
296                    "reduce_parent: child[{}]({}) rewrote {} -> {}",
297                    child_idx,
298                    child.encoding_id(),
299                    array,
300                    reduced_parent
301                ));
302                reduced_parent.statistics().inherit_from(array.statistics());
303                return Ok(reduced_parent);
304            }
305        }
306
307        // 3. execute_parent (child-driven optimized execution)
308        for child_idx in 0..array.nchildren() {
309            // TODO(joe): remove internal copy in nth_child.
310            let child = array.nth_child(child_idx).vortex_expect("checked length");
311            if let Some(executed_parent) = child
312                .vtable()
313                .execute_parent(&child, &array, child_idx, ctx)?
314            {
315                ctx.log(format_args!(
316                    "execute_parent: child[{}]({}) rewrote {} -> {}",
317                    child_idx,
318                    child.encoding_id(),
319                    array,
320                    executed_parent
321                ));
322                executed_parent
323                    .statistics()
324                    .inherit_from(array.statistics());
325                return Ok(executed_parent);
326            }
327        }
328
329        // 4. execute (returns an ExecutionResult)
330        ctx.log(format_args!("executing {}", array));
331        let result = execute_step(array, ctx)?;
332        let (array, step) = result.into_parts();
333        match step {
334            ExecutionStep::Done => {
335                ctx.log(format_args!("-> {}", array.as_ref()));
336                Ok(array)
337            }
338            ExecutionStep::ExecuteChild(i, _) => {
339                // For single-step execution, handle ExecuteChild by executing the child,
340                // replacing it, and returning the updated array.
341                let child = array.nth_child(i).vortex_expect("valid child index");
342                let executed_child = child.execute::<ArrayRef>(ctx)?;
343                array.with_child(i, executed_child)
344            }
345        }
346    }
347}
348
349/// Execute a single step on an array, consuming it.
350///
351/// Extracts the vtable before consuming the array to avoid borrow conflicts.
352fn execute_step(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
353    let vtable = array.vtable().clone_boxed();
354    vtable.execute(array, ctx)
355}
356
357/// Try execute_parent on each child of the array.
358fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<ArrayRef>> {
359    for child_idx in 0..array.nchildren() {
360        let child = array
361            .nth_child(child_idx)
362            .vortex_expect("checked nchildren");
363        if let Some(result) = child
364            .vtable()
365            .execute_parent(&child, array, child_idx, ctx)?
366        {
367            result.statistics().inherit_from(array.statistics());
368            return Ok(Some(result));
369        }
370    }
371    Ok(None)
372}
373
/// A predicate that determines when an array has reached a desired form during execution.
///
/// This is a plain function pointer; the constructors on [`ExecutionResult`] fill it with a
/// [`Matcher`]'s `matches` function.
pub type DonePredicate = fn(&dyn DynArray) -> bool;
376
/// Metadata-only step indicator returned alongside an array in [`ExecutionResult`].
///
/// Instead of recursively executing children, encodings return an `ExecutionStep` that tells the
/// scheduler what to do next. This enables the scheduler to manage execution iteratively using
/// an explicit work stack, run cross-step optimizations, and cache shared sub-expressions.
pub enum ExecutionStep {
    /// Request that the scheduler execute child at the given index, using the provided
    /// [`DonePredicate`] to determine when the child is "done", then replace the child in this
    /// array and re-enter execution.
    ///
    /// Between steps, the scheduler runs reduce/reduce_parent rules to fixpoint, enabling
    /// cross-step optimization (e.g., pushing scalar functions through newly-decoded children).
    ///
    /// Fields: the child index, then the predicate that decides when that child is done.
    ExecuteChild(usize, DonePredicate),

    /// Execution is complete. The array in the accompanying [`ExecutionResult`] is the result.
    /// The scheduler will continue executing if it has not yet reached the target form.
    Done,
}
395
396impl fmt::Debug for ExecutionStep {
397    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
398        match self {
399            ExecutionStep::ExecuteChild(idx, _) => {
400                f.debug_tuple("ExecuteChild").field(idx).finish()
401            }
402            ExecutionStep::Done => write!(f, "Done"),
403        }
404    }
405}
406
/// The result of a single execution step on an array encoding.
///
/// Combines an [`ArrayRef`] with an [`ExecutionStep`] to tell the scheduler both what to do next
/// and what array to work with.
pub struct ExecutionResult {
    /// The array the scheduler should continue with.
    array: ArrayRef,
    /// What the scheduler should do next with `array`.
    step: ExecutionStep,
}
415
416impl ExecutionResult {
417    /// Signal that execution is complete with the given result array.
418    pub fn done(result: impl IntoArray) -> Self {
419        Self {
420            array: result.into_array(),
421            step: ExecutionStep::Done,
422        }
423    }
424
425    pub fn done_upcast<V: VTable>(arr: Arc<V::Array>) -> Self {
426        Self {
427            array: upcast_array::<V>(arr),
428            step: ExecutionStep::Done,
429        }
430    }
431
432    /// Request execution of child at `child_idx` until it matches the given [`Matcher`].
433    ///
434    /// The provided array is the (possibly modified) parent that still needs its child executed.
435    pub fn execute_child<M: Matcher>(array: impl IntoArray, child_idx: usize) -> Self {
436        Self {
437            array: array.into_array(),
438            step: ExecutionStep::ExecuteChild(child_idx, M::matches),
439        }
440    }
441
442    /// Like [`execute_child`](Self::execute_child), but accepts an `Arc<V::Array>` directly,
443    /// upcasting it to an [`ArrayRef`].
444    pub fn execute_child_upcast<V: VTable, M: Matcher>(
445        array: Arc<V::Array>,
446        child_idx: usize,
447    ) -> Self {
448        Self {
449            array: upcast_array::<V>(array),
450            step: ExecutionStep::ExecuteChild(child_idx, M::matches),
451        }
452    }
453
454    /// Returns a reference to the array.
455    pub fn array(&self) -> &ArrayRef {
456        &self.array
457    }
458
459    /// Returns a reference to the step.
460    pub fn step(&self) -> &ExecutionStep {
461        &self.step
462    }
463
464    /// Decompose into parts.
465    pub fn into_parts(self) -> (ArrayRef, ExecutionStep) {
466        (self.array, self.step)
467    }
468}
469
470impl fmt::Debug for ExecutionResult {
471    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
472        f.debug_struct("ExecutionResult")
473            .field("array", &self.array)
474            .field("step", &self.step)
475            .finish()
476    }
477}
478
/// Require that a child array matches `$M`.
///
/// If `$child` already matches, the macro evaluates to `$parent`, unchanged. Otherwise it
/// early-returns from the enclosing function with an [`ExecutionResult`] requesting execution
/// of child `$idx` of `$parent` until it matches `$M`.
///
/// `$V` is the vtable type used to upcast `$parent`, which must be usable as
/// `Arc::clone(&$parent)`.
///
/// ```ignore
/// let array = require_child!(Self, array, array.codes(), 0 => Primitive);
/// let array = require_child!(Self, array, array.values(), 1 => AnyCanonical);
/// ```
#[macro_export]
macro_rules! require_child {
    ($V:ty, $parent:expr, $child:expr, $idx:expr => $M:ty) => {{
        if $child.is::<$M>() {
            $parent
        } else {
            return Ok($crate::ExecutionResult::execute_child_upcast::<$V, $M>(
                std::sync::Arc::clone(&$parent),
                $idx,
            ))
        }
    }};
}
499
/// Extension trait for creating an execution context from a session.
pub trait VortexSessionExecute {
    /// Create a new [`ExecutionCtx`] from this session.
    fn create_execution_ctx(&self) -> ExecutionCtx;
}
505
506impl VortexSessionExecute for VortexSession {
507    fn create_execution_ctx(&self) -> ExecutionCtx {
508        ExecutionCtx::new(self.clone())
509    }
510}