1use std::io;
4use std::path::PathBuf;
5
6use anyhow::Result;
7use bytes::Bytes;
8use futures_util::Stream;
9use iroh_blobs::Hash as IrohHash;
10use iroh_blobs::downloader::Downloader;
11use iroh_blobs::store::{Map, Store};
12use iroh_blobs::util::local_pool::{Config as LocalPoolConfig, LocalPool};
13use p2panda_core::Hash;
14use p2panda_net::{Network, NetworkBuilder, TopicId};
15use p2panda_sync::TopicQuery;
16
17use crate::DownloadBlobEvent;
18use crate::config::Config;
19use crate::download::download_blob;
20use crate::export::export_blob;
21use crate::import::{ImportBlobEvent, import_blob, import_blob_from_stream};
22use crate::protocol::{BLOBS_ALPN, BlobsProtocol};
23
24#[derive(Debug)]
26pub struct Blobs<T, S>
27where
28 S: Store,
29{
30 downloader: Downloader,
31 network: Network<T>,
32 rt: LocalPool,
33 store: S,
34}
35
36impl<T, S> Blobs<T, S>
37where
38 T: TopicQuery + TopicId + 'static,
39 S: Store,
40{
41 pub async fn from_builder(
44 network_builder: NetworkBuilder<T>,
45 store: S,
46 ) -> Result<(Network<T>, Self)> {
47 Blobs::from_builder_with_config(network_builder, store, Config::default()).await
48 }
49
50 pub async fn from_builder_with_config(
53 network_builder: NetworkBuilder<T>,
54 store: S,
55 config: Config,
56 ) -> Result<(Network<T>, Self)> {
57 let local_pool_config = LocalPoolConfig::default();
59 let local_pool = LocalPool::new(local_pool_config);
60
61 let network = network_builder
62 .protocol(
63 BLOBS_ALPN,
64 BlobsProtocol::new(store.clone(), local_pool.handle().clone()),
65 )
66 .build()
67 .await?;
68
69 let downloader = Downloader::with_config(
70 store.clone(),
71 network.endpoint().clone(),
72 local_pool.handle().clone(),
73 config.clone().into(),
74 config.into(),
75 );
76
77 let blobs = Self {
78 downloader,
79 network: network.clone(),
80 rt: local_pool,
81 store,
82 };
83
84 Ok((network, blobs))
85 }
86
87 pub async fn get(&self, hash: Hash) -> anyhow::Result<Option<<S as Map>::Entry>> {
91 let hash = IrohHash::from_bytes(*hash.as_bytes());
92 let entry = self.store.get(&hash).await?;
93 Ok(entry)
94 }
95
96 pub async fn import_blob(&self, path: PathBuf) -> impl Stream<Item = ImportBlobEvent> {
98 import_blob(self.store.clone(), self.rt.handle().clone(), path).await
99 }
100
101 pub async fn import_blob_from_stream<D>(&self, data: D) -> impl Stream<Item = ImportBlobEvent>
103 where
104 D: Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
105 {
106 import_blob_from_stream(self.store.clone(), self.rt.handle().clone(), data).await
107 }
108
109 pub async fn download_blob(&self, hash: Hash) -> impl Stream<Item = DownloadBlobEvent> {
111 download_blob(
112 self.network.clone(),
113 self.downloader.clone(),
114 self.rt.handle().clone(),
115 hash,
116 )
117 .await
118 }
119
120 pub async fn export_blob(&self, hash: Hash, path: &PathBuf) -> Result<()> {
122 export_blob(&self.store, hash, path).await?;
123 Ok(())
124 }
125}