// s3/bucket.rs
1//! # Rust S3 Bucket Operations
2//!
3//! This module provides functionality for interacting with S3 buckets and objects,
4//! including creating, listing, uploading, downloading, and deleting objects. It supports
5//! various features such as asynchronous and blocking operations, multipart uploads,
6//! presigned URLs, and tagging objects.
7//!
8//! ## Features
9//!
10//! The module supports the following features:
11//!
12//! - **blocking**: Enables blocking (synchronous) operations using the `block_on` macro.
13//! - **tags**: Adds support for managing S3 object tags.
14//! - **with-tokio**: Enables asynchronous operations using the Tokio runtime.
15//! - **with-async-std**: Enables asynchronous operations using the async-std runtime.
16//! - **sync**: Enables synchronous (blocking) operations using standard Rust synchronization primitives.
17//!
18//! ## Constants
19//!
20//! - `CHUNK_SIZE`: Defines the chunk size for multipart uploads (8 MiB).
21//! - `DEFAULT_REQUEST_TIMEOUT`: The default request timeout (60 seconds).
22//!
23//! ## Types
24//!
25//! - `Query`: A type alias for `HashMap<String, String>`, representing query parameters for requests.
26//!
27//! ## Structs
28//!
29//! - `Bucket`: Represents an S3 bucket, providing methods to interact with the bucket and its contents.
30//! - `Tag`: Represents a key-value pair used for tagging S3 objects.
31//!
32//! ## Errors
33//!
34//! - `S3Error`: Represents various errors that can occur during S3 operations.
35
36#[cfg(feature = "blocking")]
37use block_on_proc::block_on;
38#[cfg(feature = "tags")]
39use minidom::Element;
40use std::collections::HashMap;
41use std::time::Duration;
42
43use crate::bucket_ops::{BucketConfiguration, CreateBucketResponse};
44use crate::command::{Command, Multipart};
45use crate::creds::Credentials;
46use crate::region::Region;
47#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
48use crate::request::ResponseDataStream;
49#[cfg(feature = "with-tokio")]
50use crate::request::tokio_backend::ClientOptions;
51#[cfg(feature = "with-tokio")]
52use crate::request::tokio_backend::client;
53use crate::request::{Request as _, ResponseData};
54use std::str::FromStr;
55use std::sync::Arc;
56
57#[cfg(feature = "with-tokio")]
58use tokio::sync::RwLock;
59
60#[cfg(feature = "with-async-std")]
61use async_std::sync::RwLock;
62
63#[cfg(feature = "sync")]
64use std::sync::RwLock;
65
66pub type Query = HashMap<String, String>;
67
68#[cfg(feature = "with-async-std")]
69use crate::request::async_std_backend::SurfRequest as RequestImpl;
70#[cfg(feature = "with-tokio")]
71use crate::request::tokio_backend::ReqwestRequest as RequestImpl;
72
73#[cfg(feature = "with-async-std")]
74use async_std::io::Write as AsyncWrite;
75#[cfg(feature = "with-tokio")]
76use tokio::io::AsyncWrite;
77
78#[cfg(feature = "sync")]
79use crate::request::blocking::AttoRequest as RequestImpl;
80use std::io::Read;
81
82#[cfg(feature = "with-tokio")]
83use tokio::io::AsyncRead;
84
85#[cfg(feature = "with-async-std")]
86use async_std::io::Read as AsyncRead;
87
88use crate::PostPolicy;
89use crate::error::S3Error;
90use crate::post_policy::PresignedPost;
91use crate::serde_types::{
92    BucketLifecycleConfiguration, BucketLocationResult, CompleteMultipartUploadData,
93    CorsConfiguration, DeleteObjectsRequest, DeleteObjectsResult, GetObjectAttributesOutput,
94    HeadObjectResult, InitiateMultipartUploadResponse, ListBucketResult,
95    ListMultipartUploadsResult, ObjectIdentifier, Part,
96};
97#[allow(unused_imports)]
98use crate::utils::{PutStreamResponse, error_from_response_data};
99use http::HeaderMap;
100use http::header::HeaderName;
101#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
102use sysinfo::{MemoryRefreshKind, System};
103
/// Chunk size used for multipart uploads: 8 MiB per part.
/// S3 requires every part except the last to be at least 5 MiB (5_242_880 bytes).
pub const CHUNK_SIZE: usize = 8_388_608; // 8 Mebibytes, min is 5 (5_242_880);

/// Default timeout applied to every request (60 seconds) unless overridden
/// via `Bucket::with_request_timeout`.
const DEFAULT_REQUEST_TIMEOUT: Option<Duration> = Some(Duration::from_secs(60));
107
/// A single S3 object tag: an owned key/value pair.
#[derive(Debug, PartialEq, Eq)]
pub struct Tag {
    key: String,
    value: String,
}

impl Tag {
    /// Returns an owned copy of the tag's key.
    pub fn key(&self) -> String {
        self.key.clone()
    }

    /// Returns an owned copy of the tag's value.
    pub fn value(&self) -> String {
        self.value.clone()
    }
}
123
/// Instantiate an existing Bucket
///
/// # Example
///
/// ```no_run
/// use s3::bucket::Bucket;
/// use s3::creds::Credentials;
///
/// let bucket_name = "rust-s3-test";
/// let region = "us-east-1".parse().unwrap();
/// let credentials = Credentials::default().unwrap();
///
/// let bucket = Bucket::new(bucket_name, region, credentials);
/// ```
#[derive(Clone, Debug)]
pub struct Bucket {
    /// The bucket name.
    pub name: String,
    /// The region the bucket lives in (or a custom endpoint for S3-compatible services).
    pub region: Region,
    // Shared behind a lock so credential refreshes are visible to every clone
    // of this bucket (see `credentials_refresh`).
    credentials: Arc<RwLock<Credentials>>,
    /// Extra headers appended to every request made through this bucket.
    pub extra_headers: HeaderMap,
    /// Extra query parameters appended to every request.
    pub extra_query: Query,
    /// Per-request timeout; defaults to `DEFAULT_REQUEST_TIMEOUT` (60 s).
    pub request_timeout: Option<Duration>,
    // When true, use path-style addressing instead of virtual-host style;
    // set via `with_path_style`.
    path_style: bool,
    // When true, list objects with the ListObjectsV2 API; `with_listobjects_v1`
    // switches back to the legacy v1 listing.
    listobjects_v2: bool,
    // Reused reqwest client (tokio backend only).
    #[cfg(feature = "with-tokio")]
    http_client: reqwest::Client,
    // The options `http_client` was built from; kept so builder-style methods
    // (`set_proxy`, `set_dangerous_config`, …) can rebuild the client with
    // one setting changed.
    #[cfg(feature = "with-tokio")]
    client_options: crate::request::tokio_backend::ClientOptions,
}
153
154impl Bucket {
155    #[maybe_async::async_impl]
156    /// Credential refreshing is done automatically, but can be manually triggered.
157    pub async fn credentials_refresh(&self) -> Result<(), S3Error> {
158        Ok(self.credentials.write().await.refresh()?)
159    }
160
161    #[maybe_async::sync_impl]
162    /// Credential refreshing is done automatically, but can be manually triggered.
163    pub fn credentials_refresh(&self) -> Result<(), S3Error> {
164        match self.credentials.write() {
165            Ok(mut credentials) => Ok(credentials.refresh()?),
166            Err(_) => Err(S3Error::CredentialsWriteLock),
167        }
168    }
169
170    #[cfg(feature = "with-tokio")]
171    pub fn http_client(&self) -> reqwest::Client {
172        self.http_client.clone()
173    }
174}
175
176fn validate_expiry(expiry_secs: u32) -> Result<(), S3Error> {
177    if 604800 < expiry_secs {
178        return Err(S3Error::MaxExpiry(expiry_secs));
179    }
180    Ok(())
181}
182
183#[cfg_attr(all(feature = "with-tokio", feature = "blocking"), block_on("tokio"))]
184#[cfg_attr(
185    all(feature = "with-async-std", feature = "blocking"),
186    block_on("async-std")
187)]
188impl Bucket {
189    /// Get a presigned url for getting object on a given path
190    ///
191    /// # Example:
192    ///
193    /// ```no_run
194    /// use std::collections::HashMap;
195    /// use s3::bucket::Bucket;
196    /// use s3::creds::Credentials;
197    ///
198    /// #[tokio::main]
199    /// async fn main() {
200    /// let bucket_name = "rust-s3-test";
201    /// let region = "us-east-1".parse().unwrap();
202    /// let credentials = Credentials::default().unwrap();
203    /// let bucket = Bucket::new(bucket_name, region, credentials).unwrap();
204    ///
205    /// // Add optional custom queries
206    /// let mut custom_queries = HashMap::new();
207    /// custom_queries.insert(
208    ///    "response-content-disposition".into(),
209    ///    "attachment; filename=\"test.png\"".into(),
210    /// );
211    ///
212    /// let url = bucket.presign_get("/test.file", 86400, Some(custom_queries)).await.unwrap();
213    /// println!("Presigned url: {}", url);
214    /// }
215    /// ```
216    #[maybe_async::maybe_async]
217    pub async fn presign_get<S: AsRef<str>>(
218        &self,
219        path: S,
220        expiry_secs: u32,
221        custom_queries: Option<HashMap<String, String>>,
222    ) -> Result<String, S3Error> {
223        validate_expiry(expiry_secs)?;
224        let request = RequestImpl::new(
225            self,
226            path.as_ref(),
227            Command::PresignGet {
228                expiry_secs,
229                custom_queries,
230            },
231        )
232        .await?;
233        request.presigned().await
234    }
235
236    /// Get a presigned url for posting an object to a given path
237    ///
238    /// # Example:
239    ///
240    /// ```no_run
241    /// use s3::bucket::Bucket;
242    /// use s3::creds::Credentials;
243    /// use s3::post_policy::*;
244    /// use std::borrow::Cow;
245    ///
246    /// #[tokio::main]
247    /// async fn main() {
248    /// let bucket_name = "rust-s3-test";
249    /// let region = "us-east-1".parse().unwrap();
250    /// let credentials = Credentials::default().unwrap();
251    /// let bucket = Bucket::new(bucket_name, region, credentials).unwrap();
252    ///
253    /// let post_policy = PostPolicy::new(86400).condition(
254    ///     PostPolicyField::Key,
255    ///     PostPolicyValue::StartsWith(Cow::from("user/user1/"))
256    /// ).unwrap();
257    ///
258    /// let presigned_post = bucket.presign_post(post_policy).await.unwrap();
259    /// println!("Presigned url: {}, fields: {:?}", presigned_post.url, presigned_post.fields);
260    /// }
261    /// ```
262    #[maybe_async::maybe_async]
263    #[allow(clippy::needless_lifetimes)]
264    pub async fn presign_post<'a>(
265        &self,
266        post_policy: PostPolicy<'a>,
267    ) -> Result<PresignedPost, S3Error> {
268        post_policy.sign(Box::new(self.clone())).await
269    }
270
271    /// Get a presigned url for putting object to a given path
272    ///
273    /// # Example:
274    ///
275    /// ```no_run
276    /// use s3::bucket::Bucket;
277    /// use s3::creds::Credentials;
278    /// use http::HeaderMap;
279    /// use http::header::HeaderName;
280    /// #[tokio::main]
281    /// async fn main() {
282    /// let bucket_name = "rust-s3-test";
283    /// let region = "us-east-1".parse().unwrap();
284    /// let credentials = Credentials::default().unwrap();
285    /// let bucket = Bucket::new(bucket_name, region, credentials).unwrap();
286    ///
287    /// // Add optional custom headers
288    /// let mut custom_headers = HeaderMap::new();
289    /// custom_headers.insert(
290    ///    HeaderName::from_static("custom_header"),
291    ///    "custom_value".parse().unwrap(),
292    /// );
293    ///
294    /// let url = bucket.presign_put("/test.file", 86400, Some(custom_headers), None).await.unwrap();
295    /// println!("Presigned url: {}", url);
296    /// }
297    /// ```
298    #[maybe_async::maybe_async]
299    pub async fn presign_put<S: AsRef<str>>(
300        &self,
301        path: S,
302        expiry_secs: u32,
303        custom_headers: Option<HeaderMap>,
304        custom_queries: Option<HashMap<String, String>>,
305    ) -> Result<String, S3Error> {
306        validate_expiry(expiry_secs)?;
307        let request = RequestImpl::new(
308            self,
309            path.as_ref(),
310            Command::PresignPut {
311                expiry_secs,
312                custom_headers,
313                custom_queries,
314            },
315        )
316        .await?;
317        request.presigned().await
318    }
319
320    /// Get a presigned url for deleting object on a given path
321    ///
322    /// # Example:
323    ///
324    /// ```no_run
325    /// use s3::bucket::Bucket;
326    /// use s3::creds::Credentials;
327    ///
328    ///
329    /// #[tokio::main]
330    /// async fn main() {
331    /// let bucket_name = "rust-s3-test";
332    /// let region = "us-east-1".parse().unwrap();
333    /// let credentials = Credentials::default().unwrap();
334    /// let bucket = Bucket::new(bucket_name, region, credentials).unwrap();
335    ///
336    /// let url = bucket.presign_delete("/test.file", 86400).await.unwrap();
337    /// println!("Presigned url: {}", url);
338    /// }
339    /// ```
340    #[maybe_async::maybe_async]
341    pub async fn presign_delete<S: AsRef<str>>(
342        &self,
343        path: S,
344        expiry_secs: u32,
345    ) -> Result<String, S3Error> {
346        validate_expiry(expiry_secs)?;
347        let request =
348            RequestImpl::new(self, path.as_ref(), Command::PresignDelete { expiry_secs }).await?;
349        request.presigned().await
350    }
351
352    /// Create a new `Bucket` and instantiate it
353    ///
354    /// ```no_run
355    /// use s3::{Bucket, BucketConfiguration};
356    /// use s3::creds::Credentials;
357    /// # use s3::region::Region;
358    /// use anyhow::Result;
359    ///
360    /// # #[tokio::main]
361    /// # async fn main() -> Result<()> {
362    /// let bucket_name = "rust-s3-test";
363    /// let region = "us-east-1".parse()?;
364    /// let credentials = Credentials::default()?;
365    /// let config = BucketConfiguration::default();
366    ///
367    /// // Async variant with `tokio` or `async-std` features
368    /// let create_bucket_response = Bucket::create(bucket_name, region, credentials, config).await?;
369    ///
370    /// // `sync` fature will produce an identical method
371    /// #[cfg(feature = "sync")]
372    /// let create_bucket_response = Bucket::create(bucket_name, region, credentials, config)?;
373    ///
374    /// # let region: Region = "us-east-1".parse()?;
375    /// # let credentials = Credentials::default()?;
376    /// # let config = BucketConfiguration::default();
377    /// // Blocking variant, generated with `blocking` feature in combination
378    /// // with `tokio` or `async-std` features.
379    /// #[cfg(feature = "blocking")]
380    /// let create_bucket_response = Bucket::create_blocking(bucket_name, region, credentials, config)?;
381    /// # Ok(())
382    /// # }
383    /// ```
384    #[maybe_async::maybe_async]
385    pub async fn create(
386        name: &str,
387        region: Region,
388        credentials: Credentials,
389        config: BucketConfiguration,
390    ) -> Result<CreateBucketResponse, S3Error> {
391        let mut config = config;
392
393        // Check if we should skip location constraint for LocalStack/Minio compatibility
394        // This env var allows users to create buckets on S3-compatible services that
395        // don't support or require location constraints in the request body
396        let skip_constraint = std::env::var("RUST_S3_SKIP_LOCATION_CONSTRAINT")
397            .unwrap_or_default()
398            .to_lowercase();
399
400        if skip_constraint != "true" && skip_constraint != "1" {
401            config.set_region(region.clone());
402        }
403
404        let command = Command::CreateBucket { config };
405        let bucket = Bucket::new(name, region, credentials)?;
406        let request = RequestImpl::new(&bucket, "", command).await?;
407        let response_data = request.response_data(false).await?;
408        let response_text = response_data.as_str()?;
409        Ok(CreateBucketResponse {
410            bucket,
411            response_text: response_text.to_string(),
412            response_code: response_data.status_code(),
413        })
414    }
415
416    /// Get a list of all existing buckets in the region
417    /// that are accessible by the given credentials.
418    /// ```no_run
419    /// use s3::{Bucket, BucketConfiguration};
420    /// use s3::creds::Credentials;
421    /// use s3::region::Region;
422    /// use anyhow::Result;
423    ///
424    /// # #[tokio::main]
425    /// # async fn main() -> Result<()> {
426    /// let region = Region::Custom {
427    ///   region: "eu-central-1".to_owned(),
428    ///   endpoint: "http://localhost:9000".to_owned()
429    /// };
430    /// let credentials = Credentials::default()?;
431    ///
432    /// // Async variant with `tokio` or `async-std` features
433    /// let response = Bucket::list_buckets(region, credentials).await?;
434    ///
435    /// // `sync` feature will produce an identical method
436    /// #[cfg(feature = "sync")]
437    /// let response = Bucket::list_buckets(region, credentials)?;
438    ///
439    /// // Blocking variant, generated with `blocking` feature in combination
440    /// // with `tokio` or `async-std` features.
441    /// #[cfg(feature = "blocking")]
442    /// let response = Bucket::list_buckets_blocking(region, credentials)?;
443    ///
444    /// let found_buckets = response.bucket_names().collect::<Vec<String>>();
445    /// println!("found buckets: {:#?}", found_buckets);
446    /// # Ok(())
447    /// # }
448    /// ```
449    #[maybe_async::maybe_async]
450    pub async fn list_buckets(
451        region: Region,
452        credentials: Credentials,
453    ) -> Result<crate::bucket_ops::ListBucketsResponse, S3Error> {
454        let dummy_bucket = Bucket::new("", region, credentials)?.with_path_style();
455        dummy_bucket._list_buckets().await
456    }
457
458    /// Internal helper method that performs the actual bucket listing operation.
459    /// Used by the public `list_buckets` method to retrieve the list of buckets for the configured client.
460    #[maybe_async::maybe_async]
461    async fn _list_buckets(&self) -> Result<crate::bucket_ops::ListBucketsResponse, S3Error> {
462        let request = RequestImpl::new(self, "", Command::ListBuckets).await?;
463        let response = request.response_data(false).await?;
464
465        Ok(quick_xml::de::from_str::<
466            crate::bucket_ops::ListBucketsResponse,
467        >(response.as_str()?)?)
468    }
469
470    /// Determine whether the instantiated bucket exists.
471    /// ```no_run
472    /// use s3::{Bucket, BucketConfiguration};
473    /// use s3::creds::Credentials;
474    /// use s3::region::Region;
475    /// use anyhow::Result;
476    ///
477    /// # #[tokio::main]
478    /// # async fn main() -> Result<()> {
479    /// let bucket_name = "some-bucket-that-is-known-to-exist";
480    /// let region = "us-east-1".parse()?;
481    /// let credentials = Credentials::default()?;
482    ///
483    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
484    ///
485    /// // Async variant with `tokio` or `async-std` features
486    /// let exists = bucket.exists().await?;
487    ///
488    /// // `sync` feature will produce an identical method
489    /// #[cfg(feature = "sync")]
490    /// let exists = bucket.exists()?;
491    ///
492    /// // Blocking variant, generated with `blocking` feature in combination
493    /// // with `tokio` or `async-std` features.
494    /// #[cfg(feature = "blocking")]
495    /// let exists = bucket.exists_blocking()?;
496    ///
497    /// assert_eq!(exists, true);
498    /// # Ok(())
499    /// # }
500    /// ```
501    #[maybe_async::maybe_async]
502    pub async fn exists(&self) -> Result<bool, S3Error> {
503        let mut dummy_bucket = self.clone();
504        dummy_bucket.name = "".into();
505
506        let response = dummy_bucket._list_buckets().await?;
507
508        Ok(response
509            .bucket_names()
510            .collect::<std::collections::HashSet<String>>()
511            .contains(&self.name))
512    }
513
514    /// Create a new `Bucket` with path style and instantiate it
515    ///
516    /// ```no_run
517    /// use s3::{Bucket, BucketConfiguration};
518    /// use s3::creds::Credentials;
519    /// # use s3::region::Region;
520    /// use anyhow::Result;
521    ///
522    /// # #[tokio::main]
523    /// # async fn main() -> Result<()> {
524    /// let bucket_name = "rust-s3-test";
525    /// let region = "us-east-1".parse()?;
526    /// let credentials = Credentials::default()?;
527    /// let config = BucketConfiguration::default();
528    ///
529    /// // Async variant with `tokio` or `async-std` features
530    /// let create_bucket_response = Bucket::create_with_path_style(bucket_name, region, credentials, config).await?;
531    ///
532    /// // `sync` fature will produce an identical method
533    /// #[cfg(feature = "sync")]
534    /// let create_bucket_response = Bucket::create_with_path_style(bucket_name, region, credentials, config)?;
535    ///
536    /// # let region: Region = "us-east-1".parse()?;
537    /// # let credentials = Credentials::default()?;
538    /// # let config = BucketConfiguration::default();
539    /// // Blocking variant, generated with `blocking` feature in combination
540    /// // with `tokio` or `async-std` features.
541    /// #[cfg(feature = "blocking")]
542    /// let create_bucket_response = Bucket::create_with_path_style_blocking(bucket_name, region, credentials, config)?;
543    /// # Ok(())
544    /// # }
545    /// ```
546    #[maybe_async::maybe_async]
547    pub async fn create_with_path_style(
548        name: &str,
549        region: Region,
550        credentials: Credentials,
551        config: BucketConfiguration,
552    ) -> Result<CreateBucketResponse, S3Error> {
553        let mut config = config;
554
555        // Check if we should skip location constraint for LocalStack/Minio compatibility
556        // This env var allows users to create buckets on S3-compatible services that
557        // don't support or require location constraints in the request body
558        let skip_constraint = std::env::var("RUST_S3_SKIP_LOCATION_CONSTRAINT")
559            .unwrap_or_default()
560            .to_lowercase();
561
562        if skip_constraint != "true" && skip_constraint != "1" {
563            config.set_region(region.clone());
564        }
565
566        let command = Command::CreateBucket { config };
567        let bucket = Bucket::new(name, region, credentials)?.with_path_style();
568        let request = RequestImpl::new(&bucket, "", command).await?;
569        let response_data = request.response_data(false).await?;
570        let response_text = response_data.to_string()?;
571
572        Ok(CreateBucketResponse {
573            bucket,
574            response_text,
575            response_code: response_data.status_code(),
576        })
577    }
578
579    /// Delete existing `Bucket`
580    ///
581    /// # Example
582    /// ```rust,no_run
583    /// use s3::Bucket;
584    /// use s3::creds::Credentials;
585    /// use anyhow::Result;
586    ///
587    /// # #[tokio::main]
588    /// # async fn main() -> Result<()> {
589    /// let bucket_name = "rust-s3-test";
590    /// let region = "us-east-1".parse().unwrap();
591    /// let credentials = Credentials::default().unwrap();
592    /// let bucket = Bucket::new(bucket_name, region, credentials).unwrap();
593    ///
594    /// // Async variant with `tokio` or `async-std` features
595    /// bucket.delete().await.unwrap();
596    /// // `sync` fature will produce an identical method
597    ///
598    /// #[cfg(feature = "sync")]
599    /// bucket.delete().unwrap();
600    /// // Blocking variant, generated with `blocking` feature in combination
601    /// // with `tokio` or `async-std` features.
602    ///
603    /// #[cfg(feature = "blocking")]
604    /// bucket.delete_blocking().unwrap();
605    ///
606    /// # Ok(())
607    /// # }
608    /// ```
609    #[maybe_async::maybe_async]
610    pub async fn delete(&self) -> Result<u16, S3Error> {
611        let command = Command::DeleteBucket;
612        let request = RequestImpl::new(self, "", command).await?;
613        let response_data = request.response_data(false).await?;
614        Ok(response_data.status_code())
615    }
616
    /// Instantiate an existing `Bucket`.
    ///
    /// # Example
    /// ```no_run
    /// use s3::bucket::Bucket;
    /// use s3::creds::Credentials;
    ///
    /// // Fake credentials so we don't access user's real credentials in tests
    /// let bucket_name = "rust-s3-test";
    /// let region = "us-east-1".parse().unwrap();
    /// let credentials = Credentials::default().unwrap();
    ///
    /// let bucket = Bucket::new(bucket_name, region, credentials).unwrap();
    /// ```
    pub fn new(
        name: &str,
        region: Region,
        credentials: Credentials,
    ) -> Result<Box<Bucket>, S3Error> {
        // Default HTTP client options; only relevant for the tokio backend.
        #[cfg(feature = "with-tokio")]
        let options = ClientOptions::default();

        Ok(Box::new(Bucket {
            name: name.into(),
            region,
            // Shared behind a lock so credential refreshes are visible to
            // every clone of this bucket.
            credentials: Arc::new(RwLock::new(credentials)),
            extra_headers: HeaderMap::new(),
            extra_query: HashMap::new(),
            request_timeout: DEFAULT_REQUEST_TIMEOUT,
            // Virtual-host style addressing by default; see `with_path_style`.
            path_style: false,
            // ListObjectsV2 by default; see `with_listobjects_v1`.
            listobjects_v2: true,
            #[cfg(feature = "with-tokio")]
            http_client: client(&options)?,
            #[cfg(feature = "with-tokio")]
            client_options: options,
        }))
    }
