use std::sync::Arc;
use ahash::AHashMap;
use cid::Cid;
use libp2p::PeerId;
use tokio::sync::RwLock;
/// Records, for each block [`Cid`], what individual peers have reported about
/// that block: whether they have it or do not have it.
///
/// The state lives behind an `Arc<RwLock<..>>`, so clones of the manager share
/// the same underlying map.
#[derive(Debug, Clone, Default)]
pub struct BlockPresenceManager {
// cid -> (peer -> flag). The flag is `true` when the peer was recorded as
// having the block and `false` when it was recorded as not having it.
// A peer missing from the inner map means nothing is known about it.
presence: Arc<RwLock<AHashMap<Cid, AHashMap<PeerId, bool>>>>,
}
impl BlockPresenceManager {
    /// Creates a manager with no recorded presence information.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records a batch of presence reports from `peer`.
    ///
    /// Every cid in `haves` is recorded as held by the peer and every cid in
    /// `dont_haves` as not held. The whole batch is applied under a single
    /// write lock, `haves` first, then `dont_haves`.
    pub async fn receive_from(&self, peer: &PeerId, haves: &[Cid], dont_haves: &[Cid]) {
        let mut table = self.presence.write().await;
        let reports = haves
            .iter()
            .map(|cid| (cid, true))
            .chain(dont_haves.iter().map(|cid| (cid, false)));
        for (cid, has_block) in reports {
            update_block_presence(&mut table, peer, cid, has_block);
        }
    }

    /// Returns `true` only if `peer` is recorded as having the block `cid`.
    ///
    /// An unknown cid or an unknown peer yields `false`.
    pub async fn peer_has_block(&self, peer: &PeerId, cid: &Cid) -> bool {
        let table = self.presence.read().await;
        matches!(
            table.get(cid).and_then(|by_peer| by_peer.get(peer)),
            Some(&true)
        )
    }

    /// Returns `true` only if `peer` is explicitly recorded as *not* having
    /// the block `cid`.
    ///
    /// An unknown cid or an unknown peer yields `false`: absence of
    /// information is not treated as a "don't have".
    pub async fn peer_does_not_have_block(&self, peer: &PeerId, cid: &Cid) -> bool {
        let table = self.presence.read().await;
        matches!(
            table.get(cid).and_then(|by_peer| by_peer.get(peer)),
            Some(&false)
        )
    }

    /// Filters `keys` down to those for which every peer in `peers` is
    /// explicitly recorded as not having the block.
    ///
    /// Keys with no recorded information at all are dropped. The surviving
    /// keys are returned in the order in which `keys` yielded them.
    pub async fn all_peers_do_not_have_block(
        &self,
        peers: &[PeerId],
        keys: impl IntoIterator<Item = Cid>,
    ) -> Vec<Cid> {
        let table = self.presence.read().await;
        keys.into_iter()
            .filter(|cid| all_dont_have(&table, peers, cid))
            .collect()
    }

    /// Forgets everything recorded for each cid in `keys`.
    pub async fn remove_keys(&self, keys: &[Cid]) {
        let mut table = self.presence.write().await;
        keys.iter().for_each(|cid| {
            table.remove(cid);
        });
    }

