pingora_proxy/proxy_cache.rs
1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::*;
16use http::{Method, StatusCode};
17use pingora_cache::key::CacheHashKey;
18use pingora_cache::lock::LockStatus;
19use pingora_cache::max_file_size::ERR_RESPONSE_TOO_LARGE;
20use pingora_cache::{ForcedInvalidationKind, HitStatus, RespCacheable::*};
21use pingora_core::protocols::http::conditional_filter::to_304;
22use pingora_core::protocols::http::v1::common::header_value_content_length;
23use pingora_core::ErrorType;
24use range_filter::RangeBodyFilter;
25use std::time::SystemTime;
26
27impl<SV> HttpProxy<SV> {
    // return bool: server_session can be reused, and error if any
    //
    // Drives the whole request-phase cache flow: request cache filters, cache
    // key computation, PURGE short-circuit, cache lookup (including the vary
    // and cache-lock retry loops), and the hit / stale / miss decision.
    pub(crate) async fn proxy_cache(
        self: &Arc<Self>,
        session: &mut Session,
        ctx: &mut SV::CTX,
    ) -> Option<(bool, Option<Box<Error>>)>
    // None: continue to proxy, Some: return
    where
        SV: ProxyHttp + Send + Sync + 'static,
        SV::CTX: Send + Sync,
    {
        // Cache logic request phase
        if let Err(e) = self.inner.request_cache_filter(session, ctx) {
            // TODO: handle this error
            warn!(
                "Fail to request_cache_filter: {e}, {}",
                self.inner.request_summary(session, ctx)
            );
        }

        // cache key logic, should this be part of request_cache_filter?
        if session.cache.enabled() {
            match self.inner.cache_key_callback(session, ctx) {
                Ok(key) => {
                    session.cache.set_cache_key(key);
                }
                Err(e) => {
                    // TODO: handle this error
                    session.cache.disable(NoCacheReason::StorageError);
                    warn!(
                        "Fail to cache_key_callback: {e}, {}",
                        self.inner.request_summary(session, ctx)
                    );
                }
            }
        }

        // cache purge logic: PURGE short-circuits rest of request
        if self.inner.is_purge(session, ctx) {
            return self.proxy_purge(session, ctx).await;
        }

        // bypass cache lookup if we predict to be uncacheable
        if session.cache.enabled() && !session.cache.cacheable_prediction() {
            session.cache.bypass();
        }

        if !session.cache.enabled() {
            return None;
        }

        // cache lookup logic
        // The loop re-runs the lookup after a vary cache-key update or after
        // waiting on another request's cache lock.
        loop {
            // for cache lock, TODO: cap the max number of loops
            match session.cache.cache_lookup().await {
                Ok(res) => {
                    // None means no usable asset was found in this iteration (miss)
                    let mut hit_status_opt = None;
                    if let Some((mut meta, handler)) = res {
                        // Vary logic
                        // Because this branch can be called multiple times in a loop, and we only
                        // need to update the vary once, check if variance is already set to
                        // prevent unnecessary vary lookups.
                        let cache_key = session.cache.cache_key();
                        if let Some(variance) = cache_key.variance_bin() {
                            // We've looked up a secondary slot.
                            // Adhoc double check that the variance found is the variance we want.
                            if Some(variance) != meta.variance() {
                                warn!("Cache variance mismatch, {variance:?}, {cache_key:?}");
                                session.cache.disable(NoCacheReason::InternalError);
                                break None;
                            }
                        } else {
                            // Basic cache key; either variance is off, or this is the primary slot.
                            let req_header = session.req_header();
                            let variance = self.inner.cache_vary_filter(&meta, ctx, req_header);
                            if let Some(variance) = variance {
                                // Variance is on. This is the primary slot.
                                if !session.cache.cache_vary_lookup(variance, &meta) {
                                    // This wasn't the desired variant. Updated cache key variance, cause another
                                    // lookup to get the desired variant, which would be in a secondary slot.
                                    continue;
                                }
                            } // else: vary is not in use
                        }

                        // Either no variance, or the current handler targets the correct variant.

                        // hit
                        // TODO: maybe round and/or cache now()
                        let is_fresh = meta.is_fresh(SystemTime::now());
                        // check if we should force expire or force miss
                        let hit_status = match self
                            .inner
                            .cache_hit_filter(session, &meta, is_fresh, ctx)
                            .await
                        {
                            Err(e) => {
                                error!(
                                    "Failed to filter cache hit: {e}, {}",
                                    self.inner.request_summary(session, ctx)
                                );
                                // this return value will cause us to fetch from upstream
                                HitStatus::FailedHitFilter
                            }
                            Ok(None) => {
                                // no forced invalidation: freshness decides the status
                                if is_fresh {
                                    HitStatus::Fresh
                                } else {
                                    HitStatus::Expired
                                }
                            }
                            Ok(Some(ForcedInvalidationKind::ForceExpired)) => {
                                // force expired asset should not be serve as stale
                                // because force expire is usually to remove data
                                meta.disable_serve_stale();
                                HitStatus::ForceExpired
                            }
                            Ok(Some(ForcedInvalidationKind::ForceMiss)) => HitStatus::ForceMiss,
                        };

                        hit_status_opt = Some(hit_status);

                        // init cache for hit / stale
                        session.cache.cache_found(meta, handler, hit_status);
                    }

                    if hit_status_opt.map_or(true, HitStatus::is_treated_as_miss) {
                        // cache miss
                        if session.cache.is_cache_locked() {
                            // Another request is filling the cache; try waiting til that's done and retry.
                            let lock_status = session.cache.cache_lock_wait().await;
                            if self.handle_lock_status(session, ctx, lock_status) {
                                continue;
                            } else {
                                break None;
                            }
                        } else {
                            self.inner.cache_miss(session, ctx);
                            break None;
                        }
                    }

                    // Safe because an empty hit status would have broken out
                    // in the block above
                    let hit_status = hit_status_opt.expect("None case handled as miss");

                    if !hit_status.is_fresh() {
                        // expired or force expired asset
                        if session.cache.is_cache_locked() {
                            // first if this is the sub request for the background cache update
                            if let Some(write_lock) = session
                                .subrequest_ctx
                                .as_mut()
                                .and_then(|ctx| ctx.take_write_lock())
                            {
                                // Put the write lock in the request
                                session.cache.set_write_lock(write_lock);
                                session.cache.tag_as_subrequest();
                                // and then let it go to upstream
                                break None;
                            }
                            let will_serve_stale = session.cache.can_serve_stale_updating()
                                && self.inner.should_serve_stale(session, ctx, None);
                            if !will_serve_stale {
                                // cannot serve stale: wait for the lock holder instead
                                let lock_status = session.cache.cache_lock_wait().await;
                                if self.handle_lock_status(session, ctx, lock_status) {
                                    continue;
                                } else {
                                    break None;
                                }
                            }
                            // else continue to serve stale
                            session.cache.set_stale_updating();
                        } else if session.cache.is_cache_lock_writer() {
                            // stale while revalidate logic for the writer
                            let will_serve_stale = session.cache.can_serve_stale_updating()
                                && self.inner.should_serve_stale(session, ctx, None);
                            if will_serve_stale {
                                // create a background thread to do the actual update
                                let subrequest =
                                    Box::new(crate::subrequest::create_dummy_session(session));
                                let new_app = self.clone(); // Clone the Arc
                                // hand the cache write lock over to the background subrequest
                                let (permit, cache_lock) = session.cache.take_write_lock();
                                let sub_req_ctx = Box::new(SubReqCtx::with_write_lock(
                                    cache_lock,
                                    session.cache.cache_key().clone(),
                                    permit,
                                ));
                                tokio::spawn(async move {
                                    new_app.process_subrequest(subrequest, sub_req_ctx).await;
                                });
                                // continue to serve stale for this request
                                session.cache.set_stale_updating();
                            } else {
                                // return to fetch from upstream
                                break None;
                            }
                        } else {
                            // return to fetch from upstream
                            break None;
                        }
                    }

                    let (reuse, err) = self.proxy_cache_hit(session, ctx).await;
                    if let Some(e) = err.as_ref() {
                        error!(
                            "Fail to serve cache: {e}, {}",
                            self.inner.request_summary(session, ctx)
                        );
                    }
                    // response is served from cache, exit
                    break Some((reuse, err));
                }
                Err(e) => {
                    // Allow cache miss to fill cache even if cache lookup errors
                    // this is mostly to support backward incompatible metadata update
                    // TODO: check error types
                    // session.cache.disable();
                    self.inner.cache_miss(session, ctx);
                    warn!(
                        "Fail to cache lookup: {e}, {}",
                        self.inner.request_summary(session, ctx)
                    );
                    break None;
                }
            }
        }
    }
256
    // return bool: server_session can be reused, and error if any
    //
    // Serves a cached (hit or stale) response to downstream: builds the header
    // from the stored meta, runs conditional (304) and range filters, sends the
    // header, then streams the body from the hit handler through the response
    // body filters to downstream.
    pub(crate) async fn proxy_cache_hit(
        &self,
        session: &mut Session,
        ctx: &mut SV::CTX,
    ) -> (bool, Option<Box<Error>>)
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        use range_filter::*;

        let seekable = session.cache.hit_handler().can_seek();
        let mut header = cache_hit_header(&session.cache);

        let req = session.req_header();

        let not_modified = match self.inner.cache_not_modified_filter(session, &header, ctx) {
            Ok(not_modified) => not_modified,
            Err(e) => {
                // fail open if cache_not_modified_filter errors,
                // just return the whole original response
                warn!(
                    "Failed to run cache not modified filter: {e}, {}",
                    self.inner.request_summary(session, ctx)
                );
                false
            }
        };
        if not_modified {
            to_304(&mut header);
        }
        // 304 and HEAD responses carry no body
        let header_only = not_modified || req.method == http::method::Method::HEAD;

        // process range header if the cache storage supports seek
        let range_type = if seekable && !session.ignore_downstream_range {
            self.inner.range_header_filter(req, &mut header, ctx)
        } else {
            RangeType::None
        };

        // return a 416 with an empty body for simplicity
        let header_only = header_only || matches!(range_type, RangeType::Invalid);

        // TODO: use ProxyUseCache to replace the logic below
        match self.inner.response_filter(session, &mut header, ctx).await {
            Ok(_) => {
                if let Err(e) = session
                    .downstream_modules_ctx
                    .response_header_filter(&mut header, header_only)
                    .await
                {
                    error!(
                        "Failed to run downstream modules response header filter in hit: {e}, {}",
                        self.inner.request_summary(session, ctx)
                    );
                    session
                        .as_mut()
                        .respond_error(500)
                        .await
                        .unwrap_or_else(|e| {
                            error!("failed to send error response to downstream: {e}");
                        });
                    // we have not write anything dirty to downstream, it is still reusable
                    return (true, Some(e));
                }

                if let Err(e) = session
                    .as_mut()
                    .write_response_header(header)
                    .await
                    .map_err(|e| e.into_down())
                {
                    // downstream connection is bad already
                    return (false, Some(e));
                }
            }
            Err(e) => {
                error!(
                    "Failed to run response filter in hit: {e}, {}",
                    self.inner.request_summary(session, ctx)
                );
                session
                    .as_mut()
                    .respond_error(500)
                    .await
                    .unwrap_or_else(|e| {
                        error!("failed to send error response to downstream: {e}");
                    });
                // we have not write anything dirty to downstream, it is still reusable
                return (true, Some(e));
            }
        }
        debug!("finished sending cached header to downstream");

        if !header_only {
            if let RangeType::Single(r) = range_type {
                // seek the hit handler to the requested byte range before streaming
                if let Err(e) = session.cache.hit_handler().seek(r.start, Some(r.end)) {
                    return (false, Some(e));
                }
            }
            // streaming loop: read from the hit handler until EOF (body == None)
            loop {
                match session.cache.hit_handler().read_body().await {
                    Ok(mut body) => {
                        let end = body.is_none();
                        match self
                            .inner
                            .response_body_filter(session, &mut body, end, ctx)
                        {
                            Ok(Some(duration)) => {
                                trace!("delaying response for {duration:?}");
                                time::sleep(duration).await;
                            }
                            Ok(None) => { /* continue */ }
                            Err(e) => {
                                // body is being sent, don't treat downstream as reusable
                                return (false, Some(e));
                            }
                        }

                        if let Err(e) = session
                            .downstream_modules_ctx
                            .response_body_filter(&mut body, end)
                        {
                            // body is being sent, don't treat downstream as reusable
                            return (false, Some(e));
                        }

                        if !end && body.as_ref().map_or(true, |b| b.is_empty()) {
                            // Don't write empty body which will end session,
                            // still more hit handler bytes to read
                            continue;
                        }

                        // write to downstream
                        let b = body.unwrap_or_default();
                        if let Err(e) = session
                            .as_mut()
                            .write_response_body(b, end)
                            .await
                            .map_err(|e| e.into_down())
                        {
                            return (false, Some(e));
                        }
                        if end {
                            break;
                        }
                    }
                    Err(e) => return (false, Some(e)),
                }
            }
        }

        // release cache-side resources regardless of downstream outcome
        if let Err(e) = session.cache.finish_hit_handler().await {
            warn!("Error during finish_hit_handler: {}", e);
        }

        match session.as_mut().finish_body().await {
            Ok(_) => {
                debug!("finished sending cached body to downstream");
                (true, None)
            }
            Err(e) => (false, Some(e)),
        }
    }
