p2panda_blobs/
download.rs1use anyhow::{Result, ensure};
4use futures_lite::{Stream, StreamExt};
5use iroh::NodeAddr;
6use iroh_blobs::downloader::{DownloadRequest, Downloader};
7use iroh_blobs::get::Stats;
8use iroh_blobs::get::db::DownloadProgress;
9use iroh_blobs::util::local_pool::LocalPoolHandle;
10use iroh_blobs::util::progress::{AsyncChannelProgressSender, ProgressSender};
11use iroh_blobs::{BlobFormat, Hash as IrohHash, HashAndFormat};
12use p2panda_core::Hash;
13use p2panda_net::{Network, TopicId};
14use p2panda_sync::TopicQuery;
15use serde::{Deserialize, Serialize};
16use serde_error::Error as RpcError;
17
18use crate::from_node_addr;
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
22pub enum DownloadBlobEvent {
23 Done,
24 Abort(RpcError),
25}
26
27pub(crate) async fn download_blob<T: TopicQuery + TopicId + 'static>(
28 network: Network<T>,
29 downloader: Downloader,
30 pool_handle: LocalPoolHandle,
31 hash: Hash,
32) -> impl Stream<Item = DownloadBlobEvent> {
33 let (sender, receiver) = async_channel::bounded(1024);
34 let progress = AsyncChannelProgressSender::new(sender);
35 let hash_and_format = HashAndFormat {
36 hash: IrohHash::from_bytes(*hash.as_bytes()),
37 format: BlobFormat::Raw,
38 };
39
40 pool_handle.spawn_detached(move || async move {
41 match download_queued(network, &downloader, hash_and_format, progress.clone()).await {
42 Ok(stats) => {
43 progress.send(DownloadProgress::AllDone(stats)).await.ok();
44 }
45 Err(err) => {
46 progress
47 .send(DownloadProgress::Abort(RpcError::new(&*err)))
48 .await
49 .ok();
50 }
51 }
52 });
53
54 receiver.filter_map(|event| match event {
55 DownloadProgress::AllDone(_) => Some(DownloadBlobEvent::Done),
56 DownloadProgress::Abort(err) => Some(DownloadBlobEvent::Abort(err)),
58 _ => {
59 None
61 }
62 })
63}
64
65async fn download_queued<T: TopicQuery + TopicId + 'static>(
66 network: Network<T>,
67 downloader: &Downloader,
68 hash_and_format: HashAndFormat,
69 progress: AsyncChannelProgressSender<DownloadProgress>,
70) -> Result<Stats> {
71 let addrs = network.known_peers().await?;
72 ensure!(!addrs.is_empty(), "no way to reach a node for download");
73
74 let iroh_addrs: Vec<NodeAddr> = addrs
75 .iter()
76 .map(|addr| from_node_addr(addr.clone()))
77 .collect();
78 let req = DownloadRequest::new(hash_and_format, iroh_addrs).progress_sender(progress);
79 let handle = downloader.queue(req).await;
80
81 let stats = handle.await?;
82 Ok(stats)
83}