use std::{
cmp::Ordering,
fmt::{self, Display, Formatter},
future::Future,
hash::{Hash, Hasher},
net::SocketAddr,
sync::Arc,
};
use tokio::sync::{RwLock, RwLockReadGuard};
use xor_name::{XorName, XOR_NAME_LEN};
#[derive(Clone)]
pub struct Peer {
name: XorName,
addr: SocketAddr,
connection: Arc<RwLock<Option<qp2p::Connection>>>,
}
impl fmt::Debug for Peer {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let guard = self.connection.try_read();
let connection; f.debug_struct("Peer")
.field("name", &self.name)
.field("addr", &self.addr)
.field(
"connection",
match &guard {
Ok(guard) => {
connection = guard.as_ref();
&connection
}
Err(_) => &"<locked>",
},
)
.finish()
}
}
impl Display for Peer {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(
f,
"{} at {} ({})",
self.name,
self.addr,
match self.connection.try_read() {
Ok(guard) => guard
.as_ref()
.map(|_| "connected")
.unwrap_or("not connected"),
Err(_) => "<locked>",
}
)
}
}
impl Eq for Peer {}
impl Hash for Peer {
fn hash<H: Hasher>(&self, state: &mut H) {
self.name.hash(state);
self.addr.hash(state);
}
}
impl Ord for Peer {
fn cmp(&self, other: &Self) -> Ordering {
self.name
.cmp(&other.name)
.then_with(|| self.addr.cmp(&other.addr))
}
}
impl PartialEq for Peer {
fn eq(&self, other: &Self) -> bool {
self.name == other.name && self.addr == other.addr
}
}
impl PartialEq<&Self> for Peer {
fn eq(&self, other: &&Self) -> bool {
self.name == other.name && self.addr == other.addr
}
}
impl PartialOrd for Peer {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Peer {
pub(crate) fn new(name: XorName, addr: SocketAddr) -> Self {
Self {
name,
addr,
connection: Arc::new(RwLock::new(None)),
}
}
pub(crate) fn name(&self) -> XorName {
self.name
}
pub(crate) fn addr(&self) -> SocketAddr {
self.addr
}
pub(crate) fn age(&self) -> u8 {
self.name[XOR_NAME_LEN - 1]
}
pub(crate) async fn connection(&self) -> Option<qp2p::Connection> {
self.connection.read().await.as_ref().cloned()
}
pub(crate) async fn merge_connection(&self, other: &Self) {
if self.addr != other.addr {
return;
}
if let Ok(true) = self
.connection
.try_read()
.map(|connection| connection.is_some())
{
return;
}
let other_connection = if let Ok(connection) =
RwLockReadGuard::try_map(other.connection.read().await, Option::as_ref)
{
connection.clone()
} else {
return;
};
if self.addr != other_connection.remote_address() {
return;
}
let mut guard = self.connection.write().await;
if guard.is_some() {
return;
}
*guard = Some(other_connection);
}
pub(crate) async fn ensure_connection<Connect, Fut>(
&self,
is_valid: impl Fn(&qp2p::Connection) -> bool,
connect: Connect,
) -> Result<RwLockReadGuard<'_, qp2p::Connection>, qp2p::ConnectionError>
where
Connect: FnOnce(SocketAddr) -> Fut,
Fut: Future<Output = Result<qp2p::Connection, qp2p::ConnectionError>>,
{
if let Some(guard) = self
.connection
.try_read()
.ok()
.and_then(|guard| RwLockReadGuard::try_map(guard, Option::as_ref).ok())
{
if is_valid(&guard) {
return Ok(guard);
}
}
let mut guard = self.connection.write().await;
if let Some(connection) = guard.as_ref() {
if is_valid(connection) {
return Ok(RwLockReadGuard::try_map(guard.downgrade(), Option::as_ref)
.expect("write-locked value can't have changed"));
}
}
*guard = Some(connect(self.addr).await?);
Ok(RwLockReadGuard::try_map(guard.downgrade(), Option::as_ref)
.expect("write-locked value can't have changed"))
}
}
#[derive(Clone, Debug)]
pub(crate) struct UnnamedPeer {
addr: SocketAddr,
connection: Option<qp2p::Connection>,
}
impl UnnamedPeer {
pub(crate) fn addressed(addr: SocketAddr) -> Self {
Self {
addr,
connection: None,
}
}
pub(crate) fn connected(connection: qp2p::Connection) -> Self {
Self {
addr: connection.remote_address(),
connection: Some(connection),
}
}
pub(crate) fn addr(&self) -> SocketAddr {
self.addr
}
pub(crate) fn named(self, name: XorName) -> Peer {
Peer {
name,
addr: self.addr,
connection: Arc::new(RwLock::new(self.connection)),
}
}
}
#[cfg(test)]
pub(crate) mod test_utils {
use super::*;
use proptest::{collection::SizeRange, prelude::*};
use xor_name::XOR_NAME_LEN;
pub(crate) fn arbitrary_bytes() -> impl Strategy<Value = [u8; XOR_NAME_LEN]> {
any::<[u8; XOR_NAME_LEN]>()
}
pub(crate) fn arbitrary_unique_peers(
count: impl Into<SizeRange>,
age: impl Strategy<Value = u8>,
) -> impl Strategy<Value = Vec<Peer>> {
proptest::collection::btree_map(arbitrary_bytes(), (any::<SocketAddr>(), age), count)
.prop_map(|peers| {
peers
.into_iter()
.map(|(mut bytes, (addr, age))| {
bytes[XOR_NAME_LEN - 1] = age;
let name = XorName(bytes);
Peer::new(name, addr)
})
.collect()
})
}
}