pasts/
spawn.rs

1use alloc::{sync::Arc, task::Wake, vec::Vec};
2use core::{cell::Cell, fmt, future::Future};
3
4use crate::prelude::*;
5
6/// Pasts' executor.
7///
8/// # Run a Future
9/// It's relatively simple to block on a future, and run it to completion:
10///
11/// ```rust
12#[doc = include_str!("../examples/executor.rs")]
13/// ```
14/// 
15/// # Spawn a Future
16/// You may spawn tasks on an `Executor`.  Only once all tasks have completed,
17/// can [`block_on()`](Executor::block_on()) return.
18/// ```rust,no_run
19#[doc = include_str!("../examples/spawn.rs")]
20/// ```
21/// 
22/// # Recursive `block_on()`
23/// One cool feature about the pasts executor is that you can run it from within
24/// the context of another:
25/// ```rust
26#[doc = include_str!("../examples/recursive.rs")]
27/// ```
28/// 
29/// Or even resume the executor from within it's own context:
30/// ```rust
31#[doc = include_str!("../examples/resume.rs")]
32/// ```
33pub struct Executor<P: Pool = DefaultPool>(Arc<P>);
34
35impl Default for Executor {
36    fn default() -> Self {
37        Self::new(DefaultPool::default())
38    }
39}
40
41impl<P: Pool> Clone for Executor<P> {
42    fn clone(&self) -> Self {
43        Self(Arc::clone(&self.0))
44    }
45}
46
47impl<P: Pool + fmt::Debug> fmt::Debug for Executor<P> {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        f.debug_tuple("Executor").field(&self.0).finish()
50    }
51}
52
53impl<P: Pool> Executor<P> {
54    /// Create a new executor that can only spawn tasks from the current thread.
55    ///
56    /// Custom executors can be built by implementing [`Pool`].
57    #[inline(always)]
58    pub fn new(pool: P) -> Self {
59        Self(Arc::new(pool))
60    }
61
62    /// Block on a future and return it's result.
63    ///
64    /// # Platform-Specific Behavior
65    /// When building with feature _`web`_, spawns task and returns
66    /// immediately instead of blocking.
67    #[inline(always)]
68    pub fn block_on(self, f: impl Future<Output = ()> + 'static) {
69        #[cfg(feature = "web")]
70        wasm_bindgen_futures::spawn_local(f);
71
72        #[cfg(not(feature = "web"))]
73        block_on(f, &self.0);
74    }
75}
76
77impl<P: Pool> Executor<P> {
78    /// Spawn a [`LocalBoxNotify`] on this executor.
79    ///
80    /// Execution of the [`LocalBoxNotify`] will halt after the first poll that
81    /// returns [`Ready`].
82    #[inline(always)]
83    pub fn spawn_notify(&self, n: LocalBoxNotify<'static>) {
84        // Convert the notify into a future and spawn on wasm_bindgen_futures
85        #[cfg(feature = "web")]
86        wasm_bindgen_futures::spawn_local(async move {
87            let mut n = n;
88
89            n.next().await;
90        });
91
92        // Push the notify onto the pool.
93        #[cfg(not(feature = "web"))]
94        self.0.push(n);
95    }
96
97    /// Box and spawn a future on this executor.
98    #[inline(always)]
99    pub fn spawn_boxed(&self, f: impl Future<Output = ()> + 'static) {
100        // Spawn the future on wasm_bindgen_futures
101        #[cfg(feature = "web")]
102        wasm_bindgen_futures::spawn_local(f);
103
104        // Fuse the future, box it, and push it onto the pool.
105        #[cfg(not(feature = "web"))]
106        self.spawn_notify(Box::pin(f.fuse()));
107    }
108}
109
110/// Storage for a task pool.
111///
112/// # Implementing `Pool` For A Custom Executor
113/// This example shows how to create a custom single-threaded executor using
114/// [`Executor::new()`].
115///
116/// ```rust
117#[doc = include_str!("../examples/pool.rs")]
118/// ```
119pub trait Pool {
120    /// Type that handles the sleeping / waking of the executor.
121    type Park: Park;
122
123    /// Push a task into the thread pool queue.
124    fn push(&self, task: LocalBoxNotify<'static>);
125
126    /// Drain tasks from the thread pool queue.  Should returns true if drained
127    /// at least one task.
128    fn drain(&self, tasks: &mut Vec<LocalBoxNotify<'static>>) -> bool;
129}
130
131/// Trait for implementing the parking / unparking threads.
132pub trait Park: Default + Send + Sync + 'static {
133    /// The park routine; should put the processor or thread to sleep in order
134    /// to save CPU cycles and power, until the hardware tells it to wake up.
135    fn park(&self);
136
137    /// Wake the processor or thread.
138    fn unpark(&self);
139}
140
141#[derive(Default)]
142pub struct DefaultPool {
143    spawning_queue: Cell<Vec<LocalBoxNotify<'static>>>,
144}
145
146impl fmt::Debug for DefaultPool {
147    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
148        let queue = self.spawning_queue.take();
149
150        f.debug_struct("DefaultPool")
151            .field("spawning_queue", &queue)
152            .finish()?;
153        self.spawning_queue.set(queue);
154
155        Ok(())
156    }
157}
158
159impl Pool for DefaultPool {
160    type Park = DefaultPark;
161
162    // Push onto queue of tasks to spawn.
163    #[inline(always)]
164    fn push(&self, task: LocalBoxNotify<'static>) {
165        let mut queue = self.spawning_queue.take();
166
167        queue.push(task);
168        self.spawning_queue.set(queue);
169    }
170
171    // Drain from queue of tasks to spawn.
172    #[inline(always)]
173    fn drain(&self, tasks: &mut Vec<LocalBoxNotify<'static>>) -> bool {
174        let mut queue = self.spawning_queue.take();
175        let mut drained = queue.drain(..).peekable();
176        let has_drained = drained.peek().is_some();
177
178        tasks.extend(drained);
179        self.spawning_queue.set(queue);
180
181        has_drained
182    }
183}
184
185#[cfg(not(feature = "std"))]
186#[derive(Copy, Clone, Debug, Default)]
187pub struct DefaultPark;
188
189#[cfg(feature = "std")]
190#[derive(Debug)]
191pub struct DefaultPark(std::sync::atomic::AtomicBool, std::thread::Thread);
192
193#[cfg(feature = "std")]
194impl Default for DefaultPark {
195    fn default() -> Self {
196        Self(
197            std::sync::atomic::AtomicBool::new(true),
198            std::thread::current(),
199        )
200    }
201}
202
203impl Park for DefaultPark {
204    // Park the current thread.
205    #[inline(always)]
206    fn park(&self) {
207        // Only park on std; There is no portable parking for no-std.
208        #[cfg(feature = "std")]
209        while self.0.swap(true, std::sync::atomic::Ordering::SeqCst) {
210            std::thread::park();
211        }
212
213        // Hint at spin loop to possibly short sleep on no-std to save CPU time.
214        #[cfg(not(feature = "std"))]
215        core::hint::spin_loop();
216    }
217
218    // Unpark the parked thread
219    #[inline(always)]
220    fn unpark(&self) {
221        // Only unpark on std; Since no-std doesn't park, it's already unparked.
222        #[cfg(feature = "std")]
223        if self.0.swap(false, std::sync::atomic::Ordering::SeqCst) {
224            self.1.unpark();
225        }
226    }
227}
228
229struct Unpark<P: Park>(P);
230
231impl<P: Park> Wake for Unpark<P> {
232    #[inline(always)]
233    fn wake(self: Arc<Self>) {
234        self.0.unpark();
235    }
236
237    #[inline(always)]
238    fn wake_by_ref(self: &Arc<Self>) {
239        self.0.unpark();
240    }
241}
242
243#[cfg(not(feature = "web"))]
244fn block_on<P: Pool>(f: impl Future<Output = ()> + 'static, pool: &Arc<P>) {
245    // Fuse main task
246    let f: LocalBoxNotify<'_> = Box::pin(f.fuse());
247
248    // Set up the notify
249    let tasks = &mut Vec::new();
250
251    // Set up the park, waker, and context.
252    let parky = Arc::new(Unpark(<P as Pool>::Park::default()));
253    let waker = parky.clone().into();
254    let tasky = &mut Task::from_waker(&waker);
255
256    // Spawn main task
257    tasks.push(f);
258
259    // Run the set of futures to completion.
260    while !tasks.is_empty() {
261        // Poll the set of futures
262        let poll = Pin::new(tasks.as_mut_slice()).poll_next(tasky);
263        // If no tasks have completed, then park
264        let Ready((task_index, ())) = poll else {
265            // Initiate execution of any spawned tasks - if no new tasks, park
266            if !pool.drain(tasks) {
267                parky.0.park();
268            }
269            continue;
270        };
271
272        // Task has completed
273        tasks.swap_remove(task_index);
274        // Drain any spawned tasks into the pool
275        pool.drain(tasks);
276    }
277}