use crate::circuit::handlers::create_message;
use crate::circuit::routing::{RoutingTableReader, RoutingTableWriter, Service, ServiceId};
use crate::network::dispatch::{DispatchError, Handler, MessageContext, MessageSender, PeerId};
use crate::peer::PeerTokenPair;
use crate::protos::circuit::{
CircuitMessageType, ServiceConnectRequest, ServiceConnectResponse,
ServiceConnectResponse_Status, ServiceDisconnectRequest, ServiceDisconnectResponse,
ServiceDisconnectResponse_Status,
};
use protobuf::Message;
/// Service ID prefix that marks a service as an admin service. The handlers in this file let
/// services whose ID starts with this prefix through even when they are not listed in the
/// circuit's roster.
const ADMIN_SERVICE_ID_PREFIX: &str = "admin::";
/// Value passed as the second argument of `Service::new` when the connect handler builds the
/// routing-table entry for an admin service.
// NOTE(review): presumably this argument is the service type; confirm against `Service::new`.
const ADMIN_CIRCUIT_ID: &str = "admin";
/// Dispatch handler for `ServiceConnectRequest` messages.
///
/// It checks the request against the routing table and, when the request is accepted, records
/// the requesting peer as the service's local peer.
pub struct ServiceConnectRequestHandler {
/// ID of the node this handler runs on. A service may only connect when its `node_id()` equals
/// this value; it is also used as the node ID of admin services created by the handler.
node_id: String,
/// Read access to the routing table, used to look up circuits and services.
routing_table_reader: Box<dyn RoutingTableReader>,
/// Write access to the routing table. It is cloned per request because `handle` only receives
/// `&self` while `add_service` is called on a mutable writer.
routing_table_writer: Box<dyn RoutingTableWriter>,
}
impl Handler for ServiceConnectRequestHandler {
    type Source = PeerId;
    type MessageType = CircuitMessageType;
    type Message = ServiceConnectRequest;

    /// Returns the circuit message type this handler is registered for.
    fn match_type(&self) -> Self::MessageType {
        CircuitMessageType::SERVICE_CONNECT_REQUEST
    }

