pingora_cache/
lib.rs

1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The HTTP caching layer for proxies.
16
17#![allow(clippy::new_without_default)]
18
19use cf_rustracing::tag::Tag;
20use http::{method::Method, request::Parts as ReqHeader, response::Parts as RespHeader};
21use key::{CacheHashKey, HashBinary};
22use lock::WritePermit;
23use log::warn;
24use pingora_error::Result;
25use pingora_http::ResponseHeader;
26use std::time::{Duration, Instant, SystemTime};
27use storage::MissFinishType;
28use strum::IntoStaticStr;
29use trace::CacheTraceCTX;
30
31pub mod cache_control;
32pub mod eviction;
33pub mod filters;
34pub mod hashtable;
35pub mod key;
36pub mod lock;
37pub mod max_file_size;
38mod memory;
39pub mod meta;
40pub mod predictor;
41pub mod put;
42pub mod storage;
43pub mod trace;
44mod variance;
45
46use crate::max_file_size::MaxFileSizeMissHandler;
47pub use key::CacheKey;
48use lock::{CacheKeyLockImpl, LockStatus, Locked};
49pub use memory::MemCache;
50pub use meta::{CacheMeta, CacheMetaDefaults};
51pub use storage::{HitHandler, MissHandler, PurgeType, Storage};
52pub use variance::VarianceBuilder;
53
/// Prelude module; currently declared with an empty body.
// NOTE(review): presumably reserved for commonly used re-exports — confirm intent.
pub mod prelude {}
55
/// The state machine for HTTP caching
///
/// This object is used to handle the state and transitions for HTTP caching through the life of a
/// request.
pub struct HttpCache {
    // The current phase of the state machine
    phase: CachePhase,
    // Box the rest so that a disabled HttpCache struct is small.
    // `None` after `new()` and `disable()`, populated by `enable()`.
    inner: Option<Box<HttpCacheInner>>,
    // Timing information, stored outside of `inner` so `disable()` does not drop it
    digest: HttpCacheDigest,
}
66
/// This reflects the phase of [HttpCache] during the lifetime of a request
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CachePhase {
    /// Cache disabled, with reason (NeverEnabled if never explicitly used)
    Disabled(NoCacheReason),
    /// Cache enabled but nothing is set yet
    Uninit,
    /// Cache was enabled, the request decided not to use it
    // HttpCache.inner is kept
    Bypass,
    /// Awaiting the cache key to be generated
    CacheKey,
    /// Cache hit
    Hit,
    /// No cached asset is found
    Miss,
    /// A stale (expired) asset is found
    Stale,
    /// A stale (expired) asset was found, but another request is revalidating it
    StaleUpdating,
    /// A stale (expired) asset was found, so a fresh one was fetched
    Expired,
    /// A stale (expired) asset was found, and it was revalidated to be fresh
    Revalidated,
    /// Revalidated, but deemed uncacheable, so we do not freshen it
    RevalidatedNoCache(NoCacheReason),
}
94
95impl CachePhase {
96    /// Convert [CachePhase] as `str`, for logging and debugging.
97    pub fn as_str(&self) -> &'static str {
98        match self {
99            CachePhase::Disabled(_) => "disabled",
100            CachePhase::Uninit => "uninitialized",
101            CachePhase::Bypass => "bypass",
102            CachePhase::CacheKey => "key",
103            CachePhase::Hit => "hit",
104            CachePhase::Miss => "miss",
105            CachePhase::Stale => "stale",
106            CachePhase::StaleUpdating => "stale-updating",
107            CachePhase::Expired => "expired",
108            CachePhase::Revalidated => "revalidated",
109            CachePhase::RevalidatedNoCache(_) => "revalidated-nocache",
110        }
111    }
112}
113
/// The possible reasons for not caching
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum NoCacheReason {
    /// Caching is not enabled to begin with
    NeverEnabled,
    /// Origin directives indicated this was not cacheable
    OriginNotCache,
    /// Response size was larger than the cache's configured maximum asset size
    ResponseTooLarge,
    /// Due to internal caching storage error
    StorageError,
    /// Due to other types of internal issues
    InternalError,
    /// Will be cacheable but skip cache admission now
    ///
    /// This happens when the cache predictor predicted that this request is not cacheable, but
    /// the response turns out to be OK to cache. However, it might be too large to re-enable caching
    /// for this request
    Deferred,
    /// Due to the proxy upstream filter declining the current request from going upstream
    DeclinedToUpstream,
    /// Due to the upstream being unreachable or otherwise erroring during proxying
    UpstreamError,
    /// The writer of the cache lock sees that the request is not cacheable (could be OriginNotCache)
    CacheLockGiveUp,
    /// This request waited too long for the writer of the cache lock to finish, so this request will
    /// fetch from the origin without caching
    CacheLockTimeout,
    /// Other custom defined reasons, identified by the given static string
    Custom(&'static str),
}
145
146impl NoCacheReason {
147    /// Convert [NoCacheReason] as `str`, for logging and debugging.
148    pub fn as_str(&self) -> &'static str {
149        use NoCacheReason::*;
150        match self {
151            NeverEnabled => "NeverEnabled",
152            OriginNotCache => "OriginNotCache",
153            ResponseTooLarge => "ResponseTooLarge",
154            StorageError => "StorageError",
155            InternalError => "InternalError",
156            Deferred => "Deferred",
157            DeclinedToUpstream => "DeclinedToUpstream",
158            UpstreamError => "UpstreamError",
159            CacheLockGiveUp => "CacheLockGiveUp",
160            CacheLockTimeout => "CacheLockTimeout",
161            Custom(s) => s,
162        }
163    }
164}
165
/// Information collected about the caching operation that will not be cleared
#[derive(Debug, Default)]
pub struct HttpCacheDigest {
    // accumulated lock duration, `None` until a duration is first added
    // NOTE(review): presumably the time spent waiting on the cache lock — confirm against callers
    pub lock_duration: Option<Duration>,
    // time spent in cache lookup and reading the header, `None` until a duration is first added
    pub lookup_duration: Option<Duration>,
}
173
/// Convenience function to add a duration to an optional running total.
///
/// When `target_opt` is `None` the total becomes `to_add`; otherwise `to_add` is added to the
/// existing value. The addition saturates at [Duration::MAX] so that accumulating timing
/// statistics can never panic on overflow.
fn add_duration_to_opt(target_opt: &mut Option<Duration>, to_add: Duration) {
    let total = match *target_opt {
        Some(existing) => existing.saturating_add(to_add),
        None => to_add,
    };
    *target_opt = Some(total);
}
178
179impl HttpCacheDigest {
180    fn add_lookup_duration(&mut self, extra_lookup_duration: Duration) {
181        add_duration_to_opt(&mut self.lookup_duration, extra_lookup_duration)
182    }
183
184    fn add_lock_duration(&mut self, extra_lock_duration: Duration) {
185        add_duration_to_opt(&mut self.lock_duration, extra_lock_duration)
186    }
187}
188
/// Response cacheable decision
///
/// Either the [CacheMeta] of a cacheable response, or the [NoCacheReason] explaining why the
/// response is not cacheable.
#[derive(Debug)]
pub enum RespCacheable {
    /// The response is cacheable; carries its [CacheMeta]
    Cacheable(CacheMeta),
    /// The response is not cacheable; carries the reason
    Uncacheable(NoCacheReason),
}
197
198impl RespCacheable {
199    /// Whether it is cacheable
200    #[inline]
201    pub fn is_cacheable(&self) -> bool {
202        matches!(*self, Self::Cacheable(_))
203    }
204
205    /// Unwrap [RespCacheable] to get the [CacheMeta] stored
206    /// # Panic
207    /// Panic when this object is not cacheable. Check [Self::is_cacheable()] first.
208    pub fn unwrap_meta(self) -> CacheMeta {
209        match self {
210            Self::Cacheable(meta) => meta,
211            Self::Uncacheable(_) => panic!("expected Cacheable value"),
212        }
213    }
214}
215
/// Indicates which level of purge logic to apply to an asset, i.e. whether
/// the purged asset should be revalidated or re-retrieved altogether
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ForcedInvalidationKind {
    /// Indicates the asset should be considered stale and revalidated
    ForceExpired,

    /// Indicates the asset should be considered absent and treated like a miss
    /// instead of a hit
    ForceMiss,
}
227
/// Freshness state of cache hit asset
///
/// The `snake_case` string form of each variant is derived via `strum` and exposed through
/// `HitStatus::as_str()`.
#[derive(Debug, Copy, Clone, IntoStaticStr, PartialEq, Eq)]
#[strum(serialize_all = "snake_case")]
pub enum HitStatus {
    /// The asset's freshness directives indicate it has expired
    Expired,

