mod context;
mod r#loop;
mod peer;
mod proto;
use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Debug;
use std::hash::Hash;
pub use context::MessageContext;
pub use r#loop::{
dispatch_channel, DispatchLoop, DispatchLoopBuilder, DispatchLoopError,
DispatchMessageReceiver, DispatchMessageSender,
};
#[derive(Debug, Clone, Default, PartialEq)]
pub struct PeerId(String);
impl std::ops::Deref for PeerId {
type Target = str;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl From<String> for PeerId {
fn from(s: String) -> PeerId {
PeerId(s)
}
}
impl From<&str> for PeerId {
fn from(s: &str) -> PeerId {
PeerId(s.into())
}
}
impl From<PeerId> for String {
fn from(peer_id: PeerId) -> String {
peer_id.0
}
}
impl fmt::Display for PeerId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str(&self.0)
}
}
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ConnectionId(String);
impl std::ops::Deref for ConnectionId {
type Target = str;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl From<String> for ConnectionId {
fn from(s: String) -> ConnectionId {
ConnectionId(s)
}
}
impl From<&str> for ConnectionId {
fn from(s: &str) -> ConnectionId {
ConnectionId(s.into())
}
}
impl From<ConnectionId> for String {
fn from(connection_id: ConnectionId) -> String {
connection_id.0
}
}
impl fmt::Display for ConnectionId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str(&self.0)
}
}
pub trait Handler: Send {
type Source;
type MessageType: Hash + Eq + Debug + Clone;
type Message: FromMessageBytes;
fn handle(
&self,
message: Self::Message,
message_context: &MessageContext<Self::Source, Self::MessageType>,
network_sender: &dyn MessageSender<Self::Source>,
) -> Result<(), DispatchError>;
fn match_type(&self) -> Self::MessageType;
}
pub trait FromMessageBytes: Any + Sized {
fn from_message_bytes(message_bytes: &[u8]) -> Result<Self, DispatchError>;
}
#[derive(Debug, Clone)]
pub struct RawBytes {
bytes: Vec<u8>,
}
impl RawBytes {
pub fn into_inner(self) -> Vec<u8> {
self.bytes
}
pub fn bytes(&self) -> &[u8] {
&self.bytes
}
}
impl From<&[u8]> for RawBytes {
fn from(source: &[u8]) -> Self {
RawBytes {
bytes: source.to_vec(),
}
}
}
impl AsRef<[u8]> for RawBytes {
fn as_ref(&self) -> &[u8] {
&self.bytes
}
}
impl FromMessageBytes for RawBytes {
fn from_message_bytes(message_bytes: &[u8]) -> Result<Self, DispatchError> {
Ok(RawBytes::from(message_bytes))
}
}
#[derive(Debug, PartialEq)]
pub enum DispatchError {
DeserializationError(String),
SerializationError(String),
UnknownMessageType(String),
NetworkSendError((String, Vec<u8>)),
HandleError(String),
MissingNetworkSender,
}
impl std::error::Error for DispatchError {}
impl std::fmt::Display for DispatchError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
DispatchError::DeserializationError(msg) => {
write!(f, "unable to deserialize message: {}", msg)
}
DispatchError::SerializationError(msg) => {
write!(f, "unable to serialize message: {}", msg)
}
DispatchError::UnknownMessageType(msg) => write!(f, "unknown message type: {}", msg),
DispatchError::NetworkSendError((recipient, _)) => {
write!(f, "unable to send message to receipt {}", recipient)
}
DispatchError::HandleError(msg) => write!(f, "unable to handle message: {}", msg),
DispatchError::MissingNetworkSender => write!(f, "missing network sender"),
}
}
}
pub trait MessageSender<R>: Send {
fn send(&self, reciptient: R, message: Vec<u8>) -> Result<(), (R, Vec<u8>)>;
}
pub struct Dispatcher<MT, Source = PeerId>
where
Source: 'static,
MT: Any + Hash + Eq + Debug + Clone,
{
handlers: HashMap<MT, HandlerWrapper<Source, MT>>,
network_sender: Box<dyn MessageSender<Source>>,
}
impl<MT, Source> Dispatcher<MT, Source>
where
Source: 'static,
MT: Any + Hash + Eq + Debug + Clone,
{
pub fn new(network_sender: Box<dyn MessageSender<Source>>) -> Self {
Dispatcher {
handlers: HashMap::new(),
network_sender,
}
}
pub fn set_handler<T>(
&mut self,
handler: Box<dyn Handler<Source = Source, MessageType = MT, Message = T>>,
) where
T: FromMessageBytes,
{
self.handlers.insert(
handler.match_type(),
HandlerWrapper {
inner: Box::new(move |message_bytes, message_context, network_sender| {
let message = FromMessageBytes::from_message_bytes(message_bytes)?;
handler.handle(message, message_context, network_sender)
}),
},
);
}
pub fn dispatch(
&self,
source_id: Source,
message_type: &MT,
message_bytes: Vec<u8>,
) -> Result<(), DispatchError> {
let message_context = MessageContext::new(message_type.clone(), message_bytes, source_id);
self.execute(message_context)
}
pub fn dispatch_with_parent_context(
&self,
source_id: Source,
message_type: &MT,
message_bytes: Vec<u8>,
parent_context: Box<dyn Any + Send>,
) -> Result<(), DispatchError> {
let mut message_context =
MessageContext::new(message_type.clone(), message_bytes, source_id);
message_context.set_parent_context(parent_context);
self.execute(message_context)
}
fn execute(&self, ctx: MessageContext<Source, MT>) -> Result<(), DispatchError> {
self.handlers
.get(ctx.message_type())
.ok_or_else(|| {
DispatchError::UnknownMessageType(format!(
"No handler for type {:?}",
ctx.message_type(),
))
})
.and_then(|handler| handler.handle(ctx.message_bytes(), &ctx, &*self.network_sender))
}
}
type InnerHandler<Source, MT> = Box<
dyn Fn(
&[u8],
&MessageContext<Source, MT>,
&dyn MessageSender<Source>,
) -> Result<(), DispatchError>
+ Send,
>;
struct HandlerWrapper<Source, MT>
where
MT: Hash + Eq + Debug + Clone,
{
inner: InnerHandler<Source, MT>,
}
impl<Source, MT> HandlerWrapper<Source, MT>
where
MT: Hash + Eq + Debug + Clone,
{
fn handle(
&self,
message_bytes: &[u8],
message_context: &MessageContext<Source, MT>,
network_sender: &dyn MessageSender<Source>,
) -> Result<(), DispatchError> {
(*self.inner)(message_bytes, message_context, network_sender)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Mutex};
use protobuf::Message;
use crate::protos::network::{NetworkEcho, NetworkMessageType};
#[test]
fn dispatch_to_handler() {
let network_sender = MockSender::default();
let mut dispatcher = Dispatcher::new(Box::new(network_sender));
let handler = NetworkEchoHandler::default();
let echos = handler.echos.clone();
dispatcher.set_handler(Box::new(handler));
let mut outgoing_message = NetworkEcho::new();
outgoing_message.set_payload(b"test_dispatcher".to_vec());
let outgoing_message_bytes = outgoing_message.write_to_bytes().unwrap();
assert_eq!(
Ok(()),
dispatcher.dispatch(
"TestPeer".into(),
&NetworkMessageType::NETWORK_ECHO,
outgoing_message_bytes
)
);
assert_eq!(
vec!["test_dispatcher".to_string()],
echos.lock().unwrap().clone()
);
}
#[test]
fn move_dispatcher_to_thread() {
let network_sender = MockSender::default();
let mut dispatcher = Dispatcher::new(Box::new(network_sender));
let handler = NetworkEchoHandler::default();
let echos = handler.echos.clone();
dispatcher.set_handler(Box::new(handler));
std::thread::spawn(move || {
let mut outgoing_message = NetworkEcho::new();
outgoing_message.set_payload(b"thread_echo".to_vec());
let outgoing_message_bytes = outgoing_message.write_to_bytes().unwrap();
assert_eq!(
Ok(()),
dispatcher.dispatch(
"TestPeer".into(),
&NetworkMessageType::NETWORK_ECHO,
outgoing_message_bytes
)
);
})
.join()
.unwrap();
assert_eq!(
vec!["thread_echo".to_string()],
echos.lock().unwrap().clone()
);
}
#[derive(Default)]
struct NetworkEchoHandler {
echos: Arc<Mutex<Vec<String>>>,
}
impl Handler for NetworkEchoHandler {
type Source = PeerId;
type MessageType = NetworkMessageType;
type Message = NetworkEcho;
fn match_type(&self) -> Self::MessageType {
NetworkMessageType::NETWORK_ECHO
}
fn handle(
&self,
message: NetworkEcho,
_message_context: &MessageContext<Self::Source, NetworkMessageType>,
_: &dyn MessageSender<Self::Source>,
) -> Result<(), DispatchError> {
let echo_string = String::from_utf8(message.get_payload().to_vec()).unwrap();
self.echos.lock().unwrap().push(echo_string);
Ok(())
}
}
#[derive(Clone, Default)]
struct MockSender {}
impl MessageSender<PeerId> for MockSender {
fn send(&self, _id: PeerId, _message: Vec<u8>) -> Result<(), (PeerId, Vec<u8>)> {
Ok(())
}
}
}