Skip to main content

ntex_util/channel/
condition.rs

1use std::{
2    cell, fmt, future::Future, future::poll_fn, pin::Pin, task::Context, task::Poll,
3};
4
5use slab::Slab;
6
7use super::cell::Cell;
8use crate::task::LocalWaker;
9
10/// Condition allows to notify multiple waiters at the same time
11pub struct Condition<T = ()>
12where
13    T: Default,
14{
15    inner: Cell<Inner<T>>,
16}
17
18struct Inner<T> {
19    data: Slab<Option<Item<T>>>,
20    ready: bool,
21    count: usize,
22}
23
24struct Item<T> {
25    val: cell::Cell<T>,
26    waker: LocalWaker,
27}
28
29impl<T: Default> Default for Condition<T> {
30    fn default() -> Self {
31        Condition {
32            inner: Cell::new(Inner {
33                data: Slab::new(),
34                ready: false,
35                count: 1,
36            }),
37        }
38    }
39}
40
41impl<T: Default> Clone for Condition<T> {
42    fn clone(&self) -> Self {
43        let inner = self.inner.clone();
44        inner.get_mut().count += 1;
45        Self { inner }
46    }
47}
48
49impl<T: Default> fmt::Debug for Condition<T> {
50    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51        f.debug_struct("Condition")
52            .field("ready", &self.inner.get_ref().ready)
53            .finish()
54    }
55}
56
57impl Condition<()> {
58    /// Construct new condition instance
59    pub fn new() -> Condition<()> {
60        Condition {
61            inner: Cell::new(Inner {
62                data: Slab::new(),
63                ready: false,
64                count: 1,
65            }),
66        }
67    }
68}
69
70impl<T: Default> Condition<T> {
71    /// Get condition waiter
72    pub fn wait(&self) -> Waiter<T> {
73        let token = self.inner.get_mut().data.insert(None);
74        Waiter {
75            token,
76            inner: self.inner.clone(),
77        }
78    }
79
80    /// Notify all waiters
81    pub fn notify(&self) {
82        let inner = self.inner.get_ref();
83        for (_, item) in &inner.data {
84            if let Some(item) = item {
85                item.waker.wake();
86            }
87        }
88    }
89
90    #[doc(hidden)]
91    /// Notify all waiters.
92    ///
93    /// All subsequent waiter readiness checks always returns `ready`
94    pub fn notify_and_lock_readiness(&self) {
95        self.inner.get_mut().ready = true;
96        self.notify();
97    }
98}
99
100impl<T: Clone + Default> Condition<T> {
101    #[allow(clippy::needless_pass_by_value)]
102    /// Notify all waiters
103    pub fn notify_with(&self, val: T) {
104        let inner = self.inner.get_ref();
105        for (_, item) in &inner.data {
106            if let Some(item) = item
107                && item.waker.wake_checked()
108            {
109                item.val.set(val.clone());
110            }
111        }
112    }
113
114    #[doc(hidden)]
115    /// Notify all waiters.
116    ///
117    /// All subsequent waiter readiness checks always returns `ready`
118    pub fn notify_with_and_lock_readiness(&self, val: T) {
119        self.inner.get_mut().ready = true;
120        self.notify_with(val);
121    }
122}
123
124impl<T: Default> Drop for Condition<T> {
125    fn drop(&mut self) {
126        let inner = self.inner.get_mut();
127        inner.count -= 1;
128        if inner.count == 0 {
129            self.notify_and_lock_readiness();
130        }
131    }
132}
133
134/// Waits for result from condition
135pub struct Waiter<T = ()> {
136    token: usize,
137    inner: Cell<Inner<T>>,
138}
139
140impl<T: Default> Waiter<T> {
141    /// Returns readiness state of the condition.
142    pub async fn ready(&self) -> T {
143        poll_fn(|cx| self.poll_ready(cx)).await
144    }
145
146    #[allow(clippy::missing_panics_doc)]
147    /// Returns readiness state of the condition.
148    pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<T> {
149        let parent = self.inner.get_mut();
150
151        let inner = unsafe { parent.data.get_unchecked_mut(self.token) };
152        if inner.is_none() {
153            let waker = LocalWaker::default();
154            waker.register(cx.waker());
155            *inner = Some(Item {
156                waker,
157                val: cell::Cell::new(Default::default()),
158            });
159        } else {
160            let item = inner.as_mut().unwrap();
161            if !item.waker.register(cx.waker()) {
162                return Poll::Ready(item.val.replace(Default::default()));
163            }
164        }
165        if parent.ready {
166            Poll::Ready(Default::default())
167        } else {
168            Poll::Pending
169        }
170    }
171}
172
173impl<T> Clone for Waiter<T> {
174    fn clone(&self) -> Self {
175        let token = self.inner.get_mut().data.insert(None);
176        Waiter {
177            token,
178            inner: self.inner.clone(),
179        }
180    }
181}
182
183impl<T: Default> Future for Waiter<T> {
184    type Output = T;
185
186    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
187        self.get_mut().poll_ready(cx)
188    }
189}
190
191impl<T: Default> fmt::Debug for Waiter<T> {
192    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
193        f.debug_struct("Waiter").finish()
194    }
195}
196
197impl<T> Drop for Waiter<T> {
198    fn drop(&mut self) {
199        self.inner.get_mut().data.remove(self.token);
200    }
201}
202
203#[cfg(test)]
204mod tests {
205    use super::*;
206    use crate::future::lazy;
207
208    #[ntex::test]
209    #[allow(clippy::unit_cmp)]
210    async fn test_condition() {
211        let cond = Condition::new();
212        let mut waiter = cond.wait();
213        assert_eq!(
214            lazy(|cx| Pin::new(&mut waiter).poll(cx)).await,
215            Poll::Pending
216        );
217        cond.notify();
218        assert!(format!("{cond:?}").contains("Condition"));
219        assert!(format!("{waiter:?}").contains("Waiter"));
220        assert_eq!(waiter.await, ());
221
222        let mut waiter = cond.wait();
223        assert_eq!(
224            lazy(|cx| Pin::new(&mut waiter).poll(cx)).await,
225            Poll::Pending
226        );
227        let mut waiter2 = waiter.clone();
228        assert_eq!(
229            lazy(|cx| Pin::new(&mut waiter2).poll(cx)).await,
230            Poll::Pending
231        );
232
233        drop(cond);
234        assert_eq!(waiter.await, ());
235        assert_eq!(waiter2.await, ());
236    }
237
238    #[ntex::test]
239    async fn test_condition_poll() {
240        let cond = Condition::default().clone();
241        let waiter = cond.wait();
242        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
243        cond.notify();
244        waiter.ready().await;
245
246        let waiter2 = waiter.clone();
247        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
248        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
249        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending);
250        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending);
251
252        drop(cond);
253        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(()));
254        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(()));
255        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Ready(()));
256        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Ready(()));
257    }
258
259    #[ntex::test]
260    async fn test_condition_with() {
261        let cond = Condition::<String>::default();
262        let waiter = cond.wait();
263        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
264        cond.notify_with("TEST".into());
265        assert_eq!(waiter.ready().await, "TEST".to_string());
266
267        let waiter2 = waiter.clone();
268        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
269        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
270        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending);
271        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending);
272
273        drop(cond);
274        assert_eq!(
275            lazy(|cx| waiter.poll_ready(cx)).await,
276            Poll::Ready(String::new())
277        );
278        assert_eq!(
279            lazy(|cx| waiter.poll_ready(cx)).await,
280            Poll::Ready(String::new())
281        );
282        assert_eq!(
283            lazy(|cx| waiter2.poll_ready(cx)).await,
284            Poll::Ready(String::new())
285        );
286        assert_eq!(
287            lazy(|cx| waiter2.poll_ready(cx)).await,
288            Poll::Ready(String::new())
289        );
290    }
291
292    #[ntex::test]
293    async fn notify_ready() {
294        let cond = Condition::default().clone();
295        let waiter = cond.wait();
296        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
297
298        cond.notify_and_lock_readiness();
299        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(()));
300        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(()));
301        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(()));
302
303        let waiter2 = cond.wait();
304        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Ready(()));
305    }
306
307    #[ntex::test]
308    async fn notify_with_and_lock_ready() {
309        // with
310        let cond = Condition::<String>::default();
311        let waiter = cond.wait();
312        let waiter2 = cond.wait();
313        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
314        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending);
315
316        cond.notify_with_and_lock_readiness("TEST".into());
317        assert_eq!(
318            lazy(|cx| waiter.poll_ready(cx)).await,
319            Poll::Ready("TEST".into())
320        );
321        assert_eq!(
322            lazy(|cx| waiter.poll_ready(cx)).await,
323            Poll::Ready(String::new())
324        );
325        assert_eq!(
326            lazy(|cx| waiter.poll_ready(cx)).await,
327            Poll::Ready(String::new())
328        );
329        assert_eq!(
330            lazy(|cx| waiter2.poll_ready(cx)).await,
331            Poll::Ready("TEST".into())
332        );
333        assert_eq!(
334            lazy(|cx| waiter2.poll_ready(cx)).await,
335            Poll::Ready(String::new())
336        );
337
338        let waiter2 = cond.wait();
339        assert_eq!(
340            lazy(|cx| waiter2.poll_ready(cx)).await,
341            Poll::Ready(String::new())
342        );
343    }
344}