prom_remote_api/web/
actix.rs1use actix_web::{
2 http::header::HeaderValue,
3 http::{
4 header::{CONTENT_ENCODING, CONTENT_TYPE},
5 StatusCode,
6 },
7 web, HttpRequest, HttpResponse, Responder, ResponseError,
8};
9use futures::StreamExt;
10use prost::Message;
11
12use crate::types::{Error, ReadRequest, ReadResponse, RemoteStorageRef, WriteRequest};
13
14impl ResponseError for Error {}
15
16pub async fn read<C: Send + Sync, Err: ResponseError + Send + 'static>(
18 storage: web::Data<RemoteStorageRef<C, Err>>,
19 ctx: C,
20 body: web::Payload,
21) -> Result<ReadResponse, actix_web::Error> {
22 let req = decode_request::<ReadRequest>(body).await?;
23 storage
24 .read(ctx, req)
25 .await
26 .map_err(|e| actix_web::Error::from(e))
27}
28
29pub async fn write<C: Send + Sync, Err: ResponseError + Send + 'static>(
31 storage: web::Data<RemoteStorageRef<C, Err>>,
32 ctx: C,
33 body: web::Payload,
34) -> Result<WriteResponse, actix_web::Error> {
35 let req = decode_request::<WriteRequest>(body).await?;
36 storage
37 .write(ctx, req)
38 .await
39 .map_err(|e| actix_web::Error::from(e))
40 .map(|v| v.into())
41}
42
43pub struct WriteResponse;
44
45impl From<()> for WriteResponse {
46 fn from(_v: ()) -> Self {
47 WriteResponse
48 }
49}
50
51async fn decode_request<T: Message + Default>(mut body: web::Payload) -> Result<T, Error> {
52 let mut bytes = web::BytesMut::with_capacity(4096);
53 while let Some(item) = body.next().await {
54 bytes.extend_from_slice(&item.unwrap());
55 }
56
57 crate::util::decode_snappy(&bytes)
58 .and_then(|b| T::decode(b.as_slice()).map_err(Error::ProtoDecode))
59}
60
61impl Responder for WriteResponse {
62 type Body = ();
63
64 fn respond_to(self, _req: &HttpRequest) -> HttpResponse<Self::Body> {
65 HttpResponse::with_body(StatusCode::OK, ())
66 }
67}
68
69impl Responder for ReadResponse {
70 type Body = Vec<u8>;
71
72 fn respond_to(self, _req: &HttpRequest) -> HttpResponse<Self::Body> {
73 let bytes = match crate::util::encode_snappy(self.encode_to_vec().as_slice()) {
74 Ok(v) => v,
75 Err(e) => {
76 return HttpResponse::with_body(
77 StatusCode::INTERNAL_SERVER_ERROR,
78 e.to_string().into_bytes(),
79 );
80 }
81 };
82
83 let mut resp = HttpResponse::with_body(StatusCode::OK, bytes);
84 let headers = resp.headers_mut();
85 headers.insert(
86 CONTENT_TYPE,
87 HeaderValue::from_static("application/x-protobuf"),
88 );
89 headers.insert(CONTENT_ENCODING, HeaderValue::from_static("snappy"));
90
91 resp
92 }
93}