pingora_core/modules/http/
mod.rs1pub mod compression;
24pub mod grpc_web;
25
26use async_trait::async_trait;
27use bytes::Bytes;
28use http::HeaderMap;
29use once_cell::sync::OnceCell;
30use pingora_error::Result;
31use pingora_http::{RequestHeader, ResponseHeader};
32use std::any::Any;
33use std::any::TypeId;
34use std::collections::HashMap;
35use std::sync::Arc;
36
37#[async_trait]
39pub trait HttpModule {
40 async fn request_header_filter(&mut self, _req: &mut RequestHeader) -> Result<()> {
41 Ok(())
42 }
43
44 async fn request_body_filter(
45 &mut self,
46 _body: &mut Option<Bytes>,
47 _end_of_stream: bool,
48 ) -> Result<()> {
49 Ok(())
50 }
51
52 async fn response_header_filter(
53 &mut self,
54 _resp: &mut ResponseHeader,
55 _end_of_stream: bool,
56 ) -> Result<()> {
57 Ok(())
58 }
59
60 fn response_body_filter(
61 &mut self,
62 _body: &mut Option<Bytes>,
63 _end_of_stream: bool,
64 ) -> Result<()> {
65 Ok(())
66 }
67
68 fn response_trailer_filter(
69 &mut self,
70 _trailers: &mut Option<Box<HeaderMap>>,
71 ) -> Result<Option<Bytes>> {
72 Ok(None)
73 }
74
75 fn as_any(&self) -> &dyn Any;
76 fn as_any_mut(&mut self) -> &mut dyn Any;
77}
78
79pub type Module = Box<dyn HttpModule + 'static + Send + Sync>;
80
81pub trait HttpModuleBuilder {
83 fn order(&self) -> i16 {
88 0
89 }
90
91 fn init(&self) -> Module;
93}
94
95pub type ModuleBuilder = Box<dyn HttpModuleBuilder + 'static + Send + Sync>;
96
97pub struct HttpModules {
99 modules: Vec<ModuleBuilder>,
100 module_index: OnceCell<Arc<HashMap<TypeId, usize>>>,
101}
102
103impl HttpModules {
104 pub fn new() -> Self {
106 HttpModules {
107 modules: vec![],
108 module_index: OnceCell::new(),
109 }
110 }
111
112 pub fn add_module(&mut self, builder: ModuleBuilder) {
118 if self.module_index.get().is_some() {
119 panic!("cannot add module after ctx is already built")
122 }
123 self.modules.push(builder);
124 self.modules.sort_by_key(|m| -m.order());
127 }
128
129 pub fn build_ctx(&self) -> HttpModuleCtx {
131 let module_ctx: Vec<_> = self.modules.iter().map(|b| b.init()).collect();
132 let module_index = self
133 .module_index
134 .get_or_init(|| {
135 let mut module_index = HashMap::with_capacity(self.modules.len());
136 for (i, c) in module_ctx.iter().enumerate() {
137 let exist = module_index.insert(c.as_any().type_id(), i);
138 if exist.is_some() {
139 panic!("duplicated filters found")
140 }
141 }
142 Arc::new(module_index)
143 })
144 .clone();
145
146 HttpModuleCtx {
147 module_ctx,
148 module_index,
149 }
150 }
151}
152
153pub struct HttpModuleCtx {
158 module_ctx: Vec<Module>,
160 module_index: Arc<HashMap<TypeId, usize>>,
162}
163
164impl HttpModuleCtx {
165 pub fn empty() -> Self {
169 HttpModuleCtx {
170 module_ctx: vec![],
171 module_index: Arc::new(HashMap::new()),
172 }
173 }
174
175 pub fn get<T: 'static>(&self) -> Option<&T> {
177 let idx = self.module_index.get(&TypeId::of::<T>())?;
178 let ctx = &self.module_ctx[*idx];
179 Some(
180 ctx.as_any()
181 .downcast_ref::<T>()
182 .expect("type should always match"),
183 )
184 }
185
186 pub fn get_mut<T: 'static>(&mut self) -> Option<&mut T> {
188 let idx = self.module_index.get(&TypeId::of::<T>())?;
189 let ctx = &mut self.module_ctx[*idx];
190 Some(
191 ctx.as_any_mut()
192 .downcast_mut::<T>()
193 .expect("type should always match"),
194 )
195 }
196
197 pub async fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
199 for filter in self.module_ctx.iter_mut() {
200 filter.request_header_filter(req).await?;
201 }
202 Ok(())
203 }
204
205 pub async fn request_body_filter(
207 &mut self,
208 body: &mut Option<Bytes>,
209 end_of_stream: bool,
210 ) -> Result<()> {
211 for filter in self.module_ctx.iter_mut() {
212 filter.request_body_filter(body, end_of_stream).await?;
213 }
214 Ok(())
215 }
216
217 pub async fn response_header_filter(
219 &mut self,
220 req: &mut ResponseHeader,
221 end_of_stream: bool,
222 ) -> Result<()> {
223 for filter in self.module_ctx.iter_mut() {
224 filter.response_header_filter(req, end_of_stream).await?;
225 }
226 Ok(())
227 }
228
229 pub fn response_body_filter(
231 &mut self,
232 body: &mut Option<Bytes>,
233 end_of_stream: bool,
234 ) -> Result<()> {
235 for filter in self.module_ctx.iter_mut() {
236 filter.response_body_filter(body, end_of_stream)?;
237 }
238 Ok(())
239 }
240
241 pub fn response_trailer_filter(
250 &mut self,
251 trailers: &mut Option<Box<HeaderMap>>,
252 ) -> Result<Option<Bytes>> {
253 let mut encoded = None;
254 for filter in self.module_ctx.iter_mut() {
255 if let Some(buf) = filter.response_trailer_filter(trailers)? {
256 encoded = Some(buf);
257 }
258 }
259 Ok(encoded)
260 }
261}
262
#[cfg(test)]
mod tests {
    use super::*;

