use std::any::Any;
use std::error::Error;
use std::fmt;
use std::fmt::Debug;
use std::hash::Hash;
use std::sync::mpsc::{channel, Receiver, RecvError, Sender};
use super::{Dispatcher, PeerId};
use crate::error::InternalError;
use crate::threading::lifecycle::ShutdownHandle;
/// Item carried over the channel that feeds the dispatch loop thread.
enum DispatchMessage<MT: Any + Hash + Eq + Debug + Clone, Source = PeerId> {
    /// A message that the loop thread hands to its dispatcher.
    Message {
        /// Type tag of the message; passed to the dispatcher by reference.
        message_type: MT,
        /// Raw payload bytes of the message.
        message_bytes: Vec<u8>,
        /// Identifier passed to the dispatcher as the source of the message.
        source_id: Source,
        /// Optional opaque context. When present, the loop thread calls
        /// `dispatch_with_parent_context`; otherwise it calls `dispatch`.
        parent_context: Option<Box<dyn Any + Send>>,
    },
    /// Tells the loop thread to stop receiving and exit.
    Shutdown,
}
/// Error type for the dispatch loop, wrapping a textual description.
#[derive(Debug)]
pub struct DispatchLoopError(String);

impl Error for DispatchLoopError {}

impl fmt::Display for DispatchLoopError {
    /// Writes the wrapped description after a fixed dispatch-loop prefix.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let DispatchLoopError(reason) = self;
        write!(f, "received error from dispatch loop: {}", reason)
    }
}
/// Builder for a [`DispatchLoop`].
///
/// A dispatcher must be supplied before calling `build`; the channel and the
/// thread name are optional and get defaults when left unset.
pub struct DispatchLoopBuilder<MT, Source = PeerId>
where
    Source: 'static,
    MT: Any + Hash + Eq + Debug + Clone,
{
    /// Dispatcher the loop thread will forward messages to (required by `build`).
    dispatcher: Option<Dispatcher<MT, Source>>,
    /// Optional pre-made channel; `build` falls back to `dispatch_channel()`.
    channel: Option<(
        DispatchMessageSender<MT, Source>,
        DispatchMessageReceiver<MT, Source>,
    )>,
    /// Optional thread name; `build` falls back to `DispatchLoop(<type name of MT>)`.
    thread_name: Option<String>,
}

// Implemented by hand rather than derived: `#[derive(Default)]` would add
// `MT: Default` and `Source: Default` bounds, although every field is an
// `Option` whose default (`None`) needs neither.
impl<MT, Source> Default for DispatchLoopBuilder<MT, Source>
where
    Source: 'static,
    MT: Any + Hash + Eq + Debug + Clone,
{
    /// Returns a builder with no dispatcher, channel, or thread name set.
    fn default() -> Self {
        DispatchLoopBuilder {
            dispatcher: None,
            channel: None,
            thread_name: None,
        }
    }
}
impl<MT, Source> DispatchLoopBuilder<MT, Source>
where
MT: Any + Hash + Eq + Debug + Clone + Send,
Source: Send + 'static,
{
pub fn new() -> Self {
DispatchLoopBuilder {
dispatcher: None,
channel: None,
thread_name: None,
}
}
pub fn with_dispatch_channel(
mut self,
channel: (
DispatchMessageSender<MT, Source>,
DispatchMessageReceiver<MT, Source>,
),
) -> Self {
self.channel = Some(channel);
self
}
pub fn with_dispatcher(mut self, dispatcher: Dispatcher<MT, Source>) -> Self {
self.dispatcher = Some(dispatcher);
self
}
pub fn with_thread_name(mut self, name: String) -> Self {
self.thread_name = Some(name);
self
}
pub fn build(mut self) -> Result<DispatchLoop<MT, Source>, String> {
let (tx, rx) = self.channel.take().unwrap_or_else(dispatch_channel);
let dispatcher = self
.dispatcher
.take()
.ok_or_else(|| "No dispatch provided".to_string())?;
let thread_name = self
.thread_name
.unwrap_or_else(|| format!("DispatchLoop({})", std::any::type_name::<MT>()));
let join_handle = std::thread::Builder::new()
.name(thread_name)
.spawn(move || loop {
match rx.receiver.recv() {
Ok(DispatchMessage::Message {
message_type,
message_bytes,
source_id,
parent_context: Some(context),
}) => {
if let Err(err) = dispatcher.dispatch_with_parent_context(
source_id,
&message_type,
message_bytes,
context,
) {
warn!("Unable to dispatch message: {:?}", err);
}
}
Ok(DispatchMessage::Message {
message_type,
message_bytes,
source_id,
parent_context: None,
}) => {
if let Err(err) =
dispatcher.dispatch(source_id, &message_type, message_bytes)
{
warn!("Unable to dispatch message: {:?}", err);
}
}
Ok(DispatchMessage::Shutdown) => {
debug!("Received shutdown signal");
break;
}
Err(RecvError) => {
error!("Received error from receiver");
break;
}
}
});
match join_handle {
Ok(join_handle) => Ok(DispatchLoop {
sender: tx.sender,
join_handle,
}),
Err(err) => Err(format!("Unable to start up dispatch loop thread: {}", err)),
}
}
}
/// Handle to a running dispatch loop thread.
pub struct DispatchLoop<MT: Any + Hash + Eq + Debug + Clone, Source = PeerId> {
    /// Sending half of the channel read by the loop thread; used to create
    /// new message senders and to deliver the shutdown signal.
    sender: Sender<DispatchMessage<MT, Source>>,
    /// Join handle of the spawned loop thread.
    join_handle: std::thread::JoinHandle<()>,
}
impl<MT, Source> DispatchLoop<MT, Source>
where
MT: Any + Hash + Eq + Debug + Clone,
{
pub fn new_dispatcher_sender(&self) -> DispatchMessageSender<MT, Source> {
DispatchMessageSender {
sender: self.sender.clone(),
}
}
}
impl<MT, Source> ShutdownHandle for DispatchLoop<MT, Source>
where
    MT: Any + Hash + Eq + Debug + Clone,
{
    /// Sends the shutdown message to the loop thread, logging an error if the
    /// send fails.
    fn signal_shutdown(&mut self) {
        let delivered = self.sender.send(DispatchMessage::Shutdown).is_ok();
        if !delivered {
            error!("Unable to send shutdown signal to already shutdown dispatch loop");
        }
    }

    /// Blocks until the loop thread has finished.
    ///
    /// # Errors
    ///
    /// Returns an `InternalError` if joining the thread fails.
    fn wait_for_shutdown(self) -> Result<(), InternalError> {
        self.join_handle.join().map_err(|_| {
            InternalError::with_message("Unable to join dispatch loop thread".into())
        })
    }
}
pub fn dispatch_channel<MT, Source>() -> (
DispatchMessageSender<MT, Source>,
DispatchMessageReceiver<MT, Source>,
)
where
MT: Any + Hash + Eq + Debug + Clone,
{
let (tx, rx) = channel();
(
DispatchMessageSender { sender: tx },
DispatchMessageReceiver { receiver: rx },
)
}
/// Receiving end of a dispatch channel, read by the dispatch loop thread.
pub struct DispatchMessageReceiver<MT: Any + Hash + Eq + Debug + Clone, Source = PeerId> {
    /// Underlying channel receiver for dispatch messages.
    receiver: Receiver<DispatchMessage<MT, Source>>,
}
/// Values returned to the caller when `DispatchMessageSender::send` fails:
/// `(message_type, message_bytes, source_id)`.
type MessageTuple<MT, Source> = (MT, Vec<u8>, Source);
/// Values returned to the caller when
/// `DispatchMessageSender::send_with_parent_context` fails:
/// `(message_type, message_bytes, source_id, parent_context)`.
type MessageTupleWithParentContext<MT, Source> = (MT, Vec<u8>, Source, Box<dyn Any + Send>);
/// Sending end of a dispatch channel, used to queue messages for a dispatch
/// loop.
pub struct DispatchMessageSender<MT, Source = PeerId>
where
    MT: Any + Hash + Eq + Debug + Clone,
{
    /// Underlying channel sender for dispatch messages.
    sender: Sender<DispatchMessage<MT, Source>>,
}

