pingora_cache/storage.rs
1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Cache backend storage abstraction
16
17use super::{CacheKey, CacheMeta};
18use crate::key::CompactCacheKey;
19use crate::trace::SpanHandle;
20
21use async_trait::async_trait;
22use pingora_error::Result;
23use std::any::Any;
24
/// The reason a `purge()` is called.
///
/// Handed to [`Storage::purge`] as its `purge_type` argument.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PurgeType {
    /// For eviction because the cache storage is full
    Eviction,
    /// For cache invalidation
    Invalidation,
}
33
34/// Cache storage interface
35#[async_trait]
36pub trait Storage {
37 // TODO: shouldn't have to be static
38
39 /// Lookup the storage for the given [CacheKey].
40 async fn lookup(
41 &'static self,
42 key: &CacheKey,
43 trace: &SpanHandle,
44 ) -> Result<Option<(CacheMeta, HitHandler)>>;
45
46 /// Lookup the storage for the given [CacheKey] using a streaming write tag.
47 ///
48 /// When streaming partial writes is supported, the request that initiates the write will also
49 /// pass an optional `streaming_write_tag` so that the storage may try to find the associated
50 /// [HitHandler], for the same ongoing write.
51 ///
52 /// Therefore, when the write tag is set, the storage implementation should either return a
53 /// [HitHandler] that can be matched to that tag, or none at all. Otherwise when the storage
54 /// supports concurrent streaming writes for the same key, the calling request may receive a
55 /// different body from the one it expected.
56 ///
57 /// By default this defers to the standard `Storage::lookup` implementation.
58 async fn lookup_streaming_write(
59 &'static self,
60 key: &CacheKey,
61 _streaming_write_tag: Option<&[u8]>,
62 trace: &SpanHandle,
63 ) -> Result<Option<(CacheMeta, HitHandler)>> {
64 self.lookup(key, trace).await
65 }
66
67 /// Write the given [CacheMeta] to the storage. Return [MissHandler] to write the body later.
68 async fn get_miss_handler(
69 &'static self,
70 key: &CacheKey,
71 meta: &CacheMeta,
72 trace: &SpanHandle,
73 ) -> Result<MissHandler>;
74
75 /// Delete the cached asset for the given key
76 ///
77 /// [CompactCacheKey] is used here because it is how eviction managers store the keys
78 async fn purge(
79 &'static self,
80 key: &CompactCacheKey,
81 purge_type: PurgeType,
82 trace: &SpanHandle,
83 ) -> Result<bool>;
84
85 /// Update cache header and metadata for the already stored asset.
86 async fn update_meta(
87 &'static self,
88 key: &CacheKey,
89 meta: &CacheMeta,
90 trace: &SpanHandle,
91 ) -> Result<bool>;
92
93 /// Whether this storage backend supports reading partially written data
94 ///
95 /// This is to indicate when cache should unlock readers
96 fn support_streaming_partial_write(&self) -> bool {
97 false
98 }
99
100 /// Helper function to cast the trait object to concrete types
101 fn as_any(&self) -> &(dyn Any + Send + Sync + 'static);
102}
103
104/// Cache hit handling trait
105#[async_trait]
106pub trait HandleHit {
107 /// Read cached body
108 ///
109 /// Return `None` when no more body to read.
110 async fn read_body(&mut self) -> Result<Option<bytes::Bytes>>;
111
112 /// Finish the current cache hit
113 async fn finish(
114 self: Box<Self>, // because self is always used as a trait object
115 storage: &'static (dyn Storage + Sync),
116 key: &CacheKey,
117 trace: &SpanHandle,
118 ) -> Result<()>;
119
120 /// Whether this storage allows seeking to a certain range of body for single ranges.
121 fn can_seek(&self) -> bool {
122 false
123 }
124
125 /// Whether this storage allows seeking to a certain range of body for multipart ranges.
126 ///
127 /// By default uses the `can_seek` implementation.
128 fn can_seek_multipart(&self) -> bool {
129 self.can_seek()
130 }
131
132 /// Try to seek to a certain range of the body for single ranges.
133 ///
134 /// `end: None` means to read to the end of the body.
135 fn seek(&mut self, _start: usize, _end: Option<usize>) -> Result<()> {
136 // to prevent impl can_seek() without impl seek
137 todo!("seek() needs to be implemented")
138 }
139
140 /// Try to seek to a certain range of the body for multipart ranges.
141 ///
142 /// Works in an identical manner to `seek()`.
143 ///
144 /// `end: None` means to read to the end of the body.
145 ///
146 /// By default uses the `seek` implementation, but hit handlers may customize the
147 /// implementation specifically to anticipate multipart requests.
148 fn seek_multipart(&mut self, start: usize, end: Option<usize>) -> Result<()> {
149 // to prevent impl can_seek() without impl seek
150 self.seek(start, end)
151 }
152
153 // TODO: fn is_stream_hit()
154
155 /// Should we count this hit handler instance as an access in the eviction manager.
156 ///
157 /// Defaults to returning true to track all cache hits as accesses. Customize this if certain
158 /// hits should not affect the eviction system's view of the asset.
159 fn should_count_access(&self) -> bool {
160 true
161 }
162
163 /// Returns the weight of the current cache hit asset to report to the eviction manager.
164 ///
165 /// This allows the eviction system to initialize a weight for the asset, in case it is not
166 /// already tracking it (e.g. storage is out of sync with the eviction manager).
167 ///
168 /// Defaults to 0.
169 fn get_eviction_weight(&self) -> usize {
170 0
171 }
172
173 /// Helper function to cast the trait object to concrete types
174 fn as_any(&self) -> &(dyn Any + Send + Sync);
175
176 /// Helper function to cast the trait object to concrete types
177 fn as_any_mut(&mut self) -> &mut (dyn Any + Send + Sync);
178}
179
180/// Hit Handler
181pub type HitHandler = Box<dyn HandleHit + Sync + Send>;
182
/// The outcome reported by [`HandleMiss::finish`] when a cache write completes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MissFinishType {
    /// A new asset was created with the given size.
    Created(usize),
    /// Appended size to existing asset, with an optional max size param.
    Appended(usize, Option<usize>),
}
190
191/// Cache miss handling trait
192#[async_trait]
193pub trait HandleMiss {
194 /// Write the given body to the storage
195 async fn write_body(&mut self, data: bytes::Bytes, eof: bool) -> Result<()>;
196
197 /// Finish the cache admission
198 ///
199 /// When `self` is dropped without calling this function, the storage should consider this write
200 /// failed.
201 async fn finish(
202 self: Box<Self>, // because self is always used as a trait object
203 ) -> Result<MissFinishType>;
204
205 /// Return a streaming write tag recognized by the underlying [`Storage`].
206 ///
207 /// This is an arbitrary data identifier that is used to associate this miss handler's current
208 /// write with a hit handler for the same write. This identifier will be compared by the
209 /// storage during `lookup_streaming_write`.
210 // This write tag is essentially an borrowed data blob of bytes retrieved from the miss handler
211 // and passed to storage, which means it can support strings or small data types, e.g. bytes
212 // represented by a u64.
213 // The downside with the current API is that such a data blob must be owned by the miss handler
214 // and stored in a way that permits retrieval as a byte slice (not computed on the fly).
215 // But most use cases likely only require a simple integer and may not like the overhead of a
216 // Vec/String allocation or even a Cow, though such data types can also be used here.
217 fn streaming_write_tag(&self) -> Option<&[u8]> {
218 None
219 }
220}
221
222/// Miss Handler
223pub type MissHandler = Box<dyn HandleMiss + Sync + Send>;
224
/// Convenience integer write id types for use as streaming write tags.
pub mod streaming_write {
    /// Portable u64 (sized) write id convenience type for use with streaming writes.
    ///
    /// Often an integer value is sufficient for a streaming write tag. This convenience type
    /// enables storing such a value and functions for consistent conversion between byte
    /// sequence data types.
    ///
    /// The integer is held as its 8 big-endian bytes, which is what [`U64WriteId::as_bytes`]
    /// exposes. Two ids are equal (and hash identically) exactly when those bytes are equal.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct U64WriteId([u8; 8]);

