pingora_header_serde/
lib.rs

1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! HTTP Response header serialization with compression
16//!
//! This crate can serialize an HTTP response header to about 1/3 of its original size
//! (HTTP/1.1 wire format) when a trained dictionary is used.
19
20#![warn(clippy::all)]
21#![allow(clippy::new_without_default)]
22#![allow(clippy::type_complexity)]
23
24pub mod dict;
25mod thread_zstd;
26
27use bytes::BufMut;
28use http::Version;
29use pingora_error::{Error, ErrorType, Result};
30use pingora_http::ResponseHeader;
31use std::cell::RefCell;
32use std::ops::DerefMut;
33use thread_local::ThreadLocal;
34
35/// HTTP Response header serialization
36///
37/// This struct provides the APIs to convert HTTP response header into compressed wired format for
38/// storage.
39pub struct HeaderSerde {
40    compression: ZstdCompression,
41    // internal buffer for uncompressed data to be compressed and vice versa
42    buf: ThreadLocal<RefCell<Vec<u8>>>,
43}
44
// Capacity reserved up front for each per-thread scratch buffer (64 KiB)
const MAX_HEADER_SIZE: usize = 64 * 1024;
// zstd level handed to both the plain and the dictionary based compressor
const COMPRESS_LEVEL: i32 = 3;
47
48impl HeaderSerde {
49    /// Create a new [HeaderSerde]
50    ///
51    /// An optional zstd compression dictionary can be provided to improve the compression ratio
52    /// and speed. See [dict] for more details.
53    pub fn new(dict: Option<Vec<u8>>) -> Self {
54        if let Some(dict) = dict {
55            HeaderSerde {
56                compression: ZstdCompression::WithDict(thread_zstd::CompressionWithDict::new(
57                    &dict,
58                    COMPRESS_LEVEL,
59                )),
60                buf: ThreadLocal::new(),
61            }
62        } else {
63            HeaderSerde {
64                compression: ZstdCompression::Default(
65                    thread_zstd::Compression::new(),
66                    COMPRESS_LEVEL,
67                ),
68                buf: ThreadLocal::new(),
69            }
70        }
71    }
72
73    /// Serialize the given response header
74    pub fn serialize(&self, header: &ResponseHeader) -> Result<Vec<u8>> {
75        // for now we use HTTP 1.1 wire format for that
76        // TODO: should convert to h1 if the incoming header is for h2
77        let mut buf = self
78            .buf
79            .get_or(|| RefCell::new(Vec::with_capacity(MAX_HEADER_SIZE)))
80            .borrow_mut();
81        buf.clear(); // reset the buf
82        resp_header_to_buf(header, &mut buf);
83        self.compression.compress(&buf)
84    }
85
86    /// Deserialize the given response header
87    pub fn deserialize(&self, data: &[u8]) -> Result<ResponseHeader> {
88        let mut buf = self
89            .buf
90            .get_or(|| RefCell::new(Vec::with_capacity(MAX_HEADER_SIZE)))
91            .borrow_mut();
92        buf.clear(); // reset the buf
93        self.compression
94            .decompress_to_buffer(data, buf.deref_mut())?;
95        buf_to_http_header(&buf)
96    }
97}
98
99// Wrapper type to unify compressing with and withuot a dictionary,
100// since the two structs have different inputs for their APIs.
101enum ZstdCompression {
102    Default(thread_zstd::Compression, i32),
103    WithDict(thread_zstd::CompressionWithDict),
104}
105
106#[inline]
107fn into_error(e: &'static str, context: &'static str) -> Box<Error> {
108    Error::because(ErrorType::InternalError, context, e)
109}
110
111impl ZstdCompression {
112    fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
113        match &self {
114            ZstdCompression::Default(c, level) => c
115                .compress(data, *level)
116                .map_err(|e| into_error(e, "decompress header")),
117            ZstdCompression::WithDict(c) => c
118                .compress(data)
119                .map_err(|e| into_error(e, "decompress header")),
120        }
121    }
122
123    fn decompress_to_buffer(&self, source: &[u8], destination: &mut Vec<u8>) -> Result<usize> {
124        match &self {
125            ZstdCompression::Default(c, _) => c
126                .decompress_to_buffer(source, destination)
127                .map_err(|e| into_error(e, "decompress header")),
128            ZstdCompression::WithDict(c) => c
129                .decompress_to_buffer(source, destination)
130                .map_err(|e| into_error(e, "decompress header")),
131        }
132    }
133}
134
// Line terminator of the HTTP/1.x wire format
const CRLF: &[u8; 2] = b"\r\n";
136
137// Borrowed from pingora http1
138#[inline]
139fn resp_header_to_buf(resp: &ResponseHeader, buf: &mut Vec<u8>) -> usize {
140    // Status-Line
141    let version = match resp.version {
142        Version::HTTP_10 => "HTTP/1.0 ",
143        Version::HTTP_11 => "HTTP/1.1 ",
144        _ => "HTTP/1.1 ", // store everything else (including h2) in http 1.1 format
145    };
146    buf.put_slice(version.as_bytes());
147    let status = resp.status;
148    buf.put_slice(status.as_str().as_bytes());
149    buf.put_u8(b' ');
150    let reason = status.canonical_reason();
151    if let Some(reason_buf) = reason {
152        buf.put_slice(reason_buf.as_bytes());
153    }
154    buf.put_slice(CRLF);
155
156    // headers
157    resp.header_to_h1_wire(buf);
158
159    buf.put_slice(CRLF);
160
161    buf.len()
162}
163
// Number of header slots handed to httparse when parsing.
// Should match pingora http1 setting
const MAX_HEADERS: usize = 256;
166
167#[inline]
168fn buf_to_http_header(buf: &[u8]) -> Result<ResponseHeader> {
169    let mut headers = vec![httparse::EMPTY_HEADER; MAX_HEADERS];
170    let mut resp = httparse::Response::new(&mut headers);
171
172    match resp.parse(buf) {
173        Ok(s) => match s {
174            httparse::Status::Complete(_size) => parsed_to_header(&resp),
175            // we always feed the but that contains the entire header to parse
176            _ => Error::e_explain(ErrorType::InternalError, "incomplete uncompressed header"),
177        },
178        Err(e) => Error::e_because(
179            ErrorType::InternalError,
180            format!(
181                "parsing failed on uncompressed header, {}",
182                String::from_utf8_lossy(buf)
183            ),
184            e,
185        ),
186    }
187}
188
189#[inline]
190fn parsed_to_header(parsed: &httparse::Response) -> Result<ResponseHeader> {
191    // code should always be there
192    let mut resp = ResponseHeader::build(parsed.code.unwrap(), Some(parsed.headers.len()))?;
193
194    for header in parsed.headers.iter() {
195        resp.append_header(header.name.to_string(), header.value)?;
196    }
197
198    Ok(resp)
199}
200
#[cfg(test)]
mod tests {
    use super::*;

