timely_communication/
lib.rs

1//! A simple communication infrastructure providing typed exchange channels.
2//!
3//! This crate is part of the timely dataflow system, used primarily for its inter-worker communication.
4//! It may be independently useful, but it is separated out mostly to make clear boundaries in the project.
5//!
6//! Threads are spawned with an [`allocator::Generic`](allocator::generic::Generic), whose
7//! [`allocate`](Allocate::allocate) method returns a pair of several send endpoints and one
8//! receive endpoint. Messages sent into a send endpoint will eventually be received by the corresponding worker,
9//! if it receives often enough. The point-to-point channels are each FIFO, but with no fairness guarantees.
10//!
11//! To be communicated, a type must implement the [`Serialize`](serde::Serialize) trait when using the
12//! `bincode` feature or the [`Abomonation`](abomonation::Abomonation) trait when not.
13//!
14//! Channel endpoints also implement a lower-level `push` and `pull` interface (through the [`Push`](Push) and [`Pull`](Pull)
15//! traits), which is used for more precise control of resources.
16//!
17//! # Examples
18//! ```
19//! use timely_communication::Allocate;
20//!
21//! // configure for two threads, just one process.
22//! let config = timely_communication::Config::Process(2);
23//!
24//! // initializes communication, spawns workers
25//! let guards = timely_communication::initialize(config, |mut allocator| {
26//!     println!("worker {} started", allocator.index());
27//!
28//!     // allocates a list of senders (one per worker) and one receiver.
29//!     let (mut senders, mut receiver) = allocator.allocate(0);
30//!
31//!     // send typed data along each channel
32//!     use timely_communication::Message;
33//!     senders[0].send(Message::from_typed(format!("hello, {}", 0)));
34//!     senders[1].send(Message::from_typed(format!("hello, {}", 1)));
35//!
36//!     // no support for termination notification,
37//!     // we have to count down ourselves.
38//!     let mut expecting = 2;
39//!     while expecting > 0 {
40//!
41//!         allocator.receive();
42//!         if let Some(message) = receiver.recv() {
43//!             use std::ops::Deref;
44//!             println!("worker {}: received: <{}>", allocator.index(), message.deref());
45//!             expecting -= 1;
46//!         }
47//!         allocator.release();
48//!     }
49//!
50//!     // optionally, return something
51//!     allocator.index()
52//! });
53//!
54//! // computation runs until guards are joined or dropped.
55//! if let Ok(guards) = guards {
56//!     for guard in guards.join() {
57//!         println!("result: {:?}", guard);
58//!     }
59//! }
60//! else { println!("error in computation"); }
61//! ```
62//!
63//! This should produce output like:
64//!
65//! ```ignore
66//! worker 0 started
67//! worker 1 started
68//! worker 0: received: <hello, 0>
69//! worker 1: received: <hello, 1>
70//! worker 0: received: <hello, 0>
71//! worker 1: received: <hello, 1>
72//! result: Ok(0)
73//! result: Ok(1)
74//! ```
75
76#![forbid(missing_docs)]
77
78#[cfg(feature = "getopts")]
79extern crate getopts;
80#[cfg(feature = "bincode")]
81extern crate bincode;
82#[cfg(feature = "bincode")]
83extern crate serde;
84
85extern crate abomonation;
86#[macro_use] extern crate abomonation_derive;
87
88extern crate timely_bytes as bytes;
89extern crate timely_logging as logging_core;
90
91pub mod allocator;
92pub mod networking;
93pub mod initialize;
94pub mod logging;
95pub mod message;
96pub mod buzzer;
97
98use std::any::Any;
99
100#[cfg(feature = "bincode")]
101use serde::{Serialize, Deserialize};
102#[cfg(not(feature = "bincode"))]
103use abomonation::Abomonation;
104
105pub use allocator::Generic as Allocator;
106pub use allocator::Allocate;
107pub use initialize::{initialize, initialize_from, Config, WorkerGuards};
108pub use message::Message;
109
110/// A composite trait for types that may be used with channels.
111#[cfg(not(feature = "bincode"))]
112pub trait Data : Send+Sync+Any+Abomonation+'static { }
113#[cfg(not(feature = "bincode"))]
114impl<T: Send+Sync+Any+Abomonation+'static> Data for T { }
115
/// A composite trait for types that may be used with channels.
///
/// This is the definition compiled when the `bincode` feature is enabled.
/// It adds no methods of its own; it only names the combination of bounds,
/// and the blanket impl below provides it for every type satisfying them.
#[cfg(feature = "bincode")]
pub trait Data: Send + Sync + Any + Serialize + for<'de> Deserialize<'de> + 'static {}

