use super::{get_cache_backend, is_cache_backend_init, LOG_CATEGORY};
use super::{Error, Result, PAGE_SIZE};
use async_trait::async_trait;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use pingap_core::Error as ServiceError;
use pingap_core::SimpleServiceTaskFuture;
use pingora::cache::key::{CacheHashKey, CompactCacheKey};
use pingora::cache::storage::MissFinishType;
use pingora::cache::storage::{HandleHit, HandleMiss};
use pingora::cache::trace::SpanHandle;
use pingora::cache::{
    CacheKey, CacheMeta, HitHandler, MissHandler, PurgeType, Storage,
};
use std::any::Any;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tracing::{error, info};
32
/// Serialized cache metadata held as two byte buffers.
///
/// Produced by `CacheMeta::serialize` (in `get_miss_handler` and
/// `update_meta`) and turned back into a `CacheMeta` with
/// `CacheMeta::deserialize` in `lookup`.
type BinaryMeta = (Vec<u8>, Vec<u8>);
34
/// A cached entry: serialized response metadata plus the response body.
///
/// Converted to and from a flat `Bytes` buffer by the `From` impls in this
/// module so storage backends can persist it as a single blob.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct CacheObject {
    /// Serialized `CacheMeta`, as the two buffers returned by
    /// `CacheMeta::serialize`.
    pub meta: BinaryMeta,
    /// The cached response body.
    pub body: Bytes,
}
43
44static MAX_OBJECT_CACHE_SIZE: usize = 10 * 1024 * PAGE_SIZE;
46
47impl CacheObject {
48 pub fn get_weight(&self) -> u16 {
49 let size = self.body.len() + self.meta.0.len() + self.meta.1.len();
50 if size <= PAGE_SIZE {
51 return 1;
52 }
53 if size >= MAX_OBJECT_CACHE_SIZE {
54 return u16::MAX;
55 }
56 (size / PAGE_SIZE) as u16
57 }
58}
59
/// Length in bytes of the serialized `CacheObject` header: two big-endian
/// `u32` values holding the lengths of the two metadata buffers.
const META_SIZE_LENGTH: usize = 8;
61
62impl From<Bytes> for CacheObject {
69 fn from(value: Bytes) -> Self {
70 if value.len() < META_SIZE_LENGTH {
72 return Self::default();
73 }
74 let mut data = value;
75
76 let meta0_size = data.get_u32() as usize;
77 let meta1_size = data.get_u32() as usize;
78
79 let meta0 = data.split_to(meta0_size).to_vec();
80 let meta1 = data.split_to(meta1_size).to_vec();
81
82 Self {
83 meta: (meta0, meta1),
84 body: data,
85 }
86 }
87}
88impl From<CacheObject> for Bytes {
95 fn from(value: CacheObject) -> Self {
96 let meta_size =
97 value.meta.0.len() + value.meta.1.len() + META_SIZE_LENGTH;
98 let mut buf = BytesMut::with_capacity(value.body.len() + meta_size);
99 let meta0_size = value.meta.0.len() as u32;
100 let meta1_size = value.meta.1.len() as u32;
101 buf.put_u32(meta0_size);
102 buf.put_u32(meta1_size);
103 buf.extend(value.meta.0);
104 buf.extend(value.meta.1);
105 buf.extend(value.body.iter());
106
107 buf.into()
108 }
109}
110
/// Counters reported by a storage backend through `HttpCacheStorage::stats`.
#[derive(Debug)]
pub struct HttpCacheStats {
    /// Read-related count reported by the backend (presumably in-flight
    /// reads — TODO confirm against the backend implementations).
    pub reading: u32,
    /// Write-related count reported by the backend (presumably in-flight
    /// writes — TODO confirm against the backend implementations).
    pub writing: u32,
}
116
/// Storage backend used by [`HttpCache`] to persist [`CacheObject`]s.
///
/// Entries are addressed by a key plus a namespace. Only `get` and `put` are
/// required; `remove`, `clear`, `stats` and `support_clear` have no-op
/// defaults for backends that do not support them.
#[async_trait]
pub trait HttpCacheStorage: Sync + Send {
    /// Fetches the object stored under `key` in `namespace`.
    ///
    /// `Ok(None)` is treated by [`HttpCache`] as a cache miss.
    async fn get(
        &self,
        key: &str,
        namespace: &str,
    ) -> Result<Option<CacheObject>>;

