bus/
lib.rs

1//! Bus provides a lock-free, bounded, single-producer, multi-consumer, broadcast channel.
2//!
3//! It uses a circular buffer and atomic instructions to implement a lock-free single-producer,
4//! multi-consumer channel. The interface is similar to that of the `std::sync::mpsc` channels,
5//! except that multiple consumers (readers of the channel) can be produced, whereas only a single
6//! sender can exist. Furthermore, in contrast to most multi-consumer FIFO queues, bus is
7//! *broadcast*; every send goes to every consumer.
8//!
9//! I haven't seen this particular implementation in literature (some extra bookkeeping is
10//! necessary to allow multiple consumers), but a lot of related reading can be found in Ross
11//! Bencina's blog post ["Some notes on lock-free and wait-free
12//! algorithms"](http://www.rossbencina.com/code/lockfree).
13//!
14//! Bus achieves broadcast by cloning the element in question, which is why `T` must implement
15//! `Clone`. However, Bus is clever about only cloning when necessary. Specifically, the last
16//! consumer to see a given value will move it instead of cloning, which means no cloning is
17//! happening for the single-consumer case. For cases where cloning is expensive, `Arc` should be
18//! used instead.
19//!
20//! # Examples
21//!
22//! Single-send, multi-consumer example
23//!
24//! ```rust
25//! use bus::Bus;
26//! let mut bus = Bus::new(10);
27//! let mut rx1 = bus.add_rx();
28//! let mut rx2 = bus.add_rx();
29//!
30//! bus.broadcast("Hello");
31//! assert_eq!(rx1.recv(), Ok("Hello"));
32//! assert_eq!(rx2.recv(), Ok("Hello"));
33//! ```
34//!
35//! Multi-send, multi-consumer example
36//!
37//! ```rust
38//! # if cfg!(miri) { return } // Miri is too slow
39//! use bus::Bus;
40//! use std::thread;
41//!
42//! let mut bus = Bus::new(10);
43//! let mut rx1 = bus.add_rx();
44//! let mut rx2 = bus.add_rx();
45//!
46//! // start a thread that sends 1..100
47//! let j = thread::spawn(move || {
48//!     for i in 1..100 {
49//!         bus.broadcast(i);
50//!     }
51//! });
52//!
53//! // every value should be received by both receivers
54//! for i in 1..100 {
55//!     // rx1
56//!     assert_eq!(rx1.recv(), Ok(i));
57//!     // and rx2
58//!     assert_eq!(rx2.recv(), Ok(i));
59//! }
60//!
61//! j.join().unwrap();
62//! ```
63//!
64//! Many-to-many channel using a dispatcher
65//!
66//! ```rust
67//! use bus::Bus;
68//!
69//! use std::thread;
70//! use std::sync::mpsc;
71//!
72//! // set up fan-in
73//! let (tx1, mix_rx) = mpsc::sync_channel(100);
74//! let tx2 = tx1.clone();
75//! // set up fan-out
76//! let mut mix_tx = Bus::new(100);
77//! let mut rx1 = mix_tx.add_rx();
78//! let mut rx2 = mix_tx.add_rx();
79//! // start dispatcher
80//! thread::spawn(move || {
81//!     for m in mix_rx.iter() {
82//!         mix_tx.broadcast(m);
83//!     }
84//! });
85//!
86//! // sends on tx1 are received ...
87//! tx1.send("Hello").unwrap();
88//!
89//! // ... by both receiver rx1 ...
90//! assert_eq!(rx1.recv(), Ok("Hello"));
91//! // ... and receiver rx2
92//! assert_eq!(rx2.recv(), Ok("Hello"));
93//!
94//! // same with sends on tx2
95//! tx2.send("world").unwrap();
96//! assert_eq!(rx1.recv(), Ok("world"));
97//! assert_eq!(rx2.recv(), Ok("world"));
98//! ```
99
100#![deny(missing_docs)]
101#![warn(rust_2018_idioms)]
102
103use crossbeam_channel as mpsc;
104use parking_lot_core::SpinWait;
105
106use std::cell::UnsafeCell;
107use std::fmt;
108use std::marker::PhantomData;
109use std::ops::Deref;
110use std::ptr;
111use std::sync::atomic;
112use std::sync::mpsc as std_mpsc;
113use std::sync::Arc;
114use std::thread;
115use std::time;
116
117const SPINTIME: u32 = 100_000; //ns
118
119struct SeatState<T> {
120    max: usize,
121    val: Option<T>,
122}
123
124struct MutSeatState<T>(UnsafeCell<SeatState<T>>);
125unsafe impl<T> Sync for MutSeatState<T> {}
126impl<T> Deref for MutSeatState<T> {
127    type Target = UnsafeCell<SeatState<T>>;
128    fn deref(&self) -> &Self::Target {
129        &self.0
130    }
131}
132
133impl<T> fmt::Debug for MutSeatState<T> {
134    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135        f.debug_tuple("MutSeatState").field(&self.0).finish()
136    }
137}
138
139/// A Seat is a single location in the circular buffer.
140/// Each Seat knows how many readers are expected to access it, as well as how many have. The
141/// producer will never modify a seat's state unless all readers for a particular seat have either
142/// called `.take()` on it, or have left (see `Bus.rleft`).
143///
144/// The producer walks the seats of the ring in order, and will always only modify the seat at
145/// `tail + 1` once all readers have finished with the seat at `head + 2`. A reader will never
146/// access a seat unless it is between the reader's `head` and the producer's `tail`. Together,
147/// these properties ensure that a Seat is either accessed only by readers, or by only the
148/// producer.
149///
150/// The `read` attribute is used to ensure that readers see the most recent write to the seat when
151/// they access it. This is done using `atomic::Ordering::Acquire` and `atomic::Ordering::Release`.
152struct Seat<T> {
153    read: atomic::AtomicUsize,
154    state: MutSeatState<T>,
155
156    // is the writer waiting for this seat to be emptied? needs to be atomic since both the last
157    // reader and the writer might be accessing it at the same time.
158    waiting: AtomicOption<thread::Thread>,
159}
160
161impl<T> fmt::Debug for Seat<T> {
162    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163        f.debug_struct("Seat")
164            .field("read", &self.read)
165            .field("state", &self.state)
166            .field("waiting", &self.waiting)
167            .finish()
168    }
169}
170
171impl<T: Clone + Sync> Seat<T> {
172    /// take is used by a reader to extract a copy of the value stored on this seat. only readers
173    /// that were created strictly before the time this seat was last written to by the producer
174    /// are allowed to call this method, and they may each only call it once.
175    fn take(&self) -> T {
176        let read = self.read.load(atomic::Ordering::Acquire);
177
178        // the writer will only modify this element when .read hits .max - writer.rleft[i]. we can
179        // be sure that this is not currently the case (which means it's safe for us to read)
180        // because:
181        //
182        //  - .max is set to the number of readers at the time when the write happens
183        //  - any joining readers will start at a later seat
184        //  - so, at most .max readers will call .take() on this seat this time around the buffer
185        //  - a reader must leave either *before* or *after* a call to recv. there are two cases:
186        //
187        //    - it leaves before, rleft is decremented, but .take is not called
188        //    - it leaves after, .take is called, but head has been incremented, so rleft will be
189        //      decremented for the *next* seat, not this one
190        //
191        //    so, either .take is called, and .read is incremented, or writer.rleft is incremented.
192        //    thus, for a writer to modify this element, *all* readers at the time of the previous
193        //    write to this seat must have either called .take or have left.
194        //  - since we are one of those readers, this cannot be true, so it's safe for us to assume
195        //    that there is no concurrent writer for this seat
196        let state = unsafe { &*self.state.get() };
197        assert!(
198            read < state.max,
199            "reader hit seat with exhausted reader count"
200        );
201
202        let mut waiting = None;
203
204        // NOTE
205        // we must extract the value *before* we decrement the number of remaining items otherwise,
206        // the object might be replaced by the time we read it!
207        let v = if read + 1 == state.max {
208            // we're the last reader, so we may need to notify the writer there's space in the buf.
209            // can be relaxed, since the acquire at the top already guarantees that we'll see
210            // updates.
211            waiting = self.waiting.take();
212
213            // since we're the last reader, no-one else will be cloning this value, so we can
214            // safely take a mutable reference, and just take the val instead of cloning it.
215            unsafe { &mut *self.state.get() }.val.take().unwrap()
216        } else {
217            let v = state
218                .val
219                .clone()
220                .expect("seat that should be occupied was empty");
221
222            // let writer know that we no longer need this item.
223            // state is no longer safe to access.
224            #[allow(clippy::drop_ref)]
225            drop(state);
226            v
227        };
228
229        self.read.fetch_add(1, atomic::Ordering::AcqRel);
230
231        if let Some(t) = waiting {
232            // writer was waiting for us to finish with this
233            t.unpark();
234        }
235
236        v
237    }
238}
239
240impl<T> Default for Seat<T> {
241    fn default() -> Self {
242        Seat {
243            read: atomic::AtomicUsize::new(0),
244            waiting: AtomicOption::empty(),
245            state: MutSeatState(UnsafeCell::new(SeatState { max: 0, val: None })),
246        }
247    }
248}
249
250/// `BusInner` encapsulates data that both the writer and the readers need to access. The tail is
251/// only ever modified by the producer, and read by the consumers. The length of the bus is
252/// instantiated when the bus is created, and is never modified.
253struct BusInner<T> {
254    ring: Vec<Seat<T>>,
255    len: usize,
256    tail: atomic::AtomicUsize,
257    closed: atomic::AtomicBool,
258}
259
260impl<T> fmt::Debug for BusInner<T> {
261    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
262        f.debug_struct("BusInner")
263            .field("ring", &self.ring)
264            .field("len", &self.len)
265            .field("tail", &self.tail)
266            .field("closed", &self.closed)
267            .finish()
268    }
269}
270
271/// `Bus` is the main interconnect for broadcast messages. It can be used to send broadcast
272/// messages, or to connect additional consumers. When the `Bus` is dropped, receivers will
273/// continue receiving any outstanding broadcast messages they would have received if the bus were
274/// not dropped. After all those messages have been received, any subsequent receive call on a
275/// receiver will return a disconnected error.
276pub struct Bus<T> {
277    state: Arc<BusInner<T>>,
278
279    // current number of readers
280    readers: usize,
281
282    // rleft keeps track of readers that should be skipped for each index. we must do this because
283    // .read will be < max for those indices, even though all active readers have received them.
284    rleft: Vec<usize>,
285
286    // leaving is used by receivers to signal that they are done
287    leaving: (mpsc::Sender<usize>, mpsc::Receiver<usize>),
288
289    // waiting is used by receivers to signal that they are waiting for new entries, and where they
290    // are waiting
291    #[allow(clippy::type_complexity)]
292    waiting: (
293        mpsc::Sender<(thread::Thread, usize)>,
294        mpsc::Receiver<(thread::Thread, usize)>,
295    ),
296
297    // channel used to communicate to unparker that a given thread should be woken up
298    unpark: mpsc::Sender<thread::Thread>,
299
300    // cache used to keep track of threads waiting for next write.
301    // this is only here to avoid allocating one on every broadcast()
302    cache: Vec<(thread::Thread, usize)>,
303}
304
305impl<T> fmt::Debug for Bus<T> {
306    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
307        f.debug_struct("Bus")
308            .field("state", &self.state)
309            .field("readers", &self.readers)
310            .field("rleft", &self.rleft)
311            .field("leaving", &self.leaving)
312            .field("waiting", &self.waiting)
313            .field("unpark", &self.unpark)
314            .field("cache", &self.cache)
315            .finish()
316    }
317}
318
319impl<T> Bus<T> {
320    /// Allocates a new `Bus`.
321    ///
322    /// The provided length should be sufficient to absorb temporary peaks in the data flow, and is
323    /// thus workflow-dependent. Bus performance degrades somewhat when the queue is full, so it is
324    /// generally better to set this high than low unless you are pressed for memory.
325    pub fn new(mut len: usize) -> Bus<T> {
326        use std::iter;
327
328        // ring buffer must have room for one padding element
329        len += 1;
330
331        let inner = Arc::new(BusInner {
332            ring: (0..len).map(|_| Seat::default()).collect(),
333            tail: atomic::AtomicUsize::new(0),
334            closed: atomic::AtomicBool::new(false),
335            len,
336        });
337
338        // work around https://github.com/rust-lang/rust/issues/59020
339        if !cfg!(miri) && cfg!(target = "macos") {
340            let _ = time::Instant::now().elapsed();
341        }
342
343        // we run a separate thread responsible for unparking
344        // so we don't have to wait for unpark() to return in broadcast_inner
345        // sending on a channel without contention is cheap, unparking is not
346        let (unpark_tx, unpark_rx) = mpsc::unbounded::<thread::Thread>();
347        let _ = thread::Builder::new()
348            .name("bus_unparking".to_owned())
349            .spawn(move || {
350                for t in unpark_rx.iter() {
351                    t.unpark();
352                }
353            });
354
355        Bus {
356            state: inner,
357            readers: 0,
358            rleft: iter::repeat(0).take(len).collect(),
359            leaving: mpsc::unbounded(),
360            waiting: mpsc::unbounded(),
361            unpark: unpark_tx,
362
363            cache: Vec::new(),
364        }
365    }
366
367    /// Get the expected number of reads for the given seat. This number will always be
368    /// conservative, in that fewer reads may be fine. Specifically, `.rleft` may not be
369    /// sufficiently up-to-date to account for all readers that have left.
370    #[inline]
371    fn expected(&mut self, at: usize) -> usize {
372        // since only the producer will modify the ring, and &mut self guarantees that *we* are the
373        // producer, no-one is modifying the ring. Multiple read-only borrows are safe, and so the
374        // cast below is safe.
375        unsafe { &*self.state.ring[at].state.get() }.max - self.rleft[at]
376    }
377
378    /// Attempts to place the given value on the bus.
379    ///
380    /// If the bus is full, the behavior depends on `block`. If false, the value given is returned
381    /// in an `Err()`. Otherwise, the current thread will be parked until there is space in the bus
382    /// again, and the broadcast will be tried again until it succeeds.
383    ///
384    /// Note that broadcasts will succeed even if there are no consumers!
385    fn broadcast_inner(&mut self, val: T, block: bool) -> Result<(), T> {
386        let tail = self.state.tail.load(atomic::Ordering::Relaxed);
387
388        // we want to check if the next element over is free to ensure that we always leave one
389        // empty space between the head and the tail. This is necessary so that readers can
390        // distinguish between an empty and a full list. If the fence seat is free, the seat at
391        // tail must also be free, which is simple enough to show by induction (exercise for the
392        // reader).
393        let fence = (tail + 1) % self.state.len;
394
395        let spintime = time::Duration::new(0, SPINTIME);
396
397        // to avoid parking when a slot frees up quickly, we use an exponential back-off SpinWait.
398        let mut sw = SpinWait::new();
399        loop {
400            let fence_read = self.state.ring[fence].read.load(atomic::Ordering::Acquire);
401
402            // is there room left in the ring?
403            if fence_read == self.expected(fence) {
404                break;
405            }
406
407            // no!
408            // let's check if any readers have left, which might increment self.rleft[tail].
409            while let Ok(mut left) = self.leaving.1.try_recv() {
410                // a reader has left! this means that every seat between `left` and `tail-1`
411                // has max set one too high. we track the number of such "missing" reads that
412                // should be ignored in self.rleft, and compensate for them when looking at
413                // seat.read above.
414                self.readers -= 1;
415                while left != tail {
416                    self.rleft[left] += 1;
417                    left = (left + 1) % self.state.len
418                }
419            }
420
421            // is the fence block now free?
422            if fence_read == self.expected(fence) {
423                // yes! go ahead and write!
424                break;
425            } else if block {
426                // no, so block by parking and telling readers to notify on last read
427                self.state.ring[fence]
428                    .waiting
429                    .swap(Some(Box::new(thread::current())));
430
431                // need the atomic fetch_add to ensure reader threads will see the new .waiting
432                self.state.ring[fence]
433                    .read
434                    .fetch_add(0, atomic::Ordering::Release);
435
436                if !sw.spin() {
437                    // not likely to get a slot soon -- wait to be unparked instead.
438                    // note that we *need* to wait, because there are some cases in which we
439                    // *won't* be unparked even though a slot has opened up.
440                    thread::park_timeout(spintime);
441                }
442                continue;
443            } else {
444                // no, and blocking isn't allowed, so return an error
445                return Err(val);
446            }
447        }
448
449        // next one over is free, we have a free seat!
450        let readers = self.readers;
451        {
452            let next = &self.state.ring[tail];
453            // we are the only writer, so no-one else can be writing. however, since we're
454            // mutating state, we also need for there to be no readers for this to be safe. the
455            // argument for why this is the case is roughly an inverse of the argument for why
456            // the unsafe block in Seat.take() is safe.  basically, since
457            //
458            //   .read + .rleft == .max
459            //
460            // we know all readers at the time of the seat's previous write have accessed this
461            // seat. we also know that no other readers will access that seat (they must have
462            // started at later seats). thus, we are the only thread accessing this seat, and
463            // so we can safely access it as mutable.
464            let state = unsafe { &mut *next.state.get() };
465            state.max = readers;
466            state.val = Some(val);
467            next.waiting.take();
468            next.read.store(0, atomic::Ordering::Release);
469        }
470        self.rleft[tail] = 0;
471        // now tell readers that they can read
472        let tail = (tail + 1) % self.state.len;
473        self.state.tail.store(tail, atomic::Ordering::Release);
474
475        // unblock any blocked receivers
476        while let Ok((t, at)) = self.waiting.1.try_recv() {
477            // the only readers we can't unblock are those that have already absorbed the
478            // broadcast we just made, since they are blocking on the *next* broadcast
479            if at == tail {
480                self.cache.push((t, at))
481            } else {
482                self.unpark.send(t).unwrap();
483            }
484        }
485        for w in self.cache.drain(..) {
486            // fine to do here because it is guaranteed not to block
487            self.waiting.0.send(w).unwrap();
488        }
489
490        Ok(())
491    }
492
493    /// Attempt to broadcast the given value to all consumers, but does not block if full.
494    ///
495    /// Note that, in contrast to regular channels, a bus is *not* considered closed if there are
496    /// no consumers, and thus broadcasts will continue to succeed. Thus, a successful broadcast
497    /// occurs as long as there is room on the internal bus to store the value, or some older value
498    /// has been received by all consumers. Note that a return value of `Err` means that the data
499    /// will never be received (by any consumer), but a return value of Ok does not mean that the
500    /// data will be received by a given consumer. It is possible for a receiver to hang up
501    /// immediately after this function returns Ok.
502    ///
503    /// This method will never block the current thread.
504    ///
505    /// ```rust
506    /// use bus::Bus;
507    /// let mut tx = Bus::new(1);
508    /// let mut rx = tx.add_rx();
509    /// assert_eq!(tx.try_broadcast("Hello"), Ok(()));
510    /// assert_eq!(tx.try_broadcast("world"), Err("world"));
511    /// ```
512    pub fn try_broadcast(&mut self, val: T) -> Result<(), T> {
513        self.broadcast_inner(val, false)
514    }
515
516    /// Broadcasts a value on the bus to all consumers.
517    ///
518    /// This function will block until space in the internal buffer becomes available.
519    ///
520    /// Note that a successful send does not guarantee that the receiver will ever see the data if
521    /// there is a buffer on this channel. Items may be enqueued in the internal buffer for the
522    /// receiver to receive at a later time. Furthermore, in contrast to regular channels, a bus is
523    /// *not* considered closed if there are no consumers, and thus broadcasts will continue to
524    /// succeed.
525    pub fn broadcast(&mut self, val: T) {
526        if let Err(..) = self.broadcast_inner(val, true) {
527            unreachable!("blocking broadcast_inner can't fail");
528        }
529    }
530
531    /// Add a new consumer to this bus.
532    ///
533    /// The new consumer will receive all *future* broadcasts on this bus.
534    ///
535    /// # Examples
536    ///
537    /// ```rust
538    /// use bus::Bus;
539    /// use std::sync::mpsc::TryRecvError;
540    ///
541    /// let mut bus = Bus::new(10);
542    /// let mut rx1 = bus.add_rx();
543    ///
544    /// bus.broadcast("Hello");
545    ///
546    /// // consumer present during broadcast sees update
547    /// assert_eq!(rx1.recv(), Ok("Hello"));
548    ///
549    /// // new consumer does *not* see broadcast
550    /// let mut rx2 = bus.add_rx();
551    /// assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));
552    ///
553    /// // both consumers see new broadcast
554    /// bus.broadcast("world");
555    /// assert_eq!(rx1.recv(), Ok("world"));
556    /// assert_eq!(rx2.recv(), Ok("world"));
557    /// ```
558    pub fn add_rx(&mut self) -> BusReader<T> {
559        self.readers += 1;
560
561        BusReader {
562            bus: Arc::clone(&self.state),
563            head: self.state.tail.load(atomic::Ordering::Relaxed),
564            leaving: self.leaving.0.clone(),
565            waiting: self.waiting.0.clone(),
566            closed: false,
567        }
568    }
569
570    /// Returns the number of active consumers currently attached to this bus.
571    ///
572    /// It is not guaranteed that a sent message will reach this number of consumers, as active
573    /// consumers may never call `recv` or `try_recv` again before dropping.
574    ///
575    /// # Examples
576    ///
577    /// ```rust
578    /// use bus::Bus;
579    ///
580    /// let mut bus = Bus::<u8>::new(10);
581    /// assert_eq!(bus.rx_count(), 0);
582    ///
583    /// let rx1 = bus.add_rx();
584    /// assert_eq!(bus.rx_count(), 1);
585    ///
586    /// drop(rx1);
587    /// assert_eq!(bus.rx_count(), 0);
588    /// ```
589    pub fn rx_count(&self) -> usize {
590        self.readers - self.leaving.1.len()
591    }
592}
593
594impl<T> Drop for Bus<T> {
595    fn drop(&mut self) {
596        self.state.closed.store(true, atomic::Ordering::Relaxed);
597        // Acquire/Release .tail to ensure other threads see new .closed
598        self.state.tail.fetch_add(0, atomic::Ordering::AcqRel);
599        // TODO: unpark receivers -- this is not absolutely necessary, since the reader's park will
600        // time out, but it would cause them to detect the closed bus somewhat faster.
601    }
602}
603
604#[derive(Clone, Copy)]
605enum RecvCondition {
606    Try,
607    Block,
608    Timeout(time::Duration),
609}
610
611/// A `BusReader` is a single consumer of `Bus` broadcasts. It will see every new value that is
612/// passed to `.broadcast()` (or successful calls to `.try_broadcast()`) on the `Bus` that it was
613/// created from.
614///
615/// Dropping a `BusReader` is perfectly safe, and will unblock the writer if it was waiting for
616/// that read to see a particular update.
617///
618/// ```rust
619/// use bus::Bus;
620/// let mut tx = Bus::new(1);
621/// let mut r1 = tx.add_rx();
622/// let r2 = tx.add_rx();
623/// assert_eq!(tx.try_broadcast(true), Ok(()));
624/// assert_eq!(r1.recv(), Ok(true));
625///
626/// // the bus does not have room for another broadcast
627/// // since it knows r2 has not yet read the first broadcast
628/// assert_eq!(tx.try_broadcast(true), Err(true));
629///
630/// // dropping r2 tells the producer that there is a free slot
631/// // (i.e., it has been read by everyone)
632/// drop(r2);
633/// assert_eq!(tx.try_broadcast(true), Ok(()));
634/// ```
635pub struct BusReader<T> {
636    bus: Arc<BusInner<T>>,
637    head: usize,
638    leaving: mpsc::Sender<usize>,
639    waiting: mpsc::Sender<(thread::Thread, usize)>,
640    closed: bool,
641}
642
643impl<T> fmt::Debug for BusReader<T> {
644    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
645        f.debug_struct("BusReader")
646            .field("bus", &self.bus)
647            .field("head", &self.head)
648            .field("leaving", &self.leaving)
649            .field("waiting", &self.waiting)
650            .field("closed", &self.closed)
651            .finish()
652    }
653}
654
655impl<T: Clone + Sync> BusReader<T> {
656    /// Attempts to read a broadcast from the bus.
657    ///
658    /// If the bus is empty, the behavior depends on `block`. If false,
659    /// `Err(mpsc::RecvTimeoutError::Timeout)` is returned. Otherwise, the current thread will be
660    /// parked until there is another broadcast on the bus, at which point the receive will be
661    /// performed.
662    fn recv_inner(&mut self, block: RecvCondition) -> Result<T, std_mpsc::RecvTimeoutError> {
663        if self.closed {
664            return Err(std_mpsc::RecvTimeoutError::Disconnected);
665        }
666
667        let start = match block {
668            RecvCondition::Timeout(_) => Some(time::Instant::now()),
669            _ => None,
670        };
671
672        let spintime = time::Duration::new(0, SPINTIME);
673
674        let mut was_closed = false;
675        let mut sw = SpinWait::new();
676        let mut first = true;
677        loop {
678            let tail = self.bus.tail.load(atomic::Ordering::Acquire);
679            if tail != self.head {
680                break;
681            }
682
683            // buffer is empty, check whether it's closed.
684            // relaxed is fine since Bus.drop does an acquire/release on tail
685            if self.bus.closed.load(atomic::Ordering::Relaxed) {
686                // we need to check again that there's nothing in the bus, otherwise we might have
687                // missed a write between when we did the read of .tail above and when we read
688                // .closed here
689                if !was_closed {
690                    was_closed = true;
691                    continue;
692                }
693
694                // the bus is closed, and we didn't miss anything!
695                self.closed = true;
696                return Err(std_mpsc::RecvTimeoutError::Disconnected);
697            }
698
699            // not closed, should we block?
700            if let RecvCondition::Try = block {
701                return Err(std_mpsc::RecvTimeoutError::Timeout);
702            }
703
704            // park and tell writer to notify on write
705            if first {
706                if let Err(..) = self.waiting.send((thread::current(), self.head)) {
707                    // writer has gone away, but somehow we _just_ missed the close signal (in
708                    // self.bus.closed). iterate again to ensure the channel is _actually_ empty.
709                    atomic::fence(atomic::Ordering::SeqCst);
710                    continue;
711                }
712                first = false;
713            }
714
715            if !sw.spin() {
716                match block {
717                    RecvCondition::Timeout(t) => {
718                        match t.checked_sub(start.as_ref().unwrap().elapsed()) {
719                            Some(left) => {
720                                if left < spintime {
721                                    thread::park_timeout(left);
722                                } else {
723                                    thread::park_timeout(spintime);
724                                }
725                            }
726                            None => {
727                                // So, the wake-up thread is still going to try to wake us up later
728                                // since we sent thread::current() above, but that's fine.
729                                return Err(std_mpsc::RecvTimeoutError::Timeout);
730                            }
731                        }
732                    }
733                    RecvCondition::Block => {
734                        thread::park_timeout(spintime);
735                    }
736                    RecvCondition::Try => unreachable!(),
737                }
738            }
739        }
740
741        let head = self.head;
742        let ret = self.bus.ring[head].take();
743
744        // safe because len is read-only
745        self.head = (head + 1) % self.bus.len;
746        Ok(ret)
747    }
748
749    /// Attempts to return a pending broadcast on this receiver without blocking.
750    ///
751    /// This method will never block the caller in order to wait for data to become available.
752    /// Instead, this will always return immediately with a possible option of pending data on the
753    /// channel.
754    ///
755    /// If the corresponding bus has been dropped, and all broadcasts have been received, this
756    /// method will return with a disconnected error.
757    ///
758    /// This method is useful for a flavor of "optimistic check" before deciding to block on a
759    /// receiver.
760    ///
761    /// ```rust
762    /// use bus::Bus;
763    /// use std::thread;
764    ///
765    /// let mut tx = Bus::new(10);
766    /// let mut rx = tx.add_rx();
767    ///
768    /// // spawn a thread that will broadcast at some point
769    /// let j = thread::spawn(move || {
770    ///     tx.broadcast(true);
771    /// });
772    ///
773    /// loop {
774    ///     match rx.try_recv() {
775    ///         Ok(val) => {
776    ///             assert_eq!(val, true);
777    ///             break;
778    ///         }
779    ///         Err(..) => {
780    ///             // maybe we can do other useful work here
781    ///             // or we can just busy-loop
782    ///             thread::yield_now()
783    ///         },
784    ///     }
785    /// }
786    ///
787    /// j.join().unwrap();
788    /// ```
789    pub fn try_recv(&mut self) -> Result<T, std_mpsc::TryRecvError> {
790        self.recv_inner(RecvCondition::Try).map_err(|e| match e {
791            std_mpsc::RecvTimeoutError::Disconnected => std_mpsc::TryRecvError::Disconnected,
792            std_mpsc::RecvTimeoutError::Timeout => std_mpsc::TryRecvError::Empty,
793        })
794    }
795
796    /// Read another broadcast message from the bus, and block if none are available.
797    ///
798    /// This function will always block the current thread if there is no data available and it's
799    /// possible for more broadcasts to be sent. Once a broadcast is sent on the corresponding
800    /// `Bus`, then this receiver will wake up and return that message.
801    ///
802    /// If the corresponding `Bus` has been dropped, or it is dropped while this call is blocking,
803    /// this call will wake up and return `Err` to indicate that no more messages can ever be
804    /// received on this channel. However, since channels are buffered, messages sent before the
805    /// disconnect will still be properly received.
806    pub fn recv(&mut self) -> Result<T, std_mpsc::RecvError> {
807        match self.recv_inner(RecvCondition::Block) {
808            Ok(val) => Ok(val),
809            Err(std_mpsc::RecvTimeoutError::Disconnected) => Err(std_mpsc::RecvError),
810            _ => unreachable!("blocking recv_inner can't fail"),
811        }
812    }
813
814    /// Attempts to wait for a value from the bus, returning an error if the corresponding channel
815    /// has hung up, or if it waits more than `timeout`.
816    ///
817    /// This function will always block the current thread if there is no data available and it's
818    /// possible for more broadcasts to be sent. Once a message is sent on the corresponding `Bus`,
819    /// then this receiver will wake up and return that message.
820    ///
821    /// If the corresponding `Bus` has been dropped, or it is dropped while this call is blocking,
822    /// this call will wake up and return Err to indicate that no more messages can ever be
823    /// received on this channel. However, since channels are buffered, messages sent before the
824    /// disconnect will still be properly received.
825    ///
826    /// # Examples
827    ///
828    /// ```rust
829    /// use bus::Bus;
830    /// use std::sync::mpsc::RecvTimeoutError;
831    /// use std::time::Duration;
832    ///
833    /// let mut tx = Bus::<bool>::new(10);
834    /// let mut rx = tx.add_rx();
835    ///
836    /// let timeout = Duration::from_millis(100);
837    /// assert_eq!(Err(RecvTimeoutError::Timeout), rx.recv_timeout(timeout));
838    /// ```
839    pub fn recv_timeout(
840        &mut self,
841        timeout: time::Duration,
842    ) -> Result<T, std_mpsc::RecvTimeoutError> {
843        self.recv_inner(RecvCondition::Timeout(timeout))
844    }
845}
846
847impl<T> BusReader<T> {
848    /// Returns an iterator that will block waiting for broadcasts. It will return `None` when the
849    /// bus has been closed (i.e., the `Bus` has been dropped).
850    pub fn iter(&mut self) -> BusIter<'_, T> {
851        BusIter(self)
852    }
853}
854
855impl<T> Drop for BusReader<T> {
856    #[allow(unused_must_use)]
857    fn drop(&mut self) {
858        // we allow not checking the result here because the writer might have gone away, which
859        // would result in an error, but is okay nonetheless.
860        self.leaving.send(self.head);
861    }
862}
863
864/// An iterator over messages on a receiver. This iterator will block whenever `next` is called,
865/// waiting for a new message, and `None` will be returned when the corresponding channel has been
866/// closed.
867pub struct BusIter<'a, T>(&'a mut BusReader<T>);
868
869/// An owning iterator over messages on a receiver. This iterator will block whenever `next` is
870/// called, waiting for a new message, and `None` will be returned when the corresponding bus has
871/// been closed.
872pub struct BusIntoIter<T>(BusReader<T>);
873
874impl<'a, T: Clone + Sync> IntoIterator for &'a mut BusReader<T> {
875    type Item = T;
876    type IntoIter = BusIter<'a, T>;
877    fn into_iter(self) -> BusIter<'a, T> {
878        BusIter(self)
879    }
880}
881
882impl<T: Clone + Sync> IntoIterator for BusReader<T> {
883    type Item = T;
884    type IntoIter = BusIntoIter<T>;
885    fn into_iter(self) -> BusIntoIter<T> {
886        BusIntoIter(self)
887    }
888}
889
890impl<'a, T: Clone + Sync> Iterator for BusIter<'a, T> {
891    type Item = T;
892    fn next(&mut self) -> Option<T> {
893        self.0.recv().ok()
894    }
895}
896
897impl<T: Clone + Sync> Iterator for BusIntoIter<T> {
898    type Item = T;
899    fn next(&mut self) -> Option<T> {
900        self.0.recv().ok()
901    }
902}
903
904struct AtomicOption<T> {
905    ptr: atomic::AtomicPtr<T>,
906    _marker: PhantomData<Option<Box<T>>>,
907}
908
909impl<T> fmt::Debug for AtomicOption<T> {
910    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
911        f.debug_struct("AtomicOption")
912            .field("ptr", &self.ptr)
913            .finish()
914    }
915}
916
917unsafe impl<T: Send> Send for AtomicOption<T> {}
918unsafe impl<T: Send> Sync for AtomicOption<T> {}
919
920impl<T> AtomicOption<T> {
921    fn empty() -> Self {
922        Self {
923            ptr: atomic::AtomicPtr::new(ptr::null_mut()),
924            _marker: PhantomData,
925        }
926    }
927
928    fn swap(&self, val: Option<Box<T>>) -> Option<Box<T>> {
929        let old = match val {
930            Some(val) => self.ptr.swap(Box::into_raw(val), atomic::Ordering::AcqRel),
931            // Acquire is needed to synchronize with the store of a non-null ptr, but since a null ptr
932            // will never be dereferenced, there is no need to synchronize the store of a null ptr.
933            None => self.ptr.swap(ptr::null_mut(), atomic::Ordering::Acquire),
934        };
935        if old.is_null() {
936            None
937        } else {
938            // SAFETY:
939            // - AcqRel/Acquire ensures that it does not read a pointer to potentially invalid memory.
940            // - We've checked that old is not null.
941            // - We do not store invalid pointers other than null in self.ptr.
942            Some(unsafe { Box::from_raw(old) })
943        }
944    }
945
946    fn take(&self) -> Option<Box<T>> {
947        self.swap(None)
948    }
949}
950
951impl<T> Drop for AtomicOption<T> {
952    fn drop(&mut self) {
953        drop(self.take());
954    }
955}