use std::future::IntoFuture;
use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;
use anyhow::anyhow;
use bytes::Bytes;
use futures::Stream;
use iroh::{Endpoint, NodeId};
use iroh_blobs::{
api::{
blobs::{BlobReader as Reader, BlobStatus, Blobs},
downloader::{Downloader, Shuffled},
ExportBaoError, RequestError,
},
store::{fs::FsStore, mem::MemStore},
BlobsProtocol, Hash,
};
use object_store::ObjectStore as ObjStore;
use crate::{
crypto::PublicKey,
linked_data::{BlockEncoded, CodecError, DagCborCodec},
};
/// Content-addressed blob storage built on top of an iroh-blobs protocol handler.
///
/// Cheap to clone: the protocol handler is shared behind an `Arc`.
#[derive(Clone, Debug)]
pub struct BlobsStore {
// Shared protocol handler; also exposed via `Deref` below.
pub inner: Arc<BlobsProtocol>,
}
// Deref to the inner `Arc<BlobsProtocol>` so callers can invoke protocol
// methods directly on a `BlobsStore` without going through `.inner`.
impl Deref for BlobsStore {
type Target = Arc<BlobsProtocol>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
/// Errors produced by [`BlobsStore`] operations.
#[derive(Debug, thiserror::Error)]
pub enum BlobsStoreError {
/// Catch-all for failures without a dedicated variant (wrapped via `anyhow`).
#[error("blobs store error: {0}")]
Default(#[from] anyhow::Error),
/// Filesystem / stream I/O failure.
#[error("blob store i/o error: {0}")]
Io(#[from] std::io::Error),
/// Failure while exporting BAO-encoded blob data.
#[error("export bao error: {0}")]
ExportBao(#[from] ExportBaoError),
/// Failure of a request to the blobs API.
#[error("request error: {0}")]
Request(#[from] RequestError),
/// DAG-CBOR (or other codec) decode failure.
#[error("decode error: {0}")]
Decode(#[from] CodecError),
/// Failure in the object-store backend.
#[error("object store error: {0}")]
ObjectStore(#[from] object_store::BlobStoreError),
}
impl BlobsStore {
/// Opens a legacy filesystem-backed blob store rooted at `path`.
pub async fn legacy_fs(path: &Path) -> Result<Self, BlobsStoreError> {
    tracing::debug!("BlobsStore::legacy_fs called with path: {:?}", path);
    let fs_store = FsStore::load(path).await?;
    tracing::debug!("BlobsStore::legacy_fs completed loading FsStore");
    Ok(Self {
        inner: Arc::new(BlobsProtocol::new(&fs_store, None)),
    })
}
/// Creates a legacy in-memory blob store (contents are lost on drop).
pub async fn legacy_memory() -> Result<Self, BlobsStoreError> {
    let mem_store = MemStore::new();
    Ok(Self {
        inner: Arc::new(BlobsProtocol::new(&mem_store, None)),
    })
}
/// Opens a local object-store-backed blob store.
///
/// `db_path` holds the metadata database, `objects_path` the blob payloads;
/// `max_import_size` optionally caps the size of a single import.
pub async fn fs(
    db_path: &Path,
    objects_path: &Path,
    max_import_size: Option<u64>,
) -> Result<Self, BlobsStoreError> {
    let local = ObjStore::new_local(db_path, objects_path, max_import_size).await?;
    Ok(Self::from_store(local.into()))
}
/// Creates an ephemeral (in-memory) object-store-backed blob store.
pub async fn memory() -> Result<Self, BlobsStoreError> {
    let ephemeral = ObjStore::new_ephemeral().await?;
    Ok(Self::from_store(ephemeral.into()))
}
/// Opens an S3-compatible object-store-backed blob store.
///
/// `db_path` holds local metadata; the remaining parameters configure the
/// S3 endpoint, credentials, bucket, and optional region. `max_import_size`
/// optionally caps the size of a single import.
#[allow(clippy::too_many_arguments)]
pub async fn s3(
    db_path: &Path,
    endpoint: &str,
    access_key: &str,
    secret_key: &str,
    bucket: &str,
    region: Option<&str>,
    max_import_size: Option<u64>,
) -> Result<Self, BlobsStoreError> {
    let s3_store = ObjStore::new_s3(
        db_path,
        endpoint,
        access_key,
        secret_key,
        bucket,
        region,
        max_import_size,
    )
    .await?;
    Ok(Self::from_store(s3_store.into()))
}
/// Wraps an existing blobs store in a protocol handler.
pub fn from_store(store: iroh_blobs::api::Store) -> Self {
    Self {
        inner: Arc::new(BlobsProtocol::new(&store, None)),
    }
}
/// Returns the blobs API handle of the underlying store.
pub fn blobs(&self) -> &Blobs {
self.inner.store().blobs()
}
/// Fetches the complete contents of the blob identified by `hash`.
pub async fn get(&self, hash: &Hash) -> Result<Bytes, BlobsStoreError> {
    Ok(self.blobs().get_bytes(*hash).await?)
}
/// Fetches a blob and decodes it as a DAG-CBOR-encoded value of type `T`.
pub async fn get_cbor<T: BlockEncoded<DagCborCodec>>(
    &self,
    hash: &Hash,
) -> Result<T, BlobsStoreError> {
    let raw = self.blobs().get_bytes(*hash).await?;
    let value = T::decode(&raw)?;
    Ok(value)
}
/// Returns a streaming reader over the blob identified by `hash`.
pub async fn get_reader(&self, hash: Hash) -> Result<Reader, BlobsStoreError> {
    Ok(self.blobs().reader(hash))
}
/// Imports a byte stream into the store and returns the resulting hash.
pub async fn put_stream(
    &self,
    stream: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + Unpin + 'static,
) -> Result<Hash, BlobsStoreError> {
    // `add_stream` yields progress; awaiting it, then tagging, pins the blob
    // and produces the final import outcome.
    let progress = self.blobs().add_stream(stream).into_future().await;
    let tagged = progress.with_tag().await?;
    Ok(tagged.hash)
}
/// Stores `data` as a blob and returns its content hash.
pub async fn put(&self, data: Vec<u8>) -> Result<Hash, BlobsStoreError> {
    let outcome = self.blobs().add_bytes(data).into_future().await?;
    Ok(outcome.hash)
}
/// Returns `true` when the blob is fully present in the local store.
///
/// Partial or absent blobs report `false`.
pub async fn stat(&self, hash: &Hash) -> Result<bool, BlobsStoreError> {
    // NOTE(review): the error is wrapped via `anyhow` rather than `?`;
    // presumably `status`'s error type has no direct `From` conversion — confirm.
    let status = self
        .blobs()
        .status(*hash)
        .await
        .map_err(|err| BlobsStoreError::Default(anyhow!(err)))?;
    Ok(matches!(status, BlobStatus::Complete { .. }))
}
/// Downloads the blob `hash` from the given peers into the local store.
///
/// Short-circuits when the blob is already complete locally. After a
/// successful download the blob's presence is re-verified via [`Self::stat`];
/// a blob that is still missing at that point is surfaced as an error.
pub async fn download_hash(
&self,
hash: Hash,
peer_ids: Vec<PublicKey>,
endpoint: &Endpoint,
) -> Result<(), BlobsStoreError> {
tracing::debug!("download_hash: Checking if hash {} exists locally", hash);
// Skip the network round-trip entirely when the blob is already complete.
if self.stat(&hash).await? {
tracing::debug!(
"download_hash: Hash {} already exists locally, skipping download",
hash
);
return Ok(());
}
tracing::info!(
"download_hash: Downloading hash {} from {} peers: {:?}",
hash,
peer_ids.len(),
peer_ids
);
// A fresh downloader is created per call; `Shuffled` randomizes the order
// in which candidate peers are tried.
let downloader = Downloader::new(self.inner.store(), endpoint);
let discovery = Shuffled::new(
peer_ids
.iter()
.map(|peer_id| NodeId::from(*peer_id))
.collect(),
);
tracing::debug!(
"download_hash: Starting download of hash {} with downloader",
hash
);
match downloader.download(hash, discovery).await {
Ok(_) => {
tracing::info!("download_hash: Successfully downloaded hash {}", hash);
// Defensive re-check: make sure the blob really landed in the store.
match self.stat(&hash).await {
Ok(true) => tracing::debug!(
"download_hash: Verified hash {} exists after download",
hash
),
Ok(false) => {
tracing::error!("download_hash: Hash {} NOT found after download!", hash);
return Err(anyhow!("Hash not found after download").into());
}
Err(e) => {
tracing::error!("download_hash: Error verifying hash {}: {}", hash, e);
return Err(e);
}
}
}
Err(e) => {
tracing::error!(
"download_hash: Failed to download hash {} from peers {:?}: {}",
hash,
peer_ids,
e
);
return Err(e.into());
}
}
Ok(())
}
/// Downloads a hash-list blob and then every blob it references.
///
/// The list blob (see [`Self::create_hash_list`]) is fetched first, decoded
/// via [`Self::read_hash_list`], and each referenced hash is then downloaded
/// from the same peers. Fails fast on the first hash that cannot be fetched.
// NOTE(review): content hashes are downloaded sequentially; confirm whether
// ordering matters before parallelizing.
pub async fn download_hash_list(
&self,
hash_list_hash: Hash,
peer_ids: Vec<PublicKey>,
endpoint: &Endpoint,
) -> Result<(), BlobsStoreError> {
tracing::debug!(
"download_hash_list: Starting download of hash list {} from {} peers",
hash_list_hash,
peer_ids.len()
);
tracing::debug!("download_hash_list: Downloading hash list blob itself");
self.download_hash(hash_list_hash, peer_ids.clone(), endpoint)
.await?;
tracing::debug!("download_hash_list: Hash list blob downloaded successfully");
// Defensive re-check (download_hash also verifies): ensure the list blob
// is actually present before trying to decode it.
match self.stat(&hash_list_hash).await {
Ok(true) => tracing::debug!(
"download_hash_list: Verified hash list blob {} exists",
hash_list_hash
),
Ok(false) => {
tracing::error!(
"download_hash_list: Hash list blob {} NOT found after download!",
hash_list_hash
);
return Err(anyhow!("Hash list blob not found after download").into());
}
Err(e) => {
tracing::error!("download_hash_list: Error checking hash list blob: {}", e);
return Err(e);
}
}
tracing::debug!("download_hash_list: Reading hash list contents");
let hashes = self.read_hash_list(hash_list_hash).await?;
tracing::info!(
"download_hash_list: Hash list contains {} hashes, downloading all...",
hashes.len()
);
// An empty list is valid: nothing further to fetch.
if hashes.is_empty() {
tracing::warn!("download_hash_list: Hash list is EMPTY - no content to download");
return Ok(());
}
for (idx, hash) in hashes.iter().enumerate() {
tracing::debug!(
"download_hash_list: Downloading content hash {}/{}: {:?}",
idx + 1,
hashes.len(),
hash
);
match self.download_hash(*hash, peer_ids.clone(), endpoint).await {
Ok(()) => {
tracing::debug!(
"download_hash_list: Content hash {}/{} downloaded successfully",
idx + 1,
hashes.len()
);
}
Err(e) => {
tracing::error!(
"download_hash_list: Failed to download content hash {}/{} ({:?}): {}",
idx + 1,
hashes.len(),
hash,
e
);
return Err(e);
}
}
}
tracing::info!(
"download_hash_list: Successfully downloaded all {} hashes from hash list",
hashes.len()
);
Ok(())
}
/// Serializes an ordered list of hashes into a single blob and returns the
/// blob's hash.
///
/// Each hash is stored as its raw 32 bytes, concatenated in iteration order;
/// the blob can be decoded again with [`Self::read_hash_list`].
pub async fn create_hash_list<I>(&self, hashes: I) -> Result<Hash, BlobsStoreError>
where
    I: IntoIterator<Item = Hash>,
{
    let iter = hashes.into_iter();
    // Preallocate from the iterator's size hint (32 bytes per hash) to avoid
    // repeated grow-and-copy for large lists; a hint of 0 simply means no
    // preallocation, which is still correct.
    let (lower, _) = iter.size_hint();
    let mut data = Vec::with_capacity(lower.saturating_mul(32));
    for hash in iter {
        data.extend_from_slice(hash.as_bytes());
    }
    self.put(data).await
}
/// Decodes a blob written by [`Self::create_hash_list`] back into its list
/// of hashes.
pub async fn read_hash_list(&self, list_hash: Hash) -> Result<Vec<Hash>, BlobsStoreError> {
    let data = self.get(&list_hash).await?;
    // Every entry is exactly 32 bytes; any other total length means the blob
    // is not a valid hash list.
    if data.len() % 32 != 0 {
        return Err(anyhow!("Invalid hash list: length is not a multiple of 32").into());
    }
    let hashes = data
        .chunks_exact(32)
        .map(|chunk| {
            let mut bytes = [0u8; 32];
            bytes.copy_from_slice(chunk);
            Hash::from_bytes(bytes)
        })
        .collect();
    Ok(hashes)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use futures::stream;
    use tempfile::TempDir;

    /// Builds a fresh filesystem-backed store inside a temporary directory.
    /// The `TempDir` is returned so it stays alive for the test's duration.
    async fn setup_test_store() -> (BlobsStore, TempDir) {
        let dir = TempDir::new().unwrap();
        let store = BlobsStore::fs(
            &dir.path().join("blobs.db"),
            &dir.path().join("objects"),
            None,
        )
        .await
        .unwrap();
        (store, dir)
    }

    #[tokio::test]
    async fn test_put_and_get() {
        let (store, _dir) = setup_test_store().await;
        let payload = b"Hello, BlobsStore!";
        let hash = store.put(payload.to_vec()).await.unwrap();
        assert!(!hash.as_bytes().is_empty());
        let fetched = store.get(&hash).await.unwrap();
        assert_eq!(fetched.as_ref(), payload);
    }

    #[tokio::test]
    async fn test_put_stream() {
        let (store, _dir) = setup_test_store().await;
        let payload = b"Streaming data test";
        let source =
            stream::once(async move { Ok::<_, std::io::Error>(Bytes::from(payload.to_vec())) });
        let hash = store.put_stream(Box::pin(source)).await.unwrap();
        assert_eq!(store.get(&hash).await.unwrap().as_ref(), payload);
    }

    #[tokio::test]
    async fn test_stat() {
        let (store, _dir) = setup_test_store().await;
        let hash = store.put(b"Test data for stat".to_vec()).await.unwrap();
        assert!(store.stat(&hash).await.unwrap());
        // A hash that was never stored must report incomplete.
        let missing = iroh_blobs::Hash::from_bytes([0u8; 32]);
        assert!(!store.stat(&missing).await.unwrap());
    }

    #[tokio::test]
    async fn test_large_data() {
        let (store, _dir) = setup_test_store().await;
        let payload = vec![42u8; 1024 * 1024];
        let hash = store.put(payload.clone()).await.unwrap();
        let fetched = store.get(&hash).await.unwrap();
        assert_eq!(fetched.len(), payload.len());
        assert_eq!(fetched.as_ref(), payload.as_slice());
    }

    #[tokio::test]
    async fn test_multiple_puts() {
        let (store, _dir) = setup_test_store().await;
        let (a, b, c) = (b"First data", b"Second data", b"Third data");
        let ha = store.put(a.to_vec()).await.unwrap();
        let hb = store.put(b.to_vec()).await.unwrap();
        let hc = store.put(c.to_vec()).await.unwrap();
        // Distinct contents must hash to distinct values.
        assert_ne!(ha, hb);
        assert_ne!(hb, hc);
        assert_ne!(ha, hc);
        assert_eq!(store.get(&ha).await.unwrap().as_ref(), a);
        assert_eq!(store.get(&hb).await.unwrap().as_ref(), b);
        assert_eq!(store.get(&hc).await.unwrap().as_ref(), c);
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let (store, _dir) = setup_test_store().await;
        let missing = iroh_blobs::Hash::from_bytes([99u8; 32]);
        assert!(store.get(&missing).await.is_err());
    }
}