ipfs_sqlite_block_store/cache/
mod.rs1use fnv::{FnvHashMap, FnvHashSet};
2use libipld::Cid;
3use std::{
4 fmt::Debug,
5 ops::{Deref, DerefMut},
6 sync::Arc,
7 time::{Duration, Instant},
8};
9mod async_tracker;
10mod sqlite_tracker;
11pub use async_tracker::{AsyncCacheTracker, Spawner};
12use parking_lot::Mutex;
13pub use sqlite_tracker::SqliteCacheTracker;
14
15#[cfg(test)]
16mod tests;
17
18#[derive(Debug, Clone, Copy)]
22pub struct BlockInfo {
23 id: i64,
25 cid: Cid,
27 len: usize,
29}
30
31impl BlockInfo {
32 pub fn new(id: i64, cid: &Cid, len: usize) -> Self {
33 Self { id, cid: *cid, len }
34 }
35 pub fn id(&self) -> i64 {
36 self.id
37 }
38 pub fn cid(&self) -> &Cid {
39 &self.cid
40 }
41 pub fn block_len(&self) -> usize {
42 self.len
43 }
44}
45
46#[derive(Debug, Clone, Copy)]
48pub struct WriteInfo {
49 block: BlockInfo,
50 block_exists: bool,
51}
52
53impl WriteInfo {
54 pub fn new(block: BlockInfo, block_exists: bool) -> Self {
55 Self {
56 block,
57 block_exists,
58 }
59 }
60 pub fn block_exists(&self) -> bool {
62 self.block_exists
63 }
64}
65
66impl Deref for WriteInfo {
67 type Target = BlockInfo;
68
69 fn deref(&self) -> &Self::Target {
70 &self.block
71 }
72}
73
74#[allow(unused_variables)]
76pub trait CacheTracker: Debug + Send + Sync {
77 fn blocks_accessed(&self, blocks: Vec<BlockInfo>) {}
82
83 fn blocks_written(&self, blocks: Vec<WriteInfo>) {}
88
89 fn blocks_deleted(&self, blocks: Vec<BlockInfo>) {}
91
92 fn sort_ids(&self, ids: &mut [i64]) {}
96
97 fn has_persistent_state(&self) -> bool;
99
100 fn retain_ids(&self, ids: &[i64]) {}
104}
105
106impl CacheTracker for Arc<dyn CacheTracker> {
107 fn blocks_accessed(&self, blocks: Vec<BlockInfo>) {
108 self.as_ref().blocks_accessed(blocks)
109 }
110
111 fn blocks_written(&self, blocks: Vec<WriteInfo>) {
112 self.as_ref().blocks_written(blocks)
113 }
114
115 fn sort_ids(&self, ids: &mut [i64]) {
116 self.as_ref().sort_ids(ids)
117 }
118
119 fn blocks_deleted(&self, blocks: Vec<BlockInfo>) {
120 self.as_ref().blocks_deleted(blocks)
121 }
122
123 fn has_persistent_state(&self) -> bool {
124 self.as_ref().has_persistent_state()
125 }
126
127 fn retain_ids(&self, ids: &[i64]) {
128 self.as_ref().retain_ids(ids)
129 }
130}
131
132#[derive(Debug)]
134pub struct NoopCacheTracker;
135
136impl CacheTracker for NoopCacheTracker {
137 fn has_persistent_state(&self) -> bool {
138 false
139 }
140}
141
142#[derive(Debug)]
144pub struct SortByIdCacheTracker;
145
146impl CacheTracker for SortByIdCacheTracker {
147 fn sort_ids(&self, ids: &mut [i64]) {
148 ids.sort_unstable();
150 }
151 fn has_persistent_state(&self) -> bool {
152 false
153 }
154}
155
156pub struct InMemCacheTracker<T, F> {
158 cache: Arc<Mutex<FnvHashMap<i64, T>>>,
159 mk_cache_entry: F,
160 created: Instant,
161}
162
163impl<T, F> InMemCacheTracker<T, F>
164where
165 T: Ord + Clone + Debug,
166 F: Fn(Duration, BlockInfo) -> Option<T>,
167{
168 pub fn new(mk_cache_entry: F) -> Self {
178 Self {
179 cache: Arc::new(Mutex::new(FnvHashMap::default())),
180 mk_cache_entry,
181 created: Instant::now(),
182 }
183 }
184}
185
/// Key used to order block ids.
///
/// The derived ordering compares `time` first and `id` second. Because `None`
/// orders before every `Some(_)`, an id without a cached value sorts ahead of
/// any id that has one, whatever the ids are.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct SortKey<T: Ord> {
    /// Cached value for the id, if there is one. Field order matters: this
    /// field must stay first so that it dominates the derived comparison.
    time: Option<T>,
    /// The block id, used as a tie breaker.
    id: i64,
}

impl<T: Ord> SortKey<T> {
    /// Pairs an optional cached value with the id it belongs to.
    pub fn new(time: Option<T>, id: i64) -> Self {
        SortKey { time, id }
    }
}
197
198fn get_key<T: Ord + Clone>(
199 cache: &mut impl DerefMut<Target = FnvHashMap<i64, T>>,
200 id: i64,
201) -> SortKey<T> {
202 SortKey::new(cache.get(&id).cloned(), id)
203}
204
205impl<T, F> CacheTracker for InMemCacheTracker<T, F>
206where
207 T: Ord + Clone + Debug + Send + Sync,
208 F: Fn(Duration, BlockInfo) -> Option<T> + Send + Sync,
209{
210 fn blocks_accessed(&self, blocks: Vec<BlockInfo>) {
212 let now = Instant::now().checked_duration_since(self.created).unwrap();
213 let mut cache = self.cache.lock();
214 for block in blocks {
215 if let Some(value) = (self.mk_cache_entry)(now, block) {
216 cache.insert(block.id, value);
217 } else {
218 cache.remove(&block.id);
219 }
220 }
221 }
222
223 fn blocks_deleted(&self, blocks: Vec<BlockInfo>) {
225 let mut cache = self.cache.lock();
226 for block in blocks {
227 cache.remove(&block.id);
228 }
229 }
230
231 fn retain_ids(&self, ids: &[i64]) {
233 let ids = ids.iter().cloned().collect::<FnvHashSet<_>>();
234 let mut cache = self.cache.lock();
235 cache.retain(|id, _| ids.contains(id));
236 }
237
238 fn sort_ids(&self, ids: &mut [i64]) {
240 let mut cache = self.cache.lock();
241 ids.sort_unstable_by_key(move |id| get_key(&mut cache, *id));
242 }
243
244 fn has_persistent_state(&self) -> bool {
245 false
246 }
247}
248
249impl<T: Debug, F> std::fmt::Debug for InMemCacheTracker<T, F> {
250 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
251 f.debug_struct("InMemLruCacheTracker")
252 .field("cache", &self.cache.lock())
253 .finish()
254 }
255}
256
/// A key without a cached value must sort before any key that has one, even
/// when its id is the largest possible and the other id the smallest.
#[cfg(test)]
#[test]
fn sort_key_sort_order() {
    let without_value: SortKey<Duration> = SortKey::new(None, i64::max_value());
    let with_value = SortKey::new(Some(Duration::default()), i64::min_value());
    assert!(without_value < with_value);
}