timely/dataflow/channels/
pact.rs

1//! Parallelization contracts, describing requirements for data movement along dataflow edges.
2//!
3//! Pacts describe how data should be exchanged between workers, and implement a method which
4//! creates a pair of `Push` and `Pull` implementors from an `A: AsWorker`. These two endpoints
5//! respectively distribute and collect data among workers according to the pact.
6//!
7//! The only requirement of a pact is that it not alter the number of `D` records at each time `T`.
8//! The progress tracking logic assumes that this number is independent of the pact used.
9
10use std::{fmt::{self, Debug}, marker::PhantomData};
11use timely_container::PushPartitioned;
12
13use crate::communication::{Push, Pull, Data};
14use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
15use crate::Container;
16
17use crate::worker::AsWorker;
18use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
19use super::{BundleCore, Message};
20
21use crate::logging::{TimelyLogger as Logger, MessagesEvent};
22use crate::progress::Timestamp;
23
24/// A `ParallelizationContractCore` allocates paired `Push` and `Pull` implementors.
25pub trait ParallelizationContractCore<T, D> {
26    /// Type implementing `Push` produced by this pact.
27    type Pusher: Push<BundleCore<T, D>>+'static;
28    /// Type implementing `Pull` produced by this pact.
29    type Puller: Pull<BundleCore<T, D>>+'static;
30    /// Allocates a matched pair of push and pull endpoints implementing the pact.
31    fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
32}
33
34/// A `ParallelizationContractCore` specialized for `Vec` containers
35/// TODO: Use trait aliases once stable.
36pub trait ParallelizationContract<T, D: Clone>: ParallelizationContractCore<T, Vec<D>> { }
37impl<T, D: Clone, P: ParallelizationContractCore<T, Vec<D>>> ParallelizationContract<T, D> for P { }
38
/// A direct connection, whose endpoints are allocated with the worker's `pipeline` channel.
#[derive(Debug)]
pub struct Pipeline;
42
43impl<T: 'static, D: Container> ParallelizationContractCore<T, D> for Pipeline {
44    type Pusher = LogPusher<T, D, ThreadPusher<BundleCore<T, D>>>;
45    type Puller = LogPuller<T, D, ThreadPuller<BundleCore<T, D>>>;
46    fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
47        let (pusher, puller) = allocator.pipeline::<Message<T, D>>(identifier, address);
48        // // ignore `&mut A` and use thread allocator
49        // let (pusher, puller) = Thread::new::<Bundle<T, D>>();
50        (LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
51         LogPuller::new(puller, allocator.index(), identifier, logging))
52    }
53}
54
55/// An exchange between multiple observers by data
56pub struct ExchangeCore<C, D, F> { hash_func: F, phantom: PhantomData<(C, D)> }
57
58/// [ExchangeCore] specialized to vector-based containers.
59pub type Exchange<D, F> = ExchangeCore<Vec<D>, D, F>;
60
61impl<C, D, F: FnMut(&D)->u64+'static> ExchangeCore<C, D, F> {
62    /// Allocates a new `Exchange` pact from a distribution function.
63    pub fn new(func: F) -> ExchangeCore<C, D, F> {
64        ExchangeCore {
65            hash_func:  func,
66            phantom:    PhantomData,
67        }
68    }
69}
70
71// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
72impl<T: Timestamp, C, D: Data+Clone, F: FnMut(&D)->u64+'static> ParallelizationContractCore<T, C> for ExchangeCore<C, D, F>
73where
74    C: Data + Container + PushPartitioned<Item=D>,
75{
76    type Pusher = ExchangePusher<T, C, D, LogPusher<T, C, Box<dyn Push<BundleCore<T, C>>>>, F>;
77    type Puller = LogPuller<T, C, Box<dyn Pull<BundleCore<T, C>>>>;
78
79    fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
80        let (senders, receiver) = allocator.allocate::<Message<T, C>>(identifier, address);
81        let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
82        (ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
83    }
84}
85
86impl<C, D, F> Debug for ExchangeCore<C, D, F> {
87    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88        f.debug_struct("Exchange").finish()
89    }
90}
91
92/// Wraps a `Message<T,D>` pusher to provide a `Push<(T, Content<D>)>`.
93#[derive(Debug)]
94pub struct LogPusher<T, D, P: Push<BundleCore<T, D>>> {
95    pusher: P,
96    channel: usize,
97    counter: usize,
98    source: usize,
99    target: usize,
100    phantom: PhantomData<(T, D)>,
101    logging: Option<Logger>,
102}
103
104impl<T, D, P: Push<BundleCore<T, D>>> LogPusher<T, D, P> {
105    /// Allocates a new pusher.
106    pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
107        LogPusher {
108            pusher,
109            channel,
110            counter: 0,
111            source,
112            target,
113            phantom: PhantomData,
114            logging,
115        }
116    }
117}
118
119impl<T, D: Container, P: Push<BundleCore<T, D>>> Push<BundleCore<T, D>> for LogPusher<T, D, P> {
120    #[inline]
121    fn push(&mut self, pair: &mut Option<BundleCore<T, D>>) {
122        if let Some(bundle) = pair {
123            self.counter += 1;
124
125            // Stamp the sequence number and source.
126            // FIXME: Awkward moment/logic.
127            if let Some(message) = bundle.if_mut() {
128                message.seq = self.counter - 1;
129                message.from = self.source;
130            }
131
132            if let Some(logger) = self.logging.as_ref() {
133                logger.log(MessagesEvent {
134                    is_send: true,
135                    channel: self.channel,
136                    source: self.source,
137                    target: self.target,
138                    seq_no: self.counter - 1,
139                    length: bundle.data.len(),
140                })
141            }
142        }
143
144        self.pusher.push(pair);
145    }
146}
147
148/// Wraps a `Message<T,D>` puller to provide a `Pull<(T, Content<D>)>`.
149#[derive(Debug)]
150pub struct LogPuller<T, D, P: Pull<BundleCore<T, D>>> {
151    puller: P,
152    channel: usize,
153    index: usize,
154    phantom: PhantomData<(T, D)>,
155    logging: Option<Logger>,
156}
157
158impl<T, D, P: Pull<BundleCore<T, D>>> LogPuller<T, D, P> {
159    /// Allocates a new `Puller`.
160    pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
161        LogPuller {
162            puller,
163            channel,
164            index,
165            phantom: PhantomData,
166            logging,
167        }
168    }
169}
170
171impl<T, D: Container, P: Pull<BundleCore<T, D>>> Pull<BundleCore<T, D>> for LogPuller<T, D, P> {
172    #[inline]
173    fn pull(&mut self) -> &mut Option<BundleCore<T,D>> {
174        let result = self.puller.pull();
175        if let Some(bundle) = result {
176            let channel = self.channel;
177            let target = self.index;
178
179            if let Some(logger) = self.logging.as_ref() {
180                logger.log(MessagesEvent {
181                    is_send: false,
182                    channel,
183                    source: bundle.from,
184                    target,
185                    seq_no: bundle.seq,
186                    length: bundle.data.len(),
187                });
188            }
189        }
190
191        result
192    }
193}