ipfs_embed_db/
lib.rs

1use crate::blocks::{Aliases, Subscription};
2use async_std::stream::interval;
3use async_std::task;
4use futures::stream::StreamExt;
5use ipfs_embed_core::{async_trait, Block, Cid, Result, Storage, StoreParams};
6use libipld::codec::Decode;
7use libipld::ipld::Ipld;
8use std::time::Duration;
9
10mod blocks;
11mod id;
12
13pub struct StorageService<S: StoreParams> {
14    db: sled::Db,
15    store: Aliases<S>,
16    cache_size: usize,
17}
18
19impl<S: StoreParams> StorageService<S>
20where
21    Ipld: Decode<S::Codecs>,
22{
23    pub fn open(
24        config: &sled::Config,
25        cache_size: usize,
26        sweep_interval: Option<Duration>,
27    ) -> Result<Self> {
28        let db = config.open()?;
29        let store = Aliases::open(&db)?;
30        if let Some(sweep_interval) = sweep_interval {
31            let gc = store.clone();
32            task::spawn(async move {
33                let mut atime = gc.atime();
34                let mut stream = interval(sweep_interval);
35                while let Some(()) = stream.next().await {
36                    let next_atime = gc.atime();
37                    gc.evict(cache_size, atime).await.ok();
38                    atime = next_atime;
39                }
40            });
41        }
42        Ok(Self {
43            db,
44            cache_size,
45            store,
46        })
47    }
48
49    pub fn atime(&self) -> u64 {
50        self.store.atime()
51    }
52
53    pub async fn evict(&self, grace_atime: u64) -> Result<()> {
54        self.store.evict(self.cache_size, grace_atime).await
55    }
56
57    pub async fn flush(&self) -> Result<()> {
58        self.db.flush_async().await?;
59        Ok(())
60    }
61
62    pub fn iter(&self) -> impl Iterator<Item = Result<Cid>> {
63        self.store.iter()
64    }
65}
66
67#[async_trait]
68impl<S: StoreParams> Storage<S> for StorageService<S>
69where
70    Ipld: Decode<S::Codecs>,
71{
72    type Subscription = Subscription;
73
74    fn contains(&self, cid: &Cid) -> Result<bool> {
75        self.store.contains(cid)
76    }
77
78    fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
79        self.store.get(cid)
80    }
81
82    fn insert(&self, block: &Block<S>) -> Result<()> {
83        self.store.insert(block)
84    }
85
86    async fn alias<T: AsRef<[u8]> + Send + Sync>(&self, alias: T, cid: Option<&Cid>) -> Result<()> {
87        self.store.alias(alias.as_ref(), cid).await?;
88        self.flush().await
89    }
90
91    fn resolve<T: AsRef<[u8]> + Send + Sync>(&self, alias: T) -> Result<Option<Cid>> {
92        self.store.resolve(alias.as_ref())
93    }
94
95    async fn pinned(&self, cid: &Cid) -> Result<Option<bool>> {
96        self.store.pinned(cid).await
97    }
98
99    fn subscribe(&self) -> Self::Subscription {
100        self.store.subscribe()
101    }
102}
103
#[cfg(test)]
mod tests {
    use super::*;
    use libipld::cbor::DagCborCodec;
    use libipld::multihash::Code;
    use libipld::store::DefaultParams;
    use libipld::{alias, ipld};

    /// Encodes `ipld` as a DAG-CBOR block hashed with Blake3-256.
    fn create_block(ipld: &Ipld) -> Block<DefaultParams> {
        Block::encode(DagCborCodec, Code::Blake3_256, ipld).unwrap()
    }

    /// Asserts that `pinned` reports exactly `$expected` for `$block`.
    macro_rules! assert_pin_state {
        ($store:expr, $block:expr, $expected:expr) => {
            assert_eq!($store.pinned($block.cid()).await.unwrap(), $expected);
        };
    }

    /// The block is no longer known to the store.
    macro_rules! assert_evicted {
        ($store:expr, $block:expr) => {
            assert_pin_state!($store, $block, None);
        };
    }

    /// The block is present and pinned.
    macro_rules! assert_pinned {
        ($store:expr, $block:expr) => {
            assert_pin_state!($store, $block, Some(true));
        };
    }

    /// The block is present but not pinned.
    macro_rules! assert_unpinned {
        ($store:expr, $block:expr) => {
            assert_pin_state!($store, $block, Some(false));
        };
    }

    /// Installs a tracing subscriber filtered by the environment; an error
    /// from `try_init` (e.g. already initialised) is ignored.
    fn tracing_try_init() {
        let filter = tracing_subscriber::EnvFilter::from_default_env();
        let _ = tracing_subscriber::fmt()
            .with_env_filter(filter)
            .try_init();
    }

