1use std::path::Path;
4#[cfg(feature = "indicatif")]
5use std::pin::Pin;
6#[cfg(feature = "indicatif")]
7use std::task::{Context, Poll};
8use std::time::Duration;
9
10#[cfg(feature = "indicatif")]
11use futures_core::Stream;
12#[cfg(feature = "indicatif")]
13use indicatif::ProgressBar;
14use reqwest::header::{
15 HeaderMap, HeaderName, HeaderValue, ACCEPT, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE,
16 LOCATION,
17};
18use reqwest::{Method, StatusCode};
19use secrecy::{ExposeSecret, SecretString};
20use serde_json::Value;
21use tokio::fs::File;
22use tokio_util::io::ReaderStream;
23use url::Url;
24
25use crate::downloads::ResolvedDownload;
26use crate::endpoint::Endpoint;
27use crate::error::{decode_metadata_write_failure, InternetArchiveError};
28use crate::ids::SecretPair;
29use crate::metadata::{
30 merge_metadata_semantically, metadata_contains_projection, HeaderEncoding, ItemMetadata,
31 MetadataChange, MetadataTarget, PatchOperation,
32};
33use crate::model::{Item, MetadataWriteResponse, S3LimitCheck, SearchResponse, TaskSubmission};
34use crate::poll::PollOptions;
35use crate::search::SearchQuery;
36use crate::upload::{DeleteOptions, UploadOptions, UploadSource, UploadSpec};
37use crate::ItemIdentifier;
38
/// Credentials for authenticated Internet Archive requests.
///
/// Holds an access/secret key pair that is sent as a `LOW <access>:<secret>`
/// `Authorization` header. The hand-written `Debug` impl redacts both keys.
#[derive(Clone)]
pub struct Auth {
    /// The access/secret key pair, each stored as a `SecretString`.
    pub(crate) secrets: SecretPair,
}
44
45impl Auth {
46 pub const ACCESS_KEY_ENV_VAR: &'static str = "INTERNET_ARCHIVE_ACCESS_KEY";
48 pub const SECRET_KEY_ENV_VAR: &'static str = "INTERNET_ARCHIVE_SECRET_KEY";
50
51 #[must_use]
53 pub fn new(access_key: impl Into<String>, secret_key: impl Into<String>) -> Self {
54 Self {
55 secrets: SecretPair {
56 access_key: SecretString::from(access_key.into()),
57 secret_key: SecretString::from(secret_key.into()),
58 },
59 }
60 }
61
62 pub fn from_env() -> Result<Self, InternetArchiveError> {
68 Self::from_env_vars(Self::ACCESS_KEY_ENV_VAR, Self::SECRET_KEY_ENV_VAR)
69 }
70
71 pub fn from_env_vars(
77 access_name: &str,
78 secret_name: &str,
79 ) -> Result<Self, InternetArchiveError> {
80 let access_key =
81 std::env::var(access_name).map_err(|source| InternetArchiveError::EnvVar {
82 name: access_name.to_owned(),
83 source,
84 })?;
85 let secret_key =
86 std::env::var(secret_name).map_err(|source| InternetArchiveError::EnvVar {
87 name: secret_name.to_owned(),
88 source,
89 })?;
90 Ok(Self::new(access_key, secret_key))
91 }
92
93 #[must_use]
94 pub(crate) fn authorization_header(&self) -> String {
95 format!(
96 "LOW {}:{}",
97 self.secrets.access_key.expose_secret(),
98 self.secrets.secret_key.expose_secret()
99 )
100 }
101}
102
103impl std::fmt::Debug for Auth {
104 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105 f.debug_struct("Auth")
106 .field("access_key", &"<redacted>")
107 .field("secret_key", &"<redacted>")
108 .finish()
109 }
110}
111
/// Configuration collected before constructing an [`InternetArchiveClient`].
///
/// Obtain one from [`InternetArchiveClient::builder`], chain the setters, then call
/// `build`.
#[derive(Clone, Debug)]
pub struct InternetArchiveClientBuilder {
    /// Credentials; without them, authenticated operations return `MissingAuth`.
    auth: Option<Auth>,
    /// Service URLs the client will talk to.
    endpoint: Endpoint,
    /// Polling schedule used while waiting for items to become visible.
    poll: PollOptions,
    /// Custom `User-Agent`; `build` falls back to `<crate name>/<crate version>`.
    user_agent: Option<String>,
    /// Whole-request timeout applied to both underlying HTTP clients, if set.
    request_timeout: Option<Duration>,
    /// Connection-establishment timeout applied to both HTTP clients, if set.
    connect_timeout: Option<Duration>,
}
122
123impl InternetArchiveClientBuilder {
124 #[must_use]
126 pub fn auth(mut self, auth: Auth) -> Self {
127 self.auth = Some(auth);
128 self
129 }
130
131 #[must_use]
133 pub fn endpoint(mut self, endpoint: Endpoint) -> Self {
134 self.endpoint = endpoint;
135 self
136 }
137
138 #[must_use]
140 pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
141 self.user_agent = Some(user_agent.into());
142 self
143 }
144
145 #[must_use]
147 pub fn request_timeout(mut self, timeout: Duration) -> Self {
148 self.request_timeout = Some(timeout);
149 self
150 }
151
152 #[must_use]
154 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
155 self.connect_timeout = Some(timeout);
156 self
157 }
158
159 #[must_use]
161 pub fn poll_options(mut self, poll: PollOptions) -> Self {
162 self.poll = poll;
163 self
164 }
165
166 pub fn build(self) -> Result<InternetArchiveClient, InternetArchiveError> {
172 let user_agent = self
173 .user_agent
174 .unwrap_or_else(|| format!("{}/{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")));
175
176 let build_client =
177 |redirects_enabled: bool| -> Result<reqwest::Client, InternetArchiveError> {
178 let mut builder = reqwest::Client::builder().user_agent(&user_agent);
179 if let Some(timeout) = self.request_timeout {
180 builder = builder.timeout(timeout);
181 }
182 if let Some(timeout) = self.connect_timeout {
183 builder = builder.connect_timeout(timeout);
184 }
185 if !redirects_enabled {
186 builder = builder.redirect(reqwest::redirect::Policy::none());
187 }
188 builder.build().map_err(Into::into)
189 };
190
191 Ok(InternetArchiveClient {
192 inner: build_client(true)?,
193 s3_inner: build_client(false)?,
194 auth: self.auth,
195 endpoint: self.endpoint,
196 poll: self.poll,
197 request_timeout: self.request_timeout,
198 connect_timeout: self.connect_timeout,
199 })
200 }
201}
202
/// Async client for the Internet Archive metadata, search, task, download, and
/// S3 upload/delete APIs.
///
/// Construct one with [`InternetArchiveClient::builder`] or the `new` /
/// `with_auth` / `from_env` shortcuts.
#[derive(Clone, Debug)]
pub struct InternetArchiveClient {
    /// General-purpose HTTP client; redirects use reqwest's default policy.
    pub(crate) inner: reqwest::Client,
    /// HTTP client for S3 requests with automatic redirects disabled, so
    /// redirects are followed by hand after a same-origin check.
    pub(crate) s3_inner: reqwest::Client,
    /// Credentials; `None` means only anonymous operations succeed.
    pub(crate) auth: Option<Auth>,
    /// Service URLs this client talks to.
    pub(crate) endpoint: Endpoint,
    /// Polling schedule used while waiting for items to become visible.
    pub(crate) poll: PollOptions,
    /// Request timeout the HTTP clients were built with, exposed via an accessor.
    pub(crate) request_timeout: Option<Duration>,
    /// Connect timeout the HTTP clients were built with, exposed via an accessor.
    pub(crate) connect_timeout: Option<Duration>,
}
214
215impl InternetArchiveClient {
216 #[must_use]
218 pub fn builder() -> InternetArchiveClientBuilder {
219 InternetArchiveClientBuilder {
220 auth: None,
221 endpoint: Endpoint::default(),
222 poll: PollOptions::default(),
223 user_agent: None,
224 request_timeout: None,
225 connect_timeout: None,
226 }
227 }
228
229 pub fn new() -> Result<Self, InternetArchiveError> {
235 Self::builder().build()
236 }
237
238 pub fn with_auth(auth: Auth) -> Result<Self, InternetArchiveError> {
244 Self::builder().auth(auth).build()
245 }
246
247 pub fn from_env() -> Result<Self, InternetArchiveError> {
254 Self::with_auth(Auth::from_env()?)
255 }
256
257 #[must_use]
259 pub fn endpoint(&self) -> &Endpoint {
260 &self.endpoint
261 }
262
263 #[must_use]
265 pub fn poll_options(&self) -> &PollOptions {
266 &self.poll
267 }
268
269 #[must_use]
271 pub fn request_timeout(&self) -> Option<Duration> {
272 self.request_timeout
273 }
274
275 #[must_use]
277 pub fn connect_timeout(&self) -> Option<Duration> {
278 self.connect_timeout
279 }
280
281 #[must_use]
283 pub fn has_auth(&self) -> bool {
284 self.auth.is_some()
285 }
286
287 pub async fn get_item(
293 &self,
294 identifier: &ItemIdentifier,
295 ) -> Result<Item, InternetArchiveError> {
296 let url = self.endpoint.metadata_url(identifier.as_str())?;
297 let bytes = self
298 .execute_bytes(
299 self.archive_request(Method::GET, url)
300 .header(ACCEPT, "application/json"),
301 )
302 .await?;
303
304 if bytes.iter().all(u8::is_ascii_whitespace) || bytes.as_ref() == b"[]" {
305 return Err(InternetArchiveError::ItemNotFound {
306 identifier: identifier.to_string(),
307 });
308 }
309
310 let item: Item = serde_json::from_slice(&bytes)?;
311 if item.identifier().as_ref() != Some(identifier) {
312 return Err(InternetArchiveError::ItemNotFound {
313 identifier: identifier.to_string(),
314 });
315 }
316
317 Ok(item)
318 }
319
320 pub async fn get_item_by_str(
327 &self,
328 identifier: impl AsRef<str>,
329 ) -> Result<Item, InternetArchiveError> {
330 let identifier = ItemIdentifier::new(identifier.as_ref())?;
331 self.get_item(&identifier).await
332 }
333
334 pub async fn search(
340 &self,
341 query: &SearchQuery,
342 ) -> Result<SearchResponse, InternetArchiveError> {
343 let url = query.into_url(self.endpoint.search_url()?)?;
344 let response = self
345 .archive_request(Method::GET, url)
346 .header(ACCEPT, "application/json")
347 .send()
348 .await?;
349 if !response.status().is_success() {
350 return Err(InternetArchiveError::from_response(response).await);
351 }
352
353 let bytes = response.bytes().await?;
354 decode_search_response(&bytes)
355 }
356
357 pub async fn check_upload_limit(
364 &self,
365 identifier: &ItemIdentifier,
366 ) -> Result<S3LimitCheck, InternetArchiveError> {
367 let auth = self
368 .auth
369 .as_ref()
370 .ok_or(InternetArchiveError::MissingAuth)?;
371 let url = self
372 .endpoint
373 .s3_limit_check_url(auth.secrets.access_key.expose_secret(), identifier.as_str())?;
374 self.execute_json(self.s3_request(Method::GET, url, HeaderMap::new())?)
375 .await
376 }
377
378 pub async fn apply_metadata_patch(
385 &self,
386 identifier: &ItemIdentifier,
387 target: MetadataTarget,
388 patch: &[PatchOperation],
389 ) -> Result<MetadataWriteResponse, InternetArchiveError> {
390 if self.auth.is_none() {
391 return Err(InternetArchiveError::MissingAuth);
392 }
393 let url = self.endpoint.metadata_url(identifier.as_str())?;
394 let patch = serde_json::to_string(patch)?;
395 self.execute_metadata_write(
396 self.archive_request(Method::POST, url)
397 .header(CONTENT_TYPE, "application/x-www-form-urlencoded")
398 .form(&[("-target", target.as_str()), ("-patch", patch)]),
399 )
400 .await
401 }
402
403 pub async fn apply_metadata_changes(
410 &self,
411 identifier: &ItemIdentifier,
412 changes: &[MetadataChange],
413 ) -> Result<MetadataWriteResponse, InternetArchiveError> {
414 if self.auth.is_none() {
415 return Err(InternetArchiveError::MissingAuth);
416 }
417 let url = self.endpoint.metadata_url(identifier.as_str())?;
418 let payload = serde_json::to_string(changes)?;
419 self.execute_metadata_write(
420 self.archive_request(Method::POST, url)
421 .header(CONTENT_TYPE, "application/x-www-form-urlencoded")
422 .form(&[("-changes", payload)]),
423 )
424 .await
425 }
426
427 pub async fn update_item_metadata(
444 &self,
445 identifier: &ItemIdentifier,
446 metadata: &ItemMetadata,
447 ) -> Result<MetadataWriteResponse, InternetArchiveError> {
448 let current = self.get_item(identifier).await?;
449 let current_value = serde_json::to_value(¤t.metadata)?;
450 let desired_value =
451 serde_json::to_value(merge_metadata_semantically(¤t.metadata, metadata))?;
452 let patch_value = json_patch::diff(¤t_value, &desired_value);
453 let patch: Vec<PatchOperation> =
454 serde_json::from_value(serde_json::to_value(patch_value)?)?;
455
456 if patch.is_empty() {
457 return Ok(MetadataWriteResponse {
458 success: true,
459 task_id: None,
460 log: None,
461 error: None,
462 });
463 }
464
465 if self.auth.is_none() {
466 return Err(InternetArchiveError::MissingAuth);
467 }
468
469 self.apply_metadata_patch(identifier, MetadataTarget::Metadata, &patch)
470 .await
471 }
472
473 pub async fn upload_file(
480 &self,
481 identifier: &ItemIdentifier,
482 spec: &UploadSpec,
483 options: &UploadOptions,
484 ) -> Result<(), InternetArchiveError> {
485 self.put_object(identifier, spec, options, None, false)
486 .await
487 }
488
489 #[cfg(feature = "indicatif")]
499 pub async fn upload_file_with_progress(
500 &self,
501 identifier: &ItemIdentifier,
502 spec: &UploadSpec,
503 options: &UploadOptions,
504 progress: &ProgressBar,
505 ) -> Result<(), InternetArchiveError> {
506 self.put_object_with_progress(identifier, spec, options, None, false, progress)
507 .await
508 }
509
510 pub async fn create_item(
519 &self,
520 identifier: &ItemIdentifier,
521 metadata: &ItemMetadata,
522 spec: &UploadSpec,
523 options: &UploadOptions,
524 ) -> Result<(), InternetArchiveError> {
525 let header_encoding = metadata.as_header_encoding();
526 let remainder = header_encoding.remainder.clone();
527 self.put_object(identifier, spec, options, Some(header_encoding), true)
528 .await?;
529
530 if !remainder.as_map().is_empty() {
531 self.wait_for_item(identifier).await?;
532 self.update_item_metadata(identifier, &remainder).await?;
533 self.wait_for_item_projection(identifier, &[], &remainder)
534 .await?;
535 }
536
537 Ok(())
538 }
539
540 #[cfg(feature = "indicatif")]
551 pub async fn create_item_with_progress(
552 &self,
553 identifier: &ItemIdentifier,
554 metadata: &ItemMetadata,
555 spec: &UploadSpec,
556 options: &UploadOptions,
557 progress: &ProgressBar,
558 ) -> Result<(), InternetArchiveError> {
559 let header_encoding = metadata.as_header_encoding();
560 let remainder = header_encoding.remainder.clone();
561 self.put_object_with_progress(
562 identifier,
563 spec,
564 options,
565 Some(header_encoding),
566 true,
567 progress,
568 )
569 .await?;
570
571 if !remainder.as_map().is_empty() {
572 self.wait_for_item(identifier).await?;
573 self.update_item_metadata(identifier, &remainder).await?;
574 self.wait_for_item_projection(identifier, &[], &remainder)
575 .await?;
576 }
577
578 Ok(())
579 }
580
581 pub async fn delete_file(
588 &self,
589 identifier: &ItemIdentifier,
590 filename: &str,
591 options: &DeleteOptions,
592 ) -> Result<(), InternetArchiveError> {
593 let mut headers = HeaderMap::new();
594 if options.cascade_delete {
595 headers.insert("x-archive-cascade-delete", HeaderValue::from_static("1"));
596 }
597 if options.keep_old_version {
598 headers.insert("x-archive-keep-old-version", HeaderValue::from_static("1"));
599 }
600
601 let url = self.endpoint.s3_object_url(identifier.as_str(), filename)?;
602 self.execute_s3(Method::DELETE, url, headers, None).await?;
603 Ok(())
604 }
605
606 pub async fn make_dark(
623 &self,
624 identifier: &ItemIdentifier,
625 comment: &str,
626 ) -> Result<TaskSubmission, InternetArchiveError> {
627 if self.auth.is_none() {
628 return Err(InternetArchiveError::MissingAuth);
629 }
630 let url = self.endpoint.tasks_url()?;
631 let payload = serde_json::json!({
632 "identifier": identifier.as_str(),
633 "cmd": "make_dark.php",
634 "args": { "comment": comment },
635 });
636 let response = self
637 .archive_request(Method::POST, url)
638 .header(CONTENT_TYPE, "application/json")
639 .header(ACCEPT, "application/json")
640 .body(serde_json::to_vec(&payload)?)
641 .send()
642 .await?;
643 if !response.status().is_success() {
644 return Err(InternetArchiveError::from_response(response).await);
645 }
646 let bytes = response.bytes().await?;
647 decode_task_submission(&bytes)
648 }
649
650 pub fn resolve_download(
656 &self,
657 identifier: &ItemIdentifier,
658 filename: &str,
659 ) -> Result<ResolvedDownload, InternetArchiveError> {
660 Ok(ResolvedDownload {
661 identifier: identifier.clone(),
662 filename: filename.to_owned(),
663 url: self.endpoint.download_url(identifier.as_str(), filename)?,
664 })
665 }
666
667 pub async fn download_bytes(
673 &self,
674 identifier: &ItemIdentifier,
675 filename: &str,
676 ) -> Result<bytes::Bytes, InternetArchiveError> {
677 let resolved = self.resolve_download(identifier, filename)?;
678 self.execute_bytes(self.inner.get(resolved.url)).await
679 }
680
681 #[cfg(feature = "indicatif")]
689 pub async fn download_bytes_with_progress(
690 &self,
691 identifier: &ItemIdentifier,
692 filename: &str,
693 progress: &ProgressBar,
694 ) -> Result<bytes::Bytes, InternetArchiveError> {
695 let resolved = self.resolve_download(identifier, filename)?;
696 self.execute_bytes_with_progress(self.inner.get(resolved.url), progress)
697 .await
698 }
699
700 pub async fn download_to_path(
706 &self,
707 identifier: &ItemIdentifier,
708 filename: &str,
709 path: impl AsRef<Path>,
710 ) -> Result<(), InternetArchiveError> {
711 let bytes = self.download_bytes(identifier, filename).await?;
712 tokio::fs::write(path, &bytes).await?;
713 Ok(())
714 }
715
716 #[cfg(feature = "indicatif")]
725 pub async fn download_to_path_with_progress(
726 &self,
727 identifier: &ItemIdentifier,
728 filename: &str,
729 path: impl AsRef<Path>,
730 progress: &ProgressBar,
731 ) -> Result<(), InternetArchiveError> {
732 let bytes = self
733 .download_bytes_with_progress(identifier, filename, progress)
734 .await?;
735 tokio::fs::write(path, &bytes).await?;
736 Ok(())
737 }
738
739 pub(crate) async fn wait_for_item(
740 &self,
741 identifier: &ItemIdentifier,
742 ) -> Result<Item, InternetArchiveError> {
743 self.wait_until("item visibility", || async {
744 self.get_item(identifier).await
745 })
746 .await
747 }
748
749 pub(crate) async fn wait_for_item_projection(
750 &self,
751 identifier: &ItemIdentifier,
752 expected_files: &[String],
753 expected_metadata: &ItemMetadata,
754 ) -> Result<Item, InternetArchiveError> {
755 self.wait_until("item projection visibility", || async {
756 let item = self.get_item(identifier).await?;
757 if expected_files
758 .iter()
759 .all(|filename| item.file(filename).is_some())
760 && metadata_contains_projection(&item.metadata, expected_metadata)
761 {
762 Ok(item)
763 } else {
764 Err(InternetArchiveError::ItemNotFound {
765 identifier: identifier.to_string(),
766 })
767 }
768 })
769 .await
770 }
771
772 async fn put_object(
773 &self,
774 identifier: &ItemIdentifier,
775 spec: &UploadSpec,
776 options: &UploadOptions,
777 metadata: Option<HeaderEncoding>,
778 auto_make_bucket: bool,
779 ) -> Result<(), InternetArchiveError> {
780 let (url, headers, body) = self
781 .prepare_put_object(identifier, spec, options, metadata, auto_make_bucket)
782 .await?;
783 self.execute_s3(Method::PUT, url, headers, Some(body))
784 .await?;
785 Ok(())
786 }
787
788 #[cfg(feature = "indicatif")]
789 async fn put_object_with_progress(
790 &self,
791 identifier: &ItemIdentifier,
792 spec: &UploadSpec,
793 options: &UploadOptions,
794 metadata: Option<HeaderEncoding>,
795 auto_make_bucket: bool,
796 progress: &ProgressBar,
797 ) -> Result<(), InternetArchiveError> {
798 let (url, headers, body) = self
799 .prepare_put_object(identifier, spec, options, metadata, auto_make_bucket)
800 .await?;
801 self.execute_s3_with_progress(Method::PUT, url, headers, Some(body), progress)
802 .await?;
803 Ok(())
804 }
805
806 async fn prepare_put_object(
807 &self,
808 identifier: &ItemIdentifier,
809 spec: &UploadSpec,
810 options: &UploadOptions,
811 metadata: Option<HeaderEncoding>,
812 auto_make_bucket: bool,
813 ) -> Result<(Url, HeaderMap, ReplayableBody), InternetArchiveError> {
814 if auto_make_bucket {
815 identifier.validate_for_bucket_creation()?;
816 }
817
818 let mut headers = HeaderMap::new();
819 headers.insert(
820 CONTENT_TYPE,
821 HeaderValue::from_str(spec.content_type.as_ref()).map_err(|_| {
822 InternetArchiveError::InvalidState("invalid content type".to_owned())
823 })?,
824 );
825
826 if auto_make_bucket {
827 headers.insert("x-archive-auto-make-bucket", HeaderValue::from_static("1"));
828 headers.insert("x-amz-auto-make-bucket", HeaderValue::from_static("1"));
829 }
830 if options.skip_derive {
831 headers.insert("x-archive-queue-derive", HeaderValue::from_static("0"));
832 }
833 if options.keep_old_version {
834 headers.insert("x-archive-keep-old-version", HeaderValue::from_static("1"));
835 }
836 if options.interactive_priority {
837 headers.insert(
838 "x-archive-interactive-priority",
839 HeaderValue::from_static("1"),
840 );
841 }
842 if let Some(size_hint) = options.size_hint {
843 headers.insert(
844 "x-archive-size-hint",
845 HeaderValue::from_str(&size_hint.to_string()).map_err(|_| {
846 InternetArchiveError::InvalidState("invalid size hint".to_owned())
847 })?,
848 );
849 }
850 if let Some(metadata) = metadata {
851 for (name, value) in metadata.headers {
852 let name = HeaderName::from_bytes(name.as_bytes()).map_err(|_| {
853 InternetArchiveError::InvalidState("invalid metadata header name".to_owned())
854 })?;
855 headers.insert(
856 name,
857 HeaderValue::from_str(&value).map_err(|_| {
858 InternetArchiveError::InvalidState(
859 "invalid metadata header value".to_owned(),
860 )
861 })?,
862 );
863 }
864 }
865
866 let body = match &spec.source {
867 UploadSource::Path(path) => {
868 let length = tokio::fs::metadata(path).await?.len();
869 ReplayableBody::Path {
870 path: path.clone(),
871 length,
872 }
873 }
874 UploadSource::Bytes(bytes) => ReplayableBody::Bytes(bytes.clone()),
875 };
876
877 let url = self
878 .endpoint
879 .s3_object_url(identifier.as_str(), &spec.filename)?;
880 Ok((url, headers, body))
881 }
882
883 fn archive_request(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
884 let mut request = self.inner.request(method, url);
885 if let Some(auth) = &self.auth {
886 request = request.header(AUTHORIZATION, auth.authorization_header());
887 }
888 request
889 }
890
891 fn s3_request(
892 &self,
893 method: Method,
894 url: Url,
895 headers: HeaderMap,
896 ) -> Result<reqwest::RequestBuilder, InternetArchiveError> {
897 let auth = self
898 .auth
899 .as_ref()
900 .ok_or(InternetArchiveError::MissingAuth)?;
901 Ok(self
902 .s3_inner
903 .request(method, url)
904 .headers(headers)
905 .header(AUTHORIZATION, auth.authorization_header()))
906 }
907
908 async fn execute_json<T>(
909 &self,
910 request: reqwest::RequestBuilder,
911 ) -> Result<T, InternetArchiveError>
912 where
913 T: serde::de::DeserializeOwned,
914 {
915 let response = request.send().await?;
916 if !response.status().is_success() {
917 return Err(InternetArchiveError::from_response(response).await);
918 }
919 let bytes = response.bytes().await?;
920 Ok(serde_json::from_slice(&bytes)?)
921 }
922
923 async fn execute_bytes(
924 &self,
925 request: reqwest::RequestBuilder,
926 ) -> Result<bytes::Bytes, InternetArchiveError> {
927 let response = request.send().await?;
928 if !response.status().is_success() {
929 return Err(InternetArchiveError::from_response(response).await);
930 }
931 response.bytes().await.map_err(Into::into)
932 }
933
934 #[cfg(feature = "indicatif")]
935 async fn execute_bytes_with_progress(
936 &self,
937 request: reqwest::RequestBuilder,
938 progress: &ProgressBar,
939 ) -> Result<bytes::Bytes, InternetArchiveError> {
940 progress.set_position(0);
941
942 let mut response = request.send().await?;
943 if !response.status().is_success() {
944 return Err(InternetArchiveError::from_response(response).await);
945 }
946
947 if let Some(length) = response.content_length() {
948 progress.set_length(length);
949 }
950
951 let mut bytes = response
952 .content_length()
953 .and_then(|length| usize::try_from(length).ok())
954 .map_or_else(Vec::new, Vec::with_capacity);
955
956 while let Some(chunk) = response.chunk().await? {
957 progress.inc(chunk.len() as u64);
958 bytes.extend_from_slice(&chunk);
959 }
960
961 progress.finish();
962 Ok(bytes::Bytes::from(bytes))
963 }
964
965 async fn execute_metadata_write(
966 &self,
967 request: reqwest::RequestBuilder,
968 ) -> Result<MetadataWriteResponse, InternetArchiveError> {
969 let response = request.send().await?;
970 if !response.status().is_success() {
971 return Err(InternetArchiveError::from_response(response).await);
972 }
973
974 let bytes = response.bytes().await?;
975 decode_metadata_write_failure(&bytes)?;
976 Ok(serde_json::from_slice(&bytes)?)
977 }
978
979 async fn execute_s3(
980 &self,
981 method: Method,
982 url: Url,
983 headers: HeaderMap,
984 body: Option<ReplayableBody>,
985 ) -> Result<reqwest::Response, InternetArchiveError> {
986 let mut current_url = url;
987 let mut remaining_redirects = 8_u8;
988
989 loop {
990 let mut request =
991 self.s3_request(method.clone(), current_url.clone(), headers.clone())?;
992 if let Some(body) = &body {
993 request = body.apply(request).await?;
994 }
995
996 let response = request.send().await?;
997 if is_redirect(response.status()) {
998 let Some(location) = response.headers().get(LOCATION).cloned() else {
999 return Err(InternetArchiveError::InvalidState(
1000 "redirect response missing location header".to_owned(),
1001 ));
1002 };
1003
1004 if remaining_redirects == 0 {
1005 return Err(InternetArchiveError::InvalidState(
1006 "too many redirects during S3 request".to_owned(),
1007 ));
1008 }
1009
1010 let location = location.to_str().map_err(|_| {
1011 InternetArchiveError::InvalidState(
1012 "redirect location is not valid UTF-8".to_owned(),
1013 )
1014 })?;
1015 let redirected_url = current_url.join(location)?;
1016 if redirected_url.origin() != self.endpoint.s3_base().origin() {
1017 return Err(InternetArchiveError::InvalidState(
1018 "refusing to forward credentials to redirected S3 host".to_owned(),
1019 ));
1020 }
1021 current_url = redirected_url;
1022 remaining_redirects -= 1;
1023 continue;
1024 }
1025
1026 if !response.status().is_success() {
1027 return Err(InternetArchiveError::from_response(response).await);
1028 }
1029
1030 return Ok(response);
1031 }
1032 }
1033
1034 #[cfg(feature = "indicatif")]
1035 async fn execute_s3_with_progress(
1036 &self,
1037 method: Method,
1038 url: Url,
1039 headers: HeaderMap,
1040 body: Option<ReplayableBody>,
1041 progress: &ProgressBar,
1042 ) -> Result<reqwest::Response, InternetArchiveError> {
1043 let mut current_url = url;
1044 let mut remaining_redirects = 8_u8;
1045
1046 loop {
1047 let mut request =
1048 self.s3_request(method.clone(), current_url.clone(), headers.clone())?;
1049 if let Some(body) = &body {
1050 request = body.apply_with_progress(request, progress).await?;
1051 }
1052
1053 let response = request.send().await?;
1054 if is_redirect(response.status()) {
1055 let Some(location) = response.headers().get(LOCATION).cloned() else {
1056 return Err(InternetArchiveError::InvalidState(
1057 "redirect response missing location header".to_owned(),
1058 ));
1059 };
1060
1061 if remaining_redirects == 0 {
1062 return Err(InternetArchiveError::InvalidState(
1063 "too many redirects during S3 request".to_owned(),
1064 ));
1065 }
1066
1067 let location = location.to_str().map_err(|_| {
1068 InternetArchiveError::InvalidState(
1069 "redirect location is not valid UTF-8".to_owned(),
1070 )
1071 })?;
1072 let redirected_url = current_url.join(location)?;
1073 if redirected_url.origin() != self.endpoint.s3_base().origin() {
1074 return Err(InternetArchiveError::InvalidState(
1075 "refusing to forward credentials to redirected S3 host".to_owned(),
1076 ));
1077 }
1078 current_url = redirected_url;
1079 remaining_redirects -= 1;
1080 continue;
1081 }
1082
1083 if !response.status().is_success() {
1084 return Err(InternetArchiveError::from_response(response).await);
1085 }
1086
1087 progress.finish();
1088 return Ok(response);
1089 }
1090 }
1091
1092 pub(crate) async fn wait_until<T, F, Fut>(
1093 &self,
1094 label: &'static str,
1095 mut action: F,
1096 ) -> Result<T, InternetArchiveError>
1097 where
1098 F: FnMut() -> Fut,
1099 Fut: std::future::Future<Output = Result<T, InternetArchiveError>>,
1100 {
1101 let started = tokio::time::Instant::now();
1102 let mut delay = self.poll.initial_delay;
1103
1104 loop {
1105 match action().await {
1106 Ok(value) => return Ok(value),
1107 Err(error)
1108 if started.elapsed() < self.poll.max_wait
1109 && is_retryable_wait_error(&error) =>
1110 {
1111 tokio::time::sleep(delay).await;
1112 delay = std::cmp::min(delay.saturating_mul(2), self.poll.max_delay);
1113 }
1114 Err(error) => return Err(error),
1115 }
1116
1117 if started.elapsed() >= self.poll.max_wait {
1118 return Err(InternetArchiveError::Timeout(label));
1119 }
1120 }
1121 }
1122}
1123
/// An upload body that can be rebuilt for every attempt of an S3 request.
///
/// The S3 redirect loop re-sends the request after a redirect, and a streamed body
/// can only be consumed once. The source is therefore kept in a form that can be
/// turned into a fresh body on demand.
#[derive(Clone, Debug)]
enum ReplayableBody {
    /// A file on disk, reopened on every attempt.
    ///
    /// `length` was read from the file's metadata when the upload was prepared and
    /// is sent as `Content-Length`.
    Path {
        path: std::path::PathBuf,
        length: u64,
    },
    /// An in-memory payload, cloned into each request.
    Bytes(Vec<u8>),
}
1132
1133impl ReplayableBody {
1134 async fn apply(
1135 &self,
1136 request: reqwest::RequestBuilder,
1137 ) -> Result<reqwest::RequestBuilder, InternetArchiveError> {
1138 match self {
1139 Self::Path { path, length } => {
1140 let file = File::open(path).await?;
1141 Ok(request
1142 .header(CONTENT_LENGTH, *length)
1143 .body(reqwest::Body::wrap_stream(ReaderStream::new(file))))
1144 }
1145 Self::Bytes(bytes) => Ok(request
1146 .header(CONTENT_LENGTH, bytes.len())
1147 .body(bytes.clone())),
1148 }
1149 }
1150
1151 #[cfg(feature = "indicatif")]
1152 async fn apply_with_progress(
1153 &self,
1154 request: reqwest::RequestBuilder,
1155 progress: &ProgressBar,
1156 ) -> Result<reqwest::RequestBuilder, InternetArchiveError> {
1157 progress.set_position(0);
1158
1159 match self {
1160 Self::Path { path, length } => {
1161 progress.set_length(*length);
1162 let file = File::open(path).await?;
1163 Ok(request
1164 .header(CONTENT_LENGTH, *length)
1165 .body(reqwest::Body::wrap_stream(ProgressStream::new(
1166 ReaderStream::new(file),
1167 progress.clone(),
1168 ))))
1169 }
1170 Self::Bytes(bytes) => {
1171 let length = bytes.len() as u64;
1172 progress.set_length(length);
1173 Ok(request
1174 .header(CONTENT_LENGTH, length)
1175 .body(reqwest::Body::wrap_stream(ProgressStream::new(
1176 ChunkedBytesStream::new(bytes::Bytes::from(bytes.clone())),
1177 progress.clone(),
1178 ))))
1179 }
1180 }
1181 }
1182}
1183
/// Stream adapter that advances a progress bar by the size of every successful
/// chunk passed through it.
#[cfg(feature = "indicatif")]
struct ProgressStream<S> {
    inner: S,
    progress: ProgressBar,
}

