use std::fmt;
use crate::peer::Address;
use crate::message;
pub use tokio::sync::mpsc as channel;
pub use channel::unbounded_channel as new_channel;
pub use tokio::sync::oneshot as oneshot;
pub use oneshot::channel as new_oneshot;
/// Sending half of an unbounded multi-producer channel carrying `T` values.
pub type Sender<T> = channel::UnboundedSender<T>;
/// Receiving half of an unbounded multi-producer channel carrying `T` values.
pub type Receiver<T> = channel::UnboundedReceiver<T>;
/// One-shot sender through which a single reply of type `T` is delivered.
pub type Replier<T> = oneshot::Sender<T>;
/// One-shot receiver on which a single reply of type `T` is awaited.
pub type ReplyReceiver<T> = oneshot::Receiver<T>;
/// One end of a two-way link: a sender of `A` values bundled together with
/// a receiver of `B` values.
pub struct ChannelPair<A, B> {
    /// Outgoing half; carries `A` values.
    tx: Sender<A>,
    /// Incoming half; yields `B` values.
    rx: Receiver<B>,
}
/// An [`Address`] paired with a [`message::Packet`].
///
/// NOTE(review): the name suggests the address is the packet's destination —
/// confirm against callers.
pub type PktTo = (Address, message::Packet);
/// An [`Address`] paired with a [`message::Packet`].
///
/// NOTE(review): the name suggests the address is the packet's origin —
/// confirm against callers.
pub type PktFrom = (Address, message::Packet);
/// Implemented by values that can be consumed in order to deliver a reply.
pub trait Respond {
    /// The reply type that is ultimately delivered.
    type R;

    /// Consumes `self` and delivers `reply` after converting it into
    /// `Self::R`.
    fn reply<IntoR>(self, reply: IntoR)
    where
        IntoR: Into<Self::R>;
}
impl<T: fmt::Debug> Respond for Replier<T> {
type R = T;
fn reply<IntoR: Into<Self::R>>(self, reply: IntoR) {
match self.send(reply.into()) {
Ok(_) => {},
Err(reply) => error!(?reply, "failed to respond")
}
}
}
impl <A, B> ChannelPair<A, B> {
pub fn combine(tx: Sender<A>, rx: Receiver<B>) -> Self {
Self {tx, rx}
}
pub fn duplex_pair() -> (ChannelPair<A, B>, ChannelPair<B, A>) {
let (tx0, rx0) = new_channel::<A>();
let (tx1, rx1) = new_channel::<B>();
(
ChannelPair::<A, B> {tx: tx0, rx: rx1},
ChannelPair::<B, A> {tx: tx1, rx: rx0}
)
}
pub fn split(self) -> (Sender<A>, Receiver<B>) {
(self.tx, self.rx)
}
}
/// A [`ChannelPair`] that sends [`PktFrom`] values and receives [`PktTo`] values.
pub type ServerChannel = ChannelPair<PktFrom, PktTo>;
/// A request: content of type `C` together with the [`Replier`] through which
/// a reply of type `R` is delivered.
pub type Request<C, R> = (C, Replier<R>);
/// A [`Request`] whose content is the unit type, i.e. it carries only the
/// reply handle.
pub type GetRequest<R> = Request<(), R>;
/// A request value assembled from some content plus a one-shot reply handle.
pub trait Requester: Sized + Respond {
    /// Payload carried by the request.
    type Content;
    /// Type of the reply delivered through the request's [`Replier`].
    type Reply;

    /// Assembles a request from its content and the handle used to reply to it.
    fn combine(content: Self::Content, replier: Replier<Self::Reply>) -> Self;

    /// Creates a fresh one-shot channel, builds the request around its sending
    /// half and returns the request together with the receiving half.
    fn make(content: Self::Content) -> (Self, ReplyReceiver<Self::Reply>) {
        let (reply_tx, reply_rx) = new_oneshot::<Self::Reply>();
        let request = Self::combine(content, reply_tx);
        (request, reply_rx)
    }
}
/// A sink that accepts requests of type [`RequestSender::S`].
pub trait RequestSender {
    /// The message type this sender accepts.
    type S;

    /// Hands `request` over to the underlying sink.
    fn send_request(&self, request: Self::S);

    /// Builds a request of type `R` from `content`, wraps it with `wrap`,
    /// converts the result into `Self::S` and sends it.
    ///
    /// Returns the receiver on which the reply to that request will arrive.
    fn request<R, W, IntoS>(&self, wrap: W, content: R::Content) -> ReplyReceiver<R::Reply>
    where
        R: Requester,
        W: Fn(R) -> IntoS,
        IntoS: Into<Self::S>,
    {
        let (outgoing, pending) = R::make(content);
        let wrapped: Self::S = wrap(outgoing).into();
        self.send_request(wrapped);
        pending
    }

    /// Same as [`RequestSender::request`] for requests whose content is `()`,
    /// so no content argument is needed.
    fn get<R, W, IntoS>(&self, wrap: W) -> ReplyReceiver<R::Reply>
    where
        R: Requester<Content = ()>,
        W: Fn(R) -> IntoS,
        IntoS: Into<Self::S>,
    {
        let (outgoing, pending) = R::make(());
        let wrapped: Self::S = wrap(outgoing).into();
        self.send_request(wrapped);
        pending
    }
}
impl<S: fmt::Debug> RequestSender for Sender<S> {
type S = S;
fn send_request(&self, request: S) {
self.send(request)
.unwrap_or_else(|request| error!(?request, "failed to send"))
}
}
impl<C, R> Requester for Request<C, R>
where
    R: fmt::Debug,
{
    type Content = C;
    type Reply = R;

    /// Pairs the content with its reply handle; the request is simply that tuple.
    fn combine(content: Self::Content, replier: Replier<Self::Reply>) -> Self {
        (content, replier)
    }
}
impl<C, R: fmt::Debug> Respond for Request<C, R> {
type R = R;
fn reply<IntoR: Into<Self::R>>(self, reply: IntoR) {
self.1.reply(reply)
}
}