ipfs_sqlite_block_store/cache/mod.rs

1use fnv::{FnvHashMap, FnvHashSet};
2use libipld::Cid;
3use std::{
4    fmt::Debug,
5    ops::{Deref, DerefMut},
6    sync::Arc,
7    time::{Duration, Instant},
8};
9mod async_tracker;
10mod sqlite_tracker;
11pub use async_tracker::{AsyncCacheTracker, Spawner};
12use parking_lot::Mutex;
13pub use sqlite_tracker::SqliteCacheTracker;
14
15#[cfg(test)]
16mod tests;
17
/// Information about a block that is quick to gather
///
/// This is what is available for making decisions about whether to cache a block
#[derive(Debug, Clone, Copy)]
pub struct BlockInfo {
    /// id of the block in the block store
    id: i64,
    /// cid of the block
    cid: Cid,
    /// size of the block data (as reported by the store; presumably bytes)
    len: usize,
}
30
31impl BlockInfo {
32    pub fn new(id: i64, cid: &Cid, len: usize) -> Self {
33        Self { id, cid: *cid, len }
34    }
35    pub fn id(&self) -> i64 {
36        self.id
37    }
38    pub fn cid(&self) -> &Cid {
39        &self.cid
40    }
41    pub fn block_len(&self) -> usize {
42        self.len
43    }
44}
45
/// Information about a write operation that is cheap to gather
#[derive(Debug, Clone, Copy)]
pub struct WriteInfo {
    /// info about the block that was written
    block: BlockInfo,
    /// whether the block already existed in the store before this write
    block_exists: bool,
}
52
53impl WriteInfo {
54    pub fn new(block: BlockInfo, block_exists: bool) -> Self {
55        Self {
56            block,
57            block_exists,
58        }
59    }
60    /// true if we had the block already.
61    pub fn block_exists(&self) -> bool {
62        self.block_exists
63    }
64}
65
66impl Deref for WriteInfo {
67    type Target = BlockInfo;
68
69    fn deref(&self) -> &Self::Target {
70        &self.block
71    }
72}
73
/// tracks block reads and writes to provide info about which blocks to evict from the LRU cache
// the default method bodies intentionally ignore their arguments
#[allow(unused_variables)]
pub trait CacheTracker: Debug + Send + Sync {
    /// called whenever blocks were accessed
    ///
    /// note that this method will be called very frequently, on every block access.
    /// it is fire and forget, so it is perfectly ok to offload the writing to another thread.
    fn blocks_accessed(&self, blocks: Vec<BlockInfo>) {}

    /// called whenever blocks were written
    ///
    /// note that this method will be called frequently, on every block write.
    /// it is fire and forget, so it is perfectly ok to offload the writing to another thread.
    fn blocks_written(&self, blocks: Vec<WriteInfo>) {}

    /// called whenever blocks have been deleted by gc.
    fn blocks_deleted(&self, blocks: Vec<BlockInfo>) {}

    /// sort ids by importance. More important ids should go to the end.
    ///
    /// this will be called from inside gc
    fn sort_ids(&self, ids: &mut [i64]) {}

    /// indicate whether `retain_ids` should be called on startup
    ///
    /// this is the only method without a default impl, so every tracker must decide
    fn has_persistent_state(&self) -> bool;

    /// notification that only these ids should be retained
    ///
    /// this will be called once during startup
    fn retain_ids(&self, ids: &[i64]) {}
}
105
106impl CacheTracker for Arc<dyn CacheTracker> {
107    fn blocks_accessed(&self, blocks: Vec<BlockInfo>) {
108        self.as_ref().blocks_accessed(blocks)
109    }
110
111    fn blocks_written(&self, blocks: Vec<WriteInfo>) {
112        self.as_ref().blocks_written(blocks)
113    }
114
115    fn sort_ids(&self, ids: &mut [i64]) {
116        self.as_ref().sort_ids(ids)
117    }
118
119    fn blocks_deleted(&self, blocks: Vec<BlockInfo>) {
120        self.as_ref().blocks_deleted(blocks)
121    }
122
123    fn has_persistent_state(&self) -> bool {
124        self.as_ref().has_persistent_state()
125    }
126
127    fn retain_ids(&self, ids: &[i64]) {
128        self.as_ref().retain_ids(ids)
129    }
130}
131
/// a cache tracker that does nothing whatsoever, but is extremely fast
#[derive(Debug)]
pub struct NoopCacheTracker;

impl CacheTracker for NoopCacheTracker {
    // all tracking callbacks keep their no-op defaults
    fn has_persistent_state(&self) -> bool {
        // nothing is ever stored, so there is no state to reconcile at startup
        false
    }
}
141
/// a cache tracker that just sorts by id, which is the time of first addition of a block
#[derive(Debug)]
pub struct SortByIdCacheTracker;

