//! # Lock-free Bounded Non-Blocking Pub-Sub Queue
//!
//! This is a publish subscribe pattern queue, where the publisher is never blocked by
//! slow subscribers. The side effect is that slow subscribers will miss messages. The intended
//! use-cases are high throughput streams where receiving the latest message is prioritized over
//! receiving the entire stream: Market Data Feeds, Live Streams, etc.
//!
//! The underlying data-structure is a vector of Arc(s) eliminating the use of copies.
//!
//! ## Features
//! * Lock-Free Write/Read - Lock-Free for Publisher and Lock-Free for Subscribers.
//! * Bounded - Constant size of memory used, max is `sizeof(MsgObject)*(queue_size + sub_cnt + 1)`.
//!   This is an edge-case where each subscriber is holding a ref to an object while the publisher
//!   has published a full length of queue in the mean time.
//! * Non-Blocking - The queue never blocks the publisher, slow subscribers miss data proportional to
//!   their speed.
//! * Pub-Sub - Every Subscriber that can keep up with the Publisher will receive all the data the
//!   Publisher publishes.
//! * [`sync`]/[`async`] - both interfaces are provided, as well as a bare queue implementation
//!   without the thread synchronisation and futures logic.
//! * std::sync::mpsc like interface - The API is modeled after the standard library mpsc queue:
//!   `channel` functions are used to create a tuple of (Publisher, Subscriber), while the Clone
//!   trait on Subscriber is used to create additional Subscribers on the same Publisher.
//!
//! [`sync::Publisher`], [`async::Publisher`], and [`BarePublisher`] are used to broadcast data to
//! [`sync::Subscriber`], [`async::Subscriber`], and [`BareSubscriber`] pools. Subscribers are
//! clone-able such that many threads, or futures, can receive data simultaneously. The only
//! limitation is that Subscribers have to keep up with the frequency of the Publisher. If a
//! Subscriber is slow it will drop data.
//!
//! ## Disconnection
//!
//! The broadcast and receive operations on channels will all return a [`Result`]
//! indicating whether the operation succeeded or not. An unsuccessful operation
//! is normally indicative of the other half of a channel having "hung up" by
//! being dropped in its corresponding thread.
//!
//! Once half of a channel has been deallocated, most operations can no longer
//! continue to make progress, so [`Err`] will be returned. Many applications
//! will continue to [`unwrap`] the results returned from this module,
//! instigating a propagation of failure among threads if one unexpectedly dies.
//!
//!
//! # Examples
//! ## Simple bare usage
//! ```
//! extern crate bus_queue;
//! use bus_queue::bare_channel;
//!
//! fn main() {
//!     let (mut tx, rx) = bare_channel(1);
//!
//!     tx.broadcast(4).unwrap();
//!     assert_eq!(4, *rx.try_recv().unwrap());
//! }
//! ```
//! ## Simple synchronous usage
//! ```
//! extern crate bus_queue;
//!
//! use bus_queue::sync;
//! use std::thread;
//! fn main() {
//!     // Create a sync channel
//!     let (mut tx, rx) = sync::channel(1);
//!     let t = thread::spawn(move || {
//!         let received = rx.recv().unwrap();
//!         assert_eq!(*received, 10);
//!     });
//!     tx.broadcast(10).unwrap();
//!     t.join().unwrap();
//! }
//! ```
//! ## Simple asynchronous usage
//! ```
//! extern crate bus_queue;
//! extern crate futures;
//! extern crate tokio;
//!
//! use bus_queue::async;
//! use futures::future::Future;
//! use futures::*;
//! use tokio::runtime::Runtime;
//!
//! fn subscriber(rx: async::Subscriber<i32>) -> impl Future<Item = (), Error = ()> {
//!     assert_eq!(
//!         rx.map(|x| *x).collect().wait().unwrap(),
//!         vec![1, 2, 3, 4, 5]
//!     );
//!     future::ok(())
//! }
//!
//! fn main() {
//!     let mut rt = Runtime::new().unwrap();
//!     let (tx, rx): (async::Publisher<i32>, async::Subscriber<i32>) = async::channel(10);
//!
//!     let publisher = stream::iter_ok(vec![1, 2, 3, 4, 5])
//!         .forward(tx)
//!         .and_then(|(_, mut sink)| sink.close())
//!         .map_err(|_| ())
//!         .map(|_| ());
//!
//!     rt.spawn(publisher);
//!     rt.block_on(subscriber(rx)).unwrap();
//! }
//! ```
//!
//! [`BarePublisher`]: struct.BarePublisher.html
//! [`BareSubscriber`]: struct.BareSubscriber.html
//! [`sync`]: sync/index.html
//! [`async`]: async/index.html
//! [`sync::Publisher`]: sync/struct.Publisher.html
//! [`sync::Subscriber`]: sync/struct.Subscriber.html
//! [`async::Publisher`]: async/struct.Publisher.html
//! [`async::Subscriber`]: async/struct.Subscriber.html
//! [`Result`]: ../../../std/result/enum.Result.html
//! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
//! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
extern crate arc_swap;
use arc_swap::{ArcSwap, ArcSwapOption};
pub use std::sync::mpsc::{RecvError, RecvTimeoutError, SendError, TryRecvError};
use std::sync::{atomic::AtomicBool, atomic::AtomicUsize, atomic::Ordering, mpsc, Arc};
use std::iter::{Iterator};

