bus/lib.rs
1//! Bus provides a lock-free, bounded, single-producer, multi-consumer, broadcast channel.
2//!
3//! It uses a circular buffer and atomic instructions to implement a lock-free single-producer,
4//! multi-consumer channel. The interface is similar to that of the `std::sync::mpsc` channels,
5//! except that multiple consumers (readers of the channel) can be produced, whereas only a single
6//! sender can exist. Furthermore, in contrast to most multi-consumer FIFO queues, bus is
7//! *broadcast*; every send goes to every consumer.
8//!
9//! I haven't seen this particular implementation in literature (some extra bookkeeping is
10//! necessary to allow multiple consumers), but a lot of related reading can be found in Ross
11//! Bencina's blog post ["Some notes on lock-free and wait-free
12//! algorithms"](http://www.rossbencina.com/code/lockfree).
13//!
14//! Bus achieves broadcast by cloning the element in question, which is why `T` must implement
15//! `Clone`. However, Bus is clever about only cloning when necessary. Specifically, the last
16//! consumer to see a given value will move it instead of cloning, which means no cloning is
17//! happening for the single-consumer case. For cases where cloning is expensive, `Arc` should be
18//! used instead.
19//!
20//! # Examples
21//!
22//! Single-send, multi-consumer example
23//!
24//! ```rust
25//! use bus::Bus;
26//! let mut bus = Bus::new(10);
27//! let mut rx1 = bus.add_rx();
28//! let mut rx2 = bus.add_rx();
29//!
30//! bus.broadcast("Hello");
31//! assert_eq!(rx1.recv(), Ok("Hello"));
32//! assert_eq!(rx2.recv(), Ok("Hello"));
33//! ```
34//!
35//! Multi-send, multi-consumer example
36//!
37//! ```rust
38//! # if cfg!(miri) { return } // Miri is too slow
39//! use bus::Bus;
40//! use std::thread;
41//!
42//! let mut bus = Bus::new(10);
43//! let mut rx1 = bus.add_rx();
44//! let mut rx2 = bus.add_rx();
45//!
46//! // start a thread that sends 1..100
47//! let j = thread::spawn(move || {
48//! for i in 1..100 {
49//! bus.broadcast(i);
50//! }
51//! });
52//!
53//! // every value should be received by both receivers
54//! for i in 1..100 {
55//! // rx1
56//! assert_eq!(rx1.recv(), Ok(i));
57//! // and rx2
58//! assert_eq!(rx2.recv(), Ok(i));
59//! }
60//!
61//! j.join().unwrap();
62//! ```
63//!
64//! Many-to-many channel using a dispatcher
65//!
66//! ```rust
67//! use bus::Bus;
68//!
69//! use std::thread;
70//! use std::sync::mpsc;
71//!
72//! // set up fan-in
73//! let (tx1, mix_rx) = mpsc::sync_channel(100);
74//! let tx2 = tx1.clone();
75//! // set up fan-out
76//! let mut mix_tx = Bus::new(100);
77//! let mut rx1 = mix_tx.add_rx();
78//! let mut rx2 = mix_tx.add_rx();
79//! // start dispatcher
80//! thread::spawn(move || {
81//! for m in mix_rx.iter() {
82//! mix_tx.broadcast(m);
83//! }
84//! });
85//!
86//! // sends on tx1 are received ...
87//! tx1.send("Hello").unwrap();
88//!
89//! // ... by both receiver rx1 ...
90//! assert_eq!(rx1.recv(), Ok("Hello"));
91//! // ... and receiver rx2
92//! assert_eq!(rx2.recv(), Ok("Hello"));
93//!
94//! // same with sends on tx2
95//! tx2.send("world").unwrap();
96//! assert_eq!(rx1.recv(), Ok("world"));
97//! assert_eq!(rx2.recv(), Ok("world"));
98//! ```
99
100#![deny(missing_docs)]
101#![warn(rust_2018_idioms)]
102
103use crossbeam_channel as mpsc;
104use parking_lot_core::SpinWait;
105
106use std::cell::UnsafeCell;
107use std::fmt;
108use std::marker::PhantomData;
109use std::ops::Deref;
110use std::ptr;
111use std::sync::atomic;
112use std::sync::mpsc as std_mpsc;
113use std::sync::Arc;
114use std::thread;
115use std::time;
116
/// Upper bound, in nanoseconds, on a single `thread::park_timeout` call made by a waiting
/// producer or reader before it re-checks the ring.
const SPINTIME: u32 = 100_000; //ns
118
/// The mutable contents of a single seat in the ring.
struct SeatState<T> {
    // number of readers registered on the bus when this seat was last written
    // (`broadcast_inner` copies `Bus.readers` into it); 0 for a never-written seat.
    max: usize,
    // the broadcast value: `None` until the seat is written, and `None` again once the last
    // expected reader has moved the value out.
    val: Option<T>,
}
123
/// Wrapper that allows a `SeatState` to be mutated through a shared reference.
struct MutSeatState<T>(UnsafeCell<SeatState<T>>);
// NOTE(review): `UnsafeCell` is not `Sync` by itself. This impl is only sound as long as the
// reader/producer access protocol described on `Seat` is upheld; the type system does not
// enforce it, and the impl places no bound on `T`.
unsafe impl<T> Sync for MutSeatState<T> {}
impl<T> Deref for MutSeatState<T> {
    type Target = UnsafeCell<SeatState<T>>;
    /// Exposes the inner cell so that callers can use `UnsafeCell::get` directly.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
132
133impl<T> fmt::Debug for MutSeatState<T> {
134 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135 f.debug_tuple("MutSeatState").field(&self.0).finish()
136 }
137}
138
/// A Seat is a single location in the circular buffer.
/// Each Seat knows how many readers are expected to access it, as well as how many have. The
/// producer will never modify a seat's state unless all readers for a particular seat have either
/// called `.take()` on it, or have left (see `Bus.rleft`).
///
/// The producer walks the seats of the ring in order, and will always only modify the seat at
/// `tail + 1` once all readers have finished with the seat at `head + 2`. A reader will never
/// access a seat unless it is between the reader's `head` and the producer's `tail`. Together,
/// these properties ensure that a Seat is either accessed only by readers, or by only the
/// producer.
///
/// The `read` attribute is used to ensure that readers see the most recent write to the seat when
/// they access it. This is done using `atomic::Ordering::Acquire` and `atomic::Ordering::Release`.
struct Seat<T> {
    // number of readers that have called `.take()` on this seat since the producer last wrote
    // it; the producer resets it to 0 on every write.
    read: atomic::AtomicUsize,
    // the stored value together with the number of reads expected for it.
    state: MutSeatState<T>,

