use crate::runtime::failure::{init_panic_hook, persist_failure, persist_task_failure};
use crate::runtime::storage::{StorageKey, StorageMap};
use crate::runtime::task::clock::VectorClock;
use crate::runtime::task::{Task, TaskId, DEFAULT_INLINE_TASKS};
use crate::runtime::thread::continuation::PooledContinuation;
use crate::scheduler::{Schedule, Scheduler};
use crate::thread::thread_fn;
use crate::{Config, MaxSteps};
use scoped_tls::scoped_thread_local;
use smallvec::SmallVec;
use std::any::Any;
use std::cell::RefCell;
use std::future::Future;
use std::panic;
use std::rc::Rc;
use tracing::span::Entered;
use tracing::{span, trace, Level, Span};
// Scoped thread-local slot through which the running `ExecutionState` is reached without
// passing a reference through every call. `Execution::run` installs the state for the duration
// of one execution, and `ExecutionState::try_with` is the accessor that reads it.
scoped_thread_local! {
static EXECUTION_STATE: RefCell<ExecutionState>
}
/// Driver for a single execution of a test closure. It holds what is needed to build the
/// `ExecutionState` and is consumed by `run`.
pub(crate) struct Execution {
/// Shared handle to the scheduler; `run` clones it into the `ExecutionState`.
scheduler: Rc<RefCell<dyn Scheduler>>,
/// Schedule that `run` hands to `ExecutionState::new` as the starting `current_schedule`.
initial_schedule: Schedule,
}
impl Execution {
pub(crate) fn new(scheduler: Rc<RefCell<dyn Scheduler>>, initial_schedule: Schedule) -> Self {
Self {
scheduler,
initial_schedule,
}
}
}
impl Execution {
/// Runs one complete execution with `f` as the body of the first task.
///
/// A fresh `ExecutionState` is installed in `EXECUTION_STATE` for the duration of the call,
/// `f` is spawned as a task named "main-thread" with a new vector clock, and `step` is called
/// repeatedly until it returns `false`. The state is cleaned up before the scoped
/// thread-local is released.
///
/// Any panic raised by `step` propagates to the caller.
pub(crate) fn run<F>(mut self, config: &Config, f: F)
where
    F: FnOnce() + Send + 'static,
{
    let scheduler = Rc::clone(&self.scheduler);
    let schedule = self.initial_schedule.clone();
    let state = RefCell::new(ExecutionState::new(config.clone(), scheduler, schedule));

    // The guard is held until the end of this function, i.e. for the whole execution below.
    // NOTE(review): presumably it undoes the panic-hook installation when dropped — confirm.
    let _guard = init_panic_hook(config.clone());

    EXECUTION_STATE.set(&state, move || {
        // The test body becomes the first task of the execution.
        let main_task = move || thread_fn(f, Default::default());
        ExecutionState::spawn_thread(
            main_task,
            config.stack_size,
            Some(String::from("main-thread")),
            Some(VectorClock::new()),
        );

        // Keep stepping until `step` reports that the execution is over.
        loop {
            if !self.step(config) {
                break;
            }
        }

        // Must happen while `EXECUTION_STATE` is still set, because cleanup goes through it.
        ExecutionState::cleanup();
    });
}
/// Performs one scheduling step and reports whether the execution should keep going.
///
/// A step asks the `ExecutionState` which task runs next, resumes that task's continuation,
/// and marks the task as finished when the continuation reports completion.
///
/// Returns `false` when the scheduling outcome is `Stopped`, or `Finished` without any
/// unfinished attached task; returns `true` after a task has been resumed.
///
/// # Panics
///
/// * If scheduling returns an error, if the outcome is `Finished` while an attached task is
///   still unfinished (reported as a deadlock), or if nothing was scheduled: the failure is
///   passed to `persist_failure` and the message it returns becomes the panic message.
/// * If the resumed task panicked: the failure is passed to `persist_task_failure` and the
///   task's panic is re-raised. When the original payload is a `String`, it is replaced by a
///   `String` that starts with the persisted message; any other payload is re-raised as is.
#[inline]
fn step(&mut self, config: &Config) -> bool {
    // Outcome of the scheduling half of a step.
    enum NextStep {
        Task(Rc<RefCell<PooledContinuation>>),
        Failure(String, Schedule),
        Finished,
    }

    let decision = ExecutionState::with(|state| {
        if let Err(msg) = state.schedule() {
            return NextStep::Failure(msg, state.current_schedule.clone());
        }
        state.advance_to_next_task();

        match state.current_task {
            ScheduledTask::Some(tid) => NextStep::Task(Rc::clone(&state.get(tid).continuation)),
            ScheduledTask::Stopped => NextStep::Finished,
            ScheduledTask::None => {
                NextStep::Failure(String::from("no task was scheduled"), state.current_schedule.clone())
            }
            ScheduledTask::Finished => {
                // Scheduling ended; that is only legitimate if no attached task is left
                // unfinished. Otherwise the remaining tasks can never make progress.
                let deadlocked = state.tasks.iter().any(|t| !(t.finished() || t.detached));
                if !deadlocked {
                    return NextStep::Finished;
                }

                let mut blocked_tasks = Vec::new();
                for t in state.tasks.iter().filter(|t| !t.finished()) {
                    let detached_note = if t.detached { ", detached" } else { "" };
                    let sleeping_note = if t.sleeping() { ", pending future" } else { "" };
                    blocked_tasks.push(format!(
                        "{} (task {}{}{})",
                        t.name().unwrap_or_else(|| "<unknown>".to_string()),
                        t.id().0,
                        detached_note,
                        sleeping_note,
                    ));
                }
                NextStep::Failure(
                    format!("deadlock! blocked tasks: [{}]", blocked_tasks.join(", ")),
                    state.current_schedule.clone(),
                )
            }
        }
    });

    // Resume the chosen task outside of `ExecutionState::with`, so the task itself is free to
    // access the execution state while it runs.
    let outcome = match decision {
        NextStep::Finished => return false,
        NextStep::Failure(msg, schedule) => {
            let message = persist_failure(&schedule, msg, config, false);
            panic!("{}", message);
        }
        NextStep::Task(continuation) => {
            panic::catch_unwind(panic::AssertUnwindSafe(|| continuation.borrow_mut().resume()))
        }
    };

    let finished = match outcome {
        // `true` means the task ran to completion, `false` that it only yielded.
        Ok(finished) => finished,
        Err(e) => {
            let (name, schedule) = ExecutionState::failure_info().unwrap();
            let message = persist_task_failure(&schedule, name, config, true);
            // Prefix the persisted message when the payload is a `String`; leave every other
            // payload type untouched.
            let payload = e
                .downcast::<String>()
                .map(|panic_msg| -> Box<dyn Any + Send> {
                    Box::new(format!("{}\noriginal panic: {}", message, panic_msg))
                })
                .unwrap_or_else(|other| other);
            panic::resume_unwind(payload)
        }
    };

    if finished {
        ExecutionState::with(|state| state.current_mut().finish());
    }
    true
}
}
/// Mutable state of one execution: the spawned tasks, the scheduling cursor, the schedule
/// recorded so far, and the storage map. It is reached through the `EXECUTION_STATE` scoped
/// thread-local via `ExecutionState::with` / `try_with`.
pub(crate) struct ExecutionState {
/// Configuration of this execution; `schedule` reads `max_steps` from it.
pub config: Config,
/// Every task spawned so far. A task's `TaskId` is its index in this vector.
tasks: SmallVec<[Task; DEFAULT_INLINE_TASKS]>,
/// Scheduling outcome currently in effect (the running task, or a terminal marker).
current_task: ScheduledTask,
/// Decision made by `schedule` that `advance_to_next_task` has not applied yet;
/// `ScheduledTask::None` when there is no pending decision.
next_task: ScheduledTask,
/// Set by `request_yield`; consumed (reset to `false`) by `schedule`, which passes the
/// value to the scheduler as `is_yielding`.
has_yielded: bool,
/// Incremented each time `schedule` gets past its "already scheduled" early return.
context_switches: usize,
/// Storage map accessed through `get_storage` / `init_storage` and drained by `cleanup`.
storage: StorageMap,
/// Scheduler consulted for the next task and for random values.
scheduler: Rc<RefCell<dyn Scheduler>>,
/// Schedule recorded so far: one entry per task step and per random value drawn.
current_schedule: Schedule,
/// Guard for having entered `current_span`. Its lifetime has been extended to `'static`
/// (see `advance_to_next_task`), so it must never outlive `current_span`. Fields are dropped
/// in declaration order, which is why this field is declared before `current_span`.
current_span_entered: Option<Entered<'static>>,
/// Tracing span for the step currently being executed.
current_span: Span,
/// Debug builds only: set by `cleanup` and checked by the `Drop` implementation.
#[cfg(debug_assertions)]
has_cleaned_up: bool,
}
/// Value of the scheduling slots `current_task` and `next_task`.
///
/// * `None` — nothing is scheduled. This is the initial value of both slots and what
///   `take` leaves behind.
/// * `Some(TaskId)` — the identified task was chosen to run.
/// * `Stopped` — the execution is cut short: either the scheduler returned no task, or the
///   `MaxSteps::ContinueAfter` bound was reached.
/// * `Finished` — `schedule` found no runnable task, or only detached runnable tasks with no
///   unfinished attached task remaining.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum ScheduledTask {
None, Some(TaskId), Stopped, Finished, }
impl ScheduledTask {
    /// Returns the task id carried by `Some`, and `None` for every other variant.
    fn id(&self) -> Option<TaskId> {
        if let ScheduledTask::Some(tid) = *self {
            Some(tid)
        } else {
            None
        }
    }

