use crate::rt::execution;
use crate::rt::object::Operation;
use crate::rt::vv::VersionVec;
use std::{any::Any, collections::HashMap, fmt, ops};
use super::Location;
/// Bookkeeping for a single thread tracked by a `Set`.
pub(crate) struct Thread {
/// Identifier of this thread.
pub id: Id,
/// Current scheduling state; see `State`.
pub state: State,
/// Initialised to `false` by `Thread::new`.
// NOTE(review): presumably marks a section in which the thread must not be
// preempted — confirm against the scheduler.
pub critical: bool,
/// Initialised to `None` by `Thread::new`.
// NOTE(review): presumably the operation the thread is about to perform — confirm.
pub(super) operation: Option<Operation>,
/// Version vector of this thread; joined with the unparker's vector in
/// `Thread::unpark` and indexed by `self.id` in `set_yield`.
pub causality: VersionVec,
/// Starts out as an empty version vector.
// NOTE(review): presumably the causality published by release operations — confirm.
pub released: VersionVec,
/// Starts out as an empty version vector.
// NOTE(review): presumably the version vector used by DPOR exploration — confirm.
pub dpor_vv: VersionVec,
/// This thread's own causality entry at the time of the last `set_yield`,
/// or `None` if it has never yielded.
pub last_yield: Option<u16>,
/// Number of times `set_yield` has been called on this thread.
pub yield_count: usize,
/// Thread-local values owned by this thread, keyed by `LocalKeyId`.
locals: LocalMap,
/// Tracing span for this thread, created in `Thread::new` as a child of the
/// span passed in by the caller.
span: tracing::Span,
}
/// The collection of threads belonging to one execution, plus the index of
/// the thread that is currently active.
#[derive(Debug)]
pub(crate) struct Set {
/// Execution these threads belong to; stamped into every `Id` handed out.
execution_id: execution::Id,
/// All threads created so far, indexed by the numeric part of their `Id`.
/// Slot 0 is created by `Set::new` / `Set::clear`.
threads: Vec<Thread>,
/// Index of the active thread, or `None` when no thread is active.
active: Option<usize>,
/// Version vector joined with the active thread's causality (in both
/// directions) by `seq_cst_fence`.
pub seq_cst_causality: VersionVec,
/// Span that was current when the set was created or last cleared; used as
/// the parent of every thread span.
iteration_span: tracing::Span,
}
/// Identifier of a thread: the execution it belongs to plus its index inside
/// the owning `Set`. Equality and hashing take both parts into account.
#[derive(Eq, PartialEq, Hash, Copy, Clone)]
pub(crate) struct Id {
/// Execution the thread belongs to.
execution_id: execution::Id,
/// Index of the thread in `Set::threads`.
id: usize,
}
impl Id {
pub(crate) fn public_id(&self) -> usize {
self.id
}
}
/// Scheduling state of a `Thread`.
#[derive(Debug, Clone, Copy)]
pub(crate) enum State {
/// The thread can run. `unparked` becomes `true` when an unpark arrives
/// while the thread is already runnable (see `Thread::set_unparked`).
Runnable { unparked: bool },
/// The thread is blocked; the payload is the `Location` given to
/// `Thread::set_blocked`.
// The payload is not read by the code shown here, hence the lint allowance.
Blocked(#[allow(dead_code)] Location),
/// The thread yielded (see `Thread::set_yield`).
Yield,
/// The thread has finished (see `Thread::set_terminated`).
Terminated,
}
/// Per-thread storage for thread-local values, keyed by the identity of the
/// `LocalKey` they belong to.
type LocalMap = HashMap<LocalKeyId, LocalValue>;
/// Identity of a `LocalKey`, represented by the address of the `'static` key
/// (see `LocalKeyId::new`).
#[derive(Eq, PartialEq, Hash, Copy, Clone)]
struct LocalKeyId(usize);
/// A type-erased thread-local value. The slot is `None` once the value has
/// been taken out by `Thread::drop_locals`; `get` then reports `AccessError`.
struct LocalValue(Option<Box<dyn Any>>);
impl Thread {
fn new(id: Id, parent_span: &tracing::Span) -> Thread {
Thread {
id,
span: tracing::info_span!(parent: parent_span.id(), "thread", id = id.id),
state: State::Runnable { unparked: false },
critical: false,
operation: None,
causality: VersionVec::new(),
released: VersionVec::new(),
dpor_vv: VersionVec::new(),
last_yield: None,
yield_count: 0,
locals: HashMap::new(),
}
}
/// Returns `true` when the thread is in the `Runnable` state, regardless of
/// its `unparked` flag.
pub(crate) fn is_runnable(&self) -> bool {
    // Exhaustive on purpose: a new `State` variant must be classified here.
    match self.state {
        State::Runnable { .. } => true,
        State::Blocked(..) | State::Yield | State::Terminated => false,
    }
}
/// Marks the thread runnable and clears any stored unpark.
pub(crate) fn set_runnable(&mut self) {
    let runnable = State::Runnable { unparked: false };
    self.state = runnable;
}
/// Marks the thread blocked, recording `location` in the state.
pub(crate) fn set_blocked(&mut self, location: Location) {
    let blocked = State::Blocked(location);
    self.state = blocked;
}
/// Returns `true` when the thread is in the `Blocked` state.
pub(crate) fn is_blocked(&self) -> bool {
    // Exhaustive on purpose: a new `State` variant must be classified here.
    match self.state {
        State::Blocked(..) => true,
        State::Runnable { .. } | State::Yield | State::Terminated => false,
    }
}
/// Returns `true` when the thread is in the `Yield` state.
pub(crate) fn is_yield(&self) -> bool {
    // Exhaustive on purpose: a new `State` variant must be classified here.
    match self.state {
        State::Yield => true,
        State::Runnable { .. } | State::Blocked(..) | State::Terminated => false,
    }
}
/// Puts the thread into the `Yield` state, remembers its own causality entry
/// at this point in `last_yield`, and bumps `yield_count`.
pub(crate) fn set_yield(&mut self) {
    self.state = State::Yield;

    let own_version = self.causality[self.id];
    self.last_yield = Some(own_version);
    self.yield_count += 1;
}
/// Returns `true` when the thread is in the `Terminated` state.
pub(crate) fn is_terminated(&self) -> bool {
    // Exhaustive on purpose: a new `State` variant must be classified here.
    match self.state {
        State::Terminated => true,
        State::Runnable { .. } | State::Blocked(..) | State::Yield => false,
    }
}
/// Marks the thread as terminated.
pub(crate) fn set_terminated(&mut self) {
    let terminated = State::Terminated;
    self.state = terminated;
}
pub(crate) fn drop_locals(&mut self) -> Box<dyn std::any::Any> {
let mut locals = Vec::with_capacity(self.locals.len());
for local in self.locals.values_mut() {
locals.push(local.0.take());
}
Box::new(locals)
}
pub(crate) fn unpark(&mut self, unparker: &Thread) {
self.causality.join(&unparker.causality);
self.set_unparked();
}
/// Applies an unpark to the thread's state without touching causality.
///
/// A blocked or yielded thread becomes runnable; an already runnable thread
/// records the unpark in its `unparked` flag; a terminated thread is left
/// unchanged.
fn set_unparked(&mut self) {
    match self.state {
        State::Blocked(..) | State::Yield => self.set_runnable(),
        State::Runnable { .. } => self.state = State::Runnable { unparked: true },
        State::Terminated => {}
    }
}
}
impl fmt::Debug for Thread {
    /// Formats every field except the tracing span; the thread-local map is
    /// shown as a fixed placeholder because its values are type-erased.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Thread");
        out.field("id", &self.id);
        out.field("state", &self.state);
        out.field("critical", &self.critical);
        out.field("operation", &self.operation);
        out.field("causality", &self.causality);
        out.field("released", &self.released);
        out.field("dpor_vv", &self.dpor_vv);
        out.field("last_yield", &self.last_yield);
        out.field("yield_count", &self.yield_count);
        out.field("locals", &format_args!("[..locals..]"));
        out.finish()
    }
}
impl Set {
/// Creates a set with room for `max_threads` threads, containing a single
/// thread (index 0) that is immediately active.
///
/// The span that is current at call time becomes the parent of all thread
/// spans.
pub(crate) fn new(execution_id: execution::Id, max_threads: usize) -> Set {
    let mut threads = Vec::with_capacity(max_threads);
    let iteration_span = tracing::Span::current();

    let first_id = Id::new(execution_id, 0);
    let first_thread = Thread::new(first_id, &iteration_span);
    threads.push(first_thread);

    Set {
        execution_id,
        threads,
        active: Some(0),
        seq_cst_causality: VersionVec::new(),
        iteration_span,
    }
}
pub(crate) fn execution_id(&self) -> execution::Id {
self.execution_id
}
/// Appends a new thread to the set and returns its id.
///
/// # Panics
///
/// Panics if the set already holds `max()` threads.
pub(crate) fn new_thread(&mut self) -> Id {
    assert!(self.threads.len() < self.max());

    // The new thread's index is the current length of the vector.
    let index = self.threads.len();
    let id = Id::new(self.execution_id, index);
    let thread = Thread::new(id, &self.iteration_span);
    self.threads.push(thread);

    id
}
/// Returns the maximum number of threads the set accepts, which is the
/// capacity of the backing vector.
// NOTE(review): `Vec::with_capacity` only guarantees *at least* the requested
// capacity, so this may exceed the `max_threads` passed to `Set::new` —
// confirm that is acceptable.
pub(crate) fn max(&self) -> usize {
    Vec::capacity(&self.threads)
}
/// Returns `true` while some thread is marked active.
pub(crate) fn is_active(&self) -> bool {
    matches!(self.active, Some(_))
}
/// Returns `true` when no thread is active any more.
///
/// # Panics
///
/// Panics if no thread is active but some thread has not terminated.
pub(crate) fn is_complete(&self) -> bool {
    if self.active.is_some() {
        return false;
    }

    // With no active thread left, every thread must have terminated.
    self.threads.iter().for_each(|thread| {
        assert!(
            thread.is_terminated(),
            "thread not terminated; {:#?}",
            thread
        );
    });

    true
}
/// Returns the id of the active thread.
///
/// # Panics
///
/// Panics if no thread is active.
pub(crate) fn active_id(&self) -> Id {
    let index = self.active.unwrap();
    Id::new(self.execution_id, index)
}
/// Returns a shared reference to the active thread.
///
/// # Panics
///
/// Panics if no thread is active.
pub(crate) fn active(&self) -> &Thread {
    let index = self.active.unwrap();
    &self.threads[index]
}
/// Switches the active thread to `id` (or to "none" when `id` is `None`).
///
/// The tracing span of the outgoing thread is exited and the span of the
/// incoming thread, if there is one, is entered on the current dispatcher.
///
/// # Panics
///
/// Panics if no thread is active when this is called.
pub(crate) fn set_active(&mut self, id: Option<Id>) {
    tracing::dispatcher::get_default(|dispatch| {
        // Leave the span of the thread that was running until now.
        if let Some(outgoing) = self.active().span.id() {
            dispatch.exit(&outgoing);
        }

        // Enter the span of the thread taking over, if any.
        let incoming = id
            .and_then(|next| self.threads.get(next.id))
            .and_then(|thread| thread.span.id());
        if let Some(incoming) = incoming {
            dispatch.enter(&incoming);
        }
    });

    self.active = id.map(|next| next.as_usize());
}
/// Returns an exclusive reference to the active thread.
///
/// # Panics
///
/// Panics if no thread is active.
pub(crate) fn active_mut(&mut self) -> &mut Thread {
    let index = self.active.unwrap();
    &mut self.threads[index]
}
/// Returns exclusive references to the active thread and to `other`, in that
/// order.
///
/// The thread vector is split at the larger of the two indices so that both
/// references can be handed out at once.
///
/// # Panics
///
/// Panics if no thread is active, if `other` is out of range, or if `other`
/// is the active thread itself (two exclusive references to one thread
/// cannot exist).
pub(crate) fn active2_mut(&mut self, other: Id) -> (&mut Thread, &mut Thread) {
    let active_idx = self.active.unwrap();
    let other_idx = other.id;

    if other_idx < active_idx {
        // `other` lies in the head, the active thread starts the tail.
        let (head, tail) = self.threads.split_at_mut(active_idx);
        (&mut tail[0], &mut head[other_idx])
    } else {
        // The active thread lies in the head, `other` starts the tail.
        let (head, tail) = self.threads.split_at_mut(other_idx);
        (&mut head[active_idx], &mut tail[0])
    }
}
/// Increments the active thread's own entry in its causality vector.
///
/// # Panics
///
/// Panics if no thread is active.
pub(crate) fn active_causality_inc(&mut self) {
    let current = self.active_id();
    let thread = self.active_mut();
    thread.causality.inc(current);
}
/// Returns the active thread's own entry in its causality vector.
///
/// # Panics
///
/// Panics if no thread is active.
pub(crate) fn active_atomic_version(&self) -> u16 {
    let current = self.active_id();
    let thread = self.active();
    thread.causality[current]
}
/// Unparks the thread `id` on behalf of the active thread.
///
/// When the active thread unparks itself only its park state is updated;
/// otherwise the target also joins the active thread's causality.
///
/// # Panics
///
/// Panics if no thread is active.
pub(crate) fn unpark(&mut self, id: Id) {
    if id != self.active_id() {
        let (unparker, target) = self.active2_mut(id);
        target.unpark(unparker);
    } else {
        // Self-unpark: there is no other thread's causality to join.
        self.active_mut().set_unparked();
    }
}
/// Hook for a sequentially-consistent access. The body is empty, so calling
/// this has no effect; `seq_cst_fence` is the method that actually updates
/// `seq_cst_causality`.
pub(crate) fn seq_cst(&mut self) {
}
/// Synchronises the active thread with `seq_cst_causality` in both
/// directions: the thread first joins the shared vector, then the shared
/// vector joins the thread's (now updated) causality.
///
/// # Panics
///
/// Panics if no thread is active.
pub(crate) fn seq_cst_fence(&mut self) {
    let index = self.active.unwrap();
    let thread = &mut self.threads[index];

    thread.causality.join(&self.seq_cst_causality);
    self.seq_cst_causality.join(&thread.causality);
}
/// Resets the set for a new execution: all threads are dropped, a fresh
/// thread 0 is created and made active, and `seq_cst_causality` starts over.
///
/// The span that is current at call time becomes the new parent span.
pub(crate) fn clear(&mut self, execution_id: execution::Id) {
    // Statement order is kept deliberately: the old iteration span is
    // replaced before the old threads (and their spans) are dropped.
    self.iteration_span = tracing::Span::current();
    self.threads.clear();

    let first_id = Id::new(execution_id, 0);
    let first_thread = Thread::new(first_id, &self.iteration_span);
    self.threads.push(first_thread);

    self.execution_id = execution_id;
    self.active = Some(0);
    self.seq_cst_causality = VersionVec::new();
}
/// Iterates over all threads in index order, pairing each with its `Id`.
pub(crate) fn iter(&self) -> impl ExactSizeIterator<Item = (Id, &Thread)> + '_ {
    // Copy the execution id so the closure does not borrow `self`.
    let execution_id = self.execution_id;
    let indexed = self.threads.iter().enumerate();

