// uchan/parker.rs

1use super::event::{Event, TimedEvent};
2use core::{
3    marker::PhantomPinned,
4    pin::Pin,
5    ptr::{null_mut, read, NonNull},
6    sync::atomic::{AtomicPtr, Ordering},
7};
8
9pub(super) enum TryParkResult<E: TimedEvent> {
10    Interrupted(E::Duration),
11    Unparked,
12    Timeout,
13}
14
/// Type-erased handle to an event, published through `Parker::state` so that
/// `Parker::unpark` can signal it without knowing the concrete event type.
struct EventRef {
    /// Type-erased pointer to the event object.
    ptr: NonNull<()>,
    /// Callback that is invoked with `ptr` to signal the event; it is
    /// responsible for casting `ptr` back to the concrete type.
    set_fn: unsafe fn(NonNull<()>),
    /// Opts the handle out of `Unpin`: its address is shared with other
    /// threads while it is registered.
    _pinned: PhantomPinned,
}
20
21pub(super) struct Parker {
22    state: AtomicPtr<EventRef>,
23}
24
25impl Parker {
26    pub(super) const EMPTY: Self = Self {
27        state: AtomicPtr::new(null_mut()),
28    };
29
30    pub(super) fn park<E: Event>(&self) {
31        assert!(self.park_with::<E>(|event| {
32            event.wait();
33            true
34        }))
35    }
36
37    pub(super) fn try_park_for<E: TimedEvent>(&self, timeout: E::Duration) -> TryParkResult<E> {
38        let mut timeout = Some(timeout);
39        let unparked = self.park_with::<E>(|event| {
40            let duration = timeout.take().unwrap();
41            event.try_wait_for(duration)
42        });
43
44        if !unparked {
45            assert!(timeout.is_none());
46            return TryParkResult::Timeout;
47        }
48
49        match timeout {
50            Some(unused) => TryParkResult::Interrupted(unused),
51            None => TryParkResult::Unparked,
52        }
53    }
54
55    const UNPARKED: *mut EventRef = NonNull::dangling().as_ptr();
56
57    fn park_with<E: Event>(&self, do_park: impl FnOnce(Pin<&E>) -> bool) -> bool {
58        let mut unparked = true;
59        let mut state = self.state.load(Ordering::Acquire);
60
61        if state.is_null() {
62            E::with(|event| {
63                let event_ref = EventRef {
64                    _pinned: PhantomPinned,
65                    ptr: NonNull::from(&*event).cast(),
66                    set_fn: |ptr| unsafe {
67                        let ptr = ptr.cast::<E>();
68                        Pin::new_unchecked(ptr.as_ref()).set();
69                    },
70                };
71
72                let pinned = unsafe { Pin::new_unchecked(&event_ref) };
73                let event_ref_ptr = NonNull::from(&*pinned).as_ptr();
74                assert_ne!(event_ref_ptr, Self::UNPARKED);
75
76                if let Err(e) = self.state.compare_exchange(
77                    null_mut(),
78                    event_ref_ptr,
79                    Ordering::Release,
80                    Ordering::Acquire,
81                ) {
82                    state = e;
83                    return;
84                }
85
86                if do_park(event.as_ref()) {
87                    state = self.state.load(Ordering::Acquire);
88                    return;
89                }
90
91                match self.state.compare_exchange(
92                    event_ref_ptr,
93                    null_mut(),
94                    Ordering::Acquire,
95                    Ordering::Acquire,
96                ) {
97                    Ok(_) => unparked = false,
98                    Err(e) => {
99                        state = e;
100                        event.wait();
101                    }
102                }
103            });
104        }
105
106        if unparked {
107            assert_eq!(state, Self::UNPARKED);
108            self.state.store(null_mut(), Ordering::Relaxed);
109        }
110
111        unparked
112    }
113
114    pub(super) fn unpark(&self) {
115        let mut state = self.state.load(Ordering::SeqCst);
116
117        if !state.is_null() && (state != Self::UNPARKED) {
118            state = self.state.swap(Self::UNPARKED, Ordering::AcqRel);
119        }
120
121        if !state.is_null() && (state != Self::UNPARKED) {
122            unsafe {
123                let event_ref = read(state);
124                (event_ref.set_fn)(event_ref.ptr);
125            }
126        }
127    }
128}