Skip to main content

embassy_executor/raw/
mod.rs

1//! Raw executor.
2//!
3//! This module exposes "raw" Executor and Task structs for more low level control.
4//!
5//! ## WARNING: here be dragons!
6//!
7//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe
8//! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe.
9
10mod run_queue;
11
12#[cfg_attr(all(cortex_m, target_has_atomic = "32"), path = "state_atomics_arm.rs")]
13#[cfg_attr(
14    all(not(cortex_m), any(target_has_atomic = "8", target_has_atomic = "32")),
15    path = "state_atomics.rs"
16)]
17#[cfg_attr(
18    not(any(target_has_atomic = "8", target_has_atomic = "32")),
19    path = "state_critical_section.rs"
20)]
21mod state;
22
23#[cfg(feature = "_any_trace")]
24pub mod trace;
25pub(crate) mod util;
26#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")]
27mod waker;
28
29#[cfg(feature = "scheduler-deadline")]
30mod deadline;
31
32use core::future::Future;
33use core::marker::PhantomData;
34use core::mem;
35use core::pin::Pin;
36use core::ptr::NonNull;
37#[cfg(not(feature = "platform-avr"))]
38use core::sync::atomic::AtomicPtr;
39use core::sync::atomic::Ordering;
40use core::task::{Context, Poll, Waker};
41
42#[cfg(feature = "scheduler-deadline")]
43pub(crate) use deadline::Deadline;
44use embassy_executor_timer_queue::TimerQueueItem;
45#[cfg(feature = "platform-avr")]
46use portable_atomic::AtomicPtr;
47
48use self::run_queue::{RunQueue, RunQueueItem};
49use self::state::State;
50use self::util::{SyncUnsafeCell, UninitCell};
51pub use self::waker::task_from_waker;
52use self::waker::try_task_from_waker;
53use super::SpawnToken;
54use crate::{Metadata, SpawnError};
55
// Linkage hook consumed by the integrated timer queue (`embassy-time`): recovers
// the task's `TimerQueueItem` from one of this executor's wakers.
//
// NOTE(review): `task_from_waker`'s behavior on wakers not created by this
// executor is defined in waker.rs — confirm it rejects/panics on foreign wakers.
#[unsafe(no_mangle)]
extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static mut TimerQueueItem {
    unsafe { task_from_waker(waker).timer_queue_item() }
}
60
// Fallible variant of `__embassy_time_queue_item_from_waker`: returns `None`
// instead of asserting when the waker cannot be mapped back to a task
// (see `try_task_from_waker`).
#[unsafe(no_mangle)]
extern "Rust" fn __try_embassy_time_queue_item_from_waker(waker: &Waker) -> Option<&'static mut TimerQueueItem> {
    unsafe { try_task_from_waker(waker).map(|task| task.timer_queue_item()) }
}
65
66/// Raw task header for use in task pointers.
67///
68/// A task can be in one of the following states:
69///
70/// - Not spawned: the task is ready to spawn.
71/// - `SPAWNED`: the task is currently spawned and may be running.
72/// - `RUN_ENQUEUED`: the task is enqueued to be polled. Note that the task may be `!SPAWNED`.
73///    In this case, the `RUN_ENQUEUED` state will be cleared when the task is next polled, without
74///    polling the task's future.
75///
76/// A task's complete life cycle is as follows:
77///
78/// ```text
79/// ┌────────────┐   ┌────────────────────────┐
80/// │Not spawned │◄─5┤Not spawned|Run enqueued│
81/// │            ├6─►│                        │
82/// └─────┬──────┘   └──────▲─────────────────┘
83///       1                 │
84///       │    ┌────────────┘
85///       │    4
86/// ┌─────▼────┴─────────┐
87/// │Spawned|Run enqueued│
88/// │                    │
89/// └─────┬▲─────────────┘
90///       2│
91///       │3
92/// ┌─────▼┴─────┐
93/// │  Spawned   │
94/// │            │
95/// └────────────┘
96/// ```
97///
98/// Transitions:
99/// - 1: Task is spawned - `AvailableTask::claim -> Executor::spawn`
100/// - 2: During poll - `RunQueue::dequeue_all -> State::run_dequeue`
101/// - 3: Task wakes itself, waker wakes task, or task exits - `Waker::wake -> wake_task -> State::run_enqueue`
102/// - 4: A run-queued task exits - `TaskStorage::poll -> Poll::Ready`
103/// - 5: Task is dequeued. The task's future is not polled, because exiting the task replaces its `poll_fn`.
/// - 6: A task is woken when it is not spawned - `wake_task -> State::run_enqueue`
pub(crate) struct TaskHeader {
    /// Spawned / run-enqueued state machine; see the life-cycle diagram above.
    pub(crate) state: State,
    /// Intrusive link used by the executor's `RunQueue`.
    pub(crate) run_queue_item: RunQueueItem,

