nanorpc_http/
client.rs

1use std::{
2    net::SocketAddr,
3    str::FromStr,
4    sync::RwLock,
5    time::{Duration, Instant},
6};
7
8use async_compat::CompatExt;
9use async_socks5::AddrKind;
10use async_trait::async_trait;
11use bytes::Bytes;
12use concurrent_queue::ConcurrentQueue;
13use http_body_util::{BodyExt, Full};
14use hyper::{client::conn::http1::SendRequest, Request};
15use nanorpc::{JrpcRequest, JrpcResponse, RpcTransport};
16use smol::{future::FutureExt, net::TcpStream};
17use std::io::Result as IoResult;
18
19type Conn = SendRequest<Full<Bytes>>;
20
21/// An HTTP-based [RpcTransport] for nanorpc.
22pub struct HttpRpcTransport {
23    exec: smol::Executor<'static>,
24    remote: String,
25    proxy: Proxy,
26    pool: ConcurrentQueue<(Conn, Instant)>,
27    idle_timeout: Duration,
28
29    req_timeout: RwLock<Duration>,
30}
31
32#[derive(Clone)]
33pub enum Proxy {
34    Direct,
35    Socks5(SocketAddr),
36}
37
38impl HttpRpcTransport {
39    /// Create a new HttpRpcTransport that goes to the given socket address over unencrypted HTTP/1.1. A default timeout is applied.
40    ///
41    /// Currently, custom paths, HTTPS, etc are not supported.
42    pub fn new(remote: String) -> Self {
43        Self::new_with_proxy(remote, Proxy::Direct)
44    }
45
46    /// Create a new HttpRpcTransport that goes to the given socket address over unencrypted HTTP/1.1. A default timeout is applied.
47    ///
48    /// Currently, custom paths, HTTPS, etc are not supported.
49    pub fn new_with_proxy(remote: String, proxy: Proxy) -> Self {
50        Self {
51            exec: smol::Executor::new(),
52            remote,
53            proxy,
54            pool: ConcurrentQueue::bounded(64),
55            idle_timeout: Duration::from_secs(60),
56            req_timeout: Duration::from_secs(30).into(),
57        }
58    }
59
60    /// Sets the timeout for this transport. If `None`, disables any timeout handling.
61    pub fn set_timeout(&self, timeout: Option<Duration>) {
62        *self.req_timeout.write().unwrap() = timeout.unwrap_or(Duration::MAX);
63    }
64
65    /// Gets the timeout for this transport.
66    pub fn timeout(&self) -> Option<Duration> {
67        let to = *self.req_timeout.read().unwrap();
68        if to == Duration::MAX {
69            None
70        } else {
71            Some(to)
72        }
73    }
74
75    /// Opens a new connection to the remote.
76    async fn open_conn(&self) -> IoResult<Conn> {
77        while let Ok((conn, expiry)) = self.pool.pop() {
78            if expiry.elapsed() < self.idle_timeout {
79                return Ok(conn);
80            }
81        }
82
83        let conn = match &self.proxy {
84            Proxy::Direct => TcpStream::connect(self.remote.clone()).await?,
85            Proxy::Socks5(proxy_addr) => {
86                let tcp_stream = TcpStream::connect(proxy_addr).await?;
87                let mut compat_stream = tcp_stream.compat();
88
89                // TODO: this doesn't handle normal domains with .haven, find a smarter way.
90                let processed_remote: AddrKind =
91                    if let Ok(sockaddr) = SocketAddr::from_str(&self.remote) {
92                        AddrKind::Ip(sockaddr)
93                    } else {
94                        let (domain, port) = self.remote.split_once(':').ok_or_else(|| {
95                            std::io::Error::new(
96                                std::io::ErrorKind::InvalidData,
97                                "destination not a valid domain:port",
98                            )
99                        })?;
100                        AddrKind::Domain(
101                            domain.to_string(),
102                            port.parse().ok().ok_or_else(|| {
103                                std::io::Error::new(
104                                    std::io::ErrorKind::InvalidData,
105                                    "destination port not a valid number ",
106                                )
107                            })?,
108                        )
109                    };
110                async_socks5::connect(&mut compat_stream, processed_remote, None)
111                    .await
112                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
113
114                compat_stream.into_inner()
115            }
116        };
117
118        let (conn, handle) = hyper::client::conn::http1::handshake(conn.compat())
119            .await
120            .map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))?;
121
122        self.exec
123            .spawn(async move {
124                let _ = handle.await;
125            })
126            .detach();
127
128        Ok(conn)
129    }
130}
131
132#[async_trait]
133impl RpcTransport for HttpRpcTransport {
134    type Error = std::io::Error;
135
136    async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
137        let timeout = *self.req_timeout.read().unwrap();
138        async {
139            let mut conn = self.open_conn().await?;
140            let response = conn
141                .send_request(
142                    Request::builder()
143                        .method("POST")
144                        .body(Full::new(serde_json::to_vec(&req)?.into()))
145                        .expect("could not build request"),
146                )
147                .await
148                .map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))?;
149            let response = response
150                .into_body()
151                .collect()
152                .await
153                .map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))?
154                .to_bytes();
155            let resp = serde_json::from_slice(&response)
156                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
157            let _ = self.pool.push((conn, Instant::now()));
158            Ok(resp)
159        }
160        .or(async {
161            smol::Timer::after(timeout).await;
162            Err(std::io::Error::new(
163                std::io::ErrorKind::TimedOut,
164                "nanorpc-http request timed out",
165            ))
166        })
167        .or(self.exec.run(smol::future::pending()))
168        .await
169    }
170}