// grapsus_proxy/hybrid_cache.rs

1//! Hybrid cache storage backend
2//!
3//! Combines memory (hot) and disk (cold) tiers. Lookups check memory first,
4//! falling back to disk with automatic promotion on hit. Writes go to both
5//! tiers so that entries are immediately available in memory and durable on
6//! disk.
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use futures::FutureExt;
11use pingora_cache::key::{CacheHashKey, CacheKey, CompactCacheKey};
12use pingora_cache::meta::CacheMeta;
13use pingora_cache::storage::{
14    HandleHit, HandleMiss, HitHandler, MissFinishType, MissHandler, PurgeType, Storage,
15};
16use pingora_cache::trace::SpanHandle;
17use pingora_cache::MemCache;
18use pingora_core::Result;
19use std::any::Any;
20use std::panic::AssertUnwindSafe;
21use tracing::{debug, warn};
22
23use crate::disk_cache::DiskCacheStorage;
24
25// ============================================================================
26// HybridCacheStorage
27// ============================================================================
28
29/// Two-tier cache: memory for hot entries, disk for cold, with automatic
30/// promotion on disk hits.
pub struct HybridCacheStorage {
    // Hot tier: in-memory cache, checked first on lookup.
    memory: &'static MemCache,
    // Cold tier: on-disk cache, fallback on memory miss; entries found here
    // are promoted back into the memory tier.
    disk: &'static DiskCacheStorage,
}
35
36impl HybridCacheStorage {
37    pub fn new(memory: &'static MemCache, disk: &'static DiskCacheStorage) -> Self {
38        Self { memory, disk }
39    }
40}
41
42#[async_trait]
43impl Storage for HybridCacheStorage {
44    async fn lookup(
45        &'static self,
46        key: &CacheKey,
47        trace: &SpanHandle,
48    ) -> Result<Option<(CacheMeta, HitHandler)>> {
49        // Fast path: check memory first
50        if let Some(hit) = self.memory.lookup(key, trace).await? {
51            debug!("hybrid cache: memory hit");
52            return Ok(Some(hit));
53        }
54
55        // Slow path: check disk
56        let (meta, mut disk_hit) = match self.disk.lookup(key, trace).await? {
57            Some(hit) => hit,
58            None => return Ok(None),
59        };
60        debug!("hybrid cache: disk hit, promoting to memory");
61
62        // Read the full body from the disk hit handler
63        let mut body_parts: Vec<Bytes> = Vec::new();
64        while let Some(chunk) = disk_hit.read_body().await? {
65            body_parts.push(chunk);
66        }
67        let full_body: Bytes = if body_parts.len() == 1 {
68            body_parts.into_iter().next().unwrap()
69        } else {
70            let total: usize = body_parts.iter().map(|b| b.len()).sum();
71            let mut buf = Vec::with_capacity(total);
72            for part in &body_parts {
73                buf.extend_from_slice(part);
74            }
75            Bytes::from(buf)
76        };
77
78        // Serialize meta before spawning (CacheMeta is not Send-safe to move
79        // across spawn boundaries without serialization)
80        let serialized_meta = meta.serialize()?;
81        let promote_body = full_body.clone();
82        let key_clone = key.clone();
83
84        // Spawn background promotion into memory tier
85        let mem = self.memory;
86        tokio::spawn(async move {
87            let promote_meta = match CacheMeta::deserialize(&serialized_meta.0, &serialized_meta.1)
88            {
89                Ok(m) => m,
90                Err(e) => {
91                    warn!(error = %e, "hybrid cache: failed to deserialize meta for promotion");
92                    return;
93                }
94            };
95            let inactive_span = pingora_cache::trace::Span::inactive().handle();
96            match mem
97                .get_miss_handler(&key_clone, &promote_meta, &inactive_span)
98                .await
99            {
100                Ok(mut handler) => {
101                    if let Err(e) = handler.write_body(promote_body, true).await {
102                        warn!(error = %e, "hybrid cache: promotion write_body failed");
103                        return;
104                    }
105                    if let Err(e) = handler.finish().await {
106                        warn!(error = %e, "hybrid cache: promotion finish failed");
107                    }
108                }
109                Err(e) => {
110                    warn!(error = %e, "hybrid cache: promotion get_miss_handler failed");
111                }
112            }
113        });
114
115        // Return a hit handler wrapping the already-read body
116        let handler = HybridHitHandler::new(full_body);
117        Ok(Some((meta, Box::new(handler))))
118    }
119
120    async fn get_miss_handler(
121        &'static self,
122        key: &CacheKey,
123        meta: &CacheMeta,
124        trace: &SpanHandle,
125    ) -> Result<MissHandler> {
126        let mem_handler = self.memory.get_miss_handler(key, meta, trace).await?;
127        let disk_handler = self.disk.get_miss_handler(key, meta, trace).await?;
128
129        Ok(Box::new(HybridMissHandler {
130            mem_handler: Some(mem_handler),
131            disk_handler: Some(disk_handler),
132            finished: false,
133        }))
134    }
135
136    async fn purge(
137        &'static self,
138        key: &CompactCacheKey,
139        purge_type: PurgeType,
140        trace: &SpanHandle,
141    ) -> Result<bool> {
142        match purge_type {
143            PurgeType::Eviction => {
144                // Capacity demotion: remove from memory only, disk copy stays.
145                debug!("hybrid cache: eviction demotion, keeping disk copy");
146                self.memory.purge(key, purge_type, trace).await
147            }
148            PurgeType::Invalidation => {
149                let mem = self.memory.purge(key, purge_type, trace).await?;
150                let disk = self.disk.purge(key, purge_type, trace).await?;
151                Ok(mem || disk)
152            }
153        }
154    }
155
156    async fn update_meta(
157        &'static self,
158        key: &CacheKey,
159        meta: &CacheMeta,
160        trace: &SpanHandle,
161    ) -> Result<bool> {
162        // MemCache::update_meta panics if the key is not in its cache map.
163        // The entry may only exist on disk, so we catch the panic.
164        let mem_updated = match AssertUnwindSafe(self.memory.update_meta(key, meta, trace))
165            .catch_unwind()
166            .await
167        {
168            Ok(Ok(v)) => v,
169            Ok(Err(e)) => {
170                warn!(error = %e, "hybrid cache: memory update_meta error");
171                false
172            }
173            Err(_) => {
174                debug!("hybrid cache: key not in memory tier, skipping memory update_meta");
175                false
176            }
177        };
178
179        let disk_updated = self.disk.update_meta(key, meta, trace).await?;
180        Ok(mem_updated || disk_updated)
181    }
182
183    fn support_streaming_partial_write(&self) -> bool {
184        // Delegate to memory tier which supports streaming partial writes
185        self.memory.support_streaming_partial_write()
186    }
187
188    fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) {
189        self
190    }
191}
192
193// ============================================================================
194// HybridHitHandler — wraps an already-read body from a disk-promoted hit
195// ============================================================================
196
pub struct HybridHitHandler {
    // Complete response body, already read from the disk tier.
    body: Bytes,
    // True once the current range has been returned by read_body().
    done: bool,
    // Current readable byte range [range_start, range_end) within `body`.
    range_start: usize,
    range_end: usize,
}
203
204impl HybridHitHandler {
205    fn new(body: Bytes) -> Self {
206        let len = body.len();
207        Self {
208            body,
209            done: false,
210            range_start: 0,
211            range_end: len,
212        }
213    }
214}
215
216#[async_trait]
217impl HandleHit for HybridHitHandler {
218    async fn read_body(&mut self) -> Result<Option<Bytes>> {
219        if self.done {
220            return Ok(None);
221        }
222        self.done = true;
223        Ok(Some(self.body.slice(self.range_start..self.range_end)))
224    }
225
226    async fn finish(
227        self: Box<Self>,
228        _storage: &'static (dyn Storage + Sync),
229        _key: &CacheKey,
230        _trace: &SpanHandle,
231    ) -> Result<()> {
232        Ok(())
233    }
234
235    fn can_seek(&self) -> bool {
236        true
237    }
238
239    fn seek(&mut self, start: usize, end: Option<usize>) -> Result<()> {
240        if start >= self.body.len() {
241            return pingora_core::Error::e_explain(
242                pingora_core::ErrorType::InternalError,
243                format!("seek start out of range {} >= {}", start, self.body.len()),
244            );
245        }
246        self.range_start = start;
247        if let Some(end) = end {
248            self.range_end = std::cmp::min(self.body.len(), end);
249        }
250        self.done = false;
251        Ok(())
252    }
253
254    fn get_eviction_weight(&self) -> usize {
255        self.body.len()
256    }
257
258    fn as_any(&self) -> &(dyn Any + Send + Sync) {
259        self
260    }
261
262    fn as_any_mut(&mut self) -> &mut (dyn Any + Send + Sync) {
263        self
264    }
265}
266
267// ============================================================================
268// HybridMissHandler — writes to both tiers
269// ============================================================================
270
/// Miss handler that tees every body write into both tiers.
struct HybridMissHandler {
    // In-flight write into the memory tier; taken (set to None) by finish().
    mem_handler: Option<MissHandler>,
    // In-flight write into the disk tier; taken (set to None) by finish().
    disk_handler: Option<MissHandler>,
    // Set by finish(). NOTE(review): never read anywhere (Drop ignores it) —
    // candidate for removal.
    finished: bool,
}
276
277#[async_trait]
278impl HandleMiss for HybridMissHandler {
279    async fn write_body(&mut self, data: Bytes, eof: bool) -> Result<()> {
280        // Bytes::clone is a cheap Arc ref-count bump
281        if let Some(ref mut mem) = self.mem_handler {
282            mem.write_body(data.clone(), eof).await?;
283        }
284        if let Some(ref mut disk) = self.disk_handler {
285            disk.write_body(data, eof).await?;
286        }
287        Ok(())
288    }
289
290    async fn finish(mut self: Box<Self>) -> Result<MissFinishType> {
291        self.finished = true;
292
293        // Finish memory first for immediate availability
294        let mem_size = if let Some(mem) = self.mem_handler.take() {
295            match mem.finish().await {
296                Ok(MissFinishType::Created(s)) => s,
297                Ok(MissFinishType::Appended(s, _)) => s,
298                Err(e) => {
299                    warn!(error = %e, "hybrid cache: memory finish failed");
300                    0
301                }
302            }
303        } else {
304            0
305        };
306
307        // Finish disk for durability; failure is non-fatal
308        if let Some(disk) = self.disk_handler.take() {
309            if let Err(e) = disk.finish().await {
310                warn!(error = %e, "hybrid cache: disk finish failed (non-fatal)");
311            }
312        }
313
314        Ok(MissFinishType::Created(mem_size))
315    }
316
317    fn streaming_write_tag(&self) -> Option<&[u8]> {
318        // Delegate to memory handler for streaming partial write support
319        self.mem_handler
320            .as_ref()
321            .and_then(|h| h.streaming_write_tag())
322    }
323}
324
impl Drop for HybridMissHandler {
    /// Intentionally a no-op: dropping this handler simply drops the inner
    /// tier handlers, whose own Drop impls discard any partial entry.
    fn drop(&mut self) {
        // Inner handlers clean up their own state via their Drop impls
    }
}
330
331// ============================================================================
332// Tests
333// ============================================================================
334
#[cfg(test)]
mod tests {
    use super::*;
    use once_cell::sync::Lazy;
    use pingora_cache::trace::Span;
    use pingora_http::ResponseHeader;
    use std::time::SystemTime;

