1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use std::{
net::SocketAddr,
time::{Duration, Instant},
};
use async_compat::CompatExt;
use async_trait::async_trait;
use bytes::Bytes;
use concurrent_queue::ConcurrentQueue;
use http_body_util::{BodyExt, Full};
use hyper::{client::conn::http1::SendRequest, Request};
use nanorpc::{JrpcRequest, JrpcResponse, RpcTransport};
use smol::future::FutureExt;
type Conn = SendRequest<Full<Bytes>>;
pub struct HttpRpcTransport {
exec: smol::Executor<'static>,
remote: SocketAddr,
pool: ConcurrentQueue<(Conn, Instant)>,
idle_timeout: Duration,
}
impl HttpRpcTransport {
pub fn new(remote: SocketAddr) -> Self {
Self {
exec: smol::Executor::new(),
remote,
pool: ConcurrentQueue::bounded(64),
idle_timeout: Duration::from_secs(60),
}
}
async fn open_conn(&self) -> std::io::Result<Conn> {
while let Ok((conn, expiry)) = self.pool.pop() {
if expiry.elapsed() < self.idle_timeout {
return Ok(conn);
}
}
let conn = smol::net::TcpStream::connect(self.remote).await?;
let (conn, handle) = hyper::client::conn::http1::handshake(conn.compat())
.await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))?;
self.exec
.spawn(async move {
let _ = handle.await;
})
.detach();
Ok(conn)
}
}
#[async_trait]
impl RpcTransport for HttpRpcTransport {
type Error = std::io::Error;
async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
async {
let mut conn = self.open_conn().await?;
let response = conn
.send_request(
Request::builder()
.body(Full::new(serde_json::to_vec(&req)?.into()))
.expect("could not build request"),
)
.await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))?;
let response = response
.into_body()
.collect()
.await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))?
.to_bytes();
let resp = serde_json::from_slice(&response)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
let _ = self.pool.push((conn, Instant::now()));
Ok(resp)
}
.or(self.exec.run(smol::future::pending()))
.await
}
}