1use std::path::Path;
14use std::pin::Pin;
15
16use futures_core::Stream;
17use futures_util::StreamExt;
18#[cfg(feature = "checksums")]
19use md5::{Digest, Md5};
20use reqwest::header::{CONTENT_DISPOSITION, CONTENT_LENGTH, CONTENT_TYPE};
21use tokio::io::AsyncWriteExt;
22use url::Url;
23
24use crate::client::ZenodoClient;
25use crate::error::ZenodoError;
26use crate::ids::{Doi, RecordId};
27use crate::model::Record;
28use crate::progress::TransferProgress;
29use crate::records::{ArtifactSelector, RecordSelector};
30
/// An opened download: selected response headers plus the body as a stream
/// of byte chunks.
///
/// The field notes below describe the values as `ZenodoClient` fills them in
/// when it opens a download URL.
pub struct DownloadStream {
    /// `Content-Type` header parsed as a MIME type; `None` when the header is
    /// missing, not valid text, or not parsable.
    pub content_type: Option<mime::Mime>,
    /// `Content-Length` header parsed as an integer; `None` when the header
    /// is missing or not a valid number.
    pub content_length: Option<u64>,
    /// Raw `Content-Disposition` header text; `None` when the header is
    /// missing or not valid text.
    pub content_disposition: Option<String>,
    /// Body chunks in arrival order; failures reported by the HTTP client are
    /// wrapped in `ZenodoError::Transport`.
    pub stream: Pin<Box<dyn Stream<Item = Result<bytes::Bytes, ZenodoError>> + Send>>,
}
42
/// Summary of a completed download: what was asked for, what it resolved to,
/// and how much data was written.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ResolvedDownload {
    /// Copy of the selector the caller passed in.
    pub requested: ArtifactSelector,
    /// Identifier of the record the selector resolved to.
    pub resolved_record: RecordId,
    /// DOI of the resolved record, when the record carries one.
    pub resolved_doi: Option<Doi>,
    /// Key of the downloaded file; `None` for archive downloads.
    pub resolved_key: Option<String>,
    /// Total number of body bytes written to the destination.
    pub bytes_written: u64,
    /// Checksum string taken from the resolved file entry; `None` for archive
    /// downloads or when the file entry has no checksum.
    pub checksum: Option<String>,
}
59
/// Internal result of resolving an `ArtifactSelector`: the record it landed
/// on and the URL to fetch.
#[derive(Clone, Debug)]
struct ResolvedArtifact {
    /// Copy of the selector that was resolved.
    requested: ArtifactSelector,
    /// Full record the selector resolved to.
    resolved_record: Record,
    /// Key of the selected file; `None` when the whole-record archive was
    /// selected.
    resolved_key: Option<String>,
    /// Checksum string from the selected file entry; always `None` for
    /// archives.
    checksum: Option<String>,
    /// URL the artifact bytes are fetched from.
    url: Url,
}
68
69impl ZenodoClient {
70 pub async fn open_artifact(
93 &self,
94 selector: &ArtifactSelector,
95 ) -> Result<DownloadStream, ZenodoError> {
96 let resolved = self.resolve_artifact(selector).await?;
97 self.open_download_url(&resolved.url).await
98 }
99
100 async fn open_download_url(&self, file_url: &Url) -> Result<DownloadStream, ZenodoError> {
101 let response = self
102 .execute_response(self.download_request_url(reqwest::Method::GET, file_url.clone())?)
103 .await?;
104
105 let content_type = response
106 .headers()
107 .get(CONTENT_TYPE)
108 .and_then(|value| value.to_str().ok())
109 .and_then(|value| value.parse().ok());
110 let content_length = response
111 .headers()
112 .get(CONTENT_LENGTH)
113 .and_then(|value| value.to_str().ok())
114 .and_then(|value| value.parse().ok());
115 let content_disposition = response
116 .headers()
117 .get(CONTENT_DISPOSITION)
118 .and_then(|value| value.to_str().ok())
119 .map(str::to_owned);
120 let stream = response
121 .bytes_stream()
122 .map(|item| item.map_err(ZenodoError::Transport));
123
124 Ok(DownloadStream {
125 content_type,
126 content_length,
127 content_disposition,
128 stream: Box::pin(stream),
129 })
130 }
131
132 pub async fn download_record_file_by_key_to_path(
142 &self,
143 id: RecordId,
144 key: &str,
145 path: &Path,
146 ) -> Result<ResolvedDownload, ZenodoError> {
147 self.download_record_file_by_key_to_path_with_progress(id, key, path, ())
148 .await
149 }
150
151 pub async fn download_record_file_by_key_to_path_with_progress<P>(
161 &self,
162 id: RecordId,
163 key: &str,
164 path: &Path,
165 progress: P,
166 ) -> Result<ResolvedDownload, ZenodoError>
167 where
168 P: TransferProgress,
169 {
170 self.download_artifact_with_progress(
171 &ArtifactSelector::FileByKey {
172 record: RecordSelector::RecordId(id),
173 key: key.to_owned(),
174 latest: false,
175 },
176 path,
177 progress,
178 )
179 .await
180 }
181
182 pub async fn download_latest_record_file_by_key_to_path(
189 &self,
190 id: RecordId,
191 key: &str,
192 path: &Path,
193 ) -> Result<ResolvedDownload, ZenodoError> {
194 self.download_latest_record_file_by_key_to_path_with_progress(id, key, path, ())
195 .await
196 }
197
198 pub async fn download_latest_record_file_by_key_to_path_with_progress<P>(
205 &self,
206 id: RecordId,
207 key: &str,
208 path: &Path,
209 progress: P,
210 ) -> Result<ResolvedDownload, ZenodoError>
211 where
212 P: TransferProgress,
213 {
214 self.download_artifact_with_progress(
215 &ArtifactSelector::FileByKey {
216 record: RecordSelector::RecordId(id),
217 key: key.to_owned(),
218 latest: true,
219 },
220 path,
221 progress,
222 )
223 .await
224 }
225
226 pub async fn download_record_archive_to_path(
236 &self,
237 id: RecordId,
238 path: &Path,
239 ) -> Result<ResolvedDownload, ZenodoError> {
240 self.download_record_archive_to_path_with_progress(id, path, ())
241 .await
242 }
243
244 pub async fn download_record_archive_to_path_with_progress<P>(
254 &self,
255 id: RecordId,
256 path: &Path,
257 progress: P,
258 ) -> Result<ResolvedDownload, ZenodoError>
259 where
260 P: TransferProgress,
261 {
262 self.download_artifact_with_progress(
263 &ArtifactSelector::Archive {
264 record: RecordSelector::RecordId(id),
265 latest: false,
266 },
267 path,
268 progress,
269 )
270 .await
271 }
272
273 pub async fn download_file_by_doi_to_path(
280 &self,
281 doi: &Doi,
282 key: &str,
283 latest: bool,
284 path: &Path,
285 ) -> Result<ResolvedDownload, ZenodoError> {
286 self.download_file_by_doi_to_path_with_progress(doi, key, latest, path, ())
287 .await
288 }
289
290 pub async fn download_file_by_doi_to_path_with_progress<P>(
297 &self,
298 doi: &Doi,
299 key: &str,
300 latest: bool,
301 path: &Path,
302 progress: P,
303 ) -> Result<ResolvedDownload, ZenodoError>
304 where
305 P: TransferProgress,
306 {
307 self.download_artifact_with_progress(
308 &ArtifactSelector::FileByKey {
309 record: RecordSelector::Doi(doi.clone()),
310 key: key.to_owned(),
311 latest,
312 },
313 path,
314 progress,
315 )
316 .await
317 }
318
319 pub async fn download_artifact(
347 &self,
348 selector: &ArtifactSelector,
349 destination: &Path,
350 ) -> Result<ResolvedDownload, ZenodoError> {
351 self.download_artifact_with_progress(selector, destination, ())
352 .await
353 }
354
355 pub async fn download_artifact_with_progress<P>(
367 &self,
368 selector: &ArtifactSelector,
369 destination: &Path,
370 progress: P,
371 ) -> Result<ResolvedDownload, ZenodoError>
372 where
373 P: TransferProgress,
374 {
375 let resolved = self.resolve_artifact(selector).await?;
376 let bytes_written = write_stream_to_path_with_progress(
377 self.open_download_url(&resolved.url).await?,
378 destination,
379 resolved.checksum.as_deref(),
380 progress,
381 )
382 .await?;
383
384 Ok(ResolvedDownload {
385 requested: resolved.requested,
386 resolved_record: resolved.resolved_record.id,
387 resolved_doi: resolved.resolved_record.doi,
388 resolved_key: resolved.resolved_key,
389 bytes_written,
390 checksum: resolved.checksum,
391 })
392 }
393
394 async fn resolve_record_for_download(
395 &self,
396 selector: &RecordSelector,
397 latest: bool,
398 ) -> Result<Record, ZenodoError> {
399 let record = self.resolve_record_selector(selector).await?;
400 if latest {
401 self.resolve_latest_from_record(record).await
402 } else {
403 Ok(record)
404 }
405 }
406
407 async fn resolve_artifact(
408 &self,
409 selector: &ArtifactSelector,
410 ) -> Result<ResolvedArtifact, ZenodoError> {
411 match selector {
412 ArtifactSelector::FileByKey {
413 record,
414 key,
415 latest,
416 } => {
417 let resolved_record = self.resolve_record_for_download(record, *latest).await?;
418 let file = resolved_record.file_by_key(key).cloned().ok_or_else(|| {
419 ZenodoError::MissingFile {
420 key: key.to_owned(),
421 }
422 })?;
423 let url = file
424 .download_url()
425 .cloned()
426 .ok_or(ZenodoError::MissingLink("record_file.links.self"))?;
427
428 Ok(ResolvedArtifact {
429 requested: selector.clone(),
430 resolved_record,
431 resolved_key: Some(file.key),
432 checksum: file.checksum,
433 url,
434 })
435 }
436 ArtifactSelector::Archive { record, latest } => {
437 let resolved_record = self.resolve_record_for_download(record, *latest).await?;
438 let url = resolved_record
439 .archive_url()
440 .cloned()
441 .ok_or(ZenodoError::MissingLink("archive"))?;
442
443 Ok(ResolvedArtifact {
444 requested: selector.clone(),
445 resolved_record,
446 resolved_key: None,
447 checksum: None,
448 url,
449 })
450 }
451 }
452 }
453}
454
455async fn write_stream_to_path_with_progress<P>(
456 mut stream: DownloadStream,
457 path: &Path,
458 #[cfg(feature = "checksums")] expected_checksum: Option<&str>,
459 #[cfg(not(feature = "checksums"))] _expected_checksum: Option<&str>,
460 progress: P,
461) -> Result<u64, ZenodoError>
462where
463 P: TransferProgress,
464{
465 progress.begin(stream.content_length);
466 let temp = tempfile::Builder::new()
467 .prefix(".zenodo-rs-download-")
468 .tempfile_in(download_parent_directory(path))?;
469 let temp_path = temp.path().to_path_buf();
470 let mut file = tokio::fs::File::from_std(temp.reopen()?);
471 let mut bytes_written = 0_u64;
472 #[cfg(feature = "checksums")]
473 let mut checksum_validator = checksum_validator(expected_checksum)?;
474
475 while let Some(chunk) = stream.stream.next().await {
476 let result = async {
477 let chunk = chunk?;
478 #[cfg(feature = "checksums")]
479 if let Some(validator) = checksum_validator.as_mut() {
480 validator.update(&chunk);
481 }
482 file.write_all(&chunk).await?;
483 bytes_written += chunk.len() as u64;
484 progress.advance(chunk.len() as u64);
485 Ok::<(), ZenodoError>(())
486 }
487 .await;
488 if let Err(error) = result {
489 drop(file);
490 let _ = tokio::fs::remove_file(&temp_path).await;
491 return Err(error);
492 }
493 }
494
495 file.flush().await?;
496 file.sync_all().await?;
497 drop(file);
498 #[cfg(feature = "checksums")]
499 if let Some(validator) = checksum_validator {
500 if let Err(error) = validator.finish() {
501 let _ = tokio::fs::remove_file(&temp_path).await;
502 return Err(error);
503 }
504 }
505 temp.persist(path)
506 .map_err(|error| ZenodoError::Io(error.error))?;
507 progress.finish();
508 Ok(bytes_written)
509}
510
/// Returns the directory that contains `path`, falling back to `"."` when
/// `path` has no parent component or its parent is the empty path (as for a
/// bare file name).
fn download_parent_directory(path: &Path) -> &Path {
    match path.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => dir,
        _ => Path::new("."),
    }
}
516
/// Incremental MD5 checker: accumulates a hash over the bytes it is fed and
/// compares the final digest with an expected value.
#[cfg(feature = "checksums")]
#[derive(Debug)]
struct ChecksumValidator {
    /// Expected digest as hex text; `checksum_validator` stores it trimmed
    /// and lower-cased.
    expected: String,
    /// Running MD5 state over all bytes seen so far.
    hasher: Md5,
}
523
#[cfg(feature = "checksums")]
impl ChecksumValidator {
    /// Feeds the next slice of downloaded bytes into the running hash.
    fn update(&mut self, bytes: &[u8]) {
        Digest::update(&mut self.hasher, bytes);
    }