    /// The asset was marked as expired, and should be treated as stale
    ForceExpired,

    /// The asset was marked as absent, and should be treated as a miss
    ForceMiss,

    /// An error occurred while processing the asset, so it should be treated as
    /// a miss
    FailedHitFilter,

    /// The asset is not expired
    Fresh,
}
250
251impl HitStatus {
252    /// For displaying cache hit status
253    pub fn as_str(&self) -> &'static str {
254        self.into()
255    }
256
257    /// Whether cached asset can be served as fresh
258    pub fn is_fresh(&self) -> bool {
259        *self == HitStatus::Fresh
260    }
261
262    /// Check whether the hit status should be treated as a miss. A forced miss
263    /// is obviously treated as a miss. A hit-filter failure is treated as a
264    /// miss because we can't use the asset as an actual hit. If we treat it as
265    /// expired, we still might not be able to use it even if revalidation
266    /// succeeds.
267    pub fn is_treated_as_miss(self) -> bool {
268        matches!(self, HitStatus::ForceMiss | HitStatus::FailedHitFilter)
269    }
270}
271
/// Per-request cache state, boxed inside [HttpCache] and only present while the cache is
/// enabled or bypassed.
struct HttpCacheInner {
    // the cache key of the request, set by `set_cache_key()`
    pub key: Option<CacheKey>,
    // meta of the asset: set on a usable hit or by `set_cache_meta()`, cleared on a miss
    pub meta: Option<CacheMeta>,
    // when set, even if an asset exists, it would only be considered valid after this timestamp
    pub valid_after: Option<SystemTime>,
    // when set, an asset will be rejected from the cache if it exceeds this size in bytes
    pub max_file_size_bytes: Option<usize>,
    // writer for the response body being admitted, set by `set_miss_handler()`
    pub miss_handler: Option<MissHandler>,
    // reader of a cached body on a hit, or of the body being written when the storage
    // supports streaming partial writes
    pub body_reader: Option<HitHandler>,
    pub storage: &'static (dyn storage::Storage + Sync), // static for now
    // optional eviction manager, notified on hits and on finished admissions
    pub eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
    // optional cacheability predictor
    pub predictor: Option<&'static (dyn predictor::CacheablePredictor + Sync)>,
    // the lock currently held for `key`, if any
    pub lock: Option<Locked>, // TODO: these 3 fields should come in 1 sub struct
    // optional cache lock implementation used to acquire and release `lock`
    pub cache_lock: Option<&'static CacheKeyLockImpl>,
    // tracing context (cache / hit / miss spans)
    pub traces: trace::CacheTraceCTX,
}
288
289impl HttpCache {
290    /// Create a new [HttpCache].
291    ///
292    /// Caching is not enabled by default.
293    pub fn new() -> Self {
294        HttpCache {
295            phase: CachePhase::Disabled(NoCacheReason::NeverEnabled),
296            inner: None,
297            digest: HttpCacheDigest::default(),
298        }
299    }
300
301    /// Whether the cache is enabled
302    pub fn enabled(&self) -> bool {
303        !matches!(self.phase, CachePhase::Disabled(_) | CachePhase::Bypass)
304    }
305
306    /// Whether the cache is being bypassed
307    pub fn bypassing(&self) -> bool {
308        matches!(self.phase, CachePhase::Bypass)
309    }
310
311    /// Return the [CachePhase]
312    pub fn phase(&self) -> CachePhase {
313        self.phase
314    }
315
316    /// Whether anything was fetched from the upstream
317    ///
318    /// This essentially checks all possible [CachePhase] who need to contact the upstream server
319    pub fn upstream_used(&self) -> bool {
320        use CachePhase::*;
321        match self.phase {
322            Disabled(_) | Bypass | Miss | Expired | Revalidated | RevalidatedNoCache(_) => true,
323            Hit | Stale | StaleUpdating => false,
324            Uninit | CacheKey => false, // invalid states for this call, treat them as false to keep it simple
325        }
326    }
327
328    /// Check whether the backend storage is the type `T`.
329    pub fn storage_type_is<T: 'static>(&self) -> bool {
330        self.inner
331            .as_ref()
332            .and_then(|inner| inner.storage.as_any().downcast_ref::<T>())
333            .is_some()
334    }
335
336    /// Release the cache lock if the current request is a cache writer.
337    ///
338    /// Generally callers should prefer using `disable` when a cache lock should be released
339    /// due to an error to clear all cache context. This function is for releasing the cache lock
340    /// while still keeping the cache around for reading, e.g. when serving stale.
341    pub fn release_write_lock(&mut self, reason: NoCacheReason) {
342        use NoCacheReason::*;
343        if let Some(inner) = self.inner.as_mut() {
344            let lock = inner.lock.take();
345            if let Some(Locked::Write(permit)) = lock {
346                let lock_status = match reason {
347                    // let the next request try to fetch it
348                    InternalError | StorageError | Deferred | UpstreamError => {
349                        LockStatus::TransientError
350                    }
351                    // depends on why the proxy upstream filter declined the request,
352                    // for now still allow next request try to acquire to avoid thundering herd
353                    DeclinedToUpstream => LockStatus::TransientError,
354                    // no need for the lock anymore
355                    OriginNotCache | ResponseTooLarge => LockStatus::GiveUp,
356                    // not sure which LockStatus make sense, we treat it as GiveUp for now
357                    Custom(_) => LockStatus::GiveUp,
358                    // should never happen, NeverEnabled shouldn't hold a lock
359                    NeverEnabled => panic!("NeverEnabled holds a write lock"),
360                    CacheLockGiveUp | CacheLockTimeout => {
361                        panic!("CacheLock* are for cache lock readers only")
362                    }
363                };
364                inner
365                    .cache_lock
366                    .unwrap()
367                    .release(inner.key.as_ref().unwrap(), permit, lock_status);
368            }
369        }
370    }
371
372    /// Disable caching
373    pub fn disable(&mut self, reason: NoCacheReason) {
374        match self.phase {
375            CachePhase::Disabled(_) => {
376                // replace reason
377                self.phase = CachePhase::Disabled(reason);
378            }
379            _ => {
380                self.phase = CachePhase::Disabled(reason);
381                self.release_write_lock(reason);
382                // log initial disable reason
383                self.inner_mut()
384                    .traces
385                    .cache_span
386                    .set_tag(|| trace::Tag::new("disable_reason", reason.as_str()));
387                self.inner = None;
388            }
389        }
390    }
391
392    /* The following methods panic when they are used in the wrong phase.
393     * This is better than returning errors as such panics are only caused by coding error, which
394     * should be fixed right away. Tokio runtime only crashes the current task instead of the whole
395     * program when these panics happen. */
396
397    /// Set the cache to bypass
398    ///
399    /// # Panic
400    /// This call is only allowed in [CachePhase::CacheKey] phase (before any cache lookup is performed).
401    /// Use it in any other phase will lead to panic.
402    pub fn bypass(&mut self) {
403        match self.phase {
404            CachePhase::CacheKey => {
405                // before cache lookup / found / miss
406                self.phase = CachePhase::Bypass;
407                self.inner_mut()
408                    .traces
409                    .cache_span
410                    .set_tag(|| trace::Tag::new("bypassed", true));
411            }
412            _ => panic!("wrong phase to bypass HttpCache {:?}", self.phase),
413        }
414    }
415
416    /// Enable the cache
417    ///
418    /// - `storage`: the cache storage backend that implements [storage::Storage]
419    /// - `eviction`: optionally the eviction manager, without it, nothing will be evicted from the storage
420    /// - `predictor`: optionally a cache predictor. The cache predictor predicts whether something is likely
421    ///   to be cacheable or not. This is useful because the proxy can apply different types of optimization to
422    ///   cacheable and uncacheable requests.
423    /// - `cache_lock`: optionally a cache lock which handles concurrent lookups to the same asset. Without it
424    ///   such lookups will all be allowed to fetch the asset independently.
425    pub fn enable(
426        &mut self,
427        storage: &'static (dyn storage::Storage + Sync),
428        eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
429        predictor: Option<&'static (dyn predictor::CacheablePredictor + Sync)>,
430        cache_lock: Option<&'static CacheKeyLockImpl>,
431    ) {
432        match self.phase {
433            CachePhase::Disabled(_) => {
434                self.phase = CachePhase::Uninit;
435                self.inner = Some(Box::new(HttpCacheInner {
436                    key: None,
437                    meta: None,
438                    valid_after: None,
439                    max_file_size_bytes: None,
440                    miss_handler: None,
441                    body_reader: None,
442                    storage,
443                    eviction,
444                    predictor,
445                    lock: None,
446                    cache_lock,
447                    traces: CacheTraceCTX::new(),
448                }));
449            }
450            _ => panic!("Cannot enable already enabled HttpCache {:?}", self.phase),
451        }
452    }
453
454    // Enable distributed tracing
455    pub fn enable_tracing(&mut self, parent_span: trace::Span) {
456        if let Some(inner) = self.inner.as_mut() {
457            inner.traces.enable(parent_span);
458        }
459    }
460
461    // Get the cache parent tracing span
462    pub fn get_cache_span(&self) -> Option<trace::SpanHandle> {
463        self.inner.as_ref().map(|i| i.traces.get_cache_span())
464    }
465
466    // Get the cache `miss` tracing span
467    pub fn get_miss_span(&self) -> Option<trace::SpanHandle> {
468        self.inner.as_ref().map(|i| i.traces.get_miss_span())
469    }
470
471    // Get the cache `hit` tracing span
472    pub fn get_hit_span(&self) -> Option<trace::SpanHandle> {
473        self.inner.as_ref().map(|i| i.traces.get_hit_span())
474    }
475
476    // shortcut to access inner, panic if phase is disabled
477    #[inline]
478    fn inner_mut(&mut self) -> &mut HttpCacheInner {
479        self.inner.as_mut().unwrap()
480    }
481
482    #[inline]
483    fn inner(&self) -> &HttpCacheInner {
484        self.inner.as_ref().unwrap()
485    }
486
487    /// Set the cache key
488    /// # Panic
489    /// Cache key is only allowed to be set in its own phase. Set it in other phases will cause panic.
490    pub fn set_cache_key(&mut self, key: CacheKey) {
491        match self.phase {
492            CachePhase::Uninit | CachePhase::CacheKey => {
493                self.phase = CachePhase::CacheKey;
494                self.inner_mut().key = Some(key);
495            }
496            _ => panic!("wrong phase {:?}", self.phase),
497        }
498    }
499
500    /// Return the cache key used for asset lookup
501    /// # Panic
502    /// Can only be called after the cache key is set and the cache is not disabled. Panic otherwise.
503    pub fn cache_key(&self) -> &CacheKey {
504        match self.phase {
505            CachePhase::Disabled(_) | CachePhase::Uninit => panic!("wrong phase {:?}", self.phase),
506            _ => self.inner().key.as_ref().unwrap(),
507        }
508    }
509
510    /// Return the max size allowed to be cached.
511    pub fn max_file_size_bytes(&self) -> Option<usize> {
512        match self.phase {
513            CachePhase::Disabled(_) | CachePhase::Uninit => panic!("wrong phase {:?}", self.phase),
514            _ => self.inner().max_file_size_bytes,
515        }
516    }
517
518    /// Set the maximum response _body_ size in bytes that will be admitted to the cache.
519    ///
520    /// Response header size does not contribute to the max file size.
521    pub fn set_max_file_size_bytes(&mut self, max_file_size_bytes: usize) {
522        match self.phase {
523            CachePhase::Disabled(_) => panic!("wrong phase {:?}", self.phase),
524            _ => {
525                self.inner_mut().max_file_size_bytes = Some(max_file_size_bytes);
526            }
527        }
528    }
529
    /// Set that cache is found in cache storage.
    ///
    /// This function is called after [Self::cache_lookup()] which returns the [CacheMeta] and
    /// [HitHandler].
    ///
    /// The `hit_status` enum allows the caller to force expire assets.
    ///
    /// Phase transitions: [HitStatus::Fresh] moves to [CachePhase::Hit]; [HitStatus::Expired] and
    /// [HitStatus::ForceExpired] move to [CachePhase::Stale]; statuses treated as a miss
    /// ([HitStatus::ForceMiss], [HitStatus::FailedHitFilter]) leave the phase unchanged and clear
    /// any stored meta and body reader instead of storing the given ones.
    /// # Panic
    /// Panic unless the current phase is [CachePhase::CacheKey] or [CachePhase::Stale].
    pub fn cache_found(&mut self, meta: CacheMeta, hit_handler: HitHandler, hit_status: HitStatus) {
        // Stale allowed because of cache lock and then retry
        if !matches!(self.phase, CachePhase::CacheKey | CachePhase::Stale) {
            panic!("wrong phase {:?}", self.phase)
        }

        self.phase = match hit_status {
            HitStatus::Fresh => CachePhase::Hit,
            HitStatus::Expired | HitStatus::ForceExpired => CachePhase::Stale,
            // treated as a miss: stay in the current phase
            HitStatus::FailedHitFilter | HitStatus::ForceMiss => self.phase,
        };

        // copy the new phase out before `self` is mutably borrowed through `inner`
        let phase = self.phase;
        let inner = self.inner_mut();

        let key = inner.key.as_ref().unwrap();

        // The cache lock might not be set for stale hit or hits treated as
        // misses, so we need to initialize it here
        if phase == CachePhase::Stale || hit_status.is_treated_as_miss() {
            if let Some(lock) = inner.cache_lock.as_ref() {
                inner.lock = Some(lock.lock(key));
            }
        }

        if hit_status.is_treated_as_miss() {
            // Clear the body and meta for hits that are treated as misses
            inner.body_reader = None;
            inner.meta = None;
        } else {
            // Set the metadata appropriately for legit hits
            inner.traces.start_hit_span(phase, hit_status);
            inner.traces.log_meta_in_hit_span(&meta);
            // report the access to the eviction manager, if the hit handler wants it counted
            if let Some(eviction) = inner.eviction {
                // TODO: make access() accept CacheKey
                let cache_key = key.to_compact();
                if hit_handler.should_count_access() {
                    let size = hit_handler.get_eviction_weight();
                    eviction.access(&cache_key, size, meta.0.internal.fresh_until);
                }
            }
            inner.meta = Some(meta);
            inner.body_reader = Some(hit_handler);
        }
    }
