use std::marker::PhantomData;
use timely_communication::{Allocate, Push, Pull, Data};
use timely_communication::allocator::Thread;
use timely_communication::allocator::thread::Pusher as ThreadPusher;
use timely_communication::allocator::thread::Puller as ThreadPuller;
use dataflow::channels::pushers::Exchange as ExchangePusher;
use dataflow::channels::{Message, Content};
use logging::Logger;
use abomonation::Abomonation;
/// Describes how a channel between a producer and a consumer is wired up.
///
/// An implementor is consumed by `connect`, which yields the sending and
/// receiving endpoints of the channel. `T` is the type paired with each
/// `Content<D>` batch (the implementations in this file name it `time` when
/// building messages) and `D` is the record type.
pub trait ParallelizationContract<T: 'static, D: 'static> {
/// Sending endpoint type; accepts `(T, Content<D>)` pairs.
type Pusher: Push<(T, Content<D>)>+'static;
/// Receiving endpoint type; yields `(T, Content<D>)` pairs.
type Puller: Pull<(T, Content<D>)>+'static;
/// Consumes the contract and returns a connected `(pusher, puller)` pair.
///
/// * `allocator` - communication allocator; the implementations in this file use it
///   for the local worker index and, for exchanging pacts, to allocate channels.
/// * `identifier` - channel identifier; the implementations in this file pass it to
///   the endpoints, which report it as `channel` in logged message events.
/// * `logging` - logger handed to the created endpoints.
fn connect<A: Allocate>(self, allocator: &mut A, identifier: usize, logging: Logger) -> (Self::Pusher, Self::Puller);
}
/// A contract whose `connect` builds a thread-local channel: messages stay on the
/// worker that produced them (see its `ParallelizationContract` implementation).
pub struct Pipeline;
impl<T: 'static, D: 'static> ParallelizationContract<T, D> for Pipeline {
    type Pusher = Pusher<T, D, ThreadPusher<Message<T, D>>>;
    type Puller = Puller<T, D, ThreadPuller<Message<T, D>>>;

    /// Builds a thread-local channel and wraps both ends in the logging
    /// `Pusher` / `Puller` types.
    ///
    /// The local worker index is used as both source and target of the pusher,
    /// and no communication channel id is recorded (`None`), since the channel
    /// is created with `Thread::new` rather than through `allocator.allocate`.
    fn connect<A: Allocate>(self, allocator: &mut A, identifier: usize, logging: Logger) -> (Self::Pusher, Self::Puller) {
        // Look the worker index up once; it is used for every endpoint below.
        let index = allocator.index();
        let (pusher, puller) = Thread::new::<Message<T, D>>();
        (
            Pusher::new(pusher, index, index, identifier, None, logging.clone()),
            // Last use of `logging`: move it rather than cloning again.
            Puller::new(puller, index, identifier, None, logging),
        )
    }
}
/// A contract that distributes records according to a function of the record.
///
/// `hash_func` maps a record of type `D` to a `u64`; the `connect`
/// implementation for this type hands it to the exchange pusher.
pub struct Exchange<D, F>
where
    F: Fn(&D) -> u64 + 'static,
{
    /// Function evaluated on each record to obtain its routing value.
    hash_func: F,
    /// Ties the otherwise unused record type `D` to the struct.
    phantom: PhantomData<D>,
}

