1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tower_service::Service;
pub mod build;
mod compiler;
pub use async_trait::async_trait;
#[macro_export]
macro_rules! include_code {
($name: tt) => {
include!(concat!(env!("OUT_DIR"), concat!("/", $name, ".rs")));
};
}
#[derive(Debug)]
pub enum Error<AppError> {
AppError(AppError),
ChanError(anyhow::Error),
}
pub struct ClientChannel<X, Y> {
tx: mpsc::UnboundedSender<Request<X, Y>>,
}
impl<X, Y> ClientChannel<X, Y> {
pub fn new(tx: mpsc::UnboundedSender<Request<X, Y>>) -> Self {
Self { tx }
}
pub async fn call(self, req: X) -> anyhow::Result<Y> {
let (tx1, rx1) = oneshot::channel::<Y>();
let req = Request {
inner: req,
tx: tx1,
};
if self.tx.send(req).is_ok() {
let rep = rx1.await?;
Ok(rep)
} else {
anyhow::bail!("failed to send a request")
}
}
}
impl<X, Y> Clone for ClientChannel<X, Y> {
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
}
}
}
pub struct Request<X, Y> {
inner: X,
tx: oneshot::Sender<Y>,
}
pub struct ServerChannel<Req, Svc: Service<Req>> {
service: Svc,
rx: mpsc::UnboundedReceiver<Request<Req, Svc::Response>>,
}
impl<Req, Svc: Service<Req> + 'static + Send + Clone> ServerChannel<Req, Svc>
where
Req: 'static + Send,
Svc::Future: Send,
Svc::Response: Send,
{
pub fn new(rx: mpsc::UnboundedReceiver<Request<Req, Svc::Response>>, service: Svc) -> Self {
Self { service, rx }
}
pub async fn serve(mut self) {
while let Some(Request { tx, inner }) = self.rx.recv().await {
let mut cln = self.service.clone();
tokio::spawn(async move {
if let Ok(rep) = cln.call(inner).await {
tx.send(rep).ok();
}
});
}
}
}