1use std::cell::{OnceCell, RefCell};
6use std::future::Future;
7use std::pin::Pin;
8use std::rc::Rc;
9use std::sync::{Arc, Condvar, Mutex};
10use std::task::{Context, Poll, Waker};
11
/// A pinned, heap-allocated, type-erased future that yields `T` and may borrow for `'a`.
///
/// The trait object carries no `Send` bound, so futures that are not `Send` fit as well.
pub type LocalFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
/// A [`LocalFuture`] whose output is `()`.
pub type LocalVoidFuture<'a> = LocalFuture<'a, ()>;
14
/// Errors reported by the task primitives in this module.
///
/// Implements [`std::error::Error`] so it can be boxed as `dyn Error` and propagated with `?`.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TaskError {
    /// The mutex guarding a task's shared state could not be locked (it was poisoned).
    LockingError,

    /// The task was signalled as complete, but no result was left to take.
    NotCompletedError,

    /// NOTE(review): not produced in this file; presumably the executor is shutting down — confirm.
    ExecutorStoppingError,

    /// NOTE(review): not produced in this file; presumably an exchanger failure — confirm.
    ExchangerError,
}

impl std::fmt::Display for TaskError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            TaskError::LockingError => "failed to lock the task state",
            TaskError::NotCompletedError => "task has no result available",
            TaskError::ExecutorStoppingError => "executor is stopping",
            TaskError::ExchangerError => "exchanger error",
        };
        f.write_str(message)
    }
}

impl std::error::Error for TaskError {}
29
/// State shared by every handle of a `CompletableTask`: the result slot, a sticky
/// completion flag, and the waker to notify when the result arrives.
struct CompletableTaskInner<T> {
    /// Holds the value from the moment it is set until it is taken.
    result: OnceCell<T>,
    /// Becomes `true` on the first `try_set` and is never cleared, not even by `take`.
    is_complete: bool,
    /// Waker to notify on completion, if one has been registered.
    waker: Option<Waker>,
}

impl<T> CompletableTaskInner<T> {
    /// Creates an empty, not-yet-completed state with no waker registered.
    pub fn new() -> Self {
        Self {
            result: OnceCell::new(),
            is_complete: false,
            waker: None,
        }
    }

    /// Stores `value`, marks the state complete and wakes the registered waker, if any.
    ///
    /// Hands `value` back as `Err` when the state was already completed; because the flag
    /// is sticky, this also applies after the earlier value has been taken.
    pub fn try_set(&mut self, value: T) -> Result<(), T> {
        if self.is_complete {
            return Err(value);
        }

        let stored = self.result.set(value);
        self.is_complete = true;
        // The waker is consumed: one completion produces at most one wake-up.
        if let Some(registered) = self.waker.take() {
            registered.wake();
        }
        stored
    }

    /// Moves the stored value out, leaving the slot empty; `None` if nothing is stored.
    pub fn take(&mut self) -> Option<T> {
        self.result.take()
    }
}
62
63#[derive(Clone)]
71pub struct CompletableTask<T> {
72 inner: Arc<(Mutex<CompletableTaskInner<T>>, Condvar)>,
73}
74
75impl<T> CompletableTask<T> {
76 pub fn new() -> CompletableTask<T> {
79 let inner = CompletableTaskInner::new();
80 CompletableTask {
81 inner: Arc::new((Mutex::new(inner), Condvar::new())),
82 }
83 }
84
85 pub fn try_complete(&self, value: T) -> Result<(), T> {
93 let arc = self.inner.clone();
94 let Ok(mut inner) = arc.0.lock() else {
95 return Err(value);
96 };
97 if inner.is_complete {
98 return Err(value);
99 }
100 inner.try_set(value)?;
101
102 arc.1.notify_all();
103 Ok(())
104 }
105
106 pub fn is_complete(&self) -> Result<bool, TaskError> {
113 let arc = self.inner.clone();
114 let Ok(inner) = arc.0.lock() else {
115 return Err(TaskError::LockingError);
116 };
117 Ok(inner.is_complete)
118 }
119
120 pub fn try_take(&self) -> Result<Poll<T>, TaskError> {
128 let arc = self.inner.clone();
129 let Ok(mut inner) = arc.0.lock() else {
130 return Err(TaskError::LockingError);
131 };
132 if let Some(val) = inner.take() {
133 return Ok(Poll::Ready(val));
134 }
135 Ok(Poll::Pending)
136 }
137
138 pub fn take_blocking(&self) -> Result<T, TaskError> {
144 let arc = self.inner.clone();
145 let Ok(inner) = arc.0.lock() else {
146 return Err(TaskError::LockingError);
147 };
148 let Ok(mut res) = arc.1.wait_while(inner, |v| !v.is_complete) else {
149 return Err(TaskError::LockingError);
150 };
151 if let Some(val) = res.result.take() {
152 return Ok(val);
153 }
154 Err(TaskError::NotCompletedError)
155 }
156
157 pub(crate) fn set_waker(&self, waker: Waker) -> Result<(), TaskError> {
159 let arc = self.inner.clone();
160 let Ok(mut inner) = arc.0.lock() else {
161 return Err(TaskError::LockingError);
162 };
163 if let Some(waker) = inner.waker.replace(waker) {
164 waker.wake();
166 }
167
168 Ok(())
169 }
170}
171
172impl<T> Default for CompletableTask<T> {
173 fn default() -> Self {
174 CompletableTask::new()
175 }
176}
177
178impl<T> Future for CompletableTask<T> {
179 type Output = Result<T, TaskError>;
180
181 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
182 if let Err(e) = self.set_waker(cx.waker().clone()) {
183 return Poll::Ready(Err(e));
184 }
185
186 match self.try_take() {
187 Ok(Poll::Ready(v)) => Poll::Ready(Ok(v)),
188 Ok(Poll::Pending) => Poll::Pending,
189 Err(e) => Poll::Ready(Err(e)),
190 }
191 }
192}
193
/// A single-threaded completion slot: one side stores a value, the other side fetches it.
///
/// All clones share the same slot through an `Rc`.
pub struct LocalCompletableTask<T> {
    result: Rc<RefCell<Option<T>>>,
}

// Manual impl so that cloning a handle does not require `T: Clone`.
impl<T> Clone for LocalCompletableTask<T> {
    fn clone(&self) -> Self {
        Self {
            result: Rc::clone(&self.result),
        }
    }
}

impl<T> LocalCompletableTask<T> {
    /// Creates a task whose slot is empty.
    pub fn new() -> Self {
        Self {
            result: Rc::new(RefCell::new(None)),
        }
    }

    /// Stores `value` in the slot.
    ///
    /// Hands the rejected value back as `Err` when the slot is already occupied.
    pub fn try_complete(&self, value: T) -> Result<(), T> {
        if self.result.borrow().is_some() {
            return Err(value);
        }
        match self.result.replace(Some(value)) {
            Some(displaced) => Err(displaced),
            None => Ok(()),
        }
    }

    /// Moves the value out of the slot.
    ///
    /// Returns `Poll::Ready(value)` when a value is stored and `Poll::Pending` otherwise.
    /// The slot is left empty, so a later `try_complete` can fill it again.
    pub fn get(&self) -> Poll<T> {
        match self.result.take() {
            Some(value) => Poll::Ready(value),
            None => Poll::Pending,
        }
    }
}

impl<T> Default for LocalCompletableTask<T> {
    fn default() -> Self {
        Self::new()
    }
}