pub mod channel;
mod cons;
pub mod coverage;
pub use coverage::{CoverageInfo, ExecutionId};
mod event;
mod event_label;
mod exec_graph;
mod exec_pool;
pub mod future;
mod identifier;
mod indexed_map;
pub mod loc;
pub mod monitor_types;
pub mod msg;
mod must;
mod predicate;
mod replay;
mod revisit;
mod runtime;
pub mod sync;
mod telemetry;
mod testmode;
use future::spawn_receive;
pub use testmode::{parallel_test, test};
pub mod thread;
mod vector_clock;
pub use crate::msg::Val;
use channel::{cons_to_model, self_loc_comm, thread_loc_comm, Receiver};
use coverage::ExecutionObserver;
use event_label::{Block, BlockType, CToss, Choice, RecvMsg, SendMsg};
use loc::{CommunicationModel, Loc, RecvLoc, SendLoc};
use msg::Message;
use rand::{prelude::*, rngs::OsRng, RngCore};
use replay::ReplayInformation;
use runtime::execution::{Execution, ExecutionState};
use runtime::failure::persist_task_failure;
use runtime::thread::continuation::{ContinuationPool, CONTINUATION_POOL};
use runtime::thread::switch;
use log::{debug, info, trace};
use serde::{Deserialize, Serialize};
use smallvec::alloc::sync::Arc;
use std::cell::RefCell;
use std::collections::HashMap;
use std::future::Future;
use std::iter;
use std::rc::Rc;
use std::time::Instant;
use thread::{spawn_without_switch, JoinHandle, ThreadId};
use crate::event_label::*;
use crate::exec_pool::ExecutionPool;
use crate::must::{MonitorInfo, Must};
use crate::predicate::{normalize_vec_tag, PredicateType};
use std::any::type_name;
fn type_of<T>(_: &T) -> &'static str {
type_name::<T>()
}
#[derive(Default, Clone, Debug)]
pub struct Stats {
pub execs: usize,
pub block: usize,
pub coverage: CoverageInfo,
}
impl Stats {
pub(crate) fn add(&mut self, rhs: &Stats) {
self.execs += rhs.execs;
self.block += rhs.block;
self.coverage.merge(&rhs.coverage);
}
}
#[derive(PartialEq, Eq, Default, Clone, Copy, Serialize, Deserialize, Debug)]
pub enum SchedulePolicy {
#[default]
LTR,
Arbitrary,
}
#[derive(PartialEq, Clone, Copy, Serialize, Deserialize, Debug)]
pub(crate) enum ExplorationMode {
Verification,
Estimation,
}
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum ConsType {
MO,
Bag,
#[deprecated]
WB,
FIFO,
#[deprecated]
CD,
Causal,
Mailbox,
}
impl Serialize for ConsType {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(match self {
ConsType::MO => "MO",
ConsType::Bag => "Bag",
#[allow(deprecated)]
ConsType::WB => "WB",
ConsType::FIFO => "FIFO",
#[allow(deprecated)]
ConsType::CD => "CD",
ConsType::Causal => "Causal",
ConsType::Mailbox => "Mailbox",
})
}
}
impl<'de> Deserialize<'de> for ConsType {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
match s.as_str() {
"MO" => Ok(ConsType::MO),
"Bag" => Ok(ConsType::Bag),
#[allow(deprecated)]
"WB" => Ok(ConsType::WB),
"FIFO" => Ok(ConsType::FIFO),
#[allow(deprecated)]
"CD" => Ok(ConsType::CD),
"Causal" => Ok(ConsType::Causal),
"Mailbox" => Ok(ConsType::Mailbox),
_ => Err(serde::de::Error::custom(format!(
"Invalid ConsType variant: {}",
s
))),
}
}
}
#[derive(Clone, Serialize, Deserialize)]
pub struct Config {
pub(crate) stack_size: usize,
pub(crate) progress_report: usize,
pub(crate) thread_threshold: u32,
pub(crate) warnings_as_errors: bool,
pub(crate) keep_going_after_error: bool,
pub(crate) mode: ExplorationMode,
pub(crate) cons_type: ConsType,
pub(crate) schedule_policy: SchedulePolicy,
pub(crate) max_iterations: Option<u64>,
pub(crate) verbose: usize,
pub(crate) seed: u64,
pub(crate) symmetry: bool,
pub(crate) vr: bool,
pub(crate) lossy_budget: usize,
pub(crate) dot_file: Option<String>,
pub(crate) trace_file: Option<String>,
pub(crate) error_trace_file: Option<String>,
pub(crate) turmoil_trace_file: Option<String>,
pub(crate) parallel: bool,
pub(crate) parallel_workers: Option<usize>,
pub(crate) keep_per_execution_coverage: bool,
pub(crate) predetermined_choices: HashMap<String, Vec<Vec<bool>>>,
#[serde(skip)]
pub(crate) callbacks: Arc<Mutex<Vec<Box<dyn ExecutionObserver + Send>>>>,
}
impl Config {
pub fn builder() -> ConfigBuilder {
ConfigBuilder::new()
}
pub(crate) fn rename_files(&mut self, suffix: String) {
if let Some(dot) = &self.dot_file {
self.dot_file = Some(dot.to_owned() + &suffix);
}
if let Some(trace) = &self.trace_file {
self.trace_file = Some(trace.to_owned() + &suffix);
}
if let Some(turmoiltf) = &self.turmoil_trace_file {
self.turmoil_trace_file = Some(turmoiltf.to_owned() + &suffix);
}
}
}
impl Default for Config {
fn default() -> Self {
ConfigBuilder::new().build()
}
}
pub struct ConfigBuilder(Config);
impl Default for ConfigBuilder {
fn default() -> Self {
Self::new()
}
}
impl ConfigBuilder {
pub fn new() -> Self {
ConfigBuilder(Config {
stack_size: 0x8000,
progress_report: 0,
thread_threshold: 1000,
warnings_as_errors: false,
keep_going_after_error: false,
mode: ExplorationMode::Verification,
cons_type: ConsType::FIFO,
schedule_policy: SchedulePolicy::LTR,
max_iterations: None,
verbose: 0,
seed: OsRng.next_u64(),
symmetry: false,
vr: false,
lossy_budget: 0,
dot_file: None,
trace_file: None,
error_trace_file: None,
turmoil_trace_file: None,
parallel: false,
parallel_workers: None,
keep_per_execution_coverage: false,
predetermined_choices: HashMap::new(),
callbacks: Arc::new(Mutex::new(Vec::new())),
})
}
fn check_valid(self) -> Self {
if self.0.symmetry {
panic!("Symmetry reduction is currently not supported")
}
if self.0.symmetry && self.0.schedule_policy == SchedulePolicy::Arbitrary {
eprintln!("Symmetry reduction can only be used with LTR!");
std::process::exit(exitcode::CONFIG);
} else {
self
}
}
#[allow(dead_code)]
pub(crate) fn with_mode(mut self, m: ExplorationMode) -> Self {
self.0.mode = m;
self
}
pub fn with_stack_size(mut self, s: usize) -> Self {
self.0.stack_size = s;
self
}
pub fn with_progress_report(mut self, n: usize) -> Self {
self.0.progress_report = n;
self
}
pub fn with_thread_threshold(mut self, s: u32) -> Self {
self.0.thread_threshold = s;
self
}
pub fn with_warnings_as_errors(mut self, b: bool) -> Self {
self.0.warnings_as_errors = b;
self
}
pub fn with_keep_going_after_error(mut self, b: bool) -> Self {
self.0.keep_going_after_error = b;
self
}
pub fn with_cons_type(mut self, t: ConsType) -> Self {
self.0.cons_type = t;
self
}
pub fn with_policy(mut self, p: SchedulePolicy) -> Self {
self.0.schedule_policy = p;
self
}
pub fn with_max_iterations(mut self, n: u64) -> Self {
self.0.max_iterations = Some(n);
self
}
pub fn with_verbose(mut self, v: usize) -> Self {
self.0.verbose = v;
self
}
pub fn with_seed(mut self, s: u64) -> Self {
self.0.seed = s;
self
}
pub fn with_symmetry(mut self, s: bool) -> Self {
self.0.symmetry = s;
self
}
pub fn with_value(self, _s: bool) -> Self {
panic!("Value reduction is currently not supported")
}
pub fn with_lossy(mut self, budget: usize) -> Self {
self.0.lossy_budget = budget;
self
}
pub fn with_dot_out(mut self, filename: &str) -> Self {
self.0.dot_file = Some(filename.to_string());
self
}
pub fn with_trace_out(mut self, filename: &str) -> Self {
self.0.trace_file = Some(filename.to_string());
self
}
pub fn with_turmoil_trace_out(mut self, filename: &str) -> Self {
self.0.turmoil_trace_file = Some(filename.to_string());
self
}
pub fn with_error_trace(mut self, filename: &str) -> Self {
self.0.error_trace_file = Some(filename.to_string());
self
}
pub fn with_parallel(mut self, use_parallel: bool) -> Self {
self.0.parallel = use_parallel;
self
}
pub fn with_parallel_workers(mut self, max_workers: usize) -> Self {
self.0.parallel_workers = Some(max_workers);
self
}
pub fn with_callback(self, cb: Box<dyn ExecutionObserver + Send>) -> Self {
self.0
.callbacks
.lock()
.expect("Could not lock callbacks configuration")
.push(cb);
self
}
pub fn with_keep_per_execution_coverage(mut self, keep: bool) -> Self {
self.0.keep_per_execution_coverage = keep;
self
}
pub fn with_predetermined_choices(mut self, choices: HashMap<String, Vec<Vec<bool>>>) -> Self {
self.0.predetermined_choices = choices;
self
}
pub fn build(self) -> Config {
self.check_valid().0
}
}
pub fn verify<F>(conf: Config, f: F) -> Stats
where
F: Fn() + Send + Sync + 'static,
{
let f = Arc::new(f);
if conf.parallel {
ExecutionPool::new(&conf).explore(&f)
} else {
let must = Rc::new(RefCell::new(Must::new(conf, false)));
explore(&must, &f);
let stats = must.borrow().stats();
stats
}
}
pub fn replay<F>(f: F, error_file: &str)
where
F: Fn() + Send + Sync + 'static,
{
let replay_str = std::fs::read_to_string(error_file).unwrap();
let replay_info: ReplayInformation = serde_json::from_str(&replay_str).unwrap();
replay_info.config().verbose = 2;
let must = Rc::new(RefCell::new(Must::new(replay_info.config(), true)));
let f = Arc::new(f);
info!("Sorted Execution Graph:");
info!("{}", replay_info.sorted_error_graph());
must.borrow_mut().load_replay_information(replay_info);
explore(&must, &f);
}
pub fn estimate_execs<F>(f: F) -> f64
where
F: Fn() + Send + Sync + 'static,
{
estimate_execs_with_samples(f, 1000)
}
pub fn estimate_execs_with_samples<F>(f: F, samples: u128) -> f64
where
F: Fn() + Send + Sync + 'static,
{
assert!(samples > 0);
estimate_execs_with_config(Config::builder().build(), f, samples)
}
pub fn estimate_execs_with_config<F>(mut config: Config, f: F, samples: u128) -> f64
where
F: Fn() + Send + Sync + 'static,
{
config.mode = ExplorationMode::Estimation;
config.schedule_policy = SchedulePolicy::LTR;
config.cons_type = ConsType::FIFO;
let f = Arc::new(f);
let num_samples = samples;
let mut estimate_sum: f64 = 0.0;
let mut nb_executions = 0;
for _ in 0..num_samples {
let must = Rc::new(RefCell::new(Must::new(config.clone(), false)));
explore(&must, &f);
estimate_sum += must.borrow().execs_est();
let stats = must.borrow().stats();
nb_executions += stats.execs + stats.block;
}
info!("[lib.rs] ESTIMATE ran {} executions", nb_executions);
estimate_sum / (num_samples as f64)
}
fn explore<F>(must: &Rc<RefCell<must::Must>>, f: &Arc<F>)
where
F: Fn() + Send + Sync + 'static,
{
must.borrow_mut().started_at = Instant::now();
Must::set_current(Some(must.clone()));
CONTINUATION_POOL.set(&ContinuationPool::new(), || loop {
let f = Arc::clone(f);
let execution = Execution::new(Rc::clone(must));
Must::begin_execution(must);
execution.run(move || f());
if Must::complete_execution(must) {
break;
}
});
must.borrow_mut().run_metrics_at_end();
}
fn explore_with_pool<F>(must: &Rc<RefCell<must::Must>>, f: &Arc<F>)
where
F: Fn() + Send + Sync + 'static,
{
must.borrow_mut().started_at = Instant::now();
Must::set_current(Some(must.clone()));
loop {
let f = Arc::clone(f);
let execution = Execution::new(Rc::clone(must));
Must::begin_execution(must);
execution.run(move || f());
if Must::complete_execution(must) {
break;
}
}
must.borrow_mut().run_metrics_at_end();
}
type MonitorCreateFn = fn(ThreadId, ThreadId, Val) -> Option<Val>;
type MonitorAcceptorFn = fn(ThreadId, ThreadId, Val) -> bool;
pub fn spawn_monitor<F, T>(
monitor_function: F,
create_fn: MonitorCreateFn,
acceptor_fn: MonitorAcceptorFn,
monitor: Arc<Mutex<dyn Monitor>>,
) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Message + 'static,
{
ExecutionState::with(|s| s.must.borrow().validate_monitor_spawn(&s.curr_pos()));
let jh = spawn_without_switch(monitor_function, None, true, None, None);
let thread_id = jh.thread().id();
ExecutionState::with(|s| {
info!("[lib.rs] Registering monitor: {:?}", thread_id);
let monitor_info = MonitorInfo {
thread_id,
create_fn,
acceptor_fn,
monitor_struct: monitor,
};
s.must.borrow_mut().handle_register_mon(monitor_info);
});
switch();
jh
}
pub fn publish<T: Message + 'static>(val: T) {
ExecutionState::with(|s| {
let thread_id = s.must.borrow().to_thread_id(s.current().id());
s.must.borrow_mut().publish(thread_id, val);
});
}
fn validate_locs(locs: &Vec<&Loc>) {
for (i, c1) in locs.iter().enumerate() {
for c2 in locs.iter().skip(i + 1) {
if c1 == c2 {
panic!("Detected duplicate channel {:?} in select", c1);
}
}
}
}
fn get_execution_state_info() -> (ThreadId, CommunicationModel) {
ExecutionState::with(|s| {
(
s.curr_pos().thread,
cons_to_model(s.must.borrow().config.cons_type),
)
})
}
fn expect_msg<T: 'static>(val: Val) -> T {
match val.as_any().downcast::<T>() {
Ok(v) => *v,
Err(_) => {
panic!(
"wrong message return type; expecting {} but got {}",
type_name::<T>(),
val.type_name
);
}
}
}
pub(crate) fn select_val_block<'a, T: Message + 'static, U: Message + 'static>(
primary: &'a Receiver<T>,
secondary: &'a Receiver<U>,
) -> (Val, usize) {
let locs = iter::once(&primary.inner).chain(iter::once(&secondary.inner));
let comm = primary.comm;
recv_val_block_with_tag(locs, comm, None)
}
pub fn async_recv_msg<T>(recv: &Receiver<T>) -> impl Future<Output = T>
where
T: Message + Clone + 'static,
{
futures::TryFutureExt::unwrap_or_else(spawn_receive(recv), |_| {
panic!("Async receive future failed!")
})
}
pub fn select_msg<'a, T: Message + 'static>(
recvs: impl Iterator<Item = &'a &'a Receiver<T>>,
comm: CommunicationModel,
) -> Option<(T, usize)> {
let locs = recvs.map(|r| &r.inner);
recv_msg_with_tag(locs, comm, None)
}
pub fn select_tagged_msg<'a, F, T>(
recvs: impl Iterator<Item = &'a &'a Receiver<T>>,
comm: CommunicationModel,
f: F,
) -> Option<(T, usize)>
where
F: Fn(ThreadId, Option<u32>) -> bool + 'static + Send + Sync,
T: Message + 'static,
{
select_vec_tagged_msg(recvs, comm, move |tid, tag| {
let tag = tag.and_then(|tags| tags.first().copied());
f(tid, tag)
})
}
pub fn select_vec_tagged_msg<'a, F, T>(
recvs: impl Iterator<Item = &'a &'a Receiver<T>>,
comm: CommunicationModel,
f: F,
) -> Option<(T, usize)>
where
F: Fn(ThreadId, Option<Vec<u32>>) -> bool + 'static + Send + Sync,
T: Message + 'static,
{
let locs = recvs.map(|r| &r.inner);
recv_msg_with_tag(locs, comm, Some(PredicateType(Arc::new(move |tid, tag| {
f(tid, normalize_vec_tag(tag))
}))))
}
pub fn select_msg_block<'a, T: Message + 'static>(
recvs: impl Iterator<Item = &'a &'a Receiver<T>>,
comm: CommunicationModel,
) -> (T, usize) {
let locs = recvs.map(|r| &r.inner);
recv_msg_block_with_tag(locs, comm, None)
}
pub fn select_tagged_msg_block<'a, F, T>(
recvs: impl Iterator<Item = &'a &'a Receiver<T>>,
comm: CommunicationModel,
f: F,
) -> (T, usize)
where
F: Fn(ThreadId, Option<u32>) -> bool + 'static + Send + Sync,
T: Message + 'static,
{
select_vec_tagged_msg_block(recvs, comm, move |tid, tag| {
let tag = tag.and_then(|tags| tags.first().copied());
f(tid, tag)
})
}
pub fn select_vec_tagged_msg_block<'a, F, T>(
recvs: impl Iterator<Item = &'a &'a Receiver<T>>,
comm: CommunicationModel,
f: F,
) -> (T, usize)
where
F: Fn(ThreadId, Option<Vec<u32>>) -> bool + 'static + Send + Sync,
T: Message + 'static,
{
let locs = recvs.map(|r| &r.inner);
recv_msg_block_with_tag(locs, comm, Some(PredicateType(Arc::new(move |tid, tag| {
f(tid, normalize_vec_tag(tag))
}))))
}
pub fn send_msg<T: Message + 'static>(t: ThreadId, v: T) {
let (loc, comm) = thread_loc_comm(t);
send_msg_with_tag(v, None, &loc, comm, false)
}
pub fn send_lossy_msg<T: Message + 'static>(t: ThreadId, v: T) {
let (loc, comm) = thread_loc_comm(t);
send_msg_with_tag(v, None, &loc, comm, true)
}
pub fn send_tagged_msg<T: Message + 'static>(t: ThreadId, tag: u32, v: T) {
let (loc, comm) = thread_loc_comm(t);
send_msg_with_tag(v, Some(tag), &loc, comm, false)
}
pub fn send_tagged_lossy_msg<T: Message + 'static>(t: ThreadId, tag: u32, v: T) {
let (loc, comm) = thread_loc_comm(t);
send_msg_with_tag(v, Some(tag), &loc, comm, true)
}
pub fn send_vec_tagged_msg<T: Message + 'static>(t: ThreadId, tag: Vec<u32>, v: T) {
let (loc, comm) = thread_loc_comm(t);
send_msg_with_vec_tag(v, Some(tag), &loc, comm, false)
}
pub fn send_vec_tagged_lossy_msg<T: Message + 'static>(t: ThreadId, tag: Vec<u32>, v: T) {
let (loc, comm) = thread_loc_comm(t);
send_msg_with_vec_tag(v, Some(tag), &loc, comm, true)
}
fn send_msg_with_tag<T: Message + 'static>(
v: T,
tag: Option<u32>,
loc: &Loc,
comm: CommunicationModel,
lossy: bool,
) {
send_msg_with_vec_tag(v, tag.map(|t| vec![t]), loc, comm, lossy);
}
fn send_msg_with_vec_tag<T: Message + 'static>(
v: T,
tag: Option<Vec<u32>>,
loc: &Loc,
comm: CommunicationModel,
lossy: bool,
) {
let tag = normalize_vec_tag(tag);
switch();
ExecutionState::with(|s| {
let pos = s.next_pos();
let sender_tid = pos.thread;
let val = Val::new(v);
let mut monitor_msgs = MonitorSends::new();
for (thread_id, mon) in s.must.borrow_mut().monitors().iter() {
let monitor_accepts_this_msg = (mon.acceptor_fn)(pos.thread, sender_tid, val.clone());
if monitor_accepts_this_msg {
let mvalue = (mon.create_fn)(pos.thread, sender_tid, val.clone());
if let Some(mv) = mvalue {
trace!(
"Produced value {:?} of type {}",
mv,
String::from(type_of(&mv))
);
monitor_msgs.insert(*thread_id, mv);
}
}
}
trace!(
"[lib.rs] The number of required monitor messages {}",
monitor_msgs.len()
);
let slab = SendMsg::new(
pos,
SendLoc::new(loc, sender_tid, tag),
comm,
val,
monitor_msgs,
lossy,
);
let maybe_stuck = s.must.borrow_mut().handle_send(slab);
maybe_stuck.iter().for_each(|r| {
let task = match s.must.borrow().to_task_id(r.thread) {
Some(task) => task,
None => return,
};
let task = s.get_mut(task);
if !task.is_stuck() {
return;
}
if task.instructions as u32 == r.index - 1 {
task.unstuck();
}
});
});
}
pub fn recv_msg<T: Message + 'static>() -> Option<T> {
let (loc, comm) = self_loc_comm();
recv_msg_with_tag(iter::once(&loc), comm, None).map(|x| x.0)
}
pub fn recv_tagged_msg<F, T>(f: F) -> Option<T>
where
F: Fn(ThreadId, Option<u32>) -> bool + 'static + Send + Sync,
T: Message + 'static,
{
recv_vec_tagged_msg(move |tid, tag| {
let tag = tag.and_then(|tags| tags.first().copied());
f(tid, tag)
})
}
pub fn recv_vec_tagged_msg<F, T>(f: F) -> Option<T>
where
F: Fn(ThreadId, Option<Vec<u32>>) -> bool + 'static + Send + Sync,
T: Message + 'static,
{
let (loc, comm) = self_loc_comm();
recv_msg_with_tag(
iter::once(&loc),
comm,
Some(PredicateType(Arc::new(move |tid, tag| {
f(tid, normalize_vec_tag(tag))
}))),
)
.map(|x| x.0)
}
fn recv_msg_with_tag<'a, T: Message + 'static>(
locs: impl Iterator<Item = &'a Loc>,
comm: CommunicationModel,
tag: Option<PredicateType>,
) -> Option<(T, usize)> {
recv_val_with_tag(locs, comm, tag).map(|(val, ind)| (expect_msg(val), ind))
}
fn recv_val_with_tag<'a>(
locs: impl Iterator<Item = &'a Loc>,
comm: CommunicationModel,
tag: Option<PredicateType>,
) -> Option<(Val, usize)> {
let locs = locs.collect::<Vec<_>>();
validate_locs(&locs);
loop {
switch();
let locs = locs.clone();
let tag = tag.clone();
let (val, ind) = ExecutionState::with(|s| {
let pos = s.next_pos();
s.must.borrow_mut().handle_recv(
RecvMsg::new(pos, RecvLoc::new(locs, tag), comm, None, true),
false,
)
});
if val.as_ref().is_some_and(Val::is_pending) {
ExecutionState::with(|s| {
s.current_mut().stuck();
s.prev_pos();
});
} else {
return val.map(|v| (v, ind.unwrap()));
}
}
}
pub fn recv_msg_block<T: Message + 'static>() -> T {
let (loc, comm) = self_loc_comm();
recv_msg_block_with_tag(iter::once(&loc), comm, None).0
}
pub fn recv_tagged_msg_block<F, T>(f: F) -> T
where
F: Fn(ThreadId, Option<u32>) -> bool + 'static + Send + Sync,
T: Message + 'static,
{
recv_vec_tagged_msg_block(move |tid, tag| {
let tag = tag.and_then(|tags| tags.first().copied());
f(tid, tag)
})
}
pub fn recv_vec_tagged_msg_block<F, T>(f: F) -> T
where
F: Fn(ThreadId, Option<Vec<u32>>) -> bool + 'static + Send + Sync,
T: Message + 'static,
{
let (loc, comm) = self_loc_comm();
recv_msg_block_with_tag(
iter::once(&loc),
comm,
Some(PredicateType(Arc::new(move |tid, tag| {
f(tid, normalize_vec_tag(tag))
}))),
)
.0
}
fn recv_msg_block_with_tag<'a, T: Message + 'static>(
locs: impl Iterator<Item = &'a Loc>,
comm: CommunicationModel,
tag: Option<PredicateType>,
) -> (T, usize) {
let (val, ind) = recv_val_block_with_tag(locs, comm, tag);
(expect_msg(val), ind)
}
fn recv_val_block_with_tag<'a>(
locs: impl Iterator<Item = &'a Loc>,
comm: CommunicationModel,
tag: Option<PredicateType>,
) -> (Val, usize) {
let locs = locs.collect::<Vec<_>>();
validate_locs(&locs);
loop {
switch();
let locs = locs.clone();
let (val, ind) = ExecutionState::with(|s| {
let pos = s.next_pos();
s.must.borrow_mut().handle_recv(
RecvMsg::new(pos, RecvLoc::new(locs, tag.clone()), comm, None, false),
true,
)
});
if let Some(box_msg) = val {
if box_msg.is_pending() {
ExecutionState::with(|s| s.current_mut().stuck());
} else {
return (box_msg, ind.unwrap());
}
};
ExecutionState::with(|s| s.prev_pos());
}
}
pub fn nondet() -> bool {
switch();
ExecutionState::with(|s| {
let pos = s.next_pos();
let toss = s.must.borrow_mut().gen_bool();
s.must.borrow_mut().handle_ctoss(CToss::new(pos, toss))
})
}
#[deprecated(
since = "0.2.0",
note = "please use `nondet()` or `<bool>::nondet()` instead"
)]
pub fn coin_toss() -> bool {
nondet()
}
pub fn named_nondet(name: &str) -> bool {
switch();
ExecutionState::with(|s| {
let pos = s.next_pos();
let thread_id = pos.thread;
let mut must = s.must.borrow_mut();
let thread_idx = if let Some(name_map) = must.thread_index_map.get(name) {
if let Some(&idx) = name_map.get(&thread_id) {
idx
} else {
let idx = *must.next_thread_index.get(name).unwrap_or(&0);
must.next_thread_index.insert(name.to_string(), idx + 1);
must.thread_index_map
.entry(name.to_string())
.or_insert_with(HashMap::new)
.insert(thread_id, idx);
if let Some(ref mut frozen) = must.frozen_thread_index_map {
frozen
.entry(name.to_string())
.or_insert_with(HashMap::new)
.insert(thread_id, idx);
debug!(
"Assigned and froze thread index {} to ThreadId {:?} for choice '{}'",
idx, thread_id, name
);
}
idx
}
} else {
let idx = 0;
must.next_thread_index.insert(name.to_string(), 1);
let mut name_map = HashMap::new();
name_map.insert(thread_id, idx);
must.thread_index_map.insert(name.to_string(), name_map);
if let Some(ref mut frozen) = must.frozen_thread_index_map {
let mut frozen_name_map = HashMap::new();
frozen_name_map.insert(thread_id, idx);
frozen.insert(name.to_string(), frozen_name_map);
debug!(
"Assigned and froze thread index {} to ThreadId {:?} for choice '{}'",
idx, thread_id, name
);
}
idx
};
let occurrence = must
.choice_occurrence_counters
.entry((name.to_string(), thread_idx))
.or_insert(0);
let current_occurrence = *occurrence;
*occurrence += 1;
let predetermined_value = must
.config
.predetermined_choices
.get(name)
.and_then(|thread_choices| thread_choices.get(thread_idx))
.and_then(|choices| choices.get(current_occurrence))
.copied();
if let Some(value) = predetermined_value {
debug!(
"Using predetermined value {} for choice '{}' [thread_idx={}, occurrence={}]",
value, name, thread_idx, current_occurrence
);
return must.handle_ctoss_predetermined(CToss::new(pos, value), value);
}
debug!(
"Using nondeterministic exploration for choice '{}' [thread_idx={}, occurrence={}]",
name, thread_idx, current_occurrence
);
drop(must);
let toss = s.must.borrow_mut().gen_bool();
s.must.borrow_mut().handle_ctoss(CToss::new(pos, toss))
})
}
use crate::monitor_types::{Monitor, MonitorResult};
use std::ops::{Range, RangeInclusive};
use std::sync::Mutex;
pub trait TypeNondet {
fn nondet() -> Self;
}
impl TypeNondet for bool {
fn nondet() -> Self {
switch();
ExecutionState::with(|s| {
let pos = s.next_pos();
let toss = s.must.borrow_mut().gen_bool();
s.must.borrow_mut().handle_ctoss(CToss::new(pos, toss))
})
}
}
pub trait Nondet<T> {
fn nondet(&self) -> T;
}
impl Nondet<usize> for RangeInclusive<usize> {
fn nondet(&self) -> usize {
switch();
ExecutionState::with(|s| {
let pos = s.next_pos();
if self.start() > self.end() {
panic!("Range {:?} is not well-formed", self)
}
let mut r = RangeInclusive::new(*self.start(), *self.end());
s.must.borrow_mut().handle_choice(Choice::new(pos, &mut r))
})
}
}
impl Nondet<usize> for Range<usize> {
fn nondet(&self) -> usize {
switch();
ExecutionState::with(|s| {
let pos = s.next_pos();
if self.start >= self.end {
panic!("Range {:?} is not well-formed", self)
}
let mut r = RangeInclusive::new(self.start, self.end - 1);
s.must.borrow_mut().handle_choice(Choice::new(pos, &mut r))
})
}
}
#[doc(hidden)]
pub fn sample<
T: Clone + std::fmt::Debug + Serialize + for<'a> Deserialize<'a>,
D: Distribution<T>,
>(
distr: D,
max_samples: usize,
) -> T {
ExecutionState::with(|s| {
let pos = s.next_pos();
let mut must = s.must.borrow_mut();
must.handle_sample(pos, distr, max_samples)
})
}
#[macro_export]
macro_rules! assume {
($bool:expr) => {
$crate::assume_impl($bool, Some((stringify!($bool), file!(), line!())));
};
}
#[deprecated(note = "Use assume!(x) instead to get more information.")]
pub fn assume(cond: bool) {
assume_impl(cond, None)
}
#[doc(hidden)]
pub fn assume_impl(cond: bool, macro_info: Option<(&str, &str, u32)>) {
switch();
if !cond {
match macro_info {
Some((descr, file, line)) => {
log::info!(
"This execution is ending because `assume!({})` is false at {}:{}",
descr,
file,
line
);
}
None => {
log::info!("This execution is ending because `assume(???)` is false.");
log::warn!("Use macro `assume!(x)` instead to get better debug information.");
}
}
ExecutionState::with(|s| {
let pos = s.next_pos();
s.must
.borrow_mut()
.handle_block(Block::new(pos, BlockType::Assume))
});
switch();
}
}
pub fn assert(cond: bool) {
if !cond {
ExecutionState::with(|s| {
let pos = s.next_pos();
let mut must = s.must.borrow_mut();
if must.config().keep_going_after_error {
let name = if let Some(task) = s.try_current() {
task.name()
.unwrap_or_else(|| format!("task-{:?}", task.id().0))
} else {
"<unknown>".into()
};
must.handle_block(Block::new(pos, BlockType::Assert));
if must.is_consistent() {
let message = persist_task_failure(name, Some(pos));
info!("Persisted failure {message}");
}
} else {
must.handle_block(Block::new(pos, BlockType::Assert));
if must.is_consistent() {
info!("Error Detected!");
println!("{}", must.print_graph(None));
must.store_replay_information(Some(pos));
assert!(cond);
}
}
});
}
}
pub fn spawn_symmetric<F, T>(f: F, tid: crate::thread::ThreadId) -> crate::thread::JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Message + 'static,
{
let jh = crate::thread::spawn_without_switch(f, None, false, None, Some(tid));
switch();
jh
}
#[doc(hidden)]
pub fn invoke_on_stop(monitor: &mut dyn Monitor) -> MonitorResult {
Must::invoke_on_stop(monitor)
}