// async_priority_lock/semaphore.rs
1//! Contains the [Semaphore] struct for async, priority-ordered acquisition of permits.
2#[cfg(feature = "semaphore-total")]
3use crate::SyncWriteGuard;
4#[cfg(doc)]
5use crate::{FIFO, LIFO, Mutex, queue::*};
6use crate::{
7    Priority, RwLock,
8    queue::{PriorityQueue, PriorityQueueHandle},
9    waiter::{self, Waiter, WaiterFlagFut},
10};
11use core::{
12    cmp::Ordering,
13    error::Error,
14    fmt::{Debug, Display},
15    marker::PhantomData,
16    mem::ManuallyDrop,
17    usize,
18};
19
20#[cfg(feature = "const-default")]
21use const_default::ConstDefault;
/// Marker trait for any [PriorityQueue] capable of holding [SemaphoreWaiter] entries.
pub trait SemaphoreQueue<P: Priority>: PriorityQueue<SemaphoreWaiter<P>> {}
/// Blanket impl: every suitable [PriorityQueue] is automatically a [SemaphoreQueue].
impl<T: PriorityQueue<SemaphoreWaiter<P>>, P: Priority> SemaphoreQueue<P> for T {}
24
#[derive(Debug)]
/// Opaque waiter type used for [PriorityQueue] entries.
///
/// This implements [Priority] and is the entry type used by [Semaphore].
pub struct SemaphoreWaiter<P: Priority> {
    // User-supplied priority used to order this entry within the queue.
    priority: P,
    // Synchronization primitive tracking the grant / eviction flags for this entry.
    waiter: Waiter,
    // Number of permits requested (or held).  With the `semaphore-total` feature,
    // the high bit (WITHIN_TOTAL_BIT) additionally flags waiters which must be
    // evicted if the semaphore's total permit count drops below their request;
    // use `count()` / `count_and_flag()` rather than reading this raw.
    count: usize,
}
34
35impl<P: Priority> SemaphoreWaiter<P> {
36    #[inline(always)]
37    fn count(&self) -> usize {
38        if cfg!(feature = "semaphore-total") {
39            self.count & !WITHIN_TOTAL_BIT
40        } else {
41            self.count
42        }
43    }
44
45    #[cfg(feature = "semaphore-total")]
46    #[inline(always)]
47    fn count_and_flag(&self) -> (usize, bool) {
48        (
49            self.count & !WITHIN_TOTAL_BIT,
50            self.count & WITHIN_TOTAL_BIT != 0,
51        )
52    }
53}
54
/// Has the same priority as P except "held" entries always have higher priority than pending requesters
///
/// Additionally constrains order with the same priority to enqueue the lowest request count first
/// to minimize waiting (this applies after the inner priority has compared, so [FIFO] and [LIFO]
/// will prevent this).
impl<P: Priority> Priority for SemaphoreWaiter<P> {
    #[inline]
    fn compare(&self, other: &Self) -> core::cmp::Ordering {
        match (self.waiter.has_lock(), other.waiter.has_lock()) {
            // A holder always sorts above any pending requester, which keeps all
            // active holders in a contiguous prefix of the queue.
            (true, false) => Ordering::Greater,
            (false, true) => Ordering::Less,
            // Notably, we don't check count here - this is a hidden unsafe bit we have here
            // based on us not changing count unless the waiter already has the lock
            // (at which point its order among the holder prefix no longer matters).
            _ => self.priority.compare(&other.priority),
        }
    }

    #[inline]
    fn compare_new(&self, old: &Self) -> Ordering {
        match (self.waiter.has_lock(), old.waiter.has_lock()) {
            (true, false) => Ordering::Greater,
            (false, true) => Ordering::Less,
            (is_held, _) => {
                let ret = self.priority.compare_new(&old.priority);

                if !is_held {
                    // If the priority is the same, place nodes with lower count first.
                    // We can skip this once the permits are both held - we only care which has
                    // lower count in the pending part of the queue so we can return those without
                    // waiting to have enough permits for waiters of the same priority which need
                    // more.
                    return ret.then_with(|| self.count().compare(&old.count()).reverse());
                }
                ret
            }
        }
    }
}
96
#[cfg(feature = "arena-queue")]
type DefaultSemaphoreQueue_<P> = crate::queue::DualLinkArenaQueue<SemaphoreWaiter<P>>;
#[cfg(all(feature = "box-queue", not(feature = "arena-queue")))]
type DefaultSemaphoreQueue_<P> = crate::queue::DualLinkBoxQueue<SemaphoreWaiter<P>>;