#[cfg(feature = "indicatif")]
impl<S> ProgressStream<S> {
    /// Wraps `inner` so each yielded chunk increments `progress`.
    fn new(inner: S, progress: ProgressBar) -> Self {
        Self { progress, inner }
    }
}

#[cfg(feature = "indicatif")]
impl<S> Stream for ProgressStream<S>
where
    S: Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin,
{
    type Item = Result<bytes::Bytes, std::io::Error>;

    /// Forwards the inner poll result unchanged, counting `Ok` chunks on the way.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let polled = Pin::new(&mut self.inner).poll_next(cx);
        if let Poll::Ready(Some(Ok(chunk))) = &polled {
            self.progress.inc(chunk.len() as u64);
        }
        polled
    }
}
1215
/// Always-ready stream that yields an in-memory buffer in `CHUNK_SIZE` slices.
///
/// Progress can then be reported incrementally instead of in one jump.
#[cfg(feature = "indicatif")]
struct ChunkedBytesStream {
    bytes: bytes::Bytes,
    offset: usize,
}

#[cfg(feature = "indicatif")]
impl ChunkedBytesStream {
    /// Maximum size of each yielded slice (16 KiB).
    const CHUNK_SIZE: usize = 16 * 1024;

    /// Creates a stream positioned at the start of `bytes`.
    fn new(bytes: bytes::Bytes) -> Self {
        Self { offset: 0, bytes }
    }
}

