1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302
//! # Lock-free Bounded Non-Blocking Pub-Sub Queue
//!
//! This is a publish subscribe pattern queue, where the publisher is never blocked by
//! slow subscribers. The side effect is that slow subscribers will miss messages. The intended
//! use-case are high throughput streams where receiving the latest message is prioritized over
//! receiving the entire stream. Market Data Feeds, Live Streams, etc....
//!
//! The underlying data-structure is a vector of Arc(s) eliminating the use of copies.
//!
//!## Features
//! * Lock-Free Write/Read - Lock-Free for Publisher and Lock-Free for Subscribers.
//! * Bounded - Constant size of memory used, max is `sizeof(MsgObject)*(queue_size + sub_cnt + 1)`.
//! This is an edge-case where each subscriber is holding a ref to an object while the publisher
//! has published a full length of queue in the mean time.
//! * Non-Blocking - The queue never blocks the publisher, slow subscribers miss data proportinal to
//! their speed.
//! * Pub-Sub - Every Subscriber that can keep up with the Publisher will recieve all the data the
//! Publisher publishes.
//! * [`sync`]/[`async`] - both interfaces are provided, as well as a bare queue implementation
//! without the thread synchronisation ,and futures logic.
//! * std::sync::mpsc like interface - The API is modeled after the standard library mpsc queue,
//! channel function are used to create a tuple of (Publisher, Subscriber), while the Clone trait on Subscribre
//!
//! [`sync::Publisher`], [`async::Publisher`], and [`BarePublisher`] are used to broadcast data to
//! [`sync::Subscriber`], [`async::Subscriber`], and [`BareSubscriber`] pools. Subscribers are
//! clone-able such that many threads, or futures, can receive data simultaneously. The only
//! limitation is that Subscribers have to keep up with the frequency of the Publisher. If a
//! Subscriber is slow it will drop data.
//!
//! ## Disconnection
//!
//! The broadcast and receive operations on channels will all return a [`Result`]
//! indicating whether the operation succeeded or not. An unsuccessful operation
//! is normally indicative of the other half of a channel having "hung up" by
//! being dropped in its corresponding thread.
//!
//! Once half of a channel has been deallocated, most operations can no longer
//! continue to make progress, so [`Err`] will be returned. Many applications
//! will continue to [`unwrap`] the results returned from this module,
//! instigating a propagation of failure among threads if one unexpectedly dies.
//!
//!
//! # Examples
//! ## Simple bare usage
//! ```
//! extern crate bus_queue;
//! use bus_queue::bare_channel;
//!
//!fn main() {
//! let (mut tx,rx) = bare_channel(1);
//!
//! tx.broadcast(4).unwrap();
//! assert_eq!(4,*rx.try_recv().unwrap());
//!}
//! ```
//! ## Simple synchronous usage
//! ```
//! extern crate bus_queue;
//!
//! use bus_queue::sync;
//! use std::thread;
//! fn main() {
//! // Create a sync channel
//! let (mut tx, rx) = sync::channel(1);
//! let t = thread::spawn(move|| {
//! let received = rx.recv().unwrap();
//! assert_eq!(*received, 10);
//! });
//! tx.broadcast(10).unwrap();
//! t.join().unwrap();
//!}
//! ```
//! ## Simple asynchronous usage
//! ```
//! extern crate bus_queue;
//! extern crate futures;
//! extern crate tokio;
//!
//! use bus_queue::async;
//! use futures::future::Future;
//! use futures::*;
//! use tokio::runtime::Runtime;
//!
//! fn subscriber(rx: async::Subscriber<i32>) -> impl Future<Item = (), Error = ()> {
//! assert_eq!(
//! rx.map(|x| *x).collect().wait().unwrap(),
//! vec![1, 2, 3, 4, 5]
//! );
//! future::ok(())
//! }
//!
//! fn main() {
//! let mut rt = Runtime::new().unwrap();
//! let (tx, rx): (async::Publisher<i32>, async::Subscriber<i32>) = async::channel(10);
//!
//! let publisher = stream::iter_ok(vec![1, 2, 3, 3, 5])
//! .forward(tx)
//! .and_then(|(_, mut sink)| sink.close())
//! .map_err(|_| ())
//! .map(|_| ());
//!
//! rt.spawn(publisher);
//! rt.block_on(subscriber(rx)).unwrap();
//! }
//! ```
//!
//! [`BarePublisher`]: struct.BarePublisher.html
//! [`BareSubscriber`]: struct.BareSubscriber.html
//! [`sync`]: sync/index.html
//! [`async`]: async/index.html
//! [`sync::Publisher`]: sync/struct.Publisher.html
//! [`sync::Subscriber`]: sync/struct.Subscriber.html
//! [`async::Publisher`]: async/struct.Publisher.html
//! [`async::Subscriber`]: async/struct.Subscriber.html
//! [`Result`]: ../../../std/result/enum.Result.html
//! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
//! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
extern crate arc_swap;
use arc_swap::{ArcSwap, ArcSwapOption};
pub use std::sync::mpsc::{RecvError, RecvTimeoutError, SendError, TryRecvError};
use std::sync::{atomic::AtomicBool, atomic::AtomicUsize, atomic::Ordering, mpsc, Arc};
use std::iter::{Iterator};
/// Bare implementation of the publisher.
#[derive(Debug)]
pub struct BarePublisher<T: Send> {
buffer: Arc<Vec<ArcSwapOption<T>>>,
wi: Arc<AtomicUsize>,
size: usize,
sub_cnt: Arc<AtomicUsize>,
pub_available: Arc<AtomicBool>,
}
/// Bare implementation of the subscriber.
#[derive(Debug)]
pub struct BareSubscriber<T: Send> {
buffer: Arc<Vec<ArcSwapOption<T>>>,
wi: Arc<AtomicUsize>,
ri: AtomicUsize,
size: usize,
sub_cnt: Arc<AtomicUsize>,
pub_available: Arc<AtomicBool>,
}
/// Function used to create and initialise a ( BarePublisher, BareSubscriber ) tuple.
pub fn bare_channel<T: Send>(size: usize) -> (BarePublisher<T>, BareSubscriber<T>) {
let mut buffer = Vec::new();
buffer.resize(size, ArcSwapOption::new(None));
let buffer = Arc::new(buffer);
let sub_cnt = Arc::new(AtomicUsize::new(1));
let wi = Arc::new(AtomicUsize::new(0));
let pub_available = Arc::new(AtomicBool::new(true));
(
BarePublisher {
buffer: buffer.clone(),
size,
wi: wi.clone(),
sub_cnt: sub_cnt.clone(),
pub_available: pub_available.clone(),
},
BareSubscriber {
buffer: buffer.clone(),
size,
wi: wi.clone(),
ri: AtomicUsize::new(0),
sub_cnt: sub_cnt.clone(),
pub_available: pub_available.clone(),
},
)
}
impl<T: Send> BarePublisher<T> {
/// Publishes values to the circular buffer at wi % size
/// # Arguments
/// * `object` - owned object to be published
pub fn broadcast(&mut self, object: T) -> Result<(), SendError<T>> {
if self.sub_cnt.load(Ordering::Relaxed) == 0 {
return Err(SendError(object));
}
self.buffer[self.wi.load(Ordering::Relaxed) % self.size].store(Some(Arc::new(object)));
self.wi.fetch_add(1, Ordering::Relaxed);
Ok(())
}
}
/// Drop trait is used to let subscribers know that publisher is no longer available.
impl<T: Send> Drop for BarePublisher<T> {
fn drop(&mut self) {
self.pub_available.store(false, Ordering::Relaxed);
}
}
impl<T: Send> BareSubscriber<T> {
/// Receives some atomic reference to an object if queue is not empty, or None if it is. Never
/// Blocks
pub fn try_recv(&self) -> Result<Arc<T>, TryRecvError> {
if self.ri.load(Ordering::Relaxed) == self.wi.load(Ordering::Relaxed) {
if self.pub_available.load(Ordering::Relaxed) == false {
return Err(TryRecvError::Disconnected);
}
return Err(TryRecvError::Empty);
}
loop {
match self.buffer[self.ri.load(Ordering::Relaxed) % self.size].load() {
Some(some) => if self.wi.load(Ordering::Relaxed)
> self.ri.load(Ordering::Relaxed) + self.size
{
self.ri.store(
self.wi.load(Ordering::Relaxed) - self.size,
Ordering::Relaxed,
);
} else {
self.ri.fetch_add(1, Ordering::Relaxed);
return Ok(some);
},
None => unreachable!(),
}
}
}
}
/// Clone trait is used to create another BareSubscriber object, subscribed to the same
/// Publisher the initial object was subscribed to.
impl<T: Send> Clone for BareSubscriber<T> {
fn clone(&self) -> Self {
self.sub_cnt.fetch_add(1, Ordering::Relaxed);
Self {
buffer: self.buffer.clone(),
wi: self.wi.clone(),
ri: AtomicUsize::new(self.ri.load(Ordering::Relaxed)),
size: self.size,
sub_cnt: self.sub_cnt.clone(),
pub_available: self.pub_available.clone(),
}
}
}
impl<T: Send> Drop for BareSubscriber<T> {
fn drop(&mut self) {
self.sub_cnt.fetch_sub(1, Ordering::Relaxed);
}
}
impl<T: Send> Iterator for BareSubscriber<T> {
type Item = Arc<T>;
fn next(&mut self) -> Option<Self::Item>{
match self.try_recv() {
Ok(item) => Some(item),
Err(_) => None
}
}
}
/// Helper struct used by sync and async implementations to wake Tasks / Threads
#[derive(Debug)]
struct Waker<T> {
/// Vector of Tasks / Threads to be woken up.
pub sleepers: Vec<Arc<T>>,
/// A mpsc Receiver used to receive Tasks / Threads to be registered.
receiver: mpsc::Receiver<Arc<T>>,
}
/// Helper struct used by sync and async implementations to register Tasks / Threads to
/// be woken up.
#[derive(Debug)]
struct Sleeper<T> {
/// Current Task / Thread to be woken up.
pub sleeper: Arc<T>,
/// mpsc Sender used to register Task / Thread.
pub sender: mpsc::Sender<Arc<T>>,
}
impl<T> Waker<T> {
/// Register all the Tasks / Threads sent for registration.
pub fn register_receivers(&mut self) {
for receiver in self.receiver.try_recv() {
self.sleepers.push(receiver);
}
}
}
/// Function used to create a ( Waker, Sleeper ) tuple.
fn alarm<T>(current: T) -> (Waker<T>, Sleeper<T>) {
let mut vec = Vec::new();
let (sender, receiver) = mpsc::channel();
let arc_t = Arc::new(current);
vec.push(arc_t.clone());
(
Waker {
sleepers: vec,
receiver,
},
Sleeper {
sleeper: arc_t.clone(),
sender,
},
)
}
pub mod sync;
#[cfg(feature = "async")]
extern crate futures;
#[cfg(feature = "async")]
pub mod async;