use std::marker::PhantomData;
use crate::communication::{Push, Pull, Data};
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
use crate::worker::AsWorker;
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
use super::{Bundle, Message};
use crate::logging::TimelyLogger as Logger;
/// A contract describing how to build the two endpoints of a channel that carries `Bundle<T, D>`.
///
/// Implementors choose the concrete pusher and puller types and construct both in `connect`.
pub trait ParallelizationContract<T: 'static, D: 'static> {
/// Endpoint type used to send bundles into the channel.
type Pusher: Push<Bundle<T, D>>+'static;
/// Endpoint type used to receive bundles from the channel.
type Puller: Pull<Bundle<T, D>>+'static;
/// Consumes the contract and returns the `(pusher, puller)` pair for one channel.
///
/// `identifier` and `address` are forwarded to the allocator by the implementations in this file,
/// which also use `identifier` as the channel id in logged events. `logging` is an optional
/// logger handed to the endpoints.
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
}
/// Contract whose `connect` obtains its endpoints from `allocator.pipeline`, using the local
/// worker index as both the source and the target of the logging pusher.
pub struct Pipeline;
impl<T: 'static, D: 'static> ParallelizationContract<T, D> for Pipeline {
    type Pusher = LogPusher<T, D, ThreadPusher<Bundle<T, D>>>;
    type Puller = LogPuller<T, D, ThreadPuller<Bundle<T, D>>>;

    /// Builds the pipeline endpoints for channel `identifier` and wraps both in logging adapters.
    ///
    /// The pusher is created with the local worker index as both its source and its target.
    /// `logging` is shared between the two endpoints; when it is `None` neither endpoint logs.
    fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
        let (pusher, puller) = allocator.pipeline::<Message<T, D>>(identifier, address);
        // Read the worker index once; it is used as source, target and puller index.
        let index = allocator.index();
        (
            LogPusher::new(pusher, index, index, identifier, logging.clone()),
            // Last use of `logging`: hand over ownership instead of cloning again.
            LogPuller::new(puller, index, identifier, logging),
        )
    }
}
/// Contract that carries a user-supplied function from a record (`&D`) to a `u64`.
///
/// The function is handed to the exchange pusher by `connect` (presumably to pick the
/// destination of each record -- confirm against the pusher implementation). `D` only appears
/// in the function's signature, so a `PhantomData<D>` marker ties it to the struct.
pub struct Exchange<D, F>
where
    F: FnMut(&D) -> u64 + 'static,
{
    hash_func: F,
    phantom: PhantomData<D>,
}