    /// Builds a 200 response meta, fresh for one hour.
    fn create_test_meta() -> CacheMeta {
        let mut header = ResponseHeader::build(200, None).unwrap();
        header.append_header("content-type", "text/plain").unwrap();
        header.append_header("x-test", "hybrid").unwrap();
        CacheMeta::new(
            SystemTime::now() + std::time::Duration::from_secs(3600),
            SystemTime::now(),
            60,
            300,
            header,
        )
    }

    /// Inactive tracing span for storage calls.
    fn span() -> SpanHandle {
        Span::inactive().handle()
    }

    /// Creates a disk tier rooted in a per-test temp directory (wiped first
    /// so reruns start clean).
    fn test_disk(name: &str) -> DiskCacheStorage {
        let path = std::env::temp_dir().join(format!("grapsus-hybrid-test-{}", name));
        let _ = std::fs::remove_dir_all(&path);
        DiskCacheStorage::new(&path, 2, 50 * 1024 * 1024)
    }

    /// Removes the per-test temp directory created by `test_disk`.
    fn cleanup_disk(name: &str) {
        let path = std::env::temp_dir().join(format!("grapsus-hybrid-test-{}", name));
        let _ = std::fs::remove_dir_all(&path);
    }

    // ---------- test 1: miss then hit ----------

