pingap_core/
http_response.rs

1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::{
16    get_super_ts, new_internal_error, HttpHeader, HTTP_HEADER_CONTENT_HTML,
17    HTTP_HEADER_CONTENT_JSON, HTTP_HEADER_CONTENT_TEXT, HTTP_HEADER_NO_CACHE,
18    HTTP_HEADER_NO_STORE, HTTP_HEADER_TRANSFER_CHUNKED, LOG_CATEGORY,
19};
20use bytes::Bytes;
21use http::header;
22use http::StatusCode;
23use pingora::http::ResponseHeader;
24use pingora::proxy::Session;
25use serde::Serialize;
26use std::pin::Pin;
27use tokio::io::AsyncReadExt;
28use tracing::error;
29
30/// Helper function to generate cache control headers
31/// Returns a cache control header based on max age and privacy settings.
32/// - If max_age is 0: returns "private, no-cache"
33/// - If max_age is set: returns "private/public, max-age=X"
34/// - Otherwise: returns "private, no-cache"
35fn new_cache_control_header(
36    max_age: Option<u32>,
37    cache_private: Option<bool>,
38) -> HttpHeader {
39    if let Some(max_age) = max_age {
40        if max_age == 0 {
41            return HTTP_HEADER_NO_CACHE.clone();
42        }
43        let category = if cache_private.unwrap_or_default() {
44            "private"
45        } else {
46            "public"
47        };
48        if let Ok(value) = header::HeaderValue::from_str(&format!(
49            "{category}, max-age={max_age}"
50        )) {
51            return (header::CACHE_CONTROL, value);
52        }
53    }
54    HTTP_HEADER_NO_CACHE.clone()
55}
56
57/// Main HTTP response struct for handling complete responses
58#[derive(Default, Clone, Debug)]
59pub struct HttpResponse {
60    /// HTTP status code (200 OK, 404 Not Found, etc)
61    pub status: StatusCode,
62    /// Response body as bytes
63    pub body: Bytes,
64    /// Cache control max-age value in seconds
65    pub max_age: Option<u32>,
66    /// Timestamp when response was created
67    pub created_at: Option<u32>,
68    /// Whether cache should be private (true) or public (false)
69    pub cache_private: Option<bool>,
70    /// Additional HTTP headers
71    pub headers: Option<Vec<HttpHeader>>,
72}
73
74impl HttpResponse {
75    /// Creates a new HTTP response with 204 No Content status and no-store cache control
76    pub fn no_content() -> Self {
77        Self {
78            status: StatusCode::NO_CONTENT,
79            headers: Some(vec![HTTP_HEADER_NO_STORE.clone()]),
80            ..Default::default()
81        }
82    }
83    /// Creates a new HTTP response with 400 Bad Request status and the given body
84    pub fn bad_request(body: Bytes) -> Self {
85        Self {
86            status: StatusCode::BAD_REQUEST,
87            headers: Some(vec![HTTP_HEADER_NO_STORE.clone()]),
88            body,
89            ..Default::default()
90        }
91    }
92    /// Creates a new HTTP response with 404 Not Found status and the given body
93    pub fn not_found(body: Bytes) -> Self {
94        Self {
95            status: StatusCode::NOT_FOUND,
96            headers: Some(vec![HTTP_HEADER_NO_STORE.clone()]),
97            body,
98            ..Default::default()
99        }
100    }
101    /// Creates a new HTTP response with 500 Internal Server Error status and the given body
102    pub fn unknown_error(body: Bytes) -> Self {
103        Self {
104            status: StatusCode::INTERNAL_SERVER_ERROR,
105            headers: Some(vec![HTTP_HEADER_NO_STORE.clone()]),
106            body,
107            ..Default::default()
108        }
109    }
110    /// Creates a new HTTP response with 200 OK status, HTML content type, and the given body
111    pub fn html(body: Bytes) -> Self {
112        Self {
113            status: StatusCode::OK,
114            headers: Some(vec![
115                HTTP_HEADER_CONTENT_HTML.clone(),
116                HTTP_HEADER_NO_CACHE.clone(),
117            ]),
118            body,
119            ..Default::default()
120        }
121    }
122    /// Creates a new HTTP response with 302 Temporary Redirect status to the given location
123    pub fn redirect(location: &str) -> pingora::Result<Self> {
124        let value = http::HeaderValue::from_str(location).map_err(|e| {
125            error!(error = e.to_string(), "to header value fail");
126            new_internal_error(500, e.to_string())
127        })?;
128        Ok(Self {
129            status: StatusCode::TEMPORARY_REDIRECT,
130            headers: Some(vec![
131                (http::header::LOCATION.clone(), value),
132                HTTP_HEADER_NO_CACHE.clone(),
133            ]),
134            ..Default::default()
135        })
136    }
137
138    /// Creates a new HTTP response with 200 OK status, text/plain content type, and the given body
139    pub fn text(body: Bytes) -> Self {
140        Self {
141            status: StatusCode::OK,
142            headers: Some(vec![
143                HTTP_HEADER_CONTENT_TEXT.clone(),
144                HTTP_HEADER_NO_CACHE.clone(),
145            ]),
146            body,
147            ..Default::default()
148        }
149    }
150    /// Creates a new HTTP response from a serializable value with the specified status code
151    pub fn try_from_json_status<T>(
152        value: &T,
153        status: StatusCode,
154    ) -> pingora::Result<Self>
155    where
156        T: ?Sized + Serialize,
157    {
158        let mut resp = Self::try_from_json(value)?;
159        resp.status = status;
160        Ok(resp)
161    }
162
163    /// Creates a new HTTP response from a serializable value with 200 OK status
164    pub fn try_from_json<T>(value: &T) -> pingora::Result<Self>
165    where
166        T: ?Sized + Serialize,
167    {
168        let buf = serde_json::to_vec(value).map_err(|e| {
169            error!(
170                category = LOG_CATEGORY,
171                error = e.to_string(),
172                "to json fail"
173            );
174            new_internal_error(400, e.to_string())
175        })?;
176        Ok(Self {
177            status: StatusCode::OK,
178            body: buf.into(),
179            headers: Some(vec![HTTP_HEADER_CONTENT_JSON.clone()]),
180            ..Default::default()
181        })
182    }
183    /// Builds and returns the HTTP response headers based on the response configuration
184    pub fn new_response_header(&self) -> pingora::Result<ResponseHeader> {
185        let fix_size = 3;
186        let size = self
187            .headers
188            .as_ref()
189            .map_or_else(|| fix_size, |headers| headers.len() + fix_size);
190        let mut resp = ResponseHeader::build(self.status, Some(size))?;
191        resp.insert_header(
192            header::CONTENT_LENGTH,
193            self.body.len().to_string(),
194        )?;
195
196        // set cache control
197        let cache_control =
198            new_cache_control_header(self.max_age, self.cache_private);
199        resp.insert_header(cache_control.0, cache_control.1)?;
200
201        if let Some(created_at) = self.created_at {
202            let secs = get_super_ts() - created_at;
203            if let Ok(value) = header::HeaderValue::from_str(&secs.to_string())
204            {
205                resp.insert_header(header::AGE, value)?;
206            }
207        }
208
209        if let Some(headers) = &self.headers {
210            for (name, value) in headers {
211                resp.insert_header(name.to_owned(), value)?;
212            }
213        }
214        Ok(resp)
215    }
216    /// Sends the HTTP response to the client and returns the number of bytes sent
217    pub async fn send(self, session: &mut Session) -> pingora::Result<usize> {
218        let header = self.new_response_header()?;
219        let size = self.body.len();
220        session
221            .write_response_header(Box::new(header), false)
222            .await?;
223        session.write_response_body(Some(self.body), true).await?;
224        session.finish_body().await?;
225        Ok(size)
226    }
227}
228
229/// Chunked response handler for streaming large responses
230pub struct HttpChunkResponse<'r, R> {
231    /// Pinned reader for streaming data
232    pub reader: Pin<&'r mut R>,
233    /// Size of each chunk in bytes
234    pub chunk_size: usize,
235    /// Cache control settings
236    pub max_age: Option<u32>,
237    pub cache_private: Option<bool>,
238    pub headers: Option<Vec<HttpHeader>>,
239}
240
/// Default chunk size (8 KiB) used by `HttpChunkResponse::new`.
const DEFAULT_BUF_SIZE: usize = 8_192;
243
244impl<'r, R> HttpChunkResponse<'r, R>
245where
246    R: tokio::io::AsyncRead + std::marker::Unpin,
247{
248    /// Creates a new chunked HTTP response with the given reader
249    pub fn new(r: &'r mut R) -> Self {
250        Self {
251            reader: Pin::new(r),
252            chunk_size: DEFAULT_BUF_SIZE,
253            max_age: None,
254            headers: None,
255            cache_private: None,
256        }
257    }
258    /// Builds and returns the HTTP response headers for chunked transfer
259    pub fn get_response_header(&self) -> pingora::Result<ResponseHeader> {
260        let mut resp = ResponseHeader::build(StatusCode::OK, Some(4))?;
261        if let Some(headers) = &self.headers {
262            for (name, value) in headers {
263                resp.insert_header(name.to_owned(), value)?;
264            }
265        }
266
267        let chunked = HTTP_HEADER_TRANSFER_CHUNKED.clone();
268        resp.insert_header(chunked.0, chunked.1)?;
269
270        let cache_control =
271            new_cache_control_header(self.max_age, self.cache_private);
272        resp.insert_header(cache_control.0, cache_control.1)?;
273        Ok(resp)
274    }
275    /// Sends the chunked response data to the client and returns total bytes sent
276    pub async fn send(
277        mut self,
278        session: &mut Session,
279    ) -> pingora::Result<usize> {
280        let header = self.get_response_header()?;
281        session
282            .write_response_header(Box::new(header), false)
283            .await?;
284
285        let mut sent = 0;
286        let chunk_size = self.chunk_size.max(512);
287        let mut buffer = vec![0; chunk_size];
288        loop {
289            let size = self.reader.read(&mut buffer).await.map_err(|e| {
290                error!(error = e.to_string(), "read data fail");
291                new_internal_error(400, e.to_string())
292            })?;
293            let end = size < chunk_size;
294            session
295                .write_response_body(
296                    Some(Bytes::copy_from_slice(&buffer[..size])),
297                    end,
298                )
299                .await?;
300            sent += size;
301            if end {
302                break;
303            }
304        }
305        session.finish_body().await?;
306
307        Ok(sent)
308    }
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use crate::convert_headers;
    use bytes::Bytes;
    use http::StatusCode;
    use pretty_assertions::assert_eq;
    use serde::Serialize;
    use std::io::Write;
    use tempfile::NamedTempFile;
    use tokio::fs;

    /// Covers the private/public, zero and missing max-age branches.
    #[test]
    fn test_new_cache_control_header() {
        let private_cache = new_cache_control_header(Some(3600), Some(true));
        assert_eq!(
            r###"("cache-control", "private, max-age=3600")"###,
            format!("{private_cache:?}")
        );

        let public_cache = new_cache_control_header(Some(3600), None);
        assert_eq!(
            r###"("cache-control", "public, max-age=3600")"###,
            format!("{public_cache:?}")
        );

        let zero_age = new_cache_control_header(Some(0), Some(false));
        assert_eq!(
            r###"("cache-control", "private, no-cache")"###,
            format!("{zero_age:?}")
        );

        let unset = new_cache_control_header(None, None);
        assert_eq!(
            r###"("cache-control", "private, no-cache")"###,
            format!("{unset:?}")
        );
    }

    /// Checks every constructor through its `Debug` output, then the
    /// header built from a fully populated response.
    #[test]
    fn test_http_response() {
        let no_content = HttpResponse::no_content();
        assert_eq!(
            r###"HttpResponse { status: 204, body: b"", max_age: None, created_at: None, cache_private: None, headers: Some([("cache-control", "private, no-store")]) }"###,
            format!("{no_content:?}")
        );

        let not_found = HttpResponse::not_found("Not Found".into());
        assert_eq!(
            r###"HttpResponse { status: 404, body: b"Not Found", max_age: None, created_at: None, cache_private: None, headers: Some([("cache-control", "private, no-store")]) }"###,
            format!("{not_found:?}")
        );

        let unknown_error = HttpResponse::unknown_error("Unknown Error".into());
        assert_eq!(
            r###"HttpResponse { status: 500, body: b"Unknown Error", max_age: None, created_at: None, cache_private: None, headers: Some([("cache-control", "private, no-store")]) }"###,
            format!("{unknown_error:?}")
        );

        let bad_request = HttpResponse::bad_request("Bad Request".into());
        assert_eq!(
            r###"HttpResponse { status: 400, body: b"Bad Request", max_age: None, created_at: None, cache_private: None, headers: Some([("cache-control", "private, no-store")]) }"###,
            format!("{bad_request:?}")
        );

        let html = HttpResponse::html("<p>Pingap</p>".into());
        assert_eq!(
            r###"HttpResponse { status: 200, body: b"<p>Pingap</p>", max_age: None, created_at: None, cache_private: None, headers: Some([("content-type", "text/html; charset=utf-8"), ("cache-control", "private, no-cache")]) }"###,
            format!("{html:?}")
        );

        let redirect = HttpResponse::redirect("http://example.com/").unwrap();
        assert_eq!(
            r###"HttpResponse { status: 307, body: b"", max_age: None, created_at: None, cache_private: None, headers: Some([("location", "http://example.com/"), ("cache-control", "private, no-cache")]) }"###,
            format!("{redirect:?}")
        );

        let text = HttpResponse::text("Hello World!".into());
        assert_eq!(
            r###"HttpResponse { status: 200, body: b"Hello World!", max_age: None, created_at: None, cache_private: None, headers: Some([("content-type", "text/plain; charset=utf-8"), ("cache-control", "private, no-cache")]) }"###,
            format!("{text:?}")
        );

        #[derive(Serialize)]
        struct Data {
            message: String,
        }

        // JSON body with an explicit status code.
        let payload = Data {
            message: "Hello World!".to_string(),
        };
        let json_with_status = HttpResponse::try_from_json_status(
            &payload,
            StatusCode::BAD_REQUEST,
        )
        .unwrap();
        assert_eq!(
            r###"HttpResponse { status: 400, body: b"{\"message\":\"Hello World!\"}", max_age: None, created_at: None, cache_private: None, headers: Some([("content-type", "application/json; charset=utf-8")]) }"###,
            format!("{json_with_status:?}")
        );

        // JSON body with the default 200 status.
        let payload = Data {
            message: "Hello World!".to_string(),
        };
        let json_ok = HttpResponse::try_from_json(&payload).unwrap();
        assert_eq!(
            r###"HttpResponse { status: 200, body: b"{\"message\":\"Hello World!\"}", max_age: None, created_at: None, cache_private: None, headers: Some([("content-type", "application/json; charset=utf-8")]) }"###,
            format!("{json_ok:?}")
        );

        let custom_headers = convert_headers(&[
            "Contont-Type: application/json".to_string(),
            "Content-Encoding: gzip".to_string(),
        ])
        .unwrap();
        let full = HttpResponse {
            status: StatusCode::OK,
            body: Bytes::from("Hello world!"),
            max_age: Some(3600),
            created_at: Some(get_super_ts() - 10),
            cache_private: Some(true),
            headers: Some(custom_headers),
        };

        let mut built = full.new_response_header().unwrap();
        // The Age value depends on the clock, so only its presence is
        // checked before it is removed from the compared output.
        assert!(!built.headers.get("Age").unwrap().is_empty());
        built.remove_header("Age").unwrap();

        assert_eq!(
            r###"ResponseHeader { base: Parts { status: 200, version: HTTP/1.1, headers: {"content-length": "12", "cache-control": "private, max-age=3600", "content-encoding": "gzip", "contont-type": "application/json"} }, header_name_map: Some({"content-length": CaseHeaderName(b"Content-Length"), "cache-control": CaseHeaderName(b"Cache-Control"), "content-encoding": CaseHeaderName(b"Content-Encoding"), "contont-type": CaseHeaderName(b"contont-type")}), reason_phrase: None }"###,
            format!("{built:?}")
        );
    }

    /// Checks the header produced for a chunked response backed by a file.
    #[tokio::test]
    async fn test_http_chunk_response() {
        let content = include_bytes!("../../error.html");
        let mut tmp = NamedTempFile::new().unwrap();
        tmp.write_all(content).unwrap();
        let mut reader =
            fs::OpenOptions::new().read(true).open(tmp).await.unwrap();

        let mut chunked = HttpChunkResponse::new(&mut reader);
        chunked.max_age = Some(3600);
        chunked.cache_private = Some(false);
        chunked.headers = Some(
            convert_headers(&["Contont-Type: text/html".to_string()]).unwrap(),
        );

        let built = chunked.get_response_header().unwrap();
        assert_eq!(
            r###"ResponseHeader { base: Parts { status: 200, version: HTTP/1.1, headers: {"contont-type": "text/html", "transfer-encoding": "chunked", "cache-control": "public, max-age=3600"} }, header_name_map: Some({"contont-type": CaseHeaderName(b"contont-type"), "transfer-encoding": CaseHeaderName(b"Transfer-Encoding"), "cache-control": CaseHeaderName(b"Cache-Control")}), reason_phrase: None }"###,
            format!("{built:?}")
        );
    }
}