clocked_dispatch/
lib.rs

1//! Provides a message dispatch service where each receiver is aware of messages passed to other
2//! peers. In particular, if a message is sent to some receiver `r`, another receiver `r'` will be
3//! aware that one message has been dispatched when it does a subsequent read. Furthermore, the
4//! dispatcher ensures that messages are delivered in order by not emitting data until all input
5//! sources have confirmed that they will not send data with lower sequence numbers.
6//!
7//! The library ensures that a sender will not block due to the slowness of a receiver that is not
8//! the intended recipient of the message in question. For example, if there are two receivers, `r`
9//! and `r'`, `r.send(v)` will not block even though `r'` is not currently reading from its input
10//! channel.
11//!
12//! The library is implemented by routing all messages through a single dispatcher.
13//! This central dispatcher operates in one of two modes, *forwarding* or *serializing*.
14//!
15//!  - In serializing mode, it assigns a monotonically increasing timestamp to each message, and
16//!    forwards it to the intended recipient's queue.
17//!  - In forwarding mode, it accepts timestamped messages from sources, and outputs them to the
18//!    intended recipients *in order*. Messages are buffered by the dispatcher until each of the
19//!    receiver's sources are at least as up-to-date as the message's timestamp. These timestamps
20//!    *must* be sequentially assigned, but *may* be sent to the dispatcher in any order. The
21//!    dispatcher guarantees that they are delivered in-order.
22//!
23//! This dual-mode operation allows dispatchers to be composed in a hierarchical fashion, with a
24//! serializing dispatcher at the "top", and forwarding dispatchers "below".
25//!
26//! # Examples:
27//!
28//! Simple usage:
29//!
30//! ```
31//! use std::thread;
32//! use clocked_dispatch;
33//!
34//! // Create a dispatcher
35//! let d = clocked_dispatch::new(1);
36//!
37//! // Create a simple streaming channel
38//! let (tx, rx) = d.new("atx1", "arx");
39//! thread::spawn(move|| {
40//!     tx.send(10);
41//! });
42//! assert_eq!(rx.recv().unwrap().0.unwrap(), 10);
43//! ```
44//!
45//! Shared usage:
46//!
47//! ```
48//! use std::thread;
49//! use clocked_dispatch;
50//!
51//! // Create a dispatcher.
52//! // Notice that we need more buffer space to the dispatcher here.
53//! // This is because clone() needs to talk to the dispatcher, but the buffer to the dispatcher
54//! // may already have been filled up by the sends in the threads we spawned.
55//! let d = clocked_dispatch::new(10);
56//!
57//! // Create a shared channel that can be sent along from many threads
58//! // where tx is the sending half (tx for transmission), and rx is the receiving
59//! // half (rx for receiving).
60//! let (tx, rx) = d.new("atx", "arx");
61//! for i in 0..10 {
62//!     let tx = tx.clone(format!("atx{}", i));
63//!     thread::spawn(move|| {
64//!         tx.send(i);
65//!     });
66//! }
67//!
68//! for _ in 0..10 {
69//!     let j = rx.recv().unwrap().0.unwrap();
70//!     assert!(0 <= j && j < 10);
71//! }
72//! ```
73//!
74//! Accessing timestamps:
75//!
76//! ```
77//! use clocked_dispatch;
78//! let m = clocked_dispatch::new(10);
79//! let (tx_a, rx_a) = m.new("atx1", "a");
80//!
81//! // notice that we can't use _ here even though tx_b is unused because
82//! // then tx_b would be dropped, causing rx_b to be closed immediately
83//! let (tx_b, rx_b) = m.new("btx1", "b");
84//! let _ = tx_b;
85//!
86//! tx_a.send("a1");
87//! let x = rx_a.recv().unwrap();
88//! assert_eq!(x.0, Some("a1"));
89//! assert_eq!(rx_b.recv(), Ok((None, x.1)));
90//!
91//! tx_a.send("a2");
92//! tx_a.send("a3");
93//!
94//! let a1 = rx_a.recv().unwrap();
95//! assert_eq!(a1.0, Some("a2"));
96//!
97//! let a2 = rx_a.recv().unwrap();
98//! assert_eq!(a2.0, Some("a3"));
99//!
100//! // b must see the timestamp from either a1 or a2
101//! // it could see a1 if a2 hasn't yet been delivered
102//! let b = rx_b.recv().unwrap();
103//! assert_eq!(b.0, None);
104//! assert!(b.1 == a1.1 || b.1 == a2.1);
105//! ```
106//!
107//! In-order delivery
108//!
109//! ```
110//! use clocked_dispatch;
111//! use std::sync::mpsc;
112//!
113//! let m = clocked_dispatch::new(10);
114//! let (tx1, rx) = m.new("tx1", "a");
115//! let tx2 = tx1.clone("tx2");
116//!
117//! tx1.forward(Some("a1"), 1);
118//! assert_eq!(rx.try_recv(), Err(mpsc::TryRecvError::Empty));
119//!
120//! tx2.forward(None, 1);
121//! assert_eq!(rx.recv(), Ok((Some("a1"), 1)));
122//! ```
123
124extern crate rand;
125
126use std::sync::{Arc, Mutex, Condvar};
127use std::cmp::Ordering;
128use std::collections::HashMap;
129use std::collections::HashSet;
130use std::collections::VecDeque;
131use std::collections::BinaryHeap;
132use std::sync::mpsc;
133use std::thread;
134use std::sync;
135
// Tracing hook used by the dispatcher. Output is currently switched off: the format string is
// discarded, and any arguments are only touched through `let _ = ...;` so that they stay
// referenced at the call site. Uncomment the `println!` lines to get trace output.
macro_rules! debug {
    // a bare format string without arguments
    ( $message:expr ) => {
        // println!($message);
    };
    // a format string followed by one or more arguments
    ( $message:expr, $( $value:expr ),+ ) => {
        // println!($message, $($value),*);
        $(let _ = $value;)*;
    };
}
145
/// A payload on its way from a sender to the dispatcher, tagged with routing information.
struct TaggedData<T> {
    /// Name of the sender that produced the message.
    from: String,
    /// Name of the receiver the message is addressed to, or `None` for a broadcast.
    to: Option<String>,
    /// Sequence number supplied by the sender when forwarding, or `None` when the sender
    /// did not supply one.
    ts: Option<usize>,
    /// The payload. Forwarded messages may omit it and carry only a sequence number.
    data: Option<T>,
}
152
153/// A message intended for the dispatcher.
154enum Message<T> {
155    Data(TaggedData<T>),
156    ReceiverJoin(String, Arc<ReceiverInner<T>>),
157    ReceiverLeave(String),
158    SenderJoin(Option<String>, String),
159    SenderLeave(Option<String>, String),
160}
161
162/// The sending half of a clocked synchronous channel.
163/// This half can only be owned by one thread, but it can be cloned to send to other threads.
164///
165/// Sending on a clocked channel will deliver the given message to the appropriate receiver, but
166/// also notify all other receivers about the timestamp assigned to the message. The sending will
167/// never block on a receiver that is not the destination of the message.
168///
169/// Beware that dropping a clocked sender incurs control messages to the dispatcher, and that those
170/// control messages may result in messages being sent to receivers. If the dispatch channel is not
171/// sufficiently buffered, this means that dropping a `ClockedSender` before the corresponding
172/// `ClockedReceiver` is receiving on its end of the channel may deadlock.
173///
174/// When the last `ClockedSender` is dropped for a target, and there are no `ClockedBroadcaster`s,
175/// the dispatcher will automatically be notified, and the recipient will see a disconnected
176/// channel error once it has read all buffered messages.
177///
178/// ```
179/// use clocked_dispatch;
180/// use std::thread;
181///
182/// let m = clocked_dispatch::new(10);
183/// let (tx_a, rx_a) = m.new("atx", "arx");
184///
185/// let tx_a1 = tx_a.clone("atx1");
186/// thread::spawn(move || {
187///     tx_a1.send("a1");
188/// });
189///
190/// let tx_a2 = tx_a.clone("atx2");
191/// thread::spawn(move || {
192///     tx_a2.send("a2");
193/// });
194///
195/// drop(tx_a);
196/// assert_eq!(rx_a.count(), 2);
197/// ```
198pub struct ClockedSender<T> {
199    target: String,
200    source: String,
201    dispatcher: mpsc::SyncSender<Message<T>>,
202}
203
204impl<T> Drop for ClockedSender<T> {
205    fn drop(&mut self) {
206        self.dispatcher
207            .send(Message::SenderLeave(Some(self.target.clone()), self.source.clone()))
208            .unwrap();
209    }
210}
211
212impl<T> ClockedSender<T> {
213    /// Sends a value on this synchronous channel, and notifies all other recipients of the
214    /// timestamp it is assigned by the dispatcher.
215    ///
216    /// This function will *block* until space in the internal buffer becomes available, or a
217    /// receiver is available to hand off the message to.
218    ///
219    /// Note that a successful send does *not* guarantee that the receiver will ever see the data if
220    /// there is a buffer on this channel. Items may be enqueued in the internal buffer for the
221    /// receiver to receive at a later time. If the buffer size is 0, however, it can be guaranteed
222    /// that the receiver has indeed received the data if this function returns success.
223    pub fn send(&self, data: T) {
224        // XXX: would be really neat if we could return the ts here, but that'll probably be tricky
225        // TODO: This function will never panic, but it may return `Err` if the `Receiver` has
226        // disconnected and is no longer able to receive information.
227        self.dispatcher
228            .send(Message::Data(TaggedData {
229                from: self.source.clone(),
230                to: Some(self.target.clone()),
231                ts: None,
232                data: Some(data),
233            }))
234            .unwrap()
235    }
236
237    /// Sends an already-sequenced value to the associated receiver. The message may be buffered
238    /// by the dispatcher until it can guarantee that no other sender will later try to send
239    /// messages with a lower sequence number.
240    ///
241    /// It is optional to include data when forwarding. If no data is included, this message
242    /// conveys to the dispatcher that this sender promises not to send later messages with a
243    /// higher sequence number than the one given.
244    pub fn forward(&self, data: Option<T>, ts: usize) {
245        self.dispatcher
246            .send(Message::Data(TaggedData {
247                from: self.source.clone(),
248                to: Some(self.target.clone()),
249                ts: Some(ts),
250                data: data,
251            }))
252            .unwrap()
253    }
254
255    /// Creates a new clocked sender for this sender's receiver.
256    ///
257    /// Clocked dispatch requires that all senders have a unique name so that the "up-to-date-ness"
258    /// of the senders can be tracked reliably.
259    pub fn clone<V: Into<String>>(&self, source: V) -> ClockedSender<T> {
260        let source = source.into();
261        self.dispatcher
262            .send(Message::SenderJoin(Some(self.target.clone()), source.clone()))
263            .unwrap();
264        ClockedSender {
265            source: source,
266            target: self.target.clone(),
267            dispatcher: self.dispatcher.clone(),
268        }
269    }
270}
271
272impl<T: Clone> ClockedSender<T> {
273    /// Converts this sender into a broadcast sender.
274    ///
275    /// Doing so detaches the sender from its receiver, and means all future sends will be
276    /// broadcast to all receivers. Note that the existence of a broadcaster prevents the closing
277    /// of all channels.
278    pub fn into_broadcaster(self) -> ClockedBroadcaster<T> {
279        let dispatcher = self.dispatcher.clone();
280        let source = format!("{}_bcast", self.source);
281        dispatcher.send(Message::SenderJoin(None, source.clone())).unwrap();
282
283        // NOTE: the drop of self causes a Message::SenderLeave to be sent for this sender
284        ClockedBroadcaster {
285            source: source,
286            dispatcher: dispatcher,
287        }
288    }
289}
290
291/// A sending half of a clocked synchronous channel that only allows broadcast. This half can only
292/// be owned by one thread, but it can be cloned to send to other threads. A `ClockedBroadcaster`
293/// can be constructed from a `ClockedSender` using `ClockedSender::into_broadcaster`.
294///
295/// Sending on a clocked channel will deliver the given message to the appropriate receiver, but
296/// also notify all other receivers about the timestamp assigned to the message. The sending will
297/// never block on a receiver that is not the destination of the message.
298///
299/// Beware that dropping a clocked sender incurs control messages to the dispatcher, and that those
300/// control messages may result in messages being sent to receivers. If the dispatch channel is not
301/// sufficiently buffered, this means that dropping a `ClockedSender` before the corresponding
302/// `ClockedReceiver` is receiving on its end of the channel may deadlock.
303///
/// Note that the existence of a `ClockedBroadcaster` prevents the closing of any clocked channels
/// managed by this dispatcher.
306///
307/// # Examples
308///
309/// Regular broadcast:
310///
311/// ```
312/// use clocked_dispatch;
313/// use std::sync::mpsc;
314///
315/// let m = clocked_dispatch::new(10);
316/// let (tx_a, rx_a) = m.new("atx", "arx");
317/// let tx = tx_a.into_broadcaster();
318/// // note that the A channel is still open since there now exists a broadcaster,
319/// // even though all A senders have been dropped.
320///
321/// let (tx_b, rx_b) = m.new("btx", "brx");
322///
323/// tx.broadcast("1");
324///
325/// let x = rx_a.recv().unwrap();
326/// assert_eq!(x.0, Some("1"));
327/// assert_eq!(rx_b.recv(), Ok(x));
328///
329/// // non-broadcasts still work
330/// tx_b.send("2");
331/// let x = rx_b.recv().unwrap();
332/// assert_eq!(x.0, Some("2"));
333/// assert_eq!(rx_a.recv(), Ok((None, x.1)));
334///
335/// // drop broadcaster
336/// drop(tx);
337///
338/// // A is now closed because there are no more senders
339/// assert_eq!(rx_a.recv(), Err(mpsc::RecvError));
340///
341/// // rx_b is *not* closed because tx_b still exists
342/// assert_eq!(rx_b.try_recv(), Err(mpsc::TryRecvError::Empty));
343///
344/// drop(tx_b);
345/// // rx_b is now closed because its senders have all gone away
346/// assert_eq!(rx_b.recv(), Err(mpsc::RecvError));
347/// ```
348///
349/// Forwarding broadcast:
350///
351/// ```
352/// use clocked_dispatch;
353/// use std::sync::mpsc;
354///
355/// let m = clocked_dispatch::new(10);
356/// let (tx_a, rx_a) = m.new("atx", "arx");
357/// let (tx_b, rx_b) = m.new("btx", "brx");
358/// let (tx_c, rx_c) = m.new("ctx", "crx");
359///
360/// let tx = tx_a.into_broadcaster();
361/// tx.broadcast_forward(Some("1"), 1);
362///
363/// assert_eq!(rx_a.recv().unwrap(), (Some("1"), 1));
364/// assert_eq!(rx_b.recv().unwrap(), (Some("1"), 1));
365/// assert_eq!(rx_c.recv().unwrap(), (Some("1"), 1));
366///
367/// // non-broadcasts still work
368/// tx_c.forward(Some("c"), 2);
369/// assert_eq!(rx_a.recv().unwrap(), (None, 2));
370/// assert_eq!(rx_b.recv().unwrap(), (None, 2));
371/// assert_eq!(rx_c.recv().unwrap(), (Some("c"), 2));
372/// ```
373pub struct ClockedBroadcaster<T: Clone> {
374    source: String,
375    dispatcher: mpsc::SyncSender<Message<T>>,
376}
377
378impl<T: Clone> Drop for ClockedBroadcaster<T> {
379    fn drop(&mut self) {
380        self.dispatcher.send(Message::SenderLeave(None, self.source.clone())).unwrap();
381    }
382}
383
384impl<T: Clone> ClockedBroadcaster<T> {
385    /// Sends a value to all receivers known to this dispatcher. The value will be assigned a
386    /// sequence number by the dispatcher.
387    ///
388    /// This function will *block* until space in the internal buffer becomes available, or a
389    /// receiver is available to hand off the message to.
390    ///
391    /// Note that a successful send does *not* guarantee that the receiver will ever see the data if
392    /// there is a buffer on this channel. Items may be enqueued in the internal buffer for the
393    /// receiver to receive at a later time. If the buffer size is 0, however, it can be guaranteed
394    /// that the receiver has indeed received the data if this function returns success.
395    pub fn broadcast(&self, data: T) {
396        self.dispatcher
397            .send(Message::Data(TaggedData {
398                from: self.source.clone(),
399                to: None,
400                ts: None,
401                data: Some(data),
402            }))
403            .unwrap()
404    }
405
406    /// Sends an already-sequenced value to all receivers known to this dispatcher. The message may
407    /// be buffered by the dispatcher until it can guarantee that no other sender will later try to
408    /// send messages with a lower sequence number.
409    ///
410    /// This function will *block* until space in the internal buffer becomes available, or a
411    /// receiver is available to hand off the message to.
412    ///
413    /// Note that a successful send does *not* guarantee that the receiver will ever see the data if
414    /// there is a buffer on this channel. Items may be enqueued in the internal buffer for the
415    /// receiver to receive at a later time. If the buffer size is 0, however, it can be guaranteed
416    /// that the receiver has indeed received the data if this function returns success.
417    ///
418    /// It is optional to include data when forwarding. If no data is included, this message
419    /// conveys to the dispatcher that this sender promises not to send later messages with a
420    /// higher sequence number than the one given.
421    pub fn broadcast_forward(&self, data: Option<T>, ts: usize) {
422        self.dispatcher
423            .send(Message::Data(TaggedData {
424                from: self.source.clone(),
425                to: None,
426                ts: Some(ts),
427                data: data,
428            }))
429            .unwrap()
430    }
431
432    /// Creates a new clocked broadcast sender.
433    ///
434    /// Clocked dispatch requires that all senders have a unique name so that the "up-to-date-ness"
435    /// of the senders can be tracked reliably.
436    pub fn clone<V: Into<String>>(&self, source: V) -> ClockedBroadcaster<T> {
437        let source = source.into();
438        self.dispatcher.send(Message::SenderJoin(None, source.clone())).unwrap();
439        ClockedBroadcaster {
440            source: source,
441            dispatcher: self.dispatcher.clone(),
442        }
443    }
444}
445
/// The mutable state of a receiver's queue, shared between the receiver and the dispatcher.
struct QueueState<T> {
    /// Delivered-but-unread messages, each with its timestamp.
    queue: VecDeque<(T, usize)>,
    /// The latest timestamp the receiver has handed out to its caller.
    ts_head: usize,
    /// The latest timestamp the dispatcher has announced to this receiver.
    ts_tail: usize,
    /// Once set, a receive on a fully drained queue reports a disconnect instead of blocking.
    closed: bool,
    /// Once set, the dispatcher stops queueing data for this receiver.
    left: bool,
}
453
454struct ReceiverInner<T> {
455    mx: Mutex<QueueState<T>>,
456    cond: Condvar,
457}
458
459/// The receiving half of a clocked synchronous channel.
460///
461/// A clocked receiver will receive all messages sent by one of its associated senders. It will
462/// also receive notifications whenever a message with a higher timestamp than any it has seen has
463/// been sent to another receiver under the same dispatcher.
464///
465/// Dropping it will unblock any senders trying to send to this receiver.
466pub struct ClockedReceiver<T: Send + 'static> {
467    leave: mpsc::SyncSender<String>,
468    inner: Arc<ReceiverInner<T>>,
469    name: String,
470}
471
472impl<T: Send + 'static> ClockedReceiver<T> {
473    fn new<V: Into<String>>(name: V,
474                            leave: mpsc::SyncSender<String>,
475                            bound: usize)
476                            -> ClockedReceiver<T> {
477        ClockedReceiver {
478            leave: leave,
479            inner: Arc::new(ReceiverInner {
480                mx: Mutex::new(QueueState {
481                    queue: VecDeque::with_capacity(bound),
482                    ts_head: 0,
483                    ts_tail: 0,
484                    closed: false,
485                    left: false,
486                }),
487                cond: Condvar::new(),
488            }),
489            name: name.into(),
490        }
491    }
492}
493
494impl<T: Send + 'static> Iterator for ClockedReceiver<T> {
495    type Item = (Option<T>, usize);
496    fn next(&mut self) -> Option<Self::Item> {
497        self.recv().ok()
498    }
499}
500
501impl<T: Send + 'static> Drop for ClockedReceiver<T> {
502    fn drop(&mut self) {
503        use std::mem;
504
505        let name = mem::replace(&mut self.name, String::new());
506        self.leave.send(name).unwrap();
507        // wait until we've actually been dropped
508        self.count();
509    }
510}
511
512impl<T: Send + 'static> ClockedReceiver<T> {
513    /// Attempts to wait for a value on this receiver, returning an error if the corresponding
514    /// channel has hung up.
515    ///
516    /// This function will always block the current thread if there is no data available, the
517    /// receiver has seen the latest timestamp handled by the dispatcher, and it's possible for
518    /// more data to be sent. Once a message is sent to a corresponding `ClockedSender`, then this
519    /// receiver will wake up and return that message. If a message is sent by a `ClockedSender`
520    /// connected to a different receiver under the same dispatcher, this receiver will wake up and
521    /// receive the timestamp assigned to that message.
522    ///
523    /// If all corresponding `ClockedSender` have disconnected, or disconnect while this call is
524    /// blocking, this call will wake up and return `Err` to indicate that no more messages can
525    /// ever be received on this channel. However, since channels are buffered, messages sent
526    /// before the disconnect will still be properly received.
527    pub fn recv(&self) -> Result<(Option<T>, usize), mpsc::RecvError> {
528        let mut state = self.inner.mx.lock().unwrap();
529        while state.ts_head == state.ts_tail && state.queue.is_empty() && !state.closed {
530            // NOTE: is is *not* sufficient to use head == tail as an indicator that there are no
531            // messages. specifically, if there are duplicates for a given timestamp, the equality
532            // may work out while there are still elements in the queue.
533            state = self.inner.cond.wait(state).unwrap();
534        }
535
536        // if there's something at the head of the queue, return it
537        if let Some((t, ts)) = state.queue.pop_front() {
538            state.ts_head = ts;
539            self.inner.cond.notify_one();
540            return Ok((Some(t), ts));
541        }
542
543        if state.ts_head == state.ts_tail {
544            // we must be closed
545            assert_eq!(state.closed, true);
546            return Err(mpsc::RecvError);
547        }
548
549        // otherwise, notify about the newest available timestamp
550        state.ts_head = state.ts_tail;
551        self.inner.cond.notify_one();
552        Ok((None, state.ts_head))
553    }
554
555    /// Attempts to return a pending value on this receiver without blocking
556    ///
557    /// This method will never block the caller in order to wait for data to become available.
558    /// Instead, this will always return immediately with a possible option of pending data on the
559    /// channel.
560    ///
561    /// This is useful for a flavor of "optimistic check" before deciding to block on a receiver.
562    pub fn try_recv(&self) -> Result<(Option<T>, usize), mpsc::TryRecvError> {
563        let mut state = self.inner.mx.lock().unwrap();
564        if state.ts_head == state.ts_tail && !state.closed {
565            // we have observed all timestamps, so the queue must be empty
566            return Err(mpsc::TryRecvError::Empty);
567        }
568
569        if state.ts_head == state.ts_tail {
570            // we must be closed
571            assert_eq!(state.closed, true);
572            return Err(mpsc::TryRecvError::Disconnected);
573        }
574
575        // if there's something at the head of the queue, return it
576        if let Some((t, ts)) = state.queue.pop_front() {
577            state.ts_head = ts;
578            self.inner.cond.notify_one();
579            return Ok((Some(t), ts));
580        }
581
582        // otherwise, notify about the newest available timestamp
583        state.ts_head = state.ts_tail;
584        self.inner.cond.notify_one();
585        Ok((None, state.ts_head))
586    }
587}
588
589/// Dispatch coordinator for adding additional clocked channels.
590pub struct Dispatcher<T: Send> {
591    dispatcher: mpsc::SyncSender<Message<T>>,
592    leave: mpsc::SyncSender<String>,
593    bound: usize,
594}
595
596impl<T: Send> Dispatcher<T> {
597    /// Creates a new named, synchronous, bounded, clocked channel managed by this dispatcher.
598    ///
599    /// The given receiver and sender names *must* be unique for this dispatch.
600    ///
601    /// The `ClockedReceiver` will block until a message or a new timestamp becomes available.
602    ///
603    /// The receiver's incoming channel has an internal buffer on which messages will be queued.
604    /// Its size is inherited from the dispatch bound. When this buffer becomes full, future
605    /// messages from the dispatcher will block waiting for the buffer to open up. Note that a
606    /// buffer size of 0 is valid, but its behavior differs from that of synchronous Rust channels.
607    /// Because the dispatcher sits between the sender and the receiver, a bound of 0 will not
608    /// guarantee a "rendezvous" between the sender and the receiver, but rather between the sender
609    /// and the dispatcher (and subsequently, the dispatcher and the receiver).
610    pub fn new<S1: Into<String>, S2: Into<String>>(&self,
611                                                   sender: S1,
612                                                   receiver: S2)
613                                                   -> (ClockedSender<T>, ClockedReceiver<T>) {
614        let source = sender.into();
615        let target = receiver.into();
616        let send = ClockedSender {
617            source: source.clone(),
618            target: target.clone(),
619            dispatcher: self.dispatcher.clone(),
620        };
621        let recv = ClockedReceiver::new(target.clone(), self.leave.clone(), self.bound);
622
623        self.dispatcher.send(Message::ReceiverJoin(target.clone(), recv.inner.clone())).unwrap();
624        self.dispatcher.send(Message::SenderJoin(Some(target.clone()), source)).unwrap();
625        (send, recv)
626    }
627}
628
/// `Delayed` holds a message that cannot be delivered yet because doing so would violate the
/// in-order guarantee.
///
/// The ordering of `Delayed` values is the *reverse* of the ordering of their timestamps, so
/// the value with the lowest timestamp compares as the greatest. This makes a `BinaryHeap`
/// (a max-heap) of `Delayed` yield the earliest message first. The payload takes no part in
/// comparisons.
struct Delayed<T> {
    ts: usize,
    data: T,
}

