local_pool_with_id/lib.rs
1//! # local-pool-with-id
2//! A minor variation on a [LocalPool](https://docs.rs/futures/0.3/futures/executor/struct.LocalPool.html) executor which exposes unique IDs for tracking future completion.
3//!
//! This should be an almost drop-in replacement for the existing LocalPool. All existing traits are still implemented. There are two API differences:
//! 1. New `(Local)SpawnWithId` traits have been implemented. These accept the same arguments as their non-ID counterparts but return a unique ID that can be used to identify whether a spawned future has been completed.
//! 2. `try_run_one` now returns an `Option<usize>` instead of a boolean. This `usize` corresponds to the ID received from the previous APIs and can be used with an external tracking mechanism to know if a future is complete.
7//!
8//! ## Motivation
//! The existing `LocalPool` allowed you either to run all futures opaquely in a non-blocking way, or to block while running a single future to completion and retrieve its output. By providing tracking IDs, we can use an external lookup to infer which futures are finished and ask them for their results directly.
10//!
11//! ## Example
12//! ```rust
13//! use local_pool_with_id::SpawnWithIdExt;
14//! use futures::prelude::*;
15//!
16//! let mut spawned_ids = std::collections::HashSet::new();
17//! let mut pool = local_pool_with_id::LocalPool::new();
18//! let spawner = pool.spawner();
19//!
20//! let (id1, handle1) = spawner
21//! .spawn_with_handle(futures::future::ready(1i32))
22//! .unwrap();
23//! let (id2, handle2) = spawner
24//! .spawn_with_handle(futures::future::ready(2u32))
25//! .unwrap();
26//!
27//! spawned_ids.insert(id1);
28//! spawned_ids.insert(id2);
29//!
30//! while !spawned_ids.is_empty() {
31//! if let Some(completed) = pool.try_run_one() {
32//! assert!(spawned_ids.remove(&completed))
33//! }
34//! }
35//!
36//! assert_eq!(handle1.now_or_never().unwrap(), 1);
37//! assert_eq!(handle2.now_or_never().unwrap(), 2);
38//! ```
39
40use futures::executor::enter;
41use futures::future::{FutureObj, LocalFutureObj, RemoteHandle};
42use futures::prelude::*;
43use futures::stream::FuturesUnordered;
44use futures::task::{waker_ref, ArcWake, LocalSpawn, Spawn, SpawnError};
45use std::cell::RefCell;
46use std::pin::Pin;
47use std::rc::{Rc, Weak};
48use std::sync::atomic::{AtomicBool, Ordering};
49use std::sync::Arc;
50use std::task::{Context, Poll};
51use std::thread;
52use std::thread::Thread;
53
/// Pairs a value with the numeric ID it was spawned under.
///
/// While a task is running, `data` holds the future itself. When that future
/// resolves, the `Future` impl for this type yields a new wrapper whose `data`
/// is the output, carrying the same `index`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
struct IndexWrapper<T> {
    /// A future or a future's output.
    data: T,
    /// The ID that travels along with `data`.
    index: usize,
}
60
61impl<T> IndexWrapper<T> {
62 pin_utils::unsafe_pinned!(data: T);
63}
64
65impl<T> Future for IndexWrapper<T>
66where
67 T: Future,
68{
69 type Output = IndexWrapper<T::Output>;
70
71 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
72 self.as_mut()
73 .data()
74 .as_mut()
75 .poll(cx)
76 .map(|output| IndexWrapper {
77 data: output,
78 index: self.index,
79 })
80 }
81}
82
83/// A single-threaded task pool for polling futures to completion.
84///
85/// This executor allows you to multiplex any number of tasks onto a single
86/// thread. It's appropriate to poll strictly I/O-bound futures that do very
87/// little work in between I/O actions.
88///
89/// To get a handle to the pool that implements
90/// [`Spawn`](futures_task::Spawn), use the
91/// [`spawner()`](LocalPool::spawner) method. Because the executor is
92/// single-threaded, it supports a special form of task spawning for non-`Send`
93/// futures, via [`spawn_local_obj`](futures_task::LocalSpawn::spawn_local_obj).
94#[derive(Debug)]
95pub struct LocalPool {
96 pool: FuturesUnordered<IndexWrapper<LocalFutureObj<'static, ()>>>,
97 incoming: Rc<Incoming>,
98}
99
100/// A handle to a [`LocalPool`](LocalPool) that implements
101/// [`Spawn`](futures_task::Spawn).
102#[derive(Clone, Debug)]
103pub struct LocalSpawner {
104 incoming: Weak<Incoming>,
105}
106
107#[derive(Debug, Default)]
108struct IncomingTracking {
109 queue: Vec<(usize, LocalFutureObj<'static, ()>)>,
110 index: usize,
111}
112
113type Incoming = RefCell<IncomingTracking>;
114
/// Wake-up state for the thread that runs the executor.
pub(crate) struct ThreadNotify {
    /// The (single) executor thread.
    thread: Thread,
    /// Set when a wakeup has been requested and not yet consumed.
    ///
    /// This keeps a wakeup (i.e. an `unpark()`) from being "forgotten" before
    /// the next `park()`. The unpark token alone is not enough, because code
    /// run while polling the future(s) may make park / unpark calls of its
    /// own on the executing `thread`.
    unparked: AtomicBool,
}
125
126thread_local! {
127 static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
128 thread: thread::current(),
129 unparked: AtomicBool::new(false),
130 });
131}
132
133impl ArcWake for ThreadNotify {
134 fn wake_by_ref(arc_self: &Arc<Self>) {
135 // Make sure the wakeup is remembered until the next `park()`.
136 let unparked = arc_self.unparked.swap(true, Ordering::Relaxed);
137 if !unparked {
138 // If the thread has not been unparked yet, it must be done
139 // now. If it was actually parked, it will run again,
140 // otherwise the token made available by `unpark`
141 // may be consumed before reaching `park()`, but `unparked`
142 // ensures it is not forgotten.
143 arc_self.thread.unpark();
144 }
145 }
146}
147
148// Set up and run a basic single-threaded spawner loop, invoking `f` on each
149// turn.
150fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
151 let _enter = enter().expect(
152 "cannot execute `LocalPool` executor from within \
153 another executor",
154 );
155
156 CURRENT_THREAD_NOTIFY.with(|thread_notify| {
157 let waker = waker_ref(thread_notify);
158 let mut cx = Context::from_waker(&waker);
159 loop {
160 if let Poll::Ready(t) = f(&mut cx) {
161 return t;
162 }
163 // Consume the wakeup that occurred while executing `f`, if any.
164 let unparked = thread_notify.unparked.swap(false, Ordering::Acquire);
165 if !unparked {
166 // No wakeup occurred. It may occur now, right before parking,
167 // but in that case the token made available by `unpark()`
168 // is guaranteed to still be available and `park()` is a no-op.
169 thread::park();
170 // When the thread is unparked, `unparked` will have been set
171 // and needs to be unset before the next call to `f` to avoid
172 // a redundant loop iteration.
173 thread_notify.unparked.store(false, Ordering::Release);
174 }
175 }
176 })
177}
178
179fn poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T {
180 let _enter = enter().expect(
181 "cannot execute `LocalPool` executor from within \
182 another executor",
183 );
184
185 CURRENT_THREAD_NOTIFY.with(|thread_notify| {
186 let waker = waker_ref(thread_notify);
187 let mut cx = Context::from_waker(&waker);
188 f(&mut cx)
189 })
190}
191
192impl LocalPool {
193 /// Create a new, empty pool of tasks.
194 pub fn new() -> LocalPool {
195 LocalPool {
196 pool: FuturesUnordered::new(),
197 incoming: Default::default(),
198 }
199 }
200
201 /// Get a clonable handle to the pool as a [`Spawn`].
202 pub fn spawner(&self) -> LocalSpawner {
203 LocalSpawner {
204 incoming: Rc::downgrade(&self.incoming),
205 }
206 }
207
208 /// Run all tasks in the pool to completion.
209 ///
210 /// ```
211 /// use local_pool_with_id::LocalPool;
212 ///
213 /// let mut pool = LocalPool::new();
214 ///
215 /// // ... spawn some initial tasks using `spawn.spawn()` or `spawn.spawn_local()`
216 ///
217 /// // run *all* tasks in the pool to completion, including any newly-spawned ones.
218 /// pool.run();
219 /// ```
220 ///
221 /// The function will block the calling thread until *all* tasks in the pool
222 /// are complete, including any spawned while running existing tasks.
223 pub fn run(&mut self) {
224 run_executor(|cx| self.poll_pool(cx))
225 }
226
227 /// Runs all the tasks in the pool until the given future completes.
228 ///
229 /// ```
230 /// use local_pool_with_id::LocalPool;
231 ///
232 /// let mut pool = LocalPool::new();
233 /// # let my_app = async {};
234 ///
235 /// // run tasks in the pool until `my_app` completes
236 /// pool.run_until(my_app);
237 /// ```
238 ///
239 /// The function will block the calling thread *only* until the future `f`
240 /// completes; there may still be incomplete tasks in the pool, which will
241 /// be inert after the call completes, but can continue with further use of
242 /// one of the pool's run or poll methods. While the function is running,
243 /// however, all tasks in the pool will try to make progress.
244 pub fn run_until<F: Future>(&mut self, future: F) -> F::Output {
245 pin_utils::pin_mut!(future);
246
247 run_executor(|cx| {
248 {
249 // if our main task is done, so are we
250 let result = future.as_mut().poll(cx);
251 if let Poll::Ready(output) = result {
252 return Poll::Ready(output);
253 }
254 }
255
256 let _ = self.poll_pool(cx);
257 Poll::Pending
258 })
259 }
260
261 /// Runs all tasks and returns after completing one future or until no more progress
262 /// can be made. Returns the associated ID if one future was completed, `None` otherwise.
263 ///
264 /// ```
265 /// use local_pool_with_id::LocalPool;
266 /// use futures::task::LocalSpawnExt;
267 /// use futures::future::{ready, pending};
268 ///
269 /// let mut pool = LocalPool::new();
270 /// let spawner = pool.spawner();
271 ///
272 /// spawner.spawn_local(ready(())).unwrap();
273 /// spawner.spawn_local(ready(())).unwrap();
274 /// spawner.spawn_local(pending()).unwrap();
275 ///
276 /// // Run the two ready tasks and returns the IDs for them.
277 /// assert!(pool.try_run_one().is_some());
278 /// assert!(pool.try_run_one().is_some());
279 ///
280 /// // the remaining task can not be completed
281 /// assert!(pool.try_run_one().is_none()); // returns false
282 /// ```
283 ///
284 /// This function will not block the calling thread and will return the moment
285 /// that there are no tasks left for which progress can be made or after exactly one
286 /// task was completed; Remaining incomplete tasks in the pool can continue with
287 /// further use of one of the pool's run or poll methods.
288 /// Though only one task will be completed, progress may be made on multiple tasks.
289 pub fn try_run_one(&mut self) -> Option<usize> {
290 poll_executor(|ctx| {
291 loop {
292 let ret = self.poll_pool_once(ctx);
293
294 // return if we have executed a future
295 if let Poll::Ready(Some(key)) = ret {
296 return Some(key);
297 }
298
299 // if there are no new incoming futures
300 // then there is no feature that can make progress
301 // and we can return without having completed a single future
302 if self.incoming.borrow().queue.is_empty() {
303 return None;
304 }
305 }
306 })
307 }
308
309 /// Runs all tasks in the pool and returns if no more progress can be made
310 /// on any task.
311 ///
312 /// ```
313 /// use local_pool_with_id::LocalPool;
314 /// use futures::task::LocalSpawnExt;
315 /// use futures::future::{ready, pending};
316 ///
317 /// let mut pool = LocalPool::new();
318 /// let spawner = pool.spawner();
319 ///
320 /// spawner.spawn_local(ready(())).unwrap();
321 /// spawner.spawn_local(ready(())).unwrap();
322 /// spawner.spawn_local(pending()).unwrap();
323 ///
324 /// // Runs the two ready task and returns.
325 /// // The empty task remains in the pool.
326 /// pool.run_until_stalled();
327 /// ```
328 ///
329 /// This function will not block the calling thread and will return the moment
330 /// that there are no tasks left for which progress can be made;
331 /// remaining incomplete tasks in the pool can continue with further use of one
332 /// of the pool's run or poll methods. While the function is running, all tasks
333 /// in the pool will try to make progress.
334 pub fn run_until_stalled(&mut self) {
335 poll_executor(|ctx| {
336 let _ = self.poll_pool(ctx);
337 });
338 }
339
340 // Make maximal progress on the entire pool of spawned task, returning `Ready`
341 // if the pool is empty and `Pending` if no further progress can be made.
342 fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
343 // state for the FuturesUnordered, which will never be used
344 loop {
345 let ret = self.poll_pool_once(cx);
346
347 // we queued up some new tasks; add them and poll again
348 if !self.incoming.borrow().queue.is_empty() {
349 continue;
350 }
351
352 // no queued tasks; we may be done
353 match ret {
354 Poll::Pending => return Poll::Pending,
355 Poll::Ready(None) => return Poll::Ready(()),
356 _ => {}
357 }
358 }
359 }
360
361 // Try make minimal progress on the pool of spawned tasks
362 fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll<Option<usize>> {
363 // empty the incoming queue of newly-spawned tasks
364 {
365 let mut incoming = self.incoming.borrow_mut();
366 for (key, task) in incoming.queue.drain(..) {
367 self.pool.push(IndexWrapper {
368 data: task,
369 index: key,
370 })
371 }
372 }
373
374 // try to execute the next ready future
375 self.pool
376 .poll_next_unpin(cx)
377 .map(|poll| poll.map(|wrapper| wrapper.index))
378 }
379}
380
381impl Default for LocalPool {
382 fn default() -> Self {
383 Self::new()
384 }
385}
386
387impl Spawn for LocalSpawner {
388 fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
389 self.spawn_obj_with_id(future).map(|_| ())
390 }
391
392 fn status(&self) -> Result<(), SpawnError> {
393 if self.incoming.upgrade().is_some() {
394 Ok(())
395 } else {
396 Err(SpawnError::shutdown())
397 }
398 }
399}
400
401impl LocalSpawn for LocalSpawner {
402 fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
403 self.spawn_local_obj_with_id(future).map(|_| ())
404 }
405
406 fn status_local(&self) -> Result<(), SpawnError> {
407 if self.incoming.upgrade().is_some() {
408 Ok(())
409 } else {
410 Err(SpawnError::shutdown())
411 }
412 }
413}
414
415impl SpawnWithId for LocalSpawner {
416 fn spawn_obj_with_id(&self, future: FutureObj<'static, ()>) -> Result<usize, SpawnError> {
417 if let Some(incoming) = self.incoming.upgrade() {
418 let mut incoming = incoming.borrow_mut();
419 let id = incoming.index;
420 incoming.index += 1;
421 incoming.queue.push((id, future.into()));
422 Ok(id)
423 } else {
424 Err(SpawnError::shutdown())
425 }
426 }
427}
428
429impl LocalSpawnWithId for LocalSpawner {
430 fn spawn_local_obj_with_id(
431 &self,
432 future: LocalFutureObj<'static, ()>,
433 ) -> Result<usize, SpawnError> {
434 if let Some(incoming) = self.incoming.upgrade() {
435 let mut incoming = incoming.borrow_mut();
436 let id = incoming.index;
437 incoming.index += 1;
438 incoming.queue.push((id, future));
439 Ok(id)
440 } else {
441 Err(SpawnError::shutdown())
442 }
443 }
444}
445
446pub trait SpawnWithId {
447 fn spawn_obj_with_id(&self, future: FutureObj<'static, ()>) -> Result<usize, SpawnError>;
448}
449
450pub trait LocalSpawnWithId {
451 fn spawn_local_obj_with_id(
452 &self,
453 future: LocalFutureObj<'static, ()>,
454 ) -> Result<usize, SpawnError>;
455}
456
457impl<Sp: ?Sized> SpawnWithIdExt for Sp where Sp: SpawnWithId {}
458impl<Sp: ?Sized> LocalSpawnWithIdExt for Sp where Sp: LocalSpawnWithId {}
459
460/// Extension trait for `Spawn`.
461pub trait SpawnWithIdExt: SpawnWithId {
462 /// Spawns a task that polls the given future with output `()` to
463 /// completion.
464 ///
465 /// This method returns a [`Result`] that contains a [`SpawnError`] if
466 /// spawning fails.
467 ///
468 /// You can use [`spawn_with_handle`](SpawnWithIdExt::spawn_with_handle) if
469 /// you want to spawn a future with output other than `()` or if you want
470 /// to be able to await its completion.
471 ///
472 /// Note this method will eventually be replaced with the upcoming
473 /// `Spawn::spawn` method which will take a `dyn Future` as input.
474 /// Technical limitations prevent `Spawn::spawn` from being implemented
475 /// today. Feel free to use this method in the meantime.
476 ///
477 /// ```
478 /// use local_pool_with_id::LocalPool;
479 /// use local_pool_with_id::SpawnWithIdExt;
480 ///
481 /// let executor = LocalPool::new();
482 /// let spawner = executor.spawner();
483 ///
484 /// let future = async { /* ... */ };
485 /// spawner.spawn(future).unwrap();
486 /// ```
487 fn spawn<Fut>(&self, future: Fut) -> Result<usize, SpawnError>
488 where
489 Fut: Future<Output = ()> + Send + 'static,
490 {
491 self.spawn_obj_with_id(FutureObj::new(Box::new(future)))
492 }
493
494 /// Spawns a task that polls the given future to completion and returns a
495 /// future that resolves to the spawned future's output.
496 ///
497 /// This method returns a [`Result`] that contains a [`RemoteHandle`](futures::future::RemoteHandle), or, if
498 /// spawning fails, a [`SpawnError`]. [`RemoteHandle`](futures::future::RemoteHandle) is a future that
499 /// resolves to the output of the spawned future.
500 ///
501 /// ```
502 /// use futures::executor::block_on;
503 /// use futures::future;
504 /// use local_pool_with_id::LocalPool;
505 /// use local_pool_with_id::SpawnWithIdExt;
506 ///
507 /// let mut executor = LocalPool::new();
508 /// let spawner = executor.spawner();
509 ///
510 /// let future = future::ready(1);
511 /// let (id, join_handle_fut) = spawner.spawn_with_handle(future).unwrap();
512 /// assert_eq!(executor.run_until(join_handle_fut), 1);
513 /// ```
514 fn spawn_with_handle<Fut>(
515 &self,
516 future: Fut,
517 ) -> Result<(usize, RemoteHandle<Fut::Output>), SpawnError>
518 where
519 Fut: Future + Send + 'static,
520 Fut::Output: Send,
521 {
522 let (future, handle) = future.remote_handle();
523 let id = self.spawn(future)?;
524 Ok((id, handle))
525 }
526}
527
528/// Extension trait for `LocalSpawn`.
529pub trait LocalSpawnWithIdExt: LocalSpawnWithId {
530 /// Spawns a task that polls the given future with output `()` to
531 /// completion.
532 ///
533 /// This method returns a [`Result`] that contains a [`SpawnError`] if
534 /// spawning fails.
535 ///
536 /// You can use [`spawn_with_handle`](SpawnWithIdExt::spawn_with_handle) if
537 /// you want to spawn a future with output other than `()` or if you want
538 /// to be able to await its completion.
539 ///
540 /// Note this method will eventually be replaced with the upcoming
541 /// `Spawn::spawn` method which will take a `dyn Future` as input.
542 /// Technical limitations prevent `Spawn::spawn` from being implemented
543 /// today. Feel free to use this method in the meantime.
544 ///
545 /// ```
546 /// use local_pool_with_id::LocalPool;
547 /// use local_pool_with_id::LocalSpawnWithIdExt;
548 ///
549 /// let executor = LocalPool::new();
550 /// let spawner = executor.spawner();
551 ///
552 /// let future = async { /* ... */ };
553 /// spawner.spawn_local(future).unwrap();
554 /// ```
555 fn spawn_local<Fut>(&self, future: Fut) -> Result<usize, SpawnError>
556 where
557 Fut: Future<Output = ()> + 'static,
558 {
559 self.spawn_local_obj_with_id(LocalFutureObj::new(Box::new(future)))
560 }
561
562 /// Spawns a task that polls the given future to completion and returns a
563 /// future that resolves to the spawned future's output.
564 ///
565 /// This method returns a [`Result`] that contains a [`RemoteHandle`](futures::future::RemoteHandle), or, if
566 /// spawning fails, a [`SpawnError`]. [`RemoteHandle`](futures::future::RemoteHandle) is a future that
567 /// resolves to the output of the spawned future.
568 ///
569 /// ```
570 /// use local_pool_with_id::LocalPool;
571 /// use local_pool_with_id::LocalSpawnWithIdExt;
572 ///
573 /// let mut executor = LocalPool::new();
574 /// let spawner = executor.spawner();
575 ///
576 /// let future = async { 1 };
577 /// let (id, join_handle_fut) = spawner.spawn_local_with_handle(future).unwrap();
578 /// assert_eq!(executor.run_until(join_handle_fut), 1);
579 /// ```
580 fn spawn_local_with_handle<Fut>(
581 &self,
582 future: Fut,
583 ) -> Result<(usize, RemoteHandle<Fut::Output>), SpawnError>
584 where
585 Fut: Future + 'static,
586 {
587 let (future, handle) = future.remote_handle();
588 let id = self.spawn_local(future)?;
589 Ok((id, handle))
590 }
591}
592
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// Every ID handed out at spawn time is reported exactly once by
    /// `try_run_one`, after which the handles hold the futures' outputs.
    #[test]
    fn tracking() {
        let mut pool = LocalPool::new();
        let spawner = pool.spawner();

        let (id1, handle1) = spawner
            .spawn_with_handle(futures::future::ready(1i32))
            .unwrap();
        let (id2, handle2) = spawner
            .spawn_with_handle(futures::future::ready(2u32))
            .unwrap();

        let mut outstanding: HashSet<usize> = HashSet::new();
        outstanding.insert(id1);
        outstanding.insert(id2);

        // Drive the pool one completion at a time until every ID is seen.
        while !outstanding.is_empty() {
            if let Some(done) = pool.try_run_one() {
                assert!(outstanding.remove(&done))
            }
        }

        assert_eq!(handle1.now_or_never().unwrap(), 1);
        assert_eq!(handle2.now_or_never().unwrap(), 2);
    }
}