// pingora_proxy/proxy_trait.rs
1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::*;
16use pingora_cache::{
17 key::HashBinary,
18 CacheKey, CacheMeta, ForcedFreshness, HitHandler,
19 RespCacheable::{self, *},
20};
21use proxy_cache::range_filter::{self};
22use std::time::Duration;
23
/// The interface to control the HTTP proxy
///
/// The methods in [ProxyHttp] are filters/callbacks which will be performed on all requests at their
/// particular stage (if applicable).
///
/// If any of the filters returns [Result::Err], the request will fail, and the error will be logged.
#[cfg_attr(not(doc_async_trait), async_trait)]
pub trait ProxyHttp {
    /// The per request object to share state across the different filters
    type CTX;

    /// Define how the `ctx` should be created.
    fn new_ctx(&self) -> Self::CTX;

    /// Define where the proxy should send the request to.
    ///
    /// The returned [HttpPeer] contains the information regarding where and how this request should
    /// be forwarded to.
    async fn upstream_peer(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>>;

    /// Set up downstream modules.
    ///
    /// In this phase, users can add or configure [HttpModules] before the server starts up.
    ///
    /// In the default implementation of this method, [ResponseCompressionBuilder] is added
    /// and disabled (compression level 0).
    fn init_downstream_modules(&self, modules: &mut HttpModules) {
        // Add disabled downstream compression module by default
        modules.add_module(ResponseCompressionBuilder::enable(0));
    }

    /// Handle the incoming request.
    ///
    /// In this phase, users can parse, validate, rate limit, perform access control and/or
    /// return a response for this request.
    ///
    /// If the user already sent a response to this request, an `Ok(true)` should be returned so that
    /// the proxy would exit. The proxy continues to the next phases when `Ok(false)` is returned.
    ///
    /// By default this filter does nothing and returns `Ok(false)`.
    async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
    where
        Self::CTX: Send + Sync,
    {
        Ok(false)
    }

    /// Handle the incoming request before any downstream module is executed.
    ///
    /// This function is similar to [Self::request_filter()] but executes before any other logic,
    /// including downstream module logic. The main purpose of this function is to provide finer
    /// grained control of the behavior of the modules.
    ///
    /// Note that because this function is executed before any module that might provide access
    /// control or rate limiting, logic should stay in request_filter() if it can in order to be
    /// protected by said modules.
    async fn early_request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        Ok(())
    }

    /// Returns whether this session is allowed to spawn subrequests.
    ///
    /// This function is checked after [Self::early_request_filter] to allow that filter to configure
    /// this if required. This will also run for subrequests themselves, which may be allowed to
    /// spawn their own subrequests.
    ///
    /// Note that this doesn't prevent subrequests from being spawned based on the session by proxy
    /// core functionality, e.g. background cache revalidation requires spawning subrequests.
    fn allow_spawning_subrequest(&self, _session: &Session, _ctx: &Self::CTX) -> bool
    where
        Self::CTX: Send + Sync,
    {
        false
    }

    /// Handle the incoming request body.
    ///
    /// This function will be called every time a piece of request body is received. The `body` is
    /// **not the entire request body**.
    ///
    /// The async nature of this function allows throttling the upload speed and/or executing
    /// heavy computation logic such as WAF rules on offloaded threads without blocking the threads
    /// who process the requests themselves.
    async fn request_body_filter(
        &self,
        _session: &mut Session,
        _body: &mut Option<Bytes>,
        _end_of_stream: bool,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        Ok(())
    }

