vortex_array/executor.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! The execution engine: iteratively transforms arrays toward canonical form.
5//!
6//! Execution proceeds through four layers tried in order on each iteration:
7//!
8//! 1. **`reduce`** -- metadata-only self-rewrite (cheapest).
9//! 2. **`reduce_parent`** -- metadata-only child-driven parent rewrite.
10//! 3. **`execute_parent`** -- child-driven fused execution (may read buffers).
11//! 4. **`execute`** -- the encoding's own decode step (most expensive).
12//!
//! The main entry point is [`ArrayRef::execute_until`], which uses an explicit work stack
//! to drive execution iteratively without recursion. Between steps, the optimizer runs
//! reduce/reduce_parent rules to fixpoint.
16//!
17//! See <https://docs.vortex.dev/developer-guide/internals/execution> for a full description
18//! of the model.
19
20use std::env::VarError;
21use std::fmt;
22use std::fmt::Display;
23use std::sync::LazyLock;
24use std::sync::atomic::AtomicUsize;
25
26use vortex_error::VortexExpect;
27use vortex_error::VortexResult;
28use vortex_error::vortex_bail;
29use vortex_error::vortex_panic;
30use vortex_session::VortexSession;
31
32use crate::AnyCanonical;
33use crate::ArrayRef;
34use crate::Canonical;
35use crate::IntoArray;
36use crate::matcher::Matcher;
37use crate::memory::HostAllocatorRef;
38use crate::memory::MemorySessionExt;
39use crate::optimizer::ArrayOptimizer;
40
41/// Returns the maximum number of iterations to attempt when executing an array before giving up and returning
42/// an error, can be by the `VORTEX_MAX_ITERATIONS` env variables, otherwise defaults to 128.
43pub(crate) fn max_iterations() -> usize {
44 static MAX_ITERATIONS: LazyLock<usize> =
45 LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
46 Ok(val) => val.parse::<usize>().unwrap_or_else(|e| {
47 vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")
48 }),
49 Err(VarError::NotPresent) => 128,
50 Err(VarError::NotUnicode(_)) => {
51 vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
52 }
53 });
54 *MAX_ITERATIONS
55}
56
57/// Marker trait for types that an [`ArrayRef`] can be executed into.
58///
59/// Implementors must provide an implementation of `execute` that takes
60/// an [`ArrayRef`] and an [`ExecutionCtx`], and produces an instance of the
61/// implementor type.
62///
63/// Users should use the `Array::execute` or `Array::execute_as` methods
64pub trait Executable: Sized {
65 fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
66}
67
68#[expect(clippy::same_name_method)]
69impl ArrayRef {
70 /// Execute this array to produce an instance of `E`.
71 ///
72 /// See the [`Executable`] implementation for details on how this execution is performed.
73 pub fn execute<E: Executable>(self, ctx: &mut ExecutionCtx) -> VortexResult<E> {
74 E::execute(self, ctx)
75 }
76
77 /// Execute this array, labeling the execution step with a name for tracing.
78 pub fn execute_as<E: Executable>(
79 self,
80 _name: &'static str,
81 ctx: &mut ExecutionCtx,
82 ) -> VortexResult<E> {
83 E::execute(self, ctx)
84 }
85
86 /// Iteratively execute this array until the [`Matcher`] matches, using an explicit work
87 /// stack.
88 ///
89 /// The scheduler repeatedly:
90 /// 1. Checks if the current array matches `M` — if so, pops the stack or returns.
91 /// 2. Runs `execute_parent` on each child for child-driven optimizations.
92 /// 3. Calls `execute` which returns an [`ExecutionStep`].
93 ///
94 /// Note: the returned array may not match `M`. If execution converges to a canonical form
95 /// that does not match `M`, the canonical array is returned since no further execution
96 /// progress is possible.
97 ///
98 /// For safety, we will error when the number of execution iterations reaches a configurable
99 /// maximum (default 128, override with `VORTEX_MAX_ITERATIONS`).
100 pub fn execute_until<M: Matcher>(self, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
101 let mut current = self.optimize()?;
102 // Stack frames: (parent, slot_idx, done_predicate_for_slot)
103 let mut stack: Vec<(ArrayRef, usize, DonePredicate)> = Vec::new();
104
105 for _ in 0..max_iterations() {
106 // Check for termination: use the stack frame's done predicate, or the root matcher.
107 let is_done = stack
108 .last()
109 .map_or(M::matches as DonePredicate, |frame| frame.2);
110 if is_done(¤t) {
111 match stack.pop() {
112 None => {
113 ctx.log(format_args!("-> {}", current));
114 return Ok(current);
115 }
116 Some((parent, slot_idx, _)) => {
117 current = parent.with_slot(slot_idx, current)?;
118 current = current.optimize()?;
119 continue;
120 }
121 }
122 }
123
124 // If we've reached canonical form, we can't execute any further regardless
125 // of whether the matcher matched.
126 if AnyCanonical::matches(¤t) {
127 match stack.pop() {
128 None => {
129 ctx.log(format_args!("-> canonical (unmatched) {}", current));
130 return Ok(current);
131 }
132 Some((parent, slot_idx, _)) => {
133 current = parent.with_slot(slot_idx, current)?;
134 current = current.optimize()?;
135 continue;
136 }
137 }
138 }
139
140 // Try execute_parent (child-driven optimized execution)
141 if let Some(rewritten) = try_execute_parent(¤t, ctx)? {
142 ctx.log(format_args!(
143 "execute_parent rewrote {} -> {}",
144 current, rewritten
145 ));
146 current = rewritten.optimize()?;
147 continue;
148 }
149
150 // Execute the array itself.
151 let result = execute_step(current, ctx)?;
152 let (array, step) = result.into_parts();
153 match step {
154 ExecutionStep::ExecuteSlot(i, done) => {
155 let child = array.slots()[i]
156 .clone()
157 .vortex_expect("ExecuteSlot index in bounds");
158 ctx.log(format_args!(
159 "ExecuteSlot({i}): pushing {}, focusing on {}",
160 array, child
161 ));
162 stack.push((array, i, done));
163 current = child.optimize()?;
164 }
165 ExecutionStep::Done => {
166 ctx.log(format_args!("Done: {}", array));
167 current = array;
168 }
169 }
170 }
171
172 vortex_bail!(
173 "Exceeded maximum execution iterations ({}) while executing array",
174 max_iterations(),
175 )
176 }
177}
178
/// Execution context for batch CPU compute.
///
/// Accumulates a trace of execution steps. Individual steps are logged at TRACE level for
/// real-time following, and the full trace is dumped at DEBUG level when the context is dropped.
///
/// NOTE(review): `Clone` copies both `id` and the accumulated `ops`, and `Drop` dumps `ops`,
/// so a clone and its original will each emit the shared prefix of the trace under the same id.
#[derive(Debug, Clone)]
pub struct ExecutionCtx {
    /// Identifier drawn from a process-wide counter in `new`; tags log output as `exec[<id>]`.
    id: usize,
    /// The session this context executes within; also the source of the host allocator.
    session: VortexSession,
    /// Formatted execution steps; only recorded while DEBUG logging is enabled.
    ops: Vec<String>,
}
189
190impl ExecutionCtx {
191 /// Create a new execution context with the given session.
192 pub fn new(session: VortexSession) -> Self {
193 static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
194 let id = EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
195 Self {
196 id,
197 session,
198 ops: Vec::new(),
199 }
200 }
201
202 /// Get the session associated with this execution context.
203 pub fn session(&self) -> &VortexSession {
204 &self.session
205 }
206
207 /// Get the session-scoped host allocator for this execution context.
208 pub fn allocator(&self) -> HostAllocatorRef {
209 self.session.allocator()
210 }
211
212 /// Log an execution step at the current depth.
213 ///
214 /// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.
215 /// Individual steps are also logged at TRACE level for real-time following.
216 ///
217 /// Use the [`format_args!`] macro to create the `msg` argument.
218 pub fn log(&mut self, msg: fmt::Arguments<'_>) {
219 if tracing::enabled!(tracing::Level::DEBUG) {
220 let formatted = format!(" - {msg}");
221 tracing::trace!("exec[{}]: {formatted}", self.id);
222 self.ops.push(formatted);
223 }
224 }
225}
226
227impl Display for ExecutionCtx {
228 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
229 write!(f, "exec[{}]", self.id)
230 }
231}
232
233impl Drop for ExecutionCtx {
234 fn drop(&mut self) {
235 if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
236 // Unlike itertools `.format()` (panics in 0.14 on second format)
237 struct FmtOps<'a>(&'a [String]);
238 impl Display for FmtOps<'_> {
239 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
240 for (i, op) in self.0.iter().enumerate() {
241 if i > 0 {
242 f.write_str("\n")?;
243 }
244 f.write_str(op)?;
245 }
246 Ok(())
247 }
248 }
249 tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
250 }
251 }
252}
253
254/// Executing an [`ArrayRef`] into an [`ArrayRef`] is the atomic execution loop within Vortex.
255///
256/// It attempts to take the smallest possible step of execution such that the returned array
257/// is incrementally more "executed" than the input array. In other words, it is closer to becoming
258/// a canonical array.
259///
260/// The execution steps are as follows:
261/// 0. Check for canonical.
262/// 1. Attempt to `reduce` the array with metadata-only optimizations.
263/// 2. Attempt to call `reduce_parent` on each child.
264/// 3. Attempt to call `execute_parent` on each child.
265/// 4. Call `execute` on the array itself (which returns an [`ExecutionStep`]).
266///
267/// Most users will not call this method directly, instead preferring to specify an executable
268/// target such as [`crate::Columnar`], [`Canonical`], or any of the canonical array types (such as
269/// [`crate::arrays::PrimitiveArray`]).
270impl Executable for ArrayRef {
271 fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
272 // 0. Check for canonical
273 if let Some(canonical) = array.as_opt::<AnyCanonical>() {
274 ctx.log(format_args!("-> canonical {}", array));
275 return Ok(Canonical::from(canonical).into_array());
276 }
277
278 // 1. reduce (metadata-only rewrites)
279 if let Some(reduced) = array.reduce()? {
280 ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
281 reduced.statistics().inherit_from(array.statistics());
282 return Ok(reduced);
283 }
284
285 // 2. reduce_parent (child-driven metadata-only rewrites)
286 for (slot_idx, slot) in array.slots().iter().enumerate() {
287 let Some(child) = slot else {
288 continue;
289 };
290 if let Some(reduced_parent) = child.reduce_parent(&array, slot_idx)? {
291 ctx.log(format_args!(
292 "reduce_parent: slot[{}]({}) rewrote {} -> {}",
293 slot_idx,
294 child.encoding_id(),
295 array,
296 reduced_parent
297 ));
298 reduced_parent.statistics().inherit_from(array.statistics());
299 return Ok(reduced_parent);
300 }
301 }
302
303 // 3. execute_parent (child-driven optimized execution)
304 for (slot_idx, slot) in array.slots().iter().enumerate() {
305 let Some(child) = slot else {
306 continue;
307 };
308 if let Some(executed_parent) = child.execute_parent(&array, slot_idx, ctx)? {
309 ctx.log(format_args!(
310 "execute_parent: slot[{}]({}) rewrote {} -> {}",
311 slot_idx,
312 child.encoding_id(),
313 array,
314 executed_parent
315 ));
316 executed_parent
317 .statistics()
318 .inherit_from(array.statistics());
319 return Ok(executed_parent);
320 }
321 }
322
323 // 4. execute (returns an ExecutionResult)
324 ctx.log(format_args!("executing {}", array));
325 let result = execute_step(array, ctx)?;
326 let (array, step) = result.into_parts();
327 match step {
328 ExecutionStep::Done => {
329 ctx.log(format_args!("-> {}", array));
330 Ok(array)
331 }
332 ExecutionStep::ExecuteSlot(i, _) => {
333 // For single-step execution, handle ExecuteSlot by executing the slot,
334 // replacing it, and returning the updated array.
335 let child = array.slots()[i].clone().vortex_expect("valid slot index");
336 let executed_child = child.execute::<ArrayRef>(ctx)?;
337 array.with_slot(i, executed_child)
338 }
339 }
340 }
341}
342
343/// Execute a single step on an array, consuming it.
344///
345/// Extracts the vtable before consuming the array to avoid borrow conflicts.
346fn execute_step(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
347 array.execute_encoding(ctx)
348}
349
350/// Try execute_parent on each occupied slot of the array.
351fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<ArrayRef>> {
352 for (slot_idx, slot) in array.slots().iter().enumerate() {
353 let Some(child) = slot else {
354 continue;
355 };
356 if let Some(result) = child.execute_parent(array, slot_idx, ctx)? {
357 result.statistics().inherit_from(array.statistics());
358 return Ok(Some(result));
359 }
360 }
361 Ok(None)
362}
363
364/// A predicate that determines when an array has reached a desired form during execution.
365pub type DonePredicate = fn(&ArrayRef) -> bool;
366
/// Metadata-only step indicator returned alongside an array in [`ExecutionResult`].
///
/// Instead of recursively executing children, encodings return an `ExecutionStep` that tells the
/// scheduler what to do next. This lets the scheduler drive execution iteratively using an
/// explicit work stack and run optimizations between steps.
///
/// NOTE(review): earlier docs also mention caching shared sub-expressions; the scheduler in
/// this module (`ArrayRef::execute_until`) does not do that. Confirm whether it is planned.
pub enum ExecutionStep {
    /// Request that the scheduler execute the slot at the given index.
    ///
    /// The provided [`DonePredicate`] decides when the slot is "done". The scheduler then
    /// replaces the slot in this array and re-enters execution.
    ///
    /// Between steps, the scheduler re-optimizes the arrays (reduce/reduce_parent rules),
    /// enabling cross-step optimization, e.g. pushing scalar functions through newly-decoded
    /// children.
    ///
    /// Use [`ExecutionResult::execute_slot`] instead of constructing this variant directly.
    ExecuteSlot(usize, DonePredicate),

