use super::limit_stream::LimitStream;
use super::HttpStore;
use bytes::BytesMut;
use futures::{
future::{err, Either},
Future, Stream,
};
use hyper::{
body::Body, header::AUTHORIZATION, service::Service as HttpService, Error, Request, Response,
};
use interledger_packet::{Fulfill, Prepare, Reject};
use interledger_service::*;
use log::{debug, error, trace};
use std::convert::TryFrom;
use std::str::FromStr;
/// Limit applied to the body of incoming HTTP requests: `handle_http_request` passes it
/// as the `max_message_size` argument of `parse_prepare_from_request`.
///
/// NOTE(review): presumably a size in bytes — confirm against `LimitStream`.
pub const MAX_MESSAGE_SIZE: usize = 40000;
/// Server-side handler for ILP packets sent over HTTP.
///
/// Each request is authenticated against `store`, its body is parsed as a Prepare
/// packet, and the packet is then handed to the `next` service.
#[derive(Clone)]
pub struct HttpServerService<S, T> {
/// Service that receives every authenticated, successfully parsed Prepare packet.
next: S,
/// Store used to look up the account matching a request's `Authorization` header.
store: T,
}
impl<S, T> HttpServerService<S, T>
where
    S: IncomingService<T::Account> + Clone + 'static,
    T: HttpStore,
{
    /// Builds a service that authenticates requests against `store` and forwards the
    /// resulting Prepare packets to `next`.
    pub fn new(next: S, store: T) -> Self {
        HttpServerService { next, store }
    }

    /// Resolves the account that sent `request` from its `Authorization` header.
    ///
    /// The future fails with an empty `401` response when the header is missing, is not
    /// valid header text, cannot be parsed as an `AuthToken`, or when the store lookup
    /// fails.
    fn check_authorization(
        &self,
        request: &Request<Body>,
    ) -> impl Future<Item = T::Account, Error = Response<Body>> {
        // Every step can fail independently, so chain them with `and_then` rather than
        // mapping to a nested `Option` and flattening afterwards.
        let authorization = request
            .headers()
            .get(AUTHORIZATION)
            .and_then(|auth| auth.to_str().ok())
            .and_then(|auth| AuthToken::from_str(auth).ok());

        match authorization {
            Some(authorization) => Either::A(
                self.store
                    .get_account_from_http_auth(
                        &authorization.username(),
                        &authorization.password(),
                    )
                    .map_err(move |_err| {
                        // NOTE(review): this prints the whole token through `Debug`;
                        // confirm that `AuthToken`'s `Debug` output does not expose the
                        // password.
                        error!("Authorization not found in the DB: {:?}", authorization);
                        Response::builder().status(401).body(Body::empty()).unwrap()
                    }),
            ),
            None => Either::B(err(Response::builder()
                .status(401)
                .body(Body::empty())
                .unwrap())),
        }
    }

    /// Handles one incoming HTTP request end to end.
    ///
    /// The request is authenticated, its body is parsed as a Prepare packet (limited by
    /// `MAX_MESSAGE_SIZE`), the packet is passed to the next service, and the resulting
    /// Fulfill or Reject is turned into an HTTP response.
    ///
    /// Failures in authentication or parsing are themselves HTTP responses, so they are
    /// returned as the successful item; nothing in this method produces the `Error`
    /// variant.
    pub fn handle_http_request(
        &mut self,
        request: Request<Body>,
    ) -> impl Future<Item = Response<Body>, Error = Error> {
        let mut next = self.next.clone();
        self.check_authorization(&request)
            .and_then(|from_account| {
                parse_prepare_from_request(request, Some(MAX_MESSAGE_SIZE)).and_then(
                    move |prepare| {
                        trace!(
                            "Got incoming ILP over HTTP packet from account: {}",
                            from_account.id()
                        );
                        next.handle_request(IncomingRequest {
                            from: from_account,
                            prepare,
                        })
                        .then(ilp_response_to_http_response)
                    },
                )
            })
            // An error at this point is already a complete HTTP response; send it as-is.
            .or_else(|error_response| Ok(error_response))
    }
}
/// Lets [`HttpServerService`] be used directly as a hyper service.
impl<S, T> HttpService for HttpServerService<S, T>
where
    S: IncomingService<T::Account> + Clone + Send + 'static,
    T: HttpStore + 'static,
{
    type ReqBody = Body;
    type ResBody = Body;
    type Error = Error;
    type Future =
        Box<dyn Future<Item = Response<Self::ResBody>, Error = Self::Error> + Send + 'static>;

    /// Delegates to `handle_http_request` and boxes the resulting future so that it
    /// matches the associated `Future` type.
    fn call(&mut self, request: Request<Self::ReqBody>) -> Self::Future {
        let response_future = self.handle_http_request(request);
        Box::new(response_future)
    }
}
fn parse_prepare_from_request(
request: Request<Body>,
max_message_size: Option<usize>,
) -> impl Future<Item = Prepare, Error = Response<Body>> + 'static {
LimitStream::new(max_message_size, request.into_body())
.concat2()
.map_err(|err| {
eprintln!("Concatenating stream failed: {:?}", err);
Response::builder().status(500).body(Body::empty()).unwrap()
})
.and_then(|body| {
let bytes = body.into_bytes().try_mut().unwrap_or_else(|bytes| {
debug!("Copying bytes from incoming HTTP request into Prepare packet");
BytesMut::from(bytes)
});
Prepare::try_from(bytes).map_err(|err| {
eprintln!("Parsing prepare packet failed: {:?}", err);
Response::builder().status(400).body(Body::empty()).unwrap()
})
})
}
fn ilp_response_to_http_response(
result: Result<Fulfill, Reject>,
) -> Result<Response<Body>, Response<Body>> {
let bytes: BytesMut = match result {
Ok(fulfill) => fulfill.into(),
Err(reject) => reject.into(),
};
Ok(Response::builder()
.status(200)
.header("content-type", "application/octet-stream")
.body(bytes.freeze().into())
.unwrap())
}
/// Tests for `parse_prepare_from_request` with and without a body size limit.
#[cfg(test)]
mod test_limit_stream {
    use super::*;
    use interledger_packet::{Address, PrepareBuilder};
    use std::str::FromStr;
    use std::time::{Duration, SystemTime};

