// pingora_proxy/proxy_trait.rs

// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::*;
use pingora_cache::{key::HashBinary, CacheKey, CacheMeta, RespCacheable, RespCacheable::*};
use std::time::Duration;

19/// The interface to control the HTTP proxy
20///
21/// The methods in [ProxyHttp] are filters/callbacks which will be performed on all requests at their
22/// particular stage (if applicable).
23///
24/// If any of the filters returns [Result::Err], the request will fail, and the error will be logged.
25#[cfg_attr(not(doc_async_trait), async_trait)]
26pub trait ProxyHttp {
27    /// The per request object to share state across the different filters
28    type CTX;
29
30    /// Define how the `ctx` should be created.
31    fn new_ctx(&self) -> Self::CTX;
32
33    /// Define where the proxy should send the request to.
34    ///
35    /// The returned [HttpPeer] contains the information regarding where and how this request should
36    /// be forwarded to.
37    async fn upstream_peer(
38        &self,
39        session: &mut Session,
40        ctx: &mut Self::CTX,
41    ) -> Result<Box<HttpPeer>>;
42
43    /// Set up downstream modules.
44    ///
45    /// In this phase, users can add or configure [HttpModules] before the server starts up.
46    ///
47    /// In the default implementation of this method, [ResponseCompressionBuilder] is added
48    /// and disabled.
49    fn init_downstream_modules(&self, modules: &mut HttpModules) {
50        // Add disabled downstream compression module by default
51        modules.add_module(ResponseCompressionBuilder::enable(0));
52    }
53
54    /// Handle the incoming request.
55    ///
56    /// In this phase, users can parse, validate, rate limit, perform access control and/or
57    /// return a response for this request.
58    ///
59    /// If the user already sent a response to this request, an `Ok(true)` should be returned so that
60    /// the proxy would exit. The proxy continues to the next phases when `Ok(false)` is returned.
61    ///
62    /// By default this filter does nothing and returns `Ok(false)`.
63    async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
64    where
65        Self::CTX: Send + Sync,
66    {
67        Ok(false)
68    }
69
70    /// Handle the incoming request before any downstream module is executed.
71    ///
72    /// This function is similar to [Self::request_filter()] but executes before any other logic,
73    /// including downstream module logic. The main purpose of this function is to provide finer
74    /// grained control of the behavior of the modules.
75    ///
76    /// Note that because this function is executed before any module that might provide access
77    /// control or rate limiting, logic should stay in request_filter() if it can in order to be
78    /// protected by said modules.
79    async fn early_request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()>
80    where
81        Self::CTX: Send + Sync,
82    {
83        Ok(())
84    }
85
86    /// Handle the incoming request body.
87    ///
88    /// This function will be called every time a piece of request body is received. The `body` is
89    /// **not the entire request body**.
90    ///
91    /// The async nature of this function allows to throttle the upload speed and/or executing
92    /// heavy computation logic such as WAF rules on offloaded threads without blocking the threads
93    /// who process the requests themselves.
94    async fn request_body_filter(
95        &self,
96        _session: &mut Session,
97        _body: &mut Option<Bytes>,
98        _end_of_stream: bool,
99        _ctx: &mut Self::CTX,
100    ) -> Result<()>
101    where
102        Self::CTX: Send + Sync,
103    {
104        Ok(())
105    }
106
107    /// This filter decides if the request is cacheable and what cache backend to use
108    ///
109    /// The caller can interact with `Session.cache` to enable caching.
110    ///
111    /// By default this filter does nothing which effectively disables caching.
112    // Ideally only session.cache should be modified, TODO: reflect that in this interface
113    fn request_cache_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()> {
114        Ok(())
115    }
116
117    /// This callback generates the cache key
118    ///
119    /// This callback is called only when cache is enabled for this request
120    ///
121    /// By default this callback returns a default cache key generated from the request.
122    fn cache_key_callback(&self, session: &Session, _ctx: &mut Self::CTX) -> Result<CacheKey> {
123        let req_header = session.req_header();
124        Ok(CacheKey::default(req_header))
125    }
126
127    /// This callback is invoked when a cacheable response is ready to be admitted to cache
128    fn cache_miss(&self, session: &mut Session, _ctx: &mut Self::CTX) {
129        session.cache.cache_miss();
130    }
131
132    /// This filter is called after a successful cache lookup and before the cache asset is ready to
133    /// be used.
134    ///
135    /// This filter allow the user to log or force expire the asset.
136    // flex purge, other filtering, returns whether asset is should be force expired or not
137    async fn cache_hit_filter(
138        &self,
139        _session: &Session,
140        _meta: &CacheMeta,
141        _ctx: &mut Self::CTX,
142    ) -> Result<bool>
143    where
144        Self::CTX: Send + Sync,
145    {
146        Ok(false)
147    }
148
149    /// Decide if a request should continue to upstream after not being served from cache.
150    ///
151    /// returns: Ok(true) if the request should continue, Ok(false) if a response was written by the
152    /// callback and the session should be finished, or an error
153    ///
154    /// This filter can be used for deferring checks like rate limiting or access control to when they
155    /// actually needed after cache miss.
156    async fn proxy_upstream_filter(
157        &self,
158        _session: &mut Session,
159        _ctx: &mut Self::CTX,
160    ) -> Result<bool>
161    where
162        Self::CTX: Send + Sync,
163    {
164        Ok(true)
165    }
166
167    /// Decide if the response is cacheable
168    fn response_cache_filter(
169        &self,
170        _session: &Session,
171        _resp: &ResponseHeader,
172        _ctx: &mut Self::CTX,
173    ) -> Result<RespCacheable> {
174        Ok(Uncacheable(NoCacheReason::Custom("default")))
175    }
176
177    /// Decide how to generate cache vary key from both request and response
178    ///
179    /// None means no variance is needed.
180    fn cache_vary_filter(
181        &self,
182        _meta: &CacheMeta,
183        _ctx: &mut Self::CTX,
184        _req: &RequestHeader,
185    ) -> Option<HashBinary> {
186        // default to None for now to disable vary feature
187        None
188    }
189
190    /// Decide if the incoming request's condition _fails_ against the cached response.
191    ///
192    /// Returning `Ok(true)` means that the response does _not_ match against the condition, and
193    /// that the proxy can return `304 Not Modified` downstream.
194    ///
195    /// An example is a conditional GET request with `If-None-Match: "foobar"`. If the cached
196    /// response contains the `ETag: "foobar"`, then the condition fails, and `304 Not Modified`
197    /// should be returned. Else, the condition passes which means the full `200 OK` response must
198    /// be sent.
199    fn cache_not_modified_filter(
200        &self,
201        session: &Session,
202        resp: &ResponseHeader,
203        _ctx: &mut Self::CTX,
204    ) -> Result<bool> {
205        Ok(
206            pingora_core::protocols::http::conditional_filter::not_modified_filter(
207                session.req_header(),
208                resp,
209            ),
210        )
211    }
212
213    /// Modify the request before it is sent to the upstream
214    ///
215    /// Unlike [Self::request_filter()], this filter allows to change the request headers to send
216    /// to the upstream.
217    async fn upstream_request_filter(
218        &self,
219        _session: &mut Session,
220        _upstream_request: &mut RequestHeader,
221        _ctx: &mut Self::CTX,
222    ) -> Result<()>
223    where
224        Self::CTX: Send + Sync,
225    {
226        Ok(())
227    }
228
229    /// Modify the response header from the upstream
230    ///
231    /// The modification is before caching, so any change here will be stored in the cache if enabled.
232    ///
233    /// Responses served from cache won't trigger this filter. If the cache needed revalidation,
234    /// only the 304 from upstream will trigger the filter (though it will be merged into the
235    /// cached header, not served directly to downstream).
236    fn upstream_response_filter(
237        &self,
238        _session: &mut Session,
239        _upstream_response: &mut ResponseHeader,
240        _ctx: &mut Self::CTX,
241    ) {
242    }
243
244    /// Modify the response header before it is send to the downstream
245    ///
246    /// The modification is after caching. This filter is called for all responses including
247    /// responses served from cache.
248    async fn response_filter(
249        &self,
250        _session: &mut Session,
251        _upstream_response: &mut ResponseHeader,
252        _ctx: &mut Self::CTX,
253    ) -> Result<()>
254    where
255        Self::CTX: Send + Sync,
256    {
257        Ok(())
258    }
259
260    /// Similar to [Self::upstream_response_filter()] but for response body
261    ///
262    /// This function will be called every time a piece of response body is received. The `body` is
263    /// **not the entire response body**.
264    fn upstream_response_body_filter(
265        &self,
266        _session: &mut Session,
267        _body: &mut Option<Bytes>,
268        _end_of_stream: bool,
269        _ctx: &mut Self::CTX,
270    ) {
271    }
272
273    /// Similar to [Self::upstream_response_filter()] but for response trailers
274    fn upstream_response_trailer_filter(
275        &self,
276        _session: &mut Session,
277        _upstream_trailers: &mut header::HeaderMap,
278        _ctx: &mut Self::CTX,
279    ) -> Result<()> {
280        Ok(())
281    }
282
283    /// Similar to [Self::response_filter()] but for response body chunks
284    fn response_body_filter(
285        &self,
286        _session: &mut Session,
287        _body: &mut Option<Bytes>,
288        _end_of_stream: bool,
289        _ctx: &mut Self::CTX,
290    ) -> Result<Option<Duration>>
291    where
292        Self::CTX: Send + Sync,
293    {
294        Ok(None)
295    }
296
297    /// Similar to [Self::response_filter()] but for response trailers.
298    /// Note, returning an Ok(Some(Bytes)) will result in the downstream response
299    /// trailers being written to the response body.
300    ///
301    /// TODO: make this interface more intuitive
302    async fn response_trailer_filter(
303        &self,
304        _session: &mut Session,
305        _upstream_trailers: &mut header::HeaderMap,
306        _ctx: &mut Self::CTX,
307    ) -> Result<Option<Bytes>>
308    where
309        Self::CTX: Send + Sync,
310    {
311        Ok(None)
312    }
313
314    /// This filter is called when the entire response is sent to the downstream successfully or
315    /// there is a fatal error that terminate the request.
316    ///
317    /// An error log is already emitted if there is any error. This phase is used for collecting
318    /// metrics and sending access logs.
319    async fn logging(&self, _session: &mut Session, _e: Option<&Error>, _ctx: &mut Self::CTX)
320    where
321        Self::CTX: Send + Sync,
322    {
323    }
324
325    /// A value of true means that the log message will be suppressed. The default value is false.
326    fn suppress_error_log(&self, _session: &Session, _ctx: &Self::CTX, _error: &Error) -> bool {
327        false
328    }
329
330    /// This filter is called when there is an error **after** a connection is established (or reused)
331    /// to the upstream.
332    fn error_while_proxy(
333        &self,
334        peer: &HttpPeer,
335        session: &mut Session,
336        e: Box<Error>,
337        _ctx: &mut Self::CTX,
338        client_reused: bool,
339    ) -> Box<Error> {
340        let mut e = e.more_context(format!("Peer: {}", peer));
341        // only reused client connections where retry buffer is not truncated
342        e.retry
343            .decide_reuse(client_reused && !session.as_ref().retry_buffer_truncated());
344        e
345    }
346
347    /// This filter is called when there is an error in the process of establishing a connection
348    /// to the upstream.
349    ///
350    /// In this filter the user can decide whether the error is retry-able by marking the error `e`.
351    ///
352    /// If the error can be retried, [Self::upstream_peer()] will be called again so that the user
353    /// can decide whether to send the request to the same upstream or another upstream that is possibly
354    /// available.
355    fn fail_to_connect(
356        &self,
357        _session: &mut Session,
358        _peer: &HttpPeer,
359        _ctx: &mut Self::CTX,
360        e: Box<Error>,
361    ) -> Box<Error> {
362        e
363    }
364
365    /// This filter is called when the request encounters a fatal error.
366    ///
367    /// Users may write an error response to the downstream if the downstream is still writable.
368    ///
369    /// The response status code of the error response maybe returned for logging purpose.
370    async fn fail_to_proxy(&self, session: &mut Session, e: &Error, _ctx: &mut Self::CTX) -> u16
371    where
372        Self::CTX: Send + Sync,
373    {
374        let server_session = session.as_mut();
375        let code = match e.etype() {
376            HTTPStatus(code) => *code,
377            _ => {
378                match e.esource() {
379                    ErrorSource::Upstream => 502,
380                    ErrorSource::Downstream => {
381                        match e.etype() {
382                            WriteError | ReadError | ConnectionClosed => {
383                                /* conn already dead */
384                                0
385                            }
386                            _ => 400,
387                        }
388                    }
389                    ErrorSource::Internal | ErrorSource::Unset => 500,
390                }
391            }
392        };
393        if code > 0 {
394            server_session.respond_error(code).await
395        }
396        code
397    }
398
399    /// Decide whether should serve stale when encountering an error or during revalidation
400    ///
401    /// An implementation should follow
402    /// <https://datatracker.ietf.org/doc/html/rfc9111#section-4.2.4>
403    /// <https://www.rfc-editor.org/rfc/rfc5861#section-4>
404    ///
405    /// This filter is only called if cache is enabled.
406    // 5xx HTTP status will be encoded as ErrorType::HTTPStatus(code)
407    fn should_serve_stale(
408        &self,
409        _session: &mut Session,
410        _ctx: &mut Self::CTX,
411        error: Option<&Error>, // None when it is called during stale while revalidate
412    ) -> bool {
413        // A cache MUST NOT generate a stale response unless
414        // it is disconnected
415        // or doing so is explicitly permitted by the client or origin server
416        // (e.g. headers or an out-of-band contract)
417        error.map_or(false, |e| e.esource() == &ErrorSource::Upstream)
418    }
419
420    /// This filter is called when the request just established or reused a connection to the upstream
421    ///
422    /// This filter allows user to log timing and connection related info.
423    async fn connected_to_upstream(
424        &self,
425        _session: &mut Session,
426        _reused: bool,
427        _peer: &HttpPeer,
428        #[cfg(unix)] _fd: std::os::unix::io::RawFd,
429        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
430        _digest: Option<&Digest>,
431        _ctx: &mut Self::CTX,
432    ) -> Result<()>
433    where
434        Self::CTX: Send + Sync,
435    {
436        Ok(())
437    }
438
439    /// This callback is invoked every time request related error log needs to be generated
440    ///
441    /// Users can define what is important to be written about this request via the returned string.
442    fn request_summary(&self, session: &Session, _ctx: &Self::CTX) -> String {
443        session.as_ref().request_summary()
444    }
445
446    /// Whether the request should be used to invalidate(delete) the HTTP cache
447    ///
448    /// - `true`: this request will be used to invalidate the cache.
449    /// - `false`: this request is a treated as a normal request
450    fn is_purge(&self, _session: &Session, _ctx: &Self::CTX) -> bool {
451        false
452    }
453
454    /// This filter is called after the proxy cache generates the downstream response to the purge
455    /// request (to invalidate or delete from the HTTP cache), based on the purge status, which
456    /// indicates whether the request succeeded or failed.
457    ///
458    /// The filter allows the user to modify or replace the generated downstream response.
459    /// If the filter returns `Err`, the proxy will instead send a 500 response.
460    fn purge_response_filter(
461        &self,
462        _session: &Session,
463        _ctx: &mut Self::CTX,
464        _purge_status: PurgeStatus,
465        _purge_response: &mut std::borrow::Cow<'static, ResponseHeader>,
466    ) -> Result<()> {
467        Ok(())
468    }
469}