1use async_trait::async_trait;
21use bytes::Bytes;
22use pingora_cache::eviction::EvictionManager;
23use pingora_cache::key::{CacheHashKey, CacheKey, CompactCacheKey};
24use pingora_cache::meta::CacheMeta;
25use pingora_cache::storage::{
26 HandleHit, HandleMiss, HitHandler, MissFinishType, MissHandler, PurgeType, Storage,
27};
28use pingora_cache::trace::SpanHandle;
29use pingora_core::{Error, ErrorType, Result};
30use std::any::Any;
31use std::collections::{HashMap, HashSet};
32use std::path::{Path, PathBuf};
33use std::sync::atomic::{AtomicU64, Ordering};
34use std::sync::Arc;
35use tokio::sync::RwLock;
36use tracing::{debug, error, info, warn};
37
38pub struct DiskCacheStorage {
47 base_path: PathBuf,
48 num_shards: u32,
49 #[allow(dead_code)]
50 max_size_bytes: usize,
51 inflight: Arc<RwLock<HashMap<String, HashSet<u64>>>>,
53 next_temp_id: AtomicU64,
54}
55
56impl DiskCacheStorage {
57 pub fn new(path: &Path, shards: u32, max_size: usize) -> Self {
62 let base = path.to_path_buf();
63
64 for shard in 0..shards {
66 let shard_dir = base.join(format!("shard-{:02}", shard));
67
68 for prefix in 0..=255u8 {
70 let prefix_dir = shard_dir.join(format!("{:02x}", prefix));
71 if let Err(e) = std::fs::create_dir_all(&prefix_dir) {
72 error!(path = %prefix_dir.display(), error = %e, "Failed to create prefix dir");
73 }
74 }
75
76 let tmp_dir = shard_dir.join("tmp");
78 if let Err(e) = std::fs::create_dir_all(&tmp_dir) {
79 error!(path = %tmp_dir.display(), error = %e, "Failed to create tmp dir");
80 } else {
81 Self::clean_orphaned_tmp(&tmp_dir);
82 }
83 }
84
85 info!(
86 path = %base.display(),
87 shards,
88 max_size_mb = max_size / 1024 / 1024,
89 "Disk cache storage initialized"
90 );
91
92 Self {
93 base_path: base,
94 num_shards: shards,
95 max_size_bytes: max_size,
96 inflight: Arc::new(RwLock::new(HashMap::new())),
97 next_temp_id: AtomicU64::new(1),
98 }
99 }
100
101 fn clean_orphaned_tmp(tmp_dir: &Path) {
103 let entries = match std::fs::read_dir(tmp_dir) {
104 Ok(e) => e,
105 Err(_) => return,
106 };
107 let mut cleaned = 0u64;
108 for entry in entries.flatten() {
109 let path = entry.path();
110 if path.extension().and_then(|e| e.to_str()) == Some("tmp") {
111 if let Err(e) = std::fs::remove_file(&path) {
112 warn!(path = %path.display(), error = %e, "Failed to clean orphaned tmp file");
113 } else {
114 cleaned += 1;
115 }
116 }
117 }
118 if cleaned > 0 {
119 info!(dir = %tmp_dir.display(), cleaned, "Cleaned orphaned tmp files");
120 }
121 }
122
123 fn shard_for_key(combined: &str, num_shards: u32) -> u32 {
129 let byte = u8::from_str_radix(&combined[..2], 16).unwrap_or(0);
131 (byte as u32) % num_shards
132 }
133
134 fn prefix_for_key(combined: &str) -> &str {
136 &combined[2..4]
138 }
139
140 fn meta_path(&self, combined: &str) -> PathBuf {
142 let shard = Self::shard_for_key(combined, self.num_shards);
143 let prefix = Self::prefix_for_key(combined);
144 self.base_path
145 .join(format!("shard-{:02}", shard))
146 .join(prefix)
147 .join(format!("{}.meta", combined))
148 }
149
150 fn body_path(&self, combined: &str) -> PathBuf {
152 let shard = Self::shard_for_key(combined, self.num_shards);
153 let prefix = Self::prefix_for_key(combined);
154 self.base_path
155 .join(format!("shard-{:02}", shard))
156 .join(prefix)
157 .join(format!("{}.body", combined))
158 }
159
160 fn tmp_dir_for_key(&self, combined: &str) -> PathBuf {
162 let shard = Self::shard_for_key(combined, self.num_shards);
163 self.base_path
164 .join(format!("shard-{:02}", shard))
165 .join("tmp")
166 }
167}
168
169fn serialize_meta_to_disk(meta: &CacheMeta) -> Result<Vec<u8>> {
177 let (internal, header) = meta.serialize()?;
178 let internal_len = internal.len() as u32;
179 let mut buf = Vec::with_capacity(4 + internal.len() + header.len());
180 buf.extend_from_slice(&internal_len.to_le_bytes());
181 buf.extend_from_slice(&internal);
182 buf.extend_from_slice(&header);
183 Ok(buf)
184}
185
186fn deserialize_meta_from_disk(data: &[u8]) -> Result<CacheMeta> {
188 if data.len() < 4 {
189 return Error::e_explain(ErrorType::FileReadError, "meta file too short");
190 }
191 let internal_len = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
192 if data.len() < 4 + internal_len {
193 return Error::e_explain(ErrorType::FileReadError, "meta file truncated");
194 }
195 let internal = &data[4..4 + internal_len];
196 let header = &data[4 + internal_len..];
197 CacheMeta::deserialize(internal, header)
198}
199
200pub struct DiskHitHandler {
208 body: Vec<u8>,
209 meta_size: usize,
210 done: bool,
211 range_start: usize,
212 range_end: usize,
213}
214
215#[async_trait]
216impl HandleHit for DiskHitHandler {
217 async fn read_body(&mut self) -> Result<Option<Bytes>> {
218 if self.done {
219 return Ok(None);
220 }
221 self.done = true;
222 Ok(Some(Bytes::copy_from_slice(
223 &self.body[self.range_start..self.range_end],
224 )))
225 }
226
227 async fn finish(
228 self: Box<Self>,
229 _storage: &'static (dyn Storage + Sync),
230 _key: &CacheKey,
231 _trace: &SpanHandle,
232 ) -> Result<()> {
233 Ok(())
234 }
235
236 fn can_seek(&self) -> bool {
237 true
238 }
239
240 fn seek(&mut self, start: usize, end: Option<usize>) -> Result<()> {
241 if start >= self.body.len() {
242 return Error::e_explain(
243 ErrorType::InternalError,
244 format!("seek start out of range {} >= {}", start, self.body.len()),
245 );
246 }
247 self.range_start = start;
248 if let Some(end) = end {
249 self.range_end = std::cmp::min(self.body.len(), end);
250 }
251 self.done = false;
252 Ok(())
253 }
254
255 fn get_eviction_weight(&self) -> usize {
256 self.meta_size + self.body.len()
257 }
258
259 fn as_any(&self) -> &(dyn Any + Send + Sync) {
260 self
261 }
262
263 fn as_any_mut(&mut self) -> &mut (dyn Any + Send + Sync) {
264 self
265 }
266}
267
268pub struct DiskMissHandler {
277 body_buffer: Vec<u8>,
278 serialized_meta: Vec<u8>,
279 combined: String,
280 meta_path: PathBuf,
281 body_path: PathBuf,
282 tmp_dir: PathBuf,
283 temp_id: u64,
284 inflight: Arc<RwLock<HashMap<String, HashSet<u64>>>>,
285 finished: bool,
286}
287
288#[async_trait]
289impl HandleMiss for DiskMissHandler {
290 async fn write_body(&mut self, data: Bytes, _eof: bool) -> Result<()> {
291 self.body_buffer.extend_from_slice(&data);
292 Ok(())
293 }
294
295 async fn finish(mut self: Box<Self>) -> Result<MissFinishType> {
296 self.finished = true;
297 let body = std::mem::take(&mut self.body_buffer);
298 let meta = self.serialized_meta.clone();
299 let meta_path = self.meta_path.clone();
300 let body_path = self.body_path.clone();
301 let tmp_dir = self.tmp_dir.clone();
302 let temp_id = self.temp_id;
303
304 let size = meta.len() + body.len();
305
306 tokio::task::spawn_blocking(move || {
308 let tmp_meta = tmp_dir.join(format!("{}.meta.tmp", temp_id));
309 let tmp_body = tmp_dir.join(format!("{}.body.tmp", temp_id));
310
311 if let Err(e) = std::fs::write(&tmp_meta, &meta) {
313 error!(path = %tmp_meta.display(), error = %e, "Failed to write tmp meta");
314 let _ = std::fs::remove_file(&tmp_meta);
315 return Err(Error::explain(
316 ErrorType::WriteError,
317 format!("failed to write meta: {}", e),
318 ));
319 }
320
321 if let Err(e) = std::fs::write(&tmp_body, &body) {
323 error!(path = %tmp_body.display(), error = %e, "Failed to write tmp body");
324 let _ = std::fs::remove_file(&tmp_meta);
325 let _ = std::fs::remove_file(&tmp_body);
326 return Err(Error::explain(
327 ErrorType::WriteError,
328 format!("failed to write body: {}", e),
329 ));
330 }
331
332 if let Err(e) = std::fs::rename(&tmp_meta, &meta_path) {
334 error!(error = %e, "Failed to rename tmp meta to final path");
335 let _ = std::fs::remove_file(&tmp_meta);
336 let _ = std::fs::remove_file(&tmp_body);
337 return Err(Error::explain(
338 ErrorType::WriteError,
339 format!("failed to rename meta: {}", e),
340 ));
341 }
342
343 if let Err(e) = std::fs::rename(&tmp_body, &body_path) {
345 error!(error = %e, "Failed to rename tmp body to final path");
346 let _ = std::fs::remove_file(&meta_path);
348 return Err(Error::explain(
349 ErrorType::WriteError,
350 format!("failed to rename body: {}", e),
351 ));
352 }
353
354 Ok(())
355 })
356 .await
357 .map_err(|e| {
358 Error::explain(
359 ErrorType::InternalError,
360 format!("spawn_blocking join error: {}", e),
361 )
362 })??;
363
364 {
366 let mut inflight = self.inflight.write().await;
367 if let Some(set) = inflight.get_mut(&self.combined) {
368 set.remove(&self.temp_id);
369 if set.is_empty() {
370 inflight.remove(&self.combined);
371 }
372 }
373 }
374
375 debug!(combined = %self.combined, size, "Disk cache entry written");
376 Ok(MissFinishType::Created(size))
377 }
378}
379
380impl Drop for DiskMissHandler {
381 fn drop(&mut self) {
382 if !self.finished {
383 if let Ok(mut inflight) = self.inflight.try_write() {
386 if let Some(set) = inflight.get_mut(&self.combined) {
387 set.remove(&self.temp_id);
388 if set.is_empty() {
389 inflight.remove(&self.combined);
390 }
391 }
392 }
393 }
394 }
395}
396
397#[async_trait]
402impl Storage for DiskCacheStorage {
403 async fn lookup(
404 &'static self,
405 key: &CacheKey,
406 _trace: &SpanHandle,
407 ) -> Result<Option<(CacheMeta, HitHandler)>> {
408 let combined = key.combined();
409 let meta_path = self.meta_path(&combined);
410 let body_path = self.body_path(&combined);
411
412 let result =
413 tokio::task::spawn_blocking(move || -> Result<Option<(CacheMeta, HitHandler)>> {
414 let meta_data = match std::fs::read(&meta_path) {
415 Ok(d) => d,
416 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
417 Err(e) => {
418 debug!(error = %e, "Failed to read cache meta file");
419 return Ok(None);
420 }
421 };
422
423 let body_data = match std::fs::read(&body_path) {
424 Ok(d) => d,
425 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
426 Err(e) => {
427 debug!(error = %e, "Failed to read cache body file");
428 return Ok(None);
429 }
430 };
431
432 let meta = match deserialize_meta_from_disk(&meta_data) {
433 Ok(m) => m,
434 Err(e) => {
435 warn!(error = %e, "Corrupted cache meta, removing entry");
436 let _ = std::fs::remove_file(&meta_path);
437 let _ = std::fs::remove_file(&body_path);
438 return Ok(None);
439 }
440 };
441
442 let body_len = body_data.len();
443 let hit_handler = DiskHitHandler {
444 body: body_data,
445 meta_size: meta_data.len(),
446 done: false,
447 range_start: 0,
448 range_end: body_len,
449 };
450
451 Ok(Some((meta, Box::new(hit_handler) as HitHandler)))
452 })
453 .await
454 .map_err(|e| {
455 Error::explain(
456 ErrorType::InternalError,
457 format!("spawn_blocking join error: {}", e),
458 )
459 })??;
460
461 Ok(result)
462 }
463
464 async fn get_miss_handler(
465 &'static self,
466 key: &CacheKey,
467 meta: &CacheMeta,
468 _trace: &SpanHandle,
469 ) -> Result<MissHandler> {
470 let combined = key.combined();
471 let serialized_meta = serialize_meta_to_disk(meta)?;
472 let meta_path = self.meta_path(&combined);
473 let body_path = self.body_path(&combined);
474 let tmp_dir = self.tmp_dir_for_key(&combined);
475 let temp_id = self.next_temp_id.fetch_add(1, Ordering::Relaxed);
476
477 {
479 let mut inflight = self.inflight.write().await;
480 inflight
481 .entry(combined.clone())
482 .or_default()
483 .insert(temp_id);
484 }
485
486 Ok(Box::new(DiskMissHandler {
487 body_buffer: Vec::new(),
488 serialized_meta,
489 combined,
490 meta_path,
491 body_path,
492 tmp_dir,
493 temp_id,
494 inflight: self.inflight.clone(),
495 finished: false,
496 }))
497 }
498
499 async fn purge(
500 &'static self,
501 key: &CompactCacheKey,
502 _purge_type: PurgeType,
503 _trace: &SpanHandle,
504 ) -> Result<bool> {
505 let combined = key.combined();
506 let meta_path = self.meta_path(&combined);
507 let body_path = self.body_path(&combined);
508
509 let removed = tokio::task::spawn_blocking(move || {
510 let meta_removed = std::fs::remove_file(&meta_path).is_ok();
511 let body_removed = std::fs::remove_file(&body_path).is_ok();
512 meta_removed || body_removed
513 })
514 .await
515 .map_err(|e| {
516 Error::explain(
517 ErrorType::InternalError,
518 format!("spawn_blocking join error: {}", e),
519 )
520 })?;
521
522 {
524 let mut inflight = self.inflight.write().await;
525 inflight.remove(&combined);
526 }
527
528 Ok(removed)
529 }
530
531 async fn update_meta(
532 &'static self,
533 key: &CacheKey,
534 meta: &CacheMeta,
535 _trace: &SpanHandle,
536 ) -> Result<bool> {
537 let combined = key.combined();
538 let serialized = serialize_meta_to_disk(meta)?;
539 let meta_path = self.meta_path(&combined);
540 let tmp_dir = self.tmp_dir_for_key(&combined);
541
542 tokio::task::spawn_blocking(move || {
543 let tmp_path = tmp_dir.join(format!("{}.meta.update.tmp", combined));
545 std::fs::write(&tmp_path, &serialized).map_err(|e| {
546 Error::explain(
547 ErrorType::WriteError,
548 format!("failed to write updated meta: {}", e),
549 )
550 })?;
551 std::fs::rename(&tmp_path, &meta_path).map_err(|e| {
552 let _ = std::fs::remove_file(&tmp_path);
553 Error::explain(
554 ErrorType::WriteError,
555 format!("failed to rename updated meta: {}", e),
556 )
557 })?;
558 Ok(true)
559 })
560 .await
561 .map_err(|e| {
562 Error::explain(
563 ErrorType::InternalError,
564 format!("spawn_blocking join error: {}", e),
565 )
566 })?
567 }
568
569 fn support_streaming_partial_write(&self) -> bool {
570 false
571 }
572
573 fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) {
574 self
575 }
576}
577
578pub async fn rebuild_eviction_state(
587 base_path: &Path,
588 num_shards: u32,
589 eviction: &'static pingora_cache::eviction::simple_lru::Manager,
590) {
591 let base = base_path.to_path_buf();
592 let result = tokio::task::spawn_blocking(move || {
593 let mut count = 0usize;
594 let mut total_size = 0usize;
595
596 for shard in 0..num_shards {
597 let shard_dir = base.join(format!("shard-{:02}", shard));
598
599 for prefix in 0..=255u8 {
600 let prefix_dir = shard_dir.join(format!("{:02x}", prefix));
601 let entries = match std::fs::read_dir(&prefix_dir) {
602 Ok(e) => e,
603 Err(_) => continue,
604 };
605
606 for entry in entries.flatten() {
607 let path = entry.path();
608 let ext = path.extension().and_then(|e| e.to_str());
609 if ext != Some("body") {
610 continue;
611 }
612
613 let stem = match path.file_stem().and_then(|s| s.to_str()) {
615 Some(s) => s.to_string(),
616 None => continue,
617 };
618
619 let body_size = match std::fs::metadata(&path) {
621 Ok(m) => m.len() as usize,
622 Err(_) => continue,
623 };
624
625 let meta_path = prefix_dir.join(format!("{}.meta", stem));
627 let meta_size = std::fs::metadata(&meta_path)
628 .map(|m| m.len() as usize)
629 .unwrap_or(0);
630
631 let size = body_size + meta_size;
632
633 if let Some(primary) = pingora_cache::key::str2hex(&stem) {
635 let compact = CompactCacheKey {
636 primary,
637 variance: None,
638 user_tag: "".into(),
639 };
640
641 let _ = eviction.admit(
644 compact,
645 size,
646 std::time::SystemTime::now() + std::time::Duration::from_secs(3600),
647 );
648
649 count += 1;
650 total_size += size;
651 }
652 }
653 }
654 }
655
656 (count, total_size)
657 })
658 .await;
659
660 match result {
661 Ok((count, total_size)) => {
662 info!(
663 entries = count,
664 total_size_mb = total_size / 1024 / 1024,
665 "Rebuilt disk cache eviction state"
666 );
667 }
668 Err(e) => {
669 error!(error = %e, "Failed to rebuild disk cache eviction state");
670 }
671 }
672}
673
674#[cfg(test)]
679mod tests {
680 use super::*;
681 use once_cell::sync::Lazy;
682 use pingora_cache::trace::Span;
683 use pingora_http::ResponseHeader;
684 use std::time::SystemTime;
685 use tempfile::TempDir;
686
687 fn create_test_meta() -> CacheMeta {
688 let mut header = ResponseHeader::build(200, None).unwrap();
689 header.append_header("content-type", "text/plain").unwrap();
690 header.append_header("x-test", "disk-cache").unwrap();
691 CacheMeta::new(
692 SystemTime::now() + std::time::Duration::from_secs(3600),
693 SystemTime::now(),
694 60,
695 300,
696 header,
697 )
698 }
699
700 fn span() -> SpanHandle {
701 Span::inactive().handle()
702 }
703
704 #[test]
705 fn test_directory_creation() {
706 let tmp = TempDir::new().unwrap();
707 let _storage = DiskCacheStorage::new(tmp.path(), 4, 100 * 1024 * 1024);
708
709 for shard in 0..4u32 {
711 let shard_dir = tmp.path().join(format!("shard-{:02}", shard));
712 assert!(shard_dir.is_dir(), "shard dir should exist");
713
714 assert!(shard_dir.join("00").is_dir());
716 assert!(shard_dir.join("ff").is_dir());
717 assert!(shard_dir.join("a5").is_dir());
718
719 assert!(shard_dir.join("tmp").is_dir());
721 }
722
723 assert!(!tmp.path().join("shard-04").exists());
725 }
726
727 #[test]
728 fn test_path_helpers() {
729 let tmp = TempDir::new().unwrap();
730 let storage = DiskCacheStorage::new(tmp.path(), 16, 100 * 1024 * 1024);
731
732 let combined = "abcd1234567890abcdef1234567890ab";
734
735 let shard = DiskCacheStorage::shard_for_key(combined, 16);
736 assert_eq!(shard, 0xab % 16); let prefix = DiskCacheStorage::prefix_for_key(combined);
739 assert_eq!(prefix, "cd");
740
741 let meta = storage.meta_path(combined);
742 assert!(meta.to_str().unwrap().contains("shard-11"));
743 assert!(meta.to_str().unwrap().contains("/cd/"));
744 assert!(meta.to_str().unwrap().ends_with(".meta"));
745
746 let body = storage.body_path(combined);
747 assert!(body.to_str().unwrap().contains("shard-11"));
748 assert!(body.to_str().unwrap().contains("/cd/"));
749 assert!(body.to_str().unwrap().ends_with(".body"));
750 }
751
752 #[tokio::test]
753 async fn test_write_and_read() {
754 static STORAGE: Lazy<DiskCacheStorage> = Lazy::new(|| {
755 let path = std::env::temp_dir().join("grapsus-disk-cache-test-write-read");
756 let _ = std::fs::remove_dir_all(&path);
757 DiskCacheStorage::new(&path, 4, 100 * 1024 * 1024)
758 });
759 let trace = &span();
760
761 let key = CacheKey::new("", "test-write-read", "1");
762 let meta = create_test_meta();
763
764 let result = STORAGE.lookup(&key, trace).await.unwrap();
766 assert!(result.is_none());
767
768 let mut miss_handler = STORAGE.get_miss_handler(&key, &meta, trace).await.unwrap();
770 miss_handler
771 .write_body(b"hello "[..].into(), false)
772 .await
773 .unwrap();
774 miss_handler
775 .write_body(b"world"[..].into(), true)
776 .await
777 .unwrap();
778 let finish_result = miss_handler.finish().await.unwrap();
779 assert!(matches!(finish_result, MissFinishType::Created(_)));
780
781 let (read_meta, mut hit_handler) = STORAGE.lookup(&key, trace).await.unwrap().unwrap();
783 assert_eq!(read_meta.response_header().status.as_u16(), 200);
784
785 let body = hit_handler.read_body().await.unwrap().unwrap();
786 assert_eq!(body.as_ref(), b"hello world");
787
788 let body2 = hit_handler.read_body().await.unwrap();
790 assert!(body2.is_none());
791
792 let _ = std::fs::remove_dir_all(
794 std::env::temp_dir().join("grapsus-disk-cache-test-write-read"),
795 );
796 }
797
798 #[tokio::test]
799 async fn test_purge() {
800 static STORAGE: Lazy<DiskCacheStorage> = Lazy::new(|| {
801 let path = std::env::temp_dir().join("grapsus-disk-cache-test-purge");
802 let _ = std::fs::remove_dir_all(&path);
803 DiskCacheStorage::new(&path, 4, 100 * 1024 * 1024)
804 });
805 let trace = &span();
806
807 let key = CacheKey::new("", "test-purge", "1");
808 let meta = create_test_meta();
809
810 let mut miss_handler = STORAGE.get_miss_handler(&key, &meta, trace).await.unwrap();
812 miss_handler
813 .write_body(b"purge-me"[..].into(), true)
814 .await
815 .unwrap();
816 miss_handler.finish().await.unwrap();
817
818 assert!(STORAGE.lookup(&key, trace).await.unwrap().is_some());
820
821 let compact = key.to_compact();
823 let purged = STORAGE
824 .purge(&compact, PurgeType::Invalidation, trace)
825 .await
826 .unwrap();
827 assert!(purged);
828
829 assert!(STORAGE.lookup(&key, trace).await.unwrap().is_none());
831
832 let _ =
834 std::fs::remove_dir_all(std::env::temp_dir().join("grapsus-disk-cache-test-purge"));
835 }
836
837 #[tokio::test]
838 async fn test_update_meta() {
839 static STORAGE: Lazy<DiskCacheStorage> = Lazy::new(|| {
840 let path = std::env::temp_dir().join("grapsus-disk-cache-test-update-meta");
841 let _ = std::fs::remove_dir_all(&path);
842 DiskCacheStorage::new(&path, 4, 100 * 1024 * 1024)
843 });
844 let trace = &span();
845
846 let key = CacheKey::new("", "test-update-meta", "1");
847 let meta = create_test_meta();
848
849 let mut miss_handler = STORAGE.get_miss_handler(&key, &meta, trace).await.unwrap();
851 miss_handler
852 .write_body(b"body-data"[..].into(), true)
853 .await
854 .unwrap();
855 miss_handler.finish().await.unwrap();
856
857 let mut new_header = ResponseHeader::build(200, None).unwrap();
859 new_header
860 .append_header("content-type", "application/json")
861 .unwrap();
862 new_header.append_header("x-updated", "true").unwrap();
863 let new_meta = CacheMeta::new(
864 SystemTime::now() + std::time::Duration::from_secs(7200),
865 SystemTime::now(),
866 120,
867 600,
868 new_header,
869 );
870
871 let updated = STORAGE.update_meta(&key, &new_meta, trace).await.unwrap();
873 assert!(updated);
874
875 let (read_meta, _hit) = STORAGE.lookup(&key, trace).await.unwrap().unwrap();
877 let headers = read_meta.response_header().headers.clone();
878 assert_eq!(headers.get("x-updated").unwrap().to_str().unwrap(), "true");
879
880 let _ = std::fs::remove_dir_all(
882 std::env::temp_dir().join("grapsus-disk-cache-test-update-meta"),
883 );
884 }
885
886 #[tokio::test]
887 async fn test_miss_handler_drop() {
888 static STORAGE: Lazy<DiskCacheStorage> = Lazy::new(|| {
889 let path = std::env::temp_dir().join("grapsus-disk-cache-test-miss-drop");
890 let _ = std::fs::remove_dir_all(&path);
891 DiskCacheStorage::new(&path, 4, 100 * 1024 * 1024)
892 });
893 let trace = &span();
894
895 let key = CacheKey::new("", "test-miss-drop", "1");
896 let meta = create_test_meta();
897
898 {
900 let mut miss_handler = STORAGE.get_miss_handler(&key, &meta, trace).await.unwrap();
901 miss_handler
902 .write_body(b"incomplete"[..].into(), false)
903 .await
904 .unwrap();
905 }
907
908 assert!(STORAGE.lookup(&key, trace).await.unwrap().is_none());
910
911 assert!(STORAGE.inflight.read().await.is_empty());
913
914 let _ = std::fs::remove_dir_all(
916 std::env::temp_dir().join("grapsus-disk-cache-test-miss-drop"),
917 );
918 }
919
920 #[tokio::test]
921 async fn test_corrupted_meta() {
922 static STORAGE: Lazy<DiskCacheStorage> = Lazy::new(|| {
923 let path = std::env::temp_dir().join("grapsus-disk-cache-test-corrupted");
924 let _ = std::fs::remove_dir_all(&path);
925 DiskCacheStorage::new(&path, 4, 100 * 1024 * 1024)
926 });
927 let trace = &span();
928
929 let key = CacheKey::new("", "test-corrupted", "1");
930 let combined = key.combined();
931
932 let meta_path = STORAGE.meta_path(&combined);
934 let body_path = STORAGE.body_path(&combined);
935 std::fs::write(&meta_path, b"not-valid-meta-data").unwrap();
936 std::fs::write(&body_path, b"some-body").unwrap();
937
938 let result = STORAGE.lookup(&key, trace).await.unwrap();
940 assert!(result.is_none());
941
942 assert!(!meta_path.exists());
944 assert!(!body_path.exists());
945
946 let _ = std::fs::remove_dir_all(
948 std::env::temp_dir().join("grapsus-disk-cache-test-corrupted"),
949 );
950 }
951
952 #[test]
953 fn test_orphan_cleanup() {
954 let tmp = TempDir::new().unwrap();
955
956 let shard_tmp = tmp.path().join("shard-00").join("tmp");
958 std::fs::create_dir_all(&shard_tmp).unwrap();
959 std::fs::write(shard_tmp.join("orphan1.tmp"), b"data1").unwrap();
960 std::fs::write(shard_tmp.join("orphan2.tmp"), b"data2").unwrap();
961 std::fs::write(shard_tmp.join("keep.txt"), b"keep").unwrap();
963
964 assert!(shard_tmp.join("orphan1.tmp").exists());
965 assert!(shard_tmp.join("orphan2.tmp").exists());
966
967 let _storage = DiskCacheStorage::new(tmp.path(), 4, 100 * 1024 * 1024);
969
970 assert!(!shard_tmp.join("orphan1.tmp").exists());
971 assert!(!shard_tmp.join("orphan2.tmp").exists());
972 assert!(shard_tmp.join("keep.txt").exists());
973 }
974
975 #[test]
976 fn test_meta_serialization_roundtrip() {
977 let meta = create_test_meta();
978 let serialized = serialize_meta_to_disk(&meta).unwrap();
979 let deserialized = deserialize_meta_from_disk(&serialized).unwrap();
980
981 assert_eq!(
982 meta.response_header().status.as_u16(),
983 deserialized.response_header().status.as_u16(),
984 );
985 }
986}