    /// Processes a service connect request and sends a `ServiceConnectResponse` back to the
    /// peer the request came from.
    ///
    /// The response status is chosen as follows:
    ///
    /// * `ERROR_CIRCUIT_DOES_NOT_EXIST` - the routing table has no circuit with the requested
    ///   name.
    /// * `ERROR_SERVICE_NOT_IN_CIRCUIT_REGISTRY` - the service is neither in the circuit's
    ///   roster nor an admin service (ID starting with `ADMIN_SERVICE_ID_PREFIX`).
    /// * `ERROR_SERVICE_ALREADY_REGISTERED` - the service already has a local peer ID.
    /// * `ERROR_NOT_AN_ALLOWED_NODE` - the service's node ID differs from this handler's node.
    /// * `OK` - the requesting peer was stored as the service's local peer in the routing
    ///   table.
    ///
    /// # Errors
    ///
    /// Returns `DispatchError::HandleError` when a routing table read or write fails, or when
    /// a roster service cannot be found in the routing table. Errors from serializing the
    /// response or wrapping it with `create_message` are propagated, and a failed send is
    /// reported as `DispatchError::NetworkSendError`. In all of these cases the function
    /// returns before a response has been handed to `sender`.
    fn handle(
        &self,
        msg: Self::Message,
        context: &MessageContext<Self::Source, Self::MessageType>,
        sender: &dyn MessageSender<Self::Source>,
    ) -> Result<(), DispatchError> {
        debug!("Handle Service Connect Request {:?}", msg);

        let circuit_name = msg.get_circuit();
        let service_id = msg.get_service_id();
        let unique_id = ServiceId::new(circuit_name.to_string(), service_id.to_string());
        let is_admin_service = service_id.starts_with(ADMIN_SERVICE_ID_PREFIX);

        // The response always echoes the identifying fields of the request.
        let mut response = ServiceConnectResponse::new();
        response.set_correlation_id(msg.get_correlation_id().into());
        response.set_circuit(circuit_name.into());
        response.set_service_id(service_id.into());

        let circuit_lookup = self
            .routing_table_reader
            .get_circuit(circuit_name)
            .map_err(|err| DispatchError::HandleError(err.to_string()))?;

        match circuit_lookup {
            None => {
                response.set_status(ServiceConnectResponse_Status::ERROR_CIRCUIT_DOES_NOT_EXIST);
                response.set_error_message(format!("Circuit does not exist: {}", circuit_name));
            }
            Some(circuit) => {
                let in_roster = circuit
                    .roster()
                    .iter()
                    .any(|member| member.service_id() == service_id);

                if !(in_roster || is_admin_service) {
                    response.set_status(
                        ServiceConnectResponse_Status::ERROR_SERVICE_NOT_IN_CIRCUIT_REGISTRY,
                    );
                    response.set_error_message(format!(
                        "Service is not allowed in the circuit: {}:{}",
                        circuit_name, service_id
                    ));
                } else {
                    // Admin services are not looked up; a new entry bound to this node is
                    // built for every request.
                    // NOTE(review): because of that, the "already registered" branch below
                    // presumably never triggers for admin services - confirm this is intended.
                    let mut service = if is_admin_service {
                        Service::new(
                            service_id.to_string(),
                            ADMIN_CIRCUIT_ID.to_string(),
                            self.node_id.to_string(),
                            vec![],
                        )
                    } else {
                        self.routing_table_reader
                            .get_service(&unique_id)
                            .map_err(|err| DispatchError::HandleError(err.to_string()))?
                            .ok_or_else(|| {
                                DispatchError::HandleError(format!(
                                    "Unable to get service {}",
                                    service_id
                                ))
                            })?
                    };

                    if service.local_peer_id().is_some() {
                        response.set_status(
                            ServiceConnectResponse_Status::ERROR_SERVICE_ALREADY_REGISTERED,
                        );
                        response.set_error_message(format!(
                            "Service is already registered: {}",
                            service_id
                        ));
                    } else if service.node_id() != self.node_id {
                        response
                            .set_status(ServiceConnectResponse_Status::ERROR_NOT_AN_ALLOWED_NODE);
                        response.set_error_message(format!(
                            "{} is not allowed on this node",
                            unique_id
                        ));
                    } else {
                        // Bind the service to the peer that sent the request and persist it.
                        let requester = PeerTokenPair::from(context.source_peer_id().clone());
                        service.set_local_peer_id(requester);

                        let mut table_writer = self.routing_table_writer.clone();
                        table_writer
                            .add_service(unique_id, service)
                            .map_err(|err| DispatchError::HandleError(err.to_string()))?;

                        response.set_status(ServiceConnectResponse_Status::OK);
                    }
                }
            }
        }

        let payload = response.write_to_bytes()?;
        let envelope = create_message(payload, CircuitMessageType::SERVICE_CONNECT_RESPONSE)?;

        sender
            .send(context.source_peer_id().clone(), envelope)
            .map_err(|(recipient, payload)| {
                DispatchError::NetworkSendError((recipient.into(), payload))
            })
    }
}
impl ServiceConnectRequestHandler {
    /// Creates a connect request handler.
    ///
    /// # Arguments
    ///
    /// * `node_id` - ID of the node the handler runs on; services are only allowed to connect
    ///   when their node ID matches it.
    /// * `routing_table_reader` - used to look up circuits and services.
    /// * `routing_table_writer` - used to store the service once it is connected.
    pub fn new(
        node_id: String,
        routing_table_reader: Box<dyn RoutingTableReader>,
        routing_table_writer: Box<dyn RoutingTableWriter>,
    ) -> Self {
        Self {
            node_id,
            routing_table_reader,
            routing_table_writer,
        }
    }
}
/// Dispatch handler for `ServiceDisconnectRequest` messages.
///
/// When the request is accepted it removes the local peer ID from the service's routing table
/// entry.
pub struct ServiceDisconnectRequestHandler {
/// Read access to the routing table, used to look up circuits and services.
routing_table_reader: Box<dyn RoutingTableReader>,
/// Write access to the routing table. It is cloned per request because `handle` only receives
/// `&self` while `add_service` is called on a mutable writer.
routing_table_writer: Box<dyn RoutingTableWriter>,
}
impl Handler for ServiceDisconnectRequestHandler {
    type Source = PeerId;
    type MessageType = CircuitMessageType;
    type Message = ServiceDisconnectRequest;

