// common/peer/blobs_store.rs
1use std::future::IntoFuture;
2use std::ops::Deref;
3use std::path::Path;
4use std::sync::Arc;
5
6use anyhow::anyhow;
7use bytes::Bytes;
8use futures::Stream;
9use iroh::{Endpoint, NodeId};
10use iroh_blobs::{
11    api::{
12        blobs::{BlobReader as Reader, BlobStatus, Blobs},
13        downloader::{Downloader, Shuffled},
14        ExportBaoError, RequestError,
15    },
16    store::{fs::FsStore, mem::MemStore},
17    BlobsProtocol, Hash,
18};
19
20use object_store::ObjectStore as ObjStore;
21
22use crate::{
23    crypto::PublicKey,
24    linked_data::{BlockEncoded, CodecError, DagCborCodec},
25};
26
// TODO (amiller68): maybe at some point it would make sense
//  to implement some sort of `BlockStore` trait over BlobStore
/// Client over a local iroh-blobs store.
///
/// Exposes an iroh-blobs peer over the endpoint; the router must
/// register the iroh-blobs ALPN for this protocol handler.
/// Also acts as our main `BlockStore` implementation for bucket,
/// node, and data storage and retrieval.
#[derive(Clone, Debug)]
pub struct BlobsStore {
    // Shared protocol handler wrapping the underlying blob store;
    // `Arc` keeps `Clone` cheap (handle copy, not a store copy).
    pub inner: Arc<BlobsProtocol>,
}
38
39impl Deref for BlobsStore {
40    type Target = Arc<BlobsProtocol>;
41    fn deref(&self) -> &Self::Target {
42        &self.inner
43    }
44}
45
/// Errors surfaced by [`BlobsStore`] operations.
#[derive(Debug, thiserror::Error)]
pub enum BlobsStoreError {
    /// Catch-all for errors wrapped via `anyhow` (e.g. blob status failures).
    #[error("blobs store error: {0}")]
    Default(#[from] anyhow::Error),
    /// Underlying filesystem / I/O failure.
    #[error("blob store i/o error: {0}")]
    Io(#[from] std::io::Error),
    /// Failure exporting BAO-encoded blob data.
    #[error("export bao error: {0}")]
    ExportBao(#[from] ExportBaoError),
    /// Failure in an iroh-blobs API request.
    #[error("request error: {0}")]
    Request(#[from] RequestError),
    /// Failure decoding a block-encoded (DAG-CBOR) value.
    #[error("decode error: {0}")]
    Decode(#[from] CodecError),
    // NOTE(review): `object_store` here is presumably the project-local
    // ObjectStore backend (see the `ObjStore` import), not the `object_store`
    // crates.io crate — confirm the error type name against that module.
    #[error("object store error: {0}")]
    ObjectStore(#[from] object_store::BlobStoreError),
}
61
62impl BlobsStore {
63    /// Legacy: load from filesystem using iroh's FsStore directly.
64    pub async fn legacy_fs(path: &Path) -> Result<Self, BlobsStoreError> {
65        tracing::debug!("BlobsStore::legacy_fs called with path: {:?}", path);
66        let store = FsStore::load(path).await?;
67        tracing::debug!("BlobsStore::legacy_fs completed loading FsStore");
68        let blobs = BlobsProtocol::new(&store, None);
69        Ok(Self {
70            inner: Arc::new(blobs),
71        })
72    }
73
74    /// Legacy: in-memory using iroh's MemStore directly.
75    pub async fn legacy_memory() -> Result<Self, BlobsStoreError> {
76        let store = MemStore::new();
77        let blobs = BlobsProtocol::new(&store, None);
78        Ok(Self {
79            inner: Arc::new(blobs),
80        })
81    }
82
83    /// Local filesystem via ObjectStore (SQLite + local object storage).
84    ///
85    /// # Arguments
86    /// * `db_path` - Path to the SQLite database file
87    /// * `objects_path` - Directory for object storage
88    /// * `max_import_size` - Maximum blob size for BAO imports, or None for default (1GB)
89    pub async fn fs(
90        db_path: &Path,
91        objects_path: &Path,
92        max_import_size: Option<u64>,
93    ) -> Result<Self, BlobsStoreError> {
94        let store = ObjStore::new_local(db_path, objects_path, max_import_size).await?;
95        Ok(Self::from_store(store.into()))
96    }
97
98    /// In-memory via ObjectStore.
99    pub async fn memory() -> Result<Self, BlobsStoreError> {
100        let store = ObjStore::new_ephemeral().await?;
101        Ok(Self::from_store(store.into()))
102    }
103
104    /// S3-backed via ObjectStore.
105    ///
106    /// # Arguments
107    /// * `max_import_size` - Maximum blob size for BAO imports, or None for default (1GB)
108    pub async fn s3(
109        db_path: &Path,
110        endpoint: &str,
111        access_key: &str,
112        secret_key: &str,
113        bucket: &str,
114        region: Option<&str>,
115        max_import_size: Option<u64>,
116    ) -> Result<Self, BlobsStoreError> {
117        let store = ObjStore::new_s3(
118            db_path,
119            endpoint,
120            access_key,
121            secret_key,
122            bucket,
123            region,
124            max_import_size,
125        )
126        .await?;
127        Ok(Self::from_store(store.into()))
128    }
129
130    /// Wrap an existing iroh-blobs Store (kept for flexibility).
131    pub fn from_store(store: iroh_blobs::api::Store) -> Self {
132        let blobs = BlobsProtocol::new(&store, None);
133        Self {
134            inner: Arc::new(blobs),
135        }
136    }
137
138    /// Get a handle to the underlying blobs client against
139    ///  the store
140    pub fn blobs(&self) -> &Blobs {
141        self.inner.store().blobs()
142    }
143
144    /// Get a blob as bytes
145    pub async fn get(&self, hash: &Hash) -> Result<Bytes, BlobsStoreError> {
146        let bytes = self.blobs().get_bytes(*hash).await?;
147        Ok(bytes)
148    }
149
150    /// Get a blob as a block encoded
151    pub async fn get_cbor<T: BlockEncoded<DagCborCodec>>(
152        &self,
153        hash: &Hash,
154    ) -> Result<T, BlobsStoreError> {
155        let bytes = self.blobs().get_bytes(*hash).await?;
156        Ok(T::decode(&bytes)?)
157    }
158
159    /// Get a blob from the store as a reader
160    pub async fn get_reader(&self, hash: Hash) -> Result<Reader, BlobsStoreError> {
161        let reader = self.blobs().reader(hash);
162        Ok(reader)
163    }
164
165    /// Store a stream of bytes as a blob
166    pub async fn put_stream(
167        &self,
168        stream: impl Stream<Item = std::io::Result<Bytes>> + Send + Unpin + 'static + std::marker::Sync,
169    ) -> Result<Hash, BlobsStoreError> {
170        let outcome = self
171            .blobs()
172            .add_stream(stream)
173            .into_future()
174            .await
175            .with_tag()
176            .await?
177            .hash;
178        Ok(outcome)
179    }
180
181    /// Store a vec of bytes as a blob
182    pub async fn put(&self, data: Vec<u8>) -> Result<Hash, BlobsStoreError> {
183        let hash = self.blobs().add_bytes(data).into_future().await?.hash;
184        Ok(hash)
185    }
186
187    /// Get the stat of a blob
188    pub async fn stat(&self, hash: &Hash) -> Result<bool, BlobsStoreError> {
189        let stat = self
190            .blobs()
191            .status(*hash)
192            .await
193            .map_err(|err| BlobsStoreError::Default(anyhow!(err)))?;
194        Ok(matches!(stat, BlobStatus::Complete { .. }))
195    }
196
197    /// Download a single hash from peers
198    ///
199    /// This checks if the hash exists locally first, then downloads if needed.
200    /// Uses the Downloader API with Shuffled content discovery.
201    pub async fn download_hash(
202        &self,
203        hash: Hash,
204        peer_ids: Vec<PublicKey>,
205        endpoint: &Endpoint,
206    ) -> Result<(), BlobsStoreError> {
207        tracing::debug!("download_hash: Checking if hash {} exists locally", hash);
208
209        // Check if we already have this hash
210        if self.stat(&hash).await? {
211            tracing::debug!(
212                "download_hash: Hash {} already exists locally, skipping download",
213                hash
214            );
215            return Ok(());
216        }
217
218        tracing::info!(
219            "download_hash: Downloading hash {} from {} peers: {:?}",
220            hash,
221            peer_ids.len(),
222            peer_ids
223        );
224
225        // Create downloader - needs the Store from BlobsProtocol
226        let downloader = Downloader::new(self.inner.store(), endpoint);
227
228        // Create content discovery with shuffled peers
229        let discovery = Shuffled::new(
230            peer_ids
231                .iter()
232                .map(|peer_id| NodeId::from(*peer_id))
233                .collect(),
234        );
235
236        tracing::debug!(
237            "download_hash: Starting download of hash {} with downloader",
238            hash
239        );
240
241        // Download the hash and wait for completion
242        // DownloadProgress implements IntoFuture, so we can await it directly
243        match downloader.download(hash, discovery).await {
244            Ok(_) => {
245                tracing::info!("download_hash: Successfully downloaded hash {}", hash);
246
247                // Verify it was actually downloaded
248                match self.stat(&hash).await {
249                    Ok(true) => tracing::debug!(
250                        "download_hash: Verified hash {} exists after download",
251                        hash
252                    ),
253                    Ok(false) => {
254                        tracing::error!("download_hash: Hash {} NOT found after download!", hash);
255                        return Err(anyhow!("Hash not found after download").into());
256                    }
257                    Err(e) => {
258                        tracing::error!("download_hash: Error verifying hash {}: {}", hash, e);
259                        return Err(e);
260                    }
261                }
262            }
263            Err(e) => {
264                tracing::error!(
265                    "download_hash: Failed to download hash {} from peers {:?}: {}",
266                    hash,
267                    peer_ids,
268                    e
269                );
270                return Err(e.into());
271            }
272        }
273
274        Ok(())
275    }
276
277    /// Download a hash list (pinset) and all referenced hashes
278    ///
279    /// This first downloads the hash list blob, reads the list of hashes,
280    /// then downloads each referenced hash.
281    pub async fn download_hash_list(
282        &self,
283        hash_list_hash: Hash,
284        peer_ids: Vec<PublicKey>,
285        endpoint: &Endpoint,
286    ) -> Result<(), BlobsStoreError> {
287        tracing::debug!(
288            "download_hash_list: Starting download of hash list {} from {} peers",
289            hash_list_hash,
290            peer_ids.len()
291        );
292
293        // First download the hash list itself
294        tracing::debug!("download_hash_list: Downloading hash list blob itself");
295        self.download_hash(hash_list_hash, peer_ids.clone(), endpoint)
296            .await?;
297        tracing::debug!("download_hash_list: Hash list blob downloaded successfully");
298
299        // Verify it exists
300        match self.stat(&hash_list_hash).await {
301            Ok(true) => tracing::debug!(
302                "download_hash_list: Verified hash list blob {} exists",
303                hash_list_hash
304            ),
305            Ok(false) => {
306                tracing::error!(
307                    "download_hash_list: Hash list blob {} NOT found after download!",
308                    hash_list_hash
309                );
310                return Err(anyhow!("Hash list blob not found after download").into());
311            }
312            Err(e) => {
313                tracing::error!("download_hash_list: Error checking hash list blob: {}", e);
314                return Err(e);
315            }
316        }
317
318        // Read the list of hashes
319        tracing::debug!("download_hash_list: Reading hash list contents");
320        let hashes = self.read_hash_list(hash_list_hash).await?;
321        tracing::info!(
322            "download_hash_list: Hash list contains {} hashes, downloading all...",
323            hashes.len()
324        );
325
326        if hashes.is_empty() {
327            tracing::warn!("download_hash_list: Hash list is EMPTY - no content to download");
328            return Ok(());
329        }
330
331        // Download each hash in the list
332        for (idx, hash) in hashes.iter().enumerate() {
333            tracing::debug!(
334                "download_hash_list: Downloading content hash {}/{}: {:?}",
335                idx + 1,
336                hashes.len(),
337                hash
338            );
339            match self.download_hash(*hash, peer_ids.clone(), endpoint).await {
340                Ok(()) => {
341                    tracing::debug!(
342                        "download_hash_list: Content hash {}/{} downloaded successfully",
343                        idx + 1,
344                        hashes.len()
345                    );
346                }
347                Err(e) => {
348                    tracing::error!(
349                        "download_hash_list: Failed to download content hash {}/{} ({:?}): {}",
350                        idx + 1,
351                        hashes.len(),
352                        hash,
353                        e
354                    );
355                    return Err(e);
356                }
357            }
358        }
359
360        tracing::info!(
361            "download_hash_list: Successfully downloaded all {} hashes from hash list",
362            hashes.len()
363        );
364
365        Ok(())
366    }
367
368    /// Create a simple blob containing a sequence of hashes
369    /// Each hash is 32 bytes, stored consecutively
370    /// Returns the hash of the blob containing all the hashes
371    pub async fn create_hash_list<I>(&self, hashes: I) -> Result<Hash, BlobsStoreError>
372    where
373        I: IntoIterator<Item = Hash>,
374    {
375        // Serialize hashes as raw bytes (32 bytes each, concatenated)
376        let mut data = Vec::new();
377        for hash in hashes {
378            data.extend_from_slice(hash.as_bytes());
379        }
380
381        // Store as a single blob
382        let hash = self.put(data).await?;
383        Ok(hash)
384    }
385
386    /// Read all hashes from a hash list blob
387    /// Returns a Vec of all hashes in the list
388    pub async fn read_hash_list(&self, list_hash: Hash) -> Result<Vec<Hash>, BlobsStoreError> {
389        let mut hashes = Vec::new();
390
391        // Read the blob
392        let data = self.get(&list_hash).await?;
393
394        // Parse hashes (32 bytes each)
395        if data.len() % 32 != 0 {
396            return Err(anyhow!("Invalid hash list: length is not a multiple of 32").into());
397        }
398
399        for chunk in data.chunks_exact(32) {
400            let mut hash_bytes = [0u8; 32];
401            hash_bytes.copy_from_slice(chunk);
402            hashes.push(Hash::from_bytes(hash_bytes));
403        }
404
405        Ok(hashes)
406    }
407}
408
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use futures::stream;
    use tempfile::TempDir;

    /// Build a throwaway on-disk store. The returned `TempDir` guard must
    /// stay alive for the duration of the test, or the backing files vanish.
    async fn setup_test_store() -> (BlobsStore, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let store = BlobsStore::fs(
            &temp_dir.path().join("blobs.db"),
            &temp_dir.path().join("objects"),
            None,
        )
        .await
        .unwrap();
        (store, temp_dir)
    }

    #[tokio::test]
    async fn test_put_and_get() {
        let (store, _temp) = setup_test_store().await;

        let payload = b"Hello, BlobsStore!";

        // Round-trip the payload through the store.
        let hash = store.put(payload.to_vec()).await.unwrap();
        assert!(!hash.as_bytes().is_empty());

        let fetched = store.get(&hash).await.unwrap();
        assert_eq!(fetched.as_ref(), payload);
    }

    #[tokio::test]
    async fn test_put_stream() {
        let (store, _temp) = setup_test_store().await;

        // A single-chunk stream carrying the payload.
        let payload = b"Streaming data test";
        let one_chunk =
            stream::once(async move { Ok::<_, std::io::Error>(Bytes::from(payload.to_vec())) });

        let hash = store.put_stream(Box::pin(one_chunk)).await.unwrap();

        // The streamed blob must read back byte-for-byte.
        assert_eq!(store.get(&hash).await.unwrap().as_ref(), payload);
    }

    #[tokio::test]
    async fn test_stat() {
        let (store, _temp) = setup_test_store().await;

        let hash = store.put(b"Test data for stat".to_vec()).await.unwrap();

        // A stored blob reports complete; an unknown hash does not.
        assert!(store.stat(&hash).await.unwrap());

        let unknown = iroh_blobs::Hash::from_bytes([0u8; 32]);
        assert!(!store.stat(&unknown).await.unwrap());
    }

    #[tokio::test]
    async fn test_large_data() {
        let (store, _temp) = setup_test_store().await;

        // 1 MiB of a repeated byte.
        let payload = vec![42u8; 1024 * 1024];

        let hash = store.put(payload.clone()).await.unwrap();
        let fetched = store.get(&hash).await.unwrap();

        assert_eq!(fetched.len(), payload.len());
        assert_eq!(fetched.as_ref(), payload.as_slice());
    }

    #[tokio::test]
    async fn test_multiple_puts() {
        let (store, _temp) = setup_test_store().await;

        let first = b"First data";
        let second = b"Second data";
        let third = b"Third data";

        let h1 = store.put(first.to_vec()).await.unwrap();
        let h2 = store.put(second.to_vec()).await.unwrap();
        let h3 = store.put(third.to_vec()).await.unwrap();

        // Distinct content must hash to distinct keys.
        assert_ne!(h1, h2);
        assert_ne!(h2, h3);
        assert_ne!(h1, h3);

        // Every blob is independently retrievable.
        assert_eq!(store.get(&h1).await.unwrap().as_ref(), first);
        assert_eq!(store.get(&h2).await.unwrap().as_ref(), second);
        assert_eq!(store.get(&h3).await.unwrap().as_ref(), third);
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let (store, _temp) = setup_test_store().await;

        // Fetching a hash that was never stored must error, not hang or panic.
        let unknown = iroh_blobs::Hash::from_bytes([99u8; 32]);
        assert!(store.get(&unknown).await.is_err());
    }
}