pingora_core/modules/http/
mod.rs

1// Copyright 2024 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Modules for HTTP traffic.
16//!
17//! [HttpModule]s define request and response filters to use while running an
18//! [HttpServer](crate::apps::http_app::HttpServer)
19//! application.
20//! See the [ResponseCompression](crate::modules::http::compression::ResponseCompression)
21//! module for an example of how to implement a basic module.
22
23pub mod compression;
24pub mod grpc_web;
25
26use async_trait::async_trait;
27use bytes::Bytes;
28use http::HeaderMap;
29use once_cell::sync::OnceCell;
30use pingora_error::Result;
31use pingora_http::{RequestHeader, ResponseHeader};
32use std::any::Any;
33use std::any::TypeId;
34use std::collections::HashMap;
35use std::sync::Arc;
36
/// The trait an HTTP traffic module needs to implement
///
/// Every filter has a default implementation that leaves its input untouched, so a
/// module only needs to override the filters it is interested in. [HttpModule::as_any]
/// and [HttpModule::as_any_mut] have no default and must always be implemented.
#[async_trait]
pub trait HttpModule {
    /// Filter the request header.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    async fn request_header_filter(&mut self, _req: &mut RequestHeader) -> Result<()> {
        Ok(())
    }

    /// Filter the request body.
    ///
    /// Receives the `body` and `end_of_stream` values that were passed to
    /// [HttpModuleCtx::request_body_filter].
    /// The default implementation does nothing and returns `Ok(())`.
    async fn request_body_filter(
        &mut self,
        _body: &mut Option<Bytes>,
        _end_of_stream: bool,
    ) -> Result<()> {
        Ok(())
    }

    /// Filter the response header.
    ///
    /// Receives the header and `end_of_stream` values that were passed to
    /// [HttpModuleCtx::response_header_filter].
    /// The default implementation does nothing and returns `Ok(())`.
    async fn response_header_filter(
        &mut self,
        _resp: &mut ResponseHeader,
        _end_of_stream: bool,
    ) -> Result<()> {
        Ok(())
    }

    /// Filter the response body.
    ///
    /// Unlike the request filters and the response header filter, this filter is
    /// not `async`.
    /// The default implementation does nothing and returns `Ok(())`.
    fn response_body_filter(
        &mut self,
        _body: &mut Option<Bytes>,
        _end_of_stream: bool,
    ) -> Result<()> {
        Ok(())
    }

    /// Filter the response trailers.
    ///
    /// A returned `Some(bytes)` is handed back to the caller of
    /// [HttpModuleCtx::response_trailer_filter]; when several modules return
    /// `Some`, the value of the module that runs last is the one kept.
    /// The default implementation does nothing and returns `Ok(None)`.
    fn response_trailer_filter(
        &mut self,
        _trailers: &mut Option<Box<HeaderMap>>,
    ) -> Result<Option<Bytes>> {
        Ok(None)
    }

    /// Return this module as [Any].
    ///
    /// [HttpModules::build_ctx] indexes modules by the [TypeId] of the returned value
    /// and [HttpModuleCtx::get] downcasts it, so implementations typically return `self`.
    fn as_any(&self) -> &dyn Any;
    /// Mutable counterpart of [HttpModule::as_any], used by [HttpModuleCtx::get_mut].
    fn as_any_mut(&mut self) -> &mut dyn Any;
}
78
/// A boxed [HttpModule] trait object, as returned by [HttpModuleBuilder::init]
pub type Module = Box<dyn HttpModule + 'static + Send + Sync>;
80
/// Trait to init the http module ctx for each request
///
/// A builder is registered with [HttpModules::add_module]. Every call to
/// [HttpModules::build_ctx] then calls [HttpModuleBuilder::init] on it to create a
/// fresh [Module].
pub trait HttpModuleBuilder {
    /// The order the module will run
    ///
    /// The lower the value, the later it runs relative to other filters.
    /// If the order of the filter is not important, leave it to the default 0.
    fn order(&self) -> i16 {
        0
    }

