use crate::event::Event;
use crate::must::Must;
use crate::runtime::failure::{init_panic_hook, persist_task_failure};
use crate::runtime::task::{Task, TaskId, DEFAULT_INLINE_TASKS};
use crate::runtime::thread::continuation::PooledContinuation;
use scoped_tls::scoped_thread_local;
use smallvec::SmallVec;
use std::any::Any;
use std::cell::RefCell;
use std::panic;
use std::rc::Rc;
scoped_thread_local! {
static EXECUTION_STATE: RefCell<ExecutionState>
}
pub(crate) struct Execution {
must: Rc<RefCell<Must>>,
}
impl Execution {
pub(crate) fn new(must: Rc<RefCell<Must>>) -> Self {
Self { must }
}
}
impl Execution {
pub(crate) fn run<F>(mut self, f: F)
where
F: FnOnce() + Send + 'static,
{
let state = RefCell::new(ExecutionState::new(Rc::clone(&self.must)));
let _guard = init_panic_hook();
EXECUTION_STATE.set(&state, move || {
ExecutionState::spawn_thread(
f,
self.must.borrow().config().stack_size,
Some(format!("main-thread-{:?}", std::thread::current().id())),
);
let panic_payload = match panic::catch_unwind(panic::AssertUnwindSafe(|| {
while self.step() {}
})) {
Ok(()) => None,
Err(e) => {
ExecutionState::with(|state| {
state.current_task = ScheduledTask::Stopped;
});
Some(e)
}
};
ExecutionState::cleanup();
if let Some(payload) = panic_payload {
panic::resume_unwind(payload);
}
});
}
#[inline]
fn step(&mut self) -> bool {
enum NextStep {
Task(Rc<RefCell<PooledContinuation>>),
Failure(String),
Finished,
}
let next_step = ExecutionState::with(|state| {
if let Err(msg) = state.schedule() {
return NextStep::Failure(msg);
}
state.advance_to_next_task();
match state.current_task {
ScheduledTask::Some(tid) => {
let task = state.get(tid);
NextStep::Task(Rc::clone(&task.continuation))
}
ScheduledTask::Finished => {
if state.tasks.iter().any(|t| !t.finished()) {
let blocked_tasks = state
.tasks
.iter()
.filter(|t| !t.finished())
.map(|t| {
format!(
"{} (task {})",
t.name().unwrap_or_else(|| "<unknown>".to_string()),
t.id().0,
)
})
.collect::<Vec<_>>();
NextStep::Failure(
format!("deadlock! blocked tasks: [{}]", blocked_tasks.join(", ")), )
} else {
NextStep::Finished
}
}
ScheduledTask::Stopped => NextStep::Finished,
ScheduledTask::None => NextStep::Failure(
"no task was scheduled".to_string(),
),
}
});
let ret = match next_step {
NextStep::Task(continuation) => panic::catch_unwind(panic::AssertUnwindSafe(|| {
continuation.borrow_mut().resume()
})),
NextStep::Failure(
msg, ) => {
let pos = ExecutionState::failure_info().map(|(_, pos)| pos);
let message = persist_task_failure(msg, pos);
panic!("{}", message);
}
NextStep::Finished => return false,
};
match ret {
Ok(true) => {
ExecutionState::with(|state| state.current_mut().finish());
}
Ok(false) => {}
Err(e) => {
let (name, pos) = ExecutionState::failure_info().unwrap();
let message = persist_task_failure(name, Some(pos));
let payload: Box<dyn Any + Send> = match e.downcast::<String>() {
Ok(panic_msg) => {
Box::new(format!("{}\noriginal panic: {}", message, panic_msg))
}
Err(panic) => panic,
};
panic::resume_unwind(payload);
}
}
true
}
}
pub(crate) struct ExecutionState {
pub(crate) tasks: SmallVec<[Task; DEFAULT_INLINE_TASKS]>,
current_task: ScheduledTask,
next_task: ScheduledTask,
pub must: Rc<RefCell<Must>>,
#[cfg(debug_assertions)]
has_cleaned_up: bool,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum ScheduledTask {
None, Some(TaskId), Stopped, Finished, }
impl ScheduledTask {
fn id(&self) -> Option<TaskId> {
match self {
ScheduledTask::Some(tid) => Some(*tid),
_ => None,
}
}
fn take(&mut self) -> Self {
std::mem::replace(self, ScheduledTask::None)
}
}
impl ExecutionState {
fn new(must: Rc<RefCell<Must>>) -> ExecutionState {
Self {
tasks: SmallVec::new(),
current_task: ScheduledTask::None,
next_task: ScheduledTask::None,
must,
#[cfg(debug_assertions)]
has_cleaned_up: false,
}
}
#[inline]
pub(crate) fn with<F, T>(f: F) -> T
where
F: FnOnce(&mut ExecutionState) -> T,
{
Self::try_with(f).expect("The TraceForge API (spawn, recv_msg, etc.) should be used only from threads launched using traceforge::spawn and only inside a TraceForge test.")
}
#[inline]
pub(crate) fn try_with<F, T>(f: F) -> Option<T>
where
F: FnOnce(&mut ExecutionState) -> T,
{
if EXECUTION_STATE.is_set() {
EXECUTION_STATE.with(|cell| {
if let Ok(mut state) = cell.try_borrow_mut() {
Some(f(&mut state))
} else {
None
}
})
} else {
None
}
}
pub(crate) fn spawn_thread<F>(
f: F,
stack_size: usize,
name: Option<String>,
) -> TaskId
where
F: FnOnce() + Send + 'static,
{
Self::with(|state| {
let task_id = TaskId(state.tasks.len());
let task = Task::from_closure(f, stack_size, task_id, name);
state.tasks.push(task);
task_id
})
}
fn cleanup() {
let (mut tasks, final_state) = Self::with(|state| {
assert!(
state.current_task == ScheduledTask::Stopped
|| state.current_task == ScheduledTask::Finished
);
(
std::mem::replace(&mut state.tasks, SmallVec::new()),
state.current_task,
)
});
for task in tasks.drain(..) {
let finished = task.finished();
assert!(
final_state == ScheduledTask::Stopped || finished,
"execution finished but task is not"
);
match Rc::try_unwrap(task.continuation) {
Ok(refcell) => {
if !finished {
let mut pc = refcell.borrow_mut();
pc.cancel_gen();
pc.reinitialize_generator();
drop(pc);
}
drop(refcell);
}
Err(_) => panic!("couldn't cleanup a future"),
}
}
#[cfg(debug_assertions)]
Self::with(|state| state.has_cleaned_up = true);
}
pub(crate) fn maybe_yield() -> bool {
Self::with(|state| {
debug_assert!(
matches!(state.current_task, ScheduledTask::Some(_))
&& state.next_task == ScheduledTask::None,
"we're inside a task and scheduler should not yet have run"
);
let result = state.schedule();
if result.is_err() {
return true;
}
if state.current_task == state.next_task {
state.advance_to_next_task();
false
} else {
true
}
})
}
pub(crate) fn failure_info() -> Option<(String, Event)> {
let fi: Option<Option<(String, Event)>> = Self::try_with(|state| {
if let Some(task) = state.try_current() {
let name = task
.name()
.unwrap_or_else(|| format!("task-{:?}", task.id().0));
Some((name, state.curr_pos()))
} else {
None
}
});
fi.flatten()
}
pub(crate) fn current(&self) -> &Task {
self.get(self.current_task.id().unwrap())
}
pub(crate) fn current_mut(&mut self) -> &mut Task {
self.get_mut(self.current_task.id().unwrap())
}
pub(crate) fn try_current(&self) -> Option<&Task> {
self.try_get(self.current_task.id()?)
}
pub(crate) fn get(&self, id: TaskId) -> &Task {
self.try_get(id).unwrap()
}
pub(crate) fn get_mut(&mut self, id: TaskId) -> &mut Task {
self.tasks.get_mut(id.0).unwrap()
}
pub(crate) fn try_get(&self, id: TaskId) -> Option<&Task> {
self.tasks.get(id.0)
}
pub(crate) fn next_pos(&mut self) -> Event {
let tid = self.must.borrow().to_thread_id(self.current().id());
self.current_mut().instructions += 1;
let icount = self.current().instructions as u32;
Event::new(tid, icount)
}
pub(crate) fn prev_pos(&mut self) -> Event {
let tid = self.must.borrow().to_thread_id(self.current().id());
self.current_mut().instructions -= 1;
let icount = self.current().instructions as u32;
Event::new(tid, icount)
}
pub(crate) fn curr_pos(&self) -> Event {
let tid = self.must.borrow().to_thread_id(self.current().id());
let icount = self.current().instructions as u32;
Event::new(tid, icount)
}
fn schedule(&mut self) -> Result<(), String> {
if self.next_task != ScheduledTask::None {
return Ok(());
}
let runnable = self
.tasks
.iter()
.filter(|t| t.runnable())
.map(|t| (t.id, t.instructions))
.collect::<SmallVec<[_; DEFAULT_INLINE_TASKS]>>();
if runnable.is_empty() {
self.next_task = ScheduledTask::Finished;
return Ok(());
}
self.next_task = self
.must
.borrow_mut()
.next_task(&runnable, self.current_task.id())
.map(ScheduledTask::Some)
.unwrap_or(ScheduledTask::Stopped);
Ok(())
}
fn advance_to_next_task(&mut self) {
debug_assert_ne!(self.next_task, ScheduledTask::None);
self.current_task = self.next_task.take();
}
pub(crate) fn is_running(&self) -> bool {
matches!(self.current_task, ScheduledTask::Some(_))
}
}
#[cfg(debug_assertions)]
impl Drop for ExecutionState {
fn drop(&mut self) {
assert!(self.has_cleaned_up || std::thread::panicking());
}
}