/// The default queue used for [Semaphore].
///
/// The actual queue used here may change, however it will always be
/// [`Sync`]` + `[`Send`] if `P` is [`Sync`]` + `[`Send`].
///
/// Currently, the default queue used is as follows:
/// - if `arena-queue` is enabled (default): [DualLinkArenaQueue] - there's more cases than what we
/// have with [Mutex] where nodes won't be the head node when removed, thus the reduced performance
/// in queueing is likely to be made up for by the reduced dequeue time.
/// - if `box-queue` is enabled but not `arena-queue`: [DualLinkBoxQueue] - same rationale as for
/// [DualLinkArenaQueue] except dequeuing is even more expensive for [BoxQueue] so this is almost
/// always preferred over [SingleLinkBoxQueue]
/// - if neither feature is enabled, no default is provided
#[cfg(any(feature = "arena-queue", feature = "box-queue"))]
pub type DefaultSemaphoreQueue<P> = DefaultSemaphoreQueue_<P>;
117
#[derive(Default)]
/// Lock-protected state shared by all guards of a [Semaphore].
///
/// The inner queue for this type is a bit complicated:
/// At the start, we have all of the active holders ordered by their priorities.
/// After, we have all of the waiters ordered by their priorities.
///
/// This means that when a waiter is granted permits, it needs to be repositioned to the start of
/// the queue.
struct SemaphoreInner<P: Priority, Q: SemaphoreQueue<P>> {
    // PERF: We actually don't need to retain the waiters which hold permits if neither the
    // `semaphore-total` nor `evict` features are enabled.
    queue: Q,
    // Total permits associated with the semaphore (held + available); only
    // tracked with the `semaphore-total` feature.
    #[cfg(feature = "semaphore-total")]
    total: usize,
    // Permits currently free to grant to waiters.
    available: usize,
    // P only appears inside Q's entry type; this ties the parameter to the struct.
    _phantom: PhantomData<P>,
}
134
impl<P: Priority, Q: SemaphoreQueue<P>> SemaphoreInner<P, Q> {
    /// Walks the queue from `next` onward, granting permits to pending waiters
    /// in order until one is found whose request exceeds `self.available`.
    ///
    /// Already-held nodes are skipped; unsatisfiable waiters flagged for
    /// capacity-eviction are skipped instead of blocking those behind them.
    #[inline]
    fn activate_waiters(&mut self, mut next: Option<Q::SharedHandle>) {
        while let Some(handle) = next.take() {
            let node = self.queue.get_by_handle(handle.as_ref());
            let flags = node.waiter.flags();
            let is_held = flags & waiter::WAITER_FLAG_HAS_LOCK != 0;

            let count = node.count();
            // Grab the successor before `update_node` below can reposition this node.
            next = self.queue.get_next_handle(handle.as_ref());

            // we still issue permits here even if the waiter is flagged for eviction (in the case
            // where we started eviction and then raised capacity to sufficient levels before
            // adding more permits)
            if is_held {
                continue;
            }

            if count > self.available {
                // we will only ever evict nodes that aren't held when we evicted due to capacity.
                // in that case, we should enqueue subsequent nodes.
                let will_evict = cfg!(feature = "semaphore-total")
                    && flags & waiter::WAITER_FLAG_WANTS_EVICT != 0;

                // fine to skip this node since it will be evicted anyways
                if will_evict {
                    continue;
                }

                // first unsatisfiable (non-evicting) waiter blocks everything after it
                break;
            }

            // grant: consume the permits and wake the waiter; returning `true`
            // asks the queue to reposition the node (into the holder prefix)
            self.available -= count;
            self.queue.update_node(handle.as_ref(), |x| {
                x.waiter.start();
                true
            });
        }
    }

    /// Flags every waiter (from `start` onward) whose within-total request now
    /// exceeds the semaphore's reduced total for eviction.
    #[cfg(feature = "semaphore-total")]
    #[inline]
    fn notify_oversized_waiters(&self, start: Option<&Q::Handle>) {
        for node in self.queue.iter_at(start) {
            // We should NEVER evict a holder here which already has permits, but it's impossible
            // for a holder with permits to have > self.total permits - so we can filter those out
            // implicitly here without having to do an atomic load for .has_lock()
            let (count, should_evict) = node.count_and_flag();

            if should_evict && count > self.total {
                node.waiter.evict();
            }
        }
    }
}
190
#[cfg(feature = "const-default")]
/// Const-evaluable empty state (no permits, empty queue); used by
/// [Semaphore::const_new] when the queue itself supports [ConstDefault].
impl<P: Priority, Q: ConstDefault + SemaphoreQueue<P>> ConstDefault for SemaphoreInner<P, Q> {
    const DEFAULT: Self = Self {
        queue: Q::DEFAULT,
        #[cfg(feature = "semaphore-total")]
        total: 0,
        available: 0,
        _phantom: PhantomData,
    };
}
201
/// A guard which will conditionally activate subsequent nodes if either it or the previous node
/// has the lock.
///
/// This is the inner type for SemaphorePermit, which has an optimized drop function as it
/// knows that it currently holds the permits when it is dropped.
struct SemaphorePermitWaiter<'a, P: Priority, Q: SemaphoreQueue<P>> {
    // The owning semaphore; the node behind `handle` lives in its queue.
    semaphore: &'a Semaphore<P, Q>,
    // Queue handle for this waiter's node.  ManuallyDrop because Drop / forget /
    // merge take it out by value exactly once before the node is dequeued.
    handle: ManuallyDrop<Q::Handle>,
}

// SAFETY: access to the node behind `handle` goes through the semaphore's
// RwLock (or the queue's LOAD_PURE accessor - see `with_waker`), so sharing
// the guard is sound whenever the semaphore and handle themselves are Sync.
unsafe impl<'a, P: Priority, Q: SemaphoreQueue<P>> Sync for SemaphorePermitWaiter<'a, P, Q>
where
    Semaphore<P, Q>: Sync,
    Q::Handle: Sync,
{
}

