pingora_header_serde/
lib.rs1#![warn(clippy::all)]
21#![allow(clippy::new_without_default)]
22#![allow(clippy::type_complexity)]
23
24pub mod dict;
25mod thread_zstd;
26
27use bytes::BufMut;
28use http::Version;
29use pingora_error::{Error, ErrorType, Result};
30use pingora_http::ResponseHeader;
31use std::cell::RefCell;
32use std::ops::DerefMut;
33use thread_local::ThreadLocal;
34
/// Serializes [ResponseHeader]s into a compressed byte form and parses them back.
pub struct HeaderSerde {
    // Compression backend; which variant is used is decided in `new` based on
    // whether a dictionary was supplied.
    compression: ZstdCompression,
    // Per-thread scratch buffer, cleared and reused by `serialize` and `deserialize`.
    buf: ThreadLocal<RefCell<Vec<u8>>>,
}
44
/// Initial capacity, in bytes, reserved for each per-thread scratch buffer.
const MAX_HEADER_SIZE: usize = 64 * 1024;
/// Compression level handed to the compressor when a [HeaderSerde] is built.
const COMPRESS_LEVEL: i32 = 3;
47
48impl HeaderSerde {
49 pub fn new(dict: Option<Vec<u8>>) -> Self {
54 if let Some(dict) = dict {
55 HeaderSerde {
56 compression: ZstdCompression::WithDict(thread_zstd::CompressionWithDict::new(
57 &dict,
58 COMPRESS_LEVEL,
59 )),
60 buf: ThreadLocal::new(),
61 }
62 } else {
63 HeaderSerde {
64 compression: ZstdCompression::Default(
65 thread_zstd::Compression::new(),
66 COMPRESS_LEVEL,
67 ),
68 buf: ThreadLocal::new(),
69 }
70 }
71 }
72
73 pub fn serialize(&self, header: &ResponseHeader) -> Result<Vec<u8>> {
75 let mut buf = self
78 .buf
79 .get_or(|| RefCell::new(Vec::with_capacity(MAX_HEADER_SIZE)))
80 .borrow_mut();
81 buf.clear(); resp_header_to_buf(header, &mut buf);
83 self.compression.compress(&buf)
84 }
85
86 pub fn deserialize(&self, data: &[u8]) -> Result<ResponseHeader> {
88 let mut buf = self
89 .buf
90 .get_or(|| RefCell::new(Vec::with_capacity(MAX_HEADER_SIZE)))
91 .borrow_mut();
92 buf.clear(); self.compression
94 .decompress_to_buffer(data, buf.deref_mut())?;
95 buf_to_http_header(&buf)
96 }
97}
98
/// The compression backend held by a [HeaderSerde].
enum ZstdCompression {
    /// Compressor used when no dictionary is supplied, paired with the
    /// compression level passed on every `compress` call.
    Default(thread_zstd::Compression, i32),
    /// Compressor constructed from a caller-supplied dictionary.
    WithDict(thread_zstd::CompressionWithDict),
}
105
106#[inline]
107fn into_error(e: &'static str, context: &'static str) -> Box<Error> {
108 Error::because(ErrorType::InternalError, context, e)
109}
110
111impl ZstdCompression {
112 fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
113 match &self {
114 ZstdCompression::Default(c, level) => c
115 .compress(data, *level)
116 .map_err(|e| into_error(e, "decompress header")),
117 ZstdCompression::WithDict(c) => c
118 .compress(data)
119 .map_err(|e| into_error(e, "decompress header")),
120 }
121 }
122
123 fn decompress_to_buffer(&self, source: &[u8], destination: &mut Vec<u8>) -> Result<usize> {
124 match &self {
125 ZstdCompression::Default(c, _) => c
126 .decompress_to_buffer(source, destination)
127 .map_err(|e| into_error(e, "decompress header")),
128 ZstdCompression::WithDict(c) => c
129 .decompress_to_buffer(source, destination)
130 .map_err(|e| into_error(e, "decompress header")),
131 }
132 }
133}
134
/// Line terminator written after the status line and again after the header block.
const CRLF: &[u8; 2] = b"\r\n";
136
137#[inline]
139fn resp_header_to_buf(resp: &ResponseHeader, buf: &mut Vec<u8>) -> usize {
140 let version = match resp.version {
142 Version::HTTP_10 => "HTTP/1.0 ",
143 Version::HTTP_11 => "HTTP/1.1 ",
144 _ => "HTTP/1.1 ", };
146 buf.put_slice(version.as_bytes());
147 let status = resp.status;
148 buf.put_slice(status.as_str().as_bytes());
149 buf.put_u8(b' ');
150 let reason = status.canonical_reason();
151 if let Some(reason_buf) = reason {
152 buf.put_slice(reason_buf.as_bytes());
153 }
154 buf.put_slice(CRLF);
155
156 resp.header_to_h1_wire(buf);
158
159 buf.put_slice(CRLF);
160
161 buf.len()
162}
163
/// Number of header slots allocated for `httparse` when parsing a decompressed header.
const MAX_HEADERS: usize = 256;
166
167#[inline]
168fn buf_to_http_header(buf: &[u8]) -> Result<ResponseHeader> {
169 let mut headers = vec![httparse::EMPTY_HEADER; MAX_HEADERS];
170 let mut resp = httparse::Response::new(&mut headers);
171
172 match resp.parse(buf) {
173 Ok(s) => match s {
174 httparse::Status::Complete(_size) => parsed_to_header(&resp),
175 _ => Error::e_explain(ErrorType::InternalError, "incomplete uncompressed header"),
177 },
178 Err(e) => Error::e_because(
179 ErrorType::InternalError,
180 format!(
181 "parsing failed on uncompressed header, {}",
182 String::from_utf8_lossy(buf)
183 ),
184 e,
185 ),
186 }
187}
188
189#[inline]
190fn parsed_to_header(parsed: &httparse::Response) -> Result<ResponseHeader> {
191 let mut resp = ResponseHeader::build(parsed.code.unwrap(), Some(parsed.headers.len()))?;
193
194 for header in parsed.headers.iter() {
195 resp.append_header(header.name.to_string(), header.value)?;
196 }
197
198 Ok(resp)
199}
200
#[cfg(test)]
mod tests {
    use super::*;