581
582    /// Mark `self` to be cache miss.
583    ///
584    /// This function is called after [Self::cache_lookup()] finds nothing or the caller decides
585    /// not to use the assets found.
586    /// # Panic
587    /// Panic in other phases.
588    pub fn cache_miss(&mut self) {
589        match self.phase {
590            // from CacheKey: set state to miss during cache lookup
591            // from Bypass: response became cacheable, set state to miss to cache
592            // from Stale: waited for cache lock, then retried and found asset was gone
593            CachePhase::CacheKey | CachePhase::Bypass | CachePhase::Stale => {
594                self.phase = CachePhase::Miss;
595                // It's possible that we've set the meta on lookup and have come back around
596                // here after not being able to acquire the cache lock, and our item has since
597                // purged or expired. We should be sure that the meta is not set in this case
598                // as there shouldn't be a meta set for cache misses.
599                self.inner_mut().meta = None;
600                self.inner_mut().traces.start_miss_span();
601            }
602            _ => panic!("wrong phase {:?}", self.phase),
603        }
604    }
605
606    /// Return the [HitHandler]
607    /// # Panic
608    /// Call this after [Self::cache_found()], panic in other phases.
609    pub fn hit_handler(&mut self) -> &mut HitHandler {
610        match self.phase {
611            CachePhase::Hit
612            | CachePhase::Stale
613            | CachePhase::StaleUpdating
614            | CachePhase::Revalidated
615            | CachePhase::RevalidatedNoCache(_) => self.inner_mut().body_reader.as_mut().unwrap(),
616            _ => panic!("wrong phase {:?}", self.phase),
617        }
618    }
619
620    /// Return the body reader during a cache admission (miss/expired) which decouples the downstream
621    /// read and upstream cache write
622    pub fn miss_body_reader(&mut self) -> Option<&mut HitHandler> {
623        match self.phase {
624            CachePhase::Miss | CachePhase::Expired => {
625                let inner = self.inner_mut();
626                if inner.storage.support_streaming_partial_write() {
627                    inner.body_reader.as_mut()
628                } else {
629                    // body_reader could be set even when the storage doesn't support streaming
630                    // Expired cache would have the reader set.
631                    None
632                }
633            }
634            _ => None,
635        }
636    }
637
638    /// Call this when cache hit is fully read.
639    ///
640    /// This call will release resource if any and log the timing in tracing if set.
641    /// # Panic
642    /// Panic in phases where there is no cache hit.
643    pub async fn finish_hit_handler(&mut self) -> Result<()> {
644        match self.phase {
645            CachePhase::Hit
646            | CachePhase::Miss
647            | CachePhase::Expired
648            | CachePhase::Stale
649            | CachePhase::StaleUpdating
650            | CachePhase::Revalidated
651            | CachePhase::RevalidatedNoCache(_) => {
652                let inner = self.inner_mut();
653                if inner.body_reader.is_none() {
654                    // already finished, we allow calling this function more than once
655                    return Ok(());
656                }
657                let body_reader = inner.body_reader.take().unwrap();
658                let key = inner.key.as_ref().unwrap();
659                let result = body_reader
660                    .finish(inner.storage, key, &inner.traces.hit_span.handle())
661                    .await;
662                inner.traces.finish_hit_span();
663                result
664            }
665            _ => panic!("wrong phase {:?}", self.phase),
666        }
667    }
668
    /// Set the [MissHandler] according to cache_key and meta, can only call once
    ///
    /// The handler is obtained from the storage and, when a max file size is configured, wrapped
    /// in a [MaxFileSizeMissHandler]. If the storage supports streaming partial writes, the write
    /// lock (if held) is released and a body reader for the asset being written is looked up.
    ///
    /// Errors from the storage are returned to the caller.
    /// # Panic
    /// Panic outside of [CachePhase::Miss] / [CachePhase::Expired], when the miss handler is
    /// already set, when no meta is set, or when the storage supports streaming partial writes
    /// but returns no body reader.
    pub async fn set_miss_handler(&mut self) -> Result<()> {
        match self.phase {
            // set_miss_handler() needs to be called after set_cache_meta() (which change Stale to Expire).
            // This is an artificial rule to enforce the state transitions
            CachePhase::Miss | CachePhase::Expired => {
                // read the limit first: the getter borrows `self`, which `inner` holds mutably below
                let max_file_size_bytes = self.max_file_size_bytes();

                let inner = self.inner_mut();
                if inner.miss_handler.is_some() {
                    panic!("write handler is already set")
                }
                let meta = inner.meta.as_ref().unwrap();
                let key = inner.key.as_ref().unwrap();
                let miss_handler = inner
                    .storage
                    .get_miss_handler(key, meta, &inner.traces.get_miss_span())
                    .await?;

                // wrap the handler when a max file size is configured
                inner.miss_handler = if let Some(max_size) = max_file_size_bytes {
                    Some(Box::new(MaxFileSizeMissHandler::new(
                        miss_handler,
                        max_size,
                    )))
                } else {
                    Some(miss_handler)
                };

                if inner.storage.support_streaming_partial_write() {
                    // If a reader can access partial write, the cache lock can be released here
                    // to let readers start reading the body.
                    let lock = inner.lock.take();
                    if let Some(Locked::Write(permit)) = lock {
                        inner
                            .cache_lock
                            .expect("must have cache lock to have write permit")
                            .release(key, permit, LockStatus::Done);
                    }
                    // Downstream read and upstream write can be decoupled
                    let body_reader = inner
                        .storage
                        .lookup_streaming_write(
                            key,
                            inner
                                .miss_handler
                                .as_ref()
                                .expect("miss handler already set")
                                .streaming_write_tag(),
                            &inner.traces.get_miss_span(),
                        )
                        .await?;

                    if let Some((_meta, body_reader)) = body_reader {
                        inner.body_reader = Some(body_reader);
                    } else {
                        // body_reader should exist now because streaming_partial_write is to support it
                        panic!("unable to get body_reader for {:?}", meta);
                    }
                }
                Ok(())
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }
733
734    /// Return the [MissHandler] to write the response body to cache.
735    ///
736    /// `None`: the handler has not been set or already finished
737    pub fn miss_handler(&mut self) -> Option<&mut MissHandler> {
738        match self.phase {
739            CachePhase::Miss | CachePhase::Expired => self.inner_mut().miss_handler.as_mut(),
740            _ => panic!("wrong phase {:?}", self.phase),
741        }
742    }
743
    /// Finish cache admission
    ///
    /// If [self] is dropped without calling this, the cache admission is considered incomplete and
    /// should be cleaned up.
    ///
    /// This call will also trigger eviction if set. Calling it again after the miss handler was
    /// finished (or before one was set) is a no-op. An error from finishing the miss handler is
    /// returned to the caller before the lock is released or eviction is triggered.
    /// # Panic
    /// Panic outside of [CachePhase::Miss] and [CachePhase::Expired].
    pub async fn finish_miss_handler(&mut self) -> Result<()> {
        match self.phase {
            CachePhase::Miss | CachePhase::Expired => {
                let inner = self.inner_mut();
                if inner.miss_handler.is_none() {
                    // already finished, we allow calling this function more than once
                    return Ok(());
                }
                let miss_handler = inner.miss_handler.take().unwrap();
                let finish = miss_handler.finish().await?;
                let lock = inner.lock.take();
                let key = inner.key.as_ref().unwrap();
                if let Some(Locked::Write(permit)) = lock {
                    // the miss handler finished successfully: release the write permit
                    // with `LockStatus::Done`
                    inner
                        .cache_lock
                        .unwrap()
                        .release(key, permit, LockStatus::Done);
                }
                if let Some(eviction) = inner.eviction {
                    let cache_key = key.to_compact();
                    let meta = inner.meta.as_ref().unwrap();
                    // report the written size to the eviction manager, which returns the
                    // items to purge
                    let evicted = match finish {
                        MissFinishType::Created(size) => {
                            eviction.admit(cache_key, size, meta.0.internal.fresh_until)
                        }
                        MissFinishType::Appended(size) => {
                            eviction.increment_weight(cache_key, size)
                        }
                    };
                    // actual eviction can be done async
                    let span = inner.traces.child("eviction");
                    let handle = span.handle();
                    let storage = inner.storage;
                    // detached task: purge failures are only logged
                    tokio::task::spawn(async move {
                        for item in evicted {
                            if let Err(e) = storage.purge(&item, PurgeType::Eviction, &handle).await
                            {
                                warn!("Failed to purge {item} during eviction for finish miss handler: {e}");
                            }
                        }
                    });
                }
                inner.traces.finish_miss_span();
                Ok(())
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }
800
801    /// Set the [CacheMeta] of the cache
802    pub fn set_cache_meta(&mut self, meta: CacheMeta) {
803        match self.phase {
804            // TODO: store the staled meta somewhere else for future use?
805            CachePhase::Stale | CachePhase::Miss => {
806                let inner = self.inner_mut();
807                // TODO: have a separate expired span?
808                inner.traces.log_meta_in_miss_span(&meta);
809                inner.meta = Some(meta);
810            }
811            _ => panic!("wrong phase {:?}", self.phase),
812        }
813        if self.phase == CachePhase::Stale {
814            self.phase = CachePhase::Expired;
815        }
816    }
817
818    /// Set the [CacheMeta] of the cache after revalidation.
819    ///
820    /// Certain info such as the original cache admission time will be preserved. Others will
821    /// be replaced by the input `meta`.
822    pub async fn revalidate_cache_meta(&mut self, mut meta: CacheMeta) -> Result<bool> {
823        let result = match self.phase {
824            CachePhase::Stale => {
825                let inner = self.inner_mut();
826                // TODO: we should keep old meta in place, just use new one to update it
827                // that requires cacheable_filter to take a mut header and just return InternalMeta
828
829                // update new meta with old meta's created time
830                let old_meta = inner.meta.take().unwrap();
831                let created = old_meta.0.internal.created;
832                meta.0.internal.created = created;
833                // meta.internal.updated was already set to new meta's `created`,
834                // no need to set `updated` here
835                // Merge old extensions with new ones. New exts take precedence if they conflict.
836                let mut extensions = old_meta.0.extensions;
837                extensions.extend(meta.0.extensions);
838                meta.0.extensions = extensions;
839
840                inner.meta.replace(meta);
841
842                let lock = inner.lock.take();
843                if let Some(Locked::Write(permit)) = lock {
844                    inner.cache_lock.unwrap().release(
845                        inner.key.as_ref().unwrap(),
846                        permit,
847                        LockStatus::Done,
848                    );
849                }
850
851                let mut span = inner.traces.child("update_meta");
852                // TODO: this call can be async
853                let result = inner
854                    .storage
855                    .update_meta(
856                        inner.key.as_ref().unwrap(),
857                        inner.meta.as_ref().unwrap(),
858                        &span.handle(),
859                    )
860                    .await;
861                span.set_tag(|| trace::Tag::new("updated", result.is_ok()));
862                result
863            }
864            _ => panic!("wrong phase {:?}", self.phase),
865        };
866        self.phase = CachePhase::Revalidated;
867        result
868    }
869
870    /// After a successful revalidation, update certain headers for the cached asset
871    /// such as `Etag` with the fresh response header `resp`.
872    pub fn revalidate_merge_header(&mut self, resp: &RespHeader) -> ResponseHeader {
873        match self.phase {
874            CachePhase::Stale => {
875                /*
876                 * https://datatracker.ietf.org/doc/html/rfc9110#section-15.4.5
877                 * 304 response MUST generate ... would have been sent in a 200 ...
878                 * - Content-Location, Date, ETag, and Vary
879                 * - Cache-Control and Expires...
880                 */
881                let mut old_header = self.inner().meta.as_ref().unwrap().0.header.clone();
882                let mut clone_header = |header_name: &'static str| {
883                    for (i, value) in resp.headers.get_all(header_name).iter().enumerate() {
884                        if i == 0 {
885                            old_header
886                                .insert_header(header_name, value)
887                                .expect("can add valid header");
888                        } else {
889                            old_header
890                                .append_header(header_name, value)
891                                .expect("can add valid header");
892                        }
893                    }
894                };
895                clone_header("cache-control");
896                clone_header("expires");
897                clone_header("cache-tag");
898                clone_header("cdn-cache-control");
899                clone_header("etag");
900                // https://datatracker.ietf.org/doc/html/rfc9111#section-4.3.4
901                // "...cache MUST update its header fields with the header fields provided in the 304..."
902                // But if the Vary header changes, the cached response may no longer match the
903                // incoming request.
904                //
905                // For simplicity, ignore changing Vary in revalidation for now.
906                // TODO: if we support vary during revalidation, there are a few edge cases to
907                // consider (what if Vary header appears/disappears/changes)?
908                //
909                // clone_header("vary");
910                old_header
911            }
912            _ => panic!("wrong phase {:?}", self.phase),
913        }
914    }
915
916    /// Mark this asset uncacheable after revalidation
917    pub fn revalidate_uncacheable(&mut self, header: ResponseHeader, reason: NoCacheReason) {
918        match self.phase {
919            CachePhase::Stale => {
920                // replace cache meta header
921                self.inner_mut().meta.as_mut().unwrap().0.header = header;
922                // upstream request done, release write lock
923                self.release_write_lock(reason);
924            }
925            _ => panic!("wrong phase {:?}", self.phase),
926        }
927        self.phase = CachePhase::RevalidatedNoCache(reason);
928        // TODO: remove this asset from cache once finished?
929    }
930
931    /// Mark this asset as stale, but being updated separately from this request.
932    pub fn set_stale_updating(&mut self) {
933        match self.phase {
934            CachePhase::Stale => self.phase = CachePhase::StaleUpdating,
935            _ => panic!("wrong phase {:?}", self.phase),
936        }
937    }
938
939    /// Update the variance of the [CacheMeta].
940    ///
941    /// Note that this process may change the lookup `key`, and eventually (when the asset is
942    /// written to storage) invalidate other cached variants under the same primary key as the
943    /// current asset.
944    pub fn update_variance(&mut self, variance: Option<HashBinary>) {
945        // If this is a cache miss, we will simply update the variance in the meta.
946        //
947        // If this is an expired response, we will have to consider a few cases:
948        //
949        // **Case 1**: Variance was absent, but caller sets it now.
950        // We will just insert it into the meta. The current asset becomes the primary variant.
951        // Because the current location of the asset is already the primary variant, nothing else
952        // needs to be done.
953        //
954        // **Case 2**: Variance was present, but it changed or was removed.
955        // We want the current asset to take over the primary slot, in order to invalidate all
956        // other variants derived under the old Vary.
957        //
958        // **Case 3**: Variance did not change.
959        // Nothing needs to happen.
960        let inner = match self.phase {
961            CachePhase::Miss | CachePhase::Expired => self.inner_mut(),
962            _ => panic!("wrong phase {:?}", self.phase),
963        };
964
965        // Update the variance in the meta
966        if let Some(variance_hash) = variance.as_ref() {
967            inner
968                .meta
969                .as_mut()
970                .unwrap()
971                .set_variance_key(*variance_hash);
972        } else {
973            inner.meta.as_mut().unwrap().remove_variance();
974        }
975
976        // Change the lookup `key` if necessary, in order to admit asset into the primary slot
977        // instead of the secondary slot.
978        let key = inner.key.as_ref().unwrap();
979        if let Some(old_variance) = key.get_variance_key().as_ref() {
980            // This is a secondary variant slot.
981            if Some(*old_variance) != variance.as_ref() {
982                // This new variance does not match the variance in the cache key we used to look
983                // up this asset.
984                // Drop the cache lock to avoid leaving a dangling lock
985                // (because we locked with the old cache key for the secondary slot)
986                // TODO: maybe we should try to signal waiting readers to compete for the primary key
987                // lock instead? we will not be modifying this secondary slot so it's not actually
988                // ready for readers
989                if let Some(Locked::Write(permit)) = inner.lock.take() {
990                    inner
991                        .cache_lock
992                        .unwrap()
993                        .release(key, permit, LockStatus::Done);
994                }
995                // Remove the `variance` from the `key`, so that we admit this asset into the
996                // primary slot. (`key` is used to tell storage where to write the data.)
997                inner.key.as_mut().unwrap().remove_variance_key();
998            }
999        }
1000    }
1001
1002    /// Return the [CacheMeta] of this asset
1003    ///
1004    /// # Panic
1005    /// Panic in phases which has no cache meta.
1006    pub fn cache_meta(&self) -> &CacheMeta {
1007        match self.phase {
1008            // TODO: allow in Bypass phase?
1009            CachePhase::Stale
1010            | CachePhase::StaleUpdating
1011            | CachePhase::Expired
1012            | CachePhase::Hit
1013            | CachePhase::Revalidated
1014            | CachePhase::RevalidatedNoCache(_) => self.inner().meta.as_ref().unwrap(),
1015            CachePhase::Miss => {
1016                // this is the async body read case, safe because body_reader is only set
1017                // after meta is retrieved
1018                if self.inner().body_reader.is_some() {
1019                    self.inner().meta.as_ref().unwrap()
1020                } else {
1021                    panic!("wrong phase {:?}", self.phase);
1022                }
1023            }
1024
1025            _ => panic!("wrong phase {:?}", self.phase),
1026        }
1027    }
1028
1029    /// Return the [CacheMeta] of this asset if any
1030    ///
1031    /// Different from [Self::cache_meta()], this function is allowed to be called in
1032    /// [CachePhase::Miss] phase where the cache meta maybe set.
1033    /// # Panic
1034    /// Panic in phases that shouldn't have cache meta.
1035    pub fn maybe_cache_meta(&self) -> Option<&CacheMeta> {
1036        match self.phase {
1037            CachePhase::Miss
1038            | CachePhase::Stale
1039            | CachePhase::StaleUpdating
1040            | CachePhase::Expired
1041            | CachePhase::Hit
1042            | CachePhase::Revalidated
1043            | CachePhase::RevalidatedNoCache(_) => self.inner().meta.as_ref(),
1044            _ => panic!("wrong phase {:?}", self.phase),
1045        }
1046    }
1047
1048    /// Perform the cache lookup from the given cache storage with the given cache key
1049    ///
1050    /// A cache hit will return [CacheMeta] which contains the header and meta info about
1051    /// the cache as well as a [HitHandler] to read the cache hit body.
1052    /// # Panic
1053    /// Panic in other phases.
1054    pub async fn cache_lookup(&mut self) -> Result<Option<(CacheMeta, HitHandler)>> {
1055        match self.phase {
1056            // Stale is allowed here because stale-> cache_lock -> lookup again
1057            CachePhase::CacheKey | CachePhase::Stale => {
1058                let inner = self
1059                    .inner
1060                    .as_mut()
1061                    .expect("Cache phase is checked and should have inner");
1062                let mut span = inner.traces.child("lookup");
1063                let key = inner.key.as_ref().unwrap(); // safe, this phase should have cache key
1064                let now = Instant::now();
1065                let result = inner.storage.lookup(key, &span.handle()).await?;
1066                // one request may have multiple lookups
1067                self.digest.add_lookup_duration(now.elapsed());
1068                let result = result.and_then(|(meta, header)| {
1069                    if let Some(ts) = inner.valid_after {
1070                        if meta.created() < ts {
1071                            span.set_tag(|| trace::Tag::new("not valid", true));
1072                            return None;
1073                        }
1074                    }
1075                    Some((meta, header))
1076                });
1077                if result.is_none() {
1078                    if let Some(lock) = inner.cache_lock.as_ref() {
1079                        inner.lock = Some(lock.lock(key));
1080                    }
1081                }
1082                span.set_tag(|| trace::Tag::new("found", result.is_some()));
1083                Ok(result)
1084            }
1085            _ => panic!("wrong phase {:?}", self.phase),
1086        }
1087    }
1088
1089    /// Update variance and see if the meta matches the current variance
1090    ///
1091    /// `cache_lookup() -> compute vary hash -> cache_vary_lookup()`
1092    /// This function allows callers to compute vary based on the initial cache hit.
1093    /// `meta` should be the ones returned from the initial cache_lookup()
1094    /// - return true if the meta is the variance.
1095    /// - return false if the current meta doesn't match the variance, need to cache_lookup() again
1096    pub fn cache_vary_lookup(&mut self, variance: HashBinary, meta: &CacheMeta) -> bool {
1097        match self.phase {
1098            // Stale is allowed here because stale-> cache_lock -> lookup again
1099            CachePhase::CacheKey | CachePhase::Stale => {
1100                let inner = self.inner_mut();
1101                // make sure that all variances found are fresher than this asset
1102                // this is because when purging all the variance, only the primary slot is deleted
1103                // the created TS of the primary is the tombstone of all the variances
1104                inner.valid_after = Some(meta.created());
1105
1106                // update vary
1107                let key = inner.key.as_mut().unwrap();
1108                // if no variance was previously set, then this is the first cache hit
1109                let is_initial_cache_hit = key.get_variance_key().is_none();
1110                key.set_variance_key(variance);
1111                let variance_binary = key.variance_bin();
1112                let matches_variance = meta.variance() == variance_binary;
1113
1114                // We should remove the variance in the lookup `key` if this is the primary variant
1115                // slot. We know this is the primary variant slot if this is the initial cache hit,
1116                // AND the variance in the `key` already matches the `meta`'s.
1117                //
1118                // For the primary variant slot, the storage backend needs to use the primary key
1119                // for both cache lookup and updating the meta. Otherwise it will look for the
1120                // asset in the wrong location during revalidation.
1121                //
1122                // We can recreate the "full" cache key by using the meta's variance, if needed.
1123                if matches_variance && is_initial_cache_hit {
1124                    inner.key.as_mut().unwrap().remove_variance_key();
1125                }
1126
1127                matches_variance
1128            }
1129            _ => panic!("wrong phase {:?}", self.phase),
1130        }
1131    }
1132
1133    /// Whether this request is behind a cache lock in order to wait for another request to read the
1134    /// asset.
1135    pub fn is_cache_locked(&self) -> bool {
1136        matches!(self.inner().lock, Some(Locked::Read(_)))
1137    }
1138
1139    /// Whether this request is the leader request to fetch the assets for itself and other requests
1140    /// behind the cache lock.
1141    pub fn is_cache_lock_writer(&self) -> bool {
1142        matches!(self.inner().lock, Some(Locked::Write(_)))
1143    }
1144
1145    /// Take the write lock from this request to transfer it to another one.
1146    /// # Panic
1147    ///  Call is_cache_lock_writer() to check first, will panic otherwise.
1148    pub fn take_write_lock(&mut self) -> (WritePermit, &'static CacheKeyLockImpl) {
1149        let lock = self.inner_mut().lock.take().unwrap();
1150        match lock {
1151            Locked::Write(w) => (
1152                w,
1153                self.inner()
1154                    .cache_lock
1155                    .expect("cache lock must be set if write permit exists"),
1156            ),
1157            Locked::Read(_) => panic!("take_write_lock() called on read lock"),
1158        }
1159    }
1160
1161    /// Set the write lock, which is usually transferred from [Self::take_write_lock()]
1162    pub fn set_write_lock(&mut self, write_lock: WritePermit) {
1163        self.inner_mut().lock.replace(Locked::Write(write_lock));
1164    }
1165
1166    /// Whether this request's cache hit is staled
1167    fn has_staled_asset(&self) -> bool {
1168        matches!(self.phase, CachePhase::Stale | CachePhase::StaleUpdating)
1169    }
1170
1171    /// Whether this asset is staled and stale if error is allowed
1172    pub fn can_serve_stale_error(&self) -> bool {
1173        self.has_staled_asset() && self.cache_meta().serve_stale_if_error(SystemTime::now())
1174    }
1175
1176    /// Whether this asset is staled and stale while revalidate is allowed.
1177    pub fn can_serve_stale_updating(&self) -> bool {
1178        self.has_staled_asset()
1179            && self
1180                .cache_meta()
1181                .serve_stale_while_revalidate(SystemTime::now())
1182    }
1183
1184    /// Wait for the cache read lock to be unlocked
1185    /// # Panic
1186    /// Check [Self::is_cache_locked()], panic if this request doesn't have a read lock.
1187    pub async fn cache_lock_wait(&mut self) -> LockStatus {
1188        let inner = self.inner_mut();
1189        let mut span = inner.traces.child("cache_lock");
1190        let lock = inner.lock.take(); // remove the lock from self
1191        if let Some(Locked::Read(r)) = lock {
1192            let now = Instant::now();
1193            r.wait().await;
1194            // it's possible for a request to be locked more than once
1195            self.digest.add_lock_duration(now.elapsed());
1196            let status = r.lock_status();
1197            let tag_value: &'static str = status.into();
1198            span.set_tag(|| Tag::new("status", tag_value));
1199            status
1200        } else {
1201            // should always call is_cache_locked() before this function
1202            panic!("cache_lock_wait on wrong type of lock")
1203        }
1204    }
1205
1206    /// How long did this request wait behind the read lock
1207    pub fn lock_duration(&self) -> Option<Duration> {
1208        self.digest.lock_duration
1209    }
1210
1211    /// How long did this request spent on cache lookup and reading the header
1212    pub fn lookup_duration(&self) -> Option<Duration> {
1213        self.digest.lookup_duration
1214    }
1215
1216    /// Delete the asset from the cache storage
1217    /// # Panic
1218    /// Need to be called after the cache key is set. Panic otherwise.
1219    pub async fn purge(&mut self) -> Result<bool> {
1220        match self.phase {
1221            CachePhase::CacheKey => {
1222                let inner = self.inner_mut();
1223                let mut span = inner.traces.child("purge");
1224                let key = inner.key.as_ref().unwrap().to_compact();
1225                let result = inner
1226                    .storage
1227                    .purge(&key, PurgeType::Invalidation, &span.handle())
1228                    .await;
1229                let purged = matches!(result, Ok(true));
1230                // need to inform eviction manager if asset was removed
1231                if let Some(eviction) = inner.eviction.as_ref() {
1232                    if purged {
1233                        eviction.remove(&key);
1234                    }
1235                }
1236                span.set_tag(|| trace::Tag::new("purged", purged));
1237                result
1238            }
1239            _ => panic!("wrong phase {:?}", self.phase),
1240        }
1241    }
1242
1243    /// Check the cacheable prediction
1244    ///
1245    /// Return true if the predictor is not set
1246    pub fn cacheable_prediction(&self) -> bool {
1247        if let Some(predictor) = self.inner().predictor {
1248            predictor.cacheable_prediction(self.cache_key())
1249        } else {
1250            true
1251        }
1252    }
1253
1254    /// Tell the predictor that this response, which is previously predicted to be uncacheable,
1255    /// is cacheable now.
1256    pub fn response_became_cacheable(&self) {
1257        if let Some(predictor) = self.inner().predictor {
1258            predictor.mark_cacheable(self.cache_key());
1259        }
1260    }
1261
1262    /// Tell the predictor that this response is uncacheable so that it will know next time
1263    /// this request arrives.
1264    pub fn response_became_uncacheable(&self, reason: NoCacheReason) {
1265        if let Some(predictor) = self.inner().predictor {
1266            predictor.mark_uncacheable(self.cache_key(), reason);
1267        }
1268    }
1269
1270    /// Tag all spans as being part of a subrequest.
1271    pub fn tag_as_subrequest(&mut self) {
1272        self.inner_mut()
1273            .traces
1274            .cache_span
1275            .set_tag(|| Tag::new("is_subrequest", true))
1276    }
1277}
1278
1279/// Set the header compression dictionary, which helps serialize http header.
1280///
1281/// Return false if it is already set.
1282pub fn set_compression_dict_path(path: &str) -> bool {
1283    crate::meta::COMPRESSION_DICT_PATH
1284        .set(path.to_string())
1285        .is_ok()
1286}