use std::convert::Infallible;
use bytes::Bytes;
use prost::Message;
use warp::{
body,
http::HeaderValue,
hyper::{
header::{CONTENT_ENCODING, CONTENT_TYPE},
StatusCode,
},
reject::{self, Reject},
reply, Filter, Rejection, Reply,
};
use crate::{
types::{Error, ReadRequest, ReadResponse, RemoteStorageRef, WriteRequest},
util,
};
pub async fn write<C: Send + Sync, Err: Reject + Send>(
storage: RemoteStorageRef<C, Err>,
ctx: C,
req: WriteRequest,
) -> Result<impl Reply, Rejection> {
storage
.write(ctx, req)
.await
.map(|_| reply::reply())
.map_err(reject::custom)
}
pub async fn read<C: Send + Sync, Err: Reject + Send>(
storage: RemoteStorageRef<C, Err>,
ctx: C,
req: ReadRequest,
) -> Result<impl Reply, Rejection> {
storage.read(ctx, req).await.map_err(reject::custom)
}
pub fn with_remote_storage<C, Err>(
storage: RemoteStorageRef<C, Err>,
) -> impl Filter<Extract = (RemoteStorageRef<C, Err>,), Error = Infallible> + Clone {
warp::any().map(move || storage.clone())
}
impl Reject for Error {}
pub fn protobuf_body<T: Message + Send + Default>(
) -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
async fn from_reader<T: Message + Send + Default>(bytes: Bytes) -> Result<T, Rejection> {
util::decode_snappy(&bytes)
.map_err(reject::custom)
.and_then(|decoded_buf| {
T::decode(decoded_buf.as_slice())
.map_err(|err| reject::custom(Error::ProtoDecode(err)))
})
}
body::bytes().and_then(from_reader)
}
impl warp::Reply for ReadResponse {
fn into_response(self) -> reply::Response {
let bytes = match util::encode_snappy(self.encode_to_vec().as_slice()) {
Ok(v) => v,
Err(e) => {
return reply::with_status(
e.to_string().into_response(),
StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response()
}
};
let mut ret = warp::http::Response::new(bytes.into());
let headers = ret.headers_mut();
headers.insert(
CONTENT_TYPE,
HeaderValue::from_static("application/x-protobuf"),
);
headers.insert(CONTENT_ENCODING, HeaderValue::from_static("snappy"));
ret
}
}