mod connection;
pub mod supervisor;
use std::fmt::{Debug, Display};
pub use connection::{PeerConnection, PeerReceiver, PeerSender, RecvMessage, SendMessage};
use internet2::addr::NodeAddr;
use internet2::presentation::{Error, TypedEnum, Unmarshall, Unmarshaller};
use crate::node::TryService;
/// Describes a peer socket as one of two modes, each carrying a [`NodeAddr`].
///
/// The `Display` output of every variant is fixed by its `#[display]`
/// attribute and has the form `--listen=<addr>` or `--connect=<addr>`.
#[derive(Clone, PartialEq, Eq, Debug, Display)]
pub enum PeerSocket {
/// Node address rendered as `--listen=<addr>`.
/// NOTE(review): presumably the address to accept incoming peer
/// connections on — confirm against the code consuming this enum.
#[display("--listen={0}")]
Listen(NodeAddr),
/// Node address rendered as `--connect=<addr>`.
/// NOTE(review): presumably the remote peer to dial out to — confirm
/// against the code consuming this enum.
#[display("--connect={0}")]
Connect(NodeAddr),
}
/// Callback interface used by [`Listener`] to process peer messages of the
/// typed-enum family `T` and to react to failures that occur while doing so.
pub trait Handler<T: TypedEnum> {
/// Error type produced by the handler. It must be constructible from the
/// presentation-layer [`Error`], which lets `Listener` propagate receive
/// failures into this type with `?`.
type Error: crate::error::Error + From<Error>;
/// Processes a single message, given in the form produced by
/// `Unmarshaller<T>`.
///
/// Returning `Err` hands the error over to [`Handler::handle_err`] when the
/// handler is driven by `Listener`.
fn handle(&mut self, message: <Unmarshaller<T> as Unmarshall>::Data)
-> Result<(), Self::Error>;
/// Reacts to an error raised while receiving or handling a message.
///
/// When driven by `Listener`, returning `Ok(())` lets the event loop
/// continue with the next message, while returning `Err` terminates the
/// loop and is propagated to the caller.
fn handle_err(&mut self, error: Self::Error) -> Result<(), Self::Error>;
}
/// Service that repeatedly receives messages from a peer and dispatches them
/// to a [`Handler`].
///
/// Type parameters:
/// * `H` — the handler receiving decoded messages and errors;
/// * `T` — the typed-enum message family understood by the unmarshaller.
///
/// The unmarshalled data must implement `Display` and `Debug` because each
/// received message is logged in both forms before being handled.
pub struct Listener<H, T>
where
T: TypedEnum,
H: Handler<T>,
Unmarshaller<T>: Unmarshall,
<Unmarshaller<T> as Unmarshall>::Data: Display + Debug,
<Unmarshaller<T> as Unmarshall>::Error: Into<Error>,
{
// Receiving half of the peer connection; source of incoming messages.
receiver: PeerReceiver,
// User-supplied callbacks invoked for every message and every error.
handler: H,
// Passed by reference to `recv_message` on every receive call.
unmarshaller: Unmarshaller<T>,
}
impl<H, T> Listener<H, T>
where
T: TypedEnum,
H: Handler<T>,
Unmarshaller<T>: Unmarshall,
<Unmarshaller<T> as Unmarshall>::Data: Display + Debug,
<Unmarshaller<T> as Unmarshall>::Error: Into<Error>,
{
pub fn with(receiver: PeerReceiver, handler: H, unmarshaller: Unmarshaller<T>) -> Self {
Self { receiver, handler, unmarshaller }
}
}
impl<H, T> TryService for Listener<H, T>
where
    T: TypedEnum,
    H: Handler<T>,
    Unmarshaller<T>: Unmarshall,
    <Unmarshaller<T> as Unmarshall>::Data: Display + Debug,
    <Unmarshaller<T> as Unmarshall>::Error: Into<Error>,
{
    /// Errors surfaced by the loop are the handler's own error type.
    type ErrorType = H::Error;

    /// Runs the receive-and-dispatch cycle until the handler gives up.
    ///
    /// Every iteration performs one `run` step. A failing step is logged and
    /// passed to `Handler::handle_err`; if that call itself returns `Err`,
    /// the error is propagated and the loop ends. There is no other exit
    /// path, so this function never returns `Ok`.
    fn try_run_loop(mut self) -> Result<(), Self::ErrorType> {
        // NOTE(review): the message says "sender" although this is the
        // listener's loop — kept verbatim; confirm whether it is intended.
        trace!("Entering event loop of the sender service");
        loop {
            if let Err(failure) = self.run() {
                trace!("Peer connection generated {}", failure);
                // Let the handler decide whether the failure is fatal.
                self.handler.handle_err(failure)?;
            } else {
                trace!("Peer message processing complete");
            }
        }
    }
}
impl<H, T> Listener<H, T>
where
    T: TypedEnum,
    H: Handler<T>,
    Unmarshaller<T>: Unmarshall,
    <Unmarshaller<T> as Unmarshall>::Data: Display + Debug,
    <Unmarshaller<T> as Unmarshall>::Error: Into<Error>,
{
    /// Performs a single step of the event loop: receive one message from
    /// the peer, log it, and pass it to the handler.
    ///
    /// # Errors
    ///
    /// Returns the receive error (converted into `H::Error` by `?`) or
    /// whatever error the handler reports for the message.
    fn run(&mut self) -> Result<(), H::Error> {
        trace!("Awaiting for peer messages...");
        let incoming = self.receiver.recv_message(&self.unmarshaller)?;

        // Short form at debug level, full structure at trace level.
        debug!("Processing message {}", incoming);
        trace!("Message details: {:?}", incoming);

        self.handler.handle(incoming)
    }
}