timely_communication/lib.rs
1//! A simple communication infrastructure providing typed exchange channels.
2//!
3//! This crate is part of the timely dataflow system, used primarily for its inter-worker communication.
4//! It may be independently useful, but it is separated out mostly to make clear boundaries in the project.
5//!
6//! Threads are spawned with an [`allocator::Generic`](allocator::generic::Generic), whose
7//! [`allocate`](Allocate::allocate) method returns a pair of several send endpoints and one
8//! receive endpoint. Messages sent into a send endpoint will eventually be received by the corresponding worker,
9//! if it receives often enough. The point-to-point channels are each FIFO, but with no fairness guarantees.
10//!
11//! To be communicated, a type must implement the [`Serialize`](serde::Serialize) trait when using the
12//! `bincode` feature or the [`Abomonation`](abomonation::Abomonation) trait when not.
13//!
14//! Channel endpoints also implement a lower-level `push` and `pull` interface (through the [`Push`](Push) and [`Pull`](Pull)
15//! traits), which is used for more precise control of resources.
16//!
17//! # Examples
18//! ```
19//! use timely_communication::Allocate;
20//!
21//! // configure for two threads, just one process.
22//! let config = timely_communication::Config::Process(2);
23//!
24//! // initializes communication, spawns workers
25//! let guards = timely_communication::initialize(config, |mut allocator| {
26//! println!("worker {} started", allocator.index());
27//!
28//! // allocates a list of senders and one receiver.
29//! let (mut senders, mut receiver) = allocator.allocate(0);
30//!
31//! // send typed data along each channel
32//! use timely_communication::Message;
33//! senders[0].send(Message::from_typed(format!("hello, {}", 0)));
34//! senders[1].send(Message::from_typed(format!("hello, {}", 1)));
35//!
36//! // no support for termination notification,
37//! // we have to count down ourselves.
38//! let mut expecting = 2;
39//! while expecting > 0 {
40//!
41//! allocator.receive();
42//! if let Some(message) = receiver.recv() {
43//! use std::ops::Deref;
44//! println!("worker {}: received: <{}>", allocator.index(), message.deref());
45//! expecting -= 1;
46//! }
47//! allocator.release();
48//! }
49//!
50//! // optionally, return something
51//! allocator.index()
52//! });
53//!
54//! // computation runs until guards are joined or dropped.
55//! if let Ok(guards) = guards {
56//! for guard in guards.join() {
57//! println!("result: {:?}", guard);
58//! }
59//! }
60//! else { println!("error in computation"); }
61//! ```
62//!
63//! This should produce output like:
64//!
65//! ```ignore
66//! worker 0 started
67//! worker 1 started
68//! worker 0: received: <hello, 0>
69//! worker 1: received: <hello, 1>
70//! worker 0: received: <hello, 0>
71//! worker 1: received: <hello, 1>
72//! result: Ok(0)
73//! result: Ok(1)
74//! ```
75
76#![forbid(missing_docs)]
77
78#[cfg(feature = "getopts")]
79extern crate getopts;
80#[cfg(feature = "bincode")]
81extern crate bincode;
82#[cfg(feature = "bincode")]
83extern crate serde;
84
85extern crate abomonation;
86#[macro_use] extern crate abomonation_derive;
87
88extern crate timely_bytes as bytes;
89extern crate timely_logging as logging_core;
90
91pub mod allocator;
92pub mod networking;
93pub mod initialize;
94pub mod logging;
95pub mod message;
96pub mod buzzer;
97
98use std::any::Any;
99
100#[cfg(feature = "bincode")]
101use serde::{Serialize, Deserialize};
102#[cfg(not(feature = "bincode"))]
103use abomonation::Abomonation;
104
105pub use allocator::Generic as Allocator;
106pub use allocator::Allocate;
107pub use initialize::{initialize, initialize_from, Config, WorkerGuards};
108pub use message::Message;
109
110/// A composite trait for types that may be used with channels.
111#[cfg(not(feature = "bincode"))]
112pub trait Data : Send+Sync+Any+Abomonation+'static { }
113#[cfg(not(feature = "bincode"))]
114impl<T: Send+Sync+Any+Abomonation+'static> Data for T { }
115
/// Umbrella trait collecting every bound a type needs to be sent over a channel.
///
/// This definition is compiled when the `bincode` feature is enabled; in that
/// configuration the bound set includes `Serialize` and `Deserialize` for every
/// lifetime. The blanket implementation below provides `Data` for every type
/// meeting the bounds, so it never needs to be implemented by hand.
#[cfg(feature = "bincode")]
pub trait Data: Send + Sync + Any + Serialize + for<'a> Deserialize<'a> + 'static {}
#[cfg(feature = "bincode")]
impl<T> Data for T where T: Send + Sync + Any + Serialize + for<'a> Deserialize<'a> + 'static {}
121
/// A sink that accepts elements of type `T`.
///
/// Elements are handed over through a mutable reference to an `Option<T>`
/// instead of by value, which leaves room for zero-copy operation: while
/// handling `push(element)` an implementor may *swap* a different value into
/// `element`, effectively handing that value back to the caller.
///
/// By convention, a run of `push()` calls is finished with `push(&mut None)`
/// or the equivalent `done()`, telling the implementor that another `push()`
/// may not follow.
pub trait Push<T> {
    /// Offers `element` to the implementor, which may take ownership of its contents.
    fn push(&mut self, element: &mut Option<T>);

