1use async_trait::async_trait;
6use aws_credential_types::Credentials;
7use aws_sdk_s3::error::ProvideErrorMetadata;
8use aws_sigv4::http_request::{
9 SignableBody, SignableRequest, SignatureLocation, SigningSettings, sign,
10};
11use aws_sigv4::sign::v4;
12use aws_smithy_runtime_api::box_error::BoxError;
13use aws_smithy_runtime_api::client::http::{
14 HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpConnector,
15};
16use aws_smithy_runtime_api::client::interceptors::Intercept;
17use aws_smithy_runtime_api::client::interceptors::context::BeforeTransmitInterceptorContextMut;
18use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
19use aws_smithy_runtime_api::client::result::ConnectorError;
20use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
21use aws_smithy_runtime_api::http::{Response, StatusCode};
22use aws_smithy_types::body::SdkBody;
23use aws_smithy_types::config_bag::ConfigBag;
24use bytes::Bytes;
25use jiff::Timestamp;
26use quick_xml::de::from_str as from_xml_str;
27use rc_core::{
28 Alias, BucketEncryption, BucketNotification, Capabilities, CorsRule, Error, LifecycleRule,
29 ListOptions, ListResult, NotificationTarget, ObjectEncryptionRequest, ObjectInfo, ObjectStore,
30 ObjectVersion, ObjectVersionListResult, RemotePath, ReplicationConfiguration, RequestHeader,
31 Result, SelectOptions, global_request_headers,
32};
33use reqwest::Method;
34use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
35use serde::Deserialize;
36use sha2::{Digest, Sha256};
37use std::collections::HashMap;
38use tokio::io::AsyncReadExt;
39use tokio::io::AsyncWrite;
40
/// Size threshold, equal to the multipart module's default part size.
// NOTE(review): presumably the cut-over between single PUT and multipart upload — confirm at the call sites.
const SINGLE_PUT_OBJECT_MAX_SIZE: u64 = crate::multipart::DEFAULT_PART_SIZE;
/// Service name string for S3.
// NOTE(review): presumably the SigV4 signing name — the use sites are outside this view.
const S3_SERVICE_NAME: &str = "s3";
/// XML namespace written on the root `ReplicationConfiguration` element.
const S3_REPLICATION_XML_NAMESPACE: &str = "http://s3.amazonaws.com/doc/2006-03-01/";
/// Header set to `"true"` on delete requests when `DeleteRequestOptions::force_delete` is on.
const RUSTFS_FORCE_DELETE_HEADER: &str = "x-rustfs-force-delete";
47
/// Coarse classification of a bucket-policy failure.
///
/// NOTE(review): the code that produces and consumes these variants is outside
/// this view; the variant descriptions below are inferred from their names.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BucketPolicyErrorKind {
    /// Presumably: the bucket exists but has no policy attached.
    MissingPolicy,
    /// Presumably: the bucket itself does not exist.
    MissingBucket,
    /// Any other failure.
    Other,
}
54
/// HTTP connector for the AWS SDK backed by a `reqwest::Client`.
///
/// `S3Client::new` installs it when the alias asks for insecure TLS, a custom
/// CA bundle, or a client certificate/key pair.
#[derive(Debug, Clone)]
struct ReqwestConnector {
    /// Pre-configured reqwest client used to execute every SDK request.
    client: reqwest::Client,
}
61
62impl ReqwestConnector {
63 async fn new(
64 insecure: bool,
65 ca_bundle: Option<&str>,
66 client_cert: Option<&str>,
67 client_key: Option<&str>,
68 ) -> Result<Self> {
69 let client = build_reqwest_client(insecure, ca_bundle, client_cert, client_key).await?;
70 Ok(Self { client })
71 }
72}
73
74async fn build_reqwest_client(
75 insecure: bool,
76 ca_bundle: Option<&str>,
77 client_cert: Option<&str>,
78 client_key: Option<&str>,
79) -> Result<reqwest::Client> {
80 let mut builder = reqwest::Client::builder().danger_accept_invalid_certs(insecure);
84
85 if let Some(bundle_path) = ca_bundle {
86 let pem = tokio::fs::read(bundle_path).await.map_err(|e| {
88 Error::Network(format!("Failed to read CA bundle '{bundle_path}': {e}"))
89 })?;
90 let cert = reqwest::Certificate::from_pem(&pem)
91 .map_err(|e| Error::Network(format!("Invalid CA bundle '{bundle_path}': {e}")))?;
92 builder = builder.add_root_certificate(cert);
93 }
94
95 if let (Some(cert_path), Some(key_path)) = (client_cert, client_key) {
96 let mut identity_pem = tokio::fs::read(cert_path).await.map_err(|e| {
97 Error::Network(format!(
98 "Failed to read client certificate '{cert_path}': {e}"
99 ))
100 })?;
101 let key_pem = tokio::fs::read(key_path)
102 .await
103 .map_err(|e| Error::Network(format!("Failed to read client key '{key_path}': {e}")))?;
104 identity_pem.extend_from_slice(b"\n");
105 identity_pem.extend_from_slice(&key_pem);
106 let identity = reqwest::Identity::from_pem(&identity_pem)
107 .map_err(|e| Error::Network(format!("Invalid client certificate/key identity: {e}")))?;
108 builder = builder.use_rustls_tls().identity(identity);
109 }
110
111 let client = builder
112 .build()
113 .map_err(|e| Error::Network(format!("Failed to build HTTP client: {e}")))?;
114 Ok(client)
115}
116
117fn force_path_style_for_alias(alias: &Alias) -> bool {
118 match alias.bucket_lookup.as_str() {
119 "path" => true,
120 "dns" => false,
121 "auto" => !is_aliyun_oss_service_endpoint(&alias.endpoint),
122 _ => true,
123 }
124}
125
126fn is_aliyun_oss_service_endpoint(endpoint: &str) -> bool {
127 let Ok(url) = reqwest::Url::parse(endpoint.trim_end_matches('/')) else {
128 return false;
129 };
130
131 let Some(host) = url.host_str() else {
132 return false;
133 };
134
135 host.strip_suffix(".aliyuncs.com")
136 .and_then(|host| host.split('.').next())
137 .is_some_and(|first_label| first_label.starts_with("oss-"))
138}
139
/// Serde model of a replication configuration XML document, as consumed by
/// `parse_replication_configuration_xml`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct ReplicationConfigurationXml {
    /// Content of the `Role` element, if present.
    role: Option<String>,
    /// One entry per `Rule` element; empty when none are present.
    #[serde(rename = "Rule", default)]
    rules: Vec<ReplicationRuleXml>,
}
147
/// Serde model of a single `Rule` element of a replication configuration.
/// Every child element is optional at the XML level.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct ReplicationRuleXml {
    /// `ID` element.
    #[serde(rename = "ID")]
    id: Option<String>,
    /// `Priority` element.
    priority: Option<i32>,
    /// `Status` element (compared case-insensitively against "enabled" by the parser).
    status: Option<String>,
    /// `Prefix` placed directly on the rule; used only as a fallback when the
    /// filter supplies no prefix.
    #[serde(rename = "Prefix")]
    legacy_prefix: Option<String>,
    /// `Filter` element.
    filter: Option<ReplicationFilterXml>,
    /// `Destination` element.
    destination: Option<ReplicationDestinationXml>,
    /// `DeleteMarkerReplication` element.
    delete_marker_replication: Option<ReplicationStatusXml>,
    /// `ExistingObjectReplication` element.
    existing_object_replication: Option<ReplicationStatusXml>,
    /// `DeleteReplication` element.
    delete_replication: Option<ReplicationStatusXml>,
}
163
/// Serde model of a rule's `Filter` element.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct ReplicationFilterXml {
    /// `Prefix` child placed directly under `Filter`.
    prefix: Option<String>,
    /// Single `Tag` child placed directly under `Filter`.
    tag: Option<TagXml>,
    /// `And` child combining a prefix and/or several tags.
    and: Option<ReplicationAndXml>,
}
171
/// Serde model of the `And` element inside a replication `Filter`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct ReplicationAndXml {
    /// `Prefix` child, if present.
    prefix: Option<String>,
    /// One entry per `Tag` child; empty when none are present.
    #[serde(rename = "Tag", default)]
    tags: Vec<TagXml>,
}
179
/// Serde model of a `Tag` element. The parsers in this file skip a tag unless
/// both `Key` and `Value` are present.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct TagXml {
    /// `Key` element.
    key: Option<String>,
    /// `Value` element.
    value: Option<String>,
}
186
/// Serde model of a rule's `Destination` element.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct ReplicationDestinationXml {
    /// `Bucket` element; mapped to the core rule's `bucket_arn`.
    bucket: Option<String>,
    /// `StorageClass` element.
    storage_class: Option<String>,
}
193
/// Serde model shared by the rule sub-elements that carry only a `Status`
/// child (`DeleteMarkerReplication`, `ExistingObjectReplication`,
/// `DeleteReplication`).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct ReplicationStatusXml {
    /// `Status` element.
    status: Option<String>,
}
199
/// Serde model of a bucket CORS configuration XML document, as consumed by
/// `parse_cors_configuration_xml`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct CorsConfigurationXml {
    /// One entry per `CORSRule` element; empty when none are present.
    #[serde(rename = "CORSRule", default)]
    rules: Vec<CorsRuleXml>,
}
206
/// Serde model of a single `CORSRule` element. Repeated child elements are
/// collected into vectors that default to empty.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct CorsRuleXml {
    /// `ID` element.
    #[serde(rename = "ID")]
    id: Option<String>,
    /// Every `AllowedOrigin` element.
    #[serde(rename = "AllowedOrigin", default)]
    allowed_origins: Vec<String>,
    /// Every `AllowedMethod` element.
    #[serde(rename = "AllowedMethod", default)]
    allowed_methods: Vec<String>,
    /// Every `AllowedHeader` element.
    #[serde(rename = "AllowedHeader", default)]
    allowed_headers: Vec<String>,
    /// Every `ExposeHeader` element.
    #[serde(rename = "ExposeHeader", default)]
    expose_headers: Vec<String>,
    /// `MaxAgeSeconds` element.
    max_age_seconds: Option<i32>,
}
222
223fn parse_replication_status(status: Option<&ReplicationStatusXml>) -> Option<bool> {
224 status
225 .and_then(|value| value.status.as_deref())
226 .map(|value| value.eq_ignore_ascii_case("enabled"))
227}
228
229fn parse_replication_rule_status(status: Option<&str>) -> rc_core::ReplicationRuleStatus {
230 match status {
231 Some(value) if value.eq_ignore_ascii_case("enabled") => {
232 rc_core::ReplicationRuleStatus::Enabled
233 }
234 _ => rc_core::ReplicationRuleStatus::Disabled,
235 }
236}
237
/// Collects borrowed key/value pairs into an owned map.
///
/// Returns `None` when the iterator yields no pairs. When a key repeats, the
/// last value wins.
fn collect_tag_map<'a, I>(tags: I) -> Option<HashMap<String, String>>
where
    I: IntoIterator<Item = (&'a str, &'a str)>,
{
    let mut owned = HashMap::new();
    for (name, content) in tags {
        owned.insert(name.to_owned(), content.to_owned());
    }
    (!owned.is_empty()).then_some(owned)
}
252
253fn parse_tag_xml(tag: Option<&TagXml>) -> Option<HashMap<String, String>> {
254 collect_tag_map(tag.and_then(|tag| Some((tag.key.as_deref()?, tag.value.as_deref()?))))
255}
256
257fn parse_tag_xmls(tags: &[TagXml]) -> Option<HashMap<String, String>> {
258 collect_tag_map(
259 tags.iter()
260 .filter_map(|tag| Some((tag.key.as_deref()?, tag.value.as_deref()?))),
261 )
262}
263
264fn parse_replication_filter_prefix(filter: Option<&ReplicationFilterXml>) -> Option<String> {
265 filter
266 .and_then(|filter| filter.prefix.clone())
267 .or_else(|| filter.and_then(|filter| filter.and.as_ref()?.prefix.clone()))
268}
269
270fn parse_replication_filter_tags(
271 filter: Option<&ReplicationFilterXml>,
272) -> Option<HashMap<String, String>> {
273 filter
274 .and_then(|filter| parse_tag_xml(filter.tag.as_ref()))
275 .or_else(|| filter.and_then(|filter| parse_tag_xmls(&filter.and.as_ref()?.tags)))
276}
277
/// Returns the map's entries as borrowed pairs in ascending order, so that
/// generated output does not depend on hash-map iteration order.
fn sorted_tags(tags: &HashMap<String, String>) -> Vec<(&str, &str)> {
    let mut ordered = Vec::with_capacity(tags.len());
    for (name, content) in tags {
        ordered.push((name.as_str(), content.as_str()));
    }
    ordered.sort_unstable_by(|left, right| left.cmp(right));
    ordered
}
286
287fn append_tag_xml(xml: &mut String, key: &str, value: &str) {
288 xml.push_str("<Tag><Key>");
289 xml.push_str(&xml_escape(key));
290 xml.push_str("</Key><Value>");
291 xml.push_str(&xml_escape(value));
292 xml.push_str("</Value></Tag>");
293}
294
295fn append_replication_filter_xml(
296 xml: &mut String,
297 prefix: Option<&str>,
298 tags: Option<&HashMap<String, String>>,
299) {
300 let Some(tags) = tags.filter(|tags| !tags.is_empty()) else {
301 if let Some(prefix) = prefix {
302 xml.push_str("<Filter><Prefix>");
303 xml.push_str(&xml_escape(prefix));
304 xml.push_str("</Prefix></Filter>");
305 }
306 return;
307 };
308
309 xml.push_str("<Filter>");
310 if prefix.is_some() || tags.len() > 1 {
311 xml.push_str("<And>");
312 if let Some(prefix) = prefix {
313 xml.push_str("<Prefix>");
314 xml.push_str(&xml_escape(prefix));
315 xml.push_str("</Prefix>");
316 }
317 for (key, value) in sorted_tags(tags) {
318 append_tag_xml(xml, key, value);
319 }
320 xml.push_str("</And>");
321 } else if let Some((key, value)) = sorted_tags(tags).into_iter().next() {
322 append_tag_xml(xml, key, value);
323 }
324 xml.push_str("</Filter>");
325}
326
/// Collapses `Some(vec![])` to `None`; any other value is returned unchanged.
fn normalize_optional_strings(values: Option<Vec<String>>) -> Option<Vec<String>> {
    match values {
        Some(items) if !items.is_empty() => Some(items),
        _ => None,
    }
}
330
/// Returns `true` when `error_text` looks like a "no CORS configuration"
/// error, matched case-insensitively (ASCII) on either the
/// `NoSuchCORSConfiguration` code or the phrase
/// "cors configuration does not exist".
///
/// The phrase check also covers the longer wording
/// "The CORS configuration does not exist", which contains it.
fn is_missing_cors_configuration_error(error_text: &str) -> bool {
    let normalized = error_text.to_ascii_lowercase();
    normalized.contains("nosuchcorsconfiguration")
        || normalized.contains("cors configuration does not exist")
}
337
338fn is_missing_cors_configuration_response(
339 error_code: Option<&str>,
340 status_code: Option<u16>,
341 error_text: &str,
342) -> bool {
343 let error_code = error_code.map(|code| code.to_ascii_lowercase());
344 if matches!(error_code.as_deref(), Some("nosuchcorsconfiguration")) {
345 return true;
346 }
347
348 if !is_missing_cors_configuration_error(error_text) {
349 return false;
350 }
351
352 status_code.is_none_or(|status| status == 404)
353}
354
355fn sdk_cors_rule_to_core(rule: &aws_sdk_s3::types::CorsRule) -> CorsRule {
356 CorsRule {
357 id: rule.id().map(str::to_string),
358 allowed_origins: rule.allowed_origins().to_vec(),
359 allowed_methods: rule.allowed_methods().to_vec(),
360 allowed_headers: normalize_optional_strings(Some(rule.allowed_headers().to_vec())),
361 expose_headers: normalize_optional_strings(Some(rule.expose_headers().to_vec())),
362 max_age_seconds: rule.max_age_seconds(),
363 }
364}
365
366fn sdk_bucket_encryption_to_core(
367 value: &aws_sdk_s3::types::ServerSideEncryptionByDefault,
368) -> Result<BucketEncryption> {
369 match value.sse_algorithm() {
370 aws_sdk_s3::types::ServerSideEncryption::Aes256 => Ok(BucketEncryption::SseS3),
371 aws_sdk_s3::types::ServerSideEncryption::AwsKms => Ok(BucketEncryption::SseKms {
372 key_id: value.kms_master_key_id().map(ToString::to_string),
373 }),
374 other => Err(Error::General(format!(
375 "unsupported bucket encryption algorithm: {}",
376 other.as_str()
377 ))),
378 }
379}
380
/// Returns `true` when `error_text` contains, ignoring ASCII case, one of the
/// known "no bucket encryption configuration" markers.
fn is_missing_bucket_encryption_error(error_text: &str) -> bool {
    const MARKERS: [&str; 3] = [
        "serversideencryptionconfigurationnotfounderror",
        "nosuchbucketencryption",
        "encryption configuration was not found",
    ];

    let lowered = error_text.to_ascii_lowercase();
    MARKERS.iter().any(|marker| lowered.contains(*marker))
}
387
388fn is_missing_bucket_encryption_response(
389 error_code: Option<&str>,
390 status_code: Option<u16>,
391 error_text: &str,
392) -> bool {
393 let error_code = error_code.map(|code| code.to_ascii_lowercase());
394 if matches!(
395 error_code.as_deref(),
396 Some("serversideencryptionconfigurationnotfounderror" | "nosuchbucketencryption")
397 ) {
398 return true;
399 }
400
401 if !is_missing_bucket_encryption_error(error_text) {
402 return false;
403 }
404
405 status_code.is_none_or(|status| status == 404)
406}
407
408fn core_bucket_encryption_to_sdk(
409 value: &BucketEncryption,
410) -> aws_sdk_s3::types::ServerSideEncryptionConfiguration {
411 let encryption_by_default = match value {
412 BucketEncryption::SseS3 => aws_sdk_s3::types::ServerSideEncryptionByDefault::builder()
413 .sse_algorithm(aws_sdk_s3::types::ServerSideEncryption::Aes256)
414 .build()
415 .expect("sse-s3 bucket encryption configuration is valid"),
416 BucketEncryption::SseKms { key_id } => {
417 let mut builder = aws_sdk_s3::types::ServerSideEncryptionByDefault::builder()
418 .sse_algorithm(aws_sdk_s3::types::ServerSideEncryption::AwsKms);
419 if let Some(key_id) = key_id {
420 builder = builder.kms_master_key_id(key_id);
421 }
422 builder
423 .build()
424 .expect("sse-kms bucket encryption configuration is valid")
425 }
426 };
427
428 let rule = aws_sdk_s3::types::ServerSideEncryptionRule::builder()
429 .apply_server_side_encryption_by_default(encryption_by_default)
430 .build();
431
432 aws_sdk_s3::types::ServerSideEncryptionConfiguration::builder()
433 .rules(rule)
434 .build()
435 .expect("bucket encryption configuration requires one rule")
436}
437
438fn apply_object_encryption_to_put_request(
439 request: aws_sdk_s3::operation::put_object::builders::PutObjectFluentBuilder,
440 encryption: Option<&ObjectEncryptionRequest>,
441) -> aws_sdk_s3::operation::put_object::builders::PutObjectFluentBuilder {
442 match encryption {
443 Some(ObjectEncryptionRequest::SseS3) => {
444 request.server_side_encryption(aws_sdk_s3::types::ServerSideEncryption::Aes256)
445 }
446 Some(ObjectEncryptionRequest::SseKms { key_id }) => request
447 .server_side_encryption(aws_sdk_s3::types::ServerSideEncryption::AwsKms)
448 .ssekms_key_id(key_id),
449 None => request,
450 }
451}
452
453fn apply_object_encryption_to_copy_request(
454 request: aws_sdk_s3::operation::copy_object::builders::CopyObjectFluentBuilder,
455 encryption: Option<&ObjectEncryptionRequest>,
456) -> aws_sdk_s3::operation::copy_object::builders::CopyObjectFluentBuilder {
457 match encryption {
458 Some(ObjectEncryptionRequest::SseS3) => {
459 request.server_side_encryption(aws_sdk_s3::types::ServerSideEncryption::Aes256)
460 }
461 Some(ObjectEncryptionRequest::SseKms { key_id }) => request
462 .server_side_encryption(aws_sdk_s3::types::ServerSideEncryption::AwsKms)
463 .ssekms_key_id(key_id),
464 None => request,
465 }
466}
467
468fn core_cors_rule_to_sdk(rule: &CorsRule) -> Result<aws_sdk_s3::types::CorsRule> {
469 aws_sdk_s3::types::CorsRule::builder()
470 .set_id(rule.id.clone())
471 .set_allowed_origins(Some(rule.allowed_origins.clone()))
472 .set_allowed_methods(Some(
473 rule.allowed_methods
474 .iter()
475 .map(|method| method.to_ascii_uppercase())
476 .collect(),
477 ))
478 .set_allowed_headers(normalize_optional_strings(rule.allowed_headers.clone()))
479 .set_expose_headers(normalize_optional_strings(rule.expose_headers.clone()))
480 .set_max_age_seconds(rule.max_age_seconds)
481 .build()
482 .map_err(|e| Error::General(format!("build bucket cors rule: {e}")))
483}
484
485fn parse_cors_configuration_xml(body: &str) -> Result<Vec<CorsRule>> {
486 let config: CorsConfigurationXml =
487 from_xml_str(body).map_err(|e| Error::General(format!("parse bucket cors xml: {e}")))?;
488
489 Ok(config
490 .rules
491 .into_iter()
492 .map(|rule| CorsRule {
493 id: rule.id,
494 allowed_origins: rule.allowed_origins,
495 allowed_methods: rule.allowed_methods,
496 allowed_headers: normalize_optional_strings(Some(rule.allowed_headers)),
497 expose_headers: normalize_optional_strings(Some(rule.expose_headers)),
498 max_age_seconds: rule.max_age_seconds,
499 })
500 .collect())
501}
502
503fn parse_replication_configuration_xml(body: &str) -> Result<ReplicationConfiguration> {
504 let config: ReplicationConfigurationXml = from_xml_str(body)
505 .map_err(|e| Error::General(format!("parse replication config xml: {e}")))?;
506
507 let rules = config
508 .rules
509 .into_iter()
510 .map(|rule| rc_core::ReplicationRule {
511 id: rule.id.unwrap_or_default(),
512 priority: rule.priority.unwrap_or_default(),
513 status: parse_replication_rule_status(rule.status.as_deref()),
514 prefix: parse_replication_filter_prefix(rule.filter.as_ref()).or(rule.legacy_prefix),
515 tags: parse_replication_filter_tags(rule.filter.as_ref()),
516 destination: rc_core::ReplicationDestination {
517 bucket_arn: rule
518 .destination
519 .as_ref()
520 .and_then(|destination| destination.bucket.clone())
521 .unwrap_or_default(),
522 storage_class: rule
523 .destination
524 .and_then(|destination| destination.storage_class),
525 },
526 delete_marker_replication: parse_replication_status(
527 rule.delete_marker_replication.as_ref(),
528 ),
529 existing_object_replication: parse_replication_status(
530 rule.existing_object_replication.as_ref(),
531 ),
532 delete_replication: parse_replication_status(rule.delete_replication.as_ref()),
533 })
534 .collect();
535
536 Ok(ReplicationConfiguration {
537 role: config.role.unwrap_or_default(),
538 rules,
539 })
540}
541
/// Escapes `value` for use as XML character data or an attribute value.
///
/// The five characters with predefined XML entities are replaced:
/// `&` → `&amp;`, `<` → `&lt;`, `>` → `&gt;`, `"` → `&quot;`, `'` → `&apos;`.
/// All other characters are copied unchanged. The input is processed in a
/// single pass, so an ampersand introduced by one replacement is never
/// escaped again.
fn xml_escape(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            '\'' => escaped.push_str("&apos;"),
            other => escaped.push(other),
        }
    }
    escaped
}
550
/// Appends `<tag><Status>Enabled|Disabled</Status></tag>` to `xml`.
///
/// Nothing is written when `enabled` is `None`. `tag` is written verbatim.
fn append_replication_status_tag(xml: &mut String, tag: &str, enabled: Option<bool>) {
    let status = match enabled {
        Some(true) => "Enabled",
        Some(false) => "Disabled",
        None => return,
    };
    xml.extend(["<", tag, "><Status>", status, "</Status></", tag, ">"]);
}
563
564fn build_replication_configuration_xml(config: &ReplicationConfiguration) -> String {
565 let mut xml = String::from(r#"<?xml version="1.0" encoding="UTF-8"?>"#);
566 xml.push_str(r#"<ReplicationConfiguration xmlns=""#);
567 xml.push_str(S3_REPLICATION_XML_NAMESPACE);
568 xml.push_str(r#"">"#);
569
570 if !config.role.is_empty() {
571 xml.push_str("<Role>");
572 xml.push_str(&xml_escape(&config.role));
573 xml.push_str("</Role>");
574 }
575
576 for rule in &config.rules {
577 xml.push_str("<Rule>");
578
579 xml.push_str("<Status>");
580 xml.push_str(match rule.status {
581 rc_core::ReplicationRuleStatus::Enabled => "Enabled",
582 rc_core::ReplicationRuleStatus::Disabled => "Disabled",
583 });
584 xml.push_str("</Status>");
585
586 xml.push_str("<Destination><Bucket>");
587 xml.push_str(&xml_escape(&rule.destination.bucket_arn));
588 xml.push_str("</Bucket>");
589 if let Some(storage_class) = &rule.destination.storage_class {
590 xml.push_str("<StorageClass>");
591 xml.push_str(&xml_escape(storage_class));
592 xml.push_str("</StorageClass>");
593 }
594 xml.push_str("</Destination>");
595
596 if !rule.id.is_empty() {
597 xml.push_str("<ID>");
598 xml.push_str(&xml_escape(&rule.id));
599 xml.push_str("</ID>");
600 }
601
602 xml.push_str("<Priority>");
603 xml.push_str(&rule.priority.to_string());
604 xml.push_str("</Priority>");
605
606 append_replication_filter_xml(&mut xml, rule.prefix.as_deref(), rule.tags.as_ref());
607
608 append_replication_status_tag(
609 &mut xml,
610 "ExistingObjectReplication",
611 rule.existing_object_replication,
612 );
613 append_replication_status_tag(
614 &mut xml,
615 "DeleteMarkerReplication",
616 rule.delete_marker_replication,
617 );
618 append_replication_status_tag(&mut xml, "DeleteReplication", rule.delete_replication);
619
620 xml.push_str("</Rule>");
621 }
622
623 xml.push_str("</ReplicationConfiguration>");
624 xml
625}
626
627fn parse_lifecycle_filter_prefix(
628 filter: Option<&aws_sdk_s3::types::LifecycleRuleFilter>,
629) -> Option<String> {
630 filter
631 .and_then(|filter| filter.prefix().map(str::to_string))
632 .or_else(|| filter.and_then(|filter| filter.and()?.prefix().map(str::to_string)))
633}
634
635fn parse_lifecycle_filter_tags(
636 filter: Option<&aws_sdk_s3::types::LifecycleRuleFilter>,
637) -> Option<HashMap<String, String>> {
638 filter
639 .and_then(|filter| collect_tag_map(filter.tag().map(|tag| (tag.key(), tag.value()))))
640 .or_else(|| {
641 filter.and_then(|filter| {
642 collect_tag_map(
643 filter
644 .and()?
645 .tags()
646 .iter()
647 .map(|tag| (tag.key(), tag.value())),
648 )
649 })
650 })
651}
652
653fn build_s3_tag(key: &str, value: &str) -> Result<aws_sdk_s3::types::Tag> {
654 aws_sdk_s3::types::Tag::builder()
655 .key(key)
656 .value(value)
657 .build()
658 .map_err(|error| Error::General(format!("build filter tag: {error}")))
659}
660
661fn build_lifecycle_rule_filter(
662 prefix: Option<&str>,
663 tags: Option<&HashMap<String, String>>,
664) -> Result<Option<aws_sdk_s3::types::LifecycleRuleFilter>> {
665 let Some(tags) = tags.filter(|tags| !tags.is_empty()) else {
666 return Ok(prefix.map(|prefix| {
667 aws_sdk_s3::types::LifecycleRuleFilter::builder()
668 .prefix(prefix)
669 .build()
670 }));
671 };
672
673 let tag_values = sorted_tags(tags)
674 .into_iter()
675 .map(|(key, value)| build_s3_tag(key, value))
676 .collect::<Result<Vec<_>>>()?;
677
678 let filter = if prefix.is_some() || tag_values.len() > 1 {
679 let mut and_builder = aws_sdk_s3::types::LifecycleRuleAndOperator::builder();
680 if let Some(prefix) = prefix {
681 and_builder = and_builder.prefix(prefix);
682 }
683 for tag in tag_values {
684 and_builder = and_builder.tags(tag);
685 }
686 aws_sdk_s3::types::LifecycleRuleFilter::builder()
687 .and(and_builder.build())
688 .build()
689 } else {
690 aws_sdk_s3::types::LifecycleRuleFilter::builder()
691 .tag(
692 tag_values
693 .into_iter()
694 .next()
695 .expect("non-empty tags required to build lifecycle filter"),
696 )
697 .build()
698 };
699
700 Ok(Some(filter))
701}
702
/// Executes SDK HTTP requests through the wrapped `reqwest::Client`.
impl HttpConnector for ReqwestConnector {
    /// Translates the SDK request into a reqwest request, executes it, and
    /// translates the response back.
    ///
    /// The request body must be available in memory; streaming bodies are
    /// rejected with a user error. The response body is read completely into
    /// memory before it is handed back to the SDK. Headers that cannot be
    /// converted in either direction are skipped with a warning.
    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
        let client = self.client.clone();
        HttpConnectorFuture::new(async move {
            let uri = request.uri().to_string();
            let method_str = request.method().to_string();
            let headers = request.headers().clone();

            // `bytes()` returns `None` for bodies that are not held in memory.
            let body_bytes = match request.body().bytes() {
                Some(b) => Bytes::copy_from_slice(b),
                None => {
                    return Err(ConnectorError::user(
                        "Streaming request bodies are not supported in insecure/ca_bundle TLS mode; \
                         use in-memory data for uploads with this connector"
                            .into(),
                    ));
                }
            };

            let method = reqwest::Method::from_bytes(method_str.as_bytes())
                .map_err(|e| ConnectorError::user(Box::new(e)))?;

            let url = reqwest::Url::parse(&uri).map_err(|e| ConnectorError::user(Box::new(e)))?;

            let mut req = reqwest::Request::new(method, url);

            // `append` (not `insert`) keeps every value of a repeated header.
            for (name, value) in headers.iter() {
                match (
                    reqwest::header::HeaderName::from_bytes(name.as_bytes()),
                    reqwest::header::HeaderValue::from_bytes(value.as_bytes()),
                ) {
                    (Ok(header_name), Ok(header_value)) => {
                        req.headers_mut().append(header_name, header_value);
                    }
                    _ => {
                        tracing::warn!("Skipping non-convertible request header: {}", name);
                    }
                }
            }

            *req.body_mut() = Some(reqwest::Body::from(body_bytes));

            // Transport failures are reported as I/O connector errors.
            let resp = client
                .execute(req)
                .await
                .map_err(|e| ConnectorError::io(Box::new(e)))?;

            let status = StatusCode::try_from(resp.status().as_u16())
                .map_err(|e| ConnectorError::other(Box::new(e), None))?;
            // Headers are cloned first because `bytes()` consumes the response.
            let resp_headers = resp.headers().clone();
            let body = resp
                .bytes()
                .await
                .map_err(|e| ConnectorError::io(Box::new(e)))?;

            let mut sdk_response = Response::new(status, SdkBody::from(body));
            for (name, value) in &resp_headers {
                match value.to_str() {
                    Ok(value_str) => {
                        sdk_response
                            .headers_mut()
                            .append(name.as_str().to_owned(), value_str.to_owned());
                    }
                    Err(_) => {
                        tracing::warn!("Skipping non-UTF8 response header: {}", name.as_str());
                    }
                }
            }

            Ok(sdk_response)
        })
    }
}
788
789impl HttpClient for ReqwestConnector {
790 fn http_connector(
791 &self,
792 _settings: &HttpConnectorSettings,
793 _components: &RuntimeComponents,
794 ) -> SharedHttpConnector {
795 SharedHttpConnector::new(self.clone())
801 }
802}
803
/// S3 client for a single configured alias.
pub struct S3Client {
    /// SDK client used for regular operations; carries the custom-header
    /// interceptor when global request headers are configured.
    inner: aws_sdk_s3::Client,
    /// SDK client built from the same configuration but without the
    /// custom-header interceptor.
    // NOTE(review): presumably used for presigned URLs — use sites are outside this view.
    presign_inner: aws_sdk_s3::Client,
    /// Plain reqwest client built with the alias TLS options.
    // NOTE(review): presumably used for hand-built XML requests — use sites are outside this view.
    xml_http_client: reqwest::Client,
    /// The alias this client was created for.
    alias: Alias,
    /// Global request headers captured when the client was created.
    request_headers: Vec<RequestHeader>,
}
812
/// Options for the delete operations of `S3Client`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct DeleteRequestOptions {
    /// When `true`, the `x-rustfs-force-delete: true` header is added to the
    /// delete request. Defaults to `false`.
    pub force_delete: bool,
}
819
/// SDK interceptor that adds a fixed set of headers to every request before
/// it is signed.
#[derive(Debug, Clone)]
struct CustomHeaderInterceptor {
    /// Headers to insert into each outgoing request.
    headers: Vec<RequestHeader>,
}
824
825impl Intercept for CustomHeaderInterceptor {
826 fn name(&self) -> &'static str {
827 "CustomHeaderInterceptor"
828 }
829
830 fn modify_before_signing(
831 &self,
832 context: &mut BeforeTransmitInterceptorContextMut<'_>,
833 _runtime_components: &RuntimeComponents,
834 _cfg: &mut ConfigBag,
835 ) -> std::result::Result<(), BoxError> {
836 let request = context.request_mut();
837 for header in &self.headers {
838 request
839 .headers_mut()
840 .try_insert(header.name.clone(), header.value.clone())
841 .map_err(|error| Box::new(error) as BoxError)?;
842 }
843 Ok(())
844 }
845}
846
847impl S3Client {
    /// Creates an `S3Client` for `alias`.
    ///
    /// Steps, in order:
    /// 1. Configure region, endpoint and either anonymous access or static
    ///    credentials from the alias.
    /// 2. Install a [`ReqwestConnector`] as the SDK HTTP client when the alias
    ///    requests insecure TLS, a CA bundle, or both a client certificate
    ///    and a client key.
    /// 3. Build a separate reqwest client with the same TLS options.
    /// 4. Build two SDK clients from the same S3 configuration; only the main
    ///    one receives the custom-header interceptor.
    ///
    /// # Errors
    /// Propagates errors from building either reqwest client.
    pub async fn new(alias: Alias) -> Result<Self> {
        let endpoint = alias.endpoint.clone();
        let region = alias.region.clone();
        let access_key = alias.access_key.clone();
        let secret_key = alias.secret_key.clone();

        let mut config_loader = aws_config::defaults(aws_config::BehaviorVersion::latest())
            .region(aws_config::Region::new(region))
            .endpoint_url(&endpoint);

        if alias.anonymous {
            config_loader = config_loader.no_credentials();
        } else {
            // Static credentials: no session token, no expiry.
            let credentials = aws_credential_types::Credentials::new(
                access_key,
                secret_key,
                None,
                None,
                "rc-static-credentials",
            );
            config_loader = config_loader.credentials_provider(credentials);
        }

        // The custom connector is only needed for non-default TLS settings.
        if alias.insecure
            || alias.ca_bundle.is_some()
            || (alias.client_cert.is_some() && alias.client_key.is_some())
        {
            let connector = ReqwestConnector::new(
                alias.insecure,
                alias.ca_bundle.as_deref(),
                alias.client_cert.as_deref(),
                alias.client_key.as_deref(),
            )
            .await?;
            config_loader = config_loader.http_client(connector);
        }

        let xml_http_client = build_reqwest_client(
            alias.insecure,
            alias.ca_bundle.as_deref(),
            alias.client_cert.as_deref(),
            alias.client_key.as_deref(),
        )
        .await?;
        let config = config_loader.load().await;

        let request_headers = global_request_headers();
        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&config)
            .force_path_style(force_path_style_for_alias(&alias))
            .request_checksum_calculation(
                aws_sdk_s3::config::RequestChecksumCalculation::WhenRequired,
            )
            .response_checksum_validation(
                aws_sdk_s3::config::ResponseChecksumValidation::WhenRequired,
            );

        // Cloned BEFORE the interceptor is attached, so the presign client
        // never adds the custom headers.
        let presign_s3_config = s3_config_builder.clone().build();

        if !request_headers.is_empty() {
            s3_config_builder = s3_config_builder.interceptor(CustomHeaderInterceptor {
                headers: request_headers.clone(),
            });
        }

        let s3_config = s3_config_builder.build();

        let client = aws_sdk_s3::Client::from_conf(s3_config);
        let presign_client = aws_sdk_s3::Client::from_conf(presign_s3_config);

        Ok(Self {
            inner: client,
            presign_inner: presign_client,
            xml_http_client,
            alias,
            request_headers,
        })
    }
