1use std::fmt;
5use std::fmt::Display;
6use std::sync::Arc;
7use std::sync::atomic::AtomicUsize;
8
9use vortex_error::VortexExpect;
10use vortex_error::VortexResult;
11use vortex_session::VortexSession;
12
13use crate::AnyCanonical;
14use crate::Array;
15use crate::ArrayRef;
16use crate::Canonical;
17use crate::IntoArray;
18
19pub trait Executable: Sized {
27 fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
28}
29
30impl dyn Array + '_ {
31 pub fn execute<E: Executable>(self: Arc<Self>, ctx: &mut ExecutionCtx) -> VortexResult<E> {
35 E::execute(self, ctx)
36 }
37
38 pub fn execute_as<E: Executable>(
40 self: Arc<Self>,
41 _name: &'static str,
42 ctx: &mut ExecutionCtx,
43 ) -> VortexResult<E> {
44 E::execute(self, ctx)
45 }
46}
47
48pub struct ExecutionCtx {
53 id: usize,
54 session: VortexSession,
55 ops: Vec<String>,
56}
57
58impl ExecutionCtx {
59 pub fn new(session: VortexSession) -> Self {
61 static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
62 let id = EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
63 Self {
64 id,
65 session,
66 ops: Vec::new(),
67 }
68 }
69
70 pub fn session(&self) -> &VortexSession {
72 &self.session
73 }
74
75 pub fn log(&mut self, msg: fmt::Arguments<'_>) {
82 if tracing::enabled!(tracing::Level::DEBUG) {
83 let formatted = format!(" - {msg}");
84 tracing::trace!("exec[{}]: {formatted}", self.id);
85 self.ops.push(formatted);
86 }
87 }
88}
89
90impl Display for ExecutionCtx {
91 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92 write!(f, "exec[{}]", self.id)
93 }
94}
95
96impl Drop for ExecutionCtx {
97 fn drop(&mut self) {
98 if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
99 struct FmtOps<'a>(&'a [String]);
101 impl Display for FmtOps<'_> {
102 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103 for (i, op) in self.0.iter().enumerate() {
104 if i > 0 {
105 f.write_str("\n")?;
106 }
107 f.write_str(op)?;
108 }
109 Ok(())
110 }
111 }
112 tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
113 }
114 }
115}
116
117impl Executable for ArrayRef {
134 fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
135 if let Some(canonical) = array.as_opt::<AnyCanonical>() {
137 ctx.log(format_args!("-> canonical {}", array));
138 return Ok(Canonical::from(canonical).into_array());
139 }
140
141 if let Some(reduced) = array.vtable().reduce(&array)? {
143 ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
144 reduced.statistics().inherit_from(array.statistics());
145 return Ok(reduced);
146 }
147
148 for child_idx in 0..array.nchildren() {
150 let child = array.nth_child(child_idx).vortex_expect("checked length");
151 if let Some(reduced_parent) = child.vtable().reduce_parent(&child, &array, child_idx)? {
152 ctx.log(format_args!(
153 "reduce_parent: child[{}]({}) rewrote {} -> {}",
154 child_idx,
155 child.encoding_id(),
156 array,
157 reduced_parent
158 ));
159 reduced_parent.statistics().inherit_from(array.statistics());
160 return Ok(reduced_parent);
161 }
162 }
163
164 for child_idx in 0..array.nchildren() {
166 let child = array.nth_child(child_idx).vortex_expect("checked length");
167 if let Some(executed_parent) = child
168 .vtable()
169 .execute_parent(&child, &array, child_idx, ctx)?
170 {
171 ctx.log(format_args!(
172 "execute_parent: child[{}]({}) rewrote {} -> {}",
173 child_idx,
174 child.encoding_id(),
175 array,
176 executed_parent
177 ));
178 executed_parent
179 .statistics()
180 .inherit_from(array.statistics());
181 return Ok(executed_parent);
182 }
183 }
184
185 ctx.log(format_args!("executing {}", array));
187 let array = array
188 .vtable()
189 .execute(&array, ctx)
190 .map(|c| c.into_array())?;
191 array.statistics().inherit_from(array.statistics());
192 ctx.log(format_args!("-> {}", array.as_ref()));
193
194 Ok(array)
195 }
196}
197
198pub trait VortexSessionExecute {
200 fn create_execution_ctx(&self) -> ExecutionCtx;
202}
203
204impl VortexSessionExecute for VortexSession {
205 fn create_execution_ctx(&self) -> ExecutionCtx {
206 ExecutionCtx::new(self.clone())
207 }
208}