use std::collections::hash_map::Entry::Occupied;
use std::collections::{HashMap, HashSet};
use std::time::Instant;
use crate::collections::BiHashMap;
use super::error::PeerUpdateError;
use super::{PeerAuthorizationToken, PeerTokenPair};
/// Connection state recorded for a peer in the `status` field of `PeerMetadata`.
#[derive(Clone, PartialEq, Debug)]
pub enum PeerStatus {
/// NOTE(review): presumably the peer currently has an established connection; confirm
/// against the code that assigns this status.
Connected,
/// Peers carrying this status are the ones yielded by `PeerMap::get_pending`.
Pending,
/// NOTE(review): presumably the connection was lost and `retry_attempts` counts the
/// reconnection attempts made so far; confirm against the code that updates it.
Disconnected { retry_attempts: u64 },
}
/// Everything a `PeerMap` stores about a single peer.
#[derive(Clone, PartialEq, Debug)]
pub struct PeerMetadata {
/// Authorization token of the peer; first component of the `PeerTokenPair` key under
/// which this metadata is stored.
pub id: PeerAuthorizationToken,
/// Connection id currently associated with the peer; matched by
/// `PeerMap::get_by_connection_id`.
pub connection_id: String,
/// Endpoints known for the peer; each one is registered in the map's endpoint index.
pub endpoints: Vec<String>,
/// NOTE(review): presumably the endpoint currently used to reach the peer; `PeerMap`
/// itself only stores this value and never interprets it.
pub active_endpoint: String,
/// Current connection state of the peer.
pub status: PeerStatus,
/// Set to `Instant::now()` when the peer is added through `PeerMap::insert`.
pub last_connection_attempt: Instant,
/// Initialized from the map's `initial_retry_frequency` on insert.
/// NOTE(review): the unit is not visible in this file; confirm with the retry logic.
pub retry_frequency: u64,
/// Local authorization token paired with `id`; second component of the `PeerTokenPair`
/// key under which this metadata is stored.
pub required_local_auth: PeerAuthorizationToken,
}
/// In-memory registry of peers, keyed by `PeerTokenPair`, with secondary lookups by endpoint
/// and by connection id.
pub struct PeerMap {
// Primary store: one metadata record per (peer token, local token) pair.
peers: HashMap<PeerTokenPair, PeerMetadata>,
// Reverse index from an endpoint string to every peer that registered it.
endpoints: HashMap<String, HashSet<PeerTokenPair>>,
// Value copied into `PeerMetadata::retry_frequency` for each newly inserted peer.
initial_retry_frequency: u64,
// Connection ids that are no longer a peer's current one, mapped to the owning peer;
// consulted by `get_by_connection_id` as a fallback.
removed_connection_ids: HashMap<String, PeerTokenPair>,
}
impl PeerMap {
pub fn new(initial_retry_frequency: u64) -> Self {
gauge!("splinter.peer_manager.peers", 0.0);
PeerMap {
peers: HashMap::new(),
endpoints: HashMap::new(),
initial_retry_frequency,
removed_connection_ids: HashMap::new(),
}
}
pub fn peer_ids(&self) -> Vec<PeerAuthorizationToken> {
self.peers
.iter()
.map(|(_, metadata)| metadata.id.clone())
.collect()
}
pub fn connection_ids(&self) -> BiHashMap<PeerTokenPair, String> {
let mut peer_to_connection_id = BiHashMap::new();
for (peer, metadata) in self.peers.iter() {
peer_to_connection_id.insert(peer.clone(), metadata.connection_id.to_string());
}
peer_to_connection_id
}
#[allow(clippy::too_many_arguments)]
pub fn insert(
&mut self,
peer_id: PeerAuthorizationToken,
connection_id: String,
endpoints: Vec<String>,
active_endpoint: String,
status: PeerStatus,
required_local_auth: PeerAuthorizationToken,
removed_connection_ids: Vec<String>,
) {
let peer_metadata = PeerMetadata {
id: peer_id.clone(),
endpoints: endpoints.clone(),
active_endpoint,
status,
connection_id,
last_connection_attempt: Instant::now(),
retry_frequency: self.initial_retry_frequency,
required_local_auth: required_local_auth.clone(),
};
let peer_token_pair = PeerTokenPair::new(peer_id, required_local_auth);
self.peers.insert(peer_token_pair.clone(), peer_metadata);
for endpoint in endpoints {
if let Some(peer_tokens) = self.endpoints.get_mut(&endpoint) {
peer_tokens.insert(peer_token_pair.clone());
} else {
let mut peer_tokens = HashSet::new();
peer_tokens.insert(peer_token_pair.clone());
self.endpoints.insert(endpoint.clone(), peer_tokens);
}
}
for old_connection_id in removed_connection_ids.into_iter() {
self.removed_connection_ids
.insert(old_connection_id.to_string(), peer_token_pair.clone());
}
gauge!("splinter.peer_manager.peers", self.peers.len() as f64);
}
pub fn remove(&mut self, peer_id: &PeerTokenPair) -> Option<PeerMetadata> {
if let Some(peer_metadata) = self.peers.remove(peer_id) {
for endpoint in peer_metadata.endpoints.iter() {
if let Some(mut peer_tokens) = self.endpoints.remove(endpoint) {
if peer_tokens.len() > 1 {
peer_tokens = peer_tokens
.into_iter()
.filter(|token| token != peer_id)
.collect();
self.endpoints.insert(endpoint.clone(), peer_tokens);
}
}
}
gauge!("splinter.peer_manager.peers", self.peers.len() as f64);
Some(peer_metadata)
} else {
gauge!("splinter.peer_manager.peers", self.peers.len() as f64);
None
}
}
pub fn update_peer(&mut self, peer_metadata: PeerMetadata) -> Result<(), PeerUpdateError> {
let peer_token_pair = PeerTokenPair::new(
peer_metadata.id.clone(),
peer_metadata.required_local_auth.clone(),
);
if let Occupied(mut peer_entry) = self.peers.entry(peer_token_pair.clone()) {
for endpoint in peer_metadata.endpoints.iter() {
if let Some(peer_tokens) = self.endpoints.get_mut(endpoint) {
peer_tokens.insert(peer_token_pair.clone());
} else {
let mut peer_tokens = HashSet::new();
peer_tokens.insert(peer_token_pair.clone());
self.endpoints.insert(endpoint.clone(), peer_tokens);
}
}
if peer_metadata.connection_id != peer_entry.get().connection_id {
self.removed_connection_ids
.insert(peer_entry.get().connection_id.to_string(), peer_token_pair);
}
peer_entry.insert(peer_metadata);
Ok(())
} else {
Err(PeerUpdateError(format!(
"Unable to update peer {}, does not exist",
peer_token_pair
)))
}
}
pub fn get_peer_from_endpoint(&self, endpoint: &str) -> Option<Vec<PeerMetadata>> {
if let Some(peer_tokens) = self.endpoints.get(endpoint) {
let mut peers = Vec::new();
for token in peer_tokens {
if let Some(peer_metadata) = self.peers.get(token) {
peers.push(peer_metadata.clone())
}
}
if peers.is_empty() {
None
} else {
Some(peers)
}
} else {
None
}
}
pub fn get_by_peer_id(&self, peer_id: &PeerTokenPair) -> Option<&PeerMetadata> {
self.peers.get(peer_id)
}
pub fn get_by_connection_id(&self, connection_id: &str) -> Option<&PeerMetadata> {
self.peers
.values()
.find(|meta| meta.connection_id == connection_id)
.or_else(|| {
if let Some(peer_id) = self.removed_connection_ids.get(connection_id) {
self.peers.get(peer_id)
} else {
None
}
})
}
pub fn get_pending(&self) -> impl Iterator<Item = (&PeerTokenPair, &PeerMetadata)> {
self.peers
.iter()
.filter(|(_id, peer_meta)| peer_meta.status == PeerStatus::Pending)
}
pub fn contains_endpoint(&self, endpoint: &str) -> bool {
self.endpoints.contains_key(endpoint)
}
}
#[cfg(test)]
pub mod tests {
    use super::*;

