use protobuf::Message;
use crate::channel::Sender;
use crate::network::auth::{
AuthorizationAction, AuthorizationInquisitor, AuthorizationManager, AuthorizationState,
};
use crate::network::dispatch::{
DispatchError, DispatchMessage, Dispatcher, FromMessageBytes, Handler, MessageContext,
};
use crate::network::sender::SendRequest;
use crate::protos::authorization::{
AuthorizationError, AuthorizationMessage, AuthorizationMessageType, AuthorizedMessage,
ConnectRequest, ConnectRequest_HandshakeMode, ConnectResponse,
ConnectResponse_AuthorizationType, TrustRequest,
};
use crate::protos::network::{NetworkMessage, NetworkMessageType};
pub fn create_authorization_dispatcher(
auth_manager: AuthorizationManager,
network_sender: Box<dyn Sender<SendRequest>>,
) -> Dispatcher<AuthorizationMessageType> {
let mut auth_dispatcher = Dispatcher::new(network_sender);
auth_dispatcher.set_handler(
AuthorizationMessageType::CONNECT_REQUEST,
Box::new(ConnectRequestHandler::new(auth_manager.clone())),
);
auth_dispatcher.set_handler(
AuthorizationMessageType::CONNECT_RESPONSE,
Box::new(ConnectResponseHandler::new(auth_manager.clone())),
);
auth_dispatcher.set_handler(
AuthorizationMessageType::TRUST_REQUEST,
Box::new(TrustRequestHandler::new(auth_manager.clone())),
);
auth_dispatcher.set_handler(
AuthorizationMessageType::AUTHORIZE,
Box::new(
|_: AuthorizedMessage,
context: &MessageContext<AuthorizationMessageType>,
_: &dyn Sender<SendRequest>| {
info!(
"Connection authorized with peer {}",
context.source_peer_id()
);
Ok(())
},
),
);
auth_dispatcher.set_handler(
AuthorizationMessageType::AUTHORIZATION_ERROR,
Box::new(AuthorizationErrorHandler::new(auth_manager)),
);
auth_dispatcher
}
pub struct AuthorizationMessageHandler {
sender: Box<dyn Sender<DispatchMessage<AuthorizationMessageType>>>,
}
impl AuthorizationMessageHandler {
pub fn new(sender: Box<dyn Sender<DispatchMessage<AuthorizationMessageType>>>) -> Self {
AuthorizationMessageHandler { sender }
}
}
impl Handler<NetworkMessageType, AuthorizationMessage> for AuthorizationMessageHandler {
fn handle(
&self,
msg: AuthorizationMessage,
context: &MessageContext<NetworkMessageType>,
_sender: &dyn Sender<SendRequest>,
) -> Result<(), DispatchError> {
self.sender
.send(DispatchMessage::new(
msg.message_type,
msg.payload,
context.source_peer_id().to_string(),
))
.map_err(DispatchError::from)
}
}
pub struct NetworkAuthGuardHandler<M: FromMessageBytes> {
auth_manager: AuthorizationManager,
handler: Box<dyn Handler<NetworkMessageType, M>>,
}
impl<M: FromMessageBytes> NetworkAuthGuardHandler<M> {
pub fn new(
auth_manager: AuthorizationManager,
handler: Box<dyn Handler<NetworkMessageType, M>>,
) -> Self {
NetworkAuthGuardHandler {
auth_manager,
handler,
}
}
}
impl<M: FromMessageBytes> Handler<NetworkMessageType, M> for NetworkAuthGuardHandler<M> {
fn handle(
&self,
msg: M,
context: &MessageContext<NetworkMessageType>,
sender: &dyn Sender<SendRequest>,
) -> Result<(), DispatchError> {
if self.auth_manager.is_authorized(context.source_peer_id()) {
self.handler.handle(msg, context, sender)
} else {
debug!(
"{} attempting to send {:?} message before completing authorization",
context.source_peer_id(),
context.message_type()
);
Ok(())
}
}
}
struct ConnectRequestHandler {
auth_manager: AuthorizationManager,
}
impl ConnectRequestHandler {
fn new(auth_manager: AuthorizationManager) -> Self {
ConnectRequestHandler { auth_manager }
}
}
impl Handler<AuthorizationMessageType, ConnectRequest> for ConnectRequestHandler {
fn handle(
&self,
msg: ConnectRequest,
context: &MessageContext<AuthorizationMessageType>,
sender: &dyn Sender<SendRequest>,
) -> Result<(), DispatchError> {
match self
.auth_manager
.next_state(context.source_peer_id(), AuthorizationAction::Connecting)
{
Err(err) => {
debug!(
"Ignoring duplicate connect message from peer {}: {}",
context.source_peer_id(),
err
);
}
Ok(AuthorizationState::Connecting) => {
debug!("Beginning handshake for peer {}", context.source_peer_id(),);
if msg.get_handshake_mode() == ConnectRequest_HandshakeMode::BIDIRECTIONAL {
let mut connect_req = ConnectRequest::new();
connect_req.set_handshake_mode(ConnectRequest_HandshakeMode::UNIDIRECTIONAL);
sender.send(SendRequest::new(
context.source_peer_id().to_string(),
wrap_in_network_auth_envelopes(
AuthorizationMessageType::CONNECT_REQUEST,
connect_req,
)?,
))?;
debug!(
"Sent bidirectional connect request to peer {}",
context.source_peer_id()
);
}
let mut response = ConnectResponse::new();
response.set_accepted_authorization_types(vec![
ConnectResponse_AuthorizationType::TRUST,
]);
sender.send(SendRequest::new(
context.source_peer_id().to_string(),
wrap_in_network_auth_envelopes(
AuthorizationMessageType::CONNECT_RESPONSE,
response,
)?,
))?;
}
Ok(AuthorizationState::Internal) => {
debug!(
"Sending Authorized message to internal peer {}",
context.source_peer_id()
);
let auth_msg = AuthorizedMessage::new();
sender.send(SendRequest::new(
context.source_peer_id().to_string(),
wrap_in_network_auth_envelopes(AuthorizationMessageType::AUTHORIZE, auth_msg)?,
))?;
}
Ok(next_state) => panic!("Should not have been able to transition to {}", next_state),
}
Ok(())
}
}
struct ConnectResponseHandler {
auth_manager: AuthorizationManager,
}
impl ConnectResponseHandler {
fn new(auth_manager: AuthorizationManager) -> Self {
ConnectResponseHandler { auth_manager }
}
}
impl Handler<AuthorizationMessageType, ConnectResponse> for ConnectResponseHandler {
fn handle(
&self,
msg: ConnectResponse,
context: &MessageContext<AuthorizationMessageType>,
sender: &dyn Sender<SendRequest>,
) -> Result<(), DispatchError> {
debug!(
"Receive connect response from peer {}: {:?}",
context.source_peer_id(),
msg
);
if msg
.get_accepted_authorization_types()
.iter()
.any(|t| t == &ConnectResponse_AuthorizationType::TRUST)
{
let mut trust_request = TrustRequest::new();
trust_request.set_identity(self.auth_manager.identity.clone());
sender.send(SendRequest::new(
context.source_peer_id().to_string(),
wrap_in_network_auth_envelopes(
AuthorizationMessageType::TRUST_REQUEST,
trust_request,
)?,
))?;
}
Ok(())
}
}
struct TrustRequestHandler {
auth_manager: AuthorizationManager,
}
impl TrustRequestHandler {
fn new(auth_manager: AuthorizationManager) -> Self {
TrustRequestHandler { auth_manager }
}
}
impl Handler<AuthorizationMessageType, TrustRequest> for TrustRequestHandler {
fn handle(
&self,
msg: TrustRequest,
context: &MessageContext<AuthorizationMessageType>,
sender: &dyn Sender<SendRequest>,
) -> Result<(), DispatchError> {
match self.auth_manager.next_state(
context.source_peer_id(),
AuthorizationAction::TrustIdentifying(msg.get_identity().to_string()),
) {
Err(err) => {
debug!(
"Ignoring trust request message from peer {}: {}",
context.source_peer_id(),
err
);
}
Ok(AuthorizationState::Authorized) => {
debug!(
"Sending Authorized message to peer {} (formerly {})",
msg.get_identity(),
context.source_peer_id()
);
let auth_msg = AuthorizedMessage::new();
sender.send(SendRequest::new(
msg.get_identity().to_string(),
wrap_in_network_auth_envelopes(AuthorizationMessageType::AUTHORIZE, auth_msg)?,
))?;
}
Ok(next_state) => panic!("Should not have been able to transition to {}", next_state),
}
Ok(())
}
}
struct AuthorizationErrorHandler {
auth_manager: AuthorizationManager,
}
impl AuthorizationErrorHandler {
fn new(auth_manager: AuthorizationManager) -> Self {
AuthorizationErrorHandler { auth_manager }
}
}
impl Handler<AuthorizationMessageType, AuthorizationError> for AuthorizationErrorHandler {
fn handle(
&self,
msg: AuthorizationError,
context: &MessageContext<AuthorizationMessageType>,
_: &dyn Sender<SendRequest>,
) -> Result<(), DispatchError> {
match self
.auth_manager
.next_state(context.source_peer_id(), AuthorizationAction::Unauthorizing)
{
Ok(AuthorizationState::Unauthorized) => {
info!(
"Connection unauthorized by peer {}: {}",
context.source_peer_id(),
msg.get_error_message()
);
}
Err(err) => {
warn!(
"Unable to handle unauthorizing by peer {}: {}",
context.source_peer_id(),
err
);
}
Ok(next_state) => panic!("Should not have been able to transition to {}", next_state),
}
Ok(())
}
}
fn wrap_in_network_auth_envelopes<M: protobuf::Message>(
msg_type: AuthorizationMessageType,
auth_msg: M,
) -> Result<Vec<u8>, DispatchError> {
let mut auth_msg_env = AuthorizationMessage::new();
auth_msg_env.set_message_type(msg_type);
auth_msg_env.set_payload(auth_msg.write_to_bytes()?);
let mut network_msg = NetworkMessage::new();
network_msg.set_message_type(NetworkMessageType::AUTHORIZATION);
network_msg.set_payload(auth_msg_env.write_to_bytes()?);
network_msg.write_to_bytes().map_err(DispatchError::from)
}
#[cfg(test)]
mod tests {
use super::*;
use protobuf::Message;
use crate::channel::mock::MockSender;
use crate::mesh::Mesh;
use crate::network::Network;
use crate::protos::authorization::{
AuthorizationError, AuthorizationError_AuthorizationErrorType, AuthorizationMessage,
AuthorizedMessage, ConnectRequest, ConnectResponse, ConnectResponse_AuthorizationType,
TrustRequest,
};
use crate::protos::network::{NetworkMessage, NetworkMessageType};
use crate::transport::{
ConnectError, Connection, DisconnectError, RecvError, SendError, Transport,
};
#[test]
fn connect_request_dispatch() {
let (network, peer_id) = create_network_with_initial_temp_peer();
let auth_mgr = AuthorizationManager::new(network, "mock_identity".into());
let network_sender = MockSender::default();
let dispatcher =
create_authorization_dispatcher(auth_mgr, Box::new(network_sender.clone()));
let mut msg = ConnectRequest::new();
msg.set_handshake_mode(ConnectRequest_HandshakeMode::BIDIRECTIONAL);
let msg_bytes = msg.write_to_bytes().expect("Unable to serialize message");
assert_eq!(
Ok(()),
dispatcher.dispatch(
&peer_id,
&AuthorizationMessageType::CONNECT_REQUEST,
msg_bytes
)
);
let mut sent = network_sender.clear();
let send_request = sent.pop().expect("A message should have been sent");
let connect_res_msg: ConnectResponse = expect_auth_message(
AuthorizationMessageType::CONNECT_RESPONSE,
send_request.payload(),
);
assert_eq!(
vec![ConnectResponse_AuthorizationType::TRUST],
connect_res_msg.get_accepted_authorization_types().to_vec()
);
let send_request = sent
.pop()
.expect("An additional message should have been sent");
let connect_req_msg: ConnectRequest = expect_auth_message(
AuthorizationMessageType::CONNECT_REQUEST,
send_request.payload(),
);
assert_eq!(
ConnectRequest_HandshakeMode::UNIDIRECTIONAL,
connect_req_msg.get_handshake_mode()
);
}
#[test]
fn connect_response_dispatch() {
let (network, peer_id) = create_network_with_initial_temp_peer();
let auth_mgr = AuthorizationManager::new(network, "mock_identity".into());
let network_sender = MockSender::default();
let dispatcher =
create_authorization_dispatcher(auth_mgr, Box::new(network_sender.clone()));
let mut msg = ConnectResponse::new();
msg.set_accepted_authorization_types(vec![ConnectResponse_AuthorizationType::TRUST].into());
let msg_bytes = msg.write_to_bytes().expect("Unable to serialize message");
assert_eq!(
Ok(()),
dispatcher.dispatch(
&peer_id,
&AuthorizationMessageType::CONNECT_RESPONSE,
msg_bytes
)
);
let send_request = network_sender
.clear()
.pop()
.expect("A message should have been sent");
let trust_req: TrustRequest = expect_auth_message(
AuthorizationMessageType::TRUST_REQUEST,
send_request.payload(),
);
assert_eq!("mock_identity", trust_req.get_identity());
}
#[test]
fn trust_request_dispatch() {
let (network, peer_id) = create_network_with_initial_temp_peer();
let auth_mgr = AuthorizationManager::new(network, "mock_identity".into());
let network_sender = MockSender::default();
let dispatcher =
create_authorization_dispatcher(auth_mgr, Box::new(network_sender.clone()));
let mut msg = ConnectRequest::new();
msg.set_handshake_mode(ConnectRequest_HandshakeMode::UNIDIRECTIONAL);
let msg_bytes = msg.write_to_bytes().expect("Unable to serialize message");
assert_eq!(
Ok(()),
dispatcher.dispatch(
&peer_id,
&AuthorizationMessageType::CONNECT_REQUEST,
msg_bytes
)
);
let send_request = network_sender
.clear()
.pop()
.expect("A message should have been sent");
let _connect_res_msg: ConnectResponse = expect_auth_message(
AuthorizationMessageType::CONNECT_RESPONSE,
send_request.payload(),
);
let mut trust_req = TrustRequest::new();
trust_req.set_identity("my_identity".into());
let msg_bytes = trust_req
.write_to_bytes()
.expect("Unable to serialize message");
assert_eq!(
Ok(()),
dispatcher.dispatch(
&peer_id,
&AuthorizationMessageType::TRUST_REQUEST,
msg_bytes
)
);
let send_request = network_sender
.clear()
.pop()
.expect("A message should have been sent");
let _auth_msg: AuthorizedMessage =
expect_auth_message(AuthorizationMessageType::AUTHORIZE, send_request.payload());
}
#[test]
fn auth_error_dispatch() {
let (network, peer_id) = create_network_with_initial_temp_peer();
let auth_mgr = AuthorizationManager::new(network.clone(), "mock_pub_key".into());
let network_sender = MockSender::default();
let dispatcher =
create_authorization_dispatcher(auth_mgr, Box::new(network_sender.clone()));
let mut msg = ConnectRequest::new();
msg.set_handshake_mode(ConnectRequest_HandshakeMode::UNIDIRECTIONAL);
let msg_bytes = msg.write_to_bytes().expect("Unable to serialize message");
assert_eq!(
Ok(()),
dispatcher.dispatch(
&peer_id,
&AuthorizationMessageType::CONNECT_REQUEST,
msg_bytes
)
);
let send_request = network_sender
.clear()
.pop()
.expect("A message should have been sent");
let _connect_res_msg: ConnectResponse = expect_auth_message(
AuthorizationMessageType::CONNECT_RESPONSE,
send_request.payload(),
);
let mut error_message = AuthorizationError::new();
error_message
.set_error_type(AuthorizationError_AuthorizationErrorType::AUTHORIZATION_REJECTED);
error_message.set_error_message("Test Error!".into());
let msg_bytes = error_message
.write_to_bytes()
.expect("Unable to serialize error message");
assert_eq!(
Ok(()),
dispatcher.dispatch(
&peer_id,
&AuthorizationMessageType::AUTHORIZATION_ERROR,
msg_bytes
)
);
assert_eq!(0, network_sender.sent().len());
assert_eq!(0, network.peer_ids().len());
}
fn expect_auth_message<M: protobuf::Message>(
message_type: AuthorizationMessageType,
msg_bytes: &[u8],
) -> M {
let network_msg: NetworkMessage =
protobuf::parse_from_bytes(msg_bytes).expect("Unable to parse network message");
assert_eq!(NetworkMessageType::AUTHORIZATION, network_msg.message_type);
let auth_msg: AuthorizationMessage = protobuf::parse_from_bytes(network_msg.get_payload())
.expect("Unable to parse auth message");
assert_eq!(message_type, auth_msg.message_type);
match protobuf::parse_from_bytes(auth_msg.get_payload()) {
Ok(msg) => msg,
Err(err) => panic!(
"unable to parse message for type {:?}: {:?}",
message_type, err
),
}
}
fn create_network_with_initial_temp_peer() -> (Network, String) {
let network = Network::new(Mesh::new(5, 5), 0).unwrap();
let mut transport = MockConnectingTransport;
let connection = transport
.connect("local")
.expect("Unable to create the connection");
network
.add_connection(connection)
.expect("Unable to add connection to network");
let peer_id = network.peer_ids()[0].clone();
(network, peer_id)
}
struct MockConnectingTransport;
impl Transport for MockConnectingTransport {
fn accepts(&self, _: &str) -> bool {
true
}
fn connect(&mut self, _: &str) -> Result<Box<dyn Connection>, ConnectError> {
Ok(Box::new(MockConnection))
}
fn listen(
&mut self,
_: &str,
) -> Result<Box<dyn crate::transport::Listener>, crate::transport::ListenError> {
unimplemented!()
}
}
struct MockConnection;
impl Connection for MockConnection {
fn send(&mut self, _message: &[u8]) -> Result<(), SendError> {
Ok(())
}
fn recv(&mut self) -> Result<Vec<u8>, RecvError> {
unimplemented!()
}
fn remote_endpoint(&self) -> String {
String::from("MockConnection")
}
fn local_endpoint(&self) -> String {
String::from("MockConnection")
}
fn disconnect(&mut self) -> Result<(), DisconnectError> {
Ok(())
}
fn evented(&self) -> &dyn mio::Evented {
&MockEvented
}
}
struct MockEvented;
impl mio::Evented for MockEvented {
fn register(
&self,
_poll: &mio::Poll,
_token: mio::Token,
_interest: mio::Ready,
_opts: mio::PollOpt,
) -> std::io::Result<()> {
Ok(())
}
fn reregister(
&self,
_poll: &mio::Poll,
_token: mio::Token,
_interest: mio::Ready,
_opts: mio::PollOpt,
) -> std::io::Result<()> {
Ok(())
}
fn deregister(&self, _poll: &mio::Poll) -> std::io::Result<()> {
Ok(())
}
}
}