async_semaphore/
lib.rs

//! DO NOT USE!
//!
//! This crate was merged into [async-lock], which now provides the API this crate used to offer.
//!
//! [async-lock]: https://crates.io/crates/async-lock
6
7#![forbid(unsafe_code)]
8#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
9
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::sync::Arc;
12
13use event_listener::Event;
14
15/// A counter for limiting the number of concurrent operations.
16#[derive(Debug)]
17pub struct Semaphore {
18    count: AtomicUsize,
19    event: Event,
20}
21
22impl Semaphore {
23    /// Creates a new semaphore with a limit of `n` concurrent operations.
24    ///
25    /// # Examples
26    ///
27    /// ```
28    /// use async_semaphore::Semaphore;
29    ///
30    /// let s = Semaphore::new(5);
31    /// ```
32    pub const fn new(n: usize) -> Semaphore {
33        Semaphore {
34            count: AtomicUsize::new(n),
35            event: Event::new(),
36        }
37    }
38
39    /// Attempts to get a permit for a concurrent operation.
40    ///
41    /// If the permit could not be acquired at this time, then [`None`] is returned. Otherwise, a
42    /// guard is returned that releases the mutex when dropped.
43    ///
44    /// # Examples
45    ///
46    /// ```
47    /// use async_semaphore::Semaphore;
48    ///
49    /// let s = Semaphore::new(2);
50    ///
51    /// let g1 = s.try_acquire().unwrap();
52    /// let g2 = s.try_acquire().unwrap();
53    ///
54    /// assert!(s.try_acquire().is_none());
55    /// drop(g2);
56    /// assert!(s.try_acquire().is_some());
57    /// ```
58    pub fn try_acquire(&self) -> Option<SemaphoreGuard<'_>> {
59        let mut count = self.count.load(Ordering::Acquire);
60        loop {
61            if count == 0 {
62                return None;
63            }
64
65            match self.count.compare_exchange_weak(
66                count,
67                count - 1,
68                Ordering::AcqRel,
69                Ordering::Acquire,
70            ) {
71                Ok(_) => return Some(SemaphoreGuard(self)),
72                Err(c) => count = c,
73            }
74        }
75    }
76
77    /// Waits for a permit for a concurrent operation.
78    ///
79    /// Returns a guard that releases the permit when dropped.
80    ///
81    /// # Examples
82    ///
83    /// ```
84    /// # futures_lite::future::block_on(async {
85    /// use async_semaphore::Semaphore;
86    ///
87    /// let s = Semaphore::new(2);
88    /// let guard = s.acquire().await;
89    /// # });
90    /// ```
91    pub async fn acquire(&self) -> SemaphoreGuard<'_> {
92        let mut listener = None;
93
94        loop {
95            if let Some(guard) = self.try_acquire() {
96                return guard;
97            }
98
99            match listener.take() {
100                None => listener = Some(self.event.listen()),
101                Some(l) => l.await,
102            }
103        }
104    }
105}
106
107impl Semaphore {
108    /// Attempts to get an owned permit for a concurrent operation.
109    ///
110    /// If the permit could not be acquired at this time, then [`None`] is returned. Otherwise, an
111    /// owned guard is returned that releases the mutex when dropped.
112    ///
113    /// # Examples
114    ///
115    /// ```
116    /// use async_semaphore::Semaphore;
117    /// use std::sync::Arc;
118    ///
119    /// let s = Arc::new(Semaphore::new(2));
120    ///
121    /// let g1 = s.try_acquire_arc().unwrap();
122    /// let g2 = s.try_acquire_arc().unwrap();
123    ///
124    /// assert!(s.try_acquire_arc().is_none());
125    /// drop(g2);
126    /// assert!(s.try_acquire_arc().is_some());
127    /// ```
128    pub fn try_acquire_arc(self: &Arc<Self>) -> Option<SemaphoreGuardArc> {
129        let mut count = self.count.load(Ordering::Acquire);
130        loop {
131            if count == 0 {
132                return None;
133            }
134
135            match self.count.compare_exchange_weak(
136                count,
137                count - 1,
138                Ordering::AcqRel,
139                Ordering::Acquire,
140            ) {
141                Ok(_) => return Some(SemaphoreGuardArc(self.clone())),
142                Err(c) => count = c,
143            }
144        }
145    }
146
147    /// Waits for an owned permit for a concurrent operation.
148    ///
149    /// Returns a guard that releases the permit when dropped.
150    ///
151    /// # Examples
152    ///
153    /// ```
154    /// # futures_lite::future::block_on(async {
155    /// use async_semaphore::Semaphore;
156    /// use std::sync::Arc;
157    ///
158    /// let s = Arc::new(Semaphore::new(2));
159    /// let guard = s.acquire_arc().await;
160    /// # });
161    /// ```
162    pub async fn acquire_arc(self: &Arc<Self>) -> SemaphoreGuardArc {
163        let mut listener = None;
164
165        loop {
166            if let Some(guard) = self.try_acquire_arc() {
167                return guard;
168            }
169
170            match listener.take() {
171                None => listener = Some(self.event.listen()),
172                Some(l) => l.await,
173            }
174        }
175    }
176}
177
178/// A guard that releases the acquired permit.
179#[derive(Debug)]
180pub struct SemaphoreGuard<'a>(&'a Semaphore);
181
182impl Drop for SemaphoreGuard<'_> {
183    fn drop(&mut self) {
184        self.0.count.fetch_add(1, Ordering::AcqRel);
185        self.0.event.notify(1);
186    }
187}
188
189/// An owned guard that releases the acquired permit.
190#[derive(Debug)]
191pub struct SemaphoreGuardArc(Arc<Semaphore>);
192
193impl Drop for SemaphoreGuardArc {
194    fn drop(&mut self) {
195        self.0.count.fetch_add(1, Ordering::AcqRel);
196        self.0.event.notify(1);
197    }
198}