    /// Executor this task was last spawned on; null until the first spawn
    /// (set in `SyncExecutor::spawn`).
    pub(crate) executor: AtomicPtr<SyncExecutor>,
    /// Type-erased poll function. `None` until the first spawn (kept lazily
    /// initialized so a `static` `TaskStorage` goes in `.bss`); replaced with
    /// `poll_exited` when the future completes.
    poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,

    /// Integrated timer queue storage. This field should not be accessed outside of the timer queue.
    pub(crate) timer_queue_item: TimerQueueItem,

    /// Per-spawn metadata; reset in `AvailableTask::initialize_impl`.
    pub(crate) metadata: Metadata,

    /// Intrusive link — presumably a global "all tasks" list for tracing; only
    /// present with the `rtos-trace` feature. TODO(review): confirm usage in trace.rs.
    #[cfg(feature = "rtos-trace")]
    all_tasks_next: AtomicPtr<TaskHeader>,
}
120
/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct TaskRef {
    // Always points at a valid `TaskHeader`, which is the first field of the
    // `repr(C)` `TaskStorage`, so casting back (see `TaskStorage::poll`) is sound.
    ptr: NonNull<TaskHeader>,
}

// SAFETY: a `TaskRef` behaves like a `&'static TaskHeader`; the conditional
// bounds make it `Send`/`Sync` exactly when such a reference would be.
unsafe impl Send for TaskRef where &'static TaskHeader: Send {}
unsafe impl Sync for TaskRef where &'static TaskHeader: Sync {}
129
impl TaskRef {
    fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self {
        Self {
            // `TaskHeader` is the first field of the `repr(C)` `TaskStorage`,
            // so the cast to the header type is valid.
            ptr: NonNull::from(task).cast(),
        }
    }

    /// Safety: The pointer must have been obtained with `Task::as_ptr`
    pub(crate) unsafe fn from_ptr(ptr: *const TaskHeader) -> Self {
        Self {
            ptr: NonNull::new_unchecked(ptr as *mut TaskHeader),
        }
    }