    static HYBRID_1_MEM: Lazy<MemCache> = Lazy::new(MemCache::new);
    static HYBRID_1_DISK: Lazy<DiskCacheStorage> = Lazy::new(|| test_disk("miss-then-hit"));
    static HYBRID_1: Lazy<HybridCacheStorage> =
        Lazy::new(|| HybridCacheStorage::new(&HYBRID_1_MEM, &HYBRID_1_DISK));

    #[tokio::test]
    async fn test_hybrid_miss_then_hit() {
        let trace = &span();
        let key = CacheKey::new("", "hybrid-miss-hit", "1");
        let meta = create_test_meta();

        // Lookup should miss
        assert!(HYBRID_1.lookup(&key, trace).await.unwrap().is_none());

        // Write via miss handler
        let mut handler = HYBRID_1.get_miss_handler(&key, &meta, trace).await.unwrap();
        handler
            .write_body(Bytes::from_static(b"hello hybrid"), true)
            .await
            .unwrap();
        handler.finish().await.unwrap();

        // Lookup should hit
        let (read_meta, mut hit) = HYBRID_1.lookup(&key, trace).await.unwrap().unwrap();
        assert_eq!(read_meta.response_header().status.as_u16(), 200);
        let body = hit.read_body().await.unwrap().unwrap();
        assert_eq!(body.as_ref(), b"hello hybrid");

        // Second read should return None
        assert!(hit.read_body().await.unwrap().is_none());

        cleanup_disk("miss-then-hit");
    }

