Skip to main content

pingora_cache/
lib.rs

1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The HTTP caching layer for proxies.
16
17#![allow(clippy::new_without_default)]
18
19use cf_rustracing::tag::Tag;
20use http::{method::Method, request::Parts as ReqHeader, response::Parts as RespHeader};
21use key::{CacheHashKey, CompactCacheKey, HashBinary};
22use lock::WritePermit;
23use log::warn;
24use pingora_error::Result;
25use pingora_http::ResponseHeader;
26use pingora_timeout::timeout;
27use std::time::{Duration, Instant, SystemTime};
28use storage::MissFinishType;
29use strum::IntoStaticStr;
30use trace::{CacheTraceCTX, Span};
31
32pub mod cache_control;
33pub mod eviction;
34pub mod filters;
35pub mod hashtable;
36pub mod key;
37pub mod lock;
38pub mod max_file_size;
39mod memory;
40pub mod meta;
41pub mod predictor;
42pub mod put;
43pub mod storage;
44pub mod trace;
45mod variance;
46
47use crate::max_file_size::MaxFileSizeTracker;
48pub use key::CacheKey;
49use lock::{CacheKeyLockImpl, LockStatus, Locked};
50pub use memory::MemCache;
51pub use meta::{set_compression_dict_content, set_compression_dict_path};
52pub use meta::{CacheMeta, CacheMetaDefaults};
53pub use storage::{HitHandler, MissHandler, PurgeType, Storage};
54pub use variance::VarianceBuilder;
55
56pub mod prelude {}
57
58/// The state machine for http caching
59///
60/// This object is used to handle the state and transitions for HTTP caching through the life of a
61/// request.
62pub struct HttpCache {
63    phase: CachePhase,
64    // Box the rest so that a disabled HttpCache struct is small
65    inner: Option<Box<HttpCacheInner>>,
66    digest: HttpCacheDigest,
67}
68
69/// This reflects the phase of HttpCache during the lifetime of a request
70#[derive(Clone, Copy, Debug, PartialEq, Eq)]
71pub enum CachePhase {
72    /// Cache disabled, with reason (NeverEnabled if never explicitly used)
73    Disabled(NoCacheReason),
74    /// Cache enabled but nothing is set yet
75    Uninit,
76    /// Cache was enabled, the request decided not to use it
77    // HttpCache.inner_enabled is kept
78    Bypass,
79    /// Awaiting the cache key to be generated
80    CacheKey,
81    /// Cache hit
82    Hit,
83    /// No cached asset is found
84    Miss,
85    /// A staled (expired) asset is found
86    Stale,
87    /// A staled (expired) asset was found, but another request is revalidating it
88    StaleUpdating,
89    /// A staled (expired) asset was found, so a fresh one was fetched
90    Expired,
91    /// A staled (expired) asset was found, and it was revalidated to be fresh
92    Revalidated,
93    /// Revalidated, but deemed uncacheable, so we do not freshen it
94    RevalidatedNoCache(NoCacheReason),
95}
96
97impl CachePhase {
98    /// Convert [CachePhase] as `str`, for logging and debugging.
99    pub fn as_str(&self) -> &'static str {
100        match self {
101            CachePhase::Disabled(_) => "disabled",
102            CachePhase::Uninit => "uninitialized",
103            CachePhase::Bypass => "bypass",
104            CachePhase::CacheKey => "key",
105            CachePhase::Hit => "hit",
106            CachePhase::Miss => "miss",
107            CachePhase::Stale => "stale",
108            CachePhase::StaleUpdating => "stale-updating",
109            CachePhase::Expired => "expired",
110            CachePhase::Revalidated => "revalidated",
111            CachePhase::RevalidatedNoCache(_) => "revalidated-nocache",
112        }
113    }
114}
115
116/// The possible reasons for not caching
117#[derive(Copy, Clone, Debug, PartialEq, Eq)]
118pub enum NoCacheReason {
119    /// Caching is not enabled to begin with
120    NeverEnabled,
121    /// Origin directives indicated this was not cacheable
122    OriginNotCache,
123    /// Response size was larger than the cache's configured maximum asset size
124    ResponseTooLarge,
125    /// Disabling caching due to unknown body size and previously exceeding maximum asset size;
126    /// the asset is otherwise cacheable, but cache needs to confirm the final size of the asset
127    /// before it can mark it as cacheable again.
128    PredictedResponseTooLarge,
129    /// Due to internal caching storage error
130    StorageError,
131    /// Due to other types of internal issues
132    InternalError,
133    /// will be cacheable but skip cache admission now
134    ///
135    /// This happens when the cache predictor predicted that this request is not cacheable, but
136    /// the response turns out to be OK to cache. However, it might be too large to re-enable caching
137    /// for this request
138    Deferred,
139    /// Due to the proxy upstream filter declining the current request from going upstream
140    DeclinedToUpstream,
141    /// Due to the upstream being unreachable or otherwise erroring during proxying
142    UpstreamError,
143    /// The writer of the cache lock sees that the request is not cacheable (Could be OriginNotCache)
144    CacheLockGiveUp,
145    /// This request waited too long for the writer of the cache lock to finish, so this request will
146    /// fetch from the origin without caching
147    CacheLockTimeout,
148    /// Other custom defined reasons
149    Custom(&'static str),
150}
151
152impl NoCacheReason {
153    /// Convert [NoCacheReason] as `str`, for logging and debugging.
154    pub fn as_str(&self) -> &'static str {
155        use NoCacheReason::*;
156        match self {
157            NeverEnabled => "NeverEnabled",
158            OriginNotCache => "OriginNotCache",
159            ResponseTooLarge => "ResponseTooLarge",
160            PredictedResponseTooLarge => "PredictedResponseTooLarge",
161            StorageError => "StorageError",
162            InternalError => "InternalError",
163            Deferred => "Deferred",
164            DeclinedToUpstream => "DeclinedToUpstream",
165            UpstreamError => "UpstreamError",
166            CacheLockGiveUp => "CacheLockGiveUp",
167            CacheLockTimeout => "CacheLockTimeout",
168            Custom(s) => s,
169        }
170    }
171}
172
173/// Information collected about the caching operation that will not be cleared
174#[derive(Debug, Default)]
175pub struct HttpCacheDigest {
176    pub lock_duration: Option<Duration>,
177    // time spent in cache lookup and reading the header
178    pub lookup_duration: Option<Duration>,
179}
180
181/// Convenience function to add a duration to an optional duration
182fn add_duration_to_opt(target_opt: &mut Option<Duration>, to_add: Duration) {
183    *target_opt = Some(target_opt.map_or(to_add, |existing| existing + to_add));
184}
185
186impl HttpCacheDigest {
187    fn add_lookup_duration(&mut self, extra_lookup_duration: Duration) {
188        add_duration_to_opt(&mut self.lookup_duration, extra_lookup_duration)
189    }
190
191    fn add_lock_duration(&mut self, extra_lock_duration: Duration) {
192        add_duration_to_opt(&mut self.lock_duration, extra_lock_duration)
193    }
194}
195
196/// Response cacheable decision
197///
198///
199#[derive(Debug)]
200pub enum RespCacheable {
201    Cacheable(CacheMeta),
202    Uncacheable(NoCacheReason),
203}
204
205impl RespCacheable {
206    /// Whether it is cacheable
207    #[inline]
208    pub fn is_cacheable(&self) -> bool {
209        matches!(*self, Self::Cacheable(_))
210    }
211
212    /// Unwrap [RespCacheable] to get the [CacheMeta] stored
213    /// # Panic
214    /// Panic when this object is not cacheable. Check [Self::is_cacheable()] first.
215    pub fn unwrap_meta(self) -> CacheMeta {
216        match self {
217            Self::Cacheable(meta) => meta,
218            Self::Uncacheable(_) => panic!("expected Cacheable value"),
219        }
220    }
221}
222
223/// Indicators of which level of cache freshness logic to force apply to an asset.
224///
225/// For example, should an existing fresh asset be revalidated or re-retrieved altogether.
226#[derive(Debug, Clone, Copy, PartialEq, Eq)]
227pub enum ForcedFreshness {
228    /// Indicates the asset should be considered stale and revalidated
229    ForceExpired,
230
231    /// Indicates the asset should be considered absent and treated like a miss
232    /// instead of a hit
233    ForceMiss,
234
235    /// Indicates the asset should be considered fresh despite possibly being stale
236    ForceFresh,
237}
238
239/// Freshness state of cache hit asset
240///
241///
242#[derive(Debug, Copy, Clone, IntoStaticStr, PartialEq, Eq)]
243#[strum(serialize_all = "snake_case")]
244pub enum HitStatus {
245    /// The asset's freshness directives indicate it has expired
246    Expired,
247
248    /// The asset was marked as expired, and should be treated as stale
249    ForceExpired,
250
251    /// The asset was marked as absent, and should be treated as a miss
252    ForceMiss,
253
254    /// An error occurred while processing the asset, so it should be treated as
255    /// a miss
256    FailedHitFilter,
257
258    /// The asset is not expired
259    Fresh,
260
261    /// Asset exists but is expired, forced to be a hit
262    ForceFresh,
263}
264
265impl HitStatus {
266    /// For displaying cache hit status
267    pub fn as_str(&self) -> &'static str {
268        self.into()
269    }
270
271    /// Whether cached asset can be served as fresh
272    pub fn is_fresh(&self) -> bool {
273        *self == HitStatus::Fresh || *self == HitStatus::ForceFresh
274    }
275
276    /// Check whether the hit status should be treated as a miss. A forced miss
277    /// is obviously treated as a miss. A hit-filter failure is treated as a
278    /// miss because we can't use the asset as an actual hit. If we treat it as
279    /// expired, we still might not be able to use it even if revalidation
280    /// succeeds.
281    pub fn is_treated_as_miss(self) -> bool {
282        matches!(self, HitStatus::ForceMiss | HitStatus::FailedHitFilter)
283    }
284}
285
286pub struct LockCtx {
287    pub lock: Option<Locked>,
288    pub cache_lock: &'static CacheKeyLockImpl,
289    pub wait_timeout: Option<Duration>,
290}
291
292// Fields like storage handlers that are needed only when cache is enabled (or bypassing).
293struct HttpCacheInnerEnabled {
294    pub meta: Option<CacheMeta>,
295    // when set, even if an asset exists, it would only be considered valid after this timestamp
296    pub valid_after: Option<SystemTime>,
297    pub miss_handler: Option<MissHandler>,
298    pub body_reader: Option<HitHandler>,
299    pub storage: &'static (dyn storage::Storage + Sync), // static for now
300    pub eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
301    pub lock_ctx: Option<LockCtx>,
302    pub traces: trace::CacheTraceCTX,
303}
304
305struct HttpCacheInner {
306    // Prefer adding fields to InnerEnabled if possible, these fields are released
307    // when cache is disabled.
308    // If fields are needed after cache disablement, add directly to Inner.
309    pub enabled_ctx: Option<Box<HttpCacheInnerEnabled>>,
310    pub key: Option<CacheKey>,
311    // when set, an asset will be rejected from the cache if it exceeds configured size in bytes
312    pub max_file_size_tracker: Option<MaxFileSizeTracker>,
313    pub predictor: Option<&'static (dyn predictor::CacheablePredictor + Sync)>,
314}
315
316#[derive(Debug, Default)]
317#[non_exhaustive]
318pub struct CacheOptionOverrides {
319    pub wait_timeout: Option<Duration>,
320}
321
322impl HttpCache {
323    /// Create a new [HttpCache].
324    ///
325    /// Caching is not enabled by default.
326    pub fn new() -> Self {
327        HttpCache {
328            phase: CachePhase::Disabled(NoCacheReason::NeverEnabled),
329            inner: None,
330            digest: HttpCacheDigest::default(),
331        }
332    }
333
334    /// Whether the cache is enabled
335    pub fn enabled(&self) -> bool {
336        !matches!(self.phase, CachePhase::Disabled(_) | CachePhase::Bypass)
337    }
338
339    /// Whether the cache is being bypassed
340    pub fn bypassing(&self) -> bool {
341        matches!(self.phase, CachePhase::Bypass)
342    }
343
344    /// Return the [CachePhase]
345    pub fn phase(&self) -> CachePhase {
346        self.phase
347    }
348
349    /// Whether anything was fetched from the upstream
350    ///
351    /// This essentially checks all possible [CachePhase] who need to contact the upstream server
352    pub fn upstream_used(&self) -> bool {
353        use CachePhase::*;
354        match self.phase {
355            Disabled(_) | Bypass | Miss | Expired | Revalidated | RevalidatedNoCache(_) => true,
356            Hit | Stale | StaleUpdating => false,
357            Uninit | CacheKey => false, // invalid states for this call, treat them as false to keep it simple
358        }
359    }
360
361    /// Check whether the backend storage is the type `T`.
362    pub fn storage_type_is<T: 'static>(&self) -> bool {
363        self.inner
364            .as_ref()
365            .and_then(|inner| {
366                inner
367                    .enabled_ctx
368                    .as_ref()
369                    .and_then(|ie| ie.storage.as_any().downcast_ref::<T>())
370            })
371            .is_some()
372    }
373
374    /// Release the cache lock if the current request is a cache writer.
375    ///
376    /// Generally callers should prefer using `disable` when a cache lock should be released
377    /// due to an error to clear all cache context. This function is for releasing the cache lock
378    /// while still keeping the cache around for reading, e.g. when serving stale.
379    pub fn release_write_lock(&mut self, reason: NoCacheReason) {
380        use NoCacheReason::*;
381        if let Some(inner) = self.inner.as_mut() {
382            if let Some(lock_ctx) = inner
383                .enabled_ctx
384                .as_mut()
385                .and_then(|ie| ie.lock_ctx.as_mut())
386            {
387                let lock = lock_ctx.lock.take();
388                if let Some(Locked::Write(permit)) = lock {
389                    let lock_status = match reason {
390                        // let the next request try to fetch it
391                        InternalError | StorageError | Deferred | UpstreamError => {
392                            LockStatus::TransientError
393                        }
394                        // depends on why the proxy upstream filter declined the request,
395                        // for now still allow next request try to acquire to avoid thundering herd
396                        DeclinedToUpstream => LockStatus::TransientError,
397                        // no need for the lock anymore
398                        OriginNotCache | ResponseTooLarge | PredictedResponseTooLarge => {
399                            LockStatus::GiveUp
400                        }
401                        Custom(reason) => lock_ctx.cache_lock.custom_lock_status(reason),
402                        // should never happen, NeverEnabled shouldn't hold a lock
403                        NeverEnabled => panic!("NeverEnabled holds a write lock"),
404                        CacheLockGiveUp | CacheLockTimeout => {
405                            panic!("CacheLock* are for cache lock readers only")
406                        }
407                    };
408                    lock_ctx
409                        .cache_lock
410                        .release(inner.key.as_ref().unwrap(), permit, lock_status);
411                }
412            }
413        }
414    }
415
416    /// Disable caching
417    pub fn disable(&mut self, reason: NoCacheReason) {
418        // XXX: compile type enforce?
419        assert!(
420            reason != NoCacheReason::NeverEnabled,
421            "NeverEnabled not allowed as a disable reason"
422        );
423        match self.phase {
424            CachePhase::Disabled(old_reason) => {
425                // replace reason
426                if old_reason == NoCacheReason::NeverEnabled {
427                    // safeguard, don't allow replacing NeverEnabled as a reason
428                    // TODO: can be promoted to assertion once confirmed nothing is attempting this
429                    warn!("Tried to replace cache NeverEnabled with reason: {reason:?}");
430                    return;
431                }
432                self.phase = CachePhase::Disabled(reason);
433            }
434            _ => {
435                self.phase = CachePhase::Disabled(reason);
436                self.release_write_lock(reason);
437                // enabled_ctx will be cleared out
438                let mut inner_enabled = self
439                    .inner_mut()
440                    .enabled_ctx
441                    .take()
442                    .expect("could remove enabled_ctx on disable");
443                // log initial disable reason
444                inner_enabled
445                    .traces
446                    .cache_span
447                    .set_tag(|| trace::Tag::new("disable_reason", reason.as_str()));
448            }
449        }
450    }
451
452    /* The following methods panic when they are used in the wrong phase.
453     * This is better than returning errors as such panics are only caused by coding error, which
454     * should be fixed right away. Tokio runtime only crashes the current task instead of the whole
455     * program when these panics happen. */
456
457    /// Set the cache to bypass
458    ///
459    /// # Panic
460    /// This call is only allowed in [CachePhase::CacheKey] phase (before any cache lookup is performed).
461    /// Use it in any other phase will lead to panic.
462    pub fn bypass(&mut self) {
463        match self.phase {
464            CachePhase::CacheKey => {
465                // before cache lookup / found / miss
466                self.phase = CachePhase::Bypass;
467                self.inner_enabled_mut()
468                    .traces
469                    .cache_span
470                    .set_tag(|| trace::Tag::new("bypassed", true));
471            }
472            _ => panic!("wrong phase to bypass HttpCache {:?}", self.phase),
473        }
474    }
475
476    /// Enable the cache
477    ///
478    /// - `storage`: the cache storage backend that implements [storage::Storage]
479    /// - `eviction`: optionally the eviction manager, without it, nothing will be evicted from the storage
480    /// - `predictor`: optionally a cache predictor. The cache predictor predicts whether something is likely
481    ///   to be cacheable or not. This is useful because the proxy can apply different types of optimization to
482    ///   cacheable and uncacheable requests.
483    /// - `cache_lock`: optionally a cache lock which handles concurrent lookups to the same asset. Without it
484    ///   such lookups will all be allowed to fetch the asset independently.
485    pub fn enable(
486        &mut self,
487        storage: &'static (dyn storage::Storage + Sync),
488        eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
489        predictor: Option<&'static (dyn predictor::CacheablePredictor + Sync)>,
490        cache_lock: Option<&'static CacheKeyLockImpl>,
491        option_overrides: Option<CacheOptionOverrides>,
492    ) {
493        match self.phase {
494            CachePhase::Disabled(_) => {
495                self.phase = CachePhase::Uninit;
496
497                let lock_ctx = cache_lock.map(|cache_lock| LockCtx {
498                    cache_lock,
499                    lock: None,
500                    wait_timeout: option_overrides
501                        .as_ref()
502                        .and_then(|overrides| overrides.wait_timeout),
503                });
504
505                self.inner = Some(Box::new(HttpCacheInner {
506                    enabled_ctx: Some(Box::new(HttpCacheInnerEnabled {
507                        meta: None,
508                        valid_after: None,
509                        miss_handler: None,
510                        body_reader: None,
511                        storage,
512                        eviction,
513                        lock_ctx,
514                        traces: CacheTraceCTX::new(),
515                    })),
516                    key: None,
517                    max_file_size_tracker: None,
518                    predictor,
519                }));
520            }
521            _ => panic!("Cannot enable already enabled HttpCache {:?}", self.phase),
522        }
523    }
524
525    /// Set the cache lock implementation.
526    /// # Panic
527    /// Must be called before a cache lock is attempted to be acquired,
528    /// i.e. in the `cache_key_callback` or `cache_hit_filter` phases.
529    pub fn set_cache_lock(
530        &mut self,
531        cache_lock: Option<&'static CacheKeyLockImpl>,
532        option_overrides: Option<CacheOptionOverrides>,
533    ) {
534        match self.phase {
535            CachePhase::Disabled(_)
536            | CachePhase::CacheKey
537            | CachePhase::Stale
538            | CachePhase::Hit => {
539                let inner_enabled = self.inner_enabled_mut();
540                if inner_enabled
541                    .lock_ctx
542                    .as_ref()
543                    .is_some_and(|ctx| ctx.lock.is_some())
544                {
545                    panic!("lock already set when resetting cache lock")
546                } else {
547                    let lock_ctx = cache_lock.map(|cache_lock| LockCtx {
548                        cache_lock,
549                        lock: None,
550                        wait_timeout: option_overrides.and_then(|overrides| overrides.wait_timeout),
551                    });
552                    inner_enabled.lock_ctx = lock_ctx;
553                }
554            }
555            _ => panic!("wrong phase: {:?}", self.phase),
556        }
557    }
558
559    // Enable distributed tracing
560    pub fn enable_tracing(&mut self, parent_span: trace::Span) {
561        if let Some(inner_enabled) = self.inner.as_mut().and_then(|i| i.enabled_ctx.as_mut()) {
562            inner_enabled.traces.enable(parent_span);
563        }
564    }
565
566    // Get the cache parent tracing span
567    pub fn get_cache_span(&self) -> Option<trace::SpanHandle> {
568        self.inner
569            .as_ref()
570            .and_then(|i| i.enabled_ctx.as_ref().map(|ie| ie.traces.get_cache_span()))
571    }
572
573    // Get the cache `miss` tracing span
574    pub fn get_miss_span(&self) -> Option<trace::SpanHandle> {
575        self.inner
576            .as_ref()
577            .and_then(|i| i.enabled_ctx.as_ref().map(|ie| ie.traces.get_miss_span()))
578    }
579
580    // Get the cache `hit` tracing span
581    pub fn get_hit_span(&self) -> Option<trace::SpanHandle> {
582        self.inner
583            .as_ref()
584            .and_then(|i| i.enabled_ctx.as_ref().map(|ie| ie.traces.get_hit_span()))
585    }
586
587    // shortcut to access inner fields, panic if phase is disabled
588    #[inline]
589    fn inner_enabled_mut(&mut self) -> &mut HttpCacheInnerEnabled {
590        self.inner.as_mut().unwrap().enabled_ctx.as_mut().unwrap()
591    }
592
593    #[inline]
594    fn inner_enabled(&self) -> &HttpCacheInnerEnabled {
595        self.inner.as_ref().unwrap().enabled_ctx.as_ref().unwrap()
596    }
597
598    // shortcut to access inner fields, panic if cache was never enabled
599    #[inline]
600    fn inner_mut(&mut self) -> &mut HttpCacheInner {
601        self.inner.as_mut().unwrap()
602    }
603
604    #[inline]
605    fn inner(&self) -> &HttpCacheInner {
606        self.inner.as_ref().unwrap()
607    }
608
609    /// Set the cache key
610    /// # Panic
611    /// Cache key is only allowed to be set in its own phase. Set it in other phases will cause panic.
612    pub fn set_cache_key(&mut self, key: CacheKey) {
613        match self.phase {
614            CachePhase::Uninit | CachePhase::CacheKey => {
615                self.phase = CachePhase::CacheKey;
616                self.inner_mut().key = Some(key);
617            }
618            _ => panic!("wrong phase {:?}", self.phase),
619        }
620    }
621
622    /// Return the cache key used for asset lookup
623    /// # Panic
624    /// Can only be called after the cache key is set and the cache is not disabled. Panic otherwise.
625    pub fn cache_key(&self) -> &CacheKey {
626        match self.phase {
627            CachePhase::Disabled(NoCacheReason::NeverEnabled) | CachePhase::Uninit => {
628                panic!("wrong phase {:?}", self.phase)
629            }
630            _ => self
631                .inner()
632                .key
633                .as_ref()
634                .expect("cache key should be set (set_cache_key not called?)"),
635        }
636    }
637
638    /// Return the max size allowed to be cached.
639    pub fn max_file_size_bytes(&self) -> Option<usize> {
640        assert!(
641            !matches!(
642                self.phase,
643                CachePhase::Disabled(NoCacheReason::NeverEnabled)
644            ),
645            "tried to access max file size bytes when cache never enabled"
646        );
647        self.inner()
648            .max_file_size_tracker
649            .as_ref()
650            .map(|t| t.max_file_size_bytes())
651    }
652
653    /// Set the maximum response _body_ size in bytes that will be admitted to the cache.
654    ///
655    /// Response header size should not contribute to the max file size.
656    ///
657    /// To track body bytes, call `track_bytes_for_max_file_size`.
658    pub fn set_max_file_size_bytes(&mut self, max_file_size_bytes: usize) {
659        match self.phase {
660            CachePhase::Disabled(_) => panic!("wrong phase {:?}", self.phase),
661            _ => {
662                self.inner_mut().max_file_size_tracker =
663                    Some(MaxFileSizeTracker::new(max_file_size_bytes));
664            }
665        }
666    }
667
668    /// Record body bytes for the max file size tracker.
669    ///
670    /// The `bytes_len` input contributes to a cumulative body byte tracker.
671    ///
672    /// Once the cumulative body bytes exceeds the maximum allowable cache file size (as configured
673    /// by `set_max_file_size_bytes`), then the return value will be false.
674    ///
675    /// Else the return value is true as long as the max file size is not exceeded.
676    /// If max file size was not configured, the return value is always true.
677    pub fn track_body_bytes_for_max_file_size(&mut self, bytes_len: usize) -> bool {
678        // This is intended to be callable when cache has already been disabled,
679        // so that we can re-mark an asset as cacheable if the body size is under limits.
680        assert!(
681            !matches!(
682                self.phase,
683                CachePhase::Disabled(NoCacheReason::NeverEnabled)
684            ),
685            "tried to access max file size bytes when cache never enabled"
686        );
687        self.inner_mut()
688            .max_file_size_tracker
689            .as_mut()
690            .is_none_or(|t| t.add_body_bytes(bytes_len))
691    }
692
693    /// Check if the max file size has been exceeded according to max file size tracker.
694    ///
695    /// Return true if max file size was exceeded.
696    pub fn exceeded_max_file_size(&self) -> bool {
697        assert!(
698            !matches!(
699                self.phase,
700                CachePhase::Disabled(NoCacheReason::NeverEnabled)
701            ),
702            "tried to access max file size bytes when cache never enabled"
703        );
704        self.inner()
705            .max_file_size_tracker
706            .as_ref()
707            .is_some_and(|t| !t.allow_caching())
708    }
709
710    /// Set that cache is found in cache storage.
711    ///
712    /// This function is called after [Self::cache_lookup()] which returns the [CacheMeta] and
713    /// [HitHandler].
714    ///
715    /// The `hit_status` enum allows the caller to force expire assets.
716    pub fn cache_found(&mut self, meta: CacheMeta, hit_handler: HitHandler, hit_status: HitStatus) {
717        // Stale allowed because of cache lock and then retry
718        if !matches!(self.phase, CachePhase::CacheKey | CachePhase::Stale) {
719            panic!("wrong phase {:?}", self.phase)
720        }
721
722        self.phase = match hit_status {
723            HitStatus::Fresh | HitStatus::ForceFresh => CachePhase::Hit,
724            HitStatus::Expired | HitStatus::ForceExpired => CachePhase::Stale,
725            HitStatus::FailedHitFilter | HitStatus::ForceMiss => self.phase,
726        };
727
728        let phase = self.phase;
729        let inner = self.inner_mut();
730
731        let key = inner.key.as_ref().expect("key must be set on hit");
732        let inner_enabled = inner
733            .enabled_ctx
734            .as_mut()
735            .expect("cache_found must be called while cache enabled");
736
737        // The cache lock might not be set for stale hit or hits treated as
738        // misses, so we need to initialize it here
739        let stale = phase == CachePhase::Stale;
740        if stale || hit_status.is_treated_as_miss() {
741            if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
742                lock_ctx.lock = Some(lock_ctx.cache_lock.lock(key, stale));
743            }
744        }
745
746        if hit_status.is_treated_as_miss() {
747            // Clear the body and meta for hits that are treated as misses
748            inner_enabled.body_reader = None;
749            inner_enabled.meta = None;
750        } else {
751            // Set the metadata appropriately for legit hits
752            inner_enabled.traces.start_hit_span(phase, hit_status);
753            inner_enabled.traces.log_meta_in_hit_span(&meta);
754            if let Some(eviction) = inner_enabled.eviction {
755                // TODO: make access() accept CacheKey
756                let cache_key = key.to_compact();
757                if hit_handler.should_count_access() {
758                    let size = hit_handler.get_eviction_weight();
759                    eviction.access(&cache_key, size, meta.0.internal.fresh_until);
760                }
761            }
762            inner_enabled.meta = Some(meta);
763            inner_enabled.body_reader = Some(hit_handler);
764        }
765    }
766
767    /// Mark `self` to be cache miss.
768    ///
769    /// This function is called after [Self::cache_lookup()] finds nothing or the caller decides
770    /// not to use the assets found.
771    /// # Panic
772    /// Panic in other phases.
773    pub fn cache_miss(&mut self) {
774        match self.phase {
775            // from CacheKey: set state to miss during cache lookup
776            // from Bypass: response became cacheable, set state to miss to cache
777            // from Stale: waited for cache lock, then retried and found asset was gone
778            CachePhase::CacheKey | CachePhase::Bypass | CachePhase::Stale => {
779                self.phase = CachePhase::Miss;
780                // It's possible that we've set the meta on lookup and have come back around
781                // here after not being able to acquire the cache lock, and our item has since
782                // purged or expired. We should be sure that the meta is not set in this case
783                // as there shouldn't be a meta set for cache misses.
784                self.inner_enabled_mut().meta = None;
785                self.inner_enabled_mut().traces.start_miss_span();
786            }
787            _ => panic!("wrong phase {:?}", self.phase),
788        }
789    }
790
791    /// Return the [HitHandler]
792    /// # Panic
793    /// Call this after [Self::cache_found()], panic in other phases.
794    pub fn hit_handler(&mut self) -> &mut HitHandler {
795        match self.phase {
796            CachePhase::Hit
797            | CachePhase::Stale
798            | CachePhase::StaleUpdating
799            | CachePhase::Revalidated
800            | CachePhase::RevalidatedNoCache(_) => {
801                self.inner_enabled_mut().body_reader.as_mut().unwrap()
802            }
803            _ => panic!("wrong phase {:?}", self.phase),
804        }
805    }
806
807    /// Return the body reader during a cache admission (miss/expired) which decouples the downstream
808    /// read and upstream cache write
809    pub fn miss_body_reader(&mut self) -> Option<&mut HitHandler> {
810        match self.phase {
811            CachePhase::Miss | CachePhase::Expired => {
812                let inner_enabled = self.inner_enabled_mut();
813                if inner_enabled.storage.support_streaming_partial_write() {
814                    inner_enabled.body_reader.as_mut()
815                } else {
816                    // body_reader could be set even when the storage doesn't support streaming
817                    // Expired cache would have the reader set.
818                    None
819                }
820            }
821            _ => None,
822        }
823    }
824
825    /// Return whether the underlying storage backend supports streaming partial write.
826    ///
827    /// Returns None if cache is not enabled.
828    pub fn support_streaming_partial_write(&self) -> Option<bool> {
829        self.inner.as_ref().and_then(|inner| {
830            inner
831                .enabled_ctx
832                .as_ref()
833                .map(|c| c.storage.support_streaming_partial_write())
834        })
835    }
836
837    /// Call this when cache hit is fully read.
838    ///
839    /// This call will release resource if any and log the timing in tracing if set.
840    /// # Panic
841    /// Panic in phases where there is no cache hit.
842    pub async fn finish_hit_handler(&mut self) -> Result<()> {
843        match self.phase {
844            CachePhase::Hit
845            | CachePhase::Miss
846            | CachePhase::Expired
847            | CachePhase::Stale
848            | CachePhase::StaleUpdating
849            | CachePhase::Revalidated
850            | CachePhase::RevalidatedNoCache(_) => {
851                let inner = self.inner_mut();
852                let inner_enabled = inner.enabled_ctx.as_mut().expect("cache enabled");
853                if inner_enabled.body_reader.is_none() {
854                    // already finished, we allow calling this function more than once
855                    return Ok(());
856                }
857                let body_reader = inner_enabled.body_reader.take().unwrap();
858                let key = inner.key.as_ref().unwrap();
859                let result = body_reader
860                    .finish(
861                        inner_enabled.storage,
862                        key,
863                        &inner_enabled.traces.hit_span.handle(),
864                    )
865                    .await;
866                inner_enabled.traces.finish_hit_span();
867                result
868            }
869            _ => panic!("wrong phase {:?}", self.phase),
870        }
871    }
872
873    /// Set the [MissHandler] according to cache_key and meta, can only call once
874    pub async fn set_miss_handler(&mut self) -> Result<()> {
875        match self.phase {
876            // set_miss_handler() needs to be called after set_cache_meta() (which change Stale to Expire).
877            // This is an artificial rule to enforce the state transitions
878            CachePhase::Miss | CachePhase::Expired => {
879                let inner = self.inner_mut();
880                let inner_enabled = inner
881                    .enabled_ctx
882                    .as_mut()
883                    .expect("cache enabled on miss and expired");
884                if inner_enabled.miss_handler.is_some() {
885                    panic!("write handler is already set")
886                }
887                let meta = inner_enabled.meta.as_ref().unwrap();
888                let key = inner.key.as_ref().unwrap();
889                let miss_handler = inner_enabled
890                    .storage
891                    .get_miss_handler(key, meta, &inner_enabled.traces.get_miss_span())
892                    .await?;
893
894                inner_enabled.miss_handler = Some(miss_handler);
895
896                if inner_enabled.storage.support_streaming_partial_write() {
897                    // If a reader can access partial write, the cache lock can be released here
898                    // to let readers start reading the body.
899                    if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
900                        let lock = lock_ctx.lock.take();
901                        if let Some(Locked::Write(permit)) = lock {
902                            lock_ctx.cache_lock.release(key, permit, LockStatus::Done);
903                        }
904                    }
905                    // Downstream read and upstream write can be decoupled
906                    let body_reader = inner_enabled
907                        .storage
908                        .lookup_streaming_write(
909                            key,
910                            inner_enabled
911                                .miss_handler
912                                .as_ref()
913                                .expect("miss handler already set")
914                                .streaming_write_tag(),
915                            &inner_enabled.traces.get_miss_span(),
916                        )
917                        .await?;
918
919                    if let Some((_meta, body_reader)) = body_reader {
920                        inner_enabled.body_reader = Some(body_reader);
921                    } else {
922                        // body_reader should exist now because streaming_partial_write is to support it
923                        panic!("unable to get body_reader for {:?}", meta);
924                    }
925                }
926                Ok(())
927            }
928            _ => panic!("wrong phase {:?}", self.phase),
929        }
930    }
931
932    /// Return the [MissHandler] to write the response body to cache.
933    ///
934    /// `None`: the handler has not been set or already finished
935    pub fn miss_handler(&mut self) -> Option<&mut MissHandler> {
936        match self.phase {
937            CachePhase::Miss | CachePhase::Expired => {
938                self.inner_enabled_mut().miss_handler.as_mut()
939            }
940            _ => panic!("wrong phase {:?}", self.phase),
941        }
942    }
943
944    /// Finish cache admission
945    ///
946    /// If [self] is dropped without calling this, the cache admission is considered incomplete and
947    /// should be cleaned up.
948    ///
949    /// This call will also trigger eviction if set.
950    pub async fn finish_miss_handler(&mut self) -> Result<()> {
951        match self.phase {
952            CachePhase::Miss | CachePhase::Expired => {
953                let inner = self.inner_mut();
954                let inner_enabled = inner
955                    .enabled_ctx
956                    .as_mut()
957                    .expect("cache enabled on miss and expired");
958                if inner_enabled.miss_handler.is_none() {
959                    // already finished, we allow calling this function more than once
960                    return Ok(());
961                }
962                let miss_handler = inner_enabled.miss_handler.take().unwrap();
963                let size = miss_handler.finish().await?;
964                let key = inner
965                    .key
966                    .as_ref()
967                    .expect("key set by miss or expired phase");
968                if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
969                    let lock = lock_ctx.lock.take();
970                    if let Some(Locked::Write(permit)) = lock {
971                        // no need to call r.unlock() because release() will call it
972                        // r is a guard to make sure the lock is unlocked when this request is dropped
973                        lock_ctx.cache_lock.release(key, permit, LockStatus::Done);
974                    }
975                }
976                if let Some(eviction) = inner_enabled.eviction {
977                    let cache_key = key.to_compact();
978                    let meta = inner_enabled.meta.as_ref().unwrap();
979                    let evicted = match size {
980                        MissFinishType::Created(size) => {
981                            eviction.admit(cache_key, size, meta.0.internal.fresh_until)
982                        }
983                        MissFinishType::Appended(size, max_size) => {
984                            eviction.increment_weight(&cache_key, size, max_size)
985                        }
986                    };
987                    // actual eviction can be done async
988                    let span = inner_enabled.traces.child("eviction");
989                    let handle = span.handle();
990                    let storage = inner_enabled.storage;
991                    tokio::task::spawn(async move {
992                        for item in evicted {
993                            if let Err(e) = storage.purge(&item, PurgeType::Eviction, &handle).await
994                            {
995                                warn!("Failed to purge {item} during eviction for finish miss handler: {e}");
996                            }
997                        }
998                    });
999                }
1000                inner_enabled.traces.finish_miss_span();
1001                Ok(())
1002            }
1003            _ => panic!("wrong phase {:?}", self.phase),
1004        }
1005    }
1006
1007    /// Set the [CacheMeta] of the cache
1008    pub fn set_cache_meta(&mut self, meta: CacheMeta) {
1009        match self.phase {
1010            // TODO: store the staled meta somewhere else for future use?
1011            CachePhase::Stale | CachePhase::Miss => {
1012                let inner_enabled = self.inner_enabled_mut();
1013                // TODO: have a separate expired span?
1014                inner_enabled.traces.log_meta_in_miss_span(&meta);
1015                inner_enabled.meta = Some(meta);
1016            }
1017            _ => panic!("wrong phase {:?}", self.phase),
1018        }
1019        if self.phase == CachePhase::Stale {
1020            self.phase = CachePhase::Expired;
1021        }
1022    }
1023
1024    /// Set the [CacheMeta] of the cache after revalidation.
1025    ///
1026    /// Certain info such as the original cache admission time will be preserved. Others will
1027    /// be replaced by the input `meta`.
1028    pub async fn revalidate_cache_meta(&mut self, mut meta: CacheMeta) -> Result<bool> {
1029        let result = match self.phase {
1030            CachePhase::Stale => {
1031                let inner = self.inner_mut();
1032                let inner_enabled = inner
1033                    .enabled_ctx
1034                    .as_mut()
1035                    .expect("stale phase has cache enabled");
1036                // TODO: we should keep old meta in place, just use new one to update it
1037                // that requires cacheable_filter to take a mut header and just return InternalMeta
1038
1039                // update new meta with old meta's created time
1040                let old_meta = inner_enabled.meta.take().unwrap();
1041                let created = old_meta.0.internal.created;
1042                meta.0.internal.created = created;
1043                // meta.internal.updated was already set to new meta's `created`,
1044                // no need to set `updated` here
1045                // Merge old extensions with new ones. New exts take precedence if they conflict.
1046                let mut extensions = old_meta.0.extensions;
1047                extensions.extend(meta.0.extensions);
1048                meta.0.extensions = extensions;
1049
1050                inner_enabled.meta.replace(meta);
1051
1052                let mut span = inner_enabled.traces.child("update_meta");
1053                let result = inner_enabled
1054                    .storage
1055                    .update_meta(
1056                        inner.key.as_ref().unwrap(),
1057                        inner_enabled.meta.as_ref().unwrap(),
1058                        &span.handle(),
1059                    )
1060                    .await;
1061                span.set_tag(|| trace::Tag::new("updated", result.is_ok()));
1062
1063                // regardless of result, release the cache lock
1064                if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
1065                    let lock = lock_ctx.lock.take();
1066                    if let Some(Locked::Write(permit)) = lock {
1067                        lock_ctx.cache_lock.release(
1068                            inner.key.as_ref().expect("key set by stale phase"),
1069                            permit,
1070                            LockStatus::Done,
1071                        );
1072                    }
1073                }
1074
1075                result
1076            }
1077            _ => panic!("wrong phase {:?}", self.phase),
1078        };
1079        self.phase = CachePhase::Revalidated;
1080        result
1081    }
1082
1083    /// After a successful revalidation, update certain headers for the cached asset
1084    /// such as `Etag` with the fresh response header `resp`.
1085    pub fn revalidate_merge_header(&mut self, resp: &RespHeader) -> ResponseHeader {
1086        match self.phase {
1087            CachePhase::Stale => {
1088                /*
1089                 * https://datatracker.ietf.org/doc/html/rfc9110#section-15.4.5
1090                 * 304 response MUST generate ... would have been sent in a 200 ...
1091                 * - Content-Location, Date, ETag, and Vary
1092                 * - Cache-Control and Expires...
1093                 */
1094                let mut old_header = self.inner_enabled().meta.as_ref().unwrap().0.header.clone();
1095                let mut clone_header = |header_name: &'static str| {
1096                    for (i, value) in resp.headers.get_all(header_name).iter().enumerate() {
1097                        if i == 0 {
1098                            old_header
1099                                .insert_header(header_name, value)
1100                                .expect("can add valid header");
1101                        } else {
1102                            old_header
1103                                .append_header(header_name, value)
1104                                .expect("can add valid header");
1105                        }
1106                    }
1107                };
1108                clone_header("cache-control");
1109                clone_header("expires");
1110                clone_header("cache-tag");
1111                clone_header("cdn-cache-control");
1112                clone_header("etag");
1113                // https://datatracker.ietf.org/doc/html/rfc9111#section-4.3.4
1114                // "...cache MUST update its header fields with the header fields provided in the 304..."
1115                // But if the Vary header changes, the cached response may no longer match the
1116                // incoming request.
1117                //
1118                // For simplicity, ignore changing Vary in revalidation for now.
1119                // TODO: if we support vary during revalidation, there are a few edge cases to
1120                // consider (what if Vary header appears/disappears/changes)?
1121                //
1122                // clone_header("vary");
1123                old_header
1124            }
1125            _ => panic!("wrong phase {:?}", self.phase),
1126        }
1127    }
1128
1129    /// Mark this asset uncacheable after revalidation
1130    pub fn revalidate_uncacheable(&mut self, header: ResponseHeader, reason: NoCacheReason) {
1131        match self.phase {
1132            CachePhase::Stale => {
1133                // replace cache meta header
1134                self.inner_enabled_mut().meta.as_mut().unwrap().0.header = header;
1135                // upstream request done, release write lock
1136                self.release_write_lock(reason);
1137            }
1138            _ => panic!("wrong phase {:?}", self.phase),
1139        }
1140        self.phase = CachePhase::RevalidatedNoCache(reason);
1141        // TODO: remove this asset from cache once finished?
1142    }
1143
1144    /// Mark this asset as stale, but being updated separately from this request.
1145    pub fn set_stale_updating(&mut self) {
1146        match self.phase {
1147            CachePhase::Stale => self.phase = CachePhase::StaleUpdating,
1148            _ => panic!("wrong phase {:?}", self.phase),
1149        }
1150    }
1151
1152    /// Update the variance of the [CacheMeta].
1153    ///
1154    /// Note that this process may change the lookup `key`, and eventually (when the asset is
1155    /// written to storage) invalidate other cached variants under the same primary key as the
1156    /// current asset.
1157    pub fn update_variance(&mut self, variance: Option<HashBinary>) {
1158        // If this is a cache miss, we will simply update the variance in the meta.
1159        //
1160        // If this is an expired response, we will have to consider a few cases:
1161        //
1162        // **Case 1**: Variance was absent, but caller sets it now.
1163        // We will just insert it into the meta. The current asset becomes the primary variant.
1164        // Because the current location of the asset is already the primary variant, nothing else
1165        // needs to be done.
1166        //
1167        // **Case 2**: Variance was present, but it changed or was removed.
1168        // We want the current asset to take over the primary slot, in order to invalidate all
1169        // other variants derived under the old Vary.
1170        //
1171        // **Case 3**: Variance did not change.
1172        // Nothing needs to happen.
1173        let inner = match self.phase {
1174            CachePhase::Miss | CachePhase::Expired => self.inner_mut(),
1175            _ => panic!("wrong phase {:?}", self.phase),
1176        };
1177        let inner_enabled = inner
1178            .enabled_ctx
1179            .as_mut()
1180            .expect("cache enabled on miss and expired");
1181
1182        // Update the variance in the meta
1183        if let Some(variance_hash) = variance.as_ref() {
1184            inner_enabled
1185                .meta
1186                .as_mut()
1187                .unwrap()
1188                .set_variance_key(*variance_hash);
1189        } else {
1190            inner_enabled.meta.as_mut().unwrap().remove_variance();
1191        }
1192
1193        // Change the lookup `key` if necessary, in order to admit asset into the primary slot
1194        // instead of the secondary slot.
1195        let key = inner.key.as_ref().unwrap();
1196        if let Some(old_variance) = key.get_variance_key().as_ref() {
1197            // This is a secondary variant slot.
1198            if Some(*old_variance) != variance.as_ref() {
1199                // This new variance does not match the variance in the cache key we used to look
1200                // up this asset.
1201                // Drop the cache lock to avoid leaving a dangling lock
1202                // (because we locked with the old cache key for the secondary slot)
1203                // TODO: maybe we should try to signal waiting readers to compete for the primary key
1204                // lock instead? we will not be modifying this secondary slot so it's not actually
1205                // ready for readers
1206                if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
1207                    if let Some(Locked::Write(permit)) = lock_ctx.lock.take() {
1208                        lock_ctx.cache_lock.release(key, permit, LockStatus::Done);
1209                    }
1210                }
1211                // Remove the `variance` from the `key`, so that we admit this asset into the
1212                // primary slot. (`key` is used to tell storage where to write the data.)
1213                inner.key.as_mut().unwrap().remove_variance_key();
1214            }
1215        }
1216    }
1217
1218    /// Return the [CacheMeta] of this asset
1219    ///
1220    /// # Panic
1221    /// Panic in phases which has no cache meta.
1222    pub fn cache_meta(&self) -> &CacheMeta {
1223        match self.phase {
1224            // TODO: allow in Bypass phase?
1225            CachePhase::Stale
1226            | CachePhase::StaleUpdating
1227            | CachePhase::Expired
1228            | CachePhase::Hit
1229            | CachePhase::Revalidated
1230            | CachePhase::RevalidatedNoCache(_) => self.inner_enabled().meta.as_ref().unwrap(),
1231            CachePhase::Miss => {
1232                // this is the async body read case, safe because body_reader is only set
1233                // after meta is retrieved
1234                if self.inner_enabled().body_reader.is_some() {
1235                    self.inner_enabled().meta.as_ref().unwrap()
1236                } else {
1237                    panic!("wrong phase {:?}", self.phase);
1238                }
1239            }
1240
1241            _ => panic!("wrong phase {:?}", self.phase),
1242        }
1243    }
1244
1245    /// Return the [CacheMeta] of this asset if any
1246    ///
1247    /// Different from [Self::cache_meta()], this function is allowed to be called in
1248    /// [CachePhase::Miss] phase where the cache meta maybe set.
1249    /// # Panic
1250    /// Panic in phases that shouldn't have cache meta.
1251    pub fn maybe_cache_meta(&self) -> Option<&CacheMeta> {
1252        match self.phase {
1253            CachePhase::Miss
1254            | CachePhase::Stale
1255            | CachePhase::StaleUpdating
1256            | CachePhase::Expired
1257            | CachePhase::Hit
1258            | CachePhase::Revalidated
1259            | CachePhase::RevalidatedNoCache(_) => self.inner_enabled().meta.as_ref(),
1260            _ => panic!("wrong phase {:?}", self.phase),
1261        }
1262    }
1263
1264    /// Return the [`CacheKey`] of this asset if any.
1265    ///
1266    /// This is allowed to be called in any phase. If the cache key callback was not called,
1267    /// this will return None.
1268    pub fn maybe_cache_key(&self) -> Option<&CacheKey> {
1269        (!matches!(
1270            self.phase(),
1271            CachePhase::Disabled(NoCacheReason::NeverEnabled) | CachePhase::Uninit
1272        ))
1273        .then(|| self.cache_key())
1274    }
1275
1276    /// Perform the cache lookup from the given cache storage with the given cache key
1277    ///
1278    /// A cache hit will return [CacheMeta] which contains the header and meta info about
1279    /// the cache as well as a [HitHandler] to read the cache hit body.
1280    /// # Panic
1281    /// Panic in other phases.
1282    pub async fn cache_lookup(&mut self) -> Result<Option<(CacheMeta, HitHandler)>> {
1283        match self.phase {
1284            // Stale is allowed here because stale-> cache_lock -> lookup again
1285            CachePhase::CacheKey | CachePhase::Stale => {
1286                let inner = self
1287                    .inner
1288                    .as_mut()
1289                    .expect("Cache phase is checked and should have inner");
1290                let inner_enabled = inner
1291                    .enabled_ctx
1292                    .as_mut()
1293                    .expect("Cache enabled on cache_lookup");
1294                let mut span = inner_enabled.traces.child("lookup");
1295                let key = inner.key.as_ref().unwrap(); // safe, this phase should have cache key
1296                let now = Instant::now();
1297                let result = inner_enabled.storage.lookup(key, &span.handle()).await?;
1298                // one request may have multiple lookups
1299                self.digest.add_lookup_duration(now.elapsed());
1300                let result = result.and_then(|(meta, header)| {
1301                    if let Some(ts) = inner_enabled.valid_after {
1302                        if meta.created() < ts {
1303                            span.set_tag(|| trace::Tag::new("not valid", true));
1304                            return None;
1305                        }
1306                    }
1307                    Some((meta, header))
1308                });
1309                if result.is_none() {
1310                    if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
1311                        lock_ctx.lock = Some(lock_ctx.cache_lock.lock(key, false));
1312                    }
1313                }
1314                span.set_tag(|| trace::Tag::new("found", result.is_some()));
1315                Ok(result)
1316            }
1317            _ => panic!("wrong phase {:?}", self.phase),
1318        }
1319    }
1320
1321    /// Update variance and see if the meta matches the current variance
1322    ///
1323    /// `cache_lookup() -> compute vary hash -> cache_vary_lookup()`
1324    /// This function allows callers to compute vary based on the initial cache hit.
1325    /// `meta` should be the ones returned from the initial cache_lookup()
1326    /// - return true if the meta is the variance.
1327    /// - return false if the current meta doesn't match the variance, need to cache_lookup() again
1328    pub fn cache_vary_lookup(&mut self, variance: HashBinary, meta: &CacheMeta) -> bool {
1329        match self.phase {
1330            // Stale is allowed here because stale-> cache_lock -> lookup again
1331            CachePhase::CacheKey | CachePhase::Stale => {
1332                let inner = self.inner_mut();
1333                // make sure that all variances found are fresher than this asset
1334                // this is because when purging all the variance, only the primary slot is deleted
1335                // the created TS of the primary is the tombstone of all the variances
1336                inner
1337                    .enabled_ctx
1338                    .as_mut()
1339                    .expect("cache enabled")
1340                    .valid_after = Some(meta.created());
1341
1342                // update vary
1343                let key = inner.key.as_mut().unwrap();
1344                // if no variance was previously set, then this is the first cache hit
1345                let is_initial_cache_hit = key.get_variance_key().is_none();
1346                key.set_variance_key(variance);
1347                let variance_binary = key.variance_bin();
1348                let matches_variance = meta.variance() == variance_binary;
1349
1350                // We should remove the variance in the lookup `key` if this is the primary variant
1351                // slot. We know this is the primary variant slot if this is the initial cache hit,
1352                // AND the variance in the `key` already matches the `meta`'s.
1353                //
1354                // For the primary variant slot, the storage backend needs to use the primary key
1355                // for both cache lookup and updating the meta. Otherwise it will look for the
1356                // asset in the wrong location during revalidation.
1357                //
1358                // We can recreate the "full" cache key by using the meta's variance, if needed.
1359                if matches_variance && is_initial_cache_hit {
1360                    inner.key.as_mut().unwrap().remove_variance_key();
1361                }
1362
1363                matches_variance
1364            }
1365            _ => panic!("wrong phase {:?}", self.phase),
1366        }
1367    }
1368
1369    /// Whether this request is behind a cache lock in order to wait for another request to read the
1370    /// asset.
1371    pub fn is_cache_locked(&self) -> bool {
1372        matches!(
1373            self.inner_enabled()
1374                .lock_ctx
1375                .as_ref()
1376                .and_then(|l| l.lock.as_ref()),
1377            Some(Locked::Read(_))
1378        )
1379    }
1380
1381    /// Whether this request is the leader request to fetch the assets for itself and other requests
1382    /// behind the cache lock.
1383    pub fn is_cache_lock_writer(&self) -> bool {
1384        matches!(
1385            self.inner_enabled()
1386                .lock_ctx
1387                .as_ref()
1388                .and_then(|l| l.lock.as_ref()),
1389            Some(Locked::Write(_))
1390        )
1391    }
1392
1393    /// Take the write lock from this request to transfer it to another one.
1394    /// # Panic
1395    ///  Call is_cache_lock_writer() to check first, will panic otherwise.
1396    pub fn take_write_lock(&mut self) -> (WritePermit, &'static CacheKeyLockImpl) {
1397        let lock_ctx = self
1398            .inner_enabled_mut()
1399            .lock_ctx
1400            .as_mut()
1401            .expect("take_write_lock() called without cache lock");
1402        let lock = lock_ctx
1403            .lock
1404            .take()
1405            .expect("take_write_lock() called without lock");
1406        match lock {
1407            Locked::Write(w) => (w, lock_ctx.cache_lock),
1408            Locked::Read(_) => panic!("take_write_lock() called on read lock"),
1409        }
1410    }
1411
1412    /// Set the write lock, which is usually transferred from [Self::take_write_lock()]
1413    ///
1414    /// # Panic
1415    /// Panics if cache lock was not originally configured for this request.
1416    // TODO: it may make sense to allow configuring the CacheKeyLock here too that the write permit
1417    // is associated with
1418    // (The WritePermit comes from the CacheKeyLock and should be used when releasing from the CacheKeyLock,
1419    // shouldn't be possible to give a WritePermit to a request using a different CacheKeyLock)
1420    pub fn set_write_lock(&mut self, write_lock: WritePermit) {
1421        if let Some(lock_ctx) = self.inner_enabled_mut().lock_ctx.as_mut() {
1422            lock_ctx.lock.replace(Locked::Write(write_lock));
1423        }
1424    }
1425
1426    /// Whether this request's cache hit is staled
1427    fn has_staled_asset(&self) -> bool {
1428        matches!(self.phase, CachePhase::Stale | CachePhase::StaleUpdating)
1429    }
1430
1431    /// Whether this asset is staled and stale if error is allowed
1432    pub fn can_serve_stale_error(&self) -> bool {
1433        self.has_staled_asset() && self.cache_meta().serve_stale_if_error(SystemTime::now())
1434    }
1435
1436    /// Whether this asset is staled and stale while revalidate is allowed.
1437    pub fn can_serve_stale_updating(&self) -> bool {
1438        self.has_staled_asset()
1439            && self
1440                .cache_meta()
1441                .serve_stale_while_revalidate(SystemTime::now())
1442    }
1443
1444    /// Wait for the cache read lock to be unlocked
1445    /// # Panic
1446    /// Check [Self::is_cache_locked()], panic if this request doesn't have a read lock.
1447    pub async fn cache_lock_wait(&mut self) -> LockStatus {
1448        let inner_enabled = self.inner_enabled_mut();
1449        let mut span = inner_enabled.traces.child("cache_lock");
1450        // should always call is_cache_locked() before this function, which should guarantee that
1451        // the inner cache has a read lock and lock ctx
1452        let (read_lock, status) = if let Some(lock_ctx) = inner_enabled.lock_ctx.as_mut() {
1453            let lock = lock_ctx.lock.take(); // remove the lock from self
1454            if let Some(Locked::Read(r)) = lock {
1455                let now = Instant::now();
1456                // it's possible for a request to be locked more than once,
1457                // so wait the remainder of our configured timeout
1458                let status = if let Some(wait_timeout) = lock_ctx.wait_timeout {
1459                    let wait_timeout =
1460                        wait_timeout.saturating_sub(self.lock_duration().unwrap_or(Duration::ZERO));
1461                    match timeout(wait_timeout, r.wait()).await {
1462                        Ok(()) => r.lock_status(),
1463                        Err(_) => LockStatus::WaitTimeout,
1464                    }
1465                } else {
1466                    r.wait().await;
1467                    r.lock_status()
1468                };
1469                self.digest.add_lock_duration(now.elapsed());
1470                (r, status)
1471            } else {
1472                panic!("cache_lock_wait on wrong type of lock")
1473            }
1474        } else {
1475            panic!("cache_lock_wait without cache lock")
1476        };
1477        if let Some(lock_ctx) = self.inner_enabled().lock_ctx.as_ref() {
1478            lock_ctx
1479                .cache_lock
1480                .trace_lock_wait(&mut span, &read_lock, status);
1481        }
1482        status
1483    }
1484
1485    /// How long did this request wait behind the read lock
1486    pub fn lock_duration(&self) -> Option<Duration> {
1487        self.digest.lock_duration
1488    }
1489
1490    /// How long did this request spent on cache lookup and reading the header
1491    pub fn lookup_duration(&self) -> Option<Duration> {
1492        self.digest.lookup_duration
1493    }
1494
1495    /// Delete the asset from the cache storage
1496    /// # Panic
1497    /// Need to be called after the cache key is set. Panic otherwise.
1498    pub async fn purge(&self) -> Result<bool> {
1499        match self.phase {
1500            CachePhase::CacheKey => {
1501                let inner = self.inner();
1502                let inner_enabled = self.inner_enabled();
1503                let span = inner_enabled.traces.child("purge");
1504                let key = inner.key.as_ref().unwrap().to_compact();
1505                Self::purge_impl(inner_enabled.storage, inner_enabled.eviction, &key, span).await
1506            }
1507            _ => panic!("wrong phase {:?}", self.phase),
1508        }
1509    }
1510
1511    /// Delete the asset from the cache storage via a spawned task.
1512    /// Returns corresponding `JoinHandle` of that task.
1513    /// # Panic
1514    /// Need to be called after the cache key is set. Panic otherwise.
1515    pub fn spawn_async_purge(
1516        &self,
1517        context: &'static str,
1518    ) -> tokio::task::JoinHandle<Result<bool>> {
1519        if matches!(self.phase, CachePhase::Disabled(_) | CachePhase::Uninit) {
1520            panic!("wrong phase {:?}", self.phase);
1521        }
1522
1523        let inner_enabled = self.inner_enabled();
1524        let span = inner_enabled.traces.child("purge");
1525        let key = self.inner().key.as_ref().unwrap().to_compact();
1526        let storage = inner_enabled.storage;
1527        let eviction = inner_enabled.eviction;
1528        tokio::task::spawn(async move {
1529            Self::purge_impl(storage, eviction, &key, span)
1530                .await
1531                .map_err(|e| {
1532                    warn!("Failed to purge {key} (context: {context}): {e}");
1533                    e
1534                })
1535        })
1536    }
1537
1538    async fn purge_impl(
1539        storage: &'static (dyn storage::Storage + Sync),
1540        eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
1541        key: &CompactCacheKey,
1542        mut span: Span,
1543    ) -> Result<bool> {
1544        let result = storage
1545            .purge(key, PurgeType::Invalidation, &span.handle())
1546            .await;
1547        let purged = matches!(result, Ok(true));
1548        // need to inform eviction manager if asset was removed
1549        if let Some(eviction) = eviction.as_ref() {
1550            if purged {
1551                eviction.remove(key);
1552            }
1553        }
1554        span.set_tag(|| trace::Tag::new("purged", purged));
1555        result
1556    }
1557
1558    /// Check the cacheable prediction
1559    ///
1560    /// Return true if the predictor is not set
1561    pub fn cacheable_prediction(&self) -> bool {
1562        if let Some(predictor) = self.inner().predictor {
1563            predictor.cacheable_prediction(self.cache_key())
1564        } else {
1565            true
1566        }
1567    }
1568
1569    /// Tell the predictor that this response, which is previously predicted to be uncacheable,
1570    /// is cacheable now.
1571    pub fn response_became_cacheable(&self) {
1572        if let Some(predictor) = self.inner().predictor {
1573            predictor.mark_cacheable(self.cache_key());
1574        }
1575    }
1576
1577    /// Tell the predictor that this response is uncacheable so that it will know next time
1578    /// this request arrives.
1579    pub fn response_became_uncacheable(&self, reason: NoCacheReason) {
1580        if let Some(predictor) = self.inner().predictor {
1581            predictor.mark_uncacheable(self.cache_key(), reason);
1582        }
1583    }
1584
1585    /// Tag all spans as being part of a subrequest.
1586    pub fn tag_as_subrequest(&mut self) {
1587        self.inner_enabled_mut()
1588            .traces
1589            .cache_span
1590            .set_tag(|| Tag::new("is_subrequest", true))
1591    }
1592}