// s3/bucket.rs

1//! # Rust S3 Bucket Operations
2//!
3//! This module provides functionality for interacting with S3 buckets and objects,
4//! including creating, listing, uploading, downloading, and deleting objects. It supports
5//! various features such as asynchronous and blocking operations, multipart uploads,
6//! presigned URLs, and tagging objects.
7//!
8//! ## Features
9//!
10//! The module supports the following features:
11//!
12//! - **blocking**: Enables blocking (synchronous) operations using the `block_on` macro.
13//! - **tags**: Adds support for managing S3 object tags.
14//! - **with-tokio**: Enables asynchronous operations using the Tokio runtime.
15//! - **with-async-std**: Enables asynchronous operations using the async-std runtime.
16//! - **sync**: Enables synchronous (blocking) operations using standard Rust synchronization primitives.
17//!
18//! ## Constants
19//!
20//! - `CHUNK_SIZE`: Defines the chunk size for multipart uploads (8 MiB).
21//! - `DEFAULT_REQUEST_TIMEOUT`: The default request timeout (60 seconds).
22//!
23//! ## Types
24//!
25//! - `Query`: A type alias for `HashMap<String, String>`, representing query parameters for requests.
26//!
27//! ## Structs
28//!
29//! - `Bucket`: Represents an S3 bucket, providing methods to interact with the bucket and its contents.
30//! - `Tag`: Represents a key-value pair used for tagging S3 objects.
31//!
32//! ## Errors
33//!
34//! - `S3Error`: Represents various errors that can occur during S3 operations.
35
36#[cfg(feature = "blocking")]
37use block_on_proc::block_on;
38#[cfg(feature = "tags")]
39use minidom::Element;
40use std::collections::HashMap;
41use std::time::Duration;
42
43use crate::bucket_ops::{BucketConfiguration, CreateBucketResponse};
44use crate::command::{Command, Multipart};
45use crate::creds::Credentials;
46use crate::region::Region;
47#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
48use crate::request::ResponseDataStream;
49#[cfg(feature = "with-tokio")]
50use crate::request::tokio_backend::ClientOptions;
51#[cfg(feature = "with-tokio")]
52use crate::request::tokio_backend::client;
53use crate::request::{Request as _, ResponseData};
54use std::str::FromStr;
55use std::sync::Arc;
56
57#[cfg(feature = "with-tokio")]
58use tokio::sync::RwLock;
59
60#[cfg(feature = "with-async-std")]
61use async_std::sync::RwLock;
62
63#[cfg(feature = "sync")]
64use std::sync::RwLock;
65
66pub type Query = HashMap<String, String>;
67
68#[cfg(feature = "with-async-std")]
69use crate::request::async_std_backend::SurfRequest as RequestImpl;
70#[cfg(feature = "with-tokio")]
71use crate::request::tokio_backend::ReqwestRequest as RequestImpl;
72
73#[cfg(feature = "with-async-std")]
74use async_std::io::Write as AsyncWrite;
75#[cfg(feature = "with-tokio")]
76use tokio::io::AsyncWrite;
77
78#[cfg(feature = "sync")]
79use crate::request::blocking::AttoRequest as RequestImpl;
80use std::io::Read;
81
82#[cfg(feature = "with-tokio")]
83use tokio::io::AsyncRead;
84
85#[cfg(feature = "with-async-std")]
86use async_std::io::Read as AsyncRead;
87
88use crate::PostPolicy;
89use crate::error::S3Error;
90use crate::post_policy::PresignedPost;
91use crate::serde_types::{
92    BucketLifecycleConfiguration, BucketLocationResult, CompleteMultipartUploadData,
93    CorsConfiguration, GetObjectAttributesOutput, HeadObjectResult,
94    InitiateMultipartUploadResponse, ListBucketResult, ListMultipartUploadsResult, Part,
95};
96#[allow(unused_imports)]
97use crate::utils::{PutStreamResponse, error_from_response_data};
98use http::HeaderMap;
99use http::header::HeaderName;
100#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
101use sysinfo::{MemoryRefreshKind, System};
102
103pub const CHUNK_SIZE: usize = 8_388_608; // 8 Mebibytes, min is 5 (5_242_880);
104
105const DEFAULT_REQUEST_TIMEOUT: Option<Duration> = Some(Duration::from_secs(60));
106
/// A single key/value pair used when tagging S3 objects.
#[derive(Debug, PartialEq, Eq)]
pub struct Tag {
    // Private fields; read access goes through the owned-copy getters below.
    key: String,
    value: String,
}

impl Tag {
    /// Returns an owned copy of this tag's key.
    pub fn key(&self) -> String {
        self.key.clone()
    }

    /// Returns an owned copy of this tag's value.
    pub fn value(&self) -> String {
        self.value.clone()
    }
}
122
/// Instantiate an existing Bucket
///
/// # Example
///
/// ```no_run
/// use s3::bucket::Bucket;
/// use s3::creds::Credentials;
///
/// let bucket_name = "rust-s3-test";
/// let region = "us-east-1".parse().unwrap();
/// let credentials = Credentials::default().unwrap();
///
/// let bucket = Bucket::new(bucket_name, region, credentials);
/// ```
#[derive(Clone, Debug)]
pub struct Bucket {
    /// Name of the bucket.
    pub name: String,
    /// Region (or custom endpoint) the bucket is addressed through.
    pub region: Region,
    // Shared, refreshable credentials. The `RwLock` is the runtime-appropriate
    // one selected by feature flags (tokio / async-std / std).
    credentials: Arc<RwLock<Credentials>>,
    /// Extra headers appended to every request made through this bucket.
    pub extra_headers: HeaderMap,
    /// Extra query parameters appended to every request made through this bucket.
    pub extra_query: Query,
    /// Per-request timeout; `None` disables the timeout.
    pub request_timeout: Option<Duration>,
    // When true, use path-style addressing (bucket name in the URL path)
    // instead of virtual-hosted-style (bucket name in the hostname).
    path_style: bool,
    // When true, listings use the ListObjectsV2 API; otherwise V1.
    listobjects_v2: bool,
    // Shared reqwest client, built from `client_options`.
    #[cfg(feature = "with-tokio")]
    http_client: reqwest::Client,
    // Options the client was built with; kept so builder-style methods can
    // tweak one setting and rebuild the client without losing the others.
    #[cfg(feature = "with-tokio")]
    client_options: crate::request::tokio_backend::ClientOptions,
}
152
impl Bucket {
    // Async variant: awaits the tokio/async-std RwLock write guard.
    #[maybe_async::async_impl]
    /// Credential refreshing is done automatically, but can be manually triggered.
    pub async fn credentials_refresh(&self) -> Result<(), S3Error> {
        Ok(self.credentials.write().await.refresh()?)
    }

    // Sync variant: `std::sync::RwLock::write` returns Err when the lock is
    // poisoned; that is surfaced as `S3Error::CredentialsWriteLock`.
    #[maybe_async::sync_impl]
    /// Credential refreshing is done automatically, but can be manually triggered.
    pub fn credentials_refresh(&self) -> Result<(), S3Error> {
        match self.credentials.write() {
            Ok(mut credentials) => Ok(credentials.refresh()?),
            Err(_) => Err(S3Error::CredentialsWriteLock),
        }
    }

    /// Returns a clone of the underlying `reqwest::Client` (cheap: reqwest
    /// clients share their connection pool via an internal Arc).
    #[cfg(feature = "with-tokio")]
    pub fn http_client(&self) -> reqwest::Client {
        self.http_client.clone()
    }
}
174
175fn validate_expiry(expiry_secs: u32) -> Result<(), S3Error> {
176    if 604800 < expiry_secs {
177        return Err(S3Error::MaxExpiry(expiry_secs));
178    }
179    Ok(())
180}
181
182#[cfg_attr(all(feature = "with-tokio", feature = "blocking"), block_on("tokio"))]
183#[cfg_attr(
184    all(feature = "with-async-std", feature = "blocking"),
185    block_on("async-std")
186)]
187impl Bucket {
188    /// Get a presigned url for getting object on a given path
189    ///
190    /// # Example:
191    ///
192    /// ```no_run
193    /// use std::collections::HashMap;
194    /// use s3::bucket::Bucket;
195    /// use s3::creds::Credentials;
196    ///
197    /// #[tokio::main]
198    /// async fn main() {
199    /// let bucket_name = "rust-s3-test";
200    /// let region = "us-east-1".parse().unwrap();
201    /// let credentials = Credentials::default().unwrap();
202    /// let bucket = Bucket::new(bucket_name, region, credentials).unwrap();
203    ///
204    /// // Add optional custom queries
205    /// let mut custom_queries = HashMap::new();
206    /// custom_queries.insert(
207    ///    "response-content-disposition".into(),
208    ///    "attachment; filename=\"test.png\"".into(),
209    /// );
210    ///
211    /// let url = bucket.presign_get("/test.file", 86400, Some(custom_queries)).await.unwrap();
212    /// println!("Presigned url: {}", url);
213    /// }
214    /// ```
215    #[maybe_async::maybe_async]
216    pub async fn presign_get<S: AsRef<str>>(
217        &self,
218        path: S,
219        expiry_secs: u32,
220        custom_queries: Option<HashMap<String, String>>,
221    ) -> Result<String, S3Error> {
222        validate_expiry(expiry_secs)?;
223        let request = RequestImpl::new(
224            self,
225            path.as_ref(),
226            Command::PresignGet {
227                expiry_secs,
228                custom_queries,
229            },
230        )
231        .await?;
232        request.presigned().await
233    }
234
235    /// Get a presigned url for posting an object to a given path
236    ///
237    /// # Example:
238    ///
239    /// ```no_run
240    /// use s3::bucket::Bucket;
241    /// use s3::creds::Credentials;
242    /// use s3::post_policy::*;
243    /// use std::borrow::Cow;
244    ///
245    /// #[tokio::main]
246    /// async fn main() {
247    /// let bucket_name = "rust-s3-test";
248    /// let region = "us-east-1".parse().unwrap();
249    /// let credentials = Credentials::default().unwrap();
250    /// let bucket = Bucket::new(bucket_name, region, credentials).unwrap();
251    ///
252    /// let post_policy = PostPolicy::new(86400).condition(
253    ///     PostPolicyField::Key,
254    ///     PostPolicyValue::StartsWith(Cow::from("user/user1/"))
255    /// ).unwrap();
256    ///
257    /// let presigned_post = bucket.presign_post(post_policy).await.unwrap();
258    /// println!("Presigned url: {}, fields: {:?}", presigned_post.url, presigned_post.fields);
259    /// }
260    /// ```
261    #[maybe_async::maybe_async]
262    #[allow(clippy::needless_lifetimes)]
263    pub async fn presign_post<'a>(
264        &self,
265        post_policy: PostPolicy<'a>,
266    ) -> Result<PresignedPost, S3Error> {
267        post_policy.sign(Box::new(self.clone())).await
268    }
269
270    /// Get a presigned url for putting object to a given path
271    ///
272    /// # Example:
273    ///
274    /// ```no_run
275    /// use s3::bucket::Bucket;
276    /// use s3::creds::Credentials;
277    /// use http::HeaderMap;
278    /// use http::header::HeaderName;
279    /// #[tokio::main]
280    /// async fn main() {
281    /// let bucket_name = "rust-s3-test";
282    /// let region = "us-east-1".parse().unwrap();
283    /// let credentials = Credentials::default().unwrap();
284    /// let bucket = Bucket::new(bucket_name, region, credentials).unwrap();
285    ///
286    /// // Add optional custom headers
287    /// let mut custom_headers = HeaderMap::new();
288    /// custom_headers.insert(
289    ///    HeaderName::from_static("custom_header"),
290    ///    "custom_value".parse().unwrap(),
291    /// );
292    ///
293    /// let url = bucket.presign_put("/test.file", 86400, Some(custom_headers), None).await.unwrap();
294    /// println!("Presigned url: {}", url);
295    /// }
296    /// ```
297    #[maybe_async::maybe_async]
298    pub async fn presign_put<S: AsRef<str>>(
299        &self,
300        path: S,
301        expiry_secs: u32,
302        custom_headers: Option<HeaderMap>,
303        custom_queries: Option<HashMap<String, String>>,
304    ) -> Result<String, S3Error> {
305        validate_expiry(expiry_secs)?;
306        let request = RequestImpl::new(
307            self,
308            path.as_ref(),
309            Command::PresignPut {
310                expiry_secs,
311                custom_headers,
312                custom_queries,
313            },
314        )
315        .await?;
316        request.presigned().await
317    }
318
319    /// Get a presigned url for deleting object on a given path
320    ///
321    /// # Example:
322    ///
323    /// ```no_run
324    /// use s3::bucket::Bucket;
325    /// use s3::creds::Credentials;
326    ///
327    ///
328    /// #[tokio::main]
329    /// async fn main() {
330    /// let bucket_name = "rust-s3-test";
331    /// let region = "us-east-1".parse().unwrap();
332    /// let credentials = Credentials::default().unwrap();
333    /// let bucket = Bucket::new(bucket_name, region, credentials).unwrap();
334    ///
335    /// let url = bucket.presign_delete("/test.file", 86400).await.unwrap();
336    /// println!("Presigned url: {}", url);
337    /// }
338    /// ```
339    #[maybe_async::maybe_async]
340    pub async fn presign_delete<S: AsRef<str>>(
341        &self,
342        path: S,
343        expiry_secs: u32,
344    ) -> Result<String, S3Error> {
345        validate_expiry(expiry_secs)?;
346        let request =
347            RequestImpl::new(self, path.as_ref(), Command::PresignDelete { expiry_secs }).await?;
348        request.presigned().await
349    }
350
351    /// Create a new `Bucket` and instantiate it
352    ///
353    /// ```no_run
354    /// use s3::{Bucket, BucketConfiguration};
355    /// use s3::creds::Credentials;
356    /// # use s3::region::Region;
357    /// use anyhow::Result;
358    ///
359    /// # #[tokio::main]
360    /// # async fn main() -> Result<()> {
361    /// let bucket_name = "rust-s3-test";
362    /// let region = "us-east-1".parse()?;
363    /// let credentials = Credentials::default()?;
364    /// let config = BucketConfiguration::default();
365    ///
366    /// // Async variant with `tokio` or `async-std` features
367    /// let create_bucket_response = Bucket::create(bucket_name, region, credentials, config).await?;
368    ///
369    /// // `sync` fature will produce an identical method
370    /// #[cfg(feature = "sync")]
371    /// let create_bucket_response = Bucket::create(bucket_name, region, credentials, config)?;
372    ///
373    /// # let region: Region = "us-east-1".parse()?;
374    /// # let credentials = Credentials::default()?;
375    /// # let config = BucketConfiguration::default();
376    /// // Blocking variant, generated with `blocking` feature in combination
377    /// // with `tokio` or `async-std` features.
378    /// #[cfg(feature = "blocking")]
379    /// let create_bucket_response = Bucket::create_blocking(bucket_name, region, credentials, config)?;
380    /// # Ok(())
381    /// # }
382    /// ```
383    #[maybe_async::maybe_async]
384    pub async fn create(
385        name: &str,
386        region: Region,
387        credentials: Credentials,
388        config: BucketConfiguration,
389    ) -> Result<CreateBucketResponse, S3Error> {
390        let mut config = config;
391
392        // Check if we should skip location constraint for LocalStack/Minio compatibility
393        // This env var allows users to create buckets on S3-compatible services that
394        // don't support or require location constraints in the request body
395        let skip_constraint = std::env::var("RUST_S3_SKIP_LOCATION_CONSTRAINT")
396            .unwrap_or_default()
397            .to_lowercase();
398
399        if skip_constraint != "true" && skip_constraint != "1" {
400            config.set_region(region.clone());
401        }
402
403        let command = Command::CreateBucket { config };
404        let bucket = Bucket::new(name, region, credentials)?;
405        let request = RequestImpl::new(&bucket, "", command).await?;
406        let response_data = request.response_data(false).await?;
407        let response_text = response_data.as_str()?;
408        Ok(CreateBucketResponse {
409            bucket,
410            response_text: response_text.to_string(),
411            response_code: response_data.status_code(),
412        })
413    }
414
415    /// Get a list of all existing buckets in the region
416    /// that are accessible by the given credentials.
417    /// ```no_run
418    /// use s3::{Bucket, BucketConfiguration};
419    /// use s3::creds::Credentials;
420    /// use s3::region::Region;
421    /// use anyhow::Result;
422    ///
423    /// # #[tokio::main]
424    /// # async fn main() -> Result<()> {
425    /// let region = Region::Custom {
426    ///   region: "eu-central-1".to_owned(),
427    ///   endpoint: "http://localhost:9000".to_owned()
428    /// };
429    /// let credentials = Credentials::default()?;
430    ///
431    /// // Async variant with `tokio` or `async-std` features
432    /// let response = Bucket::list_buckets(region, credentials).await?;
433    ///
434    /// // `sync` feature will produce an identical method
435    /// #[cfg(feature = "sync")]
436    /// let response = Bucket::list_buckets(region, credentials)?;
437    ///
438    /// // Blocking variant, generated with `blocking` feature in combination
439    /// // with `tokio` or `async-std` features.
440    /// #[cfg(feature = "blocking")]
441    /// let response = Bucket::list_buckets_blocking(region, credentials)?;
442    ///
443    /// let found_buckets = response.bucket_names().collect::<Vec<String>>();
444    /// println!("found buckets: {:#?}", found_buckets);
445    /// # Ok(())
446    /// # }
447    /// ```
448    #[maybe_async::maybe_async]
449    pub async fn list_buckets(
450        region: Region,
451        credentials: Credentials,
452    ) -> Result<crate::bucket_ops::ListBucketsResponse, S3Error> {
453        let dummy_bucket = Bucket::new("", region, credentials)?.with_path_style();
454        dummy_bucket._list_buckets().await
455    }
456
457    /// Internal helper method that performs the actual bucket listing operation.
458    /// Used by the public `list_buckets` method to retrieve the list of buckets for the configured client.
459    #[maybe_async::maybe_async]
460    async fn _list_buckets(&self) -> Result<crate::bucket_ops::ListBucketsResponse, S3Error> {
461        let request = RequestImpl::new(self, "", Command::ListBuckets).await?;
462        let response = request.response_data(false).await?;
463
464        Ok(quick_xml::de::from_str::<
465            crate::bucket_ops::ListBucketsResponse,
466        >(response.as_str()?)?)
467    }
468
469    /// Determine whether the instantiated bucket exists.
470    /// ```no_run
471    /// use s3::{Bucket, BucketConfiguration};
472    /// use s3::creds::Credentials;
473    /// use s3::region::Region;
474    /// use anyhow::Result;
475    ///
476    /// # #[tokio::main]
477    /// # async fn main() -> Result<()> {
478    /// let bucket_name = "some-bucket-that-is-known-to-exist";
479    /// let region = "us-east-1".parse()?;
480    /// let credentials = Credentials::default()?;
481    ///
482    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
483    ///
484    /// // Async variant with `tokio` or `async-std` features
485    /// let exists = bucket.exists().await?;
486    ///
487    /// // `sync` feature will produce an identical method
488    /// #[cfg(feature = "sync")]
489    /// let exists = bucket.exists()?;
490    ///
491    /// // Blocking variant, generated with `blocking` feature in combination
492    /// // with `tokio` or `async-std` features.
493    /// #[cfg(feature = "blocking")]
494    /// let exists = bucket.exists_blocking()?;
495    ///
496    /// assert_eq!(exists, true);
497    /// # Ok(())
498    /// # }
499    /// ```
500    #[maybe_async::maybe_async]
501    pub async fn exists(&self) -> Result<bool, S3Error> {
502        let mut dummy_bucket = self.clone();
503        dummy_bucket.name = "".into();
504
505        let response = dummy_bucket._list_buckets().await?;
506
507        Ok(response
508            .bucket_names()
509            .collect::<std::collections::HashSet<String>>()
510            .contains(&self.name))
511    }
512
    /// Create a new `Bucket` with path style and instantiate it
    ///
    /// ```no_run
    /// use s3::{Bucket, BucketConfiguration};
    /// use s3::creds::Credentials;
    /// # use s3::region::Region;
    /// use anyhow::Result;
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let bucket_name = "rust-s3-test";
    /// let region = "us-east-1".parse()?;
    /// let credentials = Credentials::default()?;
    /// let config = BucketConfiguration::default();
    ///
    /// // Async variant with `tokio` or `async-std` features
    /// let create_bucket_response = Bucket::create_with_path_style(bucket_name, region, credentials, config).await?;
    ///
    /// // `sync` feature will produce an identical method
    /// #[cfg(feature = "sync")]
    /// let create_bucket_response = Bucket::create_with_path_style(bucket_name, region, credentials, config)?;
    ///
    /// # let region: Region = "us-east-1".parse()?;
    /// # let credentials = Credentials::default()?;
    /// # let config = BucketConfiguration::default();
    /// // Blocking variant, generated with `blocking` feature in combination
    /// // with `tokio` or `async-std` features.
    /// #[cfg(feature = "blocking")]
    /// let create_bucket_response = Bucket::create_with_path_style_blocking(bucket_name, region, credentials, config)?;
    /// # Ok(())
    /// # }
    /// ```
    #[maybe_async::maybe_async]
    pub async fn create_with_path_style(
        name: &str,
        region: Region,
        credentials: Credentials,
        config: BucketConfiguration,
    ) -> Result<CreateBucketResponse, S3Error> {
        let mut config = config;

        // Check if we should skip location constraint for LocalStack/Minio compatibility
        // This env var allows users to create buckets on S3-compatible services that
        // don't support or require location constraints in the request body
        let skip_constraint = std::env::var("RUST_S3_SKIP_LOCATION_CONSTRAINT")
            .unwrap_or_default()
            .to_lowercase();

        if skip_constraint != "true" && skip_constraint != "1" {
            config.set_region(region.clone());
        }

        let command = Command::CreateBucket { config };
        // Path-style addressing: the bucket name goes in the URL path rather
        // than the hostname (needed by many S3-compatible services).
        let bucket = Bucket::new(name, region, credentials)?.with_path_style();
        let request = RequestImpl::new(&bucket, "", command).await?;
        let response_data = request.response_data(false).await?;
        let response_text = response_data.to_string()?;

        Ok(CreateBucketResponse {
            bucket,
            response_text,
            response_code: response_data.status_code(),
        })
    }
