priority_semaphore/
semaphore.rs

1//! Core implementation of [`PrioritySemaphore`].
2
3use crate::lock::Lock;
4use crate::{error::*, permit::Permit, queue::WaitQueue, waiter::AcquireFuture};
5use alloc::sync::Arc;
6use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
7
/// Priority attached to an acquire request.
///
/// This is a plain signed 32-bit integer; a numerically larger value means a
/// higher priority, so negative values rank below zero and positive ones.
pub type Priority = i32;
12
13/// Async-aware priority semaphore.
14pub struct PrioritySemaphore {
15    permits: AtomicUsize,
16    pub(crate) waiters: Lock<WaitQueue>,
17    max_permit: usize,
18    pub(crate) closed: AtomicBool,
19}
20
21impl core::fmt::Debug for PrioritySemaphore {
22    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
23        f.debug_struct("PrioritySemaphore")
24            .field("available", &self.available_permits())
25            .field("queued", &self.queued())
26            .field("closed", &self.closed.load(Ordering::Acquire))
27            .finish()
28    }
29}
30
31impl PrioritySemaphore {
32    /// Create new semaphore with given maximum concurrent permits.
33    pub const fn new(permits: usize) -> Self {
34        Self {
35            permits: AtomicUsize::new(permits),
36            waiters: Lock::new(WaitQueue::new()),
37            max_permit: permits,
38            closed: AtomicBool::new(false),
39        }
40    }
41
42    /// Async acquire (cancellation-safe).
43    pub fn acquire(self: &Arc<Self>, prio: Priority) -> AcquireFuture {
44        AcquireFuture {
45            root: self.clone(),
46            prio,
47            in_queue: false,
48            wait_id: None,
49        }
50    }
51
52    /// Try immediate acquire.
53    pub fn try_acquire(self: &Arc<Self>, _prio: Priority) -> Result<Permit, TryAcquireError> {
54        if self.closed.load(Ordering::Acquire) {
55            return Err(TryAcquireError::Closed);
56        }
57        let mut curr = self.permits.load(Ordering::Acquire);
58        loop {
59            if curr == 0 {
60                return Err(TryAcquireError::NoPermits);
61            }
62            match self.permits.compare_exchange_weak(
63                curr,
64                curr - 1,
65                Ordering::AcqRel,
66                Ordering::Acquire,
67            ) {
68                Ok(_) => return Ok(Permit::new(self.clone())),
69                Err(actual) => curr = actual,
70            }
71        }
72    }
73
74    /// Close the semaphore: further acquires fail.
75    pub fn close(&self) {
76        if self.closed.swap(true, Ordering::AcqRel) {
77            return;
78        }
79        let mut waiters = self.waiters.lock();
80        while let Some(entry) = waiters.pop() {
81            entry.waker.wake();
82        }
83    }
84
85    /// Number of currently available permits.
86    pub fn available_permits(&self) -> usize {
87        self.permits.load(Ordering::Acquire)
88    }
89
90    /// Number of tasks waiting in the queue.
91    pub fn queued(&self) -> usize {
92        self.waiters.lock().len()
93    }
94
95    /// (internal) Called when a permit is returned.
96    pub(crate) fn dispatch_next(&self) {
97        let closed = self.closed.load(Ordering::Acquire);
98        let mut waiters = self.waiters.lock();
99
100        if !closed {
101            if let Some(entry) = waiters.pop() {
102                entry.waker.wake();
103                return;
104            }
105        }
106
107        let prev = self.permits.fetch_add(1, Ordering::AcqRel);
108        if prev >= self.max_permit {
109            self.permits.store(self.max_permit, Ordering::Release);
110        }
111    }
112
113    pub(crate) fn remove_waiter(&self, id: usize) {
114        let mut waiters = self.waiters.lock();
115        waiters.remove(id);
116    }
117}