    /// Returns the circuit message type this handler is registered for.
    fn match_type(&self) -> Self::MessageType {
        CircuitMessageType::SERVICE_DISCONNECT_REQUEST
    }

    /// Processes a service disconnect request and sends a `ServiceDisconnectResponse` back to
    /// the peer the request came from.
    ///
    /// The response status is chosen as follows:
    ///
    /// * `ERROR_CIRCUIT_DOES_NOT_EXIST` - the routing table has no circuit with the requested
    ///   name.
    /// * `ERROR_SERVICE_NOT_IN_CIRCUIT_REGISTRY` - the service is neither in the circuit's
    ///   roster nor an admin service (ID starting with `ADMIN_SERVICE_ID_PREFIX`).
    /// * `ERROR_SERVICE_NOT_REGISTERED` - the service has no local peer ID, or it is an admin
    ///   service that has no routing table entry at all.
    /// * `OK` - the local peer ID was removed and the service was written back to the routing
    ///   table.
    ///
    /// # Errors
    ///
    /// Returns `DispatchError::HandleError` when a routing table read or write fails, or when
    /// a roster service cannot be found in the routing table. Errors from serializing the
    /// response or wrapping it with `create_message` are propagated, and a failed send is
    /// reported as `DispatchError::NetworkSendError`. In all of these cases the function
    /// returns before a response has been handed to `sender`.
    fn handle(
        &self,
        msg: Self::Message,
        context: &MessageContext<Self::Source, Self::MessageType>,
        sender: &dyn MessageSender<Self::Source>,
    ) -> Result<(), DispatchError> {
        debug!("Handle Service Disconnect Request {:?}", msg);

        let circuit_name = msg.get_circuit();
        let service_id = msg.get_service_id();
        let unique_id = ServiceId::new(circuit_name.to_string(), service_id.to_string());
        let is_admin_service = service_id.starts_with(ADMIN_SERVICE_ID_PREFIX);

        // The response always echoes the identifying fields of the request.
        let mut response = ServiceDisconnectResponse::new();
        response.set_correlation_id(msg.get_correlation_id().into());
        response.set_circuit(circuit_name.into());
        response.set_service_id(service_id.into());

        let circuit_lookup = self
            .routing_table_reader
            .get_circuit(circuit_name)
            .map_err(|err| DispatchError::HandleError(err.to_string()))?;

        match circuit_lookup {
            None => {
                response
                    .set_status(ServiceDisconnectResponse_Status::ERROR_CIRCUIT_DOES_NOT_EXIST);
                response.set_error_message(format!("Circuit does not exist: {}", circuit_name));
            }
            Some(circuit) => {
                let in_roster = circuit
                    .roster()
                    .iter()
                    .any(|member| member.service_id() == service_id);

                if !(in_roster || is_admin_service) {
                    response.set_status(
                        ServiceDisconnectResponse_Status::ERROR_SERVICE_NOT_IN_CIRCUIT_REGISTRY,
                    );
                    response.set_error_message(format!(
                        "Service is not allowed in the circuit: {}:{}",
                        circuit_name, service_id
                    ));
                } else {
                    let service_lookup = self
                        .routing_table_reader
                        .get_service(&unique_id)
                        .map_err(|err| DispatchError::HandleError(err.to_string()))?;

                    // A roster service that is missing from the routing table is treated as
                    // an internal error. An admin service is different: it passes the check
                    // above without being in the roster and is only written to the routing
                    // table by the connect handler, so a missing entry simply means it never
                    // connected. That case is answered with ERROR_SERVICE_NOT_REGISTERED
                    // below instead of failing without sending any response.
                    if service_lookup.is_none() && !is_admin_service {
                        return Err(DispatchError::HandleError(format!(
                            "Unable to get service {}",
                            service_id
                        )));
                    }

                    match service_lookup.filter(|service| service.local_peer_id().is_some()) {
                        Some(mut service) => {
                            service.remove_local_peer_id();

                            let mut table_writer = self.routing_table_writer.clone();
                            table_writer
                                .add_service(unique_id, service)
                                .map_err(|err| DispatchError::HandleError(err.to_string()))?;

                            response.set_status(ServiceDisconnectResponse_Status::OK);
                        }
                        None => {
                            response.set_status(
                                ServiceDisconnectResponse_Status::ERROR_SERVICE_NOT_REGISTERED,
                            );
                            response.set_error_message(format!(
                                "Service is not registered: {}",
                                service_id
                            ));
                        }
                    }
                }
            }
        }

        let payload = response.write_to_bytes()?;
        let envelope = create_message(payload, CircuitMessageType::SERVICE_DISCONNECT_RESPONSE)?;

        sender
            .send(context.source_peer_id().clone(), envelope)
            .map_err(|(recipient, payload)| {
                DispatchError::NetworkSendError((recipient.into(), payload))
            })
    }
}
impl ServiceDisconnectRequestHandler {
    /// Creates a disconnect request handler.
    ///
    /// # Arguments
    ///
    /// * `routing_table_reader` - used to look up circuits and services.
    /// * `routing_table_writer` - used to store the service once its local peer ID has been
    ///   removed.
    pub fn new(
        routing_table_reader: Box<dyn RoutingTableReader>,
        routing_table_writer: Box<dyn RoutingTableWriter>,
    ) -> Self {
        Self {
            routing_table_reader,
            routing_table_writer,
        }
    }
}
impl From<protobuf::error::ProtobufError> for DispatchError {
fn from(e: protobuf::error::ProtobufError) -> Self {
DispatchError::SerializationError(e.to_string())
}
}
#[cfg(test)]
mod tests {
    // Unit tests for the service connect and disconnect request handlers.
    //
    // Every test wires a handler into a `Dispatcher` backed by a `MockSender`, dispatches one
    // request and then inspects the single response captured by the mock. All tests use "123"
    // as the local node ID, "alpha" as the circuit ID and a requesting peer whose ID equals the
    // service ID it asks for.
    use super::*;