    /// Builds a `Trust` authorization token for `peer_id`.
    fn trust(peer_id: &str) -> PeerAuthorizationToken {
        PeerAuthorizationToken::Trust {
            peer_id: peer_id.to_string(),
        }
    }

    /// Builds the token pair for `peer_id` as seen by the local node `my_id`.
    fn pair(peer_id: &str) -> PeerTokenPair {
        PeerTokenPair::new(trust(peer_id), trust("my_id"))
    }

    /// Converts string slices into owned strings.
    fn strings(values: &[&str]) -> Vec<String> {
        values.iter().map(|value| value.to_string()).collect()
    }

    /// Inserts a peer for the local node `my_id` with no previously removed connection ids.
    fn add_peer(
        peer_map: &mut PeerMap,
        peer_id: &str,
        connection_id: &str,
        endpoints: &[&str],
        active_endpoint: &str,
        status: PeerStatus,
    ) {
        peer_map.insert(
            trust(peer_id),
            connection_id.to_string(),
            strings(endpoints),
            active_endpoint.to_string(),
            status,
            PeerAuthorizationToken::from_peer_id("my_id"),
            vec![],
        );
    }

    // Verify that `peer_ids` is empty for a new map and lists every inserted peer.
    #[test]
    fn test_get_peer_ids() {
        let mut peer_map = PeerMap::new(10);
        assert_eq!(peer_map.peer_ids(), Vec::<PeerAuthorizationToken>::new());

        add_peer(
            &mut peer_map,
            "test_peer",
            "connection_id_1",
            &["test_endpoint1", "test_endpoint2"],
            "test_endpoint2",
            PeerStatus::Connected,
        );
        add_peer(
            &mut peer_map,
            "next_peer",
            "connection_id_2",
            &["endpoint1", "endpoint2"],
            "next_endpoint1",
            PeerStatus::Connected,
        );

        // Map iteration order is unspecified, so sort before comparing.
        let mut ids = peer_map.peer_ids();
        ids.sort();
        assert_eq!(ids, vec![trust("next_peer"), trust("test_peer")]);
    }

