#![feature(const_fn)]
#![feature(thread_local)]
#![feature(external_doc)]
#![feature(deadline_api)]
#![feature(asm)]
#![feature(cfg_target_has_atomic)]
#![feature(unsafe_block_in_unsafe_fn)]
#![doc(include = "./lib.md")]
#![deny(unsafe_op_in_unsafe_fn)]
use atomic_ref::AtomicRef;
use once_cell::sync::OnceCell;
use r3::{
kernel::{
ClearInterruptLineError, EnableInterruptLineError, InterruptNum, InterruptPriority,
PendInterruptLineError, Port, PortToKernel, QueryInterruptLineError,
SetInterruptLinePriorityError, TaskCb, UTicks,
},
prelude::*,
};
use std::{
cell::Cell,
sync::mpsc,
time::{Duration, Instant},
};
use try_mutex::TryMutex;
#[cfg(unix)]
#[path = "threading_unix.rs"]
mod threading;
#[cfg(windows)]
#[path = "threading_win.rs"]
mod threading;
#[cfg(test)]
mod threading_test;
mod sched;
mod ums;
mod utils;
use self::utils::LockConsuming;
#[doc(hidden)]
pub extern crate r3;
#[doc(hidden)]
pub use std::sync::atomic::{AtomicBool, Ordering};
#[doc(hidden)]
pub extern crate env_logger;
/// The number of interrupt lines.
///
/// NOTE(review): not referenced in this file; presumably it sizes the
/// scheduler's interrupt line table in `sched` — confirm.
pub const NUM_INTERRUPT_LINES: usize = 1024;
/// The interrupt line that `State::dispatch_first_task` binds to the dispatch
/// handler and that `State::yield_cpu` pends.
pub const INTERRUPT_LINE_DISPATCH: InterruptNum = 1023;
/// The priority that `State::dispatch_first_task` assigns to
/// [`INTERRUPT_LINE_DISPATCH`].
pub const INTERRUPT_PRIORITY_DISPATCH: InterruptPriority = 16384;
/// The interrupt line that `State::port_boot` binds to the timer handler. It is
/// pended by the timer thread and by `State::pend_tick`.
pub const INTERRUPT_LINE_TIMER: InterruptNum = 1022;
/// The priority that `State::port_boot` assigns to [`INTERRUPT_LINE_TIMER`].
pub const INTERRUPT_PRIORITY_TIMER: InterruptPriority = 16383;
/// Gives the port code in this file access to the [`State`] that belongs to a
/// particular system type. The `use_port!` macro implements it for the system
/// type it defines.
///
/// # Safety
///
/// NOTE(review): the contract an implementor must uphold is not stated in this
/// file; presumably only `use_port!` is expected to implement it — confirm.
#[doc(hidden)]
pub unsafe trait PortInstance: Kernel + Port<PortTaskState = TaskState> {
/// Returns the port state associated with this system type.
fn port_state() -> &'static State;
}
/// The port-wide state shared by all threads of one system.
#[doc(hidden)]
pub struct State {
/// The thread group that runs the kernel's threads. Set exactly once by
/// `port_boot`; every other method unwraps it.
thread_group: OnceCell<ums::ThreadGroup<sched::SchedState>>,
/// The sending half of the channel to the timer thread. `port_boot` stores
/// the sender here and resets it to `None` to make the timer thread exit.
timer_cmd_send: TryMutex<Option<mpsc::Sender<TimerCmd>>>,
/// The instant against which `tick_count` measures elapsed time. It is
/// established lazily by the first call to `tick_count`.
origin: AtomicRef<'static, Instant>,
}
/// The per-task state kept by this port (used as `Port::PortTaskState`).
#[derive(Debug)]
pub struct TaskState {
/// Tracks whether a thread currently backs the task, and which one.
tsm: TryMutex<Tsm>,
}
/// Provides the constant initial value of `TaskState`, which is the same value
/// `TaskState::new` produces.
impl Init for TaskState {
// The constant contains a `TryMutex` (interior mutability), which is what the
// suppressed Clippy lint complains about.
#[allow(clippy::declare_interior_mutable_const)]
const INIT: Self = Self::new();
}
/// The state of the association between a task and the thread backing it.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum Tsm {
/// The initial state produced by `TaskState::new`, and the state a task
/// returns to in `TaskState::exit_and_dispatch`.
Uninit,
/// The task has been initialized by `State::initialize_task_state` but has no
/// thread yet; `State::dispatch` spawns one when the task is chosen to run.
Dormant,
/// The task is backed by the thread with the given ID.
Running(ums::ThreadId),
}
/// A command sent to the timer thread started by `State::port_boot`.
enum TimerCmd {
/// Replaces the timer thread's deadline with `at`. When the deadline passes
/// without another command arriving, the timer thread pends
/// `INTERRUPT_LINE_TIMER` and clears the deadline.
SetTimeout { at: Instant },
}
/// The role of a thread, as recorded in the `THREAD_ROLE` thread-local.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThreadRole {
/// The thread-local's initial value: the thread has not been given a role.
/// `expect_worker_thread` rejects such threads, while the free function
/// `pend_interrupt_line` requires this role.
Unknown,
/// The thread spawned by `State::port_boot` to run `PortToKernel::boot`.
Boot,
/// A thread running an interrupt handler.
///
/// NOTE(review): this role is never assigned in this file; presumably the
/// scheduler code in `sched` sets it — confirm.
Interrupt,
/// A thread spawned by `State::dispatch` to run a task's entry point.
Task,
}
thread_local! {
// The role of the current thread. It starts out as `Unknown`; the closures
// spawned in `State::port_boot` and `State::dispatch` set it to `Boot` and
// `Task` respectively.
static THREAD_ROLE: Cell<ThreadRole> = Cell::new(ThreadRole::Unknown);
}
impl TaskState {
    /// Creates a task state that starts out in the `Uninit` state.
    pub const fn new() -> Self {
        Self {
            tsm: TryMutex::new(Tsm::Uninit),
        }
    }

