1use std::future::IntoFuture;
2use std::ops::Deref;
3use std::path::Path;
4use std::sync::Arc;
5
6use anyhow::anyhow;
7use bytes::Bytes;
8use futures::Stream;
9use iroh::{Endpoint, NodeId};
10use iroh_blobs::{
11 api::{
12 blobs::{BlobReader as Reader, BlobStatus, Blobs},
13 downloader::{Downloader, Shuffled},
14 ExportBaoError, RequestError,
15 },
16 store::{fs::FsStore, mem::MemStore},
17 BlobsProtocol, Hash,
18};
19
20use crate::{
21 crypto::PublicKey,
22 linked_data::{BlockEncoded, CodecError, DagCborCodec},
23};
24
25#[derive(Clone, Debug)]
33pub struct BlobsStore {
34 pub inner: Arc<BlobsProtocol>,
35}
36
37impl Deref for BlobsStore {
38 type Target = Arc<BlobsProtocol>;
39 fn deref(&self) -> &Self::Target {
40 &self.inner
41 }
42}
43
44#[derive(Debug, thiserror::Error)]
45pub enum BlobsStoreError {
46 #[error("blobs store error: {0}")]
47 Default(#[from] anyhow::Error),
48 #[error("blob store i/o error: {0}")]
49 Io(#[from] std::io::Error),
50 #[error("export bao error: {0}")]
51 ExportBao(#[from] ExportBaoError),
52 #[error("request error: {0}")]
53 Request(#[from] RequestError),
54 #[error("decode error: {0}")]
55 Decode(#[from] CodecError),
56}
57
58impl BlobsStore {
59 #[allow(clippy::doc_overindented_list_items)]
69 pub async fn fs(path: &Path) -> Result<Self, BlobsStoreError> {
70 tracing::debug!("BlobsStore::load called with path: {:?}", path);
71 let store = FsStore::load(path).await?;
72 tracing::debug!("BlobsStore::load completed loading FsStore");
73 let blobs = BlobsProtocol::new(&store, None);
75 Ok(Self {
76 inner: Arc::new(blobs),
77 })
78 }
79
80 pub async fn memory() -> Result<Self, BlobsStoreError> {
82 let store = MemStore::new();
83 let blobs = BlobsProtocol::new(&store, None);
84 Ok(Self {
85 inner: Arc::new(blobs),
86 })
87 }
88
89 pub fn blobs(&self) -> &Blobs {
92 self.inner.store().blobs()
93 }
94
95 pub async fn get(&self, hash: &Hash) -> Result<Bytes, BlobsStoreError> {
97 let bytes = self.blobs().get_bytes(*hash).await?;
98 Ok(bytes)
99 }
100
101 pub async fn get_cbor<T: BlockEncoded<DagCborCodec>>(
103 &self,
104 hash: &Hash,
105 ) -> Result<T, BlobsStoreError> {
106 let bytes = self.blobs().get_bytes(*hash).await?;
107 Ok(T::decode(&bytes)?)
108 }
109
110 pub async fn get_reader(&self, hash: Hash) -> Result<Reader, BlobsStoreError> {
112 let reader = self.blobs().reader(hash);
113 Ok(reader)
114 }
115
116 pub async fn put_stream(
118 &self,
119 stream: impl Stream<Item = std::io::Result<Bytes>> + Send + Unpin + 'static + std::marker::Sync,
120 ) -> Result<Hash, BlobsStoreError> {
121 let outcome = self
122 .blobs()
123 .add_stream(stream)
124 .into_future()
125 .await
126 .with_tag()
127 .await?
128 .hash;
129 Ok(outcome)
130 }
131
132 pub async fn put(&self, data: Vec<u8>) -> Result<Hash, BlobsStoreError> {
134 let hash = self.blobs().add_bytes(data).into_future().await?.hash;
135 Ok(hash)
136 }
137
138 pub async fn stat(&self, hash: &Hash) -> Result<bool, BlobsStoreError> {
140 let stat = self
141 .blobs()
142 .status(*hash)
143 .await
144 .map_err(|err| BlobsStoreError::Default(anyhow!(err)))?;
145 Ok(matches!(stat, BlobStatus::Complete { .. }))
146 }
147
148 pub async fn download_hash(
153 &self,
154 hash: Hash,
155 peer_ids: Vec<PublicKey>,
156 endpoint: &Endpoint,
157 ) -> Result<(), BlobsStoreError> {
158 tracing::debug!("download_hash: Checking if hash {} exists locally", hash);
159
160 if self.stat(&hash).await? {
162 tracing::debug!(
163 "download_hash: Hash {} already exists locally, skipping download",
164 hash
165 );
166 return Ok(());
167 }
168
169 tracing::info!(
170 "download_hash: Downloading hash {} from {} peers: {:?}",
171 hash,
172 peer_ids.len(),
173 peer_ids
174 );
175
176 let downloader = Downloader::new(self.inner.store(), endpoint);
178
179 let discovery = Shuffled::new(
181 peer_ids
182 .iter()
183 .map(|peer_id| NodeId::from(*peer_id))
184 .collect(),
185 );
186
187 tracing::debug!(
188 "download_hash: Starting download of hash {} with downloader",
189 hash
190 );
191
192 match downloader.download(hash, discovery).await {
195 Ok(_) => {
196 tracing::info!("download_hash: Successfully downloaded hash {}", hash);
197
198 match self.stat(&hash).await {
200 Ok(true) => tracing::debug!(
201 "download_hash: Verified hash {} exists after download",
202 hash
203 ),
204 Ok(false) => {
205 tracing::error!("download_hash: Hash {} NOT found after download!", hash);
206 return Err(anyhow!("Hash not found after download").into());
207 }
208 Err(e) => {
209 tracing::error!("download_hash: Error verifying hash {}: {}", hash, e);
210 return Err(e);
211 }
212 }
213 }
214 Err(e) => {
215 tracing::error!(
216 "download_hash: Failed to download hash {} from peers {:?}: {}",
217 hash,
218 peer_ids,
219 e
220 );
221 return Err(e.into());
222 }
223 }
224
225 Ok(())
226 }
227
228 pub async fn download_hash_list(
233 &self,
234 hash_list_hash: Hash,
235 peer_ids: Vec<PublicKey>,
236 endpoint: &Endpoint,
237 ) -> Result<(), BlobsStoreError> {
238 tracing::debug!(
239 "download_hash_list: Starting download of hash list {} from {} peers",
240 hash_list_hash,
241 peer_ids.len()
242 );
243
244 tracing::debug!("download_hash_list: Downloading hash list blob itself");
246 self.download_hash(hash_list_hash, peer_ids.clone(), endpoint)
247 .await?;
248 tracing::debug!("download_hash_list: Hash list blob downloaded successfully");
249
250 match self.stat(&hash_list_hash).await {
252 Ok(true) => tracing::debug!(
253 "download_hash_list: Verified hash list blob {} exists",
254 hash_list_hash
255 ),
256 Ok(false) => {
257 tracing::error!(
258 "download_hash_list: Hash list blob {} NOT found after download!",
259 hash_list_hash
260 );
261 return Err(anyhow!("Hash list blob not found after download").into());
262 }
263 Err(e) => {
264 tracing::error!("download_hash_list: Error checking hash list blob: {}", e);
265 return Err(e);
266 }
267 }
268
269 tracing::debug!("download_hash_list: Reading hash list contents");
271 let hashes = self.read_hash_list(hash_list_hash).await?;
272 tracing::info!(
273 "download_hash_list: Hash list contains {} hashes, downloading all...",
274 hashes.len()
275 );
276
277 if hashes.is_empty() {
278 tracing::warn!("download_hash_list: Hash list is EMPTY - no content to download");
279 return Ok(());
280 }
281
282 for (idx, hash) in hashes.iter().enumerate() {
284 tracing::debug!(
285 "download_hash_list: Downloading content hash {}/{}: {:?}",
286 idx + 1,
287 hashes.len(),
288 hash
289 );
290 match self.download_hash(*hash, peer_ids.clone(), endpoint).await {
291 Ok(()) => {
292 tracing::debug!(
293 "download_hash_list: Content hash {}/{} downloaded successfully",
294 idx + 1,
295 hashes.len()
296 );
297 }
298 Err(e) => {
299 tracing::error!(
300 "download_hash_list: Failed to download content hash {}/{} ({:?}): {}",
301 idx + 1,
302 hashes.len(),
303 hash,
304 e
305 );
306 return Err(e);
307 }
308 }
309 }
310
311 tracing::info!(
312 "download_hash_list: Successfully downloaded all {} hashes from hash list",
313 hashes.len()
314 );
315
316 Ok(())
317 }
318
319 pub async fn create_hash_list<I>(&self, hashes: I) -> Result<Hash, BlobsStoreError>
323 where
324 I: IntoIterator<Item = Hash>,
325 {
326 let mut data = Vec::new();
328 for hash in hashes {
329 data.extend_from_slice(hash.as_bytes());
330 }
331
332 let hash = self.put(data).await?;
334 Ok(hash)
335 }
336
337 pub async fn read_hash_list(&self, list_hash: Hash) -> Result<Vec<Hash>, BlobsStoreError> {
340 let mut hashes = Vec::new();
341
342 let data = self.get(&list_hash).await?;
344
345 if data.len() % 32 != 0 {
347 return Err(anyhow!("Invalid hash list: length is not a multiple of 32").into());
348 }
349
350 for chunk in data.chunks_exact(32) {
351 let mut hash_bytes = [0u8; 32];
352 hash_bytes.copy_from_slice(chunk);
353 hashes.push(Hash::from_bytes(hash_bytes));
354 }
355
356 Ok(hashes)
357 }
358}
359
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use futures::stream;
    use tempfile::TempDir;

    /// Opens a filesystem-backed store inside a fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the store so the directory is not
    /// removed while the test is still using it.
    async fn setup_test_store() -> (BlobsStore, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let blob_path = temp_dir.path().join("blobs");

        let blobs = BlobsStore::fs(&blob_path).await.unwrap();
        (blobs, temp_dir)
    }

    /// Bytes written with `put` come back unchanged from `get`, and storing
    /// the same bytes again yields the same hash.
    #[tokio::test]
    async fn test_put_and_get() {
        let (store, _temp) = setup_test_store().await;

        let data = b"Hello, BlobsStore!";

        let hash = store.put(data.to_vec()).await.unwrap();
        assert_eq!(store.put(data.to_vec()).await.unwrap(), hash);

        let retrieved = store.get(&hash).await.unwrap();
        assert_eq!(retrieved.as_ref(), data);
    }

    /// A blob added through `put_stream` is readable through `get`.
    #[tokio::test]
    async fn test_put_stream() {
        let (store, _temp) = setup_test_store().await;

        let data = b"Streaming data test";
        let stream =
            stream::once(async move { Ok::<_, std::io::Error>(Bytes::from(data.to_vec())) });

        let hash = store.put_stream(Box::pin(stream)).await.unwrap();

        let retrieved = store.get(&hash).await.unwrap();
        assert_eq!(retrieved.as_ref(), data);
    }

    /// `stat` is true for a stored blob and false for an unknown hash.
    #[tokio::test]
    async fn test_stat() {
        let (store, _temp) = setup_test_store().await;

        let data = b"Test data for stat";
        let hash = store.put(data.to_vec()).await.unwrap();

        assert!(store.stat(&hash).await.unwrap());

        let fake_hash = iroh_blobs::Hash::from_bytes([0u8; 32]);
        assert!(!store.stat(&fake_hash).await.unwrap());
    }

    /// A 1 MiB blob round-trips intact.
    #[tokio::test]
    async fn test_large_data() {
        let (store, _temp) = setup_test_store().await;

        let data = vec![42u8; 1024 * 1024];

        let hash = store.put(data.clone()).await.unwrap();
        let retrieved = store.get(&hash).await.unwrap();

        assert_eq!(retrieved.len(), data.len());
        assert_eq!(retrieved.as_ref(), data.as_slice());
    }

    /// Distinct contents get distinct hashes and stay individually readable.
    #[tokio::test]
    async fn test_multiple_puts() {
        let (store, _temp) = setup_test_store().await;

        let data1 = b"First data";
        let data2 = b"Second data";
        let data3 = b"Third data";

        let hash1 = store.put(data1.to_vec()).await.unwrap();
        let hash2 = store.put(data2.to_vec()).await.unwrap();
        let hash3 = store.put(data3.to_vec()).await.unwrap();

        assert_ne!(hash1, hash2);
        assert_ne!(hash2, hash3);
        assert_ne!(hash1, hash3);

        assert_eq!(store.get(&hash1).await.unwrap().as_ref(), data1);
        assert_eq!(store.get(&hash2).await.unwrap().as_ref(), data2);
        assert_eq!(store.get(&hash3).await.unwrap().as_ref(), data3);
    }

    /// Reading a hash that was never stored is an error.
    #[tokio::test]
    async fn test_get_nonexistent() {
        let (store, _temp) = setup_test_store().await;

        let fake_hash = iroh_blobs::Hash::from_bytes([99u8; 32]);
        let result = store.get(&fake_hash).await;

        assert!(result.is_err());
    }

    /// `read_hash_list` returns exactly what `create_hash_list` stored, in the
    /// same order.
    #[tokio::test]
    async fn test_hash_list_round_trip() {
        let (store, _temp) = setup_test_store().await;

        let hashes = vec![
            Hash::from_bytes([1u8; 32]),
            Hash::from_bytes([2u8; 32]),
            Hash::from_bytes([3u8; 32]),
        ];

        let list_hash = store.create_hash_list(hashes.clone()).await.unwrap();
        let read_back = store.read_hash_list(list_hash).await.unwrap();

        assert_eq!(read_back, hashes);
    }

    /// A blob whose length is not a multiple of 32 bytes is rejected as a
    /// hash list.
    #[tokio::test]
    async fn test_read_hash_list_rejects_invalid_length() {
        let (store, _temp) = setup_test_store().await;

        let not_a_list = store.put(vec![0u8; 33]).await.unwrap();
        let result = store.read_hash_list(not_a_list).await;

        assert!(matches!(result, Err(BlobsStoreError::Default(_))));
    }
}