1use std::any::type_name;
5use std::fmt;
6use std::fmt::Display;
7use std::sync::Arc;
8use std::sync::atomic::AtomicUsize;
9
10use vortex_error::VortexExpect;
11use vortex_error::VortexResult;
12use vortex_session::VortexSession;
13
14use crate::AnyCanonical;
15use crate::Array;
16use crate::ArrayRef;
17use crate::Canonical;
18use crate::IntoArray;
19
20pub trait Executable: Sized {
28 fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
29}
30
31impl dyn Array + '_ {
32 pub fn execute<E: Executable>(self: Arc<Self>, ctx: &mut ExecutionCtx) -> VortexResult<E> {
36 ctx.log_entry(
37 &self,
38 format_args!("execute<{}> {}", type_name::<E>(), self),
39 );
40 E::execute(self, ctx)
41 }
42
43 pub fn execute_as<E: Executable>(
45 self: Arc<Self>,
46 name: &'static str,
47 ctx: &mut ExecutionCtx,
48 ) -> VortexResult<E> {
49 ctx.log_entry(
50 &self,
51 format_args!("{}: execute<{}> {}", name, type_name::<E>(), self),
52 );
53 E::execute(self, ctx)
54 }
55}
56
57pub struct ExecutionCtx {
62 id: usize,
63 session: VortexSession,
64 ops: Vec<String>,
65}
66
67impl ExecutionCtx {
68 pub fn new(session: VortexSession) -> Self {
70 static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
71 let id = EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
72 Self {
73 id,
74 session,
75 ops: Vec::new(),
76 }
77 }
78
79 pub fn session(&self) -> &VortexSession {
81 &self.session
82 }
83
84 pub fn log(&mut self, msg: fmt::Arguments<'_>) {
91 if tracing::enabled!(tracing::Level::DEBUG) {
92 let formatted = format!(" - {msg}");
93 tracing::trace!("exec[{}]: {formatted}", self.id);
94 self.ops.push(formatted);
95 }
96 }
97
98 fn log_entry(&mut self, array: &dyn Array, msg: fmt::Arguments<'_>) {
101 if tracing::enabled!(tracing::Level::DEBUG) {
102 if self.ops.is_empty() {
103 self.log(format_args!("{msg}\n{}", array.display_tree()));
104 } else {
105 self.log(msg);
106 }
107 }
108 }
109}
110
111impl Display for ExecutionCtx {
112 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113 write!(f, "exec[{}]", self.id)
114 }
115}
116
117impl Drop for ExecutionCtx {
118 fn drop(&mut self) {
119 if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
120 struct FmtOps<'a>(&'a [String]);
122 impl Display for FmtOps<'_> {
123 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124 for (i, op) in self.0.iter().enumerate() {
125 if i > 0 {
126 f.write_str("\n")?;
127 }
128 f.write_str(op)?;
129 }
130 Ok(())
131 }
132 }
133 tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
134 }
135 }
136}
137
138impl Executable for ArrayRef {
155 fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
156 if let Some(canonical) = array.as_opt::<AnyCanonical>() {
158 ctx.log(format_args!("-> canonical {}", array));
159 return Ok(Canonical::from(canonical).into_array());
160 }
161
162 if let Some(reduced) = array.vtable().reduce(&array)? {
164 ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
165 reduced.statistics().inherit_from(array.statistics());
166 return Ok(reduced);
167 }
168
169 for child_idx in 0..array.nchildren() {
171 let child = array.nth_child(child_idx).vortex_expect("checked length");
172 if let Some(reduced_parent) = child.vtable().reduce_parent(&child, &array, child_idx)? {
173 ctx.log(format_args!(
174 "reduce_parent: child[{}]({}) rewrote {} -> {}",
175 child_idx,
176 child.encoding_id(),
177 array,
178 reduced_parent
179 ));
180 reduced_parent.statistics().inherit_from(array.statistics());
181 return Ok(reduced_parent);
182 }
183 }
184
185 for child_idx in 0..array.nchildren() {
187 let child = array.nth_child(child_idx).vortex_expect("checked length");
188 if let Some(executed_parent) = child
189 .vtable()
190 .execute_parent(&child, &array, child_idx, ctx)?
191 {
192 ctx.log(format_args!(
193 "execute_parent: child[{}]({}) rewrote {} -> {}",
194 child_idx,
195 child.encoding_id(),
196 array,
197 executed_parent
198 ));
199 executed_parent
200 .statistics()
201 .inherit_from(array.statistics());
202 return Ok(executed_parent);
203 }
204 }
205
206 ctx.log(format_args!("executing {}", array));
208 let array = array
209 .vtable()
210 .execute(&array, ctx)
211 .map(|c| c.into_array())?;
212 array.statistics().inherit_from(array.statistics());
213 ctx.log(format_args!("-> {}", array.as_ref()));
214
215 Ok(array)
216 }
217}
218
219pub trait VortexSessionExecute {
221 fn create_execution_ctx(&self) -> ExecutionCtx;
223}
224
225impl VortexSessionExecute for VortexSession {
226 fn create_execution_ctx(&self) -> ExecutionCtx {
227 ExecutionCtx::new(self.clone())
228 }
229}