use std::env::VarError;
use std::fmt;
use std::fmt::Display;
use std::sync::LazyLock;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_error::vortex_panic;
use vortex_session::VortexSession;
use crate::AnyCanonical;
use crate::ArrayRef;
use crate::Canonical;
use crate::IntoArray;
use crate::array::ArrayId;
use crate::builders::ArrayBuilder;
use crate::builders::builder_with_capacity_in;
use crate::dtype::DType;
use crate::matcher::Matcher;
use crate::memory::HostAllocatorRef;
use crate::memory::MemorySessionExt;
use crate::optimizer::ArrayOptimizer;
use crate::stats::ArrayStats;
use crate::stats::StatsSet;
/// Upper bound on the number of loop iterations [`ArrayRef::execute_until`] may perform.
///
/// The value is read once from the `VORTEX_MAX_ITERATIONS` environment variable and cached
/// for the lifetime of the process. When the variable is unset the limit is `2 << 21`
/// (4_194_304).
///
/// # Panics
/// Panics if `VORTEX_MAX_ITERATIONS` is set but is not valid unicode, or does not parse as
/// a `usize`.
pub(crate) fn max_iterations() -> usize {
    static MAX_ITERATIONS: LazyLock<usize> = LazyLock::new(|| {
        let raw = match std::env::var("VORTEX_MAX_ITERATIONS") {
            Err(VarError::NotPresent) => return 2 << 21,
            Err(VarError::NotUnicode(_)) => {
                vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
            }
            Ok(raw) => raw,
        };
        match raw.parse::<usize>() {
            Ok(limit) => limit,
            Err(e) => vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}"),
        }
    });
    *MAX_ITERATIONS
}
/// Conversion of an [`ArrayRef`] into some executed representation `Self`.
///
/// How much work "executing" entails is up to the implementor. For example, the
/// `ArrayRef` implementation in this module performs a single rewrite or encoding step.
pub trait Executable: Sized {
    /// Executes `array` within `ctx`, producing `Self`.
    ///
    /// # Errors
    /// Returns an error if execution of `array` fails.
    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
}
#[expect(clippy::same_name_method)]
impl ArrayRef {
    /// Executes this array into `E` via `E`'s [`Executable`] implementation.
    ///
    /// # Errors
    /// Propagates whatever error `E::execute` returns.
    pub fn execute<E: Executable>(self, ctx: &mut ExecutionCtx) -> VortexResult<E> {
        E::execute(self, ctx)
    }

    /// Identical to [`ArrayRef::execute`]; `_name` is accepted but currently unused.
    ///
    /// # Errors
    /// Propagates whatever error `E::execute` returns.
    pub fn execute_as<E: Executable>(
        self,
        _name: &'static str,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<E> {
        E::execute(self, ctx)
    }

