#[cfg(feature = "smp")]
use alloc::sync::Weak;
use alloc::{collections::VecDeque, sync::Arc};
use core::mem::MaybeUninit;
use ax_hal::percpu::this_cpu_id;
use ax_kernel_guard::BaseGuard;
use ax_kspin::{SpinNoIrqGuard, SpinRaw};
use ax_lazyinit::LazyInit;
use ax_sched::BaseScheduler;
use crate::{
AxCpuMask, AxTaskRef, Scheduler, TaskInner, WaitQueue,
task::{CurrentTask, TaskState},
wait_queue::WaitQueueGuard,
};
// Declares a list of per-CPU statics.
//
// Each `NAME: Type = init` entry expands to a `static NAME: Type = init;` item annotated with
// `#[ax_percpu::def_percpu]`. Any attributes written in front of an entry (doc comments,
// `#[cfg(...)]`, ...) are forwarded onto the generated static. Entries are comma separated and a
// trailing comma is accepted.
macro_rules! percpu_static {
($(
$(#[$comment:meta])*
$name:ident: $ty:ty = $init:expr
),* $(,)?) => {
$(
$(#[$comment])*
#[ax_percpu::def_percpu]
static $name: $ty = $init;
)*
};
}
percpu_static! {
// Run queue of this CPU; initialized once in `init` / `init_secondary`.
RUN_QUEUE: LazyInit<AxRunQueue> = LazyInit::new(),
// Tasks that have exited on this CPU; filled by `exit_current` and drained by `gc_entry`.
EXITED_TASKS: VecDeque<AxTaskRef> = VecDeque::new(),
// Wait queue the gc task waits on; `exit_current` notifies it after queueing an exited task.
WAIT_FOR_EXIT: WaitQueue = WaitQueue::new(),
// Idle task of this CPU; `resched` falls back to it when the scheduler returns no task.
IDLE_TASK: LazyInit<AxTaskRef> = LazyInit::new(),
// Weak reference to the task that was switched away from most recently on this CPU.
// Written in `switch_to` and read by `clear_prev_task_on_cpu`.
#[cfg(feature = "smp")]
PREV_TASK: Weak<crate::AxTask> = Weak::new(),
}
/// References to the run queues of all CPUs, indexed by CPU id.
///
/// Each CPU writes its own slot in `init` / `init_secondary`, and `get_run_queue` reads a slot
/// back with `assume_init_mut`. A slot stays uninitialized until the owning CPU has run its
/// init function.
static mut RUN_QUEUES: [MaybeUninit<&'static mut AxRunQueue>; ax_config::plat::MAX_CPU_NUM] =
[ARRAY_REPEAT_VALUE; ax_config::plat::MAX_CPU_NUM];
/// Uninitialized slot value repeated to build the initial contents of `RUN_QUEUES`.
#[allow(clippy::declare_interior_mutable_const)] const ARRAY_REPEAT_VALUE: MaybeUninit<&'static mut AxRunQueue> = MaybeUninit::uninit();
/// Returns a guarded reference to the run queue of the CPU this code is running on.
///
/// `G::acquire()` is called before the run queue is touched; the state it returns is stored in
/// the result and handed to `G::release` when the [`CurrentRunQueueRef`] is dropped. The
/// returned value also captures the current task as reported by `crate::current()`.
#[inline(always)]
pub(crate) fn current_run_queue<G: BaseGuard>() -> CurrentRunQueueRef<'static, G> {
    let state = G::acquire();
    // NOTE(review): the raw per-CPU access presumably relies on the protection provided by
    // `G` being held for the lifetime of the returned reference - confirm.
    let inner = unsafe { RUN_QUEUE.current_ref_mut_raw() };
    let current_task = crate::current();
    CurrentRunQueueRef {
        inner,
        current_task,
        state,
        _phantom: core::marker::PhantomData,
    }
}
/// Picks the index of a run queue that `cpumask` allows, in round-robin order.
///
/// A global counter is advanced on every probe and reduced modulo
/// `ax_config::plat::MAX_CPU_NUM`; the first probed index whose bit is set in `cpumask` is
/// returned.
///
/// NOTE(review): candidates range over `MAX_CPU_NUM`, not over the CPUs that have actually run
/// `init` / `init_secondary` - confirm that callers never pass a mask containing a CPU whose
/// run queue slot is still uninitialized.
///
/// # Panics
///
/// Panics if `cpumask` is empty.
#[cfg(feature = "smp")]
// The modulus is a configuration constant; presumably it can be 1 on single-CPU configs.
#[allow(clippy::modulo_one)]
#[inline]
fn select_run_queue_index(cpumask: AxCpuMask) -> usize {
    use core::sync::atomic::{AtomicUsize, Ordering};

    static RUN_QUEUE_INDEX: AtomicUsize = AtomicUsize::new(0);

    assert!(!cpumask.is_empty(), "No available CPU for task execution");

    loop {
        let ticket = RUN_QUEUE_INDEX.fetch_add(1, Ordering::SeqCst);
        let candidate = ticket % ax_config::plat::MAX_CPU_NUM;
        if cpumask.get(candidate) {
            break candidate;
        }
    }
}
/// Returns the run queue stored in slot `index` of `RUN_QUEUES`.
///
/// The slot is read with `assume_init_mut`, so it must already have been written by the `init`
/// / `init_secondary` call of the corresponding CPU. Indexing panics if `index` is out of
/// bounds of the array.
#[cfg(feature = "smp")]
#[inline]
fn get_run_queue(index: usize) -> &'static mut AxRunQueue {
// NOTE(review): soundness depends on slot `index` being initialized and on callers not
// creating overlapping mutable references to the same run queue - confirm.
unsafe { RUN_QUEUES[index].assume_init_mut() }
}
/// Selects a run queue for `task` and returns a guarded reference to it.
///
/// `G::acquire()` is called first; its state is released when the returned [`AxRunQueueRef`]
/// is dropped.
///
/// * Without the `smp` feature the current CPU's run queue is always returned and `task` is
///   ignored.
/// * With the `smp` feature the queue is chosen by `select_run_queue_index` from the task's
///   CPU mask.
#[inline]
pub(crate) fn select_run_queue<G: BaseGuard>(task: &AxTaskRef) -> AxRunQueueRef<'static, G> {
    let state = G::acquire();
    #[cfg(not(feature = "smp"))]
    {
        // Only one run queue exists in this configuration.
        let _ = task;
        let inner = unsafe { RUN_QUEUE.current_ref_mut_raw() };
        AxRunQueueRef {
            inner,
            state,
            _phantom: core::marker::PhantomData,
        }
    }
    #[cfg(feature = "smp")]
    {
        let target = select_run_queue_index(task.cpumask());
        let inner = get_run_queue(target);
        AxRunQueueRef {
            inner,
            state,
            _phantom: core::marker::PhantomData,
        }
    }
}
/// A run queue: a scheduler instance together with the id of the CPU it was created for.
pub(crate) struct AxRunQueue {
// CPU id passed to `AxRunQueue::new`.
cpu_id: usize,
// Scheduler holding the tasks of this run queue, behind a `SpinRaw` lock.
scheduler: SpinRaw<Scheduler>,
}
/// A reference to a run queue (not necessarily the current CPU's) tied to a guard of type `G`.
///
/// Created by `select_run_queue`; the stored guard state is released on drop.
pub(crate) struct AxRunQueueRef<'a, G: BaseGuard> {
// The referenced run queue.
inner: &'a mut AxRunQueue,
// State returned by `G::acquire()`, passed back to `G::release` on drop.
state: G::State,
// Ties the guard type `G` to this value without storing a `G`.
_phantom: core::marker::PhantomData<G>,
}
/// Releases the guard that was acquired when the reference was created.
impl<G: BaseGuard> Drop for AxRunQueueRef<'_, G> {
fn drop(&mut self) {
G::release(self.state);
}
}
/// A reference to the current CPU's run queue together with the current task, tied to a guard
/// of type `G`.
///
/// Created by `current_run_queue`; the stored guard state is released on drop.
pub(crate) struct CurrentRunQueueRef<'a, G: BaseGuard> {
// Run queue of the current CPU.
inner: &'a mut AxRunQueue,
// The task returned by `crate::current()` when this reference was created.
current_task: CurrentTask,
// State returned by `G::acquire()`, passed back to `G::release` on drop.
state: G::State,
// Ties the guard type `G` to this value without storing a `G`.
_phantom: core::marker::PhantomData<G>,
}
/// Releases the guard that was acquired when the reference was created.
impl<G: BaseGuard> Drop for CurrentRunQueueRef<'_, G> {
fn drop(&mut self) {
G::release(self.state);
}
}
/// Operations that may target any CPU's run queue.
impl<G: BaseGuard> AxRunQueueRef<'_, G> {
    /// Adds `task` to the scheduler of the referenced run queue.
    ///
    /// # Panics
    ///
    /// Panics if `task` is not in the ready state.
    pub fn add_task(&mut self, task: AxTaskRef) {
        let rq = &mut *self.inner;
        debug!("task add: {} on run_queue {}", task.id_name(), rq.cpu_id);
        assert!(task.is_ready());
        rq.scheduler.lock().add_task(task);
    }

    /// Moves a blocked `task` back into the referenced run queue.
    ///
    /// Nothing happens if `put_task_with_state` reports that the task was not enqueued (for
    /// example because it was not in the `Blocked` state). `resched` is forwarded as the
    /// `preempt` argument; additionally, when `resched` is set and the run queue belongs to the
    /// current CPU, the current task is marked as having a pending preemption (only with the
    /// `preempt` feature).
    pub fn unblock_task(&mut self, task: AxTaskRef, resched: bool) {
        // Grab the name up front: `task` is moved into the run queue below.
        let task_id_name = task.id_name();
        let enqueued = self
            .inner
            .put_task_with_state(task, TaskState::Blocked, resched);
        if !enqueued {
            return;
        }

        let cpu_id = self.inner.cpu_id;
        debug!("task unblock: {task_id_name} on run_queue {cpu_id}");
        // The flag is only acted upon when the task landed on this CPU's run queue.
        if resched && cpu_id == this_cpu_id() {
            #[cfg(feature = "preempt")]
            crate::current().set_preempt_pending(true);
        }
    }
}
/// Operations on the current CPU's run queue that act on the current task.
impl<G: BaseGuard> CurrentRunQueueRef<'_, G> {
/// Handles a scheduler timer tick.
///
/// Unless the current task is the idle task, the tick is forwarded to the scheduler's
/// `task_tick`. If that returns `true` and the `preempt` feature is enabled, the current task
/// is marked as having a pending preemption.
#[cfg(feature = "irq")]
pub fn scheduler_timer_tick(&mut self) {
let curr = &self.current_task;
if !curr.is_idle() && self.inner.scheduler.lock().task_tick(curr) {
#[cfg(feature = "preempt")]
curr.set_preempt_pending(true);
}
}
/// Yields the CPU: puts the current task back into the run queue and reschedules.
///
/// # Panics
///
/// Panics if the current task is not in the running state.
pub fn yield_current(&mut self) {
let curr = &self.current_task;
trace!("task yield: {}", curr.id_name());
assert!(curr.is_running());
// Running -> Ready, enqueued with `preempt = false`.
self.inner
.put_task_with_state(curr.clone(), TaskState::Running, false);
self.inner.resched();
}
/// Switches from the current task directly to `migration_task`.
///
/// The current task is set to `Ready` but is not put into this run queue's scheduler.
/// NOTE(review): `migration_task` is presumably responsible for re-enqueueing the current task
/// elsewhere (see `migrate_entry`) - confirm at the call site.
///
/// # Panics
///
/// Panics if the current task is not in the running state.
#[cfg(feature = "smp")]
pub fn migrate_current(&mut self, migration_task: AxTaskRef) {
let curr = &self.current_task;
trace!("task migrate: {}", curr.id_name());
assert!(curr.is_running());
curr.set_state(TaskState::Ready);
self.inner.switch_to(crate::current(), migration_task);
}
/// Reschedules because of a preemption request, if the current task may be preempted.
///
/// When `can_preempt(1)` is true the current task is put back with `preempt = true` and the
/// run queue reschedules; otherwise the request is recorded via `set_preempt_pending(true)`.
/// NOTE(review): the argument `1` is presumably the preempt-disable count held by the caller's
/// guard - confirm against `can_preempt`.
///
/// # Panics
///
/// Panics if the current task is not in the running state.
#[cfg(feature = "preempt")]
pub fn preempt_resched(&mut self) {
let curr = &self.current_task;
assert!(curr.is_running());
let can_preempt = curr.can_preempt(1);
trace!(
"current task is to be preempted: {}, allow={}",
curr.id_name(),
can_preempt
);
if can_preempt {
self.inner
.put_task_with_state(curr.clone(), TaskState::Running, true);
self.inner.resched();
} else {
curr.set_preempt_pending(true);
}
}
/// Exits the current task with `exit_code`; never returns.
///
/// * If the current task is the init task, the per-CPU exited-task list is cleared and the
///   system is powered off.
/// * Otherwise the task is marked `Exited`, `notify_exit(exit_code)` is called, the task is
///   appended to `EXITED_TASKS`, one waiter of `WAIT_FOR_EXIT` is notified and the run queue
///   reschedules.
///
/// # Panics
///
/// Panics if the current task is not running or is the idle task.
pub fn exit_current(&mut self, exit_code: i32) -> ! {
let curr = &self.current_task;
debug!("task exit: {}, exit_code={}", curr.id_name(), exit_code);
assert!(curr.is_running(), "task is not running: {:?}", curr.state());
assert!(!curr.is_idle());
if curr.is_init() {
// NOTE(review): raw per-CPU access presumably relies on the guard `G` held by `self`.
unsafe {
EXITED_TASKS.current_ref_mut_raw().clear();
}
ax_hal::power::system_off();
} else {
curr.set_state(TaskState::Exited);
curr.notify_exit(exit_code);
// NOTE(review): raw per-CPU access presumably relies on the guard `G` held by `self`.
unsafe {
// Hand the task over to the gc task and notify one waiter of the gc wait queue.
EXITED_TASKS.current_ref_mut_raw().push_back(curr.clone());
WAIT_FOR_EXIT.current_ref_mut_raw().notify_one(false);
}
self.inner.resched();
}
unreachable!("task exited!");
}
/// Blocks the current task on the wait queue guarded by `wq_guard` and reschedules.
///
/// The task is marked `Blocked` and flagged as being in a wait queue before it is pushed onto
/// the queue; the guard is dropped explicitly before rescheduling.
///
/// # Panics
///
/// Panics if the current task is not running, is the idle task, or (with the `preempt`
/// feature) `can_preempt(2)` is false.
/// NOTE(review): `2` presumably accounts for the caller's guard plus the wait queue lock -
/// confirm.
pub fn blocked_resched(&mut self, mut wq_guard: WaitQueueGuard) {
let curr = &self.current_task;
assert!(curr.is_running());
assert!(!curr.is_idle());
#[cfg(feature = "preempt")]
assert!(curr.can_preempt(2));
// The state is updated while the wait queue guard is still held.
curr.set_state(TaskState::Blocked);
curr.set_in_wait_queue(true);
wq_guard.push_back(curr.clone());
// Release the wait queue before switching away.
drop(wq_guard);
debug!("task block: {}", curr.id_name());
self.inner.resched();
}
/// Blocks the current task while resetting the `woke` flag, then reschedules.
///
/// The task is marked `Blocked`, `*woke` is set to `false`, and the guard is dropped
/// explicitly before rescheduling.
///
/// # Panics
///
/// Panics if the current task is not running, is the idle task, or (with the `preempt`
/// feature) `can_preempt(2)` is false.
pub fn future_blocked_resched(&mut self, mut woke: SpinNoIrqGuard<'_, bool>) {
let curr = &self.current_task;
assert!(curr.is_running());
assert!(!curr.is_idle());
#[cfg(feature = "preempt")]
assert!(curr.can_preempt(2));
// The state is updated while the `woke` lock is still held.
curr.set_state(TaskState::Blocked);
*woke = false;
drop(woke);
debug!("task block: {}", curr.id_name());
self.inner.resched();
}
/// Puts the current task to sleep until `deadline`.
///
/// If `deadline` is still in the future according to `ax_hal::time::wall_time()`, an alarm
/// wakeup is registered for the task, the task is marked `Blocked` and the run queue
/// reschedules. Otherwise the function returns without blocking.
///
/// # Panics
///
/// Panics if the current task is not running or is the idle task.
#[cfg(feature = "irq")]
pub fn sleep_until(&mut self, deadline: ax_hal::time::TimeValue) {
let curr = &self.current_task;
debug!("task sleep: {}, deadline={:?}", curr.id_name(), deadline);
assert!(curr.is_running());
assert!(!curr.is_idle());
let now = ax_hal::time::wall_time();
if now < deadline {
// The wakeup is registered before the task is marked as blocked.
crate::timers::set_alarm_wakeup(deadline, curr.clone());
curr.set_state(TaskState::Blocked);
self.inner.resched();
}
}
/// Sets the priority of the current task through the scheduler and returns the scheduler's
/// `set_priority` result.
pub fn set_current_priority(&mut self, prio: isize) -> bool {
self.inner
.scheduler
.lock()
.set_priority(&self.current_task, prio)
}
}
/// Core run queue logic: construction, enqueueing, picking the next task and context switching.
impl AxRunQueue {
/// Creates the run queue for CPU `cpu_id`.
///
/// A task named "gc" running `gc_entry` is created, restricted to `cpu_id` through its CPU
/// mask, and added to a fresh scheduler.
fn new(cpu_id: usize) -> Self {
let gc_task = TaskInner::new(gc_entry, "gc".into(), ax_config::TASK_STACK_SIZE).into_arc();
gc_task.set_cpumask(AxCpuMask::one_shot(cpu_id));
let mut scheduler = Scheduler::new();
scheduler.add_task(gc_task);
Self {
cpu_id,
scheduler: SpinRaw::new(scheduler),
}
}
/// Tries to move `task` from `current_state` to `Ready` and enqueue it on this run queue.
///
/// Returns `true` if the state transition succeeded and the task is not the idle task, in
/// which case the task has been handed to the scheduler's `put_prev_task` with `preempt`.
/// Returns `false` otherwise. Note that the transition is attempted before the idle check, so
/// an idle task in `current_state` still becomes `Ready` but is not enqueued.
fn put_task_with_state(
&mut self,
task: AxTaskRef,
current_state: TaskState,
preempt: bool,
) -> bool {
if task.transition_state(current_state, TaskState::Ready) && !task.is_idle() {
if current_state == TaskState::Blocked {
// Spin until the task's `on_cpu` flag is cleared. The flag is set in `switch_to` and
// cleared by `clear_prev_task_on_cpu` after the context switch away from the task.
#[cfg(feature = "smp")]
while task.on_cpu() {
core::hint::spin_loop();
}
}
// Record this run queue's CPU on the task.
#[cfg(feature = "smp")]
task.set_cpu_id(self.cpu_id as _);
self.scheduler.lock().put_prev_task(task, preempt);
true
} else {
false
}
}
/// Picks the next task and switches to it.
///
/// Falls back to this CPU's idle task when the scheduler has no task to run.
///
/// # Panics
///
/// Panics if the picked task is not in the ready state.
fn resched(&mut self) {
let next = self
.scheduler
.lock()
.pick_next_task()
// NOTE(review): `get_unchecked` assumes `IDLE_TASK` was initialized by `init` /
// `init_secondary` on this CPU - confirm.
.unwrap_or_else(|| unsafe {
IDLE_TASK.current_ref_raw().get_unchecked().clone()
});
assert!(
next.is_ready(),
"next {} is not ready: {:?}",
next.id_name(),
next.state()
);
self.switch_to(crate::current(), next);
}
/// Switches execution from `prev_task` to `next_task`.
///
/// `next_task` is marked `Running` (and its pending-preemption flag cleared with the
/// `preempt` feature). If both arguments refer to the same task the function returns without
/// a context switch.
///
/// # Panics
///
/// Panics if IRQs are enabled (checked only on `target_os = "none"` with the `irq` feature),
/// or if the reference-count assertions on the two tasks fail.
fn switch_to(&mut self, prev_task: CurrentTask, next_task: AxTaskRef) {
#[cfg(all(target_os = "none", feature = "irq"))] assert!(
!ax_hal::asm::irqs_enabled(),
"IRQs must be disabled during scheduling"
);
trace!(
"context switch: {} -> {}",
prev_task.id_name(),
next_task.id_name()
);
#[cfg(feature = "preempt")]
next_task.set_preempt_pending(false);
next_task.set_state(TaskState::Running);
if prev_task.ptr_eq(&next_task) {
return;
}
// Mark the next task as on-CPU before switching to it.
#[cfg(feature = "smp")]
next_task.set_on_cpu(true);
#[cfg(feature = "task-ext")]
{
use crate::TaskExt;
// Run the task-extension hooks: leave for the outgoing task, enter for the incoming one.
if let Some(ext) = prev_task.task_ext() {
ext.on_leave()
}
if let Some(ext) = next_task.task_ext() {
ext.on_enter()
}
}
unsafe {
// Take the raw context pointers before ownership of the tasks is handed to
// `set_current`.
let prev_ctx_ptr = prev_task.ctx_mut_ptr();
let next_ctx_ptr = next_task.ctx_mut_ptr();
#[cfg(feature = "smp")]
{
// Remember the outgoing task so that `clear_prev_task_on_cpu` can find it.
*PREV_TASK.current_ref_mut_raw() = Arc::downgrade(&prev_task);
}
// NOTE(review): presumably ensures that replacing the current task below cannot drop
// `prev_task` while its context is still in use - confirm.
assert!(Arc::strong_count(&prev_task) > 1);
assert!(Arc::strong_count(&next_task) >= 1);
CurrentTask::set_current(prev_task, next_task);
(*prev_ctx_ptr).switch_to(&*next_ctx_ptr);
// NOTE(review): presumably execution resumes here once this context is switched back
// in; the task recorded in `PREV_TASK` then has its `on_cpu` flag cleared.
#[cfg(feature = "smp")]
clear_prev_task_on_cpu();
}
}
}
/// Entry function of the per-CPU "gc" task created in `AxRunQueue::new`.
///
/// Each round inspects every task that was in `EXITED_TASKS` when the round started: a task
/// whose only remaining strong reference is the one just popped is dropped, any other task is
/// pushed back for a later round. The gc task then waits on `WAIT_FOR_EXIT` (for at most 100 ms
/// per wait with the `irq` feature, indefinitely otherwise) and starts over.
fn gc_entry() {
    loop {
        // Snapshot the length so that tasks pushed back during this round are not revisited.
        let pending = EXITED_TASKS.with_current(|queue| queue.len());
        for _ in 0..pending {
            // Pop inside the per-CPU accessor, but inspect/drop the task outside of it.
            let Some(task) = EXITED_TASKS.with_current(|queue| queue.pop_front()) else {
                continue;
            };
            if Arc::strong_count(&task) != 1 {
                // Still referenced elsewhere: retry in a later round.
                EXITED_TASKS.with_current(|queue| queue.push_back(task));
            } else {
                // Last reference: release the task now.
                drop(task);
            }
        }

        #[cfg(feature = "irq")]
        unsafe {
            // The outcome of the timed wait is deliberately ignored.
            let _timeout = WAIT_FOR_EXIT
                .current_ref_raw()
                .wait_timeout(core::time::Duration::from_millis(100));
        }
        #[cfg(not(feature = "irq"))]
        unsafe {
            WAIT_FOR_EXIT.current_ref_raw().wait();
        }
    }
}
/// Enqueues `migrated_task` on a run queue selected from its CPU mask.
///
/// The run queue is chosen by `select_run_queue` under a `NoPreemptIrqSave` guard and the task
/// is handed to that queue's scheduler via `put_prev_task` with `preempt = false`.
#[cfg(feature = "smp")]
pub(crate) fn migrate_entry(migrated_task: AxTaskRef) {
    let target = select_run_queue::<ax_kernel_guard::NoPreemptIrqSave>(&migrated_task);
    // Statement form: the scheduler lock is released here, before `target` (and with it the
    // guard) is dropped at the end of the function.
    target
        .inner
        .scheduler
        .lock()
        .put_prev_task(migrated_task, false);
}
/// Clears the `on_cpu` flag of the task recorded in this CPU's `PREV_TASK`.
///
/// # Safety
///
/// NOTE(review): `PREV_TASK` is read through a raw per-CPU accessor; callers presumably must
/// ensure the code cannot be preempted or migrated while it runs - confirm.
///
/// # Panics
///
/// Panics if the weak reference in `PREV_TASK` can no longer be upgraded.
#[cfg(feature = "smp")]
pub(crate) unsafe fn clear_prev_task_on_cpu() {
    let prev = unsafe { PREV_TASK.current_ref_raw() }
        .upgrade()
        .expect("Invalid prev_task pointer or prev_task has been dropped");
    prev.set_on_cpu(false);
}
/// Initializes the scheduling state of the CPU that calls it, making the caller the "main" task.
///
/// In order: creates this CPU's idle task (running `crate::run_idle`, restricted to this CPU
/// through its CPU mask) and stores it in `IDLE_TASK`; creates the "main" task, marks it
/// `Running` and installs it as the current task; initializes this CPU's `RUN_QUEUE`; and
/// records a reference to it in `RUN_QUEUES[cpu_id]`.
pub(crate) fn init() {
    const IDLE_TASK_STACK_SIZE: usize = 16384;

    let cpu_id = this_cpu_id();

    // The idle task is created here but is not the task we continue running as.
    let idle = TaskInner::new(|| crate::run_idle(), "idle".into(), IDLE_TASK_STACK_SIZE);
    idle.set_cpumask(AxCpuMask::one_shot(cpu_id));
    IDLE_TASK.with_current(|slot| {
        slot.init_once(idle.into_arc());
    });

    // The calling context becomes the "main" task.
    let main = TaskInner::new_init("main".into()).into_arc();
    main.set_state(TaskState::Running);
    unsafe { CurrentTask::init_current(main) }

    RUN_QUEUE.with_current(|slot| {
        slot.init_once(AxRunQueue::new(cpu_id));
    });
    // Publish this CPU's run queue in the global table.
    unsafe {
        RUN_QUEUES[cpu_id].write(RUN_QUEUE.current_ref_mut_raw());
    }
}
/// Initializes the scheduling state of the CPU that calls it, making the caller the idle task.
///
/// In order: creates an "idle" task with `TaskInner::new_init`, marks it `Running`, stores it
/// in `IDLE_TASK` and installs it as the current task; initializes this CPU's `RUN_QUEUE`; and
/// records a reference to it in `RUN_QUEUES[cpu_id]`.
pub(crate) fn init_secondary() {
    let cpu_id = this_cpu_id();

    // Unlike `init`, the calling context itself becomes the idle task.
    let idle = TaskInner::new_init("idle".into()).into_arc();
    idle.set_state(TaskState::Running);
    IDLE_TASK.with_current(|slot| {
        slot.init_once(idle.clone());
    });
    unsafe { CurrentTask::init_current(idle) }

    RUN_QUEUE.with_current(|slot| {
        slot.init_once(AxRunQueue::new(cpu_id));
    });
    // Publish this CPU's run queue in the global table.
    unsafe {
        RUN_QUEUES[cpu_id].write(RUN_QUEUE.current_ref_mut_raw());
    }
}