prom_remote_api/web/
actix.rs

1use actix_web::{
2    http::header::HeaderValue,
3    http::{
4        header::{CONTENT_ENCODING, CONTENT_TYPE},
5        StatusCode,
6    },
7    web, HttpRequest, HttpResponse, Responder, ResponseError,
8};
9use futures::StreamExt;
10use prost::Message;
11
12use crate::types::{Error, ReadRequest, ReadResponse, RemoteStorageRef, WriteRequest};
13
14impl ResponseError for Error {}
15
16/// Actix-web handler for remote read request
17pub async fn read<C: Send + Sync, Err: ResponseError + Send + 'static>(
18    storage: web::Data<RemoteStorageRef<C, Err>>,
19    ctx: C,
20    body: web::Payload,
21) -> Result<ReadResponse, actix_web::Error> {
22    let req = decode_request::<ReadRequest>(body).await?;
23    storage
24        .read(ctx, req)
25        .await
26        .map_err(|e| actix_web::Error::from(e))
27}
28
29/// Actix-web handler for remote write request
30pub async fn write<C: Send + Sync, Err: ResponseError + Send + 'static>(
31    storage: web::Data<RemoteStorageRef<C, Err>>,
32    ctx: C,
33    body: web::Payload,
34) -> Result<WriteResponse, actix_web::Error> {
35    let req = decode_request::<WriteRequest>(body).await?;
36    storage
37        .write(ctx, req)
38        .await
39        .map_err(|e| actix_web::Error::from(e))
40        .map(|v| v.into())
41}
42
/// Marker value returned by the remote write handler on success.
///
/// It carries no data; it exists so that a successful write can be given its
/// own trait implementations (such as a conversion from `()`).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct WriteResponse;

/// Converts the unit value into a [`WriteResponse`].
impl From<()> for WriteResponse {
    fn from(_v: ()) -> Self {
        WriteResponse
    }
}
50
51async fn decode_request<T: Message + Default>(mut body: web::Payload) -> Result<T, Error> {
52    let mut bytes = web::BytesMut::with_capacity(4096);
53    while let Some(item) = body.next().await {
54        bytes.extend_from_slice(&item.unwrap());
55    }
56
57    crate::util::decode_snappy(&bytes)
58        .and_then(|b| T::decode(b.as_slice()).map_err(Error::ProtoDecode))
59}
60
61impl Responder for WriteResponse {
62    type Body = ();
63
64    fn respond_to(self, _req: &HttpRequest) -> HttpResponse<Self::Body> {
65        HttpResponse::with_body(StatusCode::OK, ())
66    }
67}
68
69impl Responder for ReadResponse {
70    type Body = Vec<u8>;
71
72    fn respond_to(self, _req: &HttpRequest) -> HttpResponse<Self::Body> {
73        let bytes = match crate::util::encode_snappy(self.encode_to_vec().as_slice()) {
74            Ok(v) => v,
75            Err(e) => {
76                return HttpResponse::with_body(
77                    StatusCode::INTERNAL_SERVER_ERROR,
78                    e.to_string().into_bytes(),
79                );
80            }
81        };
82
83        let mut resp = HttpResponse::with_body(StatusCode::OK, bytes);
84        let headers = resp.headers_mut();
85        headers.insert(
86            CONTENT_TYPE,
87            HeaderValue::from_static("application/x-protobuf"),
88        );
89        headers.insert(CONTENT_ENCODING, HeaderValue::from_static("snappy"));
90
91        resp
92    }
93}