impl<D, F> Exchange<D, F>
where
    F: Fn(&D) -> u64,
{
    /// Creates an `Exchange` contract that routes with `func`.
    pub fn new(func: F) -> Exchange<D, F> {
        Exchange { phantom: PhantomData, hash_func: func }
    }
}
impl<T: Eq+Data+Abomonation+Clone, D: Data+Abomonation+Clone, F: Fn(&D)->u64+'static> ParallelizationContract<T, D> for Exchange<D, F> {
    type Pusher = Box<Push<(T, Content<D>)>>;
    type Puller = Puller<T, D, Box<Pull<Message<T, D>>>>;

    /// Allocates a channel through `allocator` and wires it up for exchange.
    ///
    /// Every sender returned by the allocator is wrapped in a logging `Pusher`
    /// whose target is the sender's position in the returned list. The wrapped
    /// senders are combined into a boxed exchange pusher that calls the
    /// contract's hash function on each record, ignoring the first argument.
    /// The receiver is wrapped in a logging `Puller`.
    fn connect<A: Allocate>(self, allocator: &mut A, identifier: usize, logging: Logger) -> (Self::Pusher, Self::Puller) {
        let (senders, receiver, channel_id) = allocator.allocate::<Message<T, D>>();
        // The worker index does not depend on the sender, so look it up once
        // instead of once per sender and again for the puller.
        let index = allocator.index();
        let senders = senders
            .into_iter()
            .enumerate()
            .map(|(target, sender)| Pusher::new(sender, index, target, identifier, channel_id, logging.clone()))
            .collect::<Vec<_>>();
        let hash_func = self.hash_func;
        (
            Box::new(ExchangePusher::new(senders, move |_, d| hash_func(d))),
            // Last use of `logging`: move it rather than cloning again.
            Puller::new(receiver, index, identifier, channel_id, logging),
        )
    }
}
/// A contract that distributes records according to a function of both the
/// `T` value accompanying the record and the record itself.
///
/// `hash_func` maps a `(&T, &D)` pair to a `u64`; the `connect`
/// implementation for this type hands it to the exchange pusher unchanged.
pub struct TimeExchange<D, T, F>
where
    F: Fn(&T, &D) -> u64 + 'static,
{
    /// Function evaluated on each `(T, D)` pair to obtain its routing value.
    hash_func: F,
    /// Ties the otherwise unused types `T` and `D` to the struct.
    phantom: PhantomData<(T, D)>,
}

