use std::{future::Future, sync::Arc};
use bytes::Bytes;
use futures::{
future::{ready, Either},
stream::{once, unfold},
FutureExt, Stream, StreamExt,
};
use reqwest::{Client, Response, Url};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use crate::{errors::DownloadError, net::file_url};
/// Abstraction over something that can fetch a file identified by `path`.
///
/// Two flavours are offered: writing into a caller-supplied asynchronous
/// writer ([`Download::download_file`]) and producing a stream of byte chunks
/// ([`Download::download_file_stream`]).
///
/// NOTE(review): the exact meaning of `path` (presumably a server-side file
/// path) is defined by implementors; it is not visible from this trait.
pub trait Download {
    /// Error produced by the future returned from
    /// [`download_file`](Download::download_file).
    type Err<'dst>;

    /// Future returned from [`download_file`](Download::download_file).
    ///
    /// It must be `Send` and resolves to `Ok(())` or to [`Self::Err`].
    type Fut<'dst>: Send + Future<Output = Result<(), Self::Err<'dst>>>;

    /// Starts a download of the file at `path` into `destination`.
    ///
    /// `destination` is borrowed mutably for `'dst`, which is also the
    /// lifetime parameter of the returned future type.
    fn download_file<'dst>(
        &self,
        path: &str,
        destination: &'dst mut (dyn AsyncWrite + Unpin + Send),
    ) -> Self::Fut<'dst>;

    /// Error carried by the items of [`Self::Stream`].
    type StreamErr;

    /// Stream returned from
    /// [`download_file_stream`](Download::download_file_stream).
    ///
    /// It must be `Send`; each item is either a chunk of bytes or a
    /// [`Self::StreamErr`].
    type Stream: Send + Stream<Item = Result<Bytes, Self::StreamErr>>;

    /// Starts a download of the file at `path`, exposing it as a stream of
    /// byte chunks.
    fn download_file_stream(&self, path: &str) -> Self::Stream;
}
/// Downloads a file and writes its contents into `dst`.
///
/// A GET request is built for `file_url(api_url, token, path)` and sent with
/// `client`. The response body is then copied into `dst` chunk by chunk, and
/// `dst` is flushed once the body has been fully written.
///
/// `client`, `token` and `path` are only used while building the request; the
/// returned future borrows nothing but `dst` (lifetime `'o`).
///
/// # Errors
///
/// The returned future resolves to a [`DownloadError`] when:
/// - sending the request fails,
/// - the response status is an error status (see
///   [`Response::error_for_status`]),
/// - reading a chunk of the body fails,
/// - writing to or flushing `dst` fails (the I/O error is wrapped in an
///   [`Arc`]).
pub fn download_file<'o, D>(
    client: &Client,
    api_url: Url,
    token: &str,
    path: &str,
    dst: &'o mut D,
) -> impl Future<Output = Result<(), DownloadError>> + 'o
where
    D: ?Sized + AsyncWrite + Unpin,
{
    client.get(file_url(api_url, token, path)).send().then(move |r| async move {
        let mut res = r?.error_for_status()?;

        while let Some(chunk) = res.chunk().await? {
            dst.write_all(&chunk).await.map_err(Arc::new)?;
        }

        // `write_all` only hands the bytes to the writer. Buffered or
        // deferred writers (e.g. `BufWriter`, `tokio::fs::File`) may still
        // hold data, so flush before reporting success to surface late I/O
        // errors instead of dropping them.
        dst.flush().await.map_err(Arc::new)?;

        Ok(())
    })
}
/// Downloads a file and exposes its contents as a stream of byte chunks.
///
/// A GET request is built for `file_url(api_url, token, path)` and sent with
/// `client`. The returned stream borrows none of the arguments (`'static`).
///
/// Stream items:
/// - if sending the request fails, or the response has an error status (see
///   [`Response::error_for_status`]), the stream yields that single error and
///   ends;
/// - otherwise every chunk read from the response body is yielded as `Ok`,
///   a failed read is yielded as `Err`, and the stream ends when the body
///   reports no more chunks.
pub fn download_file_stream(
    client: &Client,
    api_url: Url,
    token: &str,
    path: &str,
) -> impl Stream<Item = reqwest::Result<Bytes>> + 'static {
    let pending = client.get(file_url(api_url, token, path)).send();

    pending.into_stream().flat_map(|outcome| {
        match outcome.and_then(Response::error_for_status) {
            // The request itself failed: surface the error as the only item.
            Err(err) => Either::Right(once(ready(Err(err)))),
            // The response is threaded through `unfold` as its state so that
            // each step can pull the next chunk from it.
            Ok(response) => Either::Left(unfold(response, |mut response| async move {
                let next = response.chunk().await;
                // `Ok(None)` (end of body) becomes `None` and stops the stream;
                // both `Ok(Some(_))` and `Err(_)` are yielded as items.
                next.transpose().map(|item| (item, response))
            })),
        }
    })
}