promising_future/lib.rs
1//! Futures and Promises
2//! ====================
3//!
4//! Quick example:
5//!
6//! ```
7//! # use ::promising_future::future_promise;
8//! # use std::time::Duration;
9//! # #[allow(unused_variables)]
10//! # use std::thread;
11//! let (fut, prom) = future_promise();
12//!
13//! // A time-consuming process
14//! thread::spawn(|| { thread::sleep(Duration::from_millis(100)); prom.set(123) });
15//!
16//! // do something when the value is ready
17//! let fut = fut.then(|v| v + 1);
18//!
19//! // Wait for the final result
20//! assert_eq!(fut.value(), Some(124));
21//! ```
22//!
23//! This module implements a pair of concepts: `Future`s - a read-only placeholder for a variable
24//! whose value may not yet be known, and `Promise`s - a write-once container which sets the
25//! value.
26//!
27//! A `Future` may either be "resolved" or "unresolved". An unresolved `Future` still has a pending
28//! `Promise` for it. It becomes resolved once the `Promise` is complete. Once resolved, it will
29//! have a value if the `Promise` was fulfilled (ie, set a value), or no value if the `Promise` was
30//! unfulfilled (ie, dropped without setting a value).
31//!
32//! A `Promise` is either "pending" or "complete". A pending `Promise` is simply a live value of
33//! type `Promise<T>`. It can be fulfilled by setting a value, which consumes the Promise,
34//! completing it. Alternatively it can be completed unfulfilled by simply dropping the value
35//! without ever calling `set` on it.
36//!
37//! A `Future` can also be created already resolved (ie, not paired with a `Promise`). This is
38//! useful for lifting values into the `Promise`/`Future` domain.
39//!
40//! `Future`s may be chained in two ways. The most general way is with `callback`, which takes a
41//! `Future` and a function to act on the value when it becomes available. This function is called
42//! within the same context that completed the `Promise` so if the function blocks it will block
43//! that context. The callback is passed another `Promise` to take the return of the callback, which
44//! may be fulfilled or unfulfilled within the callback, or passed on somewhere else.
45//!
46//! Using `callback` directly can be a little cumbersome, so there are a couple of helpers. `then`
47//! simply calls a synchronous callback function and uses its return to fulfill the value. The
48//! function must be run within the `Promise` context, so it should probably be quick.
49//!
50//! Alternatively `chain` - like `then` - will take a function to act on the resolved future
51//! value. However, unlike `then` it runs it in its own thread, so it can be arbitrarily
52//! time-consuming. The variant `chain_with` allows the thread creation to be controlled, so that
53//! thread pools may be used, for example.
54//!
55//! Groups of `Future`s can be acted upon together. `all` takes an iterator of `Future<T>`s, and
56//! returns a `Future<Iterator<T>>`, so that its possible to wait for multiple Futures to be
57//! resolved.
58//!
59//! Similarly, `any` returns the first available value of an iterator of `Future`s, discarding all
60//! the other values.
61//!
62//! More generally, `FutureStream` provides a mechanism to wait on an arbitrary number of `Futures`
63//! and incrementally acquiring their values as they become available.
64//!
65//! If a `Future` is dropped while its corresponding `Promise` is still pending, then any value it
66//! does produce will be discarded. The `Promise` be queried with its `canceled` method to see if a
67//! corresponding `Future` still exists; if not, it may choose to abort some time-consuming process
68//! rather than have its output simply discarded.
69
70// Internal details
71//
72// A Future has a promise iff its Unresolved. A Future is either created resolved (`with_value`),
73// in which case it never has a Promise, or becomes resolved by the Promise, which is destroyed
74// in the process. A Promise can exist without a Future - for example, when a callback is set on
75// the Future, which consumes the Future while setting the callback in the Promise.
76//
77// A Future can also be owned by a FutureStream, which prevents any other call on it. Therefore
78// a Future in a FutureStream can't have a callback set on it.
79
80/// An implementation of `Spawner` that spawns threads from a `ThreadPool`.
81#[cfg(feature = "threadpool")]
82extern crate threadpool;
83
84mod inner;
85mod future;
86mod promise;
87
88mod fnbox;
89mod spawner;
90mod util;
91mod futurestream;
92mod cvmx;
93
94pub use spawner::{Spawner, ThreadSpawner};
95pub use util::{any, all, all_with};
96pub use futurestream::{FutureStream, FutureStreamIter, FutureStreamWaiter};
97
98/// Result of calling `Future.poll()`.
99#[derive(Debug)]
100pub enum Pollresult<T> {
101 /// `Future` is not yet resolved; returns the `Future` for further use.
102 Unresolved(Future<T>),
103
104 /// `Future` has been resolved, and may or may not have a value. The `Future` has been consumed.
105 Resolved(Option<T>),
106}
107
108impl<T> From<future::Pollresult<T>> for Pollresult<T> {
109 fn from(res: future::Pollresult<T>) -> Self {
110 match res {
111 future::Pollresult::Unresolved(fut) => Pollresult::Unresolved(Future(fut)),
112 future::Pollresult::Resolved(v) => Pollresult::Resolved(v),
113 }
114 }
115}
116
117/// An undetermined value.
118///
119/// A `Future` represents an undetermined value. A corresponding `Promise` may set the value.
120///
121/// It is typically created in a pair with a `Promise` using the function `future_promise()`.
122#[derive(Debug)]
123pub struct Future<T>(future::Future<T>);
124
125/// A box for resolving a `Future`.
126///
127/// A `Promise` is a write-once box which corresponds with a `Future` and may be used to resolve it.
128///
129/// A `Promise` is initially pending, and is completed once it is consumed, either by its `set`
130/// method, or by going out of scope. The former is "fulfilling" the `Promise`; the latter is
131/// leaving it "unfulfilled".
132///
133/// It may only be created in a pair with a `Future` using the function `future_promise()`.
134#[derive(Debug)]
135pub struct Promise<T>(promise::Promise<T>);
136
137impl<T> Future<T> {
138 /// Construct an already resolved `Future` with a value. It is equivalent to a `Future` whose
139 /// `Promise` has already been fulfilled.
140 ///
141 /// ```
142 /// # use promising_future::{Future, Pollresult};
143 /// let fut = Future::with_value(123);
144 /// match fut.poll() {
145 /// Pollresult::Resolved(Some(123)) => println!("ok"),
146 /// _ => panic!("unexpected result"),
147 /// }
148 /// ```
149 #[inline]
150 pub fn with_value(v: T) -> Future<T> {
151 Future(future::Future::with_value(v))
152 }
153
154 /// Construct a resolved `Future` which will never have a value; it is equivalent to a `Future`
155 /// whose `Promise` completed unfulfilled.
156 ///
157 /// ```
158 /// # use promising_future::{Future, Pollresult};
159 /// let fut = Future::<i32>::never();
160 /// match fut.poll() {
161 /// Pollresult::Resolved(None) => println!("ok"),
162 /// _ => panic!("unexpected result"),
163 /// }
164 /// ```
165 #[inline]
166 pub fn never() -> Future<T> {
167 Future(future::Future::never())
168 }
169
170 /// Test to see if the `Future` is resolved yet.
171 ///
172 /// It returns an `Pollresult`, which has two values:
173 ///
174 /// * `Unresolved(Future<T>)` - the `Future` is not yet resolved, so returns itself, or
175 /// * `Resolved(Option<T>)` - the `Future` has been resolved, and may have a value.
176 ///
177 /// ```
178 /// # use promising_future::{Future, Pollresult};
179 /// # let fut = Future::with_value(123);
180 /// match fut.poll() {
181 /// Pollresult::Unresolved(fut) => println!("unresolved future {:?}", fut),
182 /// Pollresult::Resolved(None) => println!("resolved, no value"),
183 /// Pollresult::Resolved(Some(v)) => println!("resolved, value {}", v),
184 /// }
185 /// ```
186 #[inline]
187 pub fn poll(self) -> Pollresult<T> {
188 Pollresult::from(self.0.poll())
189 }
190
191 /// Block until the `Future` is resolved.
192 ///
193 /// If the `Future` is not yet resolved, it will block until the corresponding `Promise` is
194 /// either fulfilled, or is completed unfulfilled. In the former case it will return `Some(v)`,
195 /// otherwise `None`.
196 ///
197 /// If the `Future` is already resolved - ie, has no corresponding `Promise` - then it will
198 /// return immediately without blocking.
199 ///
200 /// ```
201 /// # use promising_future::Future;
202 /// # let fut = Future::with_value(123);
203 /// match fut.value() {
204 /// Some(v) => println!("has value {}", v),
205 /// None => println!("no value"),
206 /// }
207 /// ```
208 #[inline]
209 pub fn value(self) -> Option<T> {
210 self.0.value()
211 }
212
213 /// Set a synchronous callback to run in the Promise's context.
214 ///
215 /// When the `Future<T>` completes, call the function on the value
216 /// (if any), returning a new value which appears in the returned
217 /// `Future<U>`.
218 ///
219 /// The function is called within the `Promise`s context, and so
220 /// will block the thread if it takes a long time. Because the
221 /// callback returns a value, not a `Future` it cannot be
222 /// async. See `callback` or `chain` for more general async ways
223 /// to apply a function to a `Future`.
224 ///
225 /// ```
226 /// # use promising_future::future_promise;
227 /// let (fut, prom) = future_promise();
228 ///
229 /// let fut = fut.then_opt(|v| v.map(|v| v + 123));
230 /// prom.set(1);
231 /// assert_eq!(fut.value(), Some(124))
232 /// ```
233 #[inline]
234 pub fn then_opt<F, U>(self, func: F) -> Future<U>
235 where F: FnOnce(Option<T>) -> Option<U> + Send + 'static,
236 U: Send + 'static
237 {
238 Future(self.0.then_opt(func))
239 }
240
241 /// Set synchronous callback
242 ///
243 /// Simplest form of callback. This is only called if the promise
244 /// is fulfilled, and may only allow a promise to be fulfilled.
245 #[inline]
246 pub fn then<F, U>(self, func: F) -> Future<U>
247 where F: FnOnce(T) -> U + Send + 'static,
248 U: Send + 'static
249 {
250 Future(self.0.then(func))
251 }
252
253 /// Set a callback to run in the `Promise`'s context.
254 ///
255 /// This function sets a general purpose callback which is called
256 /// when a `Future` is resolved. It is called in the `Promise`'s
257 /// context, so if it is long-running it will block whatever
258 /// thread that is. (If the `Future` is already resolved, it is
259 /// the calling thread.)
260 ///
261 /// The value passed to the callback is an `Option` - if it is
262 /// `None` it means the promise was unfulfilled.
263 ///
264 /// The callback is passed a new `Promise<U>` which is paired with
265 /// the `Future<U>` this function returns; the callback may either
266 /// set a value on it, pass it somewhere else, or simply drop it
267 /// leaving the promise unfulfilled.
268 ///
269 /// This is the most general form of a completion callback; see
270 /// also `then` and `chain` for simpler interfaces which are often
271 /// all that's needed.
272 ///
273 /// ```
274 /// # use promising_future::future_promise;
275 /// let (fut, prom) = future_promise();
276 ///
277 /// let fut = fut.callback(|v, p| {
278 /// match v {
279 /// None => (), // drop p
280 /// Some(v) => p.set(v + 123),
281 /// }
282 /// });
283 /// prom.set(1);
284 /// assert_eq!(fut.value(), Some(124))
285 /// ```
286 #[inline]
287 pub fn callback<F, U>(self, func: F) -> Future<U>
288 where F: FnOnce(Option<T>, Promise<U>) + Send + 'static,
289 U: Send + 'static
290 {
291 let func = move |v, p| func(v, Promise(p));
292 Future(self.0.callback(func))
293 }
294
295 /// Set a callback which returns `()`
296 ///
297 /// Set a callback with a closure which returns nothing, so its only useful for its side-effects.
298 #[inline]
299 pub fn callback_unit<F>(self, func: F)
300 where F: FnOnce(Option<T>) + Send + 'static
301 {
302 self.0.callback_unit(func)
303 }
304}
305
306impl<T: Send> Future<T> {
307 /// Chain two `Future`s.
308 ///
309 /// Asynchronously apply a function to the result of a `Future`, returning a new `Future` for
310 /// that value. This may spawn a thread to block waiting for the first `Future` to complete.
311 ///
312 /// The function is passed an `Option`, which indicates whether the `Future` ever received a
313 /// value. The function returns an `Option` so that the resulting `Future` can also be
314 /// valueless.
315 #[inline]
316 pub fn chain<F, U>(self, func: F) -> Future<U>
317 where F: FnOnce(Option<T>) -> Option<U> + Send + 'static, T: 'static, U: Send + 'static
318 {
319 Future(self.0.chain(func))
320 }
321
322 /// As with `chain`, but pass a `Spawner` to control how the thread is created.
323 #[inline]
324 pub fn chain_with<F, U, S>(self, func: F, spawner: &S) -> Future<U>
325 where F: FnOnce(Option<T>) -> Option<U> + Send + 'static, T: 'static, U: Send + 'static, S: Spawner
326 {
327 Future(self.0.chain_with(func, spawner))
328 }
329}
330
331
332impl<T> From<Option<T>> for Future<T> {
333 fn from(v: Option<T>) -> Future<T> {
334 Future(future::Future::from(v))
335 }
336}
337
338/// Blocking iterator for the value of a `Future`. Returns either 0 or 1 values.
339pub struct FutureIter<T>(Option<Future<T>>);
340
341impl<T> IntoIterator for Future<T> {
342 type Item = T;
343 type IntoIter = FutureIter<T>;
344
345 fn into_iter(self) -> Self::IntoIter {
346 FutureIter(Some(self))
347 }
348}
349
350impl<T> Iterator for FutureIter<T> {
351 type Item = T;
352
353 fn next(&mut self) -> Option<Self::Item> {
354 match self.0.take() {
355 None => None,
356 Some(fut) => fut.value()
357 }
358 }
359}
360
361impl<T> Promise<T> {
362 /// Fulfill the `Promise` by resolving the corresponding `Future` with a value.
363 #[inline]
364 pub fn set(self, v: T) {
365 self.0.set(v)
366 }
367
368 /// Return true if the corresponding `Future` no longer exists, and so any value set would be
369 /// discarded.
370 ///
371 /// ```
372 /// # use ::promising_future::future_promise;
373 /// # use std::thread;
374 /// # use std::mem;
375 /// # struct State; impl State { fn new() -> State { State } fn perform_action(&mut self) -> Option<u32> { None } }
376 /// let (fut, prom) = future_promise();
377 ///
378 /// thread::spawn(move || {
379 /// let mut s = State::new();
380 /// while !prom.canceled() {
381 /// match s.perform_action() {
382 /// None => (),
383 /// Some(res) => { prom.set(res); break },
384 /// }
385 /// }
386 /// });
387 /// // ...
388 /// mem::drop(fut);
389 /// ```
390 #[inline]
391 pub fn canceled(&self) -> bool {
392 self.0.canceled()
393 }
394}
395
396/// Construct a `Future`/`Promise` pair.
397///
398/// A `Future` represents a value which may not yet be known. A `Promise` is some process which will
399/// determine that value. This function produces a bound `Future`/`Promise` pair. If the `Promise`
400/// is dropped before the value is set, then the `Future` will never return a value. If the `Future`
401/// is dropped before fetching the value, or before the value is set, then the `Promise`'s value is
402/// lost.
403///
404/// ```
405/// # use promising_future::{Future, future_promise};
406/// let (fut, prom) = future_promise::<i32>();
407/// ```
408#[inline]
409pub fn future_promise<T>() -> (Future<T>, Promise<T>) {
410 let (f, p) = future::future_promise();
411 (Future(f), Promise(p))
412}