    /// Returns `true` if any presence information is stored for `cid`.
    pub async fn has_key(&self, cid: &Cid) -> bool {
        self.presence.read().await.contains_key(cid)
    }
}
/// Records whether `peer` has the block `key`, creating the per-cid and
/// per-peer entries when they do not exist yet.
///
/// A recorded "have" is sticky: once a peer is marked as having a block, a
/// later `present == false` report does not overwrite it. A recorded
/// "don't have" is replaced by whatever `present` says.
fn update_block_presence(
    presence: &mut AHashMap<Cid, AHashMap<PeerId, bool>>,
    peer: &PeerId,
    key: &Cid,
    present: bool,
) {
    let recorded = presence
        .entry(*key)
        .or_default()
        .entry(*peer)
        .or_insert(false);
    // OR-ing keeps an existing `true` untouched and otherwise stores `present`,
    // both for a fresh entry and for a previously recorded `false`.
    *recorded |= present;
}
/// Returns `true` when `key` is tracked and every peer in `peers` is
/// explicitly recorded as not having it.
///
/// Returns `false` if nothing is recorded for `key`, if any listed peer is
/// unknown for that key, or if any listed peer is recorded as having it.
/// For a tracked key and an empty `peers` slice the result is `true`.
fn all_dont_have(
    presence: &AHashMap<Cid, AHashMap<PeerId, bool>>,
    peers: &[PeerId],
    key: &Cid,
) -> bool {
    match presence.get(key) {
        None => false,
        Some(by_peer) => peers
            .iter()
            .all(|peer| by_peer.get(peer) == Some(&false)),
    }
}
// Unit tests for `BlockPresenceManager`: single-peer updates, multi-peer
// add/remove, and the `all_peers_do_not_have_block` filter.
#[cfg(test)]
mod tests {
use crate::block::tests::create_random_block_v1;
use super::*;
// Walks one peer through: unknown -> HAVE/DONT_HAVE -> attempted flip -> removal.
#[tokio::test]
async fn test_block_presence_manager() {
let bpm = BlockPresenceManager::new();
let p = PeerId::random();
let cids = gen_cids(2);
let c0 = cids[0];
let c1 = cids[1];
// Nothing has been received yet, so neither query may report `true`.
assert!(!bpm.peer_has_block(&p, &c0).await);
assert!(!bpm.peer_does_not_have_block(&p, &c0).await);
// p reports HAVE for c0 and DONT_HAVE for c1.
bpm.receive_from(&p, &[c0][..], &[c1][..]).await;
assert!(bpm.peer_has_block(&p, &c0).await);
assert!(!bpm.peer_does_not_have_block(&p, &c0).await);
assert!(!bpm.peer_has_block(&p, &c1).await);
assert!(bpm.peer_does_not_have_block(&p, &c1).await);
// p now reports HAVE for c1 and DONT_HAVE for c0: c1 is upgraded to HAVE,
// while c0 must remain HAVE (a HAVE is never replaced by a DONT_HAVE).
bpm.receive_from(&p, &[c1][..], &[c0][..]).await;
assert!(bpm.peer_has_block(&p, &c0).await);
assert!(!bpm.peer_does_not_have_block(&p, &c0).await);
assert!(bpm.peer_has_block(&p, &c1).await);
assert!(!bpm.peer_does_not_have_block(&p, &c1).await);
// Removing a key drops everything recorded for it.
bpm.remove_keys(&[c0][..]).await;
assert!(!bpm.peer_has_block(&p, &c0).await);
assert!(!bpm.peer_does_not_have_block(&p, &c0).await);
bpm.remove_keys(&[c1][..]).await;
assert!(!bpm.peer_has_block(&p, &c1).await);
assert!(!bpm.peer_does_not_have_block(&p, &c1).await);
}
// Two peers report on overlapping keys; removing some keys must leave the
// state of the remaining key intact for both peers.
#[tokio::test]
async fn test_add_remove_multi() {
let bpm = BlockPresenceManager::new();
let p0 = PeerId::random();
let p1 = PeerId::random();
let cids = gen_cids(3);
let c0 = cids[0];
let c1 = cids[1];
let c2 = cids[2];
// p0: HAVE c0, c1 (c1 is also listed as DONT_HAVE, HAVE wins); DONT_HAVE c2.
bpm.receive_from(&p0, &[c0, c1][..], &[c1, c2][..]).await;
// p1: HAVE c1, c2; DONT_HAVE c0.
bpm.receive_from(&p1, &[c1, c2][..], &[c0][..]).await;
assert!(bpm.peer_has_block(&p0, &c0).await);
assert!(bpm.peer_has_block(&p0, &c1).await);
assert!(bpm.peer_does_not_have_block(&p0, &c2).await);
assert!(bpm.peer_has_block(&p1, &c1).await);
assert!(bpm.peer_has_block(&p1, &c2).await);
assert!(bpm.peer_does_not_have_block(&p1, &c0).await);
// Drop c1 and c2; only the c0 records should survive.
bpm.remove_keys(&[c1, c2][..]).await;
assert!(bpm.peer_has_block(&p0, &c0).await);
assert!(bpm.peer_does_not_have_block(&p1, &c0).await);
assert!(!bpm.peer_has_block(&p0, &c1).await);
assert!(!bpm.peer_does_not_have_block(&p0, &c1).await);
assert!(!bpm.peer_has_block(&p0, &c2).await);
assert!(!bpm.peer_does_not_have_block(&p0, &c2).await);
assert!(!bpm.peer_has_block(&p1, &c1).await);
assert!(!bpm.peer_does_not_have_block(&p1, &c1).await);
assert!(!bpm.peer_has_block(&p1, &c2).await);
assert!(!bpm.peer_does_not_have_block(&p1, &c2).await);
}
// Table-driven check of `all_peers_do_not_have_block` over combinations of
// peers and keys.
#[tokio::test]
async fn test_all_peers_do_not_have_block() {
let bpm = BlockPresenceManager::new();
let p0 = PeerId::random();
let p1 = PeerId::random();
let p2 = PeerId::random();
let cids = gen_cids(3);
let c0 = cids[0];
let c1 = cids[1];
let c2 = cids[2];
// Recorded state after the three calls below:
//        c0          c1          c2
//   p0   unknown     DONT_HAVE   DONT_HAVE
//   p1   DONT_HAVE   HAVE        unknown
//   p2   HAVE        HAVE        DONT_HAVE
bpm.receive_from(&p0, &[][..], &[c1, c2][..]).await;
bpm.receive_from(&p1, &[c1][..], &[c0][..]).await;
bpm.receive_from(&p2, &[c0, c1][..], &[c2][..]).await;
// One scenario: the peers to check, the keys to filter, and the keys
// expected to come back.
struct TestCase {
peers: Vec<PeerId>,
ks: Vec<Cid>,
exp: Vec<Cid>,
}
let testcases = [
// Single peer, single key: a key is returned only for an explicit DONT_HAVE.
TestCase {
peers: vec![p0],
ks: vec![c0],
exp: vec![],
},
TestCase {
peers: vec![p1],
ks: vec![c0],
exp: vec![c0],
},
TestCase {
peers: vec![p2],
ks: vec![c0],
exp: vec![],
},
TestCase {
peers: vec![p0],
ks: vec![c1],
exp: vec![c1],
},
TestCase {
peers: vec![p1],
ks: vec![c1],
exp: vec![],
},
TestCase {
peers: vec![p2],
ks: vec![c1],
exp: vec![],
},
TestCase {
peers: vec![p0],
ks: vec![c2],
exp: vec![c2],
},
TestCase {
peers: vec![p1],
ks: vec![c2],
exp: vec![],
},
TestCase {
peers: vec![p2],
ks: vec![c2],
exp: vec![c2],
},
// Multiple keys and/or multiple peers: every listed peer must have an
// explicit DONT_HAVE for a key to be returned.
TestCase {
peers: vec![p0],
ks: vec![c0, c1, c2],
exp: vec![c1, c2],
},
TestCase {
peers: vec![p0, p1],
ks: vec![c0, c1, c2],
exp: vec![],
},
TestCase {
peers: vec![p0, p2],
ks: vec![c0, c1, c2],
exp: vec![c2],
},
TestCase {
peers: vec![p0, p1, p2],
ks: vec![c0, c1, c2],
exp: vec![],
},
];
for (i, mut tc) in testcases.into_iter().enumerate() {
// Despite its name, `peers` holds the CIDs returned by the filter.
let mut peers = bpm.all_peers_do_not_have_block(&tc.peers, tc.ks).await;
// Sort both sides so the comparison does not depend on ordering.
peers.sort();
tc.exp.sort();
assert_eq!(
peers, tc.exp,
"test case {i} failed: expected matching keys"
);
}
}
// Builds `n` CIDs by taking the CID of each block returned by
// `create_random_block_v1`.
fn gen_cids(n: usize) -> Vec<Cid> {
(0..n).map(|_| *create_random_block_v1().cid()).collect()
}
}