Skip to main content

rc_s3/
client.rs

1//! S3 client implementation
2//!
3//! Wraps aws-sdk-s3 and implements the ObjectStore trait from rc-core.
4
5use async_trait::async_trait;
6use aws_credential_types::Credentials;
7use aws_sdk_s3::error::ProvideErrorMetadata;
8use aws_sigv4::http_request::{
9    SignableBody, SignableRequest, SignatureLocation, SigningSettings, sign,
10};
11use aws_sigv4::sign::v4;
12use aws_smithy_runtime_api::box_error::BoxError;
13use aws_smithy_runtime_api::client::http::{
14    HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpConnector,
15};
16use aws_smithy_runtime_api::client::interceptors::Intercept;
17use aws_smithy_runtime_api::client::interceptors::context::BeforeTransmitInterceptorContextMut;
18use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
19use aws_smithy_runtime_api::client::result::ConnectorError;
20use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
21use aws_smithy_runtime_api::http::{Response, StatusCode};
22use aws_smithy_types::body::SdkBody;
23use aws_smithy_types::config_bag::ConfigBag;
24use bytes::Bytes;
25use jiff::Timestamp;
26use quick_xml::de::from_str as from_xml_str;
27use rc_core::{
28    Alias, BucketNotification, Capabilities, CorsRule, Error, LifecycleRule, ListOptions,
29    ListResult, NotificationTarget, ObjectInfo, ObjectStore, ObjectVersion,
30    ObjectVersionListResult, RemotePath, ReplicationConfiguration, RequestHeader, Result,
31    SelectOptions, global_request_headers,
32};
33use reqwest::Method;
34use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
35use serde::Deserialize;
36use sha2::{Digest, Sha256};
37use std::collections::HashMap;
38use tokio::io::AsyncReadExt;
39use tokio::io::AsyncWrite;
40
/// Keep single-part uploads small to avoid backend incompatibilities with
/// streaming aws-chunked payloads.
const SINGLE_PUT_OBJECT_MAX_SIZE: u64 = crate::multipart::DEFAULT_PART_SIZE;
/// Service identifier for S3.
// NOTE(review): presumably the SigV4 signing service name — confirm at the use sites.
const S3_SERVICE_NAME: &str = "s3";
/// XML namespace written on the root `<ReplicationConfiguration>` element of generated
/// replication configuration documents.
const S3_REPLICATION_XML_NAMESPACE: &str = "http://s3.amazonaws.com/doc/2006-03-01/";
/// Name of the RustFS-specific request header for force deletion.
// NOTE(review): presumably attached when `DeleteRequestOptions::force_delete` is set — confirm.
const RUSTFS_FORCE_DELETE_HEADER: &str = "x-rustfs-force-delete";
47
/// Coarse classification of a bucket-policy failure.
// NOTE(review): the classification logic lives outside this view; variant meanings below are
// inferred from their names and should be confirmed against the classifier.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BucketPolicyErrorKind {
    /// Presumably: the bucket exists but has no policy attached.
    MissingPolicy,
    /// Presumably: the bucket itself does not exist.
    MissingBucket,
    /// Any failure that is neither of the above.
    Other,
}
54
/// Custom HTTP connector using reqwest, supporting insecure TLS (skip cert verification)
/// and custom CA bundles. Used when `alias.insecure = true` or `alias.ca_bundle.is_some()`.
#[derive(Debug, Clone)]
struct ReqwestConnector {
    /// Preconfigured reqwest client that executes every request routed through this connector.
    client: reqwest::Client,
}
61
62impl ReqwestConnector {
63    async fn new(insecure: bool, ca_bundle: Option<&str>) -> Result<Self> {
64        let client = build_reqwest_client(insecure, ca_bundle).await?;
65        Ok(Self { client })
66    }
67}
68
69async fn build_reqwest_client(insecure: bool, ca_bundle: Option<&str>) -> Result<reqwest::Client> {
70    // NOTE: When `insecure = true`, `danger_accept_invalid_certs` disables all TLS
71    // certificate verification. Any CA bundle provided will still be added to the
72    // trust store but is rendered ineffective for this connection.
73    let mut builder = reqwest::Client::builder().danger_accept_invalid_certs(insecure);
74
75    if let Some(bundle_path) = ca_bundle {
76        // Use tokio::fs::read to avoid blocking the async runtime thread.
77        let pem = tokio::fs::read(bundle_path).await.map_err(|e| {
78            Error::Network(format!("Failed to read CA bundle '{bundle_path}': {e}"))
79        })?;
80        let cert = reqwest::Certificate::from_pem(&pem)
81            .map_err(|e| Error::Network(format!("Invalid CA bundle '{bundle_path}': {e}")))?;
82        builder = builder.add_root_certificate(cert);
83    }
84
85    let client = builder
86        .build()
87        .map_err(|e| Error::Network(format!("Failed to build HTTP client: {e}")))?;
88    Ok(client)
89}
90
91fn force_path_style_for_alias(alias: &Alias) -> bool {
92    match alias.bucket_lookup.as_str() {
93        "path" => true,
94        "dns" => false,
95        "auto" => !is_aliyun_oss_service_endpoint(&alias.endpoint),
96        _ => true,
97    }
98}
99
100fn is_aliyun_oss_service_endpoint(endpoint: &str) -> bool {
101    let Ok(url) = reqwest::Url::parse(endpoint.trim_end_matches('/')) else {
102        return false;
103    };
104
105    let Some(host) = url.host_str() else {
106        return false;
107    };
108
109    host.strip_suffix(".aliyuncs.com")
110        .and_then(|host| host.split('.').next())
111        .is_some_and(|first_label| first_label.starts_with("oss-"))
112}
113
/// Serde model of a replication configuration XML document.
///
/// Field names map to PascalCase XML elements unless renamed explicitly.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct ReplicationConfigurationXml {
    /// Content of the `Role` element, if present.
    role: Option<String>,
    /// All `Rule` elements; empty when the document has none.
    #[serde(rename = "Rule", default)]
    rules: Vec<ReplicationRuleXml>,
}
121
/// Serde model of a single `Rule` element inside a replication configuration.
///
/// Every field is optional so that partially specified rules still deserialize.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct ReplicationRuleXml {
    /// Content of the `ID` element.
    #[serde(rename = "ID")]
    id: Option<String>,
    /// Content of the `Priority` element.
    priority: Option<i32>,
    /// Content of the rule-level `Status` element.
    status: Option<String>,
    /// Rule-level `Prefix` element (outside any `Filter`); the parser in this file uses it
    /// only when the filter yields no prefix.
    #[serde(rename = "Prefix")]
    legacy_prefix: Option<String>,
    /// The `Filter` element.
    filter: Option<ReplicationFilterXml>,
    /// The `Destination` element.
    destination: Option<ReplicationDestinationXml>,
    /// The `DeleteMarkerReplication` element.
    delete_marker_replication: Option<ReplicationStatusXml>,
    /// The `ExistingObjectReplication` element.
    existing_object_replication: Option<ReplicationStatusXml>,
    /// The `DeleteReplication` element.
    delete_replication: Option<ReplicationStatusXml>,
}
137
/// Serde model of a replication rule's `Filter` element.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct ReplicationFilterXml {
    /// Direct `Prefix` child of the filter.
    prefix: Option<String>,
    /// Direct single `Tag` child of the filter.
    tag: Option<TagXml>,
    /// `And` child combining a prefix and/or several tags.
    and: Option<ReplicationAndXml>,
}
145
/// Serde model of the `And` element nested inside a replication filter.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct ReplicationAndXml {
    /// `Prefix` child of the `And` element.
    prefix: Option<String>,
    /// All `Tag` children; empty when none are present.
    #[serde(rename = "Tag", default)]
    tags: Vec<TagXml>,
}
153
/// Serde model of a `Tag` element; both parts are optional so incomplete tags deserialize
/// and can be skipped by the tag parsers in this file.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct TagXml {
    /// Content of the `Key` element.
    key: Option<String>,
    /// Content of the `Value` element.
    value: Option<String>,
}
160
/// Serde model of a replication rule's `Destination` element.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct ReplicationDestinationXml {
    /// Content of the `Bucket` element; the parser in this file stores it as the bucket ARN.
    bucket: Option<String>,
    /// Content of the `StorageClass` element.
    storage_class: Option<String>,
}
167
/// Serde model of any element that wraps a single `Status` child
/// (used for the delete-marker, existing-object and delete replication switches).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct ReplicationStatusXml {
    /// Content of the `Status` child element.
    status: Option<String>,
}
173
/// Serde model of a bucket CORS configuration XML document.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct CorsConfigurationXml {
    /// All `CORSRule` elements; empty when the document has none.
    #[serde(rename = "CORSRule", default)]
    rules: Vec<CorsRuleXml>,
}
180
/// Serde model of a single `CORSRule` element.
///
/// Repeated child elements are collected into vectors that default to empty.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct CorsRuleXml {
    /// Content of the `ID` element.
    #[serde(rename = "ID")]
    id: Option<String>,
    /// Every `AllowedOrigin` element.
    #[serde(rename = "AllowedOrigin", default)]
    allowed_origins: Vec<String>,
    /// Every `AllowedMethod` element.
    #[serde(rename = "AllowedMethod", default)]
    allowed_methods: Vec<String>,
    /// Every `AllowedHeader` element.
    #[serde(rename = "AllowedHeader", default)]
    allowed_headers: Vec<String>,
    /// Every `ExposeHeader` element.
    #[serde(rename = "ExposeHeader", default)]
    expose_headers: Vec<String>,
    /// Content of the `MaxAgeSeconds` element.
    max_age_seconds: Option<i32>,
}
196
197fn parse_replication_status(status: Option<&ReplicationStatusXml>) -> Option<bool> {
198    status
199        .and_then(|value| value.status.as_deref())
200        .map(|value| value.eq_ignore_ascii_case("enabled"))
201}
202
203fn parse_replication_rule_status(status: Option<&str>) -> rc_core::ReplicationRuleStatus {
204    match status {
205        Some(value) if value.eq_ignore_ascii_case("enabled") => {
206            rc_core::ReplicationRuleStatus::Enabled
207        }
208        _ => rc_core::ReplicationRuleStatus::Disabled,
209    }
210}
211
/// Gathers `(key, value)` pairs into an owned map.
///
/// Later pairs overwrite earlier pairs with the same key. Returns `None` when no pair was
/// supplied, so callers can distinguish "no tags" from a populated map.
fn collect_tag_map<'a, I>(tags: I) -> Option<HashMap<String, String>>
where
    I: IntoIterator<Item = (&'a str, &'a str)>,
{
    let mut map = HashMap::new();
    for (key, value) in tags {
        map.insert(key.to_string(), value.to_string());
    }
    (!map.is_empty()).then_some(map)
}
226
227fn parse_tag_xml(tag: Option<&TagXml>) -> Option<HashMap<String, String>> {
228    collect_tag_map(tag.and_then(|tag| Some((tag.key.as_deref()?, tag.value.as_deref()?))))
229}
230
231fn parse_tag_xmls(tags: &[TagXml]) -> Option<HashMap<String, String>> {
232    collect_tag_map(
233        tags.iter()
234            .filter_map(|tag| Some((tag.key.as_deref()?, tag.value.as_deref()?))),
235    )
236}
237
238fn parse_replication_filter_prefix(filter: Option<&ReplicationFilterXml>) -> Option<String> {
239    filter
240        .and_then(|filter| filter.prefix.clone())
241        .or_else(|| filter.and_then(|filter| filter.and.as_ref()?.prefix.clone()))
242}
243
244fn parse_replication_filter_tags(
245    filter: Option<&ReplicationFilterXml>,
246) -> Option<HashMap<String, String>> {
247    filter
248        .and_then(|filter| parse_tag_xml(filter.tag.as_ref()))
249        .or_else(|| filter.and_then(|filter| parse_tag_xmls(&filter.and.as_ref()?.tags)))
250}
251
/// Returns the map's entries as borrowed pairs in ascending `(key, value)` order,
/// giving callers a deterministic iteration order.
fn sorted_tags(tags: &HashMap<String, String>) -> Vec<(&str, &str)> {
    let mut pairs = Vec::with_capacity(tags.len());
    for (key, value) in tags {
        pairs.push((key.as_str(), value.as_str()));
    }
    pairs.sort_unstable();
    pairs
}
260
261fn append_tag_xml(xml: &mut String, key: &str, value: &str) {
262    xml.push_str("<Tag><Key>");
263    xml.push_str(&xml_escape(key));
264    xml.push_str("</Key><Value>");
265    xml.push_str(&xml_escape(value));
266    xml.push_str("</Value></Tag>");
267}
268
269fn append_replication_filter_xml(
270    xml: &mut String,
271    prefix: Option<&str>,
272    tags: Option<&HashMap<String, String>>,
273) {
274    let Some(tags) = tags.filter(|tags| !tags.is_empty()) else {
275        if let Some(prefix) = prefix {
276            xml.push_str("<Filter><Prefix>");
277            xml.push_str(&xml_escape(prefix));
278            xml.push_str("</Prefix></Filter>");
279        }
280        return;
281    };
282
283    xml.push_str("<Filter>");
284    if prefix.is_some() || tags.len() > 1 {
285        xml.push_str("<And>");
286        if let Some(prefix) = prefix {
287            xml.push_str("<Prefix>");
288            xml.push_str(&xml_escape(prefix));
289            xml.push_str("</Prefix>");
290        }
291        for (key, value) in sorted_tags(tags) {
292            append_tag_xml(xml, key, value);
293        }
294        xml.push_str("</And>");
295    } else if let Some((key, value)) = sorted_tags(tags).into_iter().next() {
296        append_tag_xml(xml, key, value);
297    }
298    xml.push_str("</Filter>");
299}
300
/// Collapses an empty list into `None`; non-empty lists and `None` pass through unchanged.
fn normalize_optional_strings(values: Option<Vec<String>>) -> Option<Vec<String>> {
    match values {
        Some(items) if !items.is_empty() => Some(items),
        _ => None,
    }
}
304
/// Reports whether `error_text` describes a missing bucket CORS configuration.
///
/// The match is case-insensitive (ASCII) and looks for either the
/// `NoSuchCORSConfiguration` error code or the phrase
/// "cors configuration does not exist" anywhere in the text. The longer phrase
/// "the cors configuration does not exist" contains the shorter one, so it needs no
/// separate check.
fn is_missing_cors_configuration_error(error_text: &str) -> bool {
    let normalized = error_text.to_ascii_lowercase();
    normalized.contains("nosuchcorsconfiguration")
        || normalized.contains("cors configuration does not exist")
}
311
312fn is_missing_cors_configuration_response(
313    error_code: Option<&str>,
314    status_code: Option<u16>,
315    error_text: &str,
316) -> bool {
317    let error_code = error_code.map(|code| code.to_ascii_lowercase());
318    if matches!(error_code.as_deref(), Some("nosuchcorsconfiguration")) {
319        return true;
320    }
321
322    if !is_missing_cors_configuration_error(error_text) {
323        return false;
324    }
325
326    status_code.is_none_or(|status| status == 404)
327}
328
329fn sdk_cors_rule_to_core(rule: &aws_sdk_s3::types::CorsRule) -> CorsRule {
330    CorsRule {
331        id: rule.id().map(str::to_string),
332        allowed_origins: rule.allowed_origins().to_vec(),
333        allowed_methods: rule.allowed_methods().to_vec(),
334        allowed_headers: normalize_optional_strings(Some(rule.allowed_headers().to_vec())),
335        expose_headers: normalize_optional_strings(Some(rule.expose_headers().to_vec())),
336        max_age_seconds: rule.max_age_seconds(),
337    }
338}
339
340fn core_cors_rule_to_sdk(rule: &CorsRule) -> Result<aws_sdk_s3::types::CorsRule> {
341    aws_sdk_s3::types::CorsRule::builder()
342        .set_id(rule.id.clone())
343        .set_allowed_origins(Some(rule.allowed_origins.clone()))
344        .set_allowed_methods(Some(
345            rule.allowed_methods
346                .iter()
347                .map(|method| method.to_ascii_uppercase())
348                .collect(),
349        ))
350        .set_allowed_headers(normalize_optional_strings(rule.allowed_headers.clone()))
351        .set_expose_headers(normalize_optional_strings(rule.expose_headers.clone()))
352        .set_max_age_seconds(rule.max_age_seconds)
353        .build()
354        .map_err(|e| Error::General(format!("build bucket cors rule: {e}")))
355}
356
357fn parse_cors_configuration_xml(body: &str) -> Result<Vec<CorsRule>> {
358    let config: CorsConfigurationXml =
359        from_xml_str(body).map_err(|e| Error::General(format!("parse bucket cors xml: {e}")))?;
360
361    Ok(config
362        .rules
363        .into_iter()
364        .map(|rule| CorsRule {
365            id: rule.id,
366            allowed_origins: rule.allowed_origins,
367            allowed_methods: rule.allowed_methods,
368            allowed_headers: normalize_optional_strings(Some(rule.allowed_headers)),
369            expose_headers: normalize_optional_strings(Some(rule.expose_headers)),
370            max_age_seconds: rule.max_age_seconds,
371        })
372        .collect())
373}
374
375fn parse_replication_configuration_xml(body: &str) -> Result<ReplicationConfiguration> {
376    let config: ReplicationConfigurationXml = from_xml_str(body)
377        .map_err(|e| Error::General(format!("parse replication config xml: {e}")))?;
378
379    let rules = config
380        .rules
381        .into_iter()
382        .map(|rule| rc_core::ReplicationRule {
383            id: rule.id.unwrap_or_default(),
384            priority: rule.priority.unwrap_or_default(),
385            status: parse_replication_rule_status(rule.status.as_deref()),
386            prefix: parse_replication_filter_prefix(rule.filter.as_ref()).or(rule.legacy_prefix),
387            tags: parse_replication_filter_tags(rule.filter.as_ref()),
388            destination: rc_core::ReplicationDestination {
389                bucket_arn: rule
390                    .destination
391                    .as_ref()
392                    .and_then(|destination| destination.bucket.clone())
393                    .unwrap_or_default(),
394                storage_class: rule
395                    .destination
396                    .and_then(|destination| destination.storage_class),
397            },
398            delete_marker_replication: parse_replication_status(
399                rule.delete_marker_replication.as_ref(),
400            ),
401            existing_object_replication: parse_replication_status(
402                rule.existing_object_replication.as_ref(),
403            ),
404            delete_replication: parse_replication_status(rule.delete_replication.as_ref()),
405        })
406        .collect();
407
408    Ok(ReplicationConfiguration {
409        role: config.role.unwrap_or_default(),
410        rules,
411    })
412}
413
/// Escapes the five XML special characters (`&`, `<`, `>`, `"`, `'`) in `value`.
///
/// All other characters are copied unchanged. Existing entities are not recognised, so
/// `&amp;` becomes `&amp;amp;`.
fn xml_escape(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            '\'' => escaped.push_str("&apos;"),
            other => escaped.push(other),
        }
    }
    escaped
}
422
/// Appends `<tag><Status>Enabled|Disabled</Status></tag>` to `xml`.
///
/// Nothing is written when `enabled` is `None`. `tag` is written verbatim (not escaped).
fn append_replication_status_tag(xml: &mut String, tag: &str, enabled: Option<bool>) {
    let Some(enabled) = enabled else {
        return;
    };
    let status = match enabled {
        true => "Enabled",
        false => "Disabled",
    };
    for piece in ["<", tag, "><Status>", status, "</Status></", tag, ">"] {
        xml.push_str(piece);
    }
}
435
436fn build_replication_configuration_xml(config: &ReplicationConfiguration) -> String {
437    let mut xml = String::from(r#"<?xml version="1.0" encoding="UTF-8"?>"#);
438    xml.push_str(r#"<ReplicationConfiguration xmlns=""#);
439    xml.push_str(S3_REPLICATION_XML_NAMESPACE);
440    xml.push_str(r#"">"#);
441
442    if !config.role.is_empty() {
443        xml.push_str("<Role>");
444        xml.push_str(&xml_escape(&config.role));
445        xml.push_str("</Role>");
446    }
447
448    for rule in &config.rules {
449        xml.push_str("<Rule>");
450
451        xml.push_str("<Status>");
452        xml.push_str(match rule.status {
453            rc_core::ReplicationRuleStatus::Enabled => "Enabled",
454            rc_core::ReplicationRuleStatus::Disabled => "Disabled",
455        });
456        xml.push_str("</Status>");
457
458        xml.push_str("<Destination><Bucket>");
459        xml.push_str(&xml_escape(&rule.destination.bucket_arn));
460        xml.push_str("</Bucket>");
461        if let Some(storage_class) = &rule.destination.storage_class {
462            xml.push_str("<StorageClass>");
463            xml.push_str(&xml_escape(storage_class));
464            xml.push_str("</StorageClass>");
465        }
466        xml.push_str("</Destination>");
467
468        if !rule.id.is_empty() {
469            xml.push_str("<ID>");
470            xml.push_str(&xml_escape(&rule.id));
471            xml.push_str("</ID>");
472        }
473
474        xml.push_str("<Priority>");
475        xml.push_str(&rule.priority.to_string());
476        xml.push_str("</Priority>");
477
478        append_replication_filter_xml(&mut xml, rule.prefix.as_deref(), rule.tags.as_ref());
479
480        append_replication_status_tag(
481            &mut xml,
482            "ExistingObjectReplication",
483            rule.existing_object_replication,
484        );
485        append_replication_status_tag(
486            &mut xml,
487            "DeleteMarkerReplication",
488            rule.delete_marker_replication,
489        );
490        append_replication_status_tag(&mut xml, "DeleteReplication", rule.delete_replication);
491
492        xml.push_str("</Rule>");
493    }
494
495    xml.push_str("</ReplicationConfiguration>");
496    xml
497}
498
499fn parse_lifecycle_filter_prefix(
500    filter: Option<&aws_sdk_s3::types::LifecycleRuleFilter>,
501) -> Option<String> {
502    filter
503        .and_then(|filter| filter.prefix().map(str::to_string))
504        .or_else(|| filter.and_then(|filter| filter.and()?.prefix().map(str::to_string)))
505}
506
507fn parse_lifecycle_filter_tags(
508    filter: Option<&aws_sdk_s3::types::LifecycleRuleFilter>,
509) -> Option<HashMap<String, String>> {
510    filter
511        .and_then(|filter| collect_tag_map(filter.tag().map(|tag| (tag.key(), tag.value()))))
512        .or_else(|| {
513            filter.and_then(|filter| {
514                collect_tag_map(
515                    filter
516                        .and()?
517                        .tags()
518                        .iter()
519                        .map(|tag| (tag.key(), tag.value())),
520                )
521            })
522        })
523}
524
525fn build_s3_tag(key: &str, value: &str) -> Result<aws_sdk_s3::types::Tag> {
526    aws_sdk_s3::types::Tag::builder()
527        .key(key)
528        .value(value)
529        .build()
530        .map_err(|error| Error::General(format!("build filter tag: {error}")))
531}
532
533fn build_lifecycle_rule_filter(
534    prefix: Option<&str>,
535    tags: Option<&HashMap<String, String>>,
536) -> Result<Option<aws_sdk_s3::types::LifecycleRuleFilter>> {
537    let Some(tags) = tags.filter(|tags| !tags.is_empty()) else {
538        return Ok(prefix.map(|prefix| {
539            aws_sdk_s3::types::LifecycleRuleFilter::builder()
540                .prefix(prefix)
541                .build()
542        }));
543    };
544
545    let tag_values = sorted_tags(tags)
546        .into_iter()
547        .map(|(key, value)| build_s3_tag(key, value))
548        .collect::<Result<Vec<_>>>()?;
549
550    let filter = if prefix.is_some() || tag_values.len() > 1 {
551        let mut and_builder = aws_sdk_s3::types::LifecycleRuleAndOperator::builder();
552        if let Some(prefix) = prefix {
553            and_builder = and_builder.prefix(prefix);
554        }
555        for tag in tag_values {
556            and_builder = and_builder.tags(tag);
557        }
558        aws_sdk_s3::types::LifecycleRuleFilter::builder()
559            .and(and_builder.build())
560            .build()
561    } else {
562        aws_sdk_s3::types::LifecycleRuleFilter::builder()
563            .tag(
564                tag_values
565                    .into_iter()
566                    .next()
567                    .expect("non-empty tags required to build lifecycle filter"),
568            )
569            .build()
570    };
571
572    Ok(Some(filter))
573}
574
/// Bridges SDK HTTP requests onto the preconfigured reqwest client.
///
/// Both the request body and the response body are handled as fully buffered bytes.
impl HttpConnector for ReqwestConnector {
    /// Sends `request` through reqwest and converts the reply into an SDK response.
    ///
    /// The returned future fails with a `ConnectorError` when the request body is not
    /// available in memory, when the method or URL cannot be converted, when the transfer
    /// fails, or when the response status cannot be represented.
    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
        let client = self.client.clone();
        HttpConnectorFuture::new(async move {
            // Extract request parts before consuming the request
            let uri = request.uri().to_string();
            let method_str = request.method().to_string();
            let headers = request.headers().clone();

            // Try to get the body as buffered in-memory bytes.
            // For streaming bodies (e.g., large file uploads), bytes() returns None and we
            // return a clear error rather than silently sending an empty body, which would
            // cause signature mismatches or server-side failures.
            let body_bytes = match request.body().bytes() {
                Some(b) => Bytes::copy_from_slice(b),
                None => {
                    return Err(ConnectorError::user(
                        "Streaming request bodies are not supported in insecure/ca_bundle TLS mode; \
                         use in-memory data for uploads with this connector"
                            .into(),
                    ));
                }
            };

            // Build reqwest method
            let method = reqwest::Method::from_bytes(method_str.as_bytes())
                .map_err(|e| ConnectorError::user(Box::new(e)))?;

            // Build reqwest URL
            let url = reqwest::Url::parse(&uri).map_err(|e| ConnectorError::user(Box::new(e)))?;

            // Build reqwest request
            let mut req = reqwest::Request::new(method, url);

            // Copy headers; S3 headers are all ASCII so failures here are unexpected
            for (name, value) in headers.iter() {
                match (
                    reqwest::header::HeaderName::from_bytes(name.as_bytes()),
                    reqwest::header::HeaderValue::from_bytes(value.as_bytes()),
                ) {
                    (Ok(header_name), Ok(header_value)) => {
                        // `append` keeps repeated header names instead of overwriting them.
                        req.headers_mut().append(header_name, header_value);
                    }
                    _ => {
                        // The header is dropped (not fatal); only a warning is logged.
                        tracing::warn!("Skipping non-convertible request header: {}", name);
                    }
                }
            }

            // Set body
            *req.body_mut() = Some(reqwest::Body::from(body_bytes));

            // Execute
            let resp = client
                .execute(req)
                .await
                .map_err(|e| ConnectorError::io(Box::new(e)))?;

            // Convert response
            let status = StatusCode::try_from(resp.status().as_u16())
                .map_err(|e| ConnectorError::other(Box::new(e), None))?;
            // Headers are cloned first because `bytes()` below consumes the response.
            let resp_headers = resp.headers().clone();
            let body = resp
                .bytes()
                .await
                .map_err(|e| ConnectorError::io(Box::new(e)))?;

            let mut sdk_response = Response::new(status, SdkBody::from(body));
            for (name, value) in &resp_headers {
                match value.to_str() {
                    Ok(value_str) => {
                        sdk_response
                            .headers_mut()
                            .append(name.as_str().to_owned(), value_str.to_owned());
                    }
                    Err(_) => {
                        // Header values that cannot be read as text are dropped with a warning.
                        tracing::warn!("Skipping non-UTF8 response header: {}", name.as_str());
                    }
                }
            }

            Ok(sdk_response)
        })
    }
}
660
661impl HttpClient for ReqwestConnector {
662    fn http_connector(
663        &self,
664        _settings: &HttpConnectorSettings,
665        _components: &RuntimeComponents,
666    ) -> SharedHttpConnector {
667        // NOTE: `ReqwestConnector` is preconfigured (e.g., insecure/CA-bundle options) when it
668        // is constructed, and does not currently apply `HttpConnectorSettings`. This means
669        // behavior in this mode may differ from the default connector w.r.t. SDK HTTP settings.
670        // If alignment is required, map relevant fields from `HttpConnectorSettings` onto the
671        // internal `reqwest::Client` when constructing the connector.
672        SharedHttpConnector::new(self.clone())
673    }
674}
675
/// S3 client wrapper
///
/// Bundles the SDK clients, a raw HTTP client, and the alias they were built from.
pub struct S3Client {
    /// Primary SDK client; carries the custom-header interceptor when request headers exist.
    inner: aws_sdk_s3::Client,
    /// SDK client built from the same configuration but without the custom-header interceptor.
    // NOTE(review): presumably used for presigned URLs, as the name suggests — confirm at use sites.
    presign_inner: aws_sdk_s3::Client,
    /// reqwest client built with the alias's `insecure` / `ca_bundle` TLS options.
    // NOTE(review): presumably used for hand-built XML requests — confirm at use sites.
    xml_http_client: reqwest::Client,
    /// Alias configuration this client was created from.
    alias: Alias,
    /// Request headers obtained from `global_request_headers()` at construction time.
    request_headers: Vec<RequestHeader>,
}
684
/// Request-level options for delete operations.
///
/// The derived `Default` leaves every option disabled.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct DeleteRequestOptions {
    /// Ask RustFS to permanently delete data instead of creating delete markers.
    pub force_delete: bool,
}
691
/// SDK interceptor that inserts a fixed set of headers into each request before signing.
#[derive(Debug, Clone)]
struct CustomHeaderInterceptor {
    /// Headers to insert, applied in order.
    headers: Vec<RequestHeader>,
}
696
697impl Intercept for CustomHeaderInterceptor {
698    fn name(&self) -> &'static str {
699        "CustomHeaderInterceptor"
700    }
701
702    fn modify_before_signing(
703        &self,
704        context: &mut BeforeTransmitInterceptorContextMut<'_>,
705        _runtime_components: &RuntimeComponents,
706        _cfg: &mut ConfigBag,
707    ) -> std::result::Result<(), BoxError> {
708        let request = context.request_mut();
709        for header in &self.headers {
710            request
711                .headers_mut()
712                .try_insert(header.name.clone(), header.value.clone())
713                .map_err(|error| Box::new(error) as BoxError)?;
714        }
715        Ok(())
716    }
717}
718
719impl S3Client {
720    /// Create a new S3 client from an alias configuration
721    pub async fn new(alias: Alias) -> Result<Self> {
722        let endpoint = alias.endpoint.clone();
723        let region = alias.region.clone();
724        let access_key = alias.access_key.clone();
725        let secret_key = alias.secret_key.clone();
726
727        // Build credentials provider
728        let credentials = aws_credential_types::Credentials::new(
729            access_key,
730            secret_key,
731            None, // session token
732            None, // expiry
733            "rc-static-credentials",
734        );
735
736        // Build SDK config loader
737        let mut config_loader = aws_config::defaults(aws_config::BehaviorVersion::latest())
738            .credentials_provider(credentials)
739            .region(aws_config::Region::new(region))
740            .endpoint_url(&endpoint);
741
742        // When insecure mode is enabled or a custom CA bundle is provided, use the reqwest
743        // connector which supports danger_accept_invalid_certs and custom root certificates.
744        if alias.insecure || alias.ca_bundle.is_some() {
745            let connector =
746                ReqwestConnector::new(alias.insecure, alias.ca_bundle.as_deref()).await?;
747            config_loader = config_loader.http_client(connector);
748        }
749
750        let xml_http_client =
751            build_reqwest_client(alias.insecure, alias.ca_bundle.as_deref()).await?;
752        let config = config_loader.load().await;
753
754        // Build S3 client with path-style addressing for compatibility
755        let request_headers = global_request_headers();
756        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&config)
757            .force_path_style(force_path_style_for_alias(&alias))
758            // Improve compatibility with S3-compatible backends by only sending request
759            // checksums when the operation explicitly requires them.
760            .request_checksum_calculation(
761                aws_sdk_s3::config::RequestChecksumCalculation::WhenRequired,
762            )
763            .response_checksum_validation(
764                aws_sdk_s3::config::ResponseChecksumValidation::WhenRequired,
765            );
766
767        let presign_s3_config = s3_config_builder.clone().build();
768
769        if !request_headers.is_empty() {
770            s3_config_builder = s3_config_builder.interceptor(CustomHeaderInterceptor {
771                headers: request_headers.clone(),
772            });
773        }
774
775        let s3_config = s3_config_builder.build();
776
777        let client = aws_sdk_s3::Client::from_conf(s3_config);
778        let presign_client = aws_sdk_s3::Client::from_conf(presign_s3_config);
779
780        Ok(Self {
781            inner: client,
782            presign_inner: presign_client,
783            xml_http_client,
784            alias,
785            request_headers,
786        })
787    }
788
    /// Get the underlying aws-sdk-s3 client
    ///
    /// Returns a shared reference to the primary SDK client (the `inner` field), not the
    /// separate `presign_inner` client.
    pub fn inner(&self) -> &aws_sdk_s3::Client {
        &self.inner
    }