    indexed.map(move |(index, thread)| (Id::new(execution_id, index), thread))
}
/// Iterates mutably over all threads in index order, pairing each with its
/// `Id`.
pub(crate) fn iter_mut(&mut self) -> impl ExactSizeIterator<Item = (Id, &mut Thread)> + '_ {
    // Copy the execution id so the closure does not borrow `self`.
    let execution_id = self.execution_id;
    let indexed = self.threads.iter_mut().enumerate();

    indexed.map(move |(index, thread)| (Id::new(execution_id, index), thread))
}
/// Splits the set into the active thread and an iterator over every other
/// thread (those before the active one first, then those after it).
///
/// # Panics
///
/// Panics if no thread is active.
pub(crate) fn split_active(&mut self) -> (&mut Thread, impl Iterator<Item = &mut Thread>) {
    let index = self.active.unwrap();

    let (before, rest) = self.threads.split_at_mut(index);
    let (current, after) = rest.split_at_mut(1);
    let others = before.iter_mut().chain(after.iter_mut());

    (&mut current[0], others)
}
/// Looks up the active thread's value for the thread-local `key`.
///
/// Returns `None` when the key has not been initialised on this thread,
/// `Some(Err(_))` when the value has already been taken by
/// `Thread::drop_locals`, and `Some(Ok(_))` otherwise.
///
/// # Panics
///
/// Panics if no thread is active, or if the stored value is not a `T`.
pub(crate) fn local<T: 'static>(
    &mut self,
    key: &'static crate::thread::LocalKey<T>,
) -> Option<Result<&T, AccessError>> {
    let key_id = LocalKeyId::new(key);
    let slot = self.active_mut().locals.get(&key_id)?;

    Some(slot.get())
}
/// Stores `value` as the active thread's value for the thread-local `key`.
///
/// # Panics
///
/// Panics if no thread is active, or if the active thread already has an
/// entry for `key`.
pub(crate) fn local_init<T: 'static>(
&mut self,
key: &'static crate::thread::LocalKey<T>,
value: T,
) {
// `insert` returns the previous entry; there must not have been one.
assert!(self
.active_mut()
.locals
.insert(LocalKeyId::new(key), LocalValue::new(value))
.is_none())
}
}
impl ops::Index<Id> for Set {
type Output = Thread;
fn index(&self, index: Id) -> &Thread {
&self.threads[index.id]
}
}
impl ops::IndexMut<Id> for Set {
fn index_mut(&mut self, index: Id) -> &mut Thread {
&mut self.threads[index.id]
}
}
impl Id {
    /// Builds an id from an execution id and a thread index.
    pub(crate) fn new(execution_id: execution::Id, id: usize) -> Id {
        Self { execution_id, id }
    }