422
423 /* Downstream revalidation, only needed when cache is on because otherwise origin
424 * will handle it */
425 pub(crate) fn downstream_response_conditional_filter(
426 &self,
427 use_cache: &mut ServeFromCache,
428 session: &Session,
429 resp: &mut ResponseHeader,
430 ctx: &mut SV::CTX,
431 ) where
432 SV: ProxyHttp,
433 {
434 // TODO: range
435 let req = session.req_header();
436
437 let not_modified = match self.inner.cache_not_modified_filter(session, resp, ctx) {
438 Ok(not_modified) => not_modified,
439 Err(e) => {
440 // fail open if cache_not_modified_filter errors,
441 // just return the whole original response
442 warn!(
443 "Failed to run cache not modified filter: {e}, {}",
444 self.inner.request_summary(session, ctx)
445 );
446 false
447 }
448 };
449
450 if not_modified {
451 to_304(resp);
452 }
453 let header_only = not_modified || req.method == http::method::Method::HEAD;
454 if header_only {
455 if use_cache.is_on() {
456 // tell cache to stop after yielding header
457 use_cache.enable_header_only();
458 } else {
459 // headers only during cache miss, upstream should continue send
460 // body to cache, `session` will ignore body automatically because
461 // of the signature of `header` (304)
462 // TODO: we should drop body before/within this filter so that body
463 // filter only runs on data downstream sees
464 }
465 }
466 }
467
468 // TODO: cache upstream header filter to add/remove headers
469
    // Hook the cache into the stream of `HttpTask`s coming from upstream:
    // decide cacheability on the response header (via response_cache_filter),
    // set up the miss handler, and write body bytes into cache storage as
    // they pass through. Returns an error if writing to cache storage fails.
    pub(crate) async fn cache_http_task(
        &self,
        session: &mut Session,
        task: &HttpTask,
        ctx: &mut SV::CTX,
        serve_from_cache: &mut ServeFromCache,
    ) -> Result<()>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        // nothing to do unless caching is on or we are probing a bypassed cache
        if !session.cache.enabled() && !session.cache.bypassing() {
            return Ok(());
        }

        match task {
            HttpTask::Header(header, end_stream) => {
                // decide if cacheable and create cache meta
                // for now, skip 1xxs (should not affect response cache decisions)
                // However 101 is an exception because it is the final response header
                if header.status.is_informational()
                    && header.status != StatusCode::SWITCHING_PROTOCOLS
                {
                    return Ok(());
                }
                match self.inner.response_cache_filter(session, header, ctx)? {
                    Cacheable(meta) => {
                        let mut fill_cache = true;
                        if session.cache.bypassing() {
                            // The cache might have been bypassed because the response exceeded the
                            // maximum cacheable asset size. If that looks like the case (there
                            // is a maximum file size configured and we don't know the content
                            // length up front), attempting to re-enable the cache now would cause
                            // the request to fail when the chunked response exceeds the maximum
                            // file size again.
                            if session.cache.max_file_size_bytes().is_some()
                                && !meta.headers().contains_key(header::CONTENT_LENGTH)
                            {
                                session.cache.disable(NoCacheReason::ResponseTooLarge);
                                return Ok(());
                            }

                            session.cache.response_became_cacheable();

                            if session.req_header().method == Method::GET
                                && meta.response_header().status == StatusCode::OK
                            {
                                self.inner.cache_miss(session, ctx);
                            } else {
                                // we've allowed caching on the next request,
                                // but do not cache _this_ request if bypassed and not 200
                                // (We didn't run upstream request cache filters to strip range or condition headers,
                                // so this could be an uncacheable response e.g. 206 or 304 or HEAD.
                                // Exclude all non-200/GET for simplicity, may expand allowable codes in the future.)
                                fill_cache = false;
                                session.cache.disable(NoCacheReason::Deferred);
                            }
                        }

                        // If the Content-Length is known, and a maximum asset size has been configured
                        // on the cache, validate that the response does not exceed the maximum asset size.
                        if session.cache.enabled() {
                            if let Some(max_file_size) = session.cache.max_file_size_bytes() {
                                let content_length_hdr = meta.headers().get(header::CONTENT_LENGTH);
                                if let Some(content_length) =
                                    header_value_content_length(content_length_hdr)
                                {
                                    if content_length > max_file_size {
                                        fill_cache = false;
                                        session.cache.response_became_uncacheable(
                                            NoCacheReason::ResponseTooLarge,
                                        );
                                        session.cache.disable(NoCacheReason::ResponseTooLarge);
                                    }
                                }
                                // if the content-length header is not specified, the miss handler
                                // will count the response size on the fly, aborting the request
                                // mid-transfer if the max file size is exceeded
                            }
                        }
                        if fill_cache {
                            let req_header = session.req_header();
                            // Update the variance in the meta via the same callback,
                            // cache_vary_filter(), used in cache lookup for consistency.
                            // Future cache lookups need a matching variance in the meta
                            // with the cache key to pick up the correct variance
                            let variance = self.inner.cache_vary_filter(&meta, ctx, req_header);
                            session.cache.set_cache_meta(meta);
                            session.cache.update_variance(variance);
                            // this sends the meta and header
                            session.cache.set_miss_handler().await?;
                            if session.cache.miss_body_reader().is_some() {
                                serve_from_cache.enable_miss();
                            }
                            if *end_stream {
                                // header-only response: write an empty final chunk
                                // and finalize the cache write immediately
                                session
                                    .cache
                                    .miss_handler()
                                    .unwrap() // safe, it is set above
                                    .write_body(Bytes::new(), true)
                                    .await?;
                                session.cache.finish_miss_handler().await?;
                            }
                        }
                    }
                    Uncacheable(reason) => {
                        if !session.cache.bypassing() {
                            // mark as uncacheable, so we bypass cache next time
                            session.cache.response_became_uncacheable(reason);
                        }
                        session.cache.disable(reason);
                    }
                }
            }
            HttpTask::Body(data, end_stream) => match data {
                Some(d) => {
                    if session.cache.enabled() {
                        // this will panic if more data is sent after we see end_stream
                        // but should be impossible in real world
                        let miss_handler = session.cache.miss_handler().unwrap();
                        // TODO: do this async
                        let res = miss_handler.write_body(d.clone(), *end_stream).await;
                        if let Err(err) = res {
                            if err.etype == ERR_RESPONSE_TOO_LARGE {
                                debug!("chunked response exceeded max cache size, remembering that it is uncacheable");
                                session
                                    .cache
                                    .response_became_uncacheable(NoCacheReason::ResponseTooLarge);
                            }

                            return Err(err);
                        }
                        if *end_stream {
                            session.cache.finish_miss_handler().await?;
                        }
                    }
                }
                None => {
                    if session.cache.enabled() && *end_stream {
                        session.cache.finish_miss_handler().await?;
                    }
                }
            },
            HttpTask::Trailer(_) => {} // h1 trailer is not supported yet
            HttpTask::Done => {
                if session.cache.enabled() {
                    session.cache.finish_miss_handler().await?;
                }
            }
            HttpTask::Failed(_) => {
                // TODO: handle this failure: delete the temp files?
            }
        }
        Ok(())
    }
