1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
use super::{error::*, HttpStore};
use bytes::{buf::Buf, Bytes, BytesMut};
use futures::{
future::{err, Either, FutureResult},
Future,
};
use interledger_packet::Prepare;
use interledger_service::{AuthToken, IncomingRequest, IncomingService};
use log::error;
use std::{convert::TryFrom, net::SocketAddr};
use warp::{self, Filter, Rejection};
pub const MAX_PACKET_SIZE: u64 = 40000;
#[derive(Clone)]
pub struct HttpServer<I, S> {
incoming: I,
store: S,
}
impl<I, S> HttpServer<I, S>
where
I: IncomingService<S::Account> + Clone + Send + Sync + 'static,
S: HttpStore,
{
pub fn new(incoming: I, store: S) -> Self {
HttpServer { incoming, store }
}
pub fn as_filter(
&self,
) -> impl warp::Filter<Extract = (warp::http::Response<Bytes>,), Error = warp::Rejection> + Clone
{
let incoming = self.incoming.clone();
let store = self.store.clone();
warp::post2()
.and(warp::path("ilp"))
.and(warp::path::end())
.and(warp::header::<AuthToken>("authorization"))
.and_then(move |auth: AuthToken| {
store
.get_account_from_http_auth(auth.username(), auth.password())
.map_err(move |_| -> Rejection {
error!(
"Invalid authorization provided for user: {}",
auth.username()
);
ApiError::unauthorized().into()
})
})
.and(warp::body::content_length_limit(MAX_PACKET_SIZE))
.and(warp::body::concat())
.and_then(
move |account: S::Account,
body: warp::body::FullBody|
-> Either<_, FutureResult<_, Rejection>> {
let buffer = BytesMut::from(body.bytes());
if let Ok(prepare) = Prepare::try_from(buffer) {
Either::A(
incoming
.clone()
.handle_request(IncomingRequest {
from: account,
prepare,
})
.then(|result| {
let bytes: BytesMut = match result {
Ok(fulfill) => fulfill.into(),
Err(reject) => reject.into(),
};
Ok(warp::http::Response::builder()
.header("Content-Type", "application/octet-stream")
.status(200)
.body(bytes.freeze())
.unwrap())
}),
)
} else {
error!("Body was not a valid Prepare packet");
Either::B(err(ApiError::invalid_ilp_packet().into()))
}
},
)
}
pub fn bind(&self, addr: SocketAddr) -> impl Future<Item = (), Error = ()> + Send {
warp::serve(self.as_filter()).bind(addr)
}
}