use crate::event_label::{End, TCreate, TJoin};
use crate::msg::Message;
use crate::runtime::execution::ExecutionState;
use crate::runtime::task::TaskId;
use crate::runtime::thread;
use std::cmp::Ordering;
use std::marker::PhantomData;
use std::time::Duration;
/// Identifier of a thread managed by this runtime.
///
/// This is a thin, copyable wrapper around the runtime's [`TaskId`]; the
/// wrapped id is what the rest of this module uses to look the task up.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct ThreadId {
    /// Runtime task id this thread id stands for.
    task_id: TaskId,
}
/// Exposes a [`ThreadId`] as a plain `usize`.
impl From<ThreadId> for usize {
    /// Converts by delegating to the `TaskId` -> `usize` conversion of the
    /// wrapped task id.
    fn from(id: ThreadId) -> usize {
        usize::from(id.task_id)
    }
}
impl From<u32> for ThreadId {
fn from(id: u32) -> ThreadId {
Self {
task_id: (id as usize).into(),
}
}
}
/// Partial ordering for [`ThreadId`], defined in terms of its total order.
impl PartialOrd for ThreadId {
    /// Always returns `Some`: thread ids are totally ordered.
    ///
    /// Delegates to [`Ord::cmp`] so the two implementations can never drift
    /// apart (the canonical form for a type that is also `Ord`).
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
/// Total ordering for [`ThreadId`].
impl Ord for ThreadId {
    /// Orders two thread ids by the `usize` value of their wrapped task ids.
    fn cmp(&self, other: &Self) -> Ordering {
        let lhs = usize::from(self.task_id);
        let rhs = usize::from(other.task_id);
        lhs.cmp(&rhs)
    }
}
/// Handle describing a thread: its optional name and its [`ThreadId`].
#[derive(Debug, Clone)]
pub struct Thread {
    /// Name given to the thread at spawn time, if any.
    name: Option<String>,
    /// Identifier of the thread.
    id: ThreadId,
}
impl Thread {
    /// Returns the thread's name, or `None` if it was spawned without one.
    pub fn name(&self) -> Option<&str> {
        self.name.as_deref()
    }

    /// Returns the thread's identifier.
    pub fn id(&self) -> ThreadId {
        self.id
    }

