1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use std::{
    net::SocketAddr,
    time::{Duration, Instant},
};

use async_compat::CompatExt;
use async_trait::async_trait;
use bytes::Bytes;
use concurrent_queue::ConcurrentQueue;
use http_body_util::{BodyExt, Full};
use hyper::{client::conn::http1::SendRequest, Request};
use nanorpc::{JrpcRequest, JrpcResponse, RpcTransport};
use smol::future::FutureExt;

type Conn = SendRequest<Full<Bytes>>;

/// An HTTP-based [RpcTransport] for nanorpc.
pub struct HttpRpcTransport {
    exec: smol::Executor<'static>,
    remote: SocketAddr,
    pool: ConcurrentQueue<(Conn, Instant)>,
    idle_timeout: Duration,
}

impl HttpRpcTransport {
    /// Create a new HttpRpcTransport that goes to the given socket address over unencrypted HTTP/1.1.
    ///
    /// Currently, custom paths, HTTPS, etc are not supported.
    pub fn new(remote: SocketAddr) -> Self {
        Self {
            exec: smol::Executor::new(),
            remote,
            pool: ConcurrentQueue::bounded(64),
            idle_timeout: Duration::from_secs(60),
        }
    }
    /// Opens a new connection to the remote.
    async fn open_conn(&self) -> std::io::Result<Conn> {
        while let Ok((conn, expiry)) = self.pool.pop() {
            if expiry.elapsed() < self.idle_timeout {
                return Ok(conn);
            }
        }
        // okay there's nothing in the pool for us. create a new conn asap
        let conn = smol::net::TcpStream::connect(self.remote).await?;
        let (conn, handle) = hyper::client::conn::http1::handshake(conn.compat())
            .await
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))?;
        self.exec
            .spawn(async move {
                let _ = handle.await;
            })
            .detach();
        Ok(conn)
    }
}

#[async_trait]
impl RpcTransport for HttpRpcTransport {
    type Error = std::io::Error;

    async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
        async {
            let mut conn = self.open_conn().await?;
            let response = conn
                .send_request(
                    Request::builder()
                        .body(Full::new(serde_json::to_vec(&req)?.into()))
                        .expect("could not build request"),
                )
                .await
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))?;
            let response = response
                .into_body()
                .collect()
                .await
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))?
                .to_bytes();
            let resp = serde_json::from_slice(&response)
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
            let _ = self.pool.push((conn, Instant::now()));
            Ok(resp)
        }
        .or(self.exec.run(smol::future::pending()))
        .await
    }
}