1use async_trait::async_trait;
9use bytes::Bytes;
10use futures::FutureExt;
11use pingora_cache::key::{CacheHashKey, CacheKey, CompactCacheKey};
12use pingora_cache::meta::CacheMeta;
13use pingora_cache::storage::{
14 HandleHit, HandleMiss, HitHandler, MissFinishType, MissHandler, PurgeType, Storage,
15};
16use pingora_cache::trace::SpanHandle;
17use pingora_cache::MemCache;
18use pingora_core::Result;
19use std::any::Any;
20use std::panic::AssertUnwindSafe;
21use tracing::{debug, warn};
22
23use crate::disk_cache::DiskCacheStorage;
24
25pub struct HybridCacheStorage {
32 memory: &'static MemCache,
33 disk: &'static DiskCacheStorage,
34}
35
36impl HybridCacheStorage {
37 pub fn new(memory: &'static MemCache, disk: &'static DiskCacheStorage) -> Self {
38 Self { memory, disk }
39 }
40}
41
42#[async_trait]
43impl Storage for HybridCacheStorage {
44 async fn lookup(
45 &'static self,
46 key: &CacheKey,
47 trace: &SpanHandle,
48 ) -> Result<Option<(CacheMeta, HitHandler)>> {
49 if let Some(hit) = self.memory.lookup(key, trace).await? {
51 debug!("hybrid cache: memory hit");
52 return Ok(Some(hit));
53 }
54
55 let (meta, mut disk_hit) = match self.disk.lookup(key, trace).await? {
57 Some(hit) => hit,
58 None => return Ok(None),
59 };
60 debug!("hybrid cache: disk hit, promoting to memory");
61
62 let mut body_parts: Vec<Bytes> = Vec::new();
64 while let Some(chunk) = disk_hit.read_body().await? {
65 body_parts.push(chunk);
66 }
67 let full_body: Bytes = if body_parts.len() == 1 {
68 body_parts.into_iter().next().unwrap()
69 } else {
70 let total: usize = body_parts.iter().map(|b| b.len()).sum();
71 let mut buf = Vec::with_capacity(total);
72 for part in &body_parts {
73 buf.extend_from_slice(part);
74 }
75 Bytes::from(buf)
76 };
77
78 let serialized_meta = meta.serialize()?;
81 let promote_body = full_body.clone();
82 let key_clone = key.clone();
83
84 let mem = self.memory;
86 tokio::spawn(async move {
87 let promote_meta = match CacheMeta::deserialize(&serialized_meta.0, &serialized_meta.1)
88 {
89 Ok(m) => m,
90 Err(e) => {
91 warn!(error = %e, "hybrid cache: failed to deserialize meta for promotion");
92 return;
93 }
94 };
95 let inactive_span = pingora_cache::trace::Span::inactive().handle();
96 match mem
97 .get_miss_handler(&key_clone, &promote_meta, &inactive_span)
98 .await
99 {
100 Ok(mut handler) => {
101 if let Err(e) = handler.write_body(promote_body, true).await {
102 warn!(error = %e, "hybrid cache: promotion write_body failed");
103 return;
104 }
105 if let Err(e) = handler.finish().await {
106 warn!(error = %e, "hybrid cache: promotion finish failed");
107 }
108 }
109 Err(e) => {
110 warn!(error = %e, "hybrid cache: promotion get_miss_handler failed");
111 }
112 }
113 });
114
115 let handler = HybridHitHandler::new(full_body);
117 Ok(Some((meta, Box::new(handler))))
118 }
119
120 async fn get_miss_handler(
121 &'static self,
122 key: &CacheKey,
123 meta: &CacheMeta,
124 trace: &SpanHandle,
125 ) -> Result<MissHandler> {
126 let mem_handler = self.memory.get_miss_handler(key, meta, trace).await?;
127 let disk_handler = self.disk.get_miss_handler(key, meta, trace).await?;
128
129 Ok(Box::new(HybridMissHandler {
130 mem_handler: Some(mem_handler),
131 disk_handler: Some(disk_handler),
132 finished: false,
133 }))
134 }
135
136 async fn purge(
137 &'static self,
138 key: &CompactCacheKey,
139 purge_type: PurgeType,
140 trace: &SpanHandle,
141 ) -> Result<bool> {
142 match purge_type {
143 PurgeType::Eviction => {
144 debug!("hybrid cache: eviction demotion, keeping disk copy");
146 self.memory.purge(key, purge_type, trace).await
147 }
148 PurgeType::Invalidation => {
149 let mem = self.memory.purge(key, purge_type, trace).await?;
150 let disk = self.disk.purge(key, purge_type, trace).await?;
151 Ok(mem || disk)
152 }
153 }
154 }
155
156 async fn update_meta(
157 &'static self,
158 key: &CacheKey,
159 meta: &CacheMeta,
160 trace: &SpanHandle,
161 ) -> Result<bool> {
162 let mem_updated = match AssertUnwindSafe(self.memory.update_meta(key, meta, trace))
165 .catch_unwind()
166 .await
167 {
168 Ok(Ok(v)) => v,
169 Ok(Err(e)) => {
170 warn!(error = %e, "hybrid cache: memory update_meta error");
171 false
172 }
173 Err(_) => {
174 debug!("hybrid cache: key not in memory tier, skipping memory update_meta");
175 false
176 }
177 };
178
179 let disk_updated = self.disk.update_meta(key, meta, trace).await?;
180 Ok(mem_updated || disk_updated)
181 }
182
183 fn support_streaming_partial_write(&self) -> bool {
184 self.memory.support_streaming_partial_write()
186 }
187
188 fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) {
189 self
190 }
191}
192
193pub struct HybridHitHandler {
198 body: Bytes,
199 done: bool,
200 range_start: usize,
201 range_end: usize,
202}
203
204impl HybridHitHandler {
205 fn new(body: Bytes) -> Self {
206 let len = body.len();
207 Self {
208 body,
209 done: false,
210 range_start: 0,
211 range_end: len,
212 }
213 }
214}
215
216#[async_trait]
217impl HandleHit for HybridHitHandler {
218 async fn read_body(&mut self) -> Result<Option<Bytes>> {
219 if self.done {
220 return Ok(None);
221 }
222 self.done = true;
223 Ok(Some(self.body.slice(self.range_start..self.range_end)))
224 }
225
226 async fn finish(
227 self: Box<Self>,
228 _storage: &'static (dyn Storage + Sync),
229 _key: &CacheKey,
230 _trace: &SpanHandle,
231 ) -> Result<()> {
232 Ok(())
233 }
234
235 fn can_seek(&self) -> bool {
236 true
237 }
238
239 fn seek(&mut self, start: usize, end: Option<usize>) -> Result<()> {
240 if start >= self.body.len() {
241 return pingora_core::Error::e_explain(
242 pingora_core::ErrorType::InternalError,
243 format!("seek start out of range {} >= {}", start, self.body.len()),
244 );
245 }
246 self.range_start = start;
247 if let Some(end) = end {
248 self.range_end = std::cmp::min(self.body.len(), end);
249 }
250 self.done = false;
251 Ok(())
252 }
253
254 fn get_eviction_weight(&self) -> usize {
255 self.body.len()
256 }
257
258 fn as_any(&self) -> &(dyn Any + Send + Sync) {
259 self
260 }
261
262 fn as_any_mut(&mut self) -> &mut (dyn Any + Send + Sync) {
263 self
264 }
265}
266
267struct HybridMissHandler {
272 mem_handler: Option<MissHandler>,
273 disk_handler: Option<MissHandler>,
274 finished: bool,
275}
276
277#[async_trait]
278impl HandleMiss for HybridMissHandler {
279 async fn write_body(&mut self, data: Bytes, eof: bool) -> Result<()> {
280 if let Some(ref mut mem) = self.mem_handler {
282 mem.write_body(data.clone(), eof).await?;
283 }
284 if let Some(ref mut disk) = self.disk_handler {
285 disk.write_body(data, eof).await?;
286 }
287 Ok(())
288 }
289
290 async fn finish(mut self: Box<Self>) -> Result<MissFinishType> {
291 self.finished = true;
292
293 let mem_size = if let Some(mem) = self.mem_handler.take() {
295 match mem.finish().await {
296 Ok(MissFinishType::Created(s)) => s,
297 Ok(MissFinishType::Appended(s, _)) => s,
298 Err(e) => {
299 warn!(error = %e, "hybrid cache: memory finish failed");
300 0
301 }
302 }
303 } else {
304 0
305 };
306
307 if let Some(disk) = self.disk_handler.take() {
309 if let Err(e) = disk.finish().await {
310 warn!(error = %e, "hybrid cache: disk finish failed (non-fatal)");
311 }
312 }
313
314 Ok(MissFinishType::Created(mem_size))
315 }
316
317 fn streaming_write_tag(&self) -> Option<&[u8]> {
318 self.mem_handler
320 .as_ref()
321 .and_then(|h| h.streaming_write_tag())
322 }
323}
324
325impl Drop for HybridMissHandler {
326 fn drop(&mut self) {
327 }
329}
330
/// Integration-style tests for the hybrid storage. Each test owns its own
/// static memory tier, disk tier and hybrid storage, plus its own scratch
/// directory.
#[cfg(test)]
mod tests {
    use super::*;
    use once_cell::sync::Lazy;
    use pingora_cache::trace::Span;
    use pingora_http::ResponseHeader;
    use std::path::Path;
    use std::time::SystemTime;

