edge_executor/lib.rs
1#![cfg_attr(not(feature = "std"), no_std)]
2
3#[cfg(all(feature = "heapless", feature = "unbounded"))]
4compile_error!("Feature `heapless` is not compatible with feature `unbounded`.");
5
6use core::future::{poll_fn, Future};
7use core::marker::PhantomData;
8use core::task::{Context, Poll};
9
10extern crate alloc;
11
12use alloc::rc::Rc;
13
14use async_task::Runnable;
15
16pub use async_task::{FallibleTask, Task};
17
18use atomic_waker::AtomicWaker;
19use futures_lite::FutureExt;
20
21#[cfg(not(feature = "portable-atomic"))]
22use alloc::sync::Arc;
23#[cfg(feature = "portable-atomic")]
24use portable_atomic_util::Arc;
25
26use once_cell::sync::OnceCell;
27
28#[cfg(feature = "std")]
29pub use futures_lite::future::block_on;
30
31/// An async executor.
32///
33/// # Examples
34///
35/// A multi-threaded executor:
36///
37/// ```ignore
38/// use async_channel::unbounded;
39/// use easy_parallel::Parallel;
40///
41/// use edge_executor::{Executor, block_on};
42///
43/// let ex: Executor = Default::default();
44/// let (signal, shutdown) = unbounded::<()>();
45///
46/// Parallel::new()
47/// // Run four executor threads.
48/// .each(0..4, |_| block_on(ex.run(shutdown.recv())))
49/// // Run the main future on the current thread.
50/// .finish(|| block_on(async {
51/// println!("Hello world!");
52/// drop(signal);
53/// }));
54/// ```
55pub struct Executor<'a, const C: usize = 64> {
56 state: OnceCell<Arc<State<C>>>,
57 _invariant: PhantomData<core::cell::UnsafeCell<&'a ()>>,
58}
59
60impl<'a, const C: usize> Executor<'a, C> {
61 /// Creates a new executor.
62 ///
63 /// # Examples
64 ///
65 /// ```
66 /// use edge_executor::Executor;
67 ///
68 /// let ex: Executor = Default::default();
69 /// ```
70 pub const fn new() -> Self {
71 Self {
72 state: OnceCell::new(),
73 _invariant: PhantomData,
74 }
75 }
76
77 /// Spawns a task onto the executor.
78 ///
79 /// # Examples
80 ///
81 /// ```
82 /// use edge_executor::Executor;
83 ///
84 /// let ex: Executor = Default::default();
85 ///
86 /// let task = ex.spawn(async {
87 /// println!("Hello world");
88 /// });
89 /// ```
90 ///
91 /// Note that if the executor's queue size is equal to the number of currently
92 /// spawned and running tasks, spawning this additional task might cause the executor to panic
93 /// later, when the task is scheduled for polling.
94 pub fn spawn<F>(&self, fut: F) -> Task<F::Output>
95 where
96 F: Future + Send + 'a,
97 F::Output: Send + 'a,
98 {
99 unsafe { self.spawn_unchecked(fut) }
100 }
101
102 /// Attempts to run a task if at least one is scheduled.
103 ///
104 /// Running a scheduled task means simply polling its future once.
105 ///
106 /// # Examples
107 ///
108 /// ```
109 /// use edge_executor::Executor;
110 ///
111 /// let ex: Executor = Default::default();
112 /// assert!(!ex.try_tick()); // no tasks to run
113 ///
114 /// let task = ex.spawn(async {
115 /// println!("Hello world");
116 /// });
117 /// assert!(ex.try_tick()); // a task was found
118 /// ```
119 pub fn try_tick(&self) -> bool {
120 if let Some(runnable) = self.try_runnable() {
121 runnable.run();
122
123 true
124 } else {
125 false
126 }
127 }
128
129 /// Runs a single task asynchronously.
130 ///
131 /// Running a task means simply polling its future once.
132 ///
133 /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
134 ///
135 /// # Examples
136 ///
137 /// ```
138 /// use edge_executor::{Executor, block_on};
139 ///
140 /// let ex: Executor = Default::default();
141 ///
142 /// let task = ex.spawn(async {
143 /// println!("Hello world");
144 /// });
145 /// block_on(ex.tick()); // runs the task
146 /// ```
147 pub async fn tick(&self) {
148 self.runnable().await.run();
149 }
150
151 /// Runs the executor asynchronously until the given future completes.
152 ///
153 /// # Examples
154 ///
155 /// ```
156 /// use edge_executor::{Executor, block_on};
157 ///
158 /// let ex: Executor = Default::default();
159 ///
160 /// let task = ex.spawn(async { 1 + 2 });
161 /// let res = block_on(ex.run(async { task.await * 2 }));
162 ///
163 /// assert_eq!(res, 6);
164 /// ```
165 pub async fn run<F>(&self, fut: F) -> F::Output
166 where
167 F: Future + Send + 'a,
168 {
169 unsafe { self.run_unchecked(fut).await }
170 }
171
172 /// Waits for the next runnable task to run.
173 async fn runnable(&self) -> Runnable {
174 poll_fn(|ctx| self.poll_runnable(ctx)).await
175 }
176
177 /// Polls the first task scheduled for execution by the executor.
178 fn poll_runnable(&self, ctx: &Context<'_>) -> Poll<Runnable> {
179 self.state().waker.register(ctx.waker());
180
181 if let Some(runnable) = self.try_runnable() {
182 Poll::Ready(runnable)
183 } else {
184 Poll::Pending
185 }
186 }
187
188 /// Pops the first task scheduled for execution by the executor.
189 ///
190 /// Returns
191 /// - `None` - if no task was scheduled for execution
192 /// - `Some(Runnnable)` - the first task scheduled for execution. Calling `Runnable::run` will
193 /// execute the task. In other words, it will poll its future.
194 fn try_runnable(&self) -> Option<Runnable> {
195 let runnable;
196
197 #[cfg(not(feature = "heapless"))]
198 {
199 runnable = self.state().queue.pop();
200 }
201
202 #[cfg(feature = "heapless")]
203 {
204 runnable = self.state().queue.dequeue();
205 }
206
207 runnable
208 }
209
210 unsafe fn spawn_unchecked<F>(&self, fut: F) -> Task<F::Output>
211 where
212 F: Future,
213 {
214 let schedule = {
215 let state = self.state().clone();
216
217 move |runnable| {
218 #[cfg(all(not(feature = "heapless"), feature = "unbounded"))]
219 {
220 state.queue.push(runnable);
221 }
222
223 #[cfg(all(not(feature = "heapless"), not(feature = "unbounded")))]
224 {
225 state.queue.push(runnable).unwrap();
226 }
227
228 #[cfg(feature = "heapless")]
229 {
230 state.queue.enqueue(runnable).unwrap();
231 }
232
233 if let Some(waker) = state.waker.take() {
234 waker.wake();
235 }
236 }
237 };
238
239 let (runnable, task) = unsafe { async_task::spawn_unchecked(fut, schedule) };
240
241 runnable.schedule();
242
243 task
244 }
245
246 async unsafe fn run_unchecked<F>(&self, fut: F) -> F::Output
247 where
248 F: Future,
249 {
250 let run_forever = async {
251 loop {
252 self.tick().await;
253 }
254 };
255
256 run_forever.or(fut).await
257 }
258
259 /// Returns a reference to the inner state.
260 fn state(&self) -> &Arc<State<C>> {
261 self.state.get_or_init(|| Arc::new(State::new()))
262 }
263}
264
265impl<'a, const C: usize> Default for Executor<'a, C> {
266 fn default() -> Self {
267 Self::new()
268 }
269}
270
271unsafe impl<'a, const C: usize> Send for Executor<'a, C> {}
272unsafe impl<'a, const C: usize> Sync for Executor<'a, C> {}
273
274/// A thread-local executor.
275///
276/// The executor can only be run on the thread that created it.
277///
278/// # Examples
279///
280/// ```
281/// use edge_executor::{LocalExecutor, block_on};
282///
283/// let local_ex: LocalExecutor = Default::default();
284///
285/// block_on(local_ex.run(async {
286/// println!("Hello world!");
287/// }));
288/// ```
289pub struct LocalExecutor<'a, const C: usize = 64> {
290 executor: Executor<'a, C>,
291 _not_send: PhantomData<core::cell::UnsafeCell<&'a Rc<()>>>,
292}
293
294#[allow(clippy::missing_safety_doc)]
295impl<'a, const C: usize> LocalExecutor<'a, C> {
296 /// Creates a single-threaded executor.
297 ///
298 /// # Examples
299 ///
300 /// ```
301 /// use edge_executor::LocalExecutor;
302 ///
303 /// let local_ex: LocalExecutor = Default::default();
304 /// ```
305 pub const fn new() -> Self {
306 Self {
307 executor: Executor::<C>::new(),
308 _not_send: PhantomData,
309 }
310 }
311
312 /// Spawns a task onto the executor.
313 ///
314 /// # Examples
315 ///
316 /// ```
317 /// use edge_executor::LocalExecutor;
318 ///
319 /// let local_ex: LocalExecutor = Default::default();
320 ///
321 /// let task = local_ex.spawn(async {
322 /// println!("Hello world");
323 /// });
324 /// ```
325 ///
326 /// Note that if the executor's queue size is equal to the number of currently
327 /// spawned and running tasks, spawning this additional task might cause the executor to panic
328 /// later, when the task is scheduled for polling.
329 pub fn spawn<F>(&self, fut: F) -> Task<F::Output>
330 where
331 F: Future + 'a,
332 F::Output: 'a,
333 {
334 unsafe { self.executor.spawn_unchecked(fut) }
335 }
336
337 /// Attempts to run a task if at least one is scheduled.
338 ///
339 /// Running a scheduled task means simply polling its future once.
340 ///
341 /// # Examples
342 ///
343 /// ```
344 /// use edge_executor::LocalExecutor;
345 ///
346 /// let local_ex: LocalExecutor = Default::default();
347 /// assert!(!local_ex.try_tick()); // no tasks to run
348 ///
349 /// let task = local_ex.spawn(async {
350 /// println!("Hello world");
351 /// });
352 /// assert!(local_ex.try_tick()); // a task was found
353 /// ```
354 pub fn try_tick(&self) -> bool {
355 self.executor.try_tick()
356 }
357
358 /// Runs a single task asynchronously.
359 ///
360 /// Running a task means simply polling its future once.
361 ///
362 /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
363 ///
364 /// # Examples
365 ///
366 /// ```
367 /// use edge_executor::{LocalExecutor, block_on};
368 ///
369 /// let local_ex: LocalExecutor = Default::default();
370 ///
371 /// let task = local_ex.spawn(async {
372 /// println!("Hello world");
373 /// });
374 /// block_on(local_ex.tick()); // runs the task
375 /// ```
376 pub async fn tick(&self) {
377 self.executor.tick().await
378 }
379
380 /// Runs the executor asynchronously until the given future completes.
381 ///
382 /// # Examples
383 ///
384 /// ```
385 /// use edge_executor::{LocalExecutor, block_on};
386 ///
387 /// let local_ex: LocalExecutor = Default::default();
388 ///
389 /// let task = local_ex.spawn(async { 1 + 2 });
390 /// let res = block_on(local_ex.run(async { task.await * 2 }));
391 ///
392 /// assert_eq!(res, 6);
393 /// ```
394 pub async fn run<F>(&self, fut: F) -> F::Output
395 where
396 F: Future,
397 {
398 unsafe { self.executor.run_unchecked(fut) }.await
399 }
400}
401
402impl<'a, const C: usize> Default for LocalExecutor<'a, C> {
403 fn default() -> Self {
404 Self::new()
405 }
406}
407
408struct State<const C: usize> {
409 #[cfg(all(not(feature = "heapless"), feature = "unbounded"))]
410 queue: crossbeam_queue::SegQueue<Runnable>,
411 #[cfg(all(not(feature = "heapless"), not(feature = "unbounded")))]
412 queue: crossbeam_queue::ArrayQueue<Runnable>,
413 #[cfg(feature = "heapless")]
414 queue: heapless::mpmc::MpMcQueue<Runnable, C>,
415 waker: AtomicWaker,
416}
417
418impl<const C: usize> State<C> {
419 fn new() -> Self {
420 Self {
421 #[cfg(all(not(feature = "heapless"), feature = "unbounded"))]
422 queue: crossbeam_queue::SegQueue::new(),
423 #[cfg(all(not(feature = "heapless"), not(feature = "unbounded")))]
424 queue: crossbeam_queue::ArrayQueue::new(C),
425 #[cfg(feature = "heapless")]
426 queue: heapless::mpmc::MpMcQueue::new(),
427 waker: AtomicWaker::new(),
428 }
429 }
430}