Skip to main content

vortex_array/
executor.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! The execution engine: iteratively transforms arrays toward canonical form.
5//!
6//! Execution proceeds through four layers tried in order on each iteration:
7//!
8//! 1. **`reduce`** -- metadata-only self-rewrite (cheapest).
9//! 2. **`reduce_parent`** -- metadata-only child-driven parent rewrite.
10//! 3. **`execute_parent`** -- child-driven fused execution (may read buffers).
11//! 4. **`execute`** -- the encoding's own decode step (most expensive).
12//!
//! The main entry point is [`ArrayRef::execute_until`], which uses an explicit work stack
14//! to drive execution iteratively without recursion. Between steps, the optimizer runs
15//! reduce/reduce_parent rules to fixpoint.
16//!
17//! See <https://docs.vortex.dev/developer-guide/internals/execution> for a full description
18//! of the model.
19
20use std::env::VarError;
21use std::fmt;
22use std::fmt::Display;
23use std::sync::LazyLock;
24use std::sync::atomic::AtomicUsize;
25
26use vortex_error::VortexExpect;
27use vortex_error::VortexResult;
28use vortex_error::vortex_bail;
29use vortex_error::vortex_panic;
30use vortex_session::VortexSession;
31
32use crate::AnyCanonical;
33use crate::ArrayRef;
34use crate::Canonical;
35use crate::IntoArray;
36use crate::matcher::Matcher;
37use crate::memory::HostAllocatorRef;
38use crate::memory::MemorySessionExt;
39use crate::optimizer::ArrayOptimizer;
40
41/// Maximum number of iterations to attempt when executing an array before giving up and returning
42/// an error.
43pub(crate) static MAX_ITERATIONS: LazyLock<usize> =
44    LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
45        Ok(val) => val
46            .parse::<usize>()
47            .unwrap_or_else(|e| vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")),
48        Err(VarError::NotPresent) => 128,
49        Err(VarError::NotUnicode(_)) => {
50            vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
51        }
52    });
53
54/// Marker trait for types that an [`ArrayRef`] can be executed into.
55///
56/// Implementors must provide an implementation of `execute` that takes
57/// an [`ArrayRef`] and an [`ExecutionCtx`], and produces an instance of the
58/// implementor type.
59///
60/// Users should use the `Array::execute` or `Array::execute_as` methods
61pub trait Executable: Sized {
62    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
63}
64
65#[allow(clippy::same_name_method)]
66impl ArrayRef {
67    /// Execute this array to produce an instance of `E`.
68    ///
69    /// See the [`Executable`] implementation for details on how this execution is performed.
70    pub fn execute<E: Executable>(self, ctx: &mut ExecutionCtx) -> VortexResult<E> {
71        E::execute(self, ctx)
72    }
73
74    /// Execute this array, labeling the execution step with a name for tracing.
75    pub fn execute_as<E: Executable>(
76        self,
77        _name: &'static str,
78        ctx: &mut ExecutionCtx,
79    ) -> VortexResult<E> {
80        E::execute(self, ctx)
81    }
82
83    /// Iteratively execute this array until the [`Matcher`] matches, using an explicit work
84    /// stack.
85    ///
86    /// The scheduler repeatedly:
87    /// 1. Checks if the current array matches `M` — if so, pops the stack or returns.
88    /// 2. Runs `execute_parent` on each child for child-driven optimizations.
89    /// 3. Calls `execute` which returns an [`ExecutionStep`].
90    ///
91    /// Note: the returned array may not match `M`. If execution converges to a canonical form
92    /// that does not match `M`, the canonical array is returned since no further execution
93    /// progress is possible.
94    ///
95    /// For safety, we will error when the number of execution iterations reaches a configurable
96    /// maximum (default 128, override with `VORTEX_MAX_ITERATIONS`).
97    pub fn execute_until<M: Matcher>(self, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
98        static MAX_ITERATIONS: LazyLock<usize> =
99            LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
100                Ok(val) => val.parse::<usize>().unwrap_or_else(|e| {
101                    vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")
102                }),
103                Err(VarError::NotPresent) => 128,
104                Err(VarError::NotUnicode(_)) => {
105                    vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
106                }
107            });
108
109        let mut current = self.optimize()?;
110        // Stack frames: (parent, slot_idx, done_predicate_for_slot)
111        let mut stack: Vec<(ArrayRef, usize, DonePredicate)> = Vec::new();
112
113        for _ in 0..*MAX_ITERATIONS {
114            // Check for termination: use the stack frame's done predicate, or the root matcher.
115            let is_done = stack
116                .last()
117                .map_or(M::matches as DonePredicate, |frame| frame.2);
118            if is_done(&current) {
119                match stack.pop() {
120                    None => {
121                        ctx.log(format_args!("-> {}", current));
122                        return Ok(current);
123                    }
124                    Some((parent, slot_idx, _)) => {
125                        current = parent.with_slot(slot_idx, current)?;
126                        current = current.optimize()?;
127                        continue;
128                    }
129                }
130            }
131
132            // If we've reached canonical form, we can't execute any further regardless
133            // of whether the matcher matched.
134            if AnyCanonical::matches(&current) {
135                match stack.pop() {
136                    None => {
137                        ctx.log(format_args!("-> canonical (unmatched) {}", current));
138                        return Ok(current);
139                    }
140                    Some((parent, slot_idx, _)) => {
141                        current = parent.with_slot(slot_idx, current)?;
142                        current = current.optimize()?;
143                        continue;
144                    }
145                }
146            }
147
148            // Try execute_parent (child-driven optimized execution)
149            if let Some(rewritten) = try_execute_parent(&current, ctx)? {
150                ctx.log(format_args!(
151                    "execute_parent rewrote {} -> {}",
152                    current, rewritten
153                ));
154                current = rewritten.optimize()?;
155                continue;
156            }
157
158            // Execute the array itself.
159            let result = execute_step(current, ctx)?;
160            let (array, step) = result.into_parts();
161            match step {
162                ExecutionStep::ExecuteSlot(i, done) => {
163                    let child = array.slots()[i]
164                        .clone()
165                        .vortex_expect("ExecuteSlot index in bounds");
166                    ctx.log(format_args!(
167                        "ExecuteSlot({i}): pushing {}, focusing on {}",
168                        array, child
169                    ));
170                    stack.push((array, i, done));
171                    current = child.optimize()?;
172                }
173                ExecutionStep::Done => {
174                    ctx.log(format_args!("Done: {}", array));
175                    current = array;
176                }
177            }
178        }
179
180        vortex_bail!(
181            "Exceeded maximum execution iterations ({}) while executing array",
182            *MAX_ITERATIONS,
183        )
184    }
185}
186
187/// Execution context for batch CPU compute.
188///
189/// Accumulates a trace of execution steps. Individual steps are logged at TRACE level for
190/// real-time following, and the full trace is dumped at DEBUG level when the context is dropped.
191#[derive(Debug, Clone)]
192pub struct ExecutionCtx {
193    id: usize,
194    session: VortexSession,
195    ops: Vec<String>,
196}
197
198impl ExecutionCtx {
199    /// Create a new execution context with the given session.
200    pub fn new(session: VortexSession) -> Self {
201        static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
202        let id = EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
203        Self {
204            id,
205            session,
206            ops: Vec::new(),
207        }
208    }
209
210    /// Get the session associated with this execution context.
211    pub fn session(&self) -> &VortexSession {
212        &self.session
213    }
214
215    /// Get the session-scoped host allocator for this execution context.
216    pub fn allocator(&self) -> HostAllocatorRef {
217        self.session.allocator()
218    }
219
220    /// Log an execution step at the current depth.
221    ///
222    /// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.
223    /// Individual steps are also logged at TRACE level for real-time following.
224    ///
225    /// Use the [`format_args!`] macro to create the `msg` argument.
226    pub fn log(&mut self, msg: fmt::Arguments<'_>) {
227        if tracing::enabled!(tracing::Level::DEBUG) {
228            let formatted = format!(" - {msg}");
229            tracing::trace!("exec[{}]: {formatted}", self.id);
230            self.ops.push(formatted);
231        }
232    }
233}
234
235impl Display for ExecutionCtx {
236    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
237        write!(f, "exec[{}]", self.id)
238    }
239}
240
241impl Drop for ExecutionCtx {
242    fn drop(&mut self) {
243        if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
244            // Unlike itertools `.format()` (panics in 0.14 on second format)
245            struct FmtOps<'a>(&'a [String]);
246            impl Display for FmtOps<'_> {
247                fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248                    for (i, op) in self.0.iter().enumerate() {
249                        if i > 0 {
250                            f.write_str("\n")?;
251                        }
252                        f.write_str(op)?;
253                    }
254                    Ok(())
255                }
256            }
257            tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
258        }
259    }
260}
261
262/// Executing an [`ArrayRef`] into an [`ArrayRef`] is the atomic execution loop within Vortex.
263///
264/// It attempts to take the smallest possible step of execution such that the returned array
265/// is incrementally more "executed" than the input array. In other words, it is closer to becoming
266/// a canonical array.
267///
268/// The execution steps are as follows:
269/// 0. Check for canonical.
270/// 1. Attempt to `reduce` the array with metadata-only optimizations.
271/// 2. Attempt to call `reduce_parent` on each child.
272/// 3. Attempt to call `execute_parent` on each child.
273/// 4. Call `execute` on the array itself (which returns an [`ExecutionStep`]).
274///
275/// Most users will not call this method directly, instead preferring to specify an executable
276/// target such as [`crate::Columnar`], [`Canonical`], or any of the canonical array types (such as
277/// [`crate::arrays::PrimitiveArray`]).
278impl Executable for ArrayRef {
279    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
280        // 0. Check for canonical
281        if let Some(canonical) = array.as_opt::<AnyCanonical>() {
282            ctx.log(format_args!("-> canonical {}", array));
283            return Ok(Canonical::from(canonical).into_array());
284        }
285
286        // 1. reduce (metadata-only rewrites)
287        if let Some(reduced) = array.reduce()? {
288            ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
289            reduced.statistics().inherit_from(array.statistics());
290            return Ok(reduced);
291        }
292
293        // 2. reduce_parent (child-driven metadata-only rewrites)
294        for (slot_idx, slot) in array.slots().iter().enumerate() {
295            let Some(child) = slot else {
296                continue;
297            };
298            if let Some(reduced_parent) = child.reduce_parent(&array, slot_idx)? {
299                ctx.log(format_args!(
300                    "reduce_parent: slot[{}]({}) rewrote {} -> {}",
301                    slot_idx,
302                    child.encoding_id(),
303                    array,
304                    reduced_parent
305                ));
306                reduced_parent.statistics().inherit_from(array.statistics());
307                return Ok(reduced_parent);
308            }
309        }
310
311        // 3. execute_parent (child-driven optimized execution)
312        for (slot_idx, slot) in array.slots().iter().enumerate() {
313            let Some(child) = slot else {
314                continue;
315            };
316            if let Some(executed_parent) = child.execute_parent(&array, slot_idx, ctx)? {
317                ctx.log(format_args!(
318                    "execute_parent: slot[{}]({}) rewrote {} -> {}",
319                    slot_idx,
320                    child.encoding_id(),
321                    array,
322                    executed_parent
323                ));
324                executed_parent
325                    .statistics()
326                    .inherit_from(array.statistics());
327                return Ok(executed_parent);
328            }
329        }
330
331        // 4. execute (returns an ExecutionResult)
332        ctx.log(format_args!("executing {}", array));
333        let result = execute_step(array, ctx)?;
334        let (array, step) = result.into_parts();
335        match step {
336            ExecutionStep::Done => {
337                ctx.log(format_args!("-> {}", array));
338                Ok(array)
339            }
340            ExecutionStep::ExecuteSlot(i, _) => {
341                // For single-step execution, handle ExecuteSlot by executing the slot,
342                // replacing it, and returning the updated array.
343                let child = array.slots()[i].clone().vortex_expect("valid slot index");
344                let executed_child = child.execute::<ArrayRef>(ctx)?;
345                array.with_slot(i, executed_child)
346            }
347        }
348    }
349}
350
351/// Execute a single step on an array, consuming it.
352///
353/// Extracts the vtable before consuming the array to avoid borrow conflicts.
354fn execute_step(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
355    array.execute_encoding(ctx)
356}
357
358/// Try execute_parent on each occupied slot of the array.
359fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<ArrayRef>> {
360    for (slot_idx, slot) in array.slots().iter().enumerate() {
361        let Some(child) = slot else {
362            continue;
363        };
364        if let Some(result) = child.execute_parent(array, slot_idx, ctx)? {
365            result.statistics().inherit_from(array.statistics());
366            return Ok(Some(result));
367        }
368    }
369    Ok(None)
370}
371
372/// A predicate that determines when an array has reached a desired form during execution.
373pub type DonePredicate = fn(&ArrayRef) -> bool;
374
375/// Metadata-only step indicator returned alongside an array in [`ExecutionResult`].
376///
377/// Instead of recursively executing children, encodings return an `ExecutionStep` that tells the
378/// scheduler what to do next. This enables the scheduler to manage execution iteratively using
379/// an explicit work stack, run cross-step optimizations, and cache shared sub-expressions.
380pub enum ExecutionStep {
381    /// Request that the scheduler execute the slot at the given index, using the provided
382    /// [`DonePredicate`] to determine when the slot is "done", then replace the slot in this
383    /// array and re-enter execution.
384    ///
385    /// Between steps, the scheduler runs reduce/reduce_parent rules to fixpoint, enabling
386    /// cross-step optimization (e.g., pushing scalar functions through newly-decoded children).
387    ///
388    /// Use [`ExecutionResult::execute_slot`] instead of constructing this variant directly.
389    ExecuteSlot(usize, DonePredicate),
390
391    /// Execution is complete. The array in the accompanying [`ExecutionResult`] is the result.
392    /// The scheduler will continue executing if it has not yet reached the target form.
393    Done,
394}
395
396impl fmt::Debug for ExecutionStep {
397    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
398        match self {
399            ExecutionStep::ExecuteSlot(idx, _) => f.debug_tuple("ExecuteSlot").field(idx).finish(),
400            ExecutionStep::Done => write!(f, "Done"),
401        }
402    }
403}
404
405/// The result of a single execution step on an array encoding.
406///
407/// Combines an [`ArrayRef`] with an [`ExecutionStep`] to tell the scheduler both what to do next
408/// and what array to work with.
409pub struct ExecutionResult {
410    array: ArrayRef,
411    step: ExecutionStep,
412}
413
414impl ExecutionResult {
415    /// Signal that execution is complete with the given result array.
416    pub fn done(result: impl IntoArray) -> Self {
417        Self {
418            array: result.into_array(),
419            step: ExecutionStep::Done,
420        }
421    }
422
423    /// Request execution of slot at `slot_idx` until it matches the given [`Matcher`].
424    ///
425    /// The provided array is the (possibly modified) parent that still needs its slot executed.
426    pub fn execute_slot<M: Matcher>(array: impl IntoArray, slot_idx: usize) -> Self {
427        Self {
428            array: array.into_array(),
429            step: ExecutionStep::ExecuteSlot(slot_idx, M::matches),
430        }
431    }
432
433    /// Returns a reference to the array.
434    pub fn array(&self) -> &ArrayRef {
435        &self.array
436    }
437
438    /// Returns a reference to the step.
439    pub fn step(&self) -> &ExecutionStep {
440        &self.step
441    }
442
443    /// Decompose into parts.
444    pub fn into_parts(self) -> (ArrayRef, ExecutionStep) {
445        (self.array, self.step)
446    }
447}
448
449impl fmt::Debug for ExecutionResult {
450    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
451        f.debug_struct("ExecutionResult")
452            .field("array", &self.array)
453            .field("step", &self.step)
454            .finish()
455    }
456}
457
/// Require that a child array matches `$M`.
///
/// Evaluates to `$parent` unchanged when `$child` already matches. Otherwise it early-returns
/// `Ok(ExecutionResult::execute_slot::<$M>(..))` from the enclosing function, asking the
/// scheduler to execute slot `$idx` (on a clone of `$parent`) until it matches `$M`.
///
/// ```ignore
/// let array = require_child!(array, array.codes(), 0 => Primitive);
/// let array = require_child!(array, array.values(), 1 => AnyCanonical);
/// ```
#[macro_export]
macro_rules! require_child {
    ($parent:expr, $child:expr, $idx:expr => $M:ty) => {{
        if $child.is::<$M>() {
            $parent
        } else {
            return Ok($crate::ExecutionResult::execute_slot::<$M>(
                $parent.clone(),
                $idx,
            ));
        }
    }};
}
478
/// Optional-child counterpart of [`require_child!`].
///
/// - When `$child_opt` is `None`, this does nothing.
/// - When it is `Some` but the child does not match `$M`, it early-returns
///   `Ok(ExecutionResult::execute_slot::<$M>($parent, $idx))` from the enclosing function.
///
/// This is a statement macro (it produces no value). Unlike `require_child!`, `$parent` is moved
/// into the early-return path rather than cloned.
///
/// ```ignore
/// require_opt_child!(array, array.patches().map(|p| p.indices()), 1 => Primitive);
/// ```
#[macro_export]
macro_rules! require_opt_child {
    ($parent:expr, $child_opt:expr, $idx:expr => $M:ty) => {
        if matches!($child_opt, Some(child) if !child.is::<$M>()) {
            return Ok($crate::ExecutionResult::execute_slot::<$M>($parent, $idx));
        }
    };
}
497
/// Require that the patch slots (indices, values, and chunk offsets) are `Primitive` whenever
/// they are present.
///
/// Slots are checked in the order given. The first present slot that is not `Primitive` causes
/// an early return requesting its execution. Empty (`None`) slots are skipped, so this is a
/// no-op when no patches are present.
///
/// As with [`require_opt_child!`], `$parent` is moved (not cloned) into the early-return path.
///
/// ```ignore
/// require_patches!(array, PATCH_INDICES_SLOT, PATCH_VALUES_SLOT, PATCH_CHUNK_OFFSETS_SLOT);
/// ```
#[macro_export]
macro_rules! require_patches {
    // Internal arm: require each listed slot, in order, to be `Primitive` if it is present.
    (@each $parent:expr; $($slot:expr),+) => {
        $(
            $crate::require_opt_child!(
                $parent,
                $parent.slots()[$slot].as_ref(),
                $slot => $crate::arrays::Primitive
            );
        )+
    };
    ($parent:expr, $indices_slot:expr, $values_slot:expr, $chunk_offsets_slot:expr) => {
        $crate::require_patches!(
            @each $parent;
            $indices_slot, $values_slot, $chunk_offsets_slot
        );
    };
}
526
/// Require that the validity slot, when present, is a [`Bool`](crate::arrays::Bool) array.
///
/// - When the slot is empty (validity is not array-backed, e.g. `NonNullable` or `AllValid`),
///   this does nothing.
/// - When it holds a non-`Bool` array, it early-returns an [`ExecutionResult`] requesting
///   execution of that slot.
///
/// As with [`require_opt_child!`], `$parent` is moved (not cloned) into the early-return path.
///
/// ```ignore
/// require_validity!(array, VALIDITY_SLOT);
/// ```
#[macro_export]
macro_rules! require_validity {
    ($parent:expr, $validity_slot:expr) => {
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$validity_slot].as_ref(),
            $validity_slot => $crate::arrays::Bool
        );
    };
}
546
547/// Extension trait for creating an execution context from a session.
548pub trait VortexSessionExecute {
549    /// Create a new execution context from this session.
550    fn create_execution_ctx(&self) -> ExecutionCtx;
551}
552
553impl VortexSessionExecute for VortexSession {
554    fn create_execution_ctx(&self) -> ExecutionCtx {
555        ExecutionCtx::new(self.clone())
556    }
557}