use crate::network::dispatch::{
DispatchError, DispatchMessageSender, Handler, MessageContext, MessageSender, PeerId,
};
use crate::protos::circuit::{CircuitMessage, CircuitMessageType};
use crate::protos::network::NetworkMessageType;
/// Handler for `NetworkMessageType::CIRCUIT` network messages.
///
/// For each handled `CircuitMessage`, the circuit message type, a copy of the payload and
/// the source id are passed on to the wrapped `DispatchMessageSender<CircuitMessageType>`.
pub struct CircuitMessageHandler {
// Sender that receives the circuit message type, payload bytes and source id.
sender: DispatchMessageSender<CircuitMessageType>,
}
// Unwraps circuit envelopes received at the network level and hands their contents to the
// circuit-level dispatch sender.
impl Handler for CircuitMessageHandler {
    type Source = PeerId;
    type MessageType = NetworkMessageType;
    type Message = CircuitMessage;

    /// The network message type this handler is registered for.
    fn match_type(&self) -> Self::MessageType {
        NetworkMessageType::CIRCUIT
    }

    /// Logs the incoming circuit message and forwards its message type, a copy of its
    /// payload and the source id to the wrapped sender.
    ///
    /// The `MessageSender` argument is unused.
    ///
    /// # Errors
    ///
    /// Returns `DispatchError::NetworkSendError` holding the source peer id (as a string)
    /// and the original payload when the wrapped sender reports a failure.
    fn handle(
        &self,
        msg: Self::Message,
        context: &MessageContext<Self::Source, Self::MessageType>,
        _: &dyn MessageSender<Self::Source>,
    ) -> Result<(), DispatchError> {
        let payload_len = msg.get_payload().len();
        // Only a single byte is reported in the singular form.
        let plural_suffix = if payload_len == 1 { "" } else { "s" };
        debug!(
            "Handle CircuitMessage {:?} from {} [{} byte{}]",
            msg.get_message_type(),
            context.source_peer_id(),
            payload_len,
            plural_suffix
        );

        let circuit_message_type = msg.get_message_type();
        // The payload is copied so that the original bytes remain available for the error.
        let payload_copy = msg.get_payload().to_vec();
        let source = context.source_id().clone();

        if self
            .sender
            .send(circuit_message_type, payload_copy, source)
            .is_err()
        {
            return Err(DispatchError::NetworkSendError((
                context.source_peer_id().to_string(),
                msg.payload,
            )));
        }

        Ok(())
    }
}
impl CircuitMessageHandler {
pub fn new(sender: DispatchMessageSender<CircuitMessageType>) -> Self {
CircuitMessageHandler { sender }
}
}
#[cfg(test)]
mod tests {
    use std::sync::mpsc::{channel, Sender};
    use std::time::Duration;

    use super::*;

    use crate::network::dispatch::{DispatchLoopBuilder, Dispatcher};
    use crate::peer::{PeerAuthorizationToken, PeerTokenPair};
    use crate::protos::circuit::ServiceConnectRequest;
    use crate::protos::network::NetworkMessageType;
    use protobuf::Message;

    /// Verifies that a `CircuitMessage` dispatched as `NetworkMessageType::CIRCUIT` is
    /// forwarded by `CircuitMessageHandler` to the circuit dispatcher, where the handler
    /// registered for `SERVICE_CONNECT_REQUEST` receives the enclosed request.
    #[test]
    fn test_circuit_message_handler() {
        // Set up both dispatchers on top of a sender that accepts everything.
        let network_sender = MockSender::default();
        let mut network_dispatcher = Dispatcher::new(Box::new(network_sender.clone()));
        let mut circuit_dispatcher = Dispatcher::new(Box::new(network_sender));

        // The circuit-level handler reports the service id it saw over this channel.
        let (tx, rx) = channel();
        let handler = ServiceConnectedTestHandler::new(tx);
        circuit_dispatcher.set_handler(Box::new(handler));

        let circuit_dispatcher_loop = DispatchLoopBuilder::new()
            .with_dispatcher(circuit_dispatcher)
            .build()
            .unwrap();
        let circuit_dispatcher_message_sender = circuit_dispatcher_loop.new_dispatcher_sender();

        let handler = CircuitMessageHandler::new(circuit_dispatcher_message_sender);
        network_dispatcher.set_handler(Box::new(handler));

        // Build a service connect request and wrap it in a circuit message.
        let mut service_request = ServiceConnectRequest::new();
        service_request.set_service_id("TEST_SERVICE".to_string());
        let service_msg_bytes = service_request.write_to_bytes().unwrap();

        let mut circuit_msg = CircuitMessage::new();
        circuit_msg.set_message_type(CircuitMessageType::SERVICE_CONNECT_REQUEST);
        circuit_msg.set_payload(service_msg_bytes);
        let circuit_bytes = circuit_msg.write_to_bytes().unwrap();

        // `circuit_bytes` is not needed afterwards, so it is moved rather than cloned.
        network_dispatcher
            .dispatch(
                PeerTokenPair::new(
                    PeerAuthorizationToken::from_peer_id("PEER"),
                    PeerAuthorizationToken::from_peer_id("123"),
                )
                .into(),
                &NetworkMessageType::CIRCUIT,
                circuit_bytes,
            )
            .unwrap();

        // Bound the wait so that a message which never reaches the circuit handler fails
        // the test instead of blocking it forever.
        let received_service_id = rx
            .recv_timeout(Duration::from_secs(5))
            .expect("circuit handler did not report a service id in time");
        assert_eq!("TEST_SERVICE".to_string(), received_service_id);
    }

    /// Test handler for `SERVICE_CONNECT_REQUEST` circuit messages that sends the service
    /// id of every request it handles over a channel.
    struct ServiceConnectedTestHandler {
        echos: Sender<String>,
    }

    impl ServiceConnectedTestHandler {
        /// Creates a handler that reports service ids through `echos`.
        fn new(echos: Sender<String>) -> Self {
            Self { echos }
        }
    }

    impl Handler for ServiceConnectedTestHandler {
        type Source = PeerId;
        type MessageType = CircuitMessageType;
        type Message = ServiceConnectRequest;

        fn match_type(&self) -> Self::MessageType {
            CircuitMessageType::SERVICE_CONNECT_REQUEST
        }

        /// Sends the request's service id over the channel; panics if the receiving end
        /// has been dropped.
        fn handle(
            &self,
            message: Self::Message,
            _message_context: &MessageContext<Self::Source, Self::MessageType>,
            _: &dyn MessageSender<Self::Source>,
        ) -> Result<(), DispatchError> {
            self.echos
                .send(message.get_service_id().to_string())
                .unwrap();
            Ok(())
        }
    }

    /// Sender that discards every message and always reports success.
    #[derive(Clone, Default)]
    struct MockSender {}

    impl MessageSender<PeerId> for MockSender {
        fn send(&self, _id: PeerId, _message: Vec<u8>) -> Result<(), (PeerId, Vec<u8>)> {
            Ok(())
        }
    }
}