vortex_array/
executor.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use vortex_error::VortexResult;
5use vortex_session::VortexSession;
6use vortex_vector::Datum;
7use vortex_vector::Vector;
8
9use crate::Array;
10use crate::ArrayRef;
11use crate::arrays::ConstantVTable;
12
13/// Execution context for batch CPU compute.
14pub struct ExecutionCtx {
15    session: VortexSession,
16}
17
18impl ExecutionCtx {
19    /// Create a new execution context with the given session.
20    pub(crate) fn new(session: VortexSession) -> Self {
21        Self { session }
22    }
23
24    /// Get the session associated with this execution context.
25    pub fn session(&self) -> &VortexSession {
26        &self.session
27    }
28}
29
30/// Executor for exporting a Vortex [`Vector`] or [`Datum`] from an [`ArrayRef`].
31pub trait VectorExecutor {
32    /// Recursively execute the array.
33    fn execute(&self, ctx: &mut ExecutionCtx) -> VortexResult<Vector>;
34
35    /// Execute the array and return the resulting datum.
36    fn execute_datum(&self, session: &VortexSession) -> VortexResult<Datum>;
37    /// Execute the array and return the resulting vector.
38    fn execute_vector(&self, session: &VortexSession) -> VortexResult<Vector>;
39}
40
41impl VectorExecutor for ArrayRef {
42    fn execute(&self, ctx: &mut ExecutionCtx) -> VortexResult<Vector> {
43        // Try and dispatch to a child that can optimize execution.
44        for (child_idx, child) in self.children().iter().enumerate() {
45            if let Some(result) = child
46                .encoding()
47                .as_dyn()
48                .execute_parent(child, self, child_idx, ctx)?
49            {
50                tracing::debug!(
51                    "Executed array {} via child {} optimization.",
52                    self.encoding_id(),
53                    child.encoding_id()
54                );
55                return Ok(result);
56            }
57        }
58
59        // Otherwise fall back to the default execution.
60        self.encoding().as_dyn().execute(self, ctx)
61    }
62
63    fn execute_datum(&self, session: &VortexSession) -> VortexResult<Datum> {
64        // Attempt to short-circuit constant arrays.
65        if let Some(constant) = self.as_opt::<ConstantVTable>() {
66            return Ok(Datum::Scalar(constant.scalar().to_vector_scalar()));
67        }
68
69        let mut ctx = ExecutionCtx::new(session.clone());
70        tracing::debug!("Executing array {}:\n{}", self, self.display_tree());
71        Ok(Datum::Vector(self.execute(&mut ctx)?))
72    }
73
74    fn execute_vector(&self, session: &VortexSession) -> VortexResult<Vector> {
75        let len = self.len();
76        Ok(self.execute_datum(session)?.unwrap_into_vector(len))
77    }
78}