vortex_array/executor.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! The execution engine: iteratively transforms arrays toward canonical form.
5//!
6//! Execution proceeds through four layers tried in order on each iteration:
7//!
8//! 1. **`reduce`** -- metadata-only self-rewrite (cheapest).
9//! 2. **`reduce_parent`** -- metadata-only child-driven parent rewrite.
10//! 3. **`execute_parent`** -- child-driven fused execution (may read buffers).
11//! 4. **`execute`** -- the encoding's own decode step (most expensive).
12//!
13//! The main entry point is [`DynArray::execute_until`], which uses an explicit work stack
14//! to drive execution iteratively without recursion. Between steps, the optimizer runs
15//! reduce/reduce_parent rules to fixpoint.
16//!
17//! See <https://docs.vortex.dev/developer-guide/internals/execution> for a full description
18//! of the model.
19
20use std::env::VarError;
21use std::fmt;
22use std::fmt::Display;
23use std::sync::Arc;
24use std::sync::LazyLock;
25use std::sync::atomic::AtomicUsize;
26
27use vortex_error::VortexExpect;
28use vortex_error::VortexResult;
29use vortex_error::vortex_bail;
30use vortex_error::vortex_panic;
31use vortex_session::VortexSession;
32
33use crate::AnyCanonical;
34use crate::ArrayRef;
35use crate::Canonical;
36use crate::DynArray;
37use crate::IntoArray;
38use crate::matcher::Matcher;
39use crate::optimizer::ArrayOptimizer;
40use crate::vtable::VTable;
41use crate::vtable::upcast_array;
42
43/// Maximum number of iterations to attempt when executing an array before giving up and returning
44/// an error.
45pub(crate) static MAX_ITERATIONS: LazyLock<usize> =
46 LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
47 Ok(val) => val
48 .parse::<usize>()
49 .unwrap_or_else(|e| vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")),
50 Err(VarError::NotPresent) => 128,
51 Err(VarError::NotUnicode(_)) => {
52 vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
53 }
54 });
55
/// Marker trait for types that an [`ArrayRef`] can be executed into.
///
/// Implementors must provide an implementation of `execute` that takes
/// an [`ArrayRef`] and an [`ExecutionCtx`], and produces an instance of the
/// implementor type.
///
/// Callers normally go through the `execute` or `execute_as` methods defined on
/// `dyn DynArray`, which forward to this trait, rather than naming the trait directly.
pub trait Executable: Sized {
    /// Consume `array` and execute it into an instance of `Self`.
    ///
    /// `ctx` is the execution context for the run; implementations in this module use it to
    /// record trace entries.
    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
}
66
67impl dyn DynArray + '_ {
68 /// Execute this array to produce an instance of `E`.
69 ///
70 /// See the [`Executable`] implementation for details on how this execution is performed.
71 pub fn execute<E: Executable>(self: Arc<Self>, ctx: &mut ExecutionCtx) -> VortexResult<E> {
72 E::execute(self, ctx)
73 }
74
75 /// Execute this array, labeling the execution step with a name for tracing.
76 pub fn execute_as<E: Executable>(
77 self: Arc<Self>,
78 _name: &'static str,
79 ctx: &mut ExecutionCtx,
80 ) -> VortexResult<E> {
81 E::execute(self, ctx)
82 }
83
84 /// Iteratively execute this array until the [`Matcher`] matches, using an explicit work
85 /// stack.
86 ///
87 /// The scheduler repeatedly:
88 /// 1. Checks if the current array matches `M` — if so, pops the stack or returns.
89 /// 2. Runs `execute_parent` on each child for child-driven optimizations.
90 /// 3. Calls `execute` which returns an [`ExecutionStep`].
91 ///
92 /// Note: the returned array may not match `M`. If execution converges to a canonical form
93 /// that does not match `M`, the canonical array is returned since no further execution
94 /// progress is possible.
95 ///
96 /// For safety, we will error when the number of execution iterations reaches a configurable
97 /// maximum (default 128, override with `VORTEX_MAX_ITERATIONS`).
98 pub fn execute_until<M: Matcher>(
99 self: Arc<Self>,
100 ctx: &mut ExecutionCtx,
101 ) -> VortexResult<ArrayRef> {
102 static MAX_ITERATIONS: LazyLock<usize> =
103 LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
104 Ok(val) => val.parse::<usize>().unwrap_or_else(|e| {
105 vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")
106 }),
107 Err(VarError::NotPresent) => 128,
108 Err(VarError::NotUnicode(_)) => {
109 vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
110 }
111 });
112
113 let mut current = self.optimize()?;
114 // Stack frames: (parent, child_idx, done_predicate_for_child)
115 let mut stack: Vec<(ArrayRef, usize, DonePredicate)> = Vec::new();
116
117 for _ in 0..*MAX_ITERATIONS {
118 // Check for termination: use the stack frame's done predicate, or the root matcher.
119 let is_done = stack
120 .last()
121 .map_or(M::matches as DonePredicate, |frame| frame.2);
122 if is_done(current.as_ref()) {
123 match stack.pop() {
124 None => {
125 ctx.log(format_args!("-> {}", current));
126 return Ok(current);
127 }
128 Some((parent, child_idx, _)) => {
129 current = parent.with_child(child_idx, current)?;
130 current = current.optimize()?;
131 continue;
132 }
133 }
134 }
135
136 // If we've reached canonical form, we can't execute any further regardless
137 // of whether the matcher matched.
138 if AnyCanonical::matches(current.as_ref()) {
139 match stack.pop() {
140 None => {
141 ctx.log(format_args!("-> canonical (unmatched) {}", current));
142 return Ok(current);
143 }
144 Some((parent, child_idx, _)) => {
145 current = parent.with_child(child_idx, current)?;
146 current = current.optimize()?;
147 continue;
148 }
149 }
150 }
151
152 // Try execute_parent (child-driven optimized execution)
153 if let Some(rewritten) = try_execute_parent(¤t, ctx)? {
154 ctx.log(format_args!(
155 "execute_parent rewrote {} -> {}",
156 current, rewritten
157 ));
158 current = rewritten.optimize()?;
159 continue;
160 }
161
162 // Execute the array itself.
163 let result = execute_step(current, ctx)?;
164 let (array, step) = result.into_parts();
165 match step {
166 ExecutionStep::ExecuteChild(i, done) => {
167 let child = array
168 .nth_child(i)
169 .vortex_expect("ExecuteChild index in bounds");
170 ctx.log(format_args!(
171 "ExecuteChild({i}): pushing {}, focusing on {}",
172 array, child
173 ));
174 stack.push((array, i, done));
175 current = child.optimize()?;
176 }
177 ExecutionStep::Done => {
178 ctx.log(format_args!("Done: {}", array));
179 current = array;
180 }
181 }
182 }
183
184 vortex_bail!(
185 "Exceeded maximum execution iterations ({}) while executing array",
186 *MAX_ITERATIONS,
187 )
188 }
189}
190
/// Execution context for batch CPU compute.
///
/// Accumulates a trace of execution steps. Individual steps are logged at TRACE level for
/// real-time following, and the full trace is dumped at DEBUG level when the context is dropped.
pub struct ExecutionCtx {
    /// Identifier taken from a static counter in `new`; used as the `exec[..]` tag in log output.
    id: usize,
    /// Session supplied at construction, exposed through `session()`.
    session: VortexSession,
    /// Formatted trace entries recorded by `log` and emitted as one message on drop.
    ops: Vec<String>,
}
200
201impl ExecutionCtx {
202 /// Create a new execution context with the given session.
203 pub fn new(session: VortexSession) -> Self {
204 static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
205 let id = EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
206 Self {
207 id,
208 session,
209 ops: Vec::new(),
210 }
211 }
212
213 /// Get the session associated with this execution context.
214 pub fn session(&self) -> &VortexSession {
215 &self.session
216 }
217
218 /// Log an execution step at the current depth.
219 ///
220 /// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.
221 /// Individual steps are also logged at TRACE level for real-time following.
222 ///
223 /// Use the [`format_args!`] macro to create the `msg` argument.
224 pub fn log(&mut self, msg: fmt::Arguments<'_>) {
225 if tracing::enabled!(tracing::Level::DEBUG) {
226 let formatted = format!(" - {msg}");
227 tracing::trace!("exec[{}]: {formatted}", self.id);
228 self.ops.push(formatted);
229 }
230 }
231}
232
233impl Display for ExecutionCtx {
234 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
235 write!(f, "exec[{}]", self.id)
236 }
237}
238
239impl Drop for ExecutionCtx {
240 fn drop(&mut self) {
241 if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
242 // Unlike itertools `.format()` (panics in 0.14 on second format)
243 struct FmtOps<'a>(&'a [String]);
244 impl Display for FmtOps<'_> {
245 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246 for (i, op) in self.0.iter().enumerate() {
247 if i > 0 {
248 f.write_str("\n")?;
249 }
250 f.write_str(op)?;
251 }
252 Ok(())
253 }
254 }
255 tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
256 }
257 }
258}
259
260/// Executing an [`ArrayRef`] into an [`ArrayRef`] is the atomic execution loop within Vortex.
261///
262/// It attempts to take the smallest possible step of execution such that the returned array
263/// is incrementally more "executed" than the input array. In other words, it is closer to becoming
264/// a canonical array.
265///
266/// The execution steps are as follows:
267/// 0. Check for canonical.
268/// 1. Attempt to `reduce` the array with metadata-only optimizations.
269/// 2. Attempt to call `reduce_parent` on each child.
270/// 3. Attempt to call `execute_parent` on each child.
271/// 4. Call `execute` on the array itself (which returns an [`ExecutionStep`]).
272///
273/// Most users will not call this method directly, instead preferring to specify an executable
274/// target such as [`crate::Columnar`], [`Canonical`], or any of the canonical array types (such as
275/// [`crate::arrays::PrimitiveArray`]).
276impl Executable for ArrayRef {
277 fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
278 // 0. Check for canonical
279 if let Some(canonical) = array.as_opt::<AnyCanonical>() {
280 ctx.log(format_args!("-> canonical {}", array));
281 return Ok(Canonical::from(canonical).into_array());
282 }
283
284 // 1. reduce (metadata-only rewrites)
285 if let Some(reduced) = array.vtable().reduce(&array)? {
286 ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
287 reduced.statistics().inherit_from(array.statistics());
288 return Ok(reduced);
289 }
290
291 // 2. reduce_parent (child-driven metadata-only rewrites)
292 for child_idx in 0..array.nchildren() {
293 let child = array.nth_child(child_idx).vortex_expect("checked length");
294 if let Some(reduced_parent) = child.vtable().reduce_parent(&child, &array, child_idx)? {
295 ctx.log(format_args!(
296 "reduce_parent: child[{}]({}) rewrote {} -> {}",
297 child_idx,
298 child.encoding_id(),
299 array,
300 reduced_parent
301 ));
302 reduced_parent.statistics().inherit_from(array.statistics());
303 return Ok(reduced_parent);
304 }
305 }
306
307 // 3. execute_parent (child-driven optimized execution)
308 for child_idx in 0..array.nchildren() {
309 // TODO(joe): remove internal copy in nth_child.
310 let child = array.nth_child(child_idx).vortex_expect("checked length");
311 if let Some(executed_parent) = child
312 .vtable()
313 .execute_parent(&child, &array, child_idx, ctx)?
314 {
315 ctx.log(format_args!(
316 "execute_parent: child[{}]({}) rewrote {} -> {}",
317 child_idx,
318 child.encoding_id(),
319 array,
320 executed_parent
321 ));
322 executed_parent
323 .statistics()
324 .inherit_from(array.statistics());
325 return Ok(executed_parent);
326 }
327 }
328
329 // 4. execute (returns an ExecutionResult)
330 ctx.log(format_args!("executing {}", array));
331 let result = execute_step(array, ctx)?;
332 let (array, step) = result.into_parts();
333 match step {
334 ExecutionStep::Done => {
335 ctx.log(format_args!("-> {}", array.as_ref()));
336 Ok(array)
337 }
338 ExecutionStep::ExecuteChild(i, _) => {
339 // For single-step execution, handle ExecuteChild by executing the child,
340 // replacing it, and returning the updated array.
341 let child = array.nth_child(i).vortex_expect("valid child index");
342 let executed_child = child.execute::<ArrayRef>(ctx)?;
343 array.with_child(i, executed_child)
344 }
345 }
346 }
347}
348
349/// Execute a single step on an array, consuming it.
350///
351/// Extracts the vtable before consuming the array to avoid borrow conflicts.
352fn execute_step(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
353 let vtable = array.vtable().clone_boxed();
354 vtable.execute(array, ctx)
355}
356
357/// Try execute_parent on each child of the array.
358fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<ArrayRef>> {
359 for child_idx in 0..array.nchildren() {
360 let child = array
361 .nth_child(child_idx)
362 .vortex_expect("checked nchildren");
363 if let Some(result) = child
364 .vtable()
365 .execute_parent(&child, array, child_idx, ctx)?
366 {
367 result.statistics().inherit_from(array.statistics());
368 return Ok(Some(result));
369 }
370 }
371 Ok(None)
372}
373
/// A predicate that determines when an array has reached a desired form during execution.
///
/// This is a plain function pointer: it is stored in the scheduler's stack frames and carried
/// by [`ExecutionStep::ExecuteChild`]. In this module it is always obtained from a
/// [`Matcher`]'s `matches` function.
pub type DonePredicate = fn(&dyn DynArray) -> bool;
376
/// Metadata-only step indicator returned alongside an array in [`ExecutionResult`].
///
/// Instead of recursively executing children, encodings return an `ExecutionStep` that tells the
/// scheduler what to do next. This enables the scheduler to manage execution iteratively using
/// an explicit work stack, run cross-step optimizations, and cache shared sub-expressions.
pub enum ExecutionStep {
    /// Request that the scheduler execute a child, then replace the child in this array and
    /// re-enter execution.
    ///
    /// The first field is the index of the child to execute; the second is the
    /// [`DonePredicate`] used to determine when that child is "done".
    ///
    /// Between steps, the scheduler runs reduce/reduce_parent rules to fixpoint, enabling
    /// cross-step optimization (e.g., pushing scalar functions through newly-decoded children).
    ExecuteChild(usize, DonePredicate),

