use std::any::Any;
use std::borrow::Borrow;
use std::hash::BuildHasher;
use std::hash::Hash;
use std::sync::Arc;
use std::sync::LazyLock;
use arc_swap::ArcSwap;
use vortex_error::VortexResult;
use vortex_session::Ref;
use vortex_session::SessionExt;
use vortex_session::SessionVar;
use vortex_session::registry::Id;
use vortex_utils::aliases::DefaultHashBuilder;
use vortex_utils::aliases::hash_map::HashMap;
use crate::ArrayRef;
use crate::ExecutionCtx;
use crate::array::VTable;
use crate::arrays::Struct;
use crate::arrays::struct_::compute::cast::struct_cast_execute_parent;
use crate::arrays::struct_::compute::rules::struct_cast_reduce_parent;
use crate::scalar_fn::ScalarFnVTable;
use crate::scalar_fn::fns::cast::Cast;
/// Shared, lazily-initialised hash builder used by `hash_fn_id` to turn a
/// `(parent, child)` id pair into the `u64` key of the kernel registries.
///
/// Registration and lookup both go through this one instance, so a given pair
/// maps to the same key every time within the process.
static FN_HASHER: LazyLock<DefaultHashBuilder> = LazyLock::new(DefaultHashBuilder::default);
/// Function-pointer signature of a "reduce parent" kernel.
///
/// The kernel receives a `child` array, its `parent` array and `child_idx`
/// (presumably the position of `child` among the parent's children — confirm
/// against callers). It returns `Ok(Some(array))` with a replacement array,
/// `Ok(None)` when it produces nothing, or an error.
///
/// Unlike [`ExecuteParentFn`], it is not handed an [`ExecutionCtx`].
pub type ReduceParentFn =
fn(child: &ArrayRef, parent: &ArrayRef, child_idx: usize) -> VortexResult<Option<ArrayRef>>;
/// Key type of the reduce-parent kernel registry.
///
/// A thin wrapper around the `u64` produced by hashing a `(parent, child)` id
/// pair. The derived `Hash`/`Eq` forward to the single wrapped field, which is
/// what makes the `Borrow<u64>` implementation below valid: a map keyed by
/// this type can be queried directly with a plain `&u64`.
#[repr(transparent)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ReduceParentFnId(u64);

impl From<u64> for ReduceParentFnId {
    /// Wraps a raw hash value as a registry key.
    fn from(raw: u64) -> Self {
        ReduceParentFnId(raw)
    }
}

