datacake_rpc/net/
client.rs1use std::net::SocketAddr;
2
3use http::{Method, Request};
4use hyper::StatusCode;
5use rkyv::AlignedVec;
6
7#[cfg(feature = "simulation")]
8use super::simulation::LazyClient;
9use crate::body::Body;
10use crate::net::Error;
11use crate::request::MessageMetadata;
12
13#[derive(Clone)]
14pub struct Channel {
16 #[cfg(not(feature = "simulation"))]
17 connection: hyper::Client<hyper::client::HttpConnector, hyper::Body>,
18
19 #[cfg(feature = "simulation")]
20 connection: LazyClient,
21
22 remote_addr: SocketAddr,
23}
24
25impl Channel {
26 #[cfg(not(feature = "simulation"))]
27 pub fn connect(remote_addr: SocketAddr) -> Self {
29 let mut http = hyper::client::HttpConnector::new();
30 http.enforce_http(false);
31 http.set_nodelay(true);
32 http.set_connect_timeout(Some(std::time::Duration::from_secs(2)));
33
34 let client = hyper::Client::builder()
35 .http2_keep_alive_while_idle(true)
36 .http2_only(true)
37 .http2_adaptive_window(true)
38 .build(http);
39
40 Self {
41 connection: client,
42 remote_addr,
43 }
44 }
45
46 #[cfg(feature = "simulation")]
47 pub fn connect(remote_addr: SocketAddr) -> Self {
49 let client = LazyClient::connect(remote_addr);
50
51 Self {
52 connection: client,
53 remote_addr,
54 }
55 }
56
57 pub(crate) async fn send_msg(
60 &self,
61 metadata: MessageMetadata,
62 msg: Body,
63 ) -> Result<Result<Body, AlignedVec>, Error> {
64 let uri = format!(
65 "http://{}{}",
66 self.remote_addr,
67 crate::to_uri_path(&metadata.service_name, &metadata.path),
68 );
69 let request = Request::builder()
70 .method(Method::POST)
71 .uri(uri)
72 .body(msg.into_inner())
73 .unwrap();
74
75 #[cfg(not(feature = "simulation"))]
76 let resp = self.connection.request(request).await?;
77 #[cfg(feature = "simulation")]
78 let resp = {
79 let conn = self.connection.get_or_init().await?;
80 conn.lock().await.send_request(request).await?
81 };
82
83 let (req, body) = resp.into_parts();
84
85 if req.status == StatusCode::OK {
86 Ok(Ok(Body::new(body)))
87 } else {
88 let buffer = crate::utils::to_aligned(body).await?;
89 Ok(Err(buffer))
90 }
91 }
92
93 #[inline]
94 pub fn remote_addr(&self) -> SocketAddr {
96 self.remote_addr
97 }
98}