1use super::config::{BlobCacheConfig, L2Compression};
9use super::entry::{effective_expires_at_unix_ms, jitter_seed, Entry};
10use super::l2::BlobCacheL2;
11use super::shard::{InsertOutcome, Lookup, Shard};
12
13use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
14use std::hash::{Hash, Hasher};
15#[cfg(test)]
16use std::path::{Path, PathBuf};
17use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
18use std::sync::{Arc, OnceLock, Weak};
19use std::time::{SystemTime, UNIX_EPOCH};
20
21use parking_lot::RwLock;
22
23use super::super::compressor::{CompressOpts, Compressed, L2BlobCompressor};
24use super::super::extended_ttl::ExtendedTtlPolicy;
25use super::super::promotion_pool::{
26 AsyncPromotionPool, PoolOpts, PromotionExecutor, PromotionRequest,
27};
28
#[cfg(test)]
thread_local! {
    // Test-only counter; presumably bumped by `effective_expires_at_unix_ms`
    // so tests can assert how often effective expiry is recomputed — confirm
    // against `super::entry`.
    pub(super) static EFFECTIVE_EXPIRY_COMPUTE_CALLS: std::cell::Cell<u64> = const { std::cell::Cell::new(0) };
}
37
/// Errors surfaced by [`BlobCache`] operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CacheError {
    /// The blob exceeds the per-put cap (policy override) or the L1 budget.
    BlobTooLarge {
        size: usize,
        max: usize,
    },
    /// Content metadata exceeds the configured key-count or byte limits.
    MetadataTooLarge {
        keys: usize,
        bytes: usize,
        max_keys: usize,
        max_bytes: usize,
    },
    /// Registering another namespace would exceed `max_namespaces`.
    TooManyNamespaces {
        max: usize,
    },
    /// A versioned put was rejected against the existing entry's version.
    VersionMismatch {
        existing: u64,
        attempted: u64,
    },
    /// The L2 store cannot hold the blob within its byte budget.
    L2Full {
        size: u64,
        max: u64,
    },
    /// Any other L2 I/O failure, stringified for cloneability/equality.
    L2Io(String),
}
63
/// Composite cache key: an entry is addressed by (namespace, key).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(super) struct BlobCacheKey {
    pub(super) namespace: String,
    pub(super) key: String,
}
69
70impl BlobCacheKey {
71 pub(super) fn new(namespace: impl Into<String>, key: impl Into<String>) -> Self {
72 Self {
73 namespace: namespace.into(),
74 key: key.into(),
75 }
76 }
77}
78
/// A label (tag or dependency) qualified by its namespace so identical labels
/// in different namespaces never collide in the indexes.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ScopedLabel {
    namespace: String,
    label: String,
}
84
85impl ScopedLabel {
86 fn new(namespace: impl Into<String>, label: impl Into<String>) -> Self {
87 Self {
88 namespace: namespace.into(),
89 label: label.into(),
90 }
91 }
92}
93
/// A successful cache read: the blob, its metadata, its version, and whether
/// it was served from the stale window.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlobCacheHit {
    // Shared slice so cloning a hit is a refcount bump, not a byte copy.
    pub(super) bytes: Arc<[u8]>,
    pub(super) content_metadata: BTreeMap<String, String>,
    pub(super) version: Option<u64>,
    // Some(ms remaining) when served from the stale window; None when fresh.
    pub(super) stale_window_remaining_ms: Option<u64>,
}
104
105impl BlobCacheHit {
106 pub(crate) fn new(
107 bytes: Arc<[u8]>,
108 content_metadata: BTreeMap<String, String>,
109 version: Option<u64>,
110 ) -> Self {
111 Self {
112 bytes,
113 content_metadata,
114 version,
115 stale_window_remaining_ms: None,
116 }
117 }
118
119 pub(crate) fn new_stale(
120 bytes: Arc<[u8]>,
121 content_metadata: BTreeMap<String, String>,
122 version: Option<u64>,
123 window_remaining_ms: u64,
124 ) -> Self {
125 Self {
126 bytes,
127 content_metadata,
128 version,
129 stale_window_remaining_ms: Some(window_remaining_ms),
130 }
131 }
132
133 pub fn bytes(&self) -> &Arc<[u8]> {
135 &self.bytes
136 }
137
138 pub fn value(&self) -> &[u8] {
140 &self.bytes
141 }
142
143 pub fn content_metadata(&self) -> &BTreeMap<String, String> {
145 &self.content_metadata
146 }
147
148 pub fn version(&self) -> Option<u64> {
150 self.version
151 }
152
153 pub fn is_stale(&self) -> bool {
157 self.stale_window_remaining_ms.is_some()
158 }
159
160 pub fn stale_window_remaining_ms(&self) -> Option<u64> {
163 self.stale_window_remaining_ms
164 }
165}
166
/// Everything a caller supplies for one put: the payload, optional metadata,
/// invalidation labels, and the cache policy.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct BlobCachePut {
    pub bytes: Vec<u8>,
    pub content_metadata: BTreeMap<String, String>,
    // Labels for tag-based invalidation.
    pub tags: BTreeSet<String>,
    // Labels for dependency-based invalidation.
    pub dependencies: BTreeSet<String>,
    pub policy: BlobCachePolicy,
}
175
176impl BlobCachePut {
177 pub fn new(bytes: impl Into<Vec<u8>>) -> Self {
178 Self {
179 bytes: bytes.into(),
180 content_metadata: BTreeMap::new(),
181 tags: BTreeSet::new(),
182 dependencies: BTreeSet::new(),
183 policy: BlobCachePolicy::default(),
184 }
185 }
186
187 pub fn with_content_metadata(mut self, content_metadata: BTreeMap<String, String>) -> Self {
188 self.content_metadata = content_metadata;
189 self
190 }
191
192 pub fn with_tags(mut self, tags: impl IntoIterator<Item = impl Into<String>>) -> Self {
193 self.tags = tags.into_iter().map(Into::into).collect();
194 self
195 }
196
197 pub fn with_dependencies(
198 mut self,
199 dependencies: impl IntoIterator<Item = impl Into<String>>,
200 ) -> Self {
201 self.dependencies = dependencies.into_iter().map(Into::into).collect();
202 self
203 }
204
205 pub fn with_policy(mut self, policy: BlobCachePolicy) -> Self {
206 self.policy = policy;
207 self
208 }
209}
210
/// Controls whether a put is admitted into the in-memory L1 tier.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum L1Admission {
    /// Always insert into L1.
    Always,
    /// Default; admission decided by the cache (heuristics live outside this
    /// view — confirm in `Shard::insert`).
    Auto,
    /// Bypass L1 entirely; the value is only written through to L2 when one
    /// is configured, and any existing L1 copy of the key is removed.
    Never,
}
217
/// Tri-state answer of [`BlobCache::exists`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CachePresence {
    /// Found live in L1.
    Present,
    /// Not found in L1 and ruled out by L2 (or no L2 configured).
    Absent,
    /// The L2 synopsis says the key *may* exist; false positives possible.
    MaybePresent,
}
235
236impl From<bool> for CachePresence {
237 fn from(present: bool) -> Self {
238 if present {
239 CachePresence::Present
240 } else {
241 CachePresence::Absent
242 }
243 }
244}
245
/// Per-put cache policy: expiry, size cap, L1 admission, priority, optional
/// version gate, and extended-TTL settings. Built fluently; read via the
/// `*_value` accessors.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BlobCachePolicy {
    // Relative TTL in milliseconds, if any.
    ttl_ms: Option<u64>,
    // Absolute expiry (unix epoch ms), if any.
    expires_at_unix_ms: Option<u64>,
    // Per-put override of the maximum accepted blob size.
    max_blob_bytes: Option<usize>,
    l1_admission: L1Admission,
    // Default 128 (mid-range of u8).
    priority: u8,
    // Expected version for versioned puts.
    version: Option<u64>,
    // Stale-serve / idle-eviction configuration.
    extended: ExtendedTtlPolicy,
}
261
262impl Default for BlobCachePolicy {
263 fn default() -> Self {
264 Self {
265 ttl_ms: None,
266 expires_at_unix_ms: None,
267 max_blob_bytes: None,
268 l1_admission: L1Admission::Auto,
269 priority: 128,
270 version: None,
271 extended: ExtendedTtlPolicy::off(),
272 }
273 }
274}
275
276impl BlobCachePolicy {
277 pub fn ttl_ms(mut self, ttl_ms: u64) -> Self {
280 self.ttl_ms = Some(ttl_ms);
281 self
282 }
283
284 pub fn expires_at_unix_ms(mut self, expires_at_unix_ms: u64) -> Self {
285 self.expires_at_unix_ms = Some(expires_at_unix_ms);
286 self
287 }
288
289 pub fn max_blob_bytes(mut self, max_blob_bytes: usize) -> Self {
290 self.max_blob_bytes = Some(max_blob_bytes);
291 self
292 }
293
294 pub fn l1_admission(mut self, l1_admission: L1Admission) -> Self {
295 self.l1_admission = l1_admission;
296 self
297 }
298
299 pub fn priority(mut self, priority: u8) -> Self {
300 self.priority = priority;
301 self
302 }
303
304 pub fn version(mut self, version: u64) -> Self {
305 self.version = Some(version);
306 self
307 }
308
309 pub fn extended(mut self, extended: ExtendedTtlPolicy) -> Self {
314 self.extended = extended;
315 self
316 }
317
318 pub fn ttl_ms_value(&self) -> Option<u64> {
325 self.ttl_ms
326 }
327
328 pub fn expires_at_unix_ms_value(&self) -> Option<u64> {
329 self.expires_at_unix_ms
330 }
331
332 pub fn max_blob_bytes_value(&self) -> Option<usize> {
333 self.max_blob_bytes
334 }
335
336 pub fn l1_admission_value(&self) -> L1Admission {
337 self.l1_admission
338 }
339
340 pub fn priority_value(&self) -> u8 {
341 self.priority
342 }
343
344 pub fn version_value(&self) -> Option<u64> {
345 self.version
346 }
347
348 pub fn extended_value(&self) -> ExtendedTtlPolicy {
352 self.extended
353 }
354}
355
/// Point-in-time snapshot of counters and gauges, assembled by
/// [`BlobCache::stats`]. Counters are monotonic; gauges are instantaneous.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BlobCacheStats {
    // Lookup outcomes (get/exists, either tier).
    pub(super) hits: u64,
    pub(super) misses: u64,
    // Mutation counters.
    pub(super) insertions: u64,
    pub(super) evictions: u64,
    pub(super) expirations: u64,
    pub(super) invalidations: u64,
    pub(super) namespace_flushes: u64,
    pub(super) version_mismatches: u64,
    // L1 gauges.
    pub(super) entries: usize,
    pub(super) bytes_in_use: usize,
    pub(super) l1_bytes_max: usize,
    // L2 gauges and counters.
    pub(super) l2_bytes_in_use: u64,
    pub(super) l2_bytes_max: u64,
    pub(super) l2_full_rejections: u64,
    pub(super) l2_metadata_reads: u64,
    pub(super) l2_negative_skips: u64,
    // Synopsis (L2 existence filter) counters.
    pub(super) synopsis_metadata_reads: u64,
    pub(super) synopsis_bytes: u64,
    // Namespace gauges.
    pub(super) namespaces: usize,
    pub(super) max_namespaces: usize,
    // Async L2→L1 promotion counters.
    pub(super) promotion_queued: u64,
    pub(super) promotion_dropped: u64,
    pub(super) promotion_completed: u64,
    pub(super) promotion_queue_depth: usize,
    // L2 compression accounting (pre/post sizes, skips, savings).
    pub(super) l2_compression_original_bytes: u64,
    pub(super) l2_compression_stored_bytes: u64,
    pub(super) l2_compression_skipped_total: u64,
    pub(super) l2_bytes_saved_total: u64,
    // Extended-TTL outcomes in L1.
    pub(super) l1_stale_serves_total: u64,
    pub(super) l1_idle_evicts_total: u64,
}
406
impl BlobCacheStats {
    /// Lookups answered affirmatively (L1 hits, L2 rehydrations, synopsis
    /// maybes).
    pub fn hits(&self) -> u64 {
        self.hits
    }

