Skip to main content

internetarchive_rs/
client.rs

1//! Low-level typed Internet Archive client operations.
2
3use std::path::Path;
4#[cfg(feature = "indicatif")]
5use std::pin::Pin;
6#[cfg(feature = "indicatif")]
7use std::task::{Context, Poll};
8use std::time::Duration;
9
10#[cfg(feature = "indicatif")]
11use futures_core::Stream;
12#[cfg(feature = "indicatif")]
13use indicatif::ProgressBar;
14use reqwest::header::{
15    HeaderMap, HeaderName, HeaderValue, ACCEPT, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE,
16    LOCATION,
17};
18use reqwest::{Method, StatusCode};
19use secrecy::{ExposeSecret, SecretString};
20use serde_json::Value;
21use tokio::fs::File;
22use tokio_util::io::ReaderStream;
23use url::Url;
24
25use crate::downloads::ResolvedDownload;
26use crate::endpoint::Endpoint;
27use crate::error::{decode_metadata_write_failure, InternetArchiveError};
28use crate::ids::SecretPair;
29use crate::metadata::{
30    merge_metadata_semantically, metadata_contains_projection, HeaderEncoding, ItemMetadata,
31    MetadataChange, MetadataTarget, PatchOperation,
32};
33use crate::model::{Item, MetadataWriteResponse, S3LimitCheck, SearchResponse, TaskSubmission};
34use crate::poll::PollOptions;
35use crate::search::SearchQuery;
36use crate::upload::{DeleteOptions, UploadOptions, UploadSource, UploadSpec};
37use crate::ItemIdentifier;
38
39/// LOW-auth credentials for Internet Archive programmatic access.
40#[derive(Clone)]
41pub struct Auth {
42    pub(crate) secrets: SecretPair,
43}
44
45impl Auth {
46    /// Standard environment variable for the S3 access key.
47    pub const ACCESS_KEY_ENV_VAR: &'static str = "INTERNET_ARCHIVE_ACCESS_KEY";
48    /// Standard environment variable for the S3 secret key.
49    pub const SECRET_KEY_ENV_VAR: &'static str = "INTERNET_ARCHIVE_SECRET_KEY";
50
51    /// Creates a new auth pair.
52    #[must_use]
53    pub fn new(access_key: impl Into<String>, secret_key: impl Into<String>) -> Self {
54        Self {
55            secrets: SecretPair {
56                access_key: SecretString::from(access_key.into()),
57                secret_key: SecretString::from(secret_key.into()),
58            },
59        }
60    }
61
62    /// Reads credentials from the standard environment variables.
63    ///
64    /// # Errors
65    ///
66    /// Returns an error if either environment variable is missing.
67    pub fn from_env() -> Result<Self, InternetArchiveError> {
68        Self::from_env_vars(Self::ACCESS_KEY_ENV_VAR, Self::SECRET_KEY_ENV_VAR)
69    }
70
71    /// Reads credentials from custom environment variables.
72    ///
73    /// # Errors
74    ///
75    /// Returns an error if either environment variable is missing.
76    pub fn from_env_vars(
77        access_name: &str,
78        secret_name: &str,
79    ) -> Result<Self, InternetArchiveError> {
80        let access_key =
81            std::env::var(access_name).map_err(|source| InternetArchiveError::EnvVar {
82                name: access_name.to_owned(),
83                source,
84            })?;
85        let secret_key =
86            std::env::var(secret_name).map_err(|source| InternetArchiveError::EnvVar {
87                name: secret_name.to_owned(),
88                source,
89            })?;
90        Ok(Self::new(access_key, secret_key))
91    }
92
93    #[must_use]
94    pub(crate) fn authorization_header(&self) -> String {
95        format!(
96            "LOW {}:{}",
97            self.secrets.access_key.expose_secret(),
98            self.secrets.secret_key.expose_secret()
99        )
100    }
101}
102
103impl std::fmt::Debug for Auth {
104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105        f.debug_struct("Auth")
106            .field("access_key", &"<redacted>")
107            .field("secret_key", &"<redacted>")
108            .finish()
109    }
110}
111
112/// Builder for configuring an [`InternetArchiveClient`].
113#[derive(Clone, Debug)]
114pub struct InternetArchiveClientBuilder {
115    auth: Option<Auth>,
116    endpoint: Endpoint,
117    poll: PollOptions,
118    user_agent: Option<String>,
119    request_timeout: Option<Duration>,
120    connect_timeout: Option<Duration>,
121}
122
123impl InternetArchiveClientBuilder {
124    /// Sets the credentials used for authenticated operations.
125    #[must_use]
126    pub fn auth(mut self, auth: Auth) -> Self {
127        self.auth = Some(auth);
128        self
129    }
130
131    /// Overrides the endpoint roots.
132    #[must_use]
133    pub fn endpoint(mut self, endpoint: Endpoint) -> Self {
134        self.endpoint = endpoint;
135        self
136    }
137
138    /// Overrides the `User-Agent` header.
139    #[must_use]
140    pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
141        self.user_agent = Some(user_agent.into());
142        self
143    }
144
145    /// Sets the overall request timeout.
146    #[must_use]
147    pub fn request_timeout(mut self, timeout: Duration) -> Self {
148        self.request_timeout = Some(timeout);
149        self
150    }
151
152    /// Sets the TCP connect timeout.
153    #[must_use]
154    pub fn connect_timeout(mut self, timeout: Duration) -> Self {
155        self.connect_timeout = Some(timeout);
156        self
157    }
158
159    /// Overrides workflow polling behavior.
160    #[must_use]
161    pub fn poll_options(mut self, poll: PollOptions) -> Self {
162        self.poll = poll;
163        self
164    }
165
166    /// Builds the client.
167    ///
168    /// # Errors
169    ///
170    /// Returns an error if the underlying HTTP clients cannot be built.
171    pub fn build(self) -> Result<InternetArchiveClient, InternetArchiveError> {
172        let user_agent = self
173            .user_agent
174            .unwrap_or_else(|| format!("{}/{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")));
175
176        let build_client =
177            |redirects_enabled: bool| -> Result<reqwest::Client, InternetArchiveError> {
178                let mut builder = reqwest::Client::builder().user_agent(&user_agent);
179                if let Some(timeout) = self.request_timeout {
180                    builder = builder.timeout(timeout);
181                }
182                if let Some(timeout) = self.connect_timeout {
183                    builder = builder.connect_timeout(timeout);
184                }
185                if !redirects_enabled {
186                    builder = builder.redirect(reqwest::redirect::Policy::none());
187                }
188                builder.build().map_err(Into::into)
189            };
190
191        Ok(InternetArchiveClient {
192            inner: build_client(true)?,
193            s3_inner: build_client(false)?,
194            auth: self.auth,
195            endpoint: self.endpoint,
196            poll: self.poll,
197            request_timeout: self.request_timeout,
198            connect_timeout: self.connect_timeout,
199        })
200    }
201}
202
203/// Typed async client for Internet Archive metadata, search, uploads, and downloads.
204#[derive(Clone, Debug)]
205pub struct InternetArchiveClient {
206    pub(crate) inner: reqwest::Client,
207    pub(crate) s3_inner: reqwest::Client,
208    pub(crate) auth: Option<Auth>,
209    pub(crate) endpoint: Endpoint,
210    pub(crate) poll: PollOptions,
211    pub(crate) request_timeout: Option<Duration>,
212    pub(crate) connect_timeout: Option<Duration>,
213}
214
215impl InternetArchiveClient {
216    /// Starts building a client.
217    #[must_use]
218    pub fn builder() -> InternetArchiveClientBuilder {
219        InternetArchiveClientBuilder {
220            auth: None,
221            endpoint: Endpoint::default(),
222            poll: PollOptions::default(),
223            user_agent: None,
224            request_timeout: None,
225            connect_timeout: None,
226        }
227    }
228
229    /// Builds an unauthenticated client.
230    ///
231    /// # Errors
232    ///
233    /// Returns an error if the underlying HTTP clients cannot be built.
234    pub fn new() -> Result<Self, InternetArchiveError> {
235        Self::builder().build()
236    }
237
238    /// Builds a client with explicit credentials.
239    ///
240    /// # Errors
241    ///
242    /// Returns an error if the underlying HTTP clients cannot be built.
243    pub fn with_auth(auth: Auth) -> Result<Self, InternetArchiveError> {
244        Self::builder().auth(auth).build()
245    }
246
247    /// Builds a client from the standard Internet Archive environment variables.
248    ///
249    /// # Errors
250    ///
251    /// Returns an error if the environment variables are missing or if the
252    /// underlying HTTP clients cannot be built.
253    pub fn from_env() -> Result<Self, InternetArchiveError> {
254        Self::with_auth(Auth::from_env()?)
255    }
256
257    /// Returns the configured endpoint roots.
258    #[must_use]
259    pub fn endpoint(&self) -> &Endpoint {
260        &self.endpoint
261    }
262
263    /// Returns the configured workflow polling options.
264    #[must_use]
265    pub fn poll_options(&self) -> &PollOptions {
266        &self.poll
267    }
268
269    /// Returns the request timeout.
270    #[must_use]
271    pub fn request_timeout(&self) -> Option<Duration> {
272        self.request_timeout
273    }
274
275    /// Returns the connect timeout.
276    #[must_use]
277    pub fn connect_timeout(&self) -> Option<Duration> {
278        self.connect_timeout
279    }
280
281    /// Returns whether the client currently has credentials configured.
282    #[must_use]
283    pub fn has_auth(&self) -> bool {
284        self.auth.is_some()
285    }
286
287    /// Fetches a full item metadata record by identifier.
288    ///
289    /// # Errors
290    ///
291    /// Returns an error if the request fails or the item does not exist.
292    pub async fn get_item(
293        &self,
294        identifier: &ItemIdentifier,
295    ) -> Result<Item, InternetArchiveError> {
296        let url = self.endpoint.metadata_url(identifier.as_str())?;
297        let bytes = self
298            .execute_bytes(
299                self.archive_request(Method::GET, url)
300                    .header(ACCEPT, "application/json"),
301            )
302            .await?;
303
304        if bytes.iter().all(u8::is_ascii_whitespace) || bytes.as_ref() == b"[]" {
305            return Err(InternetArchiveError::ItemNotFound {
306                identifier: identifier.to_string(),
307            });
308        }
309
310        let item: Item = serde_json::from_slice(&bytes)?;
311        if item.identifier().as_ref() != Some(identifier) {
312            return Err(InternetArchiveError::ItemNotFound {
313                identifier: identifier.to_string(),
314            });
315        }
316
317        Ok(item)
318    }
319
320    /// Fetches a full item metadata record from a string identifier.
321    ///
322    /// # Errors
323    ///
324    /// Returns an error if the identifier is invalid, the request fails, or the
325    /// item does not exist.
326    pub async fn get_item_by_str(
327        &self,
328        identifier: impl AsRef<str>,
329    ) -> Result<Item, InternetArchiveError> {
330        let identifier = ItemIdentifier::new(identifier.as_ref())?;
331        self.get_item(&identifier).await
332    }
333
334    /// Runs an advanced search query.
335    ///
336    /// # Errors
337    ///
338    /// Returns an error if the request fails or the response is invalid.
339    pub async fn search(
340        &self,
341        query: &SearchQuery,
342    ) -> Result<SearchResponse, InternetArchiveError> {
343        let url = query.into_url(self.endpoint.search_url()?)?;
344        let response = self
345            .archive_request(Method::GET, url)
346            .header(ACCEPT, "application/json")
347            .send()
348            .await?;
349        if !response.status().is_success() {
350            return Err(InternetArchiveError::from_response(response).await);
351        }
352
353        let bytes = response.bytes().await?;
354        decode_search_response(&bytes)
355    }
356
357    /// Checks whether the S3 queue is currently over its documented upload limit.
358    ///
359    /// # Errors
360    ///
361    /// Returns an error if the client has no credentials or if the request
362    /// fails.
363    pub async fn check_upload_limit(
364        &self,
365        identifier: &ItemIdentifier,
366    ) -> Result<S3LimitCheck, InternetArchiveError> {
367        let auth = self
368            .auth
369            .as_ref()
370            .ok_or(InternetArchiveError::MissingAuth)?;
371        let url = self
372            .endpoint
373            .s3_limit_check_url(auth.secrets.access_key.expose_secret(), identifier.as_str())?;
374        self.execute_json(self.s3_request(Method::GET, url, HeaderMap::new())?)
375            .await
376    }
377
378    /// Applies a single-target metadata patch document.
379    ///
380    /// # Errors
381    ///
382    /// Returns an error if the client has no credentials, the request fails, or
383    /// MDAPI rejects the patch.
384    pub async fn apply_metadata_patch(
385        &self,
386        identifier: &ItemIdentifier,
387        target: MetadataTarget,
388        patch: &[PatchOperation],
389    ) -> Result<MetadataWriteResponse, InternetArchiveError> {
390        if self.auth.is_none() {
391            return Err(InternetArchiveError::MissingAuth);
392        }
393        let url = self.endpoint.metadata_url(identifier.as_str())?;
394        let patch = serde_json::to_string(patch)?;
395        self.execute_metadata_write(
396            self.archive_request(Method::POST, url)
397                .header(CONTENT_TYPE, "application/x-www-form-urlencoded")
398                .form(&[("-target", target.as_str()), ("-patch", patch)]),
399        )
400        .await
401    }
402
403    /// Applies an atomic multi-target metadata write.
404    ///
405    /// # Errors
406    ///
407    /// Returns an error if the client has no credentials, the request fails, or
408    /// MDAPI rejects the patch document.
409    pub async fn apply_metadata_changes(
410        &self,
411        identifier: &ItemIdentifier,
412        changes: &[MetadataChange],
413    ) -> Result<MetadataWriteResponse, InternetArchiveError> {
414        if self.auth.is_none() {
415            return Err(InternetArchiveError::MissingAuth);
416        }
417        let url = self.endpoint.metadata_url(identifier.as_str())?;
418        let payload = serde_json::to_string(changes)?;
419        self.execute_metadata_write(
420            self.archive_request(Method::POST, url)
421                .header(CONTENT_TYPE, "application/x-www-form-urlencoded")
422                .form(&[("-changes", payload)]),
423        )
424        .await
425    }
426
427    /// Updates the item-level metadata document by overlaying the provided keys
428    /// onto the current metadata and diffing the result.
429    ///
430    /// Missing keys in `metadata` are left untouched. Existing entries within
431    /// list-valued fields (for example `collection`, `subject`, `language`) are
432    /// also preserved; the update set is unioned into them. Use
433    /// [`Self::apply_metadata_patch`] when you want exact JSON Patch behavior,
434    /// including removing individual list entries or whole keys.
435    ///
436    /// If there are no effective changes, the method returns a synthetic
437    /// successful response with no task id and does not require authentication.
438    ///
439    /// # Errors
440    ///
441    /// Returns an error if the client has no credentials, the item cannot be
442    /// fetched, or the patch cannot be applied.
443    pub async fn update_item_metadata(
444        &self,
445        identifier: &ItemIdentifier,
446        metadata: &ItemMetadata,
447    ) -> Result<MetadataWriteResponse, InternetArchiveError> {
448        let current = self.get_item(identifier).await?;
449        let current_value = serde_json::to_value(&current.metadata)?;
450        let desired_value =
451            serde_json::to_value(merge_metadata_semantically(&current.metadata, metadata))?;
452        let patch_value = json_patch::diff(&current_value, &desired_value);
453        let patch: Vec<PatchOperation> =
454            serde_json::from_value(serde_json::to_value(patch_value)?)?;
455
456        if patch.is_empty() {
457            return Ok(MetadataWriteResponse {
458                success: true,
459                task_id: None,
460                log: None,
461                error: None,
462            });
463        }
464
465        if self.auth.is_none() {
466            return Err(InternetArchiveError::MissingAuth);
467        }
468
469        self.apply_metadata_patch(identifier, MetadataTarget::Metadata, &patch)
470            .await
471    }
472
473    /// Uploads a file to an existing item.
474    ///
475    /// # Errors
476    ///
477    /// Returns an error if the client has no credentials, the request fails, or
478    /// IA rejects the upload.
479    pub async fn upload_file(
480        &self,
481        identifier: &ItemIdentifier,
482        spec: &UploadSpec,
483        options: &UploadOptions,
484    ) -> Result<(), InternetArchiveError> {
485        self.put_object(identifier, spec, options, None, false)
486            .await
487    }
488
489    /// Uploads a file to an existing item while updating an `indicatif`
490    /// progress bar.
491    ///
492    /// Available when the `indicatif` feature is enabled.
493    ///
494    /// # Errors
495    ///
496    /// Returns an error if the client has no credentials, the request fails, or
497    /// IA rejects the upload.
498    #[cfg(feature = "indicatif")]
499    pub async fn upload_file_with_progress(
500        &self,
501        identifier: &ItemIdentifier,
502        spec: &UploadSpec,
503        options: &UploadOptions,
504        progress: &ProgressBar,
505    ) -> Result<(), InternetArchiveError> {
506        self.put_object_with_progress(identifier, spec, options, None, false, progress)
507            .await
508    }
509
510    /// Creates a new item by uploading the first file with automatic bucket
511    /// creation headers and initial metadata.
512    ///
513    /// # Errors
514    ///
515    /// Returns an error if the identifier is not valid for IA-S3 bucket
516    /// creation, the client has no credentials, the request fails, or IA
517    /// rejects the upload.
518    pub async fn create_item(
519        &self,
520        identifier: &ItemIdentifier,
521        metadata: &ItemMetadata,
522        spec: &UploadSpec,
523        options: &UploadOptions,
524    ) -> Result<(), InternetArchiveError> {
525        let header_encoding = metadata.as_header_encoding();
526        let remainder = header_encoding.remainder.clone();
527        self.put_object(identifier, spec, options, Some(header_encoding), true)
528            .await?;
529
530        if !remainder.as_map().is_empty() {
531            self.wait_for_item(identifier).await?;
532            self.update_item_metadata(identifier, &remainder).await?;
533            self.wait_for_item_projection(identifier, &[], &remainder)
534                .await?;
535        }
536
537        Ok(())
538    }
539
540    /// Creates a new item while updating an `indicatif` progress bar for the
541    /// initial upload step.
542    ///
543    /// Available when the `indicatif` feature is enabled.
544    ///
545    /// # Errors
546    ///
547    /// Returns an error if the identifier is not valid for IA-S3 bucket
548    /// creation, the client has no credentials, the request fails, or IA
549    /// rejects the upload.
550    #[cfg(feature = "indicatif")]
551    pub async fn create_item_with_progress(
552        &self,
553        identifier: &ItemIdentifier,
554        metadata: &ItemMetadata,
555        spec: &UploadSpec,
556        options: &UploadOptions,
557        progress: &ProgressBar,
558    ) -> Result<(), InternetArchiveError> {
559        let header_encoding = metadata.as_header_encoding();
560        let remainder = header_encoding.remainder.clone();
561        self.put_object_with_progress(
562            identifier,
563            spec,
564            options,
565            Some(header_encoding),
566            true,
567            progress,
568        )
569        .await?;
570
571        if !remainder.as_map().is_empty() {
572            self.wait_for_item(identifier).await?;
573            self.update_item_metadata(identifier, &remainder).await?;
574            self.wait_for_item_projection(identifier, &[], &remainder)
575                .await?;
576        }
577
578        Ok(())
579    }
580
581    /// Deletes a file from an item through the S3-like API.
582    ///
583    /// # Errors
584    ///
585    /// Returns an error if the client has no credentials, the request fails, or
586    /// IA rejects the delete.
587    pub async fn delete_file(
588        &self,
589        identifier: &ItemIdentifier,
590        filename: &str,
591        options: &DeleteOptions,
592    ) -> Result<(), InternetArchiveError> {
593        let mut headers = HeaderMap::new();
594        if options.cascade_delete {
595            headers.insert("x-archive-cascade-delete", HeaderValue::from_static("1"));
596        }
597        if options.keep_old_version {
598            headers.insert("x-archive-keep-old-version", HeaderValue::from_static("1"));
599        }
600
601        let url = self.endpoint.s3_object_url(identifier.as_str(), filename)?;
602        self.execute_s3(Method::DELETE, url, headers, None).await?;
603        Ok(())
604    }
605
606    /// Submits a `make_dark.php` task to the Internet Archive catalog tasks
607    /// API, hiding the item from public view.
608    ///
609    /// Darkening makes the item unavailable to all users, including the
610    /// uploader and IA's metadata and search subsystems; the item's metadata
611    /// stub remains with `is_dark: true`, but `/details/{id}` returns 404 and
612    /// search no longer indexes it. The caller must have uploaded the item or
613    /// otherwise have edit permission. The `comment` argument is recorded in
614    /// the task log; supply a short rationale (for example `"live test
615    /// cleanup"`).
616    ///
617    /// # Errors
618    ///
619    /// Returns an error if the client has no credentials, the request fails,
620    /// or IA rejects the task submission (for example with `401` when the
621    /// caller does not own the item).
622    pub async fn make_dark(
623        &self,
624        identifier: &ItemIdentifier,
625        comment: &str,
626    ) -> Result<TaskSubmission, InternetArchiveError> {
627        if self.auth.is_none() {
628            return Err(InternetArchiveError::MissingAuth);
629        }
630        let url = self.endpoint.tasks_url()?;
631        let payload = serde_json::json!({
632            "identifier": identifier.as_str(),
633            "cmd": "make_dark.php",
634            "args": { "comment": comment },
635        });
636        let response = self
637            .archive_request(Method::POST, url)
638            .header(CONTENT_TYPE, "application/json")
639            .header(ACCEPT, "application/json")
640            .body(serde_json::to_vec(&payload)?)
641            .send()
642            .await?;
643        if !response.status().is_success() {
644            return Err(InternetArchiveError::from_response(response).await);
645        }
646        let bytes = response.bytes().await?;
647        decode_task_submission(&bytes)
648    }
649
650    /// Resolves the public download URL for a file.
651    ///
652    /// # Errors
653    ///
654    /// Returns an error if URL construction fails.
655    pub fn resolve_download(
656        &self,
657        identifier: &ItemIdentifier,
658        filename: &str,
659    ) -> Result<ResolvedDownload, InternetArchiveError> {
660        Ok(ResolvedDownload {
661            identifier: identifier.clone(),
662            filename: filename.to_owned(),
663            url: self.endpoint.download_url(identifier.as_str(), filename)?,
664        })
665    }
666
667    /// Downloads a file into memory.
668    ///
669    /// # Errors
670    ///
671    /// Returns an error if the request fails.
672    pub async fn download_bytes(
673        &self,
674        identifier: &ItemIdentifier,
675        filename: &str,
676    ) -> Result<bytes::Bytes, InternetArchiveError> {
677        let resolved = self.resolve_download(identifier, filename)?;
678        self.execute_bytes(self.inner.get(resolved.url)).await
679    }
680
681    /// Downloads a file into memory while updating an `indicatif` progress bar.
682    ///
683    /// Available when the `indicatif` feature is enabled.
684    ///
685    /// # Errors
686    ///
687    /// Returns an error if the request fails.
688    #[cfg(feature = "indicatif")]
689    pub async fn download_bytes_with_progress(
690        &self,
691        identifier: &ItemIdentifier,
692        filename: &str,
693        progress: &ProgressBar,
694    ) -> Result<bytes::Bytes, InternetArchiveError> {
695        let resolved = self.resolve_download(identifier, filename)?;
696        self.execute_bytes_with_progress(self.inner.get(resolved.url), progress)
697            .await
698    }
699
700    /// Downloads a file to a local path.
701    ///
702    /// # Errors
703    ///
704    /// Returns an error if the request or local file write fails.
705    pub async fn download_to_path(
706        &self,
707        identifier: &ItemIdentifier,
708        filename: &str,
709        path: impl AsRef<Path>,
710    ) -> Result<(), InternetArchiveError> {
711        let bytes = self.download_bytes(identifier, filename).await?;
712        tokio::fs::write(path, &bytes).await?;
713        Ok(())
714    }
715
716    /// Downloads a file to a local path while updating an `indicatif`
717    /// progress bar.
718    ///
719    /// Available when the `indicatif` feature is enabled.
720    ///
721    /// # Errors
722    ///
723    /// Returns an error if the request or local file write fails.
724    #[cfg(feature = "indicatif")]
725    pub async fn download_to_path_with_progress(
726        &self,
727        identifier: &ItemIdentifier,
728        filename: &str,
729        path: impl AsRef<Path>,
730        progress: &ProgressBar,
731    ) -> Result<(), InternetArchiveError> {
732        let bytes = self
733            .download_bytes_with_progress(identifier, filename, progress)
734            .await?;
735        tokio::fs::write(path, &bytes).await?;
736        Ok(())
737    }
738
739    pub(crate) async fn wait_for_item(
740        &self,
741        identifier: &ItemIdentifier,
742    ) -> Result<Item, InternetArchiveError> {
743        self.wait_until("item visibility", || async {
744            self.get_item(identifier).await
745        })
746        .await
747    }
748
749    pub(crate) async fn wait_for_item_projection(
750        &self,
751        identifier: &ItemIdentifier,
752        expected_files: &[String],
753        expected_metadata: &ItemMetadata,
754    ) -> Result<Item, InternetArchiveError> {
755        self.wait_until("item projection visibility", || async {
756            let item = self.get_item(identifier).await?;
757            if expected_files
758                .iter()
759                .all(|filename| item.file(filename).is_some())
760                && metadata_contains_projection(&item.metadata, expected_metadata)
761            {
762                Ok(item)
763            } else {
764                Err(InternetArchiveError::ItemNotFound {
765                    identifier: identifier.to_string(),
766                })
767            }
768        })
769        .await
770    }
771
772    async fn put_object(
773        &self,
774        identifier: &ItemIdentifier,
775        spec: &UploadSpec,
776        options: &UploadOptions,
777        metadata: Option<HeaderEncoding>,
778        auto_make_bucket: bool,
779    ) -> Result<(), InternetArchiveError> {
780        let (url, headers, body) = self
781            .prepare_put_object(identifier, spec, options, metadata, auto_make_bucket)
782            .await?;
783        self.execute_s3(Method::PUT, url, headers, Some(body))
784            .await?;
785        Ok(())
786    }
787
788    #[cfg(feature = "indicatif")]
789    async fn put_object_with_progress(
790        &self,
791        identifier: &ItemIdentifier,
792        spec: &UploadSpec,
793        options: &UploadOptions,
794        metadata: Option<HeaderEncoding>,
795        auto_make_bucket: bool,
796        progress: &ProgressBar,
797    ) -> Result<(), InternetArchiveError> {
798        let (url, headers, body) = self
799            .prepare_put_object(identifier, spec, options, metadata, auto_make_bucket)
800            .await?;
801        self.execute_s3_with_progress(Method::PUT, url, headers, Some(body), progress)
802            .await?;
803        Ok(())
804    }
805
806    async fn prepare_put_object(
807        &self,
808        identifier: &ItemIdentifier,
809        spec: &UploadSpec,
810        options: &UploadOptions,
811        metadata: Option<HeaderEncoding>,
812        auto_make_bucket: bool,
813    ) -> Result<(Url, HeaderMap, ReplayableBody), InternetArchiveError> {
814        if auto_make_bucket {
815            identifier.validate_for_bucket_creation()?;
816        }
817
818        let mut headers = HeaderMap::new();
819        headers.insert(
820            CONTENT_TYPE,
821            HeaderValue::from_str(spec.content_type.as_ref()).map_err(|_| {
822                InternetArchiveError::InvalidState("invalid content type".to_owned())
823            })?,
824        );
825
826        if auto_make_bucket {
827            headers.insert("x-archive-auto-make-bucket", HeaderValue::from_static("1"));
828            headers.insert("x-amz-auto-make-bucket", HeaderValue::from_static("1"));
829        }
830        if options.skip_derive {
831            headers.insert("x-archive-queue-derive", HeaderValue::from_static("0"));
832        }
833        if options.keep_old_version {
834            headers.insert("x-archive-keep-old-version", HeaderValue::from_static("1"));
835        }
836        if options.interactive_priority {
837            headers.insert(
838                "x-archive-interactive-priority",
839                HeaderValue::from_static("1"),
840            );
841        }
842        if let Some(size_hint) = options.size_hint {
843            headers.insert(
844                "x-archive-size-hint",
845                HeaderValue::from_str(&size_hint.to_string()).map_err(|_| {
846                    InternetArchiveError::InvalidState("invalid size hint".to_owned())
847                })?,
848            );
849        }
850        if let Some(metadata) = metadata {
851            for (name, value) in metadata.headers {
852                let name = HeaderName::from_bytes(name.as_bytes()).map_err(|_| {
853                    InternetArchiveError::InvalidState("invalid metadata header name".to_owned())
854                })?;
855                headers.insert(
856                    name,
857                    HeaderValue::from_str(&value).map_err(|_| {
858                        InternetArchiveError::InvalidState(
859                            "invalid metadata header value".to_owned(),
860                        )
861                    })?,
862                );
863            }
864        }
865
866        let body = match &spec.source {
867            UploadSource::Path(path) => {
868                let length = tokio::fs::metadata(path).await?.len();
869                ReplayableBody::Path {
870                    path: path.clone(),
871                    length,
872                }
873            }
874            UploadSource::Bytes(bytes) => ReplayableBody::Bytes(bytes.clone()),
875        };
876
877        let url = self
878            .endpoint
879            .s3_object_url(identifier.as_str(), &spec.filename)?;
880        Ok((url, headers, body))
881    }
882
883    fn archive_request(&self, method: Method, url: Url) -> reqwest::RequestBuilder {
884        let mut request = self.inner.request(method, url);
885        if let Some(auth) = &self.auth {
886            request = request.header(AUTHORIZATION, auth.authorization_header());
887        }
888        request
889    }
890
891    fn s3_request(
892        &self,
893        method: Method,
894        url: Url,
895        headers: HeaderMap,
896    ) -> Result<reqwest::RequestBuilder, InternetArchiveError> {
897        let auth = self
898            .auth
899            .as_ref()
900            .ok_or(InternetArchiveError::MissingAuth)?;
901        Ok(self
902            .s3_inner
903            .request(method, url)
904            .headers(headers)
905            .header(AUTHORIZATION, auth.authorization_header()))
906    }
907
908    async fn execute_json<T>(
909        &self,
910        request: reqwest::RequestBuilder,
911    ) -> Result<T, InternetArchiveError>
912    where
913        T: serde::de::DeserializeOwned,
914    {
915        let response = request.send().await?;
916        if !response.status().is_success() {
917            return Err(InternetArchiveError::from_response(response).await);
918        }
919        let bytes = response.bytes().await?;
920        Ok(serde_json::from_slice(&bytes)?)
921    }
922
923    async fn execute_bytes(
924        &self,
925        request: reqwest::RequestBuilder,
926    ) -> Result<bytes::Bytes, InternetArchiveError> {
927        let response = request.send().await?;
928        if !response.status().is_success() {
929            return Err(InternetArchiveError::from_response(response).await);
930        }
931        response.bytes().await.map_err(Into::into)
932    }
933
934    #[cfg(feature = "indicatif")]
935    async fn execute_bytes_with_progress(
936        &self,
937        request: reqwest::RequestBuilder,
938        progress: &ProgressBar,
939    ) -> Result<bytes::Bytes, InternetArchiveError> {
940        progress.set_position(0);
941
942        let mut response = request.send().await?;
943        if !response.status().is_success() {
944            return Err(InternetArchiveError::from_response(response).await);
945        }
946
947        if let Some(length) = response.content_length() {
948            progress.set_length(length);
949        }
950
951        let mut bytes = response
952            .content_length()
953            .and_then(|length| usize::try_from(length).ok())
954            .map_or_else(Vec::new, Vec::with_capacity);
955
956        while let Some(chunk) = response.chunk().await? {
957            progress.inc(chunk.len() as u64);
958            bytes.extend_from_slice(&chunk);
959        }
960
961        progress.finish();
962        Ok(bytes::Bytes::from(bytes))
963    }
964
965    async fn execute_metadata_write(
966        &self,
967        request: reqwest::RequestBuilder,
968    ) -> Result<MetadataWriteResponse, InternetArchiveError> {
969        let response = request.send().await?;
970        if !response.status().is_success() {
971            return Err(InternetArchiveError::from_response(response).await);
972        }
973
974        let bytes = response.bytes().await?;
975        decode_metadata_write_failure(&bytes)?;
976        Ok(serde_json::from_slice(&bytes)?)
977    }
978
979    async fn execute_s3(
980        &self,
981        method: Method,
982        url: Url,
983        headers: HeaderMap,
984        body: Option<ReplayableBody>,
985    ) -> Result<reqwest::Response, InternetArchiveError> {
986        let mut current_url = url;
987        let mut remaining_redirects = 8_u8;
988
989        loop {
990            let mut request =
991                self.s3_request(method.clone(), current_url.clone(), headers.clone())?;
992            if let Some(body) = &body {
993                request = body.apply(request).await?;
994            }
995
996            let response = request.send().await?;
997            if is_redirect(response.status()) {
998                let Some(location) = response.headers().get(LOCATION).cloned() else {
999                    return Err(InternetArchiveError::InvalidState(
1000                        "redirect response missing location header".to_owned(),
1001                    ));
1002                };
1003
1004                if remaining_redirects == 0 {
1005                    return Err(InternetArchiveError::InvalidState(
1006                        "too many redirects during S3 request".to_owned(),
1007                    ));
1008                }
1009
1010                let location = location.to_str().map_err(|_| {
1011                    InternetArchiveError::InvalidState(
1012                        "redirect location is not valid UTF-8".to_owned(),
1013                    )
1014                })?;
1015                let redirected_url = current_url.join(location)?;
1016                if redirected_url.origin() != self.endpoint.s3_base().origin() {
1017                    return Err(InternetArchiveError::InvalidState(
1018                        "refusing to forward credentials to redirected S3 host".to_owned(),
1019                    ));
1020                }
1021                current_url = redirected_url;
1022                remaining_redirects -= 1;
1023                continue;
1024            }
1025
1026            if !response.status().is_success() {
1027                return Err(InternetArchiveError::from_response(response).await);
1028            }
1029
1030            return Ok(response);
1031        }
1032    }
1033
1034    #[cfg(feature = "indicatif")]
1035    async fn execute_s3_with_progress(
1036        &self,
1037        method: Method,
1038        url: Url,
1039        headers: HeaderMap,
1040        body: Option<ReplayableBody>,
1041        progress: &ProgressBar,
1042    ) -> Result<reqwest::Response, InternetArchiveError> {
1043        let mut current_url = url;
1044        let mut remaining_redirects = 8_u8;
1045
1046        loop {
1047            let mut request =
1048                self.s3_request(method.clone(), current_url.clone(), headers.clone())?;
1049            if let Some(body) = &body {
1050                request = body.apply_with_progress(request, progress).await?;
1051            }
1052
1053            let response = request.send().await?;
1054            if is_redirect(response.status()) {
1055                let Some(location) = response.headers().get(LOCATION).cloned() else {
1056                    return Err(InternetArchiveError::InvalidState(
1057                        "redirect response missing location header".to_owned(),
1058                    ));
1059                };
1060
1061                if remaining_redirects == 0 {
1062                    return Err(InternetArchiveError::InvalidState(
1063                        "too many redirects during S3 request".to_owned(),
1064                    ));
1065                }
1066
1067                let location = location.to_str().map_err(|_| {
1068                    InternetArchiveError::InvalidState(
1069                        "redirect location is not valid UTF-8".to_owned(),
1070                    )
1071                })?;
1072                let redirected_url = current_url.join(location)?;
1073                if redirected_url.origin() != self.endpoint.s3_base().origin() {
1074                    return Err(InternetArchiveError::InvalidState(
1075                        "refusing to forward credentials to redirected S3 host".to_owned(),
1076                    ));
1077                }
1078                current_url = redirected_url;
1079                remaining_redirects -= 1;
1080                continue;
1081            }
1082
1083            if !response.status().is_success() {
1084                return Err(InternetArchiveError::from_response(response).await);
1085            }
1086
1087            progress.finish();
1088            return Ok(response);
1089        }
1090    }
1091
1092    pub(crate) async fn wait_until<T, F, Fut>(
1093        &self,
1094        label: &'static str,
1095        mut action: F,
1096    ) -> Result<T, InternetArchiveError>
1097    where
1098        F: FnMut() -> Fut,
1099        Fut: std::future::Future<Output = Result<T, InternetArchiveError>>,
1100    {
1101        let started = tokio::time::Instant::now();
1102        let mut delay = self.poll.initial_delay;
1103
1104        loop {
1105            match action().await {
1106                Ok(value) => return Ok(value),
1107                Err(error)
1108                    if started.elapsed() < self.poll.max_wait
1109                        && is_retryable_wait_error(&error) =>
1110                {
1111                    tokio::time::sleep(delay).await;
1112                    delay = std::cmp::min(delay.saturating_mul(2), self.poll.max_delay);
1113                }
1114                Err(error) => return Err(error),
1115            }
1116
1117            if started.elapsed() >= self.poll.max_wait {
1118                return Err(InternetArchiveError::Timeout(label));
1119            }
1120        }
1121    }
1122}
1123
/// Request body that can be attached to a request more than once, so an S3
/// request can be re-sent with the same payload after following a redirect.
#[derive(Clone, Debug)]
enum ReplayableBody {
    /// A file streamed from disk; the file is reopened on every apply.
    Path {
        /// Location of the file to upload.
        path: std::path::PathBuf,
        /// Byte count sent as `Content-Length` (the file size recorded when the
        /// body was built, not re-read on each apply).
        length: u64,
    },
    /// An in-memory payload.
    Bytes(Vec<u8>),
}
1132
1133impl ReplayableBody {
1134    async fn apply(
1135        &self,
1136        request: reqwest::RequestBuilder,
1137    ) -> Result<reqwest::RequestBuilder, InternetArchiveError> {
1138        match self {
1139            Self::Path { path, length } => {
1140                let file = File::open(path).await?;
1141                Ok(request
1142                    .header(CONTENT_LENGTH, *length)
1143                    .body(reqwest::Body::wrap_stream(ReaderStream::new(file))))
1144            }
1145            Self::Bytes(bytes) => Ok(request
1146                .header(CONTENT_LENGTH, bytes.len())
1147                .body(bytes.clone())),
1148        }
1149    }
1150
1151    #[cfg(feature = "indicatif")]
1152    async fn apply_with_progress(
1153        &self,
1154        request: reqwest::RequestBuilder,
1155        progress: &ProgressBar,
1156    ) -> Result<reqwest::RequestBuilder, InternetArchiveError> {
1157        progress.set_position(0);
1158
1159        match self {
1160            Self::Path { path, length } => {
1161                progress.set_length(*length);
1162                let file = File::open(path).await?;
1163                Ok(request
1164                    .header(CONTENT_LENGTH, *length)
1165                    .body(reqwest::Body::wrap_stream(ProgressStream::new(
1166                        ReaderStream::new(file),
1167                        progress.clone(),
1168                    ))))
1169            }
1170            Self::Bytes(bytes) => {
1171                let length = bytes.len() as u64;
1172                progress.set_length(length);
1173                Ok(request
1174                    .header(CONTENT_LENGTH, length)
1175                    .body(reqwest::Body::wrap_stream(ProgressStream::new(
1176                        ChunkedBytesStream::new(bytes::Bytes::from(bytes.clone())),
1177                        progress.clone(),
1178                    ))))
1179            }
1180        }
1181    }
1182}
1183
/// Stream adapter that forwards items from `inner` unchanged while advancing
/// `progress` by the length of each successfully yielded chunk.
#[cfg(feature = "indicatif")]
struct ProgressStream<S> {
    /// Wrapped byte stream.
    inner: S,
    /// Bar incremented as chunks pass through.
    progress: ProgressBar,
}
1189
#[cfg(feature = "indicatif")]
impl<S> ProgressStream<S> {
    /// Wraps `inner` so that successfully yielded chunks advance `progress`.
    fn new(inner: S, progress: ProgressBar) -> Self {
        Self { progress, inner }
    }
}
1196
#[cfg(feature = "indicatif")]
impl<S> Stream for ProgressStream<S>
where
    S: Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin,
{
    type Item = Result<bytes::Bytes, std::io::Error>;

    /// Polls the wrapped stream. A successful chunk first advances the progress
    /// bar by its length and is then passed through; errors, `Pending`, and
    /// end-of-stream are forwarded untouched.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let stream = self.get_mut();
        let outcome = Pin::new(&mut stream.inner).poll_next(cx);
        if let Poll::Ready(Some(Ok(chunk))) = &outcome {
            stream.progress.inc(chunk.len() as u64);
        }
        outcome
    }
}
1215
/// Stream that yields an in-memory buffer as successive slices of at most
/// `CHUNK_SIZE` bytes. It is used to report upload progress for in-memory
/// bodies.
#[cfg(feature = "indicatif")]
struct ChunkedBytesStream {
    /// Entire payload being streamed.
    bytes: bytes::Bytes,
    /// Index of the next byte to yield.
    offset: usize,
}
1221
#[cfg(feature = "indicatif")]
impl ChunkedBytesStream {
    /// Maximum size of each yielded slice (16 KiB); the final slice may be shorter.
    const CHUNK_SIZE: usize = 16_384;