// SAFETY: moving the guard only moves the handle; the node stays in the
// semaphore's queue, which remains reachable through the shared reference.
unsafe impl<'a, P: Priority, Q: SemaphoreQueue<P>> Send for SemaphorePermitWaiter<'a, P, Q>
where
    Semaphore<P, Q>: Sync,
    Q::Handle: Send,
{
}

impl<'a, P: Priority, Q: SemaphoreQueue<P>> SemaphorePermitWaiter<'a, P, Q> {
    // Whether the handle can resolve its node without locking the semaphore.
    const HAS_PURE_LOAD: bool = Q::Handle::LOAD_PURE.is_some();
}
229
impl<'a, P: Priority, Q: SemaphoreQueue<P>> waiter::WaiterHandle
    for SemaphorePermitWaiter<'a, P, Q>
{
    /// Runs `f` against this guard's [Waiter], skipping the semaphore lock when
    /// the queue supports lock-free handle resolution.
    #[inline]
    fn with_waker<T>(&self, f: impl FnOnce(&Waiter) -> T) -> T {
        if Self::HAS_PURE_LOAD {
            // SAFETY: HAS_PURE_LOAD == LOAD_PURE.is_some(), so the
            // unwrap_unchecked can never observe a None.
            unsafe { f(&Q::Handle::LOAD_PURE.unwrap_unchecked()(&self.handle).waiter) }
        } else {
            // Slow path: take a read lock to resolve the handle through the queue.
            let sem = self.semaphore.0.read();
            f(&sem.queue.get_by_handle(&self.handle).waiter)
        }
    }
}
243
impl<'a, P: Priority, Q: SemaphoreQueue<P>> Drop for SemaphorePermitWaiter<'a, P, Q> {
    /// Dequeues this waiter; if it held permits they are returned, and any
    /// waiters that may have been blocked behind this node are activated.
    #[inline]
    fn drop(&mut self) {
        let mut sem = self.semaphore.0.write();

        // SAFETY: the handle is taken exactly once here and `self` is never
        // used again, so the emptied ManuallyDrop is not observed.
        let handle = unsafe { ManuallyDrop::take(&mut self.handle) };
        let node = sem.queue.get_by_handle(&handle);
        let is_active = node.waiter.has_lock();
        if is_active {
            // we held permits - return them to the pool before dequeueing
            sem.available += node.count();
        }

        // PERF: `is_active` is added here because node.count() cannot be zero, thus if
        // node was active, the semaphore must now have permits.  This is used as an
        // alternative to core::hint::assert_unchecked(node.count() > 0);
        let has_available = is_active || sem.available > 0;

        let (prev, next) = sem.queue.dequeue(handle);

        // nothing behind us, or nothing to grant - no activation needed
        if next.is_none() || !has_available {
            return;
        }

        // we were active, thus we have permits to (potentially) grant
        if is_active {
            return sem.activate_waiters(next);
        }

        // Previous was locked (or was head) but this node wasn't. Since a previous node was, that
        // means the later nodes may have been blocked due to waiting on this one.
        if prev.is_none_or(|x| x.waiter.has_lock()) {
            sem.activate_waiters(next);
        }
    }
}
279
#[repr(transparent)]
/// A permit (or collection of permits) from a [Semaphore]
///
/// Acquired via [Semaphore::acquire] and associated fns.
pub struct SemaphorePermit<'a, P: Priority, Q: SemaphoreQueue<P>>(
    /// Publicly exposed semaphore guards are definitely loaded, so we can actually skip some of the
    /// checks we need to do for [SemaphorePermitWaiter]
    ManuallyDrop<SemaphorePermitWaiter<'a, P, Q>>,
);

// SAFETY: transparent wrapper around SemaphorePermitWaiter; sound under the
// same bounds as that type's Sync impl.
unsafe impl<'a, P: Priority, Q: SemaphoreQueue<P>> Sync for SemaphorePermit<'a, P, Q>
where
    Semaphore<P, Q>: Sync,
    Q::Handle: Sync,
{
}

// SAFETY: transparent wrapper around SemaphorePermitWaiter; sound under the
// same bounds as that type's Send impl.
unsafe impl<'a, P: Priority, Q: SemaphoreQueue<P>> Send for SemaphorePermit<'a, P, Q>
where
    Semaphore<P, Q>: Sync,
    Q::Handle: Send,
{
}
303
impl<'a, P: Priority, Q: SemaphoreQueue<P>> SemaphorePermit<'a, P, Q> {
    #[cfg(feature = "evict")]
    #[inline]
    /// Returns a future which resolves when / if a higher priority requester is waiting for permit
    /// acquisition.  Available only if the `evict` flag is enabled.
    ///
    /// Cancel Safety: This function is cancel safe.
    pub fn evicted(&mut self) -> impl Future<Output = ()> {
        waiter::VoidFut(WaiterFlagFut::<_, { waiter::WAITER_FLAG_WANTS_EVICT }>::new(&*self.0))
    }

