vortex_array/executor.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::env::VarError;
5use std::fmt;
6use std::fmt::Display;
7use std::sync::Arc;
8use std::sync::LazyLock;
9use std::sync::atomic::AtomicUsize;
10
11use vortex_error::VortexExpect;
12use vortex_error::VortexResult;
13use vortex_error::vortex_bail;
14use vortex_error::vortex_panic;
15use vortex_session::VortexSession;
16
17use crate::AnyCanonical;
18use crate::ArrayRef;
19use crate::Canonical;
20use crate::DynArray;
21use crate::IntoArray;
22use crate::matcher::Matcher;
23use crate::optimizer::ArrayOptimizer;
24
25/// Maximum number of iterations to attempt when executing an array before giving up and returning
26/// an error.
27pub(crate) static MAX_ITERATIONS: LazyLock<usize> =
28 LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
29 Ok(val) => val
30 .parse::<usize>()
31 .unwrap_or_else(|e| vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")),
32 Err(VarError::NotPresent) => 128,
33 Err(VarError::NotUnicode(_)) => {
34 vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
35 }
36 });
37
38/// Marker trait for types that an [`ArrayRef`] can be executed into.
39///
40/// Implementors must provide an implementation of `execute` that takes
41/// an [`ArrayRef`] and an [`ExecutionCtx`], and produces an instance of the
42/// implementor type.
43///
44/// Users should use the `Array::execute` or `Array::execute_as` methods
45pub trait Executable: Sized {
46 fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
47}
48
49impl dyn DynArray + '_ {
50 /// Execute this array to produce an instance of `E`.
51 ///
52 /// See the [`Executable`] implementation for details on how this execution is performed.
53 pub fn execute<E: Executable>(self: Arc<Self>, ctx: &mut ExecutionCtx) -> VortexResult<E> {
54 E::execute(self, ctx)
55 }
56
57 /// Execute this array, labeling the execution step with a name for tracing.
58 pub fn execute_as<E: Executable>(
59 self: Arc<Self>,
60 _name: &'static str,
61 ctx: &mut ExecutionCtx,
62 ) -> VortexResult<E> {
63 E::execute(self, ctx)
64 }
65
66 /// Iteratively execute this array until the [`Matcher`] matches, using an explicit work
67 /// stack.
68 ///
69 /// The scheduler repeatedly:
70 /// 1. Checks if the current array matches `M` — if so, pops the stack or returns.
71 /// 2. Runs `execute_parent` on each child for child-driven optimizations.
72 /// 3. Calls `execute` which returns an [`ExecutionStep`].
73 ///
74 /// Note: the returned array may not match `M`. If execution converges to a canonical form
75 /// that does not match `M`, the canonical array is returned since no further execution
76 /// progress is possible.
77 ///
78 /// For safety, we will error when the number of execution iterations reaches a configurable
79 /// maximum (default 128, override with `VORTEX_MAX_ITERATIONS`).
80 pub fn execute_until<M: Matcher>(
81 self: Arc<Self>,
82 ctx: &mut ExecutionCtx,
83 ) -> VortexResult<ArrayRef> {
84 static MAX_ITERATIONS: LazyLock<usize> =
85 LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
86 Ok(val) => val.parse::<usize>().unwrap_or_else(|e| {
87 vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")
88 }),
89 Err(VarError::NotPresent) => 128,
90 Err(VarError::NotUnicode(_)) => {
91 vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
92 }
93 });
94
95 let mut current = self.optimize()?;
96 // Stack frames: (parent, child_idx, done_predicate_for_child)
97 let mut stack: Vec<(ArrayRef, usize, DonePredicate)> = Vec::new();
98
99 for _ in 0..*MAX_ITERATIONS {
100 // Check for termination: use the stack frame's done predicate, or the root matcher.
101 let is_done = stack
102 .last()
103 .map_or(M::matches as DonePredicate, |frame| frame.2);
104 if is_done(current.as_ref()) {
105 match stack.pop() {
106 None => {
107 ctx.log(format_args!("-> {}", current));
108 return Ok(current);
109 }
110 Some((parent, child_idx, _)) => {
111 current = parent.with_child(child_idx, current)?;
112 current = current.optimize()?;
113 continue;
114 }
115 }
116 }
117
118 // If we've reached canonical form, we can't execute any further regardless
119 // of whether the matcher matched.
120 if AnyCanonical::matches(current.as_ref()) {
121 match stack.pop() {
122 None => {
123 ctx.log(format_args!("-> canonical (unmatched) {}", current));
124 return Ok(current);
125 }
126 Some((parent, child_idx, _)) => {
127 current = parent.with_child(child_idx, current)?;
128 current = current.optimize()?;
129 continue;
130 }
131 }
132 }
133
134 // Try execute_parent (child-driven optimized execution)
135 if let Some(rewritten) = try_execute_parent(¤t, ctx)? {
136 ctx.log(format_args!(
137 "execute_parent rewrote {} -> {}",
138 current, rewritten
139 ));
140 current = rewritten.optimize()?;
141 continue;
142 }
143
144 // Execute the array itself
145 match current.vtable().execute(¤t, ctx)? {
146 ExecutionStep::ExecuteChild(i, done) => {
147 let child = current
148 .nth_child(i)
149 .vortex_expect("ExecuteChild index in bounds");
150 ctx.log(format_args!(
151 "ExecuteChild({i}): pushing {}, focusing on {}",
152 current, child
153 ));
154 stack.push((current, i, done));
155 current = child.optimize()?;
156 }
157 ExecutionStep::Done(result) => {
158 ctx.log(format_args!("Done: {} -> {}", current, result));
159 current = result;
160 }
161 }
162 }
163
164 vortex_bail!(
165 "Exceeded maximum execution iterations ({}) while executing array",
166 *MAX_ITERATIONS,
167 )
168 }
169}
170
171/// Execution context for batch CPU compute.
172///
173/// Accumulates a trace of execution steps. Individual steps are logged at TRACE level for
174/// real-time following, and the full trace is dumped at DEBUG level when the context is dropped.
175pub struct ExecutionCtx {
176 id: usize,
177 session: VortexSession,
178 ops: Vec<String>,
179}
180
181impl ExecutionCtx {
182 /// Create a new execution context with the given session.
183 pub fn new(session: VortexSession) -> Self {
184 static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
185 let id = EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
186 Self {
187 id,
188 session,
189 ops: Vec::new(),
190 }
191 }
192
193 /// Get the session associated with this execution context.
194 pub fn session(&self) -> &VortexSession {
195 &self.session
196 }
197
198 /// Log an execution step at the current depth.
199 ///
200 /// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.
201 /// Individual steps are also logged at TRACE level for real-time following.
202 ///
203 /// Use the [`format_args!`] macro to create the `msg` argument.
204 pub fn log(&mut self, msg: fmt::Arguments<'_>) {
205 if tracing::enabled!(tracing::Level::DEBUG) {
206 let formatted = format!(" - {msg}");
207 tracing::trace!("exec[{}]: {formatted}", self.id);
208 self.ops.push(formatted);
209 }
210 }
211}
212
213impl Display for ExecutionCtx {
214 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215 write!(f, "exec[{}]", self.id)
216 }
217}
218
219impl Drop for ExecutionCtx {
220 fn drop(&mut self) {
221 if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
222 // Unlike itertools `.format()` (panics in 0.14 on second format)
223 struct FmtOps<'a>(&'a [String]);
224 impl Display for FmtOps<'_> {
225 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
226 for (i, op) in self.0.iter().enumerate() {
227 if i > 0 {
228 f.write_str("\n")?;
229 }
230 f.write_str(op)?;
231 }
232 Ok(())
233 }
234 }
235 tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
236 }
237 }
238}
239
240/// Executing an [`ArrayRef`] into an [`ArrayRef`] is the atomic execution loop within Vortex.
241///
242/// It attempts to take the smallest possible step of execution such that the returned array
243/// is incrementally more "executed" than the input array. In other words, it is closer to becoming
244/// a canonical array.
245///
246/// The execution steps are as follows:
247/// 0. Check for canonical.
248/// 1. Attempt to `reduce` the array with metadata-only optimizations.
249/// 2. Attempt to call `reduce_parent` on each child.
250/// 3. Attempt to call `execute_parent` on each child.
251/// 4. Call `execute` on the array itself (which returns an [`ExecutionStep`]).
252///
253/// Most users will not call this method directly, instead preferring to specify an executable
254/// target such as [`crate::Columnar`], [`Canonical`], or any of the canonical array types (such as
255/// [`crate::arrays::PrimitiveArray`]).
256impl Executable for ArrayRef {
257 fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
258 // 0. Check for canonical
259 if let Some(canonical) = array.as_opt::<AnyCanonical>() {
260 ctx.log(format_args!("-> canonical {}", array));
261 return Ok(Canonical::from(canonical).into_array());
262 }
263
264 // 1. reduce (metadata-only rewrites)
265 if let Some(reduced) = array.vtable().reduce(&array)? {
266 ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
267 reduced.statistics().inherit_from(array.statistics());
268 return Ok(reduced);
269 }
270
271 // 2. reduce_parent (child-driven metadata-only rewrites)
272 for child_idx in 0..array.nchildren() {
273 let child = array.nth_child(child_idx).vortex_expect("checked length");
274 if let Some(reduced_parent) = child.vtable().reduce_parent(&child, &array, child_idx)? {
275 ctx.log(format_args!(
276 "reduce_parent: child[{}]({}) rewrote {} -> {}",
277 child_idx,
278 child.encoding_id(),
279 array,
280 reduced_parent
281 ));
282 reduced_parent.statistics().inherit_from(array.statistics());
283 return Ok(reduced_parent);
284 }
285 }
286
287 // 3. execute_parent (child-driven optimized execution)
288 for child_idx in 0..array.nchildren() {
289 let child = array.nth_child(child_idx).vortex_expect("checked length");
290 if let Some(executed_parent) = child
291 .vtable()
292 .execute_parent(&child, &array, child_idx, ctx)?
293 {
294 ctx.log(format_args!(
295 "execute_parent: child[{}]({}) rewrote {} -> {}",
296 child_idx,
297 child.encoding_id(),
298 array,
299 executed_parent
300 ));
301 executed_parent
302 .statistics()
303 .inherit_from(array.statistics());
304 return Ok(executed_parent);
305 }
306 }
307
308 // 4. execute (returns an ExecutionStep)
309 ctx.log(format_args!("executing {}", array));
310 match array.vtable().execute(&array, ctx)? {
311 ExecutionStep::Done(result) => {
312 ctx.log(format_args!("-> {}", result.as_ref()));
313 Ok(result)
314 }
315 ExecutionStep::ExecuteChild(i, _) => {
316 // For single-step execution, handle ExecuteChild by executing the child,
317 // replacing it, and returning the updated array.
318 let child = array.nth_child(i).vortex_expect("valid child index");
319 let executed_child = child.execute::<ArrayRef>(ctx)?;
320 array.with_child(i, executed_child)
321 }
322 }
323 }
324}
325
326/// Try execute_parent on each child of the array.
327fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<ArrayRef>> {
328 for child_idx in 0..array.nchildren() {
329 let child = array
330 .nth_child(child_idx)
331 .vortex_expect("checked nchildren");
332 if let Some(result) = child
333 .vtable()
334 .execute_parent(&child, array, child_idx, ctx)?
335 {
336 result.statistics().inherit_from(array.statistics());
337 return Ok(Some(result));
338 }
339 }
340 Ok(None)
341}
342
343/// A predicate that determines when an array has reached a desired form during execution.
344pub type DonePredicate = fn(&dyn DynArray) -> bool;
345
346/// The result of a single execution step on an array encoding.
347///
348/// Instead of recursively executing children, encodings return an `ExecutionStep` that tells the
349/// scheduler what to do next. This enables the scheduler to manage execution iteratively using
350/// an explicit work stack, run cross-step optimizations, and cache shared sub-expressions.
351pub enum ExecutionStep {
352 /// Request that the scheduler execute child at the given index, using the provided
353 /// [`DonePredicate`] to determine when the child is "done", then replace the child in this
354 /// array and re-enter execution.
355 ///
356 /// Between steps, the scheduler runs reduce/reduce_parent rules to fixpoint, enabling
357 /// cross-step optimization (e.g., pushing scalar functions through newly-decoded children).
358 ///
359 /// Use [`ExecutionStep::execute_child`] instead of constructing this variant directly.
360 ExecuteChild(usize, DonePredicate),
361
362 /// Execution is complete. The result may be in any encoding — not necessarily canonical.
363 /// The scheduler will continue executing the result if it has not yet reached the target form.
364 Done(ArrayRef),
365}
366
367impl ExecutionStep {
368 /// Request execution of child at `child_idx` until it matches the given [`Matcher`].
369 pub fn execute_child<M: Matcher>(child_idx: usize) -> Self {
370 ExecutionStep::ExecuteChild(child_idx, M::matches)
371 }
372
373 /// Signal that execution is complete with the given result.
374 pub fn done(result: ArrayRef) -> Self {
375 ExecutionStep::Done(result)
376 }
377}
378
379impl fmt::Debug for ExecutionStep {
380 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
381 match self {
382 ExecutionStep::ExecuteChild(idx, _) => {
383 f.debug_tuple("ExecuteChild").field(idx).finish()
384 }
385 ExecutionStep::Done(result) => f.debug_tuple("Done").field(result).finish(),
386 }
387 }
388}
389
390/// Extension trait for creating an execution context from a session.
391pub trait VortexSessionExecute {
392 /// Create a new execution context from this session.
393 fn create_execution_ctx(&self) -> ExecutionCtx;
394}
395
396impl VortexSessionExecute for VortexSession {
397 fn create_execution_ctx(&self) -> ExecutionCtx {
398 ExecutionCtx::new(self.clone())
399 }
400}