use std::collections::HashMap;
use std::time::Instant;
use crate::collections::BiHashMap;
use super::error::PeerUpdateError;
#[derive(Clone, PartialEq, Debug)]
pub enum PeerStatus {
Connected,
Pending,
Disconnected { retry_attempts: u64 },
}
#[derive(Clone, PartialEq, Debug)]
pub struct PeerMetadata {
pub id: String,
pub connection_id: String,
pub endpoints: Vec<String>,
pub active_endpoint: String,
pub status: PeerStatus,
pub last_connection_attempt: Instant,
pub retry_frequency: u64,
}
pub struct PeerMap {
peers: HashMap<String, PeerMetadata>,
endpoints: HashMap<String, String>,
initial_retry_frequency: u64,
}
impl PeerMap {
pub fn new(initial_retry_frequency: u64) -> Self {
PeerMap {
peers: HashMap::new(),
endpoints: HashMap::new(),
initial_retry_frequency,
}
}
pub fn peer_ids(&self) -> Vec<String> {
self.peers
.iter()
.map(|(_, metadata)| metadata.id.to_string())
.collect()
}
pub fn connection_ids(&self) -> BiHashMap<String, String> {
let mut peer_to_connection_id = BiHashMap::new();
for (peer, metadata) in self.peers.iter() {
peer_to_connection_id.insert(peer.to_string(), metadata.connection_id.to_string());
}
peer_to_connection_id
}
pub fn insert(
&mut self,
peer_id: String,
connection_id: String,
endpoints: Vec<String>,
active_endpoint: String,
status: PeerStatus,
) {
let peer_metadata = PeerMetadata {
id: peer_id.clone(),
endpoints: endpoints.clone(),
active_endpoint,
status,
connection_id,
last_connection_attempt: Instant::now(),
retry_frequency: self.initial_retry_frequency,
};
self.peers.insert(peer_id.clone(), peer_metadata);
for endpoint in endpoints {
self.endpoints.insert(endpoint, peer_id.clone());
}
}
pub fn remove(&mut self, peer_id: &str) -> Option<PeerMetadata> {
if let Some(peer_metadata) = self.peers.remove(&peer_id.to_string()) {
for endpoint in peer_metadata.endpoints.iter() {
self.endpoints.remove(endpoint);
}
Some(peer_metadata)
} else {
None
}
}
pub fn update_peer(&mut self, peer_metadata: PeerMetadata) -> Result<(), PeerUpdateError> {
if self.peers.contains_key(&peer_metadata.id) {
for endpoint in peer_metadata.endpoints.iter() {
self.endpoints
.insert(endpoint.to_string(), peer_metadata.id.clone());
}
self.peers
.insert(peer_metadata.id.to_string(), peer_metadata);
Ok(())
} else {
Err(PeerUpdateError(format!(
"Unable to update peer {}, does not exist",
peer_metadata.id
)))
}
}
pub fn get_peer_from_endpoint(&self, endpoint: &str) -> Option<&PeerMetadata> {
if let Some(peer) = self.endpoints.get(endpoint) {
self.peers.get(peer)
} else {
None
}
}
pub fn get_by_peer_id(&self, peer_id: &str) -> Option<&PeerMetadata> {
self.peers.get(peer_id)
}
pub fn get_by_connection_id(&self, connection_id: &str) -> Option<&PeerMetadata> {
self.peers
.values()
.find(|meta| meta.connection_id == connection_id)
}
pub fn get_pending(&self) -> impl Iterator<Item = (&String, &PeerMetadata)> {
self.peers
.iter()
.filter(|(_id, peer_meta)| peer_meta.status == PeerStatus::Pending)
}
pub fn contains_endpoint(&self, endpoint: &str) -> bool {
self.endpoints.contains_key(endpoint)
}
}
#[cfg(test)]
pub mod tests {
use super::*;
#[test]
fn test_get_peer_ids() {
let mut peer_map = PeerMap::new(10);
let peers = peer_map.peer_ids();
assert_eq!(peers, Vec::<String>::new());
peer_map.insert(
"test_peer".to_string(),
"connection_id_1".to_string(),
vec!["test_endpoint1".to_string(), "test_endpoint2".to_string()],
"test_endpoint2".to_string(),
PeerStatus::Connected,
);
peer_map.insert(
"next_peer".to_string(),
"connection_id_2".to_string(),
vec!["endpoint1".to_string(), "endpoint2".to_string()],
"next_endpoint1".to_string(),
PeerStatus::Connected,
);
let mut peers = peer_map.peer_ids();
peers.sort();
assert_eq!(
peers,
vec!["next_peer".to_string(), "test_peer".to_string()]
);
}
#[test]
fn test_get_connection_ids() {
let mut peer_map = PeerMap::new(10);
let peers = peer_map.peer_ids();
assert_eq!(peers, Vec::<String>::new());
peer_map.insert(
"test_peer".to_string(),
"connection_id_1".to_string(),
vec!["test_endpoint1".to_string(), "test_endpoint2".to_string()],
"test_endpoint2".to_string(),
PeerStatus::Connected,
);
peer_map.insert(
"next_peer".to_string(),
"connection_id_2".to_string(),
vec!["endpoint1".to_string(), "endpoint2".to_string()],
"next_endpoint1".to_string(),
PeerStatus::Connected,
);
let peers = peer_map.connection_ids();
assert_eq!(
peers.get_by_key("test_peer"),
Some(&"connection_id_1".to_string())
);
assert_eq!(
peers.get_by_key("next_peer"),
Some(&"connection_id_2".to_string())
);
}
#[test]
fn test_get_peer_by_endpoint() {
let mut peer_map = PeerMap::new(10);
let peer_metadata = peer_map.get_peer_from_endpoint("bad_endpoint");
assert_eq!(peer_metadata, None);
peer_map.insert(
"test_peer".to_string(),
"connection_id".to_string(),
vec!["test_endpoint1".to_string(), "test_endpoint2".to_string()],
"test_endpoint2".to_string(),
PeerStatus::Pending,
);
let peer_metadata = peer_map
.get_peer_from_endpoint("test_endpoint1")
.expect("missing expected peer_metadata");
assert_eq!(peer_metadata.id, "test_peer".to_string());
assert_eq!(
peer_metadata.endpoints,
vec!["test_endpoint1".to_string(), "test_endpoint2".to_string()]
);
assert_eq!(peer_metadata.active_endpoint, "test_endpoint2".to_string());
assert_eq!(peer_metadata.status, PeerStatus::Pending);
assert_eq!(
Some(peer_metadata),
peer_map.get_peer_from_endpoint("test_endpoint2")
);
}
#[test]
fn test_insert_peer() {
let mut peer_map = PeerMap::new(10);
peer_map.insert(
"test_peer".to_string(),
"connection_id".to_string(),
vec!["test_endpoint1".to_string(), "test_endpoint2".to_string()],
"test_endpoint2".to_string(),
PeerStatus::Pending,
);
assert!(peer_map.peers.contains_key("test_peer"));
let peer_metadata = peer_map
.peers
.get("test_peer")
.expect("Missing peer_metadata");
assert_eq!(peer_metadata.id, "test_peer".to_string());
assert_eq!(
peer_metadata.endpoints,
vec!["test_endpoint1".to_string(), "test_endpoint2".to_string()]
);
assert_eq!(peer_metadata.active_endpoint, "test_endpoint2".to_string());
assert_eq!(peer_metadata.status, PeerStatus::Pending);
}
#[test]
fn test_remove_peer() {
let mut peer_map = PeerMap::new(10);
let peer_metdata = peer_map.remove("test_peer");
assert_eq!(peer_metdata, None);
peer_map.insert(
"test_peer".to_string(),
"connection_id".to_string(),
vec!["test_endpoint1".to_string(), "test_endpoint2".to_string()],
"test_endpoint2".to_string(),
PeerStatus::Pending,
);
assert!(peer_map.peers.contains_key("test_peer"));
let peer_metadata = peer_map.remove("test_peer").expect("Missing peer_metadata");
assert!(!peer_map.peers.contains_key("test_peer"));
assert_eq!(peer_metadata.active_endpoint, "test_endpoint2".to_string());
assert_eq!(peer_metadata.id, "test_peer".to_string());
}
#[test]
fn test_get_update_active_endpoint() {
let mut peer_map = PeerMap::new(10);
let no_peer_metadata = PeerMetadata {
id: "test_peer".to_string(),
connection_id: "connection_id".to_string(),
endpoints: vec!["test_endpoint1".to_string(), "test_endpoint2".to_string()],
active_endpoint: "test_endpoint1".to_string(),
status: PeerStatus::Connected,
last_connection_attempt: Instant::now(),
retry_frequency: 10,
};
if let Ok(()) = peer_map.update_peer(no_peer_metadata) {
panic!("Should not have been able to update peer because test_peer does not exist")
}
peer_map.insert(
"test_peer".to_string(),
"connection_id".to_string(),
vec!["test_endpoint1".to_string(), "test_endpoint2".to_string()],
"test_endpoint2".to_string(),
PeerStatus::Connected,
);
assert!(peer_map.peers.contains_key("test_peer"));
let mut peer_metadata = peer_map
.get_peer_from_endpoint("test_endpoint2")
.cloned()
.expect("Unable to retrieve peer metadata with endpoint");
peer_metadata.active_endpoint = "test_endpoint1".to_string();
peer_metadata.endpoints.push("new_endpoint".to_string());
peer_metadata.status = PeerStatus::Disconnected { retry_attempts: 5 };
peer_map
.update_peer(peer_metadata)
.expect("Unable to update endpoint");
let peer_metadata = peer_map
.peers
.get("test_peer")
.expect("Missing peer_metadata");
assert_eq!(peer_metadata.id, "test_peer".to_string());
assert_eq!(
peer_metadata.endpoints,
vec![
"test_endpoint1".to_string(),
"test_endpoint2".to_string(),
"new_endpoint".to_string()
]
);
assert_eq!(peer_metadata.active_endpoint, "test_endpoint1".to_string());
assert_eq!(
peer_metadata.status,
PeerStatus::Disconnected { retry_attempts: 5 }
);
}
}