mod error;
mod notification;
mod pacemaker;
use std;
use std::cmp::min;
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};
use std::thread;
use std::time::Instant;
use uuid::Uuid;
pub use error::ConnectionManagerError;
pub use notification::{ConnectionManagerNotification, NotificationIter};
use pacemaker::Pacemaker;
use protobuf::Message;
use crate::matrix::{MatrixLifeCycle, MatrixSender};
use crate::protos::network::{NetworkHeartbeat, NetworkMessage, NetworkMessageType};
use crate::transport::{Connection, Transport};
/// Heartbeat interval handed to the pacemaker when the caller supplies none.
/// NOTE(review): presumably seconds, like the retry values below — confirm against `Pacemaker`.
const DEFAULT_HEARTBEAT_INTERVAL: u64 = 10;
/// Retry frequency, in seconds, given to an outbound connection when it is
/// created and again after every successful reconnect.
const INITIAL_RETRY_FREQUENCY: u64 = 10;
/// Cap, in seconds, on the retry frequency when the caller supplies none.
const DEFAULT_MAXIMUM_RETRY_FREQUENCY: u64 = 300;
/// Identifier returned when a subscriber is registered and used to remove it again.
pub type SubscriberId = usize;
/// Callback invoked with every broadcast notification. Returning an error
/// causes the subscriber to be dropped from the subscriber map.
type Subscriber =
Box<dyn Fn(ConnectionManagerNotification) -> Result<(), Box<dyn std::error::Error>> + Send>;
/// Registry of notification callbacks, keyed by the id handed out at registration.
struct SubscriberMap {
    /// Currently registered callbacks.
    subscribers: HashMap<SubscriberId, Subscriber>,
    /// Id that the next registered subscriber will receive.
    next_id: SubscriberId,
}

impl SubscriberMap {
    /// Creates an empty registry whose first assigned id is `0`.
    fn new() -> Self {
        SubscriberMap {
            subscribers: HashMap::new(),
            next_id: 0,
        }
    }

    /// Delivers a clone of `notification` to every subscriber and drops each
    /// subscriber whose callback reports an error.
    fn broadcast(&mut self, notification: ConnectionManagerNotification) {
        // First pass: notify everyone and remember who failed. Removal is
        // deferred because the map is borrowed while it is being iterated.
        let failed: Vec<SubscriberId> = self
            .subscribers
            .iter()
            .filter_map(|(id, callback)| match (*callback)(notification.clone()) {
                Ok(()) => None,
                Err(err) => {
                    debug!("Dropping subscriber ({}): {}", id, err);
                    Some(*id)
                }
            })
            .collect();

        // Second pass: forget the subscribers that could not be reached.
        for id in failed {
            self.subscribers.remove(&id);
        }
    }

    /// Registers `subscriber` and returns the id assigned to it.
    fn add_subscriber(&mut self, subscriber: Subscriber) -> SubscriberId {
        let assigned = self.next_id;
        self.next_id += 1;
        self.subscribers.insert(assigned, subscriber);
        assigned
    }

    /// Removes the subscriber with the given id; unknown ids are ignored.
    fn remove_subscriber(&mut self, subscriber_id: SubscriberId) {
        self.subscribers.remove(&subscriber_id);
    }
}
/// Messages consumed by the connection manager's background thread.
enum CmMessage {
// Makes the background thread leave its receive loop.
Shutdown,
// A request issued through a `Connector`; dispatched to `handle_request`.
Request(CmRequest),
// Produced by the closure given to the pacemaker; triggers `send_heartbeats`.
SendHeartbeats,
}
/// Requests a `Connector` can make of the connection manager thread.
/// Every variant carries a `sender` on which the result is sent back.
enum CmRequest {
// Create an outbound connection to `endpoint`, registered under `connection_id`.
RequestOutboundConnection {
endpoint: String,
connection_id: String,
sender: Sender<Result<(), ConnectionManagerError>>,
},
// Remove the connection for `endpoint`; the reply carries the removed
// endpoint, or `None` when the endpoint was not known.
RemoveConnection {
endpoint: String,
sender: Sender<Result<Option<String>, ConnectionManagerError>>,
},
// List the endpoints of all known connections.
ListConnections {
sender: Sender<Result<Vec<String>, ConnectionManagerError>>,
},
// Register a connection that was accepted elsewhere.
AddInboundConnection {
connection: Box<dyn Connection>,
sender: Sender<Result<(), ConnectionManagerError>>,
},
// Register `callback` for notifications; the reply carries its subscriber id.
Subscribe {
sender: Sender<Result<SubscriberId, ConnectionManagerError>>,
callback: Subscriber,
},
// Remove the subscriber previously registered under `subscriber_id`.
Unsubscribe {
subscriber_id: SubscriberId,
sender: Sender<Result<(), ConnectionManagerError>>,
},
}
/// Owns the background thread that tracks connections, together with the
/// pacemaker that asks that thread to send heartbeats.
pub struct ConnectionManager<T: 'static, U: 'static>
where
    T: MatrixLifeCycle,
    U: MatrixSender,
{
    /// Started with the request channel and a closure that produces
    /// `CmMessage::SendHeartbeats`.
    pacemaker: Pacemaker,
    /// Connection bookkeeping; moved into the background thread by `start`,
    /// so it is `None` once the manager has been started.
    connection_state: Option<ConnectionState<T, U>>,
    /// Handle of the background thread, set by `start`.
    join_handle: Option<thread::JoinHandle<()>>,
    /// Sending half of the request channel, set by `start`.
    sender: Option<Sender<CmMessage>>,
    /// Handle used to signal shutdown, set by `start`.
    shutdown_handle: Option<ShutdownHandle>,
}