    /// Execution is complete. The array in the accompanying [`ExecutionResult`] is the result.
    ///
    /// `ArrayRef::execute_until` keeps executing that array if it has not yet reached the
    /// target form.
    Done,
}
387
388impl fmt::Debug for ExecutionStep {
389 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
390 match self {
391 ExecutionStep::ExecuteSlot(idx, _) => f.debug_tuple("ExecuteSlot").field(idx).finish(),
392 ExecutionStep::Done => write!(f, "Done"),
393 }
394 }
395}
396
/// The result of a single execution step on an array encoding.
///
/// Combines an [`ArrayRef`] with an [`ExecutionStep`] to tell the scheduler both what to do next
/// and what array to work with.
pub struct ExecutionResult {
    /// With `Done`, the result array. With `ExecuteSlot`, the (possibly modified) parent
    /// whose slot still needs executing.
    array: ArrayRef,
    /// What the scheduler should do next with `array`.
    step: ExecutionStep,
}
405
406impl ExecutionResult {
407 /// Signal that execution is complete with the given result array.
408 pub fn done(result: impl IntoArray) -> Self {
409 Self {
410 array: result.into_array(),
411 step: ExecutionStep::Done,
412 }
413 }
414
415 /// Request execution of slot at `slot_idx` until it matches the given [`Matcher`].
416 ///
417 /// The provided array is the (possibly modified) parent that still needs its slot executed.
418 pub fn execute_slot<M: Matcher>(array: impl IntoArray, slot_idx: usize) -> Self {
419 Self {
420 array: array.into_array(),
421 step: ExecutionStep::ExecuteSlot(slot_idx, M::matches),
422 }
423 }
424
425 /// Returns a reference to the array.
426 pub fn array(&self) -> &ArrayRef {
427 &self.array
428 }
429
430 /// Returns a reference to the step.
431 pub fn step(&self) -> &ExecutionStep {
432 &self.step
433 }
434
435 /// Decompose into parts.
436 pub fn into_parts(self) -> (ArrayRef, ExecutionStep) {
437 (self.array, self.step)
438 }
439}
440
441impl fmt::Debug for ExecutionResult {
442 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
443 f.debug_struct("ExecutionResult")
444 .field("array", &self.array)
445 .field("step", &self.step)
446 .finish()
447 }
448}
449
/// Require that a child array matches `$M`.
///
/// If the child already matches, the macro evaluates to `$parent` unchanged. Otherwise it
/// early-returns an [`ExecutionResult`] asking the scheduler to execute child `$idx` until it
/// matches `$M`, handing over a clone of `$parent`.
///
/// `$parent` may be evaluated on either path, so pass a simple place such as a local variable.
///
/// ```ignore
/// let array = require_child!(array, array.codes(), 0 => Primitive);
/// let array = require_child!(array, array.values(), 1 => AnyCanonical);
/// ```
#[macro_export]
macro_rules! require_child {
    ($parent:expr, $child:expr, $idx:expr => $M:ty) => {{
        if $child.is::<$M>() {
            $parent
        } else {
            return Ok($crate::ExecutionResult::execute_slot::<$M>(
                $parent.clone(),
                $idx,
            ));
        }
    }};
}
470
/// Like [`require_child!`], but for optional children.
///
/// If the child is `None`, this is a no-op. If the child is `Some` but does not match `$M`, it
/// early-returns an [`ExecutionResult`] requesting execution of child `$idx`.
///
/// Unlike `require_child!`, this is a statement macro (it produces no value) and does not clone
/// `$parent`: `$parent` is only evaluated, and moved, on the early-return path. `$child_opt` is
/// evaluated exactly once. The enclosing function must return a `Result` wrapping an
/// [`ExecutionResult`].
///
/// ```ignore
/// require_opt_child!(array, array.patches().map(|p| p.indices()), 1 => Primitive);
/// ```
#[macro_export]
macro_rules! require_opt_child {
    ($parent:expr, $child_opt:expr, $idx:expr => $M:ty) => {
        // Only an occupied slot that fails the matcher triggers the early return.
        if $child_opt.is_some_and(|child| !child.is::<$M>()) {
            return Ok($crate::ExecutionResult::execute_slot::<$M>($parent, $idx));
        }
    };
}
489
/// Require that the patch slots (indices, values, and chunk_offsets) are `Primitive`.
///
/// Each slot is checked independently with [`require_opt_child!`], in the order indices,
/// values, chunk offsets. An empty (`None`) slot is skipped. The first occupied slot that is
/// not `Primitive` triggers an early return requesting its execution. If no patches are present
/// (all three slots are `None`), this is a no-op.
///
/// Like [`require_opt_child!`], `$parent` is moved (not cloned) into the early-return path.
/// `$parent` is expanded several times, so pass a simple place such as a local variable.
///
/// ```ignore
/// require_patches!(array, PATCH_INDICES_SLOT, PATCH_VALUES_SLOT, PATCH_CHUNK_OFFSETS_SLOT);
/// ```
#[macro_export]
macro_rules! require_patches {
    ($parent:expr, $indices_slot:expr, $values_slot:expr, $chunk_offsets_slot:expr) => {
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$indices_slot].as_ref(),
            $indices_slot => $crate::arrays::Primitive
        );
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$values_slot].as_ref(),
            $values_slot => $crate::arrays::Primitive
        );
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$chunk_offsets_slot].as_ref(),
            $chunk_offsets_slot => $crate::arrays::Primitive
        );
    };
}
518
/// Require that the validity slot `$idx` holds a [`Bool`](crate::arrays::Bool) array.
///
/// If the slot is empty (`None`), this is a no-op. That is the case when validity is not
/// array-backed, e.g. `NonNullable` or `AllValid`. If the slot is occupied but not `Bool`, it
/// early-returns an [`ExecutionResult`] requesting execution of the validity slot.
///
/// Like [`require_opt_child!`], `$parent` is moved (not cloned) into the early-return path.
///
/// ```ignore
/// require_validity!(array, VALIDITY_SLOT);
/// ```
#[macro_export]
macro_rules! require_validity {
    ($parent:expr, $idx:expr) => {
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$idx].as_ref(),
            $idx => $crate::arrays::Bool
        );
    };
}
538
539/// Extension trait for creating an execution context from a session.
540pub trait VortexSessionExecute {
541 /// Create a new execution context from this session.
542 fn create_execution_ctx(&self) -> ExecutionCtx;
543}
544
545impl VortexSessionExecute for VortexSession {
546 fn create_execution_ctx(&self) -> ExecutionCtx {
547 ExecutionCtx::new(self.clone())
548 }
549}