/// Bare implementation of the publisher.
///
/// Writes values into a circular buffer shared with the subscribers, without any thread or
/// task wake-up logic. It is created together with a `BareSubscriber` by `bare_channel`.
#[derive(Debug)]
pub struct BarePublisher<T: Send> {
    /// Circular buffer of slots, shared with every subscriber.
    buffer: Arc<Vec<ArcSwapOption<T>>>,
    /// Shared write index: incremented on every successful broadcast; the slot written to is
    /// `wi % size`.
    wi: Arc<AtomicUsize>,
    /// Number of slots in `buffer`.
    size: usize,
    /// Shared count of live subscribers; `broadcast` fails when it reads zero.
    sub_cnt: Arc<AtomicUsize>,
    /// Flag shared with the subscribers; set to `false` when the publisher is dropped.
    pub_available: Arc<AtomicBool>,
}
/// Bare implementation of the subscriber.
///
/// Reads values out of the circular buffer shared with the publisher, without any blocking or
/// wake-up logic. The first one is created by `bare_channel`; further ones are created with
/// `clone`.
#[derive(Debug)]
pub struct BareSubscriber<T: Send> {
    /// Circular buffer of slots, shared with the publisher and the other subscribers.
    buffer: Arc<Vec<ArcSwapOption<T>>>,
    /// Shared write index maintained by the publisher.
    wi: Arc<AtomicUsize>,
    /// Read index private to this subscriber; the next slot read is `ri % size`.
    ri: AtomicUsize,
    /// Number of slots in `buffer`.
    size: usize,
    /// Shared count of live subscribers; incremented on clone and decremented on drop.
    sub_cnt: Arc<AtomicUsize>,
    /// Flag set to `false` by the publisher when it is dropped.
    pub_available: Arc<AtomicBool>,
}

/// Function used to create and initialise a ( BarePublisher, BareSubscriber ) tuple.
pub fn bare_channel<T: Send>(size: usize) -> (BarePublisher<T>, BareSubscriber<T>) {
    let mut buffer = Vec::new();
    buffer.resize(size, ArcSwapOption::new(None));
    let buffer = Arc::new(buffer);
    let sub_cnt = Arc::new(AtomicUsize::new(1));
    let wi = Arc::new(AtomicUsize::new(0));
    let pub_available = Arc::new(AtomicBool::new(true));
    (
        BarePublisher {
            buffer: buffer.clone(),
            size,
            wi: wi.clone(),
            sub_cnt: sub_cnt.clone(),
            pub_available: pub_available.clone(),
        },
        BareSubscriber {
            buffer: buffer.clone(),
            size,
            wi: wi.clone(),
            ri: AtomicUsize::new(0),
            sub_cnt: sub_cnt.clone(),
            pub_available: pub_available.clone(),
        },
    )
}

impl<T: Send> BarePublisher<T> {
    /// Publishes a value to the circular buffer at `wi % size`, replacing whatever the slot
    /// held before, and then advances the write index.
    ///
    /// # Arguments
    /// * `object` - owned object to be published
    ///
    /// # Errors
    /// Hands the object back inside a `SendError` when the subscriber count reads zero.
    ///
    /// # Panics
    /// Panics when the channel was created with a size of zero, because the slot index is
    /// computed modulo `size`.
    pub fn broadcast(&mut self, object: T) -> Result<(), SendError<T>> {
        if self.sub_cnt.load(Ordering::Relaxed) == 0 {
            return Err(SendError(object));
        }
        // Within this file only the publisher modifies `wi`, and `&mut self` rules out a
        // concurrent broadcast, so a relaxed read of our own index is sufficient.
        let slot = self.wi.load(Ordering::Relaxed) % self.size;
        self.buffer[slot].store(Some(Arc::new(object)));
        // Release: the slot write above must be visible to every subscriber that observes the
        // incremented index through an Acquire load.
        self.wi.fetch_add(1, Ordering::Release);
        Ok(())
    }
}
/// Drop trait is used to let subscribers know that publisher is no longer available.
impl<T: Send> Drop for BarePublisher<T> {
    fn drop(&mut self) {
        // Release: every broadcast made before the drop is visible to a subscriber that
        // observes `false` with an Acquire load, so it can still drain the last messages.
        self.pub_available.store(false, Ordering::Release);
    }
}

