1use async_trait::async_trait;
6use aws_credential_types::Credentials;
7use aws_sdk_s3::error::ProvideErrorMetadata;
8use aws_sigv4::http_request::{
9 SignableBody, SignableRequest, SignatureLocation, SigningSettings, sign,
10};
11use aws_sigv4::sign::v4;
12use aws_smithy_runtime_api::box_error::BoxError;
13use aws_smithy_runtime_api::client::http::{
14 HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpConnector,
15};
16use aws_smithy_runtime_api::client::interceptors::Intercept;
17use aws_smithy_runtime_api::client::interceptors::context::BeforeTransmitInterceptorContextMut;
18use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
19use aws_smithy_runtime_api::client::result::ConnectorError;
20use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
21use aws_smithy_runtime_api::http::{Response, StatusCode};
22use aws_smithy_types::body::SdkBody;
23use aws_smithy_types::config_bag::ConfigBag;
24use bytes::Bytes;
25use jiff::Timestamp;
26use quick_xml::de::from_str as from_xml_str;
27use rc_core::{
28 Alias, BucketNotification, Capabilities, CorsRule, Error, LifecycleRule, ListOptions,
29 ListResult, NotificationTarget, ObjectInfo, ObjectStore, ObjectVersion,
30 ObjectVersionListResult, RemotePath, ReplicationConfiguration, RequestHeader, Result,
31 SelectOptions, global_request_headers,
32};
33use reqwest::Method;
34use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
35use serde::Deserialize;
36use sha2::{Digest, Sha256};
37use std::collections::HashMap;
38use tokio::io::AsyncReadExt;
39use tokio::io::AsyncWrite;
40
/// Size threshold compared against a file's size by `S3Client::should_use_multipart`;
/// defined as the multipart module's default part size.
const SINGLE_PUT_OBJECT_MAX_SIZE: u64 = crate::multipart::DEFAULT_PART_SIZE;
/// Service name passed to the SigV4 signer when signing raw XML requests.
const S3_SERVICE_NAME: &str = "s3";
/// Value written into the `xmlns` attribute of generated `<ReplicationConfiguration>` documents.
const S3_REPLICATION_XML_NAMESPACE: &str = "http://s3.amazonaws.com/doc/2006-03-01/";
/// Header set to `"true"` on delete requests when `DeleteRequestOptions::force_delete` is enabled.
const RUSTFS_FORCE_DELETE_HEADER: &str = "x-rustfs-force-delete";
47
/// Coarse classification of a bucket-policy failure.
///
/// NOTE(review): no user of this enum is visible in this part of the file; the variant
/// descriptions below are inferred from the names only and should be confirmed against callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BucketPolicyErrorKind {
    /// Presumably: the bucket exists but has no policy attached.
    MissingPolicy,
    /// Presumably: the bucket itself does not exist.
    MissingBucket,
    /// Any failure that is not one of the two cases above.
    Other,
}
54
/// HTTP connector for the AWS SDK that sends requests through a `reqwest::Client`.
///
/// `S3Client::new` installs it only when the alias sets `insecure` or a `ca_bundle`.
#[derive(Debug, Clone)]
struct ReqwestConnector {
    /// Underlying reqwest client carrying the TLS settings chosen at construction.
    client: reqwest::Client,
}
61
62impl ReqwestConnector {
63 async fn new(insecure: bool, ca_bundle: Option<&str>) -> Result<Self> {
64 let client = build_reqwest_client(insecure, ca_bundle).await?;
65 Ok(Self { client })
66 }
67}
68
69async fn build_reqwest_client(insecure: bool, ca_bundle: Option<&str>) -> Result<reqwest::Client> {
70 let mut builder = reqwest::Client::builder().danger_accept_invalid_certs(insecure);
74
75 if let Some(bundle_path) = ca_bundle {
76 let pem = tokio::fs::read(bundle_path).await.map_err(|e| {
78 Error::Network(format!("Failed to read CA bundle '{bundle_path}': {e}"))
79 })?;
80 let cert = reqwest::Certificate::from_pem(&pem)
81 .map_err(|e| Error::Network(format!("Invalid CA bundle '{bundle_path}': {e}")))?;
82 builder = builder.add_root_certificate(cert);
83 }
84
85 let client = builder
86 .build()
87 .map_err(|e| Error::Network(format!("Failed to build HTTP client: {e}")))?;
88 Ok(client)
89}
90
91fn force_path_style_for_alias(alias: &Alias) -> bool {
92 match alias.bucket_lookup.as_str() {
93 "path" => true,
94 "dns" => false,
95 "auto" => !is_aliyun_oss_service_endpoint(&alias.endpoint),
96 _ => true,
97 }
98}
99
100fn is_aliyun_oss_service_endpoint(endpoint: &str) -> bool {
101 let Ok(url) = reqwest::Url::parse(endpoint.trim_end_matches('/')) else {
102 return false;
103 };
104
105 let Some(host) = url.host_str() else {
106 return false;
107 };
108
109 host.strip_suffix(".aliyuncs.com")
110 .and_then(|host| host.split('.').next())
111 .is_some_and(|first_label| first_label.starts_with("oss-"))
112}
113
/// Deserialization target for a replication configuration XML document.
///
/// Field names map to PascalCase element names unless renamed explicitly.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct ReplicationConfigurationXml {
    /// Optional `<Role>` element.
    role: Option<String>,
    /// `<Rule>` elements; empty when none are present.
    #[serde(rename = "Rule", default)]
    rules: Vec<ReplicationRuleXml>,
}
121
/// One `<Rule>` entry of a replication configuration document.
///
/// Every field is optional; `parse_replication_configuration_xml` fills in defaults.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct ReplicationRuleXml {
    /// `<ID>` element.
    #[serde(rename = "ID")]
    id: Option<String>,
    /// `<Priority>` element.
    priority: Option<i32>,
    /// `<Status>` element, compared case-insensitively against "enabled" by the parser.
    status: Option<String>,
    /// `<Prefix>` directly under the rule; used only when the filter yields no prefix.
    #[serde(rename = "Prefix")]
    legacy_prefix: Option<String>,
    /// `<Filter>` element.
    filter: Option<ReplicationFilterXml>,
    /// `<Destination>` element.
    destination: Option<ReplicationDestinationXml>,
    /// `<DeleteMarkerReplication>` element.
    delete_marker_replication: Option<ReplicationStatusXml>,
    /// `<ExistingObjectReplication>` element.
    existing_object_replication: Option<ReplicationStatusXml>,
    /// `<DeleteReplication>` element.
    delete_replication: Option<ReplicationStatusXml>,
}
137
/// `<Filter>` element of a replication rule.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct ReplicationFilterXml {
    /// `<Prefix>` directly under the filter; takes precedence over the `<And>` prefix.
    prefix: Option<String>,
    /// Single `<Tag>` directly under the filter; takes precedence over the `<And>` tags.
    tag: Option<TagXml>,
    /// `<And>` element combining a prefix and/or several tags.
    and: Option<ReplicationAndXml>,
}
145
/// `<And>` element inside a replication rule filter.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct ReplicationAndXml {
    /// `<Prefix>` element.
    prefix: Option<String>,
    /// `<Tag>` elements; empty when none are present.
    #[serde(rename = "Tag", default)]
    tags: Vec<TagXml>,
}
153
/// `<Tag>` element; the tag helpers skip entries where either part is missing.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct TagXml {
    /// `<Key>` element.
    key: Option<String>,
    /// `<Value>` element.
    value: Option<String>,
}
160
/// `<Destination>` element of a replication rule.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct ReplicationDestinationXml {
    /// `<Bucket>` element; stored as `bucket_arn` in the core model.
    bucket: Option<String>,
    /// `<StorageClass>` element.
    storage_class: Option<String>,
}
167
/// Wrapper element whose only child is `<Status>`; shared by the per-rule replication toggles.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct ReplicationStatusXml {
    /// `<Status>` element.
    status: Option<String>,
}
173
/// Deserialization target for a bucket CORS configuration XML document.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct CorsConfigurationXml {
    /// `<CORSRule>` elements; empty when none are present.
    #[serde(rename = "CORSRule", default)]
    rules: Vec<CorsRuleXml>,
}
180
/// One `<CORSRule>` entry of a CORS configuration document.
///
/// The list fields default to empty vectors when the corresponding elements are absent.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct CorsRuleXml {
    /// `<ID>` element.
    #[serde(rename = "ID")]
    id: Option<String>,
    /// `<AllowedOrigin>` elements.
    #[serde(rename = "AllowedOrigin", default)]
    allowed_origins: Vec<String>,
    /// `<AllowedMethod>` elements.
    #[serde(rename = "AllowedMethod", default)]
    allowed_methods: Vec<String>,
    /// `<AllowedHeader>` elements.
    #[serde(rename = "AllowedHeader", default)]
    allowed_headers: Vec<String>,
    /// `<ExposeHeader>` elements.
    #[serde(rename = "ExposeHeader", default)]
    expose_headers: Vec<String>,
    /// `<MaxAgeSeconds>` element.
    max_age_seconds: Option<i32>,
}
196
197fn parse_replication_status(status: Option<&ReplicationStatusXml>) -> Option<bool> {
198 status
199 .and_then(|value| value.status.as_deref())
200 .map(|value| value.eq_ignore_ascii_case("enabled"))
201}
202
203fn parse_replication_rule_status(status: Option<&str>) -> rc_core::ReplicationRuleStatus {
204 match status {
205 Some(value) if value.eq_ignore_ascii_case("enabled") => {
206 rc_core::ReplicationRuleStatus::Enabled
207 }
208 _ => rc_core::ReplicationRuleStatus::Disabled,
209 }
210}
211
/// Collects borrowed `(key, value)` pairs into an owned map.
///
/// Returns `None` when the iterator produces no pairs. When a key occurs more than once
/// the later value replaces the earlier one.
fn collect_tag_map<'a, I>(tags: I) -> Option<HashMap<String, String>>
where
    I: IntoIterator<Item = (&'a str, &'a str)>,
{
    let mut owned = HashMap::new();
    for (key, value) in tags {
        owned.insert(key.to_string(), value.to_string());
    }
    (!owned.is_empty()).then_some(owned)
}
226
227fn parse_tag_xml(tag: Option<&TagXml>) -> Option<HashMap<String, String>> {
228 collect_tag_map(tag.and_then(|tag| Some((tag.key.as_deref()?, tag.value.as_deref()?))))
229}
230
231fn parse_tag_xmls(tags: &[TagXml]) -> Option<HashMap<String, String>> {
232 collect_tag_map(
233 tags.iter()
234 .filter_map(|tag| Some((tag.key.as_deref()?, tag.value.as_deref()?))),
235 )
236}
237
238fn parse_replication_filter_prefix(filter: Option<&ReplicationFilterXml>) -> Option<String> {
239 filter
240 .and_then(|filter| filter.prefix.clone())
241 .or_else(|| filter.and_then(|filter| filter.and.as_ref()?.prefix.clone()))
242}
243
244fn parse_replication_filter_tags(
245 filter: Option<&ReplicationFilterXml>,
246) -> Option<HashMap<String, String>> {
247 filter
248 .and_then(|filter| parse_tag_xml(filter.tag.as_ref()))
249 .or_else(|| filter.and_then(|filter| parse_tag_xmls(&filter.and.as_ref()?.tags)))
250}
251
/// Returns the map's entries as borrowed `(key, value)` pairs sorted ascending,
/// giving callers a deterministic order independent of hash iteration.
fn sorted_tags(tags: &HashMap<String, String>) -> Vec<(&str, &str)> {
    let mut ordered = Vec::with_capacity(tags.len());
    for (key, value) in tags {
        ordered.push((key.as_str(), value.as_str()));
    }
    ordered.sort_unstable();
    ordered
}
260
261fn append_tag_xml(xml: &mut String, key: &str, value: &str) {
262 xml.push_str("<Tag><Key>");
263 xml.push_str(&xml_escape(key));
264 xml.push_str("</Key><Value>");
265 xml.push_str(&xml_escape(value));
266 xml.push_str("</Value></Tag>");
267}
268
269fn append_replication_filter_xml(
270 xml: &mut String,
271 prefix: Option<&str>,
272 tags: Option<&HashMap<String, String>>,
273) {
274 let Some(tags) = tags.filter(|tags| !tags.is_empty()) else {
275 if let Some(prefix) = prefix {
276 xml.push_str("<Filter><Prefix>");
277 xml.push_str(&xml_escape(prefix));
278 xml.push_str("</Prefix></Filter>");
279 }
280 return;
281 };
282
283 xml.push_str("<Filter>");
284 if prefix.is_some() || tags.len() > 1 {
285 xml.push_str("<And>");
286 if let Some(prefix) = prefix {
287 xml.push_str("<Prefix>");
288 xml.push_str(&xml_escape(prefix));
289 xml.push_str("</Prefix>");
290 }
291 for (key, value) in sorted_tags(tags) {
292 append_tag_xml(xml, key, value);
293 }
294 xml.push_str("</And>");
295 } else if let Some((key, value)) = sorted_tags(tags).into_iter().next() {
296 append_tag_xml(xml, key, value);
297 }
298 xml.push_str("</Filter>");
299}
300
/// Collapses `Some(empty list)` to `None`; any other input is returned unchanged.
fn normalize_optional_strings(values: Option<Vec<String>>) -> Option<Vec<String>> {
    match values {
        Some(items) if !items.is_empty() => Some(items),
        _ => None,
    }
}
304
/// Returns `true` when `error_text` looks like a "no CORS configuration" error.
///
/// Matching is ASCII case-insensitive and substring based: either the error code
/// `NoSuchCORSConfiguration` or the phrase "cors configuration does not exist".
fn is_missing_cors_configuration_error(error_text: &str) -> bool {
    // The longer phrase "the cors configuration does not exist" contains the second needle,
    // so it needs no separate check.
    const NEEDLES: [&str; 2] = [
        "nosuchcorsconfiguration",
        "cors configuration does not exist",
    ];

    let normalized = error_text.to_ascii_lowercase();
    NEEDLES.iter().any(|needle| normalized.contains(needle))
}
311
312fn is_missing_cors_configuration_response(
313 error_code: Option<&str>,
314 status_code: Option<u16>,
315 error_text: &str,
316) -> bool {
317 let error_code = error_code.map(|code| code.to_ascii_lowercase());
318 if matches!(error_code.as_deref(), Some("nosuchcorsconfiguration")) {
319 return true;
320 }
321
322 if !is_missing_cors_configuration_error(error_text) {
323 return false;
324 }
325
326 status_code.is_none_or(|status| status == 404)
327}
328
329fn sdk_cors_rule_to_core(rule: &aws_sdk_s3::types::CorsRule) -> CorsRule {
330 CorsRule {
331 id: rule.id().map(str::to_string),
332 allowed_origins: rule.allowed_origins().to_vec(),
333 allowed_methods: rule.allowed_methods().to_vec(),
334 allowed_headers: normalize_optional_strings(Some(rule.allowed_headers().to_vec())),
335 expose_headers: normalize_optional_strings(Some(rule.expose_headers().to_vec())),
336 max_age_seconds: rule.max_age_seconds(),
337 }
338}
339
340fn core_cors_rule_to_sdk(rule: &CorsRule) -> Result<aws_sdk_s3::types::CorsRule> {
341 aws_sdk_s3::types::CorsRule::builder()
342 .set_id(rule.id.clone())
343 .set_allowed_origins(Some(rule.allowed_origins.clone()))
344 .set_allowed_methods(Some(
345 rule.allowed_methods
346 .iter()
347 .map(|method| method.to_ascii_uppercase())
348 .collect(),
349 ))
350 .set_allowed_headers(normalize_optional_strings(rule.allowed_headers.clone()))
351 .set_expose_headers(normalize_optional_strings(rule.expose_headers.clone()))
352 .set_max_age_seconds(rule.max_age_seconds)
353 .build()
354 .map_err(|e| Error::General(format!("build bucket cors rule: {e}")))
355}
356
357fn parse_cors_configuration_xml(body: &str) -> Result<Vec<CorsRule>> {
358 let config: CorsConfigurationXml =
359 from_xml_str(body).map_err(|e| Error::General(format!("parse bucket cors xml: {e}")))?;
360
361 Ok(config
362 .rules
363 .into_iter()
364 .map(|rule| CorsRule {
365 id: rule.id,
366 allowed_origins: rule.allowed_origins,
367 allowed_methods: rule.allowed_methods,
368 allowed_headers: normalize_optional_strings(Some(rule.allowed_headers)),
369 expose_headers: normalize_optional_strings(Some(rule.expose_headers)),
370 max_age_seconds: rule.max_age_seconds,
371 })
372 .collect())
373}
374
375fn parse_replication_configuration_xml(body: &str) -> Result<ReplicationConfiguration> {
376 let config: ReplicationConfigurationXml = from_xml_str(body)
377 .map_err(|e| Error::General(format!("parse replication config xml: {e}")))?;
378
379 let rules = config
380 .rules
381 .into_iter()
382 .map(|rule| rc_core::ReplicationRule {
383 id: rule.id.unwrap_or_default(),
384 priority: rule.priority.unwrap_or_default(),
385 status: parse_replication_rule_status(rule.status.as_deref()),
386 prefix: parse_replication_filter_prefix(rule.filter.as_ref()).or(rule.legacy_prefix),
387 tags: parse_replication_filter_tags(rule.filter.as_ref()),
388 destination: rc_core::ReplicationDestination {
389 bucket_arn: rule
390 .destination
391 .as_ref()
392 .and_then(|destination| destination.bucket.clone())
393 .unwrap_or_default(),
394 storage_class: rule
395 .destination
396 .and_then(|destination| destination.storage_class),
397 },
398 delete_marker_replication: parse_replication_status(
399 rule.delete_marker_replication.as_ref(),
400 ),
401 existing_object_replication: parse_replication_status(
402 rule.existing_object_replication.as_ref(),
403 ),
404 delete_replication: parse_replication_status(rule.delete_replication.as_ref()),
405 })
406 .collect();
407
408 Ok(ReplicationConfiguration {
409 role: config.role.unwrap_or_default(),
410 rules,
411 })
412}
413
/// Escapes the five XML special characters so `value` can be embedded in element text
/// or attribute values.
///
/// `&`, `<`, `>`, `"` and `'` are replaced by `&amp;`, `&lt;`, `&gt;`, `&quot;` and
/// `&apos;`; every other character is copied unchanged.
fn xml_escape(value: &str) -> String {
    // Single pass: `&` is handled per character, so entities written here are never
    // escaped a second time.
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            '\'' => escaped.push_str("&apos;"),
            other => escaped.push(other),
        }
    }
    escaped
}
422
/// Appends `<tag><Status>Enabled|Disabled</Status></tag>` to `xml`.
///
/// Nothing is written when `enabled` is `None`. `tag` is written verbatim (not escaped).
fn append_replication_status_tag(xml: &mut String, tag: &str, enabled: Option<bool>) {
    let status = match enabled {
        Some(true) => "Enabled",
        Some(false) => "Disabled",
        None => return,
    };
    xml.extend(["<", tag, "><Status>", status, "</Status></", tag, ">"]);
}
435
436fn build_replication_configuration_xml(config: &ReplicationConfiguration) -> String {
437 let mut xml = String::from(r#"<?xml version="1.0" encoding="UTF-8"?>"#);
438 xml.push_str(r#"<ReplicationConfiguration xmlns=""#);
439 xml.push_str(S3_REPLICATION_XML_NAMESPACE);
440 xml.push_str(r#"">"#);
441
442 if !config.role.is_empty() {
443 xml.push_str("<Role>");
444 xml.push_str(&xml_escape(&config.role));
445 xml.push_str("</Role>");
446 }
447
448 for rule in &config.rules {
449 xml.push_str("<Rule>");
450
451 xml.push_str("<Status>");
452 xml.push_str(match rule.status {
453 rc_core::ReplicationRuleStatus::Enabled => "Enabled",
454 rc_core::ReplicationRuleStatus::Disabled => "Disabled",
455 });
456 xml.push_str("</Status>");
457
458 xml.push_str("<Destination><Bucket>");
459 xml.push_str(&xml_escape(&rule.destination.bucket_arn));
460 xml.push_str("</Bucket>");
461 if let Some(storage_class) = &rule.destination.storage_class {
462 xml.push_str("<StorageClass>");
463 xml.push_str(&xml_escape(storage_class));
464 xml.push_str("</StorageClass>");
465 }
466 xml.push_str("</Destination>");
467
468 if !rule.id.is_empty() {
469 xml.push_str("<ID>");
470 xml.push_str(&xml_escape(&rule.id));
471 xml.push_str("</ID>");
472 }
473
474 xml.push_str("<Priority>");
475 xml.push_str(&rule.priority.to_string());
476 xml.push_str("</Priority>");
477
478 append_replication_filter_xml(&mut xml, rule.prefix.as_deref(), rule.tags.as_ref());
479
480 append_replication_status_tag(
481 &mut xml,
482 "ExistingObjectReplication",
483 rule.existing_object_replication,
484 );
485 append_replication_status_tag(
486 &mut xml,
487 "DeleteMarkerReplication",
488 rule.delete_marker_replication,
489 );
490 append_replication_status_tag(&mut xml, "DeleteReplication", rule.delete_replication);
491
492 xml.push_str("</Rule>");
493 }
494
495 xml.push_str("</ReplicationConfiguration>");
496 xml
497}
498
499fn parse_lifecycle_filter_prefix(
500 filter: Option<&aws_sdk_s3::types::LifecycleRuleFilter>,
501) -> Option<String> {
502 filter
503 .and_then(|filter| filter.prefix().map(str::to_string))
504 .or_else(|| filter.and_then(|filter| filter.and()?.prefix().map(str::to_string)))
505}
506
507fn parse_lifecycle_filter_tags(
508 filter: Option<&aws_sdk_s3::types::LifecycleRuleFilter>,
509) -> Option<HashMap<String, String>> {
510 filter
511 .and_then(|filter| collect_tag_map(filter.tag().map(|tag| (tag.key(), tag.value()))))
512 .or_else(|| {
513 filter.and_then(|filter| {
514 collect_tag_map(
515 filter
516 .and()?
517 .tags()
518 .iter()
519 .map(|tag| (tag.key(), tag.value())),
520 )
521 })
522 })
523}
524
525fn build_s3_tag(key: &str, value: &str) -> Result<aws_sdk_s3::types::Tag> {
526 aws_sdk_s3::types::Tag::builder()
527 .key(key)
528 .value(value)
529 .build()
530 .map_err(|error| Error::General(format!("build filter tag: {error}")))
531}
532
533fn build_lifecycle_rule_filter(
534 prefix: Option<&str>,
535 tags: Option<&HashMap<String, String>>,
536) -> Result<Option<aws_sdk_s3::types::LifecycleRuleFilter>> {
537 let Some(tags) = tags.filter(|tags| !tags.is_empty()) else {
538 return Ok(prefix.map(|prefix| {
539 aws_sdk_s3::types::LifecycleRuleFilter::builder()
540 .prefix(prefix)
541 .build()
542 }));
543 };
544
545 let tag_values = sorted_tags(tags)
546 .into_iter()
547 .map(|(key, value)| build_s3_tag(key, value))
548 .collect::<Result<Vec<_>>>()?;
549
550 let filter = if prefix.is_some() || tag_values.len() > 1 {
551 let mut and_builder = aws_sdk_s3::types::LifecycleRuleAndOperator::builder();
552 if let Some(prefix) = prefix {
553 and_builder = and_builder.prefix(prefix);
554 }
555 for tag in tag_values {
556 and_builder = and_builder.tags(tag);
557 }
558 aws_sdk_s3::types::LifecycleRuleFilter::builder()
559 .and(and_builder.build())
560 .build()
561 } else {
562 aws_sdk_s3::types::LifecycleRuleFilter::builder()
563 .tag(
564 tag_values
565 .into_iter()
566 .next()
567 .expect("non-empty tags required to build lifecycle filter"),
568 )
569 .build()
570 };
571
572 Ok(Some(filter))
573}
574
575impl HttpConnector for ReqwestConnector {
576 fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
577 let client = self.client.clone();
578 HttpConnectorFuture::new(async move {
579 let uri = request.uri().to_string();
581 let method_str = request.method().to_string();
582 let headers = request.headers().clone();
583
584 let body_bytes = match request.body().bytes() {
589 Some(b) => Bytes::copy_from_slice(b),
590 None => {
591 return Err(ConnectorError::user(
592 "Streaming request bodies are not supported in insecure/ca_bundle TLS mode; \
593 use in-memory data for uploads with this connector"
594 .into(),
595 ));
596 }
597 };
598
599 let method = reqwest::Method::from_bytes(method_str.as_bytes())
601 .map_err(|e| ConnectorError::user(Box::new(e)))?;
602
603 let url = reqwest::Url::parse(&uri).map_err(|e| ConnectorError::user(Box::new(e)))?;
605
606 let mut req = reqwest::Request::new(method, url);
608
609 for (name, value) in headers.iter() {
611 match (
612 reqwest::header::HeaderName::from_bytes(name.as_bytes()),
613 reqwest::header::HeaderValue::from_bytes(value.as_bytes()),
614 ) {
615 (Ok(header_name), Ok(header_value)) => {
616 req.headers_mut().append(header_name, header_value);
617 }
618 _ => {
619 tracing::warn!("Skipping non-convertible request header: {}", name);
620 }
621 }
622 }
623
624 *req.body_mut() = Some(reqwest::Body::from(body_bytes));
626
627 let resp = client
629 .execute(req)
630 .await
631 .map_err(|e| ConnectorError::io(Box::new(e)))?;
632
633 let status = StatusCode::try_from(resp.status().as_u16())
635 .map_err(|e| ConnectorError::other(Box::new(e), None))?;
636 let resp_headers = resp.headers().clone();
637 let body = resp
638 .bytes()
639 .await
640 .map_err(|e| ConnectorError::io(Box::new(e)))?;
641
642 let mut sdk_response = Response::new(status, SdkBody::from(body));
643 for (name, value) in &resp_headers {
644 match value.to_str() {
645 Ok(value_str) => {
646 sdk_response
647 .headers_mut()
648 .append(name.as_str().to_owned(), value_str.to_owned());
649 }
650 Err(_) => {
651 tracing::warn!("Skipping non-UTF8 response header: {}", name.as_str());
652 }
653 }
654 }
655
656 Ok(sdk_response)
657 })
658 }
659}
660
661impl HttpClient for ReqwestConnector {
662 fn http_connector(
663 &self,
664 _settings: &HttpConnectorSettings,
665 _components: &RuntimeComponents,
666 ) -> SharedHttpConnector {
667 SharedHttpConnector::new(self.clone())
673 }
674}
675
/// S3 client bundle created by `S3Client::new` for a single alias.
pub struct S3Client {
    /// SDK client used for regular operations; carries the custom-header interceptor
    /// when global request headers are configured.
    inner: aws_sdk_s3::Client,
    /// SDK client built from the same configuration but without the custom-header interceptor.
    presign_inner: aws_sdk_s3::Client,
    /// reqwest client built with the alias's `insecure` / `ca_bundle` settings.
    xml_http_client: reqwest::Client,
    /// Alias (endpoint, region, credentials, TLS options) this client was created from.
    alias: Alias,
    /// Headers returned by `global_request_headers()` at construction time.
    request_headers: Vec<RequestHeader>,
}
684
/// Options applied to delete requests; the default leaves every option off.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct DeleteRequestOptions {
    /// When `true`, the delete methods add the `x-rustfs-force-delete: true` header.
    pub force_delete: bool,
}
691
/// SDK interceptor that inserts a fixed set of headers into every request before signing.
#[derive(Debug, Clone)]
struct CustomHeaderInterceptor {
    /// Headers to insert, in order.
    headers: Vec<RequestHeader>,
}
696
697impl Intercept for CustomHeaderInterceptor {
698 fn name(&self) -> &'static str {
699 "CustomHeaderInterceptor"
700 }
701
702 fn modify_before_signing(
703 &self,
704 context: &mut BeforeTransmitInterceptorContextMut<'_>,
705 _runtime_components: &RuntimeComponents,
706 _cfg: &mut ConfigBag,
707 ) -> std::result::Result<(), BoxError> {
708 let request = context.request_mut();
709 for header in &self.headers {
710 request
711 .headers_mut()
712 .try_insert(header.name.clone(), header.value.clone())
713 .map_err(|error| Box::new(error) as BoxError)?;
714 }
715 Ok(())
716 }
717}
718
719impl S3Client {
720 pub async fn new(alias: Alias) -> Result<Self> {
722 let endpoint = alias.endpoint.clone();
723 let region = alias.region.clone();
724 let access_key = alias.access_key.clone();
725 let secret_key = alias.secret_key.clone();
726
727 let credentials = aws_credential_types::Credentials::new(
729 access_key,
730 secret_key,
731 None, None, "rc-static-credentials",
734 );
735
736 let mut config_loader = aws_config::defaults(aws_config::BehaviorVersion::latest())
738 .credentials_provider(credentials)
739 .region(aws_config::Region::new(region))
740 .endpoint_url(&endpoint);
741
742 if alias.insecure || alias.ca_bundle.is_some() {
745 let connector =
746 ReqwestConnector::new(alias.insecure, alias.ca_bundle.as_deref()).await?;
747 config_loader = config_loader.http_client(connector);
748 }
749
750 let xml_http_client =
751 build_reqwest_client(alias.insecure, alias.ca_bundle.as_deref()).await?;
752 let config = config_loader.load().await;
753
754 let request_headers = global_request_headers();
756 let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&config)
757 .force_path_style(force_path_style_for_alias(&alias))
758 .request_checksum_calculation(
761 aws_sdk_s3::config::RequestChecksumCalculation::WhenRequired,
762 )
763 .response_checksum_validation(
764 aws_sdk_s3::config::ResponseChecksumValidation::WhenRequired,
765 );
766
767 let presign_s3_config = s3_config_builder.clone().build();
768
769 if !request_headers.is_empty() {
770 s3_config_builder = s3_config_builder.interceptor(CustomHeaderInterceptor {
771 headers: request_headers.clone(),
772 });
773 }
774
775 let s3_config = s3_config_builder.build();
776
777 let client = aws_sdk_s3::Client::from_conf(s3_config);
778 let presign_client = aws_sdk_s3::Client::from_conf(presign_s3_config);
779
780 Ok(Self {
781 inner: client,
782 presign_inner: presign_client,
783 xml_http_client,
784 alias,
785 request_headers,
786 })
787 }
788
    /// Returns a reference to the underlying AWS SDK S3 client.
    pub fn inner(&self) -> &aws_sdk_s3::Client {
        &self.inner
    }
