p2panda_blobs/
blobs.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use std::io;
4use std::path::PathBuf;
5
6use anyhow::Result;
7use bytes::Bytes;
8use futures_util::Stream;
9use iroh_blobs::Hash as IrohHash;
10use iroh_blobs::downloader::Downloader;
11use iroh_blobs::store::{Map, Store};
12use iroh_blobs::util::local_pool::{Config as LocalPoolConfig, LocalPool};
13use p2panda_core::Hash;
14use p2panda_net::{Network, NetworkBuilder, TopicId};
15use p2panda_sync::TopicQuery;
16
17use crate::DownloadBlobEvent;
18use crate::config::Config;
19use crate::download::download_blob;
20use crate::export::export_blob;
21use crate::import::{ImportBlobEvent, import_blob, import_blob_from_stream};
22use crate::protocol::{BLOBS_ALPN, BlobsProtocol};
23
24/// Blobs service offering storage, retrieval and synchronisation of content-addressed data.
25#[derive(Debug)]
26pub struct Blobs<T, S>
27where
28    S: Store,
29{
30    downloader: Downloader,
31    network: Network<T>,
32    rt: LocalPool,
33    store: S,
34}
35
36impl<T, S> Blobs<T, S>
37where
38    T: TopicQuery + TopicId + 'static,
39    S: Store,
40{
41    /// Returns a new instance of `Blobs` using the given `NetworkBuilder` and store
42    /// implementation.
43    pub async fn from_builder(
44        network_builder: NetworkBuilder<T>,
45        store: S,
46    ) -> Result<(Network<T>, Self)> {
47        Blobs::from_builder_with_config(network_builder, store, Config::default()).await
48    }
49
50    /// Returns a new instance of `Blobs` using the given `NetworkBuilder`, store
51    /// implementation and configuration.
52    pub async fn from_builder_with_config(
53        network_builder: NetworkBuilder<T>,
54        store: S,
55        config: Config,
56    ) -> Result<(Network<T>, Self)> {
57        // Calls `num_cpus::get()` to define thread count.
58        let local_pool_config = LocalPoolConfig::default();
59        let local_pool = LocalPool::new(local_pool_config);
60
61        let network = network_builder
62            .protocol(
63                BLOBS_ALPN,
64                BlobsProtocol::new(store.clone(), local_pool.handle().clone()),
65            )
66            .build()
67            .await?;
68
69        let downloader = Downloader::with_config(
70            store.clone(),
71            network.endpoint().clone(),
72            local_pool.handle().clone(),
73            config.clone().into(),
74            config.into(),
75        );
76
77        let blobs = Self {
78            downloader,
79            network: network.clone(),
80            rt: local_pool,
81            store,
82        };
83
84        Ok((network, blobs))
85    }
86
87    /// Get an entry for a hash.
88    ///
89    /// The entry provides access to metadata and methods for retrieving the actual blob data.
90    pub async fn get(&self, hash: Hash) -> anyhow::Result<Option<<S as Map>::Entry>> {
91        let hash = IrohHash::from_bytes(*hash.as_bytes());
92        let entry = self.store.get(&hash).await?;
93        Ok(entry)
94    }
95
96    /// Import a blob from the given path.
97    pub async fn import_blob(&self, path: PathBuf) -> impl Stream<Item = ImportBlobEvent> {
98        import_blob(self.store.clone(), self.rt.handle().clone(), path).await
99    }
100
101    /// Import a blob from the given stream.
102    pub async fn import_blob_from_stream<D>(&self, data: D) -> impl Stream<Item = ImportBlobEvent>
103    where
104        D: Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
105    {
106        import_blob_from_stream(self.store.clone(), self.rt.handle().clone(), data).await
107    }
108
109    /// Download a blob from a network peer.
110    pub async fn download_blob(&self, hash: Hash) -> impl Stream<Item = DownloadBlobEvent> {
111        download_blob(
112            self.network.clone(),
113            self.downloader.clone(),
114            self.rt.handle().clone(),
115            hash,
116        )
117        .await
118    }
119
120    /// Export a blob to the given filesystem path.
121    pub async fn export_blob(&self, hash: Hash, path: &PathBuf) -> Result<()> {
122        export_blob(&self.store, hash, path).await?;
123        Ok(())
124    }
125}