    /// Stores `data` under `key` in `namespace`.
    async fn put(
        &self,
        key: &str,
        namespace: &str,
        data: CacheObject,
    ) -> Result<()>;

    /// Removes the entry under `key` in `namespace`, returning the removed
    /// object when there was one.
    ///
    /// The default implementation removes nothing and returns `Ok(None)`.
    async fn remove(
        &self,
        _key: &str,
        _namespace: &str,
    ) -> Result<Option<CacheObject>> {
        Ok(None)
    }

    /// Clears entries relative to the `access_before` cutoff and returns
    /// `(success, fail)` counts.
    ///
    /// NOTE(review): presumably removes entries not accessed since
    /// `access_before`; the periodic clear task passes "now minus 24h".
    /// Confirm per backend.
    ///
    /// The default implementation does nothing and returns `(-1, -1)`. The
    /// clear task treats a negative success count as "not supported".
    async fn clear(
        &self,
        _access_before: std::time::SystemTime,
    ) -> Result<(i32, i32)> {
        Ok((-1, -1))
    }

    /// Returns backend statistics when the backend tracks any.
    /// The default is `None`.
    fn stats(&self) -> Option<HttpCacheStats> {
        None
    }

    /// Whether this backend implements `clear`. The periodic clear service
    /// is only created when this returns `true`. The default is `false`.
    fn support_clear(&self) -> bool {
        false
    }
}
188
189async fn do_file_storage_clear(
190 count: u32,
191 cache: Arc<dyn HttpCacheStorage>,
192) -> Result<bool, ServiceError> {
193 let offset = 60;
195 if count % offset != 0 {
196 return Ok(false);
197 }
198
199 let Some(access_before) =
200 SystemTime::now().checked_sub(Duration::from_secs(24 * 3600))
201 else {
202 return Ok(false);
203 };
204
205 let Ok((success, fail)) = cache.clear(access_before).await else {
206 return Ok(true);
207 };
208 if success < 0 {
209 return Ok(true);
210 }
211 info!(
212 category = LOG_CATEGORY,
213 success, fail, "file cache storage clear"
214 );
215 Ok(true)
216}
217
218pub fn new_storage_clear_service() -> Option<(String, SimpleServiceTaskFuture)>
219{
220 if !is_cache_backend_init() {
222 return None;
223 }
224 let Ok(backend) = get_cache_backend(None) else {
227 return None;
228 };
229 if !backend.cache.support_clear() {
230 return None;
231 }
232 let task: SimpleServiceTaskFuture = Box::new(move |count: u32| {
233 Box::pin({
234 let value = backend.cache.clone();
235 async move {
236 let value = value.clone();
237 do_file_storage_clear(count, value).await
238 }
239 })
240 });
241 Some(("cache_storage_clear".to_string(), task))
242}
243
/// Pingora cache [`Storage`] implementation backed by an [`HttpCacheStorage`].
pub struct HttpCache {
    /// Directory associated with this cache, if any (presumably set for
    /// file-backed storage — TODO confirm with the constructors).
    pub directory: Option<String>,
    /// The storage backend that holds the cached objects.
    pub cache: Arc<dyn HttpCacheStorage>,
}
248
249impl HttpCache {
250 #[inline]
251 pub fn stats(&self) -> Option<HttpCacheStats> {
252 self.cache.stats()
253 }
254}
255
/// Hit handler that serves a cached body held entirely in memory.
pub struct CompleteHit {
    /// The full cached body.
    body: Bytes,
    /// Set once the current range has been returned by `get`.
    done: bool,
    /// Start offset (inclusive) of the range to return.
    range_start: usize,
    /// End offset (exclusive) of the range to return.
    range_end: usize,
}
267
268impl CompleteHit {
269 fn get(&mut self) -> Option<Bytes> {
270 if self.done {
271 None
272 } else {
273 self.done = true;
274 Some(self.body.slice(self.range_start..self.range_end))
275 }
276 }
277
278 fn seek(&mut self, start: usize, end: Option<usize>) -> Result<()> {
279 if start >= self.body.len() {
280 return Err(Error::Invalid {
281 message: format!(
282 "seek start out of range {start} >= {}",
283 self.body.len()
284 ),
285 });
286 }
287 self.range_start = start;
288 if let Some(end) = end {
289 self.range_end = std::cmp::min(self.body.len(), end);
291 }
292 self.done = false;
294 Ok(())
295 }
296}
297
298#[async_trait]
299impl HandleHit for CompleteHit {
300 async fn read_body(&mut self) -> pingora::Result<Option<Bytes>> {
301 Ok(self.get())
302 }
303 async fn finish(
304 self: Box<Self>, _storage: &'static (dyn Storage + Sync),
306 _key: &CacheKey,
307 _trace: &SpanHandle,
308 ) -> pingora::Result<()> {
309 Ok(())
310 }
311
312 fn can_seek(&self) -> bool {
313 true
314 }
315
316 fn seek(
317 &mut self,
318 start: usize,
319 end: Option<usize>,
320 ) -> pingora::Result<()> {
321 self.seek(start, end)?;
322 Ok(())
323 }
324
325 fn as_any(&self) -> &(dyn Any + Send + Sync) {
326 self
327 }
328}
329
/// Miss handler that buffers the whole response body in memory and writes
/// it to the storage backend when `finish` is called.
pub struct ObjectMissHandler {
    /// Serialized cache metadata for the response being stored.
    meta: BinaryMeta,
    /// Body bytes accumulated by `write_body`.
    body: BytesMut,
    /// Storage key (the combined hash of the cache key).
    key: String,
    /// Storage namespace taken from the cache key.
    namespace: String,
    /// Backend the object is written to on `finish`.
    cache: Arc<dyn HttpCacheStorage>,
}
343
344#[async_trait]
345impl HandleMiss for ObjectMissHandler {
346 async fn write_body(
347 &mut self,
348 data: bytes::Bytes,
349 _eof: bool,
350 ) -> pingora::Result<()> {
351 self.body.extend(&data);
352 Ok(())
353 }
354
355 async fn finish(self: Box<Self>) -> pingora::Result<MissFinishType> {
356 let size = self.body.len(); let _ = self
358 .cache
359 .put(
360 &self.key,
361 &self.namespace,
362 CacheObject {
363 meta: self.meta,
364 body: self.body.into(),
365 },
366 )
367 .await?;
368
369 Ok(MissFinishType::Created(size))
370 }
371}
372
373#[async_trait]
374impl Storage for HttpCache {
375 async fn lookup(
376 &'static self,
377 key: &CacheKey,
378 _trace: &SpanHandle,
379 ) -> pingora::Result<Option<(CacheMeta, HitHandler)>> {
380 let namespace = key.namespace();
381 let hash = key.combined();
382 if let Some(obj) = self.cache.get(&hash, namespace).await? {
383 let meta = CacheMeta::deserialize(&obj.meta.0, &obj.meta.1)?;
384 let size = obj.body.len();
385 let hit_handler = CompleteHit {
386 body: obj.body,
387 done: false,
388 range_start: 0,
389 range_end: size,
390 };
391 Ok(Some((meta, Box::new(hit_handler))))
392 } else {
393 Ok(None)
394 }
395 }
396
397 async fn get_miss_handler(
398 &'static self,
399 key: &CacheKey,
400 meta: &CacheMeta,
401 _trace: &SpanHandle,
402 ) -> pingora::Result<MissHandler> {
403 let capacity = 5 * 1024;
405 let size = if let Some(content_length) =
406 meta.headers().get(http::header::CONTENT_LENGTH)
407 {
408 content_length
409 .to_str()
410 .unwrap_or_default()
411 .parse::<usize>()
412 .unwrap_or(capacity)
413 } else {
414 capacity
415 };
416 let hash = key.combined();
417 let meta = meta.serialize()?;
418 let miss_handler = ObjectMissHandler {
419 meta,
420 key: hash,
421 namespace: key.namespace().to_string(),
422 cache: self.cache.clone(),
423 body: BytesMut::with_capacity(size),
424 };
425 Ok(Box::new(miss_handler))
426 }
427
428 async fn purge(
429 &'static self,
430 key: &CompactCacheKey,
431 _type: PurgeType,
432 _trace: &SpanHandle,
433 ) -> pingora::Result<bool> {
434 let hash = key.combined();
437 let cache_removed =
439 if let Ok(result) = self.cache.remove(&hash, "").await {
440 result.is_some()
441 } else {
442 false
443 };
444 Ok(cache_removed)
445 }
446
447 async fn update_meta(
448 &'static self,
449 key: &CacheKey,
450 meta: &CacheMeta,
451 _trace: &SpanHandle,
452 ) -> pingora::Result<bool> {
453 let namespace = key.namespace();
454 let hash = key.combined();
455 if let Some(mut obj) = self.cache.get(&hash, namespace).await? {
456 obj.meta = meta.serialize()?;
457 let _ = self.cache.put(&hash, namespace, obj).await?;
458 Ok(true)
459 } else {
460 Err(Error::Invalid {
461 message: "no meta found".to_string(),
462 }
463 .into())
464 }
465 }
466
467 fn support_streaming_partial_write(&self) -> bool {
468 false
469 }
470
471 fn as_any(&self) -> &(dyn Any + Send + Sync) {
472 self
473 }
474}
475
// Unit tests for `CompleteHit`, `ObjectMissHandler` and
// `CacheObject::get_weight`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tiny::new_tiny_ufo_cache;
    use bytes::{Bytes, BytesMut};
    use pingora::cache::storage::{HitHandler, MissHandler};
    use pretty_assertions::assert_eq;
    use std::sync::Arc;

    // A fresh hit returns the whole body. After a seek, the next read returns
    // only the selected range; `end` is exclusive, so the last byte is dropped.
    #[tokio::test]
    async fn test_complete_hit() {
        let body = Bytes::from_static(b"Hello World!");
        let size = body.len();
        let hit = CompleteHit {
            body,
            done: false,
            range_start: 0,
            range_end: size,
        };
        let mut handle: HitHandler = Box::new(hit);
        let body = handle.read_body().await.unwrap();
        assert_eq!(true, body.is_some());
        assert_eq!(b"Hello World!", body.unwrap().as_ref());

        // Skip the first byte and stop before the last one.
        handle.seek(1, Some(size - 1)).unwrap();
        let body = handle.read_body().await.unwrap();
        assert_eq!(true, body.is_some());
        assert_eq!(b"ello World", body.unwrap().as_ref());
    }

    // Body chunks written through the miss handler are persisted to the
    // storage backend by `finish` under the handler's key and namespace.
    #[tokio::test]
    async fn test_object_miss_handler() {
        let key = "key";

        let cache = Arc::new(new_tiny_ufo_cache("", 10, 10));
        let obj = ObjectMissHandler {
            meta: (b"Hello".to_vec(), b"World".to_vec()),
            body: BytesMut::new(),
            key: key.to_string(),
            namespace: "".to_string(),
            cache: cache.clone(),
        };
        let mut handle: MissHandler = Box::new(obj);

        handle
            .write_body(Bytes::from_static(b"Hello World!"), true)
            .await
            .unwrap();
        handle.finish().await.unwrap();

        // The object must now be readable straight from the backend.
        let data = cache.get(key, "").await.unwrap().unwrap();
        assert_eq!("Hello World!", std::str::from_utf8(&data.body).unwrap());
    }

    // Weight is 1 up to one page, the whole number of pages (rounded down)
    // in between, and `u16::MAX` past the size cap.
    #[test]
    fn test_cache_object_get_weight() {
        let obj = CacheObject {
            meta: (b"Hello".to_vec(), b"World".to_vec()),
            body: Bytes::from_static(b"Hello World!"),
        };
        assert_eq!(1, obj.get_weight());

        // Two pages of body plus 10 bytes of metadata rounds down to 2.
        let obj = CacheObject {
            meta: (b"Hello".to_vec(), b"World".to_vec()),
            body: vec![0; PAGE_SIZE * 2].into(),
        };
        assert_eq!(2, obj.get_weight());

        // Anything above the cap saturates.
        let obj = CacheObject {
            meta: (b"Hello".to_vec(), b"World".to_vec()),
            body: vec![0; MAX_OBJECT_CACHE_SIZE + 1].into(),
        };
        assert_eq!(u16::MAX, obj.get_weight());
    }
}