1use std::env::VarError;
5use std::fmt;
6use std::fmt::Display;
7use std::sync::Arc;
8use std::sync::LazyLock;
9use std::sync::atomic::AtomicUsize;
10
11use vortex_error::VortexExpect;
12use vortex_error::VortexResult;
13use vortex_error::vortex_bail;
14use vortex_error::vortex_panic;
15use vortex_session::VortexSession;
16
17use crate::AnyCanonical;
18use crate::ArrayRef;
19use crate::Canonical;
20use crate::DynArray;
21use crate::IntoArray;
22use crate::matcher::Matcher;
23use crate::optimizer::ArrayOptimizer;
24use crate::vtable::VTable;
25use crate::vtable::upcast_array;
26
27pub(crate) static MAX_ITERATIONS: LazyLock<usize> =
30 LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
31 Ok(val) => val
32 .parse::<usize>()
33 .unwrap_or_else(|e| vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")),
34 Err(VarError::NotPresent) => 128,
35 Err(VarError::NotUnicode(_)) => {
36 vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
37 }
38 });
39
40pub trait Executable: Sized {
48 fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
49}
50
51impl dyn DynArray + '_ {
52 pub fn execute<E: Executable>(self: Arc<Self>, ctx: &mut ExecutionCtx) -> VortexResult<E> {
56 E::execute(self, ctx)
57 }
58
59 pub fn execute_as<E: Executable>(
61 self: Arc<Self>,
62 _name: &'static str,
63 ctx: &mut ExecutionCtx,
64 ) -> VortexResult<E> {
65 E::execute(self, ctx)
66 }
67
68 pub fn execute_until<M: Matcher>(
83 self: Arc<Self>,
84 ctx: &mut ExecutionCtx,
85 ) -> VortexResult<ArrayRef> {
86 static MAX_ITERATIONS: LazyLock<usize> =
87 LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
88 Ok(val) => val.parse::<usize>().unwrap_or_else(|e| {
89 vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")
90 }),
91 Err(VarError::NotPresent) => 128,
92 Err(VarError::NotUnicode(_)) => {
93 vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
94 }
95 });
96
97 let mut current = self.optimize()?;
98 let mut stack: Vec<(ArrayRef, usize, DonePredicate)> = Vec::new();
100
101 for _ in 0..*MAX_ITERATIONS {
102 let is_done = stack
104 .last()
105 .map_or(M::matches as DonePredicate, |frame| frame.2);
106 if is_done(current.as_ref()) {
107 match stack.pop() {
108 None => {
109 ctx.log(format_args!("-> {}", current));
110 return Ok(current);
111 }
112 Some((parent, child_idx, _)) => {
113 current = parent.with_child(child_idx, current)?;
114 current = current.optimize()?;
115 continue;
116 }
117 }
118 }
119
120 if AnyCanonical::matches(current.as_ref()) {
123 match stack.pop() {
124 None => {
125 ctx.log(format_args!("-> canonical (unmatched) {}", current));
126 return Ok(current);
127 }
128 Some((parent, child_idx, _)) => {
129 current = parent.with_child(child_idx, current)?;
130 current = current.optimize()?;
131 continue;
132 }
133 }
134 }
135
136 if let Some(rewritten) = try_execute_parent(¤t, ctx)? {
138 ctx.log(format_args!(
139 "execute_parent rewrote {} -> {}",
140 current, rewritten
141 ));
142 current = rewritten.optimize()?;
143 continue;
144 }
145
146 let result = execute_step(current, ctx)?;
148 let (array, step) = result.into_parts();
149 match step {
150 ExecutionStep::ExecuteChild(i, done) => {
151 let child = array
152 .nth_child(i)
153 .vortex_expect("ExecuteChild index in bounds");
154 ctx.log(format_args!(
155 "ExecuteChild({i}): pushing {}, focusing on {}",
156 array, child
157 ));
158 stack.push((array, i, done));
159 current = child.optimize()?;
160 }
161 ExecutionStep::Done => {
162 ctx.log(format_args!("Done: {}", array));
163 current = array;
164 }
165 }
166 }
167
168 vortex_bail!(
169 "Exceeded maximum execution iterations ({}) while executing array",
170 *MAX_ITERATIONS,
171 )
172 }
173}
174
/// Context passed through array execution.
///
/// Holds the [`VortexSession`] it was created from, a numeric id used to tag log output, and
/// the messages recorded through [`ExecutionCtx::log`].
pub struct ExecutionCtx {
    /// Identifier assigned in `new`; shown in log output as `exec[<id>]`.
    id: usize,
    /// Session this context was created with.
    session: VortexSession,
    /// Messages recorded by `log`, in call order.
    ops: Vec<String>,
}
184
185impl ExecutionCtx {
186 pub fn new(session: VortexSession) -> Self {
188 static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
189 let id = EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
190 Self {
191 id,
192 session,
193 ops: Vec::new(),
194 }
195 }
196
197 pub fn session(&self) -> &VortexSession {
199 &self.session
200 }
201
202 pub fn log(&mut self, msg: fmt::Arguments<'_>) {
209 if tracing::enabled!(tracing::Level::DEBUG) {
210 let formatted = format!(" - {msg}");
211 tracing::trace!("exec[{}]: {formatted}", self.id);
212 self.ops.push(formatted);
213 }
214 }
215}
216
217impl Display for ExecutionCtx {
218 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219 write!(f, "exec[{}]", self.id)
220 }
221}
222
223impl Drop for ExecutionCtx {
224 fn drop(&mut self) {
225 if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
226 struct FmtOps<'a>(&'a [String]);
228 impl Display for FmtOps<'_> {
229 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
230 for (i, op) in self.0.iter().enumerate() {
231 if i > 0 {
232 f.write_str("\n")?;
233 }
234 f.write_str(op)?;
235 }
236 Ok(())
237 }
238 }
239 tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
240 }
241 }
242}
243
244impl Executable for ArrayRef {
261 fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
262 if let Some(canonical) = array.as_opt::<AnyCanonical>() {
264 ctx.log(format_args!("-> canonical {}", array));
265 return Ok(Canonical::from(canonical).into_array());
266 }
267
268 if let Some(reduced) = array.vtable().reduce(&array)? {
270 ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
271 reduced.statistics().inherit_from(array.statistics());
272 return Ok(reduced);
273 }
274
275 for child_idx in 0..array.nchildren() {
277 let child = array.nth_child(child_idx).vortex_expect("checked length");
278 if let Some(reduced_parent) = child.vtable().reduce_parent(&child, &array, child_idx)? {
279 ctx.log(format_args!(
280 "reduce_parent: child[{}]({}) rewrote {} -> {}",
281 child_idx,
282 child.encoding_id(),
283 array,
284 reduced_parent
285 ));
286 reduced_parent.statistics().inherit_from(array.statistics());
287 return Ok(reduced_parent);
288 }
289 }
290
291 for child_idx in 0..array.nchildren() {
293 let child = array.nth_child(child_idx).vortex_expect("checked length");
295 if let Some(executed_parent) = child
296 .vtable()
297 .execute_parent(&child, &array, child_idx, ctx)?
298 {
299 ctx.log(format_args!(
300 "execute_parent: child[{}]({}) rewrote {} -> {}",
301 child_idx,
302 child.encoding_id(),
303 array,
304 executed_parent
305 ));
306 executed_parent
307 .statistics()
308 .inherit_from(array.statistics());
309 return Ok(executed_parent);
310 }
311 }
312
313 ctx.log(format_args!("executing {}", array));
315 let result = execute_step(array, ctx)?;
316 let (array, step) = result.into_parts();
317 match step {
318 ExecutionStep::Done => {
319 ctx.log(format_args!("-> {}", array.as_ref()));
320 Ok(array)
321 }
322 ExecutionStep::ExecuteChild(i, _) => {
323 let child = array.nth_child(i).vortex_expect("valid child index");
326 let executed_child = child.execute::<ArrayRef>(ctx)?;
327 array.with_child(i, executed_child)
328 }
329 }
330 }
331}
332
333fn execute_step(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
337 let vtable = array.vtable().clone_boxed();
338 vtable.execute(array, ctx)
339}
340
341fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<ArrayRef>> {
343 for child_idx in 0..array.nchildren() {
344 let child = array
345 .nth_child(child_idx)
346 .vortex_expect("checked nchildren");
347 if let Some(result) = child
348 .vtable()
349 .execute_parent(&child, array, child_idx, ctx)?
350 {
351 result.statistics().inherit_from(array.statistics());
352 return Ok(Some(result));
353 }
354 }
355 Ok(None)
356}
357
/// Function pointer that inspects an array and reports whether it has been executed far
/// enough.
///
/// Carried by [`ExecutionStep::ExecuteChild`] and consulted by the `execute_until` loop.
pub type DonePredicate = fn(&dyn DynArray) -> bool;
360
/// The follow-up action reported alongside an array in an [`ExecutionResult`].
pub enum ExecutionStep {
    /// The child at the given index should be executed next. The [`DonePredicate`] is what
    /// the `execute_until` loop uses to decide when that child is done.
    ExecuteChild(usize, DonePredicate),

