p2panda_blobs/
download.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use anyhow::{ensure, Result};
4use futures_lite::{Stream, StreamExt};
5use iroh_blobs::downloader::{DownloadRequest, Downloader};
6use iroh_blobs::get::db::DownloadProgress;
7use iroh_blobs::get::Stats;
8use iroh_blobs::util::local_pool::LocalPoolHandle;
9use iroh_blobs::util::progress::{AsyncChannelProgressSender, ProgressSender};
10use iroh_blobs::{BlobFormat, Hash as IrohHash, HashAndFormat};
11use p2panda_core::Hash;
12use p2panda_net::{Network, TopicId};
13use p2panda_sync::TopicQuery;
14use serde::{Deserialize, Serialize};
15use serde_error::Error as RpcError;
16
17/// Status of a blob download attempt.
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub enum DownloadBlobEvent {
20    Done,
21    Abort(RpcError),
22}
23
24pub(crate) async fn download_blob<T: TopicQuery + TopicId + 'static>(
25    network: Network<T>,
26    downloader: Downloader,
27    pool_handle: LocalPoolHandle,
28    hash: Hash,
29) -> impl Stream<Item = DownloadBlobEvent> {
30    let (sender, receiver) = async_channel::bounded(1024);
31    let progress = AsyncChannelProgressSender::new(sender);
32    let hash_and_format = HashAndFormat {
33        hash: IrohHash::from_bytes(*hash.as_bytes()),
34        format: BlobFormat::Raw,
35    };
36
37    pool_handle.spawn_detached(move || async move {
38        match download_queued(network, &downloader, hash_and_format, progress.clone()).await {
39            Ok(stats) => {
40                progress.send(DownloadProgress::AllDone(stats)).await.ok();
41            }
42            Err(err) => {
43                progress
44                    .send(DownloadProgress::Abort(RpcError::new(&*err)))
45                    .await
46                    .ok();
47            }
48        }
49    });
50
51    receiver.filter_map(|event| match event {
52        DownloadProgress::AllDone(_) => Some(DownloadBlobEvent::Done),
53        // @TODO: Use own error type here
54        DownloadProgress::Abort(err) => Some(DownloadBlobEvent::Abort(err)),
55        _ => {
56            // @TODO: Add more event types
57            None
58        }
59    })
60}
61
62async fn download_queued<T: TopicQuery + TopicId + 'static>(
63    network: Network<T>,
64    downloader: &Downloader,
65    hash_and_format: HashAndFormat,
66    progress: AsyncChannelProgressSender<DownloadProgress>,
67) -> Result<Stats> {
68    let addrs = network.known_peers().await?;
69    ensure!(!addrs.is_empty(), "no way to reach a node for download");
70
71    let req = DownloadRequest::new(hash_and_format, addrs).progress_sender(progress);
72    let handle = downloader.queue(req).await;
73
74    let stats = handle.await?;
75    Ok(stats)
76}