use std::collections::{BTreeMap, BTreeSet, VecDeque};
use crate::Message;
use crate::mc::{McEventId, McTime};
/// Tracks which registered timer and message events are held back by
/// earlier ones.
///
/// Timers are grouped by the `proc` string they were added with; a timer is
/// blocked by timers of the same group that were added before it with a delay
/// not greater than its own. Messages are queued per `(msg, src, dst)` triple
/// and only the event at the front of a queue is reported as available.
#[derive(Default, Clone, Hash, Eq, PartialEq, Debug)]
pub struct DependencyResolver {
/// Every registered timer, keyed by its event id.
timers: BTreeMap<McEventId, TimerInfo>,
/// Event ids queued per `(msg, src, dst)` triple, in insertion order.
messages: BTreeMap<(Message, String, String), VecDeque<McEventId>>,
/// Ids of the registered timers, grouped by the `proc` string they were added with.
proc_timers: BTreeMap<String, BTreeSet<McEventId>>,
}
/// Bookkeeping stored for a single registered timer.
#[derive(Clone, Hash, Eq, PartialEq, Debug)]
struct TimerInfo {
/// Key of the `proc_timers` entry this timer belongs to.
proc: String,
/// Delay passed when the timer was added; compared with the delays of other
/// timers of the same `proc` to decide which of them block a new timer.
delay: McTime,
/// Ids of timers of the same `proc` that were added before this one with a
/// delay not greater than its own and have not been removed yet.
blockers: BTreeSet<McEventId>,
}
impl DependencyResolver {
    /// Registers the timer `event_id` for `proc` with the given `delay`.
    ///
    /// The new timer is blocked by every timer already registered for the same
    /// `proc` whose delay is less than or equal to `delay`.
    ///
    /// Returns `true` if the new timer has no blockers.
    ///
    /// # Panics
    ///
    /// Panics if a timer with `event_id` is already registered.
    pub fn add_timer(&mut self, proc: String, delay: McTime, event_id: McEventId) -> bool {
        let proc_timers = self.proc_timers.entry(proc.clone()).or_default();
        // Borrow only the `timers` field so the closure below does not need all of `self`
        // while `proc_timers` is still mutably borrowed.
        let timers = &self.timers;
        let blockers: BTreeSet<McEventId> = proc_timers
            .iter()
            .copied()
            .filter(|id| timers[id].delay <= delay)
            .collect();
        let is_available = blockers.is_empty();

        let previous = self.timers.insert(event_id, TimerInfo { proc, delay, blockers });
        assert!(previous.is_none(), "event with such id already exists");
        proc_timers.insert(event_id);
        is_available
    }

    /// Unregisters the timer `event_id` and returns the ids of the timers of the
    /// same process that became unblocked because of this removal.
    ///
    /// A timer is reported only if the removed timer was one of its blockers and
    /// it has no blockers left; timers that were not blocked by `event_id` are
    /// never reported.
    ///
    /// # Panics
    ///
    /// Panics if no timer with `event_id` is registered.
    pub fn remove_timer(&mut self, event_id: McEventId) -> BTreeSet<McEventId> {
        let timer = self
            .timers
            .remove(&event_id)
            .expect("timer with such id is not registered");
        let proc_timers = self
            .proc_timers
            .get_mut(&timer.proc)
            .expect("registered timer must be indexed under its process");
        let was_indexed = proc_timers.remove(&event_id);
        assert!(was_indexed, "registered timer must be indexed under its process");

        let mut unblocked = BTreeSet::default();
        for other_id in proc_timers.iter() {
            let other_blockers = &mut self
                .timers
                .get_mut(other_id)
                .expect("indexed timer must be registered")
                .blockers;
            // `remove` tells whether `event_id` really was a blocker: a timer whose
            // blocker set was already empty did not become unblocked by this call.
            if other_blockers.remove(&event_id) && other_blockers.is_empty() {
                unblocked.insert(*other_id);
            }
        }

        // Do not keep empty per-process entries around.
        if proc_timers.is_empty() {
            self.proc_timers.remove(&timer.proc);
        }
        unblocked
    }