    /// Executes this array until it matches `M` (or becomes canonical) and returns it.
    ///
    /// Instead of recursing into children, this drives an explicit stack of
    /// [`StackFrame`]s. Each loop iteration does exactly one of the following:
    ///
    /// 1. If the focused array satisfies the active done-predicate, or is canonical:
    ///    - The active predicate is the top frame's predicate, or `M::matches` at the root.
    ///    - With an empty stack, the array is returned.
    ///    - Otherwise it is written back into its parent's slot and the parent is refocused.
    /// 2. If no builder is pending and a frame is on the stack, the focused child may
    ///    `execute_parent` that frame's parent. On success the rewritten parent replaces
    ///    the frame.
    /// 3. If no builder is pending, any child of the focused array may `execute_parent` it.
    /// 4. Otherwise one encoding step runs and its [`ExecutionStep`] is applied:
    ///    - `ExecuteSlot` pushes a frame and focuses on the child.
    ///    - `AppendChild` appends the child into a lazily created builder.
    ///    - `Done` finalizes the result.
    ///
    /// # Errors
    /// Bails if `max_iterations()` iterations run without finishing. Propagates errors
    /// from the rewrite, execution, slot and builder operations above.
    pub fn execute_until<M: Matcher>(self, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
        let mut current_array = self;
        // Builder collecting children appended via `ExecutionStep::AppendChild` for the array
        // in focus. Builders of suspended parents are parked in their stack frames.
        let mut current_builder: Option<Box<dyn ArrayBuilder>> = None;
        let mut stack: Vec<StackFrame> = Vec::new();
        let max_iterations = max_iterations();

        for _ in 0..max_iterations {
            // A child pushed by `ExecuteSlot` is finished according to its frame's predicate.
            // The root array is finished once it matches `M`.
            let is_done = stack
                .last()
                .map_or(M::matches as DonePredicate, |frame| frame.done);
            // NOTE(review): this check does not look at `current_builder`. If a builder were
            // pending here, popping a frame would replace (drop) it. The debug_assert below
            // only covers the root case — confirm this cannot happen for non-root frames.
            if is_done(&current_array) || AnyCanonical::matches(&current_array) {
                match stack.pop() {
                    None => {
                        debug_assert!(
                            current_builder.is_none(),
                            "root activation should not retain a builder"
                        );
                        ctx.log(format_args!("-> {}", current_array));
                        return Ok(current_array);
                    }
                    Some(frame) => {
                        (current_array, current_builder) = pop_frame(frame, current_array)?;
                        continue;
                    }
                }
            }

            // Let the focused child execute its suspended parent directly. On success the
            // write-back into the parent's slot is skipped entirely.
            if current_builder.is_none()
                && let Some(frame) = stack.last()
                && let Some(result) =
                    current_array.execute_parent(&frame.parent_array, frame.slot_idx, ctx)?
            {
                ctx.log(format_args!(
                    "execute_parent (stack) rewrote {} -> {}",
                    current_array, result
                ));
                let frame = stack.pop().vortex_expect("just peeked");
                current_array = result.optimize_ctx(ctx.session())?;
                current_builder = frame.parent_builder;
                continue;
            }

            // Let any child of the focused array execute it.
            if current_builder.is_none()
                && let Some(rewritten) = try_execute_parent(&current_array, ctx)?
            {
                ctx.log(format_args!(
                    "execute_parent rewrote {} -> {}",
                    current_array, rewritten
                ));
                current_array = rewritten.optimize_ctx(ctx.session())?;
                continue;
            }

            // Capture what the step's output is checked against / inherits, before the
            // encoding step takes ownership of the array.
            let expected_len = current_array.len();
            let expected_dtype = current_array.dtype().clone();
            let stats = current_array.statistics().to_array_stats();
            let encoding_id = current_array.encoding_id();
            let (array, step) = current_array.execute_encoding_unchecked(ctx)?.into_parts();
            match step {
                ExecutionStep::ExecuteSlot(i, done) => {
                    // SAFETY: `i` is the slot index that `array`'s own encoding just returned.
                    // NOTE(review): confirm this satisfies `take_slot_unchecked`'s contract.
                    let (parent, child) = unsafe { array.take_slot_unchecked(i) }?;
                    ctx.log(format_args!(
                        "ExecuteSlot({i}): pushing {}, focusing on {}",
                        parent, child
                    ));
                    stack.push(StackFrame {
                        parent_array: parent,
                        // `take` also leaves `current_builder` empty for the child.
                        parent_builder: current_builder.take(),
                        slot_idx: i,
                        done,
                        original_dtype: child.dtype().clone(),
                        original_len: child.len(),
                    });
                    current_array = child;
                }
                ExecutionStep::AppendChild(i) => {
                    // The builder is created on first use and sized for the whole parent.
                    let builder = current_builder.get_or_insert_with(|| {
                        builder_with_capacity_in(ctx.allocator(), array.dtype(), array.len())
                    });
                    // SAFETY: `i` is the slot index that `array`'s own encoding just returned.
                    // NOTE(review): confirm this satisfies `take_slot_unchecked`'s contract.
                    let (parent, child) = unsafe { array.take_slot_unchecked(i) }?;
                    ctx.log(format_args!(
                        "AppendChild({i}): appending {} into builder",
                        child
                    ));
                    child.append_to_builder(&mut **builder, ctx)?;
                    current_array = parent;
                }
                ExecutionStep::Done => {
                    ctx.log(format_args!("Done: {}", array));
                    (current_array, current_builder) = finalize_done(
                        array,
                        current_builder,
                        expected_len,
                        expected_dtype,
                        stats,
                        encoding_id,
                    )?;
                }
            }
        }

        vortex_bail!(
            "Exceeded maximum execution iterations ({}) while executing array",
            max_iterations,
        )
    }
}
/// A suspended parent on the explicit execution stack of `ArrayRef::execute_until`.
///
/// A frame is pushed when an encoding step returns `ExecutionStep::ExecuteSlot`. It is
/// popped in one of two ways:
/// - by `pop_frame`, when the child is finished and written back into `slot_idx`;
/// - when the child rewrites the parent via `execute_parent`.
struct StackFrame {
    /// The parent array as returned by `take_slot_unchecked` when the child was taken out.
    parent_array: ArrayRef,
    /// The builder that was pending for the parent when the frame was pushed.
    /// It is restored when the frame is popped.
    parent_builder: Option<Box<dyn ArrayBuilder>>,
    /// Slot index in `parent_array` that the executed child is written back into.
    slot_idx: usize,
    /// Predicate deciding when the child has been executed far enough.
    done: DonePredicate,
    /// The child's dtype at push time; `pop_frame` debug-asserts it is unchanged.
    original_dtype: DType,
    /// The child's length at push time; `pop_frame` debug-asserts it is unchanged.
    original_len: usize,
}
/// State threaded through array execution.
///
/// Holds the [`VortexSession`], which also supplies the host allocator returned by
/// [`ExecutionCtx::allocator`]. Debug builds additionally carry:
/// - an id;
/// - a log of operations recorded through [`ExecutionCtx::log`], emitted at DEBUG level when
///   the context is dropped.
#[derive(Debug, Clone)]
pub struct ExecutionCtx {
    session: VortexSession,
    /// Debug-only identifier taken from a process-wide counter in [`ExecutionCtx::new`].
    ///
    /// NOTE(review): the derived `Clone` copies both `id` and `ops`. A clone therefore shares
    /// its original's id, and each copy dumps the same recorded ops on drop — confirm intended.
    #[cfg(debug_assertions)]
    id: usize,
    /// Debug-only log lines recorded by [`ExecutionCtx::log`] while DEBUG tracing is enabled.
    #[cfg(debug_assertions)]
    ops: Vec<String>,
}
impl ExecutionCtx {
    /// Creates a new execution context bound to `session`.
    ///
    /// In debug builds the context is tagged with an id taken from a process-wide atomic
    /// counter, and starts with an empty operation log.
    pub fn new(session: VortexSession) -> Self {
        #[cfg(debug_assertions)]
        let id = {
            use std::sync::atomic::AtomicUsize;
            use std::sync::atomic::Ordering;
            static NEXT_EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
            NEXT_EXEC_CTX_ID.fetch_add(1, Ordering::Relaxed)
        };

        Self {
            session,
            #[cfg(debug_assertions)]
            id,
            #[cfg(debug_assertions)]
            ops: Vec::new(),
        }
    }

