1use std::{fmt::{self, Debug}, marker::PhantomData};
11use timely_container::PushPartitioned;
12
13use crate::communication::{Push, Pull, Data};
14use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
15use crate::Container;
16
17use crate::worker::AsWorker;
18use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
19use super::{BundleCore, Message};
20
21use crate::logging::{TimelyLogger as Logger, MessagesEvent};
22use crate::progress::Timestamp;
23
24pub trait ParallelizationContractCore<T, D> {
26 type Pusher: Push<BundleCore<T, D>>+'static;
28 type Puller: Pull<BundleCore<T, D>>+'static;
30 fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
32}
33
34pub trait ParallelizationContract<T, D: Clone>: ParallelizationContractCore<T, Vec<D>> { }
37impl<T, D: Clone, P: ParallelizationContractCore<T, Vec<D>>> ParallelizationContract<T, D> for P { }
38
39#[derive(Debug)]
41pub struct Pipeline;
42
43impl<T: 'static, D: Container> ParallelizationContractCore<T, D> for Pipeline {
44 type Pusher = LogPusher<T, D, ThreadPusher<BundleCore<T, D>>>;
45 type Puller = LogPuller<T, D, ThreadPuller<BundleCore<T, D>>>;
46 fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
47 let (pusher, puller) = allocator.pipeline::<Message<T, D>>(identifier, address);
48 (LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
51 LogPuller::new(puller, allocator.index(), identifier, logging))
52 }
53}
54
55pub struct ExchangeCore<C, D, F> { hash_func: F, phantom: PhantomData<(C, D)> }
57
58pub type Exchange<D, F> = ExchangeCore<Vec<D>, D, F>;
60
61impl<C, D, F: FnMut(&D)->u64+'static> ExchangeCore<C, D, F> {
62 pub fn new(func: F) -> ExchangeCore<C, D, F> {
64 ExchangeCore {
65 hash_func: func,
66 phantom: PhantomData,
67 }
68 }
69}
70
71impl<T: Timestamp, C, D: Data+Clone, F: FnMut(&D)->u64+'static> ParallelizationContractCore<T, C> for ExchangeCore<C, D, F>
73where
74 C: Data + Container + PushPartitioned<Item=D>,
75{
76 type Pusher = ExchangePusher<T, C, D, LogPusher<T, C, Box<dyn Push<BundleCore<T, C>>>>, F>;
77 type Puller = LogPuller<T, C, Box<dyn Pull<BundleCore<T, C>>>>;
78
79 fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
80 let (senders, receiver) = allocator.allocate::<Message<T, C>>(identifier, address);
81 let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
82 (ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
83 }
84}
85
86impl<C, D, F> Debug for ExchangeCore<C, D, F> {
87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88 f.debug_struct("Exchange").finish()
89 }
90}
91
92#[derive(Debug)]
94pub struct LogPusher<T, D, P: Push<BundleCore<T, D>>> {
95 pusher: P,
96 channel: usize,
97 counter: usize,
98 source: usize,
99 target: usize,
100 phantom: PhantomData<(T, D)>,
101 logging: Option<Logger>,
102}
103
104impl<T, D, P: Push<BundleCore<T, D>>> LogPusher<T, D, P> {
105 pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
107 LogPusher {
108 pusher,
109 channel,
110 counter: 0,
111 source,
112 target,
113 phantom: PhantomData,
114 logging,
115 }
116 }
117}
118
119impl<T, D: Container, P: Push<BundleCore<T, D>>> Push<BundleCore<T, D>> for LogPusher<T, D, P> {
120 #[inline]
121 fn push(&mut self, pair: &mut Option<BundleCore<T, D>>) {
122 if let Some(bundle) = pair {
123 self.counter += 1;
124
125 if let Some(message) = bundle.if_mut() {
128 message.seq = self.counter - 1;
129 message.from = self.source;
130 }
131
132 if let Some(logger) = self.logging.as_ref() {
133 logger.log(MessagesEvent {
134 is_send: true,
135 channel: self.channel,
136 source: self.source,
137 target: self.target,
138 seq_no: self.counter - 1,
139 length: bundle.data.len(),
140 })
141 }
142 }
143
144 self.pusher.push(pair);
145 }
146}
147
148#[derive(Debug)]
150pub struct LogPuller<T, D, P: Pull<BundleCore<T, D>>> {
151 puller: P,
152 channel: usize,
153 index: usize,
154 phantom: PhantomData<(T, D)>,
155 logging: Option<Logger>,
156}
157
158impl<T, D, P: Pull<BundleCore<T, D>>> LogPuller<T, D, P> {
159 pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
161 LogPuller {
162 puller,
163 channel,
164 index,
165 phantom: PhantomData,
166 logging,
167 }
168 }
169}
170
171impl<T, D: Container, P: Pull<BundleCore<T, D>>> Pull<BundleCore<T, D>> for LogPuller<T, D, P> {
172 #[inline]
173 fn pull(&mut self) -> &mut Option<BundleCore<T,D>> {
174 let result = self.puller.pull();
175 if let Some(bundle) = result {
176 let channel = self.channel;
177 let target = self.index;
178
179 if let Some(logger) = self.logging.as_ref() {
180 logger.log(MessagesEvent {
181 is_send: false,
182 channel,
183 source: bundle.from,
184 target,
185 seq_no: bundle.seq,
186 length: bundle.data.len(),
187 });
188 }
189 }
190
191 result
192 }
193}