Skip to main content

tower_limit/concurrency/sync/
semaphore.rs

1#![allow(dead_code)]
2
3//! Thread-safe, asynchronous counting semaphore.
4//!
5//! A `Semaphore` instance holds a set of permits. Permits are used to
6//! synchronize access to a shared resource.
7//!
8//! Before accessing the shared resource, callers acquire a permit from the
9//! semaphore. Once the permit is acquired, the caller then enters the critical
10//! section. If no permits are available, then acquiring the semaphore returns
11//! `Pending`. The task is woken once a permit becomes available.
12
13use super::cell::CausalCell;
14use super::waker::AtomicWaker;
15use std::{
16    sync::atomic::{AtomicPtr, AtomicUsize},
17    thread,
18};
19
20use std::fmt;
21use std::ptr::{self, NonNull};
22use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release};
23use std::sync::Arc;
24use std::task::Poll::{Pending, Ready};
25use std::task::{Context, Poll};
26use std::usize;
27
/// Futures-aware counting semaphore.
///
/// Permits are requested through a `Permit`. Requests that cannot be served
/// immediately are parked in an intrusive queue of `WaiterNode`s and are
/// notified by `add_permits` or `close`.
pub(crate) struct Semaphore {
    /// Tracks both the waiter queue tail pointer and the number of remaining
    /// permits.
    ///
    /// The bit layout is described on `SemState`.
    state: AtomicUsize,

    /// Waiter queue head pointer. Advanced by `pop`, which runs with
    /// `rx_lock` held.
    head: CausalCell<NonNull<WaiterNode>>,

    /// Coordinates access to the queue head.
    ///
    /// Bit 0 is set while a `close` request is pending; the remaining bits
    /// hold the number of permits waiting to be released, shifted left by
    /// one. The caller that moves the value away from zero is the one that
    /// processes the queue; everybody else just records their request here.
    rx_lock: AtomicUsize,

    /// Stub waiter node used as part of the MPSC channel algorithm.
    stub: Box<WaiterNode>,
}
43
/// A semaphore permit
///
/// Tracks the lifecycle of a semaphore permit.
///
/// An instance of `Permit` is intended to be used with a **single** instance of
/// `Semaphore`. Using a single instance of `Permit` with multiple semaphore
/// instances will result in unexpected behavior.
///
/// `Permit` does **not** release the permit back to the semaphore on drop. It
/// is the user's responsibility to ensure that `Permit::release` is called
/// before dropping the permit.
#[derive(Debug)]
pub(crate) struct Permit {
    /// Queue node for this permit. Allocated lazily, the first time the
    /// permit has to wait inside `Semaphore::poll_permit`.
    waiter: Option<Arc<WaiterNode>>,
    /// Where this permit currently is in its idle / waiting / acquired
    /// lifecycle.
    state: PermitState,
}
60
/// Error returned by `Permit::poll_acquire`.
///
/// The only way to build one in this file is `AcquireError::closed`, i.e. the
/// semaphore was closed.
#[derive(Debug)]
pub(crate) struct AcquireError(());
64
/// Error returned by `Permit::try_acquire`.
///
/// Use `is_closed` / `is_no_permits` to tell the two causes apart.
#[derive(Debug)]
pub(crate) struct TryAcquireError {
    /// Why the acquire attempt failed.
    kind: ErrorKind,
}
70
/// Reason carried by a `TryAcquireError`.
#[derive(Debug)]
enum ErrorKind {
    /// The semaphore has been closed.
    Closed,
    /// The semaphore had no permit available at the time of the call.
    NoPermits,
}
76
/// Node used to notify the semaphore waiter when permit is available.
///
/// Nodes are linked into the semaphore's intrusive wait queue through `next`.
#[derive(Debug)]
struct WaiterNode {
    /// Stores waiter state.
    ///
    /// Holds a `NodeState` discriminant; see `NodeState` for more details.
    state: AtomicUsize,

    /// Task to wake when a permit is made available.
    waker: AtomicWaker,

