async_weighted_semaphore/guard.rs
1use std::{mem, fmt};
2use crate::Semaphore;
3use std::sync::Arc;
4use std::thread::panicking;
5use std::fmt::{Debug, Formatter};
6
7/// A guard returned by [`Semaphore::acquire`] that will call [`Semaphore::release`] when it
8/// is dropped (falls out of scope).
9/// # Examples
10/// ```
11/// # use futures::executor::block_on;
12/// use async_weighted_semaphore::{Semaphore, SemaphoreGuard};
13/// # block_on(async{
14/// let semaphore = Semaphore::new(1);
15/// let guard: SemaphoreGuard = semaphore.acquire(1).await.unwrap();
16/// # })
17/// ```
18#[must_use]
19pub struct SemaphoreGuard<'a> {
20 semaphore: &'a Semaphore,
21 amount: usize,
22 panicking: bool,
23}
24
25/// A guard returned by [`Semaphore::acquire_arc`] that will call [`Semaphore::release`] when it
26/// is dropped (falls out of scope). Can be sent between threads.
27#[must_use]
28pub struct SemaphoreGuardArc {
29 semaphore: Option<Arc<Semaphore>>,
30 amount: usize,
31 panicking: bool,
32}
33
34#[derive(Clone, Copy, Debug, Eq, PartialEq)]
35pub enum SemaphoreGuardSplitErr {
36 Underflow
37}
38
39impl<'a> SemaphoreGuard<'a> {
40 pub fn new(semaphore: &'a Semaphore, amount: usize) -> Self {
41 SemaphoreGuard { semaphore, amount, panicking: panicking() }
42 }
43
44 /// Combine two `SemaphoreGuard`s into one, with the sum of the originals' permits.
45 ///
46 /// # Examples
47 /// ```
48 /// # use async_weighted_semaphore::Semaphore;
49 /// # tokio_test::block_on(async {
50 /// let semaphore = Semaphore::new(15);
51 /// let mut g1 = semaphore.acquire(10).await.unwrap();
52 /// let g2 = semaphore.acquire(5).await.unwrap();
53 /// g1.extend(g2);
54 /// # })
55 /// ```
56 pub fn extend(&mut self, other: SemaphoreGuard<'a>) {
57 assert!(std::ptr::eq(self.semaphore, other.semaphore),
58 "Can't extend a guard with a guard from a different Semaphore");
59 self.amount = self.amount.saturating_add(other.forget());
60 }
61
62 /// Drop the guard without calling [`Semaphore::release`]. This is useful when `release`s don't
63 /// correspond one-to-one with `acquires` or it's difficult to send the guard to the releaser.
64 /// # Examples
65 /// ```
66 /// # use async_weighted_semaphore::{Semaphore, PoisonError, SemaphoreGuardArc};
67 /// use async_channel::{Sender, SendError};
68 /// // Limit size of a producer-consumer queue. Receivers may wait for any number of items
69 /// // to be available.
70 /// async fn send<T>(semaphore: &Semaphore,
71 /// sender: &Sender<T>,
72 /// message: T
73 /// ) -> Result<(), SendError<T>>{
74 /// match semaphore.acquire(1).await {
75 /// // A semaphore can be poisoned to prevent deadlock when a channel closes.
76 /// Err(PoisonError) => Err(SendError(message)),
77 /// Ok(guard) => {
78 /// sender.send(message).await?;
79 /// guard.forget();
80 /// Ok(())
81 /// }
82 /// }
83 /// }
84 /// ```
85 pub fn forget(self) -> usize {
86 let amount = self.amount;
87 mem::forget(self);
88 amount
89 }
90
91 /// Split this `SemaphoreGuard` into two.
92 ///
93 /// The new guard will have `permits` permits, and this guard's permits will be reduced
94 /// accordingly.
95 ///
96 /// # Examples
97 /// ```
98 /// # use async_weighted_semaphore::Semaphore;
99 /// # tokio_test::block_on(async {
100 /// let semaphore = Semaphore::new(15);
101 /// let mut g1 = semaphore.acquire(15).await.unwrap();
102 /// let g2 = g1.split(5).unwrap();
103 /// # })
104 /// ```
105 pub fn split(&mut self, permits: usize) -> Result<SemaphoreGuard<'a>, SemaphoreGuardSplitErr> {
106 if self.amount >= permits {
107 self.amount -= permits;
108 Ok(SemaphoreGuard {
109 semaphore: self.semaphore.clone(),
110 amount: permits,
111 panicking: self.panicking
112 })
113 } else {
114 Err(SemaphoreGuardSplitErr::Underflow)
115 }
116 }
117}
118
119impl SemaphoreGuardArc {
120 pub fn new(semaphore: Arc<Semaphore>, amount: usize) -> Self {
121 SemaphoreGuardArc { semaphore: Some(semaphore), amount, panicking: panicking() }
122 }
123
124 /// Combine two `SemaphoreGuardArc`s into one, with the sum of the originals' permits.
125 ///
126 /// # Examples
127 /// ```
128 /// # use async_weighted_semaphore::Semaphore;
129 /// # tokio_test::block_on(async {
130 /// let semaphore = Semaphore::new(15);
131 /// let mut g1 = semaphore.acquire(10).await.unwrap();
132 /// let g2 = semaphore.acquire(5).await.unwrap();
133 /// g1.extend(g2);
134 /// # })
135 /// ```
136 pub fn extend(&mut self, other: SemaphoreGuardArc) {
137 let sem1 = self.semaphore.as_ref().unwrap();
138 let sem2 = other.semaphore.as_ref().unwrap();
139 assert!(Arc::ptr_eq(sem1, sem2),
140 "Can't extend a guard with a guard from a different Semaphore");
141 self.amount = self.amount.saturating_add(other.forget());
142 }
143
144 /// Drop the guard without calling [`Semaphore::release`]. This is useful when `release`s don't
145 /// correspond one-to-one with `acquires` or it's difficult to send the guard to the releaser.
146 /// # Examples
147 /// ```
148 /// # use async_weighted_semaphore::{Semaphore, PoisonError, SemaphoreGuardArc};
149 /// # use std::sync::Arc;
150 /// use async_channel::{Sender, SendError};
151 /// // Limit size of a producer-consumer queue. Receivers may wait for any number of items
152 /// // to be available.
153 /// async fn send<T>(semaphore: &Arc<Semaphore>,
154 /// sender: &Sender<T>,
155 /// message: T
156 /// ) -> Result<(), SendError<T>>{
157 /// match semaphore.acquire_arc(1).await {
158 /// // A semaphore can be poisoned to prevent deadlock when a channel closes.
159 /// Err(PoisonError) => Err(SendError(message)),
160 /// Ok(guard) => {
161 /// sender.send(message).await?;
162 /// guard.forget();
163 /// Ok(())
164 /// }
165 /// }
166 /// }
167 /// ```
168 pub fn forget(mut self) -> usize {
169 self.semaphore = None;
170 let amount = self.amount;
171 mem::forget(self);
172 amount
173 }
174
175 /// Split this `SemaphoreGuardArc` into two.
176 ///
177 /// The new guard will have `permits` permits, and this guard's permits will be reduced
178 /// accordingly.
179 ///
180 /// # Examples
181 /// ```
182 /// # use async_weighted_semaphore::Semaphore;
183 /// # use std::sync::Arc;
184 /// # tokio_test::block_on(async {
185 /// let semaphore = Arc::new(Semaphore::new(15));
186 /// let mut g1 = semaphore.acquire_arc(15).await.unwrap();
187 /// let g2 = g1.split(5).unwrap();
188 /// # })
189 /// ```
190 pub fn split(&mut self, permits: usize) -> Result<SemaphoreGuardArc, SemaphoreGuardSplitErr> {
191 if self.amount >= permits {
192 self.amount -= permits;
193 Ok(SemaphoreGuardArc {
194 semaphore: self.semaphore.clone(),
195 amount: permits,
196 panicking: self.panicking
197 })
198 } else {
199 Err(SemaphoreGuardSplitErr::Underflow)
200 }
201 }
202}
203
204
205impl<'a> Drop for SemaphoreGuard<'a> {
206 fn drop(&mut self) {
207 if !self.panicking && panicking() {
208 self.semaphore.poison();
209 } else {
210 self.semaphore.release(self.amount);
211 }
212 }
213}
214
215
216impl Drop for SemaphoreGuardArc {
217 fn drop(&mut self) {
218 if let Some(semaphore) = self.semaphore.take() {
219 if !self.panicking && panicking() {
220 semaphore.poison();
221 } else {
222 semaphore.release(self.amount);
223 }
224 }
225 }
226}
227
228impl<'a> Debug for SemaphoreGuard<'a> {
229 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
230 write!(f, "SemaphoreGuard({})", self.amount)
231 }
232}
233
234impl Debug for SemaphoreGuardArc {
235 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
236 write!(f, "SemaphoreGuardArc({})", self.amount)
237 }
238}
239