    // ---------- test 2: disk promotion ----------

    static HYBRID_2_MEM: Lazy<MemCache> = Lazy::new(MemCache::new);
    static HYBRID_2_DISK: Lazy<DiskCacheStorage> = Lazy::new(|| test_disk("disk-promotion"));
    static HYBRID_2: Lazy<HybridCacheStorage> =
        Lazy::new(|| HybridCacheStorage::new(&HYBRID_2_MEM, &HYBRID_2_DISK));

    #[tokio::test]
    async fn test_hybrid_disk_promotion() {
        let trace = &span();
        let key = CacheKey::new("", "hybrid-promote", "1");
        let meta = create_test_meta();

        // Write directly to disk tier only
        let mut disk_handler = HYBRID_2_DISK
            .get_miss_handler(&key, &meta, trace)
            .await
            .unwrap();
        disk_handler
            .write_body(Bytes::from_static(b"cold data"), true)
            .await
            .unwrap();
        disk_handler.finish().await.unwrap();

        // Memory should have nothing
        assert!(HYBRID_2_MEM.lookup(&key, trace).await.unwrap().is_none());

        // Hybrid lookup triggers disk hit + promotion
        let (_meta, mut hit) = HYBRID_2.lookup(&key, trace).await.unwrap().unwrap();
        let body = hit.read_body().await.unwrap().unwrap();
        assert_eq!(body.as_ref(), b"cold data");

        // Give the background promotion task time to complete
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // Now memory should have the entry
        let mem_result = HYBRID_2_MEM.lookup(&key, trace).await.unwrap();
        assert!(mem_result.is_some(), "entry should be promoted to memory");

        cleanup_disk("disk-promotion");
    }

    // ---------- test 3: purge both tiers ----------

    static HYBRID_3_MEM: Lazy<MemCache> = Lazy::new(MemCache::new);
    static HYBRID_3_DISK: Lazy<DiskCacheStorage> = Lazy::new(|| test_disk("purge-both"));
    static HYBRID_3: Lazy<HybridCacheStorage> =
        Lazy::new(|| HybridCacheStorage::new(&HYBRID_3_MEM, &HYBRID_3_DISK));

