1#[cfg(feature = "blocking")]
37use block_on_proc::block_on;
38#[cfg(feature = "tags")]
39use minidom::Element;
40use std::collections::HashMap;
41use std::time::Duration;
42
43use crate::bucket_ops::{BucketConfiguration, CreateBucketResponse};
44use crate::command::{Command, Multipart};
45use crate::creds::Credentials;
46use crate::region::Region;
47#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
48use crate::request::ResponseDataStream;
49#[cfg(feature = "with-tokio")]
50use crate::request::tokio_backend::ClientOptions;
51#[cfg(feature = "with-tokio")]
52use crate::request::tokio_backend::client;
53use crate::request::{Request as _, ResponseData};
54use std::str::FromStr;
55use std::sync::Arc;
56
57#[cfg(feature = "with-tokio")]
58use tokio::sync::RwLock;
59
60#[cfg(feature = "with-async-std")]
61use async_std::sync::RwLock;
62
63#[cfg(feature = "sync")]
64use std::sync::RwLock;
65
66pub type Query = HashMap<String, String>;
67
68#[cfg(feature = "with-async-std")]
69use crate::request::async_std_backend::SurfRequest as RequestImpl;
70#[cfg(feature = "with-tokio")]
71use crate::request::tokio_backend::ReqwestRequest as RequestImpl;
72
73#[cfg(feature = "with-async-std")]
74use async_std::io::Write as AsyncWrite;
75#[cfg(feature = "with-tokio")]
76use tokio::io::AsyncWrite;
77
78#[cfg(feature = "sync")]
79use crate::request::blocking::AttoRequest as RequestImpl;
80use std::io::Read;
81
82#[cfg(feature = "with-tokio")]
83use tokio::io::AsyncRead;
84
85#[cfg(feature = "with-async-std")]
86use async_std::io::Read as AsyncRead;
87
88use crate::PostPolicy;
89use crate::error::S3Error;
90use crate::post_policy::PresignedPost;
91use crate::serde_types::{
92 BucketLifecycleConfiguration, BucketLocationResult, CompleteMultipartUploadData,
93 CorsConfiguration, DeleteObjectsRequest, DeleteObjectsResult, GetObjectAttributesOutput,
94 HeadObjectResult, InitiateMultipartUploadResponse, ListBucketResult,
95 ListMultipartUploadsResult, ObjectIdentifier, Part,
96};
97#[allow(unused_imports)]
98use crate::utils::{PutStreamResponse, error_from_response_data};
99use http::HeaderMap;
100use http::header::HeaderName;
101#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
102use sysinfo::{MemoryRefreshKind, System};
103
/// Size of one streamed-upload chunk (8 MiB), also the multipart part size.
pub const CHUNK_SIZE: usize = 8_388_608;

/// Default per-request timeout applied to newly constructed [`Bucket`]s.
const DEFAULT_REQUEST_TIMEOUT: Option<Duration> = Some(Duration::from_secs(60));
107
/// A single S3 object tag: a key/value pair attached to an object.
#[derive(Debug, PartialEq, Eq)]
pub struct Tag {
    key: String,
    value: String,
}

impl Tag {
    /// Returns an owned copy of the tag's key.
    pub fn key(&self) -> String {
        self.key.clone()
    }

    /// Returns an owned copy of the tag's value.
    pub fn value(&self) -> String {
        self.value.clone()
    }
}
123
/// Handle to an S3 bucket: addressing information (name, region) plus the
/// credential and client state used to issue requests against it.
///
/// Cloning shares the credentials (`Arc<RwLock<..>>`) between clones.
#[derive(Clone, Debug)]
pub struct Bucket {
    /// Bucket name.
    pub name: String,
    /// Region (and endpoint) requests are addressed to.
    pub region: Region,
    // Shared, refreshable credentials; see `credentials_refresh`.
    credentials: Arc<RwLock<Credentials>>,
    /// Extra headers appended to every request made through this handle.
    pub extra_headers: HeaderMap,
    /// Extra query parameters appended to every request.
    pub extra_query: Query,
    /// Per-request timeout; `None` disables it.
    pub request_timeout: Option<Duration>,
    // When true, use path-style addressing (`host/bucket/key`) instead of
    // virtual-host style (`bucket.host/key`).
    path_style: bool,
    // When true, listings use ListObjectsV2; otherwise the legacy V1 API.
    listobjects_v2: bool,
    #[cfg(feature = "with-tokio")]
    // Reusable HTTP client built from `client_options`.
    http_client: reqwest::Client,
    #[cfg(feature = "with-tokio")]
    // Options the client was built with; kept so derived buckets can rebuild
    // a client with modified settings (timeout, proxy, TLS).
    client_options: crate::request::tokio_backend::ClientOptions,
}
153
impl Bucket {
    /// Refreshes the shared credentials in place (async backends).
    ///
    /// # Errors
    ///
    /// Propagates any error raised by `Credentials::refresh`.
    #[maybe_async::async_impl]
    pub async fn credentials_refresh(&self) -> Result<(), S3Error> {
        Ok(self.credentials.write().await.refresh()?)
    }

    /// Refreshes the shared credentials in place (sync backend).
    ///
    /// # Errors
    ///
    /// Returns `S3Error::CredentialsWriteLock` when the lock is poisoned;
    /// otherwise propagates any error raised by `Credentials::refresh`.
    #[maybe_async::sync_impl]
    pub fn credentials_refresh(&self) -> Result<(), S3Error> {
        match self.credentials.write() {
            Ok(mut credentials) => Ok(credentials.refresh()?),
            Err(_) => Err(S3Error::CredentialsWriteLock),
        }
    }

    /// Returns a clone of the underlying `reqwest` HTTP client.
    #[cfg(feature = "with-tokio")]
    pub fn http_client(&self) -> reqwest::Client {
        self.http_client.clone()
    }
}
175
176fn validate_expiry(expiry_secs: u32) -> Result<(), S3Error> {
177 if 604800 < expiry_secs {
178 return Err(S3Error::MaxExpiry(expiry_secs));
179 }
180 Ok(())
181}
182
183#[cfg_attr(all(feature = "with-tokio", feature = "blocking"), block_on("tokio"))]
184#[cfg_attr(
185 all(feature = "with-async-std", feature = "blocking"),
186 block_on("async-std")
187)]
188impl Bucket {
189 #[maybe_async::maybe_async]
217 pub async fn presign_get<S: AsRef<str>>(
218 &self,
219 path: S,
220 expiry_secs: u32,
221 custom_queries: Option<HashMap<String, String>>,
222 ) -> Result<String, S3Error> {
223 validate_expiry(expiry_secs)?;
224 let request = RequestImpl::new(
225 self,
226 path.as_ref(),
227 Command::PresignGet {
228 expiry_secs,
229 custom_queries,
230 },
231 )
232 .await?;
233 request.presigned().await
234 }
235
    /// Signs `post_policy` against this bucket, producing the URL and form
    /// fields needed for a browser-based (HTTP POST form) upload.
    #[maybe_async::maybe_async]
    #[allow(clippy::needless_lifetimes)]
    pub async fn presign_post<'a>(
        &self,
        post_policy: PostPolicy<'a>,
    ) -> Result<PresignedPost, S3Error> {
        post_policy.sign(Box::new(self.clone())).await
    }
270
271 #[maybe_async::maybe_async]
299 pub async fn presign_put<S: AsRef<str>>(
300 &self,
301 path: S,
302 expiry_secs: u32,
303 custom_headers: Option<HeaderMap>,
304 custom_queries: Option<HashMap<String, String>>,
305 ) -> Result<String, S3Error> {
306 validate_expiry(expiry_secs)?;
307 let request = RequestImpl::new(
308 self,
309 path.as_ref(),
310 Command::PresignPut {
311 expiry_secs,
312 custom_headers,
313 custom_queries,
314 },
315 )
316 .await?;
317 request.presigned().await
318 }
319
320 #[maybe_async::maybe_async]
341 pub async fn presign_delete<S: AsRef<str>>(
342 &self,
343 path: S,
344 expiry_secs: u32,
345 ) -> Result<String, S3Error> {
346 validate_expiry(expiry_secs)?;
347 let request =
348 RequestImpl::new(self, path.as_ref(), Command::PresignDelete { expiry_secs }).await?;
349 request.presigned().await
350 }
351
352 #[maybe_async::maybe_async]
385 pub async fn create(
386 name: &str,
387 region: Region,
388 credentials: Credentials,
389 config: BucketConfiguration,
390 ) -> Result<CreateBucketResponse, S3Error> {
391 let mut config = config;
392
393 let skip_constraint = std::env::var("RUST_S3_SKIP_LOCATION_CONSTRAINT")
397 .unwrap_or_default()
398 .to_lowercase();
399
400 if skip_constraint != "true" && skip_constraint != "1" {
401 config.set_region(region.clone());
402 }
403
404 let command = Command::CreateBucket { config };
405 let bucket = Bucket::new(name, region, credentials)?;
406 let request = RequestImpl::new(&bucket, "", command).await?;
407 let response_data = request.response_data(false).await?;
408 let response_text = response_data.as_str()?;
409 Ok(CreateBucketResponse {
410 bucket,
411 response_text: response_text.to_string(),
412 response_code: response_data.status_code(),
413 })
414 }
415
    /// Lists all buckets visible to `credentials` in `region`.
    ///
    /// Uses a throw-away path-style handle with an empty bucket name, since
    /// `ListBuckets` is an account-level operation.
    #[maybe_async::maybe_async]
    pub async fn list_buckets(
        region: Region,
        credentials: Credentials,
    ) -> Result<crate::bucket_ops::ListBucketsResponse, S3Error> {
        let dummy_bucket = Bucket::new("", region, credentials)?.with_path_style();
        dummy_bucket._list_buckets().await
    }
457
458 #[maybe_async::maybe_async]
461 async fn _list_buckets(&self) -> Result<crate::bucket_ops::ListBucketsResponse, S3Error> {
462 let request = RequestImpl::new(self, "", Command::ListBuckets).await?;
463 let response = request.response_data(false).await?;
464
465 Ok(quick_xml::de::from_str::<
466 crate::bucket_ops::ListBucketsResponse,
467 >(response.as_str()?)?)
468 }
469
470 #[maybe_async::maybe_async]
502 pub async fn exists(&self) -> Result<bool, S3Error> {
503 let mut dummy_bucket = self.clone();
504 dummy_bucket.name = "".into();
505
506 let response = dummy_bucket._list_buckets().await?;
507
508 Ok(response
509 .bucket_names()
510 .collect::<std::collections::HashSet<String>>()
511 .contains(&self.name))
512 }
513
514 #[maybe_async::maybe_async]
547 pub async fn create_with_path_style(
548 name: &str,
549 region: Region,
550 credentials: Credentials,
551 config: BucketConfiguration,
552 ) -> Result<CreateBucketResponse, S3Error> {
553 let mut config = config;
554
555 let skip_constraint = std::env::var("RUST_S3_SKIP_LOCATION_CONSTRAINT")
559 .unwrap_or_default()
560 .to_lowercase();
561
562 if skip_constraint != "true" && skip_constraint != "1" {
563 config.set_region(region.clone());
564 }
565
566 let command = Command::CreateBucket { config };
567 let bucket = Bucket::new(name, region, credentials)?.with_path_style();
568 let request = RequestImpl::new(&bucket, "", command).await?;
569 let response_data = request.response_data(false).await?;
570 let response_text = response_data.to_string()?;
571
572 Ok(CreateBucketResponse {
573 bucket,
574 response_text,
575 response_code: response_data.status_code(),
576 })
577 }
578
579 #[maybe_async::maybe_async]
610 pub async fn delete(&self) -> Result<u16, S3Error> {
611 let command = Command::DeleteBucket;
612 let request = RequestImpl::new(self, "", command).await?;
613 let response_data = request.response_data(false).await?;
614 Ok(response_data.status_code())
615 }
616
    /// Instantiates a local `Bucket` handle; no network traffic happens here
    /// (use `create` to create the bucket remotely).
    ///
    /// Defaults: virtual-host addressing, ListObjectsV2, and
    /// `DEFAULT_REQUEST_TIMEOUT`.
    ///
    /// # Errors
    ///
    /// With the tokio backend, fails if the HTTP client cannot be built.
    pub fn new(
        name: &str,
        region: Region,
        credentials: Credentials,
    ) -> Result<Box<Bucket>, S3Error> {
        #[cfg(feature = "with-tokio")]
        let options = ClientOptions::default();

        Ok(Box::new(Bucket {
            name: name.into(),
            region,
            credentials: Arc::new(RwLock::new(credentials)),
            extra_headers: HeaderMap::new(),
            extra_query: HashMap::new(),
            request_timeout: DEFAULT_REQUEST_TIMEOUT,
            path_style: false,
            listobjects_v2: true,
            #[cfg(feature = "with-tokio")]
            http_client: client(&options)?,
            #[cfg(feature = "with-tokio")]
            client_options: options,
        }))
    }
654
    /// Instantiates a `Bucket` handle with anonymous (unsigned) credentials,
    /// for public buckets. No network traffic happens here.
    ///
    /// # Errors
    ///
    /// Fails if anonymous credentials cannot be constructed or (tokio) if
    /// the HTTP client cannot be built.
    pub fn new_public(name: &str, region: Region) -> Result<Bucket, S3Error> {
        #[cfg(feature = "with-tokio")]
        let options = ClientOptions::default();

        Ok(Bucket {
            name: name.into(),
            region,
            credentials: Arc::new(RwLock::new(Credentials::anonymous()?)),
            extra_headers: HeaderMap::new(),
            extra_query: HashMap::new(),
            request_timeout: DEFAULT_REQUEST_TIMEOUT,
            path_style: false,
            listobjects_v2: true,
            #[cfg(feature = "with-tokio")]
            http_client: client(&options)?,
            #[cfg(feature = "with-tokio")]
            client_options: options,
        })
    }
