1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
//! # Lock-free Bounded Non-Blocking Pub-Sub Queue //! //! This is a publish subscribe pattern queue, where the publisher is never blocked by //! slow subscribers. The side effect is that slow subscribers will miss messages. The intended //! use-case are high throughput streams where receiving the latest message is prioritized over //! receiving the entire stream. Market Data Feeds, Live Streams, etc.... //! //! The underlying data-structure is a vector of Arc(s) eliminating the use of copies. //! //!## Features //! * Lock-Free Write/Read - Lock-Free for Publisher and Lock-Free for Subscribers. //! * Bounded - Constant size of memory used, max is `sizeof(MsgObject)*(queue_size + sub_cnt + 1)`. //! This is an edge-case where each subscriber is holding a ref to an object while the publisher //! has published a full length of queue in the mean time. //! * Non-Blocking - The queue never blocks the publisher, slow subscribers miss data proportinal to //! their speed. //! * Pub-Sub - Every Subscriber that can keep up with the Publisher will recieve all the data the //! Publisher publishes. //! * [`sync`]/[`async`] - both interfaces are provided, as well as a bare queue implementation //! without the thread synchronisation ,and futures logic. //! * std::sync::mpsc like interface - The API is modeled after the standard library mpsc queue, //! channel function are used to create a tuple of (Publisher, Subscriber), while the Clone trait //! on Subscriber creates additional subscribers to the same Publisher. //! //! [`sync::Publisher`], [`async::Publisher`], and [`BarePublisher`] are used to broadcast data to //! [`sync::Subscriber`], [`async::Subscriber`], and [`BareSubscriber`] pools. Subscribers are //! clone-able such that many threads, or futures, can receive data simultaneously. The only //! limitation is that Subscribers have to keep up with the frequency of the Publisher. If a //! Subscriber is slow it will drop data. //! //! ## Disconnection //! //! The broadcast and receive operations on channels will all return a [`Result`] //! indicating whether the operation succeeded or not. An unsuccessful operation //! is normally indicative of the other half of a channel having "hung up" by //! being dropped in its corresponding thread. //! //! Once half of a channel has been deallocated, most operations can no longer //! continue to make progress, so [`Err`] will be returned. Many applications //! will continue to [`unwrap`] the results returned from this module, //! instigating a propagation of failure among threads if one unexpectedly dies. //! //! //! # Examples //! ## Simple raw usage //! ```rust //! extern crate bus_queue; //! //! use bus_queue::raw_bounded; //! //! fn main() { //! let (tx, rx) = raw_bounded(10); //! (1..15).for_each(|x| { tx.broadcast(x).unwrap() }); //! //! let received: Vec<i32> = rx.map(|x| *x).collect(); //! // Test that only the last 10 elements are in the received list. //! let expected: Vec<i32> = (5..15).collect(); //! //! assert_eq!(expected, received); //! } //! ``` //! ## Simple asynchronous usage //! ```rust //! # use bus_queue::bounded; //! # use futures::{executor::block_on, StreamExt, stream}; //! //! let (publisher, subscriber1) = bounded(10); //! let subscriber2 = subscriber1.clone(); //! //! block_on(async move { //! stream::iter(1..15) //! .map(|i| Ok(i)) //! .forward(publisher) //! .await //! .unwrap(); //! }); //! //! let received1: Vec<u32> = block_on(async { subscriber1.map(|x| *x).collect().await }); //! let received2: Vec<u32> = block_on(async { subscriber2.map(|x| *x).collect().await }); //! // Test that only the last 10 elements are in the received list. //! let expected = (5..15).collect::<Vec<u32>>(); //! assert_eq!(received1, expected); //! assert_eq!(received2, expected); //! ``` //! //! [`BarePublisher`]: struct.BarePublisher.html //! [`BareSubscriber`]: struct.BareSubscriber.html //! [`sync`]: sync/index.html //! [`async`]: async/index.html //! [`sync::Publisher`]: sync/struct.Publisher.html //! [`sync::Subscriber`]: sync/struct.Subscriber.html //! [`async::Publisher`]: async/struct.Publisher.html //! [`async::Subscriber`]: async/struct.Subscriber.html //! [`Result`]: ../../../std/result/enum.Result.html //! [`Err`]: ../../../std/result/enum.Result.html#variant.Err //! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap pub(crate) mod atomic_counter; pub mod bus; pub mod channel; pub mod flavors; pub(crate) mod piper; pub mod swap_slot; #[cfg(not(any(feature = "rwlock-export")))] pub use flavors::arc_swap::{bounded, raw_bounded, Publisher, Slot, Subscriber}; #[cfg(feature = "rwlock-export")] pub use flavors::rw_lock::{bounded, raw_bounded, Publisher, Slot, Subscriber}; // #[cfg(feature = "conc-atomic")] // pub use flavors::conc_atomic::{bounded, raw_bounded, Publisher, Subscriber};