654
    /// Instantiate a public existing `Bucket`.
    ///
    /// # Example
    /// ```no_run
    /// use s3::bucket::Bucket;
    ///
    /// let bucket_name = "rust-s3-test";
    /// let region = "us-east-1".parse().unwrap();
    ///
    /// let bucket = Bucket::new_public(bucket_name, region).unwrap();
    /// ```
    pub fn new_public(name: &str, region: Region) -> Result<Bucket, S3Error> {
        // Default HTTP client options; only relevant for the tokio backend.
        #[cfg(feature = "with-tokio")]
        let options = ClientOptions::default();

        Ok(Bucket {
            name: name.into(),
            region,
            // Anonymous (unsigned) credentials: requests carry no signature,
            // so only publicly readable buckets/objects are accessible.
            credentials: Arc::new(RwLock::new(Credentials::anonymous()?)),
            extra_headers: HeaderMap::new(),
            extra_query: HashMap::new(),
            request_timeout: DEFAULT_REQUEST_TIMEOUT,
            // Virtual-host style addressing by default; see `with_path_style`.
            path_style: false,
            // ListObjectsV2 by default; see `with_listobjects_v1`.
            listobjects_v2: true,
            #[cfg(feature = "with-tokio")]
            http_client: client(&options)?,
            #[cfg(feature = "with-tokio")]
            client_options: options,
        })
    }
685
686    pub fn with_path_style(&self) -> Box<Bucket> {
687        Box::new(Bucket {
688            name: self.name.clone(),
689            region: self.region.clone(),
690            credentials: self.credentials.clone(),
691            extra_headers: self.extra_headers.clone(),
692            extra_query: self.extra_query.clone(),
693            request_timeout: self.request_timeout,
694            path_style: true,
695            listobjects_v2: self.listobjects_v2,
696            #[cfg(feature = "with-tokio")]
697            http_client: self.http_client(),
698            #[cfg(feature = "with-tokio")]
699            client_options: self.client_options.clone(),
700        })
701    }
702
703    pub fn with_extra_headers(&self, extra_headers: HeaderMap) -> Result<Bucket, S3Error> {
704        Ok(Bucket {
705            name: self.name.clone(),
706            region: self.region.clone(),
707            credentials: self.credentials.clone(),
708            extra_headers,
709            extra_query: self.extra_query.clone(),
710            request_timeout: self.request_timeout,
711            path_style: self.path_style,
712            listobjects_v2: self.listobjects_v2,
713            #[cfg(feature = "with-tokio")]
714            http_client: self.http_client(),
715            #[cfg(feature = "with-tokio")]
716            client_options: self.client_options.clone(),
717        })
718    }
719
720    pub fn with_extra_query(
721        &self,
722        extra_query: HashMap<String, String>,
723    ) -> Result<Bucket, S3Error> {
724        Ok(Bucket {
725            name: self.name.clone(),
726            region: self.region.clone(),
727            credentials: self.credentials.clone(),
728            extra_headers: self.extra_headers.clone(),
729            extra_query,
730            request_timeout: self.request_timeout,
731            path_style: self.path_style,
732            listobjects_v2: self.listobjects_v2,
733            #[cfg(feature = "with-tokio")]
734            http_client: self.http_client(),
735            #[cfg(feature = "with-tokio")]
736            client_options: self.client_options.clone(),
737        })
738    }
739
740    #[cfg(not(feature = "with-tokio"))]
741    pub fn with_request_timeout(&self, request_timeout: Duration) -> Result<Box<Bucket>, S3Error> {
742        Ok(Box::new(Bucket {
743            name: self.name.clone(),
744            region: self.region.clone(),
745            credentials: self.credentials.clone(),
746            extra_headers: self.extra_headers.clone(),
747            extra_query: self.extra_query.clone(),
748            request_timeout: Some(request_timeout),
749            path_style: self.path_style,
750            listobjects_v2: self.listobjects_v2,
751        }))
752    }
753
754    #[cfg(feature = "with-tokio")]
755    pub fn with_request_timeout(&self, request_timeout: Duration) -> Result<Box<Bucket>, S3Error> {
756        let options = ClientOptions {
757            request_timeout: Some(request_timeout),
758            ..Default::default()
759        };
760
761        Ok(Box::new(Bucket {
762            name: self.name.clone(),
763            region: self.region.clone(),
764            credentials: self.credentials.clone(),
765            extra_headers: self.extra_headers.clone(),
766            extra_query: self.extra_query.clone(),
767            request_timeout: Some(request_timeout),
768            path_style: self.path_style,
769            listobjects_v2: self.listobjects_v2,
770            #[cfg(feature = "with-tokio")]
771            http_client: client(&options)?,
772            #[cfg(feature = "with-tokio")]
773            client_options: options,
774        }))
775    }
776
777    pub fn with_listobjects_v1(&self) -> Bucket {
778        Bucket {
779            name: self.name.clone(),
780            region: self.region.clone(),
781            credentials: self.credentials.clone(),
782            extra_headers: self.extra_headers.clone(),
783            extra_query: self.extra_query.clone(),
784            request_timeout: self.request_timeout,
785            path_style: self.path_style,
786            listobjects_v2: false,
787            #[cfg(feature = "with-tokio")]
788            http_client: self.http_client(),
789            #[cfg(feature = "with-tokio")]
790            client_options: self.client_options.clone(),
791        }
792    }
793
794    /// Configures a bucket to accept invalid SSL certificates and hostnames.
795    ///
796    /// This method is available only when either the `tokio-native-tls` or `tokio-rustls-tls` feature is enabled.
797    ///
798    /// # Parameters
799    ///
800    /// - `accept_invalid_certs`: A boolean flag that determines whether the client should accept invalid SSL certificates.
801    /// - `accept_invalid_hostnames`: A boolean flag that determines whether the client should accept invalid hostnames.
802    ///
803    /// # Returns
804    ///
805    /// Returns a `Result` containing the newly configured `Bucket` instance if successful, or an `S3Error` if an error occurs during client configuration.
806    ///
807    /// # Errors
808    ///
809    /// This function returns an `S3Error` if the HTTP client configuration fails.
810    ///
811    /// # Example
812    ///
813    /// ```rust
814    /// # use s3::bucket::Bucket;
815    /// # use s3::error::S3Error;
816    /// # use s3::creds::Credentials;
817    /// # use s3::Region;
818    /// # use std::str::FromStr;
819    ///
820    /// # fn example() -> Result<(), S3Error> {
821    /// let bucket = Bucket::new("my-bucket", Region::from_str("us-east-1")?, Credentials::default()?)?
822    ///     .set_dangerous_config(true, true)?;
823    /// # Ok(())
824    /// # }
825    /// ```
826    #[cfg(any(feature = "tokio-native-tls", feature = "tokio-rustls-tls"))]
827    pub fn set_dangerous_config(
828        &self,
829        accept_invalid_certs: bool,
830        accept_invalid_hostnames: bool,
831    ) -> Result<Bucket, S3Error> {
832        let mut options = self.client_options.clone();
833        options.accept_invalid_certs = accept_invalid_certs;
834        options.accept_invalid_hostnames = accept_invalid_hostnames;
835
836        Ok(Bucket {
837            name: self.name.clone(),
838            region: self.region.clone(),
839            credentials: self.credentials.clone(),
840            extra_headers: self.extra_headers.clone(),
841            extra_query: self.extra_query.clone(),
842            request_timeout: self.request_timeout,
843            path_style: self.path_style,
844            listobjects_v2: self.listobjects_v2,
845            http_client: client(&options)?,
846            client_options: options,
847        })
848    }
849
    /// Deprecated alias for [`Bucket::set_dangerous_config`].
    ///
    /// The misspelled name is kept only so existing callers keep compiling;
    /// it forwards both flags unchanged to the correctly spelled method.
    #[deprecated(
        since = "0.37.3",
        note = "use `set_dangerous_config`; this misspelled method remains for compatibility"
    )]
    #[cfg(any(feature = "tokio-native-tls", feature = "tokio-rustls-tls"))]
    pub fn set_dangereous_config(
        &self,
        accept_invalid_certs: bool,
        accept_invalid_hostnames: bool,
    ) -> Result<Bucket, S3Error> {
        self.set_dangerous_config(accept_invalid_certs, accept_invalid_hostnames)
    }
