use crate::configuration::middleware_configuration::Configuration;
use crate::graph::middleware::dot::Dot;
use crate::vv::structs::messages::{Message, MiddlewareClient};
use crate::vv::structs::version_vector::VersionVector;
use crossbeam::Sender;
use std::collections::HashMap;
use std::sync::Arc;
#[derive(Debug, Clone)]
pub struct QueueNode {
pub j: usize,
pub message: Message,
}
#[derive(Debug, Clone)]
pub struct StableDot {
pub ctr: usize,
pub j: usize,
pub message: Message,
}
impl StableDot {
pub fn new(ctr: usize, j: usize, message: Message) -> Self {
Self { ctr, j, message }
}
}
#[allow(non_snake_case)]
pub struct VV {
pub V: VersionVector,
pub R: VersionVector,
pub DQ: Vec<QueueNode>,
pub M: Vec<VersionVector>,
pub M_entry_row_num: VersionVector,
pub SV: VersionVector,
pub SMap: HashMap<Dot, StableDot>,
pub ctr: usize,
pub peer_index: usize,
pub client: Sender<MiddlewareClient>,
pub configuration: Arc<Configuration>,
pub peer_number: usize,
}
#[allow(non_snake_case)]
impl VV {
pub fn new(
peer_number: usize,
peer_index: usize,
client: Sender<MiddlewareClient>,
configuration: Arc<Configuration>,
) -> Self {
let DQ: Vec<QueueNode> = Vec::with_capacity(peer_number * 2);
let mut M: Vec<VersionVector> = Vec::new();
for _ in 0..peer_number {
M.push(VersionVector::new(peer_number));
}
Self {
V: VersionVector::new(peer_number),
R: VersionVector::new(peer_number),
DQ,
M,
M_entry_row_num: VersionVector::new(peer_number),
SV: VersionVector::new(peer_number),
SMap: HashMap::new(),
ctr: 0,
peer_index,
client,
configuration,
peer_number,
}
}
pub fn dequeue(&mut self, message: Message) {
self.V[self.peer_index] += 1;
if self.configuration.track_causal_stability {
self.updatestability(self.peer_index, message);
}
}
pub fn receive(&mut self, j: usize, message: Message) {
if self.R[j] < message.version_vector[j] {
self.R[j] += 1;
if VersionVector::compare_version_vectors(j, &self.V, &message.version_vector) {
self.deliver_and_log_message(None, Some(message), Some(j));
if self.DQ.len() > 0 {
self.deliver();
}
} else {
let queue_node = QueueNode { j, message };
self.DQ.push(queue_node);
}
}
}
fn deliver(&mut self) {
let mut delivered_index = 0;
let mut received_index = 0;
loop {
if delivered_index >= self.DQ.len() {
if received_index < delivered_index {
self.DQ.truncate(received_index);
if self.DQ.len() > 0 {
delivered_index = 0;
received_index = 0;
} else {
break;
}
} else {
break;
}
} else {
let queue_node = self.DQ[delivered_index].clone();
if VersionVector::compare_version_vectors(
queue_node.j,
&self.V,
&queue_node.message.version_vector,
) {
self.deliver_and_log_message(Some(delivered_index), None, None);
delivered_index += 1;
} else {
self.DQ[received_index] = queue_node.clone();
received_index += 1;
delivered_index += 1;
}
}
}
}
fn deliver_and_log_message(
&mut self,
message_index: Option<usize>,
received_message: Option<Message>,
j: Option<usize>,
) {
let message: Message;
let sender_id: usize;
if let Some(index) = message_index {
message = self.DQ[index].message.clone();
sender_id = self.DQ[index].j;
} else {
message = received_message.unwrap();
sender_id = j.unwrap();
}
self.V[sender_id] += 1;
let delivered_message = MiddlewareClient::DELIVER {
sender_id,
message: message.clone(),
version_vector: message.version_vector.clone(),
};
self.client.send(delivered_message).unwrap();
if self.configuration.track_causal_stability {
self.updatestability(sender_id, message);
}
}
fn updatestability(&mut self, j: usize, message: Message) {
self.M[self.peer_index] = self.V.clone();
if j != self.peer_index {
self.M[j] = message.version_vector.clone();
}
let temp_dot = Dot::new(j, message.version_vector[j]);
self.ctr += 1;
if self.SMap.contains_key(&temp_dot) {
panic!("Repeated dot on SMap!");
}
let stable_dot = StableDot::new(self.ctr, j, message);
self.SMap.insert(temp_dot, stable_dot);
if self.M_entry_row_num.contains(&j) {
let newSV = self.calculateSV(j);
if !self.SV.equal(&newSV) {
let stable_dot_counters = VersionVector::dif(&newSV, &self.SV, newSV.len());
let mut SD: Vec<Dot> = Vec::new();
for (id, counter) in stable_dot_counters {
SD.push(Dot::new(id, counter));
}
self.SV = newSV;
self.stabilize(SD);
}
}
}
fn stabilize(&mut self, mut SD: Vec<Dot>) {
SD.sort_by(|dot_a, dot_b| {
let stable_dot_a = self.SMap.get(&dot_a).unwrap();
let stable_dot_b = self.SMap.get(&dot_b).unwrap();
stable_dot_a.ctr.cmp(&stable_dot_b.ctr)
});
for s in &SD {
if !self.SMap.contains_key(&s) {
let error_message =
format!("ERROR {} {:?} Dot key isn't in SMap", self.peer_index, s);
panic!(error_message);
}
let stable_dot = self.SMap.remove(&s).unwrap();
let stable_message = MiddlewareClient::STABLE {
sender_id: stable_dot.j,
message_id: stable_dot.message.id,
version_vector: stable_dot.message.version_vector,
};
self.client.send(stable_message).unwrap();
}
}
fn calculateSV(&mut self, sender_id: usize) -> VersionVector {
let mut new_sv = self.SV.clone();
let mut min: usize;
let mut min_row_num;
for column in 0..self.peer_number {
if self.M_entry_row_num[column] == sender_id {
min = self.M[0][column];
min_row_num = 0;
for row in 1..self.peer_number {
if self.M[row][column] < min {
min = self.M[row][column];
min_row_num = row;
}
}
new_sv[column] = min;
self.M_entry_row_num[column] = min_row_num;
}
}
new_sv
}
}