1use super::{
16 get_super_ts, new_internal_error, HttpHeader, HTTP_HEADER_CONTENT_HTML,
17 HTTP_HEADER_CONTENT_JSON, HTTP_HEADER_CONTENT_TEXT, HTTP_HEADER_NO_CACHE,
18 HTTP_HEADER_NO_STORE, HTTP_HEADER_TRANSFER_CHUNKED, LOG_CATEGORY,
19};
20use bytes::Bytes;
21use http::header;
22use http::StatusCode;
23use pingora::http::ResponseHeader;
24use pingora::proxy::Session;
25use serde::Serialize;
26use std::pin::Pin;
27use tokio::io::AsyncReadExt;
28use tracing::error;
29
30fn new_cache_control_header(
36 max_age: Option<u32>,
37 cache_private: Option<bool>,
38) -> HttpHeader {
39 if let Some(max_age) = max_age {
40 if max_age == 0 {
41 return HTTP_HEADER_NO_CACHE.clone();
42 }
43 let category = if cache_private.unwrap_or_default() {
44 "private"
45 } else {
46 "public"
47 };
48 if let Ok(value) = header::HeaderValue::from_str(&format!(
49 "{category}, max-age={max_age}"
50 )) {
51 return (header::CACHE_CONTROL, value);
52 }
53 }
54 HTTP_HEADER_NO_CACHE.clone()
55}
56
57#[derive(Default, Clone, Debug)]
59pub struct HttpResponse {
60 pub status: StatusCode,
62 pub body: Bytes,
64 pub max_age: Option<u32>,
66 pub created_at: Option<u32>,
68 pub cache_private: Option<bool>,
70 pub headers: Option<Vec<HttpHeader>>,
72}
73
74impl HttpResponse {
75 pub fn no_content() -> Self {
77 Self {
78 status: StatusCode::NO_CONTENT,
79 headers: Some(vec![HTTP_HEADER_NO_STORE.clone()]),
80 ..Default::default()
81 }
82 }
83 pub fn bad_request(body: Bytes) -> Self {
85 Self {
86 status: StatusCode::BAD_REQUEST,
87 headers: Some(vec![HTTP_HEADER_NO_STORE.clone()]),
88 body,
89 ..Default::default()
90 }
91 }
92 pub fn not_found(body: Bytes) -> Self {
94 Self {
95 status: StatusCode::NOT_FOUND,
96 headers: Some(vec![HTTP_HEADER_NO_STORE.clone()]),
97 body,
98 ..Default::default()
99 }
100 }
101 pub fn unknown_error(body: Bytes) -> Self {
103 Self {
104 status: StatusCode::INTERNAL_SERVER_ERROR,
105 headers: Some(vec![HTTP_HEADER_NO_STORE.clone()]),
106 body,
107 ..Default::default()
108 }
109 }
110 pub fn html(body: Bytes) -> Self {
112 Self {
113 status: StatusCode::OK,
114 headers: Some(vec![
115 HTTP_HEADER_CONTENT_HTML.clone(),
116 HTTP_HEADER_NO_CACHE.clone(),
117 ]),
118 body,
119 ..Default::default()
120 }
121 }
122 pub fn redirect(location: &str) -> pingora::Result<Self> {
124 let value = http::HeaderValue::from_str(location).map_err(|e| {
125 error!(error = e.to_string(), "to header value fail");
126 new_internal_error(500, e.to_string())
127 })?;
128 Ok(Self {
129 status: StatusCode::TEMPORARY_REDIRECT,
130 headers: Some(vec![
131 (http::header::LOCATION.clone(), value),
132 HTTP_HEADER_NO_CACHE.clone(),
133 ]),
134 ..Default::default()
135 })
136 }
137
138 pub fn text(body: Bytes) -> Self {
140 Self {
141 status: StatusCode::OK,
142 headers: Some(vec![
143 HTTP_HEADER_CONTENT_TEXT.clone(),
144 HTTP_HEADER_NO_CACHE.clone(),
145 ]),
146 body,
147 ..Default::default()
148 }
149 }
150 pub fn try_from_json_status<T>(
152 value: &T,
153 status: StatusCode,
154 ) -> pingora::Result<Self>
155 where
156 T: ?Sized + Serialize,
157 {
158 let mut resp = Self::try_from_json(value)?;
159 resp.status = status;
160 Ok(resp)
161 }
162
163 pub fn try_from_json<T>(value: &T) -> pingora::Result<Self>
165 where
166 T: ?Sized + Serialize,
167 {
168 let buf = serde_json::to_vec(value).map_err(|e| {
169 error!(
170 category = LOG_CATEGORY,
171 error = e.to_string(),
172 "to json fail"
173 );
174 new_internal_error(400, e.to_string())
175 })?;
176 Ok(Self {
177 status: StatusCode::OK,
178 body: buf.into(),
179 headers: Some(vec![HTTP_HEADER_CONTENT_JSON.clone()]),
180 ..Default::default()
181 })
182 }
183 pub fn new_response_header(&self) -> pingora::Result<ResponseHeader> {
185 let fix_size = 3;
186 let size = self
187 .headers
188 .as_ref()
189 .map_or_else(|| fix_size, |headers| headers.len() + fix_size);
190 let mut resp = ResponseHeader::build(self.status, Some(size))?;
191 resp.insert_header(
192 header::CONTENT_LENGTH,
193 self.body.len().to_string(),
194 )?;
195
196 let cache_control =
198 new_cache_control_header(self.max_age, self.cache_private);
199 resp.insert_header(cache_control.0, cache_control.1)?;
200
201 if let Some(created_at) = self.created_at {
202 let secs = get_super_ts() - created_at;
203 if let Ok(value) = header::HeaderValue::from_str(&secs.to_string())
204 {
205 resp.insert_header(header::AGE, value)?;
206 }
207 }
208
209 if let Some(headers) = &self.headers {
210 for (name, value) in headers {
211 resp.insert_header(name.to_owned(), value)?;
212 }
213 }
214 Ok(resp)
215 }
216 pub async fn send(self, session: &mut Session) -> pingora::Result<usize> {
218 let header = self.new_response_header()?;
219 let size = self.body.len();
220 session
221 .write_response_header(Box::new(header), false)
222 .await?;
223 session.write_response_body(Some(self.body), true).await?;
224 session.finish_body().await?;
225 Ok(size)
226 }
227}
228
229pub struct HttpChunkResponse<'r, R> {
231 pub reader: Pin<&'r mut R>,
233 pub chunk_size: usize,
235 pub max_age: Option<u32>,
237 pub cache_private: Option<bool>,
238 pub headers: Option<Vec<HttpHeader>>,
239}
240
241const DEFAULT_BUF_SIZE: usize = 8 * 1024;
243
244impl<'r, R> HttpChunkResponse<'r, R>
245where
246 R: tokio::io::AsyncRead + std::marker::Unpin,
247{
248 pub fn new(r: &'r mut R) -> Self {
250 Self {
251 reader: Pin::new(r),
252 chunk_size: DEFAULT_BUF_SIZE,
253 max_age: None,
254 headers: None,
255 cache_private: None,
256 }
257 }
258 pub fn get_response_header(&self) -> pingora::Result<ResponseHeader> {
260 let mut resp = ResponseHeader::build(StatusCode::OK, Some(4))?;
261 if let Some(headers) = &self.headers {
262 for (name, value) in headers {
263 resp.insert_header(name.to_owned(), value)?;
264 }
265 }
266
267 let chunked = HTTP_HEADER_TRANSFER_CHUNKED.clone();
268 resp.insert_header(chunked.0, chunked.1)?;
269
270 let cache_control =
271 new_cache_control_header(self.max_age, self.cache_private);
272 resp.insert_header(cache_control.0, cache_control.1)?;
273 Ok(resp)
274 }
275 pub async fn send(
277 mut self,
278 session: &mut Session,
279 ) -> pingora::Result<usize> {
280 let header = self.get_response_header()?;
281 session
282 .write_response_header(Box::new(header), false)
283 .await?;
284
285 let mut sent = 0;
286 let chunk_size = self.chunk_size.max(512);
287 let mut buffer = vec![0; chunk_size];
288 loop {
289 let size = self.reader.read(&mut buffer).await.map_err(|e| {
290 error!(error = e.to_string(), "read data fail");
291 new_internal_error(400, e.to_string())
292 })?;
293 let end = size < chunk_size;
294 session
295 .write_response_body(
296 Some(Bytes::copy_from_slice(&buffer[..size])),
297 end,
298 )
299 .await?;
300 sent += size;
301 if end {
302 break;
303 }
304 }
305 session.finish_body().await?;
306
307 Ok(sent)
308 }
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use crate::convert_headers;
    use bytes::Bytes;
    use http::StatusCode;
    use pretty_assertions::assert_eq;
    use serde::Serialize;
    use std::io::Write;
    use tempfile::NamedTempFile;
    use tokio::fs;

    /// Checks the `Cache-Control` value for each max-age / privacy
    /// combination.
    #[test]
    fn test_new_cache_control_header() {
        let cases: [(Option<u32>, Option<bool>, &str); 4] = [
            (
                Some(3600),
                Some(true),
                r###"("cache-control", "private, max-age=3600")"###,
            ),
            (
                Some(3600),
                None,
                r###"("cache-control", "public, max-age=3600")"###,
            ),
            (
                Some(0),
                Some(false),
                r###"("cache-control", "private, no-cache")"###,
            ),
            (None, None, r###"("cache-control", "private, no-cache")"###),
        ];
        for (max_age, cache_private, expected) in cases {
            let actual = format!(
                "{:?}",
                new_cache_control_header(max_age, cache_private)
            );
            assert_eq!(expected, actual);
        }
    }

    /// Checks the debug form of every constructor and the generated
    /// response header.
    #[test]
    fn test_http_response() {
        // Plain constructors, in the order they are declared.
        let constructed = [
            (
                r###"HttpResponse { status: 204, body: b"", max_age: None, created_at: None, cache_private: None, headers: Some([("cache-control", "private, no-store")]) }"###,
                HttpResponse::no_content(),
            ),
            (
                r###"HttpResponse { status: 404, body: b"Not Found", max_age: None, created_at: None, cache_private: None, headers: Some([("cache-control", "private, no-store")]) }"###,
                HttpResponse::not_found("Not Found".into()),
            ),
            (
                r###"HttpResponse { status: 500, body: b"Unknown Error", max_age: None, created_at: None, cache_private: None, headers: Some([("cache-control", "private, no-store")]) }"###,
                HttpResponse::unknown_error("Unknown Error".into()),
            ),
            (
                r###"HttpResponse { status: 400, body: b"Bad Request", max_age: None, created_at: None, cache_private: None, headers: Some([("cache-control", "private, no-store")]) }"###,
                HttpResponse::bad_request("Bad Request".into()),
            ),
            (
                r###"HttpResponse { status: 200, body: b"<p>Pingap</p>", max_age: None, created_at: None, cache_private: None, headers: Some([("content-type", "text/html; charset=utf-8"), ("cache-control", "private, no-cache")]) }"###,
                HttpResponse::html("<p>Pingap</p>".into()),
            ),
            (
                r###"HttpResponse { status: 307, body: b"", max_age: None, created_at: None, cache_private: None, headers: Some([("location", "http://example.com/"), ("cache-control", "private, no-cache")]) }"###,
                HttpResponse::redirect("http://example.com/").unwrap(),
            ),
            (
                r###"HttpResponse { status: 200, body: b"Hello World!", max_age: None, created_at: None, cache_private: None, headers: Some([("content-type", "text/plain; charset=utf-8"), ("cache-control", "private, no-cache")]) }"###,
                HttpResponse::text("Hello World!".into()),
            ),
        ];
        for (expected, resp) in constructed {
            assert_eq!(expected, format!("{resp:?}"));
        }

        // JSON constructors.
        #[derive(Serialize)]
        struct Data {
            message: String,
        }
        let payload = Data {
            message: "Hello World!".to_string(),
        };
        let with_status = HttpResponse::try_from_json_status(
            &payload,
            StatusCode::BAD_REQUEST,
        )
        .unwrap();
        assert_eq!(
            r###"HttpResponse { status: 400, body: b"{\"message\":\"Hello World!\"}", max_age: None, created_at: None, cache_private: None, headers: Some([("content-type", "application/json; charset=utf-8")]) }"###,
            format!("{with_status:?}")
        );
        let plain = HttpResponse::try_from_json(&payload).unwrap();
        assert_eq!(
            r###"HttpResponse { status: 200, body: b"{\"message\":\"Hello World!\"}", max_age: None, created_at: None, cache_private: None, headers: Some([("content-type", "application/json; charset=utf-8")]) }"###,
            format!("{plain:?}")
        );

        // Generated response header for a cacheable response.
        let custom_headers = convert_headers(&[
            "Contont-Type: application/json".to_string(),
            "Content-Encoding: gzip".to_string(),
        ])
        .unwrap();
        let cacheable = HttpResponse {
            status: StatusCode::OK,
            body: Bytes::from("Hello world!"),
            max_age: Some(3600),
            created_at: Some(get_super_ts() - 10),
            cache_private: Some(true),
            headers: Some(custom_headers),
        };

        let mut resp_header = cacheable.new_response_header().unwrap();
        // The Age value depends on the clock, so only check it is present
        // and drop it before comparing the rest.
        assert!(!resp_header.headers.get("Age").unwrap().is_empty());
        resp_header.remove_header("Age").unwrap();

        assert_eq!(
            r###"ResponseHeader { base: Parts { status: 200, version: HTTP/1.1, headers: {"content-length": "12", "cache-control": "private, max-age=3600", "content-encoding": "gzip", "contont-type": "application/json"} }, header_name_map: Some({"content-length": CaseHeaderName(b"Content-Length"), "cache-control": CaseHeaderName(b"Cache-Control"), "content-encoding": CaseHeaderName(b"Content-Encoding"), "contont-type": CaseHeaderName(b"contont-type")}), reason_phrase: None }"###,
            format!("{resp_header:?}")
        );
    }

    /// Checks the header generated for a chunked response backed by a file.
    #[tokio::test]
    async fn test_http_chunk_response() {
        let content = include_bytes!("../../error.html");
        let mut tmp = NamedTempFile::new().unwrap();
        tmp.write_all(content).unwrap();
        let mut reader =
            fs::OpenOptions::new().read(true).open(tmp).await.unwrap();

        let mut chunked = HttpChunkResponse::new(&mut reader);
        chunked.max_age = Some(3600);
        chunked.cache_private = Some(false);
        chunked.headers = Some(
            convert_headers(&["Contont-Type: text/html".to_string()]).unwrap(),
        );

        let resp_header = chunked.get_response_header().unwrap();
        assert_eq!(
            r###"ResponseHeader { base: Parts { status: 200, version: HTTP/1.1, headers: {"contont-type": "text/html", "transfer-encoding": "chunked", "cache-control": "public, max-age=3600"} }, header_name_map: Some({"contont-type": CaseHeaderName(b"contont-type"), "transfer-encoding": CaseHeaderName(b"Transfer-Encoding"), "cache-control": CaseHeaderName(b"Cache-Control")}), reason_phrase: None }"###,
            format!("{resp_header:?}")
        );
    }
}