multilink/http/
util.rs

1use std::collections::VecDeque;
2
3use async_stream::stream;
4use futures::StreamExt;
5use hyper::{
6    body::to_bytes, header::CONTENT_TYPE, Body, Method, Request as HttpRequest,
7    Response as HttpResponse, StatusCode, Uri,
8};
9use serde::{de::DeserializeOwned, Serialize};
10use serde_json::Value;
11
12use crate::{
13    error::ProtocolErrorType,
14    http::{
15        generic_error, HttpNotificationPayload, ModalHttpResponse, ResponseHttpConvert,
16        SSE_DATA_PREFIX,
17    },
18    NotificationStream, ProtocolError, ServiceError, ServiceResponse,
19};
20
21/// Deserializes the body of [`HttpResponse<Body>`] into `T`.
22/// Returns a "bad request" error if JSON deserialization fails,
23/// and returns an "internal" error if raw data retrieval from the request fails.
24/// Can be useful for implementing [`ResponseHttpConvert::from_http_response`].
25pub async fn parse_response<T: DeserializeOwned>(
26    response: HttpResponse<Body>,
27) -> Result<T, ProtocolError> {
28    let bytes = to_bytes(response)
29        .await
30        .map_err(|e| ProtocolError::new(ProtocolErrorType::Internal, Box::new(e)))?;
31    parse_response_payload(bytes.as_ref())
32}
33
34fn parse_response_payload<T: DeserializeOwned>(response: &[u8]) -> Result<T, ProtocolError> {
35    serde_json::from_slice(response)
36        .map_err(|e| ProtocolError::new(ProtocolErrorType::BadRequest, Box::new(e)))
37}
38
39/// Serializes `T` into [`HttpRequest<Body>`]. Returns an "internal" error if
40/// JSON serialization fails. Can be useful for
41/// implementing [`RequestHttpConvert::to_http_request`](crate::http::RequestHttpConvert::to_http_request).
42pub fn serialize_to_http_request<T: Serialize>(
43    base_url: &Uri,
44    path: &str,
45    method: Method,
46    request: &T,
47) -> Result<HttpRequest<Body>, ProtocolError> {
48    let bytes = serde_json::to_vec(request)
49        .map_err(|e| ProtocolError::new(ProtocolErrorType::Internal, Box::new(e)))?;
50    let url = Uri::builder()
51        .scheme(
52            base_url
53                .scheme()
54                .expect("base url should contain scheme")
55                .clone(),
56        )
57        .authority(
58            base_url
59                .authority()
60                .expect("base url should contain authority")
61                .clone(),
62        )
63        .path_and_query(path)
64        .build()
65        .expect("should be able to build url");
66    Ok(HttpRequest::builder()
67        .method(method)
68        .uri(url)
69        .header(CONTENT_TYPE, "application/json")
70        .body(bytes.into())
71        .expect("should be able to create http request"))
72}
73
74/// Converts an [`HttpResponse<Body>`] to a [`NotificationStream<Response>`] so
75/// server-side events can be consumed by the HTTP client. Can be useful for implementing
76/// [`ResponseHttpConvert::from_http_response`].
77pub fn notification_sse_stream<Request, Response>(
78    original_request: Request,
79    http_response: HttpResponse<Body>,
80) -> NotificationStream<Response>
81where
82    Request: Clone + Send + Sync + 'static,
83    Response: ResponseHttpConvert<Request, Response> + Send + Sync + 'static,
84{
85    let mut body = http_response.into_body();
86    stream! {
87        let mut buffer = VecDeque::new();
88        while let Some(bytes_result) = body.next().await {
89            match bytes_result {
90                Err(e) => {
91                    let boxed_e: ServiceError = Box::new(e);
92                    yield Err(boxed_e.into());
93                    return;
94                },
95                Ok(bytes) => {
96                    buffer.extend(bytes);
97                }
98            }
99            while let Some(linebreak_pos) = buffer.iter().position(|b| b == &b'\n') {
100                let line_bytes = buffer.drain(0..(linebreak_pos + 1)).collect::<Vec<_>>();
101                if let Ok(line) = std::str::from_utf8(&line_bytes) {
102                    if !line.starts_with(SSE_DATA_PREFIX) {
103                        continue;
104                    }
105                    if let Ok(payload) = serde_json::from_str::<HttpNotificationPayload>(&line[SSE_DATA_PREFIX.len()..]) {
106                        let result: Result<Value, ProtocolError> = payload.into();
107                        match result {
108                            Err(e) => yield Err(e),
109                            Ok(value) => {
110                                yield Response::from_http_response(ModalHttpResponse::Event(value), &original_request).await
111                                    .and_then(|response| response.ok_or_else(|| generic_error(ProtocolErrorType::NotFound)))
112                                    .and_then(|response| match response {
113                                        ServiceResponse::Single(response) => Ok(response),
114                                        _ => Err(generic_error(ProtocolErrorType::NotFound))
115                                    });
116                            }
117                        }
118                    }
119                }
120            }
121        }
122    }.boxed()
123}
124
125/// Deserializes the body of [`HttpRequest<Body>`] into `T`.
126/// Returns a "bad request" error if JSON deserialization fails,
127/// and returns an "internal" error if raw data retrieval from the request fails.
128/// Can be useful for implementing [`RequestHttpConvert::from_http_request`](crate::http::RequestHttpConvert::from_http_request).
129pub async fn parse_request<T: DeserializeOwned>(
130    request: HttpRequest<Body>,
131) -> Result<T, ProtocolError> {
132    let bytes = to_bytes(request)
133        .await
134        .map_err(|e| ProtocolError::new(ProtocolErrorType::Internal, Box::new(e)))?;
135    serde_json::from_slice(bytes.as_ref())
136        .map_err(|e| ProtocolError::new(ProtocolErrorType::BadRequest, Box::new(e)))
137}
138
139/// Compares the request method with an expected method and returns
140/// [`ProtocolErrorType::HttpMethodNotAllowed`] if there is a mismatch.
141/// Can be useful for implementing [`RequestHttpConvert::from_http_request`](crate::http::RequestHttpConvert::from_http_request).
142pub fn validate_method(
143    request: &HttpRequest<Body>,
144    expected_method: Method,
145) -> Result<(), ProtocolError> {
146    match request.method() == &expected_method {
147        true => Ok(()),
148        false => Err(generic_error(ProtocolErrorType::HttpMethodNotAllowed).into()),
149    }
150}
151
152fn serialize_response<T: Serialize>(response: &T) -> Result<Vec<u8>, ProtocolError> {
153    serde_json::to_vec(response)
154        .map_err(|e| ProtocolError::new(ProtocolErrorType::Internal, Box::new(e)))
155}
156
157/// Serializes `T` into [`HttpResponse<Body>`]. Returns an "internal" error if
158/// JSON serialization fails. Can be useful for
159/// implementing [`ResponseHttpConvert::to_http_response`].
160pub fn serialize_to_http_response<T: Serialize>(
161    response: &T,
162    status: StatusCode,
163) -> Result<HttpResponse<Body>, ProtocolError> {
164    let bytes = serialize_response(response)?;
165    Ok(HttpResponse::builder()
166        .header(CONTENT_TYPE, "application/json")
167        .status(status)
168        .body(bytes.into())
169        .expect("should be able to create http response"))
170}
171
172/// Converts a [`NotificationStream<Response>`] to an [`HttpResponse<Body>`] so
173/// server-side events can be produced by the HTTP server. Can be useful for implementing
174/// [`ResponseHttpConvert::to_http_response`].
175pub fn notification_sse_response<Request, Response>(
176    notification_stream: NotificationStream<Response>,
177) -> HttpResponse<Body>
178where
179    Request: Clone,
180    Response: ResponseHttpConvert<Request, Response> + 'static,
181{
182    let payload_stream = notification_stream.map(|result| {
183        let payload = HttpNotificationPayload::from(result.and_then(|response| {
184            Response::to_http_response(ServiceResponse::Single(response)).map(|opt| {
185                opt.and_then(|response| match response {
186                    ModalHttpResponse::Event(value) => Some(value),
187                    _ => None,
188                })
189            })
190        }));
191        let payload_str = serde_json::to_string(&payload)?;
192        Ok::<String, serde_json::Error>(format!("data: {}\n\n", payload_str))
193    });
194    HttpResponse::new(Body::wrap_stream(payload_stream))
195}