mod run_queue;
#[cfg_attr(all(cortex_m, target_has_atomic = "32"), path = "state_atomics_arm.rs")]
#[cfg_attr(
all(not(cortex_m), any(target_has_atomic = "8", target_has_atomic = "32")),
path = "state_atomics.rs"
)]
#[cfg_attr(
not(any(target_has_atomic = "8", target_has_atomic = "32")),
path = "state_critical_section.rs"
)]
mod state;
#[cfg(feature = "_any_trace")]
pub mod trace;
pub(crate) mod util;
#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")]
mod waker;
#[cfg(feature = "scheduler-deadline")]
mod deadline;
use core::future::Future;
use core::marker::PhantomData;
use core::mem;
use core::pin::Pin;
use core::ptr::NonNull;
#[cfg(not(feature = "platform-avr"))]
use core::sync::atomic::AtomicPtr;
use core::sync::atomic::Ordering;
use core::task::{Context, Poll, Waker};
#[cfg(feature = "scheduler-deadline")]
pub(crate) use deadline::Deadline;
use embassy_executor_timer_queue::TimerQueueItem;
#[cfg(feature = "platform-avr")]
use portable_atomic::AtomicPtr;
use self::run_queue::{RunQueue, RunQueueItem};
use self::state::State;
use self::util::{SyncUnsafeCell, UninitCell};
pub use self::waker::task_from_waker;
use self::waker::try_task_from_waker;
use super::SpawnToken;
use crate::{Metadata, SpawnError};
/// Linker-level hook: returns the [`TimerQueueItem`] embedded in the task a
/// waker points to. Exported under a fixed symbol so the timer-queue crate can
/// call it without a direct dependency on this crate.
// NOTE(review): presumably `waker` must have been created by this executor for
// `task_from_waker` to succeed — confirm against the `waker` module.
#[unsafe(no_mangle)]
extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static mut TimerQueueItem {
    unsafe { task_from_waker(waker).timer_queue_item() }
}
/// Fallible variant of `__embassy_time_queue_item_from_waker`: returns `None`
/// instead of failing when the waker is not one produced by this executor
/// (see `try_task_from_waker`).
#[unsafe(no_mangle)]
extern "Rust" fn __try_embassy_time_queue_item_from_waker(waker: &Waker) -> Option<&'static mut TimerQueueItem> {
    unsafe { try_task_from_waker(waker).map(|task| task.timer_queue_item()) }
}
/// Type-erased header stored at the start of every `TaskStorage<F>`.
/// Because `TaskStorage` is `#[repr(C)]` with this header first, a pointer to
/// the storage can be reinterpreted as a pointer to the header (see
/// `TaskRef::new`).
pub(crate) struct TaskHeader {
    /// Spawned / run-queued lifecycle state of the task.
    pub(crate) state: State,
    /// Intrusive link used while the task sits in an executor's run queue.
    pub(crate) run_queue_item: RunQueueItem,
    /// Executor this task was spawned onto; null until first spawn
    /// (initialized to null in `TaskStorage::new`, set in `SyncExecutor::spawn`).
    pub(crate) executor: AtomicPtr<SyncExecutor>,
    /// Type-erased poll function; `None` until the slot is initialized,
    /// `TaskStorage::<F>::poll` while running, `poll_exited` after completion.
    poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
    /// Intrusive item consumed by the timer-queue integration (see the
    /// `__embassy_time_queue_item_from_waker` hooks above).
    pub(crate) timer_queue_item: TimerQueueItem,
    /// Per-task metadata; reset on every (re)spawn (see `AvailableTask::initialize_impl`).
    pub(crate) metadata: Metadata,
    /// Intrusive link in a global all-tasks list used for RTOS tracing.
    #[cfg(feature = "rtos-trace")]
    all_tasks_next: AtomicPtr<TaskHeader>,
}
/// Type-erased, copyable handle to a spawned task: a non-null pointer to its
/// [`TaskHeader`]. Equality compares the pointer.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct TaskRef {
    ptr: NonNull<TaskHeader>,
}
// SAFETY: a `TaskRef` is semantically a `&'static TaskHeader`, so it is
// exactly as Send/Sync as that shared reference would be.
unsafe impl Send for TaskRef where &'static TaskHeader: Send {}
unsafe impl Sync for TaskRef where &'static TaskHeader: Sync {}
impl TaskRef {
    /// Builds a `TaskRef` from task storage.
    /// The cast is valid because `TaskStorage` is `#[repr(C)]` with the
    /// `TaskHeader` as its first field.
    fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self {
        Self {
            ptr: NonNull::from(task).cast(),
        }
    }
    /// Reconstructs a `TaskRef` from a raw header pointer.
    ///
    /// # Safety
    /// `ptr` must be non-null and point to a valid, live `TaskHeader`
    /// (e.g. one previously obtained from [`TaskRef::as_ptr`]).
    pub(crate) unsafe fn from_ptr(ptr: *const TaskHeader) -> Self {
        Self {
            ptr: NonNull::new_unchecked(ptr as *mut TaskHeader),
        }
    }
    /// Shared access to the task's header.
    pub(crate) fn header(self) -> &'static TaskHeader {
        unsafe { self.ptr.as_ref() }
    }
    /// Shared access to the task's metadata.
    pub(crate) fn metadata(self) -> &'static Metadata {
        unsafe { &self.ptr.as_ref().metadata }
    }
    /// Returns the executor this task was spawned onto, or `None` if the
    /// header still holds the initial null pointer (never spawned).
    ///
    /// # Safety
    /// NOTE(review): declared `unsafe` without a stated contract — presumably
    /// the caller must not race with a concurrent (re)spawn; confirm.
    pub unsafe fn executor(self) -> Option<&'static Executor> {
        let executor = self.header().executor.load(Ordering::Relaxed);
        executor.as_ref().map(|e| Executor::wrap(e))
    }
    /// Exclusive access to the task's timer-queue item.
    ///
    /// # Safety
    /// Returns `&'static mut` from a copyable handle: the caller must ensure
    /// no other reference to the item exists for the returned lifetime.
    pub unsafe fn timer_queue_item(mut self) -> &'static mut TimerQueueItem {
        unsafe { &mut self.ptr.as_mut().timer_queue_item }
    }
    /// The raw header pointer backing this handle.
    pub(crate) fn as_ptr(self) -> *const TaskHeader {
        self.ptr.as_ptr()
    }
    /// Numeric task id derived from the header address.
    // NOTE(review): the pointer is truncated to `u32`; on targets with 64-bit
    // pointers ids could collide — presumably intended for 32-bit embedded
    // targets only.
    pub fn id(&self) -> u32 {
        self.as_ptr() as u32
    }
}
/// Statically allocated storage for one task: the type-erased header followed
/// by the (possibly uninitialized) future.
///
/// `#[repr(C)]` pins `raw` at offset 0 so a `*const TaskStorage<F>` can be
/// reinterpreted as a `*const TaskHeader` (see `TaskRef::new`) and back
/// (see `TaskStorage::poll`).
#[repr(C)]
pub struct TaskStorage<F: Future + 'static> {
    raw: TaskHeader,
    // Holds the future while the task is spawned; uninitialized otherwise.
    future: UninitCell<F>, }
