tari_broadcast_channel/
lib.rs

1//! # Lock-free Bounded Non-Blocking Pub-Sub Queue
2//!
3//! This is a publish subscribe pattern queue, where the publisher is never blocked by
4//! slow subscribers. The side effect is that slow subscribers will miss messages. The intended
5//! use-case are high throughput streams where receiving the latest message is prioritized over
6//! receiving the entire stream. Market Data Feeds, Live Streams, etc....
7//!
8//! The underlying data-structure is a vector of Arc(s) eliminating the use of copies.
9//!
10//! ## Features
11//! * Lock-Free Write/Read - Lock-Free for Publisher and Lock-Free for Subscribers.
12//! * Bounded - Constant size of memory used, max is `sizeof(MsgObject)*(queue_size + sub_cnt + 1)`. This is an
13//!   edge-case where each subscriber is holding a ref to an object while the publisher has published a full length of
14//!   queue in the mean time.
15//! * Non-Blocking - The queue never blocks the publisher, slow subscribers miss data proportinal to their speed.
16//! * Pub-Sub - Every Subscriber that can keep up with the Publisher will recieve all the data the Publisher publishes.
17//! * [`sync`]/[`async`] - both interfaces are provided, as well as a bare queue implementation without the thread
18//!   synchronisation ,and futures logic.
19//! * std::sync::mpsc like interface - The API is modeled after the standard library mpsc queue, channel function are
20//!   used to create a tuple of (Publisher, Subscriber), while the Clone trait on Subscriber creates additional
21//!   subscribers to the same Publisher.
22//!
23//! [`sync::Publisher`], [`async::Publisher`], and [`BarePublisher`] are used to broadcast data to
24//! [`sync::Subscriber`], [`async::Subscriber`], and [`BareSubscriber`] pools. Subscribers are
25//! clone-able such that many threads, or futures, can receive data simultaneously. The only
26//! limitation is that Subscribers have to keep up with the frequency of the Publisher. If a
27//! Subscriber is slow it will drop data.
28//!
29//! ## Disconnection
30//!
31//! The broadcast and receive operations on channels will all return a [`Result`]
32//! indicating whether the operation succeeded or not. An unsuccessful operation
33//! is normally indicative of the other half of a channel having "hung up" by
34//! being dropped in its corresponding thread.
35//!
36//! Once half of a channel has been deallocated, most operations can no longer
37//! continue to make progress, so [`Err`] will be returned. Many applications
38//! will continue to [`unwrap`] the results returned from this module,
39//! instigating a propagation of failure among threads if one unexpectedly dies.
40//!
41//!
42//! # Examples
43//! ## Simple raw usage
44//! ```rust
45//! use tari_broadcast_channel::raw_bounded;
46//!
47//! let (tx, rx) = raw_bounded(10, 1);
48//! (1..15).for_each(|x| tx.broadcast(x).unwrap());
49//!
50//! let received: Vec<i32> = rx.map(|x| *x).collect();
51//! // Test that only the last 10 elements are in the received list.
52//! let expected: Vec<i32> = (5..15).collect();
53//!
54//! assert_eq!(expected, received);
55//! ```
56//! ## Simple asynchronous usage
57//! ```rust
58//! # use tari_broadcast_channel::bounded;
59//! # use futures::executor::block_on;
60//! # use futures::StreamExt;
61//! # use futures::stream;
62//!
63//! let (publisher, subscriber1) = bounded(10, 2);
64//! let subscriber2 = subscriber1.clone();
65//!
66//! block_on(async move {
67//!     stream::iter(1..15).map(|i| Ok(i)).forward(publisher).await.unwrap();
68//! });
69//!
70//! let received1: Vec<u32> = block_on(async { subscriber1.map(|x| *x).collect().await });
71//! let received2: Vec<u32> = block_on(async { subscriber2.map(|x| *x).collect().await });
72//! // Test that only the last 10 elements are in the received list.
73//! let expected = (5..15).collect::<Vec<u32>>();
74//! assert_eq!(received1, expected);
75//! assert_eq!(received2, expected);
76//! ```
77//!
78//! [`BarePublisher`]: struct.BarePublisher.html
79//! [`BareSubscriber`]: struct.BareSubscriber.html
80//! [`sync`]: sync/index.html
81//! [`async`]: async/index.html
82//! [`sync::Publisher`]: sync/struct.Publisher.html
83//! [`sync::Subscriber`]: sync/struct.Subscriber.html
84//! [`async::Publisher`]: async/struct.Publisher.html
85//! [`async::Subscriber`]: async/struct.Subscriber.html
86//! [`Result`]: ../../../std/result/enum.Result.html
87//! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
88//! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
89
90mod async_channel;
91pub(crate) mod atomic_counter;
92mod channel;
93
94pub use async_channel::{bounded, Publisher, Subscriber};
95pub use channel::{bounded as raw_bounded, SendError};