use std::collections::HashMap;
use super::error::PeerUpdateError;
/// Connection state recorded for a peer.
///
/// `Eq` is derived alongside `PartialEq` because every field is totally
/// comparable; this lets the status be used wherever full equality is required.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum PeerStatus {
    /// The peer is connected. Newly inserted peers start in this state.
    Connected,
    /// The peer is not connected.
    Disconnected {
        /// Presumably the number of reconnection attempts made so far; confirm
        /// against the code that updates this value.
        retry_attempts: u64,
    },
}
/// The record kept for a single peer.
#[derive(Debug, Clone, PartialEq)]
pub struct PeerMetadata {
    /// The peer's ID; `PeerMap` stores the record under this key.
    pub id: String,
    /// ID of the connection associated with the peer (semantics are defined by
    /// the caller; this module only stores the value).
    pub connection_id: String,
    /// Every endpoint recorded for the peer.
    pub endpoints: Vec<String>,
    /// The endpoint currently recorded as active for the peer.
    pub active_endpoint: String,
    /// The peer's connection status.
    pub status: PeerStatus,
}
/// An in-memory collection of peers that can be looked up by peer ID or by
/// any of a peer's endpoints.
#[derive(Debug)]
pub struct PeerMap {
    /// Metadata for every known peer, keyed by peer ID.
    peers: HashMap<String, PeerMetadata>,
    /// Former peer IDs mapped to the peer ID that replaced them.
    redirects: HashMap<String, String>,
    /// Index from an endpoint to the ID of the peer that owns it.
    endpoints: HashMap<String, String>,
}
impl PeerMap {
    /// Creates an empty `PeerMap`.
    pub fn new() -> Self {
        PeerMap {
            peers: HashMap::new(),
            redirects: HashMap::new(),
            endpoints: HashMap::new(),
        }
    }

    /// Returns the IDs of all peers currently in the map.
    ///
    /// The IDs come straight out of a `HashMap`, so their order is unspecified.
    pub fn peer_ids(&self) -> Vec<String> {
        self.peers
            .values()
            .map(|metadata| metadata.id.clone())
            .collect()
    }

    /// Inserts a peer in the `Connected` state and indexes all of its endpoints.
    ///
    /// If a peer with the same ID already exists it is replaced, and endpoints
    /// that only the replaced record listed are dropped from the endpoint index.
    ///
    /// # Arguments
    ///
    /// * `peer_id` - ID of the peer
    /// * `connection_id` - ID of the connection associated with the peer
    /// * `endpoints` - every endpoint the peer can be looked up by
    /// * `active_endpoint` - the endpoint recorded as currently active
    pub fn insert(
        &mut self,
        peer_id: String,
        connection_id: String,
        endpoints: Vec<String>,
        active_endpoint: String,
    ) {
        let peer_metadata = PeerMetadata {
            id: peer_id.clone(),
            connection_id,
            endpoints: endpoints.clone(),
            active_endpoint,
            status: PeerStatus::Connected,
        };

        // Un-index the replaced record first so that endpoints shared with the
        // new record are re-added by the loop below.
        if let Some(replaced) = self.peers.insert(peer_id.clone(), peer_metadata) {
            self.unindex_endpoints(&peer_id, &replaced.endpoints);
        }

        for endpoint in endpoints {
            self.endpoints.insert(endpoint, peer_id.clone());
        }
    }

    /// Removes a peer, its endpoint index entries, and every redirect that
    /// targets it.
    ///
    /// Redirects targeting `peer_id` are dropped even when no such peer exists.
    ///
    /// Returns the removed peer's active endpoint, or `None` if no peer with
    /// that ID was in the map.
    pub fn remove(&mut self, peer_id: &str) -> Option<String> {
        self.redirects
            .retain(|_, target_peer_id| target_peer_id.as_str() != peer_id);

        let removed = self.peers.remove(peer_id)?;
        self.unindex_endpoints(peer_id, &removed.endpoints);

        Some(removed.active_endpoint)
    }

