pingora_cache/lib.rs
1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The HTTP caching layer for proxies.
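//!
//! A rough sketch of how a proxy might drive [HttpCache] through one request
//! (`STORAGE` and `key` are illustrative placeholders; storage setup and cache key
//! construction are up to the caller):
//!
//! ```ignore
//! let mut cache = HttpCache::new();
//! // enable caching with a &'static Storage backend; no eviction manager,
//! // predictor, cache lock, or option overrides in this sketch
//! cache.enable(STORAGE, None, None, None, None);
//! cache.set_cache_key(key);
//!
//! match cache.cache_lookup().await? {
//!     Some((meta, hit_handler)) => {
//!         // apply any hit filtering, then record the hit
//!         cache.cache_found(meta, hit_handler, HitStatus::Fresh);
//!         // ... serve the body via hit_handler(), then finish_hit_handler()
//!     }
//!     None => {
//!         if cache.is_cache_locked() {
//!             // another request is fetching this asset; wait, then retry the lookup
//!             let _status = cache.cache_lock_wait().await;
//!         } else {
//!             // this request fetches from upstream and admits the response
//!             cache.cache_miss();
//!         }
//!     }
//! }
//! ```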
16
17#![allow(clippy::new_without_default)]
18
19use cf_rustracing::tag::Tag;
20use http::{method::Method, request::Parts as ReqHeader, response::Parts as RespHeader};
21use key::{CacheHashKey, CompactCacheKey, HashBinary};
22use lock::WritePermit;
23use log::warn;
24use pingora_error::Result;
25use pingora_http::ResponseHeader;
26use pingora_timeout::timeout;
27use std::time::{Duration, Instant, SystemTime};
28use storage::MissFinishType;
29use strum::IntoStaticStr;
30use trace::{CacheTraceCTX, Span};
31
32pub mod cache_control;
33pub mod eviction;
34pub mod filters;
35pub mod hashtable;
36pub mod key;
37pub mod lock;
38pub mod max_file_size;
39mod memory;
40pub mod meta;
41pub mod predictor;
42pub mod put;
43pub mod storage;
44pub mod trace;
45mod variance;
46
47use crate::max_file_size::MaxFileSizeTracker;
48pub use key::CacheKey;
49use lock::{CacheKeyLockImpl, LockStatus, Locked};
50pub use memory::MemCache;
51pub use meta::{set_compression_dict_content, set_compression_dict_path};
52pub use meta::{CacheMeta, CacheMetaDefaults};
53pub use storage::{HitHandler, MissHandler, PurgeType, Storage};
54pub use variance::VarianceBuilder;
55
56pub mod prelude {}
57
/// The state machine for HTTP caching
59///
60/// This object is used to handle the state and transitions for HTTP caching through the life of a
61/// request.
62pub struct HttpCache {
63 phase: CachePhase,
64 // Box the rest so that a disabled HttpCache struct is small
65 inner: Option<Box<HttpCacheInner>>,
66 digest: HttpCacheDigest,
67}
68
69/// This reflects the phase of HttpCache during the lifetime of a request
70#[derive(Clone, Copy, Debug, PartialEq, Eq)]
71pub enum CachePhase {
72 /// Cache disabled, with reason (NeverEnabled if never explicitly used)
73 Disabled(NoCacheReason),
74 /// Cache enabled but nothing is set yet
75 Uninit,
76 /// Cache was enabled, the request decided not to use it
77 // HttpCache.inner_enabled is kept
78 Bypass,
79 /// Awaiting the cache key to be generated
80 CacheKey,
81 /// Cache hit
82 Hit,
83 /// No cached asset is found
84 Miss,
    /// A stale (expired) asset is found
    Stale,
    /// A stale (expired) asset was found, but another request is revalidating it
    StaleUpdating,
    /// A stale (expired) asset was found, so a fresh one was fetched
    Expired,
    /// A stale (expired) asset was found, and it was revalidated to be fresh
92 Revalidated,
93 /// Revalidated, but deemed uncacheable, so we do not freshen it
94 RevalidatedNoCache(NoCacheReason),
95}
96
97impl CachePhase {
98 /// Convert [CachePhase] as `str`, for logging and debugging.
99 pub fn as_str(&self) -> &'static str {
100 match self {
101 CachePhase::Disabled(_) => "disabled",
102 CachePhase::Uninit => "uninitialized",
103 CachePhase::Bypass => "bypass",
104 CachePhase::CacheKey => "key",
105 CachePhase::Hit => "hit",
106 CachePhase::Miss => "miss",
107 CachePhase::Stale => "stale",
108 CachePhase::StaleUpdating => "stale-updating",
109 CachePhase::Expired => "expired",
110 CachePhase::Revalidated => "revalidated",
111 CachePhase::RevalidatedNoCache(_) => "revalidated-nocache",
112 }
113 }
114}
115
116/// The possible reasons for not caching
117#[derive(Copy, Clone, Debug, PartialEq, Eq)]
118pub enum NoCacheReason {
119 /// Caching is not enabled to begin with
120 NeverEnabled,
121 /// Origin directives indicated this was not cacheable
122 OriginNotCache,
123 /// Response size was larger than the cache's configured maximum asset size
124 ResponseTooLarge,
    /// Caching was disabled because the body size was unknown and the asset previously exceeded
    /// the maximum asset size; the asset is otherwise cacheable, but the cache needs to confirm
    /// the final size of the asset before it can mark it as cacheable again.
128 PredictedResponseTooLarge,
129 /// Due to internal caching storage error
130 StorageError,
131 /// Due to other types of internal issues
132 InternalError,
    /// Will be cacheable but skips cache admission for now
    ///
    /// This happens when the cache predictor predicted that this request is not cacheable, but
    /// the response turns out to be OK to cache. However, it might be too large to re-enable caching
    /// for this request.
138 Deferred,
139 /// Due to the proxy upstream filter declining the current request from going upstream
140 DeclinedToUpstream,
141 /// Due to the upstream being unreachable or otherwise erroring during proxying
142 UpstreamError,
143 /// The writer of the cache lock sees that the request is not cacheable (Could be OriginNotCache)
144 CacheLockGiveUp,
145 /// This request waited too long for the writer of the cache lock to finish, so this request will
146 /// fetch from the origin without caching
147 CacheLockTimeout,
148 /// Other custom defined reasons
149 Custom(&'static str),
150}
151
152impl NoCacheReason {
153 /// Convert [NoCacheReason] as `str`, for logging and debugging.
154 pub fn as_str(&self) -> &'static str {
155 use NoCacheReason::*;
156 match self {
157 NeverEnabled => "NeverEnabled",
158 OriginNotCache => "OriginNotCache",
159 ResponseTooLarge => "ResponseTooLarge",
160 PredictedResponseTooLarge => "PredictedResponseTooLarge",
161 StorageError => "StorageError",
162 InternalError => "InternalError",
163 Deferred => "Deferred",
164 DeclinedToUpstream => "DeclinedToUpstream",
165 UpstreamError => "UpstreamError",
166 CacheLockGiveUp => "CacheLockGiveUp",
167 CacheLockTimeout => "CacheLockTimeout",
168 Custom(s) => s,
169 }
170 }
171}
172
173/// Information collected about the caching operation that will not be cleared
174#[derive(Debug, Default)]
175pub struct HttpCacheDigest {
176 pub lock_duration: Option<Duration>,
177 // time spent in cache lookup and reading the header
178 pub lookup_duration: Option<Duration>,
179}
180
181/// Convenience function to add a duration to an optional duration
182fn add_duration_to_opt(target_opt: &mut Option<Duration>, to_add: Duration) {
183 *target_opt = Some(target_opt.map_or(to_add, |existing| existing + to_add));
184}
185
186impl HttpCacheDigest {
187 fn add_lookup_duration(&mut self, extra_lookup_duration: Duration) {
188 add_duration_to_opt(&mut self.lookup_duration, extra_lookup_duration)
189 }
190
191 fn add_lock_duration(&mut self, extra_lock_duration: Duration) {
192 add_duration_to_opt(&mut self.lock_duration, extra_lock_duration)
193 }
194}
195
/// Response cacheable decision
199#[derive(Debug)]
200pub enum RespCacheable {
201 Cacheable(CacheMeta),
202 Uncacheable(NoCacheReason),
203}
204
205impl RespCacheable {
206 /// Whether it is cacheable
207 #[inline]
208 pub fn is_cacheable(&self) -> bool {
209 matches!(*self, Self::Cacheable(_))
210 }
211
212 /// Unwrap [RespCacheable] to get the [CacheMeta] stored
213 /// # Panic
214 /// Panic when this object is not cacheable. Check [Self::is_cacheable()] first.
215 pub fn unwrap_meta(self) -> CacheMeta {
216 match self {
217 Self::Cacheable(meta) => meta,
218 Self::Uncacheable(_) => panic!("expected Cacheable value"),
219 }
220 }
221}
222
/// Indicates which level of purge logic to apply to an asset, i.e. whether the purged
/// asset should be revalidated or re-fetched altogether
225#[derive(Debug, Clone, Copy, PartialEq, Eq)]
226pub enum ForcedInvalidationKind {
227 /// Indicates the asset should be considered stale and revalidated
228 ForceExpired,
229
230 /// Indicates the asset should be considered absent and treated like a miss
231 /// instead of a hit
232 ForceMiss,
233}
234
/// Freshness state of a cache hit asset
238#[derive(Debug, Copy, Clone, IntoStaticStr, PartialEq, Eq)]
239#[strum(serialize_all = "snake_case")]
240pub enum HitStatus {
241 /// The asset's freshness directives indicate it has expired
242 Expired,
243
244 /// The asset was marked as expired, and should be treated as stale
245 ForceExpired,
246
247 /// The asset was marked as absent, and should be treated as a miss
248 ForceMiss,
249
250 /// An error occurred while processing the asset, so it should be treated as
251 /// a miss
252 FailedHitFilter,
253
254 /// The asset is not expired
255 Fresh,
256}
257
258impl HitStatus {
259 /// For displaying cache hit status
260 pub fn as_str(&self) -> &'static str {
261 self.into()
262 }
263
264 /// Whether cached asset can be served as fresh
265 pub fn is_fresh(&self) -> bool {
266 *self == HitStatus::Fresh
267 }
268
269 /// Check whether the hit status should be treated as a miss. A forced miss
270 /// is obviously treated as a miss. A hit-filter failure is treated as a
271 /// miss because we can't use the asset as an actual hit. If we treat it as
272 /// expired, we still might not be able to use it even if revalidation
273 /// succeeds.
274 pub fn is_treated_as_miss(self) -> bool {
275 matches!(self, HitStatus::ForceMiss | HitStatus::FailedHitFilter)
276 }
277}
278
279pub struct LockCtx {
280 pub lock: Option<Locked>,
281 pub cache_lock: &'static CacheKeyLockImpl,
282 pub wait_timeout: Option<Duration>,
283}
284
285// Fields like storage handlers that are needed only when cache is enabled (or bypassing).
286struct HttpCacheInnerEnabled {
287 pub meta: Option<CacheMeta>,
288 // when set, even if an asset exists, it would only be considered valid after this timestamp
289 pub valid_after: Option<SystemTime>,
290 pub miss_handler: Option<MissHandler>,
291 pub body_reader: Option<HitHandler>,
292 pub storage: &'static (dyn storage::Storage + Sync), // static for now
293 pub eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
294 pub lock_ctx: Option<LockCtx>,
295 pub traces: trace::CacheTraceCTX,
296}
297
298struct HttpCacheInner {
299 // Prefer adding fields to InnerEnabled if possible, these fields are released
300 // when cache is disabled.
301 // If fields are needed after cache disablement, add directly to Inner.
302 pub enabled_ctx: Option<Box<HttpCacheInnerEnabled>>,
303 pub key: Option<CacheKey>,
304 // when set, an asset will be rejected from the cache if it exceeds configured size in bytes
305 pub max_file_size_tracker: Option<MaxFileSizeTracker>,
306 pub predictor: Option<&'static (dyn predictor::CacheablePredictor + Sync)>,
307}
308
309#[derive(Debug, Default)]
310#[non_exhaustive]
311pub struct CacheOptionOverrides {
312 pub wait_timeout: Option<Duration>,
313}
314
315impl HttpCache {
316 /// Create a new [HttpCache].
317 ///
318 /// Caching is not enabled by default.
319 pub fn new() -> Self {
320 HttpCache {
321 phase: CachePhase::Disabled(NoCacheReason::NeverEnabled),
322 inner: None,
323 digest: HttpCacheDigest::default(),
324 }
325 }
326
327 /// Whether the cache is enabled
328 pub fn enabled(&self) -> bool {
329 !matches!(self.phase, CachePhase::Disabled(_) | CachePhase::Bypass)
330 }
331
332 /// Whether the cache is being bypassed
333 pub fn bypassing(&self) -> bool {
334 matches!(self.phase, CachePhase::Bypass)
335 }
336
337 /// Return the [CachePhase]
338 pub fn phase(&self) -> CachePhase {
339 self.phase
340 }
341
342 /// Whether anything was fetched from the upstream
343 ///
    /// This essentially checks all possible [CachePhase] values that need to contact the upstream server
345 pub fn upstream_used(&self) -> bool {
346 use CachePhase::*;
347 match self.phase {
348 Disabled(_) | Bypass | Miss | Expired | Revalidated | RevalidatedNoCache(_) => true,
349 Hit | Stale | StaleUpdating => false,
350 Uninit | CacheKey => false, // invalid states for this call, treat them as false to keep it simple
351 }
352 }
353
354 /// Check whether the backend storage is the type `T`.
355 pub fn storage_type_is<T: 'static>(&self) -> bool {
356 self.inner
357 .as_ref()
358 .and_then(|inner| {
359 inner
360 .enabled_ctx
361 .as_ref()
362 .and_then(|ie| ie.storage.as_any().downcast_ref::<T>())
363 })
364 .is_some()
365 }
366
367 /// Release the cache lock if the current request is a cache writer.
368 ///
369 /// Generally callers should prefer using `disable` when a cache lock should be released
370 /// due to an error to clear all cache context. This function is for releasing the cache lock
371 /// while still keeping the cache around for reading, e.g. when serving stale.
372 pub fn release_write_lock(&mut self, reason: NoCacheReason) {
373 use NoCacheReason::*;
374 if let Some(inner) = self.inner.as_mut() {
375 if let Some(lock_ctx) = inner
376 .enabled_ctx
377 .as_mut()
378 .and_then(|ie| ie.lock_ctx.as_mut())
379 {
380 let lock = lock_ctx.lock.take();
381 if let Some(Locked::Write(permit)) = lock {
382 let lock_status = match reason {
383 // let the next request try to fetch it
384 InternalError | StorageError | Deferred | UpstreamError => {
385 LockStatus::TransientError
386 }
387 // depends on why the proxy upstream filter declined the request,
388 // for now still allow next request try to acquire to avoid thundering herd
389 DeclinedToUpstream => LockStatus::TransientError,
390 // no need for the lock anymore
391 OriginNotCache | ResponseTooLarge | PredictedResponseTooLarge => {
392 LockStatus::GiveUp
393 }
394 // not sure which LockStatus make sense, we treat it as GiveUp for now
395 Custom(_) => LockStatus::GiveUp,
396 // should never happen, NeverEnabled shouldn't hold a lock
397 NeverEnabled => panic!("NeverEnabled holds a write lock"),
398 CacheLockGiveUp | CacheLockTimeout => {
399 panic!("CacheLock* are for cache lock readers only")
400 }
401 };
402 lock_ctx
403 .cache_lock
404 .release(inner.key.as_ref().unwrap(), permit, lock_status);
405 }
406 }
407 }
408 }
409
410 /// Disable caching
411 pub fn disable(&mut self, reason: NoCacheReason) {
412 // XXX: compile type enforce?
413 assert!(
414 reason != NoCacheReason::NeverEnabled,
415 "NeverEnabled not allowed as a disable reason"
416 );
417 match self.phase {
418 CachePhase::Disabled(old_reason) => {
419 // replace reason
420 if old_reason == NoCacheReason::NeverEnabled {
421 // safeguard, don't allow replacing NeverEnabled as a reason
422 // TODO: can be promoted to assertion once confirmed nothing is attempting this
423 warn!("Tried to replace cache NeverEnabled with reason: {reason:?}");
424 return;
425 }
426 self.phase = CachePhase::Disabled(reason);
427 }
428 _ => {
429 self.phase = CachePhase::Disabled(reason);
430 self.release_write_lock(reason);
431 // enabled_ctx will be cleared out
432 let mut inner_enabled = self
433 .inner_mut()
434 .enabled_ctx
435 .take()
436 .expect("could remove enabled_ctx on disable");
437 // log initial disable reason
438 inner_enabled
439 .traces
440 .cache_span
441 .set_tag(|| trace::Tag::new("disable_reason", reason.as_str()));
442 }
443 }
444 }
445
446 /* The following methods panic when they are used in the wrong phase.
447 * This is better than returning errors as such panics are only caused by coding error, which
448 * should be fixed right away. Tokio runtime only crashes the current task instead of the whole
449 * program when these panics happen. */
450
451 /// Set the cache to bypass
452 ///
453 /// # Panic
454 /// This call is only allowed in [CachePhase::CacheKey] phase (before any cache lookup is performed).
    /// Using it in any other phase will lead to a panic.
456 pub fn bypass(&mut self) {
457 match self.phase {
458 CachePhase::CacheKey => {
459 // before cache lookup / found / miss
460 self.phase = CachePhase::Bypass;
461 self.inner_enabled_mut()
462 .traces
463 .cache_span
464 .set_tag(|| trace::Tag::new("bypassed", true));
465 }
466 _ => panic!("wrong phase to bypass HttpCache {:?}", self.phase),
467 }
468 }
469
470 /// Enable the cache
471 ///
472 /// - `storage`: the cache storage backend that implements [storage::Storage]
473 /// - `eviction`: optionally the eviction manager, without it, nothing will be evicted from the storage
474 /// - `predictor`: optionally a cache predictor. The cache predictor predicts whether something is likely
475 /// to be cacheable or not. This is useful because the proxy can apply different types of optimization to
476 /// cacheable and uncacheable requests.
477 /// - `cache_lock`: optionally a cache lock which handles concurrent lookups to the same asset. Without it
478 /// such lookups will all be allowed to fetch the asset independently.
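    ///
    /// A rough setup sketch (assumes the in-memory [MemCache] backend and leaks it to obtain
    /// the required `'static` reference; any other `Storage` implementation works the same way):
    ///
    /// ```ignore
    /// use pingora_cache::{HttpCache, MemCache};
    ///
    /// // the storage backend must outlive all requests
    /// let storage: &'static MemCache = Box::leak(Box::new(MemCache::new()));
    ///
    /// let mut cache = HttpCache::new();
    /// // no eviction manager, predictor, cache lock, or overrides here
    /// cache.enable(storage, None, None, None, None);
    /// ```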
479 pub fn enable(
480 &mut self,
481 storage: &'static (dyn storage::Storage + Sync),
482 eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
483 predictor: Option<&'static (dyn predictor::CacheablePredictor + Sync)>,
484 cache_lock: Option<&'static CacheKeyLockImpl>,
485 option_overrides: Option<CacheOptionOverrides>,
486 ) {
487 match self.phase {
488 CachePhase::Disabled(_) => {
489 self.phase = CachePhase::Uninit;
490
491 let lock_ctx = cache_lock.map(|cache_lock| LockCtx {
492 cache_lock,
493 lock: None,
494 wait_timeout: option_overrides
495 .as_ref()
496 .and_then(|overrides| overrides.wait_timeout),
497 });
498
499 self.inner = Some(Box::new(HttpCacheInner {
500 enabled_ctx: Some(Box::new(HttpCacheInnerEnabled {
501 meta: None,
502 valid_after: None,
503 miss_handler: None,
504 body_reader: None,
505 storage,
506 eviction,
507 lock_ctx,
508 traces: CacheTraceCTX::new(),
509 })),
510 key: None,
511 max_file_size_tracker: None,
512 predictor,
513 }));
514 }
515 _ => panic!("Cannot enable already enabled HttpCache {:?}", self.phase),
516 }
517 }
518
519 /// Set the cache lock implementation.
520 /// # Panic
    /// Must be called before attempting to acquire a cache lock,
522 /// i.e. in the `cache_key_callback` or `cache_hit_filter` phases.
523 pub fn set_cache_lock(
524 &mut self,
525 cache_lock: Option<&'static CacheKeyLockImpl>,
526 option_overrides: Option<CacheOptionOverrides>,
527 ) {
528 match self.phase {
529 CachePhase::Disabled(_)
530 | CachePhase::CacheKey
531 | CachePhase::Stale
532 | CachePhase::Hit => {
533 let inner_enabled = self.inner_enabled_mut();
534 if inner_enabled
535 .lock_ctx
536 .as_ref()
537 .is_some_and(|ctx| ctx.lock.is_some())
538 {
539 panic!("lock already set when resetting cache lock")
540 } else {
541 let lock_ctx = cache_lock.map(|cache_lock| LockCtx {
542 cache_lock,
543 lock: None,
544 wait_timeout: option_overrides.and_then(|overrides| overrides.wait_timeout),
545 });
546 inner_enabled.lock_ctx = lock_ctx;
547 }
548 }
549 _ => panic!("wrong phase: {:?}", self.phase),
550 }
551 }
552
553 // Enable distributed tracing
554 pub fn enable_tracing(&mut self, parent_span: trace::Span) {
555 if let Some(inner_enabled) = self.inner.as_mut().and_then(|i| i.enabled_ctx.as_mut()) {
556 inner_enabled.traces.enable(parent_span);
557 }
558 }
559
560 // Get the cache parent tracing span
561 pub fn get_cache_span(&self) -> Option<trace::SpanHandle> {
562 self.inner
563 .as_ref()
564 .and_then(|i| i.enabled_ctx.as_ref().map(|ie| ie.traces.get_cache_span()))
565 }
566
567 // Get the cache `miss` tracing span
568 pub fn get_miss_span(&self) -> Option<trace::SpanHandle> {
569 self.inner
570 .as_ref()
571 .and_then(|i| i.enabled_ctx.as_ref().map(|ie| ie.traces.get_miss_span()))
572 }
573
574 // Get the cache `hit` tracing span
575 pub fn get_hit_span(&self) -> Option<trace::SpanHandle> {
576 self.inner
577 .as_ref()
578 .and_then(|i| i.enabled_ctx.as_ref().map(|ie| ie.traces.get_hit_span()))
579 }
580
581 // shortcut to access inner fields, panic if phase is disabled
582 #[inline]
583 fn inner_enabled_mut(&mut self) -> &mut HttpCacheInnerEnabled {
584 self.inner.as_mut().unwrap().enabled_ctx.as_mut().unwrap()
585 }
586
587 #[inline]
588 fn inner_enabled(&self) -> &HttpCacheInnerEnabled {
589 self.inner.as_ref().unwrap().enabled_ctx.as_ref().unwrap()
590 }
591
592 // shortcut to access inner fields, panic if cache was never enabled
593 #[inline]
594 fn inner_mut(&mut self) -> &mut HttpCacheInner {
595 self.inner.as_mut().unwrap()
596 }
597
598 #[inline]
599 fn inner(&self) -> &HttpCacheInner {
600 self.inner.as_ref().unwrap()
601 }
602
603 /// Set the cache key
604 /// # Panic
    /// The cache key is only allowed to be set in its own phase. Setting it in other phases will cause a panic.
606 pub fn set_cache_key(&mut self, key: CacheKey) {
607 match self.phase {
608 CachePhase::Uninit | CachePhase::CacheKey => {
609 self.phase = CachePhase::CacheKey;
610 self.inner_mut().key = Some(key);
611 }
612 _ => panic!("wrong phase {:?}", self.phase),
613 }
614 }
615
616 /// Return the cache key used for asset lookup
617 /// # Panic
618 /// Can only be called after the cache key is set and the cache is not disabled. Panic otherwise.
619 pub fn cache_key(&self) -> &CacheKey {
620 match self.phase {
621 CachePhase::Disabled(NoCacheReason::NeverEnabled) | CachePhase::Uninit => {
622 panic!("wrong phase {:?}", self.phase)
623 }
624 _ => self
625 .inner()
626 .key
627 .as_ref()
628 .expect("cache key should be set (set_cache_key not called?)"),
629 }
630 }
631
632 /// Return the max size allowed to be cached.
633 pub fn max_file_size_bytes(&self) -> Option<usize> {
634 assert!(
635 !matches!(
636 self.phase,
637 CachePhase::Disabled(NoCacheReason::NeverEnabled)
638 ),
639 "tried to access max file size bytes when cache never enabled"
640 );
641 self.inner()
642 .max_file_size_tracker
643 .as_ref()
644 .map(|t| t.max_file_size_bytes())
645 }
646
647 /// Set the maximum response _body_ size in bytes that will be admitted to the cache.
648 ///
649 /// Response header size should not contribute to the max file size.
650 ///
    /// To track body bytes, call `track_body_bytes_for_max_file_size`.
652 pub fn set_max_file_size_bytes(&mut self, max_file_size_bytes: usize) {
653 match self.phase {
654 CachePhase::Disabled(_) => panic!("wrong phase {:?}", self.phase),
655 _ => {
656 self.inner_mut().max_file_size_tracker =
657 Some(MaxFileSizeTracker::new(max_file_size_bytes));
658 }
659 }
660 }
661
662 /// Record body bytes for the max file size tracker.
663 ///
664 /// The `bytes_len` input contributes to a cumulative body byte tracker.
665 ///
    /// Once the cumulative body bytes exceed the maximum allowable cache file size (as configured
    /// by `set_max_file_size_bytes`), the return value will be false.
    ///
    /// Otherwise the return value is true as long as the max file size is not exceeded.
    /// If the max file size was not configured, the return value is always true.
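    ///
    /// A rough sketch of tracking upstream body chunks against a limit (the 64 MB limit and
    /// `chunk` are illustrative):
    ///
    /// ```ignore
    /// cache.set_max_file_size_bytes(64 * 1024 * 1024);
    /// // for every upstream body chunk:
    /// if !cache.track_body_bytes_for_max_file_size(chunk.len()) {
    ///     // the cumulative body size exceeded the limit; stop caching this asset
    ///     cache.disable(NoCacheReason::ResponseTooLarge);
    /// }
    /// ```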
671 pub fn track_body_bytes_for_max_file_size(&mut self, bytes_len: usize) -> bool {
672 // This is intended to be callable when cache has already been disabled,
673 // so that we can re-mark an asset as cacheable if the body size is under limits.
674 assert!(
675 !matches!(
676 self.phase,
677 CachePhase::Disabled(NoCacheReason::NeverEnabled)
678 ),
679 "tried to access max file size bytes when cache never enabled"
680 );
681 self.inner_mut()
682 .max_file_size_tracker
683 .as_mut()
684 .map_or(true, |t| t.add_body_bytes(bytes_len))
685 }
686
687 /// Check if the max file size has been exceeded according to max file size tracker.
688 ///
689 /// Return true if max file size was exceeded.
690 pub fn exceeded_max_file_size(&self) -> bool {
691 assert!(
692 !matches!(
693 self.phase,
694 CachePhase::Disabled(NoCacheReason::NeverEnabled)
695 ),
696 "tried to access max file size bytes when cache never enabled"
697 );
698 self.inner()
699 .max_file_size_tracker
700 .as_ref()
701 .is_some_and(|t| !t.allow_caching())
702 }
703
    /// Record that the asset was found in cache storage.
705 ///
706 /// This function is called after [Self::cache_lookup()] which returns the [CacheMeta] and
707 /// [HitHandler].
708 ///
709 /// The `hit_status` enum allows the caller to force expire assets.
710 pub fn cache_found(&mut self, meta: CacheMeta, hit_handler: HitHandler, hit_status: HitStatus) {
711 // Stale allowed because of cache lock and then retry
712 if !matches!(self.phase, CachePhase::CacheKey | CachePhase::Stale) {
713 panic!("wrong phase {:?}", self.phase)
714 }
715
716 self.phase = match hit_status {
717 HitStatus::Fresh => CachePhase::Hit,
718 HitStatus::Expired | HitStatus::ForceExpired => CachePhase::Stale,
719 HitStatus::FailedHitFilter | HitStatus::ForceMiss => self.phase,
720 };
721
722 let phase = self.phase;
723 let inner = self.inner_mut();
724
725 let key = inner.key.as_ref().expect("key must be set on hit");
726 let inner_enabled = inner
727 .enabled_ctx
728 .as_mut()
729 .expect("cache_found must be called while cache enabled");
730
731 // The cache lock might not be set for stale hit or hits treated as
732 // misses, so we need to initialize it here
733 if phase == CachePhase::Stale || hit_status.is_treated_as_miss() {
734 if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
735 lock_ctx.lock = Some(lock_ctx.cache_lock.lock(key));
736 }
737 }
738
739 if hit_status.is_treated_as_miss() {
740 // Clear the body and meta for hits that are treated as misses
741 inner_enabled.body_reader = None;
742 inner_enabled.meta = None;
743 } else {
744 // Set the metadata appropriately for legit hits
745 inner_enabled.traces.start_hit_span(phase, hit_status);
746 inner_enabled.traces.log_meta_in_hit_span(&meta);
747 if let Some(eviction) = inner_enabled.eviction {
748 // TODO: make access() accept CacheKey
749 let cache_key = key.to_compact();
750 if hit_handler.should_count_access() {
751 let size = hit_handler.get_eviction_weight();
752 eviction.access(&cache_key, size, meta.0.internal.fresh_until);
753 }
754 }
755 inner_enabled.meta = Some(meta);
756 inner_enabled.body_reader = Some(hit_handler);
757 }
758 }
759
    /// Mark `self` as a cache miss.
761 ///
762 /// This function is called after [Self::cache_lookup()] finds nothing or the caller decides
763 /// not to use the assets found.
764 /// # Panic
765 /// Panic in other phases.
766 pub fn cache_miss(&mut self) {
767 match self.phase {
768 // from CacheKey: set state to miss during cache lookup
769 // from Bypass: response became cacheable, set state to miss to cache
770 // from Stale: waited for cache lock, then retried and found asset was gone
771 CachePhase::CacheKey | CachePhase::Bypass | CachePhase::Stale => {
772 self.phase = CachePhase::Miss;
773 // It's possible that we've set the meta on lookup and have come back around
774 // here after not being able to acquire the cache lock, and our item has since
775 // purged or expired. We should be sure that the meta is not set in this case
776 // as there shouldn't be a meta set for cache misses.
777 self.inner_enabled_mut().meta = None;
778 self.inner_enabled_mut().traces.start_miss_span();
779 }
780 _ => panic!("wrong phase {:?}", self.phase),
781 }
782 }
783
784 /// Return the [HitHandler]
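    ///
    /// A rough read-loop sketch (assumes the hit handler exposes an async `read_body()`,
    /// as in this crate's storage traits):
    ///
    /// ```ignore
    /// while let Some(chunk) = cache.hit_handler().read_body().await? {
    ///     // send `chunk` downstream
    /// }
    /// cache.finish_hit_handler().await?;
    /// ```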
785 /// # Panic
786 /// Call this after [Self::cache_found()], panic in other phases.
787 pub fn hit_handler(&mut self) -> &mut HitHandler {
788 match self.phase {
789 CachePhase::Hit
790 | CachePhase::Stale
791 | CachePhase::StaleUpdating
792 | CachePhase::Revalidated
793 | CachePhase::RevalidatedNoCache(_) => {
794 self.inner_enabled_mut().body_reader.as_mut().unwrap()
795 }
796 _ => panic!("wrong phase {:?}", self.phase),
797 }
798 }
799
800 /// Return the body reader during a cache admission (miss/expired) which decouples the downstream
801 /// read and upstream cache write
802 pub fn miss_body_reader(&mut self) -> Option<&mut HitHandler> {
803 match self.phase {
804 CachePhase::Miss | CachePhase::Expired => {
805 let inner_enabled = self.inner_enabled_mut();
806 if inner_enabled.storage.support_streaming_partial_write() {
807 inner_enabled.body_reader.as_mut()
808 } else {
809 // body_reader could be set even when the storage doesn't support streaming
810 // Expired cache would have the reader set.
811 None
812 }
813 }
814 _ => None,
815 }
816 }
817
    /// Call this when the cache hit is fully read.
    ///
    /// This call will release resources if any and log the timing in tracing if set.
821 /// # Panic
822 /// Panic in phases where there is no cache hit.
823 pub async fn finish_hit_handler(&mut self) -> Result<()> {
824 match self.phase {
825 CachePhase::Hit
826 | CachePhase::Miss
827 | CachePhase::Expired
828 | CachePhase::Stale
829 | CachePhase::StaleUpdating
830 | CachePhase::Revalidated
831 | CachePhase::RevalidatedNoCache(_) => {
832 let inner = self.inner_mut();
833 let inner_enabled = inner.enabled_ctx.as_mut().expect("cache enabled");
834 if inner_enabled.body_reader.is_none() {
835 // already finished, we allow calling this function more than once
836 return Ok(());
837 }
838 let body_reader = inner_enabled.body_reader.take().unwrap();
839 let key = inner.key.as_ref().unwrap();
840 let result = body_reader
841 .finish(
842 inner_enabled.storage,
843 key,
844 &inner_enabled.traces.hit_span.handle(),
845 )
846 .await;
847 inner_enabled.traces.finish_hit_span();
848 result
849 }
850 _ => panic!("wrong phase {:?}", self.phase),
851 }
852 }
853
    /// Set the [MissHandler] according to the cache key and meta; it can only be called once
855 pub async fn set_miss_handler(&mut self) -> Result<()> {
856 match self.phase {
            // set_miss_handler() needs to be called after set_cache_meta() (which changes Stale to Expired).
858 // This is an artificial rule to enforce the state transitions
859 CachePhase::Miss | CachePhase::Expired => {
860 let inner = self.inner_mut();
861 let inner_enabled = inner
862 .enabled_ctx
863 .as_mut()
864 .expect("cache enabled on miss and expired");
865 if inner_enabled.miss_handler.is_some() {
866 panic!("write handler is already set")
867 }
868 let meta = inner_enabled.meta.as_ref().unwrap();
869 let key = inner.key.as_ref().unwrap();
870 let miss_handler = inner_enabled
871 .storage
872 .get_miss_handler(key, meta, &inner_enabled.traces.get_miss_span())
873 .await?;
874
875 inner_enabled.miss_handler = Some(miss_handler);
876
877 if inner_enabled.storage.support_streaming_partial_write() {
878 // If a reader can access partial write, the cache lock can be released here
879 // to let readers start reading the body.
880 if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
881 let lock = lock_ctx.lock.take();
882 if let Some(Locked::Write(permit)) = lock {
883 lock_ctx.cache_lock.release(key, permit, LockStatus::Done);
884 }
885 }
886 // Downstream read and upstream write can be decoupled
887 let body_reader = inner_enabled
888 .storage
889 .lookup_streaming_write(
890 key,
891 inner_enabled
892 .miss_handler
893 .as_ref()
894 .expect("miss handler already set")
895 .streaming_write_tag(),
896 &inner_enabled.traces.get_miss_span(),
897 )
898 .await?;
899
900 if let Some((_meta, body_reader)) = body_reader {
901 inner_enabled.body_reader = Some(body_reader);
902 } else {
903 // body_reader should exist now because streaming_partial_write is to support it
904 panic!("unable to get body_reader for {:?}", meta);
905 }
906 }
907 Ok(())
908 }
909 _ => panic!("wrong phase {:?}", self.phase),
910 }
911 }
912
913 /// Return the [MissHandler] to write the response body to cache.
914 ///
    /// `None`: the handler has not been set or has already finished
916 pub fn miss_handler(&mut self) -> Option<&mut MissHandler> {
917 match self.phase {
918 CachePhase::Miss | CachePhase::Expired => {
919 self.inner_enabled_mut().miss_handler.as_mut()
920 }
921 _ => panic!("wrong phase {:?}", self.phase),
922 }
923 }
924
925 /// Finish cache admission
926 ///
927 /// If [self] is dropped without calling this, the cache admission is considered incomplete and
928 /// should be cleaned up.
929 ///
930 /// This call will also trigger eviction if set.
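    ///
    /// A rough admission sketch (assumes the miss handler exposes an async `write_body(data, eof)`;
    /// `meta` and `upstream_chunks` are illustrative):
    ///
    /// ```ignore
    /// cache.cache_miss();
    /// cache.set_cache_meta(meta); // meta produced by the response cacheability filter
    /// cache.set_miss_handler().await?;
    /// while let Some(chunk) = upstream_chunks.next().await {
    ///     if let Some(handler) = cache.miss_handler() {
    ///         handler.write_body(chunk, false).await?;
    ///     }
    /// }
    /// cache.finish_miss_handler().await?;
    /// ```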
931 pub async fn finish_miss_handler(&mut self) -> Result<()> {
932 match self.phase {
933 CachePhase::Miss | CachePhase::Expired => {
934 let inner = self.inner_mut();
935 let inner_enabled = inner
936 .enabled_ctx
937 .as_mut()
938 .expect("cache enabled on miss and expired");
939 if inner_enabled.miss_handler.is_none() {
940 // already finished, we allow calling this function more than once
941 return Ok(());
942 }
943 let miss_handler = inner_enabled.miss_handler.take().unwrap();
944 let size = miss_handler.finish().await?;
945 let key = inner
946 .key
947 .as_ref()
948 .expect("key set by miss or expired phase");
949 if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
950 let lock = lock_ctx.lock.take();
951 if let Some(Locked::Write(permit)) = lock {
952 // no need to call r.unlock() because release() will call it
953 // r is a guard to make sure the lock is unlocked when this request is dropped
954 lock_ctx.cache_lock.release(key, permit, LockStatus::Done);
955 }
956 }
957 if let Some(eviction) = inner_enabled.eviction {
958 let cache_key = key.to_compact();
959 let meta = inner_enabled.meta.as_ref().unwrap();
960 let evicted = match size {
961 MissFinishType::Created(size) => {
962 eviction.admit(cache_key, size, meta.0.internal.fresh_until)
963 }
964 MissFinishType::Appended(size) => {
965 eviction.increment_weight(cache_key, size)
966 }
967 };
968 // actual eviction can be done async
969 let span = inner_enabled.traces.child("eviction");
970 let handle = span.handle();
971 let storage = inner_enabled.storage;
972 tokio::task::spawn(async move {
973 for item in evicted {
974 if let Err(e) = storage.purge(&item, PurgeType::Eviction, &handle).await
975 {
976 warn!("Failed to purge {item} during eviction for finish miss handler: {e}");
977 }
978 }
979 });
980 }
981 inner_enabled.traces.finish_miss_span();
982 Ok(())
983 }
984 _ => panic!("wrong phase {:?}", self.phase),
985 }
986 }
987
988 /// Set the [CacheMeta] of the cache
989 pub fn set_cache_meta(&mut self, meta: CacheMeta) {
990 match self.phase {
991 // TODO: store the staled meta somewhere else for future use?
992 CachePhase::Stale | CachePhase::Miss => {
993 let inner_enabled = self.inner_enabled_mut();
994 // TODO: have a separate expired span?
995 inner_enabled.traces.log_meta_in_miss_span(&meta);
996 inner_enabled.meta = Some(meta);
997 }
998 _ => panic!("wrong phase {:?}", self.phase),
999 }
1000 if self.phase == CachePhase::Stale {
1001 self.phase = CachePhase::Expired;
1002 }
1003 }
1004
1005 /// Set the [CacheMeta] of the cache after revalidation.
1006 ///
1007 /// Certain info such as the original cache admission time will be preserved. Others will
1008 /// be replaced by the input `meta`.
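    ///
    /// A rough revalidation sketch (`resp_304` and `new_meta` are illustrative; `new_meta`
    /// would typically be rebuilt from the merged header by the response cache filter):
    ///
    /// ```ignore
    /// // merge validator/freshness headers from the 304 into the stored header
    /// let merged_header = cache.revalidate_merge_header(&resp_304);
    /// // ... build `new_meta` from `merged_header` ...
    /// cache.revalidate_cache_meta(new_meta).await?;
    /// // phase is now CachePhase::Revalidated
    /// ```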
1009 pub async fn revalidate_cache_meta(&mut self, mut meta: CacheMeta) -> Result<bool> {
1010 let result = match self.phase {
1011 CachePhase::Stale => {
1012 let inner = self.inner_mut();
1013 let inner_enabled = inner
1014 .enabled_ctx
1015 .as_mut()
1016 .expect("stale phase has cache enabled");
1017 // TODO: we should keep old meta in place, just use new one to update it
1018 // that requires cacheable_filter to take a mut header and just return InternalMeta
1019
1020 // update new meta with old meta's created time
1021 let old_meta = inner_enabled.meta.take().unwrap();
1022 let created = old_meta.0.internal.created;
1023 meta.0.internal.created = created;
1024 // meta.internal.updated was already set to new meta's `created`,
1025 // no need to set `updated` here
1026 // Merge old extensions with new ones. New exts take precedence if they conflict.
1027 let mut extensions = old_meta.0.extensions;
1028 extensions.extend(meta.0.extensions);
1029 meta.0.extensions = extensions;
1030
1031 inner_enabled.meta.replace(meta);
1032
1033 if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
1034 let lock = lock_ctx.lock.take();
1035 if let Some(Locked::Write(permit)) = lock {
1036 lock_ctx.cache_lock.release(
1037 inner.key.as_ref().expect("key set by stale phase"),
1038 permit,
1039 LockStatus::Done,
1040 );
1041 }
1042 }
1043
1044 let mut span = inner_enabled.traces.child("update_meta");
1045 // TODO: this call can be async
1046 let result = inner_enabled
1047 .storage
1048 .update_meta(
1049 inner.key.as_ref().unwrap(),
1050 inner_enabled.meta.as_ref().unwrap(),
1051 &span.handle(),
1052 )
1053 .await;
1054 span.set_tag(|| trace::Tag::new("updated", result.is_ok()));
1055 result
1056 }
1057 _ => panic!("wrong phase {:?}", self.phase),
1058 };
1059 self.phase = CachePhase::Revalidated;
1060 result
1061 }
1062
1063 /// After a successful revalidation, update certain headers for the cached asset
1064 /// such as `Etag` with the fresh response header `resp`.
1065 pub fn revalidate_merge_header(&mut self, resp: &RespHeader) -> ResponseHeader {
1066 match self.phase {
1067 CachePhase::Stale => {
1068 /*
1069 * https://datatracker.ietf.org/doc/html/rfc9110#section-15.4.5
1070 * 304 response MUST generate ... would have been sent in a 200 ...
1071 * - Content-Location, Date, ETag, and Vary
1072 * - Cache-Control and Expires...
1073 */
1074 let mut old_header = self.inner_enabled().meta.as_ref().unwrap().0.header.clone();
1075 let mut clone_header = |header_name: &'static str| {
1076 for (i, value) in resp.headers.get_all(header_name).iter().enumerate() {
1077 if i == 0 {
1078 old_header
1079 .insert_header(header_name, value)
1080 .expect("can add valid header");
1081 } else {
1082 old_header
1083 .append_header(header_name, value)
1084 .expect("can add valid header");
1085 }
1086 }
1087 };
1088 clone_header("cache-control");
1089 clone_header("expires");
1090 clone_header("cache-tag");
1091 clone_header("cdn-cache-control");
1092 clone_header("etag");
1093 // https://datatracker.ietf.org/doc/html/rfc9111#section-4.3.4
1094 // "...cache MUST update its header fields with the header fields provided in the 304..."
1095 // But if the Vary header changes, the cached response may no longer match the
1096 // incoming request.
1097 //
1098 // For simplicity, ignore changing Vary in revalidation for now.
1099 // TODO: if we support vary during revalidation, there are a few edge cases to
1100 // consider (what if Vary header appears/disappears/changes)?
1101 //
1102 // clone_header("vary");
1103 old_header
1104 }
1105 _ => panic!("wrong phase {:?}", self.phase),
1106 }
1107 }
1108
1109 /// Mark this asset uncacheable after revalidation
1110 pub fn revalidate_uncacheable(&mut self, header: ResponseHeader, reason: NoCacheReason) {
1111 match self.phase {
1112 CachePhase::Stale => {
1113 // replace cache meta header
1114 self.inner_enabled_mut().meta.as_mut().unwrap().0.header = header;
1115 // upstream request done, release write lock
1116 self.release_write_lock(reason);
1117 }
1118 _ => panic!("wrong phase {:?}", self.phase),
1119 }
1120 self.phase = CachePhase::RevalidatedNoCache(reason);
1121 // TODO: remove this asset from cache once finished?
1122 }
1123
1124 /// Mark this asset as stale, but being updated separately from this request.
1125 pub fn set_stale_updating(&mut self) {
1126 match self.phase {
1127 CachePhase::Stale => self.phase = CachePhase::StaleUpdating,
1128 _ => panic!("wrong phase {:?}", self.phase),
1129 }
1130 }
1131
1132 /// Update the variance of the [CacheMeta].
1133 ///
1134 /// Note that this process may change the lookup `key`, and eventually (when the asset is
1135 /// written to storage) invalidate other cached variants under the same primary key as the
1136 /// current asset.
1137 pub fn update_variance(&mut self, variance: Option<HashBinary>) {
1138 // If this is a cache miss, we will simply update the variance in the meta.
1139 //
1140 // If this is an expired response, we will have to consider a few cases:
1141 //
1142 // **Case 1**: Variance was absent, but caller sets it now.
1143 // We will just insert it into the meta. The current asset becomes the primary variant.
1144 // Because the current location of the asset is already the primary variant, nothing else
1145 // needs to be done.
1146 //
1147 // **Case 2**: Variance was present, but it changed or was removed.
1148 // We want the current asset to take over the primary slot, in order to invalidate all
1149 // other variants derived under the old Vary.
1150 //
1151 // **Case 3**: Variance did not change.
1152 // Nothing needs to happen.
1153 let inner = match self.phase {
1154 CachePhase::Miss | CachePhase::Expired => self.inner_mut(),
1155 _ => panic!("wrong phase {:?}", self.phase),
1156 };
1157 let inner_enabled = inner
1158 .enabled_ctx
1159 .as_mut()
1160 .expect("cache enabled on miss and expired");
1161
1162 // Update the variance in the meta
1163 if let Some(variance_hash) = variance.as_ref() {
1164 inner_enabled
1165 .meta
1166 .as_mut()
1167 .unwrap()
1168 .set_variance_key(*variance_hash);
1169 } else {
1170 inner_enabled.meta.as_mut().unwrap().remove_variance();
1171 }
1172
1173 // Change the lookup `key` if necessary, in order to admit asset into the primary slot
1174 // instead of the secondary slot.
1175 let key = inner.key.as_ref().unwrap();
1176 if let Some(old_variance) = key.get_variance_key().as_ref() {
1177 // This is a secondary variant slot.
1178 if Some(*old_variance) != variance.as_ref() {
1179 // This new variance does not match the variance in the cache key we used to look
1180 // up this asset.
1181 // Drop the cache lock to avoid leaving a dangling lock
1182 // (because we locked with the old cache key for the secondary slot)
1183 // TODO: maybe we should try to signal waiting readers to compete for the primary key
1184 // lock instead? we will not be modifying this secondary slot so it's not actually
1185 // ready for readers
1186 if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
1187 if let Some(Locked::Write(permit)) = lock_ctx.lock.take() {
1188 lock_ctx.cache_lock.release(key, permit, LockStatus::Done);
1189 }
1190 }
1191 // Remove the `variance` from the `key`, so that we admit this asset into the
1192 // primary slot. (`key` is used to tell storage where to write the data.)
1193 inner.key.as_mut().unwrap().remove_variance_key();
1194 }
1195 }
1196 }
1197
1198 /// Return the [CacheMeta] of this asset
1199 ///
1200 /// # Panic
1201 /// Panic in phases which has no cache meta.
1202 pub fn cache_meta(&self) -> &CacheMeta {
1203 match self.phase {
1204 // TODO: allow in Bypass phase?
1205 CachePhase::Stale
1206 | CachePhase::StaleUpdating
1207 | CachePhase::Expired
1208 | CachePhase::Hit
1209 | CachePhase::Revalidated
1210 | CachePhase::RevalidatedNoCache(_) => self.inner_enabled().meta.as_ref().unwrap(),
1211 CachePhase::Miss => {
1212 // this is the async body read case, safe because body_reader is only set
1213 // after meta is retrieved
1214 if self.inner_enabled().body_reader.is_some() {
1215 self.inner_enabled().meta.as_ref().unwrap()
1216 } else {
1217 panic!("wrong phase {:?}", self.phase);
1218 }
1219 }
1220
1221 _ => panic!("wrong phase {:?}", self.phase),
1222 }
1223 }
1224
1225 /// Return the [CacheMeta] of this asset if any
1226 ///
1227 /// Different from [Self::cache_meta()], this function is allowed to be called in
    /// [CachePhase::Miss] phase where the cache meta may be set.
1229 /// # Panic
1230 /// Panic in phases that shouldn't have cache meta.
1231 pub fn maybe_cache_meta(&self) -> Option<&CacheMeta> {
1232 match self.phase {
1233 CachePhase::Miss
1234 | CachePhase::Stale
1235 | CachePhase::StaleUpdating
1236 | CachePhase::Expired
1237 | CachePhase::Hit
1238 | CachePhase::Revalidated
1239 | CachePhase::RevalidatedNoCache(_) => self.inner_enabled().meta.as_ref(),
1240 _ => panic!("wrong phase {:?}", self.phase),
1241 }
1242 }
1243
1244 /// Perform the cache lookup from the given cache storage with the given cache key
1245 ///
1246 /// A cache hit will return [CacheMeta] which contains the header and meta info about
1247 /// the cache as well as a [HitHandler] to read the cache hit body.
1248 /// # Panic
1249 /// Panic in other phases.
1250 pub async fn cache_lookup(&mut self) -> Result<Option<(CacheMeta, HitHandler)>> {
1251 match self.phase {
1252 // Stale is allowed here because stale-> cache_lock -> lookup again
1253 CachePhase::CacheKey | CachePhase::Stale => {
1254 let inner = self
1255 .inner
1256 .as_mut()
1257 .expect("Cache phase is checked and should have inner");
1258 let inner_enabled = inner
1259 .enabled_ctx
1260 .as_mut()
1261 .expect("Cache enabled on cache_lookup");
1262 let mut span = inner_enabled.traces.child("lookup");
1263 let key = inner.key.as_ref().unwrap(); // safe, this phase should have cache key
1264 let now = Instant::now();
1265 let result = inner_enabled.storage.lookup(key, &span.handle()).await?;
1266 // one request may have multiple lookups
1267 self.digest.add_lookup_duration(now.elapsed());
1268 let result = result.and_then(|(meta, header)| {
1269 if let Some(ts) = inner_enabled.valid_after {
1270 if meta.created() < ts {
1271 span.set_tag(|| trace::Tag::new("not valid", true));
1272 return None;
1273 }
1274 }
1275 Some((meta, header))
1276 });
1277 if result.is_none() {
1278 if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
1279 lock_ctx.lock = Some(lock_ctx.cache_lock.lock(key));
1280 }
1281 }
1282 span.set_tag(|| trace::Tag::new("found", result.is_some()));
1283 Ok(result)
1284 }
1285 _ => panic!("wrong phase {:?}", self.phase),
1286 }
1287 }
1288
1289 /// Update variance and see if the meta matches the current variance
1290 ///
1291 /// `cache_lookup() -> compute vary hash -> cache_vary_lookup()`
1292 /// This function allows callers to compute vary based on the initial cache hit.
    /// `meta` should be the one returned from the initial cache_lookup()
    /// - return true if the meta matches the variance.
    /// - return false if the current meta doesn't match the variance; need to cache_lookup() again
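    ///
    /// A rough sketch of that flow (`compute_variance` is an illustrative placeholder for
    /// application-specific vary logic):
    ///
    /// ```ignore
    /// if let Some((meta, hit_handler)) = cache.cache_lookup().await? {
    ///     if let Some(variance) = compute_variance(&meta, &req_header) {
    ///         if cache.cache_vary_lookup(variance, &meta) {
    ///             // the found asset is the right variant
    ///             cache.cache_found(meta, hit_handler, HitStatus::Fresh);
    ///         } else {
    ///             // wrong variant: look up again under the variance-specific key
    ///             let _ = cache.cache_lookup().await?;
    ///         }
    ///     }
    /// }
    /// ```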
1296 pub fn cache_vary_lookup(&mut self, variance: HashBinary, meta: &CacheMeta) -> bool {
1297 match self.phase {
1298 // Stale is allowed here because stale-> cache_lock -> lookup again
1299 CachePhase::CacheKey | CachePhase::Stale => {
1300 let inner = self.inner_mut();
1301 // make sure that all variances found are fresher than this asset
1302 // this is because when purging all the variance, only the primary slot is deleted
1303 // the created TS of the primary is the tombstone of all the variances
1304 inner
1305 .enabled_ctx
1306 .as_mut()
1307 .expect("cache enabled")
1308 .valid_after = Some(meta.created());
1309
1310 // update vary
1311 let key = inner.key.as_mut().unwrap();
1312 // if no variance was previously set, then this is the first cache hit
1313 let is_initial_cache_hit = key.get_variance_key().is_none();
1314 key.set_variance_key(variance);
1315 let variance_binary = key.variance_bin();
1316 let matches_variance = meta.variance() == variance_binary;
1317
1318 // We should remove the variance in the lookup `key` if this is the primary variant
1319 // slot. We know this is the primary variant slot if this is the initial cache hit,
1320 // AND the variance in the `key` already matches the `meta`'s.
1321 //
1322 // For the primary variant slot, the storage backend needs to use the primary key
1323 // for both cache lookup and updating the meta. Otherwise it will look for the
1324 // asset in the wrong location during revalidation.
1325 //
1326 // We can recreate the "full" cache key by using the meta's variance, if needed.
1327 if matches_variance && is_initial_cache_hit {
1328 inner.key.as_mut().unwrap().remove_variance_key();
1329 }
1330
1331 matches_variance
1332 }
1333 _ => panic!("wrong phase {:?}", self.phase),
1334 }
1335 }
1336
1337 /// Whether this request is behind a cache lock in order to wait for another request to read the
1338 /// asset.
1339 pub fn is_cache_locked(&self) -> bool {
1340 matches!(
1341 self.inner_enabled()
1342 .lock_ctx
1343 .as_ref()
1344 .and_then(|l| l.lock.as_ref()),
1345 Some(Locked::Read(_))
1346 )
1347 }
1348
1349 /// Whether this request is the leader request to fetch the assets for itself and other requests
1350 /// behind the cache lock.
1351 pub fn is_cache_lock_writer(&self) -> bool {
1352 matches!(
1353 self.inner_enabled()
1354 .lock_ctx
1355 .as_ref()
1356 .and_then(|l| l.lock.as_ref()),
1357 Some(Locked::Write(_))
1358 )
1359 }
1360
1361 /// Take the write lock from this request to transfer it to another one.
1362 /// # Panic
1363 /// Call is_cache_lock_writer() to check first, will panic otherwise.
1364 pub fn take_write_lock(&mut self) -> (WritePermit, &'static CacheKeyLockImpl) {
1365 let lock_ctx = self
1366 .inner_enabled_mut()
1367 .lock_ctx
1368 .as_mut()
1369 .expect("take_write_lock() called without cache lock");
1370 let lock = lock_ctx
1371 .lock
1372 .take()
1373 .expect("take_write_lock() called without lock");
1374 match lock {
1375 Locked::Write(w) => (w, lock_ctx.cache_lock),
1376 Locked::Read(_) => panic!("take_write_lock() called on read lock"),
1377 }
1378 }
1379
1380 /// Set the write lock, which is usually transferred from [Self::take_write_lock()]
1381 ///
1382 /// # Panic
1383 /// Panics if cache lock was not originally configured for this request.
1384 // TODO: it may make sense to allow configuring the CacheKeyLock here too that the write permit
1385 // is associated with
1386 // (The WritePermit comes from the CacheKeyLock and should be used when releasing from the CacheKeyLock,
1387 // shouldn't be possible to give a WritePermit to a request using a different CacheKeyLock)
1388 pub fn set_write_lock(&mut self, write_lock: WritePermit) {
1389 if let Some(lock_ctx) = self.inner_enabled_mut().lock_ctx.as_mut() {
1390 lock_ctx.lock.replace(Locked::Write(write_lock));
1391 }
1392 }
1393
    /// Whether this request's cache hit is stale
1395 fn has_staled_asset(&self) -> bool {
1396 matches!(self.phase, CachePhase::Stale | CachePhase::StaleUpdating)
1397 }
1398
    /// Whether this asset is stale and serving stale-if-error is allowed
1400 pub fn can_serve_stale_error(&self) -> bool {
1401 self.has_staled_asset() && self.cache_meta().serve_stale_if_error(SystemTime::now())
1402 }
1403
    /// Whether this asset is stale and serving stale-while-revalidate is allowed.
1405 pub fn can_serve_stale_updating(&self) -> bool {
1406 self.has_staled_asset()
1407 && self
1408 .cache_meta()
1409 .serve_stale_while_revalidate(SystemTime::now())
1410 }
1411
1412 /// Wait for the cache read lock to be unlocked
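    ///
    /// A rough retry sketch for lock readers (error handling elided):
    ///
    /// ```ignore
    /// if cache.is_cache_locked() {
    ///     match cache.cache_lock_wait().await {
    ///         LockStatus::Done => {
    ///             // the writer finished; retry cache_lookup(), likely a hit now
    ///         }
    ///         LockStatus::Timeout => {
    ///             // waited too long; fetch from upstream without caching
    ///             cache.disable(NoCacheReason::CacheLockTimeout);
    ///         }
    ///         _ => {
    ///             // transient error, give-up, etc.: retry or proceed without cache
    ///         }
    ///     }
    /// }
    /// ```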
1413 /// # Panic
1414 /// Check [Self::is_cache_locked()], panic if this request doesn't have a read lock.
1415 pub async fn cache_lock_wait(&mut self) -> LockStatus {
1416 let inner_enabled = self.inner_enabled_mut();
1417 let mut span = inner_enabled.traces.child("cache_lock");
1418 // should always call is_cache_locked() before this function, which should guarantee that
1419 // the inner cache has a read lock and lock ctx
1420 if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
1421 let lock = lock_ctx.lock.take(); // remove the lock from self
1422 if let Some(Locked::Read(r)) = lock {
1423 let now = Instant::now();
1424 // it's possible for a request to be locked more than once,
1425 // so wait the remainder of our configured timeout
1426 let status = if let Some(wait_timeout) = lock_ctx.wait_timeout {
1427 let wait_timeout =
1428 wait_timeout.saturating_sub(self.lock_duration().unwrap_or(Duration::ZERO));
1429 match timeout(wait_timeout, r.wait()).await {
1430 Ok(()) => r.lock_status(),
1431 // TODO: need to differentiate WaitTimeout vs. Lock(Age)Timeout (expired)?
1432 Err(_) => LockStatus::Timeout,
1433 }
1434 } else {
1435 r.wait().await;
1436 r.lock_status()
1437 };
1438 self.digest.add_lock_duration(now.elapsed());
1439 let tag_value: &'static str = status.into();
1440 span.set_tag(|| Tag::new("status", tag_value));
1441 status
1442 } else {
1443 panic!("cache_lock_wait on wrong type of lock")
1444 }
1445 } else {
1446 panic!("cache_lock_wait without cache lock")
1447 }
1448 }
1449
    /// How long this request waited behind the read lock
1451 pub fn lock_duration(&self) -> Option<Duration> {
1452 self.digest.lock_duration
1453 }
1454
    /// How long this request spent on cache lookup and reading the header
1456 pub fn lookup_duration(&self) -> Option<Duration> {
1457 self.digest.lookup_duration
1458 }
1459
1460 /// Delete the asset from the cache storage
1461 /// # Panic
1462 /// Need to be called after the cache key is set. Panic otherwise.
1463 pub async fn purge(&self) -> Result<bool> {
1464 match self.phase {
1465 CachePhase::CacheKey => {
1466 let inner = self.inner();
1467 let inner_enabled = self.inner_enabled();
1468 let span = inner_enabled.traces.child("purge");
1469 let key = inner.key.as_ref().unwrap().to_compact();
1470 Self::purge_impl(inner_enabled.storage, inner_enabled.eviction, &key, span).await
1471 }
1472 _ => panic!("wrong phase {:?}", self.phase),
1473 }
1474 }
1475
1476 /// Delete the asset from the cache storage via a spawned task.
1477 /// Returns corresponding `JoinHandle` of that task.
1478 /// # Panic
1479 /// Need to be called after the cache key is set. Panic otherwise.
1480 pub fn spawn_async_purge(
1481 &self,
1482 context: &'static str,
1483 ) -> tokio::task::JoinHandle<Result<bool>> {
1484 if matches!(self.phase, CachePhase::Disabled(_) | CachePhase::Uninit) {
1485 panic!("wrong phase {:?}", self.phase);
1486 }
1487
1488 let inner_enabled = self.inner_enabled();
1489 let span = inner_enabled.traces.child("purge");
1490 let key = self.inner().key.as_ref().unwrap().to_compact();
1491 let storage = inner_enabled.storage;
1492 let eviction = inner_enabled.eviction;
1493 tokio::task::spawn(async move {
1494 Self::purge_impl(storage, eviction, &key, span)
1495 .await
1496 .map_err(|e| {
1497 warn!("Failed to purge {key} (context: {context}): {e}");
1498 e
1499 })
1500 })
1501 }
1502
1503 async fn purge_impl(
1504 storage: &'static (dyn storage::Storage + Sync),
1505 eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
1506 key: &CompactCacheKey,
1507 mut span: Span,
1508 ) -> Result<bool> {
1509 let result = storage
1510 .purge(key, PurgeType::Invalidation, &span.handle())
1511 .await;
1512 let purged = matches!(result, Ok(true));
1513 // need to inform eviction manager if asset was removed
1514 if let Some(eviction) = eviction.as_ref() {
1515 if purged {
1516 eviction.remove(key);
1517 }
1518 }
1519 span.set_tag(|| trace::Tag::new("purged", purged));
1520 result
1521 }
1522
1523 /// Check the cacheable prediction
1524 ///
1525 /// Return true if the predictor is not set
1526 pub fn cacheable_prediction(&self) -> bool {
1527 if let Some(predictor) = self.inner().predictor {
1528 predictor.cacheable_prediction(self.cache_key())
1529 } else {
1530 true
1531 }
1532 }
1533
    /// Tell the predictor that this response, which was previously predicted to be uncacheable,
    /// is now cacheable.
1536 pub fn response_became_cacheable(&self) {
1537 if let Some(predictor) = self.inner().predictor {
1538 predictor.mark_cacheable(self.cache_key());
1539 }
1540 }
1541
1542 /// Tell the predictor that this response is uncacheable so that it will know next time
1543 /// this request arrives.
1544 pub fn response_became_uncacheable(&self, reason: NoCacheReason) {
1545 if let Some(predictor) = self.inner().predictor {
1546 predictor.mark_uncacheable(self.cache_key(), reason);
1547 }
1548 }
1549
1550 /// Tag all spans as being part of a subrequest.
1551 pub fn tag_as_subrequest(&mut self) {
1552 self.inner_enabled_mut()
1553 .traces
1554 .cache_span
1555 .set_tag(|| Tag::new("is_subrequest", true))
1556 }
1557}