task_kit/task/
task.rs

1use std::fmt::{self, Debug};
2use std::ops::FnMut;
3use super::State;
4use super::super::runner::Executable;
5
6#[cfg(feature = "futures_support")]
7use futures::{Async, Future, Poll};
8
/// Tasks can be used to execute code in Task Kit's runner thread pool.
/// This is the key primitive of this crate. It can be used to build and
/// organize asynchronous code paths.
///
/// For more on how to use tasks see the [crate documentation](../index.html).
///
/// # Examples
///
/// Polling example:
///
/// ```
/// # use task_kit::prelude::*;
/// # let mut runner = Runner::new();
/// let mut i = 0;
/// let task: Task<u32, ()> = Task::new(move || {
///   if i < 100 {
///     i += 1;
///     Pending
///   } else {
///     Resolve(i)
///   }
/// });
/// # runner.run(task);
/// # runner.finish();
/// ```
///
/// Long running example:
///
/// ```
/// # use task_kit::prelude::*;
/// # let mut runner = Runner::new();
/// let task: Task<u32, ()> = Task::with(move || {
///   let mut i = 0;
///   while i < 100 {
///     i += 1;
///   }
///   i
/// });
/// # runner.run(task);
/// # runner.finish();
/// ```
pub struct Task<'a, T = (), E = ()> {
  // The boxed closure that performs the work. It is only invoked while
  // `state` is pending, and each call yields the task's next state.
  task: Box<FnMut() -> State<T, E> + 'a>,
  // The state most recently returned by `task`; starts out as
  // `State::Pending` when the task is constructed.
  state: State<T, E>,
}
54
55impl<'a, T, E> Task<'a, T, E>
56where
57  T: 'a,
58  E: 'a,
59{
60  /// Create a new task from a closure returning a `State`
61  ///
62  /// Provide the new function a closure that contains the logic you wish
63  /// to execute asyncronously. This closure will be executed upon the thread
64  /// pool within the runner until your closure returns an instance of
65  /// `State::Resolve` containing a value or an instance of `State::Reject`
66  /// containing an error value.
67  ///
68  /// # Arguments
69  ///
70  /// * `task` - A closure containing code to be executed asyncronously by the
71  ///            runner.
72  ///
73  /// # Examples
74  ///
75  /// ```
76  /// # use task_kit::prelude::*;
77  /// # fn do_something_blocking() -> String { String::new() }
78  /// # let mut runner = Runner::new();
79  /// let task: Task<String, ()> = Task::new(|| Resolve(do_something_blocking()));
80  /// # runner.run(task);
81  /// # runner.finish();
82  /// ```
83  pub fn new<F>(task: F) -> Self
84  where
85    F: FnMut() -> State<T, E> + 'a,
86  {
87    Self {
88      task: Box::new(task),
89      state: State::Pending,
90    }
91  }
92
93  /// Create a new task from a value.
94  ///
95  /// Useful only in cases where you need to pass a task to something, but
96  /// already have the value you wish to resolve.
97  ///
98  /// # Arguments
99  ///
100  /// * `val` - The value you'd like the task to resolve
101  ///
102  /// # Examples
103  ///
104  /// ```
105  /// # use task_kit::prelude::*;
106  /// # let my_string = String::new();
107  /// # let mut runner = Runner::new();
108  /// let task: Task<String, ()> = Task::from(my_string);
109  /// # runner.run(task);
110  /// # runner.finish();
111  /// ```
112  pub fn from(val: T) -> Self {
113    let mut val = Some(val);
114    Self::new(move || match val.take() {
115      Some(v) => State::Resolve(v),
116      None => unreachable!(),
117    })
118  }
119
120  #[cfg(feature = "futures_support")]
121  pub fn from_future<F>(mut future: F) -> Self
122  where
123    F: Future<Item = T, Error = E> + 'a,
124  {
125    Self::new(move || loop {
126      match future.poll() {
127        Ok(v) => match v {
128          Async::Ready(v) => break State::Resolve(v),
129          Async::NotReady => (),
130        },
131        Err(e) => break State::Reject(e),
132      }
133    })
134  }
135
136  /// Create a new task from a closure returning a value.
137  ///
138  /// The closure will only be executed once by the runner, and is expected to
139  /// return the value you wish to resolve.
140  ///
141  /// # Arguments
142  ///
143  /// * `with` - A closure that will return the value you'd like the task to
144  ///            resolve.
145  ///
146  /// # Examples
147  ///
148  /// ```
149  /// # use task_kit::prelude::*;
150  /// # fn do_something_blocking() -> String { String::new() }
151  /// # let mut runner = Runner::new();
152  /// let task: Task<String, ()> = Task::with(|| do_something_blocking());
153  /// # runner.run(task);
154  /// # runner.finish();
155  /// ```
156  pub fn with<F>(mut with: F) -> Self
157  where
158    F: FnMut() -> T + 'a,
159  {
160    Self {
161      task: Box::new(move || State::Resolve(with())),
162      state: State::Pending,
163    }
164  }
165
166  /// Create a new merged task from the current task instance and a second task
167  ///
168  /// Join will return a new task that will resolve a tuple containing the
169  /// results from both the task `join` is called upon, and the task passed in.
170  ///
171  /// Both the current task and the second task passed in will still execute in
172  /// parallel.
173  ///
174  /// # Arguments
175  ///
176  /// * `task` - A second task to join with the current task
177  ///
178  /// # Examples
179  ///
180  /// ```
181  /// # use task_kit::prelude::*;
182  /// # let mut runner = Runner::new();
183  /// # let my_task: Task<String, ()> = Task::from(String::new());
184  /// # let my_other_task: Task<String, ()> = Task::from(String::new());
185  /// let merged_task = my_task.join(my_other_task);
186  /// # runner.run(merged_task);
187  /// # runner.finish();
188  /// ```
189  pub fn join<U>(mut self, mut task: Task<'a, U, E>) -> Task<'a, (T, U), E>
190  where
191    U: 'a,
192  {
193    Task::new(move || {
194      if self.state.is_pending() {
195        self.exec();
196      }
197      if task.state.is_pending() {
198        task.exec();
199      }
200
201      if self.state.is_reject() {
202        return State::Reject(self.state.take().reject().unwrap());
203      }
204      if task.state.is_reject() {
205        return State::Reject(task.state.take().reject().unwrap());
206      }
207
208      if self.state.is_resolve() && task.state.is_resolve() {
209        let a_val = self.state.take().resolve().unwrap();
210        let b_val = task.state.take().resolve().unwrap();
211        return State::Resolve((a_val, b_val));
212      }
213      State::Pending
214    })
215  }
216
  /// Get the task state
  ///
  /// Returns a shared reference to the internal state of the task. The state
  /// starts as `State::Pending` and is overwritten with whatever the wrapped
  /// closure returns each time the task is executed.
  pub fn state(&self) -> &State<T, E> {
    &self.state
  }
223
224  /// Executes the closure within the task once
225  ///
226  /// If the task resolves or rejects then the returned option will contain
227  /// a result object.
228  pub fn poll(&mut self) -> Option<Result<T, E>> {
229    self.exec();
230    if self.state.is_pending() {
231      None
232    } else {
233      self.state.take().into_result()
234    }
235  }
236
237  /// Executes the closure within the task blocking until the task completes
238  pub fn wait(mut self) -> Option<Result<T, E>> {
239    loop {
240      self.exec();
241      if !self.state.is_pending() {
242        break self.state.into_result();
243      }
244    }
245  }
246
247  pub fn map<F, U>(self, mut map: F) -> Task<'a, U, E>
248  where
249    F: FnMut(T) -> U + 'a,
250    U: 'a,
251  {
252    self.then(move |v| State::Resolve(map(v)))
253  }
254
255  pub fn then<F, U>(mut self, mut task: F) -> Task<'a, U, E>
256  where
257    F: FnMut(T) -> State<U, E> + 'a,
258    U: 'a,
259  {
260    Task::new(move || {
261      self.exec();
262
263      if !self.state.is_pending() {
264        match self.state.take().into_result().unwrap() {
265          Ok(r) => task(r),
266          Err(e) => State::Reject(e),
267        }
268      } else {
269        State::Pending
270      }
271    })
272  }
273
274  pub fn done<F>(self, mut done: F) -> Task<'a, (), E>
275  where
276    F: FnMut(T) + 'a,
277  {
278    self.then(move |r| State::Resolve(done(r)))
279  }
280
281  pub fn recover<F, O>(mut self, mut recover: F) -> Task<'a, T, O>
282  where
283    F: FnMut(E) -> State<T, O> + 'a,
284    O: 'a,
285  {
286    Task::new(move || {
287      self.exec();
288
289      if !self.state.is_pending() {
290        match self.state.take().into_result().unwrap() {
291          Ok(r) => State::Resolve(r),
292          Err(e) => recover(e),
293        }
294      } else {
295        State::Pending
296      }
297    })
298  }
299
300  pub fn catch<F>(self, mut catch: F) -> Task<'a, T, ()>
301  where
302    F: FnMut(E) + 'a,
303  {
304    self.recover(move |e| State::Reject(catch(e)))
305  }
306}
307
308impl<'a, T, E> Debug for Task<'a, T, E> {
309  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
310    write!(f, "Task {{ state: {:?} }}", self.state)
311  }
312}
313
314impl<'a, T, E> Executable for Task<'a, T, E> {
315  fn exec(&mut self) -> bool {
316    if !self.state.is_pending() {
317      return true;
318    }
319
320    self.state = (self.task)();
321    !self.state.is_pending()
322  }
323}
324
325impl<'a, T, E> Task<'a, T, E>
326where
327  T: PartialEq + 'a,
328  E: PartialEq + 'a,
329{
330  pub fn eq(self, task: Task<'a, T, E>) -> Task<'a, bool, E> {
331    self.join(task).map(|(a, b)| a == b)
332  }
333  pub fn ne(self, task: Task<'a, T, E>) -> Task<'a, bool, E> {
334    self.join(task).map(|(a, b)| a != b)
335  }
336}
337
338impl<'a, T, E> Task<'a, T, E>
339where
340  T: PartialOrd + 'a,
341  E: PartialOrd + 'a,
342{
343  pub fn lt(self, task: Task<'a, T, E>) -> Task<'a, bool, E> {
344    self.join(task).map(|(a, b)| a < b)
345  }
346  pub fn le(self, task: Task<'a, T, E>) -> Task<'a, bool, E> {
347    self.join(task).map(|(a, b)| a <= b)
348  }
349  pub fn gt(self, task: Task<'a, T, E>) -> Task<'a, bool, E> {
350    self.join(task).map(|(a, b)| a > b)
351  }
352  pub fn ge(self, task: Task<'a, T, E>) -> Task<'a, bool, E> {
353    self.join(task).map(|(a, b)| a >= b)
354  }
355}
356
// Conversion from any future into a task, delegating to `Task::from_future`.
//
// NOTE(review): `Task` itself implements `Future` in this file, so this
// blanket impl may overlap with core's reflexive `impl<T> From<T> for T` -
// confirm that a build with the `futures_support` feature compiles.
#[cfg(feature = "futures_support")]
impl<'a, T, E, F> From<F> for Task<'a, T, E>
where
  T: 'a,
  E: 'a,
  F: Future<Item = T, Error = E> + 'a,
{
  /// Wrap `future` in a task that settles with the future's item or error.
  fn from(future: F) -> Self {
    Task::from_future(future)
  }
}
368
// Lets a task be used wherever a future is expected.
#[cfg(feature = "futures_support")]
impl<'a, T, E> Future for Task<'a, T, E>
where
  T: 'a,
  E: 'a,
{
  type Item = T;
  type Error = E;

  /// Execute the task once and translate its state into a futures `Poll`.
  ///
  /// A pending task maps to `Async::NotReady`, a resolved value to
  /// `Async::Ready`, and a rejection to `Err`.
  ///
  /// # Panics
  ///
  /// Panics if the task's value or error has already been taken.
  ///
  /// NOTE(review): `NotReady` is returned without arranging any wake-up
  /// notification - confirm the executor driving this re-polls on its own.
  fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
    self.exec();

    if let State::Pending = self.state {
      return Ok(Async::NotReady);
    }

    match self.state.take() {
      State::Resolve(v) => Ok(Async::Ready(v)),
      State::Reject(e) => Err(e),
      // Not expected after the guard above; listed so the match covers every
      // variant of `State`.
      State::Pending => Ok(Async::NotReady),
      State::Resolved => panic!("Task already resolved"),
      State::Rejected => panic!("Task already rejected"),
    }
  }
}
393
// NOTE(review): these impls are unconditional - no `Send`/`Sync` bound is
// placed on `T`, `E`, or the boxed closure, so the compiler does not check
// what a task carries across threads. Presumably needed so the runner can
// hand tasks to its worker threads; confirm that callers only build tasks
// from thread-safe values and closures, or tighten the bounds.
unsafe impl<'a, T, E> Send for Task<'a, T, E> {}
unsafe impl<'a, T, E> Sync for Task<'a, T, E> {}
396
#[cfg(test)]
mod tests {
  #[cfg(feature = "futures_support")]
  extern crate tokio_timer;

  use super::*;

  /// A task can be constructed from a closure without being executed.
  #[test]
  fn can_create_task() {
    let _task: Task<(), ()> = Task::new(|| State::Pending);
  }

  /// A task can be constructed from a future without being executed.
  #[cfg(feature = "futures_support")]
  #[test]
  fn can_create_task_from_future() {
    use self::tokio_timer::{Timer, TimerError};
    use std::time::Duration;

    let sleep_future = Timer::default().sleep(Duration::from_secs(1));
    let _task: Task<(), TimerError> = Task::from_future(sleep_future);
  }

  /// Polling repeatedly eventually yields the resolved value.
  #[test]
  fn can_poll_for_value() {
    let mut counter = 5;
    let mut task: Task<_, ()> = Task::new(|| {
      counter += 1;
      if counter == 20 {
        State::Resolve(counter)
      } else {
        State::Pending
      }
    });

    let value = loop {
      match task.poll() {
        Some(Ok(v)) => break v,
        _ => continue,
      }
    };

    assert_eq!(value, 20);
  }

  /// Waiting drives the task until it resolves.
  #[test]
  fn can_wait_for_value() {
    let mut counter = 5;
    let task: Task<_, ()> = Task::new(|| {
      counter += 1;
      if counter == 20 {
        State::Resolve(counter)
      } else {
        State::Pending
      }
    });

    let outcome = task.wait();
    assert_eq!(outcome, Some(Ok(20)));
  }

  /// Each `then` step receives the value produced by the previous one.
  #[test]
  fn can_chain_tasks() {
    let first: Task<_, ()> = Task::new(|| State::Resolve(1));
    let chained = first
      .then(|n| State::Resolve(n + 1))
      .then(|n| State::Resolve(n + 2))
      .then(|n| State::Resolve(n + 3));

    let outcome = chained.wait();
    assert_eq!(outcome, Some(Ok(7)));
  }

  /// `done` hands the resolved value to its callback.
  #[test]
  fn can_use_done() {
    let source: Task<_, ()> = Task::new(|| State::Resolve(1));
    let task = source.done(|val| assert_eq!(val, 1));
    task.wait();
  }
}
463    let task: Task<_, ()> = Task::new(|| State::Resolve(1)).done(|val| assert_eq!(val, 1));
464    task.wait();
465  }
466}