use std::collections::BTreeMap;
use std::collections::BTreeSet;
use crate::mc::{DependencyResolver, EventOrderingMode, McEvent, McEventId};
#[derive(Default, Clone, Hash, Eq, PartialEq, Debug)]
pub struct PendingEvents {
events: BTreeMap<McEventId, McEvent>,
timer_mapping: BTreeMap<(String, String), usize>,
available_events: BTreeSet<McEventId>,
resolver: DependencyResolver,
id_counter: McEventId,
}
impl PendingEvents {
pub fn new() -> Self {
PendingEvents {
events: BTreeMap::default(),
timer_mapping: BTreeMap::default(),
available_events: BTreeSet::default(),
resolver: DependencyResolver::default(),
id_counter: 0,
}
}
pub fn push(&mut self, event: McEvent) -> McEventId {
let id = self.id_counter;
self.id_counter += 1;
self.push_with_fixed_id(event, id)
}
pub(crate) fn push_with_fixed_id(&mut self, event: McEvent, id: McEventId) -> McEventId {
assert!(!self.events.contains_key(&id), "event with such id already exists");
match &event {
McEvent::MessageReceived { msg, src, dst, .. } => {
if self.resolver.add_message(msg.clone(), src.clone(), dst.clone(), id) {
self.available_events.insert(id);
}
}
McEvent::TimerFired {
proc,
timer_delay,
timer,
} => {
self.timer_mapping.insert((proc.clone(), timer.clone()), id);
if self.resolver.add_timer(proc.clone(), *timer_delay, id) {
self.available_events.insert(id);
}
}
_ => {
panic!("should only have TimerFired or MessageReceived events");
}
};
self.events.insert(id, event);
id
}
pub fn get(&self, id: McEventId) -> Option<&McEvent> {
self.events.get(&id)
}
pub(crate) fn available_events(&self, delivery_guarantee: &EventOrderingMode) -> BTreeSet<McEventId> {
assert!(!self.available_events.is_empty() || self.events.is_empty());
match delivery_guarantee {
EventOrderingMode::Normal => self.available_events.clone(),
EventOrderingMode::MessagesFirst => {
let only_messages = self
.available_events
.clone()
.into_iter()
.filter(|x| matches!(self.events[x], McEvent::MessageReceived { .. }))
.collect::<BTreeSet<McEventId>>();
if only_messages.is_empty() {
self.available_events.clone()
} else {
only_messages
}
}
}
}
pub fn is_empty(&self) -> bool {
assert!(!self.available_events.is_empty() || self.events.is_empty());
self.available_events.is_empty()
}
pub fn cancel_timer(&mut self, proc: String, timer: String) {
let id = self.timer_mapping.remove(&(proc, timer));
if let Some(id) = id {
self.pop(id);
}
}
pub fn pop(&mut self, event_id: McEventId) -> McEvent {
let result = self.events.remove(&event_id).unwrap();
self.available_events.remove(&event_id);
if let McEvent::TimerFired { .. } = result {
let unblocked_events = self.resolver.remove_timer(event_id);
self.available_events.extend(unblocked_events);
}
if let McEvent::MessageReceived { msg, src, dst, .. } = result.clone() {
if let Some(unblocked_event) = self.resolver.remove_message(msg, src, dst) {
self.available_events.insert(unblocked_event);
}
}
result
}
pub(crate) fn cancel_proc_events(&mut self, proc: &String) -> Vec<McEvent> {
let mut events_to_clear = Vec::new();
for (event_id, event) in &self.events {
let need_to_clear = match event {
McEvent::MessageReceived { src, dst, .. } => src == proc || dst == proc,
McEvent::TimerFired { proc: event_proc, .. } => event_proc == proc,
_ => true,
};
if need_to_clear {
events_to_clear.push(*event_id);
}
}
let mut new_events = Vec::new();
for event_id in events_to_clear {
if let McEvent::MessageReceived { msg, src, dst, .. } = self.pop(event_id) {
new_events.push(McEvent::MessageDropped {
msg,
src,
dst,
receive_event_id: Some(event_id),
});
}
}
new_events
}
}
#[cfg(test)]
mod tests {
use rand::prelude::IteratorRandom;
use std::collections::BTreeSet;
use crate::Message;
use crate::mc::network::DeliveryOptions;
use crate::mc::{EventOrderingMode, McEvent, McTime, PendingEvents};
#[test]
fn test_mc_time() {
let a = McTime::from(0.0);
let b = McTime::from(0.0);
assert!(b <= a);
assert!(a <= b);
assert_eq!(a, b);
}
#[test]
fn test_dependency_resolver_simple() {
let mut pending_events = PendingEvents::new();
let mut sequence = Vec::new();
let mut rev_id = vec![0; 9];
for node_id in 0..3 {
let times: Vec<u64> = (0..3).collect();
for event_time in times {
let event = McEvent::TimerFired {
proc: node_id.to_string(),
timer: format!("{event_time}"),
timer_delay: McTime::from(event_time as f64),
};
rev_id[pending_events.push(event)] = event_time * 3 + node_id;
}
}
println!("{rev_id:?}");
while let Some(id) = pending_events
.available_events(&EventOrderingMode::Normal)
.iter()
.choose(&mut rand::thread_rng())
{
let id = *id;
sequence.push(rev_id[id]);
pending_events.pop(id);
}
println!("{sequence:?}");
assert_eq!(sequence.len(), 9);
let mut timers = [0, 0, 0];
for event_id in sequence {
let time = event_id / 3;
let node = event_id % 3;
assert_eq!(timers[node as usize], time);
timers[node as usize] += 1;
}
}
#[test]
fn test_dependency_resolver_pop() {
let mut pending_events = PendingEvents::new();
let mut sequence = Vec::new();
let mut rev_id = [0; 12];
for node_id in 0..3 {
let times: Vec<u64> = (0..3).collect();
for event_time in times {
let event = McEvent::TimerFired {
proc: node_id.to_string(),
timer: format!("{event_time}"),
timer_delay: McTime::from(1.0 + event_time as f64),
};
rev_id[pending_events.push(event)] = event_time * 3 + node_id;
}
}
for _ in 0..7 {
let id = *pending_events
.available_events(&EventOrderingMode::Normal)
.iter()
.choose(&mut rand::thread_rng())
.unwrap();
sequence.push(rev_id[id]);
pending_events.pop(id);
}
for node_id in 0..3 {
let event = McEvent::TimerFired {
proc: node_id.to_string(),
timer: format!("{node_id}"),
timer_delay: McTime::from(3.),
};
rev_id[pending_events.push(event)] = 9 + node_id;
}
while let Some(id) = pending_events
.available_events(&EventOrderingMode::Normal)
.iter()
.choose(&mut rand::thread_rng())
{
let id = *id;
sequence.push(rev_id[id]);
pending_events.pop(id);
}
println!("{sequence:?}");
assert_eq!(sequence.len(), 12);
let mut timers = [0, 0, 0];
for event_id in sequence {
let time = event_id / 3;
let node = event_id % 3;
assert_eq!(timers[node as usize], time);
timers[node as usize] += 1;
}
}
#[test]
fn test_dependency_resolver_event_ordering() {
let mut pending_events = PendingEvents::new();
let id_timer = pending_events.push(McEvent::TimerFired {
proc: "proc".to_string(),
timer: "timer".to_string(),
timer_delay: McTime::from(0.0),
});
let id_message = pending_events.push(McEvent::MessageReceived {
msg: Message::new("TIP", "DATA"),
src: "src".to_string(),
dst: "dst".to_string(),
options: DeliveryOptions::NoFailures(McTime::from(0.0)),
});
assert_eq!(
pending_events.available_events(&EventOrderingMode::Normal),
BTreeSet::from_iter([id_timer, id_message])
);
assert_eq!(
pending_events.available_events(&EventOrderingMode::MessagesFirst),
BTreeSet::from_iter([id_message])
);
pending_events.pop(id_message);
assert_eq!(
pending_events.available_events(&EventOrderingMode::Normal),
BTreeSet::from_iter([id_timer])
);
assert_eq!(
pending_events.available_events(&EventOrderingMode::MessagesFirst),
BTreeSet::from_iter([id_timer])
);
pending_events.pop(id_timer);
assert_eq!(
pending_events.available_events(&EventOrderingMode::Normal),
BTreeSet::new()
);
assert_eq!(
pending_events.available_events(&EventOrderingMode::MessagesFirst),
BTreeSet::new()
);
}
}