use std::sync::mpsc::{channel, Sender};
use std::thread;
use crate::protocol::network::{NetworkHeartbeat, NetworkMessage};
use crate::protos::network;
use crate::protos::prelude::*;
use crate::threading::pacemaker;
use crate::transport::matrix::{ConnectionMatrixLifeCycle, ConnectionMatrixSender};
use crate::transport::Transport;
use super::error::ConnectionManagerError;
use super::{
AuthResult, Authorizer, CmMessage, CmRequest, ConnectionManager, ConnectionManagerNotification,
ConnectionManagerState, ConnectionMetadataExt, OutboundConnection, SubscriberMap,
};
// Heartbeat interval used by `ConnectionManagerBuilder` when none is set explicitly.
// The start-up log message reports this interval in seconds.
const DEFAULT_HEARTBEAT_INTERVAL: u64 = 10;
// Maximum retry frequency used by `ConnectionManagerBuilder` when none is set explicitly.
// NOTE(review): presumably also expressed in seconds - confirm against
// `ConnectionManagerState`, which receives the value.
const DEFAULT_MAXIMUM_RETRY_FREQUENCY: u64 = 300;
/// Builder that collects the components of a `ConnectionManager` and starts it.
///
/// `T` is the connection matrix life-cycle handle and `U` is the connection
/// matrix sender. The authorizer, life cycle, matrix sender and transport must
/// all be supplied before calling `start`; the two numeric settings fall back
/// to the module defaults.
pub struct ConnectionManagerBuilder<T, U> {
/// Authorizer handed to the worker thread; required by `start`.
authorizer: Option<Box<dyn Authorizer + Send>>,
/// Connection matrix life-cycle handle; required by `start`.
life_cycle: Option<T>,
/// Connection matrix sender; required by `start`.
matrix_sender: Option<U>,
/// Transport handed to the connection manager state; required by `start`.
transport: Option<Box<dyn Transport + Send>>,
/// Interval given to the pacemaker that triggers heartbeats (logged in seconds).
heartbeat_interval: u64,
/// Retry frequency limit passed to the connection manager state.
/// NOTE(review): unit is presumably seconds - confirm.
maximum_retry_frequency: u64,
}
impl<T, U> Default for ConnectionManagerBuilder<T, U> {
fn default() -> Self {
Self {
authorizer: None,
life_cycle: None,
matrix_sender: None,
transport: None,
heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
maximum_retry_frequency: DEFAULT_MAXIMUM_RETRY_FREQUENCY,
}
}
}
impl<T, U> ConnectionManagerBuilder<T, U>
where
    T: ConnectionMatrixLifeCycle + 'static,
    U: ConnectionMatrixSender + 'static,
{
    /// Creates a builder with no components set and the default heartbeat
    /// interval and maximum retry frequency.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the authorizer used by the connection manager. Required.
    pub fn with_authorizer(mut self, authorizer: Box<dyn Authorizer + Send>) -> Self {
        self.authorizer = Some(authorizer);
        self
    }

    /// Sets the connection matrix life-cycle handle. Required.
    pub fn with_matrix_life_cycle(mut self, life_cycle: T) -> Self {
        self.life_cycle = Some(life_cycle);
        self
    }

    /// Sets the connection matrix sender used to deliver heartbeats. Required.
    pub fn with_matrix_sender(mut self, matrix_sender: U) -> Self {
        self.matrix_sender = Some(matrix_sender);
        self
    }

    /// Sets the transport used by the connection manager. Required.
    pub fn with_transport(mut self, transport: Box<dyn Transport + Send>) -> Self {
        self.transport = Some(transport);
        self
    }

    /// Overrides the heartbeat interval handed to the pacemaker
    /// (reported in seconds by the start-up log message).
    pub fn with_heartbeat_interval(mut self, interval: u64) -> Self {
        self.heartbeat_interval = interval;
        self
    }

    /// Overrides the maximum retry frequency passed to the connection
    /// manager state.
    pub fn with_maximum_retry_frequency(mut self, frequency: u64) -> Self {
        self.maximum_retry_frequency = frequency;
        self
    }

    /// Spawns the "Connection Manager" worker thread and the heartbeat
    /// pacemaker, returning the running `ConnectionManager`.
    ///
    /// # Errors
    ///
    /// Returns `ConnectionManagerError::StartUpError` if the authorizer,
    /// transport, matrix sender or matrix life cycle was not provided, or if
    /// the pacemaker fails to start. A failure to spawn the worker thread is
    /// propagated through `?`. When the pacemaker cannot be started, the
    /// already-running worker thread is shut down before the error is
    /// returned.
    pub fn start(mut self) -> Result<ConnectionManager, ConnectionManagerError> {
        let (sender, recv) = channel();
        let heartbeat = self.heartbeat_interval;
        let retry_frequency = self.maximum_retry_frequency;

        // Validate every required component before any thread is spawned.
        let authorizer = self
            .authorizer
            .take()
            .ok_or_else(|| ConnectionManagerError::StartUpError("No authorizer provided".into()))?;
        let transport = self
            .transport
            .take()
            .ok_or_else(|| ConnectionManagerError::StartUpError("No transport provided".into()))?;
        let matrix_sender = self.matrix_sender.take().ok_or_else(|| {
            ConnectionManagerError::StartUpError("No matrix sender provided".into())
        })?;
        let life_cycle = self.life_cycle.take().ok_or_else(|| {
            ConnectionManagerError::StartUpError("No matrix life cycle provided".into())
        })?;

        // The worker keeps its own sender so that authorization results can
        // be fed back into its message loop.
        let resender = sender.clone();
        let join_handle = thread::Builder::new()
            .name("Connection Manager".into())
            .spawn(move || {
                let mut state = ConnectionManagerState::new(
                    life_cycle,
                    matrix_sender,
                    transport,
                    retry_frequency,
                );
                let mut subscribers = SubscriberMap::new();
                loop {
                    match recv.recv() {
                        Ok(CmMessage::Shutdown) => break,
                        Ok(CmMessage::Request(req)) => {
                            handle_request(
                                req,
                                &mut state,
                                &mut subscribers,
                                &*authorizer,
                                resender.clone(),
                            );
                        }
                        Ok(CmMessage::AuthResult(auth_result)) => {
                            handle_auth_result(auth_result, &mut state, &mut subscribers);
                        }
                        Ok(CmMessage::SendHeartbeats) => send_heartbeats(
                            &mut state,
                            &mut subscribers,
                            &*authorizer,
                            resender.clone(),
                        ),
                        // NOTE: this thread owns `resender`, so the channel
                        // cannot report disconnection while the loop is
                        // running; termination relies on `CmMessage::Shutdown`.
                        Err(_) => {
                            warn!("All senders have disconnected");
                            break;
                        }
                    }
                }
            })?;

        debug!(
            "Starting connection manager pacemaker with interval of {}s",
            heartbeat
        );
        let pacemaker = match pacemaker::Pacemaker::builder()
            .with_interval(heartbeat)
            .with_sender(sender.clone())
            .with_message_factory(|| CmMessage::SendHeartbeats)
            .start()
        {
            Ok(started) => started,
            Err(err) => {
                // The worker thread holds a sender clone and would otherwise
                // block on `recv` forever once this function returns; stop it
                // explicitly so it releases the components it owns.
                if sender.send(CmMessage::Shutdown).is_err() {
                    warn!("Connection manager thread exited before it could be shut down");
                }
                if join_handle.join().is_err() {
                    warn!("Connection manager thread panicked while shutting down");
                }
                return Err(ConnectionManagerError::StartUpError(err.to_string()));
            }
        };

        Ok(ConnectionManager {
            pacemaker,
            join_handle,
            sender,
        })
    }
}
/// Dispatches one `CmRequest` received by the connection manager thread.
///
/// * `req` - the request to process.
/// * `state` - connection manager state that owns the connection metadata.
/// * `subscribers` - registry of notification subscribers.
/// * `authorizer` - authorizer forwarded to connection set-up.
/// * `internal_sender` - sender forwarded to connection set-up together with
///   the authorizer.
///
/// Requests that carry a reply `sender` are answered on it; if the requester
/// has already dropped its receiving end, a warning naming the operation is
/// logged and processing continues.
fn handle_request<T: ConnectionMatrixLifeCycle, U: ConnectionMatrixSender>(
    req: CmRequest,
    state: &mut ConnectionManagerState<T, U>,
    subscribers: &mut SubscriberMap,
    authorizer: &dyn Authorizer,
    internal_sender: Sender<CmMessage>,
) {
    match req {
        CmRequest::RequestOutboundConnection {
            endpoint,
            sender,
            connection_id,
            expected_authorization,
            local_authorization,
        } => state.add_outbound_connection(
            OutboundConnection {
                endpoint,
                connection_id,
                expected_authorization,
                local_authorization,
            },
            sender,
            internal_sender,
            authorizer,
            subscribers,
        ),
        CmRequest::RemoveConnection {
            endpoint,
            connection_id,
            sender,
        } => {
            // Reply with only the endpoint of the removed connection, if any.
            let response = state
                .remove_connection(&endpoint, &connection_id)
                .map(|meta_opt| meta_opt.map(|meta| meta.endpoint().to_owned()));
            if sender.send(response).is_err() {
                warn!("connector dropped before receiving result of remove connection");
            }
        }
        CmRequest::ListConnections { sender } => {
            if sender
                .send(Ok(state
                    .connection_metadata()
                    .iter()
                    .map(|(_, metadata)| metadata.endpoint().to_string())
                    .collect()))
                .is_err()
            {
                warn!("connector dropped before receiving result of list connections");
            }
        }
        CmRequest::AddInboundConnection { sender, connection } => {
            state.add_inbound_connection(connection, sender, internal_sender, authorizer)
        }
        CmRequest::Subscribe { sender, callback } => {
            let subscriber_id = subscribers.add_subscriber(callback);
            if sender.send(Ok(subscriber_id)).is_err() {
                warn!("connector dropped before receiving result of subscribe");
            }
        }
        CmRequest::Unsubscribe {
            sender,
            subscriber_id,
        } => {
            subscribers.remove_subscriber(subscriber_id);
            if sender.send(Ok(())).is_err() {
                warn!("connector dropped before receiving result of unsubscribe");
            }
        }
    };
}
/// Forwards a completed authorization to the connection manager state.
///
/// Inbound and outbound results are routed to the matching
/// `on_*_authorization_complete` method, together with the subscriber map.
fn handle_auth_result<T: ConnectionMatrixLifeCycle, U: ConnectionMatrixSender>(
    auth_result: AuthResult,
    state: &mut ConnectionManagerState<T, U>,
    subscribers: &mut SubscriberMap,
) {
    match auth_result {
        AuthResult::Inbound {
            endpoint,
            auth_result: outcome,
        } => {
            state.on_inbound_authorization_complete(endpoint, outcome, subscribers);
        }
        AuthResult::Outbound {
            endpoint,
            auth_result: outcome,
        } => {
            state.on_outbound_authorization_complete(endpoint, outcome, subscribers);
        }
    }
}
/// Performs one heartbeat pass over every connection tracked by `state`.
///
/// * Outbound connections that are already reconnecting are queued for
///   another attempt once more than their retry frequency has elapsed since
///   the last attempt; otherwise they are sent a heartbeat, and a failed send
///   broadcasts `Disconnected` and queues a reconnection.
/// * Inbound connections are sent a heartbeat; the first failed send flips
///   the `disconnected` flag and broadcasts `Disconnected`, and a successful
///   send clears the flag.
///
/// Queued reconnections are attempted after the pass, once the metadata map
/// is no longer borrowed. If the heartbeat message cannot be built, the
/// error is logged and nothing is sent.
fn send_heartbeats<T: ConnectionMatrixLifeCycle, U: ConnectionMatrixSender>(
    state: &mut ConnectionManagerState<T, U>,
    subscribers: &mut SubscriberMap,
    authorizer: &dyn Authorizer,
    internal_sender: Sender<CmMessage>,
) {
    let heartbeat_bytes = match create_heartbeat() {
        Err(err) => {
            error!("Failed to create heartbeat message: {:?}", err);
            return;
        }
        Ok(bytes) => bytes,
    };

    let heartbeat_sender = state.matrix_sender();
    // Snapshots of connections that need a reconnection attempt after the pass.
    let mut to_reconnect = Vec::new();

    for (id, meta) in state.connection_metadata_mut().iter_mut() {
        match meta.extended_metadata {
            ConnectionMetadataExt::Outbound {
                reconnecting: is_reconnecting,
                retry_frequency: retry_after,
                last_connection_attempt: last_attempt,
                ..
            } => {
                if is_reconnecting {
                    // Already reconnecting: retry only after the back-off has passed.
                    if last_attempt.elapsed().as_secs() > retry_after {
                        to_reconnect.push(meta.clone());
                    }
                    continue;
                }

                trace!(
                    "Sending heartbeat to {} ({})",
                    meta.endpoint(),
                    meta.connection_id(),
                );
                if let Err(err) = heartbeat_sender.send(id.clone(), heartbeat_bytes.clone()) {
                    debug!(
                        "Outbound: failed to send heartbeat to {} ({}): \
                         {:?} attempting reconnection",
                        meta.endpoint(),
                        meta.connection_id(),
                        err
                    );
                    subscribers.broadcast(ConnectionManagerNotification::Disconnected {
                        endpoint: meta.endpoint.clone(),
                        identity: meta.identity.clone(),
                        connection_id: meta.connection_id.clone(),
                    });
                    to_reconnect.push(meta.clone());
                }
            }
            ConnectionMetadataExt::Inbound {
                disconnected: ref mut already_flagged,
                ..
            } => {
                // Fields are read directly here because `already_flagged`
                // still mutably borrows `extended_metadata`.
                trace!(
                    "Sending heartbeat to {} ({})",
                    meta.endpoint,
                    meta.connection_id,
                );
                match heartbeat_sender.send(id.clone(), heartbeat_bytes.clone()) {
                    Ok(_) => *already_flagged = false,
                    Err(err) => {
                        debug!(
                            "Inbound: failed to send heartbeat to {} ({}): {:?} ",
                            meta.endpoint, meta.connection_id, err,
                        );
                        // Notify subscribers only on the first failure.
                        if !*already_flagged {
                            *already_flagged = true;
                            subscribers.broadcast(ConnectionManagerNotification::Disconnected {
                                endpoint: meta.endpoint.clone(),
                                identity: meta.identity.clone(),
                                connection_id: meta.connection_id.clone(),
                            });
                        }
                    }
                }
            }
        }
    }

    for meta in to_reconnect {
        if let Err(err) = state.reconnect(
            meta.endpoint(),
            meta.connection_id(),
            subscribers,
            authorizer,
            internal_sender.clone(),
        ) {
            error!(
                "Reconnection attempt to {} ({}): failed: {:?}",
                meta.endpoint(),
                meta.connection_id(),
                err
            );
        }
    }
}
/// Serializes a `NetworkHeartbeat` network message into bytes.
///
/// Any serialization failure is reported as
/// `ConnectionManagerError::HeartbeatError`; the underlying error is discarded.
fn create_heartbeat() -> Result<Vec<u8>, ConnectionManagerError> {
    let heartbeat = NetworkMessage::NetworkHeartbeat(NetworkHeartbeat);
    match IntoBytes::<network::NetworkMessage>::into_bytes(heartbeat) {
        Ok(bytes) => Ok(bytes),
        Err(_) => Err(ConnectionManagerError::HeartbeatError(
            "cannot create NetworkHeartbeat message".to_string(),
        )),
    }
}