    use std::collections::VecDeque;
    use std::sync::{Arc, Mutex};

    use crate::circuit::routing::AuthorizationType;
    use crate::circuit::routing::{
        memory::RoutingTable, Circuit, CircuitNode, RoutingTableWriter, Service,
    };
    use crate::network::dispatch::Dispatcher;
    use crate::peer::PeerAuthorizationToken;
    use crate::protos::circuit::CircuitMessage;
    use crate::protos::network::NetworkMessage;

    /// No circuit exists, so a connect request is answered with
    /// `ERROR_CIRCUIT_DOES_NOT_EXIST`.
    #[test]
    fn test_service_connect_request_handler_no_circuit() {
        let mock_sender = MockSender::new();
        let mut dispatcher = Dispatcher::new(Box::new(mock_sender.clone()));
        let (reader, writer) = empty_routing_table();
        let handler = ServiceConnectRequestHandler::new("123".to_string(), reader, writer);
        dispatcher.set_handler(Box::new(handler));

        dispatcher
            .dispatch(
                peer_pair("abc").into(),
                &CircuitMessageType::SERVICE_CONNECT_REQUEST,
                connect_request_bytes("abc"),
            )
            .unwrap();

        assert_connect_response(
            &mock_sender,
            "abc",
            ServiceConnectResponse_Status::ERROR_CIRCUIT_DOES_NOT_EXIST,
        );
    }

    /// The circuit exists but does not list the service, so a connect request is answered
    /// with `ERROR_SERVICE_NOT_IN_CIRCUIT_REGISTRY`.
    #[test]
    fn test_service_connect_request_handler_not_in_circuit() {
        let mock_sender = MockSender::new();
        let mut dispatcher = Dispatcher::new(Box::new(mock_sender.clone()));
        let (reader, writer) = routing_table_with_circuit();
        let handler = ServiceConnectRequestHandler::new("123".to_string(), reader, writer);
        dispatcher.set_handler(Box::new(handler));

        dispatcher
            .dispatch(
                peer_pair("BAD").into(),
                &CircuitMessageType::SERVICE_CONNECT_REQUEST,
                connect_request_bytes("BAD"),
            )
            .unwrap();

        assert_connect_response(
            &mock_sender,
            "BAD",
            ServiceConnectResponse_Status::ERROR_SERVICE_NOT_IN_CIRCUIT_REGISTRY,
        );
    }