    /// Remove these permits from the pool of permits in the semaphore.
    ///
    /// With the `semaphore-total` feature this lowers the semaphore's total and
    /// flags queued within-total waiters that can no longer ever be satisfied.
    #[inline]
    pub fn forget(mut self) {
        let mut sem = self.0.semaphore.0.write();
        #[cfg_attr(not(feature = "semaphore-total"), allow(unused))]
        let count = sem.queue.get_by_handle(&self.0.handle).count();

        // SAFETY: the handle is taken exactly once; `self` is forgotten below so
        // no Drop impl will observe the emptied ManuallyDrop.
        #[cfg_attr(not(feature = "semaphore-total"), allow(unused))]
        let (_, maybe_next) = sem
            .queue
            .dequeue(unsafe { ManuallyDrop::take(&mut self.0.handle) });

        // skip Drop: the permits must NOT be returned to `available`
        core::mem::forget(self);

        #[cfg(feature = "semaphore-total")]
        {
            sem.total -= count;

            if let Some(next) = maybe_next {
                sem.downgrade()
                    .notify_oversized_waiters(Some(next.as_ref()));
            }
        }
    }

    #[inline]
    /// The count of permits held
    pub fn permits(&self) -> usize {
        if SemaphorePermitWaiter::<'a, P, Q>::HAS_PURE_LOAD {
            // SAFETY: HAS_PURE_LOAD guarantees LOAD_PURE is Some.
            return unsafe { Q::Handle::LOAD_PURE.unwrap_unchecked()(&self.0.handle).count() };
        }
        // slow path: resolve the handle under the read lock
        let sem = self.0.semaphore.0.read();

        sem.queue.get_by_handle(&self.0.handle).count()
    }

    #[inline]
    /// Checks if `self` belongs to `semaphore`.
    pub fn belongs_to(&self, semaphore: &Semaphore<P, Q>) -> bool {
        core::ptr::eq(self.0.semaphore, semaphore)
    }

    /// Split into multiple permit guards / holder.  The new guard will have the same priority for
    /// evictions.
    ///
    /// `P` must implement [Clone] for this to succeed. If `P` is not [Clone],
    /// [split_with_priority](Self::split_with_priority) may be used instead.
    ///
    /// Errors with [InsufficientPermitsError] unless `self` holds strictly more
    /// than `count` permits (both guards must keep a non-zero count).
    ///
    /// Panics if `count` is zero.
    #[inline]
    pub fn split(&mut self, count: usize) -> Result<Self, InsufficientPermitsError>
    where
        P: Clone,
    {
        assert!(
            count > 0,
            "count must be greater than zero, received {count}"
        );
        let mut sem = self.0.semaphore.0.write();

        let mut priority: Option<P> = None;
        let mut avail = 0;

        sem.queue.update_node(&self.0.handle, |node| {
            avail = node.count();
            // PERF: Try making this likely(avail > count) once likely_unlikely is stable
            // strictly greater: the remainder kept by `self` must stay non-zero
            if avail > count {
                node.count -= count;
                priority = Some(node.priority.clone());
            }

            false
        });

        if priority.is_none() {
            return Err(InsufficientPermitsError {
                total: avail,
                requested: count,
            });
        }

        // enqueue the new guard already holding its permits (Waiter::new(true))
        let handle = sem.queue.enqueue(SemaphoreWaiter {
            priority: priority.unwrap(),
            waiter: Waiter::new(true),
            count,
        });

        Ok(SemaphorePermitWaiter {
            semaphore: self.0.semaphore,
            handle: ManuallyDrop::new(handle),
        }
        .into())
    }

    /// Like [split](Self::split), but the new guard uses the supplied `priority`
    /// instead of cloning the current one.
    ///
    /// Panics if `count` is zero.
    pub fn split_with_priority(
        &mut self,
        count: usize,
        priority: P,
    ) -> Result<Self, InsufficientPermitsError> {
        assert!(
            count > 0,
            "count must be greater than zero, received {count}"
        );
        let mut sem = self.0.semaphore.0.write();

        let mut avail = 0;
        let mut has_capacity = false;

        sem.queue.update_node(&self.0.handle, |node| {
            avail = node.count();
            // PERF: Try making this likely(avail > count) once likely_unlikely is stable
            // strictly greater: the remainder kept by `self` must stay non-zero
            if avail > count {
                node.count -= count;
                has_capacity = true
            }

            false
        });

        if !has_capacity {
            return Err(InsufficientPermitsError {
                total: avail,
                requested: count,
            });
        }

        // enqueue the new guard already holding its permits (Waiter::new(true))
        let handle = sem.queue.enqueue(SemaphoreWaiter {
            priority: priority.into(),
            waiter: Waiter::new(true),
            count,
        });

        Ok(SemaphorePermitWaiter {
            semaphore: self.0.semaphore,
            handle: ManuallyDrop::new(handle),
        }
        .into())
    }

    #[inline]
    /// Merge another permit into this one. Returns an error if the other permit belongs to another
    /// semaphore, or if the sum of their permits would exceed [MAX_PERMITS].
    pub fn merge(&mut self, mut other: Self) -> Result<(), ()> {
        if &raw const *self.0.semaphore != other.0.semaphore {
            return Err(());
        }

        let mut sem = self.0.semaphore.0.write();

        let other_count = sem.queue.get_by_handle(&other.0.handle).count();

        let mut would_overflow = false;
        sem.queue.update_node(&self.0.handle, |node| {
            would_overflow = node.count() + other_count > MAX_PERMITS;
            if !would_overflow {
                node.count += other_count
            }

            false
        });

        // on overflow `other` is returned to the semaphore by its normal Drop
        if would_overflow {
            return Err(());
        }

        // SAFETY: `other` is forgotten immediately after, so its Drop never runs
        // and the emptied ManuallyDrop is never observed.
        let other_handle = unsafe { ManuallyDrop::take(&mut other.0.handle) };
        core::mem::forget(other);

        sem.queue.dequeue(other_handle);

        Ok(())
    }
}
489
impl<'a, P: Priority, Q: SemaphoreQueue<P>> Drop for SemaphorePermit<'a, P, Q> {
    #[inline]
    /// Releases the permits back to the semaphore.
    fn drop(&mut self) {
        let mut sem = self.0.semaphore.0.write();

        // SAFETY: the handle is taken exactly once; `self.0` is ManuallyDrop so
        // the inner guard's Drop never runs to observe the emptied handle.
        let handle = unsafe { ManuallyDrop::take(&mut self.0.handle) };
        let count = sem.queue.get_by_handle(&handle).count();
        let (_, next) = sem.queue.dequeue(handle);
        if cfg!(feature = "semaphore-total") {
            // NOTE(review): the unchecked add here assumes available + count can
            // never exceed usize::MAX while totals are tracked - confirm against
            // the total/forget invariants.
            sem.available += count;
        } else {
            // in terms of panic safety: it's fine to panic here and not activate waiters as we'd
            // fail to add new permits anyways.
            sem.available = match sem.available.checked_add(count) {
                Some(x) => x,
                None => {
                    let avail = sem.available;
                    // drop guard to avoid poisoning it (for std locks)
                    drop(sem);
                    core::panic!(
                        "failed to release {} permits back to semaphore as that would overflow (current available: {})",
                        count,
                        avail
                    );
                }
            }
        }

        sem.activate_waiters(next);
    }
}
522
523impl<'a, P: Priority, Q: SemaphoreQueue<P>> From<SemaphorePermitWaiter<'a, P, Q>>
524    for SemaphorePermit<'a, P, Q>
525{
526    #[inline(always)]
527    fn from(value: SemaphorePermitWaiter<'a, P, Q>) -> Self {
528        Self(ManuallyDrop::new(value))
529    }
530}
531
532impl<'a, P: Priority, Q: SemaphoreQueue<P>> Debug for SemaphorePermit<'a, P, Q> {
533    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
534        f.debug_struct("SemaphorePermit")
535            .field("permits", &self.permits())
536            .finish()
537    }
538}
539
/// An async semaphore where queued waiters are granted permits by order of priority.
///
/// With the `evict` flag enabled, existing handles will be requested to return their permits back
/// to the queue (which may be checked / waited for via [evicted](SemaphorePermit::evicted))
///
/// Note that to minimize waiting, requesters with the same priority will additionally be sorted by
/// permit count (lowest first).  If this isn't desirable, use a [Priority] wrapper like
/// [FIFO] or [LIFO].  This makes it so requesters won't be blocked by larger requests with
/// the same priority.
///
/// The default queue used may change, however its characteristics will remain the same, notably:
/// - If P is [`Send`]` + `[`Sync`] the queue will always be [`Send`]` + `[`Sync`]
#[derive(Default)]
pub struct Semaphore<
    P: Priority,
    #[cfg(any(feature = "arena-queue", feature = "box-queue"))] Q: SemaphoreQueue<P> = DefaultSemaphoreQueue<P>,
    #[cfg(not(any(feature = "arena-queue", feature = "box-queue")))] Q: SemaphoreQueue<P>,
