Skip to main content

fakecloud_s3/service/
mod.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use bytes::Bytes;
5use chrono::{DateTime, Timelike, Utc};
6use http::{HeaderMap, Method, StatusCode};
7use md5::{Digest, Md5};
8
9use fakecloud_aws::arn::Arn;
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
12use fakecloud_kms::SharedKmsState;
13use fakecloud_persistence::{MemoryS3Store, S3Store, StoreError};
14
15use base64::engine::general_purpose::STANDARD as BASE64;
16use base64::Engine as _;
17
18use crate::logging;
19use crate::state::{AclGrant, S3Bucket, S3Object, SharedS3State};
20
21mod access_points;
22mod acl;
23mod buckets;
24mod config;
25mod lock;
26mod multipart;
27mod notifications;
28mod objects;
29mod tags;
30
31// Re-export notification helpers for use in sub-modules
32#[cfg(test)]
33use notifications::replicate_object;
34pub(super) use notifications::{
35    deliver_notifications, normalize_notification_ids, normalize_replication_xml,
36    replicate_through_store,
37};
38
39// Used only within this file (parse_cors_config)
40use notifications::extract_all_xml_values;
41
42// Re-exports used only in tests
43#[cfg(test)]
44use notifications::{
45    event_matches, key_matches_filters, parse_notification_config, parse_replication_rules,
46    NotificationTargetType,
47};
48
49pub struct S3Service {
50    state: SharedS3State,
51    delivery: Arc<DeliveryBus>,
52    kms_state: Option<SharedKmsState>,
53    pub(crate) kms_hook: Option<Arc<dyn fakecloud_core::delivery::KmsHook>>,
54    store: Arc<dyn S3Store>,
55}
56
57/// Map a [`StoreError`] from the persistence layer to a 500 InternalError
58/// response. Invoked at every mutation site when the write-through persistence
59/// call fails: the in-memory mutation has already happened, but we surface the
60/// failure to the client so they know to retry (and so logs/metrics flag it).
61pub(crate) fn persistence_error(err: StoreError) -> AwsServiceError {
62    AwsServiceError::aws_error(
63        StatusCode::INTERNAL_SERVER_ERROR,
64        "InternalError",
65        format!("persistence store error: {err}"),
66    )
67}
68
69/// Convert a filesystem IO error from a disk-backed body read into an
70/// InternalError response.
71pub(crate) fn io_to_aws(err: std::io::Error) -> AwsServiceError {
72    AwsServiceError::aws_error(
73        StatusCode::INTERNAL_SERVER_ERROR,
74        "InternalError",
75        format!("failed to read object body from disk: {err}"),
76    )
77}
78
79impl S3Service {
80    pub fn new(state: SharedS3State, delivery: Arc<DeliveryBus>) -> Self {
81        Self::with_store(state, delivery, Arc::new(MemoryS3Store::new()))
82    }
83
84    pub fn with_store(
85        state: SharedS3State,
86        delivery: Arc<DeliveryBus>,
87        store: Arc<dyn S3Store>,
88    ) -> Self {
89        Self {
90            state,
91            delivery,
92            kms_state: None,
93            kms_hook: None,
94            store,
95        }
96    }
97
98    pub fn with_kms(mut self, kms_state: SharedKmsState) -> Self {
99        self.kms_state = Some(kms_state);
100        self
101    }
102
103    pub fn with_kms_hook(mut self, hook: Arc<dyn fakecloud_core::delivery::KmsHook>) -> Self {
104        self.kms_hook = Some(hook);
105        self
106    }
107
108    /// Encrypt object body bytes for SSE-KMS storage. Returns ciphertext as
109    /// raw bytes (a UTF-8 fakecloud-kms envelope) on success.
110    ///
111    /// Fail-closed: if the KMS hook reports an error (key denied, key not
112    /// found, etc.), this returns `Err` so PutObject aborts with a 500
113    /// rather than silently storing plaintext. AWS S3 has the same
114    /// behavior — `KMS.NotFoundException` and friends surface as
115    /// `AccessDenied` / `KMS.*` errors back to the caller. When no hook is
116    /// wired (legacy / in-process tests with no KMS dependency), the
117    /// plaintext is returned unchanged so existing tests keep working.
118    pub(crate) fn encrypt_object_body(
119        &self,
120        account_id: &str,
121        region: &str,
122        bucket: &str,
123        plaintext: &[u8],
124        kms_key_id: Option<&str>,
125    ) -> Result<bytes::Bytes, AwsServiceError> {
126        let Some(hook) = &self.kms_hook else {
127            return Ok(bytes::Bytes::copy_from_slice(plaintext));
128        };
129        let key = kms_key_id.filter(|k| !k.is_empty()).unwrap_or("aws/s3");
130        let bucket_arn = Arn::s3(bucket).to_string();
131        let mut ctx = std::collections::HashMap::new();
132        ctx.insert("aws:s3:arn".to_string(), bucket_arn);
133        match hook.encrypt(account_id, region, key, plaintext, "s3.amazonaws.com", ctx) {
134            Ok(envelope) => Ok(bytes::Bytes::from(envelope.into_bytes())),
135            Err(err) => {
136                tracing::warn!(bucket = %bucket, error = %err, "SSE-KMS encrypt failed");
137                Err(AwsServiceError::aws_error(
138                    StatusCode::INTERNAL_SERVER_ERROR,
139                    "KMS.InternalFailureException",
140                    format!("Failed to encrypt object via KMS: {err}"),
141                ))
142            }
143        }
144    }
145
146    /// Decrypt object body bytes that were stored as a fakecloud-kms
147    /// envelope. Caller is expected to gate this on
148    /// `obj.sse_algorithm == Some("aws:kms")`.
149    ///
150    /// Fail-closed: when a hook is wired and the bytes look like an
151    /// envelope but don't decrypt (key revoked, malformed ciphertext),
152    /// this returns `Err` so GetObject surfaces a 500. When no hook is
153    /// wired, or the bytes aren't UTF-8 (legacy snapshots from before
154    /// the hook landed), the bytes are returned unchanged.
155    pub(crate) fn decrypt_object_body(
156        &self,
157        account_id: &str,
158        bucket: &str,
159        ciphertext: &[u8],
160    ) -> Result<bytes::Bytes, AwsServiceError> {
161        let Some(hook) = &self.kms_hook else {
162            return Ok(bytes::Bytes::copy_from_slice(ciphertext));
163        };
164        // Stored envelope is base64 ASCII; non-UTF-8 bytes are pre-hook
165        // legacy snapshots, return as-is.
166        let envelope = match std::str::from_utf8(ciphertext) {
167            Ok(s) => s,
168            Err(_) => return Ok(bytes::Bytes::copy_from_slice(ciphertext)),
169        };
170        let bucket_arn = Arn::s3(bucket).to_string();
171        let mut ctx = std::collections::HashMap::new();
172        ctx.insert("aws:s3:arn".to_string(), bucket_arn);
173        match hook.decrypt(account_id, envelope, "s3.amazonaws.com", ctx) {
174            Ok(bytes) => Ok(bytes::Bytes::from(bytes)),
175            Err(err) => {
176                tracing::warn!(bucket = %bucket, error = %err, "SSE-KMS decrypt failed");
177                Err(AwsServiceError::aws_error(
178                    StatusCode::INTERNAL_SERVER_ERROR,
179                    "KMS.InternalFailureException",
180                    format!("Failed to decrypt object via KMS: {err}"),
181                ))
182            }
183        }
184    }
185}
186
187#[async_trait]
188impl AwsService for S3Service {
189    fn service_name(&self) -> &str {
190        "s3"
191    }
192
193    async fn handle(&self, mut req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
194        // PutObject / UploadPart enter dispatch via the streaming path
195        // with `body = Bytes::new()` and the raw HTTP body available on
196        // `req.body_stream`. Those handlers consume the stream directly
197        // — spooling chunks to disk while computing MD5 + size in
198        // constant memory — so a 1 GiB upload never materializes into
199        // RAM. Every *other* PUT-on-bucket-key the streaming dispatch
200        // flagged (PutObjectTagging, PutObjectAcl, PutObjectRetention,
201        // PutObjectLegalHold, CopyObject, …) reads a small XML/JSON
202        // body from `req.body`, so drain the stream for them.
203        let is_put_to_key = req.method == Method::PUT
204            && req.path_segments.len() >= 2
205            && req
206                .path_segments
207                .first()
208                .map(|s| !s.is_empty())
209                .unwrap_or(false);
210        let q = &req.query_params;
211        let is_put_object = is_put_to_key
212            && !q.contains_key("tagging")
213            && !q.contains_key("acl")
214            && !q.contains_key("retention")
215            && !q.contains_key("legal-hold")
216            && !q.contains_key("renameObject")
217            && !q.contains_key("encryption")
218            && !req.headers.contains_key("x-amz-copy-source");
219        // UploadPart requires both partNumber AND uploadId; checking
220        // only partNumber would skip body draining for stray PUTs that
221        // happened to carry a partNumber query param without being a
222        // real multipart upload.
223        let is_upload_part =
224            is_put_to_key && q.contains_key("partNumber") && q.contains_key("uploadId");
225        if !is_put_object && !is_upload_part {
226            if let Some(stream) = req.take_body_stream() {
227                req.body = fakecloud_core::service::drain_request_stream(stream).await?;
228            }
229        }
230
231        // Access point data plane: resolve alias -> bucket before routing.
232        access_points::resolve_access_point(self, &mut req)?;
233
234        let account_id = req.account_id.as_str();
235
236        // S3 Control endpoint (access point management).
237        let host = req
238            .headers
239            .get("host")
240            .and_then(|v| v.to_str().ok())
241            .unwrap_or("");
242        let is_control = host.to_ascii_lowercase().contains("s3-control");
243        if is_control {
244            let v1 = req.path_segments.first().map(|s| s.as_str());
245            let v2 = req.path_segments.get(1).map(|s| s.as_str());
246            let v3 = req.path_segments.get(2).map(|s| s.as_str());
247            if v1 == Some("v20180820") && v2 == Some("accesspoint") {
248                if let Some(name) = v3 {
249                    match req.method {
250                        Method::PUT => return self.create_access_point(account_id, &req, name),
251                        Method::GET => return self.get_access_point(account_id, &req, name),
252                        Method::DELETE => return self.delete_access_point(account_id, &req, name),
253                        _ => {}
254                    }
255                } else if req.method == Method::GET {
256                    return self.list_access_points(account_id, &req);
257                }
258            }
259        }
260
261        // S3 REST routing: method + path segments + query params
262        let bucket = req.path_segments.first().map(|s| s.as_str());
263        // Extract key from the raw path to preserve leading slashes and empty segments.
264        // The raw path is like "/bucket/key/parts" — we strip the bucket prefix.
265        let key = if let Some(b) = bucket {
266            let prefix = format!("/{b}/");
267            if req.raw_path.starts_with(&prefix) && req.raw_path.len() > prefix.len() {
268                let raw_key = &req.raw_path[prefix.len()..];
269                Some(
270                    percent_encoding::percent_decode_str(raw_key)
271                        .decode_utf8_lossy()
272                        .into_owned(),
273                )
274            } else if req.path_segments.len() > 1 {
275                let raw = req.path_segments[1..].join("/");
276                Some(
277                    percent_encoding::percent_decode_str(&raw)
278                        .decode_utf8_lossy()
279                        .into_owned(),
280                )
281            } else {
282                None
283            }
284        } else {
285            None
286        };
287
288        // Multipart upload operations (checked before main match)
289        if let Some(b) = bucket {
290            // POST /{bucket}/{key}?uploads — CreateMultipartUpload
291            if req.method == Method::POST
292                && key.is_some()
293                && req.query_params.contains_key("uploads")
294            {
295                return self.create_multipart_upload(account_id, &req, b, key.as_deref().unwrap());
296            }
297
298            // POST /{bucket}/{key}?restore
299            if req.method == Method::POST
300                && key.is_some()
301                && req.query_params.contains_key("restore")
302            {
303                return self.restore_object(account_id, &req, b, key.as_deref().unwrap());
304            }
305
306            // POST /{bucket}/{key}?uploadId=X — CompleteMultipartUpload
307            if req.method == Method::POST && key.is_some() {
308                if let Some(upload_id) = req.query_params.get("uploadId").cloned() {
309                    return self.complete_multipart_upload(
310                        account_id,
311                        &req,
312                        b,
313                        key.as_deref().unwrap(),
314                        &upload_id,
315                    );
316                }
317            }
318
319            // PUT /{bucket}/{key}?partNumber=N&uploadId=X — UploadPart or UploadPartCopy
320            if req.method == Method::PUT && key.is_some() {
321                if let (Some(part_num_str), Some(upload_id)) = (
322                    req.query_params.get("partNumber").cloned(),
323                    req.query_params.get("uploadId").cloned(),
324                ) {
325                    if let Ok(part_number) = part_num_str.parse::<i64>() {
326                        if req.headers.contains_key("x-amz-copy-source") {
327                            return self.upload_part_copy(
328                                account_id,
329                                &req,
330                                b,
331                                key.as_deref().unwrap(),
332                                &upload_id,
333                                part_number,
334                            );
335                        }
336                        return self
337                            .upload_part(
338                                account_id,
339                                &req,
340                                b,
341                                key.as_deref().unwrap(),
342                                &upload_id,
343                                part_number,
344                            )
345                            .await;
346                    }
347                }
348            }
349
350            // DELETE /{bucket}/{key}?uploadId=X — AbortMultipartUpload
351            if req.method == Method::DELETE && key.is_some() {
352                if let Some(upload_id) = req.query_params.get("uploadId").cloned() {
353                    return self.abort_multipart_upload(
354                        account_id,
355                        b,
356                        key.as_deref().unwrap(),
357                        &upload_id,
358                    );
359                }
360            }
361
362            // GET /{bucket}?uploads — ListMultipartUploads
363            if req.method == Method::GET
364                && key.is_none()
365                && req.query_params.contains_key("uploads")
366            {
367                return self.list_multipart_uploads(account_id, b);
368            }
369
370            // GET /{bucket}/{key}?uploadId=X — ListParts
371            if req.method == Method::GET && key.is_some() {
372                if let Some(upload_id) = req.query_params.get("uploadId").cloned() {
373                    return self.list_parts(
374                        account_id,
375                        &req,
376                        b,
377                        key.as_deref().unwrap(),
378                        &upload_id,
379                    );
380                }
381            }
382        }
383
384        // Handle OPTIONS preflight requests (CORS)
385        if req.method == Method::OPTIONS {
386            if let Some(b_name) = bucket {
387                let cors_config = {
388                    let accounts = self.state.read();
389                    let _empty_s3 = crate::state::S3State::new(&req.account_id, &req.region);
390                    let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
391                    state
392                        .buckets
393                        .get(b_name)
394                        .and_then(|b| b.cors_config.clone())
395                };
396                if let Some(ref config) = cors_config {
397                    let origin = req
398                        .headers
399                        .get("origin")
400                        .and_then(|v| v.to_str().ok())
401                        .unwrap_or("");
402                    let request_method = req
403                        .headers
404                        .get("access-control-request-method")
405                        .and_then(|v| v.to_str().ok())
406                        .unwrap_or("");
407                    let rules = parse_cors_config(config);
408                    if let Some(rule) = find_cors_rule(&rules, origin, Some(request_method)) {
409                        let mut headers = HeaderMap::new();
410                        let matched_origin = if rule.allowed_origins.contains(&"*".to_string()) {
411                            "*"
412                        } else {
413                            origin
414                        };
415                        headers.insert(
416                            "access-control-allow-origin",
417                            matched_origin
418                                .parse()
419                                .unwrap_or_else(|_| http::HeaderValue::from_static("")),
420                        );
421                        headers.insert(
422                            "access-control-allow-methods",
423                            rule.allowed_methods
424                                .join(", ")
425                                .parse()
426                                .unwrap_or_else(|_| http::HeaderValue::from_static("")),
427                        );
428                        if !rule.allowed_headers.is_empty() {
429                            let ah = if rule.allowed_headers.contains(&"*".to_string()) {
430                                req.headers
431                                    .get("access-control-request-headers")
432                                    .and_then(|v| v.to_str().ok())
433                                    .unwrap_or("*")
434                                    .to_string()
435                            } else {
436                                rule.allowed_headers.join(", ")
437                            };
438                            headers.insert(
439                                "access-control-allow-headers",
440                                ah.parse()
441                                    .unwrap_or_else(|_| http::HeaderValue::from_static("")),
442                            );
443                        }
444                        if let Some(max_age) = rule.max_age_seconds {
445                            headers.insert(
446                                "access-control-max-age",
447                                max_age
448                                    .to_string()
449                                    .parse()
450                                    .unwrap_or_else(|_| http::HeaderValue::from_static("")),
451                            );
452                        }
453                        return Ok(AwsResponse {
454                            status: StatusCode::OK,
455                            content_type: String::new(),
456                            body: Bytes::new().into(),
457                            headers,
458                        });
459                    }
460                }
461                return Err(AwsServiceError::aws_error(
462                    StatusCode::FORBIDDEN,
463                    "CORSResponse",
464                    "CORS is not enabled for this bucket",
465                ));
466            }
467        }
468
469        // Capture origin for CORS response headers
470        let origin_header = req
471            .headers
472            .get("origin")
473            .and_then(|v| v.to_str().ok())
474            .map(|s| s.to_string());
475
476        let mut result = match (&req.method, bucket, key.as_deref()) {
477            // ListBuckets: GET /
478            (&Method::GET, None, None) => {
479                if req.query_params.get("x-id").map(|s| s.as_str()) == Some("ListDirectoryBuckets")
480                {
481                    self.list_directory_buckets(account_id, &req)
482                } else {
483                    self.list_buckets(account_id, &req)
484                }
485            }
486
487            // Bucket-level operations (no key)
488            (&Method::PUT, Some(b), None) => {
489                if req.query_params.contains_key("tagging") {
490                    self.put_bucket_tagging(account_id, &req, b)
491                } else if req.query_params.contains_key("acl") {
492                    self.put_bucket_acl(account_id, &req, b)
493                } else if req.query_params.contains_key("versioning") {
494                    self.put_bucket_versioning(account_id, &req, b)
495                } else if req.query_params.contains_key("cors") {
496                    self.put_bucket_cors(account_id, &req, b)
497                } else if req.query_params.contains_key("notification") {
498                    self.put_bucket_notification(account_id, &req, b)
499                } else if req.query_params.contains_key("website") {
500                    self.put_bucket_website(account_id, &req, b)
501                } else if req.query_params.contains_key("accelerate") {
502                    self.put_bucket_accelerate(account_id, &req, b)
503                } else if req.query_params.contains_key("publicAccessBlock") {
504                    self.put_public_access_block(account_id, &req, b)
505                } else if req.query_params.contains_key("encryption") {
506                    self.put_bucket_encryption(account_id, &req, b)
507                } else if req.query_params.contains_key("lifecycle") {
508                    self.put_bucket_lifecycle(account_id, &req, b)
509                } else if req.query_params.contains_key("logging") {
510                    self.put_bucket_logging(account_id, &req, b)
511                } else if req.query_params.contains_key("policy") {
512                    self.put_bucket_policy(account_id, &req, b)
513                } else if req.query_params.contains_key("object-lock") {
514                    self.put_object_lock_config(account_id, &req, b)
515                } else if req.query_params.contains_key("replication") {
516                    self.put_bucket_replication(account_id, &req, b)
517                } else if req.query_params.contains_key("ownershipControls") {
518                    self.put_bucket_ownership_controls(account_id, &req, b)
519                } else if req.query_params.contains_key("inventory") {
520                    self.put_bucket_inventory(account_id, &req, b)
521                } else if req.query_params.contains_key("analytics") {
522                    self.put_bucket_analytics_config(account_id, &req, b)
523                } else if req.query_params.contains_key("intelligent-tiering") {
524                    self.put_bucket_intelligent_tiering_config(account_id, &req, b)
525                } else if req.query_params.contains_key("metrics") {
526                    self.put_bucket_metrics_config(account_id, &req, b)
527                } else if req.query_params.contains_key("requestPayment") {
528                    self.put_bucket_request_payment(account_id, &req, b)
529                } else if req.query_params.contains_key("abac") {
530                    self.put_bucket_abac(account_id, &req, b)
531                } else if req.query_params.contains_key("metadataInventoryTable") {
532                    self.update_bucket_metadata_inventory_table(account_id, &req, b)
533                } else if req.query_params.contains_key("metadataJournalTable") {
534                    self.update_bucket_metadata_journal_table(account_id, &req, b)
535                } else {
536                    self.create_bucket(account_id, &req, b)
537                }
538            }
539            (&Method::DELETE, Some(b), None) => {
540                if req.query_params.contains_key("tagging") {
541                    self.delete_bucket_tagging(account_id, &req, b)
542                } else if req.query_params.contains_key("cors") {
543                    self.delete_bucket_cors(account_id, b)
544                } else if req.query_params.contains_key("website") {
545                    self.delete_bucket_website(account_id, b)
546                } else if req.query_params.contains_key("publicAccessBlock") {
547                    self.delete_public_access_block(account_id, b)
548                } else if req.query_params.contains_key("encryption") {
549                    self.delete_bucket_encryption(account_id, b)
550                } else if req.query_params.contains_key("lifecycle") {
551                    self.delete_bucket_lifecycle(account_id, b)
552                } else if req.query_params.contains_key("policy") {
553                    self.delete_bucket_policy(account_id, b)
554                } else if req.query_params.contains_key("replication") {
555                    self.delete_bucket_replication(account_id, b)
556                } else if req.query_params.contains_key("ownershipControls") {
557                    self.delete_bucket_ownership_controls(account_id, b)
558                } else if req.query_params.contains_key("inventory") {
559                    self.delete_bucket_inventory(account_id, &req, b)
560                } else if req.query_params.contains_key("analytics") {
561                    self.delete_bucket_analytics_config(account_id, &req, b)
562                } else if req.query_params.contains_key("intelligent-tiering") {
563                    self.delete_bucket_intelligent_tiering_config(account_id, &req, b)
564                } else if req.query_params.contains_key("metrics") {
565                    self.delete_bucket_metrics_config(account_id, &req, b)
566                } else if req.query_params.contains_key("metadataConfiguration") {
567                    self.delete_bucket_metadata_config(account_id, b)
568                } else if req.query_params.contains_key("metadataTable") {
569                    self.delete_bucket_metadata_table_config(account_id, b)
570                } else {
571                    self.delete_bucket(account_id, &req, b)
572                }
573            }
574            (&Method::HEAD, Some(b), None) => self.head_bucket(account_id, b),
575            (&Method::GET, Some(b), None) => {
576                if req.query_params.contains_key("tagging") {
577                    self.get_bucket_tagging(account_id, &req, b)
578                } else if req.query_params.contains_key("location") {
579                    self.get_bucket_location(account_id, b)
580                } else if req.query_params.contains_key("acl") {
581                    self.get_bucket_acl(account_id, &req, b)
582                } else if req.query_params.contains_key("versioning") {
583                    self.get_bucket_versioning(account_id, b)
584                } else if req.query_params.contains_key("versions") {
585                    self.list_object_versions(account_id, &req, b)
586                } else if req.query_params.contains_key("object-lock") {
587                    self.get_object_lock_configuration(account_id, b)
588                } else if req.query_params.contains_key("cors") {
589                    self.get_bucket_cors(account_id, b)
590                } else if req.query_params.contains_key("notification") {
591                    self.get_bucket_notification(account_id, b)
592                } else if req.query_params.contains_key("website") {
593                    self.get_bucket_website(account_id, b)
594                } else if req.query_params.contains_key("accelerate") {
595                    self.get_bucket_accelerate(account_id, b)
596                } else if req.query_params.contains_key("publicAccessBlock") {
597                    self.get_public_access_block(account_id, b)
598                } else if req.query_params.contains_key("encryption") {
599                    self.get_bucket_encryption(account_id, b)
600                } else if req.query_params.contains_key("lifecycle") {
601                    self.get_bucket_lifecycle(account_id, b)
602                } else if req.query_params.contains_key("logging") {
603                    self.get_bucket_logging(account_id, b)
604                } else if req.query_params.contains_key("policy") {
605                    self.get_bucket_policy(account_id, b)
606                } else if req.query_params.contains_key("replication") {
607                    self.get_bucket_replication(account_id, b)
608                } else if req.query_params.contains_key("ownershipControls") {
609                    self.get_bucket_ownership_controls(account_id, b)
610                } else if req.query_params.contains_key("inventory") {
611                    if req.query_params.contains_key("id") {
612                        self.get_bucket_inventory(account_id, &req, b)
613                    } else {
614                        self.list_bucket_inventory_configurations(account_id, b)
615                    }
616                } else if req.query_params.contains_key("analytics") {
617                    if req.query_params.contains_key("id") {
618                        self.get_bucket_analytics_config(account_id, &req, b)
619                    } else {
620                        self.list_bucket_analytics_configurations(account_id, b)
621                    }
622                } else if req.query_params.contains_key("intelligent-tiering") {
623                    if req.query_params.contains_key("id") {
624                        self.get_bucket_intelligent_tiering_config(account_id, &req, b)
625                    } else {
626                        self.list_bucket_intelligent_tiering_configurations(account_id, b)
627                    }
628                } else if req.query_params.contains_key("metrics") {
629                    if req.query_params.contains_key("id") {
630                        self.get_bucket_metrics_config(account_id, &req, b)
631                    } else {
632                        self.list_bucket_metrics_configurations(account_id, b)
633                    }
634                } else if req.query_params.contains_key("requestPayment") {
635                    self.get_bucket_request_payment(account_id, b)
636                } else if req.query_params.contains_key("abac") {
637                    self.get_bucket_abac(account_id, b)
638                } else if req.query_params.contains_key("policyStatus") {
639                    self.get_bucket_policy_status(account_id, b)
640                } else if req.query_params.contains_key("metadataConfiguration") {
641                    self.get_bucket_metadata_config(account_id, b)
642                } else if req.query_params.contains_key("metadataTable") {
643                    self.get_bucket_metadata_table_config(account_id, b)
644                } else if req.query_params.contains_key("session") {
645                    self.create_session(account_id, &req, b)
646                } else if req.query_params.get("list-type").map(|s| s.as_str()) == Some("2") {
647                    self.list_objects_v2(account_id, &req, b)
648                } else if req.query_params.is_empty() {
649                    // If bucket has website config and no query params, serve index document
650                    let website_config = {
651                        let accounts = self.state.read();
652                        let _empty_s3 = crate::state::S3State::new(&req.account_id, &req.region);
653                        let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
654                        state
655                            .buckets
656                            .get(b)
657                            .and_then(|bkt| bkt.website_config.clone())
658                    };
659                    if let Some(ref config) = website_config {
660                        if let Some(index_doc) = extract_xml_value(config, "Suffix").or_else(|| {
661                            extract_xml_value(config, "IndexDocument").and_then(|inner| {
662                                let open = "<Suffix>";
663                                let close = "</Suffix>";
664                                let s = inner.find(open)? + open.len();
665                                let e = inner.find(close)?;
666                                Some(inner[s..e].trim().to_string())
667                            })
668                        }) {
669                            self.serve_website_object(account_id, &req, b, &index_doc, config)
670                        } else {
671                            self.list_objects_v1(account_id, &req, b)
672                        }
673                    } else {
674                        self.list_objects_v1(account_id, &req, b)
675                    }
676                } else {
677                    self.list_objects_v1(account_id, &req, b)
678                }
679            }
680
681            // Object-level operations
682            (&Method::PUT, Some(b), Some(k)) => {
683                if req.query_params.contains_key("tagging") {
684                    self.put_object_tagging(account_id, &req, b, k)
685                } else if req.query_params.contains_key("acl") {
686                    self.put_object_acl(account_id, &req, b, k)
687                } else if req.query_params.contains_key("retention") {
688                    self.put_object_retention(account_id, &req, b, k)
689                } else if req.query_params.contains_key("legal-hold") {
690                    self.put_object_legal_hold(account_id, &req, b, k)
691                } else if req.query_params.contains_key("renameObject") {
692                    self.rename_object(account_id, &req, b, k)
693                } else if req.query_params.contains_key("encryption") {
694                    self.update_object_encryption(account_id, &req, b, k)
695                } else if req.headers.contains_key("x-amz-copy-source") {
696                    self.copy_object(account_id, &req, b, k)
697                } else {
698                    self.put_object(account_id, &req, b, k).await
699                }
700            }
701            (&Method::GET, Some(b), Some(k)) => {
702                if req.query_params.contains_key("tagging") {
703                    self.get_object_tagging(account_id, &req, b, k)
704                } else if req.query_params.contains_key("acl") {
705                    self.get_object_acl(account_id, &req, b, k)
706                } else if req.query_params.contains_key("retention") {
707                    self.get_object_retention(account_id, &req, b, k)
708                } else if req.query_params.contains_key("legal-hold") {
709                    self.get_object_legal_hold(account_id, &req, b, k)
710                } else if req.query_params.contains_key("attributes") {
711                    self.get_object_attributes(account_id, &req, b, k)
712                } else if req.query_params.contains_key("torrent") {
713                    self.get_object_torrent(account_id, &req, b, k)
714                } else {
715                    let result = self.get_object(account_id, &req, b, k);
716                    // If object not found and bucket has website config, serve error document
717                    let is_not_found = matches!(
718                        &result,
719                        Err(e) if e.code() == "NoSuchKey"
720                    );
721                    if is_not_found {
722                        let website_config = {
723                            let accounts = self.state.read();
724                            let _empty_s3 =
725                                crate::state::S3State::new(&req.account_id, &req.region);
726                            let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
727                            state
728                                .buckets
729                                .get(b)
730                                .and_then(|bkt| bkt.website_config.clone())
731                        };
732                        if let Some(ref config) = website_config {
733                            if let Some(error_key) = extract_xml_value(config, "ErrorDocument")
734                                .and_then(|inner| {
735                                    let open = "<Key>";
736                                    let close = "</Key>";
737                                    let s = inner.find(open)? + open.len();
738                                    let e = inner.find(close)?;
739                                    Some(inner[s..e].trim().to_string())
740                                })
741                                .or_else(|| extract_xml_value(config, "Key"))
742                            {
743                                return self.serve_website_error(account_id, &req, b, &error_key);
744                            }
745                        }
746                    }
747                    result
748                }
749            }
750            (&Method::DELETE, Some(b), Some(k)) => {
751                if req.query_params.contains_key("tagging") {
752                    self.delete_object_tagging(account_id, b, k)
753                } else {
754                    self.delete_object(account_id, &req, b, k)
755                }
756            }
757            (&Method::HEAD, Some(b), Some(k)) => self.head_object(account_id, &req, b, k),
758
759            // POST /{bucket}?delete — batch delete
760            (&Method::POST, Some(b), None) if req.query_params.contains_key("delete") => {
761                self.delete_objects(account_id, &req, b)
762            }
763            (&Method::POST, Some(b), None)
764                if req.query_params.contains_key("metadataConfiguration") =>
765            {
766                self.create_bucket_metadata_config(account_id, &req, b)
767            }
768            (&Method::POST, Some(b), None) if req.query_params.contains_key("metadataTable") => {
769                self.create_bucket_metadata_table_config(account_id, &req, b)
770            }
771            (&Method::POST, Some(b), Some(k))
772                if req.query_params.get("select-type").map(|s| s.as_str()) == Some("2") =>
773            {
774                self.select_object_content(account_id, &req, b, k)
775            }
776            (&Method::POST, Some("WriteGetObjectResponse"), None) => {
777                self.write_get_object_response(account_id, &req)
778            }
779
780            _ => Err(AwsServiceError::aws_error(
781                StatusCode::METHOD_NOT_ALLOWED,
782                "MethodNotAllowed",
783                "The specified method is not allowed against this resource",
784            )),
785        };
786
787        // Apply CORS headers to the response if Origin was present
788        if let (Some(ref origin), Some(b_name)) = (&origin_header, bucket) {
789            let cors_config = {
790                let accounts = self.state.read();
791                let _empty_s3 = crate::state::S3State::new(&req.account_id, &req.region);
792                let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
793                state
794                    .buckets
795                    .get(b_name)
796                    .and_then(|b| b.cors_config.clone())
797            };
798            if let Some(ref config) = cors_config {
799                let rules = parse_cors_config(config);
800                if let Some(rule) = find_cors_rule(&rules, origin, None) {
801                    if let Ok(ref mut resp) = result {
802                        let matched_origin = if rule.allowed_origins.contains(&"*".to_string()) {
803                            "*"
804                        } else {
805                            origin
806                        };
807                        resp.headers.insert(
808                            "access-control-allow-origin",
809                            matched_origin
810                                .parse()
811                                .unwrap_or_else(|_| http::HeaderValue::from_static("")),
812                        );
813                        if !rule.expose_headers.is_empty() {
814                            resp.headers.insert(
815                                "access-control-expose-headers",
816                                rule.expose_headers
817                                    .join(", ")
818                                    .parse()
819                                    .unwrap_or_else(|_| http::HeaderValue::from_static("")),
820                            );
821                        }
822                    }
823                }
824            }
825        }
826
827        // Write S3 access log entry if the source bucket has logging enabled
828        if let Some(b_name) = bucket {
829            let status_code = match &result {
830                Ok(resp) => resp.status.as_u16(),
831                Err(e) => e.status().as_u16(),
832            };
833            let op = logging::operation_name(&req.method, key.as_deref());
834            logging::maybe_write_access_log(
835                &self.state,
836                &self.store,
837                b_name,
838                &logging::AccessLogRequest {
839                    operation: op,
840                    key: key.as_deref(),
841                    status: status_code,
842                    request_id: &req.request_id,
843                    method: req.method.as_str(),
844                    path: &req.raw_path,
845                },
846            );
847        }
848
849        result
850    }
851
852    fn supported_actions(&self) -> &[&str] {
853        &[
854            // Buckets
855            "ListBuckets",
856            "CreateBucket",
857            "DeleteBucket",
858            "HeadBucket",
859            "GetBucketLocation",
860            // Objects
861            "PutObject",
862            "GetObject",
863            "DeleteObject",
864            "HeadObject",
865            "CopyObject",
866            "DeleteObjects",
867            "ListObjectsV2",
868            "ListObjects",
869            "ListObjectVersions",
870            "GetObjectAttributes",
871            "RestoreObject",
872            // Object properties
873            "PutObjectTagging",
874            "GetObjectTagging",
875            "DeleteObjectTagging",
876            "PutObjectAcl",
877            "GetObjectAcl",
878            "PutObjectRetention",
879            "GetObjectRetention",
880            "PutObjectLegalHold",
881            "GetObjectLegalHold",
882            // Bucket configuration
883            "PutBucketTagging",
884            "GetBucketTagging",
885            "DeleteBucketTagging",
886            "PutBucketAcl",
887            "GetBucketAcl",
888            "PutBucketVersioning",
889            "GetBucketVersioning",
890            "PutBucketCors",
891            "GetBucketCors",
892            "DeleteBucketCors",
893            "PutBucketNotificationConfiguration",
894            "GetBucketNotificationConfiguration",
895            "PutBucketWebsite",
896            "GetBucketWebsite",
897            "DeleteBucketWebsite",
898            "PutBucketAccelerateConfiguration",
899            "GetBucketAccelerateConfiguration",
900            "PutPublicAccessBlock",
901            "GetPublicAccessBlock",
902            "DeletePublicAccessBlock",
903            "PutBucketEncryption",
904            "GetBucketEncryption",
905            "DeleteBucketEncryption",
906            "PutBucketLifecycleConfiguration",
907            "GetBucketLifecycleConfiguration",
908            "DeleteBucketLifecycle",
909            "PutBucketLogging",
910            "GetBucketLogging",
911            "PutBucketPolicy",
912            "GetBucketPolicy",
913            "DeleteBucketPolicy",
914            "PutObjectLockConfiguration",
915            "GetObjectLockConfiguration",
916            "PutBucketReplication",
917            "GetBucketReplication",
918            "DeleteBucketReplication",
919            "PutBucketOwnershipControls",
920            "GetBucketOwnershipControls",
921            "DeleteBucketOwnershipControls",
922            "PutBucketInventoryConfiguration",
923            "GetBucketInventoryConfiguration",
924            "DeleteBucketInventoryConfiguration",
925            "ListBucketInventoryConfigurations",
926            "PutBucketAnalyticsConfiguration",
927            "GetBucketAnalyticsConfiguration",
928            "DeleteBucketAnalyticsConfiguration",
929            "ListBucketAnalyticsConfigurations",
930            "PutBucketIntelligentTieringConfiguration",
931            "GetBucketIntelligentTieringConfiguration",
932            "DeleteBucketIntelligentTieringConfiguration",
933            "ListBucketIntelligentTieringConfigurations",
934            "PutBucketMetricsConfiguration",
935            "GetBucketMetricsConfiguration",
936            "DeleteBucketMetricsConfiguration",
937            "ListBucketMetricsConfigurations",
938            "PutBucketRequestPayment",
939            "GetBucketRequestPayment",
940            "PutBucketAbac",
941            "GetBucketAbac",
942            "GetBucketPolicyStatus",
943            "CreateBucketMetadataConfiguration",
944            "GetBucketMetadataConfiguration",
945            "DeleteBucketMetadataConfiguration",
946            "CreateBucketMetadataTableConfiguration",
947            "GetBucketMetadataTableConfiguration",
948            "DeleteBucketMetadataTableConfiguration",
949            "UpdateBucketMetadataInventoryTableConfiguration",
950            "UpdateBucketMetadataJournalTableConfiguration",
951            "GetObjectTorrent",
952            "RenameObject",
953            "SelectObjectContent",
954            "UpdateObjectEncryption",
955            "WriteGetObjectResponse",
956            "ListDirectoryBuckets",
957            "CreateSession",
958            // Multipart uploads
959            "CreateMultipartUpload",
960            "UploadPart",
961            "UploadPartCopy",
962            "CompleteMultipartUpload",
963            "AbortMultipartUpload",
964            "ListParts",
965            "ListMultipartUploads",
966        ]
967    }
968
969    fn iam_enforceable(&self) -> bool {
970        true
971    }
972
973    /// S3 resources are either:
974    /// - Bucket ARN (`arn:aws:s3:::bucket`) for bucket-level actions
975    /// - Object ARN (`arn:aws:s3:::bucket/key`) for object-level actions
976    /// - Wildcard (`*`) for `ListBuckets` which doesn't target a specific
977    ///   resource
978    ///
979    /// S3 ARNs notably omit the account id and region — this is the one
980    /// AWS service that carries neither in its ARN, because bucket names
981    /// are globally unique.
982    fn iam_action_for(&self, request: &AwsRequest) -> Option<fakecloud_core::auth::IamAction> {
983        // S3 doesn't set `request.action` — the handler dispatches on
984        // method + path + query params directly. Re-derive the action
985        // name here so enforcement can match against IAM policies the
986        // same way the real service would.
987        let bucket = request.path_segments.first().map(|s| s.as_str());
988        let key = if request.path_segments.len() > 1 {
989            Some(request.path_segments[1..].join("/"))
990        } else {
991            None
992        };
993        let action = s3_detect_action(
994            request.method.as_str(),
995            bucket,
996            key.as_deref(),
997            &request.query_params,
998        )?;
999        let resource = s3_resource_for(action, bucket, key.as_deref());
1000        Some(fakecloud_core::auth::IamAction {
1001            service: "s3",
1002            action,
1003            resource,
1004        })
1005    }
1006
1007    fn iam_condition_keys_for(
1008        &self,
1009        request: &AwsRequest,
1010        action: &fakecloud_core::auth::IamAction,
1011    ) -> std::collections::BTreeMap<String, Vec<String>> {
1012        s3_condition_keys(action.action, &request.query_params)
1013    }
1014
1015    fn resource_tags_for(
1016        &self,
1017        resource_arn: &str,
1018    ) -> Option<std::collections::HashMap<String, String>> {
1019        s3_resource_tags(&self.state, resource_arn)
1020            .map(|m| m.into_iter().collect::<std::collections::HashMap<_, _>>())
1021    }
1022
1023    fn request_tags_from(
1024        &self,
1025        request: &AwsRequest,
1026        action: &str,
1027    ) -> Option<std::collections::HashMap<String, String>> {
1028        s3_request_tags(request, action)
1029            .map(|m| m.into_iter().collect::<std::collections::HashMap<_, _>>())
1030    }
1031}
1032
1033/// Extract service-specific IAM condition keys from an S3 request.
1034///
1035/// Today only `ListObjects` / `ListObjectsV2` expose keys (`s3:prefix`,
1036/// `s3:delimiter`, `s3:max-keys`) via their query params. Other actions
1037/// return an empty map so the evaluator's safe-fail semantics treat any
1038/// policy condition referencing an unknown key as "doesn't apply".
1039fn s3_condition_keys(
1040    action: &str,
1041    query: &std::collections::HashMap<String, String>,
1042) -> std::collections::BTreeMap<String, Vec<String>> {
1043    let mut out = std::collections::BTreeMap::new();
1044    if matches!(action, "ListObjects" | "ListObjectsV2") {
1045        // Both list variants share the same query param shape.
1046        if let Some(prefix) = query.get("prefix") {
1047            out.insert("s3:prefix".to_string(), vec![prefix.clone()]);
1048        }
1049        if let Some(delimiter) = query.get("delimiter") {
1050            out.insert("s3:delimiter".to_string(), vec![delimiter.clone()]);
1051        }
1052        if let Some(max_keys) = query.get("max-keys") {
1053            out.insert("s3:max-keys".to_string(), vec![max_keys.clone()]);
1054        }
1055    }
1056    out
1057}
1058
1059/// Look up resource tags for an S3 ARN.
1060///
1061/// Bucket-level ARN (`arn:aws:s3:::bucket`) -> bucket tags.
1062/// Object-level ARN (`arn:aws:s3:::bucket/key`) -> object tags.
1063/// `*` (ListBuckets) -> `Some(empty)` (no resource to tag).
1064fn s3_resource_tags(
1065    state: &SharedS3State,
1066    resource_arn: &str,
1067) -> Option<std::collections::BTreeMap<String, String>> {
1068    if resource_arn == "*" {
1069        return Some(std::collections::BTreeMap::new());
1070    }
1071    // S3 ARNs: arn:aws:s3:::bucket or arn:aws:s3:::bucket/key
1072    let after_prefix = resource_arn.strip_prefix("arn:aws:s3:::")?;
1073    let mas = state.read();
1074    // S3 bucket names are globally unique; scan all accounts to find the bucket
1075    let bucket_name = after_prefix.split('/').next().unwrap_or(after_prefix);
1076    let state = mas
1077        .find_account(|s| s.buckets.contains_key(bucket_name))
1078        .and_then(|id| mas.get(id))
1079        .or_else(|| Some(mas.default_ref()))?;
1080    if let Some(slash_pos) = after_prefix.find('/') {
1081        // Object-level: bucket/key
1082        let bucket_name = &after_prefix[..slash_pos];
1083        let key = &after_prefix[slash_pos + 1..];
1084        let bucket = state.buckets.get(bucket_name)?;
1085        // Try current objects first, then versioned objects (latest version)
1086        if let Some(obj) = bucket.objects.get(key) {
1087            Some(obj.tags.clone())
1088        } else if let Some(versions) = bucket.object_versions.get(key) {
1089            versions.last().map(|v| v.tags.clone())
1090        } else {
1091            // Object doesn't exist yet (e.g. PutObject creating it)
1092            Some(std::collections::BTreeMap::new())
1093        }
1094    } else {
1095        // Bucket-level
1096        let bucket = state.buckets.get(after_prefix)?;
1097        Some(bucket.tags.clone())
1098    }
1099}
1100
1101/// Extract tags from an S3 request body/headers.
1102///
1103/// S3 sends tags via:
1104/// - `x-amz-tagging` header (URL-encoded `key=value&...`) on PutObject
1105/// - XML body on PutBucketTagging / PutObjectTagging
1106fn s3_request_tags(
1107    request: &AwsRequest,
1108    action: &str,
1109) -> Option<std::collections::BTreeMap<String, String>> {
1110    match action {
1111        "PutObject" | "CopyObject" | "CreateMultipartUpload" => {
1112            // Tags come via x-amz-tagging header
1113            if let Some(tagging) = request.headers.get("x-amz-tagging") {
1114                let tags = parse_url_encoded_tags(tagging.to_str().unwrap_or(""));
1115                Some(tags.into_iter().collect())
1116            } else {
1117                Some(std::collections::BTreeMap::new())
1118            }
1119        }
1120        "PutBucketTagging" | "PutObjectTagging" => {
1121            // Tags come in XML body
1122            let body = std::str::from_utf8(&request.body).unwrap_or("");
1123            let tags = parse_tagging_xml(body);
1124            Some(tags.into_iter().collect())
1125        }
1126        _ => Some(std::collections::BTreeMap::new()),
1127    }
1128}
1129
1130/// Derive the IAM action name from an S3 REST request. Handles the
1131/// common cases (GetObject, PutObject, DeleteObject, ListObjectsV2,
1132/// CreateBucket, ...) plus a subset of sub-resource operations
1133/// (`?acl`, `?tagging`, `?versioning`, `?policy`, `?cors`, `?website`,
1134/// `?lifecycle`, `?encryption`, `?logging`, `?notification`, `?replication`,
1135/// `?ownershipControls`, `?publicAccessBlock`, `?accelerate`, `?inventory`,
1136/// `?object-lock`, `?uploads`, `?uploadId`).
1137///
1138/// Returns `None` for requests that don't map to a known action — the
1139/// dispatch layer then skips enforcement for that request rather than
1140/// guessing (and a warn log fires via the "service is iam_enforceable
1141/// but has no mapping" branch in dispatch.rs).
1142fn s3_detect_action(
1143    method: &str,
1144    bucket: Option<&str>,
1145    key: Option<&str>,
1146    query: &std::collections::HashMap<String, String>,
1147) -> Option<&'static str> {
1148    let has = |q: &str| query.contains_key(q);
1149    let is_get = method == "GET";
1150    let is_put = method == "PUT";
1151    let is_post = method == "POST";
1152    let is_delete = method == "DELETE";
1153
1154    // Service root
1155    if bucket.is_none() {
1156        return match method {
1157            "GET" => Some("ListBuckets"),
1158            _ => None,
1159        };
1160    }
1161    let has_key = key.is_some();
1162
1163    // Multipart sub-resource forms
1164    if has_key && is_post && has("uploads") {
1165        return Some("CreateMultipartUpload");
1166    }
1167    if has_key && is_post && has("uploadId") {
1168        return Some("CompleteMultipartUpload");
1169    }
1170    if has_key && is_put && has("partNumber") && has("uploadId") {
1171        return Some("UploadPart");
1172    }
1173    if has_key && is_delete && has("uploadId") {
1174        return Some("AbortMultipartUpload");
1175    }
1176    if has_key && is_get && has("uploadId") {
1177        return Some("ListParts");
1178    }
1179    if !has_key && is_get && has("uploads") {
1180        return Some("ListMultipartUploads");
1181    }
1182
1183    // Sub-resource-keyed actions (?acl, ?tagging, ...). Order matters
1184    // since a request can carry multiple; we pick the most specific.
1185    // Object-level sub-resources come first (key present).
1186    if has_key {
1187        if has("tagging") {
1188            return Some(match method {
1189                "GET" => "GetObjectTagging",
1190                "PUT" => "PutObjectTagging",
1191                "DELETE" => "DeleteObjectTagging",
1192                _ => return None,
1193            });
1194        }
1195        if has("acl") {
1196            return Some(match method {
1197                "GET" => "GetObjectAcl",
1198                "PUT" => "PutObjectAcl",
1199                _ => return None,
1200            });
1201        }
1202        if has("retention") {
1203            return Some(match method {
1204                "GET" => "GetObjectRetention",
1205                "PUT" => "PutObjectRetention",
1206                _ => return None,
1207            });
1208        }
1209        if has("legal-hold") {
1210            return Some(match method {
1211                "GET" => "GetObjectLegalHold",
1212                "PUT" => "PutObjectLegalHold",
1213                _ => return None,
1214            });
1215        }
1216        // Identified by cubic on PR #399: both ?attributes and ?restore
1217        // need method guards — otherwise e.g. GET /bucket/key?restore
1218        // would be classified as RestoreObject (POST-only in AWS) and
1219        // IAM-evaluated against s3:RestoreObject instead of s3:GetObject.
1220        if has("attributes") && is_get {
1221            return Some("GetObjectAttributes");
1222        }
1223        if has("restore") && is_post {
1224            return Some("RestoreObject");
1225        }
1226    }
1227
1228    // Bucket-level sub-resources (key absent).
1229    if !has_key {
1230        if has("tagging") {
1231            return Some(match method {
1232                "GET" => "GetBucketTagging",
1233                "PUT" => "PutBucketTagging",
1234                "DELETE" => "DeleteBucketTagging",
1235                _ => return None,
1236            });
1237        }
1238        if has("acl") {
1239            return Some(match method {
1240                "GET" => "GetBucketAcl",
1241                "PUT" => "PutBucketAcl",
1242                _ => return None,
1243            });
1244        }
1245        if has("versioning") {
1246            return Some(match method {
1247                "GET" => "GetBucketVersioning",
1248                "PUT" => "PutBucketVersioning",
1249                _ => return None,
1250            });
1251        }
1252        if has("cors") {
1253            return Some(match method {
1254                "GET" => "GetBucketCors",
1255                "PUT" => "PutBucketCors",
1256                "DELETE" => "DeleteBucketCors",
1257                _ => return None,
1258            });
1259        }
1260        if has("policy") {
1261            return Some(match method {
1262                "GET" => "GetBucketPolicy",
1263                "PUT" => "PutBucketPolicy",
1264                "DELETE" => "DeleteBucketPolicy",
1265                _ => return None,
1266            });
1267        }
1268        if has("website") {
1269            return Some(match method {
1270                "GET" => "GetBucketWebsite",
1271                "PUT" => "PutBucketWebsite",
1272                "DELETE" => "DeleteBucketWebsite",
1273                _ => return None,
1274            });
1275        }
1276        if has("lifecycle") {
1277            return Some(match method {
1278                "GET" => "GetBucketLifecycleConfiguration",
1279                "PUT" => "PutBucketLifecycleConfiguration",
1280                "DELETE" => "DeleteBucketLifecycle",
1281                _ => return None,
1282            });
1283        }
1284        if has("encryption") {
1285            return Some(match method {
1286                "GET" => "GetBucketEncryption",
1287                "PUT" => "PutBucketEncryption",
1288                "DELETE" => "DeleteBucketEncryption",
1289                _ => return None,
1290            });
1291        }
1292        if has("logging") {
1293            return Some(match method {
1294                "GET" => "GetBucketLogging",
1295                "PUT" => "PutBucketLogging",
1296                _ => return None,
1297            });
1298        }
1299        if has("notification") {
1300            return Some(match method {
1301                "GET" => "GetBucketNotificationConfiguration",
1302                "PUT" => "PutBucketNotificationConfiguration",
1303                _ => return None,
1304            });
1305        }
1306        if has("replication") {
1307            return Some(match method {
1308                "GET" => "GetBucketReplication",
1309                "PUT" => "PutBucketReplication",
1310                "DELETE" => "DeleteBucketReplication",
1311                _ => return None,
1312            });
1313        }
1314        if has("ownershipControls") {
1315            return Some(match method {
1316                "GET" => "GetBucketOwnershipControls",
1317                "PUT" => "PutBucketOwnershipControls",
1318                "DELETE" => "DeleteBucketOwnershipControls",
1319                _ => return None,
1320            });
1321        }
1322        if has("publicAccessBlock") {
1323            return Some(match method {
1324                "GET" => "GetPublicAccessBlock",
1325                "PUT" => "PutPublicAccessBlock",
1326                "DELETE" => "DeletePublicAccessBlock",
1327                _ => return None,
1328            });
1329        }
1330        if has("accelerate") {
1331            return Some(match method {
1332                "GET" => "GetBucketAccelerateConfiguration",
1333                "PUT" => "PutBucketAccelerateConfiguration",
1334                _ => return None,
1335            });
1336        }
1337        if has("inventory") {
1338            return Some(match method {
1339                "GET" => "GetBucketInventoryConfiguration",
1340                "PUT" => "PutBucketInventoryConfiguration",
1341                "DELETE" => "DeleteBucketInventoryConfiguration",
1342                _ => return None,
1343            });
1344        }
1345        if has("analytics") {
1346            return Some(match method {
1347                "GET" if has("id") => "GetBucketAnalyticsConfiguration",
1348                "GET" => "ListBucketAnalyticsConfigurations",
1349                "PUT" => "PutBucketAnalyticsConfiguration",
1350                "DELETE" => "DeleteBucketAnalyticsConfiguration",
1351                _ => return None,
1352            });
1353        }
1354        if has("intelligent-tiering") {
1355            return Some(match method {
1356                "GET" if has("id") => "GetBucketIntelligentTieringConfiguration",
1357                "GET" => "ListBucketIntelligentTieringConfigurations",
1358                "PUT" => "PutBucketIntelligentTieringConfiguration",
1359                "DELETE" => "DeleteBucketIntelligentTieringConfiguration",
1360                _ => return None,
1361            });
1362        }
1363        if has("metrics") {
1364            return Some(match method {
1365                "GET" if has("id") => "GetBucketMetricsConfiguration",
1366                "GET" => "ListBucketMetricsConfigurations",
1367                "PUT" => "PutBucketMetricsConfiguration",
1368                "DELETE" => "DeleteBucketMetricsConfiguration",
1369                _ => return None,
1370            });
1371        }
1372        if has("requestPayment") {
1373            return Some(match method {
1374                "GET" => "GetBucketRequestPayment",
1375                "PUT" => "PutBucketRequestPayment",
1376                _ => return None,
1377            });
1378        }
1379        if has("policyStatus") && is_get {
1380            return Some("GetBucketPolicyStatus");
1381        }
1382        if has("metadataConfiguration") {
1383            return Some(match method {
1384                "GET" => "GetBucketMetadataConfiguration",
1385                "POST" => "CreateBucketMetadataConfiguration",
1386                "DELETE" => "DeleteBucketMetadataConfiguration",
1387                _ => return None,
1388            });
1389        }
1390        if has("metadataTable") {
1391            return Some(match method {
1392                "GET" => "GetBucketMetadataTableConfiguration",
1393                "POST" => "CreateBucketMetadataTableConfiguration",
1394                "DELETE" => "DeleteBucketMetadataTableConfiguration",
1395                _ => return None,
1396            });
1397        }
1398        if has("metadataInventoryTable") && is_put {
1399            return Some("UpdateBucketMetadataInventoryTableConfiguration");
1400        }
1401        if has("metadataJournalTable") && is_put {
1402            return Some("UpdateBucketMetadataJournalTableConfiguration");
1403        }
1404        if has("abac") && is_put {
1405            return Some("PutBucketAbacConfiguration");
1406        }
1407        if has("renameObject") && is_put {
1408            return Some("RenameObject");
1409        }
1410        if has("object-lock") {
1411            return Some(match method {
1412                "GET" => "GetObjectLockConfiguration",
1413                "PUT" => "PutObjectLockConfiguration",
1414                _ => return None,
1415            });
1416        }
1417        if has("location") {
1418            return Some("GetBucketLocation");
1419        }
1420        if is_post && has("delete") {
1421            return Some("DeleteObjects");
1422        }
1423        if is_get && has("versions") {
1424            return Some("ListObjectVersions");
1425        }
1426    }
1427
1428    // Plain bucket/object methods.
1429    match (method, has_key) {
1430        ("GET", true) => Some("GetObject"),
1431        ("PUT", true) => {
1432            // CopyObject uses x-amz-copy-source but we don't have headers
1433            // handy here — treat both PutObject and CopyObject as PutObject
1434            // for IAM purposes; CopyObject additionally requires
1435            // s3:GetObject on the source but that's evaluated per-request
1436            // by real AWS, not on the PUT call itself.
1437            Some("PutObject")
1438        }
1439        ("DELETE", true) => Some("DeleteObject"),
1440        ("HEAD", true) => Some("HeadObject"),
1441        ("GET", false) => {
1442            if query.contains_key("list-type") {
1443                Some("ListObjectsV2")
1444            } else {
1445                Some("ListObjects")
1446            }
1447        }
1448        ("PUT", false) => Some("CreateBucket"),
1449        ("DELETE", false) => Some("DeleteBucket"),
1450        ("HEAD", false) => Some("HeadBucket"),
1451        _ => None,
1452    }
1453}
1454
1455/// Build the S3 resource ARN for an action. Returns `*` for
1456/// `ListBuckets` (account-scoped), a bucket ARN for bucket-level
1457/// configuration actions, or an object ARN for object-level actions.
1458fn s3_resource_for(action: &'static str, bucket: Option<&str>, key: Option<&str>) -> String {
1459    // Object-level actions work on `bucket/key`.
1460    const OBJECT_ACTIONS: &[&str] = &[
1461        "PutObject",
1462        "GetObject",
1463        "DeleteObject",
1464        "HeadObject",
1465        "CopyObject",
1466        "GetObjectAttributes",
1467        "RestoreObject",
1468        "PutObjectTagging",
1469        "GetObjectTagging",
1470        "DeleteObjectTagging",
1471        "PutObjectAcl",
1472        "GetObjectAcl",
1473        "PutObjectRetention",
1474        "GetObjectRetention",
1475        "PutObjectLegalHold",
1476        "GetObjectLegalHold",
1477        "CreateMultipartUpload",
1478        "UploadPart",
1479        "UploadPartCopy",
1480        "CompleteMultipartUpload",
1481        "AbortMultipartUpload",
1482        "ListParts",
1483    ];
1484    if action == "ListBuckets" {
1485        return "*".to_string();
1486    }
1487    let Some(bucket) = bucket else {
1488        return "*".to_string();
1489    };
1490    if OBJECT_ACTIONS.contains(&action) {
1491        match key {
1492            Some(k) if !k.is_empty() => Arn::s3(&format!("{bucket}/{k}")).to_string(),
1493            _ => Arn::s3(&format!("{bucket}/*")).to_string(),
1494        }
1495    } else {
1496        // Bucket-level actions (ListObjectsV2, GetBucketTagging, ...).
1497        Arn::s3(bucket).to_string()
1498    }
1499}
1500
1501// Conditional request helpers
1502
1503/// Truncate a DateTime to second-level precision (HTTP dates have no sub-second info).
1504pub(crate) fn truncate_to_seconds(dt: DateTime<Utc>) -> DateTime<Utc> {
1505    dt.with_nanosecond(0).unwrap_or(dt)
1506}
1507
1508pub(crate) fn check_get_conditionals(
1509    req: &AwsRequest,
1510    obj: &S3Object,
1511) -> Result<(), AwsServiceError> {
1512    let obj_etag = format!("\"{}\"", obj.etag);
1513    let obj_time = truncate_to_seconds(obj.last_modified);
1514
1515    // If-Match
1516    if let Some(if_match) = req.headers.get("if-match").and_then(|v| v.to_str().ok()) {
1517        if !etag_matches(if_match, &obj_etag) {
1518            return Err(precondition_failed("If-Match"));
1519        }
1520    }
1521
1522    // If-None-Match
1523    if let Some(if_none_match) = req
1524        .headers
1525        .get("if-none-match")
1526        .and_then(|v| v.to_str().ok())
1527    {
1528        if etag_matches(if_none_match, &obj_etag) {
1529            return Err(not_modified_with_etag(&obj_etag));
1530        }
1531    }
1532
1533    // If-Unmodified-Since
1534    if let Some(since) = req
1535        .headers
1536        .get("if-unmodified-since")
1537        .and_then(|v| v.to_str().ok())
1538    {
1539        if let Some(dt) = parse_http_date(since) {
1540            if obj_time > dt {
1541                return Err(precondition_failed("If-Unmodified-Since"));
1542            }
1543        }
1544    }
1545
1546    // If-Modified-Since
1547    if let Some(since) = req
1548        .headers
1549        .get("if-modified-since")
1550        .and_then(|v| v.to_str().ok())
1551    {
1552        if let Some(dt) = parse_http_date(since) {
1553            if obj_time <= dt {
1554                return Err(not_modified());
1555            }
1556        }
1557    }
1558
1559    Ok(())
1560}
1561
1562pub(crate) fn check_head_conditionals(
1563    req: &AwsRequest,
1564    obj: &S3Object,
1565) -> Result<(), AwsServiceError> {
1566    let obj_etag = format!("\"{}\"", obj.etag);
1567    let obj_time = truncate_to_seconds(obj.last_modified);
1568
1569    // If-Match
1570    if let Some(if_match) = req.headers.get("if-match").and_then(|v| v.to_str().ok()) {
1571        if !etag_matches(if_match, &obj_etag) {
1572            return Err(AwsServiceError::aws_error(
1573                StatusCode::PRECONDITION_FAILED,
1574                "412",
1575                "Precondition Failed",
1576            ));
1577        }
1578    }
1579
1580    // If-None-Match
1581    if let Some(if_none_match) = req
1582        .headers
1583        .get("if-none-match")
1584        .and_then(|v| v.to_str().ok())
1585    {
1586        if etag_matches(if_none_match, &obj_etag) {
1587            return Err(not_modified_with_etag(&obj_etag));
1588        }
1589    }
1590
1591    // If-Unmodified-Since
1592    if let Some(since) = req
1593        .headers
1594        .get("if-unmodified-since")
1595        .and_then(|v| v.to_str().ok())
1596    {
1597        if let Some(dt) = parse_http_date(since) {
1598            if obj_time > dt {
1599                return Err(AwsServiceError::aws_error(
1600                    StatusCode::PRECONDITION_FAILED,
1601                    "412",
1602                    "Precondition Failed",
1603                ));
1604            }
1605        }
1606    }
1607
1608    // If-Modified-Since
1609    if let Some(since) = req
1610        .headers
1611        .get("if-modified-since")
1612        .and_then(|v| v.to_str().ok())
1613    {
1614        if let Some(dt) = parse_http_date(since) {
1615            if obj_time <= dt {
1616                return Err(not_modified());
1617            }
1618        }
1619    }
1620
1621    Ok(())
1622}
1623
1624pub(crate) fn etag_matches(condition: &str, obj_etag: &str) -> bool {
1625    let condition = condition.trim();
1626    if condition == "*" {
1627        return true;
1628    }
1629    let clean_etag = obj_etag.replace('"', "");
1630    // Split on comma to handle multi-value If-Match / If-None-Match
1631    for part in condition.split(',') {
1632        let part = part.trim().replace('"', "");
1633        if part == clean_etag {
1634            return true;
1635        }
1636    }
1637    false
1638}
1639
1640pub(crate) fn parse_http_date(s: &str) -> Option<DateTime<Utc>> {
1641    // Try RFC 2822 format: "Sat, 01 Jan 2000 00:00:00 GMT"
1642    if let Ok(dt) = DateTime::parse_from_rfc2822(s) {
1643        return Some(dt.with_timezone(&Utc));
1644    }
1645    // Try RFC 3339
1646    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1647        return Some(dt.with_timezone(&Utc));
1648    }
1649    // Try common HTTP date format: "%a, %d %b %Y %H:%M:%S GMT"
1650    if let Ok(dt) =
1651        chrono::NaiveDateTime::parse_from_str(s.trim_end_matches(" GMT"), "%a, %d %b %Y %H:%M:%S")
1652    {
1653        return Some(dt.and_utc());
1654    }
1655    // Try ISO 8601
1656    if let Ok(dt) = s.parse::<DateTime<Utc>>() {
1657        return Some(dt);
1658    }
1659    None
1660}
1661
1662pub(crate) fn not_modified() -> AwsServiceError {
1663    AwsServiceError::aws_error(StatusCode::NOT_MODIFIED, "304", "Not Modified")
1664}
1665
1666pub(crate) fn not_modified_with_etag(etag: &str) -> AwsServiceError {
1667    AwsServiceError::aws_error_with_headers(
1668        StatusCode::NOT_MODIFIED,
1669        "304",
1670        "Not Modified",
1671        vec![("etag".to_string(), etag.to_string())],
1672    )
1673}
1674
1675pub(crate) fn precondition_failed(condition: &str) -> AwsServiceError {
1676    AwsServiceError::aws_error_with_fields(
1677        StatusCode::PRECONDITION_FAILED,
1678        "PreconditionFailed",
1679        "At least one of the pre-conditions you specified did not hold",
1680        vec![("Condition".to_string(), condition.to_string())],
1681    )
1682}
1683
1684// ACL helpers
1685
1686pub(crate) fn build_acl_xml(owner_id: &str, grants: &[AclGrant], _account_id: &str) -> String {
1687    let mut grants_xml = String::new();
1688    for g in grants {
1689        let grantee_xml = if g.grantee_type == "Group" {
1690            let uri = g.grantee_uri.as_deref().unwrap_or("");
1691            format!(
1692                "<Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:type=\"Group\">\
1693                 <URI>{}</URI></Grantee>",
1694                xml_escape(uri),
1695            )
1696        } else {
1697            let id = g.grantee_id.as_deref().unwrap_or("");
1698            format!(
1699                "<Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:type=\"CanonicalUser\">\
1700                 <ID>{}</ID></Grantee>",
1701                xml_escape(id),
1702            )
1703        };
1704        grants_xml.push_str(&format!(
1705            "<Grant>{grantee_xml}<Permission>{}</Permission></Grant>",
1706            xml_escape(&g.permission),
1707        ));
1708    }
1709
1710    format!(
1711        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
1712         <AccessControlPolicy xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
1713         <Owner><ID>{owner_id}</ID><DisplayName>{owner_id}</DisplayName></Owner>\
1714         <AccessControlList>{grants_xml}</AccessControlList>\
1715         </AccessControlPolicy>",
1716        owner_id = xml_escape(owner_id),
1717    )
1718}
1719
1720pub(crate) fn canned_acl_grants(acl: &str, owner_id: &str) -> Vec<AclGrant> {
1721    let owner_grant = AclGrant {
1722        grantee_type: "CanonicalUser".to_string(),
1723        grantee_id: Some(owner_id.to_string()),
1724        grantee_display_name: Some(owner_id.to_string()),
1725        grantee_uri: None,
1726        permission: "FULL_CONTROL".to_string(),
1727    };
1728    match acl {
1729        "private" => vec![owner_grant],
1730        "public-read" => vec![
1731            owner_grant,
1732            AclGrant {
1733                grantee_type: "Group".to_string(),
1734                grantee_id: None,
1735                grantee_display_name: None,
1736                grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
1737                permission: "READ".to_string(),
1738            },
1739        ],
1740        "public-read-write" => vec![
1741            owner_grant,
1742            AclGrant {
1743                grantee_type: "Group".to_string(),
1744                grantee_id: None,
1745                grantee_display_name: None,
1746                grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
1747                permission: "READ".to_string(),
1748            },
1749            AclGrant {
1750                grantee_type: "Group".to_string(),
1751                grantee_id: None,
1752                grantee_display_name: None,
1753                grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
1754                permission: "WRITE".to_string(),
1755            },
1756        ],
1757        "authenticated-read" => vec![
1758            owner_grant,
1759            AclGrant {
1760                grantee_type: "Group".to_string(),
1761                grantee_id: None,
1762                grantee_display_name: None,
1763                grantee_uri: Some(
1764                    "http://acs.amazonaws.com/groups/global/AuthenticatedUsers".to_string(),
1765                ),
1766                permission: "READ".to_string(),
1767            },
1768        ],
1769        "bucket-owner-full-control" => vec![owner_grant],
1770        _ => vec![owner_grant],
1771    }
1772}
1773
1774pub(crate) fn canned_acl_grants_for_object(acl: &str, owner_id: &str) -> Vec<AclGrant> {
1775    // For objects, canned ACLs work the same way
1776    canned_acl_grants(acl, owner_id)
1777}
1778
1779pub(crate) fn parse_grant_headers(headers: &HeaderMap) -> Vec<AclGrant> {
1780    let mut grants = Vec::new();
1781    let header_permission_map = [
1782        ("x-amz-grant-read", "READ"),
1783        ("x-amz-grant-write", "WRITE"),
1784        ("x-amz-grant-read-acp", "READ_ACP"),
1785        ("x-amz-grant-write-acp", "WRITE_ACP"),
1786        ("x-amz-grant-full-control", "FULL_CONTROL"),
1787    ];
1788
1789    for (header, permission) in &header_permission_map {
1790        if let Some(value) = headers.get(*header).and_then(|v| v.to_str().ok()) {
1791            // Parse "id=xxx" or "uri=xxx" or "emailAddress=xxx"
1792            for part in value.split(',') {
1793                let part = part.trim();
1794                if let Some((key, val)) = part.split_once('=') {
1795                    let val = val.trim().trim_matches('"');
1796                    let key = key.trim().to_lowercase();
1797                    match key.as_str() {
1798                        "id" => {
1799                            grants.push(AclGrant {
1800                                grantee_type: "CanonicalUser".to_string(),
1801                                grantee_id: Some(val.to_string()),
1802                                grantee_display_name: Some(val.to_string()),
1803                                grantee_uri: None,
1804                                permission: permission.to_string(),
1805                            });
1806                        }
1807                        "uri" | "url" => {
1808                            grants.push(AclGrant {
1809                                grantee_type: "Group".to_string(),
1810                                grantee_id: None,
1811                                grantee_display_name: None,
1812                                grantee_uri: Some(val.to_string()),
1813                                permission: permission.to_string(),
1814                            });
1815                        }
1816                        _ => {}
1817                    }
1818                }
1819            }
1820        }
1821    }
1822    grants
1823}
1824
1825pub(crate) fn parse_acl_xml(xml: &str) -> Result<Vec<AclGrant>, AwsServiceError> {
1826    // Check for Owner presence
1827    if xml.contains("<AccessControlPolicy") && !xml.contains("<Owner>") {
1828        return Err(AwsServiceError::aws_error(
1829            StatusCode::BAD_REQUEST,
1830            "MalformedACLError",
1831            "The XML you provided was not well-formed or did not validate against our published schema",
1832        ));
1833    }
1834
1835    let valid_permissions = ["READ", "WRITE", "READ_ACP", "WRITE_ACP", "FULL_CONTROL"];
1836
1837    let mut grants = Vec::new();
1838    let mut remaining = xml;
1839    while let Some(start) = remaining.find("<Grant>") {
1840        let after = &remaining[start + 7..];
1841        if let Some(end) = after.find("</Grant>") {
1842            let grant_body = &after[..end];
1843
1844            // Extract permission
1845            let permission = extract_xml_value(grant_body, "Permission").unwrap_or_default();
1846            if !valid_permissions.contains(&permission.as_str()) {
1847                return Err(AwsServiceError::aws_error(
1848                    StatusCode::BAD_REQUEST,
1849                    "MalformedACLError",
1850                    "The XML you provided was not well-formed or did not validate against our published schema",
1851                ));
1852            }
1853
1854            // Determine grantee type
1855            if grant_body.contains("xsi:type=\"Group\"") || grant_body.contains("<URI>") {
1856                let uri = extract_xml_value(grant_body, "URI").unwrap_or_default();
1857                grants.push(AclGrant {
1858                    grantee_type: "Group".to_string(),
1859                    grantee_id: None,
1860                    grantee_display_name: None,
1861                    grantee_uri: Some(uri),
1862                    permission,
1863                });
1864            } else {
1865                let id = extract_xml_value(grant_body, "ID").unwrap_or_default();
1866                let display =
1867                    extract_xml_value(grant_body, "DisplayName").unwrap_or_else(|| id.clone());
1868                grants.push(AclGrant {
1869                    grantee_type: "CanonicalUser".to_string(),
1870                    grantee_id: Some(id),
1871                    grantee_display_name: Some(display),
1872                    grantee_uri: None,
1873                    permission,
1874                });
1875            }
1876
1877            remaining = &after[end + 8..];
1878        } else {
1879            break;
1880        }
1881    }
1882    Ok(grants)
1883}
1884
1885// Range helpers
1886
1887pub(crate) enum RangeResult {
1888    Satisfiable { start: usize, end: usize },
1889    NotSatisfiable,
1890    Ignored,
1891}
1892
1893pub(crate) fn parse_range_header(range_str: &str, total_size: usize) -> Option<RangeResult> {
1894    let range_str = range_str.strip_prefix("bytes=")?;
1895    let (start_str, end_str) = range_str.split_once('-')?;
1896    if start_str.is_empty() {
1897        let suffix_len: usize = end_str.parse().ok()?;
1898        if suffix_len == 0 || total_size == 0 {
1899            return Some(RangeResult::NotSatisfiable);
1900        }
1901        let start = total_size.saturating_sub(suffix_len);
1902        Some(RangeResult::Satisfiable {
1903            start,
1904            end: total_size - 1,
1905        })
1906    } else {
1907        let start: usize = start_str.parse().ok()?;
1908        if start >= total_size {
1909            return Some(RangeResult::NotSatisfiable);
1910        }
1911        let end = if end_str.is_empty() {
1912            total_size - 1
1913        } else {
1914            let e: usize = end_str.parse().ok()?;
1915            if e < start {
1916                return Some(RangeResult::Ignored);
1917            }
1918            std::cmp::min(e, total_size - 1)
1919        };
1920        Some(RangeResult::Satisfiable { start, end })
1921    }
1922}
1923
1924// Helpers
1925
1926/// S3 XML response with `application/xml` content type (unlike Query protocol's `text/xml`).
1927pub(crate) fn s3_xml(status: StatusCode, body: impl Into<Bytes>) -> AwsResponse {
1928    AwsResponse {
1929        status,
1930        content_type: "application/xml".to_string(),
1931        body: body.into().into(),
1932        headers: HeaderMap::new(),
1933    }
1934}
1935
1936pub(crate) fn empty_response(status: StatusCode) -> AwsResponse {
1937    AwsResponse {
1938        status,
1939        content_type: "application/xml".to_string(),
1940        body: Bytes::new().into(),
1941        headers: HeaderMap::new(),
1942    }
1943}
1944
1945/// Returns true when the object is stored in a "cold" storage class (GLACIER, DEEP_ARCHIVE)
1946/// and has NOT been restored (or restore is still in progress).
1947pub(crate) fn is_frozen(obj: &S3Object) -> bool {
1948    matches!(obj.storage_class.as_str(), "GLACIER" | "DEEP_ARCHIVE")
1949        && obj.restore_ongoing != Some(false)
1950}
1951
1952pub(crate) fn no_such_bucket(bucket: &str) -> AwsServiceError {
1953    AwsServiceError::aws_error_with_fields(
1954        StatusCode::NOT_FOUND,
1955        "NoSuchBucket",
1956        "The specified bucket does not exist",
1957        vec![("BucketName".to_string(), bucket.to_string())],
1958    )
1959}
1960
1961pub(crate) fn no_such_key(key: &str) -> AwsServiceError {
1962    AwsServiceError::aws_error_with_fields(
1963        StatusCode::NOT_FOUND,
1964        "NoSuchKey",
1965        "The specified key does not exist.",
1966        vec![("Key".to_string(), key.to_string())],
1967    )
1968}
1969
1970pub(crate) fn no_such_upload(upload_id: &str) -> AwsServiceError {
1971    AwsServiceError::aws_error_with_fields(
1972        StatusCode::NOT_FOUND,
1973        "NoSuchUpload",
1974        "The specified upload does not exist. The upload ID may be invalid, \
1975         or the upload may have been aborted or completed.",
1976        vec![("UploadId".to_string(), upload_id.to_string())],
1977    )
1978}
1979
1980pub(crate) fn no_such_key_with_detail(key: &str) -> AwsServiceError {
1981    AwsServiceError::aws_error_with_fields(
1982        StatusCode::NOT_FOUND,
1983        "NoSuchKey",
1984        "The specified key does not exist.",
1985        vec![("Key".to_string(), key.to_string())],
1986    )
1987}
1988
1989pub(crate) fn compute_md5(data: &[u8]) -> String {
1990    let digest = Md5::digest(data);
1991    format!("{:x}", digest)
1992}
1993
1994pub(crate) fn compute_checksum(algorithm: &str, data: &[u8]) -> String {
1995    match algorithm {
1996        "CRC32" => {
1997            let crc = crc32fast::hash(data);
1998            BASE64.encode(crc.to_be_bytes())
1999        }
2000        "SHA1" => {
2001            use sha1::Digest as _;
2002            let hash = sha1::Sha1::digest(data);
2003            BASE64.encode(hash)
2004        }
2005        "SHA256" => {
2006            use sha2::Digest as _;
2007            let hash = sha2::Sha256::digest(data);
2008            BASE64.encode(hash)
2009        }
2010        _ => String::new(),
2011    }
2012}
2013
2014/// Streaming variant of [`compute_checksum`] for spool files. Reads
2015/// the file in 1 MiB chunks and feeds each chunk into the hasher so a
2016/// 1 GiB upload computes its CRC32 / SHA-1 / SHA-256 in constant
2017/// memory rather than via `tokio::fs::read` (which would allocate the
2018/// whole file as one buffer).
2019pub(crate) async fn compute_checksum_streaming(
2020    algorithm: &str,
2021    path: &std::path::Path,
2022) -> Result<String, std::io::Error> {
2023    use tokio::io::AsyncReadExt;
2024    let mut file = tokio::fs::File::open(path).await?;
2025    let mut buf = vec![0u8; 1024 * 1024];
2026    match algorithm {
2027        "CRC32" => {
2028            let mut hasher = crc32fast::Hasher::new();
2029            loop {
2030                let n = file.read(&mut buf).await?;
2031                if n == 0 {
2032                    break;
2033                }
2034                hasher.update(&buf[..n]);
2035            }
2036            Ok(BASE64.encode(hasher.finalize().to_be_bytes()))
2037        }
2038        "CRC32C" => {
2039            let mut crc: u32 = 0;
2040            loop {
2041                let n = file.read(&mut buf).await?;
2042                if n == 0 {
2043                    break;
2044                }
2045                crc = crc32c::crc32c_append(crc, &buf[..n]);
2046            }
2047            Ok(BASE64.encode(crc.to_be_bytes()))
2048        }
2049        "CRC64NVME" => {
2050            let mut hasher = crc64fast_nvme::Digest::new();
2051            loop {
2052                let n = file.read(&mut buf).await?;
2053                if n == 0 {
2054                    break;
2055                }
2056                hasher.write(&buf[..n]);
2057            }
2058            Ok(BASE64.encode(hasher.sum64().to_be_bytes()))
2059        }
2060        "SHA1" => {
2061            use sha1::Digest as _;
2062            let mut hasher = sha1::Sha1::new();
2063            loop {
2064                let n = file.read(&mut buf).await?;
2065                if n == 0 {
2066                    break;
2067                }
2068                hasher.update(&buf[..n]);
2069            }
2070            Ok(BASE64.encode(hasher.finalize()))
2071        }
2072        "SHA256" => {
2073            use sha2::Digest as _;
2074            let mut hasher = sha2::Sha256::new();
2075            loop {
2076                let n = file.read(&mut buf).await?;
2077                if n == 0 {
2078                    break;
2079                }
2080                hasher.update(&buf[..n]);
2081            }
2082            Ok(BASE64.encode(hasher.finalize()))
2083        }
2084        _ => Ok(String::new()),
2085    }
2086}
2087
2088pub(crate) fn url_encode_s3_key(s: &str) -> String {
2089    let mut out = String::with_capacity(s.len() * 2);
2090    for byte in s.bytes() {
2091        match byte {
2092            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' | b'/' => {
2093                out.push(byte as char);
2094            }
2095            _ => {
2096                out.push_str(&format!("%{:02X}", byte));
2097            }
2098        }
2099    }
2100    out
2101}
2102
2103pub(crate) use fakecloud_aws::xml::xml_escape;
2104
2105pub(crate) fn extract_user_metadata(
2106    headers: &HeaderMap,
2107) -> std::collections::BTreeMap<String, String> {
2108    let mut meta = std::collections::BTreeMap::new();
2109    for (name, value) in headers {
2110        if let Some(key) = name.as_str().strip_prefix("x-amz-meta-") {
2111            if let Ok(v) = value.to_str() {
2112                meta.insert(key.to_string(), v.to_string());
2113            }
2114        }
2115    }
2116    meta
2117}
2118
2119pub(crate) fn is_valid_storage_class(class: &str) -> bool {
2120    matches!(
2121        class,
2122        "STANDARD"
2123            | "REDUCED_REDUNDANCY"
2124            | "STANDARD_IA"
2125            | "ONEZONE_IA"
2126            | "INTELLIGENT_TIERING"
2127            | "GLACIER"
2128            | "DEEP_ARCHIVE"
2129            | "GLACIER_IR"
2130            | "OUTPOSTS"
2131            | "SNOW"
2132            | "EXPRESS_ONEZONE"
2133    )
2134}
2135
2136pub(crate) fn is_valid_bucket_name(name: &str) -> bool {
2137    if name.len() < 3 || name.len() > 63 {
2138        return false;
2139    }
2140    // Must start and end with alphanumeric
2141    let bytes = name.as_bytes();
2142    if !bytes[0].is_ascii_alphanumeric() || !bytes[bytes.len() - 1].is_ascii_alphanumeric() {
2143        return false;
2144    }
2145    // Only lowercase letters, digits, hyphens, dots (also allow underscores for compatibility)
2146    name.chars()
2147        .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-' || c == '.' || c == '_')
2148}
2149
2150pub(crate) fn is_valid_region(region: &str) -> bool {
2151    // Basic validation: region should match pattern like us-east-1, eu-west-2, etc.
2152    let valid_regions = [
2153        "us-east-1",
2154        "us-east-2",
2155        "us-west-1",
2156        "us-west-2",
2157        "af-south-1",
2158        "ap-east-1",
2159        "ap-south-1",
2160        "ap-south-2",
2161        "ap-southeast-1",
2162        "ap-southeast-2",
2163        "ap-southeast-3",
2164        "ap-southeast-4",
2165        "ap-northeast-1",
2166        "ap-northeast-2",
2167        "ap-northeast-3",
2168        "ca-central-1",
2169        "ca-west-1",
2170        "eu-central-1",
2171        "eu-central-2",
2172        "eu-west-1",
2173        "eu-west-2",
2174        "eu-west-3",
2175        "eu-south-1",
2176        "eu-south-2",
2177        "eu-north-1",
2178        "il-central-1",
2179        "me-south-1",
2180        "me-central-1",
2181        "sa-east-1",
2182        "cn-north-1",
2183        "cn-northwest-1",
2184        "us-gov-east-1",
2185        "us-gov-east-2",
2186        "us-gov-west-1",
2187        "us-iso-east-1",
2188        "us-iso-west-1",
2189        "us-isob-east-1",
2190        "us-isof-south-1",
2191    ];
2192    valid_regions.contains(&region)
2193}
2194
2195pub(crate) fn resolve_object<'a>(
2196    b: &'a S3Bucket,
2197    key: &str,
2198    version_id: Option<&String>,
2199) -> Result<&'a S3Object, AwsServiceError> {
2200    if let Some(vid) = version_id {
2201        // "null" version ID refers to an object with no version_id (pre-versioning)
2202        if vid == "null" {
2203            // Check versions for a pre-versioning object (version_id == None or Some("null"))
2204            if let Some(versions) = b.object_versions.get(key) {
2205                if let Some(obj) = versions
2206                    .iter()
2207                    .find(|o| o.version_id.is_none() || o.version_id.as_deref() == Some("null"))
2208                {
2209                    return Ok(obj);
2210                }
2211            }
2212            // Also check current object if it has no version_id
2213            if let Some(obj) = b.objects.get(key) {
2214                if obj.version_id.is_none() || obj.version_id.as_deref() == Some("null") {
2215                    return Ok(obj);
2216                }
2217            }
2218        } else {
2219            // When a specific versionId is requested, check versions first
2220            if let Some(versions) = b.object_versions.get(key) {
2221                if let Some(obj) = versions
2222                    .iter()
2223                    .find(|o| o.version_id.as_deref() == Some(vid.as_str()))
2224                {
2225                    return Ok(obj);
2226                }
2227            }
2228            // Also check current object
2229            if let Some(obj) = b.objects.get(key) {
2230                if obj.version_id.as_deref() == Some(vid.as_str()) {
2231                    return Ok(obj);
2232                }
2233            }
2234        }
2235        // For versioned buckets, return NoSuchVersion; for non-versioned, return 400
2236        if b.versioning.is_some() {
2237            Err(AwsServiceError::aws_error_with_fields(
2238                StatusCode::NOT_FOUND,
2239                "NoSuchVersion",
2240                "The specified version does not exist.",
2241                vec![
2242                    ("Key".to_string(), key.to_string()),
2243                    ("VersionId".to_string(), vid.to_string()),
2244                ],
2245            ))
2246        } else {
2247            Err(AwsServiceError::aws_error(
2248                StatusCode::BAD_REQUEST,
2249                "InvalidArgument",
2250                "Invalid version id specified",
2251            ))
2252        }
2253    } else {
2254        b.objects.get(key).ok_or_else(|| no_such_key(key))
2255    }
2256}
2257
2258pub(crate) fn make_delete_marker(key: &str, dm_id: &str) -> S3Object {
2259    S3Object {
2260        key: key.to_string(),
2261        last_modified: Utc::now(),
2262        storage_class: "STANDARD".to_string(),
2263        version_id: Some(dm_id.to_string()),
2264        is_delete_marker: true,
2265        ..Default::default()
2266    }
2267}
2268
2269/// Represents an object to delete in a batch delete request.
2270pub(crate) struct DeleteObjectEntry {
2271    key: String,
2272    version_id: Option<String>,
2273}
2274
2275pub(crate) fn parse_delete_objects_xml(xml: &str) -> Vec<DeleteObjectEntry> {
2276    let mut entries = Vec::new();
2277    let mut remaining = xml;
2278    while let Some(obj_start) = remaining.find("<Object>") {
2279        let after = &remaining[obj_start + 8..];
2280        if let Some(obj_end) = after.find("</Object>") {
2281            let obj_body = &after[..obj_end];
2282            let key = extract_xml_value(obj_body, "Key");
2283            let version_id = extract_xml_value(obj_body, "VersionId");
2284            if let Some(k) = key {
2285                entries.push(DeleteObjectEntry { key: k, version_id });
2286            }
2287            remaining = &after[obj_end + 9..];
2288        } else {
2289            break;
2290        }
2291    }
2292    entries
2293}
2294
2295/// Returns true when the request body's top-level <Quiet> element is "true".
2296/// AWS docs: when Quiet=true, only failed deletes are emitted.
2297pub(crate) fn parse_delete_objects_quiet(xml: &str) -> bool {
2298    extract_xml_value(xml, "Quiet")
2299        .map(|v| v.eq_ignore_ascii_case("true"))
2300        .unwrap_or(false)
2301}
2302
2303/// Minimal XML parser for `<Tagging><TagSet><Tag><Key>k</Key><Value>v</Value></Tag>...`.
2304/// Returns a Vec to preserve insertion order and detect duplicates.
2305pub(crate) fn parse_tagging_xml(xml: &str) -> Vec<(String, String)> {
2306    let mut tags = Vec::new();
2307    let mut remaining = xml;
2308    while let Some(tag_start) = remaining.find("<Tag>") {
2309        let after = &remaining[tag_start + 5..];
2310        if let Some(tag_end) = after.find("</Tag>") {
2311            let tag_body = &after[..tag_end];
2312            let key = extract_xml_value(tag_body, "Key");
2313            let value = extract_xml_value(tag_body, "Value");
2314            if let (Some(k), Some(v)) = (key, value) {
2315                tags.push((k, v));
2316            }
2317            remaining = &after[tag_end + 6..];
2318        } else {
2319            break;
2320        }
2321    }
2322    tags
2323}
2324
2325pub(crate) fn validate_tags(tags: &[(String, String)]) -> Result<(), AwsServiceError> {
2326    // Check for duplicate keys
2327    let mut seen = std::collections::HashSet::new();
2328    for (k, _) in tags {
2329        if !seen.insert(k.as_str()) {
2330            return Err(AwsServiceError::aws_error(
2331                StatusCode::BAD_REQUEST,
2332                "InvalidTag",
2333                "Cannot provide multiple Tags with the same key",
2334            ));
2335        }
2336        // Check for aws: prefix
2337        if k.starts_with("aws:") {
2338            return Err(AwsServiceError::aws_error(
2339                StatusCode::BAD_REQUEST,
2340                "InvalidTag",
2341                "System tags cannot be added/updated by requester",
2342            ));
2343        }
2344    }
2345    Ok(())
2346}
2347
2348pub(crate) fn extract_xml_value(xml: &str, tag: &str) -> Option<String> {
2349    // Handle self-closing tags like <Value /> or <Value/>
2350    let self_closing1 = format!("<{tag} />");
2351    let self_closing2 = format!("<{tag}/>");
2352    if xml.contains(&self_closing1) || xml.contains(&self_closing2) {
2353        // Check if the self-closing tag appears before any open+close pair
2354        let self_pos = xml
2355            .find(&self_closing1)
2356            .or_else(|| xml.find(&self_closing2));
2357        let open = format!("<{tag}>");
2358        let open_pos = xml.find(&open);
2359        match (self_pos, open_pos) {
2360            (Some(sp), Some(op)) if sp < op => return Some(String::new()),
2361            (Some(_), None) => return Some(String::new()),
2362            _ => {}
2363        }
2364    }
2365
2366    let open = format!("<{tag}>");
2367    let close = format!("</{tag}>");
2368    let start = xml.find(&open)? + open.len();
2369    let end = xml.find(&close)?;
2370    Some(xml[start..end].to_string())
2371}
2372
2373/// Parse the CompleteMultipartUpload XML body into (part_number, etag) pairs.
2374pub(crate) fn parse_complete_multipart_xml(xml: &str) -> Vec<(u32, String)> {
2375    let mut parts = Vec::new();
2376    let mut remaining = xml;
2377    while let Some(part_start) = remaining.find("<Part>") {
2378        let after = &remaining[part_start + 6..];
2379        if let Some(part_end) = after.find("</Part>") {
2380            let part_body = &after[..part_end];
2381            let part_num =
2382                extract_xml_value(part_body, "PartNumber").and_then(|s| s.parse::<u32>().ok());
2383            let etag = extract_xml_value(part_body, "ETag")
2384                .map(|s| s.replace("&quot;", "").replace('"', ""));
2385            if let (Some(num), Some(e)) = (part_num, etag) {
2386                parts.push((num, e));
2387            }
2388            remaining = &after[part_end + 7..];
2389        } else {
2390            break;
2391        }
2392    }
2393    parts
2394}
2395
2396pub(crate) fn parse_url_encoded_tags(s: &str) -> Vec<(String, String)> {
2397    let mut tags = Vec::new();
2398    for pair in s.split('&') {
2399        if pair.is_empty() {
2400            continue;
2401        }
2402        let (key, value) = match pair.find('=') {
2403            Some(pos) => (&pair[..pos], &pair[pos + 1..]),
2404            None => (pair, ""),
2405        };
2406        tags.push((
2407            percent_encoding::percent_decode_str(key)
2408                .decode_utf8_lossy()
2409                .to_string(),
2410            percent_encoding::percent_decode_str(value)
2411                .decode_utf8_lossy()
2412                .to_string(),
2413        ));
2414    }
2415    tags
2416}
2417
2418/// Validate lifecycle configuration XML. Returns MalformedXML on invalid configs.
2419pub(crate) fn validate_lifecycle_xml(xml: &str) -> Result<(), AwsServiceError> {
2420    let malformed = || {
2421        AwsServiceError::aws_error(
2422            StatusCode::BAD_REQUEST,
2423            "MalformedXML",
2424            "The XML you provided was not well-formed or did not validate against our published schema",
2425        )
2426    };
2427
2428    let mut remaining = xml;
2429    while let Some(rule_start) = remaining.find("<Rule>") {
2430        let after = &remaining[rule_start + 6..];
2431        if let Some(rule_end) = after.find("</Rule>") {
2432            let rule_body = &after[..rule_end];
2433
2434            // Must have Filter or Prefix
2435            let has_filter = rule_body.contains("<Filter>")
2436                || rule_body.contains("<Filter/>")
2437                || rule_body.contains("<Filter />");
2438
2439            // Check for <Prefix> at rule level (outside of <Filter>...</Filter>)
2440            let has_prefix_outside_filter = {
2441                if !rule_body.contains("<Prefix") {
2442                    false
2443                } else if !has_filter {
2444                    true // No filter means any Prefix is at rule level
2445                } else {
2446                    // Remove the Filter block and check if Prefix remains
2447                    let mut stripped = rule_body.to_string();
2448                    // Remove <Filter>...</Filter> or self-closing variants
2449                    if let Some(fs) = stripped.find("<Filter") {
2450                        if let Some(fe) = stripped.find("</Filter>") {
2451                            stripped = format!("{}{}", &stripped[..fs], &stripped[fe + 9..]);
2452                        }
2453                    }
2454                    stripped.contains("<Prefix")
2455                }
2456            };
2457
2458            if !has_filter && !has_prefix_outside_filter {
2459                return Err(malformed());
2460            }
2461            // Can't have both Filter and rule-level Prefix
2462            if has_filter && has_prefix_outside_filter {
2463                return Err(malformed());
2464            }
2465
2466            // Expiration: if has ExpiredObjectDeleteMarker, cannot also have Days or Date
2467            // (only check within <Expiration> block)
2468            if let Some(exp_start) = rule_body.find("<Expiration>") {
2469                if let Some(exp_end) = rule_body[exp_start..].find("</Expiration>") {
2470                    let exp_body = &rule_body[exp_start..exp_start + exp_end];
2471                    if exp_body.contains("<ExpiredObjectDeleteMarker>")
2472                        && (exp_body.contains("<Days>") || exp_body.contains("<Date>"))
2473                    {
2474                        return Err(malformed());
2475                    }
2476                }
2477            }
2478
2479            // Filter validation
2480            if has_filter {
2481                if let Some(fs) = rule_body.find("<Filter>") {
2482                    if let Some(fe) = rule_body.find("</Filter>") {
2483                        let filter_body = &rule_body[fs + 8..fe];
2484                        let has_prefix_in_filter = filter_body.contains("<Prefix");
2485                        let has_tag_in_filter = filter_body.contains("<Tag>");
2486                        let has_and_in_filter = filter_body.contains("<And>");
2487                        // Can't have both Prefix and Tag without And
2488                        if has_prefix_in_filter && has_tag_in_filter && !has_and_in_filter {
2489                            return Err(malformed());
2490                        }
2491                        // Can't have Tag and And simultaneously at the Filter level
2492                        if has_tag_in_filter && has_and_in_filter {
2493                            // Check if the <Tag> is outside <And>
2494                            let and_start = filter_body.find("<And>").unwrap_or(0);
2495                            let tag_pos = filter_body.find("<Tag>").unwrap_or(0);
2496                            if tag_pos < and_start {
2497                                return Err(malformed());
2498                            }
2499                        }
2500                    }
2501                }
2502            }
2503
2504            // NoncurrentVersionTransition must have NoncurrentDays and StorageClass
2505            if rule_body.contains("<NoncurrentVersionTransition>") {
2506                let mut nvt_remaining = rule_body;
2507                while let Some(nvt_start) = nvt_remaining.find("<NoncurrentVersionTransition>") {
2508                    let nvt_after = &nvt_remaining[nvt_start + 29..];
2509                    if let Some(nvt_end) = nvt_after.find("</NoncurrentVersionTransition>") {
2510                        let nvt_body = &nvt_after[..nvt_end];
2511                        if !nvt_body.contains("<NoncurrentDays>") {
2512                            return Err(malformed());
2513                        }
2514                        if !nvt_body.contains("<StorageClass>") {
2515                            return Err(malformed());
2516                        }
2517                        nvt_remaining = &nvt_after[nvt_end + 30..];
2518                    } else {
2519                        break;
2520                    }
2521                }
2522            }
2523
2524            remaining = &after[rule_end + 7..];
2525        } else {
2526            break;
2527        }
2528    }
2529
2530    Ok(())
2531}
2532
2533/// Parsed CORS rule from bucket configuration XML.
2534pub(crate) struct CorsRule {
2535    allowed_origins: Vec<String>,
2536    allowed_methods: Vec<String>,
2537    allowed_headers: Vec<String>,
2538    expose_headers: Vec<String>,
2539    max_age_seconds: Option<u32>,
2540}
2541
2542/// Parse CORS configuration XML into rules.
2543pub(crate) fn parse_cors_config(xml: &str) -> Vec<CorsRule> {
2544    let mut rules = Vec::new();
2545    let mut remaining = xml;
2546    while let Some(start) = remaining.find("<CORSRule>") {
2547        let after = &remaining[start + 10..];
2548        if let Some(end) = after.find("</CORSRule>") {
2549            let block = &after[..end];
2550            let allowed_origins = extract_all_xml_values(block, "AllowedOrigin");
2551            let allowed_methods = extract_all_xml_values(block, "AllowedMethod");
2552            let allowed_headers = extract_all_xml_values(block, "AllowedHeader");
2553            let expose_headers = extract_all_xml_values(block, "ExposeHeader");
2554            let max_age_seconds =
2555                extract_xml_value(block, "MaxAgeSeconds").and_then(|s| s.parse().ok());
2556            rules.push(CorsRule {
2557                allowed_origins,
2558                allowed_methods,
2559                allowed_headers,
2560                expose_headers,
2561                max_age_seconds,
2562            });
2563            remaining = &after[end + 11..];
2564        } else {
2565            break;
2566        }
2567    }
2568    rules
2569}
2570
2571/// Match an origin against a CORS allowed origin pattern (supports "*" wildcard).
2572pub(crate) fn origin_matches(origin: &str, pattern: &str) -> bool {
2573    if pattern == "*" {
2574        return true;
2575    }
2576    // Simple wildcard: *.example.com
2577    if let Some(suffix) = pattern.strip_prefix('*') {
2578        return origin.ends_with(suffix);
2579    }
2580    origin == pattern
2581}
2582
2583/// Find the matching CORS rule for a given origin and method.
2584pub(crate) fn find_cors_rule<'a>(
2585    rules: &'a [CorsRule],
2586    origin: &str,
2587    method: Option<&str>,
2588) -> Option<&'a CorsRule> {
2589    rules.iter().find(|rule| {
2590        let origin_ok = rule
2591            .allowed_origins
2592            .iter()
2593            .any(|o| origin_matches(origin, o));
2594        let method_ok = match method {
2595            Some(m) => rule.allowed_methods.iter().any(|am| am == m),
2596            None => true,
2597        };
2598        origin_ok && method_ok
2599    })
2600}
2601
2602/// Check if an object is locked (retention or legal hold) and should block mutation.
2603/// Returns an error string if locked, None if allowed.
2604pub(crate) fn check_object_lock_for_overwrite(
2605    obj: &S3Object,
2606    req: &AwsRequest,
2607) -> Option<&'static str> {
2608    // Legal hold blocks overwrite
2609    if obj.lock_legal_hold.as_deref() == Some("ON") {
2610        return Some("AccessDenied");
2611    }
2612    // Retention check
2613    if let (Some(mode), Some(until)) = (&obj.lock_mode, &obj.lock_retain_until) {
2614        if *until > Utc::now() {
2615            if mode == "COMPLIANCE" {
2616                return Some("AccessDenied");
2617            }
2618            if mode == "GOVERNANCE" {
2619                let bypass = req
2620                    .headers
2621                    .get("x-amz-bypass-governance-retention")
2622                    .and_then(|v| v.to_str().ok())
2623                    .map(|s| s.eq_ignore_ascii_case("true"))
2624                    .unwrap_or(false);
2625                if !bypass {
2626                    return Some("AccessDenied");
2627                }
2628            }
2629        }
2630    }
2631    None
2632}
2633
2634#[cfg(test)]
2635mod tests;