local_pool_with_id/
lib.rs

1//! # local-pool-with-id
2//! A minor variation on a [LocalPool](https://docs.rs/futures/0.3/futures/executor/struct.LocalPool.html) executor which exposes unique IDs for tracking future completion.
3//!
4//! This should almost be a drop in replacement for the existing LocalPool. All existing traits are still implemented. There are two API differences:
5//! 1. New `(Local)SpawnWithId` traits have been implemented. These accept the same arguments as their non-ID counterparts but return a unique ID that can be used to identify whether a spawned future has been completed.
6//! 2. `try_run_one` now returns an `Option<usize>` instead of a boolean. This usize will correspond to the ID received from the previous APIs and can be used with external tracking mechanism to know if a future is complete.
7//!
8//! ## Motivation
9//! The existing `LocalPool` allowed you to run all futures, opaquely, in a non-blocking way or to, blockingly, run a single future to completion and retrieve it's output. By providing tracking IDs, we can use an external lookup to infer which futures are finished and ask them for their results directly.
10//!
11//! ## Example
12//! ```rust
13//! use local_pool_with_id::SpawnWithIdExt;
14//! use futures::prelude::*;
15//!
16//! let mut spawned_ids = std::collections::HashSet::new();
17//! let mut pool = local_pool_with_id::LocalPool::new();
18//! let spawner = pool.spawner();
19//!
20//! let (id1, handle1) = spawner
21//!     .spawn_with_handle(futures::future::ready(1i32))
22//!     .unwrap();
23//! let (id2, handle2) = spawner
24//!     .spawn_with_handle(futures::future::ready(2u32))
25//!     .unwrap();
26//!
27//! spawned_ids.insert(id1);
28//! spawned_ids.insert(id2);
29//!
30//! while !spawned_ids.is_empty() {
31//!     if let Some(completed) = pool.try_run_one() {
32//!         assert!(spawned_ids.remove(&completed))
33//!     }
34//! }
35//!
36//! assert_eq!(handle1.now_or_never().unwrap(), 1);
37//! assert_eq!(handle2.now_or_never().unwrap(), 2);
38//! ```
39
40use futures::executor::enter;
41use futures::future::{FutureObj, LocalFutureObj, RemoteHandle};
42use futures::prelude::*;
43use futures::stream::FuturesUnordered;
44use futures::task::{waker_ref, ArcWake, LocalSpawn, Spawn, SpawnError};
45use std::cell::RefCell;
46use std::pin::Pin;
47use std::rc::{Rc, Weak};
48use std::sync::atomic::{AtomicBool, Ordering};
49use std::sync::Arc;
50use std::task::{Context, Poll};
51use std::thread;
52use std::thread::Thread;
53
54#[must_use = "futures do nothing unless you `.await` or poll them"]
55#[derive(Debug)]
56struct IndexWrapper<T> {
57    data: T, // A future or a future's output
58    index: usize,
59}
60
61impl<T> IndexWrapper<T> {
62    pin_utils::unsafe_pinned!(data: T);
63}
64
65impl<T> Future for IndexWrapper<T>
66where
67    T: Future,
68{
69    type Output = IndexWrapper<T::Output>;
70
71    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
72        self.as_mut()
73            .data()
74            .as_mut()
75            .poll(cx)
76            .map(|output| IndexWrapper {
77                data: output,
78                index: self.index,
79            })
80    }
81}
82
83/// A single-threaded task pool for polling futures to completion.
84///
85/// This executor allows you to multiplex any number of tasks onto a single
86/// thread. It's appropriate to poll strictly I/O-bound futures that do very
87/// little work in between I/O actions.
88///
89/// To get a handle to the pool that implements
90/// [`Spawn`](futures_task::Spawn), use the
91/// [`spawner()`](LocalPool::spawner) method. Because the executor is
92/// single-threaded, it supports a special form of task spawning for non-`Send`
93/// futures, via [`spawn_local_obj`](futures_task::LocalSpawn::spawn_local_obj).
94#[derive(Debug)]
95pub struct LocalPool {
96    pool: FuturesUnordered<IndexWrapper<LocalFutureObj<'static, ()>>>,
97    incoming: Rc<Incoming>,
98}
99
100/// A handle to a [`LocalPool`](LocalPool) that implements
101/// [`Spawn`](futures_task::Spawn).
102#[derive(Clone, Debug)]
103pub struct LocalSpawner {
104    incoming: Weak<Incoming>,
105}
106
107#[derive(Debug, Default)]
108struct IncomingTracking {
109    queue: Vec<(usize, LocalFutureObj<'static, ()>)>,
110    index: usize,
111}
112
113type Incoming = RefCell<IncomingTracking>;
114
115pub(crate) struct ThreadNotify {
116    /// The (single) executor thread.
117    thread: Thread,
118    /// A flag to ensure a wakeup (i.e. `unpark()`) is not "forgotten"
119    /// before the next `park()`, which may otherwise happen if the code
120    /// being executed as part of the future(s) being polled makes use of
121    /// park / unpark calls of its own, i.e. we cannot assume that no other
122    /// code uses park / unpark on the executing `thread`.
123    unparked: AtomicBool,
124}
125
126thread_local! {
127    static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
128        thread: thread::current(),
129        unparked: AtomicBool::new(false),
130    });
131}
132
133impl ArcWake for ThreadNotify {
134    fn wake_by_ref(arc_self: &Arc<Self>) {
135        // Make sure the wakeup is remembered until the next `park()`.
136        let unparked = arc_self.unparked.swap(true, Ordering::Relaxed);
137        if !unparked {
138            // If the thread has not been unparked yet, it must be done
139            // now. If it was actually parked, it will run again,
140            // otherwise the token made available by `unpark`
141            // may be consumed before reaching `park()`, but `unparked`
142            // ensures it is not forgotten.
143            arc_self.thread.unpark();
144        }
145    }
146}
147
148// Set up and run a basic single-threaded spawner loop, invoking `f` on each
149// turn.
150fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
151    let _enter = enter().expect(
152        "cannot execute `LocalPool` executor from within \
153         another executor",
154    );
155
156    CURRENT_THREAD_NOTIFY.with(|thread_notify| {
157        let waker = waker_ref(thread_notify);
158        let mut cx = Context::from_waker(&waker);
159        loop {
160            if let Poll::Ready(t) = f(&mut cx) {
161                return t;
162            }
163            // Consume the wakeup that occurred while executing `f`, if any.
164            let unparked = thread_notify.unparked.swap(false, Ordering::Acquire);
165            if !unparked {
166                // No wakeup occurred. It may occur now, right before parking,
167                // but in that case the token made available by `unpark()`
168                // is guaranteed to still be available and `park()` is a no-op.
169                thread::park();
170                // When the thread is unparked, `unparked` will have been set
171                // and needs to be unset before the next call to `f` to avoid
172                // a redundant loop iteration.
173                thread_notify.unparked.store(false, Ordering::Release);
174            }
175        }
176    })
177}
178
179fn poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T {
180    let _enter = enter().expect(
181        "cannot execute `LocalPool` executor from within \
182         another executor",
183    );
184
185    CURRENT_THREAD_NOTIFY.with(|thread_notify| {
186        let waker = waker_ref(thread_notify);
187        let mut cx = Context::from_waker(&waker);
188        f(&mut cx)
189    })
190}
191
192impl LocalPool {
193    /// Create a new, empty pool of tasks.
194    pub fn new() -> LocalPool {
195        LocalPool {
196            pool: FuturesUnordered::new(),
197            incoming: Default::default(),
198        }
199    }
200
201    /// Get a clonable handle to the pool as a [`Spawn`].
202    pub fn spawner(&self) -> LocalSpawner {
203        LocalSpawner {
204            incoming: Rc::downgrade(&self.incoming),
205        }
206    }
207
208    /// Run all tasks in the pool to completion.
209    ///
210    /// ```
211    /// use local_pool_with_id::LocalPool;
212    ///
213    /// let mut pool = LocalPool::new();
214    ///
215    /// // ... spawn some initial tasks using `spawn.spawn()` or `spawn.spawn_local()`
216    ///
217    /// // run *all* tasks in the pool to completion, including any newly-spawned ones.
218    /// pool.run();
219    /// ```
220    ///
221    /// The function will block the calling thread until *all* tasks in the pool
222    /// are complete, including any spawned while running existing tasks.
223    pub fn run(&mut self) {
224        run_executor(|cx| self.poll_pool(cx))
225    }
226
227    /// Runs all the tasks in the pool until the given future completes.
228    ///
229    /// ```
230    /// use local_pool_with_id::LocalPool;
231    ///
232    /// let mut pool = LocalPool::new();
233    /// # let my_app  = async {};
234    ///
235    /// // run tasks in the pool until `my_app` completes
236    /// pool.run_until(my_app);
237    /// ```
238    ///
239    /// The function will block the calling thread *only* until the future `f`
240    /// completes; there may still be incomplete tasks in the pool, which will
241    /// be inert after the call completes, but can continue with further use of
242    /// one of the pool's run or poll methods. While the function is running,
243    /// however, all tasks in the pool will try to make progress.
244    pub fn run_until<F: Future>(&mut self, future: F) -> F::Output {
245        pin_utils::pin_mut!(future);
246
247        run_executor(|cx| {
248            {
249                // if our main task is done, so are we
250                let result = future.as_mut().poll(cx);
251                if let Poll::Ready(output) = result {
252                    return Poll::Ready(output);
253                }
254            }
255
256            let _ = self.poll_pool(cx);
257            Poll::Pending
258        })
259    }
260
261    /// Runs all tasks and returns after completing one future or until no more progress
262    /// can be made. Returns the associated ID if one future was completed, `None` otherwise.
263    ///
264    /// ```
265    /// use local_pool_with_id::LocalPool;
266    /// use futures::task::LocalSpawnExt;
267    /// use futures::future::{ready, pending};
268    ///
269    /// let mut pool = LocalPool::new();
270    /// let spawner = pool.spawner();
271    ///
272    /// spawner.spawn_local(ready(())).unwrap();
273    /// spawner.spawn_local(ready(())).unwrap();
274    /// spawner.spawn_local(pending()).unwrap();
275    ///
276    /// // Run the two ready tasks and returns the IDs for them.
277    /// assert!(pool.try_run_one().is_some());
278    /// assert!(pool.try_run_one().is_some());
279    ///
280    /// // the remaining task can not be completed
281    /// assert!(pool.try_run_one().is_none()); // returns false
282    /// ```
283    ///
284    /// This function will not block the calling thread and will return the moment
285    /// that there are no tasks left for which progress can be made or after exactly one
286    /// task was completed; Remaining incomplete tasks in the pool can continue with
287    /// further use of one of the pool's run or poll methods.
288    /// Though only one task will be completed, progress may be made on multiple tasks.
289    pub fn try_run_one(&mut self) -> Option<usize> {
290        poll_executor(|ctx| {
291            loop {
292                let ret = self.poll_pool_once(ctx);
293
294                // return if we have executed a future
295                if let Poll::Ready(Some(key)) = ret {
296                    return Some(key);
297                }
298
299                // if there are no new incoming futures
300                // then there is no feature that can make progress
301                // and we can return without having completed a single future
302                if self.incoming.borrow().queue.is_empty() {
303                    return None;
304                }
305            }
306        })
307    }
308
309    /// Runs all tasks in the pool and returns if no more progress can be made
310    /// on any task.
311    ///
312    /// ```
313    /// use local_pool_with_id::LocalPool;
314    /// use futures::task::LocalSpawnExt;
315    /// use futures::future::{ready, pending};
316    ///
317    /// let mut pool = LocalPool::new();
318    /// let spawner = pool.spawner();
319    ///
320    /// spawner.spawn_local(ready(())).unwrap();
321    /// spawner.spawn_local(ready(())).unwrap();
322    /// spawner.spawn_local(pending()).unwrap();
323    ///
324    /// // Runs the two ready task and returns.
325    /// // The empty task remains in the pool.
326    /// pool.run_until_stalled();
327    /// ```
328    ///
329    /// This function will not block the calling thread and will return the moment
330    /// that there are no tasks left for which progress can be made;
331    /// remaining incomplete tasks in the pool can continue with further use of one
332    /// of the pool's run or poll methods. While the function is running, all tasks
333    /// in the pool will try to make progress.
334    pub fn run_until_stalled(&mut self) {
335        poll_executor(|ctx| {
336            let _ = self.poll_pool(ctx);
337        });
338    }
339
340    // Make maximal progress on the entire pool of spawned task, returning `Ready`
341    // if the pool is empty and `Pending` if no further progress can be made.
342    fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
343        // state for the FuturesUnordered, which will never be used
344        loop {
345            let ret = self.poll_pool_once(cx);
346
347            // we queued up some new tasks; add them and poll again
348            if !self.incoming.borrow().queue.is_empty() {
349                continue;
350            }
351
352            // no queued tasks; we may be done
353            match ret {
354                Poll::Pending => return Poll::Pending,
355                Poll::Ready(None) => return Poll::Ready(()),
356                _ => {}
357            }
358        }
359    }
360
361    // Try make minimal progress on the pool of spawned tasks
362    fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll<Option<usize>> {
363        // empty the incoming queue of newly-spawned tasks
364        {
365            let mut incoming = self.incoming.borrow_mut();
366            for (key, task) in incoming.queue.drain(..) {
367                self.pool.push(IndexWrapper {
368                    data: task,
369                    index: key,
370                })
371            }
372        }
373
374        // try to execute the next ready future
375        self.pool
376            .poll_next_unpin(cx)
377            .map(|poll| poll.map(|wrapper| wrapper.index))
378    }
379}
380
381impl Default for LocalPool {
382    fn default() -> Self {
383        Self::new()
384    }
385}
386
387impl Spawn for LocalSpawner {
388    fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
389        self.spawn_obj_with_id(future).map(|_| ())
390    }
391
392    fn status(&self) -> Result<(), SpawnError> {
393        if self.incoming.upgrade().is_some() {
394            Ok(())
395        } else {
396            Err(SpawnError::shutdown())
397        }
398    }
399}
400
401impl LocalSpawn for LocalSpawner {
402    fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
403        self.spawn_local_obj_with_id(future).map(|_| ())
404    }
405
406    fn status_local(&self) -> Result<(), SpawnError> {
407        if self.incoming.upgrade().is_some() {
408            Ok(())
409        } else {
410            Err(SpawnError::shutdown())
411        }
412    }
413}
414
415impl SpawnWithId for LocalSpawner {
416    fn spawn_obj_with_id(&self, future: FutureObj<'static, ()>) -> Result<usize, SpawnError> {
417        if let Some(incoming) = self.incoming.upgrade() {
418            let mut incoming = incoming.borrow_mut();
419            let id = incoming.index;
420            incoming.index += 1;
421            incoming.queue.push((id, future.into()));
422            Ok(id)
423        } else {
424            Err(SpawnError::shutdown())
425        }
426    }
427}
428
429impl LocalSpawnWithId for LocalSpawner {
430    fn spawn_local_obj_with_id(
431        &self,
432        future: LocalFutureObj<'static, ()>,
433    ) -> Result<usize, SpawnError> {
434        if let Some(incoming) = self.incoming.upgrade() {
435            let mut incoming = incoming.borrow_mut();
436            let id = incoming.index;
437            incoming.index += 1;
438            incoming.queue.push((id, future));
439            Ok(id)
440        } else {
441            Err(SpawnError::shutdown())
442        }
443    }
444}
445
446pub trait SpawnWithId {
447    fn spawn_obj_with_id(&self, future: FutureObj<'static, ()>) -> Result<usize, SpawnError>;
448}
449
450pub trait LocalSpawnWithId {
451    fn spawn_local_obj_with_id(
452        &self,
453        future: LocalFutureObj<'static, ()>,
454    ) -> Result<usize, SpawnError>;
455}
456
457impl<Sp: ?Sized> SpawnWithIdExt for Sp where Sp: SpawnWithId {}
458impl<Sp: ?Sized> LocalSpawnWithIdExt for Sp where Sp: LocalSpawnWithId {}
459
460/// Extension trait for `Spawn`.
461pub trait SpawnWithIdExt: SpawnWithId {
462    /// Spawns a task that polls the given future with output `()` to
463    /// completion.
464    ///
465    /// This method returns a [`Result`] that contains a [`SpawnError`] if
466    /// spawning fails.
467    ///
468    /// You can use [`spawn_with_handle`](SpawnWithIdExt::spawn_with_handle) if
469    /// you want to spawn a future with output other than `()` or if you want
470    /// to be able to await its completion.
471    ///
472    /// Note this method will eventually be replaced with the upcoming
473    /// `Spawn::spawn` method which will take a `dyn Future` as input.
474    /// Technical limitations prevent `Spawn::spawn` from being implemented
475    /// today. Feel free to use this method in the meantime.
476    ///
477    /// ```
478    /// use local_pool_with_id::LocalPool;
479    /// use local_pool_with_id::SpawnWithIdExt;
480    ///
481    /// let executor = LocalPool::new();
482    /// let spawner = executor.spawner();
483    ///
484    /// let future = async { /* ... */ };
485    /// spawner.spawn(future).unwrap();
486    /// ```
487    fn spawn<Fut>(&self, future: Fut) -> Result<usize, SpawnError>
488    where
489        Fut: Future<Output = ()> + Send + 'static,
490    {
491        self.spawn_obj_with_id(FutureObj::new(Box::new(future)))
492    }
493
494    /// Spawns a task that polls the given future to completion and returns a
495    /// future that resolves to the spawned future's output.
496    ///
497    /// This method returns a [`Result`] that contains a [`RemoteHandle`](futures::future::RemoteHandle), or, if
498    /// spawning fails, a [`SpawnError`]. [`RemoteHandle`](futures::future::RemoteHandle) is a future that
499    /// resolves to the output of the spawned future.
500    ///
501    /// ```
502    /// use futures::executor::block_on;
503    /// use futures::future;
504    /// use local_pool_with_id::LocalPool;
505    /// use local_pool_with_id::SpawnWithIdExt;
506    ///
507    /// let mut executor = LocalPool::new();
508    /// let spawner = executor.spawner();
509    ///
510    /// let future = future::ready(1);
511    /// let (id, join_handle_fut) = spawner.spawn_with_handle(future).unwrap();
512    /// assert_eq!(executor.run_until(join_handle_fut), 1);
513    /// ```
514    fn spawn_with_handle<Fut>(
515        &self,
516        future: Fut,
517    ) -> Result<(usize, RemoteHandle<Fut::Output>), SpawnError>
518    where
519        Fut: Future + Send + 'static,
520        Fut::Output: Send,
521    {
522        let (future, handle) = future.remote_handle();
523        let id = self.spawn(future)?;
524        Ok((id, handle))
525    }
526}
527
528/// Extension trait for `LocalSpawn`.
529pub trait LocalSpawnWithIdExt: LocalSpawnWithId {
530    /// Spawns a task that polls the given future with output `()` to
531    /// completion.
532    ///
533    /// This method returns a [`Result`] that contains a [`SpawnError`] if
534    /// spawning fails.
535    ///
536    /// You can use [`spawn_with_handle`](SpawnWithIdExt::spawn_with_handle) if
537    /// you want to spawn a future with output other than `()` or if you want
538    /// to be able to await its completion.
539    ///
540    /// Note this method will eventually be replaced with the upcoming
541    /// `Spawn::spawn` method which will take a `dyn Future` as input.
542    /// Technical limitations prevent `Spawn::spawn` from being implemented
543    /// today. Feel free to use this method in the meantime.
544    ///
545    /// ```
546    /// use local_pool_with_id::LocalPool;
547    /// use local_pool_with_id::LocalSpawnWithIdExt;
548    ///
549    /// let executor = LocalPool::new();
550    /// let spawner = executor.spawner();
551    ///
552    /// let future = async { /* ... */ };
553    /// spawner.spawn_local(future).unwrap();
554    /// ```
555    fn spawn_local<Fut>(&self, future: Fut) -> Result<usize, SpawnError>
556    where
557        Fut: Future<Output = ()> + 'static,
558    {
559        self.spawn_local_obj_with_id(LocalFutureObj::new(Box::new(future)))
560    }
561
562    /// Spawns a task that polls the given future to completion and returns a
563    /// future that resolves to the spawned future's output.
564    ///
565    /// This method returns a [`Result`] that contains a [`RemoteHandle`](futures::future::RemoteHandle), or, if
566    /// spawning fails, a [`SpawnError`]. [`RemoteHandle`](futures::future::RemoteHandle) is a future that
567    /// resolves to the output of the spawned future.
568    ///
569    /// ```
570    /// use local_pool_with_id::LocalPool;
571    /// use local_pool_with_id::LocalSpawnWithIdExt;
572    ///
573    /// let mut executor = LocalPool::new();
574    /// let spawner = executor.spawner();
575    ///
576    /// let future = async { 1 };
577    /// let (id, join_handle_fut) = spawner.spawn_local_with_handle(future).unwrap();
578    /// assert_eq!(executor.run_until(join_handle_fut), 1);
579    /// ```
580    fn spawn_local_with_handle<Fut>(
581        &self,
582        future: Fut,
583    ) -> Result<(usize, RemoteHandle<Fut::Output>), SpawnError>
584    where
585        Fut: Future + 'static,
586    {
587        let (future, handle) = future.remote_handle();
588        let id = self.spawn_local(future)?;
589        Ok((id, handle))
590    }
591}
592
593#[cfg(test)]
594mod tests {
595    use super::*;
596
597    #[test]
598    fn tracking() {
599        let mut spawned_ids = std::collections::HashSet::new();
600        let mut pool = LocalPool::new();
601        let spawner = pool.spawner();
602
603        let (id1, handle1) = spawner
604            .spawn_with_handle(futures::future::ready(1i32))
605            .unwrap();
606        let (id2, handle2) = spawner
607            .spawn_with_handle(futures::future::ready(2u32))
608            .unwrap();
609
610        spawned_ids.insert(id1);
611        spawned_ids.insert(id2);
612
613        while !spawned_ids.is_empty() {
614            if let Some(completed) = pool.try_run_one() {
615                assert!(spawned_ids.remove(&completed))
616            }
617        }
618
619        assert_eq!(handle1.now_or_never().unwrap(), 1);
620        assert_eq!(handle2.now_or_never().unwrap(), 2);
621    }
622}