    #[tokio::test]
    async fn test_hybrid_purge_both_tiers() {
        let trace = &span();
        let key = CacheKey::new("", "hybrid-purge", "1");
        let meta = create_test_meta();

        // Write via hybrid (goes to both tiers)
        let mut handler = HYBRID_3.get_miss_handler(&key, &meta, trace).await.unwrap();
        handler
            .write_body(Bytes::from_static(b"purge me"), true)
            .await
            .unwrap();
        handler.finish().await.unwrap();

        // Verify both tiers have it
        assert!(HYBRID_3_MEM.lookup(&key, trace).await.unwrap().is_some());
        assert!(HYBRID_3_DISK.lookup(&key, trace).await.unwrap().is_some());

        // Purge
        let compact = key.to_compact();
        let purged = HYBRID_3
            .purge(&compact, PurgeType::Invalidation, trace)
            .await
            .unwrap();
        assert!(purged);

        // Both tiers should be empty
        assert!(HYBRID_3_MEM.lookup(&key, trace).await.unwrap().is_none());
        assert!(HYBRID_3_DISK.lookup(&key, trace).await.unwrap().is_none());

        cleanup_disk("purge-both");
    }

    // ---------- test 4: update meta ----------

    static HYBRID_4_MEM: Lazy<MemCache> = Lazy::new(MemCache::new);
    static HYBRID_4_DISK: Lazy<DiskCacheStorage> = Lazy::new(|| test_disk("update-meta"));
    static HYBRID_4: Lazy<HybridCacheStorage> =
        Lazy::new(|| HybridCacheStorage::new(&HYBRID_4_MEM, &HYBRID_4_DISK));

    #[tokio::test]
    async fn test_hybrid_update_meta() {
        let trace = &span();
        let key = CacheKey::new("", "hybrid-update-meta", "1");
        let meta = create_test_meta();

        // Write entry
        let mut handler = HYBRID_4.get_miss_handler(&key, &meta, trace).await.unwrap();
        handler
            .write_body(Bytes::from_static(b"update me"), true)
            .await
            .unwrap();
        handler.finish().await.unwrap();

        // Create updated meta with different header
        let mut new_header = ResponseHeader::build(200, None).unwrap();
        new_header
            .append_header("content-type", "application/json")
            .unwrap();
        new_header.append_header("x-updated", "true").unwrap();
        let new_meta = CacheMeta::new(
            SystemTime::now() + std::time::Duration::from_secs(7200),
            SystemTime::now(),
            120,
            600,
            new_header,
        );

        // Update meta via hybrid
        let updated = HYBRID_4.update_meta(&key, &new_meta, trace).await.unwrap();
        assert!(updated);

        // Verify lookup returns updated headers
        let (read_meta, _hit) = HYBRID_4.lookup(&key, trace).await.unwrap().unwrap();
        let headers = read_meta.response_header().headers.clone();
        assert_eq!(headers.get("x-updated").unwrap().to_str().unwrap(), "true");

        cleanup_disk("update-meta");
    }

    // ---------- test 5: miss handler drop ----------

    static HYBRID_5_MEM: Lazy<MemCache> = Lazy::new(MemCache::new);
    static HYBRID_5_DISK: Lazy<DiskCacheStorage> = Lazy::new(|| test_disk("miss-drop"));
    static HYBRID_5: Lazy<HybridCacheStorage> =
        Lazy::new(|| HybridCacheStorage::new(&HYBRID_5_MEM, &HYBRID_5_DISK));

    #[tokio::test]
    async fn test_hybrid_miss_handler_drop() {
        let trace = &span();
        let key = CacheKey::new("", "hybrid-miss-drop", "1");
        let meta = create_test_meta();

        // Create miss handler, write data, drop without finish
        {
            let mut handler = HYBRID_5.get_miss_handler(&key, &meta, trace).await.unwrap();
            handler
                .write_body(Bytes::from_static(b"incomplete"), false)
                .await
                .unwrap();
            // Drop without calling finish
        }

        // Neither tier should have the entry
        assert!(HYBRID_5_MEM.lookup(&key, trace).await.unwrap().is_none());
        assert!(HYBRID_5_DISK.lookup(&key, trace).await.unwrap().is_none());

        cleanup_disk("miss-drop");
    }

