pingora_cache/
lib.rs

// Copyright 2025 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! The HTTP caching layer for proxies.

#![allow(clippy::new_without_default)]

use cf_rustracing::tag::Tag;
use http::{method::Method, request::Parts as ReqHeader, response::Parts as RespHeader};
use key::{CacheHashKey, CompactCacheKey, HashBinary};
use lock::WritePermit;
use log::warn;
use pingora_error::Result;
use pingora_http::ResponseHeader;
use pingora_timeout::timeout;
use std::time::{Duration, Instant, SystemTime};
use storage::MissFinishType;
use strum::IntoStaticStr;
use trace::{CacheTraceCTX, Span};

pub mod cache_control;
pub mod eviction;
pub mod filters;
pub mod hashtable;
pub mod key;
pub mod lock;
pub mod max_file_size;
mod memory;
pub mod meta;
pub mod predictor;
pub mod put;
pub mod storage;
pub mod trace;
mod variance;

use crate::max_file_size::MaxFileSizeTracker;
pub use key::CacheKey;
use lock::{CacheKeyLockImpl, LockStatus, Locked};
pub use memory::MemCache;
pub use meta::{set_compression_dict_content, set_compression_dict_path};
pub use meta::{CacheMeta, CacheMetaDefaults};
pub use storage::{HitHandler, MissHandler, PurgeType, Storage};
pub use variance::VarianceBuilder;

pub mod prelude {}

/// The state machine for http caching
///
/// This object is used to handle the state and transitions for HTTP caching through the life of a
/// request.
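///
/// A rough lifecycle sketch (illustrative only, not a doc test; `MY_STORAGE` stands in for a
/// caller-provided `'static` [Storage] implementation and the request wiring is elided):
///
/// ```ignore
/// let mut cache = HttpCache::new();
/// cache.enable(&MY_STORAGE, None, None, None, None);
/// cache.set_cache_key(CacheKey::default(req_header));
/// match cache.cache_lookup().await? {
///     // freshness checking elided, assume the hit is fresh
///     Some((meta, hit_handler)) => cache.cache_found(meta, hit_handler, HitStatus::Fresh),
///     None => cache.cache_miss(),
/// }
/// ```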
pub struct HttpCache {
    phase: CachePhase,
    // Box the rest so that a disabled HttpCache struct is small
    inner: Option<Box<HttpCacheInner>>,
    digest: HttpCacheDigest,
}

/// This reflects the phase of HttpCache during the lifetime of a request
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CachePhase {
    /// Cache disabled, with reason (NeverEnabled if never explicitly used)
    Disabled(NoCacheReason),
    /// Cache enabled but nothing is set yet
    Uninit,
    /// Cache was enabled, the request decided not to use it
    // HttpCache.inner_enabled is kept
    Bypass,
    /// Awaiting the cache key to be generated
    CacheKey,
    /// Cache hit
    Hit,
    /// No cached asset is found
    Miss,
    /// A stale (expired) asset is found
    Stale,
    /// A stale (expired) asset was found, but another request is revalidating it
    StaleUpdating,
    /// A stale (expired) asset was found, so a fresh one was fetched
    Expired,
    /// A stale (expired) asset was found, and it was revalidated to be fresh
    Revalidated,
    /// Revalidated, but deemed uncacheable, so we do not freshen it
    RevalidatedNoCache(NoCacheReason),
}
96
97impl CachePhase {
98    /// Convert [CachePhase] as `str`, for logging and debugging.
99    pub fn as_str(&self) -> &'static str {
100        match self {
101            CachePhase::Disabled(_) => "disabled",
102            CachePhase::Uninit => "uninitialized",
103            CachePhase::Bypass => "bypass",
104            CachePhase::CacheKey => "key",
105            CachePhase::Hit => "hit",
106            CachePhase::Miss => "miss",
107            CachePhase::Stale => "stale",
108            CachePhase::StaleUpdating => "stale-updating",
109            CachePhase::Expired => "expired",
110            CachePhase::Revalidated => "revalidated",
111            CachePhase::RevalidatedNoCache(_) => "revalidated-nocache",
112        }
113    }
114}
115
116/// The possible reasons for not caching
117#[derive(Copy, Clone, Debug, PartialEq, Eq)]
118pub enum NoCacheReason {
119    /// Caching is not enabled to begin with
120    NeverEnabled,
121    /// Origin directives indicated this was not cacheable
122    OriginNotCache,
123    /// Response size was larger than the cache's configured maximum asset size
124    ResponseTooLarge,
125    /// Disabling caching due to unknown body size and previously exceeding maximum asset size;
126    /// the asset is otherwise cacheable, but cache needs to confirm the final size of the asset
127    /// before it can mark it as cacheable again.
128    PredictedResponseTooLarge,
129    /// Due to internal caching storage error
130    StorageError,
131    /// Due to other types of internal issues
132    InternalError,
133    /// will be cacheable but skip cache admission now
134    ///
135    /// This happens when the cache predictor predicted that this request is not cacheable, but
136    /// the response turns out to be OK to cache. However, it might be too large to re-enable caching
137    /// for this request
138    Deferred,
139    /// Due to the proxy upstream filter declining the current request from going upstream
140    DeclinedToUpstream,
141    /// Due to the upstream being unreachable or otherwise erroring during proxying
142    UpstreamError,
143    /// The writer of the cache lock sees that the request is not cacheable (Could be OriginNotCache)
144    CacheLockGiveUp,
145    /// This request waited too long for the writer of the cache lock to finish, so this request will
146    /// fetch from the origin without caching
147    CacheLockTimeout,
148    /// Other custom defined reasons
149    Custom(&'static str),
150}
151
152impl NoCacheReason {
153    /// Convert [NoCacheReason] as `str`, for logging and debugging.
154    pub fn as_str(&self) -> &'static str {
155        use NoCacheReason::*;
156        match self {
157            NeverEnabled => "NeverEnabled",
158            OriginNotCache => "OriginNotCache",
159            ResponseTooLarge => "ResponseTooLarge",
160            PredictedResponseTooLarge => "PredictedResponseTooLarge",
161            StorageError => "StorageError",
162            InternalError => "InternalError",
163            Deferred => "Deferred",
164            DeclinedToUpstream => "DeclinedToUpstream",
165            UpstreamError => "UpstreamError",
166            CacheLockGiveUp => "CacheLockGiveUp",
167            CacheLockTimeout => "CacheLockTimeout",
168            Custom(s) => s,
169        }
170    }
171}
172
173/// Information collected about the caching operation that will not be cleared
174#[derive(Debug, Default)]
175pub struct HttpCacheDigest {
176    pub lock_duration: Option<Duration>,
177    // time spent in cache lookup and reading the header
178    pub lookup_duration: Option<Duration>,
179}
180
181/// Convenience function to add a duration to an optional duration
182fn add_duration_to_opt(target_opt: &mut Option<Duration>, to_add: Duration) {
183    *target_opt = Some(target_opt.map_or(to_add, |existing| existing + to_add));
184}
185
186impl HttpCacheDigest {
187    fn add_lookup_duration(&mut self, extra_lookup_duration: Duration) {
188        add_duration_to_opt(&mut self.lookup_duration, extra_lookup_duration)
189    }
190
191    fn add_lock_duration(&mut self, extra_lock_duration: Duration) {
192        add_duration_to_opt(&mut self.lock_duration, extra_lock_duration)
193    }
194}
195
196/// Response cacheable decision
197///
198///
199#[derive(Debug)]
200pub enum RespCacheable {
201    Cacheable(CacheMeta),
202    Uncacheable(NoCacheReason),
203}
204
205impl RespCacheable {
206    /// Whether it is cacheable
207    #[inline]
208    pub fn is_cacheable(&self) -> bool {
209        matches!(*self, Self::Cacheable(_))
210    }
211
212    /// Unwrap [RespCacheable] to get the [CacheMeta] stored
213    /// # Panic
214    /// Panic when this object is not cacheable. Check [Self::is_cacheable()] first.
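    ///
    /// A minimal sketch of the intended pattern (the `resp_cacheable` binding is hypothetical):
    ///
    /// ```ignore
    /// if resp_cacheable.is_cacheable() {
    ///     let meta = resp_cacheable.unwrap_meta();
    ///     // proceed with cache admission using `meta`
    /// }
    /// ```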
    pub fn unwrap_meta(self) -> CacheMeta {
        match self {
            Self::Cacheable(meta) => meta,
            Self::Uncacheable(_) => panic!("expected Cacheable value"),
        }
    }
}

/// Indicators of which level of purge logic to apply to an asset, i.e. whether
/// the purged asset should be revalidated or re-fetched altogether
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ForcedInvalidationKind {
    /// Indicates the asset should be considered stale and revalidated
    ForceExpired,

    /// Indicates the asset should be considered absent and treated like a miss
    /// instead of a hit
    ForceMiss,
}

/// Freshness state of a cache hit asset
#[derive(Debug, Copy, Clone, IntoStaticStr, PartialEq, Eq)]
#[strum(serialize_all = "snake_case")]
pub enum HitStatus {
    /// The asset's freshness directives indicate it has expired
    Expired,

    /// The asset was marked as expired, and should be treated as stale
    ForceExpired,

    /// The asset was marked as absent, and should be treated as a miss
    ForceMiss,

    /// An error occurred while processing the asset, so it should be treated as
    /// a miss
    FailedHitFilter,

    /// The asset is not expired
    Fresh,
}

impl HitStatus {
    /// For displaying cache hit status
    pub fn as_str(&self) -> &'static str {
        self.into()
    }

    /// Whether cached asset can be served as fresh
    pub fn is_fresh(&self) -> bool {
        *self == HitStatus::Fresh
    }

    /// Check whether the hit status should be treated as a miss. A forced miss
    /// is obviously treated as a miss. A hit-filter failure is treated as a
    /// miss because we can't use the asset as an actual hit. If we treat it as
    /// expired, we still might not be able to use it even if revalidation
    /// succeeds.
    pub fn is_treated_as_miss(self) -> bool {
        matches!(self, HitStatus::ForceMiss | HitStatus::FailedHitFilter)
    }
}

pub struct LockCtx {
    pub lock: Option<Locked>,
    pub cache_lock: &'static CacheKeyLockImpl,
    pub wait_timeout: Option<Duration>,
}

// Fields like storage handlers that are needed only when cache is enabled (or bypassing).
struct HttpCacheInnerEnabled {
    pub meta: Option<CacheMeta>,
    // when set, even if an asset exists, it would only be considered valid after this timestamp
    pub valid_after: Option<SystemTime>,
    pub miss_handler: Option<MissHandler>,
    pub body_reader: Option<HitHandler>,
    pub storage: &'static (dyn storage::Storage + Sync), // static for now
    pub eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
    pub lock_ctx: Option<LockCtx>,
    pub traces: trace::CacheTraceCTX,
}

struct HttpCacheInner {
    // Prefer adding fields to InnerEnabled if possible; these fields are released
    // when cache is disabled.
    // If fields are needed after cache disablement, add directly to Inner.
    pub enabled_ctx: Option<Box<HttpCacheInnerEnabled>>,
    pub key: Option<CacheKey>,
    // when set, an asset will be rejected from the cache if it exceeds configured size in bytes
    pub max_file_size_tracker: Option<MaxFileSizeTracker>,
    pub predictor: Option<&'static (dyn predictor::CacheablePredictor + Sync)>,
}

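/// Per-request overrides of caching behavior (currently just the cache lock `wait_timeout`).
///
/// A construction sketch (illustrative; `MY_STORAGE` and `CACHE_LOCK` are assumed `'static`
/// values provided by the caller, and the timeout value is arbitrary):
///
/// ```ignore
/// let mut overrides = CacheOptionOverrides::default();
/// overrides.wait_timeout = Some(Duration::from_secs(5));
/// cache.enable(&MY_STORAGE, None, None, Some(&CACHE_LOCK), Some(overrides));
/// ```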
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct CacheOptionOverrides {
    pub wait_timeout: Option<Duration>,
}

impl HttpCache {
    /// Create a new [HttpCache].
    ///
    /// Caching is not enabled by default.
    pub fn new() -> Self {
        HttpCache {
            phase: CachePhase::Disabled(NoCacheReason::NeverEnabled),
            inner: None,
            digest: HttpCacheDigest::default(),
        }
    }

    /// Whether the cache is enabled
    pub fn enabled(&self) -> bool {
        !matches!(self.phase, CachePhase::Disabled(_) | CachePhase::Bypass)
    }

    /// Whether the cache is being bypassed
    pub fn bypassing(&self) -> bool {
        matches!(self.phase, CachePhase::Bypass)
    }

    /// Return the [CachePhase]
    pub fn phase(&self) -> CachePhase {
        self.phase
    }

    /// Whether anything was fetched from the upstream
    ///
    /// This essentially checks all possible [CachePhase] variants that need to contact the upstream server
    pub fn upstream_used(&self) -> bool {
        use CachePhase::*;
        match self.phase {
            Disabled(_) | Bypass | Miss | Expired | Revalidated | RevalidatedNoCache(_) => true,
            Hit | Stale | StaleUpdating => false,
            Uninit | CacheKey => false, // invalid states for this call, treat them as false to keep it simple
        }
    }

    /// Check whether the backend storage is the type `T`.
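    ///
    /// For example, to check for the built-in memory backend (assuming the cache was enabled
    /// with a [MemCache] instance):
    ///
    /// ```ignore
    /// if cache.storage_type_is::<MemCache>() {
    ///     // memory-backed storage specific handling
    /// }
    /// ```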
    pub fn storage_type_is<T: 'static>(&self) -> bool {
        self.inner
            .as_ref()
            .and_then(|inner| {
                inner
                    .enabled_ctx
                    .as_ref()
                    .and_then(|ie| ie.storage.as_any().downcast_ref::<T>())
            })
            .is_some()
    }

    /// Release the cache lock if the current request is a cache writer.
    ///
    /// Generally callers should prefer using `disable` when a cache lock should be released
    /// due to an error, in order to clear all cache context. This function is for releasing the cache lock
    /// while still keeping the cache around for reading, e.g. when serving stale.
    pub fn release_write_lock(&mut self, reason: NoCacheReason) {
        use NoCacheReason::*;
        if let Some(inner) = self.inner.as_mut() {
            if let Some(lock_ctx) = inner
                .enabled_ctx
                .as_mut()
                .and_then(|ie| ie.lock_ctx.as_mut())
            {
                let lock = lock_ctx.lock.take();
                if let Some(Locked::Write(permit)) = lock {
                    let lock_status = match reason {
                        // let the next request try to fetch it
                        InternalError | StorageError | Deferred | UpstreamError => {
                            LockStatus::TransientError
                        }
                        // depends on why the proxy upstream filter declined the request,
                        // for now still allow the next request to try to acquire it, to avoid a thundering herd
                        DeclinedToUpstream => LockStatus::TransientError,
                        // no need for the lock anymore
                        OriginNotCache | ResponseTooLarge | PredictedResponseTooLarge => {
                            LockStatus::GiveUp
                        }
                        // not sure which LockStatus makes sense, we treat it as GiveUp for now
                        Custom(_) => LockStatus::GiveUp,
                        // should never happen, NeverEnabled shouldn't hold a lock
                        NeverEnabled => panic!("NeverEnabled holds a write lock"),
                        CacheLockGiveUp | CacheLockTimeout => {
                            panic!("CacheLock* are for cache lock readers only")
                        }
                    };
                    lock_ctx
                        .cache_lock
                        .release(inner.key.as_ref().unwrap(), permit, lock_status);
                }
            }
        }
    }

    /// Disable caching
    pub fn disable(&mut self, reason: NoCacheReason) {
        // XXX: enforce at compile time?
        assert!(
            reason != NoCacheReason::NeverEnabled,
            "NeverEnabled not allowed as a disable reason"
        );
        match self.phase {
            CachePhase::Disabled(old_reason) => {
                // replace reason
                if old_reason == NoCacheReason::NeverEnabled {
                    // safeguard, don't allow replacing NeverEnabled as a reason
                    // TODO: can be promoted to assertion once confirmed nothing is attempting this
                    warn!("Tried to replace cache NeverEnabled with reason: {reason:?}");
                    return;
                }
                self.phase = CachePhase::Disabled(reason);
            }
            _ => {
                self.phase = CachePhase::Disabled(reason);
                self.release_write_lock(reason);
                // enabled_ctx will be cleared out
                let mut inner_enabled = self
                    .inner_mut()
                    .enabled_ctx
                    .take()
                    .expect("could remove enabled_ctx on disable");
                // log initial disable reason
                inner_enabled
                    .traces
                    .cache_span
                    .set_tag(|| trace::Tag::new("disable_reason", reason.as_str()));
            }
        }
    }

    /* The following methods panic when they are used in the wrong phase.
     * This is better than returning errors as such panics are only caused by coding error, which
     * should be fixed right away. Tokio runtime only crashes the current task instead of the whole
     * program when these panics happen. */

    /// Set the cache to bypass
    ///
    /// # Panic
    /// This call is only allowed in [CachePhase::CacheKey] phase (before any cache lookup is performed).
    /// Using it in any other phase will lead to a panic.
    pub fn bypass(&mut self) {
        match self.phase {
            CachePhase::CacheKey => {
                // before cache lookup / found / miss
                self.phase = CachePhase::Bypass;
                self.inner_enabled_mut()
                    .traces
                    .cache_span
                    .set_tag(|| trace::Tag::new("bypassed", true));
            }
            _ => panic!("wrong phase to bypass HttpCache {:?}", self.phase),
        }
    }

    /// Enable the cache
    ///
    /// - `storage`: the cache storage backend that implements [storage::Storage]
    /// - `eviction`: optionally the eviction manager; without it, nothing will be evicted from the storage
    /// - `predictor`: optionally a cache predictor. The cache predictor predicts whether something is likely
    ///   to be cacheable or not. This is useful because the proxy can apply different types of optimization to
    ///   cacheable and uncacheable requests.
    /// - `cache_lock`: optionally a cache lock which handles concurrent lookups to the same asset. Without it
    ///   such lookups will all be allowed to fetch the asset independently.
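    ///
    /// A minimal call sketch (illustrative; `MY_STORAGE` is assumed to be a caller-provided
    /// `'static` [storage::Storage] implementation):
    ///
    /// ```ignore
    /// cache.enable(&MY_STORAGE, None, None, None, None);
    /// assert!(cache.enabled());
    /// ```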
    pub fn enable(
        &mut self,
        storage: &'static (dyn storage::Storage + Sync),
        eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
        predictor: Option<&'static (dyn predictor::CacheablePredictor + Sync)>,
        cache_lock: Option<&'static CacheKeyLockImpl>,
        option_overrides: Option<CacheOptionOverrides>,
    ) {
        match self.phase {
            CachePhase::Disabled(_) => {
                self.phase = CachePhase::Uninit;

                let lock_ctx = cache_lock.map(|cache_lock| LockCtx {
                    cache_lock,
                    lock: None,
                    wait_timeout: option_overrides
                        .as_ref()
                        .and_then(|overrides| overrides.wait_timeout),
                });

                self.inner = Some(Box::new(HttpCacheInner {
                    enabled_ctx: Some(Box::new(HttpCacheInnerEnabled {
                        meta: None,
                        valid_after: None,
                        miss_handler: None,
                        body_reader: None,
                        storage,
                        eviction,
                        lock_ctx,
                        traces: CacheTraceCTX::new(),
                    })),
                    key: None,
                    max_file_size_tracker: None,
                    predictor,
                }));
            }
            _ => panic!("Cannot enable already enabled HttpCache {:?}", self.phase),
        }
    }

    /// Set the cache lock implementation.
    /// # Panic
    /// Must be called before any attempt to acquire the cache lock,
    /// i.e. in the `cache_key_callback` or `cache_hit_filter` phases.
    pub fn set_cache_lock(
        &mut self,
        cache_lock: Option<&'static CacheKeyLockImpl>,
        option_overrides: Option<CacheOptionOverrides>,
    ) {
        match self.phase {
            CachePhase::Disabled(_)
            | CachePhase::CacheKey
            | CachePhase::Stale
            | CachePhase::Hit => {
                let inner_enabled = self.inner_enabled_mut();
                if inner_enabled
                    .lock_ctx
                    .as_ref()
                    .is_some_and(|ctx| ctx.lock.is_some())
                {
                    panic!("lock already set when resetting cache lock")
                } else {
                    let lock_ctx = cache_lock.map(|cache_lock| LockCtx {
                        cache_lock,
                        lock: None,
                        wait_timeout: option_overrides.and_then(|overrides| overrides.wait_timeout),
                    });
                    inner_enabled.lock_ctx = lock_ctx;
                }
            }
            _ => panic!("wrong phase: {:?}", self.phase),
        }
    }

    // Enable distributed tracing
    pub fn enable_tracing(&mut self, parent_span: trace::Span) {
        if let Some(inner_enabled) = self.inner.as_mut().and_then(|i| i.enabled_ctx.as_mut()) {
            inner_enabled.traces.enable(parent_span);
        }
    }

    // Get the cache parent tracing span
    pub fn get_cache_span(&self) -> Option<trace::SpanHandle> {
        self.inner
            .as_ref()
            .and_then(|i| i.enabled_ctx.as_ref().map(|ie| ie.traces.get_cache_span()))
    }

    // Get the cache `miss` tracing span
    pub fn get_miss_span(&self) -> Option<trace::SpanHandle> {
        self.inner
            .as_ref()
            .and_then(|i| i.enabled_ctx.as_ref().map(|ie| ie.traces.get_miss_span()))
    }

    // Get the cache `hit` tracing span
    pub fn get_hit_span(&self) -> Option<trace::SpanHandle> {
        self.inner
            .as_ref()
            .and_then(|i| i.enabled_ctx.as_ref().map(|ie| ie.traces.get_hit_span()))
    }

    // shortcut to access inner fields, panic if phase is disabled
    #[inline]
    fn inner_enabled_mut(&mut self) -> &mut HttpCacheInnerEnabled {
        self.inner.as_mut().unwrap().enabled_ctx.as_mut().unwrap()
    }

    #[inline]
    fn inner_enabled(&self) -> &HttpCacheInnerEnabled {
        self.inner.as_ref().unwrap().enabled_ctx.as_ref().unwrap()
    }

    // shortcut to access inner fields, panic if cache was never enabled
    #[inline]
    fn inner_mut(&mut self) -> &mut HttpCacheInner {
        self.inner.as_mut().unwrap()
    }

    #[inline]
    fn inner(&self) -> &HttpCacheInner {
        self.inner.as_ref().unwrap()
    }

    /// Set the cache key
    /// # Panic
    /// The cache key is only allowed to be set in its own phase. Setting it in other phases will cause a panic.
    pub fn set_cache_key(&mut self, key: CacheKey) {
        match self.phase {
            CachePhase::Uninit | CachePhase::CacheKey => {
                self.phase = CachePhase::CacheKey;
                self.inner_mut().key = Some(key);
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Return the cache key used for asset lookup
    /// # Panic
    /// Can only be called after the cache key is set and the cache is not disabled. Panics otherwise.
    pub fn cache_key(&self) -> &CacheKey {
        match self.phase {
            CachePhase::Disabled(NoCacheReason::NeverEnabled) | CachePhase::Uninit => {
                panic!("wrong phase {:?}", self.phase)
            }
            _ => self
                .inner()
                .key
                .as_ref()
                .expect("cache key should be set (set_cache_key not called?)"),
        }
    }

    /// Return the max size allowed to be cached.
    pub fn max_file_size_bytes(&self) -> Option<usize> {
        assert!(
            !matches!(
                self.phase,
                CachePhase::Disabled(NoCacheReason::NeverEnabled)
            ),
            "tried to access max file size bytes when cache never enabled"
        );
        self.inner()
            .max_file_size_tracker
            .as_ref()
            .map(|t| t.max_file_size_bytes())
    }

    /// Set the maximum response _body_ size in bytes that will be admitted to the cache.
    ///
    /// Response header size should not contribute to the max file size.
    ///
    /// To track body bytes, call `track_body_bytes_for_max_file_size`.
    pub fn set_max_file_size_bytes(&mut self, max_file_size_bytes: usize) {
        match self.phase {
            CachePhase::Disabled(_) => panic!("wrong phase {:?}", self.phase),
            _ => {
                self.inner_mut().max_file_size_tracker =
                    Some(MaxFileSizeTracker::new(max_file_size_bytes));
            }
        }
    }

    /// Record body bytes for the max file size tracker.
    ///
    /// The `bytes_len` input contributes to a cumulative body byte tracker.
    ///
    /// Once the cumulative body bytes exceed the maximum allowable cache file size (as configured
    /// by `set_max_file_size_bytes`), this returns false.
    ///
    /// Otherwise it returns true as long as the max file size is not exceeded.
    /// If no max file size was configured, it always returns true.
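    ///
    /// A sketch of the intended usage while streaming a response body (illustrative; the
    /// `chunk` variable is hypothetical and the size cap is arbitrary):
    ///
    /// ```ignore
    /// cache.set_max_file_size_bytes(1024 * 1024); // 1 MB cap
    /// if !cache.track_body_bytes_for_max_file_size(chunk.len()) {
    ///     cache.disable(NoCacheReason::ResponseTooLarge);
    /// }
    /// ```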
    pub fn track_body_bytes_for_max_file_size(&mut self, bytes_len: usize) -> bool {
        // This is intended to be callable when cache has already been disabled,
        // so that we can re-mark an asset as cacheable if the body size is under limits.
        assert!(
            !matches!(
                self.phase,
                CachePhase::Disabled(NoCacheReason::NeverEnabled)
            ),
            "tried to access max file size bytes when cache never enabled"
        );
        self.inner_mut()
            .max_file_size_tracker
            .as_mut()
            .map_or(true, |t| t.add_body_bytes(bytes_len))
    }

    /// Check if the max file size has been exceeded according to max file size tracker.
    ///
    /// Return true if max file size was exceeded.
    pub fn exceeded_max_file_size(&self) -> bool {
        assert!(
            !matches!(
                self.phase,
                CachePhase::Disabled(NoCacheReason::NeverEnabled)
            ),
            "tried to access max file size bytes when cache never enabled"
        );
        self.inner()
            .max_file_size_tracker
            .as_ref()
            .is_some_and(|t| !t.allow_caching())
    }

    /// Mark that the asset was found in cache storage.
    ///
    /// This function is called after [Self::cache_lookup()] which returns the [CacheMeta] and
    /// [HitHandler].
    ///
    /// The `hit_status` enum allows the caller to force expire assets.
    pub fn cache_found(&mut self, meta: CacheMeta, hit_handler: HitHandler, hit_status: HitStatus) {
        // Stale allowed because of cache lock and then retry
        if !matches!(self.phase, CachePhase::CacheKey | CachePhase::Stale) {
            panic!("wrong phase {:?}", self.phase)
        }

        self.phase = match hit_status {
            HitStatus::Fresh => CachePhase::Hit,
            HitStatus::Expired | HitStatus::ForceExpired => CachePhase::Stale,
            HitStatus::FailedHitFilter | HitStatus::ForceMiss => self.phase,
        };

        let phase = self.phase;
        let inner = self.inner_mut();

        let key = inner.key.as_ref().expect("key must be set on hit");
        let inner_enabled = inner
            .enabled_ctx
            .as_mut()
            .expect("cache_found must be called while cache enabled");

        // The cache lock might not be set for stale hit or hits treated as
        // misses, so we need to initialize it here
        if phase == CachePhase::Stale || hit_status.is_treated_as_miss() {
            if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
                lock_ctx.lock = Some(lock_ctx.cache_lock.lock(key));
            }
        }

        if hit_status.is_treated_as_miss() {
            // Clear the body and meta for hits that are treated as misses
            inner_enabled.body_reader = None;
            inner_enabled.meta = None;
        } else {
            // Set the metadata appropriately for legit hits
            inner_enabled.traces.start_hit_span(phase, hit_status);
            inner_enabled.traces.log_meta_in_hit_span(&meta);
            if let Some(eviction) = inner_enabled.eviction {
                // TODO: make access() accept CacheKey
                let cache_key = key.to_compact();
                if hit_handler.should_count_access() {
                    let size = hit_handler.get_eviction_weight();
                    eviction.access(&cache_key, size, meta.0.internal.fresh_until);
                }
            }
            inner_enabled.meta = Some(meta);
            inner_enabled.body_reader = Some(hit_handler);
        }
    }

    /// Mark `self` as a cache miss.
    ///
    /// This function is called after [Self::cache_lookup()] finds nothing or the caller decides
    /// not to use the assets found.
    /// # Panic
    /// Panics in other phases.
    pub fn cache_miss(&mut self) {
        match self.phase {
            // from CacheKey: set state to miss during cache lookup
            // from Bypass: response became cacheable, set state to miss to cache
            // from Stale: waited for cache lock, then retried and found asset was gone
            CachePhase::CacheKey | CachePhase::Bypass | CachePhase::Stale => {
                self.phase = CachePhase::Miss;
                // It's possible that we've set the meta on lookup and have come back around
                // here after not being able to acquire the cache lock, and our item has since
                // been purged or expired. We should be sure that the meta is not set in this case
                // as there shouldn't be a meta set for cache misses.
                self.inner_enabled_mut().meta = None;
                self.inner_enabled_mut().traces.start_miss_span();
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Return the [HitHandler]
    /// # Panic
    /// Call this after [Self::cache_found()]; panics in other phases.
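    ///
    /// A sketch of reading a hit body (illustrative; error handling and the downstream write
    /// are elided, and `read_body` is assumed to be the [HitHandler]'s body-read method):
    ///
    /// ```ignore
    /// while let Some(chunk) = cache.hit_handler().read_body().await? {
    ///     // send `chunk` downstream
    /// }
    /// cache.finish_hit_handler().await?;
    /// ```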
    pub fn hit_handler(&mut self) -> &mut HitHandler {
        match self.phase {
            CachePhase::Hit
            | CachePhase::Stale
            | CachePhase::StaleUpdating
            | CachePhase::Revalidated
            | CachePhase::RevalidatedNoCache(_) => {
                self.inner_enabled_mut().body_reader.as_mut().unwrap()
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Return the body reader during a cache admission (miss/expired) which decouples the downstream
    /// read and upstream cache write
    pub fn miss_body_reader(&mut self) -> Option<&mut HitHandler> {
        match self.phase {
            CachePhase::Miss | CachePhase::Expired => {
                let inner_enabled = self.inner_enabled_mut();
                if inner_enabled.storage.support_streaming_partial_write() {
                    inner_enabled.body_reader.as_mut()
                } else {
                    // body_reader could be set even when the storage doesn't support streaming
                    // Expired cache would have the reader set.
                    None
                }
            }
            _ => None,
        }
    }

    /// Call this when the cache hit is fully read.
    ///
    /// This call will release resources if any and log the timing in tracing if set.
    /// # Panic
    /// Panics in phases where there is no cache hit.
    pub async fn finish_hit_handler(&mut self) -> Result<()> {
        match self.phase {
            CachePhase::Hit
            | CachePhase::Miss
            | CachePhase::Expired
            | CachePhase::Stale
            | CachePhase::StaleUpdating
            | CachePhase::Revalidated
            | CachePhase::RevalidatedNoCache(_) => {
                let inner = self.inner_mut();
                let inner_enabled = inner.enabled_ctx.as_mut().expect("cache enabled");
                if inner_enabled.body_reader.is_none() {
                    // already finished, we allow calling this function more than once
                    return Ok(());
                }
                let body_reader = inner_enabled.body_reader.take().unwrap();
                let key = inner.key.as_ref().unwrap();
                let result = body_reader
                    .finish(
                        inner_enabled.storage,
                        key,
                        &inner_enabled.traces.hit_span.handle(),
                    )
                    .await;
                inner_enabled.traces.finish_hit_span();
                result
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Set the [MissHandler] according to the cache key and meta; can only be called once
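    ///
    /// A sketch of the miss write path (illustrative; the body chunk loop, the `chunk`/`eof`
    /// values, and error handling are elided):
    ///
    /// ```ignore
    /// cache.set_cache_meta(meta);
    /// cache.set_miss_handler().await?;
    /// if let Some(handler) = cache.miss_handler() {
    ///     handler.write_body(chunk, eof).await?;
    /// }
    /// cache.finish_miss_handler().await?;
    /// ```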
    pub async fn set_miss_handler(&mut self) -> Result<()> {
        match self.phase {
            // set_miss_handler() needs to be called after set_cache_meta() (which changes Stale to Expired).
            // This is an artificial rule to enforce the state transitions
            CachePhase::Miss | CachePhase::Expired => {
                let inner = self.inner_mut();
                let inner_enabled = inner
                    .enabled_ctx
                    .as_mut()
                    .expect("cache enabled on miss and expired");
                if inner_enabled.miss_handler.is_some() {
                    panic!("write handler is already set")
                }
                let meta = inner_enabled.meta.as_ref().unwrap();
                let key = inner.key.as_ref().unwrap();
                let miss_handler = inner_enabled
                    .storage
                    .get_miss_handler(key, meta, &inner_enabled.traces.get_miss_span())
                    .await?;

                inner_enabled.miss_handler = Some(miss_handler);

                if inner_enabled.storage.support_streaming_partial_write() {
                    // If a reader can access partial write, the cache lock can be released here
                    // to let readers start reading the body.
                    if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
                        let lock = lock_ctx.lock.take();
                        if let Some(Locked::Write(permit)) = lock {
                            lock_ctx.cache_lock.release(key, permit, LockStatus::Done);
                        }
                    }
                    // Downstream read and upstream write can be decoupled
                    let body_reader = inner_enabled
                        .storage
                        .lookup_streaming_write(
                            key,
                            inner_enabled
                                .miss_handler
                                .as_ref()
                                .expect("miss handler already set")
                                .streaming_write_tag(),
                            &inner_enabled.traces.get_miss_span(),
                        )
                        .await?;

                    if let Some((_meta, body_reader)) = body_reader {
                        inner_enabled.body_reader = Some(body_reader);
                    } else {
                        // body_reader should exist now because streaming partial write support implies it
                        panic!("unable to get body_reader for {:?}", meta);
                    }
                }
                Ok(())
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Return the [MissHandler] to write the response body to cache.
    ///
    /// `None`: the handler has not been set or has already finished
    pub fn miss_handler(&mut self) -> Option<&mut MissHandler> {
        match self.phase {
            CachePhase::Miss | CachePhase::Expired => {
                self.inner_enabled_mut().miss_handler.as_mut()
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Finish cache admission
    ///
    /// If [self] is dropped without calling this, the cache admission is considered incomplete and
    /// should be cleaned up.
    ///
    /// This call will also trigger eviction if set.
    pub async fn finish_miss_handler(&mut self) -> Result<()> {
        match self.phase {
            CachePhase::Miss | CachePhase::Expired => {
                let inner = self.inner_mut();
                let inner_enabled = inner
                    .enabled_ctx
                    .as_mut()
                    .expect("cache enabled on miss and expired");
                if inner_enabled.miss_handler.is_none() {
                    // already finished, we allow calling this function more than once
                    return Ok(());
                }
                let miss_handler = inner_enabled.miss_handler.take().unwrap();
                let size = miss_handler.finish().await?;
                let key = inner
                    .key
                    .as_ref()
                    .expect("key set by miss or expired phase");
                if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
                    let lock = lock_ctx.lock.take();
                    if let Some(Locked::Write(permit)) = lock {
                        // no need to call r.unlock() because release() will call it
                        // r is a guard to make sure the lock is unlocked when this request is dropped
                        lock_ctx.cache_lock.release(key, permit, LockStatus::Done);
                    }
                }
                if let Some(eviction) = inner_enabled.eviction {
                    let cache_key = key.to_compact();
                    let meta = inner_enabled.meta.as_ref().unwrap();
                    let evicted = match size {
                        MissFinishType::Created(size) => {
                            eviction.admit(cache_key, size, meta.0.internal.fresh_until)
                        }
                        MissFinishType::Appended(size) => {
                            eviction.increment_weight(cache_key, size)
                        }
                    };
                    // actual eviction can be done async
                    let span = inner_enabled.traces.child("eviction");
                    let handle = span.handle();
                    let storage = inner_enabled.storage;
                    tokio::task::spawn(async move {
                        for item in evicted {
                            if let Err(e) = storage.purge(&item, PurgeType::Eviction, &handle).await
                            {
                                warn!("Failed to purge {item} during eviction for finish miss handler: {e}");
                            }
                        }
                    });
                }
                inner_enabled.traces.finish_miss_span();
                Ok(())
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Set the [CacheMeta] of the cache
    pub fn set_cache_meta(&mut self, meta: CacheMeta) {
        match self.phase {
            // TODO: store the stale meta somewhere else for future use?
            CachePhase::Stale | CachePhase::Miss => {
                let inner_enabled = self.inner_enabled_mut();
                // TODO: have a separate expired span?
                inner_enabled.traces.log_meta_in_miss_span(&meta);
                inner_enabled.meta = Some(meta);
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
        if self.phase == CachePhase::Stale {
            self.phase = CachePhase::Expired;
        }
    }

    /// Set the [CacheMeta] of the cache after revalidation.
    ///
    /// Certain info such as the original cache admission time will be preserved. Others will
    /// be replaced by the input `meta`.
    pub async fn revalidate_cache_meta(&mut self, mut meta: CacheMeta) -> Result<bool> {
        let result = match self.phase {
            CachePhase::Stale => {
                let inner = self.inner_mut();
                let inner_enabled = inner
                    .enabled_ctx
                    .as_mut()
                    .expect("stale phase has cache enabled");
                // TODO: we should keep old meta in place, just use new one to update it
                // that requires cacheable_filter to take a mut header and just return InternalMeta

                // update new meta with old meta's created time
                let old_meta = inner_enabled.meta.take().unwrap();
                let created = old_meta.0.internal.created;
                meta.0.internal.created = created;
                // meta.internal.updated was already set to new meta's `created`,
                // no need to set `updated` here
                // Merge old extensions with new ones. New exts take precedence if they conflict.
                let mut extensions = old_meta.0.extensions;
                extensions.extend(meta.0.extensions);
                meta.0.extensions = extensions;

                inner_enabled.meta.replace(meta);

                if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
                    let lock = lock_ctx.lock.take();
                    if let Some(Locked::Write(permit)) = lock {
                        lock_ctx.cache_lock.release(
                            inner.key.as_ref().expect("key set by stale phase"),
                            permit,
                            LockStatus::Done,
                        );
                    }
                }

                let mut span = inner_enabled.traces.child("update_meta");
                // TODO: this call can be async
                let result = inner_enabled
                    .storage
                    .update_meta(
                        inner.key.as_ref().unwrap(),
                        inner_enabled.meta.as_ref().unwrap(),
                        &span.handle(),
                    )
                    .await;
                span.set_tag(|| trace::Tag::new("updated", result.is_ok()));
                result
            }
            _ => panic!("wrong phase {:?}", self.phase),
        };
        self.phase = CachePhase::Revalidated;
        result
    }

    /// After a successful revalidation, update certain headers for the cached asset
    /// such as `Etag` with the fresh response header `resp`.
    pub fn revalidate_merge_header(&mut self, resp: &RespHeader) -> ResponseHeader {
        match self.phase {
            CachePhase::Stale => {
                /*
                 * https://datatracker.ietf.org/doc/html/rfc9110#section-15.4.5
                 * 304 response MUST generate ... would have been sent in a 200 ...
                 * - Content-Location, Date, ETag, and Vary
                 * - Cache-Control and Expires...
                 */
                let mut old_header = self.inner_enabled().meta.as_ref().unwrap().0.header.clone();
                let mut clone_header = |header_name: &'static str| {
                    for (i, value) in resp.headers.get_all(header_name).iter().enumerate() {
                        if i == 0 {
                            old_header
                                .insert_header(header_name, value)
                                .expect("can add valid header");
                        } else {
                            old_header
                                .append_header(header_name, value)
                                .expect("can add valid header");
                        }
                    }
                };
                clone_header("cache-control");
                clone_header("expires");
                clone_header("cache-tag");
                clone_header("cdn-cache-control");
                clone_header("etag");
                // https://datatracker.ietf.org/doc/html/rfc9111#section-4.3.4
                // "...cache MUST update its header fields with the header fields provided in the 304..."
                // But if the Vary header changes, the cached response may no longer match the
                // incoming request.
                //
                // For simplicity, ignore changing Vary in revalidation for now.
                // TODO: if we support vary during revalidation, there are a few edge cases to
                // consider (what if Vary header appears/disappears/changes)?
                //
                // clone_header("vary");
                old_header
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Mark this asset uncacheable after revalidation
    pub fn revalidate_uncacheable(&mut self, header: ResponseHeader, reason: NoCacheReason) {
        match self.phase {
            CachePhase::Stale => {
                // replace cache meta header
                self.inner_enabled_mut().meta.as_mut().unwrap().0.header = header;
                // upstream request done, release write lock
                self.release_write_lock(reason);
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
        self.phase = CachePhase::RevalidatedNoCache(reason);
        // TODO: remove this asset from cache once finished?
    }

    /// Mark this asset as stale, but being updated separately from this request.
    pub fn set_stale_updating(&mut self) {
        match self.phase {
            CachePhase::Stale => self.phase = CachePhase::StaleUpdating,
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

1132    /// Update the variance of the [CacheMeta].
1133    ///
1134    /// Note that this process may change the lookup `key`, and eventually (when the asset is
1135    /// written to storage) invalidate other cached variants under the same primary key as the
1136    /// current asset.
1137    pub fn update_variance(&mut self, variance: Option<HashBinary>) {
1138        // If this is a cache miss, we will simply update the variance in the meta.
1139        //
1140        // If this is an expired response, we will have to consider a few cases:
1141        //
1142        // **Case 1**: Variance was absent, but caller sets it now.
1143        // We will just insert it into the meta. The current asset becomes the primary variant.
1144        // Because the current location of the asset is already the primary variant, nothing else
1145        // needs to be done.
1146        //
1147        // **Case 2**: Variance was present, but it changed or was removed.
1148        // We want the current asset to take over the primary slot, in order to invalidate all
1149        // other variants derived under the old Vary.
1150        //
1151        // **Case 3**: Variance did not change.
1152        // Nothing needs to happen.
1153        let inner = match self.phase {
1154            CachePhase::Miss | CachePhase::Expired => self.inner_mut(),
1155            _ => panic!("wrong phase {:?}", self.phase),
1156        };
1157        let inner_enabled = inner
1158            .enabled_ctx
1159            .as_mut()
1160            .expect("cache enabled on miss and expired");
1161
1162        // Update the variance in the meta
1163        if let Some(variance_hash) = variance.as_ref() {
1164            inner_enabled
1165                .meta
1166                .as_mut()
1167                .unwrap()
1168                .set_variance_key(*variance_hash);
1169        } else {
1170            inner_enabled.meta.as_mut().unwrap().remove_variance();
1171        }
1172
1173        // Change the lookup `key` if necessary, in order to admit asset into the primary slot
1174        // instead of the secondary slot.
1175        let key = inner.key.as_ref().unwrap();
1176        if let Some(old_variance) = key.get_variance_key().as_ref() {
1177            // This is a secondary variant slot.
1178            if Some(*old_variance) != variance.as_ref() {
1179                // This new variance does not match the variance in the cache key we used to look
1180                // up this asset.
1181                // Drop the cache lock to avoid leaving a dangling lock
1182                // (because we locked with the old cache key for the secondary slot)
1183                // TODO: maybe we should try to signal waiting readers to compete for the primary key
1184                // lock instead? we will not be modifying this secondary slot so it's not actually
1185                // ready for readers
1186                if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
1187                    if let Some(Locked::Write(permit)) = lock_ctx.lock.take() {
1188                        lock_ctx.cache_lock.release(key, permit, LockStatus::Done);
1189                    }
1190                }
1191                // Remove the `variance` from the `key`, so that we admit this asset into the
1192                // primary slot. (`key` is used to tell storage where to write the data.)
1193                inner.key.as_mut().unwrap().remove_variance_key();
1194            }
1195        }
1196    }
1197
1198    /// Return the [CacheMeta] of this asset
1199    ///
1200    /// # Panic
1201    /// Panic in phases which has no cache meta.
1202    pub fn cache_meta(&self) -> &CacheMeta {
1203        match self.phase {
1204            // TODO: allow in Bypass phase?
1205            CachePhase::Stale
1206            | CachePhase::StaleUpdating
1207            | CachePhase::Expired
1208            | CachePhase::Hit
1209            | CachePhase::Revalidated
1210            | CachePhase::RevalidatedNoCache(_) => self.inner_enabled().meta.as_ref().unwrap(),
1211            CachePhase::Miss => {
1212                // this is the async body read case, safe because body_reader is only set
1213                // after meta is retrieved
1214                if self.inner_enabled().body_reader.is_some() {
1215                    self.inner_enabled().meta.as_ref().unwrap()
1216                } else {
1217                    panic!("wrong phase {:?}", self.phase);
1218                }
1219            }
1220
1221            _ => panic!("wrong phase {:?}", self.phase),
1222        }
1223    }
1224
1225    /// Return the [CacheMeta] of this asset if any
1226    ///
1227    /// Different from [Self::cache_meta()], this function is allowed to be called in
1228    /// [CachePhase::Miss] phase where the cache meta maybe set.
1229    /// # Panic
1230    /// Panic in phases that shouldn't have cache meta.
1231    pub fn maybe_cache_meta(&self) -> Option<&CacheMeta> {
1232        match self.phase {
1233            CachePhase::Miss
1234            | CachePhase::Stale
1235            | CachePhase::StaleUpdating
1236            | CachePhase::Expired
1237            | CachePhase::Hit
1238            | CachePhase::Revalidated
1239            | CachePhase::RevalidatedNoCache(_) => self.inner_enabled().meta.as_ref(),
1240            _ => panic!("wrong phase {:?}", self.phase),
1241        }
1242    }
1243
1244    /// Perform the cache lookup from the given cache storage with the given cache key
1245    ///
1246    /// A cache hit will return [CacheMeta] which contains the header and meta info about
1247    /// the cache as well as a [HitHandler] to read the cache hit body.
1248    /// # Panic
1249    /// Panic in other phases.
1250    pub async fn cache_lookup(&mut self) -> Result<Option<(CacheMeta, HitHandler)>> {
1251        match self.phase {
1252            // Stale is allowed here because stale-> cache_lock -> lookup again
1253            CachePhase::CacheKey | CachePhase::Stale => {
1254                let inner = self
1255                    .inner
1256                    .as_mut()
1257                    .expect("Cache phase is checked and should have inner");
1258                let inner_enabled = inner
1259                    .enabled_ctx
1260                    .as_mut()
1261                    .expect("Cache enabled on cache_lookup");
1262                let mut span = inner_enabled.traces.child("lookup");
1263                let key = inner.key.as_ref().unwrap(); // safe, this phase should have cache key
1264                let now = Instant::now();
1265                let result = inner_enabled.storage.lookup(key, &span.handle()).await?;
1266                // one request may have multiple lookups
1267                self.digest.add_lookup_duration(now.elapsed());
1268                let result = result.and_then(|(meta, header)| {
1269                    if let Some(ts) = inner_enabled.valid_after {
1270                        if meta.created() < ts {
1271                            span.set_tag(|| trace::Tag::new("not valid", true));
1272                            return None;
1273                        }
1274                    }
1275                    Some((meta, header))
1276                });
1277                if result.is_none() {
1278                    if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
1279                        lock_ctx.lock = Some(lock_ctx.cache_lock.lock(key));
1280                    }
1281                }
1282                span.set_tag(|| trace::Tag::new("found", result.is_some()));
1283                Ok(result)
1284            }
1285            _ => panic!("wrong phase {:?}", self.phase),
1286        }
1287    }
1288
1289    /// Update variance and see if the meta matches the current variance
1290    ///
1291    /// `cache_lookup() -> compute vary hash -> cache_vary_lookup()`
1292    /// This function allows callers to compute vary based on the initial cache hit.
    /// `meta` should be the one returned from the initial cache_lookup().
    /// - Returns true if the meta matches the variance.
    /// - Returns false if the current meta doesn't match the variance; the caller needs to
    ///   cache_lookup() again (see the example below).
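    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest). `compute_variance` is a hypothetical
    /// helper that hashes the request headers named by the cached meta's Vary rules (for
    /// example via [VarianceBuilder]):
    ///
    /// ```ignore
    /// if let Some((meta, hit_handler)) = cache.cache_lookup().await? {
    ///     if let Some(variance) = compute_variance(&meta, &req_header) {
    ///         if !cache.cache_vary_lookup(variance, &meta) {
    ///             // The primary slot does not match this request's variance:
    ///             // look up the variant-specific slot.
    ///             let variant = cache.cache_lookup().await?;
    ///         }
    ///     }
    /// }
    /// ```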
1296    pub fn cache_vary_lookup(&mut self, variance: HashBinary, meta: &CacheMeta) -> bool {
1297        match self.phase {
            // Stale is allowed here because stale -> cache_lock -> lookup again
1299            CachePhase::CacheKey | CachePhase::Stale => {
1300                let inner = self.inner_mut();
1301                // make sure that all variances found are fresher than this asset
1302                // this is because when purging all the variance, only the primary slot is deleted
1303                // the created TS of the primary is the tombstone of all the variances
1304                inner
1305                    .enabled_ctx
1306                    .as_mut()
1307                    .expect("cache enabled")
1308                    .valid_after = Some(meta.created());
1309
1310                // update vary
1311                let key = inner.key.as_mut().unwrap();
1312                // if no variance was previously set, then this is the first cache hit
1313                let is_initial_cache_hit = key.get_variance_key().is_none();
1314                key.set_variance_key(variance);
1315                let variance_binary = key.variance_bin();
1316                let matches_variance = meta.variance() == variance_binary;
1317
1318                // We should remove the variance in the lookup `key` if this is the primary variant
1319                // slot. We know this is the primary variant slot if this is the initial cache hit,
1320                // AND the variance in the `key` already matches the `meta`'s.
1321                //
1322                // For the primary variant slot, the storage backend needs to use the primary key
1323                // for both cache lookup and updating the meta. Otherwise it will look for the
1324                // asset in the wrong location during revalidation.
1325                //
1326                // We can recreate the "full" cache key by using the meta's variance, if needed.
1327                if matches_variance && is_initial_cache_hit {
1328                    inner.key.as_mut().unwrap().remove_variance_key();
1329                }
1330
1331                matches_variance
1332            }
1333            _ => panic!("wrong phase {:?}", self.phase),
1334        }
1335    }
1336
    /// Whether this request is behind a cache lock, waiting for another request to fetch the
    /// asset so that this request can read it from cache.
1339    pub fn is_cache_locked(&self) -> bool {
1340        matches!(
1341            self.inner_enabled()
1342                .lock_ctx
1343                .as_ref()
1344                .and_then(|l| l.lock.as_ref()),
1345            Some(Locked::Read(_))
1346        )
1347    }
1348
    /// Whether this request is the leader request that fetches the asset for itself and for
    /// other requests waiting behind the cache lock.
1351    pub fn is_cache_lock_writer(&self) -> bool {
1352        matches!(
1353            self.inner_enabled()
1354                .lock_ctx
1355                .as_ref()
1356                .and_then(|l| l.lock.as_ref()),
1357            Some(Locked::Write(_))
1358        )
1359    }
1360
    /// Take the write lock from this request to transfer it to another one.
    /// # Panic
    /// Panics if this request does not hold the write lock; call [Self::is_cache_lock_writer()]
    /// to check first.
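    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest) of handing the write permit to a
    /// subrequest; `subrequest_cache` is a hypothetical second [HttpCache] handling the
    /// background fetch:
    ///
    /// ```ignore
    /// if cache.is_cache_lock_writer() {
    ///     let (permit, _lock) = cache.take_write_lock();
    ///     subrequest_cache.set_write_lock(permit);
    /// }
    /// ```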
1364    pub fn take_write_lock(&mut self) -> (WritePermit, &'static CacheKeyLockImpl) {
1365        let lock_ctx = self
1366            .inner_enabled_mut()
1367            .lock_ctx
1368            .as_mut()
1369            .expect("take_write_lock() called without cache lock");
1370        let lock = lock_ctx
1371            .lock
1372            .take()
1373            .expect("take_write_lock() called without lock");
1374        match lock {
1375            Locked::Write(w) => (w, lock_ctx.cache_lock),
1376            Locked::Read(_) => panic!("take_write_lock() called on read lock"),
1377        }
1378    }
1379
1380    /// Set the write lock, which is usually transferred from [Self::take_write_lock()]
1381    ///
1382    /// # Panic
1383    /// Panics if cache lock was not originally configured for this request.
1384    // TODO: it may make sense to allow configuring the CacheKeyLock here too that the write permit
1385    // is associated with
1386    // (The WritePermit comes from the CacheKeyLock and should be used when releasing from the CacheKeyLock,
1387    // shouldn't be possible to give a WritePermit to a request using a different CacheKeyLock)
1388    pub fn set_write_lock(&mut self, write_lock: WritePermit) {
1389        if let Some(lock_ctx) = self.inner_enabled_mut().lock_ctx.as_mut() {
1390            lock_ctx.lock.replace(Locked::Write(write_lock));
1391        }
1392    }
1393
    /// Whether this request's cache hit is stale
1395    fn has_staled_asset(&self) -> bool {
1396        matches!(self.phase, CachePhase::Stale | CachePhase::StaleUpdating)
1397    }
1398
    /// Whether this asset is stale and serving it under stale-if-error is allowed
1400    pub fn can_serve_stale_error(&self) -> bool {
1401        self.has_staled_asset() && self.cache_meta().serve_stale_if_error(SystemTime::now())
1402    }
1403
    /// Whether this asset is stale and serving it under stale-while-revalidate is allowed.
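    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest) of deciding how to serve a stale hit;
    /// `upstream_failed` is a hypothetical flag set by the caller:
    ///
    /// ```ignore
    /// if cache.can_serve_stale_updating() {
    ///     // Serve the stale copy now while another request revalidates it.
    /// } else if upstream_failed && cache.can_serve_stale_error() {
    ///     // The origin is unreachable: fall back to the stale copy.
    /// }
    /// ```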
1405    pub fn can_serve_stale_updating(&self) -> bool {
1406        self.has_staled_asset()
1407            && self
1408                .cache_meta()
1409                .serve_stale_while_revalidate(SystemTime::now())
1410    }
1411
    /// Wait for the cache read lock to be unlocked
    /// # Panic
    /// Panics if this request doesn't have a read lock; check [Self::is_cache_locked()] first.
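    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest):
    ///
    /// ```ignore
    /// if cache.is_cache_locked() {
    ///     match cache.cache_lock_wait().await {
    ///         LockStatus::Timeout => {
    ///             // Waited too long behind the lock; proceed without the cached copy.
    ///         }
    ///         _ => {
    ///             // The writer released the lock; a second cache_lookup() will usually
    ///             // find the freshly written asset.
    ///         }
    ///     }
    /// }
    /// ```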
1415    pub async fn cache_lock_wait(&mut self) -> LockStatus {
1416        let inner_enabled = self.inner_enabled_mut();
1417        let mut span = inner_enabled.traces.child("cache_lock");
1418        // should always call is_cache_locked() before this function, which should guarantee that
1419        // the inner cache has a read lock and lock ctx
1420        if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
1421            let lock = lock_ctx.lock.take(); // remove the lock from self
1422            if let Some(Locked::Read(r)) = lock {
1423                let now = Instant::now();
1424                // it's possible for a request to be locked more than once,
1425                // so wait the remainder of our configured timeout
1426                let status = if let Some(wait_timeout) = lock_ctx.wait_timeout {
1427                    let wait_timeout =
1428                        wait_timeout.saturating_sub(self.lock_duration().unwrap_or(Duration::ZERO));
1429                    match timeout(wait_timeout, r.wait()).await {
1430                        Ok(()) => r.lock_status(),
1431                        // TODO: need to differentiate WaitTimeout vs. Lock(Age)Timeout (expired)?
1432                        Err(_) => LockStatus::Timeout,
1433                    }
1434                } else {
1435                    r.wait().await;
1436                    r.lock_status()
1437                };
1438                self.digest.add_lock_duration(now.elapsed());
1439                let tag_value: &'static str = status.into();
1440                span.set_tag(|| Tag::new("status", tag_value));
1441                status
1442            } else {
1443                panic!("cache_lock_wait on wrong type of lock")
1444            }
1445        } else {
1446            panic!("cache_lock_wait without cache lock")
1447        }
1448    }
1449
    /// How long this request waited behind the read lock
1451    pub fn lock_duration(&self) -> Option<Duration> {
1452        self.digest.lock_duration
1453    }
1454
    /// How long this request spent on cache lookup and reading the header
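    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest) of exporting these timings, e.g. for
    /// access logging:
    ///
    /// ```ignore
    /// let lookup_ms = cache.lookup_duration().map(|d| d.as_millis()).unwrap_or(0);
    /// let lock_ms = cache.lock_duration().map(|d| d.as_millis()).unwrap_or(0);
    /// ```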
1456    pub fn lookup_duration(&self) -> Option<Duration> {
1457        self.digest.lookup_duration
1458    }
1459
    /// Delete the asset from the cache storage
    /// # Panic
    /// Must be called after the cache key is set. Panics otherwise.
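    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest), e.g. when handling a purge API call:
    ///
    /// ```ignore
    /// let purged = cache.purge().await?;
    /// if purged {
    ///     // The asset was found and removed; the eviction manager (if any) was informed.
    /// }
    /// ```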
1463    pub async fn purge(&self) -> Result<bool> {
1464        match self.phase {
1465            CachePhase::CacheKey => {
1466                let inner = self.inner();
1467                let inner_enabled = self.inner_enabled();
1468                let span = inner_enabled.traces.child("purge");
1469                let key = inner.key.as_ref().unwrap().to_compact();
1470                Self::purge_impl(inner_enabled.storage, inner_enabled.eviction, &key, span).await
1471            }
1472            _ => panic!("wrong phase {:?}", self.phase),
1473        }
1474    }
1475
1476    /// Delete the asset from the cache storage via a spawned task.
    /// Returns the corresponding `JoinHandle` for that task.
1478    /// # Panic
    /// Must be called after the cache key is set. Panics otherwise.
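    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); the `context` string is only used when
    /// logging a failed purge:
    ///
    /// ```ignore
    /// let handle = cache.spawn_async_purge("asset replaced by fresh response");
    /// // The purge runs in the background; await the handle only if the result matters:
    /// // let purged = handle.await.expect("purge task panicked")?;
    /// ```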
1480    pub fn spawn_async_purge(
1481        &self,
1482        context: &'static str,
1483    ) -> tokio::task::JoinHandle<Result<bool>> {
1484        if matches!(self.phase, CachePhase::Disabled(_) | CachePhase::Uninit) {
1485            panic!("wrong phase {:?}", self.phase);
1486        }
1487
1488        let inner_enabled = self.inner_enabled();
1489        let span = inner_enabled.traces.child("purge");
1490        let key = self.inner().key.as_ref().unwrap().to_compact();
1491        let storage = inner_enabled.storage;
1492        let eviction = inner_enabled.eviction;
1493        tokio::task::spawn(async move {
1494            Self::purge_impl(storage, eviction, &key, span)
1495                .await
1496                .map_err(|e| {
1497                    warn!("Failed to purge {key} (context: {context}): {e}");
1498                    e
1499                })
1500        })
1501    }
1502
1503    async fn purge_impl(
1504        storage: &'static (dyn storage::Storage + Sync),
1505        eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
1506        key: &CompactCacheKey,
1507        mut span: Span,
1508    ) -> Result<bool> {
1509        let result = storage
1510            .purge(key, PurgeType::Invalidation, &span.handle())
1511            .await;
1512        let purged = matches!(result, Ok(true));
1513        // need to inform eviction manager if asset was removed
1514        if let Some(eviction) = eviction.as_ref() {
1515            if purged {
1516                eviction.remove(key);
1517            }
1518        }
1519        span.set_tag(|| trace::Tag::new("purged", purged));
1520        result
1521    }
1522
1523    /// Check the cacheable prediction
1524    ///
1525    /// Return true if the predictor is not set
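    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); `response_is_cacheable` is a hypothetical
    /// flag derived from the upstream response, and the [NoCacheReason] variant is illustrative:
    ///
    /// ```ignore
    /// if !cache.cacheable_prediction() {
    ///     // Previously seen as uncacheable: skip the cache for this request.
    /// }
    /// // ... later, after inspecting the upstream response:
    /// if response_is_cacheable {
    ///     cache.response_became_cacheable();
    /// } else {
    ///     cache.response_became_uncacheable(NoCacheReason::OriginNotCache);
    /// }
    /// ```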
1526    pub fn cacheable_prediction(&self) -> bool {
1527        if let Some(predictor) = self.inner().predictor {
1528            predictor.cacheable_prediction(self.cache_key())
1529        } else {
1530            true
1531        }
1532    }
1533
    /// Tell the predictor that this response, which was previously predicted to be uncacheable,
    /// is cacheable now.
1536    pub fn response_became_cacheable(&self) {
1537        if let Some(predictor) = self.inner().predictor {
1538            predictor.mark_cacheable(self.cache_key());
1539        }
1540    }
1541
    /// Tell the predictor that this response is uncacheable so that it will know the next time
    /// this request arrives.
1544    pub fn response_became_uncacheable(&self, reason: NoCacheReason) {
1545        if let Some(predictor) = self.inner().predictor {
1546            predictor.mark_uncacheable(self.cache_key(), reason);
1547        }
1548    }
1549
1550    /// Tag all spans as being part of a subrequest.
1551    pub fn tag_as_subrequest(&mut self) {
1552        self.inner_enabled_mut()
1553            .traces
1554            .cache_span
1555            .set_tag(|| Tag::new("is_subrequest", true))
1556    }
1557}