#[cfg(feature = "esp-radio")]
use core::ffi::c_void;
use core::{
cell::{RefCell, RefMut},
ptr::NonNull,
};
#[cfg(feature = "alloc")]
use allocator_api2::boxed::Box;
use embassy_sync::blocking_mutex::Mutex;
use esp_hal::{system::Cpu, time::Instant};
use esp_sync::RawMutex;
use macros::ram;
#[cfg(feature = "alloc")]
use crate::InternalMemory;
#[cfg(feature = "rtos-trace")]
use crate::TraceEvents;
#[cfg(feature = "embassy")]
use crate::timer::embassy::TimerQueue;
use crate::{
run_queue::{Priority, RunQueue},
task::{
self,
ContextExt,
CpuContext,
IdleFn,
Task,
TaskAllocListElement,
TaskDeleteListElement,
TaskExt,
TaskList,
TaskListItem,
TaskPtr,
TaskState,
ThreadLocalData,
read_thread_pointer,
},
timer::TimeDriver,
};
pub(crate) struct SchedulerState {
pub(crate) all_tasks: TaskList<TaskAllocListElement>,
pub(crate) run_queue: RunQueue,
pub(crate) to_delete: TaskList<TaskDeleteListElement>,
pub(crate) time_driver: Option<TimeDriver>,
pub(crate) per_cpu: [CpuState; Cpu::COUNT],
}
pub(crate) struct CpuState {
pub(crate) initialized: bool,
idle_context: CpuContext,
#[cfg(multi_core)]
current_task: *mut Task,
pub(crate) main_task: Task,
}
impl CpuState {
const fn new() -> Self {
Self {
initialized: false,
idle_context: CpuContext::new(),
#[cfg(multi_core)]
current_task: core::ptr::null_mut(),
main_task: Task {
cpu_context: CpuContext::new(),
thread_local: ThreadLocalData::new(),
state: TaskState::Ready,
stack: core::ptr::slice_from_raw_parts_mut(core::ptr::null_mut(), 0),
#[cfg(any(hw_task_overflow_detection, sw_task_overflow_detection))]
stack_guard: core::ptr::null_mut(),
#[cfg(sw_task_overflow_detection)]
stack_guard_value: 0,
#[cfg(feature = "esp-radio")]
current_wait_queue: None,
priority: Priority::ZERO,
#[cfg(multi_core)]
pinned_to: None,
wakeup_at: 0,
in_run_or_wait_queue: false,
timer_queued: false,
alloc_list_item: TaskListItem::None,
ready_queue_item: TaskListItem::None,
timer_queue_item: TaskListItem::None,
delete_list_item: TaskListItem::None,
#[cfg(feature = "alloc")]
heap_allocated: false,
},
}
}
}
unsafe impl Send for SchedulerState {}
impl SchedulerState {
const fn new() -> Self {
Self {
all_tasks: TaskList::new(),
run_queue: RunQueue::new(),
to_delete: TaskList::new(),
time_driver: None,
per_cpu: [const { CpuState::new() }; Cpu::COUNT],
}
}
#[cfg(multi_core)]
pub(crate) fn priority_of_core(per_cpu: &[CpuState], core: usize) -> Priority {
unsafe { per_cpu[core].current_task.as_ref() }
.map(|task| task.priority)
.unwrap_or(Priority::ZERO)
}
#[inline]
#[cfg(multi_core)]
pub(crate) fn set_current_task(&mut self, cpu: Cpu, task: Option<TaskPtr>) {
self.per_cpu[cpu as usize].current_task =
task.map(|task| task.as_ptr()).unwrap_or_default();
}
#[inline]
#[cfg(multi_core)]
pub(crate) fn try_get_current_task(&self, cpu: Cpu) -> Option<TaskPtr> {
NonNull::new(self.per_cpu[cpu as usize].current_task)
}
pub(crate) fn setup(&mut self, time_driver: TimeDriver, idle_hook: IdleFn) {
assert!(
self.time_driver.is_none(),
"The scheduler has already been started"
);
self.time_driver = Some(time_driver);
for cpu in 0..Cpu::COUNT {
task::set_idle_hook_entry(&mut self.per_cpu[cpu].idle_context, idle_hook);
}
}
#[cfg(feature = "esp-radio")]
pub(crate) fn create_task(
&mut self,
name: &str,
task: extern "C" fn(*mut c_void),
param: *mut c_void,
task_stack_size: usize,
priority: usize,
pinned_to: Option<Cpu>,
) -> TaskPtr {
if let Some(cpu) = pinned_to {
assert!(
self.per_cpu[cpu as usize].initialized,
"Cannot create task on uninitialized CPU"
);
}
let mut task = Box::new_in(
Task::new(name, task, param, task_stack_size, priority, pinned_to),
InternalMemory,
);
task.heap_allocated = true;
let mut task_ptr = NonNull::from(Box::leak(task));
unsafe {
task_ptr
.as_mut()
.cpu_context
.set_tp(task_ptr.as_ptr() as u32)
};
#[cfg(feature = "rtos-trace")]
rtos_trace::trace::task_new(task_ptr.rtos_trace_id());
self.all_tasks.push(task_ptr);
let run_scheduler = self.run_queue.mark_task_ready(&self.per_cpu, task_ptr);
task::trigger_scheduler(run_scheduler);
debug!("Task '{}' created: {:?}", name, task_ptr);
task_ptr
}
#[cold]
#[inline(never)]
fn delete_marked_tasks(&mut self) {
let mut to_delete = core::mem::take(&mut self.to_delete);
while let Some(task_ptr) = to_delete.pop() {
assert!(task_ptr.state() == TaskState::Deleted);
#[cfg(multi_core)]
if Cpu::other()
.filter_map(|cpu| self.try_get_current_task(cpu))
.any(|task| task == task_ptr)
{
self.to_delete.push(task_ptr);
continue;
}
trace!("delete_marked_tasks {:?}", task_ptr);
self.delete_task(task_ptr);
}
}
fn run_scheduler(&mut self, task_switch: impl FnOnce(*mut CpuContext, *mut CpuContext)) {
#[cfg(feature = "rtos-trace")]
rtos_trace::trace::marker_begin(TraceEvents::RunSchedule as u32);
if !self.to_delete.is_empty() {
self.delete_marked_tasks();
}
let cpu = Cpu::current();
let current_cpu = cpu as usize;
let current_sp: u32;
cfg_if::cfg_if! {
if #[cfg(xtensa)] {
unsafe { core::arch::asm!("mov {0}, sp", out(reg) current_sp); }
} else {
unsafe { core::arch::asm!("mv {0}, sp", out(reg) current_sp); }
}
}
let current_task = NonNull::new(read_thread_pointer());
if let Some(current_task) = current_task {
unsafe {
current_task
.as_ref()
.ensure_no_stack_overflow(current_sp as usize)
};
if current_task.state() == TaskState::Ready {
debug!("re-queueing current task: {:?}", current_task);
self.run_queue.mark_task_ready(&self.per_cpu, current_task);
}
};
let mut arm_next_timeslice_tick = false;
let next_task = self.run_queue.pop();
if next_task != current_task {
debug!("Switching task {:?} -> {:?}", current_task, next_task);
let current_context = if let Some(current) = current_task {
#[cfg(feature = "rtos-trace")]
rtos_trace::trace::task_exec_end();
#[cfg(multi_core)]
let current_ref = unsafe { current.as_ref() };
#[cfg(multi_core)]
if current_ref.pinned_to.is_none()
&& current_ref.priority
>= Self::priority_of_core(&self.per_cpu, 1 - current_cpu)
{
task::schedule_other_core();
}
unsafe { &raw mut (*current.as_ptr()).cpu_context }
} else {
core::ptr::null_mut()
};
let next_context = if let Some(next) = next_task {
#[cfg(feature = "rtos-trace")]
rtos_trace::trace::task_exec_begin(next.rtos_trace_id());
unsafe { next.as_ref().set_up_stack_watchpoint() };
let new_core_priority = next.priority(&mut self.run_queue);
arm_next_timeslice_tick = !self.run_queue.is_level_empty(new_core_priority);
unsafe { &raw mut (*next.as_ptr()).cpu_context }
} else {
let idle_sp = if current_context
== &raw mut self.per_cpu[current_cpu].main_task.cpu_context
{
current_sp
} else {
self.per_cpu[current_cpu]
.main_task
.set_up_stack_watchpoint();
self.per_cpu[current_cpu].main_task.cpu_context.sp()
};
self.per_cpu[current_cpu].idle_context.set_sp(idle_sp);
#[cfg(feature = "rtos-trace")]
rtos_trace::trace::system_idle();
&raw mut self.per_cpu[current_cpu].idle_context
};
task_switch(current_context, next_context);
#[cfg(multi_core)]
self.set_current_task(cpu, next_task);
}
let time_driver = unwrap!(self.time_driver.as_mut());
let now = crate::now();
time_driver.set_time_slice(cpu, now, arm_next_timeslice_tick);
time_driver.arm_next_wakeup(now);
#[cfg(feature = "rtos-trace")]
rtos_trace::trace::marker_end(TraceEvents::RunSchedule as u32);
}
pub(crate) fn switch_task(&mut self, #[cfg(xtensa)] trap_frame: &mut CpuContext) {
self.run_scheduler(|current_context, next_context| {
trace!(
"Task switch: {:x} -> {:x}",
current_context as usize, next_context as usize
);
task::task_switch(
current_context,
next_context,
#[cfg(xtensa)]
trap_frame,
)
});
}
#[cfg(feature = "esp-radio")]
pub(crate) fn schedule_task_deletion(&mut self, task_to_delete: Option<TaskPtr>) -> bool {
let current_task = SCHEDULER.current_task();
let task_to_delete = task_to_delete.unwrap_or(current_task);
let is_current = task_to_delete == current_task;
self.remove_from_all_queues(task_to_delete);
if is_current {
if task_to_delete.state() != TaskState::Deleted {
self.to_delete.push(task_to_delete);
task_to_delete.set_state(TaskState::Deleted);
}
crate::task::write_thread_pointer(core::ptr::null_mut());
} else {
self.delete_task(task_to_delete);
}
is_current
}
pub(crate) fn sleep_task_until(&mut self, task: TaskPtr, at: Instant) -> bool {
let timer_queue = unwrap!(self.time_driver.as_mut());
timer_queue.schedule_wakeup(task, at)
}
#[ram]
pub(crate) fn resume_task(&mut self, task: TaskPtr) {
let timer_queue = unwrap!(self.time_driver.as_mut());
timer_queue.timer_queue.remove(task);
let run_scheduler = self.run_queue.mark_task_ready(&self.per_cpu, task);
task::trigger_scheduler(run_scheduler);
}
fn delete_task(&mut self, mut to_delete: TaskPtr) {
unsafe {
cfg_if::cfg_if! {
if #[cfg(xtensa)] {
let saved_sp = to_delete.as_ref().cpu_context.A1 as usize;
} else {
let saved_sp = to_delete.as_ref().cpu_context.sp;
}
}
to_delete.as_ref().ensure_no_stack_overflow(saved_sp)
};
debug!("Dropping task: {:x}", to_delete.as_ptr() as usize);
unsafe {
#[cfg(feature = "alloc")]
if to_delete.as_ref().heap_allocated {
let task = Box::from_raw_in(to_delete.as_ptr(), InternalMemory);
core::mem::drop(task);
return;
}
core::ptr::drop_in_place(to_delete.as_mut());
}
}
#[cfg(feature = "esp-radio")]
fn remove_from_all_queues(&mut self, mut task: TaskPtr) {
self.all_tasks.remove(task);
unwrap!(self.time_driver.as_mut()).timer_queue.remove(task);
if let Some(mut containing_queue) = unsafe { task.as_mut().current_wait_queue.take() } {
unsafe { containing_queue.as_mut().remove(task) };
} else {
self.run_queue.remove(task);
}
}
pub(crate) fn set_priority(&mut self, mut task: TaskPtr, new_priority: Priority) {
let task_in_run_queue = {
let task = unsafe { task.as_ref() };
let in_queue = task.in_run_or_wait_queue;
cfg_if::cfg_if! {
if #[cfg(feature = "esp-radio")] {
let in_waitqueue = task.current_wait_queue.is_some();
} else {
let in_waitqueue = false;
}
}
in_queue && !in_waitqueue
};
if task_in_run_queue {
self.run_queue.remove(task);
}
{
let task = unsafe { task.as_mut() };
task.priority = new_priority;
}
if task_in_run_queue {
self.resume_task(task);
}
}
}
pub(crate) struct GlobalState {
pub scheduler: RefCell<SchedulerState>,
#[cfg(feature = "embassy")]
pub embassy_timer_queue: RefCell<TimerQueue>,
}
impl GlobalState {
const fn new() -> Self {
Self {
scheduler: RefCell::new(SchedulerState::new()),
#[cfg(feature = "embassy")]
embassy_timer_queue: RefCell::new(TimerQueue::new()),
}
}
pub fn scheduler(&self) -> RefMut<'_, SchedulerState> {
unwrap!(self.scheduler.try_borrow_mut())
}
#[cfg(feature = "embassy")]
pub fn embassy_timer_queue(&self) -> RefMut<'_, TimerQueue> {
unwrap!(self.embassy_timer_queue.try_borrow_mut())
}
}
pub(crate) struct Scheduler {
inner: Mutex<RawMutex, GlobalState>,
}
impl Scheduler {
const fn new() -> Self {
Self {
inner: Mutex::new(GlobalState::new()),
}
}
pub(crate) fn with<R>(&self, cb: impl FnOnce(&mut SchedulerState) -> R) -> R {
self.with_shared(|shared| cb(&mut shared.scheduler()))
}
pub(crate) fn with_shared<R>(&self, cb: impl FnOnce(&GlobalState) -> R) -> R {
self.inner.lock(cb)
}
pub(crate) fn current_task(&self) -> TaskPtr {
let tp = read_thread_pointer();
unwrap!(
TaskPtr::new(tp),
"The scheduler has not been started. Make sure to call `esp_rtos::init()` before trying to access the current task."
)
}
#[cfg(feature = "esp-radio")]
pub(crate) fn create_task(
&self,
name: &str,
task: extern "C" fn(*mut c_void),
param: *mut c_void,
task_stack_size: usize,
priority: u32,
pinned_to: Option<Cpu>,
) -> TaskPtr {
self.with(|state| {
state.create_task(
name,
task,
param,
task_stack_size,
priority as usize,
pinned_to,
)
})
}
pub(crate) fn sleep_until(&self, wake_at: Instant) -> bool {
self.with(|scheduler| {
let current_task = SCHEDULER.current_task();
if scheduler.sleep_task_until(current_task, wake_at) {
task::yield_task();
true
} else {
false
}
})
}
}
#[cfg(feature = "esp-radio")]
esp_radio_rtos_driver::register_scheduler_implementation!(pub(crate) static SCHEDULER: Scheduler = Scheduler::new());
#[cfg(not(feature = "esp-radio"))]
pub(crate) static SCHEDULER: Scheduler = Scheduler::new();
#[cfg(feature = "rtos-trace")]
impl rtos_trace::RtosTraceOSCallbacks for Scheduler {
fn task_list() {
SCHEDULER.with(|s| {
for task in s.all_tasks.iter() {
rtos_trace::trace::task_send_info(
task.rtos_trace_id(),
task.rtos_trace_info(&mut s.run_queue),
);
}
})
}
fn time() -> u64 {
crate::now()
}
}
#[cfg(feature = "rtos-trace")]
rtos_trace::global_os_callbacks!(Scheduler);