use std::sync::mpsc::{channel, Sender};
use crate::collections::BiHashMap;
use super::error::{
PeerConnectionIdError, PeerListError, PeerLookupError, PeerManagerError, PeerRefAddError,
PeerRefRemoveError, PeerUnknownAddError,
};
use super::notification::{PeerManagerNotification, PeerNotificationIter, SubscriberId};
use super::{EndpointPeerRef, PeerRef};
use super::{PeerAuthorizationToken, PeerTokenPair};
use super::{PeerManagerMessage, PeerManagerRequest};
/// Translates between peer IDs and connection IDs.
///
/// The `Send` supertrait means every implementation can be moved to another thread.
pub trait PeerLookup: Send {
/// Looks up the connection ID for the given peer ID.
///
/// NOTE(review): `Ok(None)` presumably means no connection ID is known for `peer_id`;
/// confirm against the implementors.
///
/// # Errors
///
/// Returns a `PeerLookupError` if the lookup itself could not be carried out.
fn connection_id(&self, peer_id: &PeerTokenPair) -> Result<Option<String>, PeerLookupError>;
/// Looks up the peer ID for the given connection ID.
///
/// NOTE(review): `Ok(None)` presumably means no peer is known for `connection_id`;
/// confirm against the implementors.
///
/// # Errors
///
/// Returns a `PeerLookupError` if the lookup itself could not be carried out.
fn peer_id(&self, connection_id: &str) -> Result<Option<PeerTokenPair>, PeerLookupError>;
}
/// A source of boxed `PeerLookup` instances.
pub trait PeerLookupProvider {
/// Returns a boxed `PeerLookup` trait object.
fn peer_lookup(&self) -> Box<dyn PeerLookup>;
}
/// A cloneable handle that talks to the peer manager by sending `PeerManagerMessage`
/// values over an mpsc channel.
///
/// Cloning the connector clones the underlying `Sender`, so all clones send to the same
/// receiver.
#[derive(Clone, Debug)]
pub struct PeerManagerConnector {
// Channel on which every message from this connector is sent.
sender: Sender<PeerManagerMessage>,
}
impl PeerManagerConnector {
    /// Builds a connector that sends all of its messages on `sender`.
    pub(crate) fn new(sender: Sender<PeerManagerMessage>) -> Self {
        Self { sender }
    }

    /// Sends an `AddPeer` request and blocks until the reply arrives.
    ///
    /// # Arguments
    ///
    /// * `peer_id` - authorization token identifying the peer
    /// * `endpoints` - endpoints passed along with the request
    /// * `required_local_auth` - the local authorization token passed along with the request
    ///
    /// # Errors
    ///
    /// * `PeerRefAddError::InternalError` if the request could not be sent
    /// * `PeerRefAddError::ReceiveError` if the reply channel closed without a reply
    /// * whatever error the reply itself carries
    pub fn add_peer_ref(
        &self,
        peer_id: PeerAuthorizationToken,
        endpoints: Vec<String>,
        required_local_auth: PeerAuthorizationToken,
    ) -> Result<PeerRef, PeerRefAddError> {
        let (reply_tx, reply_rx) = channel();

        self.sender
            .send(PeerManagerMessage::Request(PeerManagerRequest::AddPeer {
                peer_id,
                endpoints,
                required_local_auth,
                sender: reply_tx,
            }))
            .map_err(|_| {
                PeerRefAddError::InternalError(
                    "Unable to send message to PeerManager, receiver dropped".to_string(),
                )
            })?;

        // The reply is already a `Result`; only a closed reply channel is mapped here.
        match reply_rx.recv() {
            Ok(reply) => reply,
            Err(err) => Err(PeerRefAddError::ReceiveError(format!("{:?}", err))),
        }
    }

