use super::LOG_TARGET;
use super::{Error, PAGE_SIZE, Result};
use crate::get_file_backends;
use async_trait::async_trait;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use pingap_core::BackgroundTask;
use pingap_core::Error as ServiceError;
use pingora::cache::key::{CacheHashKey, CompactCacheKey};
use pingora::cache::storage::MissFinishType;
use pingora::cache::storage::{HandleHit, HandleMiss};
use pingora::cache::trace::SpanHandle;
use pingora::cache::{
    CacheKey, CacheMeta, HitHandler, MissHandler, PurgeType, Storage,
};
use std::any::Any;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use tracing::{error, info};
34
35type BinaryMeta = (Vec<u8>, Vec<u8>);
36
37#[derive(Debug, Clone, Default, PartialEq)]
39pub struct CacheObject {
40 pub meta: BinaryMeta,
42 pub body: Bytes,
44}
45
46static MAX_OBJECT_CACHE_SIZE: usize = 10 * 1024 * PAGE_SIZE;
48
49impl CacheObject {
50 pub fn get_weight(&self) -> u16 {
51 let size = self.body.len() + self.meta.0.len() + self.meta.1.len();
52 if size <= PAGE_SIZE {
53 return 1;
54 }
55 if size >= MAX_OBJECT_CACHE_SIZE {
56 return u16::MAX;
57 }
58 (size / PAGE_SIZE) as u16
59 }
60}
61
62const META_SIZE_LENGTH: usize = 8;
63
64impl From<Bytes> for CacheObject {
71 fn from(value: Bytes) -> Self {
72 if value.len() < META_SIZE_LENGTH {
74 return Self::default();
75 }
76 let mut data = value;
77
78 let meta0_size = data.get_u32() as usize;
79 let meta1_size = data.get_u32() as usize;
80
81 let meta0 = data.split_to(meta0_size).to_vec();
82 let meta1 = data.split_to(meta1_size).to_vec();
83
84 Self {
85 meta: (meta0, meta1),
86 body: data,
87 }
88 }
89}
90impl From<CacheObject> for Bytes {
97 fn from(value: CacheObject) -> Self {
98 let meta_size =
99 value.meta.0.len() + value.meta.1.len() + META_SIZE_LENGTH;
100 let mut buf = BytesMut::with_capacity(value.body.len() + meta_size);
101 let meta0_size = value.meta.0.len() as u32;
102 let meta1_size = value.meta.1.len() as u32;
103 buf.put_u32(meta0_size);
104 buf.put_u32(meta1_size);
105 buf.extend(value.meta.0);
106 buf.extend(value.meta.1);
107 buf.extend(value.body.iter());
108
109 buf.into()
110 }
111}
112
113#[derive(Debug)]
114pub struct HttpCacheStats {
115 pub reading: u32,
116 pub writing: u32,
117}
118
119#[derive(Debug, Clone)]
120pub struct HttpCacheClearStats {
121 pub success: i32,
122 pub fail: i32,
123 pub description: String,
124}
125
126#[async_trait]
132pub trait HttpCacheStorage: Sync + Send {
133 async fn get(
136 &self,
137 key: &str,
138 namespace: &[u8],
139 ) -> Result<Option<CacheObject>>;
140
141 async fn put(
143 &self,
144 key: &str,
145 namespace: &[u8],
146 data: CacheObject,
147 ) -> Result<()>;
148
149 async fn remove(
158 &self,
159 _key: &str,
160 _namespace: &[u8],
161 ) -> Result<Option<CacheObject>> {
162 Ok(None)
163 }
164
165 async fn clear(
173 &self,
174 _access_before: std::time::SystemTime,
175 ) -> Result<HttpCacheClearStats> {
176 Ok(HttpCacheClearStats {
177 success: -1,
178 fail: -1,
179 description: "".to_string(),
180 })
181 }
182
183 fn stats(&self) -> Option<HttpCacheStats> {
188 None
189 }
190
191 fn inactive(&self) -> Option<Duration> {
196 None
197 }
198}
199
200async fn do_file_storage_clear(count: u32) -> Result<bool, ServiceError> {
201 let offset = 60;
203 if count % offset != 0 {
204 return Ok(false);
205 }
206
207 let backends = get_file_backends();
208 for backend in backends {
209 let cache = &backend.cache;
210 let Some(inactive_duration) = cache.inactive() else {
211 continue;
212 };
213
214 let Some(access_before) =
215 SystemTime::now().checked_sub(inactive_duration)
216 else {
217 return Ok(false);
218 };
219
220 let Ok(stats) = cache.clear(access_before).await else {
221 return Ok(true);
222 };
223 info!(
224 target: LOG_TARGET,
225 success = stats.success,
226 fail = stats.fail,
227 description = stats.description,
228 );
229 }
230 Ok(true)
231}
232
233struct StorageClearTask {}
234
235#[async_trait]
236impl BackgroundTask for StorageClearTask {
237 async fn execute(&self, count: u32) -> Result<bool, ServiceError> {
238 do_file_storage_clear(count).await?;
239 Ok(true)
240 }
241}
242
243pub fn new_storage_clear_service() -> Option<Box<dyn BackgroundTask>> {
244 Some(Box::new(StorageClearTask {}))
245}
246
247pub struct HttpCache {
248 pub directory: Option<String>,
249 pub cache: Arc<dyn HttpCacheStorage>,
250 pub max_size: u64,
251}
252
253impl HttpCache {
254 #[inline]
255 pub fn stats(&self) -> Option<HttpCacheStats> {
256 self.cache.stats()
257 }
258 pub fn max_size(&self) -> u64 {
259 self.max_size
260 }
261}
262
263pub struct CompleteHit {
265 body: Bytes,
267 done: bool,
269 range_start: usize,
271 range_end: usize,
273}
274
275impl CompleteHit {
276 fn get(&mut self) -> Option<Bytes> {
277 if self.done {
278 None
279 } else {
280 self.done = true;
281 Some(self.body.slice(self.range_start..self.range_end))
282 }
283 }
284
285 fn seek(&mut self, start: usize, end: Option<usize>) -> Result<()> {
286 if start >= self.body.len() {
287 return Err(Error::Invalid {
288 message: format!(
289 "seek start out of range {start} >= {}",
290 self.body.len()
291 ),
292 });
293 }
294 self.range_start = start;
295 if let Some(end) = end {
296 self.range_end = std::cmp::min(self.body.len(), end);
298 }
299 self.done = false;
301 Ok(())
302 }
303}
304
305#[async_trait]
306impl HandleHit for CompleteHit {
307 async fn read_body(&mut self) -> pingora::Result<Option<Bytes>> {
308 Ok(self.get())
309 }
310 async fn finish(
311 self: Box<Self>, _storage: &'static (dyn Storage + Sync),
313 _key: &CacheKey,
314 _trace: &SpanHandle,
315 ) -> pingora::Result<()> {
316 Ok(())
317 }
318
319 fn can_seek(&self) -> bool {
320 true
321 }
322
323 fn seek(
324 &mut self,
325 start: usize,
326 end: Option<usize>,
327 ) -> pingora::Result<()> {
328 self.seek(start, end)?;
329 Ok(())
330 }
331
332 fn as_any(&self) -> &(dyn Any + Send + Sync) {
333 self
334 }
335
336 fn as_any_mut(&mut self) -> &mut (dyn Any + Send + Sync) {
337 self
338 }
339}
340
341pub struct ObjectMissHandler {
343 meta: BinaryMeta,
345 body: BytesMut,
347 key: String,
349 primary_key: String,
351 namespace: Vec<u8>,
353 cache: Arc<dyn HttpCacheStorage>,
355}
356
357#[async_trait]
358impl HandleMiss for ObjectMissHandler {
359 async fn write_body(
360 &mut self,
361 data: bytes::Bytes,
362 _eof: bool,
363 ) -> pingora::Result<()> {
364 self.body.extend(&data);
365 Ok(())
366 }
367
368 async fn finish(self: Box<Self>) -> pingora::Result<MissFinishType> {
369 let size = self.body.len(); info!(
371 target: LOG_TARGET,
372 key = self.key,
373 primary_key = self.primary_key,
374 namespace = std::str::from_utf8(&self.namespace).ok(),
375 size,
376 "put data to cache"
377 );
378 let _ = self
379 .cache
380 .put(
381 &self.key,
382 &self.namespace,
383 CacheObject {
384 meta: self.meta,
385 body: self.body.into(),
386 },
387 )
388 .await?;
389
390 Ok(MissFinishType::Created(size))
391 }
392}
393
394#[async_trait]
395impl Storage for HttpCache {
396 async fn lookup(
397 &'static self,
398 key: &CacheKey,
399 _trace: &SpanHandle,
400 ) -> pingora::Result<Option<(CacheMeta, HitHandler)>> {
401 let namespace = key.namespace();
402 let hash = key.combined();
403 if let Some(obj) = self.cache.get(&hash, namespace).await? {
404 let meta = CacheMeta::deserialize(&obj.meta.0, &obj.meta.1)?;
405 let size = obj.body.len();
406 let hit_handler = CompleteHit {
407 body: obj.body,
408 done: false,
409 range_start: 0,
410 range_end: size,
411 };
412 Ok(Some((meta, Box::new(hit_handler))))
413 } else {
414 Ok(None)
415 }
416 }
417
418 async fn get_miss_handler(
419 &'static self,
420 key: &CacheKey,
421 meta: &CacheMeta,
422 _trace: &SpanHandle,
423 ) -> pingora::Result<MissHandler> {
424 let capacity = 5 * 1024;
426 let size = if let Some(content_length) =
427 meta.headers().get(http::header::CONTENT_LENGTH)
428 {
429 content_length
430 .to_str()
431 .unwrap_or_default()
432 .parse::<usize>()
433 .unwrap_or(capacity)
434 } else {
435 capacity
436 };
437 let hash = key.combined();
438 let meta = meta.serialize()?;
439 let miss_handler = ObjectMissHandler {
440 meta,
441 key: hash,
442 primary_key: key.primary_key_str().unwrap_or_default().to_string(),
443 namespace: key.namespace().to_vec(),
444 cache: self.cache.clone(),
445 body: BytesMut::with_capacity(size),
446 };
447 Ok(Box::new(miss_handler))
448 }
449
450 async fn purge(
451 &'static self,
452 key: &CompactCacheKey,
453 _type: PurgeType,
454 _trace: &SpanHandle,
455 ) -> pingora::Result<bool> {
456 let hash = key.combined();
459 let cache_removed =
461 if let Ok(result) = self.cache.remove(&hash, b"").await {
462 result.is_some()
463 } else {
464 false
465 };
466 Ok(cache_removed)
467 }
468
469 async fn update_meta(
470 &'static self,
471 key: &CacheKey,
472 meta: &CacheMeta,
473 _trace: &SpanHandle,
474 ) -> pingora::Result<bool> {
475 let namespace = key.namespace();
476 let hash = key.combined();
477 if let Some(mut obj) = self.cache.get(&hash, namespace).await? {
478 obj.meta = meta.serialize()?;
479 let _ = self.cache.put(&hash, namespace, obj).await?;
480 Ok(true)
481 } else {
482 Err(Error::Invalid {
483 message: "no meta found".to_string(),
484 }
485 .into())
486 }
487 }
488
489 fn support_streaming_partial_write(&self) -> bool {
490 false
491 }
492
493 fn as_any(&self) -> &(dyn Any + Send + Sync) {
494 self
495 }
496}
497
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tiny::{CacheMode, TinyUfoCache};
    use bytes::{Bytes, BytesMut};
    use pingora::cache::storage::{HitHandler, MissHandler};
    use pretty_assertions::assert_eq;
    use std::sync::Arc;

    /// A complete hit yields the whole body once; a seek re-arms it for the
    /// requested end-exclusive range.
    #[tokio::test]
    async fn test_complete_hit() {
        let content = Bytes::from_static(b"Hello World!");
        let len = content.len();
        let mut handler: HitHandler = Box::new(CompleteHit {
            body: content,
            done: false,
            range_start: 0,
            range_end: len,
        });

        let first = handler.read_body().await.unwrap();
        assert_eq!(true, first.is_some());
        assert_eq!(b"Hello World!", first.unwrap().as_ref());

        handler.seek(1, Some(len - 1)).unwrap();
        let ranged = handler.read_body().await.unwrap();
        assert_eq!(true, ranged.is_some());
        assert_eq!(b"ello World", ranged.unwrap().as_ref());
    }

    /// Written body chunks end up in the backend once the miss finishes.
    #[tokio::test]
    async fn test_object_miss_handler() {
        let storage_key = "key";

        let backend = Arc::new(TinyUfoCache::new(CacheMode::Normal, 10, 10));
        let mut handler: MissHandler = Box::new(ObjectMissHandler {
            meta: (b"Hello".to_vec(), b"World".to_vec()),
            body: BytesMut::new(),
            key: storage_key.to_string(),
            primary_key: "".to_string(),
            namespace: b"".to_vec(),
            cache: backend.clone(),
        });

        handler
            .write_body(Bytes::from_static(b"Hello World!"), true)
            .await
            .unwrap();
        handler.finish().await.unwrap();

        let stored = backend.get(storage_key, b"").await.unwrap().unwrap();
        assert_eq!("Hello World!", std::str::from_utf8(&stored.body).unwrap());
    }

    /// Weight is 1 for small objects, whole pages in between, and the
    /// maximum once the size cap is exceeded.
    #[test]
    fn test_cache_object_get_weight() {
        let with_body = |body: Bytes| CacheObject {
            meta: (b"Hello".to_vec(), b"World".to_vec()),
            body,
        };

        assert_eq!(
            1,
            with_body(Bytes::from_static(b"Hello World!")).get_weight()
        );

        assert_eq!(2, with_body(vec![0; PAGE_SIZE * 2].into()).get_weight());

        assert_eq!(
            u16::MAX,
            with_body(vec![0; MAX_OBJECT_CACHE_SIZE + 1].into()).get_weight()
        );
    }
}