    /// A body whose size equals the limit exactly must still be accepted.
    #[test]
    fn test_parse_prepare_from_request_less() {
        let prepare_data = PrepareBuilder {
            amount: 1,
            destination: Address::from_str("test.prepare").unwrap(),
            execution_condition: &[0; 32],
            expires_at: SystemTime::now() + Duration::from_secs(30),
            data: &[0; MAX_MESSAGE_SIZE],
        };
        let prepare = prepare_data.clone().build();
        // Use the serialized packet length as the limit.
        let body_size = BytesMut::from(prepare.clone()).len();
        let parsed_prepare = make_prepare_and_parse(prepare_data, Some(body_size)).unwrap();
        assert_same_prepare(&prepare, &parsed_prepare);
    }

    /// A body larger than the limit must be rejected.
    #[test]
    fn test_parse_prepare_from_request_more() {
        let prepare_data = PrepareBuilder {
            amount: 1,
            destination: Address::from_str("test.prepare").unwrap(),
            execution_condition: &[0; 32],
            expires_at: SystemTime::now() + Duration::from_secs(30),
            data: &[0; 0],
        };
        let prepare = make_prepare_and_parse(prepare_data, Some(1));
        assert!(prepare.is_err());
    }

    /// Without a limit, a large body must be accepted.
    #[test]
    fn test_parse_prepare_from_request_no_limit() {
        let prepare_data = PrepareBuilder {
            amount: 1,
            destination: Address::from_str("test.prepare").unwrap(),
            execution_condition: &[0; 32],
            expires_at: SystemTime::now() + Duration::from_secs(30),
            data: &[0; MAX_MESSAGE_SIZE],
        };
        let prepare = prepare_data.clone().build();
        let parsed_prepare = make_prepare_and_parse(prepare_data, None).unwrap();
        assert_same_prepare(&prepare, &parsed_prepare);
    }

    /// Asserts that `parsed` carries the same field values as `expected`.
    fn assert_same_prepare(expected: &Prepare, parsed: &Prepare) {
        assert_eq!(expected.amount(), parsed.amount());
        assert_eq!(expected.destination(), parsed.destination());
        assert_eq!(
            expected.execution_condition(),
            parsed.execution_condition()
        );
        // Expiry is compared at millisecond granularity.
        assert_eq!(
            get_millis_from_unix_epoch(expected.expires_at()),
            get_millis_from_unix_epoch(parsed.expires_at())
        );
        assert_eq!(expected.data(), parsed.data());
    }

    /// Serializes `prepare_data` into an HTTP request body and runs it through
    /// `parse_prepare_from_request` with the given limit, blocking on the result.
    fn make_prepare_and_parse(
        prepare_data: PrepareBuilder,
        max_message_size: Option<usize>,
    ) -> Result<Prepare, Response<Body>> {
        let prepare = prepare_data.build();
        let prepare_bytes = BytesMut::from(prepare).freeze();
        let body: Body = hyper::Body::from(prepare_bytes);
        let request = hyper::Request::builder()
            .header("content-type", "application/octet-stream")
            .body(body)
            .unwrap();
        parse_prepare_from_request(request, max_message_size).wait()
    }

    /// Converts `system_time` to whole milliseconds since the Unix epoch.
    fn get_millis_from_unix_epoch(system_time: SystemTime) -> u128 {
        system_time
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_millis()
    }
}