use std::future::IntoFuture;
use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;

use anyhow::anyhow;
use bytes::Bytes;
use futures::Stream;
use iroh::{Endpoint, NodeId};
use iroh_blobs::{
    api::{
        blobs::{BlobReader as Reader, BlobStatus, Blobs},
        downloader::{Downloader, Shuffled},
        ExportBaoError, RequestError,
    },
    store::fs::FsStore,
    BlobsProtocol, Hash,
};

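/// Content-addressed blob store wrapping an iroh-blobs [`BlobsProtocol`] handle,
/// with helpers for adding, reading, and downloading blobs.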
#[derive(Clone, Debug)]
pub struct BlobsStore {
    pub inner: Arc<BlobsProtocol>,
}

impl Deref for BlobsStore {
    type Target = Arc<BlobsProtocol>;
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

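/// Errors returned by [`BlobsStore`] operations.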
#[derive(Debug, thiserror::Error)]
pub enum BlobsStoreError {
    #[error("blobs store error: {0}")]
    Default(#[from] anyhow::Error),
    #[error("blob store i/o error: {0}")]
    Io(#[from] std::io::Error),
    #[error("export bao error: {0}")]
    ExportBao(#[from] ExportBaoError),
    #[error("request error: {0}")]
    Request(#[from] RequestError),
}

impl BlobsStore {
    #[allow(clippy::doc_overindented_list_items)]
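    /// Loads (or creates) the filesystem-backed blob store at `path`.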
    pub async fn load(path: &Path) -> Result<Self, BlobsStoreError> {
        let store = FsStore::load(path).await?;
        let blobs = BlobsProtocol::new(&store, None);
        Ok(Self {
            inner: Arc::new(blobs),
        })
    }

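    /// Returns the blobs API handle of the underlying store.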
    pub fn blobs(&self) -> &Blobs {
        self.inner.store().blobs()
    }

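    /// Reads the complete contents of the blob identified by `hash` into memory.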
    pub async fn get(&self, hash: &Hash) -> Result<Bytes, BlobsStoreError> {
        let bytes = self.blobs().get_bytes(*hash).await?;
        Ok(bytes)
    }

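    /// Returns a reader over the blob identified by `hash`.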
    pub async fn get_reader(&self, hash: Hash) -> Result<Reader, BlobsStoreError> {
        let reader = self.blobs().reader(hash);
        Ok(reader)
    }

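    /// Adds a blob from a stream of byte chunks and returns its hash.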
    pub async fn put_stream(
        &self,
        stream: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + Unpin + 'static,
    ) -> Result<Hash, BlobsStoreError> {
        let outcome = self
            .blobs()
            .add_stream(stream)
            .into_future()
            .await
            .with_tag()
            .await?
            .hash;
        Ok(outcome)
    }

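    /// Adds a blob from an in-memory buffer and returns its hash.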
    pub async fn put(&self, data: Vec<u8>) -> Result<Hash, BlobsStoreError> {
        let hash = self.blobs().add_bytes(data).into_future().await?.hash;
        Ok(hash)
    }

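    /// Returns `true` if the blob identified by `hash` is fully available locally.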
    pub async fn stat(&self, hash: &Hash) -> Result<bool, BlobsStoreError> {
        let stat = self
            .blobs()
            .status(*hash)
            .await
            .map_err(|err| BlobsStoreError::Default(anyhow!(err)))?;
        Ok(matches!(stat, BlobStatus::Complete { .. }))
    }

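    /// Downloads the blob identified by `hash` from the given peers, skipping the
    /// download if the blob is already complete locally and verifying that it is
    /// present afterwards.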
    pub async fn download_hash(
        &self,
        hash: Hash,
        peer_ids: Vec<NodeId>,
        endpoint: &Endpoint,
    ) -> Result<(), BlobsStoreError> {
        tracing::debug!("download_hash: Checking if hash {} exists locally", hash);

        if self.stat(&hash).await? {
            tracing::debug!(
                "download_hash: Hash {} already exists locally, skipping download",
                hash
            );
            return Ok(());
        }

        tracing::info!(
            "download_hash: Downloading hash {} from {} peers: {:?}",
            hash,
            peer_ids.len(),
            peer_ids
        );

        let downloader = Downloader::new(self.inner.store(), endpoint);

        let discovery = Shuffled::new(peer_ids.clone());

        tracing::debug!(
            "download_hash: Starting download of hash {} with downloader",
            hash
        );

        match downloader.download(hash, discovery).await {
            Ok(_) => {
                tracing::info!("download_hash: Successfully downloaded hash {}", hash);

                match self.stat(&hash).await {
                    Ok(true) => tracing::debug!(
                        "download_hash: Verified hash {} exists after download",
                        hash
                    ),
                    Ok(false) => {
                        tracing::error!("download_hash: Hash {} NOT found after download!", hash);
                        return Err(anyhow!("Hash not found after download").into());
                    }
                    Err(e) => {
                        tracing::error!("download_hash: Error verifying hash {}: {}", hash, e);
                        return Err(e);
                    }
                }
            }
            Err(e) => {
                tracing::error!(
                    "download_hash: Failed to download hash {} from peers {:?}: {}",
                    hash,
                    peer_ids,
                    e
                );
                return Err(e.into());
            }
        }

        Ok(())
    }

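    /// Downloads a hash-list blob from the given peers, then downloads every blob
    /// it references.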
    pub async fn download_hash_list(
        &self,
        hash_list_hash: Hash,
        peer_ids: Vec<NodeId>,
        endpoint: &Endpoint,
    ) -> Result<(), BlobsStoreError> {
        tracing::debug!(
            "download_hash_list: Starting download of hash list {} from {} peers",
            hash_list_hash,
            peer_ids.len()
        );

        tracing::debug!("download_hash_list: Downloading hash list blob itself");
        self.download_hash(hash_list_hash, peer_ids.clone(), endpoint)
            .await?;
        tracing::debug!("download_hash_list: Hash list blob downloaded successfully");

        match self.stat(&hash_list_hash).await {
            Ok(true) => tracing::debug!(
                "download_hash_list: Verified hash list blob {} exists",
                hash_list_hash
            ),
            Ok(false) => {
                tracing::error!(
                    "download_hash_list: Hash list blob {} NOT found after download!",
                    hash_list_hash
                );
                return Err(anyhow!("Hash list blob not found after download").into());
            }
            Err(e) => {
                tracing::error!("download_hash_list: Error checking hash list blob: {}", e);
                return Err(e);
            }
        }

        tracing::debug!("download_hash_list: Reading hash list contents");
        let hashes = self.read_hash_list(hash_list_hash).await?;
        tracing::info!(
            "download_hash_list: Hash list contains {} hashes, downloading all...",
            hashes.len()
        );

        if hashes.is_empty() {
            tracing::warn!("download_hash_list: Hash list is EMPTY - no content to download");
            return Ok(());
        }

        for (idx, hash) in hashes.iter().enumerate() {
            tracing::debug!(
                "download_hash_list: Downloading content hash {}/{}: {:?}",
                idx + 1,
                hashes.len(),
                hash
            );
            match self.download_hash(*hash, peer_ids.clone(), endpoint).await {
                Ok(()) => {
                    tracing::debug!(
                        "download_hash_list: Content hash {}/{} downloaded successfully",
                        idx + 1,
                        hashes.len()
                    );
                }
                Err(e) => {
                    tracing::error!(
                        "download_hash_list: Failed to download content hash {}/{} ({:?}): {}",
                        idx + 1,
                        hashes.len(),
                        hash,
                        e
                    );
                    return Err(e);
                }
            }
        }

        tracing::info!(
            "download_hash_list: Successfully downloaded all {} hashes from hash list",
            hashes.len()
        );

        Ok(())
    }

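    /// Stores the given hashes as a single blob of concatenated 32-byte hashes and
    /// returns the hash of that blob.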
    pub async fn create_hash_list<I>(&self, hashes: I) -> Result<Hash, BlobsStoreError>
    where
        I: IntoIterator<Item = Hash>,
    {
        let mut data = Vec::new();
        for hash in hashes {
            data.extend_from_slice(hash.as_bytes());
        }

        let hash = self.put(data).await?;
        Ok(hash)
    }

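    /// Reads a hash-list blob back into its component hashes. Fails if the blob
    /// length is not a multiple of 32 bytes.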
    pub async fn read_hash_list(&self, list_hash: Hash) -> Result<Vec<Hash>, BlobsStoreError> {
        let mut hashes = Vec::new();

        let data = self.get(&list_hash).await?;

        if data.len() % 32 != 0 {
            return Err(anyhow!("Invalid hash list: length is not a multiple of 32").into());
        }

        for chunk in data.chunks_exact(32) {
            let mut hash_bytes = [0u8; 32];
            hash_bytes.copy_from_slice(chunk);
            hashes.push(Hash::from_bytes(hash_bytes));
        }

        Ok(hashes)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use futures::stream;
    use tempfile::TempDir;

    async fn setup_test_store() -> (BlobsStore, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let blob_path = temp_dir.path().join("blobs");

        let blobs = BlobsStore::load(&blob_path).await.unwrap();
        (blobs, temp_dir)
    }

    #[tokio::test]
    async fn test_put_and_get() {
        let (store, _temp) = setup_test_store().await;

        let data = b"Hello, BlobsStore!";

        let hash = store.put(data.to_vec()).await.unwrap();
        assert!(!hash.as_bytes().is_empty());

        let retrieved = store.get(&hash).await.unwrap();
        assert_eq!(retrieved.as_ref(), data);
    }

    #[tokio::test]
    async fn test_put_stream() {
        let (store, _temp) = setup_test_store().await;

        let data = b"Streaming data test";
        let stream =
            stream::once(async move { Ok::<_, std::io::Error>(Bytes::from(data.to_vec())) });

        let hash = store.put_stream(Box::pin(stream)).await.unwrap();

        let retrieved = store.get(&hash).await.unwrap();
        assert_eq!(retrieved.as_ref(), data);
    }

    #[tokio::test]
    async fn test_stat() {
        let (store, _temp) = setup_test_store().await;

        let data = b"Test data for stat";
        let hash = store.put(data.to_vec()).await.unwrap();

        assert!(store.stat(&hash).await.unwrap());

        let fake_hash = iroh_blobs::Hash::from_bytes([0u8; 32]);
        assert!(!store.stat(&fake_hash).await.unwrap());
    }

    #[tokio::test]
    async fn test_large_data() {
        let (store, _temp) = setup_test_store().await;

        let data = vec![42u8; 1024 * 1024];

        let hash = store.put(data.clone()).await.unwrap();
        let retrieved = store.get(&hash).await.unwrap();

        assert_eq!(retrieved.len(), data.len());
        assert_eq!(retrieved.as_ref(), data.as_slice());
    }

    #[tokio::test]
    async fn test_multiple_puts() {
        let (store, _temp) = setup_test_store().await;

        let data1 = b"First data";
        let data2 = b"Second data";
        let data3 = b"Third data";

        let hash1 = store.put(data1.to_vec()).await.unwrap();
        let hash2 = store.put(data2.to_vec()).await.unwrap();
        let hash3 = store.put(data3.to_vec()).await.unwrap();

        assert_ne!(hash1, hash2);
        assert_ne!(hash2, hash3);
        assert_ne!(hash1, hash3);

        assert_eq!(store.get(&hash1).await.unwrap().as_ref(), data1);
        assert_eq!(store.get(&hash2).await.unwrap().as_ref(), data2);
        assert_eq!(store.get(&hash3).await.unwrap().as_ref(), data3);
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let (store, _temp) = setup_test_store().await;

        let fake_hash = iroh_blobs::Hash::from_bytes([99u8; 32]);
        let result = store.get(&fake_hash).await;

        assert!(result.is_err());
    }
}