use std::fmt::Debug;
use std::rc::Rc;
use crate::Accountable;
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
use crate::communication::{Push, Pull};
use crate::dataflow::channels::Message;
use crate::logging::TimelyLogger as Logger;
use crate::worker::Worker;
/// A contract that produces the two endpoints of a channel carrying `Message<T, C>`.
///
/// An implementor names the concrete `Pusher` and `Puller` types it hands out and
/// supplies `connect`, which consumes the contract and returns one of each.
pub trait ParallelizationContract<T, C> {
/// The sending endpoint returned by `connect`; it must implement `Push<Message<T, C>>`.
type Pusher: Push<Message<T, C>>+'static;
/// The receiving endpoint returned by `connect`; it must implement `Pull<Message<T, C>>`.
type Puller: Pull<Message<T, C>>+'static;
/// Consumes the contract and returns a `(pusher, puller)` pair.
///
/// * `worker` - the worker used to allocate the underlying channel.
/// * `identifier` - the channel identifier handed to the allocation call.
/// * `address` - forwarded to the allocation call.
///   NOTE(review): presumably the path of the owning operator within the dataflow; confirm.
/// * `logging` - optional logger handed to the endpoints.
fn connect(self, worker: &Worker, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
}
/// A pact whose endpoints come from `Worker::pipeline`.
///
/// NOTE(review): given the `ThreadPusher` / `ThreadPuller` endpoint types this is
/// presumably a worker-local channel that never leaves the thread; confirm.
#[derive(Debug)]
pub struct Pipeline;

impl<T: 'static, C: Accountable + 'static> ParallelizationContract<T, C> for Pipeline {
    type Pusher = LogPusher<ThreadPusher<Message<T, C>>>;
    type Puller = LogPuller<ThreadPuller<Message<T, C>>>;

    /// Allocates a pipeline channel on `worker` and wraps both ends for logging.
    ///
    /// The pusher is labelled with this worker's index as both source and target;
    /// the puller is labelled with this worker's index. Both share `logging`.
    fn connect(self, worker: &Worker, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
        let (raw_pusher, raw_puller) = worker.pipeline::<Message<T, C>>(identifier, address);
        // The same worker sits on both ends of the channel.
        let index = worker.index();
        let pusher = LogPusher::new(raw_pusher, index, index, identifier, logging.clone());
        let puller = LogPuller::new(raw_puller, index, identifier, logging);
        (pusher, puller)
    }
}
pub use exchange::{ExchangeCore, Exchange};
mod exchange {
    use crate::Container;
    use crate::container::{DrainContainer, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder};
    use crate::dataflow::channels::pushers::exchange::DrainContainerDistributor;
    use super::DistributorPact;

    /// A `DistributorPact` whose builder is a boxed closure producing a
    /// `DrainContainerDistributor<CB, F>` from a `usize`.
    pub type ExchangeCore<CB, F> = DistributorPact<Box<dyn FnOnce(usize) -> DrainContainerDistributor<CB, F>>>;

    /// `ExchangeCore` specialised to a `CapacityContainerBuilder` over `Vec<D>`.
    pub type Exchange<D, F> = ExchangeCore<CapacityContainerBuilder<Vec<D>>, F>;

    impl<CB, F> ExchangeCore<CB, F>
    where
        CB: LengthPreservingContainerBuilder,
        CB::Container: DrainContainer,
        for<'a> F: FnMut(&<CB::Container as DrainContainer>::Item<'a>) -> u64 + 'static,
    {
        /// Builds a pact from `func`, a function mapping a borrowed item to a `u64`.
        ///
        /// `func` is not called here: it is captured by a boxed closure that builds
        /// the `DrainContainerDistributor` once that closure is invoked.
        /// NOTE(review): the `u64` is presumably used to route each item; confirm.
        pub fn new_core(func: F) -> Self {
            let build: Box<dyn FnOnce(usize) -> DrainContainerDistributor<CB, F>> =
                Box::new(move |peer_count| DrainContainerDistributor::new(func, peer_count));
            DistributorPact(build)
        }
    }

    impl<C, F> ExchangeCore<CapacityContainerBuilder<C>, F>
    where
        C: Container + SizableContainer + DrainContainer,
        for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static,
    {
        /// Builds a pact from `func` for containers assembled by a `CapacityContainerBuilder<C>`.
        ///
        /// As with `new_core`, `func` is only captured here; the distributor is
        /// constructed when the boxed closure is invoked.
        pub fn new(func: F) -> Self {
            let build: Box<dyn FnOnce(usize) -> DrainContainerDistributor<CapacityContainerBuilder<C>, F>> =
                Box::new(move |peer_count| DrainContainerDistributor::new(func, peer_count));
            DistributorPact(build)
        }
    }
}
pub use distributor::DistributorPact;
mod distributor {
    use std::rc::Rc;
    use crate::Accountable;
    use crate::communication::{Push, Pull};
    use crate::dataflow::channels::pushers::{Exchange, exchange::Distributor};
    use crate::dataflow::channels::{ContainerBytes, Message};
    use crate::logging::TimelyLogger;
    use crate::progress::Timestamp;
    use crate::worker::Worker;
    use super::{ParallelizationContract, LogPusher, LogPuller};

