actori_utils/
condition.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use slab::Slab;
6
7use crate::cell::Cell;
8use crate::task::LocalWaker;
9
10/// Condition allows to notify multiple receivers at the same time
11pub struct Condition(Cell<Inner>);
12
13struct Inner {
14    data: Slab<Option<LocalWaker>>,
15}
16
17impl Default for Condition {
18    fn default() -> Self {
19        Self::new()
20    }
21}
22
23impl Condition {
24    pub fn new() -> Condition {
25        Condition(Cell::new(Inner { data: Slab::new() }))
26    }
27
28    /// Get condition waiter
29    pub fn wait(&mut self) -> Waiter {
30        let token = self.0.get_mut().data.insert(None);
31        Waiter {
32            token,
33            inner: self.0.clone(),
34        }
35    }
36
37    /// Notify all waiters
38    pub fn notify(&self) {
39        let inner = self.0.get_ref();
40        for item in inner.data.iter() {
41            if let Some(waker) = item.1 {
42                waker.wake();
43            }
44        }
45    }
46}
47
48impl Drop for Condition {
49    fn drop(&mut self) {
50        self.notify()
51    }
52}
53
54#[must_use = "Waiter do nothing unless polled"]
55pub struct Waiter {
56    token: usize,
57    inner: Cell<Inner>,
58}
59
60impl Clone for Waiter {
61    fn clone(&self) -> Self {
62        let token = unsafe { self.inner.get_mut_unsafe() }.data.insert(None);
63        Waiter {
64            token,
65            inner: self.inner.clone(),
66        }
67    }
68}
69
70impl Future for Waiter {
71    type Output = ();
72
73    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
74        let this = self.get_mut();
75
76        let inner = unsafe { this.inner.get_mut().data.get_unchecked_mut(this.token) };
77        if inner.is_none() {
78            let waker = LocalWaker::default();
79            waker.register(cx.waker());
80            *inner = Some(waker);
81            Poll::Pending
82        } else if inner.as_mut().unwrap().register(cx.waker()) {
83            Poll::Pending
84        } else {
85            Poll::Ready(())
86        }
87    }
88}
89
90impl Drop for Waiter {
91    fn drop(&mut self) {
92        self.inner.get_mut().data.remove(self.token);
93    }
94}
95
#[cfg(test)]
mod tests {
    use super::*;
    use futures::future::lazy;

    /// Polls `waiter` exactly once and returns what the poll produced.
    async fn poll_once(waiter: &mut Waiter) -> Poll<()> {
        lazy(|cx| Pin::new(&mut *waiter).poll(cx)).await
    }

    #[actori_rt::test]
    async fn test_condition() {
        let mut cond = Condition::new();

        // A polled waiter is pending until `notify` is called.
        let mut first = cond.wait();
        assert_eq!(poll_once(&mut first).await, Poll::Pending);
        cond.notify();
        assert_eq!(first.await, ());

        // A waiter and its clone are both released when the condition drops.
        let mut second = cond.wait();
        assert_eq!(poll_once(&mut second).await, Poll::Pending);
        let mut second_clone = second.clone();
        assert_eq!(poll_once(&mut second_clone).await, Poll::Pending);

        drop(cond);
        assert_eq!(second.await, ());
        assert_eq!(second_clone.await, ());
    }
}