1use std::future::IntoFuture;
2use std::ops::Deref;
3use std::path::Path;
4use std::sync::Arc;
5
6use anyhow::anyhow;
7use bytes::Bytes;
8use futures::Stream;
9use iroh::{Endpoint, NodeId};
10use iroh_blobs::{
11 api::{
12 blobs::{BlobReader as Reader, BlobStatus, Blobs},
13 downloader::{Downloader, Shuffled},
14 ExportBaoError, RequestError,
15 },
16 store::{fs::FsStore, mem::MemStore},
17 BlobsProtocol, Hash,
18};
19
20use object_store::ObjectStore as ObjStore;
21
22use crate::{
23 crypto::PublicKey,
24 linked_data::{BlockEncoded, CodecError, DagCborCodec},
25};
26
/// Content-addressed blob storage backed by the iroh-blobs protocol.
///
/// Cheaply cloneable: the only field is a shared [`BlobsProtocol`] handle
/// behind an `Arc`, so `Clone` is a refcount bump.
#[derive(Clone, Debug)]
pub struct BlobsStore {
    // Shared protocol handle; all operations delegate to this.
    pub inner: Arc<BlobsProtocol>,
}
38
/// Deref pass-through to the inner `Arc<BlobsProtocol>` so callers can invoke
/// protocol methods directly on a `BlobsStore`.
///
/// NOTE(review): `Deref` here is for ergonomics, not inheritance; callers
/// elsewhere likely rely on it, so it must stay.
impl Deref for BlobsStore {
    type Target = Arc<BlobsProtocol>;
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
45
/// Unified error type for all [`BlobsStore`] operations.
#[derive(Debug, thiserror::Error)]
pub enum BlobsStoreError {
    /// Catch-all for errors surfaced as `anyhow::Error`
    /// (e.g. the manual wrap in `stat`, or failed post-download verification).
    #[error("blobs store error: {0}")]
    Default(#[from] anyhow::Error),
    /// Underlying filesystem / stream I/O failure.
    #[error("blob store i/o error: {0}")]
    Io(#[from] std::io::Error),
    /// Failure while exporting BAO-encoded blob data.
    #[error("export bao error: {0}")]
    ExportBao(#[from] ExportBaoError),
    /// Failed request against the iroh-blobs API (e.g. `get_bytes`).
    #[error("request error: {0}")]
    Request(#[from] RequestError),
    /// Codec failure when decoding a blob (see `get_cbor`).
    #[error("decode error: {0}")]
    Decode(#[from] CodecError),
    /// Error from the object-store backend (local fs, ephemeral, or S3).
    #[error("object store error: {0}")]
    ObjectStore(#[from] object_store::BlobStoreError),
}
61
impl BlobsStore {
    /// Opens a legacy on-disk iroh-blobs [`FsStore`] at `path`.
    ///
    /// # Errors
    /// Propagates any failure from `FsStore::load`.
    pub async fn legacy_fs(path: &Path) -> Result<Self, BlobsStoreError> {
        tracing::debug!("BlobsStore::legacy_fs called with path: {:?}", path);
        let store = FsStore::load(path).await?;
        tracing::debug!("BlobsStore::legacy_fs completed loading FsStore");
        let blobs = BlobsProtocol::new(&store, None);
        Ok(Self {
            inner: Arc::new(blobs),
        })
    }

    /// Creates a legacy in-memory store (no persistence).
    ///
    /// NOTE(review): infallible today; `Result` kept for interface symmetry
    /// with the other constructors.
    pub async fn legacy_memory() -> Result<Self, BlobsStoreError> {
        let store = MemStore::new();
        let blobs = BlobsProtocol::new(&store, None);
        Ok(Self {
            inner: Arc::new(blobs),
        })
    }

    /// Creates a store backed by the local-filesystem object store rooted at
    /// `data_dir`.
    pub async fn fs(data_dir: &Path) -> Result<Self, BlobsStoreError> {
        let store = ObjStore::new_local(data_dir).await?;
        Ok(Self::from_store(store.into()))
    }

    /// Creates an ephemeral object-store-backed store (data is not persisted).
    pub async fn memory() -> Result<Self, BlobsStoreError> {
        let store = ObjStore::new_ephemeral().await?;
        Ok(Self::from_store(store.into()))
    }

    /// Creates a store backed by an S3-compatible bucket.
    ///
    /// `db_path` presumably holds local database state alongside the remote
    /// bucket — TODO confirm against `ObjStore::new_s3`. `region` is optional;
    /// behavior when `None` is backend-defined.
    pub async fn s3(
        db_path: &Path,
        endpoint: &str,
        access_key: &str,
        secret_key: &str,
        bucket: &str,
        region: Option<&str>,
    ) -> Result<Self, BlobsStoreError> {
        let store =
            ObjStore::new_s3(db_path, endpoint, access_key, secret_key, bucket, region).await?;
        Ok(Self::from_store(store.into()))
    }

    /// Wraps an existing `iroh_blobs` store in a protocol handler.
    pub fn from_store(store: iroh_blobs::api::Store) -> Self {
        let blobs = BlobsProtocol::new(&store, None);
        Self {
            inner: Arc::new(blobs),
        }
    }

    /// Accessor for the underlying blobs API; all read/write helpers below go
    /// through this.
    pub fn blobs(&self) -> &Blobs {
        self.inner.store().blobs()
    }

    /// Reads the entire blob identified by `hash` into memory.
    ///
    /// # Errors
    /// Fails if the blob is missing or the request fails.
    pub async fn get(&self, hash: &Hash) -> Result<Bytes, BlobsStoreError> {
        let bytes = self.blobs().get_bytes(*hash).await?;
        Ok(bytes)
    }

    /// Reads the blob at `hash` and decodes it as DAG-CBOR into `T`.
    ///
    /// # Errors
    /// Fails on a missing blob ([`BlobsStoreError::Request`]) or a decode
    /// failure ([`BlobsStoreError::Decode`]).
    pub async fn get_cbor<T: BlockEncoded<DagCborCodec>>(
        &self,
        hash: &Hash,
    ) -> Result<T, BlobsStoreError> {
        let bytes = self.blobs().get_bytes(*hash).await?;
        Ok(T::decode(&bytes)?)
    }

    /// Returns a streaming reader over the blob at `hash`.
    ///
    /// NOTE(review): contains no awaits and cannot currently fail; kept
    /// `async -> Result` for interface stability with callers.
    pub async fn get_reader(&self, hash: Hash) -> Result<Reader, BlobsStoreError> {
        let reader = self.blobs().reader(hash);
        Ok(reader)
    }

    /// Ingests a byte stream as a new blob and returns its content hash.
    pub async fn put_stream(
        &self,
        stream: impl Stream<Item = std::io::Result<Bytes>> + Send + Unpin + 'static + std::marker::Sync,
    ) -> Result<Hash, BlobsStoreError> {
        // Two-stage await: the first drives the add operation, then
        // `with_tag()` finalizes it — presumably tagging retains the blob
        // against garbage collection; TODO confirm with iroh-blobs docs.
        let outcome = self
            .blobs()
            .add_stream(stream)
            .into_future()
            .await
            .with_tag()
            .await?
            .hash;
        Ok(outcome)
    }

    /// Stores `data` as a blob and returns its content hash.
    pub async fn put(&self, data: Vec<u8>) -> Result<Hash, BlobsStoreError> {
        let hash = self.blobs().add_bytes(data).into_future().await?.hash;
        Ok(hash)
    }

    /// Returns `true` iff the blob is fully present locally
    /// (`BlobStatus::Complete`); partial or missing blobs yield `false`.
    ///
    /// NOTE(review): the status error is wrapped via `anyhow!` into the
    /// `Default` variant rather than using a typed `#[from]` conversion —
    /// check whether the error type here could map to `Request` instead.
    pub async fn stat(&self, hash: &Hash) -> Result<bool, BlobsStoreError> {
        let stat = self
            .blobs()
            .status(*hash)
            .await
            .map_err(|err| BlobsStoreError::Default(anyhow!(err)))?;
        Ok(matches!(stat, BlobStatus::Complete { .. }))
    }

    /// Downloads the blob `hash` from any of `peer_ids` via `endpoint`.
    ///
    /// Skips the download entirely when the blob is already complete locally.
    /// After a successful download the local store is re-checked, and an error
    /// is returned if the blob still is not present.
    ///
    /// # Errors
    /// Fails if the status check, the download itself, or the post-download
    /// verification fails.
    pub async fn download_hash(
        &self,
        hash: Hash,
        peer_ids: Vec<PublicKey>,
        endpoint: &Endpoint,
    ) -> Result<(), BlobsStoreError> {
        tracing::debug!("download_hash: Checking if hash {} exists locally", hash);

        // Fast path: nothing to do when the blob is already complete.
        if self.stat(&hash).await? {
            tracing::debug!(
                "download_hash: Hash {} already exists locally, skipping download",
                hash
            );
            return Ok(());
        }

        tracing::info!(
            "download_hash: Downloading hash {} from {} peers: {:?}",
            hash,
            peer_ids.len(),
            peer_ids
        );

        let downloader = Downloader::new(self.inner.store(), endpoint);

        // `Shuffled` presumably randomizes peer order for load spreading —
        // TODO confirm with iroh-blobs downloader docs.
        let discovery = Shuffled::new(
            peer_ids
                .iter()
                .map(|peer_id| NodeId::from(*peer_id))
                .collect(),
        );

        tracing::debug!(
            "download_hash: Starting download of hash {} with downloader",
            hash
        );

        match downloader.download(hash, discovery).await {
            Ok(_) => {
                tracing::info!("download_hash: Successfully downloaded hash {}", hash);

                // Defensive re-check: treat a download that "succeeded" but
                // left no complete local blob as an error.
                match self.stat(&hash).await {
                    Ok(true) => tracing::debug!(
                        "download_hash: Verified hash {} exists after download",
                        hash
                    ),
                    Ok(false) => {
                        tracing::error!("download_hash: Hash {} NOT found after download!", hash);
                        return Err(anyhow!("Hash not found after download").into());
                    }
                    Err(e) => {
                        tracing::error!("download_hash: Error verifying hash {}: {}", hash, e);
                        return Err(e);
                    }
                }
            }
            Err(e) => {
                tracing::error!(
                    "download_hash: Failed to download hash {} from peers {:?}: {}",
                    hash,
                    peer_ids,
                    e
                );
                return Err(e.into());
            }
        }

        Ok(())
    }

    /// Downloads a "hash list" blob (see [`Self::create_hash_list`]) and then
    /// every content blob it references, sequentially and fail-fast.
    ///
    /// # Errors
    /// Fails on the first hash that cannot be downloaded or verified; earlier
    /// successfully-downloaded blobs remain in the store.
    pub async fn download_hash_list(
        &self,
        hash_list_hash: Hash,
        peer_ids: Vec<PublicKey>,
        endpoint: &Endpoint,
    ) -> Result<(), BlobsStoreError> {
        tracing::debug!(
            "download_hash_list: Starting download of hash list {} from {} peers",
            hash_list_hash,
            peer_ids.len()
        );

        // Step 1: fetch the list blob itself.
        tracing::debug!("download_hash_list: Downloading hash list blob itself");
        self.download_hash(hash_list_hash, peer_ids.clone(), endpoint)
            .await?;
        tracing::debug!("download_hash_list: Hash list blob downloaded successfully");

        // Belt-and-braces verification (download_hash already verifies on the
        // non-skip path; this also covers the already-present fast path).
        match self.stat(&hash_list_hash).await {
            Ok(true) => tracing::debug!(
                "download_hash_list: Verified hash list blob {} exists",
                hash_list_hash
            ),
            Ok(false) => {
                tracing::error!(
                    "download_hash_list: Hash list blob {} NOT found after download!",
                    hash_list_hash
                );
                return Err(anyhow!("Hash list blob not found after download").into());
            }
            Err(e) => {
                tracing::error!("download_hash_list: Error checking hash list blob: {}", e);
                return Err(e);
            }
        }

        // Step 2: decode the list and fetch each referenced blob.
        tracing::debug!("download_hash_list: Reading hash list contents");
        let hashes = self.read_hash_list(hash_list_hash).await?;
        tracing::info!(
            "download_hash_list: Hash list contains {} hashes, downloading all...",
            hashes.len()
        );

        if hashes.is_empty() {
            tracing::warn!("download_hash_list: Hash list is EMPTY - no content to download");
            return Ok(());
        }

        for (idx, hash) in hashes.iter().enumerate() {
            tracing::debug!(
                "download_hash_list: Downloading content hash {}/{}: {:?}",
                idx + 1,
                hashes.len(),
                hash
            );
            match self.download_hash(*hash, peer_ids.clone(), endpoint).await {
                Ok(()) => {
                    tracing::debug!(
                        "download_hash_list: Content hash {}/{} downloaded successfully",
                        idx + 1,
                        hashes.len()
                    );
                }
                Err(e) => {
                    tracing::error!(
                        "download_hash_list: Failed to download content hash {}/{} ({:?}): {}",
                        idx + 1,
                        hashes.len(),
                        hash,
                        e
                    );
                    return Err(e);
                }
            }
        }

        tracing::info!(
            "download_hash_list: Successfully downloaded all {} hashes from hash list",
            hashes.len()
        );

        Ok(())
    }

    /// Serializes `hashes` as a flat concatenation of their raw 32-byte
    /// values, stores that as a blob, and returns the blob's hash.
    ///
    /// Inverse of [`Self::read_hash_list`].
    pub async fn create_hash_list<I>(&self, hashes: I) -> Result<Hash, BlobsStoreError>
    where
        I: IntoIterator<Item = Hash>,
    {
        let mut data = Vec::new();
        for hash in hashes {
            data.extend_from_slice(hash.as_bytes());
        }

        let hash = self.put(data).await?;
        Ok(hash)
    }

    /// Reads a hash-list blob and decodes it back into its constituent hashes.
    ///
    /// # Errors
    /// Fails if the blob is missing or its length is not a multiple of 32
    /// (the raw hash size), which indicates a malformed list.
    pub async fn read_hash_list(&self, list_hash: Hash) -> Result<Vec<Hash>, BlobsStoreError> {
        let mut hashes = Vec::new();

        let data = self.get(&list_hash).await?;

        if data.len() % 32 != 0 {
            return Err(anyhow!("Invalid hash list: length is not a multiple of 32").into());
        }

        // chunks_exact(32) guarantees each chunk is exactly 32 bytes.
        for chunk in data.chunks_exact(32) {
            let mut hash_bytes = [0u8; 32];
            hash_bytes.copy_from_slice(chunk);
            hashes.push(Hash::from_bytes(hash_bytes));
        }

        Ok(hashes)
    }
}
387
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use futures::stream;
    use tempfile::TempDir;

    /// Builds a filesystem-backed store inside a fresh temporary directory.
    /// The `TempDir` guard is returned so the directory outlives the test.
    async fn setup_test_store() -> (BlobsStore, TempDir) {
        let dir = TempDir::new().unwrap();
        let blobs = BlobsStore::fs(&dir.path().join("blobs")).await.unwrap();
        (blobs, dir)
    }

    #[tokio::test]
    async fn test_put_and_get() {
        let (blobs, _guard) = setup_test_store().await;
        let payload = b"Hello, BlobsStore!";

        let digest = blobs.put(payload.to_vec()).await.unwrap();
        assert!(!digest.as_bytes().is_empty());

        // The stored bytes must round-trip unchanged.
        let roundtrip = blobs.get(&digest).await.unwrap();
        assert_eq!(roundtrip.as_ref(), payload);
    }

    #[tokio::test]
    async fn test_put_stream() {
        let (blobs, _guard) = setup_test_store().await;
        let payload = b"Streaming data test";

        // Single-chunk stream carrying the whole payload.
        let source =
            stream::once(async move { Ok::<_, std::io::Error>(Bytes::from(payload.to_vec())) });
        let digest = blobs.put_stream(Box::pin(source)).await.unwrap();

        assert_eq!(blobs.get(&digest).await.unwrap().as_ref(), payload);
    }

    #[tokio::test]
    async fn test_stat() {
        let (blobs, _guard) = setup_test_store().await;

        let digest = blobs.put(b"Test data for stat".to_vec()).await.unwrap();
        assert!(blobs.stat(&digest).await.unwrap());

        // A hash that was never stored must report as absent.
        let absent = iroh_blobs::Hash::from_bytes([0u8; 32]);
        assert!(!blobs.stat(&absent).await.unwrap());
    }

    #[tokio::test]
    async fn test_large_data() {
        let (blobs, _guard) = setup_test_store().await;

        // One mebibyte of a repeated byte.
        let payload = vec![42u8; 1024 * 1024];
        let digest = blobs.put(payload.clone()).await.unwrap();

        let roundtrip = blobs.get(&digest).await.unwrap();
        assert_eq!(roundtrip.len(), payload.len());
        assert_eq!(roundtrip.as_ref(), payload.as_slice());
    }

    #[tokio::test]
    async fn test_multiple_puts() {
        let (blobs, _guard) = setup_test_store().await;

        let payloads: [&[u8]; 3] = [b"First data", b"Second data", b"Third data"];
        let mut digests = Vec::new();
        for payload in payloads {
            digests.push(blobs.put(payload.to_vec()).await.unwrap());
        }

        // Distinct content must yield pairwise-distinct hashes.
        assert_ne!(digests[0], digests[1]);
        assert_ne!(digests[1], digests[2]);
        assert_ne!(digests[0], digests[2]);

        // Each hash must resolve back to its own payload.
        for (digest, payload) in digests.iter().zip(payloads) {
            assert_eq!(blobs.get(digest).await.unwrap().as_ref(), payload);
        }
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let (blobs, _guard) = setup_test_store().await;

        // Fetching a hash that was never stored must fail.
        let absent = iroh_blobs::Hash::from_bytes([99u8; 32]);
        assert!(blobs.get(&absent).await.is_err());
    }
}
501}