625
    // Decide if local cache can be used according to upstream http header
    // 1. when upstream returns 304, the local cache is refreshed and served fresh
    // 2. when upstream returns certain HTTP error status, the local cache is served stale
    // Return true if local cache should be used, false otherwise
    pub(crate) async fn revalidate_or_stale(
        &self,
        session: &mut Session,
        task: &mut HttpTask,
        ctx: &mut SV::CTX,
    ) -> bool
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        if !session.cache.enabled() {
            return false;
        }

        match task {
            HttpTask::Header(resp, _eos) => {
                if resp.status == StatusCode::NOT_MODIFIED {
                    if session.cache.maybe_cache_meta().is_some() {
                        // run upstream response filters on upstream 304 first
                        if let Err(err) = self.inner.upstream_response_filter(session, resp, ctx) {
                            error!("upstream response filter error on 304: {err:?}");
                            // filter failed: serve the stored asset but don't refresh the meta
                            session.cache.revalidate_uncacheable(
                                *resp.clone(),
                                NoCacheReason::InternalError,
                            );
                            // always serve from cache after receiving the 304
                            return true;
                        }
                        // 304 doesn't contain all the headers, merge 304 into cached 200 header
                        // in order for response_cache_filter to run correctly
                        let merged_header = session.cache.revalidate_merge_header(resp);
                        match self
                            .inner
                            .response_cache_filter(session, &merged_header, ctx)
                        {
                            Ok(Cacheable(mut meta)) => {
                                // For simplicity, ignore changes to variance over 304 for now.
                                // Note this means upstream can only update variance via 2xx
                                // (expired response).
                                //
                                // TODO: if we choose to respect changing Vary / variance over 304,
                                // then there are a few cases to consider. See `update_variance` in
                                // the `pingora-cache` module.
                                let old_meta = session.cache.maybe_cache_meta().unwrap(); // safe, checked above
                                if let Some(old_variance) = old_meta.variance() {
                                    meta.set_variance(old_variance);
                                }
                                if let Err(e) = session.cache.revalidate_cache_meta(meta).await {
                                    // Fail open: we can continue use the revalidated response even
                                    // if the meta failed to write to storage
                                    warn!("revalidate_cache_meta failed {e:?}");
                                }
                            }
                            Ok(Uncacheable(reason)) => {
                                // This response was once cacheable, and upstream tells us it has not changed
                                // but now we decided it is uncacheable!
                                // RFC 9111: still allowed to reuse stored response this time because
                                // it was "successfully validated"
                                // https://www.rfc-editor.org/rfc/rfc9111#constructing.responses.from.caches
                                // Serve the response, but do not update cache

                                // We also want to avoid poisoning downstream's cache with an unsolicited 304
                                // if we did not receive a conditional request from downstream
                                // (downstream may have a different cacheability assessment and could cache the 304)

                                //TODO: log more
                                warn!("Uncacheable {reason:?} 304 received");
                                session.cache.response_became_uncacheable(reason);
                                session.cache.revalidate_uncacheable(merged_header, reason);
                            }
                            Err(e) => {
                                // Error during revalidation, similarly to the reasons above
                                // (avoid poisoning downstream cache with passthrough 304),
                                // allow serving the stored response without updating cache
                                warn!("Error {e:?} response_cache_filter during revalidation");
                                session.cache.revalidate_uncacheable(
                                    merged_header,
                                    NoCacheReason::InternalError,
                                );
                                // Assume the next 304 may succeed, so don't mark uncacheable
                            }
                        }
                        // always serve from cache after receiving the 304
                        true
                    } else {
                        //TODO: log more
                        warn!("304 received without cached asset, disable caching");
                        let reason = NoCacheReason::Custom("304 on miss");
                        session.cache.response_became_uncacheable(reason);
                        session.cache.disable(reason);
                        false
                    }
                } else if resp.status.is_server_error() {
                    // stale if error logic, 5xx only for now

                    // this is response header filter, response_written should always be None?
                    if !session.cache.can_serve_stale_error()
                        || session.response_written().is_some()
                    {
                        return false;
                    }

                    // create an error to encode the http status code
                    let http_status_error = Error::create(
                        ErrorType::HTTPStatus(resp.status.as_u16()),
                        ErrorSource::Upstream,
                        None,
                        None,
                    );
                    // let the user logic decide whether this error warrants serving stale
                    if self
                        .inner
                        .should_serve_stale(session, ctx, Some(&http_status_error))
                    {
                        // no more need to keep the write lock
                        session
                            .cache
                            .release_write_lock(NoCacheReason::UpstreamError);
                        true
                    } else {
                        false
                    }
                } else {
                    false // not 304, not stale if error status code
                }
            }
            _ => false, // not header
        }
    }