    /// Sends an `AddUnidentified` request for `endpoint` and blocks until the reply arrives.
    ///
    /// # Arguments
    ///
    /// * `endpoint` - the endpoint passed along with the request
    /// * `local_authorization` - the local authorization token passed along with the request
    ///
    /// # Errors
    ///
    /// * `PeerUnknownAddError::InternalError` if the request could not be sent
    /// * `PeerUnknownAddError::ReceiveError` if the reply channel closed without a reply
    /// * whatever error the reply itself carries
    pub fn add_unidentified_peer(
        &self,
        endpoint: String,
        local_authorization: PeerAuthorizationToken,
    ) -> Result<EndpointPeerRef, PeerUnknownAddError> {
        let (reply_tx, reply_rx) = channel();

        self.sender
            .send(PeerManagerMessage::Request(
                PeerManagerRequest::AddUnidentified {
                    endpoint,
                    local_authorization,
                    sender: reply_tx,
                },
            ))
            .map_err(|_| {
                PeerUnknownAddError::InternalError(
                    "Unable to send message to PeerManager, receiver dropped".to_string(),
                )
            })?;

        match reply_rx.recv() {
            Ok(reply) => reply,
            Err(err) => Err(PeerUnknownAddError::ReceiveError(format!("{:?}", err))),
        }
    }

    /// Sends a `ListPeers` request and blocks until the reply arrives.
    ///
    /// # Errors
    ///
    /// * `PeerListError::Internal` if the request could not be sent
    /// * `PeerListError::Receive` if the reply channel closed without a reply
    /// * whatever error the reply itself carries
    pub fn list_peers(&self) -> Result<Vec<PeerAuthorizationToken>, PeerListError> {
        let (reply_tx, reply_rx) = channel();

        self.sender
            .send(PeerManagerMessage::Request(PeerManagerRequest::ListPeers {
                sender: reply_tx,
            }))
            .map_err(|_| {
                PeerListError::Internal(
                    "Unable to send message to PeerManager, receiver dropped".to_string(),
                )
            })?;

        match reply_rx.recv() {
            Ok(reply) => reply,
            Err(err) => Err(PeerListError::Receive(format!("{:?}", err))),
        }
    }

    /// Sends a `ListUnreferencedPeers` request and blocks until the reply arrives.
    ///
    /// # Errors
    ///
    /// * `PeerListError::Internal` if the request could not be sent
    /// * `PeerListError::Receive` if the reply channel closed without a reply
    /// * whatever error the reply itself carries
    pub fn list_unreferenced_peers(&self) -> Result<Vec<PeerTokenPair>, PeerListError> {
        let (reply_tx, reply_rx) = channel();

        self.sender
            .send(PeerManagerMessage::Request(
                PeerManagerRequest::ListUnreferencedPeers { sender: reply_tx },
            ))
            .map_err(|_| {
                PeerListError::Internal(
                    "Unable to send message to PeerManager, receiver dropped".to_string(),
                )
            })?;

        match reply_rx.recv() {
            Ok(reply) => reply,
            Err(err) => Err(PeerListError::Receive(format!("{:?}", err))),
        }
    }

    /// Sends a `ConnectionIds` request and blocks until the reply arrives.
    ///
    /// On success the reply is a bidirectional map between peer IDs and connection IDs.
    ///
    /// # Errors
    ///
    /// * `PeerConnectionIdError::InternalError` if the request could not be sent
    /// * `PeerConnectionIdError::ReceiveError` if the reply channel closed without a reply
    /// * whatever error the reply itself carries
    pub fn connection_ids(
        &self,
    ) -> Result<BiHashMap<PeerTokenPair, String>, PeerConnectionIdError> {
        let (reply_tx, reply_rx) = channel();

        self.sender
            .send(PeerManagerMessage::Request(
                PeerManagerRequest::ConnectionIds { sender: reply_tx },
            ))
            .map_err(|_| {
                PeerConnectionIdError::InternalError(
                    "Unable to send message to PeerManager, receiver dropped".to_string(),
                )
            })?;

        match reply_rx.recv() {
            Ok(reply) => reply,
            Err(err) => Err(PeerConnectionIdError::ReceiveError(format!("{:?}", err))),
        }
    }

