1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use cid::Cid;
4use libipld_cbor::DagCborCodec;
5use libipld_core::{
6 codec::{Codec, Decode, Encode, References},
7 ipld::Ipld,
8 raw::RawCodec,
9};
10use noosphere_common::ConditionalSend;
11use serde::{de::DeserializeOwned, Serialize};
12use std::future::Future;
13use std::{collections::BTreeSet, fmt::Debug};
14use tokio_stream::Stream;
15use ucan::store::UcanStore;
16
17use crate::{BlockStore, KeyValueStore, MemoryStore, Storage};
18
19use async_stream::try_stream;
20
/// Name under which `SphereDb` opens its block store.
pub const BLOCK_STORE: &str = "blocks";
/// Name under which `SphereDb` opens the key/value store of per-block links.
pub const LINK_STORE: &str = "links";
/// Name under which `SphereDb` opens the key/value store of sphere versions.
pub const VERSION_STORE: &str = "versions";
/// Name under which `SphereDb` opens its general-purpose metadata store.
pub const METADATA_STORE: &str = "metadata";

/// The names of all stores that a `SphereDb` opens, in the order: blocks,
/// links, versions, metadata.
pub const SPHERE_DB_STORE_NAMES: &[&str] = &[
    BLOCK_STORE,
    LINK_STORE,
    VERSION_STORE,
    METADATA_STORE,
];
28
29#[derive(Clone, Debug)]
35pub struct SphereDb<S>
36where
37 S: Storage,
38{
39 block_store: S::BlockStore,
40 link_store: S::KeyValueStore,
41 version_store: S::KeyValueStore,
42 metadata_store: S::KeyValueStore,
43}
44
45impl<S> SphereDb<S>
46where
47 S: Storage,
48{
49 pub async fn new(storage: &S) -> Result<SphereDb<S>> {
50 Ok(SphereDb {
51 block_store: storage.get_block_store(BLOCK_STORE).await?,
52 link_store: storage.get_key_value_store(LINK_STORE).await?,
53 version_store: storage.get_key_value_store(VERSION_STORE).await?,
54 metadata_store: storage.get_key_value_store(METADATA_STORE).await?,
55 })
56 }
57
58 pub async fn persist(&mut self, memory_store: &MemoryStore) -> Result<()> {
61 let cids = memory_store.get_stored_cids().await;
62
63 for cid in &cids {
64 let block = memory_store.require_block(cid).await?;
65
66 self.put_block(cid, &block).await?;
67
68 match cid.codec() {
69 codec_id if codec_id == u64::from(DagCborCodec) => {
70 self.put_links::<DagCborCodec>(cid, &block).await?;
71 }
72 codec_id if codec_id == u64::from(RawCodec) => {
73 self.put_links::<RawCodec>(cid, &block).await?;
74 }
75 codec_id => warn!("Unrecognized codec {}; skipping...", codec_id),
76 }
77 }
78 Ok(())
79 }
80
81 pub async fn set_version(&mut self, identity: &str, version: &Cid) -> Result<()> {
83 self.version_store.set_key(identity, version).await
84 }
85
86 pub async fn get_version(&self, identity: &str) -> Result<Option<Cid>> {
88 self.version_store.get_key(identity).await
89 }
90
91 pub async fn flush(&self) -> Result<()> {
93 let (block_store_result, link_store_result, version_store_result, metadata_store_result) = tokio::join!(
94 self.block_store.flush(),
95 self.link_store.flush(),
96 self.version_store.flush(),
97 self.metadata_store.flush()
98 );
99
100 let results = vec![
101 ("block", block_store_result),
102 ("link", link_store_result),
103 ("version", version_store_result),
104 ("metadata", metadata_store_result),
105 ];
106
107 for (store_kind, result) in results {
108 if let Err(error) = result {
109 warn!("Failed to flush {} store: {:?}", store_kind, error);
110 }
111 }
112
113 Ok(())
114 }
115
116 pub async fn require_version(&self, identity: &str) -> Result<Cid> {
119 self.version_store
120 .get_key(identity)
121 .await?
122 .ok_or_else(|| anyhow!("No version was found for sphere {}", identity))
123 }
124
125 pub async fn get_block_links(&self, cid: &Cid) -> Result<Option<Vec<Cid>>> {
127 self.link_store.get_key(&cid.to_string()).await
128 }
129
130 pub fn query_links<'a, F, P>(
138 &'a self,
139 cid: &'a Cid,
140 predicate: P,
141 ) -> impl Stream<Item = Result<Cid>> + 'a
142 where
143 F: Future<Output = Result<bool>>,
144 P: Fn(&Cid) -> F + Send + Sync + 'static,
145 {
146 try_stream! {
147 let mut visited_links = BTreeSet::new();
148 let mut remaining_links = vec![*cid];
149
150 while let Some(cid) = remaining_links.pop() {
151 if visited_links.contains(&cid) {
152 continue;
153 }
154
155 if predicate(&cid).await? {
156 if let Some(mut links) = self.get_block_links(&cid).await? {
157 remaining_links.append(&mut links);
158 }
159
160 yield cid;
161 }
162
163 visited_links.insert(cid);
164 }
165 }
166 }
167
168 pub fn stream_links<'a>(&'a self, cid: &'a Cid) -> impl Stream<Item = Result<Cid>> + 'a {
171 try_stream! {
172 for await cid in self.query_links(cid, |_| async {Ok(true)}) {
173 yield cid?;
174 }
175 }
176 }
177
178 pub fn stream_blocks<'a>(
180 &'a self,
181 cid: &'a Cid,
182 ) -> impl Stream<Item = Result<(Cid, Vec<u8>)>> + 'a {
183 try_stream! {
184 for await cid in self.stream_links(cid) {
185 let cid = cid?;
186 if let Some(block) = self.block_store.get_block(&cid).await? {
187 yield (cid, block);
188 }
189 }
190 }
191 }
192
193 pub fn to_block_store(&self) -> S::BlockStore {
196 self.block_store.clone()
197 }
198}
199
200#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
201#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
202impl<S> BlockStore for SphereDb<S>
203where
204 S: Storage,
205{
206 async fn put_links<C>(&mut self, cid: &Cid, block: &[u8]) -> Result<()>
207 where
208 C: Codec + Default,
209 Ipld: References<C>,
210 {
211 let codec = C::default();
212 let mut links = Vec::new();
213
214 codec.references::<Ipld, _>(block, &mut links)?;
215
216 self.link_store.set_key(&cid.to_string(), links).await?;
217
218 Ok(())
219 }
220
221 async fn put_block(&mut self, cid: &cid::Cid, block: &[u8]) -> Result<()> {
222 self.block_store.put_block(cid, block).await
223 }
224
225 async fn get_block(&self, cid: &cid::Cid) -> Result<Option<Vec<u8>>> {
226 self.block_store.get_block(cid).await
227 }
228}
229
230#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
231#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
232impl<S> KeyValueStore for SphereDb<S>
233where
234 S: Storage,
235{
236 async fn set_key<K, V>(&mut self, key: K, value: V) -> Result<()>
237 where
238 K: AsRef<[u8]> + ConditionalSend,
239 V: Serialize + ConditionalSend,
240 {
241 self.metadata_store.set_key(key, value).await
242 }
243
244 async fn unset_key<K>(&mut self, key: K) -> Result<()>
245 where
246 K: AsRef<[u8]> + ConditionalSend,
247 {
248 self.metadata_store.unset_key(key).await
249 }
250
251 async fn get_key<K, V>(&self, key: K) -> Result<Option<V>>
252 where
253 K: AsRef<[u8]> + ConditionalSend,
254 V: DeserializeOwned + ConditionalSend,
255 {
256 self.metadata_store.get_key(key).await
257 }
258}
259
260#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
261#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
262impl<S> UcanStore for SphereDb<S>
263where
264 S: Storage,
265{
266 async fn read<T: Decode<RawCodec>>(&self, cid: &Cid) -> Result<Option<T>> {
267 self.get::<RawCodec, T>(cid).await
268 }
269
270 async fn write<T: Encode<RawCodec> + ConditionalSend + Debug>(
271 &mut self,
272 token: T,
273 ) -> Result<Cid> {
274 self.put::<RawCodec, T>(token).await
275 }
276}
277
278#[cfg(test)]
279mod tests {
280
281 use libipld_cbor::DagCborCodec;
282 use libipld_core::{ipld::Ipld, raw::RawCodec};
283 use ucan::store::UcanJwtStore;
284 #[cfg(target_arch = "wasm32")]
285 use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure};
286
287 use crate::{block_encode, derive_cid, BlockStore, MemoryStorage, SphereDb};
288
289 use tokio_stream::StreamExt;
290
291 #[cfg(target_arch = "wasm32")]
292 wasm_bindgen_test_configure!(run_in_browser);
293
294 #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
295 #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
296 pub async fn it_stores_links_when_a_block_is_saved() {
297 let storage_provider = MemoryStorage::default();
298 let mut db = SphereDb::new(&storage_provider).await.unwrap();
299
300 let list1 = vec!["cats", "dogs", "pigeons"];
301 let list2 = vec!["apples", "oranges", "starfruit"];
302
303 let cid1 = db.save::<DagCborCodec, _>(&list1).await.unwrap();
304 let cid2 = db.save::<DagCborCodec, _>(&list2).await.unwrap();
305
306 let list3 = vec![cid1, cid2];
307
308 let cid3 = db.save::<DagCborCodec, _>(&list3).await.unwrap();
309
310 let links = db.get_block_links(&cid3).await.unwrap();
311
312 assert_eq!(Some(list3), links);
313 }
314
315 #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
316 #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
317 pub async fn it_can_stream_all_blocks_in_a_dag() {
318 let storage_provider = MemoryStorage::default();
319 let mut db = SphereDb::new(&storage_provider).await.unwrap();
320
321 let list1 = vec!["cats", "dogs", "pigeons"];
322 let list2 = vec!["apples", "oranges", "starfruit"];
323
324 let cid1 = db.save::<DagCborCodec, _>(&list1).await.unwrap();
325 let cid2 = db.save::<DagCborCodec, _>(&list2).await.unwrap();
326
327 let list3 = vec![cid1, cid2];
328
329 let cid3 = db.save::<DagCborCodec, _>(&list3).await.unwrap();
330
331 let stream = db.stream_blocks(&cid3);
332
333 tokio::pin!(stream);
334
335 let mut cids = Vec::new();
336
337 while let Some((cid, block)) = stream.try_next().await.unwrap() {
338 let derived_cid = derive_cid::<DagCborCodec>(&block);
339 assert_eq!(cid, derived_cid);
340 cids.push(cid);
341 }
342
343 assert_eq!(cids.len(), 3);
344
345 for cid in [cid1, cid2, cid3] {
346 assert!(cids.contains(&cid));
347 }
348 }
349
350 #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
351 #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
352 pub async fn it_can_put_a_raw_block_and_read_it_as_a_token() {
353 let storage_provider = MemoryStorage::default();
354 let mut db = SphereDb::new(&storage_provider).await.unwrap();
355
356 let (cid, block) = block_encode::<RawCodec, _>(&Ipld::Bytes(b"foobar".to_vec())).unwrap();
357
358 db.put_block(&cid, &block).await.unwrap();
359
360 let token = db.read_token(&cid).await.unwrap();
361
362 assert_eq!(token, Some("foobar".into()));
363 }
364}