impl<D, F> Exchange<D, F>
where
    F: FnMut(&D) -> u64,
{
    /// Creates an `Exchange` contract that stores `func` unchanged.
    pub fn new(func: F) -> Self {
        Self {
            hash_func: func,
            phantom: PhantomData,
        }
    }
}
impl<T: Eq+Data+Clone, D: Data+Clone, F: FnMut(&D)->u64+'static> ParallelizationContract<T, D> for Exchange<D, F> {
    type Pusher = Box<dyn Push<Bundle<T, D>>>;
    type Puller = Box<dyn Pull<Bundle<T, D>>>;

    /// Allocates channel `identifier` and returns boxed, logging-enabled endpoints.
    ///
    /// Every sender returned by the allocator is wrapped in a `LogPusher` whose target is the
    /// sender's position in the returned list. The wrapped senders and the stored function are
    /// then combined into a single exchange pusher. The receiver is wrapped in a `LogPuller`
    /// tagged with the local worker index.
    fn connect<A: AsWorker>(mut self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
        let (senders, receiver) = allocator.allocate::<Message<T, D>>(identifier, address);
        // The local index is the same for every sender, so read it once up front.
        let index = allocator.index();
        let senders = senders
            .into_iter()
            .enumerate()
            .map(|(target, sender)| LogPusher::new(sender, index, target, identifier, logging.clone()))
            .collect::<Vec<_>>();
        (
            Box::new(ExchangePusher::new(senders, move |_, d| (self.hash_func)(d))),
            // Last use of `logging`: hand over ownership instead of cloning again.
            Box::new(LogPuller::new(receiver, index, identifier, logging)),
        )
    }
}
/// Wrapper around a pusher that stamps each outgoing bundle with a sequence number and source,
/// optionally logs a send event, and then forwards the bundle to the wrapped pusher.
pub struct LogPusher<T, D, P: Push<Bundle<T, D>>> {
/// Wrapped pusher that receives every call to `push`.
pusher: P,
/// Channel identifier reported in logged events.
channel: usize,
/// Number of non-empty pushes seen so far; the next sequence number to assign.
counter: usize,
/// Source index written into each message's `from` field and reported in logged events.
source: usize,
/// Target index reported in logged events.
target: usize,
/// Marker tying the otherwise unused `T` and `D` parameters to the struct.
phantom: ::std::marker::PhantomData<(T, D)>,
/// Optional logger; no events are emitted when this is `None`.
logging: Option<Logger>,
}
impl<T, D, P: Push<Bundle<T, D>>> LogPusher<T, D, P> {
pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
LogPusher {
pusher,
channel,
counter: 0,
source,
target,
phantom: ::std::marker::PhantomData,
logging,
}
}
}
impl<T, D, P> Push<Bundle<T, D>> for LogPusher<T, D, P>
where
    P: Push<Bundle<T, D>>,
{
    /// Stamps and logs a present bundle, then forwards `pair` to the wrapped pusher.
    ///
    /// A `None` value is forwarded untouched: it neither advances the sequence counter nor
    /// produces a log event.
    #[inline]
    fn push(&mut self, pair: &mut Option<Bundle<T, D>>) {
        if let Some(bundle) = pair {
            // This bundle takes the current counter value; the counter then advances.
            let seq_no = self.counter;
            self.counter += 1;

            // Stamp the message only when `if_mut` grants mutable access to it.
            if let Some(message) = bundle.if_mut() {
                message.seq = seq_no;
                message.from = self.source;
            }

            if let Some(logger) = self.logging.as_ref() {
                logger.log(crate::logging::MessagesEvent {
                    is_send: true,
                    channel: self.channel,
                    source: self.source,
                    target: self.target,
                    seq_no,
                    length: bundle.data.len(),
                });
            }
        }

        self.pusher.push(pair);
    }
}
/// Wrapper around a puller that optionally logs a receive event for every bundle it pulls.
pub struct LogPuller<T, D, P: Pull<Bundle<T, D>>> {
/// Wrapped puller that supplies the bundles.
puller: P,
/// Channel identifier reported in logged events.
channel: usize,
/// Index reported as the `target` of logged events.
index: usize,
/// Marker tying the otherwise unused `T` and `D` parameters to the struct.
phantom: ::std::marker::PhantomData<(T, D)>,
/// Optional logger; no events are emitted when this is `None`.
logging: Option<Logger>,
}
impl<T, D, P: Pull<Bundle<T, D>>> LogPuller<T, D, P> {
pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
LogPuller {
puller,
channel,
index,
phantom: ::std::marker::PhantomData,
logging,
}
}
}
impl<T, D, P> Pull<Bundle<T, D>> for LogPuller<T, D, P>
where
    P: Pull<Bundle<T, D>>,
{
    /// Pulls from the wrapped puller and returns its result unchanged.
    ///
    /// A receive event is logged only when a bundle is present and a logger is configured.
    /// The event's source and sequence number are read from the bundle itself.
    #[inline]
    fn pull(&mut self) -> &mut Option<Bundle<T, D>> {
        let pulled = self.puller.pull();

        // Shared reborrows suffice for logging and leave `pulled` free to be returned.
        if let (Some(logger), Some(bundle)) = (&self.logging, &*pulled) {
            logger.log(crate::logging::MessagesEvent {
                is_send: false,
                channel: self.channel,
                source: bundle.from,
                target: self.index,
                seq_no: bundle.seq,
                length: bundle.data.len(),
            });
        }

        pulled
    }
}