promising_future/
lib.rs

1//! Futures and Promises
2//! ====================
3//!
4//! Quick example:
5//!
6//! ```
7//! # use ::promising_future::future_promise;
8//! # use std::time::Duration;
9//! # #[allow(unused_variables)]
10//! # use std::thread;
11//! let (fut, prom) = future_promise();
12//!
13//! // A time-consuming process
14//! thread::spawn(|| { thread::sleep(Duration::from_millis(100)); prom.set(123) });
15//!
16//! // do something when the value is ready
17//! let fut = fut.then(|v| v + 1);
18//!
19//! // Wait for the final result
20//! assert_eq!(fut.value(), Some(124));
21//! ```
22//!
23//! This module implements a pair of concepts: `Future`s - a read-only placeholder for a variable
24//! whose value may not yet be known, and `Promise`s - a write-once container which sets the
25//! value.
26//!
27//! A `Future` may either be "resolved" or "unresolved". An unresolved `Future` still has a pending
28//! `Promise` for it. It becomes resolved once the `Promise` is complete. Once resolved, it will
29//! have a value if the `Promise` was fulfilled (ie, set a value), or no value if the `Promise` was
30//! unfulfilled (ie, dropped without setting a value).
31//!
32//! A `Promise` is either "pending" or "complete". A pending `Promise` is simply a live value of
33//! type `Promise<T>`. It can be fulfilled by setting a value, which consumes the Promise,
34//! completing it. Alternatively it can be completed unfulfilled by simply dropping the value
35//! without ever calling `set` on it.
36//!
37//! A `Future` can also be created already resolved (ie, not paired with a `Promise`). This is
38//! useful for lifting values into the `Promise`/`Future` domain.
39//!
40//! `Future`s may be chained in two ways. The most general way is with `callback`, which takes a
41//! `Future` and a function to act on the value when it becomes available. This function is called
42//! within the same context that completed the `Promise` so if the function blocks it will block
43//! that context. The callback is passed another `Promise` to take the return of the callback, which
44//! may be fulfilled or unfulfilled within the callback, or passed on somewhere else.
45//!
46//! Using `callback` directly can be a little cumbersome, so there are a couple of helpers. `then`
47//! simply calls a synchronous callback function and uses its return to fulfill the value. The
48//! function must be run within the `Promise` context, so it should probably be quick.
49//!
50//! Alternatively `chain` - like `then` - will take a function to act on the resolved future
51//! value. However, unlike `then` it runs it in its own thread, so it can be arbitrarily
52//! time-consuming. The variant `chain_with` allows the thread creation to be controlled, so that
53//! thread pools may be used, for example.
54//!
55//! Groups of `Future`s can be acted upon together. `all` takes an iterator of `Future<T>`s, and
56//! returns a `Future<Iterator<T>>`, so that its possible to wait for multiple Futures to be
57//! resolved.
58//!
59//! Similarly, `any` returns the first available value of an iterator of `Future`s, discarding all
60//! the other values.
61//!
62//! More generally, `FutureStream` provides a mechanism to wait on an arbitrary number of `Futures`
63//! and incrementally acquiring their values as they become available.
64//!
65//! If a `Future` is dropped while its corresponding `Promise` is still pending, then any value it
66//! does produce will be discarded. The `Promise` be queried with its `canceled` method to see if a
67//! corresponding `Future` still exists; if not, it may choose to abort some time-consuming process
68//! rather than have its output simply discarded.
69
70// Internal details
71//
72// A Future has a promise iff its Unresolved. A Future is either created resolved (`with_value`),
73// in which case it never has a Promise, or becomes resolved by the Promise, which is destroyed
74// in the process. A Promise can exist without a Future - for example, when a callback is set on
75// the Future, which consumes the Future while setting the callback in the Promise.
76//
77// A Future can also be owned by a FutureStream, which prevents any other call on it. Therefore
78// a Future in a FutureStream can't have a callback set on it.
79
80/// An implementation of `Spawner` that spawns threads from a `ThreadPool`.
81#[cfg(feature = "threadpool")]
82extern crate threadpool;
83
84mod inner;
85mod future;
86mod promise;
87
88mod fnbox;
89mod spawner;
90mod util;
91mod futurestream;
92mod cvmx;
93
94pub use spawner::{Spawner, ThreadSpawner};
95pub use util::{any, all, all_with};
96pub use futurestream::{FutureStream, FutureStreamIter, FutureStreamWaiter};
97
98/// Result of calling `Future.poll()`.
99#[derive(Debug)]
100pub enum Pollresult<T> {
101    /// `Future` is not yet resolved; returns the `Future` for further use.
102    Unresolved(Future<T>),
103
104    /// `Future` has been resolved, and may or may not have a value. The `Future` has been consumed.
105    Resolved(Option<T>),
106}
107
108impl<T> From<future::Pollresult<T>> for Pollresult<T> {
109    fn from(res: future::Pollresult<T>) -> Self {
110        match res {
111            future::Pollresult::Unresolved(fut) => Pollresult::Unresolved(Future(fut)),
112            future::Pollresult::Resolved(v) => Pollresult::Resolved(v),
113        }
114    }
115}
116
117/// An undetermined value.
118///
119/// A `Future` represents an undetermined value. A corresponding `Promise` may set the value.
120///
121/// It is typically created in a pair with a `Promise` using the function `future_promise()`.
122#[derive(Debug)]
123pub struct Future<T>(future::Future<T>);
124
125/// A box for resolving a `Future`.
126///
127/// A `Promise` is a write-once box which corresponds with a `Future` and may be used to resolve it.
128///
129/// A `Promise` is initially pending, and is completed once it is consumed, either by its `set`
130/// method, or by going out of scope. The former is "fulfilling" the `Promise`; the latter is
131/// leaving it "unfulfilled".
132///
133/// It may only be created in a pair with a `Future` using the function `future_promise()`.
134#[derive(Debug)]
135pub struct Promise<T>(promise::Promise<T>);
136
137impl<T> Future<T> {
138    /// Construct an already resolved `Future` with a value. It is equivalent to a `Future` whose
139    /// `Promise` has already been fulfilled.
140    ///
141    /// ```
142    /// # use promising_future::{Future, Pollresult};
143    /// let fut = Future::with_value(123);
144    /// match fut.poll() {
145    ///    Pollresult::Resolved(Some(123)) => println!("ok"),
146    ///    _ => panic!("unexpected result"),
147    /// }
148    /// ```
149    #[inline]
150    pub fn with_value(v: T) -> Future<T> {
151        Future(future::Future::with_value(v))
152    }
153
154    /// Construct a resolved `Future` which will never have a value; it is equivalent to a `Future`
155    /// whose `Promise` completed unfulfilled.
156    ///
157    /// ```
158    /// # use promising_future::{Future, Pollresult};
159    /// let fut = Future::<i32>::never();
160    /// match fut.poll() {
161    ///    Pollresult::Resolved(None) => println!("ok"),
162    ///    _ => panic!("unexpected result"),
163    /// }
164    /// ```
165    #[inline]
166    pub fn never() -> Future<T> {
167        Future(future::Future::never())
168    }
169
170    /// Test to see if the `Future` is resolved yet.
171    ///
172    /// It returns an `Pollresult`, which has two values:
173    ///
174    /// * `Unresolved(Future<T>)` - the `Future` is not yet resolved, so returns itself, or
175    /// * `Resolved(Option<T>)` - the `Future` has been resolved, and may have a value.
176    ///
177    /// ```
178    /// # use promising_future::{Future, Pollresult};
179    /// # let fut = Future::with_value(123);
180    /// match fut.poll() {
181    ///   Pollresult::Unresolved(fut) => println!("unresolved future {:?}", fut),
182    ///   Pollresult::Resolved(None) => println!("resolved, no value"),
183    ///   Pollresult::Resolved(Some(v)) => println!("resolved, value {}", v),
184    /// }
185    /// ```
186    #[inline]
187    pub fn poll(self) -> Pollresult<T> {
188        Pollresult::from(self.0.poll())
189    }
190
191    /// Block until the `Future` is resolved.
192    ///
193    /// If the `Future` is not yet resolved, it will block until the corresponding `Promise` is
194    /// either fulfilled, or is completed unfulfilled. In the former case it will return `Some(v)`,
195    /// otherwise `None`.
196    ///
197    /// If the `Future` is already resolved - ie, has no corresponding `Promise` - then it will
198    /// return immediately without blocking.
199    ///
200    /// ```
201    /// # use promising_future::Future;
202    /// # let fut = Future::with_value(123);
203    /// match fut.value() {
204    ///   Some(v) => println!("has value {}", v),
205    ///   None => println!("no value"),
206    /// }
207    /// ```
208    #[inline]
209    pub fn value(self) -> Option<T> {
210        self.0.value()
211    }
212
213    /// Set a synchronous callback to run in the Promise's context.
214    ///
215    /// When the `Future<T>` completes, call the function on the value
216    /// (if any), returning a new value which appears in the returned
217    /// `Future<U>`.
218    ///
219    /// The function is called within the `Promise`s context, and so
220    /// will block the thread if it takes a long time. Because the
221    /// callback returns a value, not a `Future` it cannot be
222    /// async. See `callback` or `chain` for more general async ways
223    /// to apply a function to a `Future`.
224    ///
225    /// ```
226    /// # use promising_future::future_promise;
227    /// let (fut, prom) = future_promise();
228    ///
229    /// let fut = fut.then_opt(|v| v.map(|v| v + 123));
230    /// prom.set(1);
231    /// assert_eq!(fut.value(), Some(124))
232    /// ```
233    #[inline]
234    pub fn then_opt<F, U>(self, func: F) -> Future<U>
235        where F: FnOnce(Option<T>) -> Option<U> + Send + 'static,
236              U: Send + 'static
237    {
238        Future(self.0.then_opt(func))
239    }
240
241    /// Set synchronous callback
242    ///
243    /// Simplest form of callback. This is only called if the promise
244    /// is fulfilled, and may only allow a promise to be fulfilled.
245    #[inline]
246    pub fn then<F, U>(self, func: F) -> Future<U>
247        where F: FnOnce(T) -> U + Send + 'static,
248              U: Send + 'static
249    {
250        Future(self.0.then(func))
251    }
252
253    /// Set a callback to run in the `Promise`'s context.
254    ///
255    /// This function sets a general purpose callback which is called
256    /// when a `Future` is resolved. It is called in the `Promise`'s
257    /// context, so if it is long-running it will block whatever
258    /// thread that is. (If the `Future` is already resolved, it is
259    /// the calling thread.)
260    ///
261    /// The value passed to the callback is an `Option` - if it is
262    /// `None` it means the promise was unfulfilled.
263    ///
264    /// The callback is passed a new `Promise<U>` which is paired with
265    /// the `Future<U>` this function returns; the callback may either
266    /// set a value on it, pass it somewhere else, or simply drop it
267    /// leaving the promise unfulfilled.
268    ///
269    /// This is the most general form of a completion callback; see
270    /// also `then` and `chain` for simpler interfaces which are often
271    /// all that's needed.
272    ///
273    /// ```
274    /// # use promising_future::future_promise;
275    /// let (fut, prom) = future_promise();
276    ///
277    /// let fut = fut.callback(|v, p| {
278    ///    match v {
279    ///      None => (), // drop p
280    ///      Some(v) => p.set(v + 123),
281    ///    }
282    /// });
283    /// prom.set(1);
284    /// assert_eq!(fut.value(), Some(124))
285    /// ```
286    #[inline]
287    pub fn callback<F, U>(self, func: F) -> Future<U>
288        where F: FnOnce(Option<T>, Promise<U>) + Send + 'static,
289              U: Send + 'static
290    {
291        let func = move |v, p| func(v, Promise(p));
292        Future(self.0.callback(func))
293    }
294
295    /// Set a callback which returns `()`
296    ///
297    /// Set a callback with a closure which returns nothing, so its only useful for its side-effects.
298    #[inline]
299    pub fn callback_unit<F>(self, func: F)
300        where F: FnOnce(Option<T>) + Send + 'static
301    {
302        self.0.callback_unit(func)
303    }
304}
305
306impl<T: Send> Future<T> {
307    /// Chain two `Future`s.
308    ///
309    /// Asynchronously apply a function to the result of a `Future`, returning a new `Future` for
310    /// that value. This may spawn a thread to block waiting for the first `Future` to complete.
311    ///
312    /// The function is passed an `Option`, which indicates whether the `Future` ever received a
313    /// value. The function returns an `Option` so that the resulting `Future` can also be
314    /// valueless.
315    #[inline]
316    pub fn chain<F, U>(self, func: F) -> Future<U>
317        where F: FnOnce(Option<T>) -> Option<U> + Send + 'static, T: 'static, U: Send + 'static
318    {
319        Future(self.0.chain(func))
320    }
321
322    /// As with `chain`, but pass a `Spawner` to control how the thread is created.
323    #[inline]
324    pub fn chain_with<F, U, S>(self, func: F, spawner: &S) -> Future<U>
325        where F: FnOnce(Option<T>) -> Option<U> + Send + 'static, T: 'static, U: Send + 'static, S: Spawner
326    {
327        Future(self.0.chain_with(func, spawner))
328    }
329}
330
331
332impl<T> From<Option<T>> for Future<T> {
333    fn from(v: Option<T>) -> Future<T> {
334        Future(future::Future::from(v))
335    }
336}
337
338/// Blocking iterator for the value of a `Future`. Returns either 0 or 1 values.
339pub struct FutureIter<T>(Option<Future<T>>);
340
341impl<T> IntoIterator for Future<T> {
342    type Item = T;
343    type IntoIter = FutureIter<T>;
344
345    fn into_iter(self) -> Self::IntoIter {
346        FutureIter(Some(self))
347    }
348}
349
350impl<T> Iterator for FutureIter<T> {
351    type Item = T;
352
353    fn next(&mut self) -> Option<Self::Item> {
354        match self.0.take() {
355            None => None,
356            Some(fut) => fut.value()
357        }
358    }
359}
360
361impl<T> Promise<T> {
362    /// Fulfill the `Promise` by resolving the corresponding `Future` with a value.
363    #[inline]
364    pub fn set(self, v: T) {
365        self.0.set(v)
366    }
367
368    /// Return true if the corresponding `Future` no longer exists, and so any value set would be
369    /// discarded.
370    ///
371    /// ```
372    /// # use ::promising_future::future_promise;
373    /// # use std::thread;
374    /// # use std::mem;
375    /// # struct State; impl State { fn new() -> State { State } fn perform_action(&mut self) -> Option<u32> { None } }
376    /// let (fut, prom) = future_promise();
377    ///
378    /// thread::spawn(move || {
379    ///     let mut s = State::new();
380    ///     while !prom.canceled() {
381    ///         match s.perform_action() {
382    ///             None => (),
383    ///             Some(res) => { prom.set(res); break },
384    ///         }
385    ///     }
386    /// });
387    /// // ...
388    /// mem::drop(fut);
389    /// ```
390    #[inline]
391    pub fn canceled(&self) -> bool {
392        self.0.canceled()
393    }
394}
395
396/// Construct a `Future`/`Promise` pair.
397///
398/// A `Future` represents a value which may not yet be known. A `Promise` is some process which will
399/// determine that value. This function produces a bound `Future`/`Promise` pair. If the `Promise`
400/// is dropped before the value is set, then the `Future` will never return a value. If the `Future`
401/// is dropped before fetching the value, or before the value is set, then the `Promise`'s value is
402/// lost.
403///
404/// ```
405/// # use promising_future::{Future, future_promise};
406/// let (fut, prom) = future_promise::<i32>();
407/// ```
408#[inline]
409pub fn future_promise<T>() -> (Future<T>, Promise<T>) {
410    let (f, p) = future::future_promise();
411    (Future(f), Promise(p))
412}