pingora_cache/storage.rs
1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Cache backend storage abstraction
16
17use super::{CacheKey, CacheMeta};
18use crate::key::CompactCacheKey;
19use crate::trace::SpanHandle;
20
21use async_trait::async_trait;
22use pingora_error::Result;
23use std::any::Any;
24
/// The reason a purge() is called
///
/// Passed to [Storage::purge] so the storage backend can tell why an asset is being removed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PurgeType {
    /// For eviction because the cache storage is full
    Eviction,
    /// For cache invalidation
    Invalidation,
}
33
/// Cache storage interface
///
/// Implementors provide lookup, admission (through a [MissHandler]), purging and metadata
/// updates for cached assets.
///
/// Every async method takes `&'static self`, so a storage instance must be reachable through a
/// `'static` reference (for example a `static` item or a leaked allocation) to be used through
/// this trait.
#[async_trait]
pub trait Storage {
    // TODO: shouldn't have to be static
    // (every async method below takes `&'static self`, which forces implementors to live in a
    // `'static` location)

    /// Lookup the storage for the given [CacheKey].
    ///
    /// On success, a found asset is returned as its [CacheMeta] together with a [HitHandler]
    /// used to read the cached body.
    // NOTE(review): `Ok(None)` presumably signals that nothing is cached for `key` (a miss) —
    // confirm against the storage implementations.
    async fn lookup(
        &'static self,
        key: &CacheKey,
        trace: &SpanHandle,
    ) -> Result<Option<(CacheMeta, HitHandler)>>;

    /// Lookup the storage for the given [CacheKey] using a streaming write tag.
    ///
    /// When streaming partial writes is supported, the request that initiates the write will also
    /// pass an optional `streaming_write_tag` so that the storage may try to find the associated
    /// [HitHandler], for the same ongoing write.
    ///
    /// Therefore, when the write tag is set, the storage implementation should either return a
    /// [HitHandler] that can be matched to that tag, or none at all. Otherwise when the storage
    /// supports concurrent streaming writes for the same key, the calling request may receive a
    /// different body from the one it expected.
    ///
    /// By default this defers to the standard `Storage::lookup` implementation, ignoring the
    /// write tag entirely.
    async fn lookup_streaming_write(
        &'static self,
        key: &CacheKey,
        _streaming_write_tag: Option<&[u8]>,
        trace: &SpanHandle,
    ) -> Result<Option<(CacheMeta, HitHandler)>> {
        self.lookup(key, trace).await
    }

    /// Write the given [CacheMeta] to the storage. Return [MissHandler] to write the body later.
    ///
    /// The admission is completed by calling `finish()` on the returned [MissHandler]; per the
    /// [HandleMiss] contract, dropping it without finishing should be treated as a failed write.
    async fn get_miss_handler(
        &'static self,
        key: &CacheKey,
        meta: &CacheMeta,
        trace: &SpanHandle,
    ) -> Result<MissHandler>;

    /// Delete the cached asset for the given key
    ///
    /// [CompactCacheKey] is used here because it is how eviction managers store the keys.
    /// `purge_type` tells the storage why the purge happens (see [PurgeType]).
    // NOTE(review): the meaning of the returned `bool` is not specified here (presumably whether
    // an asset was actually removed) — confirm with implementations.
    async fn purge(
        &'static self,
        key: &CompactCacheKey,
        purge_type: PurgeType,
        trace: &SpanHandle,
    ) -> Result<bool>;

    /// Update cache header and metadata for the already stored asset.
    // NOTE(review): the meaning of the returned `bool` is not specified here (presumably whether
    // a stored asset was found and updated) — confirm with implementations.
    async fn update_meta(
        &'static self,
        key: &CacheKey,
        meta: &CacheMeta,
        trace: &SpanHandle,
    ) -> Result<bool>;

    /// Whether this storage backend supports reading partially written data
    ///
    /// This is to indicate when cache should unlock readers. Defaults to `false`.
    /// See [Storage::lookup_streaming_write] for how streaming writes are matched to readers.
    fn support_streaming_partial_write(&self) -> bool {
        false
    }

    /// Helper function to cast the trait object to concrete types
    /// (e.g. via `downcast_ref` on the returned `dyn Any`).
    fn as_any(&self) -> &(dyn Any + Send + Sync + 'static);
}
103
/// Cache hit handling trait
///
/// Boxed as a [HitHandler] and returned from [Storage::lookup] to read a cached body.
#[async_trait]
pub trait HandleHit {
    /// Read cached body
    ///
    /// Return `None` when no more body to read.
    async fn read_body(&mut self) -> Result<Option<bytes::Bytes>>;

    /// Finish the current cache hit
    ///
    /// Consumes the boxed handler.
    // NOTE(review): `storage` and `key` are presumably the storage and key the hit was looked up
    // from — confirm at call sites.
    async fn finish(
        self: Box<Self>, // because self is always used as a trait object
        storage: &'static (dyn Storage + Sync),
        key: &CacheKey,
        trace: &SpanHandle,
    ) -> Result<()>;

    /// Whether this storage allows seeking to a certain range of body
    ///
    /// Defaults to `false`. Implementors returning `true` must also override [HandleHit::seek].
    fn can_seek(&self) -> bool {
        false
    }

    /// Try to seek to a certain range of the body
    ///
    /// `end: None` means to read to the end of the body.
    ///
    /// # Panics
    ///
    /// The default implementation panics (via `todo!`), so that enabling [HandleHit::can_seek]
    /// without overriding `seek` fails loudly at runtime.
    fn seek(&mut self, _start: usize, _end: Option<usize>) -> Result<()> {
        // to prevent impl can_seek() without impl seek
        todo!("seek() needs to be implemented")
    }
    // TODO: fn is_stream_hit()