    /// A roster service that is not yet connected can connect: the service is present in the
    /// routing table afterwards and the response status is `OK`.
    #[test]
    fn test_service_connect_request_handler() {
        let mock_sender = MockSender::new();
        let mut dispatcher = Dispatcher::new(Box::new(mock_sender.clone()));
        let (reader, writer) = routing_table_with_circuit();
        let handler =
            ServiceConnectRequestHandler::new("123".to_string(), reader.clone(), writer);
        dispatcher.set_handler(Box::new(handler));

        dispatcher
            .dispatch(
                peer_pair("abc").into(),
                &CircuitMessageType::SERVICE_CONNECT_REQUEST,
                connect_request_bytes("abc"),
            )
            .unwrap();

        let service_key = ServiceId::new("alpha".into(), "abc".into());
        assert!(reader.get_service(&service_key).unwrap().is_some());

        assert_connect_response(&mock_sender, "abc", ServiceConnectResponse_Status::OK);
    }

    /// A service that already has a local peer cannot connect again; the response status is
    /// `ERROR_SERVICE_ALREADY_REGISTERED`.
    #[test]
    fn test_service_connect_request_handler_already_connected() {
        let mock_sender = MockSender::new();
        let mut dispatcher = Dispatcher::new(Box::new(mock_sender.clone()));
        let (reader, writer) = routing_table_with_connected_service();
        let handler = ServiceConnectRequestHandler::new("123".to_string(), reader, writer);
        dispatcher.set_handler(Box::new(handler));

        dispatcher
            .dispatch(
                peer_pair("abc").into(),
                &CircuitMessageType::SERVICE_CONNECT_REQUEST,
                connect_request_bytes("abc"),
            )
            .unwrap();

        assert_connect_response(
            &mock_sender,
            "abc",
            ServiceConnectResponse_Status::ERROR_SERVICE_ALREADY_REGISTERED,
        );
    }

    /// No circuit exists, so a disconnect request is answered with
    /// `ERROR_CIRCUIT_DOES_NOT_EXIST`.
    #[test]
    fn test_service_disconnect_request_handler_no_circuit() {
        let mock_sender = MockSender::new();
        let mut dispatcher = Dispatcher::new(Box::new(mock_sender.clone()));
        let (reader, writer) = empty_routing_table();
        let handler = ServiceDisconnectRequestHandler::new(reader, writer);
        dispatcher.set_handler(Box::new(handler));

        dispatcher
            .dispatch(
                peer_pair("abc").into(),
                &CircuitMessageType::SERVICE_DISCONNECT_REQUEST,
                disconnect_request_bytes("abc"),
            )
            .unwrap();

        assert_disconnect_response(
            &mock_sender,
            "abc",
            ServiceDisconnectResponse_Status::ERROR_CIRCUIT_DOES_NOT_EXIST,
        );
    }

    /// The circuit exists but does not list the service, so a disconnect request is answered
    /// with `ERROR_SERVICE_NOT_IN_CIRCUIT_REGISTRY`.
    #[test]
    fn test_service_disconnect_request_handler_not_in_circuit() {
        let mock_sender = MockSender::new();
        let mut dispatcher = Dispatcher::new(Box::new(mock_sender.clone()));
        let (reader, writer) = routing_table_with_circuit();
        let handler = ServiceDisconnectRequestHandler::new(reader, writer);
        dispatcher.set_handler(Box::new(handler));

        dispatcher
            .dispatch(
                peer_pair("BAD").into(),
                &CircuitMessageType::SERVICE_DISCONNECT_REQUEST,
                disconnect_request_bytes("BAD"),
            )
            .unwrap();

        assert_disconnect_response(
            &mock_sender,
            "BAD",
            ServiceDisconnectResponse_Status::ERROR_SERVICE_NOT_IN_CIRCUIT_REGISTRY,
        );
    }

    /// A connected service can disconnect; the response status is `OK`.
    #[test]
    fn test_service_disconnect_request_handler() {
        let mock_sender = MockSender::new();
        let mut dispatcher = Dispatcher::new(Box::new(mock_sender.clone()));
        let (reader, writer) = routing_table_with_connected_service();
        let handler = ServiceDisconnectRequestHandler::new(reader, writer);
        dispatcher.set_handler(Box::new(handler));

        dispatcher
            .dispatch(
                peer_pair("abc").into(),
                &CircuitMessageType::SERVICE_DISCONNECT_REQUEST,
                disconnect_request_bytes("abc"),
            )
            .unwrap();

        assert_disconnect_response(&mock_sender, "abc", ServiceDisconnectResponse_Status::OK);
    }

