use std::env::VarError;
use std::fmt;
use std::fmt::Display;
use std::sync::Arc;
use std::sync::LazyLock;
use std::sync::atomic::AtomicUsize;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_panic;
use vortex_session::VortexSession;
use crate::AnyCanonical;
use crate::ArrayRef;
use crate::Canonical;
use crate::DynArray;
use crate::IntoArray;
use crate::matcher::Matcher;
use crate::optimizer::ArrayOptimizer;
pub(crate) static MAX_ITERATIONS: LazyLock<usize> =
LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
Ok(val) => val
.parse::<usize>()
.unwrap_or_else(|e| vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")),
Err(VarError::NotPresent) => 128,
Err(VarError::NotUnicode(_)) => {
vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
}
});
pub trait Executable: Sized {
fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
}
impl dyn DynArray + '_ {
pub fn execute<E: Executable>(self: Arc<Self>, ctx: &mut ExecutionCtx) -> VortexResult<E> {
E::execute(self, ctx)
}
pub fn execute_as<E: Executable>(
self: Arc<Self>,
_name: &'static str,
ctx: &mut ExecutionCtx,
) -> VortexResult<E> {
E::execute(self, ctx)
}
pub fn execute_until<M: Matcher>(
self: Arc<Self>,
ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
static MAX_ITERATIONS: LazyLock<usize> =
LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
Ok(val) => val.parse::<usize>().unwrap_or_else(|e| {
vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")
}),
Err(VarError::NotPresent) => 128,
Err(VarError::NotUnicode(_)) => {
vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
}
});
let mut current = self.optimize()?;
let mut stack: Vec<(ArrayRef, usize, DonePredicate)> = Vec::new();
for _ in 0..*MAX_ITERATIONS {
let is_done = stack
.last()
.map_or(M::matches as DonePredicate, |frame| frame.2);
if is_done(current.as_ref()) {
match stack.pop() {
None => {
ctx.log(format_args!("-> {}", current));
return Ok(current);
}
Some((parent, child_idx, _)) => {
current = parent.with_child(child_idx, current)?;
current = current.optimize()?;
continue;
}
}
}
if AnyCanonical::matches(current.as_ref()) {
match stack.pop() {
None => {
ctx.log(format_args!("-> canonical (unmatched) {}", current));
return Ok(current);
}
Some((parent, child_idx, _)) => {
current = parent.with_child(child_idx, current)?;
current = current.optimize()?;
continue;
}
}
}
if let Some(rewritten) = try_execute_parent(¤t, ctx)? {
ctx.log(format_args!(
"execute_parent rewrote {} -> {}",
current, rewritten
));
current = rewritten.optimize()?;
continue;
}
match current.vtable().execute(¤t, ctx)? {
ExecutionStep::ExecuteChild(i, done) => {
let child = current
.nth_child(i)
.vortex_expect("ExecuteChild index in bounds");
ctx.log(format_args!(
"ExecuteChild({i}): pushing {}, focusing on {}",
current, child
));
stack.push((current, i, done));
current = child.optimize()?;
}
ExecutionStep::Done(result) => {
ctx.log(format_args!("Done: {} -> {}", current, result));
current = result;
}
}
}
vortex_bail!(
"Exceeded maximum execution iterations ({}) while executing array",
*MAX_ITERATIONS,
)
}
}
pub struct ExecutionCtx {
id: usize,
session: VortexSession,
ops: Vec<String>,
}
impl ExecutionCtx {
pub fn new(session: VortexSession) -> Self {
static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
let id = EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Self {
id,
session,
ops: Vec::new(),
}
}
pub fn session(&self) -> &VortexSession {
&self.session
}
pub fn log(&mut self, msg: fmt::Arguments<'_>) {
if tracing::enabled!(tracing::Level::DEBUG) {
let formatted = format!(" - {msg}");
tracing::trace!("exec[{}]: {formatted}", self.id);
self.ops.push(formatted);
}
}
}
impl Display for ExecutionCtx {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "exec[{}]", self.id)
}
}
impl Drop for ExecutionCtx {
fn drop(&mut self) {
if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
struct FmtOps<'a>(&'a [String]);
impl Display for FmtOps<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
for (i, op) in self.0.iter().enumerate() {
if i > 0 {
f.write_str("\n")?;
}
f.write_str(op)?;
}
Ok(())
}
}
tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
}
}
}
impl Executable for ArrayRef {
fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
if let Some(canonical) = array.as_opt::<AnyCanonical>() {
ctx.log(format_args!("-> canonical {}", array));
return Ok(Canonical::from(canonical).into_array());
}
if let Some(reduced) = array.vtable().reduce(&array)? {
ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
reduced.statistics().inherit_from(array.statistics());
return Ok(reduced);
}
for child_idx in 0..array.nchildren() {
let child = array.nth_child(child_idx).vortex_expect("checked length");
if let Some(reduced_parent) = child.vtable().reduce_parent(&child, &array, child_idx)? {
ctx.log(format_args!(
"reduce_parent: child[{}]({}) rewrote {} -> {}",
child_idx,
child.encoding_id(),
array,
reduced_parent
));
reduced_parent.statistics().inherit_from(array.statistics());
return Ok(reduced_parent);
}
}
for child_idx in 0..array.nchildren() {
let child = array.nth_child(child_idx).vortex_expect("checked length");
if let Some(executed_parent) = child
.vtable()
.execute_parent(&child, &array, child_idx, ctx)?
{
ctx.log(format_args!(
"execute_parent: child[{}]({}) rewrote {} -> {}",
child_idx,
child.encoding_id(),
array,
executed_parent
));
executed_parent
.statistics()
.inherit_from(array.statistics());
return Ok(executed_parent);
}
}
ctx.log(format_args!("executing {}", array));
match array.vtable().execute(&array, ctx)? {
ExecutionStep::Done(result) => {
ctx.log(format_args!("-> {}", result.as_ref()));
Ok(result)
}
ExecutionStep::ExecuteChild(i, _) => {
let child = array.nth_child(i).vortex_expect("valid child index");
let executed_child = child.execute::<ArrayRef>(ctx)?;
array.with_child(i, executed_child)
}
}
}
}
fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<ArrayRef>> {
for child_idx in 0..array.nchildren() {
let child = array
.nth_child(child_idx)
.vortex_expect("checked nchildren");
if let Some(result) = child
.vtable()
.execute_parent(&child, array, child_idx, ctx)?
{
result.statistics().inherit_from(array.statistics());
return Ok(Some(result));
}
}
Ok(None)
}
pub type DonePredicate = fn(&dyn DynArray) -> bool;
pub enum ExecutionStep {
ExecuteChild(usize, DonePredicate),
Done(ArrayRef),
}
impl ExecutionStep {
pub fn execute_child<M: Matcher>(child_idx: usize) -> Self {
ExecutionStep::ExecuteChild(child_idx, M::matches)
}
pub fn done(result: ArrayRef) -> Self {
ExecutionStep::Done(result)
}
}
impl fmt::Debug for ExecutionStep {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ExecutionStep::ExecuteChild(idx, _) => {
f.debug_tuple("ExecuteChild").field(idx).finish()
}
ExecutionStep::Done(result) => f.debug_tuple("Done").field(result).finish(),
}
}
}
pub trait VortexSessionExecute {
fn create_execution_ctx(&self) -> ExecutionCtx;
}
impl VortexSessionExecute for VortexSession {
fn create_execution_ctx(&self) -> ExecutionCtx {
ExecutionCtx::new(self.clone())
}
}