Skip to main content

vortex_array/
executor.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::env::VarError;
5use std::fmt;
6use std::fmt::Display;
7use std::sync::Arc;
8use std::sync::LazyLock;
9use std::sync::atomic::AtomicUsize;
10
11use vortex_error::VortexExpect;
12use vortex_error::VortexResult;
13use vortex_error::vortex_bail;
14use vortex_error::vortex_panic;
15use vortex_session::VortexSession;
16
17use crate::AnyCanonical;
18use crate::ArrayRef;
19use crate::Canonical;
20use crate::DynArray;
21use crate::IntoArray;
22use crate::matcher::Matcher;
23use crate::optimizer::ArrayOptimizer;
24
25/// Maximum number of iterations to attempt when executing an array before giving up and returning
26/// an error.
27pub(crate) static MAX_ITERATIONS: LazyLock<usize> =
28    LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
29        Ok(val) => val
30            .parse::<usize>()
31            .unwrap_or_else(|e| vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")),
32        Err(VarError::NotPresent) => 128,
33        Err(VarError::NotUnicode(_)) => {
34            vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
35        }
36    });
37
38/// Marker trait for types that an [`ArrayRef`] can be executed into.
39///
40/// Implementors must provide an implementation of `execute` that takes
41/// an [`ArrayRef`] and an [`ExecutionCtx`], and produces an instance of the
42/// implementor type.
43///
44/// Users should use the `Array::execute` or `Array::execute_as` methods
45pub trait Executable: Sized {
46    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
47}
48
49impl dyn DynArray + '_ {
50    /// Execute this array to produce an instance of `E`.
51    ///
52    /// See the [`Executable`] implementation for details on how this execution is performed.
53    pub fn execute<E: Executable>(self: Arc<Self>, ctx: &mut ExecutionCtx) -> VortexResult<E> {
54        E::execute(self, ctx)
55    }
56
57    /// Execute this array, labeling the execution step with a name for tracing.
58    pub fn execute_as<E: Executable>(
59        self: Arc<Self>,
60        _name: &'static str,
61        ctx: &mut ExecutionCtx,
62    ) -> VortexResult<E> {
63        E::execute(self, ctx)
64    }
65
66    /// Iteratively execute this array until the [`Matcher`] matches, using an explicit work
67    /// stack.
68    ///
69    /// The scheduler repeatedly:
70    /// 1. Checks if the current array matches `M` — if so, pops the stack or returns.
71    /// 2. Runs `execute_parent` on each child for child-driven optimizations.
72    /// 3. Calls `execute` which returns an [`ExecutionStep`].
73    ///
74    /// Note: the returned array may not match `M`. If execution converges to a canonical form
75    /// that does not match `M`, the canonical array is returned since no further execution
76    /// progress is possible.
77    ///
78    /// For safety, we will error when the number of execution iterations reaches a configurable
79    /// maximum (default 128, override with `VORTEX_MAX_ITERATIONS`).
80    pub fn execute_until<M: Matcher>(
81        self: Arc<Self>,
82        ctx: &mut ExecutionCtx,
83    ) -> VortexResult<ArrayRef> {
84        static MAX_ITERATIONS: LazyLock<usize> =
85            LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
86                Ok(val) => val.parse::<usize>().unwrap_or_else(|e| {
87                    vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")
88                }),
89                Err(VarError::NotPresent) => 128,
90                Err(VarError::NotUnicode(_)) => {
91                    vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
92                }
93            });
94
95        let mut current = self.optimize()?;
96        // Stack frames: (parent, child_idx, done_predicate_for_child)
97        let mut stack: Vec<(ArrayRef, usize, DonePredicate)> = Vec::new();
98
99        for _ in 0..*MAX_ITERATIONS {
100            // Check for termination: use the stack frame's done predicate, or the root matcher.
101            let is_done = stack
102                .last()
103                .map_or(M::matches as DonePredicate, |frame| frame.2);
104            if is_done(current.as_ref()) {
105                match stack.pop() {
106                    None => {
107                        ctx.log(format_args!("-> {}", current));
108                        return Ok(current);
109                    }
110                    Some((parent, child_idx, _)) => {
111                        current = parent.with_child(child_idx, current)?;
112                        current = current.optimize()?;
113                        continue;
114                    }
115                }
116            }
117
118            // If we've reached canonical form, we can't execute any further regardless
119            // of whether the matcher matched.
120            if AnyCanonical::matches(current.as_ref()) {
121                match stack.pop() {
122                    None => {
123                        ctx.log(format_args!("-> canonical (unmatched) {}", current));
124                        return Ok(current);
125                    }
126                    Some((parent, child_idx, _)) => {
127                        current = parent.with_child(child_idx, current)?;
128                        current = current.optimize()?;
129                        continue;
130                    }
131                }
132            }
133
134            // Try execute_parent (child-driven optimized execution)
135            if let Some(rewritten) = try_execute_parent(&current, ctx)? {
136                ctx.log(format_args!(
137                    "execute_parent rewrote {} -> {}",
138                    current, rewritten
139                ));
140                current = rewritten.optimize()?;
141                continue;
142            }
143
144            // Execute the array itself
145            match current.vtable().execute(&current, ctx)? {
146                ExecutionStep::ExecuteChild(i, done) => {
147                    let child = current
148                        .nth_child(i)
149                        .vortex_expect("ExecuteChild index in bounds");
150                    ctx.log(format_args!(
151                        "ExecuteChild({i}): pushing {}, focusing on {}",
152                        current, child
153                    ));
154                    stack.push((current, i, done));
155                    current = child.optimize()?;
156                }
157                ExecutionStep::Done(result) => {
158                    ctx.log(format_args!("Done: {} -> {}", current, result));
159                    current = result;
160                }
161            }
162        }
163
164        vortex_bail!(
165            "Exceeded maximum execution iterations ({}) while executing array",
166            *MAX_ITERATIONS,
167        )
168    }
169}
170
171/// Execution context for batch CPU compute.
172///
173/// Accumulates a trace of execution steps. Individual steps are logged at TRACE level for
174/// real-time following, and the full trace is dumped at DEBUG level when the context is dropped.
175pub struct ExecutionCtx {
176    id: usize,
177    session: VortexSession,
178    ops: Vec<String>,
179}
180
181impl ExecutionCtx {
182    /// Create a new execution context with the given session.
183    pub fn new(session: VortexSession) -> Self {
184        static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
185        let id = EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
186        Self {
187            id,
188            session,
189            ops: Vec::new(),
190        }
191    }
192
193    /// Get the session associated with this execution context.
194    pub fn session(&self) -> &VortexSession {
195        &self.session
196    }
197
198    /// Log an execution step at the current depth.
199    ///
200    /// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.
201    /// Individual steps are also logged at TRACE level for real-time following.
202    ///
203    /// Use the [`format_args!`] macro to create the `msg` argument.
204    pub fn log(&mut self, msg: fmt::Arguments<'_>) {
205        if tracing::enabled!(tracing::Level::DEBUG) {
206            let formatted = format!(" - {msg}");
207            tracing::trace!("exec[{}]: {formatted}", self.id);
208            self.ops.push(formatted);
209        }
210    }
211}
212
213impl Display for ExecutionCtx {
214    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215        write!(f, "exec[{}]", self.id)
216    }
217}
218
219impl Drop for ExecutionCtx {
220    fn drop(&mut self) {
221        if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
222            // Unlike itertools `.format()` (panics in 0.14 on second format)
223            struct FmtOps<'a>(&'a [String]);
224            impl Display for FmtOps<'_> {
225                fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
226                    for (i, op) in self.0.iter().enumerate() {
227                        if i > 0 {
228                            f.write_str("\n")?;
229                        }
230                        f.write_str(op)?;
231                    }
232                    Ok(())
233                }
234            }
235            tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
236        }
237    }
238}
239
240/// Executing an [`ArrayRef`] into an [`ArrayRef`] is the atomic execution loop within Vortex.
241///
242/// It attempts to take the smallest possible step of execution such that the returned array
243/// is incrementally more "executed" than the input array. In other words, it is closer to becoming
244/// a canonical array.
245///
246/// The execution steps are as follows:
247/// 0. Check for canonical.
248/// 1. Attempt to `reduce` the array with metadata-only optimizations.
249/// 2. Attempt to call `reduce_parent` on each child.
250/// 3. Attempt to call `execute_parent` on each child.
251/// 4. Call `execute` on the array itself (which returns an [`ExecutionStep`]).
252///
253/// Most users will not call this method directly, instead preferring to specify an executable
254/// target such as [`crate::Columnar`], [`Canonical`], or any of the canonical array types (such as
255/// [`crate::arrays::PrimitiveArray`]).
256impl Executable for ArrayRef {
257    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
258        // 0. Check for canonical
259        if let Some(canonical) = array.as_opt::<AnyCanonical>() {
260            ctx.log(format_args!("-> canonical {}", array));
261            return Ok(Canonical::from(canonical).into_array());
262        }
263
264        // 1. reduce (metadata-only rewrites)
265        if let Some(reduced) = array.vtable().reduce(&array)? {
266            ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
267            reduced.statistics().inherit_from(array.statistics());
268            return Ok(reduced);
269        }
270
271        // 2. reduce_parent (child-driven metadata-only rewrites)
272        for child_idx in 0..array.nchildren() {
273            let child = array.nth_child(child_idx).vortex_expect("checked length");
274            if let Some(reduced_parent) = child.vtable().reduce_parent(&child, &array, child_idx)? {
275                ctx.log(format_args!(
276                    "reduce_parent: child[{}]({}) rewrote {} -> {}",
277                    child_idx,
278                    child.encoding_id(),
279                    array,
280                    reduced_parent
281                ));
282                reduced_parent.statistics().inherit_from(array.statistics());
283                return Ok(reduced_parent);
284            }
285        }
286
287        // 3. execute_parent (child-driven optimized execution)
288        for child_idx in 0..array.nchildren() {
289            let child = array.nth_child(child_idx).vortex_expect("checked length");
290            if let Some(executed_parent) = child
291                .vtable()
292                .execute_parent(&child, &array, child_idx, ctx)?
293            {
294                ctx.log(format_args!(
295                    "execute_parent: child[{}]({}) rewrote {} -> {}",
296                    child_idx,
297                    child.encoding_id(),
298                    array,
299                    executed_parent
300                ));
301                executed_parent
302                    .statistics()
303                    .inherit_from(array.statistics());
304                return Ok(executed_parent);
305            }
306        }
307
308        // 4. execute (returns an ExecutionStep)
309        ctx.log(format_args!("executing {}", array));
310        match array.vtable().execute(&array, ctx)? {
311            ExecutionStep::Done(result) => {
312                ctx.log(format_args!("-> {}", result.as_ref()));
313                Ok(result)
314            }
315            ExecutionStep::ExecuteChild(i, _) => {
316                // For single-step execution, handle ExecuteChild by executing the child,
317                // replacing it, and returning the updated array.
318                let child = array.nth_child(i).vortex_expect("valid child index");
319                let executed_child = child.execute::<ArrayRef>(ctx)?;
320                array.with_child(i, executed_child)
321            }
322        }
323    }
324}
325
326/// Try execute_parent on each child of the array.
327fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<ArrayRef>> {
328    for child_idx in 0..array.nchildren() {
329        let child = array
330            .nth_child(child_idx)
331            .vortex_expect("checked nchildren");
332        if let Some(result) = child
333            .vtable()
334            .execute_parent(&child, array, child_idx, ctx)?
335        {
336            result.statistics().inherit_from(array.statistics());
337            return Ok(Some(result));
338        }
339    }
340    Ok(None)
341}
342
343/// A predicate that determines when an array has reached a desired form during execution.
344pub type DonePredicate = fn(&dyn DynArray) -> bool;
345
346/// The result of a single execution step on an array encoding.
347///
348/// Instead of recursively executing children, encodings return an `ExecutionStep` that tells the
349/// scheduler what to do next. This enables the scheduler to manage execution iteratively using
350/// an explicit work stack, run cross-step optimizations, and cache shared sub-expressions.
351pub enum ExecutionStep {
352    /// Request that the scheduler execute child at the given index, using the provided
353    /// [`DonePredicate`] to determine when the child is "done", then replace the child in this
354    /// array and re-enter execution.
355    ///
356    /// Between steps, the scheduler runs reduce/reduce_parent rules to fixpoint, enabling
357    /// cross-step optimization (e.g., pushing scalar functions through newly-decoded children).
358    ///
359    /// Use [`ExecutionStep::execute_child`] instead of constructing this variant directly.
360    ExecuteChild(usize, DonePredicate),
361
362    /// Execution is complete. The result may be in any encoding — not necessarily canonical.
363    /// The scheduler will continue executing the result if it has not yet reached the target form.
364    Done(ArrayRef),
365}
366
367impl ExecutionStep {
368    /// Request execution of child at `child_idx` until it matches the given [`Matcher`].
369    pub fn execute_child<M: Matcher>(child_idx: usize) -> Self {
370        ExecutionStep::ExecuteChild(child_idx, M::matches)
371    }
372
373    /// Signal that execution is complete with the given result.
374    pub fn done(result: ArrayRef) -> Self {
375        ExecutionStep::Done(result)
376    }
377}
378
379impl fmt::Debug for ExecutionStep {
380    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
381        match self {
382            ExecutionStep::ExecuteChild(idx, _) => {
383                f.debug_tuple("ExecuteChild").field(idx).finish()
384            }
385            ExecutionStep::Done(result) => f.debug_tuple("Done").field(result).finish(),
386        }
387    }
388}
389
390/// Extension trait for creating an execution context from a session.
391pub trait VortexSessionExecute {
392    /// Create a new execution context from this session.
393    fn create_execution_ctx(&self) -> ExecutionCtx;
394}
395
396impl VortexSessionExecute for VortexSession {
397    fn create_execution_ctx(&self) -> ExecutionCtx {
398        ExecutionCtx::new(self.clone())
399    }
400}