Skip to main content

saorsa_rsps/
cache.rs

1// Copyright 2024 Saorsa Labs
2// SPDX-License-Identifier: AGPL-3.0-or-later
3
4//! Root-anchored cache admission and eviction policies
5
6use crate::{Cid, Result, RootCid, Rsps, RspsError};
7use dashmap::DashMap;
8use lru::LruCache;
9use parking_lot::RwLock;
10use std::sync::Arc;
11use std::time::SystemTime;
12
/// Cache policy for root-anchored admission.
///
/// The policy is plain configuration data; it performs no validation itself.
#[derive(Debug, Clone)]
pub struct CachePolicy {
    /// Maximum cache size in bytes
    pub max_size: usize,
    /// Maximum number of items per root
    pub max_items_per_root: usize,
    /// Minimum root depth for admission
    ///
    /// NOTE(review): nothing visible in this file reads this field; confirm
    /// where (or whether) the depth requirement is enforced.
    pub min_root_depth: usize,
    /// Reciprocal cache pledge ratio: the multiplier applied to a stored size
    /// to obtain the number of bytes a node pledges to cache.
    pub pledge_ratio: f64,
}

impl Default for CachePolicy {
    /// Returns the default policy: 10 GiB of cache, 10 000 items per root,
    /// a minimum root depth of 2 and a pledge ratio of 1.5.
    ///
    /// On targets where 10 GiB does not fit in `usize` (32-bit), `max_size`
    /// saturates at `usize::MAX` instead of overflowing.
    fn default() -> Self {
        // One GiB fits in `usize` on 32- and 64-bit targets; the multiplication
        // by ten is the step that can overflow, so it is done with saturation.
        const GIB: usize = 1 << 30;

        Self {
            max_size: GIB.saturating_mul(10), // 10GB
            max_items_per_root: 10000,
            min_root_depth: 2,
            pledge_ratio: 1.5, // Cache 1.5x what you store
        }
    }
}
36
/// Entry in the cache: one payload plus the bookkeeping stored alongside it.
///
/// All fields are public, so the notes below describe the values written by
/// `RootAnchoredCache`; entries built elsewhere may differ.
#[derive(Debug, Clone)]
pub struct CacheEntry {
    /// The content identifier
    pub cid: Cid,
    /// The root this entry belongs to
    pub root_cid: RootCid,
    /// Size in bytes (the length of `data` at admission time)
    pub size: usize,
    /// Creation time (wall clock, taken when the entry is admitted)
    pub created_at: SystemTime,
    /// Last access time (wall clock, refreshed on each successful lookup)
    pub last_accessed: SystemTime,
    /// Access count (starts at 1 on admission, incremented per lookup)
    pub access_count: u64,
    /// Data payload
    pub data: Vec<u8>,
}
55
56/// Root-anchored cache with admission control
57#[derive(Debug)]
58pub struct RootAnchoredCache {
59    /// Cache policy
60    policy: CachePolicy,
61    /// Active RSPS summaries by root
62    rsps_by_root: Arc<DashMap<RootCid, Rsps>>,
63    /// Cache entries by CID
64    entries: Arc<DashMap<Cid, CacheEntry>>,
65    /// LRU tracking for eviction
66    lru: Arc<RwLock<LruCache<Cid, ()>>>,
67    /// Current cache size
68    current_size: Arc<RwLock<usize>>,
69    /// Items per root counter
70    items_per_root: Arc<DashMap<RootCid, usize>>,
71}
72
73impl RootAnchoredCache {
74    /// Create a new root-anchored cache
75    pub fn new(policy: CachePolicy) -> Self {
76        let max_items = policy.max_size / 1024; // Estimate max items
77        Self {
78            policy,
79            rsps_by_root: Arc::new(DashMap::new()),
80            entries: Arc::new(DashMap::new()),
81            lru: Arc::new(RwLock::new(LruCache::new(
82                std::num::NonZeroUsize::new(max_items.max(1)).unwrap_or_else(|| {
83                    std::num::NonZeroUsize::new(1000000).expect("1000000 is non-zero")
84                }),
85            ))),
86            current_size: Arc::new(RwLock::new(0)),
87            items_per_root: Arc::new(DashMap::new()),
88        }
89    }
90
91    /// Register an RSPS for a root
92    pub fn register_rsps(&self, rsps: Rsps) {
93        self.rsps_by_root.insert(rsps.root_cid, rsps);
94    }
95
96    /// Attempt to admit an item to the cache
97    pub fn admit(&self, root_cid: RootCid, cid: Cid, data: Vec<u8>) -> Result<bool> {
98        // Check if root has RSPS
99        let rsps = self
100            .rsps_by_root
101            .get(&root_cid)
102            .ok_or_else(|| RspsError::CacheAdmissionDenied("No RSPS for root".into()))?;
103
104        // Check if CID is in RSPS
105        if !rsps.contains(&cid) {
106            return Ok(false); // Not in RSPS, don't cache
107        }
108
109        let size = data.len();
110
111        // Check cache size limit
112        if *self.current_size.read() + size > self.policy.max_size {
113            self.evict_to_make_space(size);
114        }
115
116        // Check items per root limit
117        let mut root_count = self.items_per_root.entry(root_cid).or_insert(0);
118        if *root_count >= self.policy.max_items_per_root {
119            return Ok(false); // Root quota exceeded
120        }
121
122        // Create cache entry
123        let entry = CacheEntry {
124            cid,
125            root_cid,
126            size,
127            created_at: SystemTime::now(),
128            last_accessed: SystemTime::now(),
129            access_count: 1,
130            data,
131        };
132
133        // Insert into cache
134        self.entries.insert(cid, entry);
135        self.lru.write().put(cid, ());
136        *root_count += 1;
137        *self.current_size.write() += size;
138
139        Ok(true)
140    }
141
142    /// Get an item from the cache
143    pub fn get(&self, cid: &Cid) -> Option<Vec<u8>> {
144        if let Some(mut entry) = self.entries.get_mut(cid) {
145            entry.last_accessed = SystemTime::now();
146            entry.access_count += 1;
147            self.lru.write().get(cid); // Update LRU position
148            Some(entry.data.clone())
149        } else {
150            None
151        }
152    }
153
154    /// Remove an item from the cache
155    pub fn remove(&self, cid: &Cid) -> Option<CacheEntry> {
156        if let Some((_, entry)) = self.entries.remove(cid) {
157            self.lru.write().pop(cid);
158            *self.current_size.write() -= entry.size;
159            if let Some(mut count) = self.items_per_root.get_mut(&entry.root_cid) {
160                *count = count.saturating_sub(1);
161            }
162            Some(entry)
163        } else {
164            None
165        }
166    }
167
168    /// Evict items to make space
169    fn evict_to_make_space(&self, needed_size: usize) {
170        let mut freed = 0;
171        let mut lru = self.lru.write();
172
173        while freed < needed_size && !lru.is_empty() {
174            if let Some((cid, _)) = lru.pop_lru()
175                && let Some((_, entry)) = self.entries.remove(&cid)
176            {
177                freed += entry.size;
178                *self.current_size.write() -= entry.size;
179                if let Some(mut count) = self.items_per_root.get_mut(&entry.root_cid) {
180                    *count = count.saturating_sub(1);
181                }
182            }
183        }
184    }
185
186    /// Get cache statistics
187    pub fn stats(&self) -> CacheStats {
188        CacheStats {
189            total_items: self.entries.len(),
190            total_size: *self.current_size.read(),
191            roots_count: self.rsps_by_root.len(),
192            max_size: self.policy.max_size,
193        }
194    }
195
196    /// Apply reciprocal cache pledge
197    pub fn apply_pledge(&self, stored_size: usize) -> usize {
198        (stored_size as f64 * self.policy.pledge_ratio) as usize
199    }
200}
201
/// Cache statistics: a point-in-time summary of a cache's occupancy.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct CacheStats {
    /// Number of entries currently cached
    pub total_items: usize,
    /// Bytes currently used by cached payloads
    pub total_size: usize,
    /// Number of roots with a registered RSPS
    pub roots_count: usize,
    /// Configured maximum cache size in bytes
    pub max_size: usize,
}
210
#[cfg(test)]
mod tests {
    use super::*;
    use crate::RspsConfig;

