embassy_executor/raw/mod.rs
1//! Raw executor.
2//!
//! This module exposes "raw" Executor and Task structs for more low-level control.
4//!
5//! ## WARNING: here be dragons!
6//!
7//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe
8//! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe.
9
// Run queue types (`RunQueue`, `RunQueueItem`) used by the executor below.
mod run_queue;

// Task state backend. Exactly one source file is selected at compile time:
// - `cortex_m` cfg set and 32-bit atomics available: `state_atomics_arm.rs`
// - `cortex_m` cfg not set, 8-bit or 32-bit atomics available: `state_atomics.rs`
// - neither 8-bit nor 32-bit atomics available: `state_critical_section.rs`
#[cfg_attr(all(cortex_m, target_has_atomic = "32"), path = "state_atomics_arm.rs")]
#[cfg_attr(
    all(not(cortex_m), any(target_has_atomic = "8", target_has_atomic = "32")),
    path = "state_atomics.rs"
)]
#[cfg_attr(
    not(any(target_has_atomic = "8", target_has_atomic = "32")),
    path = "state_critical_section.rs"
)]
mod state;

// Tracing hooks; only compiled when the `_any_trace` feature is enabled.
#[cfg(feature = "_any_trace")]
pub mod trace;
pub(crate) mod util;
// With the `turbowakers` feature the waker implementation is taken from `waker_turbo.rs`.
#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")]
mod waker;

// Deadline support; only compiled when the `scheduler-deadline` feature is enabled.
#[cfg(feature = "scheduler-deadline")]
mod deadline;
31
32use core::future::Future;
33use core::marker::PhantomData;
34use core::mem;
35use core::pin::Pin;
36use core::ptr::NonNull;
37#[cfg(not(feature = "platform-avr"))]
38use core::sync::atomic::AtomicPtr;
39use core::sync::atomic::Ordering;
40use core::task::{Context, Poll, Waker};
41
42#[cfg(feature = "scheduler-deadline")]
43pub(crate) use deadline::Deadline;
44use embassy_executor_timer_queue::TimerQueueItem;
45#[cfg(feature = "platform-avr")]
46use portable_atomic::AtomicPtr;
47
48use self::run_queue::{RunQueue, RunQueueItem};
49use self::state::State;
50use self::util::{SyncUnsafeCell, UninitCell};
51pub use self::waker::task_from_waker;
52use self::waker::try_task_from_waker;
53use super::SpawnToken;
54use crate::{Metadata, SpawnError};
55
56#[unsafe(no_mangle)]
57extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static mut TimerQueueItem {
58 unsafe { task_from_waker(waker).timer_queue_item() }
59}
60
61#[unsafe(no_mangle)]
62extern "Rust" fn __try_embassy_time_queue_item_from_waker(waker: &Waker) -> Option<&'static mut TimerQueueItem> {
63 unsafe { try_task_from_waker(waker).map(|task| task.timer_queue_item()) }
64}
65
/// Raw task header for use in task pointers.
///
/// A task can be in one of the following states:
///
/// - Not spawned: the task is ready to spawn.
/// - `SPAWNED`: the task is currently spawned and may be running.
/// - `RUN_ENQUEUED`: the task is enqueued to be polled. Note that the task may be `!SPAWNED`.
///   In this case, the `RUN_ENQUEUED` state will be cleared when the task is next polled, without
///   polling the task's future.
///
/// A task's complete life cycle is as follows:
///
/// ```text
/// ┌────────────┐   ┌────────────────────────┐
/// │Not spawned │◄─5┤Not spawned|Run enqueued│
/// │            ├6─►│                        │
/// └─────┬──────┘   └──────▲─────────────────┘
///       1                 │
///       │    ┌────────────┘
///       │    4
/// ┌─────▼────┴─────────┐
/// │Spawned|Run enqueued│
/// │                    │
/// └─────┬▲─────────────┘
///       2│
///       │3
/// ┌─────▼┴─────┐
/// │  Spawned   │
/// │            │
/// └────────────┘
/// ```
///
/// Transitions:
/// - 1: Task is spawned - `AvailableTask::claim -> Executor::spawn`
/// - 2: During poll - `RunQueue::dequeue_all -> State::run_dequeue`
/// - 3: Task wakes itself, waker wakes task, or task exits - `Waker::wake -> wake_task -> State::run_enqueue`
/// - 4: A run-queued task exits - `TaskStorage::poll -> Poll::Ready`
/// - 5: Task is dequeued. The task's future is not polled, because exiting the task replaces its `poll_fn`.
/// - 6: A task is woken when it is not spawned - `wake_task -> State::run_enqueue`
pub(crate) struct TaskHeader {
    /// Life-cycle flags of the task (see the state diagram above).
    pub(crate) state: State,
    /// Run queue bookkeeping for this task.
    pub(crate) run_queue_item: RunQueueItem,