    // ---------- test 6: chunked write ----------

    static HYBRID_6_MEM: Lazy<MemCache> = Lazy::new(MemCache::new);
    static HYBRID_6_DISK: Lazy<DiskCacheStorage> = Lazy::new(|| test_disk("chunked-write"));
    static HYBRID_6: Lazy<HybridCacheStorage> =
        Lazy::new(|| HybridCacheStorage::new(&HYBRID_6_MEM, &HYBRID_6_DISK));

    #[tokio::test]
    async fn test_hybrid_chunked_write() {
        let trace = &span();
        let key = CacheKey::new("", "hybrid-chunked", "1");
        let meta = create_test_meta();

        let mut handler = HYBRID_6.get_miss_handler(&key, &meta, trace).await.unwrap();
        handler
            .write_body(Bytes::from_static(b"chunk1-"), false)
            .await
            .unwrap();
        handler
            .write_body(Bytes::from_static(b"chunk2-"), false)
            .await
            .unwrap();
        handler
            .write_body(Bytes::from_static(b"chunk3"), true)
            .await
            .unwrap();
        handler.finish().await.unwrap();

        let (_meta, mut hit) = HYBRID_6.lookup(&key, trace).await.unwrap().unwrap();
        let body = hit.read_body().await.unwrap().unwrap();
        assert_eq!(body.as_ref(), b"chunk1-chunk2-chunk3");

        cleanup_disk("chunked-write");
    }

    // ---------- test 7: seek ----------

    static HYBRID_7_MEM: Lazy<MemCache> = Lazy::new(MemCache::new);
    static HYBRID_7_DISK: Lazy<DiskCacheStorage> = Lazy::new(|| test_disk("seek"));
    static HYBRID_7: Lazy<HybridCacheStorage> =
        Lazy::new(|| HybridCacheStorage::new(&HYBRID_7_MEM, &HYBRID_7_DISK));

    #[tokio::test]
    async fn test_hybrid_seek() {
        let trace = &span();
        let key = CacheKey::new("", "hybrid-seek", "1");
        let meta = create_test_meta();

        // Write directly to disk so lookup returns HybridHitHandler
        let mut disk_handler = HYBRID_7_DISK
            .get_miss_handler(&key, &meta, trace)
            .await
            .unwrap();
        disk_handler
            .write_body(Bytes::from_static(b"0123456789"), true)
            .await
            .unwrap();
        disk_handler.finish().await.unwrap();

        let (_meta, mut hit) = HYBRID_7.lookup(&key, trace).await.unwrap().unwrap();
        assert!(hit.can_seek());

        // Seek to a range
        hit.seek(3, Some(7)).unwrap();
        let body = hit.read_body().await.unwrap().unwrap();
        assert_eq!(body.as_ref(), b"3456");

        // Seek again
        hit.seek(0, Some(3)).unwrap();
        let body = hit.read_body().await.unwrap().unwrap();
        assert_eq!(body.as_ref(), b"012");

        // Out of range should fail
        assert!(hit.seek(100, None).is_err());

        cleanup_disk("seek");
    }

    // ---------- test 8: eviction demotion ----------

    static HYBRID_8_MEM: Lazy<MemCache> = Lazy::new(MemCache::new);
    static HYBRID_8_DISK: Lazy<DiskCacheStorage> = Lazy::new(|| test_disk("eviction-demotion"));
    static HYBRID_8: Lazy<HybridCacheStorage> =
        Lazy::new(|| HybridCacheStorage::new(&HYBRID_8_MEM, &HYBRID_8_DISK));

    #[tokio::test]
    async fn test_hybrid_eviction_demotion() {
        let trace = &span();
        let key = CacheKey::new("", "hybrid-evict-demote", "1");
        let meta = create_test_meta();

        // Write via hybrid (goes to both tiers)
        let mut handler = HYBRID_8.get_miss_handler(&key, &meta, trace).await.unwrap();
        handler
            .write_body(Bytes::from_static(b"demote me"), true)
            .await
            .unwrap();
        handler.finish().await.unwrap();

        // Verify both tiers have it
        assert!(HYBRID_8_MEM.lookup(&key, trace).await.unwrap().is_some());
        assert!(HYBRID_8_DISK.lookup(&key, trace).await.unwrap().is_some());

        // Eviction purge — should only remove from memory
        let compact = key.to_compact();
        let purged = HYBRID_8
            .purge(&compact, PurgeType::Eviction, trace)
            .await
            .unwrap();
        assert!(purged);

        // Memory should be empty, disk should still have it
        assert!(HYBRID_8_MEM.lookup(&key, trace).await.unwrap().is_none());
        assert!(HYBRID_8_DISK.lookup(&key, trace).await.unwrap().is_some());

        cleanup_disk("eviction-demotion");
    }

