pingora_proxy/proxy_trait.rs
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::*;
use pingora_cache::{key::HashBinary, CacheKey, CacheMeta, RespCacheable, RespCacheable::*};
use std::time::Duration;

/// The interface to control the HTTP proxy
///
/// The methods in [ProxyHttp] are filters/callbacks which will be performed on all requests at their
/// particular stage (if applicable).
///
/// If any of the filters returns [Result::Err], the request will fail, and the error will be logged.
#[cfg_attr(not(doc_async_trait), async_trait)]
pub trait ProxyHttp {
    /// The per-request object to share state across the different filters
    type CTX;

    /// Define how the `ctx` should be created.
    fn new_ctx(&self) -> Self::CTX;

    /// Define where the proxy should send the request to.
    ///
    /// The returned [HttpPeer] contains the information regarding where and how this request should
    /// be forwarded.
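    ///
    /// # Example
    ///
    /// A minimal sketch of an override as it could appear in a hypothetical
    /// `impl ProxyHttp for MyProxy`; the address and SNI are placeholders, not part of this trait:
    ///
    /// ```ignore
    /// async fn upstream_peer(
    ///     &self,
    ///     _session: &mut Session,
    ///     _ctx: &mut Self::CTX,
    /// ) -> Result<Box<HttpPeer>> {
    ///     // Send every request to a single TLS upstream (placeholder address and SNI).
    ///     let peer = HttpPeer::new(("1.1.1.1", 443), true, "one.one.one.one".to_string());
    ///     Ok(Box::new(peer))
    /// }
    /// ```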
    async fn upstream_peer(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>>;

    /// Set up downstream modules.
    ///
    /// In this phase, users can add or configure [HttpModules] before the server starts up.
    ///
    /// In the default implementation of this method, [ResponseCompressionBuilder] is added
    /// and disabled.
    fn init_downstream_modules(&self, modules: &mut HttpModules) {
        // Add disabled downstream compression module by default
        modules.add_module(ResponseCompressionBuilder::enable(0));
    }

    /// Handle the incoming request.
    ///
    /// In this phase, users can parse, validate, rate limit, perform access control and/or
    /// return a response for this request.
    ///
    /// If the user already sent a response to this request, `Ok(true)` should be returned so that
    /// the proxy exits. The proxy continues to the next phases when `Ok(false)` is returned.
    ///
    /// By default this filter does nothing and returns `Ok(false)`.
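    ///
    /// # Example
    ///
    /// A sketch of a hypothetical gate that rejects requests without an `Authorization` header;
    /// the header choice and status code are illustrative only:
    ///
    /// ```ignore
    /// async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
    ///     if session.req_header().headers.get("Authorization").is_none() {
    ///         session.respond_error(403).await;
    ///         // The response was already written; stop proxying this request.
    ///         return Ok(true);
    ///     }
    ///     Ok(false)
    /// }
    /// ```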
    async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
    where
        Self::CTX: Send + Sync,
    {
        Ok(false)
    }

    /// Handle the incoming request before any downstream module is executed.
    ///
    /// This function is similar to [Self::request_filter()] but executes before any other logic,
    /// including downstream module logic. The main purpose of this function is to provide finer-
    /// grained control over the behavior of the modules.
    ///
    /// Note that because this function is executed before any module that might provide access
    /// control or rate limiting, logic should stay in request_filter() if it can, in order to be
    /// protected by said modules.
    async fn early_request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        Ok(())
    }

    /// Handle the incoming request body.
    ///
    /// This function will be called every time a piece of request body is received. The `body` is
    /// **not the entire request body**.
    ///
    /// The async nature of this function allows the user to throttle the upload speed and/or execute
    /// heavy computation logic such as WAF rules on offloaded threads without blocking the threads
    /// that process the requests themselves.
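    ///
    /// # Example
    ///
    /// A sketch that counts uploaded bytes; `body_bytes` is a hypothetical field on the
    /// user-defined `CTX` struct, not something this trait provides:
    ///
    /// ```ignore
    /// async fn request_body_filter(
    ///     &self,
    ///     _session: &mut Session,
    ///     body: &mut Option<Bytes>,
    ///     _end_of_stream: bool,
    ///     ctx: &mut Self::CTX,
    /// ) -> Result<()> {
    ///     if let Some(b) = body {
    ///         // Track upload size in the per-request context (hypothetical field).
    ///         ctx.body_bytes += b.len();
    ///     }
    ///     Ok(())
    /// }
    /// ```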
    async fn request_body_filter(
        &self,
        _session: &mut Session,
        _body: &mut Option<Bytes>,
        _end_of_stream: bool,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        Ok(())
    }

    /// This filter decides if the request is cacheable and what cache backend to use
    ///
    /// The caller can interact with `Session.cache` to enable caching.
    ///
    /// By default this filter does nothing, which effectively disables caching.
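    ///
    /// # Example
    ///
    /// A sketch that enables caching for every request. It assumes a `'static` storage backend
    /// (`CACHE_BACKEND` below is a hypothetical `Lazy<MemCache>` static) and the
    /// `HttpCache::enable(storage, eviction, predictor, cache_lock)` signature; verify against the
    /// `pingora_cache` version in use:
    ///
    /// ```ignore
    /// fn request_cache_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<()> {
    ///     // No eviction manager, cacheability predictor, or cache lock in this sketch.
    ///     session.cache.enable(&*CACHE_BACKEND, None, None, None);
    ///     Ok(())
    /// }
    /// ```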
    // Ideally only session.cache should be modified, TODO: reflect that in this interface
    fn request_cache_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()> {
        Ok(())
    }

    /// This callback generates the cache key
    ///
    /// This callback is called only when cache is enabled for this request
    ///
    /// By default this callback returns a default cache key generated from the request.
    fn cache_key_callback(&self, session: &Session, _ctx: &mut Self::CTX) -> Result<CacheKey> {
        let req_header = session.req_header();
        Ok(CacheKey::default(req_header))
    }

    /// This callback is invoked when a cacheable response is ready to be admitted to cache
    fn cache_miss(&self, session: &mut Session, _ctx: &mut Self::CTX) {
        session.cache.cache_miss();
    }

    /// This filter is called after a successful cache lookup and before the cache asset is ready to
    /// be used.
    ///
    /// This filter allows the user to log or force-expire the asset.
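    ///
    /// # Example
    ///
    /// A sketch that force-expires the hit when the client sends a hypothetical
    /// `x-force-expire` request header (returning `Ok(true)` expires the asset):
    ///
    /// ```ignore
    /// async fn cache_hit_filter(
    ///     &self,
    ///     session: &Session,
    ///     _meta: &CacheMeta,
    ///     _ctx: &mut Self::CTX,
    /// ) -> Result<bool> {
    ///     // `x-force-expire` is a made-up header used only for this illustration.
    ///     Ok(session.req_header().headers.contains_key("x-force-expire"))
    /// }
    /// ```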
    // flex purge, other filtering; returns whether the asset should be force expired or not
    async fn cache_hit_filter(
        &self,
        _session: &Session,
        _meta: &CacheMeta,
        _ctx: &mut Self::CTX,
    ) -> Result<bool>
    where
        Self::CTX: Send + Sync,
    {
        Ok(false)
    }

    /// Decide if a request should continue to upstream after not being served from cache.
    ///
    /// Returns `Ok(true)` if the request should continue, `Ok(false)` if a response was written by
    /// the callback and the session should be finished, or an error.
    ///
    /// This filter can be used to defer checks like rate limiting or access control to when they
    /// are actually needed, after a cache miss.
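    ///
    /// # Example
    ///
    /// A sketch that applies a rate limit only to requests that missed the cache;
    /// `self.limiter_allows()` is a hypothetical helper on the proxy service:
    ///
    /// ```ignore
    /// async fn proxy_upstream_filter(
    ///     &self,
    ///     session: &mut Session,
    ///     _ctx: &mut Self::CTX,
    /// ) -> Result<bool> {
    ///     if !self.limiter_allows(session) {
    ///         session.respond_error(429).await;
    ///         return Ok(false); // a response was written, finish the session
    ///     }
    ///     Ok(true)
    /// }
    /// ```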
    async fn proxy_upstream_filter(
        &self,
        _session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<bool>
    where
        Self::CTX: Send + Sync,
    {
        Ok(true)
    }

    /// Decide if the response is cacheable
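    ///
    /// # Example
    ///
    /// A sketch that caches 200 responses for 60 seconds, assuming
    /// `CacheMeta::new(fresh_until, created, stale_while_revalidate_sec, stale_if_error_sec, header)`;
    /// production code would normally derive these values from the response's `Cache-Control`:
    ///
    /// ```ignore
    /// fn response_cache_filter(
    ///     &self,
    ///     _session: &Session,
    ///     resp: &ResponseHeader,
    ///     _ctx: &mut Self::CTX,
    /// ) -> Result<RespCacheable> {
    ///     use std::time::SystemTime;
    ///     if resp.status.as_u16() == 200 {
    ///         let now = SystemTime::now();
    ///         Ok(Cacheable(CacheMeta::new(
    ///             now + Duration::from_secs(60), // fresh_until
    ///             now,                           // created
    ///             0,                             // stale-while-revalidate seconds
    ///             0,                             // stale-if-error seconds
    ///             resp.clone(),
    ///         )))
    ///     } else {
    ///         Ok(Uncacheable(NoCacheReason::Custom("non-200")))
    ///     }
    /// }
    /// ```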
    fn response_cache_filter(
        &self,
        _session: &Session,
        _resp: &ResponseHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<RespCacheable> {
        Ok(Uncacheable(NoCacheReason::Custom("default")))
    }

    /// Decide how to generate the cache vary key from both the request and the response
    ///
    /// `None` means no variance is needed.
    fn cache_vary_filter(
        &self,
        _meta: &CacheMeta,
        _ctx: &mut Self::CTX,
        _req: &RequestHeader,
    ) -> Option<HashBinary> {
        // default to None for now to disable vary feature
        None
    }

    /// Decide if the incoming request's condition _fails_ against the cached response.
    ///
    /// Returning `Ok(true)` means that the response does _not_ match against the condition, and
    /// that the proxy can return `304 Not Modified` downstream.
    ///
    /// An example is a conditional GET request with `If-None-Match: "foobar"`. If the cached
    /// response contains the `ETag: "foobar"`, then the condition fails, and `304 Not Modified`
    /// should be returned. Otherwise, the condition passes, which means the full `200 OK` response
    /// must be sent.
    fn cache_not_modified_filter(
        &self,
        session: &Session,
        resp: &ResponseHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<bool> {
        Ok(
            pingora_core::protocols::http::conditional_filter::not_modified_filter(
                session.req_header(),
                resp,
            ),
        )
    }

    /// Modify the request before it is sent to the upstream
    ///
    /// Unlike [Self::request_filter()], this filter allows changing the request headers that are
    /// sent to the upstream.
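    ///
    /// # Example
    ///
    /// A sketch that rewrites the `Host` header before the request leaves the proxy; the host
    /// value is a placeholder:
    ///
    /// ```ignore
    /// async fn upstream_request_filter(
    ///     &self,
    ///     _session: &mut Session,
    ///     upstream_request: &mut RequestHeader,
    ///     _ctx: &mut Self::CTX,
    /// ) -> Result<()> {
    ///     // Point the upstream request at the placeholder origin host.
    ///     upstream_request.insert_header("Host", "origin.example.com")?;
    ///     Ok(())
    /// }
    /// ```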
    async fn upstream_request_filter(
        &self,
        _session: &mut Session,
        _upstream_request: &mut RequestHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        Ok(())
    }

    /// Modify the response header from the upstream
    ///
    /// The modification is before caching, so any change here will be stored in the cache if enabled.
    ///
    /// Responses served from cache won't trigger this filter. If the cache needed revalidation,
    /// only the 304 from upstream will trigger the filter (though it will be merged into the
    /// cached header, not served directly to downstream).
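    ///
    /// # Example
    ///
    /// A sketch of a hypothetical policy that strips `Set-Cookie` so per-user cookies are never
    /// written into the shared cache:
    ///
    /// ```ignore
    /// fn upstream_response_filter(
    ///     &self,
    ///     _session: &mut Session,
    ///     upstream_response: &mut ResponseHeader,
    ///     _ctx: &mut Self::CTX,
    /// ) {
    ///     // Removed here, so the cached copy never contains it either.
    ///     upstream_response.remove_header("Set-Cookie");
    /// }
    /// ```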
    fn upstream_response_filter(
        &self,
        _session: &mut Session,
        _upstream_response: &mut ResponseHeader,
        _ctx: &mut Self::CTX,
    ) {
    }

    /// Modify the response header before it is sent to the downstream
    ///
    /// The modification is after caching. This filter is called for all responses, including
    /// responses served from cache.
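    ///
    /// # Example
    ///
    /// A sketch that tags every downstream response, cached or not, with a custom header;
    /// the header name and value are placeholders:
    ///
    /// ```ignore
    /// async fn response_filter(
    ///     &self,
    ///     _session: &mut Session,
    ///     upstream_response: &mut ResponseHeader,
    ///     _ctx: &mut Self::CTX,
    /// ) -> Result<()> {
    ///     upstream_response.insert_header("X-Proxy", "my-gateway")?;
    ///     Ok(())
    /// }
    /// ```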
    async fn response_filter(
        &self,
        _session: &mut Session,
        _upstream_response: &mut ResponseHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        Ok(())
    }

    /// Similar to [Self::upstream_response_filter()] but for response body
    ///
    /// This function will be called every time a piece of response body is received. The `body` is
    /// **not the entire response body**.
    fn upstream_response_body_filter(
        &self,
        _session: &mut Session,
        _body: &mut Option<Bytes>,
        _end_of_stream: bool,
        _ctx: &mut Self::CTX,
    ) {
    }

    /// Similar to [Self::upstream_response_filter()] but for response trailers
    fn upstream_response_trailer_filter(
        &self,
        _session: &mut Session,
        _upstream_trailers: &mut header::HeaderMap,
        _ctx: &mut Self::CTX,
    ) -> Result<()> {
        Ok(())
    }

    /// Similar to [Self::response_filter()] but for response body chunks
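    ///
    /// # Example
    ///
    /// A sketch that paces large downloads, assuming the returned `Duration` is the delay applied
    /// before this chunk is sent downstream; the size threshold and delay are arbitrary:
    ///
    /// ```ignore
    /// fn response_body_filter(
    ///     &self,
    ///     _session: &mut Session,
    ///     body: &mut Option<Bytes>,
    ///     _end_of_stream: bool,
    ///     _ctx: &mut Self::CTX,
    /// ) -> Result<Option<Duration>> {
    ///     // Delay chunks larger than 64 KiB by 10 ms (arbitrary pacing policy).
    ///     if body.as_ref().map_or(false, |b| b.len() > 64 * 1024) {
    ///         return Ok(Some(Duration::from_millis(10)));
    ///     }
    ///     Ok(None)
    /// }
    /// ```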
    fn response_body_filter(
        &self,
        _session: &mut Session,
        _body: &mut Option<Bytes>,
        _end_of_stream: bool,
        _ctx: &mut Self::CTX,
    ) -> Result<Option<Duration>>
    where
        Self::CTX: Send + Sync,
    {
        Ok(None)
    }

    /// Similar to [Self::response_filter()] but for response trailers.
    /// Note that returning `Ok(Some(Bytes))` will result in the downstream response
    /// trailers being written to the response body.
    ///
    /// TODO: make this interface more intuitive
    async fn response_trailer_filter(
        &self,
        _session: &mut Session,
        _upstream_trailers: &mut header::HeaderMap,
        _ctx: &mut Self::CTX,
    ) -> Result<Option<Bytes>>
    where
        Self::CTX: Send + Sync,
    {
        Ok(None)
    }

    /// This filter is called when the entire response is sent to the downstream successfully, or
    /// when there is a fatal error that terminates the request.
    ///
    /// An error log is already emitted if there is any error. This phase is used for collecting
    /// metrics and sending access logs.
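    ///
    /// # Example
    ///
    /// A sketch of an access log line built from the response code and [Self::request_summary()];
    /// the log target and format are illustrative:
    ///
    /// ```ignore
    /// async fn logging(&self, session: &mut Session, _e: Option<&Error>, ctx: &mut Self::CTX) {
    ///     let code = session
    ///         .response_written()
    ///         .map_or(0, |resp| resp.status.as_u16());
    ///     log::info!("{} response code: {code}", self.request_summary(session, ctx));
    /// }
    /// ```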
    async fn logging(&self, _session: &mut Session, _e: Option<&Error>, _ctx: &mut Self::CTX)
    where
        Self::CTX: Send + Sync,
    {
    }

    /// A value of true means that the log message will be suppressed. The default value is false.
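    ///
    /// # Example
    ///
    /// For instance, a sketch that suppresses logs for errors caused by the client side:
    ///
    /// ```ignore
    /// fn suppress_error_log(&self, _session: &Session, _ctx: &Self::CTX, error: &Error) -> bool {
    ///     // Downstream (client) hiccups are common and usually not actionable.
    ///     error.esource() == &ErrorSource::Downstream
    /// }
    /// ```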
    fn suppress_error_log(&self, _session: &Session, _ctx: &Self::CTX, _error: &Error) -> bool {
        false
    }

    /// This filter is called when there is an error **after** a connection is established (or reused)
    /// to the upstream.
    fn error_while_proxy(
        &self,
        peer: &HttpPeer,
        session: &mut Session,
        e: Box<Error>,
        _ctx: &mut Self::CTX,
        client_reused: bool,
    ) -> Box<Error> {
        let mut e = e.more_context(format!("Peer: {}", peer));
        // only retry reused client connections whose retry buffer is not truncated
        e.retry
            .decide_reuse(client_reused && !session.as_ref().retry_buffer_truncated());
        e
    }

    /// This filter is called when there is an error in the process of establishing a connection
    /// to the upstream.
    ///
    /// In this filter the user can decide whether the error is retry-able by marking the error `e`.
    ///
    /// If the error can be retried, [Self::upstream_peer()] will be called again so that the user
    /// can decide whether to send the request to the same upstream or another upstream that is
    /// possibly available.
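    ///
    /// # Example
    ///
    /// A sketch that allows a single retry per request, tracked in a hypothetical `tries` counter
    /// on the user-defined `CTX` struct:
    ///
    /// ```ignore
    /// fn fail_to_connect(
    ///     &self,
    ///     _session: &mut Session,
    ///     _peer: &HttpPeer,
    ///     ctx: &mut Self::CTX,
    ///     mut e: Box<Error>,
    /// ) -> Box<Error> {
    ///     if ctx.tries == 0 {
    ///         ctx.tries += 1;
    ///         // Mark the error retryable so upstream_peer() is called again.
    ///         e.set_retry(true);
    ///     }
    ///     e
    /// }
    /// ```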
    fn fail_to_connect(
        &self,
        _session: &mut Session,
        _peer: &HttpPeer,
        _ctx: &mut Self::CTX,
        e: Box<Error>,
    ) -> Box<Error> {
        e
    }

    /// This filter is called when the request encounters a fatal error.
    ///
    /// Users may write an error response to the downstream if the downstream is still writable.
    ///
    /// The response status code of the error response may be returned for logging purposes.
    async fn fail_to_proxy(&self, session: &mut Session, e: &Error, _ctx: &mut Self::CTX) -> u16
    where
        Self::CTX: Send + Sync,
    {
        let server_session = session.as_mut();
        let code = match e.etype() {
            HTTPStatus(code) => *code,
            _ => {
                match e.esource() {
                    ErrorSource::Upstream => 502,
                    ErrorSource::Downstream => {
                        match e.etype() {
                            WriteError | ReadError | ConnectionClosed => {
                                /* conn already dead */
                                0
                            }
                            _ => 400,
                        }
                    }
                    ErrorSource::Internal | ErrorSource::Unset => 500,
                }
            }
        };
        if code > 0 {
            server_session.respond_error(code).await
        }
        code
    }

    /// Decide whether to serve stale content when encountering an error or during revalidation
    ///
    /// An implementation should follow
    /// <https://datatracker.ietf.org/doc/html/rfc9111#section-4.2.4>
    /// <https://www.rfc-editor.org/rfc/rfc5861#section-4>
    ///
    /// This filter is only called if cache is enabled.
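    ///
    /// # Example
    ///
    /// A sketch that always allows stale-while-revalidate and serves stale on upstream failures,
    /// but not on errors the client caused:
    ///
    /// ```ignore
    /// fn should_serve_stale(
    ///     &self,
    ///     _session: &mut Session,
    ///     _ctx: &mut Self::CTX,
    ///     error: Option<&Error>,
    /// ) -> bool {
    ///     match error {
    ///         // stale-while-revalidate: revalidation is already in flight
    ///         None => true,
    ///         // stale-if-error: only when the upstream, not the client, is at fault
    ///         Some(e) => e.esource() == &ErrorSource::Upstream,
    ///     }
    /// }
    /// ```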
    // 5xx HTTP status will be encoded as ErrorType::HTTPStatus(code)
    fn should_serve_stale(
        &self,
        _session: &mut Session,
        _ctx: &mut Self::CTX,
        error: Option<&Error>, // None when it is called during stale while revalidate
    ) -> bool {
        // A cache MUST NOT generate a stale response unless
        // it is disconnected
        // or doing so is explicitly permitted by the client or origin server
        // (e.g. headers or an out-of-band contract)
        error.map_or(false, |e| e.esource() == &ErrorSource::Upstream)
    }

    /// This filter is called when the request has just established or reused a connection to the upstream
    ///
    /// This filter allows the user to log timing and connection-related info.
    async fn connected_to_upstream(
        &self,
        _session: &mut Session,
        _reused: bool,
        _peer: &HttpPeer,
        #[cfg(unix)] _fd: std::os::unix::io::RawFd,
        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
        _digest: Option<&Digest>,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        Ok(())
    }

    /// This callback is invoked every time a request-related error log needs to be generated
    ///
    /// Users can define what is important to be written about this request via the returned string.
    fn request_summary(&self, session: &Session, _ctx: &Self::CTX) -> String {
        session.as_ref().request_summary()
    }

    /// Whether the request should be used to invalidate (delete) the HTTP cache
    ///
    /// - `true`: this request will be used to invalidate the cache.
    /// - `false`: this request is treated as a normal request.
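    ///
    /// # Example
    ///
    /// For example, a sketch that treats the non-standard `PURGE` method as a cache
    /// invalidation request:
    ///
    /// ```ignore
    /// fn is_purge(&self, session: &Session, _ctx: &Self::CTX) -> bool {
    ///     session.req_header().method.as_str() == "PURGE"
    /// }
    /// ```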
    fn is_purge(&self, _session: &Session, _ctx: &Self::CTX) -> bool {
        false
    }

    /// This filter is called after the proxy cache generates the downstream response to the purge
    /// request (to invalidate or delete from the HTTP cache), based on the purge status, which
    /// indicates whether the request succeeded or failed.
    ///
    /// The filter allows the user to modify or replace the generated downstream response.
    /// If the filter returns `Err`, the proxy will instead send a 500 response.
    fn purge_response_filter(
        &self,
        _session: &Session,
        _ctx: &mut Self::CTX,
        _purge_status: PurgeStatus,
        _purge_response: &mut std::borrow::Cow<'static, ResponseHeader>,
    ) -> Result<()> {
        Ok(())
    }
}
469}