ranked_semaphore/semaphore/methods.rs
1use crate::error::{AcquireError, TryAcquireError};
2use crate::semaphore::permits::{OwnedRankedSemaphorePermit, RankedSemaphorePermit};
3use std::future::Future;
4use std::sync::atomic::Ordering;
5use std::sync::Arc;
6
7impl super::RankedSemaphore {
8 // === Acquire methods ===
9
10 /// Acquires a single permit with default priority (0).
11 ///
12 /// This method will wait until a permit becomes available. If the semaphore
13 /// is closed while waiting, it returns `AcquireError::Closed`.
14 ///
15 /// # Returns
16 ///
17 /// A future that resolves to either:
18 /// - `Ok(RankedSemaphorePermit)` - Successfully acquired permit
19 /// - `Err(AcquireError)` - Semaphore was closed
20 ///
21 /// # Examples
22 ///
23 /// ```rust
24 /// use ranked_semaphore::RankedSemaphore;
25 ///
26 /// # #[tokio::main]
27 /// # async fn main() {
28 /// let sem = RankedSemaphore::new_fifo(1);
29 /// let permit = sem.acquire().await.unwrap();
30 /// // Permit is automatically released when dropped
31 /// # }
32 /// ```
33 pub fn acquire(&self) -> impl Future<Output = Result<RankedSemaphorePermit<'_>, AcquireError>> {
34 self.acquire_many_with_priority(0, 1)
35 }
36
37 /// Acquires a single permit with the specified priority.
38 ///
39 /// Higher priority values are served first. Tasks with the same priority
40 /// are served according to the queue strategy (FIFO or LIFO).
41 ///
42 /// # Arguments
43 ///
44 /// * `priority` - The priority level for this acquisition request
45 ///
46 /// # Returns
47 ///
48 /// A future that resolves to either:
49 /// - `Ok(RankedSemaphorePermit)` - Successfully acquired permit
50 /// - `Err(AcquireError)` - Semaphore was closed
51 ///
52 /// # Examples
53 ///
54 /// ```rust
55 /// use ranked_semaphore::RankedSemaphore;
56 ///
57 /// # #[tokio::main]
58 /// # async fn main() {
59 /// let sem = RankedSemaphore::new_fifo(1);
60 /// let permit = sem.acquire_with_priority(10).await.unwrap();
61 /// # }
62 /// ```
63 pub fn acquire_with_priority(&self, priority: isize) -> impl Future<Output = Result<RankedSemaphorePermit<'_>, AcquireError>> {
64 self.acquire_many_with_priority(priority, 1)
65 }
66
67 /// Acquires multiple permits with default priority (0).
68 ///
69 /// This method will wait until all requested permits become available.
70 /// The operation is atomic - either all permits are acquired or none are.
71 ///
72 /// # Arguments
73 ///
74 /// * `n` - The number of permits to acquire
75 ///
76 /// # Returns
77 ///
78 /// A future that resolves to either:
79 /// - `Ok(RankedSemaphorePermit)` - Successfully acquired all permits
80 /// - `Err(AcquireError)` - Semaphore was closed
81 ///
82 /// # Examples
83 ///
84 /// ```rust
85 /// use ranked_semaphore::RankedSemaphore;
86 ///
87 /// # #[tokio::main]
88 /// # async fn main() {
89 /// let sem = RankedSemaphore::new_fifo(5);
90 /// let permits = sem.acquire_many(3).await.unwrap();
91 /// assert_eq!(permits.num_permits(), 3);
92 /// # }
93 /// ```
94 pub fn acquire_many(&self, n: u32) -> impl Future<Output = Result<RankedSemaphorePermit<'_>, AcquireError>> {
95 self.acquire_many_with_priority(0, n)
96 }
97
98 /// Acquires multiple permits with the specified priority.
99 ///
100 /// This method will wait until all requested permits become available.
101 /// The operation is atomic - either all permits are acquired or none are.
102 ///
103 /// # Arguments
104 ///
105 /// * `priority` - The priority level for this acquisition request
106 /// * `n` - The number of permits to acquire
107 ///
108 /// # Returns
109 ///
110 /// A future that resolves to either:
111 /// - `Ok(RankedSemaphorePermit)` - Successfully acquired all permits
112 /// - `Err(AcquireError)` - Semaphore was closed
113 ///
114 /// # Examples
115 ///
116 /// ```rust
117 /// use ranked_semaphore::RankedSemaphore;
118 ///
119 /// # #[tokio::main]
120 /// # async fn main() {
121 /// let sem = RankedSemaphore::new_fifo(5);
122 /// let permits = sem.acquire_many_with_priority(10, 3).await.unwrap();
123 /// assert_eq!(permits.num_permits(), 3);
124 /// # }
125 /// ```
126 pub fn acquire_many_with_priority(
127 &self,
128 priority: isize,
129 n: u32,
130 ) -> impl Future<Output = Result<RankedSemaphorePermit<'_>, AcquireError>> {
131 super::futures::Acquire {
132 semaphore: self,
133 permits_needed: n as usize,
134 priority,
135 waiter_handle: None,
136 }
137 }
138
139 // === Non-blocking acquire methods ===
140
141 /// Attempts to acquire a single permit without waiting.
142 ///
143 /// This method returns immediately, either with a permit or an error.
144 /// It will not wait for permits to become available.
145 ///
146 /// # Returns
147 ///
148 /// * `Ok(RankedSemaphorePermit)` - Successfully acquired permit
149 /// * `Err(TryAcquireError::Closed)` - Semaphore is closed
150 /// * `Err(TryAcquireError::NoPermits)` - No permits available
151 ///
152 /// # Examples
153 ///
154 /// ```rust
155 /// use ranked_semaphore::{RankedSemaphore, TryAcquireError};
156 ///
157 /// let sem = RankedSemaphore::new_fifo(1);
158 /// let _permit1 = sem.try_acquire().unwrap();
159 ///
160 /// match sem.try_acquire() {
161 /// Ok(_) => panic!("Should not succeed"),
162 /// Err(TryAcquireError::NoPermits) => println!("No permits available"),
163 /// Err(TryAcquireError::Closed) => println!("Semaphore closed"),
164 /// };
165 /// ```
166 pub fn try_acquire(
167 &self,
168 ) -> Result<super::permits::RankedSemaphorePermit<'_>, TryAcquireError> {
169 // Optimized: single load and combined check
170 let curr = self.permits.load(Ordering::Acquire);
171
172 // Quick check: closed and no permits in one go
173 if curr & Self::CLOSED != 0 {
174 return Err(TryAcquireError::Closed);
175 }
176 if curr < (1 << Self::PERMIT_SHIFT) {
177 return Err(TryAcquireError::NoPermits);
178 }
179
180 // Try immediate CAS
181 let next = curr - (1 << Self::PERMIT_SHIFT);
182 match self
183 .permits
184 .compare_exchange(curr, next, Ordering::AcqRel, Ordering::Acquire)
185 {
186 Ok(_) => Ok(super::permits::RankedSemaphorePermit {
187 sem: self,
188 permits: 1,
189 }),
190 Err(_) => {
191 // Fall back to original retry loop only if immediate CAS fails
192 self.try_acquire_retry_loop()
193 }
194 }
195 }
196
197 /// Retry loop for contended cases
198 fn try_acquire_retry_loop(
199 &self,
200 ) -> Result<super::permits::RankedSemaphorePermit<'_>, TryAcquireError> {
201 let mut curr = self.permits.load(Ordering::Acquire);
202 loop {
203 // Check if semaphore is closed
204 if curr & Self::CLOSED == Self::CLOSED {
205 return Err(TryAcquireError::Closed);
206 }
207
208 // Check if enough permits are available
209 if curr < (1 << Self::PERMIT_SHIFT) {
210 return Err(TryAcquireError::NoPermits);
211 }
212
213 let next = curr - (1 << Self::PERMIT_SHIFT);
214 match self.permits.compare_exchange_weak(
215 curr,
216 next,
217 Ordering::AcqRel,
218 Ordering::Acquire,
219 ) {
220 Ok(_) => {
221 return Ok(super::permits::RankedSemaphorePermit {
222 sem: self,
223 permits: 1,
224 })
225 }
226 Err(actual) => curr = actual,
227 }
228 }
229 }
230
231 /// Attempts to acquire multiple permits without waiting.
232 ///
233 /// This method returns immediately, either with all requested permits or an error.
234 /// The operation is atomic - either all permits are acquired or none are.
235 ///
236 /// # Arguments
237 ///
238 /// * `n` - The number of permits to acquire
239 ///
240 /// # Returns
241 ///
242 /// * `Ok(RankedSemaphorePermit)` - Successfully acquired all permits
243 /// * `Err(TryAcquireError::Closed)` - Semaphore is closed
244 /// * `Err(TryAcquireError::NoPermits)` - Insufficient permits available
245 ///
246 /// # Panics
247 ///
248 /// Panics if `n` exceeds `MAX_PERMITS` (usize::MAX >> 3).
249 ///
250 /// # Examples
251 ///
252 /// ```rust
253 /// use ranked_semaphore::{RankedSemaphore, TryAcquireError};
254 ///
255 /// let sem = RankedSemaphore::new_fifo(5);
256 /// let _permits = sem.try_acquire_many(3).unwrap();
257 ///
258 /// match sem.try_acquire_many(5) {
259 /// Ok(_) => panic!("Should not succeed"),
260 /// Err(TryAcquireError::NoPermits) => println!("Not enough permits"),
261 /// Err(TryAcquireError::Closed) => println!("Semaphore closed"),
262 /// };
263 /// ```
264 pub fn try_acquire_many(
265 &self,
266 n: u32,
267 ) -> Result<super::permits::RankedSemaphorePermit<'_>, TryAcquireError> {
268 if n == 0 {
269 return Ok(super::permits::RankedSemaphorePermit {
270 sem: self,
271 permits: 0,
272 });
273 }
274
275 if n as usize > Self::MAX_PERMITS {
276 panic!("try_acquire_many: n exceeds MAX_PERMITS");
277 }
278
279 let n_shifted = (n as usize) << Self::PERMIT_SHIFT;
280 let mut curr = self.permits.load(Ordering::Acquire);
281 loop {
282 // Check if semaphore is closed
283 if curr & Self::CLOSED == Self::CLOSED {
284 return Err(TryAcquireError::Closed);
285 }
286
287 // Check if enough permits are available
288 if curr < n_shifted {
289 return Err(TryAcquireError::NoPermits);
290 }
291
292 let next = curr - n_shifted;
293 match self.permits.compare_exchange_weak(
294 curr,
295 next,
296 Ordering::AcqRel,
297 Ordering::Acquire,
298 ) {
299 Ok(_) => {
300 return Ok(super::permits::RankedSemaphorePermit {
301 sem: self,
302 permits: n,
303 })
304 }
305 Err(actual) => curr = actual,
306 }
307 }
308 }
309
310 // === Owned acquire methods ===
311
312 /// Acquires a single owned permit with default priority (0).
313 ///
314 /// This method returns an owned permit that holds a reference to the semaphore.
315 /// The semaphore must be wrapped in an `Arc` to call this method.
316 ///
317 /// # Returns
318 ///
319 /// A future that resolves to either:
320 /// - `Ok(OwnedRankedSemaphorePermit)` - Successfully acquired permit
321 /// - `Err(AcquireError)` - Semaphore was closed
322 ///
323 /// # Examples
324 ///
325 /// ```rust
326 /// use ranked_semaphore::RankedSemaphore;
327 /// use std::sync::Arc;
328 ///
329 /// # #[tokio::main]
330 /// # async fn main() {
331 /// let sem = Arc::new(RankedSemaphore::new_fifo(1));
332 /// let permit = sem.acquire_owned().await.unwrap();
333 /// # }
334 /// ```
335 pub fn acquire_owned(self: Arc<Self>) -> impl Future<Output = Result<OwnedRankedSemaphorePermit, AcquireError>> {
336 self.acquire_many_owned_with_priority(0, 1)
337 }
338
339 /// Acquires a single owned permit with the specified priority.
340 ///
341 /// Higher priority values are served first. The semaphore must be wrapped
342 /// in an `Arc` to call this method.
343 ///
344 /// # Arguments
345 ///
346 /// * `priority` - The priority level for this acquisition request
347 ///
348 /// # Returns
349 ///
350 /// A future that resolves to either:
351 /// - `Ok(OwnedRankedSemaphorePermit)` - Successfully acquired permit
352 /// - `Err(AcquireError)` - Semaphore was closed
353 ///
354 /// # Examples
355 ///
356 /// ```rust
357 /// use ranked_semaphore::RankedSemaphore;
358 /// use std::sync::Arc;
359 ///
360 /// # #[tokio::main]
361 /// # async fn main() {
362 /// let sem = Arc::new(RankedSemaphore::new_fifo(1));
363 /// let permit = sem.acquire_owned_with_priority(10).await.unwrap();
364 /// # }
365 /// ```
366 pub fn acquire_owned_with_priority(
367 self: Arc<Self>,
368 priority: isize,
369 ) -> impl Future<Output = Result<OwnedRankedSemaphorePermit, AcquireError>> {
370 self.acquire_many_owned_with_priority(priority, 1)
371 }
372
373 /// Acquires multiple owned permits with default priority (0).
374 ///
375 /// This method will wait until all requested permits become available.
376 /// The operation is atomic - either all permits are acquired or none are.
377 ///
378 /// # Arguments
379 ///
380 /// * `n` - The number of permits to acquire
381 ///
382 /// # Returns
383 ///
384 /// A future that resolves to either:
385 /// - `Ok(OwnedRankedSemaphorePermit)` - Successfully acquired all permits
386 /// - `Err(AcquireError)` - Semaphore was closed
387 ///
388 /// # Examples
389 ///
390 /// ```rust
391 /// use ranked_semaphore::RankedSemaphore;
392 /// use std::sync::Arc;
393 ///
394 /// # #[tokio::main]
395 /// # async fn main() {
396 /// let sem = Arc::new(RankedSemaphore::new_fifo(5));
397 /// let permits = sem.acquire_many_owned(3).await.unwrap();
398 /// assert_eq!(permits.num_permits(), 3);
399 /// # }
400 /// ```
401 pub fn acquire_many_owned(self: Arc<Self>, n: u32) -> impl Future<Output = Result<OwnedRankedSemaphorePermit, AcquireError>> {
402 self.acquire_many_owned_with_priority(0, n)
403 }
404
405 /// Acquires multiple owned permits with the specified priority.
406 ///
407 /// This method will wait until all requested permits become available.
408 /// The operation is atomic - either all permits are acquired or none are.
409 ///
410 /// # Arguments
411 ///
412 /// * `priority` - The priority level for this acquisition request
413 /// * `n` - The number of permits to acquire
414 ///
415 /// # Returns
416 ///
417 /// A future that resolves to either:
418 /// - `Ok(OwnedRankedSemaphorePermit)` - Successfully acquired all permits
419 /// - `Err(AcquireError)` - Semaphore was closed
420 ///
421 /// # Examples
422 ///
423 /// ```rust
424 /// use ranked_semaphore::RankedSemaphore;
425 /// use std::sync::Arc;
426 ///
427 /// # #[tokio::main]
428 /// # async fn main() {
429 /// let sem = Arc::new(RankedSemaphore::new_fifo(5));
430 /// let permits = sem.acquire_many_owned_with_priority(10, 3).await.unwrap();
431 /// assert_eq!(permits.num_permits(), 3);
432 /// # }
433 /// ```
434 pub fn acquire_many_owned_with_priority(
435 self: Arc<Self>,
436 priority: isize,
437 n: u32,
438 ) -> impl Future<Output = Result<OwnedRankedSemaphorePermit, AcquireError>> {
439 super::futures::AcquireOwned {
440 semaphore: self,
441 permits_needed: n as usize,
442 priority,
443 waiter_handle: None,
444 }
445 }
446
447 /// Attempts to acquire a single owned permit without waiting.
448 ///
449 /// This method returns immediately, either with a permit or an error.
450 /// The semaphore must be wrapped in an `Arc` to call this method.
451 ///
452 /// # Returns
453 ///
454 /// * `Ok(OwnedRankedSemaphorePermit)` - Successfully acquired permit
455 /// * `Err(TryAcquireError::Closed)` - Semaphore is closed
456 /// * `Err(TryAcquireError::NoPermits)` - No permits available
457 ///
458 /// # Examples
459 ///
460 /// ```rust
461 /// use ranked_semaphore::{RankedSemaphore, TryAcquireError};
462 /// use std::sync::Arc;
463 ///
464 /// let sem = Arc::new(RankedSemaphore::new_fifo(1));
465 /// let _permit1 = sem.clone().try_acquire_owned().unwrap();
466 ///
467 /// match sem.try_acquire_owned() {
468 /// Ok(_) => panic!("Should not succeed"),
469 /// Err(TryAcquireError::NoPermits) => println!("No permits available"),
470 /// Err(TryAcquireError::Closed) => println!("Semaphore closed"),
471 /// }
472 /// ```
473 pub fn try_acquire_owned(
474 self: Arc<Self>,
475 ) -> Result<super::permits::OwnedRankedSemaphorePermit, TryAcquireError> {
476 let mut curr = self.permits.load(Ordering::Acquire);
477 loop {
478 // Check if semaphore is closed
479 if curr & Self::CLOSED == Self::CLOSED {
480 return Err(TryAcquireError::Closed);
481 }
482
483 // Check if enough permits are available (need at least 1 << PERMIT_SHIFT)
484 if curr < (1 << Self::PERMIT_SHIFT) {
485 return Err(TryAcquireError::NoPermits);
486 }
487
488 let next = curr - (1 << Self::PERMIT_SHIFT);
489 match self.permits.compare_exchange_weak(
490 curr,
491 next,
492 Ordering::AcqRel,
493 Ordering::Acquire,
494 ) {
495 Ok(_) => {
496 return Ok(super::permits::OwnedRankedSemaphorePermit {
497 sem: self,
498 permits: 1,
499 })
500 }
501 Err(actual) => curr = actual,
502 }
503 }
504 }
505
506 /// Attempts to acquire multiple owned permits without waiting.
507 ///
508 /// This method returns immediately, either with all requested permits or an error.
509 /// The operation is atomic - either all permits are acquired or none are.
510 /// The semaphore must be wrapped in an `Arc` to call this method.
511 ///
512 /// # Arguments
513 ///
514 /// * `n` - The number of permits to acquire
515 ///
516 /// # Returns
517 ///
518 /// * `Ok(OwnedRankedSemaphorePermit)` - Successfully acquired all permits
519 /// * `Err(TryAcquireError::Closed)` - Semaphore is closed
520 /// * `Err(TryAcquireError::NoPermits)` - Insufficient permits available
521 ///
522 /// # Panics
523 ///
524 /// Panics if `n` exceeds `MAX_PERMITS` (usize::MAX >> 3).
525 ///
526 /// # Examples
527 ///
528 /// ```rust
529 /// use ranked_semaphore::{RankedSemaphore, TryAcquireError};
530 /// use std::sync::Arc;
531 ///
532 /// let sem = Arc::new(RankedSemaphore::new_fifo(5));
533 /// let _permits = sem.clone().try_acquire_many_owned(3).unwrap();
534 ///
535 /// match sem.try_acquire_many_owned(5) {
536 /// Ok(_) => panic!("Should not succeed"),
537 /// Err(TryAcquireError::NoPermits) => println!("Not enough permits"),
538 /// Err(TryAcquireError::Closed) => println!("Semaphore closed"),
539 /// }
540 /// ```
541 pub fn try_acquire_many_owned(
542 self: Arc<Self>,
543 n: u32,
544 ) -> Result<super::permits::OwnedRankedSemaphorePermit, TryAcquireError> {
545 if n == 0 {
546 return Ok(super::permits::OwnedRankedSemaphorePermit {
547 sem: self,
548 permits: 0,
549 });
550 }
551
552 if n as usize > Self::MAX_PERMITS {
553 panic!("try_acquire_many_owned: n exceeds MAX_PERMITS");
554 }
555
556 let n_shifted = (n as usize) << Self::PERMIT_SHIFT;
557 let mut curr = self.permits.load(Ordering::Acquire);
558 loop {
559 // Check if semaphore is closed
560 if curr & Self::CLOSED == Self::CLOSED {
561 return Err(TryAcquireError::Closed);
562 }
563
564 // Check if enough permits are available
565 if curr < n_shifted {
566 return Err(TryAcquireError::NoPermits);
567 }
568
569 let next = curr - n_shifted;
570 match self.permits.compare_exchange_weak(
571 curr,
572 next,
573 Ordering::AcqRel,
574 Ordering::Acquire,
575 ) {
576 Ok(_) => {
577 return Ok(super::permits::OwnedRankedSemaphorePermit {
578 sem: self,
579 permits: n,
580 })
581 }
582 Err(actual) => curr = actual,
583 }
584 }
585 }
586}