    /// The session this context executes against.
    pub fn session(&self) -> &VortexSession {
        let Self { session, .. } = self;
        session
    }

    /// The host allocator provided by this context's session.
    pub fn allocator(&self) -> HostAllocatorRef {
        self.session().allocator()
    }

    /// Records an execution log message.
    ///
    /// This only has an effect in debug builds, and only while DEBUG-level tracing is
    /// enabled. In that case the message is:
    /// - emitted at TRACE level, prefixed with this context's id;
    /// - stored so it can be dumped when the context is dropped.
    ///
    /// In release builds the message is discarded.
    pub fn log(&mut self, msg: fmt::Arguments<'_>) {
        #[cfg(not(debug_assertions))]
        let _ = msg;

        #[cfg(debug_assertions)]
        if tracing::enabled!(tracing::Level::DEBUG) {
            let formatted = format!(" - {msg}");
            tracing::trace!("exec[{}]: {formatted}", self.id);
            self.ops.push(formatted);
        }
    }
}
/// Formats the context as `exec[<id>]` in debug builds and as plain `exec` otherwise.
impl Display for ExecutionCtx {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exactly one of the two statements below survives cfg expansion, so the early
        // `return` in debug builds keeps the release-only tail expression from being reached.
        #[cfg(debug_assertions)]
        return write!(f, "exec[{}]", self.id);
        #[cfg(not(debug_assertions))]
        write!(f, "exec")
    }
}
/// Debug builds only: when DEBUG tracing is enabled, emits every operation recorded through
/// `ExecutionCtx::log` as a single multi-line DEBUG event on drop.
#[cfg(debug_assertions)]
impl Drop for ExecutionCtx {
    fn drop(&mut self) {
        /// Lazily renders the recorded operations one per line, so no joined `String` is built.
        struct OpsTrace<'a>(&'a [String]);