577
578    /// Delete existing `Bucket`
579    ///
580    /// # Example
581    /// ```rust,no_run
582    /// use s3::Bucket;
583    /// use s3::creds::Credentials;
584    /// use anyhow::Result;
585    ///
586    /// # #[tokio::main]
587    /// # async fn main() -> Result<()> {
588    /// let bucket_name = "rust-s3-test";
589    /// let region = "us-east-1".parse().unwrap();
590    /// let credentials = Credentials::default().unwrap();
591    /// let bucket = Bucket::new(bucket_name, region, credentials).unwrap();
592    ///
593    /// // Async variant with `tokio` or `async-std` features
594    /// bucket.delete().await.unwrap();
595    /// // `sync` fature will produce an identical method
596    ///
597    /// #[cfg(feature = "sync")]
598    /// bucket.delete().unwrap();
599    /// // Blocking variant, generated with `blocking` feature in combination
600    /// // with `tokio` or `async-std` features.
601    ///
602    /// #[cfg(feature = "blocking")]
603    /// bucket.delete_blocking().unwrap();
604    ///
605    /// # Ok(())
606    /// # }
607    /// ```
608    #[maybe_async::maybe_async]
609    pub async fn delete(&self) -> Result<u16, S3Error> {
610        let command = Command::DeleteBucket;
611        let request = RequestImpl::new(self, "", command).await?;
612        let response_data = request.response_data(false).await?;
613        Ok(response_data.status_code())
614    }
615
    /// Instantiate an existing `Bucket`.
    ///
    /// # Example
    /// ```no_run
    /// use s3::bucket::Bucket;
    /// use s3::creds::Credentials;
    ///
    /// // Fake  credentials so we don't access user's real credentials in tests
    /// let bucket_name = "rust-s3-test";
    /// let region = "us-east-1".parse().unwrap();
    /// let credentials = Credentials::default().unwrap();
    ///
    /// let bucket = Bucket::new(bucket_name, region, credentials).unwrap();
    /// ```
    pub fn new(
        name: &str,
        region: Region,
        credentials: Credentials,
    ) -> Result<Box<Bucket>, S3Error> {
        // Default HTTP client options; only needed for the tokio backend.
        #[cfg(feature = "with-tokio")]
        let options = ClientOptions::default();

        // Defaults: virtual-hosted addressing, ListObjectsV2, 60 s timeout.
        Ok(Box::new(Bucket {
            name: name.into(),
            region,
            credentials: Arc::new(RwLock::new(credentials)),
            extra_headers: HeaderMap::new(),
            extra_query: HashMap::new(),
            request_timeout: DEFAULT_REQUEST_TIMEOUT,
            path_style: false,
            listobjects_v2: true,
            #[cfg(feature = "with-tokio")]
            http_client: client(&options)?,
            #[cfg(feature = "with-tokio")]
            client_options: options,
        }))
    }
653
    /// Instantiate a public existing `Bucket`.
    ///
    /// # Example
    /// ```no_run
    /// use s3::bucket::Bucket;
    ///
    /// let bucket_name = "rust-s3-test";
    /// let region = "us-east-1".parse().unwrap();
    ///
    /// let bucket = Bucket::new_public(bucket_name, region).unwrap();
    /// ```
    pub fn new_public(name: &str, region: Region) -> Result<Bucket, S3Error> {
        // Default HTTP client options; only needed for the tokio backend.
        #[cfg(feature = "with-tokio")]
        let options = ClientOptions::default();

        // Same defaults as `new`, but with anonymous (unsigned) credentials.
        Ok(Bucket {
            name: name.into(),
            region,
            credentials: Arc::new(RwLock::new(Credentials::anonymous()?)),
            extra_headers: HeaderMap::new(),
            extra_query: HashMap::new(),
            request_timeout: DEFAULT_REQUEST_TIMEOUT,
            path_style: false,
            listobjects_v2: true,
            #[cfg(feature = "with-tokio")]
            http_client: client(&options)?,
            #[cfg(feature = "with-tokio")]
            client_options: options,
        })
    }
684
    /// Returns a copy of this bucket that uses path-style addressing
    /// (bucket name in the URL path instead of the hostname).
    pub fn with_path_style(&self) -> Box<Bucket> {
        Box::new(Bucket {
            name: self.name.clone(),
            region: self.region.clone(),
            credentials: self.credentials.clone(),
            extra_headers: self.extra_headers.clone(),
            extra_query: self.extra_query.clone(),
            request_timeout: self.request_timeout,
            path_style: true,
            listobjects_v2: self.listobjects_v2,
            #[cfg(feature = "with-tokio")]
            http_client: self.http_client(),
            #[cfg(feature = "with-tokio")]
            client_options: self.client_options.clone(),
        })
    }
701
    /// Returns a copy of this bucket with `extra_headers` replaced by the
    /// given header map. The headers are added to every subsequent request.
    pub fn with_extra_headers(&self, extra_headers: HeaderMap) -> Result<Bucket, S3Error> {
        Ok(Bucket {
            name: self.name.clone(),
            region: self.region.clone(),
            credentials: self.credentials.clone(),
            extra_headers,
            extra_query: self.extra_query.clone(),
            request_timeout: self.request_timeout,
            path_style: self.path_style,
            listobjects_v2: self.listobjects_v2,
            #[cfg(feature = "with-tokio")]
            http_client: self.http_client(),
            #[cfg(feature = "with-tokio")]
            client_options: self.client_options.clone(),
        })
    }
718
    /// Returns a copy of this bucket with `extra_query` replaced by the
    /// given map. The parameters are appended to every subsequent request.
    pub fn with_extra_query(
        &self,
        extra_query: HashMap<String, String>,
    ) -> Result<Bucket, S3Error> {
        Ok(Bucket {
            name: self.name.clone(),
            region: self.region.clone(),
            credentials: self.credentials.clone(),
            extra_headers: self.extra_headers.clone(),
            extra_query,
            request_timeout: self.request_timeout,
            path_style: self.path_style,
            listobjects_v2: self.listobjects_v2,
            #[cfg(feature = "with-tokio")]
            http_client: self.http_client(),
            #[cfg(feature = "with-tokio")]
            client_options: self.client_options.clone(),
        })
    }
738
    /// Returns a copy of this bucket with the given request timeout
    /// (non-tokio backends; no HTTP client to rebuild here).
    #[cfg(not(feature = "with-tokio"))]
    pub fn with_request_timeout(&self, request_timeout: Duration) -> Result<Box<Bucket>, S3Error> {
        Ok(Box::new(Bucket {
            name: self.name.clone(),
            region: self.region.clone(),
            credentials: self.credentials.clone(),
            extra_headers: self.extra_headers.clone(),
            extra_query: self.extra_query.clone(),
            request_timeout: Some(request_timeout),
            path_style: self.path_style,
            listobjects_v2: self.listobjects_v2,
        }))
    }
752
753    #[cfg(feature = "with-tokio")]
754    pub fn with_request_timeout(&self, request_timeout: Duration) -> Result<Box<Bucket>, S3Error> {
755        let options = ClientOptions {
756            request_timeout: Some(request_timeout),
757            ..Default::default()
758        };
759
760        Ok(Box::new(Bucket {
761            name: self.name.clone(),
762            region: self.region.clone(),
763            credentials: self.credentials.clone(),
764            extra_headers: self.extra_headers.clone(),
765            extra_query: self.extra_query.clone(),
766            request_timeout: Some(request_timeout),
767            path_style: self.path_style,
768            listobjects_v2: self.listobjects_v2,
769            #[cfg(feature = "with-tokio")]
770            http_client: client(&options)?,
771            #[cfg(feature = "with-tokio")]
772            client_options: options,
773        }))
774    }
775
    /// Returns a copy of this bucket that lists objects with the legacy
    /// ListObjects (V1) API instead of ListObjectsV2.
    pub fn with_listobjects_v1(&self) -> Bucket {
        Bucket {
            name: self.name.clone(),
            region: self.region.clone(),
            credentials: self.credentials.clone(),
            extra_headers: self.extra_headers.clone(),
            extra_query: self.extra_query.clone(),
            request_timeout: self.request_timeout,
            path_style: self.path_style,
            listobjects_v2: false,
            #[cfg(feature = "with-tokio")]
            http_client: self.http_client(),
            #[cfg(feature = "with-tokio")]
            client_options: self.client_options.clone(),
        }
    }
792
    /// Configures a bucket to accept invalid SSL certificates and hostnames.
    ///
    /// This method is available only when either the `tokio-native-tls` or `tokio-rustls-tls` feature is enabled.
    ///
    /// NOTE(review): the method name spells "dangerous" as `dangereous`;
    /// kept as-is for backward compatibility with existing callers.
    ///
    /// # Parameters
    ///
    /// - `accept_invalid_certs`: A boolean flag that determines whether the client should accept invalid SSL certificates.
    /// - `accept_invalid_hostnames`: A boolean flag that determines whether the client should accept invalid hostnames.
    ///
    /// # Returns
    ///
    /// Returns a `Result` containing the newly configured `Bucket` instance if successful, or an `S3Error` if an error occurs during client configuration.
    ///
    /// # Errors
    ///
    /// This function returns an `S3Error` if the HTTP client configuration fails.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use s3::bucket::Bucket;
    /// # use s3::error::S3Error;
    /// # use s3::creds::Credentials;
    /// # use s3::Region;
    /// # use std::str::FromStr;
    ///
    /// # fn example() -> Result<(), S3Error> {
    /// let bucket = Bucket::new("my-bucket", Region::from_str("us-east-1")?, Credentials::default()?)?
    ///     .set_dangereous_config(true, true)?;
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(any(feature = "tokio-native-tls", feature = "tokio-rustls-tls"))]
    pub fn set_dangereous_config(
        &self,
        accept_invalid_certs: bool,
        accept_invalid_hostnames: bool,
    ) -> Result<Bucket, S3Error> {
        // Rebuild the client with the TLS overrides, keeping all other options.
        let mut options = self.client_options.clone();
        options.accept_invalid_certs = accept_invalid_certs;
        options.accept_invalid_hostnames = accept_invalid_hostnames;

        Ok(Bucket {
            name: self.name.clone(),
            region: self.region.clone(),
            credentials: self.credentials.clone(),
            extra_headers: self.extra_headers.clone(),
            extra_query: self.extra_query.clone(),
            request_timeout: self.request_timeout,
            path_style: self.path_style,
            listobjects_v2: self.listobjects_v2,
            http_client: client(&options)?,
            client_options: options,
        })
    }
