1use std::future::IntoFuture;
2use std::ops::Deref;
3use std::path::Path;
4use std::sync::Arc;
5
6use anyhow::anyhow;
7use bytes::Bytes;
8use futures::Stream;
9use iroh::{Endpoint, NodeId};
10use iroh_blobs::{
11 api::{
12 blobs::{BlobReader as Reader, BlobStatus, Blobs},
13 downloader::{Downloader, Shuffled},
14 ExportBaoError, RequestError,
15 },
16 store::{fs::FsStore, mem::MemStore},
17 BlobsProtocol, Hash,
18};
19
20use object_store::ObjectStore as ObjStore;
21
22use crate::{
23 crypto::PublicKey,
24 linked_data::{BlockEncoded, CodecError, DagCborCodec},
25};
26
/// Cheaply cloneable handle to a blobs store exposed over the iroh blobs protocol.
///
/// Cloning only bumps the reference count on the inner [`Arc`]; all clones
/// share the same underlying [`BlobsProtocol`] (and therefore the same store).
#[derive(Clone, Debug)]
pub struct BlobsStore {
    // Shared protocol handle; gives access to the underlying blob store.
    pub inner: Arc<BlobsProtocol>,
}
38
/// Lets a `BlobsStore` be used wherever an `Arc<BlobsProtocol>` is expected.
///
/// NOTE(review): `Target` is `Arc<BlobsProtocol>` rather than `BlobsProtocol`,
/// so derefing yields `&Arc<...>` (one extra indirection). Targeting
/// `BlobsProtocol` directly would be more ergonomic, but it changes the deref
/// target type for existing callers — confirm before altering.
impl Deref for BlobsStore {
    type Target = Arc<BlobsProtocol>;
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
45
/// Errors surfaced by [`BlobsStore`] operations.
///
/// Each variant wraps (via `#[from]`) the error type of one of the
/// layers this store composes, so `?` converts them automatically.
#[derive(Debug, thiserror::Error)]
pub enum BlobsStoreError {
    /// Catch-all for errors funneled through `anyhow` (e.g. `stat` failures,
    /// malformed hash lists).
    #[error("blobs store error: {0}")]
    Default(#[from] anyhow::Error),
    /// Underlying filesystem / stream I/O failure.
    #[error("blob store i/o error: {0}")]
    Io(#[from] std::io::Error),
    /// Failure exporting verified (BAO) blob data from iroh-blobs.
    #[error("export bao error: {0}")]
    ExportBao(#[from] ExportBaoError),
    /// Failure of an iroh-blobs request (e.g. fetching bytes by hash).
    #[error("request error: {0}")]
    Request(#[from] RequestError),
    /// Failure decoding a DAG-CBOR encoded block.
    #[error("decode error: {0}")]
    Decode(#[from] CodecError),
    /// Failure from the object-store backend (local or S3).
    #[error("object store error: {0}")]
    ObjectStore(#[from] object_store::BlobStoreError),
}
61
62impl BlobsStore {
63 pub async fn legacy_fs(path: &Path) -> Result<Self, BlobsStoreError> {
65 tracing::debug!("BlobsStore::legacy_fs called with path: {:?}", path);
66 let store = FsStore::load(path).await?;
67 tracing::debug!("BlobsStore::legacy_fs completed loading FsStore");
68 let blobs = BlobsProtocol::new(&store, None);
69 Ok(Self {
70 inner: Arc::new(blobs),
71 })
72 }
73
74 pub async fn legacy_memory() -> Result<Self, BlobsStoreError> {
76 let store = MemStore::new();
77 let blobs = BlobsProtocol::new(&store, None);
78 Ok(Self {
79 inner: Arc::new(blobs),
80 })
81 }
82
83 pub async fn fs(
90 db_path: &Path,
91 objects_path: &Path,
92 max_import_size: Option<u64>,
93 ) -> Result<Self, BlobsStoreError> {
94 let store = ObjStore::new_local(db_path, objects_path, max_import_size).await?;
95 Ok(Self::from_store(store.into()))
96 }
97
98 pub async fn memory() -> Result<Self, BlobsStoreError> {
100 let store = ObjStore::new_ephemeral().await?;
101 Ok(Self::from_store(store.into()))
102 }
103
104 pub async fn s3(
109 db_path: &Path,
110 endpoint: &str,
111 access_key: &str,
112 secret_key: &str,
113 bucket: &str,
114 region: Option<&str>,
115 max_import_size: Option<u64>,
116 ) -> Result<Self, BlobsStoreError> {
117 let store = ObjStore::new_s3(
118 db_path,
119 endpoint,
120 access_key,
121 secret_key,
122 bucket,
123 region,
124 max_import_size,
125 )
126 .await?;
127 Ok(Self::from_store(store.into()))
128 }
129
130 pub fn from_store(store: iroh_blobs::api::Store) -> Self {
132 let blobs = BlobsProtocol::new(&store, None);
133 Self {
134 inner: Arc::new(blobs),
135 }
136 }
137
138 pub fn blobs(&self) -> &Blobs {
141 self.inner.store().blobs()
142 }
143
144 pub async fn get(&self, hash: &Hash) -> Result<Bytes, BlobsStoreError> {
146 let bytes = self.blobs().get_bytes(*hash).await?;
147 Ok(bytes)
148 }
149
150 pub async fn get_cbor<T: BlockEncoded<DagCborCodec>>(
152 &self,
153 hash: &Hash,
154 ) -> Result<T, BlobsStoreError> {
155 let bytes = self.blobs().get_bytes(*hash).await?;
156 Ok(T::decode(&bytes)?)
157 }
158
159 pub async fn get_reader(&self, hash: Hash) -> Result<Reader, BlobsStoreError> {
161 let reader = self.blobs().reader(hash);
162 Ok(reader)
163 }
164
165 pub async fn put_stream(
167 &self,
168 stream: impl Stream<Item = std::io::Result<Bytes>> + Send + Unpin + 'static + std::marker::Sync,
169 ) -> Result<Hash, BlobsStoreError> {
170 let outcome = self
171 .blobs()
172 .add_stream(stream)
173 .into_future()
174 .await
175 .with_tag()
176 .await?
177 .hash;
178 Ok(outcome)
179 }
180
181 pub async fn put(&self, data: Vec<u8>) -> Result<Hash, BlobsStoreError> {
183 let hash = self.blobs().add_bytes(data).into_future().await?.hash;
184 Ok(hash)
185 }
186
187 pub async fn stat(&self, hash: &Hash) -> Result<bool, BlobsStoreError> {
189 let stat = self
190 .blobs()
191 .status(*hash)
192 .await
193 .map_err(|err| BlobsStoreError::Default(anyhow!(err)))?;
194 Ok(matches!(stat, BlobStatus::Complete { .. }))
195 }
196
197 pub async fn download_hash(
202 &self,
203 hash: Hash,
204 peer_ids: Vec<PublicKey>,
205 endpoint: &Endpoint,
206 ) -> Result<(), BlobsStoreError> {
207 tracing::debug!("download_hash: Checking if hash {} exists locally", hash);
208
209 if self.stat(&hash).await? {
211 tracing::debug!(
212 "download_hash: Hash {} already exists locally, skipping download",
213 hash
214 );
215 return Ok(());
216 }
217
218 tracing::info!(
219 "download_hash: Downloading hash {} from {} peers: {:?}",
220 hash,
221 peer_ids.len(),
222 peer_ids
223 );
224
225 let downloader = Downloader::new(self.inner.store(), endpoint);
227
228 let discovery = Shuffled::new(
230 peer_ids
231 .iter()
232 .map(|peer_id| NodeId::from(*peer_id))
233 .collect(),
234 );
235
236 tracing::debug!(
237 "download_hash: Starting download of hash {} with downloader",
238 hash
239 );
240
241 match downloader.download(hash, discovery).await {
244 Ok(_) => {
245 tracing::info!("download_hash: Successfully downloaded hash {}", hash);
246
247 match self.stat(&hash).await {
249 Ok(true) => tracing::debug!(
250 "download_hash: Verified hash {} exists after download",
251 hash
252 ),
253 Ok(false) => {
254 tracing::error!("download_hash: Hash {} NOT found after download!", hash);
255 return Err(anyhow!("Hash not found after download").into());
256 }
257 Err(e) => {
258 tracing::error!("download_hash: Error verifying hash {}: {}", hash, e);
259 return Err(e);
260 }
261 }
262 }
263 Err(e) => {
264 tracing::error!(
265 "download_hash: Failed to download hash {} from peers {:?}: {}",
266 hash,
267 peer_ids,
268 e
269 );
270 return Err(e.into());
271 }
272 }
273
274 Ok(())
275 }
276
277 pub async fn download_hash_list(
282 &self,
283 hash_list_hash: Hash,
284 peer_ids: Vec<PublicKey>,
285 endpoint: &Endpoint,
286 ) -> Result<(), BlobsStoreError> {
287 tracing::debug!(
288 "download_hash_list: Starting download of hash list {} from {} peers",
289 hash_list_hash,
290 peer_ids.len()
291 );
292
293 tracing::debug!("download_hash_list: Downloading hash list blob itself");
295 self.download_hash(hash_list_hash, peer_ids.clone(), endpoint)
296 .await?;
297 tracing::debug!("download_hash_list: Hash list blob downloaded successfully");
298
299 match self.stat(&hash_list_hash).await {
301 Ok(true) => tracing::debug!(
302 "download_hash_list: Verified hash list blob {} exists",
303 hash_list_hash
304 ),
305 Ok(false) => {
306 tracing::error!(
307 "download_hash_list: Hash list blob {} NOT found after download!",
308 hash_list_hash
309 );
310 return Err(anyhow!("Hash list blob not found after download").into());
311 }
312 Err(e) => {
313 tracing::error!("download_hash_list: Error checking hash list blob: {}", e);
314 return Err(e);
315 }
316 }
317
318 tracing::debug!("download_hash_list: Reading hash list contents");
320 let hashes = self.read_hash_list(hash_list_hash).await?;
321 tracing::info!(
322 "download_hash_list: Hash list contains {} hashes, downloading all...",
323 hashes.len()
324 );
325
326 if hashes.is_empty() {
327 tracing::warn!("download_hash_list: Hash list is EMPTY - no content to download");
328 return Ok(());
329 }
330
331 for (idx, hash) in hashes.iter().enumerate() {
333 tracing::debug!(
334 "download_hash_list: Downloading content hash {}/{}: {:?}",
335 idx + 1,
336 hashes.len(),
337 hash
338 );
339 match self.download_hash(*hash, peer_ids.clone(), endpoint).await {
340 Ok(()) => {
341 tracing::debug!(
342 "download_hash_list: Content hash {}/{} downloaded successfully",
343 idx + 1,
344 hashes.len()
345 );
346 }
347 Err(e) => {
348 tracing::error!(
349 "download_hash_list: Failed to download content hash {}/{} ({:?}): {}",
350 idx + 1,
351 hashes.len(),
352 hash,
353 e
354 );
355 return Err(e);
356 }
357 }
358 }
359
360 tracing::info!(
361 "download_hash_list: Successfully downloaded all {} hashes from hash list",
362 hashes.len()
363 );
364
365 Ok(())
366 }
367
368 pub async fn create_hash_list<I>(&self, hashes: I) -> Result<Hash, BlobsStoreError>
372 where
373 I: IntoIterator<Item = Hash>,
374 {
375 let mut data = Vec::new();
377 for hash in hashes {
378 data.extend_from_slice(hash.as_bytes());
379 }
380
381 let hash = self.put(data).await?;
383 Ok(hash)
384 }
385
386 pub async fn read_hash_list(&self, list_hash: Hash) -> Result<Vec<Hash>, BlobsStoreError> {
389 let mut hashes = Vec::new();
390
391 let data = self.get(&list_hash).await?;
393
394 if data.len() % 32 != 0 {
396 return Err(anyhow!("Invalid hash list: length is not a multiple of 32").into());
397 }
398
399 for chunk in data.chunks_exact(32) {
400 let mut hash_bytes = [0u8; 32];
401 hash_bytes.copy_from_slice(chunk);
402 hashes.push(Hash::from_bytes(hash_bytes));
403 }
404
405 Ok(hashes)
406 }
407}
408
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use futures::stream;
    use tempfile::TempDir;

    /// Builds a filesystem-backed store inside a fresh temporary directory.
    /// The `TempDir` guard is handed back so the directory outlives the test.
    async fn new_fs_store() -> (BlobsStore, TempDir) {
        let dir = TempDir::new().unwrap();
        let store = BlobsStore::fs(&dir.path().join("blobs.db"), &dir.path().join("objects"), None)
            .await
            .unwrap();
        (store, dir)
    }

    #[tokio::test]
    async fn test_put_and_get() {
        let (blobs, _guard) = new_fs_store().await;

        // Store a payload, then read it back by hash.
        let payload = b"Hello, BlobsStore!";
        let hash = blobs.put(payload.to_vec()).await.unwrap();
        assert!(!hash.as_bytes().is_empty());

        let roundtrip = blobs.get(&hash).await.unwrap();
        assert_eq!(roundtrip.as_ref(), payload);
    }

    #[tokio::test]
    async fn test_put_stream() {
        let (blobs, _guard) = new_fs_store().await;

        // A single-chunk stream should round-trip identically to `put`.
        let payload = b"Streaming data test";
        let chunks =
            stream::once(async move { Ok::<_, std::io::Error>(Bytes::from(payload.to_vec())) });

        let hash = blobs.put_stream(Box::pin(chunks)).await.unwrap();
        assert_eq!(blobs.get(&hash).await.unwrap().as_ref(), payload);
    }

    #[tokio::test]
    async fn test_stat() {
        let (blobs, _guard) = new_fs_store().await;

        let hash = blobs.put(b"Test data for stat".to_vec()).await.unwrap();

        // A stored blob is complete; an arbitrary hash is not present.
        assert!(blobs.stat(&hash).await.unwrap());
        let absent = iroh_blobs::Hash::from_bytes([0u8; 32]);
        assert!(!blobs.stat(&absent).await.unwrap());
    }

    #[tokio::test]
    async fn test_large_data() {
        let (blobs, _guard) = new_fs_store().await;

        // 1 MiB of a repeated byte survives a store/load round trip.
        let payload = vec![42u8; 1024 * 1024];
        let hash = blobs.put(payload.clone()).await.unwrap();
        let roundtrip = blobs.get(&hash).await.unwrap();

        assert_eq!(roundtrip.len(), payload.len());
        assert_eq!(roundtrip.as_ref(), payload.as_slice());
    }

    #[tokio::test]
    async fn test_multiple_puts() {
        let (blobs, _guard) = new_fs_store().await;

        let payloads: [&[u8]; 3] = [b"First data", b"Second data", b"Third data"];
        let mut hashes = Vec::with_capacity(payloads.len());
        for payload in payloads {
            hashes.push(blobs.put(payload.to_vec()).await.unwrap());
        }

        // Distinct content must hash to distinct values.
        assert_ne!(hashes[0], hashes[1]);
        assert_ne!(hashes[1], hashes[2]);
        assert_ne!(hashes[0], hashes[2]);

        // And every hash must resolve back to its own payload.
        for (hash, payload) in hashes.iter().zip(payloads) {
            assert_eq!(blobs.get(hash).await.unwrap().as_ref(), payload);
        }
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let (blobs, _guard) = new_fs_store().await;

        // Fetching a hash that was never stored must error, not hang or panic.
        let absent = iroh_blobs::Hash::from_bytes([99u8; 32]);
        assert!(blobs.get(&absent).await.is_err());
    }
}