    /// Access the type-erased task header.
    pub(crate) fn header(self) -> &'static TaskHeader {
        unsafe { self.ptr.as_ref() }
    }

    /// Access the task's metadata (reset on every spawn).
    pub(crate) fn metadata(self) -> &'static Metadata {
        unsafe { &self.ptr.as_ref().metadata }
    }

    /// Returns a reference to the executor that the task is currently running on.
    ///
    /// Returns `None` if the task has never been spawned (the stored executor
    /// pointer is still null).
    pub unsafe fn executor(self) -> Option<&'static Executor> {
        let executor = self.header().executor.load(Ordering::Relaxed);
        executor.as_ref().map(|e| Executor::wrap(e))
    }

    /// Returns a mutable reference to the timer queue item.
    ///
    /// # Safety
    ///
    /// This function must only be called in the context of the integrated timer queue.
    pub unsafe fn timer_queue_item(mut self) -> &'static mut TimerQueueItem {
        unsafe { &mut self.ptr.as_mut().timer_queue_item }
    }

    /// The returned pointer is valid for the entire TaskStorage.
    pub(crate) fn as_ptr(self) -> *const TaskHeader {
        self.ptr.as_ptr()
    }

    /// Returns the task ID.
    /// This can be used in combination with rtos-trace to match task names with IDs
    // NOTE(review): the `as u32` cast truncates the address on 64-bit hosts, so
    // two tasks could in principle share an ID there — confirm this is
    // acceptable for trace purposes.
    pub fn id(&self) -> u32 {
        self.as_ptr() as u32
    }
}
178
179/// Raw storage in which a task can be spawned.
180///
181/// This struct holds the necessary memory to spawn one task whose future is `F`.
182/// At a given time, the `TaskStorage` may be in spawned or not-spawned state. You
183/// may spawn it with [`TaskStorage::spawn()`], which will fail if it is already spawned.
184///
185/// A `TaskStorage` must live forever, it may not be deallocated even after the task has finished
186/// running. Hence the relevant methods require `&'static self`. It may be reused, however.
187///
188/// Internally, the [embassy_executor::task](embassy_executor_macros::task) macro allocates an array of `TaskStorage`s
189/// in a `static`. The most common reason to use the raw `Task` is to have control of where
190/// the memory for the task is allocated: on the stack, or on the heap with e.g. `Box::leak`, etc.
191
// repr(C) is needed to guarantee that the Task is located at offset 0
// This makes it safe to cast between TaskHeader and TaskStorage pointers.
#[repr(C)]
pub struct TaskStorage<F: Future + 'static> {
    /// Type-erased header; must remain the first field (see `repr(C)` note above).
    raw: TaskHeader,
    future: UninitCell<F>, // Valid if STATE_SPAWNED
}
199
// Replacement `poll_fn` installed by `TaskStorage::poll` once the future has
// completed (and been dropped). When the executor dequeues the task next,
// there is nothing left to do.
unsafe fn poll_exited(_p: TaskRef) {
    // Nothing to do, the task is already !SPAWNED and dequeued.
}
203
impl<F: Future + 'static> TaskStorage<F> {
    // Const "template" value used to build arrays of storages (see `TaskPool::new`).
    const NEW: Self = Self::new();

    /// Create a new TaskStorage, in not-spawned state.
    pub const fn new() -> Self {
        Self {
            raw: TaskHeader {
                state: State::new(),
                run_queue_item: RunQueueItem::new(),
                executor: AtomicPtr::new(core::ptr::null_mut()),
                // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
                poll_fn: SyncUnsafeCell::new(None),

                timer_queue_item: TimerQueueItem::new(),
                metadata: Metadata::new(),
                #[cfg(feature = "rtos-trace")]
                all_tasks_next: AtomicPtr::new(core::ptr::null_mut()),
            },
            future: UninitCell::uninit(),
        }
    }

    /// Try to spawn the task.
    ///
    /// The `future` closure constructs the future. It's only called if spawning is
    /// actually possible. It is a closure instead of a simple `future: F` param to ensure
    /// the future is constructed in-place, avoiding a temporary copy in the stack thanks to
    /// NRVO optimizations.
    ///
    /// This function returns [`SpawnError::Busy`] if the task is already spawned
    /// and has not finished running.
    ///
    /// Once the task has finished running, you may spawn it again. It is allowed to spawn it
    /// on a different executor.
    pub fn spawn(&'static self, future: impl FnOnce() -> F) -> Result<SpawnToken<impl Sized>, SpawnError> {
        let task = AvailableTask::claim(self);
        match task {
            Some(task) => Ok(task.initialize(future)),
            None => Err(SpawnError::Busy),
        }
    }

    /// Type-erased poll function, installed into `TaskHeader::poll_fn` in
    /// `AvailableTask::initialize_impl`.
    ///
    /// Safety: `p` must point at the `TaskStorage<F>` this function was
    /// monomorphized for, with an initialized future.
    unsafe fn poll(p: TaskRef) {
        // Sound because `TaskHeader` is the first field of the `repr(C)` storage.
        let this = &*p.as_ptr().cast::<TaskStorage<F>>();

        let future = Pin::new_unchecked(this.future.as_mut());
        let waker = waker::from_task(p);
        let mut cx = Context::from_waker(&waker);
        match future.poll(&mut cx) {
            Poll::Ready(_) => {
                // Capture the executor pointer for the trace hook below, before
                // the task can be despawned/re-spawned.
                #[cfg(feature = "_any_trace")]
                let exec_ptr: *const SyncExecutor = this.raw.executor.load(Ordering::Relaxed);

                // As the future has finished and this function will not be called
                // again, we can safely drop the future here.
                this.future.drop_in_place();

                // We replace the poll_fn with a despawn function, so that the task is cleaned up
                // when the executor polls it next.
                this.raw.poll_fn.set(Some(poll_exited));

                // Make sure we despawn last, so that other threads can only spawn the task
                // after we're done with it.
                this.raw.state.despawn();

                #[cfg(feature = "_any_trace")]
                trace::task_end(exec_ptr, &p);
            }
            Poll::Pending => {}
        }

        // the compiler is emitting a virtual call for waker drop, but we know
        // it's a noop for our waker.
        mem::forget(waker);
    }

    #[doc(hidden)]
    #[allow(dead_code)]
    // Compile-time check that `TaskStorage` is `Sync`; never actually called.
    fn _assert_sync(self) {
        fn assert_sync<T: Sync>(_: T) {}

        assert_sync(self)
    }
}
289
/// An uninitialized [`TaskStorage`].
///
/// Obtained via [`AvailableTask::claim`]: holding one means the storage's state
/// has been successfully claimed, so no one else can spawn into it until it is
/// initialized (or the claim is otherwise released).
pub struct AvailableTask<F: Future + 'static> {
    task: &'static TaskStorage<F>,
}
294
impl<F: Future + 'static> AvailableTask<F> {
    /// Try to claim a [`TaskStorage`].
    ///
    /// This function returns `None` if a task has already been spawned and has not finished running.
    pub fn claim(task: &'static TaskStorage<F>) -> Option<Self> {
        task.raw.state.spawn().then(|| Self { task })
    }

    // Shared initialization: resets metadata, installs the type-erased poll
    // function, and constructs the future in place. `S` only parameterizes the
    // returned `SpawnToken` (see `__initialize_async_fn` for why that matters).
    fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> {
        unsafe {
            self.task.raw.metadata.reset();
            self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
            self.task.future.write_in_place(future);

            let task = TaskRef::new(self.task);

            SpawnToken::new(task)
        }
    }

    /// Initialize the [`TaskStorage`] to run the given future.
    pub fn initialize(self, future: impl FnOnce() -> F) -> SpawnToken<F> {
        self.initialize_impl::<F>(future)
    }

    /// Initialize the [`TaskStorage`] to run the given future.
    ///
    /// # Safety
    ///
    /// `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
    /// is an `async fn`, NOT a hand-written `Future`.
    #[doc(hidden)]
    pub unsafe fn __initialize_async_fn<FutFn>(self, future: impl FnOnce() -> F) -> SpawnToken<FutFn> {
        // When send-spawning a task, we construct the future in this thread, and effectively
        // "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory,
        // send-spawning should require the future `F` to be `Send`.
        //
        // The problem is this is more restrictive than needed. Once the future is executing,
        // it is never sent to another thread. It is only sent when spawning. It should be
        // enough for the task's arguments to be Send. (and in practice it's super easy to
        // accidentally make your futures !Send, for example by holding an `Rc` or a `&RefCell` across an `.await`.)
        //
        // We can do it by sending the task args and constructing the future in the executor thread
        // on first poll. However, this cannot be done in-place, so it'll waste stack space for a copy
        // of the args.
        //
        // Luckily, an `async fn` future contains just the args when freshly constructed. So, if the
        // args are Send, it's OK to send a !Send future, as long as we do it before first polling it.
        //
        // (Note: this is how the generators are implemented today, it's not officially guaranteed yet,
        // but it's possible it'll be guaranteed in the future. See zulip thread:
        // https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.22only.20before.20poll.22.20Send.20futures )
        //
        // The `FutFn` captures all the args, so if it's Send, the task can be send-spawned.
        // This is why we return `SpawnToken<FutFn>` below.
        //
        // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly
        // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`.
        self.initialize_impl::<FutFn>(future)
    }
}
356
/// Raw storage that can hold up to N tasks of the same type.
///
/// This is essentially a `[TaskStorage<F>; N]`.
pub struct TaskPool<F: Future + 'static, const N: usize> {
    // Fixed-size array of storages; `spawn_impl` claims the first free slot.
    pool: [TaskStorage<F>; N],
}
363
impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
    /// Create a new TaskPool, with all tasks in non-spawned state.
    pub const fn new() -> Self {
        Self {
            pool: [TaskStorage::NEW; N],
        }
    }

    // Claim the first free storage in the pool and initialize it with the future.
    fn spawn_impl<T>(&'static self, future: impl FnOnce() -> F) -> Result<SpawnToken<T>, SpawnError> {
        match self.pool.iter().find_map(AvailableTask::claim) {
            Some(task) => Ok(task.initialize_impl::<T>(future)),
            None => Err(SpawnError::Busy),
        }
    }

    /// Try to spawn a task in the pool.
    ///
    /// See [`TaskStorage::spawn()`] for details.
    ///
    /// This will loop over the pool and spawn the task in the first storage that
    /// is currently free. If none is free, [`SpawnError::Busy`] is returned.
    pub fn spawn(&'static self, future: impl FnOnce() -> F) -> Result<SpawnToken<impl Sized>, SpawnError> {
        self.spawn_impl::<F>(future)
    }

    /// Like spawn(), but allows the task to be send-spawned if the args are Send even if
    /// the future is !Send.
    ///
    /// Not covered by semver guarantees. DO NOT call this directly. Intended to be used
    /// by the Embassy macros ONLY.
    ///
    /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
    /// is an `async fn`, NOT a hand-written `Future`.
    #[doc(hidden)]
    pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> Result<SpawnToken<impl Sized>, SpawnError>
    where
        FutFn: FnOnce() -> F,
    {
        // See the comment in AvailableTask::__initialize_async_fn for explanation.
        self.spawn_impl::<FutFn>(future)
    }
}
407
/// Wrapper for the opaque `context` pointer handed to [`Executor::new`];
/// passed through verbatim to the user-provided `__pender` function.
#[derive(Clone, Copy)]
pub(crate) struct Pender(*mut ());
410
// SAFETY(review): the pointer is never dereferenced here — it is only handed
// to `__pender`, whose documented contract (see `Executor` docs) allows calls
// from any thread or interrupt context. Confirm implementors are expected to
// provide context data that tolerates this.
unsafe impl Send for Pender {}
unsafe impl Sync for Pender {}
413
impl Pender {
    /// Notify the environment that the executor has work to do, by calling the
    /// user-provided `__pender` symbol with the stored context pointer.
    pub(crate) fn pend(self) {
        unsafe extern "Rust" {
            fn __pender(context: *mut ());
        }
        // SAFETY: `__pender` must be supplied by the user or arch layer with
        // the signature documented on `Executor`.
        unsafe { __pender(self.0) };
    }
}
422
/// The thread-shareable core of the executor: the run queue plus the pender.
/// [`Executor`] is a `#[repr(transparent)]`, `!Sync` wrapper around this.
pub(crate) struct SyncExecutor {
    run_queue: RunQueue,
    pender: Pender,
}
427
impl SyncExecutor {
    /// Create an executor core that notifies work via `pender`.
    pub(crate) fn new(pender: Pender) -> Self {
        Self {
            run_queue: RunQueue::new(),
            pender,
        }
    }