        impl Display for OpsTrace<'_> {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                let mut lines = self.0.iter();
                if let Some(first) = lines.next() {
                    f.write_str(first)?;
                    for line in lines {
                        f.write_str("\n")?;
                        f.write_str(line)?;
                    }
                }
                Ok(())
            }
        }

        if self.ops.is_empty() || !tracing::enabled!(tracing::Level::DEBUG) {
            return;
        }
        tracing::debug!("exec[{}] trace:\n{}", self.id, OpsTrace(&self.ops));
    }
}
impl Executable for ArrayRef {
    /// Performs a single execution step on `array`.
    ///
    /// The first applicable option wins:
    /// 1. A canonical array is returned, re-wrapped through [`Canonical`].
    /// 2. `reduce` rewrites the array itself.
    /// 3. A child's `reduce_parent` rewrites the array.
    /// 4. A child's `execute_parent` rewrites the array.
    /// 5. The array's own encoding executes once, and the returned [`ExecutionStep`] is
    ///    resolved:
    ///    - `Done` returns the array.
    ///    - `ExecuteSlot` executes that child one step and writes it back.
    ///    - `AppendChild` materializes the array through a fresh builder.
    ///
    /// Rewrites produced by options 2-4 inherit the input array's statistics.
    fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
        if let Some(canonical) = array.as_opt::<AnyCanonical>() {
            ctx.log(format_args!("-> canonical {}", array));
            return Ok(Canonical::from(canonical).into_array());
        }

        if let Some(reduced) = array.reduce()? {
            ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
            reduced.statistics().inherit_from(array.statistics());
            return Ok(reduced);
        }

        // Metadata-only rewrites offered by each present child.
        for (slot_idx, child) in array
            .slots()
            .iter()
            .enumerate()
            .filter_map(|(idx, slot)| Some((idx, slot.as_ref()?)))
        {
            let Some(reduced_parent) = child.reduce_parent(&array, slot_idx)? else {
                continue;
            };
            ctx.log(format_args!(
                "reduce_parent: slot[{}]({}) rewrote {} -> {}",
                slot_idx,
                child.encoding_id(),
                array,
                reduced_parent
            ));
            reduced_parent.statistics().inherit_from(array.statistics());
            return Ok(reduced_parent);
        }

        // Executing rewrites offered by each present child.
        for (slot_idx, child) in array
            .slots()
            .iter()
            .enumerate()
            .filter_map(|(idx, slot)| Some((idx, slot.as_ref()?)))
        {
            let Some(executed_parent) = child.execute_parent(&array, slot_idx, ctx)? else {
                continue;
            };
            ctx.log(format_args!(
                "execute_parent: slot[{}]({}) rewrote {} -> {}",
                slot_idx,
                child.encoding_id(),
                array,
                executed_parent
            ));
            executed_parent
                .statistics()
                .inherit_from(array.statistics());
            return Ok(executed_parent);
        }

