use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use serde::{Deserialize, Serialize};
use crate::channel::Thread;
use crate::event::Event;
use crate::indexed_map::IndexedMap;
use crate::loc::{Loc, RecvLoc};
use crate::revisit::Revisit;
use crate::runtime::task::TaskId;
use crate::thread::{construct_thread_id, main_thread_id};
use crate::vector_clock::VectorClock;
use crate::{event_label::*, Val};
use crate::{replay as REPLAY, ThreadId};
/// Per-thread record kept inside an `ExecutionGraph`: the thread's identity,
/// the label that created it, and the labels recorded for it so far.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct ThreadInfo {
// Identifier of this thread; converted to `usize` it is also the thread's
// slot in `ExecutionGraph::threads`.
tid: ThreadId,
// Runtime task currently bound to this thread, or `None` when no task is
// bound (cleared by `initialize_for_execution`, set by `set_task_for_replay`).
task_id: Option<TaskId>,
// The `TCreate` label describing how this thread was created.
tclab: TCreate,
// Labels of this thread, addressed by `Event::index`.
pub(crate) labels: Vec<LabelEnum>,
}
/// Graph of all labels recorded for an execution, organised per thread,
/// together with bookkeeping indexes over threads, tasks, sends and receives.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct ExecutionGraph {
// One `ThreadInfo` per thread, stored at the index obtained from its `ThreadId`.
pub(crate) threads: IndexedMap<ThreadInfo>,
// Most recently issued stamp; `next_stamp` increments it and returns the new value.
stamp: usize,
// Maps each bound runtime task to the thread it runs.
task_id_map: HashMap<TaskId, ThreadId>,
// Threads whose last label is `End`; kept in sync by `on_thread_changed`.
pub(crate) finished_threads: HashSet<ThreadId>,
// Send events indexed by location, filled by `register_send`.
// Marked `#[serde(skip)]`, so it is not part of the serialized form.
#[serde(skip)]
sends: HashMap<Loc, Vec<Event>>,
// Receive events indexed by location, filled by `register_recv`.
// Marked `#[serde(skip)]`, so it is not part of the serialized form.
#[serde(skip)]
recvs: HashMap<Loc, Vec<Event>>,
// Counter adjusted by `incr_dropped_sends` / `decr_dropped_sends`.
dropped_sends: usize,
}
impl ExecutionGraph {
/// Creates a graph that contains only the main thread.
///
/// The main thread is bound to `TaskId(0)`, is named `"main"`, and starts
/// with a single `Begin` label. The stamp counter starts at zero and all
/// indexes are empty.
pub(crate) fn new() -> ExecutionGraph {
    let main_tid = main_thread_id();
    let creation_event = Event::new(main_tid, 0);
    let main_info = ThreadInfo {
        tid: main_tid,
        task_id: Some(TaskId(0)),
        tclab: TCreate::new(
            creation_event,
            main_tid,
            Some("main".to_owned()),
            false,
            None,
            vec![],
        ),
        labels: vec![LabelEnum::Begin(Begin::main())],
    };

    let mut task_id_map = HashMap::new();
    task_id_map.insert(TaskId(0), main_tid);

    ExecutionGraph {
        threads: IndexedMap::new_with_first(main_info),
        stamp: 0,
        task_id_map,
        finished_threads: HashSet::new(),
        sends: HashMap::new(),
        recvs: HashMap::new(),
        dropped_sends: 0,
    }
}
/// Prepares the graph for being executed (again).
///
/// * Unbinds every thread except the first from its runtime task and
///   removes the corresponding `task_id_map` entry.
/// * Rebuilds `finished_threads` from the current last label of each thread.
/// * Marks the result of every trailing `End` label and the value of every
///   `SendMsg` label as pending.
///
/// # Panics
/// Panics if the first thread is not bound to `TaskId(0)`.
pub(crate) fn initialize_for_execution(&mut self) {
    assert_eq!(self.threads[0].task_id, Some(TaskId(0)));

    // A thread may already be unbound (for instance when this is called
    // twice before the thread is re-bound); `take` makes that case a no-op
    // instead of a panic.
    for t in self.threads.iter_mut().skip(1) {
        if let Some(task_id) = t.task_id.take() {
            self.task_id_map.remove(&task_id);
        }
    }

    self.finished_threads.clear();
    let tids = self.threads.iter().map(|t| t.tid).collect::<Vec<_>>();
    for tid in &tids {
        self.on_thread_changed(tid);
    }

    for thr in self.threads.iter_mut() {
        if let Some(LabelEnum::End(e)) = thr.labels.last_mut() {
            e.result.set_pending();
        }
        for lab in &mut thr.labels {
            if let LabelEnum::SendMsg(s) = lab {
                s.val.set_pending();
            }
        }
    }
}
/// Re-derives whether `tid` belongs in `finished_threads`: it does exactly
/// when the thread's last label is an `End`.
///
/// # Panics
/// Panics if `tid` is not in the graph.
pub(crate) fn on_thread_changed(&mut self, tid: &ThreadId) {
    let ended = matches!(self.get_thr(tid).labels.last(), Some(LabelEnum::End(_)));
    if ended {
        self.finished_threads.insert(*tid);
    } else {
        self.finished_threads.remove(tid);
    }
}
/// Checks `actual` against the label stored at the same thread and index,
/// panicking (via `panic_if_err`) when `compare_for_replay` reports a mismatch.
///
/// # Panics
/// Also panics if the thread is unknown or the index is out of range.
pub(crate) fn validate_replay_event(&self, actual: &LabelEnum) {
    let thread = self.get_thr(&actual.thread());
    let recorded = &thread.labels[actual.index() as usize];
    let outcome = recorded.compare_for_replay(actual);
    Self::panic_if_err(outcome);
}
/// Does nothing on `Ok`; on `Err` panics with the standard
/// "program must be deterministic" explanation followed by the error text.
pub(crate) fn panic_if_err(res: Result<(), String>) {
    let Err(e) = res else {
        return;
    };
    panic!("Incorrect TraceForge Program. TraceForge programs must be deterministic. Any nondeterminism should be under the control of TraceForge via the nondet() function.\n{}",
        e);
}
/// Returns the record of thread `tid`.
///
/// # Panics
/// Panics, listing all thread ids in the graph, if `tid` is not present.
pub(crate) fn get_thr(&self, tid: &ThreadId) -> &ThreadInfo {
    match self.get_thr_opt(tid) {
        Some(info) => info,
        None => panic!(
            "Can't find thread {} in graph with thread ids {:?}",
            *tid,
            self.threads.iter().map(|t| t.tid).collect::<Vec<_>>()
        ),
    }
}
/// Looks up thread `tid`, returning `None` when the graph has no such thread.
pub(crate) fn get_thr_opt(&self, tid: &ThreadId) -> Option<&ThreadInfo> {
    let slot: usize = (*tid).into();
    self.threads.get(slot)
}
/// Mutable variant of `get_thr_opt`: `None` when the graph has no thread `tid`.
pub(crate) fn get_thr_opt_mut(&mut self, tid: &ThreadId) -> Option<&mut ThreadInfo> {
    let slot: usize = (*tid).into();
    self.threads.get_mut(slot)
}
/// Returns the mutable record of thread `tid`.
///
/// # Panics
/// Panics if `tid` is not present in the graph.
pub(crate) fn get_thr_mut(&mut self, tid: &ThreadId) -> &mut ThreadInfo {
    match self.get_thr_opt_mut(tid) {
        Some(info) => info,
        None => panic!("Can't find thread {}", *tid),
    }
}
/// Iterates over every `SendMsg` label in the graph, thread by thread and
/// in label order within each thread.
pub(crate) fn all_store_iter(&self) -> impl Iterator<Item = &SendMsg> {
    let positions = self
        .threads
        .iter()
        .flat_map(|thread| thread.labels.iter())
        .map(|lab| lab.pos());
    positions.filter_map(move |pos| self.send_label(pos))
}
/// Iterates over the registered sends a receive on `recv_loc` could read.
///
/// For every location of `recv_loc` that has registered sends, the sends
/// that are not dropped and whose tag matches are taken, and the
/// per-location sequences are merged with an "earlier stamp first" ordering.
///
/// # Panics
/// The returned iterator panics if a registered event is not a `SendMsg`.
pub(crate) fn matching_stores<'a>(
    &'a self,
    recv_loc: &'a RecvLoc,
) -> impl Iterator<Item = &'a SendMsg> {
    let mut merged: Box<dyn Iterator<Item = &'a SendMsg> + 'a> = Box::new(std::iter::empty());
    for loc in recv_loc.locs().iter() {
        let Some(events) = self.sends.get(loc) else {
            continue;
        };
        let candidates = events
            .iter()
            .map(move |&pos| self.send_label(pos).unwrap())
            .filter(move |&slab| !slab.is_dropped() && recv_loc.matches_tag(slab));
        merged = Box::new(merging_iterator::MergeIter::with_custom_ordering(
            merged,
            candidates,
            |a, b| a.stamp() < b.stamp(),
        ));
    }
    merged
}
/// Iterates over the registered receives that `send` could be read by.
///
/// Receives registered at the send's own location and at the thread
/// location of every key of `send.monitor_sends()` are each walked in
/// reverse registration order, merged with a "larger stamp first" ordering,
/// and finally filtered by whether the receive's location matches the
/// send's tag.
///
/// # Panics
/// The returned iterator panics if a registered event is not a `RecvMsg`.
pub(crate) fn rev_matching_recvs<'a>(
    &'a self,
    send: &'a SendMsg,
) -> impl Iterator<Item = &'a RecvMsg> {
    let mut merged: Box<dyn Iterator<Item = &'a RecvMsg> + 'a> =
        match self.recvs.get(send.loc()) {
            Some(events) => Box::new(
                events
                    .iter()
                    .map(move |&pos| self.recv_label(pos).unwrap())
                    .rev(),
            ),
            None => Box::new(std::iter::empty()),
        };

    for &tid in send.monitor_sends().keys() {
        let Some(events) = self.recvs.get(&Loc::new(Thread(tid))) else {
            continue;
        };
        let newest_first = events
            .iter()
            .map(move |&pos| self.recv_label(pos).unwrap())
            .rev();
        merged = Box::new(merging_iterator::MergeIter::with_custom_ordering(
            merged,
            newest_first,
            |a, b| a.stamp() > b.stamp(),
        ));
    }

    merged.filter(move |&rlab| rlab.recv_loc().matches_tag(send))
}
/// Returns the most recently issued stamp (zero for a fresh graph).
pub(crate) fn stamp(&self) -> usize {
    let current = self.stamp;
    current
}
/// Advances the stamp counter by one and returns the new value.
pub(crate) fn next_stamp(&mut self) -> usize {
    let issued = self.stamp + 1;
    self.stamp = issued;
    issued
}
/// Registers the thread created by `tclab`, bound to `task_id`, with an
/// empty label list, and records the task-to-thread mapping.
///
/// # Panics
/// Panics if the graph already contains the thread `tclab.cid()`.
pub(crate) fn add_new_thread(&mut self, tclab: TCreate, task_id: TaskId) {
    assert!(self.get_thr_opt(&tclab.cid()).is_none());
    let tid = tclab.cid();
    let slot: usize = tid.into();
    let info = ThreadInfo {
        tid,
        task_id: Some(task_id),
        tclab,
        labels: Vec::new(),
    };
    self.threads.set(slot, info);
    self.task_id_map.insert(task_id, tid);
}
/// Collects the ids of all threads in the graph into an ordered set.
pub(crate) fn thread_ids(&self) -> BTreeSet<ThreadId> {
    let mut ids = BTreeSet::new();
    for thread in self.threads.iter() {
        ids.insert(thread.tid);
    }
    ids
}
/// Returns the task bound to thread `tid`, or `None` if it has none.
///
/// # Panics
/// Panics if `tid` is not in the graph.
pub(crate) fn to_task_id(&self, tid: ThreadId) -> Option<TaskId> {
    let info = self.get_thr(&tid);
    info.task_id
}
/// Returns the thread that `task_id` is bound to.
///
/// # Panics
/// Panics if no thread is bound to `task_id`.
pub(crate) fn to_thread_id(&self, task_id: TaskId) -> ThreadId {
    self.task_id_map
        .get(&task_id)
        .copied()
        .unwrap_or_else(|| panic!("no thread id for task id {:?}", task_id))
}
/// Returns a copy of the `TCreate` label stored for thread `tid`.
///
/// # Panics
/// Panics if `tid` is not in the graph.
pub(crate) fn get_thread_tclab(&self, tid: ThreadId) -> TCreate {
    let info = self.get_thr(&tid);
    info.tclab.clone()
}
/// Determines the thread id for the thread spawned at `pos`, whose chain of
/// spawn-event indices (starting from the main thread) is `origination_vec`.
///
/// The chain is followed through the graph: each entry is an event index in
/// the current spawning thread, and the label there must be a `TCreate`
/// whose own origination vector equals the chain walked so far. If the last
/// spawn of the chain already exists in the graph its child id is returned.
/// If the last spawn is not in the graph yet, a fresh id is built from the
/// largest existing thread number plus one.
///
/// # Panics
/// Panics if the last entry of `origination_vec` differs from `pos.index`,
/// if an entry points at a non-`TCreate` label, if an intermediate entry is
/// beyond the end of its thread, or if the recorded origination vector
/// disagrees with the chain.
pub(crate) fn tid_for_spawn(&self, pos: &Event, origination_vec: &[u32]) -> ThreadId {
    assert_eq!(origination_vec.last(), Some(&pos.index));
    // The assertion above guarantees the slice is non-empty.
    let final_step = origination_vec.len() - 1;

    let mut spawning_thread = main_thread_id();
    for (step, &event_idx) in origination_vec.iter().enumerate() {
        let is_last_spawn = step == final_step;
        let known_events = self.thread_size(spawning_thread) as u32;

        if event_idx >= known_events {
            // Only the final spawn may be missing from the graph.
            if !is_last_spawn {
                let msg = format!(
                    "Expected to find event at {} for thread {}",
                    event_idx, spawning_thread
                );
                Self::panic_if_err(Result::Err(msg));
            }
            continue;
        }

        let spawn_pos = Event::new(spawning_thread, event_idx);
        let lab = self.label(spawn_pos);
        match lab {
            LabelEnum::TCreate(tclab) => {
                let walked_prefix = &origination_vec[0..=step];
                assert_eq!(walked_prefix, tclab.origination_vec());
                if is_last_spawn {
                    return tclab.cid();
                }
                spawning_thread = tclab.cid();
            }
            _ => {
                let msg = format!("Expected spawn event at {:?} but have {:?}", spawn_pos, lab);
                Self::panic_if_err(Result::Err(msg));
            }
        }
    }

    let highest_number = self
        .threads
        .iter()
        .map(|t| t.tid.to_number())
        .max()
        .expect("Didn't expect zero threads!");
    construct_thread_id(highest_number + 1)
}
/// Binds the existing thread `tid` to `task_id`, updating both the thread
/// record and `task_id_map`.
///
/// # Panics
/// Panics if `task_id` is already mapped to a thread, if `tid` is not in
/// the graph, or if the thread already has a task.
pub(crate) fn set_task_for_replay(&mut self, tid: ThreadId, task_id: TaskId) {
    if let Some(owner) = self.task_id_map.get(&task_id) {
        panic!(
            "A different thread {} already is associated to task_id {:?}",
            *owner, &task_id
        );
    }

    let slot: usize = tid.into();
    let thread = self.threads.get_mut(slot).unwrap();
    match thread.task_id {
        Some(other_task_id) => panic!(
            "This thread {:?} already has a task_id {:?}",
            thread.tid, other_task_id
        ),
        None => thread.task_id = Some(task_id),
    }
    let bound_tid = thread.tid;
    self.task_id_map.insert(task_id, bound_tid);
}
/// Number of labels currently recorded for thread `t`.
///
/// # Panics
/// Panics if `t` is not in the graph.
pub(crate) fn thread_size(&self, t: ThreadId) -> usize {
    let info = self.get_thr(&t);
    info.labels.len()
}
/// Last label of thread `t`, or `None` if the thread has no labels.
///
/// # Panics
/// Panics if `t` is not in the graph.
pub(crate) fn thread_last(&self, t: ThreadId) -> Option<&LabelEnum> {
    let info = self.get_thr(&t);
    info.labels.last()
}
/// First label of thread `t` as a `Begin`, or `None` if the thread has no
/// labels yet.
///
/// # Panics
/// Panics if `t` is not in the graph, or if the thread's first label is not
/// a `Begin`; the message names the thread and the offending label.
pub(crate) fn thread_first(&self, t: ThreadId) -> Option<&Begin> {
    self.get_thr(&t).labels.first().map(|lab| match lab {
        LabelEnum::Begin(blab) => blab,
        // Previously a bare `panic!()`, which gave no hint about what went wrong.
        other => panic!(
            "First label of thread {} must be Begin but found {}",
            t, other
        ),
    })
}
/// Whether the last label of thread `t` is a `Block`.
///
/// # Panics
/// Panics if `t` is not in the graph or has no labels.
pub(crate) fn is_thread_blocked(&self, t: ThreadId) -> bool {
    let last = self.thread_last(t).unwrap();
    matches!(last, LabelEnum::Block(_))
}
/// Whether thread `t` has finished, as recorded in `finished_threads`.
///
/// The answer is cross-checked against the thread's last label.
///
/// # Panics
/// Panics if `t` is unknown, has no labels, or if `finished_threads`
/// disagrees with whether the last label is an `End`.
pub(crate) fn is_thread_complete(&self, t: ThreadId) -> bool {
    let ends_with_end = matches!(self.thread_last(t).unwrap(), LabelEnum::End(_));
    let recorded_finished = self.finished_threads.contains(&t);
    assert_eq!(
        ends_with_end, recorded_finished,
        "finished_threads set is not correct."
    );
    recorded_finished
}
/// Whether the `TCreate` label of thread `t` marks it as a daemon.
///
/// # Panics
/// Panics if `t` is not in the graph.
pub(crate) fn is_thread_daemon(&self, t: ThreadId) -> bool {
    let info = self.get_thr(&t);
    info.tclab.daemon()
}
/// Summarises why the graph is blocked, if it is.
///
/// Threads are visited in ascending id order. The first thread blocked on
/// `Assume` or `Assert` decides the result immediately. Otherwise the
/// result is the block type of the last visited blocked thread, except that
/// daemon threads blocked on `Value` are ignored. Returns `None` when no
/// thread contributes a block type.
///
/// # Panics
/// Panics if any thread has no labels.
pub(crate) fn check_blocked(&self) -> Option<BlockType> {
    let mut verdict = None;
    for t in self.thread_ids() {
        if !self.is_thread_blocked(t) {
            continue;
        }
        let blab = self.thread_last(t).unwrap();
        let LabelEnum::Block(b) = blab else {
            panic!("Blocked thread has unexpected last label {}", blab)
        };
        match b.btype() {
            BlockType::Assume => return Some(BlockType::Assume),
            BlockType::Assert => return Some(BlockType::Assert),
            BlockType::Value(loc) => {
                // Daemon threads waiting for a value do not count as blocking.
                if !self.is_thread_daemon(t) {
                    verdict = Some(BlockType::Value(loc.clone()));
                }
            }
            block => verdict = Some(block.clone()),
        }
    }
    verdict
}
/// Adds `lab` to the graph (see `add`) and returns the position of the
/// stored label.
pub(crate) fn add_label(&mut self, lab: LabelEnum) -> Event {
    let stored = self.add(lab);
    stored.pos()
}
/// Stores `lab` at its own position and returns a reference to the stored
/// label.
///
/// * An unstamped label first receives the next stamp.
/// * If the label's index equals the thread's current length it is appended.
/// * If the index is smaller, the existing label at that index is replaced;
///   the old and the new label must either both be `TCreate` or both not.
///
/// `finished_threads` is refreshed for the affected thread afterwards.
///
/// # Panics
/// Panics if the thread is unknown, if the index is larger than the
/// thread's current length, or if a replacement would turn a `TCreate`
/// into a non-`TCreate` (or vice versa).
fn add(&mut self, mut lab: LabelEnum) -> &LabelEnum {
    if !lab.stamped() {
        lab.set_stamp(self.next_stamp());
    }
    let pos = lab.pos();
    let existing_label_count = self.thread_size(lab.thread());
    match (lab.index() as usize).cmp(&existing_label_count) {
        Ordering::Greater => {
            panic!(
                "Label index {} must be <= {}",
                lab.index(),
                existing_label_count
            );
        }
        Ordering::Equal => {
            self.get_thr_mut(&pos.thread).labels.push(lab);
        }
        Ordering::Less => {
            // Borrow instead of cloning both labels: only the variant is
            // inspected, and the values are printed solely on failure.
            let old_label = &self.get_thr(&pos.thread).labels[pos.index as usize];
            let old_tclab: Option<&TCreate> = match old_label {
                LabelEnum::TCreate(x) => Some(x),
                _ => None,
            };
            let new_tclab: Option<&TCreate> = match &lab {
                LabelEnum::TCreate(x) => Some(x),
                _ => None,
            };
            assert_eq!(
                old_tclab.is_some(),
                new_tclab.is_some(),
                "Requiring {:?} == {:?}",
                old_tclab,
                new_tclab
            );
            self.get_thr_mut(&pos.thread).labels[pos.index as usize] = lab;
        }
    }
    self.on_thread_changed(&pos.thread);
    &self.get_thr(&pos.thread).labels[pos.index as usize]
}
/// Whether the graph holds a label at position `e`: the thread exists and
/// `e.index` is below its label count.
pub(crate) fn contains(&self, e: Event) -> bool {
    self.get_thr_opt(&e.thread)
        .is_some_and(|thread| (e.index as usize) < thread.labels.len())
}
/// Drops the last label of thread `t` (a no-op on an empty thread) and
/// refreshes `finished_threads` for it.
///
/// # Panics
/// Panics if `t` is not in the graph.
pub(crate) fn remove_last(&mut self, t: ThreadId) {
    let thread = self.get_thr_mut(&t);
    thread.labels.pop();
    self.on_thread_changed(&t);
}
/// Returns the label at position `e`.
///
/// # Panics
/// Panics if the thread is unknown or the index is out of range.
pub(crate) fn label(&self, e: Event) -> &LabelEnum {
    let thread = self.get_thr(&e.thread);
    &thread.labels[e.index as usize]
}
/// Non-panicking lookup of the label at position `e`.
///
/// Returns `None` when the thread is unknown or when `e.index` is out of
/// range. (The thread lookup used to go through `get_thr`, which panicked
/// on an unknown thread despite the `Option` return type.)
#[allow(dead_code)]
pub(crate) fn label_opt(&self, e: Event) -> Option<&LabelEnum> {
    self.get_thr_opt(&e.thread)?.labels.get(e.index as usize)
}
/// Returns the label at position `e` for modification.
///
/// # Panics
/// Panics if the thread is unknown or the index is out of range.
pub(crate) fn label_mut(&mut self, e: Event) -> &mut LabelEnum {
    let thread = self.get_thr_mut(&e.thread);
    &mut thread.labels[e.index as usize]
}
/// Returns the label at `e` as a `TCreate`, or `None` if it is another kind.
///
/// # Panics
/// Panics if `e` is not a valid position (see `label`).
pub(crate) fn create_label(&self, e: Event) -> Option<&TCreate> {
    match self.label(e) {
        LabelEnum::TCreate(tclab) => Some(tclab),
        _ => None,
    }
}
pub(crate) fn is_recv(&self, e: Event) -> bool {
matches!(self.label(e), LabelEnum::RecvMsg(_))
}
/// Returns the label at `e` as a `RecvMsg`, or `None` if it is another kind.
///
/// # Panics
/// Panics if `e` is not a valid position (see `label`).
pub(crate) fn recv_label(&self, e: Event) -> Option<&RecvMsg> {
    match self.label(e) {
        LabelEnum::RecvMsg(rlab) => Some(rlab),
        _ => None,
    }
}
/// Mutable variant of `recv_label`.
///
/// # Panics
/// Panics if `e` is not a valid position (see `label_mut`).
pub(crate) fn recv_label_mut(&mut self, e: Event) -> Option<&mut RecvMsg> {
    match self.label_mut(e) {
        LabelEnum::RecvMsg(rlab) => Some(rlab),
        _ => None,
    }
}
/// Returns the value observed at position `e`.
///
/// * For a `RecvMsg` with a reads-from send: the value that send provides
///   to this receive (`recv_val`).
/// * For a `RecvMsg` without a reads-from send, or a `Block`: `None`.
///
/// # Panics
/// Panics if the label at `e` is neither a `RecvMsg` nor a `Block`, or if
/// the reads-from event is not a `SendMsg`.
pub(crate) fn val(&self, e: Event) -> Option<&Val> {
    match self.label(e) {
        LabelEnum::RecvMsg(rlab) => {
            let rf = rlab.rf()?;
            Some(self.send_label(rf).unwrap().recv_val(rlab))
        }
        LabelEnum::Block(_) => None,
        a => panic!("Expecting RecvMsg or Block but got {}", a),
    }
}
/// Owned copy of `val(rpos)`.
pub(crate) fn val_copy(&self, rpos: Event) -> Option<Val> {
    self.val(rpos).map(|value| value.clone())
}
pub(crate) fn is_send(&self, e: Event) -> bool {
matches!(self.label(e), LabelEnum::SendMsg(_))
}
/// Returns the label at `e` as a `SendMsg`, or `None` if it is another kind.
///
/// # Panics
/// Panics if `e` is not a valid position (see `label`).
pub(crate) fn send_label(&self, e: Event) -> Option<&SendMsg> {
    match self.label(e) {
        LabelEnum::SendMsg(slab) => Some(slab),
        _ => None,
    }
}
/// Mutable variant of `send_label`.
///
/// # Panics
/// Panics if `e` is not a valid position (see `label_mut`).
pub(crate) fn send_label_mut(&mut self, e: Event) -> Option<&mut SendMsg> {
    match self.label_mut(e) {
        LabelEnum::SendMsg(slab) => Some(slab),
        _ => None,
    }
}
/// Returns `true` exactly when the label at `e` is a `SendMsg` for which
/// `is_unread()` holds and `is_monitor_read()` does not. Labels that are
/// not sends yield `false`.
///
/// # Panics
/// Panics if `e` is not a valid position (see `label`).
pub(crate) fn is_rf_maximal_send(&self, e: Event) -> bool {
    // The predicate used to end in `.is_none()`, which inverted it: it was
    // true for non-sends and for sends that are read, and false for an
    // unread send. `top_sort` guards the call with `!is_send(..) ||`, so with
    // the inverted result a thread ending in an unread send was never used
    // as a starting point of the sort.
    self.send_label(e)
        .is_some_and(|send| send.is_unread() && !send.is_monitor_read())
}
/// Builds the view associated with the revisit `rev`.
///
/// Starts from the view of everything stamped no later than the label at
/// `rev.pos`, merges in the `porf` of the send at `rev.rev`, and then, for
/// every `TCreate` that lies inside the view, makes sure the created
/// thread's first event is covered as well.
///
/// # Panics
/// Panics if `rev.pos` is not a valid position or `rev.rev` is not a send.
pub(crate) fn revisit_view(&self, rev: &Revisit) -> VectorClock {
    let mut view = self.view_from_stamp(self.label(rev.pos).stamp());
    view.update(self.send_label(rev.rev).unwrap().porf());

    for thr in self.threads.iter() {
        let Some(vc_limit_inclusive) = view.get(thr.tid) else {
            continue;
        };
        let covered_len = vc_limit_inclusive as usize + 1;
        let spawned_children = thr
            .labels
            .iter()
            .take(covered_len)
            .filter_map(|lab| match lab {
                LabelEnum::TCreate(tclab) => Some(tclab.cid()),
                _ => None,
            });
        for child in spawned_children {
            view.update_or_set(Event::new(child, 0));
        }
    }
    view
}
/// Builds a view that, for each thread, reaches the last label of the
/// leading run of labels whose stamp is at most `s`. Threads with no such
/// label are left out of the view.
pub(crate) fn view_from_stamp(&self, s: usize) -> VectorClock {
    let mut view = VectorClock::new();
    for thread in self.threads.iter() {
        let covered = thread.labels.partition_point(|lab| lab.stamp() <= s);
        if let Some(last_idx) = covered.checked_sub(1) {
            view.update_or_set(thread.labels[last_idx].pos());
        }
    }
    view
}
/// For a receive that reads from some send, returns `0` when the receive
/// monitors that send, and otherwise the index reported by the receive
/// location's `get_matching_index` for the send's location. Returns `None`
/// when the receive has no reads-from send.
///
/// # Panics
/// Panics if the reads-from event is not a `SendMsg`.
pub(crate) fn get_receiving_index(&self, rlab: &RecvMsg) -> Option<usize> {
    let send = rlab.rf()?;
    let slab = self.send_label(send).unwrap();
    let index = if rlab.monitors(slab) {
        0
    } else {
        rlab.recv_loc().get_matching_index(slab.send_loc())
    };
    Some(index)
}
/// Detaches the receive `recv` from the send it currently reads from, if
/// any: removes it as a monitor reader when it monitors that send, and
/// clears the send's reader when that reader is `recv`.
///
/// # Panics
/// Panics if `recv` is not a `RecvMsg` or its reads-from event is not a
/// `SendMsg`.
fn remove_from_readers(&mut self, recv: Event) {
    let rlab = self.recv_label(recv).unwrap();
    let Some(source) = rlab.rf() else {
        return;
    };
    let is_monitor = rlab.monitors(self.send_label(source).unwrap());

    let slab = self.send_label_mut(source).unwrap();
    if is_monitor {
        slab.remove_monitor_reader(recv);
    }
    if slab.reader() == Some(recv) {
        slab.set_reader(None);
    }
}
/// Makes the receive `recv` read from `send` (or from nothing for `None`).
///
/// The receive is first detached from its previous send. It is then
/// attached to the new send, as a monitor reader if it monitors that send
/// and as the regular reader otherwise. Finally the receive's reads-from
/// field is updated.
///
/// # Panics
/// Panics if `recv` is not a receive or `send` is `Some` but not a send.
pub(crate) fn change_rf(&mut self, recv: Event, send: Option<Event>) {
    assert!(self.is_recv(recv));
    assert!(send.is_none() || self.is_send(send.unwrap()));

    self.remove_from_readers(recv);

    if let Some(new_send) = send {
        let as_monitor = self
            .recv_label(recv)
            .unwrap()
            .monitors(self.send_label(new_send).unwrap());
        let slab = self.send_label_mut(new_send).unwrap();
        if as_monitor {
            slab.add_monitor_reader(recv);
        } else {
            slab.set_reader(Some(recv));
        }
    }

    self.recv_label_mut(recv).unwrap().set_rf(send);
}
/// Asserts that the three sources of "who spawned whom" agree.
///
/// For every non-main thread, the (parent thread, event index) pair is
/// derived from (1) the `TCreate` labels found in the graph, (2) the
/// `parent()` of each thread's leading `Begin` label and (3) the position
/// of the `TCreate` stored in each thread's record. All three maps must be
/// equal, and the set of non-main threads must equal the set of threads
/// named by `TCreate` labels.
///
/// # Panics
/// Panics on any inconsistency, on a duplicate child, on a non-main thread
/// whose first label is not `Begin`, or on a `Begin` without a parent.
fn check_spawn_invariants(&self) {
    let child_thread_ids: BTreeSet<ThreadId> = self
        .thread_ids()
        .into_iter()
        .filter(|&tid| tid != main_thread_id())
        .collect();

    // (1) Children as described by the TCreate labels in the graph.
    let mut threads_from_tcreate: BTreeMap<ThreadId, (ThreadId, usize)> = BTreeMap::new();
    for thread_info in self.threads.iter() {
        let parent_thread_id = thread_info.tid;
        for (event_idx, event) in thread_info.labels.iter().enumerate() {
            let LabelEnum::TCreate(tc) = event else {
                continue;
            };
            let child_thread_id = tc.cid();
            assert!(!threads_from_tcreate.contains_key(&child_thread_id));
            threads_from_tcreate.insert(child_thread_id, (parent_thread_id, event_idx));
        }
    }
    let thread_ids_from_tcreate: Vec<ThreadId> = threads_from_tcreate.keys().copied().collect();
    let child_vec: Vec<ThreadId> = child_thread_ids.iter().copied().collect();
    assert_eq!(
        child_vec, thread_ids_from_tcreate,
        "threads and TCreate labels aren't consistent"
    );

    // (2) Children as described by each thread's Begin label.
    let mut threads_from_begin: BTreeMap<ThreadId, (ThreadId, usize)> = BTreeMap::new();
    for thread_info in self
        .threads
        .iter()
        .filter(|info| info.tid != main_thread_id())
    {
        let child_thread_id = thread_info.tid;
        let Some(LabelEnum::Begin(blab)) = thread_info.labels.first() else {
            panic!("First event must be Begin");
        };
        let Some(Event {
            thread: parent_thread_id,
            index: event_idx,
        }) = blab.parent()
        else {
            panic!("Every thread other than main must have a parent");
        };
        assert!(!threads_from_begin.contains_key(&child_thread_id));
        threads_from_begin.insert(child_thread_id, (parent_thread_id, event_idx as usize));
    }
    assert_eq!(
        threads_from_begin, threads_from_tcreate,
        "begin and tcreate events are inconsistent"
    );

    // (3) Children as described by the TCreate stored in each thread record.
    let mut threads_from_thdinfo: BTreeMap<ThreadId, (ThreadId, usize)> = BTreeMap::new();
    for thread_info in self
        .threads
        .iter()
        .filter(|info| info.tid != main_thread_id())
    {
        let child_thread_id = thread_info.tid;
        let Event {
            thread: parent_thread_id,
            index: event_idx,
        } = thread_info.tclab.pos();
        assert!(!threads_from_thdinfo.contains_key(&child_thread_id));
        threads_from_thdinfo.insert(child_thread_id, (parent_thread_id, event_idx as usize));
    }
    assert_eq!(
        threads_from_tcreate, threads_from_thdinfo,
        "self.thdinfo has information that's inconsistent with self.threads() events"
    );
}
pub(crate) fn register_send(&mut self, send: &Event) {
let monitor_tids = self
.send_label(*send)
.unwrap()
.monitor_sends()
.keys()
.cloned()
.collect::<Vec<_>>();
for tid in monitor_tids {
let legacy_chan = Loc::new(Thread(tid));
let sends = self.sends.entry(legacy_chan).or_default();
sends.push(*send);
}
let slab = self.send_label(*send).unwrap();
let sends = self.sends.entry(slab.loc().clone()).or_default();
sends.push(*send);
}
/// Adds the receive at `recv` to the per-location receive index under every
/// location of its `recv_loc()`. Does nothing if the label at `recv` is not
/// a `RecvMsg`.
pub(crate) fn register_recv(&mut self, recv: &Event) {
    let Some(rlab) = self.recv_label(*recv) else {
        return;
    };
    let locs = rlab.recv_loc().locs().clone();
    for loc in locs.iter() {
        self.recvs.entry(loc.clone()).or_default().push(*recv);
    }
}
/// Current value of the dropped-sends counter.
pub(crate) fn dropped_sends(&self) -> usize {
    let count = self.dropped_sends;
    count
}
/// Increases the dropped-sends counter by one.
pub(crate) fn incr_dropped_sends(&mut self) {
    self.dropped_sends = self.dropped_sends + 1;
}
/// Decreases the dropped-sends counter by `count`.
pub(crate) fn decr_dropped_sends(&mut self, count: usize) {
    self.dropped_sends = self.dropped_sends - count;
}
/// Restricts the graph to the events contained in the view `v`.
///
/// In order:
/// 1. Registered sends outside `v` are removed from the send index (empty
///    per-location lists are dropped). If the dropped-sends counter is
///    non-zero, it is decreased by the number of removed sends that report
///    `is_dropped()`.
/// 2. Registered receives outside `v` are removed from the receive index.
/// 3. Every receive label past the view in its thread is detached from the
///    send it reads from.
/// 4. Threads that do not appear in `v` are removed, along with their
///    task mapping if they have one.
/// 5. Each remaining thread's labels are truncated to the view, and
///    `finished_threads` is refreshed.
/// 6. The spawn invariants are re-checked.
///
/// # Panics
/// Panics if the spawn invariants do not hold afterwards.
fn cut_to_view(&mut self, v: &VectorClock) {
    let some_dropped = self.dropped_sends() != 0;

    // Removed sends are only tracked when there is a counter to adjust.
    let mut deleted: HashSet<Event> = HashSet::new();
    self.sends.values_mut().for_each(|stores| {
        stores.retain(|e| {
            let kept = v.contains(*e);
            if !kept && some_dropped {
                deleted.insert(*e);
            }
            kept
        })
    });
    self.sends.retain(|_, vec| !vec.is_empty());

    if some_dropped {
        // The labels are still present at this point, so they can be inspected.
        let deleted_dropped = deleted
            .iter()
            .filter(|&&e| self.send_label(e).is_some_and(|s| s.is_dropped()))
            .count();
        self.decr_dropped_sends(deleted_dropped);
    }

    self.recvs
        .values_mut()
        .for_each(|vec| vec.retain(|e| v.contains(*e)));

    let mut deleted_receives = vec![];
    for threads in self.threads.iter() {
        let j = threads.labels.partition_point(|lab| v.contains(lab.pos()));
        for lab in threads.labels[j..].iter() {
            if let LabelEnum::RecvMsg(rlab) = lab {
                deleted_receives.push(rlab.pos());
            }
        }
    }
    for deleted in deleted_receives {
        self.remove_from_readers(deleted);
    }

    let threads = &mut self.threads;
    let tasks = &mut self.task_id_map;
    threads.retain(|t| {
        let keep = v.get(t.tid).is_some();
        if !keep {
            // A thread may have no task bound (`initialize_for_execution`
            // clears the binding); then there is no mapping to remove.
            if let Some(task_id) = t.task_id {
                tasks.remove(&task_id);
            }
        }
        keep
    });

    let tids = self.threads.iter().map(|t| t.tid).collect::<Vec<_>>();
    for tid in tids {
        let event_idx = v
            .get(tid)
            .expect("any thread not in the vector clock should already be erased")
            as usize
            + 1;
        let ind: usize = tid.into();
        self.threads[ind].labels.truncate(event_idx);
        self.on_thread_changed(&tid);
    }
    self.check_spawn_invariants();
}
/// Restricts the graph to the view computed by `view_from_stamp(s)`.
pub(crate) fn cut_to_stamp(&mut self, s: usize) {
    let view = self.view_from_stamp(s);
    self.cut_to_view(&view);
}
/// Returns a clone of this graph restricted to the view `v`; `self` is left
/// untouched.
pub(crate) fn copy_to_view(&self, v: &VectorClock) -> ExecutionGraph {
    let mut restricted = self.clone();
    restricted.cut_to_view(v);
    restricted
}
pub(crate) fn porf(&self, pos: Event) -> VectorClock {
let lab = self.label(pos);
let mut porf = lab.cached_porf().clone();
match lab {
LabelEnum::Begin(blab) => {
if let Some(parent) = blab.parent() {
porf.update(self.label(parent).cached_porf());
}
}
LabelEnum::TJoin(jlab) => {
porf.update(self.thread_last(jlab.cid()).unwrap().cached_porf());
}
LabelEnum::RecvMsg(rlab) => {
if let Some(rf) = rlab.rf() {
porf.update(self.label(rf).cached_porf());
}
}
_ => { }
};
porf
}
pub(crate) fn in_porf(&self, first: Event, second: Event) -> bool {
let lab = self.label(second);
match lab {
LabelEnum::Begin(blab) => {
if let Some(parent) = blab.parent() {
if self.label(parent).cached_porf().contains(first) {
return true;
}
}
}
LabelEnum::TJoin(jlab) => {
if self
.thread_last(jlab.cid())
.unwrap()
.cached_porf()
.contains(first)
{
return true;
}
}
LabelEnum::RecvMsg(rlab) => {
if let Some(rf) = rlab.rf() {
if self.label(rf).cached_porf().contains(first) {
return true;
}
}
}
_ => { }
};
lab.cached_porf().contains(first)
}
/// Produces a sorted copy of the graph's labels by walking backwards from a
/// set of starting events (see `top_sort_util`).
///
/// With `Some(ev)` the only starting event is `ev`. With `None` the starting
/// events are the last labels of all threads, except those whose last label
/// is a send for which `is_rf_maximal_send` is false.
///
/// # Panics
/// Panics (for `None`) if any thread has no labels.
pub(crate) fn top_sort(&self, pos: Option<Event>) -> REPLAY::TopologicallySortedExecutionGraph {
    let maxs: Vec<Event> = match pos {
        Some(ev) => vec![ev],
        None => self
            .threads
            .iter()
            .filter_map(|thread| {
                let last = self.thread_last(thread.tid).unwrap().pos();
                let is_start = !self.is_send(last) || self.is_rf_maximal_send(last);
                is_start.then_some(last)
            })
            .collect(),
    };

    let mut visited = VectorClock::new();
    let mut sorted_graph = REPLAY::TopologicallySortedExecutionGraph::new();
    for start in maxs {
        self.top_sort_util(&mut visited, &mut sorted_graph, start);
    }
    sorted_graph
}
/// Recursive step of `top_sort`.
///
/// If `e` is already contained in `view`, nothing happens. Otherwise `view`
/// is extended to `e`, and the labels of `e`'s thread from the previously
/// recorded index (or 0) up to `e` are processed in order: before a label is
/// inserted into `graph`, its cross-thread predecessor is visited first
/// (reads-from send of a `RecvMsg`, last label of the thread joined by a
/// `TJoin`, parent of a `Begin`).
fn top_sort_util(
    &self,
    view: &mut VectorClock,
    graph: &mut REPLAY::TopologicallySortedExecutionGraph,
    e: Event,
) {
    if view.contains(e) {
        return;
    }
    // NOTE(review): when `view` already holds index k for this thread, the
    // loop below starts at k again, so label k is handed to `insert_label`
    // a second time. Confirm that `insert_label` tolerates this, or whether
    // the start should be k + 1.
    let start_idx = view.get(e.thread).unwrap_or(0);
    view.update_or_set(e);

    for i in start_idx..=e.index {
        let ei = Event::new(e.thread, i);
        let lab = self.label(ei);
        match lab {
            LabelEnum::RecvMsg(rlab) => {
                if let Some(rf) = rlab.rf() {
                    self.top_sort_util(view, graph, rf);
                }
            }
            LabelEnum::TJoin(jlab) => {
                let joined_last = self.thread_last(jlab.cid()).unwrap().pos();
                self.top_sort_util(view, graph, joined_last);
            }
            LabelEnum::Begin(blab) => {
                if let Some(parent) = blab.parent() {
                    self.top_sort_util(view, graph, parent);
                }
            }
            _ => {}
        }
        graph.insert_label(lab.clone());
    }
}
}
impl Default for ExecutionGraph {
fn default() -> Self {
ExecutionGraph::new()
}
}
/// Human-readable dump: a header line, then for each thread a heading
/// (with its name when it has one, and a daemon marker when applicable)
/// followed by one tab-indented line per label.
impl std::fmt::Display for ExecutionGraph {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "Printing exec graph")?;
        for thread_info in self.threads.iter() {
            let tid = thread_info.tid;
            let daemon = if thread_info.tclab.daemon() {
                ", daemon"
            } else {
                ""
            };
            if let Some(name) = thread_info.tclab.name() {
                writeln!(f, "thread \"{}\"[tid={}{}]:", name, tid, daemon)?;
            } else {
                writeln!(f, "thread {}{}:", tid, daemon)?;
            }
            thread_info
                .labels
                .iter()
                .try_for_each(|lab| writeln!(f, "\t{}", lab))?;
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Looking up a task id that was never registered must panic with the
    /// lookup error message.
    #[test]
    #[should_panic(expected = "no thread id for task id")]
    fn test_to_thread_id_with_no_thread_id() {
        let unknown_task = TaskId(1);
        let _ = ExecutionGraph::default().to_thread_id(unknown_task);
    }
}