    /// Pushes `element` by value and drops whatever the implementor leaves behind.
    #[inline]
    fn send(&mut self, element: T) {
        let mut slot = Some(element);
        self.push(&mut slot);
    }

    /// Pushes `None`, which conventionally signals a flush.
    #[inline]
    fn done(&mut self) {
        let mut empty: Option<T> = None;
        self.push(&mut empty);
    }
}
142
143impl<T, P: ?Sized + Push<T>> Push<T> for Box<P> {
144 #[inline]
145 fn push(&mut self, element: &mut Option<T>) { (**self).push(element) }
146}
147
/// A source that yields elements of type `T`.
pub trait Pull<T> {
    /// Exposes the next element, giving the caller the chance to take ownership.
    ///
    /// The returned slot may be mutated by the caller: the contents can be taken
    /// by replacing them with other data, or with `None`. This lets the caller
    /// hand resources back to the implementor.
    ///
    /// A result of `None` conventionally means that no more data is available
    /// right now, and that the caller should go do something else.
    fn pull(&mut self) -> &mut Option<T>;

    /// Takes the pulled `Option<T>` by value, leaving `None` in its place.
    #[inline]
    fn recv(&mut self) -> Option<T> {
        let slot = self.pull();
        slot.take()
    }
}
163
164impl<T, P: ?Sized + Pull<T>> Pull<T> for Box<P> {
165 #[inline]
166 fn pull(&mut self) -> &mut Option<T> { (**self).pull() }
167}
168
169
170use crossbeam_channel::{Sender, Receiver};
171
172/// Allocate a matrix of send and receive changes to exchange items.
173///
174/// This method constructs channels for `sends` threads to create and send
175/// items of type `T` to `recvs` receiver threads.
176fn promise_futures<T>(sends: usize, recvs: usize) -> (Vec<Vec<Sender<T>>>, Vec<Vec<Receiver<T>>>) {
177
178 // each pair of workers has a sender and a receiver.
179 let mut senders: Vec<_> = (0 .. sends).map(|_| Vec::with_capacity(recvs)).collect();
180 let mut recvers: Vec<_> = (0 .. recvs).map(|_| Vec::with_capacity(sends)).collect();
181
182 for sender in 0 .. sends {
183 for recver in 0 .. recvs {
184 let (send, recv) = crossbeam_channel::unbounded();
185 senders[sender].push(send);
186 recvers[recver].push(recv);
187 }
188 }
189
190 (senders, recvers)
191}