    /// Unparks the task behind this handle.
    ///
    /// Calls `unpark` on the task registered under this thread's id in the
    /// execution state, then calls `thread::switch()`.
    pub fn unpark(&self) {
        let target = self.id.task_id;
        ExecutionState::with(|state| {
            state.get_mut(target).unpark();
        });
        // NOTE(review): presumably a scheduling point so the unparked task
        // may run next; confirm against `runtime::thread::switch`.
        thread::switch();
    }
}
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Message + 'static,
{
spawn_named(f, None, None, None)
}
/// Spawns a thread running `f`, with an optional name, stack size and
/// `sym_cid`, and returns a [`JoinHandle`] for it.
///
/// Steps, in order:
/// 1. Resolve the stack size, falling back to the configured `stack_size`
///    when none is given.
/// 2. Register the new thread via `ExecutionState::spawn_thread`. Its body
///    runs `f` and then reports the return value through `handle_tend` in an
///    `End` event.
/// 3. Report a `TCreate` event for the new thread via `handle_tcreate`.
/// 4. Call `thread::switch()`.
///
/// `sym_cid`, when present, is forwarded to the `TCreate` event as a `u32`.
pub(crate) fn spawn_named<F, T>(
    f: F,
    name: Option<String>,
    stack_size: Option<usize>,
    sym_cid: Option<ThreadId>,
) -> JoinHandle<T>
where
    F: FnOnce() -> T,
    F: Send + 'static,
    T: Message + 'static,
{
    let stack_size = match stack_size {
        Some(bytes) => bytes,
        None => ExecutionState::with(|s| s.must.borrow().config().stack_size),
    };

    // Nothing in this function ever writes to this slot; the thread's return
    // value travels through the `End` event below instead.
    let result = std::sync::Arc::new(std::sync::Mutex::new(None));

    // Body of the new thread: run the user closure, then report its result.
    let body = move || {
        let ret = f();
        ExecutionState::with(|state| {
            let pos = state.next_pos();
            state
                .must
                .borrow_mut()
                .handle_tend(End::new(pos, Box::new(ret)));
        });
    };
    let task_id = ExecutionState::spawn_thread(body, stack_size, name.clone());

    // Record the creation event in the spawning thread.
    ExecutionState::with(|state| {
        let pos = state.next_pos();
        // The `as u32` casts truncate silently if an id exceeds `u32::MAX`.
        let child = usize::from(task_id) as u32;
        let sym = sym_cid.map(|tid| usize::from(tid) as u32);
        state
            .must
            .borrow_mut()
            .handle_tcreate(TCreate::new(pos, child, name.clone(), sym));
    });

    thread::switch();

    JoinHandle {
        task_id,
        thread: Thread {
            id: ThreadId { task_id },
            name,
        },
        result,
    }
}
/// Handle used to wait for a spawned thread and retrieve its return value.
// `dead_code` is allowed because, in the code visible in this module, the
// `result` field is stored at spawn time but never read back.
#[allow(dead_code)]
#[derive(Debug)]
pub struct JoinHandle<T> {
    /// Task id of the spawned thread; used by `join` to name the join target.
    task_id: TaskId,
    /// Descriptor (name and id) of the spawned thread.
    thread: Thread,
    /// Slot shaped like `std`'s join result; initialised to `None` at spawn.
    result: std::sync::Arc<std::sync::Mutex<Option<std::thread::Result<T>>>>,
}
impl<T: 'static> JoinHandle<T> {
    /// Waits for the associated thread and returns its result.
    ///
    /// Each attempt calls `thread::switch()` and then reports a `TJoin` event
    /// for the target task via `handle_tjoin`. If that yields a value the
    /// wait is over; otherwise `prev_pos()` is called and the attempt is
    /// repeated.
    ///
    /// As written, this always returns `Ok`.
    ///
    /// # Panics
    ///
    /// Panics with "wrong join return type" if the value obtained from the
    /// join cannot be downcast to `T`.
    pub fn join(self) -> std::thread::Result<T> {
        let ret = loop {
            thread::switch();

            let outcome = ExecutionState::with(|state| {
                let target = state.get(self.task_id).id();
                let pos = state.next_pos();
                let event = TJoin::new(pos, usize::from(target) as u32);
                state.must.borrow_mut().handle_tjoin(event)
            });

            match outcome {
                Some(message) => break message,
                None => {
                    // NOTE(review): presumably undoes the `next_pos()` advance
                    // above so the retry reuses the same position; confirm.
                    ExecutionState::with(|state| state.prev_pos());
                }
            }
        };

        let boxed: Box<T> = ret
            .as_any()
            .downcast()
            .expect("wrong join return type");
        Ok(*boxed)
    }

    /// Returns the [`Thread`] descriptor of the thread this handle refers to.
    pub fn thread(&self) -> &Thread {
        &self.thread
    }
}
/// Yields the current thread.
///
/// Calls `ExecutionState::request_yield()` and then `thread::switch()`.
pub fn yield_now() {
    ExecutionState::request_yield();
    thread::switch();
}
/// Stand-in for `std::thread::sleep`.
///
/// The duration is ignored: the function only calls `thread::switch()` and
/// never actually waits.
pub fn sleep(_dur: Duration) {
    thread::switch();
}
/// Returns a [`Thread`] describing the currently running task.
///
/// The id and name are read from the execution state's current task.
pub fn current() -> Thread {
    ExecutionState::with(|state| {
        let me = state.current();
        Thread {
            id: ThreadId { task_id: me.id() },
            name: me.name(),
        }
    })
}
/// Parks the current thread.
///
/// Calls `park()` on the current task and calls `thread::switch()` only when
/// that call returns `true`.
pub fn park() {
    let needs_switch = ExecutionState::with(|state| state.current_mut().park());
    if !needs_switch {
        return;
    }
    thread::switch();
}
/// Stand-in for `std::thread::park_timeout`.
///
/// The timeout is ignored; this behaves exactly like [`park`].
pub fn park_timeout(_dur: Duration) {
    park()
}
/// Thread factory used to configure a thread before spawning it.
///
/// Both settings are optional; unset values are passed on as `None`.
#[derive(Debug, Default)]
pub struct Builder {
    /// Name to give the spawned thread, if any.
    name: Option<String>,
    /// Requested stack size for the spawned thread, if any.
    stack_size: Option<usize>,
}
impl Builder {
pub fn new() -> Self {
Self {
name: None,
stack_size: None,
}
}
pub fn name(mut self, name: String) -> Self {
self.name = Some(name);
self
}
pub fn stack_size(mut self, stack_size: usize) -> Self {
self.stack_size = Some(stack_size);
self
}
pub fn spawn<F, T>(self, f: F) -> std::io::Result<JoinHandle<T>>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Message + 'static,
{
Ok(spawn_named(f, self.name, self.stack_size, None))
}
}
/// Key for a thread-local value of type `T`.
///
/// The key holds only an initializer function and a marker; it never stores
/// a `T` itself.
pub struct LocalKey<T: 'static> {
    /// Initializer for the value.
    #[doc(hidden)]
    pub init: fn() -> T,
    /// Marker tying the key to `T` without owning one.
    #[doc(hidden)]
    pub _p: PhantomData<T>,
}