    /// No child execution is requested; the accompanying array is the result of this step.
    Done,
}
379
380impl fmt::Debug for ExecutionStep {
381 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
382 match self {
383 ExecutionStep::ExecuteChild(idx, _) => {
384 f.debug_tuple("ExecuteChild").field(idx).finish()
385 }
386 ExecutionStep::Done => write!(f, "Done"),
387 }
388 }
389}
390
/// The outcome of one execute call: an array together with the [`ExecutionStep`] describing
/// what should happen next.
pub struct ExecutionResult {
    /// The array produced by, or handed back from, the execute call.
    array: ArrayRef,
    /// Whether execution is done or a child must be executed first.
    step: ExecutionStep,
}
399
400impl ExecutionResult {
401 pub fn done(result: impl IntoArray) -> Self {
403 Self {
404 array: result.into_array(),
405 step: ExecutionStep::Done,
406 }
407 }
408
409 pub fn done_upcast<V: VTable>(arr: Arc<V::Array>) -> Self {
410 Self {
411 array: upcast_array::<V>(arr),
412 step: ExecutionStep::Done,
413 }
414 }
415
416 pub fn execute_child<M: Matcher>(array: impl IntoArray, child_idx: usize) -> Self {
420 Self {
421 array: array.into_array(),
422 step: ExecutionStep::ExecuteChild(child_idx, M::matches),
423 }
424 }
425
426 pub fn execute_child_upcast<V: VTable, M: Matcher>(
429 array: Arc<V::Array>,
430 child_idx: usize,
431 ) -> Self {
432 Self {
433 array: upcast_array::<V>(array),
434 step: ExecutionStep::ExecuteChild(child_idx, M::matches),
435 }
436 }
437
438 pub fn array(&self) -> &ArrayRef {
440 &self.array
441 }
442
443 pub fn step(&self) -> &ExecutionStep {
445 &self.step
446 }
447
448 pub fn into_parts(self) -> (ArrayRef, ExecutionStep) {
450 (self.array, self.step)
451 }
452}
453
454impl fmt::Debug for ExecutionResult {
455 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
456 f.debug_struct("ExecutionResult")
457 .field("array", &self.array)
458 .field("step", &self.step)
459 .finish()
460 }
461}
462
/// Requires that a child of `$parent` matches `$M` before the caller continues.
///
/// If `$child` is an `$M`, the macro evaluates to `$parent`. Otherwise the enclosing function
/// returns early with `Ok(ExecutionResult::execute_child_upcast::<$V, $M>(..))`, built from a
/// clone of the `$parent` `Arc` and the child index `$idx`.
#[macro_export]
macro_rules! require_child {
    ($V:ty, $parent:expr, $child:expr, $idx:expr => $M:ty) => {{
        if $child.is::<$M>() {
            $parent
        } else {
            return Ok($crate::ExecutionResult::execute_child_upcast::<$V, $M>(
                std::sync::Arc::clone(&$parent),
                $idx,
            ));
        }
    }};
}
483
/// Extension trait for obtaining an [`ExecutionCtx`] from a session.
pub trait VortexSessionExecute {
    /// Creates a new execution context associated with this session.
    fn create_execution_ctx(&self) -> ExecutionCtx;
}
489
490impl VortexSessionExecute for VortexSession {
491 fn create_execution_ctx(&self) -> ExecutionCtx {
492 ExecutionCtx::new(self.clone())
493 }
494}