// multiqueue2/lib.rs
//! This crate provides a fast mpmc broadcast queue.
//! It's based on the queue design from the LMAX Disruptor, with a few improvements:
//!
//! * It acts as a futures stream/sink, so you can set up high-performance pipelines
//!
//! * It can dynamically add/remove senders, and each stream can have multiple receivers
//!
//! * It has fast runtime fallbacks for when there's a single consumer and/or a single producer
//!
//! * It works on 32-bit systems without any performance or capability penalty
//!
//! * In most cases, one can view data written directly into the queue without copying it
//!
//! In many cases, ```MultiQueue``` will be a good replacement for channels, and its broadcast
//! capabilities can replace more complex concurrency systems with a single queue.
16//!
//! # Queue Model:
//! ```MultiQueue``` functions similarly to the LMAX Disruptor from a high-level view.
//! There's an incoming FIFO data stream that is broadcast to a set of subscribers
//! as if there were multiple streams being written to.
//! There are two main differences:
//!
//! * ```MultiQueue``` transparently supports switching between single and multiple producers.
//!
//! * Each broadcast stream can be shared among multiple consumers.
//!
//! The last part makes the model a bit confusing, since there's a difference between a
//! stream of data and something consuming that stream. To make things worse, each consumer
//! may not actually see every value on the stream. Instead, multiple consumers may act on
//! a single stream, each getting unique access to certain elements.
//!
//! A helpful mental model may be to think of each stream as an mpmc
//! queue being pushed to, with the ```MultiQueue``` structure assembling a bunch of them
//! together behind the scenes. This isn't what actually happens, but it's helpful for thinking.
35//!
//! A diagram that represents a general use case of the queue, where each consumer has unique
//! access to a stream, is below - the # stands in for a producer and @ stands in for the consumer of
//! each stream, each with a label. The lines are meant to show the data flow through the queue.
//!
//! ```text
//! -> #        @-1
//!     \      /
//! -> -> ->     @-2
//!     /      \
//! -> #        @-3
//! ```
47//!
//! This is a pretty standard broadcast queue setup -
//! each element sent in is seen on each stream by that stream's consumer.
//!
//! However, in MultiQueue, each logical consumer might actually be demultiplexed
//! across many actual consumers, like below.
54//!
//! ```text
//! -> #        @-1
//!     \      /
//! -> -> ->     @-2' (really @+@+@ each compete for a spot)
//!     /      \
//! -> #        @-3
//! ```
62//!
//! If this diagram is redrawn with each of the producers sending in a
//! sequenced element (time goes left to right):
//!
//! ```text
//! t=1|t=2|    t=3    |t=4|
//! 1 -> #        @-1 (1, 2)
//!      \      /
//! -> 2 -> 1 -> ->   @-2' (really @ (1) + @ (2) + @ (nothing yet))
//!      /      \
//! 2 -> #        @-3 (1, 2)
//! ```
74//!
//! If one imagines this as a webserver, the streams for @-1 and @-3 might be doing random
//! webservery work like logging or metrics gathering and can handle
//! the workload completely on one core, while @-2 is doing expensive work handling requests
//! and is split into multiple workers dealing with the data stream.
79//!
//! # MPMC Mode:
//! One might notice that the broadcast queue mode requires that a type be Clone,
//! and the single-reader inplace variants require that a type be Sync as well.
//! This is only required for broadcast queues and not normal mpmc queues,
//! so there's an mpmc api as well. It doesn't require that a type be Clone or Sync
//! for any api, and it also moves items directly out of the queue instead of cloning them.
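//!
//! As a minimal sketch (using only the ```mpmc_queue```/```try_send```/```recv``` calls that
//! appear in the examples below), the mpmc api can move a type that is neither Clone nor Sync
//! through the queue:
//!
//! ```
//! extern crate multiqueue2 as multiqueue;
//!
//! // A payload type that is deliberately not Clone
//! struct Token(u64);
//!
//! let (send, recv) = multiqueue::mpmc_queue(4);
//! send.try_send(Token(7)).ok();
//! drop(send); // close the queue
//!
//! // The item is moved out of the queue rather than cloned
//! if let Ok(token) = recv.recv() {
//!     println!("Got token {}", token.0);
//! }
//! ```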
86//!
//! # Futures Mode:
//! For both mpmc and broadcast, a futures mode is supported. The datastructures are quite
//! similar to the normal ones, except they implement the futures Sink/Stream traits for
//! senders and receivers. This comes at a bit of a performance cost, which is why the
//! futures types are separate.
92//!
//! # Usage:
//! From the receiving side, this behaves quite similarly to a channel receiver.
//! The .recv function will block until data is available and then return the data.
//!
//! For senders, there is only ```.try_send``` (except for the futures sink, which can park).
//! This is for performance and api reasons - you should handle backlog instead of just blocking.
99//!
//! # Example: SPSC channel
//!
//! ```
//! extern crate multiqueue2 as multiqueue;
//!
//! use std::thread;
//!
//! let (send, recv) = multiqueue::mpmc_queue(10);
//!
//! let handle = thread::spawn(move || {
//!     for val in recv {
//!         println!("Got {}", val);
//!     }
//! });
//!
//! for i in 0..10 {
//!     send.try_send(i).unwrap();
//! }
//!
//! // Drop the sender to close the queue
//! drop(send);
//!
//! handle.join();
//!
//! // prints
//! // Got 0
//! // Got 1
//! // Got 2
//! // etc
//! ```
130//!
//! # Example: SPSC broadcasting
//!
//! ```
//! extern crate multiqueue2 as multiqueue;
//!
//! use std::thread;
//!
//! let (send, recv) = multiqueue::broadcast_queue(4);
//! let mut handles = vec![];
//! for i in 0..2 { // or n
//!     let cur_recv = recv.add_stream();
//!     handles.push(thread::spawn(move || {
//!         for val in cur_recv {
//!             println!("Stream {} got {}", i, val);
//!         }
//!     }));
//! }
//!
//! // Take notice that I drop the reader - this removes it from
//! // the queue, meaning that the readers in the new threads
//! // won't get starved by the lack of progress from recv
//! recv.unsubscribe();
//!
//! for i in 0..10 {
//!     // Don't do this busy loop in real stuff unless you're really sure
//!     loop {
//!         if send.try_send(i).is_ok() {
//!             break;
//!         }
//!     }
//! }
//!
//! // Drop the sender to close the queue
//! drop(send);
//!
//! for t in handles {
//!     t.join();
//! }
//!
//! // prints along the lines of
//! // Stream 0 got 0
//! // Stream 0 got 1
//! // Stream 1 got 0
//! // Stream 0 got 2
//! // Stream 1 got 1
//! // etc
//! ```
179//!
//! # Example: SPMC broadcast
//!
//! ```
//! extern crate multiqueue2 as multiqueue;
//!
//! use std::thread;
//!
//! let (send, recv) = multiqueue::broadcast_queue(4);
//!
//! let mut handles = vec![];
//!
//! for i in 0..2 { // or n
//!     let cur_recv = recv.add_stream();
//!     for j in 0..2 {
//!         let stream_consumer = cur_recv.clone();
//!         handles.push(thread::spawn(move || {
//!             for val in stream_consumer {
//!                 println!("Stream {} consumer {} got {}", i, j, val);
//!             }
//!         }));
//!     }
//!     // cur_recv is dropped here
//! }
//!
//! // Take notice that I drop the reader - this removes it from
//! // the queue, meaning that the readers in the new threads
//! // won't get starved by the lack of progress from recv
//! recv.unsubscribe();
//!
//! for i in 0..10 {
//!     // Don't do this busy loop in real stuff unless you're really sure
//!     loop {
//!         if send.try_send(i).is_ok() {
//!             break;
//!         }
//!     }
//! }
//! drop(send);
//!
//! for t in handles {
//!     t.join();
//! }
//!
//! // prints along the lines of
//! // Stream 0 consumer 1 got 2
//! // Stream 0 consumer 0 got 0
//! // Stream 1 consumer 0 got 0
//! // Stream 0 consumer 1 got 1
//! // Stream 1 consumer 1 got 1
//! // Stream 1 consumer 0 got 2
//! // etc
//! ```
234//!
//! # Example: Usage menagerie
//!
//! ```
//! extern crate multiqueue2 as multiqueue;
//!
//! use std::thread;
//!
//! let (send, recv) = multiqueue::broadcast_queue(4);
//! let mut handles = vec![];
//!
//! // start like before
//! for i in 0..2 { // or n
//!     let cur_recv = recv.add_stream();
//!     for j in 0..2 {
//!         let stream_consumer = cur_recv.clone();
//!         handles.push(thread::spawn(move ||
//!             for val in stream_consumer {
//!                 println!("Stream {} consumer {} got {}", i, j, val);
//!             }
//!         ));
//!     }
//!     // cur_recv is dropped here
//! }
//!
//! // On this stream, since there's only one consumer,
//! // the receiver can be made into a UniReceiver
//! // which can view items inline in the queue
//! let single_recv = recv.add_stream().into_single().unwrap();
//!
//! handles.push(thread::spawn(move ||
//!     for val in single_recv.iter_with(|item_ref| 10 * *item_ref) {
//!         println!("{}", val);
//!     }
//! ));
//!
//! // Same as above, except this time we just want to iterate until the receiver is empty
//! let single_recv_2 = recv.add_stream().into_single().unwrap();
//!
//! handles.push(thread::spawn(move ||
//!     for val in single_recv_2.try_iter_with(|item_ref| 10 * *item_ref) {
//!         println!("{}", val);
//!     }
//! ));
//!
//! // Take notice that I drop the reader - this removes it from
//! // the queue, meaning that the readers in the new threads
//! // won't get starved by the lack of progress from recv
//! recv.unsubscribe();
//!
//! // Many senders to give all the receivers something
//! for _ in 0..3 {
//!     let cur_send = send.clone();
//!     handles.push(thread::spawn(move ||
//!         for i in 0..10 {
//!             loop {
//!                 if cur_send.try_send(i).is_ok() {
//!                     break;
//!                 }
//!             }
//!         }
//!     ));
//! }
//! drop(send);
//!
//! for t in handles {
//!     t.join();
//! }
//! ```

#![cfg_attr(clippy, allow(clippy::inline_always))]
305
mod alloc;
mod atomicsignal;
mod broadcast;
mod consume;
mod countedindex;
mod maybe_acquire;
mod memory;
mod mpmc;
mod multiqueue;
mod read_cursor;
pub mod wait;
317
pub use crate::broadcast::{
    BroadcastFutReceiver, BroadcastFutSender, BroadcastFutUniReceiver, BroadcastReceiver,
    BroadcastSender, BroadcastUniReceiver, broadcast_fut_queue, broadcast_fut_queue_with,
    broadcast_queue, broadcast_queue_with,
};

pub use crate::mpmc::{
    MPMCFutReceiver, MPMCFutSender, MPMCFutUniReceiver, MPMCReceiver, MPMCSender, MPMCUniReceiver,
    mpmc_fut_queue, mpmc_queue, mpmc_queue_with,
};
327};