common/peer/blobs_store.rs

use std::future::IntoFuture;
use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;

use anyhow::anyhow;
use bytes::Bytes;
use futures::Stream;
use iroh::{Endpoint, NodeId};
use iroh_blobs::{
    api::{
        blobs::{BlobReader as Reader, BlobStatus, Blobs},
        downloader::{Downloader, Shuffled},
        ExportBaoError, RequestError,
    },
    store::fs::FsStore,
    BlobsProtocol, Hash,
};

// TODO (amiller68): maybe at some point it would make sense
//  to implement some sort of `BlockStore` trait over BlobStore
/// Client over a local iroh-blobs store.
///  Exposes an iroh-blobs peer over the endpoint;
///  the router must handle the iroh-blobs ALPN.
/// Also acts as our main BlockStore implementation
///  for bucket, node, and data storage and retrieval.
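///
/// A minimal usage sketch (not from the original code; the on-disk path is
/// illustrative, error handling is elided, and imports are assumed):
///
/// ```ignore
/// let store = BlobsStore::load(Path::new("/var/lib/peer/blobs")).await?;
/// let hash = store.put(b"hello".to_vec()).await?;
/// let bytes = store.get(&hash).await?;
/// assert_eq!(bytes.as_ref(), b"hello");
/// ```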
#[derive(Clone, Debug)]
pub struct BlobsStore {
    pub inner: Arc<BlobsProtocol>,
}

impl Deref for BlobsStore {
    type Target = Arc<BlobsProtocol>;
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

#[derive(Debug, thiserror::Error)]
pub enum BlobsStoreError {
    #[error("blobs store error: {0}")]
    Default(#[from] anyhow::Error),
    #[error("blob store i/o error: {0}")]
    Io(#[from] std::io::Error),
    #[error("export bao error: {0}")]
    ExportBao(#[from] ExportBaoError),
    #[error("request error: {0}")]
    Request(#[from] RequestError),
}

impl BlobsStore {
    /// Load a blob store from the given path.
    ///  The store itself is local; blobs are served to peers by the
    ///  router, which must handle the iroh-blobs ALPN on the endpoint.
    ///
    /// # Arguments
    /// * `path` - Path to the blob store on disk
    pub async fn load(path: &Path) -> Result<Self, BlobsStoreError> {
        let store = FsStore::load(path).await?;
        let blobs = BlobsProtocol::new(&store, None);
        Ok(Self {
            inner: Arc::new(blobs),
        })
    }
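
    // Hedged sketch (not part of this file): registering the protocol handle
    // with an iroh router so peers can fetch blobs over the endpoint. The
    // exact `Router` builder signature depends on the iroh version in use,
    // and the path here is illustrative.
    //
    // ```ignore
    // let endpoint = iroh::Endpoint::builder().bind().await?;
    // let store = BlobsStore::load(Path::new("/var/lib/peer/blobs")).await?;
    // let _router = iroh::protocol::Router::builder(endpoint)
    //     .accept(iroh_blobs::ALPN, store.inner.as_ref().clone())
    //     .spawn();
    // ```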

    /// Get a handle to the blobs client for the underlying store
    pub fn blobs(&self) -> &Blobs {
        self.inner.store().blobs()
    }

    /// Get a blob as bytes
    pub async fn get(&self, hash: &Hash) -> Result<Bytes, BlobsStoreError> {
        let bytes = self.blobs().get_bytes(*hash).await?;
        Ok(bytes)
    }

    /// Get a blob from the store as a reader
    pub async fn get_reader(&self, hash: Hash) -> Result<Reader, BlobsStoreError> {
        let reader = self.blobs().reader(hash);
        Ok(reader)
    }

    /// Store a stream of bytes as a blob
    pub async fn put_stream(
        &self,
        stream: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + Unpin + 'static,
    ) -> Result<Hash, BlobsStoreError> {
        let outcome = self
            .blobs()
            .add_stream(stream)
            .into_future()
            .await
            .with_tag()
            .await?
            .hash;
        Ok(outcome)
    }

    /// Store a vec of bytes as a blob
    pub async fn put(&self, data: Vec<u8>) -> Result<Hash, BlobsStoreError> {
        let hash = self.blobs().add_bytes(data).into_future().await?.hash;
        Ok(hash)
    }

    /// Check whether a blob is fully present (complete) in the local store
    pub async fn stat(&self, hash: &Hash) -> Result<bool, BlobsStoreError> {
        let stat = self
            .blobs()
            .status(*hash)
            .await
            .map_err(|err| BlobsStoreError::Default(anyhow!(err)))?;
        Ok(matches!(stat, BlobStatus::Complete { .. }))
    }

    /// Download a single hash from peers
    ///
    /// This checks if the hash exists locally first, then downloads if needed.
    /// Uses the Downloader API with Shuffled content discovery.
    pub async fn download_hash(
        &self,
        hash: Hash,
        peer_ids: Vec<NodeId>,
        endpoint: &Endpoint,
    ) -> Result<(), BlobsStoreError> {
        tracing::debug!("download_hash: Checking if hash {} exists locally", hash);

        // Check if we already have this hash
        if self.stat(&hash).await? {
            tracing::debug!(
                "download_hash: Hash {} already exists locally, skipping download",
                hash
            );
            return Ok(());
        }

        tracing::info!(
            "download_hash: Downloading hash {} from {} peers: {:?}",
            hash,
            peer_ids.len(),
            peer_ids
        );

        // Create downloader - needs the Store from BlobsProtocol
        let downloader = Downloader::new(self.inner.store(), endpoint);

        // Create content discovery with shuffled peers
        let discovery = Shuffled::new(peer_ids.clone());

        tracing::debug!(
            "download_hash: Starting download of hash {} with downloader",
            hash
        );

        // Download the hash and wait for completion
        // DownloadProgress implements IntoFuture, so we can await it directly
        match downloader.download(hash, discovery).await {
            Ok(_) => {
                tracing::info!("download_hash: Successfully downloaded hash {}", hash);

                // Verify it was actually downloaded
                match self.stat(&hash).await {
                    Ok(true) => tracing::debug!(
                        "download_hash: Verified hash {} exists after download",
                        hash
                    ),
                    Ok(false) => {
                        tracing::error!("download_hash: Hash {} NOT found after download!", hash);
                        return Err(anyhow!("Hash not found after download").into());
                    }
                    Err(e) => {
                        tracing::error!("download_hash: Error verifying hash {}: {}", hash, e);
                        return Err(e);
                    }
                }
            }
            Err(e) => {
                tracing::error!(
                    "download_hash: Failed to download hash {} from peers {:?}: {}",
                    hash,
                    peer_ids,
                    e
                );
                return Err(e.into());
            }
        }

        Ok(())
    }
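
    // Hedged usage sketch (not from the original): fetching one hash from a
    // known peer. The endpoint setup and the way the peer id is obtained are
    // assumptions, not values from this codebase.
    //
    // ```ignore
    // let endpoint = iroh::Endpoint::builder().bind().await?;
    // let peer: NodeId = peer_id_str.parse()?; // peer id learned out of band
    // store.download_hash(content_hash, vec![peer], &endpoint).await?;
    // ```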

    /// Download a hash list (pinset) and all referenced hashes
    ///
    /// This first downloads the hash list blob, reads the list of hashes,
    /// then downloads each referenced hash.
    pub async fn download_hash_list(
        &self,
        hash_list_hash: Hash,
        peer_ids: Vec<NodeId>,
        endpoint: &Endpoint,
    ) -> Result<(), BlobsStoreError> {
        tracing::debug!(
            "download_hash_list: Starting download of hash list {} from {} peers",
            hash_list_hash,
            peer_ids.len()
        );

        // First download the hash list itself
        tracing::debug!("download_hash_list: Downloading hash list blob itself");
        self.download_hash(hash_list_hash, peer_ids.clone(), endpoint)
            .await?;
        tracing::debug!("download_hash_list: Hash list blob downloaded successfully");

        // Verify it exists
        match self.stat(&hash_list_hash).await {
            Ok(true) => tracing::debug!(
                "download_hash_list: Verified hash list blob {} exists",
                hash_list_hash
            ),
            Ok(false) => {
                tracing::error!(
                    "download_hash_list: Hash list blob {} NOT found after download!",
                    hash_list_hash
                );
                return Err(anyhow!("Hash list blob not found after download").into());
            }
            Err(e) => {
                tracing::error!("download_hash_list: Error checking hash list blob: {}", e);
                return Err(e);
            }
        }

        // Read the list of hashes
        tracing::debug!("download_hash_list: Reading hash list contents");
        let hashes = self.read_hash_list(hash_list_hash).await?;
        tracing::info!(
            "download_hash_list: Hash list contains {} hashes, downloading all...",
            hashes.len()
        );

        if hashes.is_empty() {
            tracing::warn!("download_hash_list: Hash list is EMPTY - no content to download");
            return Ok(());
        }

        // Download each hash in the list
        for (idx, hash) in hashes.iter().enumerate() {
            tracing::debug!(
                "download_hash_list: Downloading content hash {}/{}: {:?}",
                idx + 1,
                hashes.len(),
                hash
            );
            match self.download_hash(*hash, peer_ids.clone(), endpoint).await {
                Ok(()) => {
                    tracing::debug!(
                        "download_hash_list: Content hash {}/{} downloaded successfully",
                        idx + 1,
                        hashes.len()
                    );
                }
                Err(e) => {
                    tracing::error!(
                        "download_hash_list: Failed to download content hash {}/{} ({:?}): {}",
                        idx + 1,
                        hashes.len(),
                        hash,
                        e
                    );
                    return Err(e);
                }
            }
        }

        tracing::info!(
            "download_hash_list: Successfully downloaded all {} hashes from hash list",
            hashes.len()
        );

        Ok(())
    }

    /// Create a simple blob containing a sequence of hashes
    /// Each hash is 32 bytes, stored consecutively
    /// Returns the hash of the blob containing all the hashes
    pub async fn create_hash_list<I>(&self, hashes: I) -> Result<Hash, BlobsStoreError>
    where
        I: IntoIterator<Item = Hash>,
    {
        // Serialize hashes as raw bytes (32 bytes each, concatenated)
        let mut data = Vec::new();
        for hash in hashes {
            data.extend_from_slice(hash.as_bytes());
        }

        // Store as a single blob
        let hash = self.put(data).await?;
        Ok(hash)
    }

    /// Read all hashes from a hash list blob
    /// Returns a Vec of all hashes in the list
    pub async fn read_hash_list(&self, list_hash: Hash) -> Result<Vec<Hash>, BlobsStoreError> {
        let mut hashes = Vec::new();

        // Read the blob
        let data = self.get(&list_hash).await?;

        // Parse hashes (32 bytes each)
        if data.len() % 32 != 0 {
            return Err(anyhow!("Invalid hash list: length is not a multiple of 32").into());
        }

        for chunk in data.chunks_exact(32) {
            let mut hash_bytes = [0u8; 32];
            hash_bytes.copy_from_slice(chunk);
            hashes.push(Hash::from_bytes(hash_bytes));
        }

        Ok(hashes)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use futures::stream;
    use tempfile::TempDir;

    async fn setup_test_store() -> (BlobsStore, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let blob_path = temp_dir.path().join("blobs");

        let blobs = BlobsStore::load(&blob_path).await.unwrap();
        (blobs, temp_dir)
    }

    #[tokio::test]
    async fn test_put_and_get() {
        let (store, _temp) = setup_test_store().await;

        // Test data
        let data = b"Hello, BlobsStore!";

        // Put data
        let hash = store.put(data.to_vec()).await.unwrap();
        assert!(!hash.as_bytes().is_empty());

        // Get data back
        let retrieved = store.get(&hash).await.unwrap();
        assert_eq!(retrieved.as_ref(), data);
    }

    #[tokio::test]
    async fn test_put_stream() {
        let (store, _temp) = setup_test_store().await;

        // Create a stream of data
        let data = b"Streaming data test";
        let stream =
            stream::once(async move { Ok::<_, std::io::Error>(Bytes::from(data.to_vec())) });

        // Put stream
        let hash = store.put_stream(Box::pin(stream)).await.unwrap();

        // Verify we can get it back
        let retrieved = store.get(&hash).await.unwrap();
        assert_eq!(retrieved.as_ref(), data);
    }
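
    // Added sketch (new test, name is ours): streams the same payload in
    // several chunks and checks that the content hash matches a single-shot
    // `put` of the identical bytes, since blobs are content-addressed.
    #[tokio::test]
    async fn test_put_stream_multiple_chunks() {
        let (store, _temp) = setup_test_store().await;

        // Three chunks that concatenate to "part one part two part three"
        let chunks = vec![
            Ok::<_, std::io::Error>(Bytes::from_static(b"part one ")),
            Ok(Bytes::from_static(b"part two ")),
            Ok(Bytes::from_static(b"part three")),
        ];
        let stream = stream::iter(chunks);

        let streamed = store.put_stream(Box::pin(stream)).await.unwrap();
        let direct = store
            .put(b"part one part two part three".to_vec())
            .await
            .unwrap();

        // Same bytes, same hash, same retrievable content
        assert_eq!(streamed, direct);
        assert_eq!(
            store.get(&streamed).await.unwrap().as_ref(),
            b"part one part two part three"
        );
    }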

    #[tokio::test]
    async fn test_stat() {
        let (store, _temp) = setup_test_store().await;

        let data = b"Test data for stat";
        let hash = store.put(data.to_vec()).await.unwrap();

        // Should exist
        assert!(store.stat(&hash).await.unwrap());

        // Non-existent hash should not exist
        let fake_hash = iroh_blobs::Hash::from_bytes([0u8; 32]);
        assert!(!store.stat(&fake_hash).await.unwrap());
    }

    #[tokio::test]
    async fn test_large_data() {
        let (store, _temp) = setup_test_store().await;

        // Create large data (1MB)
        let data = vec![42u8; 1024 * 1024];

        // Put and get large data
        let hash = store.put(data.clone()).await.unwrap();
        let retrieved = store.get(&hash).await.unwrap();

        assert_eq!(retrieved.len(), data.len());
        assert_eq!(retrieved.as_ref(), data.as_slice());
    }

    #[tokio::test]
    async fn test_multiple_puts() {
        let (store, _temp) = setup_test_store().await;

        let data1 = b"First data";
        let data2 = b"Second data";
        let data3 = b"Third data";

        // Put multiple items
        let hash1 = store.put(data1.to_vec()).await.unwrap();
        let hash2 = store.put(data2.to_vec()).await.unwrap();
        let hash3 = store.put(data3.to_vec()).await.unwrap();

        // Verify all are different hashes
        assert_ne!(hash1, hash2);
        assert_ne!(hash2, hash3);
        assert_ne!(hash1, hash3);

        // Verify all can be retrieved
        assert_eq!(store.get(&hash1).await.unwrap().as_ref(), data1);
        assert_eq!(store.get(&hash2).await.unwrap().as_ref(), data2);
        assert_eq!(store.get(&hash3).await.unwrap().as_ref(), data3);
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let (store, _temp) = setup_test_store().await;

        // Try to get non-existent data
        let fake_hash = iroh_blobs::Hash::from_bytes([99u8; 32]);
        let result = store.get(&fake_hash).await;

        // Should return an error
        assert!(result.is_err());
    }
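
    // Added sketch (new test, name is ours): round-trips the hash-list
    // format used by `create_hash_list` / `read_hash_list` (32-byte hashes
    // concatenated into a single blob).
    #[tokio::test]
    async fn test_hash_list_roundtrip() {
        let (store, _temp) = setup_test_store().await;

        // Store a few blobs and collect their hashes
        let h1 = store.put(b"one".to_vec()).await.unwrap();
        let h2 = store.put(b"two".to_vec()).await.unwrap();
        let h3 = store.put(b"three".to_vec()).await.unwrap();

        // Write the hash list, then read it back in order
        let list_hash = store.create_hash_list(vec![h1, h2, h3]).await.unwrap();
        let hashes = store.read_hash_list(list_hash).await.unwrap();
        assert_eq!(hashes, vec![h1, h2, h3]);

        // The list blob itself is just the raw 32-byte hashes, concatenated
        let raw = store.get(&list_hash).await.unwrap();
        assert_eq!(raw.len(), 3 * 32);
    }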
}