685
686 pub fn with_path_style(&self) -> Box<Bucket> {
687 Box::new(Bucket {
688 name: self.name.clone(),
689 region: self.region.clone(),
690 credentials: self.credentials.clone(),
691 extra_headers: self.extra_headers.clone(),
692 extra_query: self.extra_query.clone(),
693 request_timeout: self.request_timeout,
694 path_style: true,
695 listobjects_v2: self.listobjects_v2,
696 #[cfg(feature = "with-tokio")]
697 http_client: self.http_client(),
698 #[cfg(feature = "with-tokio")]
699 client_options: self.client_options.clone(),
700 })
701 }
702
703 pub fn with_extra_headers(&self, extra_headers: HeaderMap) -> Result<Bucket, S3Error> {
704 Ok(Bucket {
705 name: self.name.clone(),
706 region: self.region.clone(),
707 credentials: self.credentials.clone(),
708 extra_headers,
709 extra_query: self.extra_query.clone(),
710 request_timeout: self.request_timeout,
711 path_style: self.path_style,
712 listobjects_v2: self.listobjects_v2,
713 #[cfg(feature = "with-tokio")]
714 http_client: self.http_client(),
715 #[cfg(feature = "with-tokio")]
716 client_options: self.client_options.clone(),
717 })
718 }
719
720 pub fn with_extra_query(
721 &self,
722 extra_query: HashMap<String, String>,
723 ) -> Result<Bucket, S3Error> {
724 Ok(Bucket {
725 name: self.name.clone(),
726 region: self.region.clone(),
727 credentials: self.credentials.clone(),
728 extra_headers: self.extra_headers.clone(),
729 extra_query,
730 request_timeout: self.request_timeout,
731 path_style: self.path_style,
732 listobjects_v2: self.listobjects_v2,
733 #[cfg(feature = "with-tokio")]
734 http_client: self.http_client(),
735 #[cfg(feature = "with-tokio")]
736 client_options: self.client_options.clone(),
737 })
738 }
739
740 #[cfg(not(feature = "with-tokio"))]
741 pub fn with_request_timeout(&self, request_timeout: Duration) -> Result<Box<Bucket>, S3Error> {
742 Ok(Box::new(Bucket {
743 name: self.name.clone(),
744 region: self.region.clone(),
745 credentials: self.credentials.clone(),
746 extra_headers: self.extra_headers.clone(),
747 extra_query: self.extra_query.clone(),
748 request_timeout: Some(request_timeout),
749 path_style: self.path_style,
750 listobjects_v2: self.listobjects_v2,
751 }))
752 }
753
754 #[cfg(feature = "with-tokio")]
755 pub fn with_request_timeout(&self, request_timeout: Duration) -> Result<Box<Bucket>, S3Error> {
756 let options = ClientOptions {
757 request_timeout: Some(request_timeout),
758 ..Default::default()
759 };
760
761 Ok(Box::new(Bucket {
762 name: self.name.clone(),
763 region: self.region.clone(),
764 credentials: self.credentials.clone(),
765 extra_headers: self.extra_headers.clone(),
766 extra_query: self.extra_query.clone(),
767 request_timeout: Some(request_timeout),
768 path_style: self.path_style,
769 listobjects_v2: self.listobjects_v2,
770 #[cfg(feature = "with-tokio")]
771 http_client: client(&options)?,
772 #[cfg(feature = "with-tokio")]
773 client_options: options,
774 }))
775 }
776
777 pub fn with_listobjects_v1(&self) -> Bucket {
778 Bucket {
779 name: self.name.clone(),
780 region: self.region.clone(),
781 credentials: self.credentials.clone(),
782 extra_headers: self.extra_headers.clone(),
783 extra_query: self.extra_query.clone(),
784 request_timeout: self.request_timeout,
785 path_style: self.path_style,
786 listobjects_v2: false,
787 #[cfg(feature = "with-tokio")]
788 http_client: self.http_client(),
789 #[cfg(feature = "with-tokio")]
790 client_options: self.client_options.clone(),
791 }
792 }
793
794 #[cfg(any(feature = "tokio-native-tls", feature = "tokio-rustls-tls"))]
827 pub fn set_dangerous_config(
828 &self,
829 accept_invalid_certs: bool,
830 accept_invalid_hostnames: bool,
831 ) -> Result<Bucket, S3Error> {
832 let mut options = self.client_options.clone();
833 options.accept_invalid_certs = accept_invalid_certs;
834 options.accept_invalid_hostnames = accept_invalid_hostnames;
835
836 Ok(Bucket {
837 name: self.name.clone(),
838 region: self.region.clone(),
839 credentials: self.credentials.clone(),
840 extra_headers: self.extra_headers.clone(),
841 extra_query: self.extra_query.clone(),
842 request_timeout: self.request_timeout,
843 path_style: self.path_style,
844 listobjects_v2: self.listobjects_v2,
845 http_client: client(&options)?,
846 client_options: options,
847 })
848 }
849
    /// Misspelled forwarder kept for backwards compatibility; prefer
    /// [`Bucket::set_dangerous_config`].
    #[deprecated(
        since = "0.37.3",
        note = "use `set_dangerous_config`; this misspelled method remains for compatibility"
    )]
    #[cfg(any(feature = "tokio-native-tls", feature = "tokio-rustls-tls"))]
    pub fn set_dangereous_config(
        &self,
        accept_invalid_certs: bool,
        accept_invalid_hostnames: bool,
    ) -> Result<Bucket, S3Error> {
        self.set_dangerous_config(accept_invalid_certs, accept_invalid_hostnames)
    }
863
864 #[cfg(feature = "with-tokio")]
865 pub fn set_proxy(&self, proxy: reqwest::Proxy) -> Result<Bucket, S3Error> {
866 let mut options = self.client_options.clone();
867 options.proxy = Some(proxy);
868
869 Ok(Bucket {
870 name: self.name.clone(),
871 region: self.region.clone(),
872 credentials: self.credentials.clone(),
873 extra_headers: self.extra_headers.clone(),
874 extra_query: self.extra_query.clone(),
875 request_timeout: self.request_timeout,
876 path_style: self.path_style,
877 listobjects_v2: self.listobjects_v2,
878 http_client: client(&options)?,
879 client_options: options,
880 })
881 }
882
883 #[maybe_async::maybe_async]
911 pub async fn copy_object_internal<F: AsRef<str>, T: AsRef<str>>(
912 &self,
913 from: F,
914 to: T,
915 ) -> Result<u16, S3Error> {
916 let fq_from = {
917 let from = from.as_ref();
918 let from = from.strip_prefix('/').unwrap_or(from);
919 format!("{bucket}/{path}", bucket = self.name(), path = from)
920 };
921 self.copy_object(fq_from, to).await
922 }
923
924 #[maybe_async::maybe_async]
925 async fn copy_object<F: AsRef<str>, T: AsRef<str>>(
926 &self,
927 from: F,
928 to: T,
929 ) -> Result<u16, S3Error> {
930 let command = Command::CopyObject {
931 from: from.as_ref(),
932 };
933 let request = RequestImpl::new(self, to.as_ref(), command).await?;
934 let response_data = request.response_data(false).await?;
935 Ok(response_data.status_code())
936 }
937
    /// Downloads the object at `path`, buffering the full body in memory.
    #[maybe_async::maybe_async]
    pub async fn get_object<S: AsRef<str>>(&self, path: S) -> Result<ResponseData, S3Error> {
        let command = Command::GetObject;
        let request = RequestImpl::new(self, path.as_ref(), command).await?;
        request.response_data(false).await
    }
975
976 #[maybe_async::maybe_async]
977 pub async fn get_object_attributes<S: AsRef<str>>(
978 &self,
979 path: S,
980 expected_bucket_owner: &str,
981 version_id: Option<String>,
982 ) -> Result<GetObjectAttributesOutput, S3Error> {
983 let command = Command::GetObjectAttributes {
984 expected_bucket_owner: expected_bucket_owner.to_string(),
985 version_id,
986 };
987 let request = RequestImpl::new(self, path.as_ref(), command).await?;
988
989 let response = request.response_data(false).await?;
990
991 Ok(quick_xml::de::from_str::<GetObjectAttributesOutput>(
992 response.as_str()?,
993 )?)
994 }
995
    /// Checks whether an object exists at `path` via a HEAD request.
    ///
    /// NOTE(review): only a 404 status maps to `false`; any other status
    /// that reaches this point (e.g. 403) is reported as "exists" — confirm
    /// that is the intended semantics.
    #[maybe_async::maybe_async]
    pub async fn object_exists<S: AsRef<str>>(&self, path: S) -> Result<bool, S3Error> {
        let command = Command::HeadObject;
        let request = RequestImpl::new(self, path.as_ref(), command).await?;
        let status_code = request.response_status().await?;
        Ok(status_code != 404)
    }
1045
1046 #[maybe_async::maybe_async]
1047 pub async fn put_bucket_cors(
1048 &self,
1049 expected_bucket_owner: &str,
1050 cors_config: &CorsConfiguration,
1051 ) -> Result<ResponseData, S3Error> {
1052 let command = Command::PutBucketCors {
1053 expected_bucket_owner: expected_bucket_owner.to_string(),
1054 configuration: cors_config.clone(),
1055 };
1056 let request = RequestImpl::new(self, "", command).await?;
1057 request.response_data(false).await
1058 }
1059
1060 #[maybe_async::maybe_async]
1061 pub async fn get_bucket_cors(
1062 &self,
1063 expected_bucket_owner: &str,
1064 ) -> Result<CorsConfiguration, S3Error> {
1065 let command = Command::GetBucketCors {
1066 expected_bucket_owner: expected_bucket_owner.to_string(),
1067 };
1068 let request = RequestImpl::new(self, "", command).await?;
1069 let response = request.response_data(false).await?;
1070 Ok(quick_xml::de::from_str::<CorsConfiguration>(
1071 response.as_str()?,
1072 )?)
1073 }
1074
1075 #[maybe_async::maybe_async]
1076 pub async fn delete_bucket_cors(
1077 &self,
1078 expected_bucket_owner: &str,
1079 ) -> Result<ResponseData, S3Error> {
1080 let command = Command::DeleteBucketCors {
1081 expected_bucket_owner: expected_bucket_owner.to_string(),
1082 };
1083 let request = RequestImpl::new(self, "", command).await?;
1084 request.response_data(false).await
1085 }
1086
1087 #[maybe_async::maybe_async]
1088 pub async fn get_bucket_lifecycle(&self) -> Result<BucketLifecycleConfiguration, S3Error> {
1089 let request = RequestImpl::new(self, "", Command::GetBucketLifecycle).await?;
1090 let response = request.response_data(false).await?;
1091 Ok(quick_xml::de::from_str::<BucketLifecycleConfiguration>(
1092 response.as_str()?,
1093 )?)
1094 }
1095
1096 #[maybe_async::maybe_async]
1097 pub async fn put_bucket_lifecycle(
1098 &self,
1099 lifecycle_config: BucketLifecycleConfiguration,
1100 ) -> Result<ResponseData, S3Error> {
1101 let command = Command::PutBucketLifecycle {
1102 configuration: lifecycle_config,
1103 };
1104 let request = RequestImpl::new(self, "", command).await?;
1105 request.response_data(false).await
1106 }
1107
    /// Removes the bucket's lifecycle configuration
    /// (`DeleteBucketLifecycle`).
    #[maybe_async::maybe_async]
    pub async fn delete_bucket_lifecycle(&self) -> Result<ResponseData, S3Error> {
        let request = RequestImpl::new(self, "", Command::DeleteBucketLifecycle).await?;
        request.response_data(false).await
    }
1113
1114 #[maybe_async::maybe_async]
1146 pub async fn get_object_torrent<S: AsRef<str>>(
1147 &self,
1148 path: S,
1149 ) -> Result<ResponseData, S3Error> {
1150 let command = Command::GetObjectTorrent;
1151 let request = RequestImpl::new(self, path.as_ref(), command).await?;
1152 request.response_data(false).await
1153 }
1154
    /// Downloads the byte range `start..=end` (or `start..` when `end` is
    /// `None`) of the object at `path` via an HTTP range request.
    ///
    /// # Panics
    ///
    /// Panics when `end` is `Some` and `start > end`.
    #[maybe_async::maybe_async]
    pub async fn get_object_range<S: AsRef<str>>(
        &self,
        path: S,
        start: u64,
        end: Option<u64>,
    ) -> Result<ResponseData, S3Error> {
        if let Some(end) = end {
            assert!(start <= end);
        }

        let command = Command::GetObjectRange { start, end };
        let request = RequestImpl::new(self, path.as_ref(), command).await?;
        request.response_data(false).await
    }
1202
    /// Streams the byte range `start..=end` (or `start..` when `end` is
    /// `None`) of the object at `path` into `writer`, returning the HTTP
    /// status code (async backends).
    ///
    /// # Panics
    ///
    /// Panics when `end` is `Some` and `start > end`.
    #[maybe_async::async_impl]
    pub async fn get_object_range_to_writer<T, S>(
        &self,
        path: S,
        start: u64,
        end: Option<u64>,
        writer: &mut T,
    ) -> Result<u16, S3Error>
    where
        T: AsyncWrite + Send + Unpin + ?Sized,
        S: AsRef<str>,
    {
        if let Some(end) = end {
            assert!(start <= end);
        }

        let command = Command::GetObjectRange { start, end };
        let request = RequestImpl::new(self, path.as_ref(), command).await?;
        request.response_data_to_writer(writer).await
    }
