pingora_proxy/proxy_trait.rs
1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::*;
16use pingora_cache::{
17 key::HashBinary,
18 CacheKey, CacheMeta, ForcedInvalidationKind, HitHandler,
19 RespCacheable::{self, *},
20};
21use proxy_cache::range_filter::{self};
22use std::time::Duration;
23
24/// The interface to control the HTTP proxy
25///
26/// The methods in [ProxyHttp] are filters/callbacks which will be performed on all requests at their
27/// particular stage (if applicable).
28///
29/// If any of the filters returns [Result::Err], the request will fail, and the error will be logged.
30#[cfg_attr(not(doc_async_trait), async_trait)]
31pub trait ProxyHttp {
32 /// The per request object to share state across the different filters
33 type CTX;
34
35 /// Define how the `ctx` should be created.
36 fn new_ctx(&self) -> Self::CTX;
37
38 /// Define where the proxy should send the request to.
39 ///
40 /// The returned [HttpPeer] contains the information regarding where and how this request should
41 /// be forwarded to.
42 async fn upstream_peer(
43 &self,
44 session: &mut Session,
45 ctx: &mut Self::CTX,
46 ) -> Result<Box<HttpPeer>>;
47
48 /// Set up downstream modules.
49 ///
50 /// In this phase, users can add or configure [HttpModules] before the server starts up.
51 ///
52 /// In the default implementation of this method, [ResponseCompressionBuilder] is added
53 /// and disabled.
54 fn init_downstream_modules(&self, modules: &mut HttpModules) {
55 // Add disabled downstream compression module by default
56 modules.add_module(ResponseCompressionBuilder::enable(0));
57 }
58
59 /// Handle the incoming request.
60 ///
61 /// In this phase, users can parse, validate, rate limit, perform access control and/or
62 /// return a response for this request.
63 ///
64 /// If the user already sent a response to this request, an `Ok(true)` should be returned so that
65 /// the proxy would exit. The proxy continues to the next phases when `Ok(false)` is returned.
66 ///
67 /// By default this filter does nothing and returns `Ok(false)`.
68 async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
69 where
70 Self::CTX: Send + Sync,
71 {
72 Ok(false)
73 }
74
75 /// Handle the incoming request before any downstream module is executed.
76 ///
77 /// This function is similar to [Self::request_filter()] but executes before any other logic,
78 /// including downstream module logic. The main purpose of this function is to provide finer
79 /// grained control of the behavior of the modules.
80 ///
81 /// Note that because this function is executed before any module that might provide access
82 /// control or rate limiting, logic should stay in request_filter() if it can in order to be
83 /// protected by said modules.
84 async fn early_request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()>
85 where
86 Self::CTX: Send + Sync,
87 {
88 Ok(())
89 }
90
91 /// Handle the incoming request body.
92 ///
93 /// This function will be called every time a piece of request body is received. The `body` is
94 /// **not the entire request body**.
95 ///
96 /// The async nature of this function allows to throttle the upload speed and/or executing
97 /// heavy computation logic such as WAF rules on offloaded threads without blocking the threads
98 /// who process the requests themselves.
99 async fn request_body_filter(
100 &self,
101 _session: &mut Session,
102 _body: &mut Option<Bytes>,
103 _end_of_stream: bool,
104 _ctx: &mut Self::CTX,
105 ) -> Result<()>
106 where
107 Self::CTX: Send + Sync,
108 {
109 Ok(())
110 }
111
112 /// This filter decides if the request is cacheable and what cache backend to use
113 ///
114 /// The caller can interact with `Session.cache` to enable caching.
115 ///
116 /// By default this filter does nothing which effectively disables caching.
117 // Ideally only session.cache should be modified, TODO: reflect that in this interface
118 fn request_cache_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()> {
119 Ok(())
120 }
121
122 /// This callback generates the cache key
123 ///
124 /// This callback is called only when cache is enabled for this request
125 ///
126 /// By default this callback returns a default cache key generated from the request.
127 fn cache_key_callback(&self, session: &Session, _ctx: &mut Self::CTX) -> Result<CacheKey> {
128 let req_header = session.req_header();
129 Ok(CacheKey::default(req_header))
130 }
131
132 /// This callback is invoked when a cacheable response is ready to be admitted to cache
133 fn cache_miss(&self, session: &mut Session, _ctx: &mut Self::CTX) {
134 session.cache.cache_miss();
135 }
136
137 /// This filter is called after a successful cache lookup and before the
138 /// cache asset is ready to be used.
139 ///
140 /// This filter allows the user to log or force invalidate the asset, or
141 /// to adjust the body reader associated with the cache hit.
142 /// This also runs on stale hit assets (for which `is_fresh` is false).
143 ///
144 /// The value returned indicates if the force invalidation should be used,
145 /// and which kind. Returning `None` indicates no forced invalidation
146 async fn cache_hit_filter(
147 &self,
148 _session: &mut Session,
149 _meta: &CacheMeta,
150 _hit_handler: &mut HitHandler,
151 _is_fresh: bool,
152 _ctx: &mut Self::CTX,
153 ) -> Result<Option<ForcedInvalidationKind>>
154 where
155 Self::CTX: Send + Sync,
156 {
157 Ok(None)
158 }
159
160 /// Decide if a request should continue to upstream after not being served from cache.
161 ///
162 /// returns: Ok(true) if the request should continue, Ok(false) if a response was written by the
163 /// callback and the session should be finished, or an error
164 ///
165 /// This filter can be used for deferring checks like rate limiting or access control to when they
166 /// actually needed after cache miss.
167 ///
168 /// By default the session will attempt to be reused after returning Ok(false). It is the
169 /// caller's responsibility to disable keepalive or drain the request body if needed.
170 async fn proxy_upstream_filter(
171 &self,
172 _session: &mut Session,
173 _ctx: &mut Self::CTX,
174 ) -> Result<bool>
175 where
176 Self::CTX: Send + Sync,
177 {
178 Ok(true)
179 }
180
181 /// Decide if the response is cacheable
182 fn response_cache_filter(
183 &self,
184 _session: &Session,
185 _resp: &ResponseHeader,
186 _ctx: &mut Self::CTX,
187 ) -> Result<RespCacheable> {
188 Ok(Uncacheable(NoCacheReason::Custom("default")))
189 }
190
191 /// Decide how to generate cache vary key from both request and response
192 ///
193 /// None means no variance is needed.
194 fn cache_vary_filter(
195 &self,
196 _meta: &CacheMeta,
197 _ctx: &mut Self::CTX,
198 _req: &RequestHeader,
199 ) -> Option<HashBinary> {
200 // default to None for now to disable vary feature
201 None
202 }
203
204 /// Decide if the incoming request's condition _fails_ against the cached response.
205 ///
206 /// Returning `Ok(true)` means that the response does _not_ match against the condition, and
207 /// that the proxy can return `304 Not Modified` downstream.
208 ///
209 /// An example is a conditional GET request with `If-None-Match: "foobar"`. If the cached
210 /// response contains the `ETag: "foobar"`, then the condition fails, and `304 Not Modified`
211 /// should be returned. Else, the condition passes which means the full `200 OK` response must
212 /// be sent.
213 fn cache_not_modified_filter(
214 &self,
215 session: &Session,
216 resp: &ResponseHeader,
217 _ctx: &mut Self::CTX,
218 ) -> Result<bool> {
219 Ok(
220 pingora_core::protocols::http::conditional_filter::not_modified_filter(
221 session.req_header(),
222 resp,
223 ),
224 )
225 }
226
227 /// This filter is called when cache is enabled to determine what byte range to return (in both
228 /// cache hit and miss cases) from the response body. It is only used when caching is enabled,
229 /// otherwise the upstream is responsible for any filtering. It allows users to define the range
230 /// this request is for via its return type `range_filter::RangeType`.
231 ///
232 /// It also allow users to modify the response header accordingly.
233 ///
234 /// The default implementation can handle a single-range as per [RFC7232].
235 ///
236 /// [RFC7232]: https://www.rfc-editor.org/rfc/rfc7232
237 fn range_header_filter(
238 &self,
239 session: &mut Session,
240 resp: &mut ResponseHeader,
241 _ctx: &mut Self::CTX,
242 ) -> range_filter::RangeType {
243 proxy_cache::range_filter::range_header_filter(session.req_header(), resp)
244 }
245
246 /// Modify the request before it is sent to the upstream
247 ///
248 /// Unlike [Self::request_filter()], this filter allows to change the request headers to send
249 /// to the upstream.
250 async fn upstream_request_filter(
251 &self,
252 _session: &mut Session,
253 _upstream_request: &mut RequestHeader,
254 _ctx: &mut Self::CTX,
255 ) -> Result<()>
256 where
257 Self::CTX: Send + Sync,
258 {
259 Ok(())
260 }
261
262 /// Modify the response header from the upstream
263 ///
264 /// The modification is before caching, so any change here will be stored in the cache if enabled.
265 ///
266 /// Responses served from cache won't trigger this filter. If the cache needed revalidation,
267 /// only the 304 from upstream will trigger the filter (though it will be merged into the
268 /// cached header, not served directly to downstream).
269 fn upstream_response_filter(
270 &self,
271 _session: &mut Session,
272 _upstream_response: &mut ResponseHeader,
273 _ctx: &mut Self::CTX,
274 ) -> Result<()> {
275 Ok(())
276 }
277
278 /// Modify the response header before it is send to the downstream
279 ///
280 /// The modification is after caching. This filter is called for all responses including
281 /// responses served from cache.
282 async fn response_filter(
283 &self,
284 _session: &mut Session,
285 _upstream_response: &mut ResponseHeader,
286 _ctx: &mut Self::CTX,
287 ) -> Result<()>
288 where
289 Self::CTX: Send + Sync,
290 {
291 Ok(())
292 }
293
294 /// Similar to [Self::upstream_response_filter()] but for response body
295 ///
296 /// This function will be called every time a piece of response body is received. The `body` is
297 /// **not the entire response body**.
298 fn upstream_response_body_filter(
299 &self,
300 _session: &mut Session,
301 _body: &mut Option<Bytes>,
302 _end_of_stream: bool,
303 _ctx: &mut Self::CTX,
304 ) -> Result<()> {
305 Ok(())
306 }
307
308 /// Similar to [Self::upstream_response_filter()] but for response trailers
309 fn upstream_response_trailer_filter(
310 &self,
311 _session: &mut Session,
312 _upstream_trailers: &mut header::HeaderMap,
313 _ctx: &mut Self::CTX,
314 ) -> Result<()> {
315 Ok(())
316 }
317
318 /// Similar to [Self::response_filter()] but for response body chunks
319 fn response_body_filter(
320 &self,
321 _session: &mut Session,
322 _body: &mut Option<Bytes>,
323 _end_of_stream: bool,
324 _ctx: &mut Self::CTX,
325 ) -> Result<Option<Duration>>
326 where
327 Self::CTX: Send + Sync,
328 {
329 Ok(None)
330 }
331
332 /// Similar to [Self::response_filter()] but for response trailers.
333 /// Note, returning an Ok(Some(Bytes)) will result in the downstream response
334 /// trailers being written to the response body.
335 ///
336 /// TODO: make this interface more intuitive
337 async fn response_trailer_filter(
338 &self,
339 _session: &mut Session,
340 _upstream_trailers: &mut header::HeaderMap,
341 _ctx: &mut Self::CTX,
342 ) -> Result<Option<Bytes>>
343 where
344 Self::CTX: Send + Sync,
345 {
346 Ok(None)
347 }
348
349 /// This filter is called when the entire response is sent to the downstream successfully or
350 /// there is a fatal error that terminate the request.
351 ///
352 /// An error log is already emitted if there is any error. This phase is used for collecting
353 /// metrics and sending access logs.
354 async fn logging(&self, _session: &mut Session, _e: Option<&Error>, _ctx: &mut Self::CTX)
355 where
356 Self::CTX: Send + Sync,
357 {
358 }
359
360 /// A value of true means that the log message will be suppressed. The default value is false.
361 fn suppress_error_log(&self, _session: &Session, _ctx: &Self::CTX, _error: &Error) -> bool {
362 false
363 }
364
365 /// This filter is called when there is an error **after** a connection is established (or reused)
366 /// to the upstream.
367 fn error_while_proxy(
368 &self,
369 peer: &HttpPeer,
370 session: &mut Session,
371 e: Box<Error>,
372 _ctx: &mut Self::CTX,
373 client_reused: bool,
374 ) -> Box<Error> {
375 let mut e = e.more_context(format!("Peer: {}", peer));
376 // only reused client connections where retry buffer is not truncated
377 e.retry
378 .decide_reuse(client_reused && !session.as_ref().retry_buffer_truncated());
379 e
380 }
381
382 /// This filter is called when there is an error in the process of establishing a connection
383 /// to the upstream.
384 ///
385 /// In this filter the user can decide whether the error is retry-able by marking the error `e`.
386 ///
387 /// If the error can be retried, [Self::upstream_peer()] will be called again so that the user
388 /// can decide whether to send the request to the same upstream or another upstream that is possibly
389 /// available.
390 fn fail_to_connect(
391 &self,
392 _session: &mut Session,
393 _peer: &HttpPeer,
394 _ctx: &mut Self::CTX,
395 e: Box<Error>,
396 ) -> Box<Error> {
397 e
398 }
399
400 /// This filter is called when the request encounters a fatal error.
401 ///
402 /// Users may write an error response to the downstream if the downstream is still writable.
403 ///
404 /// The response status code of the error response may be returned for logging purposes.
405 /// Additionally, the user can return whether this session may be reused in spite of the error.
406 /// Today this reuse status is only respected for errors that occur prior to upstream peer
407 /// selection, and the keepalive configured on the `Session` itself still takes precedent.
408 async fn fail_to_proxy(
409 &self,
410 session: &mut Session,
411 e: &Error,
412 _ctx: &mut Self::CTX,
413 ) -> FailToProxy
414 where
415 Self::CTX: Send + Sync,
416 {
417 let code = match e.etype() {
418 HTTPStatus(code) => *code,
419 _ => {
420 match e.esource() {
421 ErrorSource::Upstream => 502,
422 ErrorSource::Downstream => {
423 match e.etype() {
424 WriteError | ReadError | ConnectionClosed => {
425 /* conn already dead */
426 0
427 }
428 _ => 400,
429 }
430 }
431 ErrorSource::Internal | ErrorSource::Unset => 500,
432 }
433 }
434 };
435 if code > 0 {
436 session.respond_error(code).await.unwrap_or_else(|e| {
437 error!("failed to send error response to downstream: {e}");
438 });
439 }
440
441 FailToProxy {
442 error_code: code,
443 // default to no reuse, which is safest
444 can_reuse_downstream: false,
445 }
446 }
447
448 /// Decide whether should serve stale when encountering an error or during revalidation
449 ///
450 /// An implementation should follow
451 /// <https://datatracker.ietf.org/doc/html/rfc9111#section-4.2.4>
452 /// <https://www.rfc-editor.org/rfc/rfc5861#section-4>
453 ///
454 /// This filter is only called if cache is enabled.
455 // 5xx HTTP status will be encoded as ErrorType::HTTPStatus(code)
456 fn should_serve_stale(
457 &self,
458 _session: &mut Session,
459 _ctx: &mut Self::CTX,
460 error: Option<&Error>, // None when it is called during stale while revalidate
461 ) -> bool {
462 // A cache MUST NOT generate a stale response unless
463 // it is disconnected
464 // or doing so is explicitly permitted by the client or origin server
465 // (e.g. headers or an out-of-band contract)
466 error.is_some_and(|e| e.esource() == &ErrorSource::Upstream)
467 }
468
469 /// This filter is called when the request just established or reused a connection to the upstream
470 ///
471 /// This filter allows user to log timing and connection related info.
472 async fn connected_to_upstream(
473 &self,
474 _session: &mut Session,
475 _reused: bool,
476 _peer: &HttpPeer,
477 #[cfg(unix)] _fd: std::os::unix::io::RawFd,
478 #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
479 _digest: Option<&Digest>,
480 _ctx: &mut Self::CTX,
481 ) -> Result<()>
482 where
483 Self::CTX: Send + Sync,
484 {
485 Ok(())
486 }
487
488 /// This callback is invoked every time request related error log needs to be generated
489 ///
490 /// Users can define what is important to be written about this request via the returned string.
491 fn request_summary(&self, session: &Session, _ctx: &Self::CTX) -> String {
492 session.as_ref().request_summary()
493 }
494
495 /// Whether the request should be used to invalidate(delete) the HTTP cache
496 ///
497 /// - `true`: this request will be used to invalidate the cache.
498 /// - `false`: this request is a treated as a normal request
499 fn is_purge(&self, _session: &Session, _ctx: &Self::CTX) -> bool {
500 false
501 }
502
503 /// This filter is called after the proxy cache generates the downstream response to the purge
504 /// request (to invalidate or delete from the HTTP cache), based on the purge status, which
505 /// indicates whether the request succeeded or failed.
506 ///
507 /// The filter allows the user to modify or replace the generated downstream response.
508 /// If the filter returns `Err`, the proxy will instead send a 500 response.
509 fn purge_response_filter(
510 &self,
511 _session: &Session,
512 _ctx: &mut Self::CTX,
513 _purge_status: PurgeStatus,
514 _purge_response: &mut std::borrow::Cow<'static, ResponseHeader>,
515 ) -> Result<()> {
516 Ok(())
517 }
518}
519
/// Context struct returned by `fail_to_proxy`.
///
/// Derives the common value-type traits so the outcome can be debug-printed, compared, and
/// copied freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FailToProxy {
    /// Status code of the error response, returned for logging purposes.
    ///
    /// The default `fail_to_proxy` implementation reports `0` when no response was sent.
    pub error_code: u16,
    /// Whether the downstream session may be reused in spite of the error.
    pub can_reuse_downstream: bool,
}