    // Without a dictionary, the compressed form must still be smaller than the
    // plain wire format for a header with repetitive content.
    #[test]
    fn test_ser_wo_dict() {
        let serde = HeaderSerde::new(None);
        let mut header = ResponseHeader::build(200, None).unwrap();
        for (name, value) in [
            ("foo", "bar"),
            ("foo", "barbar"),
            ("foo", "barbarbar"),
            ("Server", "Pingora"),
        ] {
            header.append_header(name, value).unwrap();
        }

        let compressed = serde.serialize(&header).unwrap();
        let mut plain = vec![];
        let plain_len = resp_header_to_buf(&header, &mut plain);
        assert!(compressed.len() < plain_len);
    }

    // Without a dictionary, serialize followed by deserialize must preserve the
    // status and the headers.
    #[test]
    fn test_ser_de_no_dict() {
        let serde = HeaderSerde::new(None);
        let mut header = ResponseHeader::build(200, None).unwrap();
        for (name, value) in [
            ("foo1", "bar1"),
            ("foo2", "barbar2"),
            ("foo3", "barbarbar3"),
            ("Server", "Pingora"),
        ] {
            header.append_header(name, value).unwrap();
        }

        let compressed = serde.serialize(&header).unwrap();
        let restored = serde.deserialize(&compressed).unwrap();
        assert_eq!(header.status, restored.status);
        assert_eq!(header.headers, restored.headers);
    }
}
234}