noosphere_storage/
block.rs

1use std::io::Cursor;
2
3use anyhow::{anyhow, Result};
4use async_trait::async_trait;
5use cid::{
6    multihash::{Code, MultihashDigest},
7    Cid,
8};
9use libipld_core::{
10    codec::References,
11    serde::{from_ipld, to_ipld},
12};
13use libipld_core::{
14    codec::{Codec, Decode, Encode},
15    ipld::Ipld,
16};
17use noosphere_common::{ConditionalSend, ConditionalSync};
18use serde::{de::DeserializeOwned, Serialize};
19
20#[cfg(doc)]
21use serde::Deserialize;
22
23/// An interface for storage backends that are suitable for storing blocks. A
24/// block is a chunk of bytes that can be addressed by a
25/// [CID](https://docs.ipfs.tech/concepts/content-addressing/#identifier-formats).
26/// Any backend that implements this trait should be able to reliably store and
27/// retrieve blocks given a [Cid].
28#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
29#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
30pub trait BlockStore: Clone + ConditionalSync {
31    /// Given a CID and a block, store the links (any [Cid] that is part of the
32    /// encoded data) in a suitable location for later retrieval. This method is
33    /// optional, and its default implementation is a no-op. It should be
34    /// implemented when possible to enable optimized traversal of a DAG given
35    /// its root.
36    #[allow(unused_variables)]
37    async fn put_links<C>(&mut self, cid: &Cid, block: &[u8]) -> Result<()>
38    where
39        C: Codec + Default,
40        Ipld: References<C>,
41    {
42        Ok(())
43    }
44
45    /// Given a block and its [Cid], persist the block in storage.
46    async fn put_block(&mut self, cid: &Cid, block: &[u8]) -> Result<()>;
47
48    /// Given the [Cid] of a block, retrieve the block bytes from storage.
49    async fn get_block(&self, cid: &Cid) -> Result<Option<Vec<u8>>>;
50
51    /// Given some data structure that implements [Encode] for a given [Codec],
52    /// encode it as a block and persist it to storage for later retrieval by
53    /// [Cid].
54    async fn put<C, T>(&mut self, data: T) -> Result<Cid>
55    where
56        C: Codec + Default,
57        T: Encode<C> + ConditionalSend,
58        Ipld: References<C>,
59    {
60        let codec = C::default();
61        let block = codec.encode(&data)?;
62        let cid = Cid::new_v1(codec.into(), Code::Blake3_256.digest(&block));
63
64        self.put_block(&cid, &block).await?;
65        self.put_links::<C>(&cid, &block).await?;
66
67        Ok(cid)
68    }
69
70    /// Given the [Cid] of a block that refers to a type that implements
71    /// [Decode] for some [Codec], retrieve the block, decode it as the type and
72    /// return the result.
73    async fn get<C, T>(&self, cid: &Cid) -> Result<Option<T>>
74    where
75        C: Codec + Default,
76        T: Decode<C>,
77    {
78        let codec = C::default();
79        let block = self.get_block(cid).await?;
80
81        Ok(match block {
82            Some(bytes) => Some(T::decode(codec, &mut Cursor::new(bytes))?),
83            None => None,
84        })
85    }
86
87    /// Given some data structure that implements [Serialize], convert it to an
88    /// [IPLD](https://ipld.io/docs/)-compatible representation, encode it as a
89    /// block with the desired [Codec] and persist it to the storage backend by
90    /// its [Cid]
91    async fn save<C, T>(&mut self, data: T) -> Result<Cid>
92    where
93        C: Codec + Default,
94        T: Serialize + ConditionalSend,
95        Ipld: Encode<C> + References<C>,
96    {
97        self.put::<C, Ipld>(to_ipld(data)?).await
98    }
99
100    /// Given a [Cid] that refers to some data structure that implements
101    /// [Deserialize], read the block bytes from storage, decode it as
102    /// [IPLD](https://ipld.io/docs/) using the specified [Codec] and and
103    /// deserialize it to the intended data structure, returning the result.
104    async fn load<C, T>(&self, cid: &Cid) -> Result<T>
105    where
106        C: Codec + Default,
107        T: DeserializeOwned + ConditionalSend,
108        u64: From<C>,
109        Ipld: Decode<C>,
110    {
111        let codec = u64::from(C::default());
112
113        if cid.codec() != codec {
114            return Err(anyhow!(
115                "Incorrect codec; expected {}, but CID refers to {}",
116                codec,
117                cid.codec()
118            ));
119        }
120
121        Ok(match self.get::<C, Ipld>(cid).await? {
122            Some(ipld) => from_ipld(ipld)?,
123            None => return Err(anyhow!("No block found for {}", cid)),
124        })
125    }
126
127    /// Same as load, but returns an error if no block is found locally for the
128    /// given [Cid]
129    async fn require_block(&self, cid: &Cid) -> Result<Vec<u8>> {
130        match self.get_block(cid).await? {
131            Some(block) => Ok(block),
132            None => Err(anyhow!("Block {cid} was required but not found")),
133        }
134    }
135
136    /// Flushes pending writes if there are any
137    async fn flush(&self) -> Result<()> {
138        Ok(())
139    }
140}