minimal_executor/local_pool_old.rs
1use futures::stream::FuturesUnordered;
2use futures::future::LocalFutureObj;
3use futures::StreamExt;
4use core::task::{Poll};
5use futures::task::UnsafeFutureObj;
6use crate::poll_fn;
7use futures::future::FutureObj;
8use futures::task::Spawn;
9use futures::task::SpawnError;
10
11/// A single-threaded task pool for polling futures to completion.
12///
13/// This executor allows you to multiplex any number of tasks onto a single
14/// thread. It's appropriate to poll strictly I/O-bound futures that do very
15/// little work in between I/O actions.
16///
17/// To get a handle to the pool that implements
18/// [`Spawn`](futures_task::Spawn), use the
19/// [`spawner()`](LocalPool::spawner) method. Because the executor is
20/// single-threaded, it supports a special form of task spawning for non-`Send`
21/// futures, via [`spawn_local_obj`](futures_task::LocalSpawn::spawn_local_obj).
22#[derive(Debug)]
23pub struct LocalPool<'a, Ret = ()> {
24 pool: FuturesUnordered<LocalFutureObj<'a, Ret>>,
25 rx: kanal::Receiver<FutureObj<'static, Ret>>,
26 tx: kanal::Sender<FutureObj<'static, Ret>>,
27}
28
29
30#[derive(Clone)]
31pub struct Spawner<Ret> {
32 tx: kanal::Sender<FutureObj<'static, Ret>>,
33}
34
35impl<Ret> Spawner<Ret> {
36 pub fn spawn<F>(&self, f: F) -> Result<(), SpawnError>
37 where F: UnsafeFutureObj<'static, Ret> + Send {
38 self.tx.send(FutureObj::new(f)).map_err(|_| SpawnError::shutdown())
39 }
40}
41
42impl Spawn for Spawner<()> {
43 fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
44 self.tx.send(future).map_err(|_| SpawnError::shutdown())
45 }
46}
47
48
49impl<'a, Ret> LocalPool<'a, Ret> {
50 /// Create a new, empty pool of tasks.
51 pub fn new() -> Self {
52 let (tx, rx) = kanal::unbounded();
53 Self { pool: FuturesUnordered::new(), rx, tx }
54 }
55
56 pub fn spawner(&self) -> Spawner<Ret> {
57 Spawner {
58 tx: self.tx.clone()
59 }
60 }
61 pub fn spawn<F>(&mut self, f: F)
62 where F: UnsafeFutureObj<'a, Ret> {
63 self.pool.push(LocalFutureObj::new(f))
64 }
65 /// Run all tasks in the pool to completion.
66 ///
67 /// ```rust
68 ///
69 /// use minimal_executor::LocalPool;
70 ///
71 /// let mut pool: LocalPool<'_, ()> = LocalPool::new();
72 ///
73 /// // ... spawn some initial tasks using `spawn.spawn()` or `spawn.spawn_local()`
74 ///
75 /// // run *all* tasks in the pool to completion, including any newly-spawned ones.
76 /// pool.run();
77 /// ```
78 ///
79 /// The function will block the calling thread until *all* tasks in the pool
80 /// are complete, including any spawned while running existing tasks.
81 pub fn run(&mut self) -> alloc::vec::Vec<Ret> {
82 let mut results = alloc::vec::Vec::new();
83 loop {
84 let ret = self.poll_once();
85
86 // no queued tasks; we may be done
87 match ret {
88 Poll::Pending => {}
89 Poll::Ready(None) => break,
90 Poll::Ready(Some(r)) => { results.push(r); }
91 }
92 }
93 results
94 }
95
96 /// Runs all tasks and returns after completing one future or until no more progress
97 /// can be made. Returns `true` if one future was completed, `false` otherwise.
98 ///
99 /// ```rust
100 ///
101 /// use futures::task::LocalSpawnExt;
102 /// use futures::future::{ready, pending};
103 /// use minimal_executor::LocalPool;
104 ///
105 /// let mut pool: LocalPool<'_, ()> = LocalPool::new();
106 /// pool.spawn(Box::pin(ready(())));
107 /// pool.spawn(Box::pin(ready(())));
108 /// pool.spawn(Box::pin(pending()));
109 ///
110 /// // Run the two ready tasks and return true for them.
111 /// pool.try_run_one(); // returns true after completing one of the ready futures
112 /// pool.try_run_one(); // returns true after completing the other ready future
113 ///
114 /// // the remaining task can not be completed
115 /// assert!(pool.try_run_one().is_pending()); // returns false
116 /// ```
117 ///
118 /// This function will not block the calling thread and will return the moment
119 /// that there are no tasks left for which progress can be made or after exactly one
120 /// task was completed; Remaining incomplete tasks in the pool can continue with
121 /// further use of one of the pool's run or poll methods.
122 /// Though only one task will be completed, progress may be made on multiple tasks.
123 pub fn try_run_one(&mut self) -> Poll<Ret> {
124 let ret = self.poll_once();
125 match ret {
126 Poll::Ready(Some(ret)) => {
127 Poll::Ready(ret)
128 }
129 Poll::Ready(None) => {
130 Poll::Pending
131 }
132 Poll::Pending => {
133 Poll::Pending
134 }
135 }
136 }
137
138
139 pub fn poll_once(&mut self) -> Poll<Option<Ret>> {
140 poll_fn(|cx| {
141 while let Some(fut) = self.rx.try_recv().ok().flatten() {
142 self.pool.push(LocalFutureObj::from(fut))
143 }
144 self.pool.poll_next_unpin(cx)
145 })
146 }
147}
148
149impl<'a, Ret> Default for LocalPool<'a, Ret> {
150 fn default() -> Self {
151 Self::new()
152 }
153}
154