// Implemented by hand rather than derived: `#[derive(Clone)]` would add a
// `Source: Clone` bound, but cloning only duplicates the channel `Sender`,
// which is `Clone` for any message type.
impl<MT, Source> Clone for DispatchMessageSender<MT, Source>
where
    MT: Any + Hash + Eq + Debug + Clone,
{
    /// Returns another sender connected to the same channel.
    fn clone(&self) -> Self {
        DispatchMessageSender {
            sender: self.sender.clone(),
        }
    }
}
impl<MT, Source> DispatchMessageSender<MT, Source>
where
    MT: Any + Hash + Eq + Debug + Clone,
{
    /// Queues a message, without a parent context, for the dispatch loop.
    ///
    /// # Errors
    ///
    /// If the underlying channel send fails, the message type, bytes, and
    /// source id are handed back to the caller.
    pub fn send(
        &self,
        message_type: MT,
        message_bytes: Vec<u8>,
        source_id: Source,
    ) -> Result<(), MessageTuple<MT, Source>> {
        let outgoing = DispatchMessage::Message {
            message_type,
            message_bytes,
            source_id,
            parent_context: None,
        };

        match self.sender.send(outgoing) {
            Ok(()) => Ok(()),
            // The send error carries the unsent message; unpack it again.
            Err(rejected) => match rejected.0 {
                DispatchMessage::Message {
                    message_type,
                    message_bytes,
                    source_id,
                    ..
                } => Err((message_type, message_bytes, source_id)),
                // Only a `Message` was sent, so only a `Message` can come back.
                DispatchMessage::Shutdown => unreachable!(),
            },
        }
    }

    /// Queues a message together with a parent context for the dispatch loop.
    ///
    /// # Errors
    ///
    /// If the underlying channel send fails, the message type, bytes, source
    /// id, and parent context are handed back to the caller.
    pub fn send_with_parent_context(
        &self,
        message_type: MT,
        message_bytes: Vec<u8>,
        source_id: Source,
        parent_context: Box<dyn Any + Send>,
    ) -> Result<(), MessageTupleWithParentContext<MT, Source>> {
        let outgoing = DispatchMessage::Message {
            message_type,
            message_bytes,
            source_id,
            parent_context: Some(parent_context),
        };

        match self.sender.send(outgoing) {
            Ok(()) => Ok(()),
            // The send error carries the unsent message; unpack it again.
            Err(rejected) => match rejected.0 {
                DispatchMessage::Message {
                    message_type,
                    message_bytes,
                    source_id,
                    parent_context: Some(pc),
                } => Err((message_type, message_bytes, source_id, pc)),
                // A `Message` with a context was sent, so nothing else can come back.
                _ => unreachable!(),
            },
        }
    }
}