// SAFETY: a `LocalKey` contains only a function pointer and `PhantomData`, so
// moving the key to another thread moves no `T`.
// NOTE(review): soundness also relies on `with`/`try_with` never exposing one
// thread's `T` to another thread; confirm against the accessor implementation.
unsafe impl<T> Send for LocalKey<T> {}

// SAFETY: same reasoning as for `Send` above; sharing the key shares only a
// function pointer and a marker.
unsafe impl<T> Sync for LocalKey<T> {}

impl<T: 'static> std::fmt::Debug for LocalKey<T> {
    /// Formats the key as an opaque `LocalKey { .. }`; no fields are shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("LocalKey");
        builder.finish_non_exhaustive()
    }
}
impl<T: 'static> LocalKey<T> {
    /// Runs `f` with a reference to the thread-local value and returns its
    /// result.
    ///
    /// # Panics
    ///
    /// Panics if [`LocalKey::try_with`] returns an error.
    pub fn with<F, R>(&'static self, f: F) -> R
    where
        F: FnOnce(&T) -> R,
    {
        let outcome = self.try_with(f);
        outcome.expect(
            "cannot access a Thread Local Storage value \
             during or after destruction",
        )
    }

    /// Runs `f` with a reference to the thread-local value, or returns
    /// [`AccessError`] if the value cannot be accessed.
    pub fn try_with<F, R>(&'static self, f: F) -> Result<R, AccessError>
    where
        F: FnOnce(&T) -> R,
    {
        // If the first lookup finds nothing, look once more and insist on a
        // result.
        let slot = match self.get() {
            Some(found) => found,
            None => self.get().unwrap(),
        };
        let value = slot?;
        Ok(f(value))
    }

    /// Looks up the value for the current thread.
    ///
    /// NOTE(review): the closure ignores the execution state and always
    /// produces `Some(Err(AccessError))`. Assuming `ExecutionState::with`
    /// just returns the closure's value, `try_with` always fails and `with`
    /// always panics. This looks like a stub; confirm whether thread-local
    /// storage is meant to be supported.
    fn get(&'static self) -> Option<Result<&T, AccessError>> {
        ExecutionState::with(|_state| {
            let lookup: Option<Result<&T, AccessError>> = Some(Err(AccessError));
            lookup
        })
    }
}
/// Error returned by `LocalKey::try_with` when the thread-local value cannot
/// be accessed.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
#[non_exhaustive]
pub struct AccessError;

impl std::fmt::Display for AccessError {
    /// Writes the fixed message `already destroyed`, honouring any width,
    /// alignment or precision given in the format spec.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `pad` is what `str`'s own `Display` impl uses, so formatter flags
        // are applied exactly as before.
        f.pad("already destroyed")
    }
}

/// Lets `AccessError` be used as a standard error; it has no source.
impl std::error::Error for AccessError {}