use crate::rt::execution;
use crate::rt::object::Operation;
use crate::rt::vv::VersionVec;
use std::{any::Any, collections::HashMap, fmt, ops};
/// Bookkeeping record for a single thread tracked by a `Set`.
pub(crate) struct Thread {
/// Identifier of this thread; `set_yield` uses it to index `causality`.
pub id: Id,
/// Current scheduling state (see `State`). Starts as `State::Runnable`.
pub state: State,
/// Initialised to `false` by `Thread::new`; nothing else in the code
/// visible here reads or writes it.
pub critical: bool,
/// Optional `Operation` associated with the thread; starts as `None`.
pub(super) operation: Option<Operation>,
/// Version vector joined with the unparker's vector in `Thread::unpark`
/// and with `Set::seq_cst_causality` in `Set::seq_cst`.
pub causality: VersionVec,
/// Second version vector created alongside `causality`.
/// NOTE(review): presumably consumed by DPOR exploration — confirm against
/// the callers, it is not used in the code visible here.
pub dpor_vv: VersionVec,
/// `causality[id]` as captured by the most recent `set_yield` call;
/// `None` until `set_yield` is first called.
pub last_yield: Option<usize>,
/// Incremented by one on every `set_yield` call.
pub yield_count: usize,
/// Thread-local values owned by this thread, keyed by `LocalKeyId`.
locals: LocalMap,
}
/// The collection of `Thread` records belonging to one execution, together
/// with the index of the thread that is currently active.
#[derive(Debug)]
pub(crate) struct Set {
/// Execution identifier stamped into every `Id` this set hands out.
execution_id: execution::Id,
/// Thread records; the numeric part of a thread's `Id` is its position in
/// this vector. The vector's capacity is what `max` reports as the thread
/// limit.
threads: Vec<Thread>,
/// Index into `threads` of the active thread, or `None` when no thread is
/// active.
active: Option<usize>,
/// Version vector that `seq_cst` joins with the active thread's causality
/// (in both directions).
pub seq_cst_causality: VersionVec,
}
/// Handle identifying one thread of one execution.
///
/// Equality and hashing are derived, so they take both fields into account.
#[derive(Eq, PartialEq, Hash, Copy, Clone)]
pub(crate) struct Id {
/// Execution the id was created for.
execution_id: execution::Id,
/// Position of the thread inside `Set::threads`.
id: usize,
}
/// Scheduling state of a `Thread`.
#[derive(Debug, Clone, Copy)]
pub(crate) enum State {
/// Initial state given by `Thread::new`; also entered through
/// `set_runnable` and when a blocked or yielded thread is unparked.
Runnable,
/// Entered through `set_blocked`; left when the thread is unparked.
Blocked,
/// Entered through `set_yield`; left when the thread is unparked.
Yield,
/// Entered through `set_terminated`. `Thread::unpark` never changes it.
Terminated,
}
/// Per-thread storage for thread-local values, keyed by `LocalKeyId`.
type LocalMap = HashMap<LocalKeyId, LocalValue>;
/// Map key for a thread-local: the address of its `'static` `LocalKey`,
/// stored as a `usize` (see `LocalKeyId::new`).
#[derive(Eq, PartialEq, Hash, Copy, Clone)]
struct LocalKeyId(usize);
/// Type-erased slot holding one thread-local value. The slot is `Some` when
/// created and becomes `None` once `Thread::drop_locals` has taken the value.
struct LocalValue(Option<Box<dyn Any>>);
impl Thread {
fn new(id: Id, max_threads: usize) -> Thread {
Thread {
id,
state: State::Runnable,
critical: false,
operation: None,
causality: VersionVec::new(max_threads),
dpor_vv: VersionVec::new(max_threads),
last_yield: None,
yield_count: 0,
locals: HashMap::new(),
}
}
pub(crate) fn is_runnable(&self) -> bool {
match self.state {
State::Runnable => true,
_ => false,
}
}
pub(crate) fn set_runnable(&mut self) {
self.state = State::Runnable;
}
pub(crate) fn set_blocked(&mut self) {
self.state = State::Blocked;
}
pub(crate) fn is_blocked(&self) -> bool {
match self.state {
State::Blocked => true,
_ => false,
}
}
pub(crate) fn is_yield(&self) -> bool {
match self.state {
State::Yield => true,
_ => false,
}
}
pub(crate) fn set_yield(&mut self) {
self.state = State::Yield;
self.last_yield = Some(self.causality[self.id]);
self.yield_count += 1;
}
pub(crate) fn is_terminated(&self) -> bool {
match self.state {
State::Terminated => true,
_ => false,
}
}
pub(crate) fn set_terminated(&mut self) {
self.state = State::Terminated;
}
pub(crate) fn drop_locals(&mut self) -> Box<dyn std::any::Any> {
let mut locals = Vec::with_capacity(self.locals.len());
for (_, local) in &mut self.locals {
locals.push(local.0.take());
}
Box::new(locals)
}
pub(crate) fn unpark(&mut self, unparker: &Thread) {
self.causality.join(&unparker.causality);
if self.is_blocked() || self.is_yield() {
self.set_runnable();
}
}
}
impl fmt::Debug for Thread {
    /// Formats every field of the thread except the thread-local values,
    /// which are shown as the fixed placeholder `[..locals..]`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Thread");

