use std::env::VarError;
use std::fmt;
use std::fmt::Display;
use std::sync::LazyLock;
use std::sync::atomic::AtomicUsize;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_panic;
use vortex_session::VortexSession;
use crate::AnyCanonical;
use crate::ArrayRef;
use crate::Canonical;
use crate::IntoArray;
use crate::matcher::Matcher;
use crate::memory::HostAllocatorRef;
use crate::memory::MemorySessionExt;
use crate::optimizer::ArrayOptimizer;
/// Iteration limit read lazily, on first access, from the `VORTEX_MAX_ITERATIONS`
/// environment variable.
///
/// Falls back to `128` when the variable is not set.
///
/// # Panics
///
/// The first access aborts through `vortex_panic!` when the variable is set but is either
/// not valid unicode or not parseable as a `usize`.
pub(crate) static MAX_ITERATIONS: LazyLock<usize> = LazyLock::new(|| {
    // Fold "is the variable readable" and "does it parse" into one nested `Result` so every
    // outcome is handled by a single match.
    match std::env::var("VORTEX_MAX_ITERATIONS").map(|raw| raw.parse::<usize>()) {
        Ok(Ok(limit)) => limit,
        Ok(Err(e)) => vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}"),
        Err(VarError::NotPresent) => 128,
        Err(VarError::NotUnicode(_)) => {
            vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
        }
    }
});
/// A type that can be produced by executing an [`ArrayRef`] within an [`ExecutionCtx`].
///
/// `ArrayRef::execute` is a thin generic front-end that forwards to this trait.
pub trait Executable: Sized {
/// Executes `array` using `ctx` and returns the resulting `Self`, or the error that made
/// execution fail.
fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
}
// The inherent `execute` below shares its name with the trait method `Executable::execute`
// that it delegates to, which is what this lint flags.
#[allow(clippy::same_name_method)]
impl ArrayRef {
    /// Executes this array into any [`Executable`] type by delegating to `E::execute`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error `E::execute` returns.
    pub fn execute<E: Executable>(self, ctx: &mut ExecutionCtx) -> VortexResult<E> {
        E::execute(self, ctx)
    }

    /// Behaves exactly like [`ArrayRef::execute`]; `_name` is currently ignored.
    ///
    /// # Errors
    ///
    /// Propagates whatever error `E::execute` returns.
    pub fn execute_as<E: Executable>(
        self,
        _name: &'static str,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<E> {
        E::execute(self, ctx)
    }