    /// Lookups answered negatively.
    pub fn misses(&self) -> u64 {
        self.misses
    }

    /// Completed puts (counted even when L1 admission was skipped).
    pub fn insertions(&self) -> u64 {
        self.insertions
    }

    /// Entries evicted from L1 for capacity reasons.
    pub fn evictions(&self) -> u64 {
        self.evictions
    }

    /// Entries removed because their TTL/expiry (or idle window) lapsed.
    pub fn expirations(&self) -> u64 {
        self.expirations
    }

    /// Entries removed by explicit invalidation (key/prefix/tag/dependency).
    pub fn invalidations(&self) -> u64 {
        self.invalidations
    }

    /// Whole-namespace flushes performed.
    pub fn namespace_flushes(&self) -> u64 {
        self.namespace_flushes
    }

    /// Versioned puts rejected with `CacheError::VersionMismatch`.
    pub fn version_mismatches(&self) -> u64 {
        self.version_mismatches
    }

    /// Live L1 entries across all shards.
    pub fn entries(&self) -> usize {
        self.entries
    }

    /// Bytes held by live L1 entries, widened to u64 for parity with the L2
    /// byte counters.
    pub fn bytes_in_use(&self) -> u64 {
        self.bytes_in_use as u64
    }

    /// Configured L1 byte budget.
    pub fn l1_bytes_max(&self) -> usize {
        self.l1_bytes_max
    }

    /// Bytes currently stored in L2.
    pub fn l2_bytes_in_use(&self) -> u64 {
        self.l2_bytes_in_use
    }

    /// Configured L2 byte budget.
    pub fn l2_bytes_max(&self) -> u64 {
        self.l2_bytes_max
    }

    /// Puts rejected because L2 was full.
    pub fn l2_full_rejections(&self) -> u64 {
        self.l2_full_rejections
    }

