1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
use std::net::SocketAddr;
use std::time::Duration;
use http::{Method, Request};
use hyper::body::{Bytes, HttpBody};
use hyper::client::HttpConnector;
use hyper::{Body, Client, StatusCode};
use rkyv::AlignedVec;
use crate::request::MessageMetadata;
/// A client-side handle used to send messages to a single remote peer
/// over HTTP.
///
/// The handle pairs a hyper [`Client`] with the socket address that every
/// request built by this type is directed at.
#[derive(Clone)]
pub struct Channel {
    /// The hyper client that issues the requests.
    connection: Client<HttpConnector, Body>,
    /// Address of the remote peer; used as the authority of request URIs.
    remote_addr: SocketAddr,
}
impl Channel {
    /// Creates a channel whose requests are directed at `remote_addr`.
    ///
    /// This function is synchronous and awaits nothing: it only configures
    /// the connector and the HTTP/2-only client and stores them together
    /// with the address.
    pub fn connect(remote_addr: SocketAddr) -> Self {
        let mut http = HttpConnector::new();
        // Do not have the connector reject URIs whose scheme is not `http`.
        http.enforce_http(false);
        http.set_nodelay(true);
        http.set_connect_timeout(Some(Duration::from_secs(2)));

        // NOTE(review): per the hyper docs `http2_keep_alive_while_idle` does
        // nothing unless `http2_keep_alive_interval` is also set, and no
        // interval is configured here -- confirm whether keep-alive pings
        // were intended.
        let client = hyper::Client::builder()
            .http2_keep_alive_while_idle(true)
            .http2_only(true)
            .http2_adaptive_window(true)
            .build(http);

        Self {
            connection: client,
            remote_addr,
        }
    }

    /// Sends `msg` as the body of a `POST` request and collects the whole
    /// response body into an [`AlignedVec`].
    ///
    /// The request URI is `http://{remote_addr}{path}`, where `path` is
    /// produced by `crate::to_uri_path` from the service name and path held
    /// in `metadata`.
    ///
    /// # Returns
    ///
    /// * `Ok(Ok(body))` - the response status was `200 OK`.
    /// * `Ok(Err(body))` - the response carried any other status; the body is
    ///   still read in full and handed back.
    ///
    /// # Errors
    ///
    /// Returns the [`hyper::Error`] raised while sending the request or while
    /// reading a chunk of the response body.
    ///
    /// # Panics
    ///
    /// Panics if the request cannot be built, e.g. because the formatted URI
    /// is not valid.
    pub(crate) async fn send_msg(
        &self,
        metadata: MessageMetadata,
        msg: Bytes,
    ) -> Result<Result<AlignedVec, AlignedVec>, hyper::Error> {
        // Capacity used when the response does not advertise an upper bound.
        const DEFAULT_PREALLOC_BYTES: u64 = 1024;
        // Ceiling for the speculative preallocation. The advertised size is
        // supplied by the remote peer, so it must not be allowed to dictate
        // an arbitrarily large up-front allocation. The buffer still grows
        // on demand when the real body is larger than this.
        const MAX_PREALLOC_BYTES: u64 = 1024 * 1024;

        let uri = format!(
            "http://{}{}",
            self.remote_addr,
            crate::to_uri_path(&metadata.service_name, &metadata.path),
        );
        let request = Request::builder()
            .method(Method::POST)
            .uri(uri)
            .body(Body::from(msg))
            .expect("request URI built from the remote address and service path must be valid");

        let resp = self.connection.request(request).await?;
        let (parts, mut body) = resp.into_parts();

        // After clamping, the value is at most 1 MiB, so the narrowing cast
        // cannot truncate on any supported pointer width.
        let capacity = body
            .size_hint()
            .upper()
            .unwrap_or(DEFAULT_PREALLOC_BYTES)
            .min(MAX_PREALLOC_BYTES) as usize;
        let mut buffer = AlignedVec::with_capacity(capacity);

        while let Some(chunk) = body.data().await {
            buffer.extend_from_slice(&chunk?);
        }

        if parts.status == StatusCode::OK {
            Ok(Ok(buffer))
        } else {
            Ok(Err(buffer))
        }
    }

    /// Returns the address of the remote peer this channel sends to.
    #[inline]
    pub fn remote_addr(&self) -> SocketAddr {
        self.remote_addr
    }
}