Skip to main content

pingora_cache/
storage.rs

1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Cache backend storage abstraction
16
17use super::{CacheKey, CacheMeta};
18use crate::key::CompactCacheKey;
19use crate::trace::SpanHandle;
20
21use async_trait::async_trait;
22use pingora_error::Result;
23use std::any::Any;
24
/// The reason a `purge()` is called.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PurgeType {
    /// For eviction because the cache storage is full.
    Eviction,
    /// For cache invalidation.
    Invalidation,
}
33
34/// Cache storage interface
35#[async_trait]
36pub trait Storage {
37    // TODO: shouldn't have to be static
38
39    /// Lookup the storage for the given [CacheKey].
40    async fn lookup(
41        &'static self,
42        key: &CacheKey,
43        trace: &SpanHandle,
44    ) -> Result<Option<(CacheMeta, HitHandler)>>;
45
46    /// Lookup the storage for the given [CacheKey] using a streaming write tag.
47    ///
48    /// When streaming partial writes is supported, the request that initiates the write will also
49    /// pass an optional `streaming_write_tag` so that the storage may try to find the associated
50    /// [HitHandler], for the same ongoing write.
51    ///
52    /// Therefore, when the write tag is set, the storage implementation should either return a
53    /// [HitHandler] that can be matched to that tag, or none at all. Otherwise when the storage
54    /// supports concurrent streaming writes for the same key, the calling request may receive a
55    /// different body from the one it expected.
56    ///
57    /// By default this defers to the standard `Storage::lookup` implementation.
58    async fn lookup_streaming_write(
59        &'static self,
60        key: &CacheKey,
61        _streaming_write_tag: Option<&[u8]>,
62        trace: &SpanHandle,
63    ) -> Result<Option<(CacheMeta, HitHandler)>> {
64        self.lookup(key, trace).await
65    }
66
67    /// Write the given [CacheMeta] to the storage. Return [MissHandler] to write the body later.
68    async fn get_miss_handler(
69        &'static self,
70        key: &CacheKey,
71        meta: &CacheMeta,
72        trace: &SpanHandle,
73    ) -> Result<MissHandler>;
74
75    /// Delete the cached asset for the given key
76    ///
77    /// [CompactCacheKey] is used here because it is how eviction managers store the keys
78    async fn purge(
79        &'static self,
80        key: &CompactCacheKey,
81        purge_type: PurgeType,
82        trace: &SpanHandle,
83    ) -> Result<bool>;
84
85    /// Update cache header and metadata for the already stored asset.
86    async fn update_meta(
87        &'static self,
88        key: &CacheKey,
89        meta: &CacheMeta,
90        trace: &SpanHandle,
91    ) -> Result<bool>;
92
93    /// Whether this storage backend supports reading partially written data
94    ///
95    /// This is to indicate when cache should unlock readers
96    fn support_streaming_partial_write(&self) -> bool {
97        false
98    }
99
100    /// Helper function to cast the trait object to concrete types
101    fn as_any(&self) -> &(dyn Any + Send + Sync + 'static);
102}
103
104/// Cache hit handling trait
105#[async_trait]
106pub trait HandleHit {
107    /// Read cached body
108    ///
109    /// Return `None` when no more body to read.
110    async fn read_body(&mut self) -> Result<Option<bytes::Bytes>>;
111
112    /// Finish the current cache hit
113    async fn finish(
114        self: Box<Self>, // because self is always used as a trait object
115        storage: &'static (dyn Storage + Sync),
116        key: &CacheKey,
117        trace: &SpanHandle,
118    ) -> Result<()>;
119
120    /// Whether this storage allows seeking to a certain range of body for single ranges.
121    fn can_seek(&self) -> bool {
122        false
123    }
124
125    /// Whether this storage allows seeking to a certain range of body for multipart ranges.
126    ///
127    /// By default uses the `can_seek` implementation.
128    fn can_seek_multipart(&self) -> bool {
129        self.can_seek()
130    }
131
132    /// Try to seek to a certain range of the body for single ranges.
133    ///
134    /// `end: None` means to read to the end of the body.
135    fn seek(&mut self, _start: usize, _end: Option<usize>) -> Result<()> {
136        // to prevent impl can_seek() without impl seek
137        todo!("seek() needs to be implemented")
138    }
139
140    /// Try to seek to a certain range of the body for multipart ranges.
141    ///
142    /// Works in an identical manner to `seek()`.
143    ///
144    /// `end: None` means to read to the end of the body.
145    ///
146    /// By default uses the `seek` implementation, but hit handlers may customize the
147    /// implementation specifically to anticipate multipart requests.
148    fn seek_multipart(&mut self, start: usize, end: Option<usize>) -> Result<()> {
149        // to prevent impl can_seek() without impl seek
150        self.seek(start, end)
151    }
152
153    // TODO: fn is_stream_hit()
154
155    /// Should we count this hit handler instance as an access in the eviction manager.
156    ///
157    /// Defaults to returning true to track all cache hits as accesses. Customize this if certain
158    /// hits should not affect the eviction system's view of the asset.
159    fn should_count_access(&self) -> bool {
160        true
161    }
162
163    /// Returns the weight of the current cache hit asset to report to the eviction manager.
164    ///
165    /// This allows the eviction system to initialize a weight for the asset, in case it is not
166    /// already tracking it (e.g. storage is out of sync with the eviction manager).
167    ///
168    /// Defaults to 0.
169    fn get_eviction_weight(&self) -> usize {
170        0
171    }
172
173    /// Helper function to cast the trait object to concrete types
174    fn as_any(&self) -> &(dyn Any + Send + Sync);
175
176    /// Helper function to cast the trait object to concrete types
177    fn as_any_mut(&mut self) -> &mut (dyn Any + Send + Sync);
178}
179
180/// Hit Handler
181pub type HitHandler = Box<dyn HandleHit + Sync + Send>;
182
/// How a cache admission ended, as reported by [HandleMiss::finish].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MissFinishType {
    /// A new asset was created with the given size.
    Created(usize),
    /// Size was appended to an existing asset.
    ///
    /// Fields: the appended size, followed by an optional max size param.
    Appended(usize, Option<usize>),
}
190
191/// Cache miss handling trait
192#[async_trait]
193pub trait HandleMiss {
194    /// Write the given body to the storage
195    async fn write_body(&mut self, data: bytes::Bytes, eof: bool) -> Result<()>;
196
197    /// Finish the cache admission
198    ///
199    /// When `self` is dropped without calling this function, the storage should consider this write
200    /// failed.
201    async fn finish(
202        self: Box<Self>, // because self is always used as a trait object
203    ) -> Result<MissFinishType>;
204
205    /// Return a streaming write tag recognized by the underlying [`Storage`].
206    ///
207    /// This is an arbitrary data identifier that is used to associate this miss handler's current
208    /// write with a hit handler for the same write. This identifier will be compared by the
209    /// storage during `lookup_streaming_write`.
210    // This write tag is essentially an borrowed data blob of bytes retrieved from the miss handler
211    // and passed to storage, which means it can support strings or small data types, e.g. bytes
212    // represented by a u64.
213    // The downside with the current API is that such a data blob must be owned by the miss handler
214    // and stored in a way that permits retrieval as a byte slice (not computed on the fly).
215    // But most use cases likely only require a simple integer and may not like the overhead of a
216    // Vec/String allocation or even a Cow, though such data types can also be used here.
217    fn streaming_write_tag(&self) -> Option<&[u8]> {
218        None
219    }
220}
221
222/// Miss Handler
223pub type MissHandler = Box<dyn HandleMiss + Sync + Send>;
224
/// Convenience types for building streaming write tags.
///
/// These fixed-size ids can be stored in a miss handler and returned as a byte slice from
/// `HandleMiss::streaming_write_tag`. A storage can then parse them back in
/// `Storage::lookup_streaming_write`.
pub mod streaming_write {
    /// Portable u64 (sized) write id convenience type for use with streaming writes.
    ///
    /// Often an integer value is sufficient for a streaming write tag. This convenience type
    /// stores such a value and provides consistent conversions to and from byte sequences.
    ///
    /// The value is stored in big-endian byte order, so [U64WriteId::as_bytes] yields the same
    /// bytes on every platform.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct U64WriteId([u8; 8]);

