embassy_executor/raw/mod.rs

//! Raw executor.
//!
//! This module exposes "raw" Executor and Task structs for more low-level control.
//!
//! ## WARNING: here be dragons!
//!
//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe
//! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe.

#[cfg_attr(target_has_atomic = "ptr", path = "run_queue_atomics.rs")]
#[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")]
mod run_queue;

#[cfg_attr(all(cortex_m, target_has_atomic = "32"), path = "state_atomics_arm.rs")]
#[cfg_attr(all(not(cortex_m), target_has_atomic = "8"), path = "state_atomics.rs")]
#[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")]
mod state;

pub mod timer_queue;
#[cfg(feature = "trace")]
pub mod trace;
pub(crate) mod util;
#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")]
mod waker;

use core::future::Future;
use core::marker::PhantomData;
use core::mem;
use core::pin::Pin;
use core::ptr::NonNull;
#[cfg(not(feature = "arch-avr"))]
use core::sync::atomic::AtomicPtr;
use core::sync::atomic::Ordering;
use core::task::{Context, Poll};

#[cfg(feature = "arch-avr")]
use portable_atomic::AtomicPtr;

use self::run_queue::{RunQueue, RunQueueItem};
use self::state::State;
use self::util::{SyncUnsafeCell, UninitCell};
pub use self::waker::task_from_waker;
use super::SpawnToken;

/// Raw task header for use in task pointers.
///
/// A task can be in one of the following states:
///
/// - Not spawned: the task is ready to spawn.
/// - `SPAWNED`: the task is currently spawned and may be running.
/// - `RUN_ENQUEUED`: the task is enqueued to be polled. Note that the task may be `!SPAWNED`.
///    In this case, the `RUN_ENQUEUED` state will be cleared when the task is next polled, without
///    polling the task's future.
///
/// A task's complete life cycle is as follows:
///
/// ```text
/// ┌────────────┐   ┌────────────────────────┐
/// │Not spawned │◄─5┤Not spawned|Run enqueued│
/// │            ├6─►│                        │
/// └─────┬──────┘   └──────▲─────────────────┘
///       1                 │
///       │    ┌────────────┘
///       │    4
/// ┌─────▼────┴─────────┐
/// │Spawned|Run enqueued│
/// │                    │
/// └─────┬▲─────────────┘
///       2│
///       │3
/// ┌─────▼┴─────┐
/// │  Spawned   │
/// │            │
/// └────────────┘
/// ```
///
/// Transitions:
/// - 1: Task is spawned - `AvailableTask::claim -> Executor::spawn`
/// - 2: During poll - `RunQueue::dequeue_all -> State::run_dequeue`
/// - 3: Task wakes itself, waker wakes task, or task exits - `Waker::wake -> wake_task -> State::run_enqueue`
/// - 4: A run-queued task exits - `TaskStorage::poll -> Poll::Ready`
/// - 5: Task is dequeued. The task's future is not polled, because exiting the task replaces its `poll_fn`.
/// - 6: A task is woken while it is not spawned - `wake_task -> State::run_enqueue`
pub(crate) struct TaskHeader {
    pub(crate) state: State,
    pub(crate) run_queue_item: RunQueueItem,
    pub(crate) executor: AtomicPtr<SyncExecutor>,
    poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,

    /// Integrated timer queue storage. This field should not be accessed outside of the timer queue.
    pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
    #[cfg(feature = "trace")]
    pub(crate) name: Option<&'static str>,
    #[cfg(feature = "trace")]
    pub(crate) id: u32,
    #[cfg(feature = "trace")]
    all_tasks_next: AtomicPtr<TaskHeader>,
}

/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct TaskRef {
    ptr: NonNull<TaskHeader>,
}

unsafe impl Send for TaskRef where &'static TaskHeader: Send {}
unsafe impl Sync for TaskRef where &'static TaskHeader: Sync {}

impl TaskRef {
    fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self {
        Self {
            ptr: NonNull::from(task).cast(),
        }
    }

    /// Safety: The pointer must have been obtained with `TaskRef::as_ptr`
    pub(crate) unsafe fn from_ptr(ptr: *const TaskHeader) -> Self {
        Self {
            ptr: NonNull::new_unchecked(ptr as *mut TaskHeader),
        }
    }

    /// # Safety
    ///
    /// The result of this function must only be compared for equality
    /// or stored; it must never otherwise be used.
    pub const unsafe fn dangling() -> Self {
        Self {
            ptr: NonNull::dangling(),
        }
    }

    pub(crate) fn header(self) -> &'static TaskHeader {
        unsafe { self.ptr.as_ref() }
    }

    /// Returns a reference to the executor that the task is currently running on.
    pub unsafe fn executor(self) -> Option<&'static Executor> {
        let executor = self.header().executor.load(Ordering::Relaxed);
        executor.as_ref().map(|e| Executor::wrap(e))
    }

    /// Returns a reference to the timer queue item.
    pub fn timer_queue_item(&self) -> &'static timer_queue::TimerQueueItem {
        &self.header().timer_queue_item
    }

    /// The returned pointer is valid for the entire TaskStorage.
    pub(crate) fn as_ptr(self) -> *const TaskHeader {
        self.ptr.as_ptr()
    }
}

/// Raw storage in which a task can be spawned.
///
/// This struct holds the necessary memory to spawn one task whose future is `F`.
/// At a given time, the `TaskStorage` may be in spawned or not-spawned state. You
/// may spawn it with [`TaskStorage::spawn()`], which will fail if it is already spawned.
///
/// A `TaskStorage` must live forever; it may not be deallocated even after the task has
/// finished running. Hence the relevant methods require `&'static self`. It may be reused, however.
///
/// Internally, the [embassy_executor::task](embassy_executor_macros::task) macro allocates an array of `TaskStorage`s
/// in a `static`. The most common reason to use the raw `TaskStorage` is to have control of where
/// the memory for the task is allocated: on the stack, or on the heap with e.g. `Box::leak`, etc.
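///
/// For example, a minimal sketch of heap-allocating the storage and spawning it by hand
/// (assuming a user-defined `run()` async fn and a `spawner` obtained from the executor;
/// not a complete program):
///
/// ```rust,ignore
/// use embassy_executor::raw::TaskStorage;
///
/// async fn run() { /* ... */ }
///
/// // Leak a heap allocation to satisfy the `&'static self` requirement.
/// let storage = Box::leak(Box::new(TaskStorage::new()));
/// // The closure constructs the future in place inside the storage.
/// let token = storage.spawn(|| run());
/// spawner.spawn(token).unwrap();
/// ```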

// repr(C) is needed to guarantee that the Task is located at offset 0
// This makes it safe to cast between TaskHeader and TaskStorage pointers.
#[repr(C)]
pub struct TaskStorage<F: Future + 'static> {
    raw: TaskHeader,
    future: UninitCell<F>, // Valid if STATE_SPAWNED
}

unsafe fn poll_exited(_p: TaskRef) {
    // Nothing to do, the task is already !SPAWNED and dequeued.
}

impl<F: Future + 'static> TaskStorage<F> {
    const NEW: Self = Self::new();

    /// Create a new TaskStorage, in not-spawned state.
    pub const fn new() -> Self {
        Self {
            raw: TaskHeader {
                state: State::new(),
                run_queue_item: RunQueueItem::new(),
                executor: AtomicPtr::new(core::ptr::null_mut()),
                // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
                poll_fn: SyncUnsafeCell::new(None),

                timer_queue_item: timer_queue::TimerQueueItem::new(),
                #[cfg(feature = "trace")]
                name: None,
                #[cfg(feature = "trace")]
                id: 0,
                #[cfg(feature = "trace")]
                all_tasks_next: AtomicPtr::new(core::ptr::null_mut()),
            },
            future: UninitCell::uninit(),
        }
    }

    /// Try to spawn the task.
    ///
    /// The `future` closure constructs the future. It's only called if spawning is
    /// actually possible. It is a closure instead of a simple `future: F` param to ensure
    /// the future is constructed in-place, avoiding a temporary copy on the stack thanks to
    /// NRVO optimizations.
    ///
    /// This function will fail if the task is already spawned and has not finished running.
    /// In this case, the error is delayed: a "poisoned" SpawnToken is returned, which will
    /// cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
    ///
    /// Once the task has finished running, you may spawn it again. It is allowed to spawn it
    /// on a different executor.
    pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
        let task = AvailableTask::claim(self);
        match task {
            Some(task) => task.initialize(future),
            None => SpawnToken::new_failed(),
        }
    }

    unsafe fn poll(p: TaskRef) {
        let this = &*p.as_ptr().cast::<TaskStorage<F>>();

        let future = Pin::new_unchecked(this.future.as_mut());
        let waker = waker::from_task(p);
        let mut cx = Context::from_waker(&waker);
        match future.poll(&mut cx) {
            Poll::Ready(_) => {
                #[cfg(feature = "trace")]
                let exec_ptr: *const SyncExecutor = this.raw.executor.load(Ordering::Relaxed);

                // As the future has finished and this function will not be called
                // again, we can safely drop the future here.
                this.future.drop_in_place();

                // We replace the poll_fn with a despawn function, so that the task is cleaned up
                // when the executor polls it next.
                this.raw.poll_fn.set(Some(poll_exited));

                // Make sure we despawn last, so that other threads can only spawn the task
                // after we're done with it.
                this.raw.state.despawn();

                #[cfg(feature = "trace")]
                trace::task_end(exec_ptr, &p);
            }
            Poll::Pending => {}
        }

        // the compiler is emitting a virtual call for waker drop, but we know
        // it's a noop for our waker.
        mem::forget(waker);
    }

    #[doc(hidden)]
    #[allow(dead_code)]
    fn _assert_sync(self) {
        fn assert_sync<T: Sync>(_: T) {}

        assert_sync(self)
    }
}

/// An uninitialized [`TaskStorage`].
pub struct AvailableTask<F: Future + 'static> {
    task: &'static TaskStorage<F>,
}

impl<F: Future + 'static> AvailableTask<F> {
    /// Try to claim a [`TaskStorage`].
    ///
    /// This function returns `None` if the task has already been spawned and has not finished running.
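    ///
    /// A sketch of the two-step claim/initialize flow, equivalent to [`TaskStorage::spawn()`]
    /// (assuming a user-defined `run()` async fn and a leaked `storage: &'static TaskStorage<_>`):
    ///
    /// ```rust,ignore
    /// use embassy_executor::raw::AvailableTask;
    ///
    /// if let Some(claimed) = AvailableTask::claim(storage) {
    ///     // The storage is now reserved; construct the future in place.
    ///     let token = claimed.initialize(|| run());
    ///     spawner.spawn(token).unwrap();
    /// }
    /// ```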
    pub fn claim(task: &'static TaskStorage<F>) -> Option<Self> {
        task.raw.state.spawn().then(|| Self { task })
    }

    fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> {
        unsafe {
            self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
            self.task.future.write_in_place(future);

            let task = TaskRef::new(self.task);

            SpawnToken::new(task)
        }
    }

    /// Initialize the [`TaskStorage`] to run the given future.
    pub fn initialize(self, future: impl FnOnce() -> F) -> SpawnToken<F> {
        self.initialize_impl::<F>(future)
    }

    /// Initialize the [`TaskStorage`] to run the given future.
    ///
    /// # Safety
    ///
    /// `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
    /// is an `async fn`, NOT a hand-written `Future`.
    #[doc(hidden)]
    pub unsafe fn __initialize_async_fn<FutFn>(self, future: impl FnOnce() -> F) -> SpawnToken<FutFn> {
        // When send-spawning a task, we construct the future in this thread, and effectively
        // "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory,
        // send-spawning should require the future `F` to be `Send`.
        //
        // The problem is this is more restrictive than needed. Once the future is executing,
        // it is never sent to another thread. It is only sent when spawning. It should be
        // enough for the task's arguments to be Send. (And in practice it's super easy to
        // accidentally make your futures !Send, for example by holding an `Rc` or a `&RefCell` across an `.await`.)
        //
        // We can do it by sending the task args and constructing the future in the executor thread
        // on first poll. However, this cannot be done in-place, so it'll waste stack space for a copy
        // of the args.
        //
        // Luckily, an `async fn` future contains just the args when freshly constructed. So, if the
        // args are Send, it's OK to send a !Send future, as long as we do it before first polling it.
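        //
        // For example (illustrative only): in `async fn my_task(n: u32) { let rc = Rc::new(n);
        // yield_now().await; drop(rc); }`, the future is !Send because it may hold the `Rc`
        // across the `.await`, but when freshly constructed it contains only `n: u32`, which is Send.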
        //
        // (Note: this is how the generators are implemented today, it's not officially guaranteed yet,
        // but it's possible it'll be guaranteed in the future. See zulip thread:
        // https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.22only.20before.20poll.22.20Send.20futures )
        //
        // The `FutFn` captures all the args, so if it's Send, the task can be send-spawned.
        // This is why we return `SpawnToken<FutFn>` below.
        //
        // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly
        // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`.
        self.initialize_impl::<FutFn>(future)
    }
}

/// Raw storage that can hold up to N tasks of the same type.
///
/// This is essentially a `[TaskStorage<F>; N]`.
pub struct TaskPool<F: Future + 'static, const N: usize> {
    pool: [TaskStorage<F>; N],
}

impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
    /// Create a new TaskPool, with all tasks in non-spawned state.
    pub const fn new() -> Self {
        Self {
            pool: [TaskStorage::NEW; N],
        }
    }

    fn spawn_impl<T>(&'static self, future: impl FnOnce() -> F) -> SpawnToken<T> {
        match self.pool.iter().find_map(AvailableTask::claim) {
            Some(task) => task.initialize_impl::<T>(future),
            None => SpawnToken::new_failed(),
        }
    }

    /// Try to spawn a task in the pool.
    ///
    /// See [`TaskStorage::spawn()`] for details.
    ///
    /// This will loop over the pool and spawn the task in the first storage that
    /// is currently free. If none is free, a "poisoned" SpawnToken is returned,
    /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
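    ///
    /// A sketch of spawning two instances of the same task from a heap-allocated pool
    /// (assuming a user-defined `blink()` async fn; normally the
    /// [`task`](embassy_executor_macros::task) macro manages the pool for you):
    ///
    /// ```rust,ignore
    /// use embassy_executor::raw::TaskPool;
    ///
    /// async fn blink() { /* ... */ }
    ///
    /// // Up to 2 instances of `blink` may be spawned at once.
    /// let pool = Box::leak(Box::new(TaskPool::<_, 2>::new()));
    /// spawner.spawn(pool.spawn(|| blink())).unwrap();
    /// spawner.spawn(pool.spawn(|| blink())).unwrap();
    /// ```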
    pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
        self.spawn_impl::<F>(future)
    }

    /// Like spawn(), but allows the task to be send-spawned if the args are Send even if
    /// the future is !Send.
    ///
    /// Not covered by semver guarantees. DO NOT call this directly. Intended to be used
    /// by the Embassy macros ONLY.
    ///
    /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
    /// is an `async fn`, NOT a hand-written `Future`.
    #[doc(hidden)]
    pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> SpawnToken<impl Sized>
    where
        FutFn: FnOnce() -> F,
    {
        // See the comment in AvailableTask::__initialize_async_fn for explanation.
        self.spawn_impl::<FutFn>(future)
    }
}

#[derive(Clone, Copy)]
pub(crate) struct Pender(*mut ());

unsafe impl Send for Pender {}
unsafe impl Sync for Pender {}

impl Pender {
    pub(crate) fn pend(self) {
        extern "Rust" {
            fn __pender(context: *mut ());
        }
        unsafe { __pender(self.0) };
    }
}

pub(crate) struct SyncExecutor {
    run_queue: RunQueue,
    pender: Pender,
}

impl SyncExecutor {
    pub(crate) fn new(pender: Pender) -> Self {
        Self {
            run_queue: RunQueue::new(),
            pender,
        }
    }

    /// Enqueue a task in the task queue
    ///
    /// # Safety
    /// - `task` must be a valid pointer to a spawned task.
    /// - `task` must be set up to run in this executor.
    /// - `task` must NOT be already enqueued (in this executor or another one).
    #[inline(always)]
    unsafe fn enqueue(&self, task: TaskRef, l: state::Token) {
        #[cfg(feature = "trace")]
        trace::task_ready_begin(self, &task);

        if self.run_queue.enqueue(task, l) {
            self.pender.pend();
        }
    }

    pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
        task.header()
            .executor
            .store((self as *const Self).cast_mut(), Ordering::Relaxed);

        #[cfg(feature = "trace")]
        trace::task_new(self, &task);

        state::locked(|l| {
            self.enqueue(task, l);
        })
    }

    /// # Safety
    ///
    /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created on.
    pub(crate) unsafe fn poll(&'static self) {
        #[cfg(feature = "trace")]
        trace::poll_start(self);

        self.run_queue.dequeue_all(|p| {
            let task = p.header();

            #[cfg(feature = "trace")]
            trace::task_exec_begin(self, &p);

            // Run the task
            task.poll_fn.get().unwrap_unchecked()(p);

            #[cfg(feature = "trace")]
            trace::task_exec_end(self, &p);
        });

        #[cfg(feature = "trace")]
        trace::executor_idle(self)
    }
}

/// Raw executor.
///
/// This is the core of the Embassy executor. It is low-level, requiring manual
/// handling of wakeups and task polling. If you can, prefer using one of the
/// [higher level executors](crate::Executor).
///
/// The raw executor leaves it up to you to handle wakeups and scheduling:
///
/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks
///   that "want to run").
/// - You must supply a pender function, as shown below. The executor will call it to notify you
///   it has work to do. You must arrange for `poll()` to be called as soon as possible.
/// - Enabling `arch-xx` features will define a pender function for you. This means that you
///   are limited to using the executors provided to you by the architecture/platform
///   implementation. If you need a different executor, you must not enable `arch-xx` features.
///
/// The pender can be called from *any* context: any thread, any interrupt priority
/// level, etc. It may be called synchronously from any `Executor` method call as well.
/// You must deal with this correctly.
///
/// In particular, you must NOT call `poll` directly from the pender callback, as this violates
/// the requirement for `poll` to not be called reentrantly.
///
/// The pender function must be exported with the name `__pender` and have the following signature:
///
/// ```rust
/// #[export_name = "__pender"]
/// fn pender(context: *mut ()) {
///     // schedule `poll()` to be called
/// }
/// ```
///
/// The `context` argument is a piece of arbitrary data the executor will pass to the pender.
/// You can set the `context` when calling [`Executor::new()`]. You can use it to, for example,
/// differentiate between executors, or to pass a pointer to a callback that should be called.
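///
/// For example, a minimal single-threaded driver loop might look like this (a sketch, assuming
/// a `SIGNALED` flag set by your `__pender`; how you sleep between polls is platform-specific):
///
/// ```rust,ignore
/// use core::sync::atomic::{AtomicBool, Ordering};
///
/// static SIGNALED: AtomicBool = AtomicBool::new(true);
///
/// #[export_name = "__pender"]
/// fn pender(_context: *mut ()) {
///     SIGNALED.store(true, Ordering::SeqCst);
/// }
///
/// let executor = Box::leak(Box::new(embassy_executor::raw::Executor::new(core::ptr::null_mut())));
/// loop {
///     if SIGNALED.swap(false, Ordering::SeqCst) {
///         // Safety: called from the single thread that owns the executor, never reentrantly.
///         unsafe { executor.poll() };
///     }
///     // Otherwise: sleep or wait-for-interrupt until the pender fires again.
/// }
/// ```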
#[repr(transparent)]
pub struct Executor {
    pub(crate) inner: SyncExecutor,

    _not_sync: PhantomData<*mut ()>,
}

impl Executor {
    pub(crate) unsafe fn wrap(inner: &SyncExecutor) -> &Self {
        mem::transmute(inner)
    }

    /// Create a new executor.
    ///
    /// When the executor has work to do, it will call the pender function and pass `context` to it.
    ///
    /// See [`Executor`] docs for details on the pender.
    pub fn new(context: *mut ()) -> Self {
        Self {
            inner: SyncExecutor::new(Pender(context)),
            _not_sync: PhantomData,
        }
    }

    /// Spawn a task in this executor.
    ///
    /// # Safety
    ///
    /// `task` must be a valid pointer to an initialized but not-already-spawned task.
    ///
    /// It is OK to use `unsafe` to call this from a thread that's not the executor thread.
    /// In this case, the task's Future must be Send. This is because this is effectively
    /// sending the task to the executor thread.
    pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
        self.inner.spawn(task)
    }

    /// Poll all queued tasks in this executor.
    ///
    /// This loops over all tasks that are queued to be polled (i.e. they're
    /// freshly spawned or they've been woken). Other tasks are not polled.
    ///
    /// You must call `poll` after receiving a call to the pender. It is OK
    /// to call `poll` even when not requested by the pender, but it wastes
    /// energy.
    ///
    /// # Safety
    ///
    /// You must NOT call `poll` reentrantly on the same executor.
    ///
    /// In particular, note that `poll` may call the pender synchronously. Therefore, you
    /// must NOT directly call `poll()` from the pender callback. Instead, the callback has to
    /// somehow schedule for `poll()` to be called later, at a time you know for sure there's
    /// no `poll()` already running.
    pub unsafe fn poll(&'static self) {
        self.inner.poll()
    }

    /// Get a spawner that spawns tasks in this executor.
    ///
    /// It is OK to call this method multiple times to obtain multiple
    /// `Spawner`s. You may also copy `Spawner`s.
    pub fn spawner(&'static self) -> super::Spawner {
        super::Spawner::new(self)
    }

    /// Get a unique ID for this Executor.
    pub fn id(&'static self) -> usize {
        &self.inner as *const SyncExecutor as usize
    }
}

/// Wake a task by `TaskRef`.
///
/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
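///
/// For example, a driver's interrupt handler could recover the task behind a stored
/// embassy `Waker` and enqueue it directly (a sketch; `on_interrupt` and its `waker`
/// argument are hypothetical):
///
/// ```rust,ignore
/// use embassy_executor::raw::{task_from_waker, wake_task};
///
/// fn on_interrupt(waker: &core::task::Waker) {
///     // Recover the TaskRef embedded in an embassy waker, then enqueue the task.
///     let task = task_from_waker(waker);
///     wake_task(task);
/// }
/// ```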
pub fn wake_task(task: TaskRef) {
    let header = task.header();
    header.state.run_enqueue(|l| {
        // We have just marked the task as scheduled, so enqueue it.
        unsafe {
            let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked();
            executor.enqueue(task, l);
        }
    });
}

/// Wake a task by `TaskRef` without calling pend.
///
/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
pub fn wake_task_no_pend(task: TaskRef) {
    let header = task.header();
    header.state.run_enqueue(|l| {
        // We have just marked the task as scheduled, so enqueue it.
        unsafe {
            let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked();
            executor.run_queue.enqueue(task, l);
        }
    });
}