// noosphere_ipfs/storage.rs
use crate::IpfsClient;
use anyhow::Result;
use async_trait::async_trait;
use cid::Cid;
use noosphere_common::ConditionalSync;
use noosphere_storage::{BlockStore, Storage};
use std::sync::Arc;
use tokio::sync::RwLock;

#[cfg(doc)]
use noosphere_storage::KeyValueStore;
12
13#[derive(Clone, Debug)]
18pub struct IpfsStorage<S, C>
19where
20 S: Storage,
21 C: IpfsClient,
22{
23 local_storage: S,
24 ipfs_client: Option<C>,
25}
26
27impl<S, C> IpfsStorage<S, C>
28where
29 S: Storage,
30 C: IpfsClient,
31{
32 pub fn new(local_storage: S, ipfs_client: Option<C>) -> Self {
33 IpfsStorage {
34 local_storage,
35 ipfs_client,
36 }
37 }
38}
39
40#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
41#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
42impl<S, C> Storage for IpfsStorage<S, C>
43where
44 S: Storage + ConditionalSync,
45 C: IpfsClient + ConditionalSync,
46{
47 type BlockStore = IpfsStore<S::BlockStore, C>;
48
49 type KeyValueStore = S::KeyValueStore;
50
51 async fn get_block_store(&self, name: &str) -> Result<Self::BlockStore> {
52 let store = self.local_storage.get_block_store(name).await?;
53 Ok(IpfsStore::new(store, self.ipfs_client.clone()))
54 }
55
56 async fn get_key_value_store(&self, name: &str) -> Result<Self::KeyValueStore> {
57 self.local_storage.get_key_value_store(name).await
58 }
59}
60
61#[derive(Clone)]
68pub struct IpfsStore<B, C>
69where
70 B: BlockStore,
71 C: IpfsClient + ConditionalSync,
72{
73 local_store: Arc<RwLock<B>>,
74 ipfs_client: Option<C>,
75}
76
77impl<B, C> IpfsStore<B, C>
78where
79 B: BlockStore,
80 C: IpfsClient + ConditionalSync,
81{
82 pub fn new(block_store: B, ipfs_client: Option<C>) -> Self {
83 IpfsStore {
84 local_store: Arc::new(RwLock::new(block_store)),
85 ipfs_client,
86 }
87 }
88}
89
90#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
91#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
92impl<B, C> BlockStore for IpfsStore<B, C>
93where
94 B: BlockStore,
95 C: IpfsClient + ConditionalSync,
96{
97 #[instrument(skip(self), level = "trace")]
98 async fn put_block(&mut self, cid: &Cid, block: &[u8]) -> Result<()> {
99 let mut local_store = self.local_store.write().await;
100 local_store.put_block(cid, block).await
101 }
102
103 #[instrument(skip(self), level = "trace")]
104 async fn get_block(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
105 trace!("Looking up block locally...");
106 let maybe_block = {
107 let local_store = self.local_store.read().await;
108 local_store.get_block(cid).await?
109 };
110
111 if let Some(block) = maybe_block {
112 trace!("Found block locally!");
113 return Ok(Some(block));
114 }
115
116 trace!("Block not available locally...");
117
118 if let Some(ipfs_client) = self.ipfs_client.as_ref() {
119 trace!("Looking up block in IPFS...");
120 if let Some(bytes) = ipfs_client.get_block(cid).await? {
121 trace!("Found block in IPFS!");
122 let mut local_store = self.local_store.write().await;
123 local_store.put_block(cid, &bytes).await?;
124 return Ok(Some(bytes));
125 }
126 }
127 Ok(None)
128 }
129}
130
// Compiled only for native test builds with the `test-kubo` feature enabled.
// The test talks to an IPFS HTTP API at 127.0.0.1:5001 (presumably a locally
// running Kubo node; confirm against the project's test setup).
#[cfg(all(test, feature = "test-kubo", not(target_arch = "wasm32")))]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::KuboClient;
    use libipld_cbor::DagCborCodec;
    use noosphere_core::tracing::initialize_tracing;
    use noosphere_storage::{block_serialize, BlockStoreRetry, MemoryStore};
    use rand::prelude::*;
    use serde::{Deserialize, Serialize};
    use url::Url;

    /// Small serializable payload used only to derive a CID.
    #[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
    struct TestData {
        value_a: i64,
        value_b: i64,
    }

    /// Requesting a CID that was never stored must produce an error from the
    /// retry-wrapped [`IpfsStore`].
    #[tokio::test]
    pub async fn it_fails_gracefully_if_block_not_found() {
        initialize_tracing(None);

        // Random field values; the payload is serialized only to obtain its
        // CID and is never written to any store in this test.
        let mut rng = thread_rng();
        let unstored = TestData {
            value_a: rng.gen(),
            value_b: rng.gen(),
        };
        let (missing_cid, _) = block_serialize::<DagCborCodec, _>(unstored).unwrap();

        let kubo_api = Url::parse("http://127.0.0.1:5001").unwrap();
        let kubo_client = KuboClient::new(&kubo_api).unwrap();

        // Empty in-memory local store with the Kubo client as fallback,
        // wrapped in a retry layer configured for a single retry and short
        // timings.
        let fallback_store = IpfsStore::new(MemoryStore::default(), Some(kubo_client));
        let ipfs_store = BlockStoreRetry {
            store: fallback_store,
            maximum_retries: 1,
            attempt_window: Duration::from_millis(100),
            minimum_delay: Duration::from_millis(100),
            backoff: None,
        };

        let lookup = ipfs_store.get_block(&missing_cid).await;
        assert!(lookup.is_err());
    }
}