Skip to main content

vortex_array/
executor.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt;
5use std::fmt::Display;
6use std::sync::Arc;
7use std::sync::atomic::AtomicUsize;
8
9use vortex_error::VortexExpect;
10use vortex_error::VortexResult;
11use vortex_session::VortexSession;
12
13use crate::AnyCanonical;
14use crate::Array;
15use crate::ArrayRef;
16use crate::Canonical;
17use crate::IntoArray;
18
19/// Marker trait for types that an [`ArrayRef`] can be executed into.
20///
21/// Implementors must provide an implementation of `execute` that takes
22/// an [`ArrayRef`] and an [`ExecutionCtx`], and produces an instance of the
23/// implementor type.
24///
25/// Users should use the `Array::execute` or `Array::execute_as` methods
26pub trait Executable: Sized {
27    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
28}
29
30impl dyn Array + '_ {
31    /// Execute this array to produce an instance of `E`.
32    ///
33    /// See the [`Executable`] implementation for details on how this execution is performed.
34    pub fn execute<E: Executable>(self: Arc<Self>, ctx: &mut ExecutionCtx) -> VortexResult<E> {
35        E::execute(self, ctx)
36    }
37
38    /// Execute this array, labeling the execution step with a name for tracing.
39    pub fn execute_as<E: Executable>(
40        self: Arc<Self>,
41        _name: &'static str,
42        ctx: &mut ExecutionCtx,
43    ) -> VortexResult<E> {
44        E::execute(self, ctx)
45    }
46}
47
48/// Execution context for batch CPU compute.
49///
50/// Accumulates a trace of execution steps. Individual steps are logged at TRACE level for
51/// real-time following, and the full trace is dumped at DEBUG level when the context is dropped.
52pub struct ExecutionCtx {
53    id: usize,
54    session: VortexSession,
55    ops: Vec<String>,
56}
57
58impl ExecutionCtx {
59    /// Create a new execution context with the given session.
60    pub fn new(session: VortexSession) -> Self {
61        static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
62        let id = EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
63        Self {
64            id,
65            session,
66            ops: Vec::new(),
67        }
68    }
69
70    /// Get the session associated with this execution context.
71    pub fn session(&self) -> &VortexSession {
72        &self.session
73    }
74
75    /// Log an execution step at the current depth.
76    ///
77    /// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.
78    /// Individual steps are also logged at TRACE level for real-time following.
79    ///
80    /// Use the [`format_args!`] macro to create the `msg` argument.
81    pub fn log(&mut self, msg: fmt::Arguments<'_>) {
82        if tracing::enabled!(tracing::Level::DEBUG) {
83            let formatted = format!(" - {msg}");
84            tracing::trace!("exec[{}]: {formatted}", self.id);
85            self.ops.push(formatted);
86        }
87    }
88}
89
90impl Display for ExecutionCtx {
91    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92        write!(f, "exec[{}]", self.id)
93    }
94}
95
96impl Drop for ExecutionCtx {
97    fn drop(&mut self) {
98        if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
99            // Unlike itertools `.format()` (panics in 0.14 on second format)
100            struct FmtOps<'a>(&'a [String]);
101            impl Display for FmtOps<'_> {
102                fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103                    for (i, op) in self.0.iter().enumerate() {
104                        if i > 0 {
105                            f.write_str("\n")?;
106                        }
107                        f.write_str(op)?;
108                    }
109                    Ok(())
110                }
111            }
112            tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
113        }
114    }
115}
116
117/// Executing an [`ArrayRef`] into an [`ArrayRef`] is the atomic execution loop within Vortex.
118///
119/// It attempts to take the smallest possible step of execution such that the returned array
120/// is incrementally more "executed" than the input array. In other words, it is closer to becoming
121/// a canonical array.
122///
123/// The execution steps are as follows:
124/// 0. Check for canonical.
125/// 1. Attempt to call `reduce_parent` on each child.
126/// 2. Attempt to `reduce` the array with metadata-only optimizations.
127/// 3. Attempt to call `execute_parent` on each child.
128/// 4. Call `execute` on the array itself.
129///
130/// Most users will not call this method directly, instead preferring to specify an executable
131/// target such as [`crate::Columnar`], [`Canonical`], or any of the canonical array types (such as
132/// [`crate::arrays::PrimitiveArray`]).
133impl Executable for ArrayRef {
134    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
135        // 0. Check for canonical
136        if let Some(canonical) = array.as_opt::<AnyCanonical>() {
137            ctx.log(format_args!("-> canonical {}", array));
138            return Ok(Canonical::from(canonical).into_array());
139        }
140
141        // 1. reduce (metadata-only rewrites)
142        if let Some(reduced) = array.vtable().reduce(&array)? {
143            ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
144            reduced.statistics().inherit_from(array.statistics());
145            return Ok(reduced);
146        }
147
148        // 2. reduce_parent (child-driven metadata-only rewrites)
149        for child_idx in 0..array.nchildren() {
150            let child = array.nth_child(child_idx).vortex_expect("checked length");
151            if let Some(reduced_parent) = child.vtable().reduce_parent(&child, &array, child_idx)? {
152                ctx.log(format_args!(
153                    "reduce_parent: child[{}]({}) rewrote {} -> {}",
154                    child_idx,
155                    child.encoding_id(),
156                    array,
157                    reduced_parent
158                ));
159                reduced_parent.statistics().inherit_from(array.statistics());
160                return Ok(reduced_parent);
161            }
162        }
163
164        // 3. execute_parent (child-driven optimized execution)
165        for child_idx in 0..array.nchildren() {
166            let child = array.nth_child(child_idx).vortex_expect("checked length");
167            if let Some(executed_parent) = child
168                .vtable()
169                .execute_parent(&child, &array, child_idx, ctx)?
170            {
171                ctx.log(format_args!(
172                    "execute_parent: child[{}]({}) rewrote {} -> {}",
173                    child_idx,
174                    child.encoding_id(),
175                    array,
176                    executed_parent
177                ));
178                executed_parent
179                    .statistics()
180                    .inherit_from(array.statistics());
181                return Ok(executed_parent);
182            }
183        }
184
185        // 4. execute (optimized execution)
186        ctx.log(format_args!("executing {}", array));
187        let array = array
188            .vtable()
189            .execute(&array, ctx)
190            .map(|c| c.into_array())?;
191        array.statistics().inherit_from(array.statistics());
192        ctx.log(format_args!("-> {}", array.as_ref()));
193
194        Ok(array)
195    }
196}
197
198/// Extension trait for creating an execution context from a session.
199pub trait VortexSessionExecute {
200    /// Create a new execution context from this session.
201    fn create_execution_ctx(&self) -> ExecutionCtx;
202}
203
204impl VortexSessionExecute for VortexSession {
205    fn create_execution_ctx(&self) -> ExecutionCtx {
206        ExecutionCtx::new(self.clone())
207    }
208}