758
759 // None: no staled asset is used, Some(_): staled asset is sent to downstream
760 // bool: can the downstream connection be reused
761 pub(crate) async fn handle_stale_if_error(
762 &self,
763 session: &mut Session,
764 ctx: &mut SV::CTX,
765 error: &Error,
766 ) -> Option<(bool, Option<Box<Error>>)>
767 where
768 SV: ProxyHttp + Send + Sync,
769 SV::CTX: Send + Sync,
770 {
771 // the caller might already checked this as an optimization
772 if !session.cache.can_serve_stale_error() {
773 return None;
774 }
775
776 // the error happen halfway through a regular response to downstream
777 // can't resend the response
778 if session.response_written().is_some() {
779 return None;
780 }
781
782 // check error types
783 if !self.inner.should_serve_stale(session, ctx, Some(error)) {
784 return None;
785 }
786
787 // log the original error
788 warn!(
789 "Fail to proxy: {}, serving stale, {}",
790 error,
791 self.inner.request_summary(session, ctx)
792 );
793
794 // no more need to hang onto the cache lock
795 session
796 .cache
797 .release_write_lock(NoCacheReason::UpstreamError);
798
799 Some(self.proxy_cache_hit(session, ctx).await)
800 }
801
802 // helper function to check when to continue to retry lock (true) or give up (false)
803 fn handle_lock_status(
804 &self,
805 session: &mut Session,
806 ctx: &SV::CTX,
807 lock_status: LockStatus,
808 ) -> bool
809 where
810 SV: ProxyHttp,
811 {
812 debug!("cache unlocked {lock_status:?}");
813 match lock_status {
814 // should lookup the cached asset again
815 LockStatus::Done => true,
816 // should compete to be a new writer
817 LockStatus::TransientError => true,
818 // the request is uncacheable, go ahead to fetch from the origin
819 LockStatus::GiveUp => {
820 // TODO: It will be nice for the writer to propagate the real reason
821 session.cache.disable(NoCacheReason::CacheLockGiveUp);
822 // not cacheable, just go to the origin.
823 false
824 }
825 // treat this the same as TransientError
826 LockStatus::Dangling => {
827 // software bug, but request can recover from this
828 warn!(
829 "Dangling cache lock, {}",
830 self.inner.request_summary(session, ctx)
831 );
832 true
833 }
834 /* We have 3 options when a lock is held too long
835 * 1. release the lock and let every request complete for it again
836 * 2. let every request cache miss
837 * 3. let every request through while disabling cache
838 * #1 could repeat the situation but protect the origin from load
839 * #2 could amplify disk writes and storage for temp file
840 * #3 is the simplest option for now */
841 LockStatus::Timeout => {
842 warn!(
843 "Cache lock timeout, {}",
844 self.inner.request_summary(session, ctx)
845 );
846 session.cache.disable(NoCacheReason::CacheLockTimeout);
847 // not cacheable, just go to the origin.
848 false
849 }
850 // software bug, this status should be impossible to reach
851 LockStatus::Waiting => panic!("impossible LockStatus::Waiting"),
852 }
853 }
854}
855
856fn cache_hit_header(cache: &HttpCache) -> Box<ResponseHeader> {
857 let mut header = Box::new(cache.cache_meta().response_header_copy());
858 // convert cache response
859
860 // these status codes / method cannot have body, so no need to add chunked encoding
861 let no_body = matches!(header.status.as_u16(), 204 | 304);
862
863 // https://www.rfc-editor.org/rfc/rfc9111#section-4:
864 // When a stored response is used to satisfy a request without validation, a cache
865 // MUST generate an Age header field
866 if !cache.upstream_used() {
867 let age = cache.cache_meta().age().as_secs();
868 header.insert_header(http::header::AGE, age).unwrap();
869 }
870
871 /* Add chunked header to tell downstream to use chunked encoding
872 * during the absent of content-length in h2 */
873 if !no_body
874 && !header.status.is_informational()
875 && header.headers.get(http::header::CONTENT_LENGTH).is_none()
876 {
877 header
878 .insert_header(http::header::TRANSFER_ENCODING, "chunked")
879 .unwrap();
880 }
881 header
882}
883
884// https://datatracker.ietf.org/doc/html/rfc7233#section-3
885pub mod range_filter {
886 use super::*;
887 use http::header::*;
888 use std::ops::Range;
889
890 // parse bytes into usize, ignores specific error
891 fn parse_number(input: &[u8]) -> Option<usize> {
892 str::from_utf8(input).ok()?.parse().ok()
893 }
894
895 fn parse_range_header(range: &[u8], content_length: usize) -> RangeType {
896 use regex::Regex;
897
898 // single byte range only for now
899 // https://datatracker.ietf.org/doc/html/rfc7233#section-2.1
900 // https://datatracker.ietf.org/doc/html/rfc7233#appendix-C: case-insensitive
901 static RE_SINGLE_RANGE: Lazy<Regex> =
902 Lazy::new(|| Regex::new(r"(?i)bytes=(?P<start>\d*)-(?P<end>\d*)").unwrap());
903
904 // ignore invalid range header
905 let Ok(range_str) = str::from_utf8(range) else {
906 return RangeType::None;
907 };
908
909 let Some(captured) = RE_SINGLE_RANGE.captures(range_str) else {
910 return RangeType::None;
911 };
912 let maybe_start = captured
913 .name("start")
914 .and_then(|s| s.as_str().parse::<usize>().ok());
915 let end = captured
916 .name("end")
917 .and_then(|s| s.as_str().parse::<usize>().ok());
918
919 if let Some(start) = maybe_start {
920 if start >= content_length {
921 RangeType::Invalid
922 } else {
923 // open-ended range should end at the last byte
924 // over sized end is allow but ignored
925 // range end is inclusive
926 let end = std::cmp::min(end.unwrap_or(content_length - 1), content_length - 1) + 1;
927 if end <= start {
928 RangeType::Invalid
929 } else {
930 RangeType::new_single(start, end)
931 }
932 }
933 } else {
934 // start is empty, this changes the meaning of the value of `end`
935 // Now it means to read the last `end` bytes
936 if let Some(end) = end {
937 if content_length >= end {
938 RangeType::new_single(content_length - end, content_length)
939 } else {
940 // over sized end is allow but ignored
941 RangeType::new_single(0, content_length)
942 }
943 } else {
944 // both empty/invalid
945 RangeType::Invalid
946 }
947 }
948 }
    #[test]
    fn test_parse_range() {
        // normal in-bounds range; header end is inclusive, internal end is exclusive
        assert_eq!(
            parse_range_header(b"bytes=0-1", 10),
            RangeType::new_single(0, 2)
        );
        // the bytes unit is matched case-insensitively
        assert_eq!(
            parse_range_header(b"bYTes=0-9", 10),
            RangeType::new_single(0, 10)
        );
        // an oversized end is clamped to the content length
        assert_eq!(
            parse_range_header(b"bytes=0-12", 10),
            RangeType::new_single(0, 10)
        );
        // an open-ended range runs to the last byte
        assert_eq!(
            parse_range_header(b"bytes=0-", 10),
            RangeType::new_single(0, 10)
        );
        // end before start, or start beyond the asset, is unsatisfiable
        assert_eq!(parse_range_header(b"bytes=2-1", 10), RangeType::Invalid);
        assert_eq!(parse_range_header(b"bytes=10-11", 10), RangeType::Invalid);
        // suffix range: the last `end` bytes of the asset
        assert_eq!(
            parse_range_header(b"bytes=-2", 10),
            RangeType::new_single(8, 10)
        );
        // an oversized suffix covers the whole asset
        assert_eq!(
            parse_range_header(b"bytes=-12", 10),
            RangeType::new_single(0, 10)
        );
        // both ends empty is invalid; an empty value is ignored entirely
        assert_eq!(parse_range_header(b"bytes=-", 10), RangeType::Invalid);
        assert_eq!(parse_range_header(b"bytes=", 10), RangeType::None);
    }
980
    /// Parsed representation of a downstream `Range` request header.
    #[derive(Debug, Eq, PartialEq, Clone)]
    pub enum RangeType {
        /// No (usable) range was requested: serve the full response.
        None,
        /// A single satisfiable byte range; start-inclusive, end-exclusive.
        Single(Range<usize>),
        // TODO: multi-range
        /// A range header was present but unsatisfiable: respond with 416.
        Invalid,
    }
988
989 impl RangeType {
990 fn new_single(start: usize, end: usize) -> Self {
991 RangeType::Single(Range { start, end })
992 }
993 }
994
995 // TODO: if-range
996
997 // single range for now
998 pub fn range_header_filter(req: &RequestHeader, resp: &mut ResponseHeader) -> RangeType {
999 // The Range header field is evaluated after evaluating the precondition
1000 // header fields defined in [RFC7232], and only if the result in absence
1001 // of the Range header field would be a 200 (OK) response
1002 if resp.status != StatusCode::OK {
1003 return RangeType::None;
1004 }
1005
1006 // "A server MUST ignore a Range header field received with a request method other than GET."
1007 if req.method != http::Method::GET && req.method != http::Method::HEAD {
1008 return RangeType::None;
1009 }
1010
1011 let Some(range_header) = req.headers.get(RANGE) else {
1012 return RangeType::None;
1013 };
1014
1015 // Content-Length is not required by RFC but it is what nginx does and easier to implement
1016 // with this header present.
1017 let Some(content_length_bytes) = resp.headers.get(CONTENT_LENGTH) else {
1018 return RangeType::None;
1019 };
1020 // bail on invalid content length
1021 let Some(content_length) = parse_number(content_length_bytes.as_bytes()) else {
1022 return RangeType::None;
1023 };
1024
1025 // if-range wants to understand if the Last-Modified / ETag value matches exactly for use
1026 // with resumable downloads.
1027 // https://datatracker.ietf.org/doc/html/rfc9110#name-if-range
1028 // Note that the RFC wants strong validation, and suggests that
1029 // "A valid entity-tag can be distinguished from a valid HTTP-date
1030 // by examining the first three characters for a DQUOTE,"
1031 // but this current etag matching behavior most closely mirrors nginx.
1032 if let Some(if_range) = req.headers.get(IF_RANGE) {
1033 let ir = if_range.as_bytes();
1034 let matches = if ir.len() >= 2 && ir.last() == Some(&b'"') {
1035 resp.headers.get(ETAG).is_some_and(|etag| etag == if_range)
1036 } else if let Some(last_modified) = resp.headers.get(LAST_MODIFIED) {
1037 last_modified == if_range
1038 } else {
1039 false
1040 };
1041 if !matches {
1042 return RangeType::None;
1043 }
1044 }
1045
1046 // TODO: we can also check Accept-Range header from resp. Nginx gives uses the option
1047 // see proxy_force_ranges
1048
1049 let range_type = parse_range_header(range_header.as_bytes(), content_length);
1050
1051 match &range_type {
1052 RangeType::None => { /* nothing to do*/ }
1053 RangeType::Single(r) => {
1054 // 206 response
1055 resp.set_status(StatusCode::PARTIAL_CONTENT).unwrap();
1056 resp.insert_header(&CONTENT_LENGTH, r.end - r.start)
1057 .unwrap();
1058 resp.insert_header(
1059 &CONTENT_RANGE,
1060 format!("bytes {}-{}/{content_length}", r.start, r.end - 1), // range end is inclusive
1061 )
1062 .unwrap()
1063 }
1064 RangeType::Invalid => {
1065 // 416 response
1066 resp.set_status(StatusCode::RANGE_NOT_SATISFIABLE).unwrap();
1067 // empty body for simplicity
1068 resp.insert_header(&CONTENT_LENGTH, HeaderValue::from_static("0"))
1069 .unwrap();
1070 // TODO: remove other headers like content-encoding
1071 resp.remove_header(&CONTENT_TYPE);
1072 resp.insert_header(&CONTENT_RANGE, format!("bytes */{content_length}"))
1073 .unwrap()
1074 }
1075 }
1076
1077 range_type
1078 }
1079
1080 #[test]
1081 fn test_range_filter() {
1082 fn gen_req() -> RequestHeader {
1083 RequestHeader::build(http::Method::GET, b"/", Some(1)).unwrap()
1084 }
1085 fn gen_resp() -> ResponseHeader {
1086 let mut resp = ResponseHeader::build(200, Some(1)).unwrap();
1087 resp.append_header("Content-Length", "10").unwrap();
1088 resp
1089 }
1090
1091 // no range
1092 let req = gen_req();
1093 let mut resp = gen_resp();
1094 assert_eq!(RangeType::None, range_header_filter(&req, &mut resp));
1095 assert_eq!(resp.status.as_u16(), 200);
1096
1097 // regular range
1098 let mut req = gen_req();
1099 req.insert_header("Range", "bytes=0-1").unwrap();
1100 let mut resp = gen_resp();
1101 assert_eq!(
1102 RangeType::new_single(0, 2),
1103 range_header_filter(&req, &mut resp)
1104 );
1105 assert_eq!(resp.status.as_u16(), 206);
1106 assert_eq!(resp.headers.get("content-length").unwrap().as_bytes(), b"2");
1107 assert_eq!(
1108 resp.headers.get("content-range").unwrap().as_bytes(),
1109 b"bytes 0-1/10"
1110 );
1111
1112 // bad range
1113 let mut req = gen_req();
1114 req.insert_header("Range", "bytes=1-0").unwrap();
1115 let mut resp = gen_resp();
1116 assert_eq!(RangeType::Invalid, range_header_filter(&req, &mut resp));
1117 assert_eq!(resp.status.as_u16(), 416);
1118 assert_eq!(resp.headers.get("content-length").unwrap().as_bytes(), b"0");
1119 assert_eq!(
1120 resp.headers.get("content-range").unwrap().as_bytes(),
1121 b"bytes */10"
1122 );
1123 }
1124
1125 #[test]
1126 fn test_if_range() {
1127 const DATE: &str = "Fri, 07 Jul 2023 22:03:29 GMT";
1128 const ETAG: &str = "\"1234\"";
1129
1130 fn gen_req() -> RequestHeader {
1131 let mut req = RequestHeader::build(http::Method::GET, b"/", Some(1)).unwrap();
1132 req.append_header("Range", "bytes=0-1").unwrap();
1133 req
1134 }
1135 fn gen_resp() -> ResponseHeader {
1136 let mut resp = ResponseHeader::build(200, Some(1)).unwrap();
1137 resp.append_header("Content-Length", "10").unwrap();
1138 resp.append_header("Last-Modified", DATE).unwrap();
1139 resp.append_header("ETag", ETAG).unwrap();
1140 resp
1141 }
1142
1143 // matching Last-Modified date
1144 let mut req = gen_req();
1145 req.insert_header("If-Range", DATE).unwrap();
1146 let mut resp = gen_resp();
1147 assert_eq!(
1148 RangeType::new_single(0, 2),
1149 range_header_filter(&req, &mut resp)
1150 );
1151
1152 // non-matching date
1153 let mut req = gen_req();
1154 req.insert_header("If-Range", "Fri, 07 Jul 2023 22:03:25 GMT")
1155 .unwrap();
1156 let mut resp = gen_resp();
1157 assert_eq!(RangeType::None, range_header_filter(&req, &mut resp));
1158
1159 // match ETag
1160 let mut req = gen_req();
1161 req.insert_header("If-Range", ETAG).unwrap();
1162 let mut resp = gen_resp();
1163 assert_eq!(
1164 RangeType::new_single(0, 2),
1165 range_header_filter(&req, &mut resp)
1166 );
1167
1168 // non-matching ETags do not result in range
1169 let mut req = gen_req();
1170 req.insert_header("If-Range", "\"4567\"").unwrap();
1171 let mut resp = gen_resp();
1172 assert_eq!(RangeType::None, range_header_filter(&req, &mut resp));
1173
1174 let mut req = gen_req();
1175 req.insert_header("If-Range", "1234").unwrap();
1176 let mut resp = gen_resp();
1177 assert_eq!(RangeType::None, range_header_filter(&req, &mut resp));
1178 }
1179
    /// Streaming body filter that trims a response body down to a requested byte range.
    pub struct RangeBodyFilter {
        // the range to enforce; `None` passes everything through, `Invalid` drops all body data
        pub range: RangeType,
        // number of body bytes seen so far, i.e. the absolute offset of the next chunk
        current: usize,
    }
1184
    impl RangeBodyFilter {
        /// Create a pass-through filter (`RangeType::None`); use [`Self::set`]
        /// to install the range to enforce.
        pub fn new() -> Self {
            RangeBodyFilter {
                range: RangeType::None,
                current: 0,
            }
        }

        /// Set the byte range this filter should carve out of the body stream.
        pub fn set(&mut self, range: RangeType) {
            self.range = range;
        }

        /// Feed one body chunk through the filter.
        ///
        /// Returns the (possibly sliced) part of `data` that falls inside the
        /// configured range, `None` when the chunk lies entirely outside it or
        /// the range is `Invalid`, and the input unchanged when no range is set.
        pub fn filter_body(&mut self, data: Option<Bytes>) -> Option<Bytes> {
            match &self.range {
                RangeType::None => data,
                RangeType::Invalid => None,
                RangeType::Single(r) => {
                    // remember where this chunk starts, then advance the running offset
                    let current = self.current;
                    self.current += data.as_ref().map_or(0, |d| d.len());
                    data.and_then(|d| Self::filter_range_data(r.start, r.end, current, d))
                }
            }
        }

        /// Slice `data`, whose first byte sits at absolute offset `current`,
        /// down to the part overlapping the half-open range `[start, end)`.
        fn filter_range_data(
            start: usize,
            end: usize,
            current: usize,
            data: Bytes,
        ) -> Option<Bytes> {
            if current + data.len() < start || current >= end {
                // the current data is outside the desired range, just drop the data
                None
            } else if current >= start && current + data.len() <= end {
                // all data is within the slice
                Some(data)
            } else {
                // partial overlap; keep only the intersecting bytes
                // data: current........current+data.len()
                // range: start...........end
                let slice_start = start.saturating_sub(current);
                let slice_end = std::cmp::min(data.len(), end - current);
                Some(data.slice(slice_start..slice_end))
            }
        }
    }
1230
1231 #[test]
1232 fn test_range_body_filter() {
1233 let mut body_filter = RangeBodyFilter::new();
1234 assert_eq!(body_filter.filter_body(Some("123".into())).unwrap(), "123");
1235
1236 let mut body_filter = RangeBodyFilter::new();
1237 body_filter.set(RangeType::Invalid);
1238 assert!(body_filter.filter_body(Some("123".into())).is_none());
1239
1240 let mut body_filter = RangeBodyFilter::new();
1241 body_filter.set(RangeType::new_single(0, 1));
1242 assert_eq!(body_filter.filter_body(Some("012".into())).unwrap(), "0");
1243 assert!(body_filter.filter_body(Some("345".into())).is_none());
1244
1245 let mut body_filter = RangeBodyFilter::new();
1246 body_filter.set(RangeType::new_single(4, 6));
1247 assert!(body_filter.filter_body(Some("012".into())).is_none());
1248 assert_eq!(body_filter.filter_body(Some("345".into())).unwrap(), "45");
1249 assert!(body_filter.filter_body(Some("678".into())).is_none());
1250
1251 let mut body_filter = RangeBodyFilter::new();
1252 body_filter.set(RangeType::new_single(1, 7));
1253 assert_eq!(body_filter.filter_body(Some("012".into())).unwrap(), "12");
1254 assert_eq!(body_filter.filter_body(Some("345".into())).unwrap(), "345");
1255 assert_eq!(body_filter.filter_body(Some("678".into())).unwrap(), "6");
1256 }
1257}
1258
// A state machine for the proxy logic to tell when to serve from cache in the
// case of hit/miss/revalidation/error.
#[derive(Debug)]
pub(crate) enum ServeFromCache {
    Off, // not using cache
    CacheHeader, // should serve cache header
    CacheHeaderOnly, // should serve cache header only, with no body to follow
    CacheBody(bool), // should serve cache body with a bool to indicate if it has already called seek on the hit handler
    CacheHeaderMiss, // should serve cache header but upstream response should be admitted to cache
    CacheBodyMiss(bool), // should serve cache body but upstream response should be admitted to cache, bool to indicate seek status
    Done, // finished serving; only produces HttpTask::Done from here on
}
1271
impl ServeFromCache {
    /// Create the state machine in the `Off` (not serving from cache) state.
    pub fn new() -> Self {
        Self::Off
    }