    /// L2 metadata reads performed.
    pub fn l2_metadata_reads(&self) -> u64 {
        self.l2_metadata_reads
    }

    /// L2 reads skipped by a negative lookup shortcut.
    pub fn l2_negative_skips(&self) -> u64 {
        self.l2_negative_skips
    }

    /// Metadata reads answered by the synopsis instead of L2 proper.
    pub fn synopsis_metadata_reads(&self) -> u64 {
        self.synopsis_metadata_reads
    }

    /// Memory consumed by the synopsis structure.
    pub fn synopsis_bytes(&self) -> u64 {
        self.synopsis_bytes
    }

    /// Namespaces currently registered.
    pub fn namespaces(&self) -> usize {
        self.namespaces
    }

    /// Configured namespace cap.
    pub fn max_namespaces(&self) -> usize {
        self.max_namespaces
    }

    /// Promotion requests accepted into the async queue.
    pub fn promotion_queued(&self) -> u64 {
        self.promotion_queued
    }

    /// Promotion requests dropped (e.g. queue pressure).
    pub fn promotion_dropped(&self) -> u64 {
        self.promotion_dropped
    }

    /// Promotion requests fully executed.
    pub fn promotion_completed(&self) -> u64 {
        self.promotion_completed
    }

    /// Current depth of the promotion queue.
    pub fn promotion_queue_depth(&self) -> usize {
        self.promotion_queue_depth
    }

    /// Observed original/stored compression ratio for L2 blobs; 1.0 when
    /// nothing has been stored yet (avoids division by zero).
    pub fn l2_compression_ratio_observed(&self) -> f64 {
        if self.l2_compression_stored_bytes == 0 {
            return 1.0;
        }
        self.l2_compression_original_bytes as f64 / self.l2_compression_stored_bytes as f64
    }

    /// Blobs stored uncompressed because compression was skipped.
    pub fn l2_compression_skipped_total(&self) -> u64 {
        self.l2_compression_skipped_total
    }

    /// Total bytes saved by L2 compression.
    pub fn l2_bytes_saved_total(&self) -> u64 {
        self.l2_bytes_saved_total
    }

    /// L1 hits served from an entry's stale window.
    pub fn l1_stale_serves_total(&self) -> u64 {
        self.l1_stale_serves_total
    }