848
849    #[cfg(feature = "with-tokio")]
850    pub fn set_proxy(&self, proxy: reqwest::Proxy) -> Result<Bucket, S3Error> {
851        let mut options = self.client_options.clone();
852        options.proxy = Some(proxy);
853
854        Ok(Bucket {
855            name: self.name.clone(),
856            region: self.region.clone(),
857            credentials: self.credentials.clone(),
858            extra_headers: self.extra_headers.clone(),
859            extra_query: self.extra_query.clone(),
860            request_timeout: self.request_timeout,
861            path_style: self.path_style,
862            listobjects_v2: self.listobjects_v2,
863            http_client: client(&options)?,
864            client_options: options,
865        })
866    }
867
868    /// Copy file from an S3 path, internally within the same bucket.
869    ///
870    /// # Example:
871    ///
872    /// ```rust,no_run
873    /// use s3::bucket::Bucket;
874    /// use s3::creds::Credentials;
875    /// use anyhow::Result;
876    ///
877    /// # #[tokio::main]
878    /// # async fn main() -> Result<()> {
879    ///
880    /// let bucket_name = "rust-s3-test";
881    /// let region = "us-east-1".parse()?;
882    /// let credentials = Credentials::default()?;
883    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
884    ///
885    /// // Async variant with `tokio` or `async-std` features
886    /// let code = bucket.copy_object_internal("/from.file", "/to.file").await?;
887    ///
888    /// // `sync` feature will produce an identical method
889    /// #[cfg(feature = "sync")]
890    /// let code = bucket.copy_object_internal("/from.file", "/to.file")?;
891    ///
892    /// # Ok(())
893    /// # }
894    /// ```
895    #[maybe_async::maybe_async]
896    pub async fn copy_object_internal<F: AsRef<str>, T: AsRef<str>>(
897        &self,
898        from: F,
899        to: T,
900    ) -> Result<u16, S3Error> {
901        let fq_from = {
902            let from = from.as_ref();
903            let from = from.strip_prefix('/').unwrap_or(from);
904            format!("{bucket}/{path}", bucket = self.name(), path = from)
905        };
906        self.copy_object(fq_from, to).await
907    }
908
909    #[maybe_async::maybe_async]
910    async fn copy_object<F: AsRef<str>, T: AsRef<str>>(
911        &self,
912        from: F,
913        to: T,
914    ) -> Result<u16, S3Error> {
915        let command = Command::CopyObject {
916            from: from.as_ref(),
917        };
918        let request = RequestImpl::new(self, to.as_ref(), command).await?;
919        let response_data = request.response_data(false).await?;
920        Ok(response_data.status_code())
921    }
922
923    /// Gets file from an S3 path.
924    ///
925    /// # Example:
926    ///
927    /// ```rust,no_run
928    /// use s3::bucket::Bucket;
929    /// use s3::creds::Credentials;
930    /// use anyhow::Result;
931    ///
932    /// # #[tokio::main]
933    /// # async fn main() -> Result<()> {
934    ///
935    /// let bucket_name = "rust-s3-test";
936    /// let region = "us-east-1".parse()?;
937    /// let credentials = Credentials::default()?;
938    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
939    ///
940    /// // Async variant with `tokio` or `async-std` features
941    /// let response_data = bucket.get_object("/test.file").await?;
942    ///
943    /// // `sync` feature will produce an identical method
944    /// #[cfg(feature = "sync")]
945    /// let response_data = bucket.get_object("/test.file")?;
946    ///
947    /// // Blocking variant, generated with `blocking` feature in combination
948    /// // with `tokio` or `async-std` features.
949    /// #[cfg(feature = "blocking")]
950    /// let response_data = bucket.get_object_blocking("/test.file")?;
951    /// # Ok(())
952    /// # }
953    /// ```
954    #[maybe_async::maybe_async]
955    pub async fn get_object<S: AsRef<str>>(&self, path: S) -> Result<ResponseData, S3Error> {
956        let command = Command::GetObject;
957        let request = RequestImpl::new(self, path.as_ref(), command).await?;
958        request.response_data(false).await
959    }
960
961    #[maybe_async::maybe_async]
962    pub async fn get_object_attributes<S: AsRef<str>>(
963        &self,
964        path: S,
965        expected_bucket_owner: &str,
966        version_id: Option<String>,
967    ) -> Result<GetObjectAttributesOutput, S3Error> {
968        let command = Command::GetObjectAttributes {
969            expected_bucket_owner: expected_bucket_owner.to_string(),
970            version_id,
971        };
972        let request = RequestImpl::new(self, path.as_ref(), command).await?;
973
974        let response = request.response_data(false).await?;
975
976        Ok(quick_xml::de::from_str::<GetObjectAttributesOutput>(
977            response.as_str()?,
978        )?)
979    }
980
981    /// Checks if an object exists at the specified S3 path.
982    ///
983    /// # Example:
984    ///
985    /// ```rust,no_run
986    /// use s3::bucket::Bucket;
987    /// use s3::creds::Credentials;
988    /// use anyhow::Result;
989    ///
990    /// # #[tokio::main]
991    /// # async fn main() -> Result<()> {
992    ///
993    /// let bucket_name = "rust-s3-test";
994    /// let region = "us-east-1".parse()?;
995    /// let credentials = Credentials::default()?;
996    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
997    ///
998    /// // Async variant with `tokio` or `async-std` features
999    /// let exists = bucket.object_exists("/test.file").await?;
1000    ///
1001    /// // `sync` feature will produce an identical method
1002    /// #[cfg(feature = "sync")]
1003    /// let exists = bucket.object_exists("/test.file")?;
1004    ///
1005    /// // Blocking variant, generated with `blocking` feature in combination
1006    /// // with `tokio` or `async-std` features.
1007    /// #[cfg(feature = "blocking")]
1008    /// let exists = bucket.object_exists_blocking("/test.file")?;
1009    ///
1010    /// if exists {
1011    ///     println!("Object exists.");
1012    /// } else {
1013    ///     println!("Object does not exist.");
1014    /// }
1015    /// # Ok(())
1016    /// # }
1017    /// ```
1018    ///
1019    /// # Errors
1020    ///
1021    /// This function will return an `Err` if the request to the S3 service fails or if there is an unexpected error.
1022    /// It will return `Ok(false)` if the object does not exist (i.e., the server returns a 404 status code).
1023    #[maybe_async::maybe_async]
1024    pub async fn object_exists<S: AsRef<str>>(&self, path: S) -> Result<bool, S3Error> {
1025        let command = Command::HeadObject;
1026        let request = RequestImpl::new(self, path.as_ref(), command).await?;
1027        let response_data = match request.response_data(false).await {
1028            Ok(response_data) => response_data,
1029            Err(S3Error::HttpFailWithBody(status_code, error)) => {
1030                if status_code == 404 {
1031                    return Ok(false);
1032                }
1033                return Err(S3Error::HttpFailWithBody(status_code, error));
1034            }
1035            Err(e) => return Err(e),
1036        };
1037        Ok(response_data.status_code() != 404)
1038    }
1039
1040    #[maybe_async::maybe_async]
1041    pub async fn put_bucket_cors(
1042        &self,
1043        expected_bucket_owner: &str,
1044        cors_config: &CorsConfiguration,
1045    ) -> Result<ResponseData, S3Error> {
1046        let command = Command::PutBucketCors {
1047            expected_bucket_owner: expected_bucket_owner.to_string(),
1048            configuration: cors_config.clone(),
1049        };
1050        let request = RequestImpl::new(self, "", command).await?;
1051        request.response_data(false).await
1052    }
1053
1054    #[maybe_async::maybe_async]
1055    pub async fn get_bucket_cors(
1056        &self,
1057        expected_bucket_owner: &str,
1058    ) -> Result<CorsConfiguration, S3Error> {
1059        let command = Command::GetBucketCors {
1060            expected_bucket_owner: expected_bucket_owner.to_string(),
1061        };
1062        let request = RequestImpl::new(self, "", command).await?;
1063        let response = request.response_data(false).await?;
1064        Ok(quick_xml::de::from_str::<CorsConfiguration>(
1065            response.as_str()?,
1066        )?)
1067    }
1068
1069    #[maybe_async::maybe_async]
1070    pub async fn delete_bucket_cors(
1071        &self,
1072        expected_bucket_owner: &str,
1073    ) -> Result<ResponseData, S3Error> {
1074        let command = Command::DeleteBucketCors {
1075            expected_bucket_owner: expected_bucket_owner.to_string(),
1076        };
1077        let request = RequestImpl::new(self, "", command).await?;
1078        request.response_data(false).await
1079    }
1080
1081    #[maybe_async::maybe_async]
1082    pub async fn get_bucket_lifecycle(&self) -> Result<BucketLifecycleConfiguration, S3Error> {
1083        let request = RequestImpl::new(self, "", Command::GetBucketLifecycle).await?;
1084        let response = request.response_data(false).await?;
1085        Ok(quick_xml::de::from_str::<BucketLifecycleConfiguration>(
1086            response.as_str()?,
1087        )?)
1088    }
1089
1090    #[maybe_async::maybe_async]
1091    pub async fn put_bucket_lifecycle(
1092        &self,
1093        lifecycle_config: BucketLifecycleConfiguration,
1094    ) -> Result<ResponseData, S3Error> {
1095        let command = Command::PutBucketLifecycle {
1096            configuration: lifecycle_config,
1097        };
1098        let request = RequestImpl::new(self, "", command).await?;
1099        request.response_data(false).await
1100    }
1101
1102    #[maybe_async::maybe_async]
1103    pub async fn delete_bucket_lifecycle(&self) -> Result<ResponseData, S3Error> {
1104        let request = RequestImpl::new(self, "", Command::DeleteBucketLifecycle).await?;
1105        request.response_data(false).await
1106    }
1107
1108    /// Gets torrent from an S3 path.
1109    ///
1110    /// # Example:
1111    ///
1112    /// ```rust,no_run
1113    /// use s3::bucket::Bucket;
1114    /// use s3::creds::Credentials;
1115    /// use anyhow::Result;
1116    ///
1117    /// # #[tokio::main]
1118    /// # async fn main() -> Result<()> {
1119    ///
1120    /// let bucket_name = "rust-s3-test";
1121    /// let region = "us-east-1".parse()?;
1122    /// let credentials = Credentials::default()?;
1123    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
1124    ///
1125    /// // Async variant with `tokio` or `async-std` features
1126    /// let response_data = bucket.get_object_torrent("/test.file").await?;
1127    ///
1128    /// // `sync` feature will produce an identical method
1129    /// #[cfg(feature = "sync")]
1130    /// let response_data = bucket.get_object_torrent("/test.file")?;
1131    ///
1132    /// // Blocking variant, generated with `blocking` feature in combination
1133    /// // with `tokio` or `async-std` features.
1134    /// #[cfg(feature = "blocking")]
1135    /// let response_data = bucket.get_object_torrent_blocking("/test.file")?;
1136    /// # Ok(())
1137    /// # }
1138    /// ```
1139    #[maybe_async::maybe_async]
1140    pub async fn get_object_torrent<S: AsRef<str>>(
1141        &self,
1142        path: S,
1143    ) -> Result<ResponseData, S3Error> {
1144        let command = Command::GetObjectTorrent;
1145        let request = RequestImpl::new(self, path.as_ref(), command).await?;
1146        request.response_data(false).await
1147    }
1148
1149    /// Gets specified inclusive byte range of file from an S3 path.
1150    ///
1151    /// # Example:
1152    ///
1153    /// ```rust,no_run
1154    /// use s3::bucket::Bucket;
1155    /// use s3::creds::Credentials;
1156    /// use anyhow::Result;
1157    ///
1158    /// # #[tokio::main]
1159    /// # async fn main() -> Result<()> {
1160    ///
1161    /// let bucket_name = "rust-s3-test";
1162    /// let region = "us-east-1".parse()?;
1163    /// let credentials = Credentials::default()?;
1164    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
1165    ///
1166    /// // Async variant with `tokio` or `async-std` features
1167    /// let response_data = bucket.get_object_range("/test.file", 0, Some(31)).await?;
1168    ///
1169    /// // `sync` feature will produce an identical method
1170    /// #[cfg(feature = "sync")]
1171    /// let response_data = bucket.get_object_range("/test.file", 0, Some(31))?;
1172    ///
1173    /// // Blocking variant, generated with `blocking` feature in combination
1174    /// // with `tokio` or `async-std` features.
1175    /// #[cfg(feature = "blocking")]
1176    /// let response_data = bucket.get_object_range_blocking("/test.file", 0, Some(31))?;
1177    /// #
1178    /// # Ok(())
1179    /// # }
1180    /// ```
1181    #[maybe_async::maybe_async]
1182    pub async fn get_object_range<S: AsRef<str>>(
1183        &self,
1184        path: S,
1185        start: u64,
1186        end: Option<u64>,
1187    ) -> Result<ResponseData, S3Error> {
1188        if let Some(end) = end {
1189            assert!(start < end);
1190        }
1191
1192        let command = Command::GetObjectRange { start, end };
1193        let request = RequestImpl::new(self, path.as_ref(), command).await?;
1194        request.response_data(false).await
1195    }
1196
1197    /// Stream range of bytes from S3 path to a local file, generic over T: Write.
1198    ///
1199    /// # Example:
1200    ///
1201    /// ```rust,no_run
1202    /// use s3::bucket::Bucket;
1203    /// use s3::creds::Credentials;
1204    /// use anyhow::Result;
1205    /// use std::fs::File;
1206    ///
1207    /// # #[tokio::main]
1208    /// # async fn main() -> Result<()> {
1209    ///
1210    /// let bucket_name = "rust-s3-test";
1211    /// let region = "us-east-1".parse()?;
1212    /// let credentials = Credentials::default()?;
1213    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
1214    /// let mut output_file = File::create("output_file").expect("Unable to create file");
1215    /// let mut async_output_file = tokio::fs::File::create("async_output_file").await.expect("Unable to create file");
1216    /// #[cfg(feature = "with-async-std")]
1217    /// let mut async_output_file = async_std::fs::File::create("async_output_file").await.expect("Unable to create file");
1218    ///
1219    /// let start = 0;
1220    /// let end = Some(1024);
1221    ///
1222    /// // Async variant with `tokio` or `async-std` features
1223    /// let status_code = bucket.get_object_range_to_writer("/test.file", start, end, &mut async_output_file).await?;
1224    ///
1225    /// // `sync` feature will produce an identical method
1226    /// #[cfg(feature = "sync")]
1227    /// let status_code = bucket.get_object_range_to_writer("/test.file", start, end, &mut output_file)?;
1228    ///
1229    /// // Blocking variant, generated with `blocking` feature in combination
1230    /// // with `tokio` or `async-std` features. Based of the async branch
1231    /// #[cfg(feature = "blocking")]
1232    /// let status_code = bucket.get_object_range_to_writer_blocking("/test.file", start, end, &mut async_output_file)?;
1233    /// #
1234    /// # Ok(())
1235    /// # }
1236    /// ```
1237    #[maybe_async::async_impl]
1238    pub async fn get_object_range_to_writer<T, S>(
1239        &self,
1240        path: S,
1241        start: u64,
1242        end: Option<u64>,
1243        writer: &mut T,
1244    ) -> Result<u16, S3Error>
1245    where
1246        T: AsyncWrite + Send + Unpin + ?Sized,
1247        S: AsRef<str>,
1248    {
1249        if let Some(end) = end {
1250            assert!(start < end);
1251        }
1252
1253        let command = Command::GetObjectRange { start, end };
1254        let request = RequestImpl::new(self, path.as_ref(), command).await?;
1255        request.response_data_to_writer(writer).await
1256    }
1257
1258    #[maybe_async::sync_impl]
1259    pub fn get_object_range_to_writer<T: std::io::Write + Send + ?Sized, S: AsRef<str>>(
1260        &self,
1261        path: S,
1262        start: u64,
1263        end: Option<u64>,
1264        writer: &mut T,
1265    ) -> Result<u16, S3Error> {
1266        if let Some(end) = end {
1267            assert!(start < end);
1268        }
1269
1270        let command = Command::GetObjectRange { start, end };
1271        let request = RequestImpl::new(self, path.as_ref(), command)?;
1272        request.response_data_to_writer(writer)
1273    }
1274
1275    /// Stream file from S3 path to a local file, generic over T: Write.
1276    ///
1277    /// # Example:
1278    ///
1279    /// ```rust,no_run
1280    /// use s3::bucket::Bucket;
1281    /// use s3::creds::Credentials;
1282    /// use anyhow::Result;
1283    /// use std::fs::File;
1284    ///
1285    /// # #[tokio::main]
1286    /// # async fn main() -> Result<()> {
1287    ///
1288    /// let bucket_name = "rust-s3-test";
1289    /// let region = "us-east-1".parse()?;
1290    /// let credentials = Credentials::default()?;
1291    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
1292    /// let mut output_file = File::create("output_file").expect("Unable to create file");
1293    /// let mut async_output_file = tokio::fs::File::create("async_output_file").await.expect("Unable to create file");
1294    /// #[cfg(feature = "with-async-std")]
1295    /// let mut async_output_file = async_std::fs::File::create("async_output_file").await.expect("Unable to create file");
1296    ///
1297    /// // Async variant with `tokio` or `async-std` features
1298    /// let status_code = bucket.get_object_to_writer("/test.file", &mut async_output_file).await?;
1299    ///
1300    /// // `sync` feature will produce an identical method
1301    /// #[cfg(feature = "sync")]
1302    /// let status_code = bucket.get_object_to_writer("/test.file", &mut output_file)?;
1303    ///
1304    /// // Blocking variant, generated with `blocking` feature in combination
1305    /// // with `tokio` or `async-std` features. Based of the async branch
1306    /// #[cfg(feature = "blocking")]
1307    /// let status_code = bucket.get_object_to_writer_blocking("/test.file", &mut async_output_file)?;
1308    /// #
1309    /// # Ok(())
1310    /// # }
1311    /// ```
1312    #[maybe_async::async_impl]
1313    pub async fn get_object_to_writer<T: AsyncWrite + Send + Unpin + ?Sized, S: AsRef<str>>(
1314        &self,
1315        path: S,
1316        writer: &mut T,
1317    ) -> Result<u16, S3Error> {
1318        let command = Command::GetObject;
1319        let request = RequestImpl::new(self, path.as_ref(), command).await?;
1320        request.response_data_to_writer(writer).await
1321    }
1322
1323    #[maybe_async::sync_impl]
1324    pub fn get_object_to_writer<T: std::io::Write + Send + ?Sized, S: AsRef<str>>(
1325        &self,
1326        path: S,
1327        writer: &mut T,
1328    ) -> Result<u16, S3Error> {
1329        let command = Command::GetObject;
1330        let request = RequestImpl::new(self, path.as_ref(), command)?;
1331        request.response_data_to_writer(writer)
1332    }
1333
1334    /// Stream file from S3 path to a local file using an async stream.
1335    ///
1336    /// # Example
1337    ///
1338    /// ```rust,no_run
1339    /// use s3::bucket::Bucket;
1340    /// use s3::creds::Credentials;
1341    /// use anyhow::Result;
1342    /// #[cfg(feature = "with-tokio")]
1343    /// use tokio_stream::StreamExt;
1344    /// #[cfg(feature = "with-tokio")]
1345    /// use tokio::io::AsyncWriteExt;
1346    /// #[cfg(feature = "with-async-std")]
1347    /// use async_std::stream::StreamExt;
1348    /// #[cfg(feature = "with-async-std")]
1349    /// use async_std::io::WriteExt;
1350    ///
1351    /// # #[tokio::main]
1352    /// # async fn main() -> Result<()> {
1353    ///
1354    /// let bucket_name = "rust-s3-test";
1355    /// let region = "us-east-1".parse()?;
1356    /// let credentials = Credentials::default()?;
1357    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
1358    /// let path = "path";
1359    ///
1360    /// let mut response_data_stream = bucket.get_object_stream(path).await?;
1361    ///
1362    /// #[cfg(feature = "with-tokio")]
1363    /// let mut async_output_file = tokio::fs::File::create("async_output_file").await.expect("Unable to create file");
1364    /// #[cfg(feature = "with-async-std")]
1365    /// let mut async_output_file = async_std::fs::File::create("async_output_file").await.expect("Unable to create file");
1366    ///
1367    /// while let Some(chunk) = response_data_stream.bytes().next().await {
1368    ///     async_output_file.write_all(&chunk.unwrap()).await?;
1369    /// }
1370    ///
1371    /// #
1372    /// # Ok(())
1373    /// # }
1374    /// ```
1375    #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
1376    pub async fn get_object_stream<S: AsRef<str>>(
1377        &self,
1378        path: S,
1379    ) -> Result<ResponseDataStream, S3Error> {
1380        let command = Command::GetObject;
1381        let request = RequestImpl::new(self, path.as_ref(), command).await?;
1382        request.response_data_to_stream().await
1383    }
1384
1385    /// Stream file from local path to s3, generic over T: Write.
1386    ///
1387    /// # Example:
1388    ///
1389    /// ```rust,no_run
1390    /// use s3::bucket::Bucket;
1391    /// use s3::creds::Credentials;
1392    /// use anyhow::Result;
1393    /// use std::fs::File;
1394    /// use std::io::Write;
1395    ///
1396    /// # #[tokio::main]
1397    /// # async fn main() -> Result<()> {
1398    ///
1399    /// let bucket_name = "rust-s3-test";
1400    /// let region = "us-east-1".parse()?;
1401    /// let credentials = Credentials::default()?;
1402    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
1403    /// let path = "path";
1404    /// let test: Vec<u8> = (0..1000).map(|_| 42).collect();
1405    /// let mut file = File::create(path)?;
1406    /// // tokio open file
1407    /// let mut async_output_file = tokio::fs::File::create("async_output_file").await.expect("Unable to create file");
1408    /// file.write_all(&test)?;
1409    ///
1410    /// // Generic over std::io::Read
1411    /// #[cfg(feature = "with-tokio")]
1412    /// let status_code = bucket.put_object_stream(&mut async_output_file, "/path").await?;
1413    ///
1414    ///
1415    /// #[cfg(feature = "with-async-std")]
1416    /// let mut async_output_file = async_std::fs::File::create("async_output_file").await.expect("Unable to create file");
1417    ///
1418    /// // `sync` feature will produce an identical method
1419    /// #[cfg(feature = "sync")]
1420    /// // Generic over std::io::Read
1421    /// let status_code = bucket.put_object_stream(&mut path, "/path")?;
1422    ///
1423    /// // Blocking variant, generated with `blocking` feature in combination
1424    /// // with `tokio` or `async-std` features.
1425    /// #[cfg(feature = "blocking")]
1426    /// let status_code = bucket.put_object_stream_blocking(&mut path, "/path")?;
1427    /// #
1428    /// # Ok(())
1429    /// # }
1430    /// ```
1431    #[maybe_async::async_impl]
1432    pub async fn put_object_stream<R: AsyncRead + Unpin + ?Sized>(
1433        &self,
1434        reader: &mut R,
1435        s3_path: impl AsRef<str>,
1436    ) -> Result<PutStreamResponse, S3Error> {
1437        self._put_object_stream_with_content_type(
1438            reader,
1439            s3_path.as_ref(),
1440            "application/octet-stream",
1441        )
1442        .await
1443    }
1444
1445    /// Create a builder for streaming PUT operations with custom options
1446    ///
1447    /// # Example:
1448    ///
1449    /// ```no_run
1450    /// use s3::bucket::Bucket;
1451    /// use s3::creds::Credentials;
1452    /// use anyhow::Result;
1453    ///
1454    /// # #[cfg(feature = "with-tokio")]
1455    /// # #[tokio::main]
1456    /// # async fn main() -> Result<()> {
1457    /// # use tokio::fs::File;
1458    ///
1459    /// let bucket = Bucket::new("my-bucket", "us-east-1".parse()?, Credentials::default()?)?;
1460    ///
1461    /// # #[cfg(feature = "with-tokio")]
1462    /// let mut file = File::open("large-file.zip").await?;
1463    ///
1464    /// // Stream upload with custom headers using builder pattern
1465    /// let response = bucket.put_object_stream_builder("/large-file.zip")
1466    ///     .with_content_type("application/zip")
1467    ///     .with_cache_control("public, max-age=3600")?
1468    ///     .with_metadata("uploaded-by", "stream-builder")?
1469    ///     .execute_stream(&mut file)
1470    ///     .await?;
1471    /// #
1472    /// # Ok(())
1473    /// # }
1474    /// # #[cfg(not(feature = "with-tokio"))]
1475    /// # fn main() {}
1476    /// ```
1477    #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
1478    pub fn put_object_stream_builder<S: AsRef<str>>(
1479        &self,
1480        path: S,
1481    ) -> crate::put_object_request::PutObjectStreamRequest<'_> {
1482        crate::put_object_request::PutObjectStreamRequest::new(self, path)
1483    }
1484
1485    #[maybe_async::sync_impl]
1486    pub fn put_object_stream<R: Read>(
1487        &self,
1488        reader: &mut R,
1489        s3_path: impl AsRef<str>,
1490    ) -> Result<u16, S3Error> {
1491        self._put_object_stream_with_content_type(
1492            reader,
1493            s3_path.as_ref(),
1494            "application/octet-stream",
1495        )
1496    }
1497
1498    /// Stream file from local path to s3, generic over T: Write with explicit content type.
1499    ///
1500    /// # Example:
1501    ///
1502    /// ```rust,no_run
1503    /// use s3::bucket::Bucket;
1504    /// use s3::creds::Credentials;
1505    /// use anyhow::Result;
1506    /// use std::fs::File;
1507    /// use std::io::Write;
1508    ///
1509    /// # #[tokio::main]
1510    /// # async fn main() -> Result<()> {
1511    ///
1512    /// let bucket_name = "rust-s3-test";
1513    /// let region = "us-east-1".parse()?;
1514    /// let credentials = Credentials::default()?;
1515    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
1516    /// let path = "path";
1517    /// let test: Vec<u8> = (0..1000).map(|_| 42).collect();
1518    /// let mut file = File::create(path)?;
1519    /// file.write_all(&test)?;
1520    ///
1521    /// #[cfg(feature = "with-tokio")]
1522    /// let mut async_output_file = tokio::fs::File::create("async_output_file").await.expect("Unable to create file");
1523    ///
1524    /// #[cfg(feature = "with-async-std")]
1525    /// let mut async_output_file = async_std::fs::File::create("async_output_file").await.expect("Unable to create file");
1526    ///
1527    /// // Async variant with `tokio` or `async-std` features
1528    /// // Generic over std::io::Read
1529    /// let status_code = bucket
1530    ///     .put_object_stream_with_content_type(&mut async_output_file, "/path", "application/octet-stream")
1531    ///     .await?;
1532    ///
1533    /// // `sync` feature will produce an identical method
1534    /// #[cfg(feature = "sync")]
1535    /// // Generic over std::io::Read
1536    /// let status_code = bucket
1537    ///     .put_object_stream_with_content_type(&mut path, "/path", "application/octet-stream")?;
1538    ///
1539    /// // Blocking variant, generated with `blocking` feature in combination
1540    /// // with `tokio` or `async-std` features.
1541    /// #[cfg(feature = "blocking")]
1542    /// let status_code = bucket
1543    ///     .put_object_stream_with_content_type_blocking(&mut path, "/path", "application/octet-stream")?;
1544    /// #
1545    /// # Ok(())
1546    /// # }
1547    /// ```
1548    #[maybe_async::async_impl]
1549    pub async fn put_object_stream_with_content_type<R: AsyncRead + Unpin>(
1550        &self,
1551        reader: &mut R,
1552        s3_path: impl AsRef<str>,
1553        content_type: impl AsRef<str>,
1554    ) -> Result<PutStreamResponse, S3Error> {
1555        self._put_object_stream_with_content_type(reader, s3_path.as_ref(), content_type.as_ref())
1556            .await
1557    }
1558
1559    #[maybe_async::sync_impl]
1560    pub fn put_object_stream_with_content_type<R: Read>(
1561        &self,
1562        reader: &mut R,
1563        s3_path: impl AsRef<str>,
1564        content_type: impl AsRef<str>,
1565    ) -> Result<u16, S3Error> {
1566        self._put_object_stream_with_content_type(reader, s3_path.as_ref(), content_type.as_ref())
1567    }
1568
    /// Uploads a single part of an in-progress multipart upload.
    ///
    /// `upload_id` must come from a prior `initiate_multipart_upload` call,
    /// and `part_number` identifies this chunk within that upload (the
    /// streaming uploader in this file numbers parts starting at 1).
    /// Returns the raw response data for the part upload.
    #[maybe_async::async_impl]
    async fn make_multipart_request(
        &self,
        path: &str,
        chunk: Vec<u8>,
        part_number: u32,
        upload_id: &str,
        content_type: &str,
    ) -> Result<ResponseData, S3Error> {
        let command = Command::PutObject {
            content: &chunk,
            multipart: Some(Multipart::new(part_number, upload_id)),
            custom_headers: None,
            content_type,
        };
        let request = RequestImpl::new(self, path, command).await?;
        request.response_data(true).await
    }