    /// Whether serving from cache is enabled at all.
    pub fn is_on(&self) -> bool {
        !matches!(self, Self::Off)
    }

    /// Whether we are on the miss path (serving cache content while the
    /// upstream response is admitted to cache).
    pub fn is_miss(&self) -> bool {
        matches!(self, Self::CacheHeaderMiss | Self::CacheBodyMiss(_))
    }

    /// Whether the next task to produce is the miss-path header.
    pub fn is_miss_header(&self) -> bool {
        matches!(self, Self::CacheHeaderMiss)
    }

    /// Whether the next task to produce is the miss-path body.
    pub fn is_miss_body(&self) -> bool {
        matches!(self, Self::CacheBodyMiss(_))
    }

    /// True when the upstream response is not needed: we serve from cache and
    /// are not admitting the upstream response.
    pub fn should_discard_upstream(&self) -> bool {
        self.is_on() && !self.is_miss()
    }

    /// True when upstream tasks should be forwarded directly to the downstream
    /// client, i.e. we are not serving from cache.
    pub fn should_send_to_downstream(&self) -> bool {
        !self.is_on()
    }

    /// Start serving from cache (hit path), beginning with the header.
    pub fn enable(&mut self) {
        *self = Self::CacheHeader;
    }

    /// Start the miss path, unless cache serving is already enabled.
    pub fn enable_miss(&mut self) {
        if !self.is_on() {
            *self = Self::CacheHeaderMiss;
        }
    }