    /// Panics unless the task is in the `Running` state and the calling thread
    /// is the thread recorded for it.
    fn assert_current_thread(&self) {
        // Copy the state out so that the lock is released before comparing.
        let owner = match *self.tsm.lock() {
            Tsm::Running(thread_id) => thread_id,
            Tsm::Uninit | Tsm::Dormant => unreachable!(),
        };
        assert_eq!(ums::current_thread(), Some(owner));
    }

    /// Dissociates the calling thread from this task, hands the CPU back to
    /// the dispatcher, and terminates the calling thread.
    ///
    /// # Safety
    ///
    /// NOTE(review): the precise contract is not stated in this file. The only
    /// visible caller is `State::exit_and_dispatch`, which checks that it runs
    /// on a task thread with CPU Lock active — confirm.
    unsafe fn exit_and_dispatch<System: PortInstance>(&self, state: &'static State) -> ! {
        log::trace!("exit_and_dispatch({:p}) enter", self);
        self.assert_current_thread();

        let mut sched_guard = state.thread_group.get().unwrap().lock();

        // Reset the task to `Uninit`, remembering which thread was backing it.
        let previous = std::mem::replace(&mut *self.tsm.lock(), Tsm::Uninit);
        let thread_id = match previous {
            Tsm::Running(id) => id,
            Tsm::Uninit | Tsm::Dormant => unreachable!(),
        };

        // Hand the thread over to the scheduler for recycling and release
        // CPU Lock before the scheduler lock is given up.
        sched_guard.scheduler().recycle_thread(thread_id);
        sched_guard.scheduler().cpu_lock = false;
        drop(sched_guard);

        // Pend the dispatch interrupt so that the next task gets chosen.
        unsafe { state.yield_cpu::<System>() };

        log::trace!("exit_and_dispatch({:p}) calling exit_thread", self);
        unsafe { ums::exit_thread() };
    }
}
#[allow(clippy::missing_safety_doc)]
impl State {
/// Creates a `State` with no thread group, no timer command sender, and no
/// tick origin. `port_boot` fills in the first two; `tick_count` establishes
/// the origin on its first call.
pub const fn new() -> Self {
Self {
thread_group: OnceCell::new(),
timer_cmd_send: TryMutex::new(None),
origin: AtomicRef::new(None),
}
}
/// Boots the kernel and blocks until the thread group finishes.
///
/// This creates the thread group, starts the timer thread, spawns the boot
/// thread (which calls `PortToKernel::boot`), configures the timer interrupt
/// line, and then waits for the thread group's join handle. Afterwards the
/// timer thread is stopped and joined. If joining the thread group yielded a
/// panic payload, the panic is resumed on the calling thread.
///
/// # Panics
///
/// Panics if the thread group has already been set (i.e. on a second call),
/// or if the timer thread panicked.
pub fn port_boot<System: PortInstance>(&self) {
    let (thread_group, join_handle) = ums::ThreadGroup::new(sched::SchedState::new::<System>());
    self.thread_group.set(thread_group).ok().unwrap();

    // The timer thread sleeps until the most recently requested deadline and
    // then pends the timer interrupt line. It exits when the sender is dropped.
    let (timer_cmd_send, timer_cmd_recv) = mpsc::channel();
    log::trace!("starting the timer thread");
    let timer_join_handle = std::thread::spawn(move || {
        let mut pending_deadline = None;
        loop {
            // Wait with a deadline if one is armed; otherwise wait forever.
            let received = match pending_deadline {
                Some(deadline) => timer_cmd_recv.recv_deadline(deadline),
                None => timer_cmd_recv
                    .recv()
                    .map_err(|_| mpsc::RecvTimeoutError::Disconnected),
            };

            match received {
                Ok(TimerCmd::SetTimeout { at }) => {
                    pending_deadline = Some(at);
                }
                Err(mpsc::RecvTimeoutError::Timeout) => {
                    pend_interrupt_line::<System>(INTERRUPT_LINE_TIMER).unwrap();
                    pending_deadline = None;
                }
                Err(mpsc::RecvTimeoutError::Disconnected) => {
                    break;
                }
            }
        }
    });
    *self.timer_cmd_send.lock() = Some(timer_cmd_send);

    // Spawn the thread on which the kernel's boot phase runs.
    let mut lock = self.thread_group.get().unwrap().lock();
    let boot_thread = lock.spawn(|_| {
        THREAD_ROLE.with(|role| role.set(ThreadRole::Boot));
        unsafe {
            <System as PortToKernel>::boot();
        }
    });
    log::trace!("startup thread = {:?}", boot_thread);
    lock.scheduler().task_thread = Some(boot_thread);
    lock.scheduler().recycle_thread(boot_thread);
    lock.preempt();

    // Bind the timer handler to its interrupt line and enable the line.
    lock.scheduler()
        .update_line(INTERRUPT_LINE_TIMER, |line| {
            line.priority = INTERRUPT_PRIORITY_TIMER;
            line.enable = true;
            line.start = Some(Self::timer_handler::<System>);
        })
        .ok()
        .unwrap();
    drop(lock);

    // Wait for the thread group; the outcome is examined only after the timer
    // thread has been shut down.
    let group_result = join_handle.join();

    // Dropping the sender disconnects the channel, which ends the timer loop.
    log::trace!("stopping the timer thread");
    *self.timer_cmd_send.lock() = None;
    timer_join_handle.join().unwrap();
    log::trace!("stopped the timer thread");

    if let Err(payload) = group_result {
        std::panic::resume_unwind(payload);
    }
}
/// Hands control from the boot thread to the dispatcher and terminates the
/// boot thread.
///
/// The dispatch interrupt line is configured with its priority and handler,
/// enabled, and pended; CPU Lock is then released and the calling thread exits.
///
/// # Safety
///
/// NOTE(review): the contract is not stated in this file. The body asserts that
/// the caller is the boot thread and that CPU Lock is active — confirm whether
/// there are further requirements.
///
/// # Panics
///
/// Panics if called from a thread other than the boot thread, if CPU Lock is
/// inactive, or if `sched::check_preemption_by_interrupt` returns `false`.
pub unsafe fn dispatch_first_task<System: PortInstance>(&'static self) -> ! {
log::trace!("dispatch_first_task");
assert_eq!(expect_worker_thread::<System>(), ThreadRole::Boot);
assert!(self.is_cpu_lock_active::<System>());
let mut lock = self.thread_group.get().unwrap().lock();
// Bind the dispatch handler to its interrupt line and pend the line right
// away.
lock.scheduler()
.update_line(INTERRUPT_LINE_DISPATCH, |line| {
line.priority = INTERRUPT_PRIORITY_DISPATCH;
line.enable = true;
line.pended = true;
line.start = Some(Self::dispatch_handler::<System>);
})
.ok()
.unwrap();
// Release CPU Lock. The dispatch line was just pended, so the preemption
// check is required to report `true` here.
lock.scheduler().cpu_lock = false;
assert!(sched::check_preemption_by_interrupt(
self.thread_group.get().unwrap(),
&mut lock
));
drop(lock);
// The boot thread's job is done.
unsafe { ums::exit_thread() };
}
/// The handler bound to `INTERRUPT_LINE_DISPATCH` by `dispatch_first_task`.
/// Forwards to `dispatch` on the system's port state.
extern "C" fn dispatch_handler<System: PortInstance>() {
System::port_state().dispatch::<System>();
}
/// Asks the kernel to choose the next running task and tells the scheduler
/// which thread backs it, spawning a new thread if the task is still dormant.
///
/// # Panics
///
/// Panics if called from a thread whose role is not `Interrupt`, or if the
/// chosen task is in the `Uninit` state.
fn dispatch<System: PortInstance>(&'static self) {
    assert_eq!(expect_worker_thread::<System>(), ThreadRole::Interrupt);

    // `choose_running_task` is called with CPU Lock active.
    unsafe { self.enter_cpu_lock::<System>() };
    unsafe { System::choose_running_task() };
    unsafe { self.leave_cpu_lock::<System>() };

    let mut lock = self.thread_group.get().unwrap().lock();
    let running_task = unsafe { *System::state().running_task_ptr() };

    // Work out which thread (if any) should run as the task thread.
    let next_thread = match running_task {
        None => None,
        Some(task) => {
            log::trace!("dispatching task {:p}", task);
            let mut tsm = task.port_task_state.tsm.lock();
            let current = *tsm;
            match current {
                // The task already has a thread; keep using it.
                Tsm::Running(thread_id) => Some(thread_id),
                // First activation: create the thread that runs the task's
                // entry point and exits the task afterwards.
                Tsm::Dormant => {
                    let thread = lock.spawn(move |_| {
                        THREAD_ROLE.with(|role| role.set(ThreadRole::Task));
                        assert!(!self.is_cpu_lock_active::<System>());
                        log::debug!("task {:p} is now running", task);
                        unsafe {
                            (task.attr.entry_point)(task.attr.entry_param);
                        }
                        unsafe {
                            System::exit_task().unwrap();
                        }
                    });
                    log::trace!("spawned thread {:?} for the task {:p}", thread, task);
                    *tsm = Tsm::Running(thread);
                    Some(thread)
                }
                Tsm::Uninit => unreachable!(),
            }
        }
    };

    lock.scheduler().task_thread = next_thread;
}
/// Requests a dispatch by pending `INTERRUPT_LINE_DISPATCH`.
///
/// # Safety
///
/// NOTE(review): the contract is not stated in this file — confirm against the
/// `PortThreading::yield_cpu` documentation.
///
/// # Panics
///
/// Panics if called from a thread without a role or while CPU Lock is active.
pub unsafe fn yield_cpu<System: PortInstance>(&'static self) {
log::trace!("yield_cpu");
expect_worker_thread::<System>();
assert!(!self.is_cpu_lock_active::<System>());
self.pend_interrupt_line::<System>(INTERRUPT_LINE_DISPATCH)
.unwrap();
}
/// Terminates the calling task thread and lets the dispatcher pick the next
/// task. The actual work is done by `TaskState::exit_and_dispatch` on the
/// task's port state.
///
/// # Safety
///
/// NOTE(review): the contract is not stated in this file — confirm against the
/// `PortThreading::exit_and_dispatch` documentation.
///
/// # Panics
///
/// Panics if called from a thread whose role is not `Task` or while CPU Lock
/// is inactive.
pub unsafe fn exit_and_dispatch<System: PortInstance>(
&'static self,
task: &'static TaskCb<System>,
) -> ! {
log::trace!("exit_and_dispatch");
assert_eq!(expect_worker_thread::<System>(), ThreadRole::Task);
assert!(self.is_cpu_lock_active::<System>());
unsafe {
task.port_task_state.exit_and_dispatch::<System>(self);
}
}
/// Activates CPU Lock by setting the scheduler's `cpu_lock` flag.
///
/// # Safety
///
/// NOTE(review): the contract is not stated in this file — confirm against the
/// `PortThreading::enter_cpu_lock` documentation.
///
/// # Panics
///
/// Panics if called from a thread without a role or if CPU Lock is already
/// active.
pub unsafe fn enter_cpu_lock<System: PortInstance>(&self) {
log::trace!("enter_cpu_lock");
expect_worker_thread::<System>();
let mut lock = self.thread_group.get().unwrap().lock();
assert!(!lock.scheduler().cpu_lock);
lock.scheduler().cpu_lock = true;
}
/// Deactivates CPU Lock and yields the calling thread if the scheduler
/// reports that an interrupt should preempt it.
///
/// # Safety
///
/// NOTE(review): the contract is not stated in this file — confirm against the
/// `PortThreading::leave_cpu_lock` documentation.
///
/// # Panics
///
/// Panics if called from a thread without a role or if CPU Lock is inactive.
pub unsafe fn leave_cpu_lock<System: PortInstance>(&'static self) {
    log::trace!("leave_cpu_lock");
    expect_worker_thread::<System>();

    let thread_group = self.thread_group.get().unwrap();
    let mut lock = thread_group.lock();
    assert!(lock.scheduler().cpu_lock);
    lock.scheduler().cpu_lock = false;

    // With CPU Lock released, a pended interrupt may now be eligible to run.
    let must_yield = sched::check_preemption_by_interrupt(thread_group, &mut lock);
    if !must_yield {
        return;
    }

    // Give up the scheduler lock before yielding.
    drop(lock);
    ums::yield_now();
}
/// Prepares a task's port state for activation: an `Uninit` task becomes
/// `Dormant`, and a `Dormant` task is left as is.
///
/// # Safety
///
/// NOTE(review): the contract is not stated in this file — confirm against the
/// `PortThreading::initialize_task_state` documentation.
///
/// # Panics
///
/// Panics if called from a thread without a role, if CPU Lock is inactive, or
/// if the task is `Running` (terminating a thread is not implemented).
pub unsafe fn initialize_task_state<System: PortInstance>(
    &self,
    task: &'static TaskCb<System>,
) {
    log::trace!("initialize_task_state {:p}", task);
    expect_worker_thread::<System>();
    assert!(self.is_cpu_lock_active::<System>());

    let mut tsm = task.port_task_state.tsm.lock();
    let current = *tsm;
    match current {
        Tsm::Uninit => {
            *tsm = Tsm::Dormant;
        }
        Tsm::Running(_) => {
            todo!("terminating a thread is not implemented yet");
        }
        // Already in the desired state.
        Tsm::Dormant => {}
    }
}
/// Returns the scheduler's `cpu_lock` flag.
///
/// # Panics
///
/// Panics if called from a thread without a role.
pub fn is_cpu_lock_active<System: PortInstance>(&self) -> bool {
expect_worker_thread::<System>();
// The scheduler lock is held only for the duration of this expression.
(self.thread_group.get().unwrap().lock())
.scheduler()
.cpu_lock
}
/// Returns `true` if the calling thread is a task thread, and `false` if it
/// is the boot thread or an interrupt thread.
///
/// # Panics
///
/// Panics if called from a thread without a role.
pub fn is_task_context<System: PortInstance>(&self) -> bool {
    expect_worker_thread::<System>();

    let current_role = THREAD_ROLE.with(|cell| cell.get());
    match current_role {
        ThreadRole::Task => true,
        ThreadRole::Boot | ThreadRole::Interrupt => false,
        ThreadRole::Unknown => panic!("`is_task_context` was called from an unknown thread"),
    }
}
/// Changes the priority of interrupt line `num`, then yields the calling
/// thread if the scheduler reports that an interrupt should preempt it.
///
/// # Errors
///
/// Returns `SetInterruptLinePriorityError::BadParam` if the scheduler rejects
/// `num` as an interrupt line.
///
/// # Panics
///
/// Panics if called from a thread whose role is neither `Boot` nor `Task`.
pub fn set_interrupt_line_priority<System: PortInstance>(
    &'static self,
    num: InterruptNum,
    priority: InterruptPriority,
) -> Result<(), SetInterruptLinePriorityError> {
    log::trace!("set_interrupt_line_priority{:?}", (num, priority));
    assert!(matches!(
        expect_worker_thread::<System>(),
        ThreadRole::Boot | ThreadRole::Task
    ));

    let thread_group = self.thread_group.get().unwrap();
    let mut lock = thread_group.lock();

    let updated = lock
        .scheduler()
        .update_line(num, |line| line.priority = priority);
    if let Err(sched::BadIntLineError) = updated {
        return Err(SetInterruptLinePriorityError::BadParam);
    }

    // The new priority may make a pended interrupt eligible to run.
    if sched::check_preemption_by_interrupt(thread_group, &mut lock) {
        drop(lock);
        ums::yield_now();
    }
    Ok(())
}
/// Enables interrupt line `num`, then yields the calling thread if the
/// scheduler reports that an interrupt should preempt it.
///
/// # Errors
///
/// Returns `EnableInterruptLineError::BadParam` if the scheduler rejects `num`
/// as an interrupt line.
///
/// # Panics
///
/// Panics if called from a thread without a role.
pub fn enable_interrupt_line<System: PortInstance>(
    &'static self,
    num: InterruptNum,
) -> Result<(), EnableInterruptLineError> {
    log::trace!("enable_interrupt_line{:?}", (num,));
    expect_worker_thread::<System>();

    let thread_group = self.thread_group.get().unwrap();
    let mut lock = thread_group.lock();

    match lock.scheduler().update_line(num, |line| line.enable = true) {
        Ok(()) => {}
        Err(sched::BadIntLineError) => return Err(EnableInterruptLineError::BadParam),
    }

    // The line may already be pended, in which case it can now preempt us.
    if sched::check_preemption_by_interrupt(thread_group, &mut lock) {
        drop(lock);
        ums::yield_now();
    }
    Ok(())
}
/// Disables interrupt line `num`.
///
/// # Errors
///
/// Returns `EnableInterruptLineError::BadParam` if the scheduler rejects `num`
/// as an interrupt line. (The error type is shared with
/// `enable_interrupt_line`, matching the `PortInterrupts` signature used in
/// `use_port!`.)
///
/// # Panics
///
/// Panics if called from a thread without a role.
pub fn disable_interrupt_line<System: PortInstance>(
&self,
num: InterruptNum,
) -> Result<(), EnableInterruptLineError> {
log::trace!("disable_interrupt_line{:?}", (num,));
expect_worker_thread::<System>();
// No preemption check is performed after disabling a line.
(self.thread_group.get().unwrap().lock())
.scheduler()
.update_line(num, |line| line.enable = false)
.map_err(|sched::BadIntLineError| EnableInterruptLineError::BadParam)
}
/// Pends interrupt line `num` from a port-managed thread, then yields the
/// calling thread if the scheduler reports that an interrupt should preempt
/// it.
///
/// # Errors
///
/// Returns `PendInterruptLineError::BadParam` if the scheduler rejects `num`
/// as an interrupt line.
///
/// # Panics
///
/// Panics if called from a thread without a role.
pub fn pend_interrupt_line<System: PortInstance>(
    &'static self,
    num: InterruptNum,
) -> Result<(), PendInterruptLineError> {
    log::trace!("pend_interrupt_line{:?}", (num,));
    expect_worker_thread::<System>();

    let thread_group = self.thread_group.get().unwrap();
    let mut lock = thread_group.lock();

    let pended = lock.scheduler().update_line(num, |line| line.pended = true);
    if pended.is_err() {
        return Err(PendInterruptLineError::BadParam);
    }

    // Let the freshly pended interrupt run right away if it is eligible.
    let must_yield = sched::check_preemption_by_interrupt(thread_group, &mut lock);
    if must_yield {
        drop(lock);
        ums::yield_now();
    }
    Ok(())
}
/// Clears the pending flag of interrupt line `num`.
///
/// # Errors
///
/// Returns `ClearInterruptLineError::BadParam` if the scheduler rejects `num`
/// as an interrupt line.
///
/// # Panics
///
/// Panics if called from a thread without a role.
pub fn clear_interrupt_line<System: PortInstance>(
&self,
num: InterruptNum,
) -> Result<(), ClearInterruptLineError> {
log::trace!("clear_interrupt_line{:?}", (num,));
expect_worker_thread::<System>();
(self.thread_group.get().unwrap().lock())
.scheduler()
.update_line(num, |line| line.pended = false)
.map_err(|sched::BadIntLineError| ClearInterruptLineError::BadParam)
}
/// Returns whether interrupt line `num` is currently pended, as reported by
/// the scheduler.
///
/// # Errors
///
/// Returns `QueryInterruptLineError::BadParam` if the scheduler rejects `num`
/// as an interrupt line.
///
/// # Panics
///
/// Panics if called from a thread without a role.
pub fn is_interrupt_line_pending<System: PortInstance>(
&self,
num: InterruptNum,
) -> Result<bool, QueryInterruptLineError> {
expect_worker_thread::<System>();
(self.thread_group.get().unwrap().lock())
.scheduler()
.is_line_pended(num)
.map_err(|sched::BadIntLineError| QueryInterruptLineError::BadParam)
}
/// The largest value `tick_count` can return; the tick count wraps around over
/// the full range of `UTicks`. Exposed as `PortTimer::MAX_TICK_COUNT` by
/// `use_port!`.
pub const MAX_TICK_COUNT: UTicks = UTicks::MAX;
/// Half of the `UTicks` range. Exposed as `PortTimer::MAX_TIMEOUT` by
/// `use_port!`.
///
/// NOTE(review): presumably the largest delta the kernel may pass to
/// `pend_tick_after` — confirm against the `PortTimer` documentation.
pub const MAX_TIMEOUT: UTicks = UTicks::MAX / 2;
/// Returns the current tick count: the number of microseconds elapsed since
/// the origin, truncated to `UTicks` and shifted by a fixed offset.
///
/// The origin is established by the first call. If several threads race to
/// establish it, one candidate wins and the others adopt it; a losing
/// candidate's allocation is leaked.
///
/// # Panics
///
/// Panics if called from a thread without a role.
pub fn tick_count<System: PortInstance>(&self) -> UTicks {
    /// A fixed, arbitrary offset added to every result so that the tick count
    /// does not start at zero.
    const TICK_OFFSET: UTicks = 0x00c0ffee;

    expect_worker_thread::<System>();

    let origin: &'static Instant = match self.origin.load(Ordering::Acquire) {
        Some(established) => established,
        None => {
            // Propose the current instant as the origin.
            let candidate: &'static Instant = Box::leak(Box::new(Instant::now()));
            match self.origin.compare_exchange(
                None,
                Some(candidate),
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                // Another thread got there first; use its origin instead.
                Err(winner) => winner.unwrap(),
                // Our candidate is now the canonical origin.
                Ok(_) => candidate,
            }
        }
    };