1263
    /// Streams the byte range `start..=end` (or `start..` when `end` is
    /// `None`) of the object at `path` into `writer`, returning the HTTP
    /// status code (sync backend).
    ///
    /// # Panics
    ///
    /// Panics when `end` is `Some` and `start > end`.
    #[maybe_async::sync_impl]
    pub fn get_object_range_to_writer<T: std::io::Write + Send + ?Sized, S: AsRef<str>>(
        &self,
        path: S,
        start: u64,
        end: Option<u64>,
        writer: &mut T,
    ) -> Result<u16, S3Error> {
        if let Some(end) = end {
            assert!(start <= end);
        }

        let command = Command::GetObjectRange { start, end };
        let request = RequestImpl::new(self, path.as_ref(), command)?;
        request.response_data_to_writer(writer)
    }
1280
1281 #[maybe_async::async_impl]
1319 pub async fn get_object_to_writer<T: AsyncWrite + Send + Unpin + ?Sized, S: AsRef<str>>(
1320 &self,
1321 path: S,
1322 writer: &mut T,
1323 ) -> Result<u16, S3Error> {
1324 let command = Command::GetObject;
1325 let request = RequestImpl::new(self, path.as_ref(), command).await?;
1326 request.response_data_to_writer(writer).await
1327 }
1328
1329 #[maybe_async::sync_impl]
1330 pub fn get_object_to_writer<T: std::io::Write + Send + ?Sized, S: AsRef<str>>(
1331 &self,
1332 path: S,
1333 writer: &mut T,
1334 ) -> Result<u16, S3Error> {
1335 let command = Command::GetObject;
1336 let request = RequestImpl::new(self, path.as_ref(), command)?;
1337 request.response_data_to_writer(writer)
1338 }
1339
    /// Opens the object at `path` as a byte stream, letting the caller
    /// consume the body incrementally instead of buffering it.
    #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
    pub async fn get_object_stream<S: AsRef<str>>(
        &self,
        path: S,
    ) -> Result<ResponseDataStream, S3Error> {
        let command = Command::GetObject;
        let request = RequestImpl::new(self, path.as_ref(), command).await?;
        request.response_data_to_stream().await
    }
1390
1391 #[maybe_async::async_impl]
1438 pub async fn put_object_stream<R: AsyncRead + Unpin + ?Sized>(
1439 &self,
1440 reader: &mut R,
1441 s3_path: impl AsRef<str>,
1442 ) -> Result<PutStreamResponse, S3Error> {
1443 self._put_object_stream_with_content_type(
1444 reader,
1445 s3_path.as_ref(),
1446 "application/octet-stream",
1447 )
1448 .await
1449 }
1450
    /// Returns a builder for a streamed upload to `path`, allowing content
    /// type and custom headers to be configured before execution.
    #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
    pub fn put_object_stream_builder<S: AsRef<str>>(
        &self,
        path: S,
    ) -> crate::put_object_request::PutObjectStreamRequest<'_> {
        crate::put_object_request::PutObjectStreamRequest::new(self, path)
    }
1490
1491 #[maybe_async::sync_impl]
1492 pub fn put_object_stream<R: Read>(
1493 &self,
1494 reader: &mut R,
1495 s3_path: impl AsRef<str>,
1496 ) -> Result<u16, S3Error> {
1497 self._put_object_stream_with_content_type(
1498 reader,
1499 s3_path.as_ref(),
1500 "application/octet-stream",
1501 )
1502 }
1503
    /// Uploads from `reader` to `s3_path` with an explicit `content_type`
    /// (async backends).
    #[maybe_async::async_impl]
    pub async fn put_object_stream_with_content_type<R: AsyncRead + Unpin>(
        &self,
        reader: &mut R,
        s3_path: impl AsRef<str>,
        content_type: impl AsRef<str>,
    ) -> Result<PutStreamResponse, S3Error> {
        self._put_object_stream_with_content_type(reader, s3_path.as_ref(), content_type.as_ref())
            .await
    }
1564
    /// Uploads from `reader` to `s3_path` with an explicit `content_type`
    /// (sync backend), returning the HTTP status code.
    #[maybe_async::sync_impl]
    pub fn put_object_stream_with_content_type<R: Read>(
        &self,
        reader: &mut R,
        s3_path: impl AsRef<str>,
        content_type: impl AsRef<str>,
    ) -> Result<u16, S3Error> {
        self._put_object_stream_with_content_type(reader, s3_path.as_ref(), content_type.as_ref())
    }
1574
1575 #[maybe_async::async_impl]
1576 async fn make_multipart_request(
1577 &self,
1578 path: &str,
1579 chunk: Vec<u8>,
1580 part_number: u32,
1581 upload_id: &str,
1582 content_type: &str,
1583 ) -> Result<ResponseData, S3Error> {
1584 let command = Command::PutObject {
1585 content: &chunk,
1586 multipart: Some(Multipart::new(part_number, upload_id)), custom_headers: None,
1588 content_type,
1589 };
1590 let request = RequestImpl::new(self, path, command).await?;
1591 request.response_data(true).await
1592 }
1593
    /// Convenience wrapper over the headers-aware streamed upload with no
    /// custom headers.
    #[maybe_async::async_impl]
    async fn _put_object_stream_with_content_type<R: AsyncRead + Unpin + ?Sized>(
        &self,
        reader: &mut R,
        s3_path: &str,
        content_type: &str,
    ) -> Result<PutStreamResponse, S3Error> {
        self._put_object_stream_with_content_type_and_headers(reader, s3_path, content_type, None)
            .await
    }
1604
1605 #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
1608 fn calculate_max_concurrent_chunks() -> usize {
1609 let mut system = System::new();
1611 system.refresh_memory_specifics(MemoryRefreshKind::everything());
1612
1613 let available_memory = system.available_memory();
1615
1616 if available_memory == 0 {
1618 return 3;
1619 }
1620
1621 let safety_factor = 3;
1625 let memory_per_chunk = CHUNK_SIZE as u64 * safety_factor;
1626
1627 let calculated_chunks = (available_memory / memory_per_chunk) as usize;
1629
1630 calculated_chunks.clamp(2, 100)
1634 }
1635
    /// Streams `reader` to `s3_path`: a single plain PUT when the stream is
    /// smaller than `CHUNK_SIZE`, otherwise a multipart upload with bounded
    /// concurrency (see `calculate_max_concurrent_chunks`). A failed part
    /// aborts the multipart upload before the error is returned.
    #[maybe_async::async_impl]
    pub(crate) async fn _put_object_stream_with_content_type_and_headers<
        R: AsyncRead + Unpin + ?Sized,
    >(
        &self,
        reader: &mut R,
        s3_path: &str,
        content_type: &str,
        custom_headers: Option<http::HeaderMap>,
    ) -> Result<PutStreamResponse, S3Error> {
        let first_chunk = crate::utils::read_chunk_async(reader).await?;
        // Short stream: a single plain PUT is cheaper than multipart.
        if first_chunk.len() < CHUNK_SIZE {
            let total_size = first_chunk.len();
            let mut builder = self
                .put_object_builder(s3_path, first_chunk.as_slice())
                .with_content_type(content_type);

            if let Some(headers) = custom_headers {
                builder = builder.with_headers(headers);
            }

            let response_data = builder.execute().await?;
            if response_data.status_code() >= 300 {
                return Err(error_from_response_data(response_data)?);
            }
            return Ok(PutStreamResponse::new(
                response_data.status_code(),
                total_size,
            ));
        }

        let msg = self
            .initiate_multipart_upload(s3_path, content_type)
            .await?;
        let path = msg.key;
        let upload_id = &msg.upload_id;

        let max_concurrent_chunks = Self::calculate_max_concurrent_chunks();

        use futures_util::FutureExt;
        use futures_util::stream::{FuturesUnordered, StreamExt};

        // Parts are numbered from 1; each finished upload reports
        // `(part_number, result)` so ETags can be re-ordered afterwards.
        let mut part_number: u32 = 0;
        let mut total_size = 0;
        let mut etags = Vec::new();
        let mut active_uploads: FuturesUnordered<
            futures_util::future::BoxFuture<'_, (u32, Result<ResponseData, S3Error>)>,
        > = FuturesUnordered::new();
        let mut reading_done = false;

        part_number += 1;
        total_size += first_chunk.len();
        // NOTE(review): at this point the early-return above guarantees
        // `first_chunk.len() >= CHUNK_SIZE`, so this check never fires.
        if first_chunk.len() < CHUNK_SIZE {
            reading_done = true;
        }

        // Schedule the already-read first chunk as part 1.
        let path_clone = path.clone();
        let upload_id_clone = upload_id.clone();
        let content_type_clone = content_type.to_string();
        let bucket_clone = self.clone();

        active_uploads.push(
            async move {
                let result = bucket_clone
                    .make_multipart_request(
                        &path_clone,
                        first_chunk,
                        1,
                        &upload_id_clone,
                        &content_type_clone,
                    )
                    .await;
                (1, result)
            }
            .boxed(),
        );

        while !active_uploads.is_empty() || !reading_done {
            // Keep reading and scheduling parts until the concurrency cap
            // is reached or the stream ends.
            while active_uploads.len() < max_concurrent_chunks && !reading_done {
                let chunk = crate::utils::read_chunk_async(reader).await?;
                let chunk_len = chunk.len();

                if chunk_len == 0 {
                    reading_done = true;
                    break;
                }

                total_size += chunk_len;
                part_number += 1;

                // A short read marks the final chunk of the stream.
                if chunk_len < CHUNK_SIZE {
                    reading_done = true;
                }

                let current_part = part_number;
                let path_clone = path.clone();
                let upload_id_clone = upload_id.clone();
                let content_type_clone = content_type.to_string();
                let bucket_clone = self.clone();

                active_uploads.push(
                    async move {
                        let result = bucket_clone
                            .make_multipart_request(
                                &path_clone,
                                chunk,
                                current_part,
                                &upload_id_clone,
                                &content_type_clone,
                            )
                            .await;
                        (current_part, result)
                    }
                    .boxed(),
                );
            }

            // Drain one completed part; on failure abort the whole upload
            // so no orphaned parts are left behind.
            if let Some((part_num, result)) = active_uploads.next().await {
                let response_data = result?;
                if !(200..300).contains(&response_data.status_code()) {
                    match self.abort_upload(&path, upload_id).await {
                        Ok(_) => {
                            return Err(error_from_response_data(response_data)?);
                        }
                        Err(error) => {
                            return Err(error);
                        }
                    }
                }

                let etag = response_data.as_str()?;
                etags.push((part_num, etag.to_string()));
            }
        }

        // Parts complete out of order; sort by part number before the
        // CompleteMultipartUpload call.
        etags.sort_by_key(|k| k.0);
        let etags: Vec<String> = etags.into_iter().map(|(_, etag)| etag).collect();

        // NOTE(review): the `.clone()` below looks redundant — `etags` is
        // not used afterwards.
        let inner_data = etags
            .clone()
            .into_iter()
            .enumerate()
            .map(|(i, x)| Part {
                etag: x,
                part_number: i as u32 + 1,
            })
            .collect::<Vec<Part>>();
        let response_data = self
            .complete_multipart_upload(&path, &msg.upload_id, inner_data)
            .await?;

        Ok(PutStreamResponse::new(
            response_data.status_code(),
            total_size,
        ))
    }
1807
    /// Streams `reader` to `s3_path` (sync backend): initiates a multipart
    /// upload, then uploads sequential chunks; a stream that fits in a
    /// single chunk falls back to a plain PUT after aborting the multipart
    /// upload.
    #[maybe_async::sync_impl]
    fn _put_object_stream_with_content_type<R: Read + ?Sized>(
        &self,
        reader: &mut R,
        s3_path: &str,
        content_type: &str,
    ) -> Result<u16, S3Error> {
        let msg = self.initiate_multipart_upload(s3_path, content_type)?;
        let path = msg.key;
        let upload_id = &msg.upload_id;

        let mut part_number: u32 = 0;
        let mut etags = Vec::new();
        loop {
            let chunk = crate::utils::read_chunk(reader)?;

            // A short read marks the final chunk of the stream.
            if chunk.len() < CHUNK_SIZE {
                if part_number == 0 {
                    // Whole stream fit in one chunk: abandon the multipart
                    // upload and do a single plain PUT instead.
                    self.abort_upload(&path, upload_id)?;

                    return Ok(self.put_object(s3_path, chunk.as_slice())?.status_code());
                } else {
                    // Upload the final (short) part, then complete the
                    // multipart upload with all collected ETags.
                    part_number += 1;
                    let part = self.put_multipart_chunk(
                        &chunk,
                        &path,
                        part_number,
                        upload_id,
                        content_type,
                    )?;
                    etags.push(part.etag);
                    let inner_data = etags
                        .into_iter()
                        .enumerate()
                        .map(|(i, x)| Part {
                            etag: x,
                            part_number: i as u32 + 1,
                        })
                        .collect::<Vec<Part>>();
                    return Ok(self
                        .complete_multipart_upload(&path, upload_id, inner_data)?
                        .status_code());
                }
            } else {
                // Full-sized chunk: upload it and keep reading.
                part_number += 1;
                let part =
                    self.put_multipart_chunk(&chunk, &path, part_number, upload_id, content_type)?;
                etags.push(part.etag.to_string());
            }
        }
    }