    /// L1 entries evicted on access after exceeding their idle window.
    pub fn l1_idle_evicts_total(&self) -> u64 {
        self.l1_idle_evicts_total
    }
}
564
/// Which secondary index an `invalidate_indexed_many` call targets.
#[derive(Clone, Copy)]
enum IndexedKind {
    /// The tag index.
    Tag,
    /// The dependency index.
    Dependency,
}
570
/// Lock-free, monotonic event counters behind the snapshot returned by
/// [`BlobCache::stats`]. All updates in this module use relaxed ordering, so
/// cross-counter consistency is approximate under concurrency.
#[derive(Debug)]
struct AtomicStats {
    hits: AtomicU64,
    misses: AtomicU64,
    insertions: AtomicU64,
    evictions: AtomicU64,
    expirations: AtomicU64,
    invalidations: AtomicU64,
    namespace_flushes: AtomicU64,
    version_mismatches: AtomicU64,
    l2_full_rejections: AtomicU64,
    // Hits answered from an entry's stale window.
    l1_stale_serves: AtomicU64,
    // Entries dropped on access via `Lookup::IdleEvicted`.
    l1_idle_evicts: AtomicU64,
}
591
592impl AtomicStats {
593 fn new() -> Self {
594 Self {
595 hits: AtomicU64::new(0),
596 misses: AtomicU64::new(0),
597 insertions: AtomicU64::new(0),
598 evictions: AtomicU64::new(0),
599 expirations: AtomicU64::new(0),
600 invalidations: AtomicU64::new(0),
601 namespace_flushes: AtomicU64::new(0),
602 version_mismatches: AtomicU64::new(0),
603 l2_full_rejections: AtomicU64::new(0),
604 l1_stale_serves: AtomicU64::new(0),
605 l1_idle_evicts: AtomicU64::new(0),
606 }
607 }
608}
609
/// Two-tier blob cache: a sharded in-memory L1 plus an optional persistent
/// L2, with tag/dependency reverse indexes for targeted invalidation and
/// per-namespace generation counters for O(1) namespace flushes.
pub struct BlobCache {
    // Normalized configuration (`shard_count` forced to at least 1).
    config: BlobCacheConfig,
    // L1 entries, partitioned by key hash to reduce lock contention.
    shards: Vec<RwLock<Shard>>,
    // Namespaces registered so far; bounded by `config.max_namespaces`.
    namespaces: RwLock<HashSet<String>>,
    // Per-namespace invalidation generation; bumping one lazily invalidates
    // every live entry of that namespace.
    namespace_generations: RwLock<HashMap<String, u64>>,
    // Reverse index tag -> keys (maintained for L1-admitted entries only).
    tag_index: RwLock<HashMap<ScopedLabel, HashSet<BlobCacheKey>>>,
    // Reverse index dependency -> keys (L1-admitted entries only).
    dependency_index: RwLock<HashMap<ScopedLabel, HashSet<BlobCacheKey>>>,
    // Optional persistent second tier.
    l2: Option<Arc<BlobCacheL2>>,
    // Total bytes of live L1 entries.
    bytes_in_use: AtomicUsize,
    // Event counters surfaced through `stats()`.
    stats: AtomicStats,
    // Installed at most once via `enable_async_promotion`.
    promotion_pool: OnceLock<Arc<AsyncPromotionPool>>,
}
644
// Compile-time proof that `BlobCache` is `Send + Sync`, i.e. safe to share
// across threads (typically behind an `Arc`). Fails to compile if a field
// ever loses either bound.
const _: fn() = || {
    fn assert_send_sync<T: Send + Sync>() {}
    assert_send_sync::<BlobCache>();
};
653
654impl BlobCache {
655 pub fn new(config: BlobCacheConfig) -> Self {
659 Self::try_new(config).expect("open blob-cache L2")
660 }
661
662 pub fn open_with_l2(config: BlobCacheConfig) -> Result<Self, CacheError> {
666 Self::try_new(config)
667 }
668
669 fn try_new(config: BlobCacheConfig) -> Result<Self, CacheError> {
670 let config = BlobCacheConfig {
671 shard_count: config.shard_count.max(1),
672 ..config
673 };
674 let l2 = config
675 .l2_path
676 .clone()
677 .map(|path| BlobCacheL2::open(path, config.l2_bytes_max))
678 .transpose()?;
679 let shards = (0..config.shard_count)
680 .map(|_| RwLock::new(Shard::new()))
681 .collect();
682 Ok(Self {
683 config,
684 shards,
685 namespaces: RwLock::new(HashSet::new()),
686 namespace_generations: RwLock::new(HashMap::new()),
687 tag_index: RwLock::new(HashMap::new()),
688 dependency_index: RwLock::new(HashMap::new()),
689 l2: l2.map(Arc::new),
690 bytes_in_use: AtomicUsize::new(0),
691 stats: AtomicStats::new(),
692 promotion_pool: OnceLock::new(),
693 })
694 }
695
696 pub fn with_defaults() -> Self {
697 Self::new(BlobCacheConfig::default())
698 }
699
700 pub fn l2_path(&self) -> Option<&std::path::Path> {
708 self.config.l2_path.as_deref()
709 }
710
711 pub fn put(
712 &self,
713 namespace: impl Into<String>,
714 key: impl Into<String>,
715 input: BlobCachePut,
716 ) -> Result<(), CacheError> {
717 self.put_at(namespace, key, input, unix_now_ms())
718 }
719
    /// Core insertion path (`now_ms` injected for deterministic tests).
    ///
    /// Order of operations: validate size/metadata, register the namespace,
    /// take the owning shard lock, run the version check, write through to L2
    /// (when configured), then insert into L1 (unless admission is `Never`),
    /// and finally update label indexes, byte accounting, and eviction.
    fn put_at(
        &self,
        namespace: impl Into<String>,
        key: impl Into<String>,
        input: BlobCachePut,
        now_ms: u64,
    ) -> Result<(), CacheError> {
        let namespace = namespace.into();
        let key = BlobCacheKey::new(namespace.clone(), key);
        self.validate_blob_size(input.bytes.len(), input.policy)?;
        self.validate_metadata(&input.content_metadata)?;
        self.ensure_namespace(&namespace)?;
        let namespace_generation = self.current_generation(&namespace);
        // Cloned up front: `Entry::new` consumes `input`'s label sets, but
        // they are needed again afterwards to update the reverse indexes.
        let tags = input.tags.clone();
        let dependencies = input.dependencies.clone();

        let shard_idx = self.shard_index(&key);
        let mut shard = self.shards[shard_idx].write();
        self.check_version(
            &shard,
            &key,
            input.policy.version_value(),
            namespace_generation,
        )?;
        let entry = Entry::new(
            input.bytes,
            input.content_metadata,
            input.tags,
            input.dependencies,
            input.policy,
            namespace_generation,
            now_ms,
            &namespace,
            &key.key,
        );
        let entry_size = entry.size;
        // L2 write-through happens before the L1 insert (still under the
        // shard lock), so L1 never holds a value that failed to persist.
        if let Some(l2) = &self.l2 {
            let old_l2_size = l2.record_size(&key);
            let compressed = match self.config.l2_compression {
                L2Compression::Off => Compressed::Raw(entry.bytes.as_ref().to_vec()),
                L2Compression::On => {
                    // The compressor may use the content type to decide
                    // whether compressing is worthwhile.
                    let content_type = entry
                        .content_metadata
                        .get("content-type")
                        .map(String::as_str);
                    L2BlobCompressor::compress(
                        entry.bytes.as_ref(),
                        content_type,
                        &CompressOpts::default(),
                    )
                    .map_err(|err| CacheError::L2Io(err.to_string()))?
                }
            };
            match l2.put(&key, &entry, old_l2_size, compressed) {
                Ok(()) => {}
                Err(err @ CacheError::L2Full { .. }) => {
                    // Only capacity rejections are counted separately.
                    self.stats
                        .l2_full_rejections
                        .fetch_add(1, Ordering::Relaxed);
                    return Err(err);
                }
                Err(err) => return Err(err),
            }
        }
        // `Never` admission bypasses L1 but still displaces any previous L1
        // copy of the key (the fresh entry is dropped here).
        let outcome = if matches!(input.policy.l1_admission_value(), L1Admission::Never) {
            let old_entry = shard.remove(&key);
            InsertOutcome {
                old_entry,
                admitted: false,
            }
        } else {
            shard.insert(key.clone(), entry)
        };
        drop(shard);

        if let Some(old_entry) = outcome.old_entry.as_ref() {
            self.deindex_entry(&key, old_entry);
        }
        if outcome.admitted {
            self.index_entry(&key, &tags, &dependencies);
        }

        // Byte accounting: apply the (unsigned) size delta in the right
        // direction, since the counter is an unsigned atomic.
        let old_size = outcome.old_entry.as_ref().map_or(0, |entry| entry.size);
        let new_size = if outcome.admitted { entry_size } else { 0 };
        if new_size >= old_size {
            self.bytes_in_use
                .fetch_add(new_size - old_size, Ordering::Relaxed);
        } else {
            self.bytes_in_use
                .fetch_sub(old_size - new_size, Ordering::Relaxed);
        }
        // Counted even when L1 admission was skipped (the put still happened).
        self.stats.insertions.fetch_add(1, Ordering::Relaxed);
        if outcome.admitted {
            self.evict_until_within_budget(shard_idx);
        }
        Ok(())
    }
823
824 pub fn get(&self, namespace: &str, key: &str) -> Option<BlobCacheHit> {
825 self.get_at(namespace, key, unix_now_ms())
826 }
827
    /// Core lookup (`now_ms` injected for deterministic tests).
    ///
    /// Checks the owning L1 shard first. Expired/idle/stale entries found
    /// there are removed and counted as misses. On a clean miss, L2 is
    /// consulted: with a promotion pool installed the L2 hit is served
    /// directly and promotion into L1 is queued; otherwise the entry is
    /// rehydrated into L1 synchronously on this caller's thread.
    fn get_at(&self, namespace: &str, key: &str, now_ms: u64) -> Option<BlobCacheHit> {
        let cache_key = BlobCacheKey::new(namespace, key);
        let namespace_generation = self.current_generation(namespace);
        let shard_idx = self.shard_index(&cache_key);
        let mut shard = self.shards[shard_idx].write();
        match shard.get(&cache_key, now_ms, namespace_generation) {
            Lookup::Hit(hit) => {
                self.stats.hits.fetch_add(1, Ordering::Relaxed);
                // Stale-window serves are hits, tracked separately.
                if hit.is_stale() {
                    self.stats.l1_stale_serves.fetch_add(1, Ordering::Relaxed);
                }
                Some(hit)
            }
            Lookup::Expired(entry) => {
                // Shard lock released before touching indexes/L2.
                drop(shard);
                self.record_removed_entry(&cache_key, &entry);
                // Expiry removes the L2 copy too.
                if let Some(l2) = &self.l2 {
                    l2.delete_key(&cache_key);
                }
                self.stats.expirations.fetch_add(1, Ordering::Relaxed);
                self.stats.misses.fetch_add(1, Ordering::Relaxed);
                None
            }
            Lookup::IdleEvicted(entry) => {
                drop(shard);
                self.record_removed_entry(&cache_key, &entry);
                if let Some(l2) = &self.l2 {
                    l2.delete_key(&cache_key);
                }
                // Idle evictions count as expirations plus their own metric.
                self.stats.expirations.fetch_add(1, Ordering::Relaxed);
                self.stats.l1_idle_evicts.fetch_add(1, Ordering::Relaxed);
                self.stats.misses.fetch_add(1, Ordering::Relaxed);
                None
            }
            Lookup::Stale(entry) => {
                // Stale beyond the serveable window: drop from L1 only (the
                // L2 copy is left in place).
                drop(shard);
                self.record_removed_entry(&cache_key, &entry);
                self.stats.misses.fetch_add(1, Ordering::Relaxed);
                None
            }
            Lookup::Miss => {
                drop(shard);
                if let Some(pool) = self.promotion_pool.get() {
                    if let Some(l2) = self.l2.as_ref() {
                        if let Some(entry) = l2.get(&cache_key, now_ms, namespace_generation) {
                            let hit = entry.hit();
                            drop(entry);
                            // The request's bytes/policy are carried for the
                            // pool's benefit; `promote_from_l2` re-reads L2
                            // and does not use them.
                            let request = PromotionRequest {
                                namespace: cache_key.namespace.clone(),
                                key: cache_key.key.clone(),
                                bytes: Arc::clone(hit.bytes()),
                                policy: BlobCachePolicy::default(),
                            };
                            // Best-effort: a full queue drops the promotion,
                            // not the hit.
                            let _ = pool.schedule(request);
                            self.stats.hits.fetch_add(1, Ordering::Relaxed);
                            return Some(hit);
                        }
                    }
                    self.stats.misses.fetch_add(1, Ordering::Relaxed);
                    return None;
                }
                // No pool installed: promote synchronously.
                if let Some(hit) =
                    self.rehydrate_l2_entry(&cache_key, now_ms, namespace_generation, shard_idx)
                {
                    self.stats.hits.fetch_add(1, Ordering::Relaxed);
                    return Some(hit);
                }
                self.stats.misses.fetch_add(1, Ordering::Relaxed);
                None
            }
            Lookup::Present => unreachable!("get cannot return presence-only lookup"),
        }
    }
909
910 pub fn exists(&self, namespace: &str, key: &str) -> CachePresence {
928 self.exists_at(namespace, key, unix_now_ms())
929 }
930
    /// Core presence check (`now_ms` injected for deterministic tests).
    /// Mirrors `get_at`'s cleanup of expired/idle/stale L1 entries but never
    /// materializes the blob; the L2 fallback only consults the synopsis.
    fn exists_at(&self, namespace: &str, key: &str, now_ms: u64) -> CachePresence {
        let cache_key = BlobCacheKey::new(namespace, key);
        let namespace_generation = self.current_generation(namespace);
        let shard_idx = self.shard_index(&cache_key);
        let mut shard = self.shards[shard_idx].write();
        match shard.contains(&cache_key, now_ms, namespace_generation) {
            Lookup::Present => {
                self.stats.hits.fetch_add(1, Ordering::Relaxed);
                CachePresence::Present
            }
            Lookup::Expired(entry) => {
                drop(shard);
                self.record_removed_entry(&cache_key, &entry);
                // Expiry removes the L2 copy too.
                if let Some(l2) = &self.l2 {
                    l2.delete_key(&cache_key);
                }
                self.stats.expirations.fetch_add(1, Ordering::Relaxed);
                self.stats.misses.fetch_add(1, Ordering::Relaxed);
                CachePresence::Absent
            }
            Lookup::IdleEvicted(entry) => {
                drop(shard);
                self.record_removed_entry(&cache_key, &entry);
                if let Some(l2) = &self.l2 {
                    l2.delete_key(&cache_key);
                }
                self.stats.expirations.fetch_add(1, Ordering::Relaxed);
                self.stats.l1_idle_evicts.fetch_add(1, Ordering::Relaxed);
                self.stats.misses.fetch_add(1, Ordering::Relaxed);
                CachePresence::Absent
            }
            Lookup::Stale(entry) => {
                // Beyond the serveable stale window: drop from L1 only.
                drop(shard);
                self.record_removed_entry(&cache_key, &entry);
                self.stats.misses.fetch_add(1, Ordering::Relaxed);
                CachePresence::Absent
            }
            Lookup::Miss => {
                drop(shard);
                let Some(l2) = self.l2.as_ref() else {
                    self.stats.misses.fetch_add(1, Ordering::Relaxed);
                    return CachePresence::Absent;
                };
                if l2.synopsis_may_contain(namespace, key) {
                    // A synopsis "maybe" counts as a hit for stats purposes.
                    self.stats.hits.fetch_add(1, Ordering::Relaxed);
                    CachePresence::MaybePresent
                } else {
                    self.stats.misses.fetch_add(1, Ordering::Relaxed);
                    CachePresence::Absent
                }
            }
            Lookup::Hit(_) => unreachable!("exists cannot return a hit payload"),
        }
    }
989
990 pub fn invalidate_key(&self, namespace: &str, key: &str) -> usize {
996 if !self.namespace_exists(namespace) {
997 return 0;
998 }
999 let cache_key = BlobCacheKey::new(namespace, key);
1000 let shard_idx = self.shard_index(&cache_key);
1001 let mut shard = self.shards[shard_idx].write();
1002 let removed = shard.remove(&cache_key);
1003 drop(shard);
1004
1005 if let Some(entry) = removed {
1006 self.record_invalidated_entry(&cache_key, &entry);
1007 1
1008 } else {
1009 self.l2
1010 .as_ref()
1011 .and_then(|l2| l2.delete_key(&cache_key))
1012 .map(|_| {
1013 self.stats.invalidations.fetch_add(1, Ordering::Relaxed);
1014 1
1015 })
1016 .unwrap_or(0)
1017 }
1018 }
1019
    /// Removes every entry in `namespace` whose key starts with `prefix`,
    /// from both tiers. Returns the larger of the L1 and L2 removal counts
    /// (keys present in both tiers are counted once).
    pub fn invalidate_prefix(&self, namespace: &str, prefix: &str) -> usize {
        if !self.namespace_exists(namespace) {
            return 0;
        }

        // Collect-then-record: stats and index updates are deferred until
        // each shard's write lock has been released.
        let mut removed = Vec::new();
        for shard in &self.shards {
            let mut shard = shard.write();
            let keys = shard
                .keys_matching(|key| key.namespace == namespace && key.key.starts_with(prefix));
            for key in keys {
                if let Some(entry) = shard.remove(&key) {
                    removed.push((key, entry));
                }
            }
        }

        let count = removed.len();
        for (key, entry) in removed {
            // Also deletes each key from L2, so `delete_prefix` below only
            // finds keys that lived solely in L2.
            self.record_invalidated_entry(&key, &entry);
        }
        let l2_count = self
            .l2
            .as_ref()
            .map_or(0, |l2| l2.delete_prefix(namespace, prefix));
        // The L1 removals already bumped the invalidation counter; only the
        // L2-only excess is added here.
        if l2_count > count {
            self.stats
                .invalidations
                .fetch_add((l2_count - count) as u64, Ordering::Relaxed);
        }
        count.max(l2_count)
    }
1053
1054 pub fn invalidate_tags(&self, namespace: &str, tags: &[&str]) -> usize {
1060 self.invalidate_indexed_many(namespace, tags, IndexedKind::Tag)
1061 }
1062
1063 pub fn invalidate_dependencies(&self, namespace: &str, dependencies: &[&str]) -> usize {
1065 self.invalidate_indexed_many(namespace, dependencies, IndexedKind::Dependency)
1066 }
1067
1068 #[deprecated(
1070 since = "0.1.0",
1071 note = "use `invalidate_tags(namespace, &[tag])` for batched callers"
1072 )]
1073 pub fn invalidate_tag(&self, namespace: &str, tag: &str) -> usize {
1074 self.invalidate_indexed_many(namespace, &[tag], IndexedKind::Tag)
1075 }
1076
1077 #[deprecated(
1079 since = "0.1.0",
1080 note = "use `invalidate_dependencies(namespace, &[dependency])` for batched callers"
1081 )]
1082 pub fn invalidate_dependency(&self, namespace: &str, dependency: &str) -> usize {
1083 self.invalidate_indexed_many(namespace, &[dependency], IndexedKind::Dependency)
1084 }
1085
1086 pub fn invalidate_namespace(&self, namespace: &str) -> bool {
1092 if !self.namespace_exists(namespace) {
1093 return false;
1094 }
1095 let mut generations = self.namespace_generations.write();
1096 let generation = generations.entry(namespace.to_string()).or_insert(0);
1097 *generation = generation.saturating_add(1);
1098 if let Some(l2) = &self.l2 {
1099 l2.delete_namespace(namespace);
1100 }
1101 self.stats.namespace_flushes.fetch_add(1, Ordering::Relaxed);
1102 true
1103 }
1104
    /// Assembles a point-in-time snapshot from the atomic counters plus
    /// gauges read from the shards, namespace set, L2 store, and promotion
    /// pool. Values are read independently (relaxed loads, per-shard read
    /// locks), so the snapshot is approximate under concurrent activity.
    pub fn stats(&self) -> BlobCacheStats {
        BlobCacheStats {
            hits: self.stats.hits.load(Ordering::Relaxed),
            misses: self.stats.misses.load(Ordering::Relaxed),
            insertions: self.stats.insertions.load(Ordering::Relaxed),
            evictions: self.stats.evictions.load(Ordering::Relaxed),
            expirations: self.stats.expirations.load(Ordering::Relaxed),
            invalidations: self.stats.invalidations.load(Ordering::Relaxed),
            namespace_flushes: self.stats.namespace_flushes.load(Ordering::Relaxed),
            version_mismatches: self.stats.version_mismatches.load(Ordering::Relaxed),
            // Each shard is locked briefly in turn; not a consistent cut.
            entries: self.shards.iter().map(|shard| shard.read().len()).sum(),
            bytes_in_use: self.bytes_in_use.load(Ordering::Relaxed),
            l1_bytes_max: self.config.l1_bytes_max,
            // All L2 and promotion gauges default to 0 when the facility is
            // not configured/enabled.
            l2_bytes_in_use: self.l2.as_ref().map_or(0, |l2| l2.stats_bytes_in_use()),
            l2_bytes_max: self.config.l2_bytes_max,
            l2_full_rejections: self.stats.l2_full_rejections.load(Ordering::Relaxed),
            l2_metadata_reads: self.l2.as_ref().map_or(0, |l2| l2.stats_metadata_reads()),
            l2_negative_skips: self.l2.as_ref().map_or(0, |l2| l2.stats_negative_skips()),
            synopsis_metadata_reads: self
                .l2
                .as_ref()
                .map_or(0, |l2| l2.stats_synopsis_metadata_reads()),
            synopsis_bytes: self.l2.as_ref().map_or(0, |l2| l2.stats_synopsis_bytes()),
            namespaces: self.namespaces.read().len(),
            max_namespaces: self.config.max_namespaces,
            promotion_queued: self
                .promotion_pool
                .get()
                .map_or(0, |p| p.metrics().queued_total),
            promotion_dropped: self
                .promotion_pool
                .get()
                .map_or(0, |p| p.metrics().dropped_total),
            promotion_completed: self
                .promotion_pool
                .get()
                .map_or(0, |p| p.metrics().completed_total),
            promotion_queue_depth: self
                .promotion_pool
                .get()
                .map_or(0, |p| p.metrics().queue_depth),
            l2_compression_original_bytes: self
                .l2
                .as_ref()
                .map_or(0, |l2| l2.stats_compression_original_bytes()),
            l2_compression_stored_bytes: self
                .l2
                .as_ref()
                .map_or(0, |l2| l2.stats_compression_stored_bytes()),
            l2_compression_skipped_total: self
                .l2
                .as_ref()
                .map_or(0, |l2| l2.stats_compression_skipped_total()),
            l2_bytes_saved_total: self
                .l2
                .as_ref()
                .map_or(0, |l2| l2.stats_bytes_saved_total()),
            l1_stale_serves_total: self.stats.l1_stale_serves.load(Ordering::Relaxed),
            l1_idle_evicts_total: self.stats.l1_idle_evicts.load(Ordering::Relaxed),
        }
    }