    // Serializing without a dictionary should produce fewer bytes than the
    // uncompressed form written by `resp_header_to_buf`.
    #[test]
    fn test_ser_wo_dict() {
        let serde = HeaderSerde::new(None);
        let mut header = ResponseHeader::build(200, None).unwrap();
        // Repeated names and overlapping values give the compressor redundancy to exploit.
        header.append_header("foo", "bar").unwrap();
        header.append_header("foo", "barbar").unwrap();
        header.append_header("foo", "barbarbar").unwrap();
        header.append_header("Server", "Pingora").unwrap();

        let compressed = serde.serialize(&header).unwrap();
        let mut buf = vec![];
        let uncompressed = resp_header_to_buf(&header, &mut buf);
        assert!(compressed.len() < uncompressed);
    }

    // A serialize/deserialize round trip without a dictionary should preserve
    // the status and the headers.
    #[test]
    fn test_ser_de_no_dict() {
        let serde = HeaderSerde::new(None);
        let mut header = ResponseHeader::build(200, None).unwrap();
        header.append_header("foo1", "bar1").unwrap();
        header.append_header("foo2", "barbar2").unwrap();
        header.append_header("foo3", "barbarbar3").unwrap();
        header.append_header("Server", "Pingora").unwrap();

        let compressed = serde.serialize(&header).unwrap();
        let header2 = serde.deserialize(&compressed).unwrap();
        assert_eq!(header.status, header2.status);
        assert_eq!(header.headers, header2.headers);
    }
}