    /// Next pointer in the queue of waiters.
    next: AtomicPtr<WaiterNode>,
}
91
/// Semaphore state
///
/// The 2 low bits are flags:
///
/// - `NUM_FLAG`: the value is a permit count rather than a pointer.
/// - `CLOSED_FLAG`: the semaphore has been closed.
///
/// When `NUM_FLAG` is set, the bits above the flags (`NUM_SHIFT` and up) hold
/// the number of available permits. When it is clear, the value with the
/// closed flag masked off is a pointer to the tail of the waiter queue.
#[derive(Copy, Clone)]
struct SemState(usize);
104
/// Permit state
///
/// This is the `Permit`'s own view of its lifecycle; the queue-side view
/// lives in the waiter node's `NodeState`.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum PermitState {
    /// The permit has not been requested.
    Idle,

    /// Currently waiting for a permit to be made available and assigned to the
    /// waiter.
    Waiting,

    /// The permit has been acquired.
    Acquired,
}
118
/// Waiter node state
///
/// The discriminants are the raw values stored in `WaiterNode::state`, so
/// they must stay in sync with `NodeState::from_usize`.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
#[repr(usize)]
enum NodeState {
    /// Not waiting for a permit and the node is not in the wait queue.
    ///
    /// This is the initial state.
    Idle = 0,

    /// Not waiting for a permit but the node is in the wait queue.
    ///
    /// This happens when the waiter has previously requested a permit, but has
    /// since canceled the request. The node cannot be removed by the waiter, so
    /// this state informs the receiver to skip the node when it pops it from
    /// the wait queue.
    Queued = 1,

    /// Waiting for a permit and the node is in the wait queue.
    QueuedWaiting = 2,

    /// The waiter has been assigned a permit and the node has been removed from
    /// the queue.
    Assigned = 3,

