use std::any::type_name;
use std::fmt;
use std::fmt::Display;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_session::VortexSession;
use crate::AnyCanonical;
use crate::Array;
use crate::ArrayRef;
use crate::Canonical;
use crate::IntoArray;
/// A value that can be produced by executing an array inside an [`ExecutionCtx`].
pub trait Executable: Sized {
/// Consumes `array` and attempts to produce `Self`, using `ctx` for the session and for
/// logging.
///
/// # Errors
///
/// Returns whatever error the implementation reports through [`VortexResult`].
fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
}
impl dyn Array + '_ {
    /// Executes this array into an `E`.
    ///
    /// An entry describing the requested target type and this array is recorded on `ctx`
    /// before the work is handed to [`Executable::execute`].
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `E::execute`.
    pub fn execute<E: Executable>(self: Arc<Self>, ctx: &mut ExecutionCtx) -> VortexResult<E> {
        let target = type_name::<E>();
        ctx.log_entry(&*self, format_args!("execute<{}> {}", target, self));
        E::execute(self, ctx)
    }

    /// Executes this array into an `E`, prefixing the recorded log entry with `name`.
    ///
    /// Apart from the `name` label in the log entry this does the same as
    /// [`execute`](Self::execute).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `E::execute`.
    pub fn execute_as<E: Executable>(
        self: Arc<Self>,
        name: &'static str,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<E> {
        let target = type_name::<E>();
        ctx.log_entry(
            &*self,
            format_args!("{}: execute<{}> {}", name, target, self),
        );
        E::execute(self, ctx)
    }
}
/// State carried through the execution of an array.
///
/// Bundles the session with an id used to tag log output and the log lines recorded so far.
pub struct ExecutionCtx {
/// Identifier drawn from a counter shared by all contexts; printed as `exec[id]`.
id: usize,
/// Session handed out by [`ExecutionCtx::session`].
session: VortexSession,
/// Formatted log lines appended by [`ExecutionCtx::log`]; emitted together when the
/// context is dropped.
ops: Vec<String>,
}
impl ExecutionCtx {
    /// Builds a context around `session` with an empty op log and a fresh id.
    ///
    /// The id is taken from a static counter that is bumped once per constructed context.
    pub fn new(session: VortexSession) -> Self {
        use std::sync::atomic::Ordering;

        static NEXT_CTX_ID: AtomicUsize = AtomicUsize::new(0);

        Self {
            id: NEXT_CTX_ID.fetch_add(1, Ordering::Relaxed),
            session,
            ops: Vec::new(),
        }
    }

    /// Borrows the session this context was created with.
    pub fn session(&self) -> &VortexSession {
        &self.session
    }

    /// Records `msg` as one op of this execution.
    ///
    /// Nothing happens unless DEBUG-level tracing is enabled. When it is, the message is
    /// formatted once, emitted as a TRACE event tagged with this context's id, and appended
    /// to the op list that is dumped when the context is dropped.
    pub fn log(&mut self, msg: fmt::Arguments<'_>) {
        if !tracing::enabled!(tracing::Level::DEBUG) {
            return;
        }
        let formatted = format!(" - {msg}");
        tracing::trace!("exec[{}]: {formatted}", self.id);
        self.ops.push(formatted);
    }

    /// Records the entry point of an execution.
    ///
    /// If no op has been recorded yet, the message is followed by the display tree of
    /// `array`; later entries record the message alone.
    fn log_entry(&mut self, array: &dyn Array, msg: fmt::Arguments<'_>) {
        if !tracing::enabled!(tracing::Level::DEBUG) {
            return;
        }
        if !self.ops.is_empty() {
            self.log(msg);
            return;
        }
        self.log(format_args!("{msg}\n{}", array.display_tree()));
    }
}
/// Renders the context as `exec[<id>]`.
impl Display for ExecutionCtx {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("exec[{}]", self.id))
    }
}
/// On drop, emits every recorded op as a single DEBUG event.
///
/// Nothing is emitted when no op was recorded or when DEBUG-level tracing is disabled.
impl Drop for ExecutionCtx {
    fn drop(&mut self) {
        if self.ops.is_empty() || !tracing::enabled!(tracing::Level::DEBUG) {
            return;
        }

        /// Writes the recorded ops separated by newlines, without building a joined string.
        struct JoinedOps<'a>(&'a [String]);

