1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
use std::task::Poll;
use hyper::client::{conn::Builder, service::Connect};
use tower_service::Service;
use tracing::debug;
use crate::{boxed, triple::transport::connector::get_connector};
/// Client-side connection settings for sending requests to a remote host.
///
/// Bundles the target URI, the name of the connector to look up, and the hyper
/// connection builder. All three are read by the `Service` implementation's
/// `call` method each time a request is sent.
#[derive(Debug, Clone)]
pub struct Connection {
/// URI that is cloned and handed to the connector when a call is made.
host: hyper::Uri,
/// Connector name passed to `get_connector` to obtain the underlying connector.
connector: &'static str,
/// hyper connection builder; cloned per call and switched to HTTP/2-only before use.
builder: Builder,
}
impl Default for Connection {
fn default() -> Self {
Self::new()
}
}
impl Connection {
    /// Creates a connection with `hyper::Uri::default()` as the host, the
    /// `"http"` connector name, and a freshly constructed hyper `Builder`.
    pub fn new() -> Self {
        let host = hyper::Uri::default();
        let builder = Builder::new();
        Self {
            host,
            connector: "http",
            builder,
        }
    }

    /// Returns this connection with its connector name replaced by `connector`.
    ///
    /// The name is later passed to `get_connector` when a request is sent.
    pub fn with_connector(self, connector: &'static str) -> Self {
        Self { connector, ..self }
    }

    /// Returns this connection with its target host replaced by `uri`.
    pub fn with_host(self, uri: hyper::Uri) -> Self {
        Self { host: uri, ..self }
    }

    /// Returns this connection with its hyper connection builder replaced by `builder`.
    pub fn with_builder(self, builder: Builder) -> Self {
        Self { builder, ..self }
    }
}
/// Client-side [`Service`] implementation that sends a request to the configured host.
///
/// Each invocation of `call` builds a new hyper `Connect` service from the
/// configured connector name and an HTTP/2-only copy of the configured builder,
/// connects to the configured host, and sends the request over that connection.
impl<ReqBody> Service<http::Request<ReqBody>> for Connection
where
    ReqBody: http_body::Body + Unpin + Send + 'static,
    ReqBody::Data: Send + Unpin,
    ReqBody::Error: Into<crate::Error>,
{
    type Response = http::Response<crate::BoxBody>;
    type Error = crate::Error;
    type Future = crate::BoxFuture<Self::Response, Self::Error>;

    /// Always reports readiness; the connection itself is only established
    /// inside `call`, so there is nothing to wait for here.
    fn poll_ready(
        &mut self,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Connects to the configured host and sends `req` over the new connection.
    ///
    /// The response body is converted with `boxed` before being returned.
    ///
    /// # Errors
    ///
    /// The returned future resolves to `Err` when establishing the connection
    /// fails or when sending the request fails.
    fn call(&mut self, req: http::Request<ReqBody>) -> Self::Future {
        // Clone once and configure in place; `http2_only` mutates through `&mut self`,
        // so no second clone is needed to obtain an owned builder.
        let mut builder = self.builder.clone();
        builder.http2_only(true);

        let mut connector = Connect::new(get_connector(self.connector), builder);
        let uri = self.host.clone();

        let fut = async move {
            debug!("send rpc call to {}", uri);
            // Surface connection failures (refused, DNS, handshake, ...) to the
            // caller as an error instead of panicking inside the task.
            let mut sender = connector.call(uri).await?;
            sender
                .call(req)
                .await
                .map_err(|err| err.into())
                .map(|res| res.map(boxed))
        };
        Box::pin(fut)
    }
}