    /// Switch to serving the header only; if body serving already started, go
    /// straight to `Done`.
    pub fn enable_header_only(&mut self) {
        match self {
            Self::CacheBody(_) | Self::CacheBodyMiss(_) => *self = Self::Done, // TODO: make sure no body is read yet
            _ => *self = Self::CacheHeaderOnly,
        }
    }

    /// Produce the next `HttpTask` (header, body chunk, or done) from cache,
    /// advancing the state machine.
    ///
    /// This function is (best effort) cancel-safe to be used in select
    pub async fn next_http_task(
        &mut self,
        cache: &mut HttpCache,
        range: &mut RangeBodyFilter,
    ) -> Result<HttpTask> {
        if !cache.enabled() {
            // Cache is disabled due to internal error
            // TODO: if nothing is sent to eyeball yet, figure out a way to recover by
            // fetching from upstream
            return Error::e_explain(InternalError, "Cache disabled");
        }
        match self {
            Self::Off => panic!("ProxyUseCache not enabled"),
            Self::CacheHeader => {
                // the body follows; `true` asks the body state to seek first
                *self = Self::CacheBody(true);
                Ok(HttpTask::Header(cache_hit_header(cache), false)) // false for now
            }
            Self::CacheHeaderMiss => {
                *self = Self::CacheBodyMiss(true);
                Ok(HttpTask::Header(cache_hit_header(cache), false)) // false for now
            }
            Self::CacheHeaderOnly => {
                *self = Self::Done;
                Ok(HttpTask::Header(cache_hit_header(cache), true))
            }
            Self::CacheBody(should_seek) => {
                if *should_seek {
                    self.maybe_seek_hit_handler(cache, range)?;
                }
                if let Some(b) = cache.hit_handler().read_body().await? {
                    Ok(HttpTask::Body(Some(b), false)) // false for now
                } else {
                    *self = Self::Done;
                    Ok(HttpTask::Done)
                }
            }
            Self::CacheBodyMiss(should_seek) => {
                if *should_seek {
                    self.maybe_seek_miss_handler(cache, range)?;
                }
                // safety: callers of enable_miss() only enable it if the async_body_reader exists
                if let Some(b) = cache.miss_body_reader().unwrap().read_body().await? {
                    Ok(HttpTask::Body(Some(b), false)) // false for now
                } else {
                    *self = Self::Done;
                    Ok(HttpTask::Done)
                }
            }
            Self::Done => Ok(HttpTask::Done),
        }
    }