impl<T, U> ConnectionManager<T, U>
where
    T: MatrixLifeCycle,
    U: MatrixSender,
{
    /// Builds a manager that is not yet running.
    ///
    /// `heartbeat_interval` falls back to `DEFAULT_HEARTBEAT_INTERVAL` and
    /// `maximum_retry_frequency` to `DEFAULT_MAXIMUM_RETRY_FREQUENCY` when
    /// `None` is given.
    pub fn new(
        life_cycle: T,
        matrix_sender: U,
        transport: Box<dyn Transport + Send>,
        heartbeat_interval: Option<u64>,
        maximum_retry_frequency: Option<u64>,
    ) -> Self {
        let state = ConnectionState::new(
            life_cycle,
            matrix_sender,
            transport,
            maximum_retry_frequency.unwrap_or(DEFAULT_MAXIMUM_RETRY_FREQUENCY),
        );

        Self {
            pacemaker: Pacemaker::new(heartbeat_interval.unwrap_or(DEFAULT_HEARTBEAT_INTERVAL)),
            connection_state: Some(state),
            join_handle: None,
            sender: None,
            shutdown_handle: None,
        }
    }

    /// Spawns the background thread, starts the pacemaker and returns a
    /// `Connector` for talking to the running manager.
    ///
    /// # Errors
    ///
    /// Returns `ConnectionManagerError::StartUpError` if the manager was
    /// already started, and propagates failures from spawning the thread or
    /// starting the pacemaker.
    pub fn start(&mut self) -> Result<Connector, ConnectionManagerError> {
        let (sender, receiver) = channel::<CmMessage>();

        // The state can only be handed to the thread once.
        let mut state = match self.connection_state.take() {
            Some(state) => state,
            None => {
                return Err(ConnectionManagerError::StartUpError(
                    "Service has already started".into(),
                ))
            }
        };

        let join_handle = thread::Builder::new()
            .name("Connection Manager".into())
            .spawn(move || {
                let mut subscribers = SubscriberMap::new();
                loop {
                    let message = match receiver.recv() {
                        Ok(message) => message,
                        Err(_) => {
                            warn!("All senders have disconnected");
                            break;
                        }
                    };

                    match message {
                        CmMessage::Shutdown => break,
                        CmMessage::Request(req) => {
                            handle_request(req, &mut state, &mut subscribers);
                        }
                        CmMessage::SendHeartbeats => {
                            send_heartbeats(&mut state, &mut subscribers)
                        }
                    }
                }
            })?;

        self.pacemaker
            .start(sender.clone(), || CmMessage::SendHeartbeats)?;

        self.join_handle = Some(join_handle);
        self.shutdown_handle = Some(ShutdownHandle {
            sender: sender.clone(),
            pacemaker_shutdown_handle: self.pacemaker.shutdown_handle().unwrap(),
        });
        self.sender = Some(sender.clone());

        Ok(Connector { sender })
    }

    /// Returns a clone of the shutdown handle, or `None` if the manager has
    /// not been started.
    pub fn shutdown_handle(&self) -> Option<ShutdownHandle> {
        self.shutdown_handle.as_ref().cloned()
    }

    /// Waits on the pacemaker and then joins the background thread, logging
    /// an error if the join fails. Does not itself request shutdown.
    pub fn await_shutdown(self) {
        self.pacemaker.await_shutdown();

        if let Some(join_handle) = self.join_handle {
            if let Err(err) = join_handle.join() {
                error!(
                    "Connection manager thread did not shutdown correctly: {:?}",
                    err
                );
            }
        }
    }

    /// Signals shutdown and waits for it to complete. Returns immediately
    /// when the manager was never started.
    pub fn shutdown_and_wait(self) {
        match self.shutdown_handle.clone() {
            Some(handle) => handle.shutdown(),
            None => return,
        }

        self.await_shutdown();
    }
}
#[derive(Clone)]
pub struct Connector {
sender: Sender<CmMessage>,
}
impl Connector {
pub fn request_connection(
&self,
endpoint: &str,
id: &str,
) -> Result<(), ConnectionManagerError> {
let (sender, recv) = channel();
self.sender
.send(CmMessage::Request(CmRequest::RequestOutboundConnection {
sender,
endpoint: endpoint.to_string(),
connection_id: id.to_string(),
}))
.map_err(|_| {
ConnectionManagerError::SendMessageError(
"The connection manager is no longer running".into(),
)
})?;
recv.recv().map_err(|_| {
ConnectionManagerError::SendMessageError(
"The connection manager is no longer running".into(),
)
})?
}
pub fn remove_connection(
&self,
endpoint: &str,
) -> Result<Option<String>, ConnectionManagerError> {
let (sender, recv) = channel();
self.sender
.send(CmMessage::Request(CmRequest::RemoveConnection {
sender,
endpoint: endpoint.to_string(),
}))
.map_err(|_| {
ConnectionManagerError::SendMessageError(
"The connection manager is no longer running".into(),
)
})?;
recv.recv().map_err(|_| {
ConnectionManagerError::SendMessageError(
"The connection manager is no longer running".into(),
)
})?
}
pub fn subscription_iter(&self) -> Result<NotificationIter, ConnectionManagerError> {
let (send, recv) = channel();
self.subscribe(send)?;
Ok(NotificationIter { recv })
}
pub fn subscribe<T>(
&self,
subscriber: Sender<T>,
) -> Result<SubscriberId, ConnectionManagerError>
where
T: From<ConnectionManagerNotification> + Send + 'static,
{
let (sender, recv) = channel();
self.sender
.send(CmMessage::Request(CmRequest::Subscribe {
sender,
callback: Box::new(move |notification| {
subscriber.send(T::from(notification)).map_err(Box::from)
}),
}))
.map_err(|_| {
ConnectionManagerError::SendMessageError(
"The connection manager is no longer running".into(),
)
})?;
recv.recv().map_err(|_| {
ConnectionManagerError::SendMessageError(
"The connection manager is no longer running".into(),
)
})?
}
pub fn unsubscribe(&self, subscriber_id: SubscriberId) -> Result<(), ConnectionManagerError> {
let (sender, recv) = channel();
self.sender
.send(CmMessage::Request(CmRequest::Unsubscribe {
subscriber_id,
sender,
}))
.map_err(|_| {
ConnectionManagerError::SendMessageError(
"The connection manager is no longer running".into(),
)
})?;
recv.recv().map_err(|_| {
ConnectionManagerError::SendMessageError(
"The connection manager is no longer running".into(),
)
})?
}
pub fn list_connections(&self) -> Result<Vec<String>, ConnectionManagerError> {
let (sender, recv) = channel();
self.sender
.send(CmMessage::Request(CmRequest::ListConnections { sender }))
.map_err(|_| {
ConnectionManagerError::SendMessageError(
"The connection manager is no longer running".into(),
)
})?;
recv.recv().map_err(|_| {
ConnectionManagerError::SendMessageError(
"The connection manager is no longer running".into(),
)
})?
}
pub fn add_inbound_connection(
&self,
connection: Box<dyn Connection>,
) -> Result<(), ConnectionManagerError> {
let (sender, recv) = channel();
self.sender
.send(CmMessage::Request(CmRequest::AddInboundConnection {
connection,
sender,
}))
.map_err(|_| {
ConnectionManagerError::SendMessageError(
"The connection manager is no longer running".into(),
)
})?;
recv.recv().map_err(|_| {
ConnectionManagerError::SendMessageError(
"The connection manager is no longer running".into(),
)
})?
}
}
/// Cloneable handle that signals both the pacemaker and the connection
/// manager thread to stop.
#[derive(Clone)]
pub struct ShutdownHandle {
    /// Sending half of the manager's request channel.
    sender: Sender<CmMessage>,
    /// Handle used to stop the pacemaker.
    pacemaker_shutdown_handle: pacemaker::ShutdownHandle,
}