#[cfg(feature = "indicatif")]
impl Stream for ChunkedBytesStream {
    type Item = Result<bytes::Bytes, std::io::Error>;

    /// Yields the next zero-copy slice, or `None` once the buffer is exhausted.
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let total = self.bytes.len();
        let start = self.offset;
        if start >= total {
            return Poll::Ready(None);
        }
        let end = total.min(start.saturating_add(Self::CHUNK_SIZE));
        self.offset = end;
        Poll::Ready(Some(Ok(self.bytes.slice(start..end))))
    }
}
1250
1251fn is_redirect(status: StatusCode) -> bool {
1252 matches!(
1253 status,
1254 StatusCode::MOVED_PERMANENTLY
1255 | StatusCode::FOUND
1256 | StatusCode::SEE_OTHER
1257 | StatusCode::TEMPORARY_REDIRECT
1258 | StatusCode::PERMANENT_REDIRECT
1259 )
1260}
1261
1262fn is_retryable_wait_error(error: &InternetArchiveError) -> bool {
1263 match error {
1264 InternetArchiveError::ItemNotFound { .. } => true,
1265 InternetArchiveError::Http { status, .. } if status.is_server_error() => true,
1266 _ => false,
1267 }
1268}
1269
1270fn decode_search_response(bytes: &[u8]) -> Result<SearchResponse, InternetArchiveError> {
1271 let value: Value = serde_json::from_slice(bytes)?;
1272
1273 if value.get("response").is_some() {
1274 return Ok(serde_json::from_value(value)?);
1275 }
1276
1277 let message = value
1278 .get("error")
1279 .and_then(Value::as_str)
1280 .or_else(|| value.get("message").and_then(Value::as_str))
1281 .or_else(|| value.get("title").and_then(Value::as_str))
1282 .map_or_else(
1283 || {
1284 String::from_utf8_lossy(bytes)
1285 .trim()
1286 .chars()
1287 .take(512)
1288 .collect()
1289 },
1290 str::to_owned,
1291 );
1292
1293 Err(InternetArchiveError::InvalidState(format!(
1294 "unexpected search response: {message}"
1295 )))
1296}
1297
1298fn decode_task_submission(bytes: &[u8]) -> Result<TaskSubmission, InternetArchiveError> {
1299 let value: Value = serde_json::from_slice(bytes)?;
1300
1301 let success = value
1302 .get("success")
1303 .and_then(Value::as_bool)
1304 .unwrap_or(false);
1305
1306 if success {
1307 if let Some(inner) = value.get("value").cloned() {
1308 return Ok(serde_json::from_value(inner)?);
1309 }
1310 }
1311
1312 let message = value
1313 .get("error")
1314 .and_then(Value::as_str)
1315 .or_else(|| value.get("message").and_then(Value::as_str))
1316 .map_or_else(
1317 || {
1318 String::from_utf8_lossy(bytes)
1319 .trim()
1320 .chars()
1321 .take(512)
1322 .collect()
1323 },
1324 str::to_owned,
1325 );
1326
1327 Err(InternetArchiveError::InvalidState(format!(
1328 "unexpected task submission response: {message}"
1329 )))
1330}
1331
1332#[cfg(test)]
1333mod tests {
1334 #[cfg(feature = "indicatif")]
1335 use std::pin::Pin;
1336 use std::sync::OnceLock;
1337 #[cfg(feature = "indicatif")]
1338 use std::task::{Context, Poll, Waker};
1339 use std::time::Duration;
1340
1341 use axum::extract::State;
1342 use axum::http::{HeaderMap, HeaderValue, StatusCode, Uri};
1343 use axum::routing::{get, put};
1344 use axum::{Json, Router};
1345 #[cfg(feature = "indicatif")]
1346 use bytes::Bytes;
1347 #[cfg(feature = "indicatif")]
1348 use futures_core::Stream;
1349 #[cfg(feature = "indicatif")]
1350 use indicatif::ProgressBar;
1351 use serde_json::{json, Value};
1352 use tokio::net::TcpListener;
1353 use tokio::sync::Mutex;
1354 use url::Url;
1355
1356 use super::{Auth, InternetArchiveClient};
1357 #[cfg(feature = "indicatif")]
1358 use super::{ChunkedBytesStream, ProgressStream, ReplayableBody};
1359 use crate::error::InternetArchiveError;
1360 use crate::metadata::{ItemMetadata, MetadataChange, MetadataTarget, PatchOperation};
1361 use crate::search::{SearchQuery, SortDirection};
1362 use crate::upload::{DeleteOptions, UploadOptions, UploadSpec};
1363 use crate::{Endpoint, IdentifierError, ItemIdentifier, PollOptions};
1364 use reqwest::header::LOCATION;
1365
    /// Shared state for the mock archive.org servers used by these tests.
    ///
    /// Handlers record what they observed, so the test body can assert on it
    /// after the client call under test has completed.
    #[derive(Default)]
    struct StateData {
        /// `Authorization` header values seen by upload (PUT) handlers.
        /// The trap handler records an empty string when the header is absent.
        seen_upload_auth: Mutex<Vec<String>>,
        /// `Authorization` header values seen by delete handlers.
        seen_delete_auth: Mutex<Vec<String>>,
        /// Raw request bodies received by the metadata-write handler.
        captured_mdapi_body: Mutex<Vec<String>>,
        /// Number of metadata GET requests served so far. Lets a handler change
        /// its response across successive reads.
        #[cfg(feature = "indicatif")]
        metadata_reads: Mutex<u8>,
    }
1374
1375 fn test_client(addr: std::net::SocketAddr) -> InternetArchiveClient {
1376 InternetArchiveClient::builder()
1377 .auth(Auth::new("access", "secret"))
1378 .endpoint(Endpoint::custom(
1379 Url::parse(&format!("http://{addr}/")).unwrap(),
1380 Url::parse(&format!("http://{addr}/s3/")).unwrap(),
1381 ))
1382 .poll_options(PollOptions {
1383 max_wait: Duration::from_millis(50),
1384 initial_delay: Duration::from_millis(5),
1385 max_delay: Duration::from_millis(10),
1386 })
1387 .build()
1388 .unwrap()
1389 }
1390
1391 fn unauthenticated_test_client(addr: std::net::SocketAddr) -> InternetArchiveClient {
1392 InternetArchiveClient::builder()
1393 .endpoint(Endpoint::custom(
1394 Url::parse(&format!("http://{addr}/")).unwrap(),
1395 Url::parse(&format!("http://{addr}/s3/")).unwrap(),
1396 ))
1397 .poll_options(PollOptions {
1398 max_wait: Duration::from_millis(50),
1399 initial_delay: Duration::from_millis(5),
1400 max_delay: Duration::from_millis(10),
1401 })
1402 .build()
1403 .unwrap()
1404 }
1405
    /// End-to-end happy path against a single mock server.
    ///
    /// Covers item read, advanced search, a metadata patch write, an S3 upload
    /// and an S3 delete (both redirected to another path on the same host), and
    /// a download.
    #[tokio::test]
    async fn search_get_item_metadata_write_download_and_redirected_s3_calls_work() {
        // GET /metadata/demo-item: a minimal item document.
        async fn metadata() -> Json<Value> {
            Json(json!({
                "created": 1,
                "files": [{"name": "demo.txt", "size": "5"}],
                "metadata": {
                    "identifier": "demo-item",
                    "title": "Demo item",
                    "collection": ["opensource"]
                }
            }))
        }

        // GET /advancedsearch.php: a well-formed search envelope with one doc.
        async fn advanced_search() -> Json<Value> {
            Json(json!({
                "responseHeader": {
                    "status": 0,
                    "QTime": 1,
                    "params": {"query": "identifier:demo-item"}
                },
                "response": {
                    "numFound": 1,
                    "start": 0,
                    "docs": [{"identifier": "demo-item", "title": "Demo item"}]
                }
            }))
        }

        // POST /metadata/demo-item: records the body and requires the LOW auth header.
        async fn metadata_write(
            State(state): State<std::sync::Arc<StateData>>,
            headers: HeaderMap,
            body: String,
        ) -> (StatusCode, Json<Value>) {
            state.captured_mdapi_body.lock().await.push(body);
            assert_eq!(headers.get("authorization").unwrap(), "LOW access:secret");
            (
                StatusCode::OK,
                Json(json!({
                    "success": true,
                    "task_id": 42,
                    "log": "https://catalogd.archive.org/log/42"
                })),
            )
        }

        async fn download() -> &'static str {
            "hello"
        }

        // First S3 PUT answers 307 with a relative (same-host) Location.
        async fn first_upload() -> (StatusCode, HeaderMap) {
            let mut headers = HeaderMap::new();
            headers.insert(
                LOCATION,
                HeaderValue::from_static("/s3-redirected/demo-item/demo.txt"),
            );
            (StatusCode::TEMPORARY_REDIRECT, headers)
        }

        // Redirect target: records the forwarded auth header and checks the
        // replayed body.
        async fn redirected_upload(
            State(state): State<std::sync::Arc<StateData>>,
            headers: HeaderMap,
            body: String,
        ) -> StatusCode {
            state.seen_upload_auth.lock().await.push(
                headers
                    .get("authorization")
                    .unwrap()
                    .to_str()
                    .unwrap()
                    .to_owned(),
            );
            assert_eq!(body, "hello");
            StatusCode::OK
        }

        // First S3 DELETE also answers 307 with a same-host Location.
        async fn first_delete() -> (StatusCode, HeaderMap) {
            let mut headers = HeaderMap::new();
            headers.insert(
                LOCATION,
                HeaderValue::from_static("/s3-redirected/demo-item/demo.txt"),
            );
            (StatusCode::TEMPORARY_REDIRECT, headers)
        }

        // Redirect target for DELETE: records the forwarded auth header.
        async fn redirected_delete(
            State(state): State<std::sync::Arc<StateData>>,
            headers: HeaderMap,
        ) -> StatusCode {
            state.seen_delete_auth.lock().await.push(
                headers
                    .get("authorization")
                    .unwrap()
                    .to_str()
                    .unwrap()
                    .to_owned(),
            );
            StatusCode::NO_CONTENT
        }

        let state = std::sync::Arc::new(StateData::default());
        let app = Router::new()
            .route("/metadata/demo-item", get(metadata).post(metadata_write))
            .route("/advancedsearch.php", get(advanced_search))
            .route("/download/demo-item/demo.txt", get(download))
            .route(
                "/s3/demo-item/demo.txt",
                put(first_upload).delete(first_delete),
            )
            .route(
                "/s3-redirected/demo-item/demo.txt",
                put(redirected_upload).delete(redirected_delete),
            )
            .with_state(state.clone());

        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let server = tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });
        let client = test_client(addr);
        let identifier = ItemIdentifier::new("demo-item").unwrap();

        let item = client.get_item(&identifier).await.unwrap();
        assert_eq!(item.metadata.title(), Some("Demo item"));

        let search = client
            .search(
                &SearchQuery::builder("identifier:demo-item")
                    .field("identifier")
                    .field("title")
                    .sort("publicdate", SortDirection::Desc)
                    .build(),
            )
            .await
            .unwrap();
        assert_eq!(
            search.response.docs[0].identifier().unwrap().as_str(),
            "demo-item"
        );

        let write = client
            .apply_metadata_patch(
                &identifier,
                MetadataTarget::Metadata,
                &[PatchOperation::replace("/title", "Updated title")],
            )
            .await
            .unwrap();
        assert_eq!(write.task_id, Some(crate::TaskId(42)));

        let spec = UploadSpec::from_bytes("demo.txt", b"hello".to_vec());
        client
            .upload_file(&identifier, &spec, &UploadOptions::default())
            .await
            .unwrap();
        client
            .delete_file(&identifier, "demo.txt", &DeleteOptions::default())
            .await
            .unwrap();
        assert_eq!(
            client
                .download_bytes(&identifier, "demo.txt")
                .await
                .unwrap(),
            "hello"
        );

        // Same-host redirects must keep the credentials on the follow-up request.
        assert_eq!(state.seen_upload_auth.lock().await[0], "LOW access:secret");
        assert_eq!(state.seen_delete_auth.lock().await[0], "LOW access:secret");
        // The patch write is sent with the `-target=metadata` form field.
        assert!(state.captured_mdapi_body.lock().await[0].contains("-target=metadata"));

        server.abort();
    }