    /// Test module that writes the `my-filter` header with value `1`.
    struct MyModule;

    #[async_trait]
    impl HttpModule for MyModule {
        async fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
            req.insert_header("my-filter", "1")
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn as_any_mut(&mut self) -> &mut dyn Any {
            self
        }
    }

    /// Builds [`MyModule`] with order `1`.
    struct MyModuleBuilder;

    impl HttpModuleBuilder for MyModuleBuilder {
        fn init(&self) -> Module {
            Box::new(MyModule)
        }

        fn order(&self) -> i16 {
            1
        }
    }

    /// Test module whose output depends on whether `my-filter` is already present.
    struct MyOtherModule;

    #[async_trait]
    impl HttpModule for MyOtherModule {
        async fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
            let already_filtered = req.headers.get("my-filter").is_some();
            if already_filtered {
                // `MyModule` ran before this module.
                req.insert_header("my-filter", "2")
            } else {
                // `MyModule` has not run (yet).
                req.insert_header("my-other-filter", "1")
            }
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn as_any_mut(&mut self) -> &mut dyn Any {
            self
        }
    }

    /// Builds [`MyOtherModule`] with order `-1`.
    struct MyOtherModuleBuilder;

    impl HttpModuleBuilder for MyOtherModuleBuilder {
        fn init(&self) -> Module {
            Box::new(MyOtherModule)
        }

        fn order(&self) -> i16 {
            -1
        }
    }

    #[test]
    fn test_module_get() {
        let mut modules = HttpModules::new();
        modules.add_module(Box::new(MyModuleBuilder));
        modules.add_module(Box::new(MyOtherModuleBuilder));

        let mut module_ctx = modules.build_ctx();

        // Shared lookups: registered types are found, unrelated types are not.
        assert!(module_ctx.get::<MyModule>().is_some());
        assert!(module_ctx.get::<MyOtherModule>().is_some());
        assert!(module_ctx.get::<usize>().is_none());

        // Mutable lookups behave the same way.
        assert!(module_ctx.get_mut::<MyModule>().is_some());
        assert!(module_ctx.get_mut::<MyOtherModule>().is_some());
        assert!(module_ctx.get_mut::<usize>().is_none());
    }

    #[tokio::test]
    async fn test_module_filter() {
        // Register in the opposite order of execution to prove that `order()` decides.
        let mut modules = HttpModules::new();
        modules.add_module(Box::new(MyOtherModuleBuilder));
        modules.add_module(Box::new(MyModuleBuilder));

        let mut module_ctx = modules.build_ctx();
        let mut req = RequestHeader::build("Get", b"/", None).unwrap();
        module_ctx.request_header_filter(&mut req).await.unwrap();

        // `MyModule` (order 1) ran first, so `MyOtherModule` took its first branch.
        assert_eq!(req.headers.get("my-filter").unwrap(), "2");
        assert!(req.headers.get("my-other-filter").is_none());
    }
}