ranked_semaphore/semaphore/core.rs
1use crate::config::{PriorityConfig, QueueStrategy};
2use crate::wait_queue::queue::WaitQueue;
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::sync::Mutex;
5
/// A priority-aware semaphore for controlling access to shared resources.
///
/// `RankedSemaphore` maintains a count of available permits and allows tasks to
/// acquire permits with different priorities. Higher priority tasks are served
/// before lower priority tasks when permits become available.
///
/// The semaphore supports:
/// - Priority-based scheduling with configurable queue strategies
/// - Multiple permit acquisition
/// - Both borrowed and owned permit types
/// - Graceful shutdown via the `close()` method
///
/// # Examples
///
/// ```rust
/// use ranked_semaphore::RankedSemaphore;
/// use std::sync::Arc;
///
/// # #[tokio::main]
/// # async fn main() {
/// let sem = Arc::new(RankedSemaphore::new_fifo(2));
///
/// // High priority task
/// let high_permit = sem.acquire_with_priority(10).await.unwrap();
///
/// // A second permit is still available, so this also succeeds immediately;
/// // once all permits are taken, further acquires wait in priority order.
/// let low_permit = sem.acquire_with_priority(1).await.unwrap();
/// # }
/// ```
#[derive(Debug)]
pub struct RankedSemaphore {
    /// Packed semaphore state: the permit count shifted left by
    /// `PERMIT_SHIFT`, with the low bit used as the closed flag.
    /// Bit layout: `[permit_count << 1 | closed_flag]`.
    /// Packing both into one word lets a single atomic load observe the
    /// count and the closed flag together.
    pub(crate) permits: AtomicUsize,
    /// Priority-ordered wait queue; the mutex is only taken when tasks
    /// must wait or be woken.
    pub(crate) waiters: Mutex<WaitQueue>,
}
44
45impl RankedSemaphore {
    /// Maximum number of permits the semaphore can hold.
    ///
    /// The low bits of the `permits` word are reserved for status flags
    /// (3 bits, same as tokio), so the usable count is `usize::MAX >> 3`.
    pub const MAX_PERMITS: usize = usize::MAX >> 3;

    /// Low-bit flag set when the semaphore has been closed (aligned with tokio).
    pub(crate) const CLOSED: usize = 1;
    /// Number of low flag bits to shift past when reading the permit count.
    pub(crate) const PERMIT_SHIFT: usize = 1;
52
53 /// Creates a new semaphore with FIFO (First In, First Out) queue strategy.
54 ///
55 /// All waiters regardless of priority will be served in the order they arrive.
56 /// This is the most common queue strategy for fair resource allocation.
57 ///
58 /// # Arguments
59 ///
60 /// * `permits` - The initial number of permits available
61 ///
62 /// # Panics
63 ///
64 /// Panics if `permits` exceeds `MAX_PERMITS` (usize::MAX >> 3).
65 ///
66 /// # Examples
67 ///
68 /// ```rust
69 /// use ranked_semaphore::RankedSemaphore;
70 ///
71 /// let sem = RankedSemaphore::new_fifo(3);
72 /// assert_eq!(sem.available_permits(), 3);
73 /// ```
74 pub fn new_fifo(permits: usize) -> Self {
75 if permits > Self::MAX_PERMITS {
76 panic!("permits exceed MAX_PERMITS");
77 }
78 Self::new(permits, QueueStrategy::Fifo)
79 }
80
81 /// Creates a new semaphore with LIFO (Last In, First Out) queue strategy.
82 ///
83 /// All waiters regardless of priority will be served in reverse order of arrival.
84 /// This can be useful for scenarios where you want to prioritize recently arrived tasks.
85 ///
86 /// # Arguments
87 ///
88 /// * `permits` - The initial number of permits available
89 ///
90 /// # Panics
91 ///
92 /// Panics if `permits` exceeds `MAX_PERMITS` (usize::MAX >> 3).
93 ///
94 /// # Examples
95 ///
96 /// ```rust
97 /// use ranked_semaphore::RankedSemaphore;
98 ///
99 /// let sem = RankedSemaphore::new_lifo(3);
100 /// assert_eq!(sem.available_permits(), 3);
101 /// ```
102 pub fn new_lifo(permits: usize) -> Self {
103 if permits > Self::MAX_PERMITS {
104 panic!("permits exceed MAX_PERMITS");
105 }
106 Self::new(permits, QueueStrategy::Lifo)
107 }
108
109 /// Creates a new semaphore with the specified default queue strategy.
110 ///
111 /// All waiters will use the specified queue strategy regardless of their priority.
112 /// For more fine-grained control over queue strategies per priority level,
113 /// use `new_with_config()`.
114 ///
115 /// # Arguments
116 ///
117 /// * `permits` - The initial number of permits available
118 /// * `default_strategy` - The queue strategy to use for all waiters
119 ///
120 /// # Panics
121 ///
122 /// Panics if `permits` exceeds `MAX_PERMITS` (usize::MAX >> 3).
123 ///
124 /// # Examples
125 ///
126 /// ```rust
127 /// use ranked_semaphore::{RankedSemaphore, QueueStrategy};
128 ///
129 /// let sem = RankedSemaphore::new(3, QueueStrategy::Fifo);
130 /// assert_eq!(sem.available_permits(), 3);
131 /// ```
132 pub fn new(permits: usize, default_strategy: QueueStrategy) -> Self {
133 if permits > Self::MAX_PERMITS {
134 panic!("permits exceed MAX_PERMITS");
135 }
136 let config = PriorityConfig::new().default_strategy(default_strategy);
137 Self::new_with_config(permits, config)
138 }
139
140 /// Creates a new semaphore with custom priority-based configuration.
141 ///
142 /// This allows fine-grained control over queue strategies for different
143 /// priority levels. Different priorities can use different queue strategies
144 /// (FIFO or LIFO) based on the configuration rules.
145 ///
146 /// # Arguments
147 ///
148 /// * `permits` - The initial number of permits available
149 /// * `config` - The priority configuration specifying queue strategies
150 ///
151 /// # Panics
152 ///
153 /// Panics if `permits` exceeds `MAX_PERMITS` (usize::MAX >> 3).
154 ///
155 /// # Examples
156 ///
157 /// ```rust
158 /// use ranked_semaphore::{RankedSemaphore, PriorityConfig, QueueStrategy};
159 ///
160 /// let config = PriorityConfig::new()
161 /// .default_strategy(QueueStrategy::Fifo)
162 /// .exact(10, QueueStrategy::Lifo); // Priority 10 uses LIFO
163 ///
164 /// let sem = RankedSemaphore::new_with_config(3, config);
165 /// assert_eq!(sem.available_permits(), 3);
166 /// ```
167 pub fn new_with_config(permits: usize, config: PriorityConfig) -> Self {
168 if permits > Self::MAX_PERMITS {
169 panic!("permits exceed MAX_PERMITS");
170 }
171 Self {
172 permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT),
173 waiters: Mutex::new(WaitQueue::new(config)),
174 }
175 }
176
177 /// Returns the current number of available permits.
178 ///
179 /// This value represents permits that can be acquired immediately without waiting.
180 /// Note that this value can change rapidly in concurrent environments.
181 ///
182 /// # Examples
183 ///
184 /// ```rust
185 /// use ranked_semaphore::RankedSemaphore;
186 ///
187 /// let sem = RankedSemaphore::new_fifo(3);
188 /// assert_eq!(sem.available_permits(), 3);
189 ///
190 /// let _permit = sem.try_acquire().unwrap();
191 /// assert_eq!(sem.available_permits(), 2);
192 /// ```
193 pub fn available_permits(&self) -> usize {
194 self.permits.load(Ordering::Acquire) >> Self::PERMIT_SHIFT
195 }
196
197 /// Returns `true` if the semaphore has been closed.
198 ///
199 /// A closed semaphore will not issue new permits and all pending
200 /// acquire operations will return `AcquireError::Closed`.
201 ///
202 /// # Examples
203 ///
204 /// ```rust
205 /// use ranked_semaphore::RankedSemaphore;
206 ///
207 /// let sem = RankedSemaphore::new_fifo(3);
208 /// assert!(!sem.is_closed());
209 ///
210 /// sem.close();
211 /// assert!(sem.is_closed());
212 /// ```
213 pub fn is_closed(&self) -> bool {
214 self.permits.load(Ordering::Acquire) & Self::CLOSED == Self::CLOSED
215 }
216
217 /// Adds permits to the semaphore and notifies waiting tasks.
218 ///
219 /// If there are tasks waiting for permits, they will be notified in priority order.
220 /// Any excess permits (beyond what waiting tasks need) are added to the
221 /// semaphore's available permit count.
222 ///
223 /// # Arguments
224 ///
225 /// * `added` - The number of permits to add
226 ///
227 /// # Examples
228 ///
229 /// ```rust
230 /// use ranked_semaphore::RankedSemaphore;
231 ///
232 /// let sem = RankedSemaphore::new_fifo(1);
233 /// let _permit = sem.try_acquire().unwrap();
234 /// assert_eq!(sem.available_permits(), 0);
235 ///
236 /// sem.add_permits(2);
237 /// assert_eq!(sem.available_permits(), 2);
238 /// ```
239 pub fn add_permits(&self, added: usize) {
240 if added == 0 {
241 return;
242 }
243
244 // Assign permits to the wait queue, following tokio's approach
245 self.add_permits_locked(added, self.waiters.lock().unwrap());
246 }
247
248 /// Add permits to the semaphore and wake eligible waiters.
249 ///
250 /// If `rem` exceeds the number of permits needed by waiting tasks, the
251 /// remainder are returned to the semaphore's available permit count.
252 ///
253 /// This implementation processes waiters in batches to minimize thundering herd
254 /// effects and reduce lock contention.
255 pub(crate) fn add_permits_locked(
256 &self,
257 mut rem: usize,
258 waiters: std::sync::MutexGuard<'_, crate::wait_queue::queue::WaitQueue>,
259 ) {
260 let mut lock = Some(waiters);
261
262 // Process waiters in batches like tokio to prevent thundering herd
263 while rem > 0 {
264 let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock().unwrap());
265
266 // Check if queue is empty
267 if waiters.is_empty() {
268 drop(waiters);
269 break;
270 }
271
272 // Select waiters to notify in this batch
273 let (wake_list, permits_assigned) = waiters.select_waiters_to_notify(rem);
274 rem -= permits_assigned;
275
276 // If we couldn't assign any permits, we're done
277 if permits_assigned == 0 || wake_list.is_empty() {
278 drop(waiters);
279 break;
280 }
281
282 // Drop the lock before waking to reduce lock contention
283 drop(waiters);
284
285 // **Critical check**: if the semaphore was closed while we didn't have the lock,
286 // we must not wake up the waiters. They will be woken by the `close()` call.
287 if self.is_closed() {
288 // Return the permits that were assigned to waiters back to the semaphore
289 // since we won't wake them due to the semaphore being closed.
290 self.permits
291 .fetch_add(permits_assigned << Self::PERMIT_SHIFT, Ordering::Release);
292 return;
293 }
294
295 // Wake this batch of waiters
296 let mut wake_list = wake_list;
297 wake_list.wake_all();
298
299 // If WakeList was not at capacity, we processed all available waiters
300 if !wake_list.was_full() {
301 break;
302 }
303 }
304
305 // Add any remaining permits back to the semaphore
306 if rem > 0 {
307 let prev = self
308 .permits
309 .fetch_add(rem << Self::PERMIT_SHIFT, Ordering::Release);
310 let prev_permits = prev >> Self::PERMIT_SHIFT;
311
312 // Check for overflow after the operation
313 if prev_permits + rem > Self::MAX_PERMITS {
314 panic!(
315 "number of added permits ({}) would overflow MAX_PERMITS ({})",
316 rem,
317 Self::MAX_PERMITS
318 );
319 }
320 }
321 }
322
323 /// Closes the semaphore and cancels all pending waiters.
324 ///
325 /// After calling this method:
326 /// - No new permits will be issued
327 /// - All pending `acquire` operations will return `AcquireError::Closed`
328 /// - All `try_acquire` operations will return `TryAcquireError::Closed`
329 ///
330 /// This operation is irreversible - once closed, a semaphore cannot be reopened.
331 ///
332 /// # Examples
333 ///
334 /// ```rust
335 /// use ranked_semaphore::RankedSemaphore;
336 ///
337 /// let sem = RankedSemaphore::new_fifo(3);
338 /// sem.close();
339 ///
340 /// assert!(sem.is_closed());
341 /// assert!(sem.try_acquire().is_err());
342 /// ```
343 pub fn close(&self) {
344 self.permits.fetch_or(Self::CLOSED, Ordering::Release);
345
346 let mut waiters = self.waiters.lock().unwrap();
347 waiters.close();
348 }
349
350 /// Permanently removes permits from the semaphore.
351 ///
352 /// This operation reduces the number of available permits without releasing
353 /// them back to the semaphore. It's useful for scenarios where the resource
354 /// pool needs to be dynamically reduced.
355 ///
356 /// # Arguments
357 ///
358 /// * `n` - The number of permits to remove
359 ///
360 /// # Returns
361 ///
362 /// The number of permits that were actually removed. This may be less than
363 /// `n` if there were insufficient permits available.
364 ///
365 /// # Examples
366 ///
367 /// ```rust
368 /// use ranked_semaphore::RankedSemaphore;
369 ///
370 /// let sem = RankedSemaphore::new_fifo(5);
371 /// assert_eq!(sem.available_permits(), 5);
372 ///
373 /// let removed = sem.forget_permits(3);
374 /// assert_eq!(removed, 3);
375 /// assert_eq!(sem.available_permits(), 2);
376 ///
377 /// // Trying to remove more permits than available
378 /// let removed = sem.forget_permits(10);
379 /// assert_eq!(removed, 2); // Only 2 were available
380 /// assert_eq!(sem.available_permits(), 0);
381 /// ```
382 ///
383 /// # Warning
384 ///
385 /// Use this method with caution. Removing too many permits can lead to
386 /// resource starvation if tasks are waiting for permits that will never
387 /// become available.
388 pub fn forget_permits(&self, n: usize) -> usize {
389 if n == 0 {
390 return 0;
391 }
392
393 let mut curr_bits = self.permits.load(Ordering::Acquire);
394 loop {
395 let curr_permits = curr_bits >> Self::PERMIT_SHIFT;
396 let removed = curr_permits.min(n);
397 let new_permits = curr_permits - removed;
398 let new_bits = (new_permits << Self::PERMIT_SHIFT) | (curr_bits & Self::CLOSED);
399
400 match self.permits.compare_exchange_weak(
401 curr_bits,
402 new_bits,
403 Ordering::AcqRel,
404 Ordering::Acquire,
405 ) {
406 Ok(_) => return removed,
407 Err(actual) => curr_bits = actual,
408 }
409 }
410 }
411}