1use super::{get_cache_backend, is_cache_backend_init, LOG_CATEGORY};
16use super::{Error, Result, PAGE_SIZE};
17use async_trait::async_trait;
18use bytes::{Buf, BufMut, Bytes, BytesMut};
19use pingap_core::Error as ServiceError;
20use pingap_core::SimpleServiceTaskFuture;
21use pingora::cache::key::{CacheHashKey, CompactCacheKey};
22use pingora::cache::storage::MissFinishType;
23use pingora::cache::storage::{HandleHit, HandleMiss};
24use pingora::cache::trace::SpanHandle;
25use pingora::cache::{
26 CacheKey, CacheMeta, HitHandler, MissHandler, PurgeType, Storage,
27};
28use std::any::Any;
29use std::sync::Arc;
30use std::time::{Duration, SystemTime};
31use tracing::info;
32
33type BinaryMeta = (Vec<u8>, Vec<u8>);
34
35#[derive(Debug, Clone, Default, PartialEq)]
37pub struct CacheObject {
38 pub meta: BinaryMeta,
40 pub body: Bytes,
42}
43
44static MAX_OBJECT_CACHE_SIZE: usize = 10 * 1024 * PAGE_SIZE;
46
47impl CacheObject {
48 pub fn get_weight(&self) -> u16 {
49 let size = self.body.len() + self.meta.0.len() + self.meta.1.len();
50 if size <= PAGE_SIZE {
51 return 1;
52 }
53 if size >= MAX_OBJECT_CACHE_SIZE {
54 return u16::MAX;
55 }
56 (size / PAGE_SIZE) as u16
57 }
58}
59
60const META_SIZE_LENGTH: usize = 8;
61
62impl From<Bytes> for CacheObject {
69 fn from(value: Bytes) -> Self {
70 if value.len() < META_SIZE_LENGTH {
72 return Self::default();
73 }
74 let mut data = value;
75
76 let meta0_size = data.get_u32() as usize;
77 let meta1_size = data.get_u32() as usize;
78
79 let meta0 = data.split_to(meta0_size).to_vec();
80 let meta1 = data.split_to(meta1_size).to_vec();
81
82 Self {
83 meta: (meta0, meta1),
84 body: data,
85 }
86 }
87}
88impl From<CacheObject> for Bytes {
95 fn from(value: CacheObject) -> Self {
96 let meta_size =
97 value.meta.0.len() + value.meta.1.len() + META_SIZE_LENGTH;
98 let mut buf = BytesMut::with_capacity(value.body.len() + meta_size);
99 let meta0_size = value.meta.0.len() as u32;
100 let meta1_size = value.meta.1.len() as u32;
101 buf.put_u32(meta0_size);
102 buf.put_u32(meta1_size);
103 buf.extend(value.meta.0);
104 buf.extend(value.meta.1);
105 buf.extend(value.body.iter());
106
107 buf.into()
108 }
109}
110
/// Statistics an [`HttpCacheStorage`] may report through `stats()`.
#[derive(Debug)]
pub struct HttpCacheStats {
    /// NOTE(review): presumably the number of reads currently in progress; confirm against implementations.
    pub reading: u32,
    /// NOTE(review): presumably the number of writes currently in progress; confirm against implementations.
    pub writing: u32,
}
116
/// Backend storage for cached HTTP objects.
///
/// Objects are addressed by a string `key` plus a byte-string `namespace`.
/// `HttpCache` wraps an implementation of this trait to implement pingora's
/// `Storage`.
#[async_trait]
pub trait HttpCacheStorage: Sync + Send {
    /// Fetches the object stored under `key` in `namespace`.
    /// Returns `Ok(None)` when there is no such object.
    async fn get(
        &self,
        key: &str,
        namespace: &[u8],
    ) -> Result<Option<CacheObject>>;