1861
1862 #[maybe_async::async_impl]
1864 pub async fn initiate_multipart_upload(
1865 &self,
1866 s3_path: &str,
1867 content_type: &str,
1868 ) -> Result<InitiateMultipartUploadResponse, S3Error> {
1869 let command = Command::InitiateMultipartUpload { content_type };
1870 let request = RequestImpl::new(self, s3_path, command).await?;
1871 let response_data = request.response_data(false).await?;
1872 if response_data.status_code() >= 300 {
1873 return Err(error_from_response_data(response_data)?);
1874 }
1875
1876 let msg: InitiateMultipartUploadResponse =
1877 quick_xml::de::from_str(response_data.as_str()?)?;
1878 Ok(msg)
1879 }
1880
    /// Starts a multipart upload for `s3_path` (sync backend); the returned
    /// message carries the `upload_id` required by subsequent part uploads.
    ///
    /// # Errors
    ///
    /// Returns an error for any HTTP status >= 300.
    #[maybe_async::sync_impl]
    pub fn initiate_multipart_upload(
        &self,
        s3_path: &str,
        content_type: &str,
    ) -> Result<InitiateMultipartUploadResponse, S3Error> {
        let command = Command::InitiateMultipartUpload { content_type };
        let request = RequestImpl::new(self, s3_path, command)?;
        let response_data = request.response_data(false)?;
        if response_data.status_code() >= 300 {
            return Err(error_from_response_data(response_data)?);
        }

        let msg: InitiateMultipartUploadResponse =
            quick_xml::de::from_str(response_data.as_str()?)?;
        Ok(msg)
    }
1898
1899 #[maybe_async::async_impl]
1901 pub async fn put_multipart_stream<R: Read + Unpin>(
1902 &self,
1903 reader: &mut R,
1904 path: &str,
1905 part_number: u32,
1906 upload_id: &str,
1907 content_type: &str,
1908 ) -> Result<Part, S3Error> {
1909 let chunk = crate::utils::read_chunk(reader)?;
1910 self.put_multipart_chunk(chunk, path, part_number, upload_id, content_type)
1911 .await
1912 }
1913
1914 #[maybe_async::sync_impl]
1915 pub async fn put_multipart_stream<R: Read + Unpin>(
1916 &self,
1917 reader: &mut R,
1918 path: &str,
1919 part_number: u32,
1920 upload_id: &str,
1921 content_type: &str,
1922 ) -> Result<Part, S3Error> {
1923 let chunk = crate::utils::read_chunk(reader)?;
1924 self.put_multipart_chunk(&chunk, path, part_number, upload_id, content_type)
1925 }
1926
1927 #[maybe_async::async_impl]
1929 pub async fn put_multipart_chunk(
1930 &self,
1931 chunk: Vec<u8>,
1932 path: &str,
1933 part_number: u32,
1934 upload_id: &str,
1935 content_type: &str,
1936 ) -> Result<Part, S3Error> {
1937 let command = Command::PutObject {
1938 content: &chunk,
1940 multipart: Some(Multipart::new(part_number, upload_id)), custom_headers: None,
1942 content_type,
1943 };
1944 let request = RequestImpl::new(self, path, command).await?;
1945 let response_data = request.response_data(true).await?;
1946 if !(200..300).contains(&response_data.status_code()) {
1947 match self.abort_upload(path, upload_id).await {
1949 Ok(_) => {
1950 return Err(error_from_response_data(response_data)?);
1951 }
1952 Err(error) => {
1953 return Err(error);
1954 }
1955 }
1956 }
1957 let etag = response_data.as_str()?;
1958 Ok(Part {
1959 etag: etag.to_string(),
1960 part_number,
1961 })
1962 }
1963
1964 #[maybe_async::sync_impl]
1965 pub fn put_multipart_chunk(
1966 &self,
1967 chunk: &[u8],
1968 path: &str,
1969 part_number: u32,
1970 upload_id: &str,
1971 content_type: &str,
1972 ) -> Result<Part, S3Error> {
1973 let command = Command::PutObject {
1974 content: chunk,
1976 multipart: Some(Multipart::new(part_number, upload_id)), custom_headers: None,
1978 content_type,
1979 };
1980 let request = RequestImpl::new(self, path, command)?;
1981 let response_data = request.response_data(true)?;
1982 if !(200..300).contains(&response_data.status_code()) {
1983 match self.abort_upload(path, upload_id) {
1985 Ok(_) => {
1986 return Err(error_from_response_data(response_data)?);
1987 }
1988 Err(error) => {
1989 return Err(error);
1990 }
1991 }
1992 }
1993 let etag = response_data.as_str()?;
1994 Ok(Part {
1995 etag: etag.to_string(),
1996 part_number,
1997 })
1998 }
1999
2000 #[maybe_async::async_impl]
2002 pub async fn complete_multipart_upload(
2003 &self,
2004 path: &str,
2005 upload_id: &str,
2006 parts: Vec<Part>,
2007 ) -> Result<ResponseData, S3Error> {
2008 let data = CompleteMultipartUploadData { parts };
2009 let complete = Command::CompleteMultipartUpload { upload_id, data };
2010 let complete_request = RequestImpl::new(self, path, complete).await?;
2011 complete_request.response_data(false).await
2012 }
2013
2014 #[maybe_async::sync_impl]
2015 pub fn complete_multipart_upload(
2016 &self,
2017 path: &str,
2018 upload_id: &str,
2019 parts: Vec<Part>,
2020 ) -> Result<ResponseData, S3Error> {
2021 let data = CompleteMultipartUploadData { parts };
2022 let complete = Command::CompleteMultipartUpload { upload_id, data };
2023 let complete_request = RequestImpl::new(self, path, complete)?;
2024 complete_request.response_data(false)
2025 }
2026
2027 #[maybe_async::maybe_async]
2060 pub async fn location(&self) -> Result<(Region, u16), S3Error> {
2061 let request = RequestImpl::new(self, "?location", Command::GetBucketLocation).await?;
2062 let response_data = request.response_data(false).await?;
2063 let region_string = String::from_utf8_lossy(response_data.as_slice());
2064 let region = match quick_xml::de::from_reader(region_string.as_bytes()) {
2065 Ok(r) => {
2066 let location_result: BucketLocationResult = r;
2067 location_result.region.parse()?
2068 }
2069 Err(e) => {
2070 if response_data.status_code() == 200 {
2071 Region::Custom {
2072 region: "Custom".to_string(),
2073 endpoint: "".to_string(),
2074 }
2075 } else {
2076 Region::Custom {
2077 region: format!("Error encountered : {}", e),
2078 endpoint: "".to_string(),
2079 }
2080 }
2081 }
2082 };
2083 Ok((region, response_data.status_code()))
2084 }
2085
2086 #[maybe_async::maybe_async]
2119 pub async fn delete_object<S: AsRef<str>>(&self, path: S) -> Result<ResponseData, S3Error> {
2120 let command = Command::DeleteObject;
2121 let request = RequestImpl::new(self, path.as_ref(), command).await?;
2122 request.response_data(false).await
2123 }
2124
2125 #[maybe_async::maybe_async]
2169 pub async fn delete_objects<I: Into<Vec<ObjectIdentifier>>>(
2170 &self,
2171 objects: I,
2172 ) -> Result<DeleteObjectsResult, S3Error> {
2173 let objects = objects.into();
2174 let mut result = DeleteObjectsResult {
2175 deleted: Vec::new(),
2176 errors: Vec::new(),
2177 };
2178
2179 let objects: Vec<ObjectIdentifier> = objects
2183 .into_iter()
2184 .map(|mut obj| {
2185 if let Some(stripped) = obj.key.strip_prefix('/') {
2186 obj.key = stripped.to_string();
2187 }
2188 obj
2189 })
2190 .collect();
2191
2192 for chunk in objects.chunks(1000) {
2193 let data = DeleteObjectsRequest {
2194 objects: chunk.to_vec(),
2195 quiet: false,
2196 };
2197 let command = Command::DeleteObjects { data };
2198 let request = RequestImpl::new(self, "/", command).await?;
2199 let response_data = request.response_data(false).await?;
2200 if response_data.status_code() >= 300 {
2201 return Err(error_from_response_data(response_data)?);
2202 }
2203 let msg: DeleteObjectsResult = quick_xml::de::from_str(response_data.as_str()?)?;
2204 result.deleted.extend(msg.deleted);
2205 result.errors.extend(msg.errors);
2206 }
2207
2208 Ok(result)
2209 }
2210
2211 #[maybe_async::maybe_async]
2244 pub async fn head_object<S: AsRef<str>>(
2245 &self,
2246 path: S,
2247 ) -> Result<(HeadObjectResult, u16), S3Error> {
2248 let command = Command::HeadObject;
2249 let request = RequestImpl::new(self, path.as_ref(), command).await?;
2250 let (headers, status) = request.response_header().await?;
2251 let header_object = HeadObjectResult::from(&headers);
2252 Ok((header_object, status))
2253 }
2254
2255 #[maybe_async::maybe_async]
2289 pub async fn put_object_with_content_type<S: AsRef<str>>(
2290 &self,
2291 path: S,
2292 content: &[u8],
2293 content_type: &str,
2294 ) -> Result<ResponseData, S3Error> {
2295 let command = Command::PutObject {
2296 content,
2297 content_type,
2298 custom_headers: None,
2299 multipart: None,
2300 };
2301 let request = RequestImpl::new(self, path.as_ref(), command).await?;
2302 request.response_data(true).await
2303 }
2304
2305 #[maybe_async::maybe_async]
2348 pub async fn put_object_with_content_type_and_headers<S: AsRef<str>>(
2349 &self,
2350 path: S,
2351 content: &[u8],
2352 content_type: &str,
2353 custom_headers: Option<HeaderMap>,
2354 ) -> Result<ResponseData, S3Error> {
2355 let command = Command::PutObject {
2356 content,
2357 content_type,
2358 custom_headers,
2359 multipart: None,
2360 };
2361 let request = RequestImpl::new(self, path.as_ref(), command).await?;
2362 request.response_data(true).await
2363 }
2364
2365 #[maybe_async::maybe_async]
2405 pub async fn put_object_with_headers<S: AsRef<str>>(
2406 &self,
2407 path: S,
2408 content: &[u8],
2409 custom_headers: Option<HeaderMap>,
2410 ) -> Result<ResponseData, S3Error> {
2411 self.put_object_with_content_type_and_headers(
2412 path,
2413 content,
2414 "application/octet-stream",
2415 custom_headers,
2416 )
2417 .await
2418 }
2419
2420 #[maybe_async::maybe_async]
2454 pub async fn put_object<S: AsRef<str>>(
2455 &self,
2456 path: S,
2457 content: &[u8],
2458 ) -> Result<ResponseData, S3Error> {
2459 self.put_object_with_content_type(path, content, "application/octet-stream")
2460 .await
2461 }
2462
    /// Create a builder for a PUT request on `path` with body `content`,
    /// allowing further customization before the request is sent.
    pub fn put_object_builder<S: AsRef<str>>(
        &self,
        path: S,
        content: &[u8],
    ) -> crate::put_object_request::PutObjectRequest<'_> {
        crate::put_object_request::PutObjectRequest::new(self, path, content)
    }
2498
2499 fn _tags_xml<S: AsRef<str>>(&self, tags: &[(S, S)]) -> String {
2500 let mut s = String::new();
2501 let content = tags
2502 .iter()
2503 .map(|(name, value)| {
2504 format!(
2505 "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
2506 name.as_ref(),
2507 value.as_ref()
2508 )
2509 })
2510 .fold(String::new(), |mut a, b| {
2511 a.push_str(b.as_str());
2512 a
2513 });
2514 s.push_str("<Tagging><TagSet>");
2515 s.push_str(&content);
2516 s.push_str("</TagSet></Tagging>");
2517 s
2518 }
2519
2520 #[maybe_async::maybe_async]
2553 pub async fn put_object_tagging<S: AsRef<str>>(
2554 &self,
2555 path: &str,
2556 tags: &[(S, S)],
2557 ) -> Result<ResponseData, S3Error> {
2558 let content = self._tags_xml(tags);
2559 let command = Command::PutObjectTagging { tags: &content };
2560 let request = RequestImpl::new(self, path, command).await?;
2561 request.response_data(false).await
2562 }
2563
2564 #[maybe_async::maybe_async]
2597 pub async fn delete_object_tagging<S: AsRef<str>>(
2598 &self,
2599 path: S,
2600 ) -> Result<ResponseData, S3Error> {
2601 let command = Command::DeleteObjectTagging;
2602 let request = RequestImpl::new(self, path.as_ref(), command).await?;
2603 request.response_data(false).await
2604 }
2605
    /// Fetch the tag set of the object at `path`.
    ///
    /// Returns the parsed tags together with the HTTP status code. On a
    /// non-200 response, or if the body cannot be parsed as XML, the tag list
    /// is empty (parse failures are silently swallowed by design here).
    #[cfg(feature = "tags")]
    #[maybe_async::maybe_async]
    pub async fn get_object_tagging<S: AsRef<str>>(
        &self,
        path: S,
    ) -> Result<(Vec<Tag>, u16), S3Error> {
        let command = Command::GetObjectTagging {};
        let request = RequestImpl::new(self, path.as_ref(), command).await?;
        let result = request.response_data(false).await?;

        let mut tags = Vec::new();

        if result.status_code() == 200 {
            let result_string = String::from_utf8_lossy(result.as_slice());

            // minidom requires a namespace on the root element, but some
            // S3-compatible servers omit it; patch the expected S3 namespace
            // into the <Tagging> root when parsing fails for that reason.
            let ns = "http://s3.amazonaws.com/doc/2006-03-01/";
            let result_string =
                if let Err(minidom::Error::MissingNamespace) = result_string.parse::<Element>() {
                    result_string
                        .replace("<Tagging>", &format!("<Tagging xmlns=\"{}\">", ns))
                        .into()
                } else {
                    result_string
                };

            // Walk <Tagging><TagSet><Tag><Key>/<Value> and collect each pair;
            // missing children produce placeholder text rather than an error.
            if let Ok(tagging) = result_string.parse::<Element>() {
                for tag_set in tagging.children() {
                    if tag_set.is("TagSet", ns) {
                        for tag in tag_set.children() {
                            if tag.is("Tag", ns) {
                                let key = if let Some(element) = tag.get_child("Key", ns) {
                                    element.text()
                                } else {
                                    "Could not parse Key from Tag".to_string()
                                };
                                let value = if let Some(element) = tag.get_child("Value", ns) {
                                    element.text()
                                } else {
                                    "Could not parse Values from Tag".to_string()
                                };
                                tags.push(Tag { key, value });
                            }
                        }
                    }
                }
            }
        }

        Ok((tags, result.status_code()))
    }
