1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
use std::task::Poll;
use deno_core::error::AnyError;
use futures::future::poll_fn;
use tokio::sync::oneshot::{self, error::TryRecvError};
pub struct ClientChannel<Req, Res> {
rx: Option<oneshot::Receiver<Res>>,
tx: Option<oneshot::Sender<Req>>,
}
pub struct ServerChannel<Req, Res> {
rx: Option<oneshot::Receiver<Req>>,
tx: Option<oneshot::Sender<Res>>,
}
pub fn create_client_server_channel<Req, Res>() -> (ClientChannel<Req, Res>, ServerChannel<Req, Res>)
{
let (tx, rx) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
(
ClientChannel {
rx: Some(rx),
tx: Some(tx2),
},
ServerChannel {
rx: Some(rx2),
tx: Some(tx),
},
)
}
impl<Req, Res> ClientChannel<Req, Res> {
pub fn send(&mut self, req: Req) -> Result<(), AnyError> {
let tx = self
.tx
.take()
.ok_or_else(|| AnyError::msg("client tx already taken"))?;
tx.send(req)
.map_err(|_e| AnyError::msg("Error: failed to send request"))?;
Ok(())
}
pub async fn recv(&mut self) -> Result<Res, AnyError> {
if let Some(rx) = self.rx.as_mut() {
poll_fn(move |_cx| match rx.try_recv() {
Ok(res) => Poll::Ready(Ok(res)),
Err(TryRecvError::Empty) => Poll::Pending,
_ => Poll::Ready(Err(AnyError::msg("Error: failed to receive response"))),
})
.await
} else {
Err(AnyError::msg("Error: client rx already taken"))
}
}
}
impl<Req, Res> ServerChannel<Req, Res> {
pub fn take_tx(&mut self) -> Result<oneshot::Sender<Res>, AnyError> {
let tx = self
.tx
.take()
.ok_or_else(|| AnyError::msg("server tx already taken"))?;
Ok(tx)
}
pub fn send(&mut self, res: Res) -> Result<(), AnyError> {
let tx = self.take_tx()?;
tx.send(res)
.map_err(|_e| AnyError::msg("Error: failed to send response"))?;
Ok(())
}
pub async fn recv(&mut self) -> Result<Req, AnyError> {
let rx = self
.rx
.take()
.ok_or_else(|| AnyError::msg("server rx already taken"))?;
Ok(rx.await?)
}
}