863
864    #[cfg(feature = "with-tokio")]
865    pub fn set_proxy(&self, proxy: reqwest::Proxy) -> Result<Bucket, S3Error> {
866        let mut options = self.client_options.clone();
867        options.proxy = Some(proxy);
868
869        Ok(Bucket {
870            name: self.name.clone(),
871            region: self.region.clone(),
872            credentials: self.credentials.clone(),
873            extra_headers: self.extra_headers.clone(),
874            extra_query: self.extra_query.clone(),
875            request_timeout: self.request_timeout,
876            path_style: self.path_style,
877            listobjects_v2: self.listobjects_v2,
878            http_client: client(&options)?,
879            client_options: options,
880        })
881    }
882
883    /// Copy file from an S3 path, internally within the same bucket.
884    ///
885    /// # Example:
886    ///
887    /// ```rust,no_run
888    /// use s3::bucket::Bucket;
889    /// use s3::creds::Credentials;
890    /// use anyhow::Result;
891    ///
892    /// # #[tokio::main]
893    /// # async fn main() -> Result<()> {
894    ///
895    /// let bucket_name = "rust-s3-test";
896    /// let region = "us-east-1".parse()?;
897    /// let credentials = Credentials::default()?;
898    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
899    ///
900    /// // Async variant with `tokio` or `async-std` features
901    /// let code = bucket.copy_object_internal("/from.file", "/to.file").await?;
902    ///
903    /// // `sync` feature will produce an identical method
904    /// #[cfg(feature = "sync")]
905    /// let code = bucket.copy_object_internal("/from.file", "/to.file")?;
906    ///
907    /// # Ok(())
908    /// # }
909    /// ```
910    #[maybe_async::maybe_async]
911    pub async fn copy_object_internal<F: AsRef<str>, T: AsRef<str>>(
912        &self,
913        from: F,
914        to: T,
915    ) -> Result<u16, S3Error> {
916        let fq_from = {
917            let from = from.as_ref();
918            let from = from.strip_prefix('/').unwrap_or(from);
919            format!("{bucket}/{path}", bucket = self.name(), path = from)
920        };
921        self.copy_object(fq_from, to).await
922    }
923
924    #[maybe_async::maybe_async]
925    async fn copy_object<F: AsRef<str>, T: AsRef<str>>(
926        &self,
927        from: F,
928        to: T,
929    ) -> Result<u16, S3Error> {
930        let command = Command::CopyObject {
931            from: from.as_ref(),
932        };
933        let request = RequestImpl::new(self, to.as_ref(), command).await?;
934        let response_data = request.response_data(false).await?;
935        Ok(response_data.status_code())
936    }
937
938    /// Gets file from an S3 path.
939    ///
940    /// # Example:
941    ///
942    /// ```rust,no_run
943    /// use s3::bucket::Bucket;
944    /// use s3::creds::Credentials;
945    /// use anyhow::Result;
946    ///
947    /// # #[tokio::main]
948    /// # async fn main() -> Result<()> {
949    ///
950    /// let bucket_name = "rust-s3-test";
951    /// let region = "us-east-1".parse()?;
952    /// let credentials = Credentials::default()?;
953    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
954    ///
955    /// // Async variant with `tokio` or `async-std` features
956    /// let response_data = bucket.get_object("/test.file").await?;
957    ///
958    /// // `sync` feature will produce an identical method
959    /// #[cfg(feature = "sync")]
960    /// let response_data = bucket.get_object("/test.file")?;
961    ///
962    /// // Blocking variant, generated with `blocking` feature in combination
963    /// // with `tokio` or `async-std` features.
964    /// #[cfg(feature = "blocking")]
965    /// let response_data = bucket.get_object_blocking("/test.file")?;
966    /// # Ok(())
967    /// # }
968    /// ```
969    #[maybe_async::maybe_async]
970    pub async fn get_object<S: AsRef<str>>(&self, path: S) -> Result<ResponseData, S3Error> {
971        let command = Command::GetObject;
972        let request = RequestImpl::new(self, path.as_ref(), command).await?;
973        request.response_data(false).await
974    }
975
976    #[maybe_async::maybe_async]
977    pub async fn get_object_attributes<S: AsRef<str>>(
978        &self,
979        path: S,
980        expected_bucket_owner: &str,
981        version_id: Option<String>,
982    ) -> Result<GetObjectAttributesOutput, S3Error> {
983        let command = Command::GetObjectAttributes {
984            expected_bucket_owner: expected_bucket_owner.to_string(),
985            version_id,
986        };
987        let request = RequestImpl::new(self, path.as_ref(), command).await?;
988
989        let response = request.response_data(false).await?;
990
991        Ok(quick_xml::de::from_str::<GetObjectAttributesOutput>(
992            response.as_str()?,
993        )?)
994    }
995
996    /// Checks if an object exists at the specified S3 path.
997    ///
998    /// # Example:
999    ///
1000    /// ```rust,no_run
1001    /// use s3::bucket::Bucket;
1002    /// use s3::creds::Credentials;
1003    /// use anyhow::Result;
1004    ///
1005    /// # #[tokio::main]
1006    /// # async fn main() -> Result<()> {
1007    ///
1008    /// let bucket_name = "rust-s3-test";
1009    /// let region = "us-east-1".parse()?;
1010    /// let credentials = Credentials::default()?;
1011    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
1012    ///
1013    /// // Async variant with `tokio` or `async-std` features
1014    /// let exists = bucket.object_exists("/test.file").await?;
1015    ///
1016    /// // `sync` feature will produce an identical method
1017    /// #[cfg(feature = "sync")]
1018    /// let exists = bucket.object_exists("/test.file")?;
1019    ///
1020    /// // Blocking variant, generated with `blocking` feature in combination
1021    /// // with `tokio` or `async-std` features.
1022    /// #[cfg(feature = "blocking")]
1023    /// let exists = bucket.object_exists_blocking("/test.file")?;
1024    ///
1025    /// if exists {
1026    ///     println!("Object exists.");
1027    /// } else {
1028    ///     println!("Object does not exist.");
1029    /// }
1030    /// # Ok(())
1031    /// # }
1032    /// ```
1033    ///
1034    /// # Errors
1035    ///
1036    /// This function will return an `Err` if the request to the S3 service fails or if there is an unexpected error.
1037    /// It will return `Ok(false)` if the object does not exist (i.e., the server returns a 404 status code).
1038    #[maybe_async::maybe_async]
1039    pub async fn object_exists<S: AsRef<str>>(&self, path: S) -> Result<bool, S3Error> {
1040        let command = Command::HeadObject;
1041        let request = RequestImpl::new(self, path.as_ref(), command).await?;
1042        let status_code = request.response_status().await?;
1043        Ok(status_code != 404)
1044    }
1045
1046    #[maybe_async::maybe_async]
1047    pub async fn put_bucket_cors(
1048        &self,
1049        expected_bucket_owner: &str,
1050        cors_config: &CorsConfiguration,
1051    ) -> Result<ResponseData, S3Error> {
1052        let command = Command::PutBucketCors {
1053            expected_bucket_owner: expected_bucket_owner.to_string(),
1054            configuration: cors_config.clone(),
1055        };
1056        let request = RequestImpl::new(self, "", command).await?;
1057        request.response_data(false).await
1058    }
1059
1060    #[maybe_async::maybe_async]
1061    pub async fn get_bucket_cors(
1062        &self,
1063        expected_bucket_owner: &str,
1064    ) -> Result<CorsConfiguration, S3Error> {
1065        let command = Command::GetBucketCors {
1066            expected_bucket_owner: expected_bucket_owner.to_string(),
1067        };
1068        let request = RequestImpl::new(self, "", command).await?;
1069        let response = request.response_data(false).await?;
1070        Ok(quick_xml::de::from_str::<CorsConfiguration>(
1071            response.as_str()?,
1072        )?)
1073    }
1074
1075    #[maybe_async::maybe_async]
1076    pub async fn delete_bucket_cors(
1077        &self,
1078        expected_bucket_owner: &str,
1079    ) -> Result<ResponseData, S3Error> {
1080        let command = Command::DeleteBucketCors {
1081            expected_bucket_owner: expected_bucket_owner.to_string(),
1082        };
1083        let request = RequestImpl::new(self, "", command).await?;
1084        request.response_data(false).await
1085    }
1086
1087    #[maybe_async::maybe_async]
1088    pub async fn get_bucket_lifecycle(&self) -> Result<BucketLifecycleConfiguration, S3Error> {
1089        let request = RequestImpl::new(self, "", Command::GetBucketLifecycle).await?;
1090        let response = request.response_data(false).await?;
1091        Ok(quick_xml::de::from_str::<BucketLifecycleConfiguration>(
1092            response.as_str()?,
1093        )?)
1094    }
1095
1096    #[maybe_async::maybe_async]
1097    pub async fn put_bucket_lifecycle(
1098        &self,
1099        lifecycle_config: BucketLifecycleConfiguration,
1100    ) -> Result<ResponseData, S3Error> {
1101        let command = Command::PutBucketLifecycle {
1102            configuration: lifecycle_config,
1103        };
1104        let request = RequestImpl::new(self, "", command).await?;
1105        request.response_data(false).await
1106    }
1107
1108    #[maybe_async::maybe_async]
1109    pub async fn delete_bucket_lifecycle(&self) -> Result<ResponseData, S3Error> {
1110        let request = RequestImpl::new(self, "", Command::DeleteBucketLifecycle).await?;
1111        request.response_data(false).await
1112    }
1113
1114    /// Gets torrent from an S3 path.
1115    ///
1116    /// # Example:
1117    ///
1118    /// ```rust,no_run
1119    /// use s3::bucket::Bucket;
1120    /// use s3::creds::Credentials;
1121    /// use anyhow::Result;
1122    ///
1123    /// # #[tokio::main]
1124    /// # async fn main() -> Result<()> {
1125    ///
1126    /// let bucket_name = "rust-s3-test";
1127    /// let region = "us-east-1".parse()?;
1128    /// let credentials = Credentials::default()?;
1129    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
1130    ///
1131    /// // Async variant with `tokio` or `async-std` features
1132    /// let response_data = bucket.get_object_torrent("/test.file").await?;
1133    ///
1134    /// // `sync` feature will produce an identical method
1135    /// #[cfg(feature = "sync")]
1136    /// let response_data = bucket.get_object_torrent("/test.file")?;
1137    ///
1138    /// // Blocking variant, generated with `blocking` feature in combination
1139    /// // with `tokio` or `async-std` features.
1140    /// #[cfg(feature = "blocking")]
1141    /// let response_data = bucket.get_object_torrent_blocking("/test.file")?;
1142    /// # Ok(())
1143    /// # }
1144    /// ```
1145    #[maybe_async::maybe_async]
1146    pub async fn get_object_torrent<S: AsRef<str>>(
1147        &self,
1148        path: S,
1149    ) -> Result<ResponseData, S3Error> {
1150        let command = Command::GetObjectTorrent;
1151        let request = RequestImpl::new(self, path.as_ref(), command).await?;
1152        request.response_data(false).await
1153    }
1154
    /// Gets specified inclusive byte range of file from an S3 path.
    ///
    /// # Example:
    ///
    /// ```rust,no_run
    /// use s3::bucket::Bucket;
    /// use s3::creds::Credentials;
    /// use anyhow::Result;
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    ///
    /// let bucket_name = "rust-s3-test";
    /// let region = "us-east-1".parse()?;
    /// let credentials = Credentials::default()?;
    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
    ///
    /// // Async variant with `tokio` or `async-std` features
    /// let response_data = bucket.get_object_range("/test.file", 0, Some(31)).await?;
    ///
    /// // `sync` feature will produce an identical method
    /// #[cfg(feature = "sync")]
    /// let response_data = bucket.get_object_range("/test.file", 0, Some(31))?;
    ///
    /// // Blocking variant, generated with `blocking` feature in combination
    /// // with `tokio` or `async-std` features.
    /// #[cfg(feature = "blocking")]
    /// let response_data = bucket.get_object_range_blocking("/test.file", 0, Some(31))?;
    /// #
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if `end` is `Some(e)` and `start > e` (the range must be a
    /// valid inclusive interval).
    #[maybe_async::maybe_async]
    pub async fn get_object_range<S: AsRef<str>>(
        &self,
        path: S,
        start: u64,
        end: Option<u64>,
    ) -> Result<ResponseData, S3Error> {
        // An inverted range is a caller bug, not a recoverable condition.
        if let Some(end) = end {
            assert!(start <= end);
        }

        let command = Command::GetObjectRange { start, end };
        let request = RequestImpl::new(self, path.as_ref(), command).await?;
        request.response_data(false).await
    }
1202
    /// Stream range of bytes from S3 path to a local file, generic over T: Write.
    ///
    /// # Example:
    ///
    /// ```rust,no_run
    /// use s3::bucket::Bucket;
    /// use s3::creds::Credentials;
    /// use anyhow::Result;
    /// use std::fs::File;
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    ///
    /// let bucket_name = "rust-s3-test";
    /// let region = "us-east-1".parse()?;
    /// let credentials = Credentials::default()?;
    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
    /// let mut output_file = File::create("output_file").expect("Unable to create file");
    /// let mut async_output_file = tokio::fs::File::create("async_output_file").await.expect("Unable to create file");
    /// #[cfg(feature = "with-async-std")]
    /// let mut async_output_file = async_std::fs::File::create("async_output_file").await.expect("Unable to create file");
    ///
    /// let start = 0;
    /// let end = Some(1024);
    ///
    /// // Async variant with `tokio` or `async-std` features
    /// let status_code = bucket.get_object_range_to_writer("/test.file", start, end, &mut async_output_file).await?;
    ///
    /// // `sync` feature will produce an identical method
    /// #[cfg(feature = "sync")]
    /// let status_code = bucket.get_object_range_to_writer("/test.file", start, end, &mut output_file)?;
    ///
    /// // Blocking variant, generated with `blocking` feature in combination
    /// // with `tokio` or `async-std` features. Based of the async branch
    /// #[cfg(feature = "blocking")]
    /// let status_code = bucket.get_object_range_to_writer_blocking("/test.file", start, end, &mut async_output_file)?;
    /// #
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if `end` is `Some(e)` and `start > e` (the range must be a
    /// valid inclusive interval).
    #[maybe_async::async_impl]
    pub async fn get_object_range_to_writer<T, S>(
        &self,
        path: S,
        start: u64,
        end: Option<u64>,
        writer: &mut T,
    ) -> Result<u16, S3Error>
    where
        T: AsyncWrite + Send + Unpin + ?Sized,
        S: AsRef<str>,
    {
        // An inverted range is a caller bug, not a recoverable condition.
        if let Some(end) = end {
            assert!(start <= end);
        }

        let command = Command::GetObjectRange { start, end };
        let request = RequestImpl::new(self, path.as_ref(), command).await?;
        request.response_data_to_writer(writer).await
    }
1263
    /// Synchronous variant (`sync` feature): streams the inclusive byte range
    /// `start..=end` of the object at `path` into `writer` and returns the
    /// HTTP status code.
    ///
    /// # Panics
    ///
    /// Panics if `end` is `Some(e)` and `start > e` (the range must be a
    /// valid inclusive interval).
    #[maybe_async::sync_impl]
    pub fn get_object_range_to_writer<T: std::io::Write + Send + ?Sized, S: AsRef<str>>(
        &self,
        path: S,
        start: u64,
        end: Option<u64>,
        writer: &mut T,
    ) -> Result<u16, S3Error> {
        // An inverted range is a caller bug, not a recoverable condition.
        if let Some(end) = end {
            assert!(start <= end);
        }

        let command = Command::GetObjectRange { start, end };
        let request = RequestImpl::new(self, path.as_ref(), command)?;
        request.response_data_to_writer(writer)
    }
1280
1281    /// Stream file from S3 path to a local file, generic over T: Write.
1282    ///
1283    /// # Example:
1284    ///
1285    /// ```rust,no_run
1286    /// use s3::bucket::Bucket;
1287    /// use s3::creds::Credentials;
1288    /// use anyhow::Result;
1289    /// use std::fs::File;
1290    ///
1291    /// # #[tokio::main]
1292    /// # async fn main() -> Result<()> {
1293    ///
1294    /// let bucket_name = "rust-s3-test";
1295    /// let region = "us-east-1".parse()?;
1296    /// let credentials = Credentials::default()?;
1297    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
1298    /// let mut output_file = File::create("output_file").expect("Unable to create file");
1299    /// let mut async_output_file = tokio::fs::File::create("async_output_file").await.expect("Unable to create file");
1300    /// #[cfg(feature = "with-async-std")]
1301    /// let mut async_output_file = async_std::fs::File::create("async_output_file").await.expect("Unable to create file");
1302    ///
1303    /// // Async variant with `tokio` or `async-std` features
1304    /// let status_code = bucket.get_object_to_writer("/test.file", &mut async_output_file).await?;
1305    ///
1306    /// // `sync` feature will produce an identical method
1307    /// #[cfg(feature = "sync")]
1308    /// let status_code = bucket.get_object_to_writer("/test.file", &mut output_file)?;
1309    ///
1310    /// // Blocking variant, generated with `blocking` feature in combination
1311    /// // with `tokio` or `async-std` features. Based of the async branch
1312    /// #[cfg(feature = "blocking")]
1313    /// let status_code = bucket.get_object_to_writer_blocking("/test.file", &mut async_output_file)?;
1314    /// #
1315    /// # Ok(())
1316    /// # }
1317    /// ```
1318    #[maybe_async::async_impl]
1319    pub async fn get_object_to_writer<T: AsyncWrite + Send + Unpin + ?Sized, S: AsRef<str>>(
1320        &self,
1321        path: S,
1322        writer: &mut T,
1323    ) -> Result<u16, S3Error> {
1324        let command = Command::GetObject;
1325        let request = RequestImpl::new(self, path.as_ref(), command).await?;
1326        request.response_data_to_writer(writer).await
1327    }
1328
1329    #[maybe_async::sync_impl]
1330    pub fn get_object_to_writer<T: std::io::Write + Send + ?Sized, S: AsRef<str>>(
1331        &self,
1332        path: S,
1333        writer: &mut T,
1334    ) -> Result<u16, S3Error> {
1335        let command = Command::GetObject;
1336        let request = RequestImpl::new(self, path.as_ref(), command)?;
1337        request.response_data_to_writer(writer)
1338    }
1339
1340    /// Stream file from S3 path to a local file using an async stream.
1341    ///
1342    /// # Example
1343    ///
1344    /// ```rust,no_run
1345    /// use s3::bucket::Bucket;
1346    /// use s3::creds::Credentials;
1347    /// use anyhow::Result;
1348    /// #[cfg(feature = "with-tokio")]
1349    /// use tokio_stream::StreamExt;
1350    /// #[cfg(feature = "with-tokio")]
1351    /// use tokio::io::AsyncWriteExt;
1352    /// #[cfg(feature = "with-async-std")]
1353    /// use async_std::stream::StreamExt;
1354    /// #[cfg(feature = "with-async-std")]
1355    /// use async_std::io::WriteExt;
1356    ///
1357    /// # #[tokio::main]
1358    /// # async fn main() -> Result<()> {
1359    ///
1360    /// let bucket_name = "rust-s3-test";
1361    /// let region = "us-east-1".parse()?;
1362    /// let credentials = Credentials::default()?;
1363    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
1364    /// let path = "path";
1365    ///
1366    /// let mut response_data_stream = bucket.get_object_stream(path).await?;
1367    ///
1368    /// #[cfg(feature = "with-tokio")]
1369    /// let mut async_output_file = tokio::fs::File::create("async_output_file").await.expect("Unable to create file");
1370    /// #[cfg(feature = "with-async-std")]
1371    /// let mut async_output_file = async_std::fs::File::create("async_output_file").await.expect("Unable to create file");
1372    ///
1373    /// while let Some(chunk) = response_data_stream.bytes().next().await {
1374    ///     async_output_file.write_all(&chunk.unwrap()).await?;
1375    /// }
1376    ///
1377    /// #
1378    /// # Ok(())
1379    /// # }
1380    /// ```
1381    #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
1382    pub async fn get_object_stream<S: AsRef<str>>(
1383        &self,
1384        path: S,
1385    ) -> Result<ResponseDataStream, S3Error> {
1386        let command = Command::GetObject;
1387        let request = RequestImpl::new(self, path.as_ref(), command).await?;
1388        request.response_data_to_stream().await
1389    }
1390
1391    /// Stream file from local path to s3, generic over T: Write.
1392    ///
1393    /// # Example:
1394    ///
1395    /// ```rust,no_run
1396    /// use s3::bucket::Bucket;
1397    /// use s3::creds::Credentials;
1398    /// use anyhow::Result;
1399    /// use std::fs::File;
1400    /// use std::io::Write;
1401    ///
1402    /// # #[tokio::main]
1403    /// # async fn main() -> Result<()> {
1404    ///
1405    /// let bucket_name = "rust-s3-test";
1406    /// let region = "us-east-1".parse()?;
1407    /// let credentials = Credentials::default()?;
1408    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
1409    /// let path = "path";
1410    /// let test: Vec<u8> = (0..1000).map(|_| 42).collect();
1411    /// let mut file = File::create(path)?;
1412    /// // tokio open file
1413    /// let mut async_output_file = tokio::fs::File::create("async_output_file").await.expect("Unable to create file");
1414    /// file.write_all(&test)?;
1415    ///
1416    /// // Generic over std::io::Read
1417    /// #[cfg(feature = "with-tokio")]
1418    /// let status_code = bucket.put_object_stream(&mut async_output_file, "/path").await?;
1419    ///
1420    ///
1421    /// #[cfg(feature = "with-async-std")]
1422    /// let mut async_output_file = async_std::fs::File::create("async_output_file").await.expect("Unable to create file");
1423    ///
1424    /// // `sync` feature will produce an identical method
1425    /// #[cfg(feature = "sync")]
1426    /// // Generic over std::io::Read
1427    /// let status_code = bucket.put_object_stream(&mut path, "/path")?;
1428    ///
1429    /// // Blocking variant, generated with `blocking` feature in combination
1430    /// // with `tokio` or `async-std` features.
1431    /// #[cfg(feature = "blocking")]
1432    /// let status_code = bucket.put_object_stream_blocking(&mut path, "/path")?;
1433    /// #
1434    /// # Ok(())
1435    /// # }
1436    /// ```
1437    #[maybe_async::async_impl]
1438    pub async fn put_object_stream<R: AsyncRead + Unpin + ?Sized>(
1439        &self,
1440        reader: &mut R,
1441        s3_path: impl AsRef<str>,
1442    ) -> Result<PutStreamResponse, S3Error> {
1443        self._put_object_stream_with_content_type(
1444            reader,
1445            s3_path.as_ref(),
1446            "application/octet-stream",
1447        )
1448        .await
1449    }
1450
1451    /// Create a builder for streaming PUT operations with custom options
1452    ///
1453    /// # Example:
1454    ///
1455    /// ```no_run
1456    /// use s3::bucket::Bucket;
1457    /// use s3::creds::Credentials;
1458    /// use anyhow::Result;
1459    ///
1460    /// # #[cfg(feature = "with-tokio")]
1461    /// # #[tokio::main]
1462    /// # async fn main() -> Result<()> {
1463    /// # use tokio::fs::File;
1464    ///
1465    /// let bucket = Bucket::new("my-bucket", "us-east-1".parse()?, Credentials::default()?)?;
1466    ///
1467    /// # #[cfg(feature = "with-tokio")]
1468    /// let mut file = File::open("large-file.zip").await?;
1469    ///
1470    /// // Stream upload with custom headers using builder pattern
1471    /// let response = bucket.put_object_stream_builder("/large-file.zip")
1472    ///     .with_content_type("application/zip")
1473    ///     .with_cache_control("public, max-age=3600")?
1474    ///     .with_metadata("uploaded-by", "stream-builder")?
1475    ///     .execute_stream(&mut file)
1476    ///     .await?;
1477    /// #
1478    /// # Ok(())
1479    /// # }
1480    /// # #[cfg(not(feature = "with-tokio"))]
1481    /// # fn main() {}
1482    /// ```
1483    #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
1484    pub fn put_object_stream_builder<S: AsRef<str>>(
1485        &self,
1486        path: S,
1487    ) -> crate::put_object_request::PutObjectStreamRequest<'_> {
1488        crate::put_object_request::PutObjectStreamRequest::new(self, path)
1489    }
1490
1491    #[maybe_async::sync_impl]
1492    pub fn put_object_stream<R: Read>(
1493        &self,
1494        reader: &mut R,
1495        s3_path: impl AsRef<str>,
1496    ) -> Result<u16, S3Error> {
1497        self._put_object_stream_with_content_type(
1498            reader,
1499            s3_path.as_ref(),
1500            "application/octet-stream",
1501        )
1502    }
1503
1504    /// Stream file from local path to s3, generic over T: Write with explicit content type.
1505    ///
1506    /// # Example:
1507    ///
1508    /// ```rust,no_run
1509    /// use s3::bucket::Bucket;
1510    /// use s3::creds::Credentials;
1511    /// use anyhow::Result;
1512    /// use std::fs::File;
1513    /// use std::io::Write;
1514    ///
1515    /// # #[tokio::main]
1516    /// # async fn main() -> Result<()> {
1517    ///
1518    /// let bucket_name = "rust-s3-test";
1519    /// let region = "us-east-1".parse()?;
1520    /// let credentials = Credentials::default()?;
1521    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
1522    /// let path = "path";
1523    /// let test: Vec<u8> = (0..1000).map(|_| 42).collect();
1524    /// let mut file = File::create(path)?;
1525    /// file.write_all(&test)?;
1526    ///
1527    /// #[cfg(feature = "with-tokio")]
1528    /// let mut async_output_file = tokio::fs::File::create("async_output_file").await.expect("Unable to create file");
1529    ///
1530    /// #[cfg(feature = "with-async-std")]
1531    /// let mut async_output_file = async_std::fs::File::create("async_output_file").await.expect("Unable to create file");
1532    ///
1533    /// // Async variant with `tokio` or `async-std` features
1534    /// // Generic over std::io::Read
1535    /// let status_code = bucket
1536    ///     .put_object_stream_with_content_type(&mut async_output_file, "/path", "application/octet-stream")
1537    ///     .await?;
1538    ///
1539    /// // `sync` feature will produce an identical method
1540    /// #[cfg(feature = "sync")]
1541    /// // Generic over std::io::Read
1542    /// let status_code = bucket
1543    ///     .put_object_stream_with_content_type(&mut path, "/path", "application/octet-stream")?;
1544    ///
1545    /// // Blocking variant, generated with `blocking` feature in combination
1546    /// // with `tokio` or `async-std` features.
1547    /// #[cfg(feature = "blocking")]
1548    /// let status_code = bucket
1549    ///     .put_object_stream_with_content_type_blocking(&mut path, "/path", "application/octet-stream")?;
1550    /// #
1551    /// # Ok(())
1552    /// # }
1553    /// ```
1554    #[maybe_async::async_impl]
1555    pub async fn put_object_stream_with_content_type<R: AsyncRead + Unpin>(
1556        &self,
1557        reader: &mut R,
1558        s3_path: impl AsRef<str>,
1559        content_type: impl AsRef<str>,
1560    ) -> Result<PutStreamResponse, S3Error> {
1561        self._put_object_stream_with_content_type(reader, s3_path.as_ref(), content_type.as_ref())
1562            .await
1563    }
1564
1565    #[maybe_async::sync_impl]
1566    pub fn put_object_stream_with_content_type<R: Read>(
1567        &self,
1568        reader: &mut R,
1569        s3_path: impl AsRef<str>,
1570        content_type: impl AsRef<str>,
1571    ) -> Result<u16, S3Error> {
1572        self._put_object_stream_with_content_type(reader, s3_path.as_ref(), content_type.as_ref())
1573    }
1574
1575    #[maybe_async::async_impl]
1576    async fn make_multipart_request(
1577        &self,
1578        path: &str,
1579        chunk: Vec<u8>,
1580        part_number: u32,
1581        upload_id: &str,
1582        content_type: &str,
1583    ) -> Result<ResponseData, S3Error> {
1584        let command = Command::PutObject {
1585            content: &chunk,
1586            multipart: Some(Multipart::new(part_number, upload_id)), // upload_id: &msg.upload_id,
1587            custom_headers: None,
1588            content_type,
1589        };
1590        let request = RequestImpl::new(self, path, command).await?;
1591        request.response_data(true).await
1592    }
1593
    /// Internal entry point for streaming uploads: delegates to the
    /// header-aware implementation with no custom headers.
    #[maybe_async::async_impl]
    async fn _put_object_stream_with_content_type<R: AsyncRead + Unpin + ?Sized>(
        &self,
        reader: &mut R,
        s3_path: &str,
        content_type: &str,
    ) -> Result<PutStreamResponse, S3Error> {
        self._put_object_stream_with_content_type_and_headers(reader, s3_path, content_type, None)
            .await
    }
1604
    /// Calculate the maximum number of concurrent chunks based on available memory.
    /// Returns a value clamped between 2 and 100, defaulting to 3 if memory
    /// detection fails (the previous doc claimed an upper bound of 10, but the
    /// code clamps to 100).
    #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
    fn calculate_max_concurrent_chunks() -> usize {
        // Create a new System instance and refresh memory info
        let mut system = System::new();
        system.refresh_memory_specifics(MemoryRefreshKind::everything());

        // Get available memory in bytes
        let available_memory = system.available_memory();

        // If we can't get memory info, use a conservative default
        if available_memory == 0 {
            return 3;
        }

        // CHUNK_SIZE is 8MB (8_388_608 bytes)
        // Use a safety factor of 3 to leave room for other operations
        // and account for memory that might be allocated during upload
        let safety_factor = 3;
        let memory_per_chunk = CHUNK_SIZE as u64 * safety_factor;

        // Calculate how many chunks we can safely handle concurrently
        let calculated_chunks = (available_memory / memory_per_chunk) as usize;

        // Clamp between 2 and 100 for safety
        // Minimum 2 to maintain some parallelism
        // Maximum 100 to prevent too many concurrent connections
        calculated_chunks.clamp(2, 100)
    }
1635
    /// Streaming upload implementation: payloads smaller than `CHUNK_SIZE`
    /// go through a single `PutObject`; larger ones through a multipart
    /// upload with bounded concurrency.
    ///
    /// Reads `reader` in `CHUNK_SIZE` chunks and keeps at most
    /// `calculate_max_concurrent_chunks()` part uploads in flight. If any
    /// part fails, the multipart upload is aborted before the error is
    /// returned. On success, returns the final status code and the total
    /// number of bytes read from `reader`.
    #[maybe_async::async_impl]
    pub(crate) async fn _put_object_stream_with_content_type_and_headers<
        R: AsyncRead + Unpin + ?Sized,
    >(
        &self,
        reader: &mut R,
        s3_path: &str,
        content_type: &str,
        custom_headers: Option<http::HeaderMap>,
    ) -> Result<PutStreamResponse, S3Error> {
        // If the file is smaller CHUNK_SIZE, just do a regular upload.
        // Otherwise perform a multi-part upload.
        let first_chunk = crate::utils::read_chunk_async(reader).await?;
        if first_chunk.len() < CHUNK_SIZE {
            let total_size = first_chunk.len();
            // Use the builder pattern for small files
            let mut builder = self
                .put_object_builder(s3_path, first_chunk.as_slice())
                .with_content_type(content_type);

            // Add custom headers if provided
            if let Some(headers) = custom_headers {
                builder = builder.with_headers(headers);
            }

            let response_data = builder.execute().await?;
            if response_data.status_code() >= 300 {
                return Err(error_from_response_data(response_data)?);
            }
            return Ok(PutStreamResponse::new(
                response_data.status_code(),
                total_size,
            ));
        }

        // NOTE(review): custom_headers are only applied on the small-file
        // path above; the multipart path below does not forward them.
        let msg = self
            .initiate_multipart_upload(s3_path, content_type)
            .await?;
        let path = msg.key;
        let upload_id = &msg.upload_id;

        // Determine max concurrent chunks based on available memory
        let max_concurrent_chunks = Self::calculate_max_concurrent_chunks();

        // Use FuturesUnordered for bounded parallelism
        use futures_util::FutureExt;
        use futures_util::stream::{FuturesUnordered, StreamExt};

        let mut part_number: u32 = 0;
        let mut total_size = 0;
        // (part number, etag) pairs; completions arrive out of order and are
        // sorted before completing the upload.
        let mut etags = Vec::new();
        let mut active_uploads: FuturesUnordered<
            futures_util::future::BoxFuture<'_, (u32, Result<ResponseData, S3Error>)>,
        > = FuturesUnordered::new();
        let mut reading_done = false;

        // Process first chunk
        part_number += 1;
        total_size += first_chunk.len();
        // NOTE(review): this branch appears unreachable — a first chunk
        // shorter than CHUNK_SIZE already took the single-request path above.
        if first_chunk.len() < CHUNK_SIZE {
            reading_done = true;
        }

        let path_clone = path.clone();
        let upload_id_clone = upload_id.clone();
        let content_type_clone = content_type.to_string();
        let bucket_clone = self.clone();

        // Kick off the upload of part 1 (the chunk we already read).
        active_uploads.push(
            async move {
                let result = bucket_clone
                    .make_multipart_request(
                        &path_clone,
                        first_chunk,
                        1,
                        &upload_id_clone,
                        &content_type_clone,
                    )
                    .await;
                (1, result)
            }
            .boxed(),
        );

        // Main upload loop with bounded parallelism
        while !active_uploads.is_empty() || !reading_done {
            // Start new uploads if we have room and more data to read
            while active_uploads.len() < max_concurrent_chunks && !reading_done {
                let chunk = crate::utils::read_chunk_async(reader).await?;
                let chunk_len = chunk.len();

                // Empty read: the stream ended exactly on a chunk boundary.
                if chunk_len == 0 {
                    reading_done = true;
                    break;
                }

                total_size += chunk_len;
                part_number += 1;

                // A short chunk is necessarily the last one.
                if chunk_len < CHUNK_SIZE {
                    reading_done = true;
                }

                let current_part = part_number;
                let path_clone = path.clone();
                let upload_id_clone = upload_id.clone();
                let content_type_clone = content_type.to_string();
                let bucket_clone = self.clone();

                active_uploads.push(
                    async move {
                        let result = bucket_clone
                            .make_multipart_request(
                                &path_clone,
                                chunk,
                                current_part,
                                &upload_id_clone,
                                &content_type_clone,
                            )
                            .await;
                        (current_part, result)
                    }
                    .boxed(),
                );
            }

            // Process completed uploads
            if let Some((part_num, result)) = active_uploads.next().await {
                let response_data = result?;
                if !(200..300).contains(&response_data.status_code()) {
                    // if chunk upload failed - abort the upload
                    match self.abort_upload(&path, upload_id).await {
                        Ok(_) => {
                            return Err(error_from_response_data(response_data)?);
                        }
                        Err(error) => {
                            return Err(error);
                        }
                    }
                }

                let etag = response_data.as_str()?;
                // Store part number with etag to sort later
                etags.push((part_num, etag.to_string()));
            }
        }

        // Sort etags by part number to ensure correct order
        etags.sort_by_key(|k| k.0);
        let etags: Vec<String> = etags.into_iter().map(|(_, etag)| etag).collect();

        // Finish the upload
        let inner_data = etags
            .clone()
            .into_iter()
            .enumerate()
            .map(|(i, x)| Part {
                etag: x,
                part_number: i as u32 + 1,
            })
            .collect::<Vec<Part>>();
        let response_data = self
            .complete_multipart_upload(&path, &msg.upload_id, inner_data)
            .await?;

        Ok(PutStreamResponse::new(
            response_data.status_code(),
            total_size,
        ))
    }
1807
1808    #[maybe_async::sync_impl]
1809    fn _put_object_stream_with_content_type<R: Read + ?Sized>(
1810        &self,
1811        reader: &mut R,
1812        s3_path: &str,
1813        content_type: &str,
1814    ) -> Result<u16, S3Error> {
1815        let msg = self.initiate_multipart_upload(s3_path, content_type)?;
1816        let path = msg.key;
1817        let upload_id = &msg.upload_id;
1818
1819        let mut part_number: u32 = 0;
1820        let mut etags = Vec::new();
1821        loop {
1822            let chunk = crate::utils::read_chunk(reader)?;
1823
1824            if chunk.len() < CHUNK_SIZE {
1825                if part_number == 0 {
1826                    // Files is not big enough for multipart upload, going with regular put_object
1827                    self.abort_upload(&path, upload_id)?;
1828
1829                    return Ok(self.put_object(s3_path, chunk.as_slice())?.status_code());
1830                } else {
1831                    part_number += 1;
1832                    let part = self.put_multipart_chunk(
1833                        &chunk,
1834                        &path,
1835                        part_number,
1836                        upload_id,
1837                        content_type,
1838                    )?;
1839                    etags.push(part.etag);
1840                    let inner_data = etags
1841                        .into_iter()
1842                        .enumerate()
1843                        .map(|(i, x)| Part {
1844                            etag: x,
1845                            part_number: i as u32 + 1,
1846                        })
1847                        .collect::<Vec<Part>>();
1848                    return Ok(self
1849                        .complete_multipart_upload(&path, upload_id, inner_data)?
1850                        .status_code());
1851                    // let response = std::str::from_utf8(data.as_slice())?;
1852                }
1853            } else {
1854                part_number += 1;
1855                let part =
1856                    self.put_multipart_chunk(&chunk, &path, part_number, upload_id, content_type)?;
1857                etags.push(part.etag.to_string());
1858            }
1859        }
1860    }
1861
1862    /// Initiate multipart upload to s3.
1863    #[maybe_async::async_impl]
1864    pub async fn initiate_multipart_upload(
1865        &self,
1866        s3_path: &str,
1867        content_type: &str,
1868    ) -> Result<InitiateMultipartUploadResponse, S3Error> {
1869        let command = Command::InitiateMultipartUpload { content_type };
1870        let request = RequestImpl::new(self, s3_path, command).await?;
1871        let response_data = request.response_data(false).await?;
1872        if response_data.status_code() >= 300 {
1873            return Err(error_from_response_data(response_data)?);
1874        }
1875
1876        let msg: InitiateMultipartUploadResponse =
1877            quick_xml::de::from_str(response_data.as_str()?)?;
1878        Ok(msg)
1879    }
1880
1881    #[maybe_async::sync_impl]
1882    pub fn initiate_multipart_upload(
1883        &self,
1884        s3_path: &str,
1885        content_type: &str,
1886    ) -> Result<InitiateMultipartUploadResponse, S3Error> {
1887        let command = Command::InitiateMultipartUpload { content_type };
1888        let request = RequestImpl::new(self, s3_path, command)?;
1889        let response_data = request.response_data(false)?;
1890        if response_data.status_code() >= 300 {
1891            return Err(error_from_response_data(response_data)?);
1892        }
1893
1894        let msg: InitiateMultipartUploadResponse =
1895            quick_xml::de::from_str(response_data.as_str()?)?;
1896        Ok(msg)
1897    }
1898
1899    /// Upload a streamed multipart chunk to s3 using a previously initiated multipart upload
1900    #[maybe_async::async_impl]
1901    pub async fn put_multipart_stream<R: Read + Unpin>(
1902        &self,
1903        reader: &mut R,
1904        path: &str,
1905        part_number: u32,
1906        upload_id: &str,
1907        content_type: &str,
1908    ) -> Result<Part, S3Error> {
1909        let chunk = crate::utils::read_chunk(reader)?;
1910        self.put_multipart_chunk(chunk, path, part_number, upload_id, content_type)
1911            .await
1912    }
1913
1914    #[maybe_async::sync_impl]
1915    pub async fn put_multipart_stream<R: Read + Unpin>(
1916        &self,
1917        reader: &mut R,
1918        path: &str,
1919        part_number: u32,
1920        upload_id: &str,
1921        content_type: &str,
1922    ) -> Result<Part, S3Error> {
1923        let chunk = crate::utils::read_chunk(reader)?;
1924        self.put_multipart_chunk(&chunk, path, part_number, upload_id, content_type)
1925    }
1926
1927    /// Upload a buffered multipart chunk to s3 using a previously initiated multipart upload
1928    #[maybe_async::async_impl]
1929    pub async fn put_multipart_chunk(
1930        &self,
1931        chunk: Vec<u8>,
1932        path: &str,
1933        part_number: u32,
1934        upload_id: &str,
1935        content_type: &str,
1936    ) -> Result<Part, S3Error> {
1937        let command = Command::PutObject {
1938            // part_number,
1939            content: &chunk,
1940            multipart: Some(Multipart::new(part_number, upload_id)), // upload_id: &msg.upload_id,
1941            custom_headers: None,
1942            content_type,
1943        };
1944        let request = RequestImpl::new(self, path, command).await?;
1945        let response_data = request.response_data(true).await?;
1946        if !(200..300).contains(&response_data.status_code()) {
1947            // if chunk upload failed - abort the upload
1948            match self.abort_upload(path, upload_id).await {
1949                Ok(_) => {
1950                    return Err(error_from_response_data(response_data)?);
1951                }
1952                Err(error) => {
1953                    return Err(error);
1954                }
1955            }
1956        }
1957        let etag = response_data.as_str()?;
1958        Ok(Part {
1959            etag: etag.to_string(),
1960            part_number,
1961        })
1962    }
1963
1964    #[maybe_async::sync_impl]
1965    pub fn put_multipart_chunk(
1966        &self,
1967        chunk: &[u8],
1968        path: &str,
1969        part_number: u32,
1970        upload_id: &str,
1971        content_type: &str,
1972    ) -> Result<Part, S3Error> {
1973        let command = Command::PutObject {
1974            // part_number,
1975            content: chunk,
1976            multipart: Some(Multipart::new(part_number, upload_id)), // upload_id: &msg.upload_id,
1977            custom_headers: None,
1978            content_type,
1979        };
1980        let request = RequestImpl::new(self, path, command)?;
1981        let response_data = request.response_data(true)?;
1982        if !(200..300).contains(&response_data.status_code()) {
1983            // if chunk upload failed - abort the upload
1984            match self.abort_upload(path, upload_id) {
1985                Ok(_) => {
1986                    return Err(error_from_response_data(response_data)?);
1987                }
1988                Err(error) => {
1989                    return Err(error);
1990                }
1991            }
1992        }
1993        let etag = response_data.as_str()?;
1994        Ok(Part {
1995            etag: etag.to_string(),
1996            part_number,
1997        })
1998    }
1999
2000    /// Completes a previously initiated multipart upload, with optional final data chunks
2001    #[maybe_async::async_impl]
2002    pub async fn complete_multipart_upload(
2003        &self,
2004        path: &str,
2005        upload_id: &str,
2006        parts: Vec<Part>,
2007    ) -> Result<ResponseData, S3Error> {
2008        let data = CompleteMultipartUploadData { parts };
2009        let complete = Command::CompleteMultipartUpload { upload_id, data };
2010        let complete_request = RequestImpl::new(self, path, complete).await?;
2011        complete_request.response_data(false).await
2012    }
2013
2014    #[maybe_async::sync_impl]
2015    pub fn complete_multipart_upload(
2016        &self,
2017        path: &str,
2018        upload_id: &str,
2019        parts: Vec<Part>,
2020    ) -> Result<ResponseData, S3Error> {
2021        let data = CompleteMultipartUploadData { parts };
2022        let complete = Command::CompleteMultipartUpload { upload_id, data };
2023        let complete_request = RequestImpl::new(self, path, complete)?;
2024        complete_request.response_data(false)
2025    }
2026
2027    /// Get Bucket location.
2028    ///
2029    /// # Example:
2030    ///
2031    /// ```no_run
2032    /// use s3::bucket::Bucket;
2033    /// use s3::creds::Credentials;
2034    /// use anyhow::Result;
2035    ///
2036    /// # #[tokio::main]
2037    /// # async fn main() -> Result<()> {
2038    ///
2039    /// let bucket_name = "rust-s3-test";
2040    /// let region = "us-east-1".parse()?;
2041    /// let credentials = Credentials::default()?;
2042    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2043    ///
2044    /// // Async variant with `tokio` or `async-std` features
2045    /// let (region, status_code) = bucket.location().await?;
2046    ///
2047    /// // `sync` feature will produce an identical method
2048    /// #[cfg(feature = "sync")]
2049    /// let (region, status_code) = bucket.location()?;
2050    ///
2051    /// // Blocking variant, generated with `blocking` feature in combination
2052    /// // with `tokio` or `async-std` features.
2053    /// #[cfg(feature = "blocking")]
2054    /// let (region, status_code) = bucket.location_blocking()?;
2055    /// #
2056    /// # Ok(())
2057    /// # }
2058    /// ```
2059    #[maybe_async::maybe_async]
2060    pub async fn location(&self) -> Result<(Region, u16), S3Error> {
2061        let request = RequestImpl::new(self, "?location", Command::GetBucketLocation).await?;
2062        let response_data = request.response_data(false).await?;
2063        let region_string = String::from_utf8_lossy(response_data.as_slice());
2064        let region = match quick_xml::de::from_reader(region_string.as_bytes()) {
2065            Ok(r) => {
2066                let location_result: BucketLocationResult = r;
2067                location_result.region.parse()?
2068            }
2069            Err(e) => {
2070                if response_data.status_code() == 200 {
2071                    Region::Custom {
2072                        region: "Custom".to_string(),
2073                        endpoint: "".to_string(),
2074                    }
2075                } else {
2076                    Region::Custom {
2077                        region: format!("Error encountered : {}", e),
2078                        endpoint: "".to_string(),
2079                    }
2080                }
2081            }
2082        };
2083        Ok((region, response_data.status_code()))
2084    }
2085
2086    /// Delete file from an S3 path.
2087    ///
2088    /// # Example:
2089    ///
2090    /// ```no_run
2091    /// use s3::bucket::Bucket;
2092    /// use s3::creds::Credentials;
2093    /// use anyhow::Result;
2094    ///
2095    /// # #[tokio::main]
2096    /// # async fn main() -> Result<()> {
2097    ///
2098    /// let bucket_name = "rust-s3-test";
2099    /// let region = "us-east-1".parse()?;
2100    /// let credentials = Credentials::default()?;
2101    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2102    ///
2103    /// // Async variant with `tokio` or `async-std` features
2104    /// let response_data = bucket.delete_object("/test.file").await?;
2105    ///
2106    /// // `sync` feature will produce an identical method
2107    /// #[cfg(feature = "sync")]
2108    /// let response_data = bucket.delete_object("/test.file")?;
2109    ///
2110    /// // Blocking variant, generated with `blocking` feature in combination
2111    /// // with `tokio` or `async-std` features.
2112    /// #[cfg(feature = "blocking")]
2113    /// let response_data = bucket.delete_object_blocking("/test.file")?;
2114    /// #
2115    /// # Ok(())
2116    /// # }
2117    /// ```
2118    #[maybe_async::maybe_async]
2119    pub async fn delete_object<S: AsRef<str>>(&self, path: S) -> Result<ResponseData, S3Error> {
2120        let command = Command::DeleteObject;
2121        let request = RequestImpl::new(self, path.as_ref(), command).await?;
2122        request.response_data(false).await
2123    }
2124
2125    /// Delete multiple objects from S3 using the Multi-Object Delete API.
2126    ///
2127    /// If more than 1000 objects are provided, they are automatically batched
2128    /// into multiple requests (S3 allows at most 1000 keys per request).
2129    /// Results from all batches are combined into a single response.
2130    ///
2131    /// # Example:
2132    ///
2133    /// ```no_run
2134    /// use s3::bucket::Bucket;
2135    /// use s3::creds::Credentials;
2136    /// use s3::serde_types::ObjectIdentifier;
2137    /// use anyhow::Result;
2138    ///
2139    /// # #[tokio::main]
2140    /// # async fn main() -> Result<()> {
2141    ///
2142    /// let bucket_name = "rust-s3-test";
2143    /// let region = "us-east-1".parse()?;
2144    /// let credentials = Credentials::default()?;
2145    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2146    ///
2147    /// let objects = vec![
2148    ///     ObjectIdentifier::new("file1.txt"),
2149    ///     ObjectIdentifier::new("file2.txt"),
2150    ///     ObjectIdentifier::new("file3.txt"),
2151    /// ];
2152    ///
2153    /// // Async variant with `tokio` or `async-std` features
2154    /// let response = bucket.delete_objects(objects).await?;
2155    ///
2156    /// // `sync` feature will produce an identical method
2157    /// #[cfg(feature = "sync")]
2158    /// let response = bucket.delete_objects(objects)?;
2159    ///
2160    /// // Blocking variant, generated with `blocking` feature in combination
2161    /// // with `tokio` or `async-std` features.
2162    /// #[cfg(feature = "blocking")]
2163    /// let response = bucket.delete_objects_blocking(objects)?;
2164    /// #
2165    /// # Ok(())
2166    /// # }
2167    /// ```
2168    #[maybe_async::maybe_async]
2169    pub async fn delete_objects<I: Into<Vec<ObjectIdentifier>>>(
2170        &self,
2171        objects: I,
2172    ) -> Result<DeleteObjectsResult, S3Error> {
2173        let objects = objects.into();
2174        let mut result = DeleteObjectsResult {
2175            deleted: Vec::new(),
2176            errors: Vec::new(),
2177        };
2178
2179        // Strip leading '/' from keys to match library convention.
2180        // Other methods (put_object, delete_object, etc.) strip the leading
2181        // slash when building the URL; we do the same for the XML body.
2182        let objects: Vec<ObjectIdentifier> = objects
2183            .into_iter()
2184            .map(|mut obj| {
2185                if let Some(stripped) = obj.key.strip_prefix('/') {
2186                    obj.key = stripped.to_string();
2187                }
2188                obj
2189            })
2190            .collect();
2191
2192        for chunk in objects.chunks(1000) {
2193            let data = DeleteObjectsRequest {
2194                objects: chunk.to_vec(),
2195                quiet: false,
2196            };
2197            let command = Command::DeleteObjects { data };
2198            let request = RequestImpl::new(self, "/", command).await?;
2199            let response_data = request.response_data(false).await?;
2200            if response_data.status_code() >= 300 {
2201                return Err(error_from_response_data(response_data)?);
2202            }
2203            let msg: DeleteObjectsResult = quick_xml::de::from_str(response_data.as_str()?)?;
2204            result.deleted.extend(msg.deleted);
2205            result.errors.extend(msg.errors);
2206        }
2207
2208        Ok(result)
2209    }
2210
2211    /// Head object from S3.
2212    ///
2213    /// # Example:
2214    ///
2215    /// ```no_run
2216    /// use s3::bucket::Bucket;
2217    /// use s3::creds::Credentials;
2218    /// use anyhow::Result;
2219    ///
2220    /// # #[tokio::main]
2221    /// # async fn main() -> Result<()> {
2222    ///
2223    /// let bucket_name = "rust-s3-test";
2224    /// let region = "us-east-1".parse()?;
2225    /// let credentials = Credentials::default()?;
2226    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2227    ///
2228    /// // Async variant with `tokio` or `async-std` features
2229    /// let (head_object_result, code) = bucket.head_object("/test.png").await?;
2230    ///
2231    /// // `sync` feature will produce an identical method
2232    /// #[cfg(feature = "sync")]
2233    /// let (head_object_result, code) = bucket.head_object("/test.png")?;
2234    ///
2235    /// // Blocking variant, generated with `blocking` feature in combination
2236    /// // with `tokio` or `async-std` features.
2237    /// #[cfg(feature = "blocking")]
2238    /// let (head_object_result, code) = bucket.head_object_blocking("/test.png")?;
2239    /// #
2240    /// # Ok(())
2241    /// # }
2242    /// ```
2243    #[maybe_async::maybe_async]
2244    pub async fn head_object<S: AsRef<str>>(
2245        &self,
2246        path: S,
2247    ) -> Result<(HeadObjectResult, u16), S3Error> {
2248        let command = Command::HeadObject;
2249        let request = RequestImpl::new(self, path.as_ref(), command).await?;
2250        let (headers, status) = request.response_header().await?;
2251        let header_object = HeadObjectResult::from(&headers);
2252        Ok((header_object, status))
2253    }
2254
2255    /// Put into an S3 bucket, with explicit content-type.
2256    ///
2257    /// # Example:
2258    ///
2259    /// ```no_run
2260    /// use s3::bucket::Bucket;
2261    /// use s3::creds::Credentials;
2262    /// use anyhow::Result;
2263    ///
2264    /// # #[tokio::main]
2265    /// # async fn main() -> Result<()> {
2266    ///
2267    /// let bucket_name = "rust-s3-test";
2268    /// let region = "us-east-1".parse()?;
2269    /// let credentials = Credentials::default()?;
2270    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2271    /// let content = "I want to go to S3".as_bytes();
2272    ///
2273    /// // Async variant with `tokio` or `async-std` features
2274    /// let response_data = bucket.put_object_with_content_type("/test.file", content, "text/plain").await?;
2275    ///
2276    /// // `sync` feature will produce an identical method
2277    /// #[cfg(feature = "sync")]
2278    /// let response_data = bucket.put_object_with_content_type("/test.file", content, "text/plain")?;
2279    ///
2280    /// // Blocking variant, generated with `blocking` feature in combination
2281    /// // with `tokio` or `async-std` features.
2282    /// #[cfg(feature = "blocking")]
2283    /// let response_data = bucket.put_object_with_content_type_blocking("/test.file", content, "text/plain")?;
2284    /// #
2285    /// # Ok(())
2286    /// # }
2287    /// ```
2288    #[maybe_async::maybe_async]
2289    pub async fn put_object_with_content_type<S: AsRef<str>>(
2290        &self,
2291        path: S,
2292        content: &[u8],
2293        content_type: &str,
2294    ) -> Result<ResponseData, S3Error> {
2295        let command = Command::PutObject {
2296            content,
2297            content_type,
2298            custom_headers: None,
2299            multipart: None,
2300        };
2301        let request = RequestImpl::new(self, path.as_ref(), command).await?;
2302        request.response_data(true).await
2303    }
2304
2305    /// Put into an S3 bucket, with explicit content-type and custom headers for the request.
2306    ///
2307    /// # Example:
2308    ///
2309    /// ```no_run
2310    /// use s3::bucket::Bucket;
2311    /// use s3::creds::Credentials;
2312    /// use anyhow::Result;
2313    ///
2314    /// # #[tokio::main]
2315    /// # async fn main() -> Result<()> {
2316    ///
2317    /// let bucket_name = "rust-s3-test";
2318    /// let region = "us-east-1".parse()?;
2319    /// let credentials = Credentials::default()?;
2320    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2321    /// let content = "I want to go to S3".as_bytes();
2322    ///
2323    /// let mut headers = http::HeaderMap::new();
2324    /// headers.insert(
2325    ///     http::HeaderName::from_static("cache-control"),
2326    ///     "public, max-age=300".parse().unwrap(),
2327    /// );
2328    ///
2329    /// // Async variant with `tokio` or `async-std` features
2330    /// let response_data = bucket
2331    ///     .put_object_with_content_type_and_headers("/test.file", content, "text/plain", Some(headers)).await?;
2332    ///
2333    /// // `sync` feature will produce an identical method
2334    /// #[cfg(feature = "sync")]
2335    /// let response_data = bucket
2336    ///     .put_object_with_content_type_and_headers("/test.file", content, "text/plain", Some(headers))?;
2337    ///
2338    /// // Blocking variant, generated with `blocking` feature in combination
2339    /// // with `tokio` or `async-std` features.
2340    /// #[cfg(feature = "blocking")]
2341    /// let response_data = bucket
2342    ///     .put_object_with_content_type_and_headers("/test.file", content, "text/plain", Some(headers))?;
2343    /// #
2344    /// # Ok(())
2345    /// # }
2346    /// ```
2347    #[maybe_async::maybe_async]
2348    pub async fn put_object_with_content_type_and_headers<S: AsRef<str>>(
2349        &self,
2350        path: S,
2351        content: &[u8],
2352        content_type: &str,
2353        custom_headers: Option<HeaderMap>,
2354    ) -> Result<ResponseData, S3Error> {
2355        let command = Command::PutObject {
2356            content,
2357            content_type,
2358            custom_headers,
2359            multipart: None,
2360        };
2361        let request = RequestImpl::new(self, path.as_ref(), command).await?;
2362        request.response_data(true).await
2363    }
2364
2365    /// Put into an S3 bucket, with custom headers for the request.
2366    ///
2367    /// # Example:
2368    ///
2369    /// ```no_run
2370    /// use s3::bucket::Bucket;
2371    /// use s3::creds::Credentials;
2372    /// use anyhow::Result;
2373    ///
2374    /// # #[tokio::main]
2375    /// # async fn main() -> Result<()> {
2376    ///
2377    /// let bucket_name = "rust-s3-test";
2378    /// let region = "us-east-1".parse()?;
2379    /// let credentials = Credentials::default()?;
2380    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2381    /// let content = "I want to go to S3".as_bytes();
2382    ///
2383    /// let mut headers = http::HeaderMap::new();
2384    /// headers.insert(
2385    ///     http::HeaderName::from_static("cache-control"),
2386    ///     "public, max-age=300".parse().unwrap(),
2387    /// );
2388    ///
2389    /// // Async variant with `tokio` or `async-std` features
2390    /// let response_data = bucket.put_object_with_headers("/test.file", content, Some(headers)).await?;
2391    ///
2392    /// // `sync` feature will produce an identical method
2393    /// #[cfg(feature = "sync")]
2394    /// let response_data = bucket.put_object_with_headers("/test.file", content, Some(headers))?;
2395    ///
2396    /// // Blocking variant, generated with `blocking` feature in combination
2397    /// // with `tokio` or `async-std` features.
2398    /// #[cfg(feature = "blocking")]
2399    /// let response_data = bucket.put_object_with_headers("/test.file", content, Some(headers))?;
2400    /// #
2401    /// # Ok(())
2402    /// # }
2403    /// ```
2404    #[maybe_async::maybe_async]
2405    pub async fn put_object_with_headers<S: AsRef<str>>(
2406        &self,
2407        path: S,
2408        content: &[u8],
2409        custom_headers: Option<HeaderMap>,
2410    ) -> Result<ResponseData, S3Error> {
2411        self.put_object_with_content_type_and_headers(
2412            path,
2413            content,
2414            "application/octet-stream",
2415            custom_headers,
2416        )
2417        .await
2418    }
2419
2420    /// Put into an S3 bucket.
2421    ///
2422    /// # Example:
2423    ///
2424    /// ```no_run
2425    /// use s3::bucket::Bucket;
2426    /// use s3::creds::Credentials;
2427    /// use anyhow::Result;
2428    ///
2429    /// # #[tokio::main]
2430    /// # async fn main() -> Result<()> {
2431    ///
2432    /// let bucket_name = "rust-s3-test";
2433    /// let region = "us-east-1".parse()?;
2434    /// let credentials = Credentials::default()?;
2435    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2436    /// let content = "I want to go to S3".as_bytes();
2437    ///
2438    /// // Async variant with `tokio` or `async-std` features
2439    /// let response_data = bucket.put_object("/test.file", content).await?;
2440    ///
2441    /// // `sync` feature will produce an identical method
2442    /// #[cfg(feature = "sync")]
2443    /// let response_data = bucket.put_object("/test.file", content)?;
2444    ///
2445    /// // Blocking variant, generated with `blocking` feature in combination
2446    /// // with `tokio` or `async-std` features.
2447    /// #[cfg(feature = "blocking")]
2448    /// let response_data = bucket.put_object_blocking("/test.file", content)?;
2449    /// #
2450    /// # Ok(())
2451    /// # }
2452    /// ```
2453    #[maybe_async::maybe_async]
2454    pub async fn put_object<S: AsRef<str>>(
2455        &self,
2456        path: S,
2457        content: &[u8],
2458    ) -> Result<ResponseData, S3Error> {
2459        self.put_object_with_content_type(path, content, "application/octet-stream")
2460            .await
2461    }
2462
2463    /// Create a builder for PUT object operations with custom options
2464    ///
2465    /// This method returns a builder that allows configuring various options
2466    /// for the PUT operation including headers, content type, and metadata.
2467    ///
2468    /// # Example:
2469    ///
2470    /// ```no_run
2471    /// use s3::bucket::Bucket;
2472    /// use s3::creds::Credentials;
2473    /// use anyhow::Result;
2474    ///
2475    /// # #[tokio::main]
2476    /// # async fn main() -> Result<()> {
2477    ///
2478    /// let bucket = Bucket::new("my-bucket", "us-east-1".parse()?, Credentials::default()?)?;
2479    ///
2480    /// // Upload with custom headers using builder pattern
2481    /// let response = bucket.put_object_builder("/my-file.txt", b"Hello, World!")
2482    ///     .with_content_type("text/plain")
2483    ///     .with_cache_control("public, max-age=3600")?
2484    ///     .with_metadata("author", "john-doe")?
2485    ///     .execute()
2486    ///     .await?;
2487    /// #
2488    /// # Ok(())
2489    /// # }
2490    /// ```
2491    pub fn put_object_builder<S: AsRef<str>>(
2492        &self,
2493        path: S,
2494        content: &[u8],
2495    ) -> crate::put_object_request::PutObjectRequest<'_> {
2496        crate::put_object_request::PutObjectRequest::new(self, path, content)
2497    }
2498
2499    fn _tags_xml<S: AsRef<str>>(&self, tags: &[(S, S)]) -> String {
2500        let mut s = String::new();
2501        let content = tags
2502            .iter()
2503            .map(|(name, value)| {
2504                format!(
2505                    "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
2506                    name.as_ref(),
2507                    value.as_ref()
2508                )
2509            })
2510            .fold(String::new(), |mut a, b| {
2511                a.push_str(b.as_str());
2512                a
2513            });
2514        s.push_str("<Tagging><TagSet>");
2515        s.push_str(&content);
2516        s.push_str("</TagSet></Tagging>");
2517        s
2518    }
2519
2520    /// Tag an S3 object.
2521    ///
2522    /// # Example:
2523    ///
2524    /// ```no_run
2525    /// use s3::bucket::Bucket;
2526    /// use s3::creds::Credentials;
2527    /// use anyhow::Result;
2528    ///
2529    /// # #[tokio::main]
2530    /// # async fn main() -> Result<()> {
2531    ///
2532    /// let bucket_name = "rust-s3-test";
2533    /// let region = "us-east-1".parse()?;
2534    /// let credentials = Credentials::default()?;
2535    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2536    ///
2537    /// // Async variant with `tokio` or `async-std` features
2538    /// let response_data = bucket.put_object_tagging("/test.file", &[("Tag1", "Value1"), ("Tag2", "Value2")]).await?;
2539    ///
2540    /// // `sync` feature will produce an identical method
2541    /// #[cfg(feature = "sync")]
2542    /// let response_data = bucket.put_object_tagging("/test.file", &[("Tag1", "Value1"), ("Tag2", "Value2")])?;
2543    ///
2544    /// // Blocking variant, generated with `blocking` feature in combination
2545    /// // with `tokio` or `async-std` features.
2546    /// #[cfg(feature = "blocking")]
2547    /// let response_data = bucket.put_object_tagging_blocking("/test.file", &[("Tag1", "Value1"), ("Tag2", "Value2")])?;
2548    /// #
2549    /// # Ok(())
2550    /// # }
2551    /// ```
2552    #[maybe_async::maybe_async]
2553    pub async fn put_object_tagging<S: AsRef<str>>(
2554        &self,
2555        path: &str,
2556        tags: &[(S, S)],
2557    ) -> Result<ResponseData, S3Error> {
2558        let content = self._tags_xml(tags);
2559        let command = Command::PutObjectTagging { tags: &content };
2560        let request = RequestImpl::new(self, path, command).await?;
2561        request.response_data(false).await
2562    }
2563
2564    /// Delete tags from an S3 object.
2565    ///
2566    /// # Example:
2567    ///
2568    /// ```no_run
2569    /// use s3::bucket::Bucket;
2570    /// use s3::creds::Credentials;
2571    /// use anyhow::Result;
2572    ///
2573    /// # #[tokio::main]
2574    /// # async fn main() -> Result<()> {
2575    ///
2576    /// let bucket_name = "rust-s3-test";
2577    /// let region = "us-east-1".parse()?;
2578    /// let credentials = Credentials::default()?;
2579    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2580    ///
2581    /// // Async variant with `tokio` or `async-std` features
2582    /// let response_data = bucket.delete_object_tagging("/test.file").await?;
2583    ///
2584    /// // `sync` feature will produce an identical method
2585    /// #[cfg(feature = "sync")]
2586    /// let response_data = bucket.delete_object_tagging("/test.file")?;
2587    ///
2588    /// // Blocking variant, generated with `blocking` feature in combination
2589    /// // with `tokio` or `async-std` features.
2590    /// #[cfg(feature = "blocking")]
2591    /// let response_data = bucket.delete_object_tagging_blocking("/test.file")?;
2592    /// #
2593    /// # Ok(())
2594    /// # }
2595    /// ```
2596    #[maybe_async::maybe_async]
2597    pub async fn delete_object_tagging<S: AsRef<str>>(
2598        &self,
2599        path: S,
2600    ) -> Result<ResponseData, S3Error> {
2601        let command = Command::DeleteObjectTagging;
2602        let request = RequestImpl::new(self, path.as_ref(), command).await?;
2603        request.response_data(false).await
2604    }
2605
2606    /// Retrieve an S3 object list of tags.
2607    ///
2608    /// # Example:
2609    ///
2610    /// ```no_run
2611    /// use s3::bucket::Bucket;
2612    /// use s3::creds::Credentials;
2613    /// use anyhow::Result;
2614    ///
2615    /// # #[tokio::main]
2616    /// # async fn main() -> Result<()> {
2617    ///
2618    /// let bucket_name = "rust-s3-test";
2619    /// let region = "us-east-1".parse()?;
2620    /// let credentials = Credentials::default()?;
2621    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2622    ///
2623    /// // Async variant with `tokio` or `async-std` features
2624    /// let response_data = bucket.get_object_tagging("/test.file").await?;
2625    ///
2626    /// // `sync` feature will produce an identical method
2627    /// #[cfg(feature = "sync")]
2628    /// let response_data = bucket.get_object_tagging("/test.file")?;
2629    ///
2630    /// // Blocking variant, generated with `blocking` feature in combination
2631    /// // with `tokio` or `async-std` features.
2632    /// #[cfg(feature = "blocking")]
2633    /// let response_data = bucket.get_object_tagging_blocking("/test.file")?;
2634    /// #
2635    /// # Ok(())
2636    /// # }
2637    /// ```
    #[cfg(feature = "tags")]
    #[maybe_async::maybe_async]
    pub async fn get_object_tagging<S: AsRef<str>>(
        &self,
        path: S,
    ) -> Result<(Vec<Tag>, u16), S3Error> {
        let command = Command::GetObjectTagging {};
        let request = RequestImpl::new(self, path.as_ref(), command).await?;
        let result = request.response_data(false).await?;

        // Stays empty on non-200 responses or when the body cannot be
        // parsed as XML — those cases are not treated as errors here.
        let mut tags = Vec::new();

        if result.status_code() == 200 {
            let result_string = String::from_utf8_lossy(result.as_slice());

            // Add namespace if it doesn't exist: minidom rejects a root
            // element without an xmlns (MissingNamespace), and some
            // S3-compatible providers omit it, so patch the expected S3
            // namespace in and re-parse.
            let ns = "http://s3.amazonaws.com/doc/2006-03-01/";
            let result_string =
                if let Err(minidom::Error::MissingNamespace) = result_string.parse::<Element>() {
                    result_string
                        .replace("<Tagging>", &format!("<Tagging xmlns=\"{}\">", ns))
                        .into()
                } else {
                    result_string
                };

            // Walk <Tagging><TagSet><Tag><Key/><Value/></Tag>…</TagSet>.
            // A Tag missing its Key or Value child yields a placeholder
            // string rather than an error.
            if let Ok(tagging) = result_string.parse::<Element>() {
                for tag_set in tagging.children() {
                    if tag_set.is("TagSet", ns) {
                        for tag in tag_set.children() {
                            if tag.is("Tag", ns) {
                                let key = if let Some(element) = tag.get_child("Key", ns) {
                                    element.text()
                                } else {
                                    "Could not parse Key from Tag".to_string()
                                };
                                let value = if let Some(element) = tag.get_child("Value", ns) {
                                    element.text()
                                } else {
                                    "Could not parse Values from Tag".to_string()
                                };
                                tags.push(Tag { key, value });
                            }
                        }
                    }
                }
            }
        }

        // The status code is returned alongside the tags so callers can
        // distinguish "object has no tags" from "request failed".
        Ok((tags, result.status_code()))
    }