    /// Builds a cache governed by `policy` and registers an RSPS for `root`
    /// that covers `cids`.
    fn cache_with_rsps(policy: CachePolicy, root: RootCid, cids: &[Cid]) -> RootAnchoredCache {
        let cache = RootAnchoredCache::new(policy);
        let summary = Rsps::new(root, 1, cids, &RspsConfig::default()).unwrap();
        cache.register_rsps(summary);
        cache
    }

    #[test]
    fn test_cache_admission() {
        let root = [1u8; 32];
        let first = [2u8; 32];
        let second = [3u8; 32];

        let cache = cache_with_rsps(
            CachePolicy {
                max_size: 1024 * 1024, // 1MB
                max_items_per_root: 10,
                min_root_depth: 1,
                pledge_ratio: 1.0,
            },
            root,
            &[first, second],
        );

        // Both CIDs are covered by the RSPS, so both must be admitted.
        assert!(cache.admit(root, first, vec![1, 2, 3]).unwrap());
        assert!(cache.admit(root, second, vec![4, 5, 6]).unwrap());

        // A CID outside the RSPS: the simplified GCS may report a false
        // positive, so only the absence of an error is checked here. A very
        // different CID is used to keep false positives unlikely.
        let outsider = [255u8; 32];
        let _admitted = cache.admit(root, outsider, vec![7, 8, 9]).unwrap();
    }

    #[test]
    fn test_cache_retrieval() {
        let root = [1u8; 32];
        let cid = [2u8; 32];
        let payload = vec![1, 2, 3, 4, 5];

        let cache = cache_with_rsps(CachePolicy::default(), root, &[cid]);

        // What goes in must come back out unchanged.
        assert!(cache.admit(root, cid, payload.clone()).unwrap());
        assert_eq!(cache.get(&cid), Some(payload));
    }

    #[test]
    fn test_cache_eviction() {
        let root = [1u8; 32];
        let older = [2u8; 32];
        let newer = [3u8; 32];

        let cache = cache_with_rsps(
            CachePolicy {
                max_size: 10, // Very small cache
                max_items_per_root: 100,
                min_root_depth: 1,
                pledge_ratio: 1.0,
            },
            root,
            &[older, newer],
        );

        // 8 of the 10 available bytes are taken by the first item...
        assert!(cache.admit(root, older, vec![0; 8]).unwrap());

        // ...so admitting another 8 bytes has to push the first one out.
        assert!(cache.admit(root, newer, vec![0; 8]).unwrap());

        assert_eq!(cache.get(&older), None);
        assert_eq!(cache.get(&newer), Some(vec![0; 8]));
    }

    #[test]
    fn test_reciprocal_pledge() {
        let cache = RootAnchoredCache::new(CachePolicy {
            pledge_ratio: 1.5,
            ..Default::default()
        });

        assert_eq!(cache.apply_pledge(1000), 1500);
        assert_eq!(cache.apply_pledge(2000), 3000);
    }
}