    /// Should we count this hit handler instance as an access in the eviction manager.
    ///
    /// Defaults to returning true to track all cache hits as accesses. Customize this if certain
    /// hits should not affect the eviction system's view of the asset.
    fn should_count_access(&self) -> bool {
        true
    }

    /// Returns the weight of the current cache hit asset to report to the eviction manager.
    ///
    /// This allows the eviction system to initialize a weight for the asset, in case it is not
    /// already tracking it (e.g. storage is out of sync with the eviction manager).
    ///
    /// Defaults to 0.
    fn get_eviction_weight(&self) -> usize {
        0
    }

    /// Helper function to cast the trait object to concrete types
    /// (e.g. via `downcast_ref` on the returned `dyn Any`).
    fn as_any(&self) -> &(dyn Any + Send + Sync);

    /// Helper function to cast the trait object to concrete types
    /// (mutable variant, e.g. via `downcast_mut`).
    fn as_any_mut(&mut self) -> &mut (dyn Any + Send + Sync);
}
158
159/// Hit Handler
160pub type HitHandler = Box<(dyn HandleHit + Sync + Send)>;
161
/// MissFinishType
///
/// Outcome returned by `HandleMiss::finish` when a cache admission completes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MissFinishType {
    /// The admission created the asset.
    // NOTE(review): semantics inferred from the variant name; the meaning of the `usize`
    // payload (e.g. bytes written or asset size) is not specified here — confirm with storages.
    Created(usize),
    /// The admission appended to the asset.
    // NOTE(review): semantics inferred from the variant name; the meaning of the `usize`
    // payload is not specified here — confirm with storages.
    Appended(usize),
}
167
/// Cache miss handling trait
///
/// Boxed as a [MissHandler] and returned from [Storage::get_miss_handler] to write a body into
/// the storage.
#[async_trait]
pub trait HandleMiss {
    /// Write the given body to the storage
    // NOTE(review): `eof` presumably marks `data` as the final chunk of the body — confirm.
    async fn write_body(&mut self, data: bytes::Bytes, eof: bool) -> Result<()>;

    /// Finish the cache admission
    ///
    /// When `self` is dropped without calling this function, the storage should consider this write
    /// failed.
    async fn finish(
        self: Box<Self>, // because self is always used as a trait object
    ) -> Result<MissFinishType>;

    /// Return a streaming write tag recognized by the underlying [`Storage`].
    ///
    /// This is an arbitrary data identifier that is used to associate this miss handler's current
    /// write with a hit handler for the same write. This identifier will be compared by the
    /// storage during `lookup_streaming_write`.
    ///
    /// Defaults to `None` (no tag). See the [`streaming_write`] module for integer-backed tag
    /// types.
    // This write tag is essentially a borrowed data blob of bytes retrieved from the miss handler
    // and passed to storage, which means it can support strings or small data types, e.g. bytes
    // represented by a u64.
    // The downside with the current API is that such a data blob must be owned by the miss handler
    // and stored in a way that permits retrieval as a byte slice (not computed on the fly).
    // But most use cases likely only require a simple integer and may not like the overhead of a
    // Vec/String allocation or even a Cow, though such data types can also be used here.
    fn streaming_write_tag(&self) -> Option<&[u8]> {
        None
    }
}
198
199/// Miss Handler
200pub type MissHandler = Box<(dyn HandleMiss + Sync + Send)>;
201
pub mod streaming_write {
    //! Convenience write id types for use as streaming write tags.
    //!
    //! Each type stores an integer as its big-endian byte representation. This lets `as_bytes`
    //! hand out a borrowed `&[u8]` tag without computing it on the fly. The same bytes convert
    //! back with `TryFrom<&[u8]>`.

    /// Generates a fixed-size write id newtype wrapping the big-endian bytes of `$int`.
    ///
    /// Every generated type gets the same API: `as_bytes`, `From<$int>`, `From<Self> for $int`
    /// and `TryFrom<&[u8]>`. The `TryFrom` conversion fails unless the slice length equals
    /// `size_of::<$int>()`. Generating both types from one definition keeps them from drifting
    /// apart.
    macro_rules! define_write_id {
        ($(#[$attr:meta])* $name:ident, $int:ty) => {
            $(#[$attr])*
            #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
            pub struct $name([u8; std::mem::size_of::<$int>()]);

            impl $name {
                /// Returns the big-endian bytes of the wrapped integer, suitable for use as a
                /// streaming write tag.
                pub fn as_bytes(&self) -> &[u8] {
                    &self.0[..]
                }
            }

            impl From<$int> for $name {
                fn from(value: $int) -> $name {
                    $name(value.to_be_bytes())
                }
            }

            impl From<$name> for $int {
                fn from(value: $name) -> $int {
                    <$int>::from_be_bytes(value.0)
                }
            }

            impl TryFrom<&[u8]> for $name {
                type Error = std::array::TryFromSliceError;

                fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
                    Ok($name(value.try_into()?))
                }
            }
        };
    }

    define_write_id!(
        /// Portable u64 (sized) write id convenience type for use with streaming writes.
        ///
        /// Often an integer value is sufficient for a streaming write tag. This convenience type enables
        /// storing such a value and functions for consistent conversion between byte sequence data types.
        U64WriteId,
        u64
    );

    define_write_id!(
        /// Portable u32 (sized) write id convenience type for use with streaming writes.
        ///
        /// Often an integer value is sufficient for a streaming write tag. This convenience type enables
        /// storing such a value and functions for consistent conversion between byte sequence data types.
        U32WriteId,
        u32
    );
}