    /// A pact wrapping a builder `B` that turns a `usize` into a distributor.
    ///
    /// The builder is invoked exactly once, in `connect`, with `worker.peers()`.
    pub struct DistributorPact<B>(pub B);

    impl<T, B, C, D> ParallelizationContract<T, C> for DistributorPact<B>
    where
        T: Timestamp,
        B: FnOnce(usize) -> D,
        C: Accountable + ContainerBytes + Send + 'static,
        D: Distributor<C> + 'static,
    {
        type Pusher = Exchange<T, LogPusher<Box<dyn Push<Message<T, C>>>>, D>;
        type Puller = LogPuller<Box<dyn Pull<Message<T, C>>>>;

        /// Allocates a channel on `worker` and wires the senders through a distributor.
        ///
        /// Each sender returned by `worker.allocate` is wrapped in a `LogPusher` whose
        /// source is this worker's index and whose target is the sender's position in
        /// the returned collection. The receiver is wrapped in a `LogPuller`.
        fn connect(self, worker: &Worker, identifier: usize, address: Rc<[usize]>, logging: Option<TimelyLogger>) -> (Self::Pusher, Self::Puller) {
            let (senders, receiver) = worker.allocate::<Message<T, C>>(identifier, address);
            let senders = senders
                .into_iter()
                .enumerate()
                .map(|(target, sender)| LogPusher::new(sender, worker.index(), target, identifier, logging.clone()))
                .collect::<Vec<_>>();
            let distributor = (self.0)(worker.peers());
            // `logging` is not needed after this point, so it is moved into the
            // puller rather than cloned once more.
            (Exchange::new(senders, distributor), LogPuller::new(receiver, worker.index(), identifier, logging))
        }
    }
}
pub use push_pull::{LogPusher, LogPuller};
mod push_pull {
use crate::Accountable;
use crate::communication::{Push, Pull};
use crate::dataflow::channels::Message;
use crate::logging::{TimelyLogger as Logger, MessagesEvent};
/// Wraps a pusher with the bookkeeping needed to stamp and log outgoing messages.
///
/// Holds the wrapped pusher, the channel identifier, the source and target
/// indices, an optional logger, and a counter of messages pushed so far.
#[derive(Debug)]
pub struct LogPusher<P> {
    pusher: P,
    channel: usize,
    counter: usize,
    source: usize,
    target: usize,
    logging: Option<Logger>,
}

impl<P> LogPusher<P> {
    /// Wraps `pusher`, recording the `source` and `target` indices and the `channel` identifier.
    ///
    /// The message counter starts at zero. `logging` may be `None`, in which case
    /// no events are logged.
    pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
        Self {
            pusher,
            source,
            target,
            channel,
            logging,
            counter: 0,
        }
    }
}
impl<T, C: Accountable, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<P> {
    /// Stamps and optionally logs the message in `pair`, then forwards `pair` to the wrapped pusher.
    ///
    /// When `pair` holds a message, it receives the next sequence number (the first
    /// message gets 0) and this pusher's source index. If a logger is present, a send
    /// `MessagesEvent` is logged with the same sequence number. `pair` is forwarded
    /// in every case, including when it is `None`.
    #[inline]
    fn push(&mut self, pair: &mut Option<Message<T, C>>) {
        if let Some(message) = pair {
            // Take the sequence number first, then advance the counter for the next message.
            let seq_no = self.counter;
            self.counter += 1;

            message.seq = seq_no;
            message.from = self.source;

            if let Some(logger) = &self.logging {
                let event = MessagesEvent {
                    is_send: true,
                    channel: self.channel,
                    source: self.source,
                    target: self.target,
                    seq_no,
                    record_count: message.data.record_count(),
                };
                logger.log(event);
            }
        }

        self.pusher.push(pair);
    }
}
/// Wraps a puller with the information needed to log incoming messages.
///
/// Holds the wrapped puller, the channel identifier, the index reported as the
/// target of received messages, and an optional logger.
#[derive(Debug)]
pub struct LogPuller<P> {
    puller: P,
    channel: usize,
    index: usize,
    logging: Option<Logger>,
}

impl<P> LogPuller<P> {
    /// Wraps `puller`, recording the receiving `index` and the `channel` identifier.
    ///
    /// `logging` may be `None`, in which case no events are logged.
    pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
        Self {
            puller,
            index,
            channel,
            logging,
        }
    }
}
impl<T, C: Accountable, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<P> {
    /// Pulls from the wrapped puller, optionally logs the message, and returns the slot.
    ///
    /// When a message is present and a logger is configured, a receive `MessagesEvent`
    /// is logged using the message's own `from` and `seq` fields together with this
    /// puller's channel and index. The slot from the wrapped puller is returned as-is.
    #[inline]
    fn pull(&mut self) -> &mut Option<Message<T, C>> {
        let pulled = self.puller.pull();

        // Logging needs both a logger and a message; look at the slot through a shared reborrow.
        if let (Some(logger), Some(message)) = (self.logging.as_ref(), &*pulled) {
            logger.log(MessagesEvent {
                is_send: false,
                channel: self.channel,
                source: message.from,
                target: self.index,
                seq_no: message.seq,
                record_count: message.data.record_count(),
            });
        }

        pulled
    }
}
}