ntex_util/channel/
condition.rs

1use std::{
2    cell, fmt, future::poll_fn, future::Future, pin::Pin, task::Context, task::Poll,
3};
4
5use slab::Slab;
6
7use super::cell::Cell;
8use crate::task::LocalWaker;
9
10/// Condition allows to notify multiple waiters at the same time
11pub struct Condition<T = ()>
12where
13    T: Default,
14{
15    inner: Cell<Inner<T>>,
16}
17
18struct Inner<T> {
19    data: Slab<Option<Item<T>>>,
20    ready: bool,
21    count: usize,
22}
23
24struct Item<T> {
25    val: cell::Cell<T>,
26    waker: LocalWaker,
27}
28
29impl<T: Default> Default for Condition<T> {
30    fn default() -> Self {
31        Condition {
32            inner: Cell::new(Inner {
33                data: Slab::new(),
34                ready: false,
35                count: 1,
36            }),
37        }
38    }
39}
40
41impl<T: Default> Clone for Condition<T> {
42    fn clone(&self) -> Self {
43        let inner = self.inner.clone();
44        inner.get_mut().count += 1;
45        Self { inner }
46    }
47}
48
49impl<T: Default> fmt::Debug for Condition<T> {
50    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51        f.debug_struct("Condition")
52            .field("ready", &self.inner.get_ref().ready)
53            .finish()
54    }
55}
56
57impl Condition<()> {
58    /// Coonstruct new condition instance
59    pub fn new() -> Condition<()> {
60        Condition {
61            inner: Cell::new(Inner {
62                data: Slab::new(),
63                ready: false,
64                count: 1,
65            }),
66        }
67    }
68}
69
70impl<T: Default> Condition<T> {
71    /// Get condition waiter
72    pub fn wait(&self) -> Waiter<T> {
73        let token = self.inner.get_mut().data.insert(None);
74        Waiter {
75            token,
76            inner: self.inner.clone(),
77        }
78    }
79
80    /// Notify all waiters
81    pub fn notify(&self) {
82        let inner = self.inner.get_ref();
83        for (_, item) in inner.data.iter() {
84            if let Some(item) = item {
85                item.waker.wake();
86            }
87        }
88    }
89
90    #[doc(hidden)]
91    /// Notify all waiters.
92    ///
93    /// All subsequent waiter readiness checks always returns `ready`
94    pub fn notify_and_lock_readiness(&self) {
95        self.inner.get_mut().ready = true;
96        self.notify();
97    }
98}
99
100impl<T: Clone + Default> Condition<T> {
101    /// Notify all waiters
102    pub fn notify_with(&self, val: T) {
103        let inner = self.inner.get_ref();
104        for (_, item) in inner.data.iter() {
105            if let Some(item) = item {
106                if item.waker.wake_checked() {
107                    item.val.set(val.clone());
108                }
109            }
110        }
111    }
112
113    #[doc(hidden)]
114    /// Notify all waiters.
115    ///
116    /// All subsequent waiter readiness checks always returns `ready`
117    pub fn notify_with_and_lock_readiness(&self, val: T) {
118        self.inner.get_mut().ready = true;
119        self.notify_with(val);
120    }
121}
122
123impl<T: Default> Drop for Condition<T> {
124    fn drop(&mut self) {
125        let inner = self.inner.get_mut();
126        inner.count -= 1;
127        if inner.count == 0 {
128            self.notify_and_lock_readiness()
129        }
130    }
131}
132
133/// Waits for result from condition
134pub struct Waiter<T = ()> {
135    token: usize,
136    inner: Cell<Inner<T>>,
137}
138
139impl<T: Default> Waiter<T> {
140    /// Returns readiness state of the condition.
141    pub async fn ready(&self) -> T {
142        poll_fn(|cx| self.poll_ready(cx)).await
143    }
144
145    /// Returns readiness state of the condition.
146    pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<T> {
147        let parent = self.inner.get_mut();
148
149        let inner = unsafe { parent.data.get_unchecked_mut(self.token) };
150        if inner.is_none() {
151            let waker = LocalWaker::default();
152            waker.register(cx.waker());
153            *inner = Some(Item {
154                waker,
155                val: cell::Cell::new(Default::default()),
156            });
157        } else {
158            let item = inner.as_mut().unwrap();
159            if !item.waker.register(cx.waker()) {
160                return Poll::Ready(item.val.replace(Default::default()));
161            }
162        }
163        if parent.ready {
164            Poll::Ready(Default::default())
165        } else {
166            Poll::Pending
167        }
168    }
169}
170
171impl<T> Clone for Waiter<T> {
172    fn clone(&self) -> Self {
173        let token = self.inner.get_mut().data.insert(None);
174        Waiter {
175            token,
176            inner: self.inner.clone(),
177        }
178    }
179}
180
181impl<T: Default> Future for Waiter<T> {
182    type Output = T;
183
184    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
185        self.get_mut().poll_ready(cx)
186    }
187}
188
189impl<T: Default> fmt::Debug for Waiter<T> {
190    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191        f.debug_struct("Waiter").finish()
192    }
193}
194
195impl<T> Drop for Waiter<T> {
196    fn drop(&mut self) {
197        self.inner.get_mut().data.remove(self.token);
198    }
199}
200
201#[cfg(test)]
202mod tests {
203    use super::*;
204    use crate::future::lazy;
205
206    #[ntex_macros::rt_test2]
207    #[allow(clippy::unit_cmp)]
208    async fn test_condition() {
209        let cond = Condition::new();
210        let mut waiter = cond.wait();
211        assert_eq!(
212            lazy(|cx| Pin::new(&mut waiter).poll(cx)).await,
213            Poll::Pending
214        );
215        cond.notify();
216        assert!(format!("{cond:?}").contains("Condition"));
217        assert!(format!("{waiter:?}").contains("Waiter"));
218        assert_eq!(waiter.await, ());
219
220        let mut waiter = cond.wait();
221        assert_eq!(
222            lazy(|cx| Pin::new(&mut waiter).poll(cx)).await,
223            Poll::Pending
224        );
225        let mut waiter2 = waiter.clone();
226        assert_eq!(
227            lazy(|cx| Pin::new(&mut waiter2).poll(cx)).await,
228            Poll::Pending
229        );
230
231        drop(cond);
232        assert_eq!(waiter.await, ());
233        assert_eq!(waiter2.await, ());
234    }
235
236    #[ntex_macros::rt_test2]
237    async fn test_condition_poll() {
238        let cond = Condition::default().clone();
239        let waiter = cond.wait();
240        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
241        cond.notify();
242        waiter.ready().await;
243
244        let waiter2 = waiter.clone();
245        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
246        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
247        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending);
248        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending);
249
250        drop(cond);
251        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(()));
252        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(()));
253        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Ready(()));
254        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Ready(()));
255    }
256
257    #[ntex_macros::rt_test2]
258    async fn test_condition_with() {
259        let cond = Condition::<String>::default();
260        let waiter = cond.wait();
261        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
262        cond.notify_with("TEST".into());
263        assert_eq!(waiter.ready().await, "TEST".to_string());
264
265        let waiter2 = waiter.clone();
266        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
267        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
268        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending);
269        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending);
270
271        drop(cond);
272        assert_eq!(
273            lazy(|cx| waiter.poll_ready(cx)).await,
274            Poll::Ready("".into())
275        );
276        assert_eq!(
277            lazy(|cx| waiter.poll_ready(cx)).await,
278            Poll::Ready("".into())
279        );
280        assert_eq!(
281            lazy(|cx| waiter2.poll_ready(cx)).await,
282            Poll::Ready("".into())
283        );
284        assert_eq!(
285            lazy(|cx| waiter2.poll_ready(cx)).await,
286            Poll::Ready("".into())
287        );
288    }
289
290    #[ntex_macros::rt_test2]
291    async fn notify_ready() {
292        let cond = Condition::default().clone();
293        let waiter = cond.wait();
294        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
295
296        cond.notify_and_lock_readiness();
297        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(()));
298        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(()));
299        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(()));
300
301        let waiter2 = cond.wait();
302        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Ready(()));
303    }
304
305    #[ntex_macros::rt_test2]
306    async fn notify_with_and_lock_ready() {
307        // with
308        let cond = Condition::<String>::default();
309        let waiter = cond.wait();
310        let waiter2 = cond.wait();
311        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
312        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending);
313
314        cond.notify_with_and_lock_readiness("TEST".into());
315        assert_eq!(
316            lazy(|cx| waiter.poll_ready(cx)).await,
317            Poll::Ready("TEST".into())
318        );
319        assert_eq!(
320            lazy(|cx| waiter.poll_ready(cx)).await,
321            Poll::Ready("".into())
322        );
323        assert_eq!(
324            lazy(|cx| waiter.poll_ready(cx)).await,
325            Poll::Ready("".into())
326        );
327        assert_eq!(
328            lazy(|cx| waiter2.poll_ready(cx)).await,
329            Poll::Ready("TEST".into())
330        );
331        assert_eq!(
332            lazy(|cx| waiter2.poll_ready(cx)).await,
333            Poll::Ready("".into())
334        );
335
336        let waiter2 = cond.wait();
337        assert_eq!(
338            lazy(|cx| waiter2.poll_ready(cx)).await,
339            Poll::Ready("".into())
340        );
341    }
342}