Skip to main content

common/peer/
blobs_store.rs

1use std::future::IntoFuture;
2use std::ops::Deref;
3use std::path::Path;
4use std::sync::Arc;
5
6use anyhow::anyhow;
7use bytes::Bytes;
8use futures::Stream;
9use iroh::{Endpoint, NodeId};
10use iroh_blobs::{
11    api::{
12        blobs::{BlobReader as Reader, BlobStatus, Blobs},
13        downloader::{Downloader, Shuffled},
14        ExportBaoError, RequestError,
15    },
16    store::{fs::FsStore, mem::MemStore},
17    BlobsProtocol, Hash,
18};
19
20use object_store::ObjectStore as ObjStore;
21
22use crate::{
23    crypto::PublicKey,
24    linked_data::{BlockEncoded, CodecError, DagCborCodec},
25};
26
// TODO (amiller68): maybe at some point it would make sense
//  to implement some sort of `BlockStore` trait over BlobStore
/// Client over a local iroh-blob store.
///  Exposes an iroh-blobs peer over the endpoint.
///  Router must handle the iroh-blobs ALPN.
/// Also acts as our main BlockStore implementation
///  for bucket, node, and data storage and retrieval.
#[derive(Clone, Debug)]
pub struct BlobsStore {
    /// Shared handle to the underlying iroh-blobs protocol instance;
    /// `Arc` makes `Clone` cheap (refcount bump, no store copy).
    pub inner: Arc<BlobsProtocol>,
}
38
/// Deref to the inner protocol handle so callers can invoke
/// `BlobsProtocol` methods directly on a `BlobsStore`.
impl Deref for BlobsStore {
    type Target = Arc<BlobsProtocol>;
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
45
/// Errors surfaced by [`BlobsStore`] operations.
#[derive(Debug, thiserror::Error)]
pub enum BlobsStoreError {
    /// Catch-all for errors propagated through `anyhow` (e.g. status checks).
    #[error("blobs store error: {0}")]
    Default(#[from] anyhow::Error),
    /// Underlying filesystem / stream I/O failure.
    #[error("blob store i/o error: {0}")]
    Io(#[from] std::io::Error),
    /// Failure exporting verified (bao) blob data.
    #[error("export bao error: {0}")]
    ExportBao(#[from] ExportBaoError),
    /// Failure from an iroh-blobs API request.
    #[error("request error: {0}")]
    Request(#[from] RequestError),
    /// DAG-CBOR decode failure when reading a block-encoded value.
    #[error("decode error: {0}")]
    Decode(#[from] CodecError),
    /// Failure from the ObjectStore backing layer.
    #[error("object store error: {0}")]
    ObjectStore(#[from] object_store::BlobStoreError),
}
61
62impl BlobsStore {
63    /// Legacy: load from filesystem using iroh's FsStore directly.
64    pub async fn legacy_fs(path: &Path) -> Result<Self, BlobsStoreError> {
65        tracing::debug!("BlobsStore::legacy_fs called with path: {:?}", path);
66        let store = FsStore::load(path).await?;
67        tracing::debug!("BlobsStore::legacy_fs completed loading FsStore");
68        let blobs = BlobsProtocol::new(&store, None);
69        Ok(Self {
70            inner: Arc::new(blobs),
71        })
72    }
73
74    /// Legacy: in-memory using iroh's MemStore directly.
75    pub async fn legacy_memory() -> Result<Self, BlobsStoreError> {
76        let store = MemStore::new();
77        let blobs = BlobsProtocol::new(&store, None);
78        Ok(Self {
79            inner: Arc::new(blobs),
80        })
81    }
82
83    /// Local filesystem via ObjectStore (SQLite + local object storage).
84    pub async fn fs(data_dir: &Path) -> Result<Self, BlobsStoreError> {
85        let store = ObjStore::new_local(data_dir).await?;
86        Ok(Self::from_store(store.into()))
87    }
88
89    /// In-memory via ObjectStore.
90    pub async fn memory() -> Result<Self, BlobsStoreError> {
91        let store = ObjStore::new_ephemeral().await?;
92        Ok(Self::from_store(store.into()))
93    }
94
95    /// S3-backed via ObjectStore.
96    pub async fn s3(
97        db_path: &Path,
98        endpoint: &str,
99        access_key: &str,
100        secret_key: &str,
101        bucket: &str,
102        region: Option<&str>,
103    ) -> Result<Self, BlobsStoreError> {
104        let store =
105            ObjStore::new_s3(db_path, endpoint, access_key, secret_key, bucket, region).await?;
106        Ok(Self::from_store(store.into()))
107    }
108
109    /// Wrap an existing iroh-blobs Store (kept for flexibility).
110    pub fn from_store(store: iroh_blobs::api::Store) -> Self {
111        let blobs = BlobsProtocol::new(&store, None);
112        Self {
113            inner: Arc::new(blobs),
114        }
115    }
116
    /// Get a handle to the underlying blobs client against
    ///  the store. All get/put helpers below go through this.
    pub fn blobs(&self) -> &Blobs {
        self.inner.store().blobs()
    }
122
123    /// Get a blob as bytes
124    pub async fn get(&self, hash: &Hash) -> Result<Bytes, BlobsStoreError> {
125        let bytes = self.blobs().get_bytes(*hash).await?;
126        Ok(bytes)
127    }
128
129    /// Get a blob as a block encoded
130    pub async fn get_cbor<T: BlockEncoded<DagCborCodec>>(
131        &self,
132        hash: &Hash,
133    ) -> Result<T, BlobsStoreError> {
134        let bytes = self.blobs().get_bytes(*hash).await?;
135        Ok(T::decode(&bytes)?)
136    }
137
138    /// Get a blob from the store as a reader
139    pub async fn get_reader(&self, hash: Hash) -> Result<Reader, BlobsStoreError> {
140        let reader = self.blobs().reader(hash);
141        Ok(reader)
142    }
143
    /// Store a stream of bytes as a blob, returning its content hash.
    ///
    /// # Errors
    /// Propagates any failure from the stream or from persisting the blob.
    pub async fn put_stream(
        &self,
        stream: impl Stream<Item = std::io::Result<Bytes>> + Send + Unpin + 'static + std::marker::Sync,
    ) -> Result<Hash, BlobsStoreError> {
        // NOTE(review): `add_stream(..).into_future().await` yields a progress
        // value whose `with_tag()` future resolves to the final outcome carrying
        // `hash` — presumably this also tags the blob so it is retained; confirm
        // against the iroh-blobs `Blobs::add_stream` API docs.
        let outcome = self
            .blobs()
            .add_stream(stream)
            .into_future()
            .await
            .with_tag()
            .await?
            .hash;
        Ok(outcome)
    }
159
160    /// Store a vec of bytes as a blob
161    pub async fn put(&self, data: Vec<u8>) -> Result<Hash, BlobsStoreError> {
162        let hash = self.blobs().add_bytes(data).into_future().await?.hash;
163        Ok(hash)
164    }
165
166    /// Get the stat of a blob
167    pub async fn stat(&self, hash: &Hash) -> Result<bool, BlobsStoreError> {
168        let stat = self
169            .blobs()
170            .status(*hash)
171            .await
172            .map_err(|err| BlobsStoreError::Default(anyhow!(err)))?;
173        Ok(matches!(stat, BlobStatus::Complete { .. }))
174    }
175
    /// Download a single hash from peers
    ///
    /// This checks if the hash exists locally first, then downloads if needed.
    /// Uses the Downloader API with Shuffled content discovery.
    ///
    /// # Errors
    /// Fails if the download errors, or if the blob is still absent locally
    /// after a reported-successful download.
    pub async fn download_hash(
        &self,
        hash: Hash,
        peer_ids: Vec<PublicKey>,
        endpoint: &Endpoint,
    ) -> Result<(), BlobsStoreError> {
        tracing::debug!("download_hash: Checking if hash {} exists locally", hash);

        // Skip the network round-trip entirely when the blob is already complete
        if self.stat(&hash).await? {
            tracing::debug!(
                "download_hash: Hash {} already exists locally, skipping download",
                hash
            );
            return Ok(());
        }

        tracing::info!(
            "download_hash: Downloading hash {} from {} peers: {:?}",
            hash,
            peer_ids.len(),
            peer_ids
        );

        // Create downloader - needs the Store from BlobsProtocol
        let downloader = Downloader::new(self.inner.store(), endpoint);

        // Create content discovery with shuffled peers so requests
        // are spread across the candidate providers
        let discovery = Shuffled::new(
            peer_ids
                .iter()
                .map(|peer_id| NodeId::from(*peer_id))
                .collect(),
        );

        tracing::debug!(
            "download_hash: Starting download of hash {} with downloader",
            hash
        );

        // Download the hash and wait for completion
        // DownloadProgress implements IntoFuture, so we can await it directly
        match downloader.download(hash, discovery).await {
            Ok(_) => {
                tracing::info!("download_hash: Successfully downloaded hash {}", hash);

                // Verify it was actually downloaded — guards against a
                // downloader that reports success without persisting the blob
                match self.stat(&hash).await {
                    Ok(true) => tracing::debug!(
                        "download_hash: Verified hash {} exists after download",
                        hash
                    ),
                    Ok(false) => {
                        tracing::error!("download_hash: Hash {} NOT found after download!", hash);
                        return Err(anyhow!("Hash not found after download").into());
                    }
                    Err(e) => {
                        tracing::error!("download_hash: Error verifying hash {}: {}", hash, e);
                        return Err(e);
                    }
                }
            }
            Err(e) => {
                tracing::error!(
                    "download_hash: Failed to download hash {} from peers {:?}: {}",
                    hash,
                    peer_ids,
                    e
                );
                return Err(e.into());
            }
        }

        Ok(())
    }
255
    /// Download a hash list (pinset) and all referenced hashes
    ///
    /// This first downloads the hash list blob, reads the list of hashes,
    /// then downloads each referenced hash. Fails fast on the first hash
    /// that cannot be fetched; an empty list is a successful no-op.
    ///
    /// # Errors
    /// Fails if the list blob or any referenced hash cannot be downloaded,
    /// or if the list blob is malformed (see [`Self::read_hash_list`]).
    pub async fn download_hash_list(
        &self,
        hash_list_hash: Hash,
        peer_ids: Vec<PublicKey>,
        endpoint: &Endpoint,
    ) -> Result<(), BlobsStoreError> {
        tracing::debug!(
            "download_hash_list: Starting download of hash list {} from {} peers",
            hash_list_hash,
            peer_ids.len()
        );

        // First download the hash list itself
        tracing::debug!("download_hash_list: Downloading hash list blob itself");
        self.download_hash(hash_list_hash, peer_ids.clone(), endpoint)
            .await?;
        tracing::debug!("download_hash_list: Hash list blob downloaded successfully");

        // Verify it exists (belt-and-braces; download_hash also verifies)
        match self.stat(&hash_list_hash).await {
            Ok(true) => tracing::debug!(
                "download_hash_list: Verified hash list blob {} exists",
                hash_list_hash
            ),
            Ok(false) => {
                tracing::error!(
                    "download_hash_list: Hash list blob {} NOT found after download!",
                    hash_list_hash
                );
                return Err(anyhow!("Hash list blob not found after download").into());
            }
            Err(e) => {
                tracing::error!("download_hash_list: Error checking hash list blob: {}", e);
                return Err(e);
            }
        }

        // Read the list of hashes
        tracing::debug!("download_hash_list: Reading hash list contents");
        let hashes = self.read_hash_list(hash_list_hash).await?;
        tracing::info!(
            "download_hash_list: Hash list contains {} hashes, downloading all...",
            hashes.len()
        );

        if hashes.is_empty() {
            tracing::warn!("download_hash_list: Hash list is EMPTY - no content to download");
            return Ok(());
        }

        // Download each hash in the list, sequentially and fail-fast
        for (idx, hash) in hashes.iter().enumerate() {
            tracing::debug!(
                "download_hash_list: Downloading content hash {}/{}: {:?}",
                idx + 1,
                hashes.len(),
                hash
            );
            match self.download_hash(*hash, peer_ids.clone(), endpoint).await {
                Ok(()) => {
                    tracing::debug!(
                        "download_hash_list: Content hash {}/{} downloaded successfully",
                        idx + 1,
                        hashes.len()
                    );
                }
                Err(e) => {
                    tracing::error!(
                        "download_hash_list: Failed to download content hash {}/{} ({:?}): {}",
                        idx + 1,
                        hashes.len(),
                        hash,
                        e
                    );
                    return Err(e);
                }
            }
        }

        tracing::info!(
            "download_hash_list: Successfully downloaded all {} hashes from hash list",
            hashes.len()
        );

        Ok(())
    }
346
347    /// Create a simple blob containing a sequence of hashes
348    /// Each hash is 32 bytes, stored consecutively
349    /// Returns the hash of the blob containing all the hashes
350    pub async fn create_hash_list<I>(&self, hashes: I) -> Result<Hash, BlobsStoreError>
351    where
352        I: IntoIterator<Item = Hash>,
353    {
354        // Serialize hashes as raw bytes (32 bytes each, concatenated)
355        let mut data = Vec::new();
356        for hash in hashes {
357            data.extend_from_slice(hash.as_bytes());
358        }
359
360        // Store as a single blob
361        let hash = self.put(data).await?;
362        Ok(hash)
363    }
364
365    /// Read all hashes from a hash list blob
366    /// Returns a Vec of all hashes in the list
367    pub async fn read_hash_list(&self, list_hash: Hash) -> Result<Vec<Hash>, BlobsStoreError> {
368        let mut hashes = Vec::new();
369
370        // Read the blob
371        let data = self.get(&list_hash).await?;
372
373        // Parse hashes (32 bytes each)
374        if data.len() % 32 != 0 {
375            return Err(anyhow!("Invalid hash list: length is not a multiple of 32").into());
376        }
377
378        for chunk in data.chunks_exact(32) {
379            let mut hash_bytes = [0u8; 32];
380            hash_bytes.copy_from_slice(chunk);
381            hashes.push(Hash::from_bytes(hash_bytes));
382        }
383
384        Ok(hashes)
385    }
386}
387
#[cfg(test)]
mod tests {
    //! Round-trip tests for `BlobsStore` against a temp-dir-backed store.
    use super::*;
    use bytes::Bytes;
    use futures::stream;
    use tempfile::TempDir;

    // Builds a filesystem-backed store in a fresh temp dir; the TempDir
    // is returned so it stays alive (and on disk) for the test's duration.
    async fn setup_test_store() -> (BlobsStore, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let blob_path = temp_dir.path().join("blobs");

        let blobs = BlobsStore::fs(&blob_path).await.unwrap();
        (blobs, temp_dir)
    }

    #[tokio::test]
    async fn test_put_and_get() {
        let (store, _temp) = setup_test_store().await;

        // Test data
        let data = b"Hello, BlobsStore!";

        // Put data
        let hash = store.put(data.to_vec()).await.unwrap();
        assert!(!hash.as_bytes().is_empty());

        // Get data back
        let retrieved = store.get(&hash).await.unwrap();
        assert_eq!(retrieved.as_ref(), data);
    }

    #[tokio::test]
    async fn test_put_stream() {
        let (store, _temp) = setup_test_store().await;

        // Create a single-chunk stream of data
        let data = b"Streaming data test";
        let stream =
            stream::once(async move { Ok::<_, std::io::Error>(Bytes::from(data.to_vec())) });

        // Put stream
        let hash = store.put_stream(Box::pin(stream)).await.unwrap();

        // Verify we can get it back
        let retrieved = store.get(&hash).await.unwrap();
        assert_eq!(retrieved.as_ref(), data);
    }

    #[tokio::test]
    async fn test_stat() {
        let (store, _temp) = setup_test_store().await;

        let data = b"Test data for stat";
        let hash = store.put(data.to_vec()).await.unwrap();

        // Should exist
        assert!(store.stat(&hash).await.unwrap());

        // Non-existent hash should not exist (all-zero hash is never stored here)
        let fake_hash = iroh_blobs::Hash::from_bytes([0u8; 32]);
        assert!(!store.stat(&fake_hash).await.unwrap());
    }

    #[tokio::test]
    async fn test_large_data() {
        let (store, _temp) = setup_test_store().await;

        // Create large data (1MB)
        let data = vec![42u8; 1024 * 1024];

        // Put and get large data
        let hash = store.put(data.clone()).await.unwrap();
        let retrieved = store.get(&hash).await.unwrap();

        assert_eq!(retrieved.len(), data.len());
        assert_eq!(retrieved.as_ref(), data.as_slice());
    }

    #[tokio::test]
    async fn test_multiple_puts() {
        let (store, _temp) = setup_test_store().await;

        let data1 = b"First data";
        let data2 = b"Second data";
        let data3 = b"Third data";

        // Put multiple items
        let hash1 = store.put(data1.to_vec()).await.unwrap();
        let hash2 = store.put(data2.to_vec()).await.unwrap();
        let hash3 = store.put(data3.to_vec()).await.unwrap();

        // Verify all are different hashes (content-addressed: distinct content)
        assert_ne!(hash1, hash2);
        assert_ne!(hash2, hash3);
        assert_ne!(hash1, hash3);

        // Verify all can be retrieved
        assert_eq!(store.get(&hash1).await.unwrap().as_ref(), data1);
        assert_eq!(store.get(&hash2).await.unwrap().as_ref(), data2);
        assert_eq!(store.get(&hash3).await.unwrap().as_ref(), data3);
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let (store, _temp) = setup_test_store().await;

        // Try to get non-existent data
        let fake_hash = iroh_blobs::Hash::from_bytes([99u8; 32]);
        let result = store.get(&fake_hash).await;

        // Should return an error
        assert!(result.is_err());
    }
}
501}