1578
1579 #[tokio::test]
1580 async fn search_reports_non_search_json_payloads_as_invalid_state() {
1581 async fn advanced_search_error() -> Json<Value> {
1582 Json(json!({
1583 "message": "search backend warming up"
1584 }))
1585 }
1586
1587 let app = Router::new().route("/advancedsearch.php", get(advanced_search_error));
1588 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1589 let addr = listener.local_addr().unwrap();
1590 let server = tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });
1591 let client = unauthenticated_test_client(addr);
1592
1593 let error = client
1594 .search(&SearchQuery::builder("identifier:demo-item").build())
1595 .await
1596 .unwrap_err();
1597
1598 assert!(
1599 matches!(error, InternetArchiveError::InvalidState(message) if message.contains("search backend warming up"))
1600 );
1601
1602 server.abort();
1603 }
1604
1605 #[tokio::test]
1606 async fn search_uses_trimmed_raw_json_when_no_message_fields_are_present() {
1607 async fn advanced_search_error() -> Json<Value> {
1608 Json(json!({
1609 "foo": "bar"
1610 }))
1611 }
1612
1613 let app = Router::new().route("/advancedsearch.php", get(advanced_search_error));
1614 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1615 let addr = listener.local_addr().unwrap();
1616 let server = tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });
1617 let client = unauthenticated_test_client(addr);
1618
1619 let error = client
1620 .search(&SearchQuery::builder("identifier:demo-item").build())
1621 .await
1622 .unwrap_err();
1623
1624 assert!(
1625 matches!(error, InternetArchiveError::InvalidState(message) if message.contains("\"foo\":\"bar\""))
1626 );
1627
1628 server.abort();
1629 }
1630
    /// An S3 redirect that points at a different host must be rejected.
    ///
    /// The client must not re-send the `Authorization` header (or the request
    /// at all) to the foreign server.
    #[tokio::test]
    async fn s3_redirects_do_not_forward_credentials_to_foreign_hosts() {
        // Origin handler: redirects to the trap server, whose URL is only known
        // after it binds, so it is read from `TRAP_BASE_URL` at request time.
        async fn initial_upload_redirect() -> (StatusCode, HeaderMap) {
            let trap = TRAP_BASE_URL.get().expect("trap base url");
            let mut headers = HeaderMap::new();
            headers.insert(
                LOCATION,
                HeaderValue::from_str(&format!("{trap}stolen/demo-item/demo.txt")).unwrap(),
            );
            (StatusCode::TEMPORARY_REDIRECT, headers)
        }

        // Trap handler: records every request it receives, including ones
        // without an auth header (recorded as ""). Any entry means the request leaked.
        async fn trap_handler(
            State(state): State<std::sync::Arc<StateData>>,
            headers: HeaderMap,
        ) -> StatusCode {
            state.seen_upload_auth.lock().await.push(
                headers
                    .get("authorization")
                    .and_then(|value| value.to_str().ok())
                    .unwrap_or("")
                    .to_owned(),
            );
            StatusCode::OK
        }

        static TRAP_BASE_URL: OnceLock<String> = OnceLock::new();

        let trap_state = std::sync::Arc::new(StateData::default());
        let trap_app = Router::new()
            .route("/stolen/demo-item/demo.txt", put(trap_handler))
            .with_state(trap_state.clone());
        let trap_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let trap_addr = trap_listener.local_addr().unwrap();
        let trap_server =
            tokio::spawn(async move { axum::serve(trap_listener, trap_app).await.unwrap() });
        // Must be set before the client issues the upload below.
        TRAP_BASE_URL
            .set(format!("http://{trap_addr}/"))
            .expect("set trap base url once");

        let origin_state = std::sync::Arc::new(StateData::default());
        let origin_app = Router::new()
            .route("/s3/demo-item/demo.txt", put(initial_upload_redirect))
            .with_state(origin_state);
        let origin_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let origin_addr = origin_listener.local_addr().unwrap();
        let origin_server =
            tokio::spawn(async move { axum::serve(origin_listener, origin_app).await.unwrap() });

        let client = test_client(origin_addr);
        let error = client
            .upload_file(
                &ItemIdentifier::new("demo-item").unwrap(),
                &UploadSpec::from_bytes("demo.txt", b"hello"),
                &UploadOptions::default(),
            )
            .await
            .unwrap_err();

        assert!(matches!(
            error,
            InternetArchiveError::InvalidState(message)
                if message.contains("redirected S3 host")
        ));
        // The trap must not have been contacted at all.
        assert!(trap_state.seen_upload_auth.lock().await.is_empty());

        origin_server.abort();
        trap_server.abort();
    }