    /// Executor the task was last spawned on. Null until the first spawn; written by
    /// `SyncExecutor::spawn` and read when the task is woken.
    pub(crate) executor: AtomicPtr<SyncExecutor>,
    /// Function the executor calls to poll the task. `None` in a fresh header, set to
    /// `TaskStorage::<F>::poll` on initialization and to `poll_exited` once the future completes.
    poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,

    /// Integrated timer queue storage. This field should not be accessed outside of the timer queue.
    pub(crate) timer_queue_item: TimerQueueItem,

    /// Per-task metadata; reset each time the storage is initialized for a new spawn.
    pub(crate) metadata: Metadata,

    /// Pointer to another task header, only present with the `rtos-trace` feature.
    // NOTE(review): presumably links all tasks into a list for the trace module — confirm.
    #[cfg(feature = "rtos-trace")]
    all_tasks_next: AtomicPtr<TaskHeader>,
}
120
/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
///
/// Internally it is a single non-null pointer to the task's `TaskHeader`, so it is cheap
/// to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct TaskRef {
    // Points at the header of the task storage this handle refers to.
    ptr: NonNull<TaskHeader>,
}

// `NonNull` is neither `Send` nor `Sync` on its own. A `TaskRef` is treated like a
// `&'static TaskHeader`, so it is `Send`/`Sync` exactly when that reference type is.
unsafe impl Send for TaskRef where &'static TaskHeader: Send {}
unsafe impl Sync for TaskRef where &'static TaskHeader: Sync {}
129
130impl TaskRef {
131 fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self {
132 Self {
133 ptr: NonNull::from(task).cast(),
134 }
135 }
136
137 /// Safety: The pointer must have been obtained with `Task::as_ptr`
138 pub(crate) unsafe fn from_ptr(ptr: *const TaskHeader) -> Self {
139 Self {
140 ptr: NonNull::new_unchecked(ptr as *mut TaskHeader),
141 }
142 }
143
144 pub(crate) fn header(self) -> &'static TaskHeader {
145 unsafe { self.ptr.as_ref() }
146 }
147
148 pub(crate) fn metadata(self) -> &'static Metadata {
149 unsafe { &self.ptr.as_ref().metadata }
150 }
151
152 /// Returns a reference to the executor that the task is currently running on.
153 pub unsafe fn executor(self) -> Option<&'static Executor> {
154 let executor = self.header().executor.load(Ordering::Relaxed);
155 executor.as_ref().map(|e| Executor::wrap(e))
156 }
157
158 /// Returns a mutable reference to the timer queue item.
159 ///
160 /// Safety
161 ///
162 /// This function must only be called in the context of the integrated timer queue.
163 pub unsafe fn timer_queue_item(mut self) -> &'static mut TimerQueueItem {
164 unsafe { &mut self.ptr.as_mut().timer_queue_item }
165 }
166
167 /// The returned pointer is valid for the entire TaskStorage.
168 pub(crate) fn as_ptr(self) -> *const TaskHeader {
169 self.ptr.as_ptr()
170 }
171
172 /// Returns the task ID.
173 /// This can be used in combination with rtos-trace to match task names with IDs
174 pub fn id(&self) -> u32 {
175 self.as_ptr() as u32
176 }
177}
178
/// Raw storage in which a task can be spawned.
///
/// This struct holds the necessary memory to spawn one task whose future is `F`.
/// At a given time, the `TaskStorage` may be in spawned or not-spawned state. You
/// may spawn it with [`TaskStorage::spawn()`], which will fail if it is already spawned.
///
/// A `TaskStorage` must live forever, it may not be deallocated even after the task has finished
/// running. Hence the relevant methods require `&'static self`. It may be reused, however.
///
/// Internally, the [embassy_executor::task](embassy_executor_macros::task) macro allocates an array of `TaskStorage`s
/// in a `static`. The most common reason to use the raw `TaskStorage` is to have control of where
/// the memory for the task is allocated: on the stack, or on the heap with e.g. `Box::leak`, etc.
//
// repr(C) is needed to guarantee that the `TaskHeader` is located at offset 0.
// This makes it safe to cast between `TaskHeader` and `TaskStorage` pointers.
#[repr(C)]
pub struct TaskStorage<F: Future + 'static> {
    // Type-erased header. Must remain the first field (see the `repr(C)` note above).
    raw: TaskHeader,
    future: UninitCell<F>, // Valid if STATE_SPAWNED
}
199
200unsafe fn poll_exited(_p: TaskRef) {
201 // Nothing to do, the task is already !SPAWNED and dequeued.
202}
203
204impl<F: Future + 'static> TaskStorage<F> {
205 const NEW: Self = Self::new();
206
207 /// Create a new TaskStorage, in not-spawned state.
208 pub const fn new() -> Self {
209 Self {
210 raw: TaskHeader {
211 state: State::new(),
212 run_queue_item: RunQueueItem::new(),
213 executor: AtomicPtr::new(core::ptr::null_mut()),
214 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
215 poll_fn: SyncUnsafeCell::new(None),
216
217 timer_queue_item: TimerQueueItem::new(),
218 metadata: Metadata::new(),
219 #[cfg(feature = "rtos-trace")]
220 all_tasks_next: AtomicPtr::new(core::ptr::null_mut()),
221 },
222 future: UninitCell::uninit(),
223 }
224 }
225
226 /// Try to spawn the task.
227 ///
228 /// The `future` closure constructs the future. It's only called if spawning is
229 /// actually possible. It is a closure instead of a simple `future: F` param to ensure
230 /// the future is constructed in-place, avoiding a temporary copy in the stack thanks to
231 /// NRVO optimizations.
232 ///
233 /// This function will fail if the task is already spawned and has not finished running.
234 /// In this case, the error is delayed: a "poisoned" SpawnToken is returned, which will
235 /// cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
236 ///
237 /// Once the task has finished running, you may spawn it again. It is allowed to spawn it
238 /// on a different executor.
239 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> Result<SpawnToken<impl Sized>, SpawnError> {
240 let task = AvailableTask::claim(self);
241 match task {
242 Some(task) => Ok(task.initialize(future)),
243 None => Err(SpawnError::Busy),
244 }
245 }
246
247 unsafe fn poll(p: TaskRef) {
248 let this = &*p.as_ptr().cast::<TaskStorage<F>>();
249
250 let future = Pin::new_unchecked(this.future.as_mut());
251 let waker = waker::from_task(p);
252 let mut cx = Context::from_waker(&waker);
253 match future.poll(&mut cx) {
254 Poll::Ready(_) => {
255 #[cfg(feature = "_any_trace")]
256 let exec_ptr: *const SyncExecutor = this.raw.executor.load(Ordering::Relaxed);
257
258 // As the future has finished and this function will not be called
259 // again, we can safely drop the future here.
260 this.future.drop_in_place();
261
262 // We replace the poll_fn with a despawn function, so that the task is cleaned up
263 // when the executor polls it next.
264 this.raw.poll_fn.set(Some(poll_exited));
265
266 // Make sure we despawn last, so that other threads can only spawn the task
267 // after we're done with it.
268 this.raw.state.despawn();
269
270 #[cfg(feature = "_any_trace")]
271 trace::task_end(exec_ptr, &p);
272 }
273 Poll::Pending => {}
274 }
275
276 // the compiler is emitting a virtual call for waker drop, but we know
277 // it's a noop for our waker.
278 mem::forget(waker);
279 }
280
281 #[doc(hidden)]
282 #[allow(dead_code)]
283 fn _assert_sync(self) {
284 fn assert_sync<T: Sync>(_: T) {}
285
286 assert_sync(self)
287 }
288}
289
/// An uninitialized [`TaskStorage`].
///
/// Obtained from [`AvailableTask::claim`]; the storage has been claimed for spawning but
/// its future has not been written yet.
pub struct AvailableTask<F: Future + 'static> {
    // The claimed storage.
    task: &'static TaskStorage<F>,
}
294
295impl<F: Future + 'static> AvailableTask<F> {
296 /// Try to claim a [`TaskStorage`].
297 ///
298 /// This function returns `None` if a task has already been spawned and has not finished running.
299 pub fn claim(task: &'static TaskStorage<F>) -> Option<Self> {
300 task.raw.state.spawn().then(|| Self { task })
301 }
302
303 fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> {
304 unsafe {
305 self.task.raw.metadata.reset();
306 self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
307 self.task.future.write_in_place(future);
308
309 let task = TaskRef::new(self.task);
310
311 SpawnToken::new(task)
312 }
313 }
314
315 /// Initialize the [`TaskStorage`] to run the given future.
316 pub fn initialize(self, future: impl FnOnce() -> F) -> SpawnToken<F> {
317 self.initialize_impl::<F>(future)
318 }
319
320 /// Initialize the [`TaskStorage`] to run the given future.
321 ///
322 /// # Safety
323 ///
324 /// `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
325 /// is an `async fn`, NOT a hand-written `Future`.
326 #[doc(hidden)]
327 pub unsafe fn __initialize_async_fn<FutFn>(self, future: impl FnOnce() -> F) -> SpawnToken<FutFn> {
328 // When send-spawning a task, we construct the future in this thread, and effectively
329 // "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory,
330 // send-spawning should require the future `F` to be `Send`.
331 //
332 // The problem is this is more restrictive than needed. Once the future is executing,
333 // it is never sent to another thread. It is only sent when spawning. It should be
334 // enough for the task's arguments to be Send. (and in practice it's super easy to
335 // accidentally make your futures !Send, for example by holding an `Rc` or a `&RefCell` across an `.await`.)
336 //
337 // We can do it by sending the task args and constructing the future in the executor thread
338 // on first poll. However, this cannot be done in-place, so it'll waste stack space for a copy
339 // of the args.
340 //
341 // Luckily, an `async fn` future contains just the args when freshly constructed. So, if the
342 // args are Send, it's OK to send a !Send future, as long as we do it before first polling it.
343 //
344 // (Note: this is how the generators are implemented today, it's not officially guaranteed yet,
345 // but it's possible it'll be guaranteed in the future. See zulip thread:
346 // https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.22only.20before.20poll.22.20Send.20futures )
347 //
348 // The `FutFn` captures all the args, so if it's Send, the task can be send-spawned.
349 // This is why we return `SpawnToken<FutFn>` below.
350 //
351 // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly
352 // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`.
353 self.initialize_impl::<FutFn>(future)
354 }
355}
356
/// Raw storage that can hold up to N tasks of the same type.
///
/// This is essentially a `[TaskStorage<F>; N]`.
pub struct TaskPool<F: Future + 'static, const N: usize> {
    // One storage slot per concurrently spawned instance of the task.
    pool: [TaskStorage<F>; N],
}
363
364impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
365 /// Create a new TaskPool, with all tasks in non-spawned state.
366 pub const fn new() -> Self {
367 Self {
368 pool: [TaskStorage::NEW; N],
369 }
370 }
371
372 fn spawn_impl<T>(&'static self, future: impl FnOnce() -> F) -> Result<SpawnToken<T>, SpawnError> {
373 match self.pool.iter().find_map(AvailableTask::claim) {
374 Some(task) => Ok(task.initialize_impl::<T>(future)),
375 None => Err(SpawnError::Busy),
376 }
377 }
378
379 /// Try to spawn a task in the pool.
380 ///
381 /// See [`TaskStorage::spawn()`] for details.
382 ///
383 /// This will loop over the pool and spawn the task in the first storage that
384 /// is currently free. If none is free, a "poisoned" SpawnToken is returned,
385 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
386 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> Result<SpawnToken<impl Sized>, SpawnError> {
387 self.spawn_impl::<F>(future)
388 }
389
390 /// Like spawn(), but allows the task to be send-spawned if the args are Send even if
391 /// the future is !Send.
392 ///
393 /// Not covered by semver guarantees. DO NOT call this directly. Intended to be used
394 /// by the Embassy macros ONLY.
395 ///
396 /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
397 /// is an `async fn`, NOT a hand-written `Future`.
398 #[doc(hidden)]
399 pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> Result<SpawnToken<impl Sized>, SpawnError>
400 where
401 FutFn: FnOnce() -> F,
402 {
403 // See the comment in AvailableTask::__initialize_async_fn for explanation.
404 self.spawn_impl::<FutFn>(future)
405 }
406}
407
/// Opaque context pointer that is handed to the `__pender` callback.
#[derive(Clone, Copy)]
pub(crate) struct Pender(*mut ());