    /// The semaphore has been closed. No more permits will be issued.
    Closed = 4,
}
146
147// ===== impl Semaphore =====
148
149impl Semaphore {
150    /// Creates a new semaphore with the initial number of permits
151    ///
152    /// # Panics
153    ///
154    /// Panics if `permits` is zero.
155    pub(crate) fn new(permits: usize) -> Semaphore {
156        let stub = Box::new(WaiterNode::new());
157        let ptr = NonNull::new(&*stub as *const _ as *mut _).unwrap();
158
159        // Allocations are aligned
160        debug_assert!(ptr.as_ptr() as usize & NUM_FLAG == 0);
161
162        let state = SemState::new(permits, &stub);
163
164        Semaphore {
165            state: AtomicUsize::new(state.to_usize()),
166            head: CausalCell::new(ptr),
167            rx_lock: AtomicUsize::new(0),
168            stub,
169        }
170    }
171
172    /// Returns the current number of available permits
173    pub(crate) fn available_permits(&self) -> usize {
174        let curr = SemState::load(&self.state, Acquire);
175        curr.available_permits()
176    }
177
    /// Poll for a permit
    ///
    /// `permit` is `Some((cx, permit))` when the caller wants to be queued and
    /// woken later (`Permit::poll_acquire`), and `None` for a one-shot attempt
    /// (`Permit::try_acquire`).
    ///
    /// Returns:
    ///
    /// - `Ready(Ok(()))` when a permit was taken from the counter,
    /// - `Ready(Err(_))` when the semaphore is closed,
    /// - `Pending` when no permit is available. If `permit` was supplied, its
    ///   waiter node has been registered with the task's waker and is in the
    ///   wait queue.
    fn poll_permit(
        &self,
        mut permit: Option<(&mut Context<'_>, &mut Permit)>,
    ) -> Poll<Result<(), AcquireError>> {
        // Load the current state
        let mut curr = SemState::load(&self.state, Acquire);

        // Tracks a *mut WaiterNode representing an Arc clone.
        //
        // This avoids having to bump the ref count unless required.
        let mut maybe_strong: Option<NonNull<WaiterNode>> = None;

        macro_rules! undo_strong {
            () => {
                if let Some(waiter) = maybe_strong {
                    // The waiter was cloned, but never got queued.
                    // Before entering `poll_permit`, the waiter was in the
                    // `Idle` state. We must transition the node back to the
                    // idle state.
                    //
                    // The pointer was produced by `WaiterNode::into_non_null`
                    // below, so rebuilding the `Arc` here releases that
                    // extra reference when `waiter` goes out of scope.
                    let waiter = unsafe { Arc::from_raw(waiter.as_ptr()) };
                    waiter.revert_to_idle();
                }
            };
        }

        loop {
            // `next` is the state we try to install; `curr` is what we
            // expect to find.
            let mut next = curr;

            if curr.is_closed() {
                undo_strong!();
                return Ready(Err(AcquireError::closed()));
            }

            if !next.acquire_permit(&self.stub) {
                // No permit available: the state holds the queue tail.
                debug_assert!(curr.waiter().is_some());

                if maybe_strong.is_none() {
                    if let Some((ref mut cx, ref mut permit)) = permit {
                        // Get the permit's waiter node, or initialize one
                        let waiter = permit
                            .waiter
                            .get_or_insert_with(|| Arc::new(WaiterNode::new()));

                        waiter.register(cx);

                        if !waiter.to_queued_waiting() {
                            // The node is already queued, there is no further work
                            // to do.
                            return Pending;
                        }

                        maybe_strong = Some(WaiterNode::into_non_null(waiter.clone()));
                    } else {
                        // If no `waiter`, then the task is not registered and there
                        // is no further work to do.
                        return Pending;
                    }
                }

                // Make our node the new tail.
                next.set_waiter(maybe_strong.unwrap());
            }

            debug_assert_ne!(curr.0, 0);
            debug_assert_ne!(next.0, 0);

            match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
                Ok(_) => {
                    match curr.waiter() {
                        Some(prev_waiter) => {
                            let waiter = maybe_strong.unwrap();

                            // Finish pushing
                            unsafe {
                                prev_waiter.as_ref().next.store(waiter.as_ptr(), Release);
                            }

                            return Pending;
                        }
                        None => {
                            // The previous state was a permit count, so the
                            // CAS claimed a permit.
                            undo_strong!();

                            return Ready(Ok(()));
                        }
                    }
                }
                Err(actual) => {
                    // Lost a race; retry against the value actually stored.
                    curr = actual;
                }
            }
        }
    }
270
271    /// Close the semaphore. This prevents the semaphore from issuing new
272    /// permits and notifies all pending waiters.
273    pub(crate) fn close(&self) {
274        // Acquire the `rx_lock`, setting the "closed" flag on the lock.
275        let prev = self.rx_lock.fetch_or(1, AcqRel);
276
277        if prev != 0 {
278            // Another thread has the lock and will be responsible for notifying
279            // pending waiters.
280            return;
281        }
282
283        self.add_permits_locked(0, true);
284    }
285
286    /// Add `n` new permits to the semaphore.
287    pub(crate) fn add_permits(&self, n: usize) {
288        if n == 0 {
289            return;
290        }
291
292        // TODO: Handle overflow. A panic is not sufficient, the process must
293        // abort.
294        let prev = self.rx_lock.fetch_add(n << 1, AcqRel);
295
296        if prev != 0 {
297            // Another thread has the lock and will be responsible for notifying
298            // pending waiters.
299            return;
300        }
301
302        self.add_permits_locked(n, false);
303    }
304
    /// Processes permit releases and close requests on behalf of every
    /// caller, looping until `rx_lock` has been drained.
    ///
    /// Called by `add_permits` and `close` once they have acquired `rx_lock`.
    /// `rem` is the number of permits to release in the first pass and
    /// `closed` says whether a close request has to be applied.
    fn add_permits_locked(&self, mut rem: usize, mut closed: bool) {
        while rem > 0 || closed {
            if closed {
                // Publish the closed flag on the shared state; `poll_permit`
                // checks it on every attempt.
                SemState::fetch_set_closed(&self.state, AcqRel);
            }

            // Release the permits and notify
            self.add_permits_locked2(rem, closed);

            // Permit counts are stored above the closed bit in `rx_lock`.
            let n = rem << 1;

            let actual = if closed {
                // Retire the handled permits together with the closed bit.
                let actual = self.rx_lock.fetch_sub(n | 1, AcqRel);
                closed = false;
                actual
            } else {
                let actual = self.rx_lock.fetch_sub(n, AcqRel);
                // A close may have been requested while we were working.
                closed = actual & 1 == 1;
                actual
            };

            // `actual` is the value before the subtraction. Whatever it holds
            // beyond the permits just handled was added concurrently and is
            // processed by the next iteration.
            rem = (actual >> 1) - rem;
        }
    }
329
330    /// Release a specific amount of permits to the semaphore
331    ///
332    /// This function is called by `add_permits` after the add lock has been
333    /// acquired.
334    fn add_permits_locked2(&self, mut n: usize, closed: bool) {
335        while n > 0 || closed {
336            let waiter = match self.pop(n, closed) {
337                Some(waiter) => waiter,
338                None => {
339                    return;
340                }
341            };
342
343            if waiter.notify(closed) {
344                n = n.saturating_sub(1);
345            }
346        }
347    }
348
    /// Pop a waiter
    ///
    /// `rem` represents the remaining number of times the caller will pop. If
    /// there are no more waiters to pop, `rem` is used to set the available
    /// permits.
    ///
    /// Returns `None` once the queue is empty. The returned `Arc` takes over
    /// the reference that was leaked into the queue when the node was pushed.
    fn pop(&self, rem: usize, closed: bool) -> Option<Arc<WaiterNode>> {
        'outer: loop {
            unsafe {
                let mut head = self.head.with(|head| *head);
                let mut next_ptr = head.as_ref().next.load(Acquire);

                let stub = self.stub();

                if head == stub {
                    // The stub is never handed out; step over it.
                    let next = match NonNull::new(next_ptr) {
                        Some(next) => next,
                        None => {
                            // This loop is not part of the standard intrusive mpsc
                            // channel algorithm. This is where we atomically pop
                            // the last task and add `rem` to the remaining capacity.
                            //
                            // This modification to the pop algorithm works because,
                            // at this point, we have not done any work (only done
                            // reading). We have a *pretty* good idea that there is
                            // no concurrent pusher.
                            //
                            // The capacity is then atomically added by doing an
                            // AcqRel CAS on `state`. The `state` cell is the
                            // linchpin of the algorithm.
                            //
                            // By successfully CASing `state` w/ AcqRel, we ensure
                            // that, if any thread was racing and entered a push, we
                            // see that and abort pop, retrying as it is
                            // "inconsistent".
                            let mut curr = SemState::load(&self.state, Acquire);

                            loop {
                                if curr.has_waiter(&self.stub) {
                                    // Inconsistent
                                    thread::yield_now();
                                    continue 'outer;
                                }

                                // When closing the semaphore, nodes are popped
                                // with `rem == 0`. In this case, we are not
                                // adding permits, but notifying waiters of the
                                // semaphore's closed state.
                                if rem == 0 {
                                    debug_assert!(curr.is_closed(), "state = {:?}", curr);
                                    return None;
                                }

                                let mut next = curr;
                                next.release_permits(rem, &self.stub);

                                match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
                                    Ok(_) => return None,
                                    Err(actual) => {
                                        curr = actual;
                                    }
                                }
                            }
                        }
                    };

                    self.head.with_mut(|head| *head = next);
                    head = next;
                    next_ptr = next.as_ref().next.load(Acquire);
                }

                if let Some(next) = NonNull::new(next_ptr) {
                    // `head` has a successor, so it can be unlinked directly.
                    self.head.with_mut(|head| *head = next);

                    return Some(Arc::from_raw(head.as_ptr()));
                }

                let state = SemState::load(&self.state, Acquire);

                // This must always be a pointer as the wait list is not empty.
                let tail = state.waiter().unwrap();

                if tail != head {
                    // Inconsistent
                    thread::yield_now();
                    continue 'outer;
                }

                // `head` is the last node. Push the stub behind it so that
                // `head` gets a successor and can be unlinked.
                self.push_stub(closed);

                next_ptr = head.as_ref().next.load(Acquire);

                if let Some(next) = NonNull::new(next_ptr) {
                    self.head.with_mut(|head| *head = next);

                    return Some(Arc::from_raw(head.as_ptr()));
                }

                // Inconsistent state, loop
                thread::yield_now();
            }
        }
    }
