use crate::service::AllowedKeys;
use alloy_primitives::Address;
use blueprint_core::debug;
use blueprint_crypto::BytesEncoding;
use blueprint_crypto::KeyType;
use blueprint_crypto::hashing::keccak_256;
use crossbeam_channel::Receiver;
use dashmap::{DashMap, DashSet};
use libp2p::{PeerId, core::Multiaddr, identify};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::{
collections::HashSet,
sync::Arc,
time::{Duration, Instant, SystemTime},
};
use tokio::sync::broadcast;
use super::utils::{get_address_from_pubkey, secp256k1_ecdsa_recover};
/// Identity against which a peer's signatures are checked.
///
/// A key is either an EVM address or a public key of the key type `K`.
/// The derived `PartialOrd`/`Ord` follow the declaration order of the
/// variants, so reordering them changes how keys compare.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum VerificationIdentifierKey<K: KeyType> {
/// An EVM address; signatures are checked by recovering the signer's
/// address and comparing it with this one.
EvmAddress(Address),
/// A public key of key type `K`; signatures are checked with `K::verify`.
InstancePublicKey(K::Public),
}
impl<K: KeyType> VerificationIdentifierKey<K> {
/// Checks `signature` over `msg` against this key.
///
/// * `EvmAddress`: `msg` is hashed with `keccak_256`, the signer's public key
///   is recovered from the signature with `secp256k1_ecdsa_recover`, and the
///   address derived from that key is compared with the stored address.
///   Signatures shorter than 65 bytes are zero-padded on the right.
/// * `InstancePublicKey`: the signature is decoded with
///   `K::Signature::from_bytes` and checked with `K::verify`.
///
/// Returns `Ok(true)` when the signature matches and `Ok(false)` when it does
/// not.
///
/// # Errors
///
/// Returns an error when an EVM signature is longer than 65 bytes, when public
/// key recovery fails, or when the signature bytes cannot be decoded for `K`.
pub fn verify(
    &self,
    msg: &[u8],
    signature: &[u8],
) -> Result<bool, Box<dyn blueprint_std::error::Error>> {
    // Size of the fixed buffer handed to `secp256k1_ecdsa_recover`.
    const RECOVERABLE_SIGNATURE_LEN: usize = 65;

    match self {
        VerificationIdentifierKey::EvmAddress(address) => {
            // The signature comes from a remote peer: an oversized one must be
            // rejected here, otherwise the slice copy below would panic.
            if signature.len() > RECOVERABLE_SIGNATURE_LEN {
                return Err(format!(
                    "invalid signature length: expected at most {RECOVERABLE_SIGNATURE_LEN} bytes, got {}",
                    signature.len()
                )
                .into());
            }

            let digest = keccak_256(msg);
            let mut sig = [0u8; RECOVERABLE_SIGNATURE_LEN];
            sig[..signature.len()].copy_from_slice(signature);
            let pubkey = secp256k1_ecdsa_recover(&sig, &digest)?;
            Ok(get_address_from_pubkey(&pubkey) == *address)
        }
        VerificationIdentifierKey::InstancePublicKey(public_key) => {
            let signature = K::Signature::from_bytes(signature)?;
            Ok(K::verify(public_key, msg, &signature))
        }
    }
}
pub fn to_bytes(&self) -> Vec<u8> {
match self {
VerificationIdentifierKey::EvmAddress(address) => address.0.0.to_vec(),
VerificationIdentifierKey::InstancePublicKey(public_key) => public_key.to_bytes(),
}
}
}
/// Bookkeeping record kept for each peer tracked by `PeerManager`.
#[derive(Clone, Debug)]
pub struct PeerInfo {
/// Multiaddresses recorded for the peer.
pub addresses: HashSet<Multiaddr>,
/// libp2p identify information, if any has been stored (not populated in this file).
pub identify_info: Option<identify::Info>,
/// Wall-clock time of the latest update; set on construction and refreshed
/// by `PeerManager::update_peer`.
pub last_seen: SystemTime,
/// Ping latency, if recorded (not written in this file; presumably the
/// latest measurement, to be confirmed against the caller).
pub ping_latency: Option<Duration>,
/// Number of interactions logged through `PeerManager::log_success`.
pub successes: u32,
/// Number of interactions logged through `PeerManager::log_failure`.
pub failures: u32,
/// Smoothed response time, updated on every logged success or failure;
/// `None` until the first sample is logged.
pub average_response_time: Option<Duration>,
}
impl Default for PeerInfo {
    /// Builds an empty record: no addresses, no identify data, no latency or
    /// response-time samples, zeroed counters, and `last_seen` set to the
    /// current system time.
    fn default() -> Self {
        let created_at = SystemTime::now();
        Self {
            last_seen: created_at,
            addresses: HashSet::new(),
            identify_info: None,
            ping_latency: None,
            average_response_time: None,
            successes: 0,
            failures: 0,
        }
    }
}
/// Notification broadcast by `PeerManager` when its peer bookkeeping changes.
///
/// Receivers are obtained through `PeerManager::subscribe`.
#[derive(Debug, Clone)]
pub enum PeerEvent {
/// Sent by `PeerManager::update_peer` with a snapshot of the stored record.
PeerUpdated {
peer_id: PeerId,
info: Box<PeerInfo>,
},
/// Sent by `PeerManager::remove_peer` when a tracked peer was actually removed.
PeerRemoved { peer_id: PeerId, reason: String },
/// Sent by `PeerManager::ban_peer`; `expires_at` is `None` for a ban with
/// no expiry.
PeerBanned {
peer_id: PeerId,
reason: String,
expires_at: Option<Instant>,
},
/// Sent by `PeerManager::unban_peer` when an existing ban was lifted.
PeerUnbanned { peer_id: PeerId },
}
/// Tracks known, verified, and banned peers together with the key whitelist,
/// and broadcasts `PeerEvent`s when that state changes.
pub struct PeerManager<K: KeyType> {
// Records for every currently tracked peer.
peers: DashMap<PeerId, PeerInfo>,
// Peers marked as verified via `verify_peer`.
verified_peers: DashSet<PeerId>,
// Maps each verification key to the peer id it has been linked to.
verification_id_keys_to_peer_ids: Arc<DashMap<VerificationIdentifierKey<K>, PeerId>>,
// Banned peers and their ban expiry; `None` means the ban never expires.
banned_peers: DashMap<PeerId, Option<Instant>>,
/// Keys allowed to participate; positions in this list are exposed through
/// the whitelist-index accessors.
pub whitelisted_keys: Arc<RwLock<Vec<VerificationIdentifierKey<K>>>>,
// Sending half of the broadcast channel that carries `PeerEvent`s.
event_tx: broadcast::Sender<PeerEvent>,
}
impl<K: KeyType> Default for PeerManager<K> {
    /// Creates a manager whose whitelist is an empty set of instance public keys.
    fn default() -> Self {
        let no_keys = AllowedKeys::InstancePublicKeys(HashSet::default());
        Self::new(no_keys)
    }
}
impl<K: KeyType> PeerManager<K> {
#[must_use]
pub fn new(allowed_keys: AllowedKeys<K>) -> Self {
let (event_tx, _) = broadcast::channel(100);
Self {
peers: DashMap::default(),
banned_peers: DashMap::default(),
verified_peers: DashSet::default(),
verification_id_keys_to_peer_ids: Arc::new(DashMap::default()),
whitelisted_keys: match allowed_keys {
AllowedKeys::EvmAddresses(addresses) => Arc::new(RwLock::new(
addresses
.into_iter()
.map(VerificationIdentifierKey::EvmAddress)
.collect(),
)),
AllowedKeys::InstancePublicKeys(keys) => Arc::new(RwLock::new(
keys.into_iter()
.map(VerificationIdentifierKey::InstancePublicKey)
.collect(),
)),
},
event_tx,
}
}
pub fn run_allowed_keys_updater(&self, allowed_keys_rx: &Receiver<AllowedKeys<K>>) {
while let Ok(allowed_keys) = allowed_keys_rx.recv() {
self.clear_whitelisted_keys();
self.insert_whitelisted_keys(allowed_keys);
}
}
/// Removes every key from the whitelist.
pub fn clear_whitelisted_keys(&self) {
    let mut whitelist = self.whitelisted_keys.write();
    whitelist.clear();
}
/// Appends every key in `keys` to the end of the whitelist.
///
/// Existing entries are kept; no de-duplication is performed.
pub fn insert_whitelisted_keys(&self, keys: AllowedKeys<K>) {
    let mut whitelist = self.whitelisted_keys.write();
    match keys {
        AllowedKeys::EvmAddresses(addresses) => {
            for address in addresses {
                whitelist.push(VerificationIdentifierKey::EvmAddress(address));
            }
        }
        AllowedKeys::InstancePublicKeys(public_keys) => {
            for public_key in public_keys {
                whitelist.push(VerificationIdentifierKey::InstancePublicKey(public_key));
            }
        }
    }
}
/// Returns `true` when `key` is present in the whitelist.
#[must_use]
pub fn is_key_whitelisted(&self, key: &VerificationIdentifierKey<K>) -> bool {
    let whitelist = self.whitelisted_keys.read();
    whitelist.iter().any(|allowed| allowed == key)
}
/// Drops `peer` from the tracked set and bans it with no expiry.
///
/// The removal is done first so that the `PeerRemoved` event carries the
/// "non-whitelisted" reason rather than the generic one used by `ban_peer`.
pub fn handle_nonwhitelisted_peer(&self, peer: &PeerId) {
    const REASON: &str = "non-whitelisted";

    self.remove_peer(peer, REASON);
    self.ban_peer(*peer, REASON, None);
}
/// Returns a new receiver for the manager's `PeerEvent` broadcast channel.
#[must_use]
pub fn subscribe(&self) -> broadcast::Receiver<PeerEvent> {
    broadcast::Sender::subscribe(&self.event_tx)
}
/// Stores `info` as the record for `peer_id`, replacing any previous record,
/// and broadcasts `PeerEvent::PeerUpdated`.
///
/// `info.last_seen` is overwritten with the current system time before the
/// record is stored.
pub fn update_peer(&self, peer_id: PeerId, mut info: PeerInfo) {
    info.last_seen = SystemTime::now();

    let event = PeerEvent::PeerUpdated {
        peer_id,
        info: Box::new(info.clone()),
    };
    self.peers.insert(peer_id, info);

    // A send error is deliberately ignored.
    let _ = self.event_tx.send(event);
}
/// Stops tracking `peer_id` and broadcasts `PeerEvent::PeerRemoved` with
/// `reason`.
///
/// Does nothing (no log line, no event) when the peer was not tracked.
pub fn remove_peer(&self, peer_id: &PeerId, reason: impl Into<String>) {
    if self.peers.remove(peer_id).is_none() {
        return;
    }

    let reason = reason.into();
    debug!(%peer_id, %reason, "removed peer");

    let event = PeerEvent::PeerRemoved {
        peer_id: *peer_id,
        reason,
    };
    let _ = self.event_tx.send(event);
}
/// Marks `peer_id` as verified.
pub fn verify_peer(&self, peer_id: &PeerId) {
    let verified = *peer_id;
    self.verified_peers.insert(verified);
}
/// Returns `true` when `peer_id` has been marked as verified.
#[must_use]
pub fn is_peer_verified(&self, peer_id: &PeerId) -> bool {
    self.verified_peers.get(peer_id).is_some()
}
/// Bans `peer_id`, stops tracking it, and broadcasts `PeerEvent::PeerBanned`.
///
/// * `reason` is included in the log line and in the event.
/// * `duration` is how long the ban lasts; `None` bans the peer with no
///   expiry.
///
/// If `Instant::now() + duration` cannot be represented, the ban is stored
/// without an expiry instead of panicking on the overflowing addition.
///
/// An existing ban entry for the peer is overwritten.
pub fn ban_peer(&self, peer_id: PeerId, reason: impl Into<String>, duration: Option<Duration>) {
    // `checked_add` yields `None` on overflow, which maps to a permanent ban.
    let expires_at = duration.and_then(|d| Instant::now().checked_add(d));

    // Emits `PeerRemoved` only if the peer is still tracked.
    self.remove_peer(&peer_id, "banned");
    self.banned_peers.insert(peer_id, expires_at);

    let reason = reason.into();
    debug!(%peer_id, %reason, "banned peer");
    let _ = self.event_tx.send(PeerEvent::PeerBanned {
        peer_id,
        reason,
        expires_at,
    });
}
/// Bans `peer` for the default ban period of one hour.
pub fn ban_peer_with_default_duration(&self, peer: PeerId, reason: impl Into<String>) {
    const DEFAULT_BAN: Duration = Duration::from_secs(60 * 60);

    self.ban_peer(peer, reason, Some(DEFAULT_BAN));
}
/// Lifts the ban on `peer_id` and broadcasts `PeerEvent::PeerUnbanned`.
///
/// Does nothing when the peer is not currently banned.
pub fn unban_peer(&self, peer_id: &PeerId) {
    if self.banned_peers.remove(peer_id).is_none() {
        return;
    }

    debug!(%peer_id, "unbanned peer");
    let event = PeerEvent::PeerUnbanned { peer_id: *peer_id };
    let _ = self.event_tx.send(event);
}
#[must_use]
pub fn is_banned(&self, peer_id: &PeerId) -> bool {
match self.banned_peers.get(peer_id) {
Some(entry) => {
if let Some(expiry) = *entry
&& Instant::now() >= expiry
{
drop(entry);
self.banned_peers.remove(peer_id);
return false;
}
true
}
None => false,
}
}
pub fn log_success(&self, peer_id: &PeerId, duration: Duration) {
if let Some(mut info) = self.peers.get_mut(peer_id) {
info.successes += 1;
update_average_time(&mut info, duration);
self.update_peer(*peer_id, info.clone());
}
}
pub fn log_failure(&self, peer_id: &PeerId, duration: Duration) {
if let Some(mut info) = self.peers.get_mut(peer_id) {
info.failures += 1;
update_average_time(&mut info, duration);
self.update_peer(*peer_id, info.clone());
}
}
/// Returns a copy of the record stored for `peer_id`, or `None` when the
/// peer is not tracked.
#[must_use]
pub fn get_peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
    let entry = self.peers.get(peer_id)?;
    Some(entry.value().clone())
}
#[must_use]
pub fn get_peers(&self) -> DashMap<PeerId, PeerInfo> {
self.peers.clone()
}
/// Returns the number of peers currently tracked.
#[must_use]
pub fn peer_count(&self) -> usize {
    let tracked = &self.peers;
    tracked.len()
}
pub async fn run_ban_cleanup(self: Arc<Self>) {
loop {
let now = Instant::now();
let mut to_unban = Vec::new();
let banned_peers = self.banned_peers.clone().into_read_only();
for (peer_id, expires_at) in banned_peers.iter() {
if let Some(expiry) = expires_at
&& now >= *expiry
{
to_unban.push(*peer_id);
}
}
for peer_id in to_unban {
self.unban_peer(&peer_id);
}
tokio::time::sleep(Duration::from_secs(60)).await;
}
}
/// Associates `verification_id_key` with `peer_id`, replacing any peer id
/// previously linked to that key.
pub fn link_peer_id_to_verification_id_key(
    &self,
    peer_id: &PeerId,
    verification_id_key: &VerificationIdentifierKey<K>,
) {
    let key = verification_id_key.clone();
    self.verification_id_keys_to_peer_ids.insert(key, *peer_id);
}
/// Drops every key link that points at `peer_id`.
pub fn remove_peer_id_from_verification_id_key(&self, peer_id: &PeerId) {
    let links = &self.verification_id_keys_to_peer_ids;
    links.retain(|_, linked_peer| *linked_peer != *peer_id);
}
/// Returns the peer id linked to `verification_id_key`, or `None` when no
/// link exists.
#[must_use]
pub fn get_peer_id_from_verification_id_key(
    &self,
    verification_id_key: &VerificationIdentifierKey<K>,
) -> Option<PeerId> {
    let linked = self
        .verification_id_keys_to_peer_ids
        .get(verification_id_key)?;
    Some(*linked)
}
/// Returns the index of the first whitelist entry equal to `key`, or `None`
/// when the key is not whitelisted.
#[must_use]
pub fn get_key_position_in_whitelist(
    &self,
    key: &VerificationIdentifierKey<K>,
) -> Option<usize> {
    self.whitelisted_keys
        .read()
        .iter()
        .enumerate()
        .find_map(|(index, candidate)| (candidate == key).then_some(index))
}
/// Returns a copy of the whitelist entry at `index`, or `None` when `index`
/// is out of range.
#[must_use]
pub fn get_key_from_whitelist_index(
    &self,
    index: usize,
) -> Option<VerificationIdentifierKey<K>> {
    let whitelist = self.whitelisted_keys.read();
    whitelist.get(index).cloned()
}
/// Returns the peer id linked to the whitelist entry at `index`.
///
/// Yields `None` when `index` is out of range or when no peer has been
/// linked to that key.
#[must_use]
pub fn get_peer_id_from_whitelist_index(&self, index: usize) -> Option<PeerId> {
    let whitelist = self.whitelisted_keys.read();
    let key = whitelist.get(index)?;
    self.get_peer_id_from_verification_id_key(key)
}
/// Returns the index of the first whitelist entry whose key is linked to
/// `peer_id`, or `None` when there is no such entry.
#[must_use]
pub fn get_whitelist_index_from_peer_id(&self, peer_id: &PeerId) -> Option<usize> {
    let whitelist = self.whitelisted_keys.read();
    whitelist.iter().enumerate().find_map(|(index, key)| {
        let linked = self.get_peer_id_from_verification_id_key(key)?;
        (linked == *peer_id).then_some(index)
    })
}
}
/// Folds a new response-time sample into `info.average_response_time`.
///
/// The first sample becomes the average as-is. Each later sample moves the
/// average one fifth of the way toward the sample.
fn update_average_time(info: &mut PeerInfo, duration: Duration) {
    // Divisor applied to the gap between the sample and the current average.
    const ALPHA: u32 = 5;

    let smoothed = match info.average_response_time {
        None => duration,
        Some(average) if duration < average => average - (average - duration) / ALPHA,
        Some(average) => average + (duration - average) / ALPHA,
    };
    info.average_response_time = Some(smoothed);
}