>(RwLock<SemaphoreInner<P, Q>>);
558
// SAFETY: all access to the inner queue is serialized through the RwLock, so
// shared references are sound whenever the queue itself is Send + Sync.
unsafe impl<P: Priority, Q: SemaphoreQueue<P> + Send + Sync> Sync for Semaphore<P, Q> {}
// SAFETY: moving the semaphore moves the lock and the queue it protects.
unsafe impl<P: Priority, Q: SemaphoreQueue<P> + Send> Send for Semaphore<P, Q> {}
561
/// The maximum amount of permits a single holder can have.  This is `usize::MAX >> 1`.
pub const MAX_PERMITS: usize = usize::MAX >> 1;
/// A bit that is set in the count for permits if the `semaphore-total` flag is enabled and the
/// waiter should be evicted if the total permit count for the semaphore is reduced to below
/// the requested permit count.
// this is the same as !MAX_PERMITS, just a bit more verbose.
const WITHIN_TOTAL_BIT: usize = 1 << (usize::BITS - 1);
569
/// Error returned when we lack sufficient permits to perform an operation.
///
/// - With the `semaphore-total` flag: [Semaphore] lacks sufficient permits when using
/// [acquire_within_total](Semaphore::acquire_within_total)
///
/// - When more permits were requested via [SemaphorePermit::split] than the permit contains.
///
/// Contains the requested count of permits via [requested](Self::requested) and optionally the total
/// available when the request failed via [total](Self::total).
///
/// Note that [total](Self::total) may be [None] if the requester was evicted after being queued due to
/// insufficient capacity (at which point, we don't know what the total permits for the queue was)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InsufficientPermitsError {
    // PERF:
    // Ideally we could throw this in an option with a sentinel value and have it retain the size
    // of a usize but this doesn't seem to be possible yet unless the sentinel is zero.
    // (and in our case, a total of zero is very much so valid)
    // Note that while in both contexts this is used, usize::MAX is actually a valid value -
    // but if we had a total of usize::MAX then requested could not be > total.
    //
    // usize::MAX therefore doubles as the "total unknown" sentinel; see `total()`.
    total: usize,
    requested: usize,
}
594
595impl InsufficientPermitsError {
596    /// The total amount of permits which were available when the waiter was rejected
597    /// (or None if we don't know)
598    #[inline(always)]
599    pub fn total(&self) -> Option<usize> {
600        (self.total != usize::MAX).then_some(self.total)
601    }
602
603    /// The amount of permits requested
604    #[inline(always)]
605    pub fn requested(&self) -> usize {
606        self.requested
607    }
608}
609
610impl Display for InsufficientPermitsError {
611    #[inline]
612    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
613        if self.total == usize::MAX {
614            write!(
615                f,
616                "semaphoer lacks sufficient permits: want {}",
617                self.requested
618            )
619        } else {
620            write!(
621                f,
622                "insufficient total permits: have {} want {}",
623                self.total, self.requested
624            )
625        }
626    }
627}
628
629impl Error for InsufficientPermitsError {}
630
631impl<P: Priority, Q: SemaphoreQueue<P>> Debug for Semaphore<P, Q> {
632    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
633        let sem = self.0.read();
634        let mut dbg = f.debug_struct("Semaphore");
635        dbg.field("available", &sem.available);
636
637        #[cfg(feature = "semaphore-total")]
638        dbg.field("total", &sem.total);
639        dbg.finish()
640    }
641}
642
643impl<P: Priority, Q: SemaphoreQueue<P>> Semaphore<P, Q> {
    #[inline]
    /// Create a new semaphore with `capacity` permits available (and, with the
    /// `semaphore-total` feature, a matching total).
    pub fn new(capacity: usize) -> Self {
        Self(RwLock::new(SemaphoreInner {
            queue: Q::default(),
            #[cfg(feature = "semaphore-total")]
            total: capacity,
            available: capacity,
            _phantom: PhantomData,
        }))
    }
655
    #[cfg(feature = "const-default")]
    /// Create a new semaphore with `permits` permits available.  Usable for const if the
    /// underlying queue is [ConstDefault] and the `const-default` feature is enabled.
    ///
    /// All builtin queues impl [ConstDefault].
    pub const fn const_new(permits: usize) -> Self
    where
        Q: ConstDefault,
    {
        Self(RwLock::new(SemaphoreInner {
            queue: Q::DEFAULT,
            #[cfg(feature = "semaphore-total")]
            total: permits,
            available: permits,
            _phantom: PhantomData,
        }))
    }
673
    #[inline]
    /// Core acquisition: enqueues a waiter for `count` permits (the count may
    /// carry WITHIN_TOTAL_BIT in its high bit) and grants them immediately when
    /// permits are available and no equal-or-higher-priority waiter is pending.
    ///
    /// Returns the guard plus whether the permits were granted synchronously.
    fn do_acquire(
        &self,
        inner: &mut SemaphoreInner<P, Q>,
        priority: P,
        count: usize,
    ) -> (SemaphorePermitWaiter<'_, P, Q>, bool) {
        // mask off the flag bit to get the real permit count
        let has_available = inner.available >= (count & !WITHIN_TOTAL_BIT);
        let will_acquire = has_available
            && inner
                .queue
                .iter()
                // skip to first waiter which doesn't hold permits
                .skip_while(|x| {
                    let flags = x.waiter.flags();
                    if flags & waiter::WAITER_FLAG_HAS_LOCK != 0 {
                        // while walking the holder prefix anyways, flag any
                        // lower-priority holders for eviction
                        #[cfg(feature = "evict")]
                        if priority.compare(&x.priority).is_gt() {
                            x.waiter.evict();
                        }
                        return true;
                    }

                    // also skip any evicted flags (as we don't need to wait for those to be
                    // removed from the queue)
                    cfg!(feature = "semaphore-total")
                        && flags & waiter::WAITER_FLAG_WANTS_EVICT == 0
                })
                .next()
                // ge is fine here - if priority is equal, then order doesn't matter; thus we
                // should place first since we can immediately take the permits this way
                .is_none_or(|first_pending| priority.compare(&first_pending.priority).is_ge());

        let handle = inner.queue.enqueue(SemaphoreWaiter {
            priority,
            // the waiter starts out already holding the lock when granted synchronously
            waiter: Waiter::new(will_acquire),
            count,
        });

        let guard = SemaphorePermitWaiter {
            semaphore: self,
            handle: ManuallyDrop::new(handle),
        };

        if will_acquire {
            inner.available -= count & !WITHIN_TOTAL_BIT;
        }

        #[cfg(feature = "evict")]
        // if we had available, then we would have already marked the pieces that need to be
        // evicted as such
        if !has_available {
            let node = inner.queue.get_by_handle(&guard.handle);
            for ex in inner.queue.iter() {
                if ex.waiter.has_lock() {
                    if node.priority.compare(&ex.priority).is_gt() {
                        ex.waiter.evict();
                    }
                }
            }
        }

        return (guard, will_acquire);
    }
740
    /// Acquire a single permit, waiting if necessary.  Permits will be issued by order of
    /// priority.
    ///
    /// Shorthand for [acquire_many](Self::acquire_many) with a count of 1.
    ///
    /// Cancel Safety: This function is cancel safe.
    #[inline]
    pub fn acquire(&self, priority: P) -> impl Future<Output = SemaphorePermit<'_, P, Q>> {
        self.acquire_many(priority, 1)
    }