2689
    /// Fetch one page of object listings.
    ///
    /// Uses ListObjectsV2 unless `set_listobjects_v1` was called (for
    /// providers without v2 support). Returns the parsed page and the HTTP
    /// status code.
    #[maybe_async::maybe_async]
    pub async fn list_page(
        &self,
        prefix: String,
        delimiter: Option<String>,
        continuation_token: Option<String>,
        start_after: Option<String>,
        max_keys: Option<usize>,
    ) -> Result<(ListBucketResult, u16), S3Error> {
        let command = if self.listobjects_v2 {
            Command::ListObjectsV2 {
                prefix,
                delimiter,
                continuation_token,
                start_after,
                max_keys,
            }
        } else {
            // ListObjects v1 has a single `marker` cursor instead of
            // continuation_token/start_after.
            // NOTE(review): this picks the lexicographic max of two
            // Option<String>s (None sorts first) — presumably intended to use
            // whichever cursor is further along; confirm before changing.
            Command::ListObjects {
                prefix,
                delimiter,
                marker: std::cmp::max(continuation_token, start_after),
                max_keys,
            }
        };
        let request = RequestImpl::new(self, "/", command).await?;
        let response_data = request.response_data(false).await?;
        let list_bucket_result = quick_xml::de::from_reader(response_data.as_slice())?;

        Ok((list_bucket_result, response_data.status_code()))
    }
2724
2725 #[maybe_async::maybe_async]
2758 #[allow(clippy::assigning_clones)]
2759 pub async fn list(
2760 &self,
2761 prefix: String,
2762 delimiter: Option<String>,
2763 ) -> Result<Vec<ListBucketResult>, S3Error> {
2764 let the_bucket = self.to_owned();
2765 let mut results = Vec::new();
2766 let mut continuation_token = None;
2767
2768 loop {
2769 let (list_bucket_result, _) = the_bucket
2770 .list_page(
2771 prefix.clone(),
2772 delimiter.clone(),
2773 continuation_token,
2774 None,
2775 None,
2776 )
2777 .await?;
2778 continuation_token = list_bucket_result.next_continuation_token.clone();
2779 results.push(list_bucket_result);
2780 if continuation_token.is_none() {
2781 break;
2782 }
2783 }
2784
2785 Ok(results)
2786 }
2787
2788 #[maybe_async::maybe_async]
2789 pub async fn list_multiparts_uploads_page(
2790 &self,
2791 prefix: Option<&str>,
2792 delimiter: Option<&str>,
2793 key_marker: Option<String>,
2794 max_uploads: Option<usize>,
2795 ) -> Result<(ListMultipartUploadsResult, u16), S3Error> {
2796 let command = Command::ListMultipartUploads {
2797 prefix,
2798 delimiter,
2799 key_marker,
2800 max_uploads,
2801 };
2802 let request = RequestImpl::new(self, "/", command).await?;
2803 let response_data = request.response_data(false).await?;
2804 let list_bucket_result = quick_xml::de::from_reader(response_data.as_slice())?;
2805
2806 Ok((list_bucket_result, response_data.status_code()))
2807 }
2808
2809 #[maybe_async::maybe_async]
2843 #[allow(clippy::assigning_clones)]
2844 pub async fn list_multiparts_uploads(
2845 &self,
2846 prefix: Option<&str>,
2847 delimiter: Option<&str>,
2848 ) -> Result<Vec<ListMultipartUploadsResult>, S3Error> {
2849 let the_bucket = self.to_owned();
2850 let mut results = Vec::new();
2851 let mut next_marker: Option<String> = None;
2852
2853 loop {
2854 let (list_multiparts_uploads_result, _) = the_bucket
2855 .list_multiparts_uploads_page(prefix, delimiter, next_marker, None)
2856 .await?;
2857
2858 let is_truncated = list_multiparts_uploads_result.is_truncated;
2859
2860 next_marker = list_multiparts_uploads_result.next_marker.clone();
2861 results.push(list_multiparts_uploads_result);
2862
2863 if !is_truncated {
2864 break;
2865 }
2866 }
2867
2868 Ok(results)
2869 }
2870
2871 #[maybe_async::maybe_async]
2904 pub async fn abort_upload(&self, key: &str, upload_id: &str) -> Result<(), S3Error> {
2905 let abort = Command::AbortMultipartUpload { upload_id };
2906 let abort_request = RequestImpl::new(self, key, abort).await?;
2907 let response_data = abort_request.response_data(false).await?;
2908
2909 if (200..300).contains(&response_data.status_code()) {
2910 Ok(())
2911 } else {
2912 let utf8_content = String::from_utf8(response_data.as_slice().to_vec())?;
2913 Err(S3Error::HttpFailWithBody(
2914 response_data.status_code(),
2915 utf8_content,
2916 ))
2917 }
2918 }
2919
    /// Returns `true` when requests address the bucket as `host/bucket-name`
    /// (path style) rather than `bucket-name.host` (virtual-host style).
    pub fn is_path_style(&self) -> bool {
        self.path_style
    }

    /// Returns `true` when requests address the bucket as `bucket-name.host`.
    pub fn is_subdomain_style(&self) -> bool {
        !self.path_style
    }

    /// Switch this bucket to path-style addressing (`host/bucket-name`).
    pub fn set_path_style(&mut self) {
        self.path_style = true;
    }

    /// Switch this bucket to virtual-host-style addressing
    /// (`bucket-name.host`).
    pub fn set_subdomain_style(&mut self) {
        self.path_style = false;
    }
2939
    /// Set (or clear, with `None`) the per-request timeout used by the
    /// backend HTTP client.
    pub fn set_request_timeout(&mut self, timeout: Option<Duration>) {
        self.request_timeout = timeout;
    }

    /// Use the legacy ListObjects (v1) API for listings; some S3-compatible
    /// providers do not support v2.
    pub fn set_listobjects_v1(&mut self) {
        self.listobjects_v2 = false;
    }

    /// Use the ListObjectsV2 API for listings (the default).
    pub fn set_listobjects_v2(&mut self) {
        self.listobjects_v2 = true;
    }
2963
    /// The bucket name.
    pub fn name(&self) -> String {
        self.name.to_string()
    }

    /// Host used for requests, honoring the configured addressing style.
    pub fn host(&self) -> String {
        if self.path_style {
            self.path_style_host()
        } else {
            self.subdomain_style_host()
        }
    }

    /// Full base URL of the bucket: `scheme://host/bucket` in path style,
    /// `scheme://bucket.host` in virtual-host style.
    pub fn url(&self) -> String {
        if self.path_style {
            format!(
                "{}://{}/{}",
                self.scheme(),
                self.path_style_host(),
                self.name()
            )
        } else {
            format!("{}://{}", self.scheme(), self.subdomain_style_host())
        }
    }

    /// Host for path-style addressing (the region's host alone).
    pub fn path_style_host(&self) -> String {
        self.region.host()
    }

    /// Host for virtual-host-style addressing (`bucket.region-host`).
    pub fn subdomain_style_host(&self) -> String {
        format!("{}.{}", self.name, self.region.host())
    }

    /// URL scheme (e.g. `http`/`https`) taken from the region.
    pub fn scheme(&self) -> String {
        self.region.scheme()
    }

    /// A clone of the bucket's region.
    pub fn region(&self) -> Region {
        self.region.clone()
    }
3012
    /// The access key of the current credentials, if set.
    #[maybe_async::maybe_async]
    pub async fn access_key(&self) -> Result<Option<String>, S3Error> {
        Ok(self.credentials().await?.access_key)
    }

    /// The secret key of the current credentials, if set.
    #[maybe_async::maybe_async]
    pub async fn secret_key(&self) -> Result<Option<String>, S3Error> {
        Ok(self.credentials().await?.secret_key)
    }

    /// The security token of the current credentials, if set.
    #[maybe_async::maybe_async]
    pub async fn security_token(&self) -> Result<Option<String>, S3Error> {
        Ok(self.credentials().await?.security_token)
    }

    /// The session token of the current credentials, if set.
    #[maybe_async::maybe_async]
    pub async fn session_token(&self) -> Result<Option<String>, S3Error> {
        Ok(self.credentials().await?.session_token)
    }

    /// A snapshot (clone) of the credentials behind the async RwLock.
    #[maybe_async::async_impl]
    pub async fn credentials(&self) -> Result<Credentials, S3Error> {
        Ok(self.credentials.read().await.clone())
    }

    /// A snapshot (clone) of the credentials behind the std RwLock; fails
    /// with `CredentialsReadLock` if the lock is poisoned.
    #[maybe_async::sync_impl]
    pub fn credentials(&self) -> Result<Credentials, S3Error> {
        match self.credentials.read() {
            Ok(credentials) => Ok(credentials.clone()),
            Err(_) => Err(S3Error::CredentialsReadLock),
        }
    }

    /// Replace the bucket's credentials. Note this installs a fresh
    /// `Arc<RwLock<..>>`, so clones of the bucket made earlier keep the old
    /// credentials.
    pub fn set_credentials(&mut self, credentials: Credentials) {
        self.credentials = Arc::new(RwLock::new(credentials));
    }
3056
    /// Add a custom header sent with every request made through this bucket.
    ///
    /// # Panics
    /// Panics if `key` is not a valid header name or `value` is not a valid
    /// header value.
    pub fn add_header(&mut self, key: &str, value: &str) {
        self.extra_headers
            .insert(HeaderName::from_str(key).unwrap(), value.parse().unwrap())
    }

    /// The extra headers attached to every request.
    pub fn extra_headers(&self) -> &HeaderMap {
        &self.extra_headers
    }

    /// Mutable access to the extra headers attached to every request.
    pub fn extra_headers_mut(&mut self) -> &mut HeaderMap {
        &mut self.extra_headers
    }

    /// Add a query-string parameter appended to every request.
    pub fn add_query(&mut self, key: &str, value: &str) {
        self.extra_query.insert(key.into(), value.into());
    }

    /// The extra query parameters appended to every request.
    pub fn extra_query(&self) -> &Query {
        &self.extra_query
    }

    /// Mutable access to the extra query parameters.
    pub fn extra_query_mut(&mut self) -> &mut Query {
        &mut self.extra_query
    }

    /// The configured per-request timeout, if any.
    pub fn request_timeout(&self) -> Option<Duration> {
        self.request_timeout
    }