        out.field("id", &self.id);
        out.field("state", &self.state);
        out.field("critical", &self.critical);
        out.field("operation", &self.operation);
        out.field("causality", &self.causality);
        out.field("dpor_vv", &self.dpor_vv);
        out.field("last_yield", &self.last_yield);
        out.field("yield_count", &self.yield_count);
        // The locals are not printed; a placeholder stands in for them.
        out.field("locals", &format_args!("[..locals..]"));

        out.finish()
    }
}
impl Set {
pub(crate) fn new(execution_id: execution::Id, max_threads: usize) -> Set {
let mut threads = Vec::with_capacity(max_threads);
threads.push(Thread::new(Id::new(execution_id, 0), max_threads));
Set {
execution_id,
threads,
active: Some(0),
seq_cst_causality: VersionVec::new(max_threads),
}
}
pub(crate) fn execution_id(&self) -> execution::Id {
self.execution_id
}
pub(crate) fn new_thread(&mut self) -> Id {
assert!(self.threads.len() < self.max());
let id = self.threads.len();
let max_threads = self.threads.capacity();
self.threads
.push(Thread::new(Id::new(self.execution_id, id), max_threads));
Id::new(self.execution_id, id)
}
pub(crate) fn max(&self) -> usize {
self.threads.capacity()
}
pub(crate) fn is_active(&self) -> bool {
self.active.is_some()
}
pub(crate) fn active_id(&self) -> Id {
Id::new(self.execution_id, self.active.unwrap())
}
pub(crate) fn active(&self) -> &Thread {
&self.threads[self.active.unwrap()]
}
pub(crate) fn set_active(&mut self, id: Option<Id>) {
self.active = id.map(Id::as_usize);
}
pub(crate) fn active_mut(&mut self) -> &mut Thread {
&mut self.threads[self.active.unwrap()]
}
pub(crate) fn active2_mut(&mut self, other: Id) -> (&mut Thread, &mut Thread) {
let active = self.active.unwrap();
let other = other.id;
if other >= active {
let (l, r) = self.threads.split_at_mut(other);
(&mut l[active], &mut r[0])
} else {
let (l, r) = self.threads.split_at_mut(active);
(&mut r[0], &mut l[other])
}
}
pub(crate) fn active_causality_inc(&mut self) {
let id = self.active_id();
self.active_mut().causality.inc(id);
}
pub(crate) fn active_atomic_version(&self) -> usize {
let id = self.active_id();
self.active().causality[id]
}
pub(crate) fn unpark(&mut self, id: Id) {
if id == self.active_id() {
return;
}
let (active, th) = self.active2_mut(id);
th.unpark(&active);
}
pub(crate) fn seq_cst(&mut self) {
self.threads[self.active.unwrap()]
.causality
.join(&self.seq_cst_causality);
self.seq_cst_causality
.join(&self.threads[self.active.unwrap()].causality);
}
pub(crate) fn clear(&mut self, execution_id: execution::Id) {
self.threads.clear();
self.threads.push(Thread::new(
Id::new(execution_id, 0),
self.threads.capacity(),
));
self.execution_id = execution_id;
self.active = Some(0);
self.seq_cst_causality = VersionVec::new(self.max());
}
pub(crate) fn iter<'a>(&'a self) -> impl Iterator<Item = (Id, &'a Thread)> + 'a {
let execution_id = self.execution_id;
self.threads
.iter()
.enumerate()
.map(move |(id, thread)| (Id::new(execution_id, id), thread))
}
pub(crate) fn iter_mut<'a>(
&'a mut self,
) -> Box<dyn Iterator<Item = (Id, &'a mut Thread)> + 'a> {
let execution_id = self.execution_id;
Box::new({
self.threads
.iter_mut()
.enumerate()
.map(move |(id, thread)| (Id::new(execution_id, id), thread))
})
}
pub(crate) fn split_active(&mut self) -> (&mut Thread, impl Iterator<Item = &mut Thread>) {
let active = self.active.unwrap();
let (one, two) = self.threads.split_at_mut(active);
let (active, two) = two.split_at_mut(1);
let iter = one.iter_mut().chain(two.iter_mut());
(&mut active[0], iter)
}
pub(crate) fn local<T: 'static>(
&mut self,
key: &'static crate::thread::LocalKey<T>,
) -> Option<Result<&T, AccessError>> {
self.active_mut()
.locals
.get(&LocalKeyId::new(key))
.map(|local_value| local_value.get())
}
pub(crate) fn local_init<T: 'static>(
&mut self,
key: &'static crate::thread::LocalKey<T>,
value: T,
) {
assert!(self
.active_mut()
.locals
.insert(LocalKeyId::new(key), LocalValue::new(value))
.is_none())
}
}
impl ops::Index<Id> for Set {
    type Output = Thread;

