1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
use super::HttpStore;
use bytes::{buf::Buf, Bytes, BytesMut};
use futures::{
    future::{err, Either},
    Future,
};
use interledger_packet::Prepare;
use interledger_service::{AuthToken, IncomingRequest, IncomingService};
use log::error;
use std::{
    convert::TryFrom,
    error::Error as StdError,
    fmt::{self, Display},
    net::SocketAddr,
};
use warp::{self, Filter};

/// Upper bound, in bytes, on the size of an incoming HTTP request body.
///
/// The server's filter passes this value to warp's content-length limit, so
/// requests declaring a larger body are rejected before the body is read.
pub const MAX_PACKET_SIZE: u64 = 40_000;

/// An ILP-Over-HTTP server.
///
/// The struct only holds the two collaborators; `as_filter` turns them into a warp filter
/// that parses incoming ILP-Over-HTTP requests, validates the authorization,
/// and passes the request to an IncomingService handler.
#[derive(Clone)]
pub struct HttpServer<I, S> {
    // Handler that receives each parsed Prepare packet together with the authenticated account.
    incoming: I,
    // Store queried for the account matching the credentials in the `authorization` header.
    store: S,
}

/// Reasons the HTTP layer rejects a request before it reaches the incoming
/// service. Instances are wrapped in a custom warp rejection.
#[derive(Debug, Clone, Copy)]
enum ApiError {
    /// The request body could not be parsed as an ILP Prepare packet.
    InvalidPacket,
    /// The store lookup for the supplied credentials failed.
    Unauthorized,
}

// No underlying cause is exposed; the default `source()` is sufficient.
impl StdError for ApiError {}

impl Display for ApiError {
    /// Writes the fixed, human-readable message associated with the variant.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match *self {
            ApiError::InvalidPacket => "Body was not a valid ILP Prepare packet",
            ApiError::Unauthorized => "Unauthorized",
        };
        formatter.write_str(message)
    }
}

impl<I, S> HttpServer<I, S>
where
    I: IncomingService<S::Account> + Clone + Send + Sync + 'static,
    S: HttpStore,
{
    pub fn new(incoming: I, store: S) -> Self {
        HttpServer { incoming, store }
    }

    pub fn as_filter(
        &self,
    ) -> impl warp::Filter<Extract = (warp::http::Response<Bytes>,), Error = warp::Rejection> + Clone
    {
        let incoming = self.incoming.clone();
        let store = self.store.clone();

        warp::post2()
            .and(warp::header::<AuthToken>("authorization"))
            .and_then(move |auth: AuthToken| {
                store
                    .get_account_from_http_auth(auth.username(), auth.password())
                    .map_err(move |_| {
                        error!(
                            "Invalid authorization provided for user: {}",
                            auth.username()
                        );
                        warp::reject::custom(ApiError::Unauthorized)
                    })
            })
            .and(warp::body::content_length_limit(MAX_PACKET_SIZE))
            .and(warp::body::concat())
            .and_then(move |account: S::Account, body: warp::body::FullBody| {
                // TODO don't copy ILP packet
                let buffer = BytesMut::from(body.bytes());
                if let Ok(prepare) = Prepare::try_from(buffer) {
                    Either::A(
                        incoming
                            .clone()
                            .handle_request(IncomingRequest {
                                from: account,
                                prepare,
                            })
                            .then(|result| {
                                let bytes: BytesMut = match result {
                                    Ok(fulfill) => fulfill.into(),
                                    Err(reject) => reject.into(),
                                };
                                Ok(warp::http::Response::builder()
                                    .header("Content-Type", "application/octet-stream")
                                    .status(200)
                                    .body(bytes.freeze())
                                    .unwrap())
                            }),
                    )
                } else {
                    error!("Body was not a valid Prepare packet");
                    Either::B(err(warp::reject::custom(ApiError::InvalidPacket)))
                }
            })
    }

    pub fn bind(&self, addr: SocketAddr) -> impl Future<Item = (), Error = ()> + Send {
        warp::serve(self.as_filter()).bind(addr)
    }
}