noosphere_ipfs/
storage.rs

1use crate::IpfsClient;
2use anyhow::Result;
3use async_trait::async_trait;
4use cid::Cid;
5use noosphere_common::ConditionalSync;
6use noosphere_storage::{BlockStore, Storage};
7use std::sync::Arc;
8use tokio::sync::RwLock;
9
10#[cfg(doc)]
11use noosphere_storage::KeyValueStore;
12
13/// [IpfsStorage] is an implementation of [Storage] that wraps another
14/// implementation of [Storage] and an [IpfsClient].
15/// [IpfsStorage] is generic over [BlockStore] and [KeyValueStore]
16/// but will produce a [IpfsStore] wrapped [BlockStore]
17#[derive(Clone, Debug)]
18pub struct IpfsStorage<S, C>
19where
20    S: Storage,
21    C: IpfsClient,
22{
23    local_storage: S,
24    ipfs_client: Option<C>,
25}
26
27impl<S, C> IpfsStorage<S, C>
28where
29    S: Storage,
30    C: IpfsClient,
31{
32    pub fn new(local_storage: S, ipfs_client: Option<C>) -> Self {
33        IpfsStorage {
34            local_storage,
35            ipfs_client,
36        }
37    }
38}
39
40#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
41#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
42impl<S, C> Storage for IpfsStorage<S, C>
43where
44    S: Storage + ConditionalSync,
45    C: IpfsClient + ConditionalSync,
46{
47    type BlockStore = IpfsStore<S::BlockStore, C>;
48
49    type KeyValueStore = S::KeyValueStore;
50
51    async fn get_block_store(&self, name: &str) -> Result<Self::BlockStore> {
52        let store = self.local_storage.get_block_store(name).await?;
53        Ok(IpfsStore::new(store, self.ipfs_client.clone()))
54    }
55
56    async fn get_key_value_store(&self, name: &str) -> Result<Self::KeyValueStore> {
57        self.local_storage.get_key_value_store(name).await
58    }
59}
60
61/// An implementation of [BlockStore] that wraps some other implementation of
62/// same. It forwards most behavior to its wrapped implementation, except when
63/// reading blocks. In that case, if a block cannot be found locally, it will
64/// attempt to fail-over by requesting the block from a configured IPFS gateway
65/// API. If the block is found, it is added to local storage and then returned
66/// as normal
67#[derive(Clone)]
68pub struct IpfsStore<B, C>
69where
70    B: BlockStore,
71    C: IpfsClient + ConditionalSync,
72{
73    local_store: Arc<RwLock<B>>,
74    ipfs_client: Option<C>,
75}
76
77impl<B, C> IpfsStore<B, C>
78where
79    B: BlockStore,
80    C: IpfsClient + ConditionalSync,
81{
82    pub fn new(block_store: B, ipfs_client: Option<C>) -> Self {
83        IpfsStore {
84            local_store: Arc::new(RwLock::new(block_store)),
85            ipfs_client,
86        }
87    }
88}
89
90#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
91#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
92impl<B, C> BlockStore for IpfsStore<B, C>
93where
94    B: BlockStore,
95    C: IpfsClient + ConditionalSync,
96{
97    #[instrument(skip(self), level = "trace")]
98    async fn put_block(&mut self, cid: &Cid, block: &[u8]) -> Result<()> {
99        let mut local_store = self.local_store.write().await;
100        local_store.put_block(cid, block).await
101    }
102
103    #[instrument(skip(self), level = "trace")]
104    async fn get_block(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
105        trace!("Looking up block locally...");
106        let maybe_block = {
107            let local_store = self.local_store.read().await;
108            local_store.get_block(cid).await?
109        };
110
111        if let Some(block) = maybe_block {
112            trace!("Found block locally!");
113            return Ok(Some(block));
114        }
115
116        trace!("Block not available locally...");
117
118        if let Some(ipfs_client) = self.ipfs_client.as_ref() {
119            trace!("Looking up block in IPFS...");
120            if let Some(bytes) = ipfs_client.get_block(cid).await? {
121                trace!("Found block in IPFS!");
122                let mut local_store = self.local_store.write().await;
123                local_store.put_block(cid, &bytes).await?;
124                return Ok(Some(bytes));
125            }
126        }
127        Ok(None)
128    }
129}
130
131// Note that these tests require that there is a locally available IPFS Kubo
132// node running with the RPC API enabled
133#[cfg(all(test, feature = "test-kubo", not(target_arch = "wasm32")))]
134mod tests {
135    use std::time::Duration;
136
137    use super::*;
138    use crate::KuboClient;
139    use libipld_cbor::DagCborCodec;
140    use noosphere_core::tracing::initialize_tracing;
141    use noosphere_storage::{block_serialize, BlockStoreRetry, MemoryStore};
142    use rand::prelude::*;
143    use serde::{Deserialize, Serialize};
144    use url::Url;
145
146    #[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
147    struct TestData {
148        value_a: i64,
149        value_b: i64,
150    }
151
152    /// Fetching a block from IPFS that isn't already on IPFS can hang
153    /// indefinitely. This test ensures that [BlockStoreRetry] wraps
154    /// [IpfsStore] successfully, producing an error.
155    #[tokio::test]
156    pub async fn it_fails_gracefully_if_block_not_found() {
157        initialize_tracing(None);
158
159        let mut rng = thread_rng();
160        let foo = TestData {
161            // uniquely generate value such that
162            // it is not found on the IPFS network.
163            value_a: rng.gen(),
164            value_b: rng.gen(),
165        };
166
167        let (foo_cid, _) = block_serialize::<DagCborCodec, _>(foo.clone()).unwrap();
168
169        let ipfs_url = Url::parse("http://127.0.0.1:5001").unwrap();
170        let kubo_client = KuboClient::new(&ipfs_url).unwrap();
171        let ipfs_store = {
172            let inner = MemoryStore::default();
173            let inner = IpfsStore::new(inner, Some(kubo_client));
174            BlockStoreRetry {
175                store: inner,
176                maximum_retries: 1,
177                attempt_window: Duration::from_millis(100),
178                minimum_delay: Duration::from_millis(100),
179                backoff: None,
180            }
181        };
182
183        assert!(ipfs_store.get_block(&foo_cid).await.is_err());
184    }
185}