932
933 pub fn inner(&self) -> &aws_sdk_s3::Client {
935 &self.inner
936 }
937
938 pub async fn list_object_versions_page(
940 &self,
941 path: &RemotePath,
942 max_keys: Option<i32>,
943 ) -> Result<ObjectVersionListResult> {
944 let mut builder = self.inner.list_object_versions().bucket(&path.bucket);
945
946 if !path.key.is_empty() {
947 builder = builder.prefix(&path.key);
948 }
949
950 if let Some(max) = max_keys {
951 builder = builder.max_keys(max);
952 }
953
954 let response = builder.send().await.map_err(|e| {
955 let err_str = Self::format_sdk_error(&e);
956 if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
957 Error::NotFound(format!("Bucket not found: {}", path.bucket))
958 } else {
959 Error::Network(err_str)
960 }
961 })?;
962
963 let mut items = Vec::new();
964
965 for v in response.versions() {
966 items.push(ObjectVersion {
967 key: v.key().unwrap_or_default().to_string(),
968 version_id: v.version_id().unwrap_or("null").to_string(),
969 is_latest: v.is_latest().unwrap_or(false),
970 is_delete_marker: false,
971 last_modified: v
972 .last_modified()
973 .and_then(|dt| Timestamp::from_second(dt.secs()).ok()),
974 size_bytes: v.size(),
975 etag: v.e_tag().map(|s| s.trim_matches('"').to_string()),
976 });
977 }
978
979 for m in response.delete_markers() {
980 items.push(ObjectVersion {
981 key: m.key().unwrap_or_default().to_string(),
982 version_id: m.version_id().unwrap_or("null").to_string(),
983 is_latest: m.is_latest().unwrap_or(false),
984 is_delete_marker: true,
985 last_modified: m
986 .last_modified()
987 .and_then(|dt| Timestamp::from_second(dt.secs()).ok()),
988 size_bytes: None,
989 etag: None,
990 });
991 }
992
993 items.sort_by(|a, b| {
994 a.key
995 .cmp(&b.key)
996 .then_with(|| b.last_modified.cmp(&a.last_modified))
997 });
998
999 Ok(ObjectVersionListResult {
1000 items,
1001 truncated: response.is_truncated().unwrap_or(false),
1002 continuation_token: response.next_key_marker().map(ToString::to_string),
1003 version_id_marker: response.next_version_id_marker().map(ToString::to_string),
1004 })
1005 }
1006
1007 pub async fn get_object_with_progress(
1009 &self,
1010 path: &RemotePath,
1011 mut on_progress: impl FnMut(u64, Option<u64>) + Send,
1012 ) -> Result<Vec<u8>> {
1013 let response = self
1014 .inner
1015 .get_object()
1016 .bucket(&path.bucket)
1017 .key(&path.key)
1018 .send()
1019 .await
1020 .map_err(|e| {
1021 let err_str = e.to_string();
1022 if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
1023 Error::NotFound(path.to_string())
1024 } else {
1025 Error::Network(err_str)
1026 }
1027 })?;
1028
1029 let content_length = response
1030 .content_length()
1031 .and_then(|length| u64::try_from(length).ok());
1032 let mut data = Vec::with_capacity(
1033 content_length
1034 .and_then(|length| usize::try_from(length).ok())
1035 .unwrap_or_default(),
1036 );
1037 let mut body = response.body;
1038 let mut bytes_downloaded = 0u64;
1039
1040 while let Some(chunk) = body
1041 .try_next()
1042 .await
1043 .map_err(|e| Error::Network(e.to_string()))?
1044 {
1045 bytes_downloaded += chunk.len() as u64;
1046 data.extend_from_slice(&chunk);
1047 on_progress(bytes_downloaded, content_length);
1048 }
1049
1050 Ok(data)
1051 }
1052
1053 pub async fn delete_object_with_options(
1055 &self,
1056 path: &RemotePath,
1057 options: DeleteRequestOptions,
1058 ) -> Result<()> {
1059 let mut request = self
1060 .inner
1061 .delete_object()
1062 .bucket(&path.bucket)
1063 .key(&path.key)
1064 .customize();
1065
1066 if options.force_delete {
1067 request = request.mutate_request(|request| {
1068 request
1069 .headers_mut()
1070 .insert(RUSTFS_FORCE_DELETE_HEADER, "true");
1071 });
1072 }
1073
1074 request.send().await.map_err(|e| {
1075 let err_str = Self::format_sdk_error(&e);
1076 let is_missing_key = if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &e
1077 {
1078 let code = service_err.err().code().or_else(|| {
1079 service_err
1080 .raw()
1081 .headers()
1082 .get("x-amz-error-code")
1083 .and_then(|value| std::str::from_utf8(value.as_bytes()).ok())
1084 });
1085 matches!(code, Some("NoSuchKey") | Some("NotFound"))
1086 || service_err.raw().status().as_u16() == 404
1087 } else {
1088 err_str.contains("NotFound") || err_str.contains("NoSuchKey")
1089 };
1090
1091 if is_missing_key {
1092 Error::NotFound(path.to_string())
1093 } else {
1094 Error::Network(err_str)
1095 }
1096 })?;
1097
1098 Ok(())
1099 }
1100
1101 pub async fn delete_objects_with_options(
1103 &self,
1104 bucket: &str,
1105 keys: Vec<String>,
1106 options: DeleteRequestOptions,
1107 ) -> Result<Vec<String>> {
1108 use aws_sdk_s3::types::{Delete, ObjectIdentifier};
1109
1110 if keys.is_empty() {
1111 return Ok(vec![]);
1112 }
1113
1114 let objects: Vec<ObjectIdentifier> =
1115 keys.iter()
1116 .map(|key| {
1117 ObjectIdentifier::builder().key(key).build().map_err(|e| {
1118 Error::General(format!("invalid delete object identifier: {e}"))
1119 })
1120 })
1121 .collect::<Result<Vec<_>>>()?;
1122
1123 let delete = Delete::builder()
1124 .set_objects(Some(objects))
1125 .build()
1126 .map_err(|e| Error::General(e.to_string()))?;
1127
1128 let mut request = self
1129 .inner
1130 .delete_objects()
1131 .bucket(bucket)
1132 .delete(delete)
1133 .customize();
1134
1135 if options.force_delete {
1136 request = request.mutate_request(|request| {
1137 request
1138 .headers_mut()
1139 .insert(RUSTFS_FORCE_DELETE_HEADER, "true");
1140 });
1141 }
1142
1143 let response = request
1144 .send()
1145 .await
1146 .map_err(|e| Error::Network(e.to_string()))?;
1147
1148 let deleted: Vec<String> = response
1149 .deleted()
1150 .iter()
1151 .filter_map(|d| d.key().map(|k| k.to_string()))
1152 .collect();
1153
1154 if !response.errors().is_empty() {
1155 let error_keys: Vec<String> = response
1156 .errors()
1157 .iter()
1158 .filter_map(|e| e.key().map(|k| k.to_string()))
1159 .collect();
1160 tracing::warn!("Failed to delete some objects: {:?}", error_keys);
1161 }
1162
1163 Ok(deleted)
1164 }
1165
1166 fn format_sdk_error<E: std::fmt::Display>(error: &aws_sdk_s3::error::SdkError<E>) -> String {
1168 match error {
1169 aws_sdk_s3::error::SdkError::ServiceError(service_err) => {
1170 let err = service_err.err();
1171 let meta = service_err.raw();
1172 let mut msg = format!("Service error: {}", err);
1173 if let Some(code) = meta.headers().get("x-amz-error-code")
1175 && let Ok(code_str) = std::str::from_utf8(code.as_bytes())
1176 {
1177 msg.push_str(&format!(" (code: {})", code_str));
1178 }
1179 msg
1180 }
1181 aws_sdk_s3::error::SdkError::ConstructionFailure(err) => {
1182 format!("Request construction failed: {:?}", err)
1183 }
1184 aws_sdk_s3::error::SdkError::TimeoutError(_) => "Request timeout".to_string(),
1185 aws_sdk_s3::error::SdkError::DispatchFailure(err) => {
1186 format!("Network dispatch error: {:?}", err)
1187 }
1188 aws_sdk_s3::error::SdkError::ResponseError(err) => {
1189 format!("Response error: {:?}", err)
1190 }
1191 _ => error.to_string(),
1192 }
1193 }
1194
1195 fn should_use_multipart(file_size: u64) -> bool {
1196 file_size > SINGLE_PUT_OBJECT_MAX_SIZE
1197 }
1198
1199 fn sha256_hash(body: &[u8]) -> String {
1200 let mut hasher = Sha256::new();
1201 hasher.update(body);
1202 hex::encode(hasher.finalize())
1203 }
1204
1205 fn request_host(&self, url: &reqwest::Url) -> Result<String> {
1206 let host = url
1207 .host_str()
1208 .ok_or_else(|| Error::Network("Missing host in request URL".to_string()))?;
1209 Ok(match url.port() {
1210 Some(port) => format!("{host}:{port}"),
1211 None => host.to_string(),
1212 })
1213 }
1214
1215 fn replication_url(&self, bucket: &str) -> Result<reqwest::Url> {
1216 let mut url =
1217 reqwest::Url::parse(self.alias.endpoint.trim_end_matches('/')).map_err(|e| {
1218 Error::Network(format!("Invalid endpoint '{}': {e}", self.alias.endpoint))
1219 })?;
1220
1221 {
1222 let mut segments = url.path_segments_mut().map_err(|_| {
1223 Error::Network(format!(
1224 "Endpoint '{}' does not support path-style bucket operations",
1225 self.alias.endpoint
1226 ))
1227 })?;
1228 segments.pop_if_empty();
1229 segments.push(bucket);
1230 }
1231
1232 url.set_query(Some("replication="));
1233 Ok(url)
1234 }
1235
1236 fn cors_url(&self, bucket: &str) -> Result<reqwest::Url> {
1237 let mut url =
1238 reqwest::Url::parse(self.alias.endpoint.trim_end_matches('/')).map_err(|e| {
1239 Error::Network(format!("Invalid endpoint '{}': {e}", self.alias.endpoint))
1240 })?;
1241
1242 {
1243 let mut segments = url.path_segments_mut().map_err(|_| {
1244 Error::Network(format!(
1245 "Endpoint '{}' does not support path-style bucket operations",
1246 self.alias.endpoint
1247 ))
1248 })?;
1249 segments.pop_if_empty();
1250 segments.push(bucket);
1251 }
1252
1253 url.set_query(Some("cors="));
1254 Ok(url)
1255 }
1256
1257 async fn sign_xml_request(
1258 &self,
1259 method: &Method,
1260 url: &str,
1261 headers: &HeaderMap,
1262 body: &[u8],
1263 ) -> Result<HeaderMap> {
1264 if self.alias.anonymous {
1265 return Ok(headers.clone());
1266 }
1267
1268 let credentials = Credentials::new(
1269 &self.alias.access_key,
1270 &self.alias.secret_key,
1271 None,
1272 None,
1273 "s3-xml-client",
1274 );
1275
1276 let identity = credentials.into();
1277 let mut signing_settings = SigningSettings::default();
1278 signing_settings.signature_location = SignatureLocation::Headers;
1279
1280 let signing_params = v4::SigningParams::builder()
1281 .identity(&identity)
1282 .region(&self.alias.region)
1283 .name(S3_SERVICE_NAME)
1284 .time(std::time::SystemTime::now())
1285 .settings(signing_settings)
1286 .build()
1287 .map_err(|e| Error::Auth(format!("Failed to build signing params: {e}")))?;
1288
1289 let header_pairs: Vec<(&str, &str)> = headers
1290 .iter()
1291 .filter_map(|(k, v)| v.to_str().ok().map(|v| (k.as_str(), v)))
1292 .collect();
1293
1294 let signable_request = SignableRequest::new(
1295 method.as_str(),
1296 url,
1297 header_pairs.into_iter(),
1298 SignableBody::Bytes(body),
1299 )
1300 .map_err(|e| Error::Auth(format!("Failed to create signable request: {e}")))?;
1301
1302 let (signing_instructions, _) = sign(signable_request, &signing_params.into())
1303 .map_err(|e| Error::Auth(format!("Failed to sign request: {e}")))?
1304 .into_parts();
1305
1306 let mut signed_headers = headers.clone();
1307 for (name, value) in signing_instructions.headers() {
1308 let header_name = HeaderName::try_from(name.to_string())
1309 .map_err(|e| Error::Auth(format!("Invalid header name: {e}")))?;
1310 let header_value = HeaderValue::try_from(value.to_string())
1311 .map_err(|e| Error::Auth(format!("Invalid header value: {e}")))?;
1312 signed_headers.insert(header_name, header_value);
1313 }
1314
1315 Ok(signed_headers)
1316 }
1317
1318 async fn xml_request(
1319 &self,
1320 method: Method,
1321 url: reqwest::Url,
1322 content_type: Option<&str>,
1323 body: Option<Vec<u8>>,
1324 ) -> Result<String> {
1325 let body = body.unwrap_or_default();
1326 let mut headers = HeaderMap::new();
1327 headers.insert(
1328 "x-amz-content-sha256",
1329 HeaderValue::from_str(&Self::sha256_hash(&body))
1330 .map_err(|e| Error::Auth(format!("Invalid content hash header: {e}")))?,
1331 );
1332 headers.insert(
1333 "host",
1334 HeaderValue::from_str(&self.request_host(&url)?)
1335 .map_err(|e| Error::Auth(format!("Invalid host header: {e}")))?,
1336 );
1337
1338 if let Some(content_type) = content_type {
1339 headers.insert(
1340 CONTENT_TYPE,
1341 HeaderValue::from_str(content_type)
1342 .map_err(|e| Error::Auth(format!("Invalid content type header: {e}")))?,
1343 );
1344 }
1345
1346 for header in &self.request_headers {
1347 let name = HeaderName::from_bytes(header.name.as_bytes())
1348 .map_err(|e| Error::Auth(format!("Invalid custom header name: {e}")))?;
1349 let value = HeaderValue::from_str(&header.value)
1350 .map_err(|e| Error::Auth(format!("Invalid custom header value: {e}")))?;
1351 headers.insert(name, value);
1352 }
1353
1354 let signed_headers = self
1355 .sign_xml_request(&method, url.as_str(), &headers, &body)
1356 .await?;
1357
1358 let mut request_builder = self.xml_http_client.request(method, url);
1359 for (name, value) in &signed_headers {
1360 request_builder = request_builder.header(name, value);
1361 }
1362 if !body.is_empty() {
1363 request_builder = request_builder.body(body);
1364 }
1365
1366 let response = request_builder
1367 .send()
1368 .await
1369 .map_err(|e| Error::Network(format!("Request failed: {e}")))?;
1370
1371 let status = response.status();
1372 let text = response
1373 .text()
1374 .await
1375 .map_err(|e| Error::Network(format!("Failed to read response: {e}")))?;
1376
1377 if !status.is_success() {
1378 return Err(Error::Network(format!(
1379 "HTTP {}: {}",
1380 status.as_u16(),
1381 text
1382 )));
1383 }
1384
1385 Ok(text)
1386 }
1387
1388 fn bucket_policy_error_kind(
1389 error_code: Option<&str>,
1390 status_code: Option<u16>,
1391 error_text: &str,
1392 ) -> BucketPolicyErrorKind {
1393 let error_code = error_code.map(|code| code.to_ascii_lowercase());
1394 if matches!(
1395 error_code.as_deref(),
1396 Some("nosuchbucketpolicy") | Some("nosuchpolicy")
1397 ) {
1398 return BucketPolicyErrorKind::MissingPolicy;
1399 }
1400 if matches!(error_code.as_deref(), Some("nosuchbucket")) {
1401 return BucketPolicyErrorKind::MissingBucket;
1402 }
1403
1404 let error_text = error_text.to_ascii_lowercase();
1405 if error_text.contains("nosuchbucketpolicy") || error_text.contains("nosuchpolicy") {
1406 return BucketPolicyErrorKind::MissingPolicy;
1407 }
1408 if error_text.contains("nosuchbucket") {
1409 return BucketPolicyErrorKind::MissingBucket;
1410 }
1411 if status_code == Some(404) {
1412 return BucketPolicyErrorKind::MissingPolicy;
1413 }
1414
1415 BucketPolicyErrorKind::Other
1416 }
1417
1418 fn map_get_bucket_policy_error(
1419 bucket: &str,
1420 kind: BucketPolicyErrorKind,
1421 error_text: &str,
1422 ) -> Result<Option<String>> {
1423 match kind {
1424 BucketPolicyErrorKind::MissingPolicy => Ok(None),
1425 BucketPolicyErrorKind::MissingBucket => {
1426 Err(Error::NotFound(format!("Bucket not found: {bucket}")))
1427 }
1428 BucketPolicyErrorKind::Other => {
1429 Err(Error::Network(format!("get_bucket_policy: {error_text}")))
1430 }
1431 }
1432 }
1433
1434 fn map_delete_bucket_policy_error(
1435 bucket: &str,
1436 kind: BucketPolicyErrorKind,
1437 error_text: &str,
1438 ) -> Result<()> {
1439 match kind {
1440 BucketPolicyErrorKind::MissingPolicy => Ok(()),
1441 BucketPolicyErrorKind::MissingBucket => {
1442 Err(Error::NotFound(format!("Bucket not found: {bucket}")))
1443 }
1444 BucketPolicyErrorKind::Other => Err(Error::General(format!(
1445 "delete_bucket_policy: {error_text}"
1446 ))),
1447 }
1448 }
1449
1450 fn extract_notification_filter(
1451 filter: Option<&aws_sdk_s3::types::NotificationConfigurationFilter>,
1452 ) -> (Option<String>, Option<String>) {
1453 let mut prefix = None;
1454 let mut suffix = None;
1455
1456 if let Some(key_filter) = filter.and_then(|value| value.key()) {
1457 for rule in key_filter.filter_rules() {
1458 match rule.name().map(|name| name.as_str()) {
1459 Some("prefix") => {
1460 prefix = rule.value().map(ToString::to_string);
1461 }
1462 Some("suffix") => {
1463 suffix = rule.value().map(ToString::to_string);
1464 }
1465 _ => {}
1466 }
1467 }
1468 }
1469
1470 (prefix, suffix)
1471 }
1472
1473 fn build_notification_filter(
1474 prefix: Option<&str>,
1475 suffix: Option<&str>,
1476 ) -> Option<aws_sdk_s3::types::NotificationConfigurationFilter> {
1477 use aws_sdk_s3::types::{FilterRule, FilterRuleName, NotificationConfigurationFilter};
1478
1479 let mut rules = Vec::new();
1480 if let Some(value) = prefix {
1481 let rule = FilterRule::builder()
1482 .name(FilterRuleName::Prefix)
1483 .value(value)
1484 .build();
1485 rules.push(rule);
1486 }
1487 if let Some(value) = suffix {
1488 let rule = FilterRule::builder()
1489 .name(FilterRuleName::Suffix)
1490 .value(value)
1491 .build();
1492 rules.push(rule);
1493 }
1494 if rules.is_empty() {
1495 return None;
1496 }
1497
1498 let key_filter = aws_sdk_s3::types::S3KeyFilter::builder()
1499 .set_filter_rules(Some(rules))
1500 .build();
1501 NotificationConfigurationFilter::builder()
1502 .key(key_filter)
1503 .build()
1504 .into()
1505 }
1506
1507 fn event_list_to_strings(events: &[aws_sdk_s3::types::Event]) -> Vec<String> {
1508 events
1509 .iter()
1510 .map(|event| event.as_str().to_string())
1511 .collect()
1512 }
1513
1514 fn strings_to_event_list(events: &[String]) -> Vec<aws_sdk_s3::types::Event> {
1515 events
1516 .iter()
1517 .map(|event| aws_sdk_s3::types::Event::from(event.as_str()))
1518 .collect()
1519 }
1520
1521 fn notifications_equivalent(
1522 expected: &[BucketNotification],
1523 actual: &[BucketNotification],
1524 ) -> bool {
1525 type CanonicalEntry = (u8, String, Option<String>, Option<String>, Vec<String>);
1526
1527 fn target_order(target: NotificationTarget) -> u8 {
1528 match target {
1529 NotificationTarget::Queue => 0,
1530 NotificationTarget::Topic => 1,
1531 NotificationTarget::Lambda => 2,
1532 }
1533 }
1534
1535 fn canonical(notifications: &[BucketNotification]) -> Vec<CanonicalEntry> {
1536 let mut normalized: Vec<CanonicalEntry> = notifications
1537 .iter()
1538 .map(|item| {
1539 let mut events = item.events.clone();
1540 events.sort();
1541 events.dedup();
1542 (
1543 target_order(item.target),
1544 item.arn.clone(),
1545 item.prefix.clone(),
1546 item.suffix.clone(),
1547 events,
1548 )
1549 })
1550 .collect();
1551 normalized.sort();
1552 normalized
1553 }
1554
1555 canonical(expected) == canonical(actual)
1556 }
1557
1558 async fn read_next_part(
1559 file: &mut tokio::fs::File,
1560 file_path: &std::path::Path,
1561 buffer: &mut [u8],
1562 ) -> Result<usize> {
1563 let mut total_read = 0usize;
1564 while total_read < buffer.len() {
1565 let bytes_read = file
1566 .read(&mut buffer[total_read..])
1567 .await
1568 .map_err(|e| Error::General(format!("read file '{}': {e}", file_path.display())))?;
1569 if bytes_read == 0 {
1570 break;
1571 }
1572 total_read += bytes_read;
1573 }
1574 Ok(total_read)
1575 }
1576
1577 async fn put_object_single_part_from_path(
1578 &self,
1579 path: &RemotePath,
1580 file_path: &std::path::Path,
1581 content_type: Option<&str>,
1582 file_size: u64,
1583 encryption: Option<&ObjectEncryptionRequest>,
1584 ) -> Result<ObjectInfo> {
1585 let data = tokio::fs::read(file_path)
1586 .await
1587 .map_err(|e| Error::General(format!("read file '{}': {e}", file_path.display())))?;
1588 let body = aws_sdk_s3::primitives::ByteStream::from(data);
1589
1590 let mut request = apply_object_encryption_to_put_request(
1591 self.inner
1592 .put_object()
1593 .bucket(&path.bucket)
1594 .key(&path.key)
1595 .body(body),
1596 encryption,
1597 );
1598
1599 if let Some(ct) = content_type {
1600 request = request.content_type(ct);
1601 }
1602
1603 let response = request
1604 .send()
1605 .await
1606 .map_err(|e| Error::Network(e.to_string()))?;
1607
1608 let mut info = ObjectInfo::file(&path.key, file_size as i64);
1609 if let Some(etag) = response.e_tag() {
1610 info.etag = Some(etag.trim_matches('"').to_string());
1611 }
1612 info.last_modified = Some(jiff::Timestamp::now());
1613
1614 Ok(info)
1615 }
1616
1617 async fn abort_multipart_upload_best_effort(&self, path: &RemotePath, upload_id: &str) {
1618 let _ = self
1619 .inner
1620 .abort_multipart_upload()
1621 .bucket(&path.bucket)
1622 .key(&path.key)
1623 .upload_id(upload_id)
1624 .send()
1625 .await;
1626 }
1627
1628 async fn put_object_multipart_from_path(
1629 &self,
1630 path: &RemotePath,
1631 file_path: &std::path::Path,
1632 content_type: Option<&str>,
1633 file_size: u64,
1634 encryption: Option<&ObjectEncryptionRequest>,
1635 on_progress: impl Fn(u64) + Send,
1636 ) -> Result<ObjectInfo> {
1637 use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
1638
1639 let config = crate::multipart::MultipartConfig::default();
1640 let part_size = config.calculate_part_size(file_size);
1641 let part_buffer_size = usize::try_from(part_size)
1642 .map_err(|_| Error::General(format!("invalid part size: {part_size}")))?;
1643
1644 tracing::debug!(file_size, part_size, "Starting multipart upload");
1645
1646 let mut create_request = self
1647 .inner
1648 .create_multipart_upload()
1649 .bucket(&path.bucket)
1650 .key(&path.key);
1651
1652 create_request = match encryption {
1653 Some(ObjectEncryptionRequest::SseS3) => create_request
1654 .server_side_encryption(aws_sdk_s3::types::ServerSideEncryption::Aes256),
1655 Some(ObjectEncryptionRequest::SseKms { key_id }) => create_request
1656 .server_side_encryption(aws_sdk_s3::types::ServerSideEncryption::AwsKms)
1657 .ssekms_key_id(key_id),
1658 None => create_request,
1659 };
1660
1661 if let Some(ct) = content_type {
1662 create_request = create_request.content_type(ct);
1663 }
1664
1665 let create_response = create_request
1666 .send()
1667 .await
1668 .map_err(|e| Error::Network(format!("create multipart upload: {e}")))?;
1669
1670 let upload_id = create_response
1671 .upload_id()
1672 .ok_or_else(|| Error::General("missing upload id from multipart upload".to_string()))?
1673 .to_string();
1674
1675 tracing::debug!(upload_id = %upload_id, "Multipart upload initiated");
1676
1677 let mut file = tokio::fs::File::open(file_path)
1678 .await
1679 .map_err(|e| Error::General(format!("open file '{}': {e}", file_path.display())))?;
1680 let mut completed_parts = Vec::new();
1681 let mut part_number: i32 = 1;
1682 let mut chunk = vec![0u8; part_buffer_size];
1683 let mut bytes_uploaded: u64 = 0;
1684
1685 loop {
1686 let bytes_read = Self::read_next_part(&mut file, file_path, &mut chunk).await?;
1687 if bytes_read == 0 {
1688 break;
1689 }
1690
1691 tracing::debug!(part_number, bytes_read, "Uploading part");
1692
1693 let body = aws_sdk_s3::primitives::ByteStream::from(chunk[..bytes_read].to_vec());
1694 let upload_part_result = self
1695 .inner
1696 .upload_part()
1697 .bucket(&path.bucket)
1698 .key(&path.key)
1699 .upload_id(&upload_id)
1700 .part_number(part_number)
1701 .body(body)
1702 .send()
1703 .await;
1704
1705 let upload_part_response = match upload_part_result {
1706 Ok(response) => response,
1707 Err(e) => {
1708 tracing::debug!(
1709 upload_id = %upload_id,
1710 part_number,
1711 "Aborting multipart upload due to error"
1712 );
1713 self.abort_multipart_upload_best_effort(path, &upload_id)
1714 .await;
1715 return Err(Error::Network(format!(
1716 "upload multipart part {part_number}: {e}"
1717 )));
1718 }
1719 };
1720
1721 let etag = match upload_part_response.e_tag() {
1722 Some(value) => value.trim_matches('"').to_string(),
1723 None => {
1724 self.abort_multipart_upload_best_effort(path, &upload_id)
1725 .await;
1726 return Err(Error::General(format!(
1727 "missing ETag for multipart part {part_number}"
1728 )));
1729 }
1730 };
1731
1732 completed_parts.push(
1733 CompletedPart::builder()
1734 .part_number(part_number)
1735 .e_tag(etag)
1736 .build(),
1737 );
1738
1739 bytes_uploaded += bytes_read as u64;
1740 on_progress(bytes_uploaded);
1741 tracing::debug!(part_number, bytes_uploaded, "Part uploaded");
1742
1743 part_number += 1;
1744 }
1745
1746 let completed_upload = CompletedMultipartUpload::builder()
1747 .set_parts(Some(completed_parts))
1748 .build();
1749 let complete_result = self
1750 .inner
1751 .complete_multipart_upload()
1752 .bucket(&path.bucket)
1753 .key(&path.key)
1754 .upload_id(&upload_id)
1755 .multipart_upload(completed_upload)
1756 .send()
1757 .await;
1758
1759 let complete_response = match complete_result {
1760 Ok(response) => response,
1761 Err(e) => {
1762 tracing::debug!(upload_id = %upload_id, "Attempting to abort multipart upload after completion failure");
1763 self.abort_multipart_upload_best_effort(path, &upload_id)
1764 .await;
1765 return Err(Error::Network(format!("complete multipart upload: {e}")));
1766 }
1767 };
1768
1769 tracing::debug!("Multipart upload completed");
1770
1771 let mut info = ObjectInfo::file(&path.key, file_size as i64);
1772 if let Some(etag) = complete_response.e_tag() {
1773 info.etag = Some(etag.trim_matches('"').to_string());
1774 }
1775 info.last_modified = Some(jiff::Timestamp::now());
1776
1777 Ok(info)
1778 }
1779
1780 pub async fn put_object_from_path(
1785 &self,
1786 path: &RemotePath,
1787 file_path: &std::path::Path,
1788 content_type: Option<&str>,
1789 encryption: Option<&ObjectEncryptionRequest>,
1790 on_progress: impl Fn(u64) + Send,
1791 ) -> Result<ObjectInfo> {
1792 let metadata = tokio::fs::metadata(file_path).await.map_err(|e| {
1793 Error::General(format!("read metadata for '{}': {e}", file_path.display()))
1794 })?;
1795 if !metadata.is_file() {
1796 return Err(Error::General(format!(
1797 "source is not a file: {}",
1798 file_path.display()
1799 )));
1800 }
1801
1802 let file_size = metadata.len();
1803 if Self::should_use_multipart(file_size) {
1804 self.put_object_multipart_from_path(
1805 path,
1806 file_path,
1807 content_type,
1808 file_size,
1809 encryption,
1810 on_progress,
1811 )
1812 .await
1813 } else {
1814 self.put_object_single_part_from_path(
1815 path,
1816 file_path,
1817 content_type,
1818 file_size,
1819 encryption,
1820 )
1821 .await
1822 }
1823 }
1824}
1825
1826fn build_tagging(
1827 tags: std::collections::HashMap<String, String>,
1828) -> Result<aws_sdk_s3::types::Tagging> {
1829 use aws_sdk_s3::types::{Tag, Tagging};
1830
1831 let mut tag_set = Vec::with_capacity(tags.len());
1832 for (key, value) in tags {
1833 let tag = Tag::builder()
1834 .key(key)
1835 .value(value)
1836 .build()
1837 .map_err(|e| Error::General(format!("invalid tag: {e}")))?;
1838 tag_set.push(tag);
1839 }
1840
1841 Tagging::builder()
1842 .set_tag_set(Some(tag_set))
1843 .build()
1844 .map_err(|e| Error::General(format!("invalid tagging payload: {e}")))
1845}
1846
1847#[async_trait]
1848impl ObjectStore for S3Client {
1849 async fn list_buckets(&self) -> Result<Vec<ObjectInfo>> {
1850 let response = self
1851 .inner
1852 .list_buckets()
1853 .send()
1854 .await
1855 .map_err(|e| Error::Network(Self::format_sdk_error(&e)))?;
1856
1857 let buckets = response
1858 .buckets()
1859 .iter()
1860 .map(|b| {
1861 let mut info = ObjectInfo::bucket(b.name().unwrap_or_default());
1862 if let Some(creation_date) = b.creation_date() {
1863 info.last_modified = jiff::Timestamp::from_second(creation_date.secs()).ok();
1864 }
1865 info
1866 })
1867 .collect();
1868
1869 Ok(buckets)
1870 }
1871
1872 async fn list_objects(&self, path: &RemotePath, options: ListOptions) -> Result<ListResult> {
1873 let mut request = self.inner.list_objects_v2().bucket(&path.bucket);
1874
1875 let prefix = if path.key.is_empty() {
1877 options.prefix.clone()
1878 } else if let Some(p) = &options.prefix {
1879 Some(format!("{}{}", path.key, p))
1880 } else {
1881 Some(path.key.clone())
1882 };
1883
1884 if let Some(p) = prefix {
1885 request = request.prefix(p);
1886 }
1887
1888 if !options.recursive {
1890 request = request.delimiter(options.delimiter.as_deref().unwrap_or("/"));
1891 }
1892
1893 if let Some(max) = options.max_keys {
1895 request = request.max_keys(max);
1896 }
1897
1898 if let Some(token) = &options.continuation_token {
1900 request = request.continuation_token(token);
1901 }
1902
1903 let response = request.send().await.map_err(|e| {
1904 let err_str = Self::format_sdk_error(&e);
1905 if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
1906 Error::NotFound(format!("Bucket not found: {}", path.bucket))
1907 } else {
1908 Error::Network(err_str)
1909 }
1910 })?;
1911
1912 let mut items = Vec::new();
1913
1914 for prefix in response.common_prefixes() {
1916 if let Some(p) = prefix.prefix() {
1917 items.push(ObjectInfo::dir(p));
1918 }
1919 }
1920
1921 for object in response.contents() {
1923 let key = object.key().unwrap_or_default().to_string();
1924 let size = object.size().unwrap_or(0);
1925 let mut info = ObjectInfo::file(&key, size);
1926
1927 if let Some(modified) = object.last_modified() {
1928 info.last_modified = jiff::Timestamp::from_second(modified.secs()).ok();
1929 }
1930
1931 if let Some(etag) = object.e_tag() {
1932 info.etag = Some(etag.trim_matches('"').to_string());
1933 }
1934
1935 if let Some(sc) = object.storage_class() {
1936 info.storage_class = Some(sc.as_str().to_string());
1937 }
1938
1939 items.push(info);
1940 }
1941
1942 Ok(ListResult {
1943 items,
1944 truncated: response.is_truncated().unwrap_or(false),
1945 continuation_token: response.next_continuation_token().map(|s| s.to_string()),
1946 })
1947 }
1948
1949 async fn head_object(&self, path: &RemotePath) -> Result<ObjectInfo> {
1950 let response = self
1951 .inner
1952 .head_object()
1953 .bucket(&path.bucket)
1954 .key(&path.key)
1955 .send()
1956 .await
1957 .map_err(|e| {
1958 let err_str = e.to_string();
1959 if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
1960 Error::NotFound(path.to_string())
1961 } else {
1962 Error::Network(err_str)
1963 }
1964 })?;
1965
1966 let size = response.content_length().unwrap_or(0);
1967 let mut info = ObjectInfo::file(&path.key, size);
1968
1969 if let Some(modified) = response.last_modified() {
1970 info.last_modified = jiff::Timestamp::from_second(modified.secs()).ok();
1971 }
1972
1973 if let Some(etag) = response.e_tag() {
1974 info.etag = Some(etag.trim_matches('"').to_string());
1975 }
1976
1977 if let Some(ct) = response.content_type() {
1978 info.content_type = Some(ct.to_string());
1979 }
1980
1981 if let Some(sc) = response.storage_class() {
1982 info.storage_class = Some(sc.as_str().to_string());
1983 }
1984
1985 if let Some(meta) = response.metadata()
1986 && !meta.is_empty()
1987 {
1988 info.metadata = Some(meta.clone());
1989 }
1990
1991 Ok(info)
1992 }
1993
1994 async fn bucket_exists(&self, bucket: &str) -> Result<bool> {
1995 match self.inner.head_bucket().bucket(bucket).send().await {
1996 Ok(_) => Ok(true),
1997 Err(e) => {
1998 if let aws_sdk_s3::error::SdkError::ServiceError(ref service_err) = e
2001 && service_err.raw().status().as_u16() == 404
2002 {
2003 return Ok(false);
2004 }
2005 let err_str = e.to_string();
2006 if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
2007 return Ok(false);
2008 }
2009 Err(Error::Network(err_str))
2010 }
2011 }
2012 }
2013
2014 async fn create_bucket(&self, bucket: &str) -> Result<()> {
2015 self.inner
2016 .create_bucket()
2017 .bucket(bucket)
2018 .send()
2019 .await
2020 .map_err(|e| Error::Network(Self::format_sdk_error(&e)))?;
2021
2022 Ok(())
2023 }
2024
2025 async fn delete_bucket(&self, bucket: &str) -> Result<()> {
2026 self.inner
2027 .delete_bucket()
2028 .bucket(bucket)
2029 .send()
2030 .await
2031 .map_err(|e| {
2032 let err_str = Self::format_sdk_error(&e);
2033 if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
2034 Error::NotFound(format!("Bucket not found: {bucket}"))
2035 } else if err_str.contains("BucketNotEmpty") {
2036 Error::Conflict(err_str)
2037 } else {
2038 Error::Network(err_str)
2039 }
2040 })?;
2041
2042 Ok(())
2043 }
2044
2045 async fn capabilities(&self) -> Result<Capabilities> {
2046 Ok(Capabilities {
2049 versioning: true,
2050 object_lock: false,
2051 tagging: true,
2052 anonymous: true,
2053 select: false,
2054 notifications: true,
2055 lifecycle: true,
2056 replication: true,
2057 cors: true,
2058 })
2059 }
2060
2061 async fn get_object(&self, path: &RemotePath) -> Result<Vec<u8>> {
2062 self.get_object_with_progress(path, |_, _| {}).await
2063 }
2064
2065 async fn put_object(
2066 &self,
2067 path: &RemotePath,
2068 data: Vec<u8>,
2069 content_type: Option<&str>,
2070 encryption: Option<&ObjectEncryptionRequest>,
2071 ) -> Result<ObjectInfo> {
2072 let size = data.len() as i64;
2073 let body = aws_sdk_s3::primitives::ByteStream::from(data);
2074
2075 let mut request = apply_object_encryption_to_put_request(
2076 self.inner
2077 .put_object()
2078 .bucket(&path.bucket)
2079 .key(&path.key)
2080 .body(body),
2081 encryption,
2082 );
2083
2084 if let Some(ct) = content_type {
2085 request = request.content_type(ct);
2086 }
2087
2088 let response = request
2089 .send()
2090 .await
2091 .map_err(|e| Error::Network(e.to_string()))?;
2092
2093 let mut info = ObjectInfo::file(&path.key, size);
2094 if let Some(etag) = response.e_tag() {
2095 info.etag = Some(etag.trim_matches('"').to_string());
2096 }
2097 info.last_modified = Some(jiff::Timestamp::now());
2098
2099 Ok(info)
2100 }
2101
2102 async fn delete_object(&self, path: &RemotePath) -> Result<()> {
2103 self.delete_object_with_options(path, DeleteRequestOptions::default())
2104 .await
2105 }
2106
2107 async fn delete_objects(&self, bucket: &str, keys: Vec<String>) -> Result<Vec<String>> {
2108 self.delete_objects_with_options(bucket, keys, DeleteRequestOptions::default())
2109 .await
2110 }
2111
2112 async fn copy_object(
2113 &self,
2114 src: &RemotePath,
2115 dst: &RemotePath,
2116 encryption: Option<&ObjectEncryptionRequest>,
2117 ) -> Result<ObjectInfo> {
2118 let copy_source = format!("{}/{}", src.bucket, src.key);
2120
2121 let response = apply_object_encryption_to_copy_request(
2122 self.inner
2123 .copy_object()
2124 .copy_source(©_source)
2125 .bucket(&dst.bucket)
2126 .key(&dst.key),
2127 encryption,
2128 )
2129 .send()
2130 .await
2131 .map_err(|e| {
2132 let err_str = e.to_string();
2133 if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
2134 Error::NotFound(src.to_string())
2135 } else {
2136 Error::Network(err_str)
2137 }
2138 })?;
2139
2140 let info = self.head_object(dst).await?;
2142
2143 let mut result = info;
2145 if let Some(copy_result) = response.copy_object_result()
2146 && let Some(etag) = copy_result.e_tag()
2147 {
2148 result.etag = Some(etag.trim_matches('"').to_string());
2149 }
2150
2151 Ok(result)
2152 }
2153
2154 async fn presign_get(&self, path: &RemotePath, expires_secs: u64) -> Result<String> {
2155 let config = aws_sdk_s3::presigning::PresigningConfig::builder()
2156 .expires_in(std::time::Duration::from_secs(expires_secs))
2157 .build()
2158 .map_err(|e| Error::General(format!("presign_get config: {e}")))?;
2159
2160 let request = self
2161 .presign_inner
2162 .get_object()
2163 .bucket(&path.bucket)
2164 .key(&path.key)
2165 .presigned(config)
2166 .await
2167 .map_err(|e| Error::General(format!("presign_get: {e}")))?;
2168
2169 Ok(request.uri().to_string())
2170 }
2171
2172 async fn presign_put(
2173 &self,
2174 path: &RemotePath,
2175 expires_secs: u64,
2176 content_type: Option<&str>,
2177 ) -> Result<String> {
2178 let config = aws_sdk_s3::presigning::PresigningConfig::builder()
2179 .expires_in(std::time::Duration::from_secs(expires_secs))
2180 .build()
2181 .map_err(|e| Error::General(format!("presign_put config: {e}")))?;
2182
2183 let mut builder = self
2184 .presign_inner
2185 .put_object()
2186 .bucket(&path.bucket)
2187 .key(&path.key);
2188
2189 if let Some(ct) = content_type {
2190 builder = builder.content_type(ct);
2191 }
2192
2193 let request = builder
2194 .presigned(config)
2195 .await
2196 .map_err(|e| Error::General(format!("presign_put: {e}")))?;
2197
2198 Ok(request.uri().to_string())
2199 }
2200
2201 async fn get_versioning(&self, bucket: &str) -> Result<Option<bool>> {
2202 let response = self
2203 .inner
2204 .get_bucket_versioning()
2205 .bucket(bucket)
2206 .send()
2207 .await
2208 .map_err(|e| Error::General(format!("get_versioning: {e}")))?;
2209
2210 Ok(response
2211 .status()
2212 .map(|s| *s == aws_sdk_s3::types::BucketVersioningStatus::Enabled))
2213 }
2214
2215 async fn set_versioning(&self, bucket: &str, enabled: bool) -> Result<()> {
2216 use aws_sdk_s3::types::{BucketVersioningStatus, VersioningConfiguration};
2217
2218 let status = if enabled {
2219 BucketVersioningStatus::Enabled
2220 } else {
2221 BucketVersioningStatus::Suspended
2222 };
2223
2224 let config = VersioningConfiguration::builder().status(status).build();
2225
2226 self.inner
2227 .put_bucket_versioning()
2228 .bucket(bucket)
2229 .versioning_configuration(config)
2230 .send()
2231 .await
2232 .map_err(|e| Error::General(format!("set_versioning: {e}")))?;
2233
2234 Ok(())
2235 }
2236
2237 async fn get_bucket_encryption(&self, bucket: &str) -> Result<Option<BucketEncryption>> {
2238 let response = match self
2239 .inner
2240 .get_bucket_encryption()
2241 .bucket(bucket)
2242 .send()
2243 .await
2244 {
2245 Ok(response) => response,
2246 Err(error) => {
2247 let error_text = Self::format_sdk_error(&error);
2248 let missing_config =
2249 if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &error {
2250 is_missing_bucket_encryption_response(
2251 service_err.err().code(),
2252 Some(service_err.raw().status().as_u16()),
2253 &error_text,
2254 )
2255 } else {
2256 is_missing_bucket_encryption_response(None, None, &error_text)
2257 };
2258 if missing_config {
2259 return Ok(None);
2260 }
2261 return Err(Error::General(format!(
2262 "get_bucket_encryption: {error_text}"
2263 )));
2264 }
2265 };
2266
2267 let rule = response
2268 .server_side_encryption_configuration()
2269 .and_then(|config| config.rules().first())
2270 .and_then(|rule| rule.apply_server_side_encryption_by_default())
2271 .ok_or_else(|| {
2272 Error::General("get_bucket_encryption: missing bucket encryption rule".to_string())
2273 })?;
2274
2275 sdk_bucket_encryption_to_core(rule).map(Some)
2276 }
2277
2278 async fn set_bucket_encryption(
2279 &self,
2280 bucket: &str,
2281 encryption: BucketEncryption,
2282 ) -> Result<()> {
2283 let configuration = core_bucket_encryption_to_sdk(&encryption);
2284
2285 self.inner
2286 .put_bucket_encryption()
2287 .bucket(bucket)
2288 .server_side_encryption_configuration(configuration)
2289 .send()
2290 .await
2291 .map_err(|e| Error::General(format!("set_bucket_encryption: {e}")))?;
2292
2293 Ok(())
2294 }
2295
2296 async fn delete_bucket_encryption(&self, bucket: &str) -> Result<()> {
2297 match self
2298 .inner
2299 .delete_bucket_encryption()
2300 .bucket(bucket)
2301 .send()
2302 .await
2303 {
2304 Ok(_) => Ok(()),
2305 Err(error) => {
2306 let error_text = Self::format_sdk_error(&error);
2307 let missing_config =
2308 if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &error {
2309 is_missing_bucket_encryption_response(
2310 service_err.err().code(),
2311 Some(service_err.raw().status().as_u16()),
2312 &error_text,
2313 )
2314 } else {
2315 is_missing_bucket_encryption_response(None, None, &error_text)
2316 };
2317 if missing_config {
2318 Ok(())
2319 } else {
2320 Err(Error::General(format!(
2321 "delete_bucket_encryption: {error_text}"
2322 )))
2323 }
2324 }
2325 }
2326 }
2327
2328 async fn list_object_versions(
2329 &self,
2330 path: &RemotePath,
2331 max_keys: Option<i32>,
2332 ) -> Result<Vec<ObjectVersion>> {
2333 Ok(self.list_object_versions_page(path, max_keys).await?.items)
2334 }
2335
2336 async fn get_object_tags(
2337 &self,
2338 path: &RemotePath,
2339 ) -> Result<std::collections::HashMap<String, String>> {
2340 let response = match self
2341 .inner
2342 .get_object_tagging()
2343 .bucket(&path.bucket)
2344 .key(&path.key)
2345 .send()
2346 .await
2347 {
2348 Ok(response) => response,
2349 Err(e) => {
2350 if e.to_string().contains("NoSuchTagSet") {
2351 return Ok(std::collections::HashMap::new());
2352 }
2353 return Err(Error::General(format!("get_object_tags: {e}")));
2354 }
2355 };
2356
2357 let mut tags = std::collections::HashMap::new();
2358 for tag in response.tag_set() {
2359 let key = tag.key();
2360 let value = tag.value();
2361 tags.insert(key.to_string(), value.to_string());
2362 }
2363
2364 Ok(tags)
2365 }
2366
2367 async fn get_bucket_tags(
2368 &self,
2369 bucket: &str,
2370 ) -> Result<std::collections::HashMap<String, String>> {
2371 let response = match self.inner.get_bucket_tagging().bucket(bucket).send().await {
2372 Ok(response) => response,
2373 Err(e) => {
2374 if e.to_string().contains("NoSuchTagSet") {
2375 return Ok(std::collections::HashMap::new());
2376 }
2377 return Err(Error::General(format!("get_bucket_tags: {e}")));
2378 }
2379 };
2380
2381 let mut tags = std::collections::HashMap::new();
2382 for tag in response.tag_set() {
2383 let key = tag.key();
2384 let value = tag.value();
2385 tags.insert(key.to_string(), value.to_string());
2386 }
2387
2388 Ok(tags)
2389 }
2390
2391 async fn set_object_tags(
2392 &self,
2393 path: &RemotePath,
2394 tags: std::collections::HashMap<String, String>,
2395 ) -> Result<()> {
2396 let tagging = build_tagging(tags)?;
2397
2398 self.inner
2399 .put_object_tagging()
2400 .bucket(&path.bucket)
2401 .key(&path.key)
2402 .tagging(tagging)
2403 .send()
2404 .await
2405 .map_err(|e| Error::General(format!("set_object_tags: {e}")))?;
2406
2407 Ok(())
2408 }
2409
2410 async fn set_bucket_tags(
2411 &self,
2412 bucket: &str,
2413 tags: std::collections::HashMap<String, String>,
2414 ) -> Result<()> {
2415 let tagging = build_tagging(tags)?;
2416
2417 self.inner
2418 .put_bucket_tagging()
2419 .bucket(bucket)
2420 .tagging(tagging)
2421 .send()
2422 .await
2423 .map_err(|e| Error::General(format!("set_bucket_tags: {e}")))?;
2424
2425 Ok(())
2426 }
2427
2428 async fn delete_object_tags(&self, path: &RemotePath) -> Result<()> {
2429 self.inner
2430 .delete_object_tagging()
2431 .bucket(&path.bucket)
2432 .key(&path.key)
2433 .send()
2434 .await
2435 .map_err(|e| Error::General(format!("delete_object_tags: {e}")))?;
2436
2437 Ok(())
2438 }
2439
2440 async fn delete_bucket_tags(&self, bucket: &str) -> Result<()> {
2441 self.inner
2442 .delete_bucket_tagging()
2443 .bucket(bucket)
2444 .send()
2445 .await
2446 .map_err(|e| Error::General(format!("delete_bucket_tags: {e}")))?;
2447
2448 Ok(())
2449 }
2450
2451 async fn get_bucket_policy(&self, bucket: &str) -> Result<Option<String>> {
2452 let response = match self.inner.get_bucket_policy().bucket(bucket).send().await {
2453 Ok(policy) => policy,
2454 Err(error) => {
2455 let error_text = error.to_string();
2456 let kind = if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &error {
2457 let code = service_err
2458 .raw()
2459 .headers()
2460 .get("x-amz-error-code")
2461 .and_then(|value| std::str::from_utf8(value.as_bytes()).ok());
2462 let status = Some(service_err.raw().status().as_u16());
2463 Self::bucket_policy_error_kind(code, status, &error_text)
2464 } else {
2465 Self::bucket_policy_error_kind(None, None, &error_text)
2466 };
2467 return Self::map_get_bucket_policy_error(bucket, kind, &error_text);
2468 }
2469 };
2470
2471 Ok(response.policy().map(|policy| policy.to_string()))
2472 }
2473
2474 async fn set_bucket_policy(&self, bucket: &str, policy: &str) -> Result<()> {
2475 self.inner
2476 .put_bucket_policy()
2477 .bucket(bucket)
2478 .policy(policy)
2479 .send()
2480 .await
2481 .map_err(|e| Error::General(format!("set_bucket_policy: {e}")))?;
2482
2483 Ok(())
2484 }
2485
2486 async fn delete_bucket_policy(&self, bucket: &str) -> Result<()> {
2487 match self
2488 .inner
2489 .delete_bucket_policy()
2490 .bucket(bucket)
2491 .send()
2492 .await
2493 {
2494 Ok(_) => Ok(()),
2495 Err(e) => {
2496 let error_text = e.to_string();
2497 let kind = if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &e {
2498 let code = service_err
2499 .raw()
2500 .headers()
2501 .get("x-amz-error-code")
2502 .and_then(|value| std::str::from_utf8(value.as_bytes()).ok());
2503 let status = Some(service_err.raw().status().as_u16());
2504 Self::bucket_policy_error_kind(code, status, &error_text)
2505 } else {
2506 Self::bucket_policy_error_kind(None, None, &error_text)
2507 };
2508 Self::map_delete_bucket_policy_error(bucket, kind, &error_text)
2509 }
2510 }
2511 }
2512
2513 async fn get_bucket_cors(&self, bucket: &str) -> Result<Vec<CorsRule>> {
2514 let response = match self.inner.get_bucket_cors().bucket(bucket).send().await {
2515 Ok(response) => response,
2516 Err(error) => {
2517 let error_text = error.to_string();
2518 if error_text.contains("service error")
2519 && let Ok(url) = self.cors_url(bucket)
2520 {
2521 match self.xml_request(Method::GET, url, None, None).await {
2522 Ok(body) => return parse_cors_configuration_xml(&body),
2523 Err(Error::Network(raw_error))
2524 if is_missing_cors_configuration_error(&raw_error) =>
2525 {
2526 return Ok(Vec::new());
2527 }
2528 Err(_) => {}
2529 }
2530 }
2531
2532 let missing_config =
2533 if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &error {
2534 is_missing_cors_configuration_response(
2535 service_err.err().code(),
2536 Some(service_err.raw().status().as_u16()),
2537 &error_text,
2538 )
2539 } else {
2540 is_missing_cors_configuration_response(None, None, &error_text)
2541 };
2542
2543 if missing_config {
2544 return Ok(Vec::new());
2545 }
2546 return Err(Error::General(format!("get_bucket_cors: {error_text}")));
2547 }
2548 };
2549
2550 Ok(response
2551 .cors_rules()
2552 .iter()
2553 .map(sdk_cors_rule_to_core)
2554 .collect())
2555 }
2556
2557 async fn set_bucket_cors(&self, bucket: &str, rules: Vec<CorsRule>) -> Result<()> {
2558 let cors_rules = rules
2559 .iter()
2560 .map(core_cors_rule_to_sdk)
2561 .collect::<Result<Vec<_>>>()?;
2562 let cors_configuration = aws_sdk_s3::types::CorsConfiguration::builder()
2563 .set_cors_rules(Some(cors_rules))
2564 .build()
2565 .map_err(|e| Error::General(format!("build bucket cors config: {e}")))?;
2566
2567 self.inner
2568 .put_bucket_cors()
2569 .bucket(bucket)
2570 .cors_configuration(cors_configuration)
2571 .send()
2572 .await
2573 .map_err(|e| Error::General(format!("set_bucket_cors: {e}")))?;
2574
2575 Ok(())
2576 }
2577
2578 async fn delete_bucket_cors(&self, bucket: &str) -> Result<()> {
2579 match self.inner.delete_bucket_cors().bucket(bucket).send().await {
2580 Ok(_) => Ok(()),
2581 Err(error) => {
2582 let error_text = error.to_string();
2583 if error_text.contains("service error")
2584 && let Ok(url) = self.cors_url(bucket)
2585 {
2586 match self.xml_request(Method::DELETE, url, None, None).await {
2587 Ok(_) => return Ok(()),
2588 Err(Error::Network(raw_error))
2589 if is_missing_cors_configuration_error(&raw_error) =>
2590 {
2591 return Ok(());
2592 }
2593 Err(_) => {}
2594 }
2595 }
2596
2597 let missing_config =
2598 if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &error {
2599 is_missing_cors_configuration_response(
2600 service_err.err().code(),
2601 Some(service_err.raw().status().as_u16()),
2602 &error_text,
2603 )
2604 } else {
2605 is_missing_cors_configuration_response(None, None, &error_text)
2606 };
2607
2608 if missing_config {
2609 Ok(())
2610 } else {
2611 Err(Error::General(format!("delete_bucket_cors: {error_text}")))
2612 }
2613 }
2614 }
2615 }
2616
2617 async fn get_bucket_notifications(&self, bucket: &str) -> Result<Vec<BucketNotification>> {
2618 let response = self
2619 .inner
2620 .get_bucket_notification_configuration()
2621 .bucket(bucket)
2622 .send()
2623 .await
2624 .map_err(|e| Error::General(format!("get_bucket_notifications: {e}")))?;
2625
2626 let mut rules = Vec::new();
2627
2628 for cfg in response.queue_configurations() {
2629 let (prefix, suffix) = Self::extract_notification_filter(cfg.filter());
2630 rules.push(BucketNotification {
2631 id: cfg.id().map(ToString::to_string),
2632 target: NotificationTarget::Queue,
2633 arn: cfg.queue_arn().to_string(),
2634 events: Self::event_list_to_strings(cfg.events()),
2635 prefix,
2636 suffix,
2637 });
2638 }
2639
2640 for cfg in response.topic_configurations() {
2641 let (prefix, suffix) = Self::extract_notification_filter(cfg.filter());
2642 rules.push(BucketNotification {
2643 id: cfg.id().map(ToString::to_string),
2644 target: NotificationTarget::Topic,
2645 arn: cfg.topic_arn().to_string(),
2646 events: Self::event_list_to_strings(cfg.events()),
2647 prefix,
2648 suffix,
2649 });
2650 }
2651
2652 for cfg in response.lambda_function_configurations() {
2653 let (prefix, suffix) = Self::extract_notification_filter(cfg.filter());
2654 rules.push(BucketNotification {
2655 id: cfg.id().map(ToString::to_string),
2656 target: NotificationTarget::Lambda,
2657 arn: cfg.lambda_function_arn().to_string(),
2658 events: Self::event_list_to_strings(cfg.events()),
2659 prefix,
2660 suffix,
2661 });
2662 }
2663
2664 Ok(rules)
2665 }
2666
2667 async fn set_bucket_notifications(
2668 &self,
2669 bucket: &str,
2670 notifications: Vec<BucketNotification>,
2671 ) -> Result<()> {
2672 use aws_sdk_s3::types::{
2673 LambdaFunctionConfiguration, NotificationConfiguration, QueueConfiguration,
2674 TopicConfiguration,
2675 };
2676
2677 let expected_notifications = notifications.clone();
2678 let mut queues = Vec::new();
2679 let mut topics = Vec::new();
2680 let mut lambdas = Vec::new();
2681
2682 for rule in notifications {
2683 let events = Self::strings_to_event_list(&rule.events);
2684 if events.is_empty() {
2685 return Err(Error::General(format!(
2686 "set_bucket_notifications: empty event list for target '{}'",
2687 rule.arn
2688 )));
2689 }
2690
2691 let filter =
2692 Self::build_notification_filter(rule.prefix.as_deref(), rule.suffix.as_deref());
2693
2694 match rule.target {
2695 NotificationTarget::Queue => {
2696 let mut builder = QueueConfiguration::builder()
2697 .queue_arn(rule.arn)
2698 .set_events(Some(events))
2699 .set_id(rule.id);
2700 if let Some(filter) = filter {
2701 builder = builder.filter(filter);
2702 }
2703 let queue = builder
2704 .build()
2705 .map_err(|e| Error::General(format!("build queue notification: {e}")))?;
2706 queues.push(queue);
2707 }
2708 NotificationTarget::Topic => {
2709 let mut builder = TopicConfiguration::builder()
2710 .topic_arn(rule.arn)
2711 .set_events(Some(events))
2712 .set_id(rule.id);
2713 if let Some(filter) = filter {
2714 builder = builder.filter(filter);
2715 }
2716 let topic = builder
2717 .build()
2718 .map_err(|e| Error::General(format!("build topic notification: {e}")))?;
2719 topics.push(topic);
2720 }
2721 NotificationTarget::Lambda => {
2722 let mut builder = LambdaFunctionConfiguration::builder()
2723 .lambda_function_arn(rule.arn)
2724 .set_events(Some(events))
2725 .set_id(rule.id);
2726 if let Some(filter) = filter {
2727 builder = builder.filter(filter);
2728 }
2729 let lambda = builder
2730 .build()
2731 .map_err(|e| Error::General(format!("build lambda notification: {e}")))?;
2732 lambdas.push(lambda);
2733 }
2734 }
2735 }
2736
2737 let config = NotificationConfiguration::builder()
2738 .set_queue_configurations(Some(queues))
2739 .set_topic_configurations(Some(topics))
2740 .set_lambda_function_configurations(Some(lambdas))
2741 .build();
2742
2743 match self
2744 .inner
2745 .put_bucket_notification_configuration()
2746 .bucket(bucket)
2747 .notification_configuration(config)
2748 .send()
2749 .await
2750 {
2751 Ok(_) => {}
2752 Err(error) => {
2753 let error_text = error.to_string();
2754 if error_text.contains("service error")
2757 && let Ok(actual) = self.get_bucket_notifications(bucket).await
2758 && Self::notifications_equivalent(&expected_notifications, &actual)
2759 {
2760 return Ok(());
2761 }
2762 return Err(Error::General(format!(
2763 "set_bucket_notifications: {error_text}"
2764 )));
2765 }
2766 }
2767
2768 Ok(())
2769 }
2770
2771 async fn get_bucket_lifecycle(&self, bucket: &str) -> Result<Vec<LifecycleRule>> {
2772 let response = match self
2773 .inner
2774 .get_bucket_lifecycle_configuration()
2775 .bucket(bucket)
2776 .send()
2777 .await
2778 {
2779 Ok(resp) => resp,
2780 Err(error) => {
2781 let error_text = Self::format_sdk_error(&error);
2782 if error_text.contains("NoSuchLifecycleConfiguration")
2783 || error_text.contains("lifecycle configuration is not found")
2784 {
2785 return Ok(Vec::new());
2786 }
2787 return Err(Error::General(format!(
2788 "get_bucket_lifecycle: {error_text}"
2789 )));
2790 }
2791 };
2792
2793 let mut rules = Vec::new();
2794 for sdk_rule in response.rules() {
2795 let id = sdk_rule.id().unwrap_or("").to_string();
2796 let status = match sdk_rule.status().as_str() {
2797 "Enabled" => rc_core::LifecycleRuleStatus::Enabled,
2798 _ => rc_core::LifecycleRuleStatus::Disabled,
2799 };
2800
2801 let prefix = parse_lifecycle_filter_prefix(sdk_rule.filter());
2802 let tags = parse_lifecycle_filter_tags(sdk_rule.filter());
2803
2804 let expiration = sdk_rule
2805 .expiration()
2806 .map(|exp| rc_core::LifecycleExpiration {
2807 days: exp.days(),
2808 date: exp.date().map(|d| d.to_string()),
2809 });
2810
2811 let transition = sdk_rule
2812 .transitions()
2813 .first()
2814 .map(|t| rc_core::LifecycleTransition {
2815 days: t.days(),
2816 date: t.date().map(|d| d.to_string()),
2817 storage_class: t
2818 .storage_class()
2819 .map(|sc| sc.as_str().to_string())
2820 .unwrap_or_default(),
2821 });
2822
2823 let noncurrent_version_expiration =
2824 sdk_rule.noncurrent_version_expiration().map(|nve| {
2825 rc_core::NoncurrentVersionExpiration {
2826 noncurrent_days: nve.noncurrent_days().unwrap_or(0),
2827 newer_noncurrent_versions: nve.newer_noncurrent_versions(),
2828 }
2829 });
2830
2831 let noncurrent_version_transition = sdk_rule
2832 .noncurrent_version_transitions()
2833 .first()
2834 .map(|nvt| rc_core::NoncurrentVersionTransition {
2835 noncurrent_days: nvt.noncurrent_days().unwrap_or(0),
2836 storage_class: nvt
2837 .storage_class()
2838 .map(|sc| sc.as_str().to_string())
2839 .unwrap_or_default(),
2840 });
2841
2842 let abort_incomplete_multipart_upload_days = sdk_rule
2843 .abort_incomplete_multipart_upload()
2844 .and_then(|a| a.days_after_initiation());
2845
2846 let expired_object_delete_marker = sdk_rule
2847 .expiration()
2848 .and_then(|e| e.expired_object_delete_marker())
2849 .filter(|v| *v);
2850
2851 rules.push(LifecycleRule {
2852 id,
2853 status,
2854 prefix,
2855 tags,
2856 expiration,
2857 transition,
2858 noncurrent_version_expiration,
2859 noncurrent_version_transition,
2860 abort_incomplete_multipart_upload_days,
2861 expired_object_delete_marker,
2862 });
2863 }
2864
2865 Ok(rules)
2866 }
2867
2868 async fn set_bucket_lifecycle(&self, bucket: &str, rules: Vec<LifecycleRule>) -> Result<()> {
2869 use aws_sdk_s3::types::{
2870 AbortIncompleteMultipartUpload, BucketLifecycleConfiguration, ExpirationStatus,
2871 LifecycleExpiration as SdkExpiration, LifecycleRule as SdkRule,
2872 NoncurrentVersionExpiration as SdkNve, NoncurrentVersionTransition as SdkNvt,
2873 Transition, TransitionStorageClass,
2874 };
2875
2876 let mut sdk_rules = Vec::new();
2877 for rule in rules {
2878 let status = match rule.status {
2879 rc_core::LifecycleRuleStatus::Enabled => ExpirationStatus::Enabled,
2880 rc_core::LifecycleRuleStatus::Disabled => ExpirationStatus::Disabled,
2881 };
2882
2883 let filter = build_lifecycle_rule_filter(rule.prefix.as_deref(), rule.tags.as_ref())?;
2884
2885 let expiration = rule.expiration.map(|exp| {
2886 let mut builder = SdkExpiration::builder();
2887 if let Some(days) = exp.days {
2888 builder = builder.days(days);
2889 }
2890 if let Some(ref date_str) = exp.date
2891 && let Ok(dt) = aws_smithy_types::DateTime::from_str(
2892 date_str,
2893 aws_smithy_types::date_time::Format::DateTime,
2894 )
2895 {
2896 builder = builder.date(dt);
2897 }
2898 if let Some(true) = rule.expired_object_delete_marker {
2899 builder = builder.expired_object_delete_marker(true);
2900 }
2901 builder.build()
2902 });
2903
2904 let transitions = rule.transition.map(|t| {
2905 #[allow(deprecated)]
2906 let sc = TransitionStorageClass::from(t.storage_class.as_str());
2907 let mut builder = Transition::builder().storage_class(sc);
2908 if let Some(days) = t.days {
2909 builder = builder.days(days);
2910 }
2911 if let Some(ref date_str) = t.date
2912 && let Ok(dt) = aws_smithy_types::DateTime::from_str(
2913 date_str,
2914 aws_smithy_types::date_time::Format::DateTime,
2915 )
2916 {
2917 builder = builder.date(dt);
2918 }
2919 vec![builder.build()]
2920 });
2921
2922 let nve = rule.noncurrent_version_expiration.map(|nve| {
2923 let mut builder = SdkNve::builder().noncurrent_days(nve.noncurrent_days);
2924 if let Some(newer) = nve.newer_noncurrent_versions {
2925 builder = builder.newer_noncurrent_versions(newer);
2926 }
2927 builder.build()
2928 });
2929
2930 let nvt = rule.noncurrent_version_transition.map(|nvt| {
2931 let sc = TransitionStorageClass::from(nvt.storage_class.as_str());
2932 let builder = SdkNvt::builder()
2933 .noncurrent_days(nvt.noncurrent_days)
2934 .storage_class(sc);
2935 vec![builder.build()]
2936 });
2937
2938 let abort = rule.abort_incomplete_multipart_upload_days.map(|days| {
2939 AbortIncompleteMultipartUpload::builder()
2940 .days_after_initiation(days)
2941 .build()
2942 });
2943
2944 let mut builder = SdkRule::builder().id(&rule.id).status(status);
2945 if let Some(filter) = filter {
2946 builder = builder.filter(filter);
2947 }
2948 if let Some(expiration) = expiration {
2949 builder = builder.expiration(expiration);
2950 }
2951 if let Some(transitions) = transitions {
2952 builder = builder.set_transitions(Some(transitions));
2953 }
2954 if let Some(nve) = nve {
2955 builder = builder.noncurrent_version_expiration(nve);
2956 }
2957 if let Some(nvt) = nvt {
2958 builder = builder.set_noncurrent_version_transitions(Some(nvt));
2959 }
2960 if let Some(abort) = abort {
2961 builder = builder.abort_incomplete_multipart_upload(abort);
2962 }
2963
2964 let sdk_rule = builder
2965 .build()
2966 .map_err(|e| Error::General(format!("build lifecycle rule: {e}")))?;
2967 sdk_rules.push(sdk_rule);
2968 }
2969
2970 let config = BucketLifecycleConfiguration::builder()
2971 .set_rules(Some(sdk_rules))
2972 .build()
2973 .map_err(|e| Error::General(format!("build lifecycle config: {e}")))?;
2974
2975 self.inner
2976 .put_bucket_lifecycle_configuration()
2977 .bucket(bucket)
2978 .lifecycle_configuration(config)
2979 .send()
2980 .await
2981 .map_err(|e| {
2982 Error::General(format!(
2983 "set_bucket_lifecycle: {}",
2984 Self::format_sdk_error(&e)
2985 ))
2986 })?;
2987
2988 Ok(())
2989 }
2990
2991 async fn delete_bucket_lifecycle(&self, bucket: &str) -> Result<()> {
2992 self.inner
2993 .delete_bucket_lifecycle()
2994 .bucket(bucket)
2995 .send()
2996 .await
2997 .map_err(|e| {
2998 Error::General(format!(
2999 "delete_bucket_lifecycle: {}",
3000 Self::format_sdk_error(&e)
3001 ))
3002 })?;
3003 Ok(())
3004 }
3005
3006 async fn restore_object(&self, path: &RemotePath, days: i32) -> Result<()> {
3007 use aws_sdk_s3::types::RestoreRequest;
3008
3009 let request = RestoreRequest::builder().days(days).build();
3010 self.inner
3011 .restore_object()
3012 .bucket(&path.bucket)
3013 .key(&path.key)
3014 .restore_request(request)
3015 .send()
3016 .await
3017 .map_err(|e| {
3018 Error::General(format!("restore_object: {}", Self::format_sdk_error(&e)))
3019 })?;
3020 Ok(())
3021 }
3022
3023 async fn get_bucket_replication(
3024 &self,
3025 bucket: &str,
3026 ) -> Result<Option<ReplicationConfiguration>> {
3027 let url = self.replication_url(bucket)?;
3028 let body = match self.xml_request(Method::GET, url, None, None).await {
3029 Ok(body) => body,
3030 Err(Error::Network(error_text))
3031 if error_text.contains("ReplicationConfigurationNotFound")
3032 || error_text.contains("replication configuration is not found")
3033 || error_text.contains("replication not found") =>
3034 {
3035 return Ok(None);
3036 }
3037 Err(error) => {
3038 return Err(Error::General(format!("get_bucket_replication: {error}")));
3039 }
3040 };
3041
3042 parse_replication_configuration_xml(&body).map(Some)
3043 }
3044
3045 async fn set_bucket_replication(
3046 &self,
3047 bucket: &str,
3048 config: ReplicationConfiguration,
3049 ) -> Result<()> {
3050 let url = self.replication_url(bucket)?;
3051 let body = build_replication_configuration_xml(&config).into_bytes();
3052 self.xml_request(Method::PUT, url, Some("application/xml"), Some(body))
3053 .await
3054 .map_err(|e| Error::General(format!("set_bucket_replication: {e}")))?;
3055
3056 Ok(())
3057 }
3058
3059 async fn delete_bucket_replication(&self, bucket: &str) -> Result<()> {
3060 self.inner
3061 .delete_bucket_replication()
3062 .bucket(bucket)
3063 .send()
3064 .await
3065 .map_err(|e| {
3066 Error::General(format!(
3067 "delete_bucket_replication: {}",
3068 Self::format_sdk_error(&e)
3069 ))
3070 })?;
3071 Ok(())
3072 }
3073
3074 async fn select_object_content(
3075 &self,
3076 path: &RemotePath,
3077 options: &SelectOptions,
3078 writer: &mut (dyn AsyncWrite + Send + Unpin),
3079 ) -> Result<()> {
3080 crate::select::select_object_content(&self.inner, path, options, writer).await
3081 }
3082}
3083
3084#[cfg(test)]
3085mod tests {
3086 use super::*;
3087 use aws_smithy_http_client::test_util::{CaptureRequestReceiver, capture_request};
3088 use std::collections::HashMap;
3089 use std::io::{Read, Write};
3090 use std::net::{TcpListener, TcpStream};
3091 use std::sync::mpsc;
3092 use std::thread;
3093 use std::time::{Duration, Instant};
3094
/// Request line and headers of one HTTP request captured by the local test server.
#[derive(Debug)]
struct CapturedXmlRequest {
    /// First whitespace-separated token of the request line (e.g. the HTTP verb).
    method: String,
    /// Second whitespace-separated token of the request line (the request target).
    target: String,
    /// Header pairs as `(lowercased name, trimmed value)`.
    headers: Vec<(String, String)>,
}
3101
3102 fn test_s3_client(
3103 response: Option<http::Response<SdkBody>>,
3104 ) -> (S3Client, CaptureRequestReceiver) {
3105 test_s3_client_with_endpoint("https://example.com", response)
3106 }
3107
3108 fn test_s3_client_with_endpoint(
3109 endpoint: &str,
3110 response: Option<http::Response<SdkBody>>,
3111 ) -> (S3Client, CaptureRequestReceiver) {
3112 test_s3_client_with_endpoint_and_headers(endpoint, response, Vec::new())
3113 }
3114
3115 fn test_s3_client_with_endpoint_and_headers(
3116 endpoint: &str,
3117 response: Option<http::Response<SdkBody>>,
3118 request_headers: Vec<RequestHeader>,
3119 ) -> (S3Client, CaptureRequestReceiver) {
3120 let (http_client, request_receiver) = capture_request(response);
3121 let credentials = Credentials::new(
3122 "access-key",
3123 "secret-key",
3124 None,
3125 None,
3126 "rc-test-credentials",
3127 );
3128 let mut config_builder = aws_sdk_s3::config::Builder::new()
3129 .credentials_provider(credentials)
3130 .endpoint_url(endpoint)
3131 .region(aws_sdk_s3::config::Region::new("us-east-1"))
3132 .force_path_style(true)
3133 .behavior_version_latest()
3134 .http_client(http_client);
3135
3136 let presign_config = config_builder.clone().build();
3137
3138 if !request_headers.is_empty() {
3139 config_builder = config_builder.interceptor(CustomHeaderInterceptor {
3140 headers: request_headers.clone(),
3141 });
3142 }
3143
3144 let config = config_builder.build();
3145
3146 let alias = Alias::new("test", endpoint, "access-key", "secret-key");
3147 let client = S3Client {
3148 inner: aws_sdk_s3::Client::from_conf(config),
3149 presign_inner: aws_sdk_s3::Client::from_conf(presign_config),
3150 xml_http_client: reqwest::Client::new(),
3151 alias,
3152 request_headers,
3153 };
3154
3155 (client, request_receiver)
3156 }
3157
3158 fn read_xml_request(stream: &mut TcpStream) -> CapturedXmlRequest {
3159 let mut buffer = Vec::new();
3160 let mut chunk = [0_u8; 1024];
3161 let header_end = loop {
3162 let read = stream.read(&mut chunk).expect("read HTTP request");
3163 assert!(read > 0, "client closed connection before headers");
3164 buffer.extend_from_slice(&chunk[..read]);
3165
3166 if let Some(position) = buffer.windows(4).position(|window| window == b"\r\n\r\n") {
3167 break position + 4;
3168 }
3169 };
3170
3171 let headers_text = String::from_utf8_lossy(&buffer[..header_end]).into_owned();
3172 let content_length = headers_text
3173 .lines()
3174 .find_map(|line| {
3175 let (name, value) = line.split_once(':')?;
3176 name.eq_ignore_ascii_case("content-length")
3177 .then(|| value.trim().parse::<usize>().expect("valid content length"))
3178 })
3179 .unwrap_or(0);
3180
3181 while buffer.len() - header_end < content_length {
3182 let read = stream.read(&mut chunk).expect("read HTTP request body");
3183 assert!(read > 0, "client closed connection before body");
3184 buffer.extend_from_slice(&chunk[..read]);
3185 }
3186
3187 let mut lines = headers_text.lines();
3188 let request_line = lines.next().expect("request line");
3189 let mut parts = request_line.split_whitespace();
3190 let method = parts.next().expect("request method").to_string();
3191 let target = parts.next().expect("request target").to_string();
3192 let headers = lines
3193 .filter_map(|line| {
3194 let (name, value) = line.split_once(':')?;
3195 Some((name.to_ascii_lowercase(), value.trim().to_string()))
3196 })
3197 .collect();
3198
3199 CapturedXmlRequest {
3200 method,
3201 target,
3202 headers,
3203 }
3204 }
3205
3206 fn start_xml_test_server() -> (
3207 String,
3208 mpsc::Receiver<CapturedXmlRequest>,
3209 thread::JoinHandle<()>,
3210 ) {
3211 let listener = TcpListener::bind("127.0.0.1:0").expect("bind test server");
3212 listener
3213 .set_nonblocking(true)
3214 .expect("configure nonblocking listener");
3215 let endpoint = format!("http://{}", listener.local_addr().expect("local addr"));
3216 let (sender, receiver) = mpsc::channel();
3217
3218 let handle = thread::spawn(move || {
3219 let deadline = Instant::now() + Duration::from_secs(5);
3220 let mut stream = loop {
3221 match listener.accept() {
3222 Ok((stream, _)) => break stream,
3223 Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {
3224 assert!(Instant::now() < deadline, "timed out waiting for request");
3225 thread::sleep(Duration::from_millis(10));
3226 }
3227 Err(error) => panic!("accept request: {error}"),
3228 }
3229 };
3230 stream
3231 .set_nonblocking(false)
3232 .expect("configure blocking request stream");
3233 stream
3234 .set_read_timeout(Some(Duration::from_secs(5)))
3235 .expect("set stream read timeout");
3236 let request = read_xml_request(&mut stream);
3237 sender.send(request).expect("send captured request");
3238
3239 let response = "HTTP/1.1 200 OK\r\ncontent-length: 2\r\nconnection: close\r\n\r\nok";
3240 stream
3241 .write_all(response.as_bytes())
3242 .expect("write HTTP response");
3243 });
3244
3245 (endpoint, receiver, handle)
3246 }
3247
/// Return the value of the first header whose name matches `name`, ignoring
/// ASCII case, or `None` when no header matches.
fn header_value<'a>(headers: &'a [(String, String)], name: &str) -> Option<&'a str> {
    headers.iter().find_map(|(header_name, value)| {
        header_name
            .eq_ignore_ascii_case(name)
            .then_some(value.as_str())
    })
}
3254
/// `ObjectInfo::file` stores the given key and size.
#[test]
fn test_object_info_creation() {
    let object = ObjectInfo::file("test.txt", 1024);

    assert_eq!(object.key, "test.txt");
    assert_eq!(object.size_bytes, Some(1024));
}
3261
/// An alias created with `Alias::new` for an Aliyun OSS service endpoint must not
/// force path-style addressing.
#[test]
fn auto_bucket_lookup_uses_dns_for_aliyun_oss_service_endpoint() {
    let endpoint = "https://oss-cn-hangzhou.aliyuncs.com";
    let alias = Alias::new("aliyun", endpoint, "access-key", "secret-key");

    let path_style = force_path_style_for_alias(&alias);

    assert!(!path_style);
}
3273
/// An alias created with `Alias::new` for an Aliyun OSS internal endpoint must not
/// force path-style addressing.
#[test]
fn auto_bucket_lookup_uses_dns_for_aliyun_internal_service_endpoint() {
    let endpoint = "https://oss-cn-hangzhou-internal.aliyuncs.com";
    let alias = Alias::new("aliyun", endpoint, "access-key", "secret-key");

    let path_style = force_path_style_for_alias(&alias);

    assert!(!path_style);
}
3285
/// An alias created with `Alias::new` for a localhost endpoint must force
/// path-style addressing.
#[test]
fn auto_bucket_lookup_keeps_path_style_for_custom_endpoint() {
    let endpoint = "http://localhost:9000";
    let alias = Alias::new("local", endpoint, "access-key", "secret-key");

    let path_style = force_path_style_for_alias(&alias);

    assert!(path_style);
}
3292
/// An alias created with `Alias::new` for an `aliyuncs.com` host that is not an
/// OSS endpoint must force path-style addressing.
#[test]
fn auto_bucket_lookup_keeps_path_style_for_non_oss_aliyun_endpoint() {
    let endpoint = "https://ecs-cn-hangzhou.aliyuncs.com";
    let alias = Alias::new("aliyun", endpoint, "access-key", "secret-key");

    let path_style = force_path_style_for_alias(&alias);

    assert!(path_style);
}
3304
/// An alias created with `Alias::new` whose endpoint is not a valid URL must force
/// path-style addressing.
#[test]
fn auto_bucket_lookup_keeps_path_style_for_invalid_endpoint() {
    let endpoint = "not a valid endpoint";
    let alias = Alias::new("broken", endpoint, "access-key", "secret-key");

    let path_style = force_path_style_for_alias(&alias);

    assert!(path_style);
}
3311
/// Setting `bucket_lookup` to `"path"` or `"dns"` decides the addressing style
/// regardless of the endpoint.
#[test]
fn explicit_bucket_lookup_overrides_auto_detection() {
    let path_alias = {
        let mut alias = Alias::new(
            "aliyun",
            "https://oss-cn-hangzhou.aliyuncs.com",
            "access-key",
            "secret-key",
        );
        alias.bucket_lookup = "path".to_string();
        alias
    };

    let dns_alias = {
        let mut alias = Alias::new("local", "http://localhost:9000", "access-key", "secret-key");
        alias.bucket_lookup = "dns".to_string();
        alias
    };

    assert!(force_path_style_for_alias(&path_alias));
    assert!(!force_path_style_for_alias(&dns_alias));
}
3329
/// An unrecognised `bucket_lookup` value results in path-style addressing,
/// even for an OSS endpoint.
#[test]
fn unknown_bucket_lookup_keeps_path_style() {
    let endpoint = "https://oss-cn-hangzhou.aliyuncs.com";
    let mut alias = Alias::new("aliyun", endpoint, "access-key", "secret-key");
    alias.bucket_lookup = String::from("unexpected");

    let forces_path_style = force_path_style_for_alias(&alias);

    assert!(forces_path_style);
}
3342
/// Parsing a replication document with one rule yields the role, rule id,
/// prefix filter and the three replication status flags.
#[test]
fn parse_replication_configuration_xml_reads_delete_replication() {
    let document = r#"<?xml version="1.0" encoding="UTF-8"?>
<ReplicationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Role>arn:rustfs:replication:us-east-1:123:test</Role>
  <Rule>
    <Status>Enabled</Status>
    <Destination>
      <Bucket>arn:rustfs:replication:us-east-1:123:dest</Bucket>
      <StorageClass>STANDARD</StorageClass>
    </Destination>
    <ID>rule-1</ID>
    <Priority>1</Priority>
    <Filter>
      <Prefix>logs/</Prefix>
    </Filter>
    <ExistingObjectReplication>
      <Status>Enabled</Status>
    </ExistingObjectReplication>
    <DeleteMarkerReplication>
      <Status>Disabled</Status>
    </DeleteMarkerReplication>
    <DeleteReplication>
      <Status>Enabled</Status>
    </DeleteReplication>
  </Rule>
</ReplicationConfiguration>"#;

    let parsed = parse_replication_configuration_xml(document).expect("parse replication xml");

    assert_eq!(parsed.role, "arn:rustfs:replication:us-east-1:123:test");
    assert_eq!(parsed.rules.len(), 1);

    let rule = &parsed.rules[0];
    assert_eq!(rule.id, "rule-1");
    assert_eq!(rule.prefix.as_deref(), Some("logs/"));
    assert_eq!(rule.delete_replication, Some(true));
    assert_eq!(rule.delete_marker_replication, Some(false));
    assert_eq!(rule.existing_object_replication, Some(true));
}
3380
/// A `<Filter><And>` element with a prefix and two tags is parsed into the
/// rule's prefix and tag map.
#[test]
fn parse_replication_configuration_xml_preserves_tag_filters() {
    let document = r#"<?xml version="1.0" encoding="UTF-8"?>
<ReplicationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Rule>
    <Status>Enabled</Status>
    <Destination>
      <Bucket>arn:rustfs:replication:us-east-1:123:dest</Bucket>
    </Destination>
    <ID>tagged-rule</ID>
    <Priority>2</Priority>
    <Filter>
      <And>
        <Prefix>logs/</Prefix>
        <Tag>
          <Key>env</Key>
          <Value>prod</Value>
        </Tag>
        <Tag>
          <Key>team</Key>
          <Value>core</Value>
        </Tag>
      </And>
    </Filter>
  </Rule>
</ReplicationConfiguration>"#;

    let parsed = parse_replication_configuration_xml(document).expect("parse replication xml");
    let first_rule = &parsed.rules[0];

    assert_eq!(first_rule.prefix.as_deref(), Some("logs/"));

    let tag_filters = first_rule.tags.as_ref().expect("tag filters");
    let env = tag_filters.get("env").map(String::as_str);
    let team = tag_filters.get("team").map(String::as_str);
    assert_eq!(env, Some("prod"));
    assert_eq!(team, Some("core"));
}
3415
/// Serialising a rule with all three replication flags set to `Some(true)`
/// emits an `Enabled` status element for each, plus the plain prefix filter.
#[test]
fn build_replication_configuration_xml_writes_delete_replication() {
    let destination = rc_core::ReplicationDestination {
        bucket_arn: "arn:rustfs:replication:us-east-1:123:dest".to_string(),
        storage_class: Some("STANDARD".to_string()),
    };
    let rule = rc_core::ReplicationRule {
        id: "rule-1".to_string(),
        priority: 1,
        status: rc_core::ReplicationRuleStatus::Enabled,
        prefix: Some("logs/".to_string()),
        tags: None,
        destination,
        delete_marker_replication: Some(true),
        existing_object_replication: Some(true),
        delete_replication: Some(true),
    };
    let config = ReplicationConfiguration {
        role: "arn:rustfs:replication:us-east-1:123:test".to_string(),
        rules: vec![rule],
    };

    let rendered = build_replication_configuration_xml(&config);

    assert!(rendered.contains("<DeleteReplication><Status>Enabled</Status></DeleteReplication>"));
    assert!(rendered.contains(
        "<ExistingObjectReplication><Status>Enabled</Status></ExistingObjectReplication>"
    ));
    assert!(rendered.contains(
        "<DeleteMarkerReplication><Status>Enabled</Status></DeleteMarkerReplication>"
    ));
    assert!(rendered.contains("<Filter><Prefix>logs/</Prefix></Filter>"));
}
3446
/// A rule with both a prefix and tags is serialised as a `<Filter><And>`
/// element containing the prefix and one `<Tag>` per entry.
///
/// Each tag is asserted on its own, so the test does not depend on the
/// iteration order of the `HashMap`.
#[test]
fn build_replication_configuration_xml_writes_and_tag_filters() {
    let tags = HashMap::from([
        ("env".to_string(), "prod".to_string()),
        ("team".to_string(), "core".to_string()),
    ]);

    let rule = rc_core::ReplicationRule {
        id: "rule-1".to_string(),
        priority: 1,
        status: rc_core::ReplicationRuleStatus::Enabled,
        prefix: Some("logs/".to_string()),
        tags: Some(tags),
        destination: rc_core::ReplicationDestination {
            bucket_arn: "arn:rustfs:replication:us-east-1:123:dest".to_string(),
            storage_class: None,
        },
        delete_marker_replication: None,
        existing_object_replication: None,
        delete_replication: None,
    };
    let config = ReplicationConfiguration {
        role: String::new(),
        rules: vec![rule],
    };

    let rendered = build_replication_configuration_xml(&config);

    assert!(rendered.contains("<Filter><And><Prefix>logs/</Prefix>"));
    assert!(rendered.contains("<Tag><Key>env</Key><Value>prod</Value></Tag>"));
    assert!(rendered.contains("<Tag><Key>team</Key><Value>core</Value></Tag>"));
}
3476
/// A lifecycle filter built from a prefix and two tags can be read back with
/// the prefix and tag parsers without losing either part.
#[test]
fn build_lifecycle_rule_filter_preserves_prefix_and_tags() {
    let tags = HashMap::from([
        ("env".to_string(), "prod".to_string()),
        ("team".to_string(), "core".to_string()),
    ]);

    let built = build_lifecycle_rule_filter(Some("logs/"), Some(&tags));
    let filter = built
        .expect("build lifecycle filter")
        .expect("lifecycle filter");

    let prefix = parse_lifecycle_filter_prefix(Some(&filter));
    assert_eq!(prefix.as_deref(), Some("logs/"));

    let round_tripped = parse_lifecycle_filter_tags(Some(&filter)).expect("parsed tags");
    assert_eq!(round_tripped.get("env").map(String::as_str), Some("prod"));
    assert_eq!(round_tripped.get("team").map(String::as_str), Some("core"));
}
3495
/// The service error code alone selects the classification:
/// `NoSuchBucketPolicy` is a missing policy, `NoSuchBucket` a missing bucket.
#[test]
fn bucket_policy_error_kind_uses_error_code() {
    let for_missing_policy =
        S3Client::bucket_policy_error_kind(Some("NoSuchBucketPolicy"), Some(404), "");
    assert_eq!(for_missing_policy, BucketPolicyErrorKind::MissingPolicy);

    let for_missing_bucket =
        S3Client::bucket_policy_error_kind(Some("NoSuchBucket"), Some(404), "");
    assert_eq!(for_missing_bucket, BucketPolicyErrorKind::MissingBucket);
}
3507
/// Without an error code, a 404 whose text mentions `NoSuchBucket` is a
/// missing bucket; any other 404 text falls back to a missing policy.
#[test]
fn bucket_policy_error_kind_prefers_bucket_not_found_over_404_fallback() {
    let with_bucket_text = S3Client::bucket_policy_error_kind(None, Some(404), "NoSuchBucket");
    assert_eq!(with_bucket_text, BucketPolicyErrorKind::MissingBucket);

    let with_generic_text = S3Client::bucket_policy_error_kind(None, Some(404), "no details");
    assert_eq!(with_generic_text, BucketPolicyErrorKind::MissingPolicy);
}
3519
/// Checks how classified bucket-policy errors are mapped:
/// a missing policy on GET is `Ok(None)`, a missing bucket on GET is
/// `Error::NotFound`, and a missing policy on DELETE is `Ok`.
#[test]
fn bucket_policy_error_mapping_returns_expected_result() {
    // GET + missing policy -> Ok(None)
    let get_result = S3Client::map_get_bucket_policy_error(
        "bucket",
        BucketPolicyErrorKind::MissingPolicy,
        "NoSuchPolicy",
    );
    let policy = get_result.expect("missing policy should map to Ok(None)");
    assert!(policy.is_none());

    // GET + missing bucket -> Error::NotFound
    let missing_bucket = S3Client::map_get_bucket_policy_error(
        "bucket",
        BucketPolicyErrorKind::MissingBucket,
        "NoSuchBucket",
    );
    match missing_bucket {
        Err(Error::NotFound(message)) => assert!(message.contains("Bucket not found")),
        other => panic!("Expected NotFound for missing bucket, got: {:?}", other),
    }

    // DELETE + missing policy -> Ok
    let delete_result = S3Client::map_delete_bucket_policy_error(
        "bucket",
        BucketPolicyErrorKind::MissingPolicy,
        "NoSuchPolicy",
    );
    assert!(
        delete_result.is_ok(),
        "Missing policy should be treated as successful delete"
    );
}
3549
/// A filter built from a prefix and a suffix yields the same pair when
/// extracted again.
#[test]
fn notification_filter_round_trip_prefix_and_suffix() {
    let built = S3Client::build_notification_filter(Some("logs/"), Some(".json"));
    let filter = built.expect("filter should be built");

    let extracted = S3Client::extract_notification_filter(Some(&filter));

    assert_eq!(extracted.0.as_deref(), Some("logs/"));
    assert_eq!(extracted.1.as_deref(), Some(".json"));
}
3558
/// With neither prefix nor suffix there is no filter to build.
#[test]
fn notification_filter_none_when_empty() {
    let filter = S3Client::build_notification_filter(None, None);

    assert!(filter.is_none());
}
3563
/// Two notification lists compare as equivalent even though the rules are
/// listed in a different order, the ids differ (`Some` vs `None`), and one
/// side repeats an event name.
#[test]
fn notifications_equivalent_ignores_order_and_duplicate_events() {
    let queue_arn = "arn:aws:sqs:us-east-1:123456789012:q";
    let topic_arn = "arn:aws:sns:us-east-1:123456789012:t";

    // Queue rule first, with the created-event listed twice.
    let expected = vec![
        BucketNotification {
            id: Some("a".to_string()),
            target: NotificationTarget::Queue,
            arn: queue_arn.to_string(),
            events: vec!["s3:ObjectCreated:*".to_string(); 2],
            prefix: Some("images/".to_string()),
            suffix: Some(".jpg".to_string()),
        },
        BucketNotification {
            id: Some("b".to_string()),
            target: NotificationTarget::Topic,
            arn: topic_arn.to_string(),
            events: vec!["s3:ObjectRemoved:*".to_string()],
            prefix: None,
            suffix: None,
        },
    ];

    // Topic rule first, no ids, created-event listed once.
    let actual = vec![
        BucketNotification {
            id: None,
            target: NotificationTarget::Topic,
            arn: topic_arn.to_string(),
            events: vec!["s3:ObjectRemoved:*".to_string()],
            prefix: None,
            suffix: None,
        },
        BucketNotification {
            id: None,
            target: NotificationTarget::Queue,
            arn: queue_arn.to_string(),
            events: vec!["s3:ObjectCreated:*".to_string()],
            prefix: Some("images/".to_string()),
            suffix: Some(".jpg".to_string()),
        },
    ];

    let equivalent = S3Client::notifications_equivalent(&expected, &actual);
    assert!(equivalent);
}
3609
/// Converting a fully populated SDK CORS rule keeps the id, origins, methods
/// (case untouched), allowed/exposed headers and max age.
#[test]
fn sdk_cors_rule_to_core_preserves_optional_fields() {
    let builder = aws_sdk_s3::types::CorsRule::builder()
        .id("web-app")
        .allowed_origins("https://app.example.com")
        .allowed_methods("get")
        .allowed_headers("Authorization")
        .expose_headers("ETag")
        .max_age_seconds(300);
    let sdk_rule = builder.build().expect("build cors rule");

    let core_rule = sdk_cors_rule_to_core(&sdk_rule);

    assert_eq!(core_rule.id.as_deref(), Some("web-app"));
    assert_eq!(
        core_rule.allowed_origins,
        vec![String::from("https://app.example.com")]
    );
    assert_eq!(core_rule.allowed_methods, vec![String::from("get")]);
    assert_eq!(
        core_rule.allowed_headers,
        Some(vec![String::from("Authorization")])
    );
    assert_eq!(core_rule.expose_headers, Some(vec![String::from("ETag")]));
    assert_eq!(core_rule.max_age_seconds, Some(300));
}
3636
/// An SDK default-encryption rule with the `Aes256` algorithm maps to
/// `BucketEncryption::SseS3`.
#[test]
fn bucket_encryption_rule_maps_sse_s3() {
    let algorithm = aws_sdk_s3::types::ServerSideEncryption::Aes256;
    let sdk_rule = aws_sdk_s3::types::ServerSideEncryptionByDefault::builder()
        .sse_algorithm(algorithm)
        .build()
        .expect("build rule");

    let mapped = sdk_bucket_encryption_to_core(&sdk_rule).expect("map rule");

    assert_eq!(mapped, BucketEncryption::SseS3);
}
3647
/// An SDK default-encryption rule with the `AwsKms` algorithm and a key id
/// maps to `BucketEncryption::SseKms` carrying that key id.
#[test]
fn bucket_encryption_rule_maps_sse_kms() {
    let algorithm = aws_sdk_s3::types::ServerSideEncryption::AwsKms;
    let sdk_rule = aws_sdk_s3::types::ServerSideEncryptionByDefault::builder()
        .sse_algorithm(algorithm)
        .kms_master_key_id("kms-key")
        .build()
        .expect("build rule");

    let mapped = sdk_bucket_encryption_to_core(&sdk_rule).expect("map rule");

    let expected = BucketEncryption::SseKms {
        key_id: Some("kms-key".to_string()),
    };
    assert_eq!(mapped, expected);
}
3664
/// An `AwsKms` rule without a key id maps to `SseKms` with `key_id: None`.
#[test]
fn bucket_encryption_rule_without_kms_key_maps_to_default_kms() {
    let algorithm = aws_sdk_s3::types::ServerSideEncryption::AwsKms;
    let sdk_rule = aws_sdk_s3::types::ServerSideEncryptionByDefault::builder()
        .sse_algorithm(algorithm)
        .build()
        .expect("build rule");

    let mapped = sdk_bucket_encryption_to_core(&sdk_rule).expect("map default kms rule");

    assert_eq!(mapped, BucketEncryption::SseKms { key_id: None });
}
3675
/// The text-based detector recognises both the error code and the
/// human-readable message, and rejects an unrelated error.
#[test]
fn missing_bucket_encryption_errors_are_detected() {
    let by_code = "ServerSideEncryptionConfigurationNotFoundError";
    let by_message = "The server-side encryption configuration was not found";
    let unrelated = "AccessDenied";

    assert!(is_missing_bucket_encryption_error(by_code));
    assert!(is_missing_bucket_encryption_error(by_message));
    assert!(!is_missing_bucket_encryption_error(unrelated));
}
3686
/// Exercises the response-based detector with combinations of error code,
/// HTTP status and message text.
#[test]
fn missing_bucket_encryption_response_detects_code_and_status() {
    let not_found_text = "The server-side encryption configuration was not found";

    // Either known error code with a 404 is a match, whatever the text says.
    assert!(is_missing_bucket_encryption_response(
        Some("ServerSideEncryptionConfigurationNotFoundError"),
        Some(404),
        "service error"
    ));
    assert!(is_missing_bucket_encryption_response(
        Some("NoSuchBucketEncryption"),
        Some(404),
        "service error"
    ));

    // Without a code, the message text matches for a 404 or an absent status.
    assert!(is_missing_bucket_encryption_response(
        None,
        Some(404),
        not_found_text
    ));
    assert!(is_missing_bucket_encryption_response(
        None,
        None,
        not_found_text
    ));

    // An unrelated code, or the right text on a 500, is not a match.
    assert!(!is_missing_bucket_encryption_response(
        Some("AccessDenied"),
        Some(403),
        "access denied"
    ));
    assert!(!is_missing_bucket_encryption_response(
        None,
        Some(500),
        not_found_text
    ));
}
3720
/// An SDK rule built without allowed/exposed headers converts to a core rule
/// whose two header fields are `None`.
#[test]
fn sdk_cors_rule_to_core_drops_empty_optional_headers() {
    let builder = aws_sdk_s3::types::CorsRule::builder()
        .allowed_origins("https://app.example.com")
        .allowed_methods("GET");
    let sdk_rule = builder.build().expect("build cors rule");

    let core_rule = sdk_cors_rule_to_core(&sdk_rule);

    assert_eq!(core_rule.allowed_headers, None);
    assert_eq!(core_rule.expose_headers, None);
}
3733
/// Lower-case methods on a core rule come out upper-case on the SDK rule;
/// the id, origins, allowed headers and max age are carried over unchanged.
#[test]
fn core_cors_rule_to_sdk_normalizes_method_case() {
    let core_rule = CorsRule {
        id: Some("public-read".to_string()),
        allowed_origins: vec!["*".to_string()],
        allowed_methods: vec!["get".to_string(), "post".to_string()],
        allowed_headers: Some(vec!["*".to_string()]),
        expose_headers: None,
        max_age_seconds: Some(600),
    };

    let converted = core_cors_rule_to_sdk(&core_rule).expect("convert cors rule");

    assert_eq!(converted.allowed_methods(), ["GET", "POST"]);
    assert_eq!(converted.id(), Some("public-read"));
    assert_eq!(converted.allowed_origins(), ["*"]);
    assert_eq!(converted.allowed_headers(), ["*"]);
    assert_eq!(converted.max_age_seconds(), Some(600));
}
3752
3753 #[tokio::test]
3754 async fn set_bucket_cors_sends_rule_fields() {
3755 let response = http::Response::builder()
3756 .status(200)
3757 .body(SdkBody::from(""))
3758 .expect("build put bucket cors response");
3759 let (client, request_receiver) = test_s3_client(Some(response));
3760
3761 client
3762 .set_bucket_cors(
3763 "bucket",
3764 vec![CorsRule {
3765 id: Some("web-app".to_string()),
3766 allowed_origins: vec!["https://app.example.com".to_string()],
3767 allowed_methods: vec!["GET".to_string(), "POST".to_string()],
3768 allowed_headers: Some(vec!["Authorization".to_string()]),
3769 expose_headers: Some(vec!["ETag".to_string()]),
3770 max_age_seconds: Some(600),
3771 }],
3772 )
3773 .await
3774 .expect("set bucket cors");
3775
3776 let request = request_receiver.expect_request();
3777 assert_eq!(request.method(), http::Method::PUT);
3778 assert!(
3779 request.uri().to_string().contains("?cors"),
3780 "expected bucket CORS subresource in URI: {}",
3781 request.uri()
3782 );
3783
3784 let body = request.body().bytes().expect("request body bytes");
3785 let body = std::str::from_utf8(body).expect("request body is utf8");
3786 assert!(body.contains("<ID>web-app</ID>"));
3787 assert!(body.contains("<AllowedOrigin>https://app.example.com</AllowedOrigin>"));
3788 assert!(body.contains("<AllowedMethod>GET</AllowedMethod>"));
3789 assert!(body.contains("<AllowedMethod>POST</AllowedMethod>"));
3790 assert!(body.contains("<AllowedHeader>Authorization</AllowedHeader>"));
3791 assert!(body.contains("<ExposeHeader>ETag</ExposeHeader>"));
3792 assert!(body.contains("<MaxAgeSeconds>600</MaxAgeSeconds>"));
3793 }
3794
3795 #[tokio::test]
3796 async fn get_bucket_encryption_sends_expected_request_shape() {
3797 let response = http::Response::builder()
3798 .status(200)
3799 .body(SdkBody::from(
3800 r#"<?xml version="1.0" encoding="UTF-8"?>
3801<ServerSideEncryptionConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
3802 <Rule>
3803 <ApplyServerSideEncryptionByDefault>
3804 <SSEAlgorithm>AES256</SSEAlgorithm>
3805 </ApplyServerSideEncryptionByDefault>
3806 </Rule>
3807</ServerSideEncryptionConfiguration>"#,
3808 ))
3809 .expect("build get bucket encryption response");
3810 let (client, request_receiver) = test_s3_client(Some(response));
3811
3812 let encryption = client
3813 .get_bucket_encryption("bucket")
3814 .await
3815 .expect("get bucket encryption");
3816
3817 assert_eq!(encryption, Some(BucketEncryption::SseS3));
3818
3819 let request = request_receiver.expect_request();
3820 assert_eq!(request.method(), http::Method::GET);
3821 assert!(
3822 request.uri().to_string().contains("?encryption"),
3823 "expected bucket encryption subresource in URI: {}",
3824 request.uri()
3825 );
3826 }
3827
3828 #[tokio::test]
3829 async fn get_bucket_encryption_missing_configuration_returns_none() {
3830 let response = http::Response::builder()
3831 .status(404)
3832 .header(
3833 "x-amz-error-code",
3834 "ServerSideEncryptionConfigurationNotFoundError",
3835 )
3836 .body(SdkBody::from(
3837 r#"<?xml version="1.0" encoding="UTF-8"?>
3838<Error>
3839 <Code>ServerSideEncryptionConfigurationNotFoundError</Code>
3840 <Message>The server-side encryption configuration was not found</Message>
3841</Error>"#,
3842 ))
3843 .expect("build missing bucket encryption response");
3844 let (client, _) = test_s3_client(Some(response));
3845
3846 let encryption = client
3847 .get_bucket_encryption("bucket")
3848 .await
3849 .expect("missing bucket encryption should map to None");
3850
3851 assert_eq!(encryption, None);
3852 }
3853
3854 #[tokio::test]
3855 async fn get_bucket_encryption_missing_rule_errors() {
3856 let response = http::Response::builder()
3857 .status(200)
3858 .body(SdkBody::from(
3859 r#"<?xml version="1.0" encoding="UTF-8"?>
3860<ServerSideEncryptionConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
3861 <Rule />
3862</ServerSideEncryptionConfiguration>"#,
3863 ))
3864 .expect("build malformed bucket encryption response");
3865 let (client, _) = test_s3_client(Some(response));
3866
3867 match client.get_bucket_encryption("bucket").await {
3868 Err(Error::General(message)) => {
3869 assert!(message.contains("missing bucket encryption rule"))
3870 }
3871 other => panic!("expected missing rule error, got {other:?}"),
3872 }
3873 }
3874
3875 #[tokio::test]
3876 async fn set_bucket_encryption_sends_expected_request_shape() {
3877 let response = http::Response::builder()
3878 .status(200)
3879 .body(SdkBody::from(""))
3880 .expect("build put bucket encryption response");
3881 let (client, request_receiver) = test_s3_client(Some(response));
3882
3883 client
3884 .set_bucket_encryption(
3885 "bucket",
3886 BucketEncryption::SseKms {
3887 key_id: Some("kms-key".to_string()),
3888 },
3889 )
3890 .await
3891 .expect("set bucket encryption");
3892
3893 let request = request_receiver.expect_request();
3894 assert_eq!(request.method(), http::Method::PUT);
3895 assert!(
3896 request.uri().to_string().contains("?encryption"),
3897 "expected bucket encryption subresource in URI: {}",
3898 request.uri()
3899 );
3900
3901 let body = request.body().bytes().expect("request body bytes");
3902 let body = std::str::from_utf8(body).expect("request body is utf8");
3903 assert!(body.contains("<ServerSideEncryptionConfiguration"));
3904 assert!(body.contains("<SSEAlgorithm>aws:kms</SSEAlgorithm>"));
3905 assert!(body.contains("<KMSMasterKeyID>kms-key</KMSMasterKeyID>"));
3906 }
3907
3908 #[tokio::test]
3909 async fn delete_bucket_encryption_missing_configuration_is_successful() {
3910 let response = http::Response::builder()
3911 .status(404)
3912 .header("x-amz-error-code", "NoSuchBucketEncryption")
3913 .body(SdkBody::from(
3914 r#"<?xml version="1.0" encoding="UTF-8"?>
3915<Error>
3916 <Code>NoSuchBucketEncryption</Code>
3917 <Message>The server-side encryption configuration was not found</Message>
3918</Error>"#,
3919 ))
3920 .expect("build missing bucket encryption delete response");
3921 let (client, _) = test_s3_client(Some(response));
3922
3923 client
3924 .delete_bucket_encryption("bucket")
3925 .await
3926 .expect("missing bucket encryption should be treated as successful delete");
3927 }
3928
3929 #[tokio::test]
3930 async fn delete_bucket_encryption_sends_expected_request_shape() {
3931 let response = http::Response::builder()
3932 .status(204)
3933 .body(SdkBody::from(""))
3934 .expect("build delete bucket encryption response");
3935 let (client, request_receiver) = test_s3_client(Some(response));
3936
3937 client
3938 .delete_bucket_encryption("bucket")
3939 .await
3940 .expect("delete bucket encryption");
3941
3942 let request = request_receiver.expect_request();
3943 assert_eq!(request.method(), http::Method::DELETE);
3944 assert!(
3945 request.uri().to_string().contains("?encryption"),
3946 "expected bucket encryption subresource in URI: {}",
3947 request.uri()
3948 );
3949 }
3950
/// A core rule whose header lists are `Some(empty)` converts to an SDK rule
/// with empty allowed/exposed header slices.
#[test]
fn core_cors_rule_to_sdk_drops_empty_optional_headers() {
    let core_rule = CorsRule {
        id: None,
        allowed_origins: vec!["https://app.example.com".to_string()],
        allowed_methods: vec!["GET".to_string()],
        allowed_headers: Some(vec![]),
        expose_headers: Some(vec![]),
        max_age_seconds: None,
    };

    let converted = core_cors_rule_to_sdk(&core_rule).expect("convert cors rule");

    assert!(converted.allowed_headers().is_empty());
    assert!(converted.expose_headers().is_empty());
}
3966
/// Parsing a CORS configuration with a single rule recovers every field of
/// that rule.
#[test]
fn parse_cors_configuration_xml_round_trips_rule_fields() {
    let document = r#"
<CORSConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <CORSRule>
    <ID>mc-rule</ID>
    <AllowedOrigin>https://console.example.com</AllowedOrigin>
    <AllowedMethod>GET</AllowedMethod>
    <AllowedMethod>POST</AllowedMethod>
    <AllowedHeader>*</AllowedHeader>
    <ExposeHeader>ETag</ExposeHeader>
    <MaxAgeSeconds>1200</MaxAgeSeconds>
  </CORSRule>
</CORSConfiguration>
"#;

    let parsed = parse_cors_configuration_xml(document).expect("parse cors xml");
    assert_eq!(parsed.len(), 1);

    let rule = &parsed[0];
    assert_eq!(rule.id.as_deref(), Some("mc-rule"));
    assert_eq!(
        rule.allowed_origins,
        vec![String::from("https://console.example.com")]
    );
    assert_eq!(
        rule.allowed_methods,
        vec![String::from("GET"), String::from("POST")]
    );
    assert_eq!(rule.allowed_headers, Some(vec![String::from("*")]));
    assert_eq!(rule.expose_headers, Some(vec![String::from("ETag")]));
    assert_eq!(rule.max_age_seconds, Some(1200));
}
3998
/// The CORS URL from the default test client puts the bucket in the path and
/// appends an empty `cors` query parameter.
#[test]
fn cors_url_uses_path_style_bucket_and_query() {
    let (client, _) = test_s3_client(None);

    let built = client.cors_url("bucket-name").expect("build cors url");

    assert_eq!(built.as_str(), "https://example.com/bucket-name?cors=");
}
4007
/// With a `mailto:` endpoint, `cors_url` fails with `Error::Network` saying
/// the endpoint does not support path-style bucket operations.
#[test]
fn cors_url_rejects_endpoints_without_path_segments() {
    let (client, _) = test_s3_client_with_endpoint("mailto:test@example.com", None);

    let outcome = client.cors_url("bucket-name");

    match outcome {
        Err(Error::Network(message)) => {
            assert!(message.contains("does not support path-style bucket operations"));
        }
        other => panic!("expected path-style endpoint error, got {other:?}"),
    }
}
4019
/// The text-based detector recognises both the error code and the
/// human-readable message, and rejects an unrelated error.
#[test]
fn missing_cors_configuration_errors_are_detected() {
    let by_code = "NoSuchCORSConfiguration";
    let by_message = "The CORS configuration does not exist";
    let unrelated = "AccessDenied";

    assert!(is_missing_cors_configuration_error(by_code));
    assert!(is_missing_cors_configuration_error(by_message));
    assert!(!is_missing_cors_configuration_error(unrelated));
}
4030
/// Exercises the response-based detector with combinations of error code,
/// HTTP status and message text.
#[test]
fn missing_cors_configuration_response_detects_code_and_status() {
    let missing_text = "The CORS configuration does not exist";

    // Matching code on a 404.
    assert!(is_missing_cors_configuration_response(
        Some("NoSuchCORSConfiguration"),
        Some(404),
        "service error"
    ));
    // No code, but the message text matches on a 404.
    assert!(is_missing_cors_configuration_response(
        None,
        Some(404),
        missing_text
    ));
    // Unrelated code and status.
    assert!(!is_missing_cors_configuration_response(
        Some("AccessDenied"),
        Some(403),
        "access denied"
    ));
    // A bare 404 without code or matching text is not enough.
    assert!(!is_missing_cors_configuration_response(
        None,
        Some(404),
        "service error"
    ));
    // A missing bucket is a different condition.
    assert!(!is_missing_cors_configuration_response(
        Some("NoSuchBucket"),
        Some(404),
        "NoSuchBucket"
    ));
    // Matching text with no status at all.
    assert!(is_missing_cors_configuration_response(
        None,
        None,
        missing_text
    ));
    // Matching text on a 500 is rejected.
    assert!(!is_missing_cors_configuration_response(
        None,
        Some(500),
        missing_text
    ));
}
4069
4070 #[tokio::test]
4071 async fn get_bucket_cors_missing_configuration_returns_empty_rules() {
4072 let response = http::Response::builder()
4073 .status(404)
4074 .header("x-amz-error-code", "NoSuchCORSConfiguration")
4075 .body(SdkBody::from(
4076 r#"<?xml version="1.0" encoding="UTF-8"?>
4077<Error>
4078 <Code>NoSuchCORSConfiguration</Code>
4079 <Message>The CORS configuration does not exist</Message>
4080</Error>"#,
4081 ))
4082 .expect("build missing cors response");
4083 let (client, _) = test_s3_client(Some(response));
4084
4085 let rules = client
4086 .get_bucket_cors("bucket")
4087 .await
4088 .expect("missing cors config should be treated as empty");
4089
4090 assert!(rules.is_empty());
4091 }
4092
4093 #[tokio::test]
4094 async fn delete_bucket_cors_missing_configuration_is_successful() {
4095 let response = http::Response::builder()
4096 .status(404)
4097 .header("x-amz-error-code", "NoSuchCORSConfiguration")
4098 .body(SdkBody::from(
4099 r#"<?xml version="1.0" encoding="UTF-8"?>
4100<Error>
4101 <Code>NoSuchCORSConfiguration</Code>
4102 <Message>The CORS configuration does not exist</Message>
4103</Error>"#,
4104 ))
4105 .expect("build missing cors response");
4106 let (client, _) = test_s3_client(Some(response));
4107
4108 client
4109 .delete_bucket_cors("bucket")
4110 .await
4111 .expect("missing cors config should be treated as successful delete");
4112 }
4113
4114 #[tokio::test]
4115 async fn reqwest_connector_insecure_without_ca_bundle_succeeds() {
4116 let connector = ReqwestConnector::new(true, None, None, None).await;
4118 assert!(
4119 connector.is_ok(),
4120 "Expected insecure connector creation to succeed"
4121 );
4122 }
4123
4124 #[tokio::test]
4125 async fn reqwest_connector_invalid_ca_bundle_path_surfaces_error() {
4126 let result = ReqwestConnector::new(false, Some(""), None, None).await;
4128 match result {
4129 Err(Error::Network(msg)) => {
4130 assert!(
4131 msg.contains("Failed to read CA bundle"),
4132 "Unexpected error message: {msg}"
4133 );
4134 }
4135 other => panic!("Expected Error::Network for invalid path, got: {:?}", other),
4136 }
4137 }
4138
/// One byte above `SINGLE_PUT_OBJECT_MAX_SIZE` selects multipart upload.
#[test]
fn should_use_multipart_for_large_files() {
    let just_over_limit = SINGLE_PUT_OBJECT_MAX_SIZE + 1;

    assert!(S3Client::should_use_multipart(just_over_limit));
}
4145
/// Sizes up to and including `SINGLE_PUT_OBJECT_MAX_SIZE` (and the default
/// part size) do not select multipart upload.
#[test]
fn should_use_single_part_for_small_files() {
    let empty = 0;
    let one_mebibyte = 1024 * 1024;
    let default_part = crate::multipart::DEFAULT_PART_SIZE;
    let exactly_at_limit = SINGLE_PUT_OBJECT_MAX_SIZE;

    assert!(!S3Client::should_use_multipart(empty));
    assert!(!S3Client::should_use_multipart(one_mebibyte));
    assert!(!S3Client::should_use_multipart(default_part));
    assert!(!S3Client::should_use_multipart(exactly_at_limit));
}
4155
4156 #[tokio::test]
4157 async fn delete_object_with_force_delete_sets_rustfs_header() {
4158 let (client, request_receiver) = test_s3_client(None);
4159 let path = RemotePath::new("test", "bucket", "key.txt");
4160
4161 let _ = client
4162 .delete_object_with_options(&path, DeleteRequestOptions { force_delete: true })
4163 .await;
4164
4165 let request = request_receiver.expect_request();
4166 assert_eq!(request.headers().get("x-rustfs-force-delete"), Some("true"));
4167 }
4168
4169 #[tokio::test]
4170 async fn custom_headers_are_added_before_sending_sdk_requests() {
4171 let (client, request_receiver) = test_s3_client_with_endpoint_and_headers(
4172 "https://example.com",
4173 None,
4174 vec![RequestHeader {
4175 name: "x-amz-bucket-encrypt-enabled".to_string(),
4176 value: "1".to_string(),
4177 }],
4178 );
4179 let path = RemotePath::new("test", "bucket", "key.txt");
4180
4181 let _ = client.delete_object(&path).await;
4182
4183 let request = request_receiver.expect_request();
4184 assert_eq!(
4185 request.headers().get("x-amz-bucket-encrypt-enabled"),
4186 Some("1")
4187 );
4188 assert!(
4189 request
4190 .headers()
4191 .get("authorization")
4192 .expect("authorization header")
4193 .contains("x-amz-bucket-encrypt-enabled")
4194 );
4195 }
4196
4197 #[tokio::test]
4198 async fn custom_headers_are_not_required_by_presigned_urls() {
4199 let (client, _request_receiver) = test_s3_client_with_endpoint_and_headers(
4200 "https://example.com",
4201 None,
4202 vec![RequestHeader {
4203 name: "x-amz-bucket-encrypt-enabled".to_string(),
4204 value: "1".to_string(),
4205 }],
4206 );
4207 let path = RemotePath::new("test", "bucket", "key.txt");
4208
4209 let url = client
4210 .presign_get(&path, 3600)
4211 .await
4212 .expect("presign get should succeed");
4213
4214 assert!(!url.contains("x-amz-bucket-encrypt-enabled"));
4215 }
4216
4217 #[tokio::test]
4218 async fn custom_headers_are_added_to_xml_requests_before_signing() {
4219 let (endpoint, request_receiver, server_handle) = start_xml_test_server();
4220 let (client, _sdk_request_receiver) = test_s3_client_with_endpoint_and_headers(
4221 &endpoint,
4222 None,
4223 vec![RequestHeader {
4224 name: "x-amz-bucket-encrypt-enabled".to_string(),
4225 value: "1".to_string(),
4226 }],
4227 );
4228 let url = client
4229 .replication_url("bucket")
4230 .expect("replication URL should build");
4231
4232 let response = client
4233 .xml_request(
4234 Method::PUT,
4235 url,
4236 Some("application/xml"),
4237 Some(b"<xml/>".to_vec()),
4238 )
4239 .await
4240 .expect("xml request should succeed");
4241
4242 assert_eq!(response, "ok");
4243 let request = request_receiver
4244 .recv_timeout(Duration::from_secs(5))
4245 .expect("server should capture XML request");
4246 assert_eq!(request.method, "PUT");
4247 assert_eq!(request.target, "/bucket?replication=");
4248 assert_eq!(
4249 header_value(&request.headers, "x-amz-bucket-encrypt-enabled"),
4250 Some("1")
4251 );
4252 assert!(
4253 header_value(&request.headers, "authorization")
4254 .expect("authorization header")
4255 .contains("x-amz-bucket-encrypt-enabled")
4256 );
4257 server_handle.join().expect("server thread should finish");
4258 }
4259
4260 #[tokio::test]
4261 async fn delete_object_without_force_delete_omits_rustfs_header() {
4262 let (client, request_receiver) = test_s3_client(None);
4263 let path = RemotePath::new("test", "bucket", "key.txt");
4264
4265 let _ = client
4266 .delete_object_with_options(&path, DeleteRequestOptions::default())
4267 .await;
4268
4269 let request = request_receiver.expect_request();
4270 assert!(request.headers().get("x-rustfs-force-delete").is_none());
4271 }
4272
4273 #[tokio::test]
4274 async fn put_object_applies_sse_s3_headers() {
4275 let (client, request_receiver) = test_s3_client(None);
4276 let path = RemotePath::new("test", "bucket", "file.txt");
4277
4278 client
4279 .put_object(
4280 &path,
4281 b"payload".to_vec(),
4282 Some("text/plain"),
4283 Some(&ObjectEncryptionRequest::SseS3),
4284 )
4285 .await
4286 .expect("put object");
4287
4288 let request = request_receiver.expect_request();
4289 assert_eq!(
4290 request.headers().get("x-amz-server-side-encryption"),
4291 Some("AES256")
4292 );
4293 }
4294
4295 #[tokio::test]
4296 async fn copy_object_applies_sse_kms_headers() {
4297 let response = http::Response::builder()
4298 .status(500)
4299 .header("x-amz-error-code", "InternalError")
4300 .body(SdkBody::from(
4301 r#"<?xml version="1.0" encoding="UTF-8"?>
4302<Error>
4303 <Code>InternalError</Code>
4304 <Message>Something went wrong.</Message>
4305</Error>"#,
4306 ))
4307 .expect("build copy object response");
4308 let (client, request_receiver) = test_s3_client(Some(response));
4309 let src = RemotePath::new("test", "bucket", "src.txt");
4310 let dst = RemotePath::new("test", "bucket", "dst.txt");
4311
4312 let _ = client
4313 .copy_object(
4314 &src,
4315 &dst,
4316 Some(&ObjectEncryptionRequest::SseKms {
4317 key_id: "kms-key".to_string(),
4318 }),
4319 )
4320 .await;
4321
4322 let request = request_receiver.expect_request();
4323 assert_eq!(
4324 request.headers().get("x-amz-server-side-encryption"),
4325 Some("aws:kms")
4326 );
4327 assert_eq!(
4328 request
4329 .headers()
4330 .get("x-amz-server-side-encryption-aws-kms-key-id"),
4331 Some("kms-key")
4332 );
4333 }
4334
4335 #[tokio::test]
4336 async fn delete_object_wrapper_uses_default_options_without_rustfs_header() {
4337 let (client, request_receiver) = test_s3_client(None);
4338 let path = RemotePath::new("test", "bucket", "key.txt");
4339
4340 let _ = ObjectStore::delete_object(&client, &path).await;
4341
4342 let request = request_receiver.expect_request();
4343 assert!(request.headers().get("x-rustfs-force-delete").is_none());
4344 }
4345
4346 #[tokio::test]
4347 async fn delete_object_with_options_maps_missing_keys_to_not_found() {
4348 let response = http::Response::builder()
4349 .status(404)
4350 .header("x-amz-error-code", "NoSuchKey")
4351 .body(SdkBody::from(
4352 r#"<?xml version="1.0" encoding="UTF-8"?>
4353<Error>
4354 <Code>NoSuchKey</Code>
4355 <Message>The specified key does not exist.</Message>
4356</Error>"#,
4357 ))
4358 .expect("build delete object response");
4359 let (client, _request_receiver) = test_s3_client(Some(response));
4360 let path = RemotePath::new("test", "bucket", "missing.txt");
4361
4362 let result = client
4363 .delete_object_with_options(&path, DeleteRequestOptions::default())
4364 .await;
4365
4366 match result {
4367 Err(Error::NotFound(message)) => assert_eq!(message, path.to_string()),
4368 other => panic!("Expected NotFound for missing key, got: {other:?}"),
4369 }
4370 }
4371
4372 #[tokio::test]
4373 async fn delete_object_with_options_maps_other_failures_to_network() {
4374 let response = http::Response::builder()
4375 .status(500)
4376 .header("x-amz-error-code", "InternalError")
4377 .body(SdkBody::from(
4378 r#"<?xml version="1.0" encoding="UTF-8"?>
4379<Error>
4380 <Code>InternalError</Code>
4381 <Message>Something went wrong.</Message>
4382</Error>"#,
4383 ))
4384 .expect("build delete object response");
4385 let (client, _request_receiver) = test_s3_client(Some(response));
4386 let path = RemotePath::new("test", "bucket", "key.txt");
4387
4388 let result = client
4389 .delete_object_with_options(&path, DeleteRequestOptions::default())
4390 .await;
4391
4392 match result {
4393 Err(Error::Network(message)) => assert!(message.contains("InternalError")),
4394 other => panic!("Expected Network for delete failure, got: {other:?}"),
4395 }
4396 }
4397
4398 #[tokio::test]
4399 async fn list_object_versions_page_preserves_markers_and_delete_markers() {
4400 let response = http::Response::builder()
4401 .status(200)
4402 .body(SdkBody::from(
4403 r#"<?xml version="1.0" encoding="UTF-8"?>
4404<ListVersionsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
4405 <Name>bucket</Name>
4406 <Prefix>logs/</Prefix>
4407 <KeyMarker></KeyMarker>
4408 <VersionIdMarker></VersionIdMarker>
4409 <NextKeyMarker>logs/c.txt</NextKeyMarker>
4410 <NextVersionIdMarker>v3</NextVersionIdMarker>
4411 <MaxKeys>25</MaxKeys>
4412 <IsTruncated>true</IsTruncated>
4413 <Version>
4414 <Key>logs/a.txt</Key>
4415 <VersionId>v1</VersionId>
4416 <IsLatest>true</IsLatest>
4417 <LastModified>2026-04-29T11:22:33.000Z</LastModified>
4418 <ETag>"etag-a"</ETag>
4419 <Size>12</Size>
4420 <StorageClass>STANDARD</StorageClass>
4421 </Version>
4422 <DeleteMarker>
4423 <Key>logs/b.txt</Key>
4424 <VersionId>v2</VersionId>
4425 <IsLatest>false</IsLatest>
4426 <LastModified>2026-04-28T10:20:30.000Z</LastModified>
4427 <Owner>
4428 <ID>owner</ID>
4429 </Owner>
4430 </DeleteMarker>
4431</ListVersionsResult>"#,
4432 ))
4433 .expect("build list object versions response");
4434 let (client, request_receiver) = test_s3_client(Some(response));
4435 let path = RemotePath::new("test", "bucket", "logs/");
4436
4437 let result = client
4438 .list_object_versions_page(&path, Some(25))
4439 .await
4440 .expect("list object versions page");
4441
4442 let request = request_receiver.expect_request();
4443 let uri = request.uri().to_string();
4444 assert!(
4445 uri.starts_with("https://example.com/bucket/?"),
4446 "unexpected URI: {uri}"
4447 );
4448 assert!(
4449 uri.contains("versions"),
4450 "expected versions subresource: {uri}"
4451 );
4452 assert!(
4453 uri.contains("prefix=logs%2F"),
4454 "expected prefix query: {uri}"
4455 );
4456 assert!(
4457 uri.contains("max-keys=25"),
4458 "expected max-keys query: {uri}"
4459 );
4460
4461 assert!(result.truncated);
4462 assert_eq!(result.continuation_token.as_deref(), Some("logs/c.txt"));
4463 assert_eq!(result.version_id_marker.as_deref(), Some("v3"));
4464 assert_eq!(result.items.len(), 2);
4465
4466 let version = &result.items[0];
4467 assert_eq!(version.key, "logs/a.txt");
4468 assert_eq!(version.version_id, "v1");
4469 assert!(version.is_latest);
4470 assert!(!version.is_delete_marker);
4471 assert_eq!(version.size_bytes, Some(12));
4472 assert_eq!(version.etag.as_deref(), Some("etag-a"));
4473
4474 let delete_marker = &result.items[1];
4475 assert_eq!(delete_marker.key, "logs/b.txt");
4476 assert_eq!(delete_marker.version_id, "v2");
4477 assert!(!delete_marker.is_latest);
4478 assert!(delete_marker.is_delete_marker);
4479 assert_eq!(delete_marker.size_bytes, None);
4480 assert_eq!(delete_marker.etag, None);
4481 }
4482
4483 #[tokio::test]
4484 async fn list_object_versions_page_maps_missing_bucket_to_not_found() {
4485 let response = http::Response::builder()
4486 .status(404)
4487 .header("x-amz-error-code", "NoSuchBucket")
4488 .body(SdkBody::from(
4489 r#"<?xml version="1.0" encoding="UTF-8"?>
4490<Error>
4491 <Code>NoSuchBucket</Code>
4492 <Message>The specified bucket does not exist.</Message>
4493</Error>"#,
4494 ))
4495 .expect("build missing bucket response");
4496 let (client, _request_receiver) = test_s3_client(Some(response));
4497 let path = RemotePath::new("test", "missing-bucket", "");
4498
4499 let result = client.list_object_versions_page(&path, Some(1000)).await;
4500
4501 match result {
4502 Err(Error::NotFound(message)) => {
4503 assert_eq!(message, "Bucket not found: missing-bucket")
4504 }
4505 other => panic!("Expected NotFound for missing bucket, got: {other:?}"),
4506 }
4507 }
4508
4509 #[tokio::test]
4510 async fn list_object_versions_page_maps_not_found_code_to_not_found() {
4511 let response = http::Response::builder()
4512 .status(404)
4513 .header("x-amz-error-code", "NotFound")
4514 .body(SdkBody::from(
4515 r#"<?xml version="1.0" encoding="UTF-8"?>
4516<Error>
4517 <Code>NotFound</Code>
4518 <Message>The specified bucket does not exist.</Message>
4519</Error>"#,
4520 ))
4521 .expect("build not found bucket response");
4522 let (client, _request_receiver) = test_s3_client(Some(response));
4523 let path = RemotePath::new("test", "missing-bucket", "");
4524
4525 let result = client.list_object_versions_page(&path, Some(1000)).await;
4526
4527 match result {
4528 Err(Error::NotFound(message)) => {
4529 assert_eq!(message, "Bucket not found: missing-bucket")
4530 }
4531 other => panic!("Expected NotFound for NotFound list versions error, got: {other:?}"),
4532 }
4533 }
4534
4535 #[tokio::test]
4536 async fn list_object_versions_page_maps_other_failures_to_network() {
4537 let response = http::Response::builder()
4538 .status(500)
4539 .header("x-amz-error-code", "InternalError")
4540 .body(SdkBody::from(
4541 r#"<?xml version="1.0" encoding="UTF-8"?>
4542<Error>
4543 <Code>InternalError</Code>
4544 <Message>Something went wrong.</Message>
4545</Error>"#,
4546 ))
4547 .expect("build internal error response");
4548 let (client, _request_receiver) = test_s3_client(Some(response));
4549 let path = RemotePath::new("test", "bucket", "");
4550
4551 let result = client.list_object_versions_page(&path, Some(1000)).await;
4552
4553 match result {
4554 Err(Error::Network(message)) => assert!(message.contains("InternalError")),
4555 other => panic!("Expected Network for list versions failure, got: {other:?}"),
4556 }
4557 }
4558
4559 #[tokio::test]
4560 async fn list_buckets_preserves_service_error_code() {
4561 let response = http::Response::builder()
4562 .status(403)
4563 .header("x-amz-error-code", "InvalidAccessKeyId")
4564 .body(SdkBody::from(
4565 r#"<?xml version="1.0" encoding="UTF-8"?>
4566<Error>
4567 <Code>InvalidAccessKeyId</Code>
4568 <Message>The AWS access key Id you provided does not exist in our records.</Message>
4569</Error>"#,
4570 ))
4571 .expect("build list buckets response");
4572 let (client, _request_receiver) = test_s3_client(Some(response));
4573
4574 let result = client.list_buckets().await;
4575
4576 match result {
4577 Err(Error::Network(message)) => assert!(message.contains("InvalidAccessKeyId")),
4578 other => panic!("Expected Network for list buckets failure, got: {other:?}"),
4579 }
4580 }
4581
4582 #[tokio::test]
4583 async fn create_bucket_preserves_service_error_code() {
4584 let response = http::Response::builder()
4585 .status(403)
4586 .header("x-amz-error-code", "InvalidAccessKeyId")
4587 .body(SdkBody::from(
4588 r#"<?xml version="1.0" encoding="UTF-8"?>
4589<Error>
4590 <Code>InvalidAccessKeyId</Code>
4591 <Message>The AWS access key Id you provided does not exist in our records.</Message>
4592</Error>"#,
4593 ))
4594 .expect("build create bucket response");
4595 let (client, _request_receiver) = test_s3_client(Some(response));
4596
4597 let result = client.create_bucket("bucket").await;
4598
4599 match result {
4600 Err(Error::Network(message)) => assert!(message.contains("InvalidAccessKeyId")),
4601 other => panic!("Expected Network for create bucket failure, got: {other:?}"),
4602 }
4603 }
4604
4605 #[tokio::test]
4606 async fn delete_bucket_maps_bucket_not_empty_to_conflict() {
4607 let response = http::Response::builder()
4608 .status(409)
4609 .header("x-amz-error-code", "BucketNotEmpty")
4610 .body(SdkBody::from(
4611 r#"<?xml version="1.0" encoding="UTF-8"?>
4612<Error>
4613 <Code>BucketNotEmpty</Code>
4614 <Message>The bucket you tried to delete is not empty.</Message>
4615</Error>"#,
4616 ))
4617 .expect("build delete bucket response");
4618 let (client, _request_receiver) = test_s3_client(Some(response));
4619
4620 let result = client.delete_bucket("bucket").await;
4621
4622 match result {
4623 Err(Error::Conflict(message)) => assert!(message.contains("BucketNotEmpty")),
4624 other => panic!("Expected Conflict for non-empty bucket, got: {other:?}"),
4625 }
4626 }
4627
4628 #[tokio::test]
4629 async fn delete_bucket_maps_missing_bucket_to_not_found() {
4630 let response = http::Response::builder()
4631 .status(404)
4632 .header("x-amz-error-code", "NoSuchBucket")
4633 .body(SdkBody::from(
4634 r#"<?xml version="1.0" encoding="UTF-8"?>
4635<Error>
4636 <Code>NoSuchBucket</Code>
4637 <Message>The specified bucket does not exist.</Message>
4638</Error>"#,
4639 ))
4640 .expect("build missing bucket response");
4641 let (client, _request_receiver) = test_s3_client(Some(response));
4642
4643 let result = client.delete_bucket("missing-bucket").await;
4644
4645 match result {
4646 Err(Error::NotFound(message)) => {
4647 assert_eq!(message, "Bucket not found: missing-bucket")
4648 }
4649 other => panic!("Expected NotFound for missing bucket, got: {other:?}"),
4650 }
4651 }
4652
4653 #[tokio::test]
4654 async fn delete_bucket_maps_not_found_code_to_not_found() {
4655 let response = http::Response::builder()
4656 .status(404)
4657 .header("x-amz-error-code", "NotFound")
4658 .body(SdkBody::from(
4659 r#"<?xml version="1.0" encoding="UTF-8"?>
4660<Error>
4661 <Code>NotFound</Code>
4662 <Message>The specified bucket does not exist.</Message>
4663</Error>"#,
4664 ))
4665 .expect("build not found bucket response");
4666 let (client, _request_receiver) = test_s3_client(Some(response));
4667
4668 let result = client.delete_bucket("missing-bucket").await;
4669
4670 match result {
4671 Err(Error::NotFound(message)) => {
4672 assert_eq!(message, "Bucket not found: missing-bucket")
4673 }
4674 other => panic!("Expected NotFound for NotFound delete bucket error, got: {other:?}"),
4675 }
4676 }
4677
4678 #[tokio::test]
4679 async fn delete_bucket_maps_other_failures_to_network() {
4680 let response = http::Response::builder()
4681 .status(500)
4682 .header("x-amz-error-code", "InternalError")
4683 .body(SdkBody::from(
4684 r#"<?xml version="1.0" encoding="UTF-8"?>
4685<Error>
4686 <Code>InternalError</Code>
4687 <Message>Something went wrong.</Message>
4688</Error>"#,
4689 ))
4690 .expect("build delete bucket response");
4691 let (client, _request_receiver) = test_s3_client(Some(response));
4692
4693 let result = client.delete_bucket("bucket").await;
4694
4695 match result {
4696 Err(Error::Network(message)) => assert!(message.contains("InternalError")),
4697 other => panic!("Expected Network for delete bucket failure, got: {other:?}"),
4698 }
4699 }
4700
4701 #[tokio::test]
4702 async fn delete_objects_with_force_delete_sets_rustfs_header() {
4703 let response = http::Response::builder()
4704 .status(200)
4705 .body(SdkBody::from(
4706 r#"<?xml version="1.0" encoding="UTF-8"?>
4707<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />"#,
4708 ))
4709 .expect("build delete objects response");
4710 let (client, request_receiver) = test_s3_client(Some(response));
4711
4712 let _ = client
4713 .delete_objects_with_options(
4714 "bucket",
4715 vec!["key.txt".to_string()],
4716 DeleteRequestOptions { force_delete: true },
4717 )
4718 .await;
4719
4720 let request = request_receiver.expect_request();
4721 assert_eq!(request.headers().get("x-rustfs-force-delete"), Some("true"));
4722 }
4723
4724 #[tokio::test]
4725 async fn delete_objects_without_force_delete_omits_rustfs_header() {
4726 let response = http::Response::builder()
4727 .status(200)
4728 .body(SdkBody::from(
4729 r#"<?xml version="1.0" encoding="UTF-8"?>
4730<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />"#,
4731 ))
4732 .expect("build delete objects response");
4733 let (client, request_receiver) = test_s3_client(Some(response));
4734
4735 let _ = client
4736 .delete_objects_with_options(
4737 "bucket",
4738 vec!["key.txt".to_string()],
4739 DeleteRequestOptions::default(),
4740 )
4741 .await;
4742
4743 let request = request_receiver.expect_request();
4744 assert!(request.headers().get("x-rustfs-force-delete").is_none());
4745 }
4746
4747 #[tokio::test]
4748 async fn delete_objects_wrapper_uses_default_options_without_rustfs_header() {
4749 let response = http::Response::builder()
4750 .status(200)
4751 .body(SdkBody::from(
4752 r#"<?xml version="1.0" encoding="UTF-8"?>
4753<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />"#,
4754 ))
4755 .expect("build delete objects response");
4756 let (client, request_receiver) = test_s3_client(Some(response));
4757
4758 let _ = ObjectStore::delete_objects(&client, "bucket", vec!["key.txt".to_string()]).await;
4759
4760 let request = request_receiver.expect_request();
4761 assert!(request.headers().get("x-rustfs-force-delete").is_none());
4762 }
4763
4764 #[tokio::test]
4765 async fn delete_objects_with_empty_keys_skips_http_request() {
4766 let (client, request_receiver) = test_s3_client(None);
4767
4768 let deleted = client
4769 .delete_objects_with_options("bucket", Vec::new(), DeleteRequestOptions::default())
4770 .await
4771 .expect("empty delete should succeed");
4772
4773 assert!(deleted.is_empty());
4774 request_receiver.expect_no_request();
4775 }
4776
4777 #[tokio::test]
4778 async fn delete_objects_with_partial_errors_returns_deleted_keys() {
4779 let response = http::Response::builder()
4780 .status(200)
4781 .body(SdkBody::from(
4782 r#"<?xml version="1.0" encoding="UTF-8"?>
4783<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
4784 <Deleted>
4785 <Key>kept.txt</Key>
4786 </Deleted>
4787 <Error>
4788 <Key>failed.txt</Key>
4789 <Code>AccessDenied</Code>
4790 <Message>Access Denied</Message>
4791 </Error>
4792</DeleteResult>"#,
4793 ))
4794 .expect("build partial delete response");
4795 let (client, request_receiver) = test_s3_client(Some(response));
4796
4797 let deleted = client
4798 .delete_objects_with_options(
4799 "bucket",
4800 vec!["kept.txt".to_string(), "failed.txt".to_string()],
4801 DeleteRequestOptions::default(),
4802 )
4803 .await
4804 .expect("partial delete should still return deleted keys");
4805
4806 let request = request_receiver.expect_request();
4807 assert_eq!(request.uri(), "https://example.com/bucket/?delete");
4808 assert_eq!(deleted, vec!["kept.txt".to_string()]);
4809 }
4810
4811 #[tokio::test]
4812 async fn read_next_part_fills_buffer_until_eof() {
4813 use tokio::io::AsyncWriteExt;
4814
4815 let temp_dir = tempfile::tempdir().expect("create temp dir");
4816 let file_path = temp_dir.path().join("payload.bin");
4817 let mut writer = tokio::fs::File::create(&file_path)
4818 .await
4819 .expect("create temp file");
4820 writer
4821 .write_all(b"abcdefghij")
4822 .await
4823 .expect("write temp file");
4824 writer.flush().await.expect("flush temp file");
4825 drop(writer);
4826
4827 let mut reader = tokio::fs::File::open(&file_path)
4828 .await
4829 .expect("open temp file");
4830 let mut buffer = vec![0u8; 8];
4831
4832 let first = S3Client::read_next_part(&mut reader, &file_path, &mut buffer)
4833 .await
4834 .expect("first read");
4835 assert_eq!(first, 8);
4836 assert_eq!(&buffer[..first], b"abcdefgh");
4837
4838 let second = S3Client::read_next_part(&mut reader, &file_path, &mut buffer)
4839 .await
4840 .expect("second read");
4841 assert_eq!(second, 2);
4842 assert_eq!(&buffer[..second], b"ij");
4843
4844 let third = S3Client::read_next_part(&mut reader, &file_path, &mut buffer)
4845 .await
4846 .expect("third read");
4847 assert_eq!(third, 0);
4848 }
4849}