1use crate::batch::Batch;
2use crate::builder::BlockBuilder;
3use crate::codec::{Decoder, Encoder};
4use async_std::sync::Mutex;
5use async_trait::async_trait;
6use cached::stores::SizedCache;
7use cached::Cached;
8use libipld::cid::Cid;
9use libipld::codec::{Decode, Encode};
10use libipld::error::Result;
11use libipld::store::{ReadonlyStore, Store};
12use std::marker::PhantomData;
13
14pub struct IpldCache<S, C, T> {
16 builder: BlockBuilder<S, C>,
17 cache: Mutex<SizedCache<Cid, T>>,
18}
19
20impl<S, C, T> IpldCache<S, C, T> {
21 pub fn new(store: S, codec: C, size: usize) -> Self {
23 Self {
24 builder: BlockBuilder::new(store, codec),
25 cache: Mutex::new(SizedCache::with_size(size)),
26 }
27 }
28}
29
30#[async_trait]
32pub trait ReadonlyCache<C, T>
33where
34 C: Decoder + Clone + Send + Sync,
35 T: Decode<<C as Decoder>::Codec> + Clone + Send + Sync,
36{
37 async fn get(&self, cid: &Cid) -> Result<T>;
39}
40
41#[async_trait]
42impl<S: ReadonlyStore + Send + Sync, C, T> ReadonlyCache<C, T> for IpldCache<S, C, T>
43where
44 C: Decoder + Clone + Send + Sync,
45 T: Decode<<C as Decoder>::Codec> + Clone + Send + Sync,
46{
47 async fn get(&self, cid: &Cid) -> Result<T> {
48 if let Some(value) = self.cache.lock().await.cache_get(cid).cloned() {
49 return Ok(value);
50 }
51 let value: T = self.builder.get(cid).await?;
52 self.cache
53 .lock()
54 .await
55 .cache_set(cid.clone(), value.clone());
56 Ok(value)
57 }
58}
59
60#[async_trait]
62pub trait Cache<C, T>: ReadonlyCache<C, T>
63where
64 C: Decoder + Encoder + Clone + Send + Sync,
65 T: Decode<<C as Decoder>::Codec> + Encode<<C as Encoder>::Codec> + Clone + Send + Sync,
66{
67 fn create_batch(&self) -> CacheBatch<C, T>;
69
70 fn create_batch_with_capacity(&self, capacity: usize) -> CacheBatch<C, T>;
72
73 async fn insert_batch(&self, batch: CacheBatch<C, T>) -> Result<Cid>;
75
76 async fn insert(&self, value: T) -> Result<Cid>;
78
79 async fn flush(&self) -> Result<()>;
81
82 async fn unpin(&self, cid: &Cid) -> Result<()>;
84}
85
86#[async_trait]
87impl<S: Store + Send + Sync, C, T> Cache<C, T> for IpldCache<S, C, T>
88where
89 C: Decoder + Encoder + Clone + Send + Sync,
90 T: Decode<<C as Decoder>::Codec> + Encode<<C as Encoder>::Codec> + Clone + Send + Sync,
91{
92 fn create_batch(&self) -> CacheBatch<C, T> {
93 CacheBatch::new(self.builder.codec().clone())
94 }
95
96 fn create_batch_with_capacity(&self, capacity: usize) -> CacheBatch<C, T> {
97 CacheBatch::with_capacity(self.builder.codec().clone(), capacity)
98 }
99
100 async fn insert_batch(&self, batch: CacheBatch<C, T>) -> Result<Cid> {
101 let cid = self.builder.insert_batch(batch.batch).await?;
102 let mut cache = self.cache.lock().await;
103 for (cid, value) in batch.cache {
104 cache.cache_set(cid, value);
105 }
106 Ok(cid)
107 }
108
109 async fn insert(&self, value: T) -> Result<Cid> {
110 let cid = self.builder.insert(&value).await?;
111 self.cache.lock().await.cache_set(cid.clone(), value);
112 Ok(cid)
113 }
114
115 async fn flush(&self) -> Result<()> {
116 self.builder.flush().await
117 }
118
119 async fn unpin(&self, cid: &Cid) -> Result<()> {
120 self.builder.unpin(cid).await
121 }
122}
123
124pub struct CacheBatch<C, T> {
126 _marker: PhantomData<T>,
127 cache: Vec<(Cid, T)>,
128 batch: Batch<C>,
129}
130
131impl<C: Encoder, T: Encode<C::Codec>> CacheBatch<C, T> {
132 pub fn new(codec: C) -> Self {
134 Self {
135 _marker: PhantomData,
136 cache: Default::default(),
137 batch: Batch::new(codec),
138 }
139 }
140
141 pub fn with_capacity(codec: C, capacity: usize) -> Self {
143 Self {
144 _marker: PhantomData,
145 cache: Vec::with_capacity(capacity),
146 batch: Batch::with_capacity(codec, capacity),
147 }
148 }
149
150 pub fn insert(&mut self, value: T) -> Result<&Cid> {
152 let cid = self.batch.insert(&value)?;
153 self.cache.push((cid.clone(), value));
154 Ok(cid)
155 }
156}
157
/// Implements `ReadonlyCache` and `Cache` for a struct that is generic over a
/// single store parameter, by delegating every method to one of its fields.
///
/// Arguments, in order:
/// 1. the struct name (it is used as `Name<S>`),
/// 2. the field to delegate to,
/// 3. the codec type,
/// 4. the value type.
///
/// The expansion refers to `async_trait` and `libipld` by their crate names,
/// so both must be nameable at the call site.
///
/// ```ignore
/// struct OffchainClient<S> {
///     number: IpldCache<S, Codec, u32>,
/// }
///
/// derive_cache!(OffchainClient, number, Codec, u32);
/// ```
#[macro_export]
macro_rules! derive_cache {
    ($name:tt, $field:ident, $codec:ty, $value:ty) => {
        // Read-only half: only requires a `ReadonlyStore`.
        #[async_trait::async_trait]
        impl<S> $crate::ReadonlyCache<$codec, $value> for $name<S>
        where
            S: libipld::store::ReadonlyStore + Send + Sync,
        {
            async fn get(&self, cid: &libipld::cid::Cid) -> libipld::error::Result<$value> {
                self.$field.get(cid).await
            }
        }

        // Read/write half: requires a full `Store`.
        #[async_trait::async_trait]
        impl<S> $crate::Cache<$codec, $value> for $name<S>
        where
            S: libipld::store::Store + Send + Sync,
        {
            fn create_batch(&self) -> $crate::CacheBatch<$codec, $value> {
                self.$field.create_batch()
            }

            fn create_batch_with_capacity(
                &self,
                capacity: usize,
            ) -> $crate::CacheBatch<$codec, $value> {
                self.$field.create_batch_with_capacity(capacity)
            }

            async fn insert_batch(
                &self,
                batch: $crate::CacheBatch<$codec, $value>,
            ) -> libipld::error::Result<libipld::cid::Cid> {
                self.$field.insert_batch(batch).await
            }

            async fn insert(&self, value: $value) -> libipld::error::Result<libipld::cid::Cid> {
                self.$field.insert(value).await
            }

            async fn flush(&self) -> libipld::error::Result<()> {
                self.$field.flush().await
            }

            async fn unpin(&self, cid: &libipld::cid::Cid) -> libipld::error::Result<()> {
                self.$field.unpin(cid).await
            }
        }
    };
}
209
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Codec;
    use libipld::mem::MemStore;

    /// Minimal wrapper type used to exercise `derive_cache!`.
    struct OffchainClient<S> {
        number: IpldCache<S, Codec, u32>,
    }

    derive_cache!(OffchainClient, number, Codec, u32);

    /// A value inserted through the derived `Cache` impl can be read back
    /// through the derived `ReadonlyCache` impl.
    #[async_std::test]
    async fn test_cache() {
        let number = IpldCache::new(MemStore::default(), Codec::new(), 1);
        let client = OffchainClient { number };

        let cid = client.insert(42).await.unwrap();
        assert_eq!(client.get(&cid).await.unwrap(), 42);
    }
}