1mod deque;
8mod local;
9mod poll_fn;
10mod shared;
11
12use std::future::Future;
13use std::marker::PhantomData;
14use std::pin::Pin;
15use std::task::Poll;
16
17use crate::poll_fn::poll_fn;
18
19pub fn push(task: impl Future<Output = ()> + Send + 'static) {
21 shared::push(task);
22}
23
24pub fn push_local(task: impl Future<Output = ()> + 'static) {
26 local::push(task);
27}
28
29pub struct Poller<'a> {
31 local: local::Poller<'a>,
32 shared: shared::Poller<'a>,
33
34 _marker: PhantomData<*mut ()>,
36}
37
38pub fn poller<'a>() -> Poller<'a> {
42 Poller {
43 local: local::poller(),
44 shared: shared::poller(),
45 _marker: PhantomData,
46 }
47}
48
49impl<'a> Poller<'a> {
50 #[inline(always)]
61 pub fn poll_one(&self) -> bool {
62 if self.local.poll_one() {
63 return true;
64 }
65
66 if self.shared.poll_one() {
67 return true;
68 }
69
70 false
71 }
72
73 #[inline(always)]
77 pub async fn wait(&self) -> bool {
78 let mut local_wait = Some(self.local.wait());
79 let mut shared_wait = Some(self.shared.wait());
80 poll_fn(move |cx| {
81 assert!(
82 local_wait.is_some() || shared_wait.is_some(),
83 "calling poll when future is already done"
84 );
85
86 if local_wait.is_some() {
87 let f = local_wait.as_mut().unwrap();
88 match unsafe { Pin::new_unchecked(f) }.poll(cx) {
89 Poll::Ready(ok) => {
90 local_wait.take();
91 if ok {
92 shared_wait.take();
93 return Poll::Ready(true);
94 }
95 }
96 Poll::Pending => {}
97 }
98 }
99
100 if shared_wait.is_some() {
101 let f = shared_wait.as_mut().unwrap();
102 match unsafe { Pin::new_unchecked(f) }.poll(cx) {
103 Poll::Ready(ok) => {
104 shared_wait.take();
105 if ok {
106 local_wait.take();
107 return Poll::Ready(true);
108 }
109 }
110 Poll::Pending => {}
111 }
112 }
113
114 if local_wait.is_some() || shared_wait.is_some() {
115 Poll::Pending
116 } else {
117 Poll::Ready(false)
118 }
119 })
120 .await
121 }
122}