3104}
3105
3106#[cfg(test)]
3107mod test {
3108
3109 use crate::BucketConfiguration;
3110 use crate::Tag;
3111 use crate::creds::Credentials;
3112 use crate::post_policy::{PostPolicyField, PostPolicyValue};
3113 use crate::region::Region;
3114 use crate::serde_types::{
3115 BucketLifecycleConfiguration, CorsConfiguration, CorsRule, Expiration, LifecycleFilter,
3116 LifecycleRule,
3117 };
3118 use crate::{Bucket, PostPolicy};
3119 use http::header::{CACHE_CONTROL, HeaderMap, HeaderName, HeaderValue};
3120 use std::env;
3121 #[cfg(all(not(feature = "sync"), feature = "with-tokio"))]
3122 use std::io::{Read, Write};
3123 #[cfg(all(not(feature = "sync"), feature = "with-tokio"))]
3124 use std::net::TcpListener;
3125 #[cfg(all(not(feature = "sync"), feature = "with-tokio"))]
3126 use std::sync::{
3127 Arc,
3128 atomic::{AtomicUsize, Ordering},
3129 };
3130 #[cfg(all(not(feature = "sync"), feature = "with-tokio"))]
3131 use std::thread;
3132
    /// Initialize env_logger once per test binary; the error from a repeated
    /// initialization is deliberately ignored.
    fn init() {
        let _ = env_logger::builder().is_test(true).try_init();
    }
3136
    /// Regression test: a 404 from `object_exists` is a definitive answer
    /// (object absent) and must not trigger a retry. A one-shot TCP server
    /// counts how many requests it receives.
    #[cfg(all(not(feature = "sync"), feature = "with-tokio"))]
    #[tokio::test]
    async fn test_object_exists_404_does_not_retry() {
        init();

        // Bind an ephemeral port; the server thread serves exactly one
        // connection and replies 404 with an empty body.
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let endpoint = format!("http://{}", listener.local_addr().unwrap());
        let requests = Arc::new(AtomicUsize::new(0));
        let request_count = Arc::clone(&requests);

        let server = thread::spawn(move || {
            let (mut stream, _) = listener.accept().unwrap();
            request_count.fetch_add(1, Ordering::SeqCst);

            let mut buffer = [0; 2048];
            let _ = stream.read(&mut buffer).unwrap();
            stream
                .write_all(
                    b"HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
                )
                .unwrap();
        });

        crate::set_retries(1);

        let credentials = Credentials::new(
            Some("test_access_key"),
            Some("test_secret_key"),
            None,
            None,
            None,
        )
        .unwrap();
        let bucket = Bucket::new(
            "test-bucket",
            Region::Custom {
                region: "us-east-1".to_owned(),
                endpoint,
            },
            credentials,
        )
        .unwrap()
        .with_path_style();

        let exists = bucket.object_exists("/missing.txt").await.unwrap();

        // NOTE(review): this sets the same value as above; presumably meant
        // to restore a default retry count after the test — confirm.
        crate::set_retries(1);
        server.join().unwrap();

        assert!(!exists);
        assert_eq!(requests.load(Ordering::SeqCst), 1);
    }
3189
    /// The correctly spelled `set_dangerous_config` and its deprecated
    /// misspelled alias `set_dangereous_config` must configure identical TLS
    /// options (invalid certs / hostnames accepted).
    #[test]
    #[cfg(any(feature = "tokio-native-tls", feature = "tokio-rustls-tls"))]
    #[allow(deprecated)]
    fn dangerous_config_correct_spelling_and_compat_alias_set_same_options() {
        let bucket = Bucket::new(
            "test-bucket",
            Region::Custom {
                region: "test-region".to_owned(),
                endpoint: "https://example.com".to_owned(),
            },
            Credentials::anonymous().unwrap(),
        )
        .unwrap();

        let corrected = bucket.set_dangerous_config(true, true).unwrap();
        let deprecated_alias = bucket.set_dangereous_config(true, true).unwrap();

        assert!(corrected.client_options.accept_invalid_certs);
        assert!(corrected.client_options.accept_invalid_hostnames);
        assert_eq!(
            corrected.client_options.accept_invalid_certs,
            deprecated_alias.client_options.accept_invalid_certs
        );
        assert_eq!(
            corrected.client_options.accept_invalid_hostnames,
            deprecated_alias.client_options.accept_invalid_hostnames
        );
    }
    // The helpers below read provider-specific credentials from environment
    // variables; they panic (unwrap) when a variable is missing, which is
    // acceptable for these opt-in integration tests.

    /// AWS (EU) test credentials from the environment.
    fn test_aws_credentials() -> Credentials {
        Credentials::new(
            Some(&env::var("EU_AWS_ACCESS_KEY_ID").unwrap()),
            Some(&env::var("EU_AWS_SECRET_ACCESS_KEY").unwrap()),
            None,
            None,
            None,
        )
        .unwrap()
    }

    /// Google Cloud Storage test credentials from the environment.
    fn test_gc_credentials() -> Credentials {
        Credentials::new(
            Some(&env::var("GC_ACCESS_KEY_ID").unwrap()),
            Some(&env::var("GC_SECRET_ACCESS_KEY").unwrap()),
            None,
            None,
            None,
        )
        .unwrap()
    }

    /// Wasabi test credentials from the environment.
    fn test_wasabi_credentials() -> Credentials {
        Credentials::new(
            Some(&env::var("WASABI_ACCESS_KEY_ID").unwrap()),
            Some(&env::var("WASABI_SECRET_ACCESS_KEY").unwrap()),
            None,
            None,
            None,
        )
        .unwrap()
    }

    /// MinIO test credentials from the environment.
    fn test_minio_credentials() -> Credentials {
        Credentials::new(
            Some(&env::var("MINIO_ACCESS_KEY_ID").unwrap()),
            Some(&env::var("MINIO_SECRET_ACCESS_KEY").unwrap()),
            None,
            None,
            None,
        )
        .unwrap()
    }

    /// DigitalOcean Spaces test credentials from the environment.
    fn test_digital_ocean_credentials() -> Credentials {
        Credentials::new(
            Some(&env::var("DIGITAL_OCEAN_ACCESS_KEY_ID").unwrap()),
            Some(&env::var("DIGITAL_OCEAN_SECRET_ACCESS_KEY").unwrap()),
            None,
            None,
            None,
        )
        .unwrap()
    }

    /// Cloudflare R2 test credentials from the environment.
    fn test_r2_credentials() -> Credentials {
        Credentials::new(
            Some(&env::var("R2_ACCESS_KEY_ID").unwrap()),
            Some(&env::var("R2_SECRET_ACCESS_KEY").unwrap()),
            None,
            None,
            None,
        )
        .unwrap()
    }
3283
    /// Bucket on AWS eu-central-1 used by the integration tests.
    fn test_aws_bucket() -> Box<Bucket> {
        Bucket::new(
            "rust-s3-test",
            "eu-central-1".parse().unwrap(),
            test_aws_credentials(),
        )
        .unwrap()
    }

    /// Bucket on Wasabi eu-central-1 used by the integration tests.
    fn test_wasabi_bucket() -> Box<Bucket> {
        Bucket::new(
            "rust-s3",
            "wa-eu-central-1".parse().unwrap(),
            test_wasabi_credentials(),
        )
        .unwrap()
    }

    /// Bucket on Google Cloud Storage; GCS is addressed as a custom endpoint
    /// and uses the v1 listing API here.
    fn test_gc_bucket() -> Box<Bucket> {
        let mut bucket = Bucket::new(
            "rust-s3",
            Region::Custom {
                region: "us-east1".to_owned(),
                endpoint: "https://storage.googleapis.com".to_owned(),
            },
            test_gc_credentials(),
        )
        .unwrap();
        bucket.set_listobjects_v1();
        bucket
    }

    /// Bucket on a local MinIO instance (path-style addressing required).
    fn test_minio_bucket() -> Box<Bucket> {
        Bucket::new(
            "rust-s3",
            Region::Custom {
                region: "us-east-1".to_owned(),
                endpoint: "http://localhost:9000".to_owned(),
            },
            test_minio_credentials(),
        )
        .unwrap()
        .with_path_style()
    }

    /// Bucket with fixed dummy credentials, for deterministic presign tests.
    fn test_presign_bucket() -> Box<Bucket> {
        Bucket::new(
            "rust-s3",
            Region::Custom {
                region: "us-east-1".to_owned(),
                endpoint: "http://localhost:9000".to_owned(),
            },
            Credentials::new(
                Some("test_access_key"),
                Some("test_secret_key"),
                None,
                None,
                None,
            )
            .unwrap(),
        )
        .unwrap()
        .with_path_style()
    }

    /// Bucket on DigitalOcean Spaces (Frankfurt).
    #[allow(dead_code)]
    fn test_digital_ocean_bucket() -> Box<Bucket> {
        Bucket::new("rust-s3", Region::DoFra1, test_digital_ocean_credentials()).unwrap()
    }

    /// Bucket on Cloudflare R2 for the given test account.
    fn test_r2_bucket() -> Box<Bucket> {
        Bucket::new(
            "rust-s3",
            Region::R2 {
                account_id: "f048f3132be36fa1aaa8611992002b3f".to_string(),
            },
            test_r2_credentials(),
        )
        .unwrap()
    }
3366
3367 fn object(size: u32) -> Vec<u8> {
3368 (0..size).map(|_| 33).collect()
3369 }
3370
    /// Shared integration-test body: PUT an object, GET it back (whole and
    /// ranged), check existence probes, optionally HEAD it, then DELETE it.
    /// `head` lets providers without HEAD support skip that part.
    #[maybe_async::maybe_async]
    async fn put_head_get_delete_object(bucket: Bucket, head: bool) {
        let s3_path = "/+test.file";
        let non_existant_path = "/+non_existant.file";
        let test: Vec<u8> = object(3072);

        let response_data = bucket.put_object(s3_path, &test).await.unwrap();
        assert_eq!(response_data.status_code(), 200);

        // Round-trip the full object.
        let response_data = bucket.get_object(s3_path).await.unwrap();
        assert_eq!(response_data.status_code(), 200);
        assert_eq!(test, response_data.as_slice());

        let exists = bucket.object_exists(s3_path).await.unwrap();
        assert!(exists);

        let not_exists = bucket.object_exists(non_existant_path).await.unwrap();
        assert!(!not_exists);

        // Ranged GET: byte range 100..=1000 (inclusive end), 206 Partial Content.
        let response_data = bucket
            .get_object_range(s3_path, 100, Some(1000))
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 206);
        assert_eq!(test[100..1001].to_vec(), response_data.as_slice());

        // Degenerate range start == end yields a single byte.
        let response_data = bucket
            .get_object_range(s3_path, 100, Some(100))
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 206);
        assert_eq!(vec![test[100]], response_data.as_slice());

        if head {
            let (_head_object_result, code) = bucket.head_object(s3_path).await.unwrap();
            assert_eq!(code, 200);
        }

        let response_data = bucket.delete_object(s3_path).await.unwrap();
        assert_eq!(response_data.status_code(), 204);
    }
3420
    /// Shared integration-test body: PUT with custom headers (Cache-Control
    /// plus an arbitrary header), verify the value comes back on HEAD, then
    /// clean up.
    #[maybe_async::maybe_async]
    async fn put_head_delete_object_with_headers(bucket: Bucket) {
        let s3_path = "/+test.file";
        let non_existant_path = "/+non_existant.file";
        let test: Vec<u8> = object(3072);
        let header_value = "max-age=42";

        let mut custom_headers = HeaderMap::new();
        custom_headers.insert(CACHE_CONTROL, HeaderValue::from_static(header_value));
        custom_headers.insert(
            HeaderName::from_static("test-key"),
            "value".parse().unwrap(),
        );

        let response_data = bucket
            .put_object_with_headers(s3_path, &test, Some(custom_headers.clone()))
            .await
            .expect("Put object with custom headers failed");
        assert_eq!(response_data.status_code(), 200);

        let response_data = bucket.get_object(s3_path).await.unwrap();
        assert_eq!(response_data.status_code(), 200);
        assert_eq!(test, response_data.as_slice());

        let exists = bucket.object_exists(s3_path).await.unwrap();
        assert!(exists);

        let not_exists = bucket.object_exists(non_existant_path).await.unwrap();
        assert!(!not_exists);

        let response_data = bucket
            .get_object_range(s3_path, 100, Some(1000))
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 206);
        assert_eq!(test[100..1001].to_vec(), response_data.as_slice());

        // The stored Cache-Control header must round-trip through HEAD.
        let (head_object_result, code) = bucket.head_object(s3_path).await.unwrap();
        assert_eq!(code, 200);
        assert_eq!(
            head_object_result.cache_control,
            Some(header_value.to_string())
        );

        let response_data = bucket.delete_object(s3_path).await.unwrap();
        assert_eq!(response_data.status_code(), 204);
    }
3469
    /// Integration test (AWS): tag round-trip — a fresh object has no tags,
    /// tags can be set, and the object is cleaned up. Ignored by default;
    /// needs live credentials.
    #[ignore]
    #[cfg(feature = "tags")]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn test_tagging_aws() {
        let bucket = test_aws_bucket();
        let _target_tags = vec![
            Tag {
                key: "Tag1".to_string(),
                value: "Value1".to_string(),
            },
            Tag {
                key: "Tag2".to_string(),
                value: "Value2".to_string(),
            },
        ];
        let empty_tags: Vec<Tag> = Vec::new();
        let response_data = bucket
            .put_object("tagging_test", b"Gimme tags")
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        let (tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        assert_eq!(tags, empty_tags);
        let response_data = bucket
            .put_object_tagging("tagging_test", &[("Tag1", "Value1"), ("Tag2", "Value2")])
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        let (_tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        let _response_data = bucket.delete_object("tagging_test").await.unwrap();
    }
3510
    /// Integration test (MinIO): same tag round-trip as `test_tagging_aws`
    /// against a local MinIO instance. Ignored by default; needs live
    /// credentials.
    #[ignore]
    #[cfg(feature = "tags")]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn test_tagging_minio() {
        let bucket = test_minio_bucket();
        let _target_tags = vec![
            Tag {
                key: "Tag1".to_string(),
                value: "Value1".to_string(),
            },
            Tag {
                key: "Tag2".to_string(),
                value: "Value2".to_string(),
            },
        ];
        let empty_tags: Vec<Tag> = Vec::new();
        let response_data = bucket
            .put_object("tagging_test", b"Gimme tags")
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        let (tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        assert_eq!(tags, empty_tags);
        let response_data = bucket
            .put_object_tagging("tagging_test", &[("Tag1", "Value1"), ("Tag2", "Value2")])
            .await
            .unwrap();
        assert_eq!(response_data.status_code(), 200);
        let (_tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
        let _response_data = bucket.delete_object("tagging_test").await.unwrap();
    }
3551
    // Per-provider wrappers around the shared streaming round-trip test body.
    // All are ignored by default and require live credentials.

    /// Streaming multipart round-trip against AWS.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_big_aws_put_head_get_delete_object() {
        streaming_test_put_get_delete_big_object(*test_aws_bucket()).await;
    }

    /// Streaming multipart round-trip against Google Cloud Storage
    /// (excluded under tokio-rustls-tls).
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(
            all(
                not(feature = "sync"),
                not(feature = "tokio-rustls-tls"),
                feature = "with-tokio"
            ),
            tokio::test
        ),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_big_gc_put_head_get_delete_object() {
        streaming_test_put_get_delete_big_object(*test_gc_bucket()).await;
    }

    /// Streaming multipart round-trip against a local MinIO instance.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_big_minio_put_head_get_delete_object() {
        streaming_test_put_get_delete_big_object(*test_minio_bucket()).await;
    }

    /// Streaming multipart round-trip against Cloudflare R2.
    #[ignore]
    #[maybe_async::test(
        feature = "sync",
        async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
        async(
            all(not(feature = "sync"), feature = "with-async-std"),
            async_std::test
        )
    )]
    async fn streaming_big_r2_put_head_get_delete_object() {
        streaming_test_put_get_delete_big_object(*test_r2_bucket()).await;
    }
