use std::fmt::Debug;
use traceforge::thread::*;
use traceforge::*;
/// Number of worker threads each test in this file asks its job queue to spawn.
const NUM_WORKERS: usize = 2;
/// Tag on the one-time setup message a spawned thread receives before it starts handling jobs.
const INIT_TAG: u32 = 0;
/// Tag on `WorkRequest` messages that pull workers send to the pull queue thread.
const REQUEST_TAG: u32 = 1;
/// Tag on jobs sent to the pull queue thread by `PullJobQueue::submit_job`.
const SUBMIT_TAG: u32 = 2;
/// A unit of work exchanged between the test driver, the queues and the workers.
///
/// The `String` payload is the job's name.
#[derive(Debug, Clone, PartialEq)]
enum Job {
    /// A job that workers expand into two `LeafJob`s named `<name>1` and `<name>2`.
    TopLevelJob(String),
    /// A terminal job; workers receive it and take no further action.
    LeafJob(String),
}
/// Common interface of the job-queue handles in this file.
///
/// A handle is given to every worker so that a worker can enqueue follow-up
/// jobs while it is processing one.
trait JobQueue {
    /// Hands `job` over to the queue for delivery to some worker.
    fn submit_job(&self, job: Job);
}
/// Queue handle that pushes each job directly to one of its workers.
#[derive(Debug, Clone, PartialEq)]
struct PushJobQueue {
    /// Ids of the worker threads that jobs may be sent to.
    workers: Vec<ThreadId>,
}
impl JobQueue for PushJobQueue {
    /// Sends `job` straight to one worker.
    ///
    /// The worker is selected by a `nondet()` choice over `0..workers.len()`.
    ///
    /// # Panics
    ///
    /// Panics if the chosen index is not a valid position in `workers`.
    fn submit_job(&self, job: Job) {
        let index = (0..self.workers.len()).nondet();
        // `ThreadId` is `Copy`, so the id is copied out of the slice rather than cloned.
        let worker = *self
            .workers
            .get(index)
            .expect("nondet index is drawn from 0..workers.len()");
        send_msg(worker, job);
    }
}
/// Spawns `num_workers` daemon threads running `push_worker_thread` and
/// returns a `PushJobQueue` addressing them.
///
/// Every worker is sent a clone of the returned queue in an `INIT_TAG`-tagged
/// message, which is the first message `push_worker_thread` receives.
fn create_job_queue(num_workers: usize) -> PushJobQueue {
    // Spawn all workers first; the handles stay alive until this function returns.
    let join_handles: Vec<JoinHandle<()>> = (0..num_workers)
        .map(|_| spawn_daemon(push_worker_thread::<PushJobQueue>))
        .collect();
    let workers: Vec<_> = join_handles
        .iter()
        .map(|handle| handle.thread().id())
        .collect();

    let job_queue = PushJobQueue { workers };
    // Deliver the queue handle to the workers in spawn order.
    for &worker in &job_queue.workers {
        send_tagged_msg(worker, INIT_TAG, job_queue.clone());
    }
    job_queue
}
fn push_worker_thread<T: JobQueue + Clone + Debug + PartialEq + Send + 'static>() -> () {
let job_queue: T = recv_tagged_msg_block(|_, t| t.is_some() && t.unwrap() == INIT_TAG);
loop {
match recv_msg_block() {
Job::TopLevelJob(name) => {
job_queue.submit_job(Job::LeafJob(name.clone() + "1"));
job_queue.submit_job(Job::LeafJob(name + "2"));
}
Job::LeafJob(..) => {
}
}
}
}
/// Runs `traceforge::verify` on the direct-push queue with `NUM_WORKERS`
/// workers and a single top-level job, and checks the reported execution
/// counts: 8 executions, none blocked.
#[test]
fn push_test() {
    // NOTE(review): the other tests in this file use the same trace output
    // path; confirm that is intended if the tests run concurrently.
    let config = Config::builder()
        .with_policy(traceforge::SchedulePolicy::LTR)
        .with_verbose(1)
        .with_trace_out("/tmp/sharedmem.traces")
        .build();

    let stats = traceforge::verify(config, || {
        let job_queue = create_job_queue(NUM_WORKERS);
        job_queue.submit_job(Job::TopLevelJob(String::from("a")));
    });

    println!("Number of executions explored {}", stats.execs);
    println!("Number of blocked executions explored {}", stats.block);
    assert_eq!(stats.execs, 8);
    assert_eq!(stats.block, 0);
}
/// Queue handle that routes every job through a dedicated queue thread.
#[derive(Debug, Clone, PartialEq)]
struct PushWithThreadJobQueue {
    /// Thread that `submit_job` sends jobs to.
    queue_thread: ThreadId,
    /// Ids of the worker threads; `submit_job` does not read this field.
    workers: Vec<ThreadId>,
}
impl JobQueue for PushWithThreadJobQueue {
    /// Sends `job` to `queue_thread` with `send_msg`.
    fn submit_job(&self, job: Job) {
        let dispatcher = self.queue_thread;
        send_msg(dispatcher, job);
    }
}
/// Spawns `num_workers` daemon workers plus one daemon queue thread and
/// returns a `PushWithThreadJobQueue` addressing them.
///
/// The queue thread first receives the worker id list in an `INIT_TAG`-tagged
/// message. It then loops forever: it receives a `Job`, picks a worker index
/// with `nondet()` over `0..workers.len()`, and forwards the job to that worker.
///
/// After the queue thread has been sent the worker list, every worker is sent
/// a clone of the returned queue in an `INIT_TAG`-tagged message.
fn create_push_with_thread_job_queue(num_workers: usize) -> PushWithThreadJobQueue {
    // Spawn all workers first; the handles stay alive until this function returns.
    let join_handles: Vec<JoinHandle<()>> = (0..num_workers)
        .map(|_| spawn_daemon(push_worker_thread::<PushWithThreadJobQueue>))
        .collect();
    let workers: Vec<ThreadId> = join_handles
        .iter()
        .map(|handle| handle.thread().id())
        .collect();

    let queue_thread = spawn_daemon(|| {
        let workers: Vec<ThreadId> = recv_tagged_msg_block(|_, tag| tag == Some(INIT_TAG));
        loop {
            let job: Job = recv_msg_block();
            let index = (0..workers.len()).nondet();
            let target = *workers
                .get(index)
                .expect("nondet index is drawn from 0..workers.len()");
            send_msg(target, job);
        }
    })
    .thread()
    .id();

    // The queue thread is initialised before any worker is.
    send_tagged_msg(queue_thread, INIT_TAG, workers.clone());

    let job_queue = PushWithThreadJobQueue {
        queue_thread,
        workers,
    };
    for &worker in &job_queue.workers {
        send_tagged_msg(worker, INIT_TAG, job_queue.clone());
    }
    job_queue
}
/// Runs `traceforge::verify` on the queue-thread push variant with
/// `NUM_WORKERS` workers and a single top-level job, and checks the reported
/// execution counts: 8 executions, none blocked.
#[test]
fn push_with_thread_test() {
    // NOTE(review): the other tests in this file use the same trace output
    // path; confirm that is intended if the tests run concurrently.
    let config = Config::builder()
        .with_policy(traceforge::SchedulePolicy::LTR)
        .with_verbose(1)
        .with_trace_out("/tmp/sharedmem.traces")
        .build();

    let stats = traceforge::verify(config, || {
        let job_queue = create_push_with_thread_job_queue(NUM_WORKERS);
        job_queue.submit_job(Job::TopLevelJob(String::from("a")));
    });

    println!("Number of executions explored {}", stats.execs);
    println!("Number of blocked executions explored {}", stats.block);
    assert_eq!(stats.execs, 8);
    assert_eq!(stats.block, 0);
}
/// Queue handle for the pull model, in which workers request jobs from a queue thread.
#[derive(Debug, Clone, PartialEq)]
struct PullJobQueue {
    /// Thread that receives both submitted jobs and workers' `WorkRequest`s.
    queue_thread: ThreadId,
}
/// Message a pull worker sends to the queue thread to request a job.
#[derive(Debug, Clone, PartialEq)]
struct WorkRequest {
    /// Thread the queue thread should send the job to.
    requester: ThreadId,
}
impl JobQueue for PullJobQueue {
    /// Sends `job` to the queue thread, tagged with `SUBMIT_TAG`.
    fn submit_job(&self, job: Job) {
        let Self { queue_thread } = self;
        send_tagged_msg(*queue_thread, SUBMIT_TAG, job);
    }
}
/// Spawns `num_workers` daemon threads running `pull_worker_thread` plus one
/// daemon queue thread, and returns a `PullJobQueue` addressing the queue thread.
///
/// The queue thread loops forever: it receives a `REQUEST_TAG`-tagged
/// `WorkRequest`, then a `SUBMIT_TAG`-tagged `Job`, and sends that job to the
/// requester.
///
/// Every worker is sent a clone of the returned queue in an `INIT_TAG`-tagged
/// message.
fn create_pull_job_queue(num_workers: usize) -> PullJobQueue {
    // Spawn all workers first; the handles stay alive until this function returns.
    let join_handles: Vec<JoinHandle<()>> = (0..num_workers)
        .map(|_| spawn_daemon(pull_worker_thread))
        .collect();
    let workers: Vec<_> = join_handles
        .iter()
        .map(|handle| handle.thread().id())
        .collect();

    let queue_thread = spawn_daemon(|| {
        loop {
            // Pair one work request with one submitted job, in that order.
            let WorkRequest { requester } =
                recv_tagged_msg_block(|_, tag| tag == Some(REQUEST_TAG));
            let job: Job = recv_tagged_msg_block(|_, tag| tag == Some(SUBMIT_TAG));
            send_msg(requester, job);
        }
    })
    .thread()
    .id();

    let job_queue = PullJobQueue { queue_thread };
    for &worker in &workers {
        send_tagged_msg(worker, INIT_TAG, job_queue.clone());
    }
    job_queue
}
fn pull_worker_thread() -> () {
let job_queue: PullJobQueue =
recv_tagged_msg_block(|_, t| t.is_some() && t.unwrap() == INIT_TAG);
loop {
send_tagged_msg(
job_queue.queue_thread,
REQUEST_TAG,
WorkRequest {
requester: current().id(),
},
);
match recv_msg_block() {
Job::TopLevelJob(name) => {
job_queue.submit_job(Job::LeafJob(name.clone() + "1"));
job_queue.submit_job(Job::LeafJob(name + "2"));
}
Job::LeafJob(..) => {
}
}
}
}
/// Runs `traceforge::verify` on the pull queue with `NUM_WORKERS` workers and
/// a single top-level job, and checks the reported execution counts:
/// 16 executions, none blocked.
#[test]
fn pull_test() {
    // NOTE(review): the other tests in this file use the same trace output
    // path; confirm that is intended if the tests run concurrently.
    let config = Config::builder()
        .with_policy(traceforge::SchedulePolicy::LTR)
        .with_verbose(1)
        .with_trace_out("/tmp/sharedmem.traces")
        .build();

    let stats = traceforge::verify(config, || {
        let job_queue = create_pull_job_queue(NUM_WORKERS);
        job_queue.submit_job(Job::TopLevelJob(String::from("a")));
    });

    println!("Number of executions explored {}", stats.execs);
    println!("Number of blocked executions explored {}", stats.block);
    assert_eq!(stats.execs, 16);
    assert_eq!(stats.block, 0);
}