use std::fmt::{Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
pub use batcher::BatchMode;
pub(crate) use batcher::*;
pub(crate) use graph_generator::*;
pub(crate) use next_strategy::*;
pub(crate) use structure::*;
use crate::operator::iteration::IterationStateLock;
use crate::operator::Operator;
use crate::scheduler::BlockId;
use crate::CoordUInt;
mod batcher;
mod graph_generator;
mod next_strategy;
pub mod structure;
/// A chain of operators bundled with its per-block metadata: identifier,
/// batching mode, iteration context and scheduling constraints.
///
/// `OperatorChain` is the (usually nested) operator type produced by
/// successive calls to `add_operator`.
#[derive(Debug)]
pub(crate) struct Block<OperatorChain>
where
OperatorChain: Operator,
{
/// Identifier of this block.
pub(crate) id: BlockId,
/// The chain of operators held by this block.
pub(crate) operators: OperatorChain,
/// Batching mode configured for this block.
pub(crate) batch_mode: BatchMode,
/// Iteration state locks associated with this block. `iteration_ctx()`
/// exposes the addresses of these `Arc`s as type-erased pointers.
pub(crate) iteration_ctx: Vec<Arc<IterationStateLock>>,
/// Set to `false` by both `Block::new` and `add_operator`.
/// NOTE(review): the code that sets it to `true` and reads it is not in this
/// file; confirm its exact meaning there (presumably related to `next_strategy`).
pub(crate) is_only_one_strategy: bool,
/// Scheduling constraints for this block (currently the replication policy).
pub(crate) scheduling: Scheduling,
}
/// Hand-written rather than derived: `#[derive(Clone)]` would add an extra
/// `OperatorChain: Clone` bound, while this impl only requires
/// `OperatorChain: Operator` (whose `clone` is used for the chain).
impl<OperatorChain> Clone for Block<OperatorChain>
where
    OperatorChain: Operator,
{
    /// Clones the operator chain, the iteration context and the scheduling
    /// constraints; the remaining `Copy` fields are copied from `self`.
    fn clone(&self) -> Self {
        Self {
            operators: self.operators.clone(),
            // Cloning the `Vec` only bumps the `Arc` reference counts; the
            // clone shares the same iteration state locks.
            iteration_ctx: self.iteration_ctx.clone(),
            scheduling: self.scheduling.clone(),
            // `id`, `batch_mode` and `is_only_one_strategy` are `Copy`.
            ..*self
        }
    }
}
impl<OperatorChain> Block<OperatorChain>
where
    OperatorChain: Operator,
{
    /// Consumes this block and returns a block whose operator chain is
    /// `get_operator` applied to the current chain.
    ///
    /// `id`, `batch_mode`, `iteration_ctx` and `scheduling` are carried over
    /// unchanged; `is_only_one_strategy` is reset to `false`.
    pub fn add_operator<Op2, GetOp>(self, get_operator: GetOp) -> Block<Op2>
    where
        Op2: Operator,
        GetOp: FnOnce(OperatorChain) -> Op2,
    {
        // The old flag value is intentionally discarded (`..`): the resulting
        // block always starts with `is_only_one_strategy == false`.
        let Self {
            id,
            operators,
            batch_mode,
            iteration_ctx,
            scheduling,
            ..
        } = self;
        let extended_chain = get_operator(operators);
        Block {
            id,
            operators: extended_chain,
            batch_mode,
            iteration_ctx,
            is_only_one_strategy: false,
            scheduling,
        }
    }
}
/// Scheduling constraints attached to a block.
///
/// Currently only carries the replication policy. The derived `Default` yields
/// `Replication::Unlimited`, the `#[default]` variant of `Replication`.
#[derive(Clone, Debug, Default)]
pub(crate) struct Scheduling {
/// Replication policy; the `replication` method updates it by intersecting
/// it with another policy (`Replication::intersect`).
pub(crate) replication: Replication,
}
/// Replication policy of a block: how many replicas of it may be created.
///
/// Two policies are combined with [`Replication::intersect`], whose precedence
/// is `One`, then `Host`, then `Limited` (smallest limit wins), then
/// `Unlimited`. A policy is applied to a candidate replica count with `clamp`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
pub enum Replication {
    /// No limit: `clamp(n)` returns `n` unchanged. This is the default policy.
    #[default]
    Unlimited,
    /// At most the given number of replicas: `clamp(n)` returns `min(n, limit)`.
    Limited(CoordUInt),
    /// `clamp(n)` returns `1`.
    /// NOTE(review): presumably "one replica per host", with the clamp applied
    /// per host — confirm against the scheduler.
    Host,
    /// `clamp(n)` returns `1`; presumably a single replica overall — confirm.
    One,
}

impl Replication {
    /// Creates the [`Replication::Unlimited`] policy.
    pub fn new_unlimited() -> Self {
        Self::Unlimited
    }

    /// Creates a [`Replication::Limited`] policy allowing at most `size` replicas.
    ///
    /// # Panics
    ///
    /// Panics if `size` is not greater than zero.
    pub fn new_limited(size: CoordUInt) -> Self {
        assert!(size > 0, "Replication limit must be greater than zero!");
        Self::Limited(size)
    }

    /// Creates the [`Replication::Host`] policy.
    pub fn new_host() -> Self {
        Self::Host
    }

    /// Creates the [`Replication::One`] policy.
    pub fn new_one() -> Self {
        Self::One
    }

    /// Returns `true` only for [`Replication::Unlimited`].
    pub fn is_unlimited(&self) -> bool {
        *self == Self::Unlimited
    }

    /// Combines two policies, keeping the one with higher precedence.
    ///
    /// Precedence: `One` beats everything, then `Host`, then `Limited` (two
    /// limits combine into the smaller one), and `Unlimited` only survives when
    /// both sides are `Unlimited`.
    ///
    /// NOTE(review): `Host` wins over any `Limited(n)`, including `Limited(1)`;
    /// confirm this is the intended combination.
    pub fn intersect(&self, rhs: Self) -> Self {
        match (*self, rhs) {
            (Self::One, _) | (_, Self::One) => Self::One,
            (Self::Host, _) | (_, Self::Host) => Self::Host,
            (Self::Limited(a), Self::Limited(b)) => Self::Limited(a.min(b)),
            (Self::Limited(limit), Self::Unlimited) | (Self::Unlimited, Self::Limited(limit)) => {
                Self::Limited(limit)
            }
            (Self::Unlimited, Self::Unlimited) => Self::Unlimited,
        }
    }

    /// Restricts the replica count `n` according to this policy.
    pub(crate) fn clamp(&self, n: CoordUInt) -> CoordUInt {
        match *self {
            Self::Unlimited => n,
            Self::Limited(limit) => limit.min(n),
            Self::Host | Self::One => 1,
        }
    }
}
impl<OperatorChain> Block<OperatorChain>
where
OperatorChain: Operator,
{
pub fn new(
id: BlockId,
operators: OperatorChain,
batch_mode: BatchMode,
iteration_ctx: Vec<Arc<IterationStateLock>>,
scheduling: Scheduling,
) -> Self {
Self {
id,
operators,
batch_mode,
iteration_ctx,
is_only_one_strategy: false,
scheduling,
}
}
pub(crate) fn iteration_ctx(&self) -> Vec<*const ()> {
self.iteration_ctx
.iter()
.map(|s| Arc::as_ptr(s) as *const ())
.collect()
}
}
impl<OperatorChain> Display for Block<OperatorChain>
where
    OperatorChain: Operator,
{
    /// Renders the block as the `Display` output of its operator chain.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let chain = &self.operators;
        write!(f, "{chain}")
    }
}
impl Scheduling {
    /// Combines the current replication policy with `replication` using
    /// `Replication::intersect` and stores the result.
    pub(crate) fn replication(&mut self, replication: Replication) {
        let combined = self.replication.intersect(replication);
        self.replication = combined;
    }
}
/// Hashes `item` with WyHash initialised from a fixed, hard-coded seed.
///
/// No random state is involved, so repeated calls on equal items (with the
/// same `Hash` implementation) return the same value.
pub fn group_by_hash<T: Hash>(item: &T) -> u64 {
    let seed = 0x0123_4567_89ab_cdef;
    let mut state = wyhash::WyHash::with_seed(seed);
    Hash::hash(item, &mut state);
    state.finish()
}
/// Hasher builder backed by `fxhash::FxBuildHasher`.
/// NOTE(review): the name suggests it is meant for maps keyed by coordinates — confirm at call sites.
pub type CoordHasherBuilder = fxhash::FxBuildHasher;
/// Hasher builder that creates `wyhash::WyHash` hashers through their `Default` impl.
/// NOTE(review): this uses WyHash's default seed, not the explicit seed used by
/// `group_by_hash`; confirm no caller expects the two to produce matching hashes.
pub type GroupHasherBuilder = core::hash::BuildHasherDefault<wyhash::WyHash>;