2689
2690    #[maybe_async::maybe_async]
2691    pub async fn list_page(
2692        &self,
2693        prefix: String,
2694        delimiter: Option<String>,
2695        continuation_token: Option<String>,
2696        start_after: Option<String>,
2697        max_keys: Option<usize>,
2698    ) -> Result<(ListBucketResult, u16), S3Error> {
2699        let command = if self.listobjects_v2 {
2700            Command::ListObjectsV2 {
2701                prefix,
2702                delimiter,
2703                continuation_token,
2704                start_after,
2705                max_keys,
2706            }
2707        } else {
2708            // In the v1 ListObjects request, there is only one "marker"
2709            // field that serves as both the initial starting position,
2710            // and as the continuation token.
2711            Command::ListObjects {
2712                prefix,
2713                delimiter,
2714                marker: std::cmp::max(continuation_token, start_after),
2715                max_keys,
2716            }
2717        };
2718        let request = RequestImpl::new(self, "/", command).await?;
2719        let response_data = request.response_data(false).await?;
2720        let list_bucket_result = quick_xml::de::from_reader(response_data.as_slice())?;
2721
2722        Ok((list_bucket_result, response_data.status_code()))
2723    }
2724
2725    /// List the contents of an S3 bucket.
2726    ///
2727    /// # Example:
2728    ///
2729    /// ```no_run
2730    /// use s3::bucket::Bucket;
2731    /// use s3::creds::Credentials;
2732    /// use anyhow::Result;
2733    ///
2734    /// # #[tokio::main]
2735    /// # async fn main() -> Result<()> {
2736    ///
2737    /// let bucket_name = "rust-s3-test";
2738    /// let region = "us-east-1".parse()?;
2739    /// let credentials = Credentials::default()?;
2740    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2741    ///
2742    /// // Async variant with `tokio` or `async-std` features
2743    /// let results = bucket.list("/".to_string(), Some("/".to_string())).await?;
2744    ///
2745    /// // `sync` feature will produce an identical method
2746    /// #[cfg(feature = "sync")]
2747    /// let results = bucket.list("/".to_string(), Some("/".to_string()))?;
2748    ///
2749    /// // Blocking variant, generated with `blocking` feature in combination
2750    /// // with `tokio` or `async-std` features.
2751    /// #[cfg(feature = "blocking")]
2752    /// let results = bucket.list_blocking("/".to_string(), Some("/".to_string()))?;
2753    /// #
2754    /// # Ok(())
2755    /// # }
2756    /// ```
2757    #[maybe_async::maybe_async]
2758    #[allow(clippy::assigning_clones)]
2759    pub async fn list(
2760        &self,
2761        prefix: String,
2762        delimiter: Option<String>,
2763    ) -> Result<Vec<ListBucketResult>, S3Error> {
2764        let the_bucket = self.to_owned();
2765        let mut results = Vec::new();
2766        let mut continuation_token = None;
2767
2768        loop {
2769            let (list_bucket_result, _) = the_bucket
2770                .list_page(
2771                    prefix.clone(),
2772                    delimiter.clone(),
2773                    continuation_token,
2774                    None,
2775                    None,
2776                )
2777                .await?;
2778            continuation_token = list_bucket_result.next_continuation_token.clone();
2779            results.push(list_bucket_result);
2780            if continuation_token.is_none() {
2781                break;
2782            }
2783        }
2784
2785        Ok(results)
2786    }
2787
2788    #[maybe_async::maybe_async]
2789    pub async fn list_multiparts_uploads_page(
2790        &self,
2791        prefix: Option<&str>,
2792        delimiter: Option<&str>,
2793        key_marker: Option<String>,
2794        max_uploads: Option<usize>,
2795    ) -> Result<(ListMultipartUploadsResult, u16), S3Error> {
2796        let command = Command::ListMultipartUploads {
2797            prefix,
2798            delimiter,
2799            key_marker,
2800            max_uploads,
2801        };
2802        let request = RequestImpl::new(self, "/", command).await?;
2803        let response_data = request.response_data(false).await?;
2804        let list_bucket_result = quick_xml::de::from_reader(response_data.as_slice())?;
2805
2806        Ok((list_bucket_result, response_data.status_code()))
2807    }
2808
2809    /// List the ongoing multipart uploads of an S3 bucket. This may be useful to cleanup failed
2810    /// uploads, together with [`crate::bucket::Bucket::abort_upload`].
2811    ///
2812    /// # Example:
2813    ///
2814    /// ```no_run
2815    /// use s3::bucket::Bucket;
2816    /// use s3::creds::Credentials;
2817    /// use anyhow::Result;
2818    ///
2819    /// # #[tokio::main]
2820    /// # async fn main() -> Result<()> {
2821    ///
2822    /// let bucket_name = "rust-s3-test";
2823    /// let region = "us-east-1".parse()?;
2824    /// let credentials = Credentials::default()?;
2825    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2826    ///
2827    /// // Async variant with `tokio` or `async-std` features
2828    /// let results = bucket.list_multiparts_uploads(Some("/"), Some("/")).await?;
2829    ///
2830    /// // `sync` feature will produce an identical method
2831    /// #[cfg(feature = "sync")]
2832    /// let results = bucket.list_multiparts_uploads(Some("/"), Some("/"))?;
2833    ///
2834    /// // Blocking variant, generated with `blocking` feature in combination
2835    /// // with `tokio` or `async-std` features.
2836    /// #[cfg(feature = "blocking")]
2837    /// let results = bucket.list_multiparts_uploads_blocking(Some("/"), Some("/"))?;
2838    /// #
2839    /// # Ok(())
2840    /// # }
2841    /// ```
2842    #[maybe_async::maybe_async]
2843    #[allow(clippy::assigning_clones)]
2844    pub async fn list_multiparts_uploads(
2845        &self,
2846        prefix: Option<&str>,
2847        delimiter: Option<&str>,
2848    ) -> Result<Vec<ListMultipartUploadsResult>, S3Error> {
2849        let the_bucket = self.to_owned();
2850        let mut results = Vec::new();
2851        let mut next_marker: Option<String> = None;
2852
2853        loop {
2854            let (list_multiparts_uploads_result, _) = the_bucket
2855                .list_multiparts_uploads_page(prefix, delimiter, next_marker, None)
2856                .await?;
2857
2858            let is_truncated = list_multiparts_uploads_result.is_truncated;
2859
2860            next_marker = list_multiparts_uploads_result.next_marker.clone();
2861            results.push(list_multiparts_uploads_result);
2862
2863            if !is_truncated {
2864                break;
2865            }
2866        }
2867
2868        Ok(results)
2869    }
2870
2871    /// Abort a running multipart upload.
2872    ///
2873    /// # Example:
2874    ///
2875    /// ```no_run
2876    /// use s3::bucket::Bucket;
2877    /// use s3::creds::Credentials;
2878    /// use anyhow::Result;
2879    ///
2880    /// # #[tokio::main]
2881    /// # async fn main() -> Result<()> {
2882    ///
2883    /// let bucket_name = "rust-s3-test";
2884    /// let region = "us-east-1".parse()?;
2885    /// let credentials = Credentials::default()?;
2886    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2887    ///
2888    /// // Async variant with `tokio` or `async-std` features
2889    /// let results = bucket.abort_upload("/some/file.txt", "ZDFjM2I0YmEtMzU3ZC00OTQ1LTlkNGUtMTgxZThjYzIwNjA2").await?;
2890    ///
2891    /// // `sync` feature will produce an identical method
2892    /// #[cfg(feature = "sync")]
2893    /// let results = bucket.abort_upload("/some/file.txt", "ZDFjM2I0YmEtMzU3ZC00OTQ1LTlkNGUtMTgxZThjYzIwNjA2")?;
2894    ///
2895    /// // Blocking variant, generated with `blocking` feature in combination
2896    /// // with `tokio` or `async-std` features.
2897    /// #[cfg(feature = "blocking")]
2898    /// let results = bucket.abort_upload_blocking("/some/file.txt", "ZDFjM2I0YmEtMzU3ZC00OTQ1LTlkNGUtMTgxZThjYzIwNjA2")?;
2899    /// #
2900    /// # Ok(())
2901    /// # }
2902    /// ```
2903    #[maybe_async::maybe_async]
2904    pub async fn abort_upload(&self, key: &str, upload_id: &str) -> Result<(), S3Error> {
2905        let abort = Command::AbortMultipartUpload { upload_id };
2906        let abort_request = RequestImpl::new(self, key, abort).await?;
2907        let response_data = abort_request.response_data(false).await?;
2908
2909        if (200..300).contains(&response_data.status_code()) {
2910            Ok(())
2911        } else {
2912            let utf8_content = String::from_utf8(response_data.as_slice().to_vec())?;
2913            Err(S3Error::HttpFailWithBody(
2914                response_data.status_code(),
2915                utf8_content,
2916            ))
2917        }
2918    }
2919
    /// Get path_style field of the Bucket struct
    pub fn is_path_style(&self) -> bool {
        self.path_style
    }

    /// Get negated path_style field of the Bucket struct
    pub fn is_subdomain_style(&self) -> bool {
        !self.path_style
    }

    /// Configure bucket to use path-style urls and headers
    pub fn set_path_style(&mut self) {
        self.path_style = true;
    }

    /// Configure bucket to use subdomain style urls and headers \[default\]
    pub fn set_subdomain_style(&mut self) {
        self.path_style = false;
    }

    /// Configure bucket to apply this request timeout to all HTTP
    /// requests, or no (infinity) timeout if `None`.  Defaults to
    /// `DEFAULT_REQUEST_TIMEOUT` (60 seconds).
    ///
    /// Only the [`attohttpc`] and the [`hyper`] backends obey this option;
    /// async code may instead await with a timeout.
    pub fn set_request_timeout(&mut self, timeout: Option<Duration>) {
        self.request_timeout = timeout;
    }

    /// Configure bucket to use the older ListObjects API
    ///
    /// If your provider doesn't support the ListObjectsV2 interface, set this to
    /// use the v1 ListObjects interface instead. This is currently needed at least
    /// for Google Cloud Storage.
    pub fn set_listobjects_v1(&mut self) {
        self.listobjects_v2 = false;
    }

    /// Configure bucket to use the newer ListObjectsV2 API
    pub fn set_listobjects_v2(&mut self) {
        self.listobjects_v2 = true;
    }

    /// Get the name of the S3 bucket as an owned `String`.
    pub fn name(&self) -> String {
        self.name.to_string()
    }

    /// Get the hostname of the S3 API endpoint, honoring the bucket's
    /// path-style / subdomain-style configuration.
    pub fn host(&self) -> String {
        if self.path_style {
            self.path_style_host()
        } else {
            self.subdomain_style_host()
        }
    }

    /// Get the base URL for this bucket: `scheme://host/bucket` in
    /// path-style mode, `scheme://bucket.host` otherwise.
    pub fn url(&self) -> String {
        if self.path_style {
            format!(
                "{}://{}/{}",
                self.scheme(),
                self.path_style_host(),
                self.name()
            )
        } else {
            format!("{}://{}", self.scheme(), self.subdomain_style_host())
        }
    }

    /// Get a paths-style reference to the hostname of the S3 API endpoint.
    pub fn path_style_host(&self) -> String {
        self.region.host()
    }

    /// Get the subdomain-style hostname, i.e. `<bucket>.<region host>`.
    pub fn subdomain_style_host(&self) -> String {
        format!("{}.{}", self.name, self.region.host())
    }

    // pub fn self_host(&self) -> String {
    //     format!("{}.{}", self.name, self.region.host())
    // }

    /// Get the URL scheme used by the bucket's region.
    pub fn scheme(&self) -> String {
        self.region.scheme()
    }

    /// Get the region this object will connect to.
    pub fn region(&self) -> Region {
        self.region.clone()
    }

    /// Get a reference to the AWS access key.
    #[maybe_async::maybe_async]
    pub async fn access_key(&self) -> Result<Option<String>, S3Error> {
        Ok(self.credentials().await?.access_key)
    }

    /// Get a reference to the AWS secret key.
    #[maybe_async::maybe_async]
    pub async fn secret_key(&self) -> Result<Option<String>, S3Error> {
        Ok(self.credentials().await?.secret_key)
    }

    /// Get a reference to the AWS security token.
    #[maybe_async::maybe_async]
    pub async fn security_token(&self) -> Result<Option<String>, S3Error> {
        Ok(self.credentials().await?.security_token)
    }

    /// Get a reference to the AWS session token.
    #[maybe_async::maybe_async]
    pub async fn session_token(&self) -> Result<Option<String>, S3Error> {
        Ok(self.credentials().await?.session_token)
    }

    /// Get a reference to the full [`Credentials`](struct.Credentials.html)
    /// object used by this `Bucket`.
    #[maybe_async::async_impl]
    pub async fn credentials(&self) -> Result<Credentials, S3Error> {
        Ok(self.credentials.read().await.clone())
    }

    /// Get a clone of the full [`Credentials`](struct.Credentials.html)
    /// object used by this `Bucket`.
    ///
    /// Returns `S3Error::CredentialsReadLock` if the credentials lock is
    /// poisoned.
    #[maybe_async::sync_impl]
    pub fn credentials(&self) -> Result<Credentials, S3Error> {
        match self.credentials.read() {
            Ok(credentials) => Ok(credentials.clone()),
            Err(_) => Err(S3Error::CredentialsReadLock),
        }
    }

    /// Change the credentials used by the Bucket.
    pub fn set_credentials(&mut self, credentials: Credentials) {
        self.credentials = Arc::new(RwLock::new(credentials));
    }

    /// Add an extra header to send with requests to S3.
    ///
    /// Add an extra header to send with requests. Note that the library
    /// already sets a number of headers - headers set with this method will be
    /// overridden by the library headers:
    ///   * Host
    ///   * Content-Type
    ///   * Date
    ///   * Content-Length
    ///   * Authorization
    ///   * X-Amz-Content-Sha256
    ///   * X-Amz-Date
    ///
    /// # Panics
    ///
    /// Panics if `key` is not a valid HTTP header name or `value` is not a
    /// valid header value.
    pub fn add_header(&mut self, key: &str, value: &str) {
        self.extra_headers
            .insert(HeaderName::from_str(key).unwrap(), value.parse().unwrap());
    }

    /// Get a reference to the extra headers to be passed to the S3 API.
    pub fn extra_headers(&self) -> &HeaderMap {
        &self.extra_headers
    }

    /// Get a mutable reference to the extra headers to be passed to the S3
    /// API.
    pub fn extra_headers_mut(&mut self) -> &mut HeaderMap {
        &mut self.extra_headers
    }

    /// Add an extra query pair to the URL used for S3 API access.
    pub fn add_query(&mut self, key: &str, value: &str) {
        self.extra_query.insert(key.into(), value.into());
    }

    /// Get a reference to the extra query pairs to be passed to the S3 API.
    pub fn extra_query(&self) -> &Query {
        &self.extra_query
    }

    /// Get a mutable reference to the extra query pairs to be passed to the S3
    /// API.
    pub fn extra_query_mut(&mut self) -> &mut Query {
        &mut self.extra_query
    }

    /// Get the configured per-request timeout, if any.
    pub fn request_timeout(&self) -> Option<Duration> {
        self.request_timeout
    }