1166
    /// Installs an asynchronous L2→L1 promotion pool and returns it.
    ///
    /// The executor holds only a `Weak` back-reference so the pool cannot
    /// keep the cache alive. If two threads race to install, `OnceLock`
    /// picks one winner; the losing pool is shut down and the installed one
    /// is returned instead.
    pub fn enable_async_promotion(self: &Arc<Self>, opts: PoolOpts) -> Arc<AsyncPromotionPool> {
        let weak: Weak<Self> = Arc::downgrade(self);
        let executor: PromotionExecutor = Arc::new(move |req| {
            // Cache already dropped: treat the promotion as a no-op success.
            let Some(cache) = weak.upgrade() else {
                return Ok(());
            };
            cache.promote_from_l2(&req)
        });
        let pool = AsyncPromotionPool::new_with_executor(opts, executor);
        match self.promotion_pool.set(Arc::clone(&pool)) {
            Ok(()) => pool,
            Err(losing_pool) => {
                // Lost the install race: tear down the freshly built pool and
                // hand back the winner.
                losing_pool.shutdown();
                Arc::clone(
                    self.promotion_pool
                        .get()
                        .expect("OnceLock set+get inconsistency"),
                )
            }
        }
    }
1204
    /// Shuts down the installed promotion pool, if any. Safe to call when
    /// async promotion was never enabled.
    pub fn shutdown_async_promotion(&self) {
        if let Some(pool) = self.promotion_pool.get() {
            // NOTE(review): the clone suggests `shutdown` consumes an
            // `Arc<Self>` receiver — confirm against `AsyncPromotionPool`.
            Arc::clone(pool).shutdown();
        }
    }