impl ShutdownHandle {
    /// Signals the pacemaker to shut down, then sends `CmMessage::Shutdown`
    /// to the manager thread. A warning is logged when the manager thread is
    /// already gone.
    pub fn shutdown(self) {
        self.pacemaker_shutdown_handle.shutdown();

        match self.sender.send(CmMessage::Shutdown) {
            Ok(()) => {}
            Err(_) => warn!("Connection manager is no longer running"),
        }
    }
}
/// What the manager remembers about a single connection.
#[derive(Clone, Debug)]
enum ConnectionMetadata {
    /// A connection created by this manager via the transport.
    Outbound {
        id: String,
        outbound: OutboundConnection,
    },
    /// A connection that was accepted elsewhere and handed to this manager.
    Inbound {
        id: String,
        inbound: InboundConnection,
    },
}

impl ConnectionMetadata {
    /// Returns `true` for the `Outbound` variant and `false` for `Inbound`.
    fn is_outbound(&self) -> bool {
        match self {
            ConnectionMetadata::Inbound { .. } => false,
            ConnectionMetadata::Outbound { .. } => true,
        }
    }

    /// Returns the connection id, whichever variant this is.
    fn id(&self) -> &str {
        match self {
            ConnectionMetadata::Outbound { id, .. } | ConnectionMetadata::Inbound { id, .. } => id,
        }
    }

