use std::{
collections::{hash_map::Entry, HashMap, HashSet},
fmt,
fmt::Debug,
};
use libp2p_identity::PeerId;
use crate::{
topic::TopicHash,
types::{MessageId, RawMessage},
};
/// Entry recorded in a [`MessageCache`] history slot: the id of a cached message together
/// with the topic the message was published on.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct CacheEntry {
/// Id of the cached message; used as the key into the cache's message map.
mid: MessageId,
/// Topic of the cached message, copied from the message when it is put into the cache.
topic: TopicHash,
}
/// Cache of messages organised into a sequence of history slots.
#[derive(Clone)]
pub(crate) struct MessageCache {
/// Cached messages keyed by id. Each message is paired with a set of peers that is
/// filled by `observe_duplicate` while the message is not yet validated and is emptied
/// (handed to the caller) by `validate`.
msgs: HashMap<MessageId, (RawMessage, HashSet<PeerId>)>,
/// For every message id and peer, a counter that `get_with_iwant_counts` increments each
/// time it returns that message for that peer.
iwant_counts: HashMap<MessageId, HashMap<PeerId, u32>>,
/// History slots. `put` appends to slot 0; `shift` drops the last slot (removing its
/// messages from the cache) and inserts a fresh empty slot at the front.
history: Vec<Vec<CacheEntry>>,
/// Number of leading history slots that `get_gossip_message_ids` scans.
gossip: usize,
}
impl fmt::Debug for MessageCache {
    /// Writes the cache as a `MessageCache { msgs, history, gossip }` debug struct.
    ///
    /// The `iwant_counts` field is not part of the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            msgs,
            history,
            gossip,
            ..
        } = self;

        let mut out = f.debug_struct("MessageCache");
        out.field("msgs", msgs);
        out.field("history", history);
        out.field("gossip", gossip);
        out.finish()
    }
}
impl MessageCache {
    /// Creates an empty cache with `history_capacity` history slots, of which the first
    /// `gossip` slots are consulted by [`MessageCache::get_gossip_message_ids`].
    pub(crate) fn new(gossip: usize, history_capacity: usize) -> Self {
        MessageCache {
            gossip,
            msgs: HashMap::default(),
            iwant_counts: HashMap::default(),
            history: vec![Vec::new(); history_capacity],
        }
    }

    /// Puts a message into the cache under `message_id`.
    ///
    /// Returns `false` if a message with this id is already cached (the cache is left
    /// untouched), `true` otherwise. A cache without any history slot stores nothing and
    /// always returns `true`.
    pub(crate) fn put(&mut self, message_id: &MessageId, msg: RawMessage) -> bool {
        if self.history.is_empty() {
            // Nothing can be retained without a history slot; report the message as new.
            return true;
        }

        match self.msgs.entry(message_id.clone()) {
            // Duplicate: keep the message that is already cached.
            Entry::Occupied(_) => false,
            Entry::Vacant(entry) => {
                let cache_entry = CacheEntry {
                    mid: message_id.clone(),
                    topic: msg.topic.clone(),
                };
                entry.insert((msg, HashSet::default()));
                // New messages always go into the newest slot.
                self.history[0].push(cache_entry);

                tracing::trace!(message=?message_id, "Put message in mcache");
                true
            }
        }
    }

    /// Records `source` as a peer associated with the cached message `message_id`.
    ///
    /// Does nothing if the message is not cached or has already been validated.
    pub(crate) fn observe_duplicate(&mut self, message_id: &MessageId, source: &PeerId) {
        if let Some((message, originating_peers)) = self.msgs.get_mut(message_id) {
            if !message.validated {
                originating_peers.insert(*source);
            }
        }
    }

    /// Returns the cached message with the given id, whether validated or not.
    #[cfg(test)]
    pub(crate) fn get(&self, message_id: &MessageId) -> Option<&RawMessage> {
        self.msgs.get(message_id).map(|(message, _)| message)
    }

