1use std::{future::Future, sync::Arc};
9
10use async_trait::async_trait;
11use bytes::Bytes;
12use hitbox_core::{
13 BackendLabel, BoxContext, CacheKey, CacheStatus, CacheValue, Cacheable, CacheableResponse, Raw,
14 ReadMode, ResponseSource,
15};
16
17use crate::{
18 BackendError, CacheKeyFormat, Compressor, PassthroughCompressor,
19 format::{BincodeFormat, Format, FormatExt},
20 metrics::Timer,
21};
22
/// Outcome of removing a key from a backend.
///
/// The type is a small plain value (a unit variant and a `u32` payload), so it
/// is `Clone + Copy`: a status can be compared, logged and passed along
/// without giving up ownership.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeleteStatus {
    /// Something was removed. The `u32` payload is a count supplied by the
    /// backend (presumably the number of entries or layers that were deleted —
    /// confirm against the backend implementations).
    Deleted(u32),

    /// Nothing was removed (presumably because the key was not present —
    /// confirm against the backend implementations).
    Missing,
}
38
/// Result type returned by backend operations; the error is always
/// [`BackendError`].
pub type BackendResult<T> = Result<T, BackendError>;

/// Trait-object form of [`Backend`] with an explicit `Send` bound.
///
/// NOTE(review): `Backend` already lists `Sync + Send` as supertraits, so the
/// name describes only the bounds spelled out on the object type. This alias
/// and [`SyncBackend`] are nevertheless distinct types, which is why each has
/// its own `Arc<..>` implementation in this file.
pub type UnsyncBackend = dyn Backend + Send;

/// Trait-object form of [`Backend`] with explicit `Send + Sync` bounds.
pub type SyncBackend = dyn Backend + Send + Sync;
47
/// Low-level storage interface: moves raw (already serialized) cache values
/// in and out of a store.
///
/// Implementors must provide [`read`](Backend::read),
/// [`write`](Backend::write) and [`remove`](Backend::remove). The remaining
/// methods describe how values and keys are encoded and have defaults that can
/// be overridden. Typed access (serialization, compression, metrics) is built
/// on top of this trait by `CacheBackend`.
#[async_trait]
pub trait Backend: Sync + Send {
    /// Reads the raw value stored under `key`.
    ///
    /// Returns `Ok(None)` when the backend has nothing to return for the key
    /// (presumably missing or expired — the exact meaning is up to the
    /// implementation).
    async fn read(&self, key: &CacheKey) -> BackendResult<Option<CacheValue<Raw>>>;

    /// Stores the raw `value` under `key`.
    async fn write(&self, key: &CacheKey, value: CacheValue<Raw>) -> BackendResult<()>;

    /// Removes the entry stored under `key` and reports the outcome as a
    /// [`DeleteStatus`].
    async fn remove(&self, key: &CacheKey) -> BackendResult<DeleteStatus>;

    /// Label identifying this backend; `CacheBackend` uses it to tag metrics
    /// and the response source. Defaults to the static label `"backend"`.
    fn label(&self) -> BackendLabel {
        BackendLabel::new_static("backend")
    }

    /// Format used to serialize and deserialize cached values.
    /// Defaults to [`BincodeFormat`].
    fn value_format(&self) -> &dyn Format {
        &BincodeFormat
    }

    /// Format used to encode cache keys.
    /// Defaults to [`CacheKeyFormat::Bitcode`].
    fn key_format(&self) -> &CacheKeyFormat {
        &CacheKeyFormat::Bitcode
    }

