// pingora_proxy/proxy_trait.rs

// Copyright 2026 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use super::*;
16use pingora_cache::{
17    key::HashBinary,
18    CacheKey, CacheMeta, ForcedFreshness, HitHandler,
19    RespCacheable::{self, *},
20};
21use proxy_cache::range_filter::{self};
22use std::time::Duration;
23
24/// The interface to control the HTTP proxy
25///
26/// The methods in [ProxyHttp] are filters/callbacks which will be performed on all requests at their
27/// particular stage (if applicable).
28///
29/// If any of the filters returns [Result::Err], the request will fail, and the error will be logged.
30#[cfg_attr(not(doc_async_trait), async_trait)]
31pub trait ProxyHttp {
32    /// The per request object to share state across the different filters
33    type CTX;
34
35    /// Define how the `ctx` should be created.
36    fn new_ctx(&self) -> Self::CTX;
37
38    /// Define where the proxy should send the request to.
39    ///
40    /// The returned [HttpPeer] contains the information regarding where and how this request should
41    /// be forwarded to.
42    async fn upstream_peer(
43        &self,
44        session: &mut Session,
45        ctx: &mut Self::CTX,
46    ) -> Result<Box<HttpPeer>>;
47
48    /// Set up downstream modules.
49    ///
50    /// In this phase, users can add or configure [HttpModules] before the server starts up.
51    ///
52    /// In the default implementation of this method, [ResponseCompressionBuilder] is added
53    /// and disabled.
54    fn init_downstream_modules(&self, modules: &mut HttpModules) {
55        // Add disabled downstream compression module by default
56        modules.add_module(ResponseCompressionBuilder::enable(0));
57    }
58
59    /// Handle the incoming request.
60    ///
61    /// In this phase, users can parse, validate, rate limit, perform access control and/or
62    /// return a response for this request.
63    ///
64    /// If the user already sent a response to this request, an `Ok(true)` should be returned so that
65    /// the proxy would exit. The proxy continues to the next phases when `Ok(false)` is returned.
66    ///
67    /// By default this filter does nothing and returns `Ok(false)`.
68    async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
69    where
70        Self::CTX: Send + Sync,
71    {
72        Ok(false)
73    }
74
75    /// Handle the incoming request before any downstream module is executed.
76    ///
77    /// This function is similar to [Self::request_filter()] but executes before any other logic,
78    /// including downstream module logic. The main purpose of this function is to provide finer
79    /// grained control of the behavior of the modules.
80    ///
81    /// Note that because this function is executed before any module that might provide access
82    /// control or rate limiting, logic should stay in request_filter() if it can in order to be
83    /// protected by said modules.
84    async fn early_request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()>
85    where
86        Self::CTX: Send + Sync,
87    {
88        Ok(())
89    }
90
91    /// Returns whether this session is allowed to spawn subrequests.
92    ///
93    /// This function is checked after [Self::early_request_filter] to allow that filter to configure
94    /// this if required. This will also run for subrequests themselves, which may allowed to spawn
95    /// their own subrequests.
96    ///
97    /// Note that this doesn't prevent subrequests from being spawned based on the session by proxy
98    /// core functionality, e.g. background cache revalidation requires spawning subrequests.
99    fn allow_spawning_subrequest(&self, _session: &Session, _ctx: &Self::CTX) -> bool
100    where
101        Self::CTX: Send + Sync,
102    {
103        false
104    }
105
106    /// Handle the incoming request body.
107    ///
108    /// This function will be called every time a piece of request body is received. The `body` is
109    /// **not the entire request body**.
110    ///
111    /// The async nature of this function allows to throttle the upload speed and/or executing
112    /// heavy computation logic such as WAF rules on offloaded threads without blocking the threads
113    /// who process the requests themselves.
114    async fn request_body_filter(
115        &self,
116        _session: &mut Session,
117        _body: &mut Option<Bytes>,
118        _end_of_stream: bool,
119        _ctx: &mut Self::CTX,
120    ) -> Result<()>
121    where
122        Self::CTX: Send + Sync,
123    {
124        Ok(())
125    }
126
127    /// This filter decides if the request is cacheable and what cache backend to use
128    ///
129    /// The caller can interact with `Session.cache` to enable caching.
130    ///
131    /// By default this filter does nothing which effectively disables caching.
132    // Ideally only session.cache should be modified, TODO: reflect that in this interface
133    fn request_cache_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()> {
134        Ok(())
135    }
136
137    /// This callback generates the cache key
138    ///
139    /// This callback is called only when cache is enabled for this request
140    ///
141    /// By default this callback returns a default cache key generated from the request.
142    fn cache_key_callback(&self, session: &Session, _ctx: &mut Self::CTX) -> Result<CacheKey> {
143        let req_header = session.req_header();
144        Ok(CacheKey::default(req_header))
145    }
146
147    /// This callback is invoked when a cacheable response is ready to be admitted to cache.
148    fn cache_miss(&self, session: &mut Session, _ctx: &mut Self::CTX) {
149        session.cache.cache_miss();
150    }
151
152    /// This filter is called after a successful cache lookup and before the
153    /// cache asset is ready to be used.
154    ///
155    /// This filter allows the user to log or force invalidate the asset, or
156    /// to adjust the body reader associated with the cache hit.
157    /// This also runs on stale hit assets (for which `is_fresh` is false).
158    ///
159    /// The value returned indicates if the force invalidation should be used,
160    /// and which kind. Returning `None` indicates no forced invalidation
161    async fn cache_hit_filter(
162        &self,
163        _session: &mut Session,
164        _meta: &CacheMeta,
165        _hit_handler: &mut HitHandler,
166        _is_fresh: bool,
167        _ctx: &mut Self::CTX,
168    ) -> Result<Option<ForcedFreshness>>
169    where
170        Self::CTX: Send + Sync,
171    {
172        Ok(None)
173    }
174
175    /// Decide if a request should continue to upstream after not being served from cache.
176    ///
177    /// returns: Ok(true) if the request should continue, Ok(false) if a response was written by the
178    /// callback and the session should be finished, or an error
179    ///
180    /// This filter can be used for deferring checks like rate limiting or access control to when they
181    /// actually needed after cache miss.
182    ///
183    /// By default the session will attempt to be reused after returning Ok(false). It is the
184    /// caller's responsibility to disable keepalive or drain the request body if needed.
185    async fn proxy_upstream_filter(
186        &self,
187        _session: &mut Session,
188        _ctx: &mut Self::CTX,
189    ) -> Result<bool>
190    where
191        Self::CTX: Send + Sync,
192    {
193        Ok(true)
194    }
195
196    /// Decide if the response is cacheable
197    fn response_cache_filter(
198        &self,
199        _session: &Session,
200        _resp: &ResponseHeader,
201        _ctx: &mut Self::CTX,
202    ) -> Result<RespCacheable> {
203        Ok(Uncacheable(NoCacheReason::Custom("default")))
204    }
205
206    /// Decide how to generate cache vary key from both request and response
207    ///
208    /// None means no variance is needed.
209    fn cache_vary_filter(
210        &self,
211        _meta: &CacheMeta,
212        _ctx: &mut Self::CTX,
213        _req: &RequestHeader,
214    ) -> Option<HashBinary> {
215        // default to None for now to disable vary feature
216        None
217    }
218
219    /// Decide if the incoming request's condition _fails_ against the cached response.
220    ///
221    /// Returning `Ok(true)` means that the response does _not_ match against the condition, and
222    /// that the proxy can return `304 Not Modified` downstream.
223    ///
224    /// An example is a conditional GET request with `If-None-Match: "foobar"`. If the cached
225    /// response contains the `ETag: "foobar"`, then the condition fails, and `304 Not Modified`
226    /// should be returned. Else, the condition passes which means the full `200 OK` response must
227    /// be sent.
228    fn cache_not_modified_filter(
229        &self,
230        session: &Session,
231        resp: &ResponseHeader,
232        _ctx: &mut Self::CTX,
233    ) -> Result<bool> {
234        Ok(
235            pingora_core::protocols::http::conditional_filter::not_modified_filter(
236                session.req_header(),
237                resp,
238            ),
239        )
240    }
241
242    /// This filter is called when cache is enabled to determine what byte range to return (in both
243    /// cache hit and miss cases) from the response body. It is only used when caching is enabled,
244    /// otherwise the upstream is responsible for any filtering. It allows users to define the range
245    /// this request is for via its return type `range_filter::RangeType`.
246    ///
247    /// It also allow users to modify the response header accordingly.
248    ///
249    /// The default implementation can handle a single-range as per [RFC7232].
250    ///
251    /// [RFC7232]: https://www.rfc-editor.org/rfc/rfc7232
252    fn range_header_filter(
253        &self,
254        session: &mut Session,
255        resp: &mut ResponseHeader,
256        _ctx: &mut Self::CTX,
257    ) -> range_filter::RangeType {
258        const DEFAULT_MAX_RANGES: Option<usize> = Some(200);
259        proxy_cache::range_filter::range_header_filter(
260            session.req_header(),
261            resp,
262            DEFAULT_MAX_RANGES,
263        )
264    }
265
266    /// Modify the request before it is sent to the upstream
267    ///
268    /// Unlike [Self::request_filter()], this filter allows to change the request headers to send
269    /// to the upstream.
270    async fn upstream_request_filter(
271        &self,
272        _session: &mut Session,
273        _upstream_request: &mut RequestHeader,
274        _ctx: &mut Self::CTX,
275    ) -> Result<()>
276    where
277        Self::CTX: Send + Sync,
278    {
279        Ok(())
280    }
281
282    /// Modify the response header from the upstream
283    ///
284    /// The modification is before caching, so any change here will be stored in the cache if enabled.
285    ///
286    /// Responses served from cache won't trigger this filter. If the cache needed revalidation,
287    /// only the 304 from upstream will trigger the filter (though it will be merged into the
288    /// cached header, not served directly to downstream).
289    async fn upstream_response_filter(
290        &self,
291        _session: &mut Session,
292        _upstream_response: &mut ResponseHeader,
293        _ctx: &mut Self::CTX,
294    ) -> Result<()>
295    where
296        Self::CTX: Send + Sync,
297    {
298        Ok(())
299    }
300
301    /// Modify the response header before it is send to the downstream
302    ///
303    /// The modification is after caching. This filter is called for all responses including
304    /// responses served from cache.
305    async fn response_filter(
306        &self,
307        _session: &mut Session,
308        _upstream_response: &mut ResponseHeader,
309        _ctx: &mut Self::CTX,
310    ) -> Result<()>
311    where
312        Self::CTX: Send + Sync,
313    {
314        Ok(())
315    }
316
317    // custom_forwarding is called when downstream and upstream connections are successfully established.
318    #[doc(hidden)]
319    async fn custom_forwarding(
320        &self,
321        _session: &mut Session,
322        _ctx: &mut Self::CTX,
323        _custom_message_to_upstream: Option<mpsc::Sender<Bytes>>,
324        _custom_message_to_downstream: mpsc::Sender<Bytes>,
325    ) -> Result<()>
326    where
327        Self::CTX: Send + Sync,
328    {
329        Ok(())
330    }
331
332    // received a custom message from the downstream before sending it to the upstream.
333    #[doc(hidden)]
334    async fn downstream_custom_message_proxy_filter(
335        &self,
336        _session: &mut Session,
337        custom_message: Bytes,
338        _ctx: &mut Self::CTX,
339        _final_hop: bool,
340    ) -> Result<Option<Bytes>>
341    where
342        Self::CTX: Send + Sync,
343    {
344        Ok(Some(custom_message))
345    }
346
347    // received a custom message from the upstream before sending it to the downstream.
348    #[doc(hidden)]
349    async fn upstream_custom_message_proxy_filter(
350        &self,
351        _session: &mut Session,
352        custom_message: Bytes,
353        _ctx: &mut Self::CTX,
354        _final_hop: bool,
355    ) -> Result<Option<Bytes>>
356    where
357        Self::CTX: Send + Sync,
358    {
359        Ok(Some(custom_message))
360    }
361
362    /// Similar to [Self::upstream_response_filter()] but for response body
363    ///
364    /// This function will be called every time a piece of response body is received. The `body` is
365    /// **not the entire response body**.
366    fn upstream_response_body_filter(
367        &self,
368        _session: &mut Session,
369        _body: &mut Option<Bytes>,
370        _end_of_stream: bool,
371        _ctx: &mut Self::CTX,
372    ) -> Result<Option<Duration>> {
373        Ok(None)
374    }
375
376    /// Similar to [Self::upstream_response_filter()] but for response trailers
377    fn upstream_response_trailer_filter(
378        &self,
379        _session: &mut Session,
380        _upstream_trailers: &mut header::HeaderMap,
381        _ctx: &mut Self::CTX,
382    ) -> Result<()> {
383        Ok(())
384    }
385
386    /// Similar to [Self::response_filter()] but for response body chunks
387    fn response_body_filter(
388        &self,
389        _session: &mut Session,
390        _body: &mut Option<Bytes>,
391        _end_of_stream: bool,
392        _ctx: &mut Self::CTX,
393    ) -> Result<Option<Duration>>
394    where
395        Self::CTX: Send + Sync,
396    {
397        Ok(None)
398    }
399
400    /// Similar to [Self::response_filter()] but for response trailers.
401    /// Note, returning an Ok(Some(Bytes)) will result in the downstream response
402    /// trailers being written to the response body.
403    ///
404    /// TODO: make this interface more intuitive
405    async fn response_trailer_filter(
406        &self,
407        _session: &mut Session,
408        _upstream_trailers: &mut header::HeaderMap,
409        _ctx: &mut Self::CTX,
410    ) -> Result<Option<Bytes>>
411    where
412        Self::CTX: Send + Sync,
413    {
414        Ok(None)
415    }
416
417    /// This filter is called when the entire response is sent to the downstream successfully or
418    /// there is a fatal error that terminate the request.
419    ///
420    /// An error log is already emitted if there is any error. This phase is used for collecting
421    /// metrics and sending access logs.
422    async fn logging(&self, _session: &mut Session, _e: Option<&Error>, _ctx: &mut Self::CTX)
423    where
424        Self::CTX: Send + Sync,
425    {
426    }
427
428    /// A value of true means that the log message will be suppressed. The default value is false.
429    fn suppress_error_log(&self, _session: &Session, _ctx: &Self::CTX, _error: &Error) -> bool {
430        false
431    }
432
433    /// This filter is called when there is an error **after** a connection is established (or reused)
434    /// to the upstream.
435    fn error_while_proxy(
436        &self,
437        peer: &HttpPeer,
438        session: &mut Session,
439        e: Box<Error>,
440        _ctx: &mut Self::CTX,
441        client_reused: bool,
442    ) -> Box<Error> {
443        let mut e = e.more_context(format!("Peer: {}", peer));
444        // only reused client connections where retry buffer is not truncated
445        e.retry
446            .decide_reuse(client_reused && !session.as_ref().retry_buffer_truncated());
447        e
448    }
449
450    /// This filter is called when there is an error in the process of establishing a connection
451    /// to the upstream.
452    ///
453    /// In this filter the user can decide whether the error is retry-able by marking the error `e`.
454    ///
455    /// If the error can be retried, [Self::upstream_peer()] will be called again so that the user
456    /// can decide whether to send the request to the same upstream or another upstream that is possibly
457    /// available.
458    fn fail_to_connect(
459        &self,
460        _session: &mut Session,
461        _peer: &HttpPeer,
462        _ctx: &mut Self::CTX,
463        e: Box<Error>,
464    ) -> Box<Error> {
465        e
466    }
467
468    /// This filter is called when the request encounters a fatal error.
469    ///
470    /// Users may write an error response to the downstream if the downstream is still writable.
471    ///
472    /// The response status code of the error response may be returned for logging purposes.
473    /// Additionally, the user can return whether this session may be reused in spite of the error.
474    /// Today this reuse status is only respected for errors that occur prior to upstream peer
475    /// selection, and the keepalive configured on the `Session` itself still takes precedent.
476    async fn fail_to_proxy(
477        &self,
478        session: &mut Session,
479        e: &Error,
480        _ctx: &mut Self::CTX,
481    ) -> FailToProxy
482    where
483        Self::CTX: Send + Sync,
484    {
485        let code = match e.etype() {
486            HTTPStatus(code) => *code,
487            _ => {
488                match e.esource() {
489                    ErrorSource::Upstream => 502,
490                    ErrorSource::Downstream => {
491                        match e.etype() {
492                            WriteError | ReadError | ConnectionClosed => {
493                                /* conn already dead */
494                                0
495                            }
496                            _ => 400,
497                        }
498                    }
499                    ErrorSource::Internal | ErrorSource::Unset => 500,
500                }
501            }
502        };
503        if code > 0 {
504            session.respond_error(code).await.unwrap_or_else(|e| {
505                error!("failed to send error response to downstream: {e}");
506            });
507        }
508
509        FailToProxy {
510            error_code: code,
511            // default to no reuse, which is safest
512            can_reuse_downstream: false,
513        }
514    }
515
516    /// Decide whether should serve stale when encountering an error or during revalidation
517    ///
518    /// An implementation should follow
519    /// <https://datatracker.ietf.org/doc/html/rfc9111#section-4.2.4>
520    /// <https://www.rfc-editor.org/rfc/rfc5861#section-4>
521    ///
522    /// This filter is only called if cache is enabled.
523    // 5xx HTTP status will be encoded as ErrorType::HTTPStatus(code)
524    fn should_serve_stale(
525        &self,
526        _session: &mut Session,
527        _ctx: &mut Self::CTX,
528        error: Option<&Error>, // None when it is called during stale while revalidate
529    ) -> bool {
530        // A cache MUST NOT generate a stale response unless
531        // it is disconnected
532        // or doing so is explicitly permitted by the client or origin server
533        // (e.g. headers or an out-of-band contract)
534        error.is_some_and(|e| e.esource() == &ErrorSource::Upstream)
535    }
536
537    /// This filter is called when the request just established or reused a connection to the upstream
538    ///
539    /// This filter allows user to log timing and connection related info.
540    async fn connected_to_upstream(
541        &self,
542        _session: &mut Session,
543        _reused: bool,
544        _peer: &HttpPeer,
545        #[cfg(unix)] _fd: std::os::unix::io::RawFd,
546        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
547        _digest: Option<&Digest>,
548        _ctx: &mut Self::CTX,
549    ) -> Result<()>
550    where
551        Self::CTX: Send + Sync,
552    {
553        Ok(())
554    }
555
556    /// This callback is invoked every time request related error log needs to be generated
557    ///
558    /// Users can define what is important to be written about this request via the returned string.
559    fn request_summary(&self, session: &Session, _ctx: &Self::CTX) -> String {
560        session.as_ref().request_summary()
561    }
562
563    /// Whether the request should be used to invalidate(delete) the HTTP cache
564    ///
565    /// - `true`: this request will be used to invalidate the cache.
566    /// - `false`: this request is a treated as a normal request
567    fn is_purge(&self, _session: &Session, _ctx: &Self::CTX) -> bool {
568        false
569    }
570
571    /// This filter is called after the proxy cache generates the downstream response to the purge
572    /// request (to invalidate or delete from the HTTP cache), based on the purge status, which
573    /// indicates whether the request succeeded or failed.
574    ///
575    /// The filter allows the user to modify or replace the generated downstream response.
576    /// If the filter returns `Err`, the proxy will instead send a 500 response.
577    fn purge_response_filter(
578        &self,
579        _session: &Session,
580        _ctx: &mut Self::CTX,
581        _purge_status: PurgeStatus,
582        _purge_response: &mut std::borrow::Cow<'static, ResponseHeader>,
583    ) -> Result<()> {
584        Ok(())
585    }
586}
587
/// Context struct returned by `fail_to_proxy`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FailToProxy {
    /// Status code of the error response, returned for logging purposes.
    ///
    /// The default `ProxyHttp::fail_to_proxy` sets this to `0` when it sent no response.
    pub error_code: u16,
    /// Whether the downstream session may be reused in spite of the error.
    ///
    /// The default `ProxyHttp::fail_to_proxy` always sets this to `false`.
    pub can_reuse_downstream: bool,
}