747
748    /// Acquire a single permit, waiting if necessary.
749    ///
750    /// Shorthand for [`acquire`][Self::acquire]`(priority.into())`.
751    ///
752    /// Cancel Safety: This function is cancel safe.
753    #[inline(always)]
754    pub fn acquire_from(
755        &self,
756        priority: impl Into<P>,
757    ) -> impl Future<Output = SemaphorePermit<'_, P, Q>> {
758        self.acquire(priority.into())
759    }
760
761    #[inline]
762    /// Acquire a permit with the [Default] priority.
763    ///
764    /// Shorthand for [acquire](Self::acquire) with [`Default::default()`].
765    ///
766    /// Cancel Safety: This function is cancel safe.
767    pub fn acquire_default(&self) -> impl Future<Output = SemaphorePermit<'_, P, Q>>
768    where
769        P: Default,
770    {
771        self.acquire_many(Default::default(), 1)
772    }
773
774    /// Acquire multiple permits, waiting if necessary.  Permits will be issued by order of
775    /// priority.  Lower priority waiters will be blocked until all requested permits are acquired
776    /// (and subsequently released).
777    ///
778    /// Panics if `count >= MAX_PERMITS` (`usize::MAX >> 1`).
779    ///
780    /// Cancel Safety: This function is cancel safe.
781    pub async fn acquire_many(&self, priority: P, count: usize) -> SemaphorePermit<'_, P, Q> {
782        assert!(
783            count < MAX_PERMITS,
784            "count for a single holder must be less than {} and not zero (received {})",
785            MAX_PERMITS,
786            count
787        );
788        // need to be a bit explicit here since rust won't realize that we dropped guard otherwise
789        let guard = {
790            let mut inner = self.0.write();
791            let (guard, did_acquire) = self.do_acquire(&mut inner, priority.into(), count);
792
793            if did_acquire {
794                return guard.into();
795            }
796            guard
797        };
798
799        WaiterFlagFut::<_, { waiter::WAITER_FLAG_HAS_LOCK }>::new(&guard).await;
800
801        guard.into()
802    }
803
804    /// Acquire multiple permits with the [Default] priority.
805    ///
806    /// Panics if `count >= MAX_PERMITS` (`usize::MAX >> 1`).
807    ///
808    /// Shorthand for [acquire_many](Self::acquire_many) with [`Default::default()`].
809    ///
810    /// Cancel Safety: This function is cancel safe.
811    #[inline(always)]
812    pub async fn acquire_many_default(
813        &self,
814        count: usize,
815    ) -> impl Future<Output = SemaphorePermit<'_, P, Q>>
816    where
817        P: Default,
818    {
819        self.acquire_many(Default::default(), count)
820    }
821
822    /// Acquire multiple permits with the .
823    ///
824    /// Panics if `count >= MAX_PERMITS` (`usize::MAX >> 1`).
825    ///
826    /// Shorthand for [acquire_many](Self::acquire_many) with `priority.into()`.
827    ///
828    /// Cancel Safety: This function is cancel safe.
829    #[inline(always)]
830    pub async fn acquire_many_from(
831        &self,
832        count: usize,
833        priority: impl Into<P>,
834    ) -> impl Future<Output = SemaphorePermit<'_, P, Q>>
835    where
836        P: Default,
837    {
838        self.acquire_many(priority.into(), count)
839    }
840
    #[cfg(feature = "semaphore-total")]
    /// Acquire `count` permits without blocking the queue if the requested count of permits is
    /// more than the total associated.  If the semaphore lacks sufficient associated permits or loses
    /// them while waiting, this returns an [InsufficientPermitsError].
    ///
    /// Example:
    /// ```rust,ignore
    /// let sem = Semaphore::<usize>::new(10);
    ///
    /// let permit = sem.acquire(0).await;
    /// // try to acquire 10 permits
    /// let mut many_permits_fut = pin!(sem.acquire_within_total(1, 10));
    /// tokio::select! {
    ///     _ = tokio::time::sleep(Duration::from_secs(1)) => {},
    ///     // can't happen
    ///     _ =  many_permits_fut.as_mut() => { panic!("total of 11 tokens held")},
    /// };
    ///
    /// // remove 1 permit from the semaphore
    /// permit.forget();
    ///
    /// assert!(many_permits_fut.await.is_err());
    /// ```
    ///
    /// Cancel Safety: This function is cancel safe.
    pub async fn acquire_within_total(
        &self,
        priority: P,
        count: usize,
    ) -> Result<SemaphorePermit<'_, P, Q>, InsufficientPermitsError> {
        assert!(
            count < MAX_PERMITS,
            "count for a single holder must be less than {} and not zero (received {})",
            MAX_PERMITS,
            count
        );
        // Scope the write lock so it is guaranteed to be released before we await below.
        let guard = {
            let mut inner = self.0.write();
            // Fail fast: the request can never succeed if it already exceeds the total.
            if inner.total < count {
                return Err(InsufficientPermitsError {
                    total: inner.total,
                    requested: count,
                });
            }

            // Tag the request with WITHIN_TOTAL_BIT so the queue can distinguish
            // within-total waiters — presumably so they can be evicted (see the
            // WANTS_EVICT handling below) if the total later drops below `count`.
            let (guard, did_acquire) =
                self.do_acquire(&mut inner, priority.into(), count | WITHIN_TOTAL_BIT);

            if did_acquire {
                return Ok(guard.into());
            }

            guard
        };

        // Wait until we either receive the permits (HAS_LOCK) or get evicted (WANTS_EVICT).
        let flags = WaiterFlagFut::<
            _,
            { waiter::WAITER_FLAG_HAS_LOCK | waiter::WAITER_FLAG_WANTS_EVICT },
        >::new(&guard)
        .await;

        if flags & waiter::WAITER_FLAG_HAS_LOCK == 0 {
            // we were evicted before we got the lock (due to running out of capacity)
            // So we need to return an error
            // NOTE(review): `total` is set to `usize::MAX` as a sentinel — the real total
            // isn't re-read after eviction; confirm callers treat this field accordingly.
            return Err(InsufficientPermitsError {
                total: usize::MAX,
                requested: count,
            });
        }

        // Since we hold no locks, it's possible that we've been evicted at this point - but since
        // we already have the permits, that would mean it was evicted afterwards (so it's up to
        // receiver to decide whether to check for eviction)
        Ok(guard.into())
    }
