datacake_rpc/net/
client.rs

1use std::net::SocketAddr;
2
3use http::{Method, Request};
4use hyper::StatusCode;
5use rkyv::AlignedVec;
6
7#[cfg(feature = "simulation")]
8use super::simulation::LazyClient;
9use crate::body::Body;
10use crate::net::Error;
11use crate::request::MessageMetadata;
12
13#[derive(Clone)]
14/// A raw client connection which can produce multiplexed streams.
15pub struct Channel {
16    #[cfg(not(feature = "simulation"))]
17    connection: hyper::Client<hyper::client::HttpConnector, hyper::Body>,
18
19    #[cfg(feature = "simulation")]
20    connection: LazyClient,
21
22    remote_addr: SocketAddr,
23}
24
25impl Channel {
26    #[cfg(not(feature = "simulation"))]
27    /// Connects to a remote RPC server.
28    pub fn connect(remote_addr: SocketAddr) -> Self {
29        let mut http = hyper::client::HttpConnector::new();
30        http.enforce_http(false);
31        http.set_nodelay(true);
32        http.set_connect_timeout(Some(std::time::Duration::from_secs(2)));
33
34        let client = hyper::Client::builder()
35            .http2_keep_alive_while_idle(true)
36            .http2_only(true)
37            .http2_adaptive_window(true)
38            .build(http);
39
40        Self {
41            connection: client,
42            remote_addr,
43        }
44    }
45
46    #[cfg(feature = "simulation")]
47    /// Connects to a remote RPC server with turmoil simulation enabled.
48    pub fn connect(remote_addr: SocketAddr) -> Self {
49        let client = LazyClient::connect(remote_addr);
50
51        Self {
52            connection: client,
53            remote_addr,
54        }
55    }
56
57    /// Sends a message payload the remote server and gets the response
58    /// data back.
59    pub(crate) async fn send_msg(
60        &self,
61        metadata: MessageMetadata,
62        msg: Body,
63    ) -> Result<Result<Body, AlignedVec>, Error> {
64        let uri = format!(
65            "http://{}{}",
66            self.remote_addr,
67            crate::to_uri_path(&metadata.service_name, &metadata.path),
68        );
69        let request = Request::builder()
70            .method(Method::POST)
71            .uri(uri)
72            .body(msg.into_inner())
73            .unwrap();
74
75        #[cfg(not(feature = "simulation"))]
76        let resp = self.connection.request(request).await?;
77        #[cfg(feature = "simulation")]
78        let resp = {
79            let conn = self.connection.get_or_init().await?;
80            conn.lock().await.send_request(request).await?
81        };
82
83        let (req, body) = resp.into_parts();
84
85        if req.status == StatusCode::OK {
86            Ok(Ok(Body::new(body)))
87        } else {
88            let buffer = crate::utils::to_aligned(body).await?;
89            Ok(Err(buffer))
90        }
91    }
92
93    #[inline]
94    /// The address of the remote connection.
95    pub fn remote_addr(&self) -> SocketAddr {
96        self.remote_addr
97    }
98}