    // Verify that `connection_ids` maps each peer's token pair to its connection id.
    #[test]
    fn test_get_connection_ids() {
        let mut peer_map = PeerMap::new(10);
        assert_eq!(peer_map.peer_ids(), Vec::<PeerAuthorizationToken>::new());

        add_peer(
            &mut peer_map,
            "test_peer",
            "connection_id_1",
            &["test_endpoint1", "test_endpoint2"],
            "test_endpoint2",
            PeerStatus::Connected,
        );
        add_peer(
            &mut peer_map,
            "next_peer",
            "connection_id_2",
            &["endpoint1", "endpoint2"],
            "next_endpoint1",
            PeerStatus::Connected,
        );

        let connections = peer_map.connection_ids();
        assert_eq!(
            connections.get_by_key(&pair("test_peer")),
            Some(&"connection_id_1".to_string())
        );
        assert_eq!(
            connections.get_by_key(&pair("next_peer")),
            Some(&"connection_id_2".to_string())
        );
    }

    // Verify that a peer can be found through any of its endpoints and that an unknown
    // endpoint yields `None`.
    #[test]
    fn test_get_peer_by_endpoint() {
        let mut peer_map = PeerMap::new(10);
        assert_eq!(peer_map.get_peer_from_endpoint("bad_endpoint"), None);

        add_peer(
            &mut peer_map,
            "test_peer",
            "connection_id",
            &["test_endpoint1", "test_endpoint2"],
            "test_endpoint2",
            PeerStatus::Pending,
        );

        let found = peer_map
            .get_peer_from_endpoint("test_endpoint1")
            .expect("missing expected peer_metadata");
        assert!(!found.is_empty());

        let first = &found[0];
        assert_eq!(first.id, trust("test_peer"));
        assert_eq!(
            first.endpoints,
            strings(&["test_endpoint1", "test_endpoint2"])
        );
        assert_eq!(first.active_endpoint, "test_endpoint2".to_string());
        assert_eq!(first.status, PeerStatus::Pending);

        // The second endpoint must resolve to the very same metadata.
        assert_eq!(
            Some(found),
            peer_map.get_peer_from_endpoint("test_endpoint2")
        );
    }