impl CacheTracker for SortByIdCacheTracker {
    fn sort_ids(&self, ids: &mut [i64]) {
        // a bit faster than stable sort, and obviously for ids it does not matter
        ids.sort_unstable();
    }
    fn has_persistent_state(&self) -> bool {
        // ordering is derived purely from the ids themselves, nothing persisted
        false
    }
}
155
/// keep track of block accesses in memory
pub struct InMemCacheTracker<T, F> {
    /// cache entry per block id; a missing entry means the block is least important
    cache: Arc<Mutex<FnvHashMap<i64, T>>>,
    /// user callback that turns (time since creation, block info) into an optional entry
    mk_cache_entry: F,
    /// creation instant; accesses are stamped with the duration since this point
    created: Instant,
}
162
impl<T, F> InMemCacheTracker<T, F>
where
    T: Ord + Clone + Debug,
    F: Fn(Duration, BlockInfo) -> Option<T>,
{
    /// mk_cache_entry will be called on each block access to create or update a cache entry.
    /// It allows to customize whether we are interested in an entry at all, and what
    /// entries we want to be preserved.
    ///
    /// E.g. to just sort entries by their access time, use `|access, _| Some(access)`.
    /// this will keep entries in the cache based on last access time.
    ///
    /// It is also possible to use more sophisticated strategies like only caching certain cid types
    /// or caching based on the data size.
    pub fn new(mk_cache_entry: F) -> Self {
        Self {
            cache: Arc::new(Mutex::new(FnvHashMap::default())),
            mk_cache_entry,
            // all access timestamps are measured relative to this instant
            created: Instant::now(),
        }
    }
}
185
/// Sort key for ranking block importance: cache entry first, id as tie breaker.
///
/// `time` is `None` for ids without a cache entry; since `None < Some(_)` in the
/// derived `Ord`, untracked ids sort first, i.e. they are the least important
/// (see `sort_key_sort_order` test below).
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct SortKey<T: Ord> {
    time: Option<T>,
    id: i64,
}

impl<T: Ord> SortKey<T> {
    pub fn new(time: Option<T>, id: i64) -> Self {
        Self { time, id }
    }
}
197
198fn get_key<T: Ord + Clone>(
199    cache: &mut impl DerefMut<Target = FnvHashMap<i64, T>>,
200    id: i64,
201) -> SortKey<T> {
202    SortKey::new(cache.get(&id).cloned(), id)
203}
204
205impl<T, F> CacheTracker for InMemCacheTracker<T, F>
206where
207    T: Ord + Clone + Debug + Send + Sync,
208    F: Fn(Duration, BlockInfo) -> Option<T> + Send + Sync,
209{
210    /// called whenever blocks were accessed
211    fn blocks_accessed(&self, blocks: Vec<BlockInfo>) {
212        let now = Instant::now().checked_duration_since(self.created).unwrap();
213        let mut cache = self.cache.lock();
214        for block in blocks {
215            if let Some(value) = (self.mk_cache_entry)(now, block) {
216                cache.insert(block.id, value);
217            } else {
218                cache.remove(&block.id);
219            }
220        }
221    }
222
223    /// notification that these ids no longer have to be tracked
224    fn blocks_deleted(&self, blocks: Vec<BlockInfo>) {
225        let mut cache = self.cache.lock();
226        for block in blocks {
227            cache.remove(&block.id);
228        }
229    }
230
231    /// notification that only these ids should be retained
232    fn retain_ids(&self, ids: &[i64]) {
233        let ids = ids.iter().cloned().collect::<FnvHashSet<_>>();
234        let mut cache = self.cache.lock();
235        cache.retain(|id, _| ids.contains(id));
236    }
237
238    /// sort ids by importance. More important ids should go to the end.
239    fn sort_ids(&self, ids: &mut [i64]) {
240        let mut cache = self.cache.lock();
241        ids.sort_unstable_by_key(move |id| get_key(&mut cache, *id));
242    }
243
244    fn has_persistent_state(&self) -> bool {
245        false
246    }
247}
248
249impl<T: Debug, F> std::fmt::Debug for InMemCacheTracker<T, F> {
250    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
251        f.debug_struct("InMemLruCacheTracker")
252            .field("cache", &self.cache.lock())
253            .finish()
254    }
255}
256
#[cfg(test)]
#[test]
fn sort_key_sort_order() {
    // an id without a cache entry (None) must sort before any id with an
    // entry, regardless of the ids themselves — i.e. untracked blocks are
    // always the least important
    assert!(
        SortKey::new(None, i64::max_value())
            < SortKey::new(Some(Duration::default()), i64::min_value())
    );
}