prom_remote_api/web/
warp.rs1use std::convert::Infallible;
4
5use bytes::Bytes;
6use prost::Message;
7use warp::{
8 body,
9 http::HeaderValue,
10 hyper::{
11 header::{CONTENT_ENCODING, CONTENT_TYPE},
12 StatusCode,
13 },
14 reject::{self, Reject},
15 reply, Filter, Rejection, Reply,
16};
17
18use crate::{
19 types::{Error, ReadRequest, ReadResponse, RemoteStorageRef, WriteRequest},
20 util,
21};
22
23pub async fn write<C: Send + Sync, Err: Reject + Send>(
25 storage: RemoteStorageRef<C, Err>,
26 ctx: C,
27 req: WriteRequest,
28) -> Result<impl Reply, Rejection> {
29 storage
30 .write(ctx, req)
31 .await
32 .map(|_| reply::reply())
33 .map_err(reject::custom)
34}
35
36pub async fn read<C: Send + Sync, Err: Reject + Send>(
38 storage: RemoteStorageRef<C, Err>,
39 ctx: C,
40 req: ReadRequest,
41) -> Result<impl Reply, Rejection> {
42 storage.read(ctx, req).await.map_err(reject::custom)
43}
44
45pub fn with_remote_storage<C, Err>(
48 storage: RemoteStorageRef<C, Err>,
49) -> impl Filter<Extract = (RemoteStorageRef<C, Err>,), Error = Infallible> + Clone {
50 warp::any().map(move || storage.clone())
51}
52
53impl Reject for Error {}
55
56pub fn protobuf_body<T: Message + Send + Default>(
66) -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
67 async fn from_reader<T: Message + Send + Default>(bytes: Bytes) -> Result<T, Rejection> {
68 util::decode_snappy(&bytes)
69 .map_err(reject::custom)
70 .and_then(|decoded_buf| {
71 T::decode(decoded_buf.as_slice())
72 .map_err(|err| reject::custom(Error::ProtoDecode(err)))
73 })
74 }
75
76 body::bytes().and_then(from_reader)
77}
78
79impl warp::Reply for ReadResponse {
80 fn into_response(self) -> reply::Response {
81 let bytes = match util::encode_snappy(self.encode_to_vec().as_slice()) {
82 Ok(v) => v,
83 Err(e) => {
84 return reply::with_status(
85 e.to_string().into_response(),
86 StatusCode::INTERNAL_SERVER_ERROR,
87 )
88 .into_response()
89 }
90 };
91
92 let mut ret = warp::http::Response::new(bytes.into());
93 let headers = ret.headers_mut();
94 headers.insert(
95 CONTENT_TYPE,
96 HeaderValue::from_static("application/x-protobuf"),
97 );
98 headers.insert(CONTENT_ENCODING, HeaderValue::from_static("snappy"));
99
100 ret
101 }
102}