Skip to main content

vortex_array/
executor.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! The execution engine: iteratively transforms arrays toward canonical form.
5//!
6//! Execution proceeds through four layers tried in order on each iteration:
7//!
8//! 1. **`reduce`** -- metadata-only self-rewrite (cheapest).
9//! 2. **`reduce_parent`** -- metadata-only child-driven parent rewrite.
10//! 3. **`execute_parent`** -- child-driven fused execution (may read buffers).
11//! 4. **`execute`** -- the encoding's own decode step (most expensive).
12//!
13//! The main entry point is [`DynArray::execute_until`], which uses an explicit work stack
14//! to drive execution iteratively without recursion. Between steps, the optimizer runs
15//! reduce/reduce_parent rules to fixpoint.
16//!
17//! See <https://docs.vortex.dev/developer-guide/internals/execution> for a full description
18//! of the model.
19
20use std::env::VarError;
21use std::fmt;
22use std::fmt::Display;
23use std::sync::Arc;
24use std::sync::LazyLock;
25use std::sync::atomic::AtomicUsize;
26
27use vortex_error::VortexExpect;
28use vortex_error::VortexResult;
29use vortex_error::vortex_bail;
30use vortex_error::vortex_panic;
31use vortex_session::VortexSession;
32
33use crate::AnyCanonical;
34use crate::ArrayRef;
35use crate::Canonical;
36use crate::DynArray;
37use crate::IntoArray;
38use crate::matcher::Matcher;
39use crate::optimizer::ArrayOptimizer;
40
41/// Maximum number of iterations to attempt when executing an array before giving up and returning
42/// an error.
43pub(crate) static MAX_ITERATIONS: LazyLock<usize> =
44    LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
45        Ok(val) => val
46            .parse::<usize>()
47            .unwrap_or_else(|e| vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")),
48        Err(VarError::NotPresent) => 128,
49        Err(VarError::NotUnicode(_)) => {
50            vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
51        }
52    });
53
54/// Marker trait for types that an [`ArrayRef`] can be executed into.
55///
56/// Implementors must provide an implementation of `execute` that takes
57/// an [`ArrayRef`] and an [`ExecutionCtx`], and produces an instance of the
58/// implementor type.
59///
60/// Users should use the `Array::execute` or `Array::execute_as` methods
61pub trait Executable: Sized {
62    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
63}
64
65impl dyn DynArray + '_ {
66    /// Execute this array to produce an instance of `E`.
67    ///
68    /// See the [`Executable`] implementation for details on how this execution is performed.
69    pub fn execute<E: Executable>(self: Arc<Self>, ctx: &mut ExecutionCtx) -> VortexResult<E> {
70        E::execute(self, ctx)
71    }
72
73    /// Execute this array, labeling the execution step with a name for tracing.
74    pub fn execute_as<E: Executable>(
75        self: Arc<Self>,
76        _name: &'static str,
77        ctx: &mut ExecutionCtx,
78    ) -> VortexResult<E> {
79        E::execute(self, ctx)
80    }
81
82    /// Iteratively execute this array until the [`Matcher`] matches, using an explicit work
83    /// stack.
84    ///
85    /// The scheduler repeatedly:
86    /// 1. Checks if the current array matches `M` — if so, pops the stack or returns.
87    /// 2. Runs `execute_parent` on each child for child-driven optimizations.
88    /// 3. Calls `execute` which returns an [`ExecutionStep`].
89    ///
90    /// Note: the returned array may not match `M`. If execution converges to a canonical form
91    /// that does not match `M`, the canonical array is returned since no further execution
92    /// progress is possible.
93    ///
94    /// For safety, we will error when the number of execution iterations reaches a configurable
95    /// maximum (default 128, override with `VORTEX_MAX_ITERATIONS`).
96    pub fn execute_until<M: Matcher>(
97        self: Arc<Self>,
98        ctx: &mut ExecutionCtx,
99    ) -> VortexResult<ArrayRef> {
100        static MAX_ITERATIONS: LazyLock<usize> =
101            LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
102                Ok(val) => val.parse::<usize>().unwrap_or_else(|e| {
103                    vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")
104                }),
105                Err(VarError::NotPresent) => 128,
106                Err(VarError::NotUnicode(_)) => {
107                    vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
108                }
109            });
110
111        let mut current = self.optimize()?;
112        // Stack frames: (parent, child_idx, done_predicate_for_child)
113        let mut stack: Vec<(ArrayRef, usize, DonePredicate)> = Vec::new();
114
115        for _ in 0..*MAX_ITERATIONS {
116            // Check for termination: use the stack frame's done predicate, or the root matcher.
117            let is_done = stack
118                .last()
119                .map_or(M::matches as DonePredicate, |frame| frame.2);
120            if is_done(current.as_ref()) {
121                match stack.pop() {
122                    None => {
123                        ctx.log(format_args!("-> {}", current));
124                        return Ok(current);
125                    }
126                    Some((parent, child_idx, _)) => {
127                        current = parent.with_child(child_idx, current)?;
128                        current = current.optimize()?;
129                        continue;
130                    }
131                }
132            }
133
134            // If we've reached canonical form, we can't execute any further regardless
135            // of whether the matcher matched.
136            if AnyCanonical::matches(current.as_ref()) {
137                match stack.pop() {
138                    None => {
139                        ctx.log(format_args!("-> canonical (unmatched) {}", current));
140                        return Ok(current);
141                    }
142                    Some((parent, child_idx, _)) => {
143                        current = parent.with_child(child_idx, current)?;
144                        current = current.optimize()?;
145                        continue;
146                    }
147                }
148            }
149
150            // Try execute_parent (child-driven optimized execution)
151            if let Some(rewritten) = try_execute_parent(&current, ctx)? {
152                ctx.log(format_args!(
153                    "execute_parent rewrote {} -> {}",
154                    current, rewritten
155                ));
156                current = rewritten.optimize()?;
157                continue;
158            }
159
160            // Execute the array itself.
161            let result = execute_step(current, ctx)?;
162            let (array, step) = result.into_parts();
163            match step {
164                ExecutionStep::ExecuteChild(i, done) => {
165                    let child = array
166                        .nth_child(i)
167                        .vortex_expect("ExecuteChild index in bounds");
168                    ctx.log(format_args!(
169                        "ExecuteChild({i}): pushing {}, focusing on {}",
170                        array, child
171                    ));
172                    stack.push((array, i, done));
173                    current = child.optimize()?;
174                }
175                ExecutionStep::Done => {
176                    ctx.log(format_args!("Done: {}", array));
177                    current = array;
178                }
179            }
180        }
181
182        vortex_bail!(
183            "Exceeded maximum execution iterations ({}) while executing array",
184            *MAX_ITERATIONS,
185        )
186    }
187}
188
189/// Execution context for batch CPU compute.
190///
191/// Accumulates a trace of execution steps. Individual steps are logged at TRACE level for
192/// real-time following, and the full trace is dumped at DEBUG level when the context is dropped.
193pub struct ExecutionCtx {
194    id: usize,
195    session: VortexSession,
196    ops: Vec<String>,
197}
198
199impl ExecutionCtx {
200    /// Create a new execution context with the given session.
201    pub fn new(session: VortexSession) -> Self {
202        static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
203        let id = EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
204        Self {
205            id,
206            session,
207            ops: Vec::new(),
208        }
209    }
210
211    /// Get the session associated with this execution context.
212    pub fn session(&self) -> &VortexSession {
213        &self.session
214    }
215
216    /// Log an execution step at the current depth.
217    ///
218    /// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.
219    /// Individual steps are also logged at TRACE level for real-time following.
220    ///
221    /// Use the [`format_args!`] macro to create the `msg` argument.
222    pub fn log(&mut self, msg: fmt::Arguments<'_>) {
223        if tracing::enabled!(tracing::Level::DEBUG) {
224            let formatted = format!(" - {msg}");
225            tracing::trace!("exec[{}]: {formatted}", self.id);
226            self.ops.push(formatted);
227        }
228    }
229}
230
231impl Display for ExecutionCtx {
232    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
233        write!(f, "exec[{}]", self.id)
234    }
235}
236
237impl Drop for ExecutionCtx {
238    fn drop(&mut self) {
239        if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
240            // Unlike itertools `.format()` (panics in 0.14 on second format)
241            struct FmtOps<'a>(&'a [String]);
242            impl Display for FmtOps<'_> {
243                fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
244                    for (i, op) in self.0.iter().enumerate() {
245                        if i > 0 {
246                            f.write_str("\n")?;
247                        }
248                        f.write_str(op)?;
249                    }
250                    Ok(())
251                }
252            }
253            tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
254        }
255    }
256}
257
258/// Executing an [`ArrayRef`] into an [`ArrayRef`] is the atomic execution loop within Vortex.
259///
260/// It attempts to take the smallest possible step of execution such that the returned array
261/// is incrementally more "executed" than the input array. In other words, it is closer to becoming
262/// a canonical array.
263///
264/// The execution steps are as follows:
265/// 0. Check for canonical.
266/// 1. Attempt to `reduce` the array with metadata-only optimizations.
267/// 2. Attempt to call `reduce_parent` on each child.
268/// 3. Attempt to call `execute_parent` on each child.
269/// 4. Call `execute` on the array itself (which returns an [`ExecutionStep`]).
270///
271/// Most users will not call this method directly, instead preferring to specify an executable
272/// target such as [`crate::Columnar`], [`Canonical`], or any of the canonical array types (such as
273/// [`crate::arrays::PrimitiveArray`]).
274impl Executable for ArrayRef {
275    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
276        // 0. Check for canonical
277        if let Some(canonical) = array.as_opt::<AnyCanonical>() {
278            ctx.log(format_args!("-> canonical {}", array));
279            return Ok(Canonical::from(canonical).into_array());
280        }
281
282        // 1. reduce (metadata-only rewrites)
283        if let Some(reduced) = array.vtable().reduce(&array)? {
284            ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
285            reduced.statistics().inherit_from(array.statistics());
286            return Ok(reduced);
287        }
288
289        // 2. reduce_parent (child-driven metadata-only rewrites)
290        for child_idx in 0..array.nchildren() {
291            let child = array.nth_child(child_idx).vortex_expect("checked length");
292            if let Some(reduced_parent) = child.vtable().reduce_parent(&child, &array, child_idx)? {
293                ctx.log(format_args!(
294                    "reduce_parent: child[{}]({}) rewrote {} -> {}",
295                    child_idx,
296                    child.encoding_id(),
297                    array,
298                    reduced_parent
299                ));
300                reduced_parent.statistics().inherit_from(array.statistics());
301                return Ok(reduced_parent);
302            }
303        }
304
305        // 3. execute_parent (child-driven optimized execution)
306        for child_idx in 0..array.nchildren() {
307            // TODO(joe): remove internal copy in nth_child.
308            let child = array.nth_child(child_idx).vortex_expect("checked length");
309            if let Some(executed_parent) = child
310                .vtable()
311                .execute_parent(&child, &array, child_idx, ctx)?
312            {
313                ctx.log(format_args!(
314                    "execute_parent: child[{}]({}) rewrote {} -> {}",
315                    child_idx,
316                    child.encoding_id(),
317                    array,
318                    executed_parent
319                ));
320                executed_parent
321                    .statistics()
322                    .inherit_from(array.statistics());
323                return Ok(executed_parent);
324            }
325        }
326
327        // 4. execute (returns an ExecutionResult)
328        ctx.log(format_args!("executing {}", array));
329        let result = execute_step(array, ctx)?;
330        let (array, step) = result.into_parts();
331        match step {
332            ExecutionStep::Done => {
333                ctx.log(format_args!("-> {}", array.as_ref()));
334                Ok(array)
335            }
336            ExecutionStep::ExecuteChild(i, _) => {
337                // For single-step execution, handle ExecuteChild by executing the child,
338                // replacing it, and returning the updated array.
339                let child = array.nth_child(i).vortex_expect("valid child index");
340                let executed_child = child.execute::<ArrayRef>(ctx)?;
341                array.with_child(i, executed_child)
342            }
343        }
344    }
345}
346
347/// Execute a single step on an array, consuming it.
348///
349/// Extracts the vtable before consuming the array to avoid borrow conflicts.
350fn execute_step(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
351    let vtable = array.vtable().clone_boxed();
352    vtable.execute(array, ctx)
353}
354
355/// Try execute_parent on each child of the array.
356fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<ArrayRef>> {
357    for child_idx in 0..array.nchildren() {
358        let child = array
359            .nth_child(child_idx)
360            .vortex_expect("checked nchildren");
361        if let Some(result) = child
362            .vtable()
363            .execute_parent(&child, array, child_idx, ctx)?
364        {
365            result.statistics().inherit_from(array.statistics());
366            return Ok(Some(result));
367        }
368    }
369    Ok(None)
370}
371
/// A predicate that determines when an array has reached a desired form during execution.
///
/// This is a plain function pointer; the scheduler in this module obtains one from a
/// [`Matcher`] `M` by coercing `M::matches`.
pub type DonePredicate = fn(&dyn DynArray) -> bool;
374
/// Metadata-only step indicator returned alongside an array in [`ExecutionResult`].
///
/// Instead of recursively executing children, encodings return an `ExecutionStep` that tells the
/// scheduler what to do next. This enables the scheduler to manage execution iteratively using
/// an explicit work stack, run cross-step optimizations, and cache shared sub-expressions.
pub enum ExecutionStep {
    /// Request that the scheduler execute child at the given index, using the provided
    /// [`DonePredicate`] to determine when the child is "done", then replace the child in this
    /// array and re-enter execution.
    ///
    /// The fields are, in order, the child index and the done predicate for that child.
    ///
    /// Between steps, the scheduler runs reduce/reduce_parent rules to fixpoint, enabling
    /// cross-step optimization (e.g., pushing scalar functions through newly-decoded children).
    ExecuteChild(usize, DonePredicate),

