1use std::{
2 net::SocketAddr,
3 str::FromStr,
4 sync::RwLock,
5 time::{Duration, Instant},
6};
7
8use async_compat::CompatExt;
9use async_socks5::AddrKind;
10use async_trait::async_trait;
11use bytes::Bytes;
12use concurrent_queue::ConcurrentQueue;
13use http_body_util::{BodyExt, Full};
14use hyper::{client::conn::http1::SendRequest, Request};
15use nanorpc::{JrpcRequest, JrpcResponse, RpcTransport};
16use smol::{future::FutureExt, net::TcpStream};
17use std::io::Result as IoResult;
18
19type Conn = SendRequest<Full<Bytes>>;
20
21pub struct HttpRpcTransport {
23 exec: smol::Executor<'static>,
24 remote: String,
25 proxy: Proxy,
26 pool: ConcurrentQueue<(Conn, Instant)>,
27 idle_timeout: Duration,
28
29 req_timeout: RwLock<Duration>,
30}
31
32#[derive(Clone)]
33pub enum Proxy {
34 Direct,
35 Socks5(SocketAddr),
36}
37
38impl HttpRpcTransport {
39 pub fn new(remote: String) -> Self {
43 Self::new_with_proxy(remote, Proxy::Direct)
44 }
45
46 pub fn new_with_proxy(remote: String, proxy: Proxy) -> Self {
50 Self {
51 exec: smol::Executor::new(),
52 remote,
53 proxy,
54 pool: ConcurrentQueue::bounded(64),
55 idle_timeout: Duration::from_secs(60),
56 req_timeout: Duration::from_secs(30).into(),
57 }
58 }
59
60 pub fn set_timeout(&self, timeout: Option<Duration>) {
62 *self.req_timeout.write().unwrap() = timeout.unwrap_or(Duration::MAX);
63 }
64
65 pub fn timeout(&self) -> Option<Duration> {
67 let to = *self.req_timeout.read().unwrap();
68 if to == Duration::MAX {
69 None
70 } else {
71 Some(to)
72 }
73 }
74
75 async fn open_conn(&self) -> IoResult<Conn> {
77 while let Ok((conn, expiry)) = self.pool.pop() {
78 if expiry.elapsed() < self.idle_timeout {
79 return Ok(conn);
80 }
81 }
82
83 let conn = match &self.proxy {
84 Proxy::Direct => TcpStream::connect(self.remote.clone()).await?,
85 Proxy::Socks5(proxy_addr) => {
86 let tcp_stream = TcpStream::connect(proxy_addr).await?;
87 let mut compat_stream = tcp_stream.compat();
88
89 let processed_remote: AddrKind =
91 if let Ok(sockaddr) = SocketAddr::from_str(&self.remote) {
92 AddrKind::Ip(sockaddr)
93 } else {
94 let (domain, port) = self.remote.split_once(':').ok_or_else(|| {
95 std::io::Error::new(
96 std::io::ErrorKind::InvalidData,
97 "destination not a valid domain:port",
98 )
99 })?;
100 AddrKind::Domain(
101 domain.to_string(),
102 port.parse().ok().ok_or_else(|| {
103 std::io::Error::new(
104 std::io::ErrorKind::InvalidData,
105 "destination port not a valid number ",
106 )
107 })?,
108 )
109 };
110 async_socks5::connect(&mut compat_stream, processed_remote, None)
111 .await
112 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
113
114 compat_stream.into_inner()
115 }
116 };
117
118 let (conn, handle) = hyper::client::conn::http1::handshake(conn.compat())
119 .await
120 .map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))?;
121
122 self.exec
123 .spawn(async move {
124 let _ = handle.await;
125 })
126 .detach();
127
128 Ok(conn)
129 }
130}
131
132#[async_trait]
133impl RpcTransport for HttpRpcTransport {
134 type Error = std::io::Error;
135
136 async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
137 let timeout = *self.req_timeout.read().unwrap();
138 async {
139 let mut conn = self.open_conn().await?;
140 let response = conn
141 .send_request(
142 Request::builder()
143 .method("POST")
144 .body(Full::new(serde_json::to_vec(&req)?.into()))
145 .expect("could not build request"),
146 )
147 .await
148 .map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))?;
149 let response = response
150 .into_body()
151 .collect()
152 .await
153 .map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))?
154 .to_bytes();
155 let resp = serde_json::from_slice(&response)
156 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
157 let _ = self.pool.push((conn, Instant::now()));
158 Ok(resp)
159 }
160 .or(async {
161 smol::Timer::after(timeout).await;
162 Err(std::io::Error::new(
163 std::io::ErrorKind::TimedOut,
164 "nanorpc-http request timed out",
165 ))
166 })
167 .or(self.exec.run(smol::future::pending()))
168 .await
169 }
170}