use std::{fmt, pin::Pin};
use futures_core::{
task::{Context, Poll},
Future,
};
use futures_util::future::{join, join_all};
use hyper::{
body::{aggregate, to_bytes},
http::header::AUTHORIZATION,
http::Method,
Body, Error as HyperError, Request, Response, StatusCode,
};
pub use hyper::{
client::{connect::Connect, HttpConnector},
Uri,
};
use prost::{DecodeError, Message as _};
use thiserror::Error;
use tower_service::Service;
use super::{KeyserverClient, MetadataPackage, RawAuthWrapperPackage};
use crate::models::*;
type FutResponse<Response, Error> =
Pin<Box<dyn Future<Output = Result<Response, Error>> + 'static + Send>>;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GetPeers;
#[derive(Debug, Error)]
pub enum GetPeersError<E: fmt::Debug + fmt::Display> {
#[error("processing body failed: {0}")]
Body(HyperError),
#[error("connection failure: {0}")]
Service(E),
#[error("body decoding failure: {0}")]
Decode(DecodeError),
#[error("unexpected status code: {0}")]
UnexpectedStatusCode(u16),
#[error("peering disabled")]
PeeringDisabled,
}
impl<S> Service<(Uri, GetPeers)> for KeyserverClient<S>
where
S: Service<Request<Body>, Response = Response<Body>>,
S: Send + Clone + 'static,
S::Error: fmt::Debug,
<S as Service<Request<Body>>>::Error: fmt::Display,
<S as Service<Request<Body>>>::Future: Send,
{
type Response = Peers;
type Error = GetPeersError<S::Error>;
type Future = FutResponse<Self::Response, Self::Error>;
fn poll_ready(&mut self, context: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner_client
.poll_ready(context)
.map_err(GetPeersError::Service)
}
fn call(&mut self, (uri, _): (Uri, GetPeers)) -> Self::Future {
let mut client = self.inner_client.clone();
let http_request = Request::builder()
.method(Method::GET)
.uri(uri)
.body(Body::empty())
.unwrap();
let fut = async move {
let response = client
.call(http_request)
.await
.map_err(Self::Error::Service)?;
match response.status() {
StatusCode::OK => (),
StatusCode::NOT_IMPLEMENTED => return Err(Self::Error::PeeringDisabled),
code => return Err(Self::Error::UnexpectedStatusCode(code.as_u16())),
}
let body = response.into_body();
let buf = aggregate(body).await.map_err(Self::Error::Body)?;
let peers = Peers::decode(buf).map_err(Self::Error::Decode)?;
Ok(peers)
};
Box::pin(fut)
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct GetRawAuthWrapper;
#[derive(Debug, Error)]
pub enum GetRawAuthWrapperError<E: fmt::Debug + fmt::Display> {
#[error("processing body failed: {0}")]
Body(HyperError),
#[error("connection failure: {0}")]
Service(E),
#[error("unexpected status code: {0}")]
UnexpectedStatusCode(u16),
#[error("missing token")]
MissingToken,
}
impl<S> Service<(Uri, GetRawAuthWrapper)> for KeyserverClient<S>
where
S: Service<Request<Body>, Response = Response<Body>>,
S: Send + Clone + 'static,
S::Future: Send,
S::Error: fmt::Debug + fmt::Display,
{
type Response = RawAuthWrapperPackage;
type Error = GetRawAuthWrapperError<S::Error>;
type Future = FutResponse<Self::Response, Self::Error>;
fn poll_ready(&mut self, context: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner_client
.poll_ready(context)
.map_err(GetRawAuthWrapperError::Service)
}
fn call(&mut self, (uri, _): (Uri, GetRawAuthWrapper)) -> Self::Future {
let mut client = self.inner_client.clone();
let http_request = Request::builder()
.method(Method::GET)
.uri(uri)
.body(Body::empty())
.unwrap(); let fut = async move {
let response = client
.call(http_request)
.await
.map_err(Self::Error::Service)?;
match response.status() {
StatusCode::OK => (),
code => return Err(Self::Error::UnexpectedStatusCode(code.as_u16())),
}
#[allow(clippy::borrow_interior_mutable_const)]
let token = response
.headers()
.into_iter()
.find(|(name, value)| {
*name == AUTHORIZATION && value.as_bytes()[..4] == b"POP "[..]
})
.ok_or(Self::Error::MissingToken)?
.0
.to_string();
let body = response.into_body();
let raw_auth_wrapper = to_bytes(body).await.map_err(Self::Error::Body)?;
Ok(RawAuthWrapperPackage {
token,
raw_auth_wrapper,
})
};
Box::pin(fut)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GetMetadata;
#[derive(Debug, Error)]
pub enum GetMetadataError<E: fmt::Debug + fmt::Display> {
#[error("metadata decoding failure: {0}")]
MetadataDecode(DecodeError),
#[error("authwrapper decoding failure: {0}")]
AuthWrapperDecode(DecodeError),
#[error("authwrapper parsing failure: {0}")]
AuthWrapperParse(ParseError),
#[error("authwrapper verification failure: {0}")]
AuthWrapperVerify(VerifyError),
#[error("processing body failed: {0}")]
Body(HyperError),
#[error("connection failure: {0}")]
Service(E),
#[error("unexpected status code: {0}")]
UnexpectedStatusCode(u16),
#[error("missing token")]
MissingToken,
}
impl<S> Service<(Uri, GetMetadata)> for KeyserverClient<S>
where
S: Service<Request<Body>, Response = Response<Body>>,
S: Send + Clone + 'static,
S::Future: Send,
S::Error: fmt::Debug + fmt::Display,
{
type Response = MetadataPackage;
type Error = GetMetadataError<S::Error>;
type Future = FutResponse<Self::Response, Self::Error>;
fn poll_ready(&mut self, context: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner_client
.poll_ready(context)
.map_err(GetMetadataError::Service)
}
fn call(&mut self, (uri, _): (Uri, GetMetadata)) -> Self::Future {
let mut client = self.inner_client.clone();
let http_request = Request::builder()
.method(Method::GET)
.uri(uri)
.body(Body::empty())
.unwrap(); let fut = async move {
let response = client
.call(http_request)
.await
.map_err(Self::Error::Service)?;
match response.status() {
StatusCode::OK => (),
code => return Err(Self::Error::UnexpectedStatusCode(code.as_u16())),
}
#[allow(clippy::borrow_interior_mutable_const)]
let token = response
.headers()
.into_iter()
.find(|(name, value)| {
*name == AUTHORIZATION && value.as_bytes()[..4] == b"POP "[..]
})
.ok_or(Self::Error::MissingToken)?
.0
.to_string();
let body = response.into_body();
let raw_auth_wrapper = to_bytes(body).await.map_err(Self::Error::Body)?;
let auth_wrapper = AuthWrapper::decode(raw_auth_wrapper.clone())
.map_err(Self::Error::AuthWrapperDecode)?;
let parsed_auth_wrapper = auth_wrapper
.parse()
.map_err(Self::Error::AuthWrapperParse)?;
parsed_auth_wrapper
.verify()
.map_err(Self::Error::AuthWrapperVerify)?;
let metadata = AddressMetadata::decode(&mut parsed_auth_wrapper.payload.as_slice())
.map_err(Self::Error::MetadataDecode)?;
Ok(MetadataPackage {
token,
public_key: parsed_auth_wrapper.public_key,
metadata,
raw_auth_wrapper,
})
};
Box::pin(fut)
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct PutMetadata {
pub token: String,
pub auth_wrapper: AuthWrapper,
}
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum PutMetadataError<E: fmt::Debug + fmt::Display> {
#[error("connection failure: {0}")]
Service(E),
#[error("unexpected status code: {0}")]
UnexpectedStatusCode(u16),
}
impl<S> Service<(Uri, PutMetadata)> for KeyserverClient<S>
where
S: Service<Request<Body>, Response = Response<Body>>,
S: Send + Clone + 'static,
S::Error: fmt::Debug + fmt::Display,
S::Future: Send,
{
type Response = ();
type Error = PutMetadataError<S::Error>;
type Future = FutResponse<Self::Response, Self::Error>;
fn poll_ready(&mut self, context: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner_client
.poll_ready(context)
.map_err(PutMetadataError::Service)
}
fn call(&mut self, (uri, request): (Uri, PutMetadata)) -> Self::Future {
let mut client = self.inner_client.clone();
let mut body = Vec::with_capacity(request.auth_wrapper.encoded_len());
request.auth_wrapper.encode(&mut body).unwrap();
let http_request = Request::builder()
.method(Method::PUT)
.uri(uri)
.header(AUTHORIZATION, request.token)
.body(Body::from(body))
.unwrap();
let fut = async move {
let response = client
.call(http_request)
.await
.map_err(Self::Error::Service)?;
match response.status() {
StatusCode::OK => (),
code => return Err(Self::Error::UnexpectedStatusCode(code.as_u16())),
}
Ok(())
};
Box::pin(fut)
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct PutRawAuthWrapper {
pub token: String,
pub raw_auth_wrapper: Vec<u8>,
}
impl<S> Service<(Uri, PutRawAuthWrapper)> for KeyserverClient<S>
where
S: Service<Request<Body>, Response = Response<Body>>,
S: Send + Clone + 'static,
S::Error: fmt::Debug + fmt::Display,
S::Future: Send,
{
type Response = ();
type Error = PutMetadataError<S::Error>;
type Future = FutResponse<Self::Response, Self::Error>;
fn poll_ready(&mut self, context: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner_client
.poll_ready(context)
.map_err(PutMetadataError::Service)
}
fn call(&mut self, (uri, request): (Uri, PutRawAuthWrapper)) -> Self::Future {
let mut client = self.inner_client.clone();
let body = request.raw_auth_wrapper;
let http_request = Request::builder()
.method(Method::PUT)
.uri(uri)
.header(AUTHORIZATION, request.token)
.body(Body::from(body))
.unwrap();
let fut = async move {
let response = client
.call(http_request)
.await
.map_err(Self::Error::Service)?;
match response.status() {
StatusCode::OK => (),
code => return Err(Self::Error::UnexpectedStatusCode(code.as_u16())),
}
Ok(())
};
Box::pin(fut)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SampleRequest<T> {
pub uris: Vec<Uri>,
pub request: T,
}
#[derive(Debug, Error)]
pub enum SampleError<E: fmt::Debug + fmt::Display> {
#[error("polling failure: {0}")]
Poll(E),
#[error("sampling failure: {0:?}")] Sample(Vec<(Uri, E)>),
}
impl<S, T> Service<SampleRequest<T>> for KeyserverClient<S>
where
T: Send + 'static + Clone + Sized,
S: Send + Clone + 'static,
Self: Service<(Uri, T)>,
<Self as Service<(Uri, T)>>::Response: Send + fmt::Debug,
<Self as Service<(Uri, T)>>::Error: fmt::Debug + fmt::Display + Send,
<Self as Service<(Uri, T)>>::Future: Send,
{
#[allow(clippy::type_complexity)]
type Response = Vec<(
Uri,
Result<<Self as Service<(Uri, T)>>::Response, <Self as Service<(Uri, T)>>::Error>,
)>;
type Error = SampleError<<Self as Service<(Uri, T)>>::Error>;
type Future = FutResponse<Self::Response, Self::Error>;
fn poll_ready(&mut self, context: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_ready(context).map_err(SampleError::Poll)
}
fn call(&mut self, SampleRequest { uris, request }: SampleRequest<T>) -> Self::Future {
let mut inner_client = self.clone();
let fut = async move {
let response_futs = uris.into_iter().map(move |uri| {
let response_fut = inner_client.call((uri.clone(), request.clone()));
let uri_fut = async move { uri };
join(uri_fut, response_fut)
});
let responses: Vec<(Uri, Result<_, _>)> = join_all(response_futs).await;
if responses.iter().all(|(_, res)| res.is_err()) {
let errors = responses
.into_iter()
.map(|(uri, result)| (uri, result.unwrap_err()))
.collect();
return Err(SampleError::Sample(errors));
}
Ok(responses)
};
Box::pin(fut)
}
}