// common/peer/blobs_store.rs

1use std::future::IntoFuture;
2use std::ops::Deref;
3use std::path::Path;
4use std::sync::Arc;
5
6use anyhow::anyhow;
7use bytes::Bytes;
8use futures::Stream;
9use iroh::{Endpoint, NodeId};
10use iroh_blobs::{
11    api::{
12        blobs::{BlobReader as Reader, BlobStatus, Blobs},
13        downloader::{Downloader, Shuffled},
14        ExportBaoError, RequestError,
15    },
16    store::{fs::FsStore, mem::MemStore},
17    BlobsProtocol, Hash,
18};
19
20use crate::{
21    crypto::PublicKey,
22    linked_data::{BlockEncoded, CodecError, DagCborCodec},
23};
24
// TODO (amiller68): maybe at some point it would make sense
//  to implement some sort of `BlockStore` trait over BlobStore
/// Client over a local iroh-blob store.
///  Exposes an iroh-blobs peer over the endpoint.
///  Router must handle the iroh-blobs ALPN
/// Also acts as our main BlockStore implementation
///  for bucket, node, and data storage and retrieval
#[derive(Clone, Debug)]
pub struct BlobsStore {
    /// Shared protocol handler over the backing store; `Arc` keeps
    /// `Clone` cheap so the store can be handed to many tasks.
    pub inner: Arc<BlobsProtocol>,
}
36
// Deref straight to the protocol handler so callers can use
// `BlobsStore` anywhere an `Arc<BlobsProtocol>` is expected
// (e.g. registering it on a router) without unwrapping `inner`.
impl Deref for BlobsStore {
    type Target = Arc<BlobsProtocol>;
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
43
/// Errors surfaced by [`BlobsStore`] operations.
#[derive(Debug, thiserror::Error)]
pub enum BlobsStoreError {
    /// Catch-all for errors bridged from `anyhow` (see `stat`).
    #[error("blobs store error: {0}")]
    Default(#[from] anyhow::Error),
    /// Underlying filesystem / I/O failure.
    #[error("blob store i/o error: {0}")]
    Io(#[from] std::io::Error),
    /// Failure while exporting verified (bao) blob data.
    #[error("export bao error: {0}")]
    ExportBao(#[from] ExportBaoError),
    /// Failure of a request against the blobs API.
    #[error("request error: {0}")]
    Request(#[from] RequestError),
    /// DAG-CBOR decode failure (see `get_cbor`).
    #[error("decode error: {0}")]
    Decode(#[from] CodecError),
}
57
58impl BlobsStore {
59    /// Load a blob store from the given path, using the given endpoint.
60    ///  Endpoint exposes a network interface for blob operations
61    ///  with peers.
62    ///
63    /// # Arguments
64    /// * `path` - Path to the blob store on disk
65    /// * `endpoint` - Endpoint to use for network operations
66    ///     Exposes a peer for the private key used to initiate
67    ///     the endpoint.
68    #[allow(clippy::doc_overindented_list_items)]
69    pub async fn fs(path: &Path) -> Result<Self, BlobsStoreError> {
70        tracing::debug!("BlobsStore::load called with path: {:?}", path);
71        let store = FsStore::load(path).await?;
72        tracing::debug!("BlobsStore::load completed loading FsStore");
73        // let blobs = Blobs::builder(store).build(&endpoint);
74        let blobs = BlobsProtocol::new(&store, None);
75        Ok(Self {
76            inner: Arc::new(blobs),
77        })
78    }
79
80    /// Load a memory blobs store
81    pub async fn memory() -> Result<Self, BlobsStoreError> {
82        let store = MemStore::new();
83        let blobs = BlobsProtocol::new(&store, None);
84        Ok(Self {
85            inner: Arc::new(blobs),
86        })
87    }
88
    /// Get a handle to the underlying blobs client against
    ///  the store
    // Goes through the protocol's store rather than holding a separate
    // client handle, so every call site shares the same store.
    pub fn blobs(&self) -> &Blobs {
        self.inner.store().blobs()
    }
94
95    /// Get a blob as bytes
96    pub async fn get(&self, hash: &Hash) -> Result<Bytes, BlobsStoreError> {
97        let bytes = self.blobs().get_bytes(*hash).await?;
98        Ok(bytes)
99    }
100
101    /// Get a blob as a block encoded
102    pub async fn get_cbor<T: BlockEncoded<DagCborCodec>>(
103        &self,
104        hash: &Hash,
105    ) -> Result<T, BlobsStoreError> {
106        let bytes = self.blobs().get_bytes(*hash).await?;
107        Ok(T::decode(&bytes)?)
108    }
109
110    /// Get a blob from the store as a reader
111    pub async fn get_reader(&self, hash: Hash) -> Result<Reader, BlobsStoreError> {
112        let reader = self.blobs().reader(hash);
113        Ok(reader)
114    }
115
116    /// Store a stream of bytes as a blob
117    pub async fn put_stream(
118        &self,
119        stream: impl Stream<Item = std::io::Result<Bytes>> + Send + Unpin + 'static + std::marker::Sync,
120    ) -> Result<Hash, BlobsStoreError> {
121        let outcome = self
122            .blobs()
123            .add_stream(stream)
124            .into_future()
125            .await
126            .with_tag()
127            .await?
128            .hash;
129        Ok(outcome)
130    }
131
132    /// Store a vec of bytes as a blob
133    pub async fn put(&self, data: Vec<u8>) -> Result<Hash, BlobsStoreError> {
134        let hash = self.blobs().add_bytes(data).into_future().await?.hash;
135        Ok(hash)
136    }
137
138    /// Get the stat of a blob
139    pub async fn stat(&self, hash: &Hash) -> Result<bool, BlobsStoreError> {
140        let stat = self
141            .blobs()
142            .status(*hash)
143            .await
144            .map_err(|err| BlobsStoreError::Default(anyhow!(err)))?;
145        Ok(matches!(stat, BlobStatus::Complete { .. }))
146    }
147
    /// Download a single hash from peers
    ///
    /// This checks if the hash exists locally first, then downloads if needed.
    /// Uses the Downloader API with Shuffled content discovery.
    ///
    /// # Errors
    /// Fails if the local status check fails, if the download request
    /// fails, or if the blob is still missing locally after a download
    /// that reported success.
    pub async fn download_hash(
        &self,
        hash: Hash,
        peer_ids: Vec<PublicKey>,
        endpoint: &Endpoint,
    ) -> Result<(), BlobsStoreError> {
        tracing::debug!("download_hash: Checking if hash {} exists locally", hash);

        // Check if we already have this hash
        if self.stat(&hash).await? {
            tracing::debug!(
                "download_hash: Hash {} already exists locally, skipping download",
                hash
            );
            return Ok(());
        }

        tracing::info!(
            "download_hash: Downloading hash {} from {} peers: {:?}",
            hash,
            peer_ids.len(),
            peer_ids
        );

        // Create downloader - needs the Store from BlobsProtocol
        let downloader = Downloader::new(self.inner.store(), endpoint);

        // Create content discovery with shuffled peers
        // NOTE(review): presumably `Shuffled` tries the given peers in a
        // randomized order — confirm retry semantics against iroh-blobs docs.
        let discovery = Shuffled::new(
            peer_ids
                .iter()
                .map(|peer_id| NodeId::from(*peer_id))
                .collect(),
        );

        tracing::debug!(
            "download_hash: Starting download of hash {} with downloader",
            hash
        );

        // Download the hash and wait for completion
        // DownloadProgress implements IntoFuture, so we can await it directly
        match downloader.download(hash, discovery).await {
            Ok(_) => {
                tracing::info!("download_hash: Successfully downloaded hash {}", hash);

                // Verify it was actually downloaded; a successful download
                // that leaves the blob absent is treated as an error.
                match self.stat(&hash).await {
                    Ok(true) => tracing::debug!(
                        "download_hash: Verified hash {} exists after download",
                        hash
                    ),
                    Ok(false) => {
                        tracing::error!("download_hash: Hash {} NOT found after download!", hash);
                        return Err(anyhow!("Hash not found after download").into());
                    }
                    Err(e) => {
                        tracing::error!("download_hash: Error verifying hash {}: {}", hash, e);
                        return Err(e);
                    }
                }
            }
            Err(e) => {
                tracing::error!(
                    "download_hash: Failed to download hash {} from peers {:?}: {}",
                    hash,
                    peer_ids,
                    e
                );
                return Err(e.into());
            }
        }

        Ok(())
    }
227
    /// Download a hash list (pinset) and all referenced hashes
    ///
    /// This first downloads the hash list blob, reads the list of hashes,
    /// then downloads each referenced hash.
    ///
    /// Downloads are sequential and fail-fast: the first failing content
    /// hash aborts the whole operation.
    pub async fn download_hash_list(
        &self,
        hash_list_hash: Hash,
        peer_ids: Vec<PublicKey>,
        endpoint: &Endpoint,
    ) -> Result<(), BlobsStoreError> {
        tracing::debug!(
            "download_hash_list: Starting download of hash list {} from {} peers",
            hash_list_hash,
            peer_ids.len()
        );

        // First download the hash list itself
        tracing::debug!("download_hash_list: Downloading hash list blob itself");
        self.download_hash(hash_list_hash, peer_ids.clone(), endpoint)
            .await?;
        tracing::debug!("download_hash_list: Hash list blob downloaded successfully");

        // Verify it exists
        // NOTE(review): `download_hash` already verifies presence after a
        // download; this re-check is belt-and-braces.
        match self.stat(&hash_list_hash).await {
            Ok(true) => tracing::debug!(
                "download_hash_list: Verified hash list blob {} exists",
                hash_list_hash
            ),
            Ok(false) => {
                tracing::error!(
                    "download_hash_list: Hash list blob {} NOT found after download!",
                    hash_list_hash
                );
                return Err(anyhow!("Hash list blob not found after download").into());
            }
            Err(e) => {
                tracing::error!("download_hash_list: Error checking hash list blob: {}", e);
                return Err(e);
            }
        }

        // Read the list of hashes
        tracing::debug!("download_hash_list: Reading hash list contents");
        let hashes = self.read_hash_list(hash_list_hash).await?;
        tracing::info!(
            "download_hash_list: Hash list contains {} hashes, downloading all...",
            hashes.len()
        );

        // An empty list is valid: there is simply nothing more to fetch.
        if hashes.is_empty() {
            tracing::warn!("download_hash_list: Hash list is EMPTY - no content to download");
            return Ok(());
        }

        // Download each hash in the list
        for (idx, hash) in hashes.iter().enumerate() {
            tracing::debug!(
                "download_hash_list: Downloading content hash {}/{}: {:?}",
                idx + 1,
                hashes.len(),
                hash
            );
            match self.download_hash(*hash, peer_ids.clone(), endpoint).await {
                Ok(()) => {
                    tracing::debug!(
                        "download_hash_list: Content hash {}/{} downloaded successfully",
                        idx + 1,
                        hashes.len()
                    );
                }
                Err(e) => {
                    tracing::error!(
                        "download_hash_list: Failed to download content hash {}/{} ({:?}): {}",
                        idx + 1,
                        hashes.len(),
                        hash,
                        e
                    );
                    return Err(e);
                }
            }
        }

        tracing::info!(
            "download_hash_list: Successfully downloaded all {} hashes from hash list",
            hashes.len()
        );

        Ok(())
    }
318
319    /// Create a simple blob containing a sequence of hashes
320    /// Each hash is 32 bytes, stored consecutively
321    /// Returns the hash of the blob containing all the hashes
322    pub async fn create_hash_list<I>(&self, hashes: I) -> Result<Hash, BlobsStoreError>
323    where
324        I: IntoIterator<Item = Hash>,
325    {
326        // Serialize hashes as raw bytes (32 bytes each, concatenated)
327        let mut data = Vec::new();
328        for hash in hashes {
329            data.extend_from_slice(hash.as_bytes());
330        }
331
332        // Store as a single blob
333        let hash = self.put(data).await?;
334        Ok(hash)
335    }
336
337    /// Read all hashes from a hash list blob
338    /// Returns a Vec of all hashes in the list
339    pub async fn read_hash_list(&self, list_hash: Hash) -> Result<Vec<Hash>, BlobsStoreError> {
340        let mut hashes = Vec::new();
341
342        // Read the blob
343        let data = self.get(&list_hash).await?;
344
345        // Parse hashes (32 bytes each)
346        if data.len() % 32 != 0 {
347            return Err(anyhow!("Invalid hash list: length is not a multiple of 32").into());
348        }
349
350        for chunk in data.chunks_exact(32) {
351            let mut hash_bytes = [0u8; 32];
352            hash_bytes.copy_from_slice(chunk);
353            hashes.push(Hash::from_bytes(hash_bytes));
354        }
355
356        Ok(hashes)
357    }
358}
359
360#[cfg(test)]
361mod tests {
362    use super::*;
363    use bytes::Bytes;
364    use futures::stream;
365    use tempfile::TempDir;
366
367    async fn setup_test_store() -> (BlobsStore, TempDir) {
368        let temp_dir = TempDir::new().unwrap();
369        let blob_path = temp_dir.path().join("blobs");
370
371        // let store = FsStore::load(&blob_path).await.unwrap();
372        // let blobs = BlobsProtocol::new(&store, None);
373        let blobs = BlobsStore::fs(&blob_path).await.unwrap();
374        (blobs, temp_dir)
375    }
376
377    #[tokio::test]
378    async fn test_put_and_get() {
379        let (store, _temp) = setup_test_store().await;
380
381        // Test data
382        let data = b"Hello, BlobsStore!";
383
384        // Put data
385        let hash = store.put(data.to_vec()).await.unwrap();
386        assert!(!hash.as_bytes().is_empty());
387
388        // Get data back
389        let retrieved = store.get(&hash).await.unwrap();
390        assert_eq!(retrieved.as_ref(), data);
391    }
392
393    #[tokio::test]
394    async fn test_put_stream() {
395        let (store, _temp) = setup_test_store().await;
396
397        // Create a stream of data
398        let data = b"Streaming data test";
399        let stream =
400            stream::once(async move { Ok::<_, std::io::Error>(Bytes::from(data.to_vec())) });
401
402        // Put stream
403        let hash = store.put_stream(Box::pin(stream)).await.unwrap();
404
405        // Verify we can get it back
406        let retrieved = store.get(&hash).await.unwrap();
407        assert_eq!(retrieved.as_ref(), data);
408    }
409
410    #[tokio::test]
411    async fn test_stat() {
412        let (store, _temp) = setup_test_store().await;
413
414        let data = b"Test data for stat";
415        let hash = store.put(data.to_vec()).await.unwrap();
416
417        // Should exist
418        assert!(store.stat(&hash).await.unwrap());
419
420        // Non-existent hash should not exist
421        let fake_hash = iroh_blobs::Hash::from_bytes([0u8; 32]);
422        assert!(!store.stat(&fake_hash).await.unwrap());
423    }
424
425    #[tokio::test]
426    async fn test_large_data() {
427        let (store, _temp) = setup_test_store().await;
428
429        // Create large data (1MB)
430        let data = vec![42u8; 1024 * 1024];
431
432        // Put and get large data
433        let hash = store.put(data.clone()).await.unwrap();
434        let retrieved = store.get(&hash).await.unwrap();
435
436        assert_eq!(retrieved.len(), data.len());
437        assert_eq!(retrieved.as_ref(), data.as_slice());
438    }
439
440    #[tokio::test]
441    async fn test_multiple_puts() {
442        let (store, _temp) = setup_test_store().await;
443
444        let data1 = b"First data";
445        let data2 = b"Second data";
446        let data3 = b"Third data";
447
448        // Put multiple items
449        let hash1 = store.put(data1.to_vec()).await.unwrap();
450        let hash2 = store.put(data2.to_vec()).await.unwrap();
451        let hash3 = store.put(data3.to_vec()).await.unwrap();
452
453        // Verify all are different hashes
454        assert_ne!(hash1, hash2);
455        assert_ne!(hash2, hash3);
456        assert_ne!(hash1, hash3);
457
458        // Verify all can be retrieved
459        assert_eq!(store.get(&hash1).await.unwrap().as_ref(), data1);
460        assert_eq!(store.get(&hash2).await.unwrap().as_ref(), data2);
461        assert_eq!(store.get(&hash3).await.unwrap().as_ref(), data3);
462    }
463
464    #[tokio::test]
465    async fn test_get_nonexistent() {
466        let (store, _temp) = setup_test_store().await;
467
468        // Try to get non-existent data
469        let fake_hash = iroh_blobs::Hash::from_bytes([99u8; 32]);
470        let result = store.get(&fake_hash).await;
471
472        // Should return an error
473        assert!(result.is_err());
474    }
475}