1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
use super::{error::*, HttpStore};
use bytes::{buf::Buf, Bytes, BytesMut};
use futures::{
    future::{err, Either, FutureResult},
    Future,
};
use interledger_packet::Prepare;
use interledger_service::{AuthToken, IncomingRequest, IncomingService};
use log::error;
use std::{convert::TryFrom, net::SocketAddr};
use warp::{self, Filter, Rejection};

/// Max message size that is allowed to transfer from a request or a message.
pub const MAX_PACKET_SIZE: u64 = 40000;

/// A warp filter that parses incoming ILP-Over-HTTP requests, validates the authorization,
/// and passes the request to an IncomingService handler.
#[derive(Clone)]
pub struct HttpServer<I, S> {
    incoming: I,
    store: S,
}

impl<I, S> HttpServer<I, S>
where
    I: IncomingService<S::Account> + Clone + Send + Sync + 'static,
    S: HttpStore,
{
    pub fn new(incoming: I, store: S) -> Self {
        HttpServer { incoming, store }
    }

    pub fn as_filter(
        &self,
    ) -> impl warp::Filter<Extract = (warp::http::Response<Bytes>,), Error = warp::Rejection> + Clone
    {
        let incoming = self.incoming.clone();
        let store = self.store.clone();

        warp::post2()
            .and(warp::path("ilp"))
            .and(warp::path::end())
            .and(warp::header::<AuthToken>("authorization"))
            .and_then(move |auth: AuthToken| {
                store
                    .get_account_from_http_auth(auth.username(), auth.password())
                    .map_err(move |_| -> Rejection {
                        error!(
                            "Invalid authorization provided for user: {}",
                            auth.username()
                        );
                        ApiError::unauthorized().into()
                    })
            })
            .and(warp::body::content_length_limit(MAX_PACKET_SIZE))
            .and(warp::body::concat())
            .and_then(
                move |account: S::Account,
                      body: warp::body::FullBody|
                      -> Either<_, FutureResult<_, Rejection>> {
                    // TODO don't copy ILP packet
                    let buffer = BytesMut::from(body.bytes());
                    if let Ok(prepare) = Prepare::try_from(buffer) {
                        Either::A(
                            incoming
                                .clone()
                                .handle_request(IncomingRequest {
                                    from: account,
                                    prepare,
                                })
                                .then(|result| {
                                    let bytes: BytesMut = match result {
                                        Ok(fulfill) => fulfill.into(),
                                        Err(reject) => reject.into(),
                                    };
                                    Ok(warp::http::Response::builder()
                                        .header("Content-Type", "application/octet-stream")
                                        .status(200)
                                        .body(bytes.freeze())
                                        .unwrap())
                                }),
                        )
                    } else {
                        error!("Body was not a valid Prepare packet");
                        Either::B(err(ApiError::invalid_ilp_packet().into()))
                    }
                },
            )
    }

    pub fn bind(&self, addr: SocketAddr) -> impl Future<Item = (), Error = ()> + Send {
        warp::serve(self.as_filter()).bind(addr)
    }
}