    /// Enqueue a task in the task queue
    ///
    /// # Safety
    /// - `task` must be a valid pointer to a spawned task.
    /// - `task` must be set up to run in this executor.
    /// - `task` must NOT be already enqueued (in this executor or another one).
    #[inline(always)]
    unsafe fn enqueue(&self, task: TaskRef, l: state::Token) {
        #[cfg(feature = "_any_trace")]
        trace::task_ready_begin(self, &task);

        // Pend only when `RunQueue::enqueue` returns true — presumably when the
        // queue transitioned from empty, so the pender fires once per wakeup
        // batch. TODO(review): confirm the boolean's meaning in run_queue.rs.
        if self.run_queue.enqueue(task, l) {
            self.pender.pend();
        }
    }

    pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
        // Record the owning executor before the first enqueue: `wake_task`
        // later reads this pointer with `unwrap_unchecked`.
        task.header()
            .executor
            .store((self as *const Self).cast_mut(), Ordering::Relaxed);

        #[cfg(feature = "_any_trace")]
        trace::task_new(self, &task);

        state::locked(|l| {
            self.enqueue(task, l);
        })
    }

    /// # Safety
    ///
    /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
    pub(crate) unsafe fn poll(&'static self) {
        #[cfg(feature = "_any_trace")]
        trace::poll_start(self);

        self.run_queue.dequeue_all(|p| {
            let task = p.header();

            #[cfg(feature = "_any_trace")]
            trace::task_exec_begin(self, &p);

            // Run the task
            // SAFETY: `poll_fn` is installed in `AvailableTask::initialize_impl`
            // before a task can ever be enqueued, so it is never `None` here
            // (completed tasks get `poll_exited` instead).
            task.poll_fn.get().unwrap_unchecked()(p);

            #[cfg(feature = "_any_trace")]
            trace::task_exec_end(self, &p);
        });

        #[cfg(feature = "_any_trace")]
        trace::executor_idle(self)
    }
}
489
490/// Raw executor.
491///
492/// This is the core of the Embassy executor. It is low-level, requiring manual
493/// handling of wakeups and task polling. If you can, prefer using one of the
494/// [higher level executors](crate::Executor).
495///
496/// The raw executor leaves it up to you to handle wakeups and scheduling:
497///
498/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks
499///   that "want to run").
500/// - You must supply a pender function, as shown below. The executor will call it to notify you
501///   it has work to do. You must arrange for `poll()` to be called as soon as possible.
502/// - Enabling `arch-xx` features will define a pender function for you. This means that you
503///   are limited to using the executors provided to you by the architecture/platform
504///   implementation. If you need a different executor, you must not enable `arch-xx` features.
505///
506/// The pender can be called from *any* context: any thread, any interrupt priority
507/// level, etc. It may be called synchronously from any `Executor` method call as well.
508/// You must deal with this correctly.
509///
510/// In particular, you must NOT call `poll` directly from the pender callback, as this violates
511/// the requirement for `poll` to not be called reentrantly.
512///
513/// The pender function must be exported with the name `__pender` and have the following signature:
514///
515/// ```rust
516/// #[unsafe(export_name = "__pender")]
517/// fn pender(context: *mut ()) {
518///    // schedule `poll()` to be called
519/// }
520/// ```
521///
522/// The `context` argument is a piece of arbitrary data the executor will pass to the pender.
523/// You can set the `context` when calling [`Executor::new()`]. You can use it to, for example,
524/// differentiate between executors, or to pass a pointer to a callback that should be called.
#[repr(transparent)]
pub struct Executor {
    pub(crate) inner: SyncExecutor,

