1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
use std::convert::Infallible;
use prost::Message;
use warp::{
body,
http::HeaderValue,
hyper::{
header::{CONTENT_ENCODING, CONTENT_TYPE},
StatusCode,
},
reject::{self, Reject},
reply, Buf, Filter, Rejection, Reply,
};
use crate::{
types::{Error, ReadRequest, ReadResponse, RemoteStorageRef, WriteRequest},
util,
};
/// Warp handler for a remote-write request.
///
/// Forwards `req` together with the caller-supplied context `ctx` to
/// `storage.write`. On success the value returned by the storage is discarded
/// and an empty reply is sent back.
///
/// # Errors
///
/// Any error returned by the storage is wrapped with [`reject::custom`] and
/// surfaced as a [`Rejection`].
pub async fn write<C: Send + Sync + Clone, Err: Reject + Send>(
    storage: RemoteStorageRef<C, Err>,
    ctx: C,
    req: WriteRequest,
) -> Result<impl Reply, Rejection> {
    match storage.write(ctx, req).await {
        Ok(_) => Ok(reply::reply()),
        Err(cause) => Err(reject::custom(cause)),
    }
}
/// Warp handler for a remote-read request.
///
/// Forwards `req` together with the caller-supplied context `ctx` to
/// `storage.read` and returns whatever the storage produced as the reply.
///
/// # Errors
///
/// Any error returned by the storage is wrapped with [`reject::custom`] and
/// surfaced as a [`Rejection`].
pub async fn read<C: Send + Sync + Clone, Err: Reject + Send>(
    storage: RemoteStorageRef<C, Err>,
    ctx: C,
    req: ReadRequest,
) -> Result<impl Reply, Rejection> {
    let response = storage.read(ctx, req).await.map_err(reject::custom)?;
    Ok(response)
}
/// Builds a filter that injects the remote storage handle into a route.
///
/// The filter matches every request, never rejects, and extracts a fresh
/// clone of `storage` each time it runs.
pub fn with_remote_storage<C, Err>(
    storage: RemoteStorageRef<C, Err>,
) -> impl Filter<Extract = (RemoteStorageRef<C, Err>,), Error = Infallible> + Clone {
    // The closure owns the handle and hands out one clone per request.
    let provide_storage = move || storage.clone();
    warp::any().map(provide_storage)
}
// Marker impl: lets the crate's `Error` be passed to `reject::custom` and
// carried inside a warp `Rejection`.
impl Reject for Error {}
pub fn protobuf_body<T: Message + Send + Default>(
) -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
async fn from_reader<T: Message + Send + Default>(buf: impl Buf) -> Result<T, Rejection> {
util::decode_snappy(buf.chunk())
.map_err(reject::custom)
.and_then(|decoded_buf| {
T::decode(decoded_buf.as_slice())
.map_err(|err| reject::custom(Error::ProtoDecode(err)))
})
}
body::aggregate().and_then(from_reader)
}
/// Serializes a `ReadResponse` as a snappy-compressed protobuf HTTP response.
impl warp::Reply for ReadResponse {
    /// Encodes `self` to protobuf bytes, compresses them with
    /// `util::encode_snappy`, and returns them with
    /// `Content-Type: application/x-protobuf` and `Content-Encoding: snappy`.
    ///
    /// If compression fails, the error's display text is returned as the body
    /// of a `500 Internal Server Error` response instead.
    fn into_response(self) -> reply::Response {
        let encoded = self.encode_to_vec();
        match util::encode_snappy(encoded.as_slice()) {
            Ok(compressed) => {
                let mut response: reply::Response = warp::http::Response::new(compressed.into());
                response.headers_mut().insert(
                    CONTENT_TYPE,
                    HeaderValue::from_static("application/x-protobuf"),
                );
                response
                    .headers_mut()
                    .insert(CONTENT_ENCODING, HeaderValue::from_static("snappy"));
                response
            }
            Err(err) => reply::with_status(
                err.to_string().into_response(),
                StatusCode::INTERNAL_SERVER_ERROR,
            )
            .into_response(),
        }
    }
}