pingora_cache/lib.rs
// Copyright 2025 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! The HTTP caching layer for proxies.

#![allow(clippy::new_without_default)]

use cf_rustracing::tag::Tag;
use http::{method::Method, request::Parts as ReqHeader, response::Parts as RespHeader};
use key::{CacheHashKey, HashBinary};
use lock::WritePermit;
use log::warn;
use pingora_error::Result;
use pingora_http::ResponseHeader;
use std::time::{Duration, Instant, SystemTime};
use storage::MissFinishType;
use strum::IntoStaticStr;
use trace::CacheTraceCTX;

pub mod cache_control;
pub mod eviction;
pub mod filters;
pub mod hashtable;
pub mod key;
pub mod lock;
pub mod max_file_size;
mod memory;
pub mod meta;
pub mod predictor;
pub mod put;
pub mod storage;
pub mod trace;
mod variance;

use crate::max_file_size::MaxFileSizeMissHandler;
pub use key::CacheKey;
use lock::{CacheKeyLockImpl, LockStatus, Locked};
pub use memory::MemCache;
pub use meta::{CacheMeta, CacheMetaDefaults};
pub use storage::{HitHandler, MissHandler, PurgeType, Storage};
pub use variance::VarianceBuilder;

pub mod prelude {}

/// The state machine for HTTP caching
///
/// This object is used to handle the state and transitions for HTTP caching through the life of a
/// request.
pub struct HttpCache {
    phase: CachePhase,
    // Box the rest so that a disabled HttpCache struct is small
    inner: Option<Box<HttpCacheInner>>,
    digest: HttpCacheDigest,
}

/// This reflects the phase of HttpCache during the lifetime of a request
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CachePhase {
    /// Cache disabled, with reason (NeverEnabled if never explicitly used)
    Disabled(NoCacheReason),
    /// Cache enabled but nothing is set yet
    Uninit,
    /// Cache was enabled, the request decided not to use it
    // HttpCache.inner is kept
    Bypass,
    /// Awaiting the cache key to be generated
    CacheKey,
    /// Cache hit
    Hit,
    /// No cached asset is found
    Miss,
    /// A stale (expired) asset is found
    Stale,
    /// A stale (expired) asset was found, but another request is revalidating it
    StaleUpdating,
    /// A stale (expired) asset was found, so a fresh one was fetched
    Expired,
    /// A stale (expired) asset was found, and it was revalidated to be fresh
    Revalidated,
    /// Revalidated, but deemed uncacheable, so we do not freshen it
    RevalidatedNoCache(NoCacheReason),
}

impl CachePhase {
    /// Convert [CachePhase] as `str`, for logging and debugging.
    pub fn as_str(&self) -> &'static str {
        match self {
            CachePhase::Disabled(_) => "disabled",
            CachePhase::Uninit => "uninitialized",
            CachePhase::Bypass => "bypass",
            CachePhase::CacheKey => "key",
            CachePhase::Hit => "hit",
            CachePhase::Miss => "miss",
            CachePhase::Stale => "stale",
            CachePhase::StaleUpdating => "stale-updating",
            CachePhase::Expired => "expired",
            CachePhase::Revalidated => "revalidated",
            CachePhase::RevalidatedNoCache(_) => "revalidated-nocache",
        }
    }
}

/// The possible reasons for not caching
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum NoCacheReason {
    /// Caching is not enabled to begin with
    NeverEnabled,
    /// Origin directives indicated this was not cacheable
    OriginNotCache,
    /// Response size was larger than the cache's configured maximum asset size
    ResponseTooLarge,
    /// Due to internal caching storage error
    StorageError,
    /// Due to other types of internal issues
    InternalError,
    /// Will be cacheable but skip cache admission for now
    ///
    /// This happens when the cache predictor predicted that this request is not cacheable, but
    /// the response turns out to be OK to cache. However, it might be too late to re-enable
    /// caching for this request.
    Deferred,
    /// Due to the proxy upstream filter declining the current request from going upstream
    DeclinedToUpstream,
    /// Due to the upstream being unreachable or otherwise erroring during proxying
    UpstreamError,
    /// The writer of the cache lock sees that the request is not cacheable (Could be OriginNotCache)
    CacheLockGiveUp,
    /// This request waited too long for the writer of the cache lock to finish, so this request will
    /// fetch from the origin without caching
    CacheLockTimeout,
    /// Other custom defined reasons
    Custom(&'static str),
}

impl NoCacheReason {
    /// Convert [NoCacheReason] as `str`, for logging and debugging.
    pub fn as_str(&self) -> &'static str {
        use NoCacheReason::*;
        match self {
            NeverEnabled => "NeverEnabled",
            OriginNotCache => "OriginNotCache",
            ResponseTooLarge => "ResponseTooLarge",
            StorageError => "StorageError",
            InternalError => "InternalError",
            Deferred => "Deferred",
            DeclinedToUpstream => "DeclinedToUpstream",
            UpstreamError => "UpstreamError",
            CacheLockGiveUp => "CacheLockGiveUp",
            CacheLockTimeout => "CacheLockTimeout",
            Custom(s) => s,
        }
    }
}

/// Information collected about the caching operation that will not be cleared
#[derive(Debug, Default)]
pub struct HttpCacheDigest {
    pub lock_duration: Option<Duration>,
    // time spent in cache lookup and reading the header
    pub lookup_duration: Option<Duration>,
}

/// Convenience function to add a duration to an optional duration
fn add_duration_to_opt(target_opt: &mut Option<Duration>, to_add: Duration) {
    *target_opt = Some(target_opt.map_or(to_add, |existing| existing + to_add));
}

impl HttpCacheDigest {
    fn add_lookup_duration(&mut self, extra_lookup_duration: Duration) {
        add_duration_to_opt(&mut self.lookup_duration, extra_lookup_duration)
    }

    fn add_lock_duration(&mut self, extra_lock_duration: Duration) {
        add_duration_to_opt(&mut self.lock_duration, extra_lock_duration)
    }
}

/// Response cacheable decision
#[derive(Debug)]
pub enum RespCacheable {
    Cacheable(CacheMeta),
    Uncacheable(NoCacheReason),
}

impl RespCacheable {
    /// Whether it is cacheable
    #[inline]
    pub fn is_cacheable(&self) -> bool {
        matches!(*self, Self::Cacheable(_))
    }

    /// Unwrap [RespCacheable] to get the [CacheMeta] stored
    /// # Panic
    /// Panic when this object is not cacheable. Check [Self::is_cacheable()] first.
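    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doc test; `resp_cacheable` stands in for
    /// whatever produced this value):
    /// ```ignore
    /// if resp_cacheable.is_cacheable() {
    ///     // Checked above, so unwrap_meta() cannot panic here.
    ///     let meta = resp_cacheable.unwrap_meta();
    /// }
    /// ```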
    pub fn unwrap_meta(self) -> CacheMeta {
        match self {
            Self::Cacheable(meta) => meta,
            Self::Uncacheable(_) => panic!("expected Cacheable value"),
        }
    }
}

/// Indicators of which level of purge logic to apply to an asset, i.e. whether
/// the purged asset should be revalidated or re-fetched altogether
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ForcedInvalidationKind {
    /// Indicates the asset should be considered stale and revalidated
    ForceExpired,

    /// Indicates the asset should be considered absent and treated like a miss
    /// instead of a hit
    ForceMiss,
}

/// Freshness state of cache hit asset
#[derive(Debug, Copy, Clone, IntoStaticStr, PartialEq, Eq)]
#[strum(serialize_all = "snake_case")]
pub enum HitStatus {
    /// The asset's freshness directives indicate it has expired
    Expired,

    /// The asset was marked as expired, and should be treated as stale
    ForceExpired,

    /// The asset was marked as absent, and should be treated as a miss
    ForceMiss,

    /// An error occurred while processing the asset, so it should be treated as
    /// a miss
    FailedHitFilter,

    /// The asset is not expired
    Fresh,
}

impl HitStatus {
    /// For displaying cache hit status
    pub fn as_str(&self) -> &'static str {
        self.into()
    }

    /// Whether cached asset can be served as fresh
    pub fn is_fresh(&self) -> bool {
        *self == HitStatus::Fresh
    }

    /// Check whether the hit status should be treated as a miss. A forced miss
    /// is obviously treated as a miss. A hit-filter failure is treated as a
    /// miss because we can't use the asset as an actual hit. If we treat it as
    /// expired, we still might not be able to use it even if revalidation
    /// succeeds.
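    ///
    /// A hedged sketch of a typical caller (assuming `cache` is an [HttpCache] whose
    /// hit filter just ran):
    /// ```ignore
    /// if hit_status.is_treated_as_miss() {
    ///     cache.cache_miss(); // fall back to miss handling
    /// }
    /// ```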
    pub fn is_treated_as_miss(self) -> bool {
        matches!(self, HitStatus::ForceMiss | HitStatus::FailedHitFilter)
    }
}

struct HttpCacheInner {
    pub key: Option<CacheKey>,
    pub meta: Option<CacheMeta>,
    // when set, even if an asset exists, it would only be considered valid after this timestamp
    pub valid_after: Option<SystemTime>,
    // when set, an asset will be rejected from the cache if it exceeds this size in bytes
    pub max_file_size_bytes: Option<usize>,
    pub miss_handler: Option<MissHandler>,
    pub body_reader: Option<HitHandler>,
    pub storage: &'static (dyn storage::Storage + Sync), // static for now
    pub eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
    pub predictor: Option<&'static (dyn predictor::CacheablePredictor + Sync)>,
    pub lock: Option<Locked>, // TODO: these 3 fields should come in 1 sub struct
    pub cache_lock: Option<&'static CacheKeyLockImpl>,
    pub traces: trace::CacheTraceCTX,
}

impl HttpCache {
    /// Create a new [HttpCache].
    ///
    /// Caching is not enabled by default.
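    ///
    /// A minimal sketch of the disabled-by-default behavior:
    /// ```
    /// use pingora_cache::HttpCache;
    ///
    /// let cache = HttpCache::new();
    /// assert!(!cache.enabled()); // stays disabled until enable() is called
    /// ```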
    pub fn new() -> Self {
        HttpCache {
            phase: CachePhase::Disabled(NoCacheReason::NeverEnabled),
            inner: None,
            digest: HttpCacheDigest::default(),
        }
    }

    /// Whether the cache is enabled
    pub fn enabled(&self) -> bool {
        !matches!(self.phase, CachePhase::Disabled(_) | CachePhase::Bypass)
    }

    /// Whether the cache is being bypassed
    pub fn bypassing(&self) -> bool {
        matches!(self.phase, CachePhase::Bypass)
    }

    /// Return the [CachePhase]
    pub fn phase(&self) -> CachePhase {
        self.phase
    }

    /// Whether anything was fetched from the upstream
    ///
    /// This essentially checks all possible [CachePhase] that need to contact the upstream server
    pub fn upstream_used(&self) -> bool {
        use CachePhase::*;
        match self.phase {
            Disabled(_) | Bypass | Miss | Expired | Revalidated | RevalidatedNoCache(_) => true,
            Hit | Stale | StaleUpdating => false,
            Uninit | CacheKey => false, // invalid states for this call, treat them as false to keep it simple
        }
    }

    /// Check whether the backend storage is the type `T`.
    pub fn storage_type_is<T: 'static>(&self) -> bool {
        self.inner
            .as_ref()
            .and_then(|inner| inner.storage.as_any().downcast_ref::<T>())
            .is_some()
    }

    /// Release the cache lock if the current request is a cache writer.
    ///
    /// Generally callers should prefer using `disable` when a cache lock should be released
    /// due to an error to clear all cache context. This function is for releasing the cache lock
    /// while still keeping the cache around for reading, e.g. when serving stale.
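    ///
    /// A hedged sketch (serving stale after an upstream error while keeping the
    /// cached asset readable):
    /// ```ignore
    /// if cache.can_serve_stale_error() {
    ///     // Hand the lock back so another request can retry the fetch.
    ///     cache.release_write_lock(NoCacheReason::UpstreamError);
    ///     // ... proceed to serve the stale hit ...
    /// }
    /// ```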
    pub fn release_write_lock(&mut self, reason: NoCacheReason) {
        use NoCacheReason::*;
        if let Some(inner) = self.inner.as_mut() {
            let lock = inner.lock.take();
            if let Some(Locked::Write(permit)) = lock {
                let lock_status = match reason {
                    // let the next request try to fetch it
                    InternalError | StorageError | Deferred | UpstreamError => {
                        LockStatus::TransientError
                    }
                    // depends on why the proxy upstream filter declined the request,
                    // for now still allow the next request to try to acquire it to avoid a thundering herd
                    DeclinedToUpstream => LockStatus::TransientError,
                    // no need for the lock anymore
                    OriginNotCache | ResponseTooLarge => LockStatus::GiveUp,
                    // not sure which LockStatus makes sense, we treat it as GiveUp for now
                    Custom(_) => LockStatus::GiveUp,
                    // should never happen, NeverEnabled shouldn't hold a lock
                    NeverEnabled => panic!("NeverEnabled holds a write lock"),
                    CacheLockGiveUp | CacheLockTimeout => {
                        panic!("CacheLock* are for cache lock readers only")
                    }
                };
                inner
                    .cache_lock
                    .unwrap()
                    .release(inner.key.as_ref().unwrap(), permit, lock_status);
            }
        }
    }

    /// Disable caching
    pub fn disable(&mut self, reason: NoCacheReason) {
        match self.phase {
            CachePhase::Disabled(_) => {
                // replace reason
                self.phase = CachePhase::Disabled(reason);
            }
            _ => {
                self.phase = CachePhase::Disabled(reason);
                self.release_write_lock(reason);
                // log initial disable reason
                self.inner_mut()
                    .traces
                    .cache_span
                    .set_tag(|| trace::Tag::new("disable_reason", reason.as_str()));
                self.inner = None;
            }
        }
    }

    /* The following methods panic when they are used in the wrong phase.
     * This is better than returning errors as such panics are only caused by coding error, which
     * should be fixed right away. Tokio runtime only crashes the current task instead of the whole
     * program when these panics happen. */

    /// Set the cache to bypass
    ///
    /// # Panic
    /// This call is only allowed in [CachePhase::CacheKey] phase (before any cache lookup is performed).
    /// Using it in any other phase will lead to a panic.
    pub fn bypass(&mut self) {
        match self.phase {
            CachePhase::CacheKey => {
                // before cache lookup / found / miss
                self.phase = CachePhase::Bypass;
                self.inner_mut()
                    .traces
                    .cache_span
                    .set_tag(|| trace::Tag::new("bypassed", true));
            }
            _ => panic!("wrong phase to bypass HttpCache {:?}", self.phase),
        }
    }

    /// Enable the cache
    ///
    /// - `storage`: the cache storage backend that implements [storage::Storage]
    /// - `eviction`: optionally the eviction manager, without it, nothing will be evicted from the storage
    /// - `predictor`: optionally a cache predictor. The cache predictor predicts whether something is likely
    ///   to be cacheable or not. This is useful because the proxy can apply different types of optimization to
    ///   cacheable and uncacheable requests.
    /// - `cache_lock`: optionally a cache lock which handles concurrent lookups to the same asset. Without it
    ///   such lookups will all be allowed to fetch the asset independently.
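    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `MY_STORAGE` is some `'static` [storage::Storage]
    /// implementation provided by the caller:
    /// ```ignore
    /// let mut cache = HttpCache::new();
    /// cache.enable(&MY_STORAGE, None, None, None);
    /// assert!(cache.enabled());
    /// ```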
    pub fn enable(
        &mut self,
        storage: &'static (dyn storage::Storage + Sync),
        eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
        predictor: Option<&'static (dyn predictor::CacheablePredictor + Sync)>,
        cache_lock: Option<&'static CacheKeyLockImpl>,
    ) {
        match self.phase {
            CachePhase::Disabled(_) => {
                self.phase = CachePhase::Uninit;
                self.inner = Some(Box::new(HttpCacheInner {
                    key: None,
                    meta: None,
                    valid_after: None,
                    max_file_size_bytes: None,
                    miss_handler: None,
                    body_reader: None,
                    storage,
                    eviction,
                    predictor,
                    lock: None,
                    cache_lock,
                    traces: CacheTraceCTX::new(),
                }));
            }
            _ => panic!("Cannot enable already enabled HttpCache {:?}", self.phase),
        }
    }

    /// Enable distributed tracing
    pub fn enable_tracing(&mut self, parent_span: trace::Span) {
        if let Some(inner) = self.inner.as_mut() {
            inner.traces.enable(parent_span);
        }
    }

    /// Get the cache parent tracing span
    pub fn get_cache_span(&self) -> Option<trace::SpanHandle> {
        self.inner.as_ref().map(|i| i.traces.get_cache_span())
    }

    /// Get the cache `miss` tracing span
    pub fn get_miss_span(&self) -> Option<trace::SpanHandle> {
        self.inner.as_ref().map(|i| i.traces.get_miss_span())
    }

    /// Get the cache `hit` tracing span
    pub fn get_hit_span(&self) -> Option<trace::SpanHandle> {
        self.inner.as_ref().map(|i| i.traces.get_hit_span())
    }

    // shortcut to access inner, panic if phase is disabled
    #[inline]
    fn inner_mut(&mut self) -> &mut HttpCacheInner {
        self.inner.as_mut().unwrap()
    }

    #[inline]
    fn inner(&self) -> &HttpCacheInner {
        self.inner.as_ref().unwrap()
    }

    /// Set the cache key
    /// # Panic
    /// The cache key is only allowed to be set in its own phase. Setting it in other phases will
    /// cause a panic.
    pub fn set_cache_key(&mut self, key: CacheKey) {
        match self.phase {
            CachePhase::Uninit | CachePhase::CacheKey => {
                self.phase = CachePhase::CacheKey;
                self.inner_mut().key = Some(key);
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Return the cache key used for asset lookup
    /// # Panic
    /// Can only be called after the cache key is set and the cache is not disabled. Panic otherwise.
    pub fn cache_key(&self) -> &CacheKey {
        match self.phase {
            CachePhase::Disabled(_) | CachePhase::Uninit => panic!("wrong phase {:?}", self.phase),
            _ => self.inner().key.as_ref().unwrap(),
        }
    }

    /// Return the max size allowed to be cached.
    pub fn max_file_size_bytes(&self) -> Option<usize> {
        match self.phase {
            CachePhase::Disabled(_) | CachePhase::Uninit => panic!("wrong phase {:?}", self.phase),
            _ => self.inner().max_file_size_bytes,
        }
    }

    /// Set the maximum response _body_ size in bytes that will be admitted to the cache.
    ///
    /// Response header size does not contribute to the max file size.
    pub fn set_max_file_size_bytes(&mut self, max_file_size_bytes: usize) {
        match self.phase {
            CachePhase::Disabled(_) => panic!("wrong phase {:?}", self.phase),
            _ => {
                self.inner_mut().max_file_size_bytes = Some(max_file_size_bytes);
            }
        }
    }

    /// Mark that the asset is found in cache storage.
    ///
    /// This function is called after [Self::cache_lookup()] which returns the [CacheMeta] and
    /// [HitHandler].
    ///
    /// The `hit_status` enum allows the caller to force expire assets.
    pub fn cache_found(&mut self, meta: CacheMeta, hit_handler: HitHandler, hit_status: HitStatus) {
        // Stale allowed because of cache lock and then retry
        if !matches!(self.phase, CachePhase::CacheKey | CachePhase::Stale) {
            panic!("wrong phase {:?}", self.phase)
        }

        self.phase = match hit_status {
            HitStatus::Fresh => CachePhase::Hit,
            HitStatus::Expired | HitStatus::ForceExpired => CachePhase::Stale,
            HitStatus::FailedHitFilter | HitStatus::ForceMiss => self.phase,
        };

        let phase = self.phase;
        let inner = self.inner_mut();

        let key = inner.key.as_ref().unwrap();

        // The cache lock might not be set for stale hit or hits treated as
        // misses, so we need to initialize it here
        if phase == CachePhase::Stale || hit_status.is_treated_as_miss() {
            if let Some(lock) = inner.cache_lock.as_ref() {
                inner.lock = Some(lock.lock(key));
            }
        }

        if hit_status.is_treated_as_miss() {
            // Clear the body and meta for hits that are treated as misses
            inner.body_reader = None;
            inner.meta = None;
        } else {
            // Set the metadata appropriately for legit hits
            inner.traces.start_hit_span(phase, hit_status);
            inner.traces.log_meta_in_hit_span(&meta);
            if let Some(eviction) = inner.eviction {
                // TODO: make access() accept CacheKey
                let cache_key = key.to_compact();
                if hit_handler.should_count_access() {
                    let size = hit_handler.get_eviction_weight();
                    eviction.access(&cache_key, size, meta.0.internal.fresh_until);
                }
            }
            inner.meta = Some(meta);
            inner.body_reader = Some(hit_handler);
        }
    }

    /// Mark `self` as a cache miss.
    ///
    /// This function is called after [Self::cache_lookup()] finds nothing or the caller decides
    /// not to use the asset found.
    /// # Panic
    /// Panic in other phases.
    pub fn cache_miss(&mut self) {
        match self.phase {
            // from CacheKey: set state to miss during cache lookup
            // from Bypass: response became cacheable, set state to miss to cache
            // from Stale: waited for cache lock, then retried and found asset was gone
            CachePhase::CacheKey | CachePhase::Bypass | CachePhase::Stale => {
                self.phase = CachePhase::Miss;
                // It's possible that we've set the meta on lookup and have come back around
                // here after not being able to acquire the cache lock, and our item has since
                // been purged or expired. We should be sure that the meta is not set in this
                // case as there shouldn't be a meta set for cache misses.
                self.inner_mut().meta = None;
                self.inner_mut().traces.start_miss_span();
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Return the [HitHandler]
    /// # Panic
    /// Call this after [Self::cache_found()], panic in other phases.
    pub fn hit_handler(&mut self) -> &mut HitHandler {
        match self.phase {
            CachePhase::Hit
            | CachePhase::Stale
            | CachePhase::StaleUpdating
            | CachePhase::Revalidated
            | CachePhase::RevalidatedNoCache(_) => self.inner_mut().body_reader.as_mut().unwrap(),
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Return the body reader during a cache admission (miss/expired) which decouples the downstream
    /// read and upstream cache write
    pub fn miss_body_reader(&mut self) -> Option<&mut HitHandler> {
        match self.phase {
            CachePhase::Miss | CachePhase::Expired => {
                let inner = self.inner_mut();
                if inner.storage.support_streaming_partial_write() {
                    inner.body_reader.as_mut()
                } else {
                    // body_reader could be set even when the storage doesn't support streaming
                    // Expired cache would have the reader set.
                    None
                }
            }
            _ => None,
        }
    }

    /// Call this when the cache hit is fully read.
    ///
    /// This call will release resources, if any, and log the timing in tracing if set.
    /// # Panic
    /// Panic in phases where there is no cache hit.
    pub async fn finish_hit_handler(&mut self) -> Result<()> {
        match self.phase {
            CachePhase::Hit
            | CachePhase::Miss
            | CachePhase::Expired
            | CachePhase::Stale
            | CachePhase::StaleUpdating
            | CachePhase::Revalidated
            | CachePhase::RevalidatedNoCache(_) => {
                let inner = self.inner_mut();
                if inner.body_reader.is_none() {
                    // already finished, we allow calling this function more than once
                    return Ok(());
                }
                let body_reader = inner.body_reader.take().unwrap();
                let key = inner.key.as_ref().unwrap();
                let result = body_reader
                    .finish(inner.storage, key, &inner.traces.hit_span.handle())
                    .await;
                inner.traces.finish_hit_span();
                result
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Set the [MissHandler] according to the cache key and meta; this can only be called once.
    pub async fn set_miss_handler(&mut self) -> Result<()> {
        match self.phase {
            // set_miss_handler() needs to be called after set_cache_meta() (which changes Stale to Expired).
            // This is an artificial rule to enforce the state transitions.
            CachePhase::Miss | CachePhase::Expired => {
                let max_file_size_bytes = self.max_file_size_bytes();

                let inner = self.inner_mut();
                if inner.miss_handler.is_some() {
                    panic!("write handler is already set")
                }
                let meta = inner.meta.as_ref().unwrap();
                let key = inner.key.as_ref().unwrap();
                let miss_handler = inner
                    .storage
                    .get_miss_handler(key, meta, &inner.traces.get_miss_span())
                    .await?;

                inner.miss_handler = if let Some(max_size) = max_file_size_bytes {
                    Some(Box::new(MaxFileSizeMissHandler::new(
                        miss_handler,
                        max_size,
                    )))
                } else {
                    Some(miss_handler)
                };

                if inner.storage.support_streaming_partial_write() {
                    // If a reader can access a partial write, the cache lock can be released here
                    // to let readers start reading the body.
                    let lock = inner.lock.take();
                    if let Some(Locked::Write(permit)) = lock {
                        inner
                            .cache_lock
                            .expect("must have cache lock to have write permit")
                            .release(key, permit, LockStatus::Done);
                    }
                    // Downstream read and upstream write can be decoupled
                    let body_reader = inner
                        .storage
                        .lookup_streaming_write(
                            key,
                            inner
                                .miss_handler
                                .as_ref()
                                .expect("miss handler already set")
                                .streaming_write_tag(),
                            &inner.traces.get_miss_span(),
                        )
                        .await?;

                    if let Some((_meta, body_reader)) = body_reader {
                        inner.body_reader = Some(body_reader);
                    } else {
                        // body_reader should exist now, because streaming partial write support
                        // implies the ongoing write can be looked up
                        panic!("unable to get body_reader for {:?}", meta);
                    }
                }
                Ok(())
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Return the [MissHandler] to write the response body to cache.
    ///
    /// `None`: the handler has not been set or already finished
    pub fn miss_handler(&mut self) -> Option<&mut MissHandler> {
        match self.phase {
            CachePhase::Miss | CachePhase::Expired => self.inner_mut().miss_handler.as_mut(),
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Finish cache admission
    ///
    /// If `self` is dropped without calling this, the cache admission is considered incomplete and
    /// should be cleaned up.
    ///
    /// This call will also trigger eviction if set.
    pub async fn finish_miss_handler(&mut self) -> Result<()> {
        match self.phase {
            CachePhase::Miss | CachePhase::Expired => {
                let inner = self.inner_mut();
                if inner.miss_handler.is_none() {
                    // already finished, we allow calling this function more than once
                    return Ok(());
                }
                let miss_handler = inner.miss_handler.take().unwrap();
                let finish = miss_handler.finish().await?;
                let lock = inner.lock.take();
                let key = inner.key.as_ref().unwrap();
                if let Some(Locked::Write(permit)) = lock {
                    // no need to call r.unlock() because release() will call it
                    // r is a guard to make sure the lock is unlocked when this request is dropped
                    inner
                        .cache_lock
                        .unwrap()
                        .release(key, permit, LockStatus::Done);
                }
                if let Some(eviction) = inner.eviction {
                    let cache_key = key.to_compact();
                    let meta = inner.meta.as_ref().unwrap();
                    let evicted = match finish {
                        MissFinishType::Created(size) => {
                            eviction.admit(cache_key, size, meta.0.internal.fresh_until)
                        }
                        MissFinishType::Appended(size) => {
                            eviction.increment_weight(cache_key, size)
                        }
                    };
                    // actual eviction can be done async
                    let span = inner.traces.child("eviction");
                    let handle = span.handle();
                    let storage = inner.storage;
                    tokio::task::spawn(async move {
                        for item in evicted {
                            if let Err(e) = storage.purge(&item, PurgeType::Eviction, &handle).await
                            {
                                warn!("Failed to purge {item} during eviction for finish miss handler: {e}");
                            }
                        }
                    });
                }
                inner.traces.finish_miss_span();
                Ok(())
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Set the [CacheMeta] of the cache
    pub fn set_cache_meta(&mut self, meta: CacheMeta) {
        match self.phase {
            // TODO: store the stale meta somewhere else for future use?
            CachePhase::Stale | CachePhase::Miss => {
                let inner = self.inner_mut();
                // TODO: have a separate expired span?
                inner.traces.log_meta_in_miss_span(&meta);
                inner.meta = Some(meta);
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
        if self.phase == CachePhase::Stale {
            self.phase = CachePhase::Expired;
        }
    }

    /// Set the [CacheMeta] of the cache after revalidation.
    ///
    /// Certain info such as the original cache admission time will be preserved. Others will
    /// be replaced by the input `meta`.
    pub async fn revalidate_cache_meta(&mut self, mut meta: CacheMeta) -> Result<bool> {
        let result = match self.phase {
            CachePhase::Stale => {
                let inner = self.inner_mut();
                // TODO: we should keep old meta in place, just use new one to update it
                // that requires cacheable_filter to take a mut header and just return InternalMeta

                // update new meta with old meta's created time
                let old_meta = inner.meta.take().unwrap();
                let created = old_meta.0.internal.created;
                meta.0.internal.created = created;
                // meta.internal.updated was already set to new meta's `created`,
                // no need to set `updated` here
                // Merge old extensions with new ones. New exts take precedence if they conflict.
                let mut extensions = old_meta.0.extensions;
                extensions.extend(meta.0.extensions);
                meta.0.extensions = extensions;

                inner.meta.replace(meta);

                let lock = inner.lock.take();
                if let Some(Locked::Write(permit)) = lock {
                    inner.cache_lock.unwrap().release(
                        inner.key.as_ref().unwrap(),
                        permit,
                        LockStatus::Done,
                    );
                }

                let mut span = inner.traces.child("update_meta");
                // TODO: this call can be async
                let result = inner
                    .storage
                    .update_meta(
                        inner.key.as_ref().unwrap(),
                        inner.meta.as_ref().unwrap(),
                        &span.handle(),
                    )
                    .await;
                span.set_tag(|| trace::Tag::new("updated", result.is_ok()));
                result
            }
            _ => panic!("wrong phase {:?}", self.phase),
        };
        self.phase = CachePhase::Revalidated;
        result
    }

    /// After a successful revalidation, update certain headers for the cached asset
    /// such as `Etag` with the fresh response header `resp`.
    pub fn revalidate_merge_header(&mut self, resp: &RespHeader) -> ResponseHeader {
        match self.phase {
            CachePhase::Stale => {
                /*
                 * https://datatracker.ietf.org/doc/html/rfc9110#section-15.4.5
                 * 304 response MUST generate ... would have been sent in a 200 ...
                 * - Content-Location, Date, ETag, and Vary
                 * - Cache-Control and Expires...
                 */
                let mut old_header = self.inner().meta.as_ref().unwrap().0.header.clone();
                let mut clone_header = |header_name: &'static str| {
                    for (i, value) in resp.headers.get_all(header_name).iter().enumerate() {
                        if i == 0 {
                            old_header
                                .insert_header(header_name, value)
                                .expect("can add valid header");
                        } else {
                            old_header
                                .append_header(header_name, value)
                                .expect("can add valid header");
                        }
                    }
                };
                clone_header("cache-control");
                clone_header("expires");
                clone_header("cache-tag");
                clone_header("cdn-cache-control");
                clone_header("etag");
                // https://datatracker.ietf.org/doc/html/rfc9111#section-4.3.4
                // "...cache MUST update its header fields with the header fields provided in the 304..."
                // But if the Vary header changes, the cached response may no longer match the
                // incoming request.
                //
                // For simplicity, ignore changing Vary in revalidation for now.
                // TODO: if we support vary during revalidation, there are a few edge cases to
                // consider (what if Vary header appears/disappears/changes)?
                //
                // clone_header("vary");
                old_header
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Mark this asset uncacheable after revalidation
    pub fn revalidate_uncacheable(&mut self, header: ResponseHeader, reason: NoCacheReason) {
        match self.phase {
            CachePhase::Stale => {
                // replace cache meta header
                self.inner_mut().meta.as_mut().unwrap().0.header = header;
                // upstream request done, release write lock
                self.release_write_lock(reason);
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
        self.phase = CachePhase::RevalidatedNoCache(reason);
        // TODO: remove this asset from cache once finished?
    }

    /// Mark this asset as stale, but being updated separately from this request.
    pub fn set_stale_updating(&mut self) {
        match self.phase {
            CachePhase::Stale => self.phase = CachePhase::StaleUpdating,
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Update the variance of the [CacheMeta].
    ///
    /// Note that this process may change the lookup `key`, and eventually (when the asset is
    /// written to storage) invalidate other cached variants under the same primary key as the
    /// current asset.
    pub fn update_variance(&mut self, variance: Option<HashBinary>) {
        // If this is a cache miss, we will simply update the variance in the meta.
        //
        // If this is an expired response, we will have to consider a few cases:
        //
        // **Case 1**: Variance was absent, but caller sets it now.
        // We will just insert it into the meta. The current asset becomes the primary variant.
        // Because the current location of the asset is already the primary variant, nothing else
        // needs to be done.
        //
        // **Case 2**: Variance was present, but it changed or was removed.
        // We want the current asset to take over the primary slot, in order to invalidate all
        // other variants derived under the old Vary.
        //
        // **Case 3**: Variance did not change.
        // Nothing needs to happen.
        let inner = match self.phase {
            CachePhase::Miss | CachePhase::Expired => self.inner_mut(),
            _ => panic!("wrong phase {:?}", self.phase),
        };

        // Update the variance in the meta
        if let Some(variance_hash) = variance.as_ref() {
            inner
                .meta
                .as_mut()
                .unwrap()
                .set_variance_key(*variance_hash);
        } else {
            inner.meta.as_mut().unwrap().remove_variance();
        }

        // Change the lookup `key` if necessary, in order to admit the asset into the primary slot
        // instead of the secondary slot.
        let key = inner.key.as_ref().unwrap();
        if let Some(old_variance) = key.get_variance_key().as_ref() {
            // This is a secondary variant slot.
            if Some(*old_variance) != variance.as_ref() {
                // This new variance does not match the variance in the cache key we used to look
                // up this asset.
                // Drop the cache lock to avoid leaving a dangling lock
                // (because we locked with the old cache key for the secondary slot)
                // TODO: maybe we should try to signal waiting readers to compete for the primary key
                // lock instead? we will not be modifying this secondary slot so it's not actually
                // ready for readers
                if let Some(Locked::Write(permit)) = inner.lock.take() {
                    inner
                        .cache_lock
                        .unwrap()
                        .release(key, permit, LockStatus::Done);
                }
                // Remove the `variance` from the `key`, so that we admit this asset into the
                // primary slot. (`key` is used to tell storage where to write the data.)
                inner.key.as_mut().unwrap().remove_variance_key();
            }
        }
    }

    /// Return the [CacheMeta] of this asset
    ///
    /// # Panic
    /// Panic in phases that have no cache meta.
    pub fn cache_meta(&self) -> &CacheMeta {
        match self.phase {
            // TODO: allow in Bypass phase?
            CachePhase::Stale
            | CachePhase::StaleUpdating
            | CachePhase::Expired
            | CachePhase::Hit
            | CachePhase::Revalidated
            | CachePhase::RevalidatedNoCache(_) => self.inner().meta.as_ref().unwrap(),
            CachePhase::Miss => {
                // this is the async body read case, safe because body_reader is only set
                // after meta is retrieved
                if self.inner().body_reader.is_some() {
                    self.inner().meta.as_ref().unwrap()
                } else {
                    panic!("wrong phase {:?}", self.phase);
                }
            }

            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Return the [CacheMeta] of this asset, if any
    ///
    /// Different from [Self::cache_meta()], this function is allowed to be called in
    /// [CachePhase::Miss] phase, where the cache meta may be set.
    /// # Panic
    /// Panic in phases that shouldn't have cache meta.
    pub fn maybe_cache_meta(&self) -> Option<&CacheMeta> {
        match self.phase {
            CachePhase::Miss
            | CachePhase::Stale
            | CachePhase::StaleUpdating
            | CachePhase::Expired
            | CachePhase::Hit
            | CachePhase::Revalidated
            | CachePhase::RevalidatedNoCache(_) => self.inner().meta.as_ref(),
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Perform the cache lookup from the given cache storage with the given cache key
    ///
    /// A cache hit will return [CacheMeta] which contains the header and meta info about
    /// the cache as well as a [HitHandler] to read the cache hit body.
    /// # Panic
    /// Panic in other phases.
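    ///
    /// A hedged sketch of the lookup flow (`HitStatus::Fresh` stands in for a real
    /// freshness decision, which normally comes from the freshness filters):
    /// ```ignore
    /// match cache.cache_lookup().await? {
    ///     Some((meta, hit_handler)) => cache.cache_found(meta, hit_handler, HitStatus::Fresh),
    ///     None => cache.cache_miss(),
    /// }
    /// ```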
    pub async fn cache_lookup(&mut self) -> Result<Option<(CacheMeta, HitHandler)>> {
        match self.phase {
            // Stale is allowed here because stale -> cache_lock -> lookup again
            CachePhase::CacheKey | CachePhase::Stale => {
                let inner = self
                    .inner
                    .as_mut()
                    .expect("Cache phase is checked and should have inner");
                let mut span = inner.traces.child("lookup");
                let key = inner.key.as_ref().unwrap(); // safe, this phase should have cache key
                let now = Instant::now();
                let result = inner.storage.lookup(key, &span.handle()).await?;
                // one request may have multiple lookups
                self.digest.add_lookup_duration(now.elapsed());
                let result = result.and_then(|(meta, header)| {
                    if let Some(ts) = inner.valid_after {
                        if meta.created() < ts {
                            span.set_tag(|| trace::Tag::new("not valid", true));
                            return None;
                        }
                    }
                    Some((meta, header))
                });
                if result.is_none() {
                    if let Some(lock) = inner.cache_lock.as_ref() {
                        inner.lock = Some(lock.lock(key));
                    }
                }
                span.set_tag(|| trace::Tag::new("found", result.is_some()));
                Ok(result)
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Update the variance and check whether the meta matches the current variance
    ///
    /// `cache_lookup() -> compute vary hash -> cache_vary_lookup()`
    /// This function allows callers to compute the variance based on the initial cache hit.
    /// `meta` should be the one returned from the initial cache_lookup().
    /// - return true if the meta matches the variance.
    /// - return false if the current meta doesn't match the variance; the caller needs to cache_lookup() again
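    ///
    /// A hedged sketch of the vary flow (`compute_variance` is a hypothetical helper
    /// that hashes the request against the Vary rules carried by `meta`):
    /// ```ignore
    /// if let Some((meta, handler)) = cache.cache_lookup().await? {
    ///     let variance = compute_variance(&meta, &req_header);
    ///     if !cache.cache_vary_lookup(variance, &meta) {
    ///         // wrong variant: look up again under the variance-aware key
    ///         let result = cache.cache_lookup().await?;
    ///     }
    /// }
    /// ```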
    pub fn cache_vary_lookup(&mut self, variance: HashBinary, meta: &CacheMeta) -> bool {
        match self.phase {
            // Stale is allowed here because stale -> cache_lock -> lookup again
            CachePhase::CacheKey | CachePhase::Stale => {
                let inner = self.inner_mut();
                // make sure that all variances found are fresher than this asset
                // this is because when purging all the variances, only the primary slot is deleted
                // the created TS of the primary is the tombstone of all the variances
                inner.valid_after = Some(meta.created());

                // update vary
                let key = inner.key.as_mut().unwrap();
                // if no variance was previously set, then this is the first cache hit
                let is_initial_cache_hit = key.get_variance_key().is_none();
                key.set_variance_key(variance);
                let variance_binary = key.variance_bin();
                let matches_variance = meta.variance() == variance_binary;

                // We should remove the variance in the lookup `key` if this is the primary variant
                // slot. We know this is the primary variant slot if this is the initial cache hit,
                // AND the variance in the `key` already matches the `meta`'s.
                //
                // For the primary variant slot, the storage backend needs to use the primary key
                // for both cache lookup and updating the meta. Otherwise it will look for the
                // asset in the wrong location during revalidation.
                //
                // We can recreate the "full" cache key by using the meta's variance, if needed.
                if matches_variance && is_initial_cache_hit {
                    inner.key.as_mut().unwrap().remove_variance_key();
                }

                matches_variance
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Whether this request is behind a cache lock, waiting for another request to fetch the
    /// asset.
    pub fn is_cache_locked(&self) -> bool {
        matches!(self.inner().lock, Some(Locked::Read(_)))
    }

    /// Whether this request is the leader request to fetch the asset for itself and other requests
    /// behind the cache lock.
    pub fn is_cache_lock_writer(&self) -> bool {
        matches!(self.inner().lock, Some(Locked::Write(_)))
    }

    /// Take the write lock from this request to transfer it to another one.
    /// # Panic
    /// Call [Self::is_cache_lock_writer()] to check first; this will panic otherwise.
    pub fn take_write_lock(&mut self) -> (WritePermit, &'static CacheKeyLockImpl) {
        let lock = self.inner_mut().lock.take().unwrap();
        match lock {
            Locked::Write(w) => (
                w,
                self.inner()
                    .cache_lock
                    .expect("cache lock must be set if write permit exists"),
            ),
            Locked::Read(_) => panic!("take_write_lock() called on read lock"),
        }
    }

    /// Set the write lock, which is usually transferred from [Self::take_write_lock()]
    pub fn set_write_lock(&mut self, write_lock: WritePermit) {
        self.inner_mut().lock.replace(Locked::Write(write_lock));
    }

    /// Whether this request's cache hit is stale
    fn has_staled_asset(&self) -> bool {
        matches!(self.phase, CachePhase::Stale | CachePhase::StaleUpdating)
    }

    /// Whether this asset is stale and serving stale-if-error is allowed
    pub fn can_serve_stale_error(&self) -> bool {
        self.has_staled_asset() && self.cache_meta().serve_stale_if_error(SystemTime::now())
    }

    /// Whether this asset is stale and serving stale-while-revalidate is allowed.
    pub fn can_serve_stale_updating(&self) -> bool {
        self.has_staled_asset()
            && self
                .cache_meta()
                .serve_stale_while_revalidate(SystemTime::now())
    }

    /// Wait for the cache read lock to be unlocked
    /// # Panic
    /// Check [Self::is_cache_locked()], panic if this request doesn't have a read lock.
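    ///
    /// A hedged sketch (the reader retries the lookup once the writer finishes):
    /// ```ignore
    /// if cache.is_cache_locked() {
    ///     let status = cache.cache_lock_wait().await;
    ///     if matches!(status, LockStatus::Done) {
    ///         // retry cache_lookup() to read what the writer stored
    ///     }
    /// }
    /// ```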
    pub async fn cache_lock_wait(&mut self) -> LockStatus {
        let inner = self.inner_mut();
        let mut span = inner.traces.child("cache_lock");
        let lock = inner.lock.take(); // remove the lock from self
        if let Some(Locked::Read(r)) = lock {
            let now = Instant::now();
            r.wait().await;
            // it's possible for a request to be locked more than once
            self.digest.add_lock_duration(now.elapsed());
            let status = r.lock_status();
            let tag_value: &'static str = status.into();
            span.set_tag(|| Tag::new("status", tag_value));
            status
        } else {
            // should always call is_cache_locked() before this function
            panic!("cache_lock_wait on wrong type of lock")
        }
    }

    /// How long this request waited behind the read lock
    pub fn lock_duration(&self) -> Option<Duration> {
        self.digest.lock_duration
    }

    /// How long this request spent on cache lookup and reading the header
    pub fn lookup_duration(&self) -> Option<Duration> {
        self.digest.lookup_duration
    }

    /// Delete the asset from the cache storage
    /// # Panic
    /// Needs to be called after the cache key is set. Panic otherwise.
    pub async fn purge(&mut self) -> Result<bool> {
        match self.phase {
            CachePhase::CacheKey => {
                let inner = self.inner_mut();
                let mut span = inner.traces.child("purge");
                let key = inner.key.as_ref().unwrap().to_compact();
                let result = inner
                    .storage
                    .purge(&key, PurgeType::Invalidation, &span.handle())
                    .await;
                let purged = matches!(result, Ok(true));
                // need to inform the eviction manager if the asset was removed
                if let Some(eviction) = inner.eviction.as_ref() {
                    if purged {
                        eviction.remove(&key);
                    }
                }
                span.set_tag(|| trace::Tag::new("purged", purged));
                result
            }
            _ => panic!("wrong phase {:?}", self.phase),
        }
    }

    /// Check the cacheable prediction
    ///
    /// Return true if the predictor is not set
    pub fn cacheable_prediction(&self) -> bool {
        if let Some(predictor) = self.inner().predictor {
            predictor.cacheable_prediction(self.cache_key())
        } else {
            true
        }
    }

    /// Tell the predictor that this response, which was previously predicted to be uncacheable,
    /// is cacheable now.
    pub fn response_became_cacheable(&self) {
        if let Some(predictor) = self.inner().predictor {
            predictor.mark_cacheable(self.cache_key());
        }
    }

    /// Tell the predictor that this response is uncacheable so that it will know next time
    /// this request arrives.
    pub fn response_became_uncacheable(&self, reason: NoCacheReason) {
        if let Some(predictor) = self.inner().predictor {
            predictor.mark_uncacheable(self.cache_key(), reason);
        }
    }

    /// Tag all spans as being part of a subrequest.
    pub fn tag_as_subrequest(&mut self) {
        self.inner_mut()
            .traces
            .cache_span
            .set_tag(|| Tag::new("is_subrequest", true))
    }
}

/// Set the header compression dictionary, which helps serialize HTTP headers.
///
/// Return false if it is already set.
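///
/// A minimal sketch (the dictionary path is illustrative only):
/// ```ignore
/// assert!(set_compression_dict_path("/path/to/header_dict"));
/// // a second call returns false because the path was already set
/// assert!(!set_compression_dict_path("/path/to/header_dict"));
/// ```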
pub fn set_compression_dict_path(path: &str) -> bool {
    crate::meta::COMPRESSION_DICT_PATH
        .set(path.to_string())
        .is_ok()
}