    /// Finalises the hash and compares its hex encoding with the expected
    /// digest.
    ///
    /// # Errors
    ///
    /// Returns `ZenodoError::ChecksumMismatch` carrying both digests when
    /// they differ.
    fn finish(self) -> Result<(), ZenodoError> {
        let Self { expected, hasher } = self;
        let actual = hex::encode(hasher.finalize());
        if actual != expected {
            return Err(ZenodoError::ChecksumMismatch { expected, actual });
        }
        Ok(())
    }
}
542
/// Builds a validator from a checksum string of the form `algorithm:digest`.
///
/// Returns `Ok(None)` when no checksum was supplied. Only `md5` (matched
/// case-insensitively) is accepted; the digest part is trimmed and
/// lower-cased before it is stored.
///
/// # Errors
///
/// Returns `ZenodoError::InvalidState` when the string has no `:` separator
/// or names an algorithm other than MD5.
#[cfg(feature = "checksums")]
fn checksum_validator(
    expected_checksum: Option<&str>,
) -> Result<Option<ChecksumValidator>, ZenodoError> {
    let expected_checksum = match expected_checksum {
        Some(text) => text,
        None => return Ok(None),
    };

    let (algorithm, digest) = match expected_checksum.split_once(':') {
        Some(parts) => parts,
        None => {
            return Err(ZenodoError::InvalidState(format!(
                "unsupported checksum format: {expected_checksum}"
            )));
        }
    };

    if algorithm.eq_ignore_ascii_case("md5") {
        Ok(Some(ChecksumValidator {
            expected: digest.trim().to_ascii_lowercase(),
            hasher: Md5::new(),
        }))
    } else {
        Err(ZenodoError::InvalidState(format!(
            "unsupported checksum algorithm: {algorithm}"
        )))
    }
}
568
#[cfg(test)]
mod tests {
    use crate::model::Record;