    /// Re-keys a peer from `old_peer_id` to `new_peer_id`.
    ///
    /// The peer's endpoints are re-pointed at the new ID, redirects that
    /// targeted the old ID are updated to target the new one, and a redirect
    /// from the old ID to the new ID is recorded. If another peer was already
    /// stored under `new_peer_id` it is replaced and the endpoints only it
    /// listed are dropped from the endpoint index.
    ///
    /// # Errors
    ///
    /// Returns a `PeerUpdateError` if no peer is stored under `old_peer_id`.
    pub fn update_peer_id(
        &mut self,
        old_peer_id: String,
        new_peer_id: String,
    ) -> Result<(), PeerUpdateError> {
        let mut peer_metadata = match self.peers.remove(&old_peer_id) {
            Some(peer_metadata) => peer_metadata,
            None => {
                return Err(PeerUpdateError(format!(
                    "Unable to update {} to {}",
                    old_peer_id, new_peer_id
                )));
            }
        };

        peer_metadata.id = new_peer_id.clone();
        let moved_endpoints = peer_metadata.endpoints.clone();

        // Un-index a replaced record before indexing the moved endpoints so
        // that endpoints common to both end up pointing at the new ID.
        if let Some(replaced) = self.peers.insert(new_peer_id.clone(), peer_metadata) {
            self.unindex_endpoints(&new_peer_id, &replaced.endpoints);
        }
        for endpoint in moved_endpoints {
            self.endpoints.insert(endpoint, new_peer_id.clone());
        }

        // Keep redirect chains flat: anything that led to the old ID now leads
        // directly to the new one.
        for target_peer_id in self.redirects.values_mut() {
            if *target_peer_id == old_peer_id {
                *target_peer_id = new_peer_id.clone();
            }
        }
        self.redirects.insert(old_peer_id, new_peer_id);

        Ok(())
    }

    /// Replaces the stored metadata of an existing peer.
    ///
    /// The endpoint index is brought in line with the new endpoint list:
    /// endpoints the peer no longer lists stop resolving to it, and every
    /// endpoint in `peer_metadata.endpoints` resolves to it.
    ///
    /// # Errors
    ///
    /// Returns a `PeerUpdateError` if no peer is stored under
    /// `peer_metadata.id`.
    pub fn update_peer(&mut self, peer_metadata: PeerMetadata) -> Result<(), PeerUpdateError> {
        let existing = match self.peers.get_mut(&peer_metadata.id) {
            Some(existing) => existing,
            None => {
                return Err(PeerUpdateError(format!(
                    "Unable to update peer {}, does not exist",
                    peer_metadata.id
                )));
            }
        };

        let peer_id = peer_metadata.id.clone();
        let new_endpoints = peer_metadata.endpoints.clone();
        // Swap the record in place and keep the old one to learn which
        // endpoints have to leave the index.
        let previous = std::mem::replace(existing, peer_metadata);

        self.unindex_endpoints(&peer_id, &previous.endpoints);
        for endpoint in new_endpoints {
            self.endpoints.insert(endpoint, peer_id.clone());
        }

        Ok(())
    }

    /// Looks up the peer that owns `endpoint`.
    ///
    /// Returns `None` if the endpoint is not indexed or the indexed peer ID is
    /// not in the map.
    pub fn get_peer_from_endpoint(&self, endpoint: &str) -> Option<&PeerMetadata> {
        self.endpoints
            .get(endpoint)
            .and_then(|peer_id| self.peers.get(peer_id))
    }

    /// Drops `endpoints` from the endpoint index, touching only the entries
    /// that still point at `peer_id` so another peer's claim on the same
    /// endpoint is left intact.
    fn unindex_endpoints(&mut self, peer_id: &str, endpoints: &[String]) {
        for endpoint in endpoints {
            let owned_by_peer =
                self.endpoints.get(endpoint.as_str()).map(String::as_str) == Some(peer_id);
            if owned_by_peer {
                self.endpoints.remove(endpoint.as_str());
            }
        }
    }
}

impl Default for PeerMap {
    /// Equivalent to [`PeerMap::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
pub mod tests {
    use super::*;

    /// Endpoint list used by the `test_peer` fixture.
    fn test_peer_endpoints() -> Vec<String> {
        vec![
            String::from("test_endpoint1"),
            String::from("test_endpoint2"),
        ]
    }

    /// Inserts the `test_peer` fixture with `test_endpoint2` as its active
    /// endpoint.
    fn insert_test_peer(peer_map: &mut PeerMap, connection_id: &str) {
        peer_map.insert(
            String::from("test_peer"),
            String::from(connection_id),
            test_peer_endpoints(),
            String::from("test_endpoint2"),
        );
    }

    /// Metadata expected right after `insert_test_peer(_, "connection_id")`.
    fn connected_test_peer() -> PeerMetadata {
        PeerMetadata {
            id: String::from("test_peer"),
            connection_id: String::from("connection_id"),
            endpoints: test_peer_endpoints(),
            active_endpoint: String::from("test_endpoint2"),
            status: PeerStatus::Connected,
        }
    }

    /// Peer IDs in sorted order, since the map does not promise any ordering.
    fn sorted_peer_ids(peer_map: &PeerMap) -> Vec<String> {
        let mut ids = peer_map.peer_ids();
        ids.sort();
        ids
    }