    /// Execution is complete. The array in the accompanying [`ExecutionResult`] is the result.
    /// The scheduler will continue executing if it has not yet reached the target form.
    Done,
}
395
396impl fmt::Debug for ExecutionStep {
397 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
398 match self {
399 ExecutionStep::ExecuteChild(idx, _) => {
400 f.debug_tuple("ExecuteChild").field(idx).finish()
401 }
402 ExecutionStep::Done => write!(f, "Done"),
403 }
404 }
405}
406
/// The result of a single execution step on an array encoding.
///
/// Combines an [`ArrayRef`] with an [`ExecutionStep`] to tell the scheduler both what to do next
/// and what array to work with.
pub struct ExecutionResult {
    /// The array the step applies to: the finished result for `Done`, or the parent whose child
    /// still needs executing for `ExecuteChild`.
    array: ArrayRef,
    /// What the scheduler should do next with `array`.
    step: ExecutionStep,
}
415
416impl ExecutionResult {
417 /// Signal that execution is complete with the given result array.
418 pub fn done(result: impl IntoArray) -> Self {
419 Self {
420 array: result.into_array(),
421 step: ExecutionStep::Done,
422 }
423 }
424
425 pub fn done_upcast<V: VTable>(arr: Arc<V::Array>) -> Self {
426 Self {
427 array: upcast_array::<V>(arr),
428 step: ExecutionStep::Done,
429 }
430 }
431
432 /// Request execution of child at `child_idx` until it matches the given [`Matcher`].
433 ///
434 /// The provided array is the (possibly modified) parent that still needs its child executed.
435 pub fn execute_child<M: Matcher>(array: impl IntoArray, child_idx: usize) -> Self {
436 Self {
437 array: array.into_array(),
438 step: ExecutionStep::ExecuteChild(child_idx, M::matches),
439 }
440 }
441
442 /// Like [`execute_child`](Self::execute_child), but accepts an `Arc<V::Array>` directly,
443 /// upcasting it to an [`ArrayRef`].
444 pub fn execute_child_upcast<V: VTable, M: Matcher>(
445 array: Arc<V::Array>,
446 child_idx: usize,
447 ) -> Self {
448 Self {
449 array: upcast_array::<V>(array),
450 step: ExecutionStep::ExecuteChild(child_idx, M::matches),
451 }
452 }
453
454 /// Returns a reference to the array.
455 pub fn array(&self) -> &ArrayRef {
456 &self.array
457 }
458
459 /// Returns a reference to the step.
460 pub fn step(&self) -> &ExecutionStep {
461 &self.step
462 }
463
464 /// Decompose into parts.
465 pub fn into_parts(self) -> (ArrayRef, ExecutionStep) {
466 (self.array, self.step)
467 }
468}
469
470impl fmt::Debug for ExecutionResult {
471 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
472 f.debug_struct("ExecutionResult")
473 .field("array", &self.array)
474 .field("step", &self.step)
475 .finish()
476 }
477}
478
/// Require that a child array matches `$M`. If the child already matches, the macro evaluates
/// to `$parent` unchanged. Otherwise, it early-returns (from the enclosing function) an
/// [`ExecutionResult`] requesting execution of child `$idx` until it matches `$M`.
///
/// Arguments: `$V` is the vtable type used for the upcast, `$parent` the parent array (it is
/// passed to `Arc::clone` by reference), `$child` the child expression to test, `$idx` the
/// child's index and `$M` the matcher type.
///
/// Because of the early `return Ok(..)`, the macro can only be used inside a function whose
/// return type accepts `Ok(ExecutionResult)`.
///
/// ```ignore
/// let array = require_child!(Self, array, array.codes(), 0 => Primitive);
/// let array = require_child!(Self, array, array.values(), 1 => AnyCanonical);
/// ```
#[macro_export]
macro_rules! require_child {
    ($V:ty, $parent:expr, $child:expr, $idx:expr => $M:ty) => {{
        if !$child.is::<$M>() {
            // Child is not in the required form yet: ask the scheduler to execute it.
            return Ok($crate::ExecutionResult::execute_child_upcast::<$V, $M>(
                std::sync::Arc::clone(&$parent),
                $idx,
            ));
        }
        // Child already matches: the macro's value is the parent itself.
        $parent
    }};
}
499
/// Extension trait for creating an execution context from a session.
///
/// Implemented in this module for [`VortexSession`].
pub trait VortexSessionExecute {
    /// Create a new [`ExecutionCtx`] from this session.
    fn create_execution_ctx(&self) -> ExecutionCtx;
}
505
506impl VortexSessionExecute for VortexSession {
507 fn create_execution_ctx(&self) -> ExecutionCtx {
508 ExecutionCtx::new(self.clone())
509 }
510}