1587
    /// Internal entry point for streaming uploads: delegates to the
    /// header-aware variant with no custom headers.
    #[maybe_async::async_impl]
    async fn _put_object_stream_with_content_type<R: AsyncRead + Unpin + ?Sized>(
        &self,
        reader: &mut R,
        s3_path: &str,
        content_type: &str,
    ) -> Result<PutStreamResponse, S3Error> {
        self._put_object_stream_with_content_type_and_headers(reader, s3_path, content_type, None)
            .await
    }
1598
    /// Calculate the maximum number of concurrent chunks based on available memory.
    /// Returns a value clamped between 2 and 100, defaulting to 3 if memory
    /// detection fails.
    #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
    fn calculate_max_concurrent_chunks() -> usize {
        // Create a new System instance and refresh memory info
        let mut system = System::new();
        system.refresh_memory_specifics(MemoryRefreshKind::everything());

        // Get available memory in bytes
        let available_memory = system.available_memory();

        // If we can't get memory info, use a conservative default
        if available_memory == 0 {
            return 3;
        }

        // CHUNK_SIZE is 8MB (8_388_608 bytes)
        // Use a safety factor of 3 to leave room for other operations
        // and account for memory that might be allocated during upload
        let safety_factor = 3;
        let memory_per_chunk = CHUNK_SIZE as u64 * safety_factor;

        // Calculate how many chunks we can safely handle concurrently
        let calculated_chunks = (available_memory / memory_per_chunk) as usize;

        // Clamp between 2 and 100 for safety
        // Minimum 2 to maintain some parallelism
        // Maximum 100 to prevent too many concurrent connections
        calculated_chunks.clamp(2, 100)
    }
1629
1630    #[maybe_async::async_impl]
1631    pub(crate) async fn _put_object_stream_with_content_type_and_headers<
1632        R: AsyncRead + Unpin + ?Sized,
1633    >(
1634        &self,
1635        reader: &mut R,
1636        s3_path: &str,
1637        content_type: &str,
1638        custom_headers: Option<http::HeaderMap>,
1639    ) -> Result<PutStreamResponse, S3Error> {
1640        // If the file is smaller CHUNK_SIZE, just do a regular upload.
1641        // Otherwise perform a multi-part upload.
1642        let first_chunk = crate::utils::read_chunk_async(reader).await?;
1643        // println!("First chunk size: {}", first_chunk.len());
1644        if first_chunk.len() < CHUNK_SIZE {
1645            let total_size = first_chunk.len();
1646            // Use the builder pattern for small files
1647            let mut builder = self
1648                .put_object_builder(s3_path, first_chunk.as_slice())
1649                .with_content_type(content_type);
1650
1651            // Add custom headers if provided
1652            if let Some(headers) = custom_headers {
1653                builder = builder.with_headers(headers);
1654            }
1655
1656            let response_data = builder.execute().await?;
1657            if response_data.status_code() >= 300 {
1658                return Err(error_from_response_data(response_data)?);
1659            }
1660            return Ok(PutStreamResponse::new(
1661                response_data.status_code(),
1662                total_size,
1663            ));
1664        }
1665
1666        let msg = self
1667            .initiate_multipart_upload(s3_path, content_type)
1668            .await?;
1669        let path = msg.key;
1670        let upload_id = &msg.upload_id;
1671
1672        // Determine max concurrent chunks based on available memory
1673        let max_concurrent_chunks = Self::calculate_max_concurrent_chunks();
1674
1675        // Use FuturesUnordered for bounded parallelism
1676        use futures_util::FutureExt;
1677        use futures_util::stream::{FuturesUnordered, StreamExt};
1678
1679        let mut part_number: u32 = 0;
1680        let mut total_size = 0;
1681        let mut etags = Vec::new();
1682        let mut active_uploads: FuturesUnordered<
1683            futures_util::future::BoxFuture<'_, (u32, Result<ResponseData, S3Error>)>,
1684        > = FuturesUnordered::new();
1685        let mut reading_done = false;
1686
1687        // Process first chunk
1688        part_number += 1;
1689        total_size += first_chunk.len();
1690        if first_chunk.len() < CHUNK_SIZE {
1691            reading_done = true;
1692        }
1693
1694        let path_clone = path.clone();
1695        let upload_id_clone = upload_id.clone();
1696        let content_type_clone = content_type.to_string();
1697        let bucket_clone = self.clone();
1698
1699        active_uploads.push(
1700            async move {
1701                let result = bucket_clone
1702                    .make_multipart_request(
1703                        &path_clone,
1704                        first_chunk,
1705                        1,
1706                        &upload_id_clone,
1707                        &content_type_clone,
1708                    )
1709                    .await;
1710                (1, result)
1711            }
1712            .boxed(),
1713        );
1714
1715        // Main upload loop with bounded parallelism
1716        while !active_uploads.is_empty() || !reading_done {
1717            // Start new uploads if we have room and more data to read
1718            while active_uploads.len() < max_concurrent_chunks && !reading_done {
1719                let chunk = crate::utils::read_chunk_async(reader).await?;
1720                let chunk_len = chunk.len();
1721
1722                if chunk_len == 0 {
1723                    reading_done = true;
1724                    break;
1725                }
1726
1727                total_size += chunk_len;
1728                part_number += 1;
1729
1730                if chunk_len < CHUNK_SIZE {
1731                    reading_done = true;
1732                }
1733
1734                let current_part = part_number;
1735                let path_clone = path.clone();
1736                let upload_id_clone = upload_id.clone();
1737                let content_type_clone = content_type.to_string();
1738                let bucket_clone = self.clone();
1739
1740                active_uploads.push(
1741                    async move {
1742                        let result = bucket_clone
1743                            .make_multipart_request(
1744                                &path_clone,
1745                                chunk,
1746                                current_part,
1747                                &upload_id_clone,
1748                                &content_type_clone,
1749                            )
1750                            .await;
1751                        (current_part, result)
1752                    }
1753                    .boxed(),
1754                );
1755            }
1756
1757            // Process completed uploads
1758            if let Some((part_num, result)) = active_uploads.next().await {
1759                let response_data = result?;
1760                if !(200..300).contains(&response_data.status_code()) {
1761                    // if chunk upload failed - abort the upload
1762                    match self.abort_upload(&path, upload_id).await {
1763                        Ok(_) => {
1764                            return Err(error_from_response_data(response_data)?);
1765                        }
1766                        Err(error) => {
1767                            return Err(error);
1768                        }
1769                    }
1770                }
1771
1772                let etag = response_data.as_str()?;
1773                // Store part number with etag to sort later
1774                etags.push((part_num, etag.to_string()));
1775            }
1776        }
1777
1778        // Sort etags by part number to ensure correct order
1779        etags.sort_by_key(|k| k.0);
1780        let etags: Vec<String> = etags.into_iter().map(|(_, etag)| etag).collect();
1781
1782        // Finish the upload
1783        let inner_data = etags
1784            .clone()
1785            .into_iter()
1786            .enumerate()
1787            .map(|(i, x)| Part {
1788                etag: x,
1789                part_number: i as u32 + 1,
1790            })
1791            .collect::<Vec<Part>>();
1792        let response_data = self
1793            .complete_multipart_upload(&path, &msg.upload_id, inner_data)
1794            .await?;
1795
1796        Ok(PutStreamResponse::new(
1797            response_data.status_code(),
1798            total_size,
1799        ))
1800    }
1801
1802    #[maybe_async::sync_impl]
1803    fn _put_object_stream_with_content_type<R: Read + ?Sized>(
1804        &self,
1805        reader: &mut R,
1806        s3_path: &str,
1807        content_type: &str,
1808    ) -> Result<u16, S3Error> {
1809        let msg = self.initiate_multipart_upload(s3_path, content_type)?;
1810        let path = msg.key;
1811        let upload_id = &msg.upload_id;
1812
1813        let mut part_number: u32 = 0;
1814        let mut etags = Vec::new();
1815        loop {
1816            let chunk = crate::utils::read_chunk(reader)?;
1817
1818            if chunk.len() < CHUNK_SIZE {
1819                if part_number == 0 {
1820                    // Files is not big enough for multipart upload, going with regular put_object
1821                    self.abort_upload(&path, upload_id)?;
1822
1823                    return Ok(self.put_object(s3_path, chunk.as_slice())?.status_code());
1824                } else {
1825                    part_number += 1;
1826                    let part = self.put_multipart_chunk(
1827                        &chunk,
1828                        &path,
1829                        part_number,
1830                        upload_id,
1831                        content_type,
1832                    )?;
1833                    etags.push(part.etag);
1834                    let inner_data = etags
1835                        .into_iter()
1836                        .enumerate()
1837                        .map(|(i, x)| Part {
1838                            etag: x,
1839                            part_number: i as u32 + 1,
1840                        })
1841                        .collect::<Vec<Part>>();
1842                    return Ok(self
1843                        .complete_multipart_upload(&path, upload_id, inner_data)?
1844                        .status_code());
1845                    // let response = std::str::from_utf8(data.as_slice())?;
1846                }
1847            } else {
1848                part_number += 1;
1849                let part =
1850                    self.put_multipart_chunk(&chunk, &path, part_number, upload_id, content_type)?;
1851                etags.push(part.etag.to_string());
1852            }
1853        }
1854    }
1855
1856    /// Initiate multipart upload to s3.
1857    #[maybe_async::async_impl]
1858    pub async fn initiate_multipart_upload(
1859        &self,
1860        s3_path: &str,
1861        content_type: &str,
1862    ) -> Result<InitiateMultipartUploadResponse, S3Error> {
1863        let command = Command::InitiateMultipartUpload { content_type };
1864        let request = RequestImpl::new(self, s3_path, command).await?;
1865        let response_data = request.response_data(false).await?;
1866        if response_data.status_code() >= 300 {
1867            return Err(error_from_response_data(response_data)?);
1868        }
1869
1870        let msg: InitiateMultipartUploadResponse =
1871            quick_xml::de::from_str(response_data.as_str()?)?;
1872        Ok(msg)
1873    }
1874
1875    #[maybe_async::sync_impl]
1876    pub fn initiate_multipart_upload(
1877        &self,
1878        s3_path: &str,
1879        content_type: &str,
1880    ) -> Result<InitiateMultipartUploadResponse, S3Error> {
1881        let command = Command::InitiateMultipartUpload { content_type };
1882        let request = RequestImpl::new(self, s3_path, command)?;
1883        let response_data = request.response_data(false)?;
1884        if response_data.status_code() >= 300 {
1885            return Err(error_from_response_data(response_data)?);
1886        }
1887
1888        let msg: InitiateMultipartUploadResponse =
1889            quick_xml::de::from_str(response_data.as_str()?)?;
1890        Ok(msg)
1891    }
1892
    /// Upload a streamed multipart chunk to s3 using a previously initiated multipart upload
    ///
    /// Reads a single chunk from `reader` via `crate::utils::read_chunk` and
    /// uploads it as part `part_number` of the upload identified by
    /// `upload_id`.
    ///
    /// NOTE(review): `read_chunk` is a synchronous call on a blocking `Read`,
    /// so the read itself blocks the async executor even though this is the
    /// async impl — confirm this is intentional (an `AsyncRead` variant would
    /// avoid it, but would change the public signature).
    #[maybe_async::async_impl]
    pub async fn put_multipart_stream<R: Read + Unpin>(
        &self,
        reader: &mut R,
        path: &str,
        part_number: u32,
        upload_id: &str,
        content_type: &str,
    ) -> Result<Part, S3Error> {
        let chunk = crate::utils::read_chunk(reader)?;
        self.put_multipart_chunk(chunk, path, part_number, upload_id, content_type)
            .await
    }
