use crate::channel::Sender;
use crate::circuit::handlers::create_message;
use crate::circuit::{ServiceId, SplinterState};
use crate::network::dispatch::{DispatchError, Handler, MessageContext};
use crate::network::sender::SendRequest;
use crate::protos::circuit::{CircuitError, CircuitMessageType};
pub struct CircuitErrorHandler {
node_id: String,
state: SplinterState,
}
impl Handler<CircuitMessageType, CircuitError> for CircuitErrorHandler {
fn handle(
&self,
msg: CircuitError,
context: &MessageContext<CircuitMessageType>,
sender: &dyn Sender<SendRequest>,
) -> Result<(), DispatchError> {
debug!("Handle Circuit Error Message {:?}", msg);
let circuit_name = msg.get_circuit_name();
let service_id = msg.get_service_id();
let unique_id = ServiceId::new(circuit_name.to_string(), service_id.to_string());
let recipient = match self
.state
.get_service(&unique_id)
.map_err(|err| DispatchError::HandleError(err.context()))?
{
Some(service) => {
let node_id = service.node().id();
if node_id == self.node_id {
match service.peer_id() {
Some(peer_id) => peer_id.to_string(),
None => {
warn!("No peer id for service:{} ", service.service_id());
return Ok(());
}
}
} else {
service.node().id().to_string()
}
}
None => {
warn!(
"Original message sender is not connected: {}, cannot send Circuit Error",
service_id
);
return Ok(());
}
};
let network_msg_bytes = create_message(
context.message_bytes().to_vec(),
CircuitMessageType::CIRCUIT_ERROR_MESSAGE,
)?;
let send_request = SendRequest::new(recipient, network_msg_bytes);
sender.send(send_request)?;
Ok(())
}
}
impl CircuitErrorHandler {
pub fn new(node_id: String, state: SplinterState) -> Self {
CircuitErrorHandler { node_id, state }
}
}
#[cfg(test)]
mod tests {
use protobuf::Message;
use super::*;
use crate::channel::mock::MockSender;
use crate::channel::Sender;
use crate::circuit::directory::CircuitDirectory;
use crate::circuit::service::{Service, SplinterNode};
use crate::circuit::{AuthorizationType, Circuit, DurabilityType, PersistenceType, RouteType};
use crate::network::dispatch::Dispatcher;
use crate::protos::circuit::{CircuitError_Error, CircuitMessage};
use crate::protos::network::NetworkMessage;
#[test]
fn test_circuit_error_handler_service() {
let sender = Box::new(MockSender::default());
let mut dispatcher = Dispatcher::new(sender.box_clone());
let circuit = Circuit::builder()
.with_id("alpha".into())
.with_auth(AuthorizationType::Trust)
.with_members(vec!["123".into()])
.with_roster(vec!["abc".into(), "def".into()])
.with_persistence(PersistenceType::Any)
.with_durability(DurabilityType::NoDurability)
.with_routes(RouteType::Any)
.with_circuit_management_type("circuit_errors_test_app".into())
.build()
.expect("Should have built a correct circuit");
let mut circuit_directory = CircuitDirectory::new();
circuit_directory.add_circuit("alpha".to_string(), circuit);
let state = SplinterState::new("memory".to_string(), circuit_directory);
let node_123 = SplinterNode::new("123".to_string(), vec!["123.0.0.1:0".to_string()]);
let node_345 = SplinterNode::new("345".to_string(), vec!["123.0.0.1:1".to_string()]);
let service_abc =
Service::new("abc".to_string(), Some("abc_network".to_string()), node_123);
let service_def =
Service::new("def".to_string(), Some("def_network".to_string()), node_345);
let abc_id = ServiceId::new("alpha".into(), "abc".into());
let def_id = ServiceId::new("alpha".into(), "def".into());
state.add_service(abc_id, service_abc).unwrap();
state.add_service(def_id, service_def).unwrap();
let handler = CircuitErrorHandler::new("123".to_string(), state);
dispatcher.set_handler(CircuitMessageType::CIRCUIT_ERROR_MESSAGE, Box::new(handler));
let mut circuit_error = CircuitError::new();
circuit_error.set_service_id("abc".into());
circuit_error.set_circuit_name("alpha".into());
circuit_error.set_correlation_id("1234".into());
circuit_error.set_error(CircuitError_Error::ERROR_RECIPIENT_NOT_IN_DIRECTORY);
circuit_error.set_error_message("TEST".into());
let error_bytes = circuit_error.write_to_bytes().unwrap();
dispatcher
.dispatch(
"345",
&CircuitMessageType::CIRCUIT_ERROR_MESSAGE,
error_bytes.clone(),
)
.unwrap();
let send_request = sender.sent().get(0).unwrap().clone();
assert_eq!(send_request.recipient(), "abc_network");
let network_msg: NetworkMessage =
protobuf::parse_from_bytes(send_request.payload()).unwrap();
let circuit_msg: CircuitMessage =
protobuf::parse_from_bytes(network_msg.get_payload()).unwrap();
let circuit_error: CircuitError =
protobuf::parse_from_bytes(circuit_msg.get_payload()).unwrap();
assert_eq!(
circuit_msg.get_message_type(),
CircuitMessageType::CIRCUIT_ERROR_MESSAGE
);
assert_eq!(circuit_error.get_service_id(), "abc");
assert_eq!(
circuit_error.get_error(),
CircuitError_Error::ERROR_RECIPIENT_NOT_IN_DIRECTORY
);
assert_eq!(circuit_error.get_error_message(), "TEST");
assert_eq!(circuit_error.get_correlation_id(), "1234");
}
#[test]
fn test_circuit_error_handler_node() {
let sender = Box::new(MockSender::default());
let mut dispatcher = Dispatcher::new(sender.box_clone());
let circuit = Circuit::builder()
.with_id("alpha".into())
.with_auth(AuthorizationType::Trust)
.with_members(vec!["123".into()])
.with_roster(vec!["abc".into(), "def".into()])
.with_persistence(PersistenceType::Any)
.with_durability(DurabilityType::NoDurability)
.with_routes(RouteType::Any)
.with_circuit_management_type("circuit_error_test_app".into())
.build()
.expect("Should have built a correct circuit");
let mut circuit_directory = CircuitDirectory::new();
circuit_directory.add_circuit("alpha".to_string(), circuit);
let state = SplinterState::new("memory".to_string(), circuit_directory);
let node_123 = SplinterNode::new("123".to_string(), vec!["123.0.0.1:0".to_string()]);
let node_345 = SplinterNode::new("345".to_string(), vec!["123.0.0.1:1".to_string()]);
let service_abc =
Service::new("abc".to_string(), Some("abc_network".to_string()), node_123);
let service_def =
Service::new("def".to_string(), Some("def_network".to_string()), node_345);
let abc_id = ServiceId::new("alpha".into(), "abc".into());
let def_id = ServiceId::new("alpha".into(), "def".into());
state.add_service(abc_id, service_abc).unwrap();
state.add_service(def_id, service_def).unwrap();
let handler = CircuitErrorHandler::new("123".to_string(), state);
dispatcher.set_handler(CircuitMessageType::CIRCUIT_ERROR_MESSAGE, Box::new(handler));
let mut circuit_error = CircuitError::new();
circuit_error.set_service_id("def".into());
circuit_error.set_circuit_name("alpha".into());
circuit_error.set_correlation_id("1234".into());
circuit_error.set_error(CircuitError_Error::ERROR_RECIPIENT_NOT_IN_DIRECTORY);
circuit_error.set_error_message("TEST".into());
let error_bytes = circuit_error.write_to_bytes().unwrap();
dispatcher
.dispatch(
"568",
&CircuitMessageType::CIRCUIT_ERROR_MESSAGE,
error_bytes.clone(),
)
.unwrap();
let send_request = sender.sent().get(0).unwrap().clone();
assert_eq!(send_request.recipient(), "345");
let network_msg: NetworkMessage =
protobuf::parse_from_bytes(send_request.payload()).unwrap();
let circuit_msg: CircuitMessage =
protobuf::parse_from_bytes(network_msg.get_payload()).unwrap();
let circuit_error: CircuitError =
protobuf::parse_from_bytes(circuit_msg.get_payload()).unwrap();
assert_eq!(
circuit_msg.get_message_type(),
CircuitMessageType::CIRCUIT_ERROR_MESSAGE
);
assert_eq!(circuit_error.get_service_id(), "def");
assert_eq!(
circuit_error.get_error(),
CircuitError_Error::ERROR_RECIPIENT_NOT_IN_DIRECTORY
);
assert_eq!(circuit_error.get_error_message(), "TEST");
assert_eq!(circuit_error.get_correlation_id(), "1234");
}
#[test]
fn test_circuit_error_handler_no_service() {
let sender = Box::new(MockSender::default());
let mut dispatcher = Dispatcher::new(sender.box_clone());
let circuit_directory = CircuitDirectory::new();
let state = SplinterState::new("memory".to_string(), circuit_directory);
let handler = CircuitErrorHandler::new("123".to_string(), state);
dispatcher.set_handler(CircuitMessageType::CIRCUIT_ERROR_MESSAGE, Box::new(handler));
let mut circuit_error = CircuitError::new();
circuit_error.set_service_id("abc".into());
circuit_error.set_circuit_name("alpha".into());
circuit_error.set_correlation_id("1234".into());
circuit_error.set_error(CircuitError_Error::ERROR_RECIPIENT_NOT_IN_DIRECTORY);
circuit_error.set_error_message("TEST".into());
let error_bytes = circuit_error.write_to_bytes().unwrap();
dispatcher
.dispatch(
"def",
&CircuitMessageType::CIRCUIT_ERROR_MESSAGE,
error_bytes.clone(),
)
.unwrap();
let send_request = sender.sent();
assert_eq!(send_request.len(), 0);
}
}