use flume::{Receiver, Sender};
use std::{fmt, time::Duration};
use tokio::time;
pub struct Request<Req, Res> {
data: Req,
tx: Sender<Res>,
}
impl<Req, Res> fmt::Debug for Request<Req, Res>
where
Req: fmt::Debug,
Res: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Request")
.field("data", &self.data)
.field("tx", &self.tx)
.finish()
}
}
impl<Req, Res> Request<Req, Res> {
pub fn data(&self) -> &Req {
&self.data
}
pub fn reply(&self, res: Res) {
let _ = self.tx.send(res);
}
}
pub struct Response<Res> {
rx: Receiver<Res>,
}
impl<Res> fmt::Debug for Response<Res>
where
Res: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Response").field("rx", &self.rx).finish()
}
}
#[derive(Debug)]
pub enum ReqErr {
Dropped,
Timeout,
}
impl fmt::Display for ReqErr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Dropped => write!(f, "Request object dropped before replying."),
Self::Timeout => write!(f, "Timed out before receiving response."),
}
}
}
impl std::error::Error for ReqErr {}
impl<Res> Response<Res> {
pub async fn recv(self) -> Result<Res, ReqErr> {
self.rx.recv_async().await.map_err(|_| ReqErr::Dropped)
}
pub async fn recv_timeout(self, dur: Duration) -> Result<Res, ReqErr> {
time::timeout(dur, self.recv())
.await
.map_err(|_| ReqErr::Timeout)
.and_then(|x| x)
}
}
pub fn req_res<Req, Res>(req: Req) -> (Request<Req, Res>, Response<Res>) {
let (tx, rx) = flume::unbounded();
(Request { data: req, tx }, Response { rx })
}