use std::{collections::HashMap, fmt, time::Duration};
use nom::lib::std::collections::hash_map::Entry;
use crate::{peer_manager::NodeId, Minimized, PeerConnection};
/// Status tracked for a peer's entry in the `ConnectionPool`.
///
/// The `Display` implementation for this type prints the same text as `Debug`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionStatus {
/// Reported by `ConnectionPool::get_connection_status` and `ConnectionPool::set_status` when the pool has no
/// entry for the requested node id.
NotConnected,
// NOTE(review): never assigned in this file; presumably set by callers through `ConnectionPool::set_status`
// while a dial is in progress - confirm.
Connecting,
/// The only variant for which `ConnectionStatus::is_connected` returns `true`.
Connected,
// NOTE(review): never assigned in this file; presumably set by callers through `ConnectionPool::set_status`
// between connection attempts - confirm.
Retrying,
/// Entries with this status are counted by `ConnectionPool::count_failed`.
Failed,
/// Entries with this status are counted by `ConnectionPool::count_disconnected`, whatever the `Minimized`
/// payload is. `ConnectionPool::insert_connection` assigns `Disconnected(Minimized::No)` when an existing
/// entry receives a connection that does not report itself as connected.
Disconnected(Minimized),
}
impl ConnectionStatus {
    /// Returns `true` only when the status is [`ConnectionStatus::Connected`].
    pub fn is_connected(self) -> bool {
        // `PartialEq` is derived, so a plain comparison selects exactly the `Connected` variant.
        self == ConnectionStatus::Connected
    }
}
impl fmt::Display for ConnectionStatus {
    /// Writes the status using its `Debug` representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // A fresh `format_args!` is used on purpose: flags given to the outer formatter (width, `#`, ...)
        // are not forwarded to the `Debug` rendering.
        f.write_fmt(format_args!("{:?}", self))
    }
}
/// A single peer's entry in the `ConnectionPool`: the peer's node id, the connection stored for it (if any) and
/// the status tracked for it.
#[derive(Debug, Clone)]
pub struct PeerConnectionState {
// Identity of the peer this entry belongs to; copied from the connection when the entry is created.
node_id: NodeId,
// The stored connection, if any. Replaced wholesale by `set_connection`.
connection: Option<PeerConnection>,
// Tracked independently of `connection`; `is_connected` requires both to agree.
status: ConnectionStatus,
}
impl PeerConnectionState {
    /// Borrows the stored connection, if there is one.
    pub fn connection(&self) -> Option<&PeerConnection> {
        self.connection.as_ref()
    }

    /// Returns `true` only when the tracked status is `Connected` *and* a connection is stored that itself
    /// reports being connected.
    pub fn is_connected(&self) -> bool {
        if !self.status.is_connected() {
            return false;
        }
        matches!(&self.connection, Some(conn) if conn.is_connected())
    }

    /// Mutably borrows the stored connection, if there is one.
    pub fn connection_mut(&mut self) -> Option<&mut PeerConnection> {
        self.connection.as_mut()
    }

    /// Consumes the state and hands back the stored connection, if there is one.
    pub fn into_connection(self) -> Option<PeerConnection> {
        let Self { connection, .. } = self;
        connection
    }

    /// Returns the tracked status (the enum is `Copy`, so it is returned by value).
    pub fn status(&self) -> ConnectionStatus {
        self.status
    }

    /// Returns the node id of the peer this state belongs to.
    pub fn node_id(&self) -> &NodeId {
        &self.node_id
    }

    /// Builds a state for `conn` with status `Connected`, taking the node id from the connection itself.
    fn connected(conn: PeerConnection) -> Self {
        // The id must be cloned before `conn` is moved into the struct.
        let node_id = conn.peer_node_id().clone();
        Self {
            node_id,
            connection: Some(conn),
            status: ConnectionStatus::Connected,
        }
    }

