1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
use std::net::SocketAddr;

use http::{Method, Request};
use hyper::StatusCode;
use rkyv::AlignedVec;

#[cfg(feature = "simulation")]
use super::simulation::LazyClient;
use crate::body::Body;
use crate::net::Error;
use crate::request::MessageMetadata;

#[derive(Clone)]
/// A raw client connection which can produce multiplexed streams.
pub struct Channel {
    #[cfg(not(feature = "simulation"))]
    connection: hyper::Client<hyper::client::HttpConnector, hyper::Body>,

    #[cfg(feature = "simulation")]
    connection: LazyClient,

    remote_addr: SocketAddr,
}

impl Channel {
    #[cfg(not(feature = "simulation"))]
    /// Connects to a remote RPC server.
    pub fn connect(remote_addr: SocketAddr) -> Self {
        let mut http = hyper::client::HttpConnector::new();
        http.enforce_http(false);
        http.set_nodelay(true);
        http.set_connect_timeout(Some(std::time::Duration::from_secs(2)));

        let client = hyper::Client::builder()
            .http2_keep_alive_while_idle(true)
            .http2_only(true)
            .http2_adaptive_window(true)
            .build(http);

        Self {
            connection: client,
            remote_addr,
        }
    }

    #[cfg(feature = "simulation")]
    /// Connects to a remote RPC server with turmoil simulation enabled.
    pub fn connect(remote_addr: SocketAddr) -> Self {
        let client = LazyClient::connect(remote_addr);

        Self {
            connection: client,
            remote_addr,
        }
    }

    /// Sends a message payload the remote server and gets the response
    /// data back.
    pub(crate) async fn send_msg(
        &self,
        metadata: MessageMetadata,
        msg: Body,
    ) -> Result<Result<Body, AlignedVec>, Error> {
        let uri = format!(
            "http://{}{}",
            self.remote_addr,
            crate::to_uri_path(&metadata.service_name, &metadata.path),
        );
        let request = Request::builder()
            .method(Method::POST)
            .uri(uri)
            .body(msg.into_inner())
            .unwrap();

        #[cfg(not(feature = "simulation"))]
        let resp = self.connection.request(request).await?;
        #[cfg(feature = "simulation")]
        let resp = {
            let conn = self.connection.get_or_init().await?;
            conn.lock().await.send_request(request).await?
        };

        let (req, body) = resp.into_parts();

        if req.status == StatusCode::OK {
            Ok(Ok(Body::new(body)))
        } else {
            let buffer = crate::utils::to_aligned(body).await?;
            Ok(Err(buffer))
        }
    }

    #[inline]
    /// The address of the remote connection.
    pub fn remote_addr(&self) -> SocketAddr {
        self.remote_addr
    }
}