    /// Creates a stream positioned at the start of `bytes`.
    fn new(bytes: bytes::Bytes) -> Self {
        let offset = 0;
        Self { bytes, offset }
    }
}
1230
#[cfg(feature = "indicatif")]
impl Stream for ChunkedBytesStream {
    type Item = Result<bytes::Bytes, std::io::Error>;

    /// Yields the next slice of at most `CHUNK_SIZE` bytes. The stream is
    /// always ready and ends once every byte has been emitted.
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let stream = self.get_mut();
        let total = stream.bytes.len();
        if stream.offset >= total {
            return Poll::Ready(None);
        }

        let start = stream.offset;
        let stop = total.min(start.saturating_add(Self::CHUNK_SIZE));
        stream.offset = stop;
        Poll::Ready(Some(Ok(stream.bytes.slice(start..stop))))
    }
}
1250
1251fn is_redirect(status: StatusCode) -> bool {
1252    matches!(
1253        status,
1254        StatusCode::MOVED_PERMANENTLY
1255            | StatusCode::FOUND
1256            | StatusCode::SEE_OTHER
1257            | StatusCode::TEMPORARY_REDIRECT
1258            | StatusCode::PERMANENT_REDIRECT
1259    )
1260}
1261
1262fn is_retryable_wait_error(error: &InternetArchiveError) -> bool {
1263    match error {
1264        InternetArchiveError::ItemNotFound { .. } => true,
1265        InternetArchiveError::Http { status, .. } if status.is_server_error() => true,
1266        _ => false,
1267    }
1268}
1269
1270fn decode_search_response(bytes: &[u8]) -> Result<SearchResponse, InternetArchiveError> {
1271    let value: Value = serde_json::from_slice(bytes)?;
1272
1273    if value.get("response").is_some() {
1274        return Ok(serde_json::from_value(value)?);
1275    }
1276
1277    let message = value
1278        .get("error")
1279        .and_then(Value::as_str)
1280        .or_else(|| value.get("message").and_then(Value::as_str))
1281        .or_else(|| value.get("title").and_then(Value::as_str))
1282        .map_or_else(
1283            || {
1284                String::from_utf8_lossy(bytes)
1285                    .trim()
1286                    .chars()
1287                    .take(512)
1288                    .collect()
1289            },
1290            str::to_owned,
1291        );
1292
1293    Err(InternetArchiveError::InvalidState(format!(
1294        "unexpected search response: {message}"
1295    )))
1296}
1297
1298fn decode_task_submission(bytes: &[u8]) -> Result<TaskSubmission, InternetArchiveError> {
1299    let value: Value = serde_json::from_slice(bytes)?;
1300
1301    let success = value
1302        .get("success")
1303        .and_then(Value::as_bool)
1304        .unwrap_or(false);
1305
1306    if success {
1307        if let Some(inner) = value.get("value").cloned() {
1308            return Ok(serde_json::from_value(inner)?);
1309        }
1310    }
1311
1312    let message = value
1313        .get("error")
1314        .and_then(Value::as_str)
1315        .or_else(|| value.get("message").and_then(Value::as_str))
1316        .map_or_else(
1317            || {
1318                String::from_utf8_lossy(bytes)
1319                    .trim()
1320                    .chars()
1321                    .take(512)
1322                    .collect()
1323            },
1324            str::to_owned,
1325        );
1326
1327    Err(InternetArchiveError::InvalidState(format!(
1328        "unexpected task submission response: {message}"
1329    )))
1330}
1331
1332#[cfg(test)]
1333mod tests {
1334    #[cfg(feature = "indicatif")]
1335    use std::pin::Pin;
1336    use std::sync::OnceLock;
1337    #[cfg(feature = "indicatif")]
1338    use std::task::{Context, Poll, Waker};
1339    use std::time::Duration;
1340
1341    use axum::extract::State;
1342    use axum::http::{HeaderMap, HeaderValue, StatusCode, Uri};
1343    use axum::routing::{get, put};
1344    use axum::{Json, Router};
1345    #[cfg(feature = "indicatif")]
1346    use bytes::Bytes;
1347    #[cfg(feature = "indicatif")]
1348    use futures_core::Stream;
1349    #[cfg(feature = "indicatif")]
1350    use indicatif::ProgressBar;
1351    use serde_json::{json, Value};
1352    use tokio::net::TcpListener;
1353    use tokio::sync::Mutex;
1354    use url::Url;
1355
1356    use super::{Auth, InternetArchiveClient};
1357    #[cfg(feature = "indicatif")]
1358    use super::{ChunkedBytesStream, ProgressStream, ReplayableBody};
1359    use crate::error::InternetArchiveError;
1360    use crate::metadata::{ItemMetadata, MetadataChange, MetadataTarget, PatchOperation};
1361    use crate::search::{SearchQuery, SortDirection};
1362    use crate::upload::{DeleteOptions, UploadOptions, UploadSpec};
1363    use crate::{Endpoint, IdentifierError, ItemIdentifier, PollOptions};
1364    use reqwest::header::LOCATION;
1365
1366    #[derive(Default)]
1367    struct StateData {
1368        seen_upload_auth: Mutex<Vec<String>>,
1369        seen_delete_auth: Mutex<Vec<String>>,
1370        captured_mdapi_body: Mutex<Vec<String>>,
1371        #[cfg(feature = "indicatif")]
1372        metadata_reads: Mutex<u8>,
1373    }
1374
1375    fn test_client(addr: std::net::SocketAddr) -> InternetArchiveClient {
1376        InternetArchiveClient::builder()
1377            .auth(Auth::new("access", "secret"))
1378            .endpoint(Endpoint::custom(
1379                Url::parse(&format!("http://{addr}/")).unwrap(),
1380                Url::parse(&format!("http://{addr}/s3/")).unwrap(),
1381            ))
1382            .poll_options(PollOptions {
1383                max_wait: Duration::from_millis(50),
1384                initial_delay: Duration::from_millis(5),
1385                max_delay: Duration::from_millis(10),
1386            })
1387            .build()
1388            .unwrap()
1389    }
1390
1391    fn unauthenticated_test_client(addr: std::net::SocketAddr) -> InternetArchiveClient {
1392        InternetArchiveClient::builder()
1393            .endpoint(Endpoint::custom(
1394                Url::parse(&format!("http://{addr}/")).unwrap(),
1395                Url::parse(&format!("http://{addr}/s3/")).unwrap(),
1396            ))
1397            .poll_options(PollOptions {
1398                max_wait: Duration::from_millis(50),
1399                initial_delay: Duration::from_millis(5),
1400                max_delay: Duration::from_millis(10),
1401            })
1402            .build()
1403            .unwrap()
1404    }
1405
1406    #[tokio::test]
1407    async fn search_get_item_metadata_write_download_and_redirected_s3_calls_work() {
1408        async fn metadata() -> Json<Value> {
1409            Json(json!({
1410                "created": 1,
1411                "files": [{"name": "demo.txt", "size": "5"}],
1412                "metadata": {
1413                    "identifier": "demo-item",
1414                    "title": "Demo item",
1415                    "collection": ["opensource"]
1416                }
1417            }))
1418        }
1419
1420        async fn advanced_search() -> Json<Value> {
1421            Json(json!({
1422                "responseHeader": {
1423                    "status": 0,
1424                    "QTime": 1,
1425                    "params": {"query": "identifier:demo-item"}
1426                },
1427                "response": {
1428                    "numFound": 1,
1429                    "start": 0,
1430                    "docs": [{"identifier": "demo-item", "title": "Demo item"}]
1431                }
1432            }))
1433        }
1434
1435        async fn metadata_write(
1436            State(state): State<std::sync::Arc<StateData>>,
1437            headers: HeaderMap,
1438            body: String,
1439        ) -> (StatusCode, Json<Value>) {
1440            state.captured_mdapi_body.lock().await.push(body);
1441            assert_eq!(headers.get("authorization").unwrap(), "LOW access:secret");
1442            (
1443                StatusCode::OK,
1444                Json(json!({
1445                    "success": true,
1446                    "task_id": 42,
1447                    "log": "https://catalogd.archive.org/log/42"
1448                })),
1449            )
1450        }
1451
1452        async fn download() -> &'static str {
1453            "hello"
1454        }
1455
1456        async fn first_upload() -> (StatusCode, HeaderMap) {
1457            let mut headers = HeaderMap::new();
1458            headers.insert(
1459                LOCATION,
1460                HeaderValue::from_static("/s3-redirected/demo-item/demo.txt"),
1461            );
1462            (StatusCode::TEMPORARY_REDIRECT, headers)
1463        }
1464
1465        async fn redirected_upload(
1466            State(state): State<std::sync::Arc<StateData>>,
1467            headers: HeaderMap,
1468            body: String,
1469        ) -> StatusCode {
1470            state.seen_upload_auth.lock().await.push(
1471                headers
1472                    .get("authorization")
1473                    .unwrap()
1474                    .to_str()
1475                    .unwrap()
1476                    .to_owned(),
1477            );
1478            assert_eq!(body, "hello");
1479            StatusCode::OK
1480        }
1481
1482        async fn first_delete() -> (StatusCode, HeaderMap) {
1483            let mut headers = HeaderMap::new();
1484            headers.insert(
1485                LOCATION,
1486                HeaderValue::from_static("/s3-redirected/demo-item/demo.txt"),
1487            );
1488            (StatusCode::TEMPORARY_REDIRECT, headers)
1489        }
1490
1491        async fn redirected_delete(
1492            State(state): State<std::sync::Arc<StateData>>,
1493            headers: HeaderMap,
1494        ) -> StatusCode {
1495            state.seen_delete_auth.lock().await.push(
1496                headers
1497                    .get("authorization")
1498                    .unwrap()
1499                    .to_str()
1500                    .unwrap()
1501                    .to_owned(),
1502            );
1503            StatusCode::NO_CONTENT
1504        }
1505
1506        let state = std::sync::Arc::new(StateData::default());
1507        let app = Router::new()
1508            .route("/metadata/demo-item", get(metadata).post(metadata_write))
1509            .route("/advancedsearch.php", get(advanced_search))
1510            .route("/download/demo-item/demo.txt", get(download))
1511            .route(
1512                "/s3/demo-item/demo.txt",
1513                put(first_upload).delete(first_delete),
1514            )
1515            .route(
1516                "/s3-redirected/demo-item/demo.txt",
1517                put(redirected_upload).delete(redirected_delete),
1518            )
1519            .with_state(state.clone());
1520
1521        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1522        let addr = listener.local_addr().unwrap();
1523        let server = tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });
1524        let client = test_client(addr);
1525        let identifier = ItemIdentifier::new("demo-item").unwrap();
1526
1527        let item = client.get_item(&identifier).await.unwrap();
1528        assert_eq!(item.metadata.title(), Some("Demo item"));
1529
1530        let search = client
1531            .search(
1532                &SearchQuery::builder("identifier:demo-item")
1533                    .field("identifier")
1534                    .field("title")
1535                    .sort("publicdate", SortDirection::Desc)
1536                    .build(),
1537            )
1538            .await
1539            .unwrap();
1540        assert_eq!(
1541            search.response.docs[0].identifier().unwrap().as_str(),
1542            "demo-item"
1543        );
1544
1545        let write = client
1546            .apply_metadata_patch(
1547                &identifier,
1548                MetadataTarget::Metadata,
1549                &[PatchOperation::replace("/title", "Updated title")],
1550            )
1551            .await
1552            .unwrap();
1553        assert_eq!(write.task_id, Some(crate::TaskId(42)));
1554
1555        let spec = UploadSpec::from_bytes("demo.txt", b"hello".to_vec());
1556        client
1557            .upload_file(&identifier, &spec, &UploadOptions::default())
1558            .await
1559            .unwrap();
1560        client
1561            .delete_file(&identifier, "demo.txt", &DeleteOptions::default())
1562            .await
1563            .unwrap();
1564        assert_eq!(
1565            client
1566                .download_bytes(&identifier, "demo.txt")
1567                .await
1568                .unwrap(),
1569            "hello"
1570        );
1571
1572        assert_eq!(state.seen_upload_auth.lock().await[0], "LOW access:secret");
1573        assert_eq!(state.seen_delete_auth.lock().await[0], "LOW access:secret");
1574        assert!(state.captured_mdapi_body.lock().await[0].contains("-target=metadata"));
1575
1576        server.abort();
1577    }
1578
1579    #[tokio::test]
1580    async fn search_reports_non_search_json_payloads_as_invalid_state() {
1581        async fn advanced_search_error() -> Json<Value> {
1582            Json(json!({
1583                "message": "search backend warming up"
1584            }))
1585        }
1586
1587        let app = Router::new().route("/advancedsearch.php", get(advanced_search_error));
1588        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1589        let addr = listener.local_addr().unwrap();
1590        let server = tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });
1591        let client = unauthenticated_test_client(addr);
1592
1593        let error = client
1594            .search(&SearchQuery::builder("identifier:demo-item").build())
1595            .await
1596            .unwrap_err();
1597
1598        assert!(
1599            matches!(error, InternetArchiveError::InvalidState(message) if message.contains("search backend warming up"))
1600        );
1601
1602        server.abort();
1603    }
1604
1605    #[tokio::test]
1606    async fn search_uses_trimmed_raw_json_when_no_message_fields_are_present() {
1607        async fn advanced_search_error() -> Json<Value> {
1608            Json(json!({
1609                "foo": "bar"
1610            }))
1611        }
1612
1613        let app = Router::new().route("/advancedsearch.php", get(advanced_search_error));
1614        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1615        let addr = listener.local_addr().unwrap();
1616        let server = tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });
1617        let client = unauthenticated_test_client(addr);
1618
1619        let error = client
1620            .search(&SearchQuery::builder("identifier:demo-item").build())
1621            .await
1622            .unwrap_err();
1623
1624        assert!(
1625            matches!(error, InternetArchiveError::InvalidState(message) if message.contains("\"foo\":\"bar\""))
1626        );
1627
1628        server.abort();
1629    }
1630
1631    #[tokio::test]
1632    async fn s3_redirects_do_not_forward_credentials_to_foreign_hosts() {
1633        async fn initial_upload_redirect() -> (StatusCode, HeaderMap) {
1634            let trap = TRAP_BASE_URL.get().expect("trap base url");
1635            let mut headers = HeaderMap::new();
1636            headers.insert(
1637                LOCATION,
1638                HeaderValue::from_str(&format!("{trap}stolen/demo-item/demo.txt")).unwrap(),
1639            );
1640            (StatusCode::TEMPORARY_REDIRECT, headers)
1641        }
1642
1643        async fn trap_handler(
1644            State(state): State<std::sync::Arc<StateData>>,
1645            headers: HeaderMap,
1646        ) -> StatusCode {
1647            state.seen_upload_auth.lock().await.push(
1648                headers
1649                    .get("authorization")
1650                    .and_then(|value| value.to_str().ok())
1651                    .unwrap_or("")
1652                    .to_owned(),
1653            );
1654            StatusCode::OK
1655        }
1656
1657        static TRAP_BASE_URL: OnceLock<String> = OnceLock::new();
1658
1659        let trap_state = std::sync::Arc::new(StateData::default());
1660        let trap_app = Router::new()
1661            .route("/stolen/demo-item/demo.txt", put(trap_handler))
1662            .with_state(trap_state.clone());
1663        let trap_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1664        let trap_addr = trap_listener.local_addr().unwrap();
1665        let trap_server =
1666            tokio::spawn(async move { axum::serve(trap_listener, trap_app).await.unwrap() });
1667        TRAP_BASE_URL
1668            .set(format!("http://{trap_addr}/"))
1669            .expect("set trap base url once");
1670
1671        let origin_state = std::sync::Arc::new(StateData::default());
1672        let origin_app = Router::new()
1673            .route("/s3/demo-item/demo.txt", put(initial_upload_redirect))
1674            .with_state(origin_state);
1675        let origin_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1676        let origin_addr = origin_listener.local_addr().unwrap();
1677        let origin_server =
1678            tokio::spawn(async move { axum::serve(origin_listener, origin_app).await.unwrap() });
1679
1680        let client = test_client(origin_addr);
1681        let error = client
1682            .upload_file(
1683                &ItemIdentifier::new("demo-item").unwrap(),
1684                &UploadSpec::from_bytes("demo.txt", b"hello"),
1685                &UploadOptions::default(),
1686            )
1687            .await
1688            .unwrap_err();
1689
1690        assert!(matches!(
1691            error,
1692            InternetArchiveError::InvalidState(message)
1693                if message.contains("redirected S3 host")
1694        ));
1695        assert!(trap_state.seen_upload_auth.lock().await.is_empty());
1696
1697        origin_server.abort();
1698        trap_server.abort();
1699    }
1700
1701    #[test]
1702    fn auth_debug_is_redacted_and_env_helpers_work() {
1703        let auth = Auth::new("access", "secret");
1704        assert!(format!("{auth:?}").contains("<redacted>"));
1705    }
1706
1707    #[test]
1708    fn decode_task_submission_falls_back_to_body_excerpt_when_envelope_has_no_error_message() {
1709        let error = super::decode_task_submission(b"{\"success\":false}").unwrap_err();
1710        match error {
1711            InternetArchiveError::InvalidState(message) => {
1712                assert!(
1713                    message.contains("{\"success\":false}"),
1714                    "expected body excerpt in fallback message, got: {message}"
1715                );
1716            }
1717            other => panic!("unexpected error variant: {other:?}"),
1718        }
1719    }
1720
1721    #[tokio::test]
1722    async fn update_item_metadata_returns_synthetic_success_for_noop_diff() {
1723        async fn metadata() -> Json<Value> {
1724            Json(json!({
1725                "files": [],
1726                "metadata": {
1727                    "identifier": "demo-item",
1728                    "title": "Demo item",
1729                    "collection": ["opensource"]
1730                }
1731            }))
1732        }
1733
1734        let app = Router::new().route("/metadata/demo-item", get(metadata));
1735        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1736        let addr = listener.local_addr().unwrap();
1737        let server = tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });
1738
1739        let client = InternetArchiveClient::builder()
1740            .endpoint(Endpoint::custom(
1741                Url::parse(&format!("http://{addr}/")).unwrap(),
1742                Url::parse(&format!("http://{addr}/s3/")).unwrap(),
1743            ))
1744            .build()
1745            .unwrap();
1746
1747        let response = client
1748            .update_item_metadata(
1749                &ItemIdentifier::new("demo-item").unwrap(),
1750                &ItemMetadata::builder()
1751                    .title("Demo item")
1752                    .collection("opensource")
1753                    .build(),
1754            )
1755            .await
1756            .unwrap();
1757        assert!(response.success);
1758        assert_eq!(response.task_id, None);
1759
1760        server.abort();
1761    }
1762
1763    #[test]
1764    fn builder_accessors_env_helpers_and_wait_until_paths_work() {
1765        static ENV_LOCK: OnceLock<std::sync::Mutex<()>> = OnceLock::new();
1766
1767        let _guard = ENV_LOCK
1768            .get_or_init(|| std::sync::Mutex::new(()))
1769            .lock()
1770            .unwrap();
1771
1772        let custom_access = "IA_TEST_ACCESS_KEY";
1773        let custom_secret = "IA_TEST_SECRET_KEY";
1774        std::env::set_var(custom_access, "custom-access");
1775        std::env::set_var(custom_secret, "custom-secret");
1776        std::env::set_var(Auth::ACCESS_KEY_ENV_VAR, "default-access");
1777        std::env::set_var(Auth::SECRET_KEY_ENV_VAR, "default-secret");
1778
1779        let auth = Auth::from_env_vars(custom_access, custom_secret).unwrap();
1780        assert_eq!(
1781            auth.authorization_header(),
1782            "LOW custom-access:custom-secret"
1783        );
1784        assert_eq!(
1785            Auth::from_env().unwrap().authorization_header(),
1786            "LOW default-access:default-secret"
1787        );
1788        assert!(matches!(
1789            Auth::from_env_vars("MISSING_ACCESS", custom_secret).unwrap_err(),
1790            InternetArchiveError::EnvVar { name, .. } if name == "MISSING_ACCESS"
1791        ));
1792
1793        let poll = PollOptions {
1794            max_wait: Duration::from_secs(1),
1795            initial_delay: Duration::from_millis(5),
1796            max_delay: Duration::from_millis(10),
1797        };
1798        let endpoint = Endpoint::custom(
1799            Url::parse("http://localhost:3000/archive").unwrap(),
1800            Url::parse("http://localhost:3000/s3").unwrap(),
1801        );
1802        let client = InternetArchiveClient::builder()
1803            .auth(auth.clone())
1804            .endpoint(endpoint.clone())
1805            .user_agent("internetarchive-rs-tests")
1806            .request_timeout(Duration::from_secs(5))
1807            .connect_timeout(Duration::from_secs(1))
1808            .poll_options(poll.clone())
1809            .build()
1810            .unwrap();
1811
1812        assert!(client.has_auth());
1813        assert_eq!(client.endpoint(), &endpoint);
1814        assert_eq!(client.poll_options(), &poll);
1815        assert_eq!(client.request_timeout(), Some(Duration::from_secs(5)));
1816        assert_eq!(client.connect_timeout(), Some(Duration::from_secs(1)));
1817        assert!(!InternetArchiveClient::new().unwrap().has_auth());
1818        assert!(InternetArchiveClient::with_auth(auth).unwrap().has_auth());
1819        assert!(InternetArchiveClient::from_env().unwrap().has_auth());
1820
1821        let runtime = tokio::runtime::Runtime::new().unwrap();
1822        let mut attempts = 0_u8;
1823        runtime.block_on(async {
1824            let value = client
1825                .wait_until("demo wait", || {
1826                    attempts += 1;
1827                    async move {
1828                        if attempts < 3 {
1829                            Err(InternetArchiveError::ItemNotFound {
1830                                identifier: "demo-item".to_owned(),
1831                            })
1832                        } else {
1833                            Ok("ready")
1834                        }
1835                    }
1836                })
1837                .await
1838                .unwrap();
1839            assert_eq!(value, "ready");
1840
1841            let mut transient_attempts = 0_u8;
1842            let recovered = client
1843                .wait_until("demo transient", || {
1844                    transient_attempts += 1;
1845                    async move {
1846                        if transient_attempts < 3 {
1847                            Err(InternetArchiveError::Http {
1848                                status: StatusCode::BAD_GATEWAY,
1849                                code: None,
1850                                message: Some("temporary outage".to_owned()),
1851                                raw_body: None,
1852                            })
1853                        } else {
1854                            Ok("recovered")
1855                        }
1856                    }
1857                })
1858                .await
1859                .unwrap();
1860            assert_eq!(recovered, "recovered");
1861
1862            let error = client
1863                .wait_until("demo error", || async {
1864                    Err::<(), _>(InternetArchiveError::InvalidState("boom".to_owned()))
1865                })
1866                .await
1867                .unwrap_err();
1868            assert!(
1869                matches!(error, InternetArchiveError::InvalidState(message) if message == "boom")
1870            );
1871
1872            let timeout = client
1873                .wait_until("demo timeout", || async {
1874                    Err::<(), _>(InternetArchiveError::ItemNotFound {
1875                        identifier: "demo-item".to_owned(),
1876                    })
1877                })
1878                .await
1879                .unwrap_err();
1880            assert!(matches!(
1881                timeout,
1882                InternetArchiveError::Timeout("demo timeout")
1883            ));
1884        });
1885
1886        std::env::remove_var(custom_access);
1887        std::env::remove_var(custom_secret);
1888        std::env::remove_var(Auth::ACCESS_KEY_ENV_VAR);
1889        std::env::remove_var(Auth::SECRET_KEY_ENV_VAR);
1890    }
1891
1892    #[tokio::test]
1893    async fn missing_auth_and_http_error_paths_are_reported() {
1894        async fn metadata() -> Json<Value> {
1895            Json(json!({
1896                "files": [],
1897                "metadata": {
1898                    "identifier": "demo-item",
1899                    "title": "Old title"
1900                }
1901            }))
1902        }
1903
1904        async fn blank_metadata() -> &'static str {
1905            "   "
1906        }
1907
1908        async fn non_item_metadata() -> Json<Value> {
1909            Json(json!({
1910                "error": "identifier not found",
1911                "success": false
1912            }))
1913        }
1914
1915        async fn mismatched_metadata() -> Json<Value> {
1916            Json(json!({
1917                "files": [],
1918                "metadata": {
1919                    "identifier": "other-item",
1920                    "title": "Wrong item"
1921                }
1922            }))
1923        }
1924
1925        async fn search_error() -> (StatusCode, Json<Value>) {
1926            (
1927                StatusCode::BAD_GATEWAY,
1928                Json(json!({"error":"search failed","code":"bad_gateway"})),
1929            )
1930        }
1931
1932        async fn metadata_error() -> (StatusCode, Json<Value>) {
1933            (
1934                StatusCode::BAD_REQUEST,
1935                Json(json!({"error":"metadata failed","code":"bad_request"})),
1936            )
1937        }
1938
1939        async fn download_error() -> (StatusCode, &'static str) {
1940            (StatusCode::BAD_GATEWAY, "download failed")
1941        }
1942
1943        async fn missing_location() -> StatusCode {
1944            StatusCode::TEMPORARY_REDIRECT
1945        }
1946
1947        async fn failing_upload() -> (StatusCode, &'static str) {
1948            (StatusCode::INTERNAL_SERVER_ERROR, "upload failed")
1949        }
1950
1951        let app = Router::new()
1952            .route("/metadata/demo-item", get(metadata).post(metadata_error))
1953            .route("/metadata/blank-item", get(blank_metadata))
1954            .route("/metadata/non-item", get(non_item_metadata))
1955            .route("/metadata/mismatched-item", get(mismatched_metadata))
1956            .route("/advancedsearch.php", get(search_error))
1957            .route("/download/demo-item/missing.txt", get(download_error))
1958            .route("/s3/demo-item/missing-location.bin", put(missing_location))
1959            .route("/s3/demo-item/failing.bin", put(failing_upload));
1960        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1961        let addr = listener.local_addr().unwrap();
1962        let server = tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });
1963
1964        let unauth = unauthenticated_test_client(addr);
1965        let auth = test_client(addr);
1966        let identifier = ItemIdentifier::new("demo-item").unwrap();
1967
1968        assert!(matches!(
1969            unauth.get_item_by_str("bad item").await.unwrap_err(),
1970            InternetArchiveError::Identifier(_)
1971        ));
1972        assert!(matches!(
1973            unauth.check_upload_limit(&identifier).await.unwrap_err(),
1974            InternetArchiveError::MissingAuth
1975        ));
1976        assert!(matches!(
1977            unauth
1978                .apply_metadata_patch(
1979                    &identifier,
1980                    MetadataTarget::Metadata,
1981                    &[PatchOperation::replace("/title", "New title")],
1982                )
1983                .await
1984                .unwrap_err(),
1985            InternetArchiveError::MissingAuth
1986        ));
1987        assert!(matches!(
1988            unauth
1989                .apply_metadata_changes(
1990                    &identifier,
1991                    &[MetadataChange {
1992                        target: "metadata".to_owned(),
1993                        patch: vec![PatchOperation::add("/subject/-", "rust")],
1994                    }],
1995                )
1996                .await
1997                .unwrap_err(),
1998            InternetArchiveError::MissingAuth
1999        ));
2000        assert!(matches!(
2001            unauth
2002                .upload_file(
2003                    &identifier,
2004                    &UploadSpec::from_bytes("demo.txt", b"hello"),
2005                    &UploadOptions::default(),
2006                )
2007                .await
2008                .unwrap_err(),
2009            InternetArchiveError::MissingAuth
2010        ));
2011        assert!(matches!(
2012            unauth
2013                .create_item(
2014                    &identifier,
2015                    &ItemMetadata::builder().title("Demo").build(),
2016                    &UploadSpec::from_bytes("demo.txt", b"hello"),
2017                    &UploadOptions::default(),
2018                )
2019                .await
2020                .unwrap_err(),
2021            InternetArchiveError::MissingAuth
2022        ));
2023        assert!(matches!(
2024            unauth
2025                .delete_file(&identifier, "demo.txt", &DeleteOptions::default())
2026                .await
2027                .unwrap_err(),
2028            InternetArchiveError::MissingAuth
2029        ));
2030        assert!(matches!(
2031            unauth
2032                .update_item_metadata(
2033                    &identifier,
2034                    &ItemMetadata::builder().title("New title").build(),
2035                )
2036                .await
2037                .unwrap_err(),
2038            InternetArchiveError::MissingAuth
2039        ));
2040        assert!(matches!(
2041            unauth.make_dark(&identifier, "cleanup").await.unwrap_err(),
2042            InternetArchiveError::MissingAuth
2043        ));
2044
2045        assert!(matches!(
2046            auth.get_item(&ItemIdentifier::new("blank-item").unwrap())
2047                .await
2048                .unwrap_err(),
2049            InternetArchiveError::ItemNotFound { .. }
2050        ));
2051        assert!(matches!(
2052            auth.get_item(&ItemIdentifier::new("non-item").unwrap())
2053                .await
2054                .unwrap_err(),
2055            InternetArchiveError::ItemNotFound { .. }
2056        ));
2057        assert!(matches!(
2058            auth.get_item(&ItemIdentifier::new("mismatched-item").unwrap())
2059                .await
2060                .unwrap_err(),
2061            InternetArchiveError::ItemNotFound { .. }
2062        ));
2063        assert!(matches!(
2064            auth.search(&SearchQuery::identifier("demo-item"))
2065                .await
2066                .unwrap_err(),
2067            InternetArchiveError::Http { status, .. } if status == StatusCode::BAD_GATEWAY
2068        ));
2069        assert!(matches!(
2070            auth.download_bytes(&identifier, "missing.txt")
2071                .await
2072                .unwrap_err(),
2073            InternetArchiveError::Http { status, .. } if status == StatusCode::BAD_GATEWAY
2074        ));
2075        assert!(matches!(
2076            auth.apply_metadata_patch(
2077                &identifier,
2078                MetadataTarget::Metadata,
2079                &[PatchOperation::replace("/title", "New title")],
2080            )
2081            .await
2082            .unwrap_err(),
2083            InternetArchiveError::Http { status, .. } if status == StatusCode::BAD_REQUEST
2084        ));
2085        assert!(matches!(
2086            auth.upload_file(
2087                &identifier,
2088                &UploadSpec::from_bytes("missing-location.bin", b"hello"),
2089                &UploadOptions::default(),
2090            )
2091            .await
2092            .unwrap_err(),
2093            InternetArchiveError::InvalidState(message) if message.contains("missing location")
2094        ));
2095        assert!(matches!(
2096            auth.upload_file(
2097                &identifier,
2098                &UploadSpec::from_bytes("failing.bin", b"hello"),
2099                &UploadOptions::default(),
2100            )
2101            .await
2102            .unwrap_err(),
2103            InternetArchiveError::Http { status, .. } if status == StatusCode::INTERNAL_SERVER_ERROR
2104        ));
2105
2106        server.abort();
2107    }
2108
2109    #[tokio::test]
2110    async fn s3_create_helpers_reject_bucket_unsafe_identifiers_before_network_access() {
2111        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2112        let client = test_client(listener.local_addr().unwrap());
2113        let spec = UploadSpec::from_bytes("demo.txt", b"hello");
2114        let options = UploadOptions::default();
2115        let too_long =
2116            ItemIdentifier::new("a".repeat(ItemIdentifier::MAX_BUCKET_IDENTIFIER_LEN + 1)).unwrap();
2117        let uppercase = ItemIdentifier::new("Demo-item").unwrap();
2118
2119        assert!(matches!(
2120            client
2121                .create_item(
2122                    &too_long,
2123                    &ItemMetadata::builder().title("Demo").build(),
2124                    &spec,
2125                    &options,
2126                )
2127                .await
2128                .unwrap_err(),
2129            InternetArchiveError::Identifier(IdentifierError::TooLongForBucketCreation {
2130                max: ItemIdentifier::MAX_BUCKET_IDENTIFIER_LEN,
2131                ..
2132            })
2133        ));
2134
2135        assert!(matches!(
2136            client
2137                .create_item(
2138                    &uppercase,
2139                    &ItemMetadata::builder().title("Demo").build(),
2140                    &spec,
2141                    &options,
2142                )
2143                .await
2144                .unwrap_err(),
2145            InternetArchiveError::Identifier(IdentifierError::InvalidBucketCreationCharacter {
2146                character: 'D',
2147                ..
2148            })
2149        ));
2150    }
2151
2152    #[tokio::test]
2153    async fn existing_item_s3_helpers_and_limit_check_allow_documented_non_bucket_creation_identifiers(
2154    ) {
2155        async fn upload() -> StatusCode {
2156            StatusCode::OK
2157        }
2158
2159        async fn delete() -> StatusCode {
2160            StatusCode::NO_CONTENT
2161        }
2162
2163        async fn limit_check(uri: Uri) -> Json<Value> {
2164            let query = uri.query().unwrap();
2165            assert!(query.contains("check_limit=1"));
2166            assert!(query.contains("accesskey=access"));
2167            assert!(query.contains("bucket=Demo_Item"));
2168            Json(json!({
2169                "bucket": "Demo_Item",
2170                "accesskey": "access",
2171                "over_limit": 0,
2172            }))
2173        }
2174
2175        let app = Router::new()
2176            .route("/s3/", get(limit_check))
2177            .route("/s3/Demo_Item/demo.txt", put(upload).delete(delete));
2178        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2179        let addr = listener.local_addr().unwrap();
2180        let server = tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });
2181        let client = test_client(addr);
2182        let identifier = ItemIdentifier::new("Demo_Item").unwrap();
2183        let spec = UploadSpec::from_bytes("demo.txt", b"hello");
2184
2185        client
2186            .upload_file(&identifier, &spec, &UploadOptions::default())
2187            .await
2188            .unwrap();
2189        client
2190            .delete_file(&identifier, "demo.txt", &DeleteOptions::default())
2191            .await
2192            .unwrap();
2193        assert_eq!(
2194            client.check_upload_limit(&identifier).await.unwrap().bucket,
2195            "Demo_Item"
2196        );
2197
2198        server.abort();
2199    }
2200
2201    #[cfg(feature = "indicatif")]
2202    #[tokio::test]
2203    async fn create_item_with_progress_handles_redirects_and_metadata_remainders() {
2204        async fn metadata(State(state): State<std::sync::Arc<StateData>>) -> Json<Value> {
2205            let mut reads = state.metadata_reads.lock().await;
2206            let payload = if *reads < 2 {
2207                json!({
2208                    "files": [{"name": "demo.txt", "size": "5"}],
2209                    "metadata": {
2210                        "identifier": "demo-item",
2211                        "title": "Demo item"
2212                    }
2213                })
2214            } else {
2215                json!({
2216                    "files": [{"name": "demo.txt", "size": "5"}],
2217                    "metadata": {
2218                        "identifier": "demo-item",
2219                        "title": "Demo item",
2220                        "custom": {"nested": true}
2221                    }
2222                })
2223            };
2224            *reads += 1;
2225            Json(payload)
2226        }
2227
2228        async fn metadata_write(
2229            State(state): State<std::sync::Arc<StateData>>,
2230            body: String,
2231        ) -> (StatusCode, Json<Value>) {
2232            state.captured_mdapi_body.lock().await.push(body);
2233            (
2234                StatusCode::OK,
2235                Json(json!({
2236                    "success": true,
2237                    "task_id": 200,
2238                    "log": "https://catalogd.archive.org/log/200"
2239                })),
2240            )
2241        }
2242
2243        async fn first_upload() -> (StatusCode, HeaderMap) {
2244            let mut headers = HeaderMap::new();
2245            headers.insert(
2246                LOCATION,
2247                HeaderValue::from_static("/s3-redirected/demo-item/demo.txt"),
2248            );
2249            (StatusCode::TEMPORARY_REDIRECT, headers)
2250        }
2251
2252        async fn redirected_upload(body: String) -> StatusCode {
2253            assert_eq!(body, "hello");
2254            StatusCode::OK
2255        }
2256
2257        let state = std::sync::Arc::new(StateData::default());
2258        let app = Router::new()
2259            .route("/metadata/demo-item", get(metadata).post(metadata_write))
2260            .route("/s3/demo-item/demo.txt", put(first_upload))
2261            .route("/s3-redirected/demo-item/demo.txt", put(redirected_upload))
2262            .with_state(state.clone());
2263
2264        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2265        let addr = listener.local_addr().unwrap();
2266        let server = tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });
2267        let client = test_client(addr);
2268        let identifier = ItemIdentifier::new("demo-item").unwrap();
2269        let progress = ProgressBar::hidden();
2270
2271        client
2272            .create_item_with_progress(
2273                &identifier,
2274                &ItemMetadata::builder()
2275                    .title("Demo item")
2276                    .extra_json("custom", json!({"nested": true}))
2277                    .build(),
2278                &UploadSpec::from_bytes("demo.txt", b"hello"),
2279                &UploadOptions::default(),
2280                &progress,
2281            )
2282            .await
2283            .unwrap();
2284
2285        assert_eq!(progress.length(), Some(5));
2286        assert_eq!(progress.position(), 5);
2287        assert!(progress.is_finished());
2288        assert_eq!(*state.metadata_reads.lock().await, 3);
2289        assert!(state.captured_mdapi_body.lock().await[0].contains("custom"));
2290
2291        server.abort();
2292    }
2293
2294    #[cfg(feature = "indicatif")]
2295    #[tokio::test]
2296    async fn progress_upload_reports_missing_location_foreign_redirect_and_http_errors() {
2297        async fn missing_location() -> StatusCode {
2298            StatusCode::TEMPORARY_REDIRECT
2299        }
2300
2301        async fn foreign_redirect() -> (StatusCode, HeaderMap) {
2302            let mut headers = HeaderMap::new();
2303            headers.insert(
2304                LOCATION,
2305                HeaderValue::from_static("http://example.com/stolen/demo-item/foreign.bin"),
2306            );
2307            (StatusCode::TEMPORARY_REDIRECT, headers)
2308        }
2309
2310        async fn failing_upload() -> (StatusCode, &'static str) {
2311            (StatusCode::INTERNAL_SERVER_ERROR, "upload failed")
2312        }
2313
2314        let app = Router::new()
2315            .route("/s3/demo-item/missing-location.bin", put(missing_location))
2316            .route("/s3/demo-item/foreign.bin", put(foreign_redirect))
2317            .route("/s3/demo-item/failing.bin", put(failing_upload));
2318        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2319        let addr = listener.local_addr().unwrap();
2320        let server = tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });
2321        let client = test_client(addr);
2322        let identifier = ItemIdentifier::new("demo-item").unwrap();
2323
2324        let missing_progress = ProgressBar::hidden();
2325        assert!(matches!(
2326            client
2327                .upload_file_with_progress(
2328                    &identifier,
2329                    &UploadSpec::from_bytes("missing-location.bin", b"hello"),
2330                    &UploadOptions::default(),
2331                    &missing_progress,
2332                )
2333                .await
2334                .unwrap_err(),
2335            InternetArchiveError::InvalidState(message) if message.contains("missing location")
2336        ));
2337
2338        let foreign_progress = ProgressBar::hidden();
2339        assert!(matches!(
2340            client
2341                .upload_file_with_progress(
2342                    &identifier,
2343                    &UploadSpec::from_bytes("foreign.bin", b"hello"),
2344                    &UploadOptions::default(),
2345                    &foreign_progress,
2346                )
2347                .await
2348                .unwrap_err(),
2349            InternetArchiveError::InvalidState(message)
2350                if message.contains("redirected S3 host")
2351        ));
2352
2353        let failing_progress = ProgressBar::hidden();
2354        assert!(matches!(
2355            client
2356                .upload_file_with_progress(
2357                    &identifier,
2358                    &UploadSpec::from_bytes("failing.bin", b"hello"),
2359                    &UploadOptions::default(),
2360                    &failing_progress,
2361                )
2362                .await
2363                .unwrap_err(),
2364            InternetArchiveError::Http { status, .. } if status == StatusCode::INTERNAL_SERVER_ERROR
2365        ));
2366
2367        server.abort();
2368    }
2369
2370    #[cfg(feature = "indicatif")]
2371    #[tokio::test]
2372    async fn redirect_edge_cases_are_reported_for_plain_and_progress_uploads() {
2373        async fn endless_redirect() -> (StatusCode, HeaderMap) {
2374            let mut headers = HeaderMap::new();
2375            headers.insert(LOCATION, HeaderValue::from_static("/s3/demo-item/spin.bin"));
2376            (StatusCode::TEMPORARY_REDIRECT, headers)
2377        }
2378
2379        async fn invalid_location() -> (StatusCode, HeaderMap) {
2380            let mut headers = HeaderMap::new();
2381            headers.insert(
2382                LOCATION,
2383                HeaderValue::from_bytes(b"/s3/demo-item/\xff.bin").unwrap(),
2384            );
2385            (StatusCode::TEMPORARY_REDIRECT, headers)
2386        }
2387
2388        let app = Router::new()
2389            .route("/s3/demo-item/spin.bin", put(endless_redirect))
2390            .route("/s3/demo-item/bad-location.bin", put(invalid_location));
2391        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2392        let addr = listener.local_addr().unwrap();
2393        let server = tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });
2394        let client = test_client(addr);
2395        let identifier = ItemIdentifier::new("demo-item").unwrap();
2396
2397        assert!(matches!(
2398            client
2399                .upload_file(
2400                    &identifier,
2401                    &UploadSpec::from_bytes("spin.bin", b"hello"),
2402                    &UploadOptions::default(),
2403                )
2404                .await
2405                .unwrap_err(),
2406            InternetArchiveError::InvalidState(message) if message.contains("too many redirects")
2407        ));
2408        assert!(matches!(
2409            client
2410                .upload_file(
2411                    &identifier,
2412                    &UploadSpec::from_bytes("bad-location.bin", b"hello"),
2413                    &UploadOptions::default(),
2414                )
2415                .await
2416                .unwrap_err(),
2417            InternetArchiveError::InvalidState(message)
2418                if message.contains("not valid UTF-8")
2419        ));
2420
2421        let spin_progress = ProgressBar::hidden();
2422        assert!(matches!(
2423            client
2424                .upload_file_with_progress(
2425                    &identifier,
2426                    &UploadSpec::from_bytes("spin.bin", b"hello"),
2427                    &UploadOptions::default(),
2428                    &spin_progress,
2429                )
2430                .await
2431                .unwrap_err(),
2432            InternetArchiveError::InvalidState(message) if message.contains("too many redirects")
2433        ));
2434
2435        let bad_progress = ProgressBar::hidden();
2436        assert!(matches!(
2437            client
2438                .upload_file_with_progress(
2439                    &identifier,
2440                    &UploadSpec::from_bytes("bad-location.bin", b"hello"),
2441                    &UploadOptions::default(),
2442                    &bad_progress,
2443                )
2444                .await
2445                .unwrap_err(),
2446            InternetArchiveError::InvalidState(message)
2447                if message.contains("not valid UTF-8")
2448        ));
2449
2450        server.abort();
2451    }
2452
2453    #[cfg(feature = "indicatif")]
2454    #[tokio::test]
2455    async fn download_bytes_with_progress_reports_http_errors() {
2456        async fn download_error() -> (StatusCode, &'static str) {
2457            (StatusCode::BAD_GATEWAY, "download failed")
2458        }
2459
2460        let app = Router::new().route("/download/demo-item/missing.txt", get(download_error));
2461        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2462        let addr = listener.local_addr().unwrap();
2463        let server = tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });
2464        let client = test_client(addr);
2465        let progress = ProgressBar::hidden();
2466
2467        assert!(matches!(
2468            client
2469                .download_bytes_with_progress(&ItemIdentifier::new("demo-item").unwrap(), "missing.txt", &progress)
2470                .await
2471                .unwrap_err(),
2472            InternetArchiveError::Http { status, .. } if status == StatusCode::BAD_GATEWAY
2473        ));
2474
2475        server.abort();
2476    }
2477
2478    #[cfg(feature = "indicatif")]
2479    #[tokio::test]
2480    async fn replayable_body_apply_with_progress_sets_lengths_for_paths_and_bytes() {
2481        let client = reqwest::Client::new();
2482
2483        let bytes_progress = ProgressBar::hidden();
2484        let _bytes_request = ReplayableBody::Bytes(b"hello".to_vec())
2485            .apply_with_progress(client.put("http://example.com/bytes"), &bytes_progress)
2486            .await
2487            .unwrap();
2488        assert_eq!(bytes_progress.length(), Some(5));
2489        assert_eq!(bytes_progress.position(), 0);
2490
2491        let directory = tempfile::tempdir().unwrap();
2492        let path = directory.path().join("demo.txt");
2493        tokio::fs::write(&path, b"hello").await.unwrap();
2494
2495        let path_progress = ProgressBar::hidden();
2496        let _path_request = ReplayableBody::Path { path, length: 5 }
2497            .apply_with_progress(client.put("http://example.com/path"), &path_progress)
2498            .await
2499            .unwrap();
2500        assert_eq!(path_progress.length(), Some(5));
2501        assert_eq!(path_progress.position(), 0);
2502    }
2503
2504    #[cfg(feature = "indicatif")]
2505    #[test]
2506    fn progress_stream_and_chunked_bytes_stream_cover_poll_paths() {
2507        let payload = Bytes::from_static(b"hello world");
2508        let progress = ProgressBar::hidden();
2509        let mut stream =
2510            ProgressStream::new(ChunkedBytesStream::new(payload.clone()), progress.clone());
2511        let waker = Waker::noop();
2512        let mut context = Context::from_waker(waker);
2513
2514        let first = Pin::new(&mut stream).poll_next(&mut context);
2515        assert!(matches!(first, Poll::Ready(Some(Ok(ref chunk))) if chunk == &payload));
2516        assert_eq!(progress.position(), payload.len() as u64);
2517
2518        let second = Pin::new(&mut stream).poll_next(&mut context);
2519        assert!(matches!(second, Poll::Ready(None)));
2520    }
2521}