    /// Replaces whatever connection was stored with `conn`. The status is left untouched.
    fn set_connection(&mut self, conn: PeerConnection) {
        self.connection = Some(conn);
    }
}
impl fmt::Display for PeerConnectionState {
    /// Writes `<connection or node id>, Status = <status>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Prefer the connection's own rendering; fall back to the bare node id when none is stored.
        let subject = match self.connection() {
            Some(conn) => conn.to_string(),
            None => self.node_id.to_string(),
        };
        write!(f, "{}, Status = {}", subject, self.status())
    }
}
/// A collection of `PeerConnectionState` entries keyed by the peer's `NodeId`.
///
/// The derived `Default` yields an empty pool.
#[derive(Debug, Clone, Default)]
pub struct ConnectionPool {
// At most one entry per node id; inserting a connection for a known node id updates that entry in place.
connections: HashMap<NodeId, PeerConnectionState>,
}
impl ConnectionPool {
    /// Creates an empty pool.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns `true` if the pool holds an entry for `node_id`, whatever that entry's status is.
    ///
    /// This is a read-only lookup, so a shared borrow is sufficient.
    pub fn contains(&self, node_id: &NodeId) -> bool {
        self.connections.contains_key(node_id)
    }

    /// Stores `conn` under its peer's node id and returns the status recorded for the entry.
    ///
    /// * If an entry already exists, its connection is replaced and its status becomes `Connected` when
    ///   `conn.is_connected()` holds, otherwise `Disconnected(Minimized::No)`.
    /// * If no entry exists, a new one is created with status `Connected`.
    pub fn insert_connection(&mut self, conn: PeerConnection) -> ConnectionStatus {
        match self.connections.entry(conn.peer_node_id().clone()) {
            Entry::Occupied(mut entry) => {
                let state = entry.get_mut();
                state.status = if conn.is_connected() {
                    ConnectionStatus::Connected
                } else {
                    ConnectionStatus::Disconnected(Minimized::No)
                };
                state.set_connection(conn);
                state.status
            },
            // NOTE(review): unlike the occupied branch, this path does not consult `conn.is_connected()` and
            // always records `Connected` - confirm that callers only insert live connections for new peers.
            Entry::Vacant(entry) => entry.insert(PeerConnectionState::connected(conn)).status,
        }
    }

    /// Returns the entry for `node_id`, if any.
    #[inline]
    pub fn get(&self, node_id: &NodeId) -> Option<&PeerConnectionState> {
        self.connections.get(node_id)
    }

    /// Returns the entry for `node_id` mutably, if any.
    #[inline]
    pub fn get_mut(&mut self, node_id: &NodeId) -> Option<&mut PeerConnectionState> {
        self.connections.get_mut(node_id)
    }

    /// Returns references to every entry in the pool, in the map's (unspecified) iteration order.
    pub fn all(&self) -> Vec<&PeerConnectionState> {
        self.connections.values().collect()
    }

    /// Returns the connection stored for `node_id`. `None` means either no entry or an entry without a
    /// connection.
    pub fn get_connection(&self, node_id: &NodeId) -> Option<&PeerConnection> {
        self.get(node_id).and_then(|state| state.connection())
    }

    /// Mutable counterpart of [`ConnectionPool::get_connection`].
    pub fn get_connection_mut(&mut self, node_id: &NodeId) -> Option<&mut PeerConnection> {
        self.get_mut(node_id).and_then(|state| state.connection_mut())
    }

    /// Returns the status recorded for `node_id`, or `NotConnected` when the pool has no entry for it.
    pub fn get_connection_status(&self, node_id: &NodeId) -> ConnectionStatus {
        self.get(node_id)
            .map_or(ConnectionStatus::NotConnected, |state| state.status())
    }

    /// Returns mutable references to the stored connections whose `age()` exceeds `min_age`, whose
    /// `handle_count()` is at most 1 and whose `substream_count()` is greater than 2.
    // NOTE(review): despite the name, the connection's direction is not checked here, and the substream
    // condition selects connections with *more* than two substreams - confirm both are intended.
    pub fn get_inactive_outbound_connections_mut(&mut self, min_age: Duration) -> Vec<&mut PeerConnection> {
        self.filter_connections_mut(|conn| {
            conn.age() > min_age && conn.handle_count() <= 1 && conn.substream_count() > 2
        })
    }

    /// Removes every entry for which `predicate` returns `true` and returns the removed entries.
    ///
    /// `predicate` is called exactly once per entry. Entries that do not match stay in the map untouched:
    /// the map is not drained and rebuilt, so retained entries are not re-hashed, and the pool is left
    /// unmodified should `predicate` panic part-way through.
    pub(in crate::connectivity) fn filter_drain<P>(&mut self, mut predicate: P) -> Vec<PeerConnectionState>
    where P: FnMut(&PeerConnectionState) -> bool {
        // Pass 1: decide which entries go. Keys are cloned so the map can be mutated afterwards.
        let matching_ids = self
            .connections
            .iter()
            .filter(|(_, state)| predicate(state))
            .map(|(node_id, _)| node_id.clone())
            .collect::<Vec<_>>();

        // Pass 2: take the selected entries out of the map.
        matching_ids
            .iter()
            .filter_map(|node_id| self.connections.remove(node_id))
            .collect()
    }

