ranked_semaphore/semaphore/
methods.rs

1use crate::error::{AcquireError, TryAcquireError};
2use crate::semaphore::permits::{OwnedRankedSemaphorePermit, RankedSemaphorePermit};
3use std::future::Future;
4use std::sync::atomic::Ordering;
5use std::sync::Arc;
6
7impl super::RankedSemaphore {
8    // === Acquire methods ===
9
10    /// Acquires a single permit with default priority (0).
11    ///
12    /// This method will wait until a permit becomes available. If the semaphore
13    /// is closed while waiting, it returns `AcquireError::Closed`.
14    ///
15    /// # Returns
16    ///
17    /// A future that resolves to either:
18    /// - `Ok(RankedSemaphorePermit)` - Successfully acquired permit
19    /// - `Err(AcquireError)` - Semaphore was closed
20    ///
21    /// # Examples
22    ///
23    /// ```rust
24    /// use ranked_semaphore::RankedSemaphore;
25    ///
26    /// # #[tokio::main]
27    /// # async fn main() {
28    /// let sem = RankedSemaphore::new_fifo(1);
29    /// let permit = sem.acquire().await.unwrap();
30    /// // Permit is automatically released when dropped
31    /// # }
32    /// ```
33    pub fn acquire(&self) -> impl Future<Output = Result<RankedSemaphorePermit<'_>, AcquireError>> {
34        self.acquire_many_with_priority(0, 1)
35    }
36
37    /// Acquires a single permit with the specified priority.
38    ///
39    /// Higher priority values are served first. Tasks with the same priority
40    /// are served according to the queue strategy (FIFO or LIFO).
41    ///
42    /// # Arguments
43    ///
44    /// * `priority` - The priority level for this acquisition request
45    ///
46    /// # Returns
47    ///
48    /// A future that resolves to either:
49    /// - `Ok(RankedSemaphorePermit)` - Successfully acquired permit
50    /// - `Err(AcquireError)` - Semaphore was closed
51    ///
52    /// # Examples
53    ///
54    /// ```rust
55    /// use ranked_semaphore::RankedSemaphore;
56    ///
57    /// # #[tokio::main]
58    /// # async fn main() {
59    /// let sem = RankedSemaphore::new_fifo(1);
60    /// let permit = sem.acquire_with_priority(10).await.unwrap();
61    /// # }
62    /// ```
63    pub fn acquire_with_priority(&self, priority: isize) -> impl Future<Output = Result<RankedSemaphorePermit<'_>, AcquireError>> {
64        self.acquire_many_with_priority(priority, 1)
65    }
66
67    /// Acquires multiple permits with default priority (0).
68    ///
69    /// This method will wait until all requested permits become available.
70    /// The operation is atomic - either all permits are acquired or none are.
71    ///
72    /// # Arguments
73    ///
74    /// * `n` - The number of permits to acquire
75    ///
76    /// # Returns
77    ///
78    /// A future that resolves to either:
79    /// - `Ok(RankedSemaphorePermit)` - Successfully acquired all permits
80    /// - `Err(AcquireError)` - Semaphore was closed
81    ///
82    /// # Examples
83    ///
84    /// ```rust
85    /// use ranked_semaphore::RankedSemaphore;
86    ///
87    /// # #[tokio::main]
88    /// # async fn main() {
89    /// let sem = RankedSemaphore::new_fifo(5);
90    /// let permits = sem.acquire_many(3).await.unwrap();
91    /// assert_eq!(permits.num_permits(), 3);
92    /// # }
93    /// ```
94    pub fn acquire_many(&self, n: u32) -> impl Future<Output = Result<RankedSemaphorePermit<'_>, AcquireError>> {
95        self.acquire_many_with_priority(0, n)
96    }
97
98    /// Acquires multiple permits with the specified priority.
99    ///
100    /// This method will wait until all requested permits become available.
101    /// The operation is atomic - either all permits are acquired or none are.
102    ///
103    /// # Arguments
104    ///
105    /// * `priority` - The priority level for this acquisition request
106    /// * `n` - The number of permits to acquire
107    ///
108    /// # Returns
109    ///
110    /// A future that resolves to either:
111    /// - `Ok(RankedSemaphorePermit)` - Successfully acquired all permits
112    /// - `Err(AcquireError)` - Semaphore was closed
113    ///
114    /// # Examples
115    ///
116    /// ```rust
117    /// use ranked_semaphore::RankedSemaphore;
118    ///
119    /// # #[tokio::main]
120    /// # async fn main() {
121    /// let sem = RankedSemaphore::new_fifo(5);
122    /// let permits = sem.acquire_many_with_priority(10, 3).await.unwrap();
123    /// assert_eq!(permits.num_permits(), 3);
124    /// # }
125    /// ```
126    pub fn acquire_many_with_priority(
127        &self,
128        priority: isize,
129        n: u32,
130    ) -> impl Future<Output = Result<RankedSemaphorePermit<'_>, AcquireError>> {
131        super::futures::Acquire {
132            semaphore: self,
133            permits_needed: n as usize,
134            priority,
135            waiter_handle: None,
136        }
137    }
138
139    // === Non-blocking acquire methods ===
140
141    /// Attempts to acquire a single permit without waiting.
142    ///
143    /// This method returns immediately, either with a permit or an error.
144    /// It will not wait for permits to become available.
145    ///
146    /// # Returns
147    ///
148    /// * `Ok(RankedSemaphorePermit)` - Successfully acquired permit
149    /// * `Err(TryAcquireError::Closed)` - Semaphore is closed
150    /// * `Err(TryAcquireError::NoPermits)` - No permits available
151    ///
152    /// # Examples
153    ///
154    /// ```rust
155    /// use ranked_semaphore::{RankedSemaphore, TryAcquireError};
156    ///
157    /// let sem = RankedSemaphore::new_fifo(1);
158    /// let _permit1 = sem.try_acquire().unwrap();
159    /// 
160    /// match sem.try_acquire() {
161    ///     Ok(_) => panic!("Should not succeed"),
162    ///     Err(TryAcquireError::NoPermits) => println!("No permits available"),
163    ///     Err(TryAcquireError::Closed) => println!("Semaphore closed"),
164    /// };
165    /// ```
166    pub fn try_acquire(
167        &self,
168    ) -> Result<super::permits::RankedSemaphorePermit<'_>, TryAcquireError> {
169        // Optimized: single load and combined check
170        let curr = self.permits.load(Ordering::Acquire);
171
172        // Quick check: closed and no permits in one go
173        if curr & Self::CLOSED != 0 {
174            return Err(TryAcquireError::Closed);
175        }
176        if curr < (1 << Self::PERMIT_SHIFT) {
177            return Err(TryAcquireError::NoPermits);
178        }
179
180        // Try immediate CAS
181        let next = curr - (1 << Self::PERMIT_SHIFT);
182        match self
183            .permits
184            .compare_exchange(curr, next, Ordering::AcqRel, Ordering::Acquire)
185        {
186            Ok(_) => Ok(super::permits::RankedSemaphorePermit {
187                sem: self,
188                permits: 1,
189            }),
190            Err(_) => {
191                // Fall back to original retry loop only if immediate CAS fails
192                self.try_acquire_retry_loop()
193            }
194        }
195    }
196
197    /// Retry loop for contended cases
198    fn try_acquire_retry_loop(
199        &self,
200    ) -> Result<super::permits::RankedSemaphorePermit<'_>, TryAcquireError> {
201        let mut curr = self.permits.load(Ordering::Acquire);
202        loop {
203            // Check if semaphore is closed
204            if curr & Self::CLOSED == Self::CLOSED {
205                return Err(TryAcquireError::Closed);
206            }
207
208            // Check if enough permits are available
209            if curr < (1 << Self::PERMIT_SHIFT) {
210                return Err(TryAcquireError::NoPermits);
211            }
212
213            let next = curr - (1 << Self::PERMIT_SHIFT);
214            match self.permits.compare_exchange_weak(
215                curr,
216                next,
217                Ordering::AcqRel,
218                Ordering::Acquire,
219            ) {
220                Ok(_) => {
221                    return Ok(super::permits::RankedSemaphorePermit {
222                        sem: self,
223                        permits: 1,
224                    })
225                }
226                Err(actual) => curr = actual,
227            }
228        }
229    }
230
231    /// Attempts to acquire multiple permits without waiting.
232    ///
233    /// This method returns immediately, either with all requested permits or an error.
234    /// The operation is atomic - either all permits are acquired or none are.
235    ///
236    /// # Arguments
237    ///
238    /// * `n` - The number of permits to acquire
239    ///
240    /// # Returns
241    ///
242    /// * `Ok(RankedSemaphorePermit)` - Successfully acquired all permits
243    /// * `Err(TryAcquireError::Closed)` - Semaphore is closed
244    /// * `Err(TryAcquireError::NoPermits)` - Insufficient permits available
245    ///
246    /// # Panics
247    ///
248    /// Panics if `n` exceeds `MAX_PERMITS` (usize::MAX >> 3).
249    ///
250    /// # Examples
251    ///
252    /// ```rust
253    /// use ranked_semaphore::{RankedSemaphore, TryAcquireError};
254    ///
255    /// let sem = RankedSemaphore::new_fifo(5);
256    /// let _permits = sem.try_acquire_many(3).unwrap();
257    /// 
258    /// match sem.try_acquire_many(5) {
259    ///     Ok(_) => panic!("Should not succeed"),
260    ///     Err(TryAcquireError::NoPermits) => println!("Not enough permits"),
261    ///     Err(TryAcquireError::Closed) => println!("Semaphore closed"),
262    /// };
263    /// ```
264    pub fn try_acquire_many(
265        &self,
266        n: u32,
267    ) -> Result<super::permits::RankedSemaphorePermit<'_>, TryAcquireError> {
268        if n == 0 {
269            return Ok(super::permits::RankedSemaphorePermit {
270                sem: self,
271                permits: 0,
272            });
273        }
274
275        if n as usize > Self::MAX_PERMITS {
276            panic!("try_acquire_many: n exceeds MAX_PERMITS");
277        }
278
279        let n_shifted = (n as usize) << Self::PERMIT_SHIFT;
280        let mut curr = self.permits.load(Ordering::Acquire);
281        loop {
282            // Check if semaphore is closed
283            if curr & Self::CLOSED == Self::CLOSED {
284                return Err(TryAcquireError::Closed);
285            }
286
287            // Check if enough permits are available
288            if curr < n_shifted {
289                return Err(TryAcquireError::NoPermits);
290            }
291
292            let next = curr - n_shifted;
293            match self.permits.compare_exchange_weak(
294                curr,
295                next,
296                Ordering::AcqRel,
297                Ordering::Acquire,
298            ) {
299                Ok(_) => {
300                    return Ok(super::permits::RankedSemaphorePermit {
301                        sem: self,
302                        permits: n,
303                    })
304                }
305                Err(actual) => curr = actual,
306            }
307        }
308    }
309
310    // === Owned acquire methods ===
311
312    /// Acquires a single owned permit with default priority (0).
313    ///
314    /// This method returns an owned permit that holds a reference to the semaphore.
315    /// The semaphore must be wrapped in an `Arc` to call this method.
316    ///
317    /// # Returns
318    ///
319    /// A future that resolves to either:
320    /// - `Ok(OwnedRankedSemaphorePermit)` - Successfully acquired permit
321    /// - `Err(AcquireError)` - Semaphore was closed
322    ///
323    /// # Examples
324    ///
325    /// ```rust
326    /// use ranked_semaphore::RankedSemaphore;
327    /// use std::sync::Arc;
328    ///
329    /// # #[tokio::main]
330    /// # async fn main() {
331    /// let sem = Arc::new(RankedSemaphore::new_fifo(1));
332    /// let permit = sem.acquire_owned().await.unwrap();
333    /// # }
334    /// ```
335    pub fn acquire_owned(self: Arc<Self>) -> impl Future<Output = Result<OwnedRankedSemaphorePermit, AcquireError>> {
336        self.acquire_many_owned_with_priority(0, 1)
337    }
338
339    /// Acquires a single owned permit with the specified priority.
340    ///
341    /// Higher priority values are served first. The semaphore must be wrapped
342    /// in an `Arc` to call this method.
343    ///
344    /// # Arguments
345    ///
346    /// * `priority` - The priority level for this acquisition request
347    ///
348    /// # Returns
349    ///
350    /// A future that resolves to either:
351    /// - `Ok(OwnedRankedSemaphorePermit)` - Successfully acquired permit
352    /// - `Err(AcquireError)` - Semaphore was closed
353    ///
354    /// # Examples
355    ///
356    /// ```rust
357    /// use ranked_semaphore::RankedSemaphore;
358    /// use std::sync::Arc;
359    ///
360    /// # #[tokio::main]
361    /// # async fn main() {
362    /// let sem = Arc::new(RankedSemaphore::new_fifo(1));
363    /// let permit = sem.acquire_owned_with_priority(10).await.unwrap();
364    /// # }
365    /// ```
366    pub fn acquire_owned_with_priority(
367        self: Arc<Self>,
368        priority: isize,
369    ) -> impl Future<Output = Result<OwnedRankedSemaphorePermit, AcquireError>> {
370        self.acquire_many_owned_with_priority(priority, 1)
371    }
372
373    /// Acquires multiple owned permits with default priority (0).
374    ///
375    /// This method will wait until all requested permits become available.
376    /// The operation is atomic - either all permits are acquired or none are.
377    ///
378    /// # Arguments
379    ///
380    /// * `n` - The number of permits to acquire
381    ///
382    /// # Returns
383    ///
384    /// A future that resolves to either:
385    /// - `Ok(OwnedRankedSemaphorePermit)` - Successfully acquired all permits
386    /// - `Err(AcquireError)` - Semaphore was closed
387    ///
388    /// # Examples
389    ///
390    /// ```rust
391    /// use ranked_semaphore::RankedSemaphore;
392    /// use std::sync::Arc;
393    ///
394    /// # #[tokio::main]
395    /// # async fn main() {
396    /// let sem = Arc::new(RankedSemaphore::new_fifo(5));
397    /// let permits = sem.acquire_many_owned(3).await.unwrap();
398    /// assert_eq!(permits.num_permits(), 3);
399    /// # }
400    /// ```
401    pub fn acquire_many_owned(self: Arc<Self>, n: u32) -> impl Future<Output = Result<OwnedRankedSemaphorePermit, AcquireError>> {
402        self.acquire_many_owned_with_priority(0, n)
403    }
404
405    /// Acquires multiple owned permits with the specified priority.
406    ///
407    /// This method will wait until all requested permits become available.
408    /// The operation is atomic - either all permits are acquired or none are.
409    ///
410    /// # Arguments
411    ///
412    /// * `priority` - The priority level for this acquisition request
413    /// * `n` - The number of permits to acquire
414    ///
415    /// # Returns
416    ///
417    /// A future that resolves to either:
418    /// - `Ok(OwnedRankedSemaphorePermit)` - Successfully acquired all permits
419    /// - `Err(AcquireError)` - Semaphore was closed
420    ///
421    /// # Examples
422    ///
423    /// ```rust
424    /// use ranked_semaphore::RankedSemaphore;
425    /// use std::sync::Arc;
426    ///
427    /// # #[tokio::main]
428    /// # async fn main() {
429    /// let sem = Arc::new(RankedSemaphore::new_fifo(5));
430    /// let permits = sem.acquire_many_owned_with_priority(10, 3).await.unwrap();
431    /// assert_eq!(permits.num_permits(), 3);
432    /// # }
433    /// ```
434    pub fn acquire_many_owned_with_priority(
435        self: Arc<Self>,
436        priority: isize,
437        n: u32,
438    ) -> impl Future<Output = Result<OwnedRankedSemaphorePermit, AcquireError>> {
439        super::futures::AcquireOwned {
440            semaphore: self,
441            permits_needed: n as usize,
442            priority,
443            waiter_handle: None,
444        }
445    }
446
447    /// Attempts to acquire a single owned permit without waiting.
448    ///
449    /// This method returns immediately, either with a permit or an error.
450    /// The semaphore must be wrapped in an `Arc` to call this method.
451    ///
452    /// # Returns
453    ///
454    /// * `Ok(OwnedRankedSemaphorePermit)` - Successfully acquired permit
455    /// * `Err(TryAcquireError::Closed)` - Semaphore is closed
456    /// * `Err(TryAcquireError::NoPermits)` - No permits available
457    ///
458    /// # Examples
459    ///
460    /// ```rust
461    /// use ranked_semaphore::{RankedSemaphore, TryAcquireError};
462    /// use std::sync::Arc;
463    ///
464    /// let sem = Arc::new(RankedSemaphore::new_fifo(1));
465    /// let _permit1 = sem.clone().try_acquire_owned().unwrap();
466    /// 
467    /// match sem.try_acquire_owned() {
468    ///     Ok(_) => panic!("Should not succeed"),
469    ///     Err(TryAcquireError::NoPermits) => println!("No permits available"),
470    ///     Err(TryAcquireError::Closed) => println!("Semaphore closed"),
471    /// }
472    /// ```
473    pub fn try_acquire_owned(
474        self: Arc<Self>,
475    ) -> Result<super::permits::OwnedRankedSemaphorePermit, TryAcquireError> {
476        let mut curr = self.permits.load(Ordering::Acquire);
477        loop {
478            // Check if semaphore is closed
479            if curr & Self::CLOSED == Self::CLOSED {
480                return Err(TryAcquireError::Closed);
481            }
482
483            // Check if enough permits are available (need at least 1 << PERMIT_SHIFT)
484            if curr < (1 << Self::PERMIT_SHIFT) {
485                return Err(TryAcquireError::NoPermits);
486            }
487
488            let next = curr - (1 << Self::PERMIT_SHIFT);
489            match self.permits.compare_exchange_weak(
490                curr,
491                next,
492                Ordering::AcqRel,
493                Ordering::Acquire,
494            ) {
495                Ok(_) => {
496                    return Ok(super::permits::OwnedRankedSemaphorePermit {
497                        sem: self,
498                        permits: 1,
499                    })
500                }
501                Err(actual) => curr = actual,
502            }
503        }
504    }
505
506    /// Attempts to acquire multiple owned permits without waiting.
507    ///
508    /// This method returns immediately, either with all requested permits or an error.
509    /// The operation is atomic - either all permits are acquired or none are.
510    /// The semaphore must be wrapped in an `Arc` to call this method.
511    ///
512    /// # Arguments
513    ///
514    /// * `n` - The number of permits to acquire
515    ///
516    /// # Returns
517    ///
518    /// * `Ok(OwnedRankedSemaphorePermit)` - Successfully acquired all permits
519    /// * `Err(TryAcquireError::Closed)` - Semaphore is closed
520    /// * `Err(TryAcquireError::NoPermits)` - Insufficient permits available
521    ///
522    /// # Panics
523    ///
524    /// Panics if `n` exceeds `MAX_PERMITS` (usize::MAX >> 3).
525    ///
526    /// # Examples
527    ///
528    /// ```rust
529    /// use ranked_semaphore::{RankedSemaphore, TryAcquireError};
530    /// use std::sync::Arc;
531    ///
532    /// let sem = Arc::new(RankedSemaphore::new_fifo(5));
533    /// let _permits = sem.clone().try_acquire_many_owned(3).unwrap();
534    /// 
535    /// match sem.try_acquire_many_owned(5) {
536    ///     Ok(_) => panic!("Should not succeed"),
537    ///     Err(TryAcquireError::NoPermits) => println!("Not enough permits"),
538    ///     Err(TryAcquireError::Closed) => println!("Semaphore closed"),
539    /// }
540    /// ```
541    pub fn try_acquire_many_owned(
542        self: Arc<Self>,
543        n: u32,
544    ) -> Result<super::permits::OwnedRankedSemaphorePermit, TryAcquireError> {
545        if n == 0 {
546            return Ok(super::permits::OwnedRankedSemaphorePermit {
547                sem: self,
548                permits: 0,
549            });
550        }
551
552        if n as usize > Self::MAX_PERMITS {
553            panic!("try_acquire_many_owned: n exceeds MAX_PERMITS");
554        }
555
556        let n_shifted = (n as usize) << Self::PERMIT_SHIFT;
557        let mut curr = self.permits.load(Ordering::Acquire);
558        loop {
559            // Check if semaphore is closed
560            if curr & Self::CLOSED == Self::CLOSED {
561                return Err(TryAcquireError::Closed);
562            }
563
564            // Check if enough permits are available
565            if curr < n_shifted {
566                return Err(TryAcquireError::NoPermits);
567            }
568
569            let next = curr - n_shifted;
570            match self.permits.compare_exchange_weak(
571                curr,
572                next,
573                Ordering::AcqRel,
574                Ordering::Acquire,
575            ) {
576                Ok(_) => {
577                    return Ok(super::permits::OwnedRankedSemaphorePermit {
578                        sem: self,
579                        permits: n,
580                    })
581                }
582                Err(actual) => curr = actual,
583            }
584        }
585    }
586}