    /// Initialize and return the per request module context
    fn init(&self) -> Module;
}
94
/// A boxed [HttpModuleBuilder] trait object, as accepted by [HttpModules::add_module]
pub type ModuleBuilder = Box<dyn HttpModuleBuilder + 'static + Send + Sync>;
96
/// The object to hold multiple http modules
///
/// It stores the added [ModuleBuilder]s and creates an [HttpModuleCtx] out of them
/// via [HttpModules::build_ctx].
pub struct HttpModules {
    // the added builders, re-sorted by their `order()` on every `add_module()` call
    modules: Vec<ModuleBuilder>,
    // type ID of each module -> its position in the built module list; initialized by
    // the first `build_ctx()` call and shared with every `HttpModuleCtx` it returns
    module_index: OnceCell<Arc<HashMap<TypeId, usize>>>,
}
102
103impl HttpModules {
104    /// Create a new [HttpModules]
105    pub fn new() -> Self {
106        HttpModules {
107            modules: vec![],
108            module_index: OnceCell::new(),
109        }
110    }
111
112    /// Add a new [ModuleBuilder] to [HttpModules]
113    ///
114    /// Each type of [HttpModule] can be only added once.
115    /// # Panic
116    /// Panic if any [HttpModule] is added more than once.
117    pub fn add_module(&mut self, builder: ModuleBuilder) {
118        if self.module_index.get().is_some() {
119            // We use a shared module_index the index would be out of sync if we
120            // add more modules.
121            panic!("cannot add module after ctx is already built")
122        }
123        self.modules.push(builder);
124        // not the most efficient way but should be fine
125        // largest order first
126        self.modules.sort_by_key(|m| -m.order());
127    }
128
129    /// Build the contexts of all the modules added to this [HttpModules]
130    pub fn build_ctx(&self) -> HttpModuleCtx {
131        let module_ctx: Vec<_> = self.modules.iter().map(|b| b.init()).collect();
132        let module_index = self
133            .module_index
134            .get_or_init(|| {
135                let mut module_index = HashMap::with_capacity(self.modules.len());
136                for (i, c) in module_ctx.iter().enumerate() {
137                    let exist = module_index.insert(c.as_any().type_id(), i);
138                    if exist.is_some() {
139                        panic!("duplicated filters found")
140                    }
141                }
142                Arc::new(module_index)
143            })
144            .clone();
145
146        HttpModuleCtx {
147            module_ctx,
148            module_index,
149        }
150    }
151}
152
/// The Contexts of multiple modules
///
/// This is the object that will apply all the included modules to a certain HTTP request.
/// The modules are ordered according to their `order()`.
pub struct HttpModuleCtx {
    // the modules in the order of execution
    module_ctx: Vec<Module>,
    // find the module in the vec with its type ID: maps the type ID of a module to
    // its position in `module_ctx`
    module_index: Arc<HashMap<TypeId, usize>>,
}
163
164impl HttpModuleCtx {
165    /// Create a placeholder empty [HttpModuleCtx].
166    ///
167    /// [HttpModules] should be used to create nonempty [HttpModuleCtx].
168    pub fn empty() -> Self {
169        HttpModuleCtx {
170            module_ctx: vec![],
171            module_index: Arc::new(HashMap::new()),
172        }
173    }
174
175    /// Get a ref to [HttpModule] if any.
176    pub fn get<T: 'static>(&self) -> Option<&T> {
177        let idx = self.module_index.get(&TypeId::of::<T>())?;
178        let ctx = &self.module_ctx[*idx];
179        Some(
180            ctx.as_any()
181                .downcast_ref::<T>()
182                .expect("type should always match"),
183        )
184    }
185
186    /// Get a mut ref to [HttpModule] if any.
187    pub fn get_mut<T: 'static>(&mut self) -> Option<&mut T> {
188        let idx = self.module_index.get(&TypeId::of::<T>())?;
189        let ctx = &mut self.module_ctx[*idx];
190        Some(
191            ctx.as_any_mut()
192                .downcast_mut::<T>()
193                .expect("type should always match"),
194        )
195    }
196
197    /// Run the `request_header_filter` for all the modules according to their orders.
198    pub async fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
199        for filter in self.module_ctx.iter_mut() {
200            filter.request_header_filter(req).await?;
201        }
202        Ok(())
203    }
204
205    /// Run the `request_body_filter` for all the modules according to their orders.
206    pub async fn request_body_filter(
207        &mut self,
208        body: &mut Option<Bytes>,
209        end_of_stream: bool,
210    ) -> Result<()> {
211        for filter in self.module_ctx.iter_mut() {
212            filter.request_body_filter(body, end_of_stream).await?;
213        }
214        Ok(())
215    }
216
217    /// Run the `response_header_filter` for all the modules according to their orders.
218    pub async fn response_header_filter(
219        &mut self,
220        req: &mut ResponseHeader,
221        end_of_stream: bool,
222    ) -> Result<()> {
223        for filter in self.module_ctx.iter_mut() {
224            filter.response_header_filter(req, end_of_stream).await?;
225        }
226        Ok(())
227    }
228
229    /// Run the `response_body_filter` for all the modules according to their orders.
230    pub fn response_body_filter(
231        &mut self,
232        body: &mut Option<Bytes>,
233        end_of_stream: bool,
234    ) -> Result<()> {
235        for filter in self.module_ctx.iter_mut() {
236            filter.response_body_filter(body, end_of_stream)?;
237        }
238        Ok(())
239    }
240
241    /// Run the `response_trailer_filter` for all the modules according to their orders.
242    ///
243    /// Returns an `Option<Bytes>` which can be used to write response trailers into
244    /// the response body. Note, if multiple modules attempt to write trailers into
245    /// the body the last one will be used.
246    ///
247    /// Implementors that intend to write trailers into the body need to ensure their filter
248    /// is using an encoding that supports this.
249    pub fn response_trailer_filter(
250        &mut self,
251        trailers: &mut Option<Box<HeaderMap>>,
252    ) -> Result<Option<Bytes>> {
253        let mut encoded = None;
254        for filter in self.module_ctx.iter_mut() {
255            if let Some(buf) = filter.response_trailer_filter(trailers)? {
256                encoded = Some(buf);
257            }
258        }
259        Ok(encoded)
260    }
261}
262
#[cfg(test)]
mod tests {
    use super::*;

