deadqueue/lib.rs
1//! # Deadqueue [](https://crates.io/crates/deadqueue) [](https://travis-ci.org/deadpool-rs/deadqueue)
2//!
3//! Deadqueue is a dead simple async queue with back pressure support.
4//!
5//! This crate provides three implementations:
6//!
7//! - Unlimited (`deadqueue::unlimited::Queue`)
8//! - Based on `crossbeam_queue::SegQueue`
9//! - Has unlimitied capacity and no back pressure on push
10//! - Enabled via the `unlimited` feature in your `Cargo.toml`
11//!
12//! - Resizable (`deadqueue::resizable::Queue`)
13//! - Based on `deadqueue::unlimited::Queue`
14//! - Has limited capacity with back pressure on push
15//! - Supports resizing
16//! - Enabled via the `resizable` feature in your `Cargo.toml`
17//!
18//! - Limited (`deadqueue::limited::Queue`)
19//! - Based on `crossbeam_queue::ArrayQueue`
20//! - Has limit capacity with back pressure on push
21//! - Does not support resizing
22//! - Enabled via the `limited` feature in your `Cargo.toml`
23//!
24//! ## Features
25//!
26//! | Feature | Description | Extra dependencies | Default |
27//! | ------- | ----------- | ------------------ | ------- |
28//! | `unlimited` | Enable unlimited queue implementation | – | yes |
29//! | `resizable` | Enable resizable queue implementation | `deadqueue/unlimited` | yes |
30//! | `limited` | Enable limited queue implementation | – | yes |
31//!
32//! ## Example
33//!
34//! ```rust
35//! use std::sync::Arc;
36//! use tokio::time::{sleep, Duration};
37//!
38//! const TASK_COUNT: usize = 1000;
39//! const WORKER_COUNT: usize = 10;
40//!
41//! type TaskQueue = deadqueue::limited::Queue<usize>;
42//!
43//! #[tokio::main]
44//! async fn main() {
45//! let queue = Arc::new(TaskQueue::new(TASK_COUNT));
46//! for i in 0..TASK_COUNT {
47//! queue.try_push(i).unwrap();
48//! }
49//! for worker in 0..WORKER_COUNT {
50//! let queue = queue.clone();
51//! tokio::spawn(async move {
52//! loop {
53//! let task = queue.pop().await;
54//! println!("worker[{}] processing task[{}] ...", worker, task);
55//! }
56//! });
57//! }
58//! println!("Waiting for workers to finish...");
59//! queue.wait_empty().await;
60//! println!("All tasks done. :-)");
61//! }
62//! ```
63//!
64//! ## Reasons for yet another queue
65//!
66//! Deadqueue is by no means the only queue implementation available. It does things a little different and provides features that other implementations are lacking:
67//!
68//! - **Resizable queue.** Usually you have to pick between `limited` and `unlimited` queues. This crate features a `resizable` Queue which can be resized as needed. This is probably a big **unique selling point** of this crate.
69//!
70//! - **Introspection support.** The methods `.len()`, `.capacity()` and `.available()` provide access the current state of the queue.
71//!
72//! - **Fair scheduling.** Tasks calling `pop` will receive items in a first-come-first-serve fashion. This is mainly due to the use of `tokio::sync::Semaphore` which is fair by nature.
73//!
74//! - **One struct, not two.** The channels of `tokio`, `async_std` and `futures-intrusive` split the queue in two structs (`Sender` and `Receiver`) which makes the usage sligthly more complicated.
75//!
76//! - **Bring your own `Arc`.** Since there is no separation between `Sender` and `Receiver` there is also no need for an internal `Arc`. (All implementations that split the channel into a `Sender` and `Receiver` need some kind of `Arc` internally.)
77//!
78//! - **Fully concurrent access.** No need to wrap the `Receiver` part in a `Mutex`. All methods support concurrent accesswithout the need for an additional synchronization primitive.
79//!
80//! - **Support for `try__` methods.** The methods `try_push` and `try_pop` can be used to access the queue from non-blocking synchroneous code.
81//!
82//! - **Support for detecting when the queue becomes empty or full**, using the `wait_empty`, `subscribe_empty`, `wait_full` and `subscribe_full` methods.
83//!
84//! ## Alternatives
85//!
86//! | Crate | Limitations | Documentation |
87//! | --- | --- | --- |
88//! | [`tokio`](https://crates.io/crates/tokio) | No resizable queue. No introspection support. Synchronization of `Receiver` needed. | [`tokio::sync::mpsc::channel`](https://docs.rs/tokio/latest/tokio/sync/mpsc/fn.channel.html), [`tokio::sync::mpsc::unbounded_channel`](https://docs.rs/tokio/latest/tokio/sync/mpsc/fn.unbounded_channel.html) |
89//! | [`async-std`](https://crates.io/crates/async-std) | No resizable or unlimited queue. No introspection support. No `try_send` or `try_recv` methods. | [`async_std::sync::channel`](https://docs.rs/async-std/latest/async_std/sync/fn.channel.html) |
90//! | [`futures`](https://crates.io/crates/futures) | No resizable queue. No introspection support. | [`futures::channel::mpsc::channel`](https://docs.rs/futures/0.3.1/futures/channel/mpsc/fn.channel.html), [`futures::channel::mpsc::unbounded`](https://docs.rs/futures/0.3.1/futures/channel/mpsc/fn.unbounded.html) |
91//!
92//! ## License
93//!
94//! Licensed under either of
95//!
96//! - Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or <http://www.apache.org/licenses/LICENSE-2.0)>
97//! - MIT license ([LICENSE-MIT](LICENSE-MIT) or <http://opensource.org/licenses/MIT)>
98//!
99//! at your option.
100#![warn(missing_docs)]
101
102use tokio::sync::watch;
103
104mod atomic;
105
106#[cfg(feature = "unlimited")]
107pub mod unlimited;
108
109#[cfg(feature = "resizable")]
110pub mod resizable;
111
112#[cfg(feature = "limited")]
113pub mod limited;
114
115/// Private type alias for notify_full and notify_empty
116type Notifier = watch::Sender<()>;
117
118/// Public type alias for subscribe_full and subscribe_empty
119pub type Receiver = watch::Receiver<()>;
120
121/// Initialize the notify_full sender
122fn new_notifier() -> Notifier {
123 let (sender, _) = watch::channel(());
124 sender
125}