    // A raw pointer is `!Sync`/`!Send`, so this marker keeps the wrapper bound
    // to the thread it was created on.
    _not_sync: PhantomData<*mut ()>,
}
531
impl Executor {
    /// Reborrow a `SyncExecutor` as an `Executor`.
    ///
    /// Sound because `Executor` is `#[repr(transparent)]` over `SyncExecutor`,
    /// so the two references have identical layout.
    pub(crate) unsafe fn wrap(inner: &SyncExecutor) -> &Self {
        mem::transmute(inner)
    }

    /// Create a new executor.
    ///
    /// When the executor has work to do, it will call the pender function and pass `context` to it.
    ///
    /// See [`Executor`] docs for details on the pender.
    pub fn new(context: *mut ()) -> Self {
        Self {
            inner: SyncExecutor::new(Pender(context)),
            _not_sync: PhantomData,
        }
    }

    /// Spawn a task in this executor.
    ///
    /// # Safety
    ///
    /// `task` must be a valid pointer to an initialized but not-already-spawned task.
    ///
    /// It is OK to use `unsafe` to call this from a thread that's not the executor thread.
    /// In this case, the task's Future must be Send. This is because this is effectively
    /// sending the task to the executor thread.
    pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
        self.inner.spawn(task)
    }

    /// Poll all queued tasks in this executor.
    ///
    /// This loops over all tasks that are queued to be polled (i.e. they're
    /// freshly spawned or they've been woken). Other tasks are not polled.
    ///
    /// You must call `poll` after receiving a call to the pender. It is OK
    /// to call `poll` even when not requested by the pender, but it wastes
    /// energy.
    ///
    /// # Safety
    ///
    /// You must NOT call `poll` reentrantly on the same executor.
    ///
    /// In particular, note that `poll` may call the pender synchronously. Therefore, you
    /// must NOT directly call `poll()` from the pender callback. Instead, the callback has to
    /// somehow schedule for `poll()` to be called later, at a time you know for sure there's
    /// no `poll()` already running.
    pub unsafe fn poll(&'static self) {
        self.inner.poll()
    }

    /// Get a spawner that spawns tasks in this executor.
    ///
    /// It is OK to call this method multiple times to obtain multiple
    /// `Spawner`s. You may also copy `Spawner`s.
    pub fn spawner(&'static self) -> super::Spawner {
        super::Spawner::new(self)
    }

    /// Get a unique ID for this Executor.
    // The executor is `'static`, so the address of `inner` is stable and unique.
    pub fn id(&'static self) -> usize {
        &self.inner as *const SyncExecutor as usize
    }
}
596
/// Wake a task by `TaskRef`.
///
/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
pub fn wake_task(task: TaskRef) {
    let header = task.header();
    header.state.run_enqueue(|l| {
        // We have just marked the task as scheduled, so enqueue it.
        unsafe {
            // SAFETY(review): relies on the executor pointer being non-null,
            // i.e. the task has been spawned at least once — confirm no caller
            // can wake a never-spawned task.
            let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked();
            executor.enqueue(task, l);
        }
    });
}
610
/// Wake a task by `TaskRef` without calling pend.
///
/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
pub fn wake_task_no_pend(task: TaskRef) {
    let header = task.header();
    header.state.run_enqueue(|l| {
        // We have just marked the task as scheduled, so enqueue it.
        unsafe {
            // SAFETY(review): same non-null-executor assumption as `wake_task`.
            let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked();
            // Unlike `wake_task`, this enqueues directly on the run queue and
            // never invokes the pender.
            executor.run_queue.enqueue(task, l);
        }
    });
}