impl<D, T, F> TimeExchange<D, T, F>
where
    F: Fn(&T, &D) -> u64,
{
    /// Creates a `TimeExchange` contract that routes with `func`.
    pub fn new(func: F) -> TimeExchange<D, T, F> {
        TimeExchange { phantom: PhantomData, hash_func: func }
    }
}
impl<T: Eq+Data+Abomonation+Clone, D: Data+Abomonation+Clone, F: Fn(&T, &D)->u64+'static> ParallelizationContract<T, D> for TimeExchange<D, T, F> {
    type Pusher = ExchangePusher<T, D, Pusher<T, D, Box<Push<Message<T, D>>>>, F>;
    type Puller = Puller<T, D, Box<Pull<Message<T, D>>>>;

    /// Allocates a channel through `allocator` and wires it up for exchange.
    ///
    /// Every sender returned by the allocator is wrapped in a logging `Pusher`
    /// whose target is the sender's position in the returned list. The wrapped
    /// senders and the contract's hash function are handed to an exchange
    /// pusher. The receiver is wrapped in a logging `Puller`.
    fn connect<A: Allocate>(self, allocator: &mut A, identifier: usize, logging: Logger) -> (Self::Pusher, Self::Puller) {
        let (senders, receiver, channel_id) = allocator.allocate::<Message<T, D>>();
        // The worker index does not depend on the sender, so look it up once
        // instead of once per sender and again for the puller.
        let index = allocator.index();
        let senders = senders
            .into_iter()
            .enumerate()
            .map(|(target, sender)| Pusher::new(sender, index, target, identifier, channel_id, logging.clone()))
            .collect::<Vec<_>>();
        (
            ExchangePusher::new(senders, self.hash_func),
            // Last use of `logging`: move it rather than cloning again.
            Puller::new(receiver, index, identifier, channel_id, logging),
        )
    }
}
pub struct Pusher<T, D, P: Push<Message<T, D>>> {
pusher: P,
channel: usize,
comm_channel: Option<usize>,
counter: usize,
source: usize,
target: usize,
phantom: ::std::marker::PhantomData<(T, D)>,
logging: Logger,
}
impl<T, D, P: Push<Message<T, D>>> Pusher<T, D, P> {
pub fn new(pusher: P, source: usize, target: usize, channel: usize, comm_channel: Option<usize>, logging: Logger) -> Self {
Pusher {
pusher: pusher,
channel: channel,
comm_channel: comm_channel,
counter: 0,
source: source,
target: target,
phantom: ::std::marker::PhantomData,
logging: logging,
}
}
}
impl<T, D, P: Push<Message<T, D>>> Push<(T, Content<D>)> for Pusher<T, D, P> {
    /// Forwards `pair` to the wrapped pusher as a `Message`, or signals `done`
    /// when `pair` is empty.
    ///
    /// After the push, whatever the wrapped pusher left in the message slot is
    /// unpacked back into `pair`, and a send event is logged when logging is
    /// enabled.
    #[inline(always)]
    fn push(&mut self, pair: &mut Option<(T, Content<D>)>) {
        // Nothing to send: tell the wrapped pusher we are done and leave.
        let (time, data) = match pair.take() {
            Some(contents) => contents,
            None => {
                self.pusher.done();
                return;
            }
        };

        // Capture what the log event needs before `data` is moved away.
        let length = data.len();
        let seq_no = self.counter;

        let mut outgoing = Some(Message::new(time, data, self.source, seq_no));
        self.counter += 1;
        self.pusher.push(&mut outgoing);

        // Hand back anything the wrapped pusher left in the slot.
        *pair = outgoing.map(|returned| (returned.time, returned.data));

        self.logging.when_enabled(|logger| {
            logger.log(::logging::TimelyEvent::Messages(::logging::MessagesEvent {
                is_send: true,
                channel: self.channel,
                comm_channel: self.comm_channel,
                source: self.source,
                target: self.target,
                seq_no: seq_no,
                length: length,
            }))
        });
    }
}
/// Wraps a `Message<T, D>` puller so that it yields `(T, Content<D>)` pairs,
/// logging a receive event for every message it obtains.
pub struct Puller<T, D, P>
where
    P: Pull<Message<T, D>>,
{
    /// Wrapped puller that messages are obtained from.
    puller: P,
    /// Most recently pulled pair, lent to the caller by `pull`.
    current: Option<(T, Content<D>)>,
    /// Channel identifier reported in logged events.
    channel: usize,
    /// Communication channel identifier reported in logged events, if any.
    comm_channel: Option<usize>,
    /// Incremented on every `pull`; stamped as the sequence number on content
    /// handed back to the wrapped puller.
    counter: usize,
    /// Local index: stamped as the source of handed-back content and reported
    /// as the target in logged events.
    index: usize,
    /// Logger that receives a receive event per pulled message.
    logging: Logger,
}
impl<T, D, P> Puller<T, D, P>
where
    P: Pull<Message<T, D>>,
{
    /// Wraps `puller`, recording the local index and channel identifiers used
    /// for logging. Starts with no current message and a zero counter.
    pub fn new(puller: P, index: usize, channel: usize, comm_channel: Option<usize>, logging: Logger) -> Self {
        Puller {
            puller,
            current: None,
            channel,
            comm_channel,
            counter: 0,
            index,
            logging,
        }
    }
}
impl<T, D, P: Pull<Message<T, D>>> Pull<(T, Content<D>)> for Puller<T, D, P> {
    /// Pulls the next message from the wrapped puller and exposes it as a
    /// `(T, Content<D>)` pair.
    ///
    /// Whatever the caller left in the previously returned slot is re-wrapped
    /// as a `Message` and exchanged with the wrapped puller's slot. A receive
    /// event is logged for each message obtained when logging is enabled. The
    /// counter advances on every call, whether or not a message was available.
    #[inline(always)]
    fn pull(&mut self) -> &mut Option<(T, Content<D>)> {
        // Re-wrap the content left over from the previous call, if any.
        let handed_back = self
            .current
            .take()
            .map(|(time, data)| Message::new(time, data, self.index, self.counter));
        self.counter += 1;

        // Exchange it for whatever the wrapped puller currently holds.
        let incoming = ::std::mem::replace(self.puller.pull(), handed_back);

        if let Some(ref message) = incoming {
            self.logging.when_enabled(|logger| {
                logger.log(::logging::TimelyEvent::Messages(::logging::MessagesEvent {
                    is_send: false,
                    channel: self.channel,
                    comm_channel: self.comm_channel,
                    source: message.from,
                    target: self.index,
                    seq_no: message.seq,
                    length: message.data.len(),
                }))
            });
        }

        self.current = incoming.map(|message| (message.time, message.data));
        &mut self.current
    }
}