916
    #[inline]
    /// Add permits to the semaphore, returning the new count of available permits.
    ///
    /// Panics if:
    /// - The count of permits to add would overflow available
    /// - (if the `semaphore-total` feature is enabled) The count of permits plus the current total
    ///   would overflow.
    ///
    /// Use [try_add_permits][Self::try_add_permits] if overflows may occur.
    ///
    /// Note that if the `semaphore-total` feature isn't enabled, the sum of permits issued + permits
    /// available can exceed the max usize and overflow.  If an owned permit is dropped and would cause an
    /// overflow when adding held permits back to available, it will panic.  The semaphore itself
    /// will remain usable, however the permits will be discarded.
    ///
    /// For compatibility, it's best to assume that it's unsafe to exceed `usize::MAX` permits
    /// associated with a semaphore, even if the `semaphore-total` flag isn't enabled (as other
    /// crates may require it) (associated permits being available + sum of permits issued)
    pub fn add_permits(&self, count: usize) -> usize {
        self.try_add_permits(count).expect("must add permits")
    }
938
    #[cfg(feature = "semaphore-total")]
    #[inline]
    /// Returns the capacity / total count of permits associated with the semaphore.
    pub fn total_permits(&self) -> usize {
        self.0.read().total
    }
945
946    /// Returns the amount of currently available permits.
947    #[inline]
948    pub fn available_permits(&self) -> usize {
949        self.0.read().available
950    }
951
    #[inline]
    /// Try adding permits, returning an error if adding the permits would overflow. See the notes
    /// on [add_permits](Self::add_permits) for more details.
    ///
    /// On success, returns the new count of available permits.
    pub fn try_add_permits(&self, count: usize) -> Result<usize, ()> {
        let mut inner = self.0.write();

        #[cfg(feature = "semaphore-total")]
        {
            // Check overflow against the total first; if this succeeds, the bump of
            // `available` below cannot overflow either.
            inner.total = inner.total.checked_add(count).ok_or(())?;
            // available must be <= total, so `available + count <= new total <= usize::MAX`
            inner.available += count
        }
        #[cfg(not(feature = "semaphore-total"))]
        {
            inner.available = inner.available.checked_add(count).ok_or(())?;
        }

        // Wake queued waiters that can now be satisfied by the newly added permits.
        let head = inner.queue.head_handle();
        inner.activate_waiters(head);

        Ok(inner.available)
    }
