vortex_array/executor.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! The execution engine: iteratively transforms arrays toward canonical form.
5//!
6//! Execution proceeds through four layers tried in order on each iteration:
7//!
8//! 1. **`reduce`** -- metadata-only self-rewrite (cheapest).
9//! 2. **`reduce_parent`** -- metadata-only child-driven parent rewrite.
10//! 3. **`execute_parent`** -- child-driven fused execution (may read buffers).
11//! 4. **`execute`** -- the encoding's own decode step (most expensive).
12//!
13//! The main entry point is [`DynArray::execute_until`], which uses an explicit work stack
14//! to drive execution iteratively without recursion. Between steps, the optimizer runs
15//! reduce/reduce_parent rules to fixpoint.
16//!
17//! See <https://docs.vortex.dev/developer-guide/internals/execution> for a full description
18//! of the model.
19
20use std::env::VarError;
21use std::fmt;
22use std::fmt::Display;
23use std::sync::Arc;
24use std::sync::LazyLock;
25use std::sync::atomic::AtomicUsize;
26
27use vortex_error::VortexExpect;
28use vortex_error::VortexResult;
29use vortex_error::vortex_bail;
30use vortex_error::vortex_panic;
31use vortex_session::VortexSession;
32
33use crate::AnyCanonical;
34use crate::ArrayRef;
35use crate::Canonical;
36use crate::DynArray;
37use crate::IntoArray;
38use crate::matcher::Matcher;
39use crate::optimizer::ArrayOptimizer;
40
41/// Maximum number of iterations to attempt when executing an array before giving up and returning
42/// an error.
43pub(crate) static MAX_ITERATIONS: LazyLock<usize> =
44 LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
45 Ok(val) => val
46 .parse::<usize>()
47 .unwrap_or_else(|e| vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")),
48 Err(VarError::NotPresent) => 128,
49 Err(VarError::NotUnicode(_)) => {
50 vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
51 }
52 });
53
/// Marker trait for types that an [`ArrayRef`] can be executed into.
///
/// Implementors must provide an implementation of `execute` that takes
/// an [`ArrayRef`] and an [`ExecutionCtx`], and produces an instance of the
/// implementor type.
///
/// Users should normally call the `execute` or `execute_as` methods defined on
/// `dyn DynArray`, which forward to this trait, rather than calling
/// [`Executable::execute`] directly.
pub trait Executable: Sized {
    /// Execute `array` using `ctx`, producing an instance of the implementing type.
    ///
    /// # Errors
    ///
    /// Returns an error if execution fails.
    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
}
64
65impl dyn DynArray + '_ {
66 /// Execute this array to produce an instance of `E`.
67 ///
68 /// See the [`Executable`] implementation for details on how this execution is performed.
69 pub fn execute<E: Executable>(self: Arc<Self>, ctx: &mut ExecutionCtx) -> VortexResult<E> {
70 E::execute(self, ctx)
71 }
72
73 /// Execute this array, labeling the execution step with a name for tracing.
74 pub fn execute_as<E: Executable>(
75 self: Arc<Self>,
76 _name: &'static str,
77 ctx: &mut ExecutionCtx,
78 ) -> VortexResult<E> {
79 E::execute(self, ctx)
80 }
81
82 /// Iteratively execute this array until the [`Matcher`] matches, using an explicit work
83 /// stack.
84 ///
85 /// The scheduler repeatedly:
86 /// 1. Checks if the current array matches `M` — if so, pops the stack or returns.
87 /// 2. Runs `execute_parent` on each child for child-driven optimizations.
88 /// 3. Calls `execute` which returns an [`ExecutionStep`].
89 ///
90 /// Note: the returned array may not match `M`. If execution converges to a canonical form
91 /// that does not match `M`, the canonical array is returned since no further execution
92 /// progress is possible.
93 ///
94 /// For safety, we will error when the number of execution iterations reaches a configurable
95 /// maximum (default 128, override with `VORTEX_MAX_ITERATIONS`).
96 pub fn execute_until<M: Matcher>(
97 self: Arc<Self>,
98 ctx: &mut ExecutionCtx,
99 ) -> VortexResult<ArrayRef> {
100 static MAX_ITERATIONS: LazyLock<usize> =
101 LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
102 Ok(val) => val.parse::<usize>().unwrap_or_else(|e| {
103 vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")
104 }),
105 Err(VarError::NotPresent) => 128,
106 Err(VarError::NotUnicode(_)) => {
107 vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
108 }
109 });
110
111 let mut current = self.optimize()?;
112 // Stack frames: (parent, child_idx, done_predicate_for_child)
113 let mut stack: Vec<(ArrayRef, usize, DonePredicate)> = Vec::new();
114
115 for _ in 0..*MAX_ITERATIONS {
116 // Check for termination: use the stack frame's done predicate, or the root matcher.
117 let is_done = stack
118 .last()
119 .map_or(M::matches as DonePredicate, |frame| frame.2);
120 if is_done(current.as_ref()) {
121 match stack.pop() {
122 None => {
123 ctx.log(format_args!("-> {}", current));
124 return Ok(current);
125 }
126 Some((parent, child_idx, _)) => {
127 current = parent.with_child(child_idx, current)?;
128 current = current.optimize()?;
129 continue;
130 }
131 }
132 }
133
134 // If we've reached canonical form, we can't execute any further regardless
135 // of whether the matcher matched.
136 if AnyCanonical::matches(current.as_ref()) {
137 match stack.pop() {
138 None => {
139 ctx.log(format_args!("-> canonical (unmatched) {}", current));
140 return Ok(current);
141 }
142 Some((parent, child_idx, _)) => {
143 current = parent.with_child(child_idx, current)?;
144 current = current.optimize()?;
145 continue;
146 }
147 }
148 }
149
150 // Try execute_parent (child-driven optimized execution)
151 if let Some(rewritten) = try_execute_parent(¤t, ctx)? {
152 ctx.log(format_args!(
153 "execute_parent rewrote {} -> {}",
154 current, rewritten
155 ));
156 current = rewritten.optimize()?;
157 continue;
158 }
159
160 // Execute the array itself.
161 let result = execute_step(current, ctx)?;
162 let (array, step) = result.into_parts();
163 match step {
164 ExecutionStep::ExecuteChild(i, done) => {
165 let child = array
166 .nth_child(i)
167 .vortex_expect("ExecuteChild index in bounds");
168 ctx.log(format_args!(
169 "ExecuteChild({i}): pushing {}, focusing on {}",
170 array, child
171 ));
172 stack.push((array, i, done));
173 current = child.optimize()?;
174 }
175 ExecutionStep::Done => {
176 ctx.log(format_args!("Done: {}", array));
177 current = array;
178 }
179 }
180 }
181
182 vortex_bail!(
183 "Exceeded maximum execution iterations ({}) while executing array",
184 *MAX_ITERATIONS,
185 )
186 }
187}
188
/// Execution context for batch CPU compute.
///
/// Accumulates a trace of execution steps. Individual steps are logged at TRACE level for
/// real-time following, and the full trace is dumped at DEBUG level when the context is dropped.
pub struct ExecutionCtx {
    /// Identifier used to tag this context's log output (`exec[<id>]`).
    id: usize,
    /// The session this context was created with; exposed through `session()`.
    session: VortexSession,
    /// Formatted step messages recorded by `log()`, in the order they were logged.
    ops: Vec<String>,
}
198
199impl ExecutionCtx {
200 /// Create a new execution context with the given session.
201 pub fn new(session: VortexSession) -> Self {
202 static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
203 let id = EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
204 Self {
205 id,
206 session,
207 ops: Vec::new(),
208 }
209 }
210
211 /// Get the session associated with this execution context.
212 pub fn session(&self) -> &VortexSession {
213 &self.session
214 }
215
216 /// Log an execution step at the current depth.
217 ///
218 /// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.
219 /// Individual steps are also logged at TRACE level for real-time following.
220 ///
221 /// Use the [`format_args!`] macro to create the `msg` argument.
222 pub fn log(&mut self, msg: fmt::Arguments<'_>) {
223 if tracing::enabled!(tracing::Level::DEBUG) {
224 let formatted = format!(" - {msg}");
225 tracing::trace!("exec[{}]: {formatted}", self.id);
226 self.ops.push(formatted);
227 }
228 }
229}
230
231impl Display for ExecutionCtx {
232 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
233 write!(f, "exec[{}]", self.id)
234 }
235}
236
237impl Drop for ExecutionCtx {
238 fn drop(&mut self) {
239 if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
240 // Unlike itertools `.format()` (panics in 0.14 on second format)
241 struct FmtOps<'a>(&'a [String]);
242 impl Display for FmtOps<'_> {
243 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
244 for (i, op) in self.0.iter().enumerate() {
245 if i > 0 {
246 f.write_str("\n")?;
247 }
248 f.write_str(op)?;
249 }
250 Ok(())
251 }
252 }
253 tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
254 }
255 }
256}
257
258/// Executing an [`ArrayRef`] into an [`ArrayRef`] is the atomic execution loop within Vortex.
259///
260/// It attempts to take the smallest possible step of execution such that the returned array
261/// is incrementally more "executed" than the input array. In other words, it is closer to becoming
262/// a canonical array.
263///
264/// The execution steps are as follows:
265/// 0. Check for canonical.
266/// 1. Attempt to `reduce` the array with metadata-only optimizations.
267/// 2. Attempt to call `reduce_parent` on each child.
268/// 3. Attempt to call `execute_parent` on each child.
269/// 4. Call `execute` on the array itself (which returns an [`ExecutionStep`]).
270///
271/// Most users will not call this method directly, instead preferring to specify an executable
272/// target such as [`crate::Columnar`], [`Canonical`], or any of the canonical array types (such as
273/// [`crate::arrays::PrimitiveArray`]).
274impl Executable for ArrayRef {
275 fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
276 // 0. Check for canonical
277 if let Some(canonical) = array.as_opt::<AnyCanonical>() {
278 ctx.log(format_args!("-> canonical {}", array));
279 return Ok(Canonical::from(canonical).into_array());
280 }
281
282 // 1. reduce (metadata-only rewrites)
283 if let Some(reduced) = array.vtable().reduce(&array)? {
284 ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
285 reduced.statistics().inherit_from(array.statistics());
286 return Ok(reduced);
287 }
288
289 // 2. reduce_parent (child-driven metadata-only rewrites)
290 for child_idx in 0..array.nchildren() {
291 let child = array.nth_child(child_idx).vortex_expect("checked length");
292 if let Some(reduced_parent) = child.vtable().reduce_parent(&child, &array, child_idx)? {
293 ctx.log(format_args!(
294 "reduce_parent: child[{}]({}) rewrote {} -> {}",
295 child_idx,
296 child.encoding_id(),
297 array,
298 reduced_parent
299 ));
300 reduced_parent.statistics().inherit_from(array.statistics());
301 return Ok(reduced_parent);
302 }
303 }
304
305 // 3. execute_parent (child-driven optimized execution)
306 for child_idx in 0..array.nchildren() {
307 // TODO(joe): remove internal copy in nth_child.
308 let child = array.nth_child(child_idx).vortex_expect("checked length");
309 if let Some(executed_parent) = child
310 .vtable()
311 .execute_parent(&child, &array, child_idx, ctx)?
312 {
313 ctx.log(format_args!(
314 "execute_parent: child[{}]({}) rewrote {} -> {}",
315 child_idx,
316 child.encoding_id(),
317 array,
318 executed_parent
319 ));
320 executed_parent
321 .statistics()
322 .inherit_from(array.statistics());
323 return Ok(executed_parent);
324 }
325 }
326
327 // 4. execute (returns an ExecutionResult)
328 ctx.log(format_args!("executing {}", array));
329 let result = execute_step(array, ctx)?;
330 let (array, step) = result.into_parts();
331 match step {
332 ExecutionStep::Done => {
333 ctx.log(format_args!("-> {}", array.as_ref()));
334 Ok(array)
335 }
336 ExecutionStep::ExecuteChild(i, _) => {
337 // For single-step execution, handle ExecuteChild by executing the child,
338 // replacing it, and returning the updated array.
339 let child = array.nth_child(i).vortex_expect("valid child index");
340 let executed_child = child.execute::<ArrayRef>(ctx)?;
341 array.with_child(i, executed_child)
342 }
343 }
344 }
345}
346
/// Execute a single step on an array, consuming it.
///
/// The vtable is cloned (via `clone_boxed`) before the call so that `array` can be moved into
/// the vtable's `execute` without a borrow of `array` still being held.
///
/// # Errors
///
/// Propagates any error returned by the vtable's `execute`.
fn execute_step(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
    // Take a standalone copy of the vtable first; `array` is moved on the next line.
    let vtable = array.vtable().clone_boxed();
    vtable.execute(array, ctx)
}
354
355/// Try execute_parent on each child of the array.
356fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<ArrayRef>> {
357 for child_idx in 0..array.nchildren() {
358 let child = array
359 .nth_child(child_idx)
360 .vortex_expect("checked nchildren");
361 if let Some(result) = child
362 .vtable()
363 .execute_parent(&child, array, child_idx, ctx)?
364 {
365 result.statistics().inherit_from(array.statistics());
366 return Ok(Some(result));
367 }
368 }
369 Ok(None)
370}
371
/// A predicate that determines when an array has reached a desired form during execution.
///
/// This is a plain function pointer, so it cannot capture any state. In this file it is
/// produced from `Matcher::matches` (see `ExecutionResult::execute_child`).
pub type DonePredicate = fn(&dyn DynArray) -> bool;
374
/// Metadata-only step indicator returned alongside an array in [`ExecutionResult`].
///
/// Instead of recursively executing children, encodings return an `ExecutionStep` that tells the
/// scheduler what to do next. This enables the scheduler to manage execution iteratively using
/// an explicit work stack, run cross-step optimizations, and cache shared sub-expressions.
pub enum ExecutionStep {
    /// Request that the scheduler execute child at the given index, using the provided
    /// [`DonePredicate`] to determine when the child is "done", then replace the child in this
    /// array and re-enter execution.
    ///
    /// The first field is the child index; the second is the predicate for that child.
    ///
    /// Between steps, the scheduler runs reduce/reduce_parent rules to fixpoint, enabling
    /// cross-step optimization (e.g., pushing scalar functions through newly-decoded children).
    ExecuteChild(usize, DonePredicate),