1213
1214 #[cfg(test)]
1217 fn promotion_pool_handle(&self) -> Option<Arc<AsyncPromotionPool>> {
1218 self.promotion_pool.get().cloned()
1219 }
1220
    /// Test-only variant that installs a caller-supplied executor. Unlike the
    /// public method, a lost install race is ignored and the new (possibly
    /// uninstalled) pool is still returned for the test to drive directly.
    #[cfg(test)]
    fn enable_async_promotion_with_executor(
        self: &Arc<Self>,
        opts: PoolOpts,
        executor: PromotionExecutor,
    ) -> Arc<AsyncPromotionPool> {
        let pool = AsyncPromotionPool::new_with_executor(opts, executor);
        let _ = self.promotion_pool.set(Arc::clone(&pool));
        pool
    }
1234
    /// The (normalized) configuration this cache was built with.
    pub fn config(&self) -> &BlobCacheConfig {
        &self.config
    }
1238
1239 #[cfg(test)]
1240 fn inject_l2_fault_after_blob_write_once(&self) {
1241 self.l2
1242 .as_ref()
1243 .expect("L2 enabled")
1244 .inject_fault_after_blob_write_once();
1245 }
1246
1247 #[cfg(test)]
1248 fn inject_l2_synopsis_maybe_present(&self, namespace: &str, key: &str) {
1249 self.l2
1250 .as_ref()
1251 .expect("L2 enabled")
1252 .inject_synopsis_maybe_present(namespace, key);
1253 }
1254
1255 #[cfg(test)]
1259 fn inject_l2_v1_entry(
1260 &self,
1261 namespace: &str,
1262 key: &str,
1263 payload: &[u8],
1264 ) -> Result<(), CacheError> {
1265 let l2 = self.l2.as_ref().expect("L2 enabled");
1266 let cache_key = BlobCacheKey::new(namespace, key);
1267 l2.inject_v1_entry(&cache_key, payload)
1268 }
1269
1270 fn validate_blob_size(&self, size: usize, policy: BlobCachePolicy) -> Result<(), CacheError> {
1271 let max = policy
1272 .max_blob_bytes_value()
1273 .unwrap_or(self.config.l1_bytes_max);
1274 if size > max {
1275 Err(CacheError::BlobTooLarge { size, max })
1276 } else {
1277 Ok(())
1278 }
1279 }
1280
1281 fn validate_metadata(&self, metadata: &BTreeMap<String, String>) -> Result<(), CacheError> {
1282 let keys = metadata.len();
1283 let bytes = metadata
1284 .iter()
1285 .map(|(key, value)| key.len() + value.len())
1286 .sum::<usize>();
1287 if keys > self.config.content_metadata_keys_max
1288 || bytes > self.config.content_metadata_bytes_max
1289 {
1290 Err(CacheError::MetadataTooLarge {
1291 keys,
1292 bytes,
1293 max_keys: self.config.content_metadata_keys_max,
1294 max_bytes: self.config.content_metadata_bytes_max,
1295 })
1296 } else {
1297 Ok(())
1298 }
1299 }
1300
1301 fn rehydrate_l2_entry(
1302 &self,
1303 key: &BlobCacheKey,
1304 now_ms: u64,
1305 namespace_generation: u64,
1306 shard_idx: usize,
1307 ) -> Option<BlobCacheHit> {
1308 let l2 = self.l2.as_ref()?;
1309 let entry = l2.get(key, now_ms, namespace_generation)?;
1310 let hit = entry.hit();
1311 self.do_l1_promotion_sync(key, entry, shard_idx);
1312 Some(hit)
1313 }
1314
    /// Inserts an L2-sourced entry into its L1 shard, reconciles the byte
    /// counter against whatever the insert displaced, and trims the shard
    /// back under budget.
    ///
    /// NOTE(review): unlike `put_at`, this path neither deindexes a displaced
    /// entry's labels nor indexes the promoted entry's tags/dependencies —
    /// confirm promoted entries are intentionally excluded from label-based
    /// invalidation.
    fn do_l1_promotion_sync(&self, key: &BlobCacheKey, entry: Entry, shard_idx: usize) {
        let entry_size = entry.size;
        let mut shard = self.shards[shard_idx].write();
        let outcome = shard.insert(key.clone(), entry);
        drop(shard);
        let old_size = outcome.old_entry.as_ref().map_or(0, |entry| entry.size);
        // Unsigned size delta applied in the appropriate direction.
        if entry_size >= old_size {
            self.bytes_in_use
                .fetch_add(entry_size - old_size, Ordering::Relaxed);
        } else {
            self.bytes_in_use
                .fetch_sub(old_size - entry_size, Ordering::Relaxed);
        }
        self.evict_until_within_budget(shard_idx);
    }
