1use std::path::Path;
4use std::pin::Pin;
5
6use bytes::Bytes;
7use futures_core::Stream;
8use tokio::fs::File;
9use tokio::io::AsyncWriteExt;
10use url::Url;
11
12use crate::client::FigshareClient;
13use crate::error::FigshareError;
14use crate::ids::{ArticleId, Doi, FileId};
15use crate::model::{Article, ArticleFile};
16
/// Identifies which file a download request resolved to, plus how many bytes
/// were persisted once the download has been written to disk.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ResolvedDownload {
    /// Article the file was resolved against.
    pub resolved_article: ArticleId,
    /// Identifier of the file that was selected.
    pub resolved_file_id: FileId,
    /// Name of the file that was selected.
    pub resolved_name: String,
    /// URL the download request was issued against.
    pub download_url: Url,
    /// Number of bytes written to the destination; stays at `0` for a
    /// download that has only been opened and not yet written out.
    pub bytes_written: u64,
}
31
/// An opened download: the resolution metadata, selected response headers,
/// and the still-unread body as a stream of byte chunks.
pub struct DownloadStream {
    /// Which article/file this download resolved to.
    pub resolved: ResolvedDownload,
    /// Content length as reported by the response, if any.
    pub content_length: Option<u64>,
    /// Value of the `Content-Type` response header, when present and
    /// representable as a string.
    pub content_type: Option<String>,
    /// Value of the `Content-Disposition` response header, when present and
    /// representable as a string.
    pub content_disposition: Option<String>,
    /// Body chunks; each item is either a chunk of bytes or the transport
    /// error that interrupted the body.
    pub stream: Pin<Box<dyn Stream<Item = Result<Bytes, reqwest::Error>> + Send>>,
}
45
46impl std::fmt::Debug for DownloadStream {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 f.debug_struct("DownloadStream")
49 .field("resolved", &self.resolved)
50 .field("content_length", &self.content_length)
51 .field("content_type", &self.content_type)
52 .field("content_disposition", &self.content_disposition)
53 .finish_non_exhaustive()
54 }
55}
56
57impl FigshareClient {
58 pub async fn open_public_article_file_by_name(
68 &self,
69 article_id: ArticleId,
70 name: &str,
71 latest: bool,
72 ) -> Result<DownloadStream, FigshareError> {
73 let article = if latest {
74 self.resolve_latest_public_article(article_id).await?
75 } else {
76 self.get_public_article(article_id).await?
77 };
78 let version = self.resolve_public_article_version_number(&article).await?;
79 let file = self
80 .find_public_article_file_by_name(article.id, version, name)
81 .await?;
82 self.open_download_for_file(article.id, file, false).await
83 }
84
85 pub async fn open_article_file_by_doi(
95 &self,
96 doi: &Doi,
97 name: &str,
98 latest: bool,
99 ) -> Result<DownloadStream, FigshareError> {
100 let article = if latest {
101 self.resolve_latest_public_article_by_doi(doi).await?
102 } else {
103 self.get_public_article_by_doi(doi).await?
104 };
105 let version = self.resolve_public_article_version_number(&article).await?;
106 let file = self
107 .find_public_article_file_by_name(article.id, version, name)
108 .await?;
109 self.open_download_for_file(article.id, file, false).await
110 }
111
112 pub async fn open_own_article_file_by_name(
121 &self,
122 article_id: ArticleId,
123 name: &str,
124 ) -> Result<DownloadStream, FigshareError> {
125 let file = self.find_own_article_file_by_name(article_id, name).await?;
126 self.open_download_for_file(article_id, file, true).await
127 }
128
129 pub async fn download_public_article_file_by_name_to_path(
136 &self,
137 article_id: ArticleId,
138 name: &str,
139 latest: bool,
140 path: &Path,
141 ) -> Result<ResolvedDownload, FigshareError> {
142 let stream = self
143 .open_public_article_file_by_name(article_id, name, latest)
144 .await?;
145 write_stream_to_path(stream, path).await
146 }
147
148 pub async fn download_article_file_by_doi_to_path(
155 &self,
156 doi: &Doi,
157 name: &str,
158 latest: bool,
159 path: &Path,
160 ) -> Result<ResolvedDownload, FigshareError> {
161 let stream = self.open_article_file_by_doi(doi, name, latest).await?;
162 write_stream_to_path(stream, path).await
163 }
164
165 pub async fn download_own_article_file_by_name_to_path(
174 &self,
175 article_id: ArticleId,
176 name: &str,
177 path: &Path,
178 ) -> Result<ResolvedDownload, FigshareError> {
179 let stream = self.open_own_article_file_by_name(article_id, name).await?;
180 write_stream_to_path(stream, path).await
181 }
182
183 async fn resolve_public_article_version_number(
184 &self,
185 article: &Article,
186 ) -> Result<u64, FigshareError> {
187 if let Some(version) = article.version_number() {
188 return Ok(version);
189 }
190
191 let versions = self.list_public_article_versions(article.id).await?;
192 Ok(versions
193 .iter()
194 .map(|version| version.version)
195 .max()
196 .unwrap_or(1))
197 }
198
199 async fn find_public_article_file_by_name(
200 &self,
201 article_id: ArticleId,
202 version: u64,
203 name: &str,
204 ) -> Result<ArticleFile, FigshareError> {
205 self.list_public_article_version_files(article_id, version)
206 .await?
207 .into_iter()
208 .find(|file| file.name == name)
209 .ok_or_else(|| FigshareError::MissingFile {
210 name: name.to_owned(),
211 })
212 }
213
214 async fn find_own_article_file_by_name(
215 &self,
216 article_id: ArticleId,
217 name: &str,
218 ) -> Result<ArticleFile, FigshareError> {
219 self.list_files(article_id)
220 .await?
221 .into_iter()
222 .find(|file| file.name == name)
223 .ok_or_else(|| FigshareError::MissingFile {
224 name: name.to_owned(),
225 })
226 }
227
228 pub(crate) async fn open_download_for_file(
229 &self,
230 article_id: ArticleId,
231 file: ArticleFile,
232 authenticated_download: bool,
233 ) -> Result<DownloadStream, FigshareError> {
234 let download_url = file
235 .download_url
236 .clone()
237 .ok_or(FigshareError::MissingLink("download_url"))?;
238
239 let response = self
240 .execute_response(self.download_request_url(
241 reqwest::Method::GET,
242 download_url.clone(),
243 authenticated_download,
244 )?)
245 .await?;
246
247 let content_length = response.content_length();
248 let content_type = response
249 .headers()
250 .get(reqwest::header::CONTENT_TYPE)
251 .and_then(|value| value.to_str().ok())
252 .map(str::to_owned);
253 let content_disposition = response
254 .headers()
255 .get(reqwest::header::CONTENT_DISPOSITION)
256 .and_then(|value| value.to_str().ok())
257 .map(str::to_owned);
258
259 Ok(DownloadStream {
260 resolved: ResolvedDownload {
261 resolved_article: article_id,
262 resolved_file_id: file.id,
263 resolved_name: file.name,
264 download_url,
265 bytes_written: 0,
266 },
267 content_length,
268 content_type,
269 content_disposition,
270 stream: Box::pin(response.bytes_stream()),
271 })
272 }
273}
274
275async fn write_stream_to_path(
276 mut stream: DownloadStream,
277 path: &Path,
278) -> Result<ResolvedDownload, FigshareError> {
279 let mut file = File::create(path).await?;
280 let mut bytes_written = 0_u64;
281
282 while let Some(chunk) = futures_util::StreamExt::next(&mut stream.stream).await {
283 let chunk = chunk?;
284 file.write_all(&chunk).await?;
285 bytes_written += chunk.len() as u64;
286 }
287 file.flush().await?;
288
289 stream.resolved.bytes_written = bytes_written;
290 Ok(stream.resolved)
291}
292
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use futures_util::stream;
    use url::Url;

    use super::{write_stream_to_path, DownloadStream, ResolvedDownload};
    use crate::ids::{ArticleId, FileId};

    /// Builds a `DownloadStream` for a file named `artifact.bin` whose body is
    /// served from the given in-memory chunks; `content_length` is the sum of
    /// the chunk sizes.
    fn in_memory_download(
        article: ArticleId,
        file_id: FileId,
        url: &str,
        content_disposition: Option<&str>,
        chunks: Vec<Bytes>,
    ) -> DownloadStream {
        let total: usize = chunks.iter().map(|chunk| chunk.len()).sum();
        let body: Vec<Result<Bytes, reqwest::Error>> = chunks.into_iter().map(Ok).collect();

        DownloadStream {
            resolved: ResolvedDownload {
                resolved_article: article,
                resolved_file_id: file_id,
                resolved_name: "artifact.bin".into(),
                download_url: Url::parse(url).unwrap(),
                bytes_written: 0,
            },
            content_length: Some(total as u64),
            content_type: Some("application/octet-stream".into()),
            content_disposition: content_disposition.map(str::to_owned),
            stream: Box::pin(stream::iter(body)),
        }
    }

    #[tokio::test]
    async fn write_stream_to_path_persists_bytes_and_updates_metadata() {
        let scratch = tempfile::tempdir().unwrap();
        let target = scratch.path().join("artifact.bin");
        let download = in_memory_download(
            ArticleId(7),
            FileId(9),
            "https://ndownloader.figshare.com/files/9",
            Some("attachment"),
            vec![Bytes::from_static(b"he"), Bytes::from_static(b"llo")],
        );

        let outcome = write_stream_to_path(download, &target).await.unwrap();
        assert_eq!(outcome.bytes_written, 5);
        assert_eq!(std::fs::read(&target).unwrap(), b"hello");
    }

    #[test]
    fn download_stream_debug_hides_stream_body() {
        let download = in_memory_download(
            ArticleId(1),
            FileId(2),
            "https://ndownloader.figshare.com/files/2",
            None,
            vec![Bytes::from_static(b"abc")],
        );

        let rendered = format!("{download:?}");
        assert!(rendered.contains("DownloadStream"));
        assert!(rendered.contains("artifact.bin"));
        assert!(!rendered.contains("abc"));
    }
}