pingora_proxy/
proxy_trait.rs

1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::*;
16use pingora_cache::{
17    key::HashBinary,
18    CacheKey, CacheMeta, ForcedInvalidationKind,
19    RespCacheable::{self, *},
20};
21use proxy_cache::range_filter::{self};
22use std::time::Duration;
23
24/// The interface to control the HTTP proxy
25///
26/// The methods in [ProxyHttp] are filters/callbacks which will be performed on all requests at their
27/// particular stage (if applicable).
28///
29/// If any of the filters returns [Result::Err], the request will fail, and the error will be logged.
30#[cfg_attr(not(doc_async_trait), async_trait)]
31pub trait ProxyHttp {
32    /// The per request object to share state across the different filters
33    type CTX;
34
35    /// Define how the `ctx` should be created.
36    fn new_ctx(&self) -> Self::CTX;
37
38    /// Define where the proxy should send the request to.
39    ///
40    /// The returned [HttpPeer] contains the information regarding where and how this request should
41    /// be forwarded to.
42    async fn upstream_peer(
43        &self,
44        session: &mut Session,
45        ctx: &mut Self::CTX,
46    ) -> Result<Box<HttpPeer>>;
47
48    /// Set up downstream modules.
49    ///
50    /// In this phase, users can add or configure [HttpModules] before the server starts up.
51    ///
52    /// In the default implementation of this method, [ResponseCompressionBuilder] is added
53    /// and disabled.
54    fn init_downstream_modules(&self, modules: &mut HttpModules) {
55        // Add disabled downstream compression module by default
56        modules.add_module(ResponseCompressionBuilder::enable(0));
57    }
58
59    /// Handle the incoming request.
60    ///
61    /// In this phase, users can parse, validate, rate limit, perform access control and/or
62    /// return a response for this request.
63    ///
64    /// If the user already sent a response to this request, an `Ok(true)` should be returned so that
65    /// the proxy would exit. The proxy continues to the next phases when `Ok(false)` is returned.
66    ///
67    /// By default this filter does nothing and returns `Ok(false)`.
68    async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
69    where
70        Self::CTX: Send + Sync,
71    {
72        Ok(false)
73    }
74
75    /// Handle the incoming request before any downstream module is executed.
76    ///
77    /// This function is similar to [Self::request_filter()] but executes before any other logic,
78    /// including downstream module logic. The main purpose of this function is to provide finer
79    /// grained control of the behavior of the modules.
80    ///
81    /// Note that because this function is executed before any module that might provide access
82    /// control or rate limiting, logic should stay in request_filter() if it can in order to be
83    /// protected by said modules.
84    async fn early_request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()>
85    where
86        Self::CTX: Send + Sync,
87    {
88        Ok(())
89    }
90
91    /// Handle the incoming request body.
92    ///
93    /// This function will be called every time a piece of request body is received. The `body` is
94    /// **not the entire request body**.
95    ///
96    /// The async nature of this function allows to throttle the upload speed and/or executing
97    /// heavy computation logic such as WAF rules on offloaded threads without blocking the threads
98    /// who process the requests themselves.
99    async fn request_body_filter(
100        &self,
101        _session: &mut Session,
102        _body: &mut Option<Bytes>,
103        _end_of_stream: bool,
104        _ctx: &mut Self::CTX,
105    ) -> Result<()>
106    where
107        Self::CTX: Send + Sync,
108    {
109        Ok(())
110    }
111
112    /// This filter decides if the request is cacheable and what cache backend to use
113    ///
114    /// The caller can interact with `Session.cache` to enable caching.
115    ///
116    /// By default this filter does nothing which effectively disables caching.
117    // Ideally only session.cache should be modified, TODO: reflect that in this interface
118    fn request_cache_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()> {
119        Ok(())
120    }
121
122    /// This callback generates the cache key
123    ///
124    /// This callback is called only when cache is enabled for this request
125    ///
126    /// By default this callback returns a default cache key generated from the request.
127    fn cache_key_callback(&self, session: &Session, _ctx: &mut Self::CTX) -> Result<CacheKey> {
128        let req_header = session.req_header();
129        Ok(CacheKey::default(req_header))
130    }
131
132    /// This callback is invoked when a cacheable response is ready to be admitted to cache
133    fn cache_miss(&self, session: &mut Session, _ctx: &mut Self::CTX) {
134        session.cache.cache_miss();
135    }
136
137    /// This filter is called after a successful cache lookup and before the
138    /// cache asset is ready to be used.
139    ///
140    /// This filter allows the user to log or force invalidate the asset.
141    /// This also runs on stale hit assets (for which `is_fresh` is false).
142    ///
143    /// The value returned indicates if the force invalidation should be used,
144    /// and which kind. Returning `None` indicates no forced invalidation
145    async fn cache_hit_filter(
146        &self,
147        _session: &Session,
148        _meta: &CacheMeta,
149        _is_fresh: bool,
150        _ctx: &mut Self::CTX,
151    ) -> Result<Option<ForcedInvalidationKind>>
152    where
153        Self::CTX: Send + Sync,
154    {
155        Ok(None)
156    }
157
158    /// Decide if a request should continue to upstream after not being served from cache.
159    ///
160    /// returns: Ok(true) if the request should continue, Ok(false) if a response was written by the
161    /// callback and the session should be finished, or an error
162    ///
163    /// This filter can be used for deferring checks like rate limiting or access control to when they
164    /// actually needed after cache miss.
165    ///
166    /// By default the session will attempt to be reused after returning Ok(false). It is the
167    /// caller's responsibility to disable keepalive or drain the request body if needed.
168    async fn proxy_upstream_filter(
169        &self,
170        _session: &mut Session,
171        _ctx: &mut Self::CTX,
172    ) -> Result<bool>
173    where
174        Self::CTX: Send + Sync,
175    {
176        Ok(true)
177    }
178
179    /// Decide if the response is cacheable
180    fn response_cache_filter(
181        &self,
182        _session: &Session,
183        _resp: &ResponseHeader,
184        _ctx: &mut Self::CTX,
185    ) -> Result<RespCacheable> {
186        Ok(Uncacheable(NoCacheReason::Custom("default")))
187    }
188
189    /// Decide how to generate cache vary key from both request and response
190    ///
191    /// None means no variance is needed.
192    fn cache_vary_filter(
193        &self,
194        _meta: &CacheMeta,
195        _ctx: &mut Self::CTX,
196        _req: &RequestHeader,
197    ) -> Option<HashBinary> {
198        // default to None for now to disable vary feature
199        None
200    }
201
202    /// Decide if the incoming request's condition _fails_ against the cached response.
203    ///
204    /// Returning `Ok(true)` means that the response does _not_ match against the condition, and
205    /// that the proxy can return `304 Not Modified` downstream.
206    ///
207    /// An example is a conditional GET request with `If-None-Match: "foobar"`. If the cached
208    /// response contains the `ETag: "foobar"`, then the condition fails, and `304 Not Modified`
209    /// should be returned. Else, the condition passes which means the full `200 OK` response must
210    /// be sent.
211    fn cache_not_modified_filter(
212        &self,
213        session: &Session,
214        resp: &ResponseHeader,
215        _ctx: &mut Self::CTX,
216    ) -> Result<bool> {
217        Ok(
218            pingora_core::protocols::http::conditional_filter::not_modified_filter(
219                session.req_header(),
220                resp,
221            ),
222        )
223    }
224
225    /// This filter is called when cache is enabled to determine what byte range to return (in both
226    /// cache hit and miss cases) from the response body. It is only used when caching is enabled,
227    /// otherwise the upstream is responsible for any filtering. It allows users to define the range
228    /// this request is for via its return type `range_filter::RangeType`.
229    ///
230    /// It also allow users to modify the response header accordingly.
231    ///
232    /// The default implementation can handle a single-range as per [RFC7232].
233    ///
234    /// [RFC7232]: https://www.rfc-editor.org/rfc/rfc7232
235    fn range_header_filter(
236        &self,
237        req: &RequestHeader,
238        resp: &mut ResponseHeader,
239        _ctx: &mut Self::CTX,
240    ) -> range_filter::RangeType {
241        proxy_cache::range_filter::range_header_filter(req, resp)
242    }
243
244    /// Modify the request before it is sent to the upstream
245    ///
246    /// Unlike [Self::request_filter()], this filter allows to change the request headers to send
247    /// to the upstream.
248    async fn upstream_request_filter(
249        &self,
250        _session: &mut Session,
251        _upstream_request: &mut RequestHeader,
252        _ctx: &mut Self::CTX,
253    ) -> Result<()>
254    where
255        Self::CTX: Send + Sync,
256    {
257        Ok(())
258    }
259
260    /// Modify the response header from the upstream
261    ///
262    /// The modification is before caching, so any change here will be stored in the cache if enabled.
263    ///
264    /// Responses served from cache won't trigger this filter. If the cache needed revalidation,
265    /// only the 304 from upstream will trigger the filter (though it will be merged into the
266    /// cached header, not served directly to downstream).
267    fn upstream_response_filter(
268        &self,
269        _session: &mut Session,
270        _upstream_response: &mut ResponseHeader,
271        _ctx: &mut Self::CTX,
272    ) -> Result<()> {
273        Ok(())
274    }
275
276    /// Modify the response header before it is send to the downstream
277    ///
278    /// The modification is after caching. This filter is called for all responses including
279    /// responses served from cache.
280    async fn response_filter(
281        &self,
282        _session: &mut Session,
283        _upstream_response: &mut ResponseHeader,
284        _ctx: &mut Self::CTX,
285    ) -> Result<()>
286    where
287        Self::CTX: Send + Sync,
288    {
289        Ok(())
290    }
291
292    /// Similar to [Self::upstream_response_filter()] but for response body
293    ///
294    /// This function will be called every time a piece of response body is received. The `body` is
295    /// **not the entire response body**.
296    fn upstream_response_body_filter(
297        &self,
298        _session: &mut Session,
299        _body: &mut Option<Bytes>,
300        _end_of_stream: bool,
301        _ctx: &mut Self::CTX,
302    ) -> Result<()> {
303        Ok(())
304    }
305
306    /// Similar to [Self::upstream_response_filter()] but for response trailers
307    fn upstream_response_trailer_filter(
308        &self,
309        _session: &mut Session,
310        _upstream_trailers: &mut header::HeaderMap,
311        _ctx: &mut Self::CTX,
312    ) -> Result<()> {
313        Ok(())
314    }
315
316    /// Similar to [Self::response_filter()] but for response body chunks
317    fn response_body_filter(
318        &self,
319        _session: &mut Session,
320        _body: &mut Option<Bytes>,
321        _end_of_stream: bool,
322        _ctx: &mut Self::CTX,
323    ) -> Result<Option<Duration>>
324    where
325        Self::CTX: Send + Sync,
326    {
327        Ok(None)
328    }
329
330    /// Similar to [Self::response_filter()] but for response trailers.
331    /// Note, returning an Ok(Some(Bytes)) will result in the downstream response
332    /// trailers being written to the response body.
333    ///
334    /// TODO: make this interface more intuitive
335    async fn response_trailer_filter(
336        &self,
337        _session: &mut Session,
338        _upstream_trailers: &mut header::HeaderMap,
339        _ctx: &mut Self::CTX,
340    ) -> Result<Option<Bytes>>
341    where
342        Self::CTX: Send + Sync,
343    {
344        Ok(None)
345    }
346
347    /// This filter is called when the entire response is sent to the downstream successfully or
348    /// there is a fatal error that terminate the request.
349    ///
350    /// An error log is already emitted if there is any error. This phase is used for collecting
351    /// metrics and sending access logs.
352    async fn logging(&self, _session: &mut Session, _e: Option<&Error>, _ctx: &mut Self::CTX)
353    where
354        Self::CTX: Send + Sync,
355    {
356    }
357
358    /// A value of true means that the log message will be suppressed. The default value is false.
359    fn suppress_error_log(&self, _session: &Session, _ctx: &Self::CTX, _error: &Error) -> bool {
360        false
361    }
362
363    /// This filter is called when there is an error **after** a connection is established (or reused)
364    /// to the upstream.
365    fn error_while_proxy(
366        &self,
367        peer: &HttpPeer,
368        session: &mut Session,
369        e: Box<Error>,
370        _ctx: &mut Self::CTX,
371        client_reused: bool,
372    ) -> Box<Error> {
373        let mut e = e.more_context(format!("Peer: {}", peer));
374        // only reused client connections where retry buffer is not truncated
375        e.retry
376            .decide_reuse(client_reused && !session.as_ref().retry_buffer_truncated());
377        e
378    }
379
380    /// This filter is called when there is an error in the process of establishing a connection
381    /// to the upstream.
382    ///
383    /// In this filter the user can decide whether the error is retry-able by marking the error `e`.
384    ///
385    /// If the error can be retried, [Self::upstream_peer()] will be called again so that the user
386    /// can decide whether to send the request to the same upstream or another upstream that is possibly
387    /// available.
388    fn fail_to_connect(
389        &self,
390        _session: &mut Session,
391        _peer: &HttpPeer,
392        _ctx: &mut Self::CTX,
393        e: Box<Error>,
394    ) -> Box<Error> {
395        e
396    }
397
398    /// This filter is called when the request encounters a fatal error.
399    ///
400    /// Users may write an error response to the downstream if the downstream is still writable.
401    ///
402    /// The response status code of the error response may be returned for logging purposes.
403    /// Additionally, the user can return whether this session may be reused in spite of the error.
404    /// Today this reuse status is only respected for errors that occur prior to upstream peer
405    /// selection, and the keepalive configured on the `Session` itself still takes precedent.
406    async fn fail_to_proxy(
407        &self,
408        session: &mut Session,
409        e: &Error,
410        _ctx: &mut Self::CTX,
411    ) -> FailToProxy
412    where
413        Self::CTX: Send + Sync,
414    {
415        let code = match e.etype() {
416            HTTPStatus(code) => *code,
417            _ => {
418                match e.esource() {
419                    ErrorSource::Upstream => 502,
420                    ErrorSource::Downstream => {
421                        match e.etype() {
422                            WriteError | ReadError | ConnectionClosed => {
423                                /* conn already dead */
424                                0
425                            }
426                            _ => 400,
427                        }
428                    }
429                    ErrorSource::Internal | ErrorSource::Unset => 500,
430                }
431            }
432        };
433        if code > 0 {
434            session.respond_error(code).await.unwrap_or_else(|e| {
435                error!("failed to send error response to downstream: {e}");
436            });
437        }
438
439        FailToProxy {
440            error_code: code,
441            // default to no reuse, which is safest
442            can_reuse_downstream: false,
443        }
444    }
445
446    /// Decide whether should serve stale when encountering an error or during revalidation
447    ///
448    /// An implementation should follow
449    /// <https://datatracker.ietf.org/doc/html/rfc9111#section-4.2.4>
450    /// <https://www.rfc-editor.org/rfc/rfc5861#section-4>
451    ///
452    /// This filter is only called if cache is enabled.
453    // 5xx HTTP status will be encoded as ErrorType::HTTPStatus(code)
454    fn should_serve_stale(
455        &self,
456        _session: &mut Session,
457        _ctx: &mut Self::CTX,
458        error: Option<&Error>, // None when it is called during stale while revalidate
459    ) -> bool {
460        // A cache MUST NOT generate a stale response unless
461        // it is disconnected
462        // or doing so is explicitly permitted by the client or origin server
463        // (e.g. headers or an out-of-band contract)
464        error.is_some_and(|e| e.esource() == &ErrorSource::Upstream)
465    }
466
467    /// This filter is called when the request just established or reused a connection to the upstream
468    ///
469    /// This filter allows user to log timing and connection related info.
470    async fn connected_to_upstream(
471        &self,
472        _session: &mut Session,
473        _reused: bool,
474        _peer: &HttpPeer,
475        #[cfg(unix)] _fd: std::os::unix::io::RawFd,
476        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
477        _digest: Option<&Digest>,
478        _ctx: &mut Self::CTX,
479    ) -> Result<()>
480    where
481        Self::CTX: Send + Sync,
482    {
483        Ok(())
484    }
485
486    /// This callback is invoked every time request related error log needs to be generated
487    ///
488    /// Users can define what is important to be written about this request via the returned string.
489    fn request_summary(&self, session: &Session, _ctx: &Self::CTX) -> String {
490        session.as_ref().request_summary()
491    }
492
493    /// Whether the request should be used to invalidate(delete) the HTTP cache
494    ///
495    /// - `true`: this request will be used to invalidate the cache.
496    /// - `false`: this request is a treated as a normal request
497    fn is_purge(&self, _session: &Session, _ctx: &Self::CTX) -> bool {
498        false
499    }
500
501    /// This filter is called after the proxy cache generates the downstream response to the purge
502    /// request (to invalidate or delete from the HTTP cache), based on the purge status, which
503    /// indicates whether the request succeeded or failed.
504    ///
505    /// The filter allows the user to modify or replace the generated downstream response.
506    /// If the filter returns `Err`, the proxy will instead send a 500 response.
507    fn purge_response_filter(
508        &self,
509        _session: &Session,
510        _ctx: &mut Self::CTX,
511        _purge_status: PurgeStatus,
512        _purge_response: &mut std::borrow::Cow<'static, ResponseHeader>,
513    ) -> Result<()> {
514        Ok(())
515    }
516}
517
/// Outcome reported by `fail_to_proxy` after a request hit a fatal error.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FailToProxy {
    /// Status code of the error response, kept for logging purposes.
    ///
    /// The default `fail_to_proxy` implementation reports `0` when no error response was
    /// written because the downstream connection was already dead.
    pub error_code: u16,
    /// Whether the downstream session may be reused in spite of the error.
    pub can_reuse_downstream: bool,
}