    /// Drives execution of this array with an explicit work stack until it satisfies the
    /// matcher `M` or becomes canonical.
    ///
    /// The array is optimized first. Every iteration then looks at a `current` array:
    ///
    /// 1. The active "done" predicate is the one stored in the top stack frame, or
    ///    `M::matches` when the stack is empty. If `current` satisfies it, or `current`
    ///    matches [`AnyCanonical`], then `current` is returned (empty stack) or written back
    ///    into the slot of the parent on top of the stack, and the re-optimized parent
    ///    becomes `current`. Note that a canonical array is returned even when it does not
    ///    match `M`.
    /// 2. Otherwise every child gets a chance to rewrite `current` through
    ///    `execute_parent`; a rewrite is optimized and becomes `current`.
    /// 3. Otherwise `current` performs one execution step. `ExecuteSlot(i, done)` pushes the
    ///    array together with `i` and `done` and focuses on the child in slot `i`; `Done`
    ///    continues with the array returned by the step.
    ///
    /// # Errors
    ///
    /// Propagates errors from optimizing, executing, or replacing a slot, and fails when the
    /// loop has run `VORTEX_MAX_ITERATIONS` (default 128) iterations without returning.
    ///
    /// # Panics
    ///
    /// Panics when an `ExecuteSlot` step names a slot that is out of range or empty, and on
    /// first use when `VORTEX_MAX_ITERATIONS` is set to an invalid value.
    pub fn execute_until<M: Matcher>(self, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
        // NOTE: this function-local static shadows the module-level `MAX_ITERATIONS` and
        // duplicates its initializer; keep the two in sync.
        static MAX_ITERATIONS: LazyLock<usize> =
            LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
                Ok(val) => val.parse::<usize>().unwrap_or_else(|e| {
                    vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")
                }),
                Err(VarError::NotPresent) => 128,
                Err(VarError::NotUnicode(_)) => {
                    vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
                }
            });

        let mut current = self.optimize()?;
        // Each frame is (parent array, slot the focused child came from, predicate telling
        // when that child has been executed far enough).
        let mut stack: Vec<(ArrayRef, usize, DonePredicate)> = Vec::new();

        for _ in 0..*MAX_ITERATIONS {
            // The innermost pending request decides what "done" means; with no pending
            // request it is the caller's matcher.
            let is_done = stack
                .last()
                .map_or(M::matches as DonePredicate, |frame| frame.2);

            if is_done(&current) {
                match stack.pop() {
                    None => {
                        ctx.log(format_args!("-> {}", current));
                        return Ok(current);
                    }
                    Some((parent, slot_idx, _)) => {
                        // Hand the finished child back to its parent and resume there.
                        current = parent.with_slot(slot_idx, current)?;
                        current = current.optimize()?;
                        continue;
                    }
                }
            }

            // A canonical array is treated as finished even though the predicate above
            // rejected it.
            if AnyCanonical::matches(&current) {
                match stack.pop() {
                    None => {
                        ctx.log(format_args!("-> canonical (unmatched) {}", current));
                        return Ok(current);
                    }
                    Some((parent, slot_idx, _)) => {
                        current = parent.with_slot(slot_idx, current)?;
                        current = current.optimize()?;
                        continue;
                    }
                }
            }

            // Let the children try to rewrite the array before it executes itself.
            if let Some(rewritten) = try_execute_parent(&current, ctx)? {
                ctx.log(format_args!(
                    "execute_parent rewrote {} -> {}",
                    current, rewritten
                ));
                current = rewritten.optimize()?;
                continue;
            }

            let result = execute_step(current, ctx)?;
            let (array, step) = result.into_parts();
            match step {
                ExecutionStep::ExecuteSlot(i, done) => {
                    let child = array.slots()[i]
                        .clone()
                        .vortex_expect("ExecuteSlot index in bounds");
                    ctx.log(format_args!(
                        "ExecuteSlot({i}): pushing {}, focusing on {}",
                        array, child
                    ));
                    stack.push((array, i, done));
                    current = child.optimize()?;
                }
                ExecutionStep::Done => {
                    ctx.log(format_args!("Done: {}", array));
                    current = array;
                }
            }
        }

        vortex_bail!(
            "Exceeded maximum execution iterations ({}) while executing array",
            *MAX_ITERATIONS,
        )
    }
}
/// State carried through an execution: the session it runs in plus a log of the steps taken.
///
/// The derived `Clone` copies `id` and the collected `ops`, so a clone reports under the
/// same identifier as the context it was cloned from.
#[derive(Debug, Clone)]
pub struct ExecutionCtx {
/// Identifier taken from a process-wide counter in `ExecutionCtx::new`; it tags all log
/// output of this context.
id: usize,
/// Session this context was created with.
session: VortexSession,
/// Formatted log lines collected by `ExecutionCtx::log` and emitted together when the
/// context is dropped.
ops: Vec<String>,
}
impl ExecutionCtx {
    /// Creates a context for `session`, tagged with the next value of a process-wide counter.
    pub fn new(session: VortexSession) -> Self {
        use std::sync::atomic::Ordering;

        // Shared by every context in the process; only uniqueness of the handed-out value
        // matters, hence the relaxed ordering.
        static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);