1907
1908    #[maybe_async::sync_impl]
1909    pub async fn put_multipart_stream<R: Read + Unpin>(
1910        &self,
1911        reader: &mut R,
1912        path: &str,
1913        part_number: u32,
1914        upload_id: &str,
1915        content_type: &str,
1916    ) -> Result<Part, S3Error> {
1917        let chunk = crate::utils::read_chunk(reader)?;
1918        self.put_multipart_chunk(&chunk, path, part_number, upload_id, content_type)
1919    }
1920
1921    /// Upload a buffered multipart chunk to s3 using a previously initiated multipart upload
1922    #[maybe_async::async_impl]
1923    pub async fn put_multipart_chunk(
1924        &self,
1925        chunk: Vec<u8>,
1926        path: &str,
1927        part_number: u32,
1928        upload_id: &str,
1929        content_type: &str,
1930    ) -> Result<Part, S3Error> {
1931        let command = Command::PutObject {
1932            // part_number,
1933            content: &chunk,
1934            multipart: Some(Multipart::new(part_number, upload_id)), // upload_id: &msg.upload_id,
1935            custom_headers: None,
1936            content_type,
1937        };
1938        let request = RequestImpl::new(self, path, command).await?;
1939        let response_data = request.response_data(true).await?;
1940        if !(200..300).contains(&response_data.status_code()) {
1941            // if chunk upload failed - abort the upload
1942            match self.abort_upload(path, upload_id).await {
1943                Ok(_) => {
1944                    return Err(error_from_response_data(response_data)?);
1945                }
1946                Err(error) => {
1947                    return Err(error);
1948                }
1949            }
1950        }
1951        let etag = response_data.as_str()?;
1952        Ok(Part {
1953            etag: etag.to_string(),
1954            part_number,
1955        })
1956    }
1957
1958    #[maybe_async::sync_impl]
1959    pub fn put_multipart_chunk(
1960        &self,
1961        chunk: &[u8],
1962        path: &str,
1963        part_number: u32,
1964        upload_id: &str,
1965        content_type: &str,
1966    ) -> Result<Part, S3Error> {
1967        let command = Command::PutObject {
1968            // part_number,
1969            content: chunk,
1970            multipart: Some(Multipart::new(part_number, upload_id)), // upload_id: &msg.upload_id,
1971            custom_headers: None,
1972            content_type,
1973        };
1974        let request = RequestImpl::new(self, path, command)?;
1975        let response_data = request.response_data(true)?;
1976        if !(200..300).contains(&response_data.status_code()) {
1977            // if chunk upload failed - abort the upload
1978            match self.abort_upload(path, upload_id) {
1979                Ok(_) => {
1980                    return Err(error_from_response_data(response_data)?);
1981                }
1982                Err(error) => {
1983                    return Err(error);
1984                }
1985            }
1986        }
1987        let etag = response_data.as_str()?;
1988        Ok(Part {
1989            etag: etag.to_string(),
1990            part_number,
1991        })
1992    }
1993
1994    /// Completes a previously initiated multipart upload, with optional final data chunks
1995    #[maybe_async::async_impl]
1996    pub async fn complete_multipart_upload(
1997        &self,
1998        path: &str,
1999        upload_id: &str,
2000        parts: Vec<Part>,
2001    ) -> Result<ResponseData, S3Error> {
2002        let data = CompleteMultipartUploadData { parts };
2003        let complete = Command::CompleteMultipartUpload { upload_id, data };
2004        let complete_request = RequestImpl::new(self, path, complete).await?;
2005        complete_request.response_data(false).await
2006    }
2007
2008    #[maybe_async::sync_impl]
2009    pub fn complete_multipart_upload(
2010        &self,
2011        path: &str,
2012        upload_id: &str,
2013        parts: Vec<Part>,
2014    ) -> Result<ResponseData, S3Error> {
2015        let data = CompleteMultipartUploadData { parts };
2016        let complete = Command::CompleteMultipartUpload { upload_id, data };
2017        let complete_request = RequestImpl::new(self, path, complete)?;
2018        complete_request.response_data(false)
2019    }
2020
2021    /// Get Bucket location.
2022    ///
2023    /// # Example:
2024    ///
2025    /// ```no_run
2026    /// use s3::bucket::Bucket;
2027    /// use s3::creds::Credentials;
2028    /// use anyhow::Result;
2029    ///
2030    /// # #[tokio::main]
2031    /// # async fn main() -> Result<()> {
2032    ///
2033    /// let bucket_name = "rust-s3-test";
2034    /// let region = "us-east-1".parse()?;
2035    /// let credentials = Credentials::default()?;
2036    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2037    ///
2038    /// // Async variant with `tokio` or `async-std` features
2039    /// let (region, status_code) = bucket.location().await?;
2040    ///
2041    /// // `sync` feature will produce an identical method
2042    /// #[cfg(feature = "sync")]
2043    /// let (region, status_code) = bucket.location()?;
2044    ///
2045    /// // Blocking variant, generated with `blocking` feature in combination
2046    /// // with `tokio` or `async-std` features.
2047    /// #[cfg(feature = "blocking")]
2048    /// let (region, status_code) = bucket.location_blocking()?;
2049    /// #
2050    /// # Ok(())
2051    /// # }
2052    /// ```
2053    #[maybe_async::maybe_async]
2054    pub async fn location(&self) -> Result<(Region, u16), S3Error> {
2055        let request = RequestImpl::new(self, "?location", Command::GetBucketLocation).await?;
2056        let response_data = request.response_data(false).await?;
2057        let region_string = String::from_utf8_lossy(response_data.as_slice());
2058        let region = match quick_xml::de::from_reader(region_string.as_bytes()) {
2059            Ok(r) => {
2060                let location_result: BucketLocationResult = r;
2061                location_result.region.parse()?
2062            }
2063            Err(e) => {
2064                if response_data.status_code() == 200 {
2065                    Region::Custom {
2066                        region: "Custom".to_string(),
2067                        endpoint: "".to_string(),
2068                    }
2069                } else {
2070                    Region::Custom {
2071                        region: format!("Error encountered : {}", e),
2072                        endpoint: "".to_string(),
2073                    }
2074                }
2075            }
2076        };
2077        Ok((region, response_data.status_code()))
2078    }
2079
2080    /// Delete file from an S3 path.
2081    ///
2082    /// # Example:
2083    ///
2084    /// ```no_run
2085    /// use s3::bucket::Bucket;
2086    /// use s3::creds::Credentials;
2087    /// use anyhow::Result;
2088    ///
2089    /// # #[tokio::main]
2090    /// # async fn main() -> Result<()> {
2091    ///
2092    /// let bucket_name = "rust-s3-test";
2093    /// let region = "us-east-1".parse()?;
2094    /// let credentials = Credentials::default()?;
2095    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2096    ///
2097    /// // Async variant with `tokio` or `async-std` features
2098    /// let response_data = bucket.delete_object("/test.file").await?;
2099    ///
2100    /// // `sync` feature will produce an identical method
2101    /// #[cfg(feature = "sync")]
2102    /// let response_data = bucket.delete_object("/test.file")?;
2103    ///
2104    /// // Blocking variant, generated with `blocking` feature in combination
2105    /// // with `tokio` or `async-std` features.
2106    /// #[cfg(feature = "blocking")]
2107    /// let response_data = bucket.delete_object_blocking("/test.file")?;
2108    /// #
2109    /// # Ok(())
2110    /// # }
2111    /// ```
2112    #[maybe_async::maybe_async]
2113    pub async fn delete_object<S: AsRef<str>>(&self, path: S) -> Result<ResponseData, S3Error> {
2114        let command = Command::DeleteObject;
2115        let request = RequestImpl::new(self, path.as_ref(), command).await?;
2116        request.response_data(false).await
2117    }
2118
2119    /// Head object from S3.
2120    ///
2121    /// # Example:
2122    ///
2123    /// ```no_run
2124    /// use s3::bucket::Bucket;
2125    /// use s3::creds::Credentials;
2126    /// use anyhow::Result;
2127    ///
2128    /// # #[tokio::main]
2129    /// # async fn main() -> Result<()> {
2130    ///
2131    /// let bucket_name = "rust-s3-test";
2132    /// let region = "us-east-1".parse()?;
2133    /// let credentials = Credentials::default()?;
2134    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2135    ///
2136    /// // Async variant with `tokio` or `async-std` features
2137    /// let (head_object_result, code) = bucket.head_object("/test.png").await?;
2138    ///
2139    /// // `sync` feature will produce an identical method
2140    /// #[cfg(feature = "sync")]
2141    /// let (head_object_result, code) = bucket.head_object("/test.png")?;
2142    ///
2143    /// // Blocking variant, generated with `blocking` feature in combination
2144    /// // with `tokio` or `async-std` features.
2145    /// #[cfg(feature = "blocking")]
2146    /// let (head_object_result, code) = bucket.head_object_blocking("/test.png")?;
2147    /// #
2148    /// # Ok(())
2149    /// # }
2150    /// ```
2151    #[maybe_async::maybe_async]
2152    pub async fn head_object<S: AsRef<str>>(
2153        &self,
2154        path: S,
2155    ) -> Result<(HeadObjectResult, u16), S3Error> {
2156        let command = Command::HeadObject;
2157        let request = RequestImpl::new(self, path.as_ref(), command).await?;
2158        let (headers, status) = request.response_header().await?;
2159        let header_object = HeadObjectResult::from(&headers);
2160        Ok((header_object, status))
2161    }
2162
2163    /// Put into an S3 bucket, with explicit content-type.
2164    ///
2165    /// # Example:
2166    ///
2167    /// ```no_run
2168    /// use s3::bucket::Bucket;
2169    /// use s3::creds::Credentials;
2170    /// use anyhow::Result;
2171    ///
2172    /// # #[tokio::main]
2173    /// # async fn main() -> Result<()> {
2174    ///
2175    /// let bucket_name = "rust-s3-test";
2176    /// let region = "us-east-1".parse()?;
2177    /// let credentials = Credentials::default()?;
2178    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2179    /// let content = "I want to go to S3".as_bytes();
2180    ///
2181    /// // Async variant with `tokio` or `async-std` features
2182    /// let response_data = bucket.put_object_with_content_type("/test.file", content, "text/plain").await?;
2183    ///
2184    /// // `sync` feature will produce an identical method
2185    /// #[cfg(feature = "sync")]
2186    /// let response_data = bucket.put_object_with_content_type("/test.file", content, "text/plain")?;
2187    ///
2188    /// // Blocking variant, generated with `blocking` feature in combination
2189    /// // with `tokio` or `async-std` features.
2190    /// #[cfg(feature = "blocking")]
2191    /// let response_data = bucket.put_object_with_content_type_blocking("/test.file", content, "text/plain")?;
2192    /// #
2193    /// # Ok(())
2194    /// # }
2195    /// ```
2196    #[maybe_async::maybe_async]
2197    pub async fn put_object_with_content_type<S: AsRef<str>>(
2198        &self,
2199        path: S,
2200        content: &[u8],
2201        content_type: &str,
2202    ) -> Result<ResponseData, S3Error> {
2203        let command = Command::PutObject {
2204            content,
2205            content_type,
2206            custom_headers: None,
2207            multipart: None,
2208        };
2209        let request = RequestImpl::new(self, path.as_ref(), command).await?;
2210        request.response_data(true).await
2211    }
2212
2213    /// Put into an S3 bucket.
2214    ///
2215    /// # Example:
2216    ///
2217    /// ```no_run
2218    /// use s3::bucket::Bucket;
2219    /// use s3::creds::Credentials;
2220    /// use anyhow::Result;
2221    ///
2222    /// # #[tokio::main]
2223    /// # async fn main() -> Result<()> {
2224    ///
2225    /// let bucket_name = "rust-s3-test";
2226    /// let region = "us-east-1".parse()?;
2227    /// let credentials = Credentials::default()?;
2228    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2229    /// let content = "I want to go to S3".as_bytes();
2230    ///
2231    /// // Async variant with `tokio` or `async-std` features
2232    /// let response_data = bucket.put_object("/test.file", content).await?;
2233    ///
2234    /// // `sync` feature will produce an identical method
2235    /// #[cfg(feature = "sync")]
2236    /// let response_data = bucket.put_object("/test.file", content)?;
2237    ///
2238    /// // Blocking variant, generated with `blocking` feature in combination
2239    /// // with `tokio` or `async-std` features.
2240    /// #[cfg(feature = "blocking")]
2241    /// let response_data = bucket.put_object_blocking("/test.file", content)?;
2242    /// #
2243    /// # Ok(())
2244    /// # }
2245    /// ```
2246    #[maybe_async::maybe_async]
2247    pub async fn put_object<S: AsRef<str>>(
2248        &self,
2249        path: S,
2250        content: &[u8],
2251    ) -> Result<ResponseData, S3Error> {
2252        self.put_object_with_content_type(path, content, "application/octet-stream")
2253            .await
2254    }
2255
2256    /// Create a builder for PUT object operations with custom options
2257    ///
2258    /// This method returns a builder that allows configuring various options
2259    /// for the PUT operation including headers, content type, and metadata.
2260    ///
2261    /// # Example:
2262    ///
2263    /// ```no_run
2264    /// use s3::bucket::Bucket;
2265    /// use s3::creds::Credentials;
2266    /// use anyhow::Result;
2267    ///
2268    /// # #[tokio::main]
2269    /// # async fn main() -> Result<()> {
2270    ///
2271    /// let bucket = Bucket::new("my-bucket", "us-east-1".parse()?, Credentials::default()?)?;
2272    ///
2273    /// // Upload with custom headers using builder pattern
2274    /// let response = bucket.put_object_builder("/my-file.txt", b"Hello, World!")
2275    ///     .with_content_type("text/plain")
2276    ///     .with_cache_control("public, max-age=3600")?
2277    ///     .with_metadata("author", "john-doe")?
2278    ///     .execute()
2279    ///     .await?;
2280    /// #
2281    /// # Ok(())
2282    /// # }
2283    /// ```
2284    pub fn put_object_builder<S: AsRef<str>>(
2285        &self,
2286        path: S,
2287        content: &[u8],
2288    ) -> crate::put_object_request::PutObjectRequest<'_> {
2289        crate::put_object_request::PutObjectRequest::new(self, path, content)
2290    }
2291
2292    fn _tags_xml<S: AsRef<str>>(&self, tags: &[(S, S)]) -> String {
2293        let mut s = String::new();
2294        let content = tags
2295            .iter()
2296            .map(|(name, value)| {
2297                format!(
2298                    "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
2299                    name.as_ref(),
2300                    value.as_ref()
2301                )
2302            })
2303            .fold(String::new(), |mut a, b| {
2304                a.push_str(b.as_str());
2305                a
2306            });
2307        s.push_str("<Tagging><TagSet>");
2308        s.push_str(&content);
2309        s.push_str("</TagSet></Tagging>");
2310        s
2311    }
2312
2313    /// Tag an S3 object.
2314    ///
2315    /// # Example:
2316    ///
2317    /// ```no_run
2318    /// use s3::bucket::Bucket;
2319    /// use s3::creds::Credentials;
2320    /// use anyhow::Result;
2321    ///
2322    /// # #[tokio::main]
2323    /// # async fn main() -> Result<()> {
2324    ///
2325    /// let bucket_name = "rust-s3-test";
2326    /// let region = "us-east-1".parse()?;
2327    /// let credentials = Credentials::default()?;
2328    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2329    ///
2330    /// // Async variant with `tokio` or `async-std` features
2331    /// let response_data = bucket.put_object_tagging("/test.file", &[("Tag1", "Value1"), ("Tag2", "Value2")]).await?;
2332    ///
2333    /// // `sync` feature will produce an identical method
2334    /// #[cfg(feature = "sync")]
2335    /// let response_data = bucket.put_object_tagging("/test.file", &[("Tag1", "Value1"), ("Tag2", "Value2")])?;
2336    ///
2337    /// // Blocking variant, generated with `blocking` feature in combination
2338    /// // with `tokio` or `async-std` features.
2339    /// #[cfg(feature = "blocking")]
2340    /// let response_data = bucket.put_object_tagging_blocking("/test.file", &[("Tag1", "Value1"), ("Tag2", "Value2")])?;
2341    /// #
2342    /// # Ok(())
2343    /// # }
2344    /// ```
2345    #[maybe_async::maybe_async]
2346    pub async fn put_object_tagging<S: AsRef<str>>(
2347        &self,
2348        path: &str,
2349        tags: &[(S, S)],
2350    ) -> Result<ResponseData, S3Error> {
2351        let content = self._tags_xml(tags);
2352        let command = Command::PutObjectTagging { tags: &content };
2353        let request = RequestImpl::new(self, path, command).await?;
2354        request.response_data(false).await
2355    }
2356
2357    /// Delete tags from an S3 object.
2358    ///
2359    /// # Example:
2360    ///
2361    /// ```no_run
2362    /// use s3::bucket::Bucket;
2363    /// use s3::creds::Credentials;
2364    /// use anyhow::Result;
2365    ///
2366    /// # #[tokio::main]
2367    /// # async fn main() -> Result<()> {
2368    ///
2369    /// let bucket_name = "rust-s3-test";
2370    /// let region = "us-east-1".parse()?;
2371    /// let credentials = Credentials::default()?;
2372    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2373    ///
2374    /// // Async variant with `tokio` or `async-std` features
2375    /// let response_data = bucket.delete_object_tagging("/test.file").await?;
2376    ///
2377    /// // `sync` feature will produce an identical method
2378    /// #[cfg(feature = "sync")]
2379    /// let response_data = bucket.delete_object_tagging("/test.file")?;
2380    ///
2381    /// // Blocking variant, generated with `blocking` feature in combination
2382    /// // with `tokio` or `async-std` features.
2383    /// #[cfg(feature = "blocking")]
2384    /// let response_data = bucket.delete_object_tagging_blocking("/test.file")?;
2385    /// #
2386    /// # Ok(())
2387    /// # }
2388    /// ```
2389    #[maybe_async::maybe_async]
2390    pub async fn delete_object_tagging<S: AsRef<str>>(
2391        &self,
2392        path: S,
2393    ) -> Result<ResponseData, S3Error> {
2394        let command = Command::DeleteObjectTagging;
2395        let request = RequestImpl::new(self, path.as_ref(), command).await?;
2396        request.response_data(false).await
2397    }
2398
    /// Retrieve an S3 object list of tags.
    ///
    /// # Example:
    ///
    /// ```no_run
    /// use s3::bucket::Bucket;
    /// use s3::creds::Credentials;
    /// use anyhow::Result;
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    ///
    /// let bucket_name = "rust-s3-test";
    /// let region = "us-east-1".parse()?;
    /// let credentials = Credentials::default()?;
    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
    ///
    /// // Async variant with `tokio` or `async-std` features
    /// let response_data = bucket.get_object_tagging("/test.file").await?;
    ///
    /// // `sync` feature will produce an identical method
    /// #[cfg(feature = "sync")]
    /// let response_data = bucket.get_object_tagging("/test.file")?;
    ///
    /// // Blocking variant, generated with `blocking` feature in combination
    /// // with `tokio` or `async-std` features.
    /// #[cfg(feature = "blocking")]
    /// let response_data = bucket.get_object_tagging_blocking("/test.file")?;
    /// #
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "tags")]
    #[maybe_async::maybe_async]
    pub async fn get_object_tagging<S: AsRef<str>>(
        &self,
        path: S,
    ) -> Result<(Vec<Tag>, u16), S3Error> {
        let command = Command::GetObjectTagging {};
        let request = RequestImpl::new(self, path.as_ref(), command).await?;
        let result = request.response_data(false).await?;

        // Tags are only parsed out of a 200 response body; any other status
        // yields an empty tag list alongside the status code.
        let mut tags = Vec::new();

        if result.status_code() == 200 {
            let result_string = String::from_utf8_lossy(result.as_slice());

            // Add namespace if it doesn't exist
            // (minidom reports MissingNamespace for un-namespaced XML, so we
            // inject the S3 namespace and parse again).
            // NOTE(review): the fix-up matches the literal "<Tagging>", so a
            // root element carrying attributes would not be rewritten —
            // confirm this covers the servers in use.
            let ns = "http://s3.amazonaws.com/doc/2006-03-01/";
            let result_string =
                if let Err(minidom::Error::MissingNamespace) = result_string.parse::<Element>() {
                    result_string
                        .replace("<Tagging>", &format!("<Tagging xmlns=\"{}\">", ns))
                        .into()
                } else {
                    result_string
                };

            // Any remaining parse failure is swallowed: the tag list simply
            // stays empty.
            if let Ok(tagging) = result_string.parse::<Element>() {
                for tag_set in tagging.children() {
                    if tag_set.is("TagSet", ns) {
                        for tag in tag_set.children() {
                            if tag.is("Tag", ns) {
                                // Missing children become placeholder strings
                                // rather than errors.
                                let key = if let Some(element) = tag.get_child("Key", ns) {
                                    element.text()
                                } else {
                                    "Could not parse Key from Tag".to_string()
                                };
                                let value = if let Some(element) = tag.get_child("Value", ns) {
                                    element.text()
                                } else {
                                    "Could not parse Values from Tag".to_string()
                                };
                                tags.push(Tag { key, value });
                            }
                        }
                    }
                }
            }
        }

        Ok((tags, result.status_code()))
    }
