vortex_array/executor.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Iterative array execution.
5//!
6//! The single-step [`Executable`] implementation for [`ArrayRef`] tries `reduce`,
7//! `reduce_parent`, `execute_parent`, then `execute` once. The matcher-driven
8//! [`ArrayRef::execute_until`] loop interprets [`ExecutionStep::ExecuteSlot`],
9//! [`ExecutionStep::AppendChild`], and [`ExecutionStep::Done`] using an explicit stack plus an
10//! optional builder, so encodings can advance without recursive descent.
11//!
12//! See <https://docs.vortex.dev/developer-guide/internals/execution> for the full execution
13//! narrative, diagrams, and walkthroughs.
14
15use std::env::VarError;
16use std::fmt;
17use std::fmt::Display;
18use std::sync::LazyLock;
19
20use vortex_error::VortexExpect;
21use vortex_error::VortexResult;
22use vortex_error::vortex_bail;
23use vortex_error::vortex_ensure;
24use vortex_error::vortex_panic;
25use vortex_session::VortexSession;
26
27use crate::AnyCanonical;
28use crate::ArrayRef;
29use crate::Canonical;
30use crate::IntoArray;
31use crate::array::ArrayId;
32use crate::builders::ArrayBuilder;
33use crate::builders::builder_with_capacity_in;
34use crate::dtype::DType;
35use crate::matcher::Matcher;
36use crate::memory::HostAllocatorRef;
37use crate::memory::MemorySessionExt;
38use crate::optimizer::ArrayOptimizer;
39use crate::stats::ArrayStats;
40use crate::stats::StatsSet;
41
42/// Returns the maximum number of iterations to attempt when executing an array before giving up and returning
43/// an error, can be by the `VORTEX_MAX_ITERATIONS` env variables, otherwise defaults to 2^22.
44pub(crate) fn max_iterations() -> usize {
45 static MAX_ITERATIONS: LazyLock<usize> =
46 LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
47 Ok(val) => val.parse::<usize>().unwrap_or_else(|e| {
48 vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")
49 }),
50 Err(VarError::NotPresent) => 2 << 21, // 2 ^ 22
51 Err(VarError::NotUnicode(_)) => {
52 vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
53 }
54 });
55 *MAX_ITERATIONS
56}
57
/// Marker trait for types that an [`ArrayRef`] can be executed into.
///
/// Implementors must provide an implementation of `execute` that takes
/// an [`ArrayRef`] and an [`ExecutionCtx`], and produces an instance of the
/// implementor type.
///
/// Users should normally go through the [`ArrayRef::execute`] or [`ArrayRef::execute_as`]
/// methods, which simply forward to this trait, rather than naming the trait method directly.
pub trait Executable: Sized {
    /// Consumes `array` and executes it into `Self`, using `ctx` for session access and logging.
    ///
    /// Returns an error if execution fails.
    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
}
68
69#[expect(clippy::same_name_method)]
70impl ArrayRef {
71 /// Execute this array to produce an instance of `E`.
72 ///
73 /// See the [`Executable`] implementation for details on how this execution is performed.
74 pub fn execute<E: Executable>(self, ctx: &mut ExecutionCtx) -> VortexResult<E> {
75 E::execute(self, ctx)
76 }
77
78 /// Execute this array, labeling the execution step with a name for tracing.
79 pub fn execute_as<E: Executable>(
80 self,
81 _name: &'static str,
82 ctx: &mut ExecutionCtx,
83 ) -> VortexResult<E> {
84 E::execute(self, ctx)
85 }
86
87 /// Iteratively execute this array until the [`Matcher`] matches, using an explicit work
88 /// stack plus an optional builder for `AppendChild`.
89 ///
90 /// Note: the returned array may not match `M`. If execution converges to a canonical form
91 /// that does not match `M`, the canonical array is returned since no further execution
92 /// progress is possible.
93 ///
94 /// For safety, this errors once execution reaches a configurable maximum number of
95 /// iterations (default `2^22`, override with `VORTEX_MAX_ITERATIONS`).
96 ///
97 /// # Loop state
98 ///
99 /// - `current_array: ArrayRef` -- the array currently in focus.
100 /// - `current_builder: Option<Box<dyn ArrayBuilder>>` -- active only for builder-mode
101 /// execution. `AppendChild` appends detached children here. `Done` finishes the builder
102 /// and turns it back into the next `current_array`.
103 /// - `stack: Vec<StackFrame>` -- suspended parents from `ExecuteSlot`, including the
104 /// detached slot index, its [`DonePredicate`], and the parent builder that was active
105 /// before focus moved into the child.
106 ///
107 /// Example after `ExecuteSlot(1, pred)` has focused slot 1 of a parent:
108 ///
109 /// ```text
110 /// stack[top].parent_array:
111 /// RunEnd <-- suspended parent
112 /// +-- slot 0: ends
113 /// +-- slot 1: _ (detached)
114 ///
115 /// current_array:
116 /// DictEncoding <-- focused child
117 /// +-- slot 0: codes
118 /// +-- slot 1: dictionary
119 ///
120 /// current_builder:
121 /// None
122 /// ```
123 ///
124 /// Each loop iteration works like this:
125 ///
126 /// ```text
127 /// loop:
128 /// Step 1: done(current_array)?
129 /// - root activation -> return current_array
130 /// - ExecuteSlot frame -> pop, reattach child, resume parent
131 ///
132 /// Step 2: current_builder active?
133 /// - yes -> skip Step 2a / 2b
134 /// - no -> try parent kernels
135 ///
136 /// Step 2a: current_array.execute_parent(stack.top.parent_array)
137 /// child looks up at the suspended parent from ExecuteSlot
138 ///
139 /// Step 2b: for child in current_array.children():
140 /// child.execute_parent(current_array)
141 /// each child looks up at current_array
142 ///
143 /// Step 3: match current_array.execute()
144 /// ExecuteSlot(i, pred) -> push parent on stack, focus child `i`
145 /// AppendChild(i) -> detach child `i`, append it into current_builder,
146 /// keep parent as current_array
147 /// Done -> finish current_builder if present, else use returned array
148 /// ```
149 ///
150 /// Step 2a and Step 2b are skipped while `current_builder` is active. `AppendChild`
151 /// partially consumes `current_array`: some slots already live in the builder, so a
152 /// parent rewrite would observe inconsistent state and could discard accumulated builder
153 /// data.
154 pub fn execute_until<M: Matcher>(self, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
155 let mut current_array = self;
156 let mut current_builder: Option<Box<dyn ArrayBuilder>> = None;
157 let mut stack: Vec<StackFrame> = Vec::new();
158 let max_iterations = max_iterations();
159
160 for _ in 0..max_iterations {
161 let is_done = stack
162 .last()
163 .map_or(M::matches as DonePredicate, |frame| frame.done);
164
165 if is_done(¤t_array) || AnyCanonical::matches(¤t_array) {
166 match stack.pop() {
167 None => {
168 debug_assert!(
169 current_builder.is_none(),
170 "root activation should not retain a builder"
171 );
172 ctx.log(format_args!("-> {}", current_array));
173 return Ok(current_array);
174 }
175 Some(frame) => {
176 (current_array, current_builder) = pop_frame(frame, current_array)?;
177 continue;
178 }
179 }
180 }
181
182 // Step 2a: execute_parent against the suspended parent from ExecuteSlot.
183 //
184 // When executing a child for ExecuteSlot, try execute_parent against
185 // the suspended parent on the stack. This lets kernels like RunEnd's
186 // FilterKernel fire before the child is forced to canonical.
187 //
188 // Skip when a builder is active: the current array has been partially
189 // consumed by AppendChild (some slots are already in the builder), so
190 // a parent rewrite would see inconsistent state and the builder data
191 // would be lost when we restore frame.parent_builder.
192 if current_builder.is_none()
193 && let Some(frame) = stack.last()
194 && let Some(result) =
195 current_array.execute_parent(&frame.parent_array, frame.slot_idx, ctx)?
196 {
197 ctx.log(format_args!(
198 "execute_parent (stack) rewrote {} -> {}",
199 current_array, result
200 ));
201 let frame = stack.pop().vortex_expect("just peeked");
202 current_array = result.optimize_ctx(ctx.session())?;
203 current_builder = frame.parent_builder;
204 continue;
205 }
206
207 // Step 2b: execute_parent against current_array's own children.
208 if current_builder.is_none()
209 && let Some(rewritten) = try_execute_parent(¤t_array, ctx)?
210 {
211 ctx.log(format_args!(
212 "execute_parent rewrote {} -> {}",
213 current_array, rewritten
214 ));
215 current_array = rewritten.optimize_ctx(ctx.session())?;
216 continue;
217 }
218
219 // execute step
220 let expected_len = current_array.len();
221 let expected_dtype = current_array.dtype().clone();
222 let stats = current_array.statistics().to_array_stats();
223 let encoding_id = current_array.encoding_id();
224 let result = current_array.execute_encoding_unchecked(ctx)?;
225 let (array, step) = result.into_parts();
226 match step {
227 ExecutionStep::ExecuteSlot(i, done) => {
228 let (parent, child) = unsafe { array.take_slot_unchecked(i) }?;
229 ctx.log(format_args!(
230 "ExecuteSlot({i}): pushing {}, focusing on {}",
231 parent, child
232 ));
233 stack.push(StackFrame {
234 parent_array: parent,
235 parent_builder: current_builder.take(),
236 slot_idx: i,
237 done,
238 original_dtype: child.dtype().clone(),
239 original_len: child.len(),
240 });
241 current_array = child;
242 current_builder = None;
243 }
244 ExecutionStep::AppendChild(i) => {
245 if current_builder.is_none() {
246 current_builder = Some(builder_with_capacity_in(
247 ctx.allocator(),
248 array.dtype(),
249 array.len(),
250 ));
251 }
252 let (parent, child) = unsafe { array.take_slot_unchecked(i) }?;
253 ctx.log(format_args!(
254 "AppendChild({i}): appending {} into builder",
255 child
256 ));
257 // TODO(joe)[7674]: replace with a builder kernel registry so we don't
258 // need to go through the VTable append_to_builder indirection.
259 child.append_to_builder(
260 current_builder
261 .as_deref_mut()
262 .vortex_expect("builder must exist"),
263 ctx,
264 )?;
265 current_array = parent;
266 }
267 ExecutionStep::Done => {
268 ctx.log(format_args!("Done: {}", array));
269 (current_array, current_builder) = finalize_done(
270 array,
271 current_builder,
272 expected_len,
273 expected_dtype,
274 stats,
275 encoding_id,
276 )?;
277 }
278 }
279 }
280
281 vortex_bail!(
282 "Exceeded maximum execution iterations ({}) while executing array",
283 max_iterations,
284 )
285 }
286}
287
/// A parent activation suspended by [`ExecutionStep::ExecuteSlot`] while one of its children is
/// being executed by [`ArrayRef::execute_until`].
struct StackFrame {
    /// The suspended parent, as returned by `take_slot_unchecked` (i.e. with the slot at
    /// `slot_idx` detached).
    parent_array: ArrayRef,
    /// The builder that was active for the parent when focus moved into the child; restored
    /// when the frame is popped.
    parent_builder: Option<Box<dyn ArrayBuilder>>,
    /// Index of the parent's slot that the focused child was taken from.
    slot_idx: usize,
    /// Predicate the child must satisfy before it is put back into the parent.
    done: DonePredicate,
    /// The child's dtype at the time it was detached; checked (debug builds) when it is put back.
    original_dtype: DType,
    /// The child's length at the time it was detached; checked (debug builds) when it is put back.
    original_len: usize,
}
296
/// Execution context for batch CPU compute.
///
/// Carries the [`VortexSession`] the execution runs under. In debug builds it additionally
/// records an identifier and the list of logged execution steps, which are dumped when the
/// context is dropped.
#[derive(Debug, Clone)]
pub struct ExecutionCtx {
    /// The session this context was created with.
    session: VortexSession,
    /// Debug-only identifier taken from a global counter in [`ExecutionCtx::new`]; used to label
    /// trace output. Note that a clone carries the same id as its source.
    #[cfg(debug_assertions)]
    id: usize,
    /// Debug-only list of formatted steps accumulated by [`ExecutionCtx::log`].
    #[cfg(debug_assertions)]
    ops: Vec<String>,
}
306
307impl ExecutionCtx {
308 /// Create a new execution context with the given session.
309 pub fn new(session: VortexSession) -> Self {
310 Self {
311 session,
312 #[cfg(debug_assertions)]
313 id: {
314 static EXEC_CTX_ID: std::sync::atomic::AtomicUsize =
315 std::sync::atomic::AtomicUsize::new(0);
316 EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
317 },
318 #[cfg(debug_assertions)]
319 ops: Vec::new(),
320 }
321 }
322
323 /// Get the session associated with this execution context.
324 pub fn session(&self) -> &VortexSession {
325 &self.session
326 }
327
328 /// Get the session-scoped host allocator for this execution context.
329 pub fn allocator(&self) -> HostAllocatorRef {
330 self.session.allocator()
331 }
332
333 /// Log an execution step at the current depth.
334 ///
335 /// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.
336 /// Individual steps are also logged at TRACE level for real-time following.
337 ///
338 /// Use the [`format_args!`] macro to create the `msg` argument.
339 pub fn log(&mut self, msg: fmt::Arguments<'_>) {
340 #[cfg(debug_assertions)]
341 if tracing::enabled!(tracing::Level::DEBUG) {
342 let formatted = format!(" - {msg}");
343 tracing::trace!("exec[{}]: {formatted}", self.id);
344 self.ops.push(formatted);
345 }
346 let _ = msg;
347 }
348}
349
350impl Display for ExecutionCtx {
351 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
352 #[cfg(debug_assertions)]
353 return write!(f, "exec[{}]", self.id);
354 #[cfg(not(debug_assertions))]
355 write!(f, "exec")
356 }
357}
358
359#[cfg(debug_assertions)]
360impl Drop for ExecutionCtx {
361 fn drop(&mut self) {
362 if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
363 // Unlike itertools `.format()` (panics in 0.14 on second format)
364 struct FmtOps<'a>(&'a [String]);
365 impl Display for FmtOps<'_> {
366 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
367 for (i, op) in self.0.iter().enumerate() {
368 if i > 0 {
369 f.write_str("\n")?;
370 }
371 f.write_str(op)?;
372 }
373 Ok(())
374 }
375 }
376 tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
377 }
378 }
379}
380
381/// Single-step execution: takes one step toward canonical form.
382///
383/// Steps through reduce, reduce_parent, execute_parent, then execute. For `ExecuteSlot`,
384/// only a single child execution step is performed — the child is executed once and put back,
385/// making this a lightweight, bounded operation.
386///
387/// **However**, if `execute_step` returns [`ExecutionStep::AppendChild`], this implementation
388/// drives the *entire* array to completion via [`execute_into_builder`] in a single call.
389/// This can do substantially more work than a normal step because it creates a builder and
390/// fully decodes the array into that builder before returning. Callers should be aware that a
391/// single `.execute::<ArrayRef>(ctx)` call may perform O(n_children * decode_cost) work when
392/// `AppendChild` is returned.
393impl Executable for ArrayRef {
394 fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
395 if let Some(canonical) = array.as_opt::<AnyCanonical>() {
396 ctx.log(format_args!("-> canonical {}", array));
397 return Ok(Canonical::from(canonical).into_array());
398 }
399
400 if let Some(reduced) = array.reduce()? {
401 ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
402 reduced.statistics().inherit_from(array.statistics());
403 return Ok(reduced);
404 }
405
406 for (slot_idx, slot) in array.slots().iter().enumerate() {
407 let Some(child) = slot else { continue };
408 if let Some(reduced_parent) = child.reduce_parent(&array, slot_idx)? {
409 ctx.log(format_args!(
410 "reduce_parent: slot[{}]({}) rewrote {} -> {}",
411 slot_idx,
412 child.encoding_id(),
413 array,
414 reduced_parent
415 ));
416 reduced_parent.statistics().inherit_from(array.statistics());
417 return Ok(reduced_parent);
418 }
419 }
420
421 for (slot_idx, slot) in array.slots().iter().enumerate() {
422 let Some(child) = slot else { continue };
423 if let Some(executed_parent) = child.execute_parent(&array, slot_idx, ctx)? {
424 ctx.log(format_args!(
425 "execute_parent: slot[{}]({}) rewrote {} -> {}",
426 slot_idx,
427 child.encoding_id(),
428 array,
429 executed_parent
430 ));
431 executed_parent
432 .statistics()
433 .inherit_from(array.statistics());
434 return Ok(executed_parent);
435 }
436 }
437
438 ctx.log(format_args!("executing {}", array));
439 let result = array.execute_encoding(ctx)?;
440 let (array, step) = result.into_parts();
441 match step {
442 ExecutionStep::Done => {
443 ctx.log(format_args!("-> {}", array));
444 Ok(array)
445 }
446 ExecutionStep::ExecuteSlot(i, _) => {
447 let child = array.slots()[i].clone().vortex_expect("valid slot index");
448 let executed_child = child.execute::<ArrayRef>(ctx)?;
449 array.with_slot(i, executed_child)
450 }
451 ExecutionStep::AppendChild(_) => {
452 // Single-step: build the entire parent via the builder path.
453 let builder = builder_with_capacity_in(ctx.allocator(), array.dtype(), array.len());
454 let mut builder = execute_into_builder(array, builder, ctx)?;
455 Ok(builder.finish())
456 }
457 }
458 }
459}
460
461/// Execute `array` into the given `builder`.
462///
463/// This uses the encoding's [`crate::array::VTable::append_to_builder`] implementation. Most
464/// encodings use the default path of `execute::<Canonical>` followed by `builder.extend_from_array`,
465/// while encodings like `Chunked` can override that to append child-by-child without materializing
466/// the entire parent.
467///
468/// The builder must have a [`DType`] that is a nullability-superset of `array.dtype()`.
469pub fn execute_into_builder(
470 array: ArrayRef,
471 mut builder: Box<dyn ArrayBuilder>,
472 ctx: &mut ExecutionCtx,
473) -> VortexResult<Box<dyn ArrayBuilder>> {
474 array.append_to_builder(builder.as_mut(), ctx)?;
475 Ok(builder)
476}
477
478/// Pop a stack frame, restoring the parent with the finished child in its slot.
479fn pop_frame(
480 frame: StackFrame,
481 child: ArrayRef,
482) -> VortexResult<(ArrayRef, Option<Box<dyn ArrayBuilder>>)> {
483 debug_assert_eq!(
484 child.dtype(),
485 &frame.original_dtype,
486 "child dtype changed during execution"
487 );
488 debug_assert_eq!(
489 child.len(),
490 frame.original_len,
491 "child len changed during execution"
492 );
493 let parent_array = unsafe { frame.parent_array.put_slot_unchecked(frame.slot_idx, child) }?;
494 Ok((parent_array, frame.parent_builder))
495}
496
497fn finalize_done(
498 result: ArrayRef,
499 mut builder: Option<Box<dyn ArrayBuilder>>,
500 expected_len: usize,
501 expected_dtype: DType,
502 stats: ArrayStats,
503 encoding_id: ArrayId,
504) -> VortexResult<(ArrayRef, Option<Box<dyn ArrayBuilder>>)> {
505 let output = if let Some(mut builder) = builder.take() {
506 builder.finish()
507 } else {
508 result
509 };
510
511 if cfg!(debug_assertions) {
512 vortex_ensure!(
513 output.len() == expected_len,
514 "Result length mismatch for {:?}",
515 encoding_id
516 );
517 vortex_ensure!(
518 output.dtype() == &expected_dtype,
519 "Executed canonical dtype mismatch for {:?}",
520 encoding_id
521 );
522 }
523
524 output
525 .statistics()
526 .set_iter(StatsSet::from(stats).into_iter());
527 Ok((output, None))
528}
529
530/// Try execute_parent on each occupied slot of the array.
531fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<ArrayRef>> {
532 for (slot_idx, slot) in array.slots().iter().enumerate() {
533 let Some(child) = slot else {
534 continue;
535 };
536 if let Some(result) = child.execute_parent(array, slot_idx, ctx)? {
537 result.statistics().inherit_from(array.statistics());
538 return Ok(Some(result));
539 }
540 }
541 Ok(None)
542}
543
/// A predicate that determines when an array has reached a desired form during execution.
///
/// This is a plain function pointer, so it is `Copy` and cannot capture state. It is typically
/// obtained from a [`Matcher`]'s `matches` function (see [`ExecutionResult::execute_slot`]).
pub type DonePredicate = fn(&ArrayRef) -> bool;
546
/// Scheduler step indicator returned alongside an array in [`ExecutionResult`].
///
/// Instead of recursively executing children, encodings return an `ExecutionStep` that tells the
/// scheduler what to do next. This enables the scheduler to manage execution iteratively using
/// an explicit work stack plus an optional builder.
///
/// # Semantics
///
/// Each variant describes a different execution strategy with distinct cost profiles:
///
/// - [`Done`](ExecutionStep::Done): The current activation has finished its work. If no builder
///   is active, the returned array is the result. If a builder is active, the scheduler ignores
///   the placeholder array and finishes the builder instead. The scheduler may continue
///   executing if the target form (e.g. canonical) has not yet been reached.
///
/// - [`ExecuteSlot`](ExecutionStep::ExecuteSlot): The encoding needs one of its children
///   decoded before it can make further progress. The scheduler detaches that child, pushes
///   the parent onto the explicit stack, executes the child until the [`DonePredicate`]
///   matches, puts it back, and re-enters the parent. This is a cooperative yield: the
///   encoding does a bounded amount of work per step while the loop tracks the parent-child
///   relationship explicitly.
///
/// - [`AppendChild`](ExecutionStep::AppendChild): The encoding needs one child executed to
///   canonical form and then appended into a builder owned by the current activation. The
///   scheduler detaches that child, lazily creates `current_builder` if needed, appends the
///   child into it, and keeps the parent as `current_array` for the next iteration. While the
///   builder is active, parent-kernel rewrites are skipped because the parent is partially
///   consumed. **Important:** in the single-step executor ([`Executable`] for [`ArrayRef`]),
///   returning `AppendChild` still causes the executor to drive the *entire* array to
///   completion via [`execute_into_builder`] in one call — this can do significantly more
///   work than a single `ExecuteSlot` step.
pub enum ExecutionStep {
    /// Request that the scheduler execute the slot at the given index, using the provided
    /// [`DonePredicate`] to determine when the slot is "done", then replace the slot in this
    /// array and re-enter execution.
    ///
    /// Use [`ExecutionResult::execute_slot`] instead of constructing this variant directly.
    ExecuteSlot(usize, DonePredicate),