    /// Stores `data` under `key` in `namespace`.
    async fn put(
        &self,
        key: &str,
        namespace: &[u8],
        data: CacheObject,
    ) -> Result<()>;

    /// Removes the object stored under `key` in `namespace`, returning the
    /// removed object when the storage provides it.
    ///
    /// The default implementation removes nothing and returns `Ok(None)`.
    async fn remove(
        &self,
        _key: &str,
        _namespace: &[u8],
    ) -> Result<Option<CacheObject>> {
        Ok(None)
    }

    /// Clears stored entries relative to `access_before` and returns
    /// `(success, fail)` counts.
    ///
    /// NOTE(review): presumably removes entries last accessed before
    /// `access_before`; confirm against implementations.
    /// The default returns `(-1, -1)`. `do_file_storage_clear` does not log
    /// a result whose success count is negative.
    async fn clear(
        &self,
        _access_before: std::time::SystemTime,
    ) -> Result<(i32, i32)> {
        Ok((-1, -1))
    }

    /// Optional reading/writing statistics; the default reports none.
    fn stats(&self) -> Option<HttpCacheStats> {
        None
    }

    /// Inactivity threshold used by the periodic clear task.
    /// Returning `None` (the default) means no clear service is created.
    fn inactive(&self) -> Option<Duration> {
        None
    }
}
186
187async fn do_file_storage_clear(
188 count: u32,
189 cache: Arc<dyn HttpCacheStorage>,
190) -> Result<bool, ServiceError> {
191 let offset = 60;
193 if count % offset != 0 {
194 return Ok(false);
195 }
196
197 let Some(inactive_duration) = cache.inactive() else {
198 return Ok(false);
199 };
200
201 let Some(access_before) = SystemTime::now().checked_sub(inactive_duration)
202 else {
203 return Ok(false);
204 };
205
206 let Ok((success, fail)) = cache.clear(access_before).await else {
207 return Ok(true);
208 };
209 if success < 0 {
210 return Ok(true);
211 }
212 info!(
213 category = LOG_CATEGORY,
214 success, fail, "file cache storage clear"
215 );
216 Ok(true)
217}
218
219pub fn new_storage_clear_service() -> Option<(String, SimpleServiceTaskFuture)>
220{
221 if !is_cache_backend_init() {
223 return None;
224 }
225 let Ok(backend) = get_cache_backend(None) else {
228 return None;
229 };
230 backend.cache.inactive()?;
231 let task: SimpleServiceTaskFuture = Box::new(move |count: u32| {
232 Box::pin({
233 let value = backend.cache.clone();
234 async move {
235 let value = value.clone();
236 do_file_storage_clear(count, value).await
237 }
238 })
239 });
240 Some(("cache_storage_clear".to_string(), task))
241}
242
243pub struct HttpCache {
244 pub directory: Option<String>,
245 pub cache: Arc<dyn HttpCacheStorage>,
246}
247
248impl HttpCache {
249 #[inline]
250 pub fn stats(&self) -> Option<HttpCacheStats> {
251 self.cache.stats()
252 }
253}
254
255pub struct CompleteHit {
257 body: Bytes,
259 done: bool,
261 range_start: usize,
263 range_end: usize,
265}
266
267impl CompleteHit {
268 fn get(&mut self) -> Option<Bytes> {
269 if self.done {
270 None
271 } else {
272 self.done = true;
273 Some(self.body.slice(self.range_start..self.range_end))
274 }
275 }
276
277 fn seek(&mut self, start: usize, end: Option<usize>) -> Result<()> {
278 if start >= self.body.len() {
279 return Err(Error::Invalid {
280 message: format!(
281 "seek start out of range {start} >= {}",
282 self.body.len()
283 ),
284 });
285 }
286 self.range_start = start;
287 if let Some(end) = end {
288 self.range_end = std::cmp::min(self.body.len(), end);
290 }
291 self.done = false;
293 Ok(())
294 }
295}
296
297#[async_trait]
298impl HandleHit for CompleteHit {
299 async fn read_body(&mut self) -> pingora::Result<Option<Bytes>> {
300 Ok(self.get())
301 }
302 async fn finish(
303 self: Box<Self>, _storage: &'static (dyn Storage + Sync),
305 _key: &CacheKey,
306 _trace: &SpanHandle,
307 ) -> pingora::Result<()> {
308 Ok(())
309 }
310
311 fn can_seek(&self) -> bool {
312 true
313 }
314
315 fn seek(
316 &mut self,
317 start: usize,
318 end: Option<usize>,
319 ) -> pingora::Result<()> {
320 self.seek(start, end)?;
321 Ok(())
322 }
323
324 fn as_any(&self) -> &(dyn Any + Send + Sync) {
325 self
326 }
327
328 fn as_any_mut(&mut self) -> &mut (dyn Any + Send + Sync) {
329 self
330 }
331}
332
333pub struct ObjectMissHandler {
335 meta: BinaryMeta,
337 body: BytesMut,
339 key: String,
341 primary_key: String,
343 namespace: Vec<u8>,
345 cache: Arc<dyn HttpCacheStorage>,
347}
348
349#[async_trait]
350impl HandleMiss for ObjectMissHandler {
351 async fn write_body(
352 &mut self,
353 data: bytes::Bytes,
354 _eof: bool,
355 ) -> pingora::Result<()> {
356 self.body.extend(&data);
357 Ok(())
358 }
359
360 async fn finish(self: Box<Self>) -> pingora::Result<MissFinishType> {
361 let size = self.body.len(); info!(
363 category = LOG_CATEGORY,
364 key = self.key,
365 primary_key = self.primary_key,
366 namespace = std::str::from_utf8(&self.namespace).ok(),
367 size,
368 "put data to cache"
369 );
370 let _ = self
371 .cache
372 .put(
373 &self.key,
374 &self.namespace,
375 CacheObject {
376 meta: self.meta,
377 body: self.body.into(),
378 },
379 )
380 .await?;
381
382 Ok(MissFinishType::Created(size))
383 }
384}
385
386#[async_trait]
387impl Storage for HttpCache {
388 async fn lookup(
389 &'static self,
390 key: &CacheKey,
391 _trace: &SpanHandle,
392 ) -> pingora::Result<Option<(CacheMeta, HitHandler)>> {
393 let namespace = key.namespace();
394 let hash = key.combined();
395 if let Some(obj) = self.cache.get(&hash, namespace).await? {
396 let meta = CacheMeta::deserialize(&obj.meta.0, &obj.meta.1)?;
397 let size = obj.body.len();
398 let hit_handler = CompleteHit {
399 body: obj.body,
400 done: false,
401 range_start: 0,
402 range_end: size,
403 };
404 Ok(Some((meta, Box::new(hit_handler))))
405 } else {
406 Ok(None)
407 }
408 }
409
410 async fn get_miss_handler(
411 &'static self,
412 key: &CacheKey,
413 meta: &CacheMeta,
414 _trace: &SpanHandle,
415 ) -> pingora::Result<MissHandler> {
416 let capacity = 5 * 1024;
418 let size = if let Some(content_length) =
419 meta.headers().get(http::header::CONTENT_LENGTH)
420 {
421 content_length
422 .to_str()
423 .unwrap_or_default()
424 .parse::<usize>()
425 .unwrap_or(capacity)
426 } else {
427 capacity
428 };
429 let hash = key.combined();
430 let meta = meta.serialize()?;
431 let miss_handler = ObjectMissHandler {
432 meta,
433 key: hash,
434 primary_key: key.primary_key_str().unwrap_or_default().to_string(),
435 namespace: key.namespace().to_vec(),
436 cache: self.cache.clone(),
437 body: BytesMut::with_capacity(size),
438 };
439 Ok(Box::new(miss_handler))
440 }
441
442 async fn purge(
443 &'static self,
444 key: &CompactCacheKey,
445 _type: PurgeType,
446 _trace: &SpanHandle,
447 ) -> pingora::Result<bool> {
448 let hash = key.combined();
451 let cache_removed =
453 if let Ok(result) = self.cache.remove(&hash, b"").await {
454 result.is_some()
455 } else {
456 false
457 };
458 Ok(cache_removed)
459 }
460
461 async fn update_meta(
462 &'static self,
463 key: &CacheKey,
464 meta: &CacheMeta,
465 _trace: &SpanHandle,
466 ) -> pingora::Result<bool> {
467 let namespace = key.namespace();
468 let hash = key.combined();
469 if let Some(mut obj) = self.cache.get(&hash, namespace).await? {
470 obj.meta = meta.serialize()?;
471 let _ = self.cache.put(&hash, namespace, obj).await?;
472 Ok(true)
473 } else {
474 Err(Error::Invalid {
475 message: "no meta found".to_string(),
476 }
477 .into())
478 }
479 }
480
481 fn support_streaming_partial_write(&self) -> bool {
482 false
483 }
484
485 fn as_any(&self) -> &(dyn Any + Send + Sync) {
486 self
487 }
488}
489
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tiny::new_tiny_ufo_cache;
    use bytes::{Bytes, BytesMut};
    use pingora::cache::storage::{HitHandler, MissHandler};
    use pretty_assertions::assert_eq;
    use std::sync::Arc;

    /// A complete hit yields the whole body, then a sub-range after `seek`.
    #[tokio::test]
    async fn test_complete_hit() {
        let content = Bytes::from_static(b"Hello World!");
        let total = content.len();
        let mut handle: HitHandler = Box::new(CompleteHit {
            body: content,
            done: false,
            range_start: 0,
            range_end: total,
        });

        let first = handle.read_body().await.unwrap();
        assert!(first.is_some());
        assert_eq!(b"Hello World!", first.unwrap().as_ref());

        handle.seek(1, Some(total - 1)).unwrap();
        let ranged = handle.read_body().await.unwrap();
        assert!(ranged.is_some());
        assert_eq!(b"ello World", ranged.unwrap().as_ref());
    }

    /// The miss handler persists the buffered body into the storage.
    #[tokio::test]
    async fn test_object_miss_handler() {
        let key = "key";

        let storage = Arc::new(new_tiny_ufo_cache("", 10, 10));
        let mut handle: MissHandler = Box::new(ObjectMissHandler {
            meta: (b"Hello".to_vec(), b"World".to_vec()),
            body: BytesMut::new(),
            key: key.to_string(),
            primary_key: "".to_string(),
            namespace: b"".to_vec(),
            cache: storage.clone(),
        });

        handle
            .write_body(Bytes::from_static(b"Hello World!"), true)
            .await
            .unwrap();
        handle.finish().await.unwrap();

        let stored = storage.get(key, b"").await.unwrap().unwrap();
        assert_eq!("Hello World!", std::str::from_utf8(&stored.body).unwrap());
    }

    /// Weights: one page -> 1, two pages -> 2, oversized -> u16::MAX.
    #[test]
    fn test_cache_object_get_weight() {
        let meta = (b"Hello".to_vec(), b"World".to_vec());

        let small = CacheObject {
            meta: meta.clone(),
            body: Bytes::from_static(b"Hello World!"),
        };
        assert_eq!(1, small.get_weight());

        let two_pages = CacheObject {
            meta: meta.clone(),
            body: vec![0; PAGE_SIZE * 2].into(),
        };
        assert_eq!(2, two_pages.get_weight());

        let oversized = CacheObject {
            meta,
            body: vec![0; MAX_OBJECT_CACHE_SIZE + 1].into(),
        };
        assert_eq!(u16::MAX, oversized.get_weight());
    }
}