// Blanket impl: any type meeting the bounds is `Data`.
#[cfg(feature = "bincode")]
impl<T> Data for T where T: Send + Sync + Any + Serialize + for<'de> Deserialize<'de> + 'static {}
121
/// Pushing elements of type `T`.
///
/// This trait moves data around using references rather than ownership,
/// which provides the opportunity for zero-copy operation. In the call
/// to `push(element)` the implementor can *swap* some other value to
/// replace `element`, effectively returning the value to the caller.
///
/// Conventionally, a sequence of calls to `push()` should conclude with
/// a call of `push(&mut None)` or `done()` to signal to implementors that
/// another call to `push()` may not be coming.
pub trait Push<T> {
    /// Pushes `element` with the opportunity to take ownership.
    ///
    /// The implementor may take the contents of `element`, leave it in place,
    /// or replace it with another value for the caller to reuse.
    fn push(&mut self, element: &mut Option<T>);
    /// Pushes `element` and drops any resulting resources.
    ///
    /// The default implementation wraps `element` in `Some`, hands it to
    /// [`push`](Push::push), and discards whatever is left in the slot.
    #[inline]
    fn send(&mut self, element: T) { self.push(&mut Some(element)); }
    /// Pushes `None`, conventionally signalling a flush.
    #[inline]
    fn done(&mut self) { self.push(&mut None); }
}

/// A boxed pusher behaves exactly like the pusher it contains.
///
/// All three methods are forwarded to the boxed value, so an implementor that
/// overrides `send` or `done` keeps that behavior when used through a `Box`
/// (including `Box<dyn Push<T>>`).
impl<T, P: ?Sized + Push<T>> Push<T> for Box<P> {
    #[inline]
    fn push(&mut self, element: &mut Option<T>) { (**self).push(element) }
    #[inline]
    fn send(&mut self, element: T) { (**self).send(element) }
    #[inline]
    fn done(&mut self) { (**self).done() }
}
147
/// Pulling elements of type `T`.
pub trait Pull<T> {
    /// Pulls an element and provides the opportunity to take ownership.
    ///
    /// The puller may mutate the result, in particular take ownership of the data by
    /// replacing it with other data or even `None`. This allows the puller to return
    /// resources to the implementor.
    ///
    /// If `pull` returns `None` this conventionally signals that no more data is available
    /// at the moment, and the puller should find something better to do.
    fn pull(&mut self) -> &mut Option<T>;
    /// Takes an `Option<T>` and leaves `None` behind.
    ///
    /// The default implementation calls [`pull`](Pull::pull) and takes the
    /// contents of the returned slot.
    #[inline]
    fn recv(&mut self) -> Option<T> { self.pull().take() }
}

/// A boxed puller behaves exactly like the puller it contains.
///
/// Both methods are forwarded to the boxed value, so an implementor that
/// overrides `recv` keeps that behavior when used through a `Box`
/// (including `Box<dyn Pull<T>>`).
impl<T, P: ?Sized + Pull<T>> Pull<T> for Box<P> {
    #[inline]
    fn pull(&mut self) -> &mut Option<T> { (**self).pull() }
    #[inline]
    fn recv(&mut self) -> Option<T> { (**self).recv() }
}
168
169
170use crossbeam_channel::{Sender, Receiver};
171
172/// Allocate a matrix of send and receive changes to exchange items.
173///
174/// This method constructs channels for `sends` threads to create and send
175/// items of type `T` to `recvs` receiver threads.
176fn promise_futures<T>(sends: usize, recvs: usize) -> (Vec<Vec<Sender<T>>>, Vec<Vec<Receiver<T>>>) {
177
178    // each pair of workers has a sender and a receiver.
179    let mut senders: Vec<_> = (0 .. sends).map(|_| Vec::with_capacity(recvs)).collect();
180    let mut recvers: Vec<_> = (0 .. recvs).map(|_| Vec::with_capacity(sends)).collect();
181
182    for sender in 0 .. sends {
183        for recver in 0 .. recvs {
184            let (send, recv) = crossbeam_channel::unbounded();
185            senders[sender].push(send);
186            recvers[recver].push(recv);
187        }
188    }
189
190    (senders, recvers)
191}