pingora_core/modules/http/
mod.rs1pub mod compression;
24pub mod grpc_web;
25
26use async_trait::async_trait;
27use bytes::Bytes;
28use http::HeaderMap;
29use once_cell::sync::OnceCell;
30use pingora_error::Result;
31use pingora_http::{RequestHeader, ResponseHeader};
32use std::any::Any;
33use std::any::TypeId;
34use std::collections::HashMap;
35use std::sync::Arc;
36
37#[async_trait]
39pub trait HttpModule {
40 async fn request_header_filter(&mut self, _req: &mut RequestHeader) -> Result<()> {
41 Ok(())
42 }
43
44 async fn request_body_filter(
45 &mut self,
46 _body: &mut Option<Bytes>,
47 _end_of_stream: bool,
48 ) -> Result<()> {
49 Ok(())
50 }
51
52 async fn response_header_filter(
53 &mut self,
54 _resp: &mut ResponseHeader,
55 _end_of_stream: bool,
56 ) -> Result<()> {
57 Ok(())
58 }
59
60 fn response_body_filter(
61 &mut self,
62 _body: &mut Option<Bytes>,
63 _end_of_stream: bool,
64 ) -> Result<()> {
65 Ok(())
66 }
67
68 fn response_trailer_filter(
69 &mut self,
70 _trailers: &mut Option<Box<HeaderMap>>,
71 ) -> Result<Option<Bytes>> {
72 Ok(None)
73 }
74
75 fn response_done_filter(&mut self) -> Result<Option<Bytes>> {
76 Ok(None)
77 }
78
79 fn as_any(&self) -> &dyn Any;
80 fn as_any_mut(&mut self) -> &mut dyn Any;
81}
82
83pub type Module = Box<dyn HttpModule + 'static + Send + Sync>;
84
85pub trait HttpModuleBuilder {
87 fn order(&self) -> i16 {
92 0
93 }
94
95 fn init(&self) -> Module;
97}
98
99pub type ModuleBuilder = Box<dyn HttpModuleBuilder + 'static + Send + Sync>;
100
101pub struct HttpModules {
103 modules: Vec<ModuleBuilder>,
104 module_index: OnceCell<Arc<HashMap<TypeId, usize>>>,
105}
106
107impl HttpModules {
108 pub fn new() -> Self {
110 HttpModules {
111 modules: vec![],
112 module_index: OnceCell::new(),
113 }
114 }
115
116 pub fn add_module(&mut self, builder: ModuleBuilder) {
122 if self.module_index.get().is_some() {
123 panic!("cannot add module after ctx is already built")
126 }
127 self.modules.push(builder);
128 self.modules.sort_by_key(|m| -m.order());
131 }
132
133 pub fn build_ctx(&self) -> HttpModuleCtx {
135 let module_ctx: Vec<_> = self.modules.iter().map(|b| b.init()).collect();
136 let module_index = self
137 .module_index
138 .get_or_init(|| {
139 let mut module_index = HashMap::with_capacity(self.modules.len());
140 for (i, c) in module_ctx.iter().enumerate() {
141 let exist = module_index.insert(c.as_any().type_id(), i);
142 if exist.is_some() {
143 panic!("duplicated filters found")
144 }
145 }
146 Arc::new(module_index)
147 })
148 .clone();
149
150 HttpModuleCtx {
151 module_ctx,
152 module_index,
153 }
154 }
155}
156
157pub struct HttpModuleCtx {
162 module_ctx: Vec<Module>,
164 module_index: Arc<HashMap<TypeId, usize>>,
166}
167
168impl HttpModuleCtx {
169 pub fn empty() -> Self {
173 HttpModuleCtx {
174 module_ctx: vec![],
175 module_index: Arc::new(HashMap::new()),
176 }
177 }
178
179 pub fn get<T: 'static>(&self) -> Option<&T> {
181 let idx = self.module_index.get(&TypeId::of::<T>())?;
182 let ctx = &self.module_ctx[*idx];
183 Some(
184 ctx.as_any()
185 .downcast_ref::<T>()
186 .expect("type should always match"),
187 )
188 }
189
190 pub fn get_mut<T: 'static>(&mut self) -> Option<&mut T> {
192 let idx = self.module_index.get(&TypeId::of::<T>())?;
193 let ctx = &mut self.module_ctx[*idx];
194 Some(
195 ctx.as_any_mut()
196 .downcast_mut::<T>()
197 .expect("type should always match"),
198 )
199 }
200
201 pub async fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
203 for filter in self.module_ctx.iter_mut() {
204 filter.request_header_filter(req).await?;
205 }
206 Ok(())
207 }
208
209 pub async fn request_body_filter(
211 &mut self,
212 body: &mut Option<Bytes>,
213 end_of_stream: bool,
214 ) -> Result<()> {
215 for filter in self.module_ctx.iter_mut() {
216 filter.request_body_filter(body, end_of_stream).await?;
217 }
218 Ok(())
219 }
220
221 pub async fn response_header_filter(
223 &mut self,
224 req: &mut ResponseHeader,
225 end_of_stream: bool,
226 ) -> Result<()> {
227 for filter in self.module_ctx.iter_mut() {
228 filter.response_header_filter(req, end_of_stream).await?;
229 }
230 Ok(())
231 }
232
233 pub fn response_body_filter(
235 &mut self,
236 body: &mut Option<Bytes>,
237 end_of_stream: bool,
238 ) -> Result<()> {
239 for filter in self.module_ctx.iter_mut() {
240 filter.response_body_filter(body, end_of_stream)?;
241 }
242 Ok(())
243 }
244
245 pub fn response_trailer_filter(
254 &mut self,
255 trailers: &mut Option<Box<HeaderMap>>,
256 ) -> Result<Option<Bytes>> {
257 let mut encoded = None;
258 for filter in self.module_ctx.iter_mut() {
259 if let Some(buf) = filter.response_trailer_filter(trailers)? {
260 encoded = Some(buf);
261 }
262 }
263 Ok(encoded)
264 }
265
266 pub fn response_done_filter(&mut self) -> Result<Option<Bytes>> {
275 let mut encoded = None;
276 for filter in self.module_ctx.iter_mut() {
277 if let Some(buf) = filter.response_done_filter()? {
278 encoded = Some(buf);
279 }
280 }
281 Ok(encoded)
282 }
283}
284
285#[cfg(test)]
286mod tests {
287 use super::*;
288
289 struct MyModule;
290 #[async_trait]
291 impl HttpModule for MyModule {
292 fn as_any(&self) -> &dyn Any {
293 self
294 }
295 fn as_any_mut(&mut self) -> &mut dyn Any {
296 self
297 }
298 async fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
299 req.insert_header("my-filter", "1")
300 }
301 }
302 struct MyModuleBuilder;
303 impl HttpModuleBuilder for MyModuleBuilder {
304 fn order(&self) -> i16 {
305 1
306 }
307
308 fn init(&self) -> Module {
309 Box::new(MyModule)
310 }
311 }
312
313 struct MyOtherModule;
314 #[async_trait]
315 impl HttpModule for MyOtherModule {
316 fn as_any(&self) -> &dyn Any {
317 self
318 }
319 fn as_any_mut(&mut self) -> &mut dyn Any {
320 self
321 }
322 async fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
323 if req.headers.get("my-filter").is_some() {
324 req.insert_header("my-filter", "2")
326 } else {
327 req.insert_header("my-other-filter", "1")
329 }
330 }
331 }
332 struct MyOtherModuleBuilder;
333 impl HttpModuleBuilder for MyOtherModuleBuilder {
334 fn order(&self) -> i16 {
335 -1
336 }
337
338 fn init(&self) -> Module {
339 Box::new(MyOtherModule)
340 }
341 }
342
343 #[test]
344 fn test_module_get() {
345 let mut http_module = HttpModules::new();
346 http_module.add_module(Box::new(MyModuleBuilder));
347 http_module.add_module(Box::new(MyOtherModuleBuilder));
348 let mut ctx = http_module.build_ctx();
349 assert!(ctx.get::<MyModule>().is_some());
350 assert!(ctx.get::<MyOtherModule>().is_some());
351 assert!(ctx.get::<usize>().is_none());
352 assert!(ctx.get_mut::<MyModule>().is_some());
353 assert!(ctx.get_mut::<MyOtherModule>().is_some());
354 assert!(ctx.get_mut::<usize>().is_none());
355 }
356
357 #[tokio::test]
358 async fn test_module_filter() {
359 let mut http_module = HttpModules::new();
360 http_module.add_module(Box::new(MyOtherModuleBuilder));
361 http_module.add_module(Box::new(MyModuleBuilder));
362 let mut ctx = http_module.build_ctx();
363 let mut req = RequestHeader::build("Get", b"/", None).unwrap();
364 ctx.request_header_filter(&mut req).await.unwrap();
365 assert_eq!(req.headers.get("my-filter").unwrap(), "2");
367 assert!(req.headers.get("my-other-filter").is_none());
368 }
369}