    /// Appends `event_id` to the queue of the `(msg, src, dst)` triple.
    ///
    /// Returns `true` if the queue holds only this event after the insertion,
    /// i.e. no earlier event with the same triple is still queued.
    pub fn add_message(&mut self, msg: Message, src: String, dst: String, event_id: McEventId) -> bool {
        let queue = self.messages.entry((msg, src, dst)).or_default();
        queue.push_back(event_id);
        queue.len() == 1
    }

    /// Drops the event at the front of the queue of the `(msg, src, dst)` triple.
    ///
    /// Returns the id of the event that is now at the front of that queue, or
    /// `None` if the queue became empty (in which case its entry is removed).
    ///
    /// # Panics
    ///
    /// Panics if there is no queue for the given triple.
    pub fn remove_message(&mut self, msg: Message, src: String, dst: String) -> Option<McEventId> {
        // Build the key once instead of cloning every component for the lookup.
        let key = (msg, src, dst);
        let queue = self
            .messages
            .get_mut(&key)
            .expect("no message is queued for this (msg, src, dst) triple");
        queue.pop_front();
        let next = queue.front().copied();
        if next.is_none() {
            self.messages.remove(&key);
        }
        next
    }
}
#[cfg(test)]
mod tests {
    use serde::Serialize;

    use crate::mc::DependencyResolver;
    use crate::Message;

    /// Payload-free body used to build identical messages.
    #[derive(Serialize)]
    struct EmptyMessage {}

    /// Registers two timers per process and two identical messages per ordered
    /// pair of distinct processes, then removes everything again, checking the
    /// availability answers along the way and that the resolver ends up empty.
    #[test]
    fn test_dependency_resolver_messages() {
        let procs = ["proc-0", "proc-1", "proc-2"];
        let mut resolver = DependencyResolver::default();

        // Every ordered (sender, receiver) pair of distinct processes.
        let mut routes = Vec::new();
        for sender in procs {
            for receiver in procs {
                if sender != receiver {
                    routes.push((sender, receiver));
                }
            }
        }

        // Event ids are handed out sequentially starting from 1.
        let mut next_id: usize = 0;

        // The first timer of a process is available, the longer second one is not.
        for proc in procs {
            next_id += 1;
            let short_is_free =
                resolver.add_timer(proc.to_owned(), ordered_float::OrderedFloat(1.0), next_id);
            assert!(short_is_free);

            next_id += 1;
            let long_is_free =
                resolver.add_timer(proc.to_owned(), ordered_float::OrderedFloat(3.0), next_id);
            assert!(!long_is_free);
        }

        // Only the first of two identical messages on a route is available.
        for &(sender, receiver) in &routes {
            next_id += 1;
            let first_is_free = resolver.add_message(
                Message::json("MSG", &EmptyMessage {}),
                sender.to_owned(),
                receiver.to_owned(),
                next_id,
            );
            assert!(first_is_free);

            next_id += 1;
            let second_is_free = resolver.add_message(
                Message::json("MSG", &EmptyMessage {}),
                sender.to_owned(),
                receiver.to_owned(),
                next_id,
            );
            assert!(!second_is_free);
        }

        assert_eq!(resolver.messages.len(), 6);
        assert_eq!(resolver.timers.len(), 6);

        // Timers were given ids 1..=6, two consecutive ids per process: removing the
        // first one releases the second, removing the second releases nothing.
        for short_id in (1..=2 * procs.len()).step_by(2) {
            let released = resolver.remove_timer(short_id);
            assert!(!released.is_empty());

            let released = resolver.remove_timer(short_id + 1);
            assert!(released.is_empty());
        }

        // Removing the first message exposes the second; removing that one exposes nothing.
        for &(sender, receiver) in &routes {
            let next = resolver.remove_message(
                Message::json("MSG", &EmptyMessage {}),
                sender.to_owned(),
                receiver.to_owned(),
            );
            assert!(next.is_some());

            let next = resolver.remove_message(
                Message::json("MSG", &EmptyMessage {}),
                sender.to_owned(),
                receiver.to_owned(),
            );
            assert!(next.is_none());
        }

        assert!(resolver.messages.is_empty());
        assert!(resolver.timers.is_empty());
        assert!(resolver.proc_timers.is_empty());
    }
}