793
794 pub async fn list_object_versions_page(
796 &self,
797 path: &RemotePath,
798 max_keys: Option<i32>,
799 ) -> Result<ObjectVersionListResult> {
800 let mut builder = self.inner.list_object_versions().bucket(&path.bucket);
801
802 if !path.key.is_empty() {
803 builder = builder.prefix(&path.key);
804 }
805
806 if let Some(max) = max_keys {
807 builder = builder.max_keys(max);
808 }
809
810 let response = builder.send().await.map_err(|e| {
811 let err_str = Self::format_sdk_error(&e);
812 if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
813 Error::NotFound(format!("Bucket not found: {}", path.bucket))
814 } else {
815 Error::Network(err_str)
816 }
817 })?;
818
819 let mut items = Vec::new();
820
821 for v in response.versions() {
822 items.push(ObjectVersion {
823 key: v.key().unwrap_or_default().to_string(),
824 version_id: v.version_id().unwrap_or("null").to_string(),
825 is_latest: v.is_latest().unwrap_or(false),
826 is_delete_marker: false,
827 last_modified: v
828 .last_modified()
829 .and_then(|dt| Timestamp::from_second(dt.secs()).ok()),
830 size_bytes: v.size(),
831 etag: v.e_tag().map(|s| s.trim_matches('"').to_string()),
832 });
833 }
834
835 for m in response.delete_markers() {
836 items.push(ObjectVersion {
837 key: m.key().unwrap_or_default().to_string(),
838 version_id: m.version_id().unwrap_or("null").to_string(),
839 is_latest: m.is_latest().unwrap_or(false),
840 is_delete_marker: true,
841 last_modified: m
842 .last_modified()
843 .and_then(|dt| Timestamp::from_second(dt.secs()).ok()),
844 size_bytes: None,
845 etag: None,
846 });
847 }
848
849 items.sort_by(|a, b| {
850 a.key
851 .cmp(&b.key)
852 .then_with(|| b.last_modified.cmp(&a.last_modified))
853 });
854
855 Ok(ObjectVersionListResult {
856 items,
857 truncated: response.is_truncated().unwrap_or(false),
858 continuation_token: response.next_key_marker().map(ToString::to_string),
859 version_id_marker: response.next_version_id_marker().map(ToString::to_string),
860 })
861 }
862
863 pub async fn get_object_with_progress(
865 &self,
866 path: &RemotePath,
867 mut on_progress: impl FnMut(u64, Option<u64>) + Send,
868 ) -> Result<Vec<u8>> {
869 let response = self
870 .inner
871 .get_object()
872 .bucket(&path.bucket)
873 .key(&path.key)
874 .send()
875 .await
876 .map_err(|e| {
877 let err_str = e.to_string();
878 if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
879 Error::NotFound(path.to_string())
880 } else {
881 Error::Network(err_str)
882 }
883 })?;
884
885 let content_length = response
886 .content_length()
887 .and_then(|length| u64::try_from(length).ok());
888 let mut data = Vec::with_capacity(
889 content_length
890 .and_then(|length| usize::try_from(length).ok())
891 .unwrap_or_default(),
892 );
893 let mut body = response.body;
894 let mut bytes_downloaded = 0u64;
895
896 while let Some(chunk) = body
897 .try_next()
898 .await
899 .map_err(|e| Error::Network(e.to_string()))?
900 {
901 bytes_downloaded += chunk.len() as u64;
902 data.extend_from_slice(&chunk);
903 on_progress(bytes_downloaded, content_length);
904 }
905
906 Ok(data)
907 }
908
909 pub async fn delete_object_with_options(
911 &self,
912 path: &RemotePath,
913 options: DeleteRequestOptions,
914 ) -> Result<()> {
915 let mut request = self
916 .inner
917 .delete_object()
918 .bucket(&path.bucket)
919 .key(&path.key)
920 .customize();
921
922 if options.force_delete {
923 request = request.mutate_request(|request| {
924 request
925 .headers_mut()
926 .insert(RUSTFS_FORCE_DELETE_HEADER, "true");
927 });
928 }
929
930 request.send().await.map_err(|e| {
931 let err_str = Self::format_sdk_error(&e);
932 let is_missing_key = if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &e
933 {
934 let code = service_err.err().code().or_else(|| {
935 service_err
936 .raw()
937 .headers()
938 .get("x-amz-error-code")
939 .and_then(|value| std::str::from_utf8(value.as_bytes()).ok())
940 });
941 matches!(code, Some("NoSuchKey") | Some("NotFound"))
942 || service_err.raw().status().as_u16() == 404
943 } else {
944 err_str.contains("NotFound") || err_str.contains("NoSuchKey")
945 };
946
947 if is_missing_key {
948 Error::NotFound(path.to_string())
949 } else {
950 Error::Network(err_str)
951 }
952 })?;
953
954 Ok(())
955 }
956
957 pub async fn delete_objects_with_options(
959 &self,
960 bucket: &str,
961 keys: Vec<String>,
962 options: DeleteRequestOptions,
963 ) -> Result<Vec<String>> {
964 use aws_sdk_s3::types::{Delete, ObjectIdentifier};
965
966 if keys.is_empty() {
967 return Ok(vec![]);
968 }
969
970 let objects: Vec<ObjectIdentifier> =
971 keys.iter()
972 .map(|key| {
973 ObjectIdentifier::builder().key(key).build().map_err(|e| {
974 Error::General(format!("invalid delete object identifier: {e}"))
975 })
976 })
977 .collect::<Result<Vec<_>>>()?;
978
979 let delete = Delete::builder()
980 .set_objects(Some(objects))
981 .build()
982 .map_err(|e| Error::General(e.to_string()))?;
983
984 let mut request = self
985 .inner
986 .delete_objects()
987 .bucket(bucket)
988 .delete(delete)
989 .customize();
990
991 if options.force_delete {
992 request = request.mutate_request(|request| {
993 request
994 .headers_mut()
995 .insert(RUSTFS_FORCE_DELETE_HEADER, "true");
996 });
997 }
998
999 let response = request
1000 .send()
1001 .await
1002 .map_err(|e| Error::Network(e.to_string()))?;
1003
1004 let deleted: Vec<String> = response
1005 .deleted()
1006 .iter()
1007 .filter_map(|d| d.key().map(|k| k.to_string()))
1008 .collect();
1009
1010 if !response.errors().is_empty() {
1011 let error_keys: Vec<String> = response
1012 .errors()
1013 .iter()
1014 .filter_map(|e| e.key().map(|k| k.to_string()))
1015 .collect();
1016 tracing::warn!("Failed to delete some objects: {:?}", error_keys);
1017 }
1018
1019 Ok(deleted)
1020 }
1021
1022 fn format_sdk_error<E: std::fmt::Display>(error: &aws_sdk_s3::error::SdkError<E>) -> String {
1024 match error {
1025 aws_sdk_s3::error::SdkError::ServiceError(service_err) => {
1026 let err = service_err.err();
1027 let meta = service_err.raw();
1028 let mut msg = format!("Service error: {}", err);
1029 if let Some(code) = meta.headers().get("x-amz-error-code")
1031 && let Ok(code_str) = std::str::from_utf8(code.as_bytes())
1032 {
1033 msg.push_str(&format!(" (code: {})", code_str));
1034 }
1035 msg
1036 }
1037 aws_sdk_s3::error::SdkError::ConstructionFailure(err) => {
1038 format!("Request construction failed: {:?}", err)
1039 }
1040 aws_sdk_s3::error::SdkError::TimeoutError(_) => "Request timeout".to_string(),
1041 aws_sdk_s3::error::SdkError::DispatchFailure(err) => {
1042 format!("Network dispatch error: {:?}", err)
1043 }
1044 aws_sdk_s3::error::SdkError::ResponseError(err) => {
1045 format!("Response error: {:?}", err)
1046 }
1047 _ => error.to_string(),
1048 }
1049 }
1050
    /// Returns `true` when `file_size` is strictly greater than `SINGLE_PUT_OBJECT_MAX_SIZE`.
    fn should_use_multipart(file_size: u64) -> bool {
        file_size > SINGLE_PUT_OBJECT_MAX_SIZE
    }
