// grapsus_proxy/disk_cache.rs
1//! Disk-based cache storage backend
2//!
3//! Implements Pingora's `Storage` trait using the local filesystem. Each cached
4//! response is stored as a pair of files (`.meta` + `.body`) distributed across
5//! sharded subdirectories to keep per-directory inode counts manageable.
6//!
7//! # Directory layout
8//!
9//! ```text
10//! <base_path>/
11//!   shard-00/
12//!     <2-char-hex-prefix>/
13//!       <combined-hex-hash>.meta
14//!       <combined-hex-hash>.body
15//!     tmp/
16//!   shard-01/
17//!     ...
18//! ```
19
20use async_trait::async_trait;
21use bytes::Bytes;
22use pingora_cache::eviction::EvictionManager;
23use pingora_cache::key::{CacheHashKey, CacheKey, CompactCacheKey};
24use pingora_cache::meta::CacheMeta;
25use pingora_cache::storage::{
26    HandleHit, HandleMiss, HitHandler, MissFinishType, MissHandler, PurgeType, Storage,
27};
28use pingora_cache::trace::SpanHandle;
29use pingora_core::{Error, ErrorType, Result};
30use std::any::Any;
31use std::collections::{HashMap, HashSet};
32use std::path::{Path, PathBuf};
33use std::sync::atomic::{AtomicU64, Ordering};
34use std::sync::Arc;
35use tokio::sync::RwLock;
36use tracing::{debug, error, info, warn};
37
38// ============================================================================
39// DiskCacheStorage
40// ============================================================================
41
/// Disk-based cache storage backend implementing Pingora's `Storage` trait.
///
/// All disk I/O is performed via `tokio::task::spawn_blocking` to avoid
/// blocking the async runtime.
pub struct DiskCacheStorage {
    // Root directory under which the `shard-NN/` subtrees live.
    base_path: PathBuf,
    // Number of `shard-NN` directories entries are distributed across.
    num_shards: u32,
    // Size budget recorded at construction; not read anywhere in this file
    // (hence the dead_code allowance) — eviction weight is tracked elsewhere.
    #[allow(dead_code)]
    max_size_bytes: usize,
    /// Tracks in-flight writes: combined_hash -> set of temp_ids
    inflight: Arc<RwLock<HashMap<String, HashSet<u64>>>>,
    // Monotonic counter used to give each write a unique temp-file name.
    next_temp_id: AtomicU64,
}
55
56impl DiskCacheStorage {
57    /// Create a new `DiskCacheStorage`.
58    ///
59    /// Creates the shard directory structure and cleans up any orphaned `.tmp`
60    /// files left behind by interrupted writes.
61    pub fn new(path: &Path, shards: u32, max_size: usize) -> Self {
62        let base = path.to_path_buf();
63
64        // Create shard dirs, hex-prefix subdirs, and tmp dirs
65        for shard in 0..shards {
66            let shard_dir = base.join(format!("shard-{:02}", shard));
67
68            // Create all 256 hex-prefix subdirs
69            for prefix in 0..=255u8 {
70                let prefix_dir = shard_dir.join(format!("{:02x}", prefix));
71                if let Err(e) = std::fs::create_dir_all(&prefix_dir) {
72                    error!(path = %prefix_dir.display(), error = %e, "Failed to create prefix dir");
73                }
74            }
75
76            // Create tmp dir and clean orphaned files
77            let tmp_dir = shard_dir.join("tmp");
78            if let Err(e) = std::fs::create_dir_all(&tmp_dir) {
79                error!(path = %tmp_dir.display(), error = %e, "Failed to create tmp dir");
80            } else {
81                Self::clean_orphaned_tmp(&tmp_dir);
82            }
83        }
84
85        info!(
86            path = %base.display(),
87            shards,
88            max_size_mb = max_size / 1024 / 1024,
89            "Disk cache storage initialized"
90        );
91
92        Self {
93            base_path: base,
94            num_shards: shards,
95            max_size_bytes: max_size,
96            inflight: Arc::new(RwLock::new(HashMap::new())),
97            next_temp_id: AtomicU64::new(1),
98        }
99    }
100
101    /// Remove orphaned .tmp files from a tmp directory.
102    fn clean_orphaned_tmp(tmp_dir: &Path) {
103        let entries = match std::fs::read_dir(tmp_dir) {
104            Ok(e) => e,
105            Err(_) => return,
106        };
107        let mut cleaned = 0u64;
108        for entry in entries.flatten() {
109            let path = entry.path();
110            if path.extension().and_then(|e| e.to_str()) == Some("tmp") {
111                if let Err(e) = std::fs::remove_file(&path) {
112                    warn!(path = %path.display(), error = %e, "Failed to clean orphaned tmp file");
113                } else {
114                    cleaned += 1;
115                }
116            }
117        }
118        if cleaned > 0 {
119            info!(dir = %tmp_dir.display(), cleaned, "Cleaned orphaned tmp files");
120        }
121    }
122
123    // ========================================================================
124    // Path helpers
125    // ========================================================================
126
127    /// Compute the shard index for a combined hex hash string.
128    fn shard_for_key(combined: &str, num_shards: u32) -> u32 {
129        // Use first two hex chars (one byte) to determine shard
130        let byte = u8::from_str_radix(&combined[..2], 16).unwrap_or(0);
131        (byte as u32) % num_shards
132    }
133
134    /// Compute the 2-char hex prefix subdirectory for a combined hex hash.
135    fn prefix_for_key(combined: &str) -> &str {
136        // Use chars 2..4 (second byte) as prefix subdir
137        &combined[2..4]
138    }
139
140    /// Full path to the `.meta` file for a given combined hex hash.
141    fn meta_path(&self, combined: &str) -> PathBuf {
142        let shard = Self::shard_for_key(combined, self.num_shards);
143        let prefix = Self::prefix_for_key(combined);
144        self.base_path
145            .join(format!("shard-{:02}", shard))
146            .join(prefix)
147            .join(format!("{}.meta", combined))
148    }
149
150    /// Full path to the `.body` file for a given combined hex hash.
151    fn body_path(&self, combined: &str) -> PathBuf {
152        let shard = Self::shard_for_key(combined, self.num_shards);
153        let prefix = Self::prefix_for_key(combined);
154        self.base_path
155            .join(format!("shard-{:02}", shard))
156            .join(prefix)
157            .join(format!("{}.body", combined))
158    }
159
160    /// Path to the tmp directory for a given combined hex hash (shard-local).
161    fn tmp_dir_for_key(&self, combined: &str) -> PathBuf {
162        let shard = Self::shard_for_key(combined, self.num_shards);
163        self.base_path
164            .join(format!("shard-{:02}", shard))
165            .join("tmp")
166    }
167}
168
169// ============================================================================
170// Meta file serialization helpers
171// ============================================================================
172
173/// Serialize CacheMeta to the on-disk format.
174///
175/// Format: `[4 bytes: internal_meta_len as u32 LE][internal_meta bytes][header bytes]`
176fn serialize_meta_to_disk(meta: &CacheMeta) -> Result<Vec<u8>> {
177    let (internal, header) = meta.serialize()?;
178    let internal_len = internal.len() as u32;
179    let mut buf = Vec::with_capacity(4 + internal.len() + header.len());
180    buf.extend_from_slice(&internal_len.to_le_bytes());
181    buf.extend_from_slice(&internal);
182    buf.extend_from_slice(&header);
183    Ok(buf)
184}
185
186/// Deserialize CacheMeta from the on-disk format.
187fn deserialize_meta_from_disk(data: &[u8]) -> Result<CacheMeta> {
188    if data.len() < 4 {
189        return Error::e_explain(ErrorType::FileReadError, "meta file too short");
190    }
191    let internal_len = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
192    if data.len() < 4 + internal_len {
193        return Error::e_explain(ErrorType::FileReadError, "meta file truncated");
194    }
195    let internal = &data[4..4 + internal_len];
196    let header = &data[4 + internal_len..];
197    CacheMeta::deserialize(internal, header)
198}
199
200// ============================================================================
201// DiskHitHandler
202// ============================================================================
203
/// Hit handler for disk cache lookups.
///
/// Loads the full body into memory for seekable access.
pub struct DiskHitHandler {
    // Entire cached body, read eagerly at lookup time.
    body: Vec<u8>,
    // Size of the serialized meta file; counted into the eviction weight.
    meta_size: usize,
    // True once the current range has been handed out by `read_body`.
    done: bool,
    // Start (inclusive) of the byte range to serve.
    range_start: usize,
    // End (exclusive) of the byte range to serve.
    range_end: usize,
}
214
215#[async_trait]
216impl HandleHit for DiskHitHandler {
217    async fn read_body(&mut self) -> Result<Option<Bytes>> {
218        if self.done {
219            return Ok(None);
220        }
221        self.done = true;
222        Ok(Some(Bytes::copy_from_slice(
223            &self.body[self.range_start..self.range_end],
224        )))
225    }
226
227    async fn finish(
228        self: Box<Self>,
229        _storage: &'static (dyn Storage + Sync),
230        _key: &CacheKey,
231        _trace: &SpanHandle,
232    ) -> Result<()> {
233        Ok(())
234    }
235
236    fn can_seek(&self) -> bool {
237        true
238    }
239
240    fn seek(&mut self, start: usize, end: Option<usize>) -> Result<()> {
241        if start >= self.body.len() {
242            return Error::e_explain(
243                ErrorType::InternalError,
244                format!("seek start out of range {} >= {}", start, self.body.len()),
245            );
246        }
247        self.range_start = start;
248        if let Some(end) = end {
249            self.range_end = std::cmp::min(self.body.len(), end);
250        }
251        self.done = false;
252        Ok(())
253    }
254
255    fn get_eviction_weight(&self) -> usize {
256        self.meta_size + self.body.len()
257    }
258
259    fn as_any(&self) -> &(dyn Any + Send + Sync) {
260        self
261    }
262
263    fn as_any_mut(&mut self) -> &mut (dyn Any + Send + Sync) {
264        self
265    }
266}
267
268// ============================================================================
269// DiskMissHandler
270// ============================================================================
271
/// Miss handler for disk cache writes.
///
/// Accumulates the response body in memory, then atomically writes both
/// `.meta` and `.body` files to disk via temp-file + rename.
pub struct DiskMissHandler {
    // Response body accumulated across `write_body` calls.
    body_buffer: Vec<u8>,
    // Meta already serialized to the on-disk format at handler creation.
    serialized_meta: Vec<u8>,
    // Combined hex hash of the cache key (also the on-disk file stem).
    combined: String,
    // Final destination of the `.meta` file.
    meta_path: PathBuf,
    // Final destination of the `.body` file.
    body_path: PathBuf,
    // Shard-local `tmp/` directory used for the temp-file + rename step.
    tmp_dir: PathBuf,
    // Unique id for this write; names the temp files and keys inflight state.
    temp_id: u64,
    // Shared in-flight registry (combined hash -> temp ids) for cleanup.
    inflight: Arc<RwLock<HashMap<String, HashSet<u64>>>>,
    // Set by `finish()`; `Drop` only cleans up inflight state when false.
    finished: bool,
}
287
288#[async_trait]
289impl HandleMiss for DiskMissHandler {
290    async fn write_body(&mut self, data: Bytes, _eof: bool) -> Result<()> {
291        self.body_buffer.extend_from_slice(&data);
292        Ok(())
293    }
294
295    async fn finish(mut self: Box<Self>) -> Result<MissFinishType> {
296        self.finished = true;
297        let body = std::mem::take(&mut self.body_buffer);
298        let meta = self.serialized_meta.clone();
299        let meta_path = self.meta_path.clone();
300        let body_path = self.body_path.clone();
301        let tmp_dir = self.tmp_dir.clone();
302        let temp_id = self.temp_id;
303
304        let size = meta.len() + body.len();
305
306        // Write to disk via spawn_blocking
307        tokio::task::spawn_blocking(move || {
308            let tmp_meta = tmp_dir.join(format!("{}.meta.tmp", temp_id));
309            let tmp_body = tmp_dir.join(format!("{}.body.tmp", temp_id));
310
311            // Write meta temp file
312            if let Err(e) = std::fs::write(&tmp_meta, &meta) {
313                error!(path = %tmp_meta.display(), error = %e, "Failed to write tmp meta");
314                let _ = std::fs::remove_file(&tmp_meta);
315                return Err(Error::explain(
316                    ErrorType::WriteError,
317                    format!("failed to write meta: {}", e),
318                ));
319            }
320
321            // Write body temp file
322            if let Err(e) = std::fs::write(&tmp_body, &body) {
323                error!(path = %tmp_body.display(), error = %e, "Failed to write tmp body");
324                let _ = std::fs::remove_file(&tmp_meta);
325                let _ = std::fs::remove_file(&tmp_body);
326                return Err(Error::explain(
327                    ErrorType::WriteError,
328                    format!("failed to write body: {}", e),
329                ));
330            }
331
332            // Atomic rename meta
333            if let Err(e) = std::fs::rename(&tmp_meta, &meta_path) {
334                error!(error = %e, "Failed to rename tmp meta to final path");
335                let _ = std::fs::remove_file(&tmp_meta);
336                let _ = std::fs::remove_file(&tmp_body);
337                return Err(Error::explain(
338                    ErrorType::WriteError,
339                    format!("failed to rename meta: {}", e),
340                ));
341            }
342
343            // Atomic rename body
344            if let Err(e) = std::fs::rename(&tmp_body, &body_path) {
345                error!(error = %e, "Failed to rename tmp body to final path");
346                // Meta already renamed; remove it to stay consistent
347                let _ = std::fs::remove_file(&meta_path);
348                return Err(Error::explain(
349                    ErrorType::WriteError,
350                    format!("failed to rename body: {}", e),
351                ));
352            }
353
354            Ok(())
355        })
356        .await
357        .map_err(|e| {
358            Error::explain(
359                ErrorType::InternalError,
360                format!("spawn_blocking join error: {}", e),
361            )
362        })??;
363
364        // Remove from inflight tracking
365        {
366            let mut inflight = self.inflight.write().await;
367            if let Some(set) = inflight.get_mut(&self.combined) {
368                set.remove(&self.temp_id);
369                if set.is_empty() {
370                    inflight.remove(&self.combined);
371                }
372            }
373        }
374
375        debug!(combined = %self.combined, size, "Disk cache entry written");
376        Ok(MissFinishType::Created(size))
377    }
378}
379
380impl Drop for DiskMissHandler {
381    fn drop(&mut self) {
382        if !self.finished {
383            // Clean up inflight tracking if finish() was never called.
384            // We can't do async in Drop, so use try_write.
385            if let Ok(mut inflight) = self.inflight.try_write() {
386                if let Some(set) = inflight.get_mut(&self.combined) {
387                    set.remove(&self.temp_id);
388                    if set.is_empty() {
389                        inflight.remove(&self.combined);
390                    }
391                }
392            }
393        }
394    }
395}
396
397// ============================================================================
398// Storage trait implementation
399// ============================================================================
400
401#[async_trait]
402impl Storage for DiskCacheStorage {
403    async fn lookup(
404        &'static self,
405        key: &CacheKey,
406        _trace: &SpanHandle,
407    ) -> Result<Option<(CacheMeta, HitHandler)>> {
408        let combined = key.combined();
409        let meta_path = self.meta_path(&combined);
410        let body_path = self.body_path(&combined);
411
412        let result =
413            tokio::task::spawn_blocking(move || -> Result<Option<(CacheMeta, HitHandler)>> {
414                let meta_data = match std::fs::read(&meta_path) {
415                    Ok(d) => d,
416                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
417                    Err(e) => {
418                        debug!(error = %e, "Failed to read cache meta file");
419                        return Ok(None);
420                    }
421                };
422
423                let body_data = match std::fs::read(&body_path) {
424                    Ok(d) => d,
425                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
426                    Err(e) => {
427                        debug!(error = %e, "Failed to read cache body file");
428                        return Ok(None);
429                    }
430                };
431
432                let meta = match deserialize_meta_from_disk(&meta_data) {
433                    Ok(m) => m,
434                    Err(e) => {
435                        warn!(error = %e, "Corrupted cache meta, removing entry");
436                        let _ = std::fs::remove_file(&meta_path);
437                        let _ = std::fs::remove_file(&body_path);
438                        return Ok(None);
439                    }
440                };
441
442                let body_len = body_data.len();
443                let hit_handler = DiskHitHandler {
444                    body: body_data,
445                    meta_size: meta_data.len(),
446                    done: false,
447                    range_start: 0,
448                    range_end: body_len,
449                };
450
451                Ok(Some((meta, Box::new(hit_handler) as HitHandler)))
452            })
453            .await
454            .map_err(|e| {
455                Error::explain(
456                    ErrorType::InternalError,
457                    format!("spawn_blocking join error: {}", e),
458                )
459            })??;
460
461        Ok(result)
462    }
463
464    async fn get_miss_handler(
465        &'static self,
466        key: &CacheKey,
467        meta: &CacheMeta,
468        _trace: &SpanHandle,
469    ) -> Result<MissHandler> {
470        let combined = key.combined();
471        let serialized_meta = serialize_meta_to_disk(meta)?;
472        let meta_path = self.meta_path(&combined);
473        let body_path = self.body_path(&combined);
474        let tmp_dir = self.tmp_dir_for_key(&combined);
475        let temp_id = self.next_temp_id.fetch_add(1, Ordering::Relaxed);
476
477        // Register in inflight tracking
478        {
479            let mut inflight = self.inflight.write().await;
480            inflight
481                .entry(combined.clone())
482                .or_default()
483                .insert(temp_id);
484        }
485
486        Ok(Box::new(DiskMissHandler {
487            body_buffer: Vec::new(),
488            serialized_meta,
489            combined,
490            meta_path,
491            body_path,
492            tmp_dir,
493            temp_id,
494            inflight: self.inflight.clone(),
495            finished: false,
496        }))
497    }
498
499    async fn purge(
500        &'static self,
501        key: &CompactCacheKey,
502        _purge_type: PurgeType,
503        _trace: &SpanHandle,
504    ) -> Result<bool> {
505        let combined = key.combined();
506        let meta_path = self.meta_path(&combined);
507        let body_path = self.body_path(&combined);
508
509        let removed = tokio::task::spawn_blocking(move || {
510            let meta_removed = std::fs::remove_file(&meta_path).is_ok();
511            let body_removed = std::fs::remove_file(&body_path).is_ok();
512            meta_removed || body_removed
513        })
514        .await
515        .map_err(|e| {
516            Error::explain(
517                ErrorType::InternalError,
518                format!("spawn_blocking join error: {}", e),
519            )
520        })?;
521
522        // Also remove from inflight tracking
523        {
524            let mut inflight = self.inflight.write().await;
525            inflight.remove(&combined);
526        }
527
528        Ok(removed)
529    }
530
531    async fn update_meta(
532        &'static self,
533        key: &CacheKey,
534        meta: &CacheMeta,
535        _trace: &SpanHandle,
536    ) -> Result<bool> {
537        let combined = key.combined();
538        let serialized = serialize_meta_to_disk(meta)?;
539        let meta_path = self.meta_path(&combined);
540        let tmp_dir = self.tmp_dir_for_key(&combined);
541
542        tokio::task::spawn_blocking(move || {
543            // Atomic rewrite: write to tmp, rename over existing
544            let tmp_path = tmp_dir.join(format!("{}.meta.update.tmp", combined));
545            std::fs::write(&tmp_path, &serialized).map_err(|e| {
546                Error::explain(
547                    ErrorType::WriteError,
548                    format!("failed to write updated meta: {}", e),
549                )
550            })?;
551            std::fs::rename(&tmp_path, &meta_path).map_err(|e| {
552                let _ = std::fs::remove_file(&tmp_path);
553                Error::explain(
554                    ErrorType::WriteError,
555                    format!("failed to rename updated meta: {}", e),
556                )
557            })?;
558            Ok(true)
559        })
560        .await
561        .map_err(|e| {
562            Error::explain(
563                ErrorType::InternalError,
564                format!("spawn_blocking join error: {}", e),
565            )
566        })?
567    }
568
569    fn support_streaming_partial_write(&self) -> bool {
570        false
571    }
572
573    fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) {
574        self
575    }
576}
577
578// ============================================================================
579// Eviction state rebuild
580// ============================================================================
581
/// Scan disk entries and register them with the eviction manager.
///
/// This is called at startup to rebuild the LRU eviction state from the
/// files on disk. It walks every `shard-NN/<hex-prefix>/` directory, finds
/// `.body` files, and admits each entry with a weight of body size plus the
/// matching `.meta` file's size (0 if the meta file is missing).
///
/// NOTE(review): the `CompactCacheKey` is reconstructed by treating the file
/// stem (the combined hex hash) as the primary hash with no variance. For
/// entries stored with a variance, this reconstructed key may not equal the
/// key pingora computes at request time — confirm against pingora's
/// `CacheHashKey::combined` semantics.
pub async fn rebuild_eviction_state(
    base_path: &Path,
    num_shards: u32,
    eviction: &'static pingora_cache::eviction::simple_lru::Manager,
) {
    let base = base_path.to_path_buf();
    // The full scan is blocking filesystem work, so run it off the runtime.
    let result = tokio::task::spawn_blocking(move || {
        let mut count = 0usize;
        let mut total_size = 0usize;

        for shard in 0..num_shards {
            let shard_dir = base.join(format!("shard-{:02}", shard));

            for prefix in 0..=255u8 {
                let prefix_dir = shard_dir.join(format!("{:02x}", prefix));
                // Missing/unreadable prefix dirs are skipped silently
                // (e.g. a freshly created cache).
                let entries = match std::fs::read_dir(&prefix_dir) {
                    Ok(e) => e,
                    Err(_) => continue,
                };

                for entry in entries.flatten() {
                    let path = entry.path();
                    let ext = path.extension().and_then(|e| e.to_str());
                    // Only `.body` files drive admission; the matching
                    // `.meta` is located via the shared file stem below.
                    if ext != Some("body") {
                        continue;
                    }

                    // Extract combined hash from filename
                    let stem = match path.file_stem().and_then(|s| s.to_str()) {
                        Some(s) => s.to_string(),
                        None => continue,
                    };

                    // Get file size for weight
                    let body_size = match std::fs::metadata(&path) {
                        Ok(m) => m.len() as usize,
                        Err(_) => continue,
                    };

                    // Also add meta size (0 if the meta file is absent)
                    let meta_path = prefix_dir.join(format!("{}.meta", stem));
                    let meta_size = std::fs::metadata(&meta_path)
                        .map(|m| m.len() as usize)
                        .unwrap_or(0);

                    let size = body_size + meta_size;

                    // Reconstruct CompactCacheKey from the combined hex hash;
                    // files with non-hex stems are skipped.
                    if let Some(primary) = pingora_cache::key::str2hex(&stem) {
                        let compact = CompactCacheKey {
                            primary,
                            variance: None,
                            user_tag: "".into(),
                        };

                        // Admit to eviction manager (use epoch as fresh_until since
                        // we don't know the actual TTL without parsing meta)
                        let _ = eviction.admit(
                            compact,
                            size,
                            std::time::SystemTime::now() + std::time::Duration::from_secs(3600),
                        );

                        count += 1;
                        total_size += size;
                    }
                }
            }
        }

        (count, total_size)
    })
    .await;

    match result {
        Ok((count, total_size)) => {
            info!(
                entries = count,
                total_size_mb = total_size / 1024 / 1024,
                "Rebuilt disk cache eviction state"
            );
        }
        Err(e) => {
            // JoinError: the scan task panicked or was cancelled.
            error!(error = %e, "Failed to rebuild disk cache eviction state");
        }
    }
}
673
674// ============================================================================
675// Tests
676// ============================================================================
677
678#[cfg(test)]
679mod tests {
680    use super::*;
681    use once_cell::sync::Lazy;
682    use pingora_cache::trace::Span;
683    use pingora_http::ResponseHeader;
684    use std::time::SystemTime;
685    use tempfile::TempDir;
686
    /// Build a minimal 200-OK `CacheMeta` fixture that is fresh for an hour.
    fn create_test_meta() -> CacheMeta {
        let mut header = ResponseHeader::build(200, None).unwrap();
        header.append_header("content-type", "text/plain").unwrap();
        header.append_header("x-test", "disk-cache").unwrap();
        // NOTE(review): the two integer args are presumably
        // stale-while-revalidate / stale-if-error seconds — confirm against
        // the pingora_cache `CacheMeta::new` signature.
        CacheMeta::new(
            SystemTime::now() + std::time::Duration::from_secs(3600),
            SystemTime::now(),
            60,
            300,
            header,
        )
    }
699
    /// An inactive tracing span handle to satisfy `Storage` call signatures.
    fn span() -> SpanHandle {
        Span::inactive().handle()
    }
703
704    #[test]
705    fn test_directory_creation() {
706        let tmp = TempDir::new().unwrap();
707        let _storage = DiskCacheStorage::new(tmp.path(), 4, 100 * 1024 * 1024);
708
709        // Verify shard dirs exist
710        for shard in 0..4u32 {
711            let shard_dir = tmp.path().join(format!("shard-{:02}", shard));
712            assert!(shard_dir.is_dir(), "shard dir should exist");
713
714            // Verify some prefix dirs exist
715            assert!(shard_dir.join("00").is_dir());
716            assert!(shard_dir.join("ff").is_dir());
717            assert!(shard_dir.join("a5").is_dir());
718
719            // Verify tmp dir exists
720            assert!(shard_dir.join("tmp").is_dir());
721        }
722
723        // Shard-04 should not exist
724        assert!(!tmp.path().join("shard-04").exists());
725    }
726
727    #[test]
728    fn test_path_helpers() {
729        let tmp = TempDir::new().unwrap();
730        let storage = DiskCacheStorage::new(tmp.path(), 16, 100 * 1024 * 1024);
731
732        // "ab" prefix -> shard = 0xab % 16 = 11, prefix subdir = second byte
733        let combined = "abcd1234567890abcdef1234567890ab";
734
735        let shard = DiskCacheStorage::shard_for_key(combined, 16);
736        assert_eq!(shard, 0xab % 16); // 171 % 16 = 11
737
738        let prefix = DiskCacheStorage::prefix_for_key(combined);
739        assert_eq!(prefix, "cd");
740
741        let meta = storage.meta_path(combined);
742        assert!(meta.to_str().unwrap().contains("shard-11"));
743        assert!(meta.to_str().unwrap().contains("/cd/"));
744        assert!(meta.to_str().unwrap().ends_with(".meta"));
745
746        let body = storage.body_path(combined);
747        assert!(body.to_str().unwrap().contains("shard-11"));
748        assert!(body.to_str().unwrap().contains("/cd/"));
749        assert!(body.to_str().unwrap().ends_with(".body"));
750    }
751
    #[tokio::test]
    async fn test_write_and_read() {
        // `Storage` methods take `&'static self`, so the storage must live
        // in a static; `Lazy` defers construction until first use and the
        // remove_dir_all gives each run a clean slate.
        static STORAGE: Lazy<DiskCacheStorage> = Lazy::new(|| {
            let path = std::env::temp_dir().join("grapsus-disk-cache-test-write-read");
            let _ = std::fs::remove_dir_all(&path);
            DiskCacheStorage::new(&path, 4, 100 * 1024 * 1024)
        });
        let trace = &span();

        let key = CacheKey::new("", "test-write-read", "1");
        let meta = create_test_meta();

        // Lookup should return None initially
        let result = STORAGE.lookup(&key, trace).await.unwrap();
        assert!(result.is_none());

        // Write via miss handler, in two chunks to exercise body buffering
        let mut miss_handler = STORAGE.get_miss_handler(&key, &meta, trace).await.unwrap();
        miss_handler
            .write_body(b"hello "[..].into(), false)
            .await
            .unwrap();
        miss_handler
            .write_body(b"world"[..].into(), true)
            .await
            .unwrap();
        let finish_result = miss_handler.finish().await.unwrap();
        assert!(matches!(finish_result, MissFinishType::Created(_)));

        // Lookup should now return the cached entry
        let (read_meta, mut hit_handler) = STORAGE.lookup(&key, trace).await.unwrap().unwrap();
        assert_eq!(read_meta.response_header().status.as_u16(), 200);

        // The full concatenated body comes back in a single read
        let body = hit_handler.read_body().await.unwrap().unwrap();
        assert_eq!(body.as_ref(), b"hello world");

        // Second read should return None (EOF)
        let body2 = hit_handler.read_body().await.unwrap();
        assert!(body2.is_none());

        // Cleanup
        let _ = std::fs::remove_dir_all(
            std::env::temp_dir().join("grapsus-disk-cache-test-write-read"),
        );
    }
797
    #[tokio::test]
    async fn test_purge() {
        // Static storage: `Storage` methods require `&'static self`.
        static STORAGE: Lazy<DiskCacheStorage> = Lazy::new(|| {
            let path = std::env::temp_dir().join("grapsus-disk-cache-test-purge");
            let _ = std::fs::remove_dir_all(&path);
            DiskCacheStorage::new(&path, 4, 100 * 1024 * 1024)
        });
        let trace = &span();

        let key = CacheKey::new("", "test-purge", "1");
        let meta = create_test_meta();

        // Write an entry
        let mut miss_handler = STORAGE.get_miss_handler(&key, &meta, trace).await.unwrap();
        miss_handler
            .write_body(b"purge-me"[..].into(), true)
            .await
            .unwrap();
        miss_handler.finish().await.unwrap();

        // Verify it's there
        assert!(STORAGE.lookup(&key, trace).await.unwrap().is_some());

        // Purge it via the compact key, as pingora's eviction path would
        let compact = key.to_compact();
        let purged = STORAGE
            .purge(&compact, PurgeType::Invalidation, trace)
            .await
            .unwrap();
        assert!(purged);

        // Verify it's gone
        assert!(STORAGE.lookup(&key, trace).await.unwrap().is_none());

        // Cleanup
        let _ =
            std::fs::remove_dir_all(std::env::temp_dir().join("grapsus-disk-cache-test-purge"));
    }
836
    #[tokio::test]
    async fn test_update_meta() {
        // Static storage: `Storage` methods require `&'static self`.
        static STORAGE: Lazy<DiskCacheStorage> = Lazy::new(|| {
            let path = std::env::temp_dir().join("grapsus-disk-cache-test-update-meta");
            let _ = std::fs::remove_dir_all(&path);
            DiskCacheStorage::new(&path, 4, 100 * 1024 * 1024)
        });
        let trace = &span();

        let key = CacheKey::new("", "test-update-meta", "1");
        let meta = create_test_meta();

        // Write an entry
        let mut miss_handler = STORAGE.get_miss_handler(&key, &meta, trace).await.unwrap();
        miss_handler
            .write_body(b"body-data"[..].into(), true)
            .await
            .unwrap();
        miss_handler.finish().await.unwrap();

        // Create updated meta with different header so the rewrite is
        // observable on the next lookup
        let mut new_header = ResponseHeader::build(200, None).unwrap();
        new_header
            .append_header("content-type", "application/json")
            .unwrap();
        new_header.append_header("x-updated", "true").unwrap();
        let new_meta = CacheMeta::new(
            SystemTime::now() + std::time::Duration::from_secs(7200),
            SystemTime::now(),
            120,
            600,
            new_header,
        );

        // Update meta in place; the body file is untouched
        let updated = STORAGE.update_meta(&key, &new_meta, trace).await.unwrap();
        assert!(updated);

        // Verify updated meta is what lookup now returns
        let (read_meta, _hit) = STORAGE.lookup(&key, trace).await.unwrap().unwrap();
        let headers = read_meta.response_header().headers.clone();
        assert_eq!(headers.get("x-updated").unwrap().to_str().unwrap(), "true");

        // Cleanup
        let _ = std::fs::remove_dir_all(
            std::env::temp_dir().join("grapsus-disk-cache-test-update-meta"),
        );
    }
885
886    #[tokio::test]
887    async fn test_miss_handler_drop() {
888        static STORAGE: Lazy<DiskCacheStorage> = Lazy::new(|| {
889            let path = std::env::temp_dir().join("grapsus-disk-cache-test-miss-drop");
890            let _ = std::fs::remove_dir_all(&path);
891            DiskCacheStorage::new(&path, 4, 100 * 1024 * 1024)
892        });
893        let trace = &span();
894
895        let key = CacheKey::new("", "test-miss-drop", "1");
896        let meta = create_test_meta();
897
898        // Create miss handler and write some data but don't finish
899        {
900            let mut miss_handler = STORAGE.get_miss_handler(&key, &meta, trace).await.unwrap();
901            miss_handler
902                .write_body(b"incomplete"[..].into(), false)
903                .await
904                .unwrap();
905            // Drop without finish
906        }
907
908        // Verify no files were written
909        assert!(STORAGE.lookup(&key, trace).await.unwrap().is_none());
910
911        // Verify inflight tracking was cleaned up
912        assert!(STORAGE.inflight.read().await.is_empty());
913
914        // Cleanup
915        let _ = std::fs::remove_dir_all(
916            std::env::temp_dir().join("grapsus-disk-cache-test-miss-drop"),
917        );
918    }
919
920    #[tokio::test]
921    async fn test_corrupted_meta() {
922        static STORAGE: Lazy<DiskCacheStorage> = Lazy::new(|| {
923            let path = std::env::temp_dir().join("grapsus-disk-cache-test-corrupted");
924            let _ = std::fs::remove_dir_all(&path);
925            DiskCacheStorage::new(&path, 4, 100 * 1024 * 1024)
926        });
927        let trace = &span();
928
929        let key = CacheKey::new("", "test-corrupted", "1");
930        let combined = key.combined();
931
932        // Write garbage to the meta file
933        let meta_path = STORAGE.meta_path(&combined);
934        let body_path = STORAGE.body_path(&combined);
935        std::fs::write(&meta_path, b"not-valid-meta-data").unwrap();
936        std::fs::write(&body_path, b"some-body").unwrap();
937
938        // Lookup should gracefully return None
939        let result = STORAGE.lookup(&key, trace).await.unwrap();
940        assert!(result.is_none());
941
942        // Corrupted files should have been cleaned up
943        assert!(!meta_path.exists());
944        assert!(!body_path.exists());
945
946        // Cleanup
947        let _ = std::fs::remove_dir_all(
948            std::env::temp_dir().join("grapsus-disk-cache-test-corrupted"),
949        );
950    }
951
952    #[test]
953    fn test_orphan_cleanup() {
954        let tmp = TempDir::new().unwrap();
955
956        // Pre-create a shard with tmp dir and orphaned files
957        let shard_tmp = tmp.path().join("shard-00").join("tmp");
958        std::fs::create_dir_all(&shard_tmp).unwrap();
959        std::fs::write(shard_tmp.join("orphan1.tmp"), b"data1").unwrap();
960        std::fs::write(shard_tmp.join("orphan2.tmp"), b"data2").unwrap();
961        // Non-tmp file should be left alone
962        std::fs::write(shard_tmp.join("keep.txt"), b"keep").unwrap();
963
964        assert!(shard_tmp.join("orphan1.tmp").exists());
965        assert!(shard_tmp.join("orphan2.tmp").exists());
966
967        // Creating storage should clean orphaned .tmp files
968        let _storage = DiskCacheStorage::new(tmp.path(), 4, 100 * 1024 * 1024);
969
970        assert!(!shard_tmp.join("orphan1.tmp").exists());
971        assert!(!shard_tmp.join("orphan2.tmp").exists());
972        assert!(shard_tmp.join("keep.txt").exists());
973    }
974
975    #[test]
976    fn test_meta_serialization_roundtrip() {
977        let meta = create_test_meta();
978        let serialized = serialize_meta_to_disk(&meta).unwrap();
979        let deserialized = deserialize_meta_from_disk(&serialized).unwrap();
980
981        assert_eq!(
982            meta.response_header().status.as_u16(),
983            deserialized.response_header().status.as_u16(),
984        );
985    }
986}