prom_remote_api/web/
warp.rs

//! Remote storage adapter for the warp web framework.
2
3use std::convert::Infallible;
4
5use bytes::Bytes;
6use prost::Message;
7use warp::{
8    body,
9    http::HeaderValue,
10    hyper::{
11        header::{CONTENT_ENCODING, CONTENT_TYPE},
12        StatusCode,
13    },
14    reject::{self, Reject},
15    reply, Filter, Rejection, Reply,
16};
17
18use crate::{
19    types::{Error, ReadRequest, ReadResponse, RemoteStorageRef, WriteRequest},
20    util,
21};
22
23/// Warp handler for remote write request
24pub async fn write<C: Send + Sync, Err: Reject + Send>(
25    storage: RemoteStorageRef<C, Err>,
26    ctx: C,
27    req: WriteRequest,
28) -> Result<impl Reply, Rejection> {
29    storage
30        .write(ctx, req)
31        .await
32        .map(|_| reply::reply())
33        .map_err(reject::custom)
34}
35
36/// Warp handler for remote read request
37pub async fn read<C: Send + Sync, Err: Reject + Send>(
38    storage: RemoteStorageRef<C, Err>,
39    ctx: C,
40    req: ReadRequest,
41) -> Result<impl Reply, Rejection> {
42    storage.read(ctx, req).await.map_err(reject::custom)
43}
44
45/// Create a `Filter` that matches any requests and return a `RemoteStorageRef`,
46/// which can be used in `and_then`.
47pub fn with_remote_storage<C, Err>(
48    storage: RemoteStorageRef<C, Err>,
49) -> impl Filter<Extract = (RemoteStorageRef<C, Err>,), Error = Infallible> + Clone {
50    warp::any().map(move || storage.clone())
51}
52
53// Make our `Error` `Reject`able
54impl Reject for Error {}
55
56/// Returns a `Filter` that matches any request and extracts a `Future` of a
57/// protobuf-decode body
58///
59/// # Warning
60///
61/// This does not have a default size limit, it would be wise to use one to
62/// prevent a overly large request from using too much memory.
63
64// https://github.com/ParkMyCar/warp-protobuf/blob/master/src/lib.rs#L102
65pub fn protobuf_body<T: Message + Send + Default>(
66) -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
67    async fn from_reader<T: Message + Send + Default>(bytes: Bytes) -> Result<T, Rejection> {
68        util::decode_snappy(&bytes)
69            .map_err(reject::custom)
70            .and_then(|decoded_buf| {
71                T::decode(decoded_buf.as_slice())
72                    .map_err(|err| reject::custom(Error::ProtoDecode(err)))
73            })
74    }
75
76    body::bytes().and_then(from_reader)
77}
78
79impl warp::Reply for ReadResponse {
80    fn into_response(self) -> reply::Response {
81        let bytes = match util::encode_snappy(self.encode_to_vec().as_slice()) {
82            Ok(v) => v,
83            Err(e) => {
84                return reply::with_status(
85                    e.to_string().into_response(),
86                    StatusCode::INTERNAL_SERVER_ERROR,
87                )
88                .into_response()
89            }
90        };
91
92        let mut ret = warp::http::Response::new(bytes.into());
93        let headers = ret.headers_mut();
94        headers.insert(
95            CONTENT_TYPE,
96            HeaderValue::from_static("application/x-protobuf"),
97        );
98        headers.insert(CONTENT_ENCODING, HeaderValue::from_static("snappy"));
99
100        ret
101    }
102}