Skip to main content

pingora_proxy/
proxy_trait.rs

1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::*;
16use pingora_cache::{
17    key::HashBinary,
18    CacheKey, CacheMeta, ForcedFreshness, HitHandler,
19    RespCacheable::{self, *},
20};
21use proxy_cache::range_filter::{self};
22use std::time::Duration;
23
24/// The interface to control the HTTP proxy
25///
26/// The methods in [ProxyHttp] are filters/callbacks which will be performed on all requests at their
27/// particular stage (if applicable).
28///
29/// If any of the filters returns [Result::Err], the request will fail, and the error will be logged.
30#[cfg_attr(not(doc_async_trait), async_trait)]
31pub trait ProxyHttp {
32    /// The per request object to share state across the different filters
33    type CTX;
34
35    /// Define how the `ctx` should be created.
36    fn new_ctx(&self) -> Self::CTX;
37
38    /// Define where the proxy should send the request to.
39    ///
40    /// The returned [HttpPeer] contains the information regarding where and how this request should
41    /// be forwarded to.
42    async fn upstream_peer(
43        &self,
44        session: &mut Session,
45        ctx: &mut Self::CTX,
46    ) -> Result<Box<HttpPeer>>;
47
48    /// Set up downstream modules.
49    ///
50    /// In this phase, users can add or configure [HttpModules] before the server starts up.
51    ///
52    /// In the default implementation of this method, [ResponseCompressionBuilder] is added
53    /// and disabled.
54    fn init_downstream_modules(&self, modules: &mut HttpModules) {
55        // Add disabled downstream compression module by default
56        modules.add_module(ResponseCompressionBuilder::enable(0));
57    }
58
59    /// Handle the incoming request.
60    ///
61    /// In this phase, users can parse, validate, rate limit, perform access control and/or
62    /// return a response for this request.
63    ///
64    /// If the user already sent a response to this request, an `Ok(true)` should be returned so that
65    /// the proxy would exit. The proxy continues to the next phases when `Ok(false)` is returned.
66    ///
67    /// By default this filter does nothing and returns `Ok(false)`.
68    async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
69    where
70        Self::CTX: Send + Sync,
71    {
72        Ok(false)
73    }
74
75    /// Handle the incoming request before any downstream module is executed.
76    ///
77    /// This function is similar to [Self::request_filter()] but executes before any other logic,
78    /// including downstream module logic. The main purpose of this function is to provide finer
79    /// grained control of the behavior of the modules.
80    ///
81    /// Note that because this function is executed before any module that might provide access
82    /// control or rate limiting, logic should stay in request_filter() if it can in order to be
83    /// protected by said modules.
84    async fn early_request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()>
85    where
86        Self::CTX: Send + Sync,
87    {
88        Ok(())
89    }
90
91    /// Returns whether this session is allowed to spawn subrequests.
92    ///
93    /// This function is checked after [Self::early_request_filter] to allow that filter to configure
94    /// this if required. This will also run for subrequests themselves, which may allowed to spawn
95    /// their own subrequests.
96    ///
97    /// Note that this doesn't prevent subrequests from being spawned based on the session by proxy
98    /// core functionality, e.g. background cache revalidation requires spawning subrequests.
99    fn allow_spawning_subrequest(&self, _session: &Session, _ctx: &Self::CTX) -> bool
100    where
101        Self::CTX: Send + Sync,
102    {
103        false
104    }
105
106    /// Handle the incoming request body.
107    ///
108    /// This function will be called every time a piece of request body is received. The `body` is
109    /// **not the entire request body**.
110    ///
111    /// The async nature of this function allows to throttle the upload speed and/or executing
112    /// heavy computation logic such as WAF rules on offloaded threads without blocking the threads
113    /// who process the requests themselves.
114    async fn request_body_filter(
115        &self,
116        _session: &mut Session,
117        _body: &mut Option<Bytes>,
118        _end_of_stream: bool,
119        _ctx: &mut Self::CTX,
120    ) -> Result<()>
121    where
122        Self::CTX: Send + Sync,
123    {
124        Ok(())
125    }
126
127    /// This filter decides if the request is cacheable and what cache backend to use
128    ///
129    /// The caller can interact with `Session.cache` to enable caching.
130    ///
131    /// By default this filter does nothing which effectively disables caching.
132    // Ideally only session.cache should be modified, TODO: reflect that in this interface
133    fn request_cache_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()>
134    where
135        Self::CTX: Send + Sync,
136    {
137        Ok(())
138    }
139
140    /// This callback generates the cache key.
141    ///
142    /// This callback is called only when cache is enabled for this request.
143    ///
144    /// There is no sensible default cache key for all proxy applications. The
145    /// correct key depends on which request properties affect upstream responses
146    /// (e.g. `Vary` headers, custom request filters that modify the origin host).
147    /// Getting this wrong leads to cache poisoning.
148    ///
149    /// See `pingora-proxy/tests/utils/server_utils.rs` for a minimal (not
150    /// production-ready) reference implementation.
151    ///
152    /// # Panics
153    ///
154    /// The default implementation panics. You **must** override this method when
155    /// caching is enabled.
156    fn cache_key_callback(&self, _session: &Session, _ctx: &mut Self::CTX) -> Result<CacheKey> {
157        unimplemented!("cache_key_callback must be implemented when caching is enabled")
158    }
159
160    /// This callback is invoked when a cacheable response is ready to be admitted to cache.
161    fn cache_miss(&self, session: &mut Session, _ctx: &mut Self::CTX) {
162        session.cache.cache_miss();
163    }
164
165    /// This filter is called after a successful cache lookup and before the
166    /// cache asset is ready to be used.
167    ///
168    /// This filter allows the user to log or force invalidate the asset, or
169    /// to adjust the body reader associated with the cache hit.
170    /// This also runs on stale hit assets (for which `is_fresh` is false).
171    ///
172    /// The value returned indicates if the force invalidation should be used,
173    /// and which kind. Returning `None` indicates no forced invalidation
174    async fn cache_hit_filter(
175        &self,
176        _session: &mut Session,
177        _meta: &CacheMeta,
178        _hit_handler: &mut HitHandler,
179        _is_fresh: bool,
180        _ctx: &mut Self::CTX,
181    ) -> Result<Option<ForcedFreshness>>
182    where
183        Self::CTX: Send + Sync,
184    {
185        Ok(None)
186    }
187
188    /// Decide if a request should continue to upstream after not being served from cache.
189    ///
190    /// returns: Ok(true) if the request should continue, Ok(false) if a response was written by the
191    /// callback and the session should be finished, or an error
192    ///
193    /// This filter can be used for deferring checks like rate limiting or access control to when they
194    /// actually needed after cache miss.
195    ///
196    /// By default the session will attempt to be reused after returning Ok(false). It is the
197    /// caller's responsibility to disable keepalive or drain the request body if needed.
198    async fn proxy_upstream_filter(
199        &self,
200        _session: &mut Session,
201        _ctx: &mut Self::CTX,
202    ) -> Result<bool>
203    where
204        Self::CTX: Send + Sync,
205    {
206        Ok(true)
207    }
208
209    /// Decide if the response is cacheable
210    fn response_cache_filter(
211        &self,
212        _session: &Session,
213        _resp: &ResponseHeader,
214        _ctx: &mut Self::CTX,
215    ) -> Result<RespCacheable> {
216        Ok(Uncacheable(NoCacheReason::Custom("default")))
217    }
218
219    /// Decide how to generate cache vary key from both request and response
220    ///
221    /// None means no variance is needed.
222    fn cache_vary_filter(
223        &self,
224        _meta: &CacheMeta,
225        _ctx: &mut Self::CTX,
226        _req: &RequestHeader,
227    ) -> Option<HashBinary> {
228        // default to None for now to disable vary feature
229        None
230    }
231
232    /// Decide if the incoming request's condition _fails_ against the cached response.
233    ///
234    /// Returning `Ok(true)` means that the response does _not_ match against the condition, and
235    /// that the proxy can return `304 Not Modified` downstream.
236    ///
237    /// An example is a conditional GET request with `If-None-Match: "foobar"`. If the cached
238    /// response contains the `ETag: "foobar"`, then the condition fails, and `304 Not Modified`
239    /// should be returned. Else, the condition passes which means the full `200 OK` response must
240    /// be sent.
241    fn cache_not_modified_filter(
242        &self,
243        session: &Session,
244        resp: &ResponseHeader,
245        _ctx: &mut Self::CTX,
246    ) -> Result<bool> {
247        Ok(
248            pingora_core::protocols::http::conditional_filter::not_modified_filter(
249                session.req_header(),
250                resp,
251            ),
252        )
253    }
254
255    /// This filter is called when cache is enabled to determine what byte range to return (in both
256    /// cache hit and miss cases) from the response body. It is only used when caching is enabled,
257    /// otherwise the upstream is responsible for any filtering. It allows users to define the range
258    /// this request is for via its return type `range_filter::RangeType`.
259    ///
260    /// It also allow users to modify the response header accordingly.
261    ///
262    /// The default implementation can handle a single-range as per [RFC7232].
263    ///
264    /// [RFC7232]: https://www.rfc-editor.org/rfc/rfc7232
265    fn range_header_filter(
266        &self,
267        session: &mut Session,
268        resp: &mut ResponseHeader,
269        _ctx: &mut Self::CTX,
270    ) -> range_filter::RangeType {
271        const DEFAULT_MAX_RANGES: Option<usize> = Some(200);
272        proxy_cache::range_filter::range_header_filter(
273            session.req_header(),
274            resp,
275            DEFAULT_MAX_RANGES,
276        )
277    }
278
279    /// Modify the request before it is sent to the upstream
280    ///
281    /// Unlike [Self::request_filter()], this filter allows to change the request headers to send
282    /// to the upstream.
283    async fn upstream_request_filter(
284        &self,
285        _session: &mut Session,
286        _upstream_request: &mut RequestHeader,
287        _ctx: &mut Self::CTX,
288    ) -> Result<()>
289    where
290        Self::CTX: Send + Sync,
291    {
292        Ok(())
293    }
294
295    /// Modify the response header from the upstream
296    ///
297    /// The modification is before caching, so any change here will be stored in the cache if enabled.
298    ///
299    /// Responses served from cache won't trigger this filter. If the cache needed revalidation,
300    /// only the 304 from upstream will trigger the filter (though it will be merged into the
301    /// cached header, not served directly to downstream).
302    async fn upstream_response_filter(
303        &self,
304        _session: &mut Session,
305        _upstream_response: &mut ResponseHeader,
306        _ctx: &mut Self::CTX,
307    ) -> Result<()>
308    where
309        Self::CTX: Send + Sync,
310    {
311        Ok(())
312    }
313
314    /// Modify the response header before it is send to the downstream
315    ///
316    /// The modification is after caching. This filter is called for all responses including
317    /// responses served from cache.
318    async fn response_filter(
319        &self,
320        _session: &mut Session,
321        _upstream_response: &mut ResponseHeader,
322        _ctx: &mut Self::CTX,
323    ) -> Result<()>
324    where
325        Self::CTX: Send + Sync,
326    {
327        Ok(())
328    }
329
330    // custom_forwarding is called when downstream and upstream connections are successfully established.
331    #[doc(hidden)]
332    async fn custom_forwarding(
333        &self,
334        _session: &mut Session,
335        _ctx: &mut Self::CTX,
336        _custom_message_to_upstream: Option<mpsc::Sender<Bytes>>,
337        _custom_message_to_downstream: mpsc::Sender<Bytes>,
338    ) -> Result<()>
339    where
340        Self::CTX: Send + Sync,
341    {
342        Ok(())
343    }
344
345    // received a custom message from the downstream before sending it to the upstream.
346    #[doc(hidden)]
347    async fn downstream_custom_message_proxy_filter(
348        &self,
349        _session: &mut Session,
350        custom_message: Bytes,
351        _ctx: &mut Self::CTX,
352        _final_hop: bool,
353    ) -> Result<Option<Bytes>>
354    where
355        Self::CTX: Send + Sync,
356    {
357        Ok(Some(custom_message))
358    }
359
360    // received a custom message from the upstream before sending it to the downstream.
361    #[doc(hidden)]
362    async fn upstream_custom_message_proxy_filter(
363        &self,
364        _session: &mut Session,
365        custom_message: Bytes,
366        _ctx: &mut Self::CTX,
367        _final_hop: bool,
368    ) -> Result<Option<Bytes>>
369    where
370        Self::CTX: Send + Sync,
371    {
372        Ok(Some(custom_message))
373    }
374
375    /// Similar to [Self::upstream_response_filter()] but for response body
376    ///
377    /// This function will be called every time a piece of response body is received. The `body` is
378    /// **not the entire response body**.
379    fn upstream_response_body_filter(
380        &self,
381        _session: &mut Session,
382        _body: &mut Option<Bytes>,
383        _end_of_stream: bool,
384        _ctx: &mut Self::CTX,
385    ) -> Result<Option<Duration>> {
386        Ok(None)
387    }
388
389    /// Similar to [Self::upstream_response_filter()] but for response trailers
390    fn upstream_response_trailer_filter(
391        &self,
392        _session: &mut Session,
393        _upstream_trailers: &mut header::HeaderMap,
394        _ctx: &mut Self::CTX,
395    ) -> Result<()> {
396        Ok(())
397    }
398
399    /// Similar to [Self::response_filter()] but for response body chunks
400    fn response_body_filter(
401        &self,
402        _session: &mut Session,
403        _body: &mut Option<Bytes>,
404        _end_of_stream: bool,
405        _ctx: &mut Self::CTX,
406    ) -> Result<Option<Duration>>
407    where
408        Self::CTX: Send + Sync,
409    {
410        Ok(None)
411    }
412
413    /// Similar to [Self::response_filter()] but for response trailers.
414    /// Note, returning an Ok(Some(Bytes)) will result in the downstream response
415    /// trailers being written to the response body.
416    ///
417    /// TODO: make this interface more intuitive
418    async fn response_trailer_filter(
419        &self,
420        _session: &mut Session,
421        _upstream_trailers: &mut header::HeaderMap,
422        _ctx: &mut Self::CTX,
423    ) -> Result<Option<Bytes>>
424    where
425        Self::CTX: Send + Sync,
426    {
427        Ok(None)
428    }
429
430    /// This filter is called when the entire response is sent to the downstream successfully or
431    /// there is a fatal error that terminate the request.
432    ///
433    /// An error log is already emitted if there is any error. This phase is used for collecting
434    /// metrics and sending access logs.
435    async fn logging(&self, _session: &mut Session, _e: Option<&Error>, _ctx: &mut Self::CTX)
436    where
437        Self::CTX: Send + Sync,
438    {
439    }
440
441    /// A value of true means that the log message will be suppressed. The default value is false.
442    fn suppress_error_log(&self, _session: &Session, _ctx: &Self::CTX, _error: &Error) -> bool {
443        false
444    }
445
446    /// This filter is called when there is an error **after** a connection is established (or reused)
447    /// to the upstream.
448    fn error_while_proxy(
449        &self,
450        peer: &HttpPeer,
451        session: &mut Session,
452        e: Box<Error>,
453        _ctx: &mut Self::CTX,
454        client_reused: bool,
455    ) -> Box<Error> {
456        let mut e = e.more_context(format!("Peer: {}", peer));
457        // only reused client connections where retry buffer is not truncated
458        e.retry
459            .decide_reuse(client_reused && !session.as_ref().retry_buffer_truncated());
460        e
461    }
462
463    /// This filter is called when there is an error in the process of establishing a connection
464    /// to the upstream.
465    ///
466    /// In this filter the user can decide whether the error is retry-able by marking the error `e`.
467    ///
468    /// If the error can be retried, [Self::upstream_peer()] will be called again so that the user
469    /// can decide whether to send the request to the same upstream or another upstream that is possibly
470    /// available.
471    fn fail_to_connect(
472        &self,
473        _session: &mut Session,
474        _peer: &HttpPeer,
475        _ctx: &mut Self::CTX,
476        e: Box<Error>,
477    ) -> Box<Error> {
478        e
479    }
480
481    /// This filter is called when the request encounters a fatal error.
482    ///
483    /// Users may write an error response to the downstream if the downstream is still writable.
484    ///
485    /// The response status code of the error response may be returned for logging purposes.
486    /// Additionally, the user can return whether this session may be reused in spite of the error.
487    /// Today this reuse status is only respected for errors that occur prior to upstream peer
488    /// selection, and the keepalive configured on the `Session` itself still takes precedent.
489    async fn fail_to_proxy(
490        &self,
491        session: &mut Session,
492        e: &Error,
493        _ctx: &mut Self::CTX,
494    ) -> FailToProxy
495    where
496        Self::CTX: Send + Sync,
497    {
498        let code = match e.etype() {
499            HTTPStatus(code) => *code,
500            _ => {
501                match e.esource() {
502                    ErrorSource::Upstream => 502,
503                    ErrorSource::Downstream => {
504                        match e.etype() {
505                            WriteError | ReadError | ConnectionClosed => {
506                                /* conn already dead */
507                                0
508                            }
509                            _ => 400,
510                        }
511                    }
512                    ErrorSource::Internal | ErrorSource::Unset => 500,
513                }
514            }
515        };
516        if code > 0 {
517            session.respond_error(code).await.unwrap_or_else(|e| {
518                error!("failed to send error response to downstream: {e}");
519            });
520        }
521
522        FailToProxy {
523            error_code: code,
524            // default to no reuse, which is safest
525            can_reuse_downstream: false,
526        }
527    }
528
529    /// Decide whether should serve stale when encountering an error or during revalidation
530    ///
531    /// An implementation should follow
532    /// <https://datatracker.ietf.org/doc/html/rfc9111#section-4.2.4>
533    /// <https://www.rfc-editor.org/rfc/rfc5861#section-4>
534    ///
535    /// This filter is only called if cache is enabled.
536    // 5xx HTTP status will be encoded as ErrorType::HTTPStatus(code)
537    fn should_serve_stale(
538        &self,
539        _session: &mut Session,
540        _ctx: &mut Self::CTX,
541        error: Option<&Error>, // None when it is called during stale while revalidate
542    ) -> bool {
543        // A cache MUST NOT generate a stale response unless
544        // it is disconnected
545        // or doing so is explicitly permitted by the client or origin server
546        // (e.g. headers or an out-of-band contract)
547        error.is_some_and(|e| e.esource() == &ErrorSource::Upstream)
548    }
549
550    /// This filter is called when the request just established or reused a connection to the upstream
551    ///
552    /// This filter allows user to log timing and connection related info.
553    async fn connected_to_upstream(
554        &self,
555        _session: &mut Session,
556        _reused: bool,
557        _peer: &HttpPeer,
558        #[cfg(unix)] _fd: std::os::unix::io::RawFd,
559        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
560        _digest: Option<&Digest>,
561        _ctx: &mut Self::CTX,
562    ) -> Result<()>
563    where
564        Self::CTX: Send + Sync,
565    {
566        Ok(())
567    }
568
569    /// This callback is invoked every time request related error log needs to be generated
570    ///
571    /// Users can define what is important to be written about this request via the returned string.
572    fn request_summary(&self, session: &Session, _ctx: &Self::CTX) -> String {
573        session.as_ref().request_summary()
574    }
575
576    /// Whether the request should be used to invalidate(delete) the HTTP cache
577    ///
578    /// - `true`: this request will be used to invalidate the cache.
579    /// - `false`: this request is a treated as a normal request
580    fn is_purge(&self, _session: &Session, _ctx: &Self::CTX) -> bool {
581        false
582    }
583
584    /// This filter is called after the proxy cache generates the downstream response to the purge
585    /// request (to invalidate or delete from the HTTP cache), based on the purge status, which
586    /// indicates whether the request succeeded or failed.
587    ///
588    /// The filter allows the user to modify or replace the generated downstream response.
589    /// If the filter returns `Err`, the proxy will instead send a 500 response.
590    fn purge_response_filter(
591        &self,
592        _session: &Session,
593        _ctx: &mut Self::CTX,
594        _purge_status: PurgeStatus,
595        _purge_response: &mut std::borrow::Cow<'static, ResponseHeader>,
596    ) -> Result<()> {
597        Ok(())
598    }
599}
600
/// Context struct returned by `fail_to_proxy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FailToProxy {
    /// Status code of the error response, reported for logging purposes.
    ///
    /// The default `fail_to_proxy` implementation uses `0` when no error response was sent.
    pub error_code: u16,
    /// Whether the downstream session may be reused in spite of the error.
    pub can_reuse_downstream: bool,
}