3104}
3105
3106#[cfg(test)]
3107mod test {
3108
3109    use crate::BucketConfiguration;
3110    use crate::Tag;
3111    use crate::creds::Credentials;
3112    use crate::post_policy::{PostPolicyField, PostPolicyValue};
3113    use crate::region::Region;
3114    use crate::serde_types::{
3115        BucketLifecycleConfiguration, CorsConfiguration, CorsRule, Expiration, LifecycleFilter,
3116        LifecycleRule,
3117    };
3118    use crate::{Bucket, PostPolicy};
3119    use http::header::{CACHE_CONTROL, HeaderMap, HeaderName, HeaderValue};
3120    use std::env;
3121    #[cfg(all(not(feature = "sync"), feature = "with-tokio"))]
3122    use std::io::{Read, Write};
3123    #[cfg(all(not(feature = "sync"), feature = "with-tokio"))]
3124    use std::net::TcpListener;
3125    #[cfg(all(not(feature = "sync"), feature = "with-tokio"))]
3126    use std::sync::{
3127        Arc,
3128        atomic::{AtomicUsize, Ordering},
3129    };
3130    #[cfg(all(not(feature = "sync"), feature = "with-tokio"))]
3131    use std::thread;
3132
3133    fn init() {
3134        let _ = env_logger::builder().is_test(true).try_init();
3135    }
3136
    #[cfg(all(not(feature = "sync"), feature = "with-tokio"))]
    #[tokio::test]
    async fn test_object_exists_404_does_not_retry() {
        init();

        // Minimal one-shot HTTP server: accepts exactly one connection,
        // counts it, and always answers 404 with an empty body.
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let endpoint = format!("http://{}", listener.local_addr().unwrap());
        let requests = Arc::new(AtomicUsize::new(0));
        let request_count = Arc::clone(&requests);

        let server = thread::spawn(move || {
            let (mut stream, _) = listener.accept().unwrap();
            request_count.fetch_add(1, Ordering::SeqCst);

            let mut buffer = [0; 2048];
            let _ = stream.read(&mut buffer).unwrap();
            stream
                .write_all(
                    b"HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
                )
                .unwrap();
        });

        // Enable retries so the test can prove a 404 is NOT retried.
        crate::set_retries(1);

        let credentials = Credentials::new(
            Some("test_access_key"),
            Some("test_secret_key"),
            None,
            None,
            None,
        )
        .unwrap();
        let bucket = Bucket::new(
            "test-bucket",
            Region::Custom {
                region: "us-east-1".to_owned(),
                endpoint,
            },
            credentials,
        )
        .unwrap()
        .with_path_style();

        // A 404 means "object does not exist" — it must be returned as
        // `false`, not treated as a retryable failure.
        let exists = bucket.object_exists("/missing.txt").await.unwrap();

        // NOTE(review): this sets retries to 1 again rather than restoring
        // the library default — confirm whether a reset was intended here.
        crate::set_retries(1);
        server.join().unwrap();

        assert!(!exists);
        // The server must have been hit exactly once: no retry on 404.
        assert_eq!(requests.load(Ordering::SeqCst), 1);
    }
3189
    #[test]
    #[cfg(any(feature = "tokio-native-tls", feature = "tokio-rustls-tls"))]
    #[allow(deprecated)]
    fn dangerous_config_correct_spelling_and_compat_alias_set_same_options() {
        // The deprecated, misspelled `set_dangereous_config` is kept as a
        // backwards-compatibility alias; both spellings must configure the
        // same TLS-bypass client options.
        let bucket = Bucket::new(
            "test-bucket",
            Region::Custom {
                region: "test-region".to_owned(),
                endpoint: "https://example.com".to_owned(),
            },
            Credentials::anonymous().unwrap(),
        )
        .unwrap();

        let corrected = bucket.set_dangerous_config(true, true).unwrap();
        let deprecated_alias = bucket.set_dangereous_config(true, true).unwrap();

        // The corrected spelling must set both options...
        assert!(corrected.client_options.accept_invalid_certs);
        assert!(corrected.client_options.accept_invalid_hostnames);
        // ...and the alias must agree with it on both.
        assert_eq!(
            corrected.client_options.accept_invalid_certs,
            deprecated_alias.client_options.accept_invalid_certs
        );
        assert_eq!(
            corrected.client_options.accept_invalid_hostnames,
            deprecated_alias.client_options.accept_invalid_hostnames
        );
    }
3218    fn test_aws_credentials() -> Credentials {
3219        Credentials::new(
3220            Some(&env::var("EU_AWS_ACCESS_KEY_ID").unwrap()),
3221            Some(&env::var("EU_AWS_SECRET_ACCESS_KEY").unwrap()),
3222            None,
3223            None,
3224            None,
3225        )
3226        .unwrap()
3227    }
3228
3229    fn test_gc_credentials() -> Credentials {
3230        Credentials::new(
3231            Some(&env::var("GC_ACCESS_KEY_ID").unwrap()),
3232            Some(&env::var("GC_SECRET_ACCESS_KEY").unwrap()),
3233            None,
3234            None,
3235            None,
3236        )
3237        .unwrap()
3238    }
3239
3240    fn test_wasabi_credentials() -> Credentials {
3241        Credentials::new(
3242            Some(&env::var("WASABI_ACCESS_KEY_ID").unwrap()),
3243            Some(&env::var("WASABI_SECRET_ACCESS_KEY").unwrap()),
3244            None,
3245            None,
3246            None,
3247        )
3248        .unwrap()
3249    }
3250
3251    fn test_minio_credentials() -> Credentials {
3252        Credentials::new(
3253            Some(&env::var("MINIO_ACCESS_KEY_ID").unwrap()),
3254            Some(&env::var("MINIO_SECRET_ACCESS_KEY").unwrap()),
3255            None,
3256            None,
3257            None,
3258        )
3259        .unwrap()
3260    }
3261
3262    fn test_digital_ocean_credentials() -> Credentials {
3263        Credentials::new(
3264            Some(&env::var("DIGITAL_OCEAN_ACCESS_KEY_ID").unwrap()),
3265            Some(&env::var("DIGITAL_OCEAN_SECRET_ACCESS_KEY").unwrap()),
3266            None,
3267            None,
3268            None,
3269        )
3270        .unwrap()
3271    }
3272
3273    fn test_r2_credentials() -> Credentials {
3274        Credentials::new(
3275            Some(&env::var("R2_ACCESS_KEY_ID").unwrap()),
3276            Some(&env::var("R2_SECRET_ACCESS_KEY").unwrap()),
3277            None,
3278            None,
3279            None,
3280        )
3281        .unwrap()
3282    }
3283
    /// Bucket on AWS eu-central-1; needs the `EU_AWS_*` env credentials.
    fn test_aws_bucket() -> Box<Bucket> {
        Bucket::new(
            "rust-s3-test",
            "eu-central-1".parse().unwrap(),
            test_aws_credentials(),
        )
        .unwrap()
    }

    /// Bucket on Wasabi; needs the `WASABI_*` env credentials.
    fn test_wasabi_bucket() -> Box<Bucket> {
        Bucket::new(
            "rust-s3",
            "wa-eu-central-1".parse().unwrap(),
            test_wasabi_credentials(),
        )
        .unwrap()
    }

    /// Bucket on Google Cloud Storage; GCS lacks ListObjectsV2 support, so
    /// the bucket is switched to the v1 ListObjects API.
    fn test_gc_bucket() -> Box<Bucket> {
        let mut bucket = Bucket::new(
            "rust-s3",
            Region::Custom {
                region: "us-east1".to_owned(),
                endpoint: "https://storage.googleapis.com".to_owned(),
            },
            test_gc_credentials(),
        )
        .unwrap();
        bucket.set_listobjects_v1();
        bucket
    }

    /// Bucket on a local MinIO instance; MinIO requires path-style
    /// addressing.
    fn test_minio_bucket() -> Box<Bucket> {
        Bucket::new(
            "rust-s3",
            Region::Custom {
                region: "us-east-1".to_owned(),
                endpoint: "http://localhost:9000".to_owned(),
            },
            test_minio_credentials(),
        )
        .unwrap()
        .with_path_style()
    }

    /// Bucket with hardcoded fake credentials for tests that only exercise
    /// local signing logic and never hit the network.
    fn test_presign_bucket() -> Box<Bucket> {
        Bucket::new(
            "rust-s3",
            Region::Custom {
                region: "us-east-1".to_owned(),
                endpoint: "http://localhost:9000".to_owned(),
            },
            Credentials::new(
                Some("test_access_key"),
                Some("test_secret_key"),
                None,
                None,
                None,
            )
            .unwrap(),
        )
        .unwrap()
        .with_path_style()
    }

    /// Bucket on DigitalOcean Spaces (FRA1); needs `DIGITAL_OCEAN_*` env
    /// credentials.
    #[allow(dead_code)]
    fn test_digital_ocean_bucket() -> Box<Bucket> {
        Bucket::new("rust-s3", Region::DoFra1, test_digital_ocean_credentials()).unwrap()
    }

    /// Bucket on Cloudflare R2, addressed via an account-scoped endpoint;
    /// needs the `R2_*` env credentials.
    fn test_r2_bucket() -> Box<Bucket> {
        Bucket::new(
            "rust-s3",
            Region::R2 {
                account_id: "f048f3132be36fa1aaa8611992002b3f".to_string(),
            },
            test_r2_credentials(),
        )
        .unwrap()
    }
3366
3367    fn object(size: u32) -> Vec<u8> {
3368        (0..size).map(|_| 33).collect()
3369    }
3370
    /// Round-trip suite: put an object, verify full and ranged reads and
    /// existence probes, optionally HEAD it, then delete it. `head` is
    /// false for providers whose HEAD step is skipped (see
    /// `r2_test_put_head_get_delete_object`).
    #[maybe_async::maybe_async]
    async fn put_head_get_delete_object(bucket: Bucket, head: bool) {
        let s3_path = "/+test.file";
        let non_existant_path = "/+non_existant.file";
        let test: Vec<u8> = object(3072);

        // PutObject should succeed with 200.
        let response_data = bucket.put_object(s3_path, &test).await.unwrap();
        assert_eq!(response_data.status_code(), 200);

        // let attributes = bucket
        //     .get_object_attributes(s3_path, "904662384344", None)
        //     .await
        //     .unwrap();

        // Full GetObject must return the exact payload.
        let response_data = bucket.get_object(s3_path).await.unwrap();
        assert_eq!(response_data.status_code(), 200);
        assert_eq!(test, response_data.as_slice());

        let exists = bucket.object_exists(s3_path).await.unwrap();
        assert!(exists);

        let not_exists = bucket.object_exists(non_existant_path).await.unwrap();
        assert!(!not_exists);

        // Ranged read: both bounds are inclusive, so 100..=1000 yields
        // 901 bytes and a 206 Partial Content status.
        let response_data = bucket
            .get_object_range(s3_path, 100, Some(1000))
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 206);
        assert_eq!(test[100..1001].to_vec(), response_data.as_slice());

        // Test single-byte range read (start == end)
        let response_data = bucket
            .get_object_range(s3_path, 100, Some(100))
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 206);
        assert_eq!(vec![test[100]], response_data.as_slice());

        if head {
            let (_head_object_result, code) = bucket.head_object(s3_path).await.unwrap();
            // println!("{:?}", head_object_result);
            assert_eq!(code, 200);
        }

        // println!("{:?}", head_object_result);
        // DeleteObject returns 204 No Content on success.
        let response_data = bucket.delete_object(s3_path).await.unwrap();
        assert_eq!(response_data.status_code(), 204);
    }