    /// Execution is complete. The array in the accompanying [`ExecutionResult`] is the result.
    /// The scheduler will continue executing if it has not yet reached the target form.
    Done,
}
393
394impl fmt::Debug for ExecutionStep {
395 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
396 match self {
397 ExecutionStep::ExecuteChild(idx, _) => {
398 f.debug_tuple("ExecuteChild").field(idx).finish()
399 }
400 ExecutionStep::Done => write!(f, "Done"),
401 }
402 }
403}
404
/// The result of a single execution step on an array encoding.
///
/// Combines an [`ArrayRef`] with an [`ExecutionStep`] to tell the scheduler both what to do next
/// and what array to work with.
pub struct ExecutionResult {
    /// The array the scheduler should continue working with.
    array: ArrayRef,
    /// What the scheduler should do next with `array`.
    step: ExecutionStep,
}
413
414impl ExecutionResult {
415 /// Signal that execution is complete with the given result array.
416 pub fn done(result: impl IntoArray) -> Self {
417 Self {
418 array: result.into_array(),
419 step: ExecutionStep::Done,
420 }
421 }
422
423 /// Request execution of child at `child_idx` until it matches the given [`Matcher`].
424 ///
425 /// The provided array is the (possibly modified) parent that still needs its child executed.
426 pub fn execute_child<M: Matcher>(array: impl IntoArray, child_idx: usize) -> Self {
427 Self {
428 array: array.into_array(),
429 step: ExecutionStep::ExecuteChild(child_idx, M::matches),
430 }
431 }
432
433 /// Returns a reference to the array.
434 pub fn array(&self) -> &ArrayRef {
435 &self.array
436 }
437
438 /// Returns a reference to the step.
439 pub fn step(&self) -> &ExecutionStep {
440 &self.step
441 }
442
443 /// Decompose into parts.
444 pub fn into_parts(self) -> (ArrayRef, ExecutionStep) {
445 (self.array, self.step)
446 }
447}
448
449impl fmt::Debug for ExecutionResult {
450 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
451 f.debug_struct("ExecutionResult")
452 .field("array", &self.array)
453 .field("step", &self.step)
454 .finish()
455 }
456}
457
/// Ensure that a child array matches `$M` before the caller continues.
///
/// When `$child` already matches, the macro evaluates to `$parent` unchanged. Otherwise it
/// returns early from the enclosing function with an [`ExecutionResult`] that asks for child
/// `$idx` of a clone of `$parent` to be executed until it matches `$M`.
///
/// ```ignore
/// let array = require_child!(array, array.codes(), 0 => Primitive);
/// let array = require_child!(array, array.values(), 1 => AnyCanonical);
/// ```
#[macro_export]
macro_rules! require_child {
    ($parent:expr, $child:expr, $idx:expr => $M:ty) => {{
        if $child.is::<$M>() {
            $parent
        } else {
            return Ok($crate::ExecutionResult::execute_child::<$M>(
                $parent.clone(),
                $idx,
            ))
        }
    }};
}
478
/// Extension trait for creating an execution context from a session.
///
/// Implemented in this file for [`VortexSession`].
pub trait VortexSessionExecute {
    /// Create a new execution context from this session.
    fn create_execution_ctx(&self) -> ExecutionCtx;
}
484
485impl VortexSessionExecute for VortexSession {
486 fn create_execution_ctx(&self) -> ExecutionCtx {
487 ExecutionCtx::new(self.clone())
488 }
489}