    // Verify that an inserted peer is stored under its token pair with the given values.
    #[test]
    fn test_insert_peer() {
        let mut peer_map = PeerMap::new(10);
        add_peer(
            &mut peer_map,
            "test_peer",
            "connection_id",
            &["test_endpoint1", "test_endpoint2"],
            "test_endpoint2",
            PeerStatus::Pending,
        );

        let key = pair("test_peer");
        assert!(peer_map.peers.contains_key(&key));

        let stored = peer_map.peers.get(&key).expect("Missing peer_metadata");
        assert_eq!(stored.id, trust("test_peer"));
        assert_eq!(
            stored.endpoints,
            strings(&["test_endpoint1", "test_endpoint2"])
        );
        assert_eq!(stored.active_endpoint, "test_endpoint2".to_string());
        assert_eq!(stored.status, PeerStatus::Pending);
    }

    // Verify that removing an unknown peer yields `None` and that removing a known peer
    // returns its metadata and deletes it from the map.
    #[test]
    fn test_remove_peer() {
        let mut peer_map = PeerMap::new(10);
        assert_eq!(peer_map.remove(&pair("next_peer")), None);

        add_peer(
            &mut peer_map,
            "test_peer",
            "connection_id",
            &["test_endpoint1", "test_endpoint2"],
            "test_endpoint2",
            PeerStatus::Pending,
        );

        let key = pair("test_peer");
        assert!(peer_map.peers.contains_key(&key));

        let removed = peer_map.remove(&key).expect("Missing peer_metadata");
        assert!(!peer_map.peers.contains_key(&key));
        assert_eq!(removed.active_endpoint, "test_endpoint2".to_string());
        assert_eq!(removed.id, trust("test_peer"));
    }

    // Verify that `update_peer` rejects an unknown peer and otherwise replaces the stored
    // metadata with the updated copy.
    #[test]
    fn test_get_update_active_endpoint() {
        let mut peer_map = PeerMap::new(10);

        let no_peer_metadata = PeerMetadata {
            id: trust("test_peer"),
            connection_id: "connection_id".to_string(),
            endpoints: strings(&["test_endpoint1", "test_endpoint2"]),
            active_endpoint: "test_endpoint1".to_string(),
            status: PeerStatus::Connected,
            last_connection_attempt: Instant::now(),
            retry_frequency: 10,
            required_local_auth: PeerAuthorizationToken::from_peer_id("my_id"),
        };
        assert!(
            peer_map.update_peer(no_peer_metadata).is_err(),
            "Should not have been able to update peer because test_peer does not exist"
        );

        add_peer(
            &mut peer_map,
            "test_peer",
            "connection_id",
            &["test_endpoint1", "test_endpoint2"],
            "test_endpoint2",
            PeerStatus::Connected,
        );

        let key = pair("test_peer");
        assert!(peer_map.peers.contains_key(&key));

        let mut updated = peer_map
            .get_by_connection_id("connection_id")
            .cloned()
            .expect("Unable to retrieve peer metadata with endpoint");
        updated.active_endpoint = "test_endpoint1".to_string();
        updated.endpoints.push("new_endpoint".to_string());
        updated.status = PeerStatus::Disconnected { retry_attempts: 5 };

        peer_map
            .update_peer(updated)
            .expect("Unable to update endpoint");

        let stored = peer_map.peers.get(&key).expect("Missing peer_metadata");
        assert_eq!(stored.id, trust("test_peer"));
        assert_eq!(
            stored.endpoints,
            strings(&["test_endpoint1", "test_endpoint2", "new_endpoint"])
        );
        assert_eq!(stored.active_endpoint, "test_endpoint1".to_string());
        assert_eq!(
            stored.status,
            PeerStatus::Disconnected { retry_attempts: 5 }
        );
    }
}