    /// Sends a `Subscribe` message carrying a fresh channel and returns an iterator wrapping
    /// the receiving end of that channel.
    ///
    /// # Errors
    ///
    /// Returns `PeerManagerError::SendMessageError` if the message could not be sent.
    #[deprecated(since = "0.5.1", note = "please use `subscribe_sender` instead")]
    pub fn subscribe(&self) -> Result<PeerNotificationIter, PeerManagerError> {
        let (notify_tx, notify_rx) = channel();

        self.sender
            .send(PeerManagerMessage::Subscribe(notify_tx))
            .map(|()| PeerNotificationIter { recv: notify_rx })
            .map_err(|_| {
                PeerManagerError::SendMessageError("The peer manager is no longer running".into())
            })
    }

    /// Sends a `Subscribe` request whose callback forwards notifications to `subscriber`, then
    /// blocks until the reply arrives.
    ///
    /// Each notification handed to the callback is converted with `T::from` and sent on
    /// `subscriber`; if that send fails, the callback returns the failure as a boxed error.
    ///
    /// # Errors
    ///
    /// Returns `PeerManagerError::SendMessageError` if the request could not be sent or the
    /// reply channel closed without a reply; otherwise returns whatever the reply carries.
    pub fn subscribe_sender<T>(
        &self,
        subscriber: Sender<T>,
    ) -> Result<SubscriberId, PeerManagerError>
    where
        T: From<PeerManagerNotification> + Send + 'static,
    {
        let (reply_tx, reply_rx) = channel();

        let request = PeerManagerMessage::Request(PeerManagerRequest::Subscribe {
            sender: reply_tx,
            callback: Box::new(move |event| subscriber.send(T::from(event)).map_err(Box::from)),
        });

        if self.sender.send(request).is_err() {
            return Err(PeerManagerError::SendMessageError(
                "The peer manager is no longer running".into(),
            ));
        }

        match reply_rx.recv() {
            Ok(reply) => reply,
            Err(_) => Err(PeerManagerError::SendMessageError(
                "The peer manager is no longer running".into(),
            )),
        }
    }

    /// Sends an `Unsubscribe` request for `subscriber_id` and blocks until the reply arrives.
    ///
    /// # Errors
    ///
    /// Returns `PeerManagerError::SendMessageError` if the request could not be sent or the
    /// reply channel closed without a reply; otherwise returns whatever the reply carries.
    pub fn unsubscribe(&self, subscriber_id: SubscriberId) -> Result<(), PeerManagerError> {
        let (reply_tx, reply_rx) = channel();

        let request = PeerManagerMessage::Request(PeerManagerRequest::Unsubscribe {
            subscriber_id,
            sender: reply_tx,
        });

        if self.sender.send(request).is_err() {
            return Err(PeerManagerError::SendMessageError(
                "The peer manager is no longer running".into(),
            ));
        }

        match reply_rx.recv() {
            Ok(reply) => reply,
            Err(_) => Err(PeerManagerError::SendMessageError(
                "The peer manager is no longer running".into(),
            )),
        }
    }
}
impl PeerLookup for PeerManagerConnector {
    /// Sends a `GetConnectionId` request with a clone of `peer_id` and blocks until the reply
    /// arrives.
    ///
    /// # Errors
    ///
    /// Returns a `PeerLookupError` if the request could not be sent or the reply channel
    /// closed without a reply; otherwise returns whatever the reply carries.
    fn connection_id(&self, peer_id: &PeerTokenPair) -> Result<Option<String>, PeerLookupError> {
        let (reply_tx, reply_rx) = channel();

        let request = PeerManagerRequest::GetConnectionId {
            peer_id: peer_id.clone(),
            sender: reply_tx,
        };

        if self
            .sender
            .send(PeerManagerMessage::Request(request))
            .is_err()
        {
            return Err(PeerLookupError(
                "Unable to send message to PeerManager, receiver dropped".to_string(),
            ));
        }

        match reply_rx.recv() {
            Ok(reply) => reply,
            Err(err) => Err(PeerLookupError(format!("{:?}", err))),
        }
    }