        ctx.log(format_args!("executing {}", array));
        let (array, step) = array.execute_encoding(ctx)?.into_parts();
        match step {
            ExecutionStep::Done => {
                ctx.log(format_args!("-> {}", array));
                Ok(array)
            }
            ExecutionStep::ExecuteSlot(slot_idx, _) => {
                let child = array.slots()[slot_idx]
                    .clone()
                    .vortex_expect("valid slot index");
                let executed_child = child.execute::<ArrayRef>(ctx)?;
                array.with_slot(slot_idx, executed_child)
            }
            ExecutionStep::AppendChild(_) => {
                let empty = builder_with_capacity_in(ctx.allocator(), array.dtype(), array.len());
                let mut filled = execute_into_builder(array, empty, ctx)?;
                Ok(filled.finish())
            }
        }
    }
}
/// Appends `array` into `builder` via `append_to_builder`, then hands the builder back.
///
/// # Errors
/// Propagates any error from `append_to_builder`; the builder is dropped in that case.
pub fn execute_into_builder(
    array: ArrayRef,
    mut builder: Box<dyn ArrayBuilder>,
    ctx: &mut ExecutionCtx,
) -> VortexResult<Box<dyn ArrayBuilder>> {
    array.append_to_builder(&mut *builder, ctx)?;
    Ok(builder)
}
fn pop_frame(
frame: StackFrame,
child: ArrayRef,
) -> VortexResult<(ArrayRef, Option<Box<dyn ArrayBuilder>>)> {
debug_assert_eq!(
child.dtype(),
&frame.original_dtype,
"child dtype changed during execution"
);
debug_assert_eq!(
child.len(),
frame.original_len,
"child len changed during execution"
);
let parent_array = unsafe { frame.parent_array.put_slot_unchecked(frame.slot_idx, child) }?;
Ok((parent_array, frame.parent_builder))
}
/// Completes an encoding step that reported [`ExecutionStep::Done`].
///
/// Choosing the output:
/// - If a builder was collecting appended children, its finished array is the output and
///   `result` is discarded.
/// - Otherwise `result` itself is the output.
///
/// In debug builds the output's length and dtype are checked against the values captured
/// before the step ran. The statistics captured before the step are then written onto
/// the output. The returned builder is always `None`, because any pending one was consumed.
///
/// # Errors
/// In debug builds, errors if the output length or dtype does not match expectations.
fn finalize_done(
    result: ArrayRef,
    builder: Option<Box<dyn ArrayBuilder>>,
    expected_len: usize,
    expected_dtype: DType,
    stats: ArrayStats,
    encoding_id: ArrayId,
) -> VortexResult<(ArrayRef, Option<Box<dyn ArrayBuilder>>)> {
    let output = match builder {
        Some(mut pending) => pending.finish(),
        None => result,
    };

    if cfg!(debug_assertions) {
        vortex_ensure!(
            output.len() == expected_len,
            "Result length mismatch for {:?}",
            encoding_id
        );
        vortex_ensure!(
            output.dtype() == &expected_dtype,
            "Executed canonical dtype mismatch for {:?}",
            encoding_id
        );
    }

    let carried_stats = StatsSet::from(stats);
    output.statistics().set_iter(carried_stats.into_iter());
    Ok((output, None))
}
/// Gives each present child of `array` a chance to execute `array` on its behalf.
///
/// Returns the first successful rewrite, with `array`'s statistics inherited. Returns
/// `None` when no child's `execute_parent` applies.
fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<ArrayRef>> {
    for (slot_idx, child) in array
        .slots()
        .iter()
        .enumerate()
        .filter_map(|(idx, slot)| slot.as_ref().map(|child| (idx, child)))
    {
        if let Some(rewritten) = child.execute_parent(array, slot_idx, ctx)? {
            rewritten.statistics().inherit_from(array.statistics());
            return Ok(Some(rewritten));
        }
    }
    Ok(None)
}
/// Predicate deciding whether an array has been executed far enough.
///
/// Carried by [`ExecutionStep::ExecuteSlot`]. In `ArrayRef::execute_until`, canonical arrays
/// are treated as done regardless of this predicate.
pub type DonePredicate = fn(&ArrayRef) -> bool;
/// What the executor should do with the array returned from one encoding execution step.
pub enum ExecutionStep {
    /// Execute the child in slot `.0`, then put it back into the parent.
    ///
    /// In `ArrayRef::execute_until` the child runs until predicate `.1` holds (or the child
    /// is canonical) before being written back.
    ExecuteSlot(usize, DonePredicate),
    /// Take the child in slot `.0` and append it into the parent's output builder.
    AppendChild(usize),
    /// The returned array is the result of this step.
    Done,
}
impl fmt::Debug for ExecutionStep {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ExecutionStep::ExecuteSlot(idx, _) => f.debug_tuple("ExecuteSlot").field(idx).finish(),
ExecutionStep::AppendChild(idx) => f.debug_tuple("AppendChild").field(idx).finish(),
ExecutionStep::Done => write!(f, "Done"),
}
}
}
/// The output of one encoding execution step: the resulting array and the next step.
pub struct ExecutionResult {
    /// The array produced by the step. It is the final result only when `step` is `Done`.
    array: ArrayRef,
    /// What the executor should do next with `array`.
    step: ExecutionStep,
}
impl ExecutionResult {
    /// The step finished; `result` is its outcome.
    pub fn done(result: impl IntoArray) -> Self {
        let array = result.into_array();
        Self {
            step: ExecutionStep::Done,
            array,
        }
    }

    /// Ask the executor to run the child in `slot_idx` of `array` until it matches `M`.
    pub fn execute_slot<M: Matcher>(array: impl IntoArray, slot_idx: usize) -> Self {
        let array = array.into_array();
        Self {
            step: ExecutionStep::ExecuteSlot(slot_idx, M::matches),
            array,
        }
    }

