use std::collections::{BTreeSet, HashMap};
use log::{info, warn};
use std::fs::File;
use std::io::Write;
use crate::brb_data_type::BRBDataType;
use crate::deterministic_brb::DeterministicBRB;
pub use brb_membership::actor::ed25519::{Actor, Sig, SigningActor};
use brb_membership::SigningActor as SigningActorTrait;
/// A `DeterministicBRB` process specialised to the ed25519 `Actor`,
/// `SigningActor` and `Sig` types re-exported by this module.
pub type State<BRBDT> = DeterministicBRB<Actor, SigningActor, Sig, BRBDT>;
/// A `crate::packet::Packet` specialised to the ed25519 `Actor` and `Sig` types.
///
/// NOTE(review): the type parameter is named `BRBDT`, but this file always
/// instantiates the alias as `Packet<DT::Op>`, so it looks like it carries the
/// operation type rather than the data type itself — confirm against
/// `crate::packet::Packet`.
pub type Packet<BRBDT> = crate::packet::Packet<Actor, Sig, BRBDT>;
/// Shorthand bound for "a BRB data type whose actor is the ed25519 [`Actor`]".
///
/// The trait has no items of its own; it only names the supertrait bound so
/// that generic code in this module can write `DT: BRBDT`.
pub trait BRBDT: BRBDataType<Actor> {}

