ranked_semaphore/semaphore/
core.rs

1use crate::config::{PriorityConfig, QueueStrategy};
2use crate::wait_queue::queue::WaitQueue;
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::sync::Mutex;
5
6/// A priority-aware semaphore for controlling access to shared resources.
7///
8/// RankedSemaphore maintains a count of available permits and allows tasks to
9/// acquire permits with different priorities. Higher priority tasks are served
10/// before lower priority tasks when permits become available.
11///
12/// The semaphore supports:
13/// - Priority-based scheduling with configurable queue strategies
14/// - Multiple permit acquisition
15/// - Both borrowed and owned permit types
16/// - Graceful shutdown via the `close()` method
17///
18/// # Examples
19///
20/// ```rust
21/// use ranked_semaphore::RankedSemaphore;
22/// use std::sync::Arc;
23///
24/// # #[tokio::main]
25/// # async fn main() {
26/// let sem = Arc::new(RankedSemaphore::new_fifo(2));
27///
28/// // High priority task
29/// let high_permit = sem.acquire_with_priority(10).await.unwrap();
30///
31/// // Low priority task will wait
32/// let low_permit = sem.acquire_with_priority(1).await.unwrap();
33/// # }
34/// ```
35#[derive(Debug)]
36pub struct RankedSemaphore {
37    /// Combined permit count and status flags using bit operations
38    /// Bit layout: [permit_count << 1 | closed_flag]
39    /// This optimization reduces atomic operations by 50% compared to separate fields
40    pub(crate) permits: AtomicUsize,
41    /// Wait queue (used only when needed)
42    pub(crate) waiters: Mutex<WaitQueue>,
43}
44
45impl RankedSemaphore {
46    /// Maximum permits (reserve 3 bits for flags, same as tokio)
47    pub const MAX_PERMITS: usize = usize::MAX >> 3;
48
49    /// Bit flag constants for state encoding (aligned with tokio)
50    pub(crate) const CLOSED: usize = 1;
51    pub(crate) const PERMIT_SHIFT: usize = 1;
52
53    /// Creates a new semaphore with FIFO (First In, First Out) queue strategy.
54    ///
55    /// All waiters regardless of priority will be served in the order they arrive.
56    /// This is the most common queue strategy for fair resource allocation.
57    ///
58    /// # Arguments
59    ///
60    /// * `permits` - The initial number of permits available
61    ///
62    /// # Panics
63    ///
64    /// Panics if `permits` exceeds `MAX_PERMITS` (usize::MAX >> 3).
65    ///
66    /// # Examples
67    ///
68    /// ```rust
69    /// use ranked_semaphore::RankedSemaphore;
70    ///
71    /// let sem = RankedSemaphore::new_fifo(3);
72    /// assert_eq!(sem.available_permits(), 3);
73    /// ```
74    pub fn new_fifo(permits: usize) -> Self {
75        if permits > Self::MAX_PERMITS {
76            panic!("permits exceed MAX_PERMITS");
77        }
78        Self::new(permits, QueueStrategy::Fifo)
79    }
80
81    /// Creates a new semaphore with LIFO (Last In, First Out) queue strategy.
82    ///
83    /// All waiters regardless of priority will be served in reverse order of arrival.
84    /// This can be useful for scenarios where you want to prioritize recently arrived tasks.
85    ///
86    /// # Arguments
87    ///
88    /// * `permits` - The initial number of permits available
89    ///
90    /// # Panics
91    ///
92    /// Panics if `permits` exceeds `MAX_PERMITS` (usize::MAX >> 3).
93    ///
94    /// # Examples
95    ///
96    /// ```rust
97    /// use ranked_semaphore::RankedSemaphore;
98    ///
99    /// let sem = RankedSemaphore::new_lifo(3);
100    /// assert_eq!(sem.available_permits(), 3);
101    /// ```
102    pub fn new_lifo(permits: usize) -> Self {
103        if permits > Self::MAX_PERMITS {
104            panic!("permits exceed MAX_PERMITS");
105        }
106        Self::new(permits, QueueStrategy::Lifo)
107    }
108
109    /// Creates a new semaphore with the specified default queue strategy.
110    ///
111    /// All waiters will use the specified queue strategy regardless of their priority.
112    /// For more fine-grained control over queue strategies per priority level,
113    /// use `new_with_config()`.
114    ///
115    /// # Arguments
116    ///
117    /// * `permits` - The initial number of permits available
118    /// * `default_strategy` - The queue strategy to use for all waiters
119    ///
120    /// # Panics
121    ///
122    /// Panics if `permits` exceeds `MAX_PERMITS` (usize::MAX >> 3).
123    ///
124    /// # Examples
125    ///
126    /// ```rust
127    /// use ranked_semaphore::{RankedSemaphore, QueueStrategy};
128    ///
129    /// let sem = RankedSemaphore::new(3, QueueStrategy::Fifo);
130    /// assert_eq!(sem.available_permits(), 3);
131    /// ```
132    pub fn new(permits: usize, default_strategy: QueueStrategy) -> Self {
133        if permits > Self::MAX_PERMITS {
134            panic!("permits exceed MAX_PERMITS");
135        }
136        let config = PriorityConfig::new().default_strategy(default_strategy);
137        Self::new_with_config(permits, config)
138    }
139
140    /// Creates a new semaphore with custom priority-based configuration.
141    ///
142    /// This allows fine-grained control over queue strategies for different
143    /// priority levels. Different priorities can use different queue strategies
144    /// (FIFO or LIFO) based on the configuration rules.
145    ///
146    /// # Arguments
147    ///
148    /// * `permits` - The initial number of permits available
149    /// * `config` - The priority configuration specifying queue strategies
150    ///
151    /// # Panics
152    ///
153    /// Panics if `permits` exceeds `MAX_PERMITS` (usize::MAX >> 3).
154    ///
155    /// # Examples
156    ///
157    /// ```rust
158    /// use ranked_semaphore::{RankedSemaphore, PriorityConfig, QueueStrategy};
159    ///
160    /// let config = PriorityConfig::new()
161    ///     .default_strategy(QueueStrategy::Fifo)
162    ///     .exact(10, QueueStrategy::Lifo); // Priority 10 uses LIFO
163    ///
164    /// let sem = RankedSemaphore::new_with_config(3, config);
165    /// assert_eq!(sem.available_permits(), 3);
166    /// ```
167    pub fn new_with_config(permits: usize, config: PriorityConfig) -> Self {
168        if permits > Self::MAX_PERMITS {
169            panic!("permits exceed MAX_PERMITS");
170        }
171        Self {
172            permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT),
173            waiters: Mutex::new(WaitQueue::new(config)),
174        }
175    }
176
177    /// Returns the current number of available permits.
178    ///
179    /// This value represents permits that can be acquired immediately without waiting.
180    /// Note that this value can change rapidly in concurrent environments.
181    ///
182    /// # Examples
183    ///
184    /// ```rust
185    /// use ranked_semaphore::RankedSemaphore;
186    ///
187    /// let sem = RankedSemaphore::new_fifo(3);
188    /// assert_eq!(sem.available_permits(), 3);
189    ///
190    /// let _permit = sem.try_acquire().unwrap();
191    /// assert_eq!(sem.available_permits(), 2);
192    /// ```
193    pub fn available_permits(&self) -> usize {
194        self.permits.load(Ordering::Acquire) >> Self::PERMIT_SHIFT
195    }
196
197    /// Returns `true` if the semaphore has been closed.
198    ///
199    /// A closed semaphore will not issue new permits and all pending
200    /// acquire operations will return `AcquireError::Closed`.
201    ///
202    /// # Examples
203    ///
204    /// ```rust
205    /// use ranked_semaphore::RankedSemaphore;
206    ///
207    /// let sem = RankedSemaphore::new_fifo(3);
208    /// assert!(!sem.is_closed());
209    ///
210    /// sem.close();
211    /// assert!(sem.is_closed());
212    /// ```
213    pub fn is_closed(&self) -> bool {
214        self.permits.load(Ordering::Acquire) & Self::CLOSED == Self::CLOSED
215    }
216
217    /// Adds permits to the semaphore and notifies waiting tasks.
218    ///
219    /// If there are tasks waiting for permits, they will be notified in priority order.
220    /// Any excess permits (beyond what waiting tasks need) are added to the
221    /// semaphore's available permit count.
222    ///
223    /// # Arguments
224    ///
225    /// * `added` - The number of permits to add
226    ///
227    /// # Examples
228    ///
229    /// ```rust
230    /// use ranked_semaphore::RankedSemaphore;
231    ///
232    /// let sem = RankedSemaphore::new_fifo(1);
233    /// let _permit = sem.try_acquire().unwrap();
234    /// assert_eq!(sem.available_permits(), 0);
235    ///
236    /// sem.add_permits(2);
237    /// assert_eq!(sem.available_permits(), 2);
238    /// ```
239    pub fn add_permits(&self, added: usize) {
240        if added == 0 {
241            return;
242        }
243
244        // Assign permits to the wait queue, following tokio's approach
245        self.add_permits_locked(added, self.waiters.lock().unwrap());
246    }
247
248    /// Add permits to the semaphore and wake eligible waiters.
249    ///
250    /// If `rem` exceeds the number of permits needed by waiting tasks, the
251    /// remainder are returned to the semaphore's available permit count.
252    ///
253    /// This implementation processes waiters in batches to minimize thundering herd
254    /// effects and reduce lock contention.
255    pub(crate) fn add_permits_locked(
256        &self,
257        mut rem: usize,
258        waiters: std::sync::MutexGuard<'_, crate::wait_queue::queue::WaitQueue>,
259    ) {
260        let mut lock = Some(waiters);
261
262        // Process waiters in batches like tokio to prevent thundering herd
263        while rem > 0 {
264            let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock().unwrap());
265
266            // Check if queue is empty
267            if waiters.is_empty() {
268                drop(waiters);
269                break;
270            }
271
272            // Select waiters to notify in this batch
273            let (wake_list, permits_assigned) = waiters.select_waiters_to_notify(rem);
274            rem -= permits_assigned;
275
276            // If we couldn't assign any permits, we're done
277            if permits_assigned == 0 || wake_list.is_empty() {
278                drop(waiters);
279                break;
280            }
281
282            // Drop the lock before waking to reduce lock contention
283            drop(waiters);
284
285            // **Critical check**: if the semaphore was closed while we didn't have the lock,
286            // we must not wake up the waiters. They will be woken by the `close()` call.
287            if self.is_closed() {
288                // Return the permits that were assigned to waiters back to the semaphore
289                // since we won't wake them due to the semaphore being closed.
290                self.permits
291                    .fetch_add(permits_assigned << Self::PERMIT_SHIFT, Ordering::Release);
292                return;
293            }
294
295            // Wake this batch of waiters
296            let mut wake_list = wake_list;
297            wake_list.wake_all();
298
299            // If WakeList was not at capacity, we processed all available waiters
300            if !wake_list.was_full() {
301                break;
302            }
303        }
304
305        // Add any remaining permits back to the semaphore
306        if rem > 0 {
307            let prev = self
308                .permits
309                .fetch_add(rem << Self::PERMIT_SHIFT, Ordering::Release);
310            let prev_permits = prev >> Self::PERMIT_SHIFT;
311
312            // Check for overflow after the operation
313            if prev_permits + rem > Self::MAX_PERMITS {
314                panic!(
315                    "number of added permits ({}) would overflow MAX_PERMITS ({})",
316                    rem,
317                    Self::MAX_PERMITS
318                );
319            }
320        }
321    }
322
323    /// Closes the semaphore and cancels all pending waiters.
324    ///
325    /// After calling this method:
326    /// - No new permits will be issued
327    /// - All pending `acquire` operations will return `AcquireError::Closed`
328    /// - All `try_acquire` operations will return `TryAcquireError::Closed`
329    ///
330    /// This operation is irreversible - once closed, a semaphore cannot be reopened.
331    ///
332    /// # Examples
333    ///
334    /// ```rust
335    /// use ranked_semaphore::RankedSemaphore;
336    ///
337    /// let sem = RankedSemaphore::new_fifo(3);
338    /// sem.close();
339    /// 
340    /// assert!(sem.is_closed());
341    /// assert!(sem.try_acquire().is_err());
342    /// ```
343    pub fn close(&self) {
344        self.permits.fetch_or(Self::CLOSED, Ordering::Release);
345
346        let mut waiters = self.waiters.lock().unwrap();
347        waiters.close();
348    }
349
350    /// Permanently removes permits from the semaphore.
351    ///
352    /// This operation reduces the number of available permits without releasing
353    /// them back to the semaphore. It's useful for scenarios where the resource
354    /// pool needs to be dynamically reduced.
355    ///
356    /// # Arguments
357    ///
358    /// * `n` - The number of permits to remove
359    ///
360    /// # Returns
361    ///
362    /// The number of permits that were actually removed. This may be less than
363    /// `n` if there were insufficient permits available.
364    ///
365    /// # Examples
366    ///
367    /// ```rust
368    /// use ranked_semaphore::RankedSemaphore;
369    ///
370    /// let sem = RankedSemaphore::new_fifo(5);
371    /// assert_eq!(sem.available_permits(), 5);
372    ///
373    /// let removed = sem.forget_permits(3);
374    /// assert_eq!(removed, 3);
375    /// assert_eq!(sem.available_permits(), 2);
376    ///
377    /// // Trying to remove more permits than available
378    /// let removed = sem.forget_permits(10);
379    /// assert_eq!(removed, 2); // Only 2 were available
380    /// assert_eq!(sem.available_permits(), 0);
381    /// ```
382    ///
383    /// # Warning
384    ///
385    /// Use this method with caution. Removing too many permits can lead to
386    /// resource starvation if tasks are waiting for permits that will never
387    /// become available.
388    pub fn forget_permits(&self, n: usize) -> usize {
389        if n == 0 {
390            return 0;
391        }
392
393        let mut curr_bits = self.permits.load(Ordering::Acquire);
394        loop {
395            let curr_permits = curr_bits >> Self::PERMIT_SHIFT;
396            let removed = curr_permits.min(n);
397            let new_permits = curr_permits - removed;
398            let new_bits = (new_permits << Self::PERMIT_SHIFT) | (curr_bits & Self::CLOSED);
399
400            match self.permits.compare_exchange_weak(
401                curr_bits,
402                new_bits,
403                Ordering::AcqRel,
404                Ordering::Acquire,
405            ) {
406                Ok(_) => return removed,
407                Err(actual) => curr_bits = actual,
408            }
409        }
410    }
411}