1337
1338 fn promote_from_l2(&self, req: &PromotionRequest) -> Result<(), String> {
1344 let l2 = self
1345 .l2
1346 .as_ref()
1347 .ok_or_else(|| "promotion executor invoked without L2 configured".to_string())?;
1348 let cache_key = BlobCacheKey::new(req.namespace.as_str(), req.key.as_str());
1349 let now_ms = unix_now_ms();
1350 let namespace_generation = self.current_generation(req.namespace.as_str());
1351 if let Some(entry) = l2.get(&cache_key, now_ms, namespace_generation) {
1352 let shard_idx = self.shard_index(&cache_key);
1353 self.do_l1_promotion_sync(&cache_key, entry, shard_idx);
1354 }
1355 Ok(())
1356 }
1357
    /// Registers `namespace`, enforcing the configured cap.
    ///
    /// Uses an optimistic read-lock fast path for known namespaces, then
    /// re-checks under the write lock to close the read→write race before
    /// applying the cap and inserting.
    fn ensure_namespace(&self, namespace: &str) -> Result<(), CacheError> {
        {
            let namespaces = self.namespaces.read();
            if namespaces.contains(namespace) {
                return Ok(());
            }
        }
        let mut namespaces = self.namespaces.write();
        // Another writer may have registered it between the two locks.
        if namespaces.contains(namespace) {
            return Ok(());
        }
        if namespaces.len() >= self.config.max_namespaces {
            return Err(CacheError::TooManyNamespaces {
                max: self.config.max_namespaces,
            });
        }
        namespaces.insert(namespace.to_string());
        // Seed the generation counter; `or_insert` keeps any value a
        // concurrent `invalidate_namespace` may already have written.
        self.namespace_generations
            .write()
            .entry(namespace.to_string())
            .or_insert(0);
        Ok(())
    }
