car_mirror/
cache.rs

1use crate::common::references;
2use futures::Future;
3use libipld::{Cid, IpldCodec};
4use wnfs_common::{
5    utils::{CondSend, CondSync},
6    BlockStore, BlockStoreError,
7};
8
9/// This trait abstracts caches used by the car mirror implementation.
10/// An efficient cache implementation can significantly reduce the amount
11/// of lookups into the blockstore.
12///
13/// At the moment, all caches are either memoization tables or informationally
14/// monotonous, so you don't need to be careful about cache eviction.
15///
16/// See `InMemoryCache` for a `quick_cache`-based implementation
17/// (enable the `quick-cache` feature), and `NoCache` for disabling the cache.
18pub trait Cache: CondSync {
19    /// This returns further references from the block referenced by given CID,
20    /// if the cache is hit.
21    /// Returns `None` if it's a cache miss.
22    ///
23    /// This isn't meant to be called directly, instead use `Cache::references`.
24    fn get_references_cache(
25        &self,
26        cid: Cid,
27    ) -> impl Future<Output = Result<Option<Vec<Cid>>, BlockStoreError>> + CondSend;
28
29    /// Populates the references cache for given CID with given references.
30    fn put_references_cache(
31        &self,
32        cid: Cid,
33        references: Vec<Cid>,
34    ) -> impl Future<Output = Result<(), BlockStoreError>> + CondSend;
35
36    /// Find out any CIDs that are linked to from the block with given CID.
37    ///
38    /// This makes use of the cache via `get_references_cached`, if possible.
39    /// If the cache is missed, then it will fetch the block, compute the references
40    /// and automatically populate the cache using `put_references_cached`.
41    fn references(
42        &self,
43        cid: Cid,
44        store: &impl BlockStore,
45    ) -> impl Future<Output = Result<Vec<Cid>, BlockStoreError>> + CondSend {
46        async move {
47            // raw blocks don't have further links
48            let raw_codec: u64 = IpldCodec::Raw.into();
49            if cid.codec() == raw_codec {
50                return Ok(Vec::new());
51            }
52
53            if let Some(refs) = self.get_references_cache(cid).await? {
54                return Ok(refs);
55            }
56
57            let block = store.get_block(&cid).await?;
58            let refs = references(cid, block, Vec::new())?;
59            self.put_references_cache(cid, refs.clone()).await?;
60            Ok(refs)
61        }
62    }
63}
64
65impl<C: Cache> Cache for &C {
66    async fn get_references_cache(&self, cid: Cid) -> Result<Option<Vec<Cid>>, BlockStoreError> {
67        (**self).get_references_cache(cid).await
68    }
69
70    async fn put_references_cache(
71        &self,
72        cid: Cid,
73        references: Vec<Cid>,
74    ) -> Result<(), BlockStoreError> {
75        (**self).put_references_cache(cid, references).await
76    }
77}
78
79impl<C: Cache> Cache for Box<C> {
80    async fn get_references_cache(&self, cid: Cid) -> Result<Option<Vec<Cid>>, BlockStoreError> {
81        (**self).get_references_cache(cid).await
82    }
83
84    async fn put_references_cache(
85        &self,
86        cid: Cid,
87        references: Vec<Cid>,
88    ) -> Result<(), BlockStoreError> {
89        (**self).put_references_cache(cid, references).await
90    }
91}
92
93/// An implementation of `Cache` that doesn't cache at all.
94#[derive(Debug, Clone)]
95pub struct NoCache;
96
97impl Cache for NoCache {
98    async fn get_references_cache(&self, _: Cid) -> Result<Option<Vec<Cid>>, BlockStoreError> {
99        Ok(None)
100    }
101
102    async fn put_references_cache(&self, _: Cid, _: Vec<Cid>) -> Result<(), BlockStoreError> {
103        Ok(())
104    }
105}
106
107#[cfg(feature = "quick_cache")]
108pub use quick_cache::*;
109
110#[cfg(feature = "quick_cache")]
111mod quick_cache {
112    use super::Cache;
113    use bytes::Bytes;
114    use libipld::Cid;
115    use quick_cache::{sync, OptionsBuilder, Weighter};
116    use wnfs_common::{
117        utils::{Arc, CondSend},
118        BlockStore, BlockStoreError,
119    };
120
121    /// A [quick-cache]-based implementation of a car mirror cache.
122    ///
123    /// [quick-cache]: https://github.com/arthurprs/quick-cache/
124    #[derive(Debug, Clone)]
125    pub struct InMemoryCache {
126        references: Arc<sync::Cache<Cid, Vec<Cid>, ReferencesWeighter>>,
127    }
128
129    /// A wrapper struct for a `BlockStore` that attaches an in-memory cache
130    /// of which blocks are available and which aren't, speeding up
131    /// consecutive `has_block` and `get_block` calls.
132    #[derive(Debug, Clone)]
133    pub struct CacheMissing<B: BlockStore> {
134        /// Access to the inner blockstore
135        pub inner: B,
136        has_blocks: Arc<sync::Cache<Cid, bool>>,
137    }
138
139    impl InMemoryCache {
140        /// Create a new in-memory cache that approximately holds
141        /// cached references for `approx_cids` CIDs.
142        ///
143        /// Memory requirements can be eye-balled by calculating ~100 bytes
144        /// per CID in the cache.
145        ///
146        /// So if you want this cache to never exceed roughly ~100MB, set
147        /// `approx_cids` to `1_000_000`.
148        pub fn new(approx_cids: usize) -> Self {
149            let max_links_per_unixfs = 175;
150            let est_average_links = max_links_per_unixfs / 10;
151            Self {
152                references: Arc::new(sync::Cache::with_options(
153                    OptionsBuilder::new()
154                        .estimated_items_capacity(approx_cids / est_average_links)
155                        .weight_capacity(approx_cids as u64)
156                        .build()
157                        .expect("Couldn't create options for quick cache?"),
158                    ReferencesWeighter,
159                    Default::default(),
160                    Default::default(),
161                )),
162            }
163        }
164    }
165
166    impl Cache for InMemoryCache {
167        async fn get_references_cache(
168            &self,
169            cid: Cid,
170        ) -> Result<Option<Vec<Cid>>, BlockStoreError> {
171            Ok(self.references.get(&cid))
172        }
173
174        async fn put_references_cache(
175            &self,
176            cid: Cid,
177            references: Vec<Cid>,
178        ) -> Result<(), BlockStoreError> {
179            self.references.insert(cid, references);
180            Ok(())
181        }
182    }
183
184    impl<B: BlockStore> CacheMissing<B> {
185        /// Wrap an existing `BlockStore`, caching `has_block` responses.
186        ///
187        /// This will also intercept `get_block` requests and fail early, if the
188        /// block is known to be missing.
189        /// In some circumstances this can be problematic, e.g. if blocks can be
190        /// added and removed to the underlying blockstore without going through
191        /// the wrapped instance's `put_block` or `put_block_keyed` interfaces.
192        ///
193        /// In these cases a more advanced caching strategy that may answer
194        /// `has_block` early from a cache with a cache TTL & eviction strategy.
195        ///
196        /// The additional memory requirements for this cache can be estimated
197        /// using the `approx_capacity`: Each cache line is roughly ~100 bytes
198        /// in size, so for a 100MB cache, set this value to `1_000_000`.
199        pub fn new(approx_capacity: usize, inner: B) -> Self {
200            Self {
201                inner,
202                has_blocks: Arc::new(sync::Cache::new(approx_capacity)),
203            }
204        }
205    }
206
207    impl<B: BlockStore> BlockStore for CacheMissing<B> {
208        async fn get_block(&self, cid: &Cid) -> Result<Bytes, BlockStoreError> {
209            match self.has_blocks.get_value_or_guard_async(cid).await {
210                Ok(false) => Err(BlockStoreError::CIDNotFound(*cid)),
211                Ok(true) => self.inner.get_block(cid).await,
212                Err(guard) => match self.inner.get_block(cid).await {
213                    Ok(block) => {
214                        let _ignore_meantime_eviction = guard.insert(true);
215                        Ok(block)
216                    }
217                    e @ Err(BlockStoreError::CIDNotFound(_)) => {
218                        let _ignore_meantime_eviction = guard.insert(false);
219                        e
220                    }
221                    Err(e) => Err(e),
222                },
223            }
224        }
225
226        async fn put_block_keyed(
227            &self,
228            cid: Cid,
229            bytes: impl Into<Bytes> + CondSend,
230        ) -> Result<(), BlockStoreError> {
231            self.inner.put_block_keyed(cid, bytes).await?;
232            self.has_blocks.insert(cid, true);
233            Ok(())
234        }
235
236        async fn has_block(&self, cid: &Cid) -> Result<bool, BlockStoreError> {
237            self.has_blocks
238                .get_or_insert_async(cid, self.inner.has_block(cid))
239                .await
240        }
241
242        async fn put_block(
243            &self,
244            bytes: impl Into<Bytes> + CondSend,
245            codec: u64,
246        ) -> Result<Cid, BlockStoreError> {
247            let cid = self.inner.put_block(bytes, codec).await?;
248            self.has_blocks.insert(cid, true);
249            Ok(cid)
250        }
251
252        fn create_cid(&self, bytes: &[u8], codec: u64) -> Result<Cid, BlockStoreError> {
253            self.inner.create_cid(bytes, codec)
254        }
255    }
256
257    #[derive(Debug, Clone)]
258    struct ReferencesWeighter;
259
260    impl Weighter<Cid, Vec<Cid>> for ReferencesWeighter {
261        fn weight(&self, _key: &Cid, val: &Vec<Cid>) -> u32 {
262            1 + val.len() as u32
263        }
264    }
265
266    #[cfg(test)]
267    mod tests {
268        use super::{Cache, InMemoryCache};
269        use libipld::{cbor::DagCborCodec, Ipld, IpldCodec};
270        use testresult::TestResult;
271        use wnfs_common::{encode, BlockStore, MemoryBlockStore};
272
273        #[test_log::test(async_std::test)]
274        async fn test_references_cache() -> TestResult {
275            let store = &MemoryBlockStore::new();
276            let cache = InMemoryCache::new(100_000);
277
278            let hello_one_cid = store
279                .put_block(b"Hello, One?".to_vec(), IpldCodec::Raw.into())
280                .await?;
281            let hello_two_cid = store
282                .put_block(b"Hello, Two?".to_vec(), IpldCodec::Raw.into())
283                .await?;
284            let cid = store
285                .put_block(
286                    encode(
287                        &Ipld::List(vec![Ipld::Link(hello_one_cid), Ipld::Link(hello_two_cid)]),
288                        DagCborCodec,
289                    )?,
290                    DagCborCodec.into(),
291                )
292                .await?;
293
294            // Cache unpopulated initially
295            assert_eq!(cache.get_references_cache(cid).await?, None);
296
297            // This should populate the references cache
298            assert_eq!(
299                cache.references(cid, store).await?,
300                vec![hello_one_cid, hello_two_cid]
301            );
302
303            // Cache should now contain the references
304            assert_eq!(
305                cache.get_references_cache(cid).await?,
306                Some(vec![hello_one_cid, hello_two_cid])
307            );
308
309            Ok(())
310        }
311    }
312}
313
314#[cfg(test)]
315mod tests {
316    use super::{Cache, NoCache};
317    use anyhow::Result;
318    use libipld::{cbor::DagCborCodec, Cid, Ipld, IpldCodec};
319    use std::{collections::HashMap, sync::RwLock};
320    use testresult::TestResult;
321    use wnfs_common::{encode, BlockStore, BlockStoreError, MemoryBlockStore};
322
323    #[derive(Debug, Default)]
324    struct HashMapCache {
325        references: RwLock<HashMap<Cid, Vec<Cid>>>,
326    }
327
328    impl Cache for HashMapCache {
329        async fn get_references_cache(
330            &self,
331            cid: Cid,
332        ) -> Result<Option<Vec<Cid>>, BlockStoreError> {
333            Ok(self.references.read().unwrap().get(&cid).cloned())
334        }
335
336        async fn put_references_cache(
337            &self,
338            cid: Cid,
339            references: Vec<Cid>,
340        ) -> Result<(), BlockStoreError> {
341            self.references.write().unwrap().insert(cid, references);
342            Ok(())
343        }
344    }
345
346    #[test_log::test(async_std::test)]
347    async fn test_references_cache() -> TestResult {
348        let store = &MemoryBlockStore::new();
349        let cache = HashMapCache::default();
350
351        let hello_one_cid = store
352            .put_block(b"Hello, One?".to_vec(), IpldCodec::Raw.into())
353            .await?;
354        let hello_two_cid = store
355            .put_block(b"Hello, Two?".to_vec(), IpldCodec::Raw.into())
356            .await?;
357        let cid = store
358            .put_block(
359                encode(
360                    &Ipld::List(vec![Ipld::Link(hello_one_cid), Ipld::Link(hello_two_cid)]),
361                    DagCborCodec,
362                )?,
363                DagCborCodec.into(),
364            )
365            .await?;
366
367        // Cache unpopulated initially
368        assert_eq!(cache.get_references_cache(cid).await?, None);
369
370        // This should populate the references cache
371        assert_eq!(
372            cache.references(cid, store).await?,
373            vec![hello_one_cid, hello_two_cid]
374        );
375
376        // Cache should now contain the references
377        assert_eq!(
378            cache.get_references_cache(cid).await?,
379            Some(vec![hello_one_cid, hello_two_cid])
380        );
381
382        Ok(())
383    }
384
385    #[test_log::test(async_std::test)]
386    async fn test_no_cache_references() -> TestResult {
387        let store = &MemoryBlockStore::new();
388        let cache = NoCache;
389
390        let hello_one_cid = store
391            .put_block(b"Hello, One?".to_vec(), IpldCodec::Raw.into())
392            .await?;
393        let hello_two_cid = store
394            .put_block(b"Hello, Two?".to_vec(), IpldCodec::Raw.into())
395            .await?;
396        let cid = store
397            .put_block(
398                encode(
399                    &Ipld::List(vec![Ipld::Link(hello_one_cid), Ipld::Link(hello_two_cid)]),
400                    DagCborCodec,
401                )?,
402                DagCborCodec.into(),
403            )
404            .await?;
405
406        // Cache should start out unpopulated
407        assert_eq!(cache.get_references_cache(cid).await?, None);
408
409        // We should get the correct answer for our queries
410        assert_eq!(
411            cache.references(cid, store).await?,
412            vec![hello_one_cid, hello_two_cid]
413        );
414
415        // We don't have a populated cache though
416        assert_eq!(cache.get_references_cache(cid).await?, None);
417
418        Ok(())
419    }
420}