1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
//! A simple communication infrastructure providing typed exchange channels. //! //! This crate is part of the timely dataflow system, used primarily for its inter-worker communication. //! It may be indepedently useful, but it is separated out mostly to make clear boundaries in the project. //! //! Threads are spawned with an [`allocator::Generic`](./allocator/generic/enum.Generic.html), whose `allocate` method returns a pair of several send endpoints and one //! receive endpoint. Messages sent into a send endpoint will eventually be received by the corresponding worker, //! if it receives often enough. The point-to-point channels are each FIFO, but with no fairness guarantees. //! //! To be communicated, a type must implement the [`Serialize`](./trait.Serialize.html) trait. A default implementation of `Serialize` is //! provided for any type implementing [`Abomonation`](../abomonation/trait.Abomonation.html). To implement other serialization strategies, wrap your type //! and implement `Serialize` for your wrapper. //! //! Channel endpoints also implement a lower-level `push` and `pull` interface (through the [`Push`](./trait.Push.html) and [`Pull`](./trait.Pull.html) //! traits), which is used for more precise control of resources. //! //! #Examples //! ``` //! // configure for two threads, just one process. //! let config = communication::Configuration::Process(2); //! //! // initailizes communication, spawns workers //! communication::initialize(config, |mut allocator| { //! println!("worker {} started", allocator.index()); //! //! // allocates pair of senders list and one receiver. //! let (mut senders, mut receiver) = allocator.allocate(); //! //! // send typed data along each channel //! senders[0].send(format!("hello, {}", 0)); //! senders[1].send(format!("hello, {}", 1)); //! //! // no support for termination notification, //! // we have to count down ourselves. //! let mut expecting = 2; //! while expecting > 0 { //! if let Some(message) = receiver.recv() { //! println!("worker {}: received: <{}>", allocator.index(), message); //! expecting -= 1; //! } //! } //! }); //! ``` //! //! The should produce output like: //! //! ```ignore //! worker 0 started //! worker 1 started //! worker 0: received: <hello, 0> //! worker 1: received: <hello, 1> //! worker 0: received: <hello, 0> //! worker 1: received: <hello, 1> //! ``` //! extern crate getopts; extern crate byteorder; extern crate abomonation; pub mod allocator; mod networking; pub mod initialize; mod drain; use std::any::Any; use abomonation::{Abomonation, encode, decode}; pub use allocator::Generic as Allocator; pub use allocator::Allocate; pub use initialize::{initialize, Configuration}; /// A composite trait for types that may be used with channels. pub trait Data : Send+Any+Serialize+Clone+'static { } impl<T: Clone+Send+Any+Serialize+'static> Data for T { } /// Conversions to and from `Vec<u8>`. /// /// A type must implement this trait to move along the channels produced by an `A: Allocate`. /// /// A default implementation is provided for any `T: Abomonation+Clone`. pub trait Serialize { /// Append the binary representation of `self` to a vector of bytes. The `&mut self` argument /// may be mutated, but the second argument should only be appended to. fn into_bytes(&mut self, &mut Vec<u8>); /// Recover an instance of Self from its binary representation. The `&mut Vec<u8>` argument may /// be taken with `mem::replace` if it is needed. fn from_bytes(&mut Vec<u8>) -> Self; } // NOTE : this should be unsafe, because these methods are. // NOTE : figure this out later. don't use for serious things. impl<T: Abomonation+Clone> Serialize for T { fn into_bytes(&mut self, bytes: &mut Vec<u8>) { unsafe { encode(self, bytes); } } fn from_bytes(bytes: &mut Vec<u8>) -> Self { (* unsafe { decode::<T>(bytes) }.unwrap().0).clone() } } /// Pushing elements of type `T`. pub trait Push<T> { /// Pushes `element` and provides the opportunity to take ownership. /// /// The value of `element` after the call may be changed. A change does not imply anything other /// than that the implementor took resources associated with `element` and is returning other /// resources. fn push(&mut self, element: &mut Option<T>); /// Pushes `element` and drops any resulting resources. fn send(&mut self, element: T) { self.push(&mut Some(element)); } /// Pushes `None`, conventionally signalling a flush. fn done(&mut self) { self.push(&mut None); } } impl<T, P: ?Sized + Push<T>> Push<T> for Box<P> { fn push(&mut self, element: &mut Option<T>) { (**self).push(element) } } /// Pulling elements of type `T`. pub trait Pull<T> { /// Pulls an element and provides the opportunity to take ownership. /// /// The receiver may mutate the result, in particular take ownership of the data by replacing /// it with other data or even `None`. /// If `pull` returns `None` this conventially signals that no more data is available /// at the moment. fn pull(&mut self) -> &mut Option<T>; /// Takes an `Option<T>` and leaves `None` behind. fn recv(&mut self) -> Option<T> { self.pull().take() } } impl<T, P: ?Sized + Pull<T>> Pull<T> for Box<P> { fn pull(&mut self) -> &mut Option<T> { (**self).pull() } }