impl<T> PartialEq for Delayed<T> {
    fn eq(&self, rhs: &Delayed<T>) -> bool {
        self.ts == rhs.ts
    }
}

impl<T> Eq for Delayed<T> {}

impl<T> PartialOrd for Delayed<T> {
    fn partial_cmp(&self, rhs: &Delayed<T>) -> Option<Ordering> {
        Some(Ord::cmp(self, rhs))
    }
}

impl<T> Ord for Delayed<T> {
    fn cmp(&self, rhs: &Delayed<T>) -> Ordering {
        // reversed on purpose: lower timestamps rank higher
        self.ts.cmp(&rhs.ts).reverse()
    }
}
658
659struct Target<T> {
660    // the receiver's channel
661    channel: Arc<ReceiverInner<T>>,
662    // messages for this receiver that have high timestamps, and must be delayed
663    // the mutex is so that we can allow the control thread to have an &Target without T: Sync
664    delayed: sync::Mutex<BinaryHeap<Delayed<T>>>,
665    // known senders for this receiver
666    senders: HashSet<String>,
667}
668
669// TODO:
670// It seems like dispatchers are always used in one of the following ways:
671//
672//  - Multi-in, multi-out, unicast, assigning timestamps, no buffering
673//  - Multi-in, single-out, unicast, forwarding, buffering
674//  - Single-in, multi-out, broadcast, forwarding, no buffering
675//
676// We could specialize for each of these, which might increase performance and further modularize
677// the code. This would also allow restricting the API such that you can't start using one kind of
678// dispatcher in another mode. Another potentially good example of this is forcing T: Clone only
679// for broadcast dispatchers.
680struct DispatchInner<T> {
681    // per-receiver information
682    // needs to be locked so that control thread can access ReceiverInner Arcs
683    targets: sync::Arc<sync::RwLock<HashMap<String, Target<T>>>>,
684    // essentially targets.keys()
685    // kept separate so we can mutably use targets while iterating over all receiver names
686    destinations: HashSet<String>,
687    // known broadcasters
688    broadcasters: HashSet<String>,
689    // broadcast messages that have high timestamps, and must be delayed
690    bdelay: BinaryHeap<Delayed<T>>,
691    // whether we are operating in forwarding or serializing mode
692    // in the former, the senders assign timestamps
693    // in the latter, we assign the timestamps
694    // the first message we receive dictate the mode
695    forwarding: Option<bool>,
696    // queue bound
697    bound: usize,
698    id: String,
699    // Sequence number counter.
700    // If we are in forwarding mode this is the highest consecutive sequence number we have
701    // received. If we are in serializing mode, this is the last sequence number we have assigned.
702    counter: usize,
703}
704
705impl<T: Clone> DispatchInner<T> {
706    /// Notifies all receivers of the given timestamp, and sends any given data to the intended
707    /// recipients.
708    ///
709    /// If `data == None`, `ts` is sent all receivers.
710    /// If `to == None`, `data.unwrap()` is sent to all receivers.
711    /// If `to == Some(t)`, `data.unwrap()` is sent to the the receiver named `t`.
712    fn notify(&self, to: Option<&String>, ts: usize, data: Option<T>) {
713        let tgts = self.targets.read().unwrap();
714        for (tn, t) in tgts.iter() {
715            let mut state = t.channel.mx.lock().unwrap();
716            debug!("{}: notifying {} about {}", self.id, tn, ts);
717            if data.is_some() && (to.is_none() || to.unwrap() == tn.as_str()) {
718                debug!("{}: including data", self.id);
719                while state.queue.len() == self.bound && !state.left {
720                    state = t.channel.cond.wait(state).unwrap();
721                }
722
723                if state.left {
724                    t.channel.cond.notify_one();
725                    continue;
726                }
727
728                // TODO: avoid clone() for the last send
729                state.queue.push_back((data.clone().unwrap(), ts));
730            }
731            state.ts_tail = ts;
732            t.channel.cond.notify_one();
733            drop(state);
734        }
735
736        // if data.is_some() && to.is_some() && !self.targets.contains_key(to.unwrap().as_str())
737        // this seems like a bad case, but it could just be that the receiver has left
738        // TODO: would be nice if we had some way of notifying the sender that this is the case
739    }
740
741    /// Find any delayed messages that are now earlier than the minimum sender sequence number, and
742    /// send them in-order. Will check both broadcast messages and messages to a given sender if
743    /// `to.is_some()`.
744    fn process_delayed(&mut self) {
745        assert!(self.forwarding.unwrap_or(false));
746        debug!("{}: processing delayed after {}", self.id, self.counter);
747
748        // keep looking for a candidate to send
749        loop {
750            // we need to find the message in `[bdelay + targets[to].delayed]` with the lowest
751            // timestamp. we do this by:
752            //
753            // 1. finding the smallest in `bdelay`
754            let next = self.bdelay.peek().map(|d| d.ts);
755            debug!("{}: next from bcast is {:?}", self.id, next);
756            // 2. finding the smallest in `targets[*].delayed`
757            let tnext = {
758                let tgts = self.targets.read().unwrap();
759                let t = self.destinations
760                    .iter()
761                    .map(|to| {
762                        let t = &tgts[to];
763                        (to,
764                         t.delayed
765                            .lock()
766                            .unwrap()
767                            .peek()
768                            .map(|d| d.ts))
769                    })
770                    .filter_map(|(to, ts)| ts.map(move |ts| (to, ts)))
771                    .min_by_key(|&(_, ts)| ts);
772                t.map(|(to, ts)| (to.to_owned(), ts))
773            };
774
775            debug!("{}: next from tdelay is {:?}", self.id, tnext);
776
777            // 3. using the message from 2 if it is the next message
778            if let Some((to, tnext)) = tnext {
779                if tnext == self.counter + 1 {
780                    debug!("{}: forwarding from tdelay", self.id);
781                    let d = {
782                        let tgts = self.targets.read().unwrap();
783                        let mut x = tgts[to.as_str()].delayed.lock().unwrap();
784                        x.pop().unwrap()
785                    };
786                    self.notify(Some(&to), d.ts, Some(d.data));
787                    self.counter += 1;
788                    continue;
789                }
790            }
791
792            // 4. using the message from 1 if it is the next message
793            if let Some(ts) = next {
794                if ts == self.counter + 1 {
795                    debug!("{}: forwarding from bdelay", self.id);
796                    let d = self.bdelay.pop().unwrap();
797                    self.notify(None, d.ts, Some(d.data));
798                    self.counter += 1;
799                    continue;
800                }
801            }
802
803            // no delayed message has a sequence number <= min
804            break;
805        }
806
807        debug!("{}: done replaying", self.id);
808    }
809
810    /// Takes a message from any sender, handles control messages, and delays or delivers data
811    /// messages.
812    ///
813    /// The first data message sets which mode the dispatcher operates in. If the first message has
814    /// a sequence number, the dispatcher will operate in forwarding mode. If it does not, it will
815    /// operate in assignment mode. In the former, it expects every data message to be numbered,
816    /// and delays too-new messages until all inputs are at least that up-to-date. In the latter,
817    /// it will deliver all messages immediately, and will assign sequence numbers to each one.
818    fn absorb(&mut self, m: Message<T>) {
819        match m {
820            Message::Data(td) => {
821                debug!("{}: got message with ts {:?} from {} for {:?}",
822                       self.id,
823                       td.ts,
824                       td.from,
825                       td.to);
826                if self.forwarding.is_some() {
827                    assert!(self.forwarding.unwrap() == td.ts.is_some(),
828                            "one sender sent timestamp, another did not");
829                } else {
830                    self.forwarding = Some(td.ts.is_some())
831                }
832
833                if let Some(ts) = td.ts {
834                    // if we are forwarding (which must be the case here), this message may be the
835                    // next to be sent out. in that case we should increase the sequence number
836                    // tracker so that any later messages will also be released
837                    assert!(ts >= self.counter);
838                    if ts == self.counter + 1 {
839                        self.counter = ts;
840                    }
841                }
842
843                if td.ts.is_none() {
844                    // the sender leaves it up to us to pick timestamps, so we know we're always up
845                    // to date. note that this latter case assumes that the senders will *never*
846                    // give us timestamps once they have let us pick once.
847                    self.counter += 1;
848                    self.notify(td.to.as_ref(), self.counter, td.data);
849                    return;
850                }
851
852                // we're in forwarding mode
853                let ts = td.ts.unwrap();
854                if ts == self.counter {
855                    // this messages is the next to be sent out, so we can send it immediately
856                    self.notify(td.to.as_ref(), ts, td.data);
857                    // since this messages must also have incremented the counter above, there may
858                    // be other messages that can now be sent out.
859                    self.process_delayed();
860                    return;
861                }
862
863                // need to buffer this message until the other views are sufficiently up-to-date.
864                if let Some(data) = td.data {
865                    if let Some(ref to) = td.to {
866                        debug!("{}: delayed in {:?}", self.id, to);
867                        let tgts = self.targets.read().unwrap();
868                        tgts[to].delayed.lock().unwrap().push(Delayed {
869                            ts: ts,
870                            data: data,
871                        });
872                        drop(tgts);
873                    } else {
874                        debug!("{}: delayed in bcast", self.id);
875                        self.bdelay.push(Delayed {
876                            ts: ts,
877                            data: data,
878                        });
879                    }
880                }
881            }
882            Message::ReceiverJoin(name, inner) => {
883                debug!("{}: receiver {} joined", self.id, name);
884                if !self.destinations.insert(name.clone()) {
885                    panic!("receiver {} already exists!", name);
886                }
887
888                let mut tgts = self.targets.write().unwrap();
889                tgts.insert(name,
890                            Target {
891                                channel: inner,
892                                senders: HashSet::new(),
893                                delayed: sync::Mutex::new(BinaryHeap::new()),
894                            });
895            }
896            Message::ReceiverLeave(name) => {
897                debug!("{}: receiver {} left", self.id, name);
898                // NOTE: Control thread has already unblocked senders and set .left
899
900                // Deregister the receiver
901                let mut tgts = self.targets.write().unwrap();
902                tgts.remove(&*name);
903                self.destinations.remove(&*name);
904
905                // TODO: ensure that subsequent send()'s return an error (somehow?) instead of just
906                // crashing and burning (panic) like what happens now.
907            }
908            Message::SenderJoin(target, source) => {
909                debug!("{}: sender {} for {:?} joined", self.id, source, target);
910
911                if let Some(target) = target {
912                    let mut tgts = self.targets.write().unwrap();
913                    tgts.get_mut(&*target).unwrap().senders.insert(source);
914                } else {
915                    self.broadcasters.insert(source);
916                }
917            }
918            Message::SenderLeave(target, source) => {
919                debug!("{}: sender {} for {:?} left", self.id, source, target);
920                if let Some(ref target) = target {
921                    // NOTE: target may not exist because receiver has left
922                    let mut tgts = self.targets.write().unwrap();
923                    if let Some(target) = tgts.get_mut(target.as_str()) {
924                        target.senders.remove(&*source);
925                    }
926                    drop(tgts);
927                } else {
928                    self.broadcasters.remove(&*source);
929                }
930
931                if self.broadcasters.is_empty() {
932                    // if there are broadcasters, no channel is closed
933                    let mut tgts = self.targets.write().unwrap();
934                    for (tn, t) in tgts.iter_mut()
935                        .filter(|&(_, ref t)| {
936                            t.senders.is_empty() && t.delayed.lock().unwrap().is_empty()
937                        }) {
938                        debug!("{}: closing now-done channel {}", self.id, tn);
939                        // having no senders when there are no broadcasters means the channel is closed
940                        let mut state = t.channel.mx.lock().unwrap();
941                        state.closed = true;
942                        t.channel.cond.notify_one();
943                        drop(state);
944                    }
945                }
946            }
947        }
948    }
949}
950
951/// Creates a new clocked dispatch. Dispatch channels can be constructed by calling `new` on the
952/// returned dispatcher.
953///
954/// The dispatcher has an internal buffer for incoming messages. When this buffer becomes full,
955/// future sends to the dispatcher will block waiting for the buffer to open up. Note that a buffer
956/// size of 0 is valid, but its behavior differs from that of synchronous Rust channels. Because
957/// the dispatcher sits between the sender and the receiver, a bound of 0 will not guarantee a
958/// "rendezvous" between the sender and the receiver, but rather between the sender and the
959/// dispatcher (and subsequently, the dispatcher and the receiver).
960///
961/// Be aware that a bound of 0 means that it is not safe to drop a `ClockedSender` before the
962/// corresponding `ClockedReceiver` is reading from its end of the channel.
963pub fn new<T: Clone + Send + 'static>(bound: usize) -> Dispatcher<T> {
964    new_with_seed(bound, 0)
965}
966
967/// Creates a new clocked dispatch whose automatically assigned sequence numbers start at a given
968/// value.
969///
970/// This method is useful for programs that wish to maintain monotonic sequence numbers between
971/// multiple executions of the application. Such an application should track received sequence
972/// numbers, store the latest one upon exiting, and then use this method to resume the sequence
973/// numbers from that point onward upon resuming.
974pub fn new_with_seed<T: Clone + Send + 'static>(bound: usize, seed: usize) -> Dispatcher<T> {
975    use rand::{thread_rng, Rng};
976
977    let (stx, srx) = mpsc::sync_channel(bound);
978    let mut d = DispatchInner {
979        targets: sync::Arc::new(sync::RwLock::new(HashMap::new())),
980        destinations: HashSet::new(),
981        bdelay: BinaryHeap::new(),
982        broadcasters: HashSet::new(),
983        forwarding: None,
984        bound: bound,
985        id: thread_rng().gen_ascii_chars().take(2).collect(),
986        counter: seed,
987    };
988
989    let id = d.id.clone();
990    let c_targets = d.targets.clone();
991    let c_stx = stx.clone();
992    let (ctx, crx) = mpsc::sync_channel::<String>(0);
993    thread::spawn(move || {
994        // this thread handles leaving receivers.
995        // it is *basically* the following loop:
996        //
997        // ```
998        // for left in crx {
999        //   c_targets[left].channel.left = true;
1000        //   ctx.send(ReceiverLeave(left));
1001        // }
1002        // ```
1003        //
1004        // unfortunately, it gets complicated by two factors:
1005        //
1006        //  - if a receiver is created and then dropped immediately, the Leave could reach us
1007        //    before the Join reaches the dispatcher. in this case, targets[left] doesn't exist
1008        //    yet. we thus need to wait for the dispatcher to catch up to the join.
1009        //
1010        //  - the dispatcher can't be blocking on a send to the receiver we are dropping because
1011        //    it doesn't know about it yet, and thus must process the join message before it can
1012        //    block waiting on us). however, there *is* a possibility of the dispatcher being
1013        //    blocked on a send to a channel that is queued to be dropped *behind* this one. this
1014        //    is the source of much of the complexity below.
1015        //
1016        // essentially, we keep track of leaving receivers that we haven't successfully handled
1017        // yet, but also keep reading from crx to see if there are other receivers that are also
1018        // trying to leave.
1019
1020        // receivers that are trying to leave
1021        let mut leaving = Vec::new();
1022        // temp for keeping track of nodes are *still* trying to leave while draining `leaving`
1023        let mut leaving_ = Vec::new();
1024
1025        'recv: loop {
1026            // are there more receivers trying to leave?
1027            let left = crx.try_recv();
1028            match left {
1029                Ok(left) => {
1030                    // yes -- deal with them too
1031                    leaving.push(left);
1032                }
1033                Err(..) if !leaving.is_empty() => {
1034                    // no, but deal with the receivers that wanted to leave
1035                }
1036                Err(mpsc::TryRecvError::Disconnected) => {
1037                    // no, and there will never be more
1038                    // we must also have dealt with all receivers who tried to leave
1039                    // it's safe for us to exit
1040                    break 'recv;
1041                }
1042                Err(mpsc::TryRecvError::Empty) => {
1043                    // no, and there also aren't any for us to retry
1044                    // to avoid busy looping, we can now do a blocking receive
1045                    let left = crx.recv();
1046                    if let Ok(left) = left {
1047                        // someone tried to leave -- let's try to deal with that
1048                        leaving.push(left);
1049                    } else {
1050                        // channel closed, and no one is waiting -- safe to exit
1051                        break 'recv;
1052                    }
1053                }
1054            }
1055
1056            // try to process any receivers trying to leave
1057            for left in leaving.drain(..) {
1058                debug!("{} control: dealing with departure of receiver {}",
1059                       id,
1060                       left);
1061
1062                let targets = c_targets.read().unwrap();
1063                if let Some(t) = targets.get(&*left) {
1064                    // the receiver exists, so we can remove it
1065                    let mut state = t.channel.mx.lock().unwrap();
1066                    state.left = true;
1067                    state.closed = true;
1068                    t.channel.cond.notify_one();
1069                    drop(state);
1070
1071                    // kick off a message to the dispatcher in the background.
1072                    // we don't want it to be in the foreground, because we may have other things
1073                    // to close that could block sending to the dispatcher.
1074                    let ctx = c_stx.clone();
1075                    thread::spawn(move || {
1076                        ctx.send(Message::ReceiverLeave(left)).unwrap();
1077                    });
1078                } else {
1079                    // dispatcher doesn't know about this receiver yet
1080                    leaving_.push(left);
1081                }
1082            }
1083            leaving.extend(leaving_.drain(..));
1084        }
1085    });
1086
1087    thread::spawn(move || {
1088        for m in srx.iter() {
1089            d.absorb(m);
1090        }
1091    });
1092
1093    Dispatcher {
1094        dispatcher: stx,
1095        leave: ctx,
1096        bound: bound,
1097    }
1098}
1099
1100#[cfg(test)]
1101mod tests {
1102    #[test]
1103    fn can_send_after_recv_drop() {
1104        // Create a dispatcher
1105        let d = super::new(1);
1106
1107        // Create two channels
1108        let (tx_a, rx_a) = d.new("atx", "arx");
1109        let (tx_b, rx_b) = d.new("btx", "brx");
1110        let _ = tx_a;
1111
1112        // Drop a receiver
1113        drop(rx_a);
1114
1115        // Ensure that sending doesn't block forever
1116        tx_b.send(10);
1117
1118        // And that messages are still delivered
1119        assert_eq!(rx_b.recv().unwrap().0.unwrap(), 10);
1120    }
1121
1122    #[test]
1123    fn recv_drop_unblocks_sender() {
1124        use std::thread;
1125        use std::time::Duration;
1126
1127        // Create a dispatcher
1128        let d = super::new(1);
1129
1130        // Create two channels
1131        let (tx_a, rx_a) = d.new("atx", "arx");
1132        let (tx_b, rx_b) = d.new("btx", "brx");
1133
1134        // Make tx_a a broadcaster (so it would block on b)
1135        // Note that we have to do this *before* we saturate the channel to the dispatcher
1136        let tx_a = tx_a.into_broadcaster();
1137
1138        // Fill rx_b
1139        thread::spawn(move || {
1140            for _ in 0..20 {
1141                tx_b.send("b");
1142            }
1143        });
1144        thread::sleep(Duration::from_millis(200));
1145
1146        // Drop b's receiver
1147        drop(rx_b);
1148
1149        // All of tx_b's sends should be dropped, and tx_a should be able to send
1150        tx_a.broadcast("a");
1151
1152        // And that messages are still delivered
1153        loop {
1154            let rx = rx_a.recv();
1155            assert!(rx.is_ok());
1156            let rx = rx.unwrap();
1157            if rx.0.is_some() {
1158                assert_eq!(rx.0, Some("a"));
1159                break;
1160            }
1161        }
1162    }
1163
1164    #[test]
1165    fn can_forward_after_recv_drop() {
1166        // Create a dispatcher
1167        let d = super::new(1);
1168
1169        // Create two channels
1170        let (tx_a, rx_a) = d.new("atx", "arx");
1171        let (tx_b, rx_b) = d.new("btx", "brx");
1172        let _ = tx_a;
1173
1174        // Drop a receiver
1175        drop(rx_a);
1176
1177        // Ensure that forwarding doesn't block forever
1178        tx_b.forward(Some(10), 1);
1179        // note that dropping the receiver kills the senders too!
1180
1181        // And that messages are still delivered
1182        assert_eq!(rx_b.recv(), Ok((Some(10), 1)));
1183    }
1184
1185    #[test]
1186    fn forward_with_no_senders() {
1187        use std::sync::mpsc;
1188
1189        let d = super::new(1);
1190        let (tx_a, rx_a) = d.new("atx", "arx");
1191        let (tx_b, rx_b) = d.new("btx", "brx");
1192
1193        tx_a.forward(Some(1), 1);
1194        // the message is queued because tx_b hasn't sent anything
1195
1196        // drop both senders, freeing Some(1) from the delayed queue. we specifically want to
1197        // explore the case where Some(1) is freed when there are no senders, so we have to drop
1198        // tx_a first (since tx_b is holding up the system).
1199        drop(tx_a);
1200        drop(tx_b);
1201
1202        // Ensure that receiver still gets notified of messages
1203        assert_eq!(rx_a.recv(), Ok((Some(1), 1)));
1204
1205        // And that other still gets a None
1206        assert_eq!(rx_b.recv(), Ok((None, 1)));
1207
1208        // And that no more entries are sent
1209        assert_eq!(rx_a.recv(), Err(mpsc::RecvError));
1210        assert_eq!(rx_b.recv(), Err(mpsc::RecvError));
1211    }
1212
1213    #[test]
1214    fn broadcast_dupe_termination() {
1215        use std::sync::mpsc;
1216
1217        let d = super::new(1);
1218        let (tx, rx) = d.new("tx", "rx");
1219        let tx = tx.into_broadcaster();
1220
1221        tx.broadcast_forward(Some("a"), 1);
1222        tx.broadcast_forward(Some("b"), 2);
1223        drop(tx);
1224
1225        assert_eq!(rx.recv(), Ok((Some("a"), 1)));
1226        assert_eq!(rx.recv(), Ok((Some("b"), 2)));
1227        assert_eq!(rx.recv(), Err(mpsc::RecvError));
1228    }
1229
1230    #[test]
1231    fn multisend_thread_interleaving() {
1232        use std::thread;
1233
1234        for _ in 0..1000 {
1235            let d = super::new(20);
1236            let (tx_a, rx) = d.new("tx_a", "rx");
1237            let tx_b = tx_a.clone("tx_b");
1238
1239            let t_a = thread::spawn(move || {
1240                tx_a.forward(Some("c_1"), 1);
1241                tx_a.forward(Some("c_3"), 3);
1242                tx_a.forward(Some("a_1"), 5);
1243            });
1244            let t_b = thread::spawn(move || {
1245                tx_b.forward(Some("c_2"), 2);
1246                tx_b.forward(Some("b_1"), 4);
1247                tx_b.forward(Some("a_2"), 6);
1248            });
1249
1250            assert_eq!(rx.recv(), Ok((Some("c_1"), 1)));
1251            assert_eq!(rx.recv(), Ok((Some("c_2"), 2)));
1252            assert_eq!(rx.recv(), Ok((Some("c_3"), 3)));
1253            assert_eq!(rx.recv(), Ok((Some("b_1"), 4)));
1254            assert_eq!(rx.recv(), Ok((Some("a_1"), 5)));
1255            assert_eq!(rx.recv(), Ok((Some("a_2"), 6)));
1256
1257            t_a.join().unwrap();
1258            t_b.join().unwrap();
1259        }
1260    }
1261
1262    #[test]
1263    fn test_new_with_seed() {
1264        let d = super::new_with_seed(1, 69105);
1265        let (tx, rx) = d.new("tx", "rx");
1266        tx.send("a");
1267        assert_eq!(rx.recv(), Ok((Some("a"), 69106)));
1268    }
1269}