p2panda_blobs/
download.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use anyhow::{Result, ensure};
4use futures_lite::{Stream, StreamExt};
5use iroh::NodeAddr;
6use iroh_blobs::downloader::{DownloadRequest, Downloader};
7use iroh_blobs::get::Stats;
8use iroh_blobs::get::db::DownloadProgress;
9use iroh_blobs::util::local_pool::LocalPoolHandle;
10use iroh_blobs::util::progress::{AsyncChannelProgressSender, ProgressSender};
11use iroh_blobs::{BlobFormat, Hash as IrohHash, HashAndFormat};
12use p2panda_core::Hash;
13use p2panda_net::{Network, TopicId};
14use p2panda_sync::TopicQuery;
15use serde::{Deserialize, Serialize};
16use serde_error::Error as RpcError;
17
18use crate::from_node_addr;
19
20/// Status of a blob download attempt.
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub enum DownloadBlobEvent {
23    Done,
24    Abort(RpcError),
25}
26
27pub(crate) async fn download_blob<T: TopicQuery + TopicId + 'static>(
28    network: Network<T>,
29    downloader: Downloader,
30    pool_handle: LocalPoolHandle,
31    hash: Hash,
32) -> impl Stream<Item = DownloadBlobEvent> {
33    let (sender, receiver) = async_channel::bounded(1024);
34    let progress = AsyncChannelProgressSender::new(sender);
35    let hash_and_format = HashAndFormat {
36        hash: IrohHash::from_bytes(*hash.as_bytes()),
37        format: BlobFormat::Raw,
38    };
39
40    pool_handle.spawn_detached(move || async move {
41        match download_queued(network, &downloader, hash_and_format, progress.clone()).await {
42            Ok(stats) => {
43                progress.send(DownloadProgress::AllDone(stats)).await.ok();
44            }
45            Err(err) => {
46                progress
47                    .send(DownloadProgress::Abort(RpcError::new(&*err)))
48                    .await
49                    .ok();
50            }
51        }
52    });
53
54    receiver.filter_map(|event| match event {
55        DownloadProgress::AllDone(_) => Some(DownloadBlobEvent::Done),
56        // @TODO: Use own error type here
57        DownloadProgress::Abort(err) => Some(DownloadBlobEvent::Abort(err)),
58        _ => {
59            // @TODO: Add more event types
60            None
61        }
62    })
63}
64
65async fn download_queued<T: TopicQuery + TopicId + 'static>(
66    network: Network<T>,
67    downloader: &Downloader,
68    hash_and_format: HashAndFormat,
69    progress: AsyncChannelProgressSender<DownloadProgress>,
70) -> Result<Stats> {
71    let addrs = network.known_peers().await?;
72    ensure!(!addrs.is_empty(), "no way to reach a node for download");
73
74    let iroh_addrs: Vec<NodeAddr> = addrs
75        .iter()
76        .map(|addr| from_node_addr(addr.clone()))
77        .collect();
78    let req = DownloadRequest::new(hash_and_format, iroh_addrs).progress_sender(progress);
79    let handle = downloader.queue(req).await;
80
81    let stats = handle.await?;
82    Ok(stats)
83}