451
    /// Re-enqueues the stub node at the tail of the wait queue.
    ///
    /// `closed` is written into the new state word and is expected to match
    /// the closed flag of the state being replaced.
    ///
    /// # Safety
    ///
    /// The body dereferences the previous tail pointer taken from `state`, so
    /// the state must be in pointer mode and that node must still be alive.
    /// NOTE(review): the only caller in this file is `pop`, which runs with
    /// `rx_lock` held — presumably that is part of the contract; confirm.
    unsafe fn push_stub(&self, closed: bool) {
        let stub = self.stub();

        // Set the next pointer. This does not require an atomic operation as
        // this node is not accessible. The write will be flushed with the next
        // operation
        stub.as_ref().next.store(ptr::null_mut(), Relaxed);

        // Update the tail to point to the new node. We need to see the previous
        // node in order to update the next pointer as well as release `task`
        // to any other threads calling `push`.
        let prev = SemState::new_ptr(stub, closed).swap(&self.state, AcqRel);

        debug_assert_eq!(closed, prev.is_closed());

        // The stub is only pushed when there are pending tasks. Because of
        // this, the state must *always* be in pointer mode.
        let prev = prev.waiter().unwrap();

        // We don't want the *existing* pointer to be a stub.
        debug_assert_ne!(prev, stub);

        // Release `task` to the consume end.
        prev.as_ref().next.store(stub.as_ptr(), Release);
    }
