1#![deny(missing_docs)]
2#![deny(unsafe_code)]
3
4mod config;
7mod entry;
8mod stats;
9mod tier;
10
11pub use config::{CacheConfig, TierConfig};
12pub use stats::{CacheStats, TierStats};
13
14use crossbeam_utils::CachePadded;
15use lru_mem::HeapSize;
16use dashmap::DashMap;
17use entry::CacheEntry;
18use parking_lot::RwLock;
19use smallvec::SmallVec;
20use std::{hash::Hash, sync::Arc};
21use std::future::Future;
22use tokio::sync::oneshot;
23use tokio::sync::broadcast;
24use tier::Tier;
25
26
/// Tier list with inline capacity: up to 4 tiers live on the stack, avoiding
/// a heap allocation for the common case. Each tier is cache-line padded to
/// prevent false sharing between tiers accessed by different threads.
type TierVec<K, V> = SmallVec<[Arc<CachePadded<Tier<K, V>>>; 4]>;
28
/// A concurrent cache partitioned into tiers by value heap size.
///
/// Each value is routed to a tier according to `HeapSize::heap_size`, a
/// global `DashMap` index records which tier holds each key, and concurrent
/// updates for the same key can be coalesced via `get_or_update`.
pub struct TieredCache<K, V>
where
    K: Hash + Eq + Clone + Send + Sync + HeapSize + 'static,
    V: Clone + Send + Sync + HeapSize + 'static,
{
    /// The storage tiers, in the same order as `config.tiers`.
    tiers: TierVec<K, V>,
    /// Maps each cached key to the index (into `tiers`) of the tier holding it.
    key_to_tier: Arc<DashMap<K, usize>>,
    /// The configuration this cache was built from.
    config: Arc<CacheConfig>,
    /// Broadcasts keys on update when `config.update_channel_size` was set;
    /// `None` disables update notifications.
    update_tx: Option<broadcast::Sender<K>>,
    /// Serializes mutating operations (`put`/`remove`) so cross-tier moves
    /// and the `key_to_tier` index stay consistent.
    put_lock: parking_lot::RwLock<()>,
    /// One pending in-flight update per key; used by `get_or_update` to
    /// hand a computed value to waiting callers.
    pending_updates: DashMap<K, oneshot::Sender<Option<Arc<V>>>>,
}
42
43impl<K, V> TieredCache<K, V>
44where
45 K: Hash + Eq + Clone + Send + Sync + HeapSize + 'static,
46 V: Clone + Send + Sync + HeapSize + 'static,
47{
48 #[inline]
50 pub fn new(config: CacheConfig) -> Self {
51 let tiers = config
52 .tiers
53 .iter()
54 .map(|tier_config| {
55 Arc::new(CachePadded::new(Tier::new(
56 tier_config.total_capacity,
57 tier_config.size_range,
58 )))
59 })
60 .collect();
61
62 let total_cache_size: usize = config.tiers.iter()
64 .map(|t| t.total_capacity)
65 .sum();
66
67 let tx = config
68 .update_channel_size
69 .map(|size| broadcast::channel(size).0);
70
71 Self {
72 tiers,
73 key_to_tier: Arc::new(DashMap::with_capacity(
74 total_cache_size / std::mem::size_of::<(K, usize)>()
75 )),
76 config: Arc::new(config),
77 update_tx: tx,
78 put_lock: parking_lot::RwLock::new(()),
79 pending_updates: DashMap::new(),
80 }
81 }
82
83 #[inline]
85 pub async fn get_or_update<F, Fut>(&self, key: K, updater: F) -> Option<Arc<V>>
86 where
87 F: FnOnce() -> Fut,
88 Fut: Future<Output = Option<V>>,
89 {
90 if let Some(value) = self.get(&key) {
92 return Some(value);
93 }
94
95 if self.pending_updates.contains_key(&key) {
97 let (tx, rx) = oneshot::channel();
98
99 self.pending_updates.insert(key.clone(), tx);
101
102 return rx.await.ok().flatten();
104 }
105
106 let (tx, _rx) = oneshot::channel();
108 self.pending_updates.insert(key.clone(), tx);
109
110 let result = self.update_value(key.clone(), updater).await;
112
113 if let Some(entry) = self.pending_updates.remove(&key) {
115 let _ = entry.1.send(result.clone());
116 }
117
118 result
119 }
120
121 #[inline]
122 async fn update_value<F, Fut>(&self, key: K, updater: F) -> Option<Arc<V>>
123 where
124 F: FnOnce() -> Fut,
125 Fut: Future<Output = Option<V>>,
126 {
127 if let Some(new_value) = updater().await {
128 self.put(key.clone(), new_value.clone());
129 self.notify_update(key);
130 Some(Arc::new(new_value))
131 } else {
132 None
133 }
134 }
135
136 #[inline]
138 pub fn put(&self, key: K, value: V) -> Option<V> {
139 let size = value.heap_size();
140
141 let tier_idx = self.find_tier_for_size(size)?;
143
144 let _guard = self.put_lock.write();
146
147 let mut old_value = None;
149 for (i, tier) in self.tiers.iter().enumerate() {
150 if i != tier_idx {
151 if let Some(removed) = tier.remove(&key) {
152 old_value = Some(removed);
153 }
154 }
155 }
156
157 let entry = CacheEntry::new(value, size);
158 let tier = &self.tiers[tier_idx];
159
160 self.key_to_tier.insert(key.clone(), tier_idx);
162 old_value.or_else(|| tier.put(key, entry))
163
164 }
166
167 #[inline]
169 pub fn get(&self, key: &K) -> Option<Arc<V>> {
170 let tier_idx = self.key_to_tier.get(key)?;
172 let tier = &self.tiers[*tier_idx];
173 tier.get(key)
174 }
175
176 #[inline]
178 pub fn subscribe_updates(&self) -> Option<broadcast::Receiver<K>> {
179 self.update_tx.as_ref().map(|tx| tx.subscribe())
180 }
181
182 #[inline]
183 fn notify_update(&self, key: K) {
184 self.update_tx.as_ref().map(|tx| tx.send(key));
185 }
186
187 #[inline]
188 fn find_tier_for_size(&self, size: usize) -> Option<usize> {
189 if !self.tiers.is_empty() && size < self.config.tiers[0].size_range.1 {
191 return Some(0);
192 }
193
194 self.config
196 .tiers
197 .binary_search_by_key(&size, |tier| tier.size_range.0)
198 .ok()
199 }
200
201 pub fn stats(&self) -> CacheStats {
203 let mut tier_stats = Vec::with_capacity(self.tiers.len());
204 let mut total_items = 0;
205 let mut total_size = 0;
206
207 for (tier, config) in self.tiers.iter().zip(self.config.tiers.iter()) {
208 let stats = tier.stats(config);
209 total_items += stats.items;
210 total_size += stats.size;
211 tier_stats.push(stats);
212 }
213
214 CacheStats {
215 tier_stats,
216 total_items,
217 total_size,
218 }
219 }
220
221 #[inline]
223 pub fn remove(&self, key: &K) -> Option<V> {
224 let _guard = self.put_lock.write();
226
227 let tier_idx = self.key_to_tier.remove(key)?;
228 let tier = &self.tiers[tier_idx.1];
229 tier.remove(key)
230 }
232
233 pub fn clear(&self) {
235 for tier in &self.tiers {
236 tier.clear();
237 }
238 self.key_to_tier.clear();
239 }
240}
241