use anyhow::{Result, ensure};
use futures_lite::{Stream, StreamExt};
use iroh::NodeAddr;
use iroh_blobs::downloader::{DownloadRequest, Downloader};
use iroh_blobs::get::Stats;
use iroh_blobs::get::db::DownloadProgress;
use iroh_blobs::util::local_pool::LocalPoolHandle;
use iroh_blobs::util::progress::{AsyncChannelProgressSender, ProgressSender};
use iroh_blobs::{BlobFormat, Hash as IrohHash, HashAndFormat};
use p2panda_core::Hash;
use p2panda_net::{Network, TopicId};
use p2panda_sync::TopicQuery;
use serde::{Deserialize, Serialize};
use serde_error::Error as RpcError;
use crate::from_node_addr;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DownloadBlobEvent {
Done,
Abort(RpcError),
}
pub(crate) async fn download_blob<T: TopicQuery + TopicId + 'static>(
network: Network<T>,
downloader: Downloader,
pool_handle: LocalPoolHandle,
hash: Hash,
) -> impl Stream<Item = DownloadBlobEvent> {
let (sender, receiver) = async_channel::bounded(1024);
let progress = AsyncChannelProgressSender::new(sender);
let hash_and_format = HashAndFormat {
hash: IrohHash::from_bytes(*hash.as_bytes()),
format: BlobFormat::Raw,
};
pool_handle.spawn_detached(move || async move {
match download_queued(network, &downloader, hash_and_format, progress.clone()).await {
Ok(stats) => {
progress.send(DownloadProgress::AllDone(stats)).await.ok();
}
Err(err) => {
progress
.send(DownloadProgress::Abort(RpcError::new(&*err)))
.await
.ok();
}
}
});
receiver.filter_map(|event| match event {
DownloadProgress::AllDone(_) => Some(DownloadBlobEvent::Done),
DownloadProgress::Abort(err) => Some(DownloadBlobEvent::Abort(err)),
_ => {
None
}
})
}
async fn download_queued<T: TopicQuery + TopicId + 'static>(
network: Network<T>,
downloader: &Downloader,
hash_and_format: HashAndFormat,
progress: AsyncChannelProgressSender<DownloadProgress>,
) -> Result<Stats> {
let addrs = network.known_peers().await?;
ensure!(!addrs.is_empty(), "no way to reach a node for download");
let iroh_addrs: Vec<NodeAddr> = addrs
.iter()
.map(|addr| from_node_addr(addr.clone()))
.collect();
let req = DownloadRequest::new(hash_and_format, iroh_addrs).progress_sender(progress);
let handle = downloader.queue(req).await;
let stats = handle.await?;
Ok(stats)
}