    /// A deserialised record exposes its files by key and its archive link.
    #[test]
    fn artifact_lookup_uses_file_key() {
        let payload = serde_json::json!({
            "id": 42,
            "recid": 42,
            "metadata": { "title": "artifact" },
            "files": [
                {
                    "id": "abc",
                    "key": "bundle.tar.gz",
                    "size": 10,
                    "links": { "self": "https://zenodo.org/api/files/1" }
                }
            ],
            "links": {
                "archive": "https://zenodo.org/api/records/42/files-archive"
            }
        });
        let record: Record = serde_json::from_value(payload).unwrap();

        let file = record.file_by_key("bundle.tar.gz").unwrap();
        assert_eq!(file.id, "abc");

        let archive = record.archive_url().unwrap();
        assert_eq!(
            archive.as_str(),
            "https://zenodo.org/api/records/42/files-archive"
        );
    }

    /// MD5 checksums validate; other algorithms and separator-less strings
    /// are rejected up front.
    #[cfg(feature = "checksums")]
    #[test]
    fn checksum_validator_accepts_md5_and_rejects_unsupported_formats() {
        // "abc" hashes to the digest below.
        let accepted = super::checksum_validator(Some("md5:900150983cd24fb0d6963f7d28e17f72"));
        let mut validator = accepted.unwrap().unwrap();
        validator.update(b"abc");
        assert!(validator.finish().is_ok());

        for rejected in ["sha256:deadbeef", "deadbeef"] {
            let error = super::checksum_validator(Some(rejected)).unwrap_err();
            assert!(matches!(error, crate::ZenodoError::InvalidState(_)));
        }
    }
}
615}