    /// Detach the slot at the given index, append that child into the current activation's
    /// canonical builder, and keep the returned parent as `current_array`.
    ///
    /// `Done` finalizes that builder and turns it into the result of the activation.
    ///
    /// **Note:** In the single-step executor ([`Executable`] for [`ArrayRef`]), this variant
    /// drives the entire parent to completion in one call via [`execute_into_builder`], which
    /// may perform substantially more work than a single `ExecuteSlot` step.
    ///
    /// Use [`ExecutionResult::append_child`] to construct a result carrying this variant.
    AppendChild(usize),

    /// Execution is complete. If no builder is active, the array in the accompanying
    /// [`ExecutionResult`] is the result. Otherwise, the scheduler finalizes the active
    /// builder and uses that finished array instead.
    ///
    /// The scheduler will continue executing if it has not yet reached the target form.
    ///
    /// Use [`ExecutionResult::done`] to construct a result carrying this variant.
    Done,
}
603
604impl fmt::Debug for ExecutionStep {
605 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
606 match self {
607 ExecutionStep::ExecuteSlot(idx, _) => f.debug_tuple("ExecuteSlot").field(idx).finish(),
608 ExecutionStep::AppendChild(idx) => f.debug_tuple("AppendChild").field(idx).finish(),
609 ExecutionStep::Done => write!(f, "Done"),
610 }
611 }
612}
613
/// The result of a single execution step on an array encoding.
///
/// Combines an [`ArrayRef`] with an [`ExecutionStep`] to tell the scheduler both what to do next
/// and what array to work with.
pub struct ExecutionResult {
    /// The array the step applies to: the finished result for `Done`, or the parent whose slot
    /// the scheduler should act on for `ExecuteSlot` / `AppendChild`.
    array: ArrayRef,
    /// What the scheduler should do next with `array`.
    step: ExecutionStep,
}
622
623impl ExecutionResult {
624 /// Signal that execution is complete with the given result array.
625 pub fn done(result: impl IntoArray) -> Self {
626 Self {
627 array: result.into_array(),
628 step: ExecutionStep::Done,
629 }
630 }
631
632 /// Request execution of slot at `slot_idx` until it matches the given [`Matcher`].
633 ///
634 /// The provided array is the (possibly modified) parent that still needs its slot executed.
635 pub fn execute_slot<M: Matcher>(array: impl IntoArray, slot_idx: usize) -> Self {
636 Self {
637 array: array.into_array(),
638 step: ExecutionStep::ExecuteSlot(slot_idx, M::matches),
639 }
640 }
641
642 /// Request that the child slot at `slot_idx` be detached, appended into the current
643 /// activation's canonical builder, and leave the returned parent as the next
644 /// `current_array`.
645 pub fn append_child(array: impl IntoArray, slot_idx: usize) -> Self {
646 Self {
647 array: array.into_array(),
648 step: ExecutionStep::AppendChild(slot_idx),
649 }
650 }
651
652 /// Returns a reference to the array.
653 pub fn array(&self) -> &ArrayRef {
654 &self.array
655 }
656
657 /// Returns a reference to the step.
658 pub fn step(&self) -> &ExecutionStep {
659 &self.step
660 }
661
662 /// Decompose into parts.
663 pub fn into_parts(self) -> (ArrayRef, ExecutionStep) {
664 (self.array, self.step)
665 }
666}
667
668impl fmt::Debug for ExecutionResult {
669 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
670 f.debug_struct("ExecutionResult")
671 .field("array", &self.array)
672 .field("step", &self.step)
673 .finish()
674 }
675}
676
/// Require that a child array matches `$M`. If the child already matches, returns the same
/// array unchanged. Otherwise, early-returns an [`ExecutionResult`] requesting execution of
/// child `$idx` until it matches `$M`.
///
/// The early return is a plain `return Ok(..)`, so the macro can only be used inside a function
/// whose return type accepts `Ok(ExecutionResult)`. On that path `$parent` is cloned; on the
/// matching path the macro evaluates to `$parent` itself.
///
/// ```ignore
/// let array = require_child!(array, array.codes(), 0 => Primitive);
/// let array = require_child!(array, array.values(), 1 => AnyCanonical);
/// ```
#[macro_export]
macro_rules! require_child {
    ($parent:expr, $child:expr, $idx:expr => $M:ty) => {{
        // Child is not yet in the required form: hand the parent back to the scheduler and ask
        // it to execute slot `$idx` first.
        if !$child.is::<$M>() {
            return Ok($crate::ExecutionResult::execute_slot::<$M>(
                $parent.clone(),
                $idx,
            ));
        }
        $parent
    }};
}
697
/// Like [`require_child!`], but for optional children. If the child is `None`, this is a no-op.
/// If the child is `Some` but does not match `$M`, early-returns an [`ExecutionResult`] requesting
/// execution of child `$idx`.
///
/// Unlike `require_child!`, this is a statement macro (no value produced) and does not clone
/// `$parent` - it is moved into the early-return path.
///
/// As with `require_child!`, the early return is a plain `return Ok(..)` from the enclosing
/// function.
///
/// ```ignore
/// require_opt_child!(array, array.patches().map(|p| p.indices()), 1 => Primitive);
/// ```
#[macro_export]
macro_rules! require_opt_child {
    ($parent:expr, $child_opt:expr, $idx:expr => $M:ty) => {
        // Only a present child that fails the match triggers the early return.
        if $child_opt.is_some_and(|child| !child.is::<$M>()) {
            return Ok($crate::ExecutionResult::execute_slot::<$M>($parent, $idx));
        }
    };
}
716
/// Require that patch slots (indices, values, and optionally chunk_offsets) are `Primitive`.
/// If no patches are present (slots are `None`), this is a no-op.
///
/// Like [`require_opt_child!`], `$parent` is moved (not cloned) into the early-return path.
///
/// The slots are checked in the order indices, values, chunk_offsets, and the first one that is
/// present but not `Primitive` triggers the early return. Note that the `$parent` and slot-index
/// expressions are expanded several times, and that each slot is accessed by direct indexing
/// into `$parent.slots()`.
///
/// ```ignore
/// require_patches!(array, PATCH_INDICES_SLOT, PATCH_VALUES_SLOT, PATCH_CHUNK_OFFSETS_SLOT);
/// ```
#[macro_export]
macro_rules! require_patches {
    ($parent:expr, $indices_slot:expr, $values_slot:expr, $chunk_offsets_slot:expr) => {
        // Patch indices.
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$indices_slot].as_ref(),
            $indices_slot => $crate::arrays::Primitive
        );
        // Patch values.
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$values_slot].as_ref(),
            $values_slot => $crate::arrays::Primitive
        );
        // Patch chunk offsets.
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$chunk_offsets_slot].as_ref(),
            $chunk_offsets_slot => $crate::arrays::Primitive
        );
    };
}
745
/// Require that the validity slot is a [`Bool`](crate::arrays::Bool) array. If validity is not
/// array-backed (e.g. `NonNullable` or `AllValid`), this is a no-op. If it is array-backed but
/// not `Bool`, early-returns an [`ExecutionResult`] requesting execution of the validity slot.
///
/// Like [`require_opt_child!`], `$parent` is moved (not cloned) into the early-return path.
///
/// "Not array-backed" here means the slot at `$idx` is `None`; the slot is accessed by direct
/// indexing into `$parent.slots()`.
///
/// ```ignore
/// require_validity!(array, VALIDITY_SLOT);
/// ```
#[macro_export]
macro_rules! require_validity {
    ($parent:expr, $idx:expr) => {
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$idx].as_ref(),
            $idx => $crate::arrays::Bool
        );
    };
}
765
/// Extension trait for creating an execution context from a session.
///
/// Implemented in this file for [`VortexSession`].
pub trait VortexSessionExecute {
    /// Create a new execution context from this session.
    fn create_execution_ctx(&self) -> ExecutionCtx;
}
771
772impl VortexSessionExecute for VortexSession {
773 fn create_execution_ctx(&self) -> ExecutionCtx {
774 ExecutionCtx::new(self.clone())
775 }
776}