Skip to main content

rc_s3/
client.rs

1//! S3 client implementation
2//!
3//! Wraps aws-sdk-s3 and implements the ObjectStore trait from rc-core.
4
5use async_trait::async_trait;
6use aws_credential_types::Credentials;
7use aws_sdk_s3::error::ProvideErrorMetadata;
8use aws_sigv4::http_request::{
9    SignableBody, SignableRequest, SignatureLocation, SigningSettings, sign,
10};
11use aws_sigv4::sign::v4;
12use aws_smithy_runtime_api::box_error::BoxError;
13use aws_smithy_runtime_api::client::http::{
14    HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpConnector,
15};
16use aws_smithy_runtime_api::client::interceptors::Intercept;
17use aws_smithy_runtime_api::client::interceptors::context::BeforeTransmitInterceptorContextMut;
18use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
19use aws_smithy_runtime_api::client::result::ConnectorError;
20use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
21use aws_smithy_runtime_api::http::{Response, StatusCode};
22use aws_smithy_types::body::SdkBody;
23use aws_smithy_types::config_bag::ConfigBag;
24use bytes::Bytes;
25use jiff::Timestamp;
26use quick_xml::de::from_str as from_xml_str;
27use rc_core::{
28    Alias, BucketEncryption, BucketNotification, Capabilities, CorsRule, Error, LifecycleRule,
29    ListOptions, ListResult, NotificationTarget, ObjectEncryptionRequest, ObjectInfo, ObjectStore,
30    ObjectVersion, ObjectVersionListResult, RemotePath, ReplicationConfiguration, RequestHeader,
31    Result, SelectOptions, global_request_headers,
32};
33use reqwest::Method;
34use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
35use serde::Deserialize;
36use sha2::{Digest, Sha256};
37use std::collections::HashMap;
38use tokio::io::AsyncReadExt;
39use tokio::io::AsyncWrite;
40
/// Size threshold for single-request uploads, set to the multipart default part size.
///
/// Keep single-part uploads small to avoid backend incompatibilities with
/// streaming aws-chunked payloads.
// NOTE(review): presumably objects above this size go through the multipart path — confirm
// against the upload code, which is outside this section.
const SINGLE_PUT_OBJECT_MAX_SIZE: u64 = crate::multipart::DEFAULT_PART_SIZE;
/// Service identifier string for S3.
// NOTE(review): presumably the SigV4 signing service name for hand-signed requests — confirm.
const S3_SERVICE_NAME: &str = "s3";
/// XML namespace written on the root element of generated replication configuration documents.
const S3_REPLICATION_XML_NAMESPACE: &str = "http://s3.amazonaws.com/doc/2006-03-01/";
/// Name of a RustFS-specific request header.
// NOTE(review): presumably sent when `DeleteRequestOptions::force_delete` is set — confirm
// against the delete code path, which is outside this section.
const RUSTFS_FORCE_DELETE_HEADER: &str = "x-rustfs-force-delete";
47
/// Coarse classification of a failed bucket-policy request.
///
/// NOTE(review): the code that produces and consumes these variants is outside this
/// section; the per-variant notes are inferred from the names and should be confirmed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum BucketPolicyErrorKind {
    /// Presumably: the bucket exists but has no policy attached.
    MissingPolicy,
    /// Presumably: the bucket itself does not exist.
    MissingBucket,
    /// Any failure that is neither of the above.
    Other,
}
54
55/// Custom HTTP connector using reqwest, supporting insecure TLS (skip cert verification)
56/// and custom CA bundles. Used when `alias.insecure = true` or `alias.ca_bundle.is_some()`.
57#[derive(Debug, Clone)]
58struct ReqwestConnector {
59    client: reqwest::Client,
60}
61
62impl ReqwestConnector {
63    async fn new(
64        insecure: bool,
65        ca_bundle: Option<&str>,
66        client_cert: Option<&str>,
67        client_key: Option<&str>,
68    ) -> Result<Self> {
69        let client = build_reqwest_client(insecure, ca_bundle, client_cert, client_key).await?;
70        Ok(Self { client })
71    }
72}
73
74async fn build_reqwest_client(
75    insecure: bool,
76    ca_bundle: Option<&str>,
77    client_cert: Option<&str>,
78    client_key: Option<&str>,
79) -> Result<reqwest::Client> {
80    // NOTE: When `insecure = true`, `danger_accept_invalid_certs` disables all TLS
81    // certificate verification. Any CA bundle provided will still be added to the
82    // trust store but is rendered ineffective for this connection.
83    let mut builder = reqwest::Client::builder().danger_accept_invalid_certs(insecure);
84
85    if let Some(bundle_path) = ca_bundle {
86        // Use tokio::fs::read to avoid blocking the async runtime thread.
87        let pem = tokio::fs::read(bundle_path).await.map_err(|e| {
88            Error::Network(format!("Failed to read CA bundle '{bundle_path}': {e}"))
89        })?;
90        let cert = reqwest::Certificate::from_pem(&pem)
91            .map_err(|e| Error::Network(format!("Invalid CA bundle '{bundle_path}': {e}")))?;
92        builder = builder.add_root_certificate(cert);
93    }
94
95    if let (Some(cert_path), Some(key_path)) = (client_cert, client_key) {
96        let mut identity_pem = tokio::fs::read(cert_path).await.map_err(|e| {
97            Error::Network(format!(
98                "Failed to read client certificate '{cert_path}': {e}"
99            ))
100        })?;
101        let key_pem = tokio::fs::read(key_path)
102            .await
103            .map_err(|e| Error::Network(format!("Failed to read client key '{key_path}': {e}")))?;
104        identity_pem.extend_from_slice(b"\n");
105        identity_pem.extend_from_slice(&key_pem);
106        let identity = reqwest::Identity::from_pem(&identity_pem)
107            .map_err(|e| Error::Network(format!("Invalid client certificate/key identity: {e}")))?;
108        builder = builder.use_rustls_tls().identity(identity);
109    }
110
111    let client = builder
112        .build()
113        .map_err(|e| Error::Network(format!("Failed to build HTTP client: {e}")))?;
114    Ok(client)
115}
116
117fn force_path_style_for_alias(alias: &Alias) -> bool {
118    match alias.bucket_lookup.as_str() {
119        "path" => true,
120        "dns" => false,
121        "auto" => !is_aliyun_oss_service_endpoint(&alias.endpoint),
122        _ => true,
123    }
124}
125
126fn is_aliyun_oss_service_endpoint(endpoint: &str) -> bool {
127    let Ok(url) = reqwest::Url::parse(endpoint.trim_end_matches('/')) else {
128        return false;
129    };
130
131    let Some(host) = url.host_str() else {
132        return false;
133    };
134
135    host.strip_suffix(".aliyuncs.com")
136        .and_then(|host| host.split('.').next())
137        .is_some_and(|first_label| first_label.starts_with("oss-"))
138}
139
140#[derive(Debug, Deserialize)]
141#[serde(rename_all = "PascalCase")]
142struct ReplicationConfigurationXml {
143    role: Option<String>,
144    #[serde(rename = "Rule", default)]
145    rules: Vec<ReplicationRuleXml>,
146}
147
148#[derive(Debug, Deserialize)]
149#[serde(rename_all = "PascalCase")]
150struct ReplicationRuleXml {
151    #[serde(rename = "ID")]
152    id: Option<String>,
153    priority: Option<i32>,
154    status: Option<String>,
155    #[serde(rename = "Prefix")]
156    legacy_prefix: Option<String>,
157    filter: Option<ReplicationFilterXml>,
158    destination: Option<ReplicationDestinationXml>,
159    delete_marker_replication: Option<ReplicationStatusXml>,
160    existing_object_replication: Option<ReplicationStatusXml>,
161    delete_replication: Option<ReplicationStatusXml>,
162}
163
164#[derive(Debug, Deserialize)]
165#[serde(rename_all = "PascalCase")]
166struct ReplicationFilterXml {
167    prefix: Option<String>,
168    tag: Option<TagXml>,
169    and: Option<ReplicationAndXml>,
170}
171
172#[derive(Debug, Deserialize)]
173#[serde(rename_all = "PascalCase")]
174struct ReplicationAndXml {
175    prefix: Option<String>,
176    #[serde(rename = "Tag", default)]
177    tags: Vec<TagXml>,
178}
179
180#[derive(Debug, Deserialize)]
181#[serde(rename_all = "PascalCase")]
182struct TagXml {
183    key: Option<String>,
184    value: Option<String>,
185}
186
187#[derive(Debug, Deserialize)]
188#[serde(rename_all = "PascalCase")]
189struct ReplicationDestinationXml {
190    bucket: Option<String>,
191    storage_class: Option<String>,
192}
193
194#[derive(Debug, Deserialize)]
195#[serde(rename_all = "PascalCase")]
196struct ReplicationStatusXml {
197    status: Option<String>,
198}
199
200#[derive(Debug, Deserialize)]
201#[serde(rename_all = "PascalCase")]
202struct CorsConfigurationXml {
203    #[serde(rename = "CORSRule", default)]
204    rules: Vec<CorsRuleXml>,
205}
206
207#[derive(Debug, Deserialize)]
208#[serde(rename_all = "PascalCase")]
209struct CorsRuleXml {
210    #[serde(rename = "ID")]
211    id: Option<String>,
212    #[serde(rename = "AllowedOrigin", default)]
213    allowed_origins: Vec<String>,
214    #[serde(rename = "AllowedMethod", default)]
215    allowed_methods: Vec<String>,
216    #[serde(rename = "AllowedHeader", default)]
217    allowed_headers: Vec<String>,
218    #[serde(rename = "ExposeHeader", default)]
219    expose_headers: Vec<String>,
220    max_age_seconds: Option<i32>,
221}
222
223fn parse_replication_status(status: Option<&ReplicationStatusXml>) -> Option<bool> {
224    status
225        .and_then(|value| value.status.as_deref())
226        .map(|value| value.eq_ignore_ascii_case("enabled"))
227}
228
229fn parse_replication_rule_status(status: Option<&str>) -> rc_core::ReplicationRuleStatus {
230    match status {
231        Some(value) if value.eq_ignore_ascii_case("enabled") => {
232            rc_core::ReplicationRuleStatus::Enabled
233        }
234        _ => rc_core::ReplicationRuleStatus::Disabled,
235    }
236}
237
/// Copies borrowed `(key, value)` pairs into an owned map.
///
/// Returns `None` when `tags` yields nothing, so callers can treat "no tags" and
/// "empty tag set" alike. When a key repeats, the last value wins.
fn collect_tag_map<'a, I>(tags: I) -> Option<HashMap<String, String>>
where
    I: IntoIterator<Item = (&'a str, &'a str)>,
{
    let mut owned = HashMap::new();
    for (key, value) in tags {
        owned.insert(key.to_owned(), value.to_owned());
    }
    (!owned.is_empty()).then_some(owned)
}
252
253fn parse_tag_xml(tag: Option<&TagXml>) -> Option<HashMap<String, String>> {
254    collect_tag_map(tag.and_then(|tag| Some((tag.key.as_deref()?, tag.value.as_deref()?))))
255}
256
257fn parse_tag_xmls(tags: &[TagXml]) -> Option<HashMap<String, String>> {
258    collect_tag_map(
259        tags.iter()
260            .filter_map(|tag| Some((tag.key.as_deref()?, tag.value.as_deref()?))),
261    )
262}
263
264fn parse_replication_filter_prefix(filter: Option<&ReplicationFilterXml>) -> Option<String> {
265    filter
266        .and_then(|filter| filter.prefix.clone())
267        .or_else(|| filter.and_then(|filter| filter.and.as_ref()?.prefix.clone()))
268}
269
270fn parse_replication_filter_tags(
271    filter: Option<&ReplicationFilterXml>,
272) -> Option<HashMap<String, String>> {
273    filter
274        .and_then(|filter| parse_tag_xml(filter.tag.as_ref()))
275        .or_else(|| filter.and_then(|filter| parse_tag_xmls(&filter.and.as_ref()?.tags)))
276}
277
/// Returns the map's entries as borrowed `(key, value)` pairs in ascending order.
///
/// Sorting makes output that is generated from a `HashMap` deterministic.
fn sorted_tags(tags: &HashMap<String, String>) -> Vec<(&str, &str)> {
    let mut ordered = Vec::with_capacity(tags.len());
    for (key, value) in tags {
        ordered.push((key.as_str(), value.as_str()));
    }
    ordered.sort_unstable();
    ordered
}
286
287fn append_tag_xml(xml: &mut String, key: &str, value: &str) {
288    xml.push_str("<Tag><Key>");
289    xml.push_str(&xml_escape(key));
290    xml.push_str("</Key><Value>");
291    xml.push_str(&xml_escape(value));
292    xml.push_str("</Value></Tag>");
293}
294
295fn append_replication_filter_xml(
296    xml: &mut String,
297    prefix: Option<&str>,
298    tags: Option<&HashMap<String, String>>,
299) {
300    let Some(tags) = tags.filter(|tags| !tags.is_empty()) else {
301        if let Some(prefix) = prefix {
302            xml.push_str("<Filter><Prefix>");
303            xml.push_str(&xml_escape(prefix));
304            xml.push_str("</Prefix></Filter>");
305        }
306        return;
307    };
308
309    xml.push_str("<Filter>");
310    if prefix.is_some() || tags.len() > 1 {
311        xml.push_str("<And>");
312        if let Some(prefix) = prefix {
313            xml.push_str("<Prefix>");
314            xml.push_str(&xml_escape(prefix));
315            xml.push_str("</Prefix>");
316        }
317        for (key, value) in sorted_tags(tags) {
318            append_tag_xml(xml, key, value);
319        }
320        xml.push_str("</And>");
321    } else if let Some((key, value)) = sorted_tags(tags).into_iter().next() {
322        append_tag_xml(xml, key, value);
323    }
324    xml.push_str("</Filter>");
325}
326
/// Collapses `Some(vec![])` to `None` and leaves everything else untouched.
fn normalize_optional_strings(values: Option<Vec<String>>) -> Option<Vec<String>> {
    match values {
        Some(items) if !items.is_empty() => Some(items),
        _ => None,
    }
}
330
/// Reports whether free-form error text says that a bucket has no CORS configuration.
///
/// The check is an ASCII case-insensitive substring search.
fn is_missing_cors_configuration_error(error_text: &str) -> bool {
    // "the cors configuration does not exist" is covered by the shorter second marker.
    const MARKERS: [&str; 2] = [
        "nosuchcorsconfiguration",
        "cors configuration does not exist",
    ];
    let lowered = error_text.to_ascii_lowercase();
    MARKERS.iter().any(|marker| lowered.contains(marker))
}
337
338fn is_missing_cors_configuration_response(
339    error_code: Option<&str>,
340    status_code: Option<u16>,
341    error_text: &str,
342) -> bool {
343    let error_code = error_code.map(|code| code.to_ascii_lowercase());
344    if matches!(error_code.as_deref(), Some("nosuchcorsconfiguration")) {
345        return true;
346    }
347
348    if !is_missing_cors_configuration_error(error_text) {
349        return false;
350    }
351
352    status_code.is_none_or(|status| status == 404)
353}
354
355fn sdk_cors_rule_to_core(rule: &aws_sdk_s3::types::CorsRule) -> CorsRule {
356    CorsRule {
357        id: rule.id().map(str::to_string),
358        allowed_origins: rule.allowed_origins().to_vec(),
359        allowed_methods: rule.allowed_methods().to_vec(),
360        allowed_headers: normalize_optional_strings(Some(rule.allowed_headers().to_vec())),
361        expose_headers: normalize_optional_strings(Some(rule.expose_headers().to_vec())),
362        max_age_seconds: rule.max_age_seconds(),
363    }
364}
365
366fn sdk_bucket_encryption_to_core(
367    value: &aws_sdk_s3::types::ServerSideEncryptionByDefault,
368) -> Result<BucketEncryption> {
369    match value.sse_algorithm() {
370        aws_sdk_s3::types::ServerSideEncryption::Aes256 => Ok(BucketEncryption::SseS3),
371        aws_sdk_s3::types::ServerSideEncryption::AwsKms => Ok(BucketEncryption::SseKms {
372            key_id: value.kms_master_key_id().map(ToString::to_string),
373        }),
374        other => Err(Error::General(format!(
375            "unsupported bucket encryption algorithm: {}",
376            other.as_str()
377        ))),
378    }
379}
380
/// Reports whether free-form error text says that a bucket has no encryption
/// configuration.
///
/// The check is an ASCII case-insensitive substring search.
fn is_missing_bucket_encryption_error(error_text: &str) -> bool {
    const MARKERS: [&str; 3] = [
        "serversideencryptionconfigurationnotfounderror",
        "nosuchbucketencryption",
        "encryption configuration was not found",
    ];
    let lowered = error_text.to_ascii_lowercase();
    MARKERS.iter().any(|marker| lowered.contains(marker))
}
387
388fn is_missing_bucket_encryption_response(
389    error_code: Option<&str>,
390    status_code: Option<u16>,
391    error_text: &str,
392) -> bool {
393    let error_code = error_code.map(|code| code.to_ascii_lowercase());
394    if matches!(
395        error_code.as_deref(),
396        Some("serversideencryptionconfigurationnotfounderror" | "nosuchbucketencryption")
397    ) {
398        return true;
399    }
400
401    if !is_missing_bucket_encryption_error(error_text) {
402        return false;
403    }
404
405    status_code.is_none_or(|status| status == 404)
406}
407
408fn core_bucket_encryption_to_sdk(
409    value: &BucketEncryption,
410) -> aws_sdk_s3::types::ServerSideEncryptionConfiguration {
411    let encryption_by_default = match value {
412        BucketEncryption::SseS3 => aws_sdk_s3::types::ServerSideEncryptionByDefault::builder()
413            .sse_algorithm(aws_sdk_s3::types::ServerSideEncryption::Aes256)
414            .build()
415            .expect("sse-s3 bucket encryption configuration is valid"),
416        BucketEncryption::SseKms { key_id } => {
417            let mut builder = aws_sdk_s3::types::ServerSideEncryptionByDefault::builder()
418                .sse_algorithm(aws_sdk_s3::types::ServerSideEncryption::AwsKms);
419            if let Some(key_id) = key_id {
420                builder = builder.kms_master_key_id(key_id);
421            }
422            builder
423                .build()
424                .expect("sse-kms bucket encryption configuration is valid")
425        }
426    };
427
428    let rule = aws_sdk_s3::types::ServerSideEncryptionRule::builder()
429        .apply_server_side_encryption_by_default(encryption_by_default)
430        .build();
431
432    aws_sdk_s3::types::ServerSideEncryptionConfiguration::builder()
433        .rules(rule)
434        .build()
435        .expect("bucket encryption configuration requires one rule")
436}
437
438fn apply_object_encryption_to_put_request(
439    request: aws_sdk_s3::operation::put_object::builders::PutObjectFluentBuilder,
440    encryption: Option<&ObjectEncryptionRequest>,
441) -> aws_sdk_s3::operation::put_object::builders::PutObjectFluentBuilder {
442    match encryption {
443        Some(ObjectEncryptionRequest::SseS3) => {
444            request.server_side_encryption(aws_sdk_s3::types::ServerSideEncryption::Aes256)
445        }
446        Some(ObjectEncryptionRequest::SseKms { key_id }) => request
447            .server_side_encryption(aws_sdk_s3::types::ServerSideEncryption::AwsKms)
448            .ssekms_key_id(key_id),
449        None => request,
450    }
451}
452
453fn apply_object_encryption_to_copy_request(
454    request: aws_sdk_s3::operation::copy_object::builders::CopyObjectFluentBuilder,
455    encryption: Option<&ObjectEncryptionRequest>,
456) -> aws_sdk_s3::operation::copy_object::builders::CopyObjectFluentBuilder {
457    match encryption {
458        Some(ObjectEncryptionRequest::SseS3) => {
459            request.server_side_encryption(aws_sdk_s3::types::ServerSideEncryption::Aes256)
460        }
461        Some(ObjectEncryptionRequest::SseKms { key_id }) => request
462            .server_side_encryption(aws_sdk_s3::types::ServerSideEncryption::AwsKms)
463            .ssekms_key_id(key_id),
464        None => request,
465    }
466}
467
468fn core_cors_rule_to_sdk(rule: &CorsRule) -> Result<aws_sdk_s3::types::CorsRule> {
469    aws_sdk_s3::types::CorsRule::builder()
470        .set_id(rule.id.clone())
471        .set_allowed_origins(Some(rule.allowed_origins.clone()))
472        .set_allowed_methods(Some(
473            rule.allowed_methods
474                .iter()
475                .map(|method| method.to_ascii_uppercase())
476                .collect(),
477        ))
478        .set_allowed_headers(normalize_optional_strings(rule.allowed_headers.clone()))
479        .set_expose_headers(normalize_optional_strings(rule.expose_headers.clone()))
480        .set_max_age_seconds(rule.max_age_seconds)
481        .build()
482        .map_err(|e| Error::General(format!("build bucket cors rule: {e}")))
483}
484
485fn parse_cors_configuration_xml(body: &str) -> Result<Vec<CorsRule>> {
486    let config: CorsConfigurationXml =
487        from_xml_str(body).map_err(|e| Error::General(format!("parse bucket cors xml: {e}")))?;
488
489    Ok(config
490        .rules
491        .into_iter()
492        .map(|rule| CorsRule {
493            id: rule.id,
494            allowed_origins: rule.allowed_origins,
495            allowed_methods: rule.allowed_methods,
496            allowed_headers: normalize_optional_strings(Some(rule.allowed_headers)),
497            expose_headers: normalize_optional_strings(Some(rule.expose_headers)),
498            max_age_seconds: rule.max_age_seconds,
499        })
500        .collect())
501}
502
503fn parse_replication_configuration_xml(body: &str) -> Result<ReplicationConfiguration> {
504    let config: ReplicationConfigurationXml = from_xml_str(body)
505        .map_err(|e| Error::General(format!("parse replication config xml: {e}")))?;
506
507    let rules = config
508        .rules
509        .into_iter()
510        .map(|rule| rc_core::ReplicationRule {
511            id: rule.id.unwrap_or_default(),
512            priority: rule.priority.unwrap_or_default(),
513            status: parse_replication_rule_status(rule.status.as_deref()),
514            prefix: parse_replication_filter_prefix(rule.filter.as_ref()).or(rule.legacy_prefix),
515            tags: parse_replication_filter_tags(rule.filter.as_ref()),
516            destination: rc_core::ReplicationDestination {
517                bucket_arn: rule
518                    .destination
519                    .as_ref()
520                    .and_then(|destination| destination.bucket.clone())
521                    .unwrap_or_default(),
522                storage_class: rule
523                    .destination
524                    .and_then(|destination| destination.storage_class),
525            },
526            delete_marker_replication: parse_replication_status(
527                rule.delete_marker_replication.as_ref(),
528            ),
529            existing_object_replication: parse_replication_status(
530                rule.existing_object_replication.as_ref(),
531            ),
532            delete_replication: parse_replication_status(rule.delete_replication.as_ref()),
533        })
534        .collect();
535
536    Ok(ReplicationConfiguration {
537        role: config.role.unwrap_or_default(),
538        rules,
539    })
540}
541
/// Escapes the five XML special characters (`&`, `<`, `>`, `"`, `'`) in `value`.
///
/// Input is never treated as already escaped: `&amp;` becomes `&amp;amp;`.
fn xml_escape(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            '\'' => escaped.push_str("&apos;"),
            other => escaped.push(other),
        }
    }
    escaped
}
550
/// Appends `<tag><Status>Enabled|Disabled</Status></tag>` to `xml`.
///
/// Nothing is written when `enabled` is `None`. `tag` is inserted verbatim, without
/// escaping.
fn append_replication_status_tag(xml: &mut String, tag: &str, enabled: Option<bool>) {
    let status = match enabled {
        Some(true) => "Enabled",
        Some(false) => "Disabled",
        None => return,
    };
    xml.push_str(&format!("<{tag}><Status>{status}</Status></{tag}>"));
}
563
564fn build_replication_configuration_xml(config: &ReplicationConfiguration) -> String {
565    let mut xml = String::from(r#"<?xml version="1.0" encoding="UTF-8"?>"#);
566    xml.push_str(r#"<ReplicationConfiguration xmlns=""#);
567    xml.push_str(S3_REPLICATION_XML_NAMESPACE);
568    xml.push_str(r#"">"#);
569
570    if !config.role.is_empty() {
571        xml.push_str("<Role>");
572        xml.push_str(&xml_escape(&config.role));
573        xml.push_str("</Role>");
574    }
575
576    for rule in &config.rules {
577        xml.push_str("<Rule>");
578
579        xml.push_str("<Status>");
580        xml.push_str(match rule.status {
581            rc_core::ReplicationRuleStatus::Enabled => "Enabled",
582            rc_core::ReplicationRuleStatus::Disabled => "Disabled",
583        });
584        xml.push_str("</Status>");
585
586        xml.push_str("<Destination><Bucket>");
587        xml.push_str(&xml_escape(&rule.destination.bucket_arn));
588        xml.push_str("</Bucket>");
589        if let Some(storage_class) = &rule.destination.storage_class {
590            xml.push_str("<StorageClass>");
591            xml.push_str(&xml_escape(storage_class));
592            xml.push_str("</StorageClass>");
593        }
594        xml.push_str("</Destination>");
595
596        if !rule.id.is_empty() {
597            xml.push_str("<ID>");
598            xml.push_str(&xml_escape(&rule.id));
599            xml.push_str("</ID>");
600        }
601
602        xml.push_str("<Priority>");
603        xml.push_str(&rule.priority.to_string());
604        xml.push_str("</Priority>");
605
606        append_replication_filter_xml(&mut xml, rule.prefix.as_deref(), rule.tags.as_ref());
607
608        append_replication_status_tag(
609            &mut xml,
610            "ExistingObjectReplication",
611            rule.existing_object_replication,
612        );
613        append_replication_status_tag(
614            &mut xml,
615            "DeleteMarkerReplication",
616            rule.delete_marker_replication,
617        );
618        append_replication_status_tag(&mut xml, "DeleteReplication", rule.delete_replication);
619
620        xml.push_str("</Rule>");
621    }
622
623    xml.push_str("</ReplicationConfiguration>");
624    xml
625}
626
627fn parse_lifecycle_filter_prefix(
628    filter: Option<&aws_sdk_s3::types::LifecycleRuleFilter>,
629) -> Option<String> {
630    filter
631        .and_then(|filter| filter.prefix().map(str::to_string))
632        .or_else(|| filter.and_then(|filter| filter.and()?.prefix().map(str::to_string)))
633}
634
635fn parse_lifecycle_filter_tags(
636    filter: Option<&aws_sdk_s3::types::LifecycleRuleFilter>,
637) -> Option<HashMap<String, String>> {
638    filter
639        .and_then(|filter| collect_tag_map(filter.tag().map(|tag| (tag.key(), tag.value()))))
640        .or_else(|| {
641            filter.and_then(|filter| {
642                collect_tag_map(
643                    filter
644                        .and()?
645                        .tags()
646                        .iter()
647                        .map(|tag| (tag.key(), tag.value())),
648                )
649            })
650        })
651}
652
653fn build_s3_tag(key: &str, value: &str) -> Result<aws_sdk_s3::types::Tag> {
654    aws_sdk_s3::types::Tag::builder()
655        .key(key)
656        .value(value)
657        .build()
658        .map_err(|error| Error::General(format!("build filter tag: {error}")))
659}
660
661fn build_lifecycle_rule_filter(
662    prefix: Option<&str>,
663    tags: Option<&HashMap<String, String>>,
664) -> Result<Option<aws_sdk_s3::types::LifecycleRuleFilter>> {
665    let Some(tags) = tags.filter(|tags| !tags.is_empty()) else {
666        return Ok(prefix.map(|prefix| {
667            aws_sdk_s3::types::LifecycleRuleFilter::builder()
668                .prefix(prefix)
669                .build()
670        }));
671    };
672
673    let tag_values = sorted_tags(tags)
674        .into_iter()
675        .map(|(key, value)| build_s3_tag(key, value))
676        .collect::<Result<Vec<_>>>()?;
677
678    let filter = if prefix.is_some() || tag_values.len() > 1 {
679        let mut and_builder = aws_sdk_s3::types::LifecycleRuleAndOperator::builder();
680        if let Some(prefix) = prefix {
681            and_builder = and_builder.prefix(prefix);
682        }
683        for tag in tag_values {
684            and_builder = and_builder.tags(tag);
685        }
686        aws_sdk_s3::types::LifecycleRuleFilter::builder()
687            .and(and_builder.build())
688            .build()
689    } else {
690        aws_sdk_s3::types::LifecycleRuleFilter::builder()
691            .tag(
692                tag_values
693                    .into_iter()
694                    .next()
695                    .expect("non-empty tags required to build lifecycle filter"),
696            )
697            .build()
698    };
699
700    Ok(Some(filter))
701}
702
703impl HttpConnector for ReqwestConnector {
704    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
705        let client = self.client.clone();
706        HttpConnectorFuture::new(async move {
707            // Extract request parts before consuming the request
708            let uri = request.uri().to_string();
709            let method_str = request.method().to_string();
710            let headers = request.headers().clone();
711
712            // Try to get the body as buffered in-memory bytes.
713            // For streaming bodies (e.g., large file uploads), bytes() returns None and we
714            // return a clear error rather than silently sending an empty body, which would
715            // cause signature mismatches or server-side failures.
716            let body_bytes = match request.body().bytes() {
717                Some(b) => Bytes::copy_from_slice(b),
718                None => {
719                    return Err(ConnectorError::user(
720                        "Streaming request bodies are not supported in insecure/ca_bundle TLS mode; \
721                         use in-memory data for uploads with this connector"
722                            .into(),
723                    ));
724                }
725            };
726
727            // Build reqwest method
728            let method = reqwest::Method::from_bytes(method_str.as_bytes())
729                .map_err(|e| ConnectorError::user(Box::new(e)))?;
730
731            // Build reqwest URL
732            let url = reqwest::Url::parse(&uri).map_err(|e| ConnectorError::user(Box::new(e)))?;
733
734            // Build reqwest request
735            let mut req = reqwest::Request::new(method, url);
736
737            // Copy headers; S3 headers are all ASCII so failures here are unexpected
738            for (name, value) in headers.iter() {
739                match (
740                    reqwest::header::HeaderName::from_bytes(name.as_bytes()),
741                    reqwest::header::HeaderValue::from_bytes(value.as_bytes()),
742                ) {
743                    (Ok(header_name), Ok(header_value)) => {
744                        req.headers_mut().append(header_name, header_value);
745                    }
746                    _ => {
747                        tracing::warn!("Skipping non-convertible request header: {}", name);
748                    }
749                }
750            }
751
752            // Set body
753            *req.body_mut() = Some(reqwest::Body::from(body_bytes));
754
755            // Execute
756            let resp = client
757                .execute(req)
758                .await
759                .map_err(|e| ConnectorError::io(Box::new(e)))?;
760
761            // Convert response
762            let status = StatusCode::try_from(resp.status().as_u16())
763                .map_err(|e| ConnectorError::other(Box::new(e), None))?;
764            let resp_headers = resp.headers().clone();
765            let body = resp
766                .bytes()
767                .await
768                .map_err(|e| ConnectorError::io(Box::new(e)))?;
769
770            let mut sdk_response = Response::new(status, SdkBody::from(body));
771            for (name, value) in &resp_headers {
772                match value.to_str() {
773                    Ok(value_str) => {
774                        sdk_response
775                            .headers_mut()
776                            .append(name.as_str().to_owned(), value_str.to_owned());
777                    }
778                    Err(_) => {
779                        tracing::warn!("Skipping non-UTF8 response header: {}", name.as_str());
780                    }
781                }
782            }
783
784            Ok(sdk_response)
785        })
786    }
787}
788
789impl HttpClient for ReqwestConnector {
790    fn http_connector(
791        &self,
792        _settings: &HttpConnectorSettings,
793        _components: &RuntimeComponents,
794    ) -> SharedHttpConnector {
795        // NOTE: `ReqwestConnector` is preconfigured (e.g., insecure/CA-bundle options) when it
796        // is constructed, and does not currently apply `HttpConnectorSettings`. This means
797        // behavior in this mode may differ from the default connector w.r.t. SDK HTTP settings.
798        // If alignment is required, map relevant fields from `HttpConnectorSettings` onto the
799        // internal `reqwest::Client` when constructing the connector.
800        SharedHttpConnector::new(self.clone())
801    }
802}
803
/// S3 client wrapper
///
/// NOTE(review): the code that populates and uses these fields lies outside this
/// section; the field notes marked "presumably" are inferred from names and types.
pub struct S3Client {
    /// SDK client; presumably used for regular S3 operations — confirm.
    inner: aws_sdk_s3::Client,
    /// Second SDK client; presumably dedicated to generating presigned URLs — confirm.
    presign_inner: aws_sdk_s3::Client,
    /// Plain `reqwest` client; presumably for requests sent as hand-built XML — confirm.
    xml_http_client: reqwest::Client,
    /// Alias configuration; presumably the one passed to `S3Client::new` — confirm.
    alias: Alias,
    /// Extra request headers; presumably attached to outgoing requests — confirm.
    request_headers: Vec<RequestHeader>,
}
812
/// Per-request switches for delete operations.
///
/// The default value leaves every switch off.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct DeleteRequestOptions {
    /// Ask RustFS to permanently delete data instead of creating delete markers.
    pub force_delete: bool,
}
819
820#[derive(Debug, Clone)]
821struct CustomHeaderInterceptor {
822    headers: Vec<RequestHeader>,
823}
824
825impl Intercept for CustomHeaderInterceptor {
826    fn name(&self) -> &'static str {
827        "CustomHeaderInterceptor"
828    }
829
830    fn modify_before_signing(
831        &self,
832        context: &mut BeforeTransmitInterceptorContextMut<'_>,
833        _runtime_components: &RuntimeComponents,
834        _cfg: &mut ConfigBag,
835    ) -> std::result::Result<(), BoxError> {
836        let request = context.request_mut();
837        for header in &self.headers {
838            request
839                .headers_mut()
840                .try_insert(header.name.clone(), header.value.clone())
841                .map_err(|error| Box::new(error) as BoxError)?;
842        }
843        Ok(())
844    }
845}
846
847impl S3Client {
848    /// Create a new S3 client from an alias configuration
849    pub async fn new(alias: Alias) -> Result<Self> {
850        let endpoint = alias.endpoint.clone();
851        let region = alias.region.clone();
852        let access_key = alias.access_key.clone();
853        let secret_key = alias.secret_key.clone();
854
855        // Build SDK config loader
856        let mut config_loader = aws_config::defaults(aws_config::BehaviorVersion::latest())
857            .region(aws_config::Region::new(region))
858            .endpoint_url(&endpoint);
859
860        if alias.anonymous {
861            config_loader = config_loader.no_credentials();
862        } else {
863            let credentials = aws_credential_types::Credentials::new(
864                access_key,
865                secret_key,
866                None, // session token
867                None, // expiry
868                "rc-static-credentials",
869            );
870            config_loader = config_loader.credentials_provider(credentials);
871        }
872
873        // When insecure mode is enabled or a custom CA bundle is provided, use the reqwest
874        // connector which supports danger_accept_invalid_certs and custom root certificates.
875        if alias.insecure
876            || alias.ca_bundle.is_some()
877            || (alias.client_cert.is_some() && alias.client_key.is_some())
878        {
879            let connector = ReqwestConnector::new(
880                alias.insecure,
881                alias.ca_bundle.as_deref(),
882                alias.client_cert.as_deref(),
883                alias.client_key.as_deref(),
884            )
885            .await?;
886            config_loader = config_loader.http_client(connector);
887        }
888
889        let xml_http_client = build_reqwest_client(
890            alias.insecure,
891            alias.ca_bundle.as_deref(),
892            alias.client_cert.as_deref(),
893            alias.client_key.as_deref(),
894        )
895        .await?;
896        let config = config_loader.load().await;
897
898        // Build S3 client with path-style addressing for compatibility
899        let request_headers = global_request_headers();
900        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&config)
901            .force_path_style(force_path_style_for_alias(&alias))
902            // Improve compatibility with S3-compatible backends by only sending request
903            // checksums when the operation explicitly requires them.
904            .request_checksum_calculation(
905                aws_sdk_s3::config::RequestChecksumCalculation::WhenRequired,
906            )
907            .response_checksum_validation(
908                aws_sdk_s3::config::ResponseChecksumValidation::WhenRequired,
909            );
910
911        let presign_s3_config = s3_config_builder.clone().build();
912
913        if !request_headers.is_empty() {
914            s3_config_builder = s3_config_builder.interceptor(CustomHeaderInterceptor {
915                headers: request_headers.clone(),
916            });
917        }
918
919        let s3_config = s3_config_builder.build();
920
921        let client = aws_sdk_s3::Client::from_conf(s3_config);
922        let presign_client = aws_sdk_s3::Client::from_conf(presign_s3_config);
923
924        Ok(Self {
925            inner: client,
926            presign_inner: presign_client,
927            xml_http_client,
928            alias,
929            request_headers,
930        })
931    }
932
/// Get the underlying aws-sdk-s3 client
///
/// Returns a shared reference to the `inner` field, i.e. the SDK client that `new`
/// configures with the custom-header interceptor when global request headers exist.
pub fn inner(&self) -> &aws_sdk_s3::Client {
    &self.inner
}
937
938    /// List a single page of object versions and return pagination metadata.
939    pub async fn list_object_versions_page(
940        &self,
941        path: &RemotePath,
942        max_keys: Option<i32>,
943    ) -> Result<ObjectVersionListResult> {
944        let mut builder = self.inner.list_object_versions().bucket(&path.bucket);
945
946        if !path.key.is_empty() {
947            builder = builder.prefix(&path.key);
948        }
949
950        if let Some(max) = max_keys {
951            builder = builder.max_keys(max);
952        }
953
954        let response = builder.send().await.map_err(|e| {
955            let err_str = Self::format_sdk_error(&e);
956            if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
957                Error::NotFound(format!("Bucket not found: {}", path.bucket))
958            } else {
959                Error::Network(err_str)
960            }
961        })?;
962
963        let mut items = Vec::new();
964
965        for v in response.versions() {
966            items.push(ObjectVersion {
967                key: v.key().unwrap_or_default().to_string(),
968                version_id: v.version_id().unwrap_or("null").to_string(),
969                is_latest: v.is_latest().unwrap_or(false),
970                is_delete_marker: false,
971                last_modified: v
972                    .last_modified()
973                    .and_then(|dt| Timestamp::from_second(dt.secs()).ok()),
974                size_bytes: v.size(),
975                etag: v.e_tag().map(|s| s.trim_matches('"').to_string()),
976            });
977        }
978
979        for m in response.delete_markers() {
980            items.push(ObjectVersion {
981                key: m.key().unwrap_or_default().to_string(),
982                version_id: m.version_id().unwrap_or("null").to_string(),
983                is_latest: m.is_latest().unwrap_or(false),
984                is_delete_marker: true,
985                last_modified: m
986                    .last_modified()
987                    .and_then(|dt| Timestamp::from_second(dt.secs()).ok()),
988                size_bytes: None,
989                etag: None,
990            });
991        }
992
993        items.sort_by(|a, b| {
994            a.key
995                .cmp(&b.key)
996                .then_with(|| b.last_modified.cmp(&a.last_modified))
997        });
998
999        Ok(ObjectVersionListResult {
1000            items,
1001            truncated: response.is_truncated().unwrap_or(false),
1002            continuation_token: response.next_key_marker().map(ToString::to_string),
1003            version_id_marker: response.next_version_id_marker().map(ToString::to_string),
1004        })
1005    }
1006
1007    /// Download object content and report downloaded bytes after each received chunk.
1008    pub async fn get_object_with_progress(
1009        &self,
1010        path: &RemotePath,
1011        mut on_progress: impl FnMut(u64, Option<u64>) + Send,
1012    ) -> Result<Vec<u8>> {
1013        let response = self
1014            .inner
1015            .get_object()
1016            .bucket(&path.bucket)
1017            .key(&path.key)
1018            .send()
1019            .await
1020            .map_err(|e| {
1021                let err_str = e.to_string();
1022                if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
1023                    Error::NotFound(path.to_string())
1024                } else {
1025                    Error::Network(err_str)
1026                }
1027            })?;
1028
1029        let content_length = response
1030            .content_length()
1031            .and_then(|length| u64::try_from(length).ok());
1032        let mut data = Vec::with_capacity(
1033            content_length
1034                .and_then(|length| usize::try_from(length).ok())
1035                .unwrap_or_default(),
1036        );
1037        let mut body = response.body;
1038        let mut bytes_downloaded = 0u64;
1039
1040        while let Some(chunk) = body
1041            .try_next()
1042            .await
1043            .map_err(|e| Error::Network(e.to_string()))?
1044        {
1045            bytes_downloaded += chunk.len() as u64;
1046            data.extend_from_slice(&chunk);
1047            on_progress(bytes_downloaded, content_length);
1048        }
1049
1050        Ok(data)
1051    }
1052
1053    /// Delete an object with RustFS-specific request options.
1054    pub async fn delete_object_with_options(
1055        &self,
1056        path: &RemotePath,
1057        options: DeleteRequestOptions,
1058    ) -> Result<()> {
1059        let mut request = self
1060            .inner
1061            .delete_object()
1062            .bucket(&path.bucket)
1063            .key(&path.key)
1064            .customize();
1065
1066        if options.force_delete {
1067            request = request.mutate_request(|request| {
1068                request
1069                    .headers_mut()
1070                    .insert(RUSTFS_FORCE_DELETE_HEADER, "true");
1071            });
1072        }
1073
1074        request.send().await.map_err(|e| {
1075            let err_str = Self::format_sdk_error(&e);
1076            let is_missing_key = if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &e
1077            {
1078                let code = service_err.err().code().or_else(|| {
1079                    service_err
1080                        .raw()
1081                        .headers()
1082                        .get("x-amz-error-code")
1083                        .and_then(|value| std::str::from_utf8(value.as_bytes()).ok())
1084                });
1085                matches!(code, Some("NoSuchKey") | Some("NotFound"))
1086                    || service_err.raw().status().as_u16() == 404
1087            } else {
1088                err_str.contains("NotFound") || err_str.contains("NoSuchKey")
1089            };
1090
1091            if is_missing_key {
1092                Error::NotFound(path.to_string())
1093            } else {
1094                Error::Network(err_str)
1095            }
1096        })?;
1097
1098        Ok(())
1099    }
1100
1101    /// Delete multiple objects with RustFS-specific request options.
1102    pub async fn delete_objects_with_options(
1103        &self,
1104        bucket: &str,
1105        keys: Vec<String>,
1106        options: DeleteRequestOptions,
1107    ) -> Result<Vec<String>> {
1108        use aws_sdk_s3::types::{Delete, ObjectIdentifier};
1109
1110        if keys.is_empty() {
1111            return Ok(vec![]);
1112        }
1113
1114        let objects: Vec<ObjectIdentifier> =
1115            keys.iter()
1116                .map(|key| {
1117                    ObjectIdentifier::builder().key(key).build().map_err(|e| {
1118                        Error::General(format!("invalid delete object identifier: {e}"))
1119                    })
1120                })
1121                .collect::<Result<Vec<_>>>()?;
1122
1123        let delete = Delete::builder()
1124            .set_objects(Some(objects))
1125            .build()
1126            .map_err(|e| Error::General(e.to_string()))?;
1127
1128        let mut request = self
1129            .inner
1130            .delete_objects()
1131            .bucket(bucket)
1132            .delete(delete)
1133            .customize();
1134
1135        if options.force_delete {
1136            request = request.mutate_request(|request| {
1137                request
1138                    .headers_mut()
1139                    .insert(RUSTFS_FORCE_DELETE_HEADER, "true");
1140            });
1141        }
1142
1143        let response = request
1144            .send()
1145            .await
1146            .map_err(|e| Error::Network(e.to_string()))?;
1147
1148        let deleted: Vec<String> = response
1149            .deleted()
1150            .iter()
1151            .filter_map(|d| d.key().map(|k| k.to_string()))
1152            .collect();
1153
1154        if !response.errors().is_empty() {
1155            let error_keys: Vec<String> = response
1156                .errors()
1157                .iter()
1158                .filter_map(|e| e.key().map(|k| k.to_string()))
1159                .collect();
1160            tracing::warn!("Failed to delete some objects: {:?}", error_keys);
1161        }
1162
1163        Ok(deleted)
1164    }
1165
1166    /// Format AWS SDK error into a detailed error message
1167    fn format_sdk_error<E: std::fmt::Display>(error: &aws_sdk_s3::error::SdkError<E>) -> String {
1168        match error {
1169            aws_sdk_s3::error::SdkError::ServiceError(service_err) => {
1170                let err = service_err.err();
1171                let meta = service_err.raw();
1172                let mut msg = format!("Service error: {}", err);
1173                // Try to extract additional error information from headers
1174                if let Some(code) = meta.headers().get("x-amz-error-code")
1175                    && let Ok(code_str) = std::str::from_utf8(code.as_bytes())
1176                {
1177                    msg.push_str(&format!(" (code: {})", code_str));
1178                }
1179                msg
1180            }
1181            aws_sdk_s3::error::SdkError::ConstructionFailure(err) => {
1182                format!("Request construction failed: {:?}", err)
1183            }
1184            aws_sdk_s3::error::SdkError::TimeoutError(_) => "Request timeout".to_string(),
1185            aws_sdk_s3::error::SdkError::DispatchFailure(err) => {
1186                format!("Network dispatch error: {:?}", err)
1187            }
1188            aws_sdk_s3::error::SdkError::ResponseError(err) => {
1189                format!("Response error: {:?}", err)
1190            }
1191            _ => error.to_string(),
1192        }
1193    }
1194
1195    fn should_use_multipart(file_size: u64) -> bool {
1196        file_size > SINGLE_PUT_OBJECT_MAX_SIZE
1197    }
1198
1199    fn sha256_hash(body: &[u8]) -> String {
1200        let mut hasher = Sha256::new();
1201        hasher.update(body);
1202        hex::encode(hasher.finalize())
1203    }
1204
1205    fn request_host(&self, url: &reqwest::Url) -> Result<String> {
1206        let host = url
1207            .host_str()
1208            .ok_or_else(|| Error::Network("Missing host in request URL".to_string()))?;
1209        Ok(match url.port() {
1210            Some(port) => format!("{host}:{port}"),
1211            None => host.to_string(),
1212        })
1213    }
1214
1215    fn replication_url(&self, bucket: &str) -> Result<reqwest::Url> {
1216        let mut url =
1217            reqwest::Url::parse(self.alias.endpoint.trim_end_matches('/')).map_err(|e| {
1218                Error::Network(format!("Invalid endpoint '{}': {e}", self.alias.endpoint))
1219            })?;
1220
1221        {
1222            let mut segments = url.path_segments_mut().map_err(|_| {
1223                Error::Network(format!(
1224                    "Endpoint '{}' does not support path-style bucket operations",
1225                    self.alias.endpoint
1226                ))
1227            })?;
1228            segments.pop_if_empty();
1229            segments.push(bucket);
1230        }
1231
1232        url.set_query(Some("replication="));
1233        Ok(url)
1234    }
1235
1236    fn cors_url(&self, bucket: &str) -> Result<reqwest::Url> {
1237        let mut url =
1238            reqwest::Url::parse(self.alias.endpoint.trim_end_matches('/')).map_err(|e| {
1239                Error::Network(format!("Invalid endpoint '{}': {e}", self.alias.endpoint))
1240            })?;
1241
1242        {
1243            let mut segments = url.path_segments_mut().map_err(|_| {
1244                Error::Network(format!(
1245                    "Endpoint '{}' does not support path-style bucket operations",
1246                    self.alias.endpoint
1247                ))
1248            })?;
1249            segments.pop_if_empty();
1250            segments.push(bucket);
1251        }
1252
1253        url.set_query(Some("cors="));
1254        Ok(url)
1255    }
1256
1257    async fn sign_xml_request(
1258        &self,
1259        method: &Method,
1260        url: &str,
1261        headers: &HeaderMap,
1262        body: &[u8],
1263    ) -> Result<HeaderMap> {
1264        if self.alias.anonymous {
1265            return Ok(headers.clone());
1266        }
1267
1268        let credentials = Credentials::new(
1269            &self.alias.access_key,
1270            &self.alias.secret_key,
1271            None,
1272            None,
1273            "s3-xml-client",
1274        );
1275
1276        let identity = credentials.into();
1277        let mut signing_settings = SigningSettings::default();
1278        signing_settings.signature_location = SignatureLocation::Headers;
1279
1280        let signing_params = v4::SigningParams::builder()
1281            .identity(&identity)
1282            .region(&self.alias.region)
1283            .name(S3_SERVICE_NAME)
1284            .time(std::time::SystemTime::now())
1285            .settings(signing_settings)
1286            .build()
1287            .map_err(|e| Error::Auth(format!("Failed to build signing params: {e}")))?;
1288
1289        let header_pairs: Vec<(&str, &str)> = headers
1290            .iter()
1291            .filter_map(|(k, v)| v.to_str().ok().map(|v| (k.as_str(), v)))
1292            .collect();
1293
1294        let signable_request = SignableRequest::new(
1295            method.as_str(),
1296            url,
1297            header_pairs.into_iter(),
1298            SignableBody::Bytes(body),
1299        )
1300        .map_err(|e| Error::Auth(format!("Failed to create signable request: {e}")))?;
1301
1302        let (signing_instructions, _) = sign(signable_request, &signing_params.into())
1303            .map_err(|e| Error::Auth(format!("Failed to sign request: {e}")))?
1304            .into_parts();
1305
1306        let mut signed_headers = headers.clone();
1307        for (name, value) in signing_instructions.headers() {
1308            let header_name = HeaderName::try_from(name.to_string())
1309                .map_err(|e| Error::Auth(format!("Invalid header name: {e}")))?;
1310            let header_value = HeaderValue::try_from(value.to_string())
1311                .map_err(|e| Error::Auth(format!("Invalid header value: {e}")))?;
1312            signed_headers.insert(header_name, header_value);
1313        }
1314
1315        Ok(signed_headers)
1316    }
1317
1318    async fn xml_request(
1319        &self,
1320        method: Method,
1321        url: reqwest::Url,
1322        content_type: Option<&str>,
1323        body: Option<Vec<u8>>,
1324    ) -> Result<String> {
1325        let body = body.unwrap_or_default();
1326        let mut headers = HeaderMap::new();
1327        headers.insert(
1328            "x-amz-content-sha256",
1329            HeaderValue::from_str(&Self::sha256_hash(&body))
1330                .map_err(|e| Error::Auth(format!("Invalid content hash header: {e}")))?,
1331        );
1332        headers.insert(
1333            "host",
1334            HeaderValue::from_str(&self.request_host(&url)?)
1335                .map_err(|e| Error::Auth(format!("Invalid host header: {e}")))?,
1336        );
1337
1338        if let Some(content_type) = content_type {
1339            headers.insert(
1340                CONTENT_TYPE,
1341                HeaderValue::from_str(content_type)
1342                    .map_err(|e| Error::Auth(format!("Invalid content type header: {e}")))?,
1343            );
1344        }
1345
1346        for header in &self.request_headers {
1347            let name = HeaderName::from_bytes(header.name.as_bytes())
1348                .map_err(|e| Error::Auth(format!("Invalid custom header name: {e}")))?;
1349            let value = HeaderValue::from_str(&header.value)
1350                .map_err(|e| Error::Auth(format!("Invalid custom header value: {e}")))?;
1351            headers.insert(name, value);
1352        }
1353
1354        let signed_headers = self
1355            .sign_xml_request(&method, url.as_str(), &headers, &body)
1356            .await?;
1357
1358        let mut request_builder = self.xml_http_client.request(method, url);
1359        for (name, value) in &signed_headers {
1360            request_builder = request_builder.header(name, value);
1361        }
1362        if !body.is_empty() {
1363            request_builder = request_builder.body(body);
1364        }
1365
1366        let response = request_builder
1367            .send()
1368            .await
1369            .map_err(|e| Error::Network(format!("Request failed: {e}")))?;
1370
1371        let status = response.status();
1372        let text = response
1373            .text()
1374            .await
1375            .map_err(|e| Error::Network(format!("Failed to read response: {e}")))?;
1376
1377        if !status.is_success() {
1378            return Err(Error::Network(format!(
1379                "HTTP {}: {}",
1380                status.as_u16(),
1381                text
1382            )));
1383        }
1384
1385        Ok(text)
1386    }
1387
1388    fn bucket_policy_error_kind(
1389        error_code: Option<&str>,
1390        status_code: Option<u16>,
1391        error_text: &str,
1392    ) -> BucketPolicyErrorKind {
1393        let error_code = error_code.map(|code| code.to_ascii_lowercase());
1394        if matches!(
1395            error_code.as_deref(),
1396            Some("nosuchbucketpolicy") | Some("nosuchpolicy")
1397        ) {
1398            return BucketPolicyErrorKind::MissingPolicy;
1399        }
1400        if matches!(error_code.as_deref(), Some("nosuchbucket")) {
1401            return BucketPolicyErrorKind::MissingBucket;
1402        }
1403
1404        let error_text = error_text.to_ascii_lowercase();
1405        if error_text.contains("nosuchbucketpolicy") || error_text.contains("nosuchpolicy") {
1406            return BucketPolicyErrorKind::MissingPolicy;
1407        }
1408        if error_text.contains("nosuchbucket") {
1409            return BucketPolicyErrorKind::MissingBucket;
1410        }
1411        if status_code == Some(404) {
1412            return BucketPolicyErrorKind::MissingPolicy;
1413        }
1414
1415        BucketPolicyErrorKind::Other
1416    }
1417
1418    fn map_get_bucket_policy_error(
1419        bucket: &str,
1420        kind: BucketPolicyErrorKind,
1421        error_text: &str,
1422    ) -> Result<Option<String>> {
1423        match kind {
1424            BucketPolicyErrorKind::MissingPolicy => Ok(None),
1425            BucketPolicyErrorKind::MissingBucket => {
1426                Err(Error::NotFound(format!("Bucket not found: {bucket}")))
1427            }
1428            BucketPolicyErrorKind::Other => {
1429                Err(Error::Network(format!("get_bucket_policy: {error_text}")))
1430            }
1431        }
1432    }
1433
1434    fn map_delete_bucket_policy_error(
1435        bucket: &str,
1436        kind: BucketPolicyErrorKind,
1437        error_text: &str,
1438    ) -> Result<()> {
1439        match kind {
1440            BucketPolicyErrorKind::MissingPolicy => Ok(()),
1441            BucketPolicyErrorKind::MissingBucket => {
1442                Err(Error::NotFound(format!("Bucket not found: {bucket}")))
1443            }
1444            BucketPolicyErrorKind::Other => Err(Error::General(format!(
1445                "delete_bucket_policy: {error_text}"
1446            ))),
1447        }
1448    }
1449
1450    fn extract_notification_filter(
1451        filter: Option<&aws_sdk_s3::types::NotificationConfigurationFilter>,
1452    ) -> (Option<String>, Option<String>) {
1453        let mut prefix = None;
1454        let mut suffix = None;
1455
1456        if let Some(key_filter) = filter.and_then(|value| value.key()) {
1457            for rule in key_filter.filter_rules() {
1458                match rule.name().map(|name| name.as_str()) {
1459                    Some("prefix") => {
1460                        prefix = rule.value().map(ToString::to_string);
1461                    }
1462                    Some("suffix") => {
1463                        suffix = rule.value().map(ToString::to_string);
1464                    }
1465                    _ => {}
1466                }
1467            }
1468        }
1469
1470        (prefix, suffix)
1471    }
1472
1473    fn build_notification_filter(
1474        prefix: Option<&str>,
1475        suffix: Option<&str>,
1476    ) -> Option<aws_sdk_s3::types::NotificationConfigurationFilter> {
1477        use aws_sdk_s3::types::{FilterRule, FilterRuleName, NotificationConfigurationFilter};
1478
1479        let mut rules = Vec::new();
1480        if let Some(value) = prefix {
1481            let rule = FilterRule::builder()
1482                .name(FilterRuleName::Prefix)
1483                .value(value)
1484                .build();
1485            rules.push(rule);
1486        }
1487        if let Some(value) = suffix {
1488            let rule = FilterRule::builder()
1489                .name(FilterRuleName::Suffix)
1490                .value(value)
1491                .build();
1492            rules.push(rule);
1493        }
1494        if rules.is_empty() {
1495            return None;
1496        }
1497
1498        let key_filter = aws_sdk_s3::types::S3KeyFilter::builder()
1499            .set_filter_rules(Some(rules))
1500            .build();
1501        NotificationConfigurationFilter::builder()
1502            .key(key_filter)
1503            .build()
1504            .into()
1505    }
1506
1507    fn event_list_to_strings(events: &[aws_sdk_s3::types::Event]) -> Vec<String> {
1508        events
1509            .iter()
1510            .map(|event| event.as_str().to_string())
1511            .collect()
1512    }
1513
1514    fn strings_to_event_list(events: &[String]) -> Vec<aws_sdk_s3::types::Event> {
1515        events
1516            .iter()
1517            .map(|event| aws_sdk_s3::types::Event::from(event.as_str()))
1518            .collect()
1519    }
1520
1521    fn notifications_equivalent(
1522        expected: &[BucketNotification],
1523        actual: &[BucketNotification],
1524    ) -> bool {
1525        type CanonicalEntry = (u8, String, Option<String>, Option<String>, Vec<String>);
1526
1527        fn target_order(target: NotificationTarget) -> u8 {
1528            match target {
1529                NotificationTarget::Queue => 0,
1530                NotificationTarget::Topic => 1,
1531                NotificationTarget::Lambda => 2,
1532            }
1533        }
1534
1535        fn canonical(notifications: &[BucketNotification]) -> Vec<CanonicalEntry> {
1536            let mut normalized: Vec<CanonicalEntry> = notifications
1537                .iter()
1538                .map(|item| {
1539                    let mut events = item.events.clone();
1540                    events.sort();
1541                    events.dedup();
1542                    (
1543                        target_order(item.target),
1544                        item.arn.clone(),
1545                        item.prefix.clone(),
1546                        item.suffix.clone(),
1547                        events,
1548                    )
1549                })
1550                .collect();
1551            normalized.sort();
1552            normalized
1553        }
1554
1555        canonical(expected) == canonical(actual)
1556    }
1557
1558    async fn read_next_part(
1559        file: &mut tokio::fs::File,
1560        file_path: &std::path::Path,
1561        buffer: &mut [u8],
1562    ) -> Result<usize> {
1563        let mut total_read = 0usize;
1564        while total_read < buffer.len() {
1565            let bytes_read = file
1566                .read(&mut buffer[total_read..])
1567                .await
1568                .map_err(|e| Error::General(format!("read file '{}': {e}", file_path.display())))?;
1569            if bytes_read == 0 {
1570                break;
1571            }
1572            total_read += bytes_read;
1573        }
1574        Ok(total_read)
1575    }
1576
1577    async fn put_object_single_part_from_path(
1578        &self,
1579        path: &RemotePath,
1580        file_path: &std::path::Path,
1581        content_type: Option<&str>,
1582        file_size: u64,
1583        encryption: Option<&ObjectEncryptionRequest>,
1584    ) -> Result<ObjectInfo> {
1585        let data = tokio::fs::read(file_path)
1586            .await
1587            .map_err(|e| Error::General(format!("read file '{}': {e}", file_path.display())))?;
1588        let body = aws_sdk_s3::primitives::ByteStream::from(data);
1589
1590        let mut request = apply_object_encryption_to_put_request(
1591            self.inner
1592                .put_object()
1593                .bucket(&path.bucket)
1594                .key(&path.key)
1595                .body(body),
1596            encryption,
1597        );
1598
1599        if let Some(ct) = content_type {
1600            request = request.content_type(ct);
1601        }
1602
1603        let response = request
1604            .send()
1605            .await
1606            .map_err(|e| Error::Network(e.to_string()))?;
1607
1608        let mut info = ObjectInfo::file(&path.key, file_size as i64);
1609        if let Some(etag) = response.e_tag() {
1610            info.etag = Some(etag.trim_matches('"').to_string());
1611        }
1612        info.last_modified = Some(jiff::Timestamp::now());
1613
1614        Ok(info)
1615    }
1616
1617    async fn abort_multipart_upload_best_effort(&self, path: &RemotePath, upload_id: &str) {
1618        let _ = self
1619            .inner
1620            .abort_multipart_upload()
1621            .bucket(&path.bucket)
1622            .key(&path.key)
1623            .upload_id(upload_id)
1624            .send()
1625            .await;
1626    }
1627
1628    async fn put_object_multipart_from_path(
1629        &self,
1630        path: &RemotePath,
1631        file_path: &std::path::Path,
1632        content_type: Option<&str>,
1633        file_size: u64,
1634        encryption: Option<&ObjectEncryptionRequest>,
1635        on_progress: impl Fn(u64) + Send,
1636    ) -> Result<ObjectInfo> {
1637        use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
1638
1639        let config = crate::multipart::MultipartConfig::default();
1640        let part_size = config.calculate_part_size(file_size);
1641        let part_buffer_size = usize::try_from(part_size)
1642            .map_err(|_| Error::General(format!("invalid part size: {part_size}")))?;
1643
1644        tracing::debug!(file_size, part_size, "Starting multipart upload");
1645
1646        let mut create_request = self
1647            .inner
1648            .create_multipart_upload()
1649            .bucket(&path.bucket)
1650            .key(&path.key);
1651
1652        create_request = match encryption {
1653            Some(ObjectEncryptionRequest::SseS3) => create_request
1654                .server_side_encryption(aws_sdk_s3::types::ServerSideEncryption::Aes256),
1655            Some(ObjectEncryptionRequest::SseKms { key_id }) => create_request
1656                .server_side_encryption(aws_sdk_s3::types::ServerSideEncryption::AwsKms)
1657                .ssekms_key_id(key_id),
1658            None => create_request,
1659        };
1660
1661        if let Some(ct) = content_type {
1662            create_request = create_request.content_type(ct);
1663        }
1664
1665        let create_response = create_request
1666            .send()
1667            .await
1668            .map_err(|e| Error::Network(format!("create multipart upload: {e}")))?;
1669
1670        let upload_id = create_response
1671            .upload_id()
1672            .ok_or_else(|| Error::General("missing upload id from multipart upload".to_string()))?
1673            .to_string();
1674
1675        tracing::debug!(upload_id = %upload_id, "Multipart upload initiated");
1676
1677        let mut file = tokio::fs::File::open(file_path)
1678            .await
1679            .map_err(|e| Error::General(format!("open file '{}': {e}", file_path.display())))?;
1680        let mut completed_parts = Vec::new();
1681        let mut part_number: i32 = 1;
1682        let mut chunk = vec![0u8; part_buffer_size];
1683        let mut bytes_uploaded: u64 = 0;
1684
1685        loop {
1686            let bytes_read = Self::read_next_part(&mut file, file_path, &mut chunk).await?;
1687            if bytes_read == 0 {
1688                break;
1689            }
1690
1691            tracing::debug!(part_number, bytes_read, "Uploading part");
1692
1693            let body = aws_sdk_s3::primitives::ByteStream::from(chunk[..bytes_read].to_vec());
1694            let upload_part_result = self
1695                .inner
1696                .upload_part()
1697                .bucket(&path.bucket)
1698                .key(&path.key)
1699                .upload_id(&upload_id)
1700                .part_number(part_number)
1701                .body(body)
1702                .send()
1703                .await;
1704
1705            let upload_part_response = match upload_part_result {
1706                Ok(response) => response,
1707                Err(e) => {
1708                    tracing::debug!(
1709                        upload_id = %upload_id,
1710                        part_number,
1711                        "Aborting multipart upload due to error"
1712                    );
1713                    self.abort_multipart_upload_best_effort(path, &upload_id)
1714                        .await;
1715                    return Err(Error::Network(format!(
1716                        "upload multipart part {part_number}: {e}"
1717                    )));
1718                }
1719            };
1720
1721            let etag = match upload_part_response.e_tag() {
1722                Some(value) => value.trim_matches('"').to_string(),
1723                None => {
1724                    self.abort_multipart_upload_best_effort(path, &upload_id)
1725                        .await;
1726                    return Err(Error::General(format!(
1727                        "missing ETag for multipart part {part_number}"
1728                    )));
1729                }
1730            };
1731
1732            completed_parts.push(
1733                CompletedPart::builder()
1734                    .part_number(part_number)
1735                    .e_tag(etag)
1736                    .build(),
1737            );
1738
1739            bytes_uploaded += bytes_read as u64;
1740            on_progress(bytes_uploaded);
1741            tracing::debug!(part_number, bytes_uploaded, "Part uploaded");
1742
1743            part_number += 1;
1744        }
1745
1746        let completed_upload = CompletedMultipartUpload::builder()
1747            .set_parts(Some(completed_parts))
1748            .build();
1749        let complete_result = self
1750            .inner
1751            .complete_multipart_upload()
1752            .bucket(&path.bucket)
1753            .key(&path.key)
1754            .upload_id(&upload_id)
1755            .multipart_upload(completed_upload)
1756            .send()
1757            .await;
1758
1759        let complete_response = match complete_result {
1760            Ok(response) => response,
1761            Err(e) => {
1762                tracing::debug!(upload_id = %upload_id, "Attempting to abort multipart upload after completion failure");
1763                self.abort_multipart_upload_best_effort(path, &upload_id)
1764                    .await;
1765                return Err(Error::Network(format!("complete multipart upload: {e}")));
1766            }
1767        };
1768
1769        tracing::debug!("Multipart upload completed");
1770
1771        let mut info = ObjectInfo::file(&path.key, file_size as i64);
1772        if let Some(etag) = complete_response.e_tag() {
1773            info.etag = Some(etag.trim_matches('"').to_string());
1774        }
1775        info.last_modified = Some(jiff::Timestamp::now());
1776
1777        Ok(info)
1778    }
1779
1780    /// Upload a local file path to S3.
1781    ///
1782    /// Uses multipart upload for large files to avoid loading the entire file into memory.
1783    /// Calls `on_progress` after each uploaded part with total bytes sent so far.
1784    pub async fn put_object_from_path(
1785        &self,
1786        path: &RemotePath,
1787        file_path: &std::path::Path,
1788        content_type: Option<&str>,
1789        encryption: Option<&ObjectEncryptionRequest>,
1790        on_progress: impl Fn(u64) + Send,
1791    ) -> Result<ObjectInfo> {
1792        let metadata = tokio::fs::metadata(file_path).await.map_err(|e| {
1793            Error::General(format!("read metadata for '{}': {e}", file_path.display()))
1794        })?;
1795        if !metadata.is_file() {
1796            return Err(Error::General(format!(
1797                "source is not a file: {}",
1798                file_path.display()
1799            )));
1800        }
1801
1802        let file_size = metadata.len();
1803        if Self::should_use_multipart(file_size) {
1804            self.put_object_multipart_from_path(
1805                path,
1806                file_path,
1807                content_type,
1808                file_size,
1809                encryption,
1810                on_progress,
1811            )
1812            .await
1813        } else {
1814            self.put_object_single_part_from_path(
1815                path,
1816                file_path,
1817                content_type,
1818                file_size,
1819                encryption,
1820            )
1821            .await
1822        }
1823    }
1824}
1825
1826fn build_tagging(
1827    tags: std::collections::HashMap<String, String>,
1828) -> Result<aws_sdk_s3::types::Tagging> {
1829    use aws_sdk_s3::types::{Tag, Tagging};
1830
1831    let mut tag_set = Vec::with_capacity(tags.len());
1832    for (key, value) in tags {
1833        let tag = Tag::builder()
1834            .key(key)
1835            .value(value)
1836            .build()
1837            .map_err(|e| Error::General(format!("invalid tag: {e}")))?;
1838        tag_set.push(tag);
1839    }
1840
1841    Tagging::builder()
1842        .set_tag_set(Some(tag_set))
1843        .build()
1844        .map_err(|e| Error::General(format!("invalid tagging payload: {e}")))
1845}
1846
1847#[async_trait]
1848impl ObjectStore for S3Client {
1849    async fn list_buckets(&self) -> Result<Vec<ObjectInfo>> {
1850        let response = self
1851            .inner
1852            .list_buckets()
1853            .send()
1854            .await
1855            .map_err(|e| Error::Network(Self::format_sdk_error(&e)))?;
1856
1857        let buckets = response
1858            .buckets()
1859            .iter()
1860            .map(|b| {
1861                let mut info = ObjectInfo::bucket(b.name().unwrap_or_default());
1862                if let Some(creation_date) = b.creation_date() {
1863                    info.last_modified = jiff::Timestamp::from_second(creation_date.secs()).ok();
1864                }
1865                info
1866            })
1867            .collect();
1868
1869        Ok(buckets)
1870    }
1871
1872    async fn list_objects(&self, path: &RemotePath, options: ListOptions) -> Result<ListResult> {
1873        let mut request = self.inner.list_objects_v2().bucket(&path.bucket);
1874
1875        // Set prefix
1876        let prefix = if path.key.is_empty() {
1877            options.prefix.clone()
1878        } else if let Some(p) = &options.prefix {
1879            Some(format!("{}{}", path.key, p))
1880        } else {
1881            Some(path.key.clone())
1882        };
1883
1884        if let Some(p) = prefix {
1885            request = request.prefix(p);
1886        }
1887
1888        // Set delimiter (for non-recursive listing)
1889        if !options.recursive {
1890            request = request.delimiter(options.delimiter.as_deref().unwrap_or("/"));
1891        }
1892
1893        // Set max keys
1894        if let Some(max) = options.max_keys {
1895            request = request.max_keys(max);
1896        }
1897
1898        // Set continuation token
1899        if let Some(token) = &options.continuation_token {
1900            request = request.continuation_token(token);
1901        }
1902
1903        let response = request.send().await.map_err(|e| {
1904            let err_str = Self::format_sdk_error(&e);
1905            if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
1906                Error::NotFound(format!("Bucket not found: {}", path.bucket))
1907            } else {
1908                Error::Network(err_str)
1909            }
1910        })?;
1911
1912        let mut items = Vec::new();
1913
1914        // Add common prefixes (directories)
1915        for prefix in response.common_prefixes() {
1916            if let Some(p) = prefix.prefix() {
1917                items.push(ObjectInfo::dir(p));
1918            }
1919        }
1920
1921        // Add objects
1922        for object in response.contents() {
1923            let key = object.key().unwrap_or_default().to_string();
1924            let size = object.size().unwrap_or(0);
1925            let mut info = ObjectInfo::file(&key, size);
1926
1927            if let Some(modified) = object.last_modified() {
1928                info.last_modified = jiff::Timestamp::from_second(modified.secs()).ok();
1929            }
1930
1931            if let Some(etag) = object.e_tag() {
1932                info.etag = Some(etag.trim_matches('"').to_string());
1933            }
1934
1935            if let Some(sc) = object.storage_class() {
1936                info.storage_class = Some(sc.as_str().to_string());
1937            }
1938
1939            items.push(info);
1940        }
1941
1942        Ok(ListResult {
1943            items,
1944            truncated: response.is_truncated().unwrap_or(false),
1945            continuation_token: response.next_continuation_token().map(|s| s.to_string()),
1946        })
1947    }
1948
1949    async fn head_object(&self, path: &RemotePath) -> Result<ObjectInfo> {
1950        let response = self
1951            .inner
1952            .head_object()
1953            .bucket(&path.bucket)
1954            .key(&path.key)
1955            .send()
1956            .await
1957            .map_err(|e| {
1958                let err_str = e.to_string();
1959                if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
1960                    Error::NotFound(path.to_string())
1961                } else {
1962                    Error::Network(err_str)
1963                }
1964            })?;
1965
1966        let size = response.content_length().unwrap_or(0);
1967        let mut info = ObjectInfo::file(&path.key, size);
1968
1969        if let Some(modified) = response.last_modified() {
1970            info.last_modified = jiff::Timestamp::from_second(modified.secs()).ok();
1971        }
1972
1973        if let Some(etag) = response.e_tag() {
1974            info.etag = Some(etag.trim_matches('"').to_string());
1975        }
1976
1977        if let Some(ct) = response.content_type() {
1978            info.content_type = Some(ct.to_string());
1979        }
1980
1981        if let Some(sc) = response.storage_class() {
1982            info.storage_class = Some(sc.as_str().to_string());
1983        }
1984
1985        if let Some(meta) = response.metadata()
1986            && !meta.is_empty()
1987        {
1988            info.metadata = Some(meta.clone());
1989        }
1990
1991        Ok(info)
1992    }
1993
1994    async fn bucket_exists(&self, bucket: &str) -> Result<bool> {
1995        match self.inner.head_bucket().bucket(bucket).send().await {
1996            Ok(_) => Ok(true),
1997            Err(e) => {
1998                // Check HTTP status code for 404 first to avoid unnecessary string formatting
1999                // Some S3-compatible services may return 404 without standard error codes
2000                if let aws_sdk_s3::error::SdkError::ServiceError(ref service_err) = e
2001                    && service_err.raw().status().as_u16() == 404
2002                {
2003                    return Ok(false);
2004                }
2005                let err_str = e.to_string();
2006                if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
2007                    return Ok(false);
2008                }
2009                Err(Error::Network(err_str))
2010            }
2011        }
2012    }
2013
2014    async fn create_bucket(&self, bucket: &str) -> Result<()> {
2015        self.inner
2016            .create_bucket()
2017            .bucket(bucket)
2018            .send()
2019            .await
2020            .map_err(|e| Error::Network(Self::format_sdk_error(&e)))?;
2021
2022        Ok(())
2023    }
2024
2025    async fn delete_bucket(&self, bucket: &str) -> Result<()> {
2026        self.inner
2027            .delete_bucket()
2028            .bucket(bucket)
2029            .send()
2030            .await
2031            .map_err(|e| {
2032                let err_str = Self::format_sdk_error(&e);
2033                if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
2034                    Error::NotFound(format!("Bucket not found: {bucket}"))
2035                } else if err_str.contains("BucketNotEmpty") {
2036                    Error::Conflict(err_str)
2037                } else {
2038                    Error::Network(err_str)
2039                }
2040            })?;
2041
2042        Ok(())
2043    }
2044
2045    async fn capabilities(&self) -> Result<Capabilities> {
2046        // Best-effort hints for common S3-compatible backends. `select` is not inferred here
2047        // because `rc sql` determines support from the real request result.
2048        Ok(Capabilities {
2049            versioning: true,
2050            object_lock: false,
2051            tagging: true,
2052            anonymous: true,
2053            select: false,
2054            notifications: true,
2055            lifecycle: true,
2056            replication: true,
2057            cors: true,
2058        })
2059    }
2060
2061    async fn get_object(&self, path: &RemotePath) -> Result<Vec<u8>> {
2062        self.get_object_with_progress(path, |_, _| {}).await
2063    }
2064
2065    async fn put_object(
2066        &self,
2067        path: &RemotePath,
2068        data: Vec<u8>,
2069        content_type: Option<&str>,
2070        encryption: Option<&ObjectEncryptionRequest>,
2071    ) -> Result<ObjectInfo> {
2072        let size = data.len() as i64;
2073        let body = aws_sdk_s3::primitives::ByteStream::from(data);
2074
2075        let mut request = apply_object_encryption_to_put_request(
2076            self.inner
2077                .put_object()
2078                .bucket(&path.bucket)
2079                .key(&path.key)
2080                .body(body),
2081            encryption,
2082        );
2083
2084        if let Some(ct) = content_type {
2085            request = request.content_type(ct);
2086        }
2087
2088        let response = request
2089            .send()
2090            .await
2091            .map_err(|e| Error::Network(e.to_string()))?;
2092
2093        let mut info = ObjectInfo::file(&path.key, size);
2094        if let Some(etag) = response.e_tag() {
2095            info.etag = Some(etag.trim_matches('"').to_string());
2096        }
2097        info.last_modified = Some(jiff::Timestamp::now());
2098
2099        Ok(info)
2100    }
2101
2102    async fn delete_object(&self, path: &RemotePath) -> Result<()> {
2103        self.delete_object_with_options(path, DeleteRequestOptions::default())
2104            .await
2105    }
2106
2107    async fn delete_objects(&self, bucket: &str, keys: Vec<String>) -> Result<Vec<String>> {
2108        self.delete_objects_with_options(bucket, keys, DeleteRequestOptions::default())
2109            .await
2110    }
2111
2112    async fn copy_object(
2113        &self,
2114        src: &RemotePath,
2115        dst: &RemotePath,
2116        encryption: Option<&ObjectEncryptionRequest>,
2117    ) -> Result<ObjectInfo> {
2118        // Build copy source: bucket/key
2119        let copy_source = format!("{}/{}", src.bucket, src.key);
2120
2121        let response = apply_object_encryption_to_copy_request(
2122            self.inner
2123                .copy_object()
2124                .copy_source(&copy_source)
2125                .bucket(&dst.bucket)
2126                .key(&dst.key),
2127            encryption,
2128        )
2129        .send()
2130        .await
2131        .map_err(|e| {
2132            let err_str = e.to_string();
2133            if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
2134                Error::NotFound(src.to_string())
2135            } else {
2136                Error::Network(err_str)
2137            }
2138        })?;
2139
2140        // Get size from head_object since copy doesn't return it
2141        let info = self.head_object(dst).await?;
2142
2143        // Update etag from copy response if available
2144        let mut result = info;
2145        if let Some(copy_result) = response.copy_object_result()
2146            && let Some(etag) = copy_result.e_tag()
2147        {
2148            result.etag = Some(etag.trim_matches('"').to_string());
2149        }
2150
2151        Ok(result)
2152    }
2153
2154    async fn presign_get(&self, path: &RemotePath, expires_secs: u64) -> Result<String> {
2155        let config = aws_sdk_s3::presigning::PresigningConfig::builder()
2156            .expires_in(std::time::Duration::from_secs(expires_secs))
2157            .build()
2158            .map_err(|e| Error::General(format!("presign_get config: {e}")))?;
2159
2160        let request = self
2161            .presign_inner
2162            .get_object()
2163            .bucket(&path.bucket)
2164            .key(&path.key)
2165            .presigned(config)
2166            .await
2167            .map_err(|e| Error::General(format!("presign_get: {e}")))?;
2168
2169        Ok(request.uri().to_string())
2170    }
2171
2172    async fn presign_put(
2173        &self,
2174        path: &RemotePath,
2175        expires_secs: u64,
2176        content_type: Option<&str>,
2177    ) -> Result<String> {
2178        let config = aws_sdk_s3::presigning::PresigningConfig::builder()
2179            .expires_in(std::time::Duration::from_secs(expires_secs))
2180            .build()
2181            .map_err(|e| Error::General(format!("presign_put config: {e}")))?;
2182
2183        let mut builder = self
2184            .presign_inner
2185            .put_object()
2186            .bucket(&path.bucket)
2187            .key(&path.key);
2188
2189        if let Some(ct) = content_type {
2190            builder = builder.content_type(ct);
2191        }
2192
2193        let request = builder
2194            .presigned(config)
2195            .await
2196            .map_err(|e| Error::General(format!("presign_put: {e}")))?;
2197
2198        Ok(request.uri().to_string())
2199    }
2200
2201    async fn get_versioning(&self, bucket: &str) -> Result<Option<bool>> {
2202        let response = self
2203            .inner
2204            .get_bucket_versioning()
2205            .bucket(bucket)
2206            .send()
2207            .await
2208            .map_err(|e| Error::General(format!("get_versioning: {e}")))?;
2209
2210        Ok(response
2211            .status()
2212            .map(|s| *s == aws_sdk_s3::types::BucketVersioningStatus::Enabled))
2213    }
2214
2215    async fn set_versioning(&self, bucket: &str, enabled: bool) -> Result<()> {
2216        use aws_sdk_s3::types::{BucketVersioningStatus, VersioningConfiguration};
2217
2218        let status = if enabled {
2219            BucketVersioningStatus::Enabled
2220        } else {
2221            BucketVersioningStatus::Suspended
2222        };
2223
2224        let config = VersioningConfiguration::builder().status(status).build();
2225
2226        self.inner
2227            .put_bucket_versioning()
2228            .bucket(bucket)
2229            .versioning_configuration(config)
2230            .send()
2231            .await
2232            .map_err(|e| Error::General(format!("set_versioning: {e}")))?;
2233
2234        Ok(())
2235    }
2236
2237    async fn get_bucket_encryption(&self, bucket: &str) -> Result<Option<BucketEncryption>> {
2238        let response = match self
2239            .inner
2240            .get_bucket_encryption()
2241            .bucket(bucket)
2242            .send()
2243            .await
2244        {
2245            Ok(response) => response,
2246            Err(error) => {
2247                let error_text = Self::format_sdk_error(&error);
2248                let missing_config =
2249                    if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &error {
2250                        is_missing_bucket_encryption_response(
2251                            service_err.err().code(),
2252                            Some(service_err.raw().status().as_u16()),
2253                            &error_text,
2254                        )
2255                    } else {
2256                        is_missing_bucket_encryption_response(None, None, &error_text)
2257                    };
2258                if missing_config {
2259                    return Ok(None);
2260                }
2261                return Err(Error::General(format!(
2262                    "get_bucket_encryption: {error_text}"
2263                )));
2264            }
2265        };
2266
2267        let rule = response
2268            .server_side_encryption_configuration()
2269            .and_then(|config| config.rules().first())
2270            .and_then(|rule| rule.apply_server_side_encryption_by_default())
2271            .ok_or_else(|| {
2272                Error::General("get_bucket_encryption: missing bucket encryption rule".to_string())
2273            })?;
2274
2275        sdk_bucket_encryption_to_core(rule).map(Some)
2276    }
2277
2278    async fn set_bucket_encryption(
2279        &self,
2280        bucket: &str,
2281        encryption: BucketEncryption,
2282    ) -> Result<()> {
2283        let configuration = core_bucket_encryption_to_sdk(&encryption);
2284
2285        self.inner
2286            .put_bucket_encryption()
2287            .bucket(bucket)
2288            .server_side_encryption_configuration(configuration)
2289            .send()
2290            .await
2291            .map_err(|e| Error::General(format!("set_bucket_encryption: {e}")))?;
2292
2293        Ok(())
2294    }
2295
2296    async fn delete_bucket_encryption(&self, bucket: &str) -> Result<()> {
2297        match self
2298            .inner
2299            .delete_bucket_encryption()
2300            .bucket(bucket)
2301            .send()
2302            .await
2303        {
2304            Ok(_) => Ok(()),
2305            Err(error) => {
2306                let error_text = Self::format_sdk_error(&error);
2307                let missing_config =
2308                    if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &error {
2309                        is_missing_bucket_encryption_response(
2310                            service_err.err().code(),
2311                            Some(service_err.raw().status().as_u16()),
2312                            &error_text,
2313                        )
2314                    } else {
2315                        is_missing_bucket_encryption_response(None, None, &error_text)
2316                    };
2317                if missing_config {
2318                    Ok(())
2319                } else {
2320                    Err(Error::General(format!(
2321                        "delete_bucket_encryption: {error_text}"
2322                    )))
2323                }
2324            }
2325        }
2326    }
2327
2328    async fn list_object_versions(
2329        &self,
2330        path: &RemotePath,
2331        max_keys: Option<i32>,
2332    ) -> Result<Vec<ObjectVersion>> {
2333        Ok(self.list_object_versions_page(path, max_keys).await?.items)
2334    }
2335
2336    async fn get_object_tags(
2337        &self,
2338        path: &RemotePath,
2339    ) -> Result<std::collections::HashMap<String, String>> {
2340        let response = match self
2341            .inner
2342            .get_object_tagging()
2343            .bucket(&path.bucket)
2344            .key(&path.key)
2345            .send()
2346            .await
2347        {
2348            Ok(response) => response,
2349            Err(e) => {
2350                if e.to_string().contains("NoSuchTagSet") {
2351                    return Ok(std::collections::HashMap::new());
2352                }
2353                return Err(Error::General(format!("get_object_tags: {e}")));
2354            }
2355        };
2356
2357        let mut tags = std::collections::HashMap::new();
2358        for tag in response.tag_set() {
2359            let key = tag.key();
2360            let value = tag.value();
2361            tags.insert(key.to_string(), value.to_string());
2362        }
2363
2364        Ok(tags)
2365    }
2366
2367    async fn get_bucket_tags(
2368        &self,
2369        bucket: &str,
2370    ) -> Result<std::collections::HashMap<String, String>> {
2371        let response = match self.inner.get_bucket_tagging().bucket(bucket).send().await {
2372            Ok(response) => response,
2373            Err(e) => {
2374                if e.to_string().contains("NoSuchTagSet") {
2375                    return Ok(std::collections::HashMap::new());
2376                }
2377                return Err(Error::General(format!("get_bucket_tags: {e}")));
2378            }
2379        };
2380
2381        let mut tags = std::collections::HashMap::new();
2382        for tag in response.tag_set() {
2383            let key = tag.key();
2384            let value = tag.value();
2385            tags.insert(key.to_string(), value.to_string());
2386        }
2387
2388        Ok(tags)
2389    }
2390
2391    async fn set_object_tags(
2392        &self,
2393        path: &RemotePath,
2394        tags: std::collections::HashMap<String, String>,
2395    ) -> Result<()> {
2396        let tagging = build_tagging(tags)?;
2397
2398        self.inner
2399            .put_object_tagging()
2400            .bucket(&path.bucket)
2401            .key(&path.key)
2402            .tagging(tagging)
2403            .send()
2404            .await
2405            .map_err(|e| Error::General(format!("set_object_tags: {e}")))?;
2406
2407        Ok(())
2408    }
2409
2410    async fn set_bucket_tags(
2411        &self,
2412        bucket: &str,
2413        tags: std::collections::HashMap<String, String>,
2414    ) -> Result<()> {
2415        let tagging = build_tagging(tags)?;
2416
2417        self.inner
2418            .put_bucket_tagging()
2419            .bucket(bucket)
2420            .tagging(tagging)
2421            .send()
2422            .await
2423            .map_err(|e| Error::General(format!("set_bucket_tags: {e}")))?;
2424
2425        Ok(())
2426    }
2427
2428    async fn delete_object_tags(&self, path: &RemotePath) -> Result<()> {
2429        self.inner
2430            .delete_object_tagging()
2431            .bucket(&path.bucket)
2432            .key(&path.key)
2433            .send()
2434            .await
2435            .map_err(|e| Error::General(format!("delete_object_tags: {e}")))?;
2436
2437        Ok(())
2438    }
2439
2440    async fn delete_bucket_tags(&self, bucket: &str) -> Result<()> {
2441        self.inner
2442            .delete_bucket_tagging()
2443            .bucket(bucket)
2444            .send()
2445            .await
2446            .map_err(|e| Error::General(format!("delete_bucket_tags: {e}")))?;
2447
2448        Ok(())
2449    }
2450
2451    async fn get_bucket_policy(&self, bucket: &str) -> Result<Option<String>> {
2452        let response = match self.inner.get_bucket_policy().bucket(bucket).send().await {
2453            Ok(policy) => policy,
2454            Err(error) => {
2455                let error_text = error.to_string();
2456                let kind = if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &error {
2457                    let code = service_err
2458                        .raw()
2459                        .headers()
2460                        .get("x-amz-error-code")
2461                        .and_then(|value| std::str::from_utf8(value.as_bytes()).ok());
2462                    let status = Some(service_err.raw().status().as_u16());
2463                    Self::bucket_policy_error_kind(code, status, &error_text)
2464                } else {
2465                    Self::bucket_policy_error_kind(None, None, &error_text)
2466                };
2467                return Self::map_get_bucket_policy_error(bucket, kind, &error_text);
2468            }
2469        };
2470
2471        Ok(response.policy().map(|policy| policy.to_string()))
2472    }
2473
2474    async fn set_bucket_policy(&self, bucket: &str, policy: &str) -> Result<()> {
2475        self.inner
2476            .put_bucket_policy()
2477            .bucket(bucket)
2478            .policy(policy)
2479            .send()
2480            .await
2481            .map_err(|e| Error::General(format!("set_bucket_policy: {e}")))?;
2482
2483        Ok(())
2484    }
2485
2486    async fn delete_bucket_policy(&self, bucket: &str) -> Result<()> {
2487        match self
2488            .inner
2489            .delete_bucket_policy()
2490            .bucket(bucket)
2491            .send()
2492            .await
2493        {
2494            Ok(_) => Ok(()),
2495            Err(e) => {
2496                let error_text = e.to_string();
2497                let kind = if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &e {
2498                    let code = service_err
2499                        .raw()
2500                        .headers()
2501                        .get("x-amz-error-code")
2502                        .and_then(|value| std::str::from_utf8(value.as_bytes()).ok());
2503                    let status = Some(service_err.raw().status().as_u16());
2504                    Self::bucket_policy_error_kind(code, status, &error_text)
2505                } else {
2506                    Self::bucket_policy_error_kind(None, None, &error_text)
2507                };
2508                Self::map_delete_bucket_policy_error(bucket, kind, &error_text)
2509            }
2510        }
2511    }
2512
2513    async fn get_bucket_cors(&self, bucket: &str) -> Result<Vec<CorsRule>> {
2514        let response = match self.inner.get_bucket_cors().bucket(bucket).send().await {
2515            Ok(response) => response,
2516            Err(error) => {
2517                let error_text = error.to_string();
2518                if error_text.contains("service error")
2519                    && let Ok(url) = self.cors_url(bucket)
2520                {
2521                    match self.xml_request(Method::GET, url, None, None).await {
2522                        Ok(body) => return parse_cors_configuration_xml(&body),
2523                        Err(Error::Network(raw_error))
2524                            if is_missing_cors_configuration_error(&raw_error) =>
2525                        {
2526                            return Ok(Vec::new());
2527                        }
2528                        Err(_) => {}
2529                    }
2530                }
2531
2532                let missing_config =
2533                    if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &error {
2534                        is_missing_cors_configuration_response(
2535                            service_err.err().code(),
2536                            Some(service_err.raw().status().as_u16()),
2537                            &error_text,
2538                        )
2539                    } else {
2540                        is_missing_cors_configuration_response(None, None, &error_text)
2541                    };
2542
2543                if missing_config {
2544                    return Ok(Vec::new());
2545                }
2546                return Err(Error::General(format!("get_bucket_cors: {error_text}")));
2547            }
2548        };
2549
2550        Ok(response
2551            .cors_rules()
2552            .iter()
2553            .map(sdk_cors_rule_to_core)
2554            .collect())
2555    }
2556
2557    async fn set_bucket_cors(&self, bucket: &str, rules: Vec<CorsRule>) -> Result<()> {
2558        let cors_rules = rules
2559            .iter()
2560            .map(core_cors_rule_to_sdk)
2561            .collect::<Result<Vec<_>>>()?;
2562        let cors_configuration = aws_sdk_s3::types::CorsConfiguration::builder()
2563            .set_cors_rules(Some(cors_rules))
2564            .build()
2565            .map_err(|e| Error::General(format!("build bucket cors config: {e}")))?;
2566
2567        self.inner
2568            .put_bucket_cors()
2569            .bucket(bucket)
2570            .cors_configuration(cors_configuration)
2571            .send()
2572            .await
2573            .map_err(|e| Error::General(format!("set_bucket_cors: {e}")))?;
2574
2575        Ok(())
2576    }
2577
2578    async fn delete_bucket_cors(&self, bucket: &str) -> Result<()> {
2579        match self.inner.delete_bucket_cors().bucket(bucket).send().await {
2580            Ok(_) => Ok(()),
2581            Err(error) => {
2582                let error_text = error.to_string();
2583                if error_text.contains("service error")
2584                    && let Ok(url) = self.cors_url(bucket)
2585                {
2586                    match self.xml_request(Method::DELETE, url, None, None).await {
2587                        Ok(_) => return Ok(()),
2588                        Err(Error::Network(raw_error))
2589                            if is_missing_cors_configuration_error(&raw_error) =>
2590                        {
2591                            return Ok(());
2592                        }
2593                        Err(_) => {}
2594                    }
2595                }
2596
2597                let missing_config =
2598                    if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &error {
2599                        is_missing_cors_configuration_response(
2600                            service_err.err().code(),
2601                            Some(service_err.raw().status().as_u16()),
2602                            &error_text,
2603                        )
2604                    } else {
2605                        is_missing_cors_configuration_response(None, None, &error_text)
2606                    };
2607
2608                if missing_config {
2609                    Ok(())
2610                } else {
2611                    Err(Error::General(format!("delete_bucket_cors: {error_text}")))
2612                }
2613            }
2614        }
2615    }
2616
2617    async fn get_bucket_notifications(&self, bucket: &str) -> Result<Vec<BucketNotification>> {
2618        let response = self
2619            .inner
2620            .get_bucket_notification_configuration()
2621            .bucket(bucket)
2622            .send()
2623            .await
2624            .map_err(|e| Error::General(format!("get_bucket_notifications: {e}")))?;
2625
2626        let mut rules = Vec::new();
2627
2628        for cfg in response.queue_configurations() {
2629            let (prefix, suffix) = Self::extract_notification_filter(cfg.filter());
2630            rules.push(BucketNotification {
2631                id: cfg.id().map(ToString::to_string),
2632                target: NotificationTarget::Queue,
2633                arn: cfg.queue_arn().to_string(),
2634                events: Self::event_list_to_strings(cfg.events()),
2635                prefix,
2636                suffix,
2637            });
2638        }
2639
2640        for cfg in response.topic_configurations() {
2641            let (prefix, suffix) = Self::extract_notification_filter(cfg.filter());
2642            rules.push(BucketNotification {
2643                id: cfg.id().map(ToString::to_string),
2644                target: NotificationTarget::Topic,
2645                arn: cfg.topic_arn().to_string(),
2646                events: Self::event_list_to_strings(cfg.events()),
2647                prefix,
2648                suffix,
2649            });
2650        }
2651
2652        for cfg in response.lambda_function_configurations() {
2653            let (prefix, suffix) = Self::extract_notification_filter(cfg.filter());
2654            rules.push(BucketNotification {
2655                id: cfg.id().map(ToString::to_string),
2656                target: NotificationTarget::Lambda,
2657                arn: cfg.lambda_function_arn().to_string(),
2658                events: Self::event_list_to_strings(cfg.events()),
2659                prefix,
2660                suffix,
2661            });
2662        }
2663
2664        Ok(rules)
2665    }
2666
2667    async fn set_bucket_notifications(
2668        &self,
2669        bucket: &str,
2670        notifications: Vec<BucketNotification>,
2671    ) -> Result<()> {
2672        use aws_sdk_s3::types::{
2673            LambdaFunctionConfiguration, NotificationConfiguration, QueueConfiguration,
2674            TopicConfiguration,
2675        };
2676
2677        let expected_notifications = notifications.clone();
2678        let mut queues = Vec::new();
2679        let mut topics = Vec::new();
2680        let mut lambdas = Vec::new();
2681
2682        for rule in notifications {
2683            let events = Self::strings_to_event_list(&rule.events);
2684            if events.is_empty() {
2685                return Err(Error::General(format!(
2686                    "set_bucket_notifications: empty event list for target '{}'",
2687                    rule.arn
2688                )));
2689            }
2690
2691            let filter =
2692                Self::build_notification_filter(rule.prefix.as_deref(), rule.suffix.as_deref());
2693
2694            match rule.target {
2695                NotificationTarget::Queue => {
2696                    let mut builder = QueueConfiguration::builder()
2697                        .queue_arn(rule.arn)
2698                        .set_events(Some(events))
2699                        .set_id(rule.id);
2700                    if let Some(filter) = filter {
2701                        builder = builder.filter(filter);
2702                    }
2703                    let queue = builder
2704                        .build()
2705                        .map_err(|e| Error::General(format!("build queue notification: {e}")))?;
2706                    queues.push(queue);
2707                }
2708                NotificationTarget::Topic => {
2709                    let mut builder = TopicConfiguration::builder()
2710                        .topic_arn(rule.arn)
2711                        .set_events(Some(events))
2712                        .set_id(rule.id);
2713                    if let Some(filter) = filter {
2714                        builder = builder.filter(filter);
2715                    }
2716                    let topic = builder
2717                        .build()
2718                        .map_err(|e| Error::General(format!("build topic notification: {e}")))?;
2719                    topics.push(topic);
2720                }
2721                NotificationTarget::Lambda => {
2722                    let mut builder = LambdaFunctionConfiguration::builder()
2723                        .lambda_function_arn(rule.arn)
2724                        .set_events(Some(events))
2725                        .set_id(rule.id);
2726                    if let Some(filter) = filter {
2727                        builder = builder.filter(filter);
2728                    }
2729                    let lambda = builder
2730                        .build()
2731                        .map_err(|e| Error::General(format!("build lambda notification: {e}")))?;
2732                    lambdas.push(lambda);
2733                }
2734            }
2735        }
2736
2737        let config = NotificationConfiguration::builder()
2738            .set_queue_configurations(Some(queues))
2739            .set_topic_configurations(Some(topics))
2740            .set_lambda_function_configurations(Some(lambdas))
2741            .build();
2742
2743        match self
2744            .inner
2745            .put_bucket_notification_configuration()
2746            .bucket(bucket)
2747            .notification_configuration(config)
2748            .send()
2749            .await
2750        {
2751            Ok(_) => {}
2752            Err(error) => {
2753                let error_text = error.to_string();
2754                // RustFS may apply notification configuration but still return a non-AWS
2755                // response envelope that the SDK reports as "service error".
2756                if error_text.contains("service error")
2757                    && let Ok(actual) = self.get_bucket_notifications(bucket).await
2758                    && Self::notifications_equivalent(&expected_notifications, &actual)
2759                {
2760                    return Ok(());
2761                }
2762                return Err(Error::General(format!(
2763                    "set_bucket_notifications: {error_text}"
2764                )));
2765            }
2766        }
2767
2768        Ok(())
2769    }
2770
2771    async fn get_bucket_lifecycle(&self, bucket: &str) -> Result<Vec<LifecycleRule>> {
2772        let response = match self
2773            .inner
2774            .get_bucket_lifecycle_configuration()
2775            .bucket(bucket)
2776            .send()
2777            .await
2778        {
2779            Ok(resp) => resp,
2780            Err(error) => {
2781                let error_text = Self::format_sdk_error(&error);
2782                if error_text.contains("NoSuchLifecycleConfiguration")
2783                    || error_text.contains("lifecycle configuration is not found")
2784                {
2785                    return Ok(Vec::new());
2786                }
2787                return Err(Error::General(format!(
2788                    "get_bucket_lifecycle: {error_text}"
2789                )));
2790            }
2791        };
2792
2793        let mut rules = Vec::new();
2794        for sdk_rule in response.rules() {
2795            let id = sdk_rule.id().unwrap_or("").to_string();
2796            let status = match sdk_rule.status().as_str() {
2797                "Enabled" => rc_core::LifecycleRuleStatus::Enabled,
2798                _ => rc_core::LifecycleRuleStatus::Disabled,
2799            };
2800
2801            let prefix = parse_lifecycle_filter_prefix(sdk_rule.filter());
2802            let tags = parse_lifecycle_filter_tags(sdk_rule.filter());
2803
2804            let expiration = sdk_rule
2805                .expiration()
2806                .map(|exp| rc_core::LifecycleExpiration {
2807                    days: exp.days(),
2808                    date: exp.date().map(|d| d.to_string()),
2809                });
2810
2811            let transition = sdk_rule
2812                .transitions()
2813                .first()
2814                .map(|t| rc_core::LifecycleTransition {
2815                    days: t.days(),
2816                    date: t.date().map(|d| d.to_string()),
2817                    storage_class: t
2818                        .storage_class()
2819                        .map(|sc| sc.as_str().to_string())
2820                        .unwrap_or_default(),
2821                });
2822
2823            let noncurrent_version_expiration =
2824                sdk_rule.noncurrent_version_expiration().map(|nve| {
2825                    rc_core::NoncurrentVersionExpiration {
2826                        noncurrent_days: nve.noncurrent_days().unwrap_or(0),
2827                        newer_noncurrent_versions: nve.newer_noncurrent_versions(),
2828                    }
2829                });
2830
2831            let noncurrent_version_transition = sdk_rule
2832                .noncurrent_version_transitions()
2833                .first()
2834                .map(|nvt| rc_core::NoncurrentVersionTransition {
2835                    noncurrent_days: nvt.noncurrent_days().unwrap_or(0),
2836                    storage_class: nvt
2837                        .storage_class()
2838                        .map(|sc| sc.as_str().to_string())
2839                        .unwrap_or_default(),
2840                });
2841
2842            let abort_incomplete_multipart_upload_days = sdk_rule
2843                .abort_incomplete_multipart_upload()
2844                .and_then(|a| a.days_after_initiation());
2845
2846            let expired_object_delete_marker = sdk_rule
2847                .expiration()
2848                .and_then(|e| e.expired_object_delete_marker())
2849                .filter(|v| *v);
2850
2851            rules.push(LifecycleRule {
2852                id,
2853                status,
2854                prefix,
2855                tags,
2856                expiration,
2857                transition,
2858                noncurrent_version_expiration,
2859                noncurrent_version_transition,
2860                abort_incomplete_multipart_upload_days,
2861                expired_object_delete_marker,
2862            });
2863        }
2864
2865        Ok(rules)
2866    }
2867
2868    async fn set_bucket_lifecycle(&self, bucket: &str, rules: Vec<LifecycleRule>) -> Result<()> {
2869        use aws_sdk_s3::types::{
2870            AbortIncompleteMultipartUpload, BucketLifecycleConfiguration, ExpirationStatus,
2871            LifecycleExpiration as SdkExpiration, LifecycleRule as SdkRule,
2872            NoncurrentVersionExpiration as SdkNve, NoncurrentVersionTransition as SdkNvt,
2873            Transition, TransitionStorageClass,
2874        };
2875
2876        let mut sdk_rules = Vec::new();
2877        for rule in rules {
2878            let status = match rule.status {
2879                rc_core::LifecycleRuleStatus::Enabled => ExpirationStatus::Enabled,
2880                rc_core::LifecycleRuleStatus::Disabled => ExpirationStatus::Disabled,
2881            };
2882
2883            let filter = build_lifecycle_rule_filter(rule.prefix.as_deref(), rule.tags.as_ref())?;
2884
2885            let expiration = rule.expiration.map(|exp| {
2886                let mut builder = SdkExpiration::builder();
2887                if let Some(days) = exp.days {
2888                    builder = builder.days(days);
2889                }
2890                if let Some(ref date_str) = exp.date
2891                    && let Ok(dt) = aws_smithy_types::DateTime::from_str(
2892                        date_str,
2893                        aws_smithy_types::date_time::Format::DateTime,
2894                    )
2895                {
2896                    builder = builder.date(dt);
2897                }
2898                if let Some(true) = rule.expired_object_delete_marker {
2899                    builder = builder.expired_object_delete_marker(true);
2900                }
2901                builder.build()
2902            });
2903
2904            let transitions = rule.transition.map(|t| {
2905                #[allow(deprecated)]
2906                let sc = TransitionStorageClass::from(t.storage_class.as_str());
2907                let mut builder = Transition::builder().storage_class(sc);
2908                if let Some(days) = t.days {
2909                    builder = builder.days(days);
2910                }
2911                if let Some(ref date_str) = t.date
2912                    && let Ok(dt) = aws_smithy_types::DateTime::from_str(
2913                        date_str,
2914                        aws_smithy_types::date_time::Format::DateTime,
2915                    )
2916                {
2917                    builder = builder.date(dt);
2918                }
2919                vec![builder.build()]
2920            });
2921
2922            let nve = rule.noncurrent_version_expiration.map(|nve| {
2923                let mut builder = SdkNve::builder().noncurrent_days(nve.noncurrent_days);
2924                if let Some(newer) = nve.newer_noncurrent_versions {
2925                    builder = builder.newer_noncurrent_versions(newer);
2926                }
2927                builder.build()
2928            });
2929
2930            let nvt = rule.noncurrent_version_transition.map(|nvt| {
2931                let sc = TransitionStorageClass::from(nvt.storage_class.as_str());
2932                let builder = SdkNvt::builder()
2933                    .noncurrent_days(nvt.noncurrent_days)
2934                    .storage_class(sc);
2935                vec![builder.build()]
2936            });
2937
2938            let abort = rule.abort_incomplete_multipart_upload_days.map(|days| {
2939                AbortIncompleteMultipartUpload::builder()
2940                    .days_after_initiation(days)
2941                    .build()
2942            });
2943
2944            let mut builder = SdkRule::builder().id(&rule.id).status(status);
2945            if let Some(filter) = filter {
2946                builder = builder.filter(filter);
2947            }
2948            if let Some(expiration) = expiration {
2949                builder = builder.expiration(expiration);
2950            }
2951            if let Some(transitions) = transitions {
2952                builder = builder.set_transitions(Some(transitions));
2953            }
2954            if let Some(nve) = nve {
2955                builder = builder.noncurrent_version_expiration(nve);
2956            }
2957            if let Some(nvt) = nvt {
2958                builder = builder.set_noncurrent_version_transitions(Some(nvt));
2959            }
2960            if let Some(abort) = abort {
2961                builder = builder.abort_incomplete_multipart_upload(abort);
2962            }
2963
2964            let sdk_rule = builder
2965                .build()
2966                .map_err(|e| Error::General(format!("build lifecycle rule: {e}")))?;
2967            sdk_rules.push(sdk_rule);
2968        }
2969
2970        let config = BucketLifecycleConfiguration::builder()
2971            .set_rules(Some(sdk_rules))
2972            .build()
2973            .map_err(|e| Error::General(format!("build lifecycle config: {e}")))?;
2974
2975        self.inner
2976            .put_bucket_lifecycle_configuration()
2977            .bucket(bucket)
2978            .lifecycle_configuration(config)
2979            .send()
2980            .await
2981            .map_err(|e| {
2982                Error::General(format!(
2983                    "set_bucket_lifecycle: {}",
2984                    Self::format_sdk_error(&e)
2985                ))
2986            })?;
2987
2988        Ok(())
2989    }
2990
2991    async fn delete_bucket_lifecycle(&self, bucket: &str) -> Result<()> {
2992        self.inner
2993            .delete_bucket_lifecycle()
2994            .bucket(bucket)
2995            .send()
2996            .await
2997            .map_err(|e| {
2998                Error::General(format!(
2999                    "delete_bucket_lifecycle: {}",
3000                    Self::format_sdk_error(&e)
3001                ))
3002            })?;
3003        Ok(())
3004    }
3005
3006    async fn restore_object(&self, path: &RemotePath, days: i32) -> Result<()> {
3007        use aws_sdk_s3::types::RestoreRequest;
3008
3009        let request = RestoreRequest::builder().days(days).build();
3010        self.inner
3011            .restore_object()
3012            .bucket(&path.bucket)
3013            .key(&path.key)
3014            .restore_request(request)
3015            .send()
3016            .await
3017            .map_err(|e| {
3018                Error::General(format!("restore_object: {}", Self::format_sdk_error(&e)))
3019            })?;
3020        Ok(())
3021    }
3022
3023    async fn get_bucket_replication(
3024        &self,
3025        bucket: &str,
3026    ) -> Result<Option<ReplicationConfiguration>> {
3027        let url = self.replication_url(bucket)?;
3028        let body = match self.xml_request(Method::GET, url, None, None).await {
3029            Ok(body) => body,
3030            Err(Error::Network(error_text))
3031                if error_text.contains("ReplicationConfigurationNotFound")
3032                    || error_text.contains("replication configuration is not found")
3033                    || error_text.contains("replication not found") =>
3034            {
3035                return Ok(None);
3036            }
3037            Err(error) => {
3038                return Err(Error::General(format!("get_bucket_replication: {error}")));
3039            }
3040        };
3041
3042        parse_replication_configuration_xml(&body).map(Some)
3043    }
3044
3045    async fn set_bucket_replication(
3046        &self,
3047        bucket: &str,
3048        config: ReplicationConfiguration,
3049    ) -> Result<()> {
3050        let url = self.replication_url(bucket)?;
3051        let body = build_replication_configuration_xml(&config).into_bytes();
3052        self.xml_request(Method::PUT, url, Some("application/xml"), Some(body))
3053            .await
3054            .map_err(|e| Error::General(format!("set_bucket_replication: {e}")))?;
3055
3056        Ok(())
3057    }
3058
3059    async fn delete_bucket_replication(&self, bucket: &str) -> Result<()> {
3060        self.inner
3061            .delete_bucket_replication()
3062            .bucket(bucket)
3063            .send()
3064            .await
3065            .map_err(|e| {
3066                Error::General(format!(
3067                    "delete_bucket_replication: {}",
3068                    Self::format_sdk_error(&e)
3069                ))
3070            })?;
3071        Ok(())
3072    }
3073
3074    async fn select_object_content(
3075        &self,
3076        path: &RemotePath,
3077        options: &SelectOptions,
3078        writer: &mut (dyn AsyncWrite + Send + Unpin),
3079    ) -> Result<()> {
3080        crate::select::select_object_content(&self.inner, path, options, writer).await
3081    }
3082}
3083
3084#[cfg(test)]
3085mod tests {
3086    use super::*;
3087    use aws_smithy_http_client::test_util::{CaptureRequestReceiver, capture_request};
3088    use std::collections::HashMap;
3089    use std::io::{Read, Write};
3090    use std::net::{TcpListener, TcpStream};
3091    use std::sync::mpsc;
3092    use std::thread;
3093    use std::time::{Duration, Instant};
3094
    /// Request line and headers of one HTTP request captured by the test
    /// server helpers in this module.
    #[derive(Debug)]
    struct CapturedXmlRequest {
        /// First whitespace-separated token of the request line.
        method: String,
        /// Second whitespace-separated token of the request line.
        target: String,
        /// Header pairs with lower-cased names and trimmed values.
        headers: Vec<(String, String)>,
    }
3101
3102    fn test_s3_client(
3103        response: Option<http::Response<SdkBody>>,
3104    ) -> (S3Client, CaptureRequestReceiver) {
3105        test_s3_client_with_endpoint("https://example.com", response)
3106    }
3107
3108    fn test_s3_client_with_endpoint(
3109        endpoint: &str,
3110        response: Option<http::Response<SdkBody>>,
3111    ) -> (S3Client, CaptureRequestReceiver) {
3112        test_s3_client_with_endpoint_and_headers(endpoint, response, Vec::new())
3113    }
3114
3115    fn test_s3_client_with_endpoint_and_headers(
3116        endpoint: &str,
3117        response: Option<http::Response<SdkBody>>,
3118        request_headers: Vec<RequestHeader>,
3119    ) -> (S3Client, CaptureRequestReceiver) {
3120        let (http_client, request_receiver) = capture_request(response);
3121        let credentials = Credentials::new(
3122            "access-key",
3123            "secret-key",
3124            None,
3125            None,
3126            "rc-test-credentials",
3127        );
3128        let mut config_builder = aws_sdk_s3::config::Builder::new()
3129            .credentials_provider(credentials)
3130            .endpoint_url(endpoint)
3131            .region(aws_sdk_s3::config::Region::new("us-east-1"))
3132            .force_path_style(true)
3133            .behavior_version_latest()
3134            .http_client(http_client);
3135
3136        let presign_config = config_builder.clone().build();
3137
3138        if !request_headers.is_empty() {
3139            config_builder = config_builder.interceptor(CustomHeaderInterceptor {
3140                headers: request_headers.clone(),
3141            });
3142        }
3143
3144        let config = config_builder.build();
3145
3146        let alias = Alias::new("test", endpoint, "access-key", "secret-key");
3147        let client = S3Client {
3148            inner: aws_sdk_s3::Client::from_conf(config),
3149            presign_inner: aws_sdk_s3::Client::from_conf(presign_config),
3150            xml_http_client: reqwest::Client::new(),
3151            alias,
3152            request_headers,
3153        };
3154
3155        (client, request_receiver)
3156    }
3157
3158    fn read_xml_request(stream: &mut TcpStream) -> CapturedXmlRequest {
3159        let mut buffer = Vec::new();
3160        let mut chunk = [0_u8; 1024];
3161        let header_end = loop {
3162            let read = stream.read(&mut chunk).expect("read HTTP request");
3163            assert!(read > 0, "client closed connection before headers");
3164            buffer.extend_from_slice(&chunk[..read]);
3165
3166            if let Some(position) = buffer.windows(4).position(|window| window == b"\r\n\r\n") {
3167                break position + 4;
3168            }
3169        };
3170
3171        let headers_text = String::from_utf8_lossy(&buffer[..header_end]).into_owned();
3172        let content_length = headers_text
3173            .lines()
3174            .find_map(|line| {
3175                let (name, value) = line.split_once(':')?;
3176                name.eq_ignore_ascii_case("content-length")
3177                    .then(|| value.trim().parse::<usize>().expect("valid content length"))
3178            })
3179            .unwrap_or(0);
3180
3181        while buffer.len() - header_end < content_length {
3182            let read = stream.read(&mut chunk).expect("read HTTP request body");
3183            assert!(read > 0, "client closed connection before body");
3184            buffer.extend_from_slice(&chunk[..read]);
3185        }
3186
3187        let mut lines = headers_text.lines();
3188        let request_line = lines.next().expect("request line");
3189        let mut parts = request_line.split_whitespace();
3190        let method = parts.next().expect("request method").to_string();
3191        let target = parts.next().expect("request target").to_string();
3192        let headers = lines
3193            .filter_map(|line| {
3194                let (name, value) = line.split_once(':')?;
3195                Some((name.to_ascii_lowercase(), value.trim().to_string()))
3196            })
3197            .collect();
3198
3199        CapturedXmlRequest {
3200            method,
3201            target,
3202            headers,
3203        }
3204    }
3205
3206    fn start_xml_test_server() -> (
3207        String,
3208        mpsc::Receiver<CapturedXmlRequest>,
3209        thread::JoinHandle<()>,
3210    ) {
3211        let listener = TcpListener::bind("127.0.0.1:0").expect("bind test server");
3212        listener
3213            .set_nonblocking(true)
3214            .expect("configure nonblocking listener");
3215        let endpoint = format!("http://{}", listener.local_addr().expect("local addr"));
3216        let (sender, receiver) = mpsc::channel();
3217
3218        let handle = thread::spawn(move || {
3219            let deadline = Instant::now() + Duration::from_secs(5);
3220            let mut stream = loop {
3221                match listener.accept() {
3222                    Ok((stream, _)) => break stream,
3223                    Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {
3224                        assert!(Instant::now() < deadline, "timed out waiting for request");
3225                        thread::sleep(Duration::from_millis(10));
3226                    }
3227                    Err(error) => panic!("accept request: {error}"),
3228                }
3229            };
3230            stream
3231                .set_nonblocking(false)
3232                .expect("configure blocking request stream");
3233            stream
3234                .set_read_timeout(Some(Duration::from_secs(5)))
3235                .expect("set stream read timeout");
3236            let request = read_xml_request(&mut stream);
3237            sender.send(request).expect("send captured request");
3238
3239            let response = "HTTP/1.1 200 OK\r\ncontent-length: 2\r\nconnection: close\r\n\r\nok";
3240            stream
3241                .write_all(response.as_bytes())
3242                .expect("write HTTP response");
3243        });
3244
3245        (endpoint, receiver, handle)
3246    }
3247
3248    fn header_value<'a>(headers: &'a [(String, String)], name: &str) -> Option<&'a str> {
3249        headers
3250            .iter()
3251            .find(|(header_name, _)| header_name.eq_ignore_ascii_case(name))
3252            .map(|(_, value)| value.as_str())
3253    }
3254
3255    #[test]
3256    fn test_object_info_creation() {
3257        let info = ObjectInfo::file("test.txt", 1024);
3258        assert_eq!(info.key, "test.txt");
3259        assert_eq!(info.size_bytes, Some(1024));
3260    }
3261
3262    #[test]
3263    fn auto_bucket_lookup_uses_dns_for_aliyun_oss_service_endpoint() {
3264        let alias = Alias::new(
3265            "aliyun",
3266            "https://oss-cn-hangzhou.aliyuncs.com",
3267            "access-key",
3268            "secret-key",
3269        );
3270
3271        assert!(!force_path_style_for_alias(&alias));
3272    }
3273
3274    #[test]
3275    fn auto_bucket_lookup_uses_dns_for_aliyun_internal_service_endpoint() {
3276        let alias = Alias::new(
3277            "aliyun",
3278            "https://oss-cn-hangzhou-internal.aliyuncs.com",
3279            "access-key",
3280            "secret-key",
3281        );
3282
3283        assert!(!force_path_style_for_alias(&alias));
3284    }
3285
3286    #[test]
3287    fn auto_bucket_lookup_keeps_path_style_for_custom_endpoint() {
3288        let alias = Alias::new("local", "http://localhost:9000", "access-key", "secret-key");
3289
3290        assert!(force_path_style_for_alias(&alias));
3291    }
3292
3293    #[test]
3294    fn auto_bucket_lookup_keeps_path_style_for_non_oss_aliyun_endpoint() {
3295        let alias = Alias::new(
3296            "aliyun",
3297            "https://ecs-cn-hangzhou.aliyuncs.com",
3298            "access-key",
3299            "secret-key",
3300        );
3301
3302        assert!(force_path_style_for_alias(&alias));
3303    }
3304
3305    #[test]
3306    fn auto_bucket_lookup_keeps_path_style_for_invalid_endpoint() {
3307        let alias = Alias::new("broken", "not a valid endpoint", "access-key", "secret-key");
3308
3309        assert!(force_path_style_for_alias(&alias));
3310    }
3311
3312    #[test]
3313    fn explicit_bucket_lookup_overrides_auto_detection() {
3314        let mut path_alias = Alias::new(
3315            "aliyun",
3316            "https://oss-cn-hangzhou.aliyuncs.com",
3317            "access-key",
3318            "secret-key",
3319        );
3320        path_alias.bucket_lookup = "path".to_string();
3321
3322        let mut dns_alias =
3323            Alias::new("local", "http://localhost:9000", "access-key", "secret-key");
3324        dns_alias.bucket_lookup = "dns".to_string();
3325
3326        assert!(force_path_style_for_alias(&path_alias));
3327        assert!(!force_path_style_for_alias(&dns_alias));
3328    }
3329
3330    #[test]
3331    fn unknown_bucket_lookup_keeps_path_style() {
3332        let mut alias = Alias::new(
3333            "aliyun",
3334            "https://oss-cn-hangzhou.aliyuncs.com",
3335            "access-key",
3336            "secret-key",
3337        );
3338        alias.bucket_lookup = "unexpected".to_string();
3339
3340        assert!(force_path_style_for_alias(&alias));
3341    }
3342
3343    #[test]
3344    fn parse_replication_configuration_xml_reads_delete_replication() {
3345        let body = r#"<?xml version="1.0" encoding="UTF-8"?>
3346<ReplicationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
3347  <Role>arn:rustfs:replication:us-east-1:123:test</Role>
3348  <Rule>
3349    <Status>Enabled</Status>
3350    <Destination>
3351      <Bucket>arn:rustfs:replication:us-east-1:123:dest</Bucket>
3352      <StorageClass>STANDARD</StorageClass>
3353    </Destination>
3354    <ID>rule-1</ID>
3355    <Priority>1</Priority>
3356    <Filter>
3357      <Prefix>logs/</Prefix>
3358    </Filter>
3359    <ExistingObjectReplication>
3360      <Status>Enabled</Status>
3361    </ExistingObjectReplication>
3362    <DeleteMarkerReplication>
3363      <Status>Disabled</Status>
3364    </DeleteMarkerReplication>
3365    <DeleteReplication>
3366      <Status>Enabled</Status>
3367    </DeleteReplication>
3368  </Rule>
3369</ReplicationConfiguration>"#;
3370
3371        let config = parse_replication_configuration_xml(body).expect("parse replication xml");
3372        assert_eq!(config.role, "arn:rustfs:replication:us-east-1:123:test");
3373        assert_eq!(config.rules.len(), 1);
3374        assert_eq!(config.rules[0].id, "rule-1");
3375        assert_eq!(config.rules[0].prefix.as_deref(), Some("logs/"));
3376        assert_eq!(config.rules[0].delete_replication, Some(true));
3377        assert_eq!(config.rules[0].delete_marker_replication, Some(false));
3378        assert_eq!(config.rules[0].existing_object_replication, Some(true));
3379    }
3380
3381    #[test]
3382    fn parse_replication_configuration_xml_preserves_tag_filters() {
3383        let body = r#"<?xml version="1.0" encoding="UTF-8"?>
3384<ReplicationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
3385  <Rule>
3386    <Status>Enabled</Status>
3387    <Destination>
3388      <Bucket>arn:rustfs:replication:us-east-1:123:dest</Bucket>
3389    </Destination>
3390    <ID>tagged-rule</ID>
3391    <Priority>2</Priority>
3392    <Filter>
3393      <And>
3394        <Prefix>logs/</Prefix>
3395        <Tag>
3396          <Key>env</Key>
3397          <Value>prod</Value>
3398        </Tag>
3399        <Tag>
3400          <Key>team</Key>
3401          <Value>core</Value>
3402        </Tag>
3403      </And>
3404    </Filter>
3405  </Rule>
3406</ReplicationConfiguration>"#;
3407
3408        let config = parse_replication_configuration_xml(body).expect("parse replication xml");
3409        let rule = &config.rules[0];
3410        assert_eq!(rule.prefix.as_deref(), Some("logs/"));
3411        let tags = rule.tags.as_ref().expect("tag filters");
3412        assert_eq!(tags.get("env").map(String::as_str), Some("prod"));
3413        assert_eq!(tags.get("team").map(String::as_str), Some("core"));
3414    }
3415
3416    #[test]
3417    fn build_replication_configuration_xml_writes_delete_replication() {
3418        let config = ReplicationConfiguration {
3419            role: "arn:rustfs:replication:us-east-1:123:test".to_string(),
3420            rules: vec![rc_core::ReplicationRule {
3421                id: "rule-1".to_string(),
3422                priority: 1,
3423                status: rc_core::ReplicationRuleStatus::Enabled,
3424                prefix: Some("logs/".to_string()),
3425                tags: None,
3426                destination: rc_core::ReplicationDestination {
3427                    bucket_arn: "arn:rustfs:replication:us-east-1:123:dest".to_string(),
3428                    storage_class: Some("STANDARD".to_string()),
3429                },
3430                delete_marker_replication: Some(true),
3431                existing_object_replication: Some(true),
3432                delete_replication: Some(true),
3433            }],
3434        };
3435
3436        let xml = build_replication_configuration_xml(&config);
3437        assert!(xml.contains("<DeleteReplication><Status>Enabled</Status></DeleteReplication>"));
3438        assert!(xml.contains(
3439            "<ExistingObjectReplication><Status>Enabled</Status></ExistingObjectReplication>"
3440        ));
3441        assert!(xml.contains(
3442            "<DeleteMarkerReplication><Status>Enabled</Status></DeleteMarkerReplication>"
3443        ));
3444        assert!(xml.contains("<Filter><Prefix>logs/</Prefix></Filter>"));
3445    }
3446
3447    #[test]
3448    fn build_replication_configuration_xml_writes_and_tag_filters() {
3449        let mut tags = HashMap::new();
3450        tags.insert("env".to_string(), "prod".to_string());
3451        tags.insert("team".to_string(), "core".to_string());
3452
3453        let config = ReplicationConfiguration {
3454            role: String::new(),
3455            rules: vec![rc_core::ReplicationRule {
3456                id: "rule-1".to_string(),
3457                priority: 1,
3458                status: rc_core::ReplicationRuleStatus::Enabled,
3459                prefix: Some("logs/".to_string()),
3460                tags: Some(tags),
3461                destination: rc_core::ReplicationDestination {
3462                    bucket_arn: "arn:rustfs:replication:us-east-1:123:dest".to_string(),
3463                    storage_class: None,
3464                },
3465                delete_marker_replication: None,
3466                existing_object_replication: None,
3467                delete_replication: None,
3468            }],
3469        };
3470
3471        let xml = build_replication_configuration_xml(&config);
3472        assert!(xml.contains("<Filter><And><Prefix>logs/</Prefix>"));
3473        assert!(xml.contains("<Tag><Key>env</Key><Value>prod</Value></Tag>"));
3474        assert!(xml.contains("<Tag><Key>team</Key><Value>core</Value></Tag>"));
3475    }
3476
3477    #[test]
3478    fn build_lifecycle_rule_filter_preserves_prefix_and_tags() {
3479        let mut tags = HashMap::new();
3480        tags.insert("env".to_string(), "prod".to_string());
3481        tags.insert("team".to_string(), "core".to_string());
3482
3483        let filter = build_lifecycle_rule_filter(Some("logs/"), Some(&tags))
3484            .expect("build lifecycle filter")
3485            .expect("lifecycle filter");
3486
3487        assert_eq!(
3488            parse_lifecycle_filter_prefix(Some(&filter)).as_deref(),
3489            Some("logs/")
3490        );
3491        let parsed_tags = parse_lifecycle_filter_tags(Some(&filter)).expect("parsed tags");
3492        assert_eq!(parsed_tags.get("env").map(String::as_str), Some("prod"));
3493        assert_eq!(parsed_tags.get("team").map(String::as_str), Some("core"));
3494    }
3495
3496    #[test]
3497    fn bucket_policy_error_kind_uses_error_code() {
3498        assert_eq!(
3499            S3Client::bucket_policy_error_kind(Some("NoSuchBucketPolicy"), Some(404), ""),
3500            BucketPolicyErrorKind::MissingPolicy
3501        );
3502        assert_eq!(
3503            S3Client::bucket_policy_error_kind(Some("NoSuchBucket"), Some(404), ""),
3504            BucketPolicyErrorKind::MissingBucket
3505        );
3506    }
3507
3508    #[test]
3509    fn bucket_policy_error_kind_prefers_bucket_not_found_over_404_fallback() {
3510        assert_eq!(
3511            S3Client::bucket_policy_error_kind(None, Some(404), "NoSuchBucket"),
3512            BucketPolicyErrorKind::MissingBucket
3513        );
3514        assert_eq!(
3515            S3Client::bucket_policy_error_kind(None, Some(404), "no details"),
3516            BucketPolicyErrorKind::MissingPolicy
3517        );
3518    }
3519
3520    #[test]
3521    fn bucket_policy_error_mapping_returns_expected_result() {
3522        let get_missing_policy = S3Client::map_get_bucket_policy_error(
3523            "bucket",
3524            BucketPolicyErrorKind::MissingPolicy,
3525            "NoSuchPolicy",
3526        )
3527        .expect("missing policy should map to Ok(None)");
3528        assert!(get_missing_policy.is_none());
3529
3530        match S3Client::map_get_bucket_policy_error(
3531            "bucket",
3532            BucketPolicyErrorKind::MissingBucket,
3533            "NoSuchBucket",
3534        ) {
3535            Err(Error::NotFound(message)) => assert!(message.contains("Bucket not found")),
3536            other => panic!("Expected NotFound for missing bucket, got: {:?}", other),
3537        }
3538
3539        let delete_missing_policy = S3Client::map_delete_bucket_policy_error(
3540            "bucket",
3541            BucketPolicyErrorKind::MissingPolicy,
3542            "NoSuchPolicy",
3543        );
3544        assert!(
3545            delete_missing_policy.is_ok(),
3546            "Missing policy should be treated as successful delete"
3547        );
3548    }
3549
3550    #[test]
3551    fn notification_filter_round_trip_prefix_and_suffix() {
3552        let filter = S3Client::build_notification_filter(Some("logs/"), Some(".json"))
3553            .expect("filter should be built");
3554        let (prefix, suffix) = S3Client::extract_notification_filter(Some(&filter));
3555        assert_eq!(prefix.as_deref(), Some("logs/"));
3556        assert_eq!(suffix.as_deref(), Some(".json"));
3557    }
3558
3559    #[test]
3560    fn notification_filter_none_when_empty() {
3561        assert!(S3Client::build_notification_filter(None, None).is_none());
3562    }
3563
3564    #[test]
3565    fn notifications_equivalent_ignores_order_and_duplicate_events() {
3566        let expected = vec![
3567            BucketNotification {
3568                id: Some("a".to_string()),
3569                target: NotificationTarget::Queue,
3570                arn: "arn:aws:sqs:us-east-1:123456789012:q".to_string(),
3571                events: vec![
3572                    "s3:ObjectCreated:*".to_string(),
3573                    "s3:ObjectCreated:*".to_string(),
3574                ],
3575                prefix: Some("images/".to_string()),
3576                suffix: Some(".jpg".to_string()),
3577            },
3578            BucketNotification {
3579                id: Some("b".to_string()),
3580                target: NotificationTarget::Topic,
3581                arn: "arn:aws:sns:us-east-1:123456789012:t".to_string(),
3582                events: vec!["s3:ObjectRemoved:*".to_string()],
3583                prefix: None,
3584                suffix: None,
3585            },
3586        ];
3587
3588        let actual = vec![
3589            BucketNotification {
3590                id: None,
3591                target: NotificationTarget::Topic,
3592                arn: "arn:aws:sns:us-east-1:123456789012:t".to_string(),
3593                events: vec!["s3:ObjectRemoved:*".to_string()],
3594                prefix: None,
3595                suffix: None,
3596            },
3597            BucketNotification {
3598                id: None,
3599                target: NotificationTarget::Queue,
3600                arn: "arn:aws:sqs:us-east-1:123456789012:q".to_string(),
3601                events: vec!["s3:ObjectCreated:*".to_string()],
3602                prefix: Some("images/".to_string()),
3603                suffix: Some(".jpg".to_string()),
3604            },
3605        ];
3606
3607        assert!(S3Client::notifications_equivalent(&expected, &actual));
3608    }
3609
3610    #[test]
3611    fn sdk_cors_rule_to_core_preserves_optional_fields() {
3612        let sdk_rule = aws_sdk_s3::types::CorsRule::builder()
3613            .id("web-app")
3614            .allowed_origins("https://app.example.com")
3615            .allowed_methods("get")
3616            .allowed_headers("Authorization")
3617            .expose_headers("ETag")
3618            .max_age_seconds(300)
3619            .build()
3620            .expect("build cors rule");
3621
3622        let rule = sdk_cors_rule_to_core(&sdk_rule);
3623        assert_eq!(rule.id.as_deref(), Some("web-app"));
3624        assert_eq!(
3625            rule.allowed_origins,
3626            vec!["https://app.example.com".to_string()]
3627        );
3628        assert_eq!(rule.allowed_methods, vec!["get".to_string()]);
3629        assert_eq!(
3630            rule.allowed_headers,
3631            Some(vec!["Authorization".to_string()])
3632        );
3633        assert_eq!(rule.expose_headers, Some(vec!["ETag".to_string()]));
3634        assert_eq!(rule.max_age_seconds, Some(300));
3635    }
3636
3637    #[test]
3638    fn bucket_encryption_rule_maps_sse_s3() {
3639        let value = aws_sdk_s3::types::ServerSideEncryptionByDefault::builder()
3640            .sse_algorithm(aws_sdk_s3::types::ServerSideEncryption::Aes256)
3641            .build()
3642            .expect("build rule");
3643
3644        let encryption = sdk_bucket_encryption_to_core(&value).expect("map rule");
3645        assert_eq!(encryption, BucketEncryption::SseS3);
3646    }
3647
3648    #[test]
3649    fn bucket_encryption_rule_maps_sse_kms() {
3650        let value = aws_sdk_s3::types::ServerSideEncryptionByDefault::builder()
3651            .sse_algorithm(aws_sdk_s3::types::ServerSideEncryption::AwsKms)
3652            .kms_master_key_id("kms-key")
3653            .build()
3654            .expect("build rule");
3655
3656        let encryption = sdk_bucket_encryption_to_core(&value).expect("map rule");
3657        assert_eq!(
3658            encryption,
3659            BucketEncryption::SseKms {
3660                key_id: Some("kms-key".to_string()),
3661            }
3662        );
3663    }
3664
3665    #[test]
3666    fn bucket_encryption_rule_without_kms_key_maps_to_default_kms() {
3667        let value = aws_sdk_s3::types::ServerSideEncryptionByDefault::builder()
3668            .sse_algorithm(aws_sdk_s3::types::ServerSideEncryption::AwsKms)
3669            .build()
3670            .expect("build rule");
3671
3672        let encryption = sdk_bucket_encryption_to_core(&value).expect("map default kms rule");
3673        assert_eq!(encryption, BucketEncryption::SseKms { key_id: None });
3674    }
3675
3676    #[test]
3677    fn missing_bucket_encryption_errors_are_detected() {
3678        assert!(is_missing_bucket_encryption_error(
3679            "ServerSideEncryptionConfigurationNotFoundError"
3680        ));
3681        assert!(is_missing_bucket_encryption_error(
3682            "The server-side encryption configuration was not found"
3683        ));
3684        assert!(!is_missing_bucket_encryption_error("AccessDenied"));
3685    }
3686
3687    #[test]
3688    fn missing_bucket_encryption_response_detects_code_and_status() {
3689        assert!(is_missing_bucket_encryption_response(
3690            Some("ServerSideEncryptionConfigurationNotFoundError"),
3691            Some(404),
3692            "service error"
3693        ));
3694        assert!(is_missing_bucket_encryption_response(
3695            Some("NoSuchBucketEncryption"),
3696            Some(404),
3697            "service error"
3698        ));
3699        assert!(is_missing_bucket_encryption_response(
3700            None,
3701            Some(404),
3702            "The server-side encryption configuration was not found"
3703        ));
3704        assert!(is_missing_bucket_encryption_response(
3705            None,
3706            None,
3707            "The server-side encryption configuration was not found"
3708        ));
3709        assert!(!is_missing_bucket_encryption_response(
3710            Some("AccessDenied"),
3711            Some(403),
3712            "access denied"
3713        ));
3714        assert!(!is_missing_bucket_encryption_response(
3715            None,
3716            Some(500),
3717            "The server-side encryption configuration was not found"
3718        ));
3719    }
3720
3721    #[test]
3722    fn sdk_cors_rule_to_core_drops_empty_optional_headers() {
3723        let sdk_rule = aws_sdk_s3::types::CorsRule::builder()
3724            .allowed_origins("https://app.example.com")
3725            .allowed_methods("GET")
3726            .build()
3727            .expect("build cors rule");
3728
3729        let rule = sdk_cors_rule_to_core(&sdk_rule);
3730        assert_eq!(rule.allowed_headers, None);
3731        assert_eq!(rule.expose_headers, None);
3732    }
3733
3734    #[test]
3735    fn core_cors_rule_to_sdk_normalizes_method_case() {
3736        let rule = CorsRule {
3737            id: Some("public-read".to_string()),
3738            allowed_origins: vec!["*".to_string()],
3739            allowed_methods: vec!["get".to_string(), "post".to_string()],
3740            allowed_headers: Some(vec!["*".to_string()]),
3741            expose_headers: None,
3742            max_age_seconds: Some(600),
3743        };
3744
3745        let sdk_rule = core_cors_rule_to_sdk(&rule).expect("convert cors rule");
3746        assert_eq!(sdk_rule.id(), Some("public-read"));
3747        assert_eq!(sdk_rule.allowed_origins(), ["*"]);
3748        assert_eq!(sdk_rule.allowed_methods(), ["GET", "POST"]);
3749        assert_eq!(sdk_rule.allowed_headers(), ["*"]);
3750        assert_eq!(sdk_rule.max_age_seconds(), Some(600));
3751    }
3752
3753    #[tokio::test]
3754    async fn set_bucket_cors_sends_rule_fields() {
3755        let response = http::Response::builder()
3756            .status(200)
3757            .body(SdkBody::from(""))
3758            .expect("build put bucket cors response");
3759        let (client, request_receiver) = test_s3_client(Some(response));
3760
3761        client
3762            .set_bucket_cors(
3763                "bucket",
3764                vec![CorsRule {
3765                    id: Some("web-app".to_string()),
3766                    allowed_origins: vec!["https://app.example.com".to_string()],
3767                    allowed_methods: vec!["GET".to_string(), "POST".to_string()],
3768                    allowed_headers: Some(vec!["Authorization".to_string()]),
3769                    expose_headers: Some(vec!["ETag".to_string()]),
3770                    max_age_seconds: Some(600),
3771                }],
3772            )
3773            .await
3774            .expect("set bucket cors");
3775
3776        let request = request_receiver.expect_request();
3777        assert_eq!(request.method(), http::Method::PUT);
3778        assert!(
3779            request.uri().to_string().contains("?cors"),
3780            "expected bucket CORS subresource in URI: {}",
3781            request.uri()
3782        );
3783
3784        let body = request.body().bytes().expect("request body bytes");
3785        let body = std::str::from_utf8(body).expect("request body is utf8");
3786        assert!(body.contains("<ID>web-app</ID>"));
3787        assert!(body.contains("<AllowedOrigin>https://app.example.com</AllowedOrigin>"));
3788        assert!(body.contains("<AllowedMethod>GET</AllowedMethod>"));
3789        assert!(body.contains("<AllowedMethod>POST</AllowedMethod>"));
3790        assert!(body.contains("<AllowedHeader>Authorization</AllowedHeader>"));
3791        assert!(body.contains("<ExposeHeader>ETag</ExposeHeader>"));
3792        assert!(body.contains("<MaxAgeSeconds>600</MaxAgeSeconds>"));
3793    }
3794
3795    #[tokio::test]
3796    async fn get_bucket_encryption_sends_expected_request_shape() {
3797        let response = http::Response::builder()
3798            .status(200)
3799            .body(SdkBody::from(
3800                r#"<?xml version="1.0" encoding="UTF-8"?>
3801<ServerSideEncryptionConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
3802  <Rule>
3803    <ApplyServerSideEncryptionByDefault>
3804      <SSEAlgorithm>AES256</SSEAlgorithm>
3805    </ApplyServerSideEncryptionByDefault>
3806  </Rule>
3807</ServerSideEncryptionConfiguration>"#,
3808            ))
3809            .expect("build get bucket encryption response");
3810        let (client, request_receiver) = test_s3_client(Some(response));
3811
3812        let encryption = client
3813            .get_bucket_encryption("bucket")
3814            .await
3815            .expect("get bucket encryption");
3816
3817        assert_eq!(encryption, Some(BucketEncryption::SseS3));
3818
3819        let request = request_receiver.expect_request();
3820        assert_eq!(request.method(), http::Method::GET);
3821        assert!(
3822            request.uri().to_string().contains("?encryption"),
3823            "expected bucket encryption subresource in URI: {}",
3824            request.uri()
3825        );
3826    }
3827
3828    #[tokio::test]
3829    async fn get_bucket_encryption_missing_configuration_returns_none() {
3830        let response = http::Response::builder()
3831            .status(404)
3832            .header(
3833                "x-amz-error-code",
3834                "ServerSideEncryptionConfigurationNotFoundError",
3835            )
3836            .body(SdkBody::from(
3837                r#"<?xml version="1.0" encoding="UTF-8"?>
3838<Error>
3839  <Code>ServerSideEncryptionConfigurationNotFoundError</Code>
3840  <Message>The server-side encryption configuration was not found</Message>
3841</Error>"#,
3842            ))
3843            .expect("build missing bucket encryption response");
3844        let (client, _) = test_s3_client(Some(response));
3845
3846        let encryption = client
3847            .get_bucket_encryption("bucket")
3848            .await
3849            .expect("missing bucket encryption should map to None");
3850
3851        assert_eq!(encryption, None);
3852    }
3853
3854    #[tokio::test]
3855    async fn get_bucket_encryption_missing_rule_errors() {
3856        let response = http::Response::builder()
3857            .status(200)
3858            .body(SdkBody::from(
3859                r#"<?xml version="1.0" encoding="UTF-8"?>
3860<ServerSideEncryptionConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
3861  <Rule />
3862</ServerSideEncryptionConfiguration>"#,
3863            ))
3864            .expect("build malformed bucket encryption response");
3865        let (client, _) = test_s3_client(Some(response));
3866
3867        match client.get_bucket_encryption("bucket").await {
3868            Err(Error::General(message)) => {
3869                assert!(message.contains("missing bucket encryption rule"))
3870            }
3871            other => panic!("expected missing rule error, got {other:?}"),
3872        }
3873    }
3874
3875    #[tokio::test]
3876    async fn set_bucket_encryption_sends_expected_request_shape() {
3877        let response = http::Response::builder()
3878            .status(200)
3879            .body(SdkBody::from(""))
3880            .expect("build put bucket encryption response");
3881        let (client, request_receiver) = test_s3_client(Some(response));
3882
3883        client
3884            .set_bucket_encryption(
3885                "bucket",
3886                BucketEncryption::SseKms {
3887                    key_id: Some("kms-key".to_string()),
3888                },
3889            )
3890            .await
3891            .expect("set bucket encryption");
3892
3893        let request = request_receiver.expect_request();
3894        assert_eq!(request.method(), http::Method::PUT);
3895        assert!(
3896            request.uri().to_string().contains("?encryption"),
3897            "expected bucket encryption subresource in URI: {}",
3898            request.uri()
3899        );
3900
3901        let body = request.body().bytes().expect("request body bytes");
3902        let body = std::str::from_utf8(body).expect("request body is utf8");
3903        assert!(body.contains("<ServerSideEncryptionConfiguration"));
3904        assert!(body.contains("<SSEAlgorithm>aws:kms</SSEAlgorithm>"));
3905        assert!(body.contains("<KMSMasterKeyID>kms-key</KMSMasterKeyID>"));
3906    }
3907
3908    #[tokio::test]
3909    async fn delete_bucket_encryption_missing_configuration_is_successful() {
3910        let response = http::Response::builder()
3911            .status(404)
3912            .header("x-amz-error-code", "NoSuchBucketEncryption")
3913            .body(SdkBody::from(
3914                r#"<?xml version="1.0" encoding="UTF-8"?>
3915<Error>
3916  <Code>NoSuchBucketEncryption</Code>
3917  <Message>The server-side encryption configuration was not found</Message>
3918</Error>"#,
3919            ))
3920            .expect("build missing bucket encryption delete response");
3921        let (client, _) = test_s3_client(Some(response));
3922
3923        client
3924            .delete_bucket_encryption("bucket")
3925            .await
3926            .expect("missing bucket encryption should be treated as successful delete");
3927    }
3928
3929    #[tokio::test]
3930    async fn delete_bucket_encryption_sends_expected_request_shape() {
3931        let response = http::Response::builder()
3932            .status(204)
3933            .body(SdkBody::from(""))
3934            .expect("build delete bucket encryption response");
3935        let (client, request_receiver) = test_s3_client(Some(response));
3936
3937        client
3938            .delete_bucket_encryption("bucket")
3939            .await
3940            .expect("delete bucket encryption");
3941
3942        let request = request_receiver.expect_request();
3943        assert_eq!(request.method(), http::Method::DELETE);
3944        assert!(
3945            request.uri().to_string().contains("?encryption"),
3946            "expected bucket encryption subresource in URI: {}",
3947            request.uri()
3948        );
3949    }
3950
3951    #[test]
3952    fn core_cors_rule_to_sdk_drops_empty_optional_headers() {
3953        let rule = CorsRule {
3954            id: None,
3955            allowed_origins: vec!["https://app.example.com".to_string()],
3956            allowed_methods: vec!["GET".to_string()],
3957            allowed_headers: Some(Vec::new()),
3958            expose_headers: Some(Vec::new()),
3959            max_age_seconds: None,
3960        };
3961
3962        let sdk_rule = core_cors_rule_to_sdk(&rule).expect("convert cors rule");
3963        assert!(sdk_rule.allowed_headers().is_empty());
3964        assert!(sdk_rule.expose_headers().is_empty());
3965    }
3966
3967    #[test]
3968    fn parse_cors_configuration_xml_round_trips_rule_fields() {
3969        let body = r#"
3970<CORSConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
3971  <CORSRule>
3972    <ID>mc-rule</ID>
3973    <AllowedOrigin>https://console.example.com</AllowedOrigin>
3974    <AllowedMethod>GET</AllowedMethod>
3975    <AllowedMethod>POST</AllowedMethod>
3976    <AllowedHeader>*</AllowedHeader>
3977    <ExposeHeader>ETag</ExposeHeader>
3978    <MaxAgeSeconds>1200</MaxAgeSeconds>
3979  </CORSRule>
3980</CORSConfiguration>
3981"#;
3982
3983        let rules = parse_cors_configuration_xml(body).expect("parse cors xml");
3984        assert_eq!(rules.len(), 1);
3985        assert_eq!(rules[0].id.as_deref(), Some("mc-rule"));
3986        assert_eq!(
3987            rules[0].allowed_origins,
3988            vec!["https://console.example.com".to_string()]
3989        );
3990        assert_eq!(
3991            rules[0].allowed_methods,
3992            vec!["GET".to_string(), "POST".to_string()]
3993        );
3994        assert_eq!(rules[0].allowed_headers, Some(vec!["*".to_string()]));
3995        assert_eq!(rules[0].expose_headers, Some(vec!["ETag".to_string()]));
3996        assert_eq!(rules[0].max_age_seconds, Some(1200));
3997    }
3998
3999    #[test]
4000    fn cors_url_uses_path_style_bucket_and_query() {
4001        let (client, _) = test_s3_client(None);
4002
4003        let url = client.cors_url("bucket-name").expect("build cors url");
4004
4005        assert_eq!(url.as_str(), "https://example.com/bucket-name?cors=");
4006    }
4007
4008    #[test]
4009    fn cors_url_rejects_endpoints_without_path_segments() {
4010        let (client, _) = test_s3_client_with_endpoint("mailto:test@example.com", None);
4011
4012        match client.cors_url("bucket-name") {
4013            Err(Error::Network(message)) => {
4014                assert!(message.contains("does not support path-style bucket operations"));
4015            }
4016            other => panic!("expected path-style endpoint error, got {other:?}"),
4017        }
4018    }
4019
4020    #[test]
4021    fn missing_cors_configuration_errors_are_detected() {
4022        assert!(is_missing_cors_configuration_error(
4023            "NoSuchCORSConfiguration"
4024        ));
4025        assert!(is_missing_cors_configuration_error(
4026            "The CORS configuration does not exist"
4027        ));
4028        assert!(!is_missing_cors_configuration_error("AccessDenied"));
4029    }
4030
4031    #[test]
4032    fn missing_cors_configuration_response_detects_code_and_status() {
4033        assert!(is_missing_cors_configuration_response(
4034            Some("NoSuchCORSConfiguration"),
4035            Some(404),
4036            "service error"
4037        ));
4038        assert!(is_missing_cors_configuration_response(
4039            None,
4040            Some(404),
4041            "The CORS configuration does not exist"
4042        ));
4043        assert!(!is_missing_cors_configuration_response(
4044            Some("AccessDenied"),
4045            Some(403),
4046            "access denied"
4047        ));
4048        assert!(!is_missing_cors_configuration_response(
4049            None,
4050            Some(404),
4051            "service error"
4052        ));
4053        assert!(!is_missing_cors_configuration_response(
4054            Some("NoSuchBucket"),
4055            Some(404),
4056            "NoSuchBucket"
4057        ));
4058        assert!(is_missing_cors_configuration_response(
4059            None,
4060            None,
4061            "The CORS configuration does not exist"
4062        ));
4063        assert!(!is_missing_cors_configuration_response(
4064            None,
4065            Some(500),
4066            "The CORS configuration does not exist"
4067        ));
4068    }
4069
4070    #[tokio::test]
4071    async fn get_bucket_cors_missing_configuration_returns_empty_rules() {
4072        let response = http::Response::builder()
4073            .status(404)
4074            .header("x-amz-error-code", "NoSuchCORSConfiguration")
4075            .body(SdkBody::from(
4076                r#"<?xml version="1.0" encoding="UTF-8"?>
4077<Error>
4078  <Code>NoSuchCORSConfiguration</Code>
4079  <Message>The CORS configuration does not exist</Message>
4080</Error>"#,
4081            ))
4082            .expect("build missing cors response");
4083        let (client, _) = test_s3_client(Some(response));
4084
4085        let rules = client
4086            .get_bucket_cors("bucket")
4087            .await
4088            .expect("missing cors config should be treated as empty");
4089
4090        assert!(rules.is_empty());
4091    }
4092
4093    #[tokio::test]
4094    async fn delete_bucket_cors_missing_configuration_is_successful() {
4095        let response = http::Response::builder()
4096            .status(404)
4097            .header("x-amz-error-code", "NoSuchCORSConfiguration")
4098            .body(SdkBody::from(
4099                r#"<?xml version="1.0" encoding="UTF-8"?>
4100<Error>
4101  <Code>NoSuchCORSConfiguration</Code>
4102  <Message>The CORS configuration does not exist</Message>
4103</Error>"#,
4104            ))
4105            .expect("build missing cors response");
4106        let (client, _) = test_s3_client(Some(response));
4107
4108        client
4109            .delete_bucket_cors("bucket")
4110            .await
4111            .expect("missing cors config should be treated as successful delete");
4112    }
4113
4114    #[tokio::test]
4115    async fn reqwest_connector_insecure_without_ca_bundle_succeeds() {
4116        // When insecure is true and no CA bundle is provided, the connector should be created.
4117        let connector = ReqwestConnector::new(true, None, None, None).await;
4118        assert!(
4119            connector.is_ok(),
4120            "Expected insecure connector creation to succeed"
4121        );
4122    }
4123
4124    #[tokio::test]
4125    async fn reqwest_connector_invalid_ca_bundle_path_surfaces_error() {
4126        // Use an obviously invalid path (empty string) to trigger a read error.
4127        let result = ReqwestConnector::new(false, Some(""), None, None).await;
4128        match result {
4129            Err(Error::Network(msg)) => {
4130                assert!(
4131                    msg.contains("Failed to read CA bundle"),
4132                    "Unexpected error message: {msg}"
4133                );
4134            }
4135            other => panic!("Expected Error::Network for invalid path, got: {:?}", other),
4136        }
4137    }
4138
4139    #[test]
4140    fn should_use_multipart_for_large_files() {
4141        assert!(S3Client::should_use_multipart(
4142            SINGLE_PUT_OBJECT_MAX_SIZE + 1
4143        ));
4144    }
4145
4146    #[test]
4147    fn should_use_single_part_for_small_files() {
4148        assert!(!S3Client::should_use_multipart(0));
4149        assert!(!S3Client::should_use_multipart(1024 * 1024));
4150        assert!(!S3Client::should_use_multipart(
4151            crate::multipart::DEFAULT_PART_SIZE
4152        ));
4153        assert!(!S3Client::should_use_multipart(SINGLE_PUT_OBJECT_MAX_SIZE));
4154    }
4155
4156    #[tokio::test]
4157    async fn delete_object_with_force_delete_sets_rustfs_header() {
4158        let (client, request_receiver) = test_s3_client(None);
4159        let path = RemotePath::new("test", "bucket", "key.txt");
4160
4161        let _ = client
4162            .delete_object_with_options(&path, DeleteRequestOptions { force_delete: true })
4163            .await;
4164
4165        let request = request_receiver.expect_request();
4166        assert_eq!(request.headers().get("x-rustfs-force-delete"), Some("true"));
4167    }
4168
4169    #[tokio::test]
4170    async fn custom_headers_are_added_before_sending_sdk_requests() {
4171        let (client, request_receiver) = test_s3_client_with_endpoint_and_headers(
4172            "https://example.com",
4173            None,
4174            vec![RequestHeader {
4175                name: "x-amz-bucket-encrypt-enabled".to_string(),
4176                value: "1".to_string(),
4177            }],
4178        );
4179        let path = RemotePath::new("test", "bucket", "key.txt");
4180
4181        let _ = client.delete_object(&path).await;
4182
4183        let request = request_receiver.expect_request();
4184        assert_eq!(
4185            request.headers().get("x-amz-bucket-encrypt-enabled"),
4186            Some("1")
4187        );
4188        assert!(
4189            request
4190                .headers()
4191                .get("authorization")
4192                .expect("authorization header")
4193                .contains("x-amz-bucket-encrypt-enabled")
4194        );
4195    }
4196
4197    #[tokio::test]
4198    async fn custom_headers_are_not_required_by_presigned_urls() {
4199        let (client, _request_receiver) = test_s3_client_with_endpoint_and_headers(
4200            "https://example.com",
4201            None,
4202            vec![RequestHeader {
4203                name: "x-amz-bucket-encrypt-enabled".to_string(),
4204                value: "1".to_string(),
4205            }],
4206        );
4207        let path = RemotePath::new("test", "bucket", "key.txt");
4208
4209        let url = client
4210            .presign_get(&path, 3600)
4211            .await
4212            .expect("presign get should succeed");
4213
4214        assert!(!url.contains("x-amz-bucket-encrypt-enabled"));
4215    }
4216
4217    #[tokio::test]
4218    async fn custom_headers_are_added_to_xml_requests_before_signing() {
4219        let (endpoint, request_receiver, server_handle) = start_xml_test_server();
4220        let (client, _sdk_request_receiver) = test_s3_client_with_endpoint_and_headers(
4221            &endpoint,
4222            None,
4223            vec![RequestHeader {
4224                name: "x-amz-bucket-encrypt-enabled".to_string(),
4225                value: "1".to_string(),
4226            }],
4227        );
4228        let url = client
4229            .replication_url("bucket")
4230            .expect("replication URL should build");
4231
4232        let response = client
4233            .xml_request(
4234                Method::PUT,
4235                url,
4236                Some("application/xml"),
4237                Some(b"<xml/>".to_vec()),
4238            )
4239            .await
4240            .expect("xml request should succeed");
4241
4242        assert_eq!(response, "ok");
4243        let request = request_receiver
4244            .recv_timeout(Duration::from_secs(5))
4245            .expect("server should capture XML request");
4246        assert_eq!(request.method, "PUT");
4247        assert_eq!(request.target, "/bucket?replication=");
4248        assert_eq!(
4249            header_value(&request.headers, "x-amz-bucket-encrypt-enabled"),
4250            Some("1")
4251        );
4252        assert!(
4253            header_value(&request.headers, "authorization")
4254                .expect("authorization header")
4255                .contains("x-amz-bucket-encrypt-enabled")
4256        );
4257        server_handle.join().expect("server thread should finish");
4258    }
4259
4260    #[tokio::test]
4261    async fn delete_object_without_force_delete_omits_rustfs_header() {
4262        let (client, request_receiver) = test_s3_client(None);
4263        let path = RemotePath::new("test", "bucket", "key.txt");
4264
4265        let _ = client
4266            .delete_object_with_options(&path, DeleteRequestOptions::default())
4267            .await;
4268
4269        let request = request_receiver.expect_request();
4270        assert!(request.headers().get("x-rustfs-force-delete").is_none());
4271    }
4272
4273    #[tokio::test]
4274    async fn put_object_applies_sse_s3_headers() {
4275        let (client, request_receiver) = test_s3_client(None);
4276        let path = RemotePath::new("test", "bucket", "file.txt");
4277
4278        client
4279            .put_object(
4280                &path,
4281                b"payload".to_vec(),
4282                Some("text/plain"),
4283                Some(&ObjectEncryptionRequest::SseS3),
4284            )
4285            .await
4286            .expect("put object");
4287
4288        let request = request_receiver.expect_request();
4289        assert_eq!(
4290            request.headers().get("x-amz-server-side-encryption"),
4291            Some("AES256")
4292        );
4293    }
4294
4295    #[tokio::test]
4296    async fn copy_object_applies_sse_kms_headers() {
4297        let response = http::Response::builder()
4298            .status(500)
4299            .header("x-amz-error-code", "InternalError")
4300            .body(SdkBody::from(
4301                r#"<?xml version="1.0" encoding="UTF-8"?>
4302<Error>
4303  <Code>InternalError</Code>
4304  <Message>Something went wrong.</Message>
4305</Error>"#,
4306            ))
4307            .expect("build copy object response");
4308        let (client, request_receiver) = test_s3_client(Some(response));
4309        let src = RemotePath::new("test", "bucket", "src.txt");
4310        let dst = RemotePath::new("test", "bucket", "dst.txt");
4311
4312        let _ = client
4313            .copy_object(
4314                &src,
4315                &dst,
4316                Some(&ObjectEncryptionRequest::SseKms {
4317                    key_id: "kms-key".to_string(),
4318                }),
4319            )
4320            .await;
4321
4322        let request = request_receiver.expect_request();
4323        assert_eq!(
4324            request.headers().get("x-amz-server-side-encryption"),
4325            Some("aws:kms")
4326        );
4327        assert_eq!(
4328            request
4329                .headers()
4330                .get("x-amz-server-side-encryption-aws-kms-key-id"),
4331            Some("kms-key")
4332        );
4333    }
4334
4335    #[tokio::test]
4336    async fn delete_object_wrapper_uses_default_options_without_rustfs_header() {
4337        let (client, request_receiver) = test_s3_client(None);
4338        let path = RemotePath::new("test", "bucket", "key.txt");
4339
4340        let _ = ObjectStore::delete_object(&client, &path).await;
4341
4342        let request = request_receiver.expect_request();
4343        assert!(request.headers().get("x-rustfs-force-delete").is_none());
4344    }
4345
4346    #[tokio::test]
4347    async fn delete_object_with_options_maps_missing_keys_to_not_found() {
4348        let response = http::Response::builder()
4349            .status(404)
4350            .header("x-amz-error-code", "NoSuchKey")
4351            .body(SdkBody::from(
4352                r#"<?xml version="1.0" encoding="UTF-8"?>
4353<Error>
4354  <Code>NoSuchKey</Code>
4355  <Message>The specified key does not exist.</Message>
4356</Error>"#,
4357            ))
4358            .expect("build delete object response");
4359        let (client, _request_receiver) = test_s3_client(Some(response));
4360        let path = RemotePath::new("test", "bucket", "missing.txt");
4361
4362        let result = client
4363            .delete_object_with_options(&path, DeleteRequestOptions::default())
4364            .await;
4365
4366        match result {
4367            Err(Error::NotFound(message)) => assert_eq!(message, path.to_string()),
4368            other => panic!("Expected NotFound for missing key, got: {other:?}"),
4369        }
4370    }
4371
4372    #[tokio::test]
4373    async fn delete_object_with_options_maps_other_failures_to_network() {
4374        let response = http::Response::builder()
4375            .status(500)
4376            .header("x-amz-error-code", "InternalError")
4377            .body(SdkBody::from(
4378                r#"<?xml version="1.0" encoding="UTF-8"?>
4379<Error>
4380  <Code>InternalError</Code>
4381  <Message>Something went wrong.</Message>
4382</Error>"#,
4383            ))
4384            .expect("build delete object response");
4385        let (client, _request_receiver) = test_s3_client(Some(response));
4386        let path = RemotePath::new("test", "bucket", "key.txt");
4387
4388        let result = client
4389            .delete_object_with_options(&path, DeleteRequestOptions::default())
4390            .await;
4391
4392        match result {
4393            Err(Error::Network(message)) => assert!(message.contains("InternalError")),
4394            other => panic!("Expected Network for delete failure, got: {other:?}"),
4395        }
4396    }
4397
4398    #[tokio::test]
4399    async fn list_object_versions_page_preserves_markers_and_delete_markers() {
4400        let response = http::Response::builder()
4401            .status(200)
4402            .body(SdkBody::from(
4403                r#"<?xml version="1.0" encoding="UTF-8"?>
4404<ListVersionsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
4405  <Name>bucket</Name>
4406  <Prefix>logs/</Prefix>
4407  <KeyMarker></KeyMarker>
4408  <VersionIdMarker></VersionIdMarker>
4409  <NextKeyMarker>logs/c.txt</NextKeyMarker>
4410  <NextVersionIdMarker>v3</NextVersionIdMarker>
4411  <MaxKeys>25</MaxKeys>
4412  <IsTruncated>true</IsTruncated>
4413  <Version>
4414    <Key>logs/a.txt</Key>
4415    <VersionId>v1</VersionId>
4416    <IsLatest>true</IsLatest>
4417    <LastModified>2026-04-29T11:22:33.000Z</LastModified>
4418    <ETag>"etag-a"</ETag>
4419    <Size>12</Size>
4420    <StorageClass>STANDARD</StorageClass>
4421  </Version>
4422  <DeleteMarker>
4423    <Key>logs/b.txt</Key>
4424    <VersionId>v2</VersionId>
4425    <IsLatest>false</IsLatest>
4426    <LastModified>2026-04-28T10:20:30.000Z</LastModified>
4427    <Owner>
4428      <ID>owner</ID>
4429    </Owner>
4430  </DeleteMarker>
4431</ListVersionsResult>"#,
4432            ))
4433            .expect("build list object versions response");
4434        let (client, request_receiver) = test_s3_client(Some(response));
4435        let path = RemotePath::new("test", "bucket", "logs/");
4436
4437        let result = client
4438            .list_object_versions_page(&path, Some(25))
4439            .await
4440            .expect("list object versions page");
4441
4442        let request = request_receiver.expect_request();
4443        let uri = request.uri().to_string();
4444        assert!(
4445            uri.starts_with("https://example.com/bucket/?"),
4446            "unexpected URI: {uri}"
4447        );
4448        assert!(
4449            uri.contains("versions"),
4450            "expected versions subresource: {uri}"
4451        );
4452        assert!(
4453            uri.contains("prefix=logs%2F"),
4454            "expected prefix query: {uri}"
4455        );
4456        assert!(
4457            uri.contains("max-keys=25"),
4458            "expected max-keys query: {uri}"
4459        );
4460
4461        assert!(result.truncated);
4462        assert_eq!(result.continuation_token.as_deref(), Some("logs/c.txt"));
4463        assert_eq!(result.version_id_marker.as_deref(), Some("v3"));
4464        assert_eq!(result.items.len(), 2);
4465
4466        let version = &result.items[0];
4467        assert_eq!(version.key, "logs/a.txt");
4468        assert_eq!(version.version_id, "v1");
4469        assert!(version.is_latest);
4470        assert!(!version.is_delete_marker);
4471        assert_eq!(version.size_bytes, Some(12));
4472        assert_eq!(version.etag.as_deref(), Some("etag-a"));
4473
4474        let delete_marker = &result.items[1];
4475        assert_eq!(delete_marker.key, "logs/b.txt");
4476        assert_eq!(delete_marker.version_id, "v2");
4477        assert!(!delete_marker.is_latest);
4478        assert!(delete_marker.is_delete_marker);
4479        assert_eq!(delete_marker.size_bytes, None);
4480        assert_eq!(delete_marker.etag, None);
4481    }
4482
4483    #[tokio::test]
4484    async fn list_object_versions_page_maps_missing_bucket_to_not_found() {
4485        let response = http::Response::builder()
4486            .status(404)
4487            .header("x-amz-error-code", "NoSuchBucket")
4488            .body(SdkBody::from(
4489                r#"<?xml version="1.0" encoding="UTF-8"?>
4490<Error>
4491  <Code>NoSuchBucket</Code>
4492  <Message>The specified bucket does not exist.</Message>
4493</Error>"#,
4494            ))
4495            .expect("build missing bucket response");
4496        let (client, _request_receiver) = test_s3_client(Some(response));
4497        let path = RemotePath::new("test", "missing-bucket", "");
4498
4499        let result = client.list_object_versions_page(&path, Some(1000)).await;
4500
4501        match result {
4502            Err(Error::NotFound(message)) => {
4503                assert_eq!(message, "Bucket not found: missing-bucket")
4504            }
4505            other => panic!("Expected NotFound for missing bucket, got: {other:?}"),
4506        }
4507    }
4508
4509    #[tokio::test]
4510    async fn list_object_versions_page_maps_not_found_code_to_not_found() {
4511        let response = http::Response::builder()
4512            .status(404)
4513            .header("x-amz-error-code", "NotFound")
4514            .body(SdkBody::from(
4515                r#"<?xml version="1.0" encoding="UTF-8"?>
4516<Error>
4517  <Code>NotFound</Code>
4518  <Message>The specified bucket does not exist.</Message>
4519</Error>"#,
4520            ))
4521            .expect("build not found bucket response");
4522        let (client, _request_receiver) = test_s3_client(Some(response));
4523        let path = RemotePath::new("test", "missing-bucket", "");
4524
4525        let result = client.list_object_versions_page(&path, Some(1000)).await;
4526
4527        match result {
4528            Err(Error::NotFound(message)) => {
4529                assert_eq!(message, "Bucket not found: missing-bucket")
4530            }
4531            other => panic!("Expected NotFound for NotFound list versions error, got: {other:?}"),
4532        }
4533    }
4534
4535    #[tokio::test]
4536    async fn list_object_versions_page_maps_other_failures_to_network() {
4537        let response = http::Response::builder()
4538            .status(500)
4539            .header("x-amz-error-code", "InternalError")
4540            .body(SdkBody::from(
4541                r#"<?xml version="1.0" encoding="UTF-8"?>
4542<Error>
4543  <Code>InternalError</Code>
4544  <Message>Something went wrong.</Message>
4545</Error>"#,
4546            ))
4547            .expect("build internal error response");
4548        let (client, _request_receiver) = test_s3_client(Some(response));
4549        let path = RemotePath::new("test", "bucket", "");
4550
4551        let result = client.list_object_versions_page(&path, Some(1000)).await;
4552
4553        match result {
4554            Err(Error::Network(message)) => assert!(message.contains("InternalError")),
4555            other => panic!("Expected Network for list versions failure, got: {other:?}"),
4556        }
4557    }
4558
4559    #[tokio::test]
4560    async fn list_buckets_preserves_service_error_code() {
4561        let response = http::Response::builder()
4562            .status(403)
4563            .header("x-amz-error-code", "InvalidAccessKeyId")
4564            .body(SdkBody::from(
4565                r#"<?xml version="1.0" encoding="UTF-8"?>
4566<Error>
4567  <Code>InvalidAccessKeyId</Code>
4568  <Message>The AWS access key Id you provided does not exist in our records.</Message>
4569</Error>"#,
4570            ))
4571            .expect("build list buckets response");
4572        let (client, _request_receiver) = test_s3_client(Some(response));
4573
4574        let result = client.list_buckets().await;
4575
4576        match result {
4577            Err(Error::Network(message)) => assert!(message.contains("InvalidAccessKeyId")),
4578            other => panic!("Expected Network for list buckets failure, got: {other:?}"),
4579        }
4580    }
4581
4582    #[tokio::test]
4583    async fn create_bucket_preserves_service_error_code() {
4584        let response = http::Response::builder()
4585            .status(403)
4586            .header("x-amz-error-code", "InvalidAccessKeyId")
4587            .body(SdkBody::from(
4588                r#"<?xml version="1.0" encoding="UTF-8"?>
4589<Error>
4590  <Code>InvalidAccessKeyId</Code>
4591  <Message>The AWS access key Id you provided does not exist in our records.</Message>
4592</Error>"#,
4593            ))
4594            .expect("build create bucket response");
4595        let (client, _request_receiver) = test_s3_client(Some(response));
4596
4597        let result = client.create_bucket("bucket").await;
4598
4599        match result {
4600            Err(Error::Network(message)) => assert!(message.contains("InvalidAccessKeyId")),
4601            other => panic!("Expected Network for create bucket failure, got: {other:?}"),
4602        }
4603    }
4604
4605    #[tokio::test]
4606    async fn delete_bucket_maps_bucket_not_empty_to_conflict() {
4607        let response = http::Response::builder()
4608            .status(409)
4609            .header("x-amz-error-code", "BucketNotEmpty")
4610            .body(SdkBody::from(
4611                r#"<?xml version="1.0" encoding="UTF-8"?>
4612<Error>
4613  <Code>BucketNotEmpty</Code>
4614  <Message>The bucket you tried to delete is not empty.</Message>
4615</Error>"#,
4616            ))
4617            .expect("build delete bucket response");
4618        let (client, _request_receiver) = test_s3_client(Some(response));
4619
4620        let result = client.delete_bucket("bucket").await;
4621
4622        match result {
4623            Err(Error::Conflict(message)) => assert!(message.contains("BucketNotEmpty")),
4624            other => panic!("Expected Conflict for non-empty bucket, got: {other:?}"),
4625        }
4626    }
4627
4628    #[tokio::test]
4629    async fn delete_bucket_maps_missing_bucket_to_not_found() {
4630        let response = http::Response::builder()
4631            .status(404)
4632            .header("x-amz-error-code", "NoSuchBucket")
4633            .body(SdkBody::from(
4634                r#"<?xml version="1.0" encoding="UTF-8"?>
4635<Error>
4636  <Code>NoSuchBucket</Code>
4637  <Message>The specified bucket does not exist.</Message>
4638</Error>"#,
4639            ))
4640            .expect("build missing bucket response");
4641        let (client, _request_receiver) = test_s3_client(Some(response));
4642
4643        let result = client.delete_bucket("missing-bucket").await;
4644
4645        match result {
4646            Err(Error::NotFound(message)) => {
4647                assert_eq!(message, "Bucket not found: missing-bucket")
4648            }
4649            other => panic!("Expected NotFound for missing bucket, got: {other:?}"),
4650        }
4651    }
4652
4653    #[tokio::test]
4654    async fn delete_bucket_maps_not_found_code_to_not_found() {
4655        let response = http::Response::builder()
4656            .status(404)
4657            .header("x-amz-error-code", "NotFound")
4658            .body(SdkBody::from(
4659                r#"<?xml version="1.0" encoding="UTF-8"?>
4660<Error>
4661  <Code>NotFound</Code>
4662  <Message>The specified bucket does not exist.</Message>
4663</Error>"#,
4664            ))
4665            .expect("build not found bucket response");
4666        let (client, _request_receiver) = test_s3_client(Some(response));
4667
4668        let result = client.delete_bucket("missing-bucket").await;
4669
4670        match result {
4671            Err(Error::NotFound(message)) => {
4672                assert_eq!(message, "Bucket not found: missing-bucket")
4673            }
4674            other => panic!("Expected NotFound for NotFound delete bucket error, got: {other:?}"),
4675        }
4676    }
4677
4678    #[tokio::test]
4679    async fn delete_bucket_maps_other_failures_to_network() {
4680        let response = http::Response::builder()
4681            .status(500)
4682            .header("x-amz-error-code", "InternalError")
4683            .body(SdkBody::from(
4684                r#"<?xml version="1.0" encoding="UTF-8"?>
4685<Error>
4686  <Code>InternalError</Code>
4687  <Message>Something went wrong.</Message>
4688</Error>"#,
4689            ))
4690            .expect("build delete bucket response");
4691        let (client, _request_receiver) = test_s3_client(Some(response));
4692
4693        let result = client.delete_bucket("bucket").await;
4694
4695        match result {
4696            Err(Error::Network(message)) => assert!(message.contains("InternalError")),
4697            other => panic!("Expected Network for delete bucket failure, got: {other:?}"),
4698        }
4699    }
4700
4701    #[tokio::test]
4702    async fn delete_objects_with_force_delete_sets_rustfs_header() {
4703        let response = http::Response::builder()
4704            .status(200)
4705            .body(SdkBody::from(
4706                r#"<?xml version="1.0" encoding="UTF-8"?>
4707<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />"#,
4708            ))
4709            .expect("build delete objects response");
4710        let (client, request_receiver) = test_s3_client(Some(response));
4711
4712        let _ = client
4713            .delete_objects_with_options(
4714                "bucket",
4715                vec!["key.txt".to_string()],
4716                DeleteRequestOptions { force_delete: true },
4717            )
4718            .await;
4719
4720        let request = request_receiver.expect_request();
4721        assert_eq!(request.headers().get("x-rustfs-force-delete"), Some("true"));
4722    }
4723
4724    #[tokio::test]
4725    async fn delete_objects_without_force_delete_omits_rustfs_header() {
4726        let response = http::Response::builder()
4727            .status(200)
4728            .body(SdkBody::from(
4729                r#"<?xml version="1.0" encoding="UTF-8"?>
4730<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />"#,
4731            ))
4732            .expect("build delete objects response");
4733        let (client, request_receiver) = test_s3_client(Some(response));
4734
4735        let _ = client
4736            .delete_objects_with_options(
4737                "bucket",
4738                vec!["key.txt".to_string()],
4739                DeleteRequestOptions::default(),
4740            )
4741            .await;
4742
4743        let request = request_receiver.expect_request();
4744        assert!(request.headers().get("x-rustfs-force-delete").is_none());
4745    }
4746
4747    #[tokio::test]
4748    async fn delete_objects_wrapper_uses_default_options_without_rustfs_header() {
4749        let response = http::Response::builder()
4750            .status(200)
4751            .body(SdkBody::from(
4752                r#"<?xml version="1.0" encoding="UTF-8"?>
4753<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />"#,
4754            ))
4755            .expect("build delete objects response");
4756        let (client, request_receiver) = test_s3_client(Some(response));
4757
4758        let _ = ObjectStore::delete_objects(&client, "bucket", vec!["key.txt".to_string()]).await;
4759
4760        let request = request_receiver.expect_request();
4761        assert!(request.headers().get("x-rustfs-force-delete").is_none());
4762    }
4763
4764    #[tokio::test]
4765    async fn delete_objects_with_empty_keys_skips_http_request() {
4766        let (client, request_receiver) = test_s3_client(None);
4767
4768        let deleted = client
4769            .delete_objects_with_options("bucket", Vec::new(), DeleteRequestOptions::default())
4770            .await
4771            .expect("empty delete should succeed");
4772
4773        assert!(deleted.is_empty());
4774        request_receiver.expect_no_request();
4775    }
4776
4777    #[tokio::test]
4778    async fn delete_objects_with_partial_errors_returns_deleted_keys() {
4779        let response = http::Response::builder()
4780            .status(200)
4781            .body(SdkBody::from(
4782                r#"<?xml version="1.0" encoding="UTF-8"?>
4783<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
4784  <Deleted>
4785    <Key>kept.txt</Key>
4786  </Deleted>
4787  <Error>
4788    <Key>failed.txt</Key>
4789    <Code>AccessDenied</Code>
4790    <Message>Access Denied</Message>
4791  </Error>
4792</DeleteResult>"#,
4793            ))
4794            .expect("build partial delete response");
4795        let (client, request_receiver) = test_s3_client(Some(response));
4796
4797        let deleted = client
4798            .delete_objects_with_options(
4799                "bucket",
4800                vec!["kept.txt".to_string(), "failed.txt".to_string()],
4801                DeleteRequestOptions::default(),
4802            )
4803            .await
4804            .expect("partial delete should still return deleted keys");
4805
4806        let request = request_receiver.expect_request();
4807        assert_eq!(request.uri(), "https://example.com/bucket/?delete");
4808        assert_eq!(deleted, vec!["kept.txt".to_string()]);
4809    }
4810
4811    #[tokio::test]
4812    async fn read_next_part_fills_buffer_until_eof() {
4813        use tokio::io::AsyncWriteExt;
4814
4815        let temp_dir = tempfile::tempdir().expect("create temp dir");
4816        let file_path = temp_dir.path().join("payload.bin");
4817        let mut writer = tokio::fs::File::create(&file_path)
4818            .await
4819            .expect("create temp file");
4820        writer
4821            .write_all(b"abcdefghij")
4822            .await
4823            .expect("write temp file");
4824        writer.flush().await.expect("flush temp file");
4825        drop(writer);
4826
4827        let mut reader = tokio::fs::File::open(&file_path)
4828            .await
4829            .expect("open temp file");
4830        let mut buffer = vec![0u8; 8];
4831
4832        let first = S3Client::read_next_part(&mut reader, &file_path, &mut buffer)
4833            .await
4834            .expect("first read");
4835        assert_eq!(first, 8);
4836        assert_eq!(&buffer[..first], b"abcdefgh");
4837
4838        let second = S3Client::read_next_part(&mut reader, &file_path, &mut buffer)
4839            .await
4840            .expect("second read");
4841        assert_eq!(second, 2);
4842        assert_eq!(&buffer[..second], b"ij");
4843
4844        let third = S3Client::read_next_part(&mut reader, &file_path, &mut buffer)
4845            .await
4846            .expect("third read");
4847        assert_eq!(third, 0);
4848    }
4849}