477
478    fn stub(&self) -> NonNull<WaiterNode> {
479        unsafe { NonNull::new_unchecked(&*self.stub as *const _ as *mut _) }
480    }
481}
482
483impl fmt::Debug for Semaphore {
484    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
485        fmt.debug_struct("Semaphore")
486            .field("state", &SemState::load(&self.state, Relaxed))
487            .field("head", &self.head.with(|ptr| ptr))
488            .field("rx_lock", &self.rx_lock.load(Relaxed))
489            .field("stub", &self.stub)
490            .finish()
491    }
492}
493
// SAFETY — NOTE(review): rationale reconstructed from this file; confirm.
// `state` and `rx_lock` are atomics. The non-atomic `head` cell is mutated
// only by `pop`, which is reached exclusively through `add_permits_locked`,
// i.e. by the single caller that moved `rx_lock` away from zero.
unsafe impl Send for Semaphore {}
unsafe impl Sync for Semaphore {}
496
497// ===== impl Permit =====
498
499impl Permit {
500    /// Create a new `Permit`.
501    ///
502    /// The permit begins in the "unacquired" state.
503    pub(crate) fn new() -> Permit {
504        Permit {
505            waiter: None,
506            state: PermitState::Idle,
507        }
508    }
509
510    /// Returns true if the permit has been acquired
511    pub(crate) fn is_acquired(&self) -> bool {
512        self.state == PermitState::Acquired
513    }
514
515    /// Try to acquire the permit. If no permits are available, the current task
516    /// is notified once a new permit becomes available.
517    pub(crate) fn poll_acquire(
518        &mut self,
519        cx: &mut Context<'_>,
520        semaphore: &Semaphore,
521    ) -> Poll<Result<(), AcquireError>> {
522        match self.state {
523            PermitState::Idle => {}
524            PermitState::Waiting => {
525                let waiter = self.waiter.as_ref().unwrap();
526
527                if waiter.acquire(cx)? {
528                    self.state = PermitState::Acquired;
529                    return Ready(Ok(()));
530                } else {
531                    return Pending;
532                }
533            }
534            PermitState::Acquired => {
535                return Ready(Ok(()));
536            }
537        }
538
539        match semaphore.poll_permit(Some((cx, self)))? {
540            Ready(()) => {
541                self.state = PermitState::Acquired;
542                Ready(Ok(()))
543            }
544            Pending => {
545                self.state = PermitState::Waiting;
546                Pending
547            }
548        }
549    }
550
551    /// Try to acquire the permit.
552    pub(crate) fn try_acquire(&mut self, semaphore: &Semaphore) -> Result<(), TryAcquireError> {
553        match self.state {
554            PermitState::Idle => {}
555            PermitState::Waiting => {
556                let waiter = self.waiter.as_ref().unwrap();
557
558                if waiter.acquire2().map_err(to_try_acquire)? {
559                    self.state = PermitState::Acquired;
560                    return Ok(());
561                } else {
562                    return Err(TryAcquireError::no_permits());
563                }
564            }
565            PermitState::Acquired => {
566                return Ok(());
567            }
568        }
569
570        match semaphore.poll_permit(None).map_err(to_try_acquire)? {
571            Ready(()) => {
572                self.state = PermitState::Acquired;
573                Ok(())
574            }
575            Pending => Err(TryAcquireError::no_permits()),
576        }
577    }
578
579    /// Release a permit back to the semaphore
580    pub(crate) fn release(&mut self, semaphore: &Semaphore) {
581        if self.forget2() {
582            semaphore.add_permits(1);
583        }
584    }
585
586    /// Forget the permit **without** releasing it back to the semaphore.
587    ///
588    /// After calling `forget`, `poll_acquire` is able to acquire new permit
589    /// from the sempahore.
590    ///
591    /// Repeatedly calling `forget` without associated calls to `add_permit`
592    /// will result in the semaphore losing all permits.
593    pub(crate) fn forget(&mut self) {
594        self.forget2();
595    }
596
597    /// Returns `true` if the permit was acquired
598    fn forget2(&mut self) -> bool {
599        match self.state {
600            PermitState::Idle => false,
601            PermitState::Waiting => {
602                let ret = self.waiter.as_ref().unwrap().cancel_interest();
603                self.state = PermitState::Idle;
604                ret
605            }
606            PermitState::Acquired => {
607                self.state = PermitState::Idle;
608                true
609            }
610        }
611    }
612}
613
614impl Default for Permit {
615    fn default() -> Self {
616        Self::new()
617    }
618}
619
620// ===== impl AcquireError ====
621
622impl AcquireError {
623    fn closed() -> AcquireError {
624        AcquireError(())
625    }
626}
627
628fn to_try_acquire(_: AcquireError) -> TryAcquireError {
629    TryAcquireError::closed()
630}
631
632impl fmt::Display for AcquireError {
633    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
634        write!(fmt, "semaphore closed")
635    }
636}
637
// Marker impl: the default `Error` methods are used as-is.
impl ::std::error::Error for AcquireError {}
639
640// ===== impl TryAcquireError =====
641
642impl TryAcquireError {
643    fn closed() -> TryAcquireError {
644        TryAcquireError {
645            kind: ErrorKind::Closed,
646        }
647    }
648
649    fn no_permits() -> TryAcquireError {
650        TryAcquireError {
651            kind: ErrorKind::NoPermits,
652        }
653    }
654
655    /// Returns true if the error was caused by a closed semaphore.
656    pub(crate) fn is_closed(&self) -> bool {
657        match self.kind {
658            ErrorKind::Closed => true,
659            _ => false,
660        }
661    }
662
663    /// Returns true if the error was caused by calling `try_acquire` on a
664    /// semaphore with no available permits.
665    pub(crate) fn is_no_permits(&self) -> bool {
666        match self.kind {
667            ErrorKind::NoPermits => true,
668            _ => false,
669        }
670    }
671}
672
673impl fmt::Display for TryAcquireError {
674    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
675        let descr = match self.kind {
676            ErrorKind::Closed => "semaphore closed",
677            ErrorKind::NoPermits => "no permits available",
678        };
679        write!(fmt, "{}", descr)
680    }
681}
682
// Marker impl: the default `Error` methods are used as-is.
impl ::std::error::Error for TryAcquireError {}
684
685// ===== impl WaiterNode =====
686
687impl WaiterNode {
688    fn new() -> WaiterNode {
689        WaiterNode {
690            state: AtomicUsize::new(NodeState::new().to_usize()),
691            waker: AtomicWaker::new(),
692            next: AtomicPtr::new(ptr::null_mut()),
693        }
694    }
695
696    fn acquire(&self, cx: &mut Context<'_>) -> Result<bool, AcquireError> {
697        if self.acquire2()? {
698            return Ok(true);
699        }
700
701        self.waker.register_by_ref(cx.waker());
702
703        self.acquire2()
704    }
705
706    fn acquire2(&self) -> Result<bool, AcquireError> {
707        use self::NodeState::*;
708
709        match Idle.compare_exchange(&self.state, Assigned, AcqRel, Acquire) {
710            Ok(_) => Ok(true),
711            Err(Closed) => Err(AcquireError::closed()),
712            Err(_) => Ok(false),
713        }
714    }
715
716    fn register(&self, cx: &mut Context<'_>) {
717        self.waker.register_by_ref(cx.waker())
718    }
719
720    /// Returns `true` if the permit has been acquired
721    fn cancel_interest(&self) -> bool {
722        use self::NodeState::*;
723
724        match Queued.compare_exchange(&self.state, QueuedWaiting, AcqRel, Acquire) {
725            // Successfully removed interest from the queued node. The permit
726            // has not been assigned to the node.
727            Ok(_) => false,
728            // The semaphore has been closed, there is no further action to
729            // take.
730            Err(Closed) => false,
731            // The permit has been assigned. It must be acquired in order to
732            // be released back to the semaphore.
733            Err(Assigned) => {
734                match self.acquire2() {
735                    Ok(true) => true,
736                    // Not a reachable state
737                    Ok(false) => panic!(),
738                    // The semaphore has been closed, no further action to take.
739                    Err(_) => false,
740                }
741            }
742            Err(state) => panic!("unexpected state = {:?}", state),
743        }
744    }
745
746    /// Transition the state to `QueuedWaiting`.
747    ///
748    /// This step can only happen from `Queued` or from `Idle`.
749    ///
750    /// Returns `true` if transitioning into a queued state.
751    fn to_queued_waiting(&self) -> bool {
752        use self::NodeState::*;
753
754        let mut curr = NodeState::load(&self.state, Acquire);
755
756        loop {
757            debug_assert!(curr == Idle || curr == Queued, "actual = {:?}", curr);
758            let next = QueuedWaiting;
759
760            match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
761                Ok(_) => {
762                    if curr.is_queued() {
763                        return false;
764                    } else {
765                        // Transitioned to queued, reset next pointer
766                        self.next.store(ptr::null_mut(), Relaxed);
767                        return true;
768                    }
769                }
770                Err(actual) => {
771                    curr = actual;
772                }
773            }
774        }
775    }
776
777    /// Notify the waiter
778    ///
779    /// Returns `true` if the waiter accepts the notification
780    fn notify(&self, closed: bool) -> bool {
781        use self::NodeState::*;
782
783        // Assume QueuedWaiting state
784        let mut curr = QueuedWaiting;
785
786        loop {
787            let next = match curr {
788                Queued => Idle,
789                QueuedWaiting => {
790                    if closed {
791                        Closed
792                    } else {
793                        Assigned
794                    }
795                }
796                actual => panic!("actual = {:?}", actual),
797            };
798
799            match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
800                Ok(_) => match curr {
801                    QueuedWaiting => {
802                        self.waker.wake();
803                        return true;
804                    }
805                    _ => return false,
806                },
807                Err(actual) => curr = actual,
808            }
809        }
810    }
811
812    fn revert_to_idle(&self) {
813        use self::NodeState::Idle;
814
815        // There are no other handles to the node
816        NodeState::store(&self.state, Idle, Relaxed);
817    }
818
819    #[allow(clippy::wrong_self_convention)] // https://github.com/rust-lang/rust-clippy/issues/4293
820    fn into_non_null(self: Arc<WaiterNode>) -> NonNull<WaiterNode> {
821        let ptr = Arc::into_raw(self);
822        unsafe { NonNull::new_unchecked(ptr as *mut _) }
823    }
824}
825
// ===== impl SemState =====
827
/// Flag differentiating between available permits and waiter pointers.
///
/// If we assume pointers are properly aligned, then the least significant bit
/// will always be zero. So, we use that bit to track if the value represents a
/// number.
const NUM_FLAG: usize = 0b01;