2482
2483    #[maybe_async::maybe_async]
2484    pub async fn list_page(
2485        &self,
2486        prefix: String,
2487        delimiter: Option<String>,
2488        continuation_token: Option<String>,
2489        start_after: Option<String>,
2490        max_keys: Option<usize>,
2491    ) -> Result<(ListBucketResult, u16), S3Error> {
2492        let command = if self.listobjects_v2 {
2493            Command::ListObjectsV2 {
2494                prefix,
2495                delimiter,
2496                continuation_token,
2497                start_after,
2498                max_keys,
2499            }
2500        } else {
2501            // In the v1 ListObjects request, there is only one "marker"
2502            // field that serves as both the initial starting position,
2503            // and as the continuation token.
2504            Command::ListObjects {
2505                prefix,
2506                delimiter,
2507                marker: std::cmp::max(continuation_token, start_after),
2508                max_keys,
2509            }
2510        };
2511        let request = RequestImpl::new(self, "/", command).await?;
2512        let response_data = request.response_data(false).await?;
2513        let list_bucket_result = quick_xml::de::from_reader(response_data.as_slice())?;
2514
2515        Ok((list_bucket_result, response_data.status_code()))
2516    }
2517
2518    /// List the contents of an S3 bucket.
2519    ///
2520    /// # Example:
2521    ///
2522    /// ```no_run
2523    /// use s3::bucket::Bucket;
2524    /// use s3::creds::Credentials;
2525    /// use anyhow::Result;
2526    ///
2527    /// # #[tokio::main]
2528    /// # async fn main() -> Result<()> {
2529    ///
2530    /// let bucket_name = "rust-s3-test";
2531    /// let region = "us-east-1".parse()?;
2532    /// let credentials = Credentials::default()?;
2533    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2534    ///
2535    /// // Async variant with `tokio` or `async-std` features
2536    /// let results = bucket.list("/".to_string(), Some("/".to_string())).await?;
2537    ///
2538    /// // `sync` feature will produce an identical method
2539    /// #[cfg(feature = "sync")]
2540    /// let results = bucket.list("/".to_string(), Some("/".to_string()))?;
2541    ///
2542    /// // Blocking variant, generated with `blocking` feature in combination
2543    /// // with `tokio` or `async-std` features.
2544    /// #[cfg(feature = "blocking")]
2545    /// let results = bucket.list_blocking("/".to_string(), Some("/".to_string()))?;
2546    /// #
2547    /// # Ok(())
2548    /// # }
2549    /// ```
2550    #[maybe_async::maybe_async]
2551    #[allow(clippy::assigning_clones)]
2552    pub async fn list(
2553        &self,
2554        prefix: String,
2555        delimiter: Option<String>,
2556    ) -> Result<Vec<ListBucketResult>, S3Error> {
2557        let the_bucket = self.to_owned();
2558        let mut results = Vec::new();
2559        let mut continuation_token = None;
2560
2561        loop {
2562            let (list_bucket_result, _) = the_bucket
2563                .list_page(
2564                    prefix.clone(),
2565                    delimiter.clone(),
2566                    continuation_token,
2567                    None,
2568                    None,
2569                )
2570                .await?;
2571            continuation_token = list_bucket_result.next_continuation_token.clone();
2572            results.push(list_bucket_result);
2573            if continuation_token.is_none() {
2574                break;
2575            }
2576        }
2577
2578        Ok(results)
2579    }
2580
2581    #[maybe_async::maybe_async]
2582    pub async fn list_multiparts_uploads_page(
2583        &self,
2584        prefix: Option<&str>,
2585        delimiter: Option<&str>,
2586        key_marker: Option<String>,
2587        max_uploads: Option<usize>,
2588    ) -> Result<(ListMultipartUploadsResult, u16), S3Error> {
2589        let command = Command::ListMultipartUploads {
2590            prefix,
2591            delimiter,
2592            key_marker,
2593            max_uploads,
2594        };
2595        let request = RequestImpl::new(self, "/", command).await?;
2596        let response_data = request.response_data(false).await?;
2597        let list_bucket_result = quick_xml::de::from_reader(response_data.as_slice())?;
2598
2599        Ok((list_bucket_result, response_data.status_code()))
2600    }
2601
2602    /// List the ongoing multipart uploads of an S3 bucket. This may be useful to cleanup failed
2603    /// uploads, together with [`crate::bucket::Bucket::abort_upload`].
2604    ///
2605    /// # Example:
2606    ///
2607    /// ```no_run
2608    /// use s3::bucket::Bucket;
2609    /// use s3::creds::Credentials;
2610    /// use anyhow::Result;
2611    ///
2612    /// # #[tokio::main]
2613    /// # async fn main() -> Result<()> {
2614    ///
2615    /// let bucket_name = "rust-s3-test";
2616    /// let region = "us-east-1".parse()?;
2617    /// let credentials = Credentials::default()?;
2618    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2619    ///
2620    /// // Async variant with `tokio` or `async-std` features
2621    /// let results = bucket.list_multiparts_uploads(Some("/"), Some("/")).await?;
2622    ///
2623    /// // `sync` feature will produce an identical method
2624    /// #[cfg(feature = "sync")]
2625    /// let results = bucket.list_multiparts_uploads(Some("/"), Some("/"))?;
2626    ///
2627    /// // Blocking variant, generated with `blocking` feature in combination
2628    /// // with `tokio` or `async-std` features.
2629    /// #[cfg(feature = "blocking")]
2630    /// let results = bucket.list_multiparts_uploads_blocking(Some("/"), Some("/"))?;
2631    /// #
2632    /// # Ok(())
2633    /// # }
2634    /// ```
2635    #[maybe_async::maybe_async]
2636    #[allow(clippy::assigning_clones)]
2637    pub async fn list_multiparts_uploads(
2638        &self,
2639        prefix: Option<&str>,
2640        delimiter: Option<&str>,
2641    ) -> Result<Vec<ListMultipartUploadsResult>, S3Error> {
2642        let the_bucket = self.to_owned();
2643        let mut results = Vec::new();
2644        let mut next_marker: Option<String> = None;
2645
2646        loop {
2647            let (list_multiparts_uploads_result, _) = the_bucket
2648                .list_multiparts_uploads_page(prefix, delimiter, next_marker, None)
2649                .await?;
2650
2651            let is_truncated = list_multiparts_uploads_result.is_truncated;
2652
2653            next_marker = list_multiparts_uploads_result.next_marker.clone();
2654            results.push(list_multiparts_uploads_result);
2655
2656            if !is_truncated {
2657                break;
2658            }
2659        }
2660
2661        Ok(results)
2662    }
2663
2664    /// Abort a running multipart upload.
2665    ///
2666    /// # Example:
2667    ///
2668    /// ```no_run
2669    /// use s3::bucket::Bucket;
2670    /// use s3::creds::Credentials;
2671    /// use anyhow::Result;
2672    ///
2673    /// # #[tokio::main]
2674    /// # async fn main() -> Result<()> {
2675    ///
2676    /// let bucket_name = "rust-s3-test";
2677    /// let region = "us-east-1".parse()?;
2678    /// let credentials = Credentials::default()?;
2679    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
2680    ///
2681    /// // Async variant with `tokio` or `async-std` features
2682    /// let results = bucket.abort_upload("/some/file.txt", "ZDFjM2I0YmEtMzU3ZC00OTQ1LTlkNGUtMTgxZThjYzIwNjA2").await?;
2683    ///
2684    /// // `sync` feature will produce an identical method
2685    /// #[cfg(feature = "sync")]
2686    /// let results = bucket.abort_upload("/some/file.txt", "ZDFjM2I0YmEtMzU3ZC00OTQ1LTlkNGUtMTgxZThjYzIwNjA2")?;
2687    ///
2688    /// // Blocking variant, generated with `blocking` feature in combination
2689    /// // with `tokio` or `async-std` features.
2690    /// #[cfg(feature = "blocking")]
2691    /// let results = bucket.abort_upload_blocking("/some/file.txt", "ZDFjM2I0YmEtMzU3ZC00OTQ1LTlkNGUtMTgxZThjYzIwNjA2")?;
2692    /// #
2693    /// # Ok(())
2694    /// # }
2695    /// ```
2696    #[maybe_async::maybe_async]
2697    pub async fn abort_upload(&self, key: &str, upload_id: &str) -> Result<(), S3Error> {
2698        let abort = Command::AbortMultipartUpload { upload_id };
2699        let abort_request = RequestImpl::new(self, key, abort).await?;
2700        let response_data = abort_request.response_data(false).await?;
2701
2702        if (200..300).contains(&response_data.status_code()) {
2703            Ok(())
2704        } else {
2705            let utf8_content = String::from_utf8(response_data.as_slice().to_vec())?;
2706            Err(S3Error::HttpFailWithBody(
2707                response_data.status_code(),
2708                utf8_content,
2709            ))
2710        }
2711    }
2712
    /// Get path_style field of the Bucket struct
    pub fn is_path_style(&self) -> bool {
        self.path_style
    }

    /// Get negated path_style field of the Bucket struct
    pub fn is_subdomain_style(&self) -> bool {
        !self.path_style
    }

    /// Configure bucket to use path-style urls and headers
    pub fn set_path_style(&mut self) {
        self.path_style = true;
    }

    /// Configure bucket to use subdomain style urls and headers \[default\]
    pub fn set_subdomain_style(&mut self) {
        self.path_style = false;
    }

    /// Configure bucket to apply this request timeout to all HTTP
    /// requests, or no (infinity) timeout if `None`.  Defaults to
    /// `DEFAULT_REQUEST_TIMEOUT` (60 seconds).
    ///
    /// Only the [`attohttpc`] and the [`hyper`] backends obey this option;
    /// async code may instead await with a timeout.
    pub fn set_request_timeout(&mut self, timeout: Option<Duration>) {
        self.request_timeout = timeout;
    }

    /// Configure bucket to use the older ListObjects API
    ///
    /// If your provider doesn't support the ListObjectsV2 interface, set this to
    /// use the v1 ListObjects interface instead. This is currently needed at least
    /// for Google Cloud Storage.
    pub fn set_listobjects_v1(&mut self) {
        self.listobjects_v2 = false;
    }

    /// Configure bucket to use the newer ListObjectsV2 API
    pub fn set_listobjects_v2(&mut self) {
        self.listobjects_v2 = true;
    }
2756
    /// Get an owned copy of the name of the S3 bucket.
    pub fn name(&self) -> String {
        self.name.to_string()
    }

    /// Get the hostname of the S3 API endpoint, honoring the configured
    /// addressing mode (path-style vs. subdomain-style).
    pub fn host(&self) -> String {
        if self.path_style {
            self.path_style_host()
        } else {
            self.subdomain_style_host()
        }
    }

    /// Get the base URL of this bucket: `scheme://host/bucket` in
    /// path-style mode, `scheme://bucket.host` in subdomain-style mode.
    pub fn url(&self) -> String {
        if self.path_style {
            format!(
                "{}://{}/{}",
                self.scheme(),
                self.path_style_host(),
                self.name()
            )
        } else {
            format!("{}://{}", self.scheme(), self.subdomain_style_host())
        }
    }

    /// Get a paths-style reference to the hostname of the S3 API endpoint.
    pub fn path_style_host(&self) -> String {
        self.region.host()
    }

    /// Get the subdomain-style hostname, i.e. `<bucket>.<region host>`.
    pub fn subdomain_style_host(&self) -> String {
        format!("{}.{}", self.name, self.region.host())
    }

    // pub fn self_host(&self) -> String {
    //     format!("{}.{}", self.name, self.region.host())
    // }

    /// Get the URL scheme (`http` / `https`) of the configured region.
    pub fn scheme(&self) -> String {
        self.region.scheme()
    }

    /// Get the region this object will connect to.
    pub fn region(&self) -> Region {
        self.region.clone()
    }
2805
    /// Get an owned copy of the AWS access key, if one is set.
    #[maybe_async::maybe_async]
    pub async fn access_key(&self) -> Result<Option<String>, S3Error> {
        Ok(self.credentials().await?.access_key)
    }

    /// Get an owned copy of the AWS secret key, if one is set.
    #[maybe_async::maybe_async]
    pub async fn secret_key(&self) -> Result<Option<String>, S3Error> {
        Ok(self.credentials().await?.secret_key)
    }

    /// Get an owned copy of the AWS security token, if one is set.
    #[maybe_async::maybe_async]
    pub async fn security_token(&self) -> Result<Option<String>, S3Error> {
        Ok(self.credentials().await?.security_token)
    }

    /// Get an owned copy of the AWS session token, if one is set.
    #[maybe_async::maybe_async]
    pub async fn session_token(&self) -> Result<Option<String>, S3Error> {
        Ok(self.credentials().await?.session_token)
    }

    /// Get a clone of the full [`Credentials`](struct.Credentials.html)
    /// object used by this `Bucket` (async: awaits the internal RwLock).
    #[maybe_async::async_impl]
    pub async fn credentials(&self) -> Result<Credentials, S3Error> {
        Ok(self.credentials.read().await.clone())
    }

    /// Get a clone of the full credentials (sync: a poisoned lock is
    /// reported as `S3Error::CredentialsReadLock` instead of panicking).
    #[maybe_async::sync_impl]
    pub fn credentials(&self) -> Result<Credentials, S3Error> {
        match self.credentials.read() {
            Ok(credentials) => Ok(credentials.clone()),
            Err(_) => Err(S3Error::CredentialsReadLock),
        }
    }
2844
    /// Change the credentials used by the Bucket.
    pub fn set_credentials(&mut self, credentials: Credentials) {
        self.credentials = Arc::new(RwLock::new(credentials));
    }

    /// Add an extra header to send with requests to S3.
    ///
    /// Add an extra header to send with requests. Note that the library
    /// already sets a number of headers - headers set with this method will be
    /// overridden by the library headers:
    ///   * Host
    ///   * Content-Type
    ///   * Date
    ///   * Content-Length
    ///   * Authorization
    ///   * X-Amz-Content-Sha256
    ///   * X-Amz-Date
    ///
    /// # Panics
    ///
    /// Panics if `key` is not a valid header name or `value` is not a valid
    /// header value (both are `unwrap`ped below).
    pub fn add_header(&mut self, key: &str, value: &str) {
        self.extra_headers
            .insert(HeaderName::from_str(key).unwrap(), value.parse().unwrap());
    }

    /// Get a reference to the extra headers to be passed to the S3 API.
    pub fn extra_headers(&self) -> &HeaderMap {
        &self.extra_headers
    }

    /// Get a mutable reference to the extra headers to be passed to the S3
    /// API.
    pub fn extra_headers_mut(&mut self) -> &mut HeaderMap {
        &mut self.extra_headers
    }

    /// Add an extra query pair to the URL used for S3 API access.
    pub fn add_query(&mut self, key: &str, value: &str) {
        self.extra_query.insert(key.into(), value.into());
    }

    /// Get a reference to the extra query pairs to be passed to the S3 API.
    pub fn extra_query(&self) -> &Query {
        &self.extra_query
    }

    /// Get a mutable reference to the extra query pairs to be passed to the S3
    /// API.
    pub fn extra_query_mut(&mut self) -> &mut Query {
        &mut self.extra_query
    }

    /// Get the request timeout applied to HTTP requests, if any.
    pub fn request_timeout(&self) -> Option<Duration> {
        self.request_timeout
    }