    /// Ask the executor to append the child in `slot_idx` of `array` into the output builder.
    pub fn append_child(array: impl IntoArray, slot_idx: usize) -> Self {
        let array = array.into_array();
        Self {
            step: ExecutionStep::AppendChild(slot_idx),
            array,
        }
    }

    /// The array produced by this step.
    pub fn array(&self) -> &ArrayRef {
        let Self { array, .. } = self;
        array
    }

    /// The step the executor should take next.
    pub fn step(&self) -> &ExecutionStep {
        let Self { step, .. } = self;
        step
    }

    /// Splits the result into its array and step.
    pub fn into_parts(self) -> (ArrayRef, ExecutionStep) {
        let Self { array, step } = self;
        (array, step)
    }
}
impl fmt::Debug for ExecutionResult {
    /// Formats as `ExecutionResult { array, step }`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { array, step } = self;
        let mut out = f.debug_struct("ExecutionResult");
        out.field("array", array);
        out.field("step", step);
        out.finish()
    }
}
/// Requires `$child` to match the matcher `$M` before the caller continues.
///
/// - If `$child` does not match `$M`, the macro early-returns
///   `Ok(ExecutionResult::execute_slot::<$M>($parent.clone(), $idx))`. This asks the executor
///   to run slot `$idx` until it matches `$M`.
/// - Otherwise the macro evaluates to `$parent`.
///
/// Usage notes:
/// - It contains `return Ok(...)`, so it must be used inside a function returning a `Result`
///   of `ExecutionResult`.
/// - `$parent` is expanded twice (cloned on the return path, yielded as-is otherwise), so
///   pass a plain binding rather than an expression with side effects.
#[macro_export]
macro_rules! require_child {
    ($parent:expr, $child:expr, $idx:expr => $M:ty) => {{
        if !$child.is::<$M>() {
            return Ok($crate::ExecutionResult::execute_slot::<$M>(
                $parent.clone(),
                $idx,
            ));
        }
        $parent
    }};
}
/// Requires an optional child, if present, to match the matcher `$M`.
///
/// `$child_opt` is an `Option` of a child reference.
/// - If it is `Some` and the child does not match `$M`, the macro early-returns
///   `Ok(ExecutionResult::execute_slot::<$M>($parent, $idx))`. Here `$parent` is moved,
///   not cloned.
/// - `None` or a matching child falls through.
#[macro_export]
macro_rules! require_opt_child {
    ($parent:expr, $child_opt:expr, $idx:expr => $M:ty) => {
        if $child_opt.is_some_and(|child| !child.is::<$M>()) {
            return Ok($crate::ExecutionResult::execute_slot::<$M>($parent, $idx));
        }
    };
}
/// Requires the optional patch children of `$parent` to be `Primitive` arrays when present.
///
/// Applies `require_opt_child!` to three slots of `$parent.slots()`, in this order:
/// 1. indices;
/// 2. values;
/// 3. chunk offsets.
///
/// The first present-but-non-primitive child triggers an early return asking the executor
/// to run that slot. `$parent` is expanded several times, so pass a plain binding.
#[macro_export]
macro_rules! require_patches {
    ($parent:expr, $indices_slot:expr, $values_slot:expr, $chunk_offsets_slot:expr) => {
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$indices_slot].as_ref(),
            $indices_slot => $crate::arrays::Primitive
        );
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$values_slot].as_ref(),
            $values_slot => $crate::arrays::Primitive
        );
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$chunk_offsets_slot].as_ref(),
            $chunk_offsets_slot => $crate::arrays::Primitive
        );
    };
}
/// Requires the optional validity child in slot `$idx` of `$parent`, if present, to be a
/// `Bool` array.
///
/// Otherwise the macro early-returns, asking the executor to run that slot.
#[macro_export]
macro_rules! require_validity {
    ($parent:expr, $idx:expr) => {
        $crate::require_opt_child!(
            $parent,
            $parent.slots()[$idx].as_ref(),
            $idx => $crate::arrays::Bool
        );
    };
}
/// Extension trait for creating an [`ExecutionCtx`] directly from a session.
pub trait VortexSessionExecute {
    /// Creates a fresh execution context that owns a clone of this session.
    fn create_execution_ctx(&self) -> ExecutionCtx;
}

impl VortexSessionExecute for VortexSession {
    fn create_execution_ctx(&self) -> ExecutionCtx {
        let session = Self::clone(self);
        ExecutionCtx::new(session)
    }
}