[−][src]Crate timely_communication
A simple communication infrastructure providing typed exchange channels.
This crate is part of the timely dataflow system, used primarily for its inter-worker communication. It may be independently useful, but it is separated out mostly to make clear boundaries in the project.
Threads are spawned with an allocator::Generic
, whose allocate
method returns a pair of several send endpoints and one
receive endpoint. Messages sent into a send endpoint will eventually be received by the corresponding worker,
if it receives often enough. The point-to-point channels are each FIFO, but with no fairness guarantees.
To be communicated, a type must implement the Serialize
trait. A default implementation of Serialize
is
provided for any type implementing Abomonation
. To implement other serialization strategies, wrap your type
and implement Serialize
for your wrapper.
Channel endpoints also implement a lower-level push
and pull
interface (through the Push
and Pull
traits), which is used for more precise control of resources.
Examples
use timely_communication::Allocate; // configure for two threads, just one process. let config = timely_communication::Configuration::Process(2); // initializes communication, spawns workers let guards = timely_communication::initialize(config, |mut allocator| { println!("worker {} started", allocator.index()); // allocates pair of senders list and one receiver. let (mut senders, mut receiver) = allocator.allocate(0); // send typed data along each channel use timely_communication::Message; senders[0].send(Message::from_typed(format!("hello, {}", 0))); senders[1].send(Message::from_typed(format!("hello, {}", 1))); // no support for termination notification, // we have to count down ourselves. let mut expecting = 2; while expecting > 0 { allocator.receive(); if let Some(message) = receiver.recv() { use std::ops::Deref; println!("worker {}: received: <{}>", allocator.index(), message.deref()); expecting -= 1; } allocator.release(); } // optionally, return something allocator.index() }); // computation runs until guards are joined or dropped. if let Ok(guards) = guards { for guard in guards.join() { println!("result: {:?}", guard); } } else { println!("error in computation"); }
The should produce output like:
worker 0 started worker 1 started worker 0: received: <hello, 0> worker 1: received: <hello, 1> worker 0: received: <hello, 0> worker 1: received: <hello, 1> result: Ok(0) result: Ok(1)
Re-exports
pub use allocator::Generic as Allocator; |
pub use allocator::Allocate; |
pub use initialize::initialize; |
pub use initialize::initialize_from; |
pub use initialize::Configuration; |
pub use initialize::WorkerGuards; |
pub use message::Message; |
Modules
allocator | Types and traits for the allocation of channels. |
buzzer | A type that can unpark specific threads. |
initialize | Initialization logic for a generic instance of the |
logging | Configuration and events for communication logging. |
message | Types wrapping typed data. |
networking | Networking code for sending and receiving fixed size |
Traits
Data | A composite trait for types that may be used with channels. |
Pull | Pulling elements of type |
Push | Pushing elements of type |