Skip to main content

vortex_array/
executor.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! The execution engine: iteratively transforms arrays toward canonical form.
5//!
6//! Execution proceeds through four layers tried in order on each iteration:
7//!
8//! 1. **`reduce`** -- metadata-only self-rewrite (cheapest).
9//! 2. **`reduce_parent`** -- metadata-only child-driven parent rewrite.
10//! 3. **`execute_parent`** -- child-driven fused execution (may read buffers).
11//! 4. **`execute`** -- the encoding's own decode step (most expensive).
12//!
13//! The main entry point is [`ArrayRef::execute_until`], which uses an explicit work stack
14//! to drive execution iteratively without recursion. Between steps, the optimizer runs
15//! reduce/reduce_parent rules to fixpoint.
16//!
17//! See <https://docs.vortex.dev/developer-guide/internals/execution> for a full description
18//! of the model.
19
20use std::env::VarError;
21use std::fmt;
22use std::fmt::Display;
23use std::sync::LazyLock;
24use std::sync::atomic::AtomicUsize;
25
26use vortex_error::VortexExpect;
27use vortex_error::VortexResult;
28use vortex_error::vortex_bail;
29use vortex_error::vortex_panic;
30use vortex_session::VortexSession;
31
32use crate::AnyCanonical;
33use crate::ArrayRef;
34use crate::Canonical;
35use crate::IntoArray;
36use crate::matcher::Matcher;
37use crate::memory::HostAllocatorRef;
38use crate::memory::MemorySessionExt;
39use crate::optimizer::ArrayOptimizer;
40
41/// Returns the maximum number of iterations to attempt when executing an array before giving up and returning
42/// an error, can be by the `VORTEX_MAX_ITERATIONS` env variables, otherwise defaults to 128.
43pub(crate) fn max_iterations() -> usize {
44    static MAX_ITERATIONS: LazyLock<usize> =
45        LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
46            Ok(val) => val.parse::<usize>().unwrap_or_else(|e| {
47                vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")
48            }),
49            Err(VarError::NotPresent) => 128,
50            Err(VarError::NotUnicode(_)) => {
51                vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
52            }
53        });
54    *MAX_ITERATIONS
55}
56
57/// Marker trait for types that an [`ArrayRef`] can be executed into.
58///
59/// Implementors must provide an implementation of `execute` that takes
60/// an [`ArrayRef`] and an [`ExecutionCtx`], and produces an instance of the
61/// implementor type.
62///
63/// Users should use the `Array::execute` or `Array::execute_as` methods
64pub trait Executable: Sized {
65    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
66}
67
68#[expect(clippy::same_name_method)]
69impl ArrayRef {
70    /// Execute this array to produce an instance of `E`.
71    ///
72    /// See the [`Executable`] implementation for details on how this execution is performed.
73    pub fn execute<E: Executable>(self, ctx: &mut ExecutionCtx) -> VortexResult<E> {
74        E::execute(self, ctx)
75    }
76
77    /// Execute this array, labeling the execution step with a name for tracing.
78    pub fn execute_as<E: Executable>(
79        self,
80        _name: &'static str,
81        ctx: &mut ExecutionCtx,
82    ) -> VortexResult<E> {
83        E::execute(self, ctx)
84    }
85
86    /// Iteratively execute this array until the [`Matcher`] matches, using an explicit work
87    /// stack.
88    ///
89    /// The scheduler repeatedly:
90    /// 1. Checks if the current array matches `M` — if so, pops the stack or returns.
91    /// 2. Runs `execute_parent` on each child for child-driven optimizations.
92    /// 3. Calls `execute` which returns an [`ExecutionStep`].
93    ///
94    /// Note: the returned array may not match `M`. If execution converges to a canonical form
95    /// that does not match `M`, the canonical array is returned since no further execution
96    /// progress is possible.
97    ///
98    /// For safety, we will error when the number of execution iterations reaches a configurable
99    /// maximum (default 128, override with `VORTEX_MAX_ITERATIONS`).
100    pub fn execute_until<M: Matcher>(self, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
101        let mut current = self.optimize()?;
102        // Stack frames: (parent, slot_idx, done_predicate_for_slot)
103        let mut stack: Vec<(ArrayRef, usize, DonePredicate)> = Vec::new();
104
105        for _ in 0..max_iterations() {
106            // Check for termination: use the stack frame's done predicate, or the root matcher.
107            let is_done = stack
108                .last()
109                .map_or(M::matches as DonePredicate, |frame| frame.2);
110            if is_done(&current) {
111                match stack.pop() {
112                    None => {
113                        ctx.log(format_args!("-> {}", current));
114                        return Ok(current);
115                    }
116                    Some((parent, slot_idx, _)) => {
117                        current = parent.with_slot(slot_idx, current)?;
118                        current = current.optimize()?;
119                        continue;
120                    }
121                }
122            }
123
124            // If we've reached canonical form, we can't execute any further regardless
125            // of whether the matcher matched.
126            if AnyCanonical::matches(&current) {
127                match stack.pop() {
128                    None => {
129                        ctx.log(format_args!("-> canonical (unmatched) {}", current));
130                        return Ok(current);
131                    }
132                    Some((parent, slot_idx, _)) => {
133                        current = parent.with_slot(slot_idx, current)?;
134                        current = current.optimize()?;
135                        continue;
136                    }
137                }
138            }
139
140            // Try execute_parent (child-driven optimized execution)
141            if let Some(rewritten) = try_execute_parent(&current, ctx)? {
142                ctx.log(format_args!(
143                    "execute_parent rewrote {} -> {}",
144                    current, rewritten
145                ));
146                current = rewritten.optimize()?;
147                continue;
148            }
149
150            // Execute the array itself.
151            let result = execute_step(current, ctx)?;
152            let (array, step) = result.into_parts();
153            match step {
154                ExecutionStep::ExecuteSlot(i, done) => {
155                    let child = array.slots()[i]
156                        .clone()
157                        .vortex_expect("ExecuteSlot index in bounds");
158                    ctx.log(format_args!(
159                        "ExecuteSlot({i}): pushing {}, focusing on {}",
160                        array, child
161                    ));
162                    stack.push((array, i, done));
163                    current = child.optimize()?;
164                }
165                ExecutionStep::Done => {
166                    ctx.log(format_args!("Done: {}", array));
167                    current = array;
168                }
169            }
170        }
171
172        vortex_bail!(
173            "Exceeded maximum execution iterations ({}) while executing array",
174            max_iterations(),
175        )
176    }
177}
178
179/// Execution context for batch CPU compute.
180///
181/// Accumulates a trace of execution steps. Individual steps are logged at TRACE level for
182/// real-time following, and the full trace is dumped at DEBUG level when the context is dropped.
183#[derive(Debug, Clone)]
184pub struct ExecutionCtx {
185    id: usize,
186    session: VortexSession,
187    ops: Vec<String>,
188}
189
190impl ExecutionCtx {
191    /// Create a new execution context with the given session.
192    pub fn new(session: VortexSession) -> Self {
193        static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
194        let id = EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
195        Self {
196            id,
197            session,
198            ops: Vec::new(),
199        }
200    }
201
202    /// Get the session associated with this execution context.
203    pub fn session(&self) -> &VortexSession {
204        &self.session
205    }
206
207    /// Get the session-scoped host allocator for this execution context.
208    pub fn allocator(&self) -> HostAllocatorRef {
209        self.session.allocator()
210    }
211
212    /// Log an execution step at the current depth.
213    ///
214    /// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.
215    /// Individual steps are also logged at TRACE level for real-time following.
216    ///
217    /// Use the [`format_args!`] macro to create the `msg` argument.
218    pub fn log(&mut self, msg: fmt::Arguments<'_>) {
219        if tracing::enabled!(tracing::Level::DEBUG) {
220            let formatted = format!(" - {msg}");
221            tracing::trace!("exec[{}]: {formatted}", self.id);
222            self.ops.push(formatted);
223        }
224    }
225}
226
227impl Display for ExecutionCtx {
228    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
229        write!(f, "exec[{}]", self.id)
230    }
231}
232
233impl Drop for ExecutionCtx {
234    fn drop(&mut self) {
235        if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
236            // Unlike itertools `.format()` (panics in 0.14 on second format)
237            struct FmtOps<'a>(&'a [String]);
238            impl Display for FmtOps<'_> {
239                fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
240                    for (i, op) in self.0.iter().enumerate() {
241                        if i > 0 {
242                            f.write_str("\n")?;
243                        }
244                        f.write_str(op)?;
245                    }
246                    Ok(())
247                }
248            }
249            tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
250        }
251    }
252}
253
254/// Executing an [`ArrayRef`] into an [`ArrayRef`] is the atomic execution loop within Vortex.
255///
256/// It attempts to take the smallest possible step of execution such that the returned array
257/// is incrementally more "executed" than the input array. In other words, it is closer to becoming
258/// a canonical array.
259///
260/// The execution steps are as follows:
261/// 0. Check for canonical.
262/// 1. Attempt to `reduce` the array with metadata-only optimizations.
263/// 2. Attempt to call `reduce_parent` on each child.
264/// 3. Attempt to call `execute_parent` on each child.
265/// 4. Call `execute` on the array itself (which returns an [`ExecutionStep`]).
266///
267/// Most users will not call this method directly, instead preferring to specify an executable
268/// target such as [`crate::Columnar`], [`Canonical`], or any of the canonical array types (such as
269/// [`crate::arrays::PrimitiveArray`]).
270impl Executable for ArrayRef {
271    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
272        // 0. Check for canonical
273        if let Some(canonical) = array.as_opt::<AnyCanonical>() {
274            ctx.log(format_args!("-> canonical {}", array));
275            return Ok(Canonical::from(canonical).into_array());
276        }
277
278        // 1. reduce (metadata-only rewrites)
279        if let Some(reduced) = array.reduce()? {
280            ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
281            reduced.statistics().inherit_from(array.statistics());
282            return Ok(reduced);
283        }
284
285        // 2. reduce_parent (child-driven metadata-only rewrites)
286        for (slot_idx, slot) in array.slots().iter().enumerate() {
287            let Some(child) = slot else {
288                continue;
289            };
290            if let Some(reduced_parent) = child.reduce_parent(&array, slot_idx)? {
291                ctx.log(format_args!(
292                    "reduce_parent: slot[{}]({}) rewrote {} -> {}",
293                    slot_idx,
294                    child.encoding_id(),
295                    array,
296                    reduced_parent
297                ));
298                reduced_parent.statistics().inherit_from(array.statistics());
299                return Ok(reduced_parent);
300            }
301        }
302
303        // 3. execute_parent (child-driven optimized execution)
304        for (slot_idx, slot) in array.slots().iter().enumerate() {
305            let Some(child) = slot else {
306                continue;
307            };
308            if let Some(executed_parent) = child.execute_parent(&array, slot_idx, ctx)? {
309                ctx.log(format_args!(
310                    "execute_parent: slot[{}]({}) rewrote {} -> {}",
311                    slot_idx,
312                    child.encoding_id(),
313                    array,
314                    executed_parent
315                ));
316                executed_parent
317                    .statistics()
318                    .inherit_from(array.statistics());
319                return Ok(executed_parent);
320            }
321        }
322
323        // 4. execute (returns an ExecutionResult)
324        ctx.log(format_args!("executing {}", array));
325        let result = execute_step(array, ctx)?;
326        let (array, step) = result.into_parts();
327        match step {
328            ExecutionStep::Done => {
329                ctx.log(format_args!("-> {}", array));
330                Ok(array)
331            }
332            ExecutionStep::ExecuteSlot(i, _) => {
333                // For single-step execution, handle ExecuteSlot by executing the slot,
334                // replacing it, and returning the updated array.
335                let child = array.slots()[i].clone().vortex_expect("valid slot index");
336                let executed_child = child.execute::<ArrayRef>(ctx)?;
337                array.with_slot(i, executed_child)
338            }
339        }
340    }
341}
342
343/// Execute a single step on an array, consuming it.
344///
345/// Extracts the vtable before consuming the array to avoid borrow conflicts.
346fn execute_step(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
347    array.execute_encoding(ctx)
348}
349
350/// Try execute_parent on each occupied slot of the array.
351fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<ArrayRef>> {
352    for (slot_idx, slot) in array.slots().iter().enumerate() {
353        let Some(child) = slot else {
354            continue;
355        };
356        if let Some(result) = child.execute_parent(array, slot_idx, ctx)? {
357            result.statistics().inherit_from(array.statistics());
358            return Ok(Some(result));
359        }
360    }
361    Ok(None)
362}
363
364/// A predicate that determines when an array has reached a desired form during execution.
365pub type DonePredicate = fn(&ArrayRef) -> bool;
366
367/// Metadata-only step indicator returned alongside an array in [`ExecutionResult`].
368///
369/// Instead of recursively executing children, encodings return an `ExecutionStep` that tells the
370/// scheduler what to do next. This enables the scheduler to manage execution iteratively using
371/// an explicit work stack, run cross-step optimizations, and cache shared sub-expressions.
372pub enum ExecutionStep {
373    /// Request that the scheduler execute the slot at the given index, using the provided
374    /// [`DonePredicate`] to determine when the slot is "done", then replace the slot in this
375    /// array and re-enter execution.
376    ///
377    /// Between steps, the scheduler runs reduce/reduce_parent rules to fixpoint, enabling
378    /// cross-step optimization (e.g., pushing scalar functions through newly-decoded children).
379    ///
380    /// Use [`ExecutionResult::execute_slot`] instead of constructing this variant directly.
381    ExecuteSlot(usize, DonePredicate),
382
383    /// Execution is complete. The array in the accompanying [`ExecutionResult`] is the result.
384    /// The scheduler will continue executing if it has not yet reached the target form.
385    Done,
386}
387
388impl fmt::Debug for ExecutionStep {
389    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
390        match self {
391            ExecutionStep::ExecuteSlot(idx, _) => f.debug_tuple("ExecuteSlot").field(idx).finish(),
392            ExecutionStep::Done => write!(f, "Done"),
393        }
394    }
395}
396
397/// The result of a single execution step on an array encoding.
398///
399/// Combines an [`ArrayRef`] with an [`ExecutionStep`] to tell the scheduler both what to do next
400/// and what array to work with.
401pub struct ExecutionResult {
402    array: ArrayRef,
403    step: ExecutionStep,
404}
405
406impl ExecutionResult {
407    /// Signal that execution is complete with the given result array.
408    pub fn done(result: impl IntoArray) -> Self {
409        Self {
410            array: result.into_array(),
411            step: ExecutionStep::Done,
412        }
413    }
414
415    /// Request execution of slot at `slot_idx` until it matches the given [`Matcher`].
416    ///
417    /// The provided array is the (possibly modified) parent that still needs its slot executed.
418    pub fn execute_slot<M: Matcher>(array: impl IntoArray, slot_idx: usize) -> Self {
419        Self {
420            array: array.into_array(),
421            step: ExecutionStep::ExecuteSlot(slot_idx, M::matches),
422        }
423    }
424
425    /// Returns a reference to the array.
426    pub fn array(&self) -> &ArrayRef {
427        &self.array
428    }
429
430    /// Returns a reference to the step.
431    pub fn step(&self) -> &ExecutionStep {
432        &self.step
433    }
434
435    /// Decompose into parts.
436    pub fn into_parts(self) -> (ArrayRef, ExecutionStep) {
437        (self.array, self.step)
438    }
439}
440
441impl fmt::Debug for ExecutionResult {
442    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
443        f.debug_struct("ExecutionResult")
444            .field("array", &self.array)
445            .field("step", &self.step)
446            .finish()
447    }
448}
449
/// Ensures that a child array already matches `$M` before the parent's execute step proceeds.
///
/// If `$child` matches `$M`, the macro evaluates to `$parent`, unchanged. Otherwise it returns
/// early from the enclosing function with an [`ExecutionResult`] that asks the scheduler to
/// execute child slot `$idx` until it matches `$M`. A clone of `$parent` is handed over as the
/// array still awaiting that slot.
///
/// ```ignore
/// let array = require_child!(array, array.codes(), 0 => Primitive);
/// let array = require_child!(array, array.values(), 1 => AnyCanonical);
/// ```
#[macro_export]
macro_rules! require_child {
    ($parent:expr, $child:expr, $idx:expr => $M:ty) => {{
        if $child.is::<$M>() {
            $parent
        } else {
            return Ok($crate::ExecutionResult::execute_slot::<$M>(
                $parent.clone(),
                $idx,
            ));
        }
    }};
}
470
/// Optional-child counterpart of [`require_child!`].
///
/// `$child_opt` is an `Option` holding a child array.
/// - `None` is accepted as-is.
/// - `Some(child)` where `child` does not match `$M` makes the enclosing function return early
///   with an [`ExecutionResult`]. That result asks the scheduler to execute child slot `$idx`.
///
/// Unlike `require_child!`, this macro is used as a statement and yields no value. `$parent` is
/// moved, not cloned, into the early-return path.
///
/// ```ignore
/// require_opt_child!(array, array.patches().map(|p| p.indices()), 1 => Primitive);
/// ```
#[macro_export]
macro_rules! require_opt_child {
    ($parent:expr, $child_opt:expr, $idx:expr => $M:ty) => {{
        let needs_execution = $child_opt.is_some_and(|child| !child.is::<$M>());
        if needs_execution {
            return Ok($crate::ExecutionResult::execute_slot::<$M>($parent, $idx));
        }
    }};
}
489
/// Ensures that the patch slots are `Primitive`. The slots are checked in order: indices,
/// values, then chunk_offsets.
///
/// Empty (`None`) slots are skipped, so an array without patches passes through untouched. The
/// first occupied slot that is not `Primitive` triggers an early return requesting its
/// execution. As with [`require_opt_child!`], `$parent` is moved (not cloned) into that
/// early-return path.
///
/// ```ignore
/// require_patches!(array, PATCH_INDICES_SLOT, PATCH_VALUES_SLOT, PATCH_CHUNK_OFFSETS_SLOT);
/// ```
#[macro_export]
macro_rules! require_patches {
    // Internal rule: require a single patch slot to be `Primitive` (no-op if the slot is empty).
    (@slot $parent:expr, $slot:expr) => {
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$slot].as_ref(),
            $slot => $crate::arrays::Primitive
        );
    };
    ($parent:expr, $indices_slot:expr, $values_slot:expr, $chunk_offsets_slot:expr) => {
        $crate::require_patches!(@slot $parent, $indices_slot);
        $crate::require_patches!(@slot $parent, $values_slot);
        $crate::require_patches!(@slot $parent, $chunk_offsets_slot);
    };
}
518
/// Ensures that the validity slot `$idx` holds a [`Bool`](crate::arrays::Bool) array.
///
/// When the slot is empty (validity is not array-backed, e.g. `NonNullable` or `AllValid`),
/// nothing happens. When it holds a non-`Bool` array, the enclosing function returns early with
/// an [`ExecutionResult`] requesting execution of that slot. As with [`require_opt_child!`],
/// `$parent` is moved (not cloned) into the early-return path.
///
/// ```ignore
/// require_validity!(array, VALIDITY_SLOT);
/// ```
#[macro_export]
macro_rules! require_validity {
    ($parent:expr, $idx:expr) => {
        $crate::require_opt_child!($parent, $parent.slots()[$idx].as_ref(), $idx => $crate::arrays::Bool);
    };
}
538
539/// Extension trait for creating an execution context from a session.
540pub trait VortexSessionExecute {
541    /// Create a new execution context from this session.
542    fn create_execution_ctx(&self) -> ExecutionCtx;
543}
544
545impl VortexSessionExecute for VortexSession {
546    fn create_execution_ctx(&self) -> ExecutionCtx {
547        ExecutionCtx::new(self.clone())
548    }
549}