use core::{cell::Cell, fmt, ops, pin::pin, ptr::NonNull};
use r3_core::{
kernel::{EventGroupBits, EventGroupWaitFlags, WaitError, WaitTimeoutError},
utils::Init,
};
use crate::{
error::{expect_not_timeout, BadObjectStateError},
klock::{CpuLockCell, CpuLockGuard, CpuLockTokenRef, CpuLockTokenRefMut},
mutex, task,
task::{TaskCb, TaskSt},
timeout,
utils::intrusive_list::{self, HandleInconsistencyUnchecked, ListAccessorCell},
KernelTraits, Port, PortThreading,
};
struct WaitRef<Traits: PortThreading>(NonNull<Wait<Traits>>);
unsafe impl<Traits: PortThreading> Send for WaitRef<Traits> {}
unsafe impl<Traits: PortThreading> Sync for WaitRef<Traits> {}
impl<Traits: PortThreading> Clone for WaitRef<Traits> {
fn clone(&self) -> Self {
Self(self.0)
}
}
impl<Traits: PortThreading> Copy for WaitRef<Traits> {}
impl<Traits: PortThreading> fmt::Debug for WaitRef<Traits> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("WaitRef").field(&self.0).finish()
}
}
impl<Traits: PortThreading> PartialEq for WaitRef<Traits> {
fn eq(&self, other: &Self) -> bool {
self.0 == other.0
}
}
impl<Traits: PortThreading> Eq for WaitRef<Traits> {}
use self::unsafe_static::UnsafeStatic;
mod unsafe_static {
use super::*;
pub struct UnsafeStatic {
_nonexhaustive: (),
}
impl UnsafeStatic {
#[inline]
pub const unsafe fn new() -> &'static Self {
&Self { _nonexhaustive: () }
}
}
impl<Traits: Port> ops::Index<WaitRef<Traits>> for UnsafeStatic {
type Output = Wait<Traits>;
#[inline]
fn index(&self, index: WaitRef<Traits>) -> &Self::Output {
unsafe { &*index.0.as_ptr() }
}
}
}
macro_rules! wait_queue_accessor {
($list:expr, $key:expr) => {
unsafe {
ListAccessorCell::new(
$list,
UnsafeStatic::new(),
|wait: &Wait<_>| &wait.link,
$key,
)
.unchecked()
}
};
}
struct Wait<Traits: PortThreading> {
task: &'static TaskCb<Traits>,
link: CpuLockCell<Traits, Option<intrusive_list::Link<WaitRef<Traits>>>>,
wait_queue: Option<&'static WaitQueue<Traits>>,
payload: WaitPayload<Traits>,
}
pub(super) enum WaitPayload<Traits: PortThreading> {
EventGroupBits {
bits: EventGroupBits,
flags: EventGroupWaitFlags,
orig_bits: CpuLockCell<Traits, Cell<EventGroupBits>>,
},
Semaphore,
Mutex(&'static mutex::MutexCb<Traits>),
Park,
Sleep,
__Nonexhaustive,
}
impl<T: PortThreading> WaitPayload<T> {
#[inline]
fn r#move(self) -> Self {
match self {
Self::EventGroupBits {
bits,
flags,
orig_bits,
} => Self::EventGroupBits {
bits,
flags,
orig_bits,
},
Self::Semaphore => Self::Semaphore,
Self::Mutex(x) => Self::Mutex(x),
Self::Park => Self::Park,
Self::Sleep => Self::Sleep,
Self::__Nonexhaustive => Self::__Nonexhaustive,
}
}
}
pub(crate) struct WaitQueue<Traits: PortThreading> {
waits: CpuLockCell<Traits, intrusive_list::ListHead<WaitRef<Traits>>>,
order: QueueOrder,
}
impl<Traits: PortThreading> Init for WaitQueue<Traits> {
#[allow(clippy::declare_interior_mutable_const)]
const INIT: Self = Self {
waits: Init::INIT,
order: QueueOrder::Fifo,
};
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub(crate) enum QueueOrder {
Fifo,
TaskPriority,
}
impl const From<r3_core::kernel::QueueOrder> for QueueOrder {
fn from(x: r3_core::kernel::QueueOrder) -> Self {
match x {
r3_core::kernel::QueueOrder::Fifo => Self::Fifo,
r3_core::kernel::QueueOrder::TaskPriority => Self::TaskPriority,
_ => Self::TaskPriority,
}
}
}
pub(crate) struct TaskWait<Traits: PortThreading> {
current_wait: CpuLockCell<Traits, Option<WaitRef<Traits>>>,
wait_result: CpuLockCell<Traits, Result<(), WaitTimeoutError>>,
}
impl<Traits: PortThreading> Init for TaskWait<Traits> {
#[allow(clippy::declare_interior_mutable_const)]
const INIT: Self = Self {
current_wait: Init::INIT,
wait_result: CpuLockCell::new(Ok(())),
};
}
macro_rules! setup_timeout_wait {
($lock:ident, $task_cb:expr, $duration_time32:expr) => {
let timeout = pin!(new_timeout_object_for_task(
$lock.borrow_mut(),
$task_cb,
$duration_time32
));
let mut timeout_guard = timeout::TimeoutGuard {
timeout: timeout.as_ref(),
lock: $lock,
};
let mut $lock = timeout_guard.lock.borrow_mut();
timeout::insert_timeout($lock.borrow_mut(), timeout_guard.timeout);
};
}
impl<Traits: PortThreading> WaitQueue<Traits> {
pub(super) const fn new(order: QueueOrder) -> Self {
Self {
waits: Init::INIT,
order,
}
}
}
impl<Traits: KernelTraits> WaitQueue<Traits> {
#[inline]
pub(super) fn wait(
&'static self,
mut lock: CpuLockTokenRefMut<'_, Traits>,
payload: WaitPayload<Traits>,
) -> Result<WaitPayload<Traits>, WaitError> {
let task = Traits::state().running_task(lock.borrow_mut()).unwrap();
let wait = Wait {
task,
link: CpuLockCell::new(None),
wait_queue: Some(self),
payload: payload.r#move(),
};
self.wait_inner(lock, &wait).map_err(expect_not_timeout)?;
Ok(wait.payload)
}
#[inline]
pub(super) fn wait_timeout(
&'static self,
mut lock: CpuLockTokenRefMut<'_, Traits>,
payload: WaitPayload<Traits>,
duration_time32: timeout::Time32,
) -> Result<WaitPayload<Traits>, WaitTimeoutError> {
let task = Traits::state().running_task(lock.borrow_mut()).unwrap();
let wait = Wait {
task,
link: CpuLockCell::new(None),
wait_queue: Some(self),
payload: payload.r#move(),
};
setup_timeout_wait!(lock, task, duration_time32);
self.wait_inner(lock, &wait)?;
Ok(wait.payload)
}
fn wait_inner(
&'static self,
mut lock: CpuLockTokenRefMut<'_, Traits>,
wait: &Wait<Traits>,
) -> Result<(), WaitTimeoutError> {
let task = wait.task;
let wait_ref = WaitRef(wait.into());
debug_assert!(core::ptr::eq(
wait.task,
Traits::state().running_task(lock.borrow_mut()).unwrap()
));
debug_assert!(core::ptr::eq(wait.wait_queue.unwrap(), self));
let mut accessor = wait_queue_accessor!(&self.waits, lock.borrow_mut());
let insert_at = match self.order {
QueueOrder::Fifo => {
None
}
QueueOrder::TaskPriority => {
let cur_task_pri = *task.effective_priority.read(&**accessor.cell_key());
Self::find_insertion_position_by_task_priority(cur_task_pri, &accessor)
}
};
unsafe { accessor.insert(wait_ref, insert_at).unwrap_unchecked() };
task.wait.current_wait.replace(&mut *lock, Some(wait_ref));
task::wait_until_woken_up(lock.borrow_mut());
assert!(wait.link.read(&*lock).is_none());
assert!(task.wait.current_wait.get(&*lock).is_none());
task.wait.wait_result.get(&*lock)
}
fn find_insertion_position_by_task_priority<MapLink>(
cur_task_pri: Traits::TaskPriority,
accessor: &ListAccessorCell<
'_,
&CpuLockCell<Traits, intrusive_list::ListHead<WaitRef<Traits>>>,
UnsafeStatic,
MapLink,
CpuLockTokenRefMut<'_, Traits>,
HandleInconsistencyUnchecked,
>,
) -> Option<WaitRef<Traits>>
where
MapLink: Fn(
&Wait<Traits>,
) -> &CpuLockCell<Traits, Option<intrusive_list::Link<WaitRef<Traits>>>>,
{
let mut insert_at = None;
let Ok(mut cursor) = accessor.back();
while let Some(next_cursor) = cursor {
let next_cursor_task = accessor.pool()[next_cursor].task;
let next_cursor_task_pri = *next_cursor_task
.effective_priority
.read(&**accessor.cell_key());
if next_cursor_task_pri > cur_task_pri {
insert_at = Some(next_cursor);
cursor = unsafe { accessor.prev(next_cursor).unwrap_unchecked() };
} else {
break;
}
}
insert_at
}
fn reorder_wait(&'static self, mut lock: CpuLockTokenRefMut<'_, Traits>, wait: &Wait<Traits>) {
match self.order {
QueueOrder::Fifo => return,
QueueOrder::TaskPriority => {}
}
let wait_ref = WaitRef(wait.into());
let task = wait.task;
debug_assert!(core::ptr::eq(wait.wait_queue.unwrap(), self));
let mut accessor = wait_queue_accessor!(&self.waits, lock.borrow_mut());
unsafe {
accessor.remove(wait_ref).unwrap_unchecked();
}
let cur_task_pri = *task.effective_priority.read(&**accessor.cell_key());
let insert_at = Self::find_insertion_position_by_task_priority(cur_task_pri, &accessor);
unsafe {
accessor.insert(wait_ref, insert_at).unwrap_unchecked();
}
}
pub(super) fn first_waiting_task(
&self,
mut lock: CpuLockTokenRefMut<'_, Traits>,
) -> Option<&'static TaskCb<Traits>> {
let accessor = wait_queue_accessor!(&self.waits, lock.borrow_mut());
unsafe { accessor.front_data().unwrap_unchecked() }.map(|wait| wait.task)
}
pub(super) fn wake_up_one(&self, mut lock: CpuLockTokenRefMut<'_, Traits>) -> bool {
let mut accessor = wait_queue_accessor!(&self.waits, lock.borrow_mut());
let wait_ref = unsafe { accessor.pop_front().unwrap_unchecked() };
let wait_ref = if let Some(wait_ref) = wait_ref {
wait_ref
} else {
return false;
};
let wait = unsafe { wait_ref.0.as_ref() };
assert!(core::ptr::eq(wait.wait_queue.unwrap(), self));
complete_wait(lock.borrow_mut(), wait, Ok(()));
true
}
pub(super) fn wake_up_all_conditional(
&self,
mut lock: CpuLockTokenRefMut<'_, Traits>,
mut cond: impl FnMut(&WaitPayload<Traits>, CpuLockTokenRef<'_, Traits>) -> bool,
) {
let Ok(mut cur) = {
let accessor = wait_queue_accessor!(&self.waits, lock.borrow_mut());
accessor.front()
};
while let Some(wait_ref) = cur {
cur = {
let accessor = wait_queue_accessor!(&self.waits, lock.borrow_mut());
unsafe { accessor.next(wait_ref).unwrap_unchecked() }
};
let wait = unsafe { wait_ref.0.as_ref() };
assert!(core::ptr::eq(wait.wait_queue.unwrap(), self));
if !cond(&wait.payload, lock.borrow()) {
continue;
}
let mut accessor = wait_queue_accessor!(&self.waits, lock.borrow_mut());
unsafe { accessor.remove(wait_ref).unwrap_unchecked() };
complete_wait(lock.borrow_mut(), wait, Ok(()));
}
}
}
impl<Traits: KernelTraits> fmt::Debug for Wait<Traits> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"{{ task: {:p}, payload: {:?} }}",
self.task, self.payload
)
}
}
impl<Traits: KernelTraits> fmt::Debug for WaitPayload<Traits> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::EventGroupBits {
bits,
flags,
orig_bits,
} => f
.debug_struct("EventGroupBits")
.field("bits", bits)
.field("flags", flags)
.field("orig_bits", orig_bits)
.finish(),
Self::Semaphore => f.write_str("Semaphore"),
Self::Mutex(mutex) => write!(f, "Mutex({mutex:p})"),
Self::Park => f.write_str("Park"),
Self::Sleep => f.write_str("Sleep"),
Self::__Nonexhaustive => unreachable!(),
}
}
}
impl<Traits: KernelTraits> fmt::Debug for WaitQueue<Traits> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
struct WaitQueuePrinter<'a, Traits: KernelTraits> {
waits: &'a CpuLockCell<Traits, intrusive_list::ListHead<WaitRef<Traits>>>,
}
impl<Traits: KernelTraits> fmt::Debug for WaitQueuePrinter<'_, Traits> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if let Ok(mut lock) = super::klock::lock_cpu() {
let accessor = wait_queue_accessor!(&self.waits, lock.borrow_mut());
f.debug_list()
.entries(accessor.iter().map(|x| x.unwrap().1))
.finish()
} else {
f.write_str("< locked >")
}
}
}
f.debug_struct("WaitQueue")
.field("waits", &WaitQueuePrinter { waits: &self.waits })
.field("order", &self.order)
.finish()
}
}
impl<Traits: KernelTraits> fmt::Debug for TaskWait<Traits> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("TaskWait")
.field(
"current_wait",
&self.current_wait.debug_fmt_with(|wait_ref, f| {
let wait = wait_ref.map(|r| &unsafe { &*r.0.as_ptr() }.payload);
wait.fmt(f)
}),
)
.field("wait_result", &self.wait_result)
.finish()
}
}
pub(super) fn with_current_wait_payload<Traits: KernelTraits, R>(
lock: CpuLockTokenRefMut<'_, Traits>,
task_cb: &TaskCb<Traits>,
f: impl FnOnce(Option<&WaitPayload<Traits>>) -> R,
) -> R {
let wait_ref = task_cb.wait.current_wait.get(&*lock);
let wait = wait_ref.map(|r| &unsafe { &*r.0.as_ptr() }.payload);
f(wait)
}
pub(super) fn reorder_wait_of_task<Traits: KernelTraits>(
lock: CpuLockTokenRefMut<'_, Traits>,
task_cb: &TaskCb<Traits>,
) {
if let Some(wait_ref) = task_cb.wait.current_wait.get(&*lock) {
let wait = unsafe { &*wait_ref.0.as_ptr() };
if let Some(wait_queue) = wait.wait_queue {
wait_queue.reorder_wait(lock, wait);
}
}
}
#[inline]
pub(super) fn wait_no_queue<Traits: KernelTraits>(
mut lock: CpuLockTokenRefMut<'_, Traits>,
payload: WaitPayload<Traits>,
) -> Result<WaitPayload<Traits>, WaitError> {
let task = Traits::state().running_task(lock.borrow_mut()).unwrap();
let wait = Wait {
task,
link: CpuLockCell::new(None),
wait_queue: None,
payload: payload.r#move(),
};
wait_no_queue_inner(lock, &wait).map_err(expect_not_timeout)?;
Ok(wait.payload)
}
#[inline]
pub(super) fn wait_no_queue_timeout<Traits: KernelTraits>(
mut lock: CpuLockTokenRefMut<'_, Traits>,
payload: WaitPayload<Traits>,
duration_time32: timeout::Time32,
) -> Result<WaitPayload<Traits>, WaitTimeoutError> {
let task = Traits::state().running_task(lock.borrow_mut()).unwrap();
let wait = Wait {
task,
link: CpuLockCell::new(None),
wait_queue: None,
payload: payload.r#move(),
};
setup_timeout_wait!(lock, task, duration_time32);
wait_no_queue_inner(lock, &wait)?;
Ok(wait.payload)
}
fn wait_no_queue_inner<Traits: KernelTraits>(
mut lock: CpuLockTokenRefMut<'_, Traits>,
wait: &Wait<Traits>,
) -> Result<(), WaitTimeoutError> {
let task = wait.task;
let wait_ref = WaitRef(wait.into());
debug_assert!(core::ptr::eq(
wait.task,
Traits::state().running_task(lock.borrow_mut()).unwrap()
));
debug_assert!(wait.wait_queue.is_none());
debug_assert!(wait.link.read(&*lock).is_none());
task.wait.current_wait.replace(&mut *lock, Some(wait_ref));
task::wait_until_woken_up(lock.borrow_mut());
assert!(task.wait.current_wait.get(&*lock).is_none());
task.wait.wait_result.get(&*lock)
}
fn complete_wait<Traits: KernelTraits>(
mut lock: CpuLockTokenRefMut<'_, Traits>,
wait: &Wait<Traits>,
wait_result: Result<(), WaitTimeoutError>,
) {
let task_cb = wait.task;
assert_eq!(
*task_cb.wait.current_wait.read(&*lock),
Some(WaitRef(wait.into()))
);
task_cb.wait.current_wait.replace(&mut *lock, None);
let _ = task_cb.wait.wait_result.replace(&mut *lock, wait_result);
assert_eq!(*task_cb.st.read(&*lock), task::TaskSt::Waiting);
unsafe { task::make_ready(lock, task_cb) };
}
pub(super) fn interrupt_task<Traits: KernelTraits>(
mut lock: CpuLockTokenRefMut<'_, Traits>,
task_cb: &'static TaskCb<Traits>,
wait_result: Result<(), WaitTimeoutError>,
) -> Result<(), BadObjectStateError> {
match *task_cb.st.read(&*lock) {
TaskSt::Waiting => {
let wait_ref = task_cb.wait.current_wait.get(&*lock);
let wait_ref = wait_ref.unwrap();
let wait = unsafe { wait_ref.0.as_ref() };
if let Some(wait_queue) = wait.wait_queue {
let mut accessor = wait_queue_accessor!(&wait_queue.waits, lock.borrow_mut());
unsafe { accessor.remove(wait_ref).unwrap_unchecked() };
}
complete_wait(lock.borrow_mut(), wait, wait_result);
Ok(())
}
_ => Err(BadObjectStateError::BadObjectState),
}
}
fn new_timeout_object_for_task<Traits: KernelTraits>(
lock: CpuLockTokenRefMut<'_, Traits>,
task_cb: &'static TaskCb<Traits>,
duration_time32: timeout::Time32,
) -> timeout::Timeout<Traits> {
let param = task_cb as *const _ as usize;
let timeout_object = timeout::Timeout::new(interrupt_task_by_timeout, param);
fn interrupt_task_by_timeout<Traits: KernelTraits>(
param: usize,
mut lock: CpuLockGuard<Traits>,
) -> CpuLockGuard<Traits> {
let task_cb = unsafe { &*(param as *const TaskCb<Traits>) };
match interrupt_task(lock.borrow_mut(), task_cb, Err(WaitTimeoutError::Timeout)) {
Ok(()) | Err(BadObjectStateError::BadObjectState) => {}
}
lock
}
timeout_object.set_expiration_after(lock, duration_time32);
timeout_object
}