3420
    /// Round trip with custom request headers: put an object with a
    /// Cache-Control header plus a custom header, verify reads and existence
    /// probes, confirm HEAD echoes the Cache-Control value, then delete.
    #[maybe_async::maybe_async]
    async fn put_head_delete_object_with_headers(bucket: Bucket) {
        let s3_path = "/+test.file";
        let non_existant_path = "/+non_existant.file";
        let test: Vec<u8> = object(3072);
        let header_value = "max-age=42";

        let mut custom_headers = HeaderMap::new();
        custom_headers.insert(CACHE_CONTROL, HeaderValue::from_static(header_value));
        custom_headers.insert(
            HeaderName::from_static("test-key"),
            "value".parse().unwrap(),
        );

        let response_data = bucket
            .put_object_with_headers(s3_path, &test, Some(custom_headers.clone()))
            .await
            .expect("Put object with custom headers failed");
        assert_eq!(response_data.status_code(), 200);

        let response_data = bucket.get_object(s3_path).await.unwrap();
        assert_eq!(response_data.status_code(), 200);
        assert_eq!(test, response_data.as_slice());

        let exists = bucket.object_exists(s3_path).await.unwrap();
        assert!(exists);

        let not_exists = bucket.object_exists(non_existant_path).await.unwrap();
        assert!(!not_exists);

        // Ranged read: inclusive bounds (100..=1000 -> 901 bytes, 206 status).
        let response_data = bucket
            .get_object_range(s3_path, 100, Some(1000))
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 206);
        assert_eq!(test[100..1001].to_vec(), response_data.as_slice());

        // HEAD must report the Cache-Control value set on upload.
        let (head_object_result, code) = bucket.head_object(s3_path).await.unwrap();
        // println!("{:?}", head_object_result);
        assert_eq!(code, 200);
        assert_eq!(
            head_object_result.cache_control,
            Some(header_value.to_string())
        );

        // DeleteObject returns 204 No Content on success.
        let response_data = bucket.delete_object(s3_path).await.unwrap();
        assert_eq!(response_data.status_code(), 204);
    }