// Poll function installed once a task's future has completed (see
// `TaskStorage::poll`): polling an exited task is deliberately a no-op.
unsafe fn poll_exited(_p: TaskRef) {
}
impl<F: Future + 'static> TaskStorage<F> {
    // Const instance used to build `[TaskStorage::NEW; N]` arrays in `TaskPool`.
    const NEW: Self = Self::new();
    /// Creates an empty (unspawned) task slot.
    pub const fn new() -> Self {
        Self {
            raw: TaskHeader {
                state: State::new(),
                run_queue_item: RunQueueItem::new(),
                // Null until the task is first spawned onto an executor.
                executor: AtomicPtr::new(core::ptr::null_mut()),
                // `None` until `AvailableTask::initialize_impl` installs `poll`.
                poll_fn: SyncUnsafeCell::new(None),
                timer_queue_item: TimerQueueItem::new(),
                metadata: Metadata::new(),
                #[cfg(feature = "rtos-trace")]
                all_tasks_next: AtomicPtr::new(core::ptr::null_mut()),
            },
            future: UninitCell::uninit(),
        }
    }
    /// Tries to claim this slot and initialize it with the future produced by
    /// `future`, returning a spawn token on success or `SpawnError::Busy` when
    /// the slot is already in use.
    pub fn spawn(&'static self, future: impl FnOnce() -> F) -> Result<SpawnToken<impl Sized>, SpawnError> {
        let task = AvailableTask::claim(self);
        match task {
            Some(task) => Ok(task.initialize(future)),
            None => Err(SpawnError::Busy),
        }
    }
    /// Type-erased poll function stored in the header's `poll_fn`.
    ///
    /// # Safety
    /// `p` must point to a `TaskStorage<F>` whose future is initialized.
    unsafe fn poll(p: TaskRef) {
        // Recover the concrete storage from the type-erased ref (`#[repr(C)]`).
        let this = &*p.as_ptr().cast::<TaskStorage<F>>();
        // Pinning is sound: the future lives in static storage and is never moved.
        let future = Pin::new_unchecked(this.future.as_mut());
        let waker = waker::from_task(p);
        let mut cx = Context::from_waker(&waker);
        match future.poll(&mut cx) {
            Poll::Ready(_) => {
                // Capture the executor pointer before teardown so the trace
                // event below can still report it.
                #[cfg(feature = "_any_trace")]
                let exec_ptr: *const SyncExecutor = this.raw.executor.load(Ordering::Relaxed);
                // Teardown order matters: drop the future, redirect further
                // polls to a no-op, and only then mark the slot respawnable.
                this.future.drop_in_place();
                this.raw.poll_fn.set(Some(poll_exited));
                this.raw.state.despawn();
                #[cfg(feature = "_any_trace")]
                trace::task_end(exec_ptr, &p);
            }
            Poll::Pending => {}
        }
        // Skip the waker's drop. NOTE(review): presumably the waker built by
        // `waker::from_task` carries no refcount/ownership to release, so
        // forgetting it is leak-free — confirm against the `waker` module.
        mem::forget(waker);
    }
    // Compile-time check that `TaskStorage` is `Sync`: it is shared as
    // `&'static self` between the spawner and the executor.
    #[doc(hidden)]
    #[allow(dead_code)]
    fn _assert_sync(self) {
        fn assert_sync<T: Sync>(_: T) {}
        assert_sync(self)
    }
}
/// Proof that a task slot was successfully claimed (transitioned free → spawned
/// via `State::spawn`); consumed by `initialize` to write the future in.
pub struct AvailableTask<F: Future + 'static> {
    task: &'static TaskStorage<F>,
}
impl<F: Future + 'static> AvailableTask<F> {
    /// Tries to claim the slot; `None` if it is already spawned.
    pub fn claim(task: &'static TaskStorage<F>) -> Option<Self> {
        task.raw.state.spawn().then(|| Self { task })
    }
    /// Writes the future into the claimed slot and produces a spawn token
    /// typed as `S` (`S` is chosen by the caller; see `__initialize_async_fn`).
    fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> {
        unsafe {
            // Fresh metadata for this (re)spawn.
            self.task.raw.metadata.reset();
            // Install the concrete poll fn, then construct the future directly
            // in the slot's storage.
            self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
            self.task.future.write_in_place(future);
            let task = TaskRef::new(self.task);
            SpawnToken::new(task)
        }
    }
    /// Initializes the claimed slot with the future produced by `future`.
    pub fn initialize(self, future: impl FnOnce() -> F) -> SpawnToken<F> {
        self.initialize_impl::<F>(future)
    }
    /// Like [`Self::initialize`], but with a caller-chosen token type.
    /// Presumably invoked only from macro-generated code — not public API.
    #[doc(hidden)]
    pub unsafe fn __initialize_async_fn<FutFn>(self, future: impl FnOnce() -> F) -> SpawnToken<FutFn> {
        self.initialize_impl::<FutFn>(future)
    }
}
/// A fixed-size pool of `N` task slots, allowing up to `N` concurrent
/// instances of the same task type to be spawned.
pub struct TaskPool<F: Future + 'static, const N: usize> {
    pool: [TaskStorage<F>; N],
}
impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
    /// Creates a pool in which all `N` slots start out free.
    pub const fn new() -> Self {
        Self {
            pool: [TaskStorage::NEW; N],
        }
    }
    /// Claims the first free slot in the pool, initializes it with the future
    /// produced by `future`, and returns a spawn token typed as `T`.
    /// Fails with `SpawnError::Busy` when every slot is occupied.
    fn spawn_impl<T>(&'static self, future: impl FnOnce() -> F) -> Result<SpawnToken<T>, SpawnError> {
        self.pool
            .iter()
            .find_map(AvailableTask::claim)
            .map(|slot| slot.initialize_impl::<T>(future))
            .ok_or(SpawnError::Busy)
    }
    /// Tries to spawn a task into one of this pool's free slots.
    pub fn spawn(&'static self, future: impl FnOnce() -> F) -> Result<SpawnToken<impl Sized>, SpawnError> {
        self.spawn_impl::<F>(future)
    }
    /// Variant of [`Self::spawn`] whose token is typed by the closure type
    /// `FutFn`; presumably invoked only from macro-generated code.
    ///
    /// # Safety
    /// Not public API — intended for the task macro expansion.
    #[doc(hidden)]
    pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> Result<SpawnToken<impl Sized>, SpawnError>
    where
        FutFn: FnOnce() -> F,
    {
        self.spawn_impl::<FutFn>(future)
    }
}
/// Opaque context pointer handed to the externally-provided `__pender` hook.
/// `pend` is called when the run queue transitions to non-empty
/// (see `SyncExecutor::enqueue`).
#[derive(Clone, Copy)]
pub(crate) struct Pender(*mut ());
// SAFETY: the pointer is only forwarded opaquely to `__pender`; the platform
// implementation behind that symbol is responsible for thread-safe use.
unsafe impl Send for Pender {}
unsafe impl Sync for Pender {}
impl Pender {
    /// Notifies the platform layer that this executor has pending work.
    pub(crate) fn pend(self) {
        // `__pender` is supplied by whichever executor/platform crate is linked in.
        unsafe extern "Rust" {
            fn __pender(context: *mut ());
        }
        unsafe { __pender(self.0) };
    }
}
/// Shareable executor core. [`Executor`] is a `!Sync` `#[repr(transparent)]`
/// wrapper around this type.
pub(crate) struct SyncExecutor {
    /// Queue of tasks waiting to be polled.
    run_queue: RunQueue,
    /// Hook used to signal the platform layer when work becomes pending.
    pender: Pender,
}
impl SyncExecutor {
    /// Creates an executor core with an empty run queue.
    pub(crate) fn new(pender: Pender) -> Self {
        Self {
            run_queue: RunQueue::new(),
            pender,
        }
    }
    /// Enqueues a task to be polled; pends the executor when `enqueue`
    /// reports it (presumably when the queue transitioned empty → non-empty —
    /// confirm against `run_queue`).
    ///
    /// # Safety
    /// NOTE(review): presumably the `state::Token` proves the required
    /// lock/critical section is held and the task is not already queued —
    /// confirm against the `state` module.
    #[inline(always)]
    unsafe fn enqueue(&self, task: TaskRef, l: state::Token) {
        #[cfg(feature = "_any_trace")]
        trace::task_ready_begin(self, &task);
        if self.run_queue.enqueue(task, l) {
            self.pender.pend();
        }
    }
    /// Binds `task` to this executor and enqueues it for its first poll.
    ///
    /// # Safety
    /// `task` must be initialized (claimed and written via `AvailableTask`).
    pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
        // Record which executor owns the task before it can run.
        task.header()
            .executor
            .store((self as *const Self).cast_mut(), Ordering::Relaxed);
        #[cfg(feature = "_any_trace")]
        trace::task_new(self, &task);
        state::locked(|l| {
            self.enqueue(task, l);
        })
    }
    /// Drains the run queue, invoking each dequeued task's poll function.
    ///
    /// # Safety
    /// NOTE(review): presumably must only run in this executor's own context
    /// and must not be re-entered concurrently — confirm with callers.
    pub(crate) unsafe fn poll(&'static self) {
        #[cfg(feature = "_any_trace")]
        trace::poll_start(self);
        self.run_queue.dequeue_all(|p| {
            let task = p.header();
            #[cfg(feature = "_any_trace")]
            trace::task_exec_begin(self, &p);
            // `poll_fn` is always `Some` for a task that reached the run
            // queue: it is set during initialization and replaced (never
            // cleared) with `poll_exited` on completion.
            task.poll_fn.get().unwrap_unchecked()(p);
            #[cfg(feature = "_any_trace")]
            trace::task_exec_end(self, &p);
        });
        #[cfg(feature = "_any_trace")]
        trace::executor_idle(self)
    }
}
/// Raw executor handle: a `!Sync` wrapper around [`SyncExecutor`].
///
/// `#[repr(transparent)]` guarantees the same layout as `SyncExecutor`, which
/// is what makes [`Executor::wrap`] sound.
#[repr(transparent)]
pub struct Executor {
    pub(crate) inner: SyncExecutor,
    // Opts this type out of `Sync` (raw pointers are `!Sync`), keeping the
    // wrapper confined to a single thread/context.
    _not_sync: PhantomData<*mut ()>,
}
impl Executor {
    /// Reinterprets a `&SyncExecutor` as a `&Executor`.
    ///
    /// # Safety
    /// Sound only because `Executor` is `#[repr(transparent)]` over
    /// `SyncExecutor`, so the two references have identical layout.
    pub(crate) unsafe fn wrap(inner: &SyncExecutor) -> &Self {
        // Equivalent to a transmute, spelled as a pointer cast so the layout
        // assumption stays explicit.
        unsafe { &*(inner as *const SyncExecutor).cast::<Self>() }
    }
    /// Creates a new executor whose pend notifications forward `context` to
    /// the platform-provided `__pender` hook.
    pub fn new(context: *mut ()) -> Self {
        let pender = Pender(context);
        Self {
            inner: SyncExecutor::new(pender),
            _not_sync: PhantomData,
        }
    }
    /// Enqueues an initialized task onto this executor.
    ///
    /// # Safety
    /// Same contract as [`SyncExecutor::spawn`].
    pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
        unsafe { self.inner.spawn(task) }
    }
    /// Polls every task currently queued on this executor.
    ///
    /// # Safety
    /// Same contract as [`SyncExecutor::poll`].
    pub unsafe fn poll(&'static self) {
        unsafe { self.inner.poll() }
    }
    /// Returns a spawner able to submit tasks to this executor.
    pub fn spawner(&'static self) -> super::Spawner {
        super::Spawner::new(self)
    }
    /// Identifier for this executor: the address of its inner core.
    pub fn id(&'static self) -> usize {
        core::ptr::from_ref(&self.inner) as usize
    }
}
/// Wakes a task: enqueues it on its executor's run queue and pends the
/// executor if needed (via `SyncExecutor::enqueue`).
pub fn wake_task(task: TaskRef) {
    let header = task.header();
    header.state.run_enqueue(|l| {
        // SAFETY: NOTE(review): `unwrap_unchecked` assumes the executor
        // pointer is non-null here, i.e. the task has been spawned —
        // presumably `run_enqueue` only invokes this closure for spawned
        // tasks; confirm against the `state` module.
        unsafe {
            let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked();
            executor.enqueue(task, l);
        }
    });
}
/// Like [`wake_task`], but enqueues directly on the run queue WITHOUT pending
/// the executor — for callers that know the executor is already going to poll.
pub fn wake_task_no_pend(task: TaskRef) {
    let header = task.header();
    header.state.run_enqueue(|l| {
        // SAFETY: NOTE(review): same non-null-executor assumption as in
        // `wake_task` — presumably guaranteed by `run_enqueue`; confirm.
        unsafe {
            let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked();
            executor.run_queue.enqueue(task, l);
        }
    });
}