    /// Module that always sets the `my-filter` request header to `1`.
    struct MyModule;
    #[async_trait]
    impl HttpModule for MyModule {
        async fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
            req.insert_header("my-filter", "1")
        }
        fn as_any(&self) -> &dyn Any {
            self
        }
        fn as_any_mut(&mut self) -> &mut dyn Any {
            self
        }
    }

    /// Builder of [MyModule], order 1.
    struct MyModuleBuilder;
    impl HttpModuleBuilder for MyModuleBuilder {
        fn init(&self) -> Module {
            Box::new(MyModule)
        }

        fn order(&self) -> i16 {
            1
        }
    }

    /// Module whose effect on the request header shows whether it ran before or
    /// after [MyModule].
    struct MyOtherModule;
    #[async_trait]
    impl HttpModule for MyOtherModule {
        async fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
            let (name, value) = match req.headers.get("my-filter") {
                // this MyOtherModule runs after MyModule
                Some(_) => ("my-filter", "2"),
                // this MyOtherModule runs before MyModule
                None => ("my-other-filter", "1"),
            };
            req.insert_header(name, value)
        }
        fn as_any(&self) -> &dyn Any {
            self
        }
        fn as_any_mut(&mut self) -> &mut dyn Any {
            self
        }
    }

    /// Builder of [MyOtherModule], order -1.
    struct MyOtherModuleBuilder;
    impl HttpModuleBuilder for MyOtherModuleBuilder {
        fn init(&self) -> Module {
            Box::new(MyOtherModule)
        }

        fn order(&self) -> i16 {
            -1
        }
    }

    #[test]
    fn test_module_get() {
        let mut modules = HttpModules::new();
        modules.add_module(Box::new(MyModuleBuilder));
        modules.add_module(Box::new(MyOtherModuleBuilder));
        let mut ctx = modules.build_ctx();

        // both added modules can be found, a type that was never added cannot
        assert!(ctx.get::<MyModule>().is_some());
        assert!(ctx.get::<MyOtherModule>().is_some());
        assert!(ctx.get::<usize>().is_none());

        // same through the mutable accessor
        assert!(ctx.get_mut::<MyModule>().is_some());
        assert!(ctx.get_mut::<MyOtherModule>().is_some());
        assert!(ctx.get_mut::<usize>().is_none());
    }

    #[tokio::test]
    async fn test_module_filter() {
        // add the modules in the reverse of their expected execution order
        let mut modules = HttpModules::new();
        modules.add_module(Box::new(MyOtherModuleBuilder));
        modules.add_module(Box::new(MyModuleBuilder));
        let mut ctx = modules.build_ctx();

        let mut req = RequestHeader::build("Get", b"/", None).unwrap();
        ctx.request_header_filter(&mut req).await.unwrap();

        // MyModule runs before MyOtherModule
        assert_eq!(req.headers.get("my-filter").unwrap(), "2");
        assert!(req.headers.get("my-other-filter").is_none());
    }
}