minimal_executor/
local_pool_old.rs

1use futures::stream::FuturesUnordered;
2use futures::future::LocalFutureObj;
3use futures::StreamExt;
4use core::task::{Poll};
5use futures::task::UnsafeFutureObj;
6use crate::poll_fn;
7use futures::future::FutureObj;
8use futures::task::Spawn;
9use futures::task::SpawnError;
10
11/// A single-threaded task pool for polling futures to completion.
12///
13/// This executor allows you to multiplex any number of tasks onto a single
14/// thread. It's appropriate to poll strictly I/O-bound futures that do very
15/// little work in between I/O actions.
16///
17/// To get a handle to the pool that implements
18/// [`Spawn`](futures_task::Spawn), use the
19/// [`spawner()`](LocalPool::spawner) method. Because the executor is
20/// single-threaded, it supports a special form of task spawning for non-`Send`
21/// futures, via [`spawn_local_obj`](futures_task::LocalSpawn::spawn_local_obj).
22#[derive(Debug)]
23pub struct LocalPool<'a, Ret = ()> {
24    pool: FuturesUnordered<LocalFutureObj<'a, Ret>>,
25    rx: kanal::Receiver<FutureObj<'static, Ret>>,
26    tx: kanal::Sender<FutureObj<'static, Ret>>,
27}
28
29
30#[derive(Clone)]
31pub struct Spawner<Ret> {
32    tx: kanal::Sender<FutureObj<'static, Ret>>,
33}
34
35impl<Ret> Spawner<Ret> {
36    pub fn spawn<F>(&self, f: F) -> Result<(), SpawnError>
37        where F: UnsafeFutureObj<'static, Ret> + Send {
38        self.tx.send(FutureObj::new(f)).map_err(|_| SpawnError::shutdown())
39    }
40}
41
42impl Spawn for Spawner<()> {
43    fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
44        self.tx.send(future).map_err(|_| SpawnError::shutdown())
45    }
46}
47
48
49impl<'a, Ret> LocalPool<'a, Ret> {
50    /// Create a new, empty pool of tasks.
51    pub fn new() -> Self {
52        let (tx, rx) = kanal::unbounded();
53        Self { pool: FuturesUnordered::new(), rx, tx }
54    }
55
56    pub fn spawner(&self) -> Spawner<Ret> {
57        Spawner {
58            tx: self.tx.clone()
59        }
60    }
61    pub fn spawn<F>(&mut self, f: F)
62        where F: UnsafeFutureObj<'a, Ret> {
63        self.pool.push(LocalFutureObj::new(f))
64    }
65    /// Run all tasks in the pool to completion.
66    ///
67    /// ```rust
68    ///
69    /// use minimal_executor::LocalPool;
70    ///
71    /// let mut pool: LocalPool<'_, ()> = LocalPool::new();
72    ///
73    /// // ... spawn some initial tasks using `spawn.spawn()` or `spawn.spawn_local()`
74    ///
75    /// // run *all* tasks in the pool to completion, including any newly-spawned ones.
76    /// pool.run();
77    /// ```
78    ///
79    /// The function will block the calling thread until *all* tasks in the pool
80    /// are complete, including any spawned while running existing tasks.
81    pub fn run(&mut self) -> alloc::vec::Vec<Ret> {
82        let mut results = alloc::vec::Vec::new();
83        loop {
84            let ret = self.poll_once();
85
86            // no queued tasks; we may be done
87            match ret {
88                Poll::Pending => {}
89                Poll::Ready(None) => break,
90                Poll::Ready(Some(r)) => { results.push(r); }
91            }
92        }
93        results
94    }
95
96    /// Runs all tasks and returns after completing one future or until no more progress
97    /// can be made. Returns `true` if one future was completed, `false` otherwise.
98    ///
99    /// ```rust
100    ///
101    /// use futures::task::LocalSpawnExt;
102    /// use futures::future::{ready, pending};
103    /// use minimal_executor::LocalPool;
104    ///
105    /// let mut pool: LocalPool<'_, ()> = LocalPool::new();
106    /// pool.spawn(Box::pin(ready(())));
107    /// pool.spawn(Box::pin(ready(())));
108    /// pool.spawn(Box::pin(pending()));
109    ///
110    /// // Run the two ready tasks and return true for them.
111    /// pool.try_run_one(); // returns true after completing one of the ready futures
112    /// pool.try_run_one(); // returns true after completing the other ready future
113    ///
114    /// // the remaining task can not be completed
115    /// assert!(pool.try_run_one().is_pending()); // returns false
116    /// ```
117    ///
118    /// This function will not block the calling thread and will return the moment
119    /// that there are no tasks left for which progress can be made or after exactly one
120    /// task was completed; Remaining incomplete tasks in the pool can continue with
121    /// further use of one of the pool's run or poll methods.
122    /// Though only one task will be completed, progress may be made on multiple tasks.
123    pub fn try_run_one(&mut self) -> Poll<Ret> {
124        let ret = self.poll_once();
125        match ret {
126            Poll::Ready(Some(ret)) => {
127                Poll::Ready(ret)
128            }
129            Poll::Ready(None) => {
130                Poll::Pending
131            }
132            Poll::Pending => {
133                Poll::Pending
134            }
135        }
136    }
137
138
139    pub fn poll_once(&mut self) -> Poll<Option<Ret>> {
140        poll_fn(|cx| {
141            while let Some(fut) = self.rx.try_recv().ok().flatten() {
142                self.pool.push(LocalFutureObj::from(fut))
143            }
144            self.pool.poll_next_unpin(cx)
145        })
146    }
147}
148
149impl<'a, Ret> Default for LocalPool<'a, Ret> {
150    fn default() -> Self {
151        Self::new()
152    }
153}
154