1700
1701 #[test]
1702 fn auth_debug_is_redacted_and_env_helpers_work() {
1703 let auth = Auth::new("access", "secret");
1704 assert!(format!("{auth:?}").contains("<redacted>"));
1705 }
1706
1707 #[test]
1708 fn decode_task_submission_falls_back_to_body_excerpt_when_envelope_has_no_error_message() {
1709 let error = super::decode_task_submission(b"{\"success\":false}").unwrap_err();
1710 match error {
1711 InternetArchiveError::InvalidState(message) => {
1712 assert!(
1713 message.contains("{\"success\":false}"),
1714 "expected body excerpt in fallback message, got: {message}"
1715 );
1716 }
1717 other => panic!("unexpected error variant: {other:?}"),
1718 }
1719 }
1720
1721 #[tokio::test]
1722 async fn update_item_metadata_returns_synthetic_success_for_noop_diff() {
1723 async fn metadata() -> Json<Value> {
1724 Json(json!({
1725 "files": [],
1726 "metadata": {
1727 "identifier": "demo-item",
1728 "title": "Demo item",
1729 "collection": ["opensource"]
1730 }
1731 }))
1732 }
1733
1734 let app = Router::new().route("/metadata/demo-item", get(metadata));
1735 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1736 let addr = listener.local_addr().unwrap();
1737 let server = tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });
1738
1739 let client = InternetArchiveClient::builder()
1740 .endpoint(Endpoint::custom(
1741 Url::parse(&format!("http://{addr}/")).unwrap(),
1742 Url::parse(&format!("http://{addr}/s3/")).unwrap(),
1743 ))
1744 .build()
1745 .unwrap();
1746
1747 let response = client
1748 .update_item_metadata(
1749 &ItemIdentifier::new("demo-item").unwrap(),
1750 &ItemMetadata::builder()
1751 .title("Demo item")
1752 .collection("opensource")
1753 .build(),
1754 )
1755 .await
1756 .unwrap();
1757 assert!(response.success);
1758 assert_eq!(response.task_id, None);
1759
1760 server.abort();
1761 }
1762
1763 #[test]
1764 fn builder_accessors_env_helpers_and_wait_until_paths_work() {
1765 static ENV_LOCK: OnceLock<std::sync::Mutex<()>> = OnceLock::new();
1766
1767 let _guard = ENV_LOCK
1768 .get_or_init(|| std::sync::Mutex::new(()))
1769 .lock()
1770 .unwrap();
1771
1772 let custom_access = "IA_TEST_ACCESS_KEY";
1773 let custom_secret = "IA_TEST_SECRET_KEY";
1774 std::env::set_var(custom_access, "custom-access");
1775 std::env::set_var(custom_secret, "custom-secret");
1776 std::env::set_var(Auth::ACCESS_KEY_ENV_VAR, "default-access");
1777 std::env::set_var(Auth::SECRET_KEY_ENV_VAR, "default-secret");
1778
1779 let auth = Auth::from_env_vars(custom_access, custom_secret).unwrap();
1780 assert_eq!(
1781 auth.authorization_header(),
1782 "LOW custom-access:custom-secret"
1783 );
1784 assert_eq!(
1785 Auth::from_env().unwrap().authorization_header(),
1786 "LOW default-access:default-secret"
1787 );
1788 assert!(matches!(
1789 Auth::from_env_vars("MISSING_ACCESS", custom_secret).unwrap_err(),
1790 InternetArchiveError::EnvVar { name, .. } if name == "MISSING_ACCESS"
1791 ));
1792
1793 let poll = PollOptions {
1794 max_wait: Duration::from_secs(1),
1795 initial_delay: Duration::from_millis(5),
1796 max_delay: Duration::from_millis(10),
1797 };
1798 let endpoint = Endpoint::custom(
1799 Url::parse("http://localhost:3000/archive").unwrap(),
1800 Url::parse("http://localhost:3000/s3").unwrap(),
1801 );
1802 let client = InternetArchiveClient::builder()
1803 .auth(auth.clone())
1804 .endpoint(endpoint.clone())
1805 .user_agent("internetarchive-rs-tests")
1806 .request_timeout(Duration::from_secs(5))
1807 .connect_timeout(Duration::from_secs(1))
1808 .poll_options(poll.clone())
1809 .build()
1810 .unwrap();
1811
1812 assert!(client.has_auth());
1813 assert_eq!(client.endpoint(), &endpoint);
1814 assert_eq!(client.poll_options(), &poll);
1815 assert_eq!(client.request_timeout(), Some(Duration::from_secs(5)));
1816 assert_eq!(client.connect_timeout(), Some(Duration::from_secs(1)));
1817 assert!(!InternetArchiveClient::new().unwrap().has_auth());
1818 assert!(InternetArchiveClient::with_auth(auth).unwrap().has_auth());
1819 assert!(InternetArchiveClient::from_env().unwrap().has_auth());
1820
1821 let runtime = tokio::runtime::Runtime::new().unwrap();
1822 let mut attempts = 0_u8;
1823 runtime.block_on(async {
1824 let value = client
1825 .wait_until("demo wait", || {
1826 attempts += 1;
1827 async move {
1828 if attempts < 3 {
1829 Err(InternetArchiveError::ItemNotFound {
1830 identifier: "demo-item".to_owned(),
1831 })
1832 } else {
1833 Ok("ready")
1834 }
1835 }
1836 })
1837 .await
1838 .unwrap();
1839 assert_eq!(value, "ready");
1840
1841 let mut transient_attempts = 0_u8;
1842 let recovered = client
1843 .wait_until("demo transient", || {
1844 transient_attempts += 1;
1845 async move {
1846 if transient_attempts < 3 {
1847 Err(InternetArchiveError::Http {
1848 status: StatusCode::BAD_GATEWAY,
1849 code: None,
1850 message: Some("temporary outage".to_owned()),
1851 raw_body: None,
1852 })
1853 } else {
1854 Ok("recovered")
1855 }
1856 }
1857 })
1858 .await
1859 .unwrap();
1860 assert_eq!(recovered, "recovered");
1861
1862 let error = client
1863 .wait_until("demo error", || async {
1864 Err::<(), _>(InternetArchiveError::InvalidState("boom".to_owned()))
1865 })
1866 .await
1867 .unwrap_err();
1868 assert!(
1869 matches!(error, InternetArchiveError::InvalidState(message) if message == "boom")
1870 );
1871
1872 let timeout = client
1873 .wait_until("demo timeout", || async {
1874 Err::<(), _>(InternetArchiveError::ItemNotFound {
1875 identifier: "demo-item".to_owned(),
1876 })
1877 })
1878 .await
1879 .unwrap_err();
1880 assert!(matches!(
1881 timeout,
1882 InternetArchiveError::Timeout("demo timeout")
1883 ));
1884 });
1885
1886 std::env::remove_var(custom_access);
1887 std::env::remove_var(custom_secret);
1888 std::env::remove_var(Auth::ACCESS_KEY_ENV_VAR);
1889 std::env::remove_var(Auth::SECRET_KEY_ENV_VAR);
1890 }
1891
    /// Exercises the error surface of the client against a mock server.
    ///
    /// Checked paths:
    /// - identifier validation;
    /// - `MissingAuth` for every credentialed operation on an unauthenticated client;
    /// - `ItemNotFound` for blank, error-shaped or mismatched metadata documents;
    /// - HTTP status propagation for search, download and metadata write;
    /// - S3 redirects missing a `Location` header, and a failing S3 upload.
    #[tokio::test]
    async fn missing_auth_and_http_error_paths_are_reported() {
        // Valid item whose title differs from the one the update below requests,
        // so the metadata diff is not empty.
        async fn metadata() -> Json<Value> {
            Json(json!({
                "files": [],
                "metadata": {
                    "identifier": "demo-item",
                    "title": "Old title"
                }
            }))
        }

        // Whitespace-only body.
        async fn blank_metadata() -> &'static str {
            " "
        }

        // Error envelope instead of an item document.
        async fn non_item_metadata() -> Json<Value> {
            Json(json!({
                "error": "identifier not found",
                "success": false
            }))
        }

        // Item document whose identifier does not match the requested one.
        async fn mismatched_metadata() -> Json<Value> {
            Json(json!({
                "files": [],
                "metadata": {
                    "identifier": "other-item",
                    "title": "Wrong item"
                }
            }))
        }

        async fn search_error() -> (StatusCode, Json<Value>) {
            (
                StatusCode::BAD_GATEWAY,
                Json(json!({"error":"search failed","code":"bad_gateway"})),
            )
        }

        async fn metadata_error() -> (StatusCode, Json<Value>) {
            (
                StatusCode::BAD_REQUEST,
                Json(json!({"error":"metadata failed","code":"bad_request"})),
            )
        }

        async fn download_error() -> (StatusCode, &'static str) {
            (StatusCode::BAD_GATEWAY, "download failed")
        }

        // A 307 with no `Location` header.
        async fn missing_location() -> StatusCode {
            StatusCode::TEMPORARY_REDIRECT
        }

        async fn failing_upload() -> (StatusCode, &'static str) {
            (StatusCode::INTERNAL_SERVER_ERROR, "upload failed")
        }

        let app = Router::new()
            .route("/metadata/demo-item", get(metadata).post(metadata_error))
            .route("/metadata/blank-item", get(blank_metadata))
            .route("/metadata/non-item", get(non_item_metadata))
            .route("/metadata/mismatched-item", get(mismatched_metadata))
            .route("/advancedsearch.php", get(search_error))
            .route("/download/demo-item/missing.txt", get(download_error))
            .route("/s3/demo-item/missing-location.bin", put(missing_location))
            .route("/s3/demo-item/failing.bin", put(failing_upload));
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let server = tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });

        let unauth = unauthenticated_test_client(addr);
        let auth = test_client(addr);
        let identifier = ItemIdentifier::new("demo-item").unwrap();

        // --- Unauthenticated client ---
        assert!(matches!(
            unauth.get_item_by_str("bad item").await.unwrap_err(),
            InternetArchiveError::Identifier(_)
        ));
        assert!(matches!(
            unauth.check_upload_limit(&identifier).await.unwrap_err(),
            InternetArchiveError::MissingAuth
        ));
        assert!(matches!(
            unauth
                .apply_metadata_patch(
                    &identifier,
                    MetadataTarget::Metadata,
                    &[PatchOperation::replace("/title", "New title")],
                )
                .await
                .unwrap_err(),
            InternetArchiveError::MissingAuth
        ));
        assert!(matches!(
            unauth
                .apply_metadata_changes(
                    &identifier,
                    &[MetadataChange {
                        target: "metadata".to_owned(),
                        patch: vec![PatchOperation::add("/subject/-", "rust")],
                    }],
                )
                .await
                .unwrap_err(),
            InternetArchiveError::MissingAuth
        ));
        assert!(matches!(
            unauth
                .upload_file(
                    &identifier,
                    &UploadSpec::from_bytes("demo.txt", b"hello"),
                    &UploadOptions::default(),
                )
                .await
                .unwrap_err(),
            InternetArchiveError::MissingAuth
        ));
        assert!(matches!(
            unauth
                .create_item(
                    &identifier,
                    &ItemMetadata::builder().title("Demo").build(),
                    &UploadSpec::from_bytes("demo.txt", b"hello"),
                    &UploadOptions::default(),
                )
                .await
                .unwrap_err(),
            InternetArchiveError::MissingAuth
        ));
        assert!(matches!(
            unauth
                .delete_file(&identifier, "demo.txt", &DeleteOptions::default())
                .await
                .unwrap_err(),
            InternetArchiveError::MissingAuth
        ));
        assert!(matches!(
            unauth
                .update_item_metadata(
                    &identifier,
                    &ItemMetadata::builder().title("New title").build(),
                )
                .await
                .unwrap_err(),
            InternetArchiveError::MissingAuth
        ));
        assert!(matches!(
            unauth.make_dark(&identifier, "cleanup").await.unwrap_err(),
            InternetArchiveError::MissingAuth
        ));

        // --- Authenticated client: malformed item documents ---
        assert!(matches!(
            auth.get_item(&ItemIdentifier::new("blank-item").unwrap())
                .await
                .unwrap_err(),
            InternetArchiveError::ItemNotFound { .. }
        ));
        assert!(matches!(
            auth.get_item(&ItemIdentifier::new("non-item").unwrap())
                .await
                .unwrap_err(),
            InternetArchiveError::ItemNotFound { .. }
        ));
        assert!(matches!(
            auth.get_item(&ItemIdentifier::new("mismatched-item").unwrap())
                .await
                .unwrap_err(),
            InternetArchiveError::ItemNotFound { .. }
        ));
        // --- Authenticated client: HTTP status propagation ---
        assert!(matches!(
            auth.search(&SearchQuery::identifier("demo-item"))
                .await
                .unwrap_err(),
            InternetArchiveError::Http { status, .. } if status == StatusCode::BAD_GATEWAY
        ));
        assert!(matches!(
            auth.download_bytes(&identifier, "missing.txt")
                .await
                .unwrap_err(),
            InternetArchiveError::Http { status, .. } if status == StatusCode::BAD_GATEWAY
        ));
        assert!(matches!(
            auth.apply_metadata_patch(
                &identifier,
                MetadataTarget::Metadata,
                &[PatchOperation::replace("/title", "New title")],
            )
            .await
            .unwrap_err(),
            InternetArchiveError::Http { status, .. } if status == StatusCode::BAD_REQUEST
        ));
        // --- Authenticated client: S3 failure modes ---
        assert!(matches!(
            auth.upload_file(
                &identifier,
                &UploadSpec::from_bytes("missing-location.bin", b"hello"),
                &UploadOptions::default(),
            )
            .await
            .unwrap_err(),
            InternetArchiveError::InvalidState(message) if message.contains("missing location")
        ));
        assert!(matches!(
            auth.upload_file(
                &identifier,
                &UploadSpec::from_bytes("failing.bin", b"hello"),
                &UploadOptions::default(),
            )
            .await
            .unwrap_err(),
            InternetArchiveError::Http { status, .. } if status == StatusCode::INTERNAL_SERVER_ERROR
        ));

        server.abort();
    }