    /// Execution is complete. The array in the accompanying [`ExecutionResult`] is the result.
    /// The scheduler will continue executing if it has not yet reached the target form.
    Done,
}
393
394impl fmt::Debug for ExecutionStep {
395    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
396        match self {
397            ExecutionStep::ExecuteChild(idx, _) => {
398                f.debug_tuple("ExecuteChild").field(idx).finish()
399            }
400            ExecutionStep::Done => write!(f, "Done"),
401        }
402    }
403}
404
405/// The result of a single execution step on an array encoding.
406///
407/// Combines an [`ArrayRef`] with an [`ExecutionStep`] to tell the scheduler both what to do next
408/// and what array to work with.
409pub struct ExecutionResult {
410    array: ArrayRef,
411    step: ExecutionStep,
412}
413
414impl ExecutionResult {
415    /// Signal that execution is complete with the given result array.
416    pub fn done(result: impl IntoArray) -> Self {
417        Self {
418            array: result.into_array(),
419            step: ExecutionStep::Done,
420        }
421    }
422
423    /// Request execution of child at `child_idx` until it matches the given [`Matcher`].
424    ///
425    /// The provided array is the (possibly modified) parent that still needs its child executed.
426    pub fn execute_child<M: Matcher>(array: impl IntoArray, child_idx: usize) -> Self {
427        Self {
428            array: array.into_array(),
429            step: ExecutionStep::ExecuteChild(child_idx, M::matches),
430        }
431    }
432
433    /// Returns a reference to the array.
434    pub fn array(&self) -> &ArrayRef {
435        &self.array
436    }
437
438    /// Returns a reference to the step.
439    pub fn step(&self) -> &ExecutionStep {
440        &self.step
441    }
442
443    /// Decompose into parts.
444    pub fn into_parts(self) -> (ArrayRef, ExecutionStep) {
445        (self.array, self.step)
446    }
447}
448
449impl fmt::Debug for ExecutionResult {
450    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
451        f.debug_struct("ExecutionResult")
452            .field("array", &self.array)
453            .field("step", &self.step)
454            .finish()
455    }
456}
457
/// Ensure that a child array matches `$M` before continuing.
///
/// When `$child` already matches `$M`, the macro evaluates to `$parent` unchanged. Otherwise it
/// early-returns from the enclosing function with an [`ExecutionResult`] that asks the scheduler
/// to execute child `$idx` of a clone of `$parent` until that child matches `$M`.
///
/// ```ignore
/// let array = require_child!(array, array.codes(), 0 => Primitive);
/// let array = require_child!(array, array.values(), 1 => AnyCanonical);
/// ```
#[macro_export]
macro_rules! require_child {
    ($parent:expr, $child:expr, $idx:expr => $M:ty) => {{
        if $child.is::<$M>() {
            $parent
        } else {
            return Ok($crate::ExecutionResult::execute_child::<$M>(
                $parent.clone(),
                $idx,
            ))
        }
    }};
}
478
479/// Extension trait for creating an execution context from a session.
480pub trait VortexSessionExecute {
481    /// Create a new execution context from this session.
482    fn create_execution_ctx(&self) -> ExecutionCtx;
483}
484
485impl VortexSessionExecute for VortexSession {
486    fn create_execution_ctx(&self) -> ExecutionCtx {
487        ExecutionCtx::new(self.clone())
488    }
489}