// A raw pointer is neither `Send` nor `Sync` by default. In this file the pointer is never
// dereferenced; it is only passed through to `__pender`.
unsafe impl Send for Pender {}
unsafe impl Sync for Pender {}
413
414impl Pender {
415 pub(crate) fn pend(self) {
416 unsafe extern "Rust" {
417 fn __pender(context: *mut ());
418 }
419 unsafe { __pender(self.0) };
420 }
421}
422
/// Executor core that is wrapped by [`Executor`].
///
/// It owns the run queue and the pender used to request a call to `poll`.
pub(crate) struct SyncExecutor {
    // Tasks waiting to be polled.
    run_queue: RunQueue,
    // Called when `RunQueue::enqueue` reports that a poll needs to be requested.
    pender: Pender,
}
427
428impl SyncExecutor {
429 pub(crate) fn new(pender: Pender) -> Self {
430 Self {
431 run_queue: RunQueue::new(),
432 pender,
433 }
434 }
435
436 /// Enqueue a task in the task queue
437 ///
438 /// # Safety
439 /// - `task` must be a valid pointer to a spawned task.
440 /// - `task` must be set up to run in this executor.
441 /// - `task` must NOT be already enqueued (in this executor or another one).
442 #[inline(always)]
443 unsafe fn enqueue(&self, task: TaskRef, l: state::Token) {
444 #[cfg(feature = "_any_trace")]
445 trace::task_ready_begin(self, &task);
446
447 if self.run_queue.enqueue(task, l) {
448 self.pender.pend();
449 }
450 }
451
452 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
453 task.header()
454 .executor
455 .store((self as *const Self).cast_mut(), Ordering::Relaxed);
456
457 #[cfg(feature = "_any_trace")]
458 trace::task_new(self, &task);
459
460 state::locked(|l| {
461 self.enqueue(task, l);
462 })
463 }
464
465 /// # Safety
466 ///
467 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
468 pub(crate) unsafe fn poll(&'static self) {
469 #[cfg(feature = "_any_trace")]
470 trace::poll_start(self);
471
472 self.run_queue.dequeue_all(|p| {
473 let task = p.header();
474
475 #[cfg(feature = "_any_trace")]
476 trace::task_exec_begin(self, &p);
477
478 // Run the task
479 task.poll_fn.get().unwrap_unchecked()(p);
480
481 #[cfg(feature = "_any_trace")]
482 trace::task_exec_end(self, &p);
483 });
484
485 #[cfg(feature = "_any_trace")]
486 trace::executor_idle(self)
487 }
488}
489
/// Raw executor.
///
/// This is the core of the Embassy executor. It is low-level, requiring manual
/// handling of wakeups and task polling. If you can, prefer using one of the
/// [higher level executors](crate::Executor).
///
/// The raw executor leaves it up to you to handle wakeups and scheduling:
///
/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks
///   that "want to run").
/// - You must supply a pender function, as shown below. The executor will call it to notify you
///   it has work to do. You must arrange for `poll()` to be called as soon as possible.
/// - Enabling `arch-xx` features will define a pender function for you. This means that you
///   are limited to using the executors provided to you by the architecture/platform
///   implementation. If you need a different executor, you must not enable `arch-xx` features.
///
/// The pender can be called from *any* context: any thread, any interrupt priority
/// level, etc. It may be called synchronously from any `Executor` method call as well.
/// You must deal with this correctly.
///
/// In particular, you must NOT call `poll` directly from the pender callback, as this violates
/// the requirement for `poll` to not be called reentrantly.
///
/// The pender function must be exported with the name `__pender` and have the following signature:
///
/// ```rust
/// #[unsafe(export_name = "__pender")]
/// fn pender(context: *mut ()) {
///    // schedule `poll()` to be called
/// }
/// ```
///
/// The `context` argument is a piece of arbitrary data the executor will pass to the pender.
/// You can set the `context` when calling [`Executor::new()`]. You can use it to, for example,
/// differentiate between executors, or to pass a pointer to a callback that should be called.
// `repr(transparent)` is what `Executor::wrap` relies on to reinterpret a `&SyncExecutor`
// as a `&Executor`.
#[repr(transparent)]
pub struct Executor {
    pub(crate) inner: SyncExecutor,

