noosphere_storage/
db.rs

1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use cid::Cid;
4use libipld_cbor::DagCborCodec;
5use libipld_core::{
6    codec::{Codec, Decode, Encode, References},
7    ipld::Ipld,
8    raw::RawCodec,
9};
10use noosphere_common::ConditionalSend;
11use serde::{de::DeserializeOwned, Serialize};
12use std::future::Future;
13use std::{collections::BTreeSet, fmt::Debug};
14use tokio_stream::Stream;
15use ucan::store::UcanStore;
16
17use crate::{BlockStore, KeyValueStore, MemoryStore, Storage};
18
19use async_stream::try_stream;
20
/// Name of the store that holds raw content-addressed blocks
pub const BLOCK_STORE: &str = "blocks";
/// Name of the store that records the links extracted from each block
pub const LINK_STORE: &str = "links";
/// Name of the store that records the latest version of each local sphere
pub const VERSION_STORE: &str = "versions";
/// Name of the store that holds purely local metadata
pub const METADATA_STORE: &str = "metadata";

/// Every store name a [SphereDb] uses, listed in the order they are opened
pub const SPHERE_DB_STORE_NAMES: &[&str] = &[
    BLOCK_STORE,
    LINK_STORE,
    VERSION_STORE,
    METADATA_STORE,
];
28
29/// A [SphereDb] is a high-level storage primitive for Noosphere's APIs. It
30/// takes a [Storage] and implements [BlockStore] and [KeyValueStore],
31/// orchestrating writes so that as blocks are stored, links are also extracted
32/// and tracked separately, and also hosting metadata information such as sphere
33/// version records and other purely local configuration
34#[derive(Clone, Debug)]
35pub struct SphereDb<S>
36where
37    S: Storage,
38{
39    block_store: S::BlockStore,
40    link_store: S::KeyValueStore,
41    version_store: S::KeyValueStore,
42    metadata_store: S::KeyValueStore,
43}
44
45impl<S> SphereDb<S>
46where
47    S: Storage,
48{
49    pub async fn new(storage: &S) -> Result<SphereDb<S>> {
50        Ok(SphereDb {
51            block_store: storage.get_block_store(BLOCK_STORE).await?,
52            link_store: storage.get_key_value_store(LINK_STORE).await?,
53            version_store: storage.get_key_value_store(VERSION_STORE).await?,
54            metadata_store: storage.get_key_value_store(METADATA_STORE).await?,
55        })
56    }
57
58    /// Given a [MemoryStore], store copies of all the blocks found within in
59    /// the storage that backs this [SphereDb].
60    pub async fn persist(&mut self, memory_store: &MemoryStore) -> Result<()> {
61        let cids = memory_store.get_stored_cids().await;
62
63        for cid in &cids {
64            let block = memory_store.require_block(cid).await?;
65
66            self.put_block(cid, &block).await?;
67
68            match cid.codec() {
69                codec_id if codec_id == u64::from(DagCborCodec) => {
70                    self.put_links::<DagCborCodec>(cid, &block).await?;
71                }
72                codec_id if codec_id == u64::from(RawCodec) => {
73                    self.put_links::<RawCodec>(cid, &block).await?;
74                }
75                codec_id => warn!("Unrecognized codec {}; skipping...", codec_id),
76            }
77        }
78        Ok(())
79    }
80
81    /// Record the tip of a local sphere lineage as a [Cid]
82    pub async fn set_version(&mut self, identity: &str, version: &Cid) -> Result<()> {
83        self.version_store.set_key(identity, version).await
84    }
85
86    /// Get the most recently recorded tip of a local sphere lineage
87    pub async fn get_version(&self, identity: &str) -> Result<Option<Cid>> {
88        self.version_store.get_key(identity).await
89    }
90
91    /// Manually flush all pending writes to the underlying [Storage]
92    pub async fn flush(&self) -> Result<()> {
93        let (block_store_result, link_store_result, version_store_result, metadata_store_result) = tokio::join!(
94            self.block_store.flush(),
95            self.link_store.flush(),
96            self.version_store.flush(),
97            self.metadata_store.flush()
98        );
99
100        let results = vec![
101            ("block", block_store_result),
102            ("link", link_store_result),
103            ("version", version_store_result),
104            ("metadata", metadata_store_result),
105        ];
106
107        for (store_kind, result) in results {
108            if let Err(error) = result {
109                warn!("Failed to flush {} store: {:?}", store_kind, error);
110            }
111        }
112
113        Ok(())
114    }
115
116    /// Get the most recently recorded tip of a local sphere lineage, returning
117    /// an error if no version has ever been recorded
118    pub async fn require_version(&self, identity: &str) -> Result<Cid> {
119        self.version_store
120            .get_key(identity)
121            .await?
122            .ok_or_else(|| anyhow!("No version was found for sphere {}", identity))
123    }
124
125    /// Get all links referenced by a block given its [Cid]
126    pub async fn get_block_links(&self, cid: &Cid) -> Result<Option<Vec<Cid>>> {
127        self.link_store.get_key(&cid.to_string()).await
128    }
129
130    /// Given a [Cid] root and a predicate function, stream all links that are
131    /// referenced by the root or its descendants (recursively). The predicate
132    /// function is called with each [Cid] before it is yielded by the stream.
133    /// If the predicate returns true, the [Cid] is yielded and its referenced
134    /// links are queued to be yielded later by the stream. If the predicate
135    /// returns false, the [Cid] is skipped and by extension so are its
136    /// referenced links.
137    pub fn query_links<'a, F, P>(
138        &'a self,
139        cid: &'a Cid,
140        predicate: P,
141    ) -> impl Stream<Item = Result<Cid>> + 'a
142    where
143        F: Future<Output = Result<bool>>,
144        P: Fn(&Cid) -> F + Send + Sync + 'static,
145    {
146        try_stream! {
147            let mut visited_links = BTreeSet::new();
148            let mut remaining_links = vec![*cid];
149
150            while let Some(cid) = remaining_links.pop() {
151                if visited_links.contains(&cid) {
152                    continue;
153                }
154
155                if predicate(&cid).await? {
156                    if let Some(mut links) = self.get_block_links(&cid).await? {
157                        remaining_links.append(&mut links);
158                    }
159
160                    yield cid;
161                }
162
163                visited_links.insert(cid);
164            }
165        }
166    }
167
168    /// Stream all links that are referenced from the given root [Cid] or its
169    /// DAG descendants (recursively).
170    pub fn stream_links<'a>(&'a self, cid: &'a Cid) -> impl Stream<Item = Result<Cid>> + 'a {
171        try_stream! {
172            for await cid in self.query_links(cid, |_| async {Ok(true)}) {
173                yield cid?;
174            }
175        }
176    }
177
178    /// Stream all the blocks in the DAG starting at the given root [Cid].
179    pub fn stream_blocks<'a>(
180        &'a self,
181        cid: &'a Cid,
182    ) -> impl Stream<Item = Result<(Cid, Vec<u8>)>> + 'a {
183        try_stream! {
184            for await cid in self.stream_links(cid) {
185                let cid = cid?;
186                if let Some(block) = self.block_store.get_block(&cid).await? {
187                    yield (cid, block);
188                }
189            }
190        }
191    }
192
193    /// Get an owned copy of the underlying primitive [BlockStore] for this
194    /// [SphereDb]
195    pub fn to_block_store(&self) -> S::BlockStore {
196        self.block_store.clone()
197    }
198}
199
200#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
201#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
202impl<S> BlockStore for SphereDb<S>
203where
204    S: Storage,
205{
206    async fn put_links<C>(&mut self, cid: &Cid, block: &[u8]) -> Result<()>
207    where
208        C: Codec + Default,
209        Ipld: References<C>,
210    {
211        let codec = C::default();
212        let mut links = Vec::new();
213
214        codec.references::<Ipld, _>(block, &mut links)?;
215
216        self.link_store.set_key(&cid.to_string(), links).await?;
217
218        Ok(())
219    }
220
221    async fn put_block(&mut self, cid: &cid::Cid, block: &[u8]) -> Result<()> {
222        self.block_store.put_block(cid, block).await
223    }
224
225    async fn get_block(&self, cid: &cid::Cid) -> Result<Option<Vec<u8>>> {
226        self.block_store.get_block(cid).await
227    }
228}
229
230#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
231#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
232impl<S> KeyValueStore for SphereDb<S>
233where
234    S: Storage,
235{
236    async fn set_key<K, V>(&mut self, key: K, value: V) -> Result<()>
237    where
238        K: AsRef<[u8]> + ConditionalSend,
239        V: Serialize + ConditionalSend,
240    {
241        self.metadata_store.set_key(key, value).await
242    }
243
244    async fn unset_key<K>(&mut self, key: K) -> Result<()>
245    where
246        K: AsRef<[u8]> + ConditionalSend,
247    {
248        self.metadata_store.unset_key(key).await
249    }
250
251    async fn get_key<K, V>(&self, key: K) -> Result<Option<V>>
252    where
253        K: AsRef<[u8]> + ConditionalSend,
254        V: DeserializeOwned + ConditionalSend,
255    {
256        self.metadata_store.get_key(key).await
257    }
258}
259
260#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
261#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
262impl<S> UcanStore for SphereDb<S>
263where
264    S: Storage,
265{
266    async fn read<T: Decode<RawCodec>>(&self, cid: &Cid) -> Result<Option<T>> {
267        self.get::<RawCodec, T>(cid).await
268    }
269
270    async fn write<T: Encode<RawCodec> + ConditionalSend + Debug>(
271        &mut self,
272        token: T,
273    ) -> Result<Cid> {
274        self.put::<RawCodec, T>(token).await
275    }
276}
277
#[cfg(test)]
mod tests {
    use libipld_cbor::DagCborCodec;
    use libipld_core::{ipld::Ipld, raw::RawCodec};
    use tokio_stream::StreamExt;
    use ucan::store::UcanJwtStore;
    #[cfg(target_arch = "wasm32")]
    use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure};

    use crate::{block_encode, derive_cid, BlockStore, MemoryStorage, SphereDb};

    #[cfg(target_arch = "wasm32")]
    wasm_bindgen_test_configure!(run_in_browser);

    /// Save two leaf lists plus a parent list that links to both, returning
    /// the leaf CIDs followed by the parent CID.
    async fn save_two_lists_and_a_parent(
        db: &mut SphereDb<MemoryStorage>,
    ) -> (cid::Cid, cid::Cid, cid::Cid) {
        let animals = vec!["cats", "dogs", "pigeons"];
        let fruits = vec!["apples", "oranges", "starfruit"];

        let animals_cid = db.save::<DagCborCodec, _>(&animals).await.unwrap();
        let fruits_cid = db.save::<DagCborCodec, _>(&fruits).await.unwrap();
        let parent_cid = db
            .save::<DagCborCodec, _>(&vec![animals_cid, fruits_cid])
            .await
            .unwrap();

        (animals_cid, fruits_cid, parent_cid)
    }

    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    pub async fn it_stores_links_when_a_block_is_saved() {
        let storage_provider = MemoryStorage::default();
        let mut db = SphereDb::new(&storage_provider).await.unwrap();

        let (animals_cid, fruits_cid, parent_cid) = save_two_lists_and_a_parent(&mut db).await;

        let recorded_links = db.get_block_links(&parent_cid).await.unwrap();

        assert_eq!(recorded_links, Some(vec![animals_cid, fruits_cid]));
    }

    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    pub async fn it_can_stream_all_blocks_in_a_dag() {
        let storage_provider = MemoryStorage::default();
        let mut db = SphereDb::new(&storage_provider).await.unwrap();

        let (animals_cid, fruits_cid, parent_cid) = save_two_lists_and_a_parent(&mut db).await;

        let stream = db.stream_blocks(&parent_cid);
        tokio::pin!(stream);

        let mut seen = Vec::new();
        while let Some((cid, block)) = stream.try_next().await.unwrap() {
            // Every streamed block must hash back to the CID it came with
            assert_eq!(cid, derive_cid::<DagCborCodec>(&block));
            seen.push(cid);
        }

        assert_eq!(seen.len(), 3);
        for expected in [animals_cid, fruits_cid, parent_cid] {
            assert!(seen.contains(&expected));
        }
    }

    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    pub async fn it_can_put_a_raw_block_and_read_it_as_a_token() {
        let storage_provider = MemoryStorage::default();
        let mut db = SphereDb::new(&storage_provider).await.unwrap();

        let payload = Ipld::Bytes(b"foobar".to_vec());
        let (cid, block) = block_encode::<RawCodec, _>(&payload).unwrap();

        db.put_block(&cid, &block).await.unwrap();

        let maybe_token = db.read_token(&cid).await.unwrap();
        assert_eq!(maybe_token, Some("foobar".into()));
    }
}