    /// A roster service without a local peer cannot disconnect; the response status is
    /// `ERROR_SERVICE_NOT_REGISTERED`.
    #[test]
    fn test_service_disconnect_request_handler_not_connected() {
        let mock_sender = MockSender::new();
        let mut dispatcher = Dispatcher::new(Box::new(mock_sender.clone()));
        let (reader, writer) = routing_table_with_circuit();
        let handler = ServiceDisconnectRequestHandler::new(reader, writer);
        dispatcher.set_handler(Box::new(handler));

        dispatcher
            .dispatch(
                peer_pair("abc").into(),
                &CircuitMessageType::SERVICE_DISCONNECT_REQUEST,
                disconnect_request_bytes("abc"),
            )
            .unwrap();

        assert_disconnect_response(
            &mock_sender,
            "abc",
            ServiceDisconnectResponse_Status::ERROR_SERVICE_NOT_REGISTERED,
        );
    }

    /// Builds the token pair of remote peer `peer_id` as seen by local node "123".
    fn peer_pair(peer_id: &str) -> PeerTokenPair {
        PeerTokenPair::new(
            PeerAuthorizationToken::from_peer_id(peer_id),
            PeerAuthorizationToken::from_peer_id("123"),
        )
    }

    /// Returns a reader and a writer that share one empty in-memory routing table.
    fn empty_routing_table() -> (Box<dyn RoutingTableReader>, Box<dyn RoutingTableWriter>) {
        let table = RoutingTable::default();
        let reader: Box<dyn RoutingTableReader> = Box::new(table.clone());
        let writer: Box<dyn RoutingTableWriter> = Box::new(table.clone());
        (reader, writer)
    }

    /// Returns a reader and a writer for a routing table that contains the circuit produced
    /// by `build_circuit`.
    fn routing_table_with_circuit() -> (Box<dyn RoutingTableReader>, Box<dyn RoutingTableWriter>)
    {
        let (reader, mut writer) = empty_routing_table();
        let (circuit, nodes) = build_circuit();
        writer
            .add_circuit(circuit.circuit_id().to_string(), circuit, nodes)
            .expect("Unable to add circuit");
        (reader, writer)
    }

    /// Like `routing_table_with_circuit`, but service "abc" of circuit "alpha" already has a
    /// local peer ("abc_network"), i.e. it counts as connected.
    fn routing_table_with_connected_service(
    ) -> (Box<dyn RoutingTableReader>, Box<dyn RoutingTableWriter>) {
        let (reader, mut writer) = routing_table_with_circuit();
        let service_key = ServiceId::new("alpha".into(), "abc".into());
        let mut service = reader
            .get_service(&service_key)
            .expect("Unable to get service")
            .unwrap();
        service.set_local_peer_id(peer_pair("abc_network").into());
        writer
            .add_service(service_key, service)
            .expect("Unable to add service");
        (reader, writer)
    }

    /// Serializes a connect request for `service_id` on circuit "alpha".
    fn connect_request_bytes(service_id: &str) -> Vec<u8> {
        let mut request = ServiceConnectRequest::new();
        request.set_circuit("alpha".into());
        request.set_service_id(service_id.into());
        request.write_to_bytes().unwrap()
    }

    /// Serializes a disconnect request for `service_id` on circuit "alpha".
    fn disconnect_request_bytes(service_id: &str) -> Vec<u8> {
        let mut request = ServiceDisconnectRequest::new();
        request.set_circuit("alpha".into());
        request.set_service_id(service_id.into());
        request.write_to_bytes().unwrap()
    }

