pub(crate) mod collector_checkpoints;
pub(crate) mod collector_commits;
pub(crate) mod collector_req_view_changes;
pub(crate) mod collector_view_changes;
use std::{collections::HashMap, hash::Hash};
use serde::{Deserialize, Serialize};
use shared_ids::ReplicaId;
use tracing::trace;
use crate::Config;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct CollectorMessages<K: Eq + Hash + PartialOrd + Clone, M>(
HashMap<K, HashMap<ReplicaId, M>>,
);
impl<K: Eq + Hash + PartialOrd + Clone, M> CollectorMessages<K, M> {
pub(crate) fn new() -> Self {
Self(HashMap::new())
}
fn collect(&mut self, msg: M, from: ReplicaId, key: K) -> u64 {
match self.0.get_mut(&key) {
Some(messages) => {
if messages.get(&from).is_some() {
trace!("Skipped inserting message (origin: {from:?}) into collector: Message was a duplicate.");
return messages.len() as u64;
}
messages.insert(from, msg);
trace!("Inserted message (origin: {from:?}) into collector.");
}
None => {
let mut messages = HashMap::new();
messages.insert(from, msg);
self.0.insert(key.clone(), messages);
trace!("Inserted message (origin: {from:?}) into collector.");
}
}
self.0.get_mut(&key).unwrap().len() as u64
}
fn retrieve(&mut self, key: K, config: &Config) -> Option<(M, Vec<M>)> {
let messages = self.0.get_mut(&key);
messages.as_ref()?;
let messages = messages.unwrap();
if messages.len() <= config.t.try_into().unwrap() {
return None;
}
if !messages.contains_key(&config.id) {
return None;
}
let mut my_retrieved_message = None;
let mut other_retrieved_messages = Vec::new();
let mut messages = self.0.remove(&key).unwrap();
for (rep_id, msg) in messages.drain() {
if rep_id == config.id {
my_retrieved_message = Some(msg);
} else {
other_retrieved_messages.push(msg);
}
}
self.0.retain(|n, _| n > &key);
let my_retrieved_message = my_retrieved_message?;
Some((my_retrieved_message, other_retrieved_messages))
}
}