    impl U64WriteId {
        /// Borrow the id as its 8 big-endian bytes.
        pub fn as_bytes(&self) -> &[u8] {
            &self.0
        }
    }

    impl AsRef<[u8]> for U64WriteId {
        fn as_ref(&self) -> &[u8] {
            self.as_bytes()
        }
    }

    impl From<u64> for U64WriteId {
        fn from(value: u64) -> U64WriteId {
            U64WriteId(value.to_be_bytes())
        }
    }

    impl From<U64WriteId> for u64 {
        fn from(value: U64WriteId) -> u64 {
            u64::from_be_bytes(value.0)
        }
    }

    impl TryFrom<&[u8]> for U64WriteId {
        type Error = std::array::TryFromSliceError;

        /// Parse an id from a byte slice. Fails unless the slice is exactly 8 bytes long.
        fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
            <[u8; 8]>::try_from(value).map(U64WriteId)
        }
    }

    /// Portable u32 (sized) write id convenience type for use with streaming writes.
    ///
    /// Often an integer value is sufficient for a streaming write tag. This convenience type
    /// stores such a value and provides consistent conversions to and from byte sequences.
    ///
    /// The value is stored in big-endian byte order, so [U32WriteId::as_bytes] yields the same
    /// bytes on every platform.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct U32WriteId([u8; 4]);

    impl U32WriteId {
        /// Borrow the id as its 4 big-endian bytes.
        pub fn as_bytes(&self) -> &[u8] {
            &self.0
        }
    }

    impl AsRef<[u8]> for U32WriteId {
        fn as_ref(&self) -> &[u8] {
            self.as_bytes()
        }
    }

    impl From<u32> for U32WriteId {
        fn from(value: u32) -> U32WriteId {
            U32WriteId(value.to_be_bytes())
        }
    }

    impl From<U32WriteId> for u32 {
        fn from(value: U32WriteId) -> u32 {
            u32::from_be_bytes(value.0)
        }
    }

    impl TryFrom<&[u8]> for U32WriteId {
        type Error = std::array::TryFromSliceError;

        /// Parse an id from a byte slice. Fails unless the slice is exactly 4 bytes long.
        fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
            <[u8; 4]>::try_from(value).map(U32WriteId)
        }
    }
}