minimal_executor/
local_pool_new.rs

1use alloc::sync::{Arc, Weak};
2use futures::stream::FuturesUnordered;
3use futures::future::LocalFutureObj;
4use futures::StreamExt;
5use core::task::{Poll};
6use crossbeam::queue::SegQueue;
7use futures::task::UnsafeFutureObj;
8use crate::poll_fn;
9use futures::future::FutureObj;
10use futures::task::Spawn;
11use futures::task::SpawnError;
12
/// A single-threaded task pool for polling futures to completion.
///
/// This executor allows you to multiplex any number of tasks onto a single
/// thread. It's appropriate to poll strictly I/O-bound futures that do very
/// little work in between I/O actions.
///
/// Futures that are not `Send`, or that only live for `'a`, are added directly
/// with [`spawn()`](LocalPool::spawn). To hand `Send + 'static` futures to the
/// pool from elsewhere, obtain a [`Spawner`] via
/// [`spawner()`](LocalPool::spawner); for `Ret = ()` it implements
/// [`Spawn`](futures::task::Spawn).
#[derive(Debug)]
pub struct LocalPool<'a, Ret = ()> {
    // Tasks owned by the pool; this is the collection that actually gets polled.
    pool: FuturesUnordered<LocalFutureObj<'a, Ret>>,
    // Hand-off queue that `Spawner`s push into (they hold it weakly); it is
    // drained into `pool` at the start of every `poll_once` pass.
    other: Arc<SegQueue<LocalFutureObj<'static, Ret>>>,
}
29
30#[derive(Clone)]
31pub struct Spawner<Ret> {
32    tx: Weak<SegQueue<LocalFutureObj<'static, Ret>>>,
33}
34
35impl Spawner<()> {
36    pub fn spawn<F>(&self, f: F) -> Result<(), SpawnError>
37        where F: UnsafeFutureObj<'static, ()> + Send {
38        self.spawn_obj(FutureObj::new(f))
39    }
40}
41
42impl Spawn for Spawner<()> {
43    fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
44        let tx = self.tx.upgrade().ok_or(SpawnError::shutdown())?;
45        tx.push(future.into());
46        Ok(())
47    }
48}
49
50
51impl<'a, Ret> LocalPool<'a, Ret> {
52    /// Create a new, empty pool of tasks.
53    pub fn new() -> Self {
54        Self {
55            pool: FuturesUnordered::new(),
56            other: Arc::new(SegQueue::new())
57        }
58    }
59    pub fn spawner(&self) -> Spawner<Ret> {
60        Spawner {
61            tx: Arc::downgrade(&self.other),
62        }
63    }
64    pub fn spawn<F>(&mut self, f: F)
65        where F: UnsafeFutureObj<'a, Ret> {
66        self.pool.push(LocalFutureObj::new(f))
67    }
68    /// Run all tasks in the pool to completion.
69    ///
70    /// ```rust
71    ///
72    /// use minimal_executor::LocalPool;
73    ///
74    /// let mut pool: LocalPool<'_, ()> = LocalPool::new();
75    ///
76    /// // ... spawn some initial tasks using `spawn.spawn()` or `spawn.spawn_local()`
77    ///
78    /// // run *all* tasks in the pool to completion, including any newly-spawned ones.
79    /// pool.run();
80    /// ```
81    ///
82    /// The function will block the calling thread until *all* tasks in the pool
83    /// are complete, including any spawned while running existing tasks.
84    pub fn run(&mut self) -> alloc::vec::Vec<Ret> {
85        let mut results = alloc::vec::Vec::new();
86        loop {
87            let ret = self.poll_once();
88
89            // no queued tasks; we may be done
90            match ret {
91                Poll::Pending => {}
92                Poll::Ready(None) => break,
93                Poll::Ready(Some(r)) => { results.push(r); }
94            }
95        }
96        results
97    }
98
99    /// Runs all tasks and returns after completing one future or until no more progress
100    /// can be made. Returns `true` if one future was completed, `false` otherwise.
101    ///
102    /// ```rust
103    ///
104    /// use futures::task::LocalSpawnExt;
105    /// use futures::future::{ready, pending};
106    /// use minimal_executor::LocalPool;
107    ///
108    /// let mut pool: LocalPool<'_, ()> = LocalPool::new();
109    /// pool.spawn(Box::pin(ready(())));
110    /// pool.spawn(Box::pin(ready(())));
111    /// pool.spawn(Box::pin(pending()));
112    ///
113    /// // Run the two ready tasks and return true for them.
114    /// pool.try_run_one(); // returns true after completing one of the ready futures
115    /// pool.try_run_one(); // returns true after completing the other ready future
116    ///
117    /// // the remaining task can not be completed
118    /// assert!(pool.try_run_one().is_pending()); // returns false
119    /// ```
120    ///
121    /// This function will not block the calling thread and will return the moment
122    /// that there are no tasks left for which progress can be made or after exactly one
123    /// task was completed; Remaining incomplete tasks in the pool can continue with
124    /// further use of one of the pool's run or poll methods.
125    /// Though only one task will be completed, progress may be made on multiple tasks.
126    pub fn try_run_one(&mut self) -> Poll<Ret> {
127        let ret = self.poll_once();
128        match ret {
129            Poll::Ready(Some(ret)) => {
130                Poll::Ready(ret)
131            }
132            Poll::Ready(None) => {
133                Poll::Pending
134            }
135            Poll::Pending => {
136                Poll::Pending
137            }
138        }
139    }
140
141
142    pub fn poll_once(&mut self) -> Poll<Option<Ret>> {
143        poll_fn(|cx| {
144            while let Some(fut) = self.other.pop() {
145                self.pool.push(fut);
146            }
147            self.pool.poll_next_unpin(cx)
148        })
149    }
150}
151
152impl<'a, Ret> Default for LocalPool<'a, Ret> {
153    fn default() -> Self {
154        Self::new()
155    }
156}
157