    /// Builds a 200 response meta with two headers.
    fn create_test_meta() -> CacheMeta {
        let mut resp = ResponseHeader::build(200, None).unwrap();
        resp.append_header("content-type", "text/plain").unwrap();
        resp.append_header("x-test", "hybrid").unwrap();
        let fresh_until = SystemTime::now() + std::time::Duration::from_secs(3600);
        let created = SystemTime::now();
        CacheMeta::new(fresh_until, created, 60, 300, resp)
    }

    /// An inactive tracing span handle; the tests do not trace.
    fn span() -> SpanHandle {
        Span::inactive().handle()
    }

    /// Scratch directory used by the test called `name`.
    fn scratch_dir(name: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!("grapsus-hybrid-test-{}", name))
    }

    /// Creates a disk tier rooted in an empty scratch directory.
    fn test_disk(name: &str) -> DiskCacheStorage {
        let dir = scratch_dir(name);
        // Start clean; the directory may be left over from an earlier run.
        let _ = std::fs::remove_dir_all(&dir);
        DiskCacheStorage::new(&dir, 2, 50 * 1024 * 1024)
    }

    /// Removes the scratch directory of the test called `name`.
    fn cleanup_disk(name: &str) {
        let dir = scratch_dir(name);
        let _ = std::fs::remove_dir_all(&dir);
    }

    /// Writes `body` as one final chunk and finishes the handler.
    async fn store(mut handler: MissHandler, body: &'static [u8]) {
        handler
            .write_body(Bytes::from_static(body), true)
            .await
            .unwrap();
        handler.finish().await.unwrap();
    }

    static HYBRID_1_MEM: Lazy<MemCache> = Lazy::new(MemCache::new);
    static HYBRID_1_DISK: Lazy<DiskCacheStorage> = Lazy::new(|| test_disk("miss-then-hit"));
    static HYBRID_1: Lazy<HybridCacheStorage> =
        Lazy::new(|| HybridCacheStorage::new(&HYBRID_1_MEM, &HYBRID_1_DISK));

    /// A miss followed by an admission must produce a readable hit.
    #[tokio::test]
    async fn test_hybrid_miss_then_hit() {
        let trace = span();
        let key = CacheKey::new("", "hybrid-miss-hit", "1");
        let meta = create_test_meta();

        // Nothing stored yet.
        assert!(HYBRID_1.lookup(&key, &trace).await.unwrap().is_none());

        let handler = HYBRID_1.get_miss_handler(&key, &meta, &trace).await.unwrap();
        store(handler, b"hello hybrid").await;

        let (read_meta, mut hit) = HYBRID_1.lookup(&key, &trace).await.unwrap().unwrap();
        assert_eq!(read_meta.response_header().status.as_u16(), 200);
        let body = hit.read_body().await.unwrap().unwrap();
        assert_eq!(body.as_ref(), b"hello hybrid");

        // The body is exhausted after the first read.
        assert!(hit.read_body().await.unwrap().is_none());

        cleanup_disk("miss-then-hit");
    }

    static HYBRID_2_MEM: Lazy<MemCache> = Lazy::new(MemCache::new);
    static HYBRID_2_DISK: Lazy<DiskCacheStorage> = Lazy::new(|| test_disk("disk-promotion"));
    static HYBRID_2: Lazy<HybridCacheStorage> =
        Lazy::new(|| HybridCacheStorage::new(&HYBRID_2_MEM, &HYBRID_2_DISK));

    /// An entry present only on disk is served and then copied into memory.
    #[tokio::test]
    async fn test_hybrid_disk_promotion() {
        let trace = span();
        let key = CacheKey::new("", "hybrid-promote", "1");
        let meta = create_test_meta();

        // Seed the disk tier directly, bypassing the hybrid layer.
        let disk_handler = HYBRID_2_DISK
            .get_miss_handler(&key, &meta, &trace)
            .await
            .unwrap();
        store(disk_handler, b"cold data").await;

        assert!(HYBRID_2_MEM.lookup(&key, &trace).await.unwrap().is_none());

        let (_meta, mut hit) = HYBRID_2.lookup(&key, &trace).await.unwrap().unwrap();
        let body = hit.read_body().await.unwrap().unwrap();
        assert_eq!(body.as_ref(), b"cold data");

        // Give the spawned promotion task time to run.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let promoted = HYBRID_2_MEM.lookup(&key, &trace).await.unwrap();
        assert!(promoted.is_some(), "entry should be promoted to memory");

        cleanup_disk("disk-promotion");
    }

    static HYBRID_3_MEM: Lazy<MemCache> = Lazy::new(MemCache::new);
    static HYBRID_3_DISK: Lazy<DiskCacheStorage> = Lazy::new(|| test_disk("purge-both"));
    static HYBRID_3: Lazy<HybridCacheStorage> =
        Lazy::new(|| HybridCacheStorage::new(&HYBRID_3_MEM, &HYBRID_3_DISK));

    /// An invalidation purge removes the entry from both tiers.
    #[tokio::test]
    async fn test_hybrid_purge_both_tiers() {
        let trace = span();
        let key = CacheKey::new("", "hybrid-purge", "1");
        let meta = create_test_meta();

        let handler = HYBRID_3.get_miss_handler(&key, &meta, &trace).await.unwrap();
        store(handler, b"purge me").await;

        assert!(HYBRID_3_MEM.lookup(&key, &trace).await.unwrap().is_some());
        assert!(HYBRID_3_DISK.lookup(&key, &trace).await.unwrap().is_some());

        let compact = key.to_compact();
        let purged = HYBRID_3
            .purge(&compact, PurgeType::Invalidation, &trace)
            .await
            .unwrap();
        assert!(purged);

        assert!(HYBRID_3_MEM.lookup(&key, &trace).await.unwrap().is_none());
        assert!(HYBRID_3_DISK.lookup(&key, &trace).await.unwrap().is_none());

        cleanup_disk("purge-both");
    }

    static HYBRID_4_MEM: Lazy<MemCache> = Lazy::new(MemCache::new);
    static HYBRID_4_DISK: Lazy<DiskCacheStorage> = Lazy::new(|| test_disk("update-meta"));
    static HYBRID_4: Lazy<HybridCacheStorage> =
        Lazy::new(|| HybridCacheStorage::new(&HYBRID_4_MEM, &HYBRID_4_DISK));

    /// A meta update is visible on the next lookup.
    #[tokio::test]
    async fn test_hybrid_update_meta() {
        let trace = span();
        let key = CacheKey::new("", "hybrid-update-meta", "1");
        let meta = create_test_meta();

        let handler = HYBRID_4.get_miss_handler(&key, &meta, &trace).await.unwrap();
        store(handler, b"update me").await;

        let mut updated_header = ResponseHeader::build(200, None).unwrap();
        updated_header
            .append_header("content-type", "application/json")
            .unwrap();
        updated_header.append_header("x-updated", "true").unwrap();
        let fresh_until = SystemTime::now() + std::time::Duration::from_secs(7200);
        let created = SystemTime::now();
        let new_meta = CacheMeta::new(fresh_until, created, 120, 600, updated_header);

        let updated = HYBRID_4.update_meta(&key, &new_meta, &trace).await.unwrap();
        assert!(updated);

        let (read_meta, _hit) = HYBRID_4.lookup(&key, &trace).await.unwrap().unwrap();
        let headers = read_meta.response_header().headers.clone();
        assert_eq!(headers.get("x-updated").unwrap().to_str().unwrap(), "true");

        cleanup_disk("update-meta");
    }

    static HYBRID_5_MEM: Lazy<MemCache> = Lazy::new(MemCache::new);
    static HYBRID_5_DISK: Lazy<DiskCacheStorage> = Lazy::new(|| test_disk("miss-drop"));
    static HYBRID_5: Lazy<HybridCacheStorage> =
        Lazy::new(|| HybridCacheStorage::new(&HYBRID_5_MEM, &HYBRID_5_DISK));

    /// Dropping a handler before `finish` must not leave an entry behind.
    #[tokio::test]
    async fn test_hybrid_miss_handler_drop() {
        let trace = span();
        let key = CacheKey::new("", "hybrid-miss-drop", "1");
        let meta = create_test_meta();

        {
            // Write a non-final chunk, then let the handler go out of scope.
            let mut handler = HYBRID_5.get_miss_handler(&key, &meta, &trace).await.unwrap();
            handler
                .write_body(Bytes::from_static(b"incomplete"), false)
                .await
                .unwrap();
        }

        assert!(HYBRID_5_MEM.lookup(&key, &trace).await.unwrap().is_none());
        assert!(HYBRID_5_DISK.lookup(&key, &trace).await.unwrap().is_none());

        cleanup_disk("miss-drop");
    }

    static HYBRID_6_MEM: Lazy<MemCache> = Lazy::new(MemCache::new);
    static HYBRID_6_DISK: Lazy<DiskCacheStorage> = Lazy::new(|| test_disk("chunked-write"));
    static HYBRID_6: Lazy<HybridCacheStorage> =
        Lazy::new(|| HybridCacheStorage::new(&HYBRID_6_MEM, &HYBRID_6_DISK));

    /// A body written in several chunks is read back as one body.
    #[tokio::test]
    async fn test_hybrid_chunked_write() {
        let trace = span();
        let key = CacheKey::new("", "hybrid-chunked", "1");
        let meta = create_test_meta();

        let mut handler = HYBRID_6.get_miss_handler(&key, &meta, &trace).await.unwrap();
        handler
            .write_body(Bytes::from_static(b"chunk1-"), false)
            .await
            .unwrap();
        handler
            .write_body(Bytes::from_static(b"chunk2-"), false)
            .await
            .unwrap();
        handler
            .write_body(Bytes::from_static(b"chunk3"), true)
            .await
            .unwrap();
        handler.finish().await.unwrap();

        let (_meta, mut hit) = HYBRID_6.lookup(&key, &trace).await.unwrap().unwrap();
        let body = hit.read_body().await.unwrap().unwrap();
        assert_eq!(body.as_ref(), b"chunk1-chunk2-chunk3");

        cleanup_disk("chunked-write");
    }

    static HYBRID_7_MEM: Lazy<MemCache> = Lazy::new(MemCache::new);
    static HYBRID_7_DISK: Lazy<DiskCacheStorage> = Lazy::new(|| test_disk("seek"));
    static HYBRID_7: Lazy<HybridCacheStorage> =
        Lazy::new(|| HybridCacheStorage::new(&HYBRID_7_MEM, &HYBRID_7_DISK));

    /// The handler returned for a disk hit supports repeated range seeks.
    #[tokio::test]
    async fn test_hybrid_seek() {
        let trace = span();
        let key = CacheKey::new("", "hybrid-seek", "1");
        let meta = create_test_meta();

        // Seed only the disk tier so the lookup takes the disk-hit path.
        let disk_handler = HYBRID_7_DISK
            .get_miss_handler(&key, &meta, &trace)
            .await
            .unwrap();
        store(disk_handler, b"0123456789").await;

        let (_meta, mut hit) = HYBRID_7.lookup(&key, &trace).await.unwrap().unwrap();
        assert!(hit.can_seek());

        hit.seek(3, Some(7)).unwrap();
        let middle = hit.read_body().await.unwrap().unwrap();
        assert_eq!(middle.as_ref(), b"3456");

        hit.seek(0, Some(3)).unwrap();
        let head = hit.read_body().await.unwrap().unwrap();
        assert_eq!(head.as_ref(), b"012");

        // A start offset beyond the body is rejected.
        assert!(hit.seek(100, None).is_err());

        cleanup_disk("seek");
    }

    static HYBRID_8_MEM: Lazy<MemCache> = Lazy::new(MemCache::new);
    static HYBRID_8_DISK: Lazy<DiskCacheStorage> = Lazy::new(|| test_disk("eviction-demotion"));
    static HYBRID_8: Lazy<HybridCacheStorage> =
        Lazy::new(|| HybridCacheStorage::new(&HYBRID_8_MEM, &HYBRID_8_DISK));

    /// An eviction purge drops the memory copy but keeps the disk copy.
    #[tokio::test]
    async fn test_hybrid_eviction_demotion() {
        let trace = span();
        let key = CacheKey::new("", "hybrid-evict-demote", "1");
        let meta = create_test_meta();

        let handler = HYBRID_8.get_miss_handler(&key, &meta, &trace).await.unwrap();
        store(handler, b"demote me").await;

        assert!(HYBRID_8_MEM.lookup(&key, &trace).await.unwrap().is_some());
        assert!(HYBRID_8_DISK.lookup(&key, &trace).await.unwrap().is_some());

        let compact = key.to_compact();
        let purged = HYBRID_8
            .purge(&compact, PurgeType::Eviction, &trace)
            .await
            .unwrap();
        assert!(purged);

        assert!(HYBRID_8_MEM.lookup(&key, &trace).await.unwrap().is_none());
        assert!(HYBRID_8_DISK.lookup(&key, &trace).await.unwrap().is_some());

        cleanup_disk("eviction-demotion");
    }

    static HYBRID_9_MEM: Lazy<MemCache> = Lazy::new(MemCache::new);
    static HYBRID_9_DISK: Lazy<DiskCacheStorage> = Lazy::new(|| test_disk("evict-then-hit"));
    static HYBRID_9: Lazy<HybridCacheStorage> =
        Lazy::new(|| HybridCacheStorage::new(&HYBRID_9_MEM, &HYBRID_9_DISK));

    /// After an eviction the entry is still served from disk and re-promoted.
    #[tokio::test]
    async fn test_hybrid_eviction_then_disk_hit() {
        let trace = span();
        let key = CacheKey::new("", "hybrid-evict-hit", "1");
        let meta = create_test_meta();

        let handler = HYBRID_9.get_miss_handler(&key, &meta, &trace).await.unwrap();
        store(handler, b"evict and find").await;

        let compact = key.to_compact();
        HYBRID_9
            .purge(&compact, PurgeType::Eviction, &trace)
            .await
            .unwrap();

        assert!(HYBRID_9_MEM.lookup(&key, &trace).await.unwrap().is_none());

        let (_meta, mut hit) = HYBRID_9.lookup(&key, &trace).await.unwrap().unwrap();
        let body = hit.read_body().await.unwrap().unwrap();
        assert_eq!(body.as_ref(), b"evict and find");

        // Give the spawned promotion task time to run.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let repromoted = HYBRID_9_MEM.lookup(&key, &trace).await.unwrap();
        assert!(
            repromoted.is_some(),
            "entry should be re-promoted to memory"
        );

        cleanup_disk("evict-then-hit");
    }

    static HYBRID_10_MEM: Lazy<MemCache> = Lazy::new(MemCache::new);
    static HYBRID_10_DISK: Lazy<DiskCacheStorage> = Lazy::new(|| test_disk("invalidation-both"));
    static HYBRID_10: Lazy<HybridCacheStorage> =
        Lazy::new(|| HybridCacheStorage::new(&HYBRID_10_MEM, &HYBRID_10_DISK));

    /// An invalidation purge clears the memory copy and the disk copy.
    #[tokio::test]
    async fn test_hybrid_invalidation_clears_both() {
        let trace = span();
        let key = CacheKey::new("", "hybrid-invalidate", "1");
        let meta = create_test_meta();

        let handler = HYBRID_10
            .get_miss_handler(&key, &meta, &trace)
            .await
            .unwrap();
        store(handler, b"invalidate me").await;

        assert!(HYBRID_10_MEM.lookup(&key, &trace).await.unwrap().is_some());
        assert!(HYBRID_10_DISK.lookup(&key, &trace).await.unwrap().is_some());

        let compact = key.to_compact();
        let purged = HYBRID_10
            .purge(&compact, PurgeType::Invalidation, &trace)
            .await
            .unwrap();
        assert!(purged);

        assert!(HYBRID_10_MEM.lookup(&key, &trace).await.unwrap().is_none());
        assert!(HYBRID_10_DISK.lookup(&key, &trace).await.unwrap().is_none());

        cleanup_disk("invalidation-both");
    }
}