clocked_dispatch/lib.rs
//! Provides a message dispatch service where each receiver is aware of messages passed to other
//! peers. In particular, if a message is sent to some receiver `r`, another receiver `r'` will be
//! aware that one message has been dispatched when it does a subsequent read. Furthermore, the
//! dispatcher ensures that messages are delivered in order by not emitting data until all input
//! sources have confirmed that they will not send data with lower sequence numbers.
//!
//! The library ensures that a sender will not block due to the slowness of a receiver that is not
//! the intended recipient of the message in question. For example, if there are two receivers, `r`
//! and `r'`, a send to `r` will not block even though `r'` is not currently reading from its
//! input channel.
//!
//! The library is implemented by routing all messages through a single dispatcher.
//! This central dispatcher operates in one of two modes, *forwarding* or *serializing*.
//!
//! - In serializing mode, it assigns a monotonically increasing timestamp to each message, and
//!   forwards it to the intended recipient's queue.
//! - In forwarding mode, it accepts timestamped messages from sources, and outputs them to the
//!   intended recipients *in order*. Messages are buffered by the dispatcher until each of the
//!   receiver's sources is at least as up-to-date as the message's timestamp. These timestamps
//!   *must* be sequentially assigned, but *may* be sent to the dispatcher in any order. The
//!   dispatcher guarantees that they are delivered in-order.
//!
//! This dual-mode operation allows dispatchers to be composed in a hierarchical fashion, with a
//! serializing dispatcher at the "top", and forwarding dispatchers "below"; see the composition
//! example at the end of this section.
//!
//! # Examples
//!
//! Simple usage:
//!
//! ```
//! use std::thread;
//! use clocked_dispatch;
//!
//! // Create a dispatcher
//! let d = clocked_dispatch::new(1);
//!
//! // Create a simple streaming channel
//! let (tx, rx) = d.new("atx1", "arx");
//! thread::spawn(move || {
//!     tx.send(10);
//! });
//! assert_eq!(rx.recv().unwrap().0.unwrap(), 10);
//! ```
//!
//! Shared usage:
//!
//! ```
//! use std::thread;
//! use clocked_dispatch;
//!
//! // Create a dispatcher.
//! // Notice that we need more buffer space in the channel to the dispatcher here.
//! // This is because clone() needs to talk to the dispatcher, but the buffer to the dispatcher
//! // may already have been filled up by the sends in the threads we spawned.
//! let d = clocked_dispatch::new(10);
//!
//! // Create a shared channel that can be sent along from many threads
//! // where tx is the sending half (tx for transmission), and rx is the receiving
//! // half (rx for receiving).
//! let (tx, rx) = d.new("atx", "arx");
//! for i in 0..10 {
//!     let tx = tx.clone(format!("atx{}", i));
//!     thread::spawn(move || {
//!         tx.send(i);
//!     });
//! }
//!
//! for _ in 0..10 {
//!     let j = rx.recv().unwrap().0.unwrap();
//!     assert!(0 <= j && j < 10);
//! }
//! ```
//!
//! Accessing timestamps:
//!
//! ```
//! use clocked_dispatch;
//! let m = clocked_dispatch::new(10);
//! let (tx_a, rx_a) = m.new("atx1", "a");
//!
//! // notice that we can't use _ here even though tx_b is unused because
//! // then tx_b would be dropped, causing rx_b to be closed immediately
//! let (tx_b, rx_b) = m.new("btx1", "b");
//! let _ = tx_b;
//!
//! tx_a.send("a1");
//! let x = rx_a.recv().unwrap();
//! assert_eq!(x.0, Some("a1"));
//! assert_eq!(rx_b.recv(), Ok((None, x.1)));
//!
//! tx_a.send("a2");
//! tx_a.send("a3");
//!
//! let a1 = rx_a.recv().unwrap();
//! assert_eq!(a1.0, Some("a2"));
//!
//! let a2 = rx_a.recv().unwrap();
//! assert_eq!(a2.0, Some("a3"));
//!
//! // b must see the timestamp from either a1 or a2
//! // it could see a1 if a2 hasn't yet been delivered
//! let b = rx_b.recv().unwrap();
//! assert_eq!(b.0, None);
//! assert!(b.1 == a1.1 || b.1 == a2.1);
//! ```
//!
//! In-order delivery:
//!
//! ```
//! use clocked_dispatch;
//! use std::sync::mpsc;
//!
//! let m = clocked_dispatch::new(10);
//! let (tx1, rx) = m.new("tx1", "a");
//! let tx2 = tx1.clone("tx2");
//!
//! tx1.forward(Some("a1"), 1);
//! assert_eq!(rx.try_recv(), Err(mpsc::TryRecvError::Empty));
//!
//! tx2.forward(None, 1);
//! assert_eq!(rx.recv(), Ok((Some("a1"), 1)));
//! ```
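//!
//! Hierarchical composition (a minimal sketch; the dispatcher and channel names are
//! illustrative). A serializing dispatcher assigns sequence numbers at the "top", and its output
//! is fed into a forwarding dispatcher "below":
//!
//! ```
//! use clocked_dispatch;
//!
//! // the top dispatcher assigns sequence numbers
//! let top = clocked_dispatch::new(10);
//! let (tx, rx) = top.new("top_tx", "top_rx");
//!
//! // the bottom dispatcher forwards already-sequenced messages
//! let bottom = clocked_dispatch::new(10);
//! let (ftx, frx) = bottom.new("bottom_tx", "bottom_rx");
//!
//! tx.send("hello");
//! let (data, ts) = rx.recv().unwrap();
//! ftx.forward(data, ts);
//! assert_eq!(frx.recv(), Ok((Some("hello"), ts)));
//! ```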

extern crate rand;

use std::sync::{Arc, Mutex, Condvar};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::collections::BinaryHeap;
use std::sync::mpsc;
use std::thread;
use std::sync;

macro_rules! debug {
    ( $fmt:expr ) => {
        // println!($fmt);
    };
    ( $fmt:expr, $( $args:expr ),+ ) => {
        // println!($fmt, $($args),*);
        $(let _ = $args;)*
    };
}

struct TaggedData<T> {
    from: String,
    to: Option<String>,
    ts: Option<usize>,
    data: Option<T>,
}

/// A message intended for the dispatcher.
enum Message<T> {
    Data(TaggedData<T>),
    ReceiverJoin(String, Arc<ReceiverInner<T>>),
    ReceiverLeave(String),
    SenderJoin(Option<String>, String),
    SenderLeave(Option<String>, String),
}

/// The sending half of a clocked synchronous channel.
/// This half can only be owned by one thread, but it can be cloned to send to other threads.
///
/// Sending on a clocked channel will deliver the given message to the appropriate receiver, but
/// also notify all other receivers about the timestamp assigned to the message. The send will
/// never block on a receiver that is not the destination of the message.
///
/// Beware that dropping a clocked sender incurs control messages to the dispatcher, and that those
/// control messages may result in messages being sent to receivers. If the dispatch channel is not
/// sufficiently buffered, this means that dropping a `ClockedSender` before the corresponding
/// `ClockedReceiver` is receiving on its end of the channel may deadlock.
///
/// When the last `ClockedSender` for a target is dropped, and there are no `ClockedBroadcaster`s,
/// the dispatcher will automatically be notified, and the recipient will see a disconnected
/// channel error once it has read all buffered messages.
///
/// ```
/// use clocked_dispatch;
/// use std::thread;
///
/// let m = clocked_dispatch::new(10);
/// let (tx_a, rx_a) = m.new("atx", "arx");
///
/// let tx_a1 = tx_a.clone("atx1");
/// thread::spawn(move || {
///     tx_a1.send("a1");
/// });
///
/// let tx_a2 = tx_a.clone("atx2");
/// thread::spawn(move || {
///     tx_a2.send("a2");
/// });
///
/// drop(tx_a);
/// assert_eq!(rx_a.count(), 2);
/// ```
pub struct ClockedSender<T> {
    target: String,
    source: String,
    dispatcher: mpsc::SyncSender<Message<T>>,
}

impl<T> Drop for ClockedSender<T> {
    fn drop(&mut self) {
        self.dispatcher
            .send(Message::SenderLeave(Some(self.target.clone()), self.source.clone()))
            .unwrap();
    }
}

impl<T> ClockedSender<T> {
    /// Sends a value on this synchronous channel, and notifies all other recipients of the
    /// timestamp it is assigned by the dispatcher.
    ///
    /// This function will *block* until space in the internal buffer becomes available, or a
    /// receiver is available to hand off the message to.
    ///
    /// Note that a successful send does *not* guarantee that the receiver will ever see the data
    /// if there is a buffer on this channel. Items may be enqueued in the internal buffer for the
    /// receiver to receive at a later time. If the buffer size is 0, however, it can be guaranteed
    /// that the receiver has indeed received the data if this function returns success.
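    ///
    /// A minimal sketch (doc-test); the channel names are illustrative:
    ///
    /// ```
    /// use clocked_dispatch;
    ///
    /// let d = clocked_dispatch::new(10);
    /// let (tx, rx) = d.new("tx", "rx");
    /// tx.send("hello");
    /// assert_eq!(rx.recv().unwrap().0, Some("hello"));
    /// ```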
    pub fn send(&self, data: T) {
        // XXX: would be really neat if we could return the ts here, but that'll probably be tricky
        // TODO: this function should return `Err` instead of panicking if the `Receiver` has
        // disconnected and is no longer able to receive information.
        self.dispatcher
            .send(Message::Data(TaggedData {
                from: self.source.clone(),
                to: Some(self.target.clone()),
                ts: None,
                data: Some(data),
            }))
            .unwrap()
    }

    /// Sends an already-sequenced value to the associated receiver. The message may be buffered
    /// by the dispatcher until it can guarantee that no other sender will later try to send
    /// messages with a lower sequence number.
    ///
    /// It is optional to include data when forwarding. If no data is included, this message
    /// conveys to the dispatcher that this sender promises not to send later messages with a
    /// lower sequence number than the one given.
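    ///
    /// A minimal sketch (doc-test) with a single sender, so the message is released immediately;
    /// the channel names are illustrative:
    ///
    /// ```
    /// use clocked_dispatch;
    ///
    /// let d = clocked_dispatch::new(10);
    /// let (tx, rx) = d.new("tx", "rx");
    /// tx.forward(Some("x"), 1);
    /// assert_eq!(rx.recv(), Ok((Some("x"), 1)));
    /// ```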
    pub fn forward(&self, data: Option<T>, ts: usize) {
        self.dispatcher
            .send(Message::Data(TaggedData {
                from: self.source.clone(),
                to: Some(self.target.clone()),
                ts: Some(ts),
                data: data,
            }))
            .unwrap()
    }

    /// Creates a new clocked sender for this sender's receiver.
    ///
    /// Clocked dispatch requires that all senders have a unique name so that the "up-to-date-ness"
    /// of the senders can be tracked reliably.
    pub fn clone<V: Into<String>>(&self, source: V) -> ClockedSender<T> {
        let source = source.into();
        self.dispatcher
            .send(Message::SenderJoin(Some(self.target.clone()), source.clone()))
            .unwrap();
        ClockedSender {
            source: source,
            target: self.target.clone(),
            dispatcher: self.dispatcher.clone(),
        }
    }
}

impl<T: Clone> ClockedSender<T> {
    /// Converts this sender into a broadcast sender.
    ///
    /// Doing so detaches the sender from its receiver, and means all future sends will be
    /// broadcast to all receivers. Note that the existence of a broadcaster prevents the closing
    /// of all channels.
    pub fn into_broadcaster(self) -> ClockedBroadcaster<T> {
        let dispatcher = self.dispatcher.clone();
        let source = format!("{}_bcast", self.source);
        dispatcher.send(Message::SenderJoin(None, source.clone())).unwrap();

        // NOTE: the drop of self causes a Message::SenderLeave to be sent for this sender
        ClockedBroadcaster {
            source: source,
            dispatcher: dispatcher,
        }
    }
}

/// A sending half of a clocked synchronous channel that only allows broadcast. This half can only
/// be owned by one thread, but it can be cloned to send to other threads. A `ClockedBroadcaster`
/// can be constructed from a `ClockedSender` using `ClockedSender::into_broadcaster`.
///
/// Sending on a clocked channel will deliver the given message to the appropriate receiver, but
/// also notify all other receivers about the timestamp assigned to the message. The send will
/// never block on a receiver that is not the destination of the message.
///
/// Beware that dropping a clocked sender incurs control messages to the dispatcher, and that those
/// control messages may result in messages being sent to receivers. If the dispatch channel is not
/// sufficiently buffered, this means that dropping a `ClockedSender` before the corresponding
/// `ClockedReceiver` is receiving on its end of the channel may deadlock.
///
/// Note that the existence of a `ClockedBroadcaster` prevents the closing of any clocked channels
/// managed by this dispatcher.
///
/// # Examples
///
/// Regular broadcast:
///
/// ```
/// use clocked_dispatch;
/// use std::sync::mpsc;
///
/// let m = clocked_dispatch::new(10);
/// let (tx_a, rx_a) = m.new("atx", "arx");
/// let tx = tx_a.into_broadcaster();
/// // note that the A channel is still open since there now exists a broadcaster,
/// // even though all A senders have been dropped.
///
/// let (tx_b, rx_b) = m.new("btx", "brx");
///
/// tx.broadcast("1");
///
/// let x = rx_a.recv().unwrap();
/// assert_eq!(x.0, Some("1"));
/// assert_eq!(rx_b.recv(), Ok(x));
///
/// // non-broadcasts still work
/// tx_b.send("2");
/// let x = rx_b.recv().unwrap();
/// assert_eq!(x.0, Some("2"));
/// assert_eq!(rx_a.recv(), Ok((None, x.1)));
///
/// // drop broadcaster
/// drop(tx);
///
/// // A is now closed because there are no more senders
/// assert_eq!(rx_a.recv(), Err(mpsc::RecvError));
///
/// // rx_b is *not* closed because tx_b still exists
/// assert_eq!(rx_b.try_recv(), Err(mpsc::TryRecvError::Empty));
///
/// drop(tx_b);
/// // rx_b is now closed because its senders have all gone away
/// assert_eq!(rx_b.recv(), Err(mpsc::RecvError));
/// ```
///
/// Forwarding broadcast:
///
/// ```
/// use clocked_dispatch;
///
/// let m = clocked_dispatch::new(10);
/// let (tx_a, rx_a) = m.new("atx", "arx");
/// let (tx_b, rx_b) = m.new("btx", "brx");
/// let (tx_c, rx_c) = m.new("ctx", "crx");
///
/// let tx = tx_a.into_broadcaster();
/// tx.broadcast_forward(Some("1"), 1);
///
/// assert_eq!(rx_a.recv().unwrap(), (Some("1"), 1));
/// assert_eq!(rx_b.recv().unwrap(), (Some("1"), 1));
/// assert_eq!(rx_c.recv().unwrap(), (Some("1"), 1));
///
/// // non-broadcasts still work
/// tx_c.forward(Some("c"), 2);
/// assert_eq!(rx_a.recv().unwrap(), (None, 2));
/// assert_eq!(rx_b.recv().unwrap(), (None, 2));
/// assert_eq!(rx_c.recv().unwrap(), (Some("c"), 2));
/// ```
pub struct ClockedBroadcaster<T: Clone> {
    source: String,
    dispatcher: mpsc::SyncSender<Message<T>>,
}

impl<T: Clone> Drop for ClockedBroadcaster<T> {
    fn drop(&mut self) {
        self.dispatcher.send(Message::SenderLeave(None, self.source.clone())).unwrap();
    }
}

impl<T: Clone> ClockedBroadcaster<T> {
    /// Sends a value to all receivers known to this dispatcher. The value will be assigned a
    /// sequence number by the dispatcher.
    ///
    /// This function will *block* until space in the internal buffer becomes available, or a
    /// receiver is available to hand off the message to.
    ///
    /// Note that a successful send does *not* guarantee that the receiver will ever see the data
    /// if there is a buffer on this channel. Items may be enqueued in the internal buffer for the
    /// receiver to receive at a later time. If the buffer size is 0, however, it can be guaranteed
    /// that the receiver has indeed received the data if this function returns success.
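    ///
    /// A minimal sketch (doc-test); the channel names are illustrative:
    ///
    /// ```
    /// use clocked_dispatch;
    ///
    /// let d = clocked_dispatch::new(10);
    /// let (tx, rx) = d.new("tx", "rx");
    /// let tx = tx.into_broadcaster();
    /// tx.broadcast("x");
    /// assert_eq!(rx.recv().unwrap().0, Some("x"));
    /// ```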
    pub fn broadcast(&self, data: T) {
        self.dispatcher
            .send(Message::Data(TaggedData {
                from: self.source.clone(),
                to: None,
                ts: None,
                data: Some(data),
            }))
            .unwrap()
    }

    /// Sends an already-sequenced value to all receivers known to this dispatcher. The message may
    /// be buffered by the dispatcher until it can guarantee that no other sender will later try to
    /// send messages with a lower sequence number.
    ///
    /// This function will *block* until space in the internal buffer becomes available, or a
    /// receiver is available to hand off the message to.
    ///
    /// Note that a successful send does *not* guarantee that the receiver will ever see the data
    /// if there is a buffer on this channel. Items may be enqueued in the internal buffer for the
    /// receiver to receive at a later time. If the buffer size is 0, however, it can be guaranteed
    /// that the receiver has indeed received the data if this function returns success.
    ///
    /// It is optional to include data when forwarding. If no data is included, this message
    /// conveys to the dispatcher that this sender promises not to send later messages with a
    /// lower sequence number than the one given.
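    ///
    /// A minimal sketch (doc-test); the channel names are illustrative:
    ///
    /// ```
    /// use clocked_dispatch;
    ///
    /// let d = clocked_dispatch::new(10);
    /// let (tx, rx) = d.new("tx", "rx");
    /// let tx = tx.into_broadcaster();
    /// tx.broadcast_forward(Some("x"), 1);
    /// assert_eq!(rx.recv(), Ok((Some("x"), 1)));
    /// ```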
    pub fn broadcast_forward(&self, data: Option<T>, ts: usize) {
        self.dispatcher
            .send(Message::Data(TaggedData {
                from: self.source.clone(),
                to: None,
                ts: Some(ts),
                data: data,
            }))
            .unwrap()
    }

    /// Creates a new clocked broadcast sender.
    ///
    /// Clocked dispatch requires that all senders have a unique name so that the "up-to-date-ness"
    /// of the senders can be tracked reliably.
    pub fn clone<V: Into<String>>(&self, source: V) -> ClockedBroadcaster<T> {
        let source = source.into();
        self.dispatcher.send(Message::SenderJoin(None, source.clone())).unwrap();
        ClockedBroadcaster {
            source: source,
            dispatcher: self.dispatcher.clone(),
        }
    }
}

struct QueueState<T> {
    queue: VecDeque<(T, usize)>,
    ts_head: usize,
    ts_tail: usize,
    closed: bool,
    left: bool,
}

struct ReceiverInner<T> {
    mx: Mutex<QueueState<T>>,
    cond: Condvar,
}

/// The receiving half of a clocked synchronous channel.
///
/// A clocked receiver will receive all messages sent by one of its associated senders. It will
/// also receive notifications whenever a message with a higher timestamp than any it has seen has
/// been sent to another receiver under the same dispatcher.
///
/// Dropping it will unblock any senders trying to send to this receiver.
pub struct ClockedReceiver<T: Send + 'static> {
    leave: mpsc::SyncSender<String>,
    inner: Arc<ReceiverInner<T>>,
    name: String,
}

impl<T: Send + 'static> ClockedReceiver<T> {
    fn new<V: Into<String>>(name: V,
                            leave: mpsc::SyncSender<String>,
                            bound: usize)
                            -> ClockedReceiver<T> {
        ClockedReceiver {
            leave: leave,
            inner: Arc::new(ReceiverInner {
                mx: Mutex::new(QueueState {
                    queue: VecDeque::with_capacity(bound),
                    ts_head: 0,
                    ts_tail: 0,
                    closed: false,
                    left: false,
                }),
                cond: Condvar::new(),
            }),
            name: name.into(),
        }
    }
}

impl<T: Send + 'static> Iterator for ClockedReceiver<T> {
    type Item = (Option<T>, usize);
    fn next(&mut self) -> Option<Self::Item> {
        self.recv().ok()
    }
}

impl<T: Send + 'static> Drop for ClockedReceiver<T> {
    fn drop(&mut self) {
        use std::mem;

        let name = mem::replace(&mut self.name, String::new());
        self.leave.send(name).unwrap();
        // drain the channel, which blocks until the dispatcher has actually processed our
        // departure and closed our end
        self.count();
    }
}

impl<T: Send + 'static> ClockedReceiver<T> {
    /// Attempts to wait for a value on this receiver, returning an error if the corresponding
    /// channel has hung up.
    ///
    /// This function will always block the current thread if there is no data available, the
    /// receiver has seen the latest timestamp handled by the dispatcher, and it's possible for
    /// more data to be sent. Once a message is sent on a corresponding `ClockedSender`, this
    /// receiver will wake up and return that message. If a message is sent by a `ClockedSender`
    /// connected to a different receiver under the same dispatcher, this receiver will wake up and
    /// receive the timestamp assigned to that message.
    ///
    /// If all corresponding `ClockedSender`s have disconnected, or disconnect while this call is
    /// blocking, this call will wake up and return `Err` to indicate that no more messages can
    /// ever be received on this channel. However, since channels are buffered, messages sent
    /// before the disconnect will still be properly received.
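    ///
    /// A minimal sketch (doc-test); the channel names are illustrative:
    ///
    /// ```
    /// use clocked_dispatch;
    /// use std::sync::mpsc;
    ///
    /// let d = clocked_dispatch::new(10);
    /// let (tx, rx) = d.new("tx", "rx");
    /// tx.send("hi");
    /// assert_eq!(rx.recv(), Ok((Some("hi"), 1)));
    ///
    /// // once all senders are gone, recv returns an error
    /// drop(tx);
    /// assert_eq!(rx.recv(), Err(mpsc::RecvError));
    /// ```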
    pub fn recv(&self) -> Result<(Option<T>, usize), mpsc::RecvError> {
        let mut state = self.inner.mx.lock().unwrap();
        while state.ts_head == state.ts_tail && state.queue.is_empty() && !state.closed {
            // NOTE: it is *not* sufficient to use head == tail as an indicator that there are no
            // messages. specifically, if there are duplicates for a given timestamp, the equality
            // may work out while there are still elements in the queue.
            state = self.inner.cond.wait(state).unwrap();
        }

        // if there's something at the head of the queue, return it
        if let Some((t, ts)) = state.queue.pop_front() {
            state.ts_head = ts;
            self.inner.cond.notify_one();
            return Ok((Some(t), ts));
        }

        if state.ts_head == state.ts_tail {
            // we must be closed
            assert!(state.closed);
            return Err(mpsc::RecvError);
        }

        // otherwise, notify about the newest available timestamp
        state.ts_head = state.ts_tail;
        self.inner.cond.notify_one();
        Ok((None, state.ts_head))
    }

    /// Attempts to return a pending value on this receiver without blocking.
    ///
    /// This method will never block the caller in order to wait for data to become available.
    /// Instead, it will always return immediately with a possible option of pending data on the
    /// channel.
    ///
    /// This is useful for a flavor of "optimistic check" before deciding to block on a receiver.
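    ///
    /// A minimal sketch (doc-test). Note that delivery through the dispatcher is asynchronous,
    /// so after a send, only a blocking `recv` is guaranteed to see the message:
    ///
    /// ```
    /// use clocked_dispatch;
    /// use std::sync::mpsc;
    ///
    /// let d = clocked_dispatch::new(10);
    /// let (tx, rx) = d.new("tx", "rx");
    ///
    /// // nothing has been sent yet
    /// assert_eq!(rx.try_recv(), Err(mpsc::TryRecvError::Empty));
    ///
    /// tx.send("x");
    /// assert_eq!(rx.recv().unwrap().0, Some("x"));
    /// ```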
    pub fn try_recv(&self) -> Result<(Option<T>, usize), mpsc::TryRecvError> {
        let mut state = self.inner.mx.lock().unwrap();

        // if there's something at the head of the queue, return it.
        // NOTE: as in recv(), head == tail alone is not sufficient to conclude that the queue is
        // empty, since duplicate timestamps may leave elements in the queue.
        if let Some((t, ts)) = state.queue.pop_front() {
            state.ts_head = ts;
            self.inner.cond.notify_one();
            return Ok((Some(t), ts));
        }

        if state.ts_head == state.ts_tail {
            if state.closed {
                return Err(mpsc::TryRecvError::Disconnected);
            }
            // we have observed all timestamps, and the queue is empty
            return Err(mpsc::TryRecvError::Empty);
        }

        // otherwise, notify about the newest available timestamp
        state.ts_head = state.ts_tail;
        self.inner.cond.notify_one();
        Ok((None, state.ts_head))
    }
}

/// Dispatch coordinator for adding additional clocked channels.
pub struct Dispatcher<T: Send> {
    dispatcher: mpsc::SyncSender<Message<T>>,
    leave: mpsc::SyncSender<String>,
    bound: usize,
}

impl<T: Send> Dispatcher<T> {
    /// Creates a new named, synchronous, bounded, clocked channel managed by this dispatcher.
    ///
    /// The given receiver and sender names *must* be unique for this dispatcher.
    ///
    /// The `ClockedReceiver` will block until a message or a new timestamp becomes available.
    ///
    /// The receiver's incoming channel has an internal buffer on which messages will be queued.
    /// Its size is inherited from the dispatch bound. When this buffer becomes full, future
    /// messages from the dispatcher will block waiting for the buffer to open up. Note that a
    /// buffer size of 0 is valid, but its behavior differs from that of synchronous Rust
    /// channels. Because the dispatcher sits between the sender and the receiver, a bound of 0
    /// will not guarantee a "rendezvous" between the sender and the receiver, but rather between
    /// the sender and the dispatcher (and subsequently, the dispatcher and the receiver).
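    ///
    /// A minimal sketch (doc-test); the channel names are illustrative:
    ///
    /// ```
    /// use clocked_dispatch;
    ///
    /// let d = clocked_dispatch::new(10);
    /// let (tx, rx) = d.new("tx", "rx");
    /// tx.send(42);
    /// assert_eq!(rx.recv().unwrap().0, Some(42));
    /// ```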
    pub fn new<S1: Into<String>, S2: Into<String>>(&self,
                                                   sender: S1,
                                                   receiver: S2)
                                                   -> (ClockedSender<T>, ClockedReceiver<T>) {
        let source = sender.into();
        let target = receiver.into();
        let send = ClockedSender {
            source: source.clone(),
            target: target.clone(),
            dispatcher: self.dispatcher.clone(),
        };
        let recv = ClockedReceiver::new(target.clone(), self.leave.clone(), self.bound);

        self.dispatcher.send(Message::ReceiverJoin(target.clone(), recv.inner.clone())).unwrap();
        self.dispatcher.send(Message::SenderJoin(Some(target.clone()), source)).unwrap();
        (send, recv)
    }
}

/// `Delayed` is used to keep track of messages that cannot yet be safely delivered because doing
/// so would violate the in-order guarantees.
///
/// `Delayed` structs are ordered by their timestamp such that the *lowest* is the "highest". This
/// is so that `Delayed` can easily be used in a `BinaryHeap` as a min-heap.
struct Delayed<T> {
    ts: usize,
    data: T,
}

impl<T> PartialEq for Delayed<T> {
    fn eq(&self, other: &Delayed<T>) -> bool {
        other.ts == self.ts
    }
}

impl<T> PartialOrd for Delayed<T> {
    fn partial_cmp(&self, other: &Delayed<T>) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T> Eq for Delayed<T> {}

impl<T> Ord for Delayed<T> {
    fn cmp(&self, other: &Delayed<T>) -> Ordering {
        // reversed comparison: the lowest timestamp compares greatest
        other.ts.cmp(&self.ts)
    }
}

struct Target<T> {
    // the receiver's channel
    channel: Arc<ReceiverInner<T>>,
    // messages for this receiver that have high timestamps, and must be delayed.
    // the mutex is so that the control thread can hold an &Target without requiring T: Sync
    delayed: sync::Mutex<BinaryHeap<Delayed<T>>>,
    // known senders for this receiver
    senders: HashSet<String>,
}

// TODO:
// It seems like dispatchers are always used in one of the following ways:
//
// - Multi-in, multi-out, unicast, assigning timestamps, no buffering
// - Multi-in, single-out, unicast, forwarding, buffering
// - Single-in, multi-out, broadcast, forwarding, no buffering
//
// We could specialize for each of these, which might increase performance and further modularize
// the code. This would also allow restricting the API such that you can't start using one kind of
// dispatcher in another mode. Another potentially good example of this is forcing T: Clone only
// for broadcast dispatchers.
struct DispatchInner<T> {
    // per-receiver information.
    // needs to be locked so that the control thread can access the ReceiverInner Arcs
    targets: sync::Arc<sync::RwLock<HashMap<String, Target<T>>>>,
    // essentially targets.keys().
    // kept separate so we can mutably use targets while iterating over all receiver names
    destinations: HashSet<String>,
    // known broadcasters
    broadcasters: HashSet<String>,
    // broadcast messages that have high timestamps, and must be delayed
    bdelay: BinaryHeap<Delayed<T>>,
    // whether we are operating in forwarding or serializing mode.
    // in the former, the senders assign timestamps.
    // in the latter, we assign the timestamps.
    // the first message we receive dictates the mode
    forwarding: Option<bool>,
    // queue bound
    bound: usize,
    id: String,
    // Sequence number counter.
    // If we are in forwarding mode, this is the highest consecutive sequence number we have
    // received. If we are in serializing mode, this is the last sequence number we have assigned.
    counter: usize,
}

impl<T: Clone> DispatchInner<T> {
    /// Notifies all receivers of the given timestamp, and sends any given data to the intended
    /// recipients.
    ///
    /// If `data == None`, `ts` is sent to all receivers.
    /// If `to == None`, `data.unwrap()` is sent to all receivers.
    /// If `to == Some(t)`, `data.unwrap()` is sent to the receiver named `t`.
    fn notify(&self, to: Option<&String>, ts: usize, data: Option<T>) {
        let tgts = self.targets.read().unwrap();
        for (tn, t) in tgts.iter() {
            let mut state = t.channel.mx.lock().unwrap();
            debug!("{}: notifying {} about {}", self.id, tn, ts);
            if data.is_some() && (to.is_none() || to.unwrap() == tn.as_str()) {
                debug!("{}: including data", self.id);
                while state.queue.len() == self.bound && !state.left {
                    state = t.channel.cond.wait(state).unwrap();
                }

                if state.left {
                    t.channel.cond.notify_one();
                    continue;
                }

                // TODO: avoid clone() for the last send
                state.queue.push_back((data.clone().unwrap(), ts));
            }
            state.ts_tail = ts;
            t.channel.cond.notify_one();
            drop(state);
        }

        // if data.is_some() && to.is_some() && !self.targets.contains_key(to.unwrap().as_str()),
        // the intended receiver is missing. this seems like a bad case, but it could just be that
        // the receiver has left.
        // TODO: would be nice if we had some way of notifying the sender that this is the case
    }

    /// Finds any delayed messages that have become releasable because the sequence number counter
    /// has caught up to them, and sends them in-order. Checks both delayed broadcast messages and
    /// the per-receiver delayed messages.
    fn process_delayed(&mut self) {
        assert!(self.forwarding.unwrap_or(false));
        debug!("{}: processing delayed after {}", self.id, self.counter);

        // keep looking for a candidate to send
        loop {
            // we need to find the message in `[bdelay + targets[*].delayed]` with the lowest
            // timestamp. we do this by:
            //
            // 1. finding the smallest in `bdelay`
            let next = self.bdelay.peek().map(|d| d.ts);
            debug!("{}: next from bcast is {:?}", self.id, next);
            // 2. finding the smallest in `targets[*].delayed`
            let tnext = {
                let tgts = self.targets.read().unwrap();
                let t = self.destinations
                    .iter()
                    .map(|to| {
                        let t = &tgts[to];
                        (to,
                         t.delayed
                             .lock()
                             .unwrap()
                             .peek()
                             .map(|d| d.ts))
                    })
                    .filter_map(|(to, ts)| ts.map(move |ts| (to, ts)))
                    .min_by_key(|&(_, ts)| ts);
                t.map(|(to, ts)| (to.to_owned(), ts))
            };

            debug!("{}: next from tdelay is {:?}", self.id, tnext);

            // 3. using the message from 2 if it is the next message
            if let Some((to, tnext)) = tnext {
                if tnext == self.counter + 1 {
                    debug!("{}: forwarding from tdelay", self.id);
                    let d = {
                        let tgts = self.targets.read().unwrap();
                        let mut x = tgts[to.as_str()].delayed.lock().unwrap();
                        x.pop().unwrap()
                    };
                    self.notify(Some(&to), d.ts, Some(d.data));
                    self.counter += 1;
                    continue;
                }
            }

            // 4. using the message from 1 if it is the next message
            if let Some(ts) = next {
                if ts == self.counter + 1 {
                    debug!("{}: forwarding from bdelay", self.id);
                    let d = self.bdelay.pop().unwrap();
                    self.notify(None, d.ts, Some(d.data));
                    self.counter += 1;
                    continue;
                }
            }

            // no delayed message is the next in sequence
            break;
        }

        debug!("{}: done replaying", self.id);
    }

    /// Takes a message from any sender, handles control messages, and delays or delivers data
    /// messages.
    ///
    /// The first data message determines which mode the dispatcher operates in. If the first
    /// message has a sequence number, the dispatcher will operate in forwarding mode. If it does
    /// not, it will operate in serializing mode. In the former, it expects every data message to
    /// be numbered, and delays too-new messages until all inputs are at least that up-to-date. In
    /// the latter, it will deliver all messages immediately, and will assign sequence numbers to
    /// each one.
    fn absorb(&mut self, m: Message<T>) {
        match m {
            Message::Data(td) => {
                debug!("{}: got message with ts {:?} from {} for {:?}",
                       self.id,
                       td.ts,
                       td.from,
                       td.to);
                if self.forwarding.is_some() {
                    assert!(self.forwarding.unwrap() == td.ts.is_some(),
                            "one sender sent timestamp, another did not");
                } else {
                    self.forwarding = Some(td.ts.is_some())
                }

                if let Some(ts) = td.ts {
                    // if we are forwarding (which must be the case here), this message may be the
                    // next to be sent out. in that case we should increase the sequence number
                    // tracker so that any later messages will also be released
                    assert!(ts >= self.counter);
                    if ts == self.counter + 1 {
                        self.counter = ts;
                    }
                }

                if td.ts.is_none() {
                    // the sender leaves it up to us to pick timestamps, so we know we're always up
                    // to date. note that this latter case assumes that the senders will *never*
                    // give us timestamps once they have let us pick once.
                    self.counter += 1;
                    self.notify(td.to.as_ref(), self.counter, td.data);
                    return;
                }

                // we're in forwarding mode
                let ts = td.ts.unwrap();
                if ts == self.counter {
                    // this message is the next to be sent out, so we can send it immediately
                    self.notify(td.to.as_ref(), ts, td.data);
                    // since this message must also have incremented the counter above, there may
                    // be other messages that can now be sent out.
                    self.process_delayed();
                    return;
                }

                // need to buffer this message until the other views are sufficiently up-to-date.
                if let Some(data) = td.data {
                    if let Some(ref to) = td.to {
                        debug!("{}: delayed in {:?}", self.id, to);
                        let tgts = self.targets.read().unwrap();
                        tgts[to].delayed.lock().unwrap().push(Delayed {
                            ts: ts,
                            data: data,
                        });
                        drop(tgts);
                    } else {
                        debug!("{}: delayed in bcast", self.id);
                        self.bdelay.push(Delayed {
                            ts: ts,
                            data: data,
                        });
                    }
                }
            }
            Message::ReceiverJoin(name, inner) => {
                debug!("{}: receiver {} joined", self.id, name);
                if !self.destinations.insert(name.clone()) {
                    panic!("receiver {} already exists!", name);
                }

                let mut tgts = self.targets.write().unwrap();
                tgts.insert(name,
                            Target {
                                channel: inner,
                                senders: HashSet::new(),
                                delayed: sync::Mutex::new(BinaryHeap::new()),
                            });
            }
            Message::ReceiverLeave(name) => {
                debug!("{}: receiver {} left", self.id, name);
                // NOTE: the control thread has already unblocked senders and set .left

                // deregister the receiver
                let mut tgts = self.targets.write().unwrap();
                tgts.remove(&*name);
                self.destinations.remove(&*name);

                // TODO: ensure that subsequent send()'s return an error (somehow?) instead of just
                // crashing and burning (panic) like what happens now.
            }
            Message::SenderJoin(target, source) => {
                debug!("{}: sender {} for {:?} joined", self.id, source, target);

                if let Some(target) = target {
                    let mut tgts = self.targets.write().unwrap();
                    tgts.get_mut(&*target).unwrap().senders.insert(source);
                } else {
                    self.broadcasters.insert(source);
                }
            }
            Message::SenderLeave(target, source) => {
                debug!("{}: sender {} for {:?} left", self.id, source, target);
                if let Some(ref target) = target {
                    // NOTE: target may not exist because the receiver has left
                    let mut tgts = self.targets.write().unwrap();
                    if let Some(target) = tgts.get_mut(target.as_str()) {
                        target.senders.remove(&*source);
                    }
                    drop(tgts);
                } else {
                    self.broadcasters.remove(&*source);
                }

                if self.broadcasters.is_empty() {
                    // as long as there are broadcasters, no channel is closed
                    let mut tgts = self.targets.write().unwrap();
                    for (tn, t) in tgts.iter_mut()
                        .filter(|&(_, ref t)| {
                            t.senders.is_empty() && t.delayed.lock().unwrap().is_empty()
                        }) {
                        debug!("{}: closing now-done channel {}", self.id, tn);
                        // having no senders when there are no broadcasters means the channel is
                        // closed
                        let mut state = t.channel.mx.lock().unwrap();
                        state.closed = true;
                        t.channel.cond.notify_one();
                        drop(state);
                    }
                }
            }
        }
    }
}

/// Creates a new clocked dispatch. Dispatch channels can be constructed by calling `new` on the
/// returned dispatcher.
///
/// The dispatcher has an internal buffer for incoming messages. When this buffer becomes full,
/// future sends to the dispatcher will block waiting for the buffer to open up. Note that a buffer
/// size of 0 is valid, but its behavior differs from that of synchronous Rust channels. Because
/// the dispatcher sits between the sender and the receiver, a bound of 0 will not guarantee a
/// "rendezvous" between the sender and the receiver, but rather between the sender and the
/// dispatcher (and subsequently, the dispatcher and the receiver).
///
/// Be aware that a bound of 0 means that it is not safe to drop a `ClockedSender` before the
/// corresponding `ClockedReceiver` is reading from its end of the channel.
pub fn new<T: Clone + Send + 'static>(bound: usize) -> Dispatcher<T> {
    new_with_seed(bound, 0)
}

/// Creates a new clocked dispatch whose automatically assigned sequence numbers start at a given
/// value.
///
/// This method is useful for programs that wish to maintain monotonic sequence numbers across
/// multiple executions of the application. Such an application should track received sequence
/// numbers, store the latest one upon exiting, and then use this method to continue the sequence
/// from that point onward when it starts up again.
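///
/// A minimal sketch (doc-test); the channel names are illustrative:
///
/// ```
/// use clocked_dispatch;
///
/// let d = clocked_dispatch::new_with_seed(10, 100);
/// let (tx, rx) = d.new("tx", "rx");
/// tx.send("a");
/// // the first assigned sequence number is seed + 1
/// assert_eq!(rx.recv(), Ok((Some("a"), 101)));
/// ```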
pub fn new_with_seed<T: Clone + Send + 'static>(bound: usize, seed: usize) -> Dispatcher<T> {
    use rand::{thread_rng, Rng};

    let (stx, srx) = mpsc::sync_channel(bound);
    let mut d = DispatchInner {
        targets: sync::Arc::new(sync::RwLock::new(HashMap::new())),
        destinations: HashSet::new(),
        bdelay: BinaryHeap::new(),
        broadcasters: HashSet::new(),
        forwarding: None,
        bound: bound,
        id: thread_rng().gen_ascii_chars().take(2).collect(),
        counter: seed,
    };

    let id = d.id.clone();
    let c_targets = d.targets.clone();
    let c_stx = stx.clone();
    let (ctx, crx) = mpsc::sync_channel::<String>(0);
    thread::spawn(move || {
        // this thread handles leaving receivers.
        // it is *basically* the following loop:
        //
        // ```
        // for left in crx {
        //     c_targets[left].channel.left = true;
        //     ctx.send(ReceiverLeave(left));
        // }
        // ```
        //
        // unfortunately, it gets complicated by two factors:
        //
        // - if a receiver is created and then dropped immediately, the Leave could reach us
        //   before the Join reaches the dispatcher. in this case, targets[left] doesn't exist
        //   yet. we thus need to wait for the dispatcher to catch up to the join.
        //
        // - the dispatcher can't be blocking on a send to the receiver we are dropping, because
        //   it doesn't know about it yet, and thus must process the join message before it can
        //   block waiting on us. however, there *is* a possibility of the dispatcher being
        //   blocked on a send to a channel that is queued to be dropped *behind* this one. this
        //   is the source of much of the complexity below.
        //
        // essentially, we keep track of leaving receivers that we haven't successfully handled
        // yet, but also keep reading from crx to see if there are other receivers that are also
        // trying to leave.

        // receivers that are trying to leave
        let mut leaving = Vec::new();
        // temp for keeping track of receivers that are *still* trying to leave while draining
        // `leaving`
        let mut leaving_ = Vec::new();

        'recv: loop {
            // are there more receivers trying to leave?
            let left = crx.try_recv();
            match left {
                Ok(left) => {
                    // yes -- deal with them too
                    leaving.push(left);
                }
                Err(..) if !leaving.is_empty() => {
                    // no, but deal with the receivers that wanted to leave
                }
                Err(mpsc::TryRecvError::Disconnected) => {
                    // no, and there will never be more.
                    // we must also have dealt with all receivers who tried to leave,
                    // so it's safe for us to exit
                    break 'recv;
                }
                Err(mpsc::TryRecvError::Empty) => {
                    // no, and there also aren't any for us to retry.
                    // to avoid busy looping, we can now do a blocking receive
                    let left = crx.recv();
                    if let Ok(left) = left {
                        // someone tried to leave -- let's try to deal with that
                        leaving.push(left);
                    } else {
                        // channel closed, and no one is waiting -- safe to exit
                        break 'recv;
                    }
                }
            }

            // try to process any receivers trying to leave
            for left in leaving.drain(..) {
                debug!("{} control: dealing with departure of receiver {}",
                       id,
                       left);

                let targets = c_targets.read().unwrap();
                if let Some(t) = targets.get(&*left) {
                    // the receiver exists, so we can remove it
                    let mut state = t.channel.mx.lock().unwrap();
                    state.left = true;
                    state.closed = true;
                    t.channel.cond.notify_one();
                    drop(state);

                    // kick off a message to the dispatcher in the background.
                    // we don't want it to be in the foreground, because we may have other things
                    // to close that could block sending to the dispatcher.
                    let ctx = c_stx.clone();
                    thread::spawn(move || {
                        ctx.send(Message::ReceiverLeave(left)).unwrap();
                    });
                } else {
                    // dispatcher doesn't know about this receiver yet
                    leaving_.push(left);
                }
            }
            leaving.extend(leaving_.drain(..));
        }
    });

    thread::spawn(move || {
        for m in srx.iter() {
            d.absorb(m);
        }
    });

    Dispatcher {
        dispatcher: stx,
        leave: ctx,
        bound: bound,
    }
}

#[cfg(test)]
mod tests {
    #[test]
    fn can_send_after_recv_drop() {
        // Create a dispatcher
        let d = super::new(1);

        // Create two channels
        let (tx_a, rx_a) = d.new("atx", "arx");
        let (tx_b, rx_b) = d.new("btx", "brx");
        let _ = tx_a;

        // Drop a receiver
        drop(rx_a);

        // Ensure that sending doesn't block forever
        tx_b.send(10);

        // And that messages are still delivered
        assert_eq!(rx_b.recv().unwrap().0.unwrap(), 10);
    }

    #[test]
    fn recv_drop_unblocks_sender() {
        use std::thread;
        use std::time::Duration;

        // Create a dispatcher
        let d = super::new(1);

        // Create two channels
        let (tx_a, rx_a) = d.new("atx", "arx");
        let (tx_b, rx_b) = d.new("btx", "brx");

        // Make tx_a a broadcaster (so it would block on b)
        // Note that we have to do this *before* we saturate the channel to the dispatcher
        let tx_a = tx_a.into_broadcaster();

        // Fill rx_b
        thread::spawn(move || {
            for _ in 0..20 {
                tx_b.send("b");
            }
        });
        thread::sleep(Duration::from_millis(200));

        // Drop b's receiver
        drop(rx_b);

        // All of tx_b's sends should be dropped, and tx_a should be able to send
        tx_a.broadcast("a");

        // And that messages are still delivered
        loop {
            let rx = rx_a.recv();
            assert!(rx.is_ok());
            let rx = rx.unwrap();
            if rx.0.is_some() {
                assert_eq!(rx.0, Some("a"));
                break;
            }
        }
    }

    #[test]
    fn can_forward_after_recv_drop() {
        // Create a dispatcher
        let d = super::new(1);

        // Create two channels
        let (tx_a, rx_a) = d.new("atx", "arx");
        let (tx_b, rx_b) = d.new("btx", "brx");
        let _ = tx_a;

        // Drop a receiver
        drop(rx_a);

        // Ensure that forwarding doesn't block forever
        tx_b.forward(Some(10), 1);
        // note that dropping the receiver kills the senders too!

        // And that messages are still delivered
        assert_eq!(rx_b.recv(), Ok((Some(10), 1)));
    }

    #[test]
    fn forward_with_no_senders() {
        use std::sync::mpsc;

        let d = super::new(1);
        let (tx_a, rx_a) = d.new("atx", "arx");
        let (tx_b, rx_b) = d.new("btx", "brx");

        tx_a.forward(Some(1), 1);
        // the message is queued because tx_b hasn't sent anything

        // drop both senders, freeing Some(1) from the delayed queue. we specifically want to
        // explore the case where Some(1) is freed when there are no senders, so we have to drop
        // tx_a first (since tx_b is holding up the system).
        drop(tx_a);
        drop(tx_b);

        // Ensure that the receiver still gets notified of messages
        assert_eq!(rx_a.recv(), Ok((Some(1), 1)));

        // And that the other receiver still gets a None
        assert_eq!(rx_b.recv(), Ok((None, 1)));

        // And that no more entries are sent
        assert_eq!(rx_a.recv(), Err(mpsc::RecvError));
        assert_eq!(rx_b.recv(), Err(mpsc::RecvError));
    }

    #[test]
    fn broadcast_dupe_termination() {
        use std::sync::mpsc;

        let d = super::new(1);
        let (tx, rx) = d.new("tx", "rx");
        let tx = tx.into_broadcaster();

        tx.broadcast_forward(Some("a"), 1);
        tx.broadcast_forward(Some("b"), 2);
        drop(tx);

        assert_eq!(rx.recv(), Ok((Some("a"), 1)));
        assert_eq!(rx.recv(), Ok((Some("b"), 2)));
        assert_eq!(rx.recv(), Err(mpsc::RecvError));
    }

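    // An added sanity check (not among the original tests): `Delayed`'s reversed `Ord` makes
    // `BinaryHeap` behave as a min-heap ordered by timestamp, which is what `process_delayed`
    // relies on to release the lowest-timestamped message first.
    #[test]
    fn delayed_min_heap_order() {
        use std::collections::BinaryHeap;

        let mut heap = BinaryHeap::new();
        heap.push(super::Delayed { ts: 3, data: "c" });
        heap.push(super::Delayed { ts: 1, data: "a" });
        heap.push(super::Delayed { ts: 2, data: "b" });

        // the *lowest* timestamp pops first
        assert_eq!(heap.pop().unwrap().ts, 1);
        assert_eq!(heap.pop().unwrap().ts, 2);
        assert_eq!(heap.pop().unwrap().ts, 3);
    }
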
    #[test]
    fn multisend_thread_interleaving() {
        use std::thread;

        for _ in 0..1000 {
            let d = super::new(20);
            let (tx_a, rx) = d.new("tx_a", "rx");
            let tx_b = tx_a.clone("tx_b");

            let t_a = thread::spawn(move || {
                tx_a.forward(Some("c_1"), 1);
                tx_a.forward(Some("c_3"), 3);
                tx_a.forward(Some("a_1"), 5);
            });
            let t_b = thread::spawn(move || {
                tx_b.forward(Some("c_2"), 2);
                tx_b.forward(Some("b_1"), 4);
                tx_b.forward(Some("a_2"), 6);
            });

            assert_eq!(rx.recv(), Ok((Some("c_1"), 1)));
            assert_eq!(rx.recv(), Ok((Some("c_2"), 2)));
            assert_eq!(rx.recv(), Ok((Some("c_3"), 3)));
            assert_eq!(rx.recv(), Ok((Some("b_1"), 4)));
            assert_eq!(rx.recv(), Ok((Some("a_1"), 5)));
            assert_eq!(rx.recv(), Ok((Some("a_2"), 6)));

            t_a.join().unwrap();
            t_b.join().unwrap();
        }
    }

    #[test]
    fn test_new_with_seed() {
        let d = super::new_with_seed(1, 69105);
        let (tx, rx) = d.new("tx", "rx");
        tx.send("a");
        assert_eq!(rx.recv(), Ok((Some("a"), 69106)));
    }
1269}