Skip to main content

zenodo_rs/
downloads.rs

1//! Download helpers for record files and archives.
2//!
3//! Use this module when you already know which published record or DOI you want
4//! to consume and need either:
5//!
6//! - a streaming response via [`DownloadStream`]
7//! - a resolved local download via [`ResolvedDownload`]
8//! - high-level selector-based downloads via [`crate::records::ArtifactSelector`]
9//!
10//! For record lookup and DOI resolution before downloading, see
11//! [`crate::records`].
12
13use std::path::Path;
14use std::pin::Pin;
15
16use futures_core::Stream;
17use futures_util::StreamExt;
18#[cfg(feature = "checksums")]
19use md5::{Digest, Md5};
20use reqwest::header::{CONTENT_DISPOSITION, CONTENT_LENGTH, CONTENT_TYPE};
21use tokio::io::AsyncWriteExt;
22use url::Url;
23
24use crate::client::ZenodoClient;
25use crate::error::ZenodoError;
26use crate::ids::{Doi, RecordId};
27use crate::model::Record;
28use crate::progress::TransferProgress;
29use crate::records::{ArtifactSelector, RecordSelector};
30
31/// Streaming download response metadata plus the response body stream.
32pub struct DownloadStream {
33    /// Parsed `Content-Type` header when the server provides one.
34    pub content_type: Option<mime::Mime>,
35    /// Parsed `Content-Length` header when the server provides one.
36    pub content_length: Option<u64>,
37    /// Raw `Content-Disposition` header when the server provides one.
38    pub content_disposition: Option<String>,
39    /// Byte stream for the response body.
40    pub stream: Pin<Box<dyn Stream<Item = Result<bytes::Bytes, ZenodoError>> + Send>>,
41}
42
43/// Details about how an artifact download request was resolved.
44#[derive(Clone, Debug, PartialEq, Eq)]
45pub struct ResolvedDownload {
46    /// The selector originally requested by the caller.
47    pub requested: ArtifactSelector,
48    /// The record ID that ultimately provided the bytes.
49    pub resolved_record: RecordId,
50    /// The DOI on the resolved record, when present.
51    pub resolved_doi: Option<Doi>,
52    /// The resolved file key for file downloads.
53    pub resolved_key: Option<String>,
54    /// Number of bytes written to the destination path.
55    pub bytes_written: u64,
56    /// Checksum reported by Zenodo for the resolved file, when present.
57    pub checksum: Option<String>,
58}
59
60#[derive(Clone, Debug)]
61struct ResolvedArtifact {
62    requested: ArtifactSelector,
63    resolved_record: Record,
64    resolved_key: Option<String>,
65    checksum: Option<String>,
66    url: Url,
67}
68
69impl ZenodoClient {
70    /// Opens a download stream for a Zenodo artifact selector.
71    ///
72    /// # Examples
73    ///
74    /// ```no_run
75    /// use zenodo_rs::{ArtifactSelector, Auth, RecordId, ZenodoClient};
76    ///
77    /// #[tokio::main]
78    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
79    ///     let client = ZenodoClient::new(Auth::new("token"))?;
80    ///     let stream = client
81    ///         .open_artifact(&ArtifactSelector::latest_file(RecordId(123), "artifact.tar.gz"))
82    ///         .await?;
83    ///     let _ = stream.content_length;
84    ///     Ok(())
85    /// }
86    /// ```
87    ///
88    /// # Errors
89    ///
90    /// Returns an error if selector resolution fails or if Zenodo returns a
91    /// non-success response for the resolved download.
92    pub async fn open_artifact(
93        &self,
94        selector: &ArtifactSelector,
95    ) -> Result<DownloadStream, ZenodoError> {
96        let resolved = self.resolve_artifact(selector).await?;
97        self.open_download_url(&resolved.url).await
98    }
99
100    async fn open_download_url(&self, file_url: &Url) -> Result<DownloadStream, ZenodoError> {
101        let response = self
102            .execute_response(self.download_request_url(reqwest::Method::GET, file_url.clone())?)
103            .await?;
104
105        let content_type = response
106            .headers()
107            .get(CONTENT_TYPE)
108            .and_then(|value| value.to_str().ok())
109            .and_then(|value| value.parse().ok());
110        let content_length = response
111            .headers()
112            .get(CONTENT_LENGTH)
113            .and_then(|value| value.to_str().ok())
114            .and_then(|value| value.parse().ok());
115        let content_disposition = response
116            .headers()
117            .get(CONTENT_DISPOSITION)
118            .and_then(|value| value.to_str().ok())
119            .map(str::to_owned);
120        let stream = response
121            .bytes_stream()
122            .map(|item| item.map_err(ZenodoError::Transport));
123
124        Ok(DownloadStream {
125            content_type,
126            content_length,
127            content_disposition,
128            stream: Box::pin(stream),
129        })
130    }
131
132    /// Downloads a named file from a specific record to a local path.
133    ///
134    /// Returns resolution metadata describing the record and file that
135    /// ultimately produced the bytes.
136    ///
137    /// # Errors
138    ///
139    /// Returns an error if the record lookup fails, if the file is missing, or
140    /// if writing the destination path fails.
141    pub async fn download_record_file_by_key_to_path(
142        &self,
143        id: RecordId,
144        key: &str,
145        path: &Path,
146    ) -> Result<ResolvedDownload, ZenodoError> {
147        self.download_record_file_by_key_to_path_with_progress(id, key, path, ())
148            .await
149    }
150
151    /// Downloads a named file from a specific record to a local path while reporting progress.
152    ///
153    /// Returns resolution metadata describing the record and file that
154    /// ultimately produced the bytes.
155    ///
156    /// # Errors
157    ///
158    /// Returns an error if the record lookup fails, if the file is missing, or
159    /// if writing the destination path fails.
160    pub async fn download_record_file_by_key_to_path_with_progress<P>(
161        &self,
162        id: RecordId,
163        key: &str,
164        path: &Path,
165        progress: P,
166    ) -> Result<ResolvedDownload, ZenodoError>
167    where
168        P: TransferProgress,
169    {
170        self.download_artifact_with_progress(
171            &ArtifactSelector::FileByKey {
172                record: RecordSelector::RecordId(id),
173                key: key.to_owned(),
174                latest: false,
175            },
176            path,
177            progress,
178        )
179        .await
180    }
181
182    /// Downloads a named file from the latest record version to a local path.
183    ///
184    /// # Errors
185    ///
186    /// Returns an error if latest-version resolution fails, if the file is
187    /// missing, or if writing the destination path fails.
188    pub async fn download_latest_record_file_by_key_to_path(
189        &self,
190        id: RecordId,
191        key: &str,
192        path: &Path,
193    ) -> Result<ResolvedDownload, ZenodoError> {
194        self.download_latest_record_file_by_key_to_path_with_progress(id, key, path, ())
195            .await
196    }
197
198    /// Downloads a named file from the latest record version to a local path while reporting progress.
199    ///
200    /// # Errors
201    ///
202    /// Returns an error if latest-version resolution fails, if the file is
203    /// missing, or if writing the destination path fails.
204    pub async fn download_latest_record_file_by_key_to_path_with_progress<P>(
205        &self,
206        id: RecordId,
207        key: &str,
208        path: &Path,
209        progress: P,
210    ) -> Result<ResolvedDownload, ZenodoError>
211    where
212        P: TransferProgress,
213    {
214        self.download_artifact_with_progress(
215            &ArtifactSelector::FileByKey {
216                record: RecordSelector::RecordId(id),
217                key: key.to_owned(),
218                latest: true,
219            },
220            path,
221            progress,
222        )
223        .await
224    }
225
226    /// Downloads the archive for a specific record to a local path.
227    ///
228    /// Returns resolution metadata describing the record that produced the
229    /// archive bytes.
230    ///
231    /// # Errors
232    ///
233    /// Returns an error if the record lookup fails, if the archive link is
234    /// missing, or if writing the destination path fails.
235    pub async fn download_record_archive_to_path(
236        &self,
237        id: RecordId,
238        path: &Path,
239    ) -> Result<ResolvedDownload, ZenodoError> {
240        self.download_record_archive_to_path_with_progress(id, path, ())
241            .await
242    }
243
244    /// Downloads the archive for a specific record to a local path while reporting progress.
245    ///
246    /// Returns resolution metadata describing the record that produced the
247    /// archive bytes.
248    ///
249    /// # Errors
250    ///
251    /// Returns an error if the record lookup fails, if the archive link is
252    /// missing, or if writing the destination path fails.
253    pub async fn download_record_archive_to_path_with_progress<P>(
254        &self,
255        id: RecordId,
256        path: &Path,
257        progress: P,
258    ) -> Result<ResolvedDownload, ZenodoError>
259    where
260        P: TransferProgress,
261    {
262        self.download_artifact_with_progress(
263            &ArtifactSelector::Archive {
264                record: RecordSelector::RecordId(id),
265                latest: false,
266            },
267            path,
268            progress,
269        )
270        .await
271    }
272
273    /// Downloads a named file after resolving a DOI to a record.
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if DOI resolution fails, if the file is missing, or if
278    /// writing the destination path fails.
279    pub async fn download_file_by_doi_to_path(
280        &self,
281        doi: &Doi,
282        key: &str,
283        latest: bool,
284        path: &Path,
285    ) -> Result<ResolvedDownload, ZenodoError> {
286        self.download_file_by_doi_to_path_with_progress(doi, key, latest, path, ())
287            .await
288    }
289
290    /// Downloads a named file after resolving a DOI to a record while reporting progress.
291    ///
292    /// # Errors
293    ///
294    /// Returns an error if DOI resolution fails, if the file is missing, or if
295    /// writing the destination path fails.
296    pub async fn download_file_by_doi_to_path_with_progress<P>(
297        &self,
298        doi: &Doi,
299        key: &str,
300        latest: bool,
301        path: &Path,
302        progress: P,
303    ) -> Result<ResolvedDownload, ZenodoError>
304    where
305        P: TransferProgress,
306    {
307        self.download_artifact_with_progress(
308            &ArtifactSelector::FileByKey {
309                record: RecordSelector::Doi(doi.clone()),
310                key: key.to_owned(),
311                latest,
312            },
313            path,
314            progress,
315        )
316        .await
317    }
318
319    /// Downloads an artifact selected by high-level record or DOI selectors.
320    ///
321    /// # Examples
322    ///
323    /// ```no_run
324    /// use std::path::Path;
325    /// use zenodo_rs::{ArtifactSelector, Auth, ZenodoClient};
326    ///
327    /// #[tokio::main]
328    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
329    ///     let client = ZenodoClient::new(Auth::new("token"))?;
330    ///     let resolved = client
331    ///         .download_artifact(
332    ///             &ArtifactSelector::latest_archive_by_doi("10.5281/zenodo.123")?,
333    ///             Path::new("record.zip"),
334    ///         )
335    ///         .await?;
336    ///     let _ = resolved.bytes_written;
337    ///     Ok(())
338    /// }
339    /// ```
340    ///
341    /// # Errors
342    ///
343    /// Returns an error if record resolution fails, if the requested artifact
344    /// is unavailable, if checksum validation fails, or if writing the
345    /// destination path fails.
346    pub async fn download_artifact(
347        &self,
348        selector: &ArtifactSelector,
349        destination: &Path,
350    ) -> Result<ResolvedDownload, ZenodoError> {
351        self.download_artifact_with_progress(selector, destination, ())
352            .await
353    }
354
355    /// Downloads an artifact selected by high-level record or DOI selectors while reporting progress.
356    ///
357    /// The supplied progress sink receives the response `Content-Length` when
358    /// Zenodo provides one and one `advance` event per chunk successfully
359    /// written to disk.
360    ///
361    /// # Errors
362    ///
363    /// Returns an error if record resolution fails, if the requested artifact
364    /// is unavailable, if checksum validation fails, or if writing the
365    /// destination path fails.
366    pub async fn download_artifact_with_progress<P>(
367        &self,
368        selector: &ArtifactSelector,
369        destination: &Path,
370        progress: P,
371    ) -> Result<ResolvedDownload, ZenodoError>
372    where
373        P: TransferProgress,
374    {
375        let resolved = self.resolve_artifact(selector).await?;
376        let bytes_written = write_stream_to_path_with_progress(
377            self.open_download_url(&resolved.url).await?,
378            destination,
379            resolved.checksum.as_deref(),
380            progress,
381        )
382        .await?;
383
384        Ok(ResolvedDownload {
385            requested: resolved.requested,
386            resolved_record: resolved.resolved_record.id,
387            resolved_doi: resolved.resolved_record.doi,
388            resolved_key: resolved.resolved_key,
389            bytes_written,
390            checksum: resolved.checksum,
391        })
392    }
393
394    async fn resolve_record_for_download(
395        &self,
396        selector: &RecordSelector,
397        latest: bool,
398    ) -> Result<Record, ZenodoError> {
399        let record = self.resolve_record_selector(selector).await?;
400        if latest {
401            self.resolve_latest_from_record(record).await
402        } else {
403            Ok(record)
404        }
405    }
406
407    async fn resolve_artifact(
408        &self,
409        selector: &ArtifactSelector,
410    ) -> Result<ResolvedArtifact, ZenodoError> {
411        match selector {
412            ArtifactSelector::FileByKey {
413                record,
414                key,
415                latest,
416            } => {
417                let resolved_record = self.resolve_record_for_download(record, *latest).await?;
418                let file = resolved_record.file_by_key(key).cloned().ok_or_else(|| {
419                    ZenodoError::MissingFile {
420                        key: key.to_owned(),
421                    }
422                })?;
423                let url = file
424                    .download_url()
425                    .cloned()
426                    .ok_or(ZenodoError::MissingLink("record_file.links.self"))?;
427
428                Ok(ResolvedArtifact {
429                    requested: selector.clone(),
430                    resolved_record,
431                    resolved_key: Some(file.key),
432                    checksum: file.checksum,
433                    url,
434                })
435            }
436            ArtifactSelector::Archive { record, latest } => {
437                let resolved_record = self.resolve_record_for_download(record, *latest).await?;
438                let url = resolved_record
439                    .archive_url()
440                    .cloned()
441                    .ok_or(ZenodoError::MissingLink("archive"))?;
442
443                Ok(ResolvedArtifact {
444                    requested: selector.clone(),
445                    resolved_record,
446                    resolved_key: None,
447                    checksum: None,
448                    url,
449                })
450            }
451        }
452    }
453}
454
455async fn write_stream_to_path_with_progress<P>(
456    mut stream: DownloadStream,
457    path: &Path,
458    #[cfg(feature = "checksums")] expected_checksum: Option<&str>,
459    #[cfg(not(feature = "checksums"))] _expected_checksum: Option<&str>,
460    progress: P,
461) -> Result<u64, ZenodoError>
462where
463    P: TransferProgress,
464{
465    progress.begin(stream.content_length);
466    let temp = tempfile::Builder::new()
467        .prefix(".zenodo-rs-download-")
468        .tempfile_in(download_parent_directory(path))?;
469    let temp_path = temp.path().to_path_buf();
470    let mut file = tokio::fs::File::from_std(temp.reopen()?);
471    let mut bytes_written = 0_u64;
472    #[cfg(feature = "checksums")]
473    let mut checksum_validator = checksum_validator(expected_checksum)?;
474
475    while let Some(chunk) = stream.stream.next().await {
476        let result = async {
477            let chunk = chunk?;
478            #[cfg(feature = "checksums")]
479            if let Some(validator) = checksum_validator.as_mut() {
480                validator.update(&chunk);
481            }
482            file.write_all(&chunk).await?;
483            bytes_written += chunk.len() as u64;
484            progress.advance(chunk.len() as u64);
485            Ok::<(), ZenodoError>(())
486        }
487        .await;
488        if let Err(error) = result {
489            drop(file);
490            let _ = tokio::fs::remove_file(&temp_path).await;
491            return Err(error);
492        }
493    }
494
495    file.flush().await?;
496    file.sync_all().await?;
497    drop(file);
498    #[cfg(feature = "checksums")]
499    if let Some(validator) = checksum_validator {
500        if let Err(error) = validator.finish() {
501            let _ = tokio::fs::remove_file(&temp_path).await;
502            return Err(error);
503        }
504    }
505    temp.persist(path)
506        .map_err(|error| ZenodoError::Io(error.error))?;
507    progress.finish();
508    Ok(bytes_written)
509}
510
/// Picks the directory in which the temporary download file for `path` is created.
///
/// Returns the parent of `path`, or `"."` when `path` has no parent or its
/// parent is the empty path (as with a bare file name).
fn download_parent_directory(path: &Path) -> &Path {
    match path.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => dir,
        _ => Path::new("."),
    }
}
516
/// Incremental MD5 checker: accumulates downloaded bytes and compares the
/// final digest with an expected hex string.
#[cfg(feature = "checksums")]
#[derive(Debug)]
struct ChecksumValidator {
    /// Expected digest as a hex string.
    expected: String,
    /// Running MD5 state fed by `update`.
    hasher: Md5,
}
523
#[cfg(feature = "checksums")]
impl ChecksumValidator {
    /// Feeds the next chunk of downloaded bytes into the running MD5 state.
    fn update(&mut self, bytes: &[u8]) {
        self.hasher.update(bytes);
    }

