1use alloc::{sync::Arc, task::Wake, vec::Vec};
2use core::{cell::Cell, fmt, future::Future};
3
4use crate::prelude::*;
5
6#[doc = include_str!("../examples/executor.rs")]
13#[doc = include_str!("../examples/spawn.rs")]
20#[doc = include_str!("../examples/recursive.rs")]
27#[doc = include_str!("../examples/resume.rs")]
32pub struct Executor<P: Pool = DefaultPool>(Arc<P>);
34
35impl Default for Executor {
36 fn default() -> Self {
37 Self::new(DefaultPool::default())
38 }
39}
40
41impl<P: Pool> Clone for Executor<P> {
42 fn clone(&self) -> Self {
43 Self(Arc::clone(&self.0))
44 }
45}
46
47impl<P: Pool + fmt::Debug> fmt::Debug for Executor<P> {
48 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49 f.debug_tuple("Executor").field(&self.0).finish()
50 }
51}
52
53impl<P: Pool> Executor<P> {
54 #[inline(always)]
58 pub fn new(pool: P) -> Self {
59 Self(Arc::new(pool))
60 }
61
62 #[inline(always)]
68 pub fn block_on(self, f: impl Future<Output = ()> + 'static) {
69 #[cfg(feature = "web")]
70 wasm_bindgen_futures::spawn_local(f);
71
72 #[cfg(not(feature = "web"))]
73 block_on(f, &self.0);
74 }
75}
76
77impl<P: Pool> Executor<P> {
78 #[inline(always)]
83 pub fn spawn_notify(&self, n: LocalBoxNotify<'static>) {
84 #[cfg(feature = "web")]
86 wasm_bindgen_futures::spawn_local(async move {
87 let mut n = n;
88
89 n.next().await;
90 });
91
92 #[cfg(not(feature = "web"))]
94 self.0.push(n);
95 }
96
97 #[inline(always)]
99 pub fn spawn_boxed(&self, f: impl Future<Output = ()> + 'static) {
100 #[cfg(feature = "web")]
102 wasm_bindgen_futures::spawn_local(f);
103
104 #[cfg(not(feature = "web"))]
106 self.spawn_notify(Box::pin(f.fuse()));
107 }
108}
109
110#[doc = include_str!("../examples/pool.rs")]
118pub trait Pool {
120 type Park: Park;
122
123 fn push(&self, task: LocalBoxNotify<'static>);
125
126 fn drain(&self, tasks: &mut Vec<LocalBoxNotify<'static>>) -> bool;
129}
130
/// Strategy for putting the executor's thread to sleep and waking it again.
///
/// Implementors must be default-constructible and shareable across threads
/// (`Send + Sync + 'static`), because the run loop builds one with
/// `Default::default()` and wraps it in an `Arc` that is turned into a waker.
pub trait Park: Default + Send + Sync + 'static {
    /// Called by the run loop when it has nothing to do; should return once
    /// a wake-up has been requested through [`Park::unpark`].
    fn park(&self);

    /// Requests that a current or future [`Park::park`] call returns.
    fn unpark(&self);
}
140
141#[derive(Default)]
142pub struct DefaultPool {
143 spawning_queue: Cell<Vec<LocalBoxNotify<'static>>>,
144}
145
146impl fmt::Debug for DefaultPool {
147 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
148 let queue = self.spawning_queue.take();
149
150 f.debug_struct("DefaultPool")
151 .field("spawning_queue", &queue)
152 .finish()?;
153 self.spawning_queue.set(queue);
154
155 Ok(())
156 }
157}
158
159impl Pool for DefaultPool {
160 type Park = DefaultPark;
161
162 #[inline(always)]
164 fn push(&self, task: LocalBoxNotify<'static>) {
165 let mut queue = self.spawning_queue.take();
166
167 queue.push(task);
168 self.spawning_queue.set(queue);
169 }
170
171 #[inline(always)]
173 fn drain(&self, tasks: &mut Vec<LocalBoxNotify<'static>>) -> bool {
174 let mut queue = self.spawning_queue.take();
175 let mut drained = queue.drain(..).peekable();
176 let has_drained = drained.peek().is_some();
177
178 tasks.extend(drained);
179 self.spawning_queue.set(queue);
180
181 has_drained
182 }
183}
184
/// Parking implementation used when the `std` feature is disabled.
///
/// It carries no state; it exists so the `Park` implementation has a type to
/// attach to when no thread support is available.
#[cfg(not(feature = "std"))]
#[derive(Clone, Copy, Debug, Default)]
pub struct DefaultPark;
188
/// Parking implementation used when the `std` feature is enabled.
///
/// - Field `0` is the wake-up flag: `true` (the initial value) means no
///   wake-up is pending, `false` means `unpark` was requested.
/// - Field `1` is the handle of the thread that constructed this value; it is
///   the thread `unpark` wakes.
#[cfg(feature = "std")]
#[derive(Debug)]
pub struct DefaultPark(std::sync::atomic::AtomicBool, std::thread::Thread);
192
#[cfg(feature = "std")]
impl Default for DefaultPark {
    /// Creates a park with no wake-up pending, bound to the calling thread.
    fn default() -> Self {
        // `true` is the "nothing pending" state that `park` waits in.
        let no_wake_pending = std::sync::atomic::AtomicBool::new(true);
        let owner = std::thread::current();

        Self(no_wake_pending, owner)
    }
}
202
203impl Park for DefaultPark {
204 #[inline(always)]
206 fn park(&self) {
207 #[cfg(feature = "std")]
209 while self.0.swap(true, std::sync::atomic::Ordering::SeqCst) {
210 std::thread::park();
211 }
212
213 #[cfg(not(feature = "std"))]
215 core::hint::spin_loop();
216 }
217
218 #[inline(always)]
220 fn unpark(&self) {
221 #[cfg(feature = "std")]
223 if self.0.swap(false, std::sync::atomic::Ordering::SeqCst) {
224 self.1.unpark();
225 }
226 }
227}
228
229struct Unpark<P: Park>(P);
230
231impl<P: Park> Wake for Unpark<P> {
232 #[inline(always)]
233 fn wake(self: Arc<Self>) {
234 self.0.unpark();
235 }
236
237 #[inline(always)]
238 fn wake_by_ref(self: &Arc<Self>) {
239 self.0.unpark();
240 }
241}
242
243#[cfg(not(feature = "web"))]
244fn block_on<P: Pool>(f: impl Future<Output = ()> + 'static, pool: &Arc<P>) {
245 let f: LocalBoxNotify<'_> = Box::pin(f.fuse());
247
248 let tasks = &mut Vec::new();
250
251 let parky = Arc::new(Unpark(<P as Pool>::Park::default()));
253 let waker = parky.clone().into();
254 let tasky = &mut Task::from_waker(&waker);
255
256 tasks.push(f);
258
259 while !tasks.is_empty() {
261 let poll = Pin::new(tasks.as_mut_slice()).poll_next(tasky);
263 let Ready((task_index, ())) = poll else {
265 if !pool.drain(tasks) {
267 parky.0.park();
268 }
269 continue;
270 };
271
272 tasks.swap_remove(task_index);
274 pool.drain(tasks);
276 }
277}