use std::sync::mpsc::{channel, Sender};
use super::error::{
PeerListError, PeerManagerError, PeerRefAddError, PeerRefRemoveError, PeerRefUpdateError,
};
use super::notification::PeerNotificationIter;
use super::PeerRef;
use super::{PeerManagerMessage, PeerManagerRequest};
/// Handle used to submit `PeerManagerMessage`s over a `std::sync::mpsc` channel.
///
/// The type derives `Clone`, so it can be duplicated; every copy holds its own clone of the
/// underlying `Sender`.
#[derive(Clone, Debug)]
pub struct PeerManagerConnector {
// Sending half of the channel on which messages for the peer manager are submitted.
sender: Sender<PeerManagerMessage>,
}
impl PeerManagerConnector {
    /// Builds a connector that submits its messages through `sender`.
    pub(crate) fn new(sender: Sender<PeerManagerMessage>) -> Self {
        Self { sender }
    }

    /// Submits an `AddPeer` request for `peer_id` with the given `endpoints` and blocks until
    /// the reply arrives on a channel created for this call.
    ///
    /// # Errors
    ///
    /// * `PeerRefAddError::InternalError` if the request cannot be sent because the receiving
    ///   side of the connector's channel has been dropped.
    /// * `PeerRefAddError::ReceiveError` if the reply channel is closed before a reply arrives.
    ///
    /// Otherwise the `Result` carried by the reply is returned unchanged.
    pub fn add_peer_ref(
        &self,
        peer_id: String,
        endpoints: Vec<String>,
    ) -> Result<PeerRef, PeerRefAddError> {
        let (reply_tx, reply_rx) = channel();
        let request = PeerManagerRequest::AddPeer {
            peer_id,
            endpoints,
            sender: reply_tx,
        };

        self.sender
            .send(PeerManagerMessage::Request(request))
            .map_err(|_| {
                PeerRefAddError::InternalError(
                    "Unable to send message to PeerManager, receiver dropped".to_string(),
                )
            })?;

        // The reply is itself a `Result`; hand it back as-is.
        match reply_rx.recv() {
            Ok(reply) => reply,
            Err(err) => Err(PeerRefAddError::ReceiveError(format!("{:?}", err))),
        }
    }

    /// Submits an `UpdatePeer` request that carries `old_peer_id` and `new_peer_id`, then
    /// blocks until the reply arrives on a channel created for this call.
    ///
    /// # Errors
    ///
    /// * `PeerRefUpdateError::InternalError` if the request cannot be sent because the
    ///   receiving side of the connector's channel has been dropped.
    /// * `PeerRefUpdateError::ReceiveError` if the reply channel is closed before a reply
    ///   arrives.
    ///
    /// Otherwise the `Result` carried by the reply is returned unchanged.
    pub fn update_peer_ref(
        &self,
        old_peer_id: &str,
        new_peer_id: &str,
    ) -> Result<(), PeerRefUpdateError> {
        let (reply_tx, reply_rx) = channel();
        let request = PeerManagerRequest::UpdatePeer {
            old_peer_id: old_peer_id.to_string(),
            new_peer_id: new_peer_id.to_string(),
            sender: reply_tx,
        };

        self.sender
            .send(PeerManagerMessage::Request(request))
            .map_err(|_| {
                PeerRefUpdateError::InternalError(
                    "Unable to send message to PeerManager, receiver dropped".to_string(),
                )
            })?;

        // The reply is itself a `Result`; hand it back as-is.
        match reply_rx.recv() {
            Ok(reply) => reply,
            Err(err) => Err(PeerRefUpdateError::ReceiveError(format!("{:?}", err))),
        }
    }

    /// Submits a `ListPeers` request and blocks until the reply arrives on a channel created
    /// for this call.
    ///
    /// # Errors
    ///
    /// * `PeerListError::InternalError` if the request cannot be sent because the receiving
    ///   side of the connector's channel has been dropped.
    /// * `PeerListError::ReceiveError` if the reply channel is closed before a reply arrives.
    ///
    /// Otherwise the `Result` carried by the reply is returned unchanged.
    pub fn list_peers(&self) -> Result<Vec<String>, PeerListError> {
        let (reply_tx, reply_rx) = channel();
        let request = PeerManagerRequest::ListPeers { sender: reply_tx };

        self.sender
            .send(PeerManagerMessage::Request(request))
            .map_err(|_| {
                PeerListError::InternalError(
                    "Unable to send message to PeerManager, receiver dropped".to_string(),
                )
            })?;

        // The reply is itself a `Result`; hand it back as-is.
        match reply_rx.recv() {
            Ok(reply) => reply,
            Err(err) => Err(PeerListError::ReceiveError(format!("{:?}", err))),
        }
    }

    /// Sends a `Subscribe` message carrying the sending half of a new channel and returns a
    /// `PeerNotificationIter` wrapping the receiving half.
    ///
    /// # Errors
    ///
    /// Returns `PeerManagerError::SendMessageError` if the message cannot be sent.
    pub fn subscribe(&self) -> Result<PeerNotificationIter, PeerManagerError> {
        let (notify_tx, recv) = channel();

        self.sender
            .send(PeerManagerMessage::Subscribe(notify_tx))
            .map_err(|_| {
                PeerManagerError::SendMessageError("The peer manager is no longer running".into())
            })?;

        Ok(PeerNotificationIter { recv })
    }
}
/// Crate-internal handle that holds a `Sender<PeerManagerMessage>`.
///
/// The type derives `Clone`, so it can be duplicated; every copy holds its own clone of the
/// underlying `Sender`.
#[derive(Clone, Debug)]
pub(crate) struct PeerRemover {
// Sending half of the channel on which messages for the peer manager are submitted.
pub sender: Sender<PeerManagerMessage>,
}
impl PeerRemover {
    /// Submits a `RemovePeer` request for `peer_id` and blocks until the reply arrives on a
    /// channel created for this call.
    ///
    /// # Errors
    ///
    /// * `PeerRefRemoveError::InternalError` if the request cannot be sent because the
    ///   receiving side of this handle's channel has been dropped.
    /// * `PeerRefRemoveError::ReceiveError` if the reply channel is closed before a reply
    ///   arrives.
    ///
    /// Otherwise the `Result` carried by the reply is returned unchanged.
    pub fn remove_peer_ref(&self, peer_id: &str) -> Result<(), PeerRefRemoveError> {
        let (reply_tx, reply_rx) = channel();
        let request = PeerManagerRequest::RemovePeer {
            peer_id: peer_id.to_string(),
            sender: reply_tx,
        };

        self.sender
            .send(PeerManagerMessage::Request(request))
            .map_err(|_| {
                PeerRefRemoveError::InternalError(
                    "Unable to send message to PeerManager, receiver dropped".to_string(),
                )
            })?;

        // The reply is itself a `Result`; hand it back as-is.
        match reply_rx.recv() {
            Ok(reply) => reply,
            Err(err) => Err(PeerRefRemoveError::ReceiveError(format!("{:?}", err))),
        }
    }
}
/// Equality for `PeerRemover` that reports every pair of values as equal.
///
/// NOTE(review): the `sender` field is never inspected here. Presumably this impl exists so
/// that types containing a `PeerRemover` can implement or derive `PartialEq` — confirm against
/// the users of this type before relying on `==` for anything meaningful.
impl PartialEq for PeerRemover {
fn eq(&self, _other: &Self) -> bool {
// Unconditionally equal; `_other` is intentionally unused.
true
}
}