        impl Display for JoinedOps<'_> {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                let mut remaining = self.0.iter();
                if let Some(first) = remaining.next() {
                    f.write_str(first)?;
                }
                for op in remaining {
                    f.write_str("\n")?;
                    f.write_str(op)?;
                }
                Ok(())
            }
        }

        tracing::debug!("exec[{}] trace:\n{}", self.id, JoinedOps(&self.ops));
    }
}
/// Executes an array into another [`ArrayRef`].
///
/// The strategies below are tried in order; the first one that applies returns immediately:
///
/// 1. The array already matches [`AnyCanonical`]: it is passed through [`Canonical`] and
///    returned as an array.
/// 2. The array's own `reduce` yields a replacement.
/// 3. Some child's `reduce_parent` yields a replacement for the array.
/// 4. Some child's `execute_parent` yields a replacement for the array.
/// 5. Otherwise the array's own `execute` runs.
///
/// Every replacement produced by steps 2–5 inherits the statistics of the array it replaces.
///
/// # Errors
///
/// Propagates any error returned by the vtable calls above.
impl Executable for ArrayRef {
    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
        // 1. Already canonical: nothing to execute.
        if let Some(canonical) = array.as_opt::<AnyCanonical>() {
            ctx.log(format_args!("-> canonical {}", array));
            return Ok(Canonical::from(canonical).into_array());
        }

        // 2. Let the array rewrite itself.
        if let Some(reduced) = array.vtable().reduce(&array)? {
            ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
            reduced.statistics().inherit_from(array.statistics());
            return Ok(reduced);
        }

        // 3. Let each child try to rewrite its parent. All children get a chance here before
        //    any `execute_parent` is attempted, so the two loops must stay separate.
        for child_idx in 0..array.nchildren() {
            let child = array.nth_child(child_idx).vortex_expect("checked length");
            if let Some(reduced_parent) = child.vtable().reduce_parent(&child, &array, child_idx)? {
                ctx.log(format_args!(
                    "reduce_parent: child[{}]({}) rewrote {} -> {}",
                    child_idx,
                    child.encoding_id(),
                    array,
                    reduced_parent
                ));
                reduced_parent.statistics().inherit_from(array.statistics());
                return Ok(reduced_parent);
            }
        }

        // 4. Let each child try to execute its parent.
        for child_idx in 0..array.nchildren() {
            let child = array.nth_child(child_idx).vortex_expect("checked length");
            if let Some(executed_parent) = child
                .vtable()
                .execute_parent(&child, &array, child_idx, ctx)?
            {
                ctx.log(format_args!(
                    "execute_parent: child[{}]({}) rewrote {} -> {}",
                    child_idx,
                    child.encoding_id(),
                    array,
                    executed_parent
                ));
                executed_parent
                    .statistics()
                    .inherit_from(array.statistics());
                return Ok(executed_parent);
            }
        }

        // 5. Fall back to the array's own execution.
        ctx.log(format_args!("executing {}", array));
        // Bind the result to its own name: shadowing `array` here would make the statistics
        // call below inherit from the result itself instead of from the input array.
        let executed = array
            .vtable()
            .execute(&array, ctx)
            .map(|c| c.into_array())?;
        executed.statistics().inherit_from(array.statistics());
        ctx.log(format_args!("-> {}", executed.as_ref()));
        Ok(executed)
    }
}
/// Adds a way to create an [`ExecutionCtx`] from a session.
pub trait VortexSessionExecute {
/// Creates a new [`ExecutionCtx`] for this session.
fn create_execution_ctx(&self) -> ExecutionCtx;
}
impl VortexSessionExecute for VortexSession {
    /// Clones the session and wraps the clone in a new [`ExecutionCtx`].
    fn create_execution_ctx(&self) -> ExecutionCtx {
        let session = self.clone();
        ExecutionCtx::new(session)
    }
}