pingora_proxy/
proxy_trait.rs

// Copyright 2025 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::*;
use pingora_cache::{
    key::HashBinary,
    CacheKey, CacheMeta, ForcedInvalidationKind, HitHandler,
    RespCacheable::{self, *},
};
use proxy_cache::range_filter::{self};
use std::time::Duration;

/// The interface to control the HTTP proxy
///
/// The methods in [ProxyHttp] are filters/callbacks which will be performed on all requests at their
/// particular stage (if applicable).
///
/// If any of the filters returns [Result::Err], the request will fail, and the error will be logged.
#[cfg_attr(not(doc_async_trait), async_trait)]
pub trait ProxyHttp {
    /// The per-request object to share state across the different filters
    type CTX;

    /// Define how the `ctx` should be created.
    fn new_ctx(&self) -> Self::CTX;

    /// Define where the proxy should send the request to.
    ///
    /// The returned [HttpPeer] contains the information regarding where and how this request should
    /// be forwarded.
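    ///
    /// # Example
    ///
    /// A minimal sketch of a static peer selection; the address, TLS flag, and SNI below are
    /// illustrative assumptions, and the surrounding `impl ProxyHttp for MyProxy` block is elided:
    ///
    /// ```ignore
    /// async fn upstream_peer(
    ///     &self,
    ///     _session: &mut Session,
    ///     _ctx: &mut Self::CTX,
    /// ) -> Result<Box<HttpPeer>> {
    ///     // always proxy to 1.1.1.1:443 over TLS, using "one.one.one.one" for SNI
    ///     let peer = Box::new(HttpPeer::new(("1.1.1.1", 443), true, "one.one.one.one".to_string()));
    ///     Ok(peer)
    /// }
    /// ```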
    async fn upstream_peer(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>>;

    /// Set up downstream modules.
    ///
    /// In this phase, users can add or configure [HttpModules] before the server starts up.
    ///
    /// In the default implementation of this method, [ResponseCompressionBuilder] is added
    /// and disabled.
    fn init_downstream_modules(&self, modules: &mut HttpModules) {
        // Add disabled downstream compression module by default
        modules.add_module(ResponseCompressionBuilder::enable(0));
    }

    /// Handle the incoming request.
    ///
    /// In this phase, users can parse, validate, rate limit, perform access control and/or
    /// return a response for this request.
    ///
    /// If the user has already sent a response to this request, `Ok(true)` should be returned so
    /// that the proxy exits early. The proxy continues to the next phases when `Ok(false)` is
    /// returned.
    ///
    /// By default this filter does nothing and returns `Ok(false)`.
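    ///
    /// # Example
    ///
    /// A minimal sketch that rejects requests early; the `/admin` prefix and the header check
    /// are illustrative assumptions, and the surrounding impl block is elided:
    ///
    /// ```ignore
    /// async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
    ///     if session.req_header().uri.path().starts_with("/admin")
    ///         && session.req_header().headers.get("Authorization").is_none()
    ///     {
    ///         session.respond_error(403).await?;
    ///         // a response was already sent, so tell the proxy to stop here
    ///         return Ok(true);
    ///     }
    ///     Ok(false)
    /// }
    /// ```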
    async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
    where
        Self::CTX: Send + Sync,
    {
        Ok(false)
    }

    /// Handle the incoming request before any downstream module is executed.
    ///
    /// This function is similar to [Self::request_filter()] but executes before any other logic,
    /// including downstream module logic. The main purpose of this function is to provide
    /// finer-grained control over the behavior of the modules.
    ///
    /// Note that because this function is executed before any module that might provide access
    /// control or rate limiting, logic should stay in [Self::request_filter()] when possible so
    /// that it remains protected by those modules.
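    ///
    /// # Example
    ///
    /// A sketch that tunes the downstream compression module per request. The
    /// `downstream_modules_ctx` accessor, the `ResponseCompression` context type, and
    /// `adjust_level` are assumptions about the compression module API; verify against the
    /// pingora version in use:
    ///
    /// ```ignore
    /// async fn early_request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<()> {
    ///     // turn on downstream response compression (level 6) for this request
    ///     if let Some(c) = session.downstream_modules_ctx.get_mut::<ResponseCompression>() {
    ///         c.adjust_level(6);
    ///     }
    ///     Ok(())
    /// }
    /// ```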
    async fn early_request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        Ok(())
    }

    /// Handle the incoming request body.
    ///
    /// This function will be called every time a piece of request body is received. The `body` is
    /// **not the entire request body**.
    ///
    /// The async nature of this function allows throttling the upload speed and/or executing
    /// heavy computation logic such as WAF rules on offloaded threads without blocking the threads
    /// that process the requests themselves.
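    ///
    /// # Example
    ///
    /// A sketch that enforces a request body size limit. The `body_bytes` counter on `Self::CTX`
    /// and the 10 MiB limit are illustrative assumptions; `Error::e_explain` is assumed to be the
    /// error constructor in scope via `pingora_error`:
    ///
    /// ```ignore
    /// async fn request_body_filter(
    ///     &self,
    ///     _session: &mut Session,
    ///     body: &mut Option<Bytes>,
    ///     _end_of_stream: bool,
    ///     ctx: &mut Self::CTX,
    /// ) -> Result<()> {
    ///     if let Some(b) = body {
    ///         // accumulate the size of every chunk seen so far (hypothetical CTX field)
    ///         ctx.body_bytes += b.len();
    ///         if ctx.body_bytes > 10 * 1024 * 1024 {
    ///             // fail the request; the HTTPStatus error type carries the 413 for error handling
    ///             return Error::e_explain(HTTPStatus(413), "request body larger than 10 MiB");
    ///         }
    ///     }
    ///     Ok(())
    /// }
    /// ```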
    async fn request_body_filter(
        &self,
        _session: &mut Session,
        _body: &mut Option<Bytes>,
        _end_of_stream: bool,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        Ok(())
    }

    /// This filter decides if the request is cacheable and what cache backend to use
    ///
    /// The caller can interact with `Session.cache` to enable caching.
    ///
    /// By default this filter does nothing, which effectively disables caching.
    // Ideally only session.cache should be modified, TODO: reflect that in this interface
    fn request_cache_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()> {
        Ok(())
    }

    /// This callback generates the cache key
    ///
    /// This callback is called only when cache is enabled for this request
    ///
    /// By default this callback returns a default cache key generated from the request.
    fn cache_key_callback(&self, session: &Session, _ctx: &mut Self::CTX) -> Result<CacheKey> {
        let req_header = session.req_header();
        Ok(CacheKey::default(req_header))
    }

    /// This callback is invoked when a cacheable response is ready to be admitted to cache
    fn cache_miss(&self, session: &mut Session, _ctx: &mut Self::CTX) {
        session.cache.cache_miss();
    }

    /// This filter is called after a successful cache lookup and before the
    /// cache asset is ready to be used.
    ///
    /// This filter allows the user to log or force invalidate the asset, or
    /// to adjust the body reader associated with the cache hit.
    /// This also runs on stale hit assets (for which `is_fresh` is false).
    ///
    /// The value returned indicates whether forced invalidation should be used,
    /// and of which kind. Returning `None` indicates no forced invalidation.
    async fn cache_hit_filter(
        &self,
        _session: &mut Session,
        _meta: &CacheMeta,
        _hit_handler: &mut HitHandler,
        _is_fresh: bool,
        _ctx: &mut Self::CTX,
    ) -> Result<Option<ForcedInvalidationKind>>
    where
        Self::CTX: Send + Sync,
    {
        Ok(None)
    }

    /// Decide if a request should continue to upstream after not being served from cache.
    ///
    /// Returns `Ok(true)` if the request should continue, `Ok(false)` if a response was written by
    /// the callback and the session should be finished, or an error.
    ///
    /// This filter can be used to defer checks like rate limiting or access control until they are
    /// actually needed, i.e. after a cache miss.
    ///
    /// By default the session will attempt to be reused after returning `Ok(false)`. It is the
    /// caller's responsibility to disable keepalive or drain the request body if needed.
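    ///
    /// # Example
    ///
    /// A sketch that defers a quota check until after a cache miss; `quota_exceeded` is a
    /// hypothetical helper on the proxy, and the surrounding impl block is elided:
    ///
    /// ```ignore
    /// async fn proxy_upstream_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
    ///     // only requests that were not served from cache reach this point
    ///     if self.quota_exceeded(session) {
    ///         session.respond_error(429).await?;
    ///         // a response was written, so do not continue to the upstream
    ///         return Ok(false);
    ///     }
    ///     Ok(true)
    /// }
    /// ```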
    async fn proxy_upstream_filter(
        &self,
        _session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<bool>
    where
        Self::CTX: Send + Sync,
    {
        Ok(true)
    }

    /// Decide if the response is cacheable
    fn response_cache_filter(
        &self,
        _session: &Session,
        _resp: &ResponseHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<RespCacheable> {
        Ok(Uncacheable(NoCacheReason::Custom("default")))
    }

    /// Decide how to generate the cache vary key from both the request and the response
    ///
    /// `None` means no variance is needed.
    fn cache_vary_filter(
        &self,
        _meta: &CacheMeta,
        _ctx: &mut Self::CTX,
        _req: &RequestHeader,
    ) -> Option<HashBinary> {
        // default to None for now to disable vary feature
        None
    }

    /// Decide if the incoming request's condition _fails_ against the cached response.
    ///
    /// Returning `Ok(true)` means that the response does _not_ match against the condition, and
    /// that the proxy can return `304 Not Modified` downstream.
    ///
    /// An example is a conditional GET request with `If-None-Match: "foobar"`. If the cached
    /// response contains the `ETag: "foobar"`, then the condition fails, and `304 Not Modified`
    /// should be returned. Otherwise, the condition passes, which means the full `200 OK` response
    /// must be sent.
    fn cache_not_modified_filter(
        &self,
        session: &Session,
        resp: &ResponseHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<bool> {
        Ok(
            pingora_core::protocols::http::conditional_filter::not_modified_filter(
                session.req_header(),
                resp,
            ),
        )
    }

    /// This filter is called when cache is enabled to determine what byte range to return (in both
    /// cache hit and miss cases) from the response body. It is only used when caching is enabled;
    /// otherwise, the upstream is responsible for any filtering. It allows users to define the range
    /// this request is for via its return type `range_filter::RangeType`.
    ///
    /// It also allows users to modify the response header accordingly.
    ///
    /// The default implementation can handle a single range as per [RFC7233].
    ///
    /// [RFC7233]: https://www.rfc-editor.org/rfc/rfc7233
    fn range_header_filter(
        &self,
        session: &mut Session,
        resp: &mut ResponseHeader,
        _ctx: &mut Self::CTX,
    ) -> range_filter::RangeType {
        proxy_cache::range_filter::range_header_filter(session.req_header(), resp)
    }

    /// Modify the request before it is sent to the upstream
    ///
    /// Unlike [Self::request_filter()], this filter allows changing the request headers that are
    /// sent to the upstream.
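    ///
    /// # Example
    ///
    /// A minimal sketch that rewrites the `Host` header before the request is proxied; the host
    /// value is an illustrative assumption, and the surrounding impl block is elided:
    ///
    /// ```ignore
    /// async fn upstream_request_filter(
    ///     &self,
    ///     _session: &mut Session,
    ///     upstream_request: &mut RequestHeader,
    ///     _ctx: &mut Self::CTX,
    /// ) -> Result<()> {
    ///     // present a fixed Host header to the upstream
    ///     upstream_request.insert_header("Host", "example.org")?;
    ///     Ok(())
    /// }
    /// ```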
    async fn upstream_request_filter(
        &self,
        _session: &mut Session,
        _upstream_request: &mut RequestHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        Ok(())
    }

    /// Modify the response header from the upstream
    ///
    /// The modification is before caching, so any change here will be stored in the cache if enabled.
    ///
    /// Responses served from cache won't trigger this filter. If the cache needed revalidation,
    /// only the 304 from upstream will trigger the filter (though it will be merged into the
    /// cached header, not served directly to downstream).
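    ///
    /// # Example
    ///
    /// A minimal sketch that strips an internal header before the response is cached and
    /// forwarded; the header name is an illustrative assumption:
    ///
    /// ```ignore
    /// fn upstream_response_filter(
    ///     &self,
    ///     _session: &mut Session,
    ///     upstream_response: &mut ResponseHeader,
    ///     _ctx: &mut Self::CTX,
    /// ) -> Result<()> {
    ///     // remove the header so it is never stored in cache or sent downstream
    ///     upstream_response.remove_header("x-internal-debug");
    ///     Ok(())
    /// }
    /// ```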
    fn upstream_response_filter(
        &self,
        _session: &mut Session,
        _upstream_response: &mut ResponseHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()> {
        Ok(())
    }

    /// Modify the response header before it is sent to the downstream
    ///
    /// The modification is after caching. This filter is called for all responses, including
    /// responses served from cache.
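    ///
    /// # Example
    ///
    /// A minimal sketch that adds a `Server` header to every downstream response, including cache
    /// hits; the header value is an illustrative assumption:
    ///
    /// ```ignore
    /// async fn response_filter(
    ///     &self,
    ///     _session: &mut Session,
    ///     upstream_response: &mut ResponseHeader,
    ///     _ctx: &mut Self::CTX,
    /// ) -> Result<()> {
    ///     // replaces an existing Server header if there is one
    ///     upstream_response.insert_header("Server", "MyGateway")?;
    ///     Ok(())
    /// }
    /// ```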
    async fn response_filter(
        &self,
        _session: &mut Session,
        _upstream_response: &mut ResponseHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        Ok(())
    }

    /// Similar to [Self::upstream_response_filter()] but for response body
    ///
    /// This function will be called every time a piece of response body is received. The `body` is
    /// **not the entire response body**.
    fn upstream_response_body_filter(
        &self,
        _session: &mut Session,
        _body: &mut Option<Bytes>,
        _end_of_stream: bool,
        _ctx: &mut Self::CTX,
    ) -> Result<()> {
        Ok(())
    }

    /// Similar to [Self::upstream_response_filter()] but for response trailers
    fn upstream_response_trailer_filter(
        &self,
        _session: &mut Session,
        _upstream_trailers: &mut header::HeaderMap,
        _ctx: &mut Self::CTX,
    ) -> Result<()> {
        Ok(())
    }

    /// Similar to [Self::response_filter()] but for response body chunks
    fn response_body_filter(
        &self,
        _session: &mut Session,
        _body: &mut Option<Bytes>,
        _end_of_stream: bool,
        _ctx: &mut Self::CTX,
    ) -> Result<Option<Duration>>
    where
        Self::CTX: Send + Sync,
    {
        Ok(None)
    }

    /// Similar to [Self::response_filter()] but for response trailers.
    /// Note that returning `Ok(Some(Bytes))` will result in the downstream response
    /// trailers being written to the response body.
    ///
    /// TODO: make this interface more intuitive
    async fn response_trailer_filter(
        &self,
        _session: &mut Session,
        _upstream_trailers: &mut header::HeaderMap,
        _ctx: &mut Self::CTX,
    ) -> Result<Option<Bytes>>
    where
        Self::CTX: Send + Sync,
    {
        Ok(None)
    }

    /// This filter is called when the entire response is sent to the downstream successfully or
    /// there is a fatal error that terminates the request.
    ///
    /// An error log is already emitted if there is any error. This phase is used for collecting
    /// metrics and sending access logs.
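    ///
    /// # Example
    ///
    /// A minimal sketch of an access log, assuming the `log` crate's `info!` macro is in scope;
    /// the log format is an illustrative assumption:
    ///
    /// ```ignore
    /// async fn logging(&self, session: &mut Session, _e: Option<&Error>, ctx: &mut Self::CTX) {
    ///     // 0 means no response header was written to the downstream
    ///     let response_code = session
    ///         .response_written()
    ///         .map_or(0, |resp| resp.status.as_u16());
    ///     info!("{} response code: {response_code}", self.request_summary(session, ctx));
    /// }
    /// ```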
    async fn logging(&self, _session: &mut Session, _e: Option<&Error>, _ctx: &mut Self::CTX)
    where
        Self::CTX: Send + Sync,
    {
    }

    /// Returning `true` means that the error log message for this request will be suppressed.
    /// The default value is `false`.
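    ///
    /// # Example
    ///
    /// A minimal sketch that silences errors caused by clients disconnecting early; treating
    /// `ConnectionClosed` as the only noisy error type is an illustrative assumption:
    ///
    /// ```ignore
    /// fn suppress_error_log(&self, _session: &Session, _ctx: &Self::CTX, error: &Error) -> bool {
    ///     // don't log errors from downstream connections that were simply closed
    ///     matches!(error.etype(), ConnectionClosed)
    /// }
    /// ```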
    fn suppress_error_log(&self, _session: &Session, _ctx: &Self::CTX, _error: &Error) -> bool {
        false
    }

    /// This filter is called when there is an error **after** a connection is established (or reused)
    /// to the upstream.
    fn error_while_proxy(
        &self,
        peer: &HttpPeer,
        session: &mut Session,
        e: Box<Error>,
        _ctx: &mut Self::CTX,
        client_reused: bool,
    ) -> Box<Error> {
        let mut e = e.more_context(format!("Peer: {}", peer));
        // only reused client connections where retry buffer is not truncated
        e.retry
            .decide_reuse(client_reused && !session.as_ref().retry_buffer_truncated());
        e
    }

    /// This filter is called when there is an error in the process of establishing a connection
    /// to the upstream.
    ///
    /// In this filter the user can decide whether the error is retry-able by marking the error `e`.
    ///
    /// If the error can be retried, [Self::upstream_peer()] will be called again so that the user
    /// can decide whether to send the request to the same upstream or another upstream that is possibly
    /// available.
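    ///
    /// # Example
    ///
    /// A sketch that marks connect failures as retryable so that [Self::upstream_peer()] can pick
    /// a failover peer on the next attempt; `set_retry` is assumed to be available on
    /// `pingora_error::Error`:
    ///
    /// ```ignore
    /// fn fail_to_connect(
    ///     &self,
    ///     _session: &mut Session,
    ///     _peer: &HttpPeer,
    ///     _ctx: &mut Self::CTX,
    ///     mut e: Box<Error>,
    /// ) -> Box<Error> {
    ///     // allow another attempt; upstream_peer() will be consulted again
    ///     e.set_retry(true);
    ///     e
    /// }
    /// ```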
    fn fail_to_connect(
        &self,
        _session: &mut Session,
        _peer: &HttpPeer,
        _ctx: &mut Self::CTX,
        e: Box<Error>,
    ) -> Box<Error> {
        e
    }

    /// This filter is called when the request encounters a fatal error.
    ///
    /// Users may write an error response to the downstream if the downstream is still writable.
    ///
    /// The response status code of the error response may be returned for logging purposes.
    /// Additionally, the user can return whether this session may be reused in spite of the error.
    /// Today this reuse status is only respected for errors that occur prior to upstream peer
    /// selection, and the keepalive configured on the `Session` itself still takes precedence.
    async fn fail_to_proxy(
        &self,
        session: &mut Session,
        e: &Error,
        _ctx: &mut Self::CTX,
    ) -> FailToProxy
    where
        Self::CTX: Send + Sync,
    {
        let code = match e.etype() {
            HTTPStatus(code) => *code,
            _ => {
                match e.esource() {
                    ErrorSource::Upstream => 502,
                    ErrorSource::Downstream => {
                        match e.etype() {
                            WriteError | ReadError | ConnectionClosed => {
                                /* conn already dead */
                                0
                            }
                            _ => 400,
                        }
                    }
                    ErrorSource::Internal | ErrorSource::Unset => 500,
                }
            }
        };
        if code > 0 {
            session.respond_error(code).await.unwrap_or_else(|e| {
                error!("failed to send error response to downstream: {e}");
            });
        }

        FailToProxy {
            error_code: code,
            // default to no reuse, which is safest
            can_reuse_downstream: false,
        }
    }

    /// Decide whether to serve a stale response when encountering an error or during revalidation.
    ///
    /// An implementation should follow
    /// <https://datatracker.ietf.org/doc/html/rfc9111#section-4.2.4>
    /// <https://www.rfc-editor.org/rfc/rfc5861#section-4>
    ///
    /// This filter is only called if cache is enabled.
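    ///
    /// # Example
    ///
    /// A sketch that also serves stale while revalidating and on upstream 5xx responses; whether
    /// that is acceptable depends on an out-of-band agreement with the origin, so treat this as
    /// an assumption rather than a recommendation:
    ///
    /// ```ignore
    /// fn should_serve_stale(
    ///     &self,
    ///     _session: &mut Session,
    ///     _ctx: &mut Self::CTX,
    ///     error: Option<&Error>,
    /// ) -> bool {
    ///     match error {
    ///         // stale-while-revalidate: no error, just refreshing in the background
    ///         None => true,
    ///         // stale-if-error: upstream disconnects and 5xx responses
    ///         Some(e) => {
    ///             e.esource() == &ErrorSource::Upstream
    ///                 || matches!(e.etype(), HTTPStatus(code) if *code >= 500)
    ///         }
    ///     }
    /// }
    /// ```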
    // 5xx HTTP status will be encoded as ErrorType::HTTPStatus(code)
    fn should_serve_stale(
        &self,
        _session: &mut Session,
        _ctx: &mut Self::CTX,
        error: Option<&Error>, // None when it is called during stale while revalidate
    ) -> bool {
        // A cache MUST NOT generate a stale response unless
        // it is disconnected
        // or doing so is explicitly permitted by the client or origin server
        // (e.g. headers or an out-of-band contract)
        error.is_some_and(|e| e.esource() == &ErrorSource::Upstream)
    }

    /// This filter is called when the request has just established or reused a connection to the upstream
    ///
    /// This filter allows the user to log timing and connection-related info.
    async fn connected_to_upstream(
        &self,
        _session: &mut Session,
        _reused: bool,
        _peer: &HttpPeer,
        #[cfg(unix)] _fd: std::os::unix::io::RawFd,
        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
        _digest: Option<&Digest>,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        Ok(())
    }

    /// This callback is invoked every time a request-related error log needs to be generated
    ///
    /// Users can define what is important to be written about this request via the returned string.
    fn request_summary(&self, session: &Session, _ctx: &Self::CTX) -> String {
        session.as_ref().request_summary()
    }

    /// Whether the request should be used to invalidate (delete) the HTTP cache
    ///
    /// - `true`: this request will be used to invalidate the cache.
    /// - `false`: this request is treated as a normal request.
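    ///
    /// # Example
    ///
    /// A minimal sketch that treats the non-standard `PURGE` method as a cache purge; the choice
    /// of method (and any required authorization) is an illustrative assumption:
    ///
    /// ```ignore
    /// fn is_purge(&self, session: &Session, _ctx: &Self::CTX) -> bool {
    ///     // only PURGE requests invalidate the cache; add access control as needed
    ///     session.req_header().method == "PURGE"
    /// }
    /// ```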
    fn is_purge(&self, _session: &Session, _ctx: &Self::CTX) -> bool {
        false
    }

    /// This filter is called after the proxy cache generates the downstream response to the purge
    /// request (to invalidate or delete from the HTTP cache), based on the purge status, which
    /// indicates whether the request succeeded or failed.
    ///
    /// The filter allows the user to modify or replace the generated downstream response.
    /// If the filter returns `Err`, the proxy will instead send a 500 response.
    fn purge_response_filter(
        &self,
        _session: &Session,
        _ctx: &mut Self::CTX,
        _purge_status: PurgeStatus,
        _purge_response: &mut std::borrow::Cow<'static, ResponseHeader>,
    ) -> Result<()> {
        Ok(())
    }
}

/// Context struct returned by `fail_to_proxy`.
pub struct FailToProxy {
    /// The error response code, used for logging; 0 when no error response was sent.
    pub error_code: u16,
    /// Whether the downstream session may still be reused despite the error.
    pub can_reuse_downstream: bool,
}