futures_executor/local_pool.rs
1use std::prelude::v1::*;
2
3use std::cell::{RefCell};
4use std::rc::{Rc, Weak};
5
6use futures_core::{Future, Poll, Async, Stream};
7use futures_core::task::{Context, Waker, LocalMap};
8use futures_core::executor::{Executor, SpawnError};
9use futures_core::never::Never;
10use futures_util::stream::FuturesUnordered;
11use futures_util::stream::StreamExt;
12
13use thread::ThreadNotify;
14use enter;
15use ThreadPool;
16
// A unit of work owned by the pool: a boxed future plus the task-local
// storage that is handed to it on every poll.
struct Task {
    // The spawned future. It yields `()` and its error type is `Never`.
    fut: Box<Future<Item = (), Error = Never>>,
    // Task-local data; `Task::poll` installs it into the `Context` before
    // polling `fut`.
    map: LocalMap,
}
21
/// A single-threaded task pool.
///
/// This executor allows you to multiplex any number of tasks onto a single
/// thread. It's appropriate for strictly I/O-bound tasks that do very little
/// work in between I/O actions.
///
/// To get a handle to the pool that implements
/// [`Executor`](::futures_core::executor::Executor), use the
/// [`executor()`](LocalPool::executor) method. Because the executor is
/// single-threaded, it supports a special form of task spawning for non-`Send`
/// futures, via [`spawn_local`](LocalExecutor::spawn_local).
pub struct LocalPool {
    // Tasks that have already been moved into the pool and are polled by
    // `poll_pool`.
    pool: FuturesUnordered<Task>,
    // Queue of freshly spawned tasks. `LocalExecutor` handles hold a `Weak`
    // reference to it and push here; `poll_pool` drains it into `pool`.
    incoming: Rc<Incoming>,
}
37
/// A handle to a [`LocalPool`](LocalPool) that implements
/// [`Executor`](::futures_core::executor::Executor).
#[derive(Clone)]
pub struct LocalExecutor {
    // Weak reference to the pool's spawn queue, so a handle does not keep the
    // queue alive on its own. When it can no longer be upgraded, spawning and
    // `status` report `SpawnError::shutdown()`.
    incoming: Weak<Incoming>,
}
44
// Queue of tasks that were spawned through a `LocalExecutor` but have not yet
// been moved into the pool's `FuturesUnordered`.
type Incoming = RefCell<Vec<Task>>;
46
47// Set up and run a basic single-threaded executor loop, invocing `f` on each
48// turn.
49fn run_executor<T, F: FnMut(&Waker) -> Async<T>>(mut f: F) -> T {
50 let _enter = enter()
51 .expect("cannot execute `LocalPool` executor from within \
52 another executor");
53
54 ThreadNotify::with_current(|thread| {
55 let waker = &Waker::from(thread.clone());
56 loop {
57 if let Async::Ready(t) = f(waker) {
58 return t;
59 }
60 thread.park();
61 }
62 })
63}
64
65impl LocalPool {
66 /// Create a new, empty pool of tasks.
67 pub fn new() -> LocalPool {
68 LocalPool {
69 pool: FuturesUnordered::new(),
70 incoming: Default::default(),
71 }
72 }
73
74 /// Get a clonable handle to the pool as an executor.
75 pub fn executor(&self) -> LocalExecutor {
76 LocalExecutor {
77 incoming: Rc::downgrade(&self.incoming)
78 }
79 }
80
81 /// Run all tasks in the pool to completion.
82 ///
83 /// The given executor, `exec`, is used as the default executor for any
84 /// *newly*-spawned tasks. You can route these additional tasks back into
85 /// the `LocalPool` by using its executor handle:
86 ///
87 /// ```
88 /// # extern crate futures;
89 /// # use futures::executor::LocalPool;
90 ///
91 /// # fn main() {
92 /// let mut pool = LocalPool::new();
93 /// let mut exec = pool.executor();
94 ///
95 /// // ... spawn some initial tasks using `exec.spawn()` or `exec.spawn_local()`
96 ///
97 /// // run *all* tasks in the pool to completion, including any newly-spawned ones.
98 /// pool.run(&mut exec);
99 /// # }
100 /// ```
101 ///
102 /// The function will block the calling thread until *all* tasks in the pool
103 /// are complete, including any spawned while running existing tasks.
104 pub fn run(&mut self, exec: &mut Executor) {
105 run_executor(|waker| self.poll_pool(waker, exec))
106 }
107
108 /// Runs all the tasks in the pool until the given future completes.
109 ///
110 /// The given executor, `exec`, is used as the default executor for any
111 /// *newly*-spawned tasks. You can route these additional tasks back into
112 /// the `LocalPool` by using its executor handle:
113 ///
114 /// ```
115 /// # extern crate futures;
116 /// # use futures::executor::LocalPool;
117 /// # use futures::future::{Future, ok};
118 ///
119 /// # fn main() {
120 /// let mut pool = LocalPool::new();
121 /// let mut exec = pool.executor();
122 /// # let my_app: Box<Future<Item = (), Error = ()>> = Box::new(ok(()));
123 ///
124 /// // run tasks in the pool until `my_app` completes, by default spawning
125 /// // further tasks back onto the pool
126 /// pool.run_until(my_app, &mut exec);
127 /// # }
128 /// ```
129 ///
130 /// The function will block the calling thread *only* until the future `f`
131 /// completes; there may still be incomplete tasks in the pool, which will
132 /// be inert after the call completes, but can continue with further use of
133 /// `run` or `run_until`. While the function is running, however, all tasks
134 /// in the pool will try to make progress.
135 pub fn run_until<F>(&mut self, mut f: F, exec: &mut Executor) -> Result<F::Item, F::Error>
136 where F: Future
137 {
138 // persistent state for the "main task"
139 let mut main_map = LocalMap::new();
140
141 run_executor(|waker| {
142 {
143 let mut main_cx = Context::new(&mut main_map, waker, exec);
144
145 // if our main task is done, so are we
146 match f.poll(&mut main_cx) {
147 Ok(Async::Ready(v)) => return Async::Ready(Ok(v)),
148 Err(err) => return Async::Ready(Err(err)),
149 _ => {}
150 }
151 }
152
153 self.poll_pool(waker, exec);
154 Async::Pending
155 })
156 }
157
158 // Make maximal progress on the entire pool of spawned task, returning `Ready`
159 // if the pool is empty and `Pending` if no further progress can be made.
160 fn poll_pool(&mut self, waker: &Waker, exec: &mut Executor) -> Async<()> {
161 // state for the FuturesUnordered, which will never be used
162 let mut pool_map = LocalMap::new();
163 let mut pool_cx = Context::new(&mut pool_map, waker, exec);
164
165 loop {
166 // empty the incoming queue of newly-spawned tasks
167 {
168 let mut incoming = self.incoming.borrow_mut();
169 for task in incoming.drain(..) {
170 self.pool.push(task)
171 }
172 }
173
174 if let Ok(ret) = self.pool.poll_next(&mut pool_cx) {
175 // we queued up some new tasks; add them and poll again
176 if !self.incoming.borrow().is_empty() {
177 continue;
178 }
179
180 // no queued tasks; we may be done
181 match ret {
182 Async::Pending => return Async::Pending,
183 Async::Ready(None) => return Async::Ready(()),
184 _ => {}
185 }
186 }
187 }
188 }
189}
190
// Shared thread pool, built lazily on first access by `lazy_static!`.
// `block_on` and `BlockingStream::next` pass a clone of it as the default
// executor. It is configured with the name prefix "block_on-", and the first
// access panics if the pool cannot be created.
lazy_static! {
    static ref GLOBAL_POOL: ThreadPool = ThreadPool::builder()
        .name_prefix("block_on-")
        .create()
        .expect("Unable to create global thread-pool");
}
197
198/// Run a future to completion on the current thread.
199///
200/// This function will block the caller until the given future has completed.
201/// The default executor for the future is a global `ThreadPool`.
202///
203/// Use a [`LocalPool`](LocalPool) if you need finer-grained control over
204/// spawned tasks.
205pub fn block_on<F: Future>(f: F) -> Result<F::Item, F::Error> {
206 let mut pool = LocalPool::new();
207 pool.run_until(f, &mut GLOBAL_POOL.clone())
208}
209
210/// Turn a stream into a blocking iterator.
211///
212/// When `next` is called on the resulting `BlockingStream`, the caller
213/// will be blocked until the next element of the `Stream` becomes available.
214/// The default executor for the future is a global `ThreadPool`.
215pub fn block_on_stream<S: Stream>(s: S) -> BlockingStream<S> {
216 BlockingStream { stream: Some(s) }
217}
218
// `stream` is an `Option` because `Iterator::next` takes the stream out while
// it waits for the next element and puts it back afterwards.
/// An iterator which blocks on values from a stream until they become available.
pub struct BlockingStream<S: Stream> { stream: Option<S> }
221
222impl<S: Stream> BlockingStream<S> {
223 /// Convert this `BlockingStream` into the inner `Stream` type.
224 pub fn into_inner(self) -> S {
225 self.stream.expect("BlockingStream shouldn't be empty")
226 }
227}
228
229impl<S: Stream> Iterator for BlockingStream<S> {
230 type Item = Result<S::Item, S::Error>;
231 fn next(&mut self) -> Option<Self::Item> {
232 let s = self.stream.take().expect("BlockingStream shouldn't be empty");
233 let (item, s) =
234 match LocalPool::new().run_until(s.next(), &mut GLOBAL_POOL.clone()) {
235 Ok((Some(item), s)) => (Some(Ok(item)), s),
236 Ok((None, s)) => (None, s),
237 Err((e, s)) => (Some(Err(e)), s),
238 };
239
240 self.stream = Some(s);
241 item
242 }
243}
244
245impl Executor for LocalExecutor {
246 fn spawn(&mut self, f: Box<Future<Item = (), Error = Never> + Send>) -> Result<(), SpawnError> {
247 self.spawn_task(Task {
248 fut: f,
249 map: LocalMap::new(),
250 })
251 }
252
253 fn status(&self) -> Result<(), SpawnError> {
254 if self.incoming.upgrade().is_some() {
255 Ok(())
256 } else {
257 Err(SpawnError::shutdown())
258 }
259 }
260}
261
262impl LocalExecutor {
263 fn spawn_task(&self, task: Task) -> Result<(), SpawnError> {
264 let incoming = self.incoming.upgrade().ok_or(SpawnError::shutdown())?;
265 incoming.borrow_mut().push(task);
266 Ok(())
267 }
268
269 /// Spawn a non-`Send` future onto the associated [`LocalPool`](LocalPool).
270 pub fn spawn_local<F>(&mut self, f: F) -> Result<(), SpawnError>
271 where F: Future<Item = (), Error = Never> + 'static
272 {
273 self.spawn_task(Task {
274 fut: Box::new(f),
275 map: LocalMap::new(),
276 })
277 }
278}
279
280impl Future for Task {
281 type Item = ();
282 type Error = Never;
283
284 fn poll(&mut self, cx: &mut Context) -> Poll<(), Never> {
285 self.fut.poll(&mut cx.with_locals(&mut self.map))
286 }
287}