974
    /// Forget up to count permits, up to the currently available amount.  Returns the amount of
    /// permits forgotten.
    ///
    /// If `n` permits *need* to be removed, calling `acquire_many` with the highest priority then
    /// calling forget on the returned guard may be a better choice.
    ///
    /// (if the `semaphore-total` flag is enabled, `acquire_within_total` may be a safer choice
    /// than `acquire_many` - if it's possible for any permits to be forgotten elsewhere)
    #[inline]
    pub fn forget_permits(&self, mut count: usize) -> usize {
        let mut inner = self.0.write();

        // Clamp to what is actually available; currently-held permits are untouched.
        count = count.min(inner.available);

        inner.available -= count;

        #[cfg(feature = "semaphore-total")]
        if count != 0 {
            // SAFETY: `count` was clamped to `<= available` above, and the semaphore
            // maintains `available <= total` (see `try_add_permits`), so `total >= count`.
            unsafe { core::hint::assert_unchecked(inner.total >= count) };
            inner.total -= count;

            // The total shrank, so waiters queued via `acquire_within_total` may now be
            // requesting more than the semaphore will ever hold; notify them so they can
            // fail out instead of waiting forever.
            inner.downgrade().notify_oversized_waiters(None);
        }

        count
    }
1001}
1002
#[cfg(feature = "const-default")]
/// Provides a compile-time default [Semaphore] (usable in `const`/`static` contexts)
/// whenever the queue type `Q` also has a [ConstDefault] value.
impl<P: Priority, Q: ConstDefault + SemaphoreQueue<P>> ConstDefault for Semaphore<P, Q> {
    const DEFAULT: Self = Self(RwLock::new(ConstDefault::DEFAULT));
}