1#[cfg(feature = "blocking")]
37use block_on_proc::block_on;
38#[cfg(feature = "tags")]
39use minidom::Element;
40use std::collections::HashMap;
41use std::time::Duration;
42
43use crate::bucket_ops::{BucketConfiguration, CreateBucketResponse};
44use crate::command::{Command, Multipart};
45use crate::creds::Credentials;
46use crate::region::Region;
47#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
48use crate::request::ResponseDataStream;
49#[cfg(feature = "with-tokio")]
50use crate::request::tokio_backend::ClientOptions;
51#[cfg(feature = "with-tokio")]
52use crate::request::tokio_backend::client;
53use crate::request::{Request as _, ResponseData};
54use std::str::FromStr;
55use std::sync::Arc;
56
57#[cfg(feature = "with-tokio")]
58use tokio::sync::RwLock;
59
60#[cfg(feature = "with-async-std")]
61use async_std::sync::RwLock;
62
63#[cfg(feature = "sync")]
64use std::sync::RwLock;
65
66pub type Query = HashMap<String, String>;
67
68#[cfg(feature = "with-async-std")]
69use crate::request::async_std_backend::SurfRequest as RequestImpl;
70#[cfg(feature = "with-tokio")]
71use crate::request::tokio_backend::ReqwestRequest as RequestImpl;
72
73#[cfg(feature = "with-async-std")]
74use async_std::io::Write as AsyncWrite;
75#[cfg(feature = "with-tokio")]
76use tokio::io::AsyncWrite;
77
78#[cfg(feature = "sync")]
79use crate::request::blocking::AttoRequest as RequestImpl;
80use std::io::Read;
81
82#[cfg(feature = "with-tokio")]
83use tokio::io::AsyncRead;
84
85#[cfg(feature = "with-async-std")]
86use async_std::io::Read as AsyncRead;
87
88use crate::PostPolicy;
89use crate::error::S3Error;
90use crate::post_policy::PresignedPost;
91use crate::serde_types::{
92 BucketLifecycleConfiguration, BucketLocationResult, CompleteMultipartUploadData,
93 CorsConfiguration, GetObjectAttributesOutput, HeadObjectResult,
94 InitiateMultipartUploadResponse, ListBucketResult, ListMultipartUploadsResult, Part,
95};
96#[allow(unused_imports)]
97use crate::utils::{PutStreamResponse, error_from_response_data};
98use http::HeaderMap;
99use http::header::HeaderName;
100#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
101use sysinfo::{MemoryRefreshKind, System};
102
103pub const CHUNK_SIZE: usize = 8_388_608; const DEFAULT_REQUEST_TIMEOUT: Option<Duration> = Some(Duration::from_secs(60));
106
107#[derive(Debug, PartialEq, Eq)]
108pub struct Tag {
109 key: String,
110 value: String,
111}
112
113impl Tag {
114 pub fn key(&self) -> String {
115 self.key.to_owned()
116 }
117
118 pub fn value(&self) -> String {
119 self.value.to_owned()
120 }
121}
122
123#[derive(Clone, Debug)]
138pub struct Bucket {
139 pub name: String,
140 pub region: Region,
141 credentials: Arc<RwLock<Credentials>>,
142 pub extra_headers: HeaderMap,
143 pub extra_query: Query,
144 pub request_timeout: Option<Duration>,
145 path_style: bool,
146 listobjects_v2: bool,
147 #[cfg(feature = "with-tokio")]
148 http_client: reqwest::Client,
149 #[cfg(feature = "with-tokio")]
150 client_options: crate::request::tokio_backend::ClientOptions,
151}
152
153impl Bucket {
154 #[maybe_async::async_impl]
155 pub async fn credentials_refresh(&self) -> Result<(), S3Error> {
157 Ok(self.credentials.write().await.refresh()?)
158 }
159
160 #[maybe_async::sync_impl]
161 pub fn credentials_refresh(&self) -> Result<(), S3Error> {
163 match self.credentials.write() {
164 Ok(mut credentials) => Ok(credentials.refresh()?),
165 Err(_) => Err(S3Error::CredentialsWriteLock),
166 }
167 }
168
169 #[cfg(feature = "with-tokio")]
170 pub fn http_client(&self) -> reqwest::Client {
171 self.http_client.clone()
172 }
173}
174
175fn validate_expiry(expiry_secs: u32) -> Result<(), S3Error> {
176 if 604800 < expiry_secs {
177 return Err(S3Error::MaxExpiry(expiry_secs));
178 }
179 Ok(())
180}
181
182#[cfg_attr(all(feature = "with-tokio", feature = "blocking"), block_on("tokio"))]
183#[cfg_attr(
184 all(feature = "with-async-std", feature = "blocking"),
185 block_on("async-std")
186)]
187impl Bucket {
188 #[maybe_async::maybe_async]
216 pub async fn presign_get<S: AsRef<str>>(
217 &self,
218 path: S,
219 expiry_secs: u32,
220 custom_queries: Option<HashMap<String, String>>,
221 ) -> Result<String, S3Error> {
222 validate_expiry(expiry_secs)?;
223 let request = RequestImpl::new(
224 self,
225 path.as_ref(),
226 Command::PresignGet {
227 expiry_secs,
228 custom_queries,
229 },
230 )
231 .await?;
232 request.presigned().await
233 }
234
235 #[maybe_async::maybe_async]
262 #[allow(clippy::needless_lifetimes)]
263 pub async fn presign_post<'a>(
264 &self,
265 post_policy: PostPolicy<'a>,
266 ) -> Result<PresignedPost, S3Error> {
267 post_policy.sign(Box::new(self.clone())).await
268 }
269
270 #[maybe_async::maybe_async]
298 pub async fn presign_put<S: AsRef<str>>(
299 &self,
300 path: S,
301 expiry_secs: u32,
302 custom_headers: Option<HeaderMap>,
303 custom_queries: Option<HashMap<String, String>>,
304 ) -> Result<String, S3Error> {
305 validate_expiry(expiry_secs)?;
306 let request = RequestImpl::new(
307 self,
308 path.as_ref(),
309 Command::PresignPut {
310 expiry_secs,
311 custom_headers,
312 custom_queries,
313 },
314 )
315 .await?;
316 request.presigned().await
317 }
318
319 #[maybe_async::maybe_async]
340 pub async fn presign_delete<S: AsRef<str>>(
341 &self,
342 path: S,
343 expiry_secs: u32,
344 ) -> Result<String, S3Error> {
345 validate_expiry(expiry_secs)?;
346 let request =
347 RequestImpl::new(self, path.as_ref(), Command::PresignDelete { expiry_secs }).await?;
348 request.presigned().await
349 }
350
351 #[maybe_async::maybe_async]
384 pub async fn create(
385 name: &str,
386 region: Region,
387 credentials: Credentials,
388 config: BucketConfiguration,
389 ) -> Result<CreateBucketResponse, S3Error> {
390 let mut config = config;
391
392 let skip_constraint = std::env::var("RUST_S3_SKIP_LOCATION_CONSTRAINT")
396 .unwrap_or_default()
397 .to_lowercase();
398
399 if skip_constraint != "true" && skip_constraint != "1" {
400 config.set_region(region.clone());
401 }
402
403 let command = Command::CreateBucket { config };
404 let bucket = Bucket::new(name, region, credentials)?;
405 let request = RequestImpl::new(&bucket, "", command).await?;
406 let response_data = request.response_data(false).await?;
407 let response_text = response_data.as_str()?;
408 Ok(CreateBucketResponse {
409 bucket,
410 response_text: response_text.to_string(),
411 response_code: response_data.status_code(),
412 })
413 }
414
415 #[maybe_async::maybe_async]
449 pub async fn list_buckets(
450 region: Region,
451 credentials: Credentials,
452 ) -> Result<crate::bucket_ops::ListBucketsResponse, S3Error> {
453 let dummy_bucket = Bucket::new("", region, credentials)?.with_path_style();
454 dummy_bucket._list_buckets().await
455 }
456
457 #[maybe_async::maybe_async]
460 async fn _list_buckets(&self) -> Result<crate::bucket_ops::ListBucketsResponse, S3Error> {
461 let request = RequestImpl::new(self, "", Command::ListBuckets).await?;
462 let response = request.response_data(false).await?;
463
464 Ok(quick_xml::de::from_str::<
465 crate::bucket_ops::ListBucketsResponse,
466 >(response.as_str()?)?)
467 }
468
469 #[maybe_async::maybe_async]
501 pub async fn exists(&self) -> Result<bool, S3Error> {
502 let mut dummy_bucket = self.clone();
503 dummy_bucket.name = "".into();
504
505 let response = dummy_bucket._list_buckets().await?;
506
507 Ok(response
508 .bucket_names()
509 .collect::<std::collections::HashSet<String>>()
510 .contains(&self.name))
511 }
512
513 #[maybe_async::maybe_async]
546 pub async fn create_with_path_style(
547 name: &str,
548 region: Region,
549 credentials: Credentials,
550 config: BucketConfiguration,
551 ) -> Result<CreateBucketResponse, S3Error> {
552 let mut config = config;
553
554 let skip_constraint = std::env::var("RUST_S3_SKIP_LOCATION_CONSTRAINT")
558 .unwrap_or_default()
559 .to_lowercase();
560
561 if skip_constraint != "true" && skip_constraint != "1" {
562 config.set_region(region.clone());
563 }
564
565 let command = Command::CreateBucket { config };
566 let bucket = Bucket::new(name, region, credentials)?.with_path_style();
567 let request = RequestImpl::new(&bucket, "", command).await?;
568 let response_data = request.response_data(false).await?;
569 let response_text = response_data.to_string()?;
570
571 Ok(CreateBucketResponse {
572 bucket,
573 response_text,
574 response_code: response_data.status_code(),
575 })
576 }
577
578 #[maybe_async::maybe_async]
609 pub async fn delete(&self) -> Result<u16, S3Error> {
610 let command = Command::DeleteBucket;
611 let request = RequestImpl::new(self, "", command).await?;
612 let response_data = request.response_data(false).await?;
613 Ok(response_data.status_code())
614 }
615
616 pub fn new(
631 name: &str,
632 region: Region,
633 credentials: Credentials,
634 ) -> Result<Box<Bucket>, S3Error> {
635 #[cfg(feature = "with-tokio")]
636 let options = ClientOptions::default();
637
638 Ok(Box::new(Bucket {
639 name: name.into(),
640 region,
641 credentials: Arc::new(RwLock::new(credentials)),
642 extra_headers: HeaderMap::new(),
643 extra_query: HashMap::new(),
644 request_timeout: DEFAULT_REQUEST_TIMEOUT,
645 path_style: false,
646 listobjects_v2: true,
647 #[cfg(feature = "with-tokio")]
648 http_client: client(&options)?,
649 #[cfg(feature = "with-tokio")]
650 client_options: options,
651 }))
652 }
653
654 pub fn new_public(name: &str, region: Region) -> Result<Bucket, S3Error> {
666 #[cfg(feature = "with-tokio")]
667 let options = ClientOptions::default();
668
669 Ok(Bucket {
670 name: name.into(),
671 region,
672 credentials: Arc::new(RwLock::new(Credentials::anonymous()?)),
673 extra_headers: HeaderMap::new(),
674 extra_query: HashMap::new(),
675 request_timeout: DEFAULT_REQUEST_TIMEOUT,
676 path_style: false,
677 listobjects_v2: true,
678 #[cfg(feature = "with-tokio")]
679 http_client: client(&options)?,
680 #[cfg(feature = "with-tokio")]
681 client_options: options,
682 })
683 }
684
685 pub fn with_path_style(&self) -> Box<Bucket> {
686 Box::new(Bucket {
687 name: self.name.clone(),
688 region: self.region.clone(),
689 credentials: self.credentials.clone(),
690 extra_headers: self.extra_headers.clone(),
691 extra_query: self.extra_query.clone(),
692 request_timeout: self.request_timeout,
693 path_style: true,
694 listobjects_v2: self.listobjects_v2,
695 #[cfg(feature = "with-tokio")]
696 http_client: self.http_client(),
697 #[cfg(feature = "with-tokio")]
698 client_options: self.client_options.clone(),
699 })
700 }
701
702 pub fn with_extra_headers(&self, extra_headers: HeaderMap) -> Result<Bucket, S3Error> {
703 Ok(Bucket {
704 name: self.name.clone(),
705 region: self.region.clone(),
706 credentials: self.credentials.clone(),
707 extra_headers,
708 extra_query: self.extra_query.clone(),
709 request_timeout: self.request_timeout,
710 path_style: self.path_style,
711 listobjects_v2: self.listobjects_v2,
712 #[cfg(feature = "with-tokio")]
713 http_client: self.http_client(),
714 #[cfg(feature = "with-tokio")]
715 client_options: self.client_options.clone(),
716 })
717 }
718
719 pub fn with_extra_query(
720 &self,
721 extra_query: HashMap<String, String>,
722 ) -> Result<Bucket, S3Error> {
723 Ok(Bucket {
724 name: self.name.clone(),
725 region: self.region.clone(),
726 credentials: self.credentials.clone(),
727 extra_headers: self.extra_headers.clone(),
728 extra_query,
729 request_timeout: self.request_timeout,
730 path_style: self.path_style,
731 listobjects_v2: self.listobjects_v2,
732 #[cfg(feature = "with-tokio")]
733 http_client: self.http_client(),
734 #[cfg(feature = "with-tokio")]
735 client_options: self.client_options.clone(),
736 })
737 }
738
739 #[cfg(not(feature = "with-tokio"))]
740 pub fn with_request_timeout(&self, request_timeout: Duration) -> Result<Box<Bucket>, S3Error> {
741 Ok(Box::new(Bucket {
742 name: self.name.clone(),
743 region: self.region.clone(),
744 credentials: self.credentials.clone(),
745 extra_headers: self.extra_headers.clone(),
746 extra_query: self.extra_query.clone(),
747 request_timeout: Some(request_timeout),
748 path_style: self.path_style,
749 listobjects_v2: self.listobjects_v2,
750 }))
751 }
752
753 #[cfg(feature = "with-tokio")]
754 pub fn with_request_timeout(&self, request_timeout: Duration) -> Result<Box<Bucket>, S3Error> {
755 let options = ClientOptions {
756 request_timeout: Some(request_timeout),
757 ..Default::default()
758 };
759
760 Ok(Box::new(Bucket {
761 name: self.name.clone(),
762 region: self.region.clone(),
763 credentials: self.credentials.clone(),
764 extra_headers: self.extra_headers.clone(),
765 extra_query: self.extra_query.clone(),
766 request_timeout: Some(request_timeout),
767 path_style: self.path_style,
768 listobjects_v2: self.listobjects_v2,
769 #[cfg(feature = "with-tokio")]
770 http_client: client(&options)?,
771 #[cfg(feature = "with-tokio")]
772 client_options: options,
773 }))
774 }
775
776 pub fn with_listobjects_v1(&self) -> Bucket {
777 Bucket {
778 name: self.name.clone(),
779 region: self.region.clone(),
780 credentials: self.credentials.clone(),
781 extra_headers: self.extra_headers.clone(),
782 extra_query: self.extra_query.clone(),
783 request_timeout: self.request_timeout,
784 path_style: self.path_style,
785 listobjects_v2: false,
786 #[cfg(feature = "with-tokio")]
787 http_client: self.http_client(),
788 #[cfg(feature = "with-tokio")]
789 client_options: self.client_options.clone(),
790 }
791 }
792
793 #[cfg(any(feature = "tokio-native-tls", feature = "tokio-rustls-tls"))]
826 pub fn set_dangereous_config(
827 &self,
828 accept_invalid_certs: bool,
829 accept_invalid_hostnames: bool,
830 ) -> Result<Bucket, S3Error> {
831 let mut options = self.client_options.clone();
832 options.accept_invalid_certs = accept_invalid_certs;
833 options.accept_invalid_hostnames = accept_invalid_hostnames;
834
835 Ok(Bucket {
836 name: self.name.clone(),
837 region: self.region.clone(),
838 credentials: self.credentials.clone(),
839 extra_headers: self.extra_headers.clone(),
840 extra_query: self.extra_query.clone(),
841 request_timeout: self.request_timeout,
842 path_style: self.path_style,
843 listobjects_v2: self.listobjects_v2,
844 http_client: client(&options)?,
845 client_options: options,
846 })
847 }
848
849 #[cfg(feature = "with-tokio")]
850 pub fn set_proxy(&self, proxy: reqwest::Proxy) -> Result<Bucket, S3Error> {
851 let mut options = self.client_options.clone();
852 options.proxy = Some(proxy);
853
854 Ok(Bucket {
855 name: self.name.clone(),
856 region: self.region.clone(),
857 credentials: self.credentials.clone(),
858 extra_headers: self.extra_headers.clone(),
859 extra_query: self.extra_query.clone(),
860 request_timeout: self.request_timeout,
861 path_style: self.path_style,
862 listobjects_v2: self.listobjects_v2,
863 http_client: client(&options)?,
864 client_options: options,
865 })
866 }
867
868 #[maybe_async::maybe_async]
896 pub async fn copy_object_internal<F: AsRef<str>, T: AsRef<str>>(
897 &self,
898 from: F,
899 to: T,
900 ) -> Result<u16, S3Error> {
901 let fq_from = {
902 let from = from.as_ref();
903 let from = from.strip_prefix('/').unwrap_or(from);
904 format!("{bucket}/{path}", bucket = self.name(), path = from)
905 };
906 self.copy_object(fq_from, to).await
907 }
908
909 #[maybe_async::maybe_async]
910 async fn copy_object<F: AsRef<str>, T: AsRef<str>>(
911 &self,
912 from: F,
913 to: T,
914 ) -> Result<u16, S3Error> {
915 let command = Command::CopyObject {
916 from: from.as_ref(),
917 };
918 let request = RequestImpl::new(self, to.as_ref(), command).await?;
919 let response_data = request.response_data(false).await?;
920 Ok(response_data.status_code())
921 }
922
923 #[maybe_async::maybe_async]
955 pub async fn get_object<S: AsRef<str>>(&self, path: S) -> Result<ResponseData, S3Error> {
956 let command = Command::GetObject;
957 let request = RequestImpl::new(self, path.as_ref(), command).await?;
958 request.response_data(false).await
959 }
960
961 #[maybe_async::maybe_async]
962 pub async fn get_object_attributes<S: AsRef<str>>(
963 &self,
964 path: S,
965 expected_bucket_owner: &str,
966 version_id: Option<String>,
967 ) -> Result<GetObjectAttributesOutput, S3Error> {
968 let command = Command::GetObjectAttributes {
969 expected_bucket_owner: expected_bucket_owner.to_string(),
970 version_id,
971 };
972 let request = RequestImpl::new(self, path.as_ref(), command).await?;
973
974 let response = request.response_data(false).await?;
975
976 Ok(quick_xml::de::from_str::<GetObjectAttributesOutput>(
977 response.as_str()?,
978 )?)
979 }
980
981 #[maybe_async::maybe_async]
1024 pub async fn object_exists<S: AsRef<str>>(&self, path: S) -> Result<bool, S3Error> {
1025 let command = Command::HeadObject;
1026 let request = RequestImpl::new(self, path.as_ref(), command).await?;
1027 let response_data = match request.response_data(false).await {
1028 Ok(response_data) => response_data,
1029 Err(S3Error::HttpFailWithBody(status_code, error)) => {
1030 if status_code == 404 {
1031 return Ok(false);
1032 }
1033 return Err(S3Error::HttpFailWithBody(status_code, error));
1034 }
1035 Err(e) => return Err(e),
1036 };
1037 Ok(response_data.status_code() != 404)
1038 }
1039
1040 #[maybe_async::maybe_async]
1041 pub async fn put_bucket_cors(
1042 &self,
1043 expected_bucket_owner: &str,
1044 cors_config: &CorsConfiguration,
1045 ) -> Result<ResponseData, S3Error> {
1046 let command = Command::PutBucketCors {
1047 expected_bucket_owner: expected_bucket_owner.to_string(),
1048 configuration: cors_config.clone(),
1049 };
1050 let request = RequestImpl::new(self, "", command).await?;
1051 request.response_data(false).await
1052 }
1053
1054 #[maybe_async::maybe_async]
1055 pub async fn get_bucket_cors(
1056 &self,
1057 expected_bucket_owner: &str,
1058 ) -> Result<CorsConfiguration, S3Error> {
1059 let command = Command::GetBucketCors {
1060 expected_bucket_owner: expected_bucket_owner.to_string(),
1061 };
1062 let request = RequestImpl::new(self, "", command).await?;
1063 let response = request.response_data(false).await?;
1064 Ok(quick_xml::de::from_str::<CorsConfiguration>(
1065 response.as_str()?,
1066 )?)
1067 }
1068
1069 #[maybe_async::maybe_async]
1070 pub async fn delete_bucket_cors(
1071 &self,
1072 expected_bucket_owner: &str,
1073 ) -> Result<ResponseData, S3Error> {
1074 let command = Command::DeleteBucketCors {
1075 expected_bucket_owner: expected_bucket_owner.to_string(),
1076 };
1077 let request = RequestImpl::new(self, "", command).await?;
1078 request.response_data(false).await
1079 }
1080
1081 #[maybe_async::maybe_async]
1082 pub async fn get_bucket_lifecycle(&self) -> Result<BucketLifecycleConfiguration, S3Error> {
1083 let request = RequestImpl::new(self, "", Command::GetBucketLifecycle).await?;
1084 let response = request.response_data(false).await?;
1085 Ok(quick_xml::de::from_str::<BucketLifecycleConfiguration>(
1086 response.as_str()?,
1087 )?)
1088 }
1089
1090 #[maybe_async::maybe_async]
1091 pub async fn put_bucket_lifecycle(
1092 &self,
1093 lifecycle_config: BucketLifecycleConfiguration,
1094 ) -> Result<ResponseData, S3Error> {
1095 let command = Command::PutBucketLifecycle {
1096 configuration: lifecycle_config,
1097 };
1098 let request = RequestImpl::new(self, "", command).await?;
1099 request.response_data(false).await
1100 }
1101
1102 #[maybe_async::maybe_async]
1103 pub async fn delete_bucket_lifecycle(&self) -> Result<ResponseData, S3Error> {
1104 let request = RequestImpl::new(self, "", Command::DeleteBucketLifecycle).await?;
1105 request.response_data(false).await
1106 }
1107
1108 #[maybe_async::maybe_async]
1140 pub async fn get_object_torrent<S: AsRef<str>>(
1141 &self,
1142 path: S,
1143 ) -> Result<ResponseData, S3Error> {
1144 let command = Command::GetObjectTorrent;
1145 let request = RequestImpl::new(self, path.as_ref(), command).await?;
1146 request.response_data(false).await
1147 }
1148
1149 #[maybe_async::maybe_async]
1182 pub async fn get_object_range<S: AsRef<str>>(
1183 &self,
1184 path: S,
1185 start: u64,
1186 end: Option<u64>,
1187 ) -> Result<ResponseData, S3Error> {
1188 if let Some(end) = end {
1189 assert!(start < end);
1190 }
1191
1192 let command = Command::GetObjectRange { start, end };
1193 let request = RequestImpl::new(self, path.as_ref(), command).await?;
1194 request.response_data(false).await
1195 }
1196
1197 #[maybe_async::async_impl]
1238 pub async fn get_object_range_to_writer<T, S>(
1239 &self,
1240 path: S,
1241 start: u64,
1242 end: Option<u64>,
1243 writer: &mut T,
1244 ) -> Result<u16, S3Error>
1245 where
1246 T: AsyncWrite + Send + Unpin + ?Sized,
1247 S: AsRef<str>,
1248 {
1249 if let Some(end) = end {
1250 assert!(start < end);
1251 }
1252
1253 let command = Command::GetObjectRange { start, end };
1254 let request = RequestImpl::new(self, path.as_ref(), command).await?;
1255 request.response_data_to_writer(writer).await
1256 }
1257
1258 #[maybe_async::sync_impl]
1259 pub fn get_object_range_to_writer<T: std::io::Write + Send + ?Sized, S: AsRef<str>>(
1260 &self,
1261 path: S,
1262 start: u64,
1263 end: Option<u64>,
1264 writer: &mut T,
1265 ) -> Result<u16, S3Error> {
1266 if let Some(end) = end {
1267 assert!(start < end);
1268 }
1269
1270 let command = Command::GetObjectRange { start, end };
1271 let request = RequestImpl::new(self, path.as_ref(), command)?;
1272 request.response_data_to_writer(writer)
1273 }
1274
1275 #[maybe_async::async_impl]
1313 pub async fn get_object_to_writer<T: AsyncWrite + Send + Unpin + ?Sized, S: AsRef<str>>(
1314 &self,
1315 path: S,
1316 writer: &mut T,
1317 ) -> Result<u16, S3Error> {
1318 let command = Command::GetObject;
1319 let request = RequestImpl::new(self, path.as_ref(), command).await?;
1320 request.response_data_to_writer(writer).await
1321 }
1322
1323 #[maybe_async::sync_impl]
1324 pub fn get_object_to_writer<T: std::io::Write + Send + ?Sized, S: AsRef<str>>(
1325 &self,
1326 path: S,
1327 writer: &mut T,
1328 ) -> Result<u16, S3Error> {
1329 let command = Command::GetObject;
1330 let request = RequestImpl::new(self, path.as_ref(), command)?;
1331 request.response_data_to_writer(writer)
1332 }
1333
1334 #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
1376 pub async fn get_object_stream<S: AsRef<str>>(
1377 &self,
1378 path: S,
1379 ) -> Result<ResponseDataStream, S3Error> {
1380 let command = Command::GetObject;
1381 let request = RequestImpl::new(self, path.as_ref(), command).await?;
1382 request.response_data_to_stream().await
1383 }
1384
1385 #[maybe_async::async_impl]
1432 pub async fn put_object_stream<R: AsyncRead + Unpin + ?Sized>(
1433 &self,
1434 reader: &mut R,
1435 s3_path: impl AsRef<str>,
1436 ) -> Result<PutStreamResponse, S3Error> {
1437 self._put_object_stream_with_content_type(
1438 reader,
1439 s3_path.as_ref(),
1440 "application/octet-stream",
1441 )
1442 .await
1443 }
1444
1445 #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
1478 pub fn put_object_stream_builder<S: AsRef<str>>(
1479 &self,
1480 path: S,
1481 ) -> crate::put_object_request::PutObjectStreamRequest<'_> {
1482 crate::put_object_request::PutObjectStreamRequest::new(self, path)
1483 }
1484
1485 #[maybe_async::sync_impl]
1486 pub fn put_object_stream<R: Read>(
1487 &self,
1488 reader: &mut R,
1489 s3_path: impl AsRef<str>,
1490 ) -> Result<u16, S3Error> {
1491 self._put_object_stream_with_content_type(
1492 reader,
1493 s3_path.as_ref(),
1494 "application/octet-stream",
1495 )
1496 }
1497
1498 #[maybe_async::async_impl]
1549 pub async fn put_object_stream_with_content_type<R: AsyncRead + Unpin>(
1550 &self,
1551 reader: &mut R,
1552 s3_path: impl AsRef<str>,
1553 content_type: impl AsRef<str>,
1554 ) -> Result<PutStreamResponse, S3Error> {
1555 self._put_object_stream_with_content_type(reader, s3_path.as_ref(), content_type.as_ref())
1556 .await
1557 }
1558
1559 #[maybe_async::sync_impl]
1560 pub fn put_object_stream_with_content_type<R: Read>(
1561 &self,
1562 reader: &mut R,
1563 s3_path: impl AsRef<str>,
1564 content_type: impl AsRef<str>,
1565 ) -> Result<u16, S3Error> {
1566 self._put_object_stream_with_content_type(reader, s3_path.as_ref(), content_type.as_ref())
1567 }
1568
1569 #[maybe_async::async_impl]
1570 async fn make_multipart_request(
1571 &self,
1572 path: &str,
1573 chunk: Vec<u8>,
1574 part_number: u32,
1575 upload_id: &str,
1576 content_type: &str,
1577 ) -> Result<ResponseData, S3Error> {
1578 let command = Command::PutObject {
1579 content: &chunk,
1580 multipart: Some(Multipart::new(part_number, upload_id)), custom_headers: None,
1582 content_type,
1583 };
1584 let request = RequestImpl::new(self, path, command).await?;
1585 request.response_data(true).await
1586 }
1587
1588 #[maybe_async::async_impl]
1589 async fn _put_object_stream_with_content_type<R: AsyncRead + Unpin + ?Sized>(
1590 &self,
1591 reader: &mut R,
1592 s3_path: &str,
1593 content_type: &str,
1594 ) -> Result<PutStreamResponse, S3Error> {
1595 self._put_object_stream_with_content_type_and_headers(reader, s3_path, content_type, None)
1596 .await
1597 }
1598
1599 #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
1602 fn calculate_max_concurrent_chunks() -> usize {
1603 let mut system = System::new();
1605 system.refresh_memory_specifics(MemoryRefreshKind::everything());
1606
1607 let available_memory = system.available_memory();
1609
1610 if available_memory == 0 {
1612 return 3;
1613 }
1614
1615 let safety_factor = 3;
1619 let memory_per_chunk = CHUNK_SIZE as u64 * safety_factor;
1620
1621 let calculated_chunks = (available_memory / memory_per_chunk) as usize;
1623
1624 calculated_chunks.clamp(2, 100)
1628 }
1629
1630 #[maybe_async::async_impl]
1631 pub(crate) async fn _put_object_stream_with_content_type_and_headers<
1632 R: AsyncRead + Unpin + ?Sized,
1633 >(
1634 &self,
1635 reader: &mut R,
1636 s3_path: &str,
1637 content_type: &str,
1638 custom_headers: Option<http::HeaderMap>,
1639 ) -> Result<PutStreamResponse, S3Error> {
1640 let first_chunk = crate::utils::read_chunk_async(reader).await?;
1643 if first_chunk.len() < CHUNK_SIZE {
1645 let total_size = first_chunk.len();
1646 let mut builder = self
1648 .put_object_builder(s3_path, first_chunk.as_slice())
1649 .with_content_type(content_type);
1650
1651 if let Some(headers) = custom_headers {
1653 builder = builder.with_headers(headers);
1654 }
1655
1656 let response_data = builder.execute().await?;
1657 if response_data.status_code() >= 300 {
1658 return Err(error_from_response_data(response_data)?);
1659 }
1660 return Ok(PutStreamResponse::new(
1661 response_data.status_code(),
1662 total_size,
1663 ));
1664 }
1665
1666 let msg = self
1667 .initiate_multipart_upload(s3_path, content_type)
1668 .await?;
1669 let path = msg.key;
1670 let upload_id = &msg.upload_id;
1671
1672 let max_concurrent_chunks = Self::calculate_max_concurrent_chunks();
1674
1675 use futures_util::FutureExt;
1677 use futures_util::stream::{FuturesUnordered, StreamExt};
1678
1679 let mut part_number: u32 = 0;
1680 let mut total_size = 0;
1681 let mut etags = Vec::new();
1682 let mut active_uploads: FuturesUnordered<
1683 futures_util::future::BoxFuture<'_, (u32, Result<ResponseData, S3Error>)>,
1684 > = FuturesUnordered::new();
1685 let mut reading_done = false;
1686
1687 part_number += 1;
1689 total_size += first_chunk.len();
1690 if first_chunk.len() < CHUNK_SIZE {
1691 reading_done = true;
1692 }
1693
1694 let path_clone = path.clone();
1695 let upload_id_clone = upload_id.clone();
1696 let content_type_clone = content_type.to_string();
1697 let bucket_clone = self.clone();
1698
1699 active_uploads.push(
1700 async move {
1701 let result = bucket_clone
1702 .make_multipart_request(
1703 &path_clone,
1704 first_chunk,
1705 1,
1706 &upload_id_clone,
1707 &content_type_clone,
1708 )
1709 .await;
1710 (1, result)
1711 }
1712 .boxed(),
1713 );
1714
1715 while !active_uploads.is_empty() || !reading_done {
1717 while active_uploads.len() < max_concurrent_chunks && !reading_done {
1719 let chunk = crate::utils::read_chunk_async(reader).await?;
1720 let chunk_len = chunk.len();
1721
1722 if chunk_len == 0 {
1723 reading_done = true;
1724 break;
1725 }
1726
1727 total_size += chunk_len;
1728 part_number += 1;
1729
1730 if chunk_len < CHUNK_SIZE {
1731 reading_done = true;
1732 }
1733
1734 let current_part = part_number;
1735 let path_clone = path.clone();
1736 let upload_id_clone = upload_id.clone();
1737 let content_type_clone = content_type.to_string();
1738 let bucket_clone = self.clone();
1739
1740 active_uploads.push(
1741 async move {
1742 let result = bucket_clone
1743 .make_multipart_request(
1744 &path_clone,
1745 chunk,
1746 current_part,
1747 &upload_id_clone,
1748 &content_type_clone,
1749 )
1750 .await;
1751 (current_part, result)
1752 }
1753 .boxed(),
1754 );
1755 }
1756
1757 if let Some((part_num, result)) = active_uploads.next().await {
1759 let response_data = result?;
1760 if !(200..300).contains(&response_data.status_code()) {
1761 match self.abort_upload(&path, upload_id).await {
1763 Ok(_) => {
1764 return Err(error_from_response_data(response_data)?);
1765 }
1766 Err(error) => {
1767 return Err(error);
1768 }
1769 }
1770 }
1771
1772 let etag = response_data.as_str()?;
1773 etags.push((part_num, etag.to_string()));
1775 }
1776 }
1777
1778 etags.sort_by_key(|k| k.0);
1780 let etags: Vec<String> = etags.into_iter().map(|(_, etag)| etag).collect();
1781
1782 let inner_data = etags
1784 .clone()
1785 .into_iter()
1786 .enumerate()
1787 .map(|(i, x)| Part {
1788 etag: x,
1789 part_number: i as u32 + 1,
1790 })
1791 .collect::<Vec<Part>>();
1792 let response_data = self
1793 .complete_multipart_upload(&path, &msg.upload_id, inner_data)
1794 .await?;
1795
1796 Ok(PutStreamResponse::new(
1797 response_data.status_code(),
1798 total_size,
1799 ))
1800 }
1801
1802 #[maybe_async::sync_impl]
1803 fn _put_object_stream_with_content_type<R: Read + ?Sized>(
1804 &self,
1805 reader: &mut R,
1806 s3_path: &str,
1807 content_type: &str,
1808 ) -> Result<u16, S3Error> {
1809 let msg = self.initiate_multipart_upload(s3_path, content_type)?;
1810 let path = msg.key;
1811 let upload_id = &msg.upload_id;
1812
1813 let mut part_number: u32 = 0;
1814 let mut etags = Vec::new();
1815 loop {
1816 let chunk = crate::utils::read_chunk(reader)?;
1817
1818 if chunk.len() < CHUNK_SIZE {
1819 if part_number == 0 {
1820 self.abort_upload(&path, upload_id)?;
1822
1823 return Ok(self.put_object(s3_path, chunk.as_slice())?.status_code());
1824 } else {
1825 part_number += 1;
1826 let part = self.put_multipart_chunk(
1827 &chunk,
1828 &path,
1829 part_number,
1830 upload_id,
1831 content_type,
1832 )?;
1833 etags.push(part.etag);
1834 let inner_data = etags
1835 .into_iter()
1836 .enumerate()
1837 .map(|(i, x)| Part {
1838 etag: x,
1839 part_number: i as u32 + 1,
1840 })
1841 .collect::<Vec<Part>>();
1842 return Ok(self
1843 .complete_multipart_upload(&path, upload_id, inner_data)?
1844 .status_code());
1845 }
1847 } else {
1848 part_number += 1;
1849 let part =
1850 self.put_multipart_chunk(&chunk, &path, part_number, upload_id, content_type)?;
1851 etags.push(part.etag.to_string());
1852 }
1853 }
1854 }
1855
1856 #[maybe_async::async_impl]
1858 pub async fn initiate_multipart_upload(
1859 &self,
1860 s3_path: &str,
1861 content_type: &str,
1862 ) -> Result<InitiateMultipartUploadResponse, S3Error> {
1863 let command = Command::InitiateMultipartUpload { content_type };
1864 let request = RequestImpl::new(self, s3_path, command).await?;
1865 let response_data = request.response_data(false).await?;
1866 if response_data.status_code() >= 300 {
1867 return Err(error_from_response_data(response_data)?);
1868 }
1869
1870 let msg: InitiateMultipartUploadResponse =
1871 quick_xml::de::from_str(response_data.as_str()?)?;
1872 Ok(msg)
1873 }
1874
1875 #[maybe_async::sync_impl]
1876 pub fn initiate_multipart_upload(
1877 &self,
1878 s3_path: &str,
1879 content_type: &str,
1880 ) -> Result<InitiateMultipartUploadResponse, S3Error> {
1881 let command = Command::InitiateMultipartUpload { content_type };
1882 let request = RequestImpl::new(self, s3_path, command)?;
1883 let response_data = request.response_data(false)?;
1884 if response_data.status_code() >= 300 {
1885 return Err(error_from_response_data(response_data)?);
1886 }
1887
1888 let msg: InitiateMultipartUploadResponse =
1889 quick_xml::de::from_str(response_data.as_str()?)?;
1890 Ok(msg)
1891 }
1892
1893 #[maybe_async::async_impl]
1895 pub async fn put_multipart_stream<R: Read + Unpin>(
1896 &self,
1897 reader: &mut R,
1898 path: &str,
1899 part_number: u32,
1900 upload_id: &str,
1901 content_type: &str,
1902 ) -> Result<Part, S3Error> {
1903 let chunk = crate::utils::read_chunk(reader)?;
1904 self.put_multipart_chunk(chunk, path, part_number, upload_id, content_type)
1905 .await
1906 }
1907
1908 #[maybe_async::sync_impl]
1909 pub async fn put_multipart_stream<R: Read + Unpin>(
1910 &self,
1911 reader: &mut R,
1912 path: &str,
1913 part_number: u32,
1914 upload_id: &str,
1915 content_type: &str,
1916 ) -> Result<Part, S3Error> {
1917 let chunk = crate::utils::read_chunk(reader)?;
1918 self.put_multipart_chunk(&chunk, path, part_number, upload_id, content_type)
1919 }
1920
1921 #[maybe_async::async_impl]
1923 pub async fn put_multipart_chunk(
1924 &self,
1925 chunk: Vec<u8>,
1926 path: &str,
1927 part_number: u32,
1928 upload_id: &str,
1929 content_type: &str,
1930 ) -> Result<Part, S3Error> {
1931 let command = Command::PutObject {
1932 content: &chunk,
1934 multipart: Some(Multipart::new(part_number, upload_id)), custom_headers: None,
1936 content_type,
1937 };
1938 let request = RequestImpl::new(self, path, command).await?;
1939 let response_data = request.response_data(true).await?;
1940 if !(200..300).contains(&response_data.status_code()) {
1941 match self.abort_upload(path, upload_id).await {
1943 Ok(_) => {
1944 return Err(error_from_response_data(response_data)?);
1945 }
1946 Err(error) => {
1947 return Err(error);
1948 }
1949 }
1950 }
1951 let etag = response_data.as_str()?;
1952 Ok(Part {
1953 etag: etag.to_string(),
1954 part_number,
1955 })
1956 }
1957
1958 #[maybe_async::sync_impl]
1959 pub fn put_multipart_chunk(
1960 &self,
1961 chunk: &[u8],
1962 path: &str,
1963 part_number: u32,
1964 upload_id: &str,
1965 content_type: &str,
1966 ) -> Result<Part, S3Error> {
1967 let command = Command::PutObject {
1968 content: chunk,
1970 multipart: Some(Multipart::new(part_number, upload_id)), custom_headers: None,
1972 content_type,
1973 };
1974 let request = RequestImpl::new(self, path, command)?;
1975 let response_data = request.response_data(true)?;
1976 if !(200..300).contains(&response_data.status_code()) {
1977 match self.abort_upload(path, upload_id) {
1979 Ok(_) => {
1980 return Err(error_from_response_data(response_data)?);
1981 }
1982 Err(error) => {
1983 return Err(error);
1984 }
1985 }
1986 }
1987 let etag = response_data.as_str()?;
1988 Ok(Part {
1989 etag: etag.to_string(),
1990 part_number,
1991 })
1992 }
1993
1994 #[maybe_async::async_impl]
1996 pub async fn complete_multipart_upload(
1997 &self,
1998 path: &str,
1999 upload_id: &str,
2000 parts: Vec<Part>,
2001 ) -> Result<ResponseData, S3Error> {
2002 let data = CompleteMultipartUploadData { parts };
2003 let complete = Command::CompleteMultipartUpload { upload_id, data };
2004 let complete_request = RequestImpl::new(self, path, complete).await?;
2005 complete_request.response_data(false).await
2006 }
2007
2008 #[maybe_async::sync_impl]
2009 pub fn complete_multipart_upload(
2010 &self,
2011 path: &str,
2012 upload_id: &str,
2013 parts: Vec<Part>,
2014 ) -> Result<ResponseData, S3Error> {
2015 let data = CompleteMultipartUploadData { parts };
2016 let complete = Command::CompleteMultipartUpload { upload_id, data };
2017 let complete_request = RequestImpl::new(self, path, complete)?;
2018 complete_request.response_data(false)
2019 }
2020
2021 #[maybe_async::maybe_async]
2054 pub async fn location(&self) -> Result<(Region, u16), S3Error> {
2055 let request = RequestImpl::new(self, "?location", Command::GetBucketLocation).await?;
2056 let response_data = request.response_data(false).await?;
2057 let region_string = String::from_utf8_lossy(response_data.as_slice());
2058 let region = match quick_xml::de::from_reader(region_string.as_bytes()) {
2059 Ok(r) => {
2060 let location_result: BucketLocationResult = r;
2061 location_result.region.parse()?
2062 }
2063 Err(e) => {
2064 if response_data.status_code() == 200 {
2065 Region::Custom {
2066 region: "Custom".to_string(),
2067 endpoint: "".to_string(),
2068 }
2069 } else {
2070 Region::Custom {
2071 region: format!("Error encountered : {}", e),
2072 endpoint: "".to_string(),
2073 }
2074 }
2075 }
2076 };
2077 Ok((region, response_data.status_code()))
2078 }
2079
2080 #[maybe_async::maybe_async]
2113 pub async fn delete_object<S: AsRef<str>>(&self, path: S) -> Result<ResponseData, S3Error> {
2114 let command = Command::DeleteObject;
2115 let request = RequestImpl::new(self, path.as_ref(), command).await?;
2116 request.response_data(false).await
2117 }
2118
2119 #[maybe_async::maybe_async]
2152 pub async fn head_object<S: AsRef<str>>(
2153 &self,
2154 path: S,
2155 ) -> Result<(HeadObjectResult, u16), S3Error> {
2156 let command = Command::HeadObject;
2157 let request = RequestImpl::new(self, path.as_ref(), command).await?;
2158 let (headers, status) = request.response_header().await?;
2159 let header_object = HeadObjectResult::from(&headers);
2160 Ok((header_object, status))
2161 }
2162
2163 #[maybe_async::maybe_async]
2197 pub async fn put_object_with_content_type<S: AsRef<str>>(
2198 &self,
2199 path: S,
2200 content: &[u8],
2201 content_type: &str,
2202 ) -> Result<ResponseData, S3Error> {
2203 let command = Command::PutObject {
2204 content,
2205 content_type,
2206 custom_headers: None,
2207 multipart: None,
2208 };
2209 let request = RequestImpl::new(self, path.as_ref(), command).await?;
2210 request.response_data(true).await
2211 }
2212
2213 #[maybe_async::maybe_async]
2247 pub async fn put_object<S: AsRef<str>>(
2248 &self,
2249 path: S,
2250 content: &[u8],
2251 ) -> Result<ResponseData, S3Error> {
2252 self.put_object_with_content_type(path, content, "application/octet-stream")
2253 .await
2254 }
2255
2256 pub fn put_object_builder<S: AsRef<str>>(
2285 &self,
2286 path: S,
2287 content: &[u8],
2288 ) -> crate::put_object_request::PutObjectRequest<'_> {
2289 crate::put_object_request::PutObjectRequest::new(self, path, content)
2290 }
2291
2292 fn _tags_xml<S: AsRef<str>>(&self, tags: &[(S, S)]) -> String {
2293 let mut s = String::new();
2294 let content = tags
2295 .iter()
2296 .map(|(name, value)| {
2297 format!(
2298 "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
2299 name.as_ref(),
2300 value.as_ref()
2301 )
2302 })
2303 .fold(String::new(), |mut a, b| {
2304 a.push_str(b.as_str());
2305 a
2306 });
2307 s.push_str("<Tagging><TagSet>");
2308 s.push_str(&content);
2309 s.push_str("</TagSet></Tagging>");
2310 s
2311 }
2312
2313 #[maybe_async::maybe_async]
2346 pub async fn put_object_tagging<S: AsRef<str>>(
2347 &self,
2348 path: &str,
2349 tags: &[(S, S)],
2350 ) -> Result<ResponseData, S3Error> {
2351 let content = self._tags_xml(tags);
2352 let command = Command::PutObjectTagging { tags: &content };
2353 let request = RequestImpl::new(self, path, command).await?;
2354 request.response_data(false).await
2355 }
2356
2357 #[maybe_async::maybe_async]
2390 pub async fn delete_object_tagging<S: AsRef<str>>(
2391 &self,
2392 path: S,
2393 ) -> Result<ResponseData, S3Error> {
2394 let command = Command::DeleteObjectTagging;
2395 let request = RequestImpl::new(self, path.as_ref(), command).await?;
2396 request.response_data(false).await
2397 }
2398
2399 #[cfg(feature = "tags")]
2432 #[maybe_async::maybe_async]
2433 pub async fn get_object_tagging<S: AsRef<str>>(
2434 &self,
2435 path: S,
2436 ) -> Result<(Vec<Tag>, u16), S3Error> {
2437 let command = Command::GetObjectTagging {};
2438 let request = RequestImpl::new(self, path.as_ref(), command).await?;
2439 let result = request.response_data(false).await?;
2440
2441 let mut tags = Vec::new();
2442
2443 if result.status_code() == 200 {
2444 let result_string = String::from_utf8_lossy(result.as_slice());
2445
2446 let ns = "http://s3.amazonaws.com/doc/2006-03-01/";
2448 let result_string =
2449 if let Err(minidom::Error::MissingNamespace) = result_string.parse::<Element>() {
2450 result_string
2451 .replace("<Tagging>", &format!("<Tagging xmlns=\"{}\">", ns))
2452 .into()
2453 } else {
2454 result_string
2455 };
2456
2457 if let Ok(tagging) = result_string.parse::<Element>() {
2458 for tag_set in tagging.children() {
2459 if tag_set.is("TagSet", ns) {
2460 for tag in tag_set.children() {
2461 if tag.is("Tag", ns) {
2462 let key = if let Some(element) = tag.get_child("Key", ns) {
2463 element.text()
2464 } else {
2465 "Could not parse Key from Tag".to_string()
2466 };
2467 let value = if let Some(element) = tag.get_child("Value", ns) {
2468 element.text()
2469 } else {
2470 "Could not parse Values from Tag".to_string()
2471 };
2472 tags.push(Tag { key, value });
2473 }
2474 }
2475 }
2476 }
2477 }
2478 }
2479
2480 Ok((tags, result.status_code()))
2481 }
2482
2483 #[maybe_async::maybe_async]
2484 pub async fn list_page(
2485 &self,
2486 prefix: String,
2487 delimiter: Option<String>,
2488 continuation_token: Option<String>,
2489 start_after: Option<String>,
2490 max_keys: Option<usize>,
2491 ) -> Result<(ListBucketResult, u16), S3Error> {
2492 let command = if self.listobjects_v2 {
2493 Command::ListObjectsV2 {
2494 prefix,
2495 delimiter,
2496 continuation_token,
2497 start_after,
2498 max_keys,
2499 }
2500 } else {
2501 Command::ListObjects {
2505 prefix,
2506 delimiter,
2507 marker: std::cmp::max(continuation_token, start_after),
2508 max_keys,
2509 }
2510 };
2511 let request = RequestImpl::new(self, "/", command).await?;
2512 let response_data = request.response_data(false).await?;
2513 let list_bucket_result = quick_xml::de::from_reader(response_data.as_slice())?;
2514
2515 Ok((list_bucket_result, response_data.status_code()))
2516 }
2517
2518 #[maybe_async::maybe_async]
2551 #[allow(clippy::assigning_clones)]
2552 pub async fn list(
2553 &self,
2554 prefix: String,
2555 delimiter: Option<String>,
2556 ) -> Result<Vec<ListBucketResult>, S3Error> {
2557 let the_bucket = self.to_owned();
2558 let mut results = Vec::new();
2559 let mut continuation_token = None;
2560
2561 loop {
2562 let (list_bucket_result, _) = the_bucket
2563 .list_page(
2564 prefix.clone(),
2565 delimiter.clone(),
2566 continuation_token,
2567 None,
2568 None,
2569 )
2570 .await?;
2571 continuation_token = list_bucket_result.next_continuation_token.clone();
2572 results.push(list_bucket_result);
2573 if continuation_token.is_none() {
2574 break;
2575 }
2576 }
2577
2578 Ok(results)
2579 }
2580
2581 #[maybe_async::maybe_async]
2582 pub async fn list_multiparts_uploads_page(
2583 &self,
2584 prefix: Option<&str>,
2585 delimiter: Option<&str>,
2586 key_marker: Option<String>,
2587 max_uploads: Option<usize>,
2588 ) -> Result<(ListMultipartUploadsResult, u16), S3Error> {
2589 let command = Command::ListMultipartUploads {
2590 prefix,
2591 delimiter,
2592 key_marker,
2593 max_uploads,
2594 };
2595 let request = RequestImpl::new(self, "/", command).await?;
2596 let response_data = request.response_data(false).await?;
2597 let list_bucket_result = quick_xml::de::from_reader(response_data.as_slice())?;
2598
2599 Ok((list_bucket_result, response_data.status_code()))
2600 }
2601
2602 #[maybe_async::maybe_async]
2636 #[allow(clippy::assigning_clones)]
2637 pub async fn list_multiparts_uploads(
2638 &self,
2639 prefix: Option<&str>,
2640 delimiter: Option<&str>,
2641 ) -> Result<Vec<ListMultipartUploadsResult>, S3Error> {
2642 let the_bucket = self.to_owned();
2643 let mut results = Vec::new();
2644 let mut next_marker: Option<String> = None;
2645
2646 loop {
2647 let (list_multiparts_uploads_result, _) = the_bucket
2648 .list_multiparts_uploads_page(prefix, delimiter, next_marker, None)
2649 .await?;
2650
2651 let is_truncated = list_multiparts_uploads_result.is_truncated;
2652
2653 next_marker = list_multiparts_uploads_result.next_marker.clone();
2654 results.push(list_multiparts_uploads_result);
2655
2656 if !is_truncated {
2657 break;
2658 }
2659 }
2660
2661 Ok(results)
2662 }
2663
2664 #[maybe_async::maybe_async]
2697 pub async fn abort_upload(&self, key: &str, upload_id: &str) -> Result<(), S3Error> {
2698 let abort = Command::AbortMultipartUpload { upload_id };
2699 let abort_request = RequestImpl::new(self, key, abort).await?;
2700 let response_data = abort_request.response_data(false).await?;
2701
2702 if (200..300).contains(&response_data.status_code()) {
2703 Ok(())
2704 } else {
2705 let utf8_content = String::from_utf8(response_data.as_slice().to_vec())?;
2706 Err(S3Error::HttpFailWithBody(
2707 response_data.status_code(),
2708 utf8_content,
2709 ))
2710 }
2711 }
2712
2713 pub fn is_path_style(&self) -> bool {
2715 self.path_style
2716 }
2717
2718 pub fn is_subdomain_style(&self) -> bool {
2720 !self.path_style
2721 }
2722
2723 pub fn set_path_style(&mut self) {
2725 self.path_style = true;
2726 }
2727
2728 pub fn set_subdomain_style(&mut self) {
2730 self.path_style = false;
2731 }
2732
2733 pub fn set_request_timeout(&mut self, timeout: Option<Duration>) {
2740 self.request_timeout = timeout;
2741 }
2742
2743 pub fn set_listobjects_v1(&mut self) {
2749 self.listobjects_v2 = false;
2750 }
2751
2752 pub fn set_listobjects_v2(&mut self) {
2754 self.listobjects_v2 = true;
2755 }
2756
2757 pub fn name(&self) -> String {
2759 self.name.to_string()
2760 }
2761
2762 pub fn host(&self) -> String {
2764 if self.path_style {
2765 self.path_style_host()
2766 } else {
2767 self.subdomain_style_host()
2768 }
2769 }
2770
2771 pub fn url(&self) -> String {
2772 if self.path_style {
2773 format!(
2774 "{}://{}/{}",
2775 self.scheme(),
2776 self.path_style_host(),
2777 self.name()
2778 )
2779 } else {
2780 format!("{}://{}", self.scheme(), self.subdomain_style_host())
2781 }
2782 }
2783
2784 pub fn path_style_host(&self) -> String {
2786 self.region.host()
2787 }
2788
2789 pub fn subdomain_style_host(&self) -> String {
2790 format!("{}.{}", self.name, self.region.host())
2791 }
2792
2793 pub fn scheme(&self) -> String {
2798 self.region.scheme()
2799 }
2800
2801 pub fn region(&self) -> Region {
2803 self.region.clone()
2804 }
2805
2806 #[maybe_async::maybe_async]
2808 pub async fn access_key(&self) -> Result<Option<String>, S3Error> {
2809 Ok(self.credentials().await?.access_key)
2810 }
2811
2812 #[maybe_async::maybe_async]
2814 pub async fn secret_key(&self) -> Result<Option<String>, S3Error> {
2815 Ok(self.credentials().await?.secret_key)
2816 }
2817
2818 #[maybe_async::maybe_async]
2820 pub async fn security_token(&self) -> Result<Option<String>, S3Error> {
2821 Ok(self.credentials().await?.security_token)
2822 }
2823
2824 #[maybe_async::maybe_async]
2826 pub async fn session_token(&self) -> Result<Option<String>, S3Error> {
2827 Ok(self.credentials().await?.session_token)
2828 }
2829
2830 #[maybe_async::async_impl]
2833 pub async fn credentials(&self) -> Result<Credentials, S3Error> {
2834 Ok(self.credentials.read().await.clone())
2835 }
2836
2837 #[maybe_async::sync_impl]
2838 pub fn credentials(&self) -> Result<Credentials, S3Error> {
2839 match self.credentials.read() {
2840 Ok(credentials) => Ok(credentials.clone()),
2841 Err(_) => Err(S3Error::CredentialsReadLock),
2842 }
2843 }
2844
2845 pub fn set_credentials(&mut self, credentials: Credentials) {
2847 self.credentials = Arc::new(RwLock::new(credentials));
2848 }
2849
2850 pub fn add_header(&mut self, key: &str, value: &str) {
2863 self.extra_headers
2864 .insert(HeaderName::from_str(key).unwrap(), value.parse().unwrap());
2865 }
2866
2867 pub fn extra_headers(&self) -> &HeaderMap {
2869 &self.extra_headers
2870 }
2871
2872 pub fn extra_headers_mut(&mut self) -> &mut HeaderMap {
2875 &mut self.extra_headers
2876 }
2877
2878 pub fn add_query(&mut self, key: &str, value: &str) {
2880 self.extra_query.insert(key.into(), value.into());
2881 }
2882
2883 pub fn extra_query(&self) -> &Query {
2885 &self.extra_query
2886 }
2887
2888 pub fn extra_query_mut(&mut self) -> &mut Query {
2891 &mut self.extra_query
2892 }
2893
2894 pub fn request_timeout(&self) -> Option<Duration> {
2895 self.request_timeout
2896 }
2897}
2898
2899#[cfg(test)]
2900mod test {
2901
2902 use crate::BucketConfiguration;
2903 use crate::Tag;
2904 use crate::creds::Credentials;
2905 use crate::post_policy::{PostPolicyField, PostPolicyValue};
2906 use crate::region::Region;
2907 use crate::serde_types::CorsConfiguration;
2908 use crate::serde_types::CorsRule;
2909 use crate::{Bucket, PostPolicy};
2910 use http::HeaderMap;
2911 use http::header::HeaderName;
2912 use std::env;
2913
2914 fn init() {
2915 let _ = env_logger::builder().is_test(true).try_init();
2916 }
2917
2918 fn test_aws_credentials() -> Credentials {
2919 Credentials::new(
2920 Some(&env::var("EU_AWS_ACCESS_KEY_ID").unwrap()),
2921 Some(&env::var("EU_AWS_SECRET_ACCESS_KEY").unwrap()),
2922 None,
2923 None,
2924 None,
2925 )
2926 .unwrap()
2927 }
2928
2929 fn test_gc_credentials() -> Credentials {
2930 Credentials::new(
2931 Some(&env::var("GC_ACCESS_KEY_ID").unwrap()),
2932 Some(&env::var("GC_SECRET_ACCESS_KEY").unwrap()),
2933 None,
2934 None,
2935 None,
2936 )
2937 .unwrap()
2938 }
2939
2940 fn test_wasabi_credentials() -> Credentials {
2941 Credentials::new(
2942 Some(&env::var("WASABI_ACCESS_KEY_ID").unwrap()),
2943 Some(&env::var("WASABI_SECRET_ACCESS_KEY").unwrap()),
2944 None,
2945 None,
2946 None,
2947 )
2948 .unwrap()
2949 }
2950
2951 fn test_minio_credentials() -> Credentials {
2952 Credentials::new(
2953 Some(&env::var("MINIO_ACCESS_KEY_ID").unwrap()),
2954 Some(&env::var("MINIO_SECRET_ACCESS_KEY").unwrap()),
2955 None,
2956 None,
2957 None,
2958 )
2959 .unwrap()
2960 }
2961
2962 fn test_digital_ocean_credentials() -> Credentials {
2963 Credentials::new(
2964 Some(&env::var("DIGITAL_OCEAN_ACCESS_KEY_ID").unwrap()),
2965 Some(&env::var("DIGITAL_OCEAN_SECRET_ACCESS_KEY").unwrap()),
2966 None,
2967 None,
2968 None,
2969 )
2970 .unwrap()
2971 }
2972
2973 fn test_r2_credentials() -> Credentials {
2974 Credentials::new(
2975 Some(&env::var("R2_ACCESS_KEY_ID").unwrap()),
2976 Some(&env::var("R2_SECRET_ACCESS_KEY").unwrap()),
2977 None,
2978 None,
2979 None,
2980 )
2981 .unwrap()
2982 }
2983
2984 fn test_aws_bucket() -> Box<Bucket> {
2985 Bucket::new(
2986 "rust-s3-test",
2987 "eu-central-1".parse().unwrap(),
2988 test_aws_credentials(),
2989 )
2990 .unwrap()
2991 }
2992
2993 fn test_wasabi_bucket() -> Box<Bucket> {
2994 Bucket::new(
2995 "rust-s3",
2996 "wa-eu-central-1".parse().unwrap(),
2997 test_wasabi_credentials(),
2998 )
2999 .unwrap()
3000 }
3001
3002 fn test_gc_bucket() -> Box<Bucket> {
3003 let mut bucket = Bucket::new(
3004 "rust-s3",
3005 Region::Custom {
3006 region: "us-east1".to_owned(),
3007 endpoint: "https://storage.googleapis.com".to_owned(),
3008 },
3009 test_gc_credentials(),
3010 )
3011 .unwrap();
3012 bucket.set_listobjects_v1();
3013 bucket
3014 }
3015
3016 fn test_minio_bucket() -> Box<Bucket> {
3017 Bucket::new(
3018 "rust-s3",
3019 Region::Custom {
3020 region: "us-east-1".to_owned(),
3021 endpoint: "http://localhost:9000".to_owned(),
3022 },
3023 test_minio_credentials(),
3024 )
3025 .unwrap()
3026 .with_path_style()
3027 }
3028
3029 #[allow(dead_code)]
3030 fn test_digital_ocean_bucket() -> Box<Bucket> {
3031 Bucket::new("rust-s3", Region::DoFra1, test_digital_ocean_credentials()).unwrap()
3032 }
3033
3034 fn test_r2_bucket() -> Box<Bucket> {
3035 Bucket::new(
3036 "rust-s3",
3037 Region::R2 {
3038 account_id: "f048f3132be36fa1aaa8611992002b3f".to_string(),
3039 },
3040 test_r2_credentials(),
3041 )
3042 .unwrap()
3043 }
3044
3045 fn object(size: u32) -> Vec<u8> {
3046 (0..size).map(|_| 33).collect()
3047 }
3048
3049 #[maybe_async::maybe_async]
3050 async fn put_head_get_delete_object(bucket: Bucket, head: bool) {
3051 let s3_path = "/+test.file";
3052 let non_existant_path = "/+non_existant.file";
3053 let test: Vec<u8> = object(3072);
3054
3055 let response_data = bucket.put_object(s3_path, &test).await.unwrap();
3056 assert_eq!(response_data.status_code(), 200);
3057
3058 let response_data = bucket.get_object(s3_path).await.unwrap();
3064 assert_eq!(response_data.status_code(), 200);
3065 assert_eq!(test, response_data.as_slice());
3066
3067 let exists = bucket.object_exists(s3_path).await.unwrap();
3068 assert!(exists);
3069
3070 let not_exists = bucket.object_exists(non_existant_path).await.unwrap();
3071 assert!(!not_exists);
3072
3073 let response_data = bucket
3074 .get_object_range(s3_path, 100, Some(1000))
3075 .await
3076 .unwrap();
3077 assert_eq!(response_data.status_code(), 206);
3078 assert_eq!(test[100..1001].to_vec(), response_data.as_slice());
3079 if head {
3080 let (_head_object_result, code) = bucket.head_object(s3_path).await.unwrap();
3081 assert_eq!(code, 200);
3083 }
3084
3085 let response_data = bucket.delete_object(s3_path).await.unwrap();
3087 assert_eq!(response_data.status_code(), 204);
3088 }
3089
3090 #[ignore]
3091 #[cfg(feature = "tags")]
3092 #[maybe_async::test(
3093 feature = "sync",
3094 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3095 async(
3096 all(not(feature = "sync"), feature = "with-async-std"),
3097 async_std::test
3098 )
3099 )]
3100 async fn test_tagging_aws() {
3101 let bucket = test_aws_bucket();
3102 let _target_tags = vec![
3103 Tag {
3104 key: "Tag1".to_string(),
3105 value: "Value1".to_string(),
3106 },
3107 Tag {
3108 key: "Tag2".to_string(),
3109 value: "Value2".to_string(),
3110 },
3111 ];
3112 let empty_tags: Vec<Tag> = Vec::new();
3113 let response_data = bucket
3114 .put_object("tagging_test", b"Gimme tags")
3115 .await
3116 .unwrap();
3117 assert_eq!(response_data.status_code(), 200);
3118 let (tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
3119 assert_eq!(tags, empty_tags);
3120 let response_data = bucket
3121 .put_object_tagging("tagging_test", &[("Tag1", "Value1"), ("Tag2", "Value2")])
3122 .await
3123 .unwrap();
3124 assert_eq!(response_data.status_code(), 200);
3125 let (_tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
3127 let _response_data = bucket.delete_object("tagging_test").await.unwrap();
3129 }
3130
3131 #[ignore]
3132 #[cfg(feature = "tags")]
3133 #[maybe_async::test(
3134 feature = "sync",
3135 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3136 async(
3137 all(not(feature = "sync"), feature = "with-async-std"),
3138 async_std::test
3139 )
3140 )]
3141 async fn test_tagging_minio() {
3142 let bucket = test_minio_bucket();
3143 let _target_tags = vec![
3144 Tag {
3145 key: "Tag1".to_string(),
3146 value: "Value1".to_string(),
3147 },
3148 Tag {
3149 key: "Tag2".to_string(),
3150 value: "Value2".to_string(),
3151 },
3152 ];
3153 let empty_tags: Vec<Tag> = Vec::new();
3154 let response_data = bucket
3155 .put_object("tagging_test", b"Gimme tags")
3156 .await
3157 .unwrap();
3158 assert_eq!(response_data.status_code(), 200);
3159 let (tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
3160 assert_eq!(tags, empty_tags);
3161 let response_data = bucket
3162 .put_object_tagging("tagging_test", &[("Tag1", "Value1"), ("Tag2", "Value2")])
3163 .await
3164 .unwrap();
3165 assert_eq!(response_data.status_code(), 200);
3166 let (_tags, _code) = bucket.get_object_tagging("tagging_test").await.unwrap();
3168 let _response_data = bucket.delete_object("tagging_test").await.unwrap();
3170 }
3171
3172 #[ignore]
3173 #[maybe_async::test(
3174 feature = "sync",
3175 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3176 async(
3177 all(not(feature = "sync"), feature = "with-async-std"),
3178 async_std::test
3179 )
3180 )]
3181 async fn streaming_big_aws_put_head_get_delete_object() {
3182 streaming_test_put_get_delete_big_object(*test_aws_bucket()).await;
3183 }
3184
3185 #[ignore]
3186 #[maybe_async::test(
3187 feature = "sync",
3188 async(
3189 all(
3190 not(feature = "sync"),
3191 not(feature = "tokio-rustls-tls"),
3192 feature = "with-tokio"
3193 ),
3194 tokio::test
3195 ),
3196 async(
3197 all(not(feature = "sync"), feature = "with-async-std"),
3198 async_std::test
3199 )
3200 )]
3201 async fn streaming_big_gc_put_head_get_delete_object() {
3202 streaming_test_put_get_delete_big_object(*test_gc_bucket()).await;
3203 }
3204
3205 #[ignore]
3206 #[maybe_async::test(
3207 feature = "sync",
3208 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3209 async(
3210 all(not(feature = "sync"), feature = "with-async-std"),
3211 async_std::test
3212 )
3213 )]
3214 async fn streaming_big_minio_put_head_get_delete_object() {
3215 streaming_test_put_get_delete_big_object(*test_minio_bucket()).await;
3216 }
3217
3218 #[ignore]
3219 #[maybe_async::test(
3220 feature = "sync",
3221 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3222 async(
3223 all(not(feature = "sync"), feature = "with-async-std"),
3224 async_std::test
3225 )
3226 )]
3227 async fn streaming_big_r2_put_head_get_delete_object() {
3228 streaming_test_put_get_delete_big_object(*test_r2_bucket()).await;
3229 }
3230
3231 #[maybe_async::maybe_async]
3233 async fn streaming_test_put_get_delete_big_object(bucket: Bucket) {
3234 #[cfg(feature = "with-async-std")]
3235 use async_std::fs::File;
3236 #[cfg(feature = "with-async-std")]
3237 use async_std::io::WriteExt;
3238 #[cfg(feature = "with-async-std")]
3239 use async_std::stream::StreamExt;
3240 #[cfg(feature = "with-tokio")]
3241 use futures_util::StreamExt;
3242 #[cfg(not(any(feature = "with-tokio", feature = "with-async-std")))]
3243 use std::fs::File;
3244 #[cfg(not(any(feature = "with-tokio", feature = "with-async-std")))]
3245 use std::io::Write;
3246 #[cfg(feature = "with-tokio")]
3247 use tokio::fs::File;
3248 #[cfg(feature = "with-tokio")]
3249 use tokio::io::AsyncWriteExt;
3250
3251 init();
3252 let remote_path = "+stream_test_big";
3253 let local_path = "+stream_test_big";
3254 std::fs::remove_file(remote_path).unwrap_or(());
3255 let content: Vec<u8> = object(20_000_000);
3256
3257 let mut file = File::create(local_path).await.unwrap();
3258 file.write_all(&content).await.unwrap();
3259 file.flush().await.unwrap();
3260 let mut reader = File::open(local_path).await.unwrap();
3261
3262 let response = bucket
3263 .put_object_stream(&mut reader, remote_path)
3264 .await
3265 .unwrap();
3266 #[cfg(not(feature = "sync"))]
3267 assert_eq!(response.status_code(), 200);
3268 #[cfg(feature = "sync")]
3269 assert_eq!(response, 200);
3270 let mut writer = Vec::new();
3271 let code = bucket
3272 .get_object_to_writer(remote_path, &mut writer)
3273 .await
3274 .unwrap();
3275 assert_eq!(code, 200);
3276 assert_eq!(content.len(), writer.len());
3278 assert_eq!(content.len(), 20_000_000);
3279
3280 #[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
3281 {
3282 let mut response_data_stream = bucket.get_object_stream(remote_path).await.unwrap();
3283
3284 let mut bytes = vec![];
3285
3286 while let Some(chunk) = response_data_stream.bytes().next().await {
3287 bytes.push(chunk)
3288 }
3289 assert_ne!(bytes.len(), 0);
3290 }
3291
3292 let response_data = bucket.delete_object(remote_path).await.unwrap();
3293 assert_eq!(response_data.status_code(), 204);
3294 std::fs::remove_file(local_path).unwrap_or(());
3295 }
3296
3297 #[ignore]
3298 #[maybe_async::test(
3299 feature = "sync",
3300 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3301 async(
3302 all(not(feature = "sync"), feature = "with-async-std"),
3303 async_std::test
3304 )
3305 )]
3306 async fn streaming_aws_put_head_get_delete_object() {
3307 streaming_test_put_get_delete_small_object(test_aws_bucket()).await;
3308 }
3309
3310 #[ignore]
3311 #[maybe_async::test(
3312 feature = "sync",
3313 async(
3314 all(
3315 not(feature = "sync"),
3316 not(feature = "tokio-rustls-tls"),
3317 feature = "with-tokio"
3318 ),
3319 tokio::test
3320 ),
3321 async(
3322 all(not(feature = "sync"), feature = "with-async-std"),
3323 async_std::test
3324 )
3325 )]
3326 async fn streaming_gc_put_head_get_delete_object() {
3327 streaming_test_put_get_delete_small_object(test_gc_bucket()).await;
3328 }
3329
3330 #[ignore]
3331 #[maybe_async::test(
3332 feature = "sync",
3333 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3334 async(
3335 all(not(feature = "sync"), feature = "with-async-std"),
3336 async_std::test
3337 )
3338 )]
3339 async fn streaming_r2_put_head_get_delete_object() {
3340 streaming_test_put_get_delete_small_object(test_r2_bucket()).await;
3341 }
3342
3343 #[ignore]
3344 #[maybe_async::test(
3345 feature = "sync",
3346 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3347 async(
3348 all(not(feature = "sync"), feature = "with-async-std"),
3349 async_std::test
3350 )
3351 )]
3352 async fn streaming_minio_put_head_get_delete_object() {
3353 streaming_test_put_get_delete_small_object(test_minio_bucket()).await;
3354 }
3355
3356 #[maybe_async::maybe_async]
3357 async fn streaming_test_put_get_delete_small_object(bucket: Box<Bucket>) {
3358 init();
3359 let remote_path = "+stream_test_small";
3360 let content: Vec<u8> = object(1000);
3361 #[cfg(feature = "with-tokio")]
3362 let mut reader = std::io::Cursor::new(&content);
3363 #[cfg(feature = "with-async-std")]
3364 let mut reader = async_std::io::Cursor::new(&content);
3365 #[cfg(feature = "sync")]
3366 let mut reader = std::io::Cursor::new(&content);
3367
3368 let response = bucket
3369 .put_object_stream(&mut reader, remote_path)
3370 .await
3371 .unwrap();
3372 #[cfg(not(feature = "sync"))]
3373 assert_eq!(response.status_code(), 200);
3374 #[cfg(feature = "sync")]
3375 assert_eq!(response, 200);
3376 let mut writer = Vec::new();
3377 let code = bucket
3378 .get_object_to_writer(remote_path, &mut writer)
3379 .await
3380 .unwrap();
3381 assert_eq!(code, 200);
3382 assert_eq!(content, writer);
3383
3384 let response_data = bucket.delete_object(remote_path).await.unwrap();
3385 assert_eq!(response_data.status_code(), 204);
3386 }
3387
3388 #[cfg(feature = "blocking")]
3389 fn put_head_get_list_delete_object_blocking(bucket: Bucket) {
3390 let s3_path = "/test_blocking.file";
3391 let s3_path_2 = "/test_blocking.file2";
3392 let s3_path_3 = "/test_blocking.file3";
3393 let test: Vec<u8> = object(3072);
3394
3395 let response_data = bucket.put_object_blocking(s3_path, &test).unwrap();
3397 assert_eq!(response_data.status_code(), 200);
3398
3399 let response_data = bucket.get_object_blocking(s3_path).unwrap();
3401 assert_eq!(response_data.status_code(), 200);
3402 assert_eq!(test, response_data.as_slice());
3403
3404 let response_data = bucket
3406 .get_object_range_blocking(s3_path, 100, Some(1000))
3407 .unwrap();
3408 assert_eq!(response_data.status_code(), 206);
3409 assert_eq!(test[100..1001].to_vec(), response_data.as_slice());
3410
3411 let (head_object_result, code) = bucket.head_object_blocking(s3_path).unwrap();
3413 assert_eq!(code, 200);
3414 assert_eq!(
3415 head_object_result.content_type.unwrap(),
3416 "application/octet-stream".to_owned()
3417 );
3418 let response_data = bucket.put_object_blocking(s3_path_2, &test).unwrap();
3422 assert_eq!(response_data.status_code(), 200);
3423 let response_data = bucket.put_object_blocking(s3_path_3, &test).unwrap();
3424 assert_eq!(response_data.status_code(), 200);
3425
3426 let (result, code) = bucket
3428 .list_page_blocking(
3429 "test_blocking.".to_string(),
3430 Some("/".to_string()),
3431 None,
3432 None,
3433 Some(2),
3434 )
3435 .unwrap();
3436 assert_eq!(code, 200);
3437 assert_eq!(result.contents.len(), 2);
3438 assert_eq!(result.contents[0].key, s3_path[1..]);
3439 assert_eq!(result.contents[1].key, s3_path_2[1..]);
3440
3441 let cont_token = result.next_continuation_token.unwrap();
3442
3443 let (result, code) = bucket
3444 .list_page_blocking(
3445 "test_blocking.".to_string(),
3446 Some("/".to_string()),
3447 Some(cont_token),
3448 None,
3449 Some(2),
3450 )
3451 .unwrap();
3452 assert_eq!(code, 200);
3453 assert_eq!(result.contents.len(), 1);
3454 assert_eq!(result.contents[0].key, s3_path_3[1..]);
3455 assert!(result.next_continuation_token.is_none());
3456
3457 let response_data = bucket.delete_object_blocking(s3_path).unwrap();
3459 assert_eq!(code, 200);
3460 let response_data = bucket.delete_object_blocking(s3_path_2).unwrap();
3461 assert_eq!(code, 200);
3462 let response_data = bucket.delete_object_blocking(s3_path_3).unwrap();
3463 assert_eq!(code, 200);
3464 }
3465
3466 #[ignore]
3467 #[cfg(all(
3468 any(feature = "with-tokio", feature = "with-async-std"),
3469 feature = "blocking"
3470 ))]
3471 #[test]
3472 fn aws_put_head_get_delete_object_blocking() {
3473 put_head_get_list_delete_object_blocking(*test_aws_bucket())
3474 }
3475
3476 #[ignore]
3477 #[cfg(all(
3478 any(feature = "with-tokio", feature = "with-async-std"),
3479 feature = "blocking"
3480 ))]
3481 #[test]
3482 fn gc_put_head_get_delete_object_blocking() {
3483 put_head_get_list_delete_object_blocking(*test_gc_bucket())
3484 }
3485
3486 #[ignore]
3487 #[cfg(all(
3488 any(feature = "with-tokio", feature = "with-async-std"),
3489 feature = "blocking"
3490 ))]
3491 #[test]
3492 fn wasabi_put_head_get_delete_object_blocking() {
3493 put_head_get_list_delete_object_blocking(*test_wasabi_bucket())
3494 }
3495
3496 #[ignore]
3497 #[cfg(all(
3498 any(feature = "with-tokio", feature = "with-async-std"),
3499 feature = "blocking"
3500 ))]
3501 #[test]
3502 fn minio_put_head_get_delete_object_blocking() {
3503 put_head_get_list_delete_object_blocking(*test_minio_bucket())
3504 }
3505
3506 #[ignore]
3507 #[cfg(all(
3508 any(feature = "with-tokio", feature = "with-async-std"),
3509 feature = "blocking"
3510 ))]
3511 #[test]
3512 fn digital_ocean_put_head_get_delete_object_blocking() {
3513 put_head_get_list_delete_object_blocking(*test_digital_ocean_bucket())
3514 }
3515
3516 #[ignore]
3517 #[maybe_async::test(
3518 feature = "sync",
3519 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3520 async(
3521 all(not(feature = "sync"), feature = "with-async-std"),
3522 async_std::test
3523 )
3524 )]
3525 async fn aws_put_head_get_delete_object() {
3526 put_head_get_delete_object(*test_aws_bucket(), true).await;
3527 }
3528
3529 #[ignore]
3530 #[maybe_async::test(
3531 feature = "sync",
3532 async(
3533 all(
3534 not(any(feature = "sync", feature = "tokio-rustls-tls")),
3535 feature = "with-tokio"
3536 ),
3537 tokio::test
3538 ),
3539 async(
3540 all(not(feature = "sync"), feature = "with-async-std"),
3541 async_std::test
3542 )
3543 )]
3544 async fn gc_test_put_head_get_delete_object() {
3545 put_head_get_delete_object(*test_gc_bucket(), true).await;
3546 }
3547
3548 #[ignore]
3549 #[maybe_async::test(
3550 feature = "sync",
3551 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3552 async(
3553 all(not(feature = "sync"), feature = "with-async-std"),
3554 async_std::test
3555 )
3556 )]
3557 async fn wasabi_test_put_head_get_delete_object() {
3558 put_head_get_delete_object(*test_wasabi_bucket(), true).await;
3559 }
3560
3561 #[ignore]
3562 #[maybe_async::test(
3563 feature = "sync",
3564 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3565 async(
3566 all(not(feature = "sync"), feature = "with-async-std"),
3567 async_std::test
3568 )
3569 )]
3570 async fn minio_test_put_head_get_delete_object() {
3571 put_head_get_delete_object(*test_minio_bucket(), true).await;
3572 }
3573
3574 #[ignore]
3589 #[maybe_async::test(
3590 feature = "sync",
3591 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3592 async(
3593 all(not(feature = "sync"), feature = "with-async-std"),
3594 async_std::test
3595 )
3596 )]
3597 async fn r2_test_put_head_get_delete_object() {
3598 put_head_get_delete_object(*test_r2_bucket(), false).await;
3599 }
3600
3601 #[maybe_async::test(
3602 feature = "sync",
3603 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3604 async(
3605 all(not(feature = "sync"), feature = "with-async-std"),
3606 async_std::test
3607 )
3608 )]
3609 async fn test_presign_put() {
3610 let s3_path = "/test/test.file";
3611 let bucket = test_minio_bucket();
3612
3613 let mut custom_headers = HeaderMap::new();
3614 custom_headers.insert(
3615 HeaderName::from_static("custom_header"),
3616 "custom_value".parse().unwrap(),
3617 );
3618
3619 let url = bucket
3620 .presign_put(s3_path, 86400, Some(custom_headers), None)
3621 .await
3622 .unwrap();
3623
3624 assert!(url.contains("custom_header%3Bhost"));
3625 assert!(url.contains("/test/test.file"))
3626 }
3627
3628 #[maybe_async::test(
3629 feature = "sync",
3630 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3631 async(
3632 all(not(feature = "sync"), feature = "with-async-std"),
3633 async_std::test
3634 )
3635 )]
3636 async fn test_presign_post() {
3637 use std::borrow::Cow;
3638
3639 let bucket = test_minio_bucket();
3640
3641 let policy = PostPolicy::new(86400)
3643 .condition(
3644 PostPolicyField::Key,
3645 PostPolicyValue::StartsWith(Cow::from("user/user1/")),
3646 )
3647 .unwrap();
3648
3649 let data = bucket.presign_post(policy).await.unwrap();
3650
3651 assert_eq!(data.url, "http://localhost:9000/rust-s3");
3652 assert_eq!(data.fields.len(), 6);
3653 assert_eq!(data.dynamic_fields.len(), 1);
3654 }
3655
3656 #[maybe_async::test(
3657 feature = "sync",
3658 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3659 async(
3660 all(not(feature = "sync"), feature = "with-async-std"),
3661 async_std::test
3662 )
3663 )]
3664 async fn test_presign_get() {
3665 let s3_path = "/test/test.file";
3666 let bucket = test_minio_bucket();
3667
3668 let url = bucket.presign_get(s3_path, 86400, None).await.unwrap();
3669 assert!(url.contains("/test/test.file?"))
3670 }
3671
3672 #[maybe_async::test(
3673 feature = "sync",
3674 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3675 async(
3676 all(not(feature = "sync"), feature = "with-async-std"),
3677 async_std::test
3678 )
3679 )]
3680 async fn test_presign_delete() {
3681 let s3_path = "/test/test.file";
3682 let bucket = test_minio_bucket();
3683
3684 let url = bucket.presign_delete(s3_path, 86400).await.unwrap();
3685 assert!(url.contains("/test/test.file?"))
3686 }
3687
3688 #[maybe_async::test(
3689 feature = "sync",
3690 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3691 async(
3692 all(not(feature = "sync"), feature = "with-async-std"),
3693 async_std::test
3694 )
3695 )]
3696 async fn test_presign_url_standard_ports() {
3697 let region_http_80 = Region::Custom {
3702 region: "eu-central-1".to_owned(),
3703 endpoint: "http://minio:80".to_owned(),
3704 };
3705 let credentials = Credentials::new(
3706 Some("test_access_key"),
3707 Some("test_secret_key"),
3708 None,
3709 None,
3710 None,
3711 )
3712 .unwrap();
3713 let bucket_http_80 = Bucket::new("test-bucket", region_http_80, credentials.clone())
3714 .unwrap()
3715 .with_path_style();
3716
3717 let presigned_url_80 = bucket_http_80
3718 .presign_get("/test.file", 3600, None)
3719 .await
3720 .unwrap();
3721 println!("Presigned URL with port 80: {}", presigned_url_80);
3722
3723 assert!(
3725 presigned_url_80.starts_with("http://minio:80/"),
3726 "URL must preserve port 80, got: {}",
3727 presigned_url_80
3728 );
3729
3730 let region_https_443 = Region::Custom {
3732 region: "eu-central-1".to_owned(),
3733 endpoint: "https://minio:443".to_owned(),
3734 };
3735 let bucket_https_443 = Bucket::new("test-bucket", region_https_443, credentials.clone())
3736 .unwrap()
3737 .with_path_style();
3738
3739 let presigned_url_443 = bucket_https_443
3740 .presign_get("/test.file", 3600, None)
3741 .await
3742 .unwrap();
3743 println!("Presigned URL with port 443: {}", presigned_url_443);
3744
3745 assert!(
3747 presigned_url_443.starts_with("https://minio:443/"),
3748 "URL must preserve port 443, got: {}",
3749 presigned_url_443
3750 );
3751
3752 let region_http_9000 = Region::Custom {
3754 region: "eu-central-1".to_owned(),
3755 endpoint: "http://minio:9000".to_owned(),
3756 };
3757 let bucket_http_9000 = Bucket::new("test-bucket", region_http_9000, credentials)
3758 .unwrap()
3759 .with_path_style();
3760
3761 let presigned_url_9000 = bucket_http_9000
3762 .presign_get("/test.file", 3600, None)
3763 .await
3764 .unwrap();
3765 assert!(
3766 presigned_url_9000.contains("minio:9000"),
3767 "Non-standard port should be preserved in URL"
3768 );
3769 }
3770
3771 #[maybe_async::test(
3772 feature = "sync",
3773 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3774 async(
3775 all(not(feature = "sync"), feature = "with-async-std"),
3776 async_std::test
3777 )
3778 )]
3779 #[ignore]
3780 async fn test_bucket_create_delete_default_region() {
3781 let config = BucketConfiguration::default();
3782 let response = Bucket::create(
3783 &uuid::Uuid::new_v4().to_string(),
3784 "us-east-1".parse().unwrap(),
3785 test_aws_credentials(),
3786 config,
3787 )
3788 .await
3789 .unwrap();
3790
3791 assert_eq!(&response.response_text, "");
3792
3793 assert_eq!(response.response_code, 200);
3794
3795 let response_code = response.bucket.delete().await.unwrap();
3796 assert!(response_code < 300);
3797 }
3798
3799 #[ignore]
3800 #[maybe_async::test(
3801 feature = "sync",
3802 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3803 async(
3804 all(not(feature = "sync"), feature = "with-async-std"),
3805 async_std::test
3806 )
3807 )]
3808 async fn test_bucket_create_delete_non_default_region() {
3809 let config = BucketConfiguration::default();
3810 let response = Bucket::create(
3811 &uuid::Uuid::new_v4().to_string(),
3812 "eu-central-1".parse().unwrap(),
3813 test_aws_credentials(),
3814 config,
3815 )
3816 .await
3817 .unwrap();
3818
3819 assert_eq!(&response.response_text, "");
3820
3821 assert_eq!(response.response_code, 200);
3822
3823 let response_code = response.bucket.delete().await.unwrap();
3824 assert!(response_code < 300);
3825 }
3826
3827 #[ignore]
3828 #[maybe_async::test(
3829 feature = "sync",
3830 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3831 async(
3832 all(not(feature = "sync"), feature = "with-async-std"),
3833 async_std::test
3834 )
3835 )]
3836 async fn test_bucket_create_delete_non_default_region_public() {
3837 let config = BucketConfiguration::public();
3838 let response = Bucket::create(
3839 &uuid::Uuid::new_v4().to_string(),
3840 "eu-central-1".parse().unwrap(),
3841 test_aws_credentials(),
3842 config,
3843 )
3844 .await
3845 .unwrap();
3846
3847 assert_eq!(&response.response_text, "");
3848
3849 assert_eq!(response.response_code, 200);
3850
3851 let response_code = response.bucket.delete().await.unwrap();
3852 assert!(response_code < 300);
3853 }
3854
3855 #[test]
3856 fn test_tag_has_key_and_value_functions() {
3857 let key = "key".to_owned();
3858 let value = "value".to_owned();
3859 let tag = Tag { key, value };
3860 assert_eq!["key", tag.key()];
3861 assert_eq!["value", tag.value()];
3862 }
3863
3864 #[test]
3865 #[ignore]
3866 fn test_builder_composition() {
3867 use std::time::Duration;
3868
3869 let bucket = Bucket::new(
3870 "test-bucket",
3871 "eu-central-1".parse().unwrap(),
3872 test_aws_credentials(),
3873 )
3874 .unwrap()
3875 .with_request_timeout(Duration::from_secs(10))
3876 .unwrap();
3877
3878 assert_eq!(bucket.request_timeout(), Some(Duration::from_secs(10)));
3879 }
3880
3881 #[maybe_async::test(
3882 feature = "sync",
3883 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3884 async(
3885 all(not(feature = "sync"), feature = "with-async-std"),
3886 async_std::test
3887 )
3888 )]
3889 #[ignore]
3890 async fn test_bucket_cors() {
3891 let bucket = test_aws_bucket();
3892 let rule = CorsRule::new(
3893 None,
3894 vec!["GET".to_string()],
3895 vec!["*".to_string()],
3896 None,
3897 None,
3898 None,
3899 );
3900 let expected_bucket_owner = "904662384344";
3901 let cors_config = CorsConfiguration::new(vec![rule]);
3902 let response = bucket
3903 .put_bucket_cors(expected_bucket_owner, &cors_config)
3904 .await
3905 .unwrap();
3906 assert_eq!(response.status_code(), 200);
3907
3908 let cors_response = bucket.get_bucket_cors(expected_bucket_owner).await.unwrap();
3909 assert_eq!(cors_response, cors_config);
3910
3911 let response = bucket
3912 .delete_bucket_cors(expected_bucket_owner)
3913 .await
3914 .unwrap();
3915 assert_eq!(response.status_code(), 204);
3916 }
3917
3918 #[ignore]
3919 #[cfg(any(feature = "tokio-native-tls", feature = "tokio-rustls-tls"))]
3920 #[maybe_async::test(
3921 feature = "sync",
3922 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3923 async(
3924 all(not(feature = "sync"), feature = "with-async-std"),
3925 async_std::test
3926 )
3927 )]
3928 async fn test_bucket_exists_with_dangerous_config() {
3929 init();
3930
3931 let credentials = test_aws_credentials();
3939 let region = "eu-central-1".parse().unwrap();
3940 let bucket_name = "rust-s3-test";
3941
3942 let bucket = Bucket::new(bucket_name, region, credentials)
3944 .unwrap()
3945 .with_path_style();
3946
3947 let bucket = bucket.set_dangereous_config(true, true).unwrap();
3949
3950 let exists_result = bucket.exists().await;
3953
3954 assert!(
3956 exists_result.is_ok(),
3957 "Bucket::exists() failed with dangerous config"
3958 );
3959 let exists = exists_result.unwrap();
3960 assert!(exists, "Test bucket should exist");
3961
3962 let list_result = bucket.list("".to_string(), Some("/".to_string())).await;
3965 assert!(
3966 list_result.is_ok(),
3967 "List operation should work with dangerous config"
3968 );
3969 }
3970
3971 #[ignore]
3972 #[maybe_async::test(
3973 feature = "sync",
3974 async(all(not(feature = "sync"), feature = "with-tokio"), tokio::test),
3975 async(
3976 all(not(feature = "sync"), feature = "with-async-std"),
3977 async_std::test
3978 )
3979 )]
3980 async fn test_bucket_exists_without_dangerous_config() {
3981 init();
3982
3983 let credentials = test_aws_credentials();
3985 let region = "eu-central-1".parse().unwrap();
3986 let bucket_name = "rust-s3-test";
3987
3988 let bucket = Bucket::new(bucket_name, region, credentials)
3990 .unwrap()
3991 .with_path_style();
3992
3993 let exists_result = bucket.exists().await;
3995 assert!(
3996 exists_result.is_ok(),
3997 "Bucket::exists() should work without dangerous config"
3998 );
3999 let exists = exists_result.unwrap();
4000 assert!(exists, "Test bucket should exist");
4001 }
4002}