pingora_cache/lib.rs
// Copyright 2026 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! The HTTP caching layer for proxies.
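//!
//! At a high level, [HttpCache] drives a request through: enable -> key -> lookup ->
//! hit or miss -> admission. A minimal sketch of that flow, assuming the bundled
//! [MemCache] storage and eliding freshness checks (not a complete proxy integration):
//!
//! ```ignore
//! use once_cell::sync::Lazy;
//! use pingora_cache::{CacheKey, HttpCache, MemCache};
//!
//! // Storage must live for the 'static lifetime.
//! static STORAGE: Lazy<MemCache> = Lazy::new(MemCache::new);
//!
//! async fn handle_request() -> pingora_error::Result<()> {
//!     let mut cache = HttpCache::new();
//!     // no eviction manager, predictor, cache lock, or overrides in this sketch
//!     cache.enable(&*STORAGE, None, None, None, None);
//!     cache.set_cache_key(CacheKey::new("ns", "GET example.com/asset", ""));
//!     match cache.cache_lookup().await? {
//!         Some((meta, hit_handler)) => { /* check freshness, then cache_found() */ }
//!         None => { /* cache_miss(), fetch upstream, then set_cache_meta() ... */ }
//!     }
//!     Ok(())
//! }
//! ```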

#![allow(clippy::new_without_default)]

use cf_rustracing::tag::Tag;
use http::{method::Method, request::Parts as ReqHeader, response::Parts as RespHeader};
use key::{CacheHashKey, CompactCacheKey, HashBinary};
use lock::WritePermit;
use log::warn;
use pingora_error::Result;
use pingora_http::ResponseHeader;
use pingora_timeout::timeout;
use std::time::{Duration, Instant, SystemTime};
use storage::MissFinishType;
use strum::IntoStaticStr;
use trace::{CacheTraceCTX, Span};

pub mod cache_control;
pub mod eviction;
pub mod filters;
pub mod hashtable;
pub mod key;
pub mod lock;
pub mod max_file_size;
mod memory;
pub mod meta;
pub mod predictor;
pub mod put;
pub mod storage;
pub mod trace;
mod variance;

use crate::max_file_size::MaxFileSizeTracker;
pub use key::CacheKey;
use lock::{CacheKeyLockImpl, LockStatus, Locked};
pub use memory::MemCache;
pub use meta::{set_compression_dict_content, set_compression_dict_path};
pub use meta::{CacheMeta, CacheMetaDefaults};
pub use storage::{HitHandler, MissHandler, PurgeType, Storage};
pub use variance::VarianceBuilder;

pub mod prelude {}

/// The state machine for HTTP caching
///
/// This object is used to handle the state and transitions for HTTP caching through the life of a
/// request.
pub struct HttpCache {
    phase: CachePhase,
    // Box the rest so that a disabled HttpCache struct is small
    inner: Option<Box<HttpCacheInner>>,
    digest: HttpCacheDigest,
}

/// This reflects the phase of HttpCache during the lifetime of a request
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CachePhase {
    /// Cache disabled, with reason (NeverEnabled if never explicitly used)
    Disabled(NoCacheReason),
    /// Cache enabled but nothing is set yet
    Uninit,
    /// Cache was enabled, but the request decided not to use it
    // HttpCache.inner_enabled is kept
    Bypass,
    /// Awaiting the cache key to be generated
    CacheKey,
    /// Cache hit
    Hit,
    /// No cached asset is found
    Miss,
    /// A stale (expired) asset is found
    Stale,
    /// A stale (expired) asset was found, but another request is revalidating it
    StaleUpdating,
    /// A stale (expired) asset was found, so a fresh one was fetched
    Expired,
    /// A stale (expired) asset was found, and it was revalidated to be fresh
    Revalidated,
    /// Revalidated, but deemed uncacheable, so we do not freshen it
    RevalidatedNoCache(NoCacheReason),
}

impl CachePhase {
    /// Convert [CachePhase] as `str`, for logging and debugging.
    pub fn as_str(&self) -> &'static str {
        match self {
            CachePhase::Disabled(_) => "disabled",
            CachePhase::Uninit => "uninitialized",
            CachePhase::Bypass => "bypass",
            CachePhase::CacheKey => "key",
            CachePhase::Hit => "hit",
            CachePhase::Miss => "miss",
            CachePhase::Stale => "stale",
            CachePhase::StaleUpdating => "stale-updating",
            CachePhase::Expired => "expired",
            CachePhase::Revalidated => "revalidated",
            CachePhase::RevalidatedNoCache(_) => "revalidated-nocache",
        }
    }
}

/// The possible reasons for not caching
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum NoCacheReason {
    /// Caching is not enabled to begin with
    NeverEnabled,
    /// Origin directives indicated this was not cacheable
    OriginNotCache,
    /// Response size was larger than the cache's configured maximum asset size
    ResponseTooLarge,
    /// Disabling caching due to unknown body size and previously exceeding maximum asset size;
    /// the asset is otherwise cacheable, but cache needs to confirm the final size of the asset
    /// before it can mark it as cacheable again.
    PredictedResponseTooLarge,
    /// Due to internal caching storage error
    StorageError,
    /// Due to other types of internal issues
    InternalError,
    /// Will be cacheable but skip cache admission now
    ///
    /// This happens when the cache predictor predicted that this request is not cacheable, but
    /// the response turns out to be OK to cache. However, it might be too late to re-enable
    /// caching for this request.
    Deferred,
    /// Due to the proxy upstream filter declining the current request from going upstream
    DeclinedToUpstream,
    /// Due to the upstream being unreachable or otherwise erroring during proxying
    UpstreamError,
    /// The writer of the cache lock sees that the request is not cacheable (could be OriginNotCache)
    CacheLockGiveUp,
    /// This request waited too long for the writer of the cache lock to finish, so this request will
    /// fetch from the origin without caching
    CacheLockTimeout,
    /// Other custom defined reasons
    Custom(&'static str),
}

impl NoCacheReason {
    /// Convert [NoCacheReason] as `str`, for logging and debugging.
    pub fn as_str(&self) -> &'static str {
        use NoCacheReason::*;
        match self {
            NeverEnabled => "NeverEnabled",
            OriginNotCache => "OriginNotCache",
            ResponseTooLarge => "ResponseTooLarge",
            PredictedResponseTooLarge => "PredictedResponseTooLarge",
            StorageError => "StorageError",
            InternalError => "InternalError",
            Deferred => "Deferred",
            DeclinedToUpstream => "DeclinedToUpstream",
            UpstreamError => "UpstreamError",
            CacheLockGiveUp => "CacheLockGiveUp",
            CacheLockTimeout => "CacheLockTimeout",
            Custom(s) => s,
        }
    }
}

/// Information collected about the caching operation that will not be cleared
#[derive(Debug, Default)]
pub struct HttpCacheDigest {
    pub lock_duration: Option<Duration>,
    // time spent in cache lookup and reading the header
    pub lookup_duration: Option<Duration>,
}

/// Convenience function to add a duration to an optional duration
fn add_duration_to_opt(target_opt: &mut Option<Duration>, to_add: Duration) {
    *target_opt = Some(target_opt.map_or(to_add, |existing| existing + to_add));
}

impl HttpCacheDigest {
    fn add_lookup_duration(&mut self, extra_lookup_duration: Duration) {
        add_duration_to_opt(&mut self.lookup_duration, extra_lookup_duration)
    }

    fn add_lock_duration(&mut self, extra_lock_duration: Duration) {
        add_duration_to_opt(&mut self.lock_duration, extra_lock_duration)
    }
}

/// Response cacheable decision
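///
/// A sketch of making this decision for an upstream response (hypothetical policy:
/// cache 200s for 5 minutes; a real proxy would derive this from the response's
/// cache-control headers, e.g. via the helpers in [filters]):
///
/// ```ignore
/// use std::time::{Duration, SystemTime};
/// use pingora_cache::{CacheMeta, NoCacheReason, RespCacheable};
///
/// let now = SystemTime::now();
/// let decision = if resp.status == 200 {
///     // (fresh_until, created, stale-while-revalidate secs, stale-if-error secs, header)
///     RespCacheable::Cacheable(CacheMeta::new(
///         now + Duration::from_secs(300),
///         now,
///         0,
///         0,
///         resp,
///     ))
/// } else {
///     RespCacheable::Uncacheable(NoCacheReason::OriginNotCache)
/// };
/// ```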
#[derive(Debug)]
pub enum RespCacheable {
    Cacheable(CacheMeta),
    Uncacheable(NoCacheReason),
}

impl RespCacheable {
    /// Whether it is cacheable
    #[inline]
    pub fn is_cacheable(&self) -> bool {
        matches!(*self, Self::Cacheable(_))
    }

    /// Unwrap [RespCacheable] to get the [CacheMeta] stored
    /// # Panic
    /// Panics when this object is not cacheable. Check [Self::is_cacheable()] first.
    pub fn unwrap_meta(self) -> CacheMeta {
        match self {
            Self::Cacheable(meta) => meta,
            Self::Uncacheable(_) => panic!("expected Cacheable value"),
        }
    }
}

/// Indicators of which level of cache freshness logic to force apply to an asset.
///
/// For example, should an existing fresh asset be revalidated or re-retrieved altogether.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ForcedFreshness {
    /// Indicates the asset should be considered stale and revalidated
    ForceExpired,

    /// Indicates the asset should be considered absent and treated like a miss
    /// instead of a hit
    ForceMiss,

    /// Indicates the asset should be considered fresh despite possibly being stale
    ForceFresh,
}

/// Freshness state of a cache hit asset
#[derive(Debug, Copy, Clone, IntoStaticStr, PartialEq, Eq)]
#[strum(serialize_all = "snake_case")]
pub enum HitStatus {
    /// The asset's freshness directives indicate it has expired
    Expired,

    /// The asset was marked as expired, and should be treated as stale
    ForceExpired,

    /// The asset was marked as absent, and should be treated as a miss
    ForceMiss,

    /// An error occurred while processing the asset, so it should be treated as
    /// a miss
    FailedHitFilter,

    /// The asset is not expired
    Fresh,

    /// Asset exists but is expired, forced to be a hit
    ForceFresh,
}

impl HitStatus {
    /// For displaying cache hit status
    pub fn as_str(&self) -> &'static str {
        self.into()
    }

    /// Whether the cached asset can be served as fresh
    pub fn is_fresh(&self) -> bool {
        *self == HitStatus::Fresh || *self == HitStatus::ForceFresh
    }

    /// Check whether the hit status should be treated as a miss. A forced miss
    /// is obviously treated as a miss. A hit-filter failure is treated as a
    /// miss because we can't use the asset as an actual hit. If we treat it as
    /// expired, we still might not be able to use it even if revalidation
    /// succeeds.
    pub fn is_treated_as_miss(self) -> bool {
        matches!(self, HitStatus::ForceMiss | HitStatus::FailedHitFilter)
    }
}

pub struct LockCtx {
    pub lock: Option<Locked>,
    pub cache_lock: &'static CacheKeyLockImpl,
    pub wait_timeout: Option<Duration>,
}

// Fields like storage handlers that are needed only when cache is enabled (or bypassing).
struct HttpCacheInnerEnabled {
    pub meta: Option<CacheMeta>,
    // when set, even if an asset exists, it would only be considered valid after this timestamp
    pub valid_after: Option<SystemTime>,
    pub miss_handler: Option<MissHandler>,
    pub body_reader: Option<HitHandler>,
    pub storage: &'static (dyn storage::Storage + Sync), // static for now
    pub eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
    pub lock_ctx: Option<LockCtx>,
    pub traces: trace::CacheTraceCTX,
}

struct HttpCacheInner {
    // Prefer adding fields to InnerEnabled if possible, as these fields are released
    // when cache is disabled.
    // If fields are needed after cache disablement, add directly to Inner.
    pub enabled_ctx: Option<Box<HttpCacheInnerEnabled>>,
    pub key: Option<CacheKey>,
    // when set, an asset will be rejected from the cache if it exceeds the configured size in bytes
    pub max_file_size_tracker: Option<MaxFileSizeTracker>,
    pub predictor: Option<&'static (dyn predictor::CacheablePredictor + Sync)>,
}

#[derive(Debug, Default)]
#[non_exhaustive]
pub struct CacheOptionOverrides {
    pub wait_timeout: Option<Duration>,
}

impl HttpCache {
    /// Create a new [HttpCache].
    ///
    /// Caching is not enabled by default.
    pub fn new() -> Self {
        HttpCache {
            phase: CachePhase::Disabled(NoCacheReason::NeverEnabled),
            inner: None,
            digest: HttpCacheDigest::default(),
        }
    }

    /// Whether the cache is enabled
    pub fn enabled(&self) -> bool {
        !matches!(self.phase, CachePhase::Disabled(_) | CachePhase::Bypass)
    }

    /// Whether the cache is being bypassed
    pub fn bypassing(&self) -> bool {
        matches!(self.phase, CachePhase::Bypass)
    }

    /// Return the [CachePhase]
    pub fn phase(&self) -> CachePhase {
        self.phase
    }

    /// Whether anything was fetched from the upstream
    ///
    /// This essentially checks all possible [CachePhase]s that need to contact the upstream server
    pub fn upstream_used(&self) -> bool {
        use CachePhase::*;
        match self.phase {
            Disabled(_) | Bypass | Miss | Expired | Revalidated | RevalidatedNoCache(_) => true,
            Hit | Stale | StaleUpdating => false,
            Uninit | CacheKey => false, // invalid states for this call, treat them as false to keep it simple
        }
    }

    /// Check whether the backend storage is the type `T`.
    pub fn storage_type_is<T: 'static>(&self) -> bool {
        self.inner
            .as_ref()
            .and_then(|inner| {
                inner
                    .enabled_ctx
                    .as_ref()
                    .and_then(|ie| ie.storage.as_any().downcast_ref::<T>())
            })
            .is_some()
    }

    /// Release the cache lock if the current request is a cache writer.
    ///
    /// Generally callers should prefer using `disable` when a cache lock should be released
    /// due to an error, to clear all cache context. This function is for releasing the cache lock
    /// while still keeping the cache around for reading, e.g. when serving stale.
    pub fn release_write_lock(&mut self, reason: NoCacheReason) {
        use NoCacheReason::*;
        if let Some(inner) = self.inner.as_mut() {
            if let Some(lock_ctx) = inner
                .enabled_ctx
                .as_mut()
                .and_then(|ie| ie.lock_ctx.as_mut())
            {
                let lock = lock_ctx.lock.take();
                if let Some(Locked::Write(permit)) = lock {
                    let lock_status = match reason {
                        // let the next request try to fetch it
                        InternalError | StorageError | Deferred | UpstreamError => {
                            LockStatus::TransientError
                        }
                        // depends on why the proxy upstream filter declined the request;
                        // for now, still allow the next request to try to acquire, to avoid a thundering herd
                        DeclinedToUpstream => LockStatus::TransientError,
                        // no need for the lock anymore
                        OriginNotCache | ResponseTooLarge | PredictedResponseTooLarge => {
                            LockStatus::GiveUp
                        }
                        Custom(reason) => lock_ctx.cache_lock.custom_lock_status(reason),
                        // should never happen, NeverEnabled shouldn't hold a lock
                        NeverEnabled => panic!("NeverEnabled holds a write lock"),
                        CacheLockGiveUp | CacheLockTimeout => {
                            panic!("CacheLock* are for cache lock readers only")
                        }
                    };
                    lock_ctx
                        .cache_lock
                        .release(inner.key.as_ref().unwrap(), permit, lock_status);
                }
            }
        }
    }

    /// Disable caching
    pub fn disable(&mut self, reason: NoCacheReason) {
        // XXX: enforce this at compile time?
        assert!(
            reason != NoCacheReason::NeverEnabled,
            "NeverEnabled not allowed as a disable reason"
        );
        match self.phase {
            CachePhase::Disabled(old_reason) => {
                // replace reason
                if old_reason == NoCacheReason::NeverEnabled {
                    // safeguard, don't allow replacing NeverEnabled as a reason
                    // TODO: can be promoted to assertion once confirmed nothing is attempting this
                    warn!("Tried to replace cache NeverEnabled with reason: {reason:?}");
                    return;
                }
                self.phase = CachePhase::Disabled(reason);
            }
            _ => {
                self.phase = CachePhase::Disabled(reason);
                self.release_write_lock(reason);
                // enabled_ctx will be cleared out
                let mut inner_enabled = self
                    .inner_mut()
                    .enabled_ctx
                    .take()
                    .expect("enabled_ctx should be present on disable");
                // log initial disable reason
                inner_enabled
                    .traces
                    .cache_span
                    .set_tag(|| trace::Tag::new("disable_reason", reason.as_str()));
            }
        }
    }

    /* The following methods panic when they are used in the wrong phase.
     * This is better than returning errors, as such panics are only caused by coding errors, which
     * should be fixed right away. The Tokio runtime only crashes the current task instead of the
     * whole program when these panics happen. */

    /// Set the cache to bypass
    ///
    /// # Panic
    /// This call is only allowed in the [CachePhase::CacheKey] phase (before any cache lookup is performed).
    /// Using it in any other phase will lead to a panic.
    pub fn bypass(&mut self) {
        match self.phase {
            CachePhase::CacheKey => {
                // before cache lookup / found / miss
                self.phase = CachePhase::Bypass;
                self.inner_enabled_mut()
                    .traces
                    .cache_span
                    .set_tag(|| trace::Tag::new("bypassed", true));
            }
            _ => panic!("wrong phase to bypass HttpCache {:?}", self.phase),
        }
    }

    /// Enable the cache
    ///
    /// - `storage`: the cache storage backend that implements [storage::Storage]
    /// - `eviction`: optionally the eviction manager; without it, nothing will be evicted from the storage
    /// - `predictor`: optionally a cache predictor. The cache predictor predicts whether something is likely
    ///   to be cacheable or not. This is useful because the proxy can apply different types of optimization to
    ///   cacheable and uncacheable requests.
    /// - `cache_lock`: optionally a cache lock which handles concurrent lookups to the same asset. Without it
    ///   such lookups will all be allowed to fetch the asset independently.
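    ///
    /// A minimal sketch of wiring these together (assuming `STORAGE`/`EVICTION` statics
    /// created elsewhere; `simple_lru` is one of the bundled eviction managers):
    ///
    /// ```ignore
    /// use once_cell::sync::Lazy;
    /// use pingora_cache::{eviction::simple_lru::Manager, HttpCache, MemCache};
    ///
    /// static STORAGE: Lazy<MemCache> = Lazy::new(MemCache::new);
    /// // Evict once the tracked assets exceed roughly 128 MB.
    /// static EVICTION: Lazy<Manager> = Lazy::new(|| Manager::new(128 * 1024 * 1024));
    ///
    /// let mut cache = HttpCache::new();
    /// cache.enable(&*STORAGE, Some(&*EVICTION), None, None, None);
    /// ```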
    pub fn enable(
        &mut self,
        storage: &'static (dyn storage::Storage + Sync),
        eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
        predictor: Option<&'static (dyn predictor::CacheablePredictor + Sync)>,
        cache_lock: Option<&'static CacheKeyLockImpl>,
        option_overrides: Option<CacheOptionOverrides>,
    ) {
        match self.phase {
            CachePhase::Disabled(_) => {
                self.phase = CachePhase::Uninit;

                let lock_ctx = cache_lock.map(|cache_lock| LockCtx {
                    cache_lock,
                    lock: None,
                    wait_timeout: option_overrides
                        .as_ref()
                        .and_then(|overrides| overrides.wait_timeout),
                });

                self.inner = Some(Box::new(HttpCacheInner {
                    enabled_ctx: Some(Box::new(HttpCacheInnerEnabled {
                        meta: None,
                        valid_after: None,
                        miss_handler: None,
                        body_reader: None,
                        storage,
                        eviction,
                        lock_ctx,
                        traces: CacheTraceCTX::new(),
                    })),
                    key: None,
                    max_file_size_tracker: None,
                    predictor,
                }));
            }
            _ => panic!("Cannot enable already enabled HttpCache {:?}", self.phase),
        }
    }

    /// Set the cache lock implementation.
    /// # Panic
    /// Must be called before a cache lock is attempted to be acquired,
    /// i.e. in the `cache_key_callback` or `cache_hit_filter` phases.
    pub fn set_cache_lock(
        &mut self,
        cache_lock: Option<&'static CacheKeyLockImpl>,
        option_overrides: Option<CacheOptionOverrides>,
    ) {
        match self.phase {
            CachePhase::Disabled(_)
            | CachePhase::CacheKey
            | CachePhase::Stale
            | CachePhase::Hit => {
                let inner_enabled = self.inner_enabled_mut();
                if inner_enabled
                    .lock_ctx
                    .as_ref()
                    .is_some_and(|ctx| ctx.lock.is_some())
                {
                    panic!("lock already set when resetting cache lock")
                } else {
                    let lock_ctx = cache_lock.map(|cache_lock| LockCtx {
                        cache_lock,
                        lock: None,
                        wait_timeout: option_overrides.and_then(|overrides| overrides.wait_timeout),
                    });
                    inner_enabled.lock_ctx = lock_ctx;
                }
            }
            _ => panic!("wrong phase: {:?}", self.phase),
        }
    }

    // Enable distributed tracing
    pub fn enable_tracing(&mut self, parent_span: trace::Span) {
        if let Some(inner_enabled) = self.inner.as_mut().and_then(|i| i.enabled_ctx.as_mut()) {
            inner_enabled.traces.enable(parent_span);
        }
    }

    // Get the cache parent tracing span
    pub fn get_cache_span(&self) -> Option<trace::SpanHandle> {
        self.inner
            .as_ref()
            .and_then(|i| i.enabled_ctx.as_ref().map(|ie| ie.traces.get_cache_span()))
    }

    // Get the cache `miss` tracing span
    pub fn get_miss_span(&self) -> Option<trace::SpanHandle> {
        self.inner
            .as_ref()
            .and_then(|i| i.enabled_ctx.as_ref().map(|ie| ie.traces.get_miss_span()))
    }

    // Get the cache `hit` tracing span
    pub fn get_hit_span(&self) -> Option<trace::SpanHandle> {
        self.inner
            .as_ref()
            .and_then(|i| i.enabled_ctx.as_ref().map(|ie| ie.traces.get_hit_span()))
    }

    // shortcut to access inner fields, panic if phase is disabled
    #[inline]
    fn inner_enabled_mut(&mut self) -> &mut HttpCacheInnerEnabled {
        self.inner.as_mut().unwrap().enabled_ctx.as_mut().unwrap()
    }

    #[inline]
    fn inner_enabled(&self) -> &HttpCacheInnerEnabled {
        self.inner.as_ref().unwrap().enabled_ctx.as_ref().unwrap()
    }

    // shortcut to access inner fields, panic if cache was never enabled
    #[inline]
    fn inner_mut(&mut self) -> &mut HttpCacheInner {
        self.inner.as_mut().unwrap()
    }

    #[inline]
    fn inner(&self) -> &HttpCacheInner {
        self.inner.as_ref().unwrap()
    }

    /// Set the cache key
    /// # Panic
    /// The cache key is only allowed to be set in its own phase. Setting it in other phases will
    /// cause a panic.
    pub fn set_cache_key(&mut self, key: CacheKey) {
        match self.phase {
            CachePhase::Uninit | CachePhase::CacheKey => {
                self.phase = CachePhase::CacheKey;
                self.inner_mut().key = Some(key);
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Return the cache key used for asset lookup
    /// # Panic
    /// Can only be called after the cache key is set and the cache is not disabled. Panics otherwise.
    pub fn cache_key(&self) -> &CacheKey {
        match self.phase {
            CachePhase::Disabled(NoCacheReason::NeverEnabled) | CachePhase::Uninit => {
                panic!("wrong phase {:?}", self.phase)
            }
            _ => self
                .inner()
                .key
                .as_ref()
                .expect("cache key should be set (set_cache_key not called?)"),
        }
    }

    /// Return the max size allowed to be cached.
    pub fn max_file_size_bytes(&self) -> Option<usize> {
        assert!(
            !matches!(
                self.phase,
                CachePhase::Disabled(NoCacheReason::NeverEnabled)
            ),
            "tried to access max file size bytes when cache never enabled"
        );
        self.inner()
            .max_file_size_tracker
            .as_ref()
            .map(|t| t.max_file_size_bytes())
    }

    /// Set the maximum response _body_ size in bytes that will be admitted to the cache.
    ///
    /// The response header size does not contribute to the max file size.
    ///
    /// To track body bytes, call [Self::track_body_bytes_for_max_file_size()].
    pub fn set_max_file_size_bytes(&mut self, max_file_size_bytes: usize) {
        match self.phase {
            CachePhase::Disabled(_) => panic!("wrong phase {:?}", self.phase),
            _ => {
                self.inner_mut().max_file_size_tracker =
                    Some(MaxFileSizeTracker::new(max_file_size_bytes));
            }
        }
    }

    /// Record body bytes for the max file size tracker.
    ///
    /// The `bytes_len` input contributes to a cumulative body byte tracker.
    ///
    /// Once the cumulative body bytes exceed the maximum allowable cache file size (as configured
    /// by [Self::set_max_file_size_bytes()]), the return value will be false.
    ///
    /// Otherwise the return value is true, as long as the max file size is not exceeded.
    /// If max file size was not configured, the return value is always true.
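    ///
    /// A sketch of feeding upstream body chunks through the tracker (`next_body_chunk`
    /// is a hypothetical body source):
    ///
    /// ```ignore
    /// use pingora_cache::NoCacheReason;
    ///
    /// cache.set_max_file_size_bytes(1024 * 1024); // 1 MB cap
    /// while let Some(chunk) = next_body_chunk().await {
    ///     if !cache.track_body_bytes_for_max_file_size(chunk.len()) {
    ///         // over the cap: this asset can no longer be admitted
    ///         cache.disable(NoCacheReason::ResponseTooLarge);
    ///         break;
    ///     }
    /// }
    /// ```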
    pub fn track_body_bytes_for_max_file_size(&mut self, bytes_len: usize) -> bool {
        // This is intended to be callable when cache has already been disabled,
        // so that we can re-mark an asset as cacheable if the body size is under limits.
        assert!(
            !matches!(
                self.phase,
                CachePhase::Disabled(NoCacheReason::NeverEnabled)
            ),
            "tried to access max file size bytes when cache never enabled"
        );
        self.inner_mut()
            .max_file_size_tracker
            .as_mut()
            .is_none_or(|t| t.add_body_bytes(bytes_len))
    }

    /// Check if the max file size has been exceeded according to the max file size tracker.
    ///
    /// Return true if the max file size was exceeded.
    pub fn exceeded_max_file_size(&self) -> bool {
        assert!(
            !matches!(
                self.phase,
                CachePhase::Disabled(NoCacheReason::NeverEnabled)
            ),
            "tried to access max file size bytes when cache never enabled"
        );
        self.inner()
            .max_file_size_tracker
            .as_ref()
            .is_some_and(|t| !t.allow_caching())
    }

    /// Set that an asset was found in cache storage.
    ///
    /// This function is called after [Self::cache_lookup()], which returns the [CacheMeta] and
    /// [HitHandler].
    ///
    /// The `hit_status` enum allows the caller to force expire assets.
    pub fn cache_found(&mut self, meta: CacheMeta, hit_handler: HitHandler, hit_status: HitStatus) {
        // Stale allowed because of cache lock and then retry
        if !matches!(self.phase, CachePhase::CacheKey | CachePhase::Stale) {
            panic!("wrong phase {:?}", self.phase)
        }

        self.phase = match hit_status {
            HitStatus::Fresh | HitStatus::ForceFresh => CachePhase::Hit,
            HitStatus::Expired | HitStatus::ForceExpired => CachePhase::Stale,
            HitStatus::FailedHitFilter | HitStatus::ForceMiss => self.phase,
        };

        let phase = self.phase;
        let inner = self.inner_mut();

        let key = inner.key.as_ref().expect("key must be set on hit");
        let inner_enabled = inner
            .enabled_ctx
            .as_mut()
            .expect("cache_found must be called while cache enabled");

        // The cache lock might not be set for a stale hit or hits treated as
        // misses, so we need to initialize it here
        let stale = phase == CachePhase::Stale;
        if stale || hit_status.is_treated_as_miss() {
            if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
                lock_ctx.lock = Some(lock_ctx.cache_lock.lock(key, stale));
            }
        }

        if hit_status.is_treated_as_miss() {
            // Clear the body and meta for hits that are treated as misses
            inner_enabled.body_reader = None;
            inner_enabled.meta = None;
        } else {
            // Set the metadata appropriately for legit hits
            inner_enabled.traces.start_hit_span(phase, hit_status);
            inner_enabled.traces.log_meta_in_hit_span(&meta);
            if let Some(eviction) = inner_enabled.eviction {
                // TODO: make access() accept CacheKey
                let cache_key = key.to_compact();
                if hit_handler.should_count_access() {
                    let size = hit_handler.get_eviction_weight();
                    eviction.access(&cache_key, size, meta.0.internal.fresh_until);
                }
            }
            inner_enabled.meta = Some(meta);
            inner_enabled.body_reader = Some(hit_handler);
        }
    }

    /// Mark `self` as a cache miss.
    ///
    /// This function is called after [Self::cache_lookup()] finds nothing or the caller decides
    /// not to use the assets found.
    /// # Panic
    /// Panics in other phases.
    pub fn cache_miss(&mut self) {
        match self.phase {
            // from CacheKey: set state to miss during cache lookup
            // from Bypass: response became cacheable, set state to miss to cache
            // from Stale: waited for cache lock, then retried and found asset was gone
            CachePhase::CacheKey | CachePhase::Bypass | CachePhase::Stale => {
                self.phase = CachePhase::Miss;
                // It's possible that we've set the meta on lookup and have come back around
                // here after not being able to acquire the cache lock, and our item has since
                // been purged or expired. We should be sure that the meta is not set in this case,
                // as there shouldn't be a meta set for cache misses.
                self.inner_enabled_mut().meta = None;
                self.inner_enabled_mut().traces.start_miss_span();
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Return the [HitHandler]
    /// # Panic
    /// Call this after [Self::cache_found()]; panics in other phases.
    pub fn hit_handler(&mut self) -> &mut HitHandler {
        match self.phase {
            CachePhase::Hit
            | CachePhase::Stale
            | CachePhase::StaleUpdating
            | CachePhase::Revalidated
            | CachePhase::RevalidatedNoCache(_) => {
                self.inner_enabled_mut().body_reader.as_mut().unwrap()
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Return the body reader during a cache admission (miss/expired) which decouples the downstream
    /// read and upstream cache write
    pub fn miss_body_reader(&mut self) -> Option<&mut HitHandler> {
        match self.phase {
            CachePhase::Miss | CachePhase::Expired => {
                let inner_enabled = self.inner_enabled_mut();
                if inner_enabled.storage.support_streaming_partial_write() {
                    inner_enabled.body_reader.as_mut()
                } else {
                    // body_reader could be set even when the storage doesn't support streaming,
                    // e.g. an expired cache hit would have the reader set.
                    None
                }
            }
            _ => None,
        }
    }

    /// Return whether the underlying storage backend supports streaming partial writes.
    ///
    /// Returns None if cache is not enabled.
    pub fn support_streaming_partial_write(&self) -> Option<bool> {
        self.inner.as_ref().and_then(|inner| {
            inner
                .enabled_ctx
                .as_ref()
                .map(|c| c.storage.support_streaming_partial_write())
        })
    }

    /// Call this when the cache hit is fully read.
    ///
    /// This call will release resources, if any, and log the timing in tracing if set.
    /// # Panic
    /// Panics in phases where there is no cache hit.
    pub async fn finish_hit_handler(&mut self) -> Result<()> {
        match self.phase {
            CachePhase::Hit
            | CachePhase::Miss
            | CachePhase::Expired
            | CachePhase::Stale
            | CachePhase::StaleUpdating
            | CachePhase::Revalidated
            | CachePhase::RevalidatedNoCache(_) => {
                let inner = self.inner_mut();
                let inner_enabled = inner.enabled_ctx.as_mut().expect("cache enabled");
                if inner_enabled.body_reader.is_none() {
                    // already finished, we allow calling this function more than once
                    return Ok(());
                }
                let body_reader = inner_enabled.body_reader.take().unwrap();
                let key = inner.key.as_ref().unwrap();
                let result = body_reader
                    .finish(
                        inner_enabled.storage,
                        key,
                        &inner_enabled.traces.hit_span.handle(),
                    )
                    .await;
                inner_enabled.traces.finish_hit_span();
                result
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Set the [MissHandler] according to cache_key and meta; can only be called once.
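    ///
    /// A sketch of the admission flow around this call (`meta` and `body` stand in for
    /// data derived from the upstream response):
    ///
    /// ```ignore
    /// cache.cache_miss();
    /// cache.set_cache_meta(meta); // meta: CacheMeta from the upstream response
    /// cache.set_miss_handler().await?;
    /// if let Some(handler) = cache.miss_handler() {
    ///     handler.write_body(body, true /* eof */).await?;
    /// }
    /// cache.finish_miss_handler().await?;
    /// ```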
    pub async fn set_miss_handler(&mut self) -> Result<()> {
        match self.phase {
            // set_miss_handler() needs to be called after set_cache_meta() (which changes Stale to Expired).
            // This is an artificial rule to enforce the state transitions
            CachePhase::Miss | CachePhase::Expired => {
                let inner = self.inner_mut();
                let inner_enabled = inner
                    .enabled_ctx
                    .as_mut()
                    .expect("cache enabled on miss and expired");
                if inner_enabled.miss_handler.is_some() {
                    panic!("write handler is already set")
                }
                let meta = inner_enabled.meta.as_ref().unwrap();
                let key = inner.key.as_ref().unwrap();
                let miss_handler = inner_enabled
                    .storage
                    .get_miss_handler(key, meta, &inner_enabled.traces.get_miss_span())
                    .await?;

                inner_enabled.miss_handler = Some(miss_handler);

                if inner_enabled.storage.support_streaming_partial_write() {
                    // If a reader can access a partial write, the cache lock can be released here
                    // to let readers start reading the body.
                    if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
                        let lock = lock_ctx.lock.take();
                        if let Some(Locked::Write(permit)) = lock {
                            lock_ctx.cache_lock.release(key, permit, LockStatus::Done);
                        }
                    }
                    // Downstream read and upstream write can be decoupled
                    let body_reader = inner_enabled
                        .storage
                        .lookup_streaming_write(
                            key,
                            inner_enabled
                                .miss_handler
                                .as_ref()
                                .expect("miss handler already set")
                                .streaming_write_tag(),
                            &inner_enabled.traces.get_miss_span(),
                        )
                        .await?;

                    if let Some((_meta, body_reader)) = body_reader {
                        inner_enabled.body_reader = Some(body_reader);
                    } else {
                        // body_reader should exist now because streaming_partial_write is to support it
                        panic!("unable to get body_reader for {:?}", meta);
                    }
                }
                Ok(())
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Return the [MissHandler] to write the response body to cache.
    ///
    /// `None`: the handler has not been set or has already finished
    pub fn miss_handler(&mut self) -> Option<&mut MissHandler> {
        match self.phase {
            CachePhase::Miss | CachePhase::Expired => {
                self.inner_enabled_mut().miss_handler.as_mut()
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Finish cache admission
    ///
    /// If `self` is dropped without calling this, the cache admission is considered incomplete and
    /// should be cleaned up.
    ///
    /// This call will also trigger eviction if set.
    pub async fn finish_miss_handler(&mut self) -> Result<()> {
        match self.phase {
            CachePhase::Miss | CachePhase::Expired => {
                let inner = self.inner_mut();
                let inner_enabled = inner
                    .enabled_ctx
                    .as_mut()
                    .expect("cache enabled on miss and expired");
                if inner_enabled.miss_handler.is_none() {
                    // already finished, we allow calling this function more than once
                    return Ok(());
                }
                let miss_handler = inner_enabled.miss_handler.take().unwrap();
                let size = miss_handler.finish().await?;
                let key = inner
                    .key
                    .as_ref()
                    .expect("key set by miss or expired phase");
                if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
                    let lock = lock_ctx.lock.take();
                    if let Some(Locked::Write(permit)) = lock {
                        // no need to call r.unlock() because release() will call it
                        // r is a guard to make sure the lock is unlocked when this request is dropped
                        lock_ctx.cache_lock.release(key, permit, LockStatus::Done);
                    }
                }
                if let Some(eviction) = inner_enabled.eviction {
                    let cache_key = key.to_compact();
                    let meta = inner_enabled.meta.as_ref().unwrap();
                    let evicted = match size {
                        MissFinishType::Created(size) => {
                            eviction.admit(cache_key, size, meta.0.internal.fresh_until)
                        }
                        MissFinishType::Appended(size, max_size) => {
                            eviction.increment_weight(&cache_key, size, max_size)
                        }
                    };
                    // actual eviction can be done async
                    let span = inner_enabled.traces.child("eviction");
                    let handle = span.handle();
                    let storage = inner_enabled.storage;
                    tokio::task::spawn(async move {
                        for item in evicted {
                            if let Err(e) = storage.purge(&item, PurgeType::Eviction, &handle).await
                            {
                                warn!("Failed to purge {item} during eviction for finish miss handler: {e}");
                            }
                        }
                    });
                }
                inner_enabled.traces.finish_miss_span();
                Ok(())
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Set the [CacheMeta] of the cache
    pub fn set_cache_meta(&mut self, meta: CacheMeta) {
        match self.phase {
            // TODO: store the stale meta somewhere else for future use?
            CachePhase::Stale | CachePhase::Miss => {
                let inner_enabled = self.inner_enabled_mut();
                // TODO: have a separate expired span?
                inner_enabled.traces.log_meta_in_miss_span(&meta);
                inner_enabled.meta = Some(meta);
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
        if self.phase == CachePhase::Stale {
            self.phase = CachePhase::Expired;
        }
    }

    /// Set the [CacheMeta] of the cache after revalidation.
    ///
    /// Certain info, such as the original cache admission time, will be preserved. The rest will
    /// be replaced by the input `meta`.
    pub async fn revalidate_cache_meta(&mut self, mut meta: CacheMeta) -> Result<bool> {
        let result = match self.phase {
            CachePhase::Stale => {
                let inner = self.inner_mut();
                let inner_enabled = inner
                    .enabled_ctx
                    .as_mut()
                    .expect("stale phase has cache enabled");
                // TODO: we should keep the old meta in place and just use the new one to update it;
                // that requires cacheable_filter to take a mut header and just return InternalMeta

                // update new meta with old meta's created time
                let old_meta = inner_enabled.meta.take().unwrap();
                let created = old_meta.0.internal.created;
                meta.0.internal.created = created;
                // meta.internal.updated was already set to the new meta's `created`,
                // no need to set `updated` here
                // Merge old extensions with new ones. New exts take precedence if they conflict.
                let mut extensions = old_meta.0.extensions;
                extensions.extend(meta.0.extensions);
                meta.0.extensions = extensions;

                inner_enabled.meta.replace(meta);

                let mut span = inner_enabled.traces.child("update_meta");
                let result = inner_enabled
                    .storage
                    .update_meta(
                        inner.key.as_ref().unwrap(),
                        inner_enabled.meta.as_ref().unwrap(),
                        &span.handle(),
                    )
                    .await;
                span.set_tag(|| trace::Tag::new("updated", result.is_ok()));

                // regardless of the result, release the cache lock
                if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
                    let lock = lock_ctx.lock.take();
                    if let Some(Locked::Write(permit)) = lock {
                        lock_ctx.cache_lock.release(
                            inner.key.as_ref().expect("key set by stale phase"),
                            permit,
                            LockStatus::Done,
                        );
                    }
                }

                result
            }
            _ => panic!("wrong phase {:?}", self.phase),
        };
        self.phase = CachePhase::Revalidated;
        result
    }

    /// After a successful revalidation, update certain headers for the cached asset,
    /// such as `Etag`, with the fresh response header `resp`.
    pub fn revalidate_merge_header(&mut self, resp: &RespHeader) -> ResponseHeader {
        match self.phase {
            CachePhase::Stale => {
                /*
                 * https://datatracker.ietf.org/doc/html/rfc9110#section-15.4.5
                 * 304 response MUST generate ... would have been sent in a 200 ...
                 * - Content-Location, Date, ETag, and Vary
                 * - Cache-Control and Expires...
                 */
                let mut old_header = self.inner_enabled().meta.as_ref().unwrap().0.header.clone();
                let mut clone_header = |header_name: &'static str| {
                    for (i, value) in resp.headers.get_all(header_name).iter().enumerate() {
                        if i == 0 {
                            old_header
                                .insert_header(header_name, value)
                                .expect("can add valid header");
                        } else {
                            old_header
                                .append_header(header_name, value)
                                .expect("can add valid header");
                        }
                    }
                };
                clone_header("cache-control");
                clone_header("expires");
                clone_header("cache-tag");
                clone_header("cdn-cache-control");
                clone_header("etag");
                // https://datatracker.ietf.org/doc/html/rfc9111#section-4.3.4
                // "...cache MUST update its header fields with the header fields provided in the 304..."
                // But if the Vary header changes, the cached response may no longer match the
                // incoming request.
                //
                // For simplicity, ignore changing Vary in revalidation for now.
                // TODO: if we support vary during revalidation, there are a few edge cases to
                // consider (what if the Vary header appears/disappears/changes)?
                //
                // clone_header("vary");
                old_header
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Mark this asset uncacheable after revalidation
    pub fn revalidate_uncacheable(&mut self, header: ResponseHeader, reason: NoCacheReason) {
        match self.phase {
            CachePhase::Stale => {
                // replace cache meta header
                self.inner_enabled_mut().meta.as_mut().unwrap().0.header = header;
                // upstream request done, release write lock
                self.release_write_lock(reason);
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
        self.phase = CachePhase::RevalidatedNoCache(reason);
        // TODO: remove this asset from cache once finished?
    }

    /// Mark this asset as stale, but being updated separately from this request.
    pub fn set_stale_updating(&mut self) {
        match self.phase {
            CachePhase::Stale => self.phase = CachePhase::StaleUpdating,
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Update the variance of the [CacheMeta].
    ///
    /// Note that this process may change the lookup `key`, and eventually (when the asset is
    /// written to storage) invalidate other cached variants under the same primary key as the
    /// current asset.
    pub fn update_variance(&mut self, variance: Option<HashBinary>) {
        // If this is a cache miss, we will simply update the variance in the meta.
        //
        // If this is an expired response, we will have to consider a few cases:
        //
        // **Case 1**: Variance was absent, but the caller sets it now.
        // We will just insert it into the meta. The current asset becomes the primary variant.
        // Because the current location of the asset is already the primary variant, nothing else
        // needs to be done.
        //
        // **Case 2**: Variance was present, but it changed or was removed.
        // We want the current asset to take over the primary slot, in order to invalidate all
        // other variants derived under the old Vary.
        //
        // **Case 3**: Variance did not change.
        // Nothing needs to happen.
        let inner = match self.phase {
            CachePhase::Miss | CachePhase::Expired => self.inner_mut(),
            _ => panic!("wrong phase {:?}", self.phase),
        };
        let inner_enabled = inner
            .enabled_ctx
            .as_mut()
            .expect("cache enabled on miss and expired");

        // Update the variance in the meta
        if let Some(variance_hash) = variance.as_ref() {
            inner_enabled
                .meta
                .as_mut()
                .unwrap()
                .set_variance_key(*variance_hash);
        } else {
            inner_enabled.meta.as_mut().unwrap().remove_variance();
        }

        // Change the lookup `key` if necessary, in order to admit the asset into the primary slot
        // instead of the secondary slot.
        let key = inner.key.as_ref().unwrap();
        if let Some(old_variance) = key.get_variance_key().as_ref() {
            // This is a secondary variant slot.
            if Some(*old_variance) != variance.as_ref() {
                // This new variance does not match the variance in the cache key we used to look
                // up this asset.
                // Drop the cache lock to avoid leaving a dangling lock
                // (because we locked with the old cache key for the secondary slot)
                // TODO: maybe we should try to signal waiting readers to compete for the primary key
                // lock instead? we will not be modifying this secondary slot so it's not actually
                // ready for readers
                if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
                    if let Some(Locked::Write(permit)) = lock_ctx.lock.take() {
                        lock_ctx.cache_lock.release(key, permit, LockStatus::Done);
                    }
                }
                // Remove the `variance` from the `key`, so that we admit this asset into the
                // primary slot. (`key` is used to tell storage where to write the data.)
                inner.key.as_mut().unwrap().remove_variance_key();
            }
        }
    }

    /// Return the [CacheMeta] of this asset
    ///
    /// # Panic
    /// Panics in phases that have no cache meta.
    pub fn cache_meta(&self) -> &CacheMeta {
        match self.phase {
            // TODO: allow in Bypass phase?
            CachePhase::Stale
            | CachePhase::StaleUpdating
            | CachePhase::Expired
            | CachePhase::Hit
            | CachePhase::Revalidated
            | CachePhase::RevalidatedNoCache(_) => self.inner_enabled().meta.as_ref().unwrap(),
            CachePhase::Miss => {
                // this is the async body read case, safe because body_reader is only set
                // after meta is retrieved
                if self.inner_enabled().body_reader.is_some() {
                    self.inner_enabled().meta.as_ref().unwrap()
                } else {
                    panic!("wrong phase {:?}", self.phase);
                }
            }

            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Return the [CacheMeta] of this asset, if any
    ///
    /// Different from [Self::cache_meta()], this function is allowed to be called in the
    /// [CachePhase::Miss] phase, where the cache meta may be set.
    /// # Panic
    /// Panics in phases that shouldn't have cache meta.
    pub fn maybe_cache_meta(&self) -> Option<&CacheMeta> {
        match self.phase {
            CachePhase::Miss
            | CachePhase::Stale
            | CachePhase::StaleUpdating
            | CachePhase::Expired
            | CachePhase::Hit
            | CachePhase::Revalidated
            | CachePhase::RevalidatedNoCache(_) => self.inner_enabled().meta.as_ref(),
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Return the [`CacheKey`] of this asset, if any.
    ///
    /// This is allowed to be called in any phase. If the cache key callback was not called,
    /// this will return None.
    pub fn maybe_cache_key(&self) -> Option<&CacheKey> {
        (!matches!(
            self.phase(),
            CachePhase::Disabled(NoCacheReason::NeverEnabled) | CachePhase::Uninit
        ))
        .then(|| self.cache_key())
    }

    /// Perform the cache lookup from the given cache storage with the given cache key
    ///
    /// A cache hit will return [CacheMeta], which contains the header and meta info about
    /// the cache, as well as a [HitHandler] to read the cache hit body.
    /// # Panic
    /// Panics in other phases.
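    ///
    /// A sketch of acting on the lookup result (hit filtering and forced freshness elided):
    ///
    /// ```ignore
    /// use std::time::SystemTime;
    /// use pingora_cache::HitStatus;
    ///
    /// match cache.cache_lookup().await? {
    ///     Some((meta, hit_handler)) => {
    ///         let status = if meta.is_fresh(SystemTime::now()) {
    ///             HitStatus::Fresh
    ///         } else {
    ///             HitStatus::Expired
    ///         };
    ///         cache.cache_found(meta, hit_handler, status);
    ///     }
    ///     None => cache.cache_miss(),
    /// }
    /// ```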
    pub async fn cache_lookup(&mut self) -> Result<Option<(CacheMeta, HitHandler)>> {
        match self.phase {
            // Stale is allowed here because stale -> cache_lock -> lookup again
            CachePhase::CacheKey | CachePhase::Stale => {
                let inner = self
                    .inner
                    .as_mut()
                    .expect("Cache phase is checked and should have inner");
                let inner_enabled = inner
                    .enabled_ctx
                    .as_mut()
                    .expect("Cache enabled on cache_lookup");
                let mut span = inner_enabled.traces.child("lookup");
                let key = inner.key.as_ref().unwrap(); // safe, this phase should have cache key
                let now = Instant::now();
                let result = inner_enabled.storage.lookup(key, &span.handle()).await?;
                // one request may have multiple lookups
                self.digest.add_lookup_duration(now.elapsed());
                let result = result.and_then(|(meta, header)| {
                    if let Some(ts) = inner_enabled.valid_after {
                        if meta.created() < ts {
                            span.set_tag(|| trace::Tag::new("not valid", true));
                            return None;
                        }
                    }
                    Some((meta, header))
                });
                if result.is_none() {
                    if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
                        lock_ctx.lock = Some(lock_ctx.cache_lock.lock(key, false));
                    }
                }
                span.set_tag(|| trace::Tag::new("found", result.is_some()));
                Ok(result)
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Update the variance and check whether the meta matches the current variance
    ///
    /// `cache_lookup() -> compute vary hash -> cache_vary_lookup()`
    /// This function allows callers to compute the vary hash based on the initial cache hit.
    /// `meta` should be the one returned from the initial cache_lookup().
    /// - return true if the meta matches the variance.
    /// - return false if the current meta doesn't match the variance; need to cache_lookup() again
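    ///
    /// A sketch of the vary flow, assuming the variance is derived from the request's
    /// `Accept-Encoding` header via [VarianceBuilder]:
    ///
    /// ```ignore
    /// use pingora_cache::VarianceBuilder;
    ///
    /// if let Some((meta, hit_handler)) = cache.cache_lookup().await? {
    ///     let mut vary = VarianceBuilder::new();
    ///     // hypothetical: raw bytes of the request's Accept-Encoding header
    ///     vary.add_value("accept-encoding", accept_encoding_header_value);
    ///     match vary.finalize() {
    ///         Some(variance) if !cache.cache_vary_lookup(variance, &meta) => {
    ///             // variance mismatch: look up the variant's own slot
    ///             let variant = cache.cache_lookup().await?;
    ///         }
    ///         _ => { /* no variance, or this asset already matches: proceed with the hit */ }
    ///     }
    /// }
    /// ```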
    pub fn cache_vary_lookup(&mut self, variance: HashBinary, meta: &CacheMeta) -> bool {
        match self.phase {
            // Stale is allowed here because stale -> cache_lock -> lookup again
            CachePhase::CacheKey | CachePhase::Stale => {
                let inner = self.inner_mut();
                // make sure that all variances found are fresher than this asset
                // this is because when purging all the variances, only the primary slot is deleted
                // the created TS of the primary is the tombstone of all the variances
                inner
                    .enabled_ctx
                    .as_mut()
                    .expect("cache enabled")
                    .valid_after = Some(meta.created());

                // update vary
                let key = inner.key.as_mut().unwrap();
                // if no variance was previously set, then this is the first cache hit
                let is_initial_cache_hit = key.get_variance_key().is_none();
                key.set_variance_key(variance);
                let variance_binary = key.variance_bin();
                let matches_variance = meta.variance() == variance_binary;

                // We should remove the variance in the lookup `key` if this is the primary variant
                // slot. We know this is the primary variant slot if this is the initial cache hit,
                // AND the variance in the `key` already matches the `meta`'s.
                //
                // For the primary variant slot, the storage backend needs to use the primary key
                // for both cache lookup and updating the meta. Otherwise it will look for the
                // asset in the wrong location during revalidation.
                //
                // We can recreate the "full" cache key by using the meta's variance, if needed.
                if matches_variance && is_initial_cache_hit {
                    inner.key.as_mut().unwrap().remove_variance_key();
                }

                matches_variance
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Whether this request is behind a cache lock in order to wait for another request to read the
    /// asset.
    pub fn is_cache_locked(&self) -> bool {
        matches!(
            self.inner_enabled()
                .lock_ctx
                .as_ref()
                .and_then(|l| l.lock.as_ref()),
            Some(Locked::Read(_))
        )
    }

    /// Whether this request is the leader request that fetches the asset for itself and other
    /// requests behind the cache lock.
    pub fn is_cache_lock_writer(&self) -> bool {
        matches!(
            self.inner_enabled()
                .lock_ctx
                .as_ref()
                .and_then(|l| l.lock.as_ref()),
            Some(Locked::Write(_))
        )
    }

    /// Take the write lock from this request to transfer it to another one.
    /// # Panic
    /// Call [Self::is_cache_lock_writer()] to check first; panics otherwise.
    pub fn take_write_lock(&mut self) -> (WritePermit, &'static CacheKeyLockImpl) {
        let lock_ctx = self
            .inner_enabled_mut()
            .lock_ctx
            .as_mut()
            .expect("take_write_lock() called without cache lock");
        let lock = lock_ctx
            .lock
            .take()
            .expect("take_write_lock() called without lock");
        match lock {
            Locked::Write(w) => (w, lock_ctx.cache_lock),
            Locked::Read(_) => panic!("take_write_lock() called on read lock"),
        }
    }

    /// Set the write lock, which is usually transferred from [Self::take_write_lock()]
    ///
    /// # Panic
    /// Panics if a cache lock was not originally configured for this request.
    // TODO: it may make sense to allow configuring the CacheKeyLock here too that the write permit
    // is associated with
    // (The WritePermit comes from the CacheKeyLock and should be used when releasing from the CacheKeyLock,
    // shouldn't be possible to give a WritePermit to a request using a different CacheKeyLock)
    pub fn set_write_lock(&mut self, write_lock: WritePermit) {
        if let Some(lock_ctx) = self.inner_enabled_mut().lock_ctx.as_mut() {
            lock_ctx.lock.replace(Locked::Write(write_lock));
        }
    }

    /// Whether this request's cache hit is stale
    fn has_staled_asset(&self) -> bool {
        matches!(self.phase, CachePhase::Stale | CachePhase::StaleUpdating)
    }

    /// Whether this asset is stale and serving stale-if-error is allowed
    pub fn can_serve_stale_error(&self) -> bool {
        self.has_staled_asset() && self.cache_meta().serve_stale_if_error(SystemTime::now())
    }

    /// Whether this asset is stale and serving stale-while-revalidate is allowed.
    pub fn can_serve_stale_updating(&self) -> bool {
        self.has_staled_asset()
            && self
                .cache_meta()
                .serve_stale_while_revalidate(SystemTime::now())
    }

    /// Wait for the cache read lock to be unlocked
    /// # Panic
    /// Check [Self::is_cache_locked()] first; panics if this request doesn't have a read lock.
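    ///
    /// A sketch of a reader waiting on the lock and then retrying the lookup:
    ///
    /// ```ignore
    /// use pingora_cache::lock::LockStatus;
    ///
    /// if cache.is_cache_locked() {
    ///     match cache.cache_lock_wait().await {
    ///         // the writer finished; a retried cache_lookup() will likely be a hit
    ///         LockStatus::Done => { /* call cache_lookup() again */ }
    ///         // waited too long: fetch from upstream without caching
    ///         LockStatus::WaitTimeout => { /* see NoCacheReason::CacheLockTimeout */ }
    ///         _ => { /* the writer gave up or hit an error: proceed as a miss */ }
    ///     }
    /// }
    /// ```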
    pub async fn cache_lock_wait(&mut self) -> LockStatus {
        let inner_enabled = self.inner_enabled_mut();
        let mut span = inner_enabled.traces.child("cache_lock");
        // should always call is_cache_locked() before this function, which should guarantee that
        // the inner cache has a read lock and lock ctx
        let (read_lock, status) = if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
            let lock = lock_ctx.lock.take(); // remove the lock from self
            if let Some(Locked::Read(r)) = lock {
                let now = Instant::now();
                // it's possible for a request to be locked more than once,
                // so wait the remainder of our configured timeout
                let status = if let Some(wait_timeout) = lock_ctx.wait_timeout {
                    let wait_timeout =
                        wait_timeout.saturating_sub(self.lock_duration().unwrap_or(Duration::ZERO));
                    match timeout(wait_timeout, r.wait()).await {
                        Ok(()) => r.lock_status(),
                        Err(_) => LockStatus::WaitTimeout,
                    }
                } else {
                    r.wait().await;
                    r.lock_status()
                };
                self.digest.add_lock_duration(now.elapsed());
                (r, status)
            } else {
                panic!("cache_lock_wait on wrong type of lock")
            }
        } else {
            panic!("cache_lock_wait without cache lock")
        };
        if let Some(lock_ctx) = self.inner_enabled().lock_ctx.as_ref() {
            lock_ctx
                .cache_lock
                .trace_lock_wait(&mut span, &read_lock, status);
        }
        status
    }

    /// How long this request waited behind the read lock
    pub fn lock_duration(&self) -> Option<Duration> {
        self.digest.lock_duration
    }

    /// How long this request spent on cache lookup and reading the header
    pub fn lookup_duration(&self) -> Option<Duration> {
        self.digest.lookup_duration
    }

    /// Delete the asset from the cache storage
    /// # Panic
    /// Needs to be called after the cache key is set. Panics otherwise.
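    ///
    /// A sketch, assuming the cache was enabled earlier in the request:
    ///
    /// ```ignore
    /// use pingora_cache::CacheKey;
    ///
    /// cache.set_cache_key(CacheKey::new("ns", "GET example.com/asset", ""));
    /// let purged: bool = cache.purge().await?;
    /// ```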
    pub async fn purge(&self) -> Result<bool> {
        match self.phase {
            CachePhase::CacheKey => {
                let inner = self.inner();
                let inner_enabled = self.inner_enabled();
                let span = inner_enabled.traces.child("purge");
                let key = inner.key.as_ref().unwrap().to_compact();
                Self::purge_impl(inner_enabled.storage, inner_enabled.eviction, &key, span).await
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Delete the asset from the cache storage via a spawned task.
    /// Returns the corresponding `JoinHandle` of that task.
    /// # Panic
    /// Needs to be called after the cache key is set. Panics otherwise.
    pub fn spawn_async_purge(
        &self,
        context: &'static str,
    ) -> tokio::task::JoinHandle<Result<bool>> {
        if matches!(self.phase, CachePhase::Disabled(_) | CachePhase::Uninit) {
            panic!("wrong phase {:?}", self.phase);
        }

        let inner_enabled = self.inner_enabled();
        let span = inner_enabled.traces.child("purge");
        let key = self.inner().key.as_ref().unwrap().to_compact();
        let storage = inner_enabled.storage;
        let eviction = inner_enabled.eviction;
        tokio::task::spawn(async move {
            Self::purge_impl(storage, eviction, &key, span)
                .await
                .map_err(|e| {
                    warn!("Failed to purge {key} (context: {context}): {e}");
                    e
                })
        })
    }

    async fn purge_impl(
        storage: &'static (dyn storage::Storage + Sync),
        eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
        key: &CompactCacheKey,
        mut span: Span,
    ) -> Result<bool> {
        let result = storage
            .purge(key, PurgeType::Invalidation, &span.handle())
            .await;
        let purged = matches!(result, Ok(true));
        // need to inform the eviction manager if the asset was removed
        if let Some(eviction) = eviction.as_ref() {
            if purged {
                eviction.remove(key);
            }
        }
        span.set_tag(|| trace::Tag::new("purged", purged));
        result
    }

    /// Check the cacheable prediction
    ///
    /// Returns true if the predictor is not set.
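    ///
    /// A sketch of using the prediction (the exact policy is up to the proxy):
    ///
    /// ```ignore
    /// if !cache.cacheable_prediction() {
    ///     // predicted uncacheable: skip the lookup, but remember to call
    ///     // response_became_cacheable() later if the response turns out cacheable
    ///     cache.bypass();
    /// }
    /// ```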
    pub fn cacheable_prediction(&self) -> bool {
        if let Some(predictor) = self.inner().predictor {
            predictor.cacheable_prediction(self.cache_key())
        } else {
            true
        }
    }

    /// Tell the predictor that this response, which was previously predicted to be uncacheable,
    /// is cacheable now.
    pub fn response_became_cacheable(&self) {
        if let Some(predictor) = self.inner().predictor {
            predictor.mark_cacheable(self.cache_key());
        }
    }

    /// Tell the predictor that this response is uncacheable so that it will know the next time
    /// this request arrives.
    pub fn response_became_uncacheable(&self, reason: NoCacheReason) {
        if let Some(predictor) = self.inner().predictor {
            predictor.mark_uncacheable(self.cache_key(), reason);
        }
    }

    /// Tag all spans as being part of a subrequest.
    pub fn tag_as_subrequest(&mut self) {
        self.inner_enabled_mut()
            .traces
            .cache_span
            .set_tag(|| Tag::new("is_subrequest", true))
    }
}