    /// Returns the current value and resets `self` to `ScheduledTask::None`.
    fn take(&mut self) -> Self {
        let previous = *self;
        *self = ScheduledTask::None;
        previous
    }
}
impl ExecutionState {
fn new(config: Config, scheduler: Rc<RefCell<dyn Scheduler>>, initial_schedule: Schedule) -> Self {
Self {
config,
tasks: SmallVec::new(),
current_task: ScheduledTask::None,
next_task: ScheduledTask::None,
has_yielded: false,
context_switches: 0,
storage: StorageMap::new(),
scheduler,
current_schedule: initial_schedule,
current_span_entered: None,
current_span: Span::none(),
#[cfg(debug_assertions)]
has_cleaned_up: false,
}
}
/// Runs `f` with mutable access to the current `ExecutionState` and returns its result.
///
/// # Panics
///
/// Panics whenever `try_with` returns `None`, i.e. when no execution state is installed on
/// this thread or the state is already borrowed.
#[inline]
pub(crate) fn with<F, T>(f: F) -> T
where
    F: FnOnce(&mut ExecutionState) -> T,
{
    let outcome = Self::try_with(f);
    outcome.expect("Shuttle internal error: cannot access ExecutionState. are you trying to access a Shuttle primitive from outside a Shuttle test?")
}
/// Runs `f` with mutable access to the current `ExecutionState`, if that is possible.
///
/// Returns `None` without calling `f` when `EXECUTION_STATE` is not set on this thread, or
/// when the state is already borrowed (for example by an enclosing `with` call). Otherwise
/// returns `Some` with the result of `f`.
#[inline]
pub(crate) fn try_with<F, T>(f: F) -> Option<T>
where
    F: FnOnce(&mut ExecutionState) -> T,
{
    if !EXECUTION_STATE.is_set() {
        return None;
    }
    EXECUTION_STATE.with(|cell| {
        // A failed borrow means someone further up the stack already holds the state.
        let mut state = cell.try_borrow_mut().ok()?;
        Some(f(&mut state))
    })
}
/// Returns the id of the task that is currently scheduled.
///
/// Panics if the execution state is unavailable or no task is currently scheduled.
pub(crate) fn me() -> TaskId {
    Self::with(|state| {
        let running = state.current();
        running.id()
    })
}
/// Registers a new task that runs `future` and returns its id.
///
/// The new id is the next free index in the task list. The spawning (current) task's clock
/// is incremented and extended with an entry for the new task, and the new task starts with
/// a copy of that clock.
pub(crate) fn spawn_future<F>(future: F, stack_size: usize, name: Option<String>) -> TaskId
where
    F: Future<Output = ()> + Send + 'static,
{
    Self::with(|state| {
        let task_id = TaskId(state.tasks.len());

        let parent_clock = state.increment_clock_mut();
        parent_clock.extend(task_id);
        let child_clock = parent_clock.clone();

        let task = Task::from_future(future, stack_size, task_id, name, child_clock);
        state.tasks.push(task);
        task_id
    })
}
/// Registers a new task that runs the closure `f` and returns its id.
///
/// The new id is the next free index in the task list. If `initial_clock` is given, it is
/// used as the basis for the new task's clock; otherwise the current task's clock is
/// incremented and used. In both cases the chosen clock is extended with an entry for the
/// new task, and the task receives a copy of it.
pub(crate) fn spawn_thread<F>(
    f: F,
    stack_size: usize,
    name: Option<String>,
    mut initial_clock: Option<VectorClock>,
) -> TaskId
where
    F: FnOnce() + Send + 'static,
{
    Self::with(|state| {
        let task_id = TaskId(state.tasks.len());

        let clock = match initial_clock.as_mut() {
            Some(provided) => provided,
            // No explicit clock: inherit from the task that is doing the spawning.
            None => state.increment_clock_mut(),
        };
        clock.extend(task_id);
        let task_clock = clock.clone();

        let task = Task::from_closure(f, stack_size, task_id, name, task_clock);
        state.tasks.push(task);
        task_id
    })
}
/// Tears down the execution state once the execution has ended.
///
/// The task list is moved out of the state first and the tasks are dropped afterwards,
/// outside of `Self::with`, so no borrow of the state is held while they are destroyed.
/// Storage entries are likewise popped one at a time and dropped outside of `Self::with`.
///
/// # Panics
///
/// Panics if the execution is neither `Stopped` nor `Finished`, if the execution finished
/// while a task is neither finished nor detached, or if a task's continuation is still
/// referenced from elsewhere.
fn cleanup() {
    let (tasks, final_state) = Self::with(|state| {
        assert!(state.current_task == ScheduledTask::Stopped || state.current_task == ScheduledTask::Finished);
        let final_state = state.current_task;
        (std::mem::take(&mut state.tasks), final_state)
    });

    let stopped_early = final_state == ScheduledTask::Stopped;
    for task in tasks {
        assert!(
            stopped_early || task.finished() || task.detached,
            "execution finished but task is not"
        );
        // The task list must hold the only remaining reference to the continuation.
        Rc::try_unwrap(task.continuation)
            .map_err(|_| ())
            .expect("couldn't cleanup a future");
    }

    // Each popped value is dropped here, after the `with` call that produced it has returned.
    while let Some(entry) = Self::with(|state| state.storage.pop()) {
        drop(entry);
    }

    #[cfg(debug_assertions)]
    Self::with(|state| state.has_cleaned_up = true);
}
/// Returns `true` once the execution has ended, i.e. the current scheduling outcome is
/// `Stopped` or `Finished`.
pub(crate) fn is_finished(&self) -> bool {
    matches!(self.current_task, ScheduledTask::Stopped | ScheduledTask::Finished)
}
/// Runs the scheduler from inside a task and reports whether the task has to yield.
///
/// Returns `false` when the scheduler picked the current task again; in that case the
/// decision is applied immediately. Returns `true` when a different outcome was chosen, or
/// when scheduling failed — the decision (or the failure) is then left for `step`.
pub(crate) fn maybe_yield() -> bool {
    Self::with(|state| {
        debug_assert!(
            matches!(state.current_task, ScheduledTask::Some(_)) && state.next_task == ScheduledTask::None,
            "we're inside a task and scheduler should not yet have run"
        );

        // On failure `next_task` is still unset, so `step` will call `schedule` again.
        if state.schedule().is_err() {
            return true;
        }

        let stays_on_same_task = state.current_task == state.next_task;
        if stays_on_same_task {
            state.advance_to_next_task();
        }
        !stays_on_same_task
    })
}
/// Records a yield request. The flag is consumed by the next scheduling decision, which
/// forwards it to the scheduler as `is_yielding`.
pub(crate) fn request_yield() {
    Self::with(|state| state.has_yielded = true)
}
/// Returns `true` when the calling thread is panicking or the execution has been stopped.
///
/// The execution state is only consulted when the thread is not panicking.
///
/// # Panics
///
/// Panics if the current scheduling outcome is `Finished`.
pub(crate) fn should_stop() -> bool {
    if std::thread::panicking() {
        return true;
    }
    Self::with(|state| {
        assert_ne!(state.current_task, ScheduledTask::Finished);
        matches!(state.current_task, ScheduledTask::Stopped)
    })
}
/// Collects what is needed to report a failure: a name for the current task and a copy of
/// the schedule recorded so far.
///
/// The name is the task's own name, `task-<id>` when it has none, or `<unknown>` when no
/// task is current. Returns `None` if the execution state cannot be accessed.
pub(crate) fn failure_info() -> Option<(String, Schedule)> {
    Self::try_with(|state| {
        let name = match state.try_current() {
            Some(task) => task.name().unwrap_or_else(|| format!("task-{:?}", task.id().0)),
            None => String::from("<unknown>"),
        };
        (name, state.current_schedule.clone())
    })
}
/// Draws the next random value from the scheduler and notes in the recorded schedule that
/// a random value was drawn at this point.
#[inline]
pub(crate) fn next_u64() -> u64 {
    Self::with(|state| {
        state.current_schedule.push_random();
        let mut scheduler = state.scheduler.borrow_mut();
        scheduler.next_u64()
    })
}
/// Returns the currently scheduled task. Panics if no task is currently scheduled.
pub(crate) fn current(&self) -> &Task {
    let tid = self.current_task.id().unwrap();
    self.get(tid)
}
/// Returns the currently scheduled task mutably. Panics if no task is currently scheduled.
pub(crate) fn current_mut(&mut self) -> &mut Task {
    let tid = self.current_task.id().unwrap();
    self.get_mut(tid)
}
/// Returns the currently scheduled task, or `None` when no task is scheduled or its id is
/// not in the task list.
pub(crate) fn try_current(&self) -> Option<&Task> {
    self.current_task.id().and_then(|tid| self.try_get(tid))
}
/// Returns the task with the given id. Panics if no such task exists.
pub(crate) fn get(&self, id: TaskId) -> &Task {
    self.tasks.get(id.0).unwrap()
}
pub(crate) fn get_mut(&mut self, id: TaskId) -> &mut Task {
self.tasks.get_mut(id.0).unwrap()
}
pub(crate) fn try_get(&self, id: TaskId) -> Option<&Task> {
self.tasks.get(id.0)
}
pub(crate) fn context_switches() -> usize {
Self::with(|state| state.context_switches)
}
/// Looks up the value stored under `key`, returning `None` when the storage map has no
/// entry for it.
///
/// Panics with "global storage is never destructed" if the map reports an error for the
/// entry.
pub(crate) fn get_storage<K: Into<StorageKey>, T: 'static>(&self, key: K) -> Option<&T> {
    let storage_key: StorageKey = key.into();
    self.storage
        .get(storage_key)
        .map(|slot| slot.expect("global storage is never destructed"))
}
/// Initializes the storage entry for `key` with `value`.
pub(crate) fn init_storage<K: Into<StorageKey>, T: 'static>(&mut self, key: K, value: T) {
    let storage_key: StorageKey = key.into();
    self.storage.init(storage_key, value);
}
/// Returns the vector clock of the task with the given id. Panics if no such task exists.
pub(crate) fn get_clock(&self, id: TaskId) -> &VectorClock {
    let task = self.get(id);
    &task.clock
}
/// Returns the vector clock of the task with the given id mutably. Panics if no such task
/// exists.
pub(crate) fn get_clock_mut(&mut self, id: TaskId) -> &mut VectorClock {
    let task = self.get_mut(id);
    &mut task.clock
}
/// Increments the current task's clock for its own id, then updates it with `clock`.
///
/// Panics if no task is currently scheduled.
pub(crate) fn update_clock(&mut self, clock: &VectorClock) {
    let task = self.current_mut();
    let tid = task.id;
    let own_clock = &mut task.clock;
    own_clock.increment(tid);
    own_clock.update(clock);
}
/// Increments the current task's clock for its own id and returns the clock by shared
/// reference. Panics if no task is currently scheduled.
pub(crate) fn increment_clock(&mut self) -> &VectorClock {
    &*self.increment_clock_mut()
}
/// Increments the current task's clock for its own id and returns the clock by mutable
/// reference. Panics if no task is currently scheduled.
pub(crate) fn increment_clock_mut(&mut self) -> &mut VectorClock {
    let task = self.current_mut();
    let tid = task.id;
    let own_clock = &mut task.clock;
    own_clock.increment(tid);
    own_clock
}
fn schedule(&mut self) -> Result<(), String> {
if self.next_task != ScheduledTask::None {
return Ok(());
}
self.context_switches += 1;
match self.config.max_steps {
MaxSteps::FailAfter(n) if self.current_schedule.len() >= n => {
let msg = format!(
"exceeded max_steps bound {}. this might be caused by an unfair schedule (e.g., a spin loop)?",
n
);
return Err(msg);
}
MaxSteps::ContinueAfter(n) if self.current_schedule.len() >= n => {
self.next_task = ScheduledTask::Stopped;
return Ok(());
}
_ => {}
}
let mut unfinished_attached = false;
let runnable = self
.tasks
.iter()
.inspect(|t| unfinished_attached = unfinished_attached || (!t.finished() && !t.detached))
.filter(|t| t.runnable())
.map(|t| t.id)
.collect::<SmallVec<[_; DEFAULT_INLINE_TASKS]>>();
if runnable.is_empty() || (!unfinished_attached && runnable.iter().all(|id| self.get(*id).detached)) {
self.next_task = ScheduledTask::Finished;
return Ok(());
}
let is_yielding = std::mem::replace(&mut self.has_yielded, false);
self.next_task = self
.scheduler
.borrow_mut()
.next_task(&runnable, self.current_task.id(), is_yielding)
.map(ScheduledTask::Some)
.unwrap_or(ScheduledTask::Stopped);
trace!(?runnable, next_task=?self.next_task);
Ok(())
}
/// Applies the pending scheduling decision: `next_task` becomes `current_task`, the chosen
/// task (if any) is appended to the recorded schedule, and the tracing span for the new
/// step is entered.
///
/// In debug builds, asserts that a decision is actually pending.
fn advance_to_next_task(&mut self) {
debug_assert_ne!(self.next_task, ScheduledTask::None);
// `take` leaves `next_task` as `None`, so the next call to `schedule` decides afresh.
self.current_task = self.next_task.take();
if let ScheduledTask::Some(tid) = self.current_task {
self.current_schedule.push_task(tid);
}
// Release the guard of the previous step's span BEFORE `current_span` is overwritten
// below: the guard's lifetime was extended to 'static, so the borrow checker no longer
// enforces this order.
self.current_span_entered.take();
if let ScheduledTask::Some(tid) = self.current_task {
// `len() - 1` is the index of the entry pushed just above for this task.
self.current_span = span!(Level::INFO, "step", i = self.current_schedule.len() - 1, task = tid.0);
// SAFETY: the guard refers to `self.current_span`. It is stored next to that span and is
// always dropped first: explicitly via `take()` above before the span is replaced, and on
// destruction because `current_span_entered` is declared before `current_span` in the struct.
self.current_span_entered = Some(unsafe { extend_span_entered_lt(self.current_span.enter()) });
}
}
}
/// Re-types a span guard so that it claims the `'static` lifetime.
///
/// # Safety
///
/// The returned guard still refers to the `Span` that produced `entered`. The caller must
/// ensure the guard is dropped before that span is dropped or replaced.
unsafe fn extend_span_entered_lt<'a>(entered: Entered<'a>) -> Entered<'static> {
    // Source and target differ only in the lifetime parameter.
    std::mem::transmute::<Entered<'a>, Entered<'static>>(entered)
}
// Debug-build check that the state is never dropped without `cleanup` having run.
#[cfg(debug_assertions)]
impl Drop for ExecutionState {
/// Asserts that `cleanup` set `has_cleaned_up`, unless the thread is already panicking.
fn drop(&mut self) {
// While a panic is unwinding, cleanup may legitimately have been skipped; asserting here
// as well would panic during a panic.
assert!(self.has_cleaned_up || std::thread::panicking());
}
}