impl<T: Send> BareSubscriber<T> {
    /// Receives an atomic reference to the next object if one is available. Never blocks.
    ///
    /// A subscriber that has fallen more than `size` items behind the publisher skips ahead
    /// to the oldest item still held by the buffer; the items in between are lost.
    ///
    /// # Errors
    /// * `TryRecvError::Empty` - nothing new has been published and the publisher is alive.
    /// * `TryRecvError::Disconnected` - the publisher was dropped and everything it
    ///   published has already been consumed by this subscriber.
    pub fn try_recv(&self) -> Result<Arc<T>, TryRecvError> {
        if self.ri.load(Ordering::Relaxed) == self.wi.load(Ordering::Acquire) {
            if self.pub_available.load(Ordering::Acquire) {
                return Err(TryRecvError::Empty);
            }
            // The publisher is gone, but it may have broadcast between the index check above
            // and its drop. Re-read the index before reporting the disconnect, otherwise those
            // final messages would never be delivered.
            if self.ri.load(Ordering::Relaxed) == self.wi.load(Ordering::Acquire) {
                return Err(TryRecvError::Disconnected);
            }
        }
        loop {
            let ri = self.ri.load(Ordering::Relaxed);
            match self.buffer[ri % self.size].load() {
                Some(item) => {
                    // Check the write index only after the slot was read: if the publisher
                    // has lapped us, the value just read may already belong to a newer lap.
                    let wi = self.wi.load(Ordering::Acquire);
                    if wi > ri + self.size {
                        // Skip ahead to the oldest item still in the buffer and retry.
                        self.ri.store(wi - self.size, Ordering::Relaxed);
                    } else {
                        self.ri.fetch_add(1, Ordering::Relaxed);
                        return Ok(item);
                    }
                }
                // `ri != wi` means the slot at `ri % size` has been written at least once.
                None => unreachable!(),
            }
        }
    }
}

/// Clone trait is used to create another BareSubscriber object, subscribed to the same
/// Publisher the initial object was subscribed to.
impl<T: Send> Clone for BareSubscriber<T> {
    fn clone(&self) -> Self {
        self.sub_cnt.fetch_add(1, Ordering::Relaxed);
        Self {
            buffer: self.buffer.clone(),
            wi: self.wi.clone(),
            ri: AtomicUsize::new(self.ri.load(Ordering::Relaxed)),
            size: self.size,
            sub_cnt: self.sub_cnt.clone(),
            pub_available: self.pub_available.clone(),
        }
    }
}

/// Drop trait is used to unregister the subscriber: the shared subscriber count, which the
/// publisher checks before broadcasting, is decremented by one.
impl<T: Send> Drop for BareSubscriber<T> {
    fn drop(&mut self) {
        let live_subscribers = &self.sub_cnt;
        live_subscribers.fetch_sub(1, Ordering::Relaxed);
    }
}

/// Iterating a subscriber yields the items `try_recv` hands out, without ever blocking.
///
/// Any error from `try_recv` ends the iteration, so `None` is returned both when the
/// publisher is disconnected and when the queue is merely empty at that moment.
impl<T: Send> Iterator for BareSubscriber<T> {
    type Item = Arc<T>;

    fn next(&mut self) -> Option<Self::Item> {
        self.try_recv().ok()
    }
}
/// Helper struct used by sync and async implementations to wake Tasks / Threads
///
/// Created by `alarm` together with the matching `Sleeper`.
#[derive(Debug)]
struct Waker<T> {
    /// Vector of Tasks / Threads to be woken up.
    pub sleepers: Vec<Arc<T>>,
    /// A mpsc Receiver used to receive Tasks / Threads to be registered; `register_receivers`
    /// moves what arrives here into `sleepers`.
    receiver: mpsc::Receiver<Arc<T>>,
}

/// Helper struct used by sync and async implementations to register Tasks / Threads to
/// be woken up.
///
/// Created by `alarm`; `sender` is the sending half of the channel whose receiving half is
/// held by the matching `Waker`.
#[derive(Debug)]
struct Sleeper<T> {
    /// Current Task / Thread to be woken up.
    pub sleeper: Arc<T>,
    /// mpsc Sender used to register Task / Thread.
    pub sender: mpsc::Sender<Arc<T>>,
}

impl<T> Waker<T> {
    /// Register all the Tasks / Threads sent for registration.
    ///
    /// Drains every entry currently waiting on the registration channel into `sleepers`.
    /// Never blocks: it stops as soon as the channel has nothing more to hand out, whether it
    /// is merely empty or all senders are gone.
    pub fn register_receivers(&mut self) {
        // `try_iter` keeps yielding until the channel is drained. Iterating the `Result` of
        // a single `try_recv` call would register at most one entry per call.
        self.sleepers.extend(self.receiver.try_iter());
    }
}

/// Builds a connected (`Waker`, `Sleeper`) pair around `current`.
///
/// `current` is wrapped in a single `Arc` that ends up both as the first entry of the waker's
/// `sleepers` list and as the sleeper's own `sleeper` handle. The two halves are linked by a
/// fresh mpsc channel: the sleeper holds the sender, the waker the receiver.
fn alarm<T>(current: T) -> (Waker<T>, Sleeper<T>) {
    let (sender, receiver) = mpsc::channel();
    let shared = Arc::new(current);

    let waker = Waker {
        sleepers: vec![Arc::clone(&shared)],
        receiver,
    };
    let sleeper = Sleeper {
        sleeper: shared,
        sender,
    };

    (waker, sleeper)
}

pub mod sync;
#[cfg(feature = "async")]
extern crate futures;
#[cfg(feature = "async")]
pub mod async;