// actori_utils/condition.rs
use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use slab::Slab;
6
7use crate::cell::Cell;
8use crate::task::LocalWaker;
9
10pub struct Condition(Cell<Inner>);
12
13struct Inner {
14 data: Slab<Option<LocalWaker>>,
15}
16
17impl Default for Condition {
18 fn default() -> Self {
19 Self::new()
20 }
21}
22
23impl Condition {
24 pub fn new() -> Condition {
25 Condition(Cell::new(Inner { data: Slab::new() }))
26 }
27
28 pub fn wait(&mut self) -> Waiter {
30 let token = self.0.get_mut().data.insert(None);
31 Waiter {
32 token,
33 inner: self.0.clone(),
34 }
35 }
36
37 pub fn notify(&self) {
39 let inner = self.0.get_ref();
40 for item in inner.data.iter() {
41 if let Some(waker) = item.1 {
42 waker.wake();
43 }
44 }
45 }
46}
47
48impl Drop for Condition {
49 fn drop(&mut self) {
50 self.notify()
51 }
52}
53
54#[must_use = "Waiter do nothing unless polled"]
55pub struct Waiter {
56 token: usize,
57 inner: Cell<Inner>,
58}
59
60impl Clone for Waiter {
61 fn clone(&self) -> Self {
62 let token = unsafe { self.inner.get_mut_unsafe() }.data.insert(None);
63 Waiter {
64 token,
65 inner: self.inner.clone(),
66 }
67 }
68}
69
70impl Future for Waiter {
71 type Output = ();
72
73 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
74 let this = self.get_mut();
75
76 let inner = unsafe { this.inner.get_mut().data.get_unchecked_mut(this.token) };
77 if inner.is_none() {
78 let waker = LocalWaker::default();
79 waker.register(cx.waker());
80 *inner = Some(waker);
81 Poll::Pending
82 } else if inner.as_mut().unwrap().register(cx.waker()) {
83 Poll::Pending
84 } else {
85 Poll::Ready(())
86 }
87 }
88}
89
90impl Drop for Waiter {
91 fn drop(&mut self) {
92 self.inner.get_mut().data.remove(self.token);
93 }
94}
95
#[cfg(test)]
mod tests {
    use super::*;
    use futures::future::lazy;

    /// Polls `waiter` exactly once inside the current task context.
    async fn poll_once(waiter: &mut Waiter) -> Poll<()> {
        lazy(|cx| Pin::new(&mut *waiter).poll(cx)).await
    }

    #[actori_rt::test]
    async fn test_condition() {
        let mut cond = Condition::new();

        // A polled waiter stays pending until the condition is notified.
        let mut first = cond.wait();
        assert_eq!(poll_once(&mut first).await, Poll::Pending);
        cond.notify();
        assert_eq!(first.await, ());

        // A waiter and its clone are tracked independently...
        let mut original = cond.wait();
        assert_eq!(poll_once(&mut original).await, Poll::Pending);
        let mut cloned = original.clone();
        assert_eq!(poll_once(&mut cloned).await, Poll::Pending);

        // ...and dropping the condition releases both of them.
        drop(cond);
        assert_eq!(original.await, ());
        assert_eq!(cloned.await, ());
    }
}