    /// Returns the remote endpoint stored in the variant's payload.
    fn endpoint(&self) -> &str {
        let endpoint: &String = match self {
            ConnectionMetadata::Inbound { inbound, .. } => &inbound.endpoint,
            ConnectionMetadata::Outbound { outbound, .. } => &outbound.endpoint,
        };
        endpoint.as_str()
    }
}
/// Reconnection bookkeeping for a connection this manager created.
#[derive(Clone, Debug)]
struct OutboundConnection {
// Endpoint the connection was made to.
endpoint: String,
// True after a reconnect attempt has failed and until one succeeds.
reconnecting: bool,
// Seconds that must elapse since the last attempt before retrying.
retry_frequency: u64,
// When the connection was created or a reconnect was last attempted.
last_connection_attempt: Instant,
// Number of failed reconnect attempts since the last success.
reconnection_attempts: u64,
}
/// Bookkeeping for a connection that was accepted elsewhere and handed to the manager.
#[derive(Clone, Debug)]
struct InboundConnection {
// Remote endpoint reported by the connection when it was added.
endpoint: String,
// Set when a heartbeat to this connection fails; guards the `Disconnected` notification.
disconnected: bool,
}
/// Bookkeeping for every connection the manager knows about, plus the handles
/// needed to create connections and register them with the matrix.
struct ConnectionState<T, U>
where
    T: MatrixLifeCycle,
    U: MatrixSender,
{
    /// Metadata for each known connection, keyed by remote endpoint.
    connections: HashMap<String, ConnectionMetadata>,
    /// Used to add connections to and remove them from the matrix.
    life_cycle: T,
    /// Cloned out by `matrix_sender()` for sending heartbeats.
    matrix_sender: U,
    /// Used to open outbound connections.
    transport: Box<dyn Transport>,
    /// Cap, in seconds, on the retry frequency of a reconnecting connection.
    maximum_retry_frequency: u64,
}