    /// This filter decides if the request is cacheable and what cache backend to use
    ///
    /// The caller can interact with `Session.cache` to enable caching.
    ///
    /// By default this filter does nothing which effectively disables caching.
    // Ideally only session.cache should be modified, TODO: reflect that in this interface
    fn request_cache_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        Ok(())
    }

    /// This callback generates the cache key.
    ///
    /// This callback is called only when cache is enabled for this request.
    ///
    /// There is no sensible default cache key for all proxy applications. The
    /// correct key depends on which request properties affect upstream responses
    /// (e.g. `Vary` headers, custom request filters that modify the origin host).
    /// Getting this wrong leads to cache poisoning.
    ///
    /// See `pingora-proxy/tests/utils/server_utils.rs` for a minimal (not
    /// production-ready) reference implementation.
    ///
    /// # Panics
    ///
    /// The default implementation panics. You **must** override this method when
    /// caching is enabled.
    fn cache_key_callback(&self, _session: &Session, _ctx: &mut Self::CTX) -> Result<CacheKey> {
        unimplemented!("cache_key_callback must be implemented when caching is enabled")
    }

    /// This callback is invoked when a cacheable response is ready to be admitted to cache.
    fn cache_miss(&self, session: &mut Session, _ctx: &mut Self::CTX) {
        session.cache.cache_miss();
    }

    /// This filter is called after a successful cache lookup and before the
    /// cache asset is ready to be used.
    ///
    /// This filter allows the user to log or force invalidate the asset, or
    /// to adjust the body reader associated with the cache hit.
    /// This also runs on stale hit assets (for which `is_fresh` is false).
    ///
    /// The value returned indicates if the force invalidation should be used,
    /// and which kind. Returning `None` indicates no forced invalidation.
    async fn cache_hit_filter(
        &self,
        _session: &mut Session,
        _meta: &CacheMeta,
        _hit_handler: &mut HitHandler,
        _is_fresh: bool,
        _ctx: &mut Self::CTX,
    ) -> Result<Option<ForcedFreshness>>
    where
        Self::CTX: Send + Sync,
    {
        Ok(None)
    }

    /// Decide if a request should continue to upstream after not being served from cache.
    ///
    /// returns: Ok(true) if the request should continue, Ok(false) if a response was written by the
    /// callback and the session should be finished, or an error
    ///
    /// This filter can be used for deferring checks like rate limiting or access control until
    /// they are actually needed after a cache miss.
    ///
    /// By default the session will attempt to be reused after returning Ok(false). It is the
    /// caller's responsibility to disable keepalive or drain the request body if needed.
    async fn proxy_upstream_filter(
        &self,
        _session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<bool>
    where
        Self::CTX: Send + Sync,
    {
        Ok(true)
    }

    /// Decide if the response is cacheable
    ///
    /// By default the response is not cacheable.
    fn response_cache_filter(
        &self,
        _session: &Session,
        _resp: &ResponseHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<RespCacheable> {
        Ok(Uncacheable(NoCacheReason::Custom("default")))
    }

    /// Decide how to generate cache vary key from both request and response
    ///
    /// None means no variance is needed.
    fn cache_vary_filter(
        &self,
        _meta: &CacheMeta,
        _ctx: &mut Self::CTX,
        _req: &RequestHeader,
    ) -> Option<HashBinary> {
        // default to None for now to disable vary feature
        None
    }

    /// Decide if the incoming request's condition _fails_ against the cached response.
    ///
    /// Returning `Ok(true)` means that the response does _not_ match against the condition, and
    /// that the proxy can return `304 Not Modified` downstream.
    ///
    /// An example is a conditional GET request with `If-None-Match: "foobar"`. If the cached
    /// response contains the `ETag: "foobar"`, then the condition fails, and `304 Not Modified`
    /// should be returned. Else, the condition passes which means the full `200 OK` response must
    /// be sent.
    fn cache_not_modified_filter(
        &self,
        session: &Session,
        resp: &ResponseHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<bool> {
        Ok(
            pingora_core::protocols::http::conditional_filter::not_modified_filter(
                session.req_header(),
                resp,
            ),
        )
    }

    /// This filter is called when cache is enabled to determine what byte range to return (in both
    /// cache hit and miss cases) from the response body. It is only used when caching is enabled,
    /// otherwise the upstream is responsible for any filtering. It allows users to define the range
    /// this request is for via its return type `range_filter::RangeType`.
    ///
    /// It also allows users to modify the response header accordingly.
    ///
    /// The default implementation can handle a single-range as per [RFC7233].
    ///
    /// [RFC7233]: https://www.rfc-editor.org/rfc/rfc7233
    fn range_header_filter(
        &self,
        session: &mut Session,
        resp: &mut ResponseHeader,
        _ctx: &mut Self::CTX,
    ) -> range_filter::RangeType {
        // Cap the number of ranges honored in a single request by default.
        const DEFAULT_MAX_RANGES: Option<usize> = Some(200);
        proxy_cache::range_filter::range_header_filter(
            session.req_header(),
            resp,
            DEFAULT_MAX_RANGES,
        )
    }

    /// Modify the request before it is sent to the upstream
    ///
    /// Unlike [Self::request_filter()], this filter allows changing the request headers to send
    /// to the upstream.
    async fn upstream_request_filter(
        &self,
        _session: &mut Session,
        _upstream_request: &mut RequestHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        Ok(())
    }

    /// Modify the response header from the upstream
    ///
    /// The modification is before caching, so any change here will be stored in the cache if enabled.
    ///
    /// Responses served from cache won't trigger this filter. If the cache needed revalidation,
    /// only the 304 from upstream will trigger the filter (though it will be merged into the
    /// cached header, not served directly to downstream).
    async fn upstream_response_filter(
        &self,
        _session: &mut Session,
        _upstream_response: &mut ResponseHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        Ok(())
    }

    /// Modify the response header before it is sent to the downstream
    ///
    /// The modification is after caching. This filter is called for all responses including
    /// responses served from cache.
    async fn response_filter(
        &self,
        _session: &mut Session,
        _upstream_response: &mut ResponseHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        Ok(())
    }

    // custom_forwarding is called when downstream and upstream connections are successfully established.
    #[doc(hidden)]
    async fn custom_forwarding(
        &self,
        _session: &mut Session,
        _ctx: &mut Self::CTX,
        _custom_message_to_upstream: Option<mpsc::Sender<Bytes>>,
        _custom_message_to_downstream: mpsc::Sender<Bytes>,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        Ok(())
    }

    // Received a custom message from the downstream before sending it to the upstream.
    // Returning Ok(None) drops the message; the default forwards it unchanged.
    #[doc(hidden)]
    async fn downstream_custom_message_proxy_filter(
        &self,
        _session: &mut Session,
        custom_message: Bytes,
        _ctx: &mut Self::CTX,
        _final_hop: bool,
    ) -> Result<Option<Bytes>>
    where
        Self::CTX: Send + Sync,
    {
        Ok(Some(custom_message))
    }

    // Received a custom message from the upstream before sending it to the downstream.
    // Returning Ok(None) drops the message; the default forwards it unchanged.
    #[doc(hidden)]
    async fn upstream_custom_message_proxy_filter(
        &self,
        _session: &mut Session,
        custom_message: Bytes,
        _ctx: &mut Self::CTX,
        _final_hop: bool,
    ) -> Result<Option<Bytes>>
    where
        Self::CTX: Send + Sync,
    {
        Ok(Some(custom_message))
    }

    /// Similar to [Self::upstream_response_filter()] but for response body
    ///
    /// This function will be called every time a piece of response body is received. The `body` is
    /// **not the entire response body**.
    fn upstream_response_body_filter(
        &self,
        _session: &mut Session,
        _body: &mut Option<Bytes>,
        _end_of_stream: bool,
        _ctx: &mut Self::CTX,
    ) -> Result<Option<Duration>> {
        Ok(None)
    }

    /// Similar to [Self::upstream_response_filter()] but for response trailers
    fn upstream_response_trailer_filter(
        &self,
        _session: &mut Session,
        _upstream_trailers: &mut header::HeaderMap,
        _ctx: &mut Self::CTX,
    ) -> Result<()> {
        Ok(())
    }

    /// Similar to [Self::response_filter()] but for response body chunks
    fn response_body_filter(
        &self,
        _session: &mut Session,
        _body: &mut Option<Bytes>,
        _end_of_stream: bool,
        _ctx: &mut Self::CTX,
    ) -> Result<Option<Duration>>
    where
        Self::CTX: Send + Sync,
    {
        Ok(None)
    }

    /// Similar to [Self::response_filter()] but for response trailers.
    /// Note, returning an Ok(Some(Bytes)) will result in the downstream response
    /// trailers being written to the response body.
    ///
    /// TODO: make this interface more intuitive
    async fn response_trailer_filter(
        &self,
        _session: &mut Session,
        _upstream_trailers: &mut header::HeaderMap,
        _ctx: &mut Self::CTX,
    ) -> Result<Option<Bytes>>
    where
        Self::CTX: Send + Sync,
    {
        Ok(None)
    }

    /// This filter is called when the entire response is sent to the downstream successfully or
    /// there is a fatal error that terminates the request.
    ///
    /// An error log is already emitted if there is any error. This phase is used for collecting
    /// metrics and sending access logs.
    async fn logging(&self, _session: &mut Session, _e: Option<&Error>, _ctx: &mut Self::CTX)
    where
        Self::CTX: Send + Sync,
    {
    }

    /// A value of true means that the log message will be suppressed. The default value is false.
    fn suppress_error_log(&self, _session: &Session, _ctx: &Self::CTX, _error: &Error) -> bool {
        false
    }

    /// This filter is called when there is an error **after** a connection is established (or reused)
    /// to the upstream.
    fn error_while_proxy(
        &self,
        peer: &HttpPeer,
        session: &mut Session,
        e: Box<Error>,
        _ctx: &mut Self::CTX,
        client_reused: bool,
    ) -> Box<Error> {
        let mut e = e.more_context(format!("Peer: {}", peer));
        // only reused client connections where retry buffer is not truncated
        e.retry
            .decide_reuse(client_reused && !session.as_ref().retry_buffer_truncated());
        e
    }

    /// This filter is called when there is an error in the process of establishing a connection
    /// to the upstream.
    ///
    /// In this filter the user can decide whether the error is retry-able by marking the error `e`.
    ///
    /// If the error can be retried, [Self::upstream_peer()] will be called again so that the user
    /// can decide whether to send the request to the same upstream or another upstream that is possibly
    /// available.
    fn fail_to_connect(
        &self,
        _session: &mut Session,
        _peer: &HttpPeer,
        _ctx: &mut Self::CTX,
        e: Box<Error>,
    ) -> Box<Error> {
        e
    }

    /// This filter is called when the request encounters a fatal error.
    ///
    /// Users may write an error response to the downstream if the downstream is still writable.
    ///
    /// The response status code of the error response may be returned for logging purposes.
    /// Additionally, the user can return whether this session may be reused in spite of the error.
    /// Today this reuse status is only respected for errors that occur prior to upstream peer
    /// selection, and the keepalive configured on the `Session` itself still takes precedent.
    async fn fail_to_proxy(
        &self,
        session: &mut Session,
        e: &Error,
        _ctx: &mut Self::CTX,
    ) -> FailToProxy
    where
        Self::CTX: Send + Sync,
    {
        // Map the error to an HTTP status: explicit HTTPStatus wins, otherwise
        // derive from the error source (502 upstream, 400/0 downstream, 500 internal).
        let code = match e.etype() {
            HTTPStatus(code) => *code,
            _ => {
                match e.esource() {
                    ErrorSource::Upstream => 502,
                    ErrorSource::Downstream => {
                        match e.etype() {
                            WriteError | ReadError | ConnectionClosed => {
                                /* conn already dead */
                                0
                            }
                            _ => 400,
                        }
                    }
                    ErrorSource::Internal | ErrorSource::Unset => 500,
                }
            }
        };
        // Code 0 means the downstream connection is already unusable; don't try to respond.
        if code > 0 {
            session.respond_error(code).await.unwrap_or_else(|e| {
                error!("failed to send error response to downstream: {e}");
            });
        }

        FailToProxy {
            error_code: code,
            // default to no reuse, which is safest
            can_reuse_downstream: false,
        }
    }

    /// Decide whether a stale response should be served when encountering an error or during
    /// revalidation
    ///
    /// An implementation should follow
    /// <https://datatracker.ietf.org/doc/html/rfc9111#section-4.2.4>
    /// <https://www.rfc-editor.org/rfc/rfc5861#section-4>
    ///
    /// This filter is only called if cache is enabled.
    // 5xx HTTP status will be encoded as ErrorType::HTTPStatus(code)
    fn should_serve_stale(
        &self,
        _session: &mut Session,
        _ctx: &mut Self::CTX,
        error: Option<&Error>, // None when it is called during stale while revalidate
    ) -> bool {
        // A cache MUST NOT generate a stale response unless
        // it is disconnected
        // or doing so is explicitly permitted by the client or origin server
        // (e.g. headers or an out-of-band contract)
        error.is_some_and(|e| e.esource() == &ErrorSource::Upstream)
    }

    /// This filter is called when the request just established or reused a connection to the upstream
    ///
    /// This filter allows user to log timing and connection related info.
    async fn connected_to_upstream(
        &self,
        _session: &mut Session,
        _reused: bool,
        _peer: &HttpPeer,
        #[cfg(unix)] _fd: std::os::unix::io::RawFd,
        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
        _digest: Option<&Digest>,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        Ok(())
    }

    /// This callback is invoked every time request related error log needs to be generated
    ///
    /// Users can define what is important to be written about this request via the returned string.
    fn request_summary(&self, session: &Session, _ctx: &Self::CTX) -> String {
        session.as_ref().request_summary()
    }

    /// Whether the request should be used to invalidate(delete) the HTTP cache
    ///
    /// - `true`: this request will be used to invalidate the cache.
    /// - `false`: this request is treated as a normal request
    fn is_purge(&self, _session: &Session, _ctx: &Self::CTX) -> bool {
        false
    }

    /// This filter is called after the proxy cache generates the downstream response to the purge
    /// request (to invalidate or delete from the HTTP cache), based on the purge status, which
    /// indicates whether the request succeeded or failed.
    ///
    /// The filter allows the user to modify or replace the generated downstream response.
    /// If the filter returns `Err`, the proxy will instead send a 500 response.
    fn purge_response_filter(
        &self,
        _session: &Session,
        _ctx: &mut Self::CTX,
        _purge_status: PurgeStatus,
        _purge_response: &mut std::borrow::Cow<'static, ResponseHeader>,
    ) -> Result<()> {
        Ok(())
    }
}
600
/// Context struct returned by [`ProxyHttp::fail_to_proxy`].
///
/// Carries the outcome of the error-handling phase back to the proxy core.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FailToProxy {
    /// The HTTP status code of the error response sent downstream for logging
    /// purposes; `0` when no response was written (e.g. the connection was
    /// already dead).
    pub error_code: u16,
    /// Whether the downstream session may be reused despite the error.
    /// The default `fail_to_proxy` implementation returns `false` (no reuse),
    /// which is the safest choice.
    pub can_reuse_downstream: bool,
}
605}