    /// Pops the next message captured by `sender` and asserts that it is a connect response
    /// addressed to the peer named `service_id`, for that service on circuit "alpha", carrying
    /// `expected_status`.
    fn assert_connect_response(
        sender: &MockSender,
        service_id: &str,
        expected_status: ServiceConnectResponse_Status,
    ) {
        let (recipient, message) = sender.next_outbound().expect("No message was sent");
        assert_network_message(
            message,
            recipient.into(),
            peer_pair(service_id),
            CircuitMessageType::SERVICE_CONNECT_RESPONSE,
            |msg: ServiceConnectResponse| {
                assert_eq!(msg.get_service_id(), service_id);
                assert_eq!(msg.get_circuit(), "alpha");
                assert_eq!(msg.get_status(), expected_status);
            },
        )
    }

    /// Pops the next message captured by `sender` and asserts that it is a disconnect response
    /// addressed to the peer named `service_id`, for that service on circuit "alpha", carrying
    /// `expected_status`.
    fn assert_disconnect_response(
        sender: &MockSender,
        service_id: &str,
        expected_status: ServiceDisconnectResponse_Status,
    ) {
        let (recipient, message) = sender.next_outbound().expect("No message was sent");
        assert_network_message(
            message,
            recipient.into(),
            peer_pair(service_id),
            CircuitMessageType::SERVICE_DISCONNECT_RESPONSE,
            |msg: ServiceDisconnectResponse| {
                assert_eq!(msg.get_service_id(), service_id);
                assert_eq!(msg.get_circuit(), "alpha");
                assert_eq!(msg.get_status(), expected_status);
            },
        )
    }

    /// Builds circuit "alpha" with members "123" and "345"; service "abc" is assigned to node
    /// "123" and service "def" to node "345".
    fn build_circuit() -> (Circuit, Vec<CircuitNode>) {
        let node_123 = CircuitNode::new("123".to_string(), vec!["123.0.0.1:0".to_string()], None);
        let node_345 = CircuitNode::new("345".to_string(), vec!["123.0.0.1:1".to_string()], None);

        let service_abc = Service::new(
            "abc".to_string(),
            "test".to_string(),
            "123".to_string(),
            vec![],
        );
        let service_def = Service::new(
            "def".to_string(),
            "test".to_string(),
            "345".to_string(),
            vec![],
        );

        let circuit = Circuit::new(
            "alpha".into(),
            vec![service_abc, service_def],
            vec!["123".into(), "345".into()],
            AuthorizationType::Trust,
        );

        (circuit, vec![node_123, node_345])
    }

    /// Unwraps `message` (network message -> circuit message -> `M`), checking the recipient
    /// and the circuit message type on the way, then hands the decoded `M` to
    /// `detail_assertions`.
    fn assert_network_message<M: protobuf::Message, F: Fn(M)>(
        message: Vec<u8>,
        recipient: PeerTokenPair,
        expected_recipient: PeerTokenPair,
        expected_circuit_msg_type: CircuitMessageType,
        detail_assertions: F,
    ) {
        assert_eq!(expected_recipient, recipient);

        let network_msg: NetworkMessage = Message::parse_from_bytes(&message).unwrap();
        let circuit_msg: CircuitMessage =
            Message::parse_from_bytes(network_msg.get_payload()).unwrap();
        assert_eq!(expected_circuit_msg_type, circuit_msg.get_message_type());

        let inner_msg: M = Message::parse_from_bytes(circuit_msg.get_payload()).unwrap();
        detail_assertions(inner_msg);
    }

    /// `MessageSender` that queues every outgoing message so tests can inspect it. Clones
    /// share the same queue.
    #[derive(Clone)]
    struct MockSender {
        outbound: Arc<Mutex<VecDeque<(PeerId, Vec<u8>)>>>,
    }

    impl MockSender {
        /// Creates a sender with an empty queue.
        fn new() -> Self {
            Self {
                outbound: Arc::new(Mutex::new(VecDeque::new())),
            }
        }

        /// Removes and returns the oldest queued message, if any.
        fn next_outbound(&self) -> Option<(PeerId, Vec<u8>)> {
            self.outbound.lock().expect("lock was poisoned").pop_front()
        }
    }

    impl MessageSender<PeerId> for MockSender {
        /// Queues the message; never fails.
        fn send(&self, id: PeerId, message: Vec<u8>) -> Result<(), (PeerId, Vec<u8>)> {
            self.outbound
                .lock()
                .expect("lock was poisoned")
                .push_back((id, message));
            Ok(())
        }
    }
}