    // ---------- test 9: eviction then disk hit ----------

    static HYBRID_9_MEM: Lazy<MemCache> = Lazy::new(MemCache::new);
    static HYBRID_9_DISK: Lazy<DiskCacheStorage> = Lazy::new(|| test_disk("evict-then-hit"));
    static HYBRID_9: Lazy<HybridCacheStorage> =
        Lazy::new(|| HybridCacheStorage::new(&HYBRID_9_MEM, &HYBRID_9_DISK));

    #[tokio::test]
    async fn test_hybrid_eviction_then_disk_hit() {
        let trace = &span();
        let key = CacheKey::new("", "hybrid-evict-hit", "1");
        let meta = create_test_meta();

        // Write via hybrid (goes to both tiers)
        let mut handler = HYBRID_9.get_miss_handler(&key, &meta, trace).await.unwrap();
        handler
            .write_body(Bytes::from_static(b"evict and find"), true)
            .await
            .unwrap();
        handler.finish().await.unwrap();

        // Evict from memory
        let compact = key.to_compact();
        HYBRID_9
            .purge(&compact, PurgeType::Eviction, trace)
            .await
            .unwrap();

        // Memory empty
        assert!(HYBRID_9_MEM.lookup(&key, trace).await.unwrap().is_none());

        // Hybrid lookup should find it on disk and promote
        let (_meta, mut hit) = HYBRID_9.lookup(&key, trace).await.unwrap().unwrap();
        let body = hit.read_body().await.unwrap().unwrap();
        assert_eq!(body.as_ref(), b"evict and find");

        // Give background promotion time to complete
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // Memory should now have the entry again
        let mem_result = HYBRID_9_MEM.lookup(&key, trace).await.unwrap();
        assert!(
            mem_result.is_some(),
            "entry should be re-promoted to memory"
        );

        cleanup_disk("evict-then-hit");
    }

    // ---------- test 10: invalidation clears both ----------

    static HYBRID_10_MEM: Lazy<MemCache> = Lazy::new(MemCache::new);
    static HYBRID_10_DISK: Lazy<DiskCacheStorage> = Lazy::new(|| test_disk("invalidation-both"));
    static HYBRID_10: Lazy<HybridCacheStorage> =
        Lazy::new(|| HybridCacheStorage::new(&HYBRID_10_MEM, &HYBRID_10_DISK));

    #[tokio::test]
    async fn test_hybrid_invalidation_clears_both() {
        let trace = &span();
        let key = CacheKey::new("", "hybrid-invalidate", "1");
        let meta = create_test_meta();

        // Write via hybrid (goes to both tiers)
        let mut handler = HYBRID_10
            .get_miss_handler(&key, &meta, trace)
            .await
            .unwrap();
        handler
            .write_body(Bytes::from_static(b"invalidate me"), true)
            .await
            .unwrap();
        handler.finish().await.unwrap();

        // Verify both tiers have it
        assert!(HYBRID_10_MEM.lookup(&key, trace).await.unwrap().is_some());
        assert!(HYBRID_10_DISK.lookup(&key, trace).await.unwrap().is_some());

        // Invalidation purge — should remove from both tiers
        let compact = key.to_compact();
        let purged = HYBRID_10
            .purge(&compact, PurgeType::Invalidation, trace)
            .await
            .unwrap();
        assert!(purged);

        // Both tiers should be empty
        assert!(HYBRID_10_MEM.lookup(&key, trace).await.unwrap().is_none());
        assert!(HYBRID_10_DISK.lookup(&key, trace).await.unwrap().is_none());

        cleanup_disk("invalidation-both");
    }
}
776}