impl Borrow<u64> for ReduceParentFnId {
    /// Exposes the wrapped hash so lookups can be performed with a `&u64`.
    fn borrow(&self) -> &u64 {
        let ReduceParentFnId(raw) = self;
        raw
    }
}
/// Function-pointer signature of an "execute parent" kernel.
///
/// Takes the same `child`, `parent` and `child_idx` arguments as
/// [`ReduceParentFn`], plus a mutable [`ExecutionCtx`]. It returns
/// `Ok(Some(array))` with a resulting array, `Ok(None)` when it produces
/// nothing, or an error.
pub type ExecuteParentFn = fn(
child: &ArrayRef,
parent: &ArrayRef,
child_idx: usize,
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<ArrayRef>>;
/// Key type of the execute-parent kernel registry.
///
/// A thin wrapper around the `u64` produced by hashing a `(parent, child)` id
/// pair. The derived `Hash`/`Eq` forward to the single wrapped field, which is
/// what makes the `Borrow<u64>` implementation below valid: a map keyed by
/// this type can be queried directly with a plain `&u64`.
#[repr(transparent)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ExecuteParentFnId(u64);

impl From<u64> for ExecuteParentFnId {
    /// Wraps a raw hash value as a registry key.
    fn from(raw: u64) -> Self {
        ExecuteParentFnId(raw)
    }
}

impl Borrow<u64> for ExecuteParentFnId {
    /// Exposes the wrapped hash so lookups can be performed with a `&u64`.
    fn borrow(&self) -> &u64 {
        let ExecuteParentFnId(raw) = self;
        raw
    }
}
/// Registry of parent kernels, looked up by a `(parent, child)` id pair.
///
/// Each registry is a map from the hashed id pair to the list of kernels
/// registered for it. The maps live behind `ArcSwap`: registration builds an
/// updated copy of the map and swaps it in, while lookups read the currently
/// published map, so every method takes `&self`.
#[derive(Debug)]
pub struct ArrayKernels {
/// Reduce-parent kernels, keyed by the hash of the `(parent, child)` ids.
reduce_parent: ArcSwap<HashMap<ReduceParentFnId, Arc<[ReduceParentFn]>>>,
/// Execute-parent kernels, keyed by the hash of the `(parent, child)` ids.
execute_parent: ArcSwap<HashMap<ExecuteParentFnId, Arc<[ExecuteParentFn]>>>,
}
impl Default for ArrayKernels {
fn default() -> ArrayKernels {
let this = Self::empty();
this.register_builtin_reduce_parent();
this.register_builtin_execute_parent();
this
}
}
impl ArrayKernels {
pub fn empty() -> Self {
Self {
reduce_parent: ArcSwap::from_pointee(HashMap::default()),
execute_parent: ArcSwap::from_pointee(HashMap::default()),
}
}
fn register_builtin_reduce_parent(&self) {
self.register_reduce_parent(
Cast.id(),
Struct.id(),
&[struct_cast_reduce_parent as ReduceParentFn],
);
}
fn register_builtin_execute_parent(&self) {
self.register_execute_parent(
Cast.id(),
Struct.id(),
&[struct_cast_execute_parent as ExecuteParentFn],
);
}
pub fn register_reduce_parent(&self, parent: Id, child: Id, fns: &[ReduceParentFn]) {
self.reduce_parent.rcu(move |registry| {
update_fns(registry.as_ref().clone(), hash_fn_id(parent, child), fns)
});
}
pub fn find_reduce_parent(&self, parent: Id, child: Id) -> Option<Arc<[ReduceParentFn]>> {
let id = hash_fn_id(parent, child);
self.reduce_parent.load().get(&id).cloned()
}
pub fn register_execute_parent(&self, parent: Id, child: Id, fns: &[ExecuteParentFn]) {
self.execute_parent.rcu(move |registry| {
update_fns(registry.as_ref().clone(), hash_fn_id(parent, child), fns)
});
}
pub fn find_execute_parent(&self, parent: Id, child: Id) -> Option<Arc<[ExecuteParentFn]>> {
let id = hash_fn_id(parent, child);
self.execute_parent.load().get(&id).cloned()
}
}
/// Collapses a `(parent, child)` id pair into the `u64` used as a registry key.
///
/// The pair is hashed as a tuple with the shared `FN_HASHER`, so argument
/// order matters.
///
/// NOTE(review): the registries are keyed by this hash alone, so two distinct
/// pairs that hashed to the same value would share one kernel list. Nothing
/// here detects that case.
fn hash_fn_id(parent: Id, child: Id) -> u64 {
    let pair = (parent, child);
    FN_HASHER.hash_one(pair)
}
/// Returns `existing` with `fns` registered under `id`.
///
/// If `id` already has an entry, the new functions are appended after the ones
/// already present; otherwise a fresh entry holding exactly `fns` is created.
/// An empty `fns` for a new `id` still creates an (empty) entry.
///
/// The map is taken and returned by value so the caller can build an updated
/// copy without touching the map it was cloned from.
fn update_fns<F: Clone, K: Borrow<u64> + Eq + Hash + From<u64>>(
    mut existing: HashMap<K, Arc<[F]>>,
    id: u64,
    fns: &[F],
) -> HashMap<K, Arc<[F]>> {
    // Take the old list out (if any) and build the combined list; `Arc<[F]>`
    // cannot grow in place, so a new allocation is needed either way.
    let merged: Arc<[F]> = match existing.remove(&id) {
        Some(previous) => previous.iter().chain(fns).cloned().collect(),
        None => Arc::from(fns),
    };
    existing.insert(K::from(id), merged);
    existing
}
/// Implements `SessionVar` for the kernel registry so it can be retrieved via
/// `SessionExt::get` (see `ArrayKernelsExt::kernels`).
impl SessionVar for ArrayKernels {
/// Returns `self` as `&dyn Any` (presumably so the session can downcast to
/// the concrete type — confirm against `vortex_session`).
fn as_any(&self) -> &dyn Any {
self
}
/// Mutable counterpart of `as_any`: returns `self` as `&mut dyn Any`.
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
}
/// Extension trait that adds a `kernels()` accessor to any `SessionExt` type.
pub trait ArrayKernelsExt: SessionExt {
/// Returns a reference to the [`ArrayKernels`] registry, obtained with
/// `self.get::<ArrayKernels>()`.
///
/// NOTE(review): what `get` does when no `ArrayKernels` has been stored yet
/// (for example, creating one via `Default`) is not visible here — confirm
/// against `vortex_session`.
fn kernels(&self) -> Ref<'_, ArrayKernels> {
self.get::<ArrayKernels>()
}
}
// Blanket impl: every `SessionExt` type gets `kernels()` without opting in.
impl<S: SessionExt> ArrayKernelsExt for S {}