    /// Returns the thread index, discarding the execution id.
    pub(crate) fn as_usize(self) -> usize {
        let Self { id, .. } = self;
        id
    }
}
impl From<Id> for usize {
fn from(src: Id) -> usize {
src.id
}
}
impl fmt::Display for Id {
    /// Displays the id as its bare thread index, honouring the formatter's
    /// flags exactly as the integer itself would.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.id, fmt)
    }
}
impl fmt::Debug for Id {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "Id({})", self.id)
}
}
impl LocalKeyId {
fn new<T>(key: &'static crate::thread::LocalKey<T>) -> Self {
Self(key as *const _ as usize)
}
}
impl LocalValue {
fn new<T: 'static>(value: T) -> Self {
Self(Some(Box::new(value)))
}
fn get<T: 'static>(&self) -> Result<&T, AccessError> {
self.0
.as_ref()
.ok_or(AccessError { _private: () })
.map(|val| {
val.downcast_ref::<T>()
.expect("local value must downcast to expected type")
})
}
}
/// Error reported when a thread-local value is accessed after it has already
/// been destroyed.
///
/// The private field keeps the type from being constructed outside this
/// module.
pub struct AccessError {
    _private: (),
}

impl fmt::Debug for AccessError {
    /// Prints just the type name; the error carries no data.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("AccessError").finish()
    }
}

impl fmt::Display for AccessError {
    /// Writes the fixed message, honouring width/padding flags like any `str`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt("already destroyed", f)
    }
}

// Implementing `Error` lets callers propagate this with `?` into
// `Box<dyn Error>` or attach it as a `source`, matching what
// `std::thread::AccessError` offers.
impl std::error::Error for AccessError {}