1use crate::blocks::{Aliases, Subscription};
2use async_std::stream::interval;
3use async_std::task;
4use futures::stream::StreamExt;
5use ipfs_embed_core::{async_trait, Block, Cid, Result, Storage, StoreParams};
6use libipld::codec::Decode;
7use libipld::ipld::Ipld;
8use std::time::Duration;
9
10mod blocks;
11mod id;
12
13pub struct StorageService<S: StoreParams> {
14 db: sled::Db,
15 store: Aliases<S>,
16 cache_size: usize,
17}
18
19impl<S: StoreParams> StorageService<S>
20where
21 Ipld: Decode<S::Codecs>,
22{
23 pub fn open(
24 config: &sled::Config,
25 cache_size: usize,
26 sweep_interval: Option<Duration>,
27 ) -> Result<Self> {
28 let db = config.open()?;
29 let store = Aliases::open(&db)?;
30 if let Some(sweep_interval) = sweep_interval {
31 let gc = store.clone();
32 task::spawn(async move {
33 let mut atime = gc.atime();
34 let mut stream = interval(sweep_interval);
35 while let Some(()) = stream.next().await {
36 let next_atime = gc.atime();
37 gc.evict(cache_size, atime).await.ok();
38 atime = next_atime;
39 }
40 });
41 }
42 Ok(Self {
43 db,
44 cache_size,
45 store,
46 })
47 }
48
49 pub fn atime(&self) -> u64 {
50 self.store.atime()
51 }
52
53 pub async fn evict(&self, grace_atime: u64) -> Result<()> {
54 self.store.evict(self.cache_size, grace_atime).await
55 }
56
57 pub async fn flush(&self) -> Result<()> {
58 self.db.flush_async().await?;
59 Ok(())
60 }
61
62 pub fn iter(&self) -> impl Iterator<Item = Result<Cid>> {
63 self.store.iter()
64 }
65}
66
67#[async_trait]
68impl<S: StoreParams> Storage<S> for StorageService<S>
69where
70 Ipld: Decode<S::Codecs>,
71{
72 type Subscription = Subscription;
73
74 fn contains(&self, cid: &Cid) -> Result<bool> {
75 self.store.contains(cid)
76 }
77
78 fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
79 self.store.get(cid)
80 }
81
82 fn insert(&self, block: &Block<S>) -> Result<()> {
83 self.store.insert(block)
84 }
85
86 async fn alias<T: AsRef<[u8]> + Send + Sync>(&self, alias: T, cid: Option<&Cid>) -> Result<()> {
87 self.store.alias(alias.as_ref(), cid).await?;
88 self.flush().await
89 }
90
91 fn resolve<T: AsRef<[u8]> + Send + Sync>(&self, alias: T) -> Result<Option<Cid>> {
92 self.store.resolve(alias.as_ref())
93 }
94
95 async fn pinned(&self, cid: &Cid) -> Result<Option<bool>> {
96 self.store.pinned(cid).await
97 }
98
99 fn subscribe(&self) -> Self::Subscription {
100 self.store.subscribe()
101 }
102}
103
104#[cfg(test)]
105mod tests {
106 use super::*;
107 use libipld::cbor::DagCborCodec;
108 use libipld::multihash::Code;
109 use libipld::store::DefaultParams;
110 use libipld::{alias, ipld};
111
112 fn create_block(ipld: &Ipld) -> Block<DefaultParams> {
113 Block::encode(DagCborCodec, Code::Blake3_256, ipld).unwrap()
114 }
115
116 macro_rules! assert_evicted {
117 ($store:expr, $block:expr) => {
118 assert_eq!($store.pinned($block.cid()).await.unwrap(), None);
119 };
120 }
121
122 macro_rules! assert_pinned {
123 ($store:expr, $block:expr) => {
124 assert_eq!($store.pinned($block.cid()).await.unwrap(), Some(true));
125 };
126 }
127
128 macro_rules! assert_unpinned {
129 ($store:expr, $block:expr) => {
130 assert_eq!($store.pinned($block.cid()).await.unwrap(), Some(false));
131 };
132 }
133
134 fn tracing_try_init() {
135 tracing_subscriber::fmt()
136 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
137 .try_init()
138 .ok();
139 }
140
141 #[async_std::test]
142 async fn test_store_evict() {
143 tracing_try_init();
144 let config = sled::Config::new().temporary(true);
145 let store = StorageService::open(&config, 2, None).unwrap();
146 let blocks = [
147 create_block(&ipld!(0)),
148 create_block(&ipld!(1)),
149 create_block(&ipld!(2)),
150 create_block(&ipld!(3)),
151 ];
152 store.insert(&blocks[0]).unwrap();
153 store.insert(&blocks[1]).unwrap();
154 store.evict(store.atime() + 1).await.unwrap();
155 assert_unpinned!(&store, &blocks[0]);
156 assert_unpinned!(&store, &blocks[1]);
157 store.insert(&blocks[2]).unwrap();
158 store.evict(store.atime() + 1).await.unwrap();
159 assert_evicted!(&store, &blocks[0]);
160 assert_unpinned!(&store, &blocks[1]);
161 assert_unpinned!(&store, &blocks[2]);
162 store.get(&blocks[1]).unwrap();
163 store.insert(&blocks[3]).unwrap();
164 store.evict(store.atime() + 1).await.unwrap();
165 assert_unpinned!(&store, &blocks[1]);
166 assert_evicted!(&store, &blocks[2]);
167 assert_unpinned!(&store, &blocks[3]);
168 }
169
170 #[async_std::test]
171 async fn test_grace_period() {
172 tracing_try_init();
173 let config = sled::Config::new().temporary(true);
174 let store = StorageService::open(&config, 0, None).unwrap();
175 let blocks = [create_block(&ipld!(0))];
176 store.insert(&blocks[0]).unwrap();
177 store.evict(0).await.unwrap();
178 assert_unpinned!(&store, &blocks[0]);
179 store.evict(store.atime() + 1).await.unwrap();
180 assert_evicted!(&store, &blocks[0]);
181 }
182
183 #[async_std::test]
184 #[allow(clippy::many_single_char_names)]
185 async fn test_store_unpin() {
186 tracing_try_init();
187 let config = sled::Config::new().temporary(true);
188 let store = StorageService::open(&config, 2, None).unwrap();
189 let a = create_block(&ipld!({ "a": [] }));
190 let b = create_block(&ipld!({ "b": [a.cid()] }));
191 let c = create_block(&ipld!({ "c": [a.cid()] }));
192 let x = alias!(x);
193 let y = alias!(y);
194 store.insert(&a).unwrap();
195 store.insert(&b).unwrap();
196 store.insert(&c).unwrap();
197 store.alias(x, Some(b.cid())).await.unwrap();
198 store.alias(y, Some(c.cid())).await.unwrap();
199 assert_pinned!(&store, &a);
200 assert_pinned!(&store, &b);
201 assert_pinned!(&store, &c);
202 store.alias(x, None).await.unwrap();
203 assert_pinned!(&store, &a);
204 assert_unpinned!(&store, &b);
205 assert_pinned!(&store, &c);
206 store.alias(y, None).await.unwrap();
207 assert_unpinned!(&store, &a);
208 assert_unpinned!(&store, &b);
209 assert_unpinned!(&store, &c);
210 }
211
212 #[async_std::test]
213 #[allow(clippy::many_single_char_names)]
214 async fn test_store_unpin2() {
215 tracing_try_init();
216 let config = sled::Config::new().temporary(true);
217 let store = StorageService::open(&config, 2, None).unwrap();
218 let a = create_block(&ipld!({ "a": [] }));
219 let b = create_block(&ipld!({ "b": [a.cid()] }));
220 let x = alias!(x);
221 let y = alias!(y);
222 store.insert(&a).unwrap();
223 store.insert(&b).unwrap();
224 store.alias(x, Some(b.cid())).await.unwrap();
225 store.alias(y, Some(b.cid())).await.unwrap();
226 assert_pinned!(&store, &a);
227 assert_pinned!(&store, &b);
228 store.alias(x, None).await.unwrap();
229 assert_pinned!(&store, &a);
230 assert_pinned!(&store, &b);
231 store.alias(y, None).await.unwrap();
232 assert_unpinned!(&store, &a);
233 assert_unpinned!(&store, &b);
234 }
235}