pingora_proxy/proxy_trait.rs
1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::*;
16use pingora_cache::{
17 key::HashBinary,
18 CacheKey, CacheMeta, ForcedFreshness, HitHandler,
19 RespCacheable::{self, *},
20};
21use proxy_cache::range_filter::{self};
22use std::time::Duration;
23
24/// The interface to control the HTTP proxy
25///
26/// The methods in [ProxyHttp] are filters/callbacks which will be performed on all requests at their
27/// particular stage (if applicable).
28///
29/// If any of the filters returns [Result::Err], the request will fail, and the error will be logged.
30#[cfg_attr(not(doc_async_trait), async_trait)]
31pub trait ProxyHttp {
32 /// The per request object to share state across the different filters
33 type CTX;
34
35 /// Define how the `ctx` should be created.
36 fn new_ctx(&self) -> Self::CTX;
37
38 /// Define where the proxy should send the request to.
39 ///
40 /// The returned [HttpPeer] contains the information regarding where and how this request should
41 /// be forwarded to.
42 async fn upstream_peer(
43 &self,
44 session: &mut Session,
45 ctx: &mut Self::CTX,
46 ) -> Result<Box<HttpPeer>>;
47
48 /// Set up downstream modules.
49 ///
50 /// In this phase, users can add or configure [HttpModules] before the server starts up.
51 ///
52 /// In the default implementation of this method, [ResponseCompressionBuilder] is added
53 /// and disabled.
54 fn init_downstream_modules(&self, modules: &mut HttpModules) {
55 // Add disabled downstream compression module by default
56 modules.add_module(ResponseCompressionBuilder::enable(0));
57 }
58
59 /// Handle the incoming request.
60 ///
61 /// In this phase, users can parse, validate, rate limit, perform access control and/or
62 /// return a response for this request.
63 ///
64 /// If the user already sent a response to this request, an `Ok(true)` should be returned so that
65 /// the proxy would exit. The proxy continues to the next phases when `Ok(false)` is returned.
66 ///
67 /// By default this filter does nothing and returns `Ok(false)`.
68 async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
69 where
70 Self::CTX: Send + Sync,
71 {
72 Ok(false)
73 }
74
75 /// Handle the incoming request before any downstream module is executed.
76 ///
77 /// This function is similar to [Self::request_filter()] but executes before any other logic,
78 /// including downstream module logic. The main purpose of this function is to provide finer
79 /// grained control of the behavior of the modules.
80 ///
81 /// Note that because this function is executed before any module that might provide access
82 /// control or rate limiting, logic should stay in request_filter() if it can in order to be
83 /// protected by said modules.
84 async fn early_request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()>
85 where
86 Self::CTX: Send + Sync,
87 {
88 Ok(())
89 }
90
91 /// Returns whether this session is allowed to spawn subrequests.
92 ///
93 /// This function is checked after [Self::early_request_filter] to allow that filter to configure
94 /// this if required. This will also run for subrequests themselves, which may allowed to spawn
95 /// their own subrequests.
96 ///
97 /// Note that this doesn't prevent subrequests from being spawned based on the session by proxy
98 /// core functionality, e.g. background cache revalidation requires spawning subrequests.
99 fn allow_spawning_subrequest(&self, _session: &Session, _ctx: &Self::CTX) -> bool
100 where
101 Self::CTX: Send + Sync,
102 {
103 false
104 }
105
106 /// Handle the incoming request body.
107 ///
108 /// This function will be called every time a piece of request body is received. The `body` is
109 /// **not the entire request body**.
110 ///
111 /// The async nature of this function allows to throttle the upload speed and/or executing
112 /// heavy computation logic such as WAF rules on offloaded threads without blocking the threads
113 /// who process the requests themselves.
114 async fn request_body_filter(
115 &self,
116 _session: &mut Session,
117 _body: &mut Option<Bytes>,
118 _end_of_stream: bool,
119 _ctx: &mut Self::CTX,
120 ) -> Result<()>
121 where
122 Self::CTX: Send + Sync,
123 {
124 Ok(())
125 }
126
127 /// This filter decides if the request is cacheable and what cache backend to use
128 ///
129 /// The caller can interact with `Session.cache` to enable caching.
130 ///
131 /// By default this filter does nothing which effectively disables caching.
132 // Ideally only session.cache should be modified, TODO: reflect that in this interface
133 fn request_cache_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()> {
134 Ok(())
135 }
136
137 /// This callback generates the cache key
138 ///
139 /// This callback is called only when cache is enabled for this request
140 ///
141 /// By default this callback returns a default cache key generated from the request.
142 fn cache_key_callback(&self, session: &Session, _ctx: &mut Self::CTX) -> Result<CacheKey> {
143 let req_header = session.req_header();
144 Ok(CacheKey::default(req_header))
145 }
146
147 /// This callback is invoked when a cacheable response is ready to be admitted to cache.
148 fn cache_miss(&self, session: &mut Session, _ctx: &mut Self::CTX) {
149 session.cache.cache_miss();
150 }
151
152 /// This filter is called after a successful cache lookup and before the
153 /// cache asset is ready to be used.
154 ///
155 /// This filter allows the user to log or force invalidate the asset, or
156 /// to adjust the body reader associated with the cache hit.
157 /// This also runs on stale hit assets (for which `is_fresh` is false).
158 ///
159 /// The value returned indicates if the force invalidation should be used,
160 /// and which kind. Returning `None` indicates no forced invalidation
161 async fn cache_hit_filter(
162 &self,
163 _session: &mut Session,
164 _meta: &CacheMeta,
165 _hit_handler: &mut HitHandler,
166 _is_fresh: bool,
167 _ctx: &mut Self::CTX,
168 ) -> Result<Option<ForcedFreshness>>
169 where
170 Self::CTX: Send + Sync,
171 {
172 Ok(None)
173 }
174
175 /// Decide if a request should continue to upstream after not being served from cache.
176 ///
177 /// returns: Ok(true) if the request should continue, Ok(false) if a response was written by the
178 /// callback and the session should be finished, or an error
179 ///
180 /// This filter can be used for deferring checks like rate limiting or access control to when they
181 /// actually needed after cache miss.
182 ///
183 /// By default the session will attempt to be reused after returning Ok(false). It is the
184 /// caller's responsibility to disable keepalive or drain the request body if needed.
185 async fn proxy_upstream_filter(
186 &self,
187 _session: &mut Session,
188 _ctx: &mut Self::CTX,
189 ) -> Result<bool>
190 where
191 Self::CTX: Send + Sync,
192 {
193 Ok(true)
194 }
195
196 /// Decide if the response is cacheable
197 fn response_cache_filter(
198 &self,
199 _session: &Session,
200 _resp: &ResponseHeader,
201 _ctx: &mut Self::CTX,
202 ) -> Result<RespCacheable> {
203 Ok(Uncacheable(NoCacheReason::Custom("default")))
204 }
205
206 /// Decide how to generate cache vary key from both request and response
207 ///
208 /// None means no variance is needed.
209 fn cache_vary_filter(
210 &self,
211 _meta: &CacheMeta,
212 _ctx: &mut Self::CTX,
213 _req: &RequestHeader,
214 ) -> Option<HashBinary> {
215 // default to None for now to disable vary feature
216 None
217 }
218
219 /// Decide if the incoming request's condition _fails_ against the cached response.
220 ///
221 /// Returning `Ok(true)` means that the response does _not_ match against the condition, and
222 /// that the proxy can return `304 Not Modified` downstream.
223 ///
224 /// An example is a conditional GET request with `If-None-Match: "foobar"`. If the cached
225 /// response contains the `ETag: "foobar"`, then the condition fails, and `304 Not Modified`
226 /// should be returned. Else, the condition passes which means the full `200 OK` response must
227 /// be sent.
228 fn cache_not_modified_filter(
229 &self,
230 session: &Session,
231 resp: &ResponseHeader,
232 _ctx: &mut Self::CTX,
233 ) -> Result<bool> {
234 Ok(
235 pingora_core::protocols::http::conditional_filter::not_modified_filter(
236 session.req_header(),
237 resp,
238 ),
239 )
240 }
241
242 /// This filter is called when cache is enabled to determine what byte range to return (in both
243 /// cache hit and miss cases) from the response body. It is only used when caching is enabled,
244 /// otherwise the upstream is responsible for any filtering. It allows users to define the range
245 /// this request is for via its return type `range_filter::RangeType`.
246 ///
247 /// It also allow users to modify the response header accordingly.
248 ///
249 /// The default implementation can handle a single-range as per [RFC7232].
250 ///
251 /// [RFC7232]: https://www.rfc-editor.org/rfc/rfc7232
252 fn range_header_filter(
253 &self,
254 session: &mut Session,
255 resp: &mut ResponseHeader,
256 _ctx: &mut Self::CTX,
257 ) -> range_filter::RangeType {
258 const DEFAULT_MAX_RANGES: Option<usize> = Some(200);
259 proxy_cache::range_filter::range_header_filter(
260 session.req_header(),
261 resp,
262 DEFAULT_MAX_RANGES,
263 )
264 }
265
266 /// Modify the request before it is sent to the upstream
267 ///
268 /// Unlike [Self::request_filter()], this filter allows to change the request headers to send
269 /// to the upstream.
270 async fn upstream_request_filter(
271 &self,
272 _session: &mut Session,
273 _upstream_request: &mut RequestHeader,
274 _ctx: &mut Self::CTX,
275 ) -> Result<()>
276 where
277 Self::CTX: Send + Sync,
278 {
279 Ok(())
280 }
281
282 /// Modify the response header from the upstream
283 ///
284 /// The modification is before caching, so any change here will be stored in the cache if enabled.
285 ///
286 /// Responses served from cache won't trigger this filter. If the cache needed revalidation,
287 /// only the 304 from upstream will trigger the filter (though it will be merged into the
288 /// cached header, not served directly to downstream).
289 async fn upstream_response_filter(
290 &self,
291 _session: &mut Session,
292 _upstream_response: &mut ResponseHeader,
293 _ctx: &mut Self::CTX,
294 ) -> Result<()>
295 where
296 Self::CTX: Send + Sync,
297 {
298 Ok(())
299 }
300
301 /// Modify the response header before it is send to the downstream
302 ///
303 /// The modification is after caching. This filter is called for all responses including
304 /// responses served from cache.
305 async fn response_filter(
306 &self,
307 _session: &mut Session,
308 _upstream_response: &mut ResponseHeader,
309 _ctx: &mut Self::CTX,
310 ) -> Result<()>
311 where
312 Self::CTX: Send + Sync,
313 {
314 Ok(())
315 }
316
317 // custom_forwarding is called when downstream and upstream connections are successfully established.
318 #[doc(hidden)]
319 async fn custom_forwarding(
320 &self,
321 _session: &mut Session,
322 _ctx: &mut Self::CTX,
323 _custom_message_to_upstream: Option<mpsc::Sender<Bytes>>,
324 _custom_message_to_downstream: mpsc::Sender<Bytes>,
325 ) -> Result<()>
326 where
327 Self::CTX: Send + Sync,
328 {
329 Ok(())
330 }
331
332 // received a custom message from the downstream before sending it to the upstream.
333 #[doc(hidden)]
334 async fn downstream_custom_message_proxy_filter(
335 &self,
336 _session: &mut Session,
337 custom_message: Bytes,
338 _ctx: &mut Self::CTX,
339 _final_hop: bool,
340 ) -> Result<Option<Bytes>>
341 where
342 Self::CTX: Send + Sync,
343 {
344 Ok(Some(custom_message))
345 }
346
347 // received a custom message from the upstream before sending it to the downstream.
348 #[doc(hidden)]
349 async fn upstream_custom_message_proxy_filter(
350 &self,
351 _session: &mut Session,
352 custom_message: Bytes,
353 _ctx: &mut Self::CTX,
354 _final_hop: bool,
355 ) -> Result<Option<Bytes>>
356 where
357 Self::CTX: Send + Sync,
358 {
359 Ok(Some(custom_message))
360 }
361
362 /// Similar to [Self::upstream_response_filter()] but for response body
363 ///
364 /// This function will be called every time a piece of response body is received. The `body` is
365 /// **not the entire response body**.
366 fn upstream_response_body_filter(
367 &self,
368 _session: &mut Session,
369 _body: &mut Option<Bytes>,
370 _end_of_stream: bool,
371 _ctx: &mut Self::CTX,
372 ) -> Result<Option<Duration>> {
373 Ok(None)
374 }
375
376 /// Similar to [Self::upstream_response_filter()] but for response trailers
377 fn upstream_response_trailer_filter(
378 &self,
379 _session: &mut Session,
380 _upstream_trailers: &mut header::HeaderMap,
381 _ctx: &mut Self::CTX,
382 ) -> Result<()> {
383 Ok(())
384 }
385
386 /// Similar to [Self::response_filter()] but for response body chunks
387 fn response_body_filter(
388 &self,
389 _session: &mut Session,
390 _body: &mut Option<Bytes>,
391 _end_of_stream: bool,
392 _ctx: &mut Self::CTX,
393 ) -> Result<Option<Duration>>
394 where
395 Self::CTX: Send + Sync,
396 {
397 Ok(None)
398 }
399
400 /// Similar to [Self::response_filter()] but for response trailers.
401 /// Note, returning an Ok(Some(Bytes)) will result in the downstream response
402 /// trailers being written to the response body.
403 ///
404 /// TODO: make this interface more intuitive
405 async fn response_trailer_filter(
406 &self,
407 _session: &mut Session,
408 _upstream_trailers: &mut header::HeaderMap,
409 _ctx: &mut Self::CTX,
410 ) -> Result<Option<Bytes>>
411 where
412 Self::CTX: Send + Sync,
413 {
414 Ok(None)
415 }
416
417 /// This filter is called when the entire response is sent to the downstream successfully or
418 /// there is a fatal error that terminate the request.
419 ///
420 /// An error log is already emitted if there is any error. This phase is used for collecting
421 /// metrics and sending access logs.
422 async fn logging(&self, _session: &mut Session, _e: Option<&Error>, _ctx: &mut Self::CTX)
423 where
424 Self::CTX: Send + Sync,
425 {
426 }
427
428 /// A value of true means that the log message will be suppressed. The default value is false.
429 fn suppress_error_log(&self, _session: &Session, _ctx: &Self::CTX, _error: &Error) -> bool {
430 false
431 }
432
433 /// This filter is called when there is an error **after** a connection is established (or reused)
434 /// to the upstream.
435 fn error_while_proxy(
436 &self,
437 peer: &HttpPeer,
438 session: &mut Session,
439 e: Box<Error>,
440 _ctx: &mut Self::CTX,
441 client_reused: bool,
442 ) -> Box<Error> {
443 let mut e = e.more_context(format!("Peer: {}", peer));
444 // only reused client connections where retry buffer is not truncated
445 e.retry
446 .decide_reuse(client_reused && !session.as_ref().retry_buffer_truncated());
447 e
448 }
449
450 /// This filter is called when there is an error in the process of establishing a connection
451 /// to the upstream.
452 ///
453 /// In this filter the user can decide whether the error is retry-able by marking the error `e`.
454 ///
455 /// If the error can be retried, [Self::upstream_peer()] will be called again so that the user
456 /// can decide whether to send the request to the same upstream or another upstream that is possibly
457 /// available.
458 fn fail_to_connect(
459 &self,
460 _session: &mut Session,
461 _peer: &HttpPeer,
462 _ctx: &mut Self::CTX,
463 e: Box<Error>,
464 ) -> Box<Error> {
465 e
466 }
467
468 /// This filter is called when the request encounters a fatal error.
469 ///
470 /// Users may write an error response to the downstream if the downstream is still writable.
471 ///
472 /// The response status code of the error response may be returned for logging purposes.
473 /// Additionally, the user can return whether this session may be reused in spite of the error.
474 /// Today this reuse status is only respected for errors that occur prior to upstream peer
475 /// selection, and the keepalive configured on the `Session` itself still takes precedent.
476 async fn fail_to_proxy(
477 &self,
478 session: &mut Session,
479 e: &Error,
480 _ctx: &mut Self::CTX,
481 ) -> FailToProxy
482 where
483 Self::CTX: Send + Sync,
484 {
485 let code = match e.etype() {
486 HTTPStatus(code) => *code,
487 _ => {
488 match e.esource() {
489 ErrorSource::Upstream => 502,
490 ErrorSource::Downstream => {
491 match e.etype() {
492 WriteError | ReadError | ConnectionClosed => {
493 /* conn already dead */
494 0
495 }
496 _ => 400,
497 }
498 }
499 ErrorSource::Internal | ErrorSource::Unset => 500,
500 }
501 }
502 };
503 if code > 0 {
504 session.respond_error(code).await.unwrap_or_else(|e| {
505 error!("failed to send error response to downstream: {e}");
506 });
507 }
508
509 FailToProxy {
510 error_code: code,
511 // default to no reuse, which is safest
512 can_reuse_downstream: false,
513 }
514 }
515
516 /// Decide whether should serve stale when encountering an error or during revalidation
517 ///
518 /// An implementation should follow
519 /// <https://datatracker.ietf.org/doc/html/rfc9111#section-4.2.4>
520 /// <https://www.rfc-editor.org/rfc/rfc5861#section-4>
521 ///
522 /// This filter is only called if cache is enabled.
523 // 5xx HTTP status will be encoded as ErrorType::HTTPStatus(code)
524 fn should_serve_stale(
525 &self,
526 _session: &mut Session,
527 _ctx: &mut Self::CTX,
528 error: Option<&Error>, // None when it is called during stale while revalidate
529 ) -> bool {
530 // A cache MUST NOT generate a stale response unless
531 // it is disconnected
532 // or doing so is explicitly permitted by the client or origin server
533 // (e.g. headers or an out-of-band contract)
534 error.is_some_and(|e| e.esource() == &ErrorSource::Upstream)
535 }
536
537 /// This filter is called when the request just established or reused a connection to the upstream
538 ///
539 /// This filter allows user to log timing and connection related info.
540 async fn connected_to_upstream(
541 &self,
542 _session: &mut Session,
543 _reused: bool,
544 _peer: &HttpPeer,
545 #[cfg(unix)] _fd: std::os::unix::io::RawFd,
546 #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
547 _digest: Option<&Digest>,
548 _ctx: &mut Self::CTX,
549 ) -> Result<()>
550 where
551 Self::CTX: Send + Sync,
552 {
553 Ok(())
554 }
555
556 /// This callback is invoked every time request related error log needs to be generated
557 ///
558 /// Users can define what is important to be written about this request via the returned string.
559 fn request_summary(&self, session: &Session, _ctx: &Self::CTX) -> String {
560 session.as_ref().request_summary()
561 }
562
563 /// Whether the request should be used to invalidate(delete) the HTTP cache
564 ///
565 /// - `true`: this request will be used to invalidate the cache.
566 /// - `false`: this request is a treated as a normal request
567 fn is_purge(&self, _session: &Session, _ctx: &Self::CTX) -> bool {
568 false
569 }
570
571 /// This filter is called after the proxy cache generates the downstream response to the purge
572 /// request (to invalidate or delete from the HTTP cache), based on the purge status, which
573 /// indicates whether the request succeeded or failed.
574 ///
575 /// The filter allows the user to modify or replace the generated downstream response.
576 /// If the filter returns `Err`, the proxy will instead send a 500 response.
577 fn purge_response_filter(
578 &self,
579 _session: &Session,
580 _ctx: &mut Self::CTX,
581 _purge_status: PurgeStatus,
582 _purge_response: &mut std::borrow::Cow<'static, ResponseHeader>,
583 ) -> Result<()> {
584 Ok(())
585 }
586}
587
/// Context struct returned by `fail_to_proxy`.
///
/// It reports the status code of the error response, if any, that was written downstream. It
/// also reports whether the downstream session may be reused despite the failure.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FailToProxy {
    /// The status code of the error response, for logging purposes.
    ///
    /// The default `fail_to_proxy` implementation uses `0` when it did not write any response,
    /// e.g. because the downstream connection was already dead.
    pub error_code: u16,
    /// Whether the downstream session may be reused in spite of the error.
    ///
    /// This is only respected for errors that occur prior to upstream peer selection, and the
    /// keepalive configured on the `Session` still takes precedence.
    pub can_reuse_downstream: bool,
}