1054
1055 fn sha256_hash(body: &[u8]) -> String {
1056 let mut hasher = Sha256::new();
1057 hasher.update(body);
1058 hex::encode(hasher.finalize())
1059 }
1060
1061 fn request_host(&self, url: &reqwest::Url) -> Result<String> {
1062 let host = url
1063 .host_str()
1064 .ok_or_else(|| Error::Network("Missing host in request URL".to_string()))?;
1065 Ok(match url.port() {
1066 Some(port) => format!("{host}:{port}"),
1067 None => host.to_string(),
1068 })
1069 }
1070
1071 fn replication_url(&self, bucket: &str) -> Result<reqwest::Url> {
1072 let mut url =
1073 reqwest::Url::parse(self.alias.endpoint.trim_end_matches('/')).map_err(|e| {
1074 Error::Network(format!("Invalid endpoint '{}': {e}", self.alias.endpoint))
1075 })?;
1076
1077 {
1078 let mut segments = url.path_segments_mut().map_err(|_| {
1079 Error::Network(format!(
1080 "Endpoint '{}' does not support path-style bucket operations",
1081 self.alias.endpoint
1082 ))
1083 })?;
1084 segments.pop_if_empty();
1085 segments.push(bucket);
1086 }
1087
1088 url.set_query(Some("replication="));
1089 Ok(url)
1090 }
1091
1092 fn cors_url(&self, bucket: &str) -> Result<reqwest::Url> {
1093 let mut url =
1094 reqwest::Url::parse(self.alias.endpoint.trim_end_matches('/')).map_err(|e| {
1095 Error::Network(format!("Invalid endpoint '{}': {e}", self.alias.endpoint))
1096 })?;
1097
1098 {
1099 let mut segments = url.path_segments_mut().map_err(|_| {
1100 Error::Network(format!(
1101 "Endpoint '{}' does not support path-style bucket operations",
1102 self.alias.endpoint
1103 ))
1104 })?;
1105 segments.pop_if_empty();
1106 segments.push(bucket);
1107 }
1108
1109 url.set_query(Some("cors="));
1110 Ok(url)
1111 }
1112
1113 async fn sign_xml_request(
1114 &self,
1115 method: &Method,
1116 url: &str,
1117 headers: &HeaderMap,
1118 body: &[u8],
1119 ) -> Result<HeaderMap> {
1120 let credentials = Credentials::new(
1121 &self.alias.access_key,
1122 &self.alias.secret_key,
1123 None,
1124 None,
1125 "s3-xml-client",
1126 );
1127
1128 let identity = credentials.into();
1129 let mut signing_settings = SigningSettings::default();
1130 signing_settings.signature_location = SignatureLocation::Headers;
1131
1132 let signing_params = v4::SigningParams::builder()
1133 .identity(&identity)
1134 .region(&self.alias.region)
1135 .name(S3_SERVICE_NAME)
1136 .time(std::time::SystemTime::now())
1137 .settings(signing_settings)
1138 .build()
1139 .map_err(|e| Error::Auth(format!("Failed to build signing params: {e}")))?;
1140
1141 let header_pairs: Vec<(&str, &str)> = headers
1142 .iter()
1143 .filter_map(|(k, v)| v.to_str().ok().map(|v| (k.as_str(), v)))
1144 .collect();
1145
1146 let signable_request = SignableRequest::new(
1147 method.as_str(),
1148 url,
1149 header_pairs.into_iter(),
1150 SignableBody::Bytes(body),
1151 )
1152 .map_err(|e| Error::Auth(format!("Failed to create signable request: {e}")))?;
1153
1154 let (signing_instructions, _) = sign(signable_request, &signing_params.into())
1155 .map_err(|e| Error::Auth(format!("Failed to sign request: {e}")))?
1156 .into_parts();
1157
1158 let mut signed_headers = headers.clone();
1159 for (name, value) in signing_instructions.headers() {
1160 let header_name = HeaderName::try_from(name.to_string())
1161 .map_err(|e| Error::Auth(format!("Invalid header name: {e}")))?;
1162 let header_value = HeaderValue::try_from(value.to_string())
1163 .map_err(|e| Error::Auth(format!("Invalid header value: {e}")))?;
1164 signed_headers.insert(header_name, header_value);
1165 }
1166
1167 Ok(signed_headers)
1168 }
1169
1170 async fn xml_request(
1171 &self,
1172 method: Method,
1173 url: reqwest::Url,
1174 content_type: Option<&str>,
1175 body: Option<Vec<u8>>,
1176 ) -> Result<String> {
1177 let body = body.unwrap_or_default();
1178 let mut headers = HeaderMap::new();
1179 headers.insert(
1180 "x-amz-content-sha256",
1181 HeaderValue::from_str(&Self::sha256_hash(&body))
1182 .map_err(|e| Error::Auth(format!("Invalid content hash header: {e}")))?,
1183 );
1184 headers.insert(
1185 "host",
1186 HeaderValue::from_str(&self.request_host(&url)?)
1187 .map_err(|e| Error::Auth(format!("Invalid host header: {e}")))?,
1188 );
1189
1190 if let Some(content_type) = content_type {
1191 headers.insert(
1192 CONTENT_TYPE,
1193 HeaderValue::from_str(content_type)
1194 .map_err(|e| Error::Auth(format!("Invalid content type header: {e}")))?,
1195 );
1196 }
1197
1198 for header in &self.request_headers {
1199 let name = HeaderName::from_bytes(header.name.as_bytes())
1200 .map_err(|e| Error::Auth(format!("Invalid custom header name: {e}")))?;
1201 let value = HeaderValue::from_str(&header.value)
1202 .map_err(|e| Error::Auth(format!("Invalid custom header value: {e}")))?;
1203 headers.insert(name, value);
1204 }
1205
1206 let signed_headers = self
1207 .sign_xml_request(&method, url.as_str(), &headers, &body)
1208 .await?;
1209
1210 let mut request_builder = self.xml_http_client.request(method, url);
1211 for (name, value) in &signed_headers {
1212 request_builder = request_builder.header(name, value);
1213 }
1214 if !body.is_empty() {
1215 request_builder = request_builder.body(body);
1216 }
1217
1218 let response = request_builder
1219 .send()
1220 .await
1221 .map_err(|e| Error::Network(format!("Request failed: {e}")))?;
1222
1223 let status = response.status();
1224 let text = response
1225 .text()
1226 .await
1227 .map_err(|e| Error::Network(format!("Failed to read response: {e}")))?;
1228
1229 if !status.is_success() {
1230 return Err(Error::Network(format!(
1231 "HTTP {}: {}",
1232 status.as_u16(),
1233 text
1234 )));
1235 }
1236
1237 Ok(text)
1238 }
1239
1240 fn bucket_policy_error_kind(
1241 error_code: Option<&str>,
1242 status_code: Option<u16>,
1243 error_text: &str,
1244 ) -> BucketPolicyErrorKind {
1245 let error_code = error_code.map(|code| code.to_ascii_lowercase());
1246 if matches!(
1247 error_code.as_deref(),
1248 Some("nosuchbucketpolicy") | Some("nosuchpolicy")
1249 ) {
1250 return BucketPolicyErrorKind::MissingPolicy;
1251 }
1252 if matches!(error_code.as_deref(), Some("nosuchbucket")) {
1253 return BucketPolicyErrorKind::MissingBucket;
1254 }
1255
1256 let error_text = error_text.to_ascii_lowercase();
1257 if error_text.contains("nosuchbucketpolicy") || error_text.contains("nosuchpolicy") {
1258 return BucketPolicyErrorKind::MissingPolicy;
1259 }
1260 if error_text.contains("nosuchbucket") {
1261 return BucketPolicyErrorKind::MissingBucket;
1262 }
1263 if status_code == Some(404) {
1264 return BucketPolicyErrorKind::MissingPolicy;
1265 }
1266
1267 BucketPolicyErrorKind::Other
1268 }
1269
1270 fn map_get_bucket_policy_error(
1271 bucket: &str,
1272 kind: BucketPolicyErrorKind,
1273 error_text: &str,
1274 ) -> Result<Option<String>> {
1275 match kind {
1276 BucketPolicyErrorKind::MissingPolicy => Ok(None),
1277 BucketPolicyErrorKind::MissingBucket => {
1278 Err(Error::NotFound(format!("Bucket not found: {bucket}")))
1279 }
1280 BucketPolicyErrorKind::Other => {
1281 Err(Error::Network(format!("get_bucket_policy: {error_text}")))
1282 }
1283 }
1284 }
1285
1286 fn map_delete_bucket_policy_error(
1287 bucket: &str,
1288 kind: BucketPolicyErrorKind,
1289 error_text: &str,
1290 ) -> Result<()> {
1291 match kind {
1292 BucketPolicyErrorKind::MissingPolicy => Ok(()),
1293 BucketPolicyErrorKind::MissingBucket => {
1294 Err(Error::NotFound(format!("Bucket not found: {bucket}")))
1295 }
1296 BucketPolicyErrorKind::Other => Err(Error::General(format!(
1297 "delete_bucket_policy: {error_text}"
1298 ))),
1299 }
1300 }
1301
1302 fn extract_notification_filter(
1303 filter: Option<&aws_sdk_s3::types::NotificationConfigurationFilter>,
1304 ) -> (Option<String>, Option<String>) {
1305 let mut prefix = None;
1306 let mut suffix = None;
1307
1308 if let Some(key_filter) = filter.and_then(|value| value.key()) {
1309 for rule in key_filter.filter_rules() {
1310 match rule.name().map(|name| name.as_str()) {
1311 Some("prefix") => {
1312 prefix = rule.value().map(ToString::to_string);
1313 }
1314 Some("suffix") => {
1315 suffix = rule.value().map(ToString::to_string);
1316 }
1317 _ => {}
1318 }
1319 }
1320 }
1321
1322 (prefix, suffix)
1323 }
1324
1325 fn build_notification_filter(
1326 prefix: Option<&str>,
1327 suffix: Option<&str>,
1328 ) -> Option<aws_sdk_s3::types::NotificationConfigurationFilter> {
1329 use aws_sdk_s3::types::{FilterRule, FilterRuleName, NotificationConfigurationFilter};
1330
1331 let mut rules = Vec::new();
1332 if let Some(value) = prefix {
1333 let rule = FilterRule::builder()
1334 .name(FilterRuleName::Prefix)
1335 .value(value)
1336 .build();
1337 rules.push(rule);
1338 }
1339 if let Some(value) = suffix {
1340 let rule = FilterRule::builder()
1341 .name(FilterRuleName::Suffix)
1342 .value(value)
1343 .build();
1344 rules.push(rule);
1345 }
1346 if rules.is_empty() {
1347 return None;
1348 }
1349
1350 let key_filter = aws_sdk_s3::types::S3KeyFilter::builder()
1351 .set_filter_rules(Some(rules))
1352 .build();
1353 NotificationConfigurationFilter::builder()
1354 .key(key_filter)
1355 .build()
1356 .into()
1357 }
1358
1359 fn event_list_to_strings(events: &[aws_sdk_s3::types::Event]) -> Vec<String> {
1360 events
1361 .iter()
1362 .map(|event| event.as_str().to_string())
1363 .collect()
1364 }
1365
1366 fn strings_to_event_list(events: &[String]) -> Vec<aws_sdk_s3::types::Event> {
1367 events
1368 .iter()
1369 .map(|event| aws_sdk_s3::types::Event::from(event.as_str()))
1370 .collect()
1371 }
1372
1373 fn notifications_equivalent(
1374 expected: &[BucketNotification],
1375 actual: &[BucketNotification],
1376 ) -> bool {
1377 type CanonicalEntry = (u8, String, Option<String>, Option<String>, Vec<String>);
1378
1379 fn target_order(target: NotificationTarget) -> u8 {
1380 match target {
1381 NotificationTarget::Queue => 0,
1382 NotificationTarget::Topic => 1,
1383 NotificationTarget::Lambda => 2,
1384 }
1385 }
1386
1387 fn canonical(notifications: &[BucketNotification]) -> Vec<CanonicalEntry> {
1388 let mut normalized: Vec<CanonicalEntry> = notifications
1389 .iter()
1390 .map(|item| {
1391 let mut events = item.events.clone();
1392 events.sort();
1393 events.dedup();
1394 (
1395 target_order(item.target),
1396 item.arn.clone(),
1397 item.prefix.clone(),
1398 item.suffix.clone(),
1399 events,
1400 )
1401 })
1402 .collect();
1403 normalized.sort();
1404 normalized
1405 }
1406
1407 canonical(expected) == canonical(actual)
1408 }
1409
1410 async fn read_next_part(
1411 file: &mut tokio::fs::File,
1412 file_path: &std::path::Path,
1413 buffer: &mut [u8],
1414 ) -> Result<usize> {
1415 let mut total_read = 0usize;
1416 while total_read < buffer.len() {
1417 let bytes_read = file
1418 .read(&mut buffer[total_read..])
1419 .await
1420 .map_err(|e| Error::General(format!("read file '{}': {e}", file_path.display())))?;
1421 if bytes_read == 0 {
1422 break;
1423 }
1424 total_read += bytes_read;
1425 }
1426 Ok(total_read)
1427 }
1428
1429 async fn put_object_single_part_from_path(
1430 &self,
1431 path: &RemotePath,
1432 file_path: &std::path::Path,
1433 content_type: Option<&str>,
1434 file_size: u64,
1435 ) -> Result<ObjectInfo> {
1436 let data = tokio::fs::read(file_path)
1437 .await
1438 .map_err(|e| Error::General(format!("read file '{}': {e}", file_path.display())))?;
1439 let body = aws_sdk_s3::primitives::ByteStream::from(data);
1440
1441 let mut request = self
1442 .inner
1443 .put_object()
1444 .bucket(&path.bucket)
1445 .key(&path.key)
1446 .body(body);
1447
1448 if let Some(ct) = content_type {
1449 request = request.content_type(ct);
1450 }
1451
1452 let response = request
1453 .send()
1454 .await
1455 .map_err(|e| Error::Network(e.to_string()))?;
1456
1457 let mut info = ObjectInfo::file(&path.key, file_size as i64);
1458 if let Some(etag) = response.e_tag() {
1459 info.etag = Some(etag.trim_matches('"').to_string());
1460 }
1461 info.last_modified = Some(jiff::Timestamp::now());
1462
1463 Ok(info)
1464 }
1465
1466 async fn abort_multipart_upload_best_effort(&self, path: &RemotePath, upload_id: &str) {
1467 let _ = self
1468 .inner
1469 .abort_multipart_upload()
1470 .bucket(&path.bucket)
1471 .key(&path.key)
1472 .upload_id(upload_id)
1473 .send()
1474 .await;
1475 }
1476
1477 async fn put_object_multipart_from_path(
1478 &self,
1479 path: &RemotePath,
1480 file_path: &std::path::Path,
1481 content_type: Option<&str>,
1482 file_size: u64,
1483 on_progress: impl Fn(u64) + Send,
1484 ) -> Result<ObjectInfo> {
1485 use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
1486
1487 let config = crate::multipart::MultipartConfig::default();
1488 let part_size = config.calculate_part_size(file_size);
1489 let part_buffer_size = usize::try_from(part_size)
1490 .map_err(|_| Error::General(format!("invalid part size: {part_size}")))?;
1491
1492 tracing::debug!(file_size, part_size, "Starting multipart upload");
1493
1494 let mut create_request = self
1495 .inner
1496 .create_multipart_upload()
1497 .bucket(&path.bucket)
1498 .key(&path.key);
1499
1500 if let Some(ct) = content_type {
1501 create_request = create_request.content_type(ct);
1502 }
1503
1504 let create_response = create_request
1505 .send()
1506 .await
1507 .map_err(|e| Error::Network(format!("create multipart upload: {e}")))?;
1508
1509 let upload_id = create_response
1510 .upload_id()
1511 .ok_or_else(|| Error::General("missing upload id from multipart upload".to_string()))?
1512 .to_string();
1513
1514 tracing::debug!(upload_id = %upload_id, "Multipart upload initiated");
1515
1516 let mut file = tokio::fs::File::open(file_path)
1517 .await
1518 .map_err(|e| Error::General(format!("open file '{}': {e}", file_path.display())))?;
1519 let mut completed_parts = Vec::new();
1520 let mut part_number: i32 = 1;
1521 let mut chunk = vec![0u8; part_buffer_size];
1522 let mut bytes_uploaded: u64 = 0;
1523
1524 loop {
1525 let bytes_read = Self::read_next_part(&mut file, file_path, &mut chunk).await?;
1526 if bytes_read == 0 {
1527 break;
1528 }
1529
1530 tracing::debug!(part_number, bytes_read, "Uploading part");
1531
1532 let body = aws_sdk_s3::primitives::ByteStream::from(chunk[..bytes_read].to_vec());
1533 let upload_part_result = self
1534 .inner
1535 .upload_part()
1536 .bucket(&path.bucket)
1537 .key(&path.key)
1538 .upload_id(&upload_id)
1539 .part_number(part_number)
1540 .body(body)
1541 .send()
1542 .await;
1543
1544 let upload_part_response = match upload_part_result {
1545 Ok(response) => response,
1546 Err(e) => {
1547 tracing::debug!(
1548 upload_id = %upload_id,
1549 part_number,
1550 "Aborting multipart upload due to error"
1551 );
1552 self.abort_multipart_upload_best_effort(path, &upload_id)
1553 .await;
1554 return Err(Error::Network(format!(
1555 "upload multipart part {part_number}: {e}"
1556 )));
1557 }
1558 };
1559
1560 let etag = match upload_part_response.e_tag() {
1561 Some(value) => value.trim_matches('"').to_string(),
1562 None => {
1563 self.abort_multipart_upload_best_effort(path, &upload_id)
1564 .await;
1565 return Err(Error::General(format!(
1566 "missing ETag for multipart part {part_number}"
1567 )));
1568 }
1569 };
1570
1571 completed_parts.push(
1572 CompletedPart::builder()
1573 .part_number(part_number)
1574 .e_tag(etag)
1575 .build(),
1576 );
1577
1578 bytes_uploaded += bytes_read as u64;
1579 on_progress(bytes_uploaded);
1580 tracing::debug!(part_number, bytes_uploaded, "Part uploaded");
1581
1582 part_number += 1;
1583 }
1584
1585 let completed_upload = CompletedMultipartUpload::builder()
1586 .set_parts(Some(completed_parts))
1587 .build();
1588 let complete_result = self
1589 .inner
1590 .complete_multipart_upload()
1591 .bucket(&path.bucket)
1592 .key(&path.key)
1593 .upload_id(&upload_id)
1594 .multipart_upload(completed_upload)
1595 .send()
1596 .await;
1597
1598 let complete_response = match complete_result {
1599 Ok(response) => response,
1600 Err(e) => {
1601 tracing::debug!(upload_id = %upload_id, "Attempting to abort multipart upload after completion failure");
1602 self.abort_multipart_upload_best_effort(path, &upload_id)
1603 .await;
1604 return Err(Error::Network(format!("complete multipart upload: {e}")));
1605 }
1606 };
1607
1608 tracing::debug!("Multipart upload completed");
1609
1610 let mut info = ObjectInfo::file(&path.key, file_size as i64);
1611 if let Some(etag) = complete_response.e_tag() {
1612 info.etag = Some(etag.trim_matches('"').to_string());
1613 }
1614 info.last_modified = Some(jiff::Timestamp::now());
1615
1616 Ok(info)
1617 }
1618
1619 pub async fn put_object_from_path(
1624 &self,
1625 path: &RemotePath,
1626 file_path: &std::path::Path,
1627 content_type: Option<&str>,
1628 on_progress: impl Fn(u64) + Send,
1629 ) -> Result<ObjectInfo> {
1630 let metadata = tokio::fs::metadata(file_path).await.map_err(|e| {
1631 Error::General(format!("read metadata for '{}': {e}", file_path.display()))
1632 })?;
1633 if !metadata.is_file() {
1634 return Err(Error::General(format!(
1635 "source is not a file: {}",
1636 file_path.display()
1637 )));
1638 }
1639
1640 let file_size = metadata.len();
1641 if Self::should_use_multipart(file_size) {
1642 self.put_object_multipart_from_path(
1643 path,
1644 file_path,
1645 content_type,
1646 file_size,
1647 on_progress,
1648 )
1649 .await
1650 } else {
1651 self.put_object_single_part_from_path(path, file_path, content_type, file_size)
1652 .await
1653 }
1654 }
1655}
1656
1657fn build_tagging(
1658 tags: std::collections::HashMap<String, String>,
1659) -> Result<aws_sdk_s3::types::Tagging> {
1660 use aws_sdk_s3::types::{Tag, Tagging};
1661
1662 let mut tag_set = Vec::with_capacity(tags.len());
1663 for (key, value) in tags {
1664 let tag = Tag::builder()
1665 .key(key)
1666 .value(value)
1667 .build()
1668 .map_err(|e| Error::General(format!("invalid tag: {e}")))?;
1669 tag_set.push(tag);
1670 }
1671
1672 Tagging::builder()
1673 .set_tag_set(Some(tag_set))
1674 .build()
1675 .map_err(|e| Error::General(format!("invalid tagging payload: {e}")))
1676}
1677
1678#[async_trait]
1679impl ObjectStore for S3Client {
1680 async fn list_buckets(&self) -> Result<Vec<ObjectInfo>> {
1681 let response = self
1682 .inner
1683 .list_buckets()
1684 .send()
1685 .await
1686 .map_err(|e| Error::Network(e.to_string()))?;
1687
1688 let buckets = response
1689 .buckets()
1690 .iter()
1691 .map(|b| {
1692 let mut info = ObjectInfo::bucket(b.name().unwrap_or_default());
1693 if let Some(creation_date) = b.creation_date() {
1694 info.last_modified = jiff::Timestamp::from_second(creation_date.secs()).ok();
1695 }
1696 info
1697 })
1698 .collect();
1699
1700 Ok(buckets)
1701 }
1702
1703 async fn list_objects(&self, path: &RemotePath, options: ListOptions) -> Result<ListResult> {
1704 let mut request = self.inner.list_objects_v2().bucket(&path.bucket);
1705
1706 let prefix = if path.key.is_empty() {
1708 options.prefix.clone()
1709 } else if let Some(p) = &options.prefix {
1710 Some(format!("{}{}", path.key, p))
1711 } else {
1712 Some(path.key.clone())
1713 };
1714
1715 if let Some(p) = prefix {
1716 request = request.prefix(p);
1717 }
1718
1719 if !options.recursive {
1721 request = request.delimiter(options.delimiter.as_deref().unwrap_or("/"));
1722 }
1723
1724 if let Some(max) = options.max_keys {
1726 request = request.max_keys(max);
1727 }
1728
1729 if let Some(token) = &options.continuation_token {
1731 request = request.continuation_token(token);
1732 }
1733
1734 let response = request.send().await.map_err(|e| {
1735 let err_str = Self::format_sdk_error(&e);
1736 if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
1737 Error::NotFound(format!("Bucket not found: {}", path.bucket))
1738 } else {
1739 Error::Network(err_str)
1740 }
1741 })?;
1742
1743 let mut items = Vec::new();
1744
1745 for prefix in response.common_prefixes() {
1747 if let Some(p) = prefix.prefix() {
1748 items.push(ObjectInfo::dir(p));
1749 }
1750 }
1751
1752 for object in response.contents() {
1754 let key = object.key().unwrap_or_default().to_string();
1755 let size = object.size().unwrap_or(0);
1756 let mut info = ObjectInfo::file(&key, size);
1757
1758 if let Some(modified) = object.last_modified() {
1759 info.last_modified = jiff::Timestamp::from_second(modified.secs()).ok();
1760 }
1761
1762 if let Some(etag) = object.e_tag() {
1763 info.etag = Some(etag.trim_matches('"').to_string());
1764 }
1765
1766 if let Some(sc) = object.storage_class() {
1767 info.storage_class = Some(sc.as_str().to_string());
1768 }
1769
1770 items.push(info);
1771 }
1772
1773 Ok(ListResult {
1774 items,
1775 truncated: response.is_truncated().unwrap_or(false),
1776 continuation_token: response.next_continuation_token().map(|s| s.to_string()),
1777 })
1778 }
1779
1780 async fn head_object(&self, path: &RemotePath) -> Result<ObjectInfo> {
1781 let response = self
1782 .inner
1783 .head_object()
1784 .bucket(&path.bucket)
1785 .key(&path.key)
1786 .send()
1787 .await
1788 .map_err(|e| {
1789 let err_str = e.to_string();
1790 if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
1791 Error::NotFound(path.to_string())
1792 } else {
1793 Error::Network(err_str)
1794 }
1795 })?;
1796
1797 let size = response.content_length().unwrap_or(0);
1798 let mut info = ObjectInfo::file(&path.key, size);
1799
1800 if let Some(modified) = response.last_modified() {
1801 info.last_modified = jiff::Timestamp::from_second(modified.secs()).ok();
1802 }
1803
1804 if let Some(etag) = response.e_tag() {
1805 info.etag = Some(etag.trim_matches('"').to_string());
1806 }
1807
1808 if let Some(ct) = response.content_type() {
1809 info.content_type = Some(ct.to_string());
1810 }
1811
1812 if let Some(sc) = response.storage_class() {
1813 info.storage_class = Some(sc.as_str().to_string());
1814 }
1815
1816 if let Some(meta) = response.metadata()
1817 && !meta.is_empty()
1818 {
1819 info.metadata = Some(meta.clone());
1820 }
1821
1822 Ok(info)
1823 }
1824
1825 async fn bucket_exists(&self, bucket: &str) -> Result<bool> {
1826 match self.inner.head_bucket().bucket(bucket).send().await {
1827 Ok(_) => Ok(true),
1828 Err(e) => {
1829 if let aws_sdk_s3::error::SdkError::ServiceError(ref service_err) = e
1832 && service_err.raw().status().as_u16() == 404
1833 {
1834 return Ok(false);
1835 }
1836 let err_str = e.to_string();
1837 if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
1838 return Ok(false);
1839 }
1840 Err(Error::Network(err_str))
1841 }
1842 }
1843 }
1844
1845 async fn create_bucket(&self, bucket: &str) -> Result<()> {
1846 self.inner
1847 .create_bucket()
1848 .bucket(bucket)
1849 .send()
1850 .await
1851 .map_err(|e| Error::Network(e.to_string()))?;
1852
1853 Ok(())
1854 }
1855
1856 async fn delete_bucket(&self, bucket: &str) -> Result<()> {
1857 self.inner
1858 .delete_bucket()
1859 .bucket(bucket)
1860 .send()
1861 .await
1862 .map_err(|e| {
1863 let err_str = Self::format_sdk_error(&e);
1864 if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
1865 Error::NotFound(format!("Bucket not found: {bucket}"))
1866 } else if err_str.contains("BucketNotEmpty") {
1867 Error::Conflict(err_str)
1868 } else {
1869 Error::Network(err_str)
1870 }
1871 })?;
1872
1873 Ok(())
1874 }
1875
1876 async fn capabilities(&self) -> Result<Capabilities> {
1877 Ok(Capabilities {
1880 versioning: true,
1881 object_lock: false,
1882 tagging: true,
1883 anonymous: true,
1884 select: false,
1885 notifications: true,
1886 lifecycle: true,
1887 replication: true,
1888 cors: true,
1889 })
1890 }
1891
1892 async fn get_object(&self, path: &RemotePath) -> Result<Vec<u8>> {
1893 self.get_object_with_progress(path, |_, _| {}).await
1894 }
1895
1896 async fn put_object(
1897 &self,
1898 path: &RemotePath,
1899 data: Vec<u8>,
1900 content_type: Option<&str>,
1901 ) -> Result<ObjectInfo> {
1902 let size = data.len() as i64;
1903 let body = aws_sdk_s3::primitives::ByteStream::from(data);
1904
1905 let mut request = self
1906 .inner
1907 .put_object()
1908 .bucket(&path.bucket)
1909 .key(&path.key)
1910 .body(body);
1911
1912 if let Some(ct) = content_type {
1913 request = request.content_type(ct);
1914 }
1915
1916 let response = request
1917 .send()
1918 .await
1919 .map_err(|e| Error::Network(e.to_string()))?;
1920
1921 let mut info = ObjectInfo::file(&path.key, size);
1922 if let Some(etag) = response.e_tag() {
1923 info.etag = Some(etag.trim_matches('"').to_string());
1924 }
1925 info.last_modified = Some(jiff::Timestamp::now());
1926
1927 Ok(info)
1928 }
1929
1930 async fn delete_object(&self, path: &RemotePath) -> Result<()> {
1931 self.delete_object_with_options(path, DeleteRequestOptions::default())
1932 .await
1933 }
1934
1935 async fn delete_objects(&self, bucket: &str, keys: Vec<String>) -> Result<Vec<String>> {
1936 self.delete_objects_with_options(bucket, keys, DeleteRequestOptions::default())
1937 .await
1938 }
1939
1940 async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> Result<ObjectInfo> {
1941 let copy_source = format!("{}/{}", src.bucket, src.key);
1943
1944 let response = self
1945 .inner
1946 .copy_object()
1947 .copy_source(©_source)
1948 .bucket(&dst.bucket)
1949 .key(&dst.key)
1950 .send()
1951 .await
1952 .map_err(|e| {
1953 let err_str = e.to_string();
1954 if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
1955 Error::NotFound(src.to_string())
1956 } else {
1957 Error::Network(err_str)
1958 }
1959 })?;
1960
1961 let info = self.head_object(dst).await?;
1963
1964 let mut result = info;
1966 if let Some(copy_result) = response.copy_object_result()
1967 && let Some(etag) = copy_result.e_tag()
1968 {
1969 result.etag = Some(etag.trim_matches('"').to_string());
1970 }
1971
1972 Ok(result)
1973 }
1974
1975 async fn presign_get(&self, path: &RemotePath, expires_secs: u64) -> Result<String> {
1976 let config = aws_sdk_s3::presigning::PresigningConfig::builder()
1977 .expires_in(std::time::Duration::from_secs(expires_secs))
1978 .build()
1979 .map_err(|e| Error::General(format!("presign_get config: {e}")))?;
1980
1981 let request = self
1982 .presign_inner
1983 .get_object()
1984 .bucket(&path.bucket)
1985 .key(&path.key)
1986 .presigned(config)
1987 .await
1988 .map_err(|e| Error::General(format!("presign_get: {e}")))?;
1989
1990 Ok(request.uri().to_string())
1991 }
1992
1993 async fn presign_put(
1994 &self,
1995 path: &RemotePath,
1996 expires_secs: u64,
1997 content_type: Option<&str>,
1998 ) -> Result<String> {
1999 let config = aws_sdk_s3::presigning::PresigningConfig::builder()
2000 .expires_in(std::time::Duration::from_secs(expires_secs))
2001 .build()
2002 .map_err(|e| Error::General(format!("presign_put config: {e}")))?;
2003
2004 let mut builder = self
2005 .presign_inner
2006 .put_object()
2007 .bucket(&path.bucket)
2008 .key(&path.key);
2009
2010 if let Some(ct) = content_type {
2011 builder = builder.content_type(ct);
2012 }
2013
2014 let request = builder
2015 .presigned(config)
2016 .await
2017 .map_err(|e| Error::General(format!("presign_put: {e}")))?;
2018
2019 Ok(request.uri().to_string())
2020 }
2021
2022 async fn get_versioning(&self, bucket: &str) -> Result<Option<bool>> {
2023 let response = self
2024 .inner
2025 .get_bucket_versioning()
2026 .bucket(bucket)
2027 .send()
2028 .await
2029 .map_err(|e| Error::General(format!("get_versioning: {e}")))?;
2030
2031 Ok(response
2032 .status()
2033 .map(|s| *s == aws_sdk_s3::types::BucketVersioningStatus::Enabled))
2034 }
2035
2036 async fn set_versioning(&self, bucket: &str, enabled: bool) -> Result<()> {
2037 use aws_sdk_s3::types::{BucketVersioningStatus, VersioningConfiguration};
2038
2039 let status = if enabled {
2040 BucketVersioningStatus::Enabled
2041 } else {
2042 BucketVersioningStatus::Suspended
2043 };
2044
2045 let config = VersioningConfiguration::builder().status(status).build();
2046
2047 self.inner
2048 .put_bucket_versioning()
2049 .bucket(bucket)
2050 .versioning_configuration(config)
2051 .send()
2052 .await
2053 .map_err(|e| Error::General(format!("set_versioning: {e}")))?;
2054
2055 Ok(())
2056 }
2057
2058 async fn list_object_versions(
2059 &self,
2060 path: &RemotePath,
2061 max_keys: Option<i32>,
2062 ) -> Result<Vec<ObjectVersion>> {
2063 Ok(self.list_object_versions_page(path, max_keys).await?.items)
2064 }
2065
2066 async fn get_object_tags(
2067 &self,
2068 path: &RemotePath,
2069 ) -> Result<std::collections::HashMap<String, String>> {
2070 let response = match self
2071 .inner
2072 .get_object_tagging()
2073 .bucket(&path.bucket)
2074 .key(&path.key)
2075 .send()
2076 .await
2077 {
2078 Ok(response) => response,
2079 Err(e) => {
2080 if e.to_string().contains("NoSuchTagSet") {
2081 return Ok(std::collections::HashMap::new());
2082 }
2083 return Err(Error::General(format!("get_object_tags: {e}")));
2084 }
2085 };
2086
2087 let mut tags = std::collections::HashMap::new();
2088 for tag in response.tag_set() {
2089 let key = tag.key();
2090 let value = tag.value();
2091 tags.insert(key.to_string(), value.to_string());
2092 }
2093
2094 Ok(tags)
2095 }
2096
2097 async fn get_bucket_tags(
2098 &self,
2099 bucket: &str,
2100 ) -> Result<std::collections::HashMap<String, String>> {
2101 let response = match self.inner.get_bucket_tagging().bucket(bucket).send().await {
2102 Ok(response) => response,
2103 Err(e) => {
2104 if e.to_string().contains("NoSuchTagSet") {
2105 return Ok(std::collections::HashMap::new());
2106 }
2107 return Err(Error::General(format!("get_bucket_tags: {e}")));
2108 }
2109 };
2110
2111 let mut tags = std::collections::HashMap::new();
2112 for tag in response.tag_set() {
2113 let key = tag.key();
2114 let value = tag.value();
2115 tags.insert(key.to_string(), value.to_string());
2116 }
2117
2118 Ok(tags)
2119 }
2120
2121 async fn set_object_tags(
2122 &self,
2123 path: &RemotePath,
2124 tags: std::collections::HashMap<String, String>,
2125 ) -> Result<()> {
2126 let tagging = build_tagging(tags)?;
2127
2128 self.inner
2129 .put_object_tagging()
2130 .bucket(&path.bucket)
2131 .key(&path.key)
2132 .tagging(tagging)
2133 .send()
2134 .await
2135 .map_err(|e| Error::General(format!("set_object_tags: {e}")))?;
2136
2137 Ok(())
2138 }
2139
2140 async fn set_bucket_tags(
2141 &self,
2142 bucket: &str,
2143 tags: std::collections::HashMap<String, String>,
2144 ) -> Result<()> {
2145 let tagging = build_tagging(tags)?;
2146
2147 self.inner
2148 .put_bucket_tagging()
2149 .bucket(bucket)
2150 .tagging(tagging)
2151 .send()
2152 .await
2153 .map_err(|e| Error::General(format!("set_bucket_tags: {e}")))?;
2154
2155 Ok(())
2156 }
2157
2158 async fn delete_object_tags(&self, path: &RemotePath) -> Result<()> {
2159 self.inner
2160 .delete_object_tagging()
2161 .bucket(&path.bucket)
2162 .key(&path.key)
2163 .send()
2164 .await
2165 .map_err(|e| Error::General(format!("delete_object_tags: {e}")))?;
2166
2167 Ok(())
2168 }
2169
2170 async fn delete_bucket_tags(&self, bucket: &str) -> Result<()> {
2171 self.inner
2172 .delete_bucket_tagging()
2173 .bucket(bucket)
2174 .send()
2175 .await
2176 .map_err(|e| Error::General(format!("delete_bucket_tags: {e}")))?;
2177
2178 Ok(())
2179 }
2180
2181 async fn get_bucket_policy(&self, bucket: &str) -> Result<Option<String>> {
2182 let response = match self.inner.get_bucket_policy().bucket(bucket).send().await {
2183 Ok(policy) => policy,
2184 Err(error) => {
2185 let error_text = error.to_string();
2186 let kind = if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &error {
2187 let code = service_err
2188 .raw()
2189 .headers()
2190 .get("x-amz-error-code")
2191 .and_then(|value| std::str::from_utf8(value.as_bytes()).ok());
2192 let status = Some(service_err.raw().status().as_u16());
2193 Self::bucket_policy_error_kind(code, status, &error_text)
2194 } else {
2195 Self::bucket_policy_error_kind(None, None, &error_text)
2196 };
2197 return Self::map_get_bucket_policy_error(bucket, kind, &error_text);
2198 }
2199 };
2200
2201 Ok(response.policy().map(|policy| policy.to_string()))
2202 }
2203
2204 async fn set_bucket_policy(&self, bucket: &str, policy: &str) -> Result<()> {
2205 self.inner
2206 .put_bucket_policy()
2207 .bucket(bucket)
2208 .policy(policy)
2209 .send()
2210 .await
2211 .map_err(|e| Error::General(format!("set_bucket_policy: {e}")))?;
2212
2213 Ok(())
2214 }
2215
2216 async fn delete_bucket_policy(&self, bucket: &str) -> Result<()> {
2217 match self
2218 .inner
2219 .delete_bucket_policy()
2220 .bucket(bucket)
2221 .send()
2222 .await
2223 {
2224 Ok(_) => Ok(()),
2225 Err(e) => {
2226 let error_text = e.to_string();
2227 let kind = if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &e {
2228 let code = service_err
2229 .raw()
2230 .headers()
2231 .get("x-amz-error-code")
2232 .and_then(|value| std::str::from_utf8(value.as_bytes()).ok());
2233 let status = Some(service_err.raw().status().as_u16());
2234 Self::bucket_policy_error_kind(code, status, &error_text)
2235 } else {
2236 Self::bucket_policy_error_kind(None, None, &error_text)
2237 };
2238 Self::map_delete_bucket_policy_error(bucket, kind, &error_text)
2239 }
2240 }
2241 }
2242
2243 async fn get_bucket_cors(&self, bucket: &str) -> Result<Vec<CorsRule>> {
2244 let response = match self.inner.get_bucket_cors().bucket(bucket).send().await {
2245 Ok(response) => response,
2246 Err(error) => {
2247 let error_text = error.to_string();
2248 if error_text.contains("service error")
2249 && let Ok(url) = self.cors_url(bucket)
2250 {
2251 match self.xml_request(Method::GET, url, None, None).await {
2252 Ok(body) => return parse_cors_configuration_xml(&body),
2253 Err(Error::Network(raw_error))
2254 if is_missing_cors_configuration_error(&raw_error) =>
2255 {
2256 return Ok(Vec::new());
2257 }
2258 Err(_) => {}
2259 }
2260 }
2261
2262 let missing_config =
2263 if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &error {
2264 is_missing_cors_configuration_response(
2265 service_err.err().code(),
2266 Some(service_err.raw().status().as_u16()),
2267 &error_text,
2268 )
2269 } else {
2270 is_missing_cors_configuration_response(None, None, &error_text)
2271 };
2272
2273 if missing_config {
2274 return Ok(Vec::new());
2275 }
2276 return Err(Error::General(format!("get_bucket_cors: {error_text}")));
2277 }
2278 };
2279
2280 Ok(response
2281 .cors_rules()
2282 .iter()
2283 .map(sdk_cors_rule_to_core)
2284 .collect())
2285 }
2286
2287 async fn set_bucket_cors(&self, bucket: &str, rules: Vec<CorsRule>) -> Result<()> {
2288 let cors_rules = rules
2289 .iter()
2290 .map(core_cors_rule_to_sdk)
2291 .collect::<Result<Vec<_>>>()?;
2292 let cors_configuration = aws_sdk_s3::types::CorsConfiguration::builder()
2293 .set_cors_rules(Some(cors_rules))
2294 .build()
2295 .map_err(|e| Error::General(format!("build bucket cors config: {e}")))?;
2296
2297 self.inner
2298 .put_bucket_cors()
2299 .bucket(bucket)
2300 .cors_configuration(cors_configuration)
2301 .send()
2302 .await
2303 .map_err(|e| Error::General(format!("set_bucket_cors: {e}")))?;
2304
2305 Ok(())
2306 }
2307
2308 async fn delete_bucket_cors(&self, bucket: &str) -> Result<()> {
2309 match self.inner.delete_bucket_cors().bucket(bucket).send().await {
2310 Ok(_) => Ok(()),
2311 Err(error) => {
2312 let error_text = error.to_string();
2313 if error_text.contains("service error")
2314 && let Ok(url) = self.cors_url(bucket)
2315 {
2316 match self.xml_request(Method::DELETE, url, None, None).await {
2317 Ok(_) => return Ok(()),
2318 Err(Error::Network(raw_error))
2319 if is_missing_cors_configuration_error(&raw_error) =>
2320 {
2321 return Ok(());
2322 }
2323 Err(_) => {}
2324 }
2325 }
2326
2327 let missing_config =
2328 if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &error {
2329 is_missing_cors_configuration_response(
2330 service_err.err().code(),
2331 Some(service_err.raw().status().as_u16()),
2332 &error_text,
2333 )
2334 } else {
2335 is_missing_cors_configuration_response(None, None, &error_text)
2336 };
2337
2338 if missing_config {
2339 Ok(())
2340 } else {
2341 Err(Error::General(format!("delete_bucket_cors: {error_text}")))
2342 }
2343 }
2344 }
2345 }
2346
2347 async fn get_bucket_notifications(&self, bucket: &str) -> Result<Vec<BucketNotification>> {
2348 let response = self
2349 .inner
2350 .get_bucket_notification_configuration()
2351 .bucket(bucket)
2352 .send()
2353 .await
2354 .map_err(|e| Error::General(format!("get_bucket_notifications: {e}")))?;
2355
2356 let mut rules = Vec::new();
2357
2358 for cfg in response.queue_configurations() {
2359 let (prefix, suffix) = Self::extract_notification_filter(cfg.filter());
2360 rules.push(BucketNotification {
2361 id: cfg.id().map(ToString::to_string),
2362 target: NotificationTarget::Queue,
2363 arn: cfg.queue_arn().to_string(),
2364 events: Self::event_list_to_strings(cfg.events()),
2365 prefix,
2366 suffix,
2367 });
2368 }
2369
2370 for cfg in response.topic_configurations() {
2371 let (prefix, suffix) = Self::extract_notification_filter(cfg.filter());
2372 rules.push(BucketNotification {
2373 id: cfg.id().map(ToString::to_string),
2374 target: NotificationTarget::Topic,
2375 arn: cfg.topic_arn().to_string(),
2376 events: Self::event_list_to_strings(cfg.events()),
2377 prefix,
2378 suffix,
2379 });
2380 }
2381
2382 for cfg in response.lambda_function_configurations() {
2383 let (prefix, suffix) = Self::extract_notification_filter(cfg.filter());
2384 rules.push(BucketNotification {
2385 id: cfg.id().map(ToString::to_string),
2386 target: NotificationTarget::Lambda,
2387 arn: cfg.lambda_function_arn().to_string(),
2388 events: Self::event_list_to_strings(cfg.events()),
2389 prefix,
2390 suffix,
2391 });
2392 }
2393
2394 Ok(rules)
2395 }
2396
2397 async fn set_bucket_notifications(
2398 &self,
2399 bucket: &str,
2400 notifications: Vec<BucketNotification>,
2401 ) -> Result<()> {
2402 use aws_sdk_s3::types::{
2403 LambdaFunctionConfiguration, NotificationConfiguration, QueueConfiguration,
2404 TopicConfiguration,
2405 };
2406
2407 let expected_notifications = notifications.clone();
2408 let mut queues = Vec::new();
2409 let mut topics = Vec::new();
2410 let mut lambdas = Vec::new();
2411
2412 for rule in notifications {
2413 let events = Self::strings_to_event_list(&rule.events);
2414 if events.is_empty() {
2415 return Err(Error::General(format!(
2416 "set_bucket_notifications: empty event list for target '{}'",
2417 rule.arn
2418 )));
2419 }
2420
2421 let filter =
2422 Self::build_notification_filter(rule.prefix.as_deref(), rule.suffix.as_deref());
2423
2424 match rule.target {
2425 NotificationTarget::Queue => {
2426 let mut builder = QueueConfiguration::builder()
2427 .queue_arn(rule.arn)
2428 .set_events(Some(events))
2429 .set_id(rule.id);
2430 if let Some(filter) = filter {
2431 builder = builder.filter(filter);
2432 }
2433 let queue = builder
2434 .build()
2435 .map_err(|e| Error::General(format!("build queue notification: {e}")))?;
2436 queues.push(queue);
2437 }
2438 NotificationTarget::Topic => {
2439 let mut builder = TopicConfiguration::builder()
2440 .topic_arn(rule.arn)
2441 .set_events(Some(events))
2442 .set_id(rule.id);
2443 if let Some(filter) = filter {
2444 builder = builder.filter(filter);
2445 }
2446 let topic = builder
2447 .build()
2448 .map_err(|e| Error::General(format!("build topic notification: {e}")))?;
2449 topics.push(topic);
2450 }
2451 NotificationTarget::Lambda => {
2452 let mut builder = LambdaFunctionConfiguration::builder()
2453 .lambda_function_arn(rule.arn)
2454 .set_events(Some(events))
2455 .set_id(rule.id);
2456 if let Some(filter) = filter {
2457 builder = builder.filter(filter);
2458 }
2459 let lambda = builder
2460 .build()
2461 .map_err(|e| Error::General(format!("build lambda notification: {e}")))?;
2462 lambdas.push(lambda);
2463 }
2464 }
2465 }
2466
2467 let config = NotificationConfiguration::builder()
2468 .set_queue_configurations(Some(queues))
2469 .set_topic_configurations(Some(topics))
2470 .set_lambda_function_configurations(Some(lambdas))
2471 .build();
2472
2473 match self
2474 .inner
2475 .put_bucket_notification_configuration()
2476 .bucket(bucket)
2477 .notification_configuration(config)
2478 .send()
2479 .await
2480 {
2481 Ok(_) => {}
2482 Err(error) => {
2483 let error_text = error.to_string();
2484 if error_text.contains("service error")
2487 && let Ok(actual) = self.get_bucket_notifications(bucket).await
2488 && Self::notifications_equivalent(&expected_notifications, &actual)
2489 {
2490 return Ok(());
2491 }
2492 return Err(Error::General(format!(
2493 "set_bucket_notifications: {error_text}"
2494 )));
2495 }
2496 }
2497
2498 Ok(())
2499 }
2500
2501 async fn get_bucket_lifecycle(&self, bucket: &str) -> Result<Vec<LifecycleRule>> {
2502 let response = match self
2503 .inner
2504 .get_bucket_lifecycle_configuration()
2505 .bucket(bucket)
2506 .send()
2507 .await
2508 {
2509 Ok(resp) => resp,
2510 Err(error) => {
2511 let error_text = Self::format_sdk_error(&error);
2512 if error_text.contains("NoSuchLifecycleConfiguration")
2513 || error_text.contains("lifecycle configuration is not found")
2514 {
2515 return Ok(Vec::new());
2516 }
2517 return Err(Error::General(format!(
2518 "get_bucket_lifecycle: {error_text}"
2519 )));
2520 }
2521 };
2522
2523 let mut rules = Vec::new();
2524 for sdk_rule in response.rules() {
2525 let id = sdk_rule.id().unwrap_or("").to_string();
2526 let status = match sdk_rule.status().as_str() {
2527 "Enabled" => rc_core::LifecycleRuleStatus::Enabled,
2528 _ => rc_core::LifecycleRuleStatus::Disabled,
2529 };
2530
2531 let prefix = parse_lifecycle_filter_prefix(sdk_rule.filter());
2532 let tags = parse_lifecycle_filter_tags(sdk_rule.filter());
2533
2534 let expiration = sdk_rule
2535 .expiration()
2536 .map(|exp| rc_core::LifecycleExpiration {
2537 days: exp.days(),
2538 date: exp.date().map(|d| d.to_string()),
2539 });
2540
2541 let transition = sdk_rule
2542 .transitions()
2543 .first()
2544 .map(|t| rc_core::LifecycleTransition {
2545 days: t.days(),
2546 date: t.date().map(|d| d.to_string()),
2547 storage_class: t
2548 .storage_class()
2549 .map(|sc| sc.as_str().to_string())
2550 .unwrap_or_default(),
2551 });
2552
2553 let noncurrent_version_expiration =
2554 sdk_rule.noncurrent_version_expiration().map(|nve| {
2555 rc_core::NoncurrentVersionExpiration {
2556 noncurrent_days: nve.noncurrent_days().unwrap_or(0),
2557 newer_noncurrent_versions: nve.newer_noncurrent_versions(),
2558 }
2559 });
2560
2561 let noncurrent_version_transition = sdk_rule
2562 .noncurrent_version_transitions()
2563 .first()
2564 .map(|nvt| rc_core::NoncurrentVersionTransition {
2565 noncurrent_days: nvt.noncurrent_days().unwrap_or(0),
2566 storage_class: nvt
2567 .storage_class()
2568 .map(|sc| sc.as_str().to_string())
2569 .unwrap_or_default(),
2570 });
2571
2572 let abort_incomplete_multipart_upload_days = sdk_rule
2573 .abort_incomplete_multipart_upload()
2574 .and_then(|a| a.days_after_initiation());
2575
2576 let expired_object_delete_marker = sdk_rule
2577 .expiration()
2578 .and_then(|e| e.expired_object_delete_marker())
2579 .filter(|v| *v);
2580
2581 rules.push(LifecycleRule {
2582 id,
2583 status,
2584 prefix,
2585 tags,
2586 expiration,
2587 transition,
2588 noncurrent_version_expiration,
2589 noncurrent_version_transition,
2590 abort_incomplete_multipart_upload_days,
2591 expired_object_delete_marker,
2592 });
2593 }
2594
2595 Ok(rules)
2596 }
2597
2598 async fn set_bucket_lifecycle(&self, bucket: &str, rules: Vec<LifecycleRule>) -> Result<()> {
2599 use aws_sdk_s3::types::{
2600 AbortIncompleteMultipartUpload, BucketLifecycleConfiguration, ExpirationStatus,
2601 LifecycleExpiration as SdkExpiration, LifecycleRule as SdkRule,
2602 NoncurrentVersionExpiration as SdkNve, NoncurrentVersionTransition as SdkNvt,
2603 Transition, TransitionStorageClass,
2604 };
2605
2606 let mut sdk_rules = Vec::new();
2607 for rule in rules {
2608 let status = match rule.status {
2609 rc_core::LifecycleRuleStatus::Enabled => ExpirationStatus::Enabled,
2610 rc_core::LifecycleRuleStatus::Disabled => ExpirationStatus::Disabled,
2611 };
2612
2613 let filter = build_lifecycle_rule_filter(rule.prefix.as_deref(), rule.tags.as_ref())?;
2614
2615 let expiration = rule.expiration.map(|exp| {
2616 let mut builder = SdkExpiration::builder();
2617 if let Some(days) = exp.days {
2618 builder = builder.days(days);
2619 }
2620 if let Some(ref date_str) = exp.date
2621 && let Ok(dt) = aws_smithy_types::DateTime::from_str(
2622 date_str,
2623 aws_smithy_types::date_time::Format::DateTime,
2624 )
2625 {
2626 builder = builder.date(dt);
2627 }
2628 if let Some(true) = rule.expired_object_delete_marker {
2629 builder = builder.expired_object_delete_marker(true);
2630 }
2631 builder.build()
2632 });
2633
2634 let transitions = rule.transition.map(|t| {
2635 #[allow(deprecated)]
2636 let sc = TransitionStorageClass::from(t.storage_class.as_str());
2637 let mut builder = Transition::builder().storage_class(sc);
2638 if let Some(days) = t.days {
2639 builder = builder.days(days);
2640 }
2641 if let Some(ref date_str) = t.date
2642 && let Ok(dt) = aws_smithy_types::DateTime::from_str(
2643 date_str,
2644 aws_smithy_types::date_time::Format::DateTime,
2645 )
2646 {
2647 builder = builder.date(dt);
2648 }
2649 vec![builder.build()]
2650 });
2651
2652 let nve = rule.noncurrent_version_expiration.map(|nve| {
2653 let mut builder = SdkNve::builder().noncurrent_days(nve.noncurrent_days);
2654 if let Some(newer) = nve.newer_noncurrent_versions {
2655 builder = builder.newer_noncurrent_versions(newer);
2656 }
2657 builder.build()
2658 });
2659
2660 let nvt = rule.noncurrent_version_transition.map(|nvt| {
2661 let sc = TransitionStorageClass::from(nvt.storage_class.as_str());
2662 let builder = SdkNvt::builder()
2663 .noncurrent_days(nvt.noncurrent_days)
2664 .storage_class(sc);
2665 vec![builder.build()]
2666 });
2667
2668 let abort = rule.abort_incomplete_multipart_upload_days.map(|days| {
2669 AbortIncompleteMultipartUpload::builder()
2670 .days_after_initiation(days)
2671 .build()
2672 });
2673
2674 let mut builder = SdkRule::builder().id(&rule.id).status(status);
2675 if let Some(filter) = filter {
2676 builder = builder.filter(filter);
2677 }
2678 if let Some(expiration) = expiration {
2679 builder = builder.expiration(expiration);
2680 }
2681 if let Some(transitions) = transitions {
2682 builder = builder.set_transitions(Some(transitions));
2683 }
2684 if let Some(nve) = nve {
2685 builder = builder.noncurrent_version_expiration(nve);
2686 }
2687 if let Some(nvt) = nvt {
2688 builder = builder.set_noncurrent_version_transitions(Some(nvt));
2689 }
2690 if let Some(abort) = abort {
2691 builder = builder.abort_incomplete_multipart_upload(abort);
2692 }
2693
2694 let sdk_rule = builder
2695 .build()
2696 .map_err(|e| Error::General(format!("build lifecycle rule: {e}")))?;
2697 sdk_rules.push(sdk_rule);
2698 }
2699
2700 let config = BucketLifecycleConfiguration::builder()
2701 .set_rules(Some(sdk_rules))
2702 .build()
2703 .map_err(|e| Error::General(format!("build lifecycle config: {e}")))?;
2704
2705 self.inner
2706 .put_bucket_lifecycle_configuration()
2707 .bucket(bucket)
2708 .lifecycle_configuration(config)
2709 .send()
2710 .await
2711 .map_err(|e| {
2712 Error::General(format!(
2713 "set_bucket_lifecycle: {}",
2714 Self::format_sdk_error(&e)
2715 ))
2716 })?;
2717
2718 Ok(())
2719 }
2720
2721 async fn delete_bucket_lifecycle(&self, bucket: &str) -> Result<()> {
2722 self.inner
2723 .delete_bucket_lifecycle()
2724 .bucket(bucket)
2725 .send()
2726 .await
2727 .map_err(|e| {
2728 Error::General(format!(
2729 "delete_bucket_lifecycle: {}",
2730 Self::format_sdk_error(&e)
2731 ))
2732 })?;
2733 Ok(())
2734 }
2735
2736 async fn restore_object(&self, path: &RemotePath, days: i32) -> Result<()> {
2737 use aws_sdk_s3::types::RestoreRequest;
2738
2739 let request = RestoreRequest::builder().days(days).build();
2740 self.inner
2741 .restore_object()
2742 .bucket(&path.bucket)
2743 .key(&path.key)
2744 .restore_request(request)
2745 .send()
2746 .await
2747 .map_err(|e| {
2748 Error::General(format!("restore_object: {}", Self::format_sdk_error(&e)))
2749 })?;
2750 Ok(())
2751 }
2752
2753 async fn get_bucket_replication(
2754 &self,
2755 bucket: &str,
2756 ) -> Result<Option<ReplicationConfiguration>> {
2757 let url = self.replication_url(bucket)?;
2758 let body = match self.xml_request(Method::GET, url, None, None).await {
2759 Ok(body) => body,
2760 Err(Error::Network(error_text))
2761 if error_text.contains("ReplicationConfigurationNotFound")
2762 || error_text.contains("replication configuration is not found")
2763 || error_text.contains("replication not found") =>
2764 {
2765 return Ok(None);
2766 }
2767 Err(error) => {
2768 return Err(Error::General(format!("get_bucket_replication: {error}")));
2769 }
2770 };
2771
2772 parse_replication_configuration_xml(&body).map(Some)
2773 }
2774
2775 async fn set_bucket_replication(
2776 &self,
2777 bucket: &str,
2778 config: ReplicationConfiguration,
2779 ) -> Result<()> {
2780 let url = self.replication_url(bucket)?;
2781 let body = build_replication_configuration_xml(&config).into_bytes();
2782 self.xml_request(Method::PUT, url, Some("application/xml"), Some(body))
2783 .await
2784 .map_err(|e| Error::General(format!("set_bucket_replication: {e}")))?;
2785
2786 Ok(())
2787 }
2788
2789 async fn delete_bucket_replication(&self, bucket: &str) -> Result<()> {
2790 self.inner
2791 .delete_bucket_replication()
2792 .bucket(bucket)
2793 .send()
2794 .await
2795 .map_err(|e| {
2796 Error::General(format!(
2797 "delete_bucket_replication: {}",
2798 Self::format_sdk_error(&e)
2799 ))
2800 })?;
2801 Ok(())
2802 }
2803
2804 async fn select_object_content(
2805 &self,
2806 path: &RemotePath,
2807 options: &SelectOptions,
2808 writer: &mut (dyn AsyncWrite + Send + Unpin),
2809 ) -> Result<()> {
2810 crate::select::select_object_content(&self.inner, path, options, writer).await
2811 }
2812}
2813
2814#[cfg(test)]
2815mod tests {
2816 use super::*;
2817 use aws_smithy_http_client::test_util::{CaptureRequestReceiver, capture_request};
2818 use std::collections::HashMap;
2819
2820 fn test_s3_client(
2821 response: Option<http::Response<SdkBody>>,
2822 ) -> (S3Client, CaptureRequestReceiver) {
2823 test_s3_client_with_endpoint("https://example.com", response)
2824 }
2825
2826 fn test_s3_client_with_endpoint(
2827 endpoint: &str,
2828 response: Option<http::Response<SdkBody>>,
2829 ) -> (S3Client, CaptureRequestReceiver) {
2830 test_s3_client_with_endpoint_and_headers(endpoint, response, Vec::new())
2831 }
2832
2833 fn test_s3_client_with_endpoint_and_headers(
2834 endpoint: &str,
2835 response: Option<http::Response<SdkBody>>,
2836 request_headers: Vec<RequestHeader>,
2837 ) -> (S3Client, CaptureRequestReceiver) {
2838 let (http_client, request_receiver) = capture_request(response);
2839 let credentials = Credentials::new(
2840 "access-key",
2841 "secret-key",
2842 None,
2843 None,
2844 "rc-test-credentials",
2845 );
2846 let mut config_builder = aws_sdk_s3::config::Builder::new()
2847 .credentials_provider(credentials)
2848 .endpoint_url(endpoint)
2849 .region(aws_sdk_s3::config::Region::new("us-east-1"))
2850 .force_path_style(true)
2851 .behavior_version_latest()
2852 .http_client(http_client);
2853
2854 let presign_config = config_builder.clone().build();
2855
2856 if !request_headers.is_empty() {
2857 config_builder = config_builder.interceptor(CustomHeaderInterceptor {
2858 headers: request_headers.clone(),
2859 });
2860 }
2861
2862 let config = config_builder.build();
2863
2864 let alias = Alias::new("test", endpoint, "access-key", "secret-key");
2865 let client = S3Client {
2866 inner: aws_sdk_s3::Client::from_conf(config),
2867 presign_inner: aws_sdk_s3::Client::from_conf(presign_config),
2868 xml_http_client: reqwest::Client::new(),
2869 alias,
2870 request_headers,
2871 };
2872
2873 (client, request_receiver)
2874 }
2875
/// `ObjectInfo::file` records the key and the size it was given.
#[test]
fn test_object_info_creation() {
    let ObjectInfo {
        key, size_bytes, ..
    } = ObjectInfo::file("test.txt", 1024);

    assert_eq!(key, "test.txt");
    assert_eq!(size_bytes, Some(1024));
}
2882
/// An Aliyun OSS service endpoint is not forced to path-style addressing.
#[test]
fn auto_bucket_lookup_uses_dns_for_aliyun_oss_service_endpoint() {
    let endpoint = "https://oss-cn-hangzhou.aliyuncs.com";
    let alias = Alias::new("aliyun", endpoint, "access-key", "secret-key");

    let path_style = force_path_style_for_alias(&alias);

    assert!(!path_style);
}
2894
/// An Aliyun OSS internal service endpoint is not forced to path-style addressing.
#[test]
fn auto_bucket_lookup_uses_dns_for_aliyun_internal_service_endpoint() {
    let endpoint = "https://oss-cn-hangzhou-internal.aliyuncs.com";
    let alias = Alias::new("aliyun", endpoint, "access-key", "secret-key");

    let path_style = force_path_style_for_alias(&alias);

    assert!(!path_style);
}
2906
/// A custom (localhost) endpoint keeps path-style addressing.
#[test]
fn auto_bucket_lookup_keeps_path_style_for_custom_endpoint() {
    let endpoint = "http://localhost:9000";
    let alias = Alias::new("local", endpoint, "access-key", "secret-key");

    let path_style = force_path_style_for_alias(&alias);

    assert!(path_style);
}
2913
/// An Aliyun endpoint that is not an OSS endpoint keeps path-style addressing.
#[test]
fn auto_bucket_lookup_keeps_path_style_for_non_oss_aliyun_endpoint() {
    let endpoint = "https://ecs-cn-hangzhou.aliyuncs.com";
    let alias = Alias::new("aliyun", endpoint, "access-key", "secret-key");

    let path_style = force_path_style_for_alias(&alias);

    assert!(path_style);
}
2925
/// An endpoint that is not a valid URL keeps path-style addressing.
#[test]
fn auto_bucket_lookup_keeps_path_style_for_invalid_endpoint() {
    let endpoint = "not a valid endpoint";
    let alias = Alias::new("broken", endpoint, "access-key", "secret-key");

    let path_style = force_path_style_for_alias(&alias);

    assert!(path_style);
}
2932
/// An explicit `bucket_lookup` of `path` or `dns` wins over endpoint auto-detection.
#[test]
fn explicit_bucket_lookup_overrides_auto_detection() {
    let alias_with_lookup = |name: &str, endpoint: &str, lookup: &str| {
        let mut alias = Alias::new(name, endpoint, "access-key", "secret-key");
        alias.bucket_lookup = lookup.to_string();
        alias
    };

    let path_alias = alias_with_lookup("aliyun", "https://oss-cn-hangzhou.aliyuncs.com", "path");
    let dns_alias = alias_with_lookup("local", "http://localhost:9000", "dns");

    assert!(force_path_style_for_alias(&path_alias));
    assert!(!force_path_style_for_alias(&dns_alias));
}
2950
/// An unrecognized `bucket_lookup` value keeps path-style addressing, even for an
/// Aliyun OSS endpoint.
#[test]
fn unknown_bucket_lookup_keeps_path_style() {
    let endpoint = "https://oss-cn-hangzhou.aliyuncs.com";
    let mut alias = Alias::new("aliyun", endpoint, "access-key", "secret-key");
    alias.bucket_lookup = String::from("unexpected");

    let path_style = force_path_style_for_alias(&alias);

    assert!(path_style);
}
2963
/// Parsing a replication document picks up the role, the rule identity and prefix,
/// and the three replication status toggles.
#[test]
fn parse_replication_configuration_xml_reads_delete_replication() {
    let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ReplicationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Role>arn:rustfs:replication:us-east-1:123:test</Role>
  <Rule>
    <Status>Enabled</Status>
    <Destination>
      <Bucket>arn:rustfs:replication:us-east-1:123:dest</Bucket>
      <StorageClass>STANDARD</StorageClass>
    </Destination>
    <ID>rule-1</ID>
    <Priority>1</Priority>
    <Filter>
      <Prefix>logs/</Prefix>
    </Filter>
    <ExistingObjectReplication>
      <Status>Enabled</Status>
    </ExistingObjectReplication>
    <DeleteMarkerReplication>
      <Status>Disabled</Status>
    </DeleteMarkerReplication>
    <DeleteReplication>
      <Status>Enabled</Status>
    </DeleteReplication>
  </Rule>
</ReplicationConfiguration>"#;

    let parsed = parse_replication_configuration_xml(xml).expect("parse replication xml");

    assert_eq!(parsed.role, "arn:rustfs:replication:us-east-1:123:test");
    assert_eq!(parsed.rules.len(), 1);

    let rule = &parsed.rules[0];
    assert_eq!(rule.id, "rule-1");
    assert_eq!(rule.prefix.as_deref(), Some("logs/"));
    assert_eq!(rule.delete_replication, Some(true));
    assert_eq!(rule.delete_marker_replication, Some(false));
    assert_eq!(rule.existing_object_replication, Some(true));
}
3001
/// A rule filter wrapped in `<And>` keeps both its prefix and every tag pair.
#[test]
fn parse_replication_configuration_xml_preserves_tag_filters() {
    let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ReplicationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Rule>
    <Status>Enabled</Status>
    <Destination>
      <Bucket>arn:rustfs:replication:us-east-1:123:dest</Bucket>
    </Destination>
    <ID>tagged-rule</ID>
    <Priority>2</Priority>
    <Filter>
      <And>
        <Prefix>logs/</Prefix>
        <Tag>
          <Key>env</Key>
          <Value>prod</Value>
        </Tag>
        <Tag>
          <Key>team</Key>
          <Value>core</Value>
        </Tag>
      </And>
    </Filter>
  </Rule>
</ReplicationConfiguration>"#;

    let parsed = parse_replication_configuration_xml(xml).expect("parse replication xml");
    let first_rule = &parsed.rules[0];

    assert_eq!(first_rule.prefix.as_deref(), Some("logs/"));

    let tag_filters = first_rule.tags.as_ref().expect("tag filters");
    assert_eq!(tag_filters.get("env").map(String::as_str), Some("prod"));
    assert_eq!(tag_filters.get("team").map(String::as_str), Some("core"));
}
3036
/// Serializing a rule with all three replication toggles enabled emits each status
/// element and a plain prefix filter.
#[test]
fn build_replication_configuration_xml_writes_delete_replication() {
    let destination = rc_core::ReplicationDestination {
        bucket_arn: "arn:rustfs:replication:us-east-1:123:dest".to_string(),
        storage_class: Some("STANDARD".to_string()),
    };
    let rule = rc_core::ReplicationRule {
        id: "rule-1".to_string(),
        priority: 1,
        status: rc_core::ReplicationRuleStatus::Enabled,
        prefix: Some("logs/".to_string()),
        tags: None,
        destination,
        delete_marker_replication: Some(true),
        existing_object_replication: Some(true),
        delete_replication: Some(true),
    };
    let config = ReplicationConfiguration {
        role: "arn:rustfs:replication:us-east-1:123:test".to_string(),
        rules: vec![rule],
    };

    let serialized = build_replication_configuration_xml(&config);

    assert!(
        serialized.contains("<DeleteReplication><Status>Enabled</Status></DeleteReplication>")
    );
    assert!(serialized.contains(
        "<ExistingObjectReplication><Status>Enabled</Status></ExistingObjectReplication>"
    ));
    assert!(serialized.contains(
        "<DeleteMarkerReplication><Status>Enabled</Status></DeleteMarkerReplication>"
    ));
    assert!(serialized.contains("<Filter><Prefix>logs/</Prefix></Filter>"));
}
3067
/// A rule with both a prefix and tags serializes its filter inside an `<And>` element
/// that lists every tag.
#[test]
fn build_replication_configuration_xml_writes_and_tag_filters() {
    let tags = HashMap::from([
        ("env".to_string(), "prod".to_string()),
        ("team".to_string(), "core".to_string()),
    ]);

    let rule = rc_core::ReplicationRule {
        id: "rule-1".to_string(),
        priority: 1,
        status: rc_core::ReplicationRuleStatus::Enabled,
        prefix: Some("logs/".to_string()),
        tags: Some(tags),
        destination: rc_core::ReplicationDestination {
            bucket_arn: "arn:rustfs:replication:us-east-1:123:dest".to_string(),
            storage_class: None,
        },
        delete_marker_replication: None,
        existing_object_replication: None,
        delete_replication: None,
    };
    let config = ReplicationConfiguration {
        role: String::new(),
        rules: vec![rule],
    };

    let serialized = build_replication_configuration_xml(&config);

    assert!(serialized.contains("<Filter><And><Prefix>logs/</Prefix>"));
    assert!(serialized.contains("<Tag><Key>env</Key><Value>prod</Value></Tag>"));
    assert!(serialized.contains("<Tag><Key>team</Key><Value>core</Value></Tag>"));
}
3097
/// A lifecycle filter built from a prefix and tags yields the same prefix and tags
/// when parsed back.
#[test]
fn build_lifecycle_rule_filter_preserves_prefix_and_tags() {
    let tags = HashMap::from([
        ("env".to_string(), "prod".to_string()),
        ("team".to_string(), "core".to_string()),
    ]);

    let built = build_lifecycle_rule_filter(Some("logs/"), Some(&tags))
        .expect("build lifecycle filter")
        .expect("lifecycle filter");

    let round_tripped_prefix = parse_lifecycle_filter_prefix(Some(&built));
    assert_eq!(round_tripped_prefix.as_deref(), Some("logs/"));

    let round_tripped_tags = parse_lifecycle_filter_tags(Some(&built)).expect("parsed tags");
    assert_eq!(
        round_tripped_tags.get("env").map(String::as_str),
        Some("prod")
    );
    assert_eq!(
        round_tripped_tags.get("team").map(String::as_str),
        Some("core")
    );
}
3116
/// An explicit error code decides the classification when the error text is empty.
#[test]
fn bucket_policy_error_kind_uses_error_code() {
    let cases = [
        ("NoSuchBucketPolicy", BucketPolicyErrorKind::MissingPolicy),
        ("NoSuchBucket", BucketPolicyErrorKind::MissingBucket),
    ];

    for (code, expected) in cases {
        assert_eq!(
            S3Client::bucket_policy_error_kind(Some(code), Some(404), ""),
            expected
        );
    }
}
3128
/// Without an error code, a 404 whose text names `NoSuchBucket` is a missing bucket;
/// any other 404 text falls back to a missing policy.
#[test]
fn bucket_policy_error_kind_prefers_bucket_not_found_over_404_fallback() {
    let cases = [
        ("NoSuchBucket", BucketPolicyErrorKind::MissingBucket),
        ("no details", BucketPolicyErrorKind::MissingPolicy),
    ];

    for (error_text, expected) in cases {
        assert_eq!(
            S3Client::bucket_policy_error_kind(None, Some(404), error_text),
            expected
        );
    }
}
3140
/// Checks how classified policy errors are mapped: a missing policy is `Ok(None)` on
/// get and `Ok` on delete, while a missing bucket on get is `Error::NotFound`.
#[test]
fn bucket_policy_error_mapping_returns_expected_result() {
    let get_missing_policy = S3Client::map_get_bucket_policy_error(
        "bucket",
        BucketPolicyErrorKind::MissingPolicy,
        "NoSuchPolicy",
    );
    assert!(
        get_missing_policy
            .expect("missing policy should map to Ok(None)")
            .is_none()
    );

    let get_missing_bucket = S3Client::map_get_bucket_policy_error(
        "bucket",
        BucketPolicyErrorKind::MissingBucket,
        "NoSuchBucket",
    );
    match get_missing_bucket {
        Err(Error::NotFound(message)) => assert!(message.contains("Bucket not found")),
        other => panic!("Expected NotFound for missing bucket, got: {:?}", other),
    }

    let delete_missing_policy = S3Client::map_delete_bucket_policy_error(
        "bucket",
        BucketPolicyErrorKind::MissingPolicy,
        "NoSuchPolicy",
    );
    assert!(
        delete_missing_policy.is_ok(),
        "Missing policy should be treated as successful delete"
    );
}
3170
/// A filter built from a prefix and a suffix yields the same pair when extracted.
#[test]
fn notification_filter_round_trip_prefix_and_suffix() {
    let built = S3Client::build_notification_filter(Some("logs/"), Some(".json"))
        .expect("filter should be built");

    let (round_tripped_prefix, round_tripped_suffix) =
        S3Client::extract_notification_filter(Some(&built));

    assert_eq!(round_tripped_prefix.as_deref(), Some("logs/"));
    assert_eq!(round_tripped_suffix.as_deref(), Some(".json"));
}
3179
/// No filter is built when neither a prefix nor a suffix is supplied.
#[test]
fn notification_filter_none_when_empty() {
    let built = S3Client::build_notification_filter(None, None);

    assert!(built.is_none());
}
3184
/// Two rule sets compare as equivalent even though they differ in rule order, rule
/// ids, and a duplicated event entry.
#[test]
fn notifications_equivalent_ignores_order_and_duplicate_events() {
    /// Shorthand constructor so the fixtures below stay readable.
    fn notification(
        id: Option<&str>,
        target: NotificationTarget,
        arn: &str,
        events: &[&str],
        prefix: Option<&str>,
        suffix: Option<&str>,
    ) -> BucketNotification {
        BucketNotification {
            id: id.map(str::to_string),
            target,
            arn: arn.to_string(),
            events: events.iter().map(|event| event.to_string()).collect(),
            prefix: prefix.map(str::to_string),
            suffix: suffix.map(str::to_string),
        }
    }

    let queue_arn = "arn:aws:sqs:us-east-1:123456789012:q";
    let topic_arn = "arn:aws:sns:us-east-1:123456789012:t";

    let expected = vec![
        notification(
            Some("a"),
            NotificationTarget::Queue,
            queue_arn,
            &["s3:ObjectCreated:*", "s3:ObjectCreated:*"],
            Some("images/"),
            Some(".jpg"),
        ),
        notification(
            Some("b"),
            NotificationTarget::Topic,
            topic_arn,
            &["s3:ObjectRemoved:*"],
            None,
            None,
        ),
    ];

    let actual = vec![
        notification(
            None,
            NotificationTarget::Topic,
            topic_arn,
            &["s3:ObjectRemoved:*"],
            None,
            None,
        ),
        notification(
            None,
            NotificationTarget::Queue,
            queue_arn,
            &["s3:ObjectCreated:*"],
            Some("images/"),
            Some(".jpg"),
        ),
    ];

    assert!(S3Client::notifications_equivalent(&expected, &actual));
}
3230
/// Converting an SDK CORS rule keeps its id, origins, methods (case untouched),
/// allowed and exposed headers, and max age.
#[test]
fn sdk_cors_rule_to_core_preserves_optional_fields() {
    let sdk_rule = aws_sdk_s3::types::CorsRule::builder()
        .id("web-app")
        .allowed_origins("https://app.example.com")
        .allowed_methods("get")
        .allowed_headers("Authorization")
        .expose_headers("ETag")
        .max_age_seconds(300)
        .build()
        .expect("build cors rule");

    let converted = sdk_cors_rule_to_core(&sdk_rule);

    assert_eq!(converted.id.as_deref(), Some("web-app"));
    assert_eq!(
        converted.allowed_origins,
        vec!["https://app.example.com".to_string()]
    );
    assert_eq!(converted.allowed_methods, vec!["get".to_string()]);
    assert_eq!(
        converted.allowed_headers,
        Some(vec!["Authorization".to_string()])
    );
    assert_eq!(converted.expose_headers, Some(vec!["ETag".to_string()]));
    assert_eq!(converted.max_age_seconds, Some(300));
}
3257
/// An SDK rule built without header lists converts to `None` for both header fields.
#[test]
fn sdk_cors_rule_to_core_drops_empty_optional_headers() {
    let minimal_rule = aws_sdk_s3::types::CorsRule::builder()
        .allowed_origins("https://app.example.com")
        .allowed_methods("GET")
        .build()
        .expect("build cors rule");

    let converted = sdk_cors_rule_to_core(&minimal_rule);

    assert_eq!(converted.allowed_headers, None);
    assert_eq!(converted.expose_headers, None);
}
3270
/// Lower-case methods on a core rule come out upper-cased on the SDK rule, with the
/// other fields carried over.
#[test]
fn core_cors_rule_to_sdk_normalizes_method_case() {
    let core_rule = CorsRule {
        id: Some("public-read".to_string()),
        allowed_origins: vec!["*".to_string()],
        allowed_methods: vec!["get".to_string(), "post".to_string()],
        allowed_headers: Some(vec!["*".to_string()]),
        expose_headers: None,
        max_age_seconds: Some(600),
    };

    let converted = core_cors_rule_to_sdk(&core_rule).expect("convert cors rule");

    assert_eq!(converted.id(), Some("public-read"));
    assert_eq!(converted.allowed_origins(), ["*"]);
    assert_eq!(converted.allowed_methods(), ["GET", "POST"]);
    assert_eq!(converted.allowed_headers(), ["*"]);
    assert_eq!(converted.max_age_seconds(), Some(600));
}
3289
3290 #[tokio::test]
3291 async fn set_bucket_cors_sends_rule_fields() {
3292 let response = http::Response::builder()
3293 .status(200)
3294 .body(SdkBody::from(""))
3295 .expect("build put bucket cors response");
3296 let (client, request_receiver) = test_s3_client(Some(response));
3297
3298 client
3299 .set_bucket_cors(
3300 "bucket",
3301 vec![CorsRule {
3302 id: Some("web-app".to_string()),
3303 allowed_origins: vec!["https://app.example.com".to_string()],
3304 allowed_methods: vec!["GET".to_string(), "POST".to_string()],
3305 allowed_headers: Some(vec!["Authorization".to_string()]),
3306 expose_headers: Some(vec!["ETag".to_string()]),
3307 max_age_seconds: Some(600),
3308 }],
3309 )
3310 .await
3311 .expect("set bucket cors");
3312
3313 let request = request_receiver.expect_request();
3314 assert_eq!(request.method(), http::Method::PUT);
3315 assert!(
3316 request.uri().to_string().contains("?cors"),
3317 "expected bucket CORS subresource in URI: {}",
3318 request.uri()
3319 );
3320
3321 let body = request.body().bytes().expect("request body bytes");
3322 let body = std::str::from_utf8(body).expect("request body is utf8");
3323 assert!(body.contains("<ID>web-app</ID>"));
3324 assert!(body.contains("<AllowedOrigin>https://app.example.com</AllowedOrigin>"));
3325 assert!(body.contains("<AllowedMethod>GET</AllowedMethod>"));
3326 assert!(body.contains("<AllowedMethod>POST</AllowedMethod>"));
3327 assert!(body.contains("<AllowedHeader>Authorization</AllowedHeader>"));
3328 assert!(body.contains("<ExposeHeader>ETag</ExposeHeader>"));
3329 assert!(body.contains("<MaxAgeSeconds>600</MaxAgeSeconds>"));
3330 }
3331
3332 #[test]
3333 fn core_cors_rule_to_sdk_drops_empty_optional_headers() {
3334 let rule = CorsRule {
3335 id: None,
3336 allowed_origins: vec!["https://app.example.com".to_string()],
3337 allowed_methods: vec!["GET".to_string()],
3338 allowed_headers: Some(Vec::new()),
3339 expose_headers: Some(Vec::new()),
3340 max_age_seconds: None,
3341 };
3342
3343 let sdk_rule = core_cors_rule_to_sdk(&rule).expect("convert cors rule");
3344 assert!(sdk_rule.allowed_headers().is_empty());
3345 assert!(sdk_rule.expose_headers().is_empty());
3346 }
3347
3348 #[test]
3349 fn parse_cors_configuration_xml_round_trips_rule_fields() {
3350 let body = r#"
3351<CORSConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
3352 <CORSRule>
3353 <ID>mc-rule</ID>
3354 <AllowedOrigin>https://console.example.com</AllowedOrigin>
3355 <AllowedMethod>GET</AllowedMethod>
3356 <AllowedMethod>POST</AllowedMethod>
3357 <AllowedHeader>*</AllowedHeader>
3358 <ExposeHeader>ETag</ExposeHeader>
3359 <MaxAgeSeconds>1200</MaxAgeSeconds>
3360 </CORSRule>
3361</CORSConfiguration>
3362"#;
3363
3364 let rules = parse_cors_configuration_xml(body).expect("parse cors xml");
3365 assert_eq!(rules.len(), 1);
3366 assert_eq!(rules[0].id.as_deref(), Some("mc-rule"));
3367 assert_eq!(
3368 rules[0].allowed_origins,
3369 vec!["https://console.example.com".to_string()]
3370 );
3371 assert_eq!(
3372 rules[0].allowed_methods,
3373 vec!["GET".to_string(), "POST".to_string()]
3374 );
3375 assert_eq!(rules[0].allowed_headers, Some(vec!["*".to_string()]));
3376 assert_eq!(rules[0].expose_headers, Some(vec!["ETag".to_string()]));
3377 assert_eq!(rules[0].max_age_seconds, Some(1200));
3378 }
3379
3380 #[test]
3381 fn cors_url_uses_path_style_bucket_and_query() {
3382 let (client, _) = test_s3_client(None);
3383
3384 let url = client.cors_url("bucket-name").expect("build cors url");
3385
3386 assert_eq!(url.as_str(), "https://example.com/bucket-name?cors=");
3387 }
3388
3389 #[test]
3390 fn cors_url_rejects_endpoints_without_path_segments() {
3391 let (client, _) = test_s3_client_with_endpoint("mailto:test@example.com", None);
3392
3393 match client.cors_url("bucket-name") {
3394 Err(Error::Network(message)) => {
3395 assert!(message.contains("does not support path-style bucket operations"));
3396 }
3397 other => panic!("expected path-style endpoint error, got {other:?}"),
3398 }
3399 }
3400
3401 #[test]
3402 fn missing_cors_configuration_errors_are_detected() {
3403 assert!(is_missing_cors_configuration_error(
3404 "NoSuchCORSConfiguration"
3405 ));
3406 assert!(is_missing_cors_configuration_error(
3407 "The CORS configuration does not exist"
3408 ));
3409 assert!(!is_missing_cors_configuration_error("AccessDenied"));
3410 }
3411
3412 #[test]
3413 fn missing_cors_configuration_response_detects_code_and_status() {
3414 assert!(is_missing_cors_configuration_response(
3415 Some("NoSuchCORSConfiguration"),
3416 Some(404),
3417 "service error"
3418 ));
3419 assert!(is_missing_cors_configuration_response(
3420 None,
3421 Some(404),
3422 "The CORS configuration does not exist"
3423 ));
3424 assert!(!is_missing_cors_configuration_response(
3425 Some("AccessDenied"),
3426 Some(403),
3427 "access denied"
3428 ));
3429 assert!(!is_missing_cors_configuration_response(
3430 None,
3431 Some(404),
3432 "service error"
3433 ));
3434 assert!(!is_missing_cors_configuration_response(
3435 Some("NoSuchBucket"),
3436 Some(404),
3437 "NoSuchBucket"
3438 ));
3439 assert!(is_missing_cors_configuration_response(
3440 None,
3441 None,
3442 "The CORS configuration does not exist"
3443 ));
3444 assert!(!is_missing_cors_configuration_response(
3445 None,
3446 Some(500),
3447 "The CORS configuration does not exist"
3448 ));
3449 }
3450
3451 #[tokio::test]
3452 async fn get_bucket_cors_missing_configuration_returns_empty_rules() {
3453 let response = http::Response::builder()
3454 .status(404)
3455 .header("x-amz-error-code", "NoSuchCORSConfiguration")
3456 .body(SdkBody::from(
3457 r#"<?xml version="1.0" encoding="UTF-8"?>
3458<Error>
3459 <Code>NoSuchCORSConfiguration</Code>
3460 <Message>The CORS configuration does not exist</Message>
3461</Error>"#,
3462 ))
3463 .expect("build missing cors response");
3464 let (client, _) = test_s3_client(Some(response));
3465
3466 let rules = client
3467 .get_bucket_cors("bucket")
3468 .await
3469 .expect("missing cors config should be treated as empty");
3470
3471 assert!(rules.is_empty());
3472 }
3473
3474 #[tokio::test]
3475 async fn delete_bucket_cors_missing_configuration_is_successful() {
3476 let response = http::Response::builder()
3477 .status(404)
3478 .header("x-amz-error-code", "NoSuchCORSConfiguration")
3479 .body(SdkBody::from(
3480 r#"<?xml version="1.0" encoding="UTF-8"?>
3481<Error>
3482 <Code>NoSuchCORSConfiguration</Code>
3483 <Message>The CORS configuration does not exist</Message>
3484</Error>"#,
3485 ))
3486 .expect("build missing cors response");
3487 let (client, _) = test_s3_client(Some(response));
3488
3489 client
3490 .delete_bucket_cors("bucket")
3491 .await
3492 .expect("missing cors config should be treated as successful delete");
3493 }
3494
3495 #[tokio::test]
3496 async fn reqwest_connector_insecure_without_ca_bundle_succeeds() {
3497 let connector = ReqwestConnector::new(true, None).await;
3499 assert!(
3500 connector.is_ok(),
3501 "Expected insecure connector creation to succeed"
3502 );
3503 }
3504
3505 #[tokio::test]
3506 async fn reqwest_connector_invalid_ca_bundle_path_surfaces_error() {
3507 let result = ReqwestConnector::new(false, Some("")).await;
3509 match result {
3510 Err(Error::Network(msg)) => {
3511 assert!(
3512 msg.contains("Failed to read CA bundle"),
3513 "Unexpected error message: {msg}"
3514 );
3515 }
3516 other => panic!("Expected Error::Network for invalid path, got: {:?}", other),
3517 }
3518 }
3519
3520 #[test]
3521 fn should_use_multipart_for_large_files() {
3522 assert!(S3Client::should_use_multipart(
3523 SINGLE_PUT_OBJECT_MAX_SIZE + 1
3524 ));
3525 }
3526
3527 #[test]
3528 fn should_use_single_part_for_small_files() {
3529 assert!(!S3Client::should_use_multipart(0));
3530 assert!(!S3Client::should_use_multipart(1024 * 1024));
3531 assert!(!S3Client::should_use_multipart(
3532 crate::multipart::DEFAULT_PART_SIZE
3533 ));
3534 assert!(!S3Client::should_use_multipart(SINGLE_PUT_OBJECT_MAX_SIZE));
3535 }
3536
3537 #[tokio::test]
3538 async fn delete_object_with_force_delete_sets_rustfs_header() {
3539 let (client, request_receiver) = test_s3_client(None);
3540 let path = RemotePath::new("test", "bucket", "key.txt");
3541
3542 let _ = client
3543 .delete_object_with_options(&path, DeleteRequestOptions { force_delete: true })
3544 .await;
3545
3546 let request = request_receiver.expect_request();
3547 assert_eq!(request.headers().get("x-rustfs-force-delete"), Some("true"));
3548 }
3549
3550 #[tokio::test]
3551 async fn custom_headers_are_added_before_sending_sdk_requests() {
3552 let (client, request_receiver) = test_s3_client_with_endpoint_and_headers(
3553 "https://example.com",
3554 None,
3555 vec![RequestHeader {
3556 name: "x-amz-bucket-encrypt-enabled".to_string(),
3557 value: "1".to_string(),
3558 }],
3559 );
3560 let path = RemotePath::new("test", "bucket", "key.txt");
3561
3562 let _ = client.delete_object(&path).await;
3563
3564 let request = request_receiver.expect_request();
3565 assert_eq!(
3566 request.headers().get("x-amz-bucket-encrypt-enabled"),
3567 Some("1")
3568 );
3569 assert!(
3570 request
3571 .headers()
3572 .get("authorization")
3573 .expect("authorization header")
3574 .contains("x-amz-bucket-encrypt-enabled")
3575 );
3576 }
3577
3578 #[tokio::test]
3579 async fn custom_headers_are_not_required_by_presigned_urls() {
3580 let (client, _request_receiver) = test_s3_client_with_endpoint_and_headers(
3581 "https://example.com",
3582 None,
3583 vec![RequestHeader {
3584 name: "x-amz-bucket-encrypt-enabled".to_string(),
3585 value: "1".to_string(),
3586 }],
3587 );
3588 let path = RemotePath::new("test", "bucket", "key.txt");
3589
3590 let url = client
3591 .presign_get(&path, 3600)
3592 .await
3593 .expect("presign get should succeed");
3594
3595 assert!(!url.contains("x-amz-bucket-encrypt-enabled"));
3596 }
3597
3598 #[tokio::test]
3599 async fn delete_object_without_force_delete_omits_rustfs_header() {
3600 let (client, request_receiver) = test_s3_client(None);
3601 let path = RemotePath::new("test", "bucket", "key.txt");
3602
3603 let _ = client
3604 .delete_object_with_options(&path, DeleteRequestOptions::default())
3605 .await;
3606
3607 let request = request_receiver.expect_request();
3608 assert!(request.headers().get("x-rustfs-force-delete").is_none());
3609 }
3610
3611 #[tokio::test]
3612 async fn delete_object_wrapper_uses_default_options_without_rustfs_header() {
3613 let (client, request_receiver) = test_s3_client(None);
3614 let path = RemotePath::new("test", "bucket", "key.txt");
3615
3616 let _ = ObjectStore::delete_object(&client, &path).await;
3617
3618 let request = request_receiver.expect_request();
3619 assert!(request.headers().get("x-rustfs-force-delete").is_none());
3620 }
3621
3622 #[tokio::test]
3623 async fn delete_object_with_options_maps_missing_keys_to_not_found() {
3624 let response = http::Response::builder()
3625 .status(404)
3626 .header("x-amz-error-code", "NoSuchKey")
3627 .body(SdkBody::from(
3628 r#"<?xml version="1.0" encoding="UTF-8"?>
3629<Error>
3630 <Code>NoSuchKey</Code>
3631 <Message>The specified key does not exist.</Message>
3632</Error>"#,
3633 ))
3634 .expect("build delete object response");
3635 let (client, _request_receiver) = test_s3_client(Some(response));
3636 let path = RemotePath::new("test", "bucket", "missing.txt");
3637
3638 let result = client
3639 .delete_object_with_options(&path, DeleteRequestOptions::default())
3640 .await;
3641
3642 match result {
3643 Err(Error::NotFound(message)) => assert_eq!(message, path.to_string()),
3644 other => panic!("Expected NotFound for missing key, got: {other:?}"),
3645 }
3646 }
3647
3648 #[tokio::test]
3649 async fn delete_object_with_options_maps_other_failures_to_network() {
3650 let response = http::Response::builder()
3651 .status(500)
3652 .header("x-amz-error-code", "InternalError")
3653 .body(SdkBody::from(
3654 r#"<?xml version="1.0" encoding="UTF-8"?>
3655<Error>
3656 <Code>InternalError</Code>
3657 <Message>Something went wrong.</Message>
3658</Error>"#,
3659 ))
3660 .expect("build delete object response");
3661 let (client, _request_receiver) = test_s3_client(Some(response));
3662 let path = RemotePath::new("test", "bucket", "key.txt");
3663
3664 let result = client
3665 .delete_object_with_options(&path, DeleteRequestOptions::default())
3666 .await;
3667
3668 match result {
3669 Err(Error::Network(message)) => assert!(message.contains("InternalError")),
3670 other => panic!("Expected Network for delete failure, got: {other:?}"),
3671 }
3672 }
3673
3674 #[tokio::test]
3675 async fn list_object_versions_page_preserves_markers_and_delete_markers() {
3676 let response = http::Response::builder()
3677 .status(200)
3678 .body(SdkBody::from(
3679 r#"<?xml version="1.0" encoding="UTF-8"?>
3680<ListVersionsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
3681 <Name>bucket</Name>
3682 <Prefix>logs/</Prefix>
3683 <KeyMarker></KeyMarker>
3684 <VersionIdMarker></VersionIdMarker>
3685 <NextKeyMarker>logs/c.txt</NextKeyMarker>
3686 <NextVersionIdMarker>v3</NextVersionIdMarker>
3687 <MaxKeys>25</MaxKeys>
3688 <IsTruncated>true</IsTruncated>
3689 <Version>
3690 <Key>logs/a.txt</Key>
3691 <VersionId>v1</VersionId>
3692 <IsLatest>true</IsLatest>
3693 <LastModified>2026-04-29T11:22:33.000Z</LastModified>
3694 <ETag>"etag-a"</ETag>
3695 <Size>12</Size>
3696 <StorageClass>STANDARD</StorageClass>
3697 </Version>
3698 <DeleteMarker>
3699 <Key>logs/b.txt</Key>
3700 <VersionId>v2</VersionId>
3701 <IsLatest>false</IsLatest>
3702 <LastModified>2026-04-28T10:20:30.000Z</LastModified>
3703 <Owner>
3704 <ID>owner</ID>
3705 </Owner>
3706 </DeleteMarker>
3707</ListVersionsResult>"#,
3708 ))
3709 .expect("build list object versions response");
3710 let (client, request_receiver) = test_s3_client(Some(response));
3711 let path = RemotePath::new("test", "bucket", "logs/");
3712
3713 let result = client
3714 .list_object_versions_page(&path, Some(25))
3715 .await
3716 .expect("list object versions page");
3717
3718 let request = request_receiver.expect_request();
3719 let uri = request.uri().to_string();
3720 assert!(
3721 uri.starts_with("https://example.com/bucket/?"),
3722 "unexpected URI: {uri}"
3723 );
3724 assert!(
3725 uri.contains("versions"),
3726 "expected versions subresource: {uri}"
3727 );
3728 assert!(
3729 uri.contains("prefix=logs%2F"),
3730 "expected prefix query: {uri}"
3731 );
3732 assert!(
3733 uri.contains("max-keys=25"),
3734 "expected max-keys query: {uri}"
3735 );
3736
3737 assert!(result.truncated);
3738 assert_eq!(result.continuation_token.as_deref(), Some("logs/c.txt"));
3739 assert_eq!(result.version_id_marker.as_deref(), Some("v3"));
3740 assert_eq!(result.items.len(), 2);
3741
3742 let version = &result.items[0];
3743 assert_eq!(version.key, "logs/a.txt");
3744 assert_eq!(version.version_id, "v1");
3745 assert!(version.is_latest);
3746 assert!(!version.is_delete_marker);
3747 assert_eq!(version.size_bytes, Some(12));
3748 assert_eq!(version.etag.as_deref(), Some("etag-a"));
3749
3750 let delete_marker = &result.items[1];
3751 assert_eq!(delete_marker.key, "logs/b.txt");
3752 assert_eq!(delete_marker.version_id, "v2");
3753 assert!(!delete_marker.is_latest);
3754 assert!(delete_marker.is_delete_marker);
3755 assert_eq!(delete_marker.size_bytes, None);
3756 assert_eq!(delete_marker.etag, None);
3757 }
3758
3759 #[tokio::test]
3760 async fn list_object_versions_page_maps_missing_bucket_to_not_found() {
3761 let response = http::Response::builder()
3762 .status(404)
3763 .header("x-amz-error-code", "NoSuchBucket")
3764 .body(SdkBody::from(
3765 r#"<?xml version="1.0" encoding="UTF-8"?>
3766<Error>
3767 <Code>NoSuchBucket</Code>
3768 <Message>The specified bucket does not exist.</Message>
3769</Error>"#,
3770 ))
3771 .expect("build missing bucket response");
3772 let (client, _request_receiver) = test_s3_client(Some(response));
3773 let path = RemotePath::new("test", "missing-bucket", "");
3774
3775 let result = client.list_object_versions_page(&path, Some(1000)).await;
3776
3777 match result {
3778 Err(Error::NotFound(message)) => {
3779 assert_eq!(message, "Bucket not found: missing-bucket")
3780 }
3781 other => panic!("Expected NotFound for missing bucket, got: {other:?}"),
3782 }
3783 }
3784
3785 #[tokio::test]
3786 async fn list_object_versions_page_maps_not_found_code_to_not_found() {
3787 let response = http::Response::builder()
3788 .status(404)
3789 .header("x-amz-error-code", "NotFound")
3790 .body(SdkBody::from(
3791 r#"<?xml version="1.0" encoding="UTF-8"?>
3792<Error>
3793 <Code>NotFound</Code>
3794 <Message>The specified bucket does not exist.</Message>
3795</Error>"#,
3796 ))
3797 .expect("build not found bucket response");
3798 let (client, _request_receiver) = test_s3_client(Some(response));
3799 let path = RemotePath::new("test", "missing-bucket", "");
3800
3801 let result = client.list_object_versions_page(&path, Some(1000)).await;
3802
3803 match result {
3804 Err(Error::NotFound(message)) => {
3805 assert_eq!(message, "Bucket not found: missing-bucket")
3806 }
3807 other => panic!("Expected NotFound for NotFound list versions error, got: {other:?}"),
3808 }
3809 }
3810
3811 #[tokio::test]
3812 async fn list_object_versions_page_maps_other_failures_to_network() {
3813 let response = http::Response::builder()
3814 .status(500)
3815 .header("x-amz-error-code", "InternalError")
3816 .body(SdkBody::from(
3817 r#"<?xml version="1.0" encoding="UTF-8"?>
3818<Error>
3819 <Code>InternalError</Code>
3820 <Message>Something went wrong.</Message>
3821</Error>"#,
3822 ))
3823 .expect("build internal error response");
3824 let (client, _request_receiver) = test_s3_client(Some(response));
3825 let path = RemotePath::new("test", "bucket", "");
3826
3827 let result = client.list_object_versions_page(&path, Some(1000)).await;
3828
3829 match result {
3830 Err(Error::Network(message)) => assert!(message.contains("InternalError")),
3831 other => panic!("Expected Network for list versions failure, got: {other:?}"),
3832 }
3833 }
3834
3835 #[tokio::test]
3836 async fn delete_bucket_maps_bucket_not_empty_to_conflict() {
3837 let response = http::Response::builder()
3838 .status(409)
3839 .header("x-amz-error-code", "BucketNotEmpty")
3840 .body(SdkBody::from(
3841 r#"<?xml version="1.0" encoding="UTF-8"?>
3842<Error>
3843 <Code>BucketNotEmpty</Code>
3844 <Message>The bucket you tried to delete is not empty.</Message>
3845</Error>"#,
3846 ))
3847 .expect("build delete bucket response");
3848 let (client, _request_receiver) = test_s3_client(Some(response));
3849
3850 let result = client.delete_bucket("bucket").await;
3851
3852 match result {
3853 Err(Error::Conflict(message)) => assert!(message.contains("BucketNotEmpty")),
3854 other => panic!("Expected Conflict for non-empty bucket, got: {other:?}"),
3855 }
3856 }
3857
3858 #[tokio::test]
3859 async fn delete_bucket_maps_missing_bucket_to_not_found() {
3860 let response = http::Response::builder()
3861 .status(404)
3862 .header("x-amz-error-code", "NoSuchBucket")
3863 .body(SdkBody::from(
3864 r#"<?xml version="1.0" encoding="UTF-8"?>
3865<Error>
3866 <Code>NoSuchBucket</Code>
3867 <Message>The specified bucket does not exist.</Message>
3868</Error>"#,
3869 ))
3870 .expect("build missing bucket response");
3871 let (client, _request_receiver) = test_s3_client(Some(response));
3872
3873 let result = client.delete_bucket("missing-bucket").await;
3874
3875 match result {
3876 Err(Error::NotFound(message)) => {
3877 assert_eq!(message, "Bucket not found: missing-bucket")
3878 }
3879 other => panic!("Expected NotFound for missing bucket, got: {other:?}"),
3880 }
3881 }
3882
3883 #[tokio::test]
3884 async fn delete_bucket_maps_not_found_code_to_not_found() {
3885 let response = http::Response::builder()
3886 .status(404)
3887 .header("x-amz-error-code", "NotFound")
3888 .body(SdkBody::from(
3889 r#"<?xml version="1.0" encoding="UTF-8"?>
3890<Error>
3891 <Code>NotFound</Code>
3892 <Message>The specified bucket does not exist.</Message>
3893</Error>"#,
3894 ))
3895 .expect("build not found bucket response");
3896 let (client, _request_receiver) = test_s3_client(Some(response));
3897
3898 let result = client.delete_bucket("missing-bucket").await;
3899
3900 match result {
3901 Err(Error::NotFound(message)) => {
3902 assert_eq!(message, "Bucket not found: missing-bucket")
3903 }
3904 other => panic!("Expected NotFound for NotFound delete bucket error, got: {other:?}"),
3905 }
3906 }
3907
3908 #[tokio::test]
3909 async fn delete_bucket_maps_other_failures_to_network() {
3910 let response = http::Response::builder()
3911 .status(500)
3912 .header("x-amz-error-code", "InternalError")
3913 .body(SdkBody::from(
3914 r#"<?xml version="1.0" encoding="UTF-8"?>
3915<Error>
3916 <Code>InternalError</Code>
3917 <Message>Something went wrong.</Message>
3918</Error>"#,
3919 ))
3920 .expect("build delete bucket response");
3921 let (client, _request_receiver) = test_s3_client(Some(response));
3922
3923 let result = client.delete_bucket("bucket").await;
3924
3925 match result {
3926 Err(Error::Network(message)) => assert!(message.contains("InternalError")),
3927 other => panic!("Expected Network for delete bucket failure, got: {other:?}"),
3928 }
3929 }
3930
3931 #[tokio::test]
3932 async fn delete_objects_with_force_delete_sets_rustfs_header() {
3933 let response = http::Response::builder()
3934 .status(200)
3935 .body(SdkBody::from(
3936 r#"<?xml version="1.0" encoding="UTF-8"?>
3937<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />"#,
3938 ))
3939 .expect("build delete objects response");
3940 let (client, request_receiver) = test_s3_client(Some(response));
3941
3942 let _ = client
3943 .delete_objects_with_options(
3944 "bucket",
3945 vec!["key.txt".to_string()],
3946 DeleteRequestOptions { force_delete: true },
3947 )
3948 .await;
3949
3950 let request = request_receiver.expect_request();
3951 assert_eq!(request.headers().get("x-rustfs-force-delete"), Some("true"));
3952 }
3953
3954 #[tokio::test]
3955 async fn delete_objects_without_force_delete_omits_rustfs_header() {
3956 let response = http::Response::builder()
3957 .status(200)
3958 .body(SdkBody::from(
3959 r#"<?xml version="1.0" encoding="UTF-8"?>
3960<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />"#,
3961 ))
3962 .expect("build delete objects response");
3963 let (client, request_receiver) = test_s3_client(Some(response));
3964
3965 let _ = client
3966 .delete_objects_with_options(
3967 "bucket",
3968 vec!["key.txt".to_string()],
3969 DeleteRequestOptions::default(),
3970 )
3971 .await;
3972
3973 let request = request_receiver.expect_request();
3974 assert!(request.headers().get("x-rustfs-force-delete").is_none());
3975 }
3976
3977 #[tokio::test]
3978 async fn delete_objects_wrapper_uses_default_options_without_rustfs_header() {
3979 let response = http::Response::builder()
3980 .status(200)
3981 .body(SdkBody::from(
3982 r#"<?xml version="1.0" encoding="UTF-8"?>
3983<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />"#,
3984 ))
3985 .expect("build delete objects response");
3986 let (client, request_receiver) = test_s3_client(Some(response));
3987
3988 let _ = ObjectStore::delete_objects(&client, "bucket", vec!["key.txt".to_string()]).await;
3989
3990 let request = request_receiver.expect_request();
3991 assert!(request.headers().get("x-rustfs-force-delete").is_none());
3992 }
3993
3994 #[tokio::test]
3995 async fn delete_objects_with_empty_keys_skips_http_request() {
3996 let (client, request_receiver) = test_s3_client(None);
3997
3998 let deleted = client
3999 .delete_objects_with_options("bucket", Vec::new(), DeleteRequestOptions::default())
4000 .await
4001 .expect("empty delete should succeed");
4002
4003 assert!(deleted.is_empty());
4004 request_receiver.expect_no_request();
4005 }
4006
4007 #[tokio::test]
4008 async fn delete_objects_with_partial_errors_returns_deleted_keys() {
4009 let response = http::Response::builder()
4010 .status(200)
4011 .body(SdkBody::from(
4012 r#"<?xml version="1.0" encoding="UTF-8"?>
4013<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
4014 <Deleted>
4015 <Key>kept.txt</Key>
4016 </Deleted>
4017 <Error>
4018 <Key>failed.txt</Key>
4019 <Code>AccessDenied</Code>
4020 <Message>Access Denied</Message>
4021 </Error>
4022</DeleteResult>"#,
4023 ))
4024 .expect("build partial delete response");
4025 let (client, request_receiver) = test_s3_client(Some(response));
4026
4027 let deleted = client
4028 .delete_objects_with_options(
4029 "bucket",
4030 vec!["kept.txt".to_string(), "failed.txt".to_string()],
4031 DeleteRequestOptions::default(),
4032 )
4033 .await
4034 .expect("partial delete should still return deleted keys");
4035
4036 let request = request_receiver.expect_request();
4037 assert_eq!(request.uri(), "https://example.com/bucket/?delete");
4038 assert_eq!(deleted, vec!["kept.txt".to_string()]);
4039 }
4040
4041 #[tokio::test]
4042 async fn read_next_part_fills_buffer_until_eof() {
4043 use tokio::io::AsyncWriteExt;
4044
4045 let temp_dir = tempfile::tempdir().expect("create temp dir");
4046 let file_path = temp_dir.path().join("payload.bin");
4047 let mut writer = tokio::fs::File::create(&file_path)
4048 .await
4049 .expect("create temp file");
4050 writer
4051 .write_all(b"abcdefghij")
4052 .await
4053 .expect("write temp file");
4054 writer.flush().await.expect("flush temp file");
4055 drop(writer);
4056
4057 let mut reader = tokio::fs::File::open(&file_path)
4058 .await
4059 .expect("open temp file");
4060 let mut buffer = vec![0u8; 8];
4061
4062 let first = S3Client::read_next_part(&mut reader, &file_path, &mut buffer)
4063 .await
4064 .expect("first read");
4065 assert_eq!(first, 8);
4066 assert_eq!(&buffer[..first], b"abcdefgh");
4067
4068 let second = S3Client::read_next_part(&mut reader, &file_path, &mut buffer)
4069 .await
4070 .expect("second read");
4071 assert_eq!(second, 2);
4072 assert_eq!(&buffer[..second], b"ij");
4073
4074 let third = S3Client::read_next_part(&mut reader, &file_path, &mut buffer)
4075 .await
4076 .expect("third read");
4077 assert_eq!(third, 0);
4078 }
4079}