priority_semaphore/
semaphore.rs1use crate::lock::Lock;
4use crate::{error::*, permit::Permit, queue::WaitQueue, waiter::AcquireFuture};
5use alloc::sync::Arc;
6use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
7
/// Priority value attached to an acquire request.
///
/// This alias only fixes the representation (`i32`). How two priorities are
/// ordered against each other is decided by the wait queue, not here.
pub type Priority = i32;
12
13pub struct PrioritySemaphore {
15 permits: AtomicUsize,
16 pub(crate) waiters: Lock<WaitQueue>,
17 max_permit: usize,
18 pub(crate) closed: AtomicBool,
19}
20
21impl core::fmt::Debug for PrioritySemaphore {
22 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
23 f.debug_struct("PrioritySemaphore")
24 .field("available", &self.available_permits())
25 .field("queued", &self.queued())
26 .field("closed", &self.closed.load(Ordering::Acquire))
27 .finish()
28 }
29}
30
31impl PrioritySemaphore {
32 pub const fn new(permits: usize) -> Self {
34 Self {
35 permits: AtomicUsize::new(permits),
36 waiters: Lock::new(WaitQueue::new()),
37 max_permit: permits,
38 closed: AtomicBool::new(false),
39 }
40 }
41
42 pub fn acquire(self: &Arc<Self>, prio: Priority) -> AcquireFuture {
44 AcquireFuture {
45 root: self.clone(),
46 prio,
47 in_queue: false,
48 wait_id: None,
49 }
50 }
51
52 pub fn try_acquire(self: &Arc<Self>, _prio: Priority) -> Result<Permit, TryAcquireError> {
54 if self.closed.load(Ordering::Acquire) {
55 return Err(TryAcquireError::Closed);
56 }
57 let mut curr = self.permits.load(Ordering::Acquire);
58 loop {
59 if curr == 0 {
60 return Err(TryAcquireError::NoPermits);
61 }
62 match self.permits.compare_exchange_weak(
63 curr,
64 curr - 1,
65 Ordering::AcqRel,
66 Ordering::Acquire,
67 ) {
68 Ok(_) => return Ok(Permit::new(self.clone())),
69 Err(actual) => curr = actual,
70 }
71 }
72 }
73
74 pub fn close(&self) {
76 if self.closed.swap(true, Ordering::AcqRel) {
77 return;
78 }
79 let mut waiters = self.waiters.lock();
80 while let Some(entry) = waiters.pop() {
81 entry.waker.wake();
82 }
83 }
84
85 pub fn available_permits(&self) -> usize {
87 self.permits.load(Ordering::Acquire)
88 }
89
90 pub fn queued(&self) -> usize {
92 self.waiters.lock().len()
93 }
94
95 pub(crate) fn dispatch_next(&self) {
97 let closed = self.closed.load(Ordering::Acquire);
98 let mut waiters = self.waiters.lock();
99
100 if !closed {
101 if let Some(entry) = waiters.pop() {
102 entry.waker.wake();
103 return;
104 }
105 }
106
107 let prev = self.permits.fetch_add(1, Ordering::AcqRel);
108 if prev >= self.max_permit {
109 self.permits.store(self.max_permit, Ordering::Release);
110 }
111 }
112
113 pub(crate) fn remove_waiter(&self, id: usize) {
114 let mut waiters = self.waiters.lock();
115 waiters.remove(id);
116 }
117}