    let micros = Instant::now().duration_since(*origin).as_micros();

    // The cast deliberately keeps only the low bits, so the count wraps around.
    (micros as UTicks).wrapping_add(TICK_OFFSET)
}
/// Asks the timer thread to pend the timer interrupt once `tick_count_delta`
/// microseconds have passed from now. This replaces any deadline requested
/// earlier.
///
/// # Panics
///
/// Panics if called from a thread without a role, if no timer command sender
/// is installed, or if the timer thread's receiver is gone.
pub fn pend_tick_after<System: PortInstance>(&self, tick_count_delta: UTicks) {
    expect_worker_thread::<System>();
    log::trace!("pend_tick_after({:?})", tick_count_delta);

    // The point in time at which the timer interrupt should be pended.
    let deadline = Instant::now() + Duration::from_micros(tick_count_delta.into());

    // The scheduler lock is held while the command is being sent.
    let _sched_lock = lock_scheduler::<System>();

    let sender_slot = self.timer_cmd_send.lock();
    sender_slot
        .as_ref()
        .unwrap()
        .send(TimerCmd::SetTimeout { at: deadline })
        .unwrap();
}
/// Pends the timer interrupt line (`INTERRUPT_LINE_TIMER`) immediately.
///
/// # Panics
///
/// Panics if called from a thread without a role.
pub fn pend_tick<System: PortInstance>(&'static self) {
expect_worker_thread::<System>();
log::trace!("pend_tick");
self.pend_interrupt_line::<System>(INTERRUPT_LINE_TIMER)
.unwrap();
}
/// The handler bound to `INTERRUPT_LINE_TIMER` by `port_boot`. Forwards to the
/// kernel's `PortToKernel::timer_tick`.
///
/// # Panics
///
/// Panics if called from a thread whose role is not `Interrupt`.
extern "C" fn timer_handler<System: PortInstance>() {
assert_eq!(expect_worker_thread::<System>(), ThreadRole::Interrupt);
log::trace!("timer_handler");
unsafe { <System as PortToKernel>::timer_tick() };
}
}
/// Returns the role of the calling thread.
///
/// # Panics
///
/// Panics if the thread's role is `ThreadRole::Unknown`, i.e. it has never
/// been assigned one.
fn expect_worker_thread<System: PortInstance>() -> ThreadRole {
    let current = THREAD_ROLE.with(|cell| cell.get());
    assert_ne!(current, ThreadRole::Unknown);
    current
}
/// Calls `shutdown` on the locked thread group of `System`'s port state.
///
/// # Panics
///
/// Panics if the thread group has not been created yet (it is created by
/// `State::port_boot`).
pub fn shutdown<System: PortInstance>() {
    let state = System::port_state();
    let thread_group = state.thread_group.get().unwrap();
    // The lock guard is a temporary and is released at the end of this statement.
    thread_group.lock().shutdown();
}
/// Pends interrupt line `num` from a thread that is *not* managed by the port
/// (for example, the timer thread started by `State::port_boot`). If the
/// scheduler reports that an interrupt should preempt, the thread group is
/// asked to preempt.
///
/// # Errors
///
/// Returns `PendInterruptLineError::BadParam` if the scheduler rejects `num`
/// as an interrupt line.
///
/// # Panics
///
/// Panics if called from a port-managed thread (one whose role is not
/// `Unknown`), or if the thread group has not been created yet.
pub fn pend_interrupt_line<System: PortInstance>(
    num: InterruptNum,
) -> Result<(), PendInterruptLineError> {
    log::trace!("external-pend_interrupt_line{:?}", (num,));
    assert_eq!(
        THREAD_ROLE.with(|r| r.get()),
        ThreadRole::Unknown,
        "this method cannot be called from a port-managed thread"
    );

    let thread_group = System::port_state().thread_group.get().unwrap();
    let mut guard = thread_group.lock();

    guard
        .scheduler()
        .update_line(num, |line| line.pended = true)
        .map_err(|_| PendInterruptLineError::BadParam)?;

    // The caller is not a port-managed thread, so instead of yielding it asks
    // the thread group to preempt.
    if sched::check_preemption_by_interrupt(thread_group, &mut guard) {
        guard.preempt();
    }

    // `guard` is released when it goes out of scope here.
    Ok(())
}
/// Locks the thread group of `System`'s port state and returns the lock guard
/// as an opaque value. The lock is held for as long as the returned value is
/// alive, so the caller must bind it to a variable (e.g. `let _guard = ...;`);
/// discarding it releases the lock immediately.
///
/// # Panics
///
/// Panics if the thread group has not been created yet (it is created by
/// `State::port_boot`).
#[must_use = "the scheduler is unlocked as soon as the returned guard is dropped"]
pub fn lock_scheduler<System: PortInstance>() -> impl Sized {
    let state = System::port_state();
    state.thread_group.get().unwrap().lock()
}
/// Defines a system type backed by this port.
///
/// `use_port!(unsafe $vis struct $sys)` expands to:
///
/// - the unit struct `$sys` with the given visibility;
/// - a private module `port_std_impl` holding a static `State` and the
///   implementations of `PortInstance`, `PortThreading`, `PortInterrupts`, and
///   `PortTimer` for `$sys`, each of which forwards to the corresponding
///   `State` method;
/// - a `fn main()` that initializes `env_logger` and calls `State::port_boot`.
///
/// Because the expansion defines `main`, the invoking module must not define
/// its own.
#[macro_export]
macro_rules! use_port {
(unsafe $vis:vis struct $sys:ident) => {
$vis struct $sys;
mod port_std_impl {
use super::$sys;
use $crate::r3::kernel::{
ClearInterruptLineError, EnableInterruptLineError, InterruptNum, InterruptPriority,
PendInterruptLineError, Port, QueryInterruptLineError, SetInterruptLinePriorityError,
TaskCb, PortToKernel, PortInterrupts, PortThreading, UTicks, PortTimer,
};
use $crate::{State, TaskState, PortInstance};
// The single port state shared by every trait implementation below.
pub(super) static PORT_STATE: State = State::new();
unsafe impl PortInstance for $sys {
#[inline]
fn port_state() -> &'static State {
&PORT_STATE
}
}
// Threading: every method forwards to the `State` method of the same name.
unsafe impl PortThreading for $sys {
type PortTaskState = TaskState;
#[allow(clippy::declare_interior_mutable_const)]
const PORT_TASK_STATE_INIT: Self::PortTaskState = TaskState::new();
unsafe fn dispatch_first_task() -> ! {
PORT_STATE.dispatch_first_task::<Self>()
}
unsafe fn yield_cpu() {
PORT_STATE.yield_cpu::<Self>()
}
unsafe fn exit_and_dispatch(task: &'static TaskCb<Self>) -> ! {
PORT_STATE.exit_and_dispatch::<Self>(task);
}
unsafe fn enter_cpu_lock() {
PORT_STATE.enter_cpu_lock::<Self>()
}
unsafe fn leave_cpu_lock() {
PORT_STATE.leave_cpu_lock::<Self>()
}
unsafe fn initialize_task_state(task: &'static TaskCb<Self>) {
PORT_STATE.initialize_task_state::<Self>(task)
}
fn is_cpu_lock_active() -> bool {
PORT_STATE.is_cpu_lock_active::<Self>()
}
fn is_task_context() -> bool {
PORT_STATE.is_task_context::<Self>()
}
}
// Interrupts: every method forwards to the `State` method of the same name.
unsafe impl PortInterrupts for $sys {
// All priorities from zero up to (but excluding) the maximum are declared
// as managed.
const MANAGED_INTERRUPT_PRIORITY_RANGE:
::std::ops::Range<InterruptPriority> = 0..InterruptPriority::MAX;
unsafe fn set_interrupt_line_priority(
line: InterruptNum,
priority: InterruptPriority,
) -> Result<(), SetInterruptLinePriorityError> {
PORT_STATE.set_interrupt_line_priority::<Self>(line, priority)
}
unsafe fn enable_interrupt_line(line: InterruptNum) -> Result<(), EnableInterruptLineError> {
PORT_STATE.enable_interrupt_line::<Self>(line)
}
unsafe fn disable_interrupt_line(line: InterruptNum) -> Result<(), EnableInterruptLineError> {
PORT_STATE.disable_interrupt_line::<Self>(line)
}
unsafe fn pend_interrupt_line(line: InterruptNum) -> Result<(), PendInterruptLineError> {
PORT_STATE.pend_interrupt_line::<Self>(line)
}
unsafe fn clear_interrupt_line(line: InterruptNum) -> Result<(), ClearInterruptLineError> {
PORT_STATE.clear_interrupt_line::<Self>(line)
}
unsafe fn is_interrupt_line_pending(
line: InterruptNum,
) -> Result<bool, QueryInterruptLineError> {
PORT_STATE.is_interrupt_line_pending::<Self>(line)
}
}
// Timer: constants and methods are taken from `State`.
impl PortTimer for $sys {
const MAX_TICK_COUNT: UTicks = State::MAX_TICK_COUNT;
const MAX_TIMEOUT: UTicks = State::MAX_TIMEOUT;
unsafe fn tick_count() -> UTicks {
PORT_STATE.tick_count::<Self>()
}
unsafe fn pend_tick_after(tick_count_delta: UTicks) {
PORT_STATE.pend_tick_after::<Self>(tick_count_delta)
}
unsafe fn pend_tick() {
PORT_STATE.pend_tick::<Self>()
}
}
}
// Program entry point: set up logging, then boot the kernel. `port_boot`
// returns only after the thread group has finished.
fn main() {
$crate::env_logger::init();
port_std_impl::PORT_STATE.port_boot::<$sys>();
}
};
}