793
794    /// List a single page of object versions and return pagination metadata.
795    pub async fn list_object_versions_page(
796        &self,
797        path: &RemotePath,
798        max_keys: Option<i32>,
799    ) -> Result<ObjectVersionListResult> {
800        let mut builder = self.inner.list_object_versions().bucket(&path.bucket);
801
802        if !path.key.is_empty() {
803            builder = builder.prefix(&path.key);
804        }
805
806        if let Some(max) = max_keys {
807            builder = builder.max_keys(max);
808        }
809
810        let response = builder.send().await.map_err(|e| {
811            let err_str = Self::format_sdk_error(&e);
812            if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
813                Error::NotFound(format!("Bucket not found: {}", path.bucket))
814            } else {
815                Error::Network(err_str)
816            }
817        })?;
818
819        let mut items = Vec::new();
820
821        for v in response.versions() {
822            items.push(ObjectVersion {
823                key: v.key().unwrap_or_default().to_string(),
824                version_id: v.version_id().unwrap_or("null").to_string(),
825                is_latest: v.is_latest().unwrap_or(false),
826                is_delete_marker: false,
827                last_modified: v
828                    .last_modified()
829                    .and_then(|dt| Timestamp::from_second(dt.secs()).ok()),
830                size_bytes: v.size(),
831                etag: v.e_tag().map(|s| s.trim_matches('"').to_string()),
832            });
833        }
834
835        for m in response.delete_markers() {
836            items.push(ObjectVersion {
837                key: m.key().unwrap_or_default().to_string(),
838                version_id: m.version_id().unwrap_or("null").to_string(),
839                is_latest: m.is_latest().unwrap_or(false),
840                is_delete_marker: true,
841                last_modified: m
842                    .last_modified()
843                    .and_then(|dt| Timestamp::from_second(dt.secs()).ok()),
844                size_bytes: None,
845                etag: None,
846            });
847        }
848
849        items.sort_by(|a, b| {
850            a.key
851                .cmp(&b.key)
852                .then_with(|| b.last_modified.cmp(&a.last_modified))
853        });
854
855        Ok(ObjectVersionListResult {
856            items,
857            truncated: response.is_truncated().unwrap_or(false),
858            continuation_token: response.next_key_marker().map(ToString::to_string),
859            version_id_marker: response.next_version_id_marker().map(ToString::to_string),
860        })
861    }
862
863    /// Download object content and report downloaded bytes after each received chunk.
864    pub async fn get_object_with_progress(
865        &self,
866        path: &RemotePath,
867        mut on_progress: impl FnMut(u64, Option<u64>) + Send,
868    ) -> Result<Vec<u8>> {
869        let response = self
870            .inner
871            .get_object()
872            .bucket(&path.bucket)
873            .key(&path.key)
874            .send()
875            .await
876            .map_err(|e| {
877                let err_str = e.to_string();
878                if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
879                    Error::NotFound(path.to_string())
880                } else {
881                    Error::Network(err_str)
882                }
883            })?;
884
885        let content_length = response
886            .content_length()
887            .and_then(|length| u64::try_from(length).ok());
888        let mut data = Vec::with_capacity(
889            content_length
890                .and_then(|length| usize::try_from(length).ok())
891                .unwrap_or_default(),
892        );
893        let mut body = response.body;
894        let mut bytes_downloaded = 0u64;
895
896        while let Some(chunk) = body
897            .try_next()
898            .await
899            .map_err(|e| Error::Network(e.to_string()))?
900        {
901            bytes_downloaded += chunk.len() as u64;
902            data.extend_from_slice(&chunk);
903            on_progress(bytes_downloaded, content_length);
904        }
905
906        Ok(data)
907    }
908
909    /// Delete an object with RustFS-specific request options.
910    pub async fn delete_object_with_options(
911        &self,
912        path: &RemotePath,
913        options: DeleteRequestOptions,
914    ) -> Result<()> {
915        let mut request = self
916            .inner
917            .delete_object()
918            .bucket(&path.bucket)
919            .key(&path.key)
920            .customize();
921
922        if options.force_delete {
923            request = request.mutate_request(|request| {
924                request
925                    .headers_mut()
926                    .insert(RUSTFS_FORCE_DELETE_HEADER, "true");
927            });
928        }
929
930        request.send().await.map_err(|e| {
931            let err_str = Self::format_sdk_error(&e);
932            let is_missing_key = if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &e
933            {
934                let code = service_err.err().code().or_else(|| {
935                    service_err
936                        .raw()
937                        .headers()
938                        .get("x-amz-error-code")
939                        .and_then(|value| std::str::from_utf8(value.as_bytes()).ok())
940                });
941                matches!(code, Some("NoSuchKey") | Some("NotFound"))
942                    || service_err.raw().status().as_u16() == 404
943            } else {
944                err_str.contains("NotFound") || err_str.contains("NoSuchKey")
945            };
946
947            if is_missing_key {
948                Error::NotFound(path.to_string())
949            } else {
950                Error::Network(err_str)
951            }
952        })?;
953
954        Ok(())
955    }
956
957    /// Delete multiple objects with RustFS-specific request options.
958    pub async fn delete_objects_with_options(
959        &self,
960        bucket: &str,
961        keys: Vec<String>,
962        options: DeleteRequestOptions,
963    ) -> Result<Vec<String>> {
964        use aws_sdk_s3::types::{Delete, ObjectIdentifier};
965
966        if keys.is_empty() {
967            return Ok(vec![]);
968        }
969
970        let objects: Vec<ObjectIdentifier> =
971            keys.iter()
972                .map(|key| {
973                    ObjectIdentifier::builder().key(key).build().map_err(|e| {
974                        Error::General(format!("invalid delete object identifier: {e}"))
975                    })
976                })
977                .collect::<Result<Vec<_>>>()?;
978
979        let delete = Delete::builder()
980            .set_objects(Some(objects))
981            .build()
982            .map_err(|e| Error::General(e.to_string()))?;
983
984        let mut request = self
985            .inner
986            .delete_objects()
987            .bucket(bucket)
988            .delete(delete)
989            .customize();
990
991        if options.force_delete {
992            request = request.mutate_request(|request| {
993                request
994                    .headers_mut()
995                    .insert(RUSTFS_FORCE_DELETE_HEADER, "true");
996            });
997        }
998
999        let response = request
1000            .send()
1001            .await
1002            .map_err(|e| Error::Network(e.to_string()))?;
1003
1004        let deleted: Vec<String> = response
1005            .deleted()
1006            .iter()
1007            .filter_map(|d| d.key().map(|k| k.to_string()))
1008            .collect();
1009
1010        if !response.errors().is_empty() {
1011            let error_keys: Vec<String> = response
1012                .errors()
1013                .iter()
1014                .filter_map(|e| e.key().map(|k| k.to_string()))
1015                .collect();
1016            tracing::warn!("Failed to delete some objects: {:?}", error_keys);
1017        }
1018
1019        Ok(deleted)
1020    }
1021
1022    /// Format AWS SDK error into a detailed error message
1023    fn format_sdk_error<E: std::fmt::Display>(error: &aws_sdk_s3::error::SdkError<E>) -> String {
1024        match error {
1025            aws_sdk_s3::error::SdkError::ServiceError(service_err) => {
1026                let err = service_err.err();
1027                let meta = service_err.raw();
1028                let mut msg = format!("Service error: {}", err);
1029                // Try to extract additional error information from headers
1030                if let Some(code) = meta.headers().get("x-amz-error-code")
1031                    && let Ok(code_str) = std::str::from_utf8(code.as_bytes())
1032                {
1033                    msg.push_str(&format!(" (code: {})", code_str));
1034                }
1035                msg
1036            }
1037            aws_sdk_s3::error::SdkError::ConstructionFailure(err) => {
1038                format!("Request construction failed: {:?}", err)
1039            }
1040            aws_sdk_s3::error::SdkError::TimeoutError(_) => "Request timeout".to_string(),
1041            aws_sdk_s3::error::SdkError::DispatchFailure(err) => {
1042                format!("Network dispatch error: {:?}", err)
1043            }
1044            aws_sdk_s3::error::SdkError::ResponseError(err) => {
1045                format!("Response error: {:?}", err)
1046            }
1047            _ => error.to_string(),
1048        }
1049    }
1050
1051    fn should_use_multipart(file_size: u64) -> bool {
1052        file_size > SINGLE_PUT_OBJECT_MAX_SIZE
1053    }
1054
1055    fn sha256_hash(body: &[u8]) -> String {
1056        let mut hasher = Sha256::new();
1057        hasher.update(body);
1058        hex::encode(hasher.finalize())
1059    }
1060
1061    fn request_host(&self, url: &reqwest::Url) -> Result<String> {
1062        let host = url
1063            .host_str()
1064            .ok_or_else(|| Error::Network("Missing host in request URL".to_string()))?;
1065        Ok(match url.port() {
1066            Some(port) => format!("{host}:{port}"),
1067            None => host.to_string(),
1068        })
1069    }
1070
1071    fn replication_url(&self, bucket: &str) -> Result<reqwest::Url> {
1072        let mut url =
1073            reqwest::Url::parse(self.alias.endpoint.trim_end_matches('/')).map_err(|e| {
1074                Error::Network(format!("Invalid endpoint '{}': {e}", self.alias.endpoint))
1075            })?;
1076
1077        {
1078            let mut segments = url.path_segments_mut().map_err(|_| {
1079                Error::Network(format!(
1080                    "Endpoint '{}' does not support path-style bucket operations",
1081                    self.alias.endpoint
1082                ))
1083            })?;
1084            segments.pop_if_empty();
1085            segments.push(bucket);
1086        }
1087
1088        url.set_query(Some("replication="));
1089        Ok(url)
1090    }
1091
1092    fn cors_url(&self, bucket: &str) -> Result<reqwest::Url> {
1093        let mut url =
1094            reqwest::Url::parse(self.alias.endpoint.trim_end_matches('/')).map_err(|e| {
1095                Error::Network(format!("Invalid endpoint '{}': {e}", self.alias.endpoint))
1096            })?;
1097
1098        {
1099            let mut segments = url.path_segments_mut().map_err(|_| {
1100                Error::Network(format!(
1101                    "Endpoint '{}' does not support path-style bucket operations",
1102                    self.alias.endpoint
1103                ))
1104            })?;
1105            segments.pop_if_empty();
1106            segments.push(bucket);
1107        }
1108
1109        url.set_query(Some("cors="));
1110        Ok(url)
1111    }
1112
1113    async fn sign_xml_request(
1114        &self,
1115        method: &Method,
1116        url: &str,
1117        headers: &HeaderMap,
1118        body: &[u8],
1119    ) -> Result<HeaderMap> {
1120        let credentials = Credentials::new(
1121            &self.alias.access_key,
1122            &self.alias.secret_key,
1123            None,
1124            None,
1125            "s3-xml-client",
1126        );
1127
1128        let identity = credentials.into();
1129        let mut signing_settings = SigningSettings::default();
1130        signing_settings.signature_location = SignatureLocation::Headers;
1131
1132        let signing_params = v4::SigningParams::builder()
1133            .identity(&identity)
1134            .region(&self.alias.region)
1135            .name(S3_SERVICE_NAME)
1136            .time(std::time::SystemTime::now())
1137            .settings(signing_settings)
1138            .build()
1139            .map_err(|e| Error::Auth(format!("Failed to build signing params: {e}")))?;
1140
1141        let header_pairs: Vec<(&str, &str)> = headers
1142            .iter()
1143            .filter_map(|(k, v)| v.to_str().ok().map(|v| (k.as_str(), v)))
1144            .collect();
1145
1146        let signable_request = SignableRequest::new(
1147            method.as_str(),
1148            url,
1149            header_pairs.into_iter(),
1150            SignableBody::Bytes(body),
1151        )
1152        .map_err(|e| Error::Auth(format!("Failed to create signable request: {e}")))?;
1153
1154        let (signing_instructions, _) = sign(signable_request, &signing_params.into())
1155            .map_err(|e| Error::Auth(format!("Failed to sign request: {e}")))?
1156            .into_parts();
1157
1158        let mut signed_headers = headers.clone();
1159        for (name, value) in signing_instructions.headers() {
1160            let header_name = HeaderName::try_from(name.to_string())
1161                .map_err(|e| Error::Auth(format!("Invalid header name: {e}")))?;
1162            let header_value = HeaderValue::try_from(value.to_string())
1163                .map_err(|e| Error::Auth(format!("Invalid header value: {e}")))?;
1164            signed_headers.insert(header_name, header_value);
1165        }
1166
1167        Ok(signed_headers)
1168    }
1169
1170    async fn xml_request(
1171        &self,
1172        method: Method,
1173        url: reqwest::Url,
1174        content_type: Option<&str>,
1175        body: Option<Vec<u8>>,
1176    ) -> Result<String> {
1177        let body = body.unwrap_or_default();
1178        let mut headers = HeaderMap::new();
1179        headers.insert(
1180            "x-amz-content-sha256",
1181            HeaderValue::from_str(&Self::sha256_hash(&body))
1182                .map_err(|e| Error::Auth(format!("Invalid content hash header: {e}")))?,
1183        );
1184        headers.insert(
1185            "host",
1186            HeaderValue::from_str(&self.request_host(&url)?)
1187                .map_err(|e| Error::Auth(format!("Invalid host header: {e}")))?,
1188        );
1189
1190        if let Some(content_type) = content_type {
1191            headers.insert(
1192                CONTENT_TYPE,
1193                HeaderValue::from_str(content_type)
1194                    .map_err(|e| Error::Auth(format!("Invalid content type header: {e}")))?,
1195            );
1196        }
1197
1198        for header in &self.request_headers {
1199            let name = HeaderName::from_bytes(header.name.as_bytes())
1200                .map_err(|e| Error::Auth(format!("Invalid custom header name: {e}")))?;
1201            let value = HeaderValue::from_str(&header.value)
1202                .map_err(|e| Error::Auth(format!("Invalid custom header value: {e}")))?;
1203            headers.insert(name, value);
1204        }
1205
1206        let signed_headers = self
1207            .sign_xml_request(&method, url.as_str(), &headers, &body)
1208            .await?;
1209
1210        let mut request_builder = self.xml_http_client.request(method, url);
1211        for (name, value) in &signed_headers {
1212            request_builder = request_builder.header(name, value);
1213        }
1214        if !body.is_empty() {
1215            request_builder = request_builder.body(body);
1216        }
1217
1218        let response = request_builder
1219            .send()
1220            .await
1221            .map_err(|e| Error::Network(format!("Request failed: {e}")))?;
1222
1223        let status = response.status();
1224        let text = response
1225            .text()
1226            .await
1227            .map_err(|e| Error::Network(format!("Failed to read response: {e}")))?;
1228
1229        if !status.is_success() {
1230            return Err(Error::Network(format!(
1231                "HTTP {}: {}",
1232                status.as_u16(),
1233                text
1234            )));
1235        }
1236
1237        Ok(text)
1238    }
1239
1240    fn bucket_policy_error_kind(
1241        error_code: Option<&str>,
1242        status_code: Option<u16>,
1243        error_text: &str,
1244    ) -> BucketPolicyErrorKind {
1245        let error_code = error_code.map(|code| code.to_ascii_lowercase());
1246        if matches!(
1247            error_code.as_deref(),
1248            Some("nosuchbucketpolicy") | Some("nosuchpolicy")
1249        ) {
1250            return BucketPolicyErrorKind::MissingPolicy;
1251        }
1252        if matches!(error_code.as_deref(), Some("nosuchbucket")) {
1253            return BucketPolicyErrorKind::MissingBucket;
1254        }
1255
1256        let error_text = error_text.to_ascii_lowercase();
1257        if error_text.contains("nosuchbucketpolicy") || error_text.contains("nosuchpolicy") {
1258            return BucketPolicyErrorKind::MissingPolicy;
1259        }
1260        if error_text.contains("nosuchbucket") {
1261            return BucketPolicyErrorKind::MissingBucket;
1262        }
1263        if status_code == Some(404) {
1264            return BucketPolicyErrorKind::MissingPolicy;
1265        }
1266
1267        BucketPolicyErrorKind::Other
1268    }
1269
1270    fn map_get_bucket_policy_error(
1271        bucket: &str,
1272        kind: BucketPolicyErrorKind,
1273        error_text: &str,
1274    ) -> Result<Option<String>> {
1275        match kind {
1276            BucketPolicyErrorKind::MissingPolicy => Ok(None),
1277            BucketPolicyErrorKind::MissingBucket => {
1278                Err(Error::NotFound(format!("Bucket not found: {bucket}")))
1279            }
1280            BucketPolicyErrorKind::Other => {
1281                Err(Error::Network(format!("get_bucket_policy: {error_text}")))
1282            }
1283        }
1284    }
1285
1286    fn map_delete_bucket_policy_error(
1287        bucket: &str,
1288        kind: BucketPolicyErrorKind,
1289        error_text: &str,
1290    ) -> Result<()> {
1291        match kind {
1292            BucketPolicyErrorKind::MissingPolicy => Ok(()),
1293            BucketPolicyErrorKind::MissingBucket => {
1294                Err(Error::NotFound(format!("Bucket not found: {bucket}")))
1295            }
1296            BucketPolicyErrorKind::Other => Err(Error::General(format!(
1297                "delete_bucket_policy: {error_text}"
1298            ))),
1299        }
1300    }
1301
1302    fn extract_notification_filter(
1303        filter: Option<&aws_sdk_s3::types::NotificationConfigurationFilter>,
1304    ) -> (Option<String>, Option<String>) {
1305        let mut prefix = None;
1306        let mut suffix = None;
1307
1308        if let Some(key_filter) = filter.and_then(|value| value.key()) {
1309            for rule in key_filter.filter_rules() {
1310                match rule.name().map(|name| name.as_str()) {
1311                    Some("prefix") => {
1312                        prefix = rule.value().map(ToString::to_string);
1313                    }
1314                    Some("suffix") => {
1315                        suffix = rule.value().map(ToString::to_string);
1316                    }
1317                    _ => {}
1318                }
1319            }
1320        }
1321
1322        (prefix, suffix)
1323    }
1324
1325    fn build_notification_filter(
1326        prefix: Option<&str>,
1327        suffix: Option<&str>,
1328    ) -> Option<aws_sdk_s3::types::NotificationConfigurationFilter> {
1329        use aws_sdk_s3::types::{FilterRule, FilterRuleName, NotificationConfigurationFilter};
1330
1331        let mut rules = Vec::new();
1332        if let Some(value) = prefix {
1333            let rule = FilterRule::builder()
1334                .name(FilterRuleName::Prefix)
1335                .value(value)
1336                .build();
1337            rules.push(rule);
1338        }
1339        if let Some(value) = suffix {
1340            let rule = FilterRule::builder()
1341                .name(FilterRuleName::Suffix)
1342                .value(value)
1343                .build();
1344            rules.push(rule);
1345        }
1346        if rules.is_empty() {
1347            return None;
1348        }
1349
1350        let key_filter = aws_sdk_s3::types::S3KeyFilter::builder()
1351            .set_filter_rules(Some(rules))
1352            .build();
1353        NotificationConfigurationFilter::builder()
1354            .key(key_filter)
1355            .build()
1356            .into()
1357    }
1358
1359    fn event_list_to_strings(events: &[aws_sdk_s3::types::Event]) -> Vec<String> {
1360        events
1361            .iter()
1362            .map(|event| event.as_str().to_string())
1363            .collect()
1364    }
1365
1366    fn strings_to_event_list(events: &[String]) -> Vec<aws_sdk_s3::types::Event> {
1367        events
1368            .iter()
1369            .map(|event| aws_sdk_s3::types::Event::from(event.as_str()))
1370            .collect()
1371    }
1372
1373    fn notifications_equivalent(
1374        expected: &[BucketNotification],
1375        actual: &[BucketNotification],
1376    ) -> bool {
1377        type CanonicalEntry = (u8, String, Option<String>, Option<String>, Vec<String>);
1378
1379        fn target_order(target: NotificationTarget) -> u8 {
1380            match target {
1381                NotificationTarget::Queue => 0,
1382                NotificationTarget::Topic => 1,
1383                NotificationTarget::Lambda => 2,
1384            }
1385        }
1386
1387        fn canonical(notifications: &[BucketNotification]) -> Vec<CanonicalEntry> {
1388            let mut normalized: Vec<CanonicalEntry> = notifications
1389                .iter()
1390                .map(|item| {
1391                    let mut events = item.events.clone();
1392                    events.sort();
1393                    events.dedup();
1394                    (
1395                        target_order(item.target),
1396                        item.arn.clone(),
1397                        item.prefix.clone(),
1398                        item.suffix.clone(),
1399                        events,
1400                    )
1401                })
1402                .collect();
1403            normalized.sort();
1404            normalized
1405        }
1406
1407        canonical(expected) == canonical(actual)
1408    }
1409
1410    async fn read_next_part(
1411        file: &mut tokio::fs::File,
1412        file_path: &std::path::Path,
1413        buffer: &mut [u8],
1414    ) -> Result<usize> {
1415        let mut total_read = 0usize;
1416        while total_read < buffer.len() {
1417            let bytes_read = file
1418                .read(&mut buffer[total_read..])
1419                .await
1420                .map_err(|e| Error::General(format!("read file '{}': {e}", file_path.display())))?;
1421            if bytes_read == 0 {
1422                break;
1423            }
1424            total_read += bytes_read;
1425        }
1426        Ok(total_read)
1427    }
1428
1429    async fn put_object_single_part_from_path(
1430        &self,
1431        path: &RemotePath,
1432        file_path: &std::path::Path,
1433        content_type: Option<&str>,
1434        file_size: u64,
1435    ) -> Result<ObjectInfo> {
1436        let data = tokio::fs::read(file_path)
1437            .await
1438            .map_err(|e| Error::General(format!("read file '{}': {e}", file_path.display())))?;
1439        let body = aws_sdk_s3::primitives::ByteStream::from(data);
1440
1441        let mut request = self
1442            .inner
1443            .put_object()
1444            .bucket(&path.bucket)
1445            .key(&path.key)
1446            .body(body);
1447
1448        if let Some(ct) = content_type {
1449            request = request.content_type(ct);
1450        }
1451
1452        let response = request
1453            .send()
1454            .await
1455            .map_err(|e| Error::Network(e.to_string()))?;
1456
1457        let mut info = ObjectInfo::file(&path.key, file_size as i64);
1458        if let Some(etag) = response.e_tag() {
1459            info.etag = Some(etag.trim_matches('"').to_string());
1460        }
1461        info.last_modified = Some(jiff::Timestamp::now());
1462
1463        Ok(info)
1464    }
1465
1466    async fn abort_multipart_upload_best_effort(&self, path: &RemotePath, upload_id: &str) {
1467        let _ = self
1468            .inner
1469            .abort_multipart_upload()
1470            .bucket(&path.bucket)
1471            .key(&path.key)
1472            .upload_id(upload_id)
1473            .send()
1474            .await;
1475    }
1476
1477    async fn put_object_multipart_from_path(
1478        &self,
1479        path: &RemotePath,
1480        file_path: &std::path::Path,
1481        content_type: Option<&str>,
1482        file_size: u64,
1483        on_progress: impl Fn(u64) + Send,
1484    ) -> Result<ObjectInfo> {
1485        use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
1486
1487        let config = crate::multipart::MultipartConfig::default();
1488        let part_size = config.calculate_part_size(file_size);
1489        let part_buffer_size = usize::try_from(part_size)
1490            .map_err(|_| Error::General(format!("invalid part size: {part_size}")))?;
1491
1492        tracing::debug!(file_size, part_size, "Starting multipart upload");
1493
1494        let mut create_request = self
1495            .inner
1496            .create_multipart_upload()
1497            .bucket(&path.bucket)
1498            .key(&path.key);
1499
1500        if let Some(ct) = content_type {
1501            create_request = create_request.content_type(ct);
1502        }
1503
1504        let create_response = create_request
1505            .send()
1506            .await
1507            .map_err(|e| Error::Network(format!("create multipart upload: {e}")))?;
1508
1509        let upload_id = create_response
1510            .upload_id()
1511            .ok_or_else(|| Error::General("missing upload id from multipart upload".to_string()))?
1512            .to_string();
1513
1514        tracing::debug!(upload_id = %upload_id, "Multipart upload initiated");
1515
1516        let mut file = tokio::fs::File::open(file_path)
1517            .await
1518            .map_err(|e| Error::General(format!("open file '{}': {e}", file_path.display())))?;
1519        let mut completed_parts = Vec::new();
1520        let mut part_number: i32 = 1;
1521        let mut chunk = vec![0u8; part_buffer_size];
1522        let mut bytes_uploaded: u64 = 0;
1523
1524        loop {
1525            let bytes_read = Self::read_next_part(&mut file, file_path, &mut chunk).await?;
1526            if bytes_read == 0 {
1527                break;
1528            }
1529
1530            tracing::debug!(part_number, bytes_read, "Uploading part");
1531
1532            let body = aws_sdk_s3::primitives::ByteStream::from(chunk[..bytes_read].to_vec());
1533            let upload_part_result = self
1534                .inner
1535                .upload_part()
1536                .bucket(&path.bucket)
1537                .key(&path.key)
1538                .upload_id(&upload_id)
1539                .part_number(part_number)
1540                .body(body)
1541                .send()
1542                .await;
1543
1544            let upload_part_response = match upload_part_result {
1545                Ok(response) => response,
1546                Err(e) => {
1547                    tracing::debug!(
1548                        upload_id = %upload_id,
1549                        part_number,
1550                        "Aborting multipart upload due to error"
1551                    );
1552                    self.abort_multipart_upload_best_effort(path, &upload_id)
1553                        .await;
1554                    return Err(Error::Network(format!(
1555                        "upload multipart part {part_number}: {e}"
1556                    )));
1557                }
1558            };
1559
1560            let etag = match upload_part_response.e_tag() {
1561                Some(value) => value.trim_matches('"').to_string(),
1562                None => {
1563                    self.abort_multipart_upload_best_effort(path, &upload_id)
1564                        .await;
1565                    return Err(Error::General(format!(
1566                        "missing ETag for multipart part {part_number}"
1567                    )));
1568                }
1569            };
1570
1571            completed_parts.push(
1572                CompletedPart::builder()
1573                    .part_number(part_number)
1574                    .e_tag(etag)
1575                    .build(),
1576            );
1577
1578            bytes_uploaded += bytes_read as u64;
1579            on_progress(bytes_uploaded);
1580            tracing::debug!(part_number, bytes_uploaded, "Part uploaded");
1581
1582            part_number += 1;
1583        }
1584
1585        let completed_upload = CompletedMultipartUpload::builder()
1586            .set_parts(Some(completed_parts))
1587            .build();
1588        let complete_result = self
1589            .inner
1590            .complete_multipart_upload()
1591            .bucket(&path.bucket)
1592            .key(&path.key)
1593            .upload_id(&upload_id)
1594            .multipart_upload(completed_upload)
1595            .send()
1596            .await;
1597
1598        let complete_response = match complete_result {
1599            Ok(response) => response,
1600            Err(e) => {
1601                tracing::debug!(upload_id = %upload_id, "Attempting to abort multipart upload after completion failure");
1602                self.abort_multipart_upload_best_effort(path, &upload_id)
1603                    .await;
1604                return Err(Error::Network(format!("complete multipart upload: {e}")));
1605            }
1606        };
1607
1608        tracing::debug!("Multipart upload completed");
1609
1610        let mut info = ObjectInfo::file(&path.key, file_size as i64);
1611        if let Some(etag) = complete_response.e_tag() {
1612            info.etag = Some(etag.trim_matches('"').to_string());
1613        }
1614        info.last_modified = Some(jiff::Timestamp::now());
1615
1616        Ok(info)
1617    }
1618
1619    /// Upload a local file path to S3.
1620    ///
1621    /// Uses multipart upload for large files to avoid loading the entire file into memory.
1622    /// Calls `on_progress` after each uploaded part with total bytes sent so far.
1623    pub async fn put_object_from_path(
1624        &self,
1625        path: &RemotePath,
1626        file_path: &std::path::Path,
1627        content_type: Option<&str>,
1628        on_progress: impl Fn(u64) + Send,
1629    ) -> Result<ObjectInfo> {
1630        let metadata = tokio::fs::metadata(file_path).await.map_err(|e| {
1631            Error::General(format!("read metadata for '{}': {e}", file_path.display()))
1632        })?;
1633        if !metadata.is_file() {
1634            return Err(Error::General(format!(
1635                "source is not a file: {}",
1636                file_path.display()
1637            )));
1638        }
1639
1640        let file_size = metadata.len();
1641        if Self::should_use_multipart(file_size) {
1642            self.put_object_multipart_from_path(
1643                path,
1644                file_path,
1645                content_type,
1646                file_size,
1647                on_progress,
1648            )
1649            .await
1650        } else {
1651            self.put_object_single_part_from_path(path, file_path, content_type, file_size)
1652                .await
1653        }
1654    }
1655}
1656
1657fn build_tagging(
1658    tags: std::collections::HashMap<String, String>,
1659) -> Result<aws_sdk_s3::types::Tagging> {
1660    use aws_sdk_s3::types::{Tag, Tagging};
1661
1662    let mut tag_set = Vec::with_capacity(tags.len());
1663    for (key, value) in tags {
1664        let tag = Tag::builder()
1665            .key(key)
1666            .value(value)
1667            .build()
1668            .map_err(|e| Error::General(format!("invalid tag: {e}")))?;
1669        tag_set.push(tag);
1670    }
1671
1672    Tagging::builder()
1673        .set_tag_set(Some(tag_set))
1674        .build()
1675        .map_err(|e| Error::General(format!("invalid tagging payload: {e}")))
1676}
1677
1678#[async_trait]
1679impl ObjectStore for S3Client {
1680    async fn list_buckets(&self) -> Result<Vec<ObjectInfo>> {
1681        let response = self
1682            .inner
1683            .list_buckets()
1684            .send()
1685            .await
1686            .map_err(|e| Error::Network(e.to_string()))?;
1687
1688        let buckets = response
1689            .buckets()
1690            .iter()
1691            .map(|b| {
1692                let mut info = ObjectInfo::bucket(b.name().unwrap_or_default());
1693                if let Some(creation_date) = b.creation_date() {
1694                    info.last_modified = jiff::Timestamp::from_second(creation_date.secs()).ok();
1695                }
1696                info
1697            })
1698            .collect();
1699
1700        Ok(buckets)
1701    }
1702
1703    async fn list_objects(&self, path: &RemotePath, options: ListOptions) -> Result<ListResult> {
1704        let mut request = self.inner.list_objects_v2().bucket(&path.bucket);
1705
1706        // Set prefix
1707        let prefix = if path.key.is_empty() {
1708            options.prefix.clone()
1709        } else if let Some(p) = &options.prefix {
1710            Some(format!("{}{}", path.key, p))
1711        } else {
1712            Some(path.key.clone())
1713        };
1714
1715        if let Some(p) = prefix {
1716            request = request.prefix(p);
1717        }
1718
1719        // Set delimiter (for non-recursive listing)
1720        if !options.recursive {
1721            request = request.delimiter(options.delimiter.as_deref().unwrap_or("/"));
1722        }
1723
1724        // Set max keys
1725        if let Some(max) = options.max_keys {
1726            request = request.max_keys(max);
1727        }
1728
1729        // Set continuation token
1730        if let Some(token) = &options.continuation_token {
1731            request = request.continuation_token(token);
1732        }
1733
1734        let response = request.send().await.map_err(|e| {
1735            let err_str = Self::format_sdk_error(&e);
1736            if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
1737                Error::NotFound(format!("Bucket not found: {}", path.bucket))
1738            } else {
1739                Error::Network(err_str)
1740            }
1741        })?;
1742
1743        let mut items = Vec::new();
1744
1745        // Add common prefixes (directories)
1746        for prefix in response.common_prefixes() {
1747            if let Some(p) = prefix.prefix() {
1748                items.push(ObjectInfo::dir(p));
1749            }
1750        }
1751
1752        // Add objects
1753        for object in response.contents() {
1754            let key = object.key().unwrap_or_default().to_string();
1755            let size = object.size().unwrap_or(0);
1756            let mut info = ObjectInfo::file(&key, size);
1757
1758            if let Some(modified) = object.last_modified() {
1759                info.last_modified = jiff::Timestamp::from_second(modified.secs()).ok();
1760            }
1761
1762            if let Some(etag) = object.e_tag() {
1763                info.etag = Some(etag.trim_matches('"').to_string());
1764            }
1765
1766            if let Some(sc) = object.storage_class() {
1767                info.storage_class = Some(sc.as_str().to_string());
1768            }
1769
1770            items.push(info);
1771        }
1772
1773        Ok(ListResult {
1774            items,
1775            truncated: response.is_truncated().unwrap_or(false),
1776            continuation_token: response.next_continuation_token().map(|s| s.to_string()),
1777        })
1778    }
1779
1780    async fn head_object(&self, path: &RemotePath) -> Result<ObjectInfo> {
1781        let response = self
1782            .inner
1783            .head_object()
1784            .bucket(&path.bucket)
1785            .key(&path.key)
1786            .send()
1787            .await
1788            .map_err(|e| {
1789                let err_str = e.to_string();
1790                if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
1791                    Error::NotFound(path.to_string())
1792                } else {
1793                    Error::Network(err_str)
1794                }
1795            })?;
1796
1797        let size = response.content_length().unwrap_or(0);
1798        let mut info = ObjectInfo::file(&path.key, size);
1799
1800        if let Some(modified) = response.last_modified() {
1801            info.last_modified = jiff::Timestamp::from_second(modified.secs()).ok();
1802        }
1803
1804        if let Some(etag) = response.e_tag() {
1805            info.etag = Some(etag.trim_matches('"').to_string());
1806        }
1807
1808        if let Some(ct) = response.content_type() {
1809            info.content_type = Some(ct.to_string());
1810        }
1811
1812        if let Some(sc) = response.storage_class() {
1813            info.storage_class = Some(sc.as_str().to_string());
1814        }
1815
1816        if let Some(meta) = response.metadata()
1817            && !meta.is_empty()
1818        {
1819            info.metadata = Some(meta.clone());
1820        }
1821
1822        Ok(info)
1823    }
1824
1825    async fn bucket_exists(&self, bucket: &str) -> Result<bool> {
1826        match self.inner.head_bucket().bucket(bucket).send().await {
1827            Ok(_) => Ok(true),
1828            Err(e) => {
1829                // Check HTTP status code for 404 first to avoid unnecessary string formatting
1830                // Some S3-compatible services may return 404 without standard error codes
1831                if let aws_sdk_s3::error::SdkError::ServiceError(ref service_err) = e
1832                    && service_err.raw().status().as_u16() == 404
1833                {
1834                    return Ok(false);
1835                }
1836                let err_str = e.to_string();
1837                if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
1838                    return Ok(false);
1839                }
1840                Err(Error::Network(err_str))
1841            }
1842        }
1843    }
1844
1845    async fn create_bucket(&self, bucket: &str) -> Result<()> {
1846        self.inner
1847            .create_bucket()
1848            .bucket(bucket)
1849            .send()
1850            .await
1851            .map_err(|e| Error::Network(e.to_string()))?;
1852
1853        Ok(())
1854    }
1855
1856    async fn delete_bucket(&self, bucket: &str) -> Result<()> {
1857        self.inner
1858            .delete_bucket()
1859            .bucket(bucket)
1860            .send()
1861            .await
1862            .map_err(|e| {
1863                let err_str = Self::format_sdk_error(&e);
1864                if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
1865                    Error::NotFound(format!("Bucket not found: {bucket}"))
1866                } else if err_str.contains("BucketNotEmpty") {
1867                    Error::Conflict(err_str)
1868                } else {
1869                    Error::Network(err_str)
1870                }
1871            })?;
1872
1873        Ok(())
1874    }
1875
1876    async fn capabilities(&self) -> Result<Capabilities> {
1877        // Best-effort hints for common S3-compatible backends. `select` is not inferred here
1878        // because `rc sql` determines support from the real request result.
1879        Ok(Capabilities {
1880            versioning: true,
1881            object_lock: false,
1882            tagging: true,
1883            anonymous: true,
1884            select: false,
1885            notifications: true,
1886            lifecycle: true,
1887            replication: true,
1888            cors: true,
1889        })
1890    }
1891
1892    async fn get_object(&self, path: &RemotePath) -> Result<Vec<u8>> {
1893        self.get_object_with_progress(path, |_, _| {}).await
1894    }
1895
1896    async fn put_object(
1897        &self,
1898        path: &RemotePath,
1899        data: Vec<u8>,
1900        content_type: Option<&str>,
1901    ) -> Result<ObjectInfo> {
1902        let size = data.len() as i64;
1903        let body = aws_sdk_s3::primitives::ByteStream::from(data);
1904
1905        let mut request = self
1906            .inner
1907            .put_object()
1908            .bucket(&path.bucket)
1909            .key(&path.key)
1910            .body(body);
1911
1912        if let Some(ct) = content_type {
1913            request = request.content_type(ct);
1914        }
1915
1916        let response = request
1917            .send()
1918            .await
1919            .map_err(|e| Error::Network(e.to_string()))?;
1920
1921        let mut info = ObjectInfo::file(&path.key, size);
1922        if let Some(etag) = response.e_tag() {
1923            info.etag = Some(etag.trim_matches('"').to_string());
1924        }
1925        info.last_modified = Some(jiff::Timestamp::now());
1926
1927        Ok(info)
1928    }
1929
1930    async fn delete_object(&self, path: &RemotePath) -> Result<()> {
1931        self.delete_object_with_options(path, DeleteRequestOptions::default())
1932            .await
1933    }
1934
1935    async fn delete_objects(&self, bucket: &str, keys: Vec<String>) -> Result<Vec<String>> {
1936        self.delete_objects_with_options(bucket, keys, DeleteRequestOptions::default())
1937            .await
1938    }
1939
1940    async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> Result<ObjectInfo> {
1941        // Build copy source: bucket/key
1942        let copy_source = format!("{}/{}", src.bucket, src.key);
1943
1944        let response = self
1945            .inner
1946            .copy_object()
1947            .copy_source(&copy_source)
1948            .bucket(&dst.bucket)
1949            .key(&dst.key)
1950            .send()
1951            .await
1952            .map_err(|e| {
1953                let err_str = e.to_string();
1954                if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
1955                    Error::NotFound(src.to_string())
1956                } else {
1957                    Error::Network(err_str)
1958                }
1959            })?;
1960
1961        // Get size from head_object since copy doesn't return it
1962        let info = self.head_object(dst).await?;
1963
1964        // Update etag from copy response if available
1965        let mut result = info;
1966        if let Some(copy_result) = response.copy_object_result()
1967            && let Some(etag) = copy_result.e_tag()
1968        {
1969            result.etag = Some(etag.trim_matches('"').to_string());
1970        }
1971
1972        Ok(result)
1973    }
1974
1975    async fn presign_get(&self, path: &RemotePath, expires_secs: u64) -> Result<String> {
1976        let config = aws_sdk_s3::presigning::PresigningConfig::builder()
1977            .expires_in(std::time::Duration::from_secs(expires_secs))
1978            .build()
1979            .map_err(|e| Error::General(format!("presign_get config: {e}")))?;
1980
1981        let request = self
1982            .presign_inner
1983            .get_object()
1984            .bucket(&path.bucket)
1985            .key(&path.key)
1986            .presigned(config)
1987            .await
1988            .map_err(|e| Error::General(format!("presign_get: {e}")))?;
1989
1990        Ok(request.uri().to_string())
1991    }
1992
1993    async fn presign_put(
1994        &self,
1995        path: &RemotePath,
1996        expires_secs: u64,
1997        content_type: Option<&str>,
1998    ) -> Result<String> {
1999        let config = aws_sdk_s3::presigning::PresigningConfig::builder()
2000            .expires_in(std::time::Duration::from_secs(expires_secs))
2001            .build()
2002            .map_err(|e| Error::General(format!("presign_put config: {e}")))?;
2003
2004        let mut builder = self
2005            .presign_inner
2006            .put_object()
2007            .bucket(&path.bucket)
2008            .key(&path.key);
2009
2010        if let Some(ct) = content_type {
2011            builder = builder.content_type(ct);
2012        }
2013
2014        let request = builder
2015            .presigned(config)
2016            .await
2017            .map_err(|e| Error::General(format!("presign_put: {e}")))?;
2018
2019        Ok(request.uri().to_string())
2020    }
2021
2022    async fn get_versioning(&self, bucket: &str) -> Result<Option<bool>> {
2023        let response = self
2024            .inner
2025            .get_bucket_versioning()
2026            .bucket(bucket)
2027            .send()
2028            .await
2029            .map_err(|e| Error::General(format!("get_versioning: {e}")))?;
2030
2031        Ok(response
2032            .status()
2033            .map(|s| *s == aws_sdk_s3::types::BucketVersioningStatus::Enabled))
2034    }
2035
2036    async fn set_versioning(&self, bucket: &str, enabled: bool) -> Result<()> {
2037        use aws_sdk_s3::types::{BucketVersioningStatus, VersioningConfiguration};
2038
2039        let status = if enabled {
2040            BucketVersioningStatus::Enabled
2041        } else {
2042            BucketVersioningStatus::Suspended
2043        };
2044
2045        let config = VersioningConfiguration::builder().status(status).build();
2046
2047        self.inner
2048            .put_bucket_versioning()
2049            .bucket(bucket)
2050            .versioning_configuration(config)
2051            .send()
2052            .await
2053            .map_err(|e| Error::General(format!("set_versioning: {e}")))?;
2054
2055        Ok(())
2056    }
2057
2058    async fn list_object_versions(
2059        &self,
2060        path: &RemotePath,
2061        max_keys: Option<i32>,
2062    ) -> Result<Vec<ObjectVersion>> {
2063        Ok(self.list_object_versions_page(path, max_keys).await?.items)
2064    }
2065
2066    async fn get_object_tags(
2067        &self,
2068        path: &RemotePath,
2069    ) -> Result<std::collections::HashMap<String, String>> {
2070        let response = match self
2071            .inner
2072            .get_object_tagging()
2073            .bucket(&path.bucket)
2074            .key(&path.key)
2075            .send()
2076            .await
2077        {
2078            Ok(response) => response,
2079            Err(e) => {
2080                if e.to_string().contains("NoSuchTagSet") {
2081                    return Ok(std::collections::HashMap::new());
2082                }
2083                return Err(Error::General(format!("get_object_tags: {e}")));
2084            }
2085        };
2086
2087        let mut tags = std::collections::HashMap::new();
2088        for tag in response.tag_set() {
2089            let key = tag.key();
2090            let value = tag.value();
2091            tags.insert(key.to_string(), value.to_string());
2092        }
2093
2094        Ok(tags)
2095    }
2096
2097    async fn get_bucket_tags(
2098        &self,
2099        bucket: &str,
2100    ) -> Result<std::collections::HashMap<String, String>> {
2101        let response = match self.inner.get_bucket_tagging().bucket(bucket).send().await {
2102            Ok(response) => response,
2103            Err(e) => {
2104                if e.to_string().contains("NoSuchTagSet") {
2105                    return Ok(std::collections::HashMap::new());
2106                }
2107                return Err(Error::General(format!("get_bucket_tags: {e}")));
2108            }
2109        };
2110
2111        let mut tags = std::collections::HashMap::new();
2112        for tag in response.tag_set() {
2113            let key = tag.key();
2114            let value = tag.value();
2115            tags.insert(key.to_string(), value.to_string());
2116        }
2117
2118        Ok(tags)
2119    }
2120
2121    async fn set_object_tags(
2122        &self,
2123        path: &RemotePath,
2124        tags: std::collections::HashMap<String, String>,
2125    ) -> Result<()> {
2126        let tagging = build_tagging(tags)?;
2127
2128        self.inner
2129            .put_object_tagging()
2130            .bucket(&path.bucket)
2131            .key(&path.key)
2132            .tagging(tagging)
2133            .send()
2134            .await
2135            .map_err(|e| Error::General(format!("set_object_tags: {e}")))?;
2136
2137        Ok(())
2138    }
2139
2140    async fn set_bucket_tags(
2141        &self,
2142        bucket: &str,
2143        tags: std::collections::HashMap<String, String>,
2144    ) -> Result<()> {
2145        let tagging = build_tagging(tags)?;
2146
2147        self.inner
2148            .put_bucket_tagging()
2149            .bucket(bucket)
2150            .tagging(tagging)
2151            .send()
2152            .await
2153            .map_err(|e| Error::General(format!("set_bucket_tags: {e}")))?;
2154
2155        Ok(())
2156    }
2157
2158    async fn delete_object_tags(&self, path: &RemotePath) -> Result<()> {
2159        self.inner
2160            .delete_object_tagging()
2161            .bucket(&path.bucket)
2162            .key(&path.key)
2163            .send()
2164            .await
2165            .map_err(|e| Error::General(format!("delete_object_tags: {e}")))?;
2166
2167        Ok(())
2168    }
2169
2170    async fn delete_bucket_tags(&self, bucket: &str) -> Result<()> {
2171        self.inner
2172            .delete_bucket_tagging()
2173            .bucket(bucket)
2174            .send()
2175            .await
2176            .map_err(|e| Error::General(format!("delete_bucket_tags: {e}")))?;
2177
2178        Ok(())
2179    }
2180
2181    async fn get_bucket_policy(&self, bucket: &str) -> Result<Option<String>> {
2182        let response = match self.inner.get_bucket_policy().bucket(bucket).send().await {
2183            Ok(policy) => policy,
2184            Err(error) => {
2185                let error_text = error.to_string();
2186                let kind = if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &error {
2187                    let code = service_err
2188                        .raw()
2189                        .headers()
2190                        .get("x-amz-error-code")
2191                        .and_then(|value| std::str::from_utf8(value.as_bytes()).ok());
2192                    let status = Some(service_err.raw().status().as_u16());
2193                    Self::bucket_policy_error_kind(code, status, &error_text)
2194                } else {
2195                    Self::bucket_policy_error_kind(None, None, &error_text)
2196                };
2197                return Self::map_get_bucket_policy_error(bucket, kind, &error_text);
2198            }
2199        };
2200
2201        Ok(response.policy().map(|policy| policy.to_string()))
2202    }
2203
2204    async fn set_bucket_policy(&self, bucket: &str, policy: &str) -> Result<()> {
2205        self.inner
2206            .put_bucket_policy()
2207            .bucket(bucket)
2208            .policy(policy)
2209            .send()
2210            .await
2211            .map_err(|e| Error::General(format!("set_bucket_policy: {e}")))?;
2212
2213        Ok(())
2214    }
2215
2216    async fn delete_bucket_policy(&self, bucket: &str) -> Result<()> {
2217        match self
2218            .inner
2219            .delete_bucket_policy()
2220            .bucket(bucket)
2221            .send()
2222            .await
2223        {
2224            Ok(_) => Ok(()),
2225            Err(e) => {
2226                let error_text = e.to_string();
2227                let kind = if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &e {
2228                    let code = service_err
2229                        .raw()
2230                        .headers()
2231                        .get("x-amz-error-code")
2232                        .and_then(|value| std::str::from_utf8(value.as_bytes()).ok());
2233                    let status = Some(service_err.raw().status().as_u16());
2234                    Self::bucket_policy_error_kind(code, status, &error_text)
2235                } else {
2236                    Self::bucket_policy_error_kind(None, None, &error_text)
2237                };
2238                Self::map_delete_bucket_policy_error(bucket, kind, &error_text)
2239            }
2240        }
2241    }
2242
2243    async fn get_bucket_cors(&self, bucket: &str) -> Result<Vec<CorsRule>> {
2244        let response = match self.inner.get_bucket_cors().bucket(bucket).send().await {
2245            Ok(response) => response,
2246            Err(error) => {
2247                let error_text = error.to_string();
2248                if error_text.contains("service error")
2249                    && let Ok(url) = self.cors_url(bucket)
2250                {
2251                    match self.xml_request(Method::GET, url, None, None).await {
2252                        Ok(body) => return parse_cors_configuration_xml(&body),
2253                        Err(Error::Network(raw_error))
2254                            if is_missing_cors_configuration_error(&raw_error) =>
2255                        {
2256                            return Ok(Vec::new());
2257                        }
2258                        Err(_) => {}
2259                    }
2260                }
2261
2262                let missing_config =
2263                    if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &error {
2264                        is_missing_cors_configuration_response(
2265                            service_err.err().code(),
2266                            Some(service_err.raw().status().as_u16()),
2267                            &error_text,
2268                        )
2269                    } else {
2270                        is_missing_cors_configuration_response(None, None, &error_text)
2271                    };
2272
2273                if missing_config {
2274                    return Ok(Vec::new());
2275                }
2276                return Err(Error::General(format!("get_bucket_cors: {error_text}")));
2277            }
2278        };
2279
2280        Ok(response
2281            .cors_rules()
2282            .iter()
2283            .map(sdk_cors_rule_to_core)
2284            .collect())
2285    }
2286
2287    async fn set_bucket_cors(&self, bucket: &str, rules: Vec<CorsRule>) -> Result<()> {
2288        let cors_rules = rules
2289            .iter()
2290            .map(core_cors_rule_to_sdk)
2291            .collect::<Result<Vec<_>>>()?;
2292        let cors_configuration = aws_sdk_s3::types::CorsConfiguration::builder()
2293            .set_cors_rules(Some(cors_rules))
2294            .build()
2295            .map_err(|e| Error::General(format!("build bucket cors config: {e}")))?;
2296
2297        self.inner
2298            .put_bucket_cors()
2299            .bucket(bucket)
2300            .cors_configuration(cors_configuration)
2301            .send()
2302            .await
2303            .map_err(|e| Error::General(format!("set_bucket_cors: {e}")))?;
2304
2305        Ok(())
2306    }
2307
2308    async fn delete_bucket_cors(&self, bucket: &str) -> Result<()> {
2309        match self.inner.delete_bucket_cors().bucket(bucket).send().await {
2310            Ok(_) => Ok(()),
2311            Err(error) => {
2312                let error_text = error.to_string();
2313                if error_text.contains("service error")
2314                    && let Ok(url) = self.cors_url(bucket)
2315                {
2316                    match self.xml_request(Method::DELETE, url, None, None).await {
2317                        Ok(_) => return Ok(()),
2318                        Err(Error::Network(raw_error))
2319                            if is_missing_cors_configuration_error(&raw_error) =>
2320                        {
2321                            return Ok(());
2322                        }
2323                        Err(_) => {}
2324                    }
2325                }
2326
2327                let missing_config =
2328                    if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &error {
2329                        is_missing_cors_configuration_response(
2330                            service_err.err().code(),
2331                            Some(service_err.raw().status().as_u16()),
2332                            &error_text,
2333                        )
2334                    } else {
2335                        is_missing_cors_configuration_response(None, None, &error_text)
2336                    };
2337
2338                if missing_config {
2339                    Ok(())
2340                } else {
2341                    Err(Error::General(format!("delete_bucket_cors: {error_text}")))
2342                }
2343            }
2344        }
2345    }
2346
2347    async fn get_bucket_notifications(&self, bucket: &str) -> Result<Vec<BucketNotification>> {
2348        let response = self
2349            .inner
2350            .get_bucket_notification_configuration()
2351            .bucket(bucket)
2352            .send()
2353            .await
2354            .map_err(|e| Error::General(format!("get_bucket_notifications: {e}")))?;
2355
2356        let mut rules = Vec::new();
2357
2358        for cfg in response.queue_configurations() {
2359            let (prefix, suffix) = Self::extract_notification_filter(cfg.filter());
2360            rules.push(BucketNotification {
2361                id: cfg.id().map(ToString::to_string),
2362                target: NotificationTarget::Queue,
2363                arn: cfg.queue_arn().to_string(),
2364                events: Self::event_list_to_strings(cfg.events()),
2365                prefix,
2366                suffix,
2367            });
2368        }
2369
2370        for cfg in response.topic_configurations() {
2371            let (prefix, suffix) = Self::extract_notification_filter(cfg.filter());
2372            rules.push(BucketNotification {
2373                id: cfg.id().map(ToString::to_string),
2374                target: NotificationTarget::Topic,
2375                arn: cfg.topic_arn().to_string(),
2376                events: Self::event_list_to_strings(cfg.events()),
2377                prefix,
2378                suffix,
2379            });
2380        }
2381
2382        for cfg in response.lambda_function_configurations() {
2383            let (prefix, suffix) = Self::extract_notification_filter(cfg.filter());
2384            rules.push(BucketNotification {
2385                id: cfg.id().map(ToString::to_string),
2386                target: NotificationTarget::Lambda,
2387                arn: cfg.lambda_function_arn().to_string(),
2388                events: Self::event_list_to_strings(cfg.events()),
2389                prefix,
2390                suffix,
2391            });
2392        }
2393
2394        Ok(rules)
2395    }
2396
2397    async fn set_bucket_notifications(
2398        &self,
2399        bucket: &str,
2400        notifications: Vec<BucketNotification>,
2401    ) -> Result<()> {
2402        use aws_sdk_s3::types::{
2403            LambdaFunctionConfiguration, NotificationConfiguration, QueueConfiguration,
2404            TopicConfiguration,
2405        };
2406
2407        let expected_notifications = notifications.clone();
2408        let mut queues = Vec::new();
2409        let mut topics = Vec::new();
2410        let mut lambdas = Vec::new();
2411
2412        for rule in notifications {
2413            let events = Self::strings_to_event_list(&rule.events);
2414            if events.is_empty() {
2415                return Err(Error::General(format!(
2416                    "set_bucket_notifications: empty event list for target '{}'",
2417                    rule.arn
2418                )));
2419            }
2420
2421            let filter =
2422                Self::build_notification_filter(rule.prefix.as_deref(), rule.suffix.as_deref());
2423
2424            match rule.target {
2425                NotificationTarget::Queue => {
2426                    let mut builder = QueueConfiguration::builder()
2427                        .queue_arn(rule.arn)
2428                        .set_events(Some(events))
2429                        .set_id(rule.id);
2430                    if let Some(filter) = filter {
2431                        builder = builder.filter(filter);
2432                    }
2433                    let queue = builder
2434                        .build()
2435                        .map_err(|e| Error::General(format!("build queue notification: {e}")))?;
2436                    queues.push(queue);
2437                }
2438                NotificationTarget::Topic => {
2439                    let mut builder = TopicConfiguration::builder()
2440                        .topic_arn(rule.arn)
2441                        .set_events(Some(events))
2442                        .set_id(rule.id);
2443                    if let Some(filter) = filter {
2444                        builder = builder.filter(filter);
2445                    }
2446                    let topic = builder
2447                        .build()
2448                        .map_err(|e| Error::General(format!("build topic notification: {e}")))?;
2449                    topics.push(topic);
2450                }
2451                NotificationTarget::Lambda => {
2452                    let mut builder = LambdaFunctionConfiguration::builder()
2453                        .lambda_function_arn(rule.arn)
2454                        .set_events(Some(events))
2455                        .set_id(rule.id);
2456                    if let Some(filter) = filter {
2457                        builder = builder.filter(filter);
2458                    }
2459                    let lambda = builder
2460                        .build()
2461                        .map_err(|e| Error::General(format!("build lambda notification: {e}")))?;
2462                    lambdas.push(lambda);
2463                }
2464            }
2465        }
2466
2467        let config = NotificationConfiguration::builder()
2468            .set_queue_configurations(Some(queues))
2469            .set_topic_configurations(Some(topics))
2470            .set_lambda_function_configurations(Some(lambdas))
2471            .build();
2472
2473        match self
2474            .inner
2475            .put_bucket_notification_configuration()
2476            .bucket(bucket)
2477            .notification_configuration(config)
2478            .send()
2479            .await
2480        {
2481            Ok(_) => {}
2482            Err(error) => {
2483                let error_text = error.to_string();
2484                // RustFS may apply notification configuration but still return a non-AWS
2485                // response envelope that the SDK reports as "service error".
2486                if error_text.contains("service error")
2487                    && let Ok(actual) = self.get_bucket_notifications(bucket).await
2488                    && Self::notifications_equivalent(&expected_notifications, &actual)
2489                {
2490                    return Ok(());
2491                }
2492                return Err(Error::General(format!(
2493                    "set_bucket_notifications: {error_text}"
2494                )));
2495            }
2496        }
2497
2498        Ok(())
2499    }
2500
2501    async fn get_bucket_lifecycle(&self, bucket: &str) -> Result<Vec<LifecycleRule>> {
2502        let response = match self
2503            .inner
2504            .get_bucket_lifecycle_configuration()
2505            .bucket(bucket)
2506            .send()
2507            .await
2508        {
2509            Ok(resp) => resp,
2510            Err(error) => {
2511                let error_text = Self::format_sdk_error(&error);
2512                if error_text.contains("NoSuchLifecycleConfiguration")
2513                    || error_text.contains("lifecycle configuration is not found")
2514                {
2515                    return Ok(Vec::new());
2516                }
2517                return Err(Error::General(format!(
2518                    "get_bucket_lifecycle: {error_text}"
2519                )));
2520            }
2521        };
2522
2523        let mut rules = Vec::new();
2524        for sdk_rule in response.rules() {
2525            let id = sdk_rule.id().unwrap_or("").to_string();
2526            let status = match sdk_rule.status().as_str() {
2527                "Enabled" => rc_core::LifecycleRuleStatus::Enabled,
2528                _ => rc_core::LifecycleRuleStatus::Disabled,
2529            };
2530
2531            let prefix = parse_lifecycle_filter_prefix(sdk_rule.filter());
2532            let tags = parse_lifecycle_filter_tags(sdk_rule.filter());
2533
2534            let expiration = sdk_rule
2535                .expiration()
2536                .map(|exp| rc_core::LifecycleExpiration {
2537                    days: exp.days(),
2538                    date: exp.date().map(|d| d.to_string()),
2539                });
2540
2541            let transition = sdk_rule
2542                .transitions()
2543                .first()
2544                .map(|t| rc_core::LifecycleTransition {
2545                    days: t.days(),
2546                    date: t.date().map(|d| d.to_string()),
2547                    storage_class: t
2548                        .storage_class()
2549                        .map(|sc| sc.as_str().to_string())
2550                        .unwrap_or_default(),
2551                });
2552
2553            let noncurrent_version_expiration =
2554                sdk_rule.noncurrent_version_expiration().map(|nve| {
2555                    rc_core::NoncurrentVersionExpiration {
2556                        noncurrent_days: nve.noncurrent_days().unwrap_or(0),
2557                        newer_noncurrent_versions: nve.newer_noncurrent_versions(),
2558                    }
2559                });
2560
2561            let noncurrent_version_transition = sdk_rule
2562                .noncurrent_version_transitions()
2563                .first()
2564                .map(|nvt| rc_core::NoncurrentVersionTransition {
2565                    noncurrent_days: nvt.noncurrent_days().unwrap_or(0),
2566                    storage_class: nvt
2567                        .storage_class()
2568                        .map(|sc| sc.as_str().to_string())
2569                        .unwrap_or_default(),
2570                });
2571
2572            let abort_incomplete_multipart_upload_days = sdk_rule
2573                .abort_incomplete_multipart_upload()
2574                .and_then(|a| a.days_after_initiation());
2575
2576            let expired_object_delete_marker = sdk_rule
2577                .expiration()
2578                .and_then(|e| e.expired_object_delete_marker())
2579                .filter(|v| *v);
2580
2581            rules.push(LifecycleRule {
2582                id,
2583                status,
2584                prefix,
2585                tags,
2586                expiration,
2587                transition,
2588                noncurrent_version_expiration,
2589                noncurrent_version_transition,
2590                abort_incomplete_multipart_upload_days,
2591                expired_object_delete_marker,
2592            });
2593        }
2594
2595        Ok(rules)
2596    }
2597
2598    async fn set_bucket_lifecycle(&self, bucket: &str, rules: Vec<LifecycleRule>) -> Result<()> {
2599        use aws_sdk_s3::types::{
2600            AbortIncompleteMultipartUpload, BucketLifecycleConfiguration, ExpirationStatus,
2601            LifecycleExpiration as SdkExpiration, LifecycleRule as SdkRule,
2602            NoncurrentVersionExpiration as SdkNve, NoncurrentVersionTransition as SdkNvt,
2603            Transition, TransitionStorageClass,
2604        };
2605
2606        let mut sdk_rules = Vec::new();
2607        for rule in rules {
2608            let status = match rule.status {
2609                rc_core::LifecycleRuleStatus::Enabled => ExpirationStatus::Enabled,
2610                rc_core::LifecycleRuleStatus::Disabled => ExpirationStatus::Disabled,
2611            };
2612
2613            let filter = build_lifecycle_rule_filter(rule.prefix.as_deref(), rule.tags.as_ref())?;
2614
2615            let expiration = rule.expiration.map(|exp| {
2616                let mut builder = SdkExpiration::builder();
2617                if let Some(days) = exp.days {
2618                    builder = builder.days(days);
2619                }
2620                if let Some(ref date_str) = exp.date
2621                    && let Ok(dt) = aws_smithy_types::DateTime::from_str(
2622                        date_str,
2623                        aws_smithy_types::date_time::Format::DateTime,
2624                    )
2625                {
2626                    builder = builder.date(dt);
2627                }
2628                if let Some(true) = rule.expired_object_delete_marker {
2629                    builder = builder.expired_object_delete_marker(true);
2630                }
2631                builder.build()
2632            });
2633
2634            let transitions = rule.transition.map(|t| {
2635                #[allow(deprecated)]
2636                let sc = TransitionStorageClass::from(t.storage_class.as_str());
2637                let mut builder = Transition::builder().storage_class(sc);
2638                if let Some(days) = t.days {
2639                    builder = builder.days(days);
2640                }
2641                if let Some(ref date_str) = t.date
2642                    && let Ok(dt) = aws_smithy_types::DateTime::from_str(
2643                        date_str,
2644                        aws_smithy_types::date_time::Format::DateTime,
2645                    )
2646                {
2647                    builder = builder.date(dt);
2648                }
2649                vec![builder.build()]
2650            });
2651
2652            let nve = rule.noncurrent_version_expiration.map(|nve| {
2653                let mut builder = SdkNve::builder().noncurrent_days(nve.noncurrent_days);
2654                if let Some(newer) = nve.newer_noncurrent_versions {
2655                    builder = builder.newer_noncurrent_versions(newer);
2656                }
2657                builder.build()
2658            });
2659
2660            let nvt = rule.noncurrent_version_transition.map(|nvt| {
2661                let sc = TransitionStorageClass::from(nvt.storage_class.as_str());
2662                let builder = SdkNvt::builder()
2663                    .noncurrent_days(nvt.noncurrent_days)
2664                    .storage_class(sc);
2665                vec![builder.build()]
2666            });
2667
2668            let abort = rule.abort_incomplete_multipart_upload_days.map(|days| {
2669                AbortIncompleteMultipartUpload::builder()
2670                    .days_after_initiation(days)
2671                    .build()
2672            });
2673
2674            let mut builder = SdkRule::builder().id(&rule.id).status(status);
2675            if let Some(filter) = filter {
2676                builder = builder.filter(filter);
2677            }
2678            if let Some(expiration) = expiration {
2679                builder = builder.expiration(expiration);
2680            }
2681            if let Some(transitions) = transitions {
2682                builder = builder.set_transitions(Some(transitions));
2683            }
2684            if let Some(nve) = nve {
2685                builder = builder.noncurrent_version_expiration(nve);
2686            }
2687            if let Some(nvt) = nvt {
2688                builder = builder.set_noncurrent_version_transitions(Some(nvt));
2689            }
2690            if let Some(abort) = abort {
2691                builder = builder.abort_incomplete_multipart_upload(abort);
2692            }
2693
2694            let sdk_rule = builder
2695                .build()
2696                .map_err(|e| Error::General(format!("build lifecycle rule: {e}")))?;
2697            sdk_rules.push(sdk_rule);
2698        }
2699
2700        let config = BucketLifecycleConfiguration::builder()
2701            .set_rules(Some(sdk_rules))
2702            .build()
2703            .map_err(|e| Error::General(format!("build lifecycle config: {e}")))?;
2704
2705        self.inner
2706            .put_bucket_lifecycle_configuration()
2707            .bucket(bucket)
2708            .lifecycle_configuration(config)
2709            .send()
2710            .await
2711            .map_err(|e| {
2712                Error::General(format!(
2713                    "set_bucket_lifecycle: {}",
2714                    Self::format_sdk_error(&e)
2715                ))
2716            })?;
2717
2718        Ok(())
2719    }
2720
2721    async fn delete_bucket_lifecycle(&self, bucket: &str) -> Result<()> {
2722        self.inner
2723            .delete_bucket_lifecycle()
2724            .bucket(bucket)
2725            .send()
2726            .await
2727            .map_err(|e| {
2728                Error::General(format!(
2729                    "delete_bucket_lifecycle: {}",
2730                    Self::format_sdk_error(&e)
2731                ))
2732            })?;
2733        Ok(())
2734    }
2735
2736    async fn restore_object(&self, path: &RemotePath, days: i32) -> Result<()> {
2737        use aws_sdk_s3::types::RestoreRequest;
2738
2739        let request = RestoreRequest::builder().days(days).build();
2740        self.inner
2741            .restore_object()
2742            .bucket(&path.bucket)
2743            .key(&path.key)
2744            .restore_request(request)
2745            .send()
2746            .await
2747            .map_err(|e| {
2748                Error::General(format!("restore_object: {}", Self::format_sdk_error(&e)))
2749            })?;
2750        Ok(())
2751    }
2752
2753    async fn get_bucket_replication(
2754        &self,
2755        bucket: &str,
2756    ) -> Result<Option<ReplicationConfiguration>> {
2757        let url = self.replication_url(bucket)?;
2758        let body = match self.xml_request(Method::GET, url, None, None).await {
2759            Ok(body) => body,
2760            Err(Error::Network(error_text))
2761                if error_text.contains("ReplicationConfigurationNotFound")
2762                    || error_text.contains("replication configuration is not found")
2763                    || error_text.contains("replication not found") =>
2764            {
2765                return Ok(None);
2766            }
2767            Err(error) => {
2768                return Err(Error::General(format!("get_bucket_replication: {error}")));
2769            }
2770        };
2771
2772        parse_replication_configuration_xml(&body).map(Some)
2773    }
2774
2775    async fn set_bucket_replication(
2776        &self,
2777        bucket: &str,
2778        config: ReplicationConfiguration,
2779    ) -> Result<()> {
2780        let url = self.replication_url(bucket)?;
2781        let body = build_replication_configuration_xml(&config).into_bytes();
2782        self.xml_request(Method::PUT, url, Some("application/xml"), Some(body))
2783            .await
2784            .map_err(|e| Error::General(format!("set_bucket_replication: {e}")))?;
2785
2786        Ok(())
2787    }
2788
2789    async fn delete_bucket_replication(&self, bucket: &str) -> Result<()> {
2790        self.inner
2791            .delete_bucket_replication()
2792            .bucket(bucket)
2793            .send()
2794            .await
2795            .map_err(|e| {
2796                Error::General(format!(
2797                    "delete_bucket_replication: {}",
2798                    Self::format_sdk_error(&e)
2799                ))
2800            })?;
2801        Ok(())
2802    }
2803
2804    async fn select_object_content(
2805        &self,
2806        path: &RemotePath,
2807        options: &SelectOptions,
2808        writer: &mut (dyn AsyncWrite + Send + Unpin),
2809    ) -> Result<()> {
2810        crate::select::select_object_content(&self.inner, path, options, writer).await
2811    }
2812}
2813
2814#[cfg(test)]
2815mod tests {
2816    use super::*;
2817    use aws_smithy_http_client::test_util::{CaptureRequestReceiver, capture_request};
2818    use std::collections::HashMap;
2819
2820    fn test_s3_client(
2821        response: Option<http::Response<SdkBody>>,
2822    ) -> (S3Client, CaptureRequestReceiver) {
2823        test_s3_client_with_endpoint("https://example.com", response)
2824    }
2825
2826    fn test_s3_client_with_endpoint(
2827        endpoint: &str,
2828        response: Option<http::Response<SdkBody>>,
2829    ) -> (S3Client, CaptureRequestReceiver) {
2830        test_s3_client_with_endpoint_and_headers(endpoint, response, Vec::new())
2831    }
2832
2833    fn test_s3_client_with_endpoint_and_headers(
2834        endpoint: &str,
2835        response: Option<http::Response<SdkBody>>,
2836        request_headers: Vec<RequestHeader>,
2837    ) -> (S3Client, CaptureRequestReceiver) {
2838        let (http_client, request_receiver) = capture_request(response);
2839        let credentials = Credentials::new(
2840            "access-key",
2841            "secret-key",
2842            None,
2843            None,
2844            "rc-test-credentials",
2845        );
2846        let mut config_builder = aws_sdk_s3::config::Builder::new()
2847            .credentials_provider(credentials)
2848            .endpoint_url(endpoint)
2849            .region(aws_sdk_s3::config::Region::new("us-east-1"))
2850            .force_path_style(true)
2851            .behavior_version_latest()
2852            .http_client(http_client);
2853
2854        let presign_config = config_builder.clone().build();
2855
2856        if !request_headers.is_empty() {
2857            config_builder = config_builder.interceptor(CustomHeaderInterceptor {
2858                headers: request_headers.clone(),
2859            });
2860        }
2861
2862        let config = config_builder.build();
2863
2864        let alias = Alias::new("test", endpoint, "access-key", "secret-key");
2865        let client = S3Client {
2866            inner: aws_sdk_s3::Client::from_conf(config),
2867            presign_inner: aws_sdk_s3::Client::from_conf(presign_config),
2868            xml_http_client: reqwest::Client::new(),
2869            alias,
2870            request_headers,
2871        };
2872
2873        (client, request_receiver)
2874    }
2875
2876    #[test]
2877    fn test_object_info_creation() {
2878        let info = ObjectInfo::file("test.txt", 1024);
2879        assert_eq!(info.key, "test.txt");
2880        assert_eq!(info.size_bytes, Some(1024));
2881    }
2882
2883    #[test]
2884    fn auto_bucket_lookup_uses_dns_for_aliyun_oss_service_endpoint() {
2885        let alias = Alias::new(
2886            "aliyun",
2887            "https://oss-cn-hangzhou.aliyuncs.com",
2888            "access-key",
2889            "secret-key",
2890        );
2891
2892        assert!(!force_path_style_for_alias(&alias));
2893    }
2894
2895    #[test]
2896    fn auto_bucket_lookup_uses_dns_for_aliyun_internal_service_endpoint() {
2897        let alias = Alias::new(
2898            "aliyun",
2899            "https://oss-cn-hangzhou-internal.aliyuncs.com",
2900            "access-key",
2901            "secret-key",
2902        );
2903
2904        assert!(!force_path_style_for_alias(&alias));
2905    }
2906
2907    #[test]
2908    fn auto_bucket_lookup_keeps_path_style_for_custom_endpoint() {
2909        let alias = Alias::new("local", "http://localhost:9000", "access-key", "secret-key");
2910
2911        assert!(force_path_style_for_alias(&alias));
2912    }
2913
2914    #[test]
2915    fn auto_bucket_lookup_keeps_path_style_for_non_oss_aliyun_endpoint() {
2916        let alias = Alias::new(
2917            "aliyun",
2918            "https://ecs-cn-hangzhou.aliyuncs.com",
2919            "access-key",
2920            "secret-key",
2921        );
2922
2923        assert!(force_path_style_for_alias(&alias));
2924    }
2925
2926    #[test]
2927    fn auto_bucket_lookup_keeps_path_style_for_invalid_endpoint() {
2928        let alias = Alias::new("broken", "not a valid endpoint", "access-key", "secret-key");
2929
2930        assert!(force_path_style_for_alias(&alias));
2931    }
2932
2933    #[test]
2934    fn explicit_bucket_lookup_overrides_auto_detection() {
2935        let mut path_alias = Alias::new(
2936            "aliyun",
2937            "https://oss-cn-hangzhou.aliyuncs.com",
2938            "access-key",
2939            "secret-key",
2940        );
2941        path_alias.bucket_lookup = "path".to_string();
2942
2943        let mut dns_alias =
2944            Alias::new("local", "http://localhost:9000", "access-key", "secret-key");
2945        dns_alias.bucket_lookup = "dns".to_string();
2946
2947        assert!(force_path_style_for_alias(&path_alias));
2948        assert!(!force_path_style_for_alias(&dns_alias));
2949    }
2950
2951    #[test]
2952    fn unknown_bucket_lookup_keeps_path_style() {
2953        let mut alias = Alias::new(
2954            "aliyun",
2955            "https://oss-cn-hangzhou.aliyuncs.com",
2956            "access-key",
2957            "secret-key",
2958        );
2959        alias.bucket_lookup = "unexpected".to_string();
2960
2961        assert!(force_path_style_for_alias(&alias));
2962    }
2963
2964    #[test]
2965    fn parse_replication_configuration_xml_reads_delete_replication() {
2966        let body = r#"<?xml version="1.0" encoding="UTF-8"?>
2967<ReplicationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
2968  <Role>arn:rustfs:replication:us-east-1:123:test</Role>
2969  <Rule>
2970    <Status>Enabled</Status>
2971    <Destination>
2972      <Bucket>arn:rustfs:replication:us-east-1:123:dest</Bucket>
2973      <StorageClass>STANDARD</StorageClass>
2974    </Destination>
2975    <ID>rule-1</ID>
2976    <Priority>1</Priority>
2977    <Filter>
2978      <Prefix>logs/</Prefix>
2979    </Filter>
2980    <ExistingObjectReplication>
2981      <Status>Enabled</Status>
2982    </ExistingObjectReplication>
2983    <DeleteMarkerReplication>
2984      <Status>Disabled</Status>
2985    </DeleteMarkerReplication>
2986    <DeleteReplication>
2987      <Status>Enabled</Status>
2988    </DeleteReplication>
2989  </Rule>
2990</ReplicationConfiguration>"#;
2991
2992        let config = parse_replication_configuration_xml(body).expect("parse replication xml");
2993        assert_eq!(config.role, "arn:rustfs:replication:us-east-1:123:test");
2994        assert_eq!(config.rules.len(), 1);
2995        assert_eq!(config.rules[0].id, "rule-1");
2996        assert_eq!(config.rules[0].prefix.as_deref(), Some("logs/"));
2997        assert_eq!(config.rules[0].delete_replication, Some(true));
2998        assert_eq!(config.rules[0].delete_marker_replication, Some(false));
2999        assert_eq!(config.rules[0].existing_object_replication, Some(true));
3000    }
3001
3002    #[test]
3003    fn parse_replication_configuration_xml_preserves_tag_filters() {
3004        let body = r#"<?xml version="1.0" encoding="UTF-8"?>
3005<ReplicationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
3006  <Rule>
3007    <Status>Enabled</Status>
3008    <Destination>
3009      <Bucket>arn:rustfs:replication:us-east-1:123:dest</Bucket>
3010    </Destination>
3011    <ID>tagged-rule</ID>
3012    <Priority>2</Priority>
3013    <Filter>
3014      <And>
3015        <Prefix>logs/</Prefix>
3016        <Tag>
3017          <Key>env</Key>
3018          <Value>prod</Value>
3019        </Tag>
3020        <Tag>
3021          <Key>team</Key>
3022          <Value>core</Value>
3023        </Tag>
3024      </And>
3025    </Filter>
3026  </Rule>
3027</ReplicationConfiguration>"#;
3028
3029        let config = parse_replication_configuration_xml(body).expect("parse replication xml");
3030        let rule = &config.rules[0];
3031        assert_eq!(rule.prefix.as_deref(), Some("logs/"));
3032        let tags = rule.tags.as_ref().expect("tag filters");
3033        assert_eq!(tags.get("env").map(String::as_str), Some("prod"));
3034        assert_eq!(tags.get("team").map(String::as_str), Some("core"));
3035    }
3036
3037    #[test]
3038    fn build_replication_configuration_xml_writes_delete_replication() {
3039        let config = ReplicationConfiguration {
3040            role: "arn:rustfs:replication:us-east-1:123:test".to_string(),
3041            rules: vec![rc_core::ReplicationRule {
3042                id: "rule-1".to_string(),
3043                priority: 1,
3044                status: rc_core::ReplicationRuleStatus::Enabled,
3045                prefix: Some("logs/".to_string()),
3046                tags: None,
3047                destination: rc_core::ReplicationDestination {
3048                    bucket_arn: "arn:rustfs:replication:us-east-1:123:dest".to_string(),
3049                    storage_class: Some("STANDARD".to_string()),
3050                },
3051                delete_marker_replication: Some(true),
3052                existing_object_replication: Some(true),
3053                delete_replication: Some(true),
3054            }],
3055        };
3056
3057        let xml = build_replication_configuration_xml(&config);
3058        assert!(xml.contains("<DeleteReplication><Status>Enabled</Status></DeleteReplication>"));
3059        assert!(xml.contains(
3060            "<ExistingObjectReplication><Status>Enabled</Status></ExistingObjectReplication>"
3061        ));
3062        assert!(xml.contains(
3063            "<DeleteMarkerReplication><Status>Enabled</Status></DeleteMarkerReplication>"
3064        ));
3065        assert!(xml.contains("<Filter><Prefix>logs/</Prefix></Filter>"));
3066    }
3067
3068    #[test]
3069    fn build_replication_configuration_xml_writes_and_tag_filters() {
3070        let mut tags = HashMap::new();
3071        tags.insert("env".to_string(), "prod".to_string());
3072        tags.insert("team".to_string(), "core".to_string());
3073
3074        let config = ReplicationConfiguration {
3075            role: String::new(),
3076            rules: vec![rc_core::ReplicationRule {
3077                id: "rule-1".to_string(),
3078                priority: 1,
3079                status: rc_core::ReplicationRuleStatus::Enabled,
3080                prefix: Some("logs/".to_string()),
3081                tags: Some(tags),
3082                destination: rc_core::ReplicationDestination {
3083                    bucket_arn: "arn:rustfs:replication:us-east-1:123:dest".to_string(),
3084                    storage_class: None,
3085                },
3086                delete_marker_replication: None,
3087                existing_object_replication: None,
3088                delete_replication: None,
3089            }],
3090        };
3091
3092        let xml = build_replication_configuration_xml(&config);
3093        assert!(xml.contains("<Filter><And><Prefix>logs/</Prefix>"));
3094        assert!(xml.contains("<Tag><Key>env</Key><Value>prod</Value></Tag>"));
3095        assert!(xml.contains("<Tag><Key>team</Key><Value>core</Value></Tag>"));
3096    }
3097
3098    #[test]
3099    fn build_lifecycle_rule_filter_preserves_prefix_and_tags() {
3100        let mut tags = HashMap::new();
3101        tags.insert("env".to_string(), "prod".to_string());
3102        tags.insert("team".to_string(), "core".to_string());
3103
3104        let filter = build_lifecycle_rule_filter(Some("logs/"), Some(&tags))
3105            .expect("build lifecycle filter")
3106            .expect("lifecycle filter");
3107
3108        assert_eq!(
3109            parse_lifecycle_filter_prefix(Some(&filter)).as_deref(),
3110            Some("logs/")
3111        );
3112        let parsed_tags = parse_lifecycle_filter_tags(Some(&filter)).expect("parsed tags");
3113        assert_eq!(parsed_tags.get("env").map(String::as_str), Some("prod"));
3114        assert_eq!(parsed_tags.get("team").map(String::as_str), Some("core"));
3115    }
3116
3117    #[test]
3118    fn bucket_policy_error_kind_uses_error_code() {
3119        assert_eq!(
3120            S3Client::bucket_policy_error_kind(Some("NoSuchBucketPolicy"), Some(404), ""),
3121            BucketPolicyErrorKind::MissingPolicy
3122        );
3123        assert_eq!(
3124            S3Client::bucket_policy_error_kind(Some("NoSuchBucket"), Some(404), ""),
3125            BucketPolicyErrorKind::MissingBucket
3126        );
3127    }
3128
3129    #[test]
3130    fn bucket_policy_error_kind_prefers_bucket_not_found_over_404_fallback() {
3131        assert_eq!(
3132            S3Client::bucket_policy_error_kind(None, Some(404), "NoSuchBucket"),
3133            BucketPolicyErrorKind::MissingBucket
3134        );
3135        assert_eq!(
3136            S3Client::bucket_policy_error_kind(None, Some(404), "no details"),
3137            BucketPolicyErrorKind::MissingPolicy
3138        );
3139    }
3140
3141    #[test]
3142    fn bucket_policy_error_mapping_returns_expected_result() {
3143        let get_missing_policy = S3Client::map_get_bucket_policy_error(
3144            "bucket",
3145            BucketPolicyErrorKind::MissingPolicy,
3146            "NoSuchPolicy",
3147        )
3148        .expect("missing policy should map to Ok(None)");
3149        assert!(get_missing_policy.is_none());
3150
3151        match S3Client::map_get_bucket_policy_error(
3152            "bucket",
3153            BucketPolicyErrorKind::MissingBucket,
3154            "NoSuchBucket",
3155        ) {
3156            Err(Error::NotFound(message)) => assert!(message.contains("Bucket not found")),
3157            other => panic!("Expected NotFound for missing bucket, got: {:?}", other),
3158        }
3159
3160        let delete_missing_policy = S3Client::map_delete_bucket_policy_error(
3161            "bucket",
3162            BucketPolicyErrorKind::MissingPolicy,
3163            "NoSuchPolicy",
3164        );
3165        assert!(
3166            delete_missing_policy.is_ok(),
3167            "Missing policy should be treated as successful delete"
3168        );
3169    }
3170
3171    #[test]
3172    fn notification_filter_round_trip_prefix_and_suffix() {
3173        let filter = S3Client::build_notification_filter(Some("logs/"), Some(".json"))
3174            .expect("filter should be built");
3175        let (prefix, suffix) = S3Client::extract_notification_filter(Some(&filter));
3176        assert_eq!(prefix.as_deref(), Some("logs/"));
3177        assert_eq!(suffix.as_deref(), Some(".json"));
3178    }
3179
3180    #[test]
3181    fn notification_filter_none_when_empty() {
3182        assert!(S3Client::build_notification_filter(None, None).is_none());
3183    }
3184
3185    #[test]
3186    fn notifications_equivalent_ignores_order_and_duplicate_events() {
3187        let expected = vec![
3188            BucketNotification {
3189                id: Some("a".to_string()),
3190                target: NotificationTarget::Queue,
3191                arn: "arn:aws:sqs:us-east-1:123456789012:q".to_string(),
3192                events: vec![
3193                    "s3:ObjectCreated:*".to_string(),
3194                    "s3:ObjectCreated:*".to_string(),
3195                ],
3196                prefix: Some("images/".to_string()),
3197                suffix: Some(".jpg".to_string()),
3198            },
3199            BucketNotification {
3200                id: Some("b".to_string()),
3201                target: NotificationTarget::Topic,
3202                arn: "arn:aws:sns:us-east-1:123456789012:t".to_string(),
3203                events: vec!["s3:ObjectRemoved:*".to_string()],
3204                prefix: None,
3205                suffix: None,
3206            },
3207        ];
3208
3209        let actual = vec![
3210            BucketNotification {
3211                id: None,
3212                target: NotificationTarget::Topic,
3213                arn: "arn:aws:sns:us-east-1:123456789012:t".to_string(),
3214                events: vec!["s3:ObjectRemoved:*".to_string()],
3215                prefix: None,
3216                suffix: None,
3217            },
3218            BucketNotification {
3219                id: None,
3220                target: NotificationTarget::Queue,
3221                arn: "arn:aws:sqs:us-east-1:123456789012:q".to_string(),
3222                events: vec!["s3:ObjectCreated:*".to_string()],
3223                prefix: Some("images/".to_string()),
3224                suffix: Some(".jpg".to_string()),
3225            },
3226        ];
3227
3228        assert!(S3Client::notifications_equivalent(&expected, &actual));
3229    }
3230
3231    #[test]
3232    fn sdk_cors_rule_to_core_preserves_optional_fields() {
3233        let sdk_rule = aws_sdk_s3::types::CorsRule::builder()
3234            .id("web-app")
3235            .allowed_origins("https://app.example.com")
3236            .allowed_methods("get")
3237            .allowed_headers("Authorization")
3238            .expose_headers("ETag")
3239            .max_age_seconds(300)
3240            .build()
3241            .expect("build cors rule");
3242
3243        let rule = sdk_cors_rule_to_core(&sdk_rule);
3244        assert_eq!(rule.id.as_deref(), Some("web-app"));
3245        assert_eq!(
3246            rule.allowed_origins,
3247            vec!["https://app.example.com".to_string()]
3248        );
3249        assert_eq!(rule.allowed_methods, vec!["get".to_string()]);
3250        assert_eq!(
3251            rule.allowed_headers,
3252            Some(vec!["Authorization".to_string()])
3253        );
3254        assert_eq!(rule.expose_headers, Some(vec!["ETag".to_string()]));
3255        assert_eq!(rule.max_age_seconds, Some(300));
3256    }
3257
3258    #[test]
3259    fn sdk_cors_rule_to_core_drops_empty_optional_headers() {
3260        let sdk_rule = aws_sdk_s3::types::CorsRule::builder()
3261            .allowed_origins("https://app.example.com")
3262            .allowed_methods("GET")
3263            .build()
3264            .expect("build cors rule");
3265
3266        let rule = sdk_cors_rule_to_core(&sdk_rule);
3267        assert_eq!(rule.allowed_headers, None);
3268        assert_eq!(rule.expose_headers, None);
3269    }
3270
3271    #[test]
3272    fn core_cors_rule_to_sdk_normalizes_method_case() {
3273        let rule = CorsRule {
3274            id: Some("public-read".to_string()),
3275            allowed_origins: vec!["*".to_string()],
3276            allowed_methods: vec!["get".to_string(), "post".to_string()],
3277            allowed_headers: Some(vec!["*".to_string()]),
3278            expose_headers: None,
3279            max_age_seconds: Some(600),
3280        };
3281
3282        let sdk_rule = core_cors_rule_to_sdk(&rule).expect("convert cors rule");
3283        assert_eq!(sdk_rule.id(), Some("public-read"));
3284        assert_eq!(sdk_rule.allowed_origins(), ["*"]);
3285        assert_eq!(sdk_rule.allowed_methods(), ["GET", "POST"]);
3286        assert_eq!(sdk_rule.allowed_headers(), ["*"]);
3287        assert_eq!(sdk_rule.max_age_seconds(), Some(600));
3288    }
3289
3290    #[tokio::test]
3291    async fn set_bucket_cors_sends_rule_fields() {
3292        let response = http::Response::builder()
3293            .status(200)
3294            .body(SdkBody::from(""))
3295            .expect("build put bucket cors response");
3296        let (client, request_receiver) = test_s3_client(Some(response));
3297
3298        client
3299            .set_bucket_cors(
3300                "bucket",
3301                vec![CorsRule {
3302                    id: Some("web-app".to_string()),
3303                    allowed_origins: vec!["https://app.example.com".to_string()],
3304                    allowed_methods: vec!["GET".to_string(), "POST".to_string()],
3305                    allowed_headers: Some(vec!["Authorization".to_string()]),
3306                    expose_headers: Some(vec!["ETag".to_string()]),
3307                    max_age_seconds: Some(600),
3308                }],
3309            )
3310            .await
3311            .expect("set bucket cors");
3312
3313        let request = request_receiver.expect_request();
3314        assert_eq!(request.method(), http::Method::PUT);
3315        assert!(
3316            request.uri().to_string().contains("?cors"),
3317            "expected bucket CORS subresource in URI: {}",
3318            request.uri()
3319        );
3320
3321        let body = request.body().bytes().expect("request body bytes");
3322        let body = std::str::from_utf8(body).expect("request body is utf8");
3323        assert!(body.contains("<ID>web-app</ID>"));
3324        assert!(body.contains("<AllowedOrigin>https://app.example.com</AllowedOrigin>"));
3325        assert!(body.contains("<AllowedMethod>GET</AllowedMethod>"));
3326        assert!(body.contains("<AllowedMethod>POST</AllowedMethod>"));
3327        assert!(body.contains("<AllowedHeader>Authorization</AllowedHeader>"));
3328        assert!(body.contains("<ExposeHeader>ETag</ExposeHeader>"));
3329        assert!(body.contains("<MaxAgeSeconds>600</MaxAgeSeconds>"));
3330    }
3331
3332    #[test]
3333    fn core_cors_rule_to_sdk_drops_empty_optional_headers() {
3334        let rule = CorsRule {
3335            id: None,
3336            allowed_origins: vec!["https://app.example.com".to_string()],
3337            allowed_methods: vec!["GET".to_string()],
3338            allowed_headers: Some(Vec::new()),
3339            expose_headers: Some(Vec::new()),
3340            max_age_seconds: None,
3341        };
3342
3343        let sdk_rule = core_cors_rule_to_sdk(&rule).expect("convert cors rule");
3344        assert!(sdk_rule.allowed_headers().is_empty());
3345        assert!(sdk_rule.expose_headers().is_empty());
3346    }
3347
3348    #[test]
3349    fn parse_cors_configuration_xml_round_trips_rule_fields() {
3350        let body = r#"
3351<CORSConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
3352  <CORSRule>
3353    <ID>mc-rule</ID>
3354    <AllowedOrigin>https://console.example.com</AllowedOrigin>
3355    <AllowedMethod>GET</AllowedMethod>
3356    <AllowedMethod>POST</AllowedMethod>
3357    <AllowedHeader>*</AllowedHeader>
3358    <ExposeHeader>ETag</ExposeHeader>
3359    <MaxAgeSeconds>1200</MaxAgeSeconds>
3360  </CORSRule>
3361</CORSConfiguration>
3362"#;
3363
3364        let rules = parse_cors_configuration_xml(body).expect("parse cors xml");
3365        assert_eq!(rules.len(), 1);
3366        assert_eq!(rules[0].id.as_deref(), Some("mc-rule"));
3367        assert_eq!(
3368            rules[0].allowed_origins,
3369            vec!["https://console.example.com".to_string()]
3370        );
3371        assert_eq!(
3372            rules[0].allowed_methods,
3373            vec!["GET".to_string(), "POST".to_string()]
3374        );
3375        assert_eq!(rules[0].allowed_headers, Some(vec!["*".to_string()]));
3376        assert_eq!(rules[0].expose_headers, Some(vec!["ETag".to_string()]));
3377        assert_eq!(rules[0].max_age_seconds, Some(1200));
3378    }
3379
3380    #[test]
3381    fn cors_url_uses_path_style_bucket_and_query() {
3382        let (client, _) = test_s3_client(None);
3383
3384        let url = client.cors_url("bucket-name").expect("build cors url");
3385
3386        assert_eq!(url.as_str(), "https://example.com/bucket-name?cors=");
3387    }
3388
3389    #[test]
3390    fn cors_url_rejects_endpoints_without_path_segments() {
3391        let (client, _) = test_s3_client_with_endpoint("mailto:test@example.com", None);
3392
3393        match client.cors_url("bucket-name") {
3394            Err(Error::Network(message)) => {
3395                assert!(message.contains("does not support path-style bucket operations"));
3396            }
3397            other => panic!("expected path-style endpoint error, got {other:?}"),
3398        }
3399    }
3400
3401    #[test]
3402    fn missing_cors_configuration_errors_are_detected() {
3403        assert!(is_missing_cors_configuration_error(
3404            "NoSuchCORSConfiguration"
3405        ));
3406        assert!(is_missing_cors_configuration_error(
3407            "The CORS configuration does not exist"
3408        ));
3409        assert!(!is_missing_cors_configuration_error("AccessDenied"));
3410    }
3411
3412    #[test]
3413    fn missing_cors_configuration_response_detects_code_and_status() {
3414        assert!(is_missing_cors_configuration_response(
3415            Some("NoSuchCORSConfiguration"),
3416            Some(404),
3417            "service error"
3418        ));
3419        assert!(is_missing_cors_configuration_response(
3420            None,
3421            Some(404),
3422            "The CORS configuration does not exist"
3423        ));
3424        assert!(!is_missing_cors_configuration_response(
3425            Some("AccessDenied"),
3426            Some(403),
3427            "access denied"
3428        ));
3429        assert!(!is_missing_cors_configuration_response(
3430            None,
3431            Some(404),
3432            "service error"
3433        ));
3434        assert!(!is_missing_cors_configuration_response(
3435            Some("NoSuchBucket"),
3436            Some(404),
3437            "NoSuchBucket"
3438        ));
3439        assert!(is_missing_cors_configuration_response(
3440            None,
3441            None,
3442            "The CORS configuration does not exist"
3443        ));
3444        assert!(!is_missing_cors_configuration_response(
3445            None,
3446            Some(500),
3447            "The CORS configuration does not exist"
3448        ));
3449    }
3450
3451    #[tokio::test]
3452    async fn get_bucket_cors_missing_configuration_returns_empty_rules() {
3453        let response = http::Response::builder()
3454            .status(404)
3455            .header("x-amz-error-code", "NoSuchCORSConfiguration")
3456            .body(SdkBody::from(
3457                r#"<?xml version="1.0" encoding="UTF-8"?>
3458<Error>
3459  <Code>NoSuchCORSConfiguration</Code>
3460  <Message>The CORS configuration does not exist</Message>
3461</Error>"#,
3462            ))
3463            .expect("build missing cors response");
3464        let (client, _) = test_s3_client(Some(response));
3465
3466        let rules = client
3467            .get_bucket_cors("bucket")
3468            .await
3469            .expect("missing cors config should be treated as empty");
3470
3471        assert!(rules.is_empty());
3472    }
3473
3474    #[tokio::test]
3475    async fn delete_bucket_cors_missing_configuration_is_successful() {
3476        let response = http::Response::builder()
3477            .status(404)
3478            .header("x-amz-error-code", "NoSuchCORSConfiguration")
3479            .body(SdkBody::from(
3480                r#"<?xml version="1.0" encoding="UTF-8"?>
3481<Error>
3482  <Code>NoSuchCORSConfiguration</Code>
3483  <Message>The CORS configuration does not exist</Message>
3484</Error>"#,
3485            ))
3486            .expect("build missing cors response");
3487        let (client, _) = test_s3_client(Some(response));
3488
3489        client
3490            .delete_bucket_cors("bucket")
3491            .await
3492            .expect("missing cors config should be treated as successful delete");
3493    }
3494
3495    #[tokio::test]
3496    async fn reqwest_connector_insecure_without_ca_bundle_succeeds() {
3497        // When insecure is true and no CA bundle is provided, the connector should be created.
3498        let connector = ReqwestConnector::new(true, None).await;
3499        assert!(
3500            connector.is_ok(),
3501            "Expected insecure connector creation to succeed"
3502        );
3503    }
3504
3505    #[tokio::test]
3506    async fn reqwest_connector_invalid_ca_bundle_path_surfaces_error() {
3507        // Use an obviously invalid path (empty string) to trigger a read error.
3508        let result = ReqwestConnector::new(false, Some("")).await;
3509        match result {
3510            Err(Error::Network(msg)) => {
3511                assert!(
3512                    msg.contains("Failed to read CA bundle"),
3513                    "Unexpected error message: {msg}"
3514                );
3515            }
3516            other => panic!("Expected Error::Network for invalid path, got: {:?}", other),
3517        }
3518    }
3519
3520    #[test]
3521    fn should_use_multipart_for_large_files() {
3522        assert!(S3Client::should_use_multipart(
3523            SINGLE_PUT_OBJECT_MAX_SIZE + 1
3524        ));
3525    }
3526
3527    #[test]
3528    fn should_use_single_part_for_small_files() {
3529        assert!(!S3Client::should_use_multipart(0));
3530        assert!(!S3Client::should_use_multipart(1024 * 1024));
3531        assert!(!S3Client::should_use_multipart(
3532            crate::multipart::DEFAULT_PART_SIZE
3533        ));
3534        assert!(!S3Client::should_use_multipart(SINGLE_PUT_OBJECT_MAX_SIZE));
3535    }
3536
3537    #[tokio::test]
3538    async fn delete_object_with_force_delete_sets_rustfs_header() {
3539        let (client, request_receiver) = test_s3_client(None);
3540        let path = RemotePath::new("test", "bucket", "key.txt");
3541
3542        let _ = client
3543            .delete_object_with_options(&path, DeleteRequestOptions { force_delete: true })
3544            .await;
3545
3546        let request = request_receiver.expect_request();
3547        assert_eq!(request.headers().get("x-rustfs-force-delete"), Some("true"));
3548    }
3549
3550    #[tokio::test]
3551    async fn custom_headers_are_added_before_sending_sdk_requests() {
3552        let (client, request_receiver) = test_s3_client_with_endpoint_and_headers(
3553            "https://example.com",
3554            None,
3555            vec![RequestHeader {
3556                name: "x-amz-bucket-encrypt-enabled".to_string(),
3557                value: "1".to_string(),
3558            }],
3559        );
3560        let path = RemotePath::new("test", "bucket", "key.txt");
3561
3562        let _ = client.delete_object(&path).await;
3563
3564        let request = request_receiver.expect_request();
3565        assert_eq!(
3566            request.headers().get("x-amz-bucket-encrypt-enabled"),
3567            Some("1")
3568        );
3569        assert!(
3570            request
3571                .headers()
3572                .get("authorization")
3573                .expect("authorization header")
3574                .contains("x-amz-bucket-encrypt-enabled")
3575        );
3576    }
3577
3578    #[tokio::test]
3579    async fn custom_headers_are_not_required_by_presigned_urls() {
3580        let (client, _request_receiver) = test_s3_client_with_endpoint_and_headers(
3581            "https://example.com",
3582            None,
3583            vec![RequestHeader {
3584                name: "x-amz-bucket-encrypt-enabled".to_string(),
3585                value: "1".to_string(),
3586            }],
3587        );
3588        let path = RemotePath::new("test", "bucket", "key.txt");
3589
3590        let url = client
3591            .presign_get(&path, 3600)
3592            .await
3593            .expect("presign get should succeed");
3594
3595        assert!(!url.contains("x-amz-bucket-encrypt-enabled"));
3596    }
3597
3598    #[tokio::test]
3599    async fn delete_object_without_force_delete_omits_rustfs_header() {
3600        let (client, request_receiver) = test_s3_client(None);
3601        let path = RemotePath::new("test", "bucket", "key.txt");
3602
3603        let _ = client
3604            .delete_object_with_options(&path, DeleteRequestOptions::default())
3605            .await;
3606
3607        let request = request_receiver.expect_request();
3608        assert!(request.headers().get("x-rustfs-force-delete").is_none());
3609    }
3610
3611    #[tokio::test]
3612    async fn delete_object_wrapper_uses_default_options_without_rustfs_header() {
3613        let (client, request_receiver) = test_s3_client(None);
3614        let path = RemotePath::new("test", "bucket", "key.txt");
3615
3616        let _ = ObjectStore::delete_object(&client, &path).await;
3617
3618        let request = request_receiver.expect_request();
3619        assert!(request.headers().get("x-rustfs-force-delete").is_none());
3620    }
3621
3622    #[tokio::test]
3623    async fn delete_object_with_options_maps_missing_keys_to_not_found() {
3624        let response = http::Response::builder()
3625            .status(404)
3626            .header("x-amz-error-code", "NoSuchKey")
3627            .body(SdkBody::from(
3628                r#"<?xml version="1.0" encoding="UTF-8"?>
3629<Error>
3630  <Code>NoSuchKey</Code>
3631  <Message>The specified key does not exist.</Message>
3632</Error>"#,
3633            ))
3634            .expect("build delete object response");
3635        let (client, _request_receiver) = test_s3_client(Some(response));
3636        let path = RemotePath::new("test", "bucket", "missing.txt");
3637
3638        let result = client
3639            .delete_object_with_options(&path, DeleteRequestOptions::default())
3640            .await;
3641
3642        match result {
3643            Err(Error::NotFound(message)) => assert_eq!(message, path.to_string()),
3644            other => panic!("Expected NotFound for missing key, got: {other:?}"),
3645        }
3646    }
3647
3648    #[tokio::test]
3649    async fn delete_object_with_options_maps_other_failures_to_network() {
3650        let response = http::Response::builder()
3651            .status(500)
3652            .header("x-amz-error-code", "InternalError")
3653            .body(SdkBody::from(
3654                r#"<?xml version="1.0" encoding="UTF-8"?>
3655<Error>
3656  <Code>InternalError</Code>
3657  <Message>Something went wrong.</Message>
3658</Error>"#,
3659            ))
3660            .expect("build delete object response");
3661        let (client, _request_receiver) = test_s3_client(Some(response));
3662        let path = RemotePath::new("test", "bucket", "key.txt");
3663
3664        let result = client
3665            .delete_object_with_options(&path, DeleteRequestOptions::default())
3666            .await;
3667
3668        match result {
3669            Err(Error::Network(message)) => assert!(message.contains("InternalError")),
3670            other => panic!("Expected Network for delete failure, got: {other:?}"),
3671        }
3672    }
3673
3674    #[tokio::test]
3675    async fn list_object_versions_page_preserves_markers_and_delete_markers() {
3676        let response = http::Response::builder()
3677            .status(200)
3678            .body(SdkBody::from(
3679                r#"<?xml version="1.0" encoding="UTF-8"?>
3680<ListVersionsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
3681  <Name>bucket</Name>
3682  <Prefix>logs/</Prefix>
3683  <KeyMarker></KeyMarker>
3684  <VersionIdMarker></VersionIdMarker>
3685  <NextKeyMarker>logs/c.txt</NextKeyMarker>
3686  <NextVersionIdMarker>v3</NextVersionIdMarker>
3687  <MaxKeys>25</MaxKeys>
3688  <IsTruncated>true</IsTruncated>
3689  <Version>
3690    <Key>logs/a.txt</Key>
3691    <VersionId>v1</VersionId>
3692    <IsLatest>true</IsLatest>
3693    <LastModified>2026-04-29T11:22:33.000Z</LastModified>
3694    <ETag>"etag-a"</ETag>
3695    <Size>12</Size>
3696    <StorageClass>STANDARD</StorageClass>
3697  </Version>
3698  <DeleteMarker>
3699    <Key>logs/b.txt</Key>
3700    <VersionId>v2</VersionId>
3701    <IsLatest>false</IsLatest>
3702    <LastModified>2026-04-28T10:20:30.000Z</LastModified>
3703    <Owner>
3704      <ID>owner</ID>
3705    </Owner>
3706  </DeleteMarker>
3707</ListVersionsResult>"#,
3708            ))
3709            .expect("build list object versions response");
3710        let (client, request_receiver) = test_s3_client(Some(response));
3711        let path = RemotePath::new("test", "bucket", "logs/");
3712
3713        let result = client
3714            .list_object_versions_page(&path, Some(25))
3715            .await
3716            .expect("list object versions page");
3717
3718        let request = request_receiver.expect_request();
3719        let uri = request.uri().to_string();
3720        assert!(
3721            uri.starts_with("https://example.com/bucket/?"),
3722            "unexpected URI: {uri}"
3723        );
3724        assert!(
3725            uri.contains("versions"),
3726            "expected versions subresource: {uri}"
3727        );
3728        assert!(
3729            uri.contains("prefix=logs%2F"),
3730            "expected prefix query: {uri}"
3731        );
3732        assert!(
3733            uri.contains("max-keys=25"),
3734            "expected max-keys query: {uri}"
3735        );
3736
3737        assert!(result.truncated);
3738        assert_eq!(result.continuation_token.as_deref(), Some("logs/c.txt"));
3739        assert_eq!(result.version_id_marker.as_deref(), Some("v3"));
3740        assert_eq!(result.items.len(), 2);
3741
3742        let version = &result.items[0];
3743        assert_eq!(version.key, "logs/a.txt");
3744        assert_eq!(version.version_id, "v1");
3745        assert!(version.is_latest);
3746        assert!(!version.is_delete_marker);
3747        assert_eq!(version.size_bytes, Some(12));
3748        assert_eq!(version.etag.as_deref(), Some("etag-a"));
3749
3750        let delete_marker = &result.items[1];
3751        assert_eq!(delete_marker.key, "logs/b.txt");
3752        assert_eq!(delete_marker.version_id, "v2");
3753        assert!(!delete_marker.is_latest);
3754        assert!(delete_marker.is_delete_marker);
3755        assert_eq!(delete_marker.size_bytes, None);
3756        assert_eq!(delete_marker.etag, None);
3757    }
3758
3759    #[tokio::test]
3760    async fn list_object_versions_page_maps_missing_bucket_to_not_found() {
3761        let response = http::Response::builder()
3762            .status(404)
3763            .header("x-amz-error-code", "NoSuchBucket")
3764            .body(SdkBody::from(
3765                r#"<?xml version="1.0" encoding="UTF-8"?>
3766<Error>
3767  <Code>NoSuchBucket</Code>
3768  <Message>The specified bucket does not exist.</Message>
3769</Error>"#,
3770            ))
3771            .expect("build missing bucket response");
3772        let (client, _request_receiver) = test_s3_client(Some(response));
3773        let path = RemotePath::new("test", "missing-bucket", "");
3774
3775        let result = client.list_object_versions_page(&path, Some(1000)).await;
3776
3777        match result {
3778            Err(Error::NotFound(message)) => {
3779                assert_eq!(message, "Bucket not found: missing-bucket")
3780            }
3781            other => panic!("Expected NotFound for missing bucket, got: {other:?}"),
3782        }
3783    }
3784
3785    #[tokio::test]
3786    async fn list_object_versions_page_maps_not_found_code_to_not_found() {
3787        let response = http::Response::builder()
3788            .status(404)
3789            .header("x-amz-error-code", "NotFound")
3790            .body(SdkBody::from(
3791                r#"<?xml version="1.0" encoding="UTF-8"?>
3792<Error>
3793  <Code>NotFound</Code>
3794  <Message>The specified bucket does not exist.</Message>
3795</Error>"#,
3796            ))
3797            .expect("build not found bucket response");
3798        let (client, _request_receiver) = test_s3_client(Some(response));
3799        let path = RemotePath::new("test", "missing-bucket", "");
3800
3801        let result = client.list_object_versions_page(&path, Some(1000)).await;
3802
3803        match result {
3804            Err(Error::NotFound(message)) => {
3805                assert_eq!(message, "Bucket not found: missing-bucket")
3806            }
3807            other => panic!("Expected NotFound for NotFound list versions error, got: {other:?}"),
3808        }
3809    }
3810
3811    #[tokio::test]
3812    async fn list_object_versions_page_maps_other_failures_to_network() {
3813        let response = http::Response::builder()
3814            .status(500)
3815            .header("x-amz-error-code", "InternalError")
3816            .body(SdkBody::from(
3817                r#"<?xml version="1.0" encoding="UTF-8"?>
3818<Error>
3819  <Code>InternalError</Code>
3820  <Message>Something went wrong.</Message>
3821</Error>"#,
3822            ))
3823            .expect("build internal error response");
3824        let (client, _request_receiver) = test_s3_client(Some(response));
3825        let path = RemotePath::new("test", "bucket", "");
3826
3827        let result = client.list_object_versions_page(&path, Some(1000)).await;
3828
3829        match result {
3830            Err(Error::Network(message)) => assert!(message.contains("InternalError")),
3831            other => panic!("Expected Network for list versions failure, got: {other:?}"),
3832        }
3833    }
3834
3835    #[tokio::test]
3836    async fn delete_bucket_maps_bucket_not_empty_to_conflict() {
3837        let response = http::Response::builder()
3838            .status(409)
3839            .header("x-amz-error-code", "BucketNotEmpty")
3840            .body(SdkBody::from(
3841                r#"<?xml version="1.0" encoding="UTF-8"?>
3842<Error>
3843  <Code>BucketNotEmpty</Code>
3844  <Message>The bucket you tried to delete is not empty.</Message>
3845</Error>"#,
3846            ))
3847            .expect("build delete bucket response");
3848        let (client, _request_receiver) = test_s3_client(Some(response));
3849
3850        let result = client.delete_bucket("bucket").await;
3851
3852        match result {
3853            Err(Error::Conflict(message)) => assert!(message.contains("BucketNotEmpty")),
3854            other => panic!("Expected Conflict for non-empty bucket, got: {other:?}"),
3855        }
3856    }
3857
3858    #[tokio::test]
3859    async fn delete_bucket_maps_missing_bucket_to_not_found() {
3860        let response = http::Response::builder()
3861            .status(404)
3862            .header("x-amz-error-code", "NoSuchBucket")
3863            .body(SdkBody::from(
3864                r#"<?xml version="1.0" encoding="UTF-8"?>
3865<Error>
3866  <Code>NoSuchBucket</Code>
3867  <Message>The specified bucket does not exist.</Message>
3868</Error>"#,
3869            ))
3870            .expect("build missing bucket response");
3871        let (client, _request_receiver) = test_s3_client(Some(response));
3872
3873        let result = client.delete_bucket("missing-bucket").await;
3874
3875        match result {
3876            Err(Error::NotFound(message)) => {
3877                assert_eq!(message, "Bucket not found: missing-bucket")
3878            }
3879            other => panic!("Expected NotFound for missing bucket, got: {other:?}"),
3880        }
3881    }
3882
3883    #[tokio::test]
3884    async fn delete_bucket_maps_not_found_code_to_not_found() {
3885        let response = http::Response::builder()
3886            .status(404)
3887            .header("x-amz-error-code", "NotFound")
3888            .body(SdkBody::from(
3889                r#"<?xml version="1.0" encoding="UTF-8"?>
3890<Error>
3891  <Code>NotFound</Code>
3892  <Message>The specified bucket does not exist.</Message>
3893</Error>"#,
3894            ))
3895            .expect("build not found bucket response");
3896        let (client, _request_receiver) = test_s3_client(Some(response));
3897
3898        let result = client.delete_bucket("missing-bucket").await;
3899
3900        match result {
3901            Err(Error::NotFound(message)) => {
3902                assert_eq!(message, "Bucket not found: missing-bucket")
3903            }
3904            other => panic!("Expected NotFound for NotFound delete bucket error, got: {other:?}"),
3905        }
3906    }
3907
3908    #[tokio::test]
3909    async fn delete_bucket_maps_other_failures_to_network() {
3910        let response = http::Response::builder()
3911            .status(500)
3912            .header("x-amz-error-code", "InternalError")
3913            .body(SdkBody::from(
3914                r#"<?xml version="1.0" encoding="UTF-8"?>
3915<Error>
3916  <Code>InternalError</Code>
3917  <Message>Something went wrong.</Message>
3918</Error>"#,
3919            ))
3920            .expect("build delete bucket response");
3921        let (client, _request_receiver) = test_s3_client(Some(response));
3922
3923        let result = client.delete_bucket("bucket").await;
3924
3925        match result {
3926            Err(Error::Network(message)) => assert!(message.contains("InternalError")),
3927            other => panic!("Expected Network for delete bucket failure, got: {other:?}"),
3928        }
3929    }
3930
3931    #[tokio::test]
3932    async fn delete_objects_with_force_delete_sets_rustfs_header() {
3933        let response = http::Response::builder()
3934            .status(200)
3935            .body(SdkBody::from(
3936                r#"<?xml version="1.0" encoding="UTF-8"?>
3937<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />"#,
3938            ))
3939            .expect("build delete objects response");
3940        let (client, request_receiver) = test_s3_client(Some(response));
3941
3942        let _ = client
3943            .delete_objects_with_options(
3944                "bucket",
3945                vec!["key.txt".to_string()],
3946                DeleteRequestOptions { force_delete: true },
3947            )
3948            .await;
3949
3950        let request = request_receiver.expect_request();
3951        assert_eq!(request.headers().get("x-rustfs-force-delete"), Some("true"));
3952    }
3953
3954    #[tokio::test]
3955    async fn delete_objects_without_force_delete_omits_rustfs_header() {
3956        let response = http::Response::builder()
3957            .status(200)
3958            .body(SdkBody::from(
3959                r#"<?xml version="1.0" encoding="UTF-8"?>
3960<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />"#,
3961            ))
3962            .expect("build delete objects response");
3963        let (client, request_receiver) = test_s3_client(Some(response));
3964
3965        let _ = client
3966            .delete_objects_with_options(
3967                "bucket",
3968                vec!["key.txt".to_string()],
3969                DeleteRequestOptions::default(),
3970            )
3971            .await;
3972
3973        let request = request_receiver.expect_request();
3974        assert!(request.headers().get("x-rustfs-force-delete").is_none());
3975    }
3976
3977    #[tokio::test]
3978    async fn delete_objects_wrapper_uses_default_options_without_rustfs_header() {
3979        let response = http::Response::builder()
3980            .status(200)
3981            .body(SdkBody::from(
3982                r#"<?xml version="1.0" encoding="UTF-8"?>
3983<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />"#,
3984            ))
3985            .expect("build delete objects response");
3986        let (client, request_receiver) = test_s3_client(Some(response));
3987
3988        let _ = ObjectStore::delete_objects(&client, "bucket", vec!["key.txt".to_string()]).await;
3989
3990        let request = request_receiver.expect_request();
3991        assert!(request.headers().get("x-rustfs-force-delete").is_none());
3992    }
3993
3994    #[tokio::test]
3995    async fn delete_objects_with_empty_keys_skips_http_request() {
3996        let (client, request_receiver) = test_s3_client(None);
3997
3998        let deleted = client
3999            .delete_objects_with_options("bucket", Vec::new(), DeleteRequestOptions::default())
4000            .await
4001            .expect("empty delete should succeed");
4002
4003        assert!(deleted.is_empty());
4004        request_receiver.expect_no_request();
4005    }
4006
4007    #[tokio::test]
4008    async fn delete_objects_with_partial_errors_returns_deleted_keys() {
4009        let response = http::Response::builder()
4010            .status(200)
4011            .body(SdkBody::from(
4012                r#"<?xml version="1.0" encoding="UTF-8"?>
4013<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
4014  <Deleted>
4015    <Key>kept.txt</Key>
4016  </Deleted>
4017  <Error>
4018    <Key>failed.txt</Key>
4019    <Code>AccessDenied</Code>
4020    <Message>Access Denied</Message>
4021  </Error>
4022</DeleteResult>"#,
4023            ))
4024            .expect("build partial delete response");
4025        let (client, request_receiver) = test_s3_client(Some(response));
4026
4027        let deleted = client
4028            .delete_objects_with_options(
4029                "bucket",
4030                vec!["kept.txt".to_string(), "failed.txt".to_string()],
4031                DeleteRequestOptions::default(),
4032            )
4033            .await
4034            .expect("partial delete should still return deleted keys");
4035
4036        let request = request_receiver.expect_request();
4037        assert_eq!(request.uri(), "https://example.com/bucket/?delete");
4038        assert_eq!(deleted, vec!["kept.txt".to_string()]);
4039    }
4040
4041    #[tokio::test]
4042    async fn read_next_part_fills_buffer_until_eof() {
4043        use tokio::io::AsyncWriteExt;
4044
4045        let temp_dir = tempfile::tempdir().expect("create temp dir");
4046        let file_path = temp_dir.path().join("payload.bin");
4047        let mut writer = tokio::fs::File::create(&file_path)
4048            .await
4049            .expect("create temp file");
4050        writer
4051            .write_all(b"abcdefghij")
4052            .await
4053            .expect("write temp file");
4054        writer.flush().await.expect("flush temp file");
4055        drop(writer);
4056
4057        let mut reader = tokio::fs::File::open(&file_path)
4058            .await
4059            .expect("open temp file");
4060        let mut buffer = vec![0u8; 8];
4061
4062        let first = S3Client::read_next_part(&mut reader, &file_path, &mut buffer)
4063            .await
4064            .expect("first read");
4065        assert_eq!(first, 8);
4066        assert_eq!(&buffer[..first], b"abcdefgh");
4067
4068        let second = S3Client::read_next_part(&mut reader, &file_path, &mut buffer)
4069            .await
4070            .expect("second read");
4071        assert_eq!(second, 2);
4072        assert_eq!(&buffer[..second], b"ij");
4073
4074        let third = S3Client::read_next_part(&mut reader, &file_path, &mut buffer)
4075            .await
4076            .expect("third read");
4077        assert_eq!(third, 0);
4078    }
4079}