asyncs_sync/
notify.rs

1use std::future::Future;
2use std::mem::MaybeUninit;
3use std::pin::Pin;
4use std::ptr;
5use std::ptr::NonNull;
6use std::sync::atomic::AtomicUsize;
7use std::sync::atomic::Ordering::{self, *};
8use std::sync::Mutex;
9use std::task::{Context, Poll, Waker};
10
11use crate::parker::{Parking, WeakOrdering};
12
13trait Node {
14    fn link(&mut self) -> &mut Link<Self>;
15}
16
17struct Link<N: ?Sized> {
18    next: Option<NonNull<N>>,
19    prev: Option<NonNull<N>>,
20}
21
22impl<T> Default for Link<T> {
23    fn default() -> Self {
24        Self { next: None, prev: None }
25    }
26}
27
28struct List<N: ?Sized> {
29    head: Option<NonNull<N>>,
30    tail: Option<NonNull<N>>,
31}
32
33impl<T> Default for List<T> {
34    fn default() -> Self {
35        Self { head: None, tail: None }
36    }
37}
38
39impl<T: Node> List<T> {
40    pub fn push_front(&mut self, node: &mut T) {
41        let ptr = unsafe { NonNull::new_unchecked(node as *const T as *mut T) };
42        if let Some(mut head) = self.head {
43            unsafe {
44                head.as_mut().link().prev = Some(ptr);
45            }
46        }
47        let link = node.link();
48        link.next = self.head;
49        link.prev = None;
50        self.head = Some(ptr);
51        if self.tail.is_none() {
52            self.tail = self.head;
53        }
54    }
55
56    pub fn pop_back<'a>(&mut self) -> Option<&'a mut T> {
57        let node = match self.tail {
58            None => return None,
59            Some(mut ptr) => unsafe { ptr.as_mut() },
60        };
61        self.tail = node.link().prev;
62        match self.tail {
63            None => self.head = None,
64            Some(mut ptr) => unsafe { ptr.as_mut().link().next = None },
65        }
66        Some(node)
67    }
68
69    pub fn unlink(&mut self, node: &mut T) -> bool {
70        let ptr = unsafe { NonNull::new_unchecked(node as *const T as *mut T) };
71        let link = node.link();
72
73        if let Some(mut next) = link.next {
74            unsafe { next.as_mut().link().prev = link.prev };
75        } else if self.tail == Some(ptr) {
76            self.tail = link.prev;
77        } else {
78            return false;
79        }
80
81        if let Some(mut prev) = link.prev {
82            unsafe { prev.as_mut().link().next = link.next };
83        } else if self.head == Some(ptr) {
84            self.head = link.next;
85        } else {
86            return false;
87        }
88
89        link.next = None;
90        link.prev = None;
91
92        true
93    }
94
95    pub fn is_empty(&self) -> bool {
96        self.head.is_none()
97    }
98}
99
100struct GuardedList<'a, T> {
101    empty: bool,
102    guard: &'a mut T,
103}
104
105impl<'a, T: Node> GuardedList<'a, T> {
106    pub fn new(list: List<T>, guard: &'a mut T) -> Self {
107        let ptr = unsafe { NonNull::new_unchecked(guard as *mut T) };
108        let link = guard.link();
109        if list.is_empty() {
110            link.next = Some(ptr);
111            link.prev = Some(ptr);
112        } else {
113            link.next = list.head;
114            link.prev = list.tail;
115            unsafe {
116                list.head.unwrap_unchecked().as_mut().link().prev = Some(ptr);
117                list.tail.unwrap_unchecked().as_mut().link().next = Some(ptr);
118            }
119        }
120        Self { empty: false, guard }
121    }
122
123    pub fn pop_back<'b>(&mut self) -> Option<&'b mut T> {
124        let addr = self.guard as *mut _;
125        let link = self.guard.link();
126        let last = unsafe { link.prev.unwrap_unchecked().as_mut() };
127        if ptr::addr_eq(addr, last) {
128            self.empty = true;
129            return None;
130        }
131        link.prev = last.link().prev;
132        last.link().next = unsafe { Some(NonNull::new_unchecked(addr)) };
133        Some(last)
134    }
135
136    pub fn is_empty(&self) -> bool {
137        self.empty
138    }
139}
140
141struct WaiterList<'a> {
142    list: GuardedList<'a, Waiter>,
143    round: Round,
144    notify: &'a Notify,
145}
146
147impl<'a> WaiterList<'a> {
148    pub fn new(list: GuardedList<'a, Waiter>, round: Round, notify: &'a Notify) -> Self {
149        Self { list, round, notify }
150    }
151
152    pub fn pop_back<'b>(&mut self, _lock: &mut std::sync::MutexGuard<'_, List<Waiter>>) -> Option<&'b mut Waiter> {
153        self.list.pop_back()
154    }
155}
156
157impl Drop for WaiterList<'_> {
158    fn drop(&mut self) {
159        if self.list.is_empty() {
160            return;
161        }
162        let _lock = self.notify.list.lock().unwrap();
163        while let Some(waiter) = self.list.pop_back() {
164            waiter.notification.store(self.round.into_notification(NotificationKind::All), Release);
165        }
166    }
167}
168
169const STATUS_MASK: usize = 3usize;
170
171const ROUND_UNIT: usize = STATUS_MASK + 1;
172const ROUND_MASK: usize = !STATUS_MASK;
173
174#[derive(Copy, Clone, Debug, PartialEq)]
175struct Round(usize);
176
177impl Round {
178    const ZERO: Round = Self(0);
179
180    pub fn new() -> Self {
181        Self(ROUND_UNIT)
182    }
183
184    pub fn into_notification(self, kind: NotificationKind) -> Notification {
185        Notification { kind, round: self }
186    }
187
188    pub fn next(self) -> Self {
189        Self(self.0.wrapping_add(ROUND_UNIT))
190    }
191
192    pub fn into(self) -> usize {
193        self.0
194    }
195
196    pub fn from(i: usize) -> Self {
197        Self(i & ROUND_MASK)
198    }
199}
200
201#[derive(Clone, Copy, Debug, PartialEq)]
202struct State {
203    round: Round,
204    status: Status,
205}
206
207#[derive(Clone, Copy, Debug, PartialEq)]
208#[repr(usize)]
209enum Status {
210    Idle = 0,
211    Waiting = 1,
212    Notified = 2,
213}
214
215impl State {
216    pub fn new() -> Self {
217        Self { round: Round::new(), status: Status::Idle }
218    }
219
220    pub fn with_status(self, status: Status) -> Self {
221        Self { round: self.round, status }
222    }
223
224    pub fn with_round(self, round: Round) -> Self {
225        Self { round, status: self.status }
226    }
227
228    pub fn next_round(self) -> Self {
229        self.with_round(self.round.next())
230    }
231}
232
233struct AtomicState(AtomicUsize);
234
235impl Default for AtomicState {
236    fn default() -> Self {
237        Self::new(State::new())
238    }
239}
240
241impl AtomicState {
242    pub fn new(state: State) -> Self {
243        Self(AtomicUsize::new(state.into()))
244    }
245
246    pub fn store(&self, state: State, ordering: Ordering) {
247        self.0.store(state.into(), ordering)
248    }
249
250    pub fn load(&self, ordering: Ordering) -> State {
251        let u = self.0.load(ordering);
252        State::from(u)
253    }
254
255    pub fn compare_exchange(
256        &self,
257        current: State,
258        new: State,
259        success: Ordering,
260        failure: Ordering,
261    ) -> Result<State, State> {
262        match self.0.compare_exchange(current.into(), new.into(), success, failure) {
263            Ok(_) => Ok(current),
264            Err(updated) => Err(State::from(updated)),
265        }
266    }
267}
268
269impl From<State> for usize {
270    fn from(state: State) -> usize {
271        state.round.into() | state.status as usize
272    }
273}
274
275impl From<usize> for State {
276    fn from(i: usize) -> Self {
277        let status = i & STATUS_MASK;
278        Self { round: Round::from(i), status: unsafe { std::mem::transmute::<usize, Status>(status) } }
279    }
280}
281
282#[derive(Clone, Copy, PartialEq)]
283#[repr(usize)]
284enum NotificationKind {
285    One = 0,
286    All = 1,
287}
288
289#[derive(Clone, Copy)]
290struct Notification {
291    kind: NotificationKind,
292    round: Round,
293}
294
295impl From<Notification> for usize {
296    fn from(notification: Notification) -> usize {
297        notification.round.into() | notification.kind as usize
298    }
299}
300
301impl From<usize> for Notification {
302    fn from(u: usize) -> Self {
303        let kind = u & STATUS_MASK;
304        Self { kind: unsafe { std::mem::transmute::<usize, NotificationKind>(kind) }, round: Round::from(u) }
305    }
306}
307
308#[derive(Default)]
309struct AtomicNotification(AtomicUsize);
310
311impl AtomicNotification {
312    pub fn clear(&mut self) {
313        self.0.store(0, Relaxed)
314    }
315
316    pub fn take(&mut self) -> Option<Notification> {
317        let notification = std::mem::take(self);
318        notification.load(Relaxed)
319    }
320
321    pub fn load(&self, ordering: Ordering) -> Option<Notification> {
322        match self.0.load(ordering) {
323            0 => None,
324            u => Some(Notification::from(u)),
325        }
326    }
327
328    pub fn store(&self, notification: Notification, ordering: Ordering) {
329        self.0.store(notification.into(), ordering)
330    }
331}
332
333/// Notifies one task or all attached tasks to wakeup.
334///
335/// [notify_one] and [notified().await] behave similar to [Thread::unpark] and [thread::park]
336/// except that [notified().await] will not be waked up spuriously. One could assume that there is
337/// at most one permit associated with [Notify]. [notified().await] will block current task unless
338/// or until the permit is available to consume. [notify_one] release the permit for [notified().await]
339/// to acquire, it will wake up [Notified] in FIFO order if there are multiple [Notified]s blocking
340/// for the permit. The order of [Notified]s are the order of [notified().await] or [Notified::enable()]
341/// whichever first.
342///
343/// [notify_all], on the other hand, will wake up all attached [Notified]s and start a fresh new round
344/// for [notify_one] with no permit. [Notify::notified()]s are attached by default, one could use
345/// [Notified::detach] to detach from rounds of [Notify] until [Notified::enable] or future polling.
346///
347/// ## Differences with [tokio]
348/// * [tokio::sync::Notify::notify_all()] does not clear permit from [notify_one].
349/// * [tokio] does not have [Notified::detach()].
350///
351/// [thread::park]: std::thread::park
352/// [Thread::unpark]: std::thread::Thread::unpark
353/// [notified().await]: Notify::notified()
354/// [notify_one]: Notify::notify_one()
355/// [notify_all]: Notify::notify_all()
356/// [tokio]: https://docs.rs/tokio
357/// [tokio::sync::Notify::notify_all()]: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html#method.notify_one
358#[derive(Default)]
359pub struct Notify {
360    // All link operations are guarded by this lock including GuardedList which actually is an
361    // independent list.
362    list: Mutex<List<Waiter>>,
363    state: AtomicState,
364}
365
366unsafe impl Send for Notify {}
367unsafe impl Sync for Notify {}
368
369impl Notify {
370    /// Constructs a new [Notify].
371    pub fn new() -> Self {
372        Self::default()
373    }
374
375    /// Constructs a attached [Notified] to consume permit from [Notify::notify_one].
376    pub fn notified(&self) -> Notified<'_> {
377        let round = self.round();
378        Notified { notify: self, stage: Stage::default(), round, waiter: Waiter::default() }
379    }
380
381    /// Notifies one waiting task or stores a permit to consume in case of no waiting task.
382    pub fn notify_one(&self) {
383        let state = self.state.load(SeqCst);
384        self.notify_one_in_round(state.round, state);
385    }
386
387    /// Notifies all attached [Notified]s and starts a fresh new round with no permit.
388    pub fn notify_all(&self) {
389        let mut state = self.state.load(SeqCst);
390        loop {
391            while state.status != Status::Waiting {
392                match self.state.compare_exchange(state, state.next_round().with_status(Status::Idle), Release, Relaxed)
393                {
394                    Ok(_) => return,
395                    Err(updated) => state = updated,
396                }
397            }
398            let mut list = self.list.lock().unwrap();
399            state = self.state.load(Relaxed);
400            if state.status != Status::Waiting {
401                drop(list);
402                continue;
403            }
404
405            // Release store to publish changes.
406            self.state.store(state.next_round().with_status(Status::Idle), Release);
407
408            let mut guard = Waiter::default();
409            let mut wakers = WakerList::new();
410            let mut waiters =
411                WaiterList::new(GuardedList::new(std::mem::take(&mut list), &mut guard), state.round, self);
412
413            'list: loop {
414                while !wakers.is_full() {
415                    let Some(waiter) = waiters.pop_back(&mut list) else {
416                        break 'list;
417                    };
418                    let waker = unsafe { waiter.parking.unpark() };
419                    waiter.notification.store(state.round.into_notification(NotificationKind::All), Release);
420                    if let Some(waker) = waker {
421                        wakers.push(waker)
422                    }
423                }
424                drop(list);
425                wakers.wake();
426                list = self.list.lock().unwrap();
427            }
428            drop(list);
429            wakers.wake();
430            return;
431        }
432    }
433
434    fn remove(&self, waiter: &mut Waiter) {
435        let notification = match waiter.notification.load(Acquire) {
436            None => {
437                let mut list = self.list.lock().unwrap();
438                if list.unlink(waiter) && list.is_empty() {
439                    let state = self.state.load(Relaxed);
440                    if state.status == Status::Waiting {
441                        self.state.store(state.with_status(Status::Idle), Relaxed);
442                    }
443                }
444                drop(list);
445                // Relaxed load as nothing is important in case of drop.
446                let Some(notification) = waiter.notification.load(Relaxed) else {
447                    return;
448                };
449                notification
450            },
451            Some(notification) => notification,
452        };
453        if notification.kind == NotificationKind::One {
454            self.release_notification(notification.round);
455        }
456    }
457
458    fn poll(&self, waiter: &mut Waiter, round: Round) -> Poll<Notification> {
459        let mut state = self.state.load(SeqCst);
460        let round = if round == Round::ZERO { state.round } else { round };
461        loop {
462            if state.round != round {
463                return Poll::Ready(round.into_notification(NotificationKind::All));
464            }
465            if state.status != Status::Notified {
466                break;
467            }
468            // Acquire load to observe changes in case of `notify_all`.
469            match self.state.compare_exchange(state, state.with_status(Status::Idle), Acquire, Acquire) {
470                Ok(_) => return Poll::Ready(state.round.into_notification(NotificationKind::One)),
471                Err(updated) => state = updated,
472            }
473        }
474        let mut list = self.list.lock().unwrap();
475        state = self.state.load(SeqCst);
476        loop {
477            if state.round != round {
478                drop(list);
479                return Poll::Ready(round.into_notification(NotificationKind::All));
480            }
481            match state.status {
482                Status::Waiting => break,
483                Status::Idle => {
484                    match self.state.compare_exchange(state, state.with_status(Status::Waiting), Relaxed, Relaxed) {
485                        Ok(_) => break,
486                        Err(updated) => state = updated,
487                    }
488                },
489                Status::Notified => {
490                    match self.state.compare_exchange(state, state.with_status(Status::Idle), Acquire, Relaxed) {
491                        Ok(_) => {
492                            drop(list);
493                            return Poll::Ready(state.round.into_notification(NotificationKind::One));
494                        },
495                        Err(updated) => state = updated,
496                    }
497                },
498            }
499        }
500        list.push_front(waiter);
501        drop(list);
502        Poll::Pending
503    }
504
505    fn notify_one_in_round(&self, round: Round, mut state: State) {
506        loop {
507            loop {
508                // There are must be at least one `notify_all`, all waiters from this round must be
509                // notified.
510                if state.round != round {
511                    return;
512                }
513                if state.status == Status::Waiting {
514                    break;
515                }
516                // Release store to transfer happens-before relationship.
517                match self.state.compare_exchange(state, state.with_status(Status::Notified), Release, Relaxed) {
518                    Ok(_) => return,
519                    Err(updated) => state = updated,
520                }
521            }
522            let mut list = self.list.lock().unwrap();
523            let state = self.state.load(Relaxed);
524            if state.round != round {
525                return;
526            }
527            if state.status != Status::Waiting {
528                drop(list);
529                continue;
530            }
531            let waiter = list.pop_back().unwrap();
532            let waker = unsafe { waiter.parking.unpark() };
533            waiter.notification.store(state.round.into_notification(NotificationKind::One), Release);
534            if list.is_empty() {
535                self.state.store(state.with_status(Status::Idle), Relaxed);
536            }
537            drop(list);
538            if let Some(waker) = waker {
539                waker.wake();
540            }
541            return;
542        }
543    }
544
545    fn round(&self) -> Round {
546        self.state.load(SeqCst).round
547    }
548
549    fn release_notification(&self, round: Round) {
550        let state = self.state.load(SeqCst);
551        self.notify_one_in_round(round, state);
552    }
553}
554
555struct WakerList {
556    next: usize,
557    wakers: [MaybeUninit<Waker>; 32],
558}
559
560impl WakerList {
561    pub fn new() -> Self {
562        Self { next: 0, wakers: std::array::from_fn(|_| MaybeUninit::uninit()) }
563    }
564
565    pub fn is_full(&self) -> bool {
566        self.next == self.wakers.len()
567    }
568
569    pub fn push(&mut self, waker: Waker) {
570        debug_assert!(self.next < self.wakers.len());
571        self.wakers[self.next].write(waker);
572        self.next += 1;
573    }
574
575    pub fn wake(&mut self) {
576        while self.next != 0 {
577            self.next -= 1;
578            let waker = unsafe { self.wakers[self.next].assume_init_read() };
579            waker.wake();
580        }
581    }
582}
583
584impl Drop for WakerList {
585    fn drop(&mut self) {
586        while self.next != 0 {
587            self.next -= 1;
588            unsafe {
589                self.wakers[self.next].assume_init_drop();
590            }
591        }
592    }
593}
594
595struct Waiter {
596    link: Link<Waiter>,
597    parking: Parking<WeakOrdering>,
598
599    /// Release store to release connection to `Waiter`.
600    /// Acquire load to observe all changes.
601    notification: AtomicNotification,
602}
603
604impl Default for Waiter {
605    fn default() -> Self {
606        Self { link: Link::default(), parking: Parking::new(), notification: AtomicNotification::default() }
607    }
608}
609
610impl Node for Waiter {
611    fn link(&mut self) -> &mut Link<Waiter> {
612        &mut self.link
613    }
614}
615
616#[repr(usize)]
617#[derive(Default, Debug, Copy, Clone, PartialEq)]
618enum Stage {
619    #[default]
620    Init = 0,
621    Waiting = 1,
622    Finished = 2,
623}
624
625/// Future created from [Notify::notified()].
626pub struct Notified<'a> {
627    notify: &'a Notify,
628
629    stage: Stage,
630    round: Round,
631
632    waiter: Waiter,
633}
634
635unsafe impl Send for Notified<'_> {}
636unsafe impl Sync for Notified<'_> {}
637
638impl<'a> Notified<'a> {
639    /// Enables to wait for a notification from [Notify::notify_one] or [Notify::notify_all].
640    ///
641    /// If there is permit from [Notify::notify_one], this will consume it temporarily for future
642    /// polling. If this [Notified] is dropped without further polling, the permit will be handed
643    /// over to [Notify] in case of no new [Notify::notify_all].
644    ///
645    /// [Notified::poll] will enable this also.
646    pub fn enable(mut self: Pin<&mut Self>) {
647        if self.stage != Stage::Init {
648            return;
649        }
650        let round = self.round;
651        if let Poll::Ready(notification) = self.notify.poll(&mut self.waiter, round) {
652            self.stage = Stage::Finished;
653            self.waiter.notification.store(notification, Relaxed);
654        } else {
655            self.stage = Stage::Waiting;
656        }
657    }
658
659    /// Detaches from rounds of [Notify] so it will not be notified until [Notified::enable] or
660    /// [Notified::poll].
661    pub fn detach(mut self) -> Notified<'a> {
662        self.round = Round::ZERO;
663        self
664    }
665}
666
667impl Future for Notified<'_> {
668    type Output = ();
669
670    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
671        let round = self.round;
672        match self.stage {
673            Stage::Init => match self.notify.poll(&mut self.waiter, round) {
674                Poll::Pending => self.stage = Stage::Waiting,
675                Poll::Ready(_) => {
676                    self.stage = Stage::Finished;
677                    return Poll::Ready(());
678                },
679            },
680            Stage::Waiting => match self.waiter.notification.load(Acquire) {
681                None => {},
682                Some(_) => {
683                    self.waiter.notification.clear();
684                    self.stage = Stage::Finished;
685                    return Poll::Ready(());
686                },
687            },
688            Stage::Finished => {
689                // We could come from `enable`.
690                self.waiter.notification.clear();
691                return Poll::Ready(());
692            },
693        }
694        debug_assert_eq!(self.stage, Stage::Waiting);
695        if unsafe { self.waiter.parking.park(cx.waker()).is_ready() } {
696            while self.waiter.notification.load(Acquire).is_none() {
697                std::hint::spin_loop();
698            }
699            self.waiter.notification.clear();
700            self.stage = Stage::Finished;
701            return Poll::Ready(());
702        }
703        Poll::Pending
704    }
705}
706
707impl Drop for Notified<'_> {
708    fn drop(&mut self) {
709        match self.stage {
710            Stage::Init => {},
711            Stage::Waiting => self.notify.remove(&mut self.waiter),
712            Stage::Finished => {
713                if let Some(Notification { round, kind: NotificationKind::One }) = self.waiter.notification.take() {
714                    self.notify.release_notification(round);
715                }
716            },
717        };
718    }
719}
720
721#[cfg(test)]
722mod tests {
723    use std::pin::{pin, Pin};
724
725    use asyncs::select;
726
727    use super::Notify;
728
729    #[asyncs::test]
730    async fn notify_one_simple() {
731        let notify = Notify::new();
732
733        // given: two notifieds polled in order
734        let mut notified1 = notify.notified();
735        let mut notified2 = notify.notified();
736        select! {
737            biased;
738            default => {},
739            _ = &mut notified1 => unreachable!(),
740            _ = &mut notified2 => unreachable!(),
741        }
742
743        // when: notify_one
744        notify.notify_one();
745
746        // then: only the first polled got notified
747        select! {
748            biased;
749            default => unreachable!(),
750            _ = &mut notified2 => unreachable!(),
751            _ = &mut notified1 => {}
752        }
753
754        // when: another notify_one
755        notify.notify_one();
756        // then: other got notified
757        select! {
758            default => unreachable!(),
759            _ = &mut notified2 => {},
760        }
761    }
762
763    #[asyncs::test]
764    async fn notify_one_enabled() {
765        let notify = Notify::new();
766        let notified1 = notify.notified();
767        let mut notified1 = pin!(notified1);
768        let mut notified2 = notify.notified();
769
770        // given: enabled notified
771        notified1.as_mut().enable();
772        select! {
773            default => {},
774            _ = &mut notified2 => unreachable!(),
775        }
776
777        // when: notify_one
778        notify.notify_one();
779
780        // then: enabled notified behaves same as polled notified
781        notified1.await;
782
783        select! {
784            default => {},
785            _ = &mut notified2 => unreachable!(),
786        }
787    }
788
789    #[asyncs::test]
790    async fn notify_one_permit_does_not_acculumate() {
791        let notify = Notify::new();
792
793        // given: two notifieds
794        let notified1 = notify.notified();
795        let notified2 = notify.notified();
796
797        // when: notify_one twice
798        notify.notify_one();
799        notify.notify_one();
800
801        // then: only one permit
802        select! {
803            default => unreachable!(),
804            _ = notified1 => {},
805        };
806        select! {
807            default => {},
808            _ = notified2 => unreachable!(),
809        };
810    }
811
812    #[asyncs::test]
813    async fn notify_one_permit_consumed_by_poll() {
814        let notify = Notify::new();
815        let mut notified1 = notify.notified();
816        let notified2 = notify.notified();
817
818        // given: notify_one permit
819        notify.notify_one();
820
821        // when: poll and drop
822        select! {
823            default => unreachable!(),
824            _ = &mut notified1 => {},
825        };
826        drop(notified1);
827
828        // then: no permit resumed
829        select! {
830            default => {},
831            _ = notified2 => unreachable!(),
832        };
833    }
834
835    #[asyncs::test]
836    async fn notify_one_permit_doesnot_consumed_by_enable() {
837        let notify = Notify::new();
838        let mut notified1 = notify.notified();
839        let notified2 = notify.notified();
840
841        // given: notify_one permit
842        notify.notify_one();
843
844        // when: enable and drop notified
845        unsafe {
846            Pin::new_unchecked(&mut notified1).enable();
847        }
848        drop(notified1);
849
850        // then: notify_one permit resumed
851        select! {
852            default => unreachable!(),
853            _ = notified2 => {},
854        };
855    }
856
857    #[asyncs::test]
858    async fn notify_one_permit_unconsumed_resumed_on_drop() {
859        let notify = Notify::new();
860
861        // given: enabled/polled notified
862        let mut notified1 = notify.notified();
863        select! {
864            default => {},
865            _ = &mut notified1 => unreachable!(),
866        };
867
868        // when: notify_one and drop with no further poll
869        notify.notify_one();
870        drop(notified1);
871
872        // then: unconsumed notify_one will be resumed
873        let notified2 = notify.notified();
874        select! {
875            default => unreachable!(),
876            _ = notified2 => {},
877        };
878    }
879
880    #[asyncs::test]
881    async fn notify_one_permit_does_not_resumed_cross_round() {
882        let notify = Notify::new();
883
884        // given: enabled/polled notified
885        let mut notified1 = notify.notified();
886        select! {
887            default => {},
888            _ = &mut notified1 => unreachable!(),
889        };
890
891        // when: notify_one and drop after notify_all with no further poll
892        notify.notify_one();
893        notify.notify_all();
894        drop(notified1);
895
896        // then: unconsumed notify_one will not be resumed cross round
897        let notified2 = notify.notified();
898        select! {
899            default => {},
900            _ = notified2 => unreachable!(),
901        };
902    }
903
904    #[asyncs::test]
905    async fn notify_all_simple() {
906        let notify = Notify::new();
907
908        // given: not enabled notified
909        let mut notified1 = notify.notified().detach();
910        let mut notified2 = notify.notified().detach();
911        let mut notified3 = notify.notified();
912
913        // when: notify_all
914        notify.notify_all();
915
916        // then: only attached ones got notified
917        select! {
918            // So all notifieds got polled
919            biased;
920            default => unreachable!(),
921            _ = &mut notified1 => unreachable!("not ready"),
922            _ = &mut notified2 => unreachable!("not ready"),
923            _ = &mut notified3 => {},
924        };
925
926        // given: polled notified
927        // when: notify_all
928        notify.notify_all();
929
930        // then: notified
931        select! {
932            default => unreachable!(),
933            _ = &mut notified1 => {},
934        };
935
936        select! {
937            default => unreachable!(),
938            _ = &mut notified2 => {},
939        };
940    }
941
942    #[asyncs::test]
943    async fn notify_all_enabled() {
944        let notify = Notify::new();
945        let notified = notify.notified();
946
947        // given: enabled notified
948        let mut notified = pin!(notified);
949        notified.as_mut().enable();
950
951        // when: notify_all
952        notify.notify_all();
953
954        // then: notified
955        select! {
956            default => unreachable!(),
957            _ = notified => {},
958        };
959    }
960
961    #[asyncs::test]
962    async fn notify_all_ruin_permit() {
963        let notify = Notify::new();
964
965        // given: a detached Notified
966        let notified = notify.notified().detach();
967
968        // when: notify_one and then notify_all
969        notify.notify_one();
970        notify.notify_all();
971
972        // then: permit got cleared
973        select! {
974            default => {},
975            _ = notified => unreachable!(),
976        }
977    }
978
979    #[asyncs::test]
980    async fn notify_unlink() {
981        let notify = Notify::new();
982
983        let mut notified1 = notify.notified();
984        let mut notified2 = notify.notified();
985
986        select! {
987            default => {},
988            _ = &mut notified1 => unreachable!(),
989            _ = &mut notified2 => unreachable!(),
990        }
991
992        let mut notified3 = notify.notified();
993        unsafe { Pin::new_unchecked(&mut notified3).enable() };
994
995        unsafe {
996            std::ptr::drop_in_place(&mut notified1);
997        }
998        unsafe {
999            std::ptr::drop_in_place(&mut notified2);
1000        }
1001        unsafe {
1002            std::ptr::drop_in_place(&mut notified3);
1003        }
1004
1005        std::mem::forget(notified1);
1006        std::mem::forget(notified2);
1007        std::mem::forget(notified3);
1008
1009        notify.notify_all();
1010    }
1011}