    /// Finalizes the hash and compares its hex encoding with the expected digest.
    ///
    /// # Errors
    ///
    /// Returns [`ZenodoError::ChecksumMismatch`] carrying both digests when
    /// they differ.
    fn finish(self) -> Result<(), ZenodoError> {
        let Self { expected, hasher } = self;
        let actual = hex::encode(hasher.finalize());
        if actual != expected {
            return Err(ZenodoError::ChecksumMismatch { expected, actual });
        }
        Ok(())
    }
}
542
/// Builds an MD5 [`ChecksumValidator`] from a checksum string of the form
/// `algorithm:hex-digest`, for example `md5:900150983cd24fb0d6963f7d28e17f72`.
///
/// Returns `Ok(None)` when no checksum was supplied, meaning there is nothing
/// to validate. Surrounding whitespace is ignored on both the algorithm and
/// the digest, and both are matched case-insensitively (the digest is stored
/// lowercased).
///
/// # Errors
///
/// Returns [`ZenodoError::InvalidState`] when the string has no `:` separator
/// or names an algorithm other than MD5.
#[cfg(feature = "checksums")]
fn checksum_validator(
    expected_checksum: Option<&str>,
) -> Result<Option<ChecksumValidator>, ZenodoError> {
    let Some(expected_checksum) = expected_checksum else {
        return Ok(None);
    };

    let Some((algorithm, expected)) = expected_checksum.split_once(':') else {
        return Err(ZenodoError::InvalidState(format!(
            "unsupported checksum format: {expected_checksum}"
        )));
    };

    // The digest half is trimmed below; trim the algorithm half as well so
    // stray whitespace such as " md5:…" or "md5 :…" is not misreported as an
    // unsupported algorithm.
    let algorithm = algorithm.trim();
    if !algorithm.eq_ignore_ascii_case("md5") {
        return Err(ZenodoError::InvalidState(format!(
            "unsupported checksum algorithm: {algorithm}"
        )));
    }

    Ok(Some(ChecksumValidator {
        expected: expected.trim().to_ascii_lowercase(),
        hasher: Md5::new(),
    }))
}
568
#[cfg(test)]
mod tests {
    use crate::model::Record;

