use crate::Url;
use http::{HeaderValue, Uri};
use std::{
fmt::{Debug, Formatter},
str::FromStr,
};
use tower_service::Service;
use crate::{
invoker::clone_body::CloneBody,
triple::transport::{self, connection::Connection},
};
/// Invoker that sends requests over a [`Connection`] built for a target [`Url`].
pub struct TripleInvoker {
    /// Target URL; it is printed by the `Debug` impl and supplies the authority
    /// used when outgoing requests are rewritten.
    url: Url,
    /// Connection built for the host parsed from `url`.
    conn: Connection,
}
impl TripleInvoker {
    /// Creates an invoker whose connection targets the host described by `url`.
    ///
    /// # Panics
    ///
    /// Panics if `url` cannot be parsed as an [`http::Uri`]. The panic message
    /// includes the offending URL and the parse error.
    pub fn new(url: Url) -> TripleInvoker {
        // The signature cannot report failure, so keep panicking on a bad URL,
        // but say which URL was rejected and why instead of a bare `unwrap`.
        let host = http::Uri::from_str(url.as_str()).unwrap_or_else(|err| {
            panic!("invalid invoker url `{}`: {}", url.as_str(), err)
        });
        Self {
            url,
            conn: Connection::new().with_host(host).build(),
        }
    }
}
impl Debug for TripleInvoker {
    /// Formats the invoker as the `Debug` representation of its target URL.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Write straight into the formatter: same output as formatting into a
        // temporary `String` first, without the allocation, and formatting
        // errors are returned to the caller.
        write!(f, "{:?}", self.url)
    }
}
impl TripleInvoker {
    /// Rewrites `req` into the HTTP/2 `POST` request that is sent over the connection.
    ///
    /// The request URI is rebuilt from the `http` scheme, the authority of this
    /// invoker's URL and the value of the incoming `path` header. All headers of
    /// `req` are carried over (multi-valued headers keep every value); afterwards
    /// the `method`, `scheme`, `path` and `authority` headers and a fixed set of
    /// protocol headers are set, replacing any value the caller supplied under
    /// the same name. The body is passed through untouched.
    ///
    /// # Panics
    ///
    /// Panics if `req` has no `path` header, if that header cannot be read as a
    /// string, or if the resulting URI or header values are invalid.
    pub fn map_request(&self, req: http::Request<CloneBody>) -> http::Request<CloneBody> {
        // Fixed headers stamped on every outgoing request, in insertion order.
        const FIXED_HEADERS: [(&str, &str); 8] = [
            ("content-type", "application/grpc+proto"),
            ("user-agent", "dubbo-rust/0.1.0"),
            ("te", "trailers"),
            ("tri-service-version", "dubbo-rust/0.1.0"),
            ("tri-service-group", "cluster"),
            ("tri-unit-info", "dubbo-rust/0.1.0"),
            ("grpc-encoding", "gzip"),
            ("grpc-accept-encoding", "gzip"),
        ];

        let (parts, body) = req.into_parts();

        let path_and_query = parts
            .headers
            .get("path")
            .expect("request must carry a `path` header")
            .to_str()
            .expect("`path` header must be readable as a string");
        let uri = Uri::builder()
            .scheme("http")
            .authority(self.url.authority())
            .path_and_query(path_and_query)
            .build()
            .expect("invoker authority and `path` header must form a valid URI");

        let mut req = hyper::Request::builder()
            .version(http::Version::HTTP_2)
            .uri(uri.clone())
            .method("POST")
            .body(body)
            .expect("a validated URI and a static method always build a request");

        // Take over the caller's header map as a whole. Copying it pair by pair
        // with `insert` would keep only the last value of a multi-valued header
        // and clone every value along the way.
        *req.headers_mut() = parts.headers;

        let headers = req.headers_mut();
        headers.insert("method", HeaderValue::from_static("POST"));
        headers.insert(
            "scheme",
            HeaderValue::from_str(uri.scheme_str().expect("scheme was set on the URI builder"))
                .expect("URI scheme must be a valid header value"),
        );
        // Only the path component is forwarded here; a query string is not included.
        headers.insert(
            "path",
            HeaderValue::from_str(uri.path()).expect("URI path must be a valid header value"),
        );
        headers.insert(
            "authority",
            HeaderValue::from_str(
                uri.authority()
                    .expect("authority was set on the URI builder")
                    .as_str(),
            )
            .expect("URI authority must be a valid header value"),
        );
        for &(name, value) in FIXED_HEADERS.iter() {
            headers.insert(name, HeaderValue::from_static(value));
        }

        req
    }
}
impl Service<http::Request<CloneBody>> for TripleInvoker {
    type Response = http::Response<crate::BoxBody>;
    type Error = crate::Error;
    type Future = crate::BoxFuture<Self::Response, Self::Error>;

    /// Reports readiness by delegating to the underlying connection.
    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        let conn = &mut self.conn;
        // Name the request type explicitly so the intended `Service` impl of the
        // connection is selected.
        <transport::connection::Connection as Service<http::Request<CloneBody>>>::poll_ready(
            conn, cx,
        )
    }

    /// Rewrites `req` with [`TripleInvoker::map_request`] and dispatches it on
    /// the underlying connection.
    fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future {
        let outgoing = self.map_request(req);
        let conn = &mut self.conn;
        conn.call(outgoing)
    }
}