use std::path::Path;
use std::pin::Pin;
use bytes::Bytes;
use futures_core::Stream;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use url::Url;
use crate::client::FigshareClient;
use crate::error::FigshareError;
use crate::ids::{ArticleId, Doi, FileId};
use crate::model::{Article, ArticleFile};
/// Describes which file a download request was resolved to and, once the
/// body has been saved, how many bytes were stored.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ResolvedDownload {
/// Identifier of the article the download was opened for.
pub resolved_article: ArticleId,
/// Identifier of the file that was selected.
pub resolved_file_id: FileId,
/// Name of the selected file, as reported in the file listing.
pub resolved_name: String,
/// URL the download request was issued against.
pub download_url: Url,
/// Number of body bytes written to disk. It is `0` while the download is
/// only opened as a stream and is filled in after the body has been
/// written to a path.
pub bytes_written: u64,
}
/// An opened download: the resolution metadata, selected response headers,
/// and the response body as a stream of byte chunks.
pub struct DownloadStream {
/// Which article/file this stream belongs to.
pub resolved: ResolvedDownload,
/// Body length as reported by the HTTP response, when one is available.
pub content_length: Option<u64>,
/// Value of the `Content-Type` response header, when present and
/// representable as a string.
pub content_type: Option<String>,
/// Value of the `Content-Disposition` response header, when present and
/// representable as a string.
pub content_disposition: Option<String>,
/// The response body. Each item is either the next chunk of bytes or the
/// transport error that interrupted the transfer.
pub stream: Pin<Box<dyn Stream<Item = Result<Bytes, reqwest::Error>> + Send>>,
}
/// Manual `Debug` implementation: the body stream is a boxed trait object
/// and is left out, so the output is marked as non-exhaustive.
impl std::fmt::Debug for DownloadStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Destructure exhaustively so a newly added field must be handled
        // here explicitly.
        let Self {
            resolved,
            content_length,
            content_type,
            content_disposition,
            stream: _,
        } = self;

        let mut printer = f.debug_struct("DownloadStream");
        printer.field("resolved", resolved);
        printer.field("content_length", content_length);
        printer.field("content_type", content_type);
        printer.field("content_disposition", content_disposition);
        printer.finish_non_exhaustive()
    }
}
impl FigshareClient {
/// Opens a streaming download for the file called `name` in a public
/// article identified by `article_id`.
///
/// When `latest` is `true` the article is obtained through
/// `resolve_latest_public_article`; otherwise `get_public_article` is used.
/// The file is then looked up by exact name in that article version's
/// file listing and opened with `authenticated_download` set to `false`.
///
/// # Errors
///
/// Propagates any error from the article lookup, the version resolution,
/// the file lookup (including `FigshareError::MissingFile` when no file
/// has that name) or from opening the download.
pub async fn open_public_article_file_by_name(
    &self,
    article_id: ArticleId,
    name: &str,
    latest: bool,
) -> Result<DownloadStream, FigshareError> {
    let selected = if latest {
        self.resolve_latest_public_article(article_id).await?
    } else {
        self.get_public_article(article_id).await?
    };

    // Use the id carried by the fetched article from here on.
    let owner = selected.id;
    let version_number = self
        .resolve_public_article_version_number(&selected)
        .await?;
    let matched = self
        .find_public_article_file_by_name(owner, version_number, name)
        .await?;

    self.open_download_for_file(owner, matched, false).await
}
/// Opens a streaming download for the file called `name` in the public
/// article identified by `doi`.
///
/// When `latest` is `true` the article is obtained through
/// `resolve_latest_public_article_by_doi`; otherwise
/// `get_public_article_by_doi` is used. The file is then looked up by exact
/// name in that article version's file listing and opened with
/// `authenticated_download` set to `false`.
///
/// # Errors
///
/// Propagates any error from the article lookup, the version resolution,
/// the file lookup (including `FigshareError::MissingFile` when no file
/// has that name) or from opening the download.
pub async fn open_article_file_by_doi(
    &self,
    doi: &Doi,
    name: &str,
    latest: bool,
) -> Result<DownloadStream, FigshareError> {
    let selected = if latest {
        self.resolve_latest_public_article_by_doi(doi).await?
    } else {
        self.get_public_article_by_doi(doi).await?
    };

    // The DOI has served its purpose; continue with the article's own id.
    let owner = selected.id;
    let version_number = self
        .resolve_public_article_version_number(&selected)
        .await?;
    let matched = self
        .find_public_article_file_by_name(owner, version_number, name)
        .await?;

    self.open_download_for_file(owner, matched, false).await
}
/// Opens a streaming download for the file called `name` in one of the
/// caller's own articles.
///
/// The file is looked up by exact name in the listing returned by
/// `list_files` and opened with `authenticated_download` set to `true`.
///
/// # Errors
///
/// Returns `FigshareError::MissingFile` when no file has that name, and
/// propagates any error from listing the files or opening the download.
pub async fn open_own_article_file_by_name(
    &self,
    article_id: ArticleId,
    name: &str,
) -> Result<DownloadStream, FigshareError> {
    let matched = self
        .find_own_article_file_by_name(article_id, name)
        .await?;
    let opened = self
        .open_download_for_file(article_id, matched, true)
        .await?;
    Ok(opened)
}
/// Downloads the file called `name` from a public article and writes it to
/// `path`.
///
/// This opens the stream with `open_public_article_file_by_name` and then
/// drains it into the file at `path`. The returned metadata has
/// `bytes_written` set to the number of bytes stored.
///
/// # Errors
///
/// Propagates any error from opening the download or from writing the
/// body to disk.
pub async fn download_public_article_file_by_name_to_path(
    &self,
    article_id: ArticleId,
    name: &str,
    latest: bool,
    path: &Path,
) -> Result<ResolvedDownload, FigshareError> {
    let opened = self
        .open_public_article_file_by_name(article_id, name, latest)
        .await?;
    let summary = write_stream_to_path(opened, path).await?;
    Ok(summary)
}
/// Downloads the file called `name` from the public article identified by
/// `doi` and writes it to `path`.
///
/// This opens the stream with `open_article_file_by_doi` and then drains
/// it into the file at `path`. The returned metadata has `bytes_written`
/// set to the number of bytes stored.
///
/// # Errors
///
/// Propagates any error from opening the download or from writing the
/// body to disk.
pub async fn download_article_file_by_doi_to_path(
    &self,
    doi: &Doi,
    name: &str,
    latest: bool,
    path: &Path,
) -> Result<ResolvedDownload, FigshareError> {
    let opened = self.open_article_file_by_doi(doi, name, latest).await?;
    let summary = write_stream_to_path(opened, path).await?;
    Ok(summary)
}
/// Downloads the file called `name` from one of the caller's own articles
/// and writes it to `path`.
///
/// This opens the stream with `open_own_article_file_by_name` and then
/// drains it into the file at `path`. The returned metadata has
/// `bytes_written` set to the number of bytes stored.
///
/// # Errors
///
/// Propagates any error from opening the download or from writing the
/// body to disk.
pub async fn download_own_article_file_by_name_to_path(
    &self,
    article_id: ArticleId,
    name: &str,
    path: &Path,
) -> Result<ResolvedDownload, FigshareError> {
    let opened = self
        .open_own_article_file_by_name(article_id, name)
        .await?;
    let summary = write_stream_to_path(opened, path).await?;
    Ok(summary)
}
/// Determines which version number to use when listing an article's files.
///
/// If `article.version_number()` yields a value, that value is returned
/// without any further request. Otherwise the public version list is
/// fetched and its highest `version` is returned, falling back to `1` when
/// the list is empty.
///
/// # Errors
///
/// Propagates any error from `list_public_article_versions`.
async fn resolve_public_article_version_number(
    &self,
    article: &Article,
) -> Result<u64, FigshareError> {
    let embedded = article.version_number();
    match embedded {
        Some(known) => Ok(known),
        None => {
            let listed = self.list_public_article_versions(article.id).await?;
            let newest = listed.iter().map(|entry| entry.version).max();
            Ok(newest.unwrap_or(1))
        }
    }
}
/// Looks up a file by exact name in the file listing of one version of a
/// public article.
///
/// The first listed file whose `name` equals `name` is returned.
///
/// # Errors
///
/// Returns `FigshareError::MissingFile` carrying the requested name when
/// nothing matches, and propagates any error from
/// `list_public_article_version_files`.
async fn find_public_article_file_by_name(
    &self,
    article_id: ArticleId,
    version: u64,
    name: &str,
) -> Result<ArticleFile, FigshareError> {
    let listing = self
        .list_public_article_version_files(article_id, version)
        .await?;

    for candidate in listing {
        if candidate.name == name {
            return Ok(candidate);
        }
    }

    Err(FigshareError::MissingFile {
        name: name.to_owned(),
    })
}
/// Looks up a file by exact name in the listing returned by `list_files`
/// for `article_id`.
///
/// The first listed file whose `name` equals `name` is returned.
///
/// # Errors
///
/// Returns `FigshareError::MissingFile` carrying the requested name when
/// nothing matches, and propagates any error from `list_files`.
async fn find_own_article_file_by_name(
    &self,
    article_id: ArticleId,
    name: &str,
) -> Result<ArticleFile, FigshareError> {
    let mut listing = self.list_files(article_id).await?.into_iter();

    match listing.find(|candidate| candidate.name == name) {
        Some(hit) => Ok(hit),
        None => Err(FigshareError::MissingFile {
            name: name.to_owned(),
        }),
    }
}
/// Issues the download request for `file` and wraps the response as a
/// [`DownloadStream`].
///
/// The request is a `GET` against the file's `download_url`, built by
/// `download_request_url` with the given `authenticated_download` flag and
/// sent through `execute_response`. The response's content length and its
/// `Content-Type` / `Content-Disposition` headers are captured before the
/// body is handed over as a byte stream. `bytes_written` in the returned
/// metadata starts at `0`.
///
/// # Errors
///
/// Returns `FigshareError::MissingLink("download_url")` when the file
/// carries no download URL, and propagates any error from building or
/// executing the request.
pub(crate) async fn open_download_for_file(
    &self,
    article_id: ArticleId,
    file: ArticleFile,
    authenticated_download: bool,
) -> Result<DownloadStream, FigshareError> {
    /// Returns the header's value as owned text, or `None` when the header
    /// is absent or its value cannot be represented as a string.
    fn header_text(
        headers: &reqwest::header::HeaderMap,
        name: reqwest::header::HeaderName,
    ) -> Option<String> {
        headers
            .get(name)
            .and_then(|value| value.to_str().ok())
            .map(str::to_owned)
    }

    // `file` is owned, so the URL can be moved out of it directly; the only
    // copy that is actually needed is the one handed to the request below.
    let download_url = file
        .download_url
        .ok_or(FigshareError::MissingLink("download_url"))?;

    let request = self.download_request_url(
        reqwest::Method::GET,
        download_url.clone(),
        authenticated_download,
    )?;
    let response = self.execute_response(request).await?;

    // Everything read from the response must be captured before
    // `bytes_stream()` consumes it.
    let content_length = response.content_length();
    let content_type = header_text(response.headers(), reqwest::header::CONTENT_TYPE);
    let content_disposition =
        header_text(response.headers(), reqwest::header::CONTENT_DISPOSITION);

    Ok(DownloadStream {
        resolved: ResolvedDownload {
            resolved_article: article_id,
            resolved_file_id: file.id,
            resolved_name: file.name,
            download_url,
            bytes_written: 0,
        },
        content_length,
        content_type,
        content_disposition,
        stream: Box::pin(response.bytes_stream()),
    })
}
}
async fn write_stream_to_path(
mut stream: DownloadStream,
path: &Path,
) -> Result<ResolvedDownload, FigshareError> {
let mut file = File::create(path).await?;
let mut bytes_written = 0_u64;
while let Some(chunk) = futures_util::StreamExt::next(&mut stream.stream).await {
let chunk = chunk?;
file.write_all(&chunk).await?;
bytes_written += chunk.len() as u64;
}
file.flush().await?;
stream.resolved.bytes_written = bytes_written;
Ok(stream.resolved)
}
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use futures_util::stream;
    use url::Url;

    use super::{write_stream_to_path, DownloadStream, ResolvedDownload};
    use crate::ids::{ArticleId, FileId};

    /// Builds a `DownloadStream` named `artifact.bin` whose body is served
    /// from the given in-memory chunks. `content_length` is the combined
    /// length of those chunks.
    fn in_memory_download(
        resolved_article: ArticleId,
        resolved_file_id: FileId,
        download_url: &str,
        content_disposition: Option<String>,
        chunks: Vec<Bytes>,
    ) -> DownloadStream {
        let content_length: u64 = chunks.iter().map(|chunk| chunk.len() as u64).sum();
        let body: Vec<Result<Bytes, reqwest::Error>> = chunks.into_iter().map(Ok).collect();

        DownloadStream {
            resolved: ResolvedDownload {
                resolved_article,
                resolved_file_id,
                resolved_name: "artifact.bin".into(),
                download_url: Url::parse(download_url).unwrap(),
                bytes_written: 0,
            },
            content_length: Some(content_length),
            content_type: Some("application/octet-stream".into()),
            content_disposition,
            stream: Box::pin(stream::iter(body)),
        }
    }

    /// Writing a two-chunk stream stores the concatenated bytes and reports
    /// their total length.
    #[tokio::test]
    async fn write_stream_to_path_persists_bytes_and_updates_metadata() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("artifact.bin");
        let download = in_memory_download(
            ArticleId(7),
            FileId(9),
            "https://ndownloader.figshare.com/files/9",
            Some("attachment".into()),
            vec![Bytes::from_static(b"he"), Bytes::from_static(b"llo")],
        );

        let resolved = write_stream_to_path(download, &path).await.unwrap();

        assert_eq!(resolved.bytes_written, 5);
        assert_eq!(std::fs::read(&path).unwrap(), b"hello");
    }

    /// The `Debug` output names the type and metadata but never the body.
    #[test]
    fn download_stream_debug_hides_stream_body() {
        let download = in_memory_download(
            ArticleId(1),
            FileId(2),
            "https://ndownloader.figshare.com/files/2",
            None,
            vec![Bytes::from_static(b"abc")],
        );

        let debug = format!("{download:?}");

        assert!(debug.contains("DownloadStream"));
        assert!(debug.contains("artifact.bin"));
        assert!(!debug.contains("abc"));
    }
}