1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
use std::net::SocketAddr;
use std::time::Duration;

use http::{Method, Request};
use hyper::body::{Bytes, HttpBody};
use hyper::client::HttpConnector;
use hyper::{Body, Client, StatusCode};
use rkyv::AlignedVec;

use crate::request::MessageMetadata;

#[derive(Clone)]
/// A raw client connection which can produce multiplexed streams.
pub struct Channel {
    connection: Client<HttpConnector, Body>,
    remote_addr: SocketAddr,
}

impl Channel {
    /// Connects to a remote RPC server.
    pub fn connect(remote_addr: SocketAddr) -> Self {
        let mut http = HttpConnector::new();
        http.enforce_http(false);
        http.set_nodelay(true);
        http.set_connect_timeout(Some(Duration::from_secs(2)));

        let client = hyper::Client::builder()
            .http2_keep_alive_while_idle(true)
            .http2_only(true)
            .http2_adaptive_window(true)
            .build(http);

        Self {
            connection: client,
            remote_addr,
        }
    }

    /// Sends a message payload the remote server and gets the response
    /// data back.
    pub(crate) async fn send_msg(
        &self,
        metadata: MessageMetadata,
        msg: Bytes,
    ) -> Result<Result<AlignedVec, AlignedVec>, hyper::Error> {
        let uri = format!(
            "http://{}{}",
            self.remote_addr,
            crate::to_uri_path(&metadata.service_name, &metadata.path),
        );
        let request = Request::builder()
            .method(Method::POST)
            .uri(uri)
            .body(Body::from(msg))
            .unwrap();

        let resp = self.connection.request(request).await?;
        let (req, mut body) = resp.into_parts();

        let size = body.size_hint().upper().unwrap_or(1024);
        let mut buffer = AlignedVec::with_capacity(size as usize);
        while let Some(chunk) = body.data().await {
            buffer.extend_from_slice(&chunk?);
        }

        if req.status == StatusCode::OK {
            Ok(Ok(buffer))
        } else {
            Ok(Err(buffer))
        }
    }

    #[inline]
    /// The address of the remote connection.
    pub fn remote_addr(&self) -> SocketAddr {
        self.remote_addr
    }
}