use crate::circuit::handlers::create_message;
use crate::circuit::{ServiceId, SplinterState};
use crate::network::dispatch::{DispatchError, Handler, MessageContext, MessageSender, PeerId};
use crate::protos::circuit::{
CircuitDirectMessage, CircuitError, CircuitError_Error, CircuitMessageType,
};
use protobuf::Message;
/// Dispatch handler for `CircuitDirectMessage`s (see the `Handler` implementation below).
pub struct CircuitDirectMessageHandler {
    /// Node ID that is compared with the ID of the node hosting the recipient service to
    /// decide between forwarding to that node and delivering to the service's own peer.
    node_id: String,
    /// State that is queried for the circuit named in a message and for the sender and
    /// recipient services.
    state: SplinterState,
}
impl Handler for CircuitDirectMessageHandler {
type Source = PeerId;
type MessageType = CircuitMessageType;
type Message = CircuitDirectMessage;
fn match_type(&self) -> Self::MessageType {
CircuitMessageType::CIRCUIT_DIRECT_MESSAGE
}
fn handle(
&self,
msg: Self::Message,
context: &MessageContext<Self::Source, Self::MessageType>,
sender: &dyn MessageSender<Self::Source>,
) -> Result<(), DispatchError> {
debug!(
"Handle Circuit Direct Message {}on {} ({} => {}) [{} byte{}]",
if msg.get_correlation_id().is_empty() {
"".to_string()
} else {
format!("{} ", msg.get_correlation_id())
},
msg.get_circuit(),
msg.get_sender(),
msg.get_recipient(),
msg.get_payload().len(),
if msg.get_payload().len() == 1 {
""
} else {
"s"
}
);
let circuit_name = msg.get_circuit();
let msg_sender = msg.get_sender();
let recipient = msg.get_recipient();
let recipient_id = ServiceId::new(circuit_name.to_string(), recipient.to_string());
let sender_id = ServiceId::new(circuit_name.to_string(), msg_sender.to_string());
let (msg_bytes, msg_recipient) = {
if let Some(circuit) = self
.state
.circuit(circuit_name)
.map_err(|err| DispatchError::HandleError(err.context()))?
{
if !circuit.roster().contains(&msg_sender) {
let mut error_message = CircuitError::new();
error_message.set_correlation_id(msg.get_correlation_id().to_string());
error_message.set_service_id(msg_sender.into());
error_message.set_circuit_name(circuit_name.into());
error_message.set_error(CircuitError_Error::ERROR_SENDER_NOT_IN_CIRCUIT_ROSTER);
error_message.set_error_message(format!(
"Sender is not allowed in the Circuit: {}",
msg_sender
));
let msg_bytes = error_message.write_to_bytes()?;
let network_msg_bytes =
create_message(msg_bytes, CircuitMessageType::CIRCUIT_ERROR_MESSAGE)?;
(network_msg_bytes, context.source_peer_id().to_string())
} else if self
.state
.get_service(&sender_id)
.map_err(|err| DispatchError::HandleError(err.context()))?
.is_none()
{
let mut error_message = CircuitError::new();
error_message.set_correlation_id(msg.get_correlation_id().to_string());
error_message.set_service_id(msg_sender.into());
error_message.set_circuit_name(circuit_name.into());
error_message.set_error(CircuitError_Error::ERROR_SENDER_NOT_IN_DIRECTORY);
error_message.set_error_message(format!(
"Sender is not in the service directory: {}",
recipient
));
let msg_bytes = error_message.write_to_bytes()?;
let network_msg_bytes =
create_message(msg_bytes, CircuitMessageType::CIRCUIT_ERROR_MESSAGE)?;
(network_msg_bytes, context.source_peer_id().to_string())
} else if circuit.roster().contains(&recipient) {
if let Some(service) = self
.state
.get_service(&recipient_id)
.map_err(|err| DispatchError::HandleError(err.context()))?
{
let node_id = service.node().id().to_string();
if node_id != self.node_id {
let msg_bytes = context.message_bytes().to_vec();
let network_msg_bytes = create_message(
msg_bytes,
CircuitMessageType::CIRCUIT_DIRECT_MESSAGE,
)?;
(network_msg_bytes, node_id)
} else {
let msg_bytes = context.message_bytes().to_vec();
let network_msg_bytes = create_message(
msg_bytes,
CircuitMessageType::CIRCUIT_DIRECT_MESSAGE,
)?;
let peer_id = match service.peer_id() {
Some(peer_id) => peer_id.clone(),
None => {
warn!("No peer id for service:{} ", service.service_id());
return Ok(());
}
};
(network_msg_bytes, peer_id)
}
} else {
let mut error_message = CircuitError::new();
error_message.set_correlation_id(msg.get_correlation_id().to_string());
error_message.set_service_id(msg_sender.into());
error_message.set_circuit_name(circuit_name.into());
error_message
.set_error(CircuitError_Error::ERROR_RECIPIENT_NOT_IN_DIRECTORY);
error_message.set_error_message(format!(
"Recipient is not in the service directory: {}",
recipient
));
let msg_bytes = error_message.write_to_bytes()?;
let network_msg_bytes =
create_message(msg_bytes, CircuitMessageType::CIRCUIT_ERROR_MESSAGE)?;
(network_msg_bytes, context.source_peer_id().to_string())
}
} else {
let mut error_message = CircuitError::new();
error_message.set_correlation_id(msg.get_correlation_id().to_string());
error_message.set_service_id(msg_sender.into());
error_message.set_circuit_name(circuit_name.into());
error_message
.set_error(CircuitError_Error::ERROR_RECIPIENT_NOT_IN_CIRCUIT_ROSTER);
error_message.set_error_message(format!(
"Recipient is not allowed in the Circuit: {}",
recipient
));
let msg_bytes = error_message.write_to_bytes()?;
let network_msg_bytes =
create_message(msg_bytes, CircuitMessageType::CIRCUIT_ERROR_MESSAGE)?;
(network_msg_bytes, context.source_peer_id().to_string())
}
} else {
let mut error_message = CircuitError::new();
error_message.set_correlation_id(msg.get_correlation_id().into());
error_message.set_service_id(msg_sender.into());
error_message.set_circuit_name(circuit_name.into());
error_message.set_error(CircuitError_Error::ERROR_CIRCUIT_DOES_NOT_EXIST);
error_message
.set_error_message(format!("Circuit does not exist: {}", circuit_name));
let msg_bytes = error_message.write_to_bytes()?;
let network_msg_bytes =
create_message(msg_bytes, CircuitMessageType::CIRCUIT_ERROR_MESSAGE)?;
(network_msg_bytes, context.source_peer_id().to_string())
}
};
sender
.send(msg_recipient.into(), msg_bytes)
.map_err(|(recipient, payload)| {
DispatchError::NetworkSendError((recipient.into(), payload))
})?;
Ok(())
}
}
impl CircuitDirectMessageHandler {
pub fn new(node_id: String, state: SplinterState) -> Self {
CircuitDirectMessageHandler { node_id, state }
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::circuit::directory::CircuitDirectory;
use crate::circuit::service::{Service, SplinterNode};
use crate::circuit::{AuthorizationType, Circuit, DurabilityType, PersistenceType, RouteType};
use crate::mesh::Mesh;
use crate::network::dispatch::Dispatcher;
use crate::network::sender;
use crate::network::{Network, NetworkMessageWrapper};
use crate::protos::circuit::CircuitMessage;
use crate::protos::network::NetworkMessage;
use crate::transport::inproc::InprocTransport;
use crate::transport::{Listener, Transport};
/// A direct message whose recipient service is hosted on the handler's own node is sent to
/// that service's peer ID.
#[test]
fn test_circuit_direct_message_handler_service() {
    // Handler-side scenario; `run_test` executes it on a spawned thread.
    let scenario = |mut listener: Box<dyn Listener>,
                    mut dispatcher: Dispatcher<CircuitMessageType>,
                    network: Network| {
        // The accepted connection is registered under the recipient service's peer ID.
        let connection = listener.accept().expect("Cannot accept connection");
        network
            .add_peer("abc_network".to_string(), connection)
            .expect("Unable to add peer");

        let mut directory = CircuitDirectory::new();
        directory.add_circuit(
            "alpha".to_string(),
            Circuit::builder()
                .with_id("alpha".into())
                .with_auth(AuthorizationType::Trust)
                .with_members(vec!["123".into()])
                .with_roster(vec!["abc".into(), "def".into()])
                .with_persistence(PersistenceType::Any)
                .with_durability(DurabilityType::NoDurability)
                .with_routes(RouteType::Any)
                .with_circuit_management_type("circuit_direct_test_app".into())
                .build()
                .expect("Should have built a correct circuit"),
        );
        let state = SplinterState::new("memory".to_string(), directory);

        // Both services live on node "123", which is also the handler's node ID.
        let local_node = SplinterNode::new("123".to_string(), vec!["123.0.0.1:0".to_string()]);
        let recipient_service = Service::new(
            "abc".to_string(),
            Some("abc_network".to_string()),
            local_node.clone(),
        );
        let sender_service = Service::new(
            "def".to_string(),
            Some("def_network".to_string()),
            local_node,
        );
        state
            .add_service(ServiceId::new("alpha".into(), "abc".into()), recipient_service)
            .unwrap();
        state
            .add_service(ServiceId::new("alpha".into(), "def".into()), sender_service)
            .unwrap();

        dispatcher.set_handler(Box::new(CircuitDirectMessageHandler::new(
            "123".to_string(),
            state,
        )));

        let direct_bytes = {
            let mut direct_message = CircuitDirectMessage::new();
            direct_message.set_circuit("alpha".into());
            direct_message.set_sender("def".into());
            direct_message.set_recipient("abc".into());
            direct_message.set_payload(b"test".to_vec());
            direct_message.set_correlation_id("1234".into());
            direct_message.write_to_bytes().unwrap()
        };
        dispatcher
            .dispatch(
                "def".into(),
                &CircuitMessageType::CIRCUIT_DIRECT_MESSAGE,
                direct_bytes,
            )
            .unwrap();
    };

    // The message must arrive unchanged on the receiving side.
    let verify = |msg: CircuitDirectMessage| {
        assert_eq!(msg.get_sender(), "def");
        assert_eq!(msg.get_circuit(), "alpha");
        assert_eq!(msg.get_recipient(), "abc");
        assert_eq!(msg.get_payload().to_vec(), b"test".to_vec());
        assert_eq!(msg.get_correlation_id(), "1234");
    };

    run_test(
        scenario,
        "345",
        CircuitMessageType::CIRCUIT_DIRECT_MESSAGE,
        verify,
    );
}
/// A direct message whose recipient service is hosted on a different node than the handler's
/// is forwarded to that node.
#[test]
fn test_circuit_direct_message_handler_node() {
    // Handler-side scenario; `run_test` executes it on a spawned thread.
    let scenario = |mut listener: Box<dyn Listener>,
                    mut dispatcher: Dispatcher<CircuitMessageType>,
                    network: Network| {
        // The accepted connection is registered under the ID of the recipient's node.
        let connection = listener.accept().expect("Cannot accept connection");
        network
            .add_peer("123".to_string(), connection)
            .expect("Unable to add peer");

        let mut directory = CircuitDirectory::new();
        directory.add_circuit(
            "alpha".to_string(),
            Circuit::builder()
                .with_id("alpha".into())
                .with_auth(AuthorizationType::Trust)
                .with_members(vec!["123".into(), "345".into()])
                .with_roster(vec!["abc".into(), "def".into()])
                .with_persistence(PersistenceType::Any)
                .with_durability(DurabilityType::NoDurability)
                .with_routes(RouteType::Any)
                .with_circuit_management_type("circuit_direct_test_app".into())
                .build()
                .expect("Should have built a correct circuit"),
        );
        let state = SplinterState::new("memory".to_string(), directory);

        // Recipient "abc" is hosted on node "123"; sender "def" on node "345", the handler's
        // own node.
        let remote_node = SplinterNode::new("123".to_string(), vec!["123.0.0.1:0".to_string()]);
        let local_node = SplinterNode::new("345".to_string(), vec!["123.0.0.1:0".to_string()]);
        let recipient_service = Service::new(
            "abc".to_string(),
            Some("abc_network".to_string()),
            remote_node,
        );
        let sender_service = Service::new(
            "def".to_string(),
            Some("def_network".to_string()),
            local_node,
        );
        state
            .add_service(ServiceId::new("alpha".into(), "abc".into()), recipient_service)
            .unwrap();
        state
            .add_service(ServiceId::new("alpha".into(), "def".into()), sender_service)
            .unwrap();

        dispatcher.set_handler(Box::new(CircuitDirectMessageHandler::new(
            "345".to_string(),
            state,
        )));

        let direct_bytes = {
            let mut direct_message = CircuitDirectMessage::new();
            direct_message.set_circuit("alpha".into());
            direct_message.set_sender("def".into());
            direct_message.set_recipient("abc".into());
            direct_message.set_payload(b"test".to_vec());
            direct_message.set_correlation_id("1234".into());
            direct_message.write_to_bytes().unwrap()
        };
        dispatcher
            .dispatch(
                "def".into(),
                &CircuitMessageType::CIRCUIT_DIRECT_MESSAGE,
                direct_bytes,
            )
            .unwrap();
    };

    // The forwarded message must arrive unchanged on the receiving side.
    let verify = |msg: CircuitDirectMessage| {
        assert_eq!(msg.get_sender(), "def");
        assert_eq!(msg.get_circuit(), "alpha");
        assert_eq!(msg.get_recipient(), "abc");
        assert_eq!(msg.get_payload().to_vec(), b"test".to_vec());
        assert_eq!(msg.get_correlation_id(), "1234");
    };

    run_test(
        scenario,
        "345",
        CircuitMessageType::CIRCUIT_DIRECT_MESSAGE,
        verify,
    );
}
/// A direct message from a sender that is in the circuit roster but has no registered service
/// is answered with `ERROR_SENDER_NOT_IN_DIRECTORY`.
#[test]
fn test_circuit_direct_message_handler_sender_not_in_directory() {
    // Handler-side scenario; `run_test` executes it on a spawned thread.
    let scenario = |mut listener: Box<dyn Listener>,
                    mut dispatcher: Dispatcher<CircuitMessageType>,
                    network: Network| {
        // Errors go back to the source peer, so the connection is registered as "def".
        let connection = listener.accept().expect("Cannot accept connection");
        network
            .add_peer("def".to_string(), connection)
            .expect("Unable to add peer");

        let mut directory = CircuitDirectory::new();
        directory.add_circuit(
            "alpha".to_string(),
            Circuit::builder()
                .with_id("alpha".into())
                .with_auth(AuthorizationType::Trust)
                .with_members(vec!["123".into()])
                .with_roster(vec!["abc".into(), "def".into()])
                .with_persistence(PersistenceType::Any)
                .with_durability(DurabilityType::NoDurability)
                .with_routes(RouteType::Any)
                .with_circuit_management_type("circuit_direct_test_app".into())
                .build()
                .expect("Should have built a correct circuit"),
        );
        let state = SplinterState::new("memory".to_string(), directory);

        // Only the recipient "abc" is registered; the sender "def" is deliberately left out.
        let host_node = SplinterNode::new("123".to_string(), vec!["123.0.0.1:0".to_string()]);
        let recipient_service = Service::new(
            "abc".to_string(),
            Some("abc_network".to_string()),
            host_node,
        );
        state
            .add_service(ServiceId::new("alpha".into(), "abc".into()), recipient_service)
            .unwrap();

        dispatcher.set_handler(Box::new(CircuitDirectMessageHandler::new(
            "123".to_string(),
            state,
        )));

        let direct_bytes = {
            let mut direct_message = CircuitDirectMessage::new();
            direct_message.set_circuit("alpha".into());
            direct_message.set_sender("def".into());
            direct_message.set_recipient("abc".into());
            direct_message.set_payload(b"test".to_vec());
            direct_message.set_correlation_id("1234".into());
            direct_message.write_to_bytes().unwrap()
        };
        dispatcher
            .dispatch(
                "def".into(),
                &CircuitMessageType::CIRCUIT_DIRECT_MESSAGE,
                direct_bytes,
            )
            .unwrap();
    };

    let verify = |msg: CircuitError| {
        assert_eq!(msg.get_service_id(), "def");
        assert_eq!(
            msg.get_error(),
            CircuitError_Error::ERROR_SENDER_NOT_IN_DIRECTORY
        );
        assert_eq!(msg.get_correlation_id(), "1234");
    };

    run_test(
        scenario,
        "345",
        CircuitMessageType::CIRCUIT_ERROR_MESSAGE,
        verify,
    );
}
/// A direct message from a sender that is missing from the circuit roster is answered with
/// `ERROR_SENDER_NOT_IN_CIRCUIT_ROSTER`.
#[test]
fn test_circuit_direct_message_handler_sender_not_in_circuit_roster() {
    // Handler-side scenario; `run_test` executes it on a spawned thread.
    let scenario = |mut listener: Box<dyn Listener>,
                    mut dispatcher: Dispatcher<CircuitMessageType>,
                    network: Network| {
        // Errors go back to the source peer, so the connection is registered as "def".
        let connection = listener.accept().expect("Cannot accept connection");
        network
            .add_peer("def".to_string(), connection)
            .expect("Unable to add peer");

        // The roster lists only "abc"; the sender "def" is deliberately left out.
        let mut directory = CircuitDirectory::new();
        directory.add_circuit(
            "alpha".to_string(),
            Circuit::builder()
                .with_id("alpha".into())
                .with_auth(AuthorizationType::Trust)
                .with_members(vec!["123".into()])
                .with_roster(vec!["abc".into()])
                .with_persistence(PersistenceType::Any)
                .with_durability(DurabilityType::NoDurability)
                .with_routes(RouteType::Any)
                .with_circuit_management_type("circuit_direct_test_app".into())
                .build()
                .expect("Should have built a correct circuit"),
        );
        let state = SplinterState::new("memory".to_string(), directory);

        let host_node = SplinterNode::new("123".to_string(), vec!["123.0.0.1:0".to_string()]);
        let recipient_service = Service::new(
            "abc".to_string(),
            Some("abc_network".to_string()),
            host_node,
        );
        state
            .add_service(ServiceId::new("alpha".into(), "abc".into()), recipient_service)
            .unwrap();

        dispatcher.set_handler(Box::new(CircuitDirectMessageHandler::new(
            "123".to_string(),
            state,
        )));

        let direct_bytes = {
            let mut direct_message = CircuitDirectMessage::new();
            direct_message.set_circuit("alpha".into());
            direct_message.set_sender("def".into());
            direct_message.set_recipient("abc".into());
            direct_message.set_payload(b"test".to_vec());
            direct_message.set_correlation_id("1234".into());
            direct_message.write_to_bytes().unwrap()
        };
        dispatcher
            .dispatch(
                "def".into(),
                &CircuitMessageType::CIRCUIT_DIRECT_MESSAGE,
                direct_bytes,
            )
            .unwrap();
    };

    let verify = |msg: CircuitError| {
        assert_eq!(msg.get_service_id(), "def");
        assert_eq!(
            msg.get_error(),
            CircuitError_Error::ERROR_SENDER_NOT_IN_CIRCUIT_ROSTER
        );
        assert_eq!(msg.get_correlation_id(), "1234");
    };

    run_test(
        scenario,
        "345",
        CircuitMessageType::CIRCUIT_ERROR_MESSAGE,
        verify,
    );
}
/// A direct message to a recipient that is in the circuit roster but has no registered
/// service is answered with `ERROR_RECIPIENT_NOT_IN_DIRECTORY`.
#[test]
fn test_circuit_direct_message_handler_recipient_not_in_directory() {
    // Handler-side scenario; `run_test` executes it on a spawned thread.
    let scenario = |mut listener: Box<dyn Listener>,
                    mut dispatcher: Dispatcher<CircuitMessageType>,
                    network: Network| {
        // Errors go back to the source peer, so the connection is registered as "def".
        let connection = listener.accept().expect("Cannot accept connection");
        network
            .add_peer("def".to_string(), connection)
            .expect("Unable to add peer");

        let mut directory = CircuitDirectory::new();
        directory.add_circuit(
            "alpha".to_string(),
            Circuit::builder()
                .with_id("alpha".into())
                .with_auth(AuthorizationType::Trust)
                .with_members(vec!["123".into(), "345".into()])
                .with_roster(vec!["abc".into(), "def".into()])
                .with_persistence(PersistenceType::Any)
                .with_durability(DurabilityType::NoDurability)
                .with_routes(RouteType::Any)
                .with_circuit_management_type("circuit_direct_test_app".into())
                .build()
                .expect("Should have built a correct circuit"),
        );
        let state = SplinterState::new("memory".to_string(), directory);

        // Only the sender "def" is registered; the recipient "abc" is deliberately left out.
        let host_node = SplinterNode::new("345".to_string(), vec!["123.0.0.1:0".to_string()]);
        let sender_service = Service::new(
            "def".to_string(),
            Some("def_network".to_string()),
            host_node,
        );
        state
            .add_service(ServiceId::new("alpha".into(), "def".into()), sender_service)
            .unwrap();

        dispatcher.set_handler(Box::new(CircuitDirectMessageHandler::new(
            "345".to_string(),
            state,
        )));

        let direct_bytes = {
            let mut direct_message = CircuitDirectMessage::new();
            direct_message.set_circuit("alpha".into());
            direct_message.set_sender("def".into());
            direct_message.set_recipient("abc".into());
            direct_message.set_payload(b"test".to_vec());
            direct_message.set_correlation_id("1234".into());
            direct_message.write_to_bytes().unwrap()
        };
        dispatcher
            .dispatch(
                "def".into(),
                &CircuitMessageType::CIRCUIT_DIRECT_MESSAGE,
                direct_bytes,
            )
            .unwrap();
    };

    let verify = |msg: CircuitError| {
        assert_eq!(msg.get_service_id(), "def");
        assert_eq!(
            msg.get_error(),
            CircuitError_Error::ERROR_RECIPIENT_NOT_IN_DIRECTORY
        );
        assert_eq!(msg.get_correlation_id(), "1234");
    };

    run_test(
        scenario,
        "345",
        CircuitMessageType::CIRCUIT_ERROR_MESSAGE,
        verify,
    );
}
/// A direct message to a recipient that is missing from the circuit roster is answered with
/// `ERROR_RECIPIENT_NOT_IN_CIRCUIT_ROSTER`.
#[test]
fn test_circuit_direct_message_handler_recipient_not_in_circuit_roster() {
    // Handler-side scenario; `run_test` executes it on a spawned thread.
    let scenario = |mut listener: Box<dyn Listener>,
                    mut dispatcher: Dispatcher<CircuitMessageType>,
                    network: Network| {
        // Errors go back to the source peer, so the connection is registered as "def".
        let connection = listener.accept().expect("Cannot accept connection");
        network
            .add_peer("def".to_string(), connection)
            .expect("Unable to add peer");

        // The roster lists only "def"; the recipient "abc" is deliberately left out.
        let mut directory = CircuitDirectory::new();
        directory.add_circuit(
            "alpha".to_string(),
            Circuit::builder()
                .with_id("alpha".into())
                .with_auth(AuthorizationType::Trust)
                .with_members(vec!["123".into(), "345".into()])
                .with_roster(vec!["def".into()])
                .with_persistence(PersistenceType::Any)
                .with_durability(DurabilityType::NoDurability)
                .with_routes(RouteType::Any)
                .with_circuit_management_type("circuit_direct_test_app".into())
                .build()
                .expect("Should have built a correct circuit"),
        );
        let state = SplinterState::new("memory".to_string(), directory);

        // The sender service is hosted on node "123"; the binding is named after that ID so
        // the fixture reads consistently.
        let node_123 = SplinterNode::new("123".to_string(), vec!["123.0.0.1:0".to_string()]);
        let sender_service = Service::new(
            "def".to_string(),
            Some("def_network".to_string()),
            node_123,
        );
        state
            .add_service(ServiceId::new("alpha".into(), "def".into()), sender_service)
            .unwrap();

        dispatcher.set_handler(Box::new(CircuitDirectMessageHandler::new(
            "345".to_string(),
            state,
        )));

        let direct_bytes = {
            let mut direct_message = CircuitDirectMessage::new();
            direct_message.set_circuit("alpha".into());
            direct_message.set_sender("def".into());
            direct_message.set_recipient("abc".into());
            direct_message.set_payload(b"test".to_vec());
            direct_message.set_correlation_id("1234".into());
            direct_message.write_to_bytes().unwrap()
        };
        dispatcher
            .dispatch(
                "def".into(),
                &CircuitMessageType::CIRCUIT_DIRECT_MESSAGE,
                direct_bytes,
            )
            .unwrap();
    };

    let verify = |msg: CircuitError| {
        assert_eq!(msg.get_service_id(), "def");
        assert_eq!(
            msg.get_error(),
            CircuitError_Error::ERROR_RECIPIENT_NOT_IN_CIRCUIT_ROSTER
        );
        assert_eq!(msg.get_correlation_id(), "1234");
    };

    run_test(
        scenario,
        "345",
        CircuitMessageType::CIRCUIT_ERROR_MESSAGE,
        verify,
    );
}
/// A direct message naming a circuit that is not in the directory is answered with
/// `ERROR_CIRCUIT_DOES_NOT_EXIST`.
#[test]
fn test_circuit_direct_message_handler_no_circuit() {
    // Handler-side scenario; `run_test` executes it on a spawned thread.
    let scenario = |mut listener: Box<dyn Listener>,
                    mut dispatcher: Dispatcher<CircuitMessageType>,
                    network: Network| {
        // Errors go back to the source peer, so the connection is registered as "def".
        let connection = listener.accept().expect("Cannot accept connection");
        network
            .add_peer("def".to_string(), connection)
            .expect("Unable to add peer");

        // The circuit directory is left empty, so "alpha" cannot be found.
        let state = SplinterState::new("memory".to_string(), CircuitDirectory::new());
        dispatcher.set_handler(Box::new(CircuitDirectMessageHandler::new(
            "345".to_string(),
            state,
        )));

        let direct_bytes = {
            let mut direct_message = CircuitDirectMessage::new();
            direct_message.set_circuit("alpha".into());
            direct_message.set_sender("def".into());
            direct_message.set_recipient("abc".into());
            direct_message.set_payload(b"test".to_vec());
            direct_message.set_correlation_id("1234".into());
            direct_message.write_to_bytes().unwrap()
        };
        dispatcher
            .dispatch(
                "def".into(),
                &CircuitMessageType::CIRCUIT_DIRECT_MESSAGE,
                direct_bytes,
            )
            .unwrap();
    };

    let verify = |msg: CircuitError| {
        assert_eq!(msg.get_service_id(), "def");
        assert_eq!(
            msg.get_error(),
            CircuitError_Error::ERROR_CIRCUIT_DOES_NOT_EXIST
        );
        assert_eq!(msg.get_correlation_id(), "1234");
    };

    run_test(
        scenario,
        "345",
        CircuitMessageType::CIRCUIT_ERROR_MESSAGE,
        verify,
    );
}
/// Shared harness for the handler tests.
///
/// Sets up two networks joined by an in-process transport. `test` runs on a spawned thread
/// with the listener, a dispatcher that sends through the first network, and that network.
/// The calling thread connects the second network, waits for one message and checks it with
/// `assert_network_message`.
///
/// * `test` - handler-side scenario; expected to accept the connection and dispatch a message
/// * `expected_sender` - peer ID the received message must be attributed to
/// * `expected_circuit_msg_type` - circuit message type the received message must carry
/// * `detail_assertions` - checks applied to the decoded inner message of type `M`
fn run_test<F, M, A>(
    test: F,
    expected_sender: &str,
    expected_circuit_msg_type: CircuitMessageType,
    detail_assertions: A,
) where
    F: Fn(Box<dyn Listener>, Dispatcher<CircuitMessageType>, Network) + Send + 'static,
    M: protobuf::Message,
    A: Fn(M),
{
    // Handler side: network, outgoing message queue and the dispatcher that sends through it.
    let handler_mesh = Mesh::new(1, 1);
    let handler_network = Network::new(handler_mesh.clone(), 0).unwrap();
    let outgoing_queue = sender::Builder::new()
        .with_network(handler_network.clone())
        .build()
        .expect("Unable to create queue");
    let outgoing_sender = outgoing_queue.new_network_sender();

    let mut transport = InprocTransport::default();
    let dispatcher = Dispatcher::new(Box::new(outgoing_sender));

    // The listener must exist before the receiving side connects below.
    let listener = transport
        .listen("inproc://direct_message")
        .expect("Cannot get listener");

    std::thread::spawn(move || test(listener, dispatcher, handler_network));

    // Receiving side: connect and wait for whatever the handler sends.
    let receiver_mesh = Mesh::new(1, 1);
    let receiver_network = Network::new(receiver_mesh.clone(), 0).unwrap();
    let connection = transport
        .connect("inproc://direct_message")
        .expect("Unable to connect to inproc");
    receiver_network
        .add_peer("345".to_string(), connection)
        .expect("Unable to add peer");

    let received = receiver_network
        .recv()
        .expect("Unable to receive message over the network");

    assert_network_message(
        received,
        expected_sender,
        expected_circuit_msg_type,
        detail_assertions,
    );
}
/// Unwraps a received network message layer by layer and checks each layer.
///
/// Asserts that the message came from `expected_sender`, decodes the `NetworkMessage` and the
/// `CircuitMessage` nested in it, asserts the circuit message type, then decodes the innermost
/// payload as `M` and passes it to `detail_assertions`.
///
/// Panics if any layer fails to parse or any assertion fails.
fn assert_network_message<M, F>(
    network_message: NetworkMessageWrapper,
    expected_sender: &str,
    expected_circuit_msg_type: CircuitMessageType,
    detail_assertions: F,
) where
    M: protobuf::Message,
    F: Fn(M),
{
    assert_eq!(expected_sender, network_message.peer_id());

    let network_envelope: NetworkMessage =
        protobuf::parse_from_bytes(network_message.payload()).unwrap();
    let circuit_envelope: CircuitMessage =
        protobuf::parse_from_bytes(network_envelope.get_payload()).unwrap();
    assert_eq!(
        expected_circuit_msg_type,
        circuit_envelope.get_message_type()
    );

    let inner_message: M = protobuf::parse_from_bytes(circuit_envelope.get_payload()).unwrap();
    detail_assertions(inner_message);
}
}