1381
1382 fn namespace_exists(&self, namespace: &str) -> bool {
1383 self.namespaces.read().contains(namespace)
1384 || self
1385 .l2
1386 .as_ref()
1387 .is_some_and(|l2| l2.has_namespace(namespace))
1388 }
1389
1390 fn current_generation(&self, namespace: &str) -> u64 {
1391 self.namespace_generations
1392 .read()
1393 .get(namespace)
1394 .copied()
1395 .unwrap_or(0)
1396 }
1397
1398 fn index_entry(
1399 &self,
1400 key: &BlobCacheKey,
1401 tags: &BTreeSet<String>,
1402 dependencies: &BTreeSet<String>,
1403 ) {
1404 if !tags.is_empty() {
1405 let mut index = self.tag_index.write();
1406 for tag in tags {
1407 index
1408 .entry(ScopedLabel::new(key.namespace.as_str(), tag.as_str()))
1409 .or_default()
1410 .insert(key.clone());
1411 }
1412 }
1413 if !dependencies.is_empty() {
1414 let mut index = self.dependency_index.write();
1415 for dependency in dependencies {
1416 index
1417 .entry(ScopedLabel::new(
1418 key.namespace.as_str(),
1419 dependency.as_str(),
1420 ))
1421 .or_default()
1422 .insert(key.clone());
1423 }
1424 }
1425 }
1426
1427 fn deindex_entry(&self, key: &BlobCacheKey, entry: &Entry) {
1428 Self::remove_indexed_labels(&self.tag_index, key, &entry.tags);
1429 Self::remove_indexed_labels(&self.dependency_index, key, &entry.dependencies);
1430 }
1431
1432 fn remove_indexed_labels(
1433 index: &RwLock<HashMap<ScopedLabel, HashSet<BlobCacheKey>>>,
1434 key: &BlobCacheKey,
1435 labels: &BTreeSet<String>,
1436 ) {
1437 if labels.is_empty() {
1438 return;
1439 }
1440 let mut index = index.write();
1441 for label in labels {
1442 let scoped = ScopedLabel::new(key.namespace.as_str(), label.as_str());
1443 let should_remove = if let Some(keys) = index.get_mut(&scoped) {
1444 keys.remove(key);
1445 keys.is_empty()
1446 } else {
1447 false
1448 };
1449 if should_remove {
1450 index.remove(&scoped);
1451 }
1452 }
1453 }
1454
1455 fn record_removed_entry(&self, key: &BlobCacheKey, entry: &Entry) {
1456 self.bytes_in_use.fetch_sub(entry.size, Ordering::Relaxed);
1457 self.deindex_entry(key, entry);
1458 }
1459
1460 fn record_invalidated_entry(&self, key: &BlobCacheKey, entry: &Entry) {
1461 self.record_removed_entry(key, entry);
1462 if let Some(l2) = &self.l2 {
1463 l2.delete_key(key);
1464 }
1465 self.stats.invalidations.fetch_add(1, Ordering::Relaxed);
1466 }
1467
    /// Invalidate every entry in `namespace` carrying any of the given tags or
    /// dependencies (`kind` selects which index is consulted).
    ///
    /// Returns the number of entries removed from L1. Runs in four phases to
    /// keep lock hold times short and to avoid holding an index lock while
    /// taking shard locks:
    ///   1. snapshot candidate keys from the reverse index (read lock only),
    ///   2. group candidates by shard,
    ///   3. per shard, re-check the label match under the shard's write lock
    ///      (the entry may have been replaced since the snapshot) and remove,
    ///   4. after all shard locks are released, do the byte/index/L2/stats
    ///      accounting for each removed entry.
    fn invalidate_indexed_many(
        &self,
        namespace: &str,
        labels: &[&str],
        kind: IndexedKind,
    ) -> usize {
        if labels.is_empty() || !self.namespace_exists(namespace) {
            return 0;
        }

        // Phase 1: candidate keys, each paired with the labels that matched it.
        let mut candidates: HashMap<BlobCacheKey, HashSet<String>> = HashMap::new();
        {
            let index = match kind {
                IndexedKind::Tag => self.tag_index.read(),
                IndexedKind::Dependency => self.dependency_index.read(),
            };
            for label in labels {
                let scoped = ScopedLabel::new(namespace, *label);
                if let Some(keys) = index.get(&scoped) {
                    for key in keys {
                        candidates
                            .entry(key.clone())
                            .or_default()
                            .insert((*label).to_string());
                    }
                }
            }
        }

        if candidates.is_empty() {
            return 0;
        }

        // Phase 2: bucket candidates so each shard lock is taken exactly once.
        let mut by_shard: HashMap<usize, Vec<(BlobCacheKey, HashSet<String>)>> = HashMap::new();
        for (key, matched_labels) in candidates {
            let shard_idx = self.shard_index(&key);
            by_shard
                .entry(shard_idx)
                .or_default()
                .push((key, matched_labels));
        }

        // Phase 3: under each shard lock, confirm the entry still carries one
        // of the matched labels before removing it — the index snapshot may be
        // stale relative to a concurrent overwrite.
        let mut removed = Vec::new();
        for (shard_idx, keys) in by_shard {
            let mut shard = self.shards[shard_idx].write();
            for (key, matched_labels) in keys {
                let still_matches = match kind {
                    IndexedKind::Tag => shard.entry_has_any_tag(&key, &matched_labels),
                    IndexedKind::Dependency => {
                        shard.entry_has_any_dependency(&key, &matched_labels)
                    }
                };
                if still_matches {
                    if let Some(entry) = shard.remove(&key) {
                        removed.push((key, entry));
                    }
                }
            }
        }

        // Phase 4: accounting outside the shard locks (touches index locks,
        // L2, and stats counters).
        let count = removed.len();
        for (key, entry) in removed {
            self.record_invalidated_entry(&key, &entry);
        }
        count
    }
1540
1541 fn shard_index(&self, key: &BlobCacheKey) -> usize {
1542 let mut hasher = std::collections::hash_map::DefaultHasher::new();
1543 key.hash(&mut hasher);
1544 (hasher.finish() as usize) % self.shards.len()
1545 }
1546
1547 fn check_version(
1548 &self,
1549 shard: &Shard,
1550 key: &BlobCacheKey,
1551 attempted: Option<u64>,
1552 namespace_generation: u64,
1553 ) -> Result<(), CacheError> {
1554 let Some(attempted) = attempted else {
1555 return Ok(());
1556 };
1557 let Some(existing) = shard.existing_version(key, namespace_generation) else {
1558 return Ok(());
1559 };
1560 if existing >= attempted {
1561 self.stats
1562 .version_mismatches
1563 .fetch_add(1, Ordering::Relaxed);
1564 Err(CacheError::VersionMismatch {
1565 existing,
1566 attempted,
1567 })
1568 } else {
1569 Ok(())
1570 }
1571 }
1572
1573 fn evict_until_within_budget(&self, preferred_start: usize) {
1574 while self.bytes_in_use.load(Ordering::Relaxed) > self.config.l1_bytes_max {
1575 let mut evicted = false;
1576 for offset in 0..self.shards.len() {
1577 let idx = (preferred_start + offset) % self.shards.len();
1578 let mut shard = self.shards[idx].write();
1579 if let Some((key, entry)) = shard.evict_one() {
1580 self.bytes_in_use.fetch_sub(entry.size, Ordering::Relaxed);
1581 self.stats.evictions.fetch_add(1, Ordering::Relaxed);
1582 evicted = true;
1583 drop(shard);
1584 self.deindex_entry(&key, &entry);
1585 break;
1586 }
1587 }
1588 if !evicted {
1589 break;
1590 }
1591 }
1592 }
1593}
1594
/// Milliseconds since the Unix epoch, truncated to `u64`.
///
/// Returns 0 if the system clock reads earlier than the epoch (the only way
/// `duration_since` can fail here).
fn unix_now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
1601
impl Default for BlobCache {
    /// Delegates to `with_defaults` so `BlobCache::default()` and
    /// `BlobCache::with_defaults()` are guaranteed to agree.
    fn default() -> Self {
        Self::with_defaults()
    }
}
1607
1608#[cfg(test)]
1609mod tests;