p2panda_blobs/
download.rs1use anyhow::{ensure, Result};
4use futures_lite::{Stream, StreamExt};
5use iroh_blobs::downloader::{DownloadRequest, Downloader};
6use iroh_blobs::get::db::DownloadProgress;
7use iroh_blobs::get::Stats;
8use iroh_blobs::util::local_pool::LocalPoolHandle;
9use iroh_blobs::util::progress::{AsyncChannelProgressSender, ProgressSender};
10use iroh_blobs::{BlobFormat, Hash as IrohHash, HashAndFormat};
11use p2panda_core::Hash;
12use p2panda_net::{Network, TopicId};
13use p2panda_sync::TopicQuery;
14use serde::{Deserialize, Serialize};
15use serde_error::Error as RpcError;
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
19pub enum DownloadBlobEvent {
20 Done,
21 Abort(RpcError),
22}
23
24pub(crate) async fn download_blob<T: TopicQuery + TopicId + 'static>(
25 network: Network<T>,
26 downloader: Downloader,
27 pool_handle: LocalPoolHandle,
28 hash: Hash,
29) -> impl Stream<Item = DownloadBlobEvent> {
30 let (sender, receiver) = async_channel::bounded(1024);
31 let progress = AsyncChannelProgressSender::new(sender);
32 let hash_and_format = HashAndFormat {
33 hash: IrohHash::from_bytes(*hash.as_bytes()),
34 format: BlobFormat::Raw,
35 };
36
37 pool_handle.spawn_detached(move || async move {
38 match download_queued(network, &downloader, hash_and_format, progress.clone()).await {
39 Ok(stats) => {
40 progress.send(DownloadProgress::AllDone(stats)).await.ok();
41 }
42 Err(err) => {
43 progress
44 .send(DownloadProgress::Abort(RpcError::new(&*err)))
45 .await
46 .ok();
47 }
48 }
49 });
50
51 receiver.filter_map(|event| match event {
52 DownloadProgress::AllDone(_) => Some(DownloadBlobEvent::Done),
53 DownloadProgress::Abort(err) => Some(DownloadBlobEvent::Abort(err)),
55 _ => {
56 None
58 }
59 })
60}
61
62async fn download_queued<T: TopicQuery + TopicId + 'static>(
63 network: Network<T>,
64 downloader: &Downloader,
65 hash_and_format: HashAndFormat,
66 progress: AsyncChannelProgressSender<DownloadProgress>,
67) -> Result<Stats> {
68 let addrs = network.known_peers().await?;
69 ensure!(!addrs.is_empty(), "no way to reach a node for download");
70
71 let req = DownloadRequest::new(hash_and_format, addrs).progress_sender(progress);
72 let handle = downloader.queue(req).await;
73
74 let stats = handle.await?;
75 Ok(stats)
76}