tiered_cache/lib.rs

#![deny(missing_docs)]
#![deny(unsafe_code)]

//! High-performance multi-tiered cache with automatic sizing and async support.
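//!
//! A minimal usage sketch. `CacheConfig` and `TierConfig` are defined in the
//! `config` module (not shown here); the field names below match how this file
//! reads them, but their visibility and construction are assumptions.
//!
//! ```ignore
//! let config = CacheConfig {
//!     tiers: vec![
//!         // (lower, upper) heap-size bounds in bytes for each tier
//!         TierConfig { total_capacity: 1 << 20, size_range: (0, 1024) },
//!         TierConfig { total_capacity: 8 << 20, size_range: (1024, 1 << 20) },
//!     ],
//!     update_channel_size: Some(64),
//! };
//!
//! let cache: TieredCache<String, String> = TieredCache::new(config);
//! cache.put("greeting".to_string(), "hello".to_string());
//! assert!(cache.get(&"greeting".to_string()).is_some());
//! ```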

mod config;
mod entry;
mod stats;
mod tier;

pub use config::{CacheConfig, TierConfig};
pub use stats::{CacheStats, TierStats};

use crossbeam_utils::CachePadded;
use dashmap::{mapref::entry::Entry, DashMap};
use entry::CacheEntry;
use lru_mem::HeapSize;
use parking_lot::RwLock;
use smallvec::SmallVec;
use std::{future::Future, hash::Hash, sync::Arc};
use tier::Tier;
use tokio::sync::{broadcast, oneshot};

type TierVec<K, V> = SmallVec<[Arc<CachePadded<Tier<K, V>>>; 4]>;

/// High-performance multi-tiered cache with automatic sizing
pub struct TieredCache<K, V>
where
    K: Hash + Eq + Clone + Send + Sync + HeapSize + 'static,
    V: Clone + Send + Sync + HeapSize + 'static,
{
    tiers: TierVec<K, V>,
    /// Maps each cached key to the index of the tier that holds it.
    key_to_tier: Arc<DashMap<K, usize>>,
    config: Arc<CacheConfig>,
    update_tx: Option<broadcast::Sender<K>>,
    /// Serializes `put`/`remove`/`clear` so the key map and tiers stay consistent.
    put_lock: RwLock<()>,
    /// Waiters for in-flight `get_or_update` calls, keyed by the key being refreshed.
    pending_updates: DashMap<K, Vec<oneshot::Sender<Option<Arc<V>>>>>,
}

impl<K, V> TieredCache<K, V>
where
    K: Hash + Eq + Clone + Send + Sync + HeapSize + 'static,
    V: Clone + Send + Sync + HeapSize + 'static,
{
    /// Creates a new cache
    #[inline]
    pub fn new(config: CacheConfig) -> Self {
        let tiers = config
            .tiers
            .iter()
            .map(|tier_config| {
                Arc::new(CachePadded::new(Tier::new(
                    tier_config.total_capacity,
                    tier_config.size_range,
                )))
            })
            .collect();

        // Calculate total cache size in bytes
        let total_cache_size: usize = config.tiers.iter()
            .map(|t| t.total_capacity)
            .sum();

        let tx = config
            .update_channel_size
            .map(|size| broadcast::channel(size).0);

        Self {
            tiers,
            // Pre-size the key map with a rough entry-count estimate derived
            // from the total byte capacity
            key_to_tier: Arc::new(DashMap::with_capacity(
                total_cache_size / std::mem::size_of::<(K, usize)>()
            )),
            config: Arc::new(config),
            update_tx: tx,
            put_lock: RwLock::new(()),
            pending_updates: DashMap::new(),
        }
    }

    /// Gets a cached value, or runs `updater` to produce, cache, and return it.
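    ///
    /// Returns `None` if the value is absent and `updater` yields `None`.
    /// Concurrent calls for the same key are coalesced: one caller runs
    /// `updater` while the others wait for its result. A sketch of the
    /// intended call shape (`fetch_from_db` is a hypothetical async loader):
    ///
    /// ```ignore
    /// let value = cache
    ///     .get_or_update("user:42".to_string(), || async {
    ///         // Runs only on a cache miss, and only in one caller at a time
    ///         fetch_from_db("user:42").await
    ///     })
    ///     .await;
    /// ```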
    #[inline]
    pub async fn get_or_update<F, Fut>(&self, key: K, updater: F) -> Option<Arc<V>>
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = Option<V>>,
    {
        // Fast path: check the cache first
        if let Some(value) = self.get(&key) {
            return Some(value);
        }

        // Atomically either join an in-flight update as a waiter
        // or claim ownership of the update for this key
        let waiter_rx = match self.pending_updates.entry(key.clone()) {
            Entry::Occupied(mut in_flight) => {
                let (tx, rx) = oneshot::channel();
                in_flight.get_mut().push(tx);
                Some(rx)
            }
            Entry::Vacant(vacant) => {
                vacant.insert(Vec::new());
                None
            }
        };

        // Waiters block here until the owning caller broadcasts its result
        if let Some(rx) = waiter_rx {
            return rx.await.ok().flatten();
        }

        // We own the update: run the updater, then share the result with all waiters
        let result = self.update_value(key.clone(), updater).await;

        if let Some((_, waiters)) = self.pending_updates.remove(&key) {
            for waiter in waiters {
                let _ = waiter.send(result.clone());
            }
        }

        result
    }

    #[inline]
    async fn update_value<F, Fut>(&self, key: K, updater: F) -> Option<Arc<V>>
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = Option<V>>,
    {
        if let Some(new_value) = updater().await {
            // Cache one copy, notify subscribers, and hand back another
            self.put(key.clone(), new_value.clone());
            self.notify_update(key);
            Some(Arc::new(new_value))
        } else {
            None
        }
    }

    /// Puts a value into the cache, returning any value it displaces.
    ///
    /// Returns `None` without inserting if the value's heap size does not fit
    /// into any configured tier.
    #[inline]
    pub fn put(&self, key: K, value: V) -> Option<V> {
        let size = value.heap_size();

        // Fast path: bail out early if no tier accepts this size
        let tier_idx = self.find_tier_for_size(size)?;

        // Acquire write lock for the entire operation
        let _guard = self.put_lock.write();

        // Remove from ALL other tiers so the key lives in exactly one tier
        let mut old_value = None;
        for (i, tier) in self.tiers.iter().enumerate() {
            if i != tier_idx {
                if let Some(removed) = tier.remove(&key) {
                    old_value = Some(removed);
                }
            }
        }

        let entry = CacheEntry::new(value, size);
        let tier = &self.tiers[tier_idx];

        // Update the mapping, insert the new value, and report whichever
        // previous value was displaced (from this tier or another one)
        self.key_to_tier.insert(key.clone(), tier_idx);
        let displaced = tier.put(key, entry);
        old_value.or(displaced)
        // Lock is automatically released when _guard goes out of scope
    }

    /// Gets a value from the cache
    #[inline]
    pub fn get(&self, key: &K) -> Option<Arc<V>> {
        // Fast path: direct tier lookup
        let tier_idx = *self.key_to_tier.get(key)?;
        self.tiers[tier_idx].get(key)
    }

    /// Subscribes to key update notifications.
    ///
    /// Returns `None` if the cache was configured without an update channel.
    /// Notifications are sent when `get_or_update` refreshes a value.
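    ///
    /// A usage sketch:
    ///
    /// ```ignore
    /// if let Some(mut updates) = cache.subscribe_updates() {
    ///     tokio::spawn(async move {
    ///         while let Ok(key) = updates.recv().await {
    ///             println!("cache entry refreshed: {key}");
    ///         }
    ///     });
    /// }
    /// ```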
    #[inline]
    pub fn subscribe_updates(&self) -> Option<broadcast::Receiver<K>> {
        self.update_tx.as_ref().map(|tx| tx.subscribe())
    }

    #[inline]
    fn notify_update(&self, key: K) {
        // Ignore send errors: they only mean there are no active subscribers
        if let Some(tx) = &self.update_tx {
            let _ = tx.send(key);
        }
    }

    #[inline]
    fn find_tier_for_size(&self, size: usize) -> Option<usize> {
        // Optimize for common case of small items
        if !self.tiers.is_empty() && size < self.config.tiers[0].size_range.1 {
            return Some(0);
        }

        // Tiers are sorted by size range: binary-search for the first tier whose
        // upper bound exceeds `size`, then confirm `size` reaches its lower bound
        let idx = self
            .config
            .tiers
            .partition_point(|tier| tier.size_range.1 <= size);
        let tier = self.config.tiers.get(idx)?;
        (size >= tier.size_range.0).then_some(idx)
    }

    /// Gets cache statistics
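    ///
    /// A usage sketch (assuming the `CacheStats` and `TierStats` fields built
    /// in this file are public):
    ///
    /// ```ignore
    /// let stats = cache.stats();
    /// println!("{} items, {} bytes total", stats.total_items, stats.total_size);
    /// for tier in &stats.tier_stats {
    ///     println!("  tier: {} items, {} bytes", tier.items, tier.size);
    /// }
    /// ```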
    pub fn stats(&self) -> CacheStats {
        let mut tier_stats = Vec::with_capacity(self.tiers.len());
        let mut total_items = 0;
        let mut total_size = 0;

        for (tier, config) in self.tiers.iter().zip(self.config.tiers.iter()) {
            let stats = tier.stats(config);
            total_items += stats.items;
            total_size += stats.size;
            tier_stats.push(stats);
        }

        CacheStats {
            tier_stats,
            total_items,
            total_size,
        }
    }

    /// Removes a value from the cache
    #[inline]
    pub fn remove(&self, key: &K) -> Option<V> {
        // Acquire write lock for the entire operation
        let _guard = self.put_lock.write();

        let (_, tier_idx) = self.key_to_tier.remove(key)?;
        self.tiers[tier_idx].remove(key)
        // Lock is automatically released when _guard goes out of scope
    }

    /// Clears the cache
    pub fn clear(&self) {
        // Serialize with `put`/`remove` so no writer can interleave between
        // clearing the tiers and clearing the key map
        let _guard = self.put_lock.write();

        for tier in &self.tiers {
            tier.clear();
        }
        self.key_to_tier.clear();
    }
}