3610
3611 #[maybe_async::maybe_async]
3613 async fn streaming_test_put_get_delete_big_object(bucket: Bucket) {
3614 #[cfg(feature = "with-async-std")]
3615 use async_std::fs::File;
3616 #[cfg(feature = "with-async-std")]
3617 use async_std::io::WriteExt;
3618 #[cfg(feature = "with-async-std")]
3619 use async_std::stream::StreamExt;
3620 #[cfg(feature = "with-tokio")]
3621 use futures_util::StreamExt;
3622 #[cfg(not(any(feature = "with-tokio", feature = "with-async-std")))]
3623 use std::fs::File;
3624 #[cfg(not(any(feature = "with-tokio", feature = "with-async-std")))]
3625 use std::io::Write;
3626 #[cfg(feature = "with-tokio")]
3627 use tokio::fs::File;
3628 #[cfg(feature = "with-tokio")]
3629 use tokio::io::AsyncWriteExt;
3630
3631 init();
3632 let remote_path = "+stream_test_big";
3633 let local_path = "+stream_test_big";
3634 std::fs::remove_file(remote_path).unwrap_or(());
3635 let content: Vec<u8> = object(20_000_000);
3636
3637 let mut file = File::create(local_path).await.unwrap();
3638 file.write_all(&content).await.unwrap();
3639 file.flush().await.unwrap();
3640 let mut reader = File::open(local_path).await.unwrap();
3641
3642 let response = bucket
3643 .put_object_stream(&mut reader, remote_path)
3644 .await
3645 .unwrap();
3646 #[cfg(not(feature = "sync"))]
3647 assert_eq!(response.status_code(), 200);
3648 #[cfg(feature = "sync")]
3649 assert_eq!(response, 200);
3650 let mut writer = Vec::new();
3651 let code = bucket
3652 .get_object_to_writer(remote_path, &mut writer)
3653 .await
3654 .unwrap();
3655 assert_eq!(code, 200);
3656 assert_eq!(content.len(), writer.len());
3658 assert_eq!(content.len(), 20_000_000);
3659
3660 #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
3661 {
3662 let mut response_data_stream = bucket.get_object_stream(remote_path).await.unwrap();
3663
3664 let mut bytes = vec![];
3665
3666 while let Some(chunk) = response_data_stream.bytes().next().await {
3667 bytes.push(chunk)
3668 }
3669 assert_ne!(bytes.len(), 0);
3670 }
3671
3672 let response_data = bucket.delete_object(remote_path).await.unwrap();
3673 assert_eq!(response_data.status_code(), 204);
3674 std::fs::remove_file(local_path).unwrap_or(());
3675 }
3676
3677 #[ignore]
3678 #[maybe_async::test(
3679 feature = "sync",
3680 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3681 async(
3682 all(not(feature = "sync"), feature = "with-async-std"),
3683 async_std::test
3684 )
3685 )]
3686 async fn streaming_aws_put_head_get_delete_object() {
3687 streaming_test_put_get_delete_small_object(test_aws_bucket()).await;
3688 }
3689
3690 #[ignore]
3691 #[maybe_async::test(
3692 feature = "sync",
3693 async(
3694 all(
3695 not(feature = "sync"),
3696 not(feature = "tokio-rustls-tls"),
3697 feature = "with-tokio"
3698 ),
3699 tokio::test
3700 ),
3701 async(
3702 all(not(feature = "sync"), feature = "with-async-std"),
3703 async_std::test
3704 )
3705 )]
3706 async fn streaming_gc_put_head_get_delete_object() {
3707 streaming_test_put_get_delete_small_object(test_gc_bucket()).await;
3708 }
3709
3710 #[ignore]
3711 #[maybe_async::test(
3712 feature = "sync",
3713 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3714 async(
3715 all(not(feature = "sync"), feature = "with-async-std"),
3716 async_std::test
3717 )
3718 )]
3719 async fn streaming_r2_put_head_get_delete_object() {
3720 streaming_test_put_get_delete_small_object(test_r2_bucket()).await;
3721 }
3722
3723 #[ignore]
3724 #[maybe_async::test(
3725 feature = "sync",
3726 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3727 async(
3728 all(not(feature = "sync"), feature = "with-async-std"),
3729 async_std::test
3730 )
3731 )]
3732 async fn streaming_minio_put_head_get_delete_object() {
3733 streaming_test_put_get_delete_small_object(test_minio_bucket()).await;
3734 }
3735
3736 #[maybe_async::maybe_async]
3737 async fn streaming_test_put_get_delete_small_object(bucket: Box<Bucket>) {
3738 init();
3739 let remote_path = "+stream_test_small";
3740 let content: Vec<u8> = object(1000);
3741 #[cfg(feature = "with-tokio")]
3742 let mut reader = std::io::Cursor::new(&content);
3743 #[cfg(feature = "with-async-std")]
3744 let mut reader = async_std::io::Cursor::new(&content);
3745 #[cfg(feature = "sync")]
3746 let mut reader = std::io::Cursor::new(&content);
3747
3748 let response = bucket
3749 .put_object_stream(&mut reader, remote_path)
3750 .await
3751 .unwrap();
3752 #[cfg(not(feature = "sync"))]
3753 assert_eq!(response.status_code(), 200);
3754 #[cfg(feature = "sync")]
3755 assert_eq!(response, 200);
3756 let mut writer = Vec::new();
3757 let code = bucket
3758 .get_object_to_writer(remote_path, &mut writer)
3759 .await
3760 .unwrap();
3761 assert_eq!(code, 200);
3762 assert_eq!(content, writer);
3763
3764 let response_data = bucket.delete_object(remote_path).await.unwrap();
3765 assert_eq!(response_data.status_code(), 204);
3766 }
3767
3768 #[cfg(feature = "blocking")]
3769 fn put_head_get_list_delete_object_blocking(bucket: Bucket) {
3770 let s3_path = "/test_blocking.file";
3771 let s3_path_2 = "/test_blocking.file2";
3772 let s3_path_3 = "/test_blocking.file3";
3773 let test: Vec<u8> = object(3072);
3774
3775 let response_data = bucket.put_object_blocking(s3_path, &test).unwrap();
3777 assert_eq!(response_data.status_code(), 200);
3778
3779 let response_data = bucket.get_object_blocking(s3_path).unwrap();
3781 assert_eq!(response_data.status_code(), 200);
3782 assert_eq!(test, response_data.as_slice());
3783
3784 let response_data = bucket
3786 .get_object_range_blocking(s3_path, 100, Some(1000))
3787 .unwrap();
3788 assert_eq!(response_data.status_code(), 206);
3789 assert_eq!(test[100..1001].to_vec(), response_data.as_slice());
3790
3791 let response_data = bucket
3793 .get_object_range_blocking(s3_path, 100, Some(100))
3794 .unwrap();
3795 assert_eq!(response_data.status_code(), 206);
3796 assert_eq!(vec![test[100]], response_data.as_slice());
3797
3798 let (head_object_result, code) = bucket.head_object_blocking(s3_path).unwrap();
3800 assert_eq!(code, 200);
3801 assert_eq!(
3802 head_object_result.content_type.unwrap(),
3803 "application/octet-stream".to_owned()
3804 );
3805 let response_data = bucket.put_object_blocking(s3_path_2, &test).unwrap();
3809 assert_eq!(response_data.status_code(), 200);
3810 let response_data = bucket.put_object_blocking(s3_path_3, &test).unwrap();
3811 assert_eq!(response_data.status_code(), 200);
3812
3813 let (result, code) = bucket
3815 .list_page_blocking(
3816 "test_blocking.".to_string(),
3817 Some("/".to_string()),
3818 None,
3819 None,
3820 Some(2),
3821 )
3822 .unwrap();
3823 assert_eq!(code, 200);
3824 assert_eq!(result.contents.len(), 2);
3825 assert_eq!(result.contents[0].key, s3_path[1..]);
3826 assert_eq!(result.contents[1].key, s3_path_2[1..]);
3827
3828 let cont_token = result.next_continuation_token.unwrap();
3829
3830 let (result, code) = bucket
3831 .list_page_blocking(
3832 "test_blocking.".to_string(),
3833 Some("/".to_string()),
3834 Some(cont_token),
3835 None,
3836 Some(2),
3837 )
3838 .unwrap();
3839 assert_eq!(code, 200);
3840 assert_eq!(result.contents.len(), 1);
3841 assert_eq!(result.contents[0].key, s3_path_3[1..]);
3842 assert!(result.next_continuation_token.is_none());
3843
3844 let response_data = bucket.delete_object_blocking(s3_path).unwrap();
3846 assert_eq!(code, 200);
3847 let response_data = bucket.delete_object_blocking(s3_path_2).unwrap();
3848 assert_eq!(code, 200);
3849 let response_data = bucket.delete_object_blocking(s3_path_3).unwrap();
3850 assert_eq!(code, 200);
3851 }
3852
3853 #[ignore]
3854 #[cfg(all(
3855 any(feature = "with-tokio", feature = "with-async-std"),
3856 feature = "blocking"
3857 ))]
3858 #[test]
3859 fn aws_put_head_get_delete_object_blocking() {
3860 put_head_get_list_delete_object_blocking(*test_aws_bucket())
3861 }
3862
3863 #[ignore]
3864 #[cfg(all(
3865 any(feature = "with-tokio", feature = "with-async-std"),
3866 feature = "blocking"
3867 ))]
3868 #[test]
3869 fn gc_put_head_get_delete_object_blocking() {
3870 put_head_get_list_delete_object_blocking(*test_gc_bucket())
3871 }
3872
3873 #[ignore]
3874 #[cfg(all(
3875 any(feature = "with-tokio", feature = "with-async-std"),
3876 feature = "blocking"
3877 ))]
3878 #[test]
3879 fn wasabi_put_head_get_delete_object_blocking() {
3880 put_head_get_list_delete_object_blocking(*test_wasabi_bucket())
3881 }
3882
3883 #[ignore]
3884 #[cfg(all(
3885 any(feature = "with-tokio", feature = "with-async-std"),
3886 feature = "blocking"
3887 ))]
3888 #[test]
3889 fn minio_put_head_get_delete_object_blocking() {
3890 put_head_get_list_delete_object_blocking(*test_minio_bucket())
3891 }
3892
3893 #[ignore]
3894 #[cfg(all(
3895 any(feature = "with-tokio", feature = "with-async-std"),
3896 feature = "blocking"
3897 ))]
3898 #[test]
3899 fn digital_ocean_put_head_get_delete_object_blocking() {
3900 put_head_get_list_delete_object_blocking(*test_digital_ocean_bucket())
3901 }
3902
3903 #[ignore]
3904 #[maybe_async::test(
3905 feature = "sync",
3906 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3907 async(
3908 all(not(feature = "sync"), feature = "with-async-std"),
3909 async_std::test
3910 )
3911 )]
3912 async fn aws_put_head_get_delete_object() {
3913 put_head_get_delete_object(*test_aws_bucket(), true).await;
3914 put_head_delete_object_with_headers(*test_aws_bucket()).await;
3915 }
3916
3917 #[ignore]
3918 #[maybe_async::test(
3919 feature = "sync",
3920 async(
3921 all(
3922 not(any(feature = "sync", feature = "tokio-rustls-tls")),
3923 feature = "with-tokio"
3924 ),
3925 tokio::test
3926 ),
3927 async(
3928 all(not(feature = "sync"), feature = "with-async-std"),
3929 async_std::test
3930 )
3931 )]
3932 async fn gc_test_put_head_get_delete_object() {
3933 put_head_get_delete_object(*test_gc_bucket(), true).await;
3934 put_head_delete_object_with_headers(*test_gc_bucket()).await;
3935 }
3936
3937 #[ignore]
3938 #[maybe_async::test(
3939 feature = "sync",
3940 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3941 async(
3942 all(not(feature = "sync"), feature = "with-async-std"),
3943 async_std::test
3944 )
3945 )]
3946 async fn wasabi_test_put_head_get_delete_object() {
3947 put_head_get_delete_object(*test_wasabi_bucket(), true).await;
3948 put_head_delete_object_with_headers(*test_wasabi_bucket()).await;
3949 }
3950
3951 #[ignore]
3952 #[maybe_async::test(
3953 feature = "sync",
3954 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3955 async(
3956 all(not(feature = "sync"), feature = "with-async-std"),
3957 async_std::test
3958 )
3959 )]
3960 async fn minio_test_put_head_get_delete_object() {
3961 put_head_get_delete_object(*test_minio_bucket(), true).await;
3962 put_head_delete_object_with_headers(*test_minio_bucket()).await;
3963 }
3964
3965 #[ignore]
3980 #[maybe_async::test(
3981 feature = "sync",
3982 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3983 async(
3984 all(not(feature = "sync"), feature = "with-async-std"),
3985 async_std::test
3986 )
3987 )]
3988 async fn r2_test_put_head_get_delete_object() {
3989 put_head_get_delete_object(*test_r2_bucket(), false).await;
3990 put_head_delete_object_with_headers(*test_r2_bucket()).await;
3991 }
3992
3993 #[maybe_async::maybe_async]
3994 async fn put_delete_objects(bucket: Bucket) {
3995 use crate::serde_types::ObjectIdentifier;
3996
3997 let paths = [
3998 "/+bulk_delete_1.file",
3999 "/+bulk_delete_2.file",
4000 "/+bulk_delete_3.file",
4001 ];
4002 let test: Vec<u8> = object(128);
4003
4004 for path in &paths {
4006 let response_data = bucket.put_object(*path, &test).await.unwrap();
4007 assert_eq!(response_data.status_code(), 200);
4008 }
4009
4010 let objects: Vec<ObjectIdentifier> =
4012 paths.iter().map(|p| ObjectIdentifier::new(*p)).collect();
4013 let result = bucket.delete_objects(objects).await.unwrap();
4014
4015 assert_eq!(result.deleted.len(), 3);
4016 assert!(result.errors.is_empty());
4017
4018 for path in &paths {
4020 let exists = bucket.object_exists(*path).await.unwrap();
4021 assert!(!exists);
4022 }
4023 }
4024
4025 #[ignore]
4026 #[maybe_async::test(
4027 feature = "sync",
4028 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4029 async(
4030 all(not(feature = "sync"), feature = "with-async-std"),
4031 async_std::test
4032 )
4033 )]
4034 async fn aws_test_delete_objects() {
4035 put_delete_objects(*test_aws_bucket()).await;
4036 }
4037
4038 #[ignore]
4039 #[maybe_async::test(
4040 feature = "sync",
4041 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4042 async(
4043 all(not(feature = "sync"), feature = "with-async-std"),
4044 async_std::test
4045 )
4046 )]
4047 async fn minio_test_delete_objects() {
4048 put_delete_objects(*test_minio_bucket()).await;
4049 }
4050
4051 #[maybe_async::test(
4052 feature = "sync",
4053 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4054 async(
4055 all(not(feature = "sync"), feature = "with-async-std"),
4056 async_std::test
4057 )
4058 )]
4059 async fn test_presign_put() {
4060 let s3_path = "/test/test.file";
4061 let bucket = test_presign_bucket();
4062
4063 let mut custom_headers = HeaderMap::new();
4064 custom_headers.insert(
4065 HeaderName::from_static("custom_header"),
4066 "custom_value".parse().unwrap(),
4067 );
4068
4069 let url = bucket
4070 .presign_put(s3_path, 86400, Some(custom_headers), None)
4071 .await
4072 .unwrap();
4073
4074 assert!(url.contains("custom_header%3Bhost"));
4075 assert!(url.contains("/test/test.file"))
4076 }
4077
4078 #[maybe_async::test(
4079 feature = "sync",
4080 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4081 async(
4082 all(not(feature = "sync"), feature = "with-async-std"),
4083 async_std::test
4084 )
4085 )]
4086 async fn test_presign_post() {
4087 use std::borrow::Cow;
4088
4089 let bucket = test_presign_bucket();
4090
4091 let policy = PostPolicy::new(86400)
4093 .condition(
4094 PostPolicyField::Key,
4095 PostPolicyValue::StartsWith(Cow::from("user/user1/")),
4096 )
4097 .unwrap();
4098
4099 let data = bucket.presign_post(policy).await.unwrap();
4100
4101 assert_eq!(data.url, "http://localhost:9000/rust-s3");
4102 assert_eq!(data.fields.len(), 6);
4103 assert_eq!(data.dynamic_fields.len(), 1);
4104 }
4105
4106 #[maybe_async::test(
4107 feature = "sync",
4108 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4109 async(
4110 all(not(feature = "sync"), feature = "with-async-std"),
4111 async_std::test
4112 )
4113 )]
4114 async fn test_presign_get() {
4115 let s3_path = "/test/test.file";
4116 let bucket = test_presign_bucket();
4117
4118 let url = bucket.presign_get(s3_path, 86400, None).await.unwrap();
4119 assert!(url.contains("/test/test.file?"))
4120 }
4121
4122 #[maybe_async::test(
4123 feature = "sync",
4124 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4125 async(
4126 all(not(feature = "sync"), feature = "with-async-std"),
4127 async_std::test
4128 )
4129 )]
4130 async fn test_presign_delete() {
4131 let s3_path = "/test/test.file";
4132 let bucket = test_presign_bucket();
4133
4134 let url = bucket.presign_delete(s3_path, 86400).await.unwrap();
4135 assert!(url.contains("/test/test.file?"))
4136 }
4137
4138 #[maybe_async::test(
4139 feature = "sync",
4140 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4141 async(
4142 all(not(feature = "sync"), feature = "with-async-std"),
4143 async_std::test
4144 )
4145 )]
4146 async fn test_presign_url_standard_ports() {
4147 let region_http_80 = Region::Custom {
4152 region: "eu-central-1".to_owned(),
4153 endpoint: "http://minio:80".to_owned(),
4154 };
4155 let credentials = Credentials::new(
4156 Some("test_access_key"),
4157 Some("test_secret_key"),
4158 None,
4159 None,
4160 None,
4161 )
4162 .unwrap();
4163 let bucket_http_80 = Bucket::new("test-bucket", region_http_80, credentials.clone())
4164 .unwrap()
4165 .with_path_style();
4166
4167 let presigned_url_80 = bucket_http_80
4168 .presign_get("/test.file", 3600, None)
4169 .await
4170 .unwrap();
4171 println!("Presigned URL with port 80: {}", presigned_url_80);
4172
4173 assert!(
4175 presigned_url_80.starts_with("http://minio:80/"),
4176 "URL must preserve port 80, got: {}",
4177 presigned_url_80
4178 );
4179
4180 let region_https_443 = Region::Custom {
4182 region: "eu-central-1".to_owned(),
4183 endpoint: "https://minio:443".to_owned(),
4184 };
4185 let bucket_https_443 = Bucket::new("test-bucket", region_https_443, credentials.clone())
4186 .unwrap()
4187 .with_path_style();
4188
4189 let presigned_url_443 = bucket_https_443
4190 .presign_get("/test.file", 3600, None)
4191 .await
4192 .unwrap();
4193 println!("Presigned URL with port 443: {}", presigned_url_443);
4194
4195 assert!(
4197 presigned_url_443.starts_with("https://minio:443/"),
4198 "URL must preserve port 443, got: {}",
4199 presigned_url_443
4200 );
4201
4202 let region_http_9000 = Region::Custom {
4204 region: "eu-central-1".to_owned(),
4205 endpoint: "http://minio:9000".to_owned(),
4206 };
4207 let bucket_http_9000 = Bucket::new("test-bucket", region_http_9000, credentials)
4208 .unwrap()
4209 .with_path_style();
4210
4211 let presigned_url_9000 = bucket_http_9000
4212 .presign_get("/test.file", 3600, None)
4213 .await
4214 .unwrap();
4215 assert!(
4216 presigned_url_9000.contains("minio:9000"),
4217 "Non-standard port should be preserved in URL"
4218 );
4219 }
4220
4221 #[maybe_async::test(
4222 feature = "sync",
4223 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4224 async(
4225 all(not(feature = "sync"), feature = "with-async-std"),
4226 async_std::test
4227 )
4228 )]
4229 #[ignore]
4230 async fn test_bucket_create_delete_default_region() {
4231 let config = BucketConfiguration::default();
4232 let response = Bucket::create(
4233 &uuid::Uuid::new_v4().to_string(),
4234 "us-east-1".parse().unwrap(),
4235 test_aws_credentials(),
4236 config,
4237 )
4238 .await
4239 .unwrap();
4240
4241 assert_eq!(&response.response_text, "");
4242
4243 assert_eq!(response.response_code, 200);
4244
4245 let response_code = response.bucket.delete().await.unwrap();
4246 assert!(response_code < 300);
4247 }
4248
4249 #[ignore]
4250 #[maybe_async::test(
4251 feature = "sync",
4252 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4253 async(
4254 all(not(feature = "sync"), feature = "with-async-std"),
4255 async_std::test
4256 )
4257 )]
4258 async fn test_bucket_create_delete_non_default_region() {
4259 let config = BucketConfiguration::default();
4260 let response = Bucket::create(
4261 &uuid::Uuid::new_v4().to_string(),
4262 "eu-central-1".parse().unwrap(),
4263 test_aws_credentials(),
4264 config,
4265 )
4266 .await
4267 .unwrap();
4268
4269 assert_eq!(&response.response_text, "");
4270
4271 assert_eq!(response.response_code, 200);
4272
4273 let response_code = response.bucket.delete().await.unwrap();
4274 assert!(response_code < 300);
4275 }
4276
4277 #[ignore]
4278 #[maybe_async::test(
4279 feature = "sync",
4280 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4281 async(
4282 all(not(feature = "sync"), feature = "with-async-std"),
4283 async_std::test
4284 )
4285 )]
4286 async fn test_bucket_create_delete_non_default_region_public() {
4287 let config = BucketConfiguration::public();
4288 let response = Bucket::create(
4289 &uuid::Uuid::new_v4().to_string(),
4290 "eu-central-1".parse().unwrap(),
4291 test_aws_credentials(),
4292 config,
4293 )
4294 .await
4295 .unwrap();
4296
4297 assert_eq!(&response.response_text, "");
4298
4299 assert_eq!(response.response_code, 200);
4300
4301 let response_code = response.bucket.delete().await.unwrap();
4302 assert!(response_code < 300);
4303 }
4304
4305 #[test]
4306 fn test_tag_has_key_and_value_functions() {
4307 let key = "key".to_owned();
4308 let value = "value".to_owned();
4309 let tag = Tag { key, value };
4310 assert_eq!["key", tag.key()];
4311 assert_eq!["value", tag.value()];
4312 }
4313
4314 #[test]
4315 #[ignore]
4316 fn test_builder_composition() {
4317 use std::time::Duration;
4318
4319 let bucket = Bucket::new(
4320 "test-bucket",
4321 "eu-central-1".parse().unwrap(),
4322 test_aws_credentials(),
4323 )
4324 .unwrap()
4325 .with_request_timeout(Duration::from_secs(10))
4326 .unwrap();
4327
4328 assert_eq!(bucket.request_timeout(), Some(Duration::from_secs(10)));
4329 }
4330
4331 #[maybe_async::test(
4332 feature = "sync",
4333 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4334 async(
4335 all(not(feature = "sync"), feature = "with-async-std"),
4336 async_std::test
4337 )
4338 )]
4339 #[ignore]
4340 async fn test_bucket_cors() {
4341 let bucket = test_aws_bucket();
4342 let rule = CorsRule::new(
4343 None,
4344 vec!["GET".to_string()],
4345 vec!["*".to_string()],
4346 None,
4347 None,
4348 None,
4349 );
4350 let expected_bucket_owner = "904662384344";
4351 let cors_config = CorsConfiguration::new(vec![rule]);
4352 let response = bucket
4353 .put_bucket_cors(expected_bucket_owner, &cors_config)
4354 .await
4355 .unwrap();
4356 assert_eq!(response.status_code(), 200);
4357
4358 let cors_response = bucket.get_bucket_cors(expected_bucket_owner).await.unwrap();
4359 assert_eq!(cors_response, cors_config);
4360
4361 let response = bucket
4362 .delete_bucket_cors(expected_bucket_owner)
4363 .await
4364 .unwrap();
4365 assert_eq!(response.status_code(), 204);
4366 }
4367
4368 #[maybe_async::test(
4369 feature = "sync",
4370 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4371 async(
4372 all(not(feature = "sync"), feature = "with-async-std"),
4373 async_std::test
4374 )
4375 )]
4376 #[ignore]
4377 async fn test_bucket_lifecycle() {
4378 let bucket = test_aws_bucket();
4379
4380 let rule = LifecycleRule::builder("Enabled")
4382 .id("test-rule")
4383 .filter(LifecycleFilter {
4384 prefix: Some("test/".to_string()),
4385 ..Default::default()
4386 })
4387 .expiration(Expiration {
4388 days: Some(1),
4389 ..Default::default()
4390 })
4391 .build();
4392
4393 let lifecycle_config = BucketLifecycleConfiguration::new(vec![rule]);
4394
4395 let response = bucket
4397 .put_bucket_lifecycle(lifecycle_config.clone())
4398 .await
4399 .unwrap();
4400 assert_eq!(response.status_code(), 200);
4401
4402 let retrieved_config = bucket.get_bucket_lifecycle().await.unwrap();
4404 assert_eq!(retrieved_config.rules.len(), 1);
4405 assert_eq!(retrieved_config.rules[0].id, Some("test-rule".to_string()));
4406 assert_eq!(retrieved_config.rules[0].status, "Enabled");
4407
4408 let response = bucket.delete_bucket_lifecycle().await.unwrap();
4410 assert_eq!(response.status_code(), 204);
4411 }
4412
4413 #[ignore]
4414 #[cfg(any(feature = "tokio-native-tls", feature = "tokio-rustls-tls"))]
4415 #[maybe_async::test(
4416 feature = "sync",
4417 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4418 async(
4419 all(not(feature = "sync"), feature = "with-async-std"),
4420 async_std::test
4421 )
4422 )]
4423 async fn test_bucket_exists_with_dangerous_config() {
4424 init();
4425
4426 let credentials = test_aws_credentials();
4434 let region = "eu-central-1".parse().unwrap();
4435 let bucket_name = "rust-s3-test";
4436
4437 let bucket = Bucket::new(bucket_name, region, credentials)
4439 .unwrap()
4440 .with_path_style();
4441
4442 let bucket = bucket.set_dangerous_config(true, true).unwrap();
4444
4445 let exists_result = bucket.exists().await;
4448
4449 assert!(
4451 exists_result.is_ok(),
4452 "Bucket::exists() failed with dangerous config"
4453 );
4454 let exists = exists_result.unwrap();
4455 assert!(exists, "Test bucket should exist");
4456
4457 let list_result = bucket.list("".to_string(), Some("/".to_string())).await;
4460 assert!(
4461 list_result.is_ok(),
4462 "List operation should work with dangerous config"
4463 );
4464 }
4465
4466 #[ignore]
4467 #[maybe_async::test(
4468 feature = "sync",
4469 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
4470 async(
4471 all(not(feature = "sync"), feature = "with-async-std"),
4472 async_std::test
4473 )
4474 )]
4475 async fn test_bucket_exists_without_dangerous_config() {
4476 init();
4477
4478 let credentials = test_aws_credentials();
4480 let region = "eu-central-1".parse().unwrap();
4481 let bucket_name = "rust-s3-test";
4482
4483 let bucket = Bucket::new(bucket_name, region, credentials)
4485 .unwrap()
4486 .with_path_style();
4487
4488 let exists_result = bucket.exists().await;
4490 assert!(
4491 exists_result.is_ok(),
4492 "Bucket::exists() should work without dangerous config"
4493 );
4494 let exists = exists_result.unwrap();
4495 assert!(exists, "Test bucket should exist");
4496 }
4497}