    // is the writer waiting for this seat to be emptied? needs to be atomic since both the last
    // reader and the writer might be accessing it at the same time.
    waiting: AtomicOption<thread::Thread>,
}
160
161impl<T> fmt::Debug for Seat<T> {
162 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163 f.debug_struct("Seat")
164 .field("read", &self.read)
165 .field("state", &self.state)
166 .field("waiting", &self.waiting)
167 .finish()
168 }
169}
170
impl<T: Clone + Sync> Seat<T> {
    /// take is used by a reader to extract a copy of the value stored on this seat. only readers
    /// that were created strictly before the time this seat was last written to by the producer
    /// are allowed to call this method, and they may each only call it once.
    ///
    /// Every reader but the last receives a clone of the value; the last expected reader moves
    /// the value out of the seat and, if the producer registered itself in `waiting`, unparks
    /// it after the read counter has been bumped.
    ///
    /// # Panics
    ///
    /// Panics if the seat has already been read by `max` readers, or if the seat is
    /// unexpectedly empty.
    fn take(&self) -> T {
        let read = self.read.load(atomic::Ordering::Acquire);

        // the writer will only modify this element when .read hits .max - writer.rleft[i]. we can
        // be sure that this is not currently the case (which means it's safe for us to read)
        // because:
        //
        //  - .max is set to the number of readers at the time when the write happens
        //  - any joining readers will start at a later seat
        //  - so, at most .max readers will call .take() on this seat this time around the buffer
        //  - a reader must leave either *before* or *after* a call to recv. there are two cases:
        //
        //    - it leaves before, rleft is incremented, but .take is not called
        //    - it leaves after, .take is called, but head has been incremented, so rleft will be
        //      incremented for the *next* seat, not this one
        //
        //    so, either .take is called, and .read is incremented, or writer.rleft is incremented.
        //    thus, for a writer to modify this element, *all* readers at the time of the previous
        //    write to this seat must have either called .take or have left.
        //  - since we are one of those readers, this cannot be true, so it's safe for us to assume
        //    that there is no concurrent writer for this seat
        let state = unsafe { &*self.state.get() };
        assert!(
            read < state.max,
            "reader hit seat with exhausted reader count"
        );

        let mut waiting = None;

        // NOTE
        // we must extract the value *before* we decrement the number of remaining items otherwise,
        // the object might be replaced by the time we read it!
        let v = if read + 1 == state.max {
            // we're the last reader, so we may need to notify the writer there's space in the buf.
            // `AtomicOption::take` swaps the handle out atomically, so the writer's handle is
            // observed by at most one thread.
            waiting = self.waiting.take();

            // since we're the last reader, no-one else will be cloning this value, so we can
            // safely take a mutable reference, and just take the val instead of cloning it.
            unsafe { &mut *self.state.get() }.val.take().unwrap()
        } else {
            let v = state
                .val
                .clone()
                .expect("seat that should be occupied was empty");

            // let writer know that we no longer need this item.
            // state is no longer safe to access.
            #[allow(clippy::drop_ref)]
            drop(state);
            v
        };

        // publish that this reader is done with the seat; once the count reaches the expected
        // number of reads the producer is free to overwrite it.
        self.read.fetch_add(1, atomic::Ordering::AcqRel);

        if let Some(t) = waiting {
            // writer was waiting for us to finish with this
            t.unpark();
        }

        v
    }
}
239
240impl<T> Default for Seat<T> {
241 fn default() -> Self {
242 Seat {
243 read: atomic::AtomicUsize::new(0),
244 waiting: AtomicOption::empty(),
245 state: MutSeatState(UnsafeCell::new(SeatState { max: 0, val: None })),
246 }
247 }
248}
249
/// `BusInner` encapsulates data that both the writer and the readers need to access. The tail is
/// only ever modified by the producer, and read by the consumers. The length of the bus is
/// instantiated when the bus is created, and is never modified.
struct BusInner<T> {
    // the circular buffer of seats; `Bus::new` allocates exactly `len` of them.
    ring: Vec<Seat<T>>,
    // number of seats in `ring`: the capacity passed to `Bus::new` plus one padding seat.
    len: usize,
    // index of the seat the producer will write next; a reader whose `head` equals `tail` has
    // nothing left to read.
    tail: atomic::AtomicUsize,
    // set to true when the `Bus` is dropped.
    closed: atomic::AtomicBool,
}
259
260impl<T> fmt::Debug for BusInner<T> {
261 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
262 f.debug_struct("BusInner")
263 .field("ring", &self.ring)
264 .field("len", &self.len)
265 .field("tail", &self.tail)
266 .field("closed", &self.closed)
267 .finish()
268 }
269}
270
/// `Bus` is the main interconnect for broadcast messages. It can be used to send broadcast
/// messages, or to connect additional consumers. When the `Bus` is dropped, receivers will
/// continue receiving any outstanding broadcast messages they would have received if the bus were
/// not dropped. After all those messages have been received, any subsequent receive call on a
/// receiver will return a disconnected error.
pub struct Bus<T> {
    // ring buffer and flags shared with every `BusReader` created from this bus
    state: Arc<BusInner<T>>,