2897}
2898
2899#[cfg(test)]
2900mod test {
2901
2902    use crate::BucketConfiguration;
2903    use crate::Tag;
2904    use crate::creds::Credentials;
2905    use crate::post_policy::{PostPolicyField, PostPolicyValue};
2906    use crate::region::Region;
2907    use crate::serde_types::CorsConfiguration;
2908    use crate::serde_types::CorsRule;
2909    use crate::{Bucket, PostPolicy};
2910    use http::HeaderMap;
2911    use http::header::HeaderName;
2912    use std::env;
2913
    /// Initialize `env_logger` for tests; a second call (already
    /// initialized) is deliberately ignored via `try_init`.
    fn init() {
        let _ = env_logger::builder().is_test(true).try_init();
    }
2917
2918    fn test_aws_credentials() -> Credentials {
2919        Credentials::new(
2920            Some(&env::var("EU_AWS_ACCESS_KEY_ID").unwrap()),
2921            Some(&env::var("EU_AWS_SECRET_ACCESS_KEY").unwrap()),
2922            None,
2923            None,
2924            None,
2925        )
2926        .unwrap()
2927    }
2928
2929    fn test_gc_credentials() -> Credentials {
2930        Credentials::new(
2931            Some(&env::var("GC_ACCESS_KEY_ID").unwrap()),
2932            Some(&env::var("GC_SECRET_ACCESS_KEY").unwrap()),
2933            None,
2934            None,
2935            None,
2936        )
2937        .unwrap()
2938    }
2939
2940    fn test_wasabi_credentials() -> Credentials {
2941        Credentials::new(
2942            Some(&env::var("WASABI_ACCESS_KEY_ID").unwrap()),
2943            Some(&env::var("WASABI_SECRET_ACCESS_KEY").unwrap()),
2944            None,
2945            None,
2946            None,
2947        )
2948        .unwrap()
2949    }
2950
2951    fn test_minio_credentials() -> Credentials {
2952        Credentials::new(
2953            Some(&env::var("MINIO_ACCESS_KEY_ID").unwrap()),
2954            Some(&env::var("MINIO_SECRET_ACCESS_KEY").unwrap()),
2955            None,
2956            None,
2957            None,
2958        )
2959        .unwrap()
2960    }
2961
2962    fn test_digital_ocean_credentials() -> Credentials {
2963        Credentials::new(
2964            Some(&env::var("DIGITAL_OCEAN_ACCESS_KEY_ID").unwrap()),
2965            Some(&env::var("DIGITAL_OCEAN_SECRET_ACCESS_KEY").unwrap()),
2966            None,
2967            None,
2968            None,
2969        )
2970        .unwrap()
2971    }
2972
2973    fn test_r2_credentials() -> Credentials {
2974        Credentials::new(
2975            Some(&env::var("R2_ACCESS_KEY_ID").unwrap()),
2976            Some(&env::var("R2_SECRET_ACCESS_KEY").unwrap()),
2977            None,
2978            None,
2979            None,
2980        )
2981        .unwrap()
2982    }
2983
    /// AWS test bucket in eu-central-1 (needs EU_AWS_* env credentials).
    fn test_aws_bucket() -> Box<Bucket> {
        Bucket::new(
            "rust-s3-test",
            "eu-central-1".parse().unwrap(),
            test_aws_credentials(),
        )
        .unwrap()
    }

    /// Wasabi test bucket (needs WASABI_* env credentials).
    fn test_wasabi_bucket() -> Box<Bucket> {
        Bucket::new(
            "rust-s3",
            "wa-eu-central-1".parse().unwrap(),
            test_wasabi_credentials(),
        )
        .unwrap()
    }

    /// Google Cloud Storage test bucket; GCS does not support
    /// ListObjectsV2, so the bucket is switched to the v1 API.
    fn test_gc_bucket() -> Box<Bucket> {
        let mut bucket = Bucket::new(
            "rust-s3",
            Region::Custom {
                region: "us-east1".to_owned(),
                endpoint: "https://storage.googleapis.com".to_owned(),
            },
            test_gc_credentials(),
        )
        .unwrap();
        bucket.set_listobjects_v1();
        bucket
    }

    /// Local MinIO test bucket; MinIO requires path-style addressing.
    fn test_minio_bucket() -> Box<Bucket> {
        Bucket::new(
            "rust-s3",
            Region::Custom {
                region: "us-east-1".to_owned(),
                endpoint: "http://localhost:9000".to_owned(),
            },
            test_minio_credentials(),
        )
        .unwrap()
        .with_path_style()
    }

    /// DigitalOcean Spaces test bucket (fra1 region).
    #[allow(dead_code)]
    fn test_digital_ocean_bucket() -> Box<Bucket> {
        Bucket::new("rust-s3", Region::DoFra1, test_digital_ocean_credentials()).unwrap()
    }

    /// Cloudflare R2 test bucket, addressed by account id.
    fn test_r2_bucket() -> Box<Bucket> {
        Bucket::new(
            "rust-s3",
            Region::R2 {
                account_id: "f048f3132be36fa1aaa8611992002b3f".to_string(),
            },
            test_r2_credentials(),
        )
        .unwrap()
    }
3044
3045    fn object(size: u32) -> Vec<u8> {
3046        (0..size).map(|_| 33).collect()
3047    }
3048
    /// End-to-end round trip against a live bucket: put an object, get it
    /// back, check existence, fetch a byte range, optionally HEAD it, and
    /// finally delete it. `head` is false for providers whose HEAD response
    /// differs from AWS.
    #[maybe_async::maybe_async]
    async fn put_head_get_delete_object(bucket: Bucket, head: bool) {
        let s3_path = "/+test.file";
        let non_existant_path = "/+non_existant.file";
        let test: Vec<u8> = object(3072);

        let response_data = bucket.put_object(s3_path, &test).await.unwrap();
        assert_eq!(response_data.status_code(), 200);

        // let attributes = bucket
        //     .get_object_attributes(s3_path, "904662384344", None)
        //     .await
        //     .unwrap();

        let response_data = bucket.get_object(s3_path).await.unwrap();
        assert_eq!(response_data.status_code(), 200);
        assert_eq!(test, response_data.as_slice());

        let exists = bucket.object_exists(s3_path).await.unwrap();
        assert!(exists);

        let not_exists = bucket.object_exists(non_existant_path).await.unwrap();
        assert!(!not_exists);

        // Byte range 100..=1000 — a 206 Partial Content response is expected.
        let response_data = bucket
            .get_object_range(s3_path, 100, Some(1000))
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 206);
        assert_eq!(test[100..1001].to_vec(), response_data.as_slice());
        if head {
            let (_head_object_result, code) = bucket.head_object(s3_path).await.unwrap();
            // println!("{:?}", head_object_result);
            assert_eq!(code, 200);
        }

        // println!("{:?}", head_object_result);
        let response_data = bucket.delete_object(s3_path).await.unwrap();
        assert_eq!(response_data.status_code(), 204);
    }
3089
    /// Integration test (ignored by default): verifies object tagging
    /// against live AWS — put object, read (empty) tags, put tags, re-read.
    #[ignore]
    #[cfg(feature = "tags")]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn test_tagging_aws() {
        let bucket = test_aws_bucket();
        let _target_tags = vec![
            Tag {
                key: "Tag1".to_string(),
                value: "Value1".to_string(),
            },
            Tag {
                key: "Tag2".to_string(),
                value: "Value2".to_string(),
            },
        ];
        let empty_tags: Vec<Tag> = Vec::new();
        let response_data = bucket
            .put_object("tagging_test", b"Gimme tags")
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        // A freshly uploaded object has no tags yet.
        let (tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        assert_eq!(tags, empty_tags);
        let response_data = bucket
            .put_object_tagging("tagging_test", &[("Tag1", "Value1"), ("Tag2", "Value2")])
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        // This could be eventually consistent now, so the strict equality
        // assertion against the target tags is disabled.
        let (_tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        // assert_eq!(tags, target_tags)
        let _response_data = bucket.delete_object("tagging_test").await.unwrap();
    }
3130
    /// Integration test (ignored by default): same tagging round trip as
    /// `test_tagging_aws`, but against a local MinIO instance.
    #[ignore]
    #[cfg(feature = "tags")]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn test_tagging_minio() {
        let bucket = test_minio_bucket();
        let _target_tags = vec![
            Tag {
                key: "Tag1".to_string(),
                value: "Value1".to_string(),
            },
            Tag {
                key: "Tag2".to_string(),
                value: "Value2".to_string(),
            },
        ];
        let empty_tags: Vec<Tag> = Vec::new();
        let response_data = bucket
            .put_object("tagging_test", b"Gimme tags")
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        // A freshly uploaded object has no tags yet.
        let (tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        assert_eq!(tags, empty_tags);
        let response_data = bucket
            .put_object_tagging("tagging_test", &[("Tag1", "Value1"), ("Tag2", "Value2")])
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        // This could be eventually consistent now, so the strict equality
        // assertion against the target tags is disabled.
        let (_tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        // assert_eq!(tags, target_tags)
        let _response_data = bucket.delete_object("tagging_test").await.unwrap();
    }
3171
    /// Multipart-upload round trip against live AWS (ignored by default).
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_big_aws_put_head_get_delete_object() {
        streaming_test_put_get_delete_big_object(*test_aws_bucket()).await;
    }

    /// Multipart-upload round trip against GCS (ignored by default);
    /// skipped under tokio-rustls-tls.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(
            all(
                not(feature = "sync"),
                not(feature = "tokio-rustls-tls"),
                feature = "with-tokio"
            ),
            tokio::test
        ),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_big_gc_put_head_get_delete_object() {
        streaming_test_put_get_delete_big_object(*test_gc_bucket()).await;
    }

    /// Multipart-upload round trip against local MinIO (ignored by default).
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_big_minio_put_head_get_delete_object() {
        streaming_test_put_get_delete_big_object(*test_minio_bucket()).await;
    }

    /// Multipart-upload round trip against Cloudflare R2 (ignored by default).
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_big_r2_put_head_get_delete_object() {
        streaming_test_put_get_delete_big_object(*test_r2_bucket()).await;
    }
3230
    // Test multi-part upload: writes a 20 MB file locally, streams it up
    // (exceeding CHUNK_SIZE so the multipart path is exercised), streams it
    // back down into a writer and a byte stream, then cleans up both sides.
    #[maybe_async::maybe_async]
    async fn streaming_test_put_get_delete_big_object(bucket: Bucket) {
        // Pick the File/Write/Stream types matching the active runtime.
        #[cfg(feature = "with-async-std")]
        use async_std::fs::File;
        #[cfg(feature = "with-async-std")]
        use async_std::io::WriteExt;
        #[cfg(feature = "with-async-std")]
        use async_std::stream::StreamExt;
        #[cfg(feature = "with-tokio")]
        use futures_util::StreamExt;
        #[cfg(not(any(feature = "with-tokio", feature = "with-async-std")))]
        use std::fs::File;
        #[cfg(not(any(feature = "with-tokio", feature = "with-async-std")))]
        use std::io::Write;
        #[cfg(feature = "with-tokio")]
        use tokio::fs::File;
        #[cfg(feature = "with-tokio")]
        use tokio::io::AsyncWriteExt;

        init();
        let remote_path = "+stream_test_big";
        let local_path = "+stream_test_big";
        // NOTE(review): this removes the file named by `remote_path` —
        // presumably `local_path` was intended; harmless here since both
        // hold the same string, but worth confirming.
        std::fs::remove_file(remote_path).unwrap_or(());
        let content: Vec<u8> = object(20_000_000);

        let mut file = File::create(local_path).await.unwrap();
        file.write_all(&content).await.unwrap();
        file.flush().await.unwrap();
        let mut reader = File::open(local_path).await.unwrap();

        let response = bucket
            .put_object_stream(&mut reader, remote_path)
            .await
            .unwrap();
        // The sync variant returns a bare status code instead of a response.
        #[cfg(not(feature = "sync"))]
        assert_eq!(response.status_code(), 200);
        #[cfg(feature = "sync")]
        assert_eq!(response, 200);
        let mut writer = Vec::new();
        let code = bucket
            .get_object_to_writer(remote_path, &mut writer)
            .await
            .unwrap();
        assert_eq!(code, 200);
        // assert_eq!(content, writer);
        // Compare lengths only; a full byte-for-byte comparison of 20 MB is
        // skipped.
        assert_eq!(content.len(), writer.len());
        assert_eq!(content.len(), 20_000_000);

        // Also exercise the chunked streaming download path.
        #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
        {
            let mut response_data_stream = bucket.get_object_stream(remote_path).await.unwrap();

            let mut bytes = vec![];

            while let Some(chunk) = response_data_stream.bytes().next().await {
                bytes.push(chunk)
            }
            assert_ne!(bytes.len(), 0);
        }

        let response_data = bucket.delete_object(remote_path).await.unwrap();
        assert_eq!(response_data.status_code(), 204);
        std::fs::remove_file(local_path).unwrap_or(());
    }
3296
3297    #[ignore]
3298    #[maybe_async::test(
3299        feature = "sync",
3300        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3301        async(
3302            all(not(feature = "sync"), feature = "with-async-std"),
3303            async_std::test
3304        )
3305    )]
3306    async fn streaming_aws_put_head_get_delete_object() {
3307        streaming_test_put_get_delete_small_object(test_aws_bucket()).await;
3308    }
3309
    /// Streaming put/get/delete round-trip against Google Cloud Storage.
    /// Note the extra `not(feature = "tokio-rustls-tls")` guard: the tokio
    /// variant of this test is deliberately not compiled under rustls.
    /// Ignored by default (run with `cargo test -- --ignored`).
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(
            all(
                not(feature = "sync"),
                not(feature = "tokio-rustls-tls"),
                feature = "with-tokio"
            ),
            tokio::test
        ),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_gc_put_head_get_delete_object() {
        streaming_test_put_get_delete_small_object(test_gc_bucket()).await;
    }
3329
    /// Streaming put/get/delete round-trip against Cloudflare R2.
    /// Ignored by default (run with `cargo test -- --ignored`).
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_r2_put_head_get_delete_object() {
        streaming_test_put_get_delete_small_object(test_r2_bucket()).await;
    }
3342
    /// Streaming put/get/delete round-trip against a MinIO instance.
    /// Ignored by default (run with `cargo test -- --ignored`).
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_minio_put_head_get_delete_object() {
        streaming_test_put_get_delete_small_object(test_minio_bucket()).await;
    }
3355
3356    #[maybe_async::maybe_async]
3357    async fn streaming_test_put_get_delete_small_object(bucket: Box<Bucket>) {
3358        init();
3359        let remote_path = "+stream_test_small";
3360        let content: Vec<u8> = object(1000);
3361        #[cfg(feature = "with-tokio")]
3362        let mut reader = std::io::Cursor::new(&content);
3363        #[cfg(feature = "with-async-std")]
3364        let mut reader = async_std::io::Cursor::new(&content);
3365        #[cfg(feature = "sync")]
3366        let mut reader = std::io::Cursor::new(&content);
3367
3368        let response = bucket
3369            .put_object_stream(&mut reader, remote_path)
3370            .await
3371            .unwrap();
3372        #[cfg(not(feature = "sync"))]
3373        assert_eq!(response.status_code(), 200);
3374        #[cfg(feature = "sync")]
3375        assert_eq!(response, 200);
3376        let mut writer = Vec::new();
3377        let code = bucket
3378            .get_object_to_writer(remote_path, &mut writer)
3379            .await
3380            .unwrap();
3381        assert_eq!(code, 200);
3382        assert_eq!(content, writer);
3383
3384        let response_data = bucket.delete_object(remote_path).await.unwrap();
3385        assert_eq!(response_data.status_code(), 204);
3386    }
3387
3388    #[cfg(feature = "blocking")]
3389    fn put_head_get_list_delete_object_blocking(bucket: Bucket) {
3390        let s3_path = "/test_blocking.file";
3391        let s3_path_2 = "/test_blocking.file2";
3392        let s3_path_3 = "/test_blocking.file3";
3393        let test: Vec<u8> = object(3072);
3394
3395        // Test PutObject
3396        let response_data = bucket.put_object_blocking(s3_path, &test).unwrap();
3397        assert_eq!(response_data.status_code(), 200);
3398
3399        // Test GetObject
3400        let response_data = bucket.get_object_blocking(s3_path).unwrap();
3401        assert_eq!(response_data.status_code(), 200);
3402        assert_eq!(test, response_data.as_slice());
3403
3404        // Test GetObject with a range
3405        let response_data = bucket
3406            .get_object_range_blocking(s3_path, 100, Some(1000))
3407            .unwrap();
3408        assert_eq!(response_data.status_code(), 206);
3409        assert_eq!(test[100..1001].to_vec(), response_data.as_slice());
3410
3411        // Test HeadObject
3412        let (head_object_result, code) = bucket.head_object_blocking(s3_path).unwrap();
3413        assert_eq!(code, 200);
3414        assert_eq!(
3415            head_object_result.content_type.unwrap(),
3416            "application/octet-stream".to_owned()
3417        );
3418        // println!("{:?}", head_object_result);
3419
3420        // Put some additional objects, so that we can test ListObjects
3421        let response_data = bucket.put_object_blocking(s3_path_2, &test).unwrap();
3422        assert_eq!(response_data.status_code(), 200);
3423        let response_data = bucket.put_object_blocking(s3_path_3, &test).unwrap();
3424        assert_eq!(response_data.status_code(), 200);
3425
3426        // Test ListObjects, with continuation
3427        let (result, code) = bucket
3428            .list_page_blocking(
3429                "test_blocking.".to_string(),
3430                Some("/".to_string()),
3431                None,
3432                None,
3433                Some(2),
3434            )
3435            .unwrap();
3436        assert_eq!(code, 200);
3437        assert_eq!(result.contents.len(), 2);
3438        assert_eq!(result.contents[0].key, s3_path[1..]);
3439        assert_eq!(result.contents[1].key, s3_path_2[1..]);
3440
3441        let cont_token = result.next_continuation_token.unwrap();
3442
3443        let (result, code) = bucket
3444            .list_page_blocking(
3445                "test_blocking.".to_string(),
3446                Some("/".to_string()),
3447                Some(cont_token),
3448                None,
3449                Some(2),
3450            )
3451            .unwrap();
3452        assert_eq!(code, 200);
3453        assert_eq!(result.contents.len(), 1);
3454        assert_eq!(result.contents[0].key, s3_path_3[1..]);
3455        assert!(result.next_continuation_token.is_none());
3456
3457        // cleanup (and test Delete)
3458        let response_data = bucket.delete_object_blocking(s3_path).unwrap();
3459        assert_eq!(code, 200);
3460        let response_data = bucket.delete_object_blocking(s3_path_2).unwrap();
3461        assert_eq!(code, 200);
3462        let response_data = bucket.delete_object_blocking(s3_path_3).unwrap();
3463        assert_eq!(code, 200);
3464    }
3465
    /// Blocking-API round-trip against AWS. Compiled only when a runtime
    /// feature AND `blocking` are enabled; ignored by default (presumably
    /// needs live credentials — run with `cargo test -- --ignored`).
    #[ignore]
    #[cfg(all(
        any(feature = "with-tokio", feature = "with-async-std"),
        feature = "blocking"
    ))]
    #[test]
    fn aws_put_head_get_delete_object_blocking() {
        put_head_get_list_delete_object_blocking(*test_aws_bucket())
    }
