async_weighted_semaphore/
lib.rs

1//! An async weighted semaphore: a synchronization primitive for limiting concurrent usage of a
2//! resource or signaling availability of a resource to a consumer.
3//!
4//! A [`Semaphore`] starts with an initial counter of permits. Calling [release](#method.release) will increase the
5//! counter. Calling [acquire](#method.acquire) will attempt to decrease the counter, waiting if the counter
6//! would be negative.
7//!
8//! # Examples
9//! A semaphore can limit memory usage of concurrent futures:
10//! ```
11//! # use async_weighted_semaphore::Semaphore;
12//! # use std::{io};
13//! # use async_std::fs;
14//! struct ChecksumPool(Semaphore);
15//! impl ChecksumPool{
16//!     async fn checksum(&self, path: &str) -> io::Result<u64> {
17//!         let len = fs::metadata(path).await?.len();
18//!         // Acquire enough permits to create a buffer
19//!         let _guard = self.0.acquire(len as usize).await.unwrap();
20//!         // Create a buffer
21//!         let contents = fs::read(path).await?;
22//!         Ok(contents.into_iter().map(|x| x as u64).sum::<u64>())
23//!         // End of scope: buffer is dropped and then _guard is dropped, releasing the permits.
24//!     }
25//! }
26//! ```
27//! A semaphore can limit memory usage of a producer-consumer queue:
28//! ```
29//! # use async_weighted_semaphore::{Semaphore, SemaphoreGuardArc};
30//! # use std::sync::Arc;
31//! # use futures::executor::block_on;
32//! # use std::mem;
33//! # use futures::join;
34//! use async_channel::{Sender, Receiver, unbounded, SendError};
35//! # block_on(async {
36//! let (sender, receiver) = unbounded();
37//! let sender = async move {
38//!     // The total size of strings in queue and being parsed will not exceed 10.
39//!     let capacity = 10;
40//!     let semaphore = Arc::new(Semaphore::new(capacity));
41//!     for i in 0..100 {
42//!         let data = format!("{}", i);
43//!         // Don't deadlock if data.len() exceeds capacity.
44//!         let permits = data.len().max(capacity);
45//!         let guard = semaphore.acquire_arc(permits).await.unwrap();
46//!         if let Err(SendError(_)) = sender.send((guard, data)).await {
47//!             break;
48//!         }
49//!     }
50//! };
51//! let receiver = async {
52//!     for i in 0..100 {
53//!         if let Ok((guard, data)) = receiver.recv().await{
54//!             assert_eq!(Ok(i), data.parse());
55//!             mem::drop(data);
56//!             // Drop guard after data to ensure data being parsed counts against the capacity.
57//!             mem::drop(guard);
58//!         }
59//!     }
60//! };
61//! join!(receiver, sender);
62//! # });
63//! ```
64//! A semaphore can signal the availability of data for batch processing:
65//! ```
66//! # use std::collections::VecDeque;
67//! # use async_weighted_semaphore::Semaphore;
68//! # use std::sync::Arc;
69//! # use futures::executor::block_on;
70//! # use async_std::sync::Mutex;
71//! # use futures::join;
72//! # block_on(async {
73//! let buffer1 = Arc::new((Semaphore::new(0), Mutex::new(VecDeque::<u8>::new())));
74//! let buffer2 = buffer1.clone();
75//! let sender = async move {
76//!     for i in 0..100 {
77//!         buffer1.1.lock().await.extend(b"AAA");
78//!         buffer1.0.release(3);
79//!     }
80//!     // Indicate no more data will arrive.
81//!     buffer1.0.poison();
82//! };
83//! let receiver = async {
84//!     for i in 0..100 {
85//!         if let Ok(guard) = buffer2.0.acquire(2).await {
86//!             guard.forget();
87//!         }
88//!         let batch = buffer2.1.lock().await.drain(0..2).collect::<Vec<_>>();
89//!         assert!(batch == b"" || batch == b"A" || batch == b"AA");
90//!         if batch.len() < 2 {
91//!             break;
92//!         }
93//!     }
94//! };
95//! join!(receiver, sender);
96//! # });
97//! ```
98//! # Priority
99//! Acquiring has "first-in-first-out" semantics: calls to `acquire` finish in the same order that
100//! they start. If there is a pending call to `acquire`, a new call to `acquire` will always block,
101//! even if there are enough permits available for the new call. This policy reduces starvation and
102//! tail latency at the cost of utilization.
103//! ```
104//! # use async_weighted_semaphore::Semaphore;
105//! # use futures::executor::block_on;
106//! # block_on(async{
107//! # use futures::pin_mut;
108//! # use futures::poll;
109//! let sem = Semaphore::new(1);
110//! let a = sem.acquire(2);
111//! let b = sem.acquire(1);
112//! pin_mut!(a);
113//! pin_mut!(b);
114//! assert!(poll!(&mut a).is_pending());
115//! assert!(poll!(&mut b).is_pending());
116//! # });
117//! ```
118//!
119//! # Poisoning
120//! If a guard is dropped while panicking, or the number of available permits exceeds [`Semaphore::MAX_AVAILABLE`],
121//! the semaphore will be permanently poisoned. All current and future acquires will fail,
122//! and release will become a no-op. This is similar in principle to poisoning a [`std::sync::Mutex`].
123//! Explicitly poisoning with [`Semaphore::poison`] can also be useful to coordinate termination
124//! (e.g. closing a producer-consumer channel).
125//!
126//! # Performance
127//! [`Semaphore`] uses no heap allocations. Most calls are lock-free. The only operation that may
128//! wait for a lock is cancellation: if a [`AcquireFuture`] or [`AcquireFutureArc`] is dropped
129//! before [`Future::poll`] returns [`Poll::Ready`], the drop may synchronously wait for a lock.
130
131#![doc(html_root_url = "https://docs.rs/async-weighted-semaphore/0.2.1")]
132
133#[cfg(test)]
134#[macro_use]
135extern crate lazy_static;
136
137pub use crate::errors::{TryAcquireError, PoisonError};
138pub use crate::guard::SemaphoreGuard;
139pub use crate::guard::SemaphoreGuardArc;
140pub use crate::acquire::{AcquireFuture, AcquireFutureArc};
141pub use semaphore::Semaphore;
142#[allow(unused_imports)] // used by docs
143use std::future::Future;
144#[allow(unused_imports)] // used by docs
145use std::task::Poll;
146
147mod atomic;
148mod waker;
149mod guard;
150mod state;
151mod errors;
152mod acquire;
153mod release;
154#[cfg(test)]
155mod tests;
156mod semaphore;
157
158#[test]
159fn test_readme_deps() {
160    version_sync::assert_markdown_deps_updated!("README.md");
161}
162
163#[test]
164fn test_html_root_url() {
165    version_sync::assert_html_root_url_updated!("src/lib.rs");
166}