1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tower_service::Service;

/// For automated code generation in build.rs.
pub mod build;

mod compiler;

pub use async_trait::async_trait;

/// Include the code generated by the compiler.
#[macro_export]
macro_rules! include_code {
    ($name: tt) => {
        include!(concat!(env!("OUT_DIR"), concat!("/", $name, ".rs")));
    };
}

#[derive(Debug)]
pub enum Error<AppError> {
    AppError(AppError),
    ChanError(anyhow::Error),
}

/// MPSC channel wrapper on the client-side.
pub struct ClientChannel<X, Y> {
    tx: mpsc::UnboundedSender<Request<X, Y>>,
}
impl<X, Y> ClientChannel<X, Y> {
    pub fn new(tx: mpsc::UnboundedSender<Request<X, Y>>) -> Self {
        Self { tx }
    }
    pub async fn call(self, req: X) -> anyhow::Result<Y> {
        let (tx1, rx1) = oneshot::channel::<Y>();
        let req = Request {
            inner: req,
            tx: tx1,
        };
        if self.tx.send(req).is_ok() {
            let rep = rx1.await?;
            Ok(rep)
        } else {
            anyhow::bail!("failed to send a request")
        }
    }
}
impl<X, Y> Clone for ClientChannel<X, Y> {
    fn clone(&self) -> Self {
        Self {
            tx: self.tx.clone(),
        }
    }
}

/// Pair of user defined request and
/// a oneshot sender for the response.
pub struct Request<X, Y> {
    inner: X,
    tx: oneshot::Sender<Y>,
}

/// MPSC channel wrapper on the server-side.
pub struct ServerChannel<Req, Svc: Service<Req>> {
    service: Svc,
    rx: mpsc::UnboundedReceiver<Request<Req, Svc::Response>>,
}
impl<Req, Svc: Service<Req> + 'static + Send + Clone> ServerChannel<Req, Svc>
where
    Req: 'static + Send,
    Svc::Future: Send,
    Svc::Response: Send,
{
    pub fn new(rx: mpsc::UnboundedReceiver<Request<Req, Svc::Response>>, service: Svc) -> Self {
        Self { service, rx }
    }
    pub async fn serve(mut self) {
        while let Some(Request { tx, inner }) = self.rx.recv().await {
            let mut cln = self.service.clone();
            tokio::spawn(async move {
                if let Ok(rep) = cln.call(inner).await {
                    tx.send(rep).ok();
                }
            });
        }
    }
}