    // The listed peer IDs follow insertions and a peer ID update.
    #[test]
    fn test_get_peer_ids() {
        let mut peer_map = PeerMap::new();
        assert_eq!(peer_map.peer_ids(), Vec::<String>::new());

        insert_test_peer(&mut peer_map, "connection_id_1");
        peer_map.insert(
            String::from("next_peer"),
            String::from("connection_id_2"),
            vec![String::from("endpoint1"), String::from("endpoint2")],
            String::from("next_endpoint1"),
        );
        assert_eq!(
            sorted_peer_ids(&peer_map),
            vec![String::from("next_peer"), String::from("test_peer")]
        );

        peer_map
            .update_peer_id(String::from("test_peer"), String::from("new_peer"))
            .expect("Unable to update peer id");
        assert_eq!(
            sorted_peer_ids(&peer_map),
            vec![String::from("new_peer"), String::from("next_peer")]
        );
    }

    // A peer is reachable through each of its endpoints, and an unknown
    // endpoint yields nothing.
    #[test]
    fn test_get_peer_by_endpoint() {
        let mut peer_map = PeerMap::new();
        assert_eq!(peer_map.get_peer_from_endpoint("bad_endpoint"), None);

        insert_test_peer(&mut peer_map, "connection_id");

        let expected = connected_test_peer();
        for endpoint in ["test_endpoint1", "test_endpoint2"].iter() {
            assert_eq!(peer_map.get_peer_from_endpoint(endpoint), Some(&expected));
        }
    }

    // An inserted peer is stored under its ID with the metadata it was given.
    #[test]
    fn test_insert_peer() {
        let mut peer_map = PeerMap::new();
        insert_test_peer(&mut peer_map, "connection_id");

        assert!(peer_map.peers.contains_key("test_peer"));
        assert_eq!(
            peer_map.peers.get("test_peer"),
            Some(&connected_test_peer())
        );
    }

    // Removing an unknown peer yields nothing; removing a known peer yields
    // its active endpoint and drops it from the map.
    #[test]
    fn test_remove_peer() {
        let mut peer_map = PeerMap::new();
        assert_eq!(peer_map.remove("test_peer"), None);

        insert_test_peer(&mut peer_map, "connection_id");
        assert!(peer_map.peers.contains_key("test_peer"));

        let active_endpoint = peer_map.remove("test_peer");
        assert!(!peer_map.peers.contains_key("test_peer"));
        assert_eq!(active_endpoint, Some(String::from("test_endpoint2")));
    }

    // Updating the ID of an unknown peer fails; updating a known peer re-keys
    // it and records a redirect from the old ID.
    #[test]
    fn test_update_peer_id() {
        let mut peer_map = PeerMap::new();

        let missing_peer_update =
            peer_map.update_peer_id(String::from("test_peer"), String::from("new_peer"));
        if missing_peer_update.is_ok() {
            panic!("Should not be able to update peer because old peer does not exist")
        }

        insert_test_peer(&mut peer_map, "connection_id");
        assert!(peer_map.peers.contains_key("test_peer"));

        peer_map
            .update_peer_id(String::from("test_peer"), String::from("new_peer"))
            .expect("Unable to update peer id");

        assert!(peer_map.peers.contains_key("new_peer"));
        assert!(peer_map.redirects.contains_key("test_peer"));
    }

    // Updating an unknown peer fails; updating a known peer replaces its
    // stored metadata.
    #[test]
    fn test_get_update_active_endpoint() {
        let mut peer_map = PeerMap::new();

        let no_peer_metadata = PeerMetadata {
            active_endpoint: String::from("test_endpoint1"),
            ..connected_test_peer()
        };
        if peer_map.update_peer(no_peer_metadata).is_ok() {
            panic!("Should not have been able to update peer because test_peer does not exist")
        }

        insert_test_peer(&mut peer_map, "connection_id");
        assert!(peer_map.peers.contains_key("test_peer"));

        let mut updated = peer_map
            .get_peer_from_endpoint("test_endpoint2")
            .cloned()
            .expect("Unable to retrieve peer metadata with endpoint");
        updated.active_endpoint = String::from("test_endpoint1");
        updated.endpoints.push(String::from("new_endpoint"));
        updated.status = PeerStatus::Disconnected { retry_attempts: 5 };

        peer_map
            .update_peer(updated)
            .expect("Unable to update endpoint");

        let expected = PeerMetadata {
            id: String::from("test_peer"),
            connection_id: String::from("connection_id"),
            endpoints: vec![
                String::from("test_endpoint1"),
                String::from("test_endpoint2"),
                String::from("new_endpoint"),
            ],
            active_endpoint: String::from("test_endpoint1"),
            status: PeerStatus::Disconnected { retry_attempts: 5 },
        };
        assert_eq!(peer_map.peers.get("test_peer"), Some(&expected));
    }
}