    /// Returns the validated message with the given id together with the number of times
    /// (including this call) it has been returned for `peer`.
    ///
    /// Returns `None`, without touching any counter, when the message is not cached or not
    /// yet validated.
    pub(crate) fn get_with_iwant_counts(
        &mut self,
        message_id: &MessageId,
        peer: &PeerId,
    ) -> Option<(&RawMessage, u32)> {
        let (message, _) = self.msgs.get(message_id)?;
        if !message.validated {
            return None;
        }

        let count = self
            .iwant_counts
            .entry(message_id.clone())
            .or_default()
            .entry(*peer)
            .or_default();
        // Saturate instead of overflowing: wrapping would reset the counter to zero and a
        // debug build would panic.
        *count = count.saturating_add(1);

        Some((message, *count))
    }

    /// Marks the cached message as validated.
    ///
    /// Returns the message together with the peers recorded by
    /// [`MessageCache::observe_duplicate`]; the stored peer set is left empty. Returns
    /// `None` if the message is not cached.
    pub(crate) fn validate(
        &mut self,
        message_id: &MessageId,
    ) -> Option<(&RawMessage, HashSet<PeerId>)> {
        self.msgs.get_mut(message_id).map(|(message, known_peers)| {
            message.validated = true;
            // Hand the collected peers to the caller and reset the stored set.
            let originating_peers = std::mem::take(known_peers);
            (&*message, originating_peers)
        })
    }

    /// Returns the ids of all validated messages on `topic` found in the first `gossip`
    /// history slots, newest slot first.
    ///
    /// If the cache has fewer than `gossip` slots, all existing slots are scanned.
    pub(crate) fn get_gossip_message_ids(&self, topic: &TopicHash) -> Vec<MessageId> {
        self.history
            .iter()
            // `take` clamps to the available slots, whereas slicing with `[..gossip]`
            // would panic when `gossip` exceeds the history length.
            .take(self.gossip)
            .flatten()
            .filter(|entry| &entry.topic == topic)
            .filter(|entry| {
                matches!(self.msgs.get(&entry.mid), Some((msg, _)) if msg.validated)
            })
            .map(|entry| entry.mid.clone())
            .collect()
    }

    /// Shifts the history by one slot: the oldest slot is dropped, its messages and their
    /// IWANT counters are removed from the cache, and a fresh empty slot becomes the
    /// newest one.
    ///
    /// Does nothing for a cache without history slots.
    pub(crate) fn shift(&mut self) {
        let expired = match self.history.pop() {
            Some(entries) => entries,
            None => return,
        };

        for entry in expired {
            if let Some((msg, _)) = self.msgs.remove(&entry.mid) {
                if !msg.validated {
                    // If GossipsubConfig::validate_messages is true, the implementing
                    // application has to ensure that Gossipsub::validate_message gets
                    // called for each received message within the cache timeout time.
                    tracing::debug!(
                        message=%&entry.mid,
                        "The message got removed from the cache without being validated."
                    );
                }
            }
            tracing::trace!(message=%&entry.mid, "Remove message from the cache");

            self.iwant_counts.remove(&entry.mid);
        }

        // Open a new, empty slot for messages that arrive from now on.
        self.history.insert(0, Vec::new());
    }