    /// Try to seek the miss body reader to the requested range, then move to
    /// `CacheBodyMiss(false)` so the seek is attempted only once.
    fn maybe_seek_miss_handler(
        &mut self,
        cache: &mut HttpCache,
        range_filter: &mut RangeBodyFilter,
    ) -> Result<()> {
        if let RangeType::Single(range) = &range_filter.range {
            // safety: called only if the async_body_reader exists
            if cache.miss_body_reader().unwrap().can_seek() {
                cache
                    .miss_body_reader()
                    // safety: called only if the async_body_reader exists
                    .unwrap()
                    .seek(range.start, Some(range.end))
                    .or_err(InternalError, "cannot seek miss handler")?;
                // Because the miss body reader is seeking, we no longer need the
                // RangeBodyFilter's help to return the requested byte range.
                range_filter.range = RangeType::None;
            }
        }
        *self = Self::CacheBodyMiss(false);
        Ok(())
    }

    /// Try to seek the hit handler to the requested range, then move to
    /// `CacheBody(false)` so the seek is attempted only once.
    fn maybe_seek_hit_handler(
        &mut self,
        cache: &mut HttpCache,
        range_filter: &mut RangeBodyFilter,
    ) -> Result<()> {
        if let RangeType::Single(range) = &range_filter.range {
            if cache.hit_handler().can_seek() {
                cache
                    .hit_handler()
                    .seek(range.start, Some(range.end))
                    .or_err(InternalError, "cannot seek hit handler")?;
                // Because the hit handler is seeking, we no longer need the
                // RangeBodyFilter's help to return the requested byte range.
                range_filter.range = RangeType::None;
            }
        }
        *self = Self::CacheBody(false);
        Ok(())
    }
}