    // current number of readers. only decremented when the producer drains `leaving`, so it may
    // temporarily include readers that have already been dropped.
    readers: usize,

    // rleft keeps track of readers that should be skipped for each index. we must do this because
    // .read will be < max for those indices, even though all active readers have received them.
    rleft: Vec<usize>,

    // leaving is used by receivers to signal that they are done. the value sent is the reader's
    // `head` at the time it was dropped.
    leaving: (mpsc::Sender<usize>, mpsc::Receiver<usize>),

    // waiting is used by receivers to signal that they are waiting for new entries, and where they
    // are waiting
    #[allow(clippy::type_complexity)]
    waiting: (
        mpsc::Sender<(thread::Thread, usize)>,
        mpsc::Receiver<(thread::Thread, usize)>,
    ),

    // channel used to communicate to unparker that a given thread should be woken up
    unpark: mpsc::Sender<thread::Thread>,

    // cache used to keep track of threads waiting for next write.
    // this is only here to avoid allocating one on every broadcast()
    cache: Vec<(thread::Thread, usize)>,
}
304
305impl<T> fmt::Debug for Bus<T> {
306 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
307 f.debug_struct("Bus")
308 .field("state", &self.state)
309 .field("readers", &self.readers)
310 .field("rleft", &self.rleft)
311 .field("leaving", &self.leaving)
312 .field("waiting", &self.waiting)
313 .field("unpark", &self.unpark)
314 .field("cache", &self.cache)
315 .finish()
316 }
317}
318
319impl<T> Bus<T> {
320 /// Allocates a new `Bus`.
321 ///
322 /// The provided length should be sufficient to absorb temporary peaks in the data flow, and is
323 /// thus workflow-dependent. Bus performance degrades somewhat when the queue is full, so it is
324 /// generally better to set this high than low unless you are pressed for memory.
325 pub fn new(mut len: usize) -> Bus<T> {
326 use std::iter;
327
328 // ring buffer must have room for one padding element
329 len += 1;
330
331 let inner = Arc::new(BusInner {
332 ring: (0..len).map(|_| Seat::default()).collect(),
333 tail: atomic::AtomicUsize::new(0),
334 closed: atomic::AtomicBool::new(false),
335 len,
336 });
337
338 // work around https://github.com/rust-lang/rust/issues/59020
339 if !cfg!(miri) && cfg!(target = "macos") {
340 let _ = time::Instant::now().elapsed();
341 }
342
343 // we run a separate thread responsible for unparking
344 // so we don't have to wait for unpark() to return in broadcast_inner
345 // sending on a channel without contention is cheap, unparking is not
346 let (unpark_tx, unpark_rx) = mpsc::unbounded::<thread::Thread>();
347 let _ = thread::Builder::new()
348 .name("bus_unparking".to_owned())
349 .spawn(move || {
350 for t in unpark_rx.iter() {
351 t.unpark();
352 }
353 });
354
355 Bus {
356 state: inner,
357 readers: 0,
358 rleft: iter::repeat(0).take(len).collect(),
359 leaving: mpsc::unbounded(),
360 waiting: mpsc::unbounded(),
361 unpark: unpark_tx,
362
363 cache: Vec::new(),
364 }
365 }
366
367 /// Get the expected number of reads for the given seat. This number will always be
368 /// conservative, in that fewer reads may be fine. Specifically, `.rleft` may not be
369 /// sufficiently up-to-date to account for all readers that have left.
370 #[inline]
371 fn expected(&mut self, at: usize) -> usize {
372 // since only the producer will modify the ring, and &mut self guarantees that *we* are the
373 // producer, no-one is modifying the ring. Multiple read-only borrows are safe, and so the
374 // cast below is safe.
375 unsafe { &*self.state.ring[at].state.get() }.max - self.rleft[at]
376 }
377
    /// Attempts to place the given value on the bus.
    ///
    /// If the bus is full, the behavior depends on `block`. If false, the value given is returned
    /// in an `Err()`. Otherwise, the current thread will be parked until there is space in the bus
    /// again, and the broadcast will be tried again until it succeeds.
    ///
    /// On success the value is stored in the seat at `tail`, `tail` is advanced, and every
    /// receiver that registered itself as waiting at an earlier position is handed to the
    /// unparking thread.
    ///
    /// Note that broadcasts will succeed even if there are no consumers!
    ///
    /// # Panics
    ///
    /// Panics if the unpark channel is disconnected (its receiving end lives in the
    /// `bus_unparking` thread).
    fn broadcast_inner(&mut self, val: T, block: bool) -> Result<(), T> {
        let tail = self.state.tail.load(atomic::Ordering::Relaxed);

        // we want to check if the next element over is free to ensure that we always leave one
        // empty space between the head and the tail. This is necessary so that readers can
        // distinguish between an empty and a full list. If the fence seat is free, the seat at
        // tail must also be free, which is simple enough to show by induction (exercise for the
        // reader).
        let fence = (tail + 1) % self.state.len;

        let spintime = time::Duration::new(0, SPINTIME);

        // to avoid parking when a slot frees up quickly, we use an exponential back-off SpinWait.
        let mut sw = SpinWait::new();
        loop {
            let fence_read = self.state.ring[fence].read.load(atomic::Ordering::Acquire);

            // is there room left in the ring?
            if fence_read == self.expected(fence) {
                break;
            }

            // no!
            // let's check if any readers have left, which might increment self.rleft[fence].
            while let Ok(mut left) = self.leaving.1.try_recv() {
                // a reader has left! this means that every seat between `left` and `tail-1`
                // has max set one too high. we track the number of such "missing" reads that
                // should be ignored in self.rleft, and compensate for them when looking at
                // seat.read above.
                self.readers -= 1;
                while left != tail {
                    self.rleft[left] += 1;
                    left = (left + 1) % self.state.len
                }
            }

            // is the fence block now free?
            // (this compares against the `fence_read` loaded at the top of this iteration; a
            // read that completed since then is picked up on the next iteration.)
            if fence_read == self.expected(fence) {
                // yes! go ahead and write!
                break;
            } else if block {
                // no, so block by parking and telling readers to notify on last read
                self.state.ring[fence]
                    .waiting
                    .swap(Some(Box::new(thread::current())));

                // need the atomic fetch_add to ensure reader threads will see the new .waiting
                self.state.ring[fence]
                    .read
                    .fetch_add(0, atomic::Ordering::Release);

                if !sw.spin() {
                    // not likely to get a slot soon -- wait to be unparked instead.
                    // note that we *need* to wait, because there are some cases in which we
                    // *won't* be unparked even though a slot has opened up.
                    thread::park_timeout(spintime);
                }
                continue;
            } else {
                // no, and blocking isn't allowed, so return an error
                return Err(val);
            }
        }

        // next one over is free, we have a free seat!
        let readers = self.readers;
        {
            let next = &self.state.ring[tail];
            // we are the only writer, so no-one else can be writing. however, since we're
            // mutating state, we also need for there to be no readers for this to be safe. the
            // argument for why this is the case is roughly an inverse of the argument for why
            // the unsafe block in Seat.take() is safe. basically, since
            //
            //   .read + .rleft == .max
            //
            // we know all readers at the time of the seat's previous write have accessed this
            // seat. we also know that no other readers will access that seat (they must have
            // started at later seats). thus, we are the only thread accessing this seat, and
            // so we can safely access it as mutable.
            let state = unsafe { &mut *next.state.get() };
            state.max = readers;
            state.val = Some(val);
            // discard any stale wake-up handle left on this seat from an earlier blocked write
            next.waiting.take();
            // reset the read counter; the Release store publishes the new state to readers
            next.read.store(0, atomic::Ordering::Release);
        }
        self.rleft[tail] = 0;
        // now tell readers that they can read
        let tail = (tail + 1) % self.state.len;
        self.state.tail.store(tail, atomic::Ordering::Release);

        // unblock any blocked receivers
        while let Ok((t, at)) = self.waiting.1.try_recv() {
            // the only readers we can't unblock are those that have already absorbed the
            // broadcast we just made, since they are blocking on the *next* broadcast
            if at == tail {
                self.cache.push((t, at))
            } else {
                self.unpark.send(t).unwrap();
            }
        }
        // re-queue the receivers that are waiting for the next broadcast
        for w in self.cache.drain(..) {
            // fine to do here because it is guaranteed not to block
            self.waiting.0.send(w).unwrap();
        }

        Ok(())
    }
492
493 /// Attempt to broadcast the given value to all consumers, but does not block if full.
494 ///
495 /// Note that, in contrast to regular channels, a bus is *not* considered closed if there are
496 /// no consumers, and thus broadcasts will continue to succeed. Thus, a successful broadcast
497 /// occurs as long as there is room on the internal bus to store the value, or some older value
498 /// has been received by all consumers. Note that a return value of `Err` means that the data
499 /// will never be received (by any consumer), but a return value of Ok does not mean that the
500 /// data will be received by a given consumer. It is possible for a receiver to hang up
501 /// immediately after this function returns Ok.
502 ///
503 /// This method will never block the current thread.
504 ///
505 /// ```rust
506 /// use bus::Bus;
507 /// let mut tx = Bus::new(1);
508 /// let mut rx = tx.add_rx();
509 /// assert_eq!(tx.try_broadcast("Hello"), Ok(()));
510 /// assert_eq!(tx.try_broadcast("world"), Err("world"));
511 /// ```
512 pub fn try_broadcast(&mut self, val: T) -> Result<(), T> {
513 self.broadcast_inner(val, false)
514 }
515
516 /// Broadcasts a value on the bus to all consumers.
517 ///
518 /// This function will block until space in the internal buffer becomes available.
519 ///
520 /// Note that a successful send does not guarantee that the receiver will ever see the data if
521 /// there is a buffer on this channel. Items may be enqueued in the internal buffer for the
522 /// receiver to receive at a later time. Furthermore, in contrast to regular channels, a bus is
523 /// *not* considered closed if there are no consumers, and thus broadcasts will continue to
524 /// succeed.
525 pub fn broadcast(&mut self, val: T) {
526 if let Err(..) = self.broadcast_inner(val, true) {
527 unreachable!("blocking broadcast_inner can't fail");
528 }
529 }
530
531 /// Add a new consumer to this bus.
532 ///
533 /// The new consumer will receive all *future* broadcasts on this bus.
534 ///
535 /// # Examples
536 ///
537 /// ```rust
538 /// use bus::Bus;
539 /// use std::sync::mpsc::TryRecvError;
540 ///
541 /// let mut bus = Bus::new(10);
542 /// let mut rx1 = bus.add_rx();
543 ///
544 /// bus.broadcast("Hello");
545 ///
546 /// // consumer present during broadcast sees update
547 /// assert_eq!(rx1.recv(), Ok("Hello"));
548 ///
549 /// // new consumer does *not* see broadcast
550 /// let mut rx2 = bus.add_rx();
551 /// assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));
552 ///
553 /// // both consumers see new broadcast
554 /// bus.broadcast("world");
555 /// assert_eq!(rx1.recv(), Ok("world"));
556 /// assert_eq!(rx2.recv(), Ok("world"));
557 /// ```
558 pub fn add_rx(&mut self) -> BusReader<T> {
559 self.readers += 1;
560
561 BusReader {
562 bus: Arc::clone(&self.state),
563 head: self.state.tail.load(atomic::Ordering::Relaxed),
564 leaving: self.leaving.0.clone(),
565 waiting: self.waiting.0.clone(),
566 closed: false,
567 }
568 }
569
570 /// Returns the number of active consumers currently attached to this bus.
571 ///
572 /// It is not guaranteed that a sent message will reach this number of consumers, as active
573 /// consumers may never call `recv` or `try_recv` again before dropping.
574 ///
575 /// # Examples
576 ///
577 /// ```rust
578 /// use bus::Bus;
579 ///
580 /// let mut bus = Bus::<u8>::new(10);
581 /// assert_eq!(bus.rx_count(), 0);
582 ///
583 /// let rx1 = bus.add_rx();
584 /// assert_eq!(bus.rx_count(), 1);
585 ///
586 /// drop(rx1);
587 /// assert_eq!(bus.rx_count(), 0);
588 /// ```
589 pub fn rx_count(&self) -> usize {
590 self.readers - self.leaving.1.len()
591 }
592}
593
impl<T> Drop for Bus<T> {
    /// Marks the bus as closed so that readers report a disconnect once they have consumed
    /// everything that was broadcast before the drop.
    fn drop(&mut self) {
        // the flag must be written before the read-modify-write on .tail below, which is what
        // makes it visible to readers that subsequently load .tail
        self.state.closed.store(true, atomic::Ordering::Relaxed);
        // Acquire/Release .tail to ensure other threads see new .closed
        self.state.tail.fetch_add(0, atomic::Ordering::AcqRel);
        // TODO: unpark receivers -- this is not absolutely necessary, since the reader's park will
        // time out, but it would cause them to detect the closed bus somewhat faster.
    }
}
603
/// How `BusReader::recv_inner` behaves when no broadcast is currently available.
#[derive(Clone, Copy)]
enum RecvCondition {
    /// Return immediately (used by `try_recv`).
    Try,
    /// Wait until a broadcast arrives or the bus is closed (used by `recv`).
    Block,
    /// Wait like `Block`, but give up once the given duration has elapsed (used by
    /// `recv_timeout`).
    Timeout(time::Duration),
}
610
/// A `BusReader` is a single consumer of `Bus` broadcasts. It will see every new value that is
/// passed to `.broadcast()` (or successful calls to `.try_broadcast()`) on the `Bus` that it was
/// created from.
///
/// Dropping a `BusReader` is perfectly safe, and will unblock the writer if it was waiting for
/// that reader to see a particular update.
///
/// ```rust
/// use bus::Bus;
/// let mut tx = Bus::new(1);
/// let mut r1 = tx.add_rx();
/// let r2 = tx.add_rx();
/// assert_eq!(tx.try_broadcast(true), Ok(()));
/// assert_eq!(r1.recv(), Ok(true));
///
/// // the bus does not have room for another broadcast
/// // since it knows r2 has not yet read the first broadcast
/// assert_eq!(tx.try_broadcast(true), Err(true));
///
/// // dropping r2 tells the producer that there is a free slot
/// // (i.e., it has been read by everyone)
/// drop(r2);
/// assert_eq!(tx.try_broadcast(true), Ok(()));
/// ```
pub struct BusReader<T> {
    // ring buffer and flags shared with the producer
    bus: Arc<BusInner<T>>,
    // index of the next seat this reader will take from
    head: usize,
    // used on drop to tell the producer where this reader stopped
    leaving: mpsc::Sender<usize>,
    // used to tell the producer that this reader's thread is waiting at position `head`
    waiting: mpsc::Sender<(thread::Thread, usize)>,
    // set once the reader has observed that the bus is closed and fully drained
    closed: bool,
}
642
643impl<T> fmt::Debug for BusReader<T> {
644 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
645 f.debug_struct("BusReader")
646 .field("bus", &self.bus)
647 .field("head", &self.head)
648 .field("leaving", &self.leaving)
649 .field("waiting", &self.waiting)
650 .field("closed", &self.closed)
651 .finish()
652 }
653}
654
655impl<T: Clone + Sync> BusReader<T> {
    /// Attempts to read a broadcast from the bus.
    ///
    /// If the bus is empty, the behavior depends on `block`:
    ///
    ///  - `RecvCondition::Try`: `Err(RecvTimeoutError::Timeout)` is returned immediately.
    ///  - `RecvCondition::Block`: the current thread spins and then parks until there is another
    ///    broadcast on the bus, at which point the receive is performed.
    ///  - `RecvCondition::Timeout(t)`: like `Block`, but `Err(RecvTimeoutError::Timeout)` is
    ///    returned once `t` has elapsed since the call started.
    ///
    /// In every mode, `Err(RecvTimeoutError::Disconnected)` is returned once the bus has been
    /// closed and this reader has nothing left to read; that outcome is remembered in
    /// `self.closed`.
    fn recv_inner(&mut self, block: RecvCondition) -> Result<T, std_mpsc::RecvTimeoutError> {
        if self.closed {
            return Err(std_mpsc::RecvTimeoutError::Disconnected);
        }

        // only the timeout mode needs to know when we started waiting
        let start = match block {
            RecvCondition::Timeout(_) => Some(time::Instant::now()),
            _ => None,
        };

        let spintime = time::Duration::new(0, SPINTIME);

        let mut was_closed = false;
        let mut sw = SpinWait::new();
        // true until we have registered ourselves with the writer; we only register once per call
        let mut first = true;
        loop {
            let tail = self.bus.tail.load(atomic::Ordering::Acquire);
            if tail != self.head {
                // there is at least one unread seat
                break;
            }

            // buffer is empty, check whether it's closed.
            // relaxed is fine since Bus.drop does an acquire/release on tail
            if self.bus.closed.load(atomic::Ordering::Relaxed) {
                // we need to check again that there's nothing in the bus, otherwise we might have
                // missed a write between when we did the read of .tail above and when we read
                // .closed here
                if !was_closed {
                    was_closed = true;
                    continue;
                }

                // the bus is closed, and we didn't miss anything!
                self.closed = true;
                return Err(std_mpsc::RecvTimeoutError::Disconnected);
            }

            // not closed, should we block?
            if let RecvCondition::Try = block {
                return Err(std_mpsc::RecvTimeoutError::Timeout);
            }

            // park and tell writer to notify on write
            if first {
                if let Err(..) = self.waiting.send((thread::current(), self.head)) {
                    // writer has gone away, but somehow we _just_ missed the close signal (in
                    // self.bus.closed). iterate again to ensure the channel is _actually_ empty.
                    atomic::fence(atomic::Ordering::SeqCst);
                    continue;
                }
                first = false;
            }

            if !sw.spin() {
                match block {
                    RecvCondition::Timeout(t) => {
                        match t.checked_sub(start.as_ref().unwrap().elapsed()) {
                            Some(left) => {
                                // never park for longer than the time remaining
                                if left < spintime {
                                    thread::park_timeout(left);
                                } else {
                                    thread::park_timeout(spintime);
                                }
                            }
                            None => {
                                // So, the wake-up thread is still going to try to wake us up later
                                // since we sent thread::current() above, but that's fine.
                                return Err(std_mpsc::RecvTimeoutError::Timeout);
                            }
                        }
                    }
                    RecvCondition::Block => {
                        thread::park_timeout(spintime);
                    }
                    RecvCondition::Try => unreachable!(),
                }
            }
        }

        let head = self.head;
        let ret = self.bus.ring[head].take();

        // safe because len is read-only
        self.head = (head + 1) % self.bus.len;
        Ok(ret)
    }
748
749 /// Attempts to return a pending broadcast on this receiver without blocking.
750 ///
751 /// This method will never block the caller in order to wait for data to become available.
752 /// Instead, this will always return immediately with a possible option of pending data on the
753 /// channel.
754 ///
755 /// If the corresponding bus has been dropped, and all broadcasts have been received, this
756 /// method will return with a disconnected error.
757 ///
758 /// This method is useful for a flavor of "optimistic check" before deciding to block on a
759 /// receiver.
760 ///
761 /// ```rust
762 /// use bus::Bus;
763 /// use std::thread;
764 ///
765 /// let mut tx = Bus::new(10);
766 /// let mut rx = tx.add_rx();
767 ///
768 /// // spawn a thread that will broadcast at some point
769 /// let j = thread::spawn(move || {
770 /// tx.broadcast(true);
771 /// });
772 ///
773 /// loop {
774 /// match rx.try_recv() {
775 /// Ok(val) => {
776 /// assert_eq!(val, true);
777 /// break;
778 /// }
779 /// Err(..) => {
780 /// // maybe we can do other useful work here
781 /// // or we can just busy-loop
782 /// thread::yield_now()
783 /// },
784 /// }
785 /// }
786 ///
787 /// j.join().unwrap();
788 /// ```
789 pub fn try_recv(&mut self) -> Result<T, std_mpsc::TryRecvError> {
790 self.recv_inner(RecvCondition::Try).map_err(|e| match e {
791 std_mpsc::RecvTimeoutError::Disconnected => std_mpsc::TryRecvError::Disconnected,
792 std_mpsc::RecvTimeoutError::Timeout => std_mpsc::TryRecvError::Empty,
793 })
794 }
795
796 /// Read another broadcast message from the bus, and block if none are available.
797 ///
798 /// This function will always block the current thread if there is no data available and it's
799 /// possible for more broadcasts to be sent. Once a broadcast is sent on the corresponding
800 /// `Bus`, then this receiver will wake up and return that message.
801 ///
802 /// If the corresponding `Bus` has been dropped, or it is dropped while this call is blocking,
803 /// this call will wake up and return `Err` to indicate that no more messages can ever be
804 /// received on this channel. However, since channels are buffered, messages sent before the
805 /// disconnect will still be properly received.
806 pub fn recv(&mut self) -> Result<T, std_mpsc::RecvError> {
807 match self.recv_inner(RecvCondition::Block) {
808 Ok(val) => Ok(val),
809 Err(std_mpsc::RecvTimeoutError::Disconnected) => Err(std_mpsc::RecvError),
810 _ => unreachable!("blocking recv_inner can't fail"),
811 }
812 }
813
814 /// Attempts to wait for a value from the bus, returning an error if the corresponding channel
815 /// has hung up, or if it waits more than `timeout`.
816 ///
817 /// This function will always block the current thread if there is no data available and it's
818 /// possible for more broadcasts to be sent. Once a message is sent on the corresponding `Bus`,
819 /// then this receiver will wake up and return that message.
820 ///
821 /// If the corresponding `Bus` has been dropped, or it is dropped while this call is blocking,
822 /// this call will wake up and return Err to indicate that no more messages can ever be
823 /// received on this channel. However, since channels are buffered, messages sent before the
824 /// disconnect will still be properly received.
825 ///
826 /// # Examples
827 ///
828 /// ```rust
829 /// use bus::Bus;
830 /// use std::sync::mpsc::RecvTimeoutError;
831 /// use std::time::Duration;
832 ///
833 /// let mut tx = Bus::<bool>::new(10);
834 /// let mut rx = tx.add_rx();
835 ///
836 /// let timeout = Duration::from_millis(100);
837 /// assert_eq!(Err(RecvTimeoutError::Timeout), rx.recv_timeout(timeout));
838 /// ```
839 pub fn recv_timeout(
840 &mut self,
841 timeout: time::Duration,
842 ) -> Result<T, std_mpsc::RecvTimeoutError> {
843 self.recv_inner(RecvCondition::Timeout(timeout))
844 }
845}
846
impl<T> BusReader<T> {
    /// Returns an iterator that will block waiting for broadcasts. It will return `None` when the
    /// bus has been closed (i.e., the `Bus` has been dropped).
    ///
    /// The iterator mutably borrows this reader for as long as it is alive.
    pub fn iter(&mut self) -> BusIter<'_, T> {
        BusIter(self)
    }
}
854
855impl<T> Drop for BusReader<T> {
856 #[allow(unused_must_use)]
857 fn drop(&mut self) {
858 // we allow not checking the result here because the writer might have gone away, which
859 // would result in an error, but is okay nonetheless.
860 self.leaving.send(self.head);
861 }
862}
863
/// An iterator over messages on a receiver. This iterator will block whenever `next` is called,
/// waiting for a new message, and `None` will be returned when the corresponding channel has been
/// closed.
///
/// It wraps a mutable borrow of the `BusReader` it reads from, and is produced by
/// `BusReader::iter` or by iterating over `&mut BusReader`.
pub struct BusIter<'a, T>(&'a mut BusReader<T>);
868
/// An owning iterator over messages on a receiver. This iterator will block whenever `next` is
/// called, waiting for a new message, and `None` will be returned when the corresponding bus has
/// been closed.
///
/// It takes ownership of the `BusReader` it reads from, and is produced by calling `into_iter`
/// on a `BusReader` by value.
pub struct BusIntoIter<T>(BusReader<T>);
873
874impl<'a, T: Clone + Sync> IntoIterator for &'a mut BusReader<T> {
875 type Item = T;
876 type IntoIter = BusIter<'a, T>;
877 fn into_iter(self) -> BusIter<'a, T> {
878 BusIter(self)
879 }
880}
881
882impl<T: Clone + Sync> IntoIterator for BusReader<T> {
883 type Item = T;
884 type IntoIter = BusIntoIter<T>;
885 fn into_iter(self) -> BusIntoIter<T> {
886 BusIntoIter(self)
887 }
888}
889
890impl<'a, T: Clone + Sync> Iterator for BusIter<'a, T> {
891 type Item = T;
892 fn next(&mut self) -> Option<T> {
893 self.0.recv().ok()
894 }
895}
896
897impl<T: Clone + Sync> Iterator for BusIntoIter<T> {
898 type Item = T;
899 fn next(&mut self) -> Option<T> {
900 self.0.recv().ok()
901 }
902}
903
/// A slot that atomically holds at most one boxed `T`.
///
/// The box is stored as a raw pointer; a null pointer means the slot is empty.
struct AtomicOption<T> {
    ptr: atomic::AtomicPtr<T>,
    // records that the slot logically owns an `Option<Box<T>>`
    _marker: PhantomData<Option<Box<T>>>,
}