2108
2109 #[tokio::test]
2110 async fn s3_create_helpers_reject_bucket_unsafe_identifiers_before_network_access() {
2111 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2112 let client = test_client(listener.local_addr().unwrap());
2113 let spec = UploadSpec::from_bytes("demo.txt", b"hello");
2114 let options = UploadOptions::default();
2115 let too_long =
2116 ItemIdentifier::new("a".repeat(ItemIdentifier::MAX_BUCKET_IDENTIFIER_LEN + 1)).unwrap();
2117 let uppercase = ItemIdentifier::new("Demo-item").unwrap();
2118
2119 assert!(matches!(
2120 client
2121 .create_item(
2122 &too_long,
2123 &ItemMetadata::builder().title("Demo").build(),
2124 &spec,
2125 &options,
2126 )
2127 .await
2128 .unwrap_err(),
2129 InternetArchiveError::Identifier(IdentifierError::TooLongForBucketCreation {
2130 max: ItemIdentifier::MAX_BUCKET_IDENTIFIER_LEN,
2131 ..
2132 })
2133 ));
2134
2135 assert!(matches!(
2136 client
2137 .create_item(
2138 &uppercase,
2139 &ItemMetadata::builder().title("Demo").build(),
2140 &spec,
2141 &options,
2142 )
2143 .await
2144 .unwrap_err(),
2145 InternetArchiveError::Identifier(IdentifierError::InvalidBucketCreationCharacter {
2146 character: 'D',
2147 ..
2148 })
2149 ));
2150 }
2151
2152 #[tokio::test]
2153 async fn existing_item_s3_helpers_and_limit_check_allow_documented_non_bucket_creation_identifiers(
2154 ) {
2155 async fn upload() -> StatusCode {
2156 StatusCode::OK
2157 }
2158
2159 async fn delete() -> StatusCode {
2160 StatusCode::NO_CONTENT
2161 }
2162
2163 async fn limit_check(uri: Uri) -> Json<Value> {
2164 let query = uri.query().unwrap();
2165 assert!(query.contains("check_limit=1"));
2166 assert!(query.contains("accesskey=access"));
2167 assert!(query.contains("bucket=Demo_Item"));
2168 Json(json!({
2169 "bucket": "Demo_Item",
2170 "accesskey": "access",
2171 "over_limit": 0,
2172 }))
2173 }
2174
2175 let app = Router::new()
2176 .route("/s3/", get(limit_check))
2177 .route("/s3/Demo_Item/demo.txt", put(upload).delete(delete));
2178 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2179 let addr = listener.local_addr().unwrap();
2180 let server = tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });
2181 let client = test_client(addr);
2182 let identifier = ItemIdentifier::new("Demo_Item").unwrap();
2183 let spec = UploadSpec::from_bytes("demo.txt", b"hello");
2184
2185 client
2186 .upload_file(&identifier, &spec, &UploadOptions::default())
2187 .await
2188 .unwrap();
2189 client
2190 .delete_file(&identifier, "demo.txt", &DeleteOptions::default())
2191 .await
2192 .unwrap();
2193 assert_eq!(
2194 client.check_upload_limit(&identifier).await.unwrap().bucket,
2195 "Demo_Item"
2196 );
2197
2198 server.abort();
2199 }
2200
    /// `create_item_with_progress` with a redirected upload and a metadata
    /// document that only reflects the `custom` field after a later read.
    ///
    /// The test asserts:
    /// - the progress bar ends full and finished;
    /// - exactly three metadata reads happen;
    /// - the metadata write carries the `custom` field.
    #[cfg(feature = "indicatif")]
    #[tokio::test]
    async fn create_item_with_progress_handles_redirects_and_metadata_remainders() {
        // The first two reads omit `custom`; the third and later include it.
        // Every read increments the shared counter.
        async fn metadata(State(state): State<std::sync::Arc<StateData>>) -> Json<Value> {
            let mut reads = state.metadata_reads.lock().await;
            let payload = if *reads < 2 {
                json!({
                    "files": [{"name": "demo.txt", "size": "5"}],
                    "metadata": {
                        "identifier": "demo-item",
                        "title": "Demo item"
                    }
                })
            } else {
                json!({
                    "files": [{"name": "demo.txt", "size": "5"}],
                    "metadata": {
                        "identifier": "demo-item",
                        "title": "Demo item",
                        "custom": {"nested": true}
                    }
                })
            };
            *reads += 1;
            Json(payload)
        }

        // Records the metadata-write body for the final assertion.
        async fn metadata_write(
            State(state): State<std::sync::Arc<StateData>>,
            body: String,
        ) -> (StatusCode, Json<Value>) {
            state.captured_mdapi_body.lock().await.push(body);
            (
                StatusCode::OK,
                Json(json!({
                    "success": true,
                    "task_id": 200,
                    "log": "https://catalogd.archive.org/log/200"
                })),
            )
        }

        // Same-host 307 so the progress-wrapped body must be sent a second time.
        async fn first_upload() -> (StatusCode, HeaderMap) {
            let mut headers = HeaderMap::new();
            headers.insert(
                LOCATION,
                HeaderValue::from_static("/s3-redirected/demo-item/demo.txt"),
            );
            (StatusCode::TEMPORARY_REDIRECT, headers)
        }

        async fn redirected_upload(body: String) -> StatusCode {
            assert_eq!(body, "hello");
            StatusCode::OK
        }

        let state = std::sync::Arc::new(StateData::default());
        let app = Router::new()
            .route("/metadata/demo-item", get(metadata).post(metadata_write))
            .route("/s3/demo-item/demo.txt", put(first_upload))
            .route("/s3-redirected/demo-item/demo.txt", put(redirected_upload))
            .with_state(state.clone());

        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let server = tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });
        let client = test_client(addr);
        let identifier = ItemIdentifier::new("demo-item").unwrap();
        let progress = ProgressBar::hidden();

        client
            .create_item_with_progress(
                &identifier,
                &ItemMetadata::builder()
                    .title("Demo item")
                    .extra_json("custom", json!({"nested": true}))
                    .build(),
                &UploadSpec::from_bytes("demo.txt", b"hello"),
                &UploadOptions::default(),
                &progress,
            )
            .await
            .unwrap();

        // Progress reflects the 5-byte body once, even across the redirect.
        assert_eq!(progress.length(), Some(5));
        assert_eq!(progress.position(), 5);
        assert!(progress.is_finished());
        assert_eq!(*state.metadata_reads.lock().await, 3);
        assert!(state.captured_mdapi_body.lock().await[0].contains("custom"));

        server.abort();
    }