3469
    /// AWS object-tagging round trip: put an object, confirm it starts with
    /// no tags, attach two tags, re-read them, then delete the object.
    #[ignore]
    #[cfg(feature = "tags")]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn test_tagging_aws() {
        let bucket = test_aws_bucket();
        // Kept for reference; the final comparison against these tags is
        // commented out below because the read can be eventually consistent.
        let _target_tags = vec![
            Tag {
                key: "Tag1".to_string(),
                value: "Value1".to_string(),
            },
            Tag {
                key: "Tag2".to_string(),
                value: "Value2".to_string(),
            },
        ];
        let empty_tags: Vec<Tag> = Vec::new();
        let response_data = bucket
            .put_object("tagging_test", b"Gimme tags")
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        // A freshly created object has no tags.
        let (tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        assert_eq!(tags, empty_tags);
        let response_data = bucket
            .put_object_tagging("tagging_test", &[("Tag1", "Value1"), ("Tag2", "Value2")])
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        // This could be eventually consistent now
        let (_tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        // assert_eq!(tags, target_tags)
        let _response_data = bucket.delete_object("tagging_test").await.unwrap();
    }
3510
    /// MinIO object-tagging round trip: put an object, confirm it starts
    /// with no tags, attach two tags, re-read them, then delete the object.
    #[ignore]
    #[cfg(feature = "tags")]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn test_tagging_minio() {
        let bucket = test_minio_bucket();
        // Kept for reference; the final comparison against these tags is
        // commented out below because the read can be eventually consistent.
        let _target_tags = vec![
            Tag {
                key: "Tag1".to_string(),
                value: "Value1".to_string(),
            },
            Tag {
                key: "Tag2".to_string(),
                value: "Value2".to_string(),
            },
        ];
        let empty_tags: Vec<Tag> = Vec::new();
        let response_data = bucket
            .put_object("tagging_test", b"Gimme tags")
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        // A freshly created object has no tags.
        let (tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        assert_eq!(tags, empty_tags);
        let response_data = bucket
            .put_object_tagging("tagging_test", &[("Tag1", "Value1"), ("Tag2", "Value2")])
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        // This could be eventually consistent now
        let (_tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        // assert_eq!(tags, target_tags)
        let _response_data = bucket.delete_object("tagging_test").await.unwrap();
    }
3551
3552    #[ignore]
3553    #[maybe_async::test(
3554        feature = "sync",
3555        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3556        async(
3557            all(not(feature = "sync"), feature = "with-async-std"),
3558            async_std::test
3559        )
3560    )]
3561    async fn streaming_big_aws_put_head_get_delete_object() {
3562        streaming_test_put_get_delete_big_object(*test_aws_bucket()).await;
3563    }
3564
3565    #[ignore]
3566    #[maybe_async::test(
3567        feature = "sync",
3568        async(
3569            all(
3570                not(feature = "sync"),
3571                not(feature = "tokio-rustls-tls"),
3572                feature = "with-tokio"
3573            ),
3574            tokio::test
3575        ),
3576        async(
3577            all(not(feature = "sync"), feature = "with-async-std"),
3578            async_std::test
3579        )
3580    )]
3581    async fn streaming_big_gc_put_head_get_delete_object() {
3582        streaming_test_put_get_delete_big_object(*test_gc_bucket()).await;
3583    }
3584
3585    #[ignore]
3586    #[maybe_async::test(
3587        feature = "sync",
3588        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3589        async(
3590            all(not(feature = "sync"), feature = "with-async-std"),
3591            async_std::test
3592        )
3593    )]
3594    async fn streaming_big_minio_put_head_get_delete_object() {
3595        streaming_test_put_get_delete_big_object(*test_minio_bucket()).await;
3596    }
3597
3598    #[ignore]
3599    #[maybe_async::test(
3600        feature = "sync",
3601        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3602        async(
3603            all(not(feature = "sync"), feature = "with-async-std"),
3604            async_std::test
3605        )
3606    )]
3607    async fn streaming_big_r2_put_head_get_delete_object() {
3608        streaming_test_put_get_delete_big_object(*test_r2_bucket()).await;
3609    }
3610
    // Test multi-part upload
    /// Streaming round trip for a 20 MB payload, large enough to exercise
    /// the multipart upload path: write the payload to a local scratch file,
    /// upload it with `put_object_stream`, read it back with
    /// `get_object_to_writer` and via the chunk stream, then delete the
    /// remote object and the local file.
    #[maybe_async::maybe_async]
    async fn streaming_test_put_get_delete_big_object(bucket: Bucket) {
        // File and stream traits depend on the selected runtime feature.
        #[cfg(feature = "with-async-std")]
        use async_std::fs::File;
        #[cfg(feature = "with-async-std")]
        use async_std::io::WriteExt;
        #[cfg(feature = "with-async-std")]
        use async_std::stream::StreamExt;
        #[cfg(feature = "with-tokio")]
        use futures_util::StreamExt;
        #[cfg(not(any(feature = "with-tokio", feature = "with-async-std")))]
        use std::fs::File;
        #[cfg(not(any(feature = "with-tokio", feature = "with-async-std")))]
        use std::io::Write;
        #[cfg(feature = "with-tokio")]
        use tokio::fs::File;
        #[cfg(feature = "with-tokio")]
        use tokio::io::AsyncWriteExt;

        init();
        let remote_path = "+stream_test_big";
        let local_path = "+stream_test_big";
        // NOTE(review): this removes the *local* scratch file; it only works
        // because remote_path and local_path are the same string — using
        // local_path here would be clearer.
        std::fs::remove_file(remote_path).unwrap_or(());
        let content: Vec<u8> = object(20_000_000);

        let mut file = File::create(local_path).await.unwrap();
        file.write_all(&content).await.unwrap();
        file.flush().await.unwrap();
        let mut reader = File::open(local_path).await.unwrap();

        let response = bucket
            .put_object_stream(&mut reader, remote_path)
            .await
            .unwrap();
        // The sync API returns a bare status code; the async APIs return
        // response data.
        #[cfg(not(feature = "sync"))]
        assert_eq!(response.status_code(), 200);
        #[cfg(feature = "sync")]
        assert_eq!(response, 200);
        let mut writer = Vec::new();
        let code = bucket
            .get_object_to_writer(remote_path, &mut writer)
            .await
            .unwrap();
        assert_eq!(code, 200);
        // assert_eq!(content, writer);
        // Only lengths are compared; the byte-for-byte check above was left
        // commented out.
        assert_eq!(content.len(), writer.len());
        assert_eq!(content.len(), 20_000_000);

        // Also exercise the chunked streaming download API (async only).
        #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
        {
            let mut response_data_stream = bucket.get_object_stream(remote_path).await.unwrap();

            let mut bytes = vec![];

            while let Some(chunk) = response_data_stream.bytes().next().await {
                bytes.push(chunk)
            }
            assert_ne!(bytes.len(), 0);
        }

        let response_data = bucket.delete_object(remote_path).await.unwrap();
        assert_eq!(response_data.status_code(), 204);
        std::fs::remove_file(local_path).unwrap_or(());
    }
3676
3677    #[ignore]
3678    #[maybe_async::test(
3679        feature = "sync",
3680        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3681        async(
3682            all(not(feature = "sync"), feature = "with-async-std"),
3683            async_std::test
3684        )
3685    )]
3686    async fn streaming_aws_put_head_get_delete_object() {
3687        streaming_test_put_get_delete_small_object(test_aws_bucket()).await;
3688    }
3689
3690    #[ignore]
3691    #[maybe_async::test(
3692        feature = "sync",
3693        async(
3694            all(
3695                not(feature = "sync"),
3696                not(feature = "tokio-rustls-tls"),
3697                feature = "with-tokio"
3698            ),
3699            tokio::test
3700        ),
3701        async(
3702            all(not(feature = "sync"), feature = "with-async-std"),
3703            async_std::test
3704        )
3705    )]
3706    async fn streaming_gc_put_head_get_delete_object() {
3707        streaming_test_put_get_delete_small_object(test_gc_bucket()).await;
3708    }
3709
3710    #[ignore]
3711    #[maybe_async::test(
3712        feature = "sync",
3713        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3714        async(
3715            all(not(feature = "sync"), feature = "with-async-std"),
3716            async_std::test
3717        )
3718    )]
3719    async fn streaming_r2_put_head_get_delete_object() {
3720        streaming_test_put_get_delete_small_object(test_r2_bucket()).await;
3721    }
3722
3723    #[ignore]
3724    #[maybe_async::test(
3725        feature = "sync",
3726        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3727        async(
3728            all(not(feature = "sync"), feature = "with-async-std"),
3729            async_std::test
3730        )
3731    )]
3732    async fn streaming_minio_put_head_get_delete_object() {
3733        streaming_test_put_get_delete_small_object(test_minio_bucket()).await;
3734    }
3735
    /// Streaming round trip for a small (1000-byte) object: upload via
    /// `put_object_stream` from an in-memory cursor, download via
    /// `get_object_to_writer`, compare bytes, then delete.
    #[maybe_async::maybe_async]
    async fn streaming_test_put_get_delete_small_object(bucket: Box<Bucket>) {
        init();
        let remote_path = "+stream_test_small";
        let content: Vec<u8> = object(1000);
        // The reader type depends on the selected runtime feature.
        #[cfg(feature = "with-tokio")]
        let mut reader = std::io::Cursor::new(&content);
        #[cfg(feature = "with-async-std")]
        let mut reader = async_std::io::Cursor::new(&content);
        #[cfg(feature = "sync")]
        let mut reader = std::io::Cursor::new(&content);

        let response = bucket
            .put_object_stream(&mut reader, remote_path)
            .await
            .unwrap();
        // The sync API returns a bare status code; the async APIs return
        // response data.
        #[cfg(not(feature = "sync"))]
        assert_eq!(response.status_code(), 200);
        #[cfg(feature = "sync")]
        assert_eq!(response, 200);
        let mut writer = Vec::new();
        let code = bucket
            .get_object_to_writer(remote_path, &mut writer)
            .await
            .unwrap();
        assert_eq!(code, 200);
        assert_eq!(content, writer);

        let response_data = bucket.delete_object(remote_path).await.unwrap();
        assert_eq!(response_data.status_code(), 204);
    }
3767
3768    #[cfg(feature = "blocking")]
3769    fn put_head_get_list_delete_object_blocking(bucket: Bucket) {
3770        let s3_path = "/test_blocking.file";
3771        let s3_path_2 = "/test_blocking.file2";
3772        let s3_path_3 = "/test_blocking.file3";
3773        let test: Vec<u8> = object(3072);
3774
3775        // Test PutObject
3776        let response_data = bucket.put_object_blocking(s3_path, &test).unwrap();
3777        assert_eq!(response_data.status_code(), 200);
3778
3779        // Test GetObject
3780        let response_data = bucket.get_object_blocking(s3_path).unwrap();
3781        assert_eq!(response_data.status_code(), 200);
3782        assert_eq!(test, response_data.as_slice());
3783
3784        // Test GetObject with a range
3785        let response_data = bucket
3786            .get_object_range_blocking(s3_path, 100, Some(1000))
3787            .unwrap();
3788        assert_eq!(response_data.status_code(), 206);
3789        assert_eq!(test[100..1001].to_vec(), response_data.as_slice());
3790
3791        // Test single-byte range read (start == end)
3792        let response_data = bucket
3793            .get_object_range_blocking(s3_path, 100, Some(100))
3794            .unwrap();
3795        assert_eq!(response_data.status_code(), 206);
3796        assert_eq!(vec![test[100]], response_data.as_slice());
3797
3798        // Test HeadObject
3799        let (head_object_result, code) = bucket.head_object_blocking(s3_path).unwrap();
3800        assert_eq!(code, 200);
3801        assert_eq!(
3802            head_object_result.content_type.unwrap(),
3803            "application/octet-stream".to_owned()
3804        );
3805        // println!("{:?}", head_object_result);
3806
3807        // Put some additional objects, so that we can test ListObjects
3808        let response_data = bucket.put_object_blocking(s3_path_2, &test).unwrap();
3809        assert_eq!(response_data.status_code(), 200);
3810        let response_data = bucket.put_object_blocking(s3_path_3, &test).unwrap();
3811        assert_eq!(response_data.status_code(), 200);
3812
3813        // Test ListObjects, with continuation
3814        let (result, code) = bucket
3815            .list_page_blocking(
3816                "test_blocking.".to_string(),
3817                Some("/".to_string()),
3818                None,
3819                None,
3820                Some(2),
3821            )
3822            .unwrap();
3823        assert_eq!(code, 200);
3824        assert_eq!(result.contents.len(), 2);
3825        assert_eq!(result.contents[0].key, s3_path[1..]);
3826        assert_eq!(result.contents[1].key, s3_path_2[1..]);
3827
3828        let cont_token = result.next_continuation_token.unwrap();
3829
3830        let (result, code) = bucket
3831            .list_page_blocking(
3832                "test_blocking.".to_string(),
3833                Some("/".to_string()),
3834                Some(cont_token),
3835                None,
3836                Some(2),
3837            )
3838            .unwrap();
3839        assert_eq!(code, 200);
3840        assert_eq!(result.contents.len(), 1);
3841        assert_eq!(result.contents[0].key, s3_path_3[1..]);
3842        assert!(result.next_continuation_token.is_none());
3843
3844        // cleanup (and test Delete)
3845        let response_data = bucket.delete_object_blocking(s3_path).unwrap();
3846        assert_eq!(code, 200);
3847        let response_data = bucket.delete_object_blocking(s3_path_2).unwrap();
3848        assert_eq!(code, 200);
3849        let response_data = bucket.delete_object_blocking(s3_path_3).unwrap();
3850        assert_eq!(code, 200);
3851    }
3852
3853    #[ignore]
3854    #[cfg(all(
3855        any(feature = "with-tokio", feature = "with-async-std"),
3856        feature = "blocking"
3857    ))]
3858    #[test]
3859    fn aws_put_head_get_delete_object_blocking() {
3860        put_head_get_list_delete_object_blocking(*test_aws_bucket())
3861    }
3862
3863    #[ignore]
3864    #[cfg(all(
3865        any(feature = "with-tokio", feature = "with-async-std"),
3866        feature = "blocking"
3867    ))]
3868    #[test]
3869    fn gc_put_head_get_delete_object_blocking() {
3870        put_head_get_list_delete_object_blocking(*test_gc_bucket())
3871    }
3872
3873    #[ignore]
3874    #[cfg(all(
3875        any(feature = "with-tokio", feature = "with-async-std"),
3876        feature = "blocking"
3877    ))]
3878    #[test]
3879    fn wasabi_put_head_get_delete_object_blocking() {
3880        put_head_get_list_delete_object_blocking(*test_wasabi_bucket())
3881    }
3882
3883    #[ignore]
3884    #[cfg(all(
3885        any(feature = "with-tokio", feature = "with-async-std"),
3886        feature = "blocking"
3887    ))]
3888    #[test]
3889    fn minio_put_head_get_delete_object_blocking() {
3890        put_head_get_list_delete_object_blocking(*test_minio_bucket())
3891    }
3892
3893    #[ignore]
3894    #[cfg(all(
3895        any(feature = "with-tokio", feature = "with-async-std"),
3896        feature = "blocking"
3897    ))]
3898    #[test]
3899    fn digital_ocean_put_head_get_delete_object_blocking() {
3900        put_head_get_list_delete_object_blocking(*test_digital_ocean_bucket())
3901    }
3902
3903    #[ignore]
3904    #[maybe_async::test(
3905        feature = "sync",
3906        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3907        async(
3908            all(not(feature = "sync"), feature = "with-async-std"),
3909            async_std::test
3910        )
3911    )]
3912    async fn aws_put_head_get_delete_object() {
3913        put_head_get_delete_object(*test_aws_bucket(), true).await;
3914        put_head_delete_object_with_headers(*test_aws_bucket()).await;
3915    }
3916
3917    #[ignore]
3918    #[maybe_async::test(
3919        feature = "sync",
3920        async(
3921            all(
3922                not(any(feature = "sync", feature = "tokio-rustls-tls")),
3923                feature = "with-tokio"
3924            ),
3925            tokio::test
3926        ),
3927        async(
3928            all(not(feature = "sync"), feature = "with-async-std"),
3929            async_std::test
3930        )
3931    )]
3932    async fn gc_test_put_head_get_delete_object() {
3933        put_head_get_delete_object(*test_gc_bucket(), true).await;
3934        put_head_delete_object_with_headers(*test_gc_bucket()).await;
3935    }
3936
3937    #[ignore]
3938    #[maybe_async::test(
3939        feature = "sync",
3940        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3941        async(
3942            all(not(feature = "sync"), feature = "with-async-std"),
3943            async_std::test
3944        )
3945    )]
3946    async fn wasabi_test_put_head_get_delete_object() {
3947        put_head_get_delete_object(*test_wasabi_bucket(), true).await;
3948        put_head_delete_object_with_headers(*test_wasabi_bucket()).await;
3949    }
3950
3951    #[ignore]
3952    #[maybe_async::test(
3953        feature = "sync",
3954        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3955        async(
3956            all(not(feature = "sync"), feature = "with-async-std"),
3957            async_std::test
3958        )
3959    )]
3960    async fn minio_test_put_head_get_delete_object() {
3961        put_head_get_delete_object(*test_minio_bucket(), true).await;
3962        put_head_delete_object_with_headers(*test_minio_bucket()).await;
3963    }
3964
    // NOTE: disabled — this test consistently fails when built with the
    // `tokio-rustls-tls` feature.
