1use std::sync::Arc;
17use std::time::Instant;
18
19use client_uploader_traits::collect_upload_filenames;
20use tokio::time::sleep;
21use url::Url;
22
23use crate::client::ZenodoClient;
24use crate::error::ZenodoError;
25use crate::ids::DepositionId;
26use crate::metadata::DepositMetadataUpdate;
27use crate::model::{Deposition, PublishedRecord};
28use crate::progress::TransferProgress;
29use crate::upload::{FileReplacePolicy, UploadSource, UploadSpec};
30
/// The step needed to obtain a draft that can be modified.
///
/// Produced by [`editable_draft_action`] from a deposition's published state.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum EditableDraftAction {
    /// The deposition is not published: keep working on it as-is.
    ReuseExisting,
    /// The deposition is published: a new version has to be created first.
    CreateNewVersion,
}
39
40#[must_use]
64pub fn editable_draft_action(deposition: &Deposition) -> EditableDraftAction {
65 if deposition.is_published() {
66 EditableDraftAction::CreateNewVersion
67 } else {
68 EditableDraftAction::ReuseExisting
69 }
70}
71
/// Reports whether `deposition` currently accepts metadata edits.
///
/// Thin free-function wrapper around `Deposition::allows_metadata_edits`, so
/// the edit-mode polling code can use it on any fetched deposition.
fn deposition_allows_metadata_edits(deposition: &Deposition) -> bool {
    deposition.allows_metadata_edits()
}
75
76struct ForwardOnlyProgress<P>(P);
77
78impl<P> TransferProgress for ForwardOnlyProgress<P>
79where
80 P: TransferProgress,
81{
82 fn advance(&self, delta: u64) {
83 self.0.advance(delta);
84 }
85}
86
87pub(crate) fn file_ids_to_delete(
88 policy: FileReplacePolicy,
89 deposition: &Deposition,
90 uploaded_filenames: &std::collections::BTreeSet<String>,
91) -> Vec<crate::ids::DepositionFileId> {
92 match policy {
93 FileReplacePolicy::ReplaceAll => deposition
94 .files
95 .iter()
96 .map(|file| file.id.clone())
97 .collect(),
98 FileReplacePolicy::UpsertByFilename => deposition
99 .files
100 .iter()
101 .filter(|file| uploaded_filenames.contains(&file.filename))
102 .map(|file| file.id.clone())
103 .collect(),
104 FileReplacePolicy::KeepExistingAndAdd => Vec::new(),
105 }
106}
107
108fn validate_reconcile_inputs(
109 policy: FileReplacePolicy,
110 deposition: &Deposition,
111 uploaded_filenames: &std::collections::BTreeSet<String>,
112) -> Result<(), ZenodoError> {
113 if policy != FileReplacePolicy::KeepExistingAndAdd {
114 return Ok(());
115 }
116
117 if let Some(filename) = deposition
118 .files
119 .iter()
120 .map(|file| &file.filename)
121 .find(|filename| uploaded_filenames.contains(*filename))
122 {
123 return Err(ZenodoError::ConflictingDraftFile {
124 filename: filename.clone(),
125 });
126 }
127
128 Ok(())
129}
130
131impl ZenodoClient {
132 pub async fn enter_edit_mode(&self, id: DepositionId) -> Result<Deposition, ZenodoError> {
157 let deposition = self.get_deposition(id).await?;
158 if !deposition.is_published() {
159 return Ok(deposition);
160 }
161
162 let edited = self.edit(id).await?;
163 if deposition_allows_metadata_edits(&edited) {
164 return Ok(edited);
165 }
166
167 self.poll_until("edit mode", || async move {
168 let deposition = self.get_deposition(id).await?;
169 if deposition_allows_metadata_edits(&deposition) {
170 return Ok(Some(deposition));
171 }
172
173 if let Some(latest_draft) = deposition.latest_draft_url().cloned() {
174 if deposition.links.self_.as_ref() == Some(&latest_draft) {
175 return Ok(None);
176 }
177
178 match self.get_deposition_by_url(&latest_draft).await {
179 Ok(draft) if deposition_allows_metadata_edits(&draft) => Ok(Some(draft)),
180 Ok(_) => Ok(None),
181 Err(error) if retryable_error(&error) => Ok(None),
182 Err(error) => Err(error),
183 }
184 } else {
185 Ok(None)
186 }
187 })
188 .await
189 }
190
191 pub async fn ensure_editable_draft(&self, id: DepositionId) -> Result<Deposition, ZenodoError> {
216 let deposition = self.get_deposition(id).await?;
217 match editable_draft_action(&deposition) {
218 EditableDraftAction::ReuseExisting => Ok(deposition),
219 EditableDraftAction::CreateNewVersion => {
220 let latest_published = self
221 .latest_published_deposition_for_new_version(deposition)
222 .await?;
223 let latest = self.new_version(latest_published.id).await?;
224 let latest_draft = latest
225 .latest_draft_url()
226 .cloned()
227 .ok_or(ZenodoError::MissingLink("latest_draft"))?;
228 self.wait_for_deposition_url(&latest_draft, "latest draft")
229 .await
230 }
231 }
232 }
233
234 pub async fn replace_all_files<I>(
241 &self,
242 draft: &Deposition,
243 files: I,
244 ) -> Result<Vec<crate::model::BucketObject>, ZenodoError>
245 where
246 I: IntoIterator<Item = UploadSpec>,
247 {
248 self.reconcile_files(draft, FileReplacePolicy::ReplaceAll, files)
249 .await
250 }
251
252 pub async fn reconcile_files<I>(
286 &self,
287 draft: &Deposition,
288 policy: FileReplacePolicy,
289 files: I,
290 ) -> Result<Vec<crate::model::BucketObject>, ZenodoError>
291 where
292 I: IntoIterator<Item = UploadSpec>,
293 {
294 self.reconcile_files_with_progress(draft, policy, files, ())
295 .await
296 }
297
298 pub async fn reconcile_files_with_progress<I, P>(
311 &self,
312 draft: &Deposition,
313 policy: FileReplacePolicy,
314 files: I,
315 progress: P,
316 ) -> Result<Vec<crate::model::BucketObject>, ZenodoError>
317 where
318 I: IntoIterator<Item = UploadSpec>,
319 P: TransferProgress + 'static,
320 {
321 let files: Vec<_> = files.into_iter().collect();
322 let refreshed = self.get_deposition(draft.id).await?;
323 let bucket = refreshed
324 .bucket_url()
325 .cloned()
326 .ok_or(ZenodoError::MissingLink("bucket"))?;
327 let uploaded_filenames =
328 collect_upload_filenames(files.iter()).map_err(ZenodoError::from)?;
329 validate_reconcile_inputs(policy, &refreshed, &uploaded_filenames)?;
330
331 for file_id in file_ids_to_delete(policy, &refreshed, &uploaded_filenames) {
332 self.delete_file(refreshed.id, file_id).await?;
333 }
334
335 let total_bytes = files.iter().try_fold(0_u64, |total, spec| {
336 Ok::<u64, std::io::Error>(total + spec.content_length()?)
337 })?;
338 let progress = Arc::new(progress);
339 progress.begin(Some(total_bytes));
340 let mut uploaded = Vec::new();
341 for spec in files {
342 uploaded.push(
343 self.upload_spec_with_progress(
344 &bucket,
345 spec,
346 ForwardOnlyProgress(Arc::clone(&progress)),
347 )
348 .await?,
349 );
350 }
351 progress.finish();
352
353 Ok(uploaded)
354 }
355
356 pub async fn publish_dataset(
363 &self,
364 root: DepositionId,
365 metadata: &DepositMetadataUpdate,
366 files: impl IntoIterator<Item = UploadSpec>,
367 ) -> Result<PublishedRecord, ZenodoError> {
368 self.publish_dataset_with_policy(root, metadata, FileReplacePolicy::ReplaceAll, files)
369 .await
370 }
371
372 pub async fn publish_dataset_with_policy(
412 &self,
413 root: DepositionId,
414 metadata: &DepositMetadataUpdate,
415 policy: FileReplacePolicy,
416 files: impl IntoIterator<Item = UploadSpec>,
417 ) -> Result<PublishedRecord, ZenodoError> {
418 let draft = self.ensure_editable_draft(root).await?;
419 let draft = self.update_metadata(draft.id, metadata).await?;
420 self.reconcile_files(&draft, policy, files).await?;
421 let published = self.publish(draft.id).await?;
422 let published = self.wait_for_published_deposition(published.id).await?;
423 let record_id = published.record_id.ok_or_else(|| {
424 ZenodoError::InvalidState("published deposition is missing record_id".into())
425 })?;
426 let record = self.get_record(record_id).await?;
427
428 Ok(PublishedRecord {
429 deposition: published,
430 record,
431 })
432 }
433
434 pub async fn create_and_publish_dataset(
476 &self,
477 metadata: &DepositMetadataUpdate,
478 files: impl IntoIterator<Item = UploadSpec>,
479 ) -> Result<PublishedRecord, ZenodoError> {
480 self.create_and_publish_dataset_with_policy(metadata, FileReplacePolicy::ReplaceAll, files)
481 .await
482 }
483
484 pub async fn create_and_publish_dataset_with_policy(
492 &self,
493 metadata: &DepositMetadataUpdate,
494 policy: FileReplacePolicy,
495 files: impl IntoIterator<Item = UploadSpec>,
496 ) -> Result<PublishedRecord, ZenodoError> {
497 let draft = self.create_deposition().await?;
498 self.publish_dataset_with_policy(draft.id, metadata, policy, files)
499 .await
500 }
501
502 async fn latest_published_deposition_for_new_version(
503 &self,
504 deposition: Deposition,
505 ) -> Result<Deposition, ZenodoError> {
506 if !deposition.is_published() {
507 return Ok(deposition);
508 }
509
510 if let Some(latest_url) = deposition.links.latest.as_ref() {
511 let self_url = deposition.links.self_.as_ref();
512 if self_url != Some(latest_url) {
513 return self
514 .resolve_latest_published_deposition_url(latest_url)
515 .await;
516 }
517 }
518
519 if let Some(record_id) = deposition.record_id {
520 let latest_record = self.resolve_latest_version(record_id).await?;
521 if latest_record.id.0 != deposition.id.0 {
522 return self.get_deposition(DepositionId(latest_record.id.0)).await;
523 }
524 }
525
526 Ok(deposition)
527 }
528
529 async fn resolve_latest_published_deposition_url(
530 &self,
531 url: &Url,
532 ) -> Result<Deposition, ZenodoError> {
533 if url.path().contains("/api/deposit/depositions/") {
534 return self.get_deposition_by_url(url).await;
535 }
536
537 if url.path().contains("/api/records/") {
538 let record = self.get_record_by_url(url).await?;
539 return self.get_deposition(DepositionId(record.id.0)).await;
540 }
541
542 Err(ZenodoError::InvalidState(format!(
543 "unsupported latest published deposition link: {url}"
544 )))
545 }
546
547 async fn upload_spec_with_progress<P>(
548 &self,
549 bucket: &crate::ids::BucketUrl,
550 spec: UploadSpec,
551 progress: P,
552 ) -> Result<crate::model::BucketObject, ZenodoError>
553 where
554 P: TransferProgress + 'static,
555 {
556 match spec.source {
557 UploadSource::Path(path) => {
558 self.upload_path_with_content_type_and_progress(
559 bucket,
560 &spec.filename,
561 &path,
562 spec.content_type,
563 progress,
564 )
565 .await
566 }
567 UploadSource::Reader {
568 reader,
569 content_length,
570 } => {
571 self.upload_reader_with_progress(
572 bucket,
573 &spec.filename,
574 reader,
575 content_length,
576 spec.content_type,
577 progress,
578 )
579 .await
580 }
581 }
582 }
583
584 async fn wait_for_published_deposition(
585 &self,
586 id: DepositionId,
587 ) -> Result<Deposition, ZenodoError> {
588 self.poll_until("publication", || async move {
589 let deposition = self.get_deposition(id).await?;
590 if deposition.is_published() {
591 Ok(Some(deposition))
592 } else {
593 Ok(None)
594 }
595 })
596 .await
597 }
598
599 async fn wait_for_deposition_url(
600 &self,
601 url: &Url,
602 label: &'static str,
603 ) -> Result<Deposition, ZenodoError> {
604 let url = url.clone();
605 self.poll_until(label, || {
606 let url = url.clone();
607 async move {
608 match self.get_deposition_by_url(&url).await {
609 Ok(deposition) => Ok(Some(deposition)),
610 Err(error) if retryable_error(&error) => Ok(None),
611 Err(error) => Err(error),
612 }
613 }
614 })
615 .await
616 }
617
618 async fn poll_until<F, Fut, T>(
619 &self,
620 label: &'static str,
621 mut attempt: F,
622 ) -> Result<T, ZenodoError>
623 where
624 F: FnMut() -> Fut,
625 Fut: std::future::Future<Output = Result<Option<T>, ZenodoError>>,
626 {
627 let started = Instant::now();
628 let mut delay = self.poll.initial_delay;
629
630 loop {
631 if let Some(value) = attempt().await? {
632 return Ok(value);
633 }
634
635 let elapsed = started.elapsed();
636 if elapsed >= self.poll.max_wait {
637 return Err(ZenodoError::Timeout(label));
638 }
639
640 let remaining = self.poll.max_wait.saturating_sub(elapsed);
641 sleep(std::cmp::min(delay, remaining)).await;
642 delay = std::cmp::min(delay.saturating_mul(2), self.poll.max_delay);
643 }
644 }
645}
646
647fn retryable_error(error: &ZenodoError) -> bool {
648 match error {
649 ZenodoError::Http { status, .. } => {
650 *status == reqwest::StatusCode::CONFLICT
651 || *status == reqwest::StatusCode::TOO_MANY_REQUESTS
652 || status.is_server_error()
653 }
654 ZenodoError::Transport(_) => true,
655 _ => false,
656 }
657}
658
659#[cfg(test)]
660mod tests {
661 use std::time::{Duration, Instant};
662
663 use client_uploader_traits::collect_upload_filenames;
664
665 use super::{
666 editable_draft_action, file_ids_to_delete, retryable_error, validate_reconcile_inputs,
667 EditableDraftAction,
668 };
669 use crate::client::{Auth, ZenodoClient};
670 use crate::error::ZenodoError;
671 use crate::model::Deposition;
672 use crate::upload::{FileReplacePolicy, UploadSpec};
673 use crate::{Endpoint, PollOptions};
674 use axum::routing::get;
675 use axum::{Json, Router};
676 use tokio::net::TcpListener;
677 use url::Url;
678
679 #[test]
680 fn unpublished_deposition_reuses_current_draft() {
681 let deposition: Deposition = serde_json::from_value(serde_json::json!({
682 "id": 1,
683 "submitted": false,
684 "state": "inprogress",
685 "metadata": {},
686 "files": [],
687 "links": {}
688 }))
689 .unwrap();
690
691 assert_eq!(
692 editable_draft_action(&deposition),
693 EditableDraftAction::ReuseExisting
694 );
695 }
696
697 #[test]
698 fn published_deposition_requires_new_version() {
699 let deposition: Deposition = serde_json::from_value(serde_json::json!({
700 "id": 1,
701 "submitted": true,
702 "state": "done",
703 "metadata": {},
704 "files": [],
705 "links": {}
706 }))
707 .unwrap();
708
709 assert_eq!(
710 editable_draft_action(&deposition),
711 EditableDraftAction::CreateNewVersion
712 );
713 }
714
715 #[test]
716 fn replace_all_deletes_existing_files_first() {
717 let deposition: Deposition = serde_json::from_value(serde_json::json!({
718 "id": 1,
719 "submitted": false,
720 "state": "inprogress",
721 "metadata": {},
722 "files": [
723 { "id": "a", "filename": "one.txt", "filesize": 1 },
724 { "id": "b", "filename": "two.txt", "filesize": 2 }
725 ],
726 "links": {}
727 }))
728 .unwrap();
729
730 let ids = file_ids_to_delete(
731 FileReplacePolicy::ReplaceAll,
732 &deposition,
733 &std::collections::BTreeSet::new(),
734 );
735 assert_eq!(ids.len(), 2);
736 assert_eq!(ids[0].0, "a");
737 assert_eq!(ids[1].0, "b");
738 }
739
740 #[test]
741 fn upsert_by_filename_only_deletes_matching_files() {
742 let deposition: Deposition = serde_json::from_value(serde_json::json!({
743 "id": 1,
744 "submitted": false,
745 "state": "inprogress",
746 "metadata": {},
747 "files": [
748 { "id": "a", "filename": "one.txt", "filesize": 1 },
749 { "id": "b", "filename": "two.txt", "filesize": 1 }
750 ],
751 "links": {}
752 }))
753 .unwrap();
754
755 let uploaded =
756 std::collections::BTreeSet::from(["two.txt".to_owned(), "three.txt".to_owned()]);
757
758 let ids = file_ids_to_delete(FileReplacePolicy::UpsertByFilename, &deposition, &uploaded);
759 assert_eq!(ids.len(), 1);
760 assert_eq!(ids[0].0, "b");
761 }
762
763 #[test]
764 fn keep_existing_and_add_does_not_delete_existing_files() {
765 let deposition: Deposition = serde_json::from_value(serde_json::json!({
766 "id": 1,
767 "submitted": false,
768 "state": "inprogress",
769 "metadata": {},
770 "files": [
771 { "id": "a", "filename": "one.txt", "filesize": 1 }
772 ],
773 "links": {}
774 }))
775 .unwrap();
776
777 let uploaded = std::collections::BTreeSet::from(["one.txt".to_owned()]);
778 assert!(file_ids_to_delete(
779 FileReplacePolicy::KeepExistingAndAdd,
780 &deposition,
781 &uploaded
782 )
783 .is_empty());
784 }
785
786 #[test]
787 fn duplicate_uploaded_filenames_are_rejected() {
788 let files = [
789 UploadSpec::from_reader(
790 "artifact.bin",
791 std::io::Cursor::new(vec![1_u8]),
792 1,
793 mime::APPLICATION_OCTET_STREAM,
794 ),
795 UploadSpec::from_reader(
796 "artifact.bin",
797 std::io::Cursor::new(vec![2_u8]),
798 1,
799 mime::APPLICATION_OCTET_STREAM,
800 ),
801 ];
802
803 let error = collect_upload_filenames(files.iter())
804 .map_err(ZenodoError::from)
805 .unwrap_err();
806 assert!(matches!(
807 error,
808 ZenodoError::DuplicateUploadFilename { filename } if filename == "artifact.bin"
809 ));
810 }
811
812 #[test]
813 fn keep_existing_and_add_rejects_existing_filename_collisions() {
814 let deposition: Deposition = serde_json::from_value(serde_json::json!({
815 "id": 1,
816 "submitted": false,
817 "state": "inprogress",
818 "metadata": {},
819 "files": [{ "id": "stale", "filename": "artifact.bin" }],
820 "links": {}
821 }))
822 .unwrap();
823 let uploaded_filenames = ["artifact.bin".to_owned()].into_iter().collect();
824
825 let error = validate_reconcile_inputs(
826 FileReplacePolicy::KeepExistingAndAdd,
827 &deposition,
828 &uploaded_filenames,
829 )
830 .unwrap_err();
831 assert!(matches!(
832 error,
833 ZenodoError::ConflictingDraftFile { filename } if filename == "artifact.bin"
834 ));
835 }
836
837 #[test]
838 fn empty_uploaded_filenames_are_rejected() {
839 let files = [UploadSpec::from_reader(
840 "",
841 std::io::Cursor::new(vec![1_u8]),
842 1,
843 mime::APPLICATION_OCTET_STREAM,
844 )];
845
846 let error = collect_upload_filenames(files.iter())
847 .map_err(ZenodoError::from)
848 .unwrap_err();
849 assert!(
850 matches!(error, ZenodoError::InvalidState(message) if message == "upload filename cannot be empty")
851 );
852 }
853
854 #[test]
855 fn retryable_error_matches_retryable_http_statuses() {
856 let conflict = ZenodoError::Http {
857 status: reqwest::StatusCode::CONFLICT,
858 message: None,
859 field_errors: Vec::new(),
860 raw_body: None,
861 };
862 let bad_request = ZenodoError::Http {
863 status: reqwest::StatusCode::BAD_REQUEST,
864 message: None,
865 field_errors: Vec::new(),
866 raw_body: None,
867 };
868
869 assert!(retryable_error(&conflict));
870 assert!(!retryable_error(&bad_request));
871 }
872
873 #[tokio::test]
874 async fn retryable_error_treats_transport_errors_as_retryable() {
875 crate::client::ensure_rustls_provider();
876
877 let error = reqwest::Client::new()
878 .get("http://127.0.0.1:9")
879 .send()
880 .await
881 .unwrap_err();
882 assert!(retryable_error(&ZenodoError::Transport(error)));
883 assert!(!retryable_error(&ZenodoError::Io(std::io::Error::other(
884 "io"
885 ))));
886 }
887
    /// With a 20 ms budget and 100 ms delays, `poll_until` must clamp its
    /// sleep to the remaining budget and time out well before a full delay
    /// has passed.
    #[tokio::test]
    async fn poll_until_does_not_sleep_past_max_wait() {
        let client = ZenodoClient::builder(Auth::new("token"))
            .endpoint(Endpoint::Production)
            .poll_options(PollOptions {
                max_wait: Duration::from_millis(20),
                initial_delay: Duration::from_millis(100),
                max_delay: Duration::from_millis(100),
            })
            .build()
            .unwrap();
        // Start the clock only after the client is built so setup cost does
        // not count against the bound below.
        let started = Instant::now();

        // The attempt never succeeds, so the only way out is the timeout.
        let error = client
            .poll_until("probe", || async { Ok::<Option<()>, ZenodoError>(None) })
            .await
            .unwrap_err();

        assert!(matches!(error, ZenodoError::Timeout("probe")));
        // An unclamped sleep would take at least 100 ms; 80 ms leaves slack
        // over the 20 ms budget without admitting a full delay.
        assert!(started.elapsed() < Duration::from_millis(80));
    }
909
910 #[tokio::test]
911 async fn latest_published_deposition_short_circuits_when_resolution_is_not_needed() {
912 let client = ZenodoClient::new(Auth::new("token")).unwrap();
913 let unpublished: Deposition = serde_json::from_value(serde_json::json!({
914 "id": 10,
915 "submitted": false,
916 "state": "inprogress",
917 "metadata": {},
918 "files": [],
919 "links": {}
920 }))
921 .unwrap();
922 let already_latest: Deposition = serde_json::from_value(serde_json::json!({
923 "id": 11,
924 "submitted": true,
925 "state": "done",
926 "metadata": {},
927 "files": [],
928 "links": {
929 "self": "https://zenodo.example/api/deposit/depositions/11",
930 "latest": "https://zenodo.example/api/deposit/depositions/11"
931 }
932 }))
933 .unwrap();
934
935 assert_eq!(
936 client
937 .latest_published_deposition_for_new_version(unpublished.clone())
938 .await
939 .unwrap()
940 .id,
941 unpublished.id
942 );
943 assert_eq!(
944 client
945 .latest_published_deposition_for_new_version(already_latest.clone())
946 .await
947 .unwrap()
948 .id,
949 already_latest.id
950 );
951 }
952
    /// Against a local stub server: a `/api/records/…` link is resolved via
    /// the record to its deposition, and a link with an unrecognised path is
    /// rejected with `InvalidState`.
    #[tokio::test]
    async fn latest_published_deposition_url_resolves_record_links_and_rejects_unknown_paths() {
        // Stub handler for the record endpoint; its `id` is what the client
        // uses to look up the deposition.
        async fn record() -> Json<serde_json::Value> {
            Json(serde_json::json!({
                "id": 22,
                "recid": "22",
                "metadata": { "title": "record" },
                "files": [],
                "links": {}
            }))
        }

        // Stub handler for the deposition endpoint with the matching id.
        async fn deposition() -> Json<serde_json::Value> {
            Json(serde_json::json!({
                "id": 22,
                "submitted": true,
                "state": "done",
                "metadata": {},
                "files": [],
                "links": {}
            }))
        }

        let app = Router::new()
            .route("/api/records/22", get(record))
            .route("/api/deposit/depositions/22", get(deposition));
        // Port 0 lets the OS pick a free port; the real address is read back
        // below and used to build every URL in this test.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let server = tokio::spawn(async move {
            axum::serve(listener, app).await.unwrap();
        });

        let client = ZenodoClient::builder(Auth::new("token"))
            .endpoint(Endpoint::Custom(
                Url::parse(&format!("http://{addr}/api/")).unwrap(),
            ))
            .build()
            .unwrap();

        // Record link: expect the deposition with the record's id.
        let resolved = client
            .resolve_latest_published_deposition_url(
                &Url::parse(&format!("http://{addr}/api/records/22")).unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(resolved.id.0, 22);

        // Unknown path: rejected up front, no route for it exists on the stub.
        let error = client
            .resolve_latest_published_deposition_url(
                &Url::parse(&format!("http://{addr}/something/else")).unwrap(),
            )
            .await
            .unwrap_err();
        assert!(
            matches!(error, ZenodoError::InvalidState(message) if message.contains("unsupported latest published deposition link"))
        );

        // Stop the stub server task so it does not outlive the test.
        server.abort();
    }
1012}