embassy_executor/raw/mod.rs
1//! Raw executor.
2//!
3//! This module exposes "raw" Executor and Task structs for more low level control.
4//!
5//! ## WARNING: here be dragons!
6//!
7//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe
8//! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe.
9
10#[cfg_attr(target_has_atomic = "ptr", path = "run_queue_atomics.rs")]
11#[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")]
12mod run_queue;
13
14#[cfg_attr(all(cortex_m, target_has_atomic = "8"), path = "state_atomics_arm.rs")]
15#[cfg_attr(all(not(cortex_m), target_has_atomic = "8"), path = "state_atomics.rs")]
16#[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")]
17mod state;
18
19pub mod timer_queue;
20#[cfg(feature = "trace")]
21mod trace;
22pub(crate) mod util;
23#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")]
24mod waker;
25
26use core::future::Future;
27use core::marker::PhantomData;
28use core::mem;
29use core::pin::Pin;
30use core::ptr::NonNull;
31use core::sync::atomic::{AtomicPtr, Ordering};
32use core::task::{Context, Poll};
33
34use self::run_queue::{RunQueue, RunQueueItem};
35use self::state::State;
36use self::util::{SyncUnsafeCell, UninitCell};
37pub use self::waker::task_from_waker;
38use super::SpawnToken;
39
40/// Raw task header for use in task pointers.
41///
42/// A task can be in one of the following states:
43///
44/// - Not spawned: the task is ready to spawn.
45/// - `SPAWNED`: the task is currently spawned and may be running.
46/// - `RUN_ENQUEUED`: the task is enqueued to be polled. Note that the task may be `!SPAWNED`.
47/// In this case, the `RUN_ENQUEUED` state will be cleared when the task is next polled, without
48/// polling the task's future.
49///
50/// A task's complete life cycle is as follows:
51///
52/// ```text
53/// ┌────────────┐ ┌────────────────────────┐
54/// │Not spawned │◄─5┤Not spawned|Run enqueued│
55/// │ ├6─►│ │
56/// └─────┬──────┘ └──────▲─────────────────┘
57/// 1 │
58/// │ ┌────────────┘
59/// │ 4
60/// ┌─────▼────┴─────────┐
61/// │Spawned|Run enqueued│
62/// │ │
63/// └─────┬▲─────────────┘
64/// 2│
65/// │3
66/// ┌─────▼┴─────┐
67/// │ Spawned │
68/// │ │
69/// └────────────┘
70/// ```
71///
72/// Transitions:
73/// - 1: Task is spawned - `AvailableTask::claim -> Executor::spawn`
74/// - 2: During poll - `RunQueue::dequeue_all -> State::run_dequeue`
75/// - 3: Task wakes itself, waker wakes task, or task exits - `Waker::wake -> wake_task -> State::run_enqueue`
76/// - 4: A run-queued task exits - `TaskStorage::poll -> Poll::Ready`
77/// - 5: Task is dequeued. The task's future is not polled, because exiting the task replaces its `poll_fn`.
78/// - 6: A task is waken when it is not spawned - `wake_task -> State::run_enqueue`
79pub(crate) struct TaskHeader {
80 pub(crate) state: State,
81 pub(crate) run_queue_item: RunQueueItem,
82 pub(crate) executor: AtomicPtr<SyncExecutor>,
83 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
84
85 /// Integrated timer queue storage. This field should not be accessed outside of the timer queue.
86 pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
87}
88
89/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
90#[derive(Clone, Copy, PartialEq)]
91pub struct TaskRef {
92 ptr: NonNull<TaskHeader>,
93}
94
95unsafe impl Send for TaskRef where &'static TaskHeader: Send {}
96unsafe impl Sync for TaskRef where &'static TaskHeader: Sync {}
97
98impl TaskRef {
99 fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self {
100 Self {
101 ptr: NonNull::from(task).cast(),
102 }
103 }
104
105 /// Safety: The pointer must have been obtained with `Task::as_ptr`
106 pub(crate) unsafe fn from_ptr(ptr: *const TaskHeader) -> Self {
107 Self {
108 ptr: NonNull::new_unchecked(ptr as *mut TaskHeader),
109 }
110 }
111
112 /// # Safety
113 ///
114 /// The result of this function must only be compared
115 /// for equality, or stored, but not used.
116 pub const unsafe fn dangling() -> Self {
117 Self {
118 ptr: NonNull::dangling(),
119 }
120 }
121
122 pub(crate) fn header(self) -> &'static TaskHeader {
123 unsafe { self.ptr.as_ref() }
124 }
125
126 /// Returns a reference to the executor that the task is currently running on.
127 pub unsafe fn executor(self) -> Option<&'static Executor> {
128 let executor = self.header().executor.load(Ordering::Relaxed);
129 executor.as_ref().map(|e| Executor::wrap(e))
130 }
131
132 /// Returns a reference to the timer queue item.
133 pub fn timer_queue_item(&self) -> &'static timer_queue::TimerQueueItem {
134 &self.header().timer_queue_item
135 }
136
137 /// The returned pointer is valid for the entire TaskStorage.
138 pub(crate) fn as_ptr(self) -> *const TaskHeader {
139 self.ptr.as_ptr()
140 }
141}
142
143/// Raw storage in which a task can be spawned.
144///
145/// This struct holds the necessary memory to spawn one task whose future is `F`.
146/// At a given time, the `TaskStorage` may be in spawned or not-spawned state. You
147/// may spawn it with [`TaskStorage::spawn()`], which will fail if it is already spawned.
148///
149/// A `TaskStorage` must live forever, it may not be deallocated even after the task has finished
150/// running. Hence the relevant methods require `&'static self`. It may be reused, however.
151///
152/// Internally, the [embassy_executor::task](embassy_executor_macros::task) macro allocates an array of `TaskStorage`s
153/// in a `static`. The most common reason to use the raw `Task` is to have control of where
154/// the memory for the task is allocated: on the stack, or on the heap with e.g. `Box::leak`, etc.
155
156// repr(C) is needed to guarantee that the Task is located at offset 0
157// This makes it safe to cast between TaskHeader and TaskStorage pointers.
158#[repr(C)]
159pub struct TaskStorage<F: Future + 'static> {
160 raw: TaskHeader,
161 future: UninitCell<F>, // Valid if STATE_SPAWNED
162}
163
164unsafe fn poll_exited(_p: TaskRef) {
165 // Nothing to do, the task is already !SPAWNED and dequeued.
166}
167
168impl<F: Future + 'static> TaskStorage<F> {
169 const NEW: Self = Self::new();
170
171 /// Create a new TaskStorage, in not-spawned state.
172 pub const fn new() -> Self {
173 Self {
174 raw: TaskHeader {
175 state: State::new(),
176 run_queue_item: RunQueueItem::new(),
177 executor: AtomicPtr::new(core::ptr::null_mut()),
178 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
179 poll_fn: SyncUnsafeCell::new(None),
180
181 timer_queue_item: timer_queue::TimerQueueItem::new(),
182 },
183 future: UninitCell::uninit(),
184 }
185 }
186
187 /// Try to spawn the task.
188 ///
189 /// The `future` closure constructs the future. It's only called if spawning is
190 /// actually possible. It is a closure instead of a simple `future: F` param to ensure
191 /// the future is constructed in-place, avoiding a temporary copy in the stack thanks to
192 /// NRVO optimizations.
193 ///
194 /// This function will fail if the task is already spawned and has not finished running.
195 /// In this case, the error is delayed: a "poisoned" SpawnToken is returned, which will
196 /// cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
197 ///
198 /// Once the task has finished running, you may spawn it again. It is allowed to spawn it
199 /// on a different executor.
200 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
201 let task = AvailableTask::claim(self);
202 match task {
203 Some(task) => task.initialize(future),
204 None => SpawnToken::new_failed(),
205 }
206 }
207
208 unsafe fn poll(p: TaskRef) {
209 let this = &*p.as_ptr().cast::<TaskStorage<F>>();
210
211 let future = Pin::new_unchecked(this.future.as_mut());
212 let waker = waker::from_task(p);
213 let mut cx = Context::from_waker(&waker);
214 match future.poll(&mut cx) {
215 Poll::Ready(_) => {
216 // As the future has finished and this function will not be called
217 // again, we can safely drop the future here.
218 this.future.drop_in_place();
219
220 // We replace the poll_fn with a despawn function, so that the task is cleaned up
221 // when the executor polls it next.
222 this.raw.poll_fn.set(Some(poll_exited));
223
224 // Make sure we despawn last, so that other threads can only spawn the task
225 // after we're done with it.
226 this.raw.state.despawn();
227 }
228 Poll::Pending => {}
229 }
230
231 // the compiler is emitting a virtual call for waker drop, but we know
232 // it's a noop for our waker.
233 mem::forget(waker);
234 }
235
236 #[doc(hidden)]
237 #[allow(dead_code)]
238 fn _assert_sync(self) {
239 fn assert_sync<T: Sync>(_: T) {}
240
241 assert_sync(self)
242 }
243}
244
245/// An uninitialized [`TaskStorage`].
246pub struct AvailableTask<F: Future + 'static> {
247 task: &'static TaskStorage<F>,
248}
249
250impl<F: Future + 'static> AvailableTask<F> {
251 /// Try to claim a [`TaskStorage`].
252 ///
253 /// This function returns `None` if a task has already been spawned and has not finished running.
254 pub fn claim(task: &'static TaskStorage<F>) -> Option<Self> {
255 task.raw.state.spawn().then(|| Self { task })
256 }
257
258 fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> {
259 unsafe {
260 self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
261 self.task.future.write_in_place(future);
262
263 let task = TaskRef::new(self.task);
264
265 SpawnToken::new(task)
266 }
267 }
268
269 /// Initialize the [`TaskStorage`] to run the given future.
270 pub fn initialize(self, future: impl FnOnce() -> F) -> SpawnToken<F> {
271 self.initialize_impl::<F>(future)
272 }
273
274 /// Initialize the [`TaskStorage`] to run the given future.
275 ///
276 /// # Safety
277 ///
278 /// `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
279 /// is an `async fn`, NOT a hand-written `Future`.
280 #[doc(hidden)]
281 pub unsafe fn __initialize_async_fn<FutFn>(self, future: impl FnOnce() -> F) -> SpawnToken<FutFn> {
282 // When send-spawning a task, we construct the future in this thread, and effectively
283 // "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory,
284 // send-spawning should require the future `F` to be `Send`.
285 //
286 // The problem is this is more restrictive than needed. Once the future is executing,
287 // it is never sent to another thread. It is only sent when spawning. It should be
288 // enough for the task's arguments to be Send. (and in practice it's super easy to
289 // accidentally make your futures !Send, for example by holding an `Rc` or a `&RefCell` across an `.await`.)
290 //
291 // We can do it by sending the task args and constructing the future in the executor thread
292 // on first poll. However, this cannot be done in-place, so it'll waste stack space for a copy
293 // of the args.
294 //
295 // Luckily, an `async fn` future contains just the args when freshly constructed. So, if the
296 // args are Send, it's OK to send a !Send future, as long as we do it before first polling it.
297 //
298 // (Note: this is how the generators are implemented today, it's not officially guaranteed yet,
299 // but it's possible it'll be guaranteed in the future. See zulip thread:
300 // https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.22only.20before.20poll.22.20Send.20futures )
301 //
302 // The `FutFn` captures all the args, so if it's Send, the task can be send-spawned.
303 // This is why we return `SpawnToken<FutFn>` below.
304 //
305 // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly
306 // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`.
307 self.initialize_impl::<FutFn>(future)
308 }
309}
310
311/// Raw storage that can hold up to N tasks of the same type.
312///
313/// This is essentially a `[TaskStorage<F>; N]`.
314pub struct TaskPool<F: Future + 'static, const N: usize> {
315 pool: [TaskStorage<F>; N],
316}
317
318impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
319 /// Create a new TaskPool, with all tasks in non-spawned state.
320 pub const fn new() -> Self {
321 Self {
322 pool: [TaskStorage::NEW; N],
323 }
324 }
325
326 fn spawn_impl<T>(&'static self, future: impl FnOnce() -> F) -> SpawnToken<T> {
327 match self.pool.iter().find_map(AvailableTask::claim) {
328 Some(task) => task.initialize_impl::<T>(future),
329 None => SpawnToken::new_failed(),
330 }
331 }
332
333 /// Try to spawn a task in the pool.
334 ///
335 /// See [`TaskStorage::spawn()`] for details.
336 ///
337 /// This will loop over the pool and spawn the task in the first storage that
338 /// is currently free. If none is free, a "poisoned" SpawnToken is returned,
339 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
340 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
341 self.spawn_impl::<F>(future)
342 }
343
344 /// Like spawn(), but allows the task to be send-spawned if the args are Send even if
345 /// the future is !Send.
346 ///
347 /// Not covered by semver guarantees. DO NOT call this directly. Intended to be used
348 /// by the Embassy macros ONLY.
349 ///
350 /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
351 /// is an `async fn`, NOT a hand-written `Future`.
352 #[doc(hidden)]
353 pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> SpawnToken<impl Sized>
354 where
355 FutFn: FnOnce() -> F,
356 {
357 // See the comment in AvailableTask::__initialize_async_fn for explanation.
358 self.spawn_impl::<FutFn>(future)
359 }
360}
361
362#[derive(Clone, Copy)]
363pub(crate) struct Pender(*mut ());
364
365unsafe impl Send for Pender {}
366unsafe impl Sync for Pender {}
367
368impl Pender {
369 pub(crate) fn pend(self) {
370 extern "Rust" {
371 fn __pender(context: *mut ());
372 }
373 unsafe { __pender(self.0) };
374 }
375}
376
377pub(crate) struct SyncExecutor {
378 run_queue: RunQueue,
379 pender: Pender,
380}
381
382impl SyncExecutor {
383 pub(crate) fn new(pender: Pender) -> Self {
384 Self {
385 run_queue: RunQueue::new(),
386 pender,
387 }
388 }
389
390 /// Enqueue a task in the task queue
391 ///
392 /// # Safety
393 /// - `task` must be a valid pointer to a spawned task.
394 /// - `task` must be set up to run in this executor.
395 /// - `task` must NOT be already enqueued (in this executor or another one).
396 #[inline(always)]
397 unsafe fn enqueue(&self, task: TaskRef, l: state::Token) {
398 #[cfg(feature = "trace")]
399 trace::task_ready_begin(self, &task);
400
401 if self.run_queue.enqueue(task, l) {
402 self.pender.pend();
403 }
404 }
405
406 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
407 task.header()
408 .executor
409 .store((self as *const Self).cast_mut(), Ordering::Relaxed);
410
411 #[cfg(feature = "trace")]
412 trace::task_new(self, &task);
413
414 state::locked(|l| {
415 self.enqueue(task, l);
416 })
417 }
418
419 /// # Safety
420 ///
421 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
422 pub(crate) unsafe fn poll(&'static self) {
423 self.run_queue.dequeue_all(|p| {
424 let task = p.header();
425
426 #[cfg(feature = "trace")]
427 trace::task_exec_begin(self, &p);
428
429 // Run the task
430 task.poll_fn.get().unwrap_unchecked()(p);
431
432 #[cfg(feature = "trace")]
433 trace::task_exec_end(self, &p);
434 });
435
436 #[cfg(feature = "trace")]
437 trace::executor_idle(self)
438 }
439}
440
441/// Raw executor.
442///
443/// This is the core of the Embassy executor. It is low-level, requiring manual
444/// handling of wakeups and task polling. If you can, prefer using one of the
445/// [higher level executors](crate::Executor).
446///
447/// The raw executor leaves it up to you to handle wakeups and scheduling:
448///
449/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks
450/// that "want to run").
451/// - You must supply a pender function, as shown below. The executor will call it to notify you
452/// it has work to do. You must arrange for `poll()` to be called as soon as possible.
453/// - Enabling `arch-xx` features will define a pender function for you. This means that you
454/// are limited to using the executors provided to you by the architecture/platform
455/// implementation. If you need a different executor, you must not enable `arch-xx` features.
456///
457/// The pender can be called from *any* context: any thread, any interrupt priority
458/// level, etc. It may be called synchronously from any `Executor` method call as well.
459/// You must deal with this correctly.
460///
461/// In particular, you must NOT call `poll` directly from the pender callback, as this violates
462/// the requirement for `poll` to not be called reentrantly.
463///
464/// The pender function must be exported with the name `__pender` and have the following signature:
465///
466/// ```rust
467/// #[export_name = "__pender"]
468/// fn pender(context: *mut ()) {
469/// // schedule `poll()` to be called
470/// }
471/// ```
472///
473/// The `context` argument is a piece of arbitrary data the executor will pass to the pender.
474/// You can set the `context` when calling [`Executor::new()`]. You can use it to, for example,
475/// differentiate between executors, or to pass a pointer to a callback that should be called.
476#[repr(transparent)]
477pub struct Executor {
478 pub(crate) inner: SyncExecutor,
479
480 _not_sync: PhantomData<*mut ()>,
481}
482
483impl Executor {
484 pub(crate) unsafe fn wrap(inner: &SyncExecutor) -> &Self {
485 mem::transmute(inner)
486 }
487
488 /// Create a new executor.
489 ///
490 /// When the executor has work to do, it will call the pender function and pass `context` to it.
491 ///
492 /// See [`Executor`] docs for details on the pender.
493 pub fn new(context: *mut ()) -> Self {
494 Self {
495 inner: SyncExecutor::new(Pender(context)),
496 _not_sync: PhantomData,
497 }
498 }
499
500 /// Spawn a task in this executor.
501 ///
502 /// # Safety
503 ///
504 /// `task` must be a valid pointer to an initialized but not-already-spawned task.
505 ///
506 /// It is OK to use `unsafe` to call this from a thread that's not the executor thread.
507 /// In this case, the task's Future must be Send. This is because this is effectively
508 /// sending the task to the executor thread.
509 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
510 self.inner.spawn(task)
511 }
512
513 /// Poll all queued tasks in this executor.
514 ///
515 /// This loops over all tasks that are queued to be polled (i.e. they're
516 /// freshly spawned or they've been woken). Other tasks are not polled.
517 ///
518 /// You must call `poll` after receiving a call to the pender. It is OK
519 /// to call `poll` even when not requested by the pender, but it wastes
520 /// energy.
521 ///
522 /// # Safety
523 ///
524 /// You must call `initialize` before calling this method.
525 ///
526 /// You must NOT call `poll` reentrantly on the same executor.
527 ///
528 /// In particular, note that `poll` may call the pender synchronously. Therefore, you
529 /// must NOT directly call `poll()` from the pender callback. Instead, the callback has to
530 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's
531 /// no `poll()` already running.
532 pub unsafe fn poll(&'static self) {
533 self.inner.poll()
534 }
535
536 /// Get a spawner that spawns tasks in this executor.
537 ///
538 /// It is OK to call this method multiple times to obtain multiple
539 /// `Spawner`s. You may also copy `Spawner`s.
540 pub fn spawner(&'static self) -> super::Spawner {
541 super::Spawner::new(self)
542 }
543}
544
545/// Wake a task by `TaskRef`.
546///
547/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
548pub fn wake_task(task: TaskRef) {
549 let header = task.header();
550 header.state.run_enqueue(|l| {
551 // We have just marked the task as scheduled, so enqueue it.
552 unsafe {
553 let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked();
554 executor.enqueue(task, l);
555 }
556 });
557}
558
559/// Wake a task by `TaskRef` without calling pend.
560///
561/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
562pub fn wake_task_no_pend(task: TaskRef) {
563 let header = task.header();
564 header.state.run_enqueue(|l| {
565 // We have just marked the task as scheduled, so enqueue it.
566 unsafe {
567 let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked();
568 executor.run_queue.enqueue(task, l);
569 }
570 });
571}