    /// Sends a `GetPeerId` request with an owned copy of `connection_id` and blocks until the
    /// reply arrives.
    ///
    /// # Errors
    ///
    /// Returns a `PeerLookupError` if the request could not be sent or the reply channel
    /// closed without a reply; otherwise returns whatever the reply carries.
    fn peer_id(&self, connection_id: &str) -> Result<Option<PeerTokenPair>, PeerLookupError> {
        let (reply_tx, reply_rx) = channel();

        let request = PeerManagerRequest::GetPeerId {
            connection_id: connection_id.to_string(),
            sender: reply_tx,
        };

        if self
            .sender
            .send(PeerManagerMessage::Request(request))
            .is_err()
        {
            return Err(PeerLookupError(
                "Unable to send message to PeerManager, receiver dropped".to_string(),
            ));
        }

        match reply_rx.recv() {
            Ok(reply) => reply,
            Err(err) => Err(PeerLookupError(format!("{:?}", err))),
        }
    }
}
impl PeerLookupProvider for PeerManagerConnector {
    /// Returns a boxed clone of this connector as a `PeerLookup` trait object.
    fn peer_lookup(&self) -> Box<dyn PeerLookup> {
        let lookup: Box<dyn PeerLookup> = Box::new(self.clone());
        lookup
    }
}
/// A cloneable, crate-internal handle that holds a `Sender<PeerManagerMessage>`.
///
/// NOTE(review): judging by the name, this is used to send peer-removal requests to the
/// peer manager; confirm against the `impl` and its callers.
#[derive(Clone, Debug)]
pub(crate) struct PeerRemover {
// Channel on which messages from this handle are sent.
pub sender: Sender<PeerManagerMessage>,
}
impl PeerRemover {
    /// Sends a `RemovePeer` request with a clone of `peer_id` and blocks until the reply
    /// arrives.
    ///
    /// # Errors
    ///
    /// * `PeerRefRemoveError::Internal` if the request could not be sent
    /// * `PeerRefRemoveError::Receive` if the reply channel closed without a reply
    /// * whatever error the reply itself carries
    pub fn remove_peer_ref(&self, peer_id: &PeerTokenPair) -> Result<(), PeerRefRemoveError> {
        let (reply_tx, reply_rx) = channel();

        self.sender
            .send(PeerManagerMessage::Request(PeerManagerRequest::RemovePeer {
                peer_id: peer_id.clone(),
                sender: reply_tx,
            }))
            .map_err(|_| {
                PeerRefRemoveError::Internal(
                    "Unable to send message to PeerManager, receiver dropped".to_string(),
                )
            })?;

        match reply_rx.recv() {
            Ok(reply) => reply,
            Err(err) => Err(PeerRefRemoveError::Receive(format!("{:?}", err))),
        }
    }

    /// Sends a `RemovePeerByEndpoint` request with owned copies of `endpoint` and
    /// `connection_id`, then blocks until the reply arrives.
    ///
    /// # Errors
    ///
    /// * `PeerRefRemoveError::Internal` if the request could not be sent
    /// * `PeerRefRemoveError::Receive` if the reply channel closed without a reply
    /// * whatever error the reply itself carries
    pub fn remove_peer_ref_by_endpoint(
        &self,
        endpoint: &str,
        connection_id: &str,
    ) -> Result<(), PeerRefRemoveError> {
        let (reply_tx, reply_rx) = channel();

        self.sender
            .send(PeerManagerMessage::Request(
                PeerManagerRequest::RemovePeerByEndpoint {
                    endpoint: endpoint.to_string(),
                    connection_id: connection_id.to_string(),
                    sender: reply_tx,
                },
            ))
            .map_err(|_| {
                PeerRefRemoveError::Internal(
                    "Unable to send message to PeerManager, receiver dropped".to_string(),
                )
            })?;

        match reply_rx.recv() {
            Ok(reply) => reply,
            Err(err) => Err(PeerRefRemoveError::Receive(format!("{:?}", err))),
        }
    }
}
impl PartialEq for PeerRemover {
    /// Treats every pair of `PeerRemover` values as equal; the `sender` field is not inspected.
    ///
    /// NOTE(review): presumably this exists so that types embedding a `PeerRemover` can still
    /// implement `PartialEq` (`Sender` itself does not); confirm against the users of this
    /// type.
    fn eq(&self, _: &Self) -> bool {
        true
    }
}