    // Zero-sized raw-pointer marker: opts `Executor` out of the `Send`/`Sync` auto traits.
    _not_sync: PhantomData<*mut ()>,
}
531
532impl Executor {
533 pub(crate) unsafe fn wrap(inner: &SyncExecutor) -> &Self {
534 mem::transmute(inner)
535 }
536
537 /// Create a new executor.
538 ///
539 /// When the executor has work to do, it will call the pender function and pass `context` to it.
540 ///
541 /// See [`Executor`] docs for details on the pender.
542 pub fn new(context: *mut ()) -> Self {
543 Self {
544 inner: SyncExecutor::new(Pender(context)),
545 _not_sync: PhantomData,
546 }
547 }
548
549 /// Spawn a task in this executor.
550 ///
551 /// # Safety
552 ///
553 /// `task` must be a valid pointer to an initialized but not-already-spawned task.
554 ///
555 /// It is OK to use `unsafe` to call this from a thread that's not the executor thread.
556 /// In this case, the task's Future must be Send. This is because this is effectively
557 /// sending the task to the executor thread.
558 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
559 self.inner.spawn(task)
560 }
561
562 /// Poll all queued tasks in this executor.
563 ///
564 /// This loops over all tasks that are queued to be polled (i.e. they're
565 /// freshly spawned or they've been woken). Other tasks are not polled.
566 ///
567 /// You must call `poll` after receiving a call to the pender. It is OK
568 /// to call `poll` even when not requested by the pender, but it wastes
569 /// energy.
570 ///
571 /// # Safety
572 ///
573 /// You must NOT call `poll` reentrantly on the same executor.
574 ///
575 /// In particular, note that `poll` may call the pender synchronously. Therefore, you
576 /// must NOT directly call `poll()` from the pender callback. Instead, the callback has to
577 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's
578 /// no `poll()` already running.
579 pub unsafe fn poll(&'static self) {
580 self.inner.poll()
581 }
582
583 /// Get a spawner that spawns tasks in this executor.
584 ///
585 /// It is OK to call this method multiple times to obtain multiple
586 /// `Spawner`s. You may also copy `Spawner`s.
587 pub fn spawner(&'static self) -> super::Spawner {
588 super::Spawner::new(self)
589 }
590
591 /// Get a unique ID for this Executor.
592 pub fn id(&'static self) -> usize {
593 &self.inner as *const SyncExecutor as usize
594 }
595}
596
597/// Wake a task by `TaskRef`.
598///
599/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
600pub fn wake_task(task: TaskRef) {
601 let header = task.header();
602 header.state.run_enqueue(|l| {
603 // We have just marked the task as scheduled, so enqueue it.
604 unsafe {
605 let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked();
606 executor.enqueue(task, l);
607 }
608 });
609}
610
611/// Wake a task by `TaskRef` without calling pend.
612///
613/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
614pub fn wake_task_no_pend(task: TaskRef) {
615 let header = task.header();
616 header.state.run_enqueue(|l| {
617 // We have just marked the task as scheduled, so enqueue it.
618 unsafe {
619 let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked();
620 executor.run_queue.enqueue(task, l);
621 }
622 });
623}