1use std::collections::VecDeque;
2
3use async_stream::stream;
4use futures::StreamExt;
5use hyper::{
6 body::to_bytes, header::CONTENT_TYPE, Body, Method, Request as HttpRequest,
7 Response as HttpResponse, StatusCode, Uri,
8};
9use serde::{de::DeserializeOwned, Serialize};
10use serde_json::Value;
11
12use crate::{
13 error::ProtocolErrorType,
14 http::{
15 generic_error, HttpNotificationPayload, ModalHttpResponse, ResponseHttpConvert,
16 SSE_DATA_PREFIX,
17 },
18 NotificationStream, ProtocolError, ServiceError, ServiceResponse,
19};
20
21pub async fn parse_response<T: DeserializeOwned>(
26 response: HttpResponse<Body>,
27) -> Result<T, ProtocolError> {
28 let bytes = to_bytes(response)
29 .await
30 .map_err(|e| ProtocolError::new(ProtocolErrorType::Internal, Box::new(e)))?;
31 parse_response_payload(bytes.as_ref())
32}
33
34fn parse_response_payload<T: DeserializeOwned>(response: &[u8]) -> Result<T, ProtocolError> {
35 serde_json::from_slice(response)
36 .map_err(|e| ProtocolError::new(ProtocolErrorType::BadRequest, Box::new(e)))
37}
38
39pub fn serialize_to_http_request<T: Serialize>(
43 base_url: &Uri,
44 path: &str,
45 method: Method,
46 request: &T,
47) -> Result<HttpRequest<Body>, ProtocolError> {
48 let bytes = serde_json::to_vec(request)
49 .map_err(|e| ProtocolError::new(ProtocolErrorType::Internal, Box::new(e)))?;
50 let url = Uri::builder()
51 .scheme(
52 base_url
53 .scheme()
54 .expect("base url should contain scheme")
55 .clone(),
56 )
57 .authority(
58 base_url
59 .authority()
60 .expect("base url should contain authority")
61 .clone(),
62 )
63 .path_and_query(path)
64 .build()
65 .expect("should be able to build url");
66 Ok(HttpRequest::builder()
67 .method(method)
68 .uri(url)
69 .header(CONTENT_TYPE, "application/json")
70 .body(bytes.into())
71 .expect("should be able to create http request"))
72}
73
74pub fn notification_sse_stream<Request, Response>(
78 original_request: Request,
79 http_response: HttpResponse<Body>,
80) -> NotificationStream<Response>
81where
82 Request: Clone + Send + Sync + 'static,
83 Response: ResponseHttpConvert<Request, Response> + Send + Sync + 'static,
84{
85 let mut body = http_response.into_body();
86 stream! {
87 let mut buffer = VecDeque::new();
88 while let Some(bytes_result) = body.next().await {
89 match bytes_result {
90 Err(e) => {
91 let boxed_e: ServiceError = Box::new(e);
92 yield Err(boxed_e.into());
93 return;
94 },
95 Ok(bytes) => {
96 buffer.extend(bytes);
97 }
98 }
99 while let Some(linebreak_pos) = buffer.iter().position(|b| b == &b'\n') {
100 let line_bytes = buffer.drain(0..(linebreak_pos + 1)).collect::<Vec<_>>();
101 if let Ok(line) = std::str::from_utf8(&line_bytes) {
102 if !line.starts_with(SSE_DATA_PREFIX) {
103 continue;
104 }
105 if let Ok(payload) = serde_json::from_str::<HttpNotificationPayload>(&line[SSE_DATA_PREFIX.len()..]) {
106 let result: Result<Value, ProtocolError> = payload.into();
107 match result {
108 Err(e) => yield Err(e),
109 Ok(value) => {
110 yield Response::from_http_response(ModalHttpResponse::Event(value), &original_request).await
111 .and_then(|response| response.ok_or_else(|| generic_error(ProtocolErrorType::NotFound)))
112 .and_then(|response| match response {
113 ServiceResponse::Single(response) => Ok(response),
114 _ => Err(generic_error(ProtocolErrorType::NotFound))
115 });
116 }
117 }
118 }
119 }
120 }
121 }
122 }.boxed()
123}
124
125pub async fn parse_request<T: DeserializeOwned>(
130 request: HttpRequest<Body>,
131) -> Result<T, ProtocolError> {
132 let bytes = to_bytes(request)
133 .await
134 .map_err(|e| ProtocolError::new(ProtocolErrorType::Internal, Box::new(e)))?;
135 serde_json::from_slice(bytes.as_ref())
136 .map_err(|e| ProtocolError::new(ProtocolErrorType::BadRequest, Box::new(e)))
137}
138
139pub fn validate_method(
143 request: &HttpRequest<Body>,
144 expected_method: Method,
145) -> Result<(), ProtocolError> {
146 match request.method() == &expected_method {
147 true => Ok(()),
148 false => Err(generic_error(ProtocolErrorType::HttpMethodNotAllowed).into()),
149 }
150}
151
152fn serialize_response<T: Serialize>(response: &T) -> Result<Vec<u8>, ProtocolError> {
153 serde_json::to_vec(response)
154 .map_err(|e| ProtocolError::new(ProtocolErrorType::Internal, Box::new(e)))
155}
156
157pub fn serialize_to_http_response<T: Serialize>(
161 response: &T,
162 status: StatusCode,
163) -> Result<HttpResponse<Body>, ProtocolError> {
164 let bytes = serialize_response(response)?;
165 Ok(HttpResponse::builder()
166 .header(CONTENT_TYPE, "application/json")
167 .status(status)
168 .body(bytes.into())
169 .expect("should be able to create http response"))
170}
171
172pub fn notification_sse_response<Request, Response>(
176 notification_stream: NotificationStream<Response>,
177) -> HttpResponse<Body>
178where
179 Request: Clone,
180 Response: ResponseHttpConvert<Request, Response> + 'static,
181{
182 let payload_stream = notification_stream.map(|result| {
183 let payload = HttpNotificationPayload::from(result.and_then(|response| {
184 Response::to_http_response(ServiceResponse::Single(response)).map(|opt| {
185 opt.and_then(|response| match response {
186 ModalHttpResponse::Event(value) => Some(value),
187 _ => None,
188 })
189 })
190 }));
191 let payload_str = serde_json::to_string(&payload)?;
192 Ok::<String, serde_json::Error>(format!("data: {}\n\n", payload_str))
193 });
194 HttpResponse::new(Body::wrap_stream(payload_stream))
195}