edge_executor/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2
3#[cfg(all(feature = "heapless", feature = "unbounded"))]
4compile_error!("Feature `heapless` is not compatible with feature `unbounded`.");
5
6use core::future::{poll_fn, Future};
7use core::marker::PhantomData;
8use core::task::{Context, Poll};
9
10extern crate alloc;
11
12use alloc::rc::Rc;
13
14use async_task::Runnable;
15
16pub use async_task::{FallibleTask, Task};
17
18use atomic_waker::AtomicWaker;
19use futures_lite::FutureExt;
20
21#[cfg(not(feature = "portable-atomic"))]
22use alloc::sync::Arc;
23#[cfg(feature = "portable-atomic")]
24use portable_atomic_util::Arc;
25
26use once_cell::sync::OnceCell;
27
28#[cfg(feature = "std")]
29pub use futures_lite::future::block_on;
30
31/// An async executor.
32///
33/// # Examples
34///
35/// A multi-threaded executor:
36///
37/// ```ignore
38/// use async_channel::unbounded;
39/// use easy_parallel::Parallel;
40///
41/// use edge_executor::{Executor, block_on};
42///
43/// let ex: Executor = Default::default();
44/// let (signal, shutdown) = unbounded::<()>();
45///
46/// Parallel::new()
47///     // Run four executor threads.
48///     .each(0..4, |_| block_on(ex.run(shutdown.recv())))
49///     // Run the main future on the current thread.
50///     .finish(|| block_on(async {
51///         println!("Hello world!");
52///         drop(signal);
53///     }));
54/// ```
55pub struct Executor<'a, const C: usize = 64> {
56    state: OnceCell<Arc<State<C>>>,
57    _invariant: PhantomData<core::cell::UnsafeCell<&'a ()>>,
58}
59
60impl<'a, const C: usize> Executor<'a, C> {
61    /// Creates a new executor.
62    ///
63    /// # Examples
64    ///
65    /// ```
66    /// use edge_executor::Executor;
67    ///
68    /// let ex: Executor = Default::default();
69    /// ```
70    pub const fn new() -> Self {
71        Self {
72            state: OnceCell::new(),
73            _invariant: PhantomData,
74        }
75    }
76
77    /// Spawns a task onto the executor.
78    ///
79    /// # Examples
80    ///
81    /// ```
82    /// use edge_executor::Executor;
83    ///
84    /// let ex: Executor = Default::default();
85    ///
86    /// let task = ex.spawn(async {
87    ///     println!("Hello world");
88    /// });
89    /// ```
90    ///
91    /// Note that if the executor's queue size is equal to the number of currently
92    /// spawned and running tasks, spawning this additional task might cause the executor to panic
93    /// later, when the task is scheduled for polling.
94    pub fn spawn<F>(&self, fut: F) -> Task<F::Output>
95    where
96        F: Future + Send + 'a,
97        F::Output: Send + 'a,
98    {
99        unsafe { self.spawn_unchecked(fut) }
100    }
101
102    /// Attempts to run a task if at least one is scheduled.
103    ///
104    /// Running a scheduled task means simply polling its future once.
105    ///
106    /// # Examples
107    ///
108    /// ```
109    /// use edge_executor::Executor;
110    ///
111    /// let ex: Executor = Default::default();
112    /// assert!(!ex.try_tick()); // no tasks to run
113    ///
114    /// let task = ex.spawn(async {
115    ///     println!("Hello world");
116    /// });
117    /// assert!(ex.try_tick()); // a task was found
118    /// ```    
119    pub fn try_tick(&self) -> bool {
120        if let Some(runnable) = self.try_runnable() {
121            runnable.run();
122
123            true
124        } else {
125            false
126        }
127    }
128
129    /// Runs a single task asynchronously.
130    ///
131    /// Running a task means simply polling its future once.
132    ///
133    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
134    ///
135    /// # Examples
136    ///
137    /// ```
138    /// use edge_executor::{Executor, block_on};
139    ///
140    /// let ex: Executor = Default::default();
141    ///
142    /// let task = ex.spawn(async {
143    ///     println!("Hello world");
144    /// });
145    /// block_on(ex.tick()); // runs the task
146    /// ```
147    pub async fn tick(&self) {
148        self.runnable().await.run();
149    }
150
151    /// Runs the executor asynchronously until the given future completes.
152    ///
153    /// # Examples
154    ///
155    /// ```
156    /// use edge_executor::{Executor, block_on};
157    ///
158    /// let ex: Executor = Default::default();
159    ///
160    /// let task = ex.spawn(async { 1 + 2 });
161    /// let res = block_on(ex.run(async { task.await * 2 }));
162    ///
163    /// assert_eq!(res, 6);
164    /// ```
165    pub async fn run<F>(&self, fut: F) -> F::Output
166    where
167        F: Future + Send + 'a,
168    {
169        unsafe { self.run_unchecked(fut).await }
170    }
171
172    /// Waits for the next runnable task to run.
173    async fn runnable(&self) -> Runnable {
174        poll_fn(|ctx| self.poll_runnable(ctx)).await
175    }
176
177    /// Polls the first task scheduled for execution by the executor.
178    fn poll_runnable(&self, ctx: &Context<'_>) -> Poll<Runnable> {
179        self.state().waker.register(ctx.waker());
180
181        if let Some(runnable) = self.try_runnable() {
182            Poll::Ready(runnable)
183        } else {
184            Poll::Pending
185        }
186    }
187
188    /// Pops the first task scheduled for execution by the executor.
189    ///
190    /// Returns
191    /// - `None` - if no task was scheduled for execution
192    /// - `Some(Runnnable)` - the first task scheduled for execution. Calling `Runnable::run` will
193    ///    execute the task. In other words, it will poll its future.
194    fn try_runnable(&self) -> Option<Runnable> {
195        let runnable;
196
197        #[cfg(not(feature = "heapless"))]
198        {
199            runnable = self.state().queue.pop();
200        }
201
202        #[cfg(feature = "heapless")]
203        {
204            runnable = self.state().queue.dequeue();
205        }
206
207        runnable
208    }
209
210    unsafe fn spawn_unchecked<F>(&self, fut: F) -> Task<F::Output>
211    where
212        F: Future,
213    {
214        let schedule = {
215            let state = self.state().clone();
216
217            move |runnable| {
218                #[cfg(all(not(feature = "heapless"), feature = "unbounded"))]
219                {
220                    state.queue.push(runnable);
221                }
222
223                #[cfg(all(not(feature = "heapless"), not(feature = "unbounded")))]
224                {
225                    state.queue.push(runnable).unwrap();
226                }
227
228                #[cfg(feature = "heapless")]
229                {
230                    state.queue.enqueue(runnable).unwrap();
231                }
232
233                if let Some(waker) = state.waker.take() {
234                    waker.wake();
235                }
236            }
237        };
238
239        let (runnable, task) = unsafe { async_task::spawn_unchecked(fut, schedule) };
240
241        runnable.schedule();
242
243        task
244    }
245
246    async unsafe fn run_unchecked<F>(&self, fut: F) -> F::Output
247    where
248        F: Future,
249    {
250        let run_forever = async {
251            loop {
252                self.tick().await;
253            }
254        };
255
256        run_forever.or(fut).await
257    }
258
259    /// Returns a reference to the inner state.
260    fn state(&self) -> &Arc<State<C>> {
261        self.state.get_or_init(|| Arc::new(State::new()))
262    }
263}
264
265impl<'a, const C: usize> Default for Executor<'a, C> {
266    fn default() -> Self {
267        Self::new()
268    }
269}
270
271unsafe impl<'a, const C: usize> Send for Executor<'a, C> {}
272unsafe impl<'a, const C: usize> Sync for Executor<'a, C> {}
273
274/// A thread-local executor.
275///
276/// The executor can only be run on the thread that created it.
277///
278/// # Examples
279///
280/// ```
281/// use edge_executor::{LocalExecutor, block_on};
282///
283/// let local_ex: LocalExecutor = Default::default();
284///
285/// block_on(local_ex.run(async {
286///     println!("Hello world!");
287/// }));
288/// ```
289pub struct LocalExecutor<'a, const C: usize = 64> {
290    executor: Executor<'a, C>,
291    _not_send: PhantomData<core::cell::UnsafeCell<&'a Rc<()>>>,
292}
293
294#[allow(clippy::missing_safety_doc)]
295impl<'a, const C: usize> LocalExecutor<'a, C> {
296    /// Creates a single-threaded executor.
297    ///
298    /// # Examples
299    ///
300    /// ```
301    /// use edge_executor::LocalExecutor;
302    ///
303    /// let local_ex: LocalExecutor = Default::default();
304    /// ```
305    pub const fn new() -> Self {
306        Self {
307            executor: Executor::<C>::new(),
308            _not_send: PhantomData,
309        }
310    }
311
312    /// Spawns a task onto the executor.
313    ///
314    /// # Examples
315    ///
316    /// ```
317    /// use edge_executor::LocalExecutor;
318    ///
319    /// let local_ex: LocalExecutor = Default::default();
320    ///
321    /// let task = local_ex.spawn(async {
322    ///     println!("Hello world");
323    /// });
324    /// ```
325    ///
326    /// Note that if the executor's queue size is equal to the number of currently
327    /// spawned and running tasks, spawning this additional task might cause the executor to panic
328    /// later, when the task is scheduled for polling.
329    pub fn spawn<F>(&self, fut: F) -> Task<F::Output>
330    where
331        F: Future + 'a,
332        F::Output: 'a,
333    {
334        unsafe { self.executor.spawn_unchecked(fut) }
335    }
336
337    /// Attempts to run a task if at least one is scheduled.
338    ///
339    /// Running a scheduled task means simply polling its future once.
340    ///
341    /// # Examples
342    ///
343    /// ```
344    /// use edge_executor::LocalExecutor;
345    ///
346    /// let local_ex: LocalExecutor = Default::default();
347    /// assert!(!local_ex.try_tick()); // no tasks to run
348    ///
349    /// let task = local_ex.spawn(async {
350    ///     println!("Hello world");
351    /// });
352    /// assert!(local_ex.try_tick()); // a task was found
353    /// ```    
354    pub fn try_tick(&self) -> bool {
355        self.executor.try_tick()
356    }
357
358    /// Runs a single task asynchronously.
359    ///
360    /// Running a task means simply polling its future once.
361    ///
362    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
363    ///
364    /// # Examples
365    ///
366    /// ```
367    /// use edge_executor::{LocalExecutor, block_on};
368    ///
369    /// let local_ex: LocalExecutor = Default::default();
370    ///
371    /// let task = local_ex.spawn(async {
372    ///     println!("Hello world");
373    /// });
374    /// block_on(local_ex.tick()); // runs the task
375    /// ```
376    pub async fn tick(&self) {
377        self.executor.tick().await
378    }
379
380    /// Runs the executor asynchronously until the given future completes.
381    ///
382    /// # Examples
383    ///
384    /// ```
385    /// use edge_executor::{LocalExecutor, block_on};
386    ///
387    /// let local_ex: LocalExecutor = Default::default();
388    ///
389    /// let task = local_ex.spawn(async { 1 + 2 });
390    /// let res = block_on(local_ex.run(async { task.await * 2 }));
391    ///
392    /// assert_eq!(res, 6);
393    /// ```
394    pub async fn run<F>(&self, fut: F) -> F::Output
395    where
396        F: Future,
397    {
398        unsafe { self.executor.run_unchecked(fut) }.await
399    }
400}
401
402impl<'a, const C: usize> Default for LocalExecutor<'a, C> {
403    fn default() -> Self {
404        Self::new()
405    }
406}
407
408struct State<const C: usize> {
409    #[cfg(all(not(feature = "heapless"), feature = "unbounded"))]
410    queue: crossbeam_queue::SegQueue<Runnable>,
411    #[cfg(all(not(feature = "heapless"), not(feature = "unbounded")))]
412    queue: crossbeam_queue::ArrayQueue<Runnable>,
413    #[cfg(feature = "heapless")]
414    queue: heapless::mpmc::MpMcQueue<Runnable, C>,
415    waker: AtomicWaker,
416}
417
418impl<const C: usize> State<C> {
419    fn new() -> Self {
420        Self {
421            #[cfg(all(not(feature = "heapless"), feature = "unbounded"))]
422            queue: crossbeam_queue::SegQueue::new(),
423            #[cfg(all(not(feature = "heapless"), not(feature = "unbounded")))]
424            queue: crossbeam_queue::ArrayQueue::new(C),
425            #[cfg(feature = "heapless")]
426            queue: heapless::mpmc::MpMcQueue::new(),
427            waker: AtomicWaker::new(),
428        }
429    }
430}