fumio_pool/
pool.rs

1
2mod task;
3
4use fumio_utils::park::Park;
5use futures_core::future::{Future, FutureObj, LocalFutureObj};
6use futures_core::task::{Spawn, LocalSpawn, SpawnError};
7use futures_executor::Enter;
8use futures_util::pin_mut;
9use std::rc::{Rc, Weak};
10use std::task::{Context, Poll};
11
12// Set up and run a basic single-threaded spawner loop, invoking `f` on each
13// turn.
14fn run_executor<P: Park, T, F: FnMut(&mut Context<'_>) -> Poll<T>>(park: &mut P, enter: &mut Enter, mut f: F) -> T {
15	let waker = park.waker();
16	let mut cx = Context::from_waker(&waker);
17
18	loop {
19		if let Poll::Ready(t) = f(&mut cx) {
20			return t;
21		}
22		park.park(enter, None);
23	}
24}
25
26/// A single-threaded task pool for polling futures to completion.
27///
28/// This executor allows you to multiplex any number of tasks onto a single
29/// thread. It's appropriate to poll strictly I/O-bound futures that do very
30/// little work in between I/O actions.
31///
32/// To get a handle to the pool that implements
33/// [`Spawn`](futures_core::task::Spawn), use the
34/// [`spawner()`](LocalPool::spawner) method. Because the executor is
35/// single-threaded, it supports a special form of task spawning for non-`Send`
36/// futures, via [`spawn_local_obj`](futures_core::task::LocalSpawn::spawn_local_obj).
37#[derive(Debug)]
38pub struct LocalPool {
39	task_list: Rc<task::LocalTaskList>,
40}
41
42impl LocalPool {
43	/// Create a new, empty pool of tasks.
44	pub fn new() -> Self {
45		Self {
46			task_list: Rc::new(task::LocalTaskList::new()),
47		}
48	}
49
50	/// Get a clonable handle to the pool as a [`Spawn`].
51	pub fn spawner(&self) -> LocalSpawner {
52		LocalSpawner {
53			task_list: Rc::downgrade(&self.task_list)
54		}
55	}
56
57	/// Run all tasks in the pool to completion.
58	///
59	/// The function will block the calling thread until *all* tasks in the pool
60	/// completed, including any spawned while running existing tasks.
61	pub fn run<P: Park>(&mut self, park: &mut P, enter: &mut Enter) {
62		run_executor(park, enter, |cx| self.poll_pool(cx))
63	}
64
65	/// Runs all the tasks in the pool until the given future completes.
66	///
67	/// The given spawner, `spawn`, is used as the default spawner for any
68	/// *newly*-spawned tasks. You can route these additional tasks back into
69	/// the `LocalPool` by using its spawner handle:
70	///
71	/// The function will block the calling thread *only* until the future `f`
72	/// completes; there may still be incomplete tasks in the pool, which will
73	/// be inert after the call completes, but can continue with further use of
74	/// one of the pool's run or poll methods. While the function is running,
75	/// however, all tasks in the pool will try to make progress.
76	pub fn run_until<P: Park, F: Future>(&mut self, park: &mut P, enter: &mut Enter, future: F) -> F::Output {
77		pin_mut!(future);
78
79		run_executor(park, enter, |cx| {
80			{
81				// if our main task is done, so are we
82				let result = future.as_mut().poll(cx);
83				if let Poll::Ready(output) = result {
84					return Poll::Ready(output);
85				}
86			}
87
88			let _ = self.poll_pool(cx);
89			Poll::Pending
90		})
91	}
92
93	/// Make progress on entire pool, polling each spawend task at most once.
94	///
95	/// Becomes `Ready` when all tasks are completed.
96	pub fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
97		self.task_list.poll(cx)
98	}
99
100	/// Spawn future on pool
101	pub fn spawn(&self, future: LocalFutureObj<'static, ()>) {
102		self.task_list.add_task(future);
103	}
104}
105
106impl Default for LocalPool {
107	fn default() -> Self {
108		Self::new()
109	}
110}
111
112impl Spawn for LocalPool {
113	fn spawn_obj(
114		&mut self,
115		future: FutureObj<'static, ()>,
116	) -> Result<(), SpawnError> {
117		self.spawn_local_obj(future.into())
118	}
119
120	fn status(&self) -> Result<(), SpawnError> {
121		self.status_local()
122	}
123}
124
125impl LocalSpawn for LocalPool {
126	fn spawn_local_obj(
127		&mut self,
128		future: LocalFutureObj<'static, ()>,
129	) -> Result<(), SpawnError> {
130		self.spawn(future);
131		Ok(())
132	}
133
134	fn status_local(&self) -> Result<(), SpawnError> {
135		Ok(())
136	}
137}
138
139/// A handle to a [`LocalPool`](LocalPool) that implements [`Spawn`](futures_core::task::Spawn) and
140/// [`LocalSpawn`](futures_core::task::LocalSpawn).
141#[derive(Clone, Debug)]
142pub struct LocalSpawner {
143	task_list: Weak<task::LocalTaskList>,
144}
145
146impl LocalSpawner {
147	/// Enter a spawner.
148	///
149	/// Clears the current spawner when `f` returns.
150	///
151	/// [`current_local`](fn.current_local.html) needs this to work.
152	///
153	/// A runtime (combining reactor, pool, ...) should enter a spawner handle (in
154	/// each thread it runs tasks from the pool) so all tasks have access to the
155	/// spawner.
156	///
157	/// # Panics
158	///
159	/// Panics if a spawner is already entered.
160	pub fn enter<F, T>(self, enter: &mut Enter, f: F) -> T
161	where
162		F: FnOnce(&mut Enter) -> T
163	{
164		crate::current::enter_local(self, enter, f)
165	}
166}
167
168impl Spawn for LocalSpawner {
169	fn spawn_obj(
170		&mut self,
171		future: FutureObj<'static, ()>,
172	) -> Result<(), SpawnError> {
173		self.spawn_local_obj(future.into())
174	}
175
176	fn status(&self) -> Result<(), SpawnError> {
177		self.status_local()
178	}
179}
180
181impl LocalSpawn for LocalSpawner {
182	fn spawn_local_obj(
183		&mut self,
184		future: LocalFutureObj<'static, ()>,
185	) -> Result<(), SpawnError> {
186		if let Some(task_list) = self.task_list.upgrade() {
187			task_list.add_task(future);
188			Ok(())
189		} else {
190			Err(SpawnError::shutdown())
191		}
192	}
193
194	fn status_local(&self) -> Result<(), SpawnError> {
195		if self.task_list.upgrade().is_some() {
196			Ok(())
197		} else {
198			Err(SpawnError::shutdown())
199		}
200	}
201}