use crate::IpfsClient;
use anyhow::Result;
use async_trait::async_trait;
use cid::Cid;
use noosphere_storage::{BlockStore, Storage};
use std::sync::Arc;
use tokio::sync::RwLock;
#[cfg(doc)]
use noosphere_storage::KeyValueStore;
#[derive(Clone, Debug)]
pub struct IpfsStorage<S, C>
where
S: Storage,
C: IpfsClient,
{
local_storage: S,
ipfs_client: Option<C>,
}
impl<S, C> IpfsStorage<S, C>
where
S: Storage,
C: IpfsClient,
{
pub fn new(local_storage: S, ipfs_client: Option<C>) -> Self {
IpfsStorage {
local_storage,
ipfs_client,
}
}
}
#[cfg(not(target_arch = "wasm32"))]
pub trait IpfsStorageConditionalSendSync: Send + Sync {}
#[cfg(not(target_arch = "wasm32"))]
impl<S> IpfsStorageConditionalSendSync for S where S: Send + Sync {}
#[cfg(target_arch = "wasm32")]
pub trait IpfsStorageConditionalSendSync {}
#[cfg(target_arch = "wasm32")]
impl<S> IpfsStorageConditionalSendSync for S {}
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<S, C> Storage for IpfsStorage<S, C>
where
S: Storage + IpfsStorageConditionalSendSync,
C: IpfsClient + IpfsStorageConditionalSendSync,
{
type BlockStore = IpfsStore<S::BlockStore, C>;
type KeyValueStore = S::KeyValueStore;
async fn get_block_store(&self, name: &str) -> Result<Self::BlockStore> {
let store = self.local_storage.get_block_store(name).await?;
Ok(IpfsStore::new(store, self.ipfs_client.clone()))
}
async fn get_key_value_store(&self, name: &str) -> Result<Self::KeyValueStore> {
self.local_storage.get_key_value_store(name).await
}
}
#[derive(Clone)]
pub struct IpfsStore<B, C>
where
B: BlockStore,
C: IpfsClient + IpfsStorageConditionalSendSync,
{
local_store: Arc<RwLock<B>>,
ipfs_client: Option<C>,
}
impl<B, C> IpfsStore<B, C>
where
B: BlockStore,
C: IpfsClient + IpfsStorageConditionalSendSync,
{
pub fn new(block_store: B, ipfs_client: Option<C>) -> Self {
IpfsStore {
local_store: Arc::new(RwLock::new(block_store)),
ipfs_client,
}
}
}
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<B, C> BlockStore for IpfsStore<B, C>
where
B: BlockStore,
C: IpfsClient + IpfsStorageConditionalSendSync,
{
#[instrument(skip(self), level = "trace")]
async fn put_block(&mut self, cid: &Cid, block: &[u8]) -> Result<()> {
let mut local_store = self.local_store.write().await;
local_store.put_block(cid, block).await
}
#[instrument(skip(self), level = "trace")]
async fn get_block(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
trace!("IpfsStore: Getting block {}...", cid);
let maybe_block = {
let local_store = self.local_store.read().await;
local_store.get_block(cid).await?
};
if let Some(block) = maybe_block {
trace!("IpfsStore: Got block locally {}", cid);
return Ok(Some(block));
}
if let Some(ipfs_client) = self.ipfs_client.as_ref() {
trace!("IpfsStore: Querying IPFS for {}...", cid);
if let Some(bytes) = ipfs_client.get_block(cid).await? {
trace!("IpfsStore: Got block via IPFS {}", cid);
let mut local_store = self.local_store.write().await;
local_store.put_block(cid, &bytes).await?;
return Ok(Some(bytes));
}
}
Ok(None)
}
}
#[cfg(all(test, feature = "test_kubo"))]
mod tests {
use super::*;
use crate::KuboClient;
use libipld_cbor::DagCborCodec;
use noosphere_core::tracing::initialize_tracing;
use noosphere_storage::{block_serialize, BlockStoreRetry, MemoryStore};
use rand::prelude::*;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use url::Url;
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
struct TestData {
value_a: i64,
value_b: i64,
}
#[tokio::test]
pub async fn it_fails_gracefully_if_block_not_found() {
initialize_tracing(None);
let mut rng = thread_rng();
let foo = TestData {
value_a: rng.gen(),
value_b: rng.gen(),
};
let (foo_cid, _) = block_serialize::<DagCborCodec, _>(foo.clone()).unwrap();
let ipfs_url = Url::parse("http://127.0.0.1:5001").unwrap();
let kubo_client = KuboClient::new(&ipfs_url).unwrap();
let ipfs_store = {
let inner = MemoryStore::default();
let inner = IpfsStore::new(inner, Some(kubo_client));
BlockStoreRetry::new(inner, 3u32, Duration::new(0, 100))
};
assert!(ipfs_store.get_block(&foo_cid).await.is_err());
}
}