    /// Compressor applied to serialized values.
    /// Defaults to [`PassthroughCompressor`].
    fn compressor(&self) -> &dyn Compressor {
        &PassthroughCompressor
    }
}
96
97#[async_trait]
98impl Backend for &dyn Backend {
99 async fn read(&self, key: &CacheKey) -> BackendResult<Option<CacheValue<Raw>>> {
100 (*self).read(key).await
101 }
102
103 async fn write(&self, key: &CacheKey, value: CacheValue<Raw>) -> BackendResult<()> {
104 (*self).write(key, value).await
105 }
106
107 async fn remove(&self, key: &CacheKey) -> BackendResult<DeleteStatus> {
108 (*self).remove(key).await
109 }
110
111 fn label(&self) -> BackendLabel {
112 (*self).label()
113 }
114
115 fn value_format(&self) -> &dyn Format {
116 (*self).value_format()
117 }
118
119 fn key_format(&self) -> &CacheKeyFormat {
120 (*self).key_format()
121 }
122
123 fn compressor(&self) -> &dyn Compressor {
124 (*self).compressor()
125 }
126}
127
128#[async_trait]
129impl Backend for Box<dyn Backend> {
130 async fn read(&self, key: &CacheKey) -> BackendResult<Option<CacheValue<Raw>>> {
131 (**self).read(key).await
132 }
133
134 async fn write(&self, key: &CacheKey, value: CacheValue<Raw>) -> BackendResult<()> {
135 (**self).write(key, value).await
136 }
137
138 async fn remove(&self, key: &CacheKey) -> BackendResult<DeleteStatus> {
139 (**self).remove(key).await
140 }
141
142 fn label(&self) -> BackendLabel {
143 (**self).label()
144 }
145
146 fn value_format(&self) -> &dyn Format {
147 (**self).value_format()
148 }
149
150 fn key_format(&self) -> &CacheKeyFormat {
151 (**self).key_format()
152 }
153
154 fn compressor(&self) -> &dyn Compressor {
155 (**self).compressor()
156 }
157}
158
159#[async_trait]
160impl Backend for Arc<UnsyncBackend> {
161 async fn read(&self, key: &CacheKey) -> BackendResult<Option<CacheValue<Raw>>> {
162 (**self).read(key).await
163 }
164
165 async fn write(&self, key: &CacheKey, value: CacheValue<Raw>) -> BackendResult<()> {
166 (**self).write(key, value).await
167 }
168
169 async fn remove(&self, key: &CacheKey) -> BackendResult<DeleteStatus> {
170 (**self).remove(key).await
171 }
172
173 fn label(&self) -> BackendLabel {
174 (**self).label()
175 }
176
177 fn value_format(&self) -> &dyn Format {
178 (**self).value_format()
179 }
180
181 fn key_format(&self) -> &CacheKeyFormat {
182 (**self).key_format()
183 }
184
185 fn compressor(&self) -> &dyn Compressor {
186 (**self).compressor()
187 }
188}
189
190#[async_trait]
191impl Backend for Arc<SyncBackend> {
192 async fn read(&self, key: &CacheKey) -> BackendResult<Option<CacheValue<Raw>>> {
193 (**self).read(key).await
194 }
195
196 async fn write(&self, key: &CacheKey, value: CacheValue<Raw>) -> BackendResult<()> {
197 (**self).write(key, value).await
198 }
199
200 async fn remove(&self, key: &CacheKey) -> BackendResult<DeleteStatus> {
201 (**self).remove(key).await
202 }
203
204 fn label(&self) -> BackendLabel {
205 (**self).label()
206 }
207
208 fn value_format(&self) -> &dyn Format {
209 (**self).value_format()
210 }
211
212 fn key_format(&self) -> &CacheKeyFormat {
213 (**self).key_format()
214 }
215
216 fn compressor(&self) -> &dyn Compressor {
217 (**self).compressor()
218 }
219}
220
221pub trait CacheBackend: Backend {
240 fn get<T>(
245 &self,
246 key: &CacheKey,
247 ctx: &mut BoxContext,
248 ) -> impl Future<Output = BackendResult<Option<CacheValue<T::Cached>>>> + Send
249 where
250 T: CacheableResponse,
251 T::Cached: Cacheable,
252 {
253 async move {
254 let backend_label = self.label();
255
256 let read_timer = Timer::new();
257 let read_result = self.read(key).await;
258 crate::metrics::record_read(backend_label.as_str(), read_timer.elapsed());
259
260 match read_result {
261 Ok(Some(value)) => {
262 let (meta, raw_data) = value.into_parts();
263 let raw_len = raw_data.len();
264 crate::metrics::record_read_bytes(backend_label.as_str(), raw_len);
265
266 let format = self.value_format();
267
268 let decompress_timer = Timer::new();
269 let decompressed = self.compressor().decompress(&raw_data)?;
270 crate::metrics::record_decompress(
271 backend_label.as_str(),
272 decompress_timer.elapsed(),
273 );
274
275 let decompressed_bytes = Bytes::from(decompressed);
276
277 let deserialize_timer = Timer::new();
279 let mut deserialized_opt: Option<T::Cached> = None;
280 format.with_deserializer(
281 &decompressed_bytes,
282 &mut |deserializer| {
283 let value: T::Cached = deserializer.deserialize()?;
284 deserialized_opt = Some(value);
285 Ok(())
286 },
287 ctx,
288 )?;
289 crate::metrics::record_deserialize(
290 backend_label.as_str(),
291 deserialize_timer.elapsed(),
292 );
293
294 let deserialized = deserialized_opt.ok_or_else(|| {
295 BackendError::InternalError(Box::new(std::io::Error::other(
296 "deserialization produced no result",
297 )))
298 })?;
299
300 let cached_value = CacheValue::new(deserialized, meta.expire, meta.stale);
301
302 if ctx.read_mode() == ReadMode::Refill {
305 let _ = self.set::<T>(key, &cached_value, ctx).await;
306 }
307
308 ctx.set_status(CacheStatus::Hit);
309 ctx.set_source(ResponseSource::Backend(backend_label));
310 Ok(Some(cached_value))
311 }
312 Ok(None) => Ok(None),
313 Err(e) => {
314 crate::metrics::record_read_error(backend_label.as_str());
315 Err(e)
316 }
317 }
318 }
319 }
320
321 fn set<T>(
326 &self,
327 key: &CacheKey,
328 value: &CacheValue<T::Cached>,
329 ctx: &mut BoxContext,
330 ) -> impl Future<Output = BackendResult<()>> + Send
331 where
332 T: CacheableResponse,
333 T::Cached: Cacheable,
334 {
335 async move {
336 if ctx.read_mode() == ReadMode::Refill {
340 return Ok(());
341 }
342
343 let backend_label = self.label();
344 let format = self.value_format();
345
346 let serialize_timer = Timer::new();
347 let serialized_value = format.serialize(value.data(), &**ctx)?;
348 crate::metrics::record_serialize(backend_label.as_str(), serialize_timer.elapsed());
349
350 let compress_timer = Timer::new();
351 let compressed_value = self.compressor().compress(&serialized_value)?;
352 crate::metrics::record_compress(backend_label.as_str(), compress_timer.elapsed());
353
354 let compressed_len = compressed_value.len();
355
356 let write_timer = Timer::new();
357 let result = self
358 .write(
359 key,
360 CacheValue::new(Bytes::from(compressed_value), value.expire(), value.stale()),
361 )
362 .await;
363 crate::metrics::record_write(backend_label.as_str(), write_timer.elapsed());
364
365 match result {
366 Ok(()) => {
367 crate::metrics::record_write_bytes(backend_label.as_str(), compressed_len);
368 Ok(())
369 }
370 Err(e) => {
371 crate::metrics::record_write_error(backend_label.as_str());
372 Err(e)
373 }
374 }
375 }
376 }
377
378 fn delete(
382 &self,
383 key: &CacheKey,
384 _ctx: &mut BoxContext,
385 ) -> impl Future<Output = BackendResult<DeleteStatus>> + Send {
386 async move { self.remove(key).await }
387 }
388}
389
// The pointer-like wrappers below already implement `Backend` by forwarding
// to the wrapped backend; these empty impls give them the typed
// `CacheBackend` operations using the trait's default method bodies.

/// Typed cache operations for a borrowed backend trait object.
impl CacheBackend for &dyn Backend {}

/// Typed cache operations for a boxed backend trait object.
impl CacheBackend for Box<dyn Backend> {}

/// Typed cache operations for a shared `Arc<dyn Backend + Send>`.
impl CacheBackend for Arc<UnsyncBackend> {}
/// Typed cache operations for a shared `Arc<dyn Backend + Send + Sync>`.
impl CacheBackend for Arc<SyncBackend> {}