ntex_util/channel/
condition.rs1use std::{
2 cell, fmt, future::Future, future::poll_fn, pin::Pin, task::Context, task::Poll,
3};
4
5use slab::Slab;
6
7use super::cell::Cell;
8use crate::task::LocalWaker;
9
10pub struct Condition<T = ()>
12where
13 T: Default,
14{
15 inner: Cell<Inner<T>>,
16}
17
/// State shared by every `Condition` handle and every `Waiter`.
struct Inner<T> {
    /// One slot per live waiter, keyed by the waiter's token.
    ///
    /// A slot is `None` from the moment the waiter is created until its first
    /// poll, at which point it is filled with an `Item`.
    data: Slab<Option<Item<T>>>,
    /// Set by the `*_and_lock_readiness` methods; while `true`, polling a
    /// waiter always completes.
    ready: bool,
    /// Number of live `Condition` handles: starts at 1, incremented on clone,
    /// decremented on drop.
    count: usize,
}
23
/// Per-waiter registration stored in the shared slab.
struct Item<T> {
    /// Value handed over by `notify_with`; swapped back to `T::default()` when
    /// the waiter reads it.
    val: cell::Cell<T>,
    /// Waker of the task that last polled the waiter.
    waker: LocalWaker,
}
28
29impl<T: Default> Default for Condition<T> {
30 fn default() -> Self {
31 Condition {
32 inner: Cell::new(Inner {
33 data: Slab::new(),
34 ready: false,
35 count: 1,
36 }),
37 }
38 }
39}
40
41impl<T: Default> Clone for Condition<T> {
42 fn clone(&self) -> Self {
43 let inner = self.inner.clone();
44 inner.get_mut().count += 1;
45 Self { inner }
46 }
47}
48
49impl<T: Default> fmt::Debug for Condition<T> {
50 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51 f.debug_struct("Condition")
52 .field("ready", &self.inner.get_ref().ready)
53 .finish()
54 }
55}
56
57impl Condition<()> {
58 pub fn new() -> Condition<()> {
60 Condition {
61 inner: Cell::new(Inner {
62 data: Slab::new(),
63 ready: false,
64 count: 1,
65 }),
66 }
67 }
68}
69
70impl<T: Default> Condition<T> {
71 pub fn wait(&self) -> Waiter<T> {
73 let token = self.inner.get_mut().data.insert(None);
74 Waiter {
75 token,
76 inner: self.inner.clone(),
77 }
78 }
79
80 pub fn notify(&self) {
82 let inner = self.inner.get_ref();
83 for (_, item) in &inner.data {
84 if let Some(item) = item {
85 item.waker.wake();
86 }
87 }
88 }
89
90 #[doc(hidden)]
91 pub fn notify_and_lock_readiness(&self) {
95 self.inner.get_mut().ready = true;
96 self.notify();
97 }
98}
99
100impl<T: Clone + Default> Condition<T> {
101 #[allow(clippy::needless_pass_by_value)]
102 pub fn notify_with(&self, val: T) {
104 let inner = self.inner.get_ref();
105 for (_, item) in &inner.data {
106 if let Some(item) = item
107 && item.waker.wake_checked()
108 {
109 item.val.set(val.clone());
110 }
111 }
112 }
113
114 #[doc(hidden)]
115 pub fn notify_with_and_lock_readiness(&self, val: T) {
119 self.inner.get_mut().ready = true;
120 self.notify_with(val);
121 }
122}
123
124impl<T: Default> Drop for Condition<T> {
125 fn drop(&mut self) {
126 let inner = self.inner.get_mut();
127 inner.count -= 1;
128 if inner.count == 0 {
129 self.notify_and_lock_readiness();
130 }
131 }
132}
133
/// Handle that completes when its `Condition` is notified.
///
/// Can be polled directly via `poll_ready`, awaited via `ready`, or used as a
/// `Future`.
pub struct Waiter<T = ()> {
    /// Key of this waiter's slot in the shared slab; the slot is inserted when
    /// the waiter is created and removed when it is dropped.
    token: usize,
    /// State shared with the originating condition and its other waiters.
    inner: Cell<Inner<T>>,
}
139
140impl<T: Default> Waiter<T> {
141 pub async fn ready(&self) -> T {
143 poll_fn(|cx| self.poll_ready(cx)).await
144 }
145
146 #[allow(clippy::missing_panics_doc)]
147 pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<T> {
149 let parent = self.inner.get_mut();
150
151 let inner = unsafe { parent.data.get_unchecked_mut(self.token) };
152 if inner.is_none() {
153 let waker = LocalWaker::default();
154 waker.register(cx.waker());
155 *inner = Some(Item {
156 waker,
157 val: cell::Cell::new(Default::default()),
158 });
159 } else {
160 let item = inner.as_mut().unwrap();
161 if !item.waker.register(cx.waker()) {
162 return Poll::Ready(item.val.replace(Default::default()));
163 }
164 }
165 if parent.ready {
166 Poll::Ready(Default::default())
167 } else {
168 Poll::Pending
169 }
170 }
171}
172
173impl<T> Clone for Waiter<T> {
174 fn clone(&self) -> Self {
175 let token = self.inner.get_mut().data.insert(None);
176 Waiter {
177 token,
178 inner: self.inner.clone(),
179 }
180 }
181}
182
183impl<T: Default> Future for Waiter<T> {
184 type Output = T;
185
186 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
187 self.get_mut().poll_ready(cx)
188 }
189}
190
191impl<T: Default> fmt::Debug for Waiter<T> {
192 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
193 f.debug_struct("Waiter").finish()
194 }
195}
196
197impl<T> Drop for Waiter<T> {
198 fn drop(&mut self) {
199 self.inner.get_mut().data.remove(self.token);
200 }
201}
202
#[cfg(test)]
mod tests {
    use super::*;
    use crate::future::lazy;

