pingora_proxy/proxy_trait.rs
1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::*;
16use pingora_cache::{
17 key::HashBinary,
18 CacheKey, CacheMeta, ForcedInvalidationKind,
19 RespCacheable::{self, *},
20};
21use proxy_cache::range_filter::{self};
22use std::time::Duration;
23
24/// The interface to control the HTTP proxy
25///
26/// The methods in [ProxyHttp] are filters/callbacks which will be performed on all requests at their
27/// particular stage (if applicable).
28///
29/// If any of the filters returns [Result::Err], the request will fail, and the error will be logged.
30#[cfg_attr(not(doc_async_trait), async_trait)]
31pub trait ProxyHttp {
32 /// The per request object to share state across the different filters
33 type CTX;
34
35 /// Define how the `ctx` should be created.
36 fn new_ctx(&self) -> Self::CTX;
37
38 /// Define where the proxy should send the request to.
39 ///
40 /// The returned [HttpPeer] contains the information regarding where and how this request should
41 /// be forwarded to.
42 async fn upstream_peer(
43 &self,
44 session: &mut Session,
45 ctx: &mut Self::CTX,
46 ) -> Result<Box<HttpPeer>>;
47
48 /// Set up downstream modules.
49 ///
50 /// In this phase, users can add or configure [HttpModules] before the server starts up.
51 ///
52 /// In the default implementation of this method, [ResponseCompressionBuilder] is added
53 /// and disabled.
54 fn init_downstream_modules(&self, modules: &mut HttpModules) {
55 // Add disabled downstream compression module by default
56 modules.add_module(ResponseCompressionBuilder::enable(0));
57 }
58
59 /// Handle the incoming request.
60 ///
61 /// In this phase, users can parse, validate, rate limit, perform access control and/or
62 /// return a response for this request.
63 ///
64 /// If the user already sent a response to this request, an `Ok(true)` should be returned so that
65 /// the proxy would exit. The proxy continues to the next phases when `Ok(false)` is returned.
66 ///
67 /// By default this filter does nothing and returns `Ok(false)`.
68 async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
69 where
70 Self::CTX: Send + Sync,
71 {
72 Ok(false)
73 }
74
75 /// Handle the incoming request before any downstream module is executed.
76 ///
77 /// This function is similar to [Self::request_filter()] but executes before any other logic,
78 /// including downstream module logic. The main purpose of this function is to provide finer
79 /// grained control of the behavior of the modules.
80 ///
81 /// Note that because this function is executed before any module that might provide access
82 /// control or rate limiting, logic should stay in request_filter() if it can in order to be
83 /// protected by said modules.
84 async fn early_request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()>
85 where
86 Self::CTX: Send + Sync,
87 {
88 Ok(())
89 }
90
91 /// Handle the incoming request body.
92 ///
93 /// This function will be called every time a piece of request body is received. The `body` is
94 /// **not the entire request body**.
95 ///
96 /// The async nature of this function allows to throttle the upload speed and/or executing
97 /// heavy computation logic such as WAF rules on offloaded threads without blocking the threads
98 /// who process the requests themselves.
99 async fn request_body_filter(
100 &self,
101 _session: &mut Session,
102 _body: &mut Option<Bytes>,
103 _end_of_stream: bool,
104 _ctx: &mut Self::CTX,
105 ) -> Result<()>
106 where
107 Self::CTX: Send + Sync,
108 {
109 Ok(())
110 }
111
112 /// This filter decides if the request is cacheable and what cache backend to use
113 ///
114 /// The caller can interact with `Session.cache` to enable caching.
115 ///
116 /// By default this filter does nothing which effectively disables caching.
117 // Ideally only session.cache should be modified, TODO: reflect that in this interface
118 fn request_cache_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()> {
119 Ok(())
120 }
121
122 /// This callback generates the cache key
123 ///
124 /// This callback is called only when cache is enabled for this request
125 ///
126 /// By default this callback returns a default cache key generated from the request.
127 fn cache_key_callback(&self, session: &Session, _ctx: &mut Self::CTX) -> Result<CacheKey> {
128 let req_header = session.req_header();
129 Ok(CacheKey::default(req_header))
130 }
131
132 /// This callback is invoked when a cacheable response is ready to be admitted to cache
133 fn cache_miss(&self, session: &mut Session, _ctx: &mut Self::CTX) {
134 session.cache.cache_miss();
135 }
136
137 /// This filter is called after a successful cache lookup and before the
138 /// cache asset is ready to be used.
139 ///
140 /// This filter allows the user to log or force invalidate the asset.
141 /// This also runs on stale hit assets (for which `is_fresh` is false).
142 ///
143 /// The value returned indicates if the force invalidation should be used,
144 /// and which kind. Returning `None` indicates no forced invalidation
145 async fn cache_hit_filter(
146 &self,
147 _session: &Session,
148 _meta: &CacheMeta,
149 _is_fresh: bool,
150 _ctx: &mut Self::CTX,
151 ) -> Result<Option<ForcedInvalidationKind>>
152 where
153 Self::CTX: Send + Sync,
154 {
155 Ok(None)
156 }
157
158 /// Decide if a request should continue to upstream after not being served from cache.
159 ///
160 /// returns: Ok(true) if the request should continue, Ok(false) if a response was written by the
161 /// callback and the session should be finished, or an error
162 ///
163 /// This filter can be used for deferring checks like rate limiting or access control to when they
164 /// actually needed after cache miss.
165 ///
166 /// By default the session will attempt to be reused after returning Ok(false). It is the
167 /// caller's responsibility to disable keepalive or drain the request body if needed.
168 async fn proxy_upstream_filter(
169 &self,
170 _session: &mut Session,
171 _ctx: &mut Self::CTX,
172 ) -> Result<bool>
173 where
174 Self::CTX: Send + Sync,
175 {
176 Ok(true)
177 }
178
179 /// Decide if the response is cacheable
180 fn response_cache_filter(
181 &self,
182 _session: &Session,
183 _resp: &ResponseHeader,
184 _ctx: &mut Self::CTX,
185 ) -> Result<RespCacheable> {
186 Ok(Uncacheable(NoCacheReason::Custom("default")))
187 }
188
189 /// Decide how to generate cache vary key from both request and response
190 ///
191 /// None means no variance is needed.
192 fn cache_vary_filter(
193 &self,
194 _meta: &CacheMeta,
195 _ctx: &mut Self::CTX,
196 _req: &RequestHeader,
197 ) -> Option<HashBinary> {
198 // default to None for now to disable vary feature
199 None
200 }
201
202 /// Decide if the incoming request's condition _fails_ against the cached response.
203 ///
204 /// Returning `Ok(true)` means that the response does _not_ match against the condition, and
205 /// that the proxy can return `304 Not Modified` downstream.
206 ///
207 /// An example is a conditional GET request with `If-None-Match: "foobar"`. If the cached
208 /// response contains the `ETag: "foobar"`, then the condition fails, and `304 Not Modified`
209 /// should be returned. Else, the condition passes which means the full `200 OK` response must
210 /// be sent.
211 fn cache_not_modified_filter(
212 &self,
213 session: &Session,
214 resp: &ResponseHeader,
215 _ctx: &mut Self::CTX,
216 ) -> Result<bool> {
217 Ok(
218 pingora_core::protocols::http::conditional_filter::not_modified_filter(
219 session.req_header(),
220 resp,
221 ),
222 )
223 }
224
225 /// This filter is called when cache is enabled to determine what byte range to return (in both
226 /// cache hit and miss cases) from the response body. It is only used when caching is enabled,
227 /// otherwise the upstream is responsible for any filtering. It allows users to define the range
228 /// this request is for via its return type `range_filter::RangeType`.
229 ///
230 /// It also allow users to modify the response header accordingly.
231 ///
232 /// The default implementation can handle a single-range as per [RFC7232].
233 ///
234 /// [RFC7232]: https://www.rfc-editor.org/rfc/rfc7232
235 fn range_header_filter(
236 &self,
237 req: &RequestHeader,
238 resp: &mut ResponseHeader,
239 _ctx: &mut Self::CTX,
240 ) -> range_filter::RangeType {
241 proxy_cache::range_filter::range_header_filter(req, resp)
242 }
243
244 /// Modify the request before it is sent to the upstream
245 ///
246 /// Unlike [Self::request_filter()], this filter allows to change the request headers to send
247 /// to the upstream.
248 async fn upstream_request_filter(
249 &self,
250 _session: &mut Session,
251 _upstream_request: &mut RequestHeader,
252 _ctx: &mut Self::CTX,
253 ) -> Result<()>
254 where
255 Self::CTX: Send + Sync,
256 {
257 Ok(())
258 }
259
260 /// Modify the response header from the upstream
261 ///
262 /// The modification is before caching, so any change here will be stored in the cache if enabled.
263 ///
264 /// Responses served from cache won't trigger this filter. If the cache needed revalidation,
265 /// only the 304 from upstream will trigger the filter (though it will be merged into the
266 /// cached header, not served directly to downstream).
267 fn upstream_response_filter(
268 &self,
269 _session: &mut Session,
270 _upstream_response: &mut ResponseHeader,
271 _ctx: &mut Self::CTX,
272 ) -> Result<()> {
273 Ok(())
274 }
275
276 /// Modify the response header before it is send to the downstream
277 ///
278 /// The modification is after caching. This filter is called for all responses including
279 /// responses served from cache.
280 async fn response_filter(
281 &self,
282 _session: &mut Session,
283 _upstream_response: &mut ResponseHeader,
284 _ctx: &mut Self::CTX,
285 ) -> Result<()>
286 where
287 Self::CTX: Send + Sync,
288 {
289 Ok(())
290 }
291
292 /// Similar to [Self::upstream_response_filter()] but for response body
293 ///
294 /// This function will be called every time a piece of response body is received. The `body` is
295 /// **not the entire response body**.
296 fn upstream_response_body_filter(
297 &self,
298 _session: &mut Session,
299 _body: &mut Option<Bytes>,
300 _end_of_stream: bool,
301 _ctx: &mut Self::CTX,
302 ) -> Result<()> {
303 Ok(())
304 }
305
306 /// Similar to [Self::upstream_response_filter()] but for response trailers
307 fn upstream_response_trailer_filter(
308 &self,
309 _session: &mut Session,
310 _upstream_trailers: &mut header::HeaderMap,
311 _ctx: &mut Self::CTX,
312 ) -> Result<()> {
313 Ok(())
314 }
315
316 /// Similar to [Self::response_filter()] but for response body chunks
317 fn response_body_filter(
318 &self,
319 _session: &mut Session,
320 _body: &mut Option<Bytes>,
321 _end_of_stream: bool,
322 _ctx: &mut Self::CTX,
323 ) -> Result<Option<Duration>>
324 where
325 Self::CTX: Send + Sync,
326 {
327 Ok(None)
328 }
329
330 /// Similar to [Self::response_filter()] but for response trailers.
331 /// Note, returning an Ok(Some(Bytes)) will result in the downstream response
332 /// trailers being written to the response body.
333 ///
334 /// TODO: make this interface more intuitive
335 async fn response_trailer_filter(
336 &self,
337 _session: &mut Session,
338 _upstream_trailers: &mut header::HeaderMap,
339 _ctx: &mut Self::CTX,
340 ) -> Result<Option<Bytes>>
341 where
342 Self::CTX: Send + Sync,
343 {
344 Ok(None)
345 }
346
347 /// This filter is called when the entire response is sent to the downstream successfully or
348 /// there is a fatal error that terminate the request.
349 ///
350 /// An error log is already emitted if there is any error. This phase is used for collecting
351 /// metrics and sending access logs.
352 async fn logging(&self, _session: &mut Session, _e: Option<&Error>, _ctx: &mut Self::CTX)
353 where
354 Self::CTX: Send + Sync,
355 {
356 }
357
358 /// A value of true means that the log message will be suppressed. The default value is false.
359 fn suppress_error_log(&self, _session: &Session, _ctx: &Self::CTX, _error: &Error) -> bool {
360 false
361 }
362
363 /// This filter is called when there is an error **after** a connection is established (or reused)
364 /// to the upstream.
365 fn error_while_proxy(
366 &self,
367 peer: &HttpPeer,
368 session: &mut Session,
369 e: Box<Error>,
370 _ctx: &mut Self::CTX,
371 client_reused: bool,
372 ) -> Box<Error> {
373 let mut e = e.more_context(format!("Peer: {}", peer));
374 // only reused client connections where retry buffer is not truncated
375 e.retry
376 .decide_reuse(client_reused && !session.as_ref().retry_buffer_truncated());
377 e
378 }
379
380 /// This filter is called when there is an error in the process of establishing a connection
381 /// to the upstream.
382 ///
383 /// In this filter the user can decide whether the error is retry-able by marking the error `e`.
384 ///
385 /// If the error can be retried, [Self::upstream_peer()] will be called again so that the user
386 /// can decide whether to send the request to the same upstream or another upstream that is possibly
387 /// available.
388 fn fail_to_connect(
389 &self,
390 _session: &mut Session,
391 _peer: &HttpPeer,
392 _ctx: &mut Self::CTX,
393 e: Box<Error>,
394 ) -> Box<Error> {
395 e
396 }
397
398 /// This filter is called when the request encounters a fatal error.
399 ///
400 /// Users may write an error response to the downstream if the downstream is still writable.
401 ///
402 /// The response status code of the error response may be returned for logging purposes.
403 /// Additionally, the user can return whether this session may be reused in spite of the error.
404 /// Today this reuse status is only respected for errors that occur prior to upstream peer
405 /// selection, and the keepalive configured on the `Session` itself still takes precedent.
406 async fn fail_to_proxy(
407 &self,
408 session: &mut Session,
409 e: &Error,
410 _ctx: &mut Self::CTX,
411 ) -> FailToProxy
412 where
413 Self::CTX: Send + Sync,
414 {
415 let code = match e.etype() {
416 HTTPStatus(code) => *code,
417 _ => {
418 match e.esource() {
419 ErrorSource::Upstream => 502,
420 ErrorSource::Downstream => {
421 match e.etype() {
422 WriteError | ReadError | ConnectionClosed => {
423 /* conn already dead */
424 0
425 }
426 _ => 400,
427 }
428 }
429 ErrorSource::Internal | ErrorSource::Unset => 500,
430 }
431 }
432 };
433 if code > 0 {
434 session.respond_error(code).await.unwrap_or_else(|e| {
435 error!("failed to send error response to downstream: {e}");
436 });
437 }
438
439 FailToProxy {
440 error_code: code,
441 // default to no reuse, which is safest
442 can_reuse_downstream: false,
443 }
444 }
445
446 /// Decide whether should serve stale when encountering an error or during revalidation
447 ///
448 /// An implementation should follow
449 /// <https://datatracker.ietf.org/doc/html/rfc9111#section-4.2.4>
450 /// <https://www.rfc-editor.org/rfc/rfc5861#section-4>
451 ///
452 /// This filter is only called if cache is enabled.
453 // 5xx HTTP status will be encoded as ErrorType::HTTPStatus(code)
454 fn should_serve_stale(
455 &self,
456 _session: &mut Session,
457 _ctx: &mut Self::CTX,
458 error: Option<&Error>, // None when it is called during stale while revalidate
459 ) -> bool {
460 // A cache MUST NOT generate a stale response unless
461 // it is disconnected
462 // or doing so is explicitly permitted by the client or origin server
463 // (e.g. headers or an out-of-band contract)
464 error.is_some_and(|e| e.esource() == &ErrorSource::Upstream)
465 }
466
467 /// This filter is called when the request just established or reused a connection to the upstream
468 ///
469 /// This filter allows user to log timing and connection related info.
470 async fn connected_to_upstream(
471 &self,
472 _session: &mut Session,
473 _reused: bool,
474 _peer: &HttpPeer,
475 #[cfg(unix)] _fd: std::os::unix::io::RawFd,
476 #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
477 _digest: Option<&Digest>,
478 _ctx: &mut Self::CTX,
479 ) -> Result<()>
480 where
481 Self::CTX: Send + Sync,
482 {
483 Ok(())
484 }
485
486 /// This callback is invoked every time request related error log needs to be generated
487 ///
488 /// Users can define what is important to be written about this request via the returned string.
489 fn request_summary(&self, session: &Session, _ctx: &Self::CTX) -> String {
490 session.as_ref().request_summary()
491 }
492
493 /// Whether the request should be used to invalidate(delete) the HTTP cache
494 ///
495 /// - `true`: this request will be used to invalidate the cache.
496 /// - `false`: this request is a treated as a normal request
497 fn is_purge(&self, _session: &Session, _ctx: &Self::CTX) -> bool {
498 false
499 }
500
501 /// This filter is called after the proxy cache generates the downstream response to the purge
502 /// request (to invalidate or delete from the HTTP cache), based on the purge status, which
503 /// indicates whether the request succeeded or failed.
504 ///
505 /// The filter allows the user to modify or replace the generated downstream response.
506 /// If the filter returns `Err`, the proxy will instead send a 500 response.
507 fn purge_response_filter(
508 &self,
509 _session: &Session,
510 _ctx: &mut Self::CTX,
511 _purge_status: PurgeStatus,
512 _purge_response: &mut std::borrow::Cow<'static, ResponseHeader>,
513 ) -> Result<()> {
514 Ok(())
515 }
516}
517
/// Context struct returned by [`ProxyHttp::fail_to_proxy`].
///
/// Reports how a fatal request error was handled downstream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FailToProxy {
    /// HTTP status code of the error response, reported for logging purposes.
    ///
    /// The default `fail_to_proxy` implementation reports `0` when no error response was
    /// sent because the downstream connection was already dead.
    pub error_code: u16,
    /// Whether the downstream session may be reused in spite of the error.
    ///
    /// This is only respected for errors that occur prior to upstream peer selection, and the
    /// keepalive configured on the session itself still takes precedence.
    pub can_reuse_downstream: bool,
}