Skip to main content

vortex_array/
executor.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::env::VarError;
5use std::fmt;
6use std::fmt::Display;
7use std::sync::Arc;
8use std::sync::LazyLock;
9use std::sync::atomic::AtomicUsize;
10
11use vortex_error::VortexExpect;
12use vortex_error::VortexResult;
13use vortex_error::vortex_bail;
14use vortex_error::vortex_panic;
15use vortex_session::VortexSession;
16
17use crate::AnyCanonical;
18use crate::ArrayRef;
19use crate::Canonical;
20use crate::DynArray;
21use crate::IntoArray;
22use crate::matcher::Matcher;
23use crate::optimizer::ArrayOptimizer;
24use crate::vtable::VTable;
25use crate::vtable::upcast_array;
26
27/// Maximum number of iterations to attempt when executing an array before giving up and returning
28/// an error.
29pub(crate) static MAX_ITERATIONS: LazyLock<usize> =
30    LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
31        Ok(val) => val
32            .parse::<usize>()
33            .unwrap_or_else(|e| vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")),
34        Err(VarError::NotPresent) => 128,
35        Err(VarError::NotUnicode(_)) => {
36            vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
37        }
38    });
39
40/// Marker trait for types that an [`ArrayRef`] can be executed into.
41///
42/// Implementors must provide an implementation of `execute` that takes
43/// an [`ArrayRef`] and an [`ExecutionCtx`], and produces an instance of the
44/// implementor type.
45///
46/// Users should use the `Array::execute` or `Array::execute_as` methods
47pub trait Executable: Sized {
48    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
49}
50
51impl dyn DynArray + '_ {
52    /// Execute this array to produce an instance of `E`.
53    ///
54    /// See the [`Executable`] implementation for details on how this execution is performed.
55    pub fn execute<E: Executable>(self: Arc<Self>, ctx: &mut ExecutionCtx) -> VortexResult<E> {
56        E::execute(self, ctx)
57    }
58
59    /// Execute this array, labeling the execution step with a name for tracing.
60    pub fn execute_as<E: Executable>(
61        self: Arc<Self>,
62        _name: &'static str,
63        ctx: &mut ExecutionCtx,
64    ) -> VortexResult<E> {
65        E::execute(self, ctx)
66    }
67
68    /// Iteratively execute this array until the [`Matcher`] matches, using an explicit work
69    /// stack.
70    ///
71    /// The scheduler repeatedly:
72    /// 1. Checks if the current array matches `M` — if so, pops the stack or returns.
73    /// 2. Runs `execute_parent` on each child for child-driven optimizations.
74    /// 3. Calls `execute` which returns an [`ExecutionStep`].
75    ///
76    /// Note: the returned array may not match `M`. If execution converges to a canonical form
77    /// that does not match `M`, the canonical array is returned since no further execution
78    /// progress is possible.
79    ///
80    /// For safety, we will error when the number of execution iterations reaches a configurable
81    /// maximum (default 128, override with `VORTEX_MAX_ITERATIONS`).
82    pub fn execute_until<M: Matcher>(
83        self: Arc<Self>,
84        ctx: &mut ExecutionCtx,
85    ) -> VortexResult<ArrayRef> {
86        static MAX_ITERATIONS: LazyLock<usize> =
87            LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
88                Ok(val) => val.parse::<usize>().unwrap_or_else(|e| {
89                    vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")
90                }),
91                Err(VarError::NotPresent) => 128,
92                Err(VarError::NotUnicode(_)) => {
93                    vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
94                }
95            });
96
97        let mut current = self.optimize()?;
98        // Stack frames: (parent, child_idx, done_predicate_for_child)
99        let mut stack: Vec<(ArrayRef, usize, DonePredicate)> = Vec::new();
100
101        for _ in 0..*MAX_ITERATIONS {
102            // Check for termination: use the stack frame's done predicate, or the root matcher.
103            let is_done = stack
104                .last()
105                .map_or(M::matches as DonePredicate, |frame| frame.2);
106            if is_done(current.as_ref()) {
107                match stack.pop() {
108                    None => {
109                        ctx.log(format_args!("-> {}", current));
110                        return Ok(current);
111                    }
112                    Some((parent, child_idx, _)) => {
113                        current = parent.with_child(child_idx, current)?;
114                        current = current.optimize()?;
115                        continue;
116                    }
117                }
118            }
119
120            // If we've reached canonical form, we can't execute any further regardless
121            // of whether the matcher matched.
122            if AnyCanonical::matches(current.as_ref()) {
123                match stack.pop() {
124                    None => {
125                        ctx.log(format_args!("-> canonical (unmatched) {}", current));
126                        return Ok(current);
127                    }
128                    Some((parent, child_idx, _)) => {
129                        current = parent.with_child(child_idx, current)?;
130                        current = current.optimize()?;
131                        continue;
132                    }
133                }
134            }
135
136            // Try execute_parent (child-driven optimized execution)
137            if let Some(rewritten) = try_execute_parent(&current, ctx)? {
138                ctx.log(format_args!(
139                    "execute_parent rewrote {} -> {}",
140                    current, rewritten
141                ));
142                current = rewritten.optimize()?;
143                continue;
144            }
145
146            // Execute the array itself.
147            let result = execute_step(current, ctx)?;
148            let (array, step) = result.into_parts();
149            match step {
150                ExecutionStep::ExecuteChild(i, done) => {
151                    let child = array
152                        .nth_child(i)
153                        .vortex_expect("ExecuteChild index in bounds");
154                    ctx.log(format_args!(
155                        "ExecuteChild({i}): pushing {}, focusing on {}",
156                        array, child
157                    ));
158                    stack.push((array, i, done));
159                    current = child.optimize()?;
160                }
161                ExecutionStep::Done => {
162                    ctx.log(format_args!("Done: {}", array));
163                    current = array;
164                }
165            }
166        }
167
168        vortex_bail!(
169            "Exceeded maximum execution iterations ({}) while executing array",
170            *MAX_ITERATIONS,
171        )
172    }
173}
174
175/// Execution context for batch CPU compute.
176///
177/// Accumulates a trace of execution steps. Individual steps are logged at TRACE level for
178/// real-time following, and the full trace is dumped at DEBUG level when the context is dropped.
179pub struct ExecutionCtx {
180    id: usize,
181    session: VortexSession,
182    ops: Vec<String>,
183}
184
185impl ExecutionCtx {
186    /// Create a new execution context with the given session.
187    pub fn new(session: VortexSession) -> Self {
188        static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
189        let id = EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
190        Self {
191            id,
192            session,
193            ops: Vec::new(),
194        }
195    }
196
197    /// Get the session associated with this execution context.
198    pub fn session(&self) -> &VortexSession {
199        &self.session
200    }
201
202    /// Log an execution step at the current depth.
203    ///
204    /// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.
205    /// Individual steps are also logged at TRACE level for real-time following.
206    ///
207    /// Use the [`format_args!`] macro to create the `msg` argument.
208    pub fn log(&mut self, msg: fmt::Arguments<'_>) {
209        if tracing::enabled!(tracing::Level::DEBUG) {
210            let formatted = format!(" - {msg}");
211            tracing::trace!("exec[{}]: {formatted}", self.id);
212            self.ops.push(formatted);
213        }
214    }
215}
216
217impl Display for ExecutionCtx {
218    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219        write!(f, "exec[{}]", self.id)
220    }
221}
222
223impl Drop for ExecutionCtx {
224    fn drop(&mut self) {
225        if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
226            // Unlike itertools `.format()` (panics in 0.14 on second format)
227            struct FmtOps<'a>(&'a [String]);
228            impl Display for FmtOps<'_> {
229                fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
230                    for (i, op) in self.0.iter().enumerate() {
231                        if i > 0 {
232                            f.write_str("\n")?;
233                        }
234                        f.write_str(op)?;
235                    }
236                    Ok(())
237                }
238            }
239            tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
240        }
241    }
242}
243
244/// Executing an [`ArrayRef`] into an [`ArrayRef`] is the atomic execution loop within Vortex.
245///
246/// It attempts to take the smallest possible step of execution such that the returned array
247/// is incrementally more "executed" than the input array. In other words, it is closer to becoming
248/// a canonical array.
249///
250/// The execution steps are as follows:
251/// 0. Check for canonical.
252/// 1. Attempt to `reduce` the array with metadata-only optimizations.
253/// 2. Attempt to call `reduce_parent` on each child.
254/// 3. Attempt to call `execute_parent` on each child.
255/// 4. Call `execute` on the array itself (which returns an [`ExecutionStep`]).
256///
257/// Most users will not call this method directly, instead preferring to specify an executable
258/// target such as [`crate::Columnar`], [`Canonical`], or any of the canonical array types (such as
259/// [`crate::arrays::PrimitiveArray`]).
260impl Executable for ArrayRef {
261    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
262        // 0. Check for canonical
263        if let Some(canonical) = array.as_opt::<AnyCanonical>() {
264            ctx.log(format_args!("-> canonical {}", array));
265            return Ok(Canonical::from(canonical).into_array());
266        }
267
268        // 1. reduce (metadata-only rewrites)
269        if let Some(reduced) = array.vtable().reduce(&array)? {
270            ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
271            reduced.statistics().inherit_from(array.statistics());
272            return Ok(reduced);
273        }
274
275        // 2. reduce_parent (child-driven metadata-only rewrites)
276        for child_idx in 0..array.nchildren() {
277            let child = array.nth_child(child_idx).vortex_expect("checked length");
278            if let Some(reduced_parent) = child.vtable().reduce_parent(&child, &array, child_idx)? {
279                ctx.log(format_args!(
280                    "reduce_parent: child[{}]({}) rewrote {} -> {}",
281                    child_idx,
282                    child.encoding_id(),
283                    array,
284                    reduced_parent
285                ));
286                reduced_parent.statistics().inherit_from(array.statistics());
287                return Ok(reduced_parent);
288            }
289        }
290
291        // 3. execute_parent (child-driven optimized execution)
292        for child_idx in 0..array.nchildren() {
293            // TODO(joe): remove internal copy in nth_child.
294            let child = array.nth_child(child_idx).vortex_expect("checked length");
295            if let Some(executed_parent) = child
296                .vtable()
297                .execute_parent(&child, &array, child_idx, ctx)?
298            {
299                ctx.log(format_args!(
300                    "execute_parent: child[{}]({}) rewrote {} -> {}",
301                    child_idx,
302                    child.encoding_id(),
303                    array,
304                    executed_parent
305                ));
306                executed_parent
307                    .statistics()
308                    .inherit_from(array.statistics());
309                return Ok(executed_parent);
310            }
311        }
312
313        // 4. execute (returns an ExecutionResult)
314        ctx.log(format_args!("executing {}", array));
315        let result = execute_step(array, ctx)?;
316        let (array, step) = result.into_parts();
317        match step {
318            ExecutionStep::Done => {
319                ctx.log(format_args!("-> {}", array.as_ref()));
320                Ok(array)
321            }
322            ExecutionStep::ExecuteChild(i, _) => {
323                // For single-step execution, handle ExecuteChild by executing the child,
324                // replacing it, and returning the updated array.
325                let child = array.nth_child(i).vortex_expect("valid child index");
326                let executed_child = child.execute::<ArrayRef>(ctx)?;
327                array.with_child(i, executed_child)
328            }
329        }
330    }
331}
332
333/// Execute a single step on an array, consuming it.
334///
335/// Extracts the vtable before consuming the array to avoid borrow conflicts.
336fn execute_step(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
337    let vtable = array.vtable().clone_boxed();
338    vtable.execute(array, ctx)
339}
340
341/// Try execute_parent on each child of the array.
342fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<ArrayRef>> {
343    for child_idx in 0..array.nchildren() {
344        let child = array
345            .nth_child(child_idx)
346            .vortex_expect("checked nchildren");
347        if let Some(result) = child
348            .vtable()
349            .execute_parent(&child, array, child_idx, ctx)?
350        {
351            result.statistics().inherit_from(array.statistics());
352            return Ok(Some(result));
353        }
354    }
355    Ok(None)
356}
357
358/// A predicate that determines when an array has reached a desired form during execution.
359pub type DonePredicate = fn(&dyn DynArray) -> bool;
360
361/// Metadata-only step indicator returned alongside an array in [`ExecutionResult`].
362///
363/// Instead of recursively executing children, encodings return an `ExecutionStep` that tells the
364/// scheduler what to do next. This enables the scheduler to manage execution iteratively using
365/// an explicit work stack, run cross-step optimizations, and cache shared sub-expressions.
366pub enum ExecutionStep {
367    /// Request that the scheduler execute child at the given index, using the provided
368    /// [`DonePredicate`] to determine when the child is "done", then replace the child in this
369    /// array and re-enter execution.
370    ///
371    /// Between steps, the scheduler runs reduce/reduce_parent rules to fixpoint, enabling
372    /// cross-step optimization (e.g., pushing scalar functions through newly-decoded children).
373    ExecuteChild(usize, DonePredicate),
374
375    /// Execution is complete. The array in the accompanying [`ExecutionResult`] is the result.
376    /// The scheduler will continue executing if it has not yet reached the target form.
377    Done,
378}
379
380impl fmt::Debug for ExecutionStep {
381    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
382        match self {
383            ExecutionStep::ExecuteChild(idx, _) => {
384                f.debug_tuple("ExecuteChild").field(idx).finish()
385            }
386            ExecutionStep::Done => write!(f, "Done"),
387        }
388    }
389}
390
391/// The result of a single execution step on an array encoding.
392///
393/// Combines an [`ArrayRef`] with an [`ExecutionStep`] to tell the scheduler both what to do next
394/// and what array to work with.
395pub struct ExecutionResult {
396    array: ArrayRef,
397    step: ExecutionStep,
398}
399
400impl ExecutionResult {
401    /// Signal that execution is complete with the given result array.
402    pub fn done(result: impl IntoArray) -> Self {
403        Self {
404            array: result.into_array(),
405            step: ExecutionStep::Done,
406        }
407    }
408
409    pub fn done_upcast<V: VTable>(arr: Arc<V::Array>) -> Self {
410        Self {
411            array: upcast_array::<V>(arr),
412            step: ExecutionStep::Done,
413        }
414    }
415
416    /// Request execution of child at `child_idx` until it matches the given [`Matcher`].
417    ///
418    /// The provided array is the (possibly modified) parent that still needs its child executed.
419    pub fn execute_child<M: Matcher>(array: impl IntoArray, child_idx: usize) -> Self {
420        Self {
421            array: array.into_array(),
422            step: ExecutionStep::ExecuteChild(child_idx, M::matches),
423        }
424    }
425
426    /// Like [`execute_child`](Self::execute_child), but accepts an `Arc<V::Array>` directly,
427    /// upcasting it to an [`ArrayRef`].
428    pub fn execute_child_upcast<V: VTable, M: Matcher>(
429        array: Arc<V::Array>,
430        child_idx: usize,
431    ) -> Self {
432        Self {
433            array: upcast_array::<V>(array),
434            step: ExecutionStep::ExecuteChild(child_idx, M::matches),
435        }
436    }
437
438    /// Returns a reference to the array.
439    pub fn array(&self) -> &ArrayRef {
440        &self.array
441    }
442
443    /// Returns a reference to the step.
444    pub fn step(&self) -> &ExecutionStep {
445        &self.step
446    }
447
448    /// Decompose into parts.
449    pub fn into_parts(self) -> (ArrayRef, ExecutionStep) {
450        (self.array, self.step)
451    }
452}
453
454impl fmt::Debug for ExecutionResult {
455    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
456        f.debug_struct("ExecutionResult")
457            .field("array", &self.array)
458            .field("step", &self.step)
459            .finish()
460    }
461}
462
/// Require that a child array matches `$M`.
///
/// If `$child` already matches `$M`, the macro evaluates to `$parent`. Otherwise it
/// early-returns, from the enclosing function, an `Ok` [`ExecutionResult`] that asks the
/// scheduler to execute child `$idx` of a clone of `$parent` until that child matches `$M`.
///
/// Because the macro expands to a `return Ok(..)`, it can only be used inside a function whose
/// return type accepts `Ok(ExecutionResult)`.
///
/// ```ignore
/// let codes = require_child!(Self, array, array.codes(), 0 => Primitive);
/// let values = require_child!(Self, array, array.values(), 1 => AnyCanonical);
/// ```
#[macro_export]
macro_rules! require_child {
    ($V:ty, $parent:expr, $child:expr, $idx:expr => $M:ty) => {{
        if $child.is::<$M>() {
            $parent
        } else {
            return Ok($crate::ExecutionResult::execute_child_upcast::<$V, $M>(
                std::sync::Arc::clone(&$parent),
                $idx,
            ));
        }
    }};
}
483
484/// Extension trait for creating an execution context from a session.
485pub trait VortexSessionExecute {
486    /// Create a new execution context from this session.
487    fn create_execution_ctx(&self) -> ExecutionCtx;
488}
489
490impl VortexSessionExecute for VortexSession {
491    fn create_execution_ctx(&self) -> ExecutionCtx {
492        ExecutionCtx::new(self.clone())
493    }
494}