async_weighted_semaphore/lib.rs
1//! An async weighted semaphore: a synchronization primitive for limiting concurrent usage of a
2//! resource or signaling availability of a resource to a consumer.
3//!
4//! A [`Semaphore`] starts with an initial counter of permits. Calling [release](#method.release) will increase the
5//! counter. Calling [acquire](#method.acquire) will attempt to decrease the counter, waiting if the counter
6//! would be negative.
7//!
8//! # Examples
9//! A semaphore can limit memory usage of concurrent futures:
10//! ```
11//! # use async_weighted_semaphore::Semaphore;
12//! # use std::{io};
13//! # use async_std::fs;
14//! struct ChecksumPool(Semaphore);
15//! impl ChecksumPool{
16//! async fn checksum(&self, path: &str) -> io::Result<u64> {
17//! let len = fs::metadata(path).await?.len();
18//! // Acquire enough permits to create a buffer
19//! let _guard = self.0.acquire(len as usize).await.unwrap();
20//! // Create a buffer
21//! let contents = fs::read(path).await?;
22//! Ok(contents.into_iter().map(|x| x as u64).sum::<u64>())
23//! // End of scope: buffer is dropped and then _guard is dropped, releasing the permits.
24//! }
25//! }
26//! ```
27//! A semaphore can limit memory usage of a producer-consumer queue:
28//! ```
29//! # use async_weighted_semaphore::{Semaphore, SemaphoreGuardArc};
30//! # use std::sync::Arc;
31//! # use futures::executor::block_on;
32//! # use std::mem;
33//! # use futures::join;
34//! use async_channel::{Sender, Receiver, unbounded, SendError};
35//! # block_on(async {
36//! let (sender, receiver) = unbounded();
37//! let sender = async move {
38//! // The total size of strings in queue and being parsed will not exceed 10.
39//! let capacity = 10;
40//! let semaphore = Arc::new(Semaphore::new(capacity));
41//! for i in 0..100 {
42//! let data = format!("{}", i);
43//! // Don't deadlock if data.len() exceeds capacity.
//! let permits = data.len().min(capacity);
45//! let guard = semaphore.acquire_arc(permits).await.unwrap();
46//! if let Err(SendError(_)) = sender.send((guard, data)).await {
47//! break;
48//! }
49//! }
50//! };
51//! let receiver = async {
52//! for i in 0..100 {
53//! if let Ok((guard, data)) = receiver.recv().await{
54//! assert_eq!(Ok(i), data.parse());
55//! mem::drop(data);
56//! // Drop guard after data to ensure data being parsed counts against the capacity.
57//! mem::drop(guard);
58//! }
59//! }
60//! };
61//! join!(receiver, sender);
62//! # });
63//! ```
64//! A semaphore can signal the availability of data for batch processing:
65//! ```
66//! # use std::collections::VecDeque;
67//! # use async_weighted_semaphore::Semaphore;
68//! # use std::sync::Arc;
69//! # use futures::executor::block_on;
70//! # use async_std::sync::Mutex;
71//! # use futures::join;
72//! # block_on(async {
73//! let buffer1 = Arc::new((Semaphore::new(0), Mutex::new(VecDeque::<u8>::new())));
74//! let buffer2 = buffer1.clone();
75//! let sender = async move {
//! for _ in 0..100 {
77//! buffer1.1.lock().await.extend(b"AAA");
78//! buffer1.0.release(3);
79//! }
80//! // Indicate no more data will arrive.
81//! buffer1.0.poison();
82//! };
83//! let receiver = async {
//! for _ in 0..100 {
85//! if let Ok(guard) = buffer2.0.acquire(2).await {
86//! guard.forget();
87//! }
88//! let batch = buffer2.1.lock().await.drain(0..2).collect::<Vec<_>>();
89//! assert!(batch == b"" || batch == b"A" || batch == b"AA");
90//! if batch.len() < 2 {
91//! break;
92//! }
93//! }
94//! };
95//! join!(receiver, sender);
96//! # });
97//! ```
98//! # Priority
99//! Acquiring has "first-in-first-out" semantics: calls to `acquire` finish in the same order that
100//! they start. If there is a pending call to `acquire`, a new call to `acquire` will always block,
101//! even if there are enough permits available for the new call. This policy reduces starvation and
102//! tail latency at the cost of utilization.
103//! ```
104//! # use async_weighted_semaphore::Semaphore;
105//! # use futures::executor::block_on;
106//! # block_on(async{
107//! # use futures::pin_mut;
108//! # use futures::poll;
109//! let sem = Semaphore::new(1);
110//! let a = sem.acquire(2);
111//! let b = sem.acquire(1);
112//! pin_mut!(a);
113//! pin_mut!(b);
114//! assert!(poll!(&mut a).is_pending());
115//! assert!(poll!(&mut b).is_pending());
116//! # });
117//! ```
118//!
119//! # Poisoning
120//! If a guard is dropped while panicking, or the number of available permits exceeds [`Semaphore::MAX_AVAILABLE`],
121//! the semaphore will be permanently poisoned. All current and future acquires will fail,
122//! and release will become a no-op. This is similar in principle to poisoning a [`std::sync::Mutex`].
123//! Explicitly poisoning with [`Semaphore::poison`] can also be useful to coordinate termination
124//! (e.g. closing a producer-consumer channel).
125//!
126//! # Performance
127//! [`Semaphore`] uses no heap allocations. Most calls are lock-free. The only operation that may
//! wait for a lock is cancellation: if an [`AcquireFuture`] or [`AcquireFutureArc`] is dropped
129//! before [`Future::poll`] returns [`Poll::Ready`], the drop may synchronously wait for a lock.
130
131#![doc(html_root_url = "https://docs.rs/async-weighted-semaphore/0.2.1")]
132
133#[cfg(test)]
134#[macro_use]
135extern crate lazy_static;
136
137pub use crate::errors::{TryAcquireError, PoisonError};
138pub use crate::guard::SemaphoreGuard;
139pub use crate::guard::SemaphoreGuardArc;
140pub use crate::acquire::{AcquireFuture, AcquireFutureArc};
141pub use semaphore::Semaphore;
142#[allow(unused_imports)] // used by docs
143use std::future::Future;
144#[allow(unused_imports)] // used by docs
145use std::task::Poll;
146
147mod atomic;
148mod waker;
149mod guard;
150mod state;
151mod errors;
152mod acquire;
153mod release;
154#[cfg(test)]
155mod tests;
156mod semaphore;
157
// Release hygiene: verifies that the crate version referenced in README.md's
// dependency snippet matches the version in Cargo.toml, so published docs and
// the README never drift apart. Checked by the third-party `version_sync` crate.
#[test]
fn test_readme_deps() {
    version_sync::assert_markdown_deps_updated!("README.md");
}
162
// Release hygiene: verifies that the `#![doc(html_root_url = ...)]` attribute at
// the top of this file carries the same version as Cargo.toml, so intra-crate
// links in downstream rustdoc builds point at the matching docs.rs release.
#[test]
fn test_html_root_url() {
    version_sync::assert_html_root_url_updated!("src/lib.rs");
}