use anyhow::Error;
use futures_util::stream::StreamExt;
use reqwest::header;
use tokio::io::{copy, AsyncWrite};
use tokio_util::io::StreamReader;
/// Outcome of an operation whose failures are classified by whether a retry
/// could help.
///
/// `R` is the success payload and `E` the error type carried by both failure
/// variants.
#[derive(Debug)]
pub(crate) enum RetriableResult<R, E> {
    /// The operation failed, but trying again may succeed.
    Retriable(E),
    /// The operation failed in a way that a retry is not expected to fix.
    Permanent(E),
    /// The operation succeeded and produced `R`.
    Ok(R),
}
/// Metadata describing a successfully fetched resource.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct FetchMetadata {
    /// Value of the response's `Content-Type` header, or the fetcher's
    /// fallback value when the header was absent or unreadable.
    pub content_type: String,
}
/// Fetches `url` with a GET request and streams the response body into
/// `writer`.
///
/// On success, returns the response's `Content-Type` header value, falling
/// back to `application/binary` when the header is missing or cannot be read
/// as a string.
///
/// Failures are classified so the caller can decide whether to retry:
///
/// * `Permanent` — the request could not be constructed (for example the URL
///   is malformed), or the server answered with a 4xx status other than
///   408 (Request Timeout) or 429 (Too Many Requests).
/// * `Retriable` — every other failure: transport errors, 5xx statuses,
///   408/429, and any error while copying the body into `writer`.
///
/// NOTE: if copying fails part-way through, `writer` may already have
/// received part of the body; callers that retry should account for that.
pub(crate) async fn get_url(
    url: &str,
    writer: &mut (dyn AsyncWrite + Unpin),
) -> RetriableResult<FetchMetadata, Error> {
    const DEFAULT_CONTENT_TYPE: &str = "application/binary";

    let res = match reqwest::get(url)
        .await
        .and_then(|res| res.error_for_status())
    {
        Ok(res) => res,
        Err(err) => {
            // 408 and 429 are client-error statuses that explicitly signal a
            // transient condition, so they must stay retriable.
            let is_transient_status = |status: reqwest::StatusCode| {
                status == reqwest::StatusCode::REQUEST_TIMEOUT
                    || status == reqwest::StatusCode::TOO_MANY_REQUESTS
            };
            let rejected_by_server = err
                .status()
                .map_or(false, |status| {
                    status.is_client_error() && !is_transient_status(status)
                });
            // A builder error (e.g. an unparsable URL) carries no status, yet
            // repeating the identical request can never succeed.
            return if err.is_builder() || rejected_by_server {
                RetriableResult::Permanent(err.into())
            } else {
                RetriableResult::Retriable(err.into())
            };
        }
    };

    // Capture the content type before `bytes_stream` consumes the response.
    let content_type = res
        .headers()
        .get(header::CONTENT_TYPE)
        .and_then(|value| value.to_str().ok())
        .unwrap_or(DEFAULT_CONTENT_TYPE)
        .to_owned();

    // `StreamReader` needs `io::Error` items, so wrap reqwest's stream errors.
    let stream = res
        .bytes_stream()
        .map(|chunk| chunk.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)));
    let mut reader = StreamReader::new(stream);

    if let Err(e) = copy(&mut reader, writer).await {
        return RetriableResult::Retriable(e.into());
    }

    RetriableResult::Ok(FetchMetadata { content_type })
}