3475
    /// Blocking-API round-trip against Google Cloud Storage. Compiled only
    /// when a runtime feature AND `blocking` are enabled; ignored by default.
    #[ignore]
    #[cfg(all(
        any(feature = "with-tokio", feature = "with-async-std"),
        feature = "blocking"
    ))]
    #[test]
    fn gc_put_head_get_delete_object_blocking() {
        put_head_get_list_delete_object_blocking(*test_gc_bucket())
    }
3485
    /// Blocking-API round-trip against Wasabi. Compiled only when a runtime
    /// feature AND `blocking` are enabled; ignored by default.
    #[ignore]
    #[cfg(all(
        any(feature = "with-tokio", feature = "with-async-std"),
        feature = "blocking"
    ))]
    #[test]
    fn wasabi_put_head_get_delete_object_blocking() {
        put_head_get_list_delete_object_blocking(*test_wasabi_bucket())
    }
3495
    /// Blocking-API round-trip against MinIO. Compiled only when a runtime
    /// feature AND `blocking` are enabled; ignored by default.
    #[ignore]
    #[cfg(all(
        any(feature = "with-tokio", feature = "with-async-std"),
        feature = "blocking"
    ))]
    #[test]
    fn minio_put_head_get_delete_object_blocking() {
        put_head_get_list_delete_object_blocking(*test_minio_bucket())
    }
3505
    /// Blocking-API round-trip against DigitalOcean Spaces. Compiled only
    /// when a runtime feature AND `blocking` are enabled; ignored by default.
    #[ignore]
    #[cfg(all(
        any(feature = "with-tokio", feature = "with-async-std"),
        feature = "blocking"
    ))]
    #[test]
    fn digital_ocean_put_head_get_delete_object_blocking() {
        put_head_get_list_delete_object_blocking(*test_digital_ocean_bucket())
    }
3515
    /// Async put/head/get/delete round-trip against AWS. The `true` flag is
    /// forwarded to `put_head_get_delete_object` (helper defined elsewhere in
    /// this module — its exact semantics are not visible here; the r2 test
    /// passes `false`). Ignored by default.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn aws_put_head_get_delete_object() {
        put_head_get_delete_object(*test_aws_bucket(), true).await;
    }
3528
    /// Async put/head/get/delete round-trip against Google Cloud Storage.
    /// The tokio variant is excluded under `tokio-rustls-tls` (see the
    /// `not(any(…, "tokio-rustls-tls"))` guard). Ignored by default.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(
            all(
                not(any(feature = "sync", feature = "tokio-rustls-tls")),
                feature = "with-tokio"
            ),
            tokio::test
        ),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn gc_test_put_head_get_delete_object() {
        put_head_get_delete_object(*test_gc_bucket(), true).await;
    }
3547
    /// Async put/head/get/delete round-trip against Wasabi. Ignored by
    /// default (run with `cargo test -- --ignored`).
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn wasabi_test_put_head_get_delete_object() {
        put_head_get_delete_object(*test_wasabi_bucket(), true).await;
    }
3560
    /// Async put/head/get/delete round-trip against MinIO. Ignored by
    /// default (run with `cargo test -- --ignored`).
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn minio_test_put_head_get_delete_object() {
        put_head_get_delete_object(*test_minio_bucket(), true).await;
    }
3573
3574    // Keeps failing on tokio-rustls-tls
3575    // #[ignore]
3576    // #[maybe_async::test(
3577    //     feature = "sync",
3578    //     async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3579    //     async(
3580    //         all(not(feature = "sync"), feature = "with-async-std"),
3581    //         async_std::test
3582    //     )
3583    // )]
3584    // async fn digital_ocean_test_put_head_get_delete_object() {
3585    //     put_head_get_delete_object(test_digital_ocean_bucket(), true).await;
3586    // }
3587
    /// Async put/head/get/delete round-trip against Cloudflare R2. Note the
    /// `false` flag (other providers pass `true`) — its meaning is defined by
    /// `put_head_get_delete_object`, which is not visible in this chunk.
    /// Ignored by default.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn r2_test_put_head_get_delete_object() {
        put_head_get_delete_object(*test_r2_bucket(), false).await;
    }
3600
3601    #[maybe_async::test(
3602        feature = "sync",
3603        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3604        async(
3605            all(not(feature = "sync"), feature = "with-async-std"),
3606            async_std::test
3607        )
3608    )]
3609    async fn test_presign_put() {
3610        let s3_path = "/test/test.file";
3611        let bucket = test_minio_bucket();
3612
3613        let mut custom_headers = HeaderMap::new();
3614        custom_headers.insert(
3615            HeaderName::from_static("custom_header"),
3616            "custom_value".parse().unwrap(),
3617        );
3618
3619        let url = bucket
3620            .presign_put(s3_path, 86400, Some(custom_headers), None)
3621            .await
3622            .unwrap();
3623
3624        assert!(url.contains("custom_header%3Bhost"));
3625        assert!(url.contains("/test/test.file"))
3626    }
3627
3628    #[maybe_async::test(
3629        feature = "sync",
3630        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3631        async(
3632            all(not(feature = "sync"), feature = "with-async-std"),
3633            async_std::test
3634        )
3635    )]
3636    async fn test_presign_post() {
3637        use std::borrow::Cow;
3638
3639        let bucket = test_minio_bucket();
3640
3641        // Policy from sample
3642        let policy = PostPolicy::new(86400)
3643            .condition(
3644                PostPolicyField::Key,
3645                PostPolicyValue::StartsWith(Cow::from("user/user1/")),
3646            )
3647            .unwrap();
3648
3649        let data = bucket.presign_post(policy).await.unwrap();
3650
3651        assert_eq!(data.url, "http://localhost:9000/rust-s3");
3652        assert_eq!(data.fields.len(), 6);
3653        assert_eq!(data.dynamic_fields.len(), 1);
3654    }
3655
3656    #[maybe_async::test(
3657        feature = "sync",
3658        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3659        async(
3660            all(not(feature = "sync"), feature = "with-async-std"),
3661            async_std::test
3662        )
3663    )]
3664    async fn test_presign_get() {
3665        let s3_path = "/test/test.file";
3666        let bucket = test_minio_bucket();
3667
3668        let url = bucket.presign_get(s3_path, 86400, None).await.unwrap();
3669        assert!(url.contains("/test/test.file?"))
3670    }
3671
3672    #[maybe_async::test(
3673        feature = "sync",
3674        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3675        async(
3676            all(not(feature = "sync"), feature = "with-async-std"),
3677            async_std::test
3678        )
3679    )]
3680    async fn test_presign_delete() {
3681        let s3_path = "/test/test.file";
3682        let bucket = test_minio_bucket();
3683
3684        let url = bucket.presign_delete(s3_path, 86400).await.unwrap();
3685        assert!(url.contains("/test/test.file?"))
3686    }
3687
3688    #[maybe_async::test(
3689        feature = "sync",
3690        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3691        async(
3692            all(not(feature = "sync"), feature = "with-async-std"),
3693            async_std::test
3694        )
3695    )]
3696    async fn test_presign_url_standard_ports() {
3697        // Test that presigned URLs preserve standard ports in the host header
3698        // This is crucial for signature validation
3699
3700        // Test with HTTP standard port 80
3701        let region_http_80 = Region::Custom {
3702            region: "eu-central-1".to_owned(),
3703            endpoint: "http://minio:80".to_owned(),
3704        };
3705        let credentials = Credentials::new(
3706            Some("test_access_key"),
3707            Some("test_secret_key"),
3708            None,
3709            None,
3710            None,
3711        )
3712        .unwrap();
3713        let bucket_http_80 = Bucket::new("test-bucket", region_http_80, credentials.clone())
3714            .unwrap()
3715            .with_path_style();
3716
3717        let presigned_url_80 = bucket_http_80
3718            .presign_get("/test.file", 3600, None)
3719            .await
3720            .unwrap();
3721        println!("Presigned URL with port 80: {}", presigned_url_80);
3722
3723        // Port 80 MUST be preserved in the URL for signature validation
3724        assert!(
3725            presigned_url_80.starts_with("http://minio:80/"),
3726            "URL must preserve port 80, got: {}",
3727            presigned_url_80
3728        );
3729
3730        // Test with HTTPS standard port 443
3731        let region_https_443 = Region::Custom {
3732            region: "eu-central-1".to_owned(),
3733            endpoint: "https://minio:443".to_owned(),
3734        };
3735        let bucket_https_443 = Bucket::new("test-bucket", region_https_443, credentials.clone())
3736            .unwrap()
3737            .with_path_style();
3738
3739        let presigned_url_443 = bucket_https_443
3740            .presign_get("/test.file", 3600, None)
3741            .await
3742            .unwrap();
3743        println!("Presigned URL with port 443: {}", presigned_url_443);
3744
3745        // Port 443 MUST be preserved in the URL for signature validation
3746        assert!(
3747            presigned_url_443.starts_with("https://minio:443/"),
3748            "URL must preserve port 443, got: {}",
3749            presigned_url_443
3750        );
3751
3752        // Test with non-standard port (should always include port)
3753        let region_http_9000 = Region::Custom {
3754            region: "eu-central-1".to_owned(),
3755            endpoint: "http://minio:9000".to_owned(),
3756        };
3757        let bucket_http_9000 = Bucket::new("test-bucket", region_http_9000, credentials)
3758            .unwrap()
3759            .with_path_style();
3760
3761        let presigned_url_9000 = bucket_http_9000
3762            .presign_get("/test.file", 3600, None)
3763            .await
3764            .unwrap();
3765        assert!(
3766            presigned_url_9000.contains("minio:9000"),
3767            "Non-standard port should be preserved in URL"
3768        );
3769    }
3770
3771    #[maybe_async::test(
3772        feature = "sync",
3773        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3774        async(
3775            all(not(feature = "sync"), feature = "with-async-std"),
3776            async_std::test
3777        )
3778    )]
3779    #[ignore]
3780    async fn test_bucket_create_delete_default_region() {
3781        let config = BucketConfiguration::default();
3782        let response = Bucket::create(
3783            &uuid::Uuid::new_v4().to_string(),
3784            "us-east-1".parse().unwrap(),
3785            test_aws_credentials(),
3786            config,
3787        )
3788        .await
3789        .unwrap();
3790
3791        assert_eq!(&response.response_text, "");
3792
3793        assert_eq!(response.response_code, 200);
3794
3795        let response_code = response.bucket.delete().await.unwrap();
3796        assert!(response_code < 300);
3797    }
3798
3799    #[ignore]
3800    #[maybe_async::test(
3801        feature = "sync",
3802        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3803        async(
3804            all(not(feature = "sync"), feature = "with-async-std"),
3805            async_std::test
3806        )
3807    )]
3808    async fn test_bucket_create_delete_non_default_region() {
3809        let config = BucketConfiguration::default();
3810        let response = Bucket::create(
3811            &uuid::Uuid::new_v4().to_string(),
3812            "eu-central-1".parse().unwrap(),
3813            test_aws_credentials(),
3814            config,
3815        )
3816        .await
3817        .unwrap();
3818
3819        assert_eq!(&response.response_text, "");
3820
3821        assert_eq!(response.response_code, 200);
3822
3823        let response_code = response.bucket.delete().await.unwrap();
3824        assert!(response_code < 300);
3825    }
3826
3827    #[ignore]
3828    #[maybe_async::test(
3829        feature = "sync",
3830        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3831        async(
3832            all(not(feature = "sync"), feature = "with-async-std"),
3833            async_std::test
3834        )
3835    )]
3836    async fn test_bucket_create_delete_non_default_region_public() {
3837        let config = BucketConfiguration::public();
3838        let response = Bucket::create(
3839            &uuid::Uuid::new_v4().to_string(),
3840            "eu-central-1".parse().unwrap(),
3841            test_aws_credentials(),
3842            config,
3843        )
3844        .await
3845        .unwrap();
3846
3847        assert_eq!(&response.response_text, "");
3848
3849        assert_eq!(response.response_code, 200);
3850
3851        let response_code = response.bucket.delete().await.unwrap();
3852        assert!(response_code < 300);
3853    }
3854
3855    #[test]
3856    fn test_tag_has_key_and_value_functions() {
3857        let key = "key".to_owned();
3858        let value = "value".to_owned();
3859        let tag = Tag { key, value };
3860        assert_eq!["key", tag.key()];
3861        assert_eq!["value", tag.value()];
3862    }
3863
3864    #[test]
3865    #[ignore]
3866    fn test_builder_composition() {
3867        use std::time::Duration;
3868
3869        let bucket = Bucket::new(
3870            "test-bucket",
3871            "eu-central-1".parse().unwrap(),
3872            test_aws_credentials(),
3873        )
3874        .unwrap()
3875        .with_request_timeout(Duration::from_secs(10))
3876        .unwrap();
3877
3878        assert_eq!(bucket.request_timeout(), Some(Duration::from_secs(10)));
3879    }
3880
3881    #[maybe_async::test(
3882        feature = "sync",
3883        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3884        async(
3885            all(not(feature = "sync"), feature = "with-async-std"),
3886            async_std::test
3887        )
3888    )]
3889    #[ignore]
3890    async fn test_bucket_cors() {
3891        let bucket = test_aws_bucket();
3892        let rule = CorsRule::new(
3893            None,
3894            vec!["GET".to_string()],
3895            vec!["*".to_string()],
3896            None,
3897            None,
3898            None,
3899        );
3900        let expected_bucket_owner = "904662384344";
3901        let cors_config = CorsConfiguration::new(vec![rule]);
3902        let response = bucket
3903            .put_bucket_cors(expected_bucket_owner, &cors_config)
3904            .await
3905            .unwrap();
3906        assert_eq!(response.status_code(), 200);
3907
3908        let cors_response = bucket.get_bucket_cors(expected_bucket_owner).await.unwrap();
3909        assert_eq!(cors_response, cors_config);
3910
3911        let response = bucket
3912            .delete_bucket_cors(expected_bucket_owner)
3913            .await
3914            .unwrap();
3915        assert_eq!(response.status_code(), 204);
3916    }
3917
    /// Verifies that `Bucket::exists()` and `list()` still work after the
    /// "dangerous" TLS settings (accept invalid certs / hostnames) have been
    /// applied. Compiled only for the TLS-backed tokio clients; ignored by
    /// default (needs a live test bucket).
    #[ignore]
    #[cfg(any(feature = "tokio-native-tls", feature = "tokio-rustls-tls"))]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn test_bucket_exists_with_dangerous_config() {
        init();

        // This test verifies that Bucket::exists() honors the dangerous SSL config
        // which allows connections with invalid SSL certificates

        // Create a bucket with dangerous config enabled
        // Note: This test requires a test environment with self-signed or invalid certs
        // For CI, we'll test with a regular bucket but verify the config is preserved

        let credentials = test_aws_credentials();
        let region = "eu-central-1".parse().unwrap();
        let bucket_name = "rust-s3-test";

        // Create bucket with dangerous config
        let bucket = Bucket::new(bucket_name, region, credentials)
            .unwrap()
            .with_path_style();

        // Set dangerous config (allow invalid certs, allow invalid hostnames)
        // NOTE(review): `set_dangereous_config` is misspelled in the public
        // API (defined elsewhere); the call must match the actual method name.
        let bucket = bucket.set_dangereous_config(true, true).unwrap();

        // Test that exists() works with the dangerous config
        // This should not panic or fail due to SSL certificate issues
        let exists_result = bucket.exists().await;

        // The bucket should exist (assuming test bucket is set up)
        assert!(
            exists_result.is_ok(),
            "Bucket::exists() failed with dangerous config"
        );
        let exists = exists_result.unwrap();
        assert!(exists, "Test bucket should exist");

        // Verify that the dangerous config is preserved in the cloned bucket
        // by checking if we can perform other operations
        let list_result = bucket.list("".to_string(), Some("/".to_string())).await;
        assert!(
            list_result.is_ok(),
            "List operation should work with dangerous config"
        );
    }
3970
3971    #[ignore]
3972    #[maybe_async::test(
3973        feature = "sync",
3974        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3975        async(
3976            all(not(feature = "sync"), feature = "with-async-std"),
3977            async_std::test
3978        )
3979    )]
3980    async fn test_bucket_exists_without_dangerous_config() {
3981        init();
3982
3983        // This test verifies normal behavior without dangerous config
3984        let credentials = test_aws_credentials();
3985        let region = "eu-central-1".parse().unwrap();
3986        let bucket_name = "rust-s3-test";
3987
3988        // Create bucket without dangerous config
3989        let bucket = Bucket::new(bucket_name, region, credentials)
3990            .unwrap()
3991            .with_path_style();
3992
3993        // Test that exists() works normally
3994        let exists_result = bucket.exists().await;
3995        assert!(
3996            exists_result.is_ok(),
3997            "Bucket::exists() should work without dangerous config"
3998        );
3999        let exists = exists_result.unwrap();
4000        assert!(exists, "Test bucket should exist");
4001    }
4002}