    /// Returns the connections of all entries for which `predicate` returns `true`. Matching entries that
    /// hold no connection are skipped.
    pub(in crate::connectivity) fn filter_connection_states<P>(&self, mut predicate: P) -> Vec<&PeerConnection>
    where P: FnMut(&PeerConnectionState) -> bool {
        self.connections
            .values()
            .filter(|state| (predicate)(state))
            .filter_map(|state| state.connection())
            .collect()
    }

    /// Returns mutable references to every stored connection for which `predicate` returns `true`. Entries
    /// without a connection are never passed to `predicate`.
    fn filter_connections_mut<P>(&mut self, mut predicate: P) -> Vec<&mut PeerConnection>
    where P: FnMut(&PeerConnection) -> bool {
        self.connections
            .values_mut()
            .filter_map(|state| state.connection_mut())
            .filter(|conn| (predicate)(conn))
            .collect()
    }

    /// Sets the status of the entry for `node_id` and returns the status it had before.
    ///
    /// When the pool has no entry for `node_id`, nothing is changed and `NotConnected` is returned.
    pub fn set_status(&mut self, node_id: &NodeId, status: ConnectionStatus) -> ConnectionStatus {
        match self.connections.get_mut(node_id) {
            Some(state) => std::mem::replace(&mut state.status, status),
            None => ConnectionStatus::NotConnected,
        }
    }

    /// Removes the entry for `node_id` and returns the connection it held, if any.
    pub fn remove(&mut self, node_id: &NodeId) -> Option<PeerConnection> {
        self.connections
            .remove(node_id)
            .and_then(PeerConnectionState::into_connection)
    }

    /// Counts entries with status `Connected` whose connection reports being connected and whose peer
    /// features satisfy `is_node()`.
    pub fn count_connected_nodes(&self) -> usize {
        self.count_filtered(|state| {
            state.status().is_connected() &&
                state
                    .connection()
                    .is_some_and(|conn| conn.is_connected() && conn.peer_features().is_node())
        })
    }

    /// Counts entries with status `Connected` whose connection reports being connected and whose peer
    /// features satisfy `is_client()`.
    pub fn count_connected_clients(&self) -> usize {
        self.count_filtered(|state| {
            state.status().is_connected() &&
                state
                    .connection()
                    .is_some_and(|conn| conn.is_connected() && conn.peer_features().is_client())
        })
    }

    /// Counts entries for which [`PeerConnectionState::is_connected`] holds.
    pub fn count_connected(&self) -> usize {
        self.count_filtered(|state| state.is_connected())
    }

    /// Counts entries whose status is `Failed`.
    pub fn count_failed(&self) -> usize {
        self.count_filtered(|state| state.status() == ConnectionStatus::Failed)
    }

    /// Counts entries whose status is `Disconnected`, regardless of the `Minimized` payload.
    pub fn count_disconnected(&self) -> usize {
        self.count_filtered(|state| matches!(state.status(), ConnectionStatus::Disconnected(_)))
    }

    /// Returns the total number of entries in the pool, whatever their status.
    pub fn count_entries(&self) -> usize {
        self.connections.len()
    }

    /// Returns the node ids of all entries that would be counted by
    /// [`ConnectionPool::count_connected_nodes`] (clients are not included).
    pub fn get_connected_node_ids(&self) -> Vec<NodeId> {
        self.connections
            .iter()
            .filter(|(_, state)| {
                state.status().is_connected() &&
                    state
                        .connection()
                        .is_some_and(|conn| conn.is_connected() && conn.peer_features().is_node())
            })
            .map(|(node_id, _)| node_id.clone())
            .collect()
    }

    /// Counts the entries for which `predicate` returns `true`.
    pub(in crate::connectivity) fn count_filtered<P>(&self, mut predicate: P) -> usize
    where P: FnMut(&PeerConnectionState) -> bool {
        self.connections.values().filter(|state| (predicate)(state)).count()
    }
}