2293
    /// The progress-reporting upload path reports the same failures as the
    /// plain path:
    /// - a redirect without `Location`;
    /// - a redirect to a foreign host;
    /// - an HTTP 500 from S3.
    #[cfg(feature = "indicatif")]
    #[tokio::test]
    async fn progress_upload_reports_missing_location_foreign_redirect_and_http_errors() {
        // 307 with no `Location` header.
        async fn missing_location() -> StatusCode {
            StatusCode::TEMPORARY_REDIRECT
        }

        // 307 pointing at a different host. The client must refuse to follow it,
        // so example.com is never actually contacted.
        async fn foreign_redirect() -> (StatusCode, HeaderMap) {
            let mut headers = HeaderMap::new();
            headers.insert(
                LOCATION,
                HeaderValue::from_static("http://example.com/stolen/demo-item/foreign.bin"),
            );
            (StatusCode::TEMPORARY_REDIRECT, headers)
        }

        async fn failing_upload() -> (StatusCode, &'static str) {
            (StatusCode::INTERNAL_SERVER_ERROR, "upload failed")
        }

        let app = Router::new()
            .route("/s3/demo-item/missing-location.bin", put(missing_location))
            .route("/s3/demo-item/foreign.bin", put(foreign_redirect))
            .route("/s3/demo-item/failing.bin", put(failing_upload));
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let server = tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });
        let client = test_client(addr);
        let identifier = ItemIdentifier::new("demo-item").unwrap();

        // Each case uses its own hidden progress bar so state cannot bleed across.
        let missing_progress = ProgressBar::hidden();
        assert!(matches!(
            client
                .upload_file_with_progress(
                    &identifier,
                    &UploadSpec::from_bytes("missing-location.bin", b"hello"),
                    &UploadOptions::default(),
                    &missing_progress,
                )
                .await
                .unwrap_err(),
            InternetArchiveError::InvalidState(message) if message.contains("missing location")
        ));

        let foreign_progress = ProgressBar::hidden();
        assert!(matches!(
            client
                .upload_file_with_progress(
                    &identifier,
                    &UploadSpec::from_bytes("foreign.bin", b"hello"),
                    &UploadOptions::default(),
                    &foreign_progress,
                )
                .await
                .unwrap_err(),
            InternetArchiveError::InvalidState(message)
                if message.contains("redirected S3 host")
        ));

        let failing_progress = ProgressBar::hidden();
        assert!(matches!(
            client
                .upload_file_with_progress(
                    &identifier,
                    &UploadSpec::from_bytes("failing.bin", b"hello"),
                    &UploadOptions::default(),
                    &failing_progress,
                )
                .await
                .unwrap_err(),
            InternetArchiveError::Http { status, .. } if status == StatusCode::INTERNAL_SERVER_ERROR
        ));

        server.abort();
    }