3966    // #[ignore]
3967    // #[maybe_async::test(
3968    //     feature = "sync",
3969    //     async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3970    //     async(
3971    //         all(not(feature = "sync"), feature = "with-async-std"),
3972    //         async_std::test
3973    //     )
3974    // )]
3975    // async fn digital_ocean_test_put_head_get_delete_object() {
3976    //     put_head_get_delete_object(test_digital_ocean_bucket(), true).await;
3977    // }
3978
3979    #[ignore]
3980    #[maybe_async::test(
3981        feature = "sync",
3982        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3983        async(
3984            all(not(feature = "sync"), feature = "with-async-std"),
3985            async_std::test
3986        )
3987    )]
3988    async fn r2_test_put_head_get_delete_object() {
3989        put_head_get_delete_object(*test_r2_bucket(), false).await;
3990        put_head_delete_object_with_headers(*test_r2_bucket()).await;
3991    }
3992
3993    #[maybe_async::maybe_async]
3994    async fn put_delete_objects(bucket: Bucket) {
3995        use crate::serde_types::ObjectIdentifier;
3996
3997        let paths = [
3998            "/+bulk_delete_1.file",
3999            "/+bulk_delete_2.file",
4000            "/+bulk_delete_3.file",
4001        ];
4002        let test: Vec<u8> = object(128);
4003
4004        // Put test objects
4005        for path in &paths {
4006            let response_data = bucket.put_object(*path, &test).await.unwrap();
4007            assert_eq!(response_data.status_code(), 200);
4008        }
4009
4010        // Bulk delete them
4011        let objects: Vec<ObjectIdentifier> =
4012            paths.iter().map(|p| ObjectIdentifier::new(*p)).collect();
4013        let result = bucket.delete_objects(objects).await.unwrap();
4014
4015        assert_eq!(result.deleted.len(), 3);
4016        assert!(result.errors.is_empty());
4017
4018        // Verify they are gone
4019        for path in &paths {
4020            let exists = bucket.object_exists(*path).await.unwrap();
4021            assert!(!exists);
4022        }
4023    }
4024
4025    #[ignore]
4026    #[maybe_async::test(
4027        feature = "sync",
4028        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4029        async(
4030            all(not(feature = "sync"), feature = "with-async-std"),
4031            async_std::test
4032        )
4033    )]
4034    async fn aws_test_delete_objects() {
4035        put_delete_objects(*test_aws_bucket()).await;
4036    }
4037
4038    #[ignore]
4039    #[maybe_async::test(
4040        feature = "sync",
4041        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4042        async(
4043            all(not(feature = "sync"), feature = "with-async-std"),
4044            async_std::test
4045        )
4046    )]
4047    async fn minio_test_delete_objects() {
4048        put_delete_objects(*test_minio_bucket()).await;
4049    }
4050
4051    #[maybe_async::test(
4052        feature = "sync",
4053        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4054        async(
4055            all(not(feature = "sync"), feature = "with-async-std"),
4056            async_std::test
4057        )
4058    )]
4059    async fn test_presign_put() {
4060        let s3_path = "/test/test.file";
4061        let bucket = test_presign_bucket();
4062
4063        let mut custom_headers = HeaderMap::new();
4064        custom_headers.insert(
4065            HeaderName::from_static("custom_header"),
4066            "custom_value".parse().unwrap(),
4067        );
4068
4069        let url = bucket
4070            .presign_put(s3_path, 86400, Some(custom_headers), None)
4071            .await
4072            .unwrap();
4073
4074        assert!(url.contains("custom_header%3Bhost"));
4075        assert!(url.contains("/test/test.file"))
4076    }
4077
4078    #[maybe_async::test(
4079        feature = "sync",
4080        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4081        async(
4082            all(not(feature = "sync"), feature = "with-async-std"),
4083            async_std::test
4084        )
4085    )]
4086    async fn test_presign_post() {
4087        use std::borrow::Cow;
4088
4089        let bucket = test_presign_bucket();
4090
4091        // Policy from sample
4092        let policy = PostPolicy::new(86400)
4093            .condition(
4094                PostPolicyField::Key,
4095                PostPolicyValue::StartsWith(Cow::from("user/user1/")),
4096            )
4097            .unwrap();
4098
4099        let data = bucket.presign_post(policy).await.unwrap();
4100
4101        assert_eq!(data.url, "http://localhost:9000/rust-s3");
4102        assert_eq!(data.fields.len(), 6);
4103        assert_eq!(data.dynamic_fields.len(), 1);
4104    }
4105
4106    #[maybe_async::test(
4107        feature = "sync",
4108        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4109        async(
4110            all(not(feature = "sync"), feature = "with-async-std"),
4111            async_std::test
4112        )
4113    )]
4114    async fn test_presign_get() {
4115        let s3_path = "/test/test.file";
4116        let bucket = test_presign_bucket();
4117
4118        let url = bucket.presign_get(s3_path, 86400, None).await.unwrap();
4119        assert!(url.contains("/test/test.file?"))
4120    }
4121
4122    #[maybe_async::test(
4123        feature = "sync",
4124        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4125        async(
4126            all(not(feature = "sync"), feature = "with-async-std"),
4127            async_std::test
4128        )
4129    )]
4130    async fn test_presign_delete() {
4131        let s3_path = "/test/test.file";
4132        let bucket = test_presign_bucket();
4133
4134        let url = bucket.presign_delete(s3_path, 86400).await.unwrap();
4135        assert!(url.contains("/test/test.file?"))
4136    }
4137
4138    #[maybe_async::test(
4139        feature = "sync",
4140        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4141        async(
4142            all(not(feature = "sync"), feature = "with-async-std"),
4143            async_std::test
4144        )
4145    )]
4146    async fn test_presign_url_standard_ports() {
4147        // Test that presigned URLs preserve standard ports in the host header
4148        // This is crucial for signature validation
4149
4150        // Test with HTTP standard port 80
4151        let region_http_80 = Region::Custom {
4152            region: "eu-central-1".to_owned(),
4153            endpoint: "http://minio:80".to_owned(),
4154        };
4155        let credentials = Credentials::new(
4156            Some("test_access_key"),
4157            Some("test_secret_key"),
4158            None,
4159            None,
4160            None,
4161        )
4162        .unwrap();
4163        let bucket_http_80 = Bucket::new("test-bucket", region_http_80, credentials.clone())
4164            .unwrap()
4165            .with_path_style();
4166
4167        let presigned_url_80 = bucket_http_80
4168            .presign_get("/test.file", 3600, None)
4169            .await
4170            .unwrap();
4171        println!("Presigned URL with port 80: {}", presigned_url_80);
4172
4173        // Port 80 MUST be preserved in the URL for signature validation
4174        assert!(
4175            presigned_url_80.starts_with("http://minio:80/"),
4176            "URL must preserve port 80, got: {}",
4177            presigned_url_80
4178        );
4179
4180        // Test with HTTPS standard port 443
4181        let region_https_443 = Region::Custom {
4182            region: "eu-central-1".to_owned(),
4183            endpoint: "https://minio:443".to_owned(),
4184        };
4185        let bucket_https_443 = Bucket::new("test-bucket", region_https_443, credentials.clone())
4186            .unwrap()
4187            .with_path_style();
4188
4189        let presigned_url_443 = bucket_https_443
4190            .presign_get("/test.file", 3600, None)
4191            .await
4192            .unwrap();
4193        println!("Presigned URL with port 443: {}", presigned_url_443);
4194
4195        // Port 443 MUST be preserved in the URL for signature validation
4196        assert!(
4197            presigned_url_443.starts_with("https://minio:443/"),
4198            "URL must preserve port 443, got: {}",
4199            presigned_url_443
4200        );
4201
4202        // Test with non-standard port (should always include port)
4203        let region_http_9000 = Region::Custom {
4204            region: "eu-central-1".to_owned(),
4205            endpoint: "http://minio:9000".to_owned(),
4206        };
4207        let bucket_http_9000 = Bucket::new("test-bucket", region_http_9000, credentials)
4208            .unwrap()
4209            .with_path_style();
4210
4211        let presigned_url_9000 = bucket_http_9000
4212            .presign_get("/test.file", 3600, None)
4213            .await
4214            .unwrap();
4215        assert!(
4216            presigned_url_9000.contains("minio:9000"),
4217            "Non-standard port should be preserved in URL"
4218        );
4219    }
4220
4221    #[maybe_async::test(
4222        feature = "sync",
4223        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4224        async(
4225            all(not(feature = "sync"), feature = "with-async-std"),
4226            async_std::test
4227        )
4228    )]
4229    #[ignore]
4230    async fn test_bucket_create_delete_default_region() {
4231        let config = BucketConfiguration::default();
4232        let response = Bucket::create(
4233            &uuid::Uuid::new_v4().to_string(),
4234            "us-east-1".parse().unwrap(),
4235            test_aws_credentials(),
4236            config,
4237        )
4238        .await
4239        .unwrap();
4240
4241        assert_eq!(&response.response_text, "");
4242
4243        assert_eq!(response.response_code, 200);
4244
4245        let response_code = response.bucket.delete().await.unwrap();
4246        assert!(response_code < 300);
4247    }
4248
4249    #[ignore]
4250    #[maybe_async::test(
4251        feature = "sync",
4252        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4253        async(
4254            all(not(feature = "sync"), feature = "with-async-std"),
4255            async_std::test
4256        )
4257    )]
4258    async fn test_bucket_create_delete_non_default_region() {
4259        let config = BucketConfiguration::default();
4260        let response = Bucket::create(
4261            &uuid::Uuid::new_v4().to_string(),
4262            "eu-central-1".parse().unwrap(),
4263            test_aws_credentials(),
4264            config,
4265        )
4266        .await
4267        .unwrap();
4268
4269        assert_eq!(&response.response_text, "");
4270
4271        assert_eq!(response.response_code, 200);
4272
4273        let response_code = response.bucket.delete().await.unwrap();
4274        assert!(response_code < 300);
4275    }
4276
4277    #[ignore]
4278    #[maybe_async::test(
4279        feature = "sync",
4280        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4281        async(
4282            all(not(feature = "sync"), feature = "with-async-std"),
4283            async_std::test
4284        )
4285    )]
4286    async fn test_bucket_create_delete_non_default_region_public() {
4287        let config = BucketConfiguration::public();
4288        let response = Bucket::create(
4289            &uuid::Uuid::new_v4().to_string(),
4290            "eu-central-1".parse().unwrap(),
4291            test_aws_credentials(),
4292            config,
4293        )
4294        .await
4295        .unwrap();
4296
4297        assert_eq!(&response.response_text, "");
4298
4299        assert_eq!(response.response_code, 200);
4300
4301        let response_code = response.bucket.delete().await.unwrap();
4302        assert!(response_code < 300);
4303    }
4304
4305    #[test]
4306    fn test_tag_has_key_and_value_functions() {
4307        let key = "key".to_owned();
4308        let value = "value".to_owned();
4309        let tag = Tag { key, value };
4310        assert_eq!["key", tag.key()];
4311        assert_eq!["value", tag.value()];
4312    }
4313
4314    #[test]
4315    #[ignore]
4316    fn test_builder_composition() {
4317        use std::time::Duration;
4318
4319        let bucket = Bucket::new(
4320            "test-bucket",
4321            "eu-central-1".parse().unwrap(),
4322            test_aws_credentials(),
4323        )
4324        .unwrap()
4325        .with_request_timeout(Duration::from_secs(10))
4326        .unwrap();
4327
4328        assert_eq!(bucket.request_timeout(), Some(Duration::from_secs(10)));
4329    }
4330
4331    #[maybe_async::test(
4332        feature = "sync",
4333        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4334        async(
4335            all(not(feature = "sync"), feature = "with-async-std"),
4336            async_std::test
4337        )
4338    )]
4339    #[ignore]
4340    async fn test_bucket_cors() {
4341        let bucket = test_aws_bucket();
4342        let rule = CorsRule::new(
4343            None,
4344            vec!["GET".to_string()],
4345            vec!["*".to_string()],
4346            None,
4347            None,
4348            None,
4349        );
4350        let expected_bucket_owner = "904662384344";
4351        let cors_config = CorsConfiguration::new(vec![rule]);
4352        let response = bucket
4353            .put_bucket_cors(expected_bucket_owner, &cors_config)
4354            .await
4355            .unwrap();
4356        assert_eq!(response.status_code(), 200);
4357
4358        let cors_response = bucket.get_bucket_cors(expected_bucket_owner).await.unwrap();
4359        assert_eq!(cors_response, cors_config);
4360
4361        let response = bucket
4362            .delete_bucket_cors(expected_bucket_owner)
4363            .await
4364            .unwrap();
4365        assert_eq!(response.status_code(), 204);
4366    }
4367
4368    #[maybe_async::test(
4369        feature = "sync",
4370        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4371        async(
4372            all(not(feature = "sync"), feature = "with-async-std"),
4373            async_std::test
4374        )
4375    )]
4376    #[ignore]
4377    async fn test_bucket_lifecycle() {
4378        let bucket = test_aws_bucket();
4379
4380        // Create a simple lifecycle rule that expires objects with prefix "test/" after 1 day
4381        let rule = LifecycleRule::builder("Enabled")
4382            .id("test-rule")
4383            .filter(LifecycleFilter {
4384                prefix: Some("test/".to_string()),
4385                ..Default::default()
4386            })
4387            .expiration(Expiration {
4388                days: Some(1),
4389                ..Default::default()
4390            })
4391            .build();
4392
4393        let lifecycle_config = BucketLifecycleConfiguration::new(vec![rule]);
4394
4395        // Test put_bucket_lifecycle
4396        let response = bucket
4397            .put_bucket_lifecycle(lifecycle_config.clone())
4398            .await
4399            .unwrap();
4400        assert_eq!(response.status_code(), 200);
4401
4402        // Test get_bucket_lifecycle
4403        let retrieved_config = bucket.get_bucket_lifecycle().await.unwrap();
4404        assert_eq!(retrieved_config.rules.len(), 1);
4405        assert_eq!(retrieved_config.rules[0].id, Some("test-rule".to_string()));
4406        assert_eq!(retrieved_config.rules[0].status, "Enabled");
4407
4408        // Test delete_bucket_lifecycle
4409        let response = bucket.delete_bucket_lifecycle().await.unwrap();
4410        assert_eq!(response.status_code(), 204);
4411    }
4412
4413    #[ignore]
4414    #[cfg(any(feature = "tokio-native-tls", feature = "tokio-rustls-tls"))]
4415    #[maybe_async::test(
4416        feature = "sync",
4417        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4418        async(
4419            all(not(feature = "sync"), feature = "with-async-std"),
4420            async_std::test
4421        )
4422    )]
4423    async fn test_bucket_exists_with_dangerous_config() {
4424        init();
4425
4426        // This test verifies that Bucket::exists() honors the dangerous SSL config
4427        // which allows connections with invalid SSL certificates
4428
4429        // Create a bucket with dangerous config enabled
4430        // Note: This test requires a test environment with self-signed or invalid certs
4431        // For CI, we'll test with a regular bucket but verify the config is preserved
4432
4433        let credentials = test_aws_credentials();
4434        let region = "eu-central-1".parse().unwrap();
4435        let bucket_name = "rust-s3-test";
4436
4437        // Create bucket with dangerous config
4438        let bucket = Bucket::new(bucket_name, region, credentials)
4439            .unwrap()
4440            .with_path_style();
4441
4442        // Set dangerous config (allow invalid certs, allow invalid hostnames)
4443        let bucket = bucket.set_dangerous_config(true, true).unwrap();
4444
4445        // Test that exists() works with the dangerous config
4446        // This should not panic or fail due to SSL certificate issues
4447        let exists_result = bucket.exists().await;
4448
4449        // The bucket should exist (assuming test bucket is set up)
4450        assert!(
4451            exists_result.is_ok(),
4452            "Bucket::exists() failed with dangerous config"
4453        );
4454        let exists = exists_result.unwrap();
4455        assert!(exists, "Test bucket should exist");
4456
4457        // Verify that the dangerous config is preserved in the cloned bucket
4458        // by checking if we can perform other operations
4459        let list_result = bucket.list("".to_string(), Some("/".to_string())).await;
4460        assert!(
4461            list_result.is_ok(),
4462            "List operation should work with dangerous config"
4463        );
4464    }
4465
4466    #[ignore]
4467    #[maybe_async::test(
4468        feature = "sync",
4469        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4470        async(
4471            all(not(feature = "sync"), feature = "with-async-std"),
4472            async_std::test
4473        )
4474    )]
4475    async fn test_bucket_exists_without_dangerous_config() {
4476        init();
4477
4478        // This test verifies normal behavior without dangerous config
4479        let credentials = test_aws_credentials();
4480        let region = "eu-central-1".parse().unwrap();
4481        let bucket_name = "rust-s3-test";
4482
4483        // Create bucket without dangerous config
4484        let bucket = Bucket::new(bucket_name, region, credentials)
4485            .unwrap()
4486            .with_path_style();
4487
4488        // Test that exists() works normally
4489        let exists_result = bucket.exists().await;
4490        assert!(
4491            exists_result.is_ok(),
4492            "Bucket::exists() should work without dangerous config"
4493        );
4494        let exists = exists_result.unwrap();
4495        assert!(exists, "Test bucket should exist");
4496    }
4497}