vortex_array/executor.rs
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! The execution engine: iteratively transforms arrays toward canonical form.
//!
//! Execution proceeds through four layers, tried in order on each iteration:
//!
//! 1. **`reduce`** -- metadata-only self-rewrite (cheapest).
//! 2. **`reduce_parent`** -- metadata-only child-driven parent rewrite.
//! 3. **`execute_parent`** -- child-driven fused execution (may read buffers).
//! 4. **`execute`** -- the encoding's own decode step (most expensive).
//!
//! The main entry point is [`ArrayRef::execute_until`], which uses an explicit work stack
//! to drive execution iteratively without recursion. Between steps, the optimizer runs
//! reduce/reduce_parent rules to fixpoint.
//!
//! See <https://docs.vortex.dev/developer-guide/internals/execution> for a full description
//! of the model.
19
20use std::env::VarError;
21use std::fmt;
22use std::fmt::Display;
23use std::sync::LazyLock;
24use std::sync::atomic::AtomicUsize;
25
26use vortex_error::VortexExpect;
27use vortex_error::VortexResult;
28use vortex_error::vortex_bail;
29use vortex_error::vortex_panic;
30use vortex_session::VortexSession;
31
32use crate::AnyCanonical;
33use crate::ArrayRef;
34use crate::Canonical;
35use crate::IntoArray;
36use crate::matcher::Matcher;
37use crate::memory::HostAllocatorRef;
38use crate::memory::MemorySessionExt;
39use crate::optimizer::ArrayOptimizer;
40
41/// Maximum number of iterations to attempt when executing an array before giving up and returning
42/// an error.
43pub(crate) static MAX_ITERATIONS: LazyLock<usize> =
44 LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
45 Ok(val) => val
46 .parse::<usize>()
47 .unwrap_or_else(|e| vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")),
48 Err(VarError::NotPresent) => 128,
49 Err(VarError::NotUnicode(_)) => {
50 vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
51 }
52 });
53
54/// Marker trait for types that an [`ArrayRef`] can be executed into.
55///
56/// Implementors must provide an implementation of `execute` that takes
57/// an [`ArrayRef`] and an [`ExecutionCtx`], and produces an instance of the
58/// implementor type.
59///
60/// Users should use the `Array::execute` or `Array::execute_as` methods
61pub trait Executable: Sized {
62 fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
63}
64
65#[allow(clippy::same_name_method)]
66impl ArrayRef {
67 /// Execute this array to produce an instance of `E`.
68 ///
69 /// See the [`Executable`] implementation for details on how this execution is performed.
70 pub fn execute<E: Executable>(self, ctx: &mut ExecutionCtx) -> VortexResult<E> {
71 E::execute(self, ctx)
72 }
73
74 /// Execute this array, labeling the execution step with a name for tracing.
75 pub fn execute_as<E: Executable>(
76 self,
77 _name: &'static str,
78 ctx: &mut ExecutionCtx,
79 ) -> VortexResult<E> {
80 E::execute(self, ctx)
81 }
82
83 /// Iteratively execute this array until the [`Matcher`] matches, using an explicit work
84 /// stack.
85 ///
86 /// The scheduler repeatedly:
87 /// 1. Checks if the current array matches `M` — if so, pops the stack or returns.
88 /// 2. Runs `execute_parent` on each child for child-driven optimizations.
89 /// 3. Calls `execute` which returns an [`ExecutionStep`].
90 ///
91 /// Note: the returned array may not match `M`. If execution converges to a canonical form
92 /// that does not match `M`, the canonical array is returned since no further execution
93 /// progress is possible.
94 ///
95 /// For safety, we will error when the number of execution iterations reaches a configurable
96 /// maximum (default 128, override with `VORTEX_MAX_ITERATIONS`).
97 pub fn execute_until<M: Matcher>(self, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
98 static MAX_ITERATIONS: LazyLock<usize> =
99 LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
100 Ok(val) => val.parse::<usize>().unwrap_or_else(|e| {
101 vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")
102 }),
103 Err(VarError::NotPresent) => 128,
104 Err(VarError::NotUnicode(_)) => {
105 vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
106 }
107 });
108
109 let mut current = self.optimize()?;
110 // Stack frames: (parent, slot_idx, done_predicate_for_slot)
111 let mut stack: Vec<(ArrayRef, usize, DonePredicate)> = Vec::new();
112
113 for _ in 0..*MAX_ITERATIONS {
114 // Check for termination: use the stack frame's done predicate, or the root matcher.
115 let is_done = stack
116 .last()
117 .map_or(M::matches as DonePredicate, |frame| frame.2);
118 if is_done(¤t) {
119 match stack.pop() {
120 None => {
121 ctx.log(format_args!("-> {}", current));
122 return Ok(current);
123 }
124 Some((parent, slot_idx, _)) => {
125 current = parent.with_slot(slot_idx, current)?;
126 current = current.optimize()?;
127 continue;
128 }
129 }
130 }
131
132 // If we've reached canonical form, we can't execute any further regardless
133 // of whether the matcher matched.
134 if AnyCanonical::matches(¤t) {
135 match stack.pop() {
136 None => {
137 ctx.log(format_args!("-> canonical (unmatched) {}", current));
138 return Ok(current);
139 }
140 Some((parent, slot_idx, _)) => {
141 current = parent.with_slot(slot_idx, current)?;
142 current = current.optimize()?;
143 continue;
144 }
145 }
146 }
147
148 // Try execute_parent (child-driven optimized execution)
149 if let Some(rewritten) = try_execute_parent(¤t, ctx)? {
150 ctx.log(format_args!(
151 "execute_parent rewrote {} -> {}",
152 current, rewritten
153 ));
154 current = rewritten.optimize()?;
155 continue;
156 }
157
158 // Execute the array itself.
159 let result = execute_step(current, ctx)?;
160 let (array, step) = result.into_parts();
161 match step {
162 ExecutionStep::ExecuteSlot(i, done) => {
163 let child = array.slots()[i]
164 .clone()
165 .vortex_expect("ExecuteSlot index in bounds");
166 ctx.log(format_args!(
167 "ExecuteSlot({i}): pushing {}, focusing on {}",
168 array, child
169 ));
170 stack.push((array, i, done));
171 current = child.optimize()?;
172 }
173 ExecutionStep::Done => {
174 ctx.log(format_args!("Done: {}", array));
175 current = array;
176 }
177 }
178 }
179
180 vortex_bail!(
181 "Exceeded maximum execution iterations ({}) while executing array",
182 *MAX_ITERATIONS,
183 )
184 }
185}
186
187/// Execution context for batch CPU compute.
188///
189/// Accumulates a trace of execution steps. Individual steps are logged at TRACE level for
190/// real-time following, and the full trace is dumped at DEBUG level when the context is dropped.
191#[derive(Debug, Clone)]
192pub struct ExecutionCtx {
193 id: usize,
194 session: VortexSession,
195 ops: Vec<String>,
196}
197
198impl ExecutionCtx {
199 /// Create a new execution context with the given session.
200 pub fn new(session: VortexSession) -> Self {
201 static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
202 let id = EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
203 Self {
204 id,
205 session,
206 ops: Vec::new(),
207 }
208 }
209
210 /// Get the session associated with this execution context.
211 pub fn session(&self) -> &VortexSession {
212 &self.session
213 }
214
215 /// Get the session-scoped host allocator for this execution context.
216 pub fn allocator(&self) -> HostAllocatorRef {
217 self.session.allocator()
218 }
219
220 /// Log an execution step at the current depth.
221 ///
222 /// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.
223 /// Individual steps are also logged at TRACE level for real-time following.
224 ///
225 /// Use the [`format_args!`] macro to create the `msg` argument.
226 pub fn log(&mut self, msg: fmt::Arguments<'_>) {
227 if tracing::enabled!(tracing::Level::DEBUG) {
228 let formatted = format!(" - {msg}");
229 tracing::trace!("exec[{}]: {formatted}", self.id);
230 self.ops.push(formatted);
231 }
232 }
233}
234
235impl Display for ExecutionCtx {
236 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
237 write!(f, "exec[{}]", self.id)
238 }
239}
240
241impl Drop for ExecutionCtx {
242 fn drop(&mut self) {
243 if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
244 // Unlike itertools `.format()` (panics in 0.14 on second format)
245 struct FmtOps<'a>(&'a [String]);
246 impl Display for FmtOps<'_> {
247 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248 for (i, op) in self.0.iter().enumerate() {
249 if i > 0 {
250 f.write_str("\n")?;
251 }
252 f.write_str(op)?;
253 }
254 Ok(())
255 }
256 }
257 tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
258 }
259 }
260}
261
262/// Executing an [`ArrayRef`] into an [`ArrayRef`] is the atomic execution loop within Vortex.
263///
264/// It attempts to take the smallest possible step of execution such that the returned array
265/// is incrementally more "executed" than the input array. In other words, it is closer to becoming
266/// a canonical array.
267///
268/// The execution steps are as follows:
269/// 0. Check for canonical.
270/// 1. Attempt to `reduce` the array with metadata-only optimizations.
271/// 2. Attempt to call `reduce_parent` on each child.
272/// 3. Attempt to call `execute_parent` on each child.
273/// 4. Call `execute` on the array itself (which returns an [`ExecutionStep`]).
274///
275/// Most users will not call this method directly, instead preferring to specify an executable
276/// target such as [`crate::Columnar`], [`Canonical`], or any of the canonical array types (such as
277/// [`crate::arrays::PrimitiveArray`]).
278impl Executable for ArrayRef {
279 fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
280 // 0. Check for canonical
281 if let Some(canonical) = array.as_opt::<AnyCanonical>() {
282 ctx.log(format_args!("-> canonical {}", array));
283 return Ok(Canonical::from(canonical).into_array());
284 }
285
286 // 1. reduce (metadata-only rewrites)
287 if let Some(reduced) = array.reduce()? {
288 ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
289 reduced.statistics().inherit_from(array.statistics());
290 return Ok(reduced);
291 }
292
293 // 2. reduce_parent (child-driven metadata-only rewrites)
294 for (slot_idx, slot) in array.slots().iter().enumerate() {
295 let Some(child) = slot else {
296 continue;
297 };
298 if let Some(reduced_parent) = child.reduce_parent(&array, slot_idx)? {
299 ctx.log(format_args!(
300 "reduce_parent: slot[{}]({}) rewrote {} -> {}",
301 slot_idx,
302 child.encoding_id(),
303 array,
304 reduced_parent
305 ));
306 reduced_parent.statistics().inherit_from(array.statistics());
307 return Ok(reduced_parent);
308 }
309 }
310
311 // 3. execute_parent (child-driven optimized execution)
312 for (slot_idx, slot) in array.slots().iter().enumerate() {
313 let Some(child) = slot else {
314 continue;
315 };
316 if let Some(executed_parent) = child.execute_parent(&array, slot_idx, ctx)? {
317 ctx.log(format_args!(
318 "execute_parent: slot[{}]({}) rewrote {} -> {}",
319 slot_idx,
320 child.encoding_id(),
321 array,
322 executed_parent
323 ));
324 executed_parent
325 .statistics()
326 .inherit_from(array.statistics());
327 return Ok(executed_parent);
328 }
329 }
330
331 // 4. execute (returns an ExecutionResult)
332 ctx.log(format_args!("executing {}", array));
333 let result = execute_step(array, ctx)?;
334 let (array, step) = result.into_parts();
335 match step {
336 ExecutionStep::Done => {
337 ctx.log(format_args!("-> {}", array));
338 Ok(array)
339 }
340 ExecutionStep::ExecuteSlot(i, _) => {
341 // For single-step execution, handle ExecuteSlot by executing the slot,
342 // replacing it, and returning the updated array.
343 let child = array.slots()[i].clone().vortex_expect("valid slot index");
344 let executed_child = child.execute::<ArrayRef>(ctx)?;
345 array.with_slot(i, executed_child)
346 }
347 }
348 }
349}
350
351/// Execute a single step on an array, consuming it.
352///
353/// Extracts the vtable before consuming the array to avoid borrow conflicts.
354fn execute_step(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
355 array.execute_encoding(ctx)
356}
357
358/// Try execute_parent on each occupied slot of the array.
359fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<ArrayRef>> {
360 for (slot_idx, slot) in array.slots().iter().enumerate() {
361 let Some(child) = slot else {
362 continue;
363 };
364 if let Some(result) = child.execute_parent(array, slot_idx, ctx)? {
365 result.statistics().inherit_from(array.statistics());
366 return Ok(Some(result));
367 }
368 }
369 Ok(None)
370}
371
372/// A predicate that determines when an array has reached a desired form during execution.
373pub type DonePredicate = fn(&ArrayRef) -> bool;
374
375/// Metadata-only step indicator returned alongside an array in [`ExecutionResult`].
376///
377/// Instead of recursively executing children, encodings return an `ExecutionStep` that tells the
378/// scheduler what to do next. This enables the scheduler to manage execution iteratively using
379/// an explicit work stack, run cross-step optimizations, and cache shared sub-expressions.
380pub enum ExecutionStep {
381 /// Request that the scheduler execute the slot at the given index, using the provided
382 /// [`DonePredicate`] to determine when the slot is "done", then replace the slot in this
383 /// array and re-enter execution.
384 ///
385 /// Between steps, the scheduler runs reduce/reduce_parent rules to fixpoint, enabling
386 /// cross-step optimization (e.g., pushing scalar functions through newly-decoded children).
387 ///
388 /// Use [`ExecutionResult::execute_slot`] instead of constructing this variant directly.
389 ExecuteSlot(usize, DonePredicate),
390
391 /// Execution is complete. The array in the accompanying [`ExecutionResult`] is the result.
392 /// The scheduler will continue executing if it has not yet reached the target form.
393 Done,
394}
395
396impl fmt::Debug for ExecutionStep {
397 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
398 match self {
399 ExecutionStep::ExecuteSlot(idx, _) => f.debug_tuple("ExecuteSlot").field(idx).finish(),
400 ExecutionStep::Done => write!(f, "Done"),
401 }
402 }
403}
404
405/// The result of a single execution step on an array encoding.
406///
407/// Combines an [`ArrayRef`] with an [`ExecutionStep`] to tell the scheduler both what to do next
408/// and what array to work with.
409pub struct ExecutionResult {
410 array: ArrayRef,
411 step: ExecutionStep,
412}
413
414impl ExecutionResult {
415 /// Signal that execution is complete with the given result array.
416 pub fn done(result: impl IntoArray) -> Self {
417 Self {
418 array: result.into_array(),
419 step: ExecutionStep::Done,
420 }
421 }
422
423 /// Request execution of slot at `slot_idx` until it matches the given [`Matcher`].
424 ///
425 /// The provided array is the (possibly modified) parent that still needs its slot executed.
426 pub fn execute_slot<M: Matcher>(array: impl IntoArray, slot_idx: usize) -> Self {
427 Self {
428 array: array.into_array(),
429 step: ExecutionStep::ExecuteSlot(slot_idx, M::matches),
430 }
431 }
432
433 /// Returns a reference to the array.
434 pub fn array(&self) -> &ArrayRef {
435 &self.array
436 }
437
438 /// Returns a reference to the step.
439 pub fn step(&self) -> &ExecutionStep {
440 &self.step
441 }
442
443 /// Decompose into parts.
444 pub fn into_parts(self) -> (ArrayRef, ExecutionStep) {
445 (self.array, self.step)
446 }
447}
448
449impl fmt::Debug for ExecutionResult {
450 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
451 f.debug_struct("ExecutionResult")
452 .field("array", &self.array)
453 .field("step", &self.step)
454 .finish()
455 }
456}
457
/// Ensure that a child array matches `$M` before continuing.
///
/// When `$child` already matches, the macro evaluates to `$parent`, unchanged. Otherwise it
/// returns early from the enclosing function with an [`ExecutionResult`] that asks for slot
/// `$idx` of a clone of `$parent` to be executed until it matches `$M`.
///
/// ```ignore
/// let array = require_child!(array, array.codes(), 0 => Primitive);
/// let array = require_child!(array, array.values(), 1 => AnyCanonical);
/// ```
#[macro_export]
macro_rules! require_child {
    ($parent:expr, $child:expr, $idx:expr => $M:ty) => {{
        if $child.is::<$M>() {
            $parent
        } else {
            return Ok($crate::ExecutionResult::execute_slot::<$M>(
                $parent.clone(),
                $idx,
            ))
        }
    }};
}
478
/// Variant of [`require_child!`] for children that may be absent.
///
/// A `None` child is accepted as-is. A `Some` child that does not match `$M` causes an early
/// return from the enclosing function with an [`ExecutionResult`] asking for slot `$idx` to be
/// executed until it matches `$M`.
///
/// In contrast to `require_child!`, this macro is a statement (it yields no value), and
/// `$parent` is moved -- not cloned -- into the early-return path.
///
/// ```ignore
/// require_opt_child!(array, array.patches().map(|p| p.indices()), 1 => Primitive);
/// ```
#[macro_export]
macro_rules! require_opt_child {
    ($parent:expr, $child_opt:expr, $idx:expr => $M:ty) => {
        // Only a present child of the wrong kind needs to be executed.
        if $child_opt.is_some_and(|present| !present.is::<$M>()) {
            return Ok($crate::ExecutionResult::execute_slot::<$M>(
                $parent,
                $idx,
            ));
        }
    };
}
497
/// Ensure that the three patch slots -- indices, values, and chunk offsets -- are `Primitive`
/// wherever they are present.
///
/// The slots are checked in that order with [`require_opt_child!`], so an empty (`None`) slot
/// is skipped, and the first occupied slot that is not `Primitive` triggers an early return
/// requesting its execution. When no patches are present, nothing happens.
///
/// As with [`require_opt_child!`], `$parent` is moved (not cloned) into the early-return path.
///
/// ```ignore
/// require_patches!(array, PATCH_INDICES_SLOT, PATCH_VALUES_SLOT, PATCH_CHUNK_OFFSETS_SLOT);
/// ```
#[macro_export]
macro_rules! require_patches {
    ($parent:expr, $indices_slot:expr, $values_slot:expr, $chunk_offsets_slot:expr) => {
        // Patch indices.
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$indices_slot].as_ref(),
            $indices_slot => $crate::arrays::Primitive
        );

        // Patch values.
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$values_slot].as_ref(),
            $values_slot => $crate::arrays::Primitive
        );

        // Patch chunk offsets.
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$chunk_offsets_slot].as_ref(),
            $chunk_offsets_slot => $crate::arrays::Primitive
        );
    };
}
526
/// Ensure that the validity slot, when occupied, holds a [`Bool`](crate::arrays::Bool) array.
///
/// An empty slot is accepted as-is. NOTE(review): the slot is presumably empty whenever
/// validity is not array-backed (e.g. `NonNullable` or `AllValid`) -- confirm against the
/// encodings that use this macro. An occupied slot that is not `Bool` causes an early return
/// with an [`ExecutionResult`] requesting execution of the validity slot.
///
/// As with [`require_opt_child!`], `$parent` is moved (not cloned) into the early-return path.
///
/// ```ignore
/// require_validity!(array, VALIDITY_SLOT);
/// ```
#[macro_export]
macro_rules! require_validity {
    ($parent:expr, $idx:expr) => {
        // Delegate to the optional-child check with `Bool` as the required form.
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$idx].as_ref(),
            $idx => $crate::arrays::Bool
        );
    };
}
546
547/// Extension trait for creating an execution context from a session.
548pub trait VortexSessionExecute {
549 /// Create a new execution context from this session.
550 fn create_execution_ctx(&self) -> ExecutionCtx;
551}
552
553impl VortexSessionExecute for VortexSession {
554 fn create_execution_ctx(&self) -> ExecutionCtx {
555 ExecutionCtx::new(self.clone())
556 }
557}