    /// Removes the message with the given id (and its IWANT counters) from the cache and
    /// returns it with its recorded peers.
    ///
    /// The corresponding history entry is left in place; it is dropped by a later
    /// [`MessageCache::shift`].
    pub(crate) fn remove(
        &mut self,
        message_id: &MessageId,
    ) -> Option<(RawMessage, HashSet<PeerId>)> {
        self.iwant_counts.remove(message_id);
        self.msgs.remove(message_id)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::IdentTopic as Topic;

    /// Builds an unvalidated message on `topic` from a random source, with sequence number
    /// `x` and a single data byte `x as u8`.
    ///
    /// The returned id is the base58 source followed by the decimal sequence number.
    fn gen_testm(x: u64, topic: TopicHash) -> (MessageId, RawMessage) {
        let author = PeerId::random();
        let message = RawMessage {
            source: Some(author),
            data: vec![x as u8],
            sequence_number: Some(x),
            topic,
            signature: None,
            key: None,
            validated: false,
        };

        let id = MessageId::from(format!("{}{}", author.to_base58(), x));
        (id, message)
    }

    /// Shorthand for constructing the cache under test.
    fn new_cache(gossip_size: usize, history: usize) -> MessageCache {
        MessageCache::new(gossip_size, history)
    }

    /// Puts `count` freshly generated messages (sequence numbers `0..count`) into `cache`.
    fn put_messages(cache: &mut MessageCache, count: u64, topic: &TopicHash) {
        for seq in 0..count {
            let (id, message) = gen_testm(seq, topic.clone());
            cache.put(&id, message);
        }
    }

    /// The gossip parameter is stored as given.
    #[test]
    fn test_new_cache() {
        let gossip_slots: usize = 3;
        let cache = new_cache(gossip_slots, 5);

        assert_eq!(cache.gossip, gossip_slots);
    }

    /// A message that was put can be fetched back by its id.
    #[test]
    fn test_put_get_one() {
        let mut cache = new_cache(10, 15);
        let topic = Topic::new("topic1").hash();
        let (id, message) = gen_testm(10, topic);

        cache.put(&id, message.clone());

        assert_eq!(cache.history[0].len(), 1);
        assert_eq!(cache.get(&id), Some(&message));
    }

    /// Looking up an id that was never inserted yields nothing.
    #[test]
    fn test_get_wrong() {
        let mut cache = new_cache(10, 15);
        let topic = Topic::new("topic1").hash();
        let (id, message) = gen_testm(10, topic);
        cache.put(&id, message);

        let unknown_id = MessageId::new(b"wrongid");

        assert!(cache.get(&unknown_id).is_none());
    }

    /// Looking up any id in an empty cache yields nothing.
    #[test]
    fn test_get_empty() {
        let cache = new_cache(10, 15);
        let unknown_id = MessageId::new(b"imempty");

        assert!(cache.get(&unknown_id).is_none());
    }

    /// One shift moves the stored entries from slot 0 to slot 1 and keeps the messages.
    #[test]
    fn test_shift() {
        let mut cache = new_cache(1, 5);
        let topic = Topic::new("topic1").hash();
        put_messages(&mut cache, 10, &topic);

        cache.shift();

        assert!(cache.history[0].is_empty());
        assert_eq!(cache.history[1].len(), 10);
        assert_eq!(cache.msgs.len(), 10);
    }

    /// Shifting again without new messages leaves the newer slots empty.
    #[test]
    fn test_empty_shift() {
        let mut cache = new_cache(1, 5);
        let topic = Topic::new("topic1").hash();
        put_messages(&mut cache, 10, &topic);

        cache.shift();
        assert!(cache.history[0].is_empty());
        assert_eq!(cache.history[1].len(), 10);

        cache.shift();
        assert_eq!(cache.history[2].len(), 10);
        assert!(cache.history[1].is_empty());
        assert!(cache.history[0].is_empty());
    }

    /// Entries that fall off the end of the history are removed from the cache.
    #[test]
    fn test_remove_last_from_shift() {
        let mut cache = new_cache(4, 5);
        let topic = Topic::new("topic1").hash();
        put_messages(&mut cache, 10, &topic);

        // Four shifts move the entries into the last of the five slots.
        for _ in 0..4 {
            cache.shift();
        }
        assert_eq!(cache.history.last().unwrap().len(), 10);

        // The fifth shift drops that slot together with its messages.
        cache.shift();
        assert_eq!(cache.history.last().unwrap().len(), 0);
        assert_eq!(cache.history[0].len(), 0);
        assert_eq!(cache.msgs.len(), 0);
    }
}