2369
2370 #[cfg(feature = "indicatif")]
2371 #[tokio::test]
2372 async fn redirect_edge_cases_are_reported_for_plain_and_progress_uploads() {
2373 async fn endless_redirect() -> (StatusCode, HeaderMap) {
2374 let mut headers = HeaderMap::new();
2375 headers.insert(LOCATION, HeaderValue::from_static("/s3/demo-item/spin.bin"));
2376 (StatusCode::TEMPORARY_REDIRECT, headers)
2377 }
2378
2379 async fn invalid_location() -> (StatusCode, HeaderMap) {
2380 let mut headers = HeaderMap::new();
2381 headers.insert(
2382 LOCATION,
2383 HeaderValue::from_bytes(b"/s3/demo-item/\xff.bin").unwrap(),
2384 );
2385 (StatusCode::TEMPORARY_REDIRECT, headers)
2386 }
2387
2388 let app = Router::new()
2389 .route("/s3/demo-item/spin.bin", put(endless_redirect))
2390 .route("/s3/demo-item/bad-location.bin", put(invalid_location));
2391 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2392 let addr = listener.local_addr().unwrap();
2393 let server = tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });
2394 let client = test_client(addr);
2395 let identifier = ItemIdentifier::new("demo-item").unwrap();
2396
2397 assert!(matches!(
2398 client
2399 .upload_file(
2400 &identifier,
2401 &UploadSpec::from_bytes("spin.bin", b"hello"),
2402 &UploadOptions::default(),
2403 )
2404 .await
2405 .unwrap_err(),
2406 InternetArchiveError::InvalidState(message) if message.contains("too many redirects")
2407 ));
2408 assert!(matches!(
2409 client
2410 .upload_file(
2411 &identifier,
2412 &UploadSpec::from_bytes("bad-location.bin", b"hello"),
2413 &UploadOptions::default(),
2414 )
2415 .await
2416 .unwrap_err(),
2417 InternetArchiveError::InvalidState(message)
2418 if message.contains("not valid UTF-8")
2419 ));
2420
2421 let spin_progress = ProgressBar::hidden();
2422 assert!(matches!(
2423 client
2424 .upload_file_with_progress(
2425 &identifier,
2426 &UploadSpec::from_bytes("spin.bin", b"hello"),
2427 &UploadOptions::default(),
2428 &spin_progress,
2429 )
2430 .await
2431 .unwrap_err(),
2432 InternetArchiveError::InvalidState(message) if message.contains("too many redirects")
2433 ));
2434
2435 let bad_progress = ProgressBar::hidden();
2436 assert!(matches!(
2437 client
2438 .upload_file_with_progress(
2439 &identifier,
2440 &UploadSpec::from_bytes("bad-location.bin", b"hello"),
2441 &UploadOptions::default(),
2442 &bad_progress,
2443 )
2444 .await
2445 .unwrap_err(),
2446 InternetArchiveError::InvalidState(message)
2447 if message.contains("not valid UTF-8")
2448 ));
2449
2450 server.abort();
2451 }
2452
2453 #[cfg(feature = "indicatif")]
2454 #[tokio::test]
2455 async fn download_bytes_with_progress_reports_http_errors() {
2456 async fn download_error() -> (StatusCode, &'static str) {
2457 (StatusCode::BAD_GATEWAY, "download failed")
2458 }
2459
2460 let app = Router::new().route("/download/demo-item/missing.txt", get(download_error));
2461 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2462 let addr = listener.local_addr().unwrap();
2463 let server = tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });
2464 let client = test_client(addr);
2465 let progress = ProgressBar::hidden();
2466
2467 assert!(matches!(
2468 client
2469 .download_bytes_with_progress(&ItemIdentifier::new("demo-item").unwrap(), "missing.txt", &progress)
2470 .await
2471 .unwrap_err(),
2472 InternetArchiveError::Http { status, .. } if status == StatusCode::BAD_GATEWAY
2473 ));
2474
2475 server.abort();
2476 }
2477
2478 #[cfg(feature = "indicatif")]
2479 #[tokio::test]
2480 async fn replayable_body_apply_with_progress_sets_lengths_for_paths_and_bytes() {
2481 let client = reqwest::Client::new();
2482
2483 let bytes_progress = ProgressBar::hidden();
2484 let _bytes_request = ReplayableBody::Bytes(b"hello".to_vec())
2485 .apply_with_progress(client.put("http://example.com/bytes"), &bytes_progress)
2486 .await
2487 .unwrap();
2488 assert_eq!(bytes_progress.length(), Some(5));
2489 assert_eq!(bytes_progress.position(), 0);
2490
2491 let directory = tempfile::tempdir().unwrap();
2492 let path = directory.path().join("demo.txt");
2493 tokio::fs::write(&path, b"hello").await.unwrap();
2494
2495 let path_progress = ProgressBar::hidden();
2496 let _path_request = ReplayableBody::Path { path, length: 5 }
2497 .apply_with_progress(client.put("http://example.com/path"), &path_progress)
2498 .await
2499 .unwrap();
2500 assert_eq!(path_progress.length(), Some(5));
2501 assert_eq!(path_progress.position(), 0);
2502 }
2503
2504 #[cfg(feature = "indicatif")]
2505 #[test]
2506 fn progress_stream_and_chunked_bytes_stream_cover_poll_paths() {
2507 let payload = Bytes::from_static(b"hello world");
2508 let progress = ProgressBar::hidden();
2509 let mut stream =
2510 ProgressStream::new(ChunkedBytesStream::new(payload.clone()), progress.clone());
2511 let waker = Waker::noop();
2512 let mut context = Context::from_waker(waker);
2513
2514 let first = Pin::new(&mut stream).poll_next(&mut context);
2515 assert!(matches!(first, Poll::Ready(Some(Ok(ref chunk))) if chunk == &payload));
2516 assert_eq!(progress.position(), payload.len() as u64);
2517
2518 let second = Pin::new(&mut stream).poll_next(&mut context);
2519 assert!(matches!(second, Poll::Ready(None)));
2520 }
2521}