1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
//! Remote storage adapter for the warp web framework.

use std::convert::Infallible;

use bytes::Bytes;
use prost::Message;
use warp::{
    body,
    http::HeaderValue,
    hyper::{
        header::{CONTENT_ENCODING, CONTENT_TYPE},
        StatusCode,
    },
    reject::{self, Reject},
    reply, Filter, Rejection, Reply,
};

use crate::{
    types::{Error, ReadRequest, ReadResponse, RemoteStorageRef, WriteRequest},
    util,
};

/// Warp handler for a remote write request.
///
/// Hands `ctx` and `req` to `storage.write` and awaits the outcome.
///
/// # Errors
///
/// If the storage reports an error, that error is wrapped with
/// `reject::custom` and returned as a `Rejection`. On success the storage's
/// return value is discarded and an empty reply is produced.
pub async fn write<C: Send + Sync, Err: Reject + Send>(
    storage: RemoteStorageRef<C, Err>,
    ctx: C,
    req: WriteRequest,
) -> Result<impl Reply, Rejection> {
    let outcome = storage.write(ctx, req).await;

    match outcome {
        // The write result carries nothing the client needs; reply with an empty body.
        Ok(_) => Ok(reply::reply()),
        Err(cause) => Err(reject::custom(cause)),
    }
}

/// Warp handler for a remote read request.
///
/// Hands `ctx` and `req` to `storage.read` and returns whatever the storage
/// produced as the reply.
///
/// # Errors
///
/// If the storage reports an error, that error is wrapped with
/// `reject::custom` and returned as a `Rejection`.
pub async fn read<C: Send + Sync, Err: Reject + Send>(
    storage: RemoteStorageRef<C, Err>,
    ctx: C,
    req: ReadRequest,
) -> Result<impl Reply, Rejection> {
    let response = storage.read(ctx, req).await.map_err(reject::custom)?;
    Ok(response)
}

/// Builds a `Filter` that matches every request and extracts a clone of the
/// given `RemoteStorageRef`, so it can be fed into a handler via `and_then`.
///
/// The filter never rejects (its error type is `Infallible`).
pub fn with_remote_storage<C, Err>(
    storage: RemoteStorageRef<C, Err>,
) -> impl Filter<Extract = (RemoteStorageRef<C, Err>,), Error = Infallible> + Clone {
    // The closure owns `storage` and hands out a fresh clone per request.
    let provide_storage = move || storage.clone();

    warp::any().map(provide_storage)
}

// Marker impl that makes our `Error` `Reject`able, so values of this type can
// be passed to `reject::custom` and turned into a warp `Rejection` (as done
// for `Error::ProtoDecode` in this module).
impl Reject for Error {}

/// Returns a `Filter` that matches any request and extracts a `Future` of a
/// protobuf-decode body
///
/// # Warning
///
/// This does not have a default size limit, it would be wise to use one to
/// prevent a overly large request from using too much memory.

// https://github.com/ParkMyCar/warp-protobuf/blob/master/src/lib.rs#L102
pub fn protobuf_body<T: Message + Send + Default>(
) -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
    async fn from_reader<T: Message + Send + Default>(bytes: Bytes) -> Result<T, Rejection> {
        util::decode_snappy(&bytes)
            .map_err(reject::custom)
            .and_then(|decoded_buf| {
                T::decode(decoded_buf.as_slice())
                    .map_err(|err| reject::custom(Error::ProtoDecode(err)))
            })
    }

    body::bytes().and_then(from_reader)
}

/// Lets a `ReadResponse` be returned directly from a warp handler.
impl warp::Reply for ReadResponse {
    /// Serializes the message with `encode_to_vec`, passes the bytes through
    /// `util::encode_snappy` and wraps the result in a response carrying
    /// `Content-Type: application/x-protobuf` and `Content-Encoding: snappy`.
    ///
    /// If `util::encode_snappy` fails, the reply is instead the error's text
    /// with status `500 Internal Server Error`.
    fn into_response(self) -> reply::Response {
        let encoded = self.encode_to_vec();

        match util::encode_snappy(encoded.as_slice()) {
            Ok(compressed) => {
                let mut response: reply::Response = warp::http::Response::new(compressed.into());

                let header_map = response.headers_mut();
                header_map.insert(
                    CONTENT_TYPE,
                    HeaderValue::from_static("application/x-protobuf"),
                );
                header_map.insert(CONTENT_ENCODING, HeaderValue::from_static("snappy"));

                response
            }
            Err(err) => {
                // Compression failed: report the error text with a 500 status.
                let message = err.to_string().into_response();
                reply::with_status(message, StatusCode::INTERNAL_SERVER_ERROR).into_response()
            }
        }
    }
}