mod builder;
mod connector;
mod error;
pub mod interconnect;
mod notification;
mod peer_map;
mod peer_ref;
mod token;
mod unreferenced;
use std::cmp::min;
use std::io::ErrorKind;
use std::sync::mpsc::{channel, Sender};
use std::thread;
use std::time::Instant;
use uuid::Uuid;
use crate::collections::{BiHashMap, RefMap};
use crate::error::InternalError;
use crate::network::connection_manager::ConnectionManagerNotification;
use crate::network::connection_manager::{ConnectionManagerError, Connector};
use crate::threading::lifecycle::ShutdownHandle;
use crate::threading::pacemaker;
pub use self::builder::PeerManagerBuilder;
use self::connector::PeerRemover;
pub use self::connector::{PeerLookup, PeerManagerConnector};
use self::error::{
PeerConnectionIdError, PeerListError, PeerLookupError, PeerManagerError, PeerRefAddError,
PeerRefRemoveError, PeerUnknownAddError,
};
pub use self::notification::{PeerManagerNotification, PeerNotificationIter, SubscriberId};
use self::notification::{Subscriber, SubscriberMap};
use self::peer_map::{PeerMap, PeerStatus};
pub use self::peer_ref::{EndpointPeerRef, PeerRef};
pub use self::token::{PeerAuthorizationToken, PeerTokenPair};
use self::unreferenced::{RequestedEndpoint, UnreferencedPeer, UnreferencedPeerState};
pub(crate) enum PeerManagerMessage {
Shutdown,
Request(PeerManagerRequest),
Subscribe(Sender<PeerManagerNotification>),
InternalNotification(ConnectionManagerNotification),
RetryPending,
}
impl From<ConnectionManagerNotification> for PeerManagerMessage {
fn from(notification: ConnectionManagerNotification) -> Self {
PeerManagerMessage::InternalNotification(notification)
}
}
pub(crate) enum PeerManagerRequest {
AddPeer {
peer_id: PeerAuthorizationToken,
endpoints: Vec<String>,
required_local_auth: PeerAuthorizationToken,
sender: Sender<Result<PeerRef, PeerRefAddError>>,
},
AddUnidentified {
endpoint: String,
local_authorization: PeerAuthorizationToken,
sender: Sender<Result<EndpointPeerRef, PeerUnknownAddError>>,
},
RemovePeer {
peer_id: PeerTokenPair,
sender: Sender<Result<(), PeerRefRemoveError>>,
},
RemovePeerByEndpoint {
endpoint: String,
connection_id: String,
sender: Sender<Result<(), PeerRefRemoveError>>,
},
ListPeers {
sender: Sender<Result<Vec<PeerAuthorizationToken>, PeerListError>>,
},
ListUnreferencedPeers {
sender: Sender<Result<Vec<PeerTokenPair>, PeerListError>>,
},
ConnectionIds {
sender: Sender<Result<BiHashMap<PeerTokenPair, String>, PeerConnectionIdError>>,
},
GetConnectionId {
peer_id: PeerTokenPair,
sender: Sender<Result<Option<String>, PeerLookupError>>,
},
GetPeerId {
connection_id: String,
sender: Sender<Result<Option<PeerTokenPair>, PeerLookupError>>,
},
Subscribe {
sender: Sender<Result<SubscriberId, PeerManagerError>>,
callback: Subscriber,
},
Unsubscribe {
subscriber_id: SubscriberId,
sender: Sender<Result<(), PeerManagerError>>,
},
}
pub struct PeerManager {
join_handle: thread::JoinHandle<()>,
sender: Sender<PeerManagerMessage>,
pacemaker_shutdown_signaler: pacemaker::ShutdownSignaler,
}
impl PeerManager {
#[deprecated(since = "0.5.1", note = "Please use PeerManagerBuilder instead")]
pub fn new(
connector: Connector,
max_retry_attempts: Option<u64>,
retry_interval: Option<u64>,
identity: String,
strict_ref_counts: bool,
) -> Self {
let mut builder = PeerManagerBuilder::default()
.with_connector(connector)
.with_identity(identity)
.with_strict_ref_counts(strict_ref_counts);
if let Some(max_retry) = max_retry_attempts {
builder = builder.with_max_retry_attempts(max_retry);
}
if let Some(retry_interval) = retry_interval {
builder = builder.with_retry_interval(retry_interval);
}
builder
.start()
.expect("Building the PeerManager failed unexpectedly")
}
pub fn builder() -> PeerManagerBuilder {
PeerManagerBuilder::default()
}
#[deprecated(
since = "0.5.1",
note = "Please use connector() instead. The PeerManagerBuilder starts up the PeerManager \
now"
)]
pub fn start(&mut self) -> Result<PeerManagerConnector, PeerManagerError> {
Ok(PeerManagerConnector::new(self.sender.clone()))
}
pub fn connector(&self) -> PeerManagerConnector {
PeerManagerConnector::new(self.sender.clone())
}
#[allow(clippy::too_many_arguments)]
fn build(
retry_interval: u64,
max_retry_attempts: u64,
strict_ref_counts: bool,
#[allow(unused_variables)] identity: String,
connector: Connector,
retry_frequency: u64,
max_retry_frequency: u64,
endpoint_retry_frequency: u64,
) -> Result<PeerManager, PeerManagerError> {
debug!(
"Starting peer manager with identity={}, retry_interval={}s, max_retry_attempts={} \
strict_ref_counts={}, retry_frequency={}, max_retry_frequency={}, and \
endpoint_retry_frequency={}",
identity,
retry_interval,
max_retry_attempts,
strict_ref_counts,
retry_frequency,
max_retry_frequency,
endpoint_retry_frequency,
);
let (sender, recv) = channel();
let peer_remover = PeerRemover {
sender: sender.clone(),
};
let subscriber_id = connector.subscribe(sender.clone()).map_err(|err| {
PeerManagerError::StartUpError(format!(
"Unable to subscribe to connection manager notifications: {}",
err
))
})?;
debug!(
"Starting peer manager pacemaker with interval of {}s",
retry_interval
);
let pacemaker = pacemaker::Pacemaker::builder()
.with_interval(retry_interval)
.with_sender(sender.clone())
.with_message_factory(|| PeerManagerMessage::RetryPending)
.start()
.map_err(|err| PeerManagerError::StartUpError(err.to_string()))?;
let pacemaker_shutdown_signaler = pacemaker.shutdown_signaler();
let join_handle = thread::Builder::new()
.name("Peer Manager".into())
.spawn(move || {
let mut peers = PeerMap::new(retry_frequency);
let mut unreferenced_peers = UnreferencedPeerState::new(endpoint_retry_frequency);
let mut ref_map = RefMap::new();
let mut subscribers = SubscriberMap::new();
loop {
match recv.recv() {
Ok(PeerManagerMessage::Shutdown) => break,
Ok(PeerManagerMessage::Request(request)) => {
handle_request(
request,
connector.clone(),
&mut unreferenced_peers,
&mut peers,
&peer_remover,
&mut ref_map,
&mut subscribers,
strict_ref_counts,
);
}
Ok(PeerManagerMessage::Subscribe(sender)) => {
subscribers.add_subscriber(Box::new(move |notification| {
sender.send(notification).map_err(Box::from)
}));
}
Ok(PeerManagerMessage::InternalNotification(notification)) => {
handle_notifications(
notification,
&mut unreferenced_peers,
&mut peers,
connector.clone(),
&mut subscribers,
max_retry_attempts,
&mut ref_map,
retry_frequency,
)
}
Ok(PeerManagerMessage::RetryPending) => retry_pending(
&mut peers,
connector.clone(),
&mut unreferenced_peers,
max_retry_frequency,
),
Err(_) => {
warn!("All senders have disconnected");
break;
}
}
}
if let Err(err) = connector.unsubscribe(subscriber_id) {
error!(
"Unable to unsubscribe from connection manager notifications: {}",
err
);
}
debug!("Shutting down peer manager pacemaker...");
pacemaker.await_shutdown();
debug!("Shutting down peer manager pacemaker (complete)");
})
.map_err(|err| {
PeerManagerError::StartUpError(format!(
"Unable to start PeerManager thread {}",
err
))
})?;
Ok(PeerManager {
join_handle,
sender,
pacemaker_shutdown_signaler,
})
}
}
impl ShutdownHandle for PeerManager {
fn signal_shutdown(&mut self) {
self.pacemaker_shutdown_signaler.shutdown();
if self.sender.send(PeerManagerMessage::Shutdown).is_err() {
warn!("PeerManager is no longer running");
}
}
fn wait_for_shutdown(self) -> Result<(), InternalError> {
debug!("Shutting down peer manager...");
self.join_handle.join().map_err(|err| {
InternalError::with_message(format!(
"Peer manager thread did not shutdown correctly: {:?}",
err
))
})?;
debug!("Shutting down peer manager (complete)");
Ok(())
}
}
#[allow(clippy::too_many_arguments)]
fn handle_request(
request: PeerManagerRequest,
connector: Connector,
unreferenced_peers: &mut UnreferencedPeerState,
peers: &mut PeerMap,
peer_remover: &PeerRemover,
ref_map: &mut RefMap<PeerTokenPair>,
subscribers: &mut SubscriberMap,
strict_ref_counts: bool,
) {
match request {
PeerManagerRequest::AddPeer {
peer_id,
endpoints,
required_local_auth,
sender,
} => {
if sender
.send(add_peer(
peer_id,
endpoints,
connector,
unreferenced_peers,
peers,
peer_remover,
ref_map,
subscribers,
required_local_auth,
))
.is_err()
{
warn!("Connector dropped before receiving result of adding peer");
}
}
PeerManagerRequest::AddUnidentified {
endpoint,
local_authorization,
sender,
} => {
if sender
.send(Ok(add_unidentified(
endpoint,
connector,
unreferenced_peers,
peer_remover,
peers,
ref_map,
local_authorization,
)))
.is_err()
{
warn!("Connector dropped before receiving result of adding unidentified peer");
}
}
PeerManagerRequest::RemovePeer { peer_id, sender } => {
if sender
.send(remove_peer(
peer_id,
connector,
unreferenced_peers,
peers,
ref_map,
strict_ref_counts,
))
.is_err()
{
warn!("Connector dropped before receiving result of removing peer");
}
}
PeerManagerRequest::RemovePeerByEndpoint {
endpoint,
connection_id,
sender,
} => {
if sender
.send(remove_peer_by_endpoint(
endpoint,
connection_id,
connector,
peers,
ref_map,
strict_ref_counts,
))
.is_err()
{
warn!("Connector dropped before receiving result of removing peer");
}
}
PeerManagerRequest::ListPeers { sender } => {
if sender.send(Ok(peers.peer_ids())).is_err() {
warn!("Connector dropped before receiving result of list peers");
}
}
PeerManagerRequest::ListUnreferencedPeers { sender } => {
let peer_ids = unreferenced_peers
.peers
.keys()
.map(|s| s.to_owned())
.collect();
if sender.send(Ok(peer_ids)).is_err() {
warn!("Connector dropped before receiving result of list unreferenced peers");
}
}
PeerManagerRequest::ConnectionIds { sender } => {
if sender.send(Ok(peers.connection_ids())).is_err() {
warn!("Connector dropped before receiving result of connection IDs");
}
}
PeerManagerRequest::GetConnectionId { peer_id, sender } => {
let connection_id = peers
.get_by_peer_id(&peer_id)
.map(|meta| meta.connection_id.clone())
.or_else(|| {
unreferenced_peers
.peers
.get(&peer_id)
.map(|meta| meta.connection_id.clone())
});
if sender.send(Ok(connection_id)).is_err() {
warn!("Connector dropped before receiving result of getting connection ID");
}
}
PeerManagerRequest::GetPeerId {
connection_id,
sender,
} => {
let peer_id = peers
.get_by_connection_id(&connection_id)
.map(|meta| PeerTokenPair::new(meta.id.clone(), meta.required_local_auth.clone()))
.or_else(|| {
unreferenced_peers
.get_by_connection_id(&connection_id)
.map(|(peer_id, _)| peer_id)
});
if sender.send(Ok(peer_id)).is_err() {
warn!("Connector dropped before receiving result of getting peer ID");
}
}
PeerManagerRequest::Subscribe { sender, callback } => {
let subscriber_id = subscribers.add_subscriber(callback);
if sender.send(Ok(subscriber_id)).is_err() {
warn!("connector dropped before receiving result of remove connection");
}
}
PeerManagerRequest::Unsubscribe {
sender,
subscriber_id,
} => {
subscribers.remove_subscriber(subscriber_id);
if sender.send(Ok(())).is_err() {
warn!("connector dropped before receiving result of remove connection");
}
}
};
}
#[allow(clippy::too_many_arguments)]
fn add_peer(
peer_id: PeerAuthorizationToken,
endpoints: Vec<String>,
connector: Connector,
unreferenced_peers: &mut UnreferencedPeerState,
peers: &mut PeerMap,
peer_remover: &PeerRemover,
ref_map: &mut RefMap<PeerTokenPair>,
subscribers: &mut SubscriberMap,
required_local_auth: PeerAuthorizationToken,
) -> Result<PeerRef, PeerRefAddError> {
let peer_token_pair = PeerTokenPair::new(peer_id.clone(), required_local_auth.clone());
if check_for_duplicate_endpoint(&peer_id, &endpoints, peers) {
return Err(PeerRefAddError::AddError(format!(
"Peer {} contains endpoints that already belong to another peer using trust",
peer_id
)));
}
let new_ref_count = ref_map.add_ref(peer_token_pair.clone());
if new_ref_count > 1 {
if let Some(mut peer_metadata) = peers.get_by_peer_id(&peer_token_pair).cloned() {
if peer_metadata.endpoints.len() == 1 && endpoints.len() > 1 {
if let Some(endpoint) = peer_metadata.endpoints.get(0) {
if unreferenced_peers
.requested_endpoints
.contains_key(endpoint)
&& endpoints.contains(endpoint)
{
info!(
"Updating peer {} to include endpoints {:?}",
peer_id, endpoints
);
peer_metadata.endpoints = endpoints;
peers.update_peer(peer_metadata.clone()).map_err(|err| {
PeerRefAddError::AddError(format!(
"Unable to update peer {}: {}",
peer_id, err
))
})?
} else {
if let Err(err) = ref_map.remove_ref(&peer_token_pair) {
error!(
"Unable to remove ref that was just added for peer {}: {}",
peer_id, err
);
};
return Err(PeerRefAddError::AddError(format!(
"Mismatch betwen existing and requested peer endpoints: {:?} does not \
contain {}",
endpoints, endpoint
)));
}
} else {
return Err(PeerRefAddError::AddError(format!(
"Peer {} does not have any endpoints",
peer_id
)));
}
}
if peer_metadata.status == PeerStatus::Connected {
let notification = PeerManagerNotification::Connected {
peer: peer_token_pair.clone(),
};
subscribers.broadcast(notification);
}
let peer_ref = PeerRef::new(peer_token_pair, peer_remover.clone());
return Ok(peer_ref);
} else {
return Err(PeerRefAddError::AddError(format!(
"A reference exists for peer {} but missing peer metadata",
peer_id
)));
}
};
if let Some(UnreferencedPeer {
connection_id,
endpoint,
old_connection_ids,
..
}) = unreferenced_peers.peers.remove(&peer_token_pair)
{
debug!("Updating unreferenced peer to full peer {}", peer_id);
peers.insert(
peer_id,
connection_id,
endpoints,
endpoint,
PeerStatus::Connected,
required_local_auth,
old_connection_ids,
);
let notification = PeerManagerNotification::Connected {
peer: peer_token_pair.clone(),
};
subscribers.broadcast(notification);
let peer_ref = PeerRef::new(peer_token_pair.clone(), peer_remover.clone());
return Ok(peer_ref);
}
info!("Attempting to peer with {}", peer_id);
let connection_id = format!("{}", Uuid::new_v4());
let mut active_endpoint = match endpoints.get(0) {
Some(endpoint) => endpoint.to_string(),
None => {
if let Err(err) = ref_map.remove_ref(&peer_token_pair) {
error!(
"Unable to remove ref that was just added for peer {}: {}",
peer_id, err
);
};
return Err(PeerRefAddError::AddError(format!(
"No endpoints provided for peer {}",
peer_id
)));
}
};
for endpoint in endpoints.iter() {
match connector.request_connection(
endpoint,
&connection_id,
Some(peer_id.clone().into()),
Some(required_local_auth.clone().into()),
) {
Ok(()) => {
active_endpoint = endpoint.to_string();
break;
}
Err(err) => {
log_connect_request_err(err, &peer_id, endpoint);
}
}
}
peers.insert(
peer_id,
connection_id,
endpoints.to_vec(),
active_endpoint,
PeerStatus::Pending,
required_local_auth,
vec![],
);
let peer_ref = PeerRef::new(peer_token_pair, peer_remover.clone());
Ok(peer_ref)
}
fn add_unidentified(
endpoint: String,
connector: Connector,
unreferenced_peers: &mut UnreferencedPeerState,
peer_remover: &PeerRemover,
peers: &PeerMap,
ref_map: &mut RefMap<PeerTokenPair>,
local_authorization: PeerAuthorizationToken,
) -> EndpointPeerRef {
info!("Attempting to peer with peer by endpoint {}", endpoint);
if let Some(peer_metadatas) = peers.get_peer_from_endpoint(&endpoint) {
for peer_metadata in peer_metadatas {
if peer_metadata.required_local_auth == local_authorization {
let peer_token_pair = PeerTokenPair::new(
peer_metadata.id.clone(),
peer_metadata.required_local_auth.clone(),
);
ref_map.add_ref(peer_token_pair);
return EndpointPeerRef::new(
endpoint,
peer_metadata.connection_id,
peer_remover.clone(),
);
}
}
}
let connection_id = format!("{}", Uuid::new_v4());
match connector.request_connection(
&endpoint,
&connection_id,
None,
Some(local_authorization.clone().into()),
) {
Ok(()) => (),
Err(err) => {
warn!("Unable to peer with peer at {}: {}", endpoint, err);
}
};
unreferenced_peers.requested_endpoints.insert(
endpoint.to_string(),
RequestedEndpoint {
endpoint: endpoint.to_string(),
local_authorization,
},
);
EndpointPeerRef::new(endpoint, connection_id, peer_remover.clone())
}
fn remove_peer(
peer_id: PeerTokenPair,
connector: Connector,
unreferenced_peers: &mut UnreferencedPeerState,
peers: &mut PeerMap,
ref_map: &mut RefMap<PeerTokenPair>,
strict_ref_counts: bool,
) -> Result<(), PeerRefRemoveError> {
debug!("Removing peer: {}", peer_id);
unreferenced_peers.peers.remove(&peer_id);
let removed_peer = match ref_map.remove_ref(&peer_id) {
Ok(removed_peer) => removed_peer,
Err(err) => {
if strict_ref_counts {
panic!(
"Trying to remove a reference that does not exist: {}",
peer_id
);
} else {
return Err(PeerRefRemoveError::Remove(format!(
"Failed to remove ref for peer {} from ref map: {}",
peer_id, err
)));
}
}
};
if removed_peer.is_some() {
let peer_metadata = peers.remove(&peer_id).ok_or_else(|| {
PeerRefRemoveError::Remove(format!(
"Peer {} has already been removed from the peer map",
peer_id
))
})?;
if peer_metadata.status == PeerStatus::Pending {
return Ok(());
}
match connector
.remove_connection(&peer_metadata.active_endpoint, &peer_metadata.connection_id)
{
Ok(Some(_)) => {
debug!(
"Peer {} has been removed and connection {} has been closed",
peer_id, peer_metadata.active_endpoint
);
Ok(())
}
Ok(None) => Err(PeerRefRemoveError::Remove(format!(
"The connection for peer {}'s active endpoint ({}) has already been removed",
peer_id, peer_metadata.active_endpoint
))),
Err(err) => Err(PeerRefRemoveError::Remove(format!("{}", err))),
}
} else {
Ok(())
}
}
fn remove_peer_by_endpoint(
endpoint: String,
connection_id: String,
connector: Connector,
peers: &mut PeerMap,
ref_map: &mut RefMap<PeerTokenPair>,
strict_ref_counts: bool,
) -> Result<(), PeerRefRemoveError> {
let peer_metadata = match peers.get_by_connection_id(&connection_id) {
Some(peer_metadata) => peer_metadata,
None => {
return Err(PeerRefRemoveError::Remove(format!(
"Peer with endpoint {} has already been removed from the peer map",
endpoint
)))
}
};
let peer_token_pair = PeerTokenPair::new(
peer_metadata.id.clone(),
peer_metadata.required_local_auth.clone(),
);
debug!(
"Removing peer {} by endpoint: {}",
peer_token_pair, endpoint
);
let removed_peer = match ref_map.remove_ref(&peer_token_pair) {
Ok(removed_peer) => removed_peer,
Err(err) => {
if strict_ref_counts {
panic!(
"Trying to remove a reference that does not exist: {}",
peer_token_pair
);
} else {
return Err(PeerRefRemoveError::Remove(format!(
"Failed to remove ref for peer {} from ref map: {}",
peer_token_pair, err
)));
}
}
};
if let Some(removed_peer) = removed_peer {
let peer_metadata = peers.remove(&removed_peer).ok_or_else(|| {
PeerRefRemoveError::Remove(format!(
"Peer with endpoint {} has already been removed from the peer map",
endpoint
))
})?;
if peer_metadata.status == PeerStatus::Pending {
return Ok(());
}
match connector
.remove_connection(&peer_metadata.active_endpoint, &peer_metadata.connection_id)
{
Ok(Some(_)) => {
debug!(
"Peer {} has been removed and connection {} has been closed",
peer_token_pair, peer_metadata.active_endpoint
);
Ok(())
}
Ok(None) => Err(PeerRefRemoveError::Remove(format!(
"The connection for peer {}'s active endpoint ({}) has already been removed",
peer_token_pair, peer_metadata.active_endpoint
))),
Err(err) => Err(PeerRefRemoveError::Remove(format!("{}", err))),
}
} else {
Ok(())
}
}
#[allow(clippy::too_many_arguments)]
fn handle_notifications(
notification: ConnectionManagerNotification,
unreferenced_peers: &mut UnreferencedPeerState,
peers: &mut PeerMap,
connector: Connector,
subscribers: &mut SubscriberMap,
max_retry_attempts: u64,
ref_map: &mut RefMap<PeerTokenPair>,
retry_frequency: u64,
) {
match notification {
ConnectionManagerNotification::Disconnected {
endpoint,
identity,
connection_id,
} => handle_disconnection(
endpoint,
PeerAuthorizationToken::from(identity),
connection_id,
unreferenced_peers,
peers,
connector,
subscribers,
),
ConnectionManagerNotification::NonFatalConnectionError {
endpoint,
attempts,
connection_id,
..
} => {
if let Some(mut peer_metadata) = peers.get_by_connection_id(&connection_id).cloned() {
info!(
"{} reconnection attempts have been made to peer {}",
attempts, peer_metadata.id
);
if attempts >= max_retry_attempts {
if endpoint != peer_metadata.active_endpoint {
warn!(
"Received non fatal connection notification for peer {} with \
different endpoint {}",
peer_metadata.id, endpoint
);
return;
};
info!(
"Attempting to find available endpoint for {}",
peer_metadata.id
);
for endpoint in peer_metadata.endpoints.iter() {
if endpoint == &peer_metadata.active_endpoint {
continue;
}
match connector.request_connection(
endpoint,
&peer_metadata.connection_id,
Some(peer_metadata.id.clone().into()),
Some(peer_metadata.required_local_auth.clone().into()),
) {
Ok(()) => break,
Err(err) => {
log_connect_request_err(err, &peer_metadata.id, endpoint);
}
}
}
}
peer_metadata.status = PeerStatus::Disconnected {
retry_attempts: attempts,
};
if let Err(err) = peers.update_peer(peer_metadata) {
error!("Unable to update peer: {}", err);
}
}
}
ConnectionManagerNotification::InboundConnection {
endpoint,
connection_id,
identity,
local_identity,
} => handle_inbound_connection(
endpoint,
PeerAuthorizationToken::from(identity),
connection_id,
PeerAuthorizationToken::from(local_identity),
unreferenced_peers,
peers,
connector,
subscribers,
retry_frequency,
),
ConnectionManagerNotification::Connected {
endpoint,
identity,
local_identity,
connection_id,
} => handle_connected(
endpoint,
PeerAuthorizationToken::from(identity),
connection_id,
PeerAuthorizationToken::from(local_identity),
unreferenced_peers,
peers,
connector,
subscribers,
ref_map,
retry_frequency,
),
ConnectionManagerNotification::FatalConnectionError {
connection_id,
error,
..
} => handle_fatal_connection(
connection_id,
error.to_string(),
peers,
subscribers,
max_retry_attempts,
),
}
}
fn handle_disconnection(
endpoint: String,
identity: PeerAuthorizationToken,
connection_id: String,
unreferenced_peers: &mut UnreferencedPeerState,
peers: &mut PeerMap,
connector: Connector,
subscribers: &mut SubscriberMap,
) {
if let Some(mut peer_metadata) = peers.get_by_connection_id(&connection_id).cloned() {
if endpoint != peer_metadata.active_endpoint {
warn!(
"Received disconnection notification for peer {} with \
different endpoint {}",
peer_metadata.id, endpoint
);
return;
}
let notification = PeerManagerNotification::Disconnected {
peer: PeerTokenPair::new(
peer_metadata.id.clone(),
peer_metadata.required_local_auth.clone(),
),
};
info!("Peer {} is currently disconnected", peer_metadata.id);
if peer_metadata.endpoints.contains(&endpoint) {
peer_metadata.status = PeerStatus::Disconnected { retry_attempts: 1 };
if let Err(err) = peers.update_peer(peer_metadata) {
error!("Unable to update peer: {}", err);
}
} else {
if let Err(err) = connector
.remove_connection(&peer_metadata.active_endpoint, &peer_metadata.connection_id)
{
error!("Unable to clean up old connection: {}", err);
}
info!("Attempting to find available endpoint for {}", identity);
for endpoint in peer_metadata.endpoints.iter() {
match connector.request_connection(
endpoint,
&peer_metadata.connection_id,
Some(identity.clone().into()),
Some(peer_metadata.required_local_auth.clone().into()),
) {
Ok(()) => break,
Err(err) => {
log_connect_request_err(err, &peer_metadata.id, endpoint);
}
}
}
peer_metadata.status = PeerStatus::Pending;
if let Err(err) = peers.update_peer(peer_metadata) {
error!("Unable to update peer: {}", err);
}
}
subscribers.broadcast(notification);
} else {
debug!("Removing disconnected peer: {}", identity);
let unreferenced_peer = unreferenced_peers.get_by_connection_id(&connection_id);
if let Some((id, unref_peer)) = unreferenced_peer {
unreferenced_peers.peers.remove(&id);
if let Err(err) =
connector.remove_connection(&unref_peer.endpoint, &unref_peer.connection_id)
{
error!("Unable to clean up old connection: {}", err);
}
}
}
}
#[allow(clippy::too_many_arguments)]
fn handle_inbound_connection(
endpoint: String,
identity: PeerAuthorizationToken,
connection_id: String,
local_authorization: PeerAuthorizationToken,
unreferenced_peers: &mut UnreferencedPeerState,
peers: &mut PeerMap,
connector: Connector,
subscribers: &mut SubscriberMap,
retry_frequency: u64,
) {
info!(
"Received peer connection from {} (remote endpoint: {})",
identity, endpoint
);
let peer_token_pair = PeerTokenPair::new(identity.clone(), local_authorization.clone());
if let Some(mut peer_metadata) = peers.get_by_peer_id(&peer_token_pair).cloned() {
match peer_metadata.status {
PeerStatus::Disconnected { .. } => {
info!(
"Adding inbound connection to Disconnected peer: {}",
peer_metadata.id
);
}
PeerStatus::Pending => {
info!(
"Adding inbound connection to Pending peer: {} ({})",
identity, connection_id
);
}
PeerStatus::Connected => {
if peer_metadata.required_local_auth > identity {
debug!(
"Removing inbound connection, already connected to {} ({})",
peer_metadata.id, connection_id
);
if let Err(err) = connector.remove_connection(&endpoint, &connection_id) {
error!("Unable to clean up old connection: {}", err);
}
return;
} else {
info!(
"Replacing existing connection with inbound for peer {} ({})",
peer_metadata.id, connection_id
);
}
}
}
let old_endpoint = peer_metadata.active_endpoint;
let old_connection_id = peer_metadata.connection_id;
let starting_status = peer_metadata.status;
peer_metadata.status = PeerStatus::Connected;
peer_metadata.connection_id = connection_id.clone();
peer_metadata.retry_frequency = retry_frequency;
peer_metadata.last_connection_attempt = Instant::now();
let notification = PeerManagerNotification::Connected {
peer: peer_token_pair.clone(),
};
peer_metadata.active_endpoint = endpoint;
if let Err(err) = peers.update_peer(peer_metadata) {
error!("Unable to update peer: {}", err);
}
subscribers.broadcast(notification);
if connection_id != old_connection_id && starting_status != PeerStatus::Pending {
if let Err(err) = connector.remove_connection(&old_endpoint, &old_connection_id) {
warn!("Unable to clean up old connection: {}", err);
}
}
} else if let Some(unreferenced_peer) = unreferenced_peers.peers.get_mut(&peer_token_pair) {
if unreferenced_peer.local_authorization > identity {
debug!(
"Removing inbound connection, already connected to unreferenced peer {} ({})",
peer_token_pair, connection_id
);
if let Err(err) = connector.remove_connection(&endpoint, &connection_id) {
error!("Unable to clean up old connection: {}", err);
}
} else {
info!(
"Replacing existing connection with inbound for unreferenced peer {} ({})",
peer_token_pair, connection_id
);
debug!(
"Removing old peer connection for unreferenced peer {}: {}",
peer_token_pair, unreferenced_peer.connection_id
);
if let Err(err) = connector.remove_connection(
&unreferenced_peer.endpoint,
&unreferenced_peer.connection_id,
) {
error!("Unable to clean up old connection: {}", err);
}
let mut old_connection_ids = unreferenced_peer.old_connection_ids.to_vec();
old_connection_ids.push(unreferenced_peer.connection_id.to_string());
*unreferenced_peer = UnreferencedPeer {
connection_id,
endpoint,
local_authorization,
old_connection_ids,
};
}
} else {
debug!(
"Add inbound unreferenced peer for {} ({})",
peer_token_pair, connection_id
);
unreferenced_peers.peers.insert(
peer_token_pair,
UnreferencedPeer {
connection_id,
endpoint,
local_authorization,
old_connection_ids: vec![],
},
);
}
}
#[allow(clippy::too_many_arguments, clippy::cognitive_complexity)]
fn handle_connected(
endpoint: String,
identity: PeerAuthorizationToken,
connection_id: String,
local_authorization: PeerAuthorizationToken,
unreferenced_peers: &mut UnreferencedPeerState,
peers: &mut PeerMap,
connector: Connector,
subscribers: &mut SubscriberMap,
ref_map: &mut RefMap<PeerTokenPair>,
retry_frequency: u64,
) {
let peer_token_pair = PeerTokenPair::new(identity.clone(), local_authorization.clone());
if let Some(mut peer_metadata) = peers.get_by_peer_id(&peer_token_pair).cloned() {
match peer_metadata.status {
PeerStatus::Pending => {
info!(
"Pending peer {} connected via {}",
peer_metadata.id, endpoint
);
}
PeerStatus::Disconnected { .. } => {
info!(
"Disconnected peer {} connected via {}",
peer_metadata.id, endpoint
);
}
PeerStatus::Connected => {
if peer_metadata.required_local_auth < identity {
info!(
"Removing outbound connection, peer {} is already connected ({})",
peer_metadata.id, connection_id
);
if endpoint != peer_metadata.active_endpoint {
if let Err(err) = connector.remove_connection(&endpoint, &connection_id) {
error!("Unable to clean up old connection: {}", err);
}
}
return;
} else {
info!(
"Replacing existing connection with outbound for peer {} connected via \
{} ({})",
peer_metadata.id, endpoint, connection_id
);
}
}
}
let notification = PeerManagerNotification::Connected {
peer: peer_token_pair.clone(),
};
let starting_status = peer_metadata.status;
let old_endpoint = peer_metadata.active_endpoint;
let old_connection_id = peer_metadata.connection_id;
peer_metadata.active_endpoint = endpoint;
peer_metadata.status = PeerStatus::Connected;
peer_metadata.connection_id = connection_id.clone();
peer_metadata.retry_frequency = retry_frequency;
peer_metadata.last_connection_attempt = Instant::now();
if let Err(err) = peers.update_peer(peer_metadata) {
error!("Unable to update peer: {}", err);
}
if connection_id != old_connection_id && starting_status != PeerStatus::Pending {
if let Err(err) = connector.remove_connection(&old_endpoint, &old_connection_id) {
error!("Unable to clean up old connection: {}", err);
}
}
subscribers.broadcast(notification);
} else {
if let Some(requested_endpoint) = unreferenced_peers.requested_endpoints.get(&endpoint) {
let mut new_peer_endpoint = endpoint.to_string();
let mut new_peer_connection_id = connection_id.clone();
let mut old_connection_ids = vec![];
if let Some(unreferenced_peer) = unreferenced_peers.peers.remove(&peer_token_pair) {
if unreferenced_peer.local_authorization < identity {
info!(
"Removing outbound connection, peer {} is already connected via \
unreferenced ({})",
peer_token_pair, connection_id
);
new_peer_endpoint = unreferenced_peer.endpoint.to_string();
new_peer_connection_id = unreferenced_peer.connection_id.to_string();
old_connection_ids = unreferenced_peer.old_connection_ids.clone();
old_connection_ids.push(connection_id.clone());
if endpoint != unreferenced_peer.endpoint {
if let Err(err) = connector.remove_connection(&endpoint, &connection_id) {
error!("Unable to clean up old connection: {}", err);
}
}
} else {
info!(
"Replacing existing unreferenced connection with outbound for peer {} \
connected via {} ({})",
peer_token_pair, endpoint, connection_id
);
old_connection_ids.push(unreferenced_peer.connection_id.to_string());
if let Err(err) = connector.remove_connection(
&unreferenced_peer.endpoint,
&unreferenced_peer.connection_id,
) {
error!("Unable to clean up old connection: {}", err);
}
}
}
debug!(
"Adding peer {} by endpoint {} ({})",
peer_token_pair, endpoint, connection_id
);
ref_map.add_ref(peer_token_pair.clone());
peers.insert(
identity,
new_peer_connection_id,
vec![endpoint.to_string()],
new_peer_endpoint,
PeerStatus::Connected,
requested_endpoint.local_authorization.clone(),
old_connection_ids,
);
let notification = PeerManagerNotification::Connected {
peer: peer_token_pair.clone(),
};
subscribers.broadcast(notification);
return;
}
if let Some(unreferenced_peer) = unreferenced_peers.peers.get_mut(&peer_token_pair) {
if unreferenced_peer.local_authorization < identity {
debug!(
"Removing outbound connection, already connected to unreferenced peer {} ({})",
peer_token_pair, connection_id
);
if let Err(err) = connector.remove_connection(&endpoint, &connection_id) {
error!("Unable to clean up old connection: {}", err);
}
} else {
info!(
"Replacing existing connection with outbound for unreferenced peer {} ({})",
peer_token_pair, connection_id
);
debug!(
"Removing old peer connection for unreferenced peer {}: {}",
peer_token_pair, unreferenced_peer.connection_id
);
if let Err(err) = connector.remove_connection(
&unreferenced_peer.endpoint,
&unreferenced_peer.connection_id,
) {
error!("Unable to clean up old connection: {}", err);
}
let mut old_connection_ids = unreferenced_peer.old_connection_ids.to_vec();
old_connection_ids.push(unreferenced_peer.connection_id.to_string());
*unreferenced_peer = UnreferencedPeer {
connection_id,
endpoint,
local_authorization,
old_connection_ids,
};
}
} else {
debug!(
"Adding outbound unreferenced peer {} by endpoint {} ({})",
peer_token_pair, endpoint, connection_id
);
unreferenced_peers.peers.insert(
peer_token_pair,
UnreferencedPeer {
connection_id,
endpoint,
local_authorization,
old_connection_ids: vec![],
},
);
}
}
}
fn handle_fatal_connection(
connection_id: String,
error: String,
peers: &mut PeerMap,
subscribers: &mut SubscriberMap,
max_retry_frequency: u64,
) {
if let Some(mut peer_metadata) = peers.get_by_connection_id(&connection_id).cloned() {
warn!(
"Peer {} encountered a fatal connection error: {}",
peer_metadata.id, error
);
let notification = PeerManagerNotification::Disconnected {
peer: PeerTokenPair::new(
peer_metadata.id.clone(),
peer_metadata.required_local_auth.clone(),
),
};
peer_metadata.retry_frequency = min(peer_metadata.retry_frequency * 2, max_retry_frequency);
peer_metadata.last_connection_attempt = Instant::now();
peer_metadata.status = PeerStatus::Pending;
if let Err(err) = peers.update_peer(peer_metadata) {
error!("Unable to update peer: {}", err);
}
subscribers.broadcast(notification);
}
}
fn retry_pending(
peers: &mut PeerMap,
connector: Connector,
unreferenced_peers: &mut UnreferencedPeerState,
max_retry_frequency: u64,
) {
let mut to_retry = Vec::new();
for (_, peer) in peers.get_pending() {
if peer.last_connection_attempt.elapsed().as_secs() > peer.retry_frequency {
to_retry.push(peer.clone());
}
}
for mut peer_metadata in to_retry {
debug!("Attempting to peer with pending peer {}", peer_metadata.id);
for endpoint in peer_metadata.endpoints.iter() {
match connector.request_connection(
endpoint,
&peer_metadata.connection_id,
Some(peer_metadata.id.clone().into()),
Some(peer_metadata.required_local_auth.clone().into()),
) {
Ok(()) => peer_metadata.active_endpoint = endpoint.to_string(),
Err(err) => {
log_connect_request_err(err, &peer_metadata.id, endpoint);
}
}
}
peer_metadata.retry_frequency = min(peer_metadata.retry_frequency * 2, max_retry_frequency);
peer_metadata.last_connection_attempt = Instant::now();
if let Err(err) = peers.update_peer(peer_metadata) {
error!("Unable to update peer: {}", err);
}
}
if unreferenced_peers
.last_connection_attempt
.elapsed()
.as_secs()
> unreferenced_peers.retry_frequency
{
for (endpoint, requested_endpoint) in unreferenced_peers.requested_endpoints.iter() {
if peers.contains_endpoint(&requested_endpoint.endpoint) {
continue;
}
info!("Attempting to peer with peer by {}", endpoint);
let connection_id = format!("{}", Uuid::new_v4());
match connector.request_connection(
endpoint,
&connection_id,
None,
Some(requested_endpoint.local_authorization.clone().into()),
) {
Ok(()) => (),
Err(err) => match err {
ConnectionManagerError::ConnectionCreationError {
context,
error_kind: None,
} => {
info!(
"Unable to request connection for peer endpoint {}: {}",
endpoint, context
);
}
ConnectionManagerError::ConnectionCreationError {
context,
error_kind: Some(err_kind),
} => match err_kind {
ErrorKind::ConnectionRefused => info!(
"Received connection refused while attempting to establish a \
connection to peer at endpoint {}",
endpoint
),
_ => info!(
"Unable to request connection for peer at {}: {}",
endpoint, context,
),
},
_ => info!(
"Unable to request connection for peer at endpoint {}: {}",
endpoint,
err.to_string()
),
},
}
}
unreferenced_peers.last_connection_attempt = Instant::now();
}
}
fn log_connect_request_err(
err: ConnectionManagerError,
peer_id: &PeerAuthorizationToken,
endpoint: &str,
) {
match err {
ConnectionManagerError::ConnectionCreationError {
context,
error_kind: None,
} => {
info!(
"Unable to request connection for peer {}: {}",
peer_id, context
);
}
ConnectionManagerError::ConnectionCreationError {
context,
error_kind: Some(err_kind),
} => match err_kind {
ErrorKind::ConnectionRefused => info!(
"Received connection refused while attempting to establish a \
connection to peer {}: endpoint {}",
peer_id, endpoint
),
_ => info!(
"Unable to request connection for peer {}: {}",
peer_id, context
),
},
_ => info!(
"Unable to request connection for peer {}: {}",
peer_id,
err.to_string()
),
}
}
fn check_for_duplicate_endpoint(
peer_id: &PeerAuthorizationToken,
endpoints: &[String],
peer_map: &PeerMap,
) -> bool {
if matches!(peer_id, PeerAuthorizationToken::Challenge { .. }) {
return false;
}
for endpoint in endpoints {
if let Some(peers) = peer_map.get_peer_from_endpoint(endpoint) {
for peer_meta in peers {
if matches!(peer_meta.id, PeerAuthorizationToken::Trust { .. })
&& &peer_meta.id != peer_id
{
return true;
}
}
}
}
false
}
#[cfg(test)]
pub mod tests {
use super::*;
use std::collections::VecDeque;
use std::sync::mpsc;
use std::time::Duration;
use protobuf::Message;
use crate::mesh::Mesh;
use crate::network::auth::ConnectionAuthorizationType;
use crate::network::connection_manager::{
AuthorizationResult, Authorizer, AuthorizerError, ConnectionManager,
};
use crate::protos::network::{NetworkMessage, NetworkMessageType};
use crate::threading::lifecycle::ShutdownHandle;
use crate::transport::inproc::InprocTransport;
use crate::transport::raw::RawTransport;
use crate::transport::{Connection, Transport};
#[test]
fn test_peer_manager_add_peer() {
let mut transport = Box::new(InprocTransport::default());
let mut listener = transport.listen("inproc://test").unwrap();
thread::spawn(move || {
listener.accept().unwrap();
});
let mut mesh = Mesh::new(512, 128);
let mut cm = ConnectionManager::builder()
.with_authorizer(Box::new(NoopAuthorizer::new("test_peer")))
.with_matrix_life_cycle(mesh.get_life_cycle())
.with_matrix_sender(mesh.get_sender())
.with_transport(transport.clone())
.start()
.expect("Unable to start Connection Manager");
let connector = cm.connector();
let mut peer_manager = PeerManager::builder()
.with_connector(connector)
.with_retry_interval(1)
.with_identity("my_id".to_string())
.with_strict_ref_counts(true)
.start()
.expect("Cannot start peer_manager");
let peer_connector = peer_manager.connector();
let (tx, notification_rx): (
Sender<PeerManagerNotification>,
mpsc::Receiver<PeerManagerNotification>,
) = channel();
peer_connector
.subscribe_sender(tx)
.expect("Unable to get subscriber");
let peer_ref = peer_connector
.add_peer_ref(
PeerAuthorizationToken::from_peer_id("test_peer"),
vec!["inproc://test".to_string()],
PeerAuthorizationToken::from_peer_id("my_id"),
)
.expect("Unable to add peer");
assert_eq!(
peer_ref.peer_id(),
&PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
);
let timeout = Duration::from_secs(60);
let notification = notification_rx
.recv_timeout(timeout)
.expect("Unable to get new notifications");
assert!(
notification
== PeerManagerNotification::Connected {
peer: PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
}
);
peer_manager.signal_shutdown();
cm.signal_shutdown();
peer_manager
.wait_for_shutdown()
.expect("Unable to shutdown peer manager");
cm.wait_for_shutdown()
.expect("Unable to shutdown connection manager");
mesh.signal_shutdown();
mesh.wait_for_shutdown().expect("Unable to shutdown mesh");
}
#[test]
fn test_peer_manager_add_peer_duplicate_endpoint() {
let mut transport = Box::new(InprocTransport::default());
let mut listener = transport.listen("inproc://test").unwrap();
thread::spawn(move || {
listener.accept().unwrap();
});
let mut mesh = Mesh::new(512, 128);
let mut cm = ConnectionManager::builder()
.with_authorizer(Box::new(NoopAuthorizer::new("test_peer")))
.with_matrix_life_cycle(mesh.get_life_cycle())
.with_matrix_sender(mesh.get_sender())
.with_transport(transport.clone())
.start()
.expect("Unable to start Connection Manager");
let connector = cm.connector();
let mut peer_manager = PeerManager::builder()
.with_connector(connector)
.with_retry_interval(1)
.with_identity("my_id".to_string())
.with_strict_ref_counts(true)
.start()
.expect("Cannot start peer_manager");
let peer_connector = peer_manager.connector();
let (tx, notification_rx): (
Sender<PeerManagerNotification>,
mpsc::Receiver<PeerManagerNotification>,
) = channel();
peer_connector
.subscribe_sender(tx)
.expect("Unable to get subscriber");
let peer_ref = peer_connector
.add_peer_ref(
PeerAuthorizationToken::from_peer_id("test_peer"),
vec!["inproc://test".to_string()],
PeerAuthorizationToken::from_peer_id("my_id"),
)
.expect("Unable to add peer");
assert_eq!(
peer_ref.peer_id(),
&PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
);
let timeout = Duration::from_secs(60);
let notification = notification_rx
.recv_timeout(timeout)
.expect("Unable to get new notifications");
assert!(
notification
== PeerManagerNotification::Connected {
peer: PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
}
);
if peer_connector
.add_peer_ref(
PeerAuthorizationToken::from_peer_id("different_peer"),
vec!["inproc://test".to_string()],
PeerAuthorizationToken::from_peer_id("my_id"),
)
.is_ok()
{
panic!(
"Should not have been able to add a different trust peer with duplicate \
endpoint"
)
}
peer_manager.signal_shutdown();
cm.signal_shutdown();
peer_manager
.wait_for_shutdown()
.expect("Unable to shutdown peer manager");
cm.wait_for_shutdown()
.expect("Unable to shutdown connection manager");
mesh.signal_shutdown();
mesh.wait_for_shutdown().expect("Unable to shutdown mesh");
}
#[test]
fn test_peer_manager_add_peer_endpoints() {
let mut transport = Box::new(InprocTransport::default());
let mut listener = transport.listen("inproc://test").unwrap();
thread::spawn(move || {
listener.accept().unwrap();
});
let mut mesh = Mesh::new(512, 128);
let mut cm = ConnectionManager::builder()
.with_authorizer(Box::new(NoopAuthorizer::new("test_peer")))
.with_matrix_life_cycle(mesh.get_life_cycle())
.with_matrix_sender(mesh.get_sender())
.with_transport(transport.clone())
.start()
.expect("Unable to start Connection Manager");
let connector = cm.connector();
let mut peer_manager = PeerManager::builder()
.with_connector(connector)
.with_retry_interval(1)
.with_identity("my_id".to_string())
.with_strict_ref_counts(true)
.start()
.expect("Cannot start peer_manager");
let peer_connector = peer_manager.connector();
let (tx, notification_rx): (
Sender<PeerManagerNotification>,
mpsc::Receiver<PeerManagerNotification>,
) = channel();
peer_connector
.subscribe_sender(tx)
.expect("Unable to get subscriber");
let peer_ref = peer_connector
.add_peer_ref(
PeerAuthorizationToken::from_peer_id("test_peer"),
vec![
"inproc://bad_endpoint".to_string(),
"inproc://test".to_string(),
],
PeerAuthorizationToken::from_peer_id("my_id"),
)
.expect("Unable to add peer");
assert_eq!(
peer_ref.peer_id(),
&PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
);
let timeout = Duration::from_secs(60);
let notification = notification_rx
.recv_timeout(timeout)
.expect("Unable to get new notifications");
assert!(
notification
== PeerManagerNotification::Connected {
peer: PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
}
);
peer_manager.signal_shutdown();
cm.signal_shutdown();
peer_manager
.wait_for_shutdown()
.expect("Unable to shutdown peer manager");
cm.wait_for_shutdown()
.expect("Unable to shutdown connection manager");
mesh.signal_shutdown();
mesh.wait_for_shutdown().expect("Unable to shutdown mesh");
}
#[test]
fn test_peer_manager_add_peer_multiple_times() {
let mut transport = Box::new(InprocTransport::default());
let mut listener = transport.listen("inproc://test").unwrap();
thread::spawn(move || {
listener.accept().unwrap();
});
let mut mesh = Mesh::new(512, 128);
let mut cm = ConnectionManager::builder()
.with_authorizer(Box::new(NoopAuthorizer::new("test_peer")))
.with_matrix_life_cycle(mesh.get_life_cycle())
.with_matrix_sender(mesh.get_sender())
.with_transport(transport.clone())
.start()
.expect("Unable to start Connection Manager");
let connector = cm.connector();
let mut peer_manager = PeerManager::builder()
.with_connector(connector)
.with_retry_interval(1)
.with_identity("my_id".to_string())
.with_strict_ref_counts(true)
.start()
.expect("Cannot start peer_manager");
let peer_connector = peer_manager.connector();
let (tx, notification_rx): (
Sender<PeerManagerNotification>,
mpsc::Receiver<PeerManagerNotification>,
) = channel();
peer_connector
.subscribe_sender(tx)
.expect("Unable to get subscriber");
let peer_ref = peer_connector
.add_peer_ref(
PeerAuthorizationToken::from_peer_id("test_peer"),
vec!["inproc://test".to_string()],
PeerAuthorizationToken::from_peer_id("my_id"),
)
.expect("Unable to add peer");
assert_eq!(
peer_ref.peer_id(),
&PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
);
let timeout = Duration::from_secs(60);
let notification = notification_rx
.recv_timeout(timeout)
.expect("Unable to get new notifications");
assert!(
notification
== PeerManagerNotification::Connected {
peer: PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
}
);
let peer_ref = peer_connector
.add_peer_ref(
PeerAuthorizationToken::from_peer_id("test_peer"),
vec!["inproc://test".to_string()],
PeerAuthorizationToken::from_peer_id("my_id"),
)
.expect("Unable to add peer");
assert_eq!(
peer_ref.peer_id(),
&PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
);
peer_manager.signal_shutdown();
cm.signal_shutdown();
peer_manager
.wait_for_shutdown()
.expect("Unable to shutdown peer manager");
cm.wait_for_shutdown()
.expect("Unable to shutdown connection manager");
mesh.signal_shutdown();
mesh.wait_for_shutdown().expect("Unable to shutdown mesh");
}
#[test]
fn test_peer_manager_list_peer() {
let mut transport = Box::new(InprocTransport::default());
let mut listener = transport.listen("inproc://test").unwrap();
thread::spawn(move || {
listener.accept().unwrap();
});
let mut listener = transport.listen("inproc://test_2").unwrap();
thread::spawn(move || {
listener.accept().unwrap();
});
let mut mesh = Mesh::new(512, 128);
let mut cm = ConnectionManager::builder()
.with_authorizer(Box::new(NoopAuthorizer::new_multiple(&[
"test_peer",
"next_peer",
])))
.with_matrix_life_cycle(mesh.get_life_cycle())
.with_matrix_sender(mesh.get_sender())
.with_transport(transport.clone())
.start()
.expect("Unable to start Connection Manager");
let connector = cm.connector();
let mut peer_manager = PeerManager::builder()
.with_connector(connector)
.with_retry_interval(1)
.with_identity("my_id".to_string())
.with_strict_ref_counts(true)
.start()
.expect("Cannot start peer_manager");
let peer_connector = peer_manager.connector();
let (tx, notification_rx): (
Sender<PeerManagerNotification>,
mpsc::Receiver<PeerManagerNotification>,
) = channel();
peer_connector
.subscribe_sender(tx)
.expect("Unable to get subscriber");
let peer_ref_1 = peer_connector
.add_peer_ref(
PeerAuthorizationToken::from_peer_id("test_peer"),
vec!["inproc://test".to_string()],
PeerAuthorizationToken::from_peer_id("my_id"),
)
.expect("Unable to add peer");
assert_eq!(
peer_ref_1.peer_id(),
&PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
);
let timeout = Duration::from_secs(60);
let notification = notification_rx
.recv_timeout(timeout)
.expect("Unable to get new notifications");
assert!(
notification
== PeerManagerNotification::Connected {
peer: PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
}
);
let peer_ref_2 = peer_connector
.add_peer_ref(
PeerAuthorizationToken::from_peer_id("next_peer"),
vec!["inproc://test_2".to_string()],
PeerAuthorizationToken::from_peer_id("my_id"),
)
.expect("Unable to add peer");
assert_eq!(
peer_ref_2.peer_id(),
&PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("next_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
);
let notification = notification_rx
.recv_timeout(timeout)
.expect("Unable to get new notifications");
assert!(
notification
== PeerManagerNotification::Connected {
peer: PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("next_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
}
);
let mut peer_list = peer_connector
.list_peers()
.expect("Unable to get peer list");
peer_list.sort();
assert_eq!(
peer_list,
vec![
PeerAuthorizationToken::from_peer_id("next_peer"),
PeerAuthorizationToken::from_peer_id("test_peer")
]
);
peer_manager.signal_shutdown();
cm.signal_shutdown();
peer_manager
.wait_for_shutdown()
.expect("Unable to shutdown peer manager");
cm.wait_for_shutdown()
.expect("Unable to shutdown connection manager");
mesh.signal_shutdown();
mesh.wait_for_shutdown().expect("Unable to shutdown mesh");
}
#[test]
fn test_peer_manager_connection_ids() {
let mut transport = Box::new(InprocTransport::default());
let mut listener = transport.listen("inproc://test").unwrap();
thread::spawn(move || {
listener.accept().unwrap();
});
let mut listener = transport.listen("inproc://test_2").unwrap();
thread::spawn(move || {
listener.accept().unwrap();
});
let mut mesh = Mesh::new(512, 128);
let mut cm = ConnectionManager::builder()
.with_authorizer(Box::new(NoopAuthorizer::new_multiple(&[
"test_peer",
"next_peer",
])))
.with_matrix_life_cycle(mesh.get_life_cycle())
.with_matrix_sender(mesh.get_sender())
.with_transport(transport.clone())
.start()
.expect("Unable to start Connection Manager");
let connector = cm.connector();
let mut peer_manager = PeerManager::builder()
.with_connector(connector)
.with_retry_interval(1)
.with_identity("my_id".to_string())
.with_strict_ref_counts(true)
.start()
.expect("Cannot start peer_manager");
let peer_connector = peer_manager.connector();
let (tx, notification_rx): (
Sender<PeerManagerNotification>,
mpsc::Receiver<PeerManagerNotification>,
) = channel();
peer_connector
.subscribe_sender(tx)
.expect("Unable to get subscriber");
let peer_ref_1 = peer_connector
.add_peer_ref(
PeerAuthorizationToken::from_peer_id("test_peer"),
vec!["inproc://test".to_string()],
PeerAuthorizationToken::from_peer_id("my_id"),
)
.expect("Unable to add peer");
assert_eq!(
peer_ref_1.peer_id(),
&PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
);
let timeout = Duration::from_secs(60);
let notification = notification_rx
.recv_timeout(timeout)
.expect("Unable to get new notifications");
assert!(
notification
== PeerManagerNotification::Connected {
peer: PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
}
);
let peer_ref_2 = peer_connector
.add_peer_ref(
PeerAuthorizationToken::from_peer_id("next_peer"),
vec!["inproc://test_2".to_string()],
PeerAuthorizationToken::from_peer_id("my_id"),
)
.expect("Unable to add peer");
assert_eq!(
peer_ref_2.peer_id(),
&PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("next_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
);
let notification = notification_rx
.recv_timeout(timeout)
.expect("Unable to get new notifications");
assert!(
notification
== PeerManagerNotification::Connected {
peer: PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("next_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
}
);
let peers = peer_connector
.connection_ids()
.expect("Unable to get peer list");
assert!(peers
.get_by_key(&PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("next_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
))
.is_some());
assert!(peers
.get_by_key(&PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
))
.is_some());
peer_manager.signal_shutdown();
cm.signal_shutdown();
peer_manager
.wait_for_shutdown()
.expect("Unable to shutdown peer manager");
cm.wait_for_shutdown()
.expect("Unable to shutdown connection manager");
mesh.signal_shutdown();
mesh.wait_for_shutdown().expect("Unable to shutdown mesh");
}
#[test]
fn test_peer_manager_drop_peer_ref() {
let mut transport = Box::new(InprocTransport::default());
let mut listener = transport.listen("inproc://test").unwrap();
thread::spawn(move || {
listener.accept().unwrap();
});
let mut mesh = Mesh::new(512, 128);
let mut cm = ConnectionManager::builder()
.with_authorizer(Box::new(NoopAuthorizer::new("test_peer")))
.with_matrix_life_cycle(mesh.get_life_cycle())
.with_matrix_sender(mesh.get_sender())
.with_transport(transport.clone())
.start()
.expect("Unable to start Connection Manager");
let connector = cm.connector();
let mut peer_manager = PeerManager::builder()
.with_connector(connector)
.with_retry_interval(1)
.with_identity("my_id".to_string())
.with_strict_ref_counts(true)
.start()
.expect("Cannot start peer_manager");
let peer_connector = peer_manager.connector();
{
let (tx, notification_rx): (
Sender<PeerManagerNotification>,
mpsc::Receiver<PeerManagerNotification>,
) = channel();
peer_connector
.subscribe_sender(tx)
.expect("Unable to get subscriber");
let peer_ref = peer_connector
.add_peer_ref(
PeerAuthorizationToken::from_peer_id("test_peer"),
vec!["inproc://test".to_string()],
PeerAuthorizationToken::from_peer_id("my_id"),
)
.expect("Unable to add peer");
assert_eq!(
peer_ref.peer_id(),
&PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
);
let timeout = Duration::from_secs(60);
let notification = notification_rx
.recv_timeout(timeout)
.expect("Unable to get new notifications");
assert!(
notification
== PeerManagerNotification::Connected {
peer: PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
}
);
let peer_list = peer_connector
.list_peers()
.expect("Unable to get peer list");
assert_eq!(
peer_list,
vec![PeerAuthorizationToken::from_peer_id("test_peer")]
);
}
let peer_list = peer_connector
.list_peers()
.expect("Unable to get peer list");
assert_eq!(peer_list, Vec::<PeerAuthorizationToken>::new());
peer_manager.signal_shutdown();
cm.signal_shutdown();
peer_manager
.wait_for_shutdown()
.expect("Unable to shutdown peer manager");
cm.wait_for_shutdown()
.expect("Unable to shutdown connection manager");
mesh.signal_shutdown();
mesh.wait_for_shutdown().expect("Unable to shutdown mesh");
}
#[test]
fn test_peer_manager_drop_endpoint_peer_ref() {
let mut transport = Box::new(InprocTransport::default());
let mut listener = transport.listen("inproc://test").unwrap();
thread::spawn(move || {
listener.accept().unwrap();
});
let mut mesh = Mesh::new(512, 128);
let mut cm = ConnectionManager::builder()
.with_authorizer(Box::new(NoopAuthorizer::new("test_peer")))
.with_matrix_life_cycle(mesh.get_life_cycle())
.with_matrix_sender(mesh.get_sender())
.with_transport(transport.clone())
.start()
.expect("Unable to start Connection Manager");
let connector = cm.connector();
let mut peer_manager = PeerManager::builder()
.with_connector(connector)
.with_retry_interval(1)
.with_identity("my_id".to_string())
.with_strict_ref_counts(true)
.start()
.expect("Cannot start peer_manager");
let peer_connector = peer_manager.connector();
{
let (tx, notification_rx): (
Sender<PeerManagerNotification>,
mpsc::Receiver<PeerManagerNotification>,
) = channel();
peer_connector
.subscribe_sender(tx)
.expect("Unable to get subscriber");
let endpoint_peer_ref = peer_connector
.add_unidentified_peer(
"inproc://test".to_string(),
PeerAuthorizationToken::from_peer_id("my_id"),
)
.expect("Unable to add peer by endpoint");
assert_eq!(endpoint_peer_ref.endpoint(), "inproc://test".to_string());
let timeout = Duration::from_secs(60);
let notification = notification_rx
.recv_timeout(timeout)
.expect("Unable to get new notifications");
assert!(
notification
== PeerManagerNotification::Connected {
peer: PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
}
);
let peer_ref = peer_connector
.add_peer_ref(
PeerAuthorizationToken::from_peer_id("test_peer"),
vec!["inproc://test".to_string()],
PeerAuthorizationToken::from_peer_id("my_id"),
)
.expect("Unable to add peer");
assert_eq!(
peer_ref.peer_id(),
&PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
);
let peer_list = peer_connector
.list_peers()
.expect("Unable to get peer list");
assert_eq!(
peer_list,
vec![PeerAuthorizationToken::from_peer_id("test_peer")]
);
drop(peer_ref);
let peer_list = peer_connector
.list_peers()
.expect("Unable to get peer list");
assert_eq!(
peer_list,
vec![PeerAuthorizationToken::from_peer_id("test_peer")]
);
}
let peer_list = peer_connector
.list_peers()
.expect("Unable to get peer list");
assert_eq!(peer_list, Vec::<PeerAuthorizationToken>::new());
peer_manager.signal_shutdown();
cm.signal_shutdown();
peer_manager
.wait_for_shutdown()
.expect("Unable to shutdown peer manager");
cm.wait_for_shutdown()
.expect("Unable to shutdown connection manager");
mesh.signal_shutdown();
mesh.wait_for_shutdown().expect("Unable to shutdown mesh");
}
#[test]
fn test_peer_manager_update_active_endpoint() {
let mut transport = Box::new(RawTransport::default());
let mut listener = transport
.listen("tcp://localhost:0")
.expect("Cannot listen for connections");
let endpoint = listener.endpoint();
let mut mesh1 = Mesh::new(512, 128);
let mut mesh2 = Mesh::new(512, 128);
let mut listener2 = transport
.listen("tcp://localhost:0")
.expect("Cannot listen for connections");
let endpoint2 = listener2.endpoint();
let (tx, rx) = mpsc::channel();
let jh = thread::spawn(move || {
let conn = listener.accept().expect("Cannot accept connection");
mesh2
.add(conn, "test_id".to_string())
.expect("Cannot add connection to mesh");
let envelope = mesh2.recv().expect("Cannot receive message");
let heartbeat: NetworkMessage = Message::parse_from_bytes(&envelope.payload())
.expect("Cannot parse NetworkMessage");
assert_eq!(
heartbeat.get_message_type(),
NetworkMessageType::NETWORK_HEARTBEAT
);
let mut connection = mesh2
.remove("test_id")
.expect("Cannot remove connection from mesh");
connection
.disconnect()
.expect("Connection failed to disconnect");
drop(listener);
let conn = listener2.accept().expect("Unable to accept connection");
mesh2
.add(conn, "test_id".to_string())
.expect("Cannot add connection to mesh");
rx.recv().unwrap();
mesh2.signal_shutdown();
mesh2.wait_for_shutdown().expect("Unable to shutdown mesh");
});
let mut cm = ConnectionManager::builder()
.with_authorizer(Box::new(NoopAuthorizer::new_multiple(&[
"test_peer",
"test_peer",
])))
.with_matrix_life_cycle(mesh1.get_life_cycle())
.with_matrix_sender(mesh1.get_sender())
.with_transport(transport)
.with_heartbeat_interval(1)
.start()
.expect("Unable to start Connection Manager");
let connector = cm.connector();
let mut peer_manager = PeerManager::builder()
.with_connector(connector)
.with_retry_interval(1)
.with_max_retry_attempts(1)
.with_identity("my_id".to_string())
.with_strict_ref_counts(true)
.start()
.expect("Cannot start peer_manager");
let peer_connector = peer_manager.connector();
let (notification_tx, notification_rx): (
Sender<PeerManagerNotification>,
mpsc::Receiver<PeerManagerNotification>,
) = channel();
peer_connector
.subscribe_sender(notification_tx)
.expect("Unable to get subscriber");
let peer_ref = peer_connector
.add_peer_ref(
PeerAuthorizationToken::from_peer_id("test_peer"),
vec![endpoint, endpoint2],
PeerAuthorizationToken::from_peer_id("my_id"),
)
.expect("Unable to add peer");
assert_eq!(
peer_ref.peer_id(),
&PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
);
let timeout = Duration::from_secs(60);
let notification = notification_rx
.recv_timeout(timeout)
.expect("Unable to get new notifications");
assert!(
notification
== PeerManagerNotification::Connected {
peer: PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
}
);
let disconnected_notification = notification_rx
.recv_timeout(timeout)
.expect("Cannot get message from subscriber");
assert!(
disconnected_notification
== PeerManagerNotification::Disconnected {
peer: PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
}
);
let connected_notification = notification_rx
.recv_timeout(timeout)
.expect("Cannot get message from subscriber");
assert!(
connected_notification
== PeerManagerNotification::Connected {
peer: PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
}
);
tx.send(()).unwrap();
jh.join().unwrap();
peer_manager.signal_shutdown();
cm.signal_shutdown();
peer_manager
.wait_for_shutdown()
.expect("Unable to shutdown peer manager");
cm.wait_for_shutdown()
.expect("Unable to shutdown connection manager");
mesh1.signal_shutdown();
mesh1.wait_for_shutdown().expect("Unable to shutdown mesh");
}
#[test]
fn test_peer_manager_shutdown() {
let transport = Box::new(InprocTransport::default());
let mut mesh = Mesh::new(512, 128);
let mut cm = ConnectionManager::builder()
.with_authorizer(Box::new(NoopAuthorizer::new("test_peer")))
.with_matrix_life_cycle(mesh.get_life_cycle())
.with_matrix_sender(mesh.get_sender())
.with_transport(transport.clone())
.start()
.expect("Unable to start Connection Manager");
let connector = cm.connector();
let mut peer_manager = PeerManager::builder()
.with_connector(connector)
.with_retry_interval(1)
.with_identity("my_id".to_string())
.with_strict_ref_counts(true)
.start()
.expect("Cannot start peer_manager");
peer_manager.signal_shutdown();
cm.signal_shutdown();
peer_manager
.wait_for_shutdown()
.expect("Unable to shutdown peer manager");
cm.wait_for_shutdown()
.expect("Unable to shutdown connection manager");
mesh.signal_shutdown();
mesh.wait_for_shutdown().expect("Unable to shutdown mesh");
}
#[test]
fn test_incoming_peer_request() {
let mut transport = InprocTransport::default();
let mut listener = transport.listen("inproc://test").unwrap();
let mut mesh = Mesh::new(512, 128);
let mut cm = ConnectionManager::builder()
.with_authorizer(Box::new(NoopAuthorizer::new("test_peer")))
.with_matrix_life_cycle(mesh.get_life_cycle())
.with_matrix_sender(mesh.get_sender())
.with_transport(Box::new(transport.clone()))
.start()
.expect("Unable to start Connection Manager");
let connector = cm.connector();
let recv_connector = connector.clone();
let jh = thread::spawn(move || {
let connection = listener.accept().unwrap();
let (subs_tx, subs_rx): (mpsc::Sender<ConnectionManagerNotification>, _) =
mpsc::channel();
let _ = recv_connector
.subscribe(subs_tx)
.expect("unable to get subscriber");
recv_connector.add_inbound_connection(connection).unwrap();
subs_rx.recv().expect("unable to get notification");
});
let mut peer_manager = PeerManager::builder()
.with_connector(connector)
.with_retry_interval(1)
.with_identity("my_id".to_string())
.with_strict_ref_counts(true)
.start()
.expect("Cannot start peer_manager");
let peer_connector = peer_manager.connector();
let _conn = transport.connect("inproc://test").unwrap();
jh.join().unwrap();
assert!(peer_connector.list_peers().unwrap().is_empty());
assert_eq!(
vec![PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)],
peer_connector.list_unreferenced_peers().unwrap()
);
let peer_ref = peer_connector
.add_peer_ref(
PeerAuthorizationToken::from_peer_id("test_peer"),
vec!["inproc://test".to_string()],
PeerAuthorizationToken::from_peer_id("my_id"),
)
.expect("Unable to add peer");
assert_eq!(
peer_ref.peer_id(),
&PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
);
let peer_list = peer_connector
.list_peers()
.expect("Unable to get peer list");
assert_eq!(
peer_list,
vec![PeerAuthorizationToken::from_peer_id("test_peer")]
);
peer_manager.signal_shutdown();
cm.signal_shutdown();
peer_manager
.wait_for_shutdown()
.expect("Unable to shutdown peer manager");
cm.wait_for_shutdown()
.expect("Unable to shutdown connection manager");
mesh.signal_shutdown();
mesh.wait_for_shutdown().expect("Unable to shutdown mesh");
}
#[test]
fn test_peer_manager_no_builder() {
let transport = Box::new(InprocTransport::default());
let mut mesh = Mesh::new(512, 128);
let mut cm = ConnectionManager::builder()
.with_authorizer(Box::new(NoopAuthorizer::new("test_peer")))
.with_matrix_life_cycle(mesh.get_life_cycle())
.with_matrix_sender(mesh.get_sender())
.with_transport(transport.clone())
.start()
.expect("Unable to start Connection Manager");
let connector = cm.connector();
#[allow(deprecated)]
let mut peer_manager =
PeerManager::new(connector, Some(1), Some(1), "my_id".to_string(), true);
#[allow(deprecated)]
peer_manager.start().expect("Cannot start peer_manager");
peer_manager.signal_shutdown();
cm.signal_shutdown();
peer_manager
.wait_for_shutdown()
.expect("Unable to shutdown peer manager");
cm.wait_for_shutdown()
.expect("Unable to shutdown connection manager");
mesh.signal_shutdown();
mesh.wait_for_shutdown().expect("Unable to shutdown mesh");
}
#[test]
fn test_peer_manager_notifciation_convert() {
let mut transport = Box::new(InprocTransport::default());
let mut listener = transport.listen("inproc://test").unwrap();
thread::spawn(move || {
listener.accept().unwrap();
});
let mut mesh = Mesh::new(512, 128);
let mut cm = ConnectionManager::builder()
.with_authorizer(Box::new(NoopAuthorizer::new("test_peer")))
.with_matrix_life_cycle(mesh.get_life_cycle())
.with_matrix_sender(mesh.get_sender())
.with_transport(transport.clone())
.start()
.expect("Unable to start Connection Manager");
let connector = cm.connector();
let mut peer_manager = PeerManager::builder()
.with_connector(connector)
.with_retry_interval(1)
.with_identity("my_id".to_string())
.with_strict_ref_counts(true)
.start()
.expect("Cannot start peer_manager");
let peer_connector = peer_manager.connector();
let (tx, notification_rx): (Sender<TestEnum>, mpsc::Receiver<TestEnum>) = channel();
peer_connector
.subscribe_sender(tx)
.expect("Unable to get subscriber");
let peer_ref = peer_connector
.add_peer_ref(
PeerAuthorizationToken::from_peer_id("test_peer"),
vec!["inproc://test".to_string()],
PeerAuthorizationToken::from_peer_id("my_id"),
)
.expect("Unable to add peer");
assert_eq!(
peer_ref.peer_id(),
&PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
);
let timeout = Duration::from_secs(60);
let notification = notification_rx
.recv_timeout(timeout)
.expect("Unable to get new notifications");
assert!(
notification
== TestEnum::Notification(PeerManagerNotification::Connected {
peer: PeerTokenPair::new(
PeerAuthorizationToken::from_peer_id("test_peer"),
PeerAuthorizationToken::from_peer_id("my_id"),
)
})
);
peer_manager.signal_shutdown();
cm.signal_shutdown();
peer_manager
.wait_for_shutdown()
.expect("Unable to shutdown peer manager");
cm.wait_for_shutdown()
.expect("Unable to shutdown connection manager");
mesh.signal_shutdown();
mesh.wait_for_shutdown().expect("Unable to shutdown mesh");
}
#[derive(PartialEq)]
enum TestEnum {
Notification(PeerManagerNotification),
}
impl From<PeerManagerNotification> for TestEnum {
fn from(notification: PeerManagerNotification) -> Self {
TestEnum::Notification(notification)
}
}
struct NoopAuthorizer {
ids: std::cell::RefCell<VecDeque<String>>,
}
impl NoopAuthorizer {
fn new(id: &str) -> Self {
let mut ids = VecDeque::new();
ids.push_back(id.into());
Self {
ids: std::cell::RefCell::new(ids),
}
}
fn new_multiple(ids: &[&str]) -> Self {
Self {
ids: std::cell::RefCell::new(
ids.iter().map(std::string::ToString::to_string).collect(),
),
}
}
}
impl Authorizer for NoopAuthorizer {
fn authorize_connection(
&self,
connection_id: String,
connection: Box<dyn Connection>,
callback: Box<
dyn Fn(AuthorizationResult) -> Result<(), Box<dyn std::error::Error>> + Send,
>,
_expected_authorization: Option<ConnectionAuthorizationType>,
local_authorization: Option<ConnectionAuthorizationType>,
) -> Result<(), AuthorizerError> {
let identity = self
.ids
.borrow_mut()
.pop_front()
.expect("No more identities to provide");
(*callback)(AuthorizationResult::Authorized {
connection_id,
connection,
identity: ConnectionAuthorizationType::Trust {
identity: identity.clone(),
},
expected_authorization: ConnectionAuthorizationType::Trust { identity },
local_authorization: local_authorization.unwrap_or(
ConnectionAuthorizationType::Trust {
identity: "my_id".into(),
},
),
})
.map_err(|err| AuthorizerError(format!("Unable to return result: {}", err)))
}
}
}