ntex_util/channel/
condition.rs1use std::{
2 cell, fmt, future::poll_fn, future::Future, pin::Pin, task::Context, task::Poll,
3};
4
5use slab::Slab;
6
7use super::cell::Cell;
8use crate::task::LocalWaker;
9
10pub struct Condition<T = ()>
12where
13 T: Default,
14{
15 inner: Cell<Inner<T>>,
16}
17
18struct Inner<T> {
19 data: Slab<Option<Item<T>>>,
20 ready: bool,
21 count: usize,
22}
23
24struct Item<T> {
25 val: cell::Cell<T>,
26 waker: LocalWaker,
27}
28
29impl<T: Default> Default for Condition<T> {
30 fn default() -> Self {
31 Condition {
32 inner: Cell::new(Inner {
33 data: Slab::new(),
34 ready: false,
35 count: 1,
36 }),
37 }
38 }
39}
40
41impl<T: Default> Clone for Condition<T> {
42 fn clone(&self) -> Self {
43 let inner = self.inner.clone();
44 inner.get_mut().count += 1;
45 Self { inner }
46 }
47}
48
49impl<T: Default> fmt::Debug for Condition<T> {
50 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51 f.debug_struct("Condition")
52 .field("ready", &self.inner.get_ref().ready)
53 .finish()
54 }
55}
56
57impl Condition<()> {
58 pub fn new() -> Condition<()> {
60 Condition {
61 inner: Cell::new(Inner {
62 data: Slab::new(),
63 ready: false,
64 count: 1,
65 }),
66 }
67 }
68}
69
70impl<T: Default> Condition<T> {
71 pub fn wait(&self) -> Waiter<T> {
73 let token = self.inner.get_mut().data.insert(None);
74 Waiter {
75 token,
76 inner: self.inner.clone(),
77 }
78 }
79
80 pub fn notify(&self) {
82 let inner = self.inner.get_ref();
83 for (_, item) in inner.data.iter() {
84 if let Some(item) = item {
85 item.waker.wake();
86 }
87 }
88 }
89
90 #[doc(hidden)]
91 pub fn notify_and_lock_readiness(&self) {
95 self.inner.get_mut().ready = true;
96 self.notify();
97 }
98}
99
100impl<T: Clone + Default> Condition<T> {
101 pub fn notify_with(&self, val: T) {
103 let inner = self.inner.get_ref();
104 for (_, item) in inner.data.iter() {
105 if let Some(item) = item {
106 if item.waker.wake_checked() {
107 item.val.set(val.clone());
108 }
109 }
110 }
111 }
112
113 #[doc(hidden)]
114 pub fn notify_with_and_lock_readiness(&self, val: T) {
118 self.inner.get_mut().ready = true;
119 self.notify_with(val);
120 }
121}
122
123impl<T: Default> Drop for Condition<T> {
124 fn drop(&mut self) {
125 let inner = self.inner.get_mut();
126 inner.count -= 1;
127 if inner.count == 0 {
128 self.notify_and_lock_readiness()
129 }
130 }
131}
132
133pub struct Waiter<T = ()> {
135 token: usize,
136 inner: Cell<Inner<T>>,
137}
138
139impl<T: Default> Waiter<T> {
140 pub async fn ready(&self) -> T {
142 poll_fn(|cx| self.poll_ready(cx)).await
143 }
144
145 pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<T> {
147 let parent = self.inner.get_mut();
148
149 let inner = unsafe { parent.data.get_unchecked_mut(self.token) };
150 if inner.is_none() {
151 let waker = LocalWaker::default();
152 waker.register(cx.waker());
153 *inner = Some(Item {
154 waker,
155 val: cell::Cell::new(Default::default()),
156 });
157 } else {
158 let item = inner.as_mut().unwrap();
159 if !item.waker.register(cx.waker()) {
160 return Poll::Ready(item.val.replace(Default::default()));
161 }
162 }
163 if parent.ready {
164 Poll::Ready(Default::default())
165 } else {
166 Poll::Pending
167 }
168 }
169}
170
171impl<T> Clone for Waiter<T> {
172 fn clone(&self) -> Self {
173 let token = self.inner.get_mut().data.insert(None);
174 Waiter {
175 token,
176 inner: self.inner.clone(),
177 }
178 }
179}
180
181impl<T: Default> Future for Waiter<T> {
182 type Output = T;
183
184 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
185 self.get_mut().poll_ready(cx)
186 }
187}
188
189impl<T: Default> fmt::Debug for Waiter<T> {
190 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191 f.debug_struct("Waiter").finish()
192 }
193}
194
195impl<T> Drop for Waiter<T> {
196 fn drop(&mut self) {
197 self.inner.get_mut().data.remove(self.token);
198 }
199}
200
201#[cfg(test)]
202mod tests {
203 use super::*;
204 use crate::future::lazy;
205
206 #[ntex_macros::rt_test2]
207 #[allow(clippy::unit_cmp)]
208 async fn test_condition() {
209 let cond = Condition::new();
210 let mut waiter = cond.wait();
211 assert_eq!(
212 lazy(|cx| Pin::new(&mut waiter).poll(cx)).await,
213 Poll::Pending
214 );
215 cond.notify();
216 assert!(format!("{cond:?}").contains("Condition"));
217 assert!(format!("{waiter:?}").contains("Waiter"));
218 assert_eq!(waiter.await, ());
219
220 let mut waiter = cond.wait();
221 assert_eq!(
222 lazy(|cx| Pin::new(&mut waiter).poll(cx)).await,
223 Poll::Pending
224 );
225 let mut waiter2 = waiter.clone();
226 assert_eq!(
227 lazy(|cx| Pin::new(&mut waiter2).poll(cx)).await,
228 Poll::Pending
229 );
230
231 drop(cond);
232 assert_eq!(waiter.await, ());
233 assert_eq!(waiter2.await, ());
234 }
235
236 #[ntex_macros::rt_test2]
237 async fn test_condition_poll() {
238 let cond = Condition::default().clone();
239 let waiter = cond.wait();
240 assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
241 cond.notify();
242 waiter.ready().await;
243
244 let waiter2 = waiter.clone();
245 assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
246 assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
247 assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending);
248 assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending);
249
250 drop(cond);
251 assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(()));
252 assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(()));
253 assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Ready(()));
254 assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Ready(()));
255 }
256
257 #[ntex_macros::rt_test2]
258 async fn test_condition_with() {
259 let cond = Condition::<String>::default();
260 let waiter = cond.wait();
261 assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
262 cond.notify_with("TEST".into());
263 assert_eq!(waiter.ready().await, "TEST".to_string());
264
265 let waiter2 = waiter.clone();
266 assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
267 assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
268 assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending);
269 assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending);
270
271 drop(cond);
272 assert_eq!(
273 lazy(|cx| waiter.poll_ready(cx)).await,
274 Poll::Ready("".into())
275 );
276 assert_eq!(
277 lazy(|cx| waiter.poll_ready(cx)).await,
278 Poll::Ready("".into())
279 );
280 assert_eq!(
281 lazy(|cx| waiter2.poll_ready(cx)).await,
282 Poll::Ready("".into())
283 );
284 assert_eq!(
285 lazy(|cx| waiter2.poll_ready(cx)).await,
286 Poll::Ready("".into())
287 );
288 }
289
290 #[ntex_macros::rt_test2]
291 async fn notify_ready() {
292 let cond = Condition::default().clone();
293 let waiter = cond.wait();
294 assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
295
296 cond.notify_and_lock_readiness();
297 assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(()));
298 assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(()));
299 assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(()));
300
301 let waiter2 = cond.wait();
302 assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Ready(()));
303 }
304
305 #[ntex_macros::rt_test2]
306 async fn notify_with_and_lock_ready() {
307 let cond = Condition::<String>::default();
309 let waiter = cond.wait();
310 let waiter2 = cond.wait();
311 assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
312 assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending);
313
314 cond.notify_with_and_lock_readiness("TEST".into());
315 assert_eq!(
316 lazy(|cx| waiter.poll_ready(cx)).await,
317 Poll::Ready("TEST".into())
318 );
319 assert_eq!(
320 lazy(|cx| waiter.poll_ready(cx)).await,
321 Poll::Ready("".into())
322 );
323 assert_eq!(
324 lazy(|cx| waiter.poll_ready(cx)).await,
325 Poll::Ready("".into())
326 );
327 assert_eq!(
328 lazy(|cx| waiter2.poll_ready(cx)).await,
329 Poll::Ready("TEST".into())
330 );
331 assert_eq!(
332 lazy(|cx| waiter2.poll_ready(cx)).await,
333 Poll::Ready("".into())
334 );
335
336 let waiter2 = cond.wait();
337 assert_eq!(
338 lazy(|cx| waiter2.poll_ready(cx)).await,
339 Poll::Ready("".into())
340 );
341 }
342}