1use async_trait::async_trait;
6use aws_credential_types::Credentials;
7use aws_sdk_s3::error::ProvideErrorMetadata;
8use aws_sigv4::http_request::{
9 SignableBody, SignableRequest, SignatureLocation, SigningSettings, sign,
10};
11use aws_sigv4::sign::v4;
12use aws_smithy_runtime_api::client::http::{
13 HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpConnector,
14};
15use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
16use aws_smithy_runtime_api::client::result::ConnectorError;
17use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
18use aws_smithy_runtime_api::http::{Response, StatusCode};
19use aws_smithy_types::body::SdkBody;
20use bytes::Bytes;
21use jiff::Timestamp;
22use quick_xml::de::from_str as from_xml_str;
23use rc_core::{
24 Alias, BucketNotification, Capabilities, CorsRule, Error, LifecycleRule, ListOptions,
25 ListResult, NotificationTarget, ObjectInfo, ObjectStore, ObjectVersion,
26 ObjectVersionListResult, RemotePath, ReplicationConfiguration, Result, SelectOptions,
27};
28use reqwest::Method;
29use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
30use serde::Deserialize;
31use sha2::{Digest, Sha256};
32use std::collections::HashMap;
33use tokio::io::AsyncReadExt;
34use tokio::io::AsyncWrite;
35
36const SINGLE_PUT_OBJECT_MAX_SIZE: u64 = crate::multipart::DEFAULT_PART_SIZE;
39const S3_SERVICE_NAME: &str = "s3";
40const S3_REPLICATION_XML_NAMESPACE: &str = "http://s3.amazonaws.com/doc/2006-03-01/";
41const RUSTFS_FORCE_DELETE_HEADER: &str = "x-rustfs-force-delete";
42
43#[derive(Debug, Clone, Copy, PartialEq, Eq)]
44enum BucketPolicyErrorKind {
45 MissingPolicy,
46 MissingBucket,
47 Other,
48}
49
50#[derive(Debug, Clone)]
53struct ReqwestConnector {
54 client: reqwest::Client,
55}
56
57impl ReqwestConnector {
58 async fn new(insecure: bool, ca_bundle: Option<&str>) -> Result<Self> {
59 let client = build_reqwest_client(insecure, ca_bundle).await?;
60 Ok(Self { client })
61 }
62}
63
64async fn build_reqwest_client(insecure: bool, ca_bundle: Option<&str>) -> Result<reqwest::Client> {
65 let mut builder = reqwest::Client::builder().danger_accept_invalid_certs(insecure);
69
70 if let Some(bundle_path) = ca_bundle {
71 let pem = tokio::fs::read(bundle_path).await.map_err(|e| {
73 Error::Network(format!("Failed to read CA bundle '{bundle_path}': {e}"))
74 })?;
75 let cert = reqwest::Certificate::from_pem(&pem)
76 .map_err(|e| Error::Network(format!("Invalid CA bundle '{bundle_path}': {e}")))?;
77 builder = builder.add_root_certificate(cert);
78 }
79
80 let client = builder
81 .build()
82 .map_err(|e| Error::Network(format!("Failed to build HTTP client: {e}")))?;
83 Ok(client)
84}
85
86fn force_path_style_for_alias(alias: &Alias) -> bool {
87 match alias.bucket_lookup.as_str() {
88 "path" => true,
89 "dns" => false,
90 "auto" => !is_aliyun_oss_service_endpoint(&alias.endpoint),
91 _ => true,
92 }
93}
94
95fn is_aliyun_oss_service_endpoint(endpoint: &str) -> bool {
96 let Ok(url) = reqwest::Url::parse(endpoint.trim_end_matches('/')) else {
97 return false;
98 };
99
100 let Some(host) = url.host_str() else {
101 return false;
102 };
103
104 host.strip_suffix(".aliyuncs.com")
105 .and_then(|host| host.split('.').next())
106 .is_some_and(|first_label| first_label.starts_with("oss-"))
107}
108
109#[derive(Debug, Deserialize)]
110#[serde(rename_all = "PascalCase")]
111struct ReplicationConfigurationXml {
112 role: Option<String>,
113 #[serde(rename = "Rule", default)]
114 rules: Vec<ReplicationRuleXml>,
115}
116
117#[derive(Debug, Deserialize)]
118#[serde(rename_all = "PascalCase")]
119struct ReplicationRuleXml {
120 #[serde(rename = "ID")]
121 id: Option<String>,
122 priority: Option<i32>,
123 status: Option<String>,
124 #[serde(rename = "Prefix")]
125 legacy_prefix: Option<String>,
126 filter: Option<ReplicationFilterXml>,
127 destination: Option<ReplicationDestinationXml>,
128 delete_marker_replication: Option<ReplicationStatusXml>,
129 existing_object_replication: Option<ReplicationStatusXml>,
130 delete_replication: Option<ReplicationStatusXml>,
131}
132
133#[derive(Debug, Deserialize)]
134#[serde(rename_all = "PascalCase")]
135struct ReplicationFilterXml {
136 prefix: Option<String>,
137 tag: Option<TagXml>,
138 and: Option<ReplicationAndXml>,
139}
140
141#[derive(Debug, Deserialize)]
142#[serde(rename_all = "PascalCase")]
143struct ReplicationAndXml {
144 prefix: Option<String>,
145 #[serde(rename = "Tag", default)]
146 tags: Vec<TagXml>,
147}
148
149#[derive(Debug, Deserialize)]
150#[serde(rename_all = "PascalCase")]
151struct TagXml {
152 key: Option<String>,
153 value: Option<String>,
154}
155
156#[derive(Debug, Deserialize)]
157#[serde(rename_all = "PascalCase")]
158struct ReplicationDestinationXml {
159 bucket: Option<String>,
160 storage_class: Option<String>,
161}
162
163#[derive(Debug, Deserialize)]
164#[serde(rename_all = "PascalCase")]
165struct ReplicationStatusXml {
166 status: Option<String>,
167}
168
169#[derive(Debug, Deserialize)]
170#[serde(rename_all = "PascalCase")]
171struct CorsConfigurationXml {
172 #[serde(rename = "CORSRule", default)]
173 rules: Vec<CorsRuleXml>,
174}
175
176#[derive(Debug, Deserialize)]
177#[serde(rename_all = "PascalCase")]
178struct CorsRuleXml {
179 #[serde(rename = "ID")]
180 id: Option<String>,
181 #[serde(rename = "AllowedOrigin", default)]
182 allowed_origins: Vec<String>,
183 #[serde(rename = "AllowedMethod", default)]
184 allowed_methods: Vec<String>,
185 #[serde(rename = "AllowedHeader", default)]
186 allowed_headers: Vec<String>,
187 #[serde(rename = "ExposeHeader", default)]
188 expose_headers: Vec<String>,
189 max_age_seconds: Option<i32>,
190}
191
192fn parse_replication_status(status: Option<&ReplicationStatusXml>) -> Option<bool> {
193 status
194 .and_then(|value| value.status.as_deref())
195 .map(|value| value.eq_ignore_ascii_case("enabled"))
196}
197
198fn parse_replication_rule_status(status: Option<&str>) -> rc_core::ReplicationRuleStatus {
199 match status {
200 Some(value) if value.eq_ignore_ascii_case("enabled") => {
201 rc_core::ReplicationRuleStatus::Enabled
202 }
203 _ => rc_core::ReplicationRuleStatus::Disabled,
204 }
205}
206
207fn collect_tag_map<'a, I>(tags: I) -> Option<HashMap<String, String>>
208where
209 I: IntoIterator<Item = (&'a str, &'a str)>,
210{
211 let collected: HashMap<String, String> = tags
212 .into_iter()
213 .map(|(key, value)| (key.to_string(), value.to_string()))
214 .collect();
215 if collected.is_empty() {
216 None
217 } else {
218 Some(collected)
219 }
220}
221
222fn parse_tag_xml(tag: Option<&TagXml>) -> Option<HashMap<String, String>> {
223 collect_tag_map(tag.and_then(|tag| Some((tag.key.as_deref()?, tag.value.as_deref()?))))
224}
225
226fn parse_tag_xmls(tags: &[TagXml]) -> Option<HashMap<String, String>> {
227 collect_tag_map(
228 tags.iter()
229 .filter_map(|tag| Some((tag.key.as_deref()?, tag.value.as_deref()?))),
230 )
231}
232
233fn parse_replication_filter_prefix(filter: Option<&ReplicationFilterXml>) -> Option<String> {
234 filter
235 .and_then(|filter| filter.prefix.clone())
236 .or_else(|| filter.and_then(|filter| filter.and.as_ref()?.prefix.clone()))
237}
238
239fn parse_replication_filter_tags(
240 filter: Option<&ReplicationFilterXml>,
241) -> Option<HashMap<String, String>> {
242 filter
243 .and_then(|filter| parse_tag_xml(filter.tag.as_ref()))
244 .or_else(|| filter.and_then(|filter| parse_tag_xmls(&filter.and.as_ref()?.tags)))
245}
246
247fn sorted_tags(tags: &HashMap<String, String>) -> Vec<(&str, &str)> {
248 let mut pairs: Vec<(&str, &str)> = tags
249 .iter()
250 .map(|(key, value)| (key.as_str(), value.as_str()))
251 .collect();
252 pairs.sort_unstable();
253 pairs
254}
255
256fn append_tag_xml(xml: &mut String, key: &str, value: &str) {
257 xml.push_str("<Tag><Key>");
258 xml.push_str(&xml_escape(key));
259 xml.push_str("</Key><Value>");
260 xml.push_str(&xml_escape(value));
261 xml.push_str("</Value></Tag>");
262}
263
264fn append_replication_filter_xml(
265 xml: &mut String,
266 prefix: Option<&str>,
267 tags: Option<&HashMap<String, String>>,
268) {
269 let Some(tags) = tags.filter(|tags| !tags.is_empty()) else {
270 if let Some(prefix) = prefix {
271 xml.push_str("<Filter><Prefix>");
272 xml.push_str(&xml_escape(prefix));
273 xml.push_str("</Prefix></Filter>");
274 }
275 return;
276 };
277
278 xml.push_str("<Filter>");
279 if prefix.is_some() || tags.len() > 1 {
280 xml.push_str("<And>");
281 if let Some(prefix) = prefix {
282 xml.push_str("<Prefix>");
283 xml.push_str(&xml_escape(prefix));
284 xml.push_str("</Prefix>");
285 }
286 for (key, value) in sorted_tags(tags) {
287 append_tag_xml(xml, key, value);
288 }
289 xml.push_str("</And>");
290 } else if let Some((key, value)) = sorted_tags(tags).into_iter().next() {
291 append_tag_xml(xml, key, value);
292 }
293 xml.push_str("</Filter>");
294}
295
296fn normalize_optional_strings(values: Option<Vec<String>>) -> Option<Vec<String>> {
297 values.filter(|items| !items.is_empty())
298}
299
300fn is_missing_cors_configuration_error(error_text: &str) -> bool {
301 let normalized = error_text.to_ascii_lowercase();
302 normalized.contains("nosuchcorsconfiguration")
303 || normalized.contains("cors configuration does not exist")
304 || normalized.contains("the cors configuration does not exist")
305}
306
307fn is_missing_cors_configuration_response(
308 error_code: Option<&str>,
309 status_code: Option<u16>,
310 error_text: &str,
311) -> bool {
312 let error_code = error_code.map(|code| code.to_ascii_lowercase());
313 if matches!(error_code.as_deref(), Some("nosuchcorsconfiguration")) {
314 return true;
315 }
316
317 if !is_missing_cors_configuration_error(error_text) {
318 return false;
319 }
320
321 status_code.is_none_or(|status| status == 404)
322}
323
324fn sdk_cors_rule_to_core(rule: &aws_sdk_s3::types::CorsRule) -> CorsRule {
325 CorsRule {
326 id: rule.id().map(str::to_string),
327 allowed_origins: rule.allowed_origins().to_vec(),
328 allowed_methods: rule.allowed_methods().to_vec(),
329 allowed_headers: normalize_optional_strings(Some(rule.allowed_headers().to_vec())),
330 expose_headers: normalize_optional_strings(Some(rule.expose_headers().to_vec())),
331 max_age_seconds: rule.max_age_seconds(),
332 }
333}
334
335fn core_cors_rule_to_sdk(rule: &CorsRule) -> Result<aws_sdk_s3::types::CorsRule> {
336 aws_sdk_s3::types::CorsRule::builder()
337 .set_id(rule.id.clone())
338 .set_allowed_origins(Some(rule.allowed_origins.clone()))
339 .set_allowed_methods(Some(
340 rule.allowed_methods
341 .iter()
342 .map(|method| method.to_ascii_uppercase())
343 .collect(),
344 ))
345 .set_allowed_headers(normalize_optional_strings(rule.allowed_headers.clone()))
346 .set_expose_headers(normalize_optional_strings(rule.expose_headers.clone()))
347 .set_max_age_seconds(rule.max_age_seconds)
348 .build()
349 .map_err(|e| Error::General(format!("build bucket cors rule: {e}")))
350}
351
352fn parse_cors_configuration_xml(body: &str) -> Result<Vec<CorsRule>> {
353 let config: CorsConfigurationXml =
354 from_xml_str(body).map_err(|e| Error::General(format!("parse bucket cors xml: {e}")))?;
355
356 Ok(config
357 .rules
358 .into_iter()
359 .map(|rule| CorsRule {
360 id: rule.id,
361 allowed_origins: rule.allowed_origins,
362 allowed_methods: rule.allowed_methods,
363 allowed_headers: normalize_optional_strings(Some(rule.allowed_headers)),
364 expose_headers: normalize_optional_strings(Some(rule.expose_headers)),
365 max_age_seconds: rule.max_age_seconds,
366 })
367 .collect())
368}
369
370fn parse_replication_configuration_xml(body: &str) -> Result<ReplicationConfiguration> {
371 let config: ReplicationConfigurationXml = from_xml_str(body)
372 .map_err(|e| Error::General(format!("parse replication config xml: {e}")))?;
373
374 let rules = config
375 .rules
376 .into_iter()
377 .map(|rule| rc_core::ReplicationRule {
378 id: rule.id.unwrap_or_default(),
379 priority: rule.priority.unwrap_or_default(),
380 status: parse_replication_rule_status(rule.status.as_deref()),
381 prefix: parse_replication_filter_prefix(rule.filter.as_ref()).or(rule.legacy_prefix),
382 tags: parse_replication_filter_tags(rule.filter.as_ref()),
383 destination: rc_core::ReplicationDestination {
384 bucket_arn: rule
385 .destination
386 .as_ref()
387 .and_then(|destination| destination.bucket.clone())
388 .unwrap_or_default(),
389 storage_class: rule
390 .destination
391 .and_then(|destination| destination.storage_class),
392 },
393 delete_marker_replication: parse_replication_status(
394 rule.delete_marker_replication.as_ref(),
395 ),
396 existing_object_replication: parse_replication_status(
397 rule.existing_object_replication.as_ref(),
398 ),
399 delete_replication: parse_replication_status(rule.delete_replication.as_ref()),
400 })
401 .collect();
402
403 Ok(ReplicationConfiguration {
404 role: config.role.unwrap_or_default(),
405 rules,
406 })
407}
408
409fn xml_escape(value: &str) -> String {
410 value
411 .replace('&', "&")
412 .replace('<', "<")
413 .replace('>', ">")
414 .replace('"', """)
415 .replace('\'', "'")
416}
417
418fn append_replication_status_tag(xml: &mut String, tag: &str, enabled: Option<bool>) {
419 if let Some(enabled) = enabled {
420 let status = if enabled { "Enabled" } else { "Disabled" };
421 xml.push('<');
422 xml.push_str(tag);
423 xml.push_str("><Status>");
424 xml.push_str(status);
425 xml.push_str("</Status></");
426 xml.push_str(tag);
427 xml.push('>');
428 }
429}
430
431fn build_replication_configuration_xml(config: &ReplicationConfiguration) -> String {
432 let mut xml = String::from(r#"<?xml version="1.0" encoding="UTF-8"?>"#);
433 xml.push_str(r#"<ReplicationConfiguration xmlns=""#);
434 xml.push_str(S3_REPLICATION_XML_NAMESPACE);
435 xml.push_str(r#"">"#);
436
437 if !config.role.is_empty() {
438 xml.push_str("<Role>");
439 xml.push_str(&xml_escape(&config.role));
440 xml.push_str("</Role>");
441 }
442
443 for rule in &config.rules {
444 xml.push_str("<Rule>");
445
446 xml.push_str("<Status>");
447 xml.push_str(match rule.status {
448 rc_core::ReplicationRuleStatus::Enabled => "Enabled",
449 rc_core::ReplicationRuleStatus::Disabled => "Disabled",
450 });
451 xml.push_str("</Status>");
452
453 xml.push_str("<Destination><Bucket>");
454 xml.push_str(&xml_escape(&rule.destination.bucket_arn));
455 xml.push_str("</Bucket>");
456 if let Some(storage_class) = &rule.destination.storage_class {
457 xml.push_str("<StorageClass>");
458 xml.push_str(&xml_escape(storage_class));
459 xml.push_str("</StorageClass>");
460 }
461 xml.push_str("</Destination>");
462
463 if !rule.id.is_empty() {
464 xml.push_str("<ID>");
465 xml.push_str(&xml_escape(&rule.id));
466 xml.push_str("</ID>");
467 }
468
469 xml.push_str("<Priority>");
470 xml.push_str(&rule.priority.to_string());
471 xml.push_str("</Priority>");
472
473 append_replication_filter_xml(&mut xml, rule.prefix.as_deref(), rule.tags.as_ref());
474
475 append_replication_status_tag(
476 &mut xml,
477 "ExistingObjectReplication",
478 rule.existing_object_replication,
479 );
480 append_replication_status_tag(
481 &mut xml,
482 "DeleteMarkerReplication",
483 rule.delete_marker_replication,
484 );
485 append_replication_status_tag(&mut xml, "DeleteReplication", rule.delete_replication);
486
487 xml.push_str("</Rule>");
488 }
489
490 xml.push_str("</ReplicationConfiguration>");
491 xml
492}
493
494fn parse_lifecycle_filter_prefix(
495 filter: Option<&aws_sdk_s3::types::LifecycleRuleFilter>,
496) -> Option<String> {
497 filter
498 .and_then(|filter| filter.prefix().map(str::to_string))
499 .or_else(|| filter.and_then(|filter| filter.and()?.prefix().map(str::to_string)))
500}
501
502fn parse_lifecycle_filter_tags(
503 filter: Option<&aws_sdk_s3::types::LifecycleRuleFilter>,
504) -> Option<HashMap<String, String>> {
505 filter
506 .and_then(|filter| collect_tag_map(filter.tag().map(|tag| (tag.key(), tag.value()))))
507 .or_else(|| {
508 filter.and_then(|filter| {
509 collect_tag_map(
510 filter
511 .and()?
512 .tags()
513 .iter()
514 .map(|tag| (tag.key(), tag.value())),
515 )
516 })
517 })
518}
519
520fn build_s3_tag(key: &str, value: &str) -> Result<aws_sdk_s3::types::Tag> {
521 aws_sdk_s3::types::Tag::builder()
522 .key(key)
523 .value(value)
524 .build()
525 .map_err(|error| Error::General(format!("build filter tag: {error}")))
526}
527
528fn build_lifecycle_rule_filter(
529 prefix: Option<&str>,
530 tags: Option<&HashMap<String, String>>,
531) -> Result<Option<aws_sdk_s3::types::LifecycleRuleFilter>> {
532 let Some(tags) = tags.filter(|tags| !tags.is_empty()) else {
533 return Ok(prefix.map(|prefix| {
534 aws_sdk_s3::types::LifecycleRuleFilter::builder()
535 .prefix(prefix)
536 .build()
537 }));
538 };
539
540 let tag_values = sorted_tags(tags)
541 .into_iter()
542 .map(|(key, value)| build_s3_tag(key, value))
543 .collect::<Result<Vec<_>>>()?;
544
545 let filter = if prefix.is_some() || tag_values.len() > 1 {
546 let mut and_builder = aws_sdk_s3::types::LifecycleRuleAndOperator::builder();
547 if let Some(prefix) = prefix {
548 and_builder = and_builder.prefix(prefix);
549 }
550 for tag in tag_values {
551 and_builder = and_builder.tags(tag);
552 }
553 aws_sdk_s3::types::LifecycleRuleFilter::builder()
554 .and(and_builder.build())
555 .build()
556 } else {
557 aws_sdk_s3::types::LifecycleRuleFilter::builder()
558 .tag(
559 tag_values
560 .into_iter()
561 .next()
562 .expect("non-empty tags required to build lifecycle filter"),
563 )
564 .build()
565 };
566
567 Ok(Some(filter))
568}
569
570impl HttpConnector for ReqwestConnector {
571 fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
572 let client = self.client.clone();
573 HttpConnectorFuture::new(async move {
574 let uri = request.uri().to_string();
576 let method_str = request.method().to_string();
577 let headers = request.headers().clone();
578
579 let body_bytes = match request.body().bytes() {
584 Some(b) => Bytes::copy_from_slice(b),
585 None => {
586 return Err(ConnectorError::user(
587 "Streaming request bodies are not supported in insecure/ca_bundle TLS mode; \
588 use in-memory data for uploads with this connector"
589 .into(),
590 ));
591 }
592 };
593
594 let method = reqwest::Method::from_bytes(method_str.as_bytes())
596 .map_err(|e| ConnectorError::user(Box::new(e)))?;
597
598 let url = reqwest::Url::parse(&uri).map_err(|e| ConnectorError::user(Box::new(e)))?;
600
601 let mut req = reqwest::Request::new(method, url);
603
604 for (name, value) in headers.iter() {
606 match (
607 reqwest::header::HeaderName::from_bytes(name.as_bytes()),
608 reqwest::header::HeaderValue::from_bytes(value.as_bytes()),
609 ) {
610 (Ok(header_name), Ok(header_value)) => {
611 req.headers_mut().append(header_name, header_value);
612 }
613 _ => {
614 tracing::warn!("Skipping non-convertible request header: {}", name);
615 }
616 }
617 }
618
619 *req.body_mut() = Some(reqwest::Body::from(body_bytes));
621
622 let resp = client
624 .execute(req)
625 .await
626 .map_err(|e| ConnectorError::io(Box::new(e)))?;
627
628 let status = StatusCode::try_from(resp.status().as_u16())
630 .map_err(|e| ConnectorError::other(Box::new(e), None))?;
631 let resp_headers = resp.headers().clone();
632 let body = resp
633 .bytes()
634 .await
635 .map_err(|e| ConnectorError::io(Box::new(e)))?;
636
637 let mut sdk_response = Response::new(status, SdkBody::from(body));
638 for (name, value) in &resp_headers {
639 match value.to_str() {
640 Ok(value_str) => {
641 sdk_response
642 .headers_mut()
643 .append(name.as_str().to_owned(), value_str.to_owned());
644 }
645 Err(_) => {
646 tracing::warn!("Skipping non-UTF8 response header: {}", name.as_str());
647 }
648 }
649 }
650
651 Ok(sdk_response)
652 })
653 }
654}
655
656impl HttpClient for ReqwestConnector {
657 fn http_connector(
658 &self,
659 _settings: &HttpConnectorSettings,
660 _components: &RuntimeComponents,
661 ) -> SharedHttpConnector {
662 SharedHttpConnector::new(self.clone())
668 }
669}
670
671pub struct S3Client {
673 inner: aws_sdk_s3::Client,
674 xml_http_client: reqwest::Client,
675 alias: Alias,
676}
677
678#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
680pub struct DeleteRequestOptions {
681 pub force_delete: bool,
683}
684
685impl S3Client {
686 pub async fn new(alias: Alias) -> Result<Self> {
688 let endpoint = alias.endpoint.clone();
689 let region = alias.region.clone();
690 let access_key = alias.access_key.clone();
691 let secret_key = alias.secret_key.clone();
692
693 let credentials = aws_credential_types::Credentials::new(
695 access_key,
696 secret_key,
697 None, None, "rc-static-credentials",
700 );
701
702 let mut config_loader = aws_config::defaults(aws_config::BehaviorVersion::latest())
704 .credentials_provider(credentials)
705 .region(aws_config::Region::new(region))
706 .endpoint_url(&endpoint);
707
708 if alias.insecure || alias.ca_bundle.is_some() {
711 let connector =
712 ReqwestConnector::new(alias.insecure, alias.ca_bundle.as_deref()).await?;
713 config_loader = config_loader.http_client(connector);
714 }
715
716 let xml_http_client =
717 build_reqwest_client(alias.insecure, alias.ca_bundle.as_deref()).await?;
718 let config = config_loader.load().await;
719
720 let s3_config = aws_sdk_s3::config::Builder::from(&config)
722 .force_path_style(force_path_style_for_alias(&alias))
723 .request_checksum_calculation(
726 aws_sdk_s3::config::RequestChecksumCalculation::WhenRequired,
727 )
728 .response_checksum_validation(
729 aws_sdk_s3::config::ResponseChecksumValidation::WhenRequired,
730 )
731 .build();
732
733 let client = aws_sdk_s3::Client::from_conf(s3_config);
734
735 Ok(Self {
736 inner: client,
737 xml_http_client,
738 alias,
739 })
740 }
741
742 pub fn inner(&self) -> &aws_sdk_s3::Client {
744 &self.inner
745 }
746
747 pub async fn list_object_versions_page(
749 &self,
750 path: &RemotePath,
751 max_keys: Option<i32>,
752 ) -> Result<ObjectVersionListResult> {
753 let mut builder = self.inner.list_object_versions().bucket(&path.bucket);
754
755 if !path.key.is_empty() {
756 builder = builder.prefix(&path.key);
757 }
758
759 if let Some(max) = max_keys {
760 builder = builder.max_keys(max);
761 }
762
763 let response = builder.send().await.map_err(|e| {
764 let err_str = e.to_string();
765 if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
766 Error::NotFound(format!("Bucket not found: {}", path.bucket))
767 } else {
768 Error::General(format!("list_object_versions: {e}"))
769 }
770 })?;
771
772 let mut items = Vec::new();
773
774 for v in response.versions() {
775 items.push(ObjectVersion {
776 key: v.key().unwrap_or_default().to_string(),
777 version_id: v.version_id().unwrap_or("null").to_string(),
778 is_latest: v.is_latest().unwrap_or(false),
779 is_delete_marker: false,
780 last_modified: v
781 .last_modified()
782 .and_then(|dt| Timestamp::from_second(dt.secs()).ok()),
783 size_bytes: v.size(),
784 etag: v.e_tag().map(|s| s.trim_matches('"').to_string()),
785 });
786 }
787
788 for m in response.delete_markers() {
789 items.push(ObjectVersion {
790 key: m.key().unwrap_or_default().to_string(),
791 version_id: m.version_id().unwrap_or("null").to_string(),
792 is_latest: m.is_latest().unwrap_or(false),
793 is_delete_marker: true,
794 last_modified: m
795 .last_modified()
796 .and_then(|dt| Timestamp::from_second(dt.secs()).ok()),
797 size_bytes: None,
798 etag: None,
799 });
800 }
801
802 items.sort_by(|a, b| {
803 a.key
804 .cmp(&b.key)
805 .then_with(|| b.last_modified.cmp(&a.last_modified))
806 });
807
808 Ok(ObjectVersionListResult {
809 items,
810 truncated: response.is_truncated().unwrap_or(false),
811 continuation_token: response.next_key_marker().map(ToString::to_string),
812 version_id_marker: response.next_version_id_marker().map(ToString::to_string),
813 })
814 }
815
816 pub async fn get_object_with_progress(
818 &self,
819 path: &RemotePath,
820 mut on_progress: impl FnMut(u64, Option<u64>) + Send,
821 ) -> Result<Vec<u8>> {
822 let response = self
823 .inner
824 .get_object()
825 .bucket(&path.bucket)
826 .key(&path.key)
827 .send()
828 .await
829 .map_err(|e| {
830 let err_str = e.to_string();
831 if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
832 Error::NotFound(path.to_string())
833 } else {
834 Error::Network(err_str)
835 }
836 })?;
837
838 let content_length = response
839 .content_length()
840 .and_then(|length| u64::try_from(length).ok());
841 let mut data = Vec::with_capacity(
842 content_length
843 .and_then(|length| usize::try_from(length).ok())
844 .unwrap_or_default(),
845 );
846 let mut body = response.body;
847 let mut bytes_downloaded = 0u64;
848
849 while let Some(chunk) = body
850 .try_next()
851 .await
852 .map_err(|e| Error::Network(e.to_string()))?
853 {
854 bytes_downloaded += chunk.len() as u64;
855 data.extend_from_slice(&chunk);
856 on_progress(bytes_downloaded, content_length);
857 }
858
859 Ok(data)
860 }
861
862 pub async fn delete_object_with_options(
864 &self,
865 path: &RemotePath,
866 options: DeleteRequestOptions,
867 ) -> Result<()> {
868 let mut request = self
869 .inner
870 .delete_object()
871 .bucket(&path.bucket)
872 .key(&path.key)
873 .customize();
874
875 if options.force_delete {
876 request = request.mutate_request(|request| {
877 request
878 .headers_mut()
879 .insert(RUSTFS_FORCE_DELETE_HEADER, "true");
880 });
881 }
882
883 request.send().await.map_err(|e| {
884 let err_str = Self::format_sdk_error(&e);
885 let is_missing_key = if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &e
886 {
887 let code = service_err.err().code().or_else(|| {
888 service_err
889 .raw()
890 .headers()
891 .get("x-amz-error-code")
892 .and_then(|value| std::str::from_utf8(value.as_bytes()).ok())
893 });
894 matches!(code, Some("NoSuchKey") | Some("NotFound"))
895 || service_err.raw().status().as_u16() == 404
896 } else {
897 err_str.contains("NotFound") || err_str.contains("NoSuchKey")
898 };
899
900 if is_missing_key {
901 Error::NotFound(path.to_string())
902 } else {
903 Error::Network(err_str)
904 }
905 })?;
906
907 Ok(())
908 }
909
910 pub async fn delete_objects_with_options(
912 &self,
913 bucket: &str,
914 keys: Vec<String>,
915 options: DeleteRequestOptions,
916 ) -> Result<Vec<String>> {
917 use aws_sdk_s3::types::{Delete, ObjectIdentifier};
918
919 if keys.is_empty() {
920 return Ok(vec![]);
921 }
922
923 let objects: Vec<ObjectIdentifier> =
924 keys.iter()
925 .map(|key| {
926 ObjectIdentifier::builder().key(key).build().map_err(|e| {
927 Error::General(format!("invalid delete object identifier: {e}"))
928 })
929 })
930 .collect::<Result<Vec<_>>>()?;
931
932 let delete = Delete::builder()
933 .set_objects(Some(objects))
934 .build()
935 .map_err(|e| Error::General(e.to_string()))?;
936
937 let mut request = self
938 .inner
939 .delete_objects()
940 .bucket(bucket)
941 .delete(delete)
942 .customize();
943
944 if options.force_delete {
945 request = request.mutate_request(|request| {
946 request
947 .headers_mut()
948 .insert(RUSTFS_FORCE_DELETE_HEADER, "true");
949 });
950 }
951
952 let response = request
953 .send()
954 .await
955 .map_err(|e| Error::Network(e.to_string()))?;
956
957 let deleted: Vec<String> = response
958 .deleted()
959 .iter()
960 .filter_map(|d| d.key().map(|k| k.to_string()))
961 .collect();
962
963 if !response.errors().is_empty() {
964 let error_keys: Vec<String> = response
965 .errors()
966 .iter()
967 .filter_map(|e| e.key().map(|k| k.to_string()))
968 .collect();
969 tracing::warn!("Failed to delete some objects: {:?}", error_keys);
970 }
971
972 Ok(deleted)
973 }
974
975 fn format_sdk_error<E: std::fmt::Display>(error: &aws_sdk_s3::error::SdkError<E>) -> String {
977 match error {
978 aws_sdk_s3::error::SdkError::ServiceError(service_err) => {
979 let err = service_err.err();
980 let meta = service_err.raw();
981 let mut msg = format!("Service error: {}", err);
982 if let Some(code) = meta.headers().get("x-amz-error-code")
984 && let Ok(code_str) = std::str::from_utf8(code.as_bytes())
985 {
986 msg.push_str(&format!(" (code: {})", code_str));
987 }
988 msg
989 }
990 aws_sdk_s3::error::SdkError::ConstructionFailure(err) => {
991 format!("Request construction failed: {:?}", err)
992 }
993 aws_sdk_s3::error::SdkError::TimeoutError(_) => "Request timeout".to_string(),
994 aws_sdk_s3::error::SdkError::DispatchFailure(err) => {
995 format!("Network dispatch error: {:?}", err)
996 }
997 aws_sdk_s3::error::SdkError::ResponseError(err) => {
998 format!("Response error: {:?}", err)
999 }
1000 _ => error.to_string(),
1001 }
1002 }
1003
1004 fn should_use_multipart(file_size: u64) -> bool {
1005 file_size > SINGLE_PUT_OBJECT_MAX_SIZE
1006 }
1007
1008 fn sha256_hash(body: &[u8]) -> String {
1009 let mut hasher = Sha256::new();
1010 hasher.update(body);
1011 hex::encode(hasher.finalize())
1012 }
1013
1014 fn request_host(&self, url: &reqwest::Url) -> Result<String> {
1015 let host = url
1016 .host_str()
1017 .ok_or_else(|| Error::Network("Missing host in request URL".to_string()))?;
1018 Ok(match url.port() {
1019 Some(port) => format!("{host}:{port}"),
1020 None => host.to_string(),
1021 })
1022 }
1023
1024 fn replication_url(&self, bucket: &str) -> Result<reqwest::Url> {
1025 let mut url =
1026 reqwest::Url::parse(self.alias.endpoint.trim_end_matches('/')).map_err(|e| {
1027 Error::Network(format!("Invalid endpoint '{}': {e}", self.alias.endpoint))
1028 })?;
1029
1030 {
1031 let mut segments = url.path_segments_mut().map_err(|_| {
1032 Error::Network(format!(
1033 "Endpoint '{}' does not support path-style bucket operations",
1034 self.alias.endpoint
1035 ))
1036 })?;
1037 segments.pop_if_empty();
1038 segments.push(bucket);
1039 }
1040
1041 url.set_query(Some("replication="));
1042 Ok(url)
1043 }
1044
1045 fn cors_url(&self, bucket: &str) -> Result<reqwest::Url> {
1046 let mut url =
1047 reqwest::Url::parse(self.alias.endpoint.trim_end_matches('/')).map_err(|e| {
1048 Error::Network(format!("Invalid endpoint '{}': {e}", self.alias.endpoint))
1049 })?;
1050
1051 {
1052 let mut segments = url.path_segments_mut().map_err(|_| {
1053 Error::Network(format!(
1054 "Endpoint '{}' does not support path-style bucket operations",
1055 self.alias.endpoint
1056 ))
1057 })?;
1058 segments.pop_if_empty();
1059 segments.push(bucket);
1060 }
1061
1062 url.set_query(Some("cors="));
1063 Ok(url)
1064 }
1065
1066 async fn sign_xml_request(
1067 &self,
1068 method: &Method,
1069 url: &str,
1070 headers: &HeaderMap,
1071 body: &[u8],
1072 ) -> Result<HeaderMap> {
1073 let credentials = Credentials::new(
1074 &self.alias.access_key,
1075 &self.alias.secret_key,
1076 None,
1077 None,
1078 "s3-xml-client",
1079 );
1080
1081 let identity = credentials.into();
1082 let mut signing_settings = SigningSettings::default();
1083 signing_settings.signature_location = SignatureLocation::Headers;
1084
1085 let signing_params = v4::SigningParams::builder()
1086 .identity(&identity)
1087 .region(&self.alias.region)
1088 .name(S3_SERVICE_NAME)
1089 .time(std::time::SystemTime::now())
1090 .settings(signing_settings)
1091 .build()
1092 .map_err(|e| Error::Auth(format!("Failed to build signing params: {e}")))?;
1093
1094 let header_pairs: Vec<(&str, &str)> = headers
1095 .iter()
1096 .filter_map(|(k, v)| v.to_str().ok().map(|v| (k.as_str(), v)))
1097 .collect();
1098
1099 let signable_request = SignableRequest::new(
1100 method.as_str(),
1101 url,
1102 header_pairs.into_iter(),
1103 SignableBody::Bytes(body),
1104 )
1105 .map_err(|e| Error::Auth(format!("Failed to create signable request: {e}")))?;
1106
1107 let (signing_instructions, _) = sign(signable_request, &signing_params.into())
1108 .map_err(|e| Error::Auth(format!("Failed to sign request: {e}")))?
1109 .into_parts();
1110
1111 let mut signed_headers = headers.clone();
1112 for (name, value) in signing_instructions.headers() {
1113 let header_name = HeaderName::try_from(name.to_string())
1114 .map_err(|e| Error::Auth(format!("Invalid header name: {e}")))?;
1115 let header_value = HeaderValue::try_from(value.to_string())
1116 .map_err(|e| Error::Auth(format!("Invalid header value: {e}")))?;
1117 signed_headers.insert(header_name, header_value);
1118 }
1119
1120 Ok(signed_headers)
1121 }
1122
1123 async fn xml_request(
1124 &self,
1125 method: Method,
1126 url: reqwest::Url,
1127 content_type: Option<&str>,
1128 body: Option<Vec<u8>>,
1129 ) -> Result<String> {
1130 let body = body.unwrap_or_default();
1131 let mut headers = HeaderMap::new();
1132 headers.insert(
1133 "x-amz-content-sha256",
1134 HeaderValue::from_str(&Self::sha256_hash(&body))
1135 .map_err(|e| Error::Auth(format!("Invalid content hash header: {e}")))?,
1136 );
1137 headers.insert(
1138 "host",
1139 HeaderValue::from_str(&self.request_host(&url)?)
1140 .map_err(|e| Error::Auth(format!("Invalid host header: {e}")))?,
1141 );
1142
1143 if let Some(content_type) = content_type {
1144 headers.insert(
1145 CONTENT_TYPE,
1146 HeaderValue::from_str(content_type)
1147 .map_err(|e| Error::Auth(format!("Invalid content type header: {e}")))?,
1148 );
1149 }
1150
1151 let signed_headers = self
1152 .sign_xml_request(&method, url.as_str(), &headers, &body)
1153 .await?;
1154
1155 let mut request_builder = self.xml_http_client.request(method, url);
1156 for (name, value) in &signed_headers {
1157 request_builder = request_builder.header(name, value);
1158 }
1159 if !body.is_empty() {
1160 request_builder = request_builder.body(body);
1161 }
1162
1163 let response = request_builder
1164 .send()
1165 .await
1166 .map_err(|e| Error::Network(format!("Request failed: {e}")))?;
1167
1168 let status = response.status();
1169 let text = response
1170 .text()
1171 .await
1172 .map_err(|e| Error::Network(format!("Failed to read response: {e}")))?;
1173
1174 if !status.is_success() {
1175 return Err(Error::Network(format!(
1176 "HTTP {}: {}",
1177 status.as_u16(),
1178 text
1179 )));
1180 }
1181
1182 Ok(text)
1183 }
1184
1185 fn bucket_policy_error_kind(
1186 error_code: Option<&str>,
1187 status_code: Option<u16>,
1188 error_text: &str,
1189 ) -> BucketPolicyErrorKind {
1190 let error_code = error_code.map(|code| code.to_ascii_lowercase());
1191 if matches!(
1192 error_code.as_deref(),
1193 Some("nosuchbucketpolicy") | Some("nosuchpolicy")
1194 ) {
1195 return BucketPolicyErrorKind::MissingPolicy;
1196 }
1197 if matches!(error_code.as_deref(), Some("nosuchbucket")) {
1198 return BucketPolicyErrorKind::MissingBucket;
1199 }
1200
1201 let error_text = error_text.to_ascii_lowercase();
1202 if error_text.contains("nosuchbucketpolicy") || error_text.contains("nosuchpolicy") {
1203 return BucketPolicyErrorKind::MissingPolicy;
1204 }
1205 if error_text.contains("nosuchbucket") {
1206 return BucketPolicyErrorKind::MissingBucket;
1207 }
1208 if status_code == Some(404) {
1209 return BucketPolicyErrorKind::MissingPolicy;
1210 }
1211
1212 BucketPolicyErrorKind::Other
1213 }
1214
1215 fn map_get_bucket_policy_error(
1216 bucket: &str,
1217 kind: BucketPolicyErrorKind,
1218 error_text: &str,
1219 ) -> Result<Option<String>> {
1220 match kind {
1221 BucketPolicyErrorKind::MissingPolicy => Ok(None),
1222 BucketPolicyErrorKind::MissingBucket => {
1223 Err(Error::NotFound(format!("Bucket not found: {bucket}")))
1224 }
1225 BucketPolicyErrorKind::Other => {
1226 Err(Error::Network(format!("get_bucket_policy: {error_text}")))
1227 }
1228 }
1229 }
1230
1231 fn map_delete_bucket_policy_error(
1232 bucket: &str,
1233 kind: BucketPolicyErrorKind,
1234 error_text: &str,
1235 ) -> Result<()> {
1236 match kind {
1237 BucketPolicyErrorKind::MissingPolicy => Ok(()),
1238 BucketPolicyErrorKind::MissingBucket => {
1239 Err(Error::NotFound(format!("Bucket not found: {bucket}")))
1240 }
1241 BucketPolicyErrorKind::Other => Err(Error::General(format!(
1242 "delete_bucket_policy: {error_text}"
1243 ))),
1244 }
1245 }
1246
1247 fn extract_notification_filter(
1248 filter: Option<&aws_sdk_s3::types::NotificationConfigurationFilter>,
1249 ) -> (Option<String>, Option<String>) {
1250 let mut prefix = None;
1251 let mut suffix = None;
1252
1253 if let Some(key_filter) = filter.and_then(|value| value.key()) {
1254 for rule in key_filter.filter_rules() {
1255 match rule.name().map(|name| name.as_str()) {
1256 Some("prefix") => {
1257 prefix = rule.value().map(ToString::to_string);
1258 }
1259 Some("suffix") => {
1260 suffix = rule.value().map(ToString::to_string);
1261 }
1262 _ => {}
1263 }
1264 }
1265 }
1266
1267 (prefix, suffix)
1268 }
1269
1270 fn build_notification_filter(
1271 prefix: Option<&str>,
1272 suffix: Option<&str>,
1273 ) -> Option<aws_sdk_s3::types::NotificationConfigurationFilter> {
1274 use aws_sdk_s3::types::{FilterRule, FilterRuleName, NotificationConfigurationFilter};
1275
1276 let mut rules = Vec::new();
1277 if let Some(value) = prefix {
1278 let rule = FilterRule::builder()
1279 .name(FilterRuleName::Prefix)
1280 .value(value)
1281 .build();
1282 rules.push(rule);
1283 }
1284 if let Some(value) = suffix {
1285 let rule = FilterRule::builder()
1286 .name(FilterRuleName::Suffix)
1287 .value(value)
1288 .build();
1289 rules.push(rule);
1290 }
1291 if rules.is_empty() {
1292 return None;
1293 }
1294
1295 let key_filter = aws_sdk_s3::types::S3KeyFilter::builder()
1296 .set_filter_rules(Some(rules))
1297 .build();
1298 NotificationConfigurationFilter::builder()
1299 .key(key_filter)
1300 .build()
1301 .into()
1302 }
1303
1304 fn event_list_to_strings(events: &[aws_sdk_s3::types::Event]) -> Vec<String> {
1305 events
1306 .iter()
1307 .map(|event| event.as_str().to_string())
1308 .collect()
1309 }
1310
1311 fn strings_to_event_list(events: &[String]) -> Vec<aws_sdk_s3::types::Event> {
1312 events
1313 .iter()
1314 .map(|event| aws_sdk_s3::types::Event::from(event.as_str()))
1315 .collect()
1316 }
1317
1318 fn notifications_equivalent(
1319 expected: &[BucketNotification],
1320 actual: &[BucketNotification],
1321 ) -> bool {
1322 type CanonicalEntry = (u8, String, Option<String>, Option<String>, Vec<String>);
1323
1324 fn target_order(target: NotificationTarget) -> u8 {
1325 match target {
1326 NotificationTarget::Queue => 0,
1327 NotificationTarget::Topic => 1,
1328 NotificationTarget::Lambda => 2,
1329 }
1330 }
1331
1332 fn canonical(notifications: &[BucketNotification]) -> Vec<CanonicalEntry> {
1333 let mut normalized: Vec<CanonicalEntry> = notifications
1334 .iter()
1335 .map(|item| {
1336 let mut events = item.events.clone();
1337 events.sort();
1338 events.dedup();
1339 (
1340 target_order(item.target),
1341 item.arn.clone(),
1342 item.prefix.clone(),
1343 item.suffix.clone(),
1344 events,
1345 )
1346 })
1347 .collect();
1348 normalized.sort();
1349 normalized
1350 }
1351
1352 canonical(expected) == canonical(actual)
1353 }
1354
1355 async fn read_next_part(
1356 file: &mut tokio::fs::File,
1357 file_path: &std::path::Path,
1358 buffer: &mut [u8],
1359 ) -> Result<usize> {
1360 let mut total_read = 0usize;
1361 while total_read < buffer.len() {
1362 let bytes_read = file
1363 .read(&mut buffer[total_read..])
1364 .await
1365 .map_err(|e| Error::General(format!("read file '{}': {e}", file_path.display())))?;
1366 if bytes_read == 0 {
1367 break;
1368 }
1369 total_read += bytes_read;
1370 }
1371 Ok(total_read)
1372 }
1373
1374 async fn put_object_single_part_from_path(
1375 &self,
1376 path: &RemotePath,
1377 file_path: &std::path::Path,
1378 content_type: Option<&str>,
1379 file_size: u64,
1380 ) -> Result<ObjectInfo> {
1381 let data = tokio::fs::read(file_path)
1382 .await
1383 .map_err(|e| Error::General(format!("read file '{}': {e}", file_path.display())))?;
1384 let body = aws_sdk_s3::primitives::ByteStream::from(data);
1385
1386 let mut request = self
1387 .inner
1388 .put_object()
1389 .bucket(&path.bucket)
1390 .key(&path.key)
1391 .body(body);
1392
1393 if let Some(ct) = content_type {
1394 request = request.content_type(ct);
1395 }
1396
1397 let response = request
1398 .send()
1399 .await
1400 .map_err(|e| Error::Network(e.to_string()))?;
1401
1402 let mut info = ObjectInfo::file(&path.key, file_size as i64);
1403 if let Some(etag) = response.e_tag() {
1404 info.etag = Some(etag.trim_matches('"').to_string());
1405 }
1406 info.last_modified = Some(jiff::Timestamp::now());
1407
1408 Ok(info)
1409 }
1410
1411 async fn abort_multipart_upload_best_effort(&self, path: &RemotePath, upload_id: &str) {
1412 let _ = self
1413 .inner
1414 .abort_multipart_upload()
1415 .bucket(&path.bucket)
1416 .key(&path.key)
1417 .upload_id(upload_id)
1418 .send()
1419 .await;
1420 }
1421
1422 async fn put_object_multipart_from_path(
1423 &self,
1424 path: &RemotePath,
1425 file_path: &std::path::Path,
1426 content_type: Option<&str>,
1427 file_size: u64,
1428 on_progress: impl Fn(u64) + Send,
1429 ) -> Result<ObjectInfo> {
1430 use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
1431
1432 let config = crate::multipart::MultipartConfig::default();
1433 let part_size = config.calculate_part_size(file_size);
1434 let part_buffer_size = usize::try_from(part_size)
1435 .map_err(|_| Error::General(format!("invalid part size: {part_size}")))?;
1436
1437 tracing::debug!(file_size, part_size, "Starting multipart upload");
1438
1439 let mut create_request = self
1440 .inner
1441 .create_multipart_upload()
1442 .bucket(&path.bucket)
1443 .key(&path.key);
1444
1445 if let Some(ct) = content_type {
1446 create_request = create_request.content_type(ct);
1447 }
1448
1449 let create_response = create_request
1450 .send()
1451 .await
1452 .map_err(|e| Error::Network(format!("create multipart upload: {e}")))?;
1453
1454 let upload_id = create_response
1455 .upload_id()
1456 .ok_or_else(|| Error::General("missing upload id from multipart upload".to_string()))?
1457 .to_string();
1458
1459 tracing::debug!(upload_id = %upload_id, "Multipart upload initiated");
1460
1461 let mut file = tokio::fs::File::open(file_path)
1462 .await
1463 .map_err(|e| Error::General(format!("open file '{}': {e}", file_path.display())))?;
1464 let mut completed_parts = Vec::new();
1465 let mut part_number: i32 = 1;
1466 let mut chunk = vec![0u8; part_buffer_size];
1467 let mut bytes_uploaded: u64 = 0;
1468
1469 loop {
1470 let bytes_read = Self::read_next_part(&mut file, file_path, &mut chunk).await?;
1471 if bytes_read == 0 {
1472 break;
1473 }
1474
1475 tracing::debug!(part_number, bytes_read, "Uploading part");
1476
1477 let body = aws_sdk_s3::primitives::ByteStream::from(chunk[..bytes_read].to_vec());
1478 let upload_part_result = self
1479 .inner
1480 .upload_part()
1481 .bucket(&path.bucket)
1482 .key(&path.key)
1483 .upload_id(&upload_id)
1484 .part_number(part_number)
1485 .body(body)
1486 .send()
1487 .await;
1488
1489 let upload_part_response = match upload_part_result {
1490 Ok(response) => response,
1491 Err(e) => {
1492 tracing::debug!(
1493 upload_id = %upload_id,
1494 part_number,
1495 "Aborting multipart upload due to error"
1496 );
1497 self.abort_multipart_upload_best_effort(path, &upload_id)
1498 .await;
1499 return Err(Error::Network(format!(
1500 "upload multipart part {part_number}: {e}"
1501 )));
1502 }
1503 };
1504
1505 let etag = match upload_part_response.e_tag() {
1506 Some(value) => value.trim_matches('"').to_string(),
1507 None => {
1508 self.abort_multipart_upload_best_effort(path, &upload_id)
1509 .await;
1510 return Err(Error::General(format!(
1511 "missing ETag for multipart part {part_number}"
1512 )));
1513 }
1514 };
1515
1516 completed_parts.push(
1517 CompletedPart::builder()
1518 .part_number(part_number)
1519 .e_tag(etag)
1520 .build(),
1521 );
1522
1523 bytes_uploaded += bytes_read as u64;
1524 on_progress(bytes_uploaded);
1525 tracing::debug!(part_number, bytes_uploaded, "Part uploaded");
1526
1527 part_number += 1;
1528 }
1529
1530 let completed_upload = CompletedMultipartUpload::builder()
1531 .set_parts(Some(completed_parts))
1532 .build();
1533 let complete_result = self
1534 .inner
1535 .complete_multipart_upload()
1536 .bucket(&path.bucket)
1537 .key(&path.key)
1538 .upload_id(&upload_id)
1539 .multipart_upload(completed_upload)
1540 .send()
1541 .await;
1542
1543 let complete_response = match complete_result {
1544 Ok(response) => response,
1545 Err(e) => {
1546 tracing::debug!(upload_id = %upload_id, "Attempting to abort multipart upload after completion failure");
1547 self.abort_multipart_upload_best_effort(path, &upload_id)
1548 .await;
1549 return Err(Error::Network(format!("complete multipart upload: {e}")));
1550 }
1551 };
1552
1553 tracing::debug!("Multipart upload completed");
1554
1555 let mut info = ObjectInfo::file(&path.key, file_size as i64);
1556 if let Some(etag) = complete_response.e_tag() {
1557 info.etag = Some(etag.trim_matches('"').to_string());
1558 }
1559 info.last_modified = Some(jiff::Timestamp::now());
1560
1561 Ok(info)
1562 }
1563
1564 pub async fn put_object_from_path(
1569 &self,
1570 path: &RemotePath,
1571 file_path: &std::path::Path,
1572 content_type: Option<&str>,
1573 on_progress: impl Fn(u64) + Send,
1574 ) -> Result<ObjectInfo> {
1575 let metadata = tokio::fs::metadata(file_path).await.map_err(|e| {
1576 Error::General(format!("read metadata for '{}': {e}", file_path.display()))
1577 })?;
1578 if !metadata.is_file() {
1579 return Err(Error::General(format!(
1580 "source is not a file: {}",
1581 file_path.display()
1582 )));
1583 }
1584
1585 let file_size = metadata.len();
1586 if Self::should_use_multipart(file_size) {
1587 self.put_object_multipart_from_path(
1588 path,
1589 file_path,
1590 content_type,
1591 file_size,
1592 on_progress,
1593 )
1594 .await
1595 } else {
1596 self.put_object_single_part_from_path(path, file_path, content_type, file_size)
1597 .await
1598 }
1599 }
1600}
1601
1602fn build_tagging(
1603 tags: std::collections::HashMap<String, String>,
1604) -> Result<aws_sdk_s3::types::Tagging> {
1605 use aws_sdk_s3::types::{Tag, Tagging};
1606
1607 let mut tag_set = Vec::with_capacity(tags.len());
1608 for (key, value) in tags {
1609 let tag = Tag::builder()
1610 .key(key)
1611 .value(value)
1612 .build()
1613 .map_err(|e| Error::General(format!("invalid tag: {e}")))?;
1614 tag_set.push(tag);
1615 }
1616
1617 Tagging::builder()
1618 .set_tag_set(Some(tag_set))
1619 .build()
1620 .map_err(|e| Error::General(format!("invalid tagging payload: {e}")))
1621}
1622
1623#[async_trait]
1624impl ObjectStore for S3Client {
1625 async fn list_buckets(&self) -> Result<Vec<ObjectInfo>> {
1626 let response = self
1627 .inner
1628 .list_buckets()
1629 .send()
1630 .await
1631 .map_err(|e| Error::Network(e.to_string()))?;
1632
1633 let buckets = response
1634 .buckets()
1635 .iter()
1636 .map(|b| {
1637 let mut info = ObjectInfo::bucket(b.name().unwrap_or_default());
1638 if let Some(creation_date) = b.creation_date() {
1639 info.last_modified = jiff::Timestamp::from_second(creation_date.secs()).ok();
1640 }
1641 info
1642 })
1643 .collect();
1644
1645 Ok(buckets)
1646 }
1647
1648 async fn list_objects(&self, path: &RemotePath, options: ListOptions) -> Result<ListResult> {
1649 let mut request = self.inner.list_objects_v2().bucket(&path.bucket);
1650
1651 let prefix = if path.key.is_empty() {
1653 options.prefix.clone()
1654 } else if let Some(p) = &options.prefix {
1655 Some(format!("{}{}", path.key, p))
1656 } else {
1657 Some(path.key.clone())
1658 };
1659
1660 if let Some(p) = prefix {
1661 request = request.prefix(p);
1662 }
1663
1664 if !options.recursive {
1666 request = request.delimiter(options.delimiter.as_deref().unwrap_or("/"));
1667 }
1668
1669 if let Some(max) = options.max_keys {
1671 request = request.max_keys(max);
1672 }
1673
1674 if let Some(token) = &options.continuation_token {
1676 request = request.continuation_token(token);
1677 }
1678
1679 let response = request.send().await.map_err(|e| {
1680 let err_str = Self::format_sdk_error(&e);
1681 if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
1682 Error::NotFound(format!("Bucket not found: {}", path.bucket))
1683 } else {
1684 Error::Network(err_str)
1685 }
1686 })?;
1687
1688 let mut items = Vec::new();
1689
1690 for prefix in response.common_prefixes() {
1692 if let Some(p) = prefix.prefix() {
1693 items.push(ObjectInfo::dir(p));
1694 }
1695 }
1696
1697 for object in response.contents() {
1699 let key = object.key().unwrap_or_default().to_string();
1700 let size = object.size().unwrap_or(0);
1701 let mut info = ObjectInfo::file(&key, size);
1702
1703 if let Some(modified) = object.last_modified() {
1704 info.last_modified = jiff::Timestamp::from_second(modified.secs()).ok();
1705 }
1706
1707 if let Some(etag) = object.e_tag() {
1708 info.etag = Some(etag.trim_matches('"').to_string());
1709 }
1710
1711 if let Some(sc) = object.storage_class() {
1712 info.storage_class = Some(sc.as_str().to_string());
1713 }
1714
1715 items.push(info);
1716 }
1717
1718 Ok(ListResult {
1719 items,
1720 truncated: response.is_truncated().unwrap_or(false),
1721 continuation_token: response.next_continuation_token().map(|s| s.to_string()),
1722 })
1723 }
1724
1725 async fn head_object(&self, path: &RemotePath) -> Result<ObjectInfo> {
1726 let response = self
1727 .inner
1728 .head_object()
1729 .bucket(&path.bucket)
1730 .key(&path.key)
1731 .send()
1732 .await
1733 .map_err(|e| {
1734 let err_str = e.to_string();
1735 if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
1736 Error::NotFound(path.to_string())
1737 } else {
1738 Error::Network(err_str)
1739 }
1740 })?;
1741
1742 let size = response.content_length().unwrap_or(0);
1743 let mut info = ObjectInfo::file(&path.key, size);
1744
1745 if let Some(modified) = response.last_modified() {
1746 info.last_modified = jiff::Timestamp::from_second(modified.secs()).ok();
1747 }
1748
1749 if let Some(etag) = response.e_tag() {
1750 info.etag = Some(etag.trim_matches('"').to_string());
1751 }
1752
1753 if let Some(ct) = response.content_type() {
1754 info.content_type = Some(ct.to_string());
1755 }
1756
1757 if let Some(sc) = response.storage_class() {
1758 info.storage_class = Some(sc.as_str().to_string());
1759 }
1760
1761 if let Some(meta) = response.metadata()
1762 && !meta.is_empty()
1763 {
1764 info.metadata = Some(meta.clone());
1765 }
1766
1767 Ok(info)
1768 }
1769
1770 async fn bucket_exists(&self, bucket: &str) -> Result<bool> {
1771 match self.inner.head_bucket().bucket(bucket).send().await {
1772 Ok(_) => Ok(true),
1773 Err(e) => {
1774 if let aws_sdk_s3::error::SdkError::ServiceError(ref service_err) = e
1777 && service_err.raw().status().as_u16() == 404
1778 {
1779 return Ok(false);
1780 }
1781 let err_str = e.to_string();
1782 if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
1783 return Ok(false);
1784 }
1785 Err(Error::Network(err_str))
1786 }
1787 }
1788 }
1789
1790 async fn create_bucket(&self, bucket: &str) -> Result<()> {
1791 self.inner
1792 .create_bucket()
1793 .bucket(bucket)
1794 .send()
1795 .await
1796 .map_err(|e| Error::Network(e.to_string()))?;
1797
1798 Ok(())
1799 }
1800
1801 async fn delete_bucket(&self, bucket: &str) -> Result<()> {
1802 self.inner
1803 .delete_bucket()
1804 .bucket(bucket)
1805 .send()
1806 .await
1807 .map_err(|e| {
1808 let err_str = Self::format_sdk_error(&e);
1809 if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
1810 Error::NotFound(format!("Bucket not found: {bucket}"))
1811 } else if err_str.contains("BucketNotEmpty") {
1812 Error::Conflict(err_str)
1813 } else {
1814 Error::Network(err_str)
1815 }
1816 })?;
1817
1818 Ok(())
1819 }
1820
1821 async fn capabilities(&self) -> Result<Capabilities> {
1822 Ok(Capabilities {
1825 versioning: true,
1826 object_lock: false,
1827 tagging: true,
1828 anonymous: true,
1829 select: false,
1830 notifications: true,
1831 lifecycle: true,
1832 replication: true,
1833 cors: true,
1834 })
1835 }
1836
1837 async fn get_object(&self, path: &RemotePath) -> Result<Vec<u8>> {
1838 self.get_object_with_progress(path, |_, _| {}).await
1839 }
1840
1841 async fn put_object(
1842 &self,
1843 path: &RemotePath,
1844 data: Vec<u8>,
1845 content_type: Option<&str>,
1846 ) -> Result<ObjectInfo> {
1847 let size = data.len() as i64;
1848 let body = aws_sdk_s3::primitives::ByteStream::from(data);
1849
1850 let mut request = self
1851 .inner
1852 .put_object()
1853 .bucket(&path.bucket)
1854 .key(&path.key)
1855 .body(body);
1856
1857 if let Some(ct) = content_type {
1858 request = request.content_type(ct);
1859 }
1860
1861 let response = request
1862 .send()
1863 .await
1864 .map_err(|e| Error::Network(e.to_string()))?;
1865
1866 let mut info = ObjectInfo::file(&path.key, size);
1867 if let Some(etag) = response.e_tag() {
1868 info.etag = Some(etag.trim_matches('"').to_string());
1869 }
1870 info.last_modified = Some(jiff::Timestamp::now());
1871
1872 Ok(info)
1873 }
1874
1875 async fn delete_object(&self, path: &RemotePath) -> Result<()> {
1876 self.delete_object_with_options(path, DeleteRequestOptions::default())
1877 .await
1878 }
1879
1880 async fn delete_objects(&self, bucket: &str, keys: Vec<String>) -> Result<Vec<String>> {
1881 self.delete_objects_with_options(bucket, keys, DeleteRequestOptions::default())
1882 .await
1883 }
1884
1885 async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> Result<ObjectInfo> {
1886 let copy_source = format!("{}/{}", src.bucket, src.key);
1888
1889 let response = self
1890 .inner
1891 .copy_object()
1892 .copy_source(©_source)
1893 .bucket(&dst.bucket)
1894 .key(&dst.key)
1895 .send()
1896 .await
1897 .map_err(|e| {
1898 let err_str = e.to_string();
1899 if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
1900 Error::NotFound(src.to_string())
1901 } else {
1902 Error::Network(err_str)
1903 }
1904 })?;
1905
1906 let info = self.head_object(dst).await?;
1908
1909 let mut result = info;
1911 if let Some(copy_result) = response.copy_object_result()
1912 && let Some(etag) = copy_result.e_tag()
1913 {
1914 result.etag = Some(etag.trim_matches('"').to_string());
1915 }
1916
1917 Ok(result)
1918 }
1919
1920 async fn presign_get(&self, path: &RemotePath, expires_secs: u64) -> Result<String> {
1921 let config = aws_sdk_s3::presigning::PresigningConfig::builder()
1922 .expires_in(std::time::Duration::from_secs(expires_secs))
1923 .build()
1924 .map_err(|e| Error::General(format!("presign_get config: {e}")))?;
1925
1926 let request = self
1927 .inner
1928 .get_object()
1929 .bucket(&path.bucket)
1930 .key(&path.key)
1931 .presigned(config)
1932 .await
1933 .map_err(|e| Error::General(format!("presign_get: {e}")))?;
1934
1935 Ok(request.uri().to_string())
1936 }
1937
1938 async fn presign_put(
1939 &self,
1940 path: &RemotePath,
1941 expires_secs: u64,
1942 content_type: Option<&str>,
1943 ) -> Result<String> {
1944 let config = aws_sdk_s3::presigning::PresigningConfig::builder()
1945 .expires_in(std::time::Duration::from_secs(expires_secs))
1946 .build()
1947 .map_err(|e| Error::General(format!("presign_put config: {e}")))?;
1948
1949 let mut builder = self.inner.put_object().bucket(&path.bucket).key(&path.key);
1950
1951 if let Some(ct) = content_type {
1952 builder = builder.content_type(ct);
1953 }
1954
1955 let request = builder
1956 .presigned(config)
1957 .await
1958 .map_err(|e| Error::General(format!("presign_put: {e}")))?;
1959
1960 Ok(request.uri().to_string())
1961 }
1962
1963 async fn get_versioning(&self, bucket: &str) -> Result<Option<bool>> {
1964 let response = self
1965 .inner
1966 .get_bucket_versioning()
1967 .bucket(bucket)
1968 .send()
1969 .await
1970 .map_err(|e| Error::General(format!("get_versioning: {e}")))?;
1971
1972 Ok(response
1973 .status()
1974 .map(|s| *s == aws_sdk_s3::types::BucketVersioningStatus::Enabled))
1975 }
1976
1977 async fn set_versioning(&self, bucket: &str, enabled: bool) -> Result<()> {
1978 use aws_sdk_s3::types::{BucketVersioningStatus, VersioningConfiguration};
1979
1980 let status = if enabled {
1981 BucketVersioningStatus::Enabled
1982 } else {
1983 BucketVersioningStatus::Suspended
1984 };
1985
1986 let config = VersioningConfiguration::builder().status(status).build();
1987
1988 self.inner
1989 .put_bucket_versioning()
1990 .bucket(bucket)
1991 .versioning_configuration(config)
1992 .send()
1993 .await
1994 .map_err(|e| Error::General(format!("set_versioning: {e}")))?;
1995
1996 Ok(())
1997 }
1998
1999 async fn list_object_versions(
2000 &self,
2001 path: &RemotePath,
2002 max_keys: Option<i32>,
2003 ) -> Result<Vec<ObjectVersion>> {
2004 Ok(self.list_object_versions_page(path, max_keys).await?.items)
2005 }
2006
2007 async fn get_object_tags(
2008 &self,
2009 path: &RemotePath,
2010 ) -> Result<std::collections::HashMap<String, String>> {
2011 let response = match self
2012 .inner
2013 .get_object_tagging()
2014 .bucket(&path.bucket)
2015 .key(&path.key)
2016 .send()
2017 .await
2018 {
2019 Ok(response) => response,
2020 Err(e) => {
2021 if e.to_string().contains("NoSuchTagSet") {
2022 return Ok(std::collections::HashMap::new());
2023 }
2024 return Err(Error::General(format!("get_object_tags: {e}")));
2025 }
2026 };
2027
2028 let mut tags = std::collections::HashMap::new();
2029 for tag in response.tag_set() {
2030 let key = tag.key();
2031 let value = tag.value();
2032 tags.insert(key.to_string(), value.to_string());
2033 }
2034
2035 Ok(tags)
2036 }
2037
2038 async fn get_bucket_tags(
2039 &self,
2040 bucket: &str,
2041 ) -> Result<std::collections::HashMap<String, String>> {
2042 let response = match self.inner.get_bucket_tagging().bucket(bucket).send().await {
2043 Ok(response) => response,
2044 Err(e) => {
2045 if e.to_string().contains("NoSuchTagSet") {
2046 return Ok(std::collections::HashMap::new());
2047 }
2048 return Err(Error::General(format!("get_bucket_tags: {e}")));
2049 }
2050 };
2051
2052 let mut tags = std::collections::HashMap::new();
2053 for tag in response.tag_set() {
2054 let key = tag.key();
2055 let value = tag.value();
2056 tags.insert(key.to_string(), value.to_string());
2057 }
2058
2059 Ok(tags)
2060 }
2061
2062 async fn set_object_tags(
2063 &self,
2064 path: &RemotePath,
2065 tags: std::collections::HashMap<String, String>,
2066 ) -> Result<()> {
2067 let tagging = build_tagging(tags)?;
2068
2069 self.inner
2070 .put_object_tagging()
2071 .bucket(&path.bucket)
2072 .key(&path.key)
2073 .tagging(tagging)
2074 .send()
2075 .await
2076 .map_err(|e| Error::General(format!("set_object_tags: {e}")))?;
2077
2078 Ok(())
2079 }
2080
2081 async fn set_bucket_tags(
2082 &self,
2083 bucket: &str,
2084 tags: std::collections::HashMap<String, String>,
2085 ) -> Result<()> {
2086 let tagging = build_tagging(tags)?;
2087
2088 self.inner
2089 .put_bucket_tagging()
2090 .bucket(bucket)
2091 .tagging(tagging)
2092 .send()
2093 .await
2094 .map_err(|e| Error::General(format!("set_bucket_tags: {e}")))?;
2095
2096 Ok(())
2097 }
2098
2099 async fn delete_object_tags(&self, path: &RemotePath) -> Result<()> {
2100 self.inner
2101 .delete_object_tagging()
2102 .bucket(&path.bucket)
2103 .key(&path.key)
2104 .send()
2105 .await
2106 .map_err(|e| Error::General(format!("delete_object_tags: {e}")))?;
2107
2108 Ok(())
2109 }
2110
2111 async fn delete_bucket_tags(&self, bucket: &str) -> Result<()> {
2112 self.inner
2113 .delete_bucket_tagging()
2114 .bucket(bucket)
2115 .send()
2116 .await
2117 .map_err(|e| Error::General(format!("delete_bucket_tags: {e}")))?;
2118
2119 Ok(())
2120 }
2121
2122 async fn get_bucket_policy(&self, bucket: &str) -> Result<Option<String>> {
2123 let response = match self.inner.get_bucket_policy().bucket(bucket).send().await {
2124 Ok(policy) => policy,
2125 Err(error) => {
2126 let error_text = error.to_string();
2127 let kind = if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &error {
2128 let code = service_err
2129 .raw()
2130 .headers()
2131 .get("x-amz-error-code")
2132 .and_then(|value| std::str::from_utf8(value.as_bytes()).ok());
2133 let status = Some(service_err.raw().status().as_u16());
2134 Self::bucket_policy_error_kind(code, status, &error_text)
2135 } else {
2136 Self::bucket_policy_error_kind(None, None, &error_text)
2137 };
2138 return Self::map_get_bucket_policy_error(bucket, kind, &error_text);
2139 }
2140 };
2141
2142 Ok(response.policy().map(|policy| policy.to_string()))
2143 }
2144
2145 async fn set_bucket_policy(&self, bucket: &str, policy: &str) -> Result<()> {
2146 self.inner
2147 .put_bucket_policy()
2148 .bucket(bucket)
2149 .policy(policy)
2150 .send()
2151 .await
2152 .map_err(|e| Error::General(format!("set_bucket_policy: {e}")))?;
2153
2154 Ok(())
2155 }
2156
2157 async fn delete_bucket_policy(&self, bucket: &str) -> Result<()> {
2158 match self
2159 .inner
2160 .delete_bucket_policy()
2161 .bucket(bucket)
2162 .send()
2163 .await
2164 {
2165 Ok(_) => Ok(()),
2166 Err(e) => {
2167 let error_text = e.to_string();
2168 let kind = if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &e {
2169 let code = service_err
2170 .raw()
2171 .headers()
2172 .get("x-amz-error-code")
2173 .and_then(|value| std::str::from_utf8(value.as_bytes()).ok());
2174 let status = Some(service_err.raw().status().as_u16());
2175 Self::bucket_policy_error_kind(code, status, &error_text)
2176 } else {
2177 Self::bucket_policy_error_kind(None, None, &error_text)
2178 };
2179 Self::map_delete_bucket_policy_error(bucket, kind, &error_text)
2180 }
2181 }
2182 }
2183
2184 async fn get_bucket_cors(&self, bucket: &str) -> Result<Vec<CorsRule>> {
2185 let response = match self.inner.get_bucket_cors().bucket(bucket).send().await {
2186 Ok(response) => response,
2187 Err(error) => {
2188 let error_text = error.to_string();
2189 if error_text.contains("service error")
2190 && let Ok(url) = self.cors_url(bucket)
2191 {
2192 match self.xml_request(Method::GET, url, None, None).await {
2193 Ok(body) => return parse_cors_configuration_xml(&body),
2194 Err(Error::Network(raw_error))
2195 if is_missing_cors_configuration_error(&raw_error) =>
2196 {
2197 return Ok(Vec::new());
2198 }
2199 Err(_) => {}
2200 }
2201 }
2202
2203 let missing_config =
2204 if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &error {
2205 is_missing_cors_configuration_response(
2206 service_err.err().code(),
2207 Some(service_err.raw().status().as_u16()),
2208 &error_text,
2209 )
2210 } else {
2211 is_missing_cors_configuration_response(None, None, &error_text)
2212 };
2213
2214 if missing_config {
2215 return Ok(Vec::new());
2216 }
2217 return Err(Error::General(format!("get_bucket_cors: {error_text}")));
2218 }
2219 };
2220
2221 Ok(response
2222 .cors_rules()
2223 .iter()
2224 .map(sdk_cors_rule_to_core)
2225 .collect())
2226 }
2227
2228 async fn set_bucket_cors(&self, bucket: &str, rules: Vec<CorsRule>) -> Result<()> {
2229 let cors_rules = rules
2230 .iter()
2231 .map(core_cors_rule_to_sdk)
2232 .collect::<Result<Vec<_>>>()?;
2233 let cors_configuration = aws_sdk_s3::types::CorsConfiguration::builder()
2234 .set_cors_rules(Some(cors_rules))
2235 .build()
2236 .map_err(|e| Error::General(format!("build bucket cors config: {e}")))?;
2237
2238 self.inner
2239 .put_bucket_cors()
2240 .bucket(bucket)
2241 .cors_configuration(cors_configuration)
2242 .send()
2243 .await
2244 .map_err(|e| Error::General(format!("set_bucket_cors: {e}")))?;
2245
2246 Ok(())
2247 }
2248
2249 async fn delete_bucket_cors(&self, bucket: &str) -> Result<()> {
2250 match self.inner.delete_bucket_cors().bucket(bucket).send().await {
2251 Ok(_) => Ok(()),
2252 Err(error) => {
2253 let error_text = error.to_string();
2254 if error_text.contains("service error")
2255 && let Ok(url) = self.cors_url(bucket)
2256 {
2257 match self.xml_request(Method::DELETE, url, None, None).await {
2258 Ok(_) => return Ok(()),
2259 Err(Error::Network(raw_error))
2260 if is_missing_cors_configuration_error(&raw_error) =>
2261 {
2262 return Ok(());
2263 }
2264 Err(_) => {}
2265 }
2266 }
2267
2268 let missing_config =
2269 if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &error {
2270 is_missing_cors_configuration_response(
2271 service_err.err().code(),
2272 Some(service_err.raw().status().as_u16()),
2273 &error_text,
2274 )
2275 } else {
2276 is_missing_cors_configuration_response(None, None, &error_text)
2277 };
2278
2279 if missing_config {
2280 Ok(())
2281 } else {
2282 Err(Error::General(format!("delete_bucket_cors: {error_text}")))
2283 }
2284 }
2285 }
2286 }
2287
2288 async fn get_bucket_notifications(&self, bucket: &str) -> Result<Vec<BucketNotification>> {
2289 let response = self
2290 .inner
2291 .get_bucket_notification_configuration()
2292 .bucket(bucket)
2293 .send()
2294 .await
2295 .map_err(|e| Error::General(format!("get_bucket_notifications: {e}")))?;
2296
2297 let mut rules = Vec::new();
2298
2299 for cfg in response.queue_configurations() {
2300 let (prefix, suffix) = Self::extract_notification_filter(cfg.filter());
2301 rules.push(BucketNotification {
2302 id: cfg.id().map(ToString::to_string),
2303 target: NotificationTarget::Queue,
2304 arn: cfg.queue_arn().to_string(),
2305 events: Self::event_list_to_strings(cfg.events()),
2306 prefix,
2307 suffix,
2308 });
2309 }
2310
2311 for cfg in response.topic_configurations() {
2312 let (prefix, suffix) = Self::extract_notification_filter(cfg.filter());
2313 rules.push(BucketNotification {
2314 id: cfg.id().map(ToString::to_string),
2315 target: NotificationTarget::Topic,
2316 arn: cfg.topic_arn().to_string(),
2317 events: Self::event_list_to_strings(cfg.events()),
2318 prefix,
2319 suffix,
2320 });
2321 }
2322
2323 for cfg in response.lambda_function_configurations() {
2324 let (prefix, suffix) = Self::extract_notification_filter(cfg.filter());
2325 rules.push(BucketNotification {
2326 id: cfg.id().map(ToString::to_string),
2327 target: NotificationTarget::Lambda,
2328 arn: cfg.lambda_function_arn().to_string(),
2329 events: Self::event_list_to_strings(cfg.events()),
2330 prefix,
2331 suffix,
2332 });
2333 }
2334
2335 Ok(rules)
2336 }
2337
2338 async fn set_bucket_notifications(
2339 &self,
2340 bucket: &str,
2341 notifications: Vec<BucketNotification>,
2342 ) -> Result<()> {
2343 use aws_sdk_s3::types::{
2344 LambdaFunctionConfiguration, NotificationConfiguration, QueueConfiguration,
2345 TopicConfiguration,
2346 };
2347
2348 let expected_notifications = notifications.clone();
2349 let mut queues = Vec::new();
2350 let mut topics = Vec::new();
2351 let mut lambdas = Vec::new();
2352
2353 for rule in notifications {
2354 let events = Self::strings_to_event_list(&rule.events);
2355 if events.is_empty() {
2356 return Err(Error::General(format!(
2357 "set_bucket_notifications: empty event list for target '{}'",
2358 rule.arn
2359 )));
2360 }
2361
2362 let filter =
2363 Self::build_notification_filter(rule.prefix.as_deref(), rule.suffix.as_deref());
2364
2365 match rule.target {
2366 NotificationTarget::Queue => {
2367 let mut builder = QueueConfiguration::builder()
2368 .queue_arn(rule.arn)
2369 .set_events(Some(events))
2370 .set_id(rule.id);
2371 if let Some(filter) = filter {
2372 builder = builder.filter(filter);
2373 }
2374 let queue = builder
2375 .build()
2376 .map_err(|e| Error::General(format!("build queue notification: {e}")))?;
2377 queues.push(queue);
2378 }
2379 NotificationTarget::Topic => {
2380 let mut builder = TopicConfiguration::builder()
2381 .topic_arn(rule.arn)
2382 .set_events(Some(events))
2383 .set_id(rule.id);
2384 if let Some(filter) = filter {
2385 builder = builder.filter(filter);
2386 }
2387 let topic = builder
2388 .build()
2389 .map_err(|e| Error::General(format!("build topic notification: {e}")))?;
2390 topics.push(topic);
2391 }
2392 NotificationTarget::Lambda => {
2393 let mut builder = LambdaFunctionConfiguration::builder()
2394 .lambda_function_arn(rule.arn)
2395 .set_events(Some(events))
2396 .set_id(rule.id);
2397 if let Some(filter) = filter {
2398 builder = builder.filter(filter);
2399 }
2400 let lambda = builder
2401 .build()
2402 .map_err(|e| Error::General(format!("build lambda notification: {e}")))?;
2403 lambdas.push(lambda);
2404 }
2405 }
2406 }
2407
2408 let config = NotificationConfiguration::builder()
2409 .set_queue_configurations(Some(queues))
2410 .set_topic_configurations(Some(topics))
2411 .set_lambda_function_configurations(Some(lambdas))
2412 .build();
2413
2414 match self
2415 .inner
2416 .put_bucket_notification_configuration()
2417 .bucket(bucket)
2418 .notification_configuration(config)
2419 .send()
2420 .await
2421 {
2422 Ok(_) => {}
2423 Err(error) => {
2424 let error_text = error.to_string();
2425 if error_text.contains("service error")
2428 && let Ok(actual) = self.get_bucket_notifications(bucket).await
2429 && Self::notifications_equivalent(&expected_notifications, &actual)
2430 {
2431 return Ok(());
2432 }
2433 return Err(Error::General(format!(
2434 "set_bucket_notifications: {error_text}"
2435 )));
2436 }
2437 }
2438
2439 Ok(())
2440 }
2441
2442 async fn get_bucket_lifecycle(&self, bucket: &str) -> Result<Vec<LifecycleRule>> {
2443 let response = match self
2444 .inner
2445 .get_bucket_lifecycle_configuration()
2446 .bucket(bucket)
2447 .send()
2448 .await
2449 {
2450 Ok(resp) => resp,
2451 Err(error) => {
2452 let error_text = Self::format_sdk_error(&error);
2453 if error_text.contains("NoSuchLifecycleConfiguration")
2454 || error_text.contains("lifecycle configuration is not found")
2455 {
2456 return Ok(Vec::new());
2457 }
2458 return Err(Error::General(format!(
2459 "get_bucket_lifecycle: {error_text}"
2460 )));
2461 }
2462 };
2463
2464 let mut rules = Vec::new();
2465 for sdk_rule in response.rules() {
2466 let id = sdk_rule.id().unwrap_or("").to_string();
2467 let status = match sdk_rule.status().as_str() {
2468 "Enabled" => rc_core::LifecycleRuleStatus::Enabled,
2469 _ => rc_core::LifecycleRuleStatus::Disabled,
2470 };
2471
2472 let prefix = parse_lifecycle_filter_prefix(sdk_rule.filter());
2473 let tags = parse_lifecycle_filter_tags(sdk_rule.filter());
2474
2475 let expiration = sdk_rule
2476 .expiration()
2477 .map(|exp| rc_core::LifecycleExpiration {
2478 days: exp.days(),
2479 date: exp.date().map(|d| d.to_string()),
2480 });
2481
2482 let transition = sdk_rule
2483 .transitions()
2484 .first()
2485 .map(|t| rc_core::LifecycleTransition {
2486 days: t.days(),
2487 date: t.date().map(|d| d.to_string()),
2488 storage_class: t
2489 .storage_class()
2490 .map(|sc| sc.as_str().to_string())
2491 .unwrap_or_default(),
2492 });
2493
2494 let noncurrent_version_expiration =
2495 sdk_rule.noncurrent_version_expiration().map(|nve| {
2496 rc_core::NoncurrentVersionExpiration {
2497 noncurrent_days: nve.noncurrent_days().unwrap_or(0),
2498 newer_noncurrent_versions: nve.newer_noncurrent_versions(),
2499 }
2500 });
2501
2502 let noncurrent_version_transition = sdk_rule
2503 .noncurrent_version_transitions()
2504 .first()
2505 .map(|nvt| rc_core::NoncurrentVersionTransition {
2506 noncurrent_days: nvt.noncurrent_days().unwrap_or(0),
2507 storage_class: nvt
2508 .storage_class()
2509 .map(|sc| sc.as_str().to_string())
2510 .unwrap_or_default(),
2511 });
2512
2513 let abort_incomplete_multipart_upload_days = sdk_rule
2514 .abort_incomplete_multipart_upload()
2515 .and_then(|a| a.days_after_initiation());
2516
2517 let expired_object_delete_marker = sdk_rule
2518 .expiration()
2519 .and_then(|e| e.expired_object_delete_marker())
2520 .filter(|v| *v);
2521
2522 rules.push(LifecycleRule {
2523 id,
2524 status,
2525 prefix,
2526 tags,
2527 expiration,
2528 transition,
2529 noncurrent_version_expiration,
2530 noncurrent_version_transition,
2531 abort_incomplete_multipart_upload_days,
2532 expired_object_delete_marker,
2533 });
2534 }
2535
2536 Ok(rules)
2537 }
2538
2539 async fn set_bucket_lifecycle(&self, bucket: &str, rules: Vec<LifecycleRule>) -> Result<()> {
2540 use aws_sdk_s3::types::{
2541 AbortIncompleteMultipartUpload, BucketLifecycleConfiguration, ExpirationStatus,
2542 LifecycleExpiration as SdkExpiration, LifecycleRule as SdkRule,
2543 NoncurrentVersionExpiration as SdkNve, NoncurrentVersionTransition as SdkNvt,
2544 Transition, TransitionStorageClass,
2545 };
2546
2547 let mut sdk_rules = Vec::new();
2548 for rule in rules {
2549 let status = match rule.status {
2550 rc_core::LifecycleRuleStatus::Enabled => ExpirationStatus::Enabled,
2551 rc_core::LifecycleRuleStatus::Disabled => ExpirationStatus::Disabled,
2552 };
2553
2554 let filter = build_lifecycle_rule_filter(rule.prefix.as_deref(), rule.tags.as_ref())?;
2555
2556 let expiration = rule.expiration.map(|exp| {
2557 let mut builder = SdkExpiration::builder();
2558 if let Some(days) = exp.days {
2559 builder = builder.days(days);
2560 }
2561 if let Some(ref date_str) = exp.date
2562 && let Ok(dt) = aws_smithy_types::DateTime::from_str(
2563 date_str,
2564 aws_smithy_types::date_time::Format::DateTime,
2565 )
2566 {
2567 builder = builder.date(dt);
2568 }
2569 if let Some(true) = rule.expired_object_delete_marker {
2570 builder = builder.expired_object_delete_marker(true);
2571 }
2572 builder.build()
2573 });
2574
2575 let transitions = rule.transition.map(|t| {
2576 #[allow(deprecated)]
2577 let sc = TransitionStorageClass::from(t.storage_class.as_str());
2578 let mut builder = Transition::builder().storage_class(sc);
2579 if let Some(days) = t.days {
2580 builder = builder.days(days);
2581 }
2582 if let Some(ref date_str) = t.date
2583 && let Ok(dt) = aws_smithy_types::DateTime::from_str(
2584 date_str,
2585 aws_smithy_types::date_time::Format::DateTime,
2586 )
2587 {
2588 builder = builder.date(dt);
2589 }
2590 vec![builder.build()]
2591 });
2592
2593 let nve = rule.noncurrent_version_expiration.map(|nve| {
2594 let mut builder = SdkNve::builder().noncurrent_days(nve.noncurrent_days);
2595 if let Some(newer) = nve.newer_noncurrent_versions {
2596 builder = builder.newer_noncurrent_versions(newer);
2597 }
2598 builder.build()
2599 });
2600
2601 let nvt = rule.noncurrent_version_transition.map(|nvt| {
2602 let sc = TransitionStorageClass::from(nvt.storage_class.as_str());
2603 let builder = SdkNvt::builder()
2604 .noncurrent_days(nvt.noncurrent_days)
2605 .storage_class(sc);
2606 vec![builder.build()]
2607 });
2608
2609 let abort = rule.abort_incomplete_multipart_upload_days.map(|days| {
2610 AbortIncompleteMultipartUpload::builder()
2611 .days_after_initiation(days)
2612 .build()
2613 });
2614
2615 let mut builder = SdkRule::builder().id(&rule.id).status(status);
2616 if let Some(filter) = filter {
2617 builder = builder.filter(filter);
2618 }
2619 if let Some(expiration) = expiration {
2620 builder = builder.expiration(expiration);
2621 }
2622 if let Some(transitions) = transitions {
2623 builder = builder.set_transitions(Some(transitions));
2624 }
2625 if let Some(nve) = nve {
2626 builder = builder.noncurrent_version_expiration(nve);
2627 }
2628 if let Some(nvt) = nvt {
2629 builder = builder.set_noncurrent_version_transitions(Some(nvt));
2630 }
2631 if let Some(abort) = abort {
2632 builder = builder.abort_incomplete_multipart_upload(abort);
2633 }
2634
2635 let sdk_rule = builder
2636 .build()
2637 .map_err(|e| Error::General(format!("build lifecycle rule: {e}")))?;
2638 sdk_rules.push(sdk_rule);
2639 }
2640
2641 let config = BucketLifecycleConfiguration::builder()
2642 .set_rules(Some(sdk_rules))
2643 .build()
2644 .map_err(|e| Error::General(format!("build lifecycle config: {e}")))?;
2645
2646 self.inner
2647 .put_bucket_lifecycle_configuration()
2648 .bucket(bucket)
2649 .lifecycle_configuration(config)
2650 .send()
2651 .await
2652 .map_err(|e| {
2653 Error::General(format!(
2654 "set_bucket_lifecycle: {}",
2655 Self::format_sdk_error(&e)
2656 ))
2657 })?;
2658
2659 Ok(())
2660 }
2661
2662 async fn delete_bucket_lifecycle(&self, bucket: &str) -> Result<()> {
2663 self.inner
2664 .delete_bucket_lifecycle()
2665 .bucket(bucket)
2666 .send()
2667 .await
2668 .map_err(|e| {
2669 Error::General(format!(
2670 "delete_bucket_lifecycle: {}",
2671 Self::format_sdk_error(&e)
2672 ))
2673 })?;
2674 Ok(())
2675 }
2676
2677 async fn restore_object(&self, path: &RemotePath, days: i32) -> Result<()> {
2678 use aws_sdk_s3::types::RestoreRequest;
2679
2680 let request = RestoreRequest::builder().days(days).build();
2681 self.inner
2682 .restore_object()
2683 .bucket(&path.bucket)
2684 .key(&path.key)
2685 .restore_request(request)
2686 .send()
2687 .await
2688 .map_err(|e| {
2689 Error::General(format!("restore_object: {}", Self::format_sdk_error(&e)))
2690 })?;
2691 Ok(())
2692 }
2693
2694 async fn get_bucket_replication(
2695 &self,
2696 bucket: &str,
2697 ) -> Result<Option<ReplicationConfiguration>> {
2698 let url = self.replication_url(bucket)?;
2699 let body = match self.xml_request(Method::GET, url, None, None).await {
2700 Ok(body) => body,
2701 Err(Error::Network(error_text))
2702 if error_text.contains("ReplicationConfigurationNotFound")
2703 || error_text.contains("replication configuration is not found")
2704 || error_text.contains("replication not found") =>
2705 {
2706 return Ok(None);
2707 }
2708 Err(error) => {
2709 return Err(Error::General(format!("get_bucket_replication: {error}")));
2710 }
2711 };
2712
2713 parse_replication_configuration_xml(&body).map(Some)
2714 }
2715
2716 async fn set_bucket_replication(
2717 &self,
2718 bucket: &str,
2719 config: ReplicationConfiguration,
2720 ) -> Result<()> {
2721 let url = self.replication_url(bucket)?;
2722 let body = build_replication_configuration_xml(&config).into_bytes();
2723 self.xml_request(Method::PUT, url, Some("application/xml"), Some(body))
2724 .await
2725 .map_err(|e| Error::General(format!("set_bucket_replication: {e}")))?;
2726
2727 Ok(())
2728 }
2729
2730 async fn delete_bucket_replication(&self, bucket: &str) -> Result<()> {
2731 self.inner
2732 .delete_bucket_replication()
2733 .bucket(bucket)
2734 .send()
2735 .await
2736 .map_err(|e| {
2737 Error::General(format!(
2738 "delete_bucket_replication: {}",
2739 Self::format_sdk_error(&e)
2740 ))
2741 })?;
2742 Ok(())
2743 }
2744
2745 async fn select_object_content(
2746 &self,
2747 path: &RemotePath,
2748 options: &SelectOptions,
2749 writer: &mut (dyn AsyncWrite + Send + Unpin),
2750 ) -> Result<()> {
2751 crate::select::select_object_content(&self.inner, path, options, writer).await
2752 }
2753}
2754
2755#[cfg(test)]
2756mod tests {
2757 use super::*;
2758 use aws_smithy_http_client::test_util::{CaptureRequestReceiver, capture_request};
2759 use std::collections::HashMap;
2760
2761 fn test_s3_client(
2762 response: Option<http::Response<SdkBody>>,
2763 ) -> (S3Client, CaptureRequestReceiver) {
2764 test_s3_client_with_endpoint("https://example.com", response)
2765 }
2766
2767 fn test_s3_client_with_endpoint(
2768 endpoint: &str,
2769 response: Option<http::Response<SdkBody>>,
2770 ) -> (S3Client, CaptureRequestReceiver) {
2771 let (http_client, request_receiver) = capture_request(response);
2772 let credentials = Credentials::new(
2773 "access-key",
2774 "secret-key",
2775 None,
2776 None,
2777 "rc-test-credentials",
2778 );
2779 let config = aws_sdk_s3::config::Builder::new()
2780 .credentials_provider(credentials)
2781 .endpoint_url(endpoint)
2782 .region(aws_sdk_s3::config::Region::new("us-east-1"))
2783 .force_path_style(true)
2784 .behavior_version_latest()
2785 .http_client(http_client)
2786 .build();
2787
2788 let alias = Alias::new("test", endpoint, "access-key", "secret-key");
2789 let client = S3Client {
2790 inner: aws_sdk_s3::Client::from_conf(config),
2791 xml_http_client: reqwest::Client::new(),
2792 alias,
2793 };
2794
2795 (client, request_receiver)
2796 }
2797
2798 #[test]
2799 fn test_object_info_creation() {
2800 let info = ObjectInfo::file("test.txt", 1024);
2801 assert_eq!(info.key, "test.txt");
2802 assert_eq!(info.size_bytes, Some(1024));
2803 }
2804
2805 #[test]
2806 fn auto_bucket_lookup_uses_dns_for_aliyun_oss_service_endpoint() {
2807 let alias = Alias::new(
2808 "aliyun",
2809 "https://oss-cn-hangzhou.aliyuncs.com",
2810 "access-key",
2811 "secret-key",
2812 );
2813
2814 assert!(!force_path_style_for_alias(&alias));
2815 }
2816
2817 #[test]
2818 fn auto_bucket_lookup_uses_dns_for_aliyun_internal_service_endpoint() {
2819 let alias = Alias::new(
2820 "aliyun",
2821 "https://oss-cn-hangzhou-internal.aliyuncs.com",
2822 "access-key",
2823 "secret-key",
2824 );
2825
2826 assert!(!force_path_style_for_alias(&alias));
2827 }
2828
2829 #[test]
2830 fn auto_bucket_lookup_keeps_path_style_for_custom_endpoint() {
2831 let alias = Alias::new("local", "http://localhost:9000", "access-key", "secret-key");
2832
2833 assert!(force_path_style_for_alias(&alias));
2834 }
2835
2836 #[test]
2837 fn auto_bucket_lookup_keeps_path_style_for_non_oss_aliyun_endpoint() {
2838 let alias = Alias::new(
2839 "aliyun",
2840 "https://ecs-cn-hangzhou.aliyuncs.com",
2841 "access-key",
2842 "secret-key",
2843 );
2844
2845 assert!(force_path_style_for_alias(&alias));
2846 }
2847
2848 #[test]
2849 fn auto_bucket_lookup_keeps_path_style_for_invalid_endpoint() {
2850 let alias = Alias::new("broken", "not a valid endpoint", "access-key", "secret-key");
2851
2852 assert!(force_path_style_for_alias(&alias));
2853 }
2854
2855 #[test]
2856 fn explicit_bucket_lookup_overrides_auto_detection() {
2857 let mut path_alias = Alias::new(
2858 "aliyun",
2859 "https://oss-cn-hangzhou.aliyuncs.com",
2860 "access-key",
2861 "secret-key",
2862 );
2863 path_alias.bucket_lookup = "path".to_string();
2864
2865 let mut dns_alias =
2866 Alias::new("local", "http://localhost:9000", "access-key", "secret-key");
2867 dns_alias.bucket_lookup = "dns".to_string();
2868
2869 assert!(force_path_style_for_alias(&path_alias));
2870 assert!(!force_path_style_for_alias(&dns_alias));
2871 }
2872
2873 #[test]
2874 fn unknown_bucket_lookup_keeps_path_style() {
2875 let mut alias = Alias::new(
2876 "aliyun",
2877 "https://oss-cn-hangzhou.aliyuncs.com",
2878 "access-key",
2879 "secret-key",
2880 );
2881 alias.bucket_lookup = "unexpected".to_string();
2882
2883 assert!(force_path_style_for_alias(&alias));
2884 }
2885
2886 #[test]
2887 fn parse_replication_configuration_xml_reads_delete_replication() {
2888 let body = r#"<?xml version="1.0" encoding="UTF-8"?>
2889<ReplicationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
2890 <Role>arn:rustfs:replication:us-east-1:123:test</Role>
2891 <Rule>
2892 <Status>Enabled</Status>
2893 <Destination>
2894 <Bucket>arn:rustfs:replication:us-east-1:123:dest</Bucket>
2895 <StorageClass>STANDARD</StorageClass>
2896 </Destination>
2897 <ID>rule-1</ID>
2898 <Priority>1</Priority>
2899 <Filter>
2900 <Prefix>logs/</Prefix>
2901 </Filter>
2902 <ExistingObjectReplication>
2903 <Status>Enabled</Status>
2904 </ExistingObjectReplication>
2905 <DeleteMarkerReplication>
2906 <Status>Disabled</Status>
2907 </DeleteMarkerReplication>
2908 <DeleteReplication>
2909 <Status>Enabled</Status>
2910 </DeleteReplication>
2911 </Rule>
2912</ReplicationConfiguration>"#;
2913
2914 let config = parse_replication_configuration_xml(body).expect("parse replication xml");
2915 assert_eq!(config.role, "arn:rustfs:replication:us-east-1:123:test");
2916 assert_eq!(config.rules.len(), 1);
2917 assert_eq!(config.rules[0].id, "rule-1");
2918 assert_eq!(config.rules[0].prefix.as_deref(), Some("logs/"));
2919 assert_eq!(config.rules[0].delete_replication, Some(true));
2920 assert_eq!(config.rules[0].delete_marker_replication, Some(false));
2921 assert_eq!(config.rules[0].existing_object_replication, Some(true));
2922 }
2923
2924 #[test]
2925 fn parse_replication_configuration_xml_preserves_tag_filters() {
2926 let body = r#"<?xml version="1.0" encoding="UTF-8"?>
2927<ReplicationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
2928 <Rule>
2929 <Status>Enabled</Status>
2930 <Destination>
2931 <Bucket>arn:rustfs:replication:us-east-1:123:dest</Bucket>
2932 </Destination>
2933 <ID>tagged-rule</ID>
2934 <Priority>2</Priority>
2935 <Filter>
2936 <And>
2937 <Prefix>logs/</Prefix>
2938 <Tag>
2939 <Key>env</Key>
2940 <Value>prod</Value>
2941 </Tag>
2942 <Tag>
2943 <Key>team</Key>
2944 <Value>core</Value>
2945 </Tag>
2946 </And>
2947 </Filter>
2948 </Rule>
2949</ReplicationConfiguration>"#;
2950
2951 let config = parse_replication_configuration_xml(body).expect("parse replication xml");
2952 let rule = &config.rules[0];
2953 assert_eq!(rule.prefix.as_deref(), Some("logs/"));
2954 let tags = rule.tags.as_ref().expect("tag filters");
2955 assert_eq!(tags.get("env").map(String::as_str), Some("prod"));
2956 assert_eq!(tags.get("team").map(String::as_str), Some("core"));
2957 }
2958
2959 #[test]
2960 fn build_replication_configuration_xml_writes_delete_replication() {
2961 let config = ReplicationConfiguration {
2962 role: "arn:rustfs:replication:us-east-1:123:test".to_string(),
2963 rules: vec![rc_core::ReplicationRule {
2964 id: "rule-1".to_string(),
2965 priority: 1,
2966 status: rc_core::ReplicationRuleStatus::Enabled,
2967 prefix: Some("logs/".to_string()),
2968 tags: None,
2969 destination: rc_core::ReplicationDestination {
2970 bucket_arn: "arn:rustfs:replication:us-east-1:123:dest".to_string(),
2971 storage_class: Some("STANDARD".to_string()),
2972 },
2973 delete_marker_replication: Some(true),
2974 existing_object_replication: Some(true),
2975 delete_replication: Some(true),
2976 }],
2977 };
2978
2979 let xml = build_replication_configuration_xml(&config);
2980 assert!(xml.contains("<DeleteReplication><Status>Enabled</Status></DeleteReplication>"));
2981 assert!(xml.contains(
2982 "<ExistingObjectReplication><Status>Enabled</Status></ExistingObjectReplication>"
2983 ));
2984 assert!(xml.contains(
2985 "<DeleteMarkerReplication><Status>Enabled</Status></DeleteMarkerReplication>"
2986 ));
2987 assert!(xml.contains("<Filter><Prefix>logs/</Prefix></Filter>"));
2988 }
2989
2990 #[test]
2991 fn build_replication_configuration_xml_writes_and_tag_filters() {
2992 let mut tags = HashMap::new();
2993 tags.insert("env".to_string(), "prod".to_string());
2994 tags.insert("team".to_string(), "core".to_string());
2995
2996 let config = ReplicationConfiguration {
2997 role: String::new(),
2998 rules: vec![rc_core::ReplicationRule {
2999 id: "rule-1".to_string(),
3000 priority: 1,
3001 status: rc_core::ReplicationRuleStatus::Enabled,
3002 prefix: Some("logs/".to_string()),
3003 tags: Some(tags),
3004 destination: rc_core::ReplicationDestination {
3005 bucket_arn: "arn:rustfs:replication:us-east-1:123:dest".to_string(),
3006 storage_class: None,
3007 },
3008 delete_marker_replication: None,
3009 existing_object_replication: None,
3010 delete_replication: None,
3011 }],
3012 };
3013
3014 let xml = build_replication_configuration_xml(&config);
3015 assert!(xml.contains("<Filter><And><Prefix>logs/</Prefix>"));
3016 assert!(xml.contains("<Tag><Key>env</Key><Value>prod</Value></Tag>"));
3017 assert!(xml.contains("<Tag><Key>team</Key><Value>core</Value></Tag>"));
3018 }
3019
3020 #[test]
3021 fn build_lifecycle_rule_filter_preserves_prefix_and_tags() {
3022 let mut tags = HashMap::new();
3023 tags.insert("env".to_string(), "prod".to_string());
3024 tags.insert("team".to_string(), "core".to_string());
3025
3026 let filter = build_lifecycle_rule_filter(Some("logs/"), Some(&tags))
3027 .expect("build lifecycle filter")
3028 .expect("lifecycle filter");
3029
3030 assert_eq!(
3031 parse_lifecycle_filter_prefix(Some(&filter)).as_deref(),
3032 Some("logs/")
3033 );
3034 let parsed_tags = parse_lifecycle_filter_tags(Some(&filter)).expect("parsed tags");
3035 assert_eq!(parsed_tags.get("env").map(String::as_str), Some("prod"));
3036 assert_eq!(parsed_tags.get("team").map(String::as_str), Some("core"));
3037 }
3038
3039 #[test]
3040 fn bucket_policy_error_kind_uses_error_code() {
3041 assert_eq!(
3042 S3Client::bucket_policy_error_kind(Some("NoSuchBucketPolicy"), Some(404), ""),
3043 BucketPolicyErrorKind::MissingPolicy
3044 );
3045 assert_eq!(
3046 S3Client::bucket_policy_error_kind(Some("NoSuchBucket"), Some(404), ""),
3047 BucketPolicyErrorKind::MissingBucket
3048 );
3049 }
3050
3051 #[test]
3052 fn bucket_policy_error_kind_prefers_bucket_not_found_over_404_fallback() {
3053 assert_eq!(
3054 S3Client::bucket_policy_error_kind(None, Some(404), "NoSuchBucket"),
3055 BucketPolicyErrorKind::MissingBucket
3056 );
3057 assert_eq!(
3058 S3Client::bucket_policy_error_kind(None, Some(404), "no details"),
3059 BucketPolicyErrorKind::MissingPolicy
3060 );
3061 }
3062
3063 #[test]
3064 fn bucket_policy_error_mapping_returns_expected_result() {
3065 let get_missing_policy = S3Client::map_get_bucket_policy_error(
3066 "bucket",
3067 BucketPolicyErrorKind::MissingPolicy,
3068 "NoSuchPolicy",
3069 )
3070 .expect("missing policy should map to Ok(None)");
3071 assert!(get_missing_policy.is_none());
3072
3073 match S3Client::map_get_bucket_policy_error(
3074 "bucket",
3075 BucketPolicyErrorKind::MissingBucket,
3076 "NoSuchBucket",
3077 ) {
3078 Err(Error::NotFound(message)) => assert!(message.contains("Bucket not found")),
3079 other => panic!("Expected NotFound for missing bucket, got: {:?}", other),
3080 }
3081
3082 let delete_missing_policy = S3Client::map_delete_bucket_policy_error(
3083 "bucket",
3084 BucketPolicyErrorKind::MissingPolicy,
3085 "NoSuchPolicy",
3086 );
3087 assert!(
3088 delete_missing_policy.is_ok(),
3089 "Missing policy should be treated as successful delete"
3090 );
3091 }
3092
3093 #[test]
3094 fn notification_filter_round_trip_prefix_and_suffix() {
3095 let filter = S3Client::build_notification_filter(Some("logs/"), Some(".json"))
3096 .expect("filter should be built");
3097 let (prefix, suffix) = S3Client::extract_notification_filter(Some(&filter));
3098 assert_eq!(prefix.as_deref(), Some("logs/"));
3099 assert_eq!(suffix.as_deref(), Some(".json"));
3100 }
3101
3102 #[test]
3103 fn notification_filter_none_when_empty() {
3104 assert!(S3Client::build_notification_filter(None, None).is_none());
3105 }
3106
3107 #[test]
3108 fn notifications_equivalent_ignores_order_and_duplicate_events() {
3109 let expected = vec![
3110 BucketNotification {
3111 id: Some("a".to_string()),
3112 target: NotificationTarget::Queue,
3113 arn: "arn:aws:sqs:us-east-1:123456789012:q".to_string(),
3114 events: vec![
3115 "s3:ObjectCreated:*".to_string(),
3116 "s3:ObjectCreated:*".to_string(),
3117 ],
3118 prefix: Some("images/".to_string()),
3119 suffix: Some(".jpg".to_string()),
3120 },
3121 BucketNotification {
3122 id: Some("b".to_string()),
3123 target: NotificationTarget::Topic,
3124 arn: "arn:aws:sns:us-east-1:123456789012:t".to_string(),
3125 events: vec!["s3:ObjectRemoved:*".to_string()],
3126 prefix: None,
3127 suffix: None,
3128 },
3129 ];
3130
3131 let actual = vec![
3132 BucketNotification {
3133 id: None,
3134 target: NotificationTarget::Topic,
3135 arn: "arn:aws:sns:us-east-1:123456789012:t".to_string(),
3136 events: vec!["s3:ObjectRemoved:*".to_string()],
3137 prefix: None,
3138 suffix: None,
3139 },
3140 BucketNotification {
3141 id: None,
3142 target: NotificationTarget::Queue,
3143 arn: "arn:aws:sqs:us-east-1:123456789012:q".to_string(),
3144 events: vec!["s3:ObjectCreated:*".to_string()],
3145 prefix: Some("images/".to_string()),
3146 suffix: Some(".jpg".to_string()),
3147 },
3148 ];
3149
3150 assert!(S3Client::notifications_equivalent(&expected, &actual));
3151 }
3152
3153 #[test]
3154 fn sdk_cors_rule_to_core_preserves_optional_fields() {
3155 let sdk_rule = aws_sdk_s3::types::CorsRule::builder()
3156 .id("web-app")
3157 .allowed_origins("https://app.example.com")
3158 .allowed_methods("get")
3159 .allowed_headers("Authorization")
3160 .expose_headers("ETag")
3161 .max_age_seconds(300)
3162 .build()
3163 .expect("build cors rule");
3164
3165 let rule = sdk_cors_rule_to_core(&sdk_rule);
3166 assert_eq!(rule.id.as_deref(), Some("web-app"));
3167 assert_eq!(
3168 rule.allowed_origins,
3169 vec!["https://app.example.com".to_string()]
3170 );
3171 assert_eq!(rule.allowed_methods, vec!["get".to_string()]);
3172 assert_eq!(
3173 rule.allowed_headers,
3174 Some(vec!["Authorization".to_string()])
3175 );
3176 assert_eq!(rule.expose_headers, Some(vec!["ETag".to_string()]));
3177 assert_eq!(rule.max_age_seconds, Some(300));
3178 }
3179
3180 #[test]
3181 fn sdk_cors_rule_to_core_drops_empty_optional_headers() {
3182 let sdk_rule = aws_sdk_s3::types::CorsRule::builder()
3183 .allowed_origins("https://app.example.com")
3184 .allowed_methods("GET")
3185 .build()
3186 .expect("build cors rule");
3187
3188 let rule = sdk_cors_rule_to_core(&sdk_rule);
3189 assert_eq!(rule.allowed_headers, None);
3190 assert_eq!(rule.expose_headers, None);
3191 }
3192
3193 #[test]
3194 fn core_cors_rule_to_sdk_normalizes_method_case() {
3195 let rule = CorsRule {
3196 id: Some("public-read".to_string()),
3197 allowed_origins: vec!["*".to_string()],
3198 allowed_methods: vec!["get".to_string(), "post".to_string()],
3199 allowed_headers: Some(vec!["*".to_string()]),
3200 expose_headers: None,
3201 max_age_seconds: Some(600),
3202 };
3203
3204 let sdk_rule = core_cors_rule_to_sdk(&rule).expect("convert cors rule");
3205 assert_eq!(sdk_rule.id(), Some("public-read"));
3206 assert_eq!(sdk_rule.allowed_origins(), ["*"]);
3207 assert_eq!(sdk_rule.allowed_methods(), ["GET", "POST"]);
3208 assert_eq!(sdk_rule.allowed_headers(), ["*"]);
3209 assert_eq!(sdk_rule.max_age_seconds(), Some(600));
3210 }
3211
3212 #[tokio::test]
3213 async fn set_bucket_cors_sends_rule_fields() {
3214 let response = http::Response::builder()
3215 .status(200)
3216 .body(SdkBody::from(""))
3217 .expect("build put bucket cors response");
3218 let (client, request_receiver) = test_s3_client(Some(response));
3219
3220 client
3221 .set_bucket_cors(
3222 "bucket",
3223 vec![CorsRule {
3224 id: Some("web-app".to_string()),
3225 allowed_origins: vec!["https://app.example.com".to_string()],
3226 allowed_methods: vec!["GET".to_string(), "POST".to_string()],
3227 allowed_headers: Some(vec!["Authorization".to_string()]),
3228 expose_headers: Some(vec!["ETag".to_string()]),
3229 max_age_seconds: Some(600),
3230 }],
3231 )
3232 .await
3233 .expect("set bucket cors");
3234
3235 let request = request_receiver.expect_request();
3236 assert_eq!(request.method(), http::Method::PUT);
3237 assert!(
3238 request.uri().to_string().contains("?cors"),
3239 "expected bucket CORS subresource in URI: {}",
3240 request.uri()
3241 );
3242
3243 let body = request.body().bytes().expect("request body bytes");
3244 let body = std::str::from_utf8(body).expect("request body is utf8");
3245 assert!(body.contains("<ID>web-app</ID>"));
3246 assert!(body.contains("<AllowedOrigin>https://app.example.com</AllowedOrigin>"));
3247 assert!(body.contains("<AllowedMethod>GET</AllowedMethod>"));
3248 assert!(body.contains("<AllowedMethod>POST</AllowedMethod>"));
3249 assert!(body.contains("<AllowedHeader>Authorization</AllowedHeader>"));
3250 assert!(body.contains("<ExposeHeader>ETag</ExposeHeader>"));
3251 assert!(body.contains("<MaxAgeSeconds>600</MaxAgeSeconds>"));
3252 }
3253
3254 #[test]
3255 fn core_cors_rule_to_sdk_drops_empty_optional_headers() {
3256 let rule = CorsRule {
3257 id: None,
3258 allowed_origins: vec!["https://app.example.com".to_string()],
3259 allowed_methods: vec!["GET".to_string()],
3260 allowed_headers: Some(Vec::new()),
3261 expose_headers: Some(Vec::new()),
3262 max_age_seconds: None,
3263 };
3264
3265 let sdk_rule = core_cors_rule_to_sdk(&rule).expect("convert cors rule");
3266 assert!(sdk_rule.allowed_headers().is_empty());
3267 assert!(sdk_rule.expose_headers().is_empty());
3268 }
3269
3270 #[test]
3271 fn parse_cors_configuration_xml_round_trips_rule_fields() {
3272 let body = r#"
3273<CORSConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
3274 <CORSRule>
3275 <ID>mc-rule</ID>
3276 <AllowedOrigin>https://console.example.com</AllowedOrigin>
3277 <AllowedMethod>GET</AllowedMethod>
3278 <AllowedMethod>POST</AllowedMethod>
3279 <AllowedHeader>*</AllowedHeader>
3280 <ExposeHeader>ETag</ExposeHeader>
3281 <MaxAgeSeconds>1200</MaxAgeSeconds>
3282 </CORSRule>
3283</CORSConfiguration>
3284"#;
3285
3286 let rules = parse_cors_configuration_xml(body).expect("parse cors xml");
3287 assert_eq!(rules.len(), 1);
3288 assert_eq!(rules[0].id.as_deref(), Some("mc-rule"));
3289 assert_eq!(
3290 rules[0].allowed_origins,
3291 vec!["https://console.example.com".to_string()]
3292 );
3293 assert_eq!(
3294 rules[0].allowed_methods,
3295 vec!["GET".to_string(), "POST".to_string()]
3296 );
3297 assert_eq!(rules[0].allowed_headers, Some(vec!["*".to_string()]));
3298 assert_eq!(rules[0].expose_headers, Some(vec!["ETag".to_string()]));
3299 assert_eq!(rules[0].max_age_seconds, Some(1200));
3300 }
3301
3302 #[test]
3303 fn cors_url_uses_path_style_bucket_and_query() {
3304 let (client, _) = test_s3_client(None);
3305
3306 let url = client.cors_url("bucket-name").expect("build cors url");
3307
3308 assert_eq!(url.as_str(), "https://example.com/bucket-name?cors=");
3309 }
3310
3311 #[test]
3312 fn cors_url_rejects_endpoints_without_path_segments() {
3313 let (client, _) = test_s3_client_with_endpoint("mailto:test@example.com", None);
3314
3315 match client.cors_url("bucket-name") {
3316 Err(Error::Network(message)) => {
3317 assert!(message.contains("does not support path-style bucket operations"));
3318 }
3319 other => panic!("expected path-style endpoint error, got {other:?}"),
3320 }
3321 }
3322
3323 #[test]
3324 fn missing_cors_configuration_errors_are_detected() {
3325 assert!(is_missing_cors_configuration_error(
3326 "NoSuchCORSConfiguration"
3327 ));
3328 assert!(is_missing_cors_configuration_error(
3329 "The CORS configuration does not exist"
3330 ));
3331 assert!(!is_missing_cors_configuration_error("AccessDenied"));
3332 }
3333
3334 #[test]
3335 fn missing_cors_configuration_response_detects_code_and_status() {
3336 assert!(is_missing_cors_configuration_response(
3337 Some("NoSuchCORSConfiguration"),
3338 Some(404),
3339 "service error"
3340 ));
3341 assert!(is_missing_cors_configuration_response(
3342 None,
3343 Some(404),
3344 "The CORS configuration does not exist"
3345 ));
3346 assert!(!is_missing_cors_configuration_response(
3347 Some("AccessDenied"),
3348 Some(403),
3349 "access denied"
3350 ));
3351 assert!(!is_missing_cors_configuration_response(
3352 None,
3353 Some(404),
3354 "service error"
3355 ));
3356 assert!(!is_missing_cors_configuration_response(
3357 Some("NoSuchBucket"),
3358 Some(404),
3359 "NoSuchBucket"
3360 ));
3361 assert!(is_missing_cors_configuration_response(
3362 None,
3363 None,
3364 "The CORS configuration does not exist"
3365 ));
3366 assert!(!is_missing_cors_configuration_response(
3367 None,
3368 Some(500),
3369 "The CORS configuration does not exist"
3370 ));
3371 }
3372
3373 #[tokio::test]
3374 async fn get_bucket_cors_missing_configuration_returns_empty_rules() {
3375 let response = http::Response::builder()
3376 .status(404)
3377 .header("x-amz-error-code", "NoSuchCORSConfiguration")
3378 .body(SdkBody::from(
3379 r#"<?xml version="1.0" encoding="UTF-8"?>
3380<Error>
3381 <Code>NoSuchCORSConfiguration</Code>
3382 <Message>The CORS configuration does not exist</Message>
3383</Error>"#,
3384 ))
3385 .expect("build missing cors response");
3386 let (client, _) = test_s3_client(Some(response));
3387
3388 let rules = client
3389 .get_bucket_cors("bucket")
3390 .await
3391 .expect("missing cors config should be treated as empty");
3392
3393 assert!(rules.is_empty());
3394 }
3395
3396 #[tokio::test]
3397 async fn delete_bucket_cors_missing_configuration_is_successful() {
3398 let response = http::Response::builder()
3399 .status(404)
3400 .header("x-amz-error-code", "NoSuchCORSConfiguration")
3401 .body(SdkBody::from(
3402 r#"<?xml version="1.0" encoding="UTF-8"?>
3403<Error>
3404 <Code>NoSuchCORSConfiguration</Code>
3405 <Message>The CORS configuration does not exist</Message>
3406</Error>"#,
3407 ))
3408 .expect("build missing cors response");
3409 let (client, _) = test_s3_client(Some(response));
3410
3411 client
3412 .delete_bucket_cors("bucket")
3413 .await
3414 .expect("missing cors config should be treated as successful delete");
3415 }
3416
3417 #[tokio::test]
3418 async fn reqwest_connector_insecure_without_ca_bundle_succeeds() {
3419 let connector = ReqwestConnector::new(true, None).await;
3421 assert!(
3422 connector.is_ok(),
3423 "Expected insecure connector creation to succeed"
3424 );
3425 }
3426
3427 #[tokio::test]
3428 async fn reqwest_connector_invalid_ca_bundle_path_surfaces_error() {
3429 let result = ReqwestConnector::new(false, Some("")).await;
3431 match result {
3432 Err(Error::Network(msg)) => {
3433 assert!(
3434 msg.contains("Failed to read CA bundle"),
3435 "Unexpected error message: {msg}"
3436 );
3437 }
3438 other => panic!("Expected Error::Network for invalid path, got: {:?}", other),
3439 }
3440 }
3441
3442 #[test]
3443 fn should_use_multipart_for_large_files() {
3444 assert!(S3Client::should_use_multipart(
3445 SINGLE_PUT_OBJECT_MAX_SIZE + 1
3446 ));
3447 }
3448
3449 #[test]
3450 fn should_use_single_part_for_small_files() {
3451 assert!(!S3Client::should_use_multipart(0));
3452 assert!(!S3Client::should_use_multipart(1024 * 1024));
3453 assert!(!S3Client::should_use_multipart(
3454 crate::multipart::DEFAULT_PART_SIZE
3455 ));
3456 assert!(!S3Client::should_use_multipart(SINGLE_PUT_OBJECT_MAX_SIZE));
3457 }
3458
3459 #[tokio::test]
3460 async fn delete_object_with_force_delete_sets_rustfs_header() {
3461 let (client, request_receiver) = test_s3_client(None);
3462 let path = RemotePath::new("test", "bucket", "key.txt");
3463
3464 let _ = client
3465 .delete_object_with_options(&path, DeleteRequestOptions { force_delete: true })
3466 .await;
3467
3468 let request = request_receiver.expect_request();
3469 assert_eq!(request.headers().get("x-rustfs-force-delete"), Some("true"));
3470 }
3471
3472 #[tokio::test]
3473 async fn delete_object_without_force_delete_omits_rustfs_header() {
3474 let (client, request_receiver) = test_s3_client(None);
3475 let path = RemotePath::new("test", "bucket", "key.txt");
3476
3477 let _ = client
3478 .delete_object_with_options(&path, DeleteRequestOptions::default())
3479 .await;
3480
3481 let request = request_receiver.expect_request();
3482 assert!(request.headers().get("x-rustfs-force-delete").is_none());
3483 }
3484
3485 #[tokio::test]
3486 async fn delete_object_wrapper_uses_default_options_without_rustfs_header() {
3487 let (client, request_receiver) = test_s3_client(None);
3488 let path = RemotePath::new("test", "bucket", "key.txt");
3489
3490 let _ = ObjectStore::delete_object(&client, &path).await;
3491
3492 let request = request_receiver.expect_request();
3493 assert!(request.headers().get("x-rustfs-force-delete").is_none());
3494 }
3495
3496 #[tokio::test]
3497 async fn delete_object_with_options_maps_missing_keys_to_not_found() {
3498 let response = http::Response::builder()
3499 .status(404)
3500 .header("x-amz-error-code", "NoSuchKey")
3501 .body(SdkBody::from(
3502 r#"<?xml version="1.0" encoding="UTF-8"?>
3503<Error>
3504 <Code>NoSuchKey</Code>
3505 <Message>The specified key does not exist.</Message>
3506</Error>"#,
3507 ))
3508 .expect("build delete object response");
3509 let (client, _request_receiver) = test_s3_client(Some(response));
3510 let path = RemotePath::new("test", "bucket", "missing.txt");
3511
3512 let result = client
3513 .delete_object_with_options(&path, DeleteRequestOptions::default())
3514 .await;
3515
3516 match result {
3517 Err(Error::NotFound(message)) => assert_eq!(message, path.to_string()),
3518 other => panic!("Expected NotFound for missing key, got: {other:?}"),
3519 }
3520 }
3521
3522 #[tokio::test]
3523 async fn delete_object_with_options_maps_other_failures_to_network() {
3524 let response = http::Response::builder()
3525 .status(500)
3526 .header("x-amz-error-code", "InternalError")
3527 .body(SdkBody::from(
3528 r#"<?xml version="1.0" encoding="UTF-8"?>
3529<Error>
3530 <Code>InternalError</Code>
3531 <Message>Something went wrong.</Message>
3532</Error>"#,
3533 ))
3534 .expect("build delete object response");
3535 let (client, _request_receiver) = test_s3_client(Some(response));
3536 let path = RemotePath::new("test", "bucket", "key.txt");
3537
3538 let result = client
3539 .delete_object_with_options(&path, DeleteRequestOptions::default())
3540 .await;
3541
3542 match result {
3543 Err(Error::Network(message)) => assert!(message.contains("InternalError")),
3544 other => panic!("Expected Network for delete failure, got: {other:?}"),
3545 }
3546 }
3547
3548 #[tokio::test]
3549 async fn delete_bucket_maps_bucket_not_empty_to_conflict() {
3550 let response = http::Response::builder()
3551 .status(409)
3552 .header("x-amz-error-code", "BucketNotEmpty")
3553 .body(SdkBody::from(
3554 r#"<?xml version="1.0" encoding="UTF-8"?>
3555<Error>
3556 <Code>BucketNotEmpty</Code>
3557 <Message>The bucket you tried to delete is not empty.</Message>
3558</Error>"#,
3559 ))
3560 .expect("build delete bucket response");
3561 let (client, _request_receiver) = test_s3_client(Some(response));
3562
3563 let result = client.delete_bucket("bucket").await;
3564
3565 match result {
3566 Err(Error::Conflict(message)) => assert!(message.contains("BucketNotEmpty")),
3567 other => panic!("Expected Conflict for non-empty bucket, got: {other:?}"),
3568 }
3569 }
3570
3571 #[tokio::test]
3572 async fn delete_bucket_maps_missing_bucket_to_not_found() {
3573 let response = http::Response::builder()
3574 .status(404)
3575 .header("x-amz-error-code", "NoSuchBucket")
3576 .body(SdkBody::from(
3577 r#"<?xml version="1.0" encoding="UTF-8"?>
3578<Error>
3579 <Code>NoSuchBucket</Code>
3580 <Message>The specified bucket does not exist.</Message>
3581</Error>"#,
3582 ))
3583 .expect("build missing bucket response");
3584 let (client, _request_receiver) = test_s3_client(Some(response));
3585
3586 let result = client.delete_bucket("missing-bucket").await;
3587
3588 match result {
3589 Err(Error::NotFound(message)) => {
3590 assert_eq!(message, "Bucket not found: missing-bucket")
3591 }
3592 other => panic!("Expected NotFound for missing bucket, got: {other:?}"),
3593 }
3594 }
3595
3596 #[tokio::test]
3597 async fn delete_objects_with_force_delete_sets_rustfs_header() {
3598 let response = http::Response::builder()
3599 .status(200)
3600 .body(SdkBody::from(
3601 r#"<?xml version="1.0" encoding="UTF-8"?>
3602<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />"#,
3603 ))
3604 .expect("build delete objects response");
3605 let (client, request_receiver) = test_s3_client(Some(response));
3606
3607 let _ = client
3608 .delete_objects_with_options(
3609 "bucket",
3610 vec!["key.txt".to_string()],
3611 DeleteRequestOptions { force_delete: true },
3612 )
3613 .await;
3614
3615 let request = request_receiver.expect_request();
3616 assert_eq!(request.headers().get("x-rustfs-force-delete"), Some("true"));
3617 }
3618
3619 #[tokio::test]
3620 async fn delete_objects_without_force_delete_omits_rustfs_header() {
3621 let response = http::Response::builder()
3622 .status(200)
3623 .body(SdkBody::from(
3624 r#"<?xml version="1.0" encoding="UTF-8"?>
3625<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />"#,
3626 ))
3627 .expect("build delete objects response");
3628 let (client, request_receiver) = test_s3_client(Some(response));
3629
3630 let _ = client
3631 .delete_objects_with_options(
3632 "bucket",
3633 vec!["key.txt".to_string()],
3634 DeleteRequestOptions::default(),
3635 )
3636 .await;
3637
3638 let request = request_receiver.expect_request();
3639 assert!(request.headers().get("x-rustfs-force-delete").is_none());
3640 }
3641
3642 #[tokio::test]
3643 async fn delete_objects_wrapper_uses_default_options_without_rustfs_header() {
3644 let response = http::Response::builder()
3645 .status(200)
3646 .body(SdkBody::from(
3647 r#"<?xml version="1.0" encoding="UTF-8"?>
3648<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />"#,
3649 ))
3650 .expect("build delete objects response");
3651 let (client, request_receiver) = test_s3_client(Some(response));
3652
3653 let _ = ObjectStore::delete_objects(&client, "bucket", vec!["key.txt".to_string()]).await;
3654
3655 let request = request_receiver.expect_request();
3656 assert!(request.headers().get("x-rustfs-force-delete").is_none());
3657 }
3658
3659 #[tokio::test]
3660 async fn delete_objects_with_empty_keys_skips_http_request() {
3661 let (client, request_receiver) = test_s3_client(None);
3662
3663 let deleted = client
3664 .delete_objects_with_options("bucket", Vec::new(), DeleteRequestOptions::default())
3665 .await
3666 .expect("empty delete should succeed");
3667
3668 assert!(deleted.is_empty());
3669 request_receiver.expect_no_request();
3670 }
3671
3672 #[tokio::test]
3673 async fn delete_objects_with_partial_errors_returns_deleted_keys() {
3674 let response = http::Response::builder()
3675 .status(200)
3676 .body(SdkBody::from(
3677 r#"<?xml version="1.0" encoding="UTF-8"?>
3678<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
3679 <Deleted>
3680 <Key>kept.txt</Key>
3681 </Deleted>
3682 <Error>
3683 <Key>failed.txt</Key>
3684 <Code>AccessDenied</Code>
3685 <Message>Access Denied</Message>
3686 </Error>
3687</DeleteResult>"#,
3688 ))
3689 .expect("build partial delete response");
3690 let (client, request_receiver) = test_s3_client(Some(response));
3691
3692 let deleted = client
3693 .delete_objects_with_options(
3694 "bucket",
3695 vec!["kept.txt".to_string(), "failed.txt".to_string()],
3696 DeleteRequestOptions::default(),
3697 )
3698 .await
3699 .expect("partial delete should still return deleted keys");
3700
3701 let request = request_receiver.expect_request();
3702 assert_eq!(request.uri(), "https://example.com/bucket/?delete");
3703 assert_eq!(deleted, vec!["kept.txt".to_string()]);
3704 }
3705
3706 #[tokio::test]
3707 async fn read_next_part_fills_buffer_until_eof() {
3708 use tokio::io::AsyncWriteExt;
3709
3710 let temp_dir = tempfile::tempdir().expect("create temp dir");
3711 let file_path = temp_dir.path().join("payload.bin");
3712 let mut writer = tokio::fs::File::create(&file_path)
3713 .await
3714 .expect("create temp file");
3715 writer
3716 .write_all(b"abcdefghij")
3717 .await
3718 .expect("write temp file");
3719 writer.flush().await.expect("flush temp file");
3720 drop(writer);
3721
3722 let mut reader = tokio::fs::File::open(&file_path)
3723 .await
3724 .expect("open temp file");
3725 let mut buffer = vec![0u8; 8];
3726
3727 let first = S3Client::read_next_part(&mut reader, &file_path, &mut buffer)
3728 .await
3729 .expect("first read");
3730 assert_eq!(first, 8);
3731 assert_eq!(&buffer[..first], b"abcdefgh");
3732
3733 let second = S3Client::read_next_part(&mut reader, &file_path, &mut buffer)
3734 .await
3735 .expect("second read");
3736 assert_eq!(second, 2);
3737 assert_eq!(&buffer[..second], b"ij");
3738
3739 let third = S3Client::read_next_part(&mut reader, &file_path, &mut buffer)
3740 .await
3741 .expect("third read");
3742 assert_eq!(third, 0);
3743 }
3744}