    /// Opens a service on a temporary sled database with the given cache size
    /// and no background sweep.
    fn open_temp_store(cache_size: usize) -> StorageService<DefaultParams> {
        tracing_try_init();
        let config = sled::Config::new().temporary(true);
        StorageService::open(&config, cache_size, None).unwrap()
    }

    /// With a cache size of two, each eviction pass leaves two blocks behind,
    /// and a block read via `get` survives in preference to one that was not.
    #[async_std::test]
    async fn test_store_evict() {
        let store = open_temp_store(2);
        let blocks = [
            create_block(&ipld!(0)),
            create_block(&ipld!(1)),
            create_block(&ipld!(2)),
            create_block(&ipld!(3)),
        ];

        // Two blocks fit: nothing is evicted.
        for block in &blocks[..2] {
            store.insert(block).unwrap();
        }
        let grace = store.atime() + 1;
        store.evict(grace).await.unwrap();
        assert_unpinned!(&store, &blocks[0]);
        assert_unpinned!(&store, &blocks[1]);

        // A third block pushes the first one out.
        store.insert(&blocks[2]).unwrap();
        let grace = store.atime() + 1;
        store.evict(grace).await.unwrap();
        assert_evicted!(&store, &blocks[0]);
        assert_unpinned!(&store, &blocks[1]);
        assert_unpinned!(&store, &blocks[2]);

        // Reading blocks[1] before the next pass keeps it; blocks[2] goes.
        store.get(&blocks[1]).unwrap();
        store.insert(&blocks[3]).unwrap();
        let grace = store.atime() + 1;
        store.evict(grace).await.unwrap();
        assert_unpinned!(&store, &blocks[1]);
        assert_evicted!(&store, &blocks[2]);
        assert_unpinned!(&store, &blocks[3]);
    }

    /// With a cache size of zero, a grace atime of 0 leaves the block in
    /// place, while a grace atime past the current atime evicts it.
    #[async_std::test]
    async fn test_grace_period() {
        let store = open_temp_store(0);
        let blocks = [create_block(&ipld!(0))];
        store.insert(&blocks[0]).unwrap();

        store.evict(0).await.unwrap();
        assert_unpinned!(&store, &blocks[0]);

        let grace = store.atime() + 1;
        store.evict(grace).await.unwrap();
        assert_evicted!(&store, &blocks[0]);
    }

    /// A block referenced from two aliased roots stays pinned until both
    /// aliases are removed.
    #[async_std::test]
    #[allow(clippy::many_single_char_names)]
    async fn test_store_unpin() {
        let store = open_temp_store(2);
        let a = create_block(&ipld!({ "a": [] }));
        let b = create_block(&ipld!({ "b": [a.cid()] }));
        let c = create_block(&ipld!({ "c": [a.cid()] }));
        let alias_x = alias!(x);
        let alias_y = alias!(y);
        for block in &[&a, &b, &c] {
            store.insert(*block).unwrap();
        }

        store.alias(alias_x, Some(b.cid())).await.unwrap();
        store.alias(alias_y, Some(c.cid())).await.unwrap();
        assert_pinned!(&store, &a);
        assert_pinned!(&store, &b);
        assert_pinned!(&store, &c);

        // Dropping x unpins b only; a is still reachable through c.
        store.alias(alias_x, None).await.unwrap();
        assert_pinned!(&store, &a);
        assert_unpinned!(&store, &b);
        assert_pinned!(&store, &c);

        // Dropping y leaves nothing pinned.
        store.alias(alias_y, None).await.unwrap();
        assert_unpinned!(&store, &a);
        assert_unpinned!(&store, &b);
        assert_unpinned!(&store, &c);
    }

    /// Two aliases pointing at the same root: the root and its child stay
    /// pinned until the second alias is removed.
    #[async_std::test]
    #[allow(clippy::many_single_char_names)]
    async fn test_store_unpin2() {
        let store = open_temp_store(2);
        let a = create_block(&ipld!({ "a": [] }));
        let b = create_block(&ipld!({ "b": [a.cid()] }));
        let alias_x = alias!(x);
        let alias_y = alias!(y);
        for block in &[&a, &b] {
            store.insert(*block).unwrap();
        }

        store.alias(alias_x, Some(b.cid())).await.unwrap();
        store.alias(alias_y, Some(b.cid())).await.unwrap();
        assert_pinned!(&store, &a);
        assert_pinned!(&store, &b);

        // One alias still holds b, so nothing changes.
        store.alias(alias_x, None).await.unwrap();
        assert_pinned!(&store, &a);
        assert_pinned!(&store, &b);

        store.alias(alias_y, None).await.unwrap();
        assert_unpinned!(&store, &a);
        assert_unpinned!(&store, &b);
    }
}