ipld_block_builder/
cache.rs

1use crate::batch::Batch;
2use crate::builder::BlockBuilder;
3use crate::codec::{Decoder, Encoder};
4use async_std::sync::Mutex;
5use async_trait::async_trait;
6use cached::stores::SizedCache;
7use cached::Cached;
8use libipld::cid::Cid;
9use libipld::codec::{Decode, Encode};
10use libipld::error::Result;
11use libipld::store::{ReadonlyStore, Store};
12use std::marker::PhantomData;
13
14/// Cache for ipld blocks.
15pub struct IpldCache<S, C, T> {
16    builder: BlockBuilder<S, C>,
17    cache: Mutex<SizedCache<Cid, T>>,
18}
19
20impl<S, C, T> IpldCache<S, C, T> {
21    /// Creates a new cache of size `size`.
22    pub fn new(store: S, codec: C, size: usize) -> Self {
23        Self {
24            builder: BlockBuilder::new(store, codec),
25            cache: Mutex::new(SizedCache::with_size(size)),
26        }
27    }
28}
29
30/// Readonly cache trait.
31#[async_trait]
32pub trait ReadonlyCache<C, T>
33where
34    C: Decoder + Clone + Send + Sync,
35    T: Decode<<C as Decoder>::Codec> + Clone + Send + Sync,
36{
37    /// Returns a decoded block.
38    async fn get(&self, cid: &Cid) -> Result<T>;
39}
40
41#[async_trait]
42impl<S: ReadonlyStore + Send + Sync, C, T> ReadonlyCache<C, T> for IpldCache<S, C, T>
43where
44    C: Decoder + Clone + Send + Sync,
45    T: Decode<<C as Decoder>::Codec> + Clone + Send + Sync,
46{
47    async fn get(&self, cid: &Cid) -> Result<T> {
48        if let Some(value) = self.cache.lock().await.cache_get(cid).cloned() {
49            return Ok(value);
50        }
51        let value: T = self.builder.get(cid).await?;
52        self.cache
53            .lock()
54            .await
55            .cache_set(cid.clone(), value.clone());
56        Ok(value)
57    }
58}
59
60/// Cache trait.
61#[async_trait]
62pub trait Cache<C, T>: ReadonlyCache<C, T>
63where
64    C: Decoder + Encoder + Clone + Send + Sync,
65    T: Decode<<C as Decoder>::Codec> + Encode<<C as Encoder>::Codec> + Clone + Send + Sync,
66{
67    /// Creates a typed batch.
68    fn create_batch(&self) -> CacheBatch<C, T>;
69
70    /// Creates a typed batch.
71    fn create_batch_with_capacity(&self, capacity: usize) -> CacheBatch<C, T>;
72
73    /// Inserts a batch into the store.
74    async fn insert_batch(&self, batch: CacheBatch<C, T>) -> Result<Cid>;
75
76    /// Encodes and inserts a block.
77    async fn insert(&self, value: T) -> Result<Cid>;
78
79    /// Flushes all buffers.
80    async fn flush(&self) -> Result<()>;
81
82    /// Unpins a block.
83    async fn unpin(&self, cid: &Cid) -> Result<()>;
84}
85
86#[async_trait]
87impl<S: Store + Send + Sync, C, T> Cache<C, T> for IpldCache<S, C, T>
88where
89    C: Decoder + Encoder + Clone + Send + Sync,
90    T: Decode<<C as Decoder>::Codec> + Encode<<C as Encoder>::Codec> + Clone + Send + Sync,
91{
92    fn create_batch(&self) -> CacheBatch<C, T> {
93        CacheBatch::new(self.builder.codec().clone())
94    }
95
96    fn create_batch_with_capacity(&self, capacity: usize) -> CacheBatch<C, T> {
97        CacheBatch::with_capacity(self.builder.codec().clone(), capacity)
98    }
99
100    async fn insert_batch(&self, batch: CacheBatch<C, T>) -> Result<Cid> {
101        let cid = self.builder.insert_batch(batch.batch).await?;
102        let mut cache = self.cache.lock().await;
103        for (cid, value) in batch.cache {
104            cache.cache_set(cid, value);
105        }
106        Ok(cid)
107    }
108
109    async fn insert(&self, value: T) -> Result<Cid> {
110        let cid = self.builder.insert(&value).await?;
111        self.cache.lock().await.cache_set(cid.clone(), value);
112        Ok(cid)
113    }
114
115    async fn flush(&self) -> Result<()> {
116        self.builder.flush().await
117    }
118
119    async fn unpin(&self, cid: &Cid) -> Result<()> {
120        self.builder.unpin(cid).await
121    }
122}
123
124/// Typed batch.
125pub struct CacheBatch<C, T> {
126    _marker: PhantomData<T>,
127    cache: Vec<(Cid, T)>,
128    batch: Batch<C>,
129}
130
131impl<C: Encoder, T: Encode<C::Codec>> CacheBatch<C, T> {
132    /// Creates a new batch.
133    pub fn new(codec: C) -> Self {
134        Self {
135            _marker: PhantomData,
136            cache: Default::default(),
137            batch: Batch::new(codec),
138        }
139    }
140
141    /// Creates a new batch with capacity.
142    pub fn with_capacity(codec: C, capacity: usize) -> Self {
143        Self {
144            _marker: PhantomData,
145            cache: Vec::with_capacity(capacity),
146            batch: Batch::with_capacity(codec, capacity),
147        }
148    }
149
150    /// Inserts a value into the batch.
151    pub fn insert(&mut self, value: T) -> Result<&Cid> {
152        let cid = self.batch.insert(&value)?;
153        self.cache.push((cid.clone(), value));
154        Ok(cid)
155    }
156}
157
158/// Macro to derive cache trait for a struct.
159#[macro_export]
160macro_rules! derive_cache {
161    ($struct:tt, $field:ident, $codec:ty, $type:ty) => {
162        #[async_trait::async_trait]
163        impl<S> $crate::ReadonlyCache<$codec, $type> for $struct<S>
164        where
165            S: libipld::store::ReadonlyStore + Send + Sync,
166        {
167            async fn get(&self, cid: &libipld::cid::Cid) -> libipld::error::Result<$type> {
168                self.$field.get(cid).await
169            }
170        }
171
172        #[async_trait::async_trait]
173        impl<S> $crate::Cache<$codec, $type> for $struct<S>
174        where
175            S: libipld::store::Store + Send + Sync,
176        {
177            fn create_batch(&self) -> $crate::CacheBatch<$codec, $type> {
178                self.$field.create_batch()
179            }
180
181            fn create_batch_with_capacity(
182                &self,
183                capacity: usize,
184            ) -> $crate::CacheBatch<$codec, $type> {
185                self.$field.create_batch_with_capacity(capacity)
186            }
187
188            async fn insert_batch(
189                &self,
190                batch: $crate::CacheBatch<$codec, $type>,
191            ) -> libipld::error::Result<libipld::cid::Cid> {
192                self.$field.insert_batch(batch).await
193            }
194
195            async fn insert(&self, value: $type) -> libipld::error::Result<libipld::cid::Cid> {
196                self.$field.insert(value).await
197            }
198
199            async fn flush(&self) -> libipld::error::Result<()> {
200                self.$field.flush().await
201            }
202
203            async fn unpin(&self, cid: &libipld::cid::Cid) -> libipld::error::Result<()> {
204                self.$field.unpin(cid).await
205            }
206        }
207    };
208}
209
210#[cfg(test)]
211mod tests {
212    use super::*;
213    use crate::Codec;
214    use libipld::mem::MemStore;
215
216    struct OffchainClient<S> {
217        number: IpldCache<S, Codec, u32>,
218    }
219
220    derive_cache!(OffchainClient, number, Codec, u32);
221
222    #[async_std::test]
223    async fn test_cache() {
224        let store = MemStore::default();
225        let codec = Codec::new();
226        let client = OffchainClient {
227            number: IpldCache::new(store, codec, 1),
228        };
229        let cid = client.insert(42).await.unwrap();
230        let res = client.get(&cid).await.unwrap();
231        assert_eq!(res, 42);
232    }
233}