Skip to main content

zenodo_rs/
workflow.rs

1//! Higher-level workflow helpers that encode Zenodo deposition lifecycles.
2//!
3//! Use this module when you want the crate to encode Zenodo's safe-path state
4//! transitions for you.
5//!
6//! The main helpers are:
7//!
8//! - [`ZenodoClient::ensure_editable_draft`] to reuse a draft or create a new version
9//! - [`ZenodoClient::enter_edit_mode`] to reopen the current published deposition
10//! - [`ZenodoClient::reconcile_files`] to apply a [`FileReplacePolicy`]
11//! - [`ZenodoClient::publish_dataset_with_policy`] for end-to-end publish flows
12//! - [`ZenodoClient::create_and_publish_dataset_with_policy`] to create a fresh deposition and publish it
13//!
14//! If you want to call raw endpoints one by one, use [`crate::client`] instead.
15
16use std::sync::Arc;
17use std::time::Instant;
18
19use client_uploader_traits::collect_upload_filenames;
20use tokio::time::sleep;
21use url::Url;
22
23use crate::client::ZenodoClient;
24use crate::error::ZenodoError;
25use crate::ids::DepositionId;
26use crate::metadata::DepositMetadataUpdate;
27use crate::model::{Deposition, PublishedRecord};
28use crate::progress::TransferProgress;
29use crate::upload::{FileReplacePolicy, UploadSource, UploadSpec};
30
/// Step required before a deposition can be worked on as an editable draft.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EditableDraftAction {
    /// No extra step: the deposition at hand can be edited directly.
    ReuseExisting,
    /// A new version draft has to be created before editing.
    CreateNewVersion,
}
39
40/// Determines whether a deposition can be reused directly or needs `newversion`.
41///
42/// # Examples
43///
44/// ```
45/// use zenodo_rs::workflow::{editable_draft_action, EditableDraftAction};
46/// use zenodo_rs::Deposition;
47///
48/// let deposition: Deposition = serde_json::from_value(serde_json::json!({
49///     "id": 1,
50///     "submitted": true,
51///     "state": "done",
52///     "metadata": {},
53///     "files": [],
54///     "links": {}
55/// }))?;
56///
57/// assert_eq!(
58///     editable_draft_action(&deposition),
59///     EditableDraftAction::CreateNewVersion
60/// );
61/// # Ok::<(), Box<dyn std::error::Error>>(())
62/// ```
63#[must_use]
64pub fn editable_draft_action(deposition: &Deposition) -> EditableDraftAction {
65    if deposition.is_published() {
66        EditableDraftAction::CreateNewVersion
67    } else {
68        EditableDraftAction::ReuseExisting
69    }
70}
71
72fn deposition_allows_metadata_edits(deposition: &Deposition) -> bool {
73    deposition.allows_metadata_edits()
74}
75
76struct ForwardOnlyProgress<P>(P);
77
78impl<P> TransferProgress for ForwardOnlyProgress<P>
79where
80    P: TransferProgress,
81{
82    fn advance(&self, delta: u64) {
83        self.0.advance(delta);
84    }
85}
86
87pub(crate) fn file_ids_to_delete(
88    policy: FileReplacePolicy,
89    deposition: &Deposition,
90    uploaded_filenames: &std::collections::BTreeSet<String>,
91) -> Vec<crate::ids::DepositionFileId> {
92    match policy {
93        FileReplacePolicy::ReplaceAll => deposition
94            .files
95            .iter()
96            .map(|file| file.id.clone())
97            .collect(),
98        FileReplacePolicy::UpsertByFilename => deposition
99            .files
100            .iter()
101            .filter(|file| uploaded_filenames.contains(&file.filename))
102            .map(|file| file.id.clone())
103            .collect(),
104        FileReplacePolicy::KeepExistingAndAdd => Vec::new(),
105    }
106}
107
108fn validate_reconcile_inputs(
109    policy: FileReplacePolicy,
110    deposition: &Deposition,
111    uploaded_filenames: &std::collections::BTreeSet<String>,
112) -> Result<(), ZenodoError> {
113    if policy != FileReplacePolicy::KeepExistingAndAdd {
114        return Ok(());
115    }
116
117    if let Some(filename) = deposition
118        .files
119        .iter()
120        .map(|file| &file.filename)
121        .find(|filename| uploaded_filenames.contains(*filename))
122    {
123        return Err(ZenodoError::ConflictingDraftFile {
124            filename: filename.clone(),
125        });
126    }
127
128    Ok(())
129}
130
131impl ZenodoClient {
132    /// Enters edit mode for the current published version without versioning.
133    ///
134    /// Unpublished depositions are reused directly. Published depositions
135    /// trigger `edit` and then wait until the current deposition becomes
136    /// editable again.
137    ///
138    /// # Examples
139    ///
140    /// ```no_run
141    /// use zenodo_rs::{Auth, DepositionId, ZenodoClient};
142    ///
143    /// #[tokio::main]
144    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
145    ///     let client = ZenodoClient::new(Auth::new("token"))?;
146    ///     let draft = client.enter_edit_mode(DepositionId(42)).await?;
147    ///     let _ = draft.id;
148    ///     Ok(())
149    /// }
150    /// ```
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if the deposition lookup fails, if Zenodo rejects
155    /// edit mode, or if the draft never becomes editable.
156    pub async fn enter_edit_mode(&self, id: DepositionId) -> Result<Deposition, ZenodoError> {
157        let deposition = self.get_deposition(id).await?;
158        if !deposition.is_published() {
159            return Ok(deposition);
160        }
161
162        let edited = self.edit(id).await?;
163        if deposition_allows_metadata_edits(&edited) {
164            return Ok(edited);
165        }
166
167        self.poll_until("edit mode", || async move {
168            let deposition = self.get_deposition(id).await?;
169            if deposition_allows_metadata_edits(&deposition) {
170                return Ok(Some(deposition));
171            }
172
173            if let Some(latest_draft) = deposition.latest_draft_url().cloned() {
174                if deposition.links.self_.as_ref() == Some(&latest_draft) {
175                    return Ok(None);
176                }
177
178                match self.get_deposition_by_url(&latest_draft).await {
179                    Ok(draft) if deposition_allows_metadata_edits(&draft) => Ok(Some(draft)),
180                    Ok(_) => Ok(None),
181                    Err(error) if retryable_error(&error) => Ok(None),
182                    Err(error) => Err(error),
183                }
184            } else {
185                Ok(None)
186            }
187        })
188        .await
189    }
190
191    /// Returns an editable draft for the given deposition ID.
192    ///
193    /// Unpublished depositions are reused directly. Published depositions are
194    /// first resolved to the latest published version and then trigger
195    /// `newversion`, after which the helper follows `latest_draft`.
196    ///
197    /// # Examples
198    ///
199    /// ```no_run
200    /// use zenodo_rs::{Auth, DepositionId, ZenodoClient};
201    ///
202    /// #[tokio::main]
203    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
204    ///     let client = ZenodoClient::new(Auth::new("token"))?;
205    ///     let draft = client.ensure_editable_draft(DepositionId(42)).await?;
206    ///     let _ = draft.id;
207    ///     Ok(())
208    /// }
209    /// ```
210    ///
211    /// # Errors
212    ///
213    /// Returns an error if the deposition lookup fails, if Zenodo rejects
214    /// version creation, or if the resulting draft never becomes available.
215    pub async fn ensure_editable_draft(&self, id: DepositionId) -> Result<Deposition, ZenodoError> {
216        let deposition = self.get_deposition(id).await?;
217        match editable_draft_action(&deposition) {
218            EditableDraftAction::ReuseExisting => Ok(deposition),
219            EditableDraftAction::CreateNewVersion => {
220                let latest_published = self
221                    .latest_published_deposition_for_new_version(deposition)
222                    .await?;
223                let latest = self.new_version(latest_published.id).await?;
224                let latest_draft = latest
225                    .latest_draft_url()
226                    .cloned()
227                    .ok_or(ZenodoError::MissingLink("latest_draft"))?;
228                self.wait_for_deposition_url(&latest_draft, "latest draft")
229                    .await
230            }
231        }
232    }
233
234    /// Replaces all currently visible draft files with the provided uploads.
235    ///
236    /// # Errors
237    ///
238    /// Returns an error if the draft cannot be refreshed, if the bucket link is
239    /// missing, if file deletion fails, or if any upload fails.
240    pub async fn replace_all_files<I>(
241        &self,
242        draft: &Deposition,
243        files: I,
244    ) -> Result<Vec<crate::model::BucketObject>, ZenodoError>
245    where
246        I: IntoIterator<Item = UploadSpec>,
247    {
248        self.reconcile_files(draft, FileReplacePolicy::ReplaceAll, files)
249            .await
250    }
251
252    /// Reconciles draft files using the requested replacement policy.
253    ///
254    /// `ReplaceAll` deletes all currently visible draft files before upload.
255    /// `UpsertByFilename` deletes only draft files whose filename matches one
256    /// of the new uploads. `KeepExistingAndAdd` leaves all existing files in
257    /// place and uploads additional files alongside them.
258    ///
259    /// # Examples
260    ///
261    /// ```no_run
262    /// use zenodo_rs::{Auth, DepositionId, FileReplacePolicy, UploadSpec, ZenodoClient};
263    ///
264    /// #[tokio::main]
265    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
266    ///     let client = ZenodoClient::new(Auth::new("token"))?;
267    ///     let draft = client.ensure_editable_draft(DepositionId(42)).await?;
268    ///     client
269    ///         .reconcile_files(
270    ///             &draft,
271    ///             FileReplacePolicy::UpsertByFilename,
272    ///             vec![UploadSpec::from_path("artifact.tar.gz")?],
273    ///         )
274    ///         .await?;
275    ///     Ok(())
276    /// }
277    /// ```
278    ///
279    /// # Errors
280    ///
281    /// Returns an error if the draft cannot be refreshed, if the bucket link is
282    /// missing, if duplicate upload filenames are provided, if a keep-existing
283    /// upload would overwrite an existing draft filename, if file deletion
284    /// fails, or if any upload fails.
285    pub async fn reconcile_files<I>(
286        &self,
287        draft: &Deposition,
288        policy: FileReplacePolicy,
289        files: I,
290    ) -> Result<Vec<crate::model::BucketObject>, ZenodoError>
291    where
292        I: IntoIterator<Item = UploadSpec>,
293    {
294        self.reconcile_files_with_progress(draft, policy, files, ())
295            .await
296    }
297
298    /// Reconciles a draft's visible files and reports aggregate upload progress.
299    ///
300    /// The supplied progress sink receives the sum of all upload content
301    /// lengths before the first upload starts and one `advance` event as each
302    /// upload streams bytes into the request body.
303    ///
304    /// # Errors
305    ///
306    /// Returns an error if the draft cannot be refreshed, if the bucket link is
307    /// missing, if duplicate upload filenames are provided, if a keep-existing
308    /// upload would overwrite an existing draft filename, if reading a source
309    /// path length fails, if file deletion fails, or if any upload fails.
310    pub async fn reconcile_files_with_progress<I, P>(
311        &self,
312        draft: &Deposition,
313        policy: FileReplacePolicy,
314        files: I,
315        progress: P,
316    ) -> Result<Vec<crate::model::BucketObject>, ZenodoError>
317    where
318        I: IntoIterator<Item = UploadSpec>,
319        P: TransferProgress + 'static,
320    {
321        let files: Vec<_> = files.into_iter().collect();
322        let refreshed = self.get_deposition(draft.id).await?;
323        let bucket = refreshed
324            .bucket_url()
325            .cloned()
326            .ok_or(ZenodoError::MissingLink("bucket"))?;
327        let uploaded_filenames =
328            collect_upload_filenames(files.iter()).map_err(ZenodoError::from)?;
329        validate_reconcile_inputs(policy, &refreshed, &uploaded_filenames)?;
330
331        for file_id in file_ids_to_delete(policy, &refreshed, &uploaded_filenames) {
332            self.delete_file(refreshed.id, file_id).await?;
333        }
334
335        let total_bytes = files.iter().try_fold(0_u64, |total, spec| {
336            Ok::<u64, std::io::Error>(total + spec.content_length()?)
337        })?;
338        let progress = Arc::new(progress);
339        progress.begin(Some(total_bytes));
340        let mut uploaded = Vec::new();
341        for spec in files {
342            uploaded.push(
343                self.upload_spec_with_progress(
344                    &bucket,
345                    spec,
346                    ForwardOnlyProgress(Arc::clone(&progress)),
347                )
348                .await?,
349            );
350        }
351        progress.finish();
352
353        Ok(uploaded)
354    }
355
356    /// Runs the full publish workflow for a deposition.
357    ///
358    /// # Errors
359    ///
360    /// Returns an error if any draft lookup, metadata update, file upload,
361    /// publish step, or final record lookup fails.
362    pub async fn publish_dataset(
363        &self,
364        root: DepositionId,
365        metadata: &DepositMetadataUpdate,
366        files: impl IntoIterator<Item = UploadSpec>,
367    ) -> Result<PublishedRecord, ZenodoError> {
368        self.publish_dataset_with_policy(root, metadata, FileReplacePolicy::ReplaceAll, files)
369            .await
370    }
371
372    /// Runs the full publish workflow for a deposition using a file policy.
373    ///
374    /// # Examples
375    ///
376    /// ```no_run
377    /// use zenodo_rs::{
378    ///     AccessRight, Auth, Creator, DepositMetadataUpdate, DepositionId, FileReplacePolicy,
379    ///     UploadSpec, UploadType, ZenodoClient,
380    /// };
381    ///
382    /// #[tokio::main]
383    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
384    ///     let client = ZenodoClient::new(Auth::new("token"))?;
385    ///     let metadata = DepositMetadataUpdate::builder()
386    ///         .title("Example dataset")
387    ///         .upload_type(UploadType::Dataset)
388    ///         .description_html("<p>Example upload</p>")
389    ///         .creator(Creator::builder().name("Doe, Jane").build()?)
390    ///         .access_right(AccessRight::Open)
391    ///         .build()?;
392    ///
393    ///     let published = client
394    ///         .publish_dataset_with_policy(
395    ///             DepositionId(42),
396    ///             &metadata,
397    ///             FileReplacePolicy::KeepExistingAndAdd,
398    ///             vec![UploadSpec::from_path("artifact.tar.gz")?],
399    ///         )
400    ///         .await?;
401    ///     let _ = published.record.id;
402    ///     Ok(())
403    /// }
404    /// ```
405    ///
406    /// # Errors
407    ///
408    /// Returns an error if any draft lookup, metadata update, file upload,
409    /// publish step, duplicate/conflicting filename validation, or final
410    /// record lookup fails.
411    pub async fn publish_dataset_with_policy(
412        &self,
413        root: DepositionId,
414        metadata: &DepositMetadataUpdate,
415        policy: FileReplacePolicy,
416        files: impl IntoIterator<Item = UploadSpec>,
417    ) -> Result<PublishedRecord, ZenodoError> {
418        let draft = self.ensure_editable_draft(root).await?;
419        let draft = self.update_metadata(draft.id, metadata).await?;
420        self.reconcile_files(&draft, policy, files).await?;
421        let published = self.publish(draft.id).await?;
422        let published = self.wait_for_published_deposition(published.id).await?;
423        let record_id = published.record_id.ok_or_else(|| {
424            ZenodoError::InvalidState("published deposition is missing record_id".into())
425        })?;
426        let record = self.get_record(record_id).await?;
427
428        Ok(PublishedRecord {
429            deposition: published,
430            record,
431        })
432    }
433
434    /// Creates a fresh deposition and runs the full publish workflow.
435    ///
436    /// This is the ergonomic entrypoint for "publish a new dataset now"
437    /// automation that does not already have a deposition ID.
438    ///
439    /// # Examples
440    ///
441    /// ```no_run
442    /// use zenodo_rs::{
443    ///     AccessRight, Auth, DepositMetadataUpdate, UploadSpec, UploadType, ZenodoClient,
444    /// };
445    ///
446    /// #[tokio::main]
447    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
448    ///     let client = ZenodoClient::new(Auth::new("token"))?;
449    ///     let metadata = DepositMetadataUpdate::builder()
450    ///         .title("Example dataset")
451    ///         .upload_type(UploadType::Dataset)
452    ///         .description_html("<p>Example upload</p>")
453    ///         .creator_named("Doe, Jane")
454    ///         .access_right(AccessRight::Open)
455    ///         .build()?;
456    ///
457    ///     let published = client
458    ///         .create_and_publish_dataset(
459    ///             &metadata,
460    ///             vec![UploadSpec::from_path_as(
461    ///                 "target/release.tar.gz",
462    ///                 "archive.tar.gz",
463    ///             )?],
464    ///         )
465    ///         .await?;
466    ///     let _ = published.record.id;
467    ///     Ok(())
468    /// }
469    /// ```
470    ///
471    /// # Errors
472    ///
473    /// Returns an error if deposition creation fails or if any later metadata
474    /// update, file upload, publish step, or final record lookup fails.
475    pub async fn create_and_publish_dataset(
476        &self,
477        metadata: &DepositMetadataUpdate,
478        files: impl IntoIterator<Item = UploadSpec>,
479    ) -> Result<PublishedRecord, ZenodoError> {
480        self.create_and_publish_dataset_with_policy(metadata, FileReplacePolicy::ReplaceAll, files)
481            .await
482    }
483
484    /// Creates a fresh deposition and runs the full publish workflow with a file policy.
485    ///
486    /// # Errors
487    ///
488    /// Returns an error if deposition creation fails or if any later metadata
489    /// update, file upload, publish step, duplicate/conflicting filename
490    /// validation, or final record lookup fails.
491    pub async fn create_and_publish_dataset_with_policy(
492        &self,
493        metadata: &DepositMetadataUpdate,
494        policy: FileReplacePolicy,
495        files: impl IntoIterator<Item = UploadSpec>,
496    ) -> Result<PublishedRecord, ZenodoError> {
497        let draft = self.create_deposition().await?;
498        self.publish_dataset_with_policy(draft.id, metadata, policy, files)
499            .await
500    }
501
502    async fn latest_published_deposition_for_new_version(
503        &self,
504        deposition: Deposition,
505    ) -> Result<Deposition, ZenodoError> {
506        if !deposition.is_published() {
507            return Ok(deposition);
508        }
509
510        if let Some(latest_url) = deposition.links.latest.as_ref() {
511            let self_url = deposition.links.self_.as_ref();
512            if self_url != Some(latest_url) {
513                return self
514                    .resolve_latest_published_deposition_url(latest_url)
515                    .await;
516            }
517        }
518
519        if let Some(record_id) = deposition.record_id {
520            let latest_record = self.resolve_latest_version(record_id).await?;
521            if latest_record.id.0 != deposition.id.0 {
522                return self.get_deposition(DepositionId(latest_record.id.0)).await;
523            }
524        }
525
526        Ok(deposition)
527    }
528
529    async fn resolve_latest_published_deposition_url(
530        &self,
531        url: &Url,
532    ) -> Result<Deposition, ZenodoError> {
533        if url.path().contains("/api/deposit/depositions/") {
534            return self.get_deposition_by_url(url).await;
535        }
536
537        if url.path().contains("/api/records/") {
538            let record = self.get_record_by_url(url).await?;
539            return self.get_deposition(DepositionId(record.id.0)).await;
540        }
541
542        Err(ZenodoError::InvalidState(format!(
543            "unsupported latest published deposition link: {url}"
544        )))
545    }
546
547    async fn upload_spec_with_progress<P>(
548        &self,
549        bucket: &crate::ids::BucketUrl,
550        spec: UploadSpec,
551        progress: P,
552    ) -> Result<crate::model::BucketObject, ZenodoError>
553    where
554        P: TransferProgress + 'static,
555    {
556        match spec.source {
557            UploadSource::Path(path) => {
558                self.upload_path_with_content_type_and_progress(
559                    bucket,
560                    &spec.filename,
561                    &path,
562                    spec.content_type,
563                    progress,
564                )
565                .await
566            }
567            UploadSource::Reader {
568                reader,
569                content_length,
570            } => {
571                self.upload_reader_with_progress(
572                    bucket,
573                    &spec.filename,
574                    reader,
575                    content_length,
576                    spec.content_type,
577                    progress,
578                )
579                .await
580            }
581        }
582    }
583
584    async fn wait_for_published_deposition(
585        &self,
586        id: DepositionId,
587    ) -> Result<Deposition, ZenodoError> {
588        self.poll_until("publication", || async move {
589            let deposition = self.get_deposition(id).await?;
590            if deposition.is_published() {
591                Ok(Some(deposition))
592            } else {
593                Ok(None)
594            }
595        })
596        .await
597    }
598
599    async fn wait_for_deposition_url(
600        &self,
601        url: &Url,
602        label: &'static str,
603    ) -> Result<Deposition, ZenodoError> {
604        let url = url.clone();
605        self.poll_until(label, || {
606            let url = url.clone();
607            async move {
608                match self.get_deposition_by_url(&url).await {
609                    Ok(deposition) => Ok(Some(deposition)),
610                    Err(error) if retryable_error(&error) => Ok(None),
611                    Err(error) => Err(error),
612                }
613            }
614        })
615        .await
616    }
617
618    async fn poll_until<F, Fut, T>(
619        &self,
620        label: &'static str,
621        mut attempt: F,
622    ) -> Result<T, ZenodoError>
623    where
624        F: FnMut() -> Fut,
625        Fut: std::future::Future<Output = Result<Option<T>, ZenodoError>>,
626    {
627        let started = Instant::now();
628        let mut delay = self.poll.initial_delay;
629
630        loop {
631            if let Some(value) = attempt().await? {
632                return Ok(value);
633            }
634
635            let elapsed = started.elapsed();
636            if elapsed >= self.poll.max_wait {
637                return Err(ZenodoError::Timeout(label));
638            }
639
640            let remaining = self.poll.max_wait.saturating_sub(elapsed);
641            sleep(std::cmp::min(delay, remaining)).await;
642            delay = std::cmp::min(delay.saturating_mul(2), self.poll.max_delay);
643        }
644    }
645}
646
647fn retryable_error(error: &ZenodoError) -> bool {
648    match error {
649        ZenodoError::Http { status, .. } => {
650            *status == reqwest::StatusCode::CONFLICT
651                || *status == reqwest::StatusCode::TOO_MANY_REQUESTS
652                || status.is_server_error()
653        }
654        ZenodoError::Transport(_) => true,
655        _ => false,
656    }
657}
658
659#[cfg(test)]
660mod tests {
661    use std::time::{Duration, Instant};
662
663    use client_uploader_traits::collect_upload_filenames;
664
665    use super::{
666        editable_draft_action, file_ids_to_delete, retryable_error, validate_reconcile_inputs,
667        EditableDraftAction,
668    };
669    use crate::client::{Auth, ZenodoClient};
670    use crate::error::ZenodoError;
671    use crate::model::Deposition;
672    use crate::upload::{FileReplacePolicy, UploadSpec};
673    use crate::{Endpoint, PollOptions};
674    use axum::routing::get;
675    use axum::{Json, Router};
676    use tokio::net::TcpListener;
677    use url::Url;
678
/// An unsubmitted, in-progress deposition maps to `ReuseExisting`.
#[test]
fn unpublished_deposition_reuses_current_draft() {
    let payload = serde_json::json!({
        "id": 1,
        "submitted": false,
        "state": "inprogress",
        "metadata": {},
        "files": [],
        "links": {}
    });
    let draft: Deposition = serde_json::from_value(payload).unwrap();

    let action = editable_draft_action(&draft);

    assert_eq!(action, EditableDraftAction::ReuseExisting);
}
696
/// A submitted deposition in state `done` maps to `CreateNewVersion`.
#[test]
fn published_deposition_requires_new_version() {
    let payload = serde_json::json!({
        "id": 1,
        "submitted": true,
        "state": "done",
        "metadata": {},
        "files": [],
        "links": {}
    });
    let published: Deposition = serde_json::from_value(payload).unwrap();

    let action = editable_draft_action(&published);

    assert_eq!(action, EditableDraftAction::CreateNewVersion);
}
714
/// `ReplaceAll` selects every existing file, in deposition order, even when
/// nothing is being uploaded.
#[test]
fn replace_all_deletes_existing_files_first() {
    let payload = serde_json::json!({
        "id": 1,
        "submitted": false,
        "state": "inprogress",
        "metadata": {},
        "files": [
            { "id": "a", "filename": "one.txt", "filesize": 1 },
            { "id": "b", "filename": "two.txt", "filesize": 2 }
        ],
        "links": {}
    });
    let draft: Deposition = serde_json::from_value(payload).unwrap();
    let no_uploads = std::collections::BTreeSet::new();

    let ids = file_ids_to_delete(FileReplacePolicy::ReplaceAll, &draft, &no_uploads);

    assert!(matches!(
        ids.as_slice(),
        [first, second] if first.0 == "a" && second.0 == "b"
    ));
}
739
/// `UpsertByFilename` selects only existing files whose name is re-uploaded.
#[test]
fn upsert_by_filename_only_deletes_matching_files() {
    let payload = serde_json::json!({
        "id": 1,
        "submitted": false,
        "state": "inprogress",
        "metadata": {},
        "files": [
            { "id": "a", "filename": "one.txt", "filesize": 1 },
            { "id": "b", "filename": "two.txt", "filesize": 1 }
        ],
        "links": {}
    });
    let draft: Deposition = serde_json::from_value(payload).unwrap();
    let uploaded = ["two.txt", "three.txt"]
        .into_iter()
        .map(str::to_owned)
        .collect::<std::collections::BTreeSet<String>>();

    let ids = file_ids_to_delete(FileReplacePolicy::UpsertByFilename, &draft, &uploaded);

    assert!(matches!(ids.as_slice(), [only] if only.0 == "b"));
}
762
/// `KeepExistingAndAdd` never selects files for deletion, even on a name match.
#[test]
fn keep_existing_and_add_does_not_delete_existing_files() {
    let payload = serde_json::json!({
        "id": 1,
        "submitted": false,
        "state": "inprogress",
        "metadata": {},
        "files": [
            { "id": "a", "filename": "one.txt", "filesize": 1 }
        ],
        "links": {}
    });
    let draft: Deposition = serde_json::from_value(payload).unwrap();
    let uploaded: std::collections::BTreeSet<String> =
        std::iter::once("one.txt".to_owned()).collect();

    let ids = file_ids_to_delete(FileReplacePolicy::KeepExistingAndAdd, &draft, &uploaded);

    assert_eq!(ids.len(), 0);
}
785
/// Two uploads sharing a filename surface as `DuplicateUploadFilename`.
#[test]
fn duplicate_uploaded_filenames_are_rejected() {
    let spec_with_payload = |byte: u8| {
        UploadSpec::from_reader(
            "artifact.bin",
            std::io::Cursor::new(vec![byte]),
            1,
            mime::APPLICATION_OCTET_STREAM,
        )
    };
    let files = [spec_with_payload(1), spec_with_payload(2)];

    let outcome = collect_upload_filenames(files.iter()).map_err(ZenodoError::from);

    let error = outcome.unwrap_err();
    let is_expected_duplicate = matches!(
        error,
        ZenodoError::DuplicateUploadFilename { filename } if filename == "artifact.bin"
    );
    assert!(is_expected_duplicate);
}
811
/// Under `KeepExistingAndAdd`, uploading a name already on the draft is a
/// `ConflictingDraftFile` error.
#[test]
fn keep_existing_and_add_rejects_existing_filename_collisions() {
    let payload = serde_json::json!({
        "id": 1,
        "submitted": false,
        "state": "inprogress",
        "metadata": {},
        "files": [{ "id": "stale", "filename": "artifact.bin" }],
        "links": {}
    });
    let draft: Deposition = serde_json::from_value(payload).unwrap();
    let uploaded_filenames = std::collections::BTreeSet::from(["artifact.bin".to_owned()]);

    let outcome = validate_reconcile_inputs(
        FileReplacePolicy::KeepExistingAndAdd,
        &draft,
        &uploaded_filenames,
    );

    let error = outcome.unwrap_err();
    let is_expected_conflict = matches!(
        error,
        ZenodoError::ConflictingDraftFile { filename } if filename == "artifact.bin"
    );
    assert!(is_expected_conflict);
}
836
/// An upload with an empty filename is rejected as `InvalidState`.
#[test]
fn empty_uploaded_filenames_are_rejected() {
    let nameless = UploadSpec::from_reader(
        "",
        std::io::Cursor::new(vec![1_u8]),
        1,
        mime::APPLICATION_OCTET_STREAM,
    );
    let files = [nameless];

    let outcome = collect_upload_filenames(files.iter()).map_err(ZenodoError::from);

    let error = outcome.unwrap_err();
    let is_expected_rejection = matches!(
        error,
        ZenodoError::InvalidState(message) if message == "upload filename cannot be empty"
    );
    assert!(is_expected_rejection);
}
853
/// HTTP 409 is retryable; HTTP 400 is not.
#[test]
fn retryable_error_matches_retryable_http_statuses() {
    let http_error = |status| ZenodoError::Http {
        status,
        message: None,
        field_errors: Vec::new(),
        raw_body: None,
    };

    let conflict = http_error(reqwest::StatusCode::CONFLICT);
    let bad_request = http_error(reqwest::StatusCode::BAD_REQUEST);

    assert!(retryable_error(&conflict));
    assert!(!retryable_error(&bad_request));
}
872
873    #[tokio::test]
874    async fn retryable_error_treats_transport_errors_as_retryable() {
875        crate::client::ensure_rustls_provider();
876
877        let error = reqwest::Client::new()
878            .get("http://127.0.0.1:9")
879            .send()
880            .await
881            .unwrap_err();
882        assert!(retryable_error(&ZenodoError::Transport(error)));
883        assert!(!retryable_error(&ZenodoError::Io(std::io::Error::other(
884            "io"
885        ))));
886    }
887
888    #[tokio::test]
889    async fn poll_until_does_not_sleep_past_max_wait() {
890        let client = ZenodoClient::builder(Auth::new("token"))
891            .endpoint(Endpoint::Production)
892            .poll_options(PollOptions {
893                max_wait: Duration::from_millis(20),
894                initial_delay: Duration::from_millis(100),
895                max_delay: Duration::from_millis(100),
896            })
897            .build()
898            .unwrap();
899        let started = Instant::now();
900
901        let error = client
902            .poll_until("probe", || async { Ok::<Option<()>, ZenodoError>(None) })
903            .await
904            .unwrap_err();
905
906        assert!(matches!(error, ZenodoError::Timeout("probe")));
907        assert!(started.elapsed() < Duration::from_millis(80));
908    }
909
910    #[tokio::test]
911    async fn latest_published_deposition_short_circuits_when_resolution_is_not_needed() {
912        let client = ZenodoClient::new(Auth::new("token")).unwrap();
913        let unpublished: Deposition = serde_json::from_value(serde_json::json!({
914            "id": 10,
915            "submitted": false,
916            "state": "inprogress",
917            "metadata": {},
918            "files": [],
919            "links": {}
920        }))
921        .unwrap();
922        let already_latest: Deposition = serde_json::from_value(serde_json::json!({
923            "id": 11,
924            "submitted": true,
925            "state": "done",
926            "metadata": {},
927            "files": [],
928            "links": {
929                "self": "https://zenodo.example/api/deposit/depositions/11",
930                "latest": "https://zenodo.example/api/deposit/depositions/11"
931            }
932        }))
933        .unwrap();
934
935        assert_eq!(
936            client
937                .latest_published_deposition_for_new_version(unpublished.clone())
938                .await
939                .unwrap()
940                .id,
941            unpublished.id
942        );
943        assert_eq!(
944            client
945                .latest_published_deposition_for_new_version(already_latest.clone())
946                .await
947                .unwrap()
948                .id,
949            already_latest.id
950        );
951    }
952
953    #[tokio::test]
954    async fn latest_published_deposition_url_resolves_record_links_and_rejects_unknown_paths() {
955        async fn record() -> Json<serde_json::Value> {
956            Json(serde_json::json!({
957                "id": 22,
958                "recid": "22",
959                "metadata": { "title": "record" },
960                "files": [],
961                "links": {}
962            }))
963        }
964
965        async fn deposition() -> Json<serde_json::Value> {
966            Json(serde_json::json!({
967                "id": 22,
968                "submitted": true,
969                "state": "done",
970                "metadata": {},
971                "files": [],
972                "links": {}
973            }))
974        }
975
976        let app = Router::new()
977            .route("/api/records/22", get(record))
978            .route("/api/deposit/depositions/22", get(deposition));
979        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
980        let addr = listener.local_addr().unwrap();
981        let server = tokio::spawn(async move {
982            axum::serve(listener, app).await.unwrap();
983        });
984
985        let client = ZenodoClient::builder(Auth::new("token"))
986            .endpoint(Endpoint::Custom(
987                Url::parse(&format!("http://{addr}/api/")).unwrap(),
988            ))
989            .build()
990            .unwrap();
991
992        let resolved = client
993            .resolve_latest_published_deposition_url(
994                &Url::parse(&format!("http://{addr}/api/records/22")).unwrap(),
995            )
996            .await
997            .unwrap();
998        assert_eq!(resolved.id.0, 22);
999
1000        let error = client
1001            .resolve_latest_published_deposition_url(
1002                &Url::parse(&format!("http://{addr}/something/else")).unwrap(),
1003            )
1004            .await
1005            .unwrap_err();
1006        assert!(
1007            matches!(error, ZenodoError::InvalidState(message) if message.contains("unsupported latest published deposition link"))
1008        );
1009
1010        server.abort();
1011    }
1012}