    impl U64WriteId {
        /// Borrow the id as a slice of its 8 big-endian bytes.
        pub fn as_bytes(&self) -> &[u8] {
            &self.0
        }
    }

    impl From<u64> for U64WriteId {
        /// Store `value` as big-endian bytes.
        fn from(value: u64) -> U64WriteId {
            U64WriteId(value.to_be_bytes())
        }
    }

    impl From<U64WriteId> for u64 {
        /// Recover the integer from the stored big-endian bytes.
        fn from(value: U64WriteId) -> u64 {
            u64::from_be_bytes(value.0)
        }
    }

    impl std::convert::TryFrom<&[u8]> for U64WriteId {
        type Error = std::array::TryFromSliceError;

        /// Build an id from a byte slice.
        ///
        /// # Errors
        ///
        /// Fails unless `value` is exactly 8 bytes long.
        fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
            // The slice-to-array conversion performs the length check.
            <[u8; 8] as std::convert::TryFrom<&[u8]>>::try_from(value).map(U64WriteId)
        }
    }

    /// Portable u32 (sized) write id convenience type for use with streaming writes.
    ///
    /// Often an integer value is sufficient for a streaming write tag. This convenience type
    /// enables storing such a value and functions for consistent conversion between byte
    /// sequence data types.
    ///
    /// The integer is held as its 4 big-endian bytes, which is what [`U32WriteId::as_bytes`]
    /// exposes. Two ids are equal (and hash identically) exactly when those bytes are equal.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct U32WriteId([u8; 4]);

    impl U32WriteId {
        /// Borrow the id as a slice of its 4 big-endian bytes.
        pub fn as_bytes(&self) -> &[u8] {
            &self.0
        }
    }

    impl From<u32> for U32WriteId {
        /// Store `value` as big-endian bytes.
        fn from(value: u32) -> U32WriteId {
            U32WriteId(value.to_be_bytes())
        }
    }

    impl From<U32WriteId> for u32 {
        /// Recover the integer from the stored big-endian bytes.
        fn from(value: U32WriteId) -> u32 {
            u32::from_be_bytes(value.0)
        }
    }

    impl std::convert::TryFrom<&[u8]> for U32WriteId {
        type Error = std::array::TryFromSliceError;

        /// Build an id from a byte slice.
        ///
        /// # Errors
        ///
        /// Fails unless `value` is exactly 4 bytes long.
        fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
            // The slice-to-array conversion performs the length check.
            <[u8; 4] as std::convert::TryFrom<&[u8]>>::try_from(value).map(U32WriteId)
        }
    }
}