// Blanket implementation: every type that satisfies the supertrait bound is
// automatically a `BRBDT`.
impl<T> BRBDT for T where T: BRBDataType<Actor> {}
/// An in-memory network of BRB processes.
///
/// Packets are handed to their destination process by `deliver_packet`, which
/// also maintains the bookkeeping fields below.
#[derive(Debug)]
pub struct Net<DT: BRBDT> {
/// The processes taking part in the network; `initialize_proc` appends to it.
pub procs: Vec<State<DT>>,
/// A copy of every packet passed to `deliver_packet`, in delivery order.
pub delivered_packets: Vec<Packet<DT::Op>>,
/// Number of packets passed to `deliver_packet`.
pub n_packets: u64,
/// Per-destination count of packets whose handling returned an error.
pub invalid_packets: HashMap<Actor, u64>,
}
impl<DT: BRBDT> Default for Net<DT> {
fn default() -> Self {
Self::new()
}
}
impl<DT: BRBDT> Net<DT> {
pub fn new() -> Self {
Self {
procs: Vec::new(),
n_packets: 0,
delivered_packets: Default::default(),
invalid_packets: Default::default(),
}
}
pub fn members(&self) -> BTreeSet<Actor> {
self.procs
.iter()
.map(|proc| {
proc.peers()
.unwrap()
.iter()
.flat_map(|peer| self.proc(peer))
.filter(|peer_proc| peer_proc.peers().unwrap().contains(&proc.actor()))
.map(|peer_proc| peer_proc.actor())
.collect::<BTreeSet<_>>()
})
.max_by_key(|members| members.len())
.unwrap_or_default()
}
/// Returns the actor of every process in the network as an ordered set.
pub fn actors(&self) -> BTreeSet<Actor> {
    let mut ids = BTreeSet::new();
    for proc in &self.procs {
        ids.insert(proc.actor());
    }
    ids
}
pub fn initialize_proc(&mut self) -> Actor {
let proc = DeterministicBRB::new();
let actor = proc.actor();
self.procs.push(proc);
actor
}
/// Looks up the first process whose actor equals `actor`.
///
/// Returns `None` when no process in the network has that actor.
pub fn proc(&self, actor: &Actor) -> Option<&State<DT>> {
    for candidate in &self.procs {
        if candidate.actor() == *actor {
            return Some(candidate);
        }
    }
    None
}
/// Mutable counterpart of `proc`: looks up the first process whose actor
/// equals `actor`.
///
/// Returns `None` when no process in the network has that actor.
pub fn proc_mut(&mut self, actor: &Actor) -> Option<&mut State<DT>> {
    let idx = self
        .procs
        .iter()
        .position(|candidate| candidate.actor() == *actor)?;
    self.procs.get_mut(idx)
}
/// Runs one round of anti-entropy across the whole network.
///
/// Every process produces one anti-entropy packet per peer; the collected
/// packets are then delivered via `run_packets_to_completion`.
///
/// # Panics
///
/// Panics if `peers()` or `anti_entropy(..)` does not yield a value (both
/// results are unwrapped).
pub fn anti_entropy(&mut self) {
    info!("[NET] anti-entropy");

    let mut outbound = Vec::new();
    for proc in &self.procs {
        for peer in proc.peers().unwrap() {
            outbound.push(proc.anti_entropy(peer).unwrap());
        }
    }

    self.run_packets_to_completion(outbound);
}
/// Delivers a single packet to its destination process and returns the
/// packets produced in response.
///
/// The packet is always counted in `n_packets` and recorded in
/// `delivered_packets`. If no process matches the destination, nothing else
/// happens and an empty list is returned. If the destination rejects the
/// packet, the rejection is logged, counted in `invalid_packets` for that
/// destination, and an empty list is returned.
pub fn deliver_packet(&mut self, packet: Packet<DT::Op>) -> Vec<Packet<DT::Op>> {
    info!("[NET] packet {}->{}", packet.source, packet.dest);
    self.n_packets += 1;
    let dest = packet.dest;
    self.delivered_packets.push(packet.clone());

    // An unknown destination is treated like a handler that produced nothing.
    let outcome = match self.proc_mut(&dest) {
        Some(recipient) => recipient.handle_packet(packet),
        None => Ok(vec![]),
    };

    match outcome {
        Ok(responses) => responses,
        Err(err) => {
            warn!("[BRB] Rejected packet: {:?}", err);
            *self.invalid_packets.entry(dest).or_default() += 1;
            vec![]
        }
    }
}
/// Reports whether all members hold the same `history_from_source`.
///
/// The history of the first member (in `members()` order) is used as the
/// reference and every other member is compared against it. Returns `true`
/// when there are no members to compare.
pub fn members_are_in_agreement(&self) -> bool {
    let members = self.members();
    let mut histories = members
        .iter()
        .filter_map(|actor| self.proc(actor))
        .map(|proc| &proc.history_from_source);

    match histories.next() {
        None => true,
        Some(reference) => histories.all(|history| history == reference),
    }
}
/// Returns the total number of rejected packets across all destinations.
pub fn count_invalid_packets(&self) -> u64 {
    let mut total: u64 = 0;
    for rejected in self.invalid_packets.values() {
        total += rejected;
    }
    total
}
/// Delivers `packets` in FIFO order, queueing every response packet behind
/// the ones already pending, until no packets remain.
///
/// The loop only ends once a delivery round produces no further packets, so
/// it runs for as long as processes keep answering each other.
pub fn run_packets_to_completion(&mut self, packets: Vec<Packet<DT::Op>>) {
    // A `VecDeque` pops from the front in O(1); `Vec::remove(0)` would shift
    // every pending packet on each iteration, making the loop quadratic.
    let mut pending = std::collections::VecDeque::from(packets);
    while let Some(packet) = pending.pop_front() {
        let responses = self.deliver_packet(packet);
        pending.extend(responses);
    }
}
/// Writes a message sequence chart of all delivered packets to
/// `<chart_name>.msc`.
///
/// The chart lists the distinct actors (sorted, `Debug`-formatted) as
/// entities, followed by one arrow per delivered packet labelled with the
/// packet payload. Afterwards every occurrence of an actor's `Display` form
/// is replaced by the 1-based position of its process in `procs`.
///
/// # Panics
///
/// Panics if the output file cannot be created or written.
pub fn generate_msc(&self, chart_name: &str) {
    // The continuation lines of this literal are part of the string and must
    // stay unindented to keep the emitted text unchanged.
    let mut msc = String::from(
        "
msc {\n
hscale = \"2\";\n
",
    );

    // Entity declaration: distinct actors in sorted order.
    let entity_ids: BTreeSet<_> = self
        .procs
        .iter()
        .map(|p| p.membership.id.actor())
        .collect();
    let entity_labels: Vec<String> = entity_ids
        .into_iter()
        .map(|id| format!("{:?}", id))
        .collect();
    msc.push_str(&entity_labels.join(","));
    msc.push_str(";\n");

    // One arrow per delivered packet, in delivery order.
    for packet in &self.delivered_packets {
        let arrow = format!(
            "{}->{} [ label=\"{:?}\"];\n",
            packet.source, packet.dest, packet.payload
        );
        msc.push_str(&arrow);
    }
    msc.push_str("}\n");

    // Swap each actor id for its 1-based position in `procs`.
    for (ordinal, proc) in (1usize..).zip(self.procs.iter()) {
        let needle = proc.membership.id.actor().to_string();
        msc = msc.replace(&needle, &ordinal.to_string());
    }

    let mut out = File::create(format!("{}.msc", chart_name)).unwrap();
    out.write_all(msc.as_bytes()).unwrap();
}
}