//! This crate provides a fast mpmc broadcast queue.
//! It's based on the queue design from the LMAX Disruptor, with a few improvements:
//!
//!   * It acts as a futures stream/sink, so you can set up high-performance pipelines
//!
//!   * It can dynamically add/remove senders, and each stream can have multiple receivers
//!
//!   * It has fast runtime fallbacks for when there's a single consumer and/or a single producer
//!
//!   * It works on 32-bit systems without any performance or capability penalty
//!
//!   * In most cases, one can view data written directly into the queue without copying it
//!
//! In many cases, ```MultiQueue``` will be a good replacement for channels, and its broadcast
//! capabilities can replace more complex concurrency systems with a single queue.
//!
//! # Queue Model:
//! From a high-level view, ```MultiQueue``` functions similarly to the LMAX Disruptor.
//! There's an incoming FIFO data stream that is broadcast to a set of subscribers
//! as if there were multiple streams being written to.
//! There are two main differences:
//!
//!   * ```MultiQueue``` transparently supports switching between single and multiple producers.
//!
//!   * Each broadcast stream can be shared among multiple consumers.
//!
//! The second point makes the model a bit confusing, since there's a difference between a
//! stream of data and something consuming that stream. To make things worse, a consumer
//! may not actually see every value on its stream. Instead, multiple consumers may act on
//! a single stream, each getting unique access to certain elements.
//!
//! A helpful mental model is to think of each stream as an mpmc queue that is being
//! pushed to, with the ```MultiQueue``` structure assembling a bunch of them together
//! behind the scenes. This isn't how it works, of course, but it's helpful for thinking.
//!
//! The diagram below represents a general use case of the queue where each consumer has unique
//! access to a stream. Each # stands for a producer and each @ stands for the consumer of
//! a stream, each with a label. The lines show the data flow through the queue.
//!
//! ```text
//!. -> #        @-1
//!.     \      /
//!.      -> -> -> @-2
//!.     /      \
//!. -> #        @-3
//! ```
//!
//! This is a pretty standard broadcast queue setup:
//! each element sent in is seen on every stream by that stream's consumer.
//!
//! However, in ```MultiQueue```, each logical consumer might actually be demultiplexed
//! across many actual consumers, like below.
//!
//! ```text
//!. -> #        @-1
//!.     \      /
//!.      -> -> -> @-2' (really @+@+@ each compete for a spot)
//!.     /      \
//!. -> #        @-3
//! ```
//!
//! Here is the same diagram redrawn with each of the producers sending in a
//! sequenced element (time goes left to right):
//!
//! ```text
//!. t=1|t=2|    t=3    | t=4|
//!. 1 -> #              @-1 (1, 2)
//!.       \            /
//!.        -> 2 -> 1 -> -> @-2' (really @ (1) + @ (2) + @ (nothing yet))
//!.       /            \
//!. 2 -> #              @-3 (1, 2)
//! ```
//!
//! If one imagines this as a webserver, the streams for @-1 and @-3 might be doing
//! auxiliary work such as logging or metrics gathering and can handle
//! the workload completely on one core, while @-2 is doing expensive work handling requests
//! and is split into multiple workers dealing with the data stream.
//!
//! # MPMC Mode:
//! One might notice that the broadcast queue mode requires that a type be Clone,
//! and the single-reader in-place variants require that a type be Sync as well.
//! This is only required for broadcast queues and not for normal mpmc queues,
//! so there's an mpmc api as well. It doesn't require that a type be Clone or Sync
//! for any api, and it moves items directly out of the queue instead of cloning them.
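//!
//! As a rough sketch of that difference, a value that is not Clone can be moved through
//! an mpmc queue and back out. The `Job` type and the `roundtrip` function are hypothetical
//! names used only for illustration, and the sketch assumes that `try_recv` returns an
//! error when nothing is left in the queue.
//!
//! ```rust
//! extern crate multiqueue2 as multiqueue;
//!
//! // Hypothetical payload type: deliberately not Clone.
//! struct Job {
//!     id: u32,
//!     payload: Vec<u8>,
//! }
//!
//! // Sends one Job through an mpmc queue and moves it back out.
//! fn roundtrip() -> Job {
//!     let (send, recv) = multiqueue::mpmc_queue(4);
//!     let job = Job { id: 7, payload: vec![1, 2, 3] };
//!     // The queue has room for 4 items, so a single send should succeed.
//!     assert!(send.try_send(job).is_ok());
//!     // The item is moved out of the queue, not cloned.
//!     let received = recv.recv().unwrap();
//!     // Assumption: an empty queue makes try_recv return an error.
//!     assert!(recv.try_recv().is_err());
//!     received
//! }
//! ```
//!
//! The same `Job` type would presumably be rejected by the broadcast api, since each
//! stream there needs its own copy of every element.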
//!
//! # Futures Mode:
//! For both mpmc and broadcast, a futures mode is supported. The data structures are quite
//! similar to the normal ones, except they implement the Futures Sink/Stream traits for
//! senders and receivers. This comes at a bit of a performance cost, which is why the
//! futures types are separate.
//!
//! # Usage:
//! From the receiving side, this behaves quite similarly to a channel receiver.
//! The ```.recv``` function will block until data is available and then return the data.
//!
//! For senders, there is only ```.try_send``` (except for the futures sink, which can park).
//! This is due to performance and api reasons - you should handle backlog instead of just blocking.
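//!
//! One way to handle backlog is sketched below. It assumes that `.try_send` reports
//! failure with the standard library's `std::sync::mpsc::TrySendError`, which hands the
//! rejected value back to the caller. `send_with_backoff` is a hypothetical helper and
//! not part of this crate.
//!
//! ```rust
//! use std::sync::mpsc::TrySendError;
//! use std::thread;
//!
//! // Hypothetical helper: retries a non-blocking send, yielding while the queue is full.
//! // `try_send` is any closure with the shape of a `.try_send` call.
//! // If the receiving side is gone, the unsent value is returned in Err.
//! fn send_with_backoff<T, F>(mut try_send: F, mut value: T) -> Result<(), T>
//! where
//!     F: FnMut(T) -> Result<(), TrySendError<T>>,
//! {
//!     loop {
//!         match try_send(value) {
//!             Ok(()) => return Ok(()),
//!             // Queue is full: take the value back and let other threads run.
//!             Err(TrySendError::Full(v)) => {
//!                 value = v;
//!                 thread::yield_now();
//!             }
//!             // Every receiver is gone: retrying cannot succeed.
//!             Err(TrySendError::Disconnected(v)) => return Err(v),
//!         }
//!     }
//! }
//! ```
//!
//! Under that assumption, a call would look like `send_with_backoff(|v| send.try_send(v), item)`.
//! Yielding is only the simplest policy; depending on the workload, sleeping, dropping the
//! item, or pushing back on the upstream producer may be a better fit.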
//!
//! # Example: SPSC channel
//!
//! ```
//! extern crate multiqueue2 as multiqueue;
//!
//! use std::thread;
//!
//! let (send, recv) = multiqueue::mpmc_queue(10);
//!
//! let handle = thread::spawn(move || {
//!     for val in recv {
//!         println!("Got {}", val);
//!     }
//! });
//!
//! for i in 0..10 {
//!     send.try_send(i).unwrap();
//! }
//!
//! // Drop the sender to close the queue
//! drop(send);
//!
//! // Wait for the receiver thread to drain the queue and exit
//! handle.join().unwrap();
//!
//! // prints
//! // Got 0
//! // Got 1
//! // Got 2
//! // etc
//! ```
//!
//! # Example: SPSC broadcasting
//!
//! ```
//! extern crate multiqueue2 as multiqueue;
//!
//! use std::thread;
//!
//! let (send, recv) = multiqueue::broadcast_queue(4);
//! let mut handles = vec![];
//! for i in 0..2 { // or n
//!     let cur_recv = recv.add_stream();
//!     handles.push(thread::spawn(move || {
//!         for val in cur_recv {
//!             println!("Stream {} got {}", i, val);
//!         }
//!     }));
//! }
//!
//! // Note that the original reader is unsubscribed - this removes it from
//! // the queue, meaning that the readers in the new threads
//! // won't get starved by the lack of progress from recv
//! recv.unsubscribe();
//!
//! for i in 0..10 {
//!     // Don't do this busy loop in real stuff unless you're really sure
//!     loop {
//!         if send.try_send(i).is_ok() {
//!             break;
//!         }
//!     }
//! }
//!
//! // Drop the sender to close the queue
//! drop(send);
//!
//! for t in handles {
//!     t.join().unwrap();
//! }
//!
//! // prints along the lines of
//! // Stream 0 got 0
//! // Stream 0 got 1
//! // Stream 1 got 0
//! // Stream 0 got 2
//! // Stream 1 got 1
//! // etc
//! ```
//!
//! # Example: SPMC broadcast
//!
//! ```
//! extern crate multiqueue2 as multiqueue;
//!
//! use std::thread;
//!
//! let (send, recv) = multiqueue::broadcast_queue(4);
//!
//! let mut handles = vec![];
//!
//! for i in 0..2 { // or n
//!     let cur_recv = recv.add_stream();
//!     for j in 0..2 {
//!         // Clones share one stream, so each element goes to only one of them
//!         let stream_consumer = cur_recv.clone();
//!         handles.push(thread::spawn(move || {
//!             for val in stream_consumer {
//!                 println!("Stream {} consumer {} got {}", i, j, val);
//!             }
//!         }));
//!     }
//!     // cur_recv is dropped here
//! }
//!
//! // Note that the original reader is unsubscribed - this removes it from
//! // the queue, meaning that the readers in the new threads
//! // won't get starved by the lack of progress from recv
//! recv.unsubscribe();
//!
//! for i in 0..10 {
//!     // Don't do this busy loop in real stuff unless you're really sure
//!     loop {
//!         if send.try_send(i).is_ok() {
//!             break;
//!         }
//!     }
//! }
//!
//! // Drop the sender to close the queue
//! drop(send);
//!
//! for t in handles {
//!     t.join().unwrap();
//! }
//!
//! // prints along the lines of
//! // Stream 0 consumer 1 got 2
//! // Stream 0 consumer 0 got 0
//! // Stream 1 consumer 0 got 0
//! // Stream 0 consumer 1 got 1
//! // Stream 1 consumer 1 got 1
//! // Stream 1 consumer 0 got 2
//! // etc
//! ```
//!
//! # Example: Usage menagerie
//!
//! ```
//! extern crate multiqueue2 as multiqueue;
//!
//! use std::thread;
//!
//! let (send, recv) = multiqueue::broadcast_queue(4);
//! let mut handles = vec![];
//!
//! // start like before
//! for i in 0..2 { // or n
//!     let cur_recv = recv.add_stream();
//!     for j in 0..2 {
//!         let stream_consumer = cur_recv.clone();
//!         handles.push(thread::spawn(move ||
//!             for val in stream_consumer {
//!                 println!("Stream {} consumer {} got {}", i, j, val);
//!             }
//!         ));
//!     }
//!     // cur_recv is dropped here
//! }
//!
//! // On this stream, since there's only one consumer,
//! // the receiver can be made into a UniReceiver
//! // which can view items inline in the queue
//! let single_recv = recv.add_stream().into_single().unwrap();
//!
//! handles.push(thread::spawn(move ||
//!     for val in single_recv.iter_with(|item_ref| 10 * *item_ref) {
//!         println!("{}", val);
//!     }
//! ));
//!
//! // Same as above, except this time we just want to iterate until the receiver is empty
//! let single_recv_2 = recv.add_stream().into_single().unwrap();
//!
//! handles.push(thread::spawn(move ||
//!     for val in single_recv_2.try_iter_with(|item_ref| 10 * *item_ref) {
//!         println!("{}", val);
//!     }
//! ));
//!
//! // Note that the original reader is unsubscribed - this removes it from
//! // the queue, meaning that the readers in the new threads
//! // won't get starved by the lack of progress from recv
//! recv.unsubscribe();
//!
//! // Many senders to give all the receivers something
//! for _ in 0..3 {
//!     let cur_send = send.clone();
//!     handles.push(thread::spawn(move ||
//!         for i in 0..10 {
//!             // Don't do this busy loop in real stuff unless you're really sure
//!             loop {
//!                 if cur_send.try_send(i).is_ok() {
//!                     break;
//!                 }
//!             }
//!         }
//!     ));
//! }
//!
//! // Drop the original sender; the queue closes once the clones are dropped too
//! drop(send);
//!
//! for t in handles {
//!     t.join().unwrap();
//! }
//! ```

#![cfg_attr(clippy, allow(clippy::inline_always))]

mod alloc;
mod atomicsignal;
mod broadcast;
mod consume;
mod countedindex;
mod maybe_acquire;
mod memory;
mod mpmc;
mod multiqueue;
mod read_cursor;
pub mod wait;

pub use crate::broadcast::{
    BroadcastFutReceiver, BroadcastFutSender, BroadcastFutUniReceiver, BroadcastReceiver,
    BroadcastSender, BroadcastUniReceiver, broadcast_fut_queue, broadcast_fut_queue_with,
    broadcast_queue, broadcast_queue_with,
};

pub use crate::mpmc::{
    MPMCFutReceiver, MPMCFutSender, MPMCFutUniReceiver, MPMCReceiver, MPMCSender, MPMCUniReceiver,
    mpmc_fut_queue, mpmc_queue, mpmc_queue_with,
};