1use std::fmt::{self, Debug};
2use std::ops::FnMut;
3use super::State;
4use super::super::runner::Executable;
5
6#[cfg(feature = "futures_support")]
7use futures::{Async, Future, Poll};
8
9pub struct Task<'a, T = (), E = ()> {
51 task: Box<FnMut() -> State<T, E> + 'a>,
52 state: State<T, E>,
53}
54
55impl<'a, T, E> Task<'a, T, E>
56where
57 T: 'a,
58 E: 'a,
59{
60 pub fn new<F>(task: F) -> Self
84 where
85 F: FnMut() -> State<T, E> + 'a,
86 {
87 Self {
88 task: Box::new(task),
89 state: State::Pending,
90 }
91 }
92
93 pub fn from(val: T) -> Self {
113 let mut val = Some(val);
114 Self::new(move || match val.take() {
115 Some(v) => State::Resolve(v),
116 None => unreachable!(),
117 })
118 }
119
120 #[cfg(feature = "futures_support")]
121 pub fn from_future<F>(mut future: F) -> Self
122 where
123 F: Future<Item = T, Error = E> + 'a,
124 {
125 Self::new(move || loop {
126 match future.poll() {
127 Ok(v) => match v {
128 Async::Ready(v) => break State::Resolve(v),
129 Async::NotReady => (),
130 },
131 Err(e) => break State::Reject(e),
132 }
133 })
134 }
135
136 pub fn with<F>(mut with: F) -> Self
157 where
158 F: FnMut() -> T + 'a,
159 {
160 Self {
161 task: Box::new(move || State::Resolve(with())),
162 state: State::Pending,
163 }
164 }
165
166 pub fn join<U>(mut self, mut task: Task<'a, U, E>) -> Task<'a, (T, U), E>
190 where
191 U: 'a,
192 {
193 Task::new(move || {
194 if self.state.is_pending() {
195 self.exec();
196 }
197 if task.state.is_pending() {
198 task.exec();
199 }
200
201 if self.state.is_reject() {
202 return State::Reject(self.state.take().reject().unwrap());
203 }
204 if task.state.is_reject() {
205 return State::Reject(task.state.take().reject().unwrap());
206 }
207
208 if self.state.is_resolve() && task.state.is_resolve() {
209 let a_val = self.state.take().resolve().unwrap();
210 let b_val = task.state.take().resolve().unwrap();
211 return State::Resolve((a_val, b_val));
212 }
213 State::Pending
214 })
215 }
216
217 pub fn state(&self) -> &State<T, E> {
221 &self.state
222 }
223
224 pub fn poll(&mut self) -> Option<Result<T, E>> {
229 self.exec();
230 if self.state.is_pending() {
231 None
232 } else {
233 self.state.take().into_result()
234 }
235 }
236
237 pub fn wait(mut self) -> Option<Result<T, E>> {
239 loop {
240 self.exec();
241 if !self.state.is_pending() {
242 break self.state.into_result();
243 }
244 }
245 }
246
247 pub fn map<F, U>(self, mut map: F) -> Task<'a, U, E>
248 where
249 F: FnMut(T) -> U + 'a,
250 U: 'a,
251 {
252 self.then(move |v| State::Resolve(map(v)))
253 }
254
255 pub fn then<F, U>(mut self, mut task: F) -> Task<'a, U, E>
256 where
257 F: FnMut(T) -> State<U, E> + 'a,
258 U: 'a,
259 {
260 Task::new(move || {
261 self.exec();
262
263 if !self.state.is_pending() {
264 match self.state.take().into_result().unwrap() {
265 Ok(r) => task(r),
266 Err(e) => State::Reject(e),
267 }
268 } else {
269 State::Pending
270 }
271 })
272 }
273
274 pub fn done<F>(self, mut done: F) -> Task<'a, (), E>
275 where
276 F: FnMut(T) + 'a,
277 {
278 self.then(move |r| State::Resolve(done(r)))
279 }
280
281 pub fn recover<F, O>(mut self, mut recover: F) -> Task<'a, T, O>
282 where
283 F: FnMut(E) -> State<T, O> + 'a,
284 O: 'a,
285 {
286 Task::new(move || {
287 self.exec();
288
289 if !self.state.is_pending() {
290 match self.state.take().into_result().unwrap() {
291 Ok(r) => State::Resolve(r),
292 Err(e) => recover(e),
293 }
294 } else {
295 State::Pending
296 }
297 })
298 }
299
300 pub fn catch<F>(self, mut catch: F) -> Task<'a, T, ()>
301 where
302 F: FnMut(E) + 'a,
303 {
304 self.recover(move |e| State::Reject(catch(e)))
305 }
306}
307
308impl<'a, T, E> Debug for Task<'a, T, E> {
309 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
310 write!(f, "Task {{ state: {:?} }}", self.state)
311 }
312}
313
314impl<'a, T, E> Executable for Task<'a, T, E> {
315 fn exec(&mut self) -> bool {
316 if !self.state.is_pending() {
317 return true;
318 }
319
320 self.state = (self.task)();
321 !self.state.is_pending()
322 }
323}
324
325impl<'a, T, E> Task<'a, T, E>
326where
327 T: PartialEq + 'a,
328 E: PartialEq + 'a,
329{
330 pub fn eq(self, task: Task<'a, T, E>) -> Task<'a, bool, E> {
331 self.join(task).map(|(a, b)| a == b)
332 }
333 pub fn ne(self, task: Task<'a, T, E>) -> Task<'a, bool, E> {
334 self.join(task).map(|(a, b)| a != b)
335 }
336}
337
338impl<'a, T, E> Task<'a, T, E>
339where
340 T: PartialOrd + 'a,
341 E: PartialOrd + 'a,
342{
343 pub fn lt(self, task: Task<'a, T, E>) -> Task<'a, bool, E> {
344 self.join(task).map(|(a, b)| a < b)
345 }
346 pub fn le(self, task: Task<'a, T, E>) -> Task<'a, bool, E> {
347 self.join(task).map(|(a, b)| a <= b)
348 }
349 pub fn gt(self, task: Task<'a, T, E>) -> Task<'a, bool, E> {
350 self.join(task).map(|(a, b)| a > b)
351 }
352 pub fn ge(self, task: Task<'a, T, E>) -> Task<'a, bool, E> {
353 self.join(task).map(|(a, b)| a >= b)
354 }
355}
356
357#[cfg(feature = "futures_support")]
358impl<'a, T, E, F> From<F> for Task<'a, T, E>
359where
360 T: 'a,
361 E: 'a,
362 F: Future<Item = T, Error = E> + 'a,
363{
364 fn from(future: F) -> Self {
365 Self::from_future(future)
366 }
367}
368
369#[cfg(feature = "futures_support")]
370impl<'a, T, E> Future for Task<'a, T, E>
371where
372 T: 'a,
373 E: 'a,
374{
375 type Item = T;
376 type Error = E;
377
378 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
379 self.exec();
380
381 if let State::Pending = self.state {
382 return Ok(Async::NotReady);
383 }
384
385 match self.state.take() {
386 State::Resolve(v) => Ok(Async::Ready(v)),
387 State::Reject(e) => Err(e),
388 State::Resolved => panic!("Task already resolved"),
389 State::Rejected => panic!("Task already rejected"),
390 }
391 }
392}
393
394unsafe impl<'a, T, E> Send for Task<'a, T, E> {}
395unsafe impl<'a, T, E> Sync for Task<'a, T, E> {}
396
397#[cfg(test)]
398mod tests {
399 #[cfg(feature = "futures_support")]
400 extern crate tokio_timer;
401
402 use super::*;
403
404 #[test]
405 fn can_create_task() {
406 let _: Task<(), ()> = Task::new(|| State::Pending);
407 }
408
409 #[cfg(feature = "futures_support")]
410 #[test]
411 fn can_create_task_from_future() {
412 use self::tokio_timer::{Timer, TimerError};
413 use std::time::Duration;
414
415 let sleep_future = Timer::default().sleep(Duration::new(1, 0));
416 let _: Task<(), TimerError> = Task::from_future(sleep_future);
417 }
418
419 #[test]
420 fn can_poll_for_value() {
421 let mut i = 5;
422 let mut task: Task<_, ()> = Task::new(|| {
423 i += 1;
424 if i == 20 {
425 return State::Resolve(i);
426 }
427 State::Pending
428 });
429
430 let result = loop {
431 if let Some(Ok(r)) = task.poll() {
432 break r;
433 }
434 };
435
436 assert_eq!(result, 20);
437 }
438
439 #[test]
440 fn can_wait_for_value() {
441 let mut i = 5;
442 let task: Task<_, ()> = Task::new(|| {
443 i += 1;
444 if i == 20 {
445 return State::Resolve(i);
446 }
447 State::Pending
448 });
449 assert_eq!(task.wait().unwrap().unwrap(), 20);
450 }
451
452 #[test]
453 fn can_chain_tasks() {
454 let task: Task<_, ()> = Task::new(|| State::Resolve(1))
455 .then(|n| State::Resolve(n + 1))
456 .then(|n| State::Resolve(n + 2))
457 .then(|n| State::Resolve(n + 3));
458 assert_eq!(task.wait().unwrap().unwrap(), 7);
459 }
460
461 #[test]
462 fn can_use_done() {
463 let task: Task<_, ()> = Task::new(|| State::Resolve(1)).done(|val| assert_eq!(val, 1));
464 task.wait();
465 }
466}