Skip to main content

vortex_array/
executor.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::type_name;
5use std::fmt;
6use std::fmt::Display;
7use std::sync::Arc;
8use std::sync::atomic::AtomicUsize;
9
10use vortex_error::VortexExpect;
11use vortex_error::VortexResult;
12use vortex_session::VortexSession;
13
14use crate::AnyCanonical;
15use crate::Array;
16use crate::ArrayRef;
17use crate::Canonical;
18use crate::IntoArray;
19
20/// Marker trait for types that an [`ArrayRef`] can be executed into.
21///
22/// Implementors must provide an implementation of `execute` that takes
23/// an [`ArrayRef`] and an [`ExecutionCtx`], and produces an instance of the
24/// implementor type.
25///
26/// Users should use the `Array::execute` or `Array::execute_as` methods
27pub trait Executable: Sized {
28    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
29}
30
31impl dyn Array + '_ {
32    /// Execute this array to produce an instance of `E`.
33    ///
34    /// See the [`Executable`] implementation for details on how this execution is performed.
35    pub fn execute<E: Executable>(self: Arc<Self>, ctx: &mut ExecutionCtx) -> VortexResult<E> {
36        ctx.log_entry(
37            &self,
38            format_args!("execute<{}> {}", type_name::<E>(), self),
39        );
40        E::execute(self, ctx)
41    }
42
43    /// Execute this array, labeling the execution step with a name for tracing.
44    pub fn execute_as<E: Executable>(
45        self: Arc<Self>,
46        name: &'static str,
47        ctx: &mut ExecutionCtx,
48    ) -> VortexResult<E> {
49        ctx.log_entry(
50            &self,
51            format_args!("{}: execute<{}> {}", name, type_name::<E>(), self),
52        );
53        E::execute(self, ctx)
54    }
55}
56
57/// Execution context for batch CPU compute.
58///
59/// Accumulates a trace of execution steps. Individual steps are logged at TRACE level for
60/// real-time following, and the full trace is dumped at DEBUG level when the context is dropped.
61pub struct ExecutionCtx {
62    id: usize,
63    session: VortexSession,
64    ops: Vec<String>,
65}
66
67impl ExecutionCtx {
68    /// Create a new execution context with the given session.
69    pub fn new(session: VortexSession) -> Self {
70        static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
71        let id = EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
72        Self {
73            id,
74            session,
75            ops: Vec::new(),
76        }
77    }
78
79    /// Get the session associated with this execution context.
80    pub fn session(&self) -> &VortexSession {
81        &self.session
82    }
83
84    /// Log an execution step at the current depth.
85    ///
86    /// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.
87    /// Individual steps are also logged at TRACE level for real-time following.
88    ///
89    /// Use the [`format_args!`] macro to create the `msg` argument.
90    pub fn log(&mut self, msg: fmt::Arguments<'_>) {
91        if tracing::enabled!(tracing::Level::DEBUG) {
92            let formatted = format!(" - {msg}");
93            tracing::trace!("exec[{}]: {formatted}", self.id);
94            self.ops.push(formatted);
95        }
96    }
97
98    /// Log an execution entry point. On the first call into this context, the full
99    /// `display_tree` of the array is included so the starting state is visible.
100    fn log_entry(&mut self, array: &dyn Array, msg: fmt::Arguments<'_>) {
101        if tracing::enabled!(tracing::Level::DEBUG) {
102            if self.ops.is_empty() {
103                self.log(format_args!("{msg}\n{}", array.display_tree()));
104            } else {
105                self.log(msg);
106            }
107        }
108    }
109}
110
111impl Display for ExecutionCtx {
112    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113        write!(f, "exec[{}]", self.id)
114    }
115}
116
117impl Drop for ExecutionCtx {
118    fn drop(&mut self) {
119        if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
120            // Unlike itertools `.format()` (panics in 0.14 on second format)
121            struct FmtOps<'a>(&'a [String]);
122            impl Display for FmtOps<'_> {
123                fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124                    for (i, op) in self.0.iter().enumerate() {
125                        if i > 0 {
126                            f.write_str("\n")?;
127                        }
128                        f.write_str(op)?;
129                    }
130                    Ok(())
131                }
132            }
133            tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
134        }
135    }
136}
137
138/// Executing an [`ArrayRef`] into an [`ArrayRef`] is the atomic execution loop within Vortex.
139///
140/// It attempts to take the smallest possible step of execution such that the returned array
141/// is incrementally more "executed" than the input array. In other words, it is closer to becoming
142/// a canonical array.
143///
144/// The execution steps are as follows:
145/// 0. Check for canonical.
146/// 1. Attempt to call `reduce_parent` on each child.
147/// 2. Attempt to `reduce` the array with metadata-only optimizations.
148/// 3. Attempt to call `execute_parent` on each child.
149/// 4. Call `execute` on the array itself.
150///
151/// Most users will not call this method directly, instead preferring to specify an executable
152/// target such as [`crate::Columnar`], [`Canonical`], or any of the canonical array types (such as
153/// [`crate::arrays::PrimitiveArray`]).
154impl Executable for ArrayRef {
155    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
156        // 0. Check for canonical
157        if let Some(canonical) = array.as_opt::<AnyCanonical>() {
158            ctx.log(format_args!("-> canonical {}", array));
159            return Ok(Canonical::from(canonical).into_array());
160        }
161
162        // 1. reduce (metadata-only rewrites)
163        if let Some(reduced) = array.vtable().reduce(&array)? {
164            ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
165            reduced.statistics().inherit_from(array.statistics());
166            return Ok(reduced);
167        }
168
169        // 2. reduce_parent (child-driven metadata-only rewrites)
170        for child_idx in 0..array.nchildren() {
171            let child = array.nth_child(child_idx).vortex_expect("checked length");
172            if let Some(reduced_parent) = child.vtable().reduce_parent(&child, &array, child_idx)? {
173                ctx.log(format_args!(
174                    "reduce_parent: child[{}]({}) rewrote {} -> {}",
175                    child_idx,
176                    child.encoding_id(),
177                    array,
178                    reduced_parent
179                ));
180                reduced_parent.statistics().inherit_from(array.statistics());
181                return Ok(reduced_parent);
182            }
183        }
184
185        // 3. execute_parent (child-driven optimized execution)
186        for child_idx in 0..array.nchildren() {
187            let child = array.nth_child(child_idx).vortex_expect("checked length");
188            if let Some(executed_parent) = child
189                .vtable()
190                .execute_parent(&child, &array, child_idx, ctx)?
191            {
192                ctx.log(format_args!(
193                    "execute_parent: child[{}]({}) rewrote {} -> {}",
194                    child_idx,
195                    child.encoding_id(),
196                    array,
197                    executed_parent
198                ));
199                executed_parent
200                    .statistics()
201                    .inherit_from(array.statistics());
202                return Ok(executed_parent);
203            }
204        }
205
206        // 4. execute (optimized execution)
207        ctx.log(format_args!("executing {}", array));
208        let array = array
209            .vtable()
210            .execute(&array, ctx)
211            .map(|c| c.into_array())?;
212        array.statistics().inherit_from(array.statistics());
213        ctx.log(format_args!("-> {}", array.as_ref()));
214
215        Ok(array)
216    }
217}
218
219/// Extension trait for creating an execution context from a session.
220pub trait VortexSessionExecute {
221    /// Create a new execution context from this session.
222    fn create_execution_ctx(&self) -> ExecutionCtx;
223}
224
225impl VortexSessionExecute for VortexSession {
226    fn create_execution_ctx(&self) -> ExecutionCtx {
227        ExecutionCtx::new(self.clone())
228    }
229}