    /// Exercises the `Future` impl of `Waiter`: pending until notified, and
    /// completed when the last `Condition` handle is dropped.
    #[ntex::test]
    #[allow(clippy::unit_cmp)]
    async fn test_condition() {
        let cond = Condition::new();
        let mut waiter = cond.wait();
        // First poll registers the waiter; nothing has been notified yet.
        assert_eq!(
            lazy(|cx| Pin::new(&mut waiter).poll(cx)).await,
            Poll::Pending
        );
        cond.notify();
        assert!(format!("{cond:?}").contains("Condition"));
        assert!(format!("{waiter:?}").contains("Waiter"));
        assert_eq!(waiter.await, ());

        let mut waiter = cond.wait();
        assert_eq!(
            lazy(|cx| Pin::new(&mut waiter).poll(cx)).await,
            Poll::Pending
        );
        // A cloned waiter has its own slot and must be polled separately.
        let mut waiter2 = waiter.clone();
        assert_eq!(
            lazy(|cx| Pin::new(&mut waiter2).poll(cx)).await,
            Poll::Pending
        );

        // Dropping the only handle locks readiness and wakes both waiters.
        drop(cond);
        assert_eq!(waiter.await, ());
        assert_eq!(waiter2.await, ());
    }

    /// Exercises `poll_ready` / `ready` directly, including repeated polls.
    #[ntex::test]
    async fn test_condition_poll() {
        // The temporary from `default()` is dropped at the end of the
        // statement; the clone remains as the only live handle.
        let cond = Condition::default().clone();
        let waiter = cond.wait();
        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
        cond.notify();
        waiter.ready().await;

        let waiter2 = waiter.clone();
        // Without a new notification, repeated polls stay pending.
        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending);
        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending);

        // After the last handle is dropped every poll completes.
        drop(cond);
        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(()));
        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(()));
        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Ready(()));
        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Ready(()));
    }

    /// Checks that `notify_with` delivers its payload, and that dropping the
    /// condition yields the default value instead.
    #[ntex::test]
    async fn test_condition_with() {
        let cond = Condition::<String>::default();
        let waiter = cond.wait();
        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
        cond.notify_with("TEST".into());
        assert_eq!(waiter.ready().await, "TEST".to_string());

        let waiter2 = waiter.clone();
        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending);
        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending);

        // Drop carries no payload, so waiters observe `String::default()`.
        drop(cond);
        assert_eq!(
            lazy(|cx| waiter.poll_ready(cx)).await,
            Poll::Ready(String::new())
        );
        assert_eq!(
            lazy(|cx| waiter.poll_ready(cx)).await,
            Poll::Ready(String::new())
        );
        assert_eq!(
            lazy(|cx| waiter2.poll_ready(cx)).await,
            Poll::Ready(String::new())
        );
        assert_eq!(
            lazy(|cx| waiter2.poll_ready(cx)).await,
            Poll::Ready(String::new())
        );
    }

    /// Checks that `notify_and_lock_readiness` keeps existing and newly
    /// created waiters ready.
    #[ntex::test]
    async fn notify_ready() {
        let cond = Condition::default().clone();
        let waiter = cond.wait();
        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);

        cond.notify_and_lock_readiness();
        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(()));
        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(()));
        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Ready(()));

        // A waiter created after locking is ready on its very first poll.
        let waiter2 = cond.wait();
        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Ready(()));
    }

    /// Checks that `notify_with_and_lock_readiness` delivers the payload once
    /// per registered waiter and the default value on every later poll.
    #[ntex::test]
    async fn notify_with_and_lock_ready() {
        let cond = Condition::<String>::default();
        let waiter = cond.wait();
        let waiter2 = cond.wait();
        assert_eq!(lazy(|cx| waiter.poll_ready(cx)).await, Poll::Pending);
        assert_eq!(lazy(|cx| waiter2.poll_ready(cx)).await, Poll::Pending);

        cond.notify_with_and_lock_readiness("TEST".into());
        // The first poll after the notification takes the payload...
        assert_eq!(
            lazy(|cx| waiter.poll_ready(cx)).await,
            Poll::Ready("TEST".into())
        );
        // ...and later polls fall back to the default value.
        assert_eq!(
            lazy(|cx| waiter.poll_ready(cx)).await,
            Poll::Ready(String::new())
        );
        assert_eq!(
            lazy(|cx| waiter.poll_ready(cx)).await,
            Poll::Ready(String::new())
        );
        assert_eq!(
            lazy(|cx| waiter2.poll_ready(cx)).await,
            Poll::Ready("TEST".into())
        );
        assert_eq!(
            lazy(|cx| waiter2.poll_ready(cx)).await,
            Poll::Ready(String::new())
        );

        // A waiter created after the notification never sees the payload.
        let waiter2 = cond.wait();
        assert_eq!(
            lazy(|cx| waiter2.poll_ready(cx)).await,
            Poll::Ready(String::new())
        );
    }
}