async_weighted_semaphore/
guard.rs

1use std::{mem, fmt};
2use crate::Semaphore;
3use std::sync::Arc;
4use std::thread::panicking;
5use std::fmt::{Debug, Formatter};
6
7/// A guard returned by [`Semaphore::acquire`] that will call [`Semaphore::release`] when it
8/// is dropped (falls out of scope).
9/// # Examples
10/// ```
11/// # use futures::executor::block_on;
12/// use async_weighted_semaphore::{Semaphore, SemaphoreGuard};
13/// # block_on(async{
14/// let semaphore = Semaphore::new(1);
15/// let guard: SemaphoreGuard = semaphore.acquire(1).await.unwrap();
16/// # })
17/// ```
18#[must_use]
19pub struct SemaphoreGuard<'a> {
20    semaphore: &'a Semaphore,
21    amount: usize,
22    panicking: bool,
23}
24
25/// A guard returned by [`Semaphore::acquire_arc`] that will call [`Semaphore::release`] when it
26/// is dropped (falls out of scope). Can be sent between threads.
27#[must_use]
28pub struct SemaphoreGuardArc {
29    semaphore: Option<Arc<Semaphore>>,
30    amount: usize,
31    panicking: bool,
32}
33
/// Error returned by [`SemaphoreGuard::split`] and [`SemaphoreGuardArc::split`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SemaphoreGuardSplitErr {
    /// More permits were requested than the guard currently holds.
    Underflow,
}
38
39impl<'a> SemaphoreGuard<'a> {
40    pub fn new(semaphore: &'a Semaphore, amount: usize) -> Self {
41        SemaphoreGuard { semaphore, amount, panicking: panicking() }
42    }
43
44    /// Combine two `SemaphoreGuard`s into one, with the sum of the originals' permits.
45    ///
46    /// # Examples
47    /// ```
48    /// # use async_weighted_semaphore::Semaphore;
49    /// # tokio_test::block_on(async {
50    /// let semaphore = Semaphore::new(15);
51    /// let mut g1 = semaphore.acquire(10).await.unwrap();
52    /// let g2 = semaphore.acquire(5).await.unwrap();
53    /// g1.extend(g2);
54    /// # })
55    /// ```
56    pub fn extend(&mut self, other: SemaphoreGuard<'a>) {
57        assert!(std::ptr::eq(self.semaphore, other.semaphore),
58            "Can't extend a guard with a guard from a different Semaphore");
59        self.amount = self.amount.saturating_add(other.forget());
60    }
61
62    /// Drop the guard without calling [`Semaphore::release`]. This is useful when `release`s don't
63    /// correspond one-to-one with `acquires` or it's difficult to send the guard to the releaser.
64    /// # Examples
65    /// ```
66    /// # use async_weighted_semaphore::{Semaphore, PoisonError, SemaphoreGuardArc};
67    /// use async_channel::{Sender, SendError};
68    /// // Limit size of a producer-consumer queue. Receivers may wait for any number of items
69    /// // to be available.
70    /// async fn send<T>(semaphore: &Semaphore,
71    ///                  sender: &Sender<T>,
72    ///                  message: T
73    ///         ) -> Result<(), SendError<T>>{
74    ///     match semaphore.acquire(1).await {
75    ///         // A semaphore can be poisoned to prevent deadlock when a channel closes.
76    ///         Err(PoisonError) => Err(SendError(message)),
77    ///         Ok(guard) => {
78    ///             sender.send(message).await?;
79    ///             guard.forget();
80    ///             Ok(())
81    ///         }
82    ///     }
83    /// }
84    /// ```
85    pub fn forget(self) -> usize {
86        let amount = self.amount;
87        mem::forget(self);
88        amount
89    }
90
91    /// Split this `SemaphoreGuard` into two.
92    ///
93    /// The new guard will have `permits` permits, and this guard's permits will be reduced
94    /// accordingly.
95    ///
96    /// # Examples
97    /// ```
98    /// # use async_weighted_semaphore::Semaphore;
99    /// # tokio_test::block_on(async {
100    /// let semaphore = Semaphore::new(15);
101    /// let mut g1 = semaphore.acquire(15).await.unwrap();
102    /// let g2 = g1.split(5).unwrap();
103    /// # })
104    /// ```
105    pub fn split(&mut self, permits: usize) -> Result<SemaphoreGuard<'a>, SemaphoreGuardSplitErr> {
106        if self.amount >= permits {
107            self.amount -= permits;
108            Ok(SemaphoreGuard {
109                semaphore: self.semaphore.clone(),
110                amount: permits,
111                panicking: self.panicking
112            })
113        } else {
114            Err(SemaphoreGuardSplitErr::Underflow)
115        }
116    }
117}
118
119impl SemaphoreGuardArc {
120    pub fn new(semaphore: Arc<Semaphore>, amount: usize) -> Self {
121        SemaphoreGuardArc { semaphore: Some(semaphore), amount, panicking: panicking() }
122    }
123
124    /// Combine two `SemaphoreGuardArc`s into one, with the sum of the originals' permits.
125    ///
126    /// # Examples
127    /// ```
128    /// # use async_weighted_semaphore::Semaphore;
129    /// # tokio_test::block_on(async {
130    /// let semaphore = Semaphore::new(15);
131    /// let mut g1 = semaphore.acquire(10).await.unwrap();
132    /// let g2 = semaphore.acquire(5).await.unwrap();
133    /// g1.extend(g2);
134    /// # })
135    /// ```
136    pub fn extend(&mut self, other: SemaphoreGuardArc) {
137        let sem1 = self.semaphore.as_ref().unwrap();
138        let sem2 = other.semaphore.as_ref().unwrap();
139        assert!(Arc::ptr_eq(sem1, sem2),
140            "Can't extend a guard with a guard from a different Semaphore");
141        self.amount = self.amount.saturating_add(other.forget());
142    }
143
144    /// Drop the guard without calling [`Semaphore::release`]. This is useful when `release`s don't
145    /// correspond one-to-one with `acquires` or it's difficult to send the guard to the releaser.
146    /// # Examples
147    /// ```
148    /// # use async_weighted_semaphore::{Semaphore, PoisonError, SemaphoreGuardArc};
149    /// # use std::sync::Arc;
150    /// use async_channel::{Sender, SendError};
151    /// // Limit size of a producer-consumer queue. Receivers may wait for any number of items
152    /// // to be available.
153    /// async fn send<T>(semaphore: &Arc<Semaphore>,
154    ///                  sender: &Sender<T>,
155    ///                  message: T
156    ///         ) -> Result<(), SendError<T>>{
157    ///     match semaphore.acquire_arc(1).await {
158    ///         // A semaphore can be poisoned to prevent deadlock when a channel closes.
159    ///         Err(PoisonError) => Err(SendError(message)),
160    ///         Ok(guard) => {
161    ///             sender.send(message).await?;
162    ///             guard.forget();
163    ///             Ok(())
164    ///         }
165    ///     }
166    /// }
167    /// ```
168    pub fn forget(mut self) -> usize {
169        self.semaphore = None;
170        let amount = self.amount;
171        mem::forget(self);
172        amount
173    }
174
175    /// Split this `SemaphoreGuardArc` into two.
176    ///
177    /// The new guard will have `permits` permits, and this guard's permits will be reduced
178    /// accordingly.
179    ///
180    /// # Examples
181    /// ```
182    /// # use async_weighted_semaphore::Semaphore;
183    /// # use std::sync::Arc;
184    /// # tokio_test::block_on(async {
185    /// let semaphore = Arc::new(Semaphore::new(15));
186    /// let mut g1 = semaphore.acquire_arc(15).await.unwrap();
187    /// let g2 = g1.split(5).unwrap();
188    /// # })
189    /// ```
190    pub fn split(&mut self, permits: usize) -> Result<SemaphoreGuardArc, SemaphoreGuardSplitErr> {
191        if self.amount >= permits {
192            self.amount -= permits;
193            Ok(SemaphoreGuardArc {
194                semaphore: self.semaphore.clone(),
195                amount: permits,
196                panicking: self.panicking
197            })
198        } else {
199            Err(SemaphoreGuardSplitErr::Underflow)
200        }
201    }
202}
203
204
205impl<'a> Drop for SemaphoreGuard<'a> {
206    fn drop(&mut self) {
207        if !self.panicking && panicking() {
208            self.semaphore.poison();
209        } else {
210            self.semaphore.release(self.amount);
211        }
212    }
213}
214
215
216impl Drop for SemaphoreGuardArc {
217    fn drop(&mut self) {
218        if let Some(semaphore) = self.semaphore.take() {
219            if !self.panicking && panicking() {
220                semaphore.poison();
221            } else {
222                semaphore.release(self.amount);
223            }
224        }
225    }
226}
227
228impl<'a> Debug for SemaphoreGuard<'a> {
229    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
230        write!(f, "SemaphoreGuard({})", self.amount)
231    }
232}
233
234impl Debug for SemaphoreGuardArc {
235    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
236        write!(f, "SemaphoreGuardArc({})", self.amount)
237    }
238}
239