Skip to main content

rc_s3/
client.rs

1//! S3 client implementation
2//!
3//! Wraps aws-sdk-s3 and implements the ObjectStore trait from rc-core.
4
5use async_trait::async_trait;
6use aws_credential_types::Credentials;
7use aws_sdk_s3::error::ProvideErrorMetadata;
8use aws_sigv4::http_request::{
9    SignableBody, SignableRequest, SignatureLocation, SigningSettings, sign,
10};
11use aws_sigv4::sign::v4;
12use aws_smithy_runtime_api::client::http::{
13    HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpConnector,
14};
15use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
16use aws_smithy_runtime_api::client::result::ConnectorError;
17use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
18use aws_smithy_runtime_api::http::{Response, StatusCode};
19use aws_smithy_types::body::SdkBody;
20use bytes::Bytes;
21use jiff::Timestamp;
22use quick_xml::de::from_str as from_xml_str;
23use rc_core::{
24    Alias, BucketNotification, Capabilities, CorsRule, Error, LifecycleRule, ListOptions,
25    ListResult, NotificationTarget, ObjectInfo, ObjectStore, ObjectVersion,
26    ObjectVersionListResult, RemotePath, ReplicationConfiguration, Result, SelectOptions,
27};
28use reqwest::Method;
29use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
30use serde::Deserialize;
31use sha2::{Digest, Sha256};
32use std::collections::HashMap;
33use tokio::io::AsyncReadExt;
34use tokio::io::AsyncWrite;
35
36/// Keep single-part uploads small to avoid backend incompatibilities with
37/// streaming aws-chunked payloads.
38const SINGLE_PUT_OBJECT_MAX_SIZE: u64 = crate::multipart::DEFAULT_PART_SIZE;
39const S3_SERVICE_NAME: &str = "s3";
40const S3_REPLICATION_XML_NAMESPACE: &str = "http://s3.amazonaws.com/doc/2006-03-01/";
41const RUSTFS_FORCE_DELETE_HEADER: &str = "x-rustfs-force-delete";
42
43#[derive(Debug, Clone, Copy, PartialEq, Eq)]
44enum BucketPolicyErrorKind {
45    MissingPolicy,
46    MissingBucket,
47    Other,
48}
49
50/// Custom HTTP connector using reqwest, supporting insecure TLS (skip cert verification)
51/// and custom CA bundles. Used when `alias.insecure = true` or `alias.ca_bundle.is_some()`.
52#[derive(Debug, Clone)]
53struct ReqwestConnector {
54    client: reqwest::Client,
55}
56
57impl ReqwestConnector {
58    async fn new(insecure: bool, ca_bundle: Option<&str>) -> Result<Self> {
59        let client = build_reqwest_client(insecure, ca_bundle).await?;
60        Ok(Self { client })
61    }
62}
63
64async fn build_reqwest_client(insecure: bool, ca_bundle: Option<&str>) -> Result<reqwest::Client> {
65    // NOTE: When `insecure = true`, `danger_accept_invalid_certs` disables all TLS
66    // certificate verification. Any CA bundle provided will still be added to the
67    // trust store but is rendered ineffective for this connection.
68    let mut builder = reqwest::Client::builder().danger_accept_invalid_certs(insecure);
69
70    if let Some(bundle_path) = ca_bundle {
71        // Use tokio::fs::read to avoid blocking the async runtime thread.
72        let pem = tokio::fs::read(bundle_path).await.map_err(|e| {
73            Error::Network(format!("Failed to read CA bundle '{bundle_path}': {e}"))
74        })?;
75        let cert = reqwest::Certificate::from_pem(&pem)
76            .map_err(|e| Error::Network(format!("Invalid CA bundle '{bundle_path}': {e}")))?;
77        builder = builder.add_root_certificate(cert);
78    }
79
80    let client = builder
81        .build()
82        .map_err(|e| Error::Network(format!("Failed to build HTTP client: {e}")))?;
83    Ok(client)
84}
85
86fn force_path_style_for_alias(alias: &Alias) -> bool {
87    match alias.bucket_lookup.as_str() {
88        "path" => true,
89        "dns" => false,
90        "auto" => !is_aliyun_oss_service_endpoint(&alias.endpoint),
91        _ => true,
92    }
93}
94
95fn is_aliyun_oss_service_endpoint(endpoint: &str) -> bool {
96    let Ok(url) = reqwest::Url::parse(endpoint.trim_end_matches('/')) else {
97        return false;
98    };
99
100    let Some(host) = url.host_str() else {
101        return false;
102    };
103
104    host.strip_suffix(".aliyuncs.com")
105        .and_then(|host| host.split('.').next())
106        .is_some_and(|first_label| first_label.starts_with("oss-"))
107}
108
109#[derive(Debug, Deserialize)]
110#[serde(rename_all = "PascalCase")]
111struct ReplicationConfigurationXml {
112    role: Option<String>,
113    #[serde(rename = "Rule", default)]
114    rules: Vec<ReplicationRuleXml>,
115}
116
117#[derive(Debug, Deserialize)]
118#[serde(rename_all = "PascalCase")]
119struct ReplicationRuleXml {
120    #[serde(rename = "ID")]
121    id: Option<String>,
122    priority: Option<i32>,
123    status: Option<String>,
124    #[serde(rename = "Prefix")]
125    legacy_prefix: Option<String>,
126    filter: Option<ReplicationFilterXml>,
127    destination: Option<ReplicationDestinationXml>,
128    delete_marker_replication: Option<ReplicationStatusXml>,
129    existing_object_replication: Option<ReplicationStatusXml>,
130    delete_replication: Option<ReplicationStatusXml>,
131}
132
133#[derive(Debug, Deserialize)]
134#[serde(rename_all = "PascalCase")]
135struct ReplicationFilterXml {
136    prefix: Option<String>,
137    tag: Option<TagXml>,
138    and: Option<ReplicationAndXml>,
139}
140
141#[derive(Debug, Deserialize)]
142#[serde(rename_all = "PascalCase")]
143struct ReplicationAndXml {
144    prefix: Option<String>,
145    #[serde(rename = "Tag", default)]
146    tags: Vec<TagXml>,
147}
148
149#[derive(Debug, Deserialize)]
150#[serde(rename_all = "PascalCase")]
151struct TagXml {
152    key: Option<String>,
153    value: Option<String>,
154}
155
156#[derive(Debug, Deserialize)]
157#[serde(rename_all = "PascalCase")]
158struct ReplicationDestinationXml {
159    bucket: Option<String>,
160    storage_class: Option<String>,
161}
162
163#[derive(Debug, Deserialize)]
164#[serde(rename_all = "PascalCase")]
165struct ReplicationStatusXml {
166    status: Option<String>,
167}
168
169#[derive(Debug, Deserialize)]
170#[serde(rename_all = "PascalCase")]
171struct CorsConfigurationXml {
172    #[serde(rename = "CORSRule", default)]
173    rules: Vec<CorsRuleXml>,
174}
175
176#[derive(Debug, Deserialize)]
177#[serde(rename_all = "PascalCase")]
178struct CorsRuleXml {
179    #[serde(rename = "ID")]
180    id: Option<String>,
181    #[serde(rename = "AllowedOrigin", default)]
182    allowed_origins: Vec<String>,
183    #[serde(rename = "AllowedMethod", default)]
184    allowed_methods: Vec<String>,
185    #[serde(rename = "AllowedHeader", default)]
186    allowed_headers: Vec<String>,
187    #[serde(rename = "ExposeHeader", default)]
188    expose_headers: Vec<String>,
189    max_age_seconds: Option<i32>,
190}
191
192fn parse_replication_status(status: Option<&ReplicationStatusXml>) -> Option<bool> {
193    status
194        .and_then(|value| value.status.as_deref())
195        .map(|value| value.eq_ignore_ascii_case("enabled"))
196}
197
198fn parse_replication_rule_status(status: Option<&str>) -> rc_core::ReplicationRuleStatus {
199    match status {
200        Some(value) if value.eq_ignore_ascii_case("enabled") => {
201            rc_core::ReplicationRuleStatus::Enabled
202        }
203        _ => rc_core::ReplicationRuleStatus::Disabled,
204    }
205}
206
207fn collect_tag_map<'a, I>(tags: I) -> Option<HashMap<String, String>>
208where
209    I: IntoIterator<Item = (&'a str, &'a str)>,
210{
211    let collected: HashMap<String, String> = tags
212        .into_iter()
213        .map(|(key, value)| (key.to_string(), value.to_string()))
214        .collect();
215    if collected.is_empty() {
216        None
217    } else {
218        Some(collected)
219    }
220}
221
222fn parse_tag_xml(tag: Option<&TagXml>) -> Option<HashMap<String, String>> {
223    collect_tag_map(tag.and_then(|tag| Some((tag.key.as_deref()?, tag.value.as_deref()?))))
224}
225
226fn parse_tag_xmls(tags: &[TagXml]) -> Option<HashMap<String, String>> {
227    collect_tag_map(
228        tags.iter()
229            .filter_map(|tag| Some((tag.key.as_deref()?, tag.value.as_deref()?))),
230    )
231}
232
233fn parse_replication_filter_prefix(filter: Option<&ReplicationFilterXml>) -> Option<String> {
234    filter
235        .and_then(|filter| filter.prefix.clone())
236        .or_else(|| filter.and_then(|filter| filter.and.as_ref()?.prefix.clone()))
237}
238
239fn parse_replication_filter_tags(
240    filter: Option<&ReplicationFilterXml>,
241) -> Option<HashMap<String, String>> {
242    filter
243        .and_then(|filter| parse_tag_xml(filter.tag.as_ref()))
244        .or_else(|| filter.and_then(|filter| parse_tag_xmls(&filter.and.as_ref()?.tags)))
245}
246
247fn sorted_tags(tags: &HashMap<String, String>) -> Vec<(&str, &str)> {
248    let mut pairs: Vec<(&str, &str)> = tags
249        .iter()
250        .map(|(key, value)| (key.as_str(), value.as_str()))
251        .collect();
252    pairs.sort_unstable();
253    pairs
254}
255
256fn append_tag_xml(xml: &mut String, key: &str, value: &str) {
257    xml.push_str("<Tag><Key>");
258    xml.push_str(&xml_escape(key));
259    xml.push_str("</Key><Value>");
260    xml.push_str(&xml_escape(value));
261    xml.push_str("</Value></Tag>");
262}
263
264fn append_replication_filter_xml(
265    xml: &mut String,
266    prefix: Option<&str>,
267    tags: Option<&HashMap<String, String>>,
268) {
269    let Some(tags) = tags.filter(|tags| !tags.is_empty()) else {
270        if let Some(prefix) = prefix {
271            xml.push_str("<Filter><Prefix>");
272            xml.push_str(&xml_escape(prefix));
273            xml.push_str("</Prefix></Filter>");
274        }
275        return;
276    };
277
278    xml.push_str("<Filter>");
279    if prefix.is_some() || tags.len() > 1 {
280        xml.push_str("<And>");
281        if let Some(prefix) = prefix {
282            xml.push_str("<Prefix>");
283            xml.push_str(&xml_escape(prefix));
284            xml.push_str("</Prefix>");
285        }
286        for (key, value) in sorted_tags(tags) {
287            append_tag_xml(xml, key, value);
288        }
289        xml.push_str("</And>");
290    } else if let Some((key, value)) = sorted_tags(tags).into_iter().next() {
291        append_tag_xml(xml, key, value);
292    }
293    xml.push_str("</Filter>");
294}
295
296fn normalize_optional_strings(values: Option<Vec<String>>) -> Option<Vec<String>> {
297    values.filter(|items| !items.is_empty())
298}
299
300fn is_missing_cors_configuration_error(error_text: &str) -> bool {
301    let normalized = error_text.to_ascii_lowercase();
302    normalized.contains("nosuchcorsconfiguration")
303        || normalized.contains("cors configuration does not exist")
304        || normalized.contains("the cors configuration does not exist")
305}
306
307fn is_missing_cors_configuration_response(
308    error_code: Option<&str>,
309    status_code: Option<u16>,
310    error_text: &str,
311) -> bool {
312    let error_code = error_code.map(|code| code.to_ascii_lowercase());
313    if matches!(error_code.as_deref(), Some("nosuchcorsconfiguration")) {
314        return true;
315    }
316
317    if !is_missing_cors_configuration_error(error_text) {
318        return false;
319    }
320
321    status_code.is_none_or(|status| status == 404)
322}
323
324fn sdk_cors_rule_to_core(rule: &aws_sdk_s3::types::CorsRule) -> CorsRule {
325    CorsRule {
326        id: rule.id().map(str::to_string),
327        allowed_origins: rule.allowed_origins().to_vec(),
328        allowed_methods: rule.allowed_methods().to_vec(),
329        allowed_headers: normalize_optional_strings(Some(rule.allowed_headers().to_vec())),
330        expose_headers: normalize_optional_strings(Some(rule.expose_headers().to_vec())),
331        max_age_seconds: rule.max_age_seconds(),
332    }
333}
334
335fn core_cors_rule_to_sdk(rule: &CorsRule) -> Result<aws_sdk_s3::types::CorsRule> {
336    aws_sdk_s3::types::CorsRule::builder()
337        .set_id(rule.id.clone())
338        .set_allowed_origins(Some(rule.allowed_origins.clone()))
339        .set_allowed_methods(Some(
340            rule.allowed_methods
341                .iter()
342                .map(|method| method.to_ascii_uppercase())
343                .collect(),
344        ))
345        .set_allowed_headers(normalize_optional_strings(rule.allowed_headers.clone()))
346        .set_expose_headers(normalize_optional_strings(rule.expose_headers.clone()))
347        .set_max_age_seconds(rule.max_age_seconds)
348        .build()
349        .map_err(|e| Error::General(format!("build bucket cors rule: {e}")))
350}
351
352fn parse_cors_configuration_xml(body: &str) -> Result<Vec<CorsRule>> {
353    let config: CorsConfigurationXml =
354        from_xml_str(body).map_err(|e| Error::General(format!("parse bucket cors xml: {e}")))?;
355
356    Ok(config
357        .rules
358        .into_iter()
359        .map(|rule| CorsRule {
360            id: rule.id,
361            allowed_origins: rule.allowed_origins,
362            allowed_methods: rule.allowed_methods,
363            allowed_headers: normalize_optional_strings(Some(rule.allowed_headers)),
364            expose_headers: normalize_optional_strings(Some(rule.expose_headers)),
365            max_age_seconds: rule.max_age_seconds,
366        })
367        .collect())
368}
369
370fn parse_replication_configuration_xml(body: &str) -> Result<ReplicationConfiguration> {
371    let config: ReplicationConfigurationXml = from_xml_str(body)
372        .map_err(|e| Error::General(format!("parse replication config xml: {e}")))?;
373
374    let rules = config
375        .rules
376        .into_iter()
377        .map(|rule| rc_core::ReplicationRule {
378            id: rule.id.unwrap_or_default(),
379            priority: rule.priority.unwrap_or_default(),
380            status: parse_replication_rule_status(rule.status.as_deref()),
381            prefix: parse_replication_filter_prefix(rule.filter.as_ref()).or(rule.legacy_prefix),
382            tags: parse_replication_filter_tags(rule.filter.as_ref()),
383            destination: rc_core::ReplicationDestination {
384                bucket_arn: rule
385                    .destination
386                    .as_ref()
387                    .and_then(|destination| destination.bucket.clone())
388                    .unwrap_or_default(),
389                storage_class: rule
390                    .destination
391                    .and_then(|destination| destination.storage_class),
392            },
393            delete_marker_replication: parse_replication_status(
394                rule.delete_marker_replication.as_ref(),
395            ),
396            existing_object_replication: parse_replication_status(
397                rule.existing_object_replication.as_ref(),
398            ),
399            delete_replication: parse_replication_status(rule.delete_replication.as_ref()),
400        })
401        .collect();
402
403    Ok(ReplicationConfiguration {
404        role: config.role.unwrap_or_default(),
405        rules,
406    })
407}
408
409fn xml_escape(value: &str) -> String {
410    value
411        .replace('&', "&amp;")
412        .replace('<', "&lt;")
413        .replace('>', "&gt;")
414        .replace('"', "&quot;")
415        .replace('\'', "&apos;")
416}
417
418fn append_replication_status_tag(xml: &mut String, tag: &str, enabled: Option<bool>) {
419    if let Some(enabled) = enabled {
420        let status = if enabled { "Enabled" } else { "Disabled" };
421        xml.push('<');
422        xml.push_str(tag);
423        xml.push_str("><Status>");
424        xml.push_str(status);
425        xml.push_str("</Status></");
426        xml.push_str(tag);
427        xml.push('>');
428    }
429}
430
431fn build_replication_configuration_xml(config: &ReplicationConfiguration) -> String {
432    let mut xml = String::from(r#"<?xml version="1.0" encoding="UTF-8"?>"#);
433    xml.push_str(r#"<ReplicationConfiguration xmlns=""#);
434    xml.push_str(S3_REPLICATION_XML_NAMESPACE);
435    xml.push_str(r#"">"#);
436
437    if !config.role.is_empty() {
438        xml.push_str("<Role>");
439        xml.push_str(&xml_escape(&config.role));
440        xml.push_str("</Role>");
441    }
442
443    for rule in &config.rules {
444        xml.push_str("<Rule>");
445
446        xml.push_str("<Status>");
447        xml.push_str(match rule.status {
448            rc_core::ReplicationRuleStatus::Enabled => "Enabled",
449            rc_core::ReplicationRuleStatus::Disabled => "Disabled",
450        });
451        xml.push_str("</Status>");
452
453        xml.push_str("<Destination><Bucket>");
454        xml.push_str(&xml_escape(&rule.destination.bucket_arn));
455        xml.push_str("</Bucket>");
456        if let Some(storage_class) = &rule.destination.storage_class {
457            xml.push_str("<StorageClass>");
458            xml.push_str(&xml_escape(storage_class));
459            xml.push_str("</StorageClass>");
460        }
461        xml.push_str("</Destination>");
462
463        if !rule.id.is_empty() {
464            xml.push_str("<ID>");
465            xml.push_str(&xml_escape(&rule.id));
466            xml.push_str("</ID>");
467        }
468
469        xml.push_str("<Priority>");
470        xml.push_str(&rule.priority.to_string());
471        xml.push_str("</Priority>");
472
473        append_replication_filter_xml(&mut xml, rule.prefix.as_deref(), rule.tags.as_ref());
474
475        append_replication_status_tag(
476            &mut xml,
477            "ExistingObjectReplication",
478            rule.existing_object_replication,
479        );
480        append_replication_status_tag(
481            &mut xml,
482            "DeleteMarkerReplication",
483            rule.delete_marker_replication,
484        );
485        append_replication_status_tag(&mut xml, "DeleteReplication", rule.delete_replication);
486
487        xml.push_str("</Rule>");
488    }
489
490    xml.push_str("</ReplicationConfiguration>");
491    xml
492}
493
494fn parse_lifecycle_filter_prefix(
495    filter: Option<&aws_sdk_s3::types::LifecycleRuleFilter>,
496) -> Option<String> {
497    filter
498        .and_then(|filter| filter.prefix().map(str::to_string))
499        .or_else(|| filter.and_then(|filter| filter.and()?.prefix().map(str::to_string)))
500}
501
502fn parse_lifecycle_filter_tags(
503    filter: Option<&aws_sdk_s3::types::LifecycleRuleFilter>,
504) -> Option<HashMap<String, String>> {
505    filter
506        .and_then(|filter| collect_tag_map(filter.tag().map(|tag| (tag.key(), tag.value()))))
507        .or_else(|| {
508            filter.and_then(|filter| {
509                collect_tag_map(
510                    filter
511                        .and()?
512                        .tags()
513                        .iter()
514                        .map(|tag| (tag.key(), tag.value())),
515                )
516            })
517        })
518}
519
520fn build_s3_tag(key: &str, value: &str) -> Result<aws_sdk_s3::types::Tag> {
521    aws_sdk_s3::types::Tag::builder()
522        .key(key)
523        .value(value)
524        .build()
525        .map_err(|error| Error::General(format!("build filter tag: {error}")))
526}
527
528fn build_lifecycle_rule_filter(
529    prefix: Option<&str>,
530    tags: Option<&HashMap<String, String>>,
531) -> Result<Option<aws_sdk_s3::types::LifecycleRuleFilter>> {
532    let Some(tags) = tags.filter(|tags| !tags.is_empty()) else {
533        return Ok(prefix.map(|prefix| {
534            aws_sdk_s3::types::LifecycleRuleFilter::builder()
535                .prefix(prefix)
536                .build()
537        }));
538    };
539
540    let tag_values = sorted_tags(tags)
541        .into_iter()
542        .map(|(key, value)| build_s3_tag(key, value))
543        .collect::<Result<Vec<_>>>()?;
544
545    let filter = if prefix.is_some() || tag_values.len() > 1 {
546        let mut and_builder = aws_sdk_s3::types::LifecycleRuleAndOperator::builder();
547        if let Some(prefix) = prefix {
548            and_builder = and_builder.prefix(prefix);
549        }
550        for tag in tag_values {
551            and_builder = and_builder.tags(tag);
552        }
553        aws_sdk_s3::types::LifecycleRuleFilter::builder()
554            .and(and_builder.build())
555            .build()
556    } else {
557        aws_sdk_s3::types::LifecycleRuleFilter::builder()
558            .tag(
559                tag_values
560                    .into_iter()
561                    .next()
562                    .expect("non-empty tags required to build lifecycle filter"),
563            )
564            .build()
565    };
566
567    Ok(Some(filter))
568}
569
570impl HttpConnector for ReqwestConnector {
571    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
572        let client = self.client.clone();
573        HttpConnectorFuture::new(async move {
574            // Extract request parts before consuming the request
575            let uri = request.uri().to_string();
576            let method_str = request.method().to_string();
577            let headers = request.headers().clone();
578
579            // Try to get the body as buffered in-memory bytes.
580            // For streaming bodies (e.g., large file uploads), bytes() returns None and we
581            // return a clear error rather than silently sending an empty body, which would
582            // cause signature mismatches or server-side failures.
583            let body_bytes = match request.body().bytes() {
584                Some(b) => Bytes::copy_from_slice(b),
585                None => {
586                    return Err(ConnectorError::user(
587                        "Streaming request bodies are not supported in insecure/ca_bundle TLS mode; \
588                         use in-memory data for uploads with this connector"
589                            .into(),
590                    ));
591                }
592            };
593
594            // Build reqwest method
595            let method = reqwest::Method::from_bytes(method_str.as_bytes())
596                .map_err(|e| ConnectorError::user(Box::new(e)))?;
597
598            // Build reqwest URL
599            let url = reqwest::Url::parse(&uri).map_err(|e| ConnectorError::user(Box::new(e)))?;
600
601            // Build reqwest request
602            let mut req = reqwest::Request::new(method, url);
603
604            // Copy headers; S3 headers are all ASCII so failures here are unexpected
605            for (name, value) in headers.iter() {
606                match (
607                    reqwest::header::HeaderName::from_bytes(name.as_bytes()),
608                    reqwest::header::HeaderValue::from_bytes(value.as_bytes()),
609                ) {
610                    (Ok(header_name), Ok(header_value)) => {
611                        req.headers_mut().append(header_name, header_value);
612                    }
613                    _ => {
614                        tracing::warn!("Skipping non-convertible request header: {}", name);
615                    }
616                }
617            }
618
619            // Set body
620            *req.body_mut() = Some(reqwest::Body::from(body_bytes));
621
622            // Execute
623            let resp = client
624                .execute(req)
625                .await
626                .map_err(|e| ConnectorError::io(Box::new(e)))?;
627
628            // Convert response
629            let status = StatusCode::try_from(resp.status().as_u16())
630                .map_err(|e| ConnectorError::other(Box::new(e), None))?;
631            let resp_headers = resp.headers().clone();
632            let body = resp
633                .bytes()
634                .await
635                .map_err(|e| ConnectorError::io(Box::new(e)))?;
636
637            let mut sdk_response = Response::new(status, SdkBody::from(body));
638            for (name, value) in &resp_headers {
639                match value.to_str() {
640                    Ok(value_str) => {
641                        sdk_response
642                            .headers_mut()
643                            .append(name.as_str().to_owned(), value_str.to_owned());
644                    }
645                    Err(_) => {
646                        tracing::warn!("Skipping non-UTF8 response header: {}", name.as_str());
647                    }
648                }
649            }
650
651            Ok(sdk_response)
652        })
653    }
654}
655
656impl HttpClient for ReqwestConnector {
657    fn http_connector(
658        &self,
659        _settings: &HttpConnectorSettings,
660        _components: &RuntimeComponents,
661    ) -> SharedHttpConnector {
662        // NOTE: `ReqwestConnector` is preconfigured (e.g., insecure/CA-bundle options) when it
663        // is constructed, and does not currently apply `HttpConnectorSettings`. This means
664        // behavior in this mode may differ from the default connector w.r.t. SDK HTTP settings.
665        // If alignment is required, map relevant fields from `HttpConnectorSettings` onto the
666        // internal `reqwest::Client` when constructing the connector.
667        SharedHttpConnector::new(self.clone())
668    }
669}
670
671/// S3 client wrapper
672pub struct S3Client {
673    inner: aws_sdk_s3::Client,
674    xml_http_client: reqwest::Client,
675    alias: Alias,
676}
677
678/// Request-level options for delete operations.
679#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
680pub struct DeleteRequestOptions {
681    /// Ask RustFS to permanently delete data instead of creating delete markers.
682    pub force_delete: bool,
683}
684
685impl S3Client {
686    /// Create a new S3 client from an alias configuration
687    pub async fn new(alias: Alias) -> Result<Self> {
688        let endpoint = alias.endpoint.clone();
689        let region = alias.region.clone();
690        let access_key = alias.access_key.clone();
691        let secret_key = alias.secret_key.clone();
692
693        // Build credentials provider
694        let credentials = aws_credential_types::Credentials::new(
695            access_key,
696            secret_key,
697            None, // session token
698            None, // expiry
699            "rc-static-credentials",
700        );
701
702        // Build SDK config loader
703        let mut config_loader = aws_config::defaults(aws_config::BehaviorVersion::latest())
704            .credentials_provider(credentials)
705            .region(aws_config::Region::new(region))
706            .endpoint_url(&endpoint);
707
708        // When insecure mode is enabled or a custom CA bundle is provided, use the reqwest
709        // connector which supports danger_accept_invalid_certs and custom root certificates.
710        if alias.insecure || alias.ca_bundle.is_some() {
711            let connector =
712                ReqwestConnector::new(alias.insecure, alias.ca_bundle.as_deref()).await?;
713            config_loader = config_loader.http_client(connector);
714        }
715
716        let xml_http_client =
717            build_reqwest_client(alias.insecure, alias.ca_bundle.as_deref()).await?;
718        let config = config_loader.load().await;
719
720        // Build S3 client with path-style addressing for compatibility
721        let s3_config = aws_sdk_s3::config::Builder::from(&config)
722            .force_path_style(force_path_style_for_alias(&alias))
723            // Improve compatibility with S3-compatible backends by only sending request
724            // checksums when the operation explicitly requires them.
725            .request_checksum_calculation(
726                aws_sdk_s3::config::RequestChecksumCalculation::WhenRequired,
727            )
728            .response_checksum_validation(
729                aws_sdk_s3::config::ResponseChecksumValidation::WhenRequired,
730            )
731            .build();
732
733        let client = aws_sdk_s3::Client::from_conf(s3_config);
734
735        Ok(Self {
736            inner: client,
737            xml_http_client,
738            alias,
739        })
740    }
741
742    /// Get the underlying aws-sdk-s3 client
743    pub fn inner(&self) -> &aws_sdk_s3::Client {
744        &self.inner
745    }
746
747    /// List a single page of object versions and return pagination metadata.
748    pub async fn list_object_versions_page(
749        &self,
750        path: &RemotePath,
751        max_keys: Option<i32>,
752    ) -> Result<ObjectVersionListResult> {
753        let mut builder = self.inner.list_object_versions().bucket(&path.bucket);
754
755        if !path.key.is_empty() {
756            builder = builder.prefix(&path.key);
757        }
758
759        if let Some(max) = max_keys {
760            builder = builder.max_keys(max);
761        }
762
763        let response = builder.send().await.map_err(|e| {
764            let err_str = e.to_string();
765            if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
766                Error::NotFound(format!("Bucket not found: {}", path.bucket))
767            } else {
768                Error::General(format!("list_object_versions: {e}"))
769            }
770        })?;
771
772        let mut items = Vec::new();
773
774        for v in response.versions() {
775            items.push(ObjectVersion {
776                key: v.key().unwrap_or_default().to_string(),
777                version_id: v.version_id().unwrap_or("null").to_string(),
778                is_latest: v.is_latest().unwrap_or(false),
779                is_delete_marker: false,
780                last_modified: v
781                    .last_modified()
782                    .and_then(|dt| Timestamp::from_second(dt.secs()).ok()),
783                size_bytes: v.size(),
784                etag: v.e_tag().map(|s| s.trim_matches('"').to_string()),
785            });
786        }
787
788        for m in response.delete_markers() {
789            items.push(ObjectVersion {
790                key: m.key().unwrap_or_default().to_string(),
791                version_id: m.version_id().unwrap_or("null").to_string(),
792                is_latest: m.is_latest().unwrap_or(false),
793                is_delete_marker: true,
794                last_modified: m
795                    .last_modified()
796                    .and_then(|dt| Timestamp::from_second(dt.secs()).ok()),
797                size_bytes: None,
798                etag: None,
799            });
800        }
801
802        items.sort_by(|a, b| {
803            a.key
804                .cmp(&b.key)
805                .then_with(|| b.last_modified.cmp(&a.last_modified))
806        });
807
808        Ok(ObjectVersionListResult {
809            items,
810            truncated: response.is_truncated().unwrap_or(false),
811            continuation_token: response.next_key_marker().map(ToString::to_string),
812            version_id_marker: response.next_version_id_marker().map(ToString::to_string),
813        })
814    }
815
816    /// Download object content and report downloaded bytes after each received chunk.
817    pub async fn get_object_with_progress(
818        &self,
819        path: &RemotePath,
820        mut on_progress: impl FnMut(u64, Option<u64>) + Send,
821    ) -> Result<Vec<u8>> {
822        let response = self
823            .inner
824            .get_object()
825            .bucket(&path.bucket)
826            .key(&path.key)
827            .send()
828            .await
829            .map_err(|e| {
830                let err_str = e.to_string();
831                if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
832                    Error::NotFound(path.to_string())
833                } else {
834                    Error::Network(err_str)
835                }
836            })?;
837
838        let content_length = response
839            .content_length()
840            .and_then(|length| u64::try_from(length).ok());
841        let mut data = Vec::with_capacity(
842            content_length
843                .and_then(|length| usize::try_from(length).ok())
844                .unwrap_or_default(),
845        );
846        let mut body = response.body;
847        let mut bytes_downloaded = 0u64;
848
849        while let Some(chunk) = body
850            .try_next()
851            .await
852            .map_err(|e| Error::Network(e.to_string()))?
853        {
854            bytes_downloaded += chunk.len() as u64;
855            data.extend_from_slice(&chunk);
856            on_progress(bytes_downloaded, content_length);
857        }
858
859        Ok(data)
860    }
861
862    /// Delete an object with RustFS-specific request options.
863    pub async fn delete_object_with_options(
864        &self,
865        path: &RemotePath,
866        options: DeleteRequestOptions,
867    ) -> Result<()> {
868        let mut request = self
869            .inner
870            .delete_object()
871            .bucket(&path.bucket)
872            .key(&path.key)
873            .customize();
874
875        if options.force_delete {
876            request = request.mutate_request(|request| {
877                request
878                    .headers_mut()
879                    .insert(RUSTFS_FORCE_DELETE_HEADER, "true");
880            });
881        }
882
883        request.send().await.map_err(|e| {
884            let err_str = Self::format_sdk_error(&e);
885            let is_missing_key = if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &e
886            {
887                let code = service_err.err().code().or_else(|| {
888                    service_err
889                        .raw()
890                        .headers()
891                        .get("x-amz-error-code")
892                        .and_then(|value| std::str::from_utf8(value.as_bytes()).ok())
893                });
894                matches!(code, Some("NoSuchKey") | Some("NotFound"))
895                    || service_err.raw().status().as_u16() == 404
896            } else {
897                err_str.contains("NotFound") || err_str.contains("NoSuchKey")
898            };
899
900            if is_missing_key {
901                Error::NotFound(path.to_string())
902            } else {
903                Error::Network(err_str)
904            }
905        })?;
906
907        Ok(())
908    }
909
910    /// Delete multiple objects with RustFS-specific request options.
911    pub async fn delete_objects_with_options(
912        &self,
913        bucket: &str,
914        keys: Vec<String>,
915        options: DeleteRequestOptions,
916    ) -> Result<Vec<String>> {
917        use aws_sdk_s3::types::{Delete, ObjectIdentifier};
918
919        if keys.is_empty() {
920            return Ok(vec![]);
921        }
922
923        let objects: Vec<ObjectIdentifier> =
924            keys.iter()
925                .map(|key| {
926                    ObjectIdentifier::builder().key(key).build().map_err(|e| {
927                        Error::General(format!("invalid delete object identifier: {e}"))
928                    })
929                })
930                .collect::<Result<Vec<_>>>()?;
931
932        let delete = Delete::builder()
933            .set_objects(Some(objects))
934            .build()
935            .map_err(|e| Error::General(e.to_string()))?;
936
937        let mut request = self
938            .inner
939            .delete_objects()
940            .bucket(bucket)
941            .delete(delete)
942            .customize();
943
944        if options.force_delete {
945            request = request.mutate_request(|request| {
946                request
947                    .headers_mut()
948                    .insert(RUSTFS_FORCE_DELETE_HEADER, "true");
949            });
950        }
951
952        let response = request
953            .send()
954            .await
955            .map_err(|e| Error::Network(e.to_string()))?;
956
957        let deleted: Vec<String> = response
958            .deleted()
959            .iter()
960            .filter_map(|d| d.key().map(|k| k.to_string()))
961            .collect();
962
963        if !response.errors().is_empty() {
964            let error_keys: Vec<String> = response
965                .errors()
966                .iter()
967                .filter_map(|e| e.key().map(|k| k.to_string()))
968                .collect();
969            tracing::warn!("Failed to delete some objects: {:?}", error_keys);
970        }
971
972        Ok(deleted)
973    }
974
975    /// Format AWS SDK error into a detailed error message
976    fn format_sdk_error<E: std::fmt::Display>(error: &aws_sdk_s3::error::SdkError<E>) -> String {
977        match error {
978            aws_sdk_s3::error::SdkError::ServiceError(service_err) => {
979                let err = service_err.err();
980                let meta = service_err.raw();
981                let mut msg = format!("Service error: {}", err);
982                // Try to extract additional error information from headers
983                if let Some(code) = meta.headers().get("x-amz-error-code")
984                    && let Ok(code_str) = std::str::from_utf8(code.as_bytes())
985                {
986                    msg.push_str(&format!(" (code: {})", code_str));
987                }
988                msg
989            }
990            aws_sdk_s3::error::SdkError::ConstructionFailure(err) => {
991                format!("Request construction failed: {:?}", err)
992            }
993            aws_sdk_s3::error::SdkError::TimeoutError(_) => "Request timeout".to_string(),
994            aws_sdk_s3::error::SdkError::DispatchFailure(err) => {
995                format!("Network dispatch error: {:?}", err)
996            }
997            aws_sdk_s3::error::SdkError::ResponseError(err) => {
998                format!("Response error: {:?}", err)
999            }
1000            _ => error.to_string(),
1001        }
1002    }
1003
1004    fn should_use_multipart(file_size: u64) -> bool {
1005        file_size > SINGLE_PUT_OBJECT_MAX_SIZE
1006    }
1007
1008    fn sha256_hash(body: &[u8]) -> String {
1009        let mut hasher = Sha256::new();
1010        hasher.update(body);
1011        hex::encode(hasher.finalize())
1012    }
1013
1014    fn request_host(&self, url: &reqwest::Url) -> Result<String> {
1015        let host = url
1016            .host_str()
1017            .ok_or_else(|| Error::Network("Missing host in request URL".to_string()))?;
1018        Ok(match url.port() {
1019            Some(port) => format!("{host}:{port}"),
1020            None => host.to_string(),
1021        })
1022    }
1023
1024    fn replication_url(&self, bucket: &str) -> Result<reqwest::Url> {
1025        let mut url =
1026            reqwest::Url::parse(self.alias.endpoint.trim_end_matches('/')).map_err(|e| {
1027                Error::Network(format!("Invalid endpoint '{}': {e}", self.alias.endpoint))
1028            })?;
1029
1030        {
1031            let mut segments = url.path_segments_mut().map_err(|_| {
1032                Error::Network(format!(
1033                    "Endpoint '{}' does not support path-style bucket operations",
1034                    self.alias.endpoint
1035                ))
1036            })?;
1037            segments.pop_if_empty();
1038            segments.push(bucket);
1039        }
1040
1041        url.set_query(Some("replication="));
1042        Ok(url)
1043    }
1044
1045    fn cors_url(&self, bucket: &str) -> Result<reqwest::Url> {
1046        let mut url =
1047            reqwest::Url::parse(self.alias.endpoint.trim_end_matches('/')).map_err(|e| {
1048                Error::Network(format!("Invalid endpoint '{}': {e}", self.alias.endpoint))
1049            })?;
1050
1051        {
1052            let mut segments = url.path_segments_mut().map_err(|_| {
1053                Error::Network(format!(
1054                    "Endpoint '{}' does not support path-style bucket operations",
1055                    self.alias.endpoint
1056                ))
1057            })?;
1058            segments.pop_if_empty();
1059            segments.push(bucket);
1060        }
1061
1062        url.set_query(Some("cors="));
1063        Ok(url)
1064    }
1065
1066    async fn sign_xml_request(
1067        &self,
1068        method: &Method,
1069        url: &str,
1070        headers: &HeaderMap,
1071        body: &[u8],
1072    ) -> Result<HeaderMap> {
1073        let credentials = Credentials::new(
1074            &self.alias.access_key,
1075            &self.alias.secret_key,
1076            None,
1077            None,
1078            "s3-xml-client",
1079        );
1080
1081        let identity = credentials.into();
1082        let mut signing_settings = SigningSettings::default();
1083        signing_settings.signature_location = SignatureLocation::Headers;
1084
1085        let signing_params = v4::SigningParams::builder()
1086            .identity(&identity)
1087            .region(&self.alias.region)
1088            .name(S3_SERVICE_NAME)
1089            .time(std::time::SystemTime::now())
1090            .settings(signing_settings)
1091            .build()
1092            .map_err(|e| Error::Auth(format!("Failed to build signing params: {e}")))?;
1093
1094        let header_pairs: Vec<(&str, &str)> = headers
1095            .iter()
1096            .filter_map(|(k, v)| v.to_str().ok().map(|v| (k.as_str(), v)))
1097            .collect();
1098
1099        let signable_request = SignableRequest::new(
1100            method.as_str(),
1101            url,
1102            header_pairs.into_iter(),
1103            SignableBody::Bytes(body),
1104        )
1105        .map_err(|e| Error::Auth(format!("Failed to create signable request: {e}")))?;
1106
1107        let (signing_instructions, _) = sign(signable_request, &signing_params.into())
1108            .map_err(|e| Error::Auth(format!("Failed to sign request: {e}")))?
1109            .into_parts();
1110
1111        let mut signed_headers = headers.clone();
1112        for (name, value) in signing_instructions.headers() {
1113            let header_name = HeaderName::try_from(name.to_string())
1114                .map_err(|e| Error::Auth(format!("Invalid header name: {e}")))?;
1115            let header_value = HeaderValue::try_from(value.to_string())
1116                .map_err(|e| Error::Auth(format!("Invalid header value: {e}")))?;
1117            signed_headers.insert(header_name, header_value);
1118        }
1119
1120        Ok(signed_headers)
1121    }
1122
1123    async fn xml_request(
1124        &self,
1125        method: Method,
1126        url: reqwest::Url,
1127        content_type: Option<&str>,
1128        body: Option<Vec<u8>>,
1129    ) -> Result<String> {
1130        let body = body.unwrap_or_default();
1131        let mut headers = HeaderMap::new();
1132        headers.insert(
1133            "x-amz-content-sha256",
1134            HeaderValue::from_str(&Self::sha256_hash(&body))
1135                .map_err(|e| Error::Auth(format!("Invalid content hash header: {e}")))?,
1136        );
1137        headers.insert(
1138            "host",
1139            HeaderValue::from_str(&self.request_host(&url)?)
1140                .map_err(|e| Error::Auth(format!("Invalid host header: {e}")))?,
1141        );
1142
1143        if let Some(content_type) = content_type {
1144            headers.insert(
1145                CONTENT_TYPE,
1146                HeaderValue::from_str(content_type)
1147                    .map_err(|e| Error::Auth(format!("Invalid content type header: {e}")))?,
1148            );
1149        }
1150
1151        let signed_headers = self
1152            .sign_xml_request(&method, url.as_str(), &headers, &body)
1153            .await?;
1154
1155        let mut request_builder = self.xml_http_client.request(method, url);
1156        for (name, value) in &signed_headers {
1157            request_builder = request_builder.header(name, value);
1158        }
1159        if !body.is_empty() {
1160            request_builder = request_builder.body(body);
1161        }
1162
1163        let response = request_builder
1164            .send()
1165            .await
1166            .map_err(|e| Error::Network(format!("Request failed: {e}")))?;
1167
1168        let status = response.status();
1169        let text = response
1170            .text()
1171            .await
1172            .map_err(|e| Error::Network(format!("Failed to read response: {e}")))?;
1173
1174        if !status.is_success() {
1175            return Err(Error::Network(format!(
1176                "HTTP {}: {}",
1177                status.as_u16(),
1178                text
1179            )));
1180        }
1181
1182        Ok(text)
1183    }
1184
1185    fn bucket_policy_error_kind(
1186        error_code: Option<&str>,
1187        status_code: Option<u16>,
1188        error_text: &str,
1189    ) -> BucketPolicyErrorKind {
1190        let error_code = error_code.map(|code| code.to_ascii_lowercase());
1191        if matches!(
1192            error_code.as_deref(),
1193            Some("nosuchbucketpolicy") | Some("nosuchpolicy")
1194        ) {
1195            return BucketPolicyErrorKind::MissingPolicy;
1196        }
1197        if matches!(error_code.as_deref(), Some("nosuchbucket")) {
1198            return BucketPolicyErrorKind::MissingBucket;
1199        }
1200
1201        let error_text = error_text.to_ascii_lowercase();
1202        if error_text.contains("nosuchbucketpolicy") || error_text.contains("nosuchpolicy") {
1203            return BucketPolicyErrorKind::MissingPolicy;
1204        }
1205        if error_text.contains("nosuchbucket") {
1206            return BucketPolicyErrorKind::MissingBucket;
1207        }
1208        if status_code == Some(404) {
1209            return BucketPolicyErrorKind::MissingPolicy;
1210        }
1211
1212        BucketPolicyErrorKind::Other
1213    }
1214
1215    fn map_get_bucket_policy_error(
1216        bucket: &str,
1217        kind: BucketPolicyErrorKind,
1218        error_text: &str,
1219    ) -> Result<Option<String>> {
1220        match kind {
1221            BucketPolicyErrorKind::MissingPolicy => Ok(None),
1222            BucketPolicyErrorKind::MissingBucket => {
1223                Err(Error::NotFound(format!("Bucket not found: {bucket}")))
1224            }
1225            BucketPolicyErrorKind::Other => {
1226                Err(Error::Network(format!("get_bucket_policy: {error_text}")))
1227            }
1228        }
1229    }
1230
1231    fn map_delete_bucket_policy_error(
1232        bucket: &str,
1233        kind: BucketPolicyErrorKind,
1234        error_text: &str,
1235    ) -> Result<()> {
1236        match kind {
1237            BucketPolicyErrorKind::MissingPolicy => Ok(()),
1238            BucketPolicyErrorKind::MissingBucket => {
1239                Err(Error::NotFound(format!("Bucket not found: {bucket}")))
1240            }
1241            BucketPolicyErrorKind::Other => Err(Error::General(format!(
1242                "delete_bucket_policy: {error_text}"
1243            ))),
1244        }
1245    }
1246
1247    fn extract_notification_filter(
1248        filter: Option<&aws_sdk_s3::types::NotificationConfigurationFilter>,
1249    ) -> (Option<String>, Option<String>) {
1250        let mut prefix = None;
1251        let mut suffix = None;
1252
1253        if let Some(key_filter) = filter.and_then(|value| value.key()) {
1254            for rule in key_filter.filter_rules() {
1255                match rule.name().map(|name| name.as_str()) {
1256                    Some("prefix") => {
1257                        prefix = rule.value().map(ToString::to_string);
1258                    }
1259                    Some("suffix") => {
1260                        suffix = rule.value().map(ToString::to_string);
1261                    }
1262                    _ => {}
1263                }
1264            }
1265        }
1266
1267        (prefix, suffix)
1268    }
1269
1270    fn build_notification_filter(
1271        prefix: Option<&str>,
1272        suffix: Option<&str>,
1273    ) -> Option<aws_sdk_s3::types::NotificationConfigurationFilter> {
1274        use aws_sdk_s3::types::{FilterRule, FilterRuleName, NotificationConfigurationFilter};
1275
1276        let mut rules = Vec::new();
1277        if let Some(value) = prefix {
1278            let rule = FilterRule::builder()
1279                .name(FilterRuleName::Prefix)
1280                .value(value)
1281                .build();
1282            rules.push(rule);
1283        }
1284        if let Some(value) = suffix {
1285            let rule = FilterRule::builder()
1286                .name(FilterRuleName::Suffix)
1287                .value(value)
1288                .build();
1289            rules.push(rule);
1290        }
1291        if rules.is_empty() {
1292            return None;
1293        }
1294
1295        let key_filter = aws_sdk_s3::types::S3KeyFilter::builder()
1296            .set_filter_rules(Some(rules))
1297            .build();
1298        NotificationConfigurationFilter::builder()
1299            .key(key_filter)
1300            .build()
1301            .into()
1302    }
1303
1304    fn event_list_to_strings(events: &[aws_sdk_s3::types::Event]) -> Vec<String> {
1305        events
1306            .iter()
1307            .map(|event| event.as_str().to_string())
1308            .collect()
1309    }
1310
1311    fn strings_to_event_list(events: &[String]) -> Vec<aws_sdk_s3::types::Event> {
1312        events
1313            .iter()
1314            .map(|event| aws_sdk_s3::types::Event::from(event.as_str()))
1315            .collect()
1316    }
1317
1318    fn notifications_equivalent(
1319        expected: &[BucketNotification],
1320        actual: &[BucketNotification],
1321    ) -> bool {
1322        type CanonicalEntry = (u8, String, Option<String>, Option<String>, Vec<String>);
1323
1324        fn target_order(target: NotificationTarget) -> u8 {
1325            match target {
1326                NotificationTarget::Queue => 0,
1327                NotificationTarget::Topic => 1,
1328                NotificationTarget::Lambda => 2,
1329            }
1330        }
1331
1332        fn canonical(notifications: &[BucketNotification]) -> Vec<CanonicalEntry> {
1333            let mut normalized: Vec<CanonicalEntry> = notifications
1334                .iter()
1335                .map(|item| {
1336                    let mut events = item.events.clone();
1337                    events.sort();
1338                    events.dedup();
1339                    (
1340                        target_order(item.target),
1341                        item.arn.clone(),
1342                        item.prefix.clone(),
1343                        item.suffix.clone(),
1344                        events,
1345                    )
1346                })
1347                .collect();
1348            normalized.sort();
1349            normalized
1350        }
1351
1352        canonical(expected) == canonical(actual)
1353    }
1354
1355    async fn read_next_part(
1356        file: &mut tokio::fs::File,
1357        file_path: &std::path::Path,
1358        buffer: &mut [u8],
1359    ) -> Result<usize> {
1360        let mut total_read = 0usize;
1361        while total_read < buffer.len() {
1362            let bytes_read = file
1363                .read(&mut buffer[total_read..])
1364                .await
1365                .map_err(|e| Error::General(format!("read file '{}': {e}", file_path.display())))?;
1366            if bytes_read == 0 {
1367                break;
1368            }
1369            total_read += bytes_read;
1370        }
1371        Ok(total_read)
1372    }
1373
1374    async fn put_object_single_part_from_path(
1375        &self,
1376        path: &RemotePath,
1377        file_path: &std::path::Path,
1378        content_type: Option<&str>,
1379        file_size: u64,
1380    ) -> Result<ObjectInfo> {
1381        let data = tokio::fs::read(file_path)
1382            .await
1383            .map_err(|e| Error::General(format!("read file '{}': {e}", file_path.display())))?;
1384        let body = aws_sdk_s3::primitives::ByteStream::from(data);
1385
1386        let mut request = self
1387            .inner
1388            .put_object()
1389            .bucket(&path.bucket)
1390            .key(&path.key)
1391            .body(body);
1392
1393        if let Some(ct) = content_type {
1394            request = request.content_type(ct);
1395        }
1396
1397        let response = request
1398            .send()
1399            .await
1400            .map_err(|e| Error::Network(e.to_string()))?;
1401
1402        let mut info = ObjectInfo::file(&path.key, file_size as i64);
1403        if let Some(etag) = response.e_tag() {
1404            info.etag = Some(etag.trim_matches('"').to_string());
1405        }
1406        info.last_modified = Some(jiff::Timestamp::now());
1407
1408        Ok(info)
1409    }
1410
1411    async fn abort_multipart_upload_best_effort(&self, path: &RemotePath, upload_id: &str) {
1412        let _ = self
1413            .inner
1414            .abort_multipart_upload()
1415            .bucket(&path.bucket)
1416            .key(&path.key)
1417            .upload_id(upload_id)
1418            .send()
1419            .await;
1420    }
1421
1422    async fn put_object_multipart_from_path(
1423        &self,
1424        path: &RemotePath,
1425        file_path: &std::path::Path,
1426        content_type: Option<&str>,
1427        file_size: u64,
1428        on_progress: impl Fn(u64) + Send,
1429    ) -> Result<ObjectInfo> {
1430        use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
1431
1432        let config = crate::multipart::MultipartConfig::default();
1433        let part_size = config.calculate_part_size(file_size);
1434        let part_buffer_size = usize::try_from(part_size)
1435            .map_err(|_| Error::General(format!("invalid part size: {part_size}")))?;
1436
1437        tracing::debug!(file_size, part_size, "Starting multipart upload");
1438
1439        let mut create_request = self
1440            .inner
1441            .create_multipart_upload()
1442            .bucket(&path.bucket)
1443            .key(&path.key);
1444
1445        if let Some(ct) = content_type {
1446            create_request = create_request.content_type(ct);
1447        }
1448
1449        let create_response = create_request
1450            .send()
1451            .await
1452            .map_err(|e| Error::Network(format!("create multipart upload: {e}")))?;
1453
1454        let upload_id = create_response
1455            .upload_id()
1456            .ok_or_else(|| Error::General("missing upload id from multipart upload".to_string()))?
1457            .to_string();
1458
1459        tracing::debug!(upload_id = %upload_id, "Multipart upload initiated");
1460
1461        let mut file = tokio::fs::File::open(file_path)
1462            .await
1463            .map_err(|e| Error::General(format!("open file '{}': {e}", file_path.display())))?;
1464        let mut completed_parts = Vec::new();
1465        let mut part_number: i32 = 1;
1466        let mut chunk = vec![0u8; part_buffer_size];
1467        let mut bytes_uploaded: u64 = 0;
1468
1469        loop {
1470            let bytes_read = Self::read_next_part(&mut file, file_path, &mut chunk).await?;
1471            if bytes_read == 0 {
1472                break;
1473            }
1474
1475            tracing::debug!(part_number, bytes_read, "Uploading part");
1476
1477            let body = aws_sdk_s3::primitives::ByteStream::from(chunk[..bytes_read].to_vec());
1478            let upload_part_result = self
1479                .inner
1480                .upload_part()
1481                .bucket(&path.bucket)
1482                .key(&path.key)
1483                .upload_id(&upload_id)
1484                .part_number(part_number)
1485                .body(body)
1486                .send()
1487                .await;
1488
1489            let upload_part_response = match upload_part_result {
1490                Ok(response) => response,
1491                Err(e) => {
1492                    tracing::debug!(
1493                        upload_id = %upload_id,
1494                        part_number,
1495                        "Aborting multipart upload due to error"
1496                    );
1497                    self.abort_multipart_upload_best_effort(path, &upload_id)
1498                        .await;
1499                    return Err(Error::Network(format!(
1500                        "upload multipart part {part_number}: {e}"
1501                    )));
1502                }
1503            };
1504
1505            let etag = match upload_part_response.e_tag() {
1506                Some(value) => value.trim_matches('"').to_string(),
1507                None => {
1508                    self.abort_multipart_upload_best_effort(path, &upload_id)
1509                        .await;
1510                    return Err(Error::General(format!(
1511                        "missing ETag for multipart part {part_number}"
1512                    )));
1513                }
1514            };
1515
1516            completed_parts.push(
1517                CompletedPart::builder()
1518                    .part_number(part_number)
1519                    .e_tag(etag)
1520                    .build(),
1521            );
1522
1523            bytes_uploaded += bytes_read as u64;
1524            on_progress(bytes_uploaded);
1525            tracing::debug!(part_number, bytes_uploaded, "Part uploaded");
1526
1527            part_number += 1;
1528        }
1529
1530        let completed_upload = CompletedMultipartUpload::builder()
1531            .set_parts(Some(completed_parts))
1532            .build();
1533        let complete_result = self
1534            .inner
1535            .complete_multipart_upload()
1536            .bucket(&path.bucket)
1537            .key(&path.key)
1538            .upload_id(&upload_id)
1539            .multipart_upload(completed_upload)
1540            .send()
1541            .await;
1542
1543        let complete_response = match complete_result {
1544            Ok(response) => response,
1545            Err(e) => {
1546                tracing::debug!(upload_id = %upload_id, "Attempting to abort multipart upload after completion failure");
1547                self.abort_multipart_upload_best_effort(path, &upload_id)
1548                    .await;
1549                return Err(Error::Network(format!("complete multipart upload: {e}")));
1550            }
1551        };
1552
1553        tracing::debug!("Multipart upload completed");
1554
1555        let mut info = ObjectInfo::file(&path.key, file_size as i64);
1556        if let Some(etag) = complete_response.e_tag() {
1557            info.etag = Some(etag.trim_matches('"').to_string());
1558        }
1559        info.last_modified = Some(jiff::Timestamp::now());
1560
1561        Ok(info)
1562    }
1563
1564    /// Upload a local file path to S3.
1565    ///
1566    /// Uses multipart upload for large files to avoid loading the entire file into memory.
1567    /// Calls `on_progress` after each uploaded part with total bytes sent so far.
1568    pub async fn put_object_from_path(
1569        &self,
1570        path: &RemotePath,
1571        file_path: &std::path::Path,
1572        content_type: Option<&str>,
1573        on_progress: impl Fn(u64) + Send,
1574    ) -> Result<ObjectInfo> {
1575        let metadata = tokio::fs::metadata(file_path).await.map_err(|e| {
1576            Error::General(format!("read metadata for '{}': {e}", file_path.display()))
1577        })?;
1578        if !metadata.is_file() {
1579            return Err(Error::General(format!(
1580                "source is not a file: {}",
1581                file_path.display()
1582            )));
1583        }
1584
1585        let file_size = metadata.len();
1586        if Self::should_use_multipart(file_size) {
1587            self.put_object_multipart_from_path(
1588                path,
1589                file_path,
1590                content_type,
1591                file_size,
1592                on_progress,
1593            )
1594            .await
1595        } else {
1596            self.put_object_single_part_from_path(path, file_path, content_type, file_size)
1597                .await
1598        }
1599    }
1600}
1601
1602fn build_tagging(
1603    tags: std::collections::HashMap<String, String>,
1604) -> Result<aws_sdk_s3::types::Tagging> {
1605    use aws_sdk_s3::types::{Tag, Tagging};
1606
1607    let mut tag_set = Vec::with_capacity(tags.len());
1608    for (key, value) in tags {
1609        let tag = Tag::builder()
1610            .key(key)
1611            .value(value)
1612            .build()
1613            .map_err(|e| Error::General(format!("invalid tag: {e}")))?;
1614        tag_set.push(tag);
1615    }
1616
1617    Tagging::builder()
1618        .set_tag_set(Some(tag_set))
1619        .build()
1620        .map_err(|e| Error::General(format!("invalid tagging payload: {e}")))
1621}
1622
1623#[async_trait]
1624impl ObjectStore for S3Client {
1625    async fn list_buckets(&self) -> Result<Vec<ObjectInfo>> {
1626        let response = self
1627            .inner
1628            .list_buckets()
1629            .send()
1630            .await
1631            .map_err(|e| Error::Network(e.to_string()))?;
1632
1633        let buckets = response
1634            .buckets()
1635            .iter()
1636            .map(|b| {
1637                let mut info = ObjectInfo::bucket(b.name().unwrap_or_default());
1638                if let Some(creation_date) = b.creation_date() {
1639                    info.last_modified = jiff::Timestamp::from_second(creation_date.secs()).ok();
1640                }
1641                info
1642            })
1643            .collect();
1644
1645        Ok(buckets)
1646    }
1647
1648    async fn list_objects(&self, path: &RemotePath, options: ListOptions) -> Result<ListResult> {
1649        let mut request = self.inner.list_objects_v2().bucket(&path.bucket);
1650
1651        // Set prefix
1652        let prefix = if path.key.is_empty() {
1653            options.prefix.clone()
1654        } else if let Some(p) = &options.prefix {
1655            Some(format!("{}{}", path.key, p))
1656        } else {
1657            Some(path.key.clone())
1658        };
1659
1660        if let Some(p) = prefix {
1661            request = request.prefix(p);
1662        }
1663
1664        // Set delimiter (for non-recursive listing)
1665        if !options.recursive {
1666            request = request.delimiter(options.delimiter.as_deref().unwrap_or("/"));
1667        }
1668
1669        // Set max keys
1670        if let Some(max) = options.max_keys {
1671            request = request.max_keys(max);
1672        }
1673
1674        // Set continuation token
1675        if let Some(token) = &options.continuation_token {
1676            request = request.continuation_token(token);
1677        }
1678
1679        let response = request.send().await.map_err(|e| {
1680            let err_str = Self::format_sdk_error(&e);
1681            if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
1682                Error::NotFound(format!("Bucket not found: {}", path.bucket))
1683            } else {
1684                Error::Network(err_str)
1685            }
1686        })?;
1687
1688        let mut items = Vec::new();
1689
1690        // Add common prefixes (directories)
1691        for prefix in response.common_prefixes() {
1692            if let Some(p) = prefix.prefix() {
1693                items.push(ObjectInfo::dir(p));
1694            }
1695        }
1696
1697        // Add objects
1698        for object in response.contents() {
1699            let key = object.key().unwrap_or_default().to_string();
1700            let size = object.size().unwrap_or(0);
1701            let mut info = ObjectInfo::file(&key, size);
1702
1703            if let Some(modified) = object.last_modified() {
1704                info.last_modified = jiff::Timestamp::from_second(modified.secs()).ok();
1705            }
1706
1707            if let Some(etag) = object.e_tag() {
1708                info.etag = Some(etag.trim_matches('"').to_string());
1709            }
1710
1711            if let Some(sc) = object.storage_class() {
1712                info.storage_class = Some(sc.as_str().to_string());
1713            }
1714
1715            items.push(info);
1716        }
1717
1718        Ok(ListResult {
1719            items,
1720            truncated: response.is_truncated().unwrap_or(false),
1721            continuation_token: response.next_continuation_token().map(|s| s.to_string()),
1722        })
1723    }
1724
1725    async fn head_object(&self, path: &RemotePath) -> Result<ObjectInfo> {
1726        let response = self
1727            .inner
1728            .head_object()
1729            .bucket(&path.bucket)
1730            .key(&path.key)
1731            .send()
1732            .await
1733            .map_err(|e| {
1734                let err_str = e.to_string();
1735                if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
1736                    Error::NotFound(path.to_string())
1737                } else {
1738                    Error::Network(err_str)
1739                }
1740            })?;
1741
1742        let size = response.content_length().unwrap_or(0);
1743        let mut info = ObjectInfo::file(&path.key, size);
1744
1745        if let Some(modified) = response.last_modified() {
1746            info.last_modified = jiff::Timestamp::from_second(modified.secs()).ok();
1747        }
1748
1749        if let Some(etag) = response.e_tag() {
1750            info.etag = Some(etag.trim_matches('"').to_string());
1751        }
1752
1753        if let Some(ct) = response.content_type() {
1754            info.content_type = Some(ct.to_string());
1755        }
1756
1757        if let Some(sc) = response.storage_class() {
1758            info.storage_class = Some(sc.as_str().to_string());
1759        }
1760
1761        if let Some(meta) = response.metadata()
1762            && !meta.is_empty()
1763        {
1764            info.metadata = Some(meta.clone());
1765        }
1766
1767        Ok(info)
1768    }
1769
1770    async fn bucket_exists(&self, bucket: &str) -> Result<bool> {
1771        match self.inner.head_bucket().bucket(bucket).send().await {
1772            Ok(_) => Ok(true),
1773            Err(e) => {
1774                // Check HTTP status code for 404 first to avoid unnecessary string formatting
1775                // Some S3-compatible services may return 404 without standard error codes
1776                if let aws_sdk_s3::error::SdkError::ServiceError(ref service_err) = e
1777                    && service_err.raw().status().as_u16() == 404
1778                {
1779                    return Ok(false);
1780                }
1781                let err_str = e.to_string();
1782                if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
1783                    return Ok(false);
1784                }
1785                Err(Error::Network(err_str))
1786            }
1787        }
1788    }
1789
1790    async fn create_bucket(&self, bucket: &str) -> Result<()> {
1791        self.inner
1792            .create_bucket()
1793            .bucket(bucket)
1794            .send()
1795            .await
1796            .map_err(|e| Error::Network(e.to_string()))?;
1797
1798        Ok(())
1799    }
1800
1801    async fn delete_bucket(&self, bucket: &str) -> Result<()> {
1802        self.inner
1803            .delete_bucket()
1804            .bucket(bucket)
1805            .send()
1806            .await
1807            .map_err(|e| {
1808                let err_str = Self::format_sdk_error(&e);
1809                if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") {
1810                    Error::NotFound(format!("Bucket not found: {bucket}"))
1811                } else if err_str.contains("BucketNotEmpty") {
1812                    Error::Conflict(err_str)
1813                } else {
1814                    Error::Network(err_str)
1815                }
1816            })?;
1817
1818        Ok(())
1819    }
1820
1821    async fn capabilities(&self) -> Result<Capabilities> {
1822        // Best-effort hints for common S3-compatible backends. `select` is not inferred here
1823        // because `rc sql` determines support from the real request result.
1824        Ok(Capabilities {
1825            versioning: true,
1826            object_lock: false,
1827            tagging: true,
1828            anonymous: true,
1829            select: false,
1830            notifications: true,
1831            lifecycle: true,
1832            replication: true,
1833            cors: true,
1834        })
1835    }
1836
1837    async fn get_object(&self, path: &RemotePath) -> Result<Vec<u8>> {
1838        self.get_object_with_progress(path, |_, _| {}).await
1839    }
1840
1841    async fn put_object(
1842        &self,
1843        path: &RemotePath,
1844        data: Vec<u8>,
1845        content_type: Option<&str>,
1846    ) -> Result<ObjectInfo> {
1847        let size = data.len() as i64;
1848        let body = aws_sdk_s3::primitives::ByteStream::from(data);
1849
1850        let mut request = self
1851            .inner
1852            .put_object()
1853            .bucket(&path.bucket)
1854            .key(&path.key)
1855            .body(body);
1856
1857        if let Some(ct) = content_type {
1858            request = request.content_type(ct);
1859        }
1860
1861        let response = request
1862            .send()
1863            .await
1864            .map_err(|e| Error::Network(e.to_string()))?;
1865
1866        let mut info = ObjectInfo::file(&path.key, size);
1867        if let Some(etag) = response.e_tag() {
1868            info.etag = Some(etag.trim_matches('"').to_string());
1869        }
1870        info.last_modified = Some(jiff::Timestamp::now());
1871
1872        Ok(info)
1873    }
1874
1875    async fn delete_object(&self, path: &RemotePath) -> Result<()> {
1876        self.delete_object_with_options(path, DeleteRequestOptions::default())
1877            .await
1878    }
1879
1880    async fn delete_objects(&self, bucket: &str, keys: Vec<String>) -> Result<Vec<String>> {
1881        self.delete_objects_with_options(bucket, keys, DeleteRequestOptions::default())
1882            .await
1883    }
1884
1885    async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> Result<ObjectInfo> {
1886        // Build copy source: bucket/key
1887        let copy_source = format!("{}/{}", src.bucket, src.key);
1888
1889        let response = self
1890            .inner
1891            .copy_object()
1892            .copy_source(&copy_source)
1893            .bucket(&dst.bucket)
1894            .key(&dst.key)
1895            .send()
1896            .await
1897            .map_err(|e| {
1898                let err_str = e.to_string();
1899                if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
1900                    Error::NotFound(src.to_string())
1901                } else {
1902                    Error::Network(err_str)
1903                }
1904            })?;
1905
1906        // Get size from head_object since copy doesn't return it
1907        let info = self.head_object(dst).await?;
1908
1909        // Update etag from copy response if available
1910        let mut result = info;
1911        if let Some(copy_result) = response.copy_object_result()
1912            && let Some(etag) = copy_result.e_tag()
1913        {
1914            result.etag = Some(etag.trim_matches('"').to_string());
1915        }
1916
1917        Ok(result)
1918    }
1919
1920    async fn presign_get(&self, path: &RemotePath, expires_secs: u64) -> Result<String> {
1921        let config = aws_sdk_s3::presigning::PresigningConfig::builder()
1922            .expires_in(std::time::Duration::from_secs(expires_secs))
1923            .build()
1924            .map_err(|e| Error::General(format!("presign_get config: {e}")))?;
1925
1926        let request = self
1927            .inner
1928            .get_object()
1929            .bucket(&path.bucket)
1930            .key(&path.key)
1931            .presigned(config)
1932            .await
1933            .map_err(|e| Error::General(format!("presign_get: {e}")))?;
1934
1935        Ok(request.uri().to_string())
1936    }
1937
1938    async fn presign_put(
1939        &self,
1940        path: &RemotePath,
1941        expires_secs: u64,
1942        content_type: Option<&str>,
1943    ) -> Result<String> {
1944        let config = aws_sdk_s3::presigning::PresigningConfig::builder()
1945            .expires_in(std::time::Duration::from_secs(expires_secs))
1946            .build()
1947            .map_err(|e| Error::General(format!("presign_put config: {e}")))?;
1948
1949        let mut builder = self.inner.put_object().bucket(&path.bucket).key(&path.key);
1950
1951        if let Some(ct) = content_type {
1952            builder = builder.content_type(ct);
1953        }
1954
1955        let request = builder
1956            .presigned(config)
1957            .await
1958            .map_err(|e| Error::General(format!("presign_put: {e}")))?;
1959
1960        Ok(request.uri().to_string())
1961    }
1962
1963    async fn get_versioning(&self, bucket: &str) -> Result<Option<bool>> {
1964        let response = self
1965            .inner
1966            .get_bucket_versioning()
1967            .bucket(bucket)
1968            .send()
1969            .await
1970            .map_err(|e| Error::General(format!("get_versioning: {e}")))?;
1971
1972        Ok(response
1973            .status()
1974            .map(|s| *s == aws_sdk_s3::types::BucketVersioningStatus::Enabled))
1975    }
1976
1977    async fn set_versioning(&self, bucket: &str, enabled: bool) -> Result<()> {
1978        use aws_sdk_s3::types::{BucketVersioningStatus, VersioningConfiguration};
1979
1980        let status = if enabled {
1981            BucketVersioningStatus::Enabled
1982        } else {
1983            BucketVersioningStatus::Suspended
1984        };
1985
1986        let config = VersioningConfiguration::builder().status(status).build();
1987
1988        self.inner
1989            .put_bucket_versioning()
1990            .bucket(bucket)
1991            .versioning_configuration(config)
1992            .send()
1993            .await
1994            .map_err(|e| Error::General(format!("set_versioning: {e}")))?;
1995
1996        Ok(())
1997    }
1998
1999    async fn list_object_versions(
2000        &self,
2001        path: &RemotePath,
2002        max_keys: Option<i32>,
2003    ) -> Result<Vec<ObjectVersion>> {
2004        Ok(self.list_object_versions_page(path, max_keys).await?.items)
2005    }
2006
2007    async fn get_object_tags(
2008        &self,
2009        path: &RemotePath,
2010    ) -> Result<std::collections::HashMap<String, String>> {
2011        let response = match self
2012            .inner
2013            .get_object_tagging()
2014            .bucket(&path.bucket)
2015            .key(&path.key)
2016            .send()
2017            .await
2018        {
2019            Ok(response) => response,
2020            Err(e) => {
2021                if e.to_string().contains("NoSuchTagSet") {
2022                    return Ok(std::collections::HashMap::new());
2023                }
2024                return Err(Error::General(format!("get_object_tags: {e}")));
2025            }
2026        };
2027
2028        let mut tags = std::collections::HashMap::new();
2029        for tag in response.tag_set() {
2030            let key = tag.key();
2031            let value = tag.value();
2032            tags.insert(key.to_string(), value.to_string());
2033        }
2034
2035        Ok(tags)
2036    }
2037
2038    async fn get_bucket_tags(
2039        &self,
2040        bucket: &str,
2041    ) -> Result<std::collections::HashMap<String, String>> {
2042        let response = match self.inner.get_bucket_tagging().bucket(bucket).send().await {
2043            Ok(response) => response,
2044            Err(e) => {
2045                if e.to_string().contains("NoSuchTagSet") {
2046                    return Ok(std::collections::HashMap::new());
2047                }
2048                return Err(Error::General(format!("get_bucket_tags: {e}")));
2049            }
2050        };
2051
2052        let mut tags = std::collections::HashMap::new();
2053        for tag in response.tag_set() {
2054            let key = tag.key();
2055            let value = tag.value();
2056            tags.insert(key.to_string(), value.to_string());
2057        }
2058
2059        Ok(tags)
2060    }
2061
2062    async fn set_object_tags(
2063        &self,
2064        path: &RemotePath,
2065        tags: std::collections::HashMap<String, String>,
2066    ) -> Result<()> {
2067        let tagging = build_tagging(tags)?;
2068
2069        self.inner
2070            .put_object_tagging()
2071            .bucket(&path.bucket)
2072            .key(&path.key)
2073            .tagging(tagging)
2074            .send()
2075            .await
2076            .map_err(|e| Error::General(format!("set_object_tags: {e}")))?;
2077
2078        Ok(())
2079    }
2080
2081    async fn set_bucket_tags(
2082        &self,
2083        bucket: &str,
2084        tags: std::collections::HashMap<String, String>,
2085    ) -> Result<()> {
2086        let tagging = build_tagging(tags)?;
2087
2088        self.inner
2089            .put_bucket_tagging()
2090            .bucket(bucket)
2091            .tagging(tagging)
2092            .send()
2093            .await
2094            .map_err(|e| Error::General(format!("set_bucket_tags: {e}")))?;
2095
2096        Ok(())
2097    }
2098
2099    async fn delete_object_tags(&self, path: &RemotePath) -> Result<()> {
2100        self.inner
2101            .delete_object_tagging()
2102            .bucket(&path.bucket)
2103            .key(&path.key)
2104            .send()
2105            .await
2106            .map_err(|e| Error::General(format!("delete_object_tags: {e}")))?;
2107
2108        Ok(())
2109    }
2110
2111    async fn delete_bucket_tags(&self, bucket: &str) -> Result<()> {
2112        self.inner
2113            .delete_bucket_tagging()
2114            .bucket(bucket)
2115            .send()
2116            .await
2117            .map_err(|e| Error::General(format!("delete_bucket_tags: {e}")))?;
2118
2119        Ok(())
2120    }
2121
2122    async fn get_bucket_policy(&self, bucket: &str) -> Result<Option<String>> {
2123        let response = match self.inner.get_bucket_policy().bucket(bucket).send().await {
2124            Ok(policy) => policy,
2125            Err(error) => {
2126                let error_text = error.to_string();
2127                let kind = if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &error {
2128                    let code = service_err
2129                        .raw()
2130                        .headers()
2131                        .get("x-amz-error-code")
2132                        .and_then(|value| std::str::from_utf8(value.as_bytes()).ok());
2133                    let status = Some(service_err.raw().status().as_u16());
2134                    Self::bucket_policy_error_kind(code, status, &error_text)
2135                } else {
2136                    Self::bucket_policy_error_kind(None, None, &error_text)
2137                };
2138                return Self::map_get_bucket_policy_error(bucket, kind, &error_text);
2139            }
2140        };
2141
2142        Ok(response.policy().map(|policy| policy.to_string()))
2143    }
2144
2145    async fn set_bucket_policy(&self, bucket: &str, policy: &str) -> Result<()> {
2146        self.inner
2147            .put_bucket_policy()
2148            .bucket(bucket)
2149            .policy(policy)
2150            .send()
2151            .await
2152            .map_err(|e| Error::General(format!("set_bucket_policy: {e}")))?;
2153
2154        Ok(())
2155    }
2156
2157    async fn delete_bucket_policy(&self, bucket: &str) -> Result<()> {
2158        match self
2159            .inner
2160            .delete_bucket_policy()
2161            .bucket(bucket)
2162            .send()
2163            .await
2164        {
2165            Ok(_) => Ok(()),
2166            Err(e) => {
2167                let error_text = e.to_string();
2168                let kind = if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &e {
2169                    let code = service_err
2170                        .raw()
2171                        .headers()
2172                        .get("x-amz-error-code")
2173                        .and_then(|value| std::str::from_utf8(value.as_bytes()).ok());
2174                    let status = Some(service_err.raw().status().as_u16());
2175                    Self::bucket_policy_error_kind(code, status, &error_text)
2176                } else {
2177                    Self::bucket_policy_error_kind(None, None, &error_text)
2178                };
2179                Self::map_delete_bucket_policy_error(bucket, kind, &error_text)
2180            }
2181        }
2182    }
2183
2184    async fn get_bucket_cors(&self, bucket: &str) -> Result<Vec<CorsRule>> {
2185        let response = match self.inner.get_bucket_cors().bucket(bucket).send().await {
2186            Ok(response) => response,
2187            Err(error) => {
2188                let error_text = error.to_string();
2189                if error_text.contains("service error")
2190                    && let Ok(url) = self.cors_url(bucket)
2191                {
2192                    match self.xml_request(Method::GET, url, None, None).await {
2193                        Ok(body) => return parse_cors_configuration_xml(&body),
2194                        Err(Error::Network(raw_error))
2195                            if is_missing_cors_configuration_error(&raw_error) =>
2196                        {
2197                            return Ok(Vec::new());
2198                        }
2199                        Err(_) => {}
2200                    }
2201                }
2202
2203                let missing_config =
2204                    if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &error {
2205                        is_missing_cors_configuration_response(
2206                            service_err.err().code(),
2207                            Some(service_err.raw().status().as_u16()),
2208                            &error_text,
2209                        )
2210                    } else {
2211                        is_missing_cors_configuration_response(None, None, &error_text)
2212                    };
2213
2214                if missing_config {
2215                    return Ok(Vec::new());
2216                }
2217                return Err(Error::General(format!("get_bucket_cors: {error_text}")));
2218            }
2219        };
2220
2221        Ok(response
2222            .cors_rules()
2223            .iter()
2224            .map(sdk_cors_rule_to_core)
2225            .collect())
2226    }
2227
2228    async fn set_bucket_cors(&self, bucket: &str, rules: Vec<CorsRule>) -> Result<()> {
2229        let cors_rules = rules
2230            .iter()
2231            .map(core_cors_rule_to_sdk)
2232            .collect::<Result<Vec<_>>>()?;
2233        let cors_configuration = aws_sdk_s3::types::CorsConfiguration::builder()
2234            .set_cors_rules(Some(cors_rules))
2235            .build()
2236            .map_err(|e| Error::General(format!("build bucket cors config: {e}")))?;
2237
2238        self.inner
2239            .put_bucket_cors()
2240            .bucket(bucket)
2241            .cors_configuration(cors_configuration)
2242            .send()
2243            .await
2244            .map_err(|e| Error::General(format!("set_bucket_cors: {e}")))?;
2245
2246        Ok(())
2247    }
2248
2249    async fn delete_bucket_cors(&self, bucket: &str) -> Result<()> {
2250        match self.inner.delete_bucket_cors().bucket(bucket).send().await {
2251            Ok(_) => Ok(()),
2252            Err(error) => {
2253                let error_text = error.to_string();
2254                if error_text.contains("service error")
2255                    && let Ok(url) = self.cors_url(bucket)
2256                {
2257                    match self.xml_request(Method::DELETE, url, None, None).await {
2258                        Ok(_) => return Ok(()),
2259                        Err(Error::Network(raw_error))
2260                            if is_missing_cors_configuration_error(&raw_error) =>
2261                        {
2262                            return Ok(());
2263                        }
2264                        Err(_) => {}
2265                    }
2266                }
2267
2268                let missing_config =
2269                    if let aws_sdk_s3::error::SdkError::ServiceError(service_err) = &error {
2270                        is_missing_cors_configuration_response(
2271                            service_err.err().code(),
2272                            Some(service_err.raw().status().as_u16()),
2273                            &error_text,
2274                        )
2275                    } else {
2276                        is_missing_cors_configuration_response(None, None, &error_text)
2277                    };
2278
2279                if missing_config {
2280                    Ok(())
2281                } else {
2282                    Err(Error::General(format!("delete_bucket_cors: {error_text}")))
2283                }
2284            }
2285        }
2286    }
2287
2288    async fn get_bucket_notifications(&self, bucket: &str) -> Result<Vec<BucketNotification>> {
2289        let response = self
2290            .inner
2291            .get_bucket_notification_configuration()
2292            .bucket(bucket)
2293            .send()
2294            .await
2295            .map_err(|e| Error::General(format!("get_bucket_notifications: {e}")))?;
2296
2297        let mut rules = Vec::new();
2298
2299        for cfg in response.queue_configurations() {
2300            let (prefix, suffix) = Self::extract_notification_filter(cfg.filter());
2301            rules.push(BucketNotification {
2302                id: cfg.id().map(ToString::to_string),
2303                target: NotificationTarget::Queue,
2304                arn: cfg.queue_arn().to_string(),
2305                events: Self::event_list_to_strings(cfg.events()),
2306                prefix,
2307                suffix,
2308            });
2309        }
2310
2311        for cfg in response.topic_configurations() {
2312            let (prefix, suffix) = Self::extract_notification_filter(cfg.filter());
2313            rules.push(BucketNotification {
2314                id: cfg.id().map(ToString::to_string),
2315                target: NotificationTarget::Topic,
2316                arn: cfg.topic_arn().to_string(),
2317                events: Self::event_list_to_strings(cfg.events()),
2318                prefix,
2319                suffix,
2320            });
2321        }
2322
2323        for cfg in response.lambda_function_configurations() {
2324            let (prefix, suffix) = Self::extract_notification_filter(cfg.filter());
2325            rules.push(BucketNotification {
2326                id: cfg.id().map(ToString::to_string),
2327                target: NotificationTarget::Lambda,
2328                arn: cfg.lambda_function_arn().to_string(),
2329                events: Self::event_list_to_strings(cfg.events()),
2330                prefix,
2331                suffix,
2332            });
2333        }
2334
2335        Ok(rules)
2336    }
2337
2338    async fn set_bucket_notifications(
2339        &self,
2340        bucket: &str,
2341        notifications: Vec<BucketNotification>,
2342    ) -> Result<()> {
2343        use aws_sdk_s3::types::{
2344            LambdaFunctionConfiguration, NotificationConfiguration, QueueConfiguration,
2345            TopicConfiguration,
2346        };
2347
2348        let expected_notifications = notifications.clone();
2349        let mut queues = Vec::new();
2350        let mut topics = Vec::new();
2351        let mut lambdas = Vec::new();
2352
2353        for rule in notifications {
2354            let events = Self::strings_to_event_list(&rule.events);
2355            if events.is_empty() {
2356                return Err(Error::General(format!(
2357                    "set_bucket_notifications: empty event list for target '{}'",
2358                    rule.arn
2359                )));
2360            }
2361
2362            let filter =
2363                Self::build_notification_filter(rule.prefix.as_deref(), rule.suffix.as_deref());
2364
2365            match rule.target {
2366                NotificationTarget::Queue => {
2367                    let mut builder = QueueConfiguration::builder()
2368                        .queue_arn(rule.arn)
2369                        .set_events(Some(events))
2370                        .set_id(rule.id);
2371                    if let Some(filter) = filter {
2372                        builder = builder.filter(filter);
2373                    }
2374                    let queue = builder
2375                        .build()
2376                        .map_err(|e| Error::General(format!("build queue notification: {e}")))?;
2377                    queues.push(queue);
2378                }
2379                NotificationTarget::Topic => {
2380                    let mut builder = TopicConfiguration::builder()
2381                        .topic_arn(rule.arn)
2382                        .set_events(Some(events))
2383                        .set_id(rule.id);
2384                    if let Some(filter) = filter {
2385                        builder = builder.filter(filter);
2386                    }
2387                    let topic = builder
2388                        .build()
2389                        .map_err(|e| Error::General(format!("build topic notification: {e}")))?;
2390                    topics.push(topic);
2391                }
2392                NotificationTarget::Lambda => {
2393                    let mut builder = LambdaFunctionConfiguration::builder()
2394                        .lambda_function_arn(rule.arn)
2395                        .set_events(Some(events))
2396                        .set_id(rule.id);
2397                    if let Some(filter) = filter {
2398                        builder = builder.filter(filter);
2399                    }
2400                    let lambda = builder
2401                        .build()
2402                        .map_err(|e| Error::General(format!("build lambda notification: {e}")))?;
2403                    lambdas.push(lambda);
2404                }
2405            }
2406        }
2407
2408        let config = NotificationConfiguration::builder()
2409            .set_queue_configurations(Some(queues))
2410            .set_topic_configurations(Some(topics))
2411            .set_lambda_function_configurations(Some(lambdas))
2412            .build();
2413
2414        match self
2415            .inner
2416            .put_bucket_notification_configuration()
2417            .bucket(bucket)
2418            .notification_configuration(config)
2419            .send()
2420            .await
2421        {
2422            Ok(_) => {}
2423            Err(error) => {
2424                let error_text = error.to_string();
2425                // RustFS may apply notification configuration but still return a non-AWS
2426                // response envelope that the SDK reports as "service error".
2427                if error_text.contains("service error")
2428                    && let Ok(actual) = self.get_bucket_notifications(bucket).await
2429                    && Self::notifications_equivalent(&expected_notifications, &actual)
2430                {
2431                    return Ok(());
2432                }
2433                return Err(Error::General(format!(
2434                    "set_bucket_notifications: {error_text}"
2435                )));
2436            }
2437        }
2438
2439        Ok(())
2440    }
2441
2442    async fn get_bucket_lifecycle(&self, bucket: &str) -> Result<Vec<LifecycleRule>> {
2443        let response = match self
2444            .inner
2445            .get_bucket_lifecycle_configuration()
2446            .bucket(bucket)
2447            .send()
2448            .await
2449        {
2450            Ok(resp) => resp,
2451            Err(error) => {
2452                let error_text = Self::format_sdk_error(&error);
2453                if error_text.contains("NoSuchLifecycleConfiguration")
2454                    || error_text.contains("lifecycle configuration is not found")
2455                {
2456                    return Ok(Vec::new());
2457                }
2458                return Err(Error::General(format!(
2459                    "get_bucket_lifecycle: {error_text}"
2460                )));
2461            }
2462        };
2463
2464        let mut rules = Vec::new();
2465        for sdk_rule in response.rules() {
2466            let id = sdk_rule.id().unwrap_or("").to_string();
2467            let status = match sdk_rule.status().as_str() {
2468                "Enabled" => rc_core::LifecycleRuleStatus::Enabled,
2469                _ => rc_core::LifecycleRuleStatus::Disabled,
2470            };
2471
2472            let prefix = parse_lifecycle_filter_prefix(sdk_rule.filter());
2473            let tags = parse_lifecycle_filter_tags(sdk_rule.filter());
2474
2475            let expiration = sdk_rule
2476                .expiration()
2477                .map(|exp| rc_core::LifecycleExpiration {
2478                    days: exp.days(),
2479                    date: exp.date().map(|d| d.to_string()),
2480                });
2481
2482            let transition = sdk_rule
2483                .transitions()
2484                .first()
2485                .map(|t| rc_core::LifecycleTransition {
2486                    days: t.days(),
2487                    date: t.date().map(|d| d.to_string()),
2488                    storage_class: t
2489                        .storage_class()
2490                        .map(|sc| sc.as_str().to_string())
2491                        .unwrap_or_default(),
2492                });
2493
2494            let noncurrent_version_expiration =
2495                sdk_rule.noncurrent_version_expiration().map(|nve| {
2496                    rc_core::NoncurrentVersionExpiration {
2497                        noncurrent_days: nve.noncurrent_days().unwrap_or(0),
2498                        newer_noncurrent_versions: nve.newer_noncurrent_versions(),
2499                    }
2500                });
2501
2502            let noncurrent_version_transition = sdk_rule
2503                .noncurrent_version_transitions()
2504                .first()
2505                .map(|nvt| rc_core::NoncurrentVersionTransition {
2506                    noncurrent_days: nvt.noncurrent_days().unwrap_or(0),
2507                    storage_class: nvt
2508                        .storage_class()
2509                        .map(|sc| sc.as_str().to_string())
2510                        .unwrap_or_default(),
2511                });
2512
2513            let abort_incomplete_multipart_upload_days = sdk_rule
2514                .abort_incomplete_multipart_upload()
2515                .and_then(|a| a.days_after_initiation());
2516
2517            let expired_object_delete_marker = sdk_rule
2518                .expiration()
2519                .and_then(|e| e.expired_object_delete_marker())
2520                .filter(|v| *v);
2521
2522            rules.push(LifecycleRule {
2523                id,
2524                status,
2525                prefix,
2526                tags,
2527                expiration,
2528                transition,
2529                noncurrent_version_expiration,
2530                noncurrent_version_transition,
2531                abort_incomplete_multipart_upload_days,
2532                expired_object_delete_marker,
2533            });
2534        }
2535
2536        Ok(rules)
2537    }
2538
2539    async fn set_bucket_lifecycle(&self, bucket: &str, rules: Vec<LifecycleRule>) -> Result<()> {
2540        use aws_sdk_s3::types::{
2541            AbortIncompleteMultipartUpload, BucketLifecycleConfiguration, ExpirationStatus,
2542            LifecycleExpiration as SdkExpiration, LifecycleRule as SdkRule,
2543            NoncurrentVersionExpiration as SdkNve, NoncurrentVersionTransition as SdkNvt,
2544            Transition, TransitionStorageClass,
2545        };
2546
2547        let mut sdk_rules = Vec::new();
2548        for rule in rules {
2549            let status = match rule.status {
2550                rc_core::LifecycleRuleStatus::Enabled => ExpirationStatus::Enabled,
2551                rc_core::LifecycleRuleStatus::Disabled => ExpirationStatus::Disabled,
2552            };
2553
2554            let filter = build_lifecycle_rule_filter(rule.prefix.as_deref(), rule.tags.as_ref())?;
2555
2556            let expiration = rule.expiration.map(|exp| {
2557                let mut builder = SdkExpiration::builder();
2558                if let Some(days) = exp.days {
2559                    builder = builder.days(days);
2560                }
2561                if let Some(ref date_str) = exp.date
2562                    && let Ok(dt) = aws_smithy_types::DateTime::from_str(
2563                        date_str,
2564                        aws_smithy_types::date_time::Format::DateTime,
2565                    )
2566                {
2567                    builder = builder.date(dt);
2568                }
2569                if let Some(true) = rule.expired_object_delete_marker {
2570                    builder = builder.expired_object_delete_marker(true);
2571                }
2572                builder.build()
2573            });
2574
2575            let transitions = rule.transition.map(|t| {
2576                #[allow(deprecated)]
2577                let sc = TransitionStorageClass::from(t.storage_class.as_str());
2578                let mut builder = Transition::builder().storage_class(sc);
2579                if let Some(days) = t.days {
2580                    builder = builder.days(days);
2581                }
2582                if let Some(ref date_str) = t.date
2583                    && let Ok(dt) = aws_smithy_types::DateTime::from_str(
2584                        date_str,
2585                        aws_smithy_types::date_time::Format::DateTime,
2586                    )
2587                {
2588                    builder = builder.date(dt);
2589                }
2590                vec![builder.build()]
2591            });
2592
2593            let nve = rule.noncurrent_version_expiration.map(|nve| {
2594                let mut builder = SdkNve::builder().noncurrent_days(nve.noncurrent_days);
2595                if let Some(newer) = nve.newer_noncurrent_versions {
2596                    builder = builder.newer_noncurrent_versions(newer);
2597                }
2598                builder.build()
2599            });
2600
2601            let nvt = rule.noncurrent_version_transition.map(|nvt| {
2602                let sc = TransitionStorageClass::from(nvt.storage_class.as_str());
2603                let builder = SdkNvt::builder()
2604                    .noncurrent_days(nvt.noncurrent_days)
2605                    .storage_class(sc);
2606                vec![builder.build()]
2607            });
2608
2609            let abort = rule.abort_incomplete_multipart_upload_days.map(|days| {
2610                AbortIncompleteMultipartUpload::builder()
2611                    .days_after_initiation(days)
2612                    .build()
2613            });
2614
2615            let mut builder = SdkRule::builder().id(&rule.id).status(status);
2616            if let Some(filter) = filter {
2617                builder = builder.filter(filter);
2618            }
2619            if let Some(expiration) = expiration {
2620                builder = builder.expiration(expiration);
2621            }
2622            if let Some(transitions) = transitions {
2623                builder = builder.set_transitions(Some(transitions));
2624            }
2625            if let Some(nve) = nve {
2626                builder = builder.noncurrent_version_expiration(nve);
2627            }
2628            if let Some(nvt) = nvt {
2629                builder = builder.set_noncurrent_version_transitions(Some(nvt));
2630            }
2631            if let Some(abort) = abort {
2632                builder = builder.abort_incomplete_multipart_upload(abort);
2633            }
2634
2635            let sdk_rule = builder
2636                .build()
2637                .map_err(|e| Error::General(format!("build lifecycle rule: {e}")))?;
2638            sdk_rules.push(sdk_rule);
2639        }
2640
2641        let config = BucketLifecycleConfiguration::builder()
2642            .set_rules(Some(sdk_rules))
2643            .build()
2644            .map_err(|e| Error::General(format!("build lifecycle config: {e}")))?;
2645
2646        self.inner
2647            .put_bucket_lifecycle_configuration()
2648            .bucket(bucket)
2649            .lifecycle_configuration(config)
2650            .send()
2651            .await
2652            .map_err(|e| {
2653                Error::General(format!(
2654                    "set_bucket_lifecycle: {}",
2655                    Self::format_sdk_error(&e)
2656                ))
2657            })?;
2658
2659        Ok(())
2660    }
2661
2662    async fn delete_bucket_lifecycle(&self, bucket: &str) -> Result<()> {
2663        self.inner
2664            .delete_bucket_lifecycle()
2665            .bucket(bucket)
2666            .send()
2667            .await
2668            .map_err(|e| {
2669                Error::General(format!(
2670                    "delete_bucket_lifecycle: {}",
2671                    Self::format_sdk_error(&e)
2672                ))
2673            })?;
2674        Ok(())
2675    }
2676
2677    async fn restore_object(&self, path: &RemotePath, days: i32) -> Result<()> {
2678        use aws_sdk_s3::types::RestoreRequest;
2679
2680        let request = RestoreRequest::builder().days(days).build();
2681        self.inner
2682            .restore_object()
2683            .bucket(&path.bucket)
2684            .key(&path.key)
2685            .restore_request(request)
2686            .send()
2687            .await
2688            .map_err(|e| {
2689                Error::General(format!("restore_object: {}", Self::format_sdk_error(&e)))
2690            })?;
2691        Ok(())
2692    }
2693
2694    async fn get_bucket_replication(
2695        &self,
2696        bucket: &str,
2697    ) -> Result<Option<ReplicationConfiguration>> {
2698        let url = self.replication_url(bucket)?;
2699        let body = match self.xml_request(Method::GET, url, None, None).await {
2700            Ok(body) => body,
2701            Err(Error::Network(error_text))
2702                if error_text.contains("ReplicationConfigurationNotFound")
2703                    || error_text.contains("replication configuration is not found")
2704                    || error_text.contains("replication not found") =>
2705            {
2706                return Ok(None);
2707            }
2708            Err(error) => {
2709                return Err(Error::General(format!("get_bucket_replication: {error}")));
2710            }
2711        };
2712
2713        parse_replication_configuration_xml(&body).map(Some)
2714    }
2715
2716    async fn set_bucket_replication(
2717        &self,
2718        bucket: &str,
2719        config: ReplicationConfiguration,
2720    ) -> Result<()> {
2721        let url = self.replication_url(bucket)?;
2722        let body = build_replication_configuration_xml(&config).into_bytes();
2723        self.xml_request(Method::PUT, url, Some("application/xml"), Some(body))
2724            .await
2725            .map_err(|e| Error::General(format!("set_bucket_replication: {e}")))?;
2726
2727        Ok(())
2728    }
2729
2730    async fn delete_bucket_replication(&self, bucket: &str) -> Result<()> {
2731        self.inner
2732            .delete_bucket_replication()
2733            .bucket(bucket)
2734            .send()
2735            .await
2736            .map_err(|e| {
2737                Error::General(format!(
2738                    "delete_bucket_replication: {}",
2739                    Self::format_sdk_error(&e)
2740                ))
2741            })?;
2742        Ok(())
2743    }
2744
2745    async fn select_object_content(
2746        &self,
2747        path: &RemotePath,
2748        options: &SelectOptions,
2749        writer: &mut (dyn AsyncWrite + Send + Unpin),
2750    ) -> Result<()> {
2751        crate::select::select_object_content(&self.inner, path, options, writer).await
2752    }
2753}
2754
2755#[cfg(test)]
2756mod tests {
2757    use super::*;
2758    use aws_smithy_http_client::test_util::{CaptureRequestReceiver, capture_request};
2759    use std::collections::HashMap;
2760
2761    fn test_s3_client(
2762        response: Option<http::Response<SdkBody>>,
2763    ) -> (S3Client, CaptureRequestReceiver) {
2764        test_s3_client_with_endpoint("https://example.com", response)
2765    }
2766
2767    fn test_s3_client_with_endpoint(
2768        endpoint: &str,
2769        response: Option<http::Response<SdkBody>>,
2770    ) -> (S3Client, CaptureRequestReceiver) {
2771        let (http_client, request_receiver) = capture_request(response);
2772        let credentials = Credentials::new(
2773            "access-key",
2774            "secret-key",
2775            None,
2776            None,
2777            "rc-test-credentials",
2778        );
2779        let config = aws_sdk_s3::config::Builder::new()
2780            .credentials_provider(credentials)
2781            .endpoint_url(endpoint)
2782            .region(aws_sdk_s3::config::Region::new("us-east-1"))
2783            .force_path_style(true)
2784            .behavior_version_latest()
2785            .http_client(http_client)
2786            .build();
2787
2788        let alias = Alias::new("test", endpoint, "access-key", "secret-key");
2789        let client = S3Client {
2790            inner: aws_sdk_s3::Client::from_conf(config),
2791            xml_http_client: reqwest::Client::new(),
2792            alias,
2793        };
2794
2795        (client, request_receiver)
2796    }
2797
2798    #[test]
2799    fn test_object_info_creation() {
2800        let info = ObjectInfo::file("test.txt", 1024);
2801        assert_eq!(info.key, "test.txt");
2802        assert_eq!(info.size_bytes, Some(1024));
2803    }
2804
2805    #[test]
2806    fn auto_bucket_lookup_uses_dns_for_aliyun_oss_service_endpoint() {
2807        let alias = Alias::new(
2808            "aliyun",
2809            "https://oss-cn-hangzhou.aliyuncs.com",
2810            "access-key",
2811            "secret-key",
2812        );
2813
2814        assert!(!force_path_style_for_alias(&alias));
2815    }
2816
2817    #[test]
2818    fn auto_bucket_lookup_uses_dns_for_aliyun_internal_service_endpoint() {
2819        let alias = Alias::new(
2820            "aliyun",
2821            "https://oss-cn-hangzhou-internal.aliyuncs.com",
2822            "access-key",
2823            "secret-key",
2824        );
2825
2826        assert!(!force_path_style_for_alias(&alias));
2827    }
2828
2829    #[test]
2830    fn auto_bucket_lookup_keeps_path_style_for_custom_endpoint() {
2831        let alias = Alias::new("local", "http://localhost:9000", "access-key", "secret-key");
2832
2833        assert!(force_path_style_for_alias(&alias));
2834    }
2835
2836    #[test]
2837    fn auto_bucket_lookup_keeps_path_style_for_non_oss_aliyun_endpoint() {
2838        let alias = Alias::new(
2839            "aliyun",
2840            "https://ecs-cn-hangzhou.aliyuncs.com",
2841            "access-key",
2842            "secret-key",
2843        );
2844
2845        assert!(force_path_style_for_alias(&alias));
2846    }
2847
2848    #[test]
2849    fn auto_bucket_lookup_keeps_path_style_for_invalid_endpoint() {
2850        let alias = Alias::new("broken", "not a valid endpoint", "access-key", "secret-key");
2851
2852        assert!(force_path_style_for_alias(&alias));
2853    }
2854
2855    #[test]
2856    fn explicit_bucket_lookup_overrides_auto_detection() {
2857        let mut path_alias = Alias::new(
2858            "aliyun",
2859            "https://oss-cn-hangzhou.aliyuncs.com",
2860            "access-key",
2861            "secret-key",
2862        );
2863        path_alias.bucket_lookup = "path".to_string();
2864
2865        let mut dns_alias =
2866            Alias::new("local", "http://localhost:9000", "access-key", "secret-key");
2867        dns_alias.bucket_lookup = "dns".to_string();
2868
2869        assert!(force_path_style_for_alias(&path_alias));
2870        assert!(!force_path_style_for_alias(&dns_alias));
2871    }
2872
2873    #[test]
2874    fn unknown_bucket_lookup_keeps_path_style() {
2875        let mut alias = Alias::new(
2876            "aliyun",
2877            "https://oss-cn-hangzhou.aliyuncs.com",
2878            "access-key",
2879            "secret-key",
2880        );
2881        alias.bucket_lookup = "unexpected".to_string();
2882
2883        assert!(force_path_style_for_alias(&alias));
2884    }
2885
2886    #[test]
2887    fn parse_replication_configuration_xml_reads_delete_replication() {
2888        let body = r#"<?xml version="1.0" encoding="UTF-8"?>
2889<ReplicationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
2890  <Role>arn:rustfs:replication:us-east-1:123:test</Role>
2891  <Rule>
2892    <Status>Enabled</Status>
2893    <Destination>
2894      <Bucket>arn:rustfs:replication:us-east-1:123:dest</Bucket>
2895      <StorageClass>STANDARD</StorageClass>
2896    </Destination>
2897    <ID>rule-1</ID>
2898    <Priority>1</Priority>
2899    <Filter>
2900      <Prefix>logs/</Prefix>
2901    </Filter>
2902    <ExistingObjectReplication>
2903      <Status>Enabled</Status>
2904    </ExistingObjectReplication>
2905    <DeleteMarkerReplication>
2906      <Status>Disabled</Status>
2907    </DeleteMarkerReplication>
2908    <DeleteReplication>
2909      <Status>Enabled</Status>
2910    </DeleteReplication>
2911  </Rule>
2912</ReplicationConfiguration>"#;
2913
2914        let config = parse_replication_configuration_xml(body).expect("parse replication xml");
2915        assert_eq!(config.role, "arn:rustfs:replication:us-east-1:123:test");
2916        assert_eq!(config.rules.len(), 1);
2917        assert_eq!(config.rules[0].id, "rule-1");
2918        assert_eq!(config.rules[0].prefix.as_deref(), Some("logs/"));
2919        assert_eq!(config.rules[0].delete_replication, Some(true));
2920        assert_eq!(config.rules[0].delete_marker_replication, Some(false));
2921        assert_eq!(config.rules[0].existing_object_replication, Some(true));
2922    }
2923
2924    #[test]
2925    fn parse_replication_configuration_xml_preserves_tag_filters() {
2926        let body = r#"<?xml version="1.0" encoding="UTF-8"?>
2927<ReplicationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
2928  <Rule>
2929    <Status>Enabled</Status>
2930    <Destination>
2931      <Bucket>arn:rustfs:replication:us-east-1:123:dest</Bucket>
2932    </Destination>
2933    <ID>tagged-rule</ID>
2934    <Priority>2</Priority>
2935    <Filter>
2936      <And>
2937        <Prefix>logs/</Prefix>
2938        <Tag>
2939          <Key>env</Key>
2940          <Value>prod</Value>
2941        </Tag>
2942        <Tag>
2943          <Key>team</Key>
2944          <Value>core</Value>
2945        </Tag>
2946      </And>
2947    </Filter>
2948  </Rule>
2949</ReplicationConfiguration>"#;
2950
2951        let config = parse_replication_configuration_xml(body).expect("parse replication xml");
2952        let rule = &config.rules[0];
2953        assert_eq!(rule.prefix.as_deref(), Some("logs/"));
2954        let tags = rule.tags.as_ref().expect("tag filters");
2955        assert_eq!(tags.get("env").map(String::as_str), Some("prod"));
2956        assert_eq!(tags.get("team").map(String::as_str), Some("core"));
2957    }
2958
2959    #[test]
2960    fn build_replication_configuration_xml_writes_delete_replication() {
2961        let config = ReplicationConfiguration {
2962            role: "arn:rustfs:replication:us-east-1:123:test".to_string(),
2963            rules: vec![rc_core::ReplicationRule {
2964                id: "rule-1".to_string(),
2965                priority: 1,
2966                status: rc_core::ReplicationRuleStatus::Enabled,
2967                prefix: Some("logs/".to_string()),
2968                tags: None,
2969                destination: rc_core::ReplicationDestination {
2970                    bucket_arn: "arn:rustfs:replication:us-east-1:123:dest".to_string(),
2971                    storage_class: Some("STANDARD".to_string()),
2972                },
2973                delete_marker_replication: Some(true),
2974                existing_object_replication: Some(true),
2975                delete_replication: Some(true),
2976            }],
2977        };
2978
2979        let xml = build_replication_configuration_xml(&config);
2980        assert!(xml.contains("<DeleteReplication><Status>Enabled</Status></DeleteReplication>"));
2981        assert!(xml.contains(
2982            "<ExistingObjectReplication><Status>Enabled</Status></ExistingObjectReplication>"
2983        ));
2984        assert!(xml.contains(
2985            "<DeleteMarkerReplication><Status>Enabled</Status></DeleteMarkerReplication>"
2986        ));
2987        assert!(xml.contains("<Filter><Prefix>logs/</Prefix></Filter>"));
2988    }
2989
2990    #[test]
2991    fn build_replication_configuration_xml_writes_and_tag_filters() {
2992        let mut tags = HashMap::new();
2993        tags.insert("env".to_string(), "prod".to_string());
2994        tags.insert("team".to_string(), "core".to_string());
2995
2996        let config = ReplicationConfiguration {
2997            role: String::new(),
2998            rules: vec![rc_core::ReplicationRule {
2999                id: "rule-1".to_string(),
3000                priority: 1,
3001                status: rc_core::ReplicationRuleStatus::Enabled,
3002                prefix: Some("logs/".to_string()),
3003                tags: Some(tags),
3004                destination: rc_core::ReplicationDestination {
3005                    bucket_arn: "arn:rustfs:replication:us-east-1:123:dest".to_string(),
3006                    storage_class: None,
3007                },
3008                delete_marker_replication: None,
3009                existing_object_replication: None,
3010                delete_replication: None,
3011            }],
3012        };
3013
3014        let xml = build_replication_configuration_xml(&config);
3015        assert!(xml.contains("<Filter><And><Prefix>logs/</Prefix>"));
3016        assert!(xml.contains("<Tag><Key>env</Key><Value>prod</Value></Tag>"));
3017        assert!(xml.contains("<Tag><Key>team</Key><Value>core</Value></Tag>"));
3018    }
3019
3020    #[test]
3021    fn build_lifecycle_rule_filter_preserves_prefix_and_tags() {
3022        let mut tags = HashMap::new();
3023        tags.insert("env".to_string(), "prod".to_string());
3024        tags.insert("team".to_string(), "core".to_string());
3025
3026        let filter = build_lifecycle_rule_filter(Some("logs/"), Some(&tags))
3027            .expect("build lifecycle filter")
3028            .expect("lifecycle filter");
3029
3030        assert_eq!(
3031            parse_lifecycle_filter_prefix(Some(&filter)).as_deref(),
3032            Some("logs/")
3033        );
3034        let parsed_tags = parse_lifecycle_filter_tags(Some(&filter)).expect("parsed tags");
3035        assert_eq!(parsed_tags.get("env").map(String::as_str), Some("prod"));
3036        assert_eq!(parsed_tags.get("team").map(String::as_str), Some("core"));
3037    }
3038
3039    #[test]
3040    fn bucket_policy_error_kind_uses_error_code() {
3041        assert_eq!(
3042            S3Client::bucket_policy_error_kind(Some("NoSuchBucketPolicy"), Some(404), ""),
3043            BucketPolicyErrorKind::MissingPolicy
3044        );
3045        assert_eq!(
3046            S3Client::bucket_policy_error_kind(Some("NoSuchBucket"), Some(404), ""),
3047            BucketPolicyErrorKind::MissingBucket
3048        );
3049    }
3050
3051    #[test]
3052    fn bucket_policy_error_kind_prefers_bucket_not_found_over_404_fallback() {
3053        assert_eq!(
3054            S3Client::bucket_policy_error_kind(None, Some(404), "NoSuchBucket"),
3055            BucketPolicyErrorKind::MissingBucket
3056        );
3057        assert_eq!(
3058            S3Client::bucket_policy_error_kind(None, Some(404), "no details"),
3059            BucketPolicyErrorKind::MissingPolicy
3060        );
3061    }
3062
3063    #[test]
3064    fn bucket_policy_error_mapping_returns_expected_result() {
3065        let get_missing_policy = S3Client::map_get_bucket_policy_error(
3066            "bucket",
3067            BucketPolicyErrorKind::MissingPolicy,
3068            "NoSuchPolicy",
3069        )
3070        .expect("missing policy should map to Ok(None)");
3071        assert!(get_missing_policy.is_none());
3072
3073        match S3Client::map_get_bucket_policy_error(
3074            "bucket",
3075            BucketPolicyErrorKind::MissingBucket,
3076            "NoSuchBucket",
3077        ) {
3078            Err(Error::NotFound(message)) => assert!(message.contains("Bucket not found")),
3079            other => panic!("Expected NotFound for missing bucket, got: {:?}", other),
3080        }
3081
3082        let delete_missing_policy = S3Client::map_delete_bucket_policy_error(
3083            "bucket",
3084            BucketPolicyErrorKind::MissingPolicy,
3085            "NoSuchPolicy",
3086        );
3087        assert!(
3088            delete_missing_policy.is_ok(),
3089            "Missing policy should be treated as successful delete"
3090        );
3091    }
3092
3093    #[test]
3094    fn notification_filter_round_trip_prefix_and_suffix() {
3095        let filter = S3Client::build_notification_filter(Some("logs/"), Some(".json"))
3096            .expect("filter should be built");
3097        let (prefix, suffix) = S3Client::extract_notification_filter(Some(&filter));
3098        assert_eq!(prefix.as_deref(), Some("logs/"));
3099        assert_eq!(suffix.as_deref(), Some(".json"));
3100    }
3101
3102    #[test]
3103    fn notification_filter_none_when_empty() {
3104        assert!(S3Client::build_notification_filter(None, None).is_none());
3105    }
3106
3107    #[test]
3108    fn notifications_equivalent_ignores_order_and_duplicate_events() {
3109        let expected = vec![
3110            BucketNotification {
3111                id: Some("a".to_string()),
3112                target: NotificationTarget::Queue,
3113                arn: "arn:aws:sqs:us-east-1:123456789012:q".to_string(),
3114                events: vec![
3115                    "s3:ObjectCreated:*".to_string(),
3116                    "s3:ObjectCreated:*".to_string(),
3117                ],
3118                prefix: Some("images/".to_string()),
3119                suffix: Some(".jpg".to_string()),
3120            },
3121            BucketNotification {
3122                id: Some("b".to_string()),
3123                target: NotificationTarget::Topic,
3124                arn: "arn:aws:sns:us-east-1:123456789012:t".to_string(),
3125                events: vec!["s3:ObjectRemoved:*".to_string()],
3126                prefix: None,
3127                suffix: None,
3128            },
3129        ];
3130
3131        let actual = vec![
3132            BucketNotification {
3133                id: None,
3134                target: NotificationTarget::Topic,
3135                arn: "arn:aws:sns:us-east-1:123456789012:t".to_string(),
3136                events: vec!["s3:ObjectRemoved:*".to_string()],
3137                prefix: None,
3138                suffix: None,
3139            },
3140            BucketNotification {
3141                id: None,
3142                target: NotificationTarget::Queue,
3143                arn: "arn:aws:sqs:us-east-1:123456789012:q".to_string(),
3144                events: vec!["s3:ObjectCreated:*".to_string()],
3145                prefix: Some("images/".to_string()),
3146                suffix: Some(".jpg".to_string()),
3147            },
3148        ];
3149
3150        assert!(S3Client::notifications_equivalent(&expected, &actual));
3151    }
3152
3153    #[test]
3154    fn sdk_cors_rule_to_core_preserves_optional_fields() {
3155        let sdk_rule = aws_sdk_s3::types::CorsRule::builder()
3156            .id("web-app")
3157            .allowed_origins("https://app.example.com")
3158            .allowed_methods("get")
3159            .allowed_headers("Authorization")
3160            .expose_headers("ETag")
3161            .max_age_seconds(300)
3162            .build()
3163            .expect("build cors rule");
3164
3165        let rule = sdk_cors_rule_to_core(&sdk_rule);
3166        assert_eq!(rule.id.as_deref(), Some("web-app"));
3167        assert_eq!(
3168            rule.allowed_origins,
3169            vec!["https://app.example.com".to_string()]
3170        );
3171        assert_eq!(rule.allowed_methods, vec!["get".to_string()]);
3172        assert_eq!(
3173            rule.allowed_headers,
3174            Some(vec!["Authorization".to_string()])
3175        );
3176        assert_eq!(rule.expose_headers, Some(vec!["ETag".to_string()]));
3177        assert_eq!(rule.max_age_seconds, Some(300));
3178    }
3179
3180    #[test]
3181    fn sdk_cors_rule_to_core_drops_empty_optional_headers() {
3182        let sdk_rule = aws_sdk_s3::types::CorsRule::builder()
3183            .allowed_origins("https://app.example.com")
3184            .allowed_methods("GET")
3185            .build()
3186            .expect("build cors rule");
3187
3188        let rule = sdk_cors_rule_to_core(&sdk_rule);
3189        assert_eq!(rule.allowed_headers, None);
3190        assert_eq!(rule.expose_headers, None);
3191    }
3192
3193    #[test]
3194    fn core_cors_rule_to_sdk_normalizes_method_case() {
3195        let rule = CorsRule {
3196            id: Some("public-read".to_string()),
3197            allowed_origins: vec!["*".to_string()],
3198            allowed_methods: vec!["get".to_string(), "post".to_string()],
3199            allowed_headers: Some(vec!["*".to_string()]),
3200            expose_headers: None,
3201            max_age_seconds: Some(600),
3202        };
3203
3204        let sdk_rule = core_cors_rule_to_sdk(&rule).expect("convert cors rule");
3205        assert_eq!(sdk_rule.id(), Some("public-read"));
3206        assert_eq!(sdk_rule.allowed_origins(), ["*"]);
3207        assert_eq!(sdk_rule.allowed_methods(), ["GET", "POST"]);
3208        assert_eq!(sdk_rule.allowed_headers(), ["*"]);
3209        assert_eq!(sdk_rule.max_age_seconds(), Some(600));
3210    }
3211
3212    #[tokio::test]
3213    async fn set_bucket_cors_sends_rule_fields() {
3214        let response = http::Response::builder()
3215            .status(200)
3216            .body(SdkBody::from(""))
3217            .expect("build put bucket cors response");
3218        let (client, request_receiver) = test_s3_client(Some(response));
3219
3220        client
3221            .set_bucket_cors(
3222                "bucket",
3223                vec![CorsRule {
3224                    id: Some("web-app".to_string()),
3225                    allowed_origins: vec!["https://app.example.com".to_string()],
3226                    allowed_methods: vec!["GET".to_string(), "POST".to_string()],
3227                    allowed_headers: Some(vec!["Authorization".to_string()]),
3228                    expose_headers: Some(vec!["ETag".to_string()]),
3229                    max_age_seconds: Some(600),
3230                }],
3231            )
3232            .await
3233            .expect("set bucket cors");
3234
3235        let request = request_receiver.expect_request();
3236        assert_eq!(request.method(), http::Method::PUT);
3237        assert!(
3238            request.uri().to_string().contains("?cors"),
3239            "expected bucket CORS subresource in URI: {}",
3240            request.uri()
3241        );
3242
3243        let body = request.body().bytes().expect("request body bytes");
3244        let body = std::str::from_utf8(body).expect("request body is utf8");
3245        assert!(body.contains("<ID>web-app</ID>"));
3246        assert!(body.contains("<AllowedOrigin>https://app.example.com</AllowedOrigin>"));
3247        assert!(body.contains("<AllowedMethod>GET</AllowedMethod>"));
3248        assert!(body.contains("<AllowedMethod>POST</AllowedMethod>"));
3249        assert!(body.contains("<AllowedHeader>Authorization</AllowedHeader>"));
3250        assert!(body.contains("<ExposeHeader>ETag</ExposeHeader>"));
3251        assert!(body.contains("<MaxAgeSeconds>600</MaxAgeSeconds>"));
3252    }
3253
3254    #[test]
3255    fn core_cors_rule_to_sdk_drops_empty_optional_headers() {
3256        let rule = CorsRule {
3257            id: None,
3258            allowed_origins: vec!["https://app.example.com".to_string()],
3259            allowed_methods: vec!["GET".to_string()],
3260            allowed_headers: Some(Vec::new()),
3261            expose_headers: Some(Vec::new()),
3262            max_age_seconds: None,
3263        };
3264
3265        let sdk_rule = core_cors_rule_to_sdk(&rule).expect("convert cors rule");
3266        assert!(sdk_rule.allowed_headers().is_empty());
3267        assert!(sdk_rule.expose_headers().is_empty());
3268    }
3269
3270    #[test]
3271    fn parse_cors_configuration_xml_round_trips_rule_fields() {
3272        let body = r#"
3273<CORSConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
3274  <CORSRule>
3275    <ID>mc-rule</ID>
3276    <AllowedOrigin>https://console.example.com</AllowedOrigin>
3277    <AllowedMethod>GET</AllowedMethod>
3278    <AllowedMethod>POST</AllowedMethod>
3279    <AllowedHeader>*</AllowedHeader>
3280    <ExposeHeader>ETag</ExposeHeader>
3281    <MaxAgeSeconds>1200</MaxAgeSeconds>
3282  </CORSRule>
3283</CORSConfiguration>
3284"#;
3285
3286        let rules = parse_cors_configuration_xml(body).expect("parse cors xml");
3287        assert_eq!(rules.len(), 1);
3288        assert_eq!(rules[0].id.as_deref(), Some("mc-rule"));
3289        assert_eq!(
3290            rules[0].allowed_origins,
3291            vec!["https://console.example.com".to_string()]
3292        );
3293        assert_eq!(
3294            rules[0].allowed_methods,
3295            vec!["GET".to_string(), "POST".to_string()]
3296        );
3297        assert_eq!(rules[0].allowed_headers, Some(vec!["*".to_string()]));
3298        assert_eq!(rules[0].expose_headers, Some(vec!["ETag".to_string()]));
3299        assert_eq!(rules[0].max_age_seconds, Some(1200));
3300    }
3301
3302    #[test]
3303    fn cors_url_uses_path_style_bucket_and_query() {
3304        let (client, _) = test_s3_client(None);
3305
3306        let url = client.cors_url("bucket-name").expect("build cors url");
3307
3308        assert_eq!(url.as_str(), "https://example.com/bucket-name?cors=");
3309    }
3310
3311    #[test]
3312    fn cors_url_rejects_endpoints_without_path_segments() {
3313        let (client, _) = test_s3_client_with_endpoint("mailto:test@example.com", None);
3314
3315        match client.cors_url("bucket-name") {
3316            Err(Error::Network(message)) => {
3317                assert!(message.contains("does not support path-style bucket operations"));
3318            }
3319            other => panic!("expected path-style endpoint error, got {other:?}"),
3320        }
3321    }
3322
3323    #[test]
3324    fn missing_cors_configuration_errors_are_detected() {
3325        assert!(is_missing_cors_configuration_error(
3326            "NoSuchCORSConfiguration"
3327        ));
3328        assert!(is_missing_cors_configuration_error(
3329            "The CORS configuration does not exist"
3330        ));
3331        assert!(!is_missing_cors_configuration_error("AccessDenied"));
3332    }
3333
3334    #[test]
3335    fn missing_cors_configuration_response_detects_code_and_status() {
3336        assert!(is_missing_cors_configuration_response(
3337            Some("NoSuchCORSConfiguration"),
3338            Some(404),
3339            "service error"
3340        ));
3341        assert!(is_missing_cors_configuration_response(
3342            None,
3343            Some(404),
3344            "The CORS configuration does not exist"
3345        ));
3346        assert!(!is_missing_cors_configuration_response(
3347            Some("AccessDenied"),
3348            Some(403),
3349            "access denied"
3350        ));
3351        assert!(!is_missing_cors_configuration_response(
3352            None,
3353            Some(404),
3354            "service error"
3355        ));
3356        assert!(!is_missing_cors_configuration_response(
3357            Some("NoSuchBucket"),
3358            Some(404),
3359            "NoSuchBucket"
3360        ));
3361        assert!(is_missing_cors_configuration_response(
3362            None,
3363            None,
3364            "The CORS configuration does not exist"
3365        ));
3366        assert!(!is_missing_cors_configuration_response(
3367            None,
3368            Some(500),
3369            "The CORS configuration does not exist"
3370        ));
3371    }
3372
3373    #[tokio::test]
3374    async fn get_bucket_cors_missing_configuration_returns_empty_rules() {
3375        let response = http::Response::builder()
3376            .status(404)
3377            .header("x-amz-error-code", "NoSuchCORSConfiguration")
3378            .body(SdkBody::from(
3379                r#"<?xml version="1.0" encoding="UTF-8"?>
3380<Error>
3381  <Code>NoSuchCORSConfiguration</Code>
3382  <Message>The CORS configuration does not exist</Message>
3383</Error>"#,
3384            ))
3385            .expect("build missing cors response");
3386        let (client, _) = test_s3_client(Some(response));
3387
3388        let rules = client
3389            .get_bucket_cors("bucket")
3390            .await
3391            .expect("missing cors config should be treated as empty");
3392
3393        assert!(rules.is_empty());
3394    }
3395
3396    #[tokio::test]
3397    async fn delete_bucket_cors_missing_configuration_is_successful() {
3398        let response = http::Response::builder()
3399            .status(404)
3400            .header("x-amz-error-code", "NoSuchCORSConfiguration")
3401            .body(SdkBody::from(
3402                r#"<?xml version="1.0" encoding="UTF-8"?>
3403<Error>
3404  <Code>NoSuchCORSConfiguration</Code>
3405  <Message>The CORS configuration does not exist</Message>
3406</Error>"#,
3407            ))
3408            .expect("build missing cors response");
3409        let (client, _) = test_s3_client(Some(response));
3410
3411        client
3412            .delete_bucket_cors("bucket")
3413            .await
3414            .expect("missing cors config should be treated as successful delete");
3415    }
3416
3417    #[tokio::test]
3418    async fn reqwest_connector_insecure_without_ca_bundle_succeeds() {
3419        // When insecure is true and no CA bundle is provided, the connector should be created.
3420        let connector = ReqwestConnector::new(true, None).await;
3421        assert!(
3422            connector.is_ok(),
3423            "Expected insecure connector creation to succeed"
3424        );
3425    }
3426
3427    #[tokio::test]
3428    async fn reqwest_connector_invalid_ca_bundle_path_surfaces_error() {
3429        // Use an obviously invalid path (empty string) to trigger a read error.
3430        let result = ReqwestConnector::new(false, Some("")).await;
3431        match result {
3432            Err(Error::Network(msg)) => {
3433                assert!(
3434                    msg.contains("Failed to read CA bundle"),
3435                    "Unexpected error message: {msg}"
3436                );
3437            }
3438            other => panic!("Expected Error::Network for invalid path, got: {:?}", other),
3439        }
3440    }
3441
3442    #[test]
3443    fn should_use_multipart_for_large_files() {
3444        assert!(S3Client::should_use_multipart(
3445            SINGLE_PUT_OBJECT_MAX_SIZE + 1
3446        ));
3447    }
3448
3449    #[test]
3450    fn should_use_single_part_for_small_files() {
3451        assert!(!S3Client::should_use_multipart(0));
3452        assert!(!S3Client::should_use_multipart(1024 * 1024));
3453        assert!(!S3Client::should_use_multipart(
3454            crate::multipart::DEFAULT_PART_SIZE
3455        ));
3456        assert!(!S3Client::should_use_multipart(SINGLE_PUT_OBJECT_MAX_SIZE));
3457    }
3458
3459    #[tokio::test]
3460    async fn delete_object_with_force_delete_sets_rustfs_header() {
3461        let (client, request_receiver) = test_s3_client(None);
3462        let path = RemotePath::new("test", "bucket", "key.txt");
3463
3464        let _ = client
3465            .delete_object_with_options(&path, DeleteRequestOptions { force_delete: true })
3466            .await;
3467
3468        let request = request_receiver.expect_request();
3469        assert_eq!(request.headers().get("x-rustfs-force-delete"), Some("true"));
3470    }
3471
3472    #[tokio::test]
3473    async fn delete_object_without_force_delete_omits_rustfs_header() {
3474        let (client, request_receiver) = test_s3_client(None);
3475        let path = RemotePath::new("test", "bucket", "key.txt");
3476
3477        let _ = client
3478            .delete_object_with_options(&path, DeleteRequestOptions::default())
3479            .await;
3480
3481        let request = request_receiver.expect_request();
3482        assert!(request.headers().get("x-rustfs-force-delete").is_none());
3483    }
3484
3485    #[tokio::test]
3486    async fn delete_object_wrapper_uses_default_options_without_rustfs_header() {
3487        let (client, request_receiver) = test_s3_client(None);
3488        let path = RemotePath::new("test", "bucket", "key.txt");
3489
3490        let _ = ObjectStore::delete_object(&client, &path).await;
3491
3492        let request = request_receiver.expect_request();
3493        assert!(request.headers().get("x-rustfs-force-delete").is_none());
3494    }
3495
3496    #[tokio::test]
3497    async fn delete_object_with_options_maps_missing_keys_to_not_found() {
3498        let response = http::Response::builder()
3499            .status(404)
3500            .header("x-amz-error-code", "NoSuchKey")
3501            .body(SdkBody::from(
3502                r#"<?xml version="1.0" encoding="UTF-8"?>
3503<Error>
3504  <Code>NoSuchKey</Code>
3505  <Message>The specified key does not exist.</Message>
3506</Error>"#,
3507            ))
3508            .expect("build delete object response");
3509        let (client, _request_receiver) = test_s3_client(Some(response));
3510        let path = RemotePath::new("test", "bucket", "missing.txt");
3511
3512        let result = client
3513            .delete_object_with_options(&path, DeleteRequestOptions::default())
3514            .await;
3515
3516        match result {
3517            Err(Error::NotFound(message)) => assert_eq!(message, path.to_string()),
3518            other => panic!("Expected NotFound for missing key, got: {other:?}"),
3519        }
3520    }
3521
3522    #[tokio::test]
3523    async fn delete_object_with_options_maps_other_failures_to_network() {
3524        let response = http::Response::builder()
3525            .status(500)
3526            .header("x-amz-error-code", "InternalError")
3527            .body(SdkBody::from(
3528                r#"<?xml version="1.0" encoding="UTF-8"?>
3529<Error>
3530  <Code>InternalError</Code>
3531  <Message>Something went wrong.</Message>
3532</Error>"#,
3533            ))
3534            .expect("build delete object response");
3535        let (client, _request_receiver) = test_s3_client(Some(response));
3536        let path = RemotePath::new("test", "bucket", "key.txt");
3537
3538        let result = client
3539            .delete_object_with_options(&path, DeleteRequestOptions::default())
3540            .await;
3541
3542        match result {
3543            Err(Error::Network(message)) => assert!(message.contains("InternalError")),
3544            other => panic!("Expected Network for delete failure, got: {other:?}"),
3545        }
3546    }
3547
3548    #[tokio::test]
3549    async fn delete_bucket_maps_bucket_not_empty_to_conflict() {
3550        let response = http::Response::builder()
3551            .status(409)
3552            .header("x-amz-error-code", "BucketNotEmpty")
3553            .body(SdkBody::from(
3554                r#"<?xml version="1.0" encoding="UTF-8"?>
3555<Error>
3556  <Code>BucketNotEmpty</Code>
3557  <Message>The bucket you tried to delete is not empty.</Message>
3558</Error>"#,
3559            ))
3560            .expect("build delete bucket response");
3561        let (client, _request_receiver) = test_s3_client(Some(response));
3562
3563        let result = client.delete_bucket("bucket").await;
3564
3565        match result {
3566            Err(Error::Conflict(message)) => assert!(message.contains("BucketNotEmpty")),
3567            other => panic!("Expected Conflict for non-empty bucket, got: {other:?}"),
3568        }
3569    }
3570
3571    #[tokio::test]
3572    async fn delete_bucket_maps_missing_bucket_to_not_found() {
3573        let response = http::Response::builder()
3574            .status(404)
3575            .header("x-amz-error-code", "NoSuchBucket")
3576            .body(SdkBody::from(
3577                r#"<?xml version="1.0" encoding="UTF-8"?>
3578<Error>
3579  <Code>NoSuchBucket</Code>
3580  <Message>The specified bucket does not exist.</Message>
3581</Error>"#,
3582            ))
3583            .expect("build missing bucket response");
3584        let (client, _request_receiver) = test_s3_client(Some(response));
3585
3586        let result = client.delete_bucket("missing-bucket").await;
3587
3588        match result {
3589            Err(Error::NotFound(message)) => {
3590                assert_eq!(message, "Bucket not found: missing-bucket")
3591            }
3592            other => panic!("Expected NotFound for missing bucket, got: {other:?}"),
3593        }
3594    }
3595
3596    #[tokio::test]
3597    async fn delete_objects_with_force_delete_sets_rustfs_header() {
3598        let response = http::Response::builder()
3599            .status(200)
3600            .body(SdkBody::from(
3601                r#"<?xml version="1.0" encoding="UTF-8"?>
3602<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />"#,
3603            ))
3604            .expect("build delete objects response");
3605        let (client, request_receiver) = test_s3_client(Some(response));
3606
3607        let _ = client
3608            .delete_objects_with_options(
3609                "bucket",
3610                vec!["key.txt".to_string()],
3611                DeleteRequestOptions { force_delete: true },
3612            )
3613            .await;
3614
3615        let request = request_receiver.expect_request();
3616        assert_eq!(request.headers().get("x-rustfs-force-delete"), Some("true"));
3617    }
3618
3619    #[tokio::test]
3620    async fn delete_objects_without_force_delete_omits_rustfs_header() {
3621        let response = http::Response::builder()
3622            .status(200)
3623            .body(SdkBody::from(
3624                r#"<?xml version="1.0" encoding="UTF-8"?>
3625<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />"#,
3626            ))
3627            .expect("build delete objects response");
3628        let (client, request_receiver) = test_s3_client(Some(response));
3629
3630        let _ = client
3631            .delete_objects_with_options(
3632                "bucket",
3633                vec!["key.txt".to_string()],
3634                DeleteRequestOptions::default(),
3635            )
3636            .await;
3637
3638        let request = request_receiver.expect_request();
3639        assert!(request.headers().get("x-rustfs-force-delete").is_none());
3640    }
3641
3642    #[tokio::test]
3643    async fn delete_objects_wrapper_uses_default_options_without_rustfs_header() {
3644        let response = http::Response::builder()
3645            .status(200)
3646            .body(SdkBody::from(
3647                r#"<?xml version="1.0" encoding="UTF-8"?>
3648<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />"#,
3649            ))
3650            .expect("build delete objects response");
3651        let (client, request_receiver) = test_s3_client(Some(response));
3652
3653        let _ = ObjectStore::delete_objects(&client, "bucket", vec!["key.txt".to_string()]).await;
3654
3655        let request = request_receiver.expect_request();
3656        assert!(request.headers().get("x-rustfs-force-delete").is_none());
3657    }
3658
3659    #[tokio::test]
3660    async fn delete_objects_with_empty_keys_skips_http_request() {
3661        let (client, request_receiver) = test_s3_client(None);
3662
3663        let deleted = client
3664            .delete_objects_with_options("bucket", Vec::new(), DeleteRequestOptions::default())
3665            .await
3666            .expect("empty delete should succeed");
3667
3668        assert!(deleted.is_empty());
3669        request_receiver.expect_no_request();
3670    }
3671
3672    #[tokio::test]
3673    async fn delete_objects_with_partial_errors_returns_deleted_keys() {
3674        let response = http::Response::builder()
3675            .status(200)
3676            .body(SdkBody::from(
3677                r#"<?xml version="1.0" encoding="UTF-8"?>
3678<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
3679  <Deleted>
3680    <Key>kept.txt</Key>
3681  </Deleted>
3682  <Error>
3683    <Key>failed.txt</Key>
3684    <Code>AccessDenied</Code>
3685    <Message>Access Denied</Message>
3686  </Error>
3687</DeleteResult>"#,
3688            ))
3689            .expect("build partial delete response");
3690        let (client, request_receiver) = test_s3_client(Some(response));
3691
3692        let deleted = client
3693            .delete_objects_with_options(
3694                "bucket",
3695                vec!["kept.txt".to_string(), "failed.txt".to_string()],
3696                DeleteRequestOptions::default(),
3697            )
3698            .await
3699            .expect("partial delete should still return deleted keys");
3700
3701        let request = request_receiver.expect_request();
3702        assert_eq!(request.uri(), "https://example.com/bucket/?delete");
3703        assert_eq!(deleted, vec!["kept.txt".to_string()]);
3704    }
3705
3706    #[tokio::test]
3707    async fn read_next_part_fills_buffer_until_eof() {
3708        use tokio::io::AsyncWriteExt;
3709
3710        let temp_dir = tempfile::tempdir().expect("create temp dir");
3711        let file_path = temp_dir.path().join("payload.bin");
3712        let mut writer = tokio::fs::File::create(&file_path)
3713            .await
3714            .expect("create temp file");
3715        writer
3716            .write_all(b"abcdefghij")
3717            .await
3718            .expect("write temp file");
3719        writer.flush().await.expect("flush temp file");
3720        drop(writer);
3721
3722        let mut reader = tokio::fs::File::open(&file_path)
3723            .await
3724            .expect("open temp file");
3725        let mut buffer = vec![0u8; 8];
3726
3727        let first = S3Client::read_next_part(&mut reader, &file_path, &mut buffer)
3728            .await
3729            .expect("first read");
3730        assert_eq!(first, 8);
3731        assert_eq!(&buffer[..first], b"abcdefgh");
3732
3733        let second = S3Client::read_next_part(&mut reader, &file_path, &mut buffer)
3734            .await
3735            .expect("second read");
3736        assert_eq!(second, 2);
3737        assert_eq!(&buffer[..second], b"ij");
3738
3739        let third = S3Client::read_next_part(&mut reader, &file_path, &mut buffer)
3740            .await
3741            .expect("third read");
3742        assert_eq!(third, 0);
3743    }
3744}