/// Flag marking the semaphore as closed.
///
/// It occupies the second-lowest bit and is masked off before the state is
/// interpreted as a pointer, so node pointers need that bit to be zero too.
const CLOSED_FLAG: usize = 0b10;

/// Largest permit count that fits in the state word above the flag bits.
const MAX_PERMITS: usize = usize::MAX >> NUM_SHIFT;

/// When representing "numbers", the state has to be shifted this much (to get
/// rid of the two flag bits).
const NUM_SHIFT: usize = 2;
842
843impl SemState {
844    /// Returns a new default `State` value.
845    fn new(permits: usize, stub: &WaiterNode) -> SemState {
846        assert!(permits <= MAX_PERMITS);
847
848        if permits > 0 {
849            SemState((permits << NUM_SHIFT) | NUM_FLAG)
850        } else {
851            SemState(stub as *const _ as usize)
852        }
853    }
854
855    /// Returns a `State` tracking `ptr` as the tail of the queue.
856    fn new_ptr(tail: NonNull<WaiterNode>, closed: bool) -> SemState {
857        let mut val = tail.as_ptr() as usize;
858
859        if closed {
860            val |= CLOSED_FLAG;
861        }
862
863        SemState(val)
864    }
865
866    /// Returns the amount of remaining capacity
867    fn available_permits(self) -> usize {
868        if !self.has_available_permits() {
869            return 0;
870        }
871
872        self.0 >> NUM_SHIFT
873    }
874
875    /// Returns true if the state has permits that can be claimed by a waiter.
876    fn has_available_permits(self) -> bool {
877        self.0 & NUM_FLAG == NUM_FLAG
878    }
879
880    fn has_waiter(self, stub: &WaiterNode) -> bool {
881        !self.has_available_permits() && !self.is_stub(stub)
882    }
883
884    /// Try to acquire a permit
885    ///
886    /// # Return
887    ///
888    /// Returns `true` if the permit was acquired, `false` otherwise. If `false`
889    /// is returned, it can be assumed that `State` represents the head pointer
890    /// in the mpsc channel.
891    fn acquire_permit(&mut self, stub: &WaiterNode) -> bool {
892        if !self.has_available_permits() {
893            return false;
894        }
895
896        debug_assert!(self.waiter().is_none());
897
898        self.0 -= 1 << NUM_SHIFT;
899
900        if self.0 == NUM_FLAG {
901            // Set the state to the stub pointer.
902            self.0 = stub as *const _ as usize;
903        }
904
905        true
906    }
907
908    /// Release permits
909    ///
910    /// Returns `true` if the permits were accepted.
911    fn release_permits(&mut self, permits: usize, stub: &WaiterNode) {
912        debug_assert!(permits > 0);
913
914        if self.is_stub(stub) {
915            self.0 = (permits << NUM_SHIFT) | NUM_FLAG | (self.0 & CLOSED_FLAG);
916            return;
917        }
918
919        debug_assert!(self.has_available_permits());
920
921        self.0 += permits << NUM_SHIFT;
922    }
923
924    fn is_waiter(self) -> bool {
925        self.0 & NUM_FLAG == 0
926    }
927
928    /// Returns the waiter, if one is set.
929    fn waiter(self) -> Option<NonNull<WaiterNode>> {
930        if self.is_waiter() {
931            let waiter = NonNull::new(self.as_ptr()).expect("null pointer stored");
932
933            Some(waiter)
934        } else {
935            None
936        }
937    }
938
939    /// Assumes `self` represents a pointer
940    fn as_ptr(self) -> *mut WaiterNode {
941        (self.0 & !CLOSED_FLAG) as *mut WaiterNode
942    }
943
944    /// Set to a pointer to a waiter.
945    ///
946    /// This can only be done from the full state.
947    fn set_waiter(&mut self, waiter: NonNull<WaiterNode>) {
948        let waiter = waiter.as_ptr() as usize;
949        debug_assert!(waiter & NUM_FLAG == 0);
950        debug_assert!(!self.is_closed());
951
952        self.0 = waiter;
953    }
954
955    fn is_stub(self, stub: &WaiterNode) -> bool {
956        self.as_ptr() as usize == stub as *const _ as usize
957    }
958
959    /// Load the state from an AtomicUsize.
960    fn load(cell: &AtomicUsize, ordering: Ordering) -> SemState {
961        let value = cell.load(ordering);
962        SemState(value)
963    }
964
965    /// Swap the values
966    fn swap(self, cell: &AtomicUsize, ordering: Ordering) -> SemState {
967        let prev = SemState(cell.swap(self.to_usize(), ordering));
968        debug_assert_eq!(prev.is_closed(), self.is_closed());
969        prev
970    }
971
972    /// Compare and exchange the current value into the provided cell
973    fn compare_exchange(
974        self,
975        cell: &AtomicUsize,
976        prev: SemState,
977        success: Ordering,
978        failure: Ordering,
979    ) -> Result<SemState, SemState> {
980        debug_assert_eq!(prev.is_closed(), self.is_closed());
981
982        let res = cell.compare_exchange(prev.to_usize(), self.to_usize(), success, failure);
983
984        res.map(SemState).map_err(SemState)
985    }
986
987    fn fetch_set_closed(cell: &AtomicUsize, ordering: Ordering) -> SemState {
988        let value = cell.fetch_or(CLOSED_FLAG, ordering);
989        SemState(value)
990    }
991
992    fn is_closed(self) -> bool {
993        self.0 & CLOSED_FLAG == CLOSED_FLAG
994    }
995
996    /// Converts the state into a `usize` representation.
997    fn to_usize(self) -> usize {
998        self.0
999    }
1000}
1001
1002impl fmt::Debug for SemState {
1003    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1004        let mut fmt = fmt.debug_struct("SemState");
1005
1006        if self.is_waiter() {
1007            fmt.field("state", &"<waiter>");
1008        } else {
1009            fmt.field("permits", &self.available_permits());
1010        }
1011
1012        fmt.finish()
1013    }
1014}
1015
1016// ===== impl NodeState =====
1017
1018impl NodeState {
1019    fn new() -> NodeState {
1020        NodeState::Idle
1021    }
1022
1023    fn from_usize(value: usize) -> NodeState {
1024        use self::NodeState::*;
1025
1026        match value {
1027            0 => Idle,
1028            1 => Queued,
1029            2 => QueuedWaiting,
1030            3 => Assigned,
1031            4 => Closed,
1032            _ => panic!(),
1033        }
1034    }
1035
1036    fn load(cell: &AtomicUsize, ordering: Ordering) -> NodeState {
1037        NodeState::from_usize(cell.load(ordering))
1038    }
1039
1040    /// Store a value
1041    fn store(cell: &AtomicUsize, value: NodeState, ordering: Ordering) {
1042        cell.store(value.to_usize(), ordering);
1043    }
1044
1045    fn compare_exchange(
1046        self,
1047        cell: &AtomicUsize,
1048        prev: NodeState,
1049        success: Ordering,
1050        failure: Ordering,
1051    ) -> Result<NodeState, NodeState> {
1052        cell.compare_exchange(prev.to_usize(), self.to_usize(), success, failure)
1053            .map(NodeState::from_usize)
1054            .map_err(NodeState::from_usize)
1055    }
1056
1057    /// Returns `true` if `self` represents a queued state.
1058    fn is_queued(self) -> bool {
1059        use self::NodeState::*;
1060
1061        match self {
1062            Queued | QueuedWaiting => true,
1063            _ => false,
1064        }
1065    }
1066
1067    fn to_usize(self) -> usize {
1068        self as usize
1069    }
1070}