impl<T> fmt::Debug for AtomicOption<T> {
    /// Renders the slot as a struct showing only the raw pointer.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("AtomicOption");
        out.field("ptr", &self.ptr);
        out.finish()
    }
}

unsafe impl<T: Send> Send for AtomicOption<T> {}
unsafe impl<T: Send> Sync for AtomicOption<T> {}

impl<T> AtomicOption<T> {
    /// Creates a slot that holds nothing.
    fn empty() -> Self {
        AtomicOption {
            _marker: PhantomData,
            ptr: atomic::AtomicPtr::new(ptr::null_mut()),
        }
    }

    /// Atomically replaces the contents of the slot with `val` and returns what was stored
    /// before.
    fn swap(&self, val: Option<Box<T>>) -> Option<Box<T>> {
        // Storing a real pointer must also release it to other threads (AcqRel). Storing null
        // only needs Acquire, to synchronize with whoever stored the pointer we get back: a
        // null pointer is never dereferenced, so its store needs no release.
        let (incoming, ordering) = match val {
            Some(boxed) => (Box::into_raw(boxed), atomic::Ordering::AcqRel),
            None => (ptr::null_mut(), atomic::Ordering::Acquire),
        };
        let previous = self.ptr.swap(incoming, ordering);

        // SAFETY:
        // - the acquiring swap ensures the allocation behind a non-null pointer is visible.
        // - `NonNull::new` filters out null, so only non-null pointers are converted.
        // - the only non-null pointers ever stored in `self.ptr` come from `Box::into_raw`.
        ptr::NonNull::new(previous).map(|owned| unsafe { Box::from_raw(owned.as_ptr()) })
    }

    /// Empties the slot and returns whatever it held.
    fn take(&self) -> Option<Box<T>> {
        self.swap(None)
    }
}

impl<T> Drop for AtomicOption<T> {
    /// Frees the boxed value, if any is still stored.
    fn drop(&mut self) {
        if let Some(leftover) = self.take() {
            drop(leftover);
        }
    }
}