    /// A record deserialized from API-shaped JSON exposes its file by key and
    /// its archive link.
    #[test]
    fn artifact_lookup_uses_file_key() {
        let payload = serde_json::json!({
            "id": 42,
            "recid": 42,
            "metadata": { "title": "artifact" },
            "files": [
                {
                    "id": "abc",
                    "key": "bundle.tar.gz",
                    "size": 10,
                    "links": { "self": "https://zenodo.org/api/files/1" }
                }
            ],
            "links": {
                "archive": "https://zenodo.org/api/records/42/files-archive"
            }
        });
        let record: Record = serde_json::from_value(payload).unwrap();

        let file = record.file_by_key("bundle.tar.gz").unwrap();
        assert_eq!(file.id, "abc");

        let archive = record.archive_url().unwrap();
        assert_eq!(
            archive.as_str(),
            "https://zenodo.org/api/records/42/files-archive"
        );
    }

    /// The MD5 of `abc` validates; non-MD5 and separator-less strings are rejected.
    #[cfg(feature = "checksums")]
    #[test]
    fn checksum_validator_accepts_md5_and_rejects_unsupported_formats() {
        let mut validator = super::checksum_validator(Some("md5:900150983cd24fb0d6963f7d28e17f72"))
            .unwrap()
            .unwrap();
        validator.update(b"abc");
        assert!(validator.finish().is_ok());

        for unsupported in ["sha256:deadbeef", "deadbeef"] {
            let error = super::checksum_validator(Some(unsupported)).unwrap_err();
            assert!(matches!(error, crate::ZenodoError::InvalidState(_)));
        }
    }
}