        Self {
            id: EXEC_CTX_ID.fetch_add(1, Ordering::Relaxed),
            session,
            ops: Vec::new(),
        }
    }

    /// Returns the session this context was created with.
    pub fn session(&self) -> &VortexSession {
        &self.session
    }

    /// Returns the host allocator provided by the session.
    pub fn allocator(&self) -> HostAllocatorRef {
        self.session().allocator()
    }

    /// Records one execution step.
    ///
    /// Nothing happens unless DEBUG-level tracing is enabled. When it is, the message is
    /// emitted as a TRACE event right away and also stored so that the whole sequence can be
    /// reported when the context is dropped.
    pub fn log(&mut self, msg: fmt::Arguments<'_>) {
        if !tracing::enabled!(tracing::Level::DEBUG) {
            return;
        }

        let formatted = format!(" - {msg}");
        tracing::trace!("exec[{}]: {formatted}", self.id);
        self.ops.push(formatted);
    }
}
/// Displays the context as `exec[<id>]`, the same tag used as a prefix in its log output.
impl Display for ExecutionCtx {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("exec[{}]", self.id))
    }
}
/// On drop, reports every step collected by `ExecutionCtx::log` as one DEBUG event.
impl Drop for ExecutionCtx {
    fn drop(&mut self) {
        /// Writes the collected lines separated by newlines without first joining them
        /// into a single `String`.
        struct FmtOps<'a>(&'a [String]);

        impl Display for FmtOps<'_> {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                let mut lines = self.0.iter();
                if let Some(first) = lines.next() {
                    f.write_str(first)?;
                    // Every later line is preceded by the separator.
                    for line in lines {
                        f.write_str("\n")?;
                        f.write_str(line)?;
                    }
                }
                Ok(())
            }
        }

        // Nothing to report, or nobody is listening at DEBUG level.
        if self.ops.is_empty() || !tracing::enabled!(tracing::Level::DEBUG) {
            return;
        }

        tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
    }
}
impl Executable for ArrayRef {
    /// Executes `array` by a single step and returns the resulting array.
    ///
    /// The first applicable rule wins:
    ///
    /// 1. A canonical array is returned as its canonical form.
    /// 2. The array reduces itself (`reduce`).
    /// 3. A child reduces the array (`reduce_parent`), trying slots in order.
    /// 4. A child executes the array (`execute_parent`), trying slots in order.
    /// 5. The array executes one step of its own; if that step asks for a slot to be
    ///    executed, the child is executed once via `execute::<ArrayRef>` and put back.
    ///
    /// Rewrites produced by rules 2 to 4 inherit the statistics of the original array.
    ///
    /// # Errors
    ///
    /// Propagates any error raised by the rewrite rules, the execution step, the child
    /// execution, or the slot replacement.
    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
        // Rule 1: nothing left to do for canonical arrays.
        if let Some(canonical) = array.as_opt::<AnyCanonical>() {
            ctx.log(format_args!("-> canonical {}", array));
            return Ok(Canonical::from(canonical).into_array());
        }

        // Rule 2: self-reduction.
        if let Some(reduced) = array.reduce()? {
            ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
            reduced.statistics().inherit_from(array.statistics());
            return Ok(reduced);
        }

        // Rule 3: every occupied slot may reduce the parent. All slots are tried here
        // before any of them is asked to execute the parent.
        for (slot_idx, slot) in array.slots().iter().enumerate() {
            if let Some(child) = slot {
                if let Some(reduced_parent) = child.reduce_parent(&array, slot_idx)? {
                    ctx.log(format_args!(
                        "reduce_parent: slot[{}]({}) rewrote {} -> {}",
                        slot_idx,
                        child.encoding_id(),
                        array,
                        reduced_parent
                    ));
                    reduced_parent.statistics().inherit_from(array.statistics());
                    return Ok(reduced_parent);
                }
            }
        }

        // Rule 4: every occupied slot may execute the parent.
        for (slot_idx, slot) in array.slots().iter().enumerate() {
            if let Some(child) = slot {
                if let Some(executed_parent) = child.execute_parent(&array, slot_idx, ctx)? {
                    ctx.log(format_args!(
                        "execute_parent: slot[{}]({}) rewrote {} -> {}",
                        slot_idx,
                        child.encoding_id(),
                        array,
                        executed_parent
                    ));
                    executed_parent
                        .statistics()
                        .inherit_from(array.statistics());
                    return Ok(executed_parent);
                }
            }
        }