impl<T, U> ConnectionState<T, U>
where
    T: MatrixLifeCycle,
    U: MatrixSender,
{
    /// Creates a state with no connections.
    fn new(
        life_cycle: T,
        matrix_sender: U,
        transport: Box<dyn Transport + Send>,
        maximum_retry_frequency: u64,
    ) -> Self {
        Self {
            life_cycle,
            matrix_sender,
            transport,
            connections: HashMap::new(),
            maximum_retry_frequency,
        }
    }

    /// Registers an already established connection under a freshly generated
    /// id and returns that id.
    ///
    /// # Errors
    ///
    /// Returns `ConnectionCreationError` if the life cycle rejects the
    /// connection; no metadata is recorded in that case.
    fn add_inbound_connection(
        &mut self,
        connection: Box<dyn Connection>,
    ) -> Result<String, ConnectionManagerError> {
        let endpoint = connection.remote_endpoint();
        let id = Uuid::new_v4().to_string();

        self.life_cycle
            .add(connection, id.clone())
            .map_err(|err| ConnectionManagerError::ConnectionCreationError(format!("{:?}", err)))?;

        self.connections.insert(
            endpoint.clone(),
            ConnectionMetadata::Inbound {
                id: id.clone(),
                inbound: InboundConnection {
                    endpoint,
                    disconnected: false,
                },
            },
        );

        Ok(id)
    }

    /// Connects to `endpoint` and registers the connection under `id`.
    ///
    /// If the endpoint is already known this is a no-op that returns `Ok(())`;
    /// the existing connection and its id are kept.
    ///
    /// # Errors
    ///
    /// Returns `ConnectionCreationError` if the transport cannot connect or
    /// the life cycle rejects the connection.
    fn add_connection(&mut self, endpoint: &str, id: String) -> Result<(), ConnectionManagerError> {
        if self.connections.contains_key(endpoint) {
            return Ok(());
        }

        let connection = self.transport.connect(endpoint).map_err(|err| {
            ConnectionManagerError::ConnectionCreationError(format!("{:?}", err))
        })?;

        self.life_cycle
            .add(connection, id.clone())
            .map_err(|err| {
                ConnectionManagerError::ConnectionCreationError(format!("{:?}", err))
            })?;

        self.connections.insert(
            endpoint.to_string(),
            ConnectionMetadata::Outbound {
                id,
                outbound: OutboundConnection {
                    endpoint: endpoint.to_string(),
                    reconnecting: false,
                    retry_frequency: INITIAL_RETRY_FREQUENCY,
                    last_connection_attempt: Instant::now(),
                    reconnection_attempts: 0,
                },
            },
        );

        Ok(())
    }

    /// Forgets the connection for `endpoint` and removes it from the life cycle.
    ///
    /// Returns the removed metadata, or `None` if the endpoint was not known.
    ///
    /// # Errors
    ///
    /// Returns `ConnectionRemovalError` if the life cycle cannot remove the
    /// connection. The metadata has already been dropped from the map by then.
    fn remove_connection(
        &mut self,
        endpoint: &str,
    ) -> Result<Option<ConnectionMetadata>, ConnectionManagerError> {
        let meta = match self.connections.remove(endpoint) {
            Some(meta) => meta,
            None => return Ok(None),
        };

        self.life_cycle.remove(meta.id()).map_err(|err| {
            ConnectionManagerError::ConnectionRemovalError(format!(
                "Cannot remove connection {} from life cycle: {}",
                endpoint, err
            ))
        })?;

        Ok(Some(meta))
    }

    /// Attempts to re-establish the outbound connection to `endpoint` and
    /// tells `subscribers` how it went.
    ///
    /// On success the old connection is replaced in the life cycle, the retry
    /// bookkeeping is reset and `Connected` is broadcast. When the transport
    /// cannot connect, the retry frequency is doubled (capped at the configured
    /// maximum), the attempt counter is incremented and `ReconnectionFailed`
    /// is broadcast. Inbound connections are left untouched.
    ///
    /// # Errors
    ///
    /// Returns `ConnectionRemovalError` if the endpoint is unknown or the old
    /// connection cannot be removed from the life cycle, and
    /// `ConnectionReconnectError` if the new connection cannot be added. The
    /// stored metadata is not updated on these paths.
    fn reconnect(
        &mut self,
        endpoint: &str,
        subscribers: &mut SubscriberMap,
    ) -> Result<(), ConnectionManagerError> {
        // Work on a copy so the stored metadata only changes once the outcome
        // of the attempt is known.
        let mut meta = match self.connections.get(endpoint) {
            Some(meta) => meta.clone(),
            None => {
                return Err(ConnectionManagerError::ConnectionRemovalError(
                    "Cannot reconnect to endpoint without metadata".into(),
                ))
            }
        };

        let (id, outbound) = match meta {
            ConnectionMetadata::Outbound {
                ref id,
                ref mut outbound,
            } => (id, outbound),
            // Only connections this side initiated can be re-established.
            ConnectionMetadata::Inbound { .. } => return Ok(()),
        };

        let notification = if let Ok(connection) = self.transport.connect(endpoint) {
            self.life_cycle.remove(id.as_str()).map_err(|err| {
                ConnectionManagerError::ConnectionRemovalError(format!(
                    "Cannot remove connection {} from life cycle: {}",
                    endpoint, err
                ))
            })?;

            self.life_cycle
                .add(connection, id.clone())
                .map_err(|err| {
                    ConnectionManagerError::ConnectionReconnectError(format!("{:?}", err))
                })?;

            outbound.reconnecting = false;
            outbound.retry_frequency = INITIAL_RETRY_FREQUENCY;
            outbound.last_connection_attempt = Instant::now();
            outbound.reconnection_attempts = 0;

            ConnectionManagerNotification::Connected {
                endpoint: endpoint.to_string(),
            }
        } else {
            outbound.reconnecting = true;
            // Saturate so that a very large configured maximum cannot make
            // the doubling overflow.
            outbound.retry_frequency = min(
                outbound.retry_frequency.saturating_mul(2),
                self.maximum_retry_frequency,
            );
            outbound.last_connection_attempt = Instant::now();
            outbound.reconnection_attempts += 1;

            ConnectionManagerNotification::ReconnectionFailed {
                endpoint: endpoint.to_string(),
                attempts: outbound.reconnection_attempts,
            }
        };

        self.connections.insert(endpoint.to_string(), meta);
        subscribers.broadcast(notification);

        Ok(())
    }

    /// Read-only view of the metadata of all known connections.
    fn connection_metadata(&self) -> &HashMap<String, ConnectionMetadata> {
        &self.connections
    }

    /// Mutable view of the metadata of all known connections.
    fn connection_metadata_mut(&mut self) -> &mut HashMap<String, ConnectionMetadata> {
        &mut self.connections
    }

    /// Returns a clone of the matrix sender.
    fn matrix_sender(&self) -> U {
        self.matrix_sender.clone()
    }
}
/// Executes one `CmRequest` against the connection state and sends the result
/// back on the request's reply channel.
///
/// If the requester has dropped its receiver, the result is discarded and a
/// warning naming the operation is logged.
fn handle_request<T: MatrixLifeCycle, U: MatrixSender>(
    req: CmRequest,
    state: &mut ConnectionState<T, U>,
    subscribers: &mut SubscriberMap,
) {
    match req {
        CmRequest::RequestOutboundConnection {
            endpoint,
            sender,
            connection_id,
        } => {
            let response = state.add_connection(&endpoint, connection_id);
            if sender.send(response).is_err() {
                warn!("connector dropped before receiving result of add connection");
            }
        }
        CmRequest::RemoveConnection { endpoint, sender } => {
            // Callers only get the endpoint back, not the internal metadata.
            let response = state
                .remove_connection(&endpoint)
                .map(|meta_opt| meta_opt.map(|meta| meta.endpoint().to_owned()));
            if sender.send(response).is_err() {
                warn!("connector dropped before receiving result of remove connection");
            }
        }
        CmRequest::ListConnections { sender } => {
            let endpoints: Vec<String> = state.connection_metadata().keys().cloned().collect();
            if sender.send(Ok(endpoints)).is_err() {
                warn!("connector dropped before receiving result of list connections");
            }
        }
        CmRequest::AddInboundConnection { sender, connection } => {
            // Capture the endpoint before the connection is moved into the state.
            let endpoint = connection.remote_endpoint();
            let response = state
                .add_inbound_connection(connection)
                .map(|connection_id| {
                    subscribers.broadcast(ConnectionManagerNotification::InboundConnection {
                        endpoint,
                        connection_id,
                    });
                });
            if sender.send(response).is_err() {
                warn!("connector dropped before receiving result of add inbound callback");
            }
        }
        CmRequest::Subscribe { sender, callback } => {
            let subscriber_id = subscribers.add_subscriber(callback);
            if sender.send(Ok(subscriber_id)).is_err() {
                warn!("connector dropped before receiving result of subscribe");
            }
        }
        CmRequest::Unsubscribe {
            sender,
            subscriber_id,
        } => {
            subscribers.remove_subscriber(subscriber_id);
            if sender.send(Ok(())).is_err() {
                warn!("connector dropped before receiving result of unsubscribe");
            }
        }
    };
}
/// Sends a heartbeat to every connection and reacts to the ones that fail.
///
/// * Outbound connections that are not reconnecting receive a heartbeat; when
///   sending fails, `Disconnected` is broadcast and a reconnect is attempted.
/// * Outbound connections that are reconnecting are retried once more than
///   `retry_frequency` seconds have passed since the last attempt.
/// * Inbound connections receive a heartbeat; `Disconnected` is broadcast on
///   the first failure only, and the flag is cleared again by the next
///   heartbeat that goes through.
///
/// Nothing is sent if the heartbeat message itself cannot be built.
fn send_heartbeats<T: MatrixLifeCycle, U: MatrixSender>(
    state: &mut ConnectionState<T, U>,
    subscribers: &mut SubscriberMap,
) {
    let heartbeat_message = match create_heartbeat() {
        Ok(h) => h,
        Err(err) => {
            error!("Failed to create heartbeat message: {:?}", err);
            return;
        }
    };

    let matrix_sender = state.matrix_sender();

    // Reconnects need `&mut state`, so they are collected here and carried
    // out once the metadata is no longer being iterated.
    let mut reconnections = vec![];

    for (endpoint, metadata) in state.connection_metadata_mut().iter_mut() {
        match metadata {
            ConnectionMetadata::Outbound { id, outbound } => {
                if outbound.reconnecting {
                    if outbound.last_connection_attempt.elapsed().as_secs()
                        > outbound.retry_frequency
                    {
                        reconnections.push(endpoint.to_string());
                    }
                } else {
                    info!("Sending heartbeat to {}", endpoint);
                    if let Err(err) = matrix_sender.send(id.clone(), heartbeat_message.clone()) {
                        error!(
                            "failed to send heartbeat: {:?} attempting reconnection",
                            err
                        );
                        subscribers.broadcast(ConnectionManagerNotification::Disconnected {
                            endpoint: endpoint.clone(),
                        });
                        reconnections.push(endpoint.to_string());
                    }
                }
            }
            ConnectionMetadata::Inbound { id, inbound } => {
                info!("Sending heartbeat to {}", endpoint);
                if let Err(err) = matrix_sender.send(id.clone(), heartbeat_message.clone()) {
                    error!(
                        "failed to send heartbeat: {:?} attempting reconnection",
                        err
                    );
                    // Notify only on the transition to disconnected, not on
                    // every subsequent failed heartbeat.
                    if !inbound.disconnected {
                        inbound.disconnected = true;
                        subscribers.broadcast(ConnectionManagerNotification::Disconnected {
                            endpoint: endpoint.clone(),
                        });
                    }
                } else {
                    // Clear the flag only after a heartbeat got through.
                    // Resetting it unconditionally would wipe the flag right
                    // after setting it and repeat the notification each time.
                    inbound.disconnected = false;
                }
            }
        }
    }

    for endpoint in reconnections {
        if let Err(err) = state.reconnect(&endpoint, subscribers) {
            error!("Reconnection attempt to {} failed: {:?}", endpoint, err);
        }
    }
}
/// Builds the serialized heartbeat: a `NetworkMessage` of type
/// `NETWORK_HEARTBEAT` whose payload is a serialized `NetworkHeartbeat`.
///
/// # Errors
///
/// Returns `ConnectionManagerError::HeartbeatError` if either message cannot
/// be serialized.
fn create_heartbeat() -> Result<Vec<u8>, ConnectionManagerError> {
    let payload = NetworkHeartbeat::new().write_to_bytes().map_err(|_| {
        ConnectionManagerError::HeartbeatError("cannot create NetworkHeartbeat message".to_string())
    })?;

    let mut envelope = NetworkMessage::new();
    envelope.set_message_type(NetworkMessageType::NETWORK_HEARTBEAT);
    envelope.set_payload(payload);

    envelope.write_to_bytes().map_err(|_| {
        ConnectionManagerError::HeartbeatError("cannot create NetworkMessage".to_string())
    })
}
// Tests that drive a `ConnectionManager` through its `Connector`, using the
// in-process and TCP transports with a `Mesh` as the matrix implementation.
#[cfg(test)]
pub mod tests {
use super::*;
use std::sync::mpsc;
use crate::mesh::Mesh;
use crate::transport::inproc::InprocTransport;
use crate::transport::socket::TcpTransport;
// A manager with no connections can be started and shut down.
#[test]
fn test_connection_manager_startup_and_shutdown() {
let mut transport = Box::new(InprocTransport::default());
transport.listen("inproc://test").unwrap();
let mesh = Mesh::new(512, 128);
let mut cm = ConnectionManager::new(
mesh.get_life_cycle(),
mesh.get_sender(),
transport,
None,
None,
);
cm.start().unwrap();
cm.shutdown_and_wait();
}
// A single outbound connection request succeeds.
#[test]
fn test_add_connection_request() {
let mut transport = Box::new(InprocTransport::default());
let mut listener = transport.listen("inproc://test").unwrap();
// Accept the connection on another thread so the request can complete.
thread::spawn(move || {
listener.accept().unwrap();
});
let mesh = Mesh::new(512, 128);
let mut cm = ConnectionManager::new(
mesh.get_life_cycle(),
mesh.get_sender(),
transport,
None,
None,
);
let connector = cm.start().unwrap();
connector
.request_connection("inproc://test", "test_id")
.expect("A connection could not be created");
cm.shutdown_and_wait();
}
// Requesting the same endpoint twice succeeds both times; the listener only
// accepts once.
#[test]
fn test_mutiple_add_connection_requests() {
let mut transport = Box::new(InprocTransport::default());
let mut listener = transport.listen("inproc://test").unwrap();
thread::spawn(move || {
listener.accept().unwrap();
});
let mesh = Mesh::new(512, 128);
let mut cm = ConnectionManager::new(
mesh.get_life_cycle(),
mesh.get_sender(),
transport,
None,
None,
);
let connector = cm.start().unwrap();
connector
.request_connection("inproc://test", "test_id")
.expect("A connection could not be created");
connector
.request_connection("inproc://test", "test_id")
.expect("A connection could not be re-requested");
cm.shutdown_and_wait();
}
// With a heartbeat interval of 1, a heartbeat message arrives over the
// in-process transport.
#[test]
fn test_heartbeat_inproc() {
let mut transport = Box::new(InprocTransport::default());
let mut listener = transport.listen("inproc://test").unwrap();
let mesh = Mesh::new(512, 128);
let mesh_clone = mesh.clone();
// The accepted side is added to the same mesh, so the heartbeat sent by
// the manager can be received from `mesh` below.
thread::spawn(move || {
let conn = listener.accept().unwrap();
mesh_clone.add(conn, "test_id".to_string()).unwrap();
});
let mut cm = ConnectionManager::new(
mesh.get_life_cycle(),
mesh.get_sender(),
transport,
Some(1),
None,
);
let connector = cm.start().unwrap();
connector
.request_connection("inproc://test", "test_id")
.expect("A connection could not be created");
let envelope = mesh.recv().unwrap();
let heartbeat: NetworkMessage = protobuf::parse_from_bytes(&envelope.payload()).unwrap();
assert_eq!(
heartbeat.get_message_type(),
NetworkMessageType::NETWORK_HEARTBEAT
);
cm.shutdown_and_wait();
}
// Same as the in-process heartbeat test, but over TCP and with the default
// heartbeat interval.
#[test]
fn test_heartbeat_raw_tcp() {
let mut transport = Box::new(TcpTransport::default());
let mut listener = transport.listen("tcp://localhost:0").unwrap();
let endpoint = listener.endpoint();
let mesh = Mesh::new(512, 128);
let mesh_clone = mesh.clone();
thread::spawn(move || {
let conn = listener.accept().unwrap();
mesh_clone.add(conn, "test_id".to_string()).unwrap();
});
let mut cm = ConnectionManager::new(
mesh.get_life_cycle(),
mesh.get_sender(),
transport,
None,
None,
);
let connector = cm.start().unwrap();
connector
.request_connection(&endpoint, "test_id")
.expect("A connection could not be created");
let envelope = mesh.recv().unwrap();
let heartbeat: NetworkMessage = protobuf::parse_from_bytes(&envelope.payload()).unwrap();
assert_eq!(
heartbeat.get_message_type(),
NetworkMessageType::NETWORK_HEARTBEAT
);
cm.shutdown_and_wait();
}
// A connection shows up in the listing after it is added and is gone after
// it is removed; removal reports the removed endpoint.
#[test]
fn test_remove_connection() {
let mut transport = Box::new(TcpTransport::default());
let mut listener = transport.listen("tcp://localhost:0").unwrap();
let endpoint = listener.endpoint();
let mesh = Mesh::new(512, 128);
let mesh_clone = mesh.clone();
thread::spawn(move || {
let conn = listener.accept().unwrap();
mesh_clone.add(conn, "test_id".to_string()).unwrap();
});
let mut cm = ConnectionManager::new(
mesh.get_life_cycle(),
mesh.get_sender(),
transport,
None,
None,
);
let connector = cm.start().unwrap();
connector
.request_connection(&endpoint, "test_id")
.expect("A connection could not be created");
assert_eq!(
vec![endpoint.clone()],
connector
.list_connections()
.expect("Unable to list connections")
);
let endpoint_removed = connector
.remove_connection(&endpoint)
.expect("Unable to remove connection");
assert_eq!(Some(endpoint.clone()), endpoint_removed);
assert!(connector
.list_connections()
.expect("Unable to list connections")
.is_empty());
cm.shutdown_and_wait();
}
// Removing an endpoint that was never connected yields `None`, not an error.
#[test]
fn test_remove_nonexistent_connection() {
let mut transport = Box::new(TcpTransport::default());
let mut listener = transport.listen("tcp://localhost:0").unwrap();
let endpoint = listener.endpoint();
let mesh = Mesh::new(512, 128);
let mesh_clone = mesh.clone();
thread::spawn(move || {
let conn = listener.accept().unwrap();
mesh_clone.add(conn, "test_id".to_string()).unwrap();
});
let mut cm = ConnectionManager::new(
mesh.get_life_cycle(),
mesh.get_sender(),
transport,
None,
None,
);
let connector = cm.start().unwrap();
let endpoint_removed = connector
.remove_connection(&endpoint)
.expect("Unable to remove connection");
assert_eq!(None, endpoint_removed);
cm.shutdown_and_wait();
}
// `NotificationIter` yields every notification that was sent and ends once
// the sending side has been dropped.
#[test]
fn test_notifications_handler_iterator() {
let (send, recv) = channel();
let nh = NotificationIter { recv };
let join_handle = thread::spawn(move || {
for _ in 0..5 {
send.send(ConnectionManagerNotification::Connected {
endpoint: "tcp://localhost:3030".to_string(),
})
.unwrap();
}
});
let mut notifications_sent = 0;
for n in nh {
assert_eq!(
n,
ConnectionManagerNotification::Connected {
endpoint: "tcp://localhost:3030".to_string()
}
);
notifications_sent += 1;
}
assert_eq!(notifications_sent, 5);
join_handle.join().unwrap();
}
// After the remote side drops the connection, subscribers see `Disconnected`
// followed by `Connected` once the manager has reconnected.
#[test]
fn test_reconnect_raw_tcp() {
let mut transport = Box::new(TcpTransport::default());
let mut listener = transport
.listen("tcp://localhost:0")
.expect("Cannot listen for connections");
let endpoint = listener.endpoint();
let mesh1 = Mesh::new(512, 128);
let mesh2 = Mesh::new(512, 128);
// Remote side: accept, wait for the first heartbeat, drop the connection,
// then accept the manager's reconnect.
thread::spawn(move || {
let conn = listener.accept().expect("Cannot accept connection");
mesh2
.add(conn, "test_id".to_string())
.expect("Cannot add connection to mesh");
let envelope = mesh2.recv().expect("Cannot receive message");
let heartbeat: NetworkMessage = protobuf::parse_from_bytes(&envelope.payload())
.expect("Cannot parse NetworkMessage");
assert_eq!(
heartbeat.get_message_type(),
NetworkMessageType::NETWORK_HEARTBEAT
);
let mut connection = mesh2
.remove(&"test_id".to_string())
.expect("Cannot remove connection from mesh");
connection
.disconnect()
.expect("Connection failed to disconnect");
listener.accept().expect("Unable to accept connection");
});
let mut cm = ConnectionManager::new(
mesh1.get_life_cycle(),
mesh1.get_sender(),
transport,
Some(1),
None,
);
let connector = cm.start().expect("Unable to start ConnectionManager");
connector
.request_connection(&endpoint, "test_id")
.expect("Unable to request connection");
let mut subscriber = connector
.subscription_iter()
.expect("Cannot get subscriber");
let reconnecting_notification = subscriber
.next()
.expect("Cannot get message from subscriber");
assert!(
reconnecting_notification
== ConnectionManagerNotification::Disconnected {
endpoint: endpoint.clone(),
}
);
let reconnection_notification = subscriber
.next()
.expect("Cannot get message from subscriber");
assert!(
reconnection_notification
== ConnectionManagerNotification::Connected {
endpoint: endpoint.clone(),
}
);
cm.shutdown_and_wait();
}
// An accepted connection handed to the manager produces an
// `InboundConnection` notification, is listed, and can be removed again.
#[test]
fn test_inbound_connection() {
let mut transport = InprocTransport::default();
let mut listener = transport
.listen("inproc://test_inbound_connection")
.expect("Cannot listen for connections");
let mesh = Mesh::new(512, 128);
let mut cm = ConnectionManager::new(
mesh.get_life_cycle(),
mesh.get_sender(),
Box::new(transport.clone()),
Some(1),
None,
);
let (conn_tx, conn_rx) = mpsc::channel();
// The connecting side holds its end of the connection until the main
// thread signals on `conn_tx` at the end of the test.
let jh = thread::spawn(move || {
let _connection = transport
.connect("inproc://test_inbound_connection")
.unwrap();
conn_rx.recv().unwrap();
});
let connector = cm.start().expect("Unable to start ConnectionManager");
let mut subscriber = connector
.subscription_iter()
.expect("Cannot get subscriber");
let connection = listener.accept().unwrap();
connector
.add_inbound_connection(connection)
.expect("Unable to add inbound connection");
let notification = subscriber
.next()
.expect("Cannot get message from subscriber");
if let ConnectionManagerNotification::InboundConnection { endpoint, .. } = notification {
assert_eq!("inproc://test_inbound_connection", &endpoint);
} else {
panic!("Incorrect notification received: {:?}", notification);
}
let connection_endpoints = connector.list_connections().unwrap();
assert_eq!(
vec!["inproc://test_inbound_connection".to_string()],
connection_endpoints
);
connector
.remove_connection("inproc://test_inbound_connection")
.unwrap();
let connection_endpoints = connector.list_connections().unwrap();
assert!(connection_endpoints.is_empty());
conn_tx.send(()).unwrap();
jh.join().unwrap();
cm.shutdown_and_wait();
}
}