    /// Borrows the thread at the numeric index of `index`. The execution
    /// part of the id is not checked.
    fn index(&self, index: Id) -> &Thread {
        let position = index.as_usize();
        &self.threads[position]
    }
}
impl ops::IndexMut<Id> for Set {
    /// Mutably borrows the thread at the numeric index of `index`. The
    /// execution part of the id is not checked.
    fn index_mut(&mut self, index: Id) -> &mut Thread {
        let position = index.as_usize();
        &mut self.threads[position]
    }
}
impl Id {
    /// Builds an id from an execution identifier and a thread index.
    pub(crate) fn new(execution_id: execution::Id, id: usize) -> Id {
        Self { execution_id, id }
    }

    /// Returns the thread index, dropping the execution identifier.
    pub(crate) fn as_usize(self) -> usize {
        let Id { id: index, .. } = self;
        index
    }
}
impl fmt::Display for Id {
    /// Displays the id as its bare thread index, honouring the formatter's
    /// flags exactly as a `usize` would.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.id, fmt)
    }
}
impl fmt::Debug for Id {
    /// Renders the id as `Id(<index>)`; the execution identifier is left
    /// out.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.write_fmt(format_args!("Id({})", self.id))
    }
}
impl LocalKeyId {
fn new<T>(key: &'static crate::thread::LocalKey<T>) -> Self {
Self(key as *const _ as usize)
}
}
impl LocalValue {
fn new<T: 'static>(value: T) -> Self {
Self(Some(Box::new(value)))
}
fn get<T: 'static>(&self) -> Result<&T, AccessError> {
self.0
.as_ref()
.ok_or(AccessError { _private: () })
.map(|val| {
val.downcast_ref::<T>()
.expect("local value must downcast to expected type")
})
}
}
/// Error reported when a thread-local value is accessed after it has been
/// taken out of its slot (see `LocalValue::get` and `Thread::drop_locals`).
///
/// The private field keeps the type from being constructed outside this
/// module. The derives and the `std::error::Error` impl below mirror what
/// `std::thread::AccessError` provides, so the error can be copied, compared
/// and used with `?` / `Box<dyn Error>` like its std counterpart.
#[derive(Clone, Copy, Eq, PartialEq)]
pub struct AccessError {
    _private: (),
}

impl fmt::Debug for AccessError {
    /// Prints just the type name; the private marker field is not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("AccessError").finish()
    }
}

impl fmt::Display for AccessError {
    /// Writes the fixed message `already destroyed`, honouring width, fill
    /// and alignment flags of the formatter.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("already destroyed")
    }
}

/// Lets callers treat `AccessError` as a standard error value. There is no
/// underlying cause, so the default `source()` (returning `None`) is kept.
impl std::error::Error for AccessError {}