        // Rule 5: fall back to the array's own execution step.
        ctx.log(format_args!("executing {}", array));
        let (array, step) = execute_step(array, ctx)?.into_parts();
        match step {
            ExecutionStep::ExecuteSlot(i, _) => {
                // Single-step mode: execute the requested child once and put it back; the
                // predicate attached to the request is not consulted.
                let child = array.slots()[i].clone().vortex_expect("valid slot index");
                let executed_child = child.execute::<ArrayRef>(ctx)?;
                array.with_slot(i, executed_child)
            }
            ExecutionStep::Done => {
                ctx.log(format_args!("-> {}", array));
                Ok(array)
            }
        }
    }
}
/// Performs one execution step on `array` by delegating to its `execute_encoding` method.
///
/// Returns the [`ExecutionResult`] produced by that call, or its error.
fn execute_step(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
array.execute_encoding(ctx)
}
/// Gives every occupied slot of `array`, in slot order, a chance to execute the array on its
/// parent's behalf through `execute_parent`.
///
/// Returns the first rewrite offered, after it has inherited the statistics of `array`, or
/// `None` when no child offers one.
///
/// # Errors
///
/// Propagates the first error returned by a child's `execute_parent`.
fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<ArrayRef>> {
    for (slot_idx, slot) in array.slots().iter().enumerate() {
        // Empty slots have no child to ask.
        if let Some(child) = slot {
            if let Some(result) = child.execute_parent(array, slot_idx, ctx)? {
                result.statistics().inherit_from(array.statistics());
                return Ok(Some(result));
            }
        }
    }

    Ok(None)
}
/// Function pointer that decides whether an array has been executed far enough.
///
/// `ArrayRef::execute_until` stops working on a child as soon as the predicate stored with
/// its request returns `true`.
pub type DonePredicate = fn(&ArrayRef) -> bool;
/// What should happen next to the array carried by an [`ExecutionResult`].
pub enum ExecutionStep {
/// The child in the slot with the given index has to be executed, until the predicate
/// accepts it, before the array can make further progress.
ExecuteSlot(usize, DonePredicate),
/// No child execution is requested; the accompanying array is the outcome of the step.
Done,
}
impl fmt::Debug for ExecutionStep {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ExecutionStep::ExecuteSlot(idx, _) => f.debug_tuple("ExecuteSlot").field(idx).finish(),
ExecutionStep::Done => write!(f, "Done"),
}
}
}
/// Outcome of one execution step: an array together with the [`ExecutionStep`] that says
/// what to do with it next.
pub struct ExecutionResult {
/// The array produced by the step, or the array whose slot still has to be executed.
array: ArrayRef,
/// The follow-up action requested for `array`.
step: ExecutionStep,
}
impl ExecutionResult {
    /// Builds a result whose step is [`ExecutionStep::Done`] and whose array is `result`.
    pub fn done(result: impl IntoArray) -> Self {
        let array = result.into_array();
        Self {
            array,
            step: ExecutionStep::Done,
        }
    }

    /// Builds a result that asks for the child in slot `slot_idx` of `array` to be executed
    /// until it is accepted by the matcher `M`.
    pub fn execute_slot<M: Matcher>(array: impl IntoArray, slot_idx: usize) -> Self {
        let array = array.into_array();
        let step = ExecutionStep::ExecuteSlot(slot_idx, M::matches);
        Self { array, step }
    }

    /// Borrows the array carried by this result.
    pub fn array(&self) -> &ArrayRef {
        &self.array
    }

    /// Borrows the follow-up step carried by this result.
    pub fn step(&self) -> &ExecutionStep {
        &self.step
    }

    /// Splits the result into its array and its follow-up step.
    pub fn into_parts(self) -> (ArrayRef, ExecutionStep) {
        let Self { array, step } = self;
        (array, step)
    }
}
/// Formats the result as a struct with its `array` and `step` fields.
impl fmt::Debug for ExecutionResult {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("ExecutionResult");
        out.field("array", &self.array);
        out.field("step", &self.step);
        out.finish()
    }
}
/// Requires `$child` to match `$M` before execution of `$parent` continues.
///
/// When `$child.is::<$M>()` holds, the macro evaluates to `$parent`. Otherwise it returns
/// early from the enclosing function with
/// `Ok(ExecutionResult::execute_slot::<$M>($parent.clone(), $idx))`, i.e. a request to
/// execute the child in slot `$idx` first. The enclosing function must therefore return a
/// `Result` whose `Ok` type is `ExecutionResult`.
#[macro_export]
macro_rules! require_child {
($parent:expr, $child:expr, $idx:expr => $M:ty) => {{
if !$child.is::<$M>() {
return Ok($crate::ExecutionResult::execute_slot::<$M>(
$parent.clone(),
$idx,
));
}
$parent
}};
}
/// Variant of `require_child!` for a child that may be absent.
///
/// `$child_opt` is an `Option`: when it is `Some` and the child does not match `$M`, the
/// macro returns early from the enclosing function with
/// `Ok(ExecutionResult::execute_slot::<$M>($parent, $idx))`. When it is `None`, or the child
/// already matches, nothing happens. Unlike `require_child!`, `$parent` is passed on as
/// written (not cloned) and the macro does not evaluate to a value.
#[macro_export]
macro_rules! require_opt_child {
($parent:expr, $child_opt:expr, $idx:expr => $M:ty) => {
if $child_opt.is_some_and(|child| !child.is::<$M>()) {
return Ok($crate::ExecutionResult::execute_slot::<$M>($parent, $idx));
}
};
}
/// Requires the three given slots of `$parent` to hold `Primitive` arrays wherever they are
/// occupied.
///
/// Expands to one `require_opt_child!` per slot, checked in the order indices, values,
/// chunk offsets; the first occupied slot that is not `$crate::arrays::Primitive` triggers
/// an early return asking for that slot to be executed. `$parent` appears several times in
/// the expansion, so it should be a cheap, side-effect-free expression.
///
/// NOTE(review): judging by the parameter names, these are presumably the slots that hold
/// the parts of a patches structure; confirm against callers.
#[macro_export]
macro_rules! require_patches {
($parent:expr, $indices_slot:expr, $values_slot:expr, $chunk_offsets_slot:expr) => {
$crate::require_opt_child!(
$parent,
$parent.slots()[$indices_slot].as_ref(),
$indices_slot => $crate::arrays::Primitive
);
$crate::require_opt_child!(
$parent,
$parent.slots()[$values_slot].as_ref(),
$values_slot => $crate::arrays::Primitive
);
$crate::require_opt_child!(
$parent,
$parent.slots()[$chunk_offsets_slot].as_ref(),
$chunk_offsets_slot => $crate::arrays::Primitive
);
};
}
/// Requires slot `$idx` of `$parent` to hold a `Bool` array if it is occupied.
///
/// Expands to `require_opt_child!` with `$crate::arrays::Bool` as the matcher, so an
/// occupied slot holding anything else triggers an early return asking for that slot to be
/// executed. `$parent` appears twice in the expansion.
///
/// NOTE(review): the name suggests the slot holds the validity child; confirm against
/// callers.
#[macro_export]
macro_rules! require_validity {
($parent:expr, $idx:expr) => {
$crate::require_opt_child!(
$parent,
$parent.slots()[$idx].as_ref(),
$idx => $crate::arrays::Bool
);
};
}
/// Extension trait for creating an [`ExecutionCtx`] from a session.
pub trait VortexSessionExecute {
/// Creates a new [`ExecutionCtx`] for this session.
fn create_execution_ctx(&self) -> ExecutionCtx;
}
impl VortexSessionExecute for VortexSession {
    /// Creates a fresh [`ExecutionCtx`] that owns a clone of this session.
    fn create_execution_ctx(&self) -> ExecutionCtx {
        let session = self.clone();
        ExecutionCtx::new(session)
    }
}