Skip to main content

fakecloud_s3/service/
mod.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use bytes::Bytes;
5use chrono::{DateTime, Timelike, Utc};
6use http::{HeaderMap, Method, StatusCode};
7use md5::{Digest, Md5};
8
9use fakecloud_aws::arn::Arn;
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
12use fakecloud_kms::SharedKmsState;
13use fakecloud_persistence::{MemoryS3Store, S3Store, StoreError};
14
15use base64::engine::general_purpose::STANDARD as BASE64;
16use base64::Engine as _;
17
18use crate::logging;
19use crate::state::{AclGrant, S3Bucket, S3Object, SharedS3State};
20
21mod access_points;
22mod acl;
23mod buckets;
24pub(crate) mod config;
25mod lock;
26mod multipart;
27mod notifications;
28mod objects;
29mod tags;
30
31// Re-export notification helpers for use in sub-modules
32#[cfg(test)]
33use notifications::replicate_object;
34pub(super) use notifications::{
35    deliver_notifications, normalize_notification_ids, normalize_replication_xml,
36    replicate_through_store,
37};
38
39// Used only within this file (parse_cors_config)
40use notifications::extract_all_xml_values;
41
42// Re-exports used only in tests
43#[cfg(test)]
44use notifications::{
45    event_matches, key_matches_filters, parse_notification_config, parse_replication_rules,
46    NotificationTargetType,
47};
48
49pub struct S3Service {
50    state: SharedS3State,
51    delivery: Arc<DeliveryBus>,
52    kms_state: Option<SharedKmsState>,
53    pub(crate) kms_hook: Option<Arc<dyn fakecloud_core::delivery::KmsHook>>,
54    store: Arc<dyn S3Store>,
55}
56
57/// Map a [`StoreError`] from the persistence layer to a 500 InternalError
58/// response. Invoked at every mutation site when the write-through persistence
59/// call fails: the in-memory mutation has already happened, but we surface the
60/// failure to the client so they know to retry (and so logs/metrics flag it).
61pub(crate) fn persistence_error(err: StoreError) -> AwsServiceError {
62    AwsServiceError::aws_error(
63        StatusCode::INTERNAL_SERVER_ERROR,
64        "InternalError",
65        format!("persistence store error: {err}"),
66    )
67}
68
69/// Convert a filesystem IO error from a disk-backed body read into an
70/// InternalError response.
71pub(crate) fn io_to_aws(err: std::io::Error) -> AwsServiceError {
72    AwsServiceError::aws_error(
73        StatusCode::INTERNAL_SERVER_ERROR,
74        "InternalError",
75        format!("failed to read object body from disk: {err}"),
76    )
77}
78
79impl S3Service {
80    pub fn new(state: SharedS3State, delivery: Arc<DeliveryBus>) -> Self {
81        Self::with_store(state, delivery, Arc::new(MemoryS3Store::new()))
82    }
83
84    pub fn with_store(
85        state: SharedS3State,
86        delivery: Arc<DeliveryBus>,
87        store: Arc<dyn S3Store>,
88    ) -> Self {
89        Self {
90            state,
91            delivery,
92            kms_state: None,
93            kms_hook: None,
94            store,
95        }
96    }
97
98    pub fn with_kms(mut self, kms_state: SharedKmsState) -> Self {
99        self.kms_state = Some(kms_state);
100        self
101    }
102
103    pub fn with_kms_hook(mut self, hook: Arc<dyn fakecloud_core::delivery::KmsHook>) -> Self {
104        self.kms_hook = Some(hook);
105        self
106    }
107
108    /// Encrypt object body bytes for SSE-KMS storage. Returns ciphertext as
109    /// raw bytes (a UTF-8 fakecloud-kms envelope) on success.
110    ///
111    /// Fail-closed: if the KMS hook reports an error (key denied, key not
112    /// found, etc.), this returns `Err` so PutObject aborts with a 500
113    /// rather than silently storing plaintext. AWS S3 has the same
114    /// behavior — `KMS.NotFoundException` and friends surface as
115    /// `AccessDenied` / `KMS.*` errors back to the caller. When no hook is
116    /// wired (legacy / in-process tests with no KMS dependency), the
117    /// plaintext is returned unchanged so existing tests keep working.
118    pub(crate) fn encrypt_object_body(
119        &self,
120        account_id: &str,
121        region: &str,
122        bucket: &str,
123        plaintext: &[u8],
124        kms_key_id: Option<&str>,
125    ) -> Result<bytes::Bytes, AwsServiceError> {
126        let Some(hook) = &self.kms_hook else {
127            return Ok(bytes::Bytes::copy_from_slice(plaintext));
128        };
129        let key = kms_key_id.filter(|k| !k.is_empty()).unwrap_or("aws/s3");
130        let bucket_arn = Arn::s3(bucket).to_string();
131        let mut ctx = std::collections::HashMap::new();
132        ctx.insert("aws:s3:arn".to_string(), bucket_arn);
133        match hook.encrypt(account_id, region, key, plaintext, "s3.amazonaws.com", ctx) {
134            Ok(envelope) => Ok(bytes::Bytes::from(envelope.into_bytes())),
135            Err(err) => {
136                tracing::warn!(bucket = %bucket, error = %err, "SSE-KMS encrypt failed");
137                Err(AwsServiceError::aws_error(
138                    StatusCode::INTERNAL_SERVER_ERROR,
139                    "KMS.InternalFailureException",
140                    format!("Failed to encrypt object via KMS: {err}"),
141                ))
142            }
143        }
144    }
145
146    /// Decrypt object body bytes that were stored as a fakecloud-kms
147    /// envelope. Caller is expected to gate this on
148    /// `obj.sse_algorithm == Some("aws:kms")`.
149    ///
150    /// Fail-closed: when a hook is wired and the bytes look like an
151    /// envelope but don't decrypt (key revoked, malformed ciphertext),
152    /// this returns `Err` so GetObject surfaces a 500. When no hook is
153    /// wired, or the bytes aren't UTF-8 (legacy snapshots from before
154    /// the hook landed), the bytes are returned unchanged.
155    pub(crate) fn decrypt_object_body(
156        &self,
157        account_id: &str,
158        bucket: &str,
159        ciphertext: &[u8],
160    ) -> Result<bytes::Bytes, AwsServiceError> {
161        let Some(hook) = &self.kms_hook else {
162            return Ok(bytes::Bytes::copy_from_slice(ciphertext));
163        };
164        // Stored envelope is base64 ASCII; non-UTF-8 bytes are pre-hook
165        // legacy snapshots, return as-is.
166        let envelope = match std::str::from_utf8(ciphertext) {
167            Ok(s) => s,
168            Err(_) => return Ok(bytes::Bytes::copy_from_slice(ciphertext)),
169        };
170        let bucket_arn = Arn::s3(bucket).to_string();
171        let mut ctx = std::collections::HashMap::new();
172        ctx.insert("aws:s3:arn".to_string(), bucket_arn);
173        match hook.decrypt(account_id, envelope, "s3.amazonaws.com", ctx) {
174            Ok(bytes) => Ok(bytes::Bytes::from(bytes)),
175            Err(err) => {
176                tracing::warn!(bucket = %bucket, error = %err, "SSE-KMS decrypt failed");
177                Err(AwsServiceError::aws_error(
178                    StatusCode::INTERNAL_SERVER_ERROR,
179                    "KMS.InternalFailureException",
180                    format!("Failed to decrypt object via KMS: {err}"),
181                ))
182            }
183        }
184    }
185}
186
187#[async_trait]
188impl AwsService for S3Service {
189    fn service_name(&self) -> &str {
190        "s3"
191    }
192
193    async fn handle(&self, mut req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
194        // PutObject / UploadPart enter dispatch via the streaming path
195        // with `body = Bytes::new()` and the raw HTTP body available on
196        // `req.body_stream`. Those handlers consume the stream directly
197        // — spooling chunks to disk while computing MD5 + size in
198        // constant memory — so a 1 GiB upload never materializes into
199        // RAM. Every *other* PUT-on-bucket-key the streaming dispatch
200        // flagged (PutObjectTagging, PutObjectAcl, PutObjectRetention,
201        // PutObjectLegalHold, CopyObject, …) reads a small XML/JSON
202        // body from `req.body`, so drain the stream for them.
203        let is_put_to_key = req.method == Method::PUT
204            && req.path_segments.len() >= 2
205            && req
206                .path_segments
207                .first()
208                .map(|s| !s.is_empty())
209                .unwrap_or(false);
210        let q = &req.query_params;
211        let is_put_object = is_put_to_key
212            && !q.contains_key("tagging")
213            && !q.contains_key("acl")
214            && !q.contains_key("retention")
215            && !q.contains_key("legal-hold")
216            && !q.contains_key("renameObject")
217            && !q.contains_key("encryption")
218            && !req.headers.contains_key("x-amz-copy-source");
219        // UploadPart requires both partNumber AND uploadId; checking
220        // only partNumber would skip body draining for stray PUTs that
221        // happened to carry a partNumber query param without being a
222        // real multipart upload.
223        let is_upload_part =
224            is_put_to_key && q.contains_key("partNumber") && q.contains_key("uploadId");
225        if !is_put_object && !is_upload_part {
226            if let Some(stream) = req.take_body_stream() {
227                req.body = fakecloud_core::service::drain_request_stream(stream).await?;
228            }
229        }
230
231        // Access point data plane: resolve alias -> bucket before routing.
232        access_points::resolve_access_point(self, &mut req)?;
233
234        let account_id = req.account_id.as_str();
235
236        // S3 Control endpoint (access point management).
237        let host = req
238            .headers
239            .get("host")
240            .and_then(|v| v.to_str().ok())
241            .unwrap_or("");
242        let is_control = host.to_ascii_lowercase().contains("s3-control");
243        if is_control {
244            let v1 = req.path_segments.first().map(|s| s.as_str());
245            let v2 = req.path_segments.get(1).map(|s| s.as_str());
246            let v3 = req.path_segments.get(2).map(|s| s.as_str());
247            if v1 == Some("v20180820") && v2 == Some("accesspoint") {
248                if let Some(name) = v3 {
249                    match req.method {
250                        Method::PUT => return self.create_access_point(account_id, &req, name),
251                        Method::GET => return self.get_access_point(account_id, &req, name),
252                        Method::DELETE => return self.delete_access_point(account_id, &req, name),
253                        _ => {}
254                    }
255                } else if req.method == Method::GET {
256                    return self.list_access_points(account_id, &req);
257                }
258            }
259        }
260
261        // S3 REST routing: method + path segments + query params
262        //
263        // Reject paths that start with `//` (an empty bucket segment). The
264        // path-segment splitter further down filters empty segments out,
265        // which would otherwise let a malformed URL like `PUT //my-key`
266        // collapse to a valid `PUT /my-key` (CreateBucket) and silently
267        // succeed. Real S3 rejects empty bucket segments with
268        // InvalidBucketName, so mirror that.
269        if req.raw_path.starts_with("//") {
270            return Err(AwsServiceError::aws_error(
271                StatusCode::BAD_REQUEST,
272                "InvalidBucketName",
273                "The specified bucket is not valid: bucket name cannot be empty",
274            ));
275        }
276        let bucket = req.path_segments.first().map(|s| s.as_str());
277        // Extract key from the raw path to preserve leading slashes and empty segments.
278        // The raw path is like "/bucket/key/parts" — we strip the bucket prefix.
279        let key = if let Some(b) = bucket {
280            let prefix = format!("/{b}/");
281            if req.raw_path.starts_with(&prefix) && req.raw_path.len() > prefix.len() {
282                let raw_key = &req.raw_path[prefix.len()..];
283                Some(
284                    percent_encoding::percent_decode_str(raw_key)
285                        .decode_utf8_lossy()
286                        .into_owned(),
287                )
288            } else if req.path_segments.len() > 1 {
289                let raw = req.path_segments[1..].join("/");
290                Some(
291                    percent_encoding::percent_decode_str(&raw)
292                        .decode_utf8_lossy()
293                        .into_owned(),
294                )
295            } else {
296                None
297            }
298        } else {
299            None
300        };
301
302        // Multipart upload operations (checked before main match)
303        if let Some(b) = bucket {
304            // POST /{bucket}/{key}?uploads — CreateMultipartUpload
305            if req.method == Method::POST
306                && key.is_some()
307                && req.query_params.contains_key("uploads")
308            {
309                return self.create_multipart_upload(account_id, &req, b, key.as_deref().unwrap());
310            }
311
312            // POST /{bucket}/{key}?restore
313            if req.method == Method::POST
314                && key.is_some()
315                && req.query_params.contains_key("restore")
316            {
317                return self.restore_object(account_id, &req, b, key.as_deref().unwrap());
318            }
319
320            // POST /{bucket}/{key}?uploadId=X — CompleteMultipartUpload
321            if req.method == Method::POST && key.is_some() {
322                if let Some(upload_id) = req.query_params.get("uploadId").cloned() {
323                    return self.complete_multipart_upload(
324                        account_id,
325                        &req,
326                        b,
327                        key.as_deref().unwrap(),
328                        &upload_id,
329                    );
330                }
331            }
332
333            // PUT /{bucket}/{key}?partNumber=N&uploadId=X — UploadPart or UploadPartCopy
334            if req.method == Method::PUT && key.is_some() {
335                if let (Some(part_num_str), Some(upload_id)) = (
336                    req.query_params.get("partNumber").cloned(),
337                    req.query_params.get("uploadId").cloned(),
338                ) {
339                    if let Ok(part_number) = part_num_str.parse::<i64>() {
340                        if req.headers.contains_key("x-amz-copy-source") {
341                            return self.upload_part_copy(
342                                account_id,
343                                &req,
344                                b,
345                                key.as_deref().unwrap(),
346                                &upload_id,
347                                part_number,
348                            );
349                        }
350                        return self
351                            .upload_part(
352                                account_id,
353                                &req,
354                                b,
355                                key.as_deref().unwrap(),
356                                &upload_id,
357                                part_number,
358                            )
359                            .await;
360                    }
361                }
362            }
363
364            // DELETE /{bucket}/{key}?uploadId=X — AbortMultipartUpload
365            if req.method == Method::DELETE && key.is_some() {
366                if let Some(upload_id) = req.query_params.get("uploadId").cloned() {
367                    return self.abort_multipart_upload(
368                        account_id,
369                        b,
370                        key.as_deref().unwrap(),
371                        &upload_id,
372                    );
373                }
374            }
375
376            // GET /{bucket}?uploads — ListMultipartUploads
377            if req.method == Method::GET
378                && key.is_none()
379                && req.query_params.contains_key("uploads")
380            {
381                return self.list_multipart_uploads(account_id, b, &req.query_params);
382            }
383
384            // GET /{bucket}/{key}?uploadId=X — ListParts
385            if req.method == Method::GET && key.is_some() {
386                if let Some(upload_id) = req.query_params.get("uploadId").cloned() {
387                    return self.list_parts(
388                        account_id,
389                        &req,
390                        b,
391                        key.as_deref().unwrap(),
392                        &upload_id,
393                    );
394                }
395            }
396        }
397
398        // Handle OPTIONS preflight requests (CORS)
399        if req.method == Method::OPTIONS {
400            if let Some(b_name) = bucket {
401                let cors_config = {
402                    let accounts = self.state.read();
403                    let _empty_s3 = crate::state::S3State::new(&req.account_id, &req.region);
404                    let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
405                    state
406                        .buckets
407                        .get(b_name)
408                        .and_then(|b| b.cors_config.clone())
409                };
410                if let Some(ref config) = cors_config {
411                    let origin = req
412                        .headers
413                        .get("origin")
414                        .and_then(|v| v.to_str().ok())
415                        .unwrap_or("");
416                    let request_method = req
417                        .headers
418                        .get("access-control-request-method")
419                        .and_then(|v| v.to_str().ok())
420                        .unwrap_or("");
421                    let rules = parse_cors_config(config);
422                    if let Some(rule) = find_cors_rule(&rules, origin, Some(request_method)) {
423                        let mut headers = HeaderMap::new();
424                        let matched_origin = if rule.allowed_origins.contains(&"*".to_string()) {
425                            "*"
426                        } else {
427                            origin
428                        };
429                        headers.insert(
430                            "access-control-allow-origin",
431                            matched_origin
432                                .parse()
433                                .unwrap_or_else(|_| http::HeaderValue::from_static("")),
434                        );
435                        headers.insert(
436                            "access-control-allow-methods",
437                            rule.allowed_methods
438                                .join(", ")
439                                .parse()
440                                .unwrap_or_else(|_| http::HeaderValue::from_static("")),
441                        );
442                        if !rule.allowed_headers.is_empty() {
443                            let ah = if rule.allowed_headers.contains(&"*".to_string()) {
444                                req.headers
445                                    .get("access-control-request-headers")
446                                    .and_then(|v| v.to_str().ok())
447                                    .unwrap_or("*")
448                                    .to_string()
449                            } else {
450                                rule.allowed_headers.join(", ")
451                            };
452                            headers.insert(
453                                "access-control-allow-headers",
454                                ah.parse()
455                                    .unwrap_or_else(|_| http::HeaderValue::from_static("")),
456                            );
457                        }
458                        if let Some(max_age) = rule.max_age_seconds {
459                            headers.insert(
460                                "access-control-max-age",
461                                max_age
462                                    .to_string()
463                                    .parse()
464                                    .unwrap_or_else(|_| http::HeaderValue::from_static("")),
465                            );
466                        }
467                        return Ok(AwsResponse {
468                            status: StatusCode::OK,
469                            content_type: String::new(),
470                            body: Bytes::new().into(),
471                            headers,
472                        });
473                    }
474                }
475                return Err(AwsServiceError::aws_error(
476                    StatusCode::FORBIDDEN,
477                    "CORSResponse",
478                    "CORS is not enabled for this bucket",
479                ));
480            }
481        }
482
483        // Capture origin for CORS response headers
484        let origin_header = req
485            .headers
486            .get("origin")
487            .and_then(|v| v.to_str().ok())
488            .map(|s| s.to_string());
489
490        // Bucket-scoped sub-resource query params. If a request targets one
491        // of these without a bucket in the path, S3 returns an error rather
492        // than silently treating it as a top-level (service) operation.
493        // Real AWS rejects e.g. `GET /?tagging` with a routing/validation
494        // error; mirroring that prevents bucket-required ops from being
495        // accidentally answered by ListBuckets / WriteGetObjectResponse.
496        let bucket_subresources: &[&str] = &[
497            "tagging",
498            "acl",
499            "versioning",
500            "cors",
501            "notification",
502            "website",
503            "accelerate",
504            "publicAccessBlock",
505            "encryption",
506            "lifecycle",
507            "logging",
508            "policy",
509            "policyStatus",
510            "replication",
511            "ownershipControls",
512            "inventory",
513            "analytics",
514            "intelligent-tiering",
515            "metrics",
516            "requestPayment",
517            "abac",
518            "metadataConfiguration",
519            "metadataTable",
520            "metadataInventoryTable",
521            "metadataJournalTable",
522            "object-lock",
523            "versions",
524            "session",
525            "retention",
526            "legal-hold",
527            "attributes",
528            "torrent",
529            "renameObject",
530            "uploads",
531            "restore",
532            "select",
533            // Bucket-required ops the earlier list missed: their query
534            // markers (e.g. `?location` for GetBucketLocation,
535            // `?delete` for DeleteObjects, `?list-type=2` for
536            // ListObjectsV2) signal a bucket op so a request with the
537            // marker but no bucket segment is malformed, not a service-
538            // level call.
539            "location",
540            "delete",
541            "list-type",
542        ];
543        if bucket.is_none()
544            && bucket_subresources
545                .iter()
546                .any(|q| req.query_params.contains_key(*q))
547        {
548            return Err(AwsServiceError::aws_error(
549                StatusCode::BAD_REQUEST,
550                "InvalidBucketName",
551                "The specified bucket is not valid: bucket name is required for this operation",
552            ));
553        }
554
555        let mut result = match (&req.method, bucket, key.as_deref()) {
556            // ListBuckets: GET /
557            (&Method::GET, None, None) => {
558                if req.query_params.get("x-id").map(|s| s.as_str()) == Some("ListDirectoryBuckets")
559                {
560                    self.list_directory_buckets(account_id, &req)
561                } else {
562                    self.list_buckets(account_id, &req)
563                }
564            }
565
566            // Bucket-level operations (no key)
567            (&Method::PUT, Some(b), None) => {
568                if req.query_params.contains_key("tagging") {
569                    self.put_bucket_tagging(account_id, &req, b)
570                } else if req.query_params.contains_key("acl") {
571                    self.put_bucket_acl(account_id, &req, b)
572                } else if req.query_params.contains_key("versioning") {
573                    self.put_bucket_versioning(account_id, &req, b)
574                } else if req.query_params.contains_key("cors") {
575                    self.put_bucket_cors(account_id, &req, b)
576                } else if req.query_params.contains_key("notification") {
577                    self.put_bucket_notification(account_id, &req, b)
578                } else if req.query_params.contains_key("website") {
579                    self.put_bucket_website(account_id, &req, b)
580                } else if req.query_params.contains_key("accelerate") {
581                    self.put_bucket_accelerate(account_id, &req, b)
582                } else if req.query_params.contains_key("publicAccessBlock") {
583                    self.put_public_access_block(account_id, &req, b)
584                } else if req.query_params.contains_key("encryption") {
585                    self.put_bucket_encryption(account_id, &req, b)
586                } else if req.query_params.contains_key("lifecycle") {
587                    self.put_bucket_lifecycle(account_id, &req, b)
588                } else if req.query_params.contains_key("logging") {
589                    self.put_bucket_logging(account_id, &req, b)
590                } else if req.query_params.contains_key("policy") {
591                    self.put_bucket_policy(account_id, &req, b)
592                } else if req.query_params.contains_key("object-lock") {
593                    self.put_object_lock_config(account_id, &req, b)
594                } else if req.query_params.contains_key("replication") {
595                    self.put_bucket_replication(account_id, &req, b)
596                } else if req.query_params.contains_key("ownershipControls") {
597                    self.put_bucket_ownership_controls(account_id, &req, b)
598                } else if req.query_params.contains_key("inventory") {
599                    self.put_bucket_inventory(account_id, &req, b)
600                } else if req.query_params.contains_key("analytics") {
601                    self.put_bucket_analytics_config(account_id, &req, b)
602                } else if req.query_params.contains_key("intelligent-tiering") {
603                    self.put_bucket_intelligent_tiering_config(account_id, &req, b)
604                } else if req.query_params.contains_key("metrics") {
605                    self.put_bucket_metrics_config(account_id, &req, b)
606                } else if req.query_params.contains_key("requestPayment") {
607                    self.put_bucket_request_payment(account_id, &req, b)
608                } else if req.query_params.contains_key("abac") {
609                    self.put_bucket_abac(account_id, &req, b)
610                } else if req.query_params.contains_key("metadataInventoryTable") {
611                    self.update_bucket_metadata_inventory_table(account_id, &req, b)
612                } else if req.query_params.contains_key("metadataJournalTable") {
613                    self.update_bucket_metadata_journal_table(account_id, &req, b)
614                } else {
615                    self.create_bucket(account_id, &req, b)
616                }
617            }
618            (&Method::DELETE, Some(b), None) => {
619                if req.query_params.contains_key("tagging") {
620                    self.delete_bucket_tagging(account_id, &req, b)
621                } else if req.query_params.contains_key("cors") {
622                    self.delete_bucket_cors(account_id, b)
623                } else if req.query_params.contains_key("website") {
624                    self.delete_bucket_website(account_id, b)
625                } else if req.query_params.contains_key("publicAccessBlock") {
626                    self.delete_public_access_block(account_id, b)
627                } else if req.query_params.contains_key("encryption") {
628                    self.delete_bucket_encryption(account_id, b)
629                } else if req.query_params.contains_key("lifecycle") {
630                    self.delete_bucket_lifecycle(account_id, b)
631                } else if req.query_params.contains_key("policy") {
632                    self.delete_bucket_policy(account_id, b)
633                } else if req.query_params.contains_key("replication") {
634                    self.delete_bucket_replication(account_id, b)
635                } else if req.query_params.contains_key("ownershipControls") {
636                    self.delete_bucket_ownership_controls(account_id, b)
637                } else if req.query_params.contains_key("inventory") {
638                    self.delete_bucket_inventory(account_id, &req, b)
639                } else if req.query_params.contains_key("analytics") {
640                    self.delete_bucket_analytics_config(account_id, &req, b)
641                } else if req.query_params.contains_key("intelligent-tiering") {
642                    self.delete_bucket_intelligent_tiering_config(account_id, &req, b)
643                } else if req.query_params.contains_key("metrics") {
644                    self.delete_bucket_metrics_config(account_id, &req, b)
645                } else if req.query_params.contains_key("metadataConfiguration") {
646                    self.delete_bucket_metadata_config(account_id, b)
647                } else if req.query_params.contains_key("metadataTable") {
648                    self.delete_bucket_metadata_table_config(account_id, b)
649                } else {
650                    self.delete_bucket(account_id, &req, b)
651                }
652            }
653            (&Method::HEAD, Some(b), None) => self.head_bucket(account_id, b),
654            (&Method::GET, Some(b), None) => {
655                if req.query_params.contains_key("tagging") {
656                    self.get_bucket_tagging(account_id, &req, b)
657                } else if req.query_params.contains_key("location") {
658                    self.get_bucket_location(account_id, b)
659                } else if req.query_params.contains_key("acl") {
660                    self.get_bucket_acl(account_id, &req, b)
661                } else if req.query_params.contains_key("versioning") {
662                    self.get_bucket_versioning(account_id, b)
663                } else if req.query_params.contains_key("versions") {
664                    self.list_object_versions(account_id, &req, b)
665                } else if req.query_params.contains_key("object-lock") {
666                    self.get_object_lock_configuration(account_id, b)
667                } else if req.query_params.contains_key("cors") {
668                    self.get_bucket_cors(account_id, b)
669                } else if req.query_params.contains_key("notification") {
670                    self.get_bucket_notification(account_id, b)
671                } else if req.query_params.contains_key("website") {
672                    self.get_bucket_website(account_id, b)
673                } else if req.query_params.contains_key("accelerate") {
674                    self.get_bucket_accelerate(account_id, b)
675                } else if req.query_params.contains_key("publicAccessBlock") {
676                    self.get_public_access_block(account_id, b)
677                } else if req.query_params.contains_key("encryption") {
678                    self.get_bucket_encryption(account_id, b)
679                } else if req.query_params.contains_key("lifecycle") {
680                    self.get_bucket_lifecycle(account_id, b)
681                } else if req.query_params.contains_key("logging") {
682                    self.get_bucket_logging(account_id, b)
683                } else if req.query_params.contains_key("policy") {
684                    self.get_bucket_policy(account_id, b)
685                } else if req.query_params.contains_key("replication") {
686                    self.get_bucket_replication(account_id, b)
687                } else if req.query_params.contains_key("ownershipControls") {
688                    self.get_bucket_ownership_controls(account_id, b)
689                } else if req.query_params.contains_key("inventory") {
690                    if req.query_params.contains_key("id") {
691                        self.get_bucket_inventory(account_id, &req, b)
692                    } else {
693                        self.list_bucket_inventory_configurations(account_id, b)
694                    }
695                } else if req.query_params.contains_key("analytics") {
696                    if req.query_params.contains_key("id") {
697                        self.get_bucket_analytics_config(account_id, &req, b)
698                    } else {
699                        self.list_bucket_analytics_configurations(account_id, b)
700                    }
701                } else if req.query_params.contains_key("intelligent-tiering") {
702                    if req.query_params.contains_key("id") {
703                        self.get_bucket_intelligent_tiering_config(account_id, &req, b)
704                    } else {
705                        self.list_bucket_intelligent_tiering_configurations(account_id, b)
706                    }
707                } else if req.query_params.contains_key("metrics") {
708                    if req.query_params.contains_key("id") {
709                        self.get_bucket_metrics_config(account_id, &req, b)
710                    } else {
711                        self.list_bucket_metrics_configurations(account_id, b)
712                    }
713                } else if req.query_params.contains_key("requestPayment") {
714                    self.get_bucket_request_payment(account_id, b)
715                } else if req.query_params.contains_key("abac") {
716                    self.get_bucket_abac(account_id, b)
717                } else if req.query_params.contains_key("policyStatus") {
718                    self.get_bucket_policy_status(account_id, b)
719                } else if req.query_params.contains_key("metadataConfiguration") {
720                    self.get_bucket_metadata_config(account_id, b)
721                } else if req.query_params.contains_key("metadataTable") {
722                    self.get_bucket_metadata_table_config(account_id, b)
723                } else if req.query_params.contains_key("session") {
724                    self.create_session(account_id, &req, b)
725                } else if req.query_params.get("list-type").map(|s| s.as_str()) == Some("2") {
726                    self.list_objects_v2(account_id, &req, b)
727                } else if req.query_params.is_empty() {
728                    // If bucket has website config and no query params, serve index document
729                    let website_config = {
730                        let accounts = self.state.read();
731                        let _empty_s3 = crate::state::S3State::new(&req.account_id, &req.region);
732                        let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
733                        state
734                            .buckets
735                            .get(b)
736                            .and_then(|bkt| bkt.website_config.clone())
737                    };
738                    if let Some(ref config) = website_config {
739                        if let Some(index_doc) = extract_xml_value(config, "Suffix").or_else(|| {
740                            extract_xml_value(config, "IndexDocument").and_then(|inner| {
741                                let open = "<Suffix>";
742                                let close = "</Suffix>";
743                                let s = inner.find(open)? + open.len();
744                                let e = inner.find(close)?;
745                                Some(inner[s..e].trim().to_string())
746                            })
747                        }) {
748                            self.serve_website_object(account_id, &req, b, &index_doc, config)
749                        } else {
750                            self.list_objects_v1(account_id, &req, b)
751                        }
752                    } else {
753                        self.list_objects_v1(account_id, &req, b)
754                    }
755                } else {
756                    self.list_objects_v1(account_id, &req, b)
757                }
758            }
759
760            // Object-level operations
761            (&Method::PUT, Some(b), Some(k)) => {
762                if req.query_params.contains_key("tagging") {
763                    self.put_object_tagging(account_id, &req, b, k)
764                } else if req.query_params.contains_key("acl") {
765                    self.put_object_acl(account_id, &req, b, k)
766                } else if req.query_params.contains_key("retention") {
767                    self.put_object_retention(account_id, &req, b, k)
768                } else if req.query_params.contains_key("legal-hold") {
769                    self.put_object_legal_hold(account_id, &req, b, k)
770                } else if req.query_params.contains_key("renameObject") {
771                    self.rename_object(account_id, &req, b, k)
772                } else if req.query_params.contains_key("encryption") {
773                    self.update_object_encryption(account_id, &req, b, k)
774                } else if req.headers.contains_key("x-amz-copy-source") {
775                    self.copy_object(account_id, &req, b, k)
776                } else {
777                    self.put_object(account_id, &req, b, k).await
778                }
779            }
780            (&Method::GET, Some(b), Some(k)) => {
781                if req.query_params.contains_key("tagging") {
782                    self.get_object_tagging(account_id, &req, b, k)
783                } else if req.query_params.contains_key("acl") {
784                    self.get_object_acl(account_id, &req, b, k)
785                } else if req.query_params.contains_key("retention") {
786                    self.get_object_retention(account_id, &req, b, k)
787                } else if req.query_params.contains_key("legal-hold") {
788                    self.get_object_legal_hold(account_id, &req, b, k)
789                } else if req.query_params.contains_key("attributes") {
790                    self.get_object_attributes(account_id, &req, b, k)
791                } else if req.query_params.contains_key("torrent") {
792                    self.get_object_torrent(account_id, &req, b, k)
793                } else {
794                    let result = self.get_object(account_id, &req, b, k);
795                    // If object not found and bucket has website config, serve error document
796                    let is_not_found = matches!(
797                        &result,
798                        Err(e) if e.code() == "NoSuchKey"
799                    );
800                    if is_not_found {
801                        let website_config = {
802                            let accounts = self.state.read();
803                            let _empty_s3 =
804                                crate::state::S3State::new(&req.account_id, &req.region);
805                            let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
806                            state
807                                .buckets
808                                .get(b)
809                                .and_then(|bkt| bkt.website_config.clone())
810                        };
811                        if let Some(ref config) = website_config {
812                            if let Some(error_key) = extract_xml_value(config, "ErrorDocument")
813                                .and_then(|inner| {
814                                    let open = "<Key>";
815                                    let close = "</Key>";
816                                    let s = inner.find(open)? + open.len();
817                                    let e = inner.find(close)?;
818                                    Some(inner[s..e].trim().to_string())
819                                })
820                                .or_else(|| extract_xml_value(config, "Key"))
821                            {
822                                return self.serve_website_error(account_id, &req, b, &error_key);
823                            }
824                        }
825                    }
826                    result
827                }
828            }
829            (&Method::DELETE, Some(b), Some(k)) => {
830                if req.query_params.contains_key("tagging") {
831                    self.delete_object_tagging(account_id, b, k)
832                } else {
833                    self.delete_object(account_id, &req, b, k)
834                }
835            }
836            (&Method::HEAD, Some(b), Some(k)) => self.head_object(account_id, &req, b, k),
837
838            // POST /{bucket}?delete — batch delete
839            (&Method::POST, Some(b), None) if req.query_params.contains_key("delete") => {
840                self.delete_objects(account_id, &req, b)
841            }
842            (&Method::POST, Some(b), None)
843                if req.query_params.contains_key("metadataConfiguration") =>
844            {
845                self.create_bucket_metadata_config(account_id, &req, b)
846            }
847            (&Method::POST, Some(b), None) if req.query_params.contains_key("metadataTable") => {
848                self.create_bucket_metadata_table_config(account_id, &req, b)
849            }
850            (&Method::POST, Some(b), Some(k))
851                if req.query_params.get("select-type").map(|s| s.as_str()) == Some("2") =>
852            {
853                self.select_object_content(account_id, &req, b, k)
854            }
855            (&Method::POST, Some("WriteGetObjectResponse"), None) => {
856                self.write_get_object_response(account_id, &req)
857            }
858
859            _ => Err(AwsServiceError::aws_error(
860                StatusCode::METHOD_NOT_ALLOWED,
861                "MethodNotAllowed",
862                "The specified method is not allowed against this resource",
863            )),
864        };
865
866        // Apply CORS headers to the response if Origin was present
867        if let (Some(ref origin), Some(b_name)) = (&origin_header, bucket) {
868            let cors_config = {
869                let accounts = self.state.read();
870                let _empty_s3 = crate::state::S3State::new(&req.account_id, &req.region);
871                let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
872                state
873                    .buckets
874                    .get(b_name)
875                    .and_then(|b| b.cors_config.clone())
876            };
877            if let Some(ref config) = cors_config {
878                let rules = parse_cors_config(config);
879                if let Some(rule) = find_cors_rule(&rules, origin, None) {
880                    if let Ok(ref mut resp) = result {
881                        let matched_origin = if rule.allowed_origins.contains(&"*".to_string()) {
882                            "*"
883                        } else {
884                            origin
885                        };
886                        resp.headers.insert(
887                            "access-control-allow-origin",
888                            matched_origin
889                                .parse()
890                                .unwrap_or_else(|_| http::HeaderValue::from_static("")),
891                        );
892                        if !rule.expose_headers.is_empty() {
893                            resp.headers.insert(
894                                "access-control-expose-headers",
895                                rule.expose_headers
896                                    .join(", ")
897                                    .parse()
898                                    .unwrap_or_else(|_| http::HeaderValue::from_static("")),
899                            );
900                        }
901                    }
902                }
903            }
904        }
905
906        // Write S3 access log entry if the source bucket has logging enabled
907        if let Some(b_name) = bucket {
908            let status_code = match &result {
909                Ok(resp) => resp.status.as_u16(),
910                Err(e) => e.status().as_u16(),
911            };
912            let op = logging::operation_name(&req.method, key.as_deref());
913            logging::maybe_write_access_log(
914                &self.state,
915                &self.store,
916                b_name,
917                &logging::AccessLogRequest {
918                    operation: op,
919                    key: key.as_deref(),
920                    status: status_code,
921                    request_id: &req.request_id,
922                    method: req.method.as_str(),
923                    path: &req.raw_path,
924                },
925            );
926        }
927
928        result
929    }
930
931    fn supported_actions(&self) -> &[&str] {
932        &[
933            // Buckets
934            "ListBuckets",
935            "CreateBucket",
936            "DeleteBucket",
937            "HeadBucket",
938            "GetBucketLocation",
939            // Objects
940            "PutObject",
941            "GetObject",
942            "DeleteObject",
943            "HeadObject",
944            "CopyObject",
945            "DeleteObjects",
946            "ListObjectsV2",
947            "ListObjects",
948            "ListObjectVersions",
949            "GetObjectAttributes",
950            "RestoreObject",
951            // Object properties
952            "PutObjectTagging",
953            "GetObjectTagging",
954            "DeleteObjectTagging",
955            "PutObjectAcl",
956            "GetObjectAcl",
957            "PutObjectRetention",
958            "GetObjectRetention",
959            "PutObjectLegalHold",
960            "GetObjectLegalHold",
961            // Bucket configuration
962            "PutBucketTagging",
963            "GetBucketTagging",
964            "DeleteBucketTagging",
965            "PutBucketAcl",
966            "GetBucketAcl",
967            "PutBucketVersioning",
968            "GetBucketVersioning",
969            "PutBucketCors",
970            "GetBucketCors",
971            "DeleteBucketCors",
972            "PutBucketNotificationConfiguration",
973            "GetBucketNotificationConfiguration",
974            "PutBucketWebsite",
975            "GetBucketWebsite",
976            "DeleteBucketWebsite",
977            "PutBucketAccelerateConfiguration",
978            "GetBucketAccelerateConfiguration",
979            "PutPublicAccessBlock",
980            "GetPublicAccessBlock",
981            "DeletePublicAccessBlock",
982            "PutBucketEncryption",
983            "GetBucketEncryption",
984            "DeleteBucketEncryption",
985            "PutBucketLifecycleConfiguration",
986            "GetBucketLifecycleConfiguration",
987            "DeleteBucketLifecycle",
988            "PutBucketLogging",
989            "GetBucketLogging",
990            "PutBucketPolicy",
991            "GetBucketPolicy",
992            "DeleteBucketPolicy",
993            "PutObjectLockConfiguration",
994            "GetObjectLockConfiguration",
995            "PutBucketReplication",
996            "GetBucketReplication",
997            "DeleteBucketReplication",
998            "PutBucketOwnershipControls",
999            "GetBucketOwnershipControls",
1000            "DeleteBucketOwnershipControls",
1001            "PutBucketInventoryConfiguration",
1002            "GetBucketInventoryConfiguration",
1003            "DeleteBucketInventoryConfiguration",
1004            "ListBucketInventoryConfigurations",
1005            "PutBucketAnalyticsConfiguration",
1006            "GetBucketAnalyticsConfiguration",
1007            "DeleteBucketAnalyticsConfiguration",
1008            "ListBucketAnalyticsConfigurations",
1009            "PutBucketIntelligentTieringConfiguration",
1010            "GetBucketIntelligentTieringConfiguration",
1011            "DeleteBucketIntelligentTieringConfiguration",
1012            "ListBucketIntelligentTieringConfigurations",
1013            "PutBucketMetricsConfiguration",
1014            "GetBucketMetricsConfiguration",
1015            "DeleteBucketMetricsConfiguration",
1016            "ListBucketMetricsConfigurations",
1017            "PutBucketRequestPayment",
1018            "GetBucketRequestPayment",
1019            "PutBucketAbac",
1020            "GetBucketAbac",
1021            "GetBucketPolicyStatus",
1022            "CreateBucketMetadataConfiguration",
1023            "GetBucketMetadataConfiguration",
1024            "DeleteBucketMetadataConfiguration",
1025            "CreateBucketMetadataTableConfiguration",
1026            "GetBucketMetadataTableConfiguration",
1027            "DeleteBucketMetadataTableConfiguration",
1028            "UpdateBucketMetadataInventoryTableConfiguration",
1029            "UpdateBucketMetadataJournalTableConfiguration",
1030            "GetObjectTorrent",
1031            "RenameObject",
1032            "SelectObjectContent",
1033            "UpdateObjectEncryption",
1034            "WriteGetObjectResponse",
1035            "ListDirectoryBuckets",
1036            "CreateSession",
1037            // Multipart uploads
1038            "CreateMultipartUpload",
1039            "UploadPart",
1040            "UploadPartCopy",
1041            "CompleteMultipartUpload",
1042            "AbortMultipartUpload",
1043            "ListParts",
1044            "ListMultipartUploads",
1045        ]
1046    }
1047
1048    fn iam_enforceable(&self) -> bool {
1049        true
1050    }
1051
1052    /// S3 resources are either:
1053    /// - Bucket ARN (`arn:aws:s3:::bucket`) for bucket-level actions
1054    /// - Object ARN (`arn:aws:s3:::bucket/key`) for object-level actions
1055    /// - Wildcard (`*`) for `ListBuckets` which doesn't target a specific
1056    ///   resource
1057    ///
1058    /// S3 ARNs notably omit the account id and region — this is the one
1059    /// AWS service that carries neither in its ARN, because bucket names
1060    /// are globally unique.
1061    fn iam_action_for(&self, request: &AwsRequest) -> Option<fakecloud_core::auth::IamAction> {
1062        // S3 doesn't set `request.action` — the handler dispatches on
1063        // method + path + query params directly. Re-derive the action
1064        // name here so enforcement can match against IAM policies the
1065        // same way the real service would.
1066        let bucket = request.path_segments.first().map(|s| s.as_str());
1067        let key = if request.path_segments.len() > 1 {
1068            Some(request.path_segments[1..].join("/"))
1069        } else {
1070            None
1071        };
1072        let action = s3_detect_action(
1073            request.method.as_str(),
1074            bucket,
1075            key.as_deref(),
1076            &request.query_params,
1077        )?;
1078        // A handful of S3-adjacent ops are authorized under sibling IAM
1079        // service prefixes, not `s3:`. Select the right prefix so a policy
1080        // targeting the real action string actually matches.
1081        let service = match action {
1082            "CreateSession" | "ListAllMyDirectoryBuckets" => "s3express",
1083            "WriteGetObjectResponse" => "s3-object-lambda",
1084            _ => "s3",
1085        };
1086        let resource = s3_resource_for(action, bucket, key.as_deref());
1087        Some(fakecloud_core::auth::IamAction {
1088            service,
1089            action,
1090            resource,
1091        })
1092    }
1093
1094    fn iam_condition_keys_for(
1095        &self,
1096        request: &AwsRequest,
1097        action: &fakecloud_core::auth::IamAction,
1098    ) -> std::collections::BTreeMap<String, Vec<String>> {
1099        s3_condition_keys(action.action, &request.query_params)
1100    }
1101
1102    fn resource_tags_for(
1103        &self,
1104        resource_arn: &str,
1105    ) -> Option<std::collections::HashMap<String, String>> {
1106        s3_resource_tags(&self.state, resource_arn)
1107            .map(|m| m.into_iter().collect::<std::collections::HashMap<_, _>>())
1108    }
1109
1110    fn request_tags_from(
1111        &self,
1112        request: &AwsRequest,
1113        action: &str,
1114    ) -> Option<std::collections::HashMap<String, String>> {
1115        s3_request_tags(request, action)
1116            .map(|m| m.into_iter().collect::<std::collections::HashMap<_, _>>())
1117    }
1118}
1119
1120/// Extract service-specific IAM condition keys from an S3 request.
1121///
1122/// Today only `ListObjects` / `ListObjectsV2` expose keys (`s3:prefix`,
1123/// `s3:delimiter`, `s3:max-keys`) via their query params. Other actions
1124/// return an empty map so the evaluator's safe-fail semantics treat any
1125/// policy condition referencing an unknown key as "doesn't apply".
1126fn s3_condition_keys(
1127    action: &str,
1128    query: &std::collections::HashMap<String, String>,
1129) -> std::collections::BTreeMap<String, Vec<String>> {
1130    let mut out = std::collections::BTreeMap::new();
1131    if matches!(action, "ListObjects" | "ListObjectsV2") {
1132        // Both list variants share the same query param shape.
1133        if let Some(prefix) = query.get("prefix") {
1134            out.insert("s3:prefix".to_string(), vec![prefix.clone()]);
1135        }
1136        if let Some(delimiter) = query.get("delimiter") {
1137            out.insert("s3:delimiter".to_string(), vec![delimiter.clone()]);
1138        }
1139        if let Some(max_keys) = query.get("max-keys") {
1140            out.insert("s3:max-keys".to_string(), vec![max_keys.clone()]);
1141        }
1142    }
1143    out
1144}
1145
1146/// Look up resource tags for an S3 ARN.
1147///
1148/// Bucket-level ARN (`arn:aws:s3:::bucket`) -> bucket tags.
1149/// Object-level ARN (`arn:aws:s3:::bucket/key`) -> object tags.
1150/// `*` (ListBuckets) -> `Some(empty)` (no resource to tag).
1151fn s3_resource_tags(
1152    state: &SharedS3State,
1153    resource_arn: &str,
1154) -> Option<std::collections::BTreeMap<String, String>> {
1155    if resource_arn == "*" {
1156        return Some(std::collections::BTreeMap::new());
1157    }
1158    // S3 ARNs: arn:aws:s3:::bucket or arn:aws:s3:::bucket/key
1159    let after_prefix = resource_arn.strip_prefix("arn:aws:s3:::")?;
1160    let mas = state.read();
1161    // S3 bucket names are globally unique; scan all accounts to find the bucket
1162    let bucket_name = after_prefix.split('/').next().unwrap_or(after_prefix);
1163    let state = mas
1164        .find_account(|s| s.buckets.contains_key(bucket_name))
1165        .and_then(|id| mas.get(id))
1166        .or_else(|| Some(mas.default_ref()))?;
1167    if let Some(slash_pos) = after_prefix.find('/') {
1168        // Object-level: bucket/key
1169        let bucket_name = &after_prefix[..slash_pos];
1170        let key = &after_prefix[slash_pos + 1..];
1171        let bucket = state.buckets.get(bucket_name)?;
1172        // Try current objects first, then versioned objects (latest version)
1173        if let Some(obj) = bucket.objects.get(key) {
1174            Some(obj.tags.clone())
1175        } else if let Some(versions) = bucket.object_versions.get(key) {
1176            versions.last().map(|v| v.tags.clone())
1177        } else {
1178            // Object doesn't exist yet (e.g. PutObject creating it)
1179            Some(std::collections::BTreeMap::new())
1180        }
1181    } else {
1182        // Bucket-level
1183        let bucket = state.buckets.get(after_prefix)?;
1184        Some(bucket.tags.clone())
1185    }
1186}
1187
1188/// Extract tags from an S3 request body/headers.
1189///
1190/// S3 sends tags via:
1191/// - `x-amz-tagging` header (URL-encoded `key=value&...`) on PutObject
1192/// - XML body on PutBucketTagging / PutObjectTagging
1193fn s3_request_tags(
1194    request: &AwsRequest,
1195    action: &str,
1196) -> Option<std::collections::BTreeMap<String, String>> {
1197    match action {
1198        "PutObject" | "CopyObject" | "CreateMultipartUpload" => {
1199            // Tags come via x-amz-tagging header
1200            if let Some(tagging) = request.headers.get("x-amz-tagging") {
1201                let tags = parse_url_encoded_tags(tagging.to_str().unwrap_or(""));
1202                Some(tags.into_iter().collect())
1203            } else {
1204                Some(std::collections::BTreeMap::new())
1205            }
1206        }
1207        "PutBucketTagging" | "PutObjectTagging" => {
1208            // Tags come in XML body
1209            let body = std::str::from_utf8(&request.body).unwrap_or("");
1210            let tags = parse_tagging_xml(body);
1211            Some(tags.into_iter().collect())
1212        }
1213        _ => Some(std::collections::BTreeMap::new()),
1214    }
1215}
1216
1217/// Derive the IAM action name from an S3 REST request. Handles the
1218/// common cases (GetObject, PutObject, DeleteObject, ListObjectsV2,
1219/// CreateBucket, ...) plus a subset of sub-resource operations
1220/// (`?acl`, `?tagging`, `?versioning`, `?policy`, `?cors`, `?website`,
1221/// `?lifecycle`, `?encryption`, `?logging`, `?notification`, `?replication`,
1222/// `?ownershipControls`, `?publicAccessBlock`, `?accelerate`, `?inventory`,
1223/// `?object-lock`, `?uploads`, `?uploadId`).
1224///
1225/// Returns `None` for requests that don't map to a known action — the
1226/// dispatch layer then skips enforcement for that request rather than
1227/// guessing (and a warn log fires via the "service is iam_enforceable
1228/// but has no mapping" branch in dispatch.rs).
1229fn s3_detect_action(
1230    method: &str,
1231    bucket: Option<&str>,
1232    key: Option<&str>,
1233    query: &std::collections::HashMap<String, String>,
1234) -> Option<&'static str> {
1235    let has = |q: &str| query.contains_key(q);
1236    let is_get = method == "GET";
1237    let is_put = method == "PUT";
1238    let is_post = method == "POST";
1239    let is_delete = method == "DELETE";
1240
1241    // Service root
1242    if bucket.is_none() {
1243        return match method {
1244            // A directory-bucket listing (`GET /?x-id=ListDirectoryBuckets`) is
1245            // authorized under `s3express:ListAllMyDirectoryBuckets`, NOT
1246            // `s3:ListBuckets`. Detecting it as ListBuckets let an
1247            // `s3:ListBuckets` grant wrongly enumerate directory buckets and
1248            // denied a policy that only granted the s3express action (bug-hunt
1249            // 2026-06-24, 5.5).
1250            "GET" if query.get("x-id").map(|s| s.as_str()) == Some("ListDirectoryBuckets") => {
1251                Some("ListAllMyDirectoryBuckets")
1252            }
1253            "GET" => Some("ListBuckets"),
1254            _ => None,
1255        };
1256    }
1257
1258    // WriteGetObjectResponse is an Object Lambda data-plane op routed as
1259    // `POST /WriteGetObjectResponse` (the literal value occupies the bucket
1260    // segment). AWS authorizes it under the separate
1261    // `s3-object-lambda:WriteGetObjectResponse` action — handled in
1262    // `iam_action_for`, which selects the `s3-object-lambda` service prefix
1263    // for this op. Previously unmapped -> enforcement skipped.
1264    if is_post && bucket == Some("WriteGetObjectResponse") && key.is_none() {
1265        return Some("WriteGetObjectResponse");
1266    }
1267
1268    let has_key = key.is_some();
1269
1270    // Multipart sub-resource forms
1271    if has_key && is_post && has("uploads") {
1272        return Some("CreateMultipartUpload");
1273    }
1274    if has_key && is_post && has("uploadId") {
1275        return Some("CompleteMultipartUpload");
1276    }
1277    if has_key && is_put && has("partNumber") && has("uploadId") {
1278        return Some("UploadPart");
1279    }
1280    if has_key && is_delete && has("uploadId") {
1281        return Some("AbortMultipartUpload");
1282    }
1283    if has_key && is_get && has("uploadId") {
1284        return Some("ListParts");
1285    }
1286    if !has_key && is_get && has("uploads") {
1287        return Some("ListMultipartUploads");
1288    }
1289
1290    // Sub-resource-keyed actions (?acl, ?tagging, ...). Order matters
1291    // since a request can carry multiple; we pick the most specific.
1292    // Object-level sub-resources come first (key present).
1293    if has_key {
1294        if has("tagging") {
1295            return Some(match method {
1296                "GET" => "GetObjectTagging",
1297                "PUT" => "PutObjectTagging",
1298                "DELETE" => "DeleteObjectTagging",
1299                _ => return None,
1300            });
1301        }
1302        if has("acl") {
1303            return Some(match method {
1304                "GET" => "GetObjectAcl",
1305                "PUT" => "PutObjectAcl",
1306                _ => return None,
1307            });
1308        }
1309        if has("retention") {
1310            return Some(match method {
1311                "GET" => "GetObjectRetention",
1312                "PUT" => "PutObjectRetention",
1313                _ => return None,
1314            });
1315        }
1316        if has("legal-hold") {
1317            return Some(match method {
1318                "GET" => "GetObjectLegalHold",
1319                "PUT" => "PutObjectLegalHold",
1320                _ => return None,
1321            });
1322        }
1323        // Identified by cubic on PR #399: both ?attributes and ?restore
1324        // need method guards — otherwise e.g. GET /bucket/key?restore
1325        // would be classified as RestoreObject (POST-only in AWS) and
1326        // IAM-evaluated against s3:RestoreObject instead of s3:GetObject.
1327        if has("attributes") && is_get {
1328            return Some("GetObjectAttributes");
1329        }
1330        if has("restore") && is_post {
1331            return Some("RestoreObject");
1332        }
1333        // RenameObject is an object-level op (`PUT /bucket/newkey?renameObject`
1334        // + `x-amz-rename-source`). It carries a key, so it must be detected
1335        // here — the `!has_key` arm below never sees it. Without this it fell
1336        // through to `PutObject`, so a policy denying `s3:RenameObject` was
1337        // silently ineffective.
1338        if has("renameObject") && is_put {
1339            return Some("RenameObject");
1340        }
1341        // UpdateObjectEncryption (`PUT /bucket/key?encryption`) changes an
1342        // existing object's server-side encryption. AWS gates this under
1343        // `s3:PutObject` (encryption is a write attribute), and there is no
1344        // distinct public `s3:UpdateObjectEncryption` IAM action — so map to
1345        // PutObject deliberately rather than letting it fall through.
1346        if has("encryption") && is_put {
1347            return Some("PutObject");
1348        }
1349        // SelectObjectContent (`POST /bucket/key?select&select-type=2`) reads
1350        // object data, so AWS authorizes it with `s3:GetObject`. Previously
1351        // unmapped -> `_ => None` -> enforcement skipped entirely.
1352        if has("select-type") && is_post {
1353            return Some("GetObject");
1354        }
1355    }
1356
1357    // Bucket-level sub-resources (key absent).
1358    if !has_key {
1359        if has("tagging") {
1360            return Some(match method {
1361                "GET" => "GetBucketTagging",
1362                "PUT" => "PutBucketTagging",
1363                "DELETE" => "DeleteBucketTagging",
1364                _ => return None,
1365            });
1366        }
1367        if has("acl") {
1368            return Some(match method {
1369                "GET" => "GetBucketAcl",
1370                "PUT" => "PutBucketAcl",
1371                _ => return None,
1372            });
1373        }
1374        if has("versioning") {
1375            return Some(match method {
1376                "GET" => "GetBucketVersioning",
1377                "PUT" => "PutBucketVersioning",
1378                _ => return None,
1379            });
1380        }
1381        if has("cors") {
1382            return Some(match method {
1383                "GET" => "GetBucketCors",
1384                "PUT" => "PutBucketCors",
1385                "DELETE" => "DeleteBucketCors",
1386                _ => return None,
1387            });
1388        }
1389        if has("policy") {
1390            return Some(match method {
1391                "GET" => "GetBucketPolicy",
1392                "PUT" => "PutBucketPolicy",
1393                "DELETE" => "DeleteBucketPolicy",
1394                _ => return None,
1395            });
1396        }
1397        if has("website") {
1398            return Some(match method {
1399                "GET" => "GetBucketWebsite",
1400                "PUT" => "PutBucketWebsite",
1401                "DELETE" => "DeleteBucketWebsite",
1402                _ => return None,
1403            });
1404        }
1405        if has("lifecycle") {
1406            return Some(match method {
1407                "GET" => "GetBucketLifecycleConfiguration",
1408                "PUT" => "PutBucketLifecycleConfiguration",
1409                "DELETE" => "DeleteBucketLifecycle",
1410                _ => return None,
1411            });
1412        }
1413        if has("encryption") {
1414            return Some(match method {
1415                "GET" => "GetBucketEncryption",
1416                "PUT" => "PutBucketEncryption",
1417                "DELETE" => "DeleteBucketEncryption",
1418                _ => return None,
1419            });
1420        }
1421        if has("logging") {
1422            return Some(match method {
1423                "GET" => "GetBucketLogging",
1424                "PUT" => "PutBucketLogging",
1425                _ => return None,
1426            });
1427        }
1428        if has("notification") {
1429            return Some(match method {
1430                "GET" => "GetBucketNotificationConfiguration",
1431                "PUT" => "PutBucketNotificationConfiguration",
1432                _ => return None,
1433            });
1434        }
1435        if has("replication") {
1436            return Some(match method {
1437                "GET" => "GetBucketReplication",
1438                "PUT" => "PutBucketReplication",
1439                "DELETE" => "DeleteBucketReplication",
1440                _ => return None,
1441            });
1442        }
1443        if has("ownershipControls") {
1444            return Some(match method {
1445                "GET" => "GetBucketOwnershipControls",
1446                "PUT" => "PutBucketOwnershipControls",
1447                "DELETE" => "DeleteBucketOwnershipControls",
1448                _ => return None,
1449            });
1450        }
1451        if has("publicAccessBlock") {
1452            return Some(match method {
1453                "GET" => "GetPublicAccessBlock",
1454                "PUT" => "PutPublicAccessBlock",
1455                "DELETE" => "DeletePublicAccessBlock",
1456                _ => return None,
1457            });
1458        }
1459        if has("accelerate") {
1460            return Some(match method {
1461                "GET" => "GetBucketAccelerateConfiguration",
1462                "PUT" => "PutBucketAccelerateConfiguration",
1463                _ => return None,
1464            });
1465        }
1466        if has("inventory") {
1467            return Some(match method {
1468                "GET" => "GetBucketInventoryConfiguration",
1469                "PUT" => "PutBucketInventoryConfiguration",
1470                "DELETE" => "DeleteBucketInventoryConfiguration",
1471                _ => return None,
1472            });
1473        }
1474        if has("analytics") {
1475            return Some(match method {
1476                "GET" if has("id") => "GetBucketAnalyticsConfiguration",
1477                "GET" => "ListBucketAnalyticsConfigurations",
1478                "PUT" => "PutBucketAnalyticsConfiguration",
1479                "DELETE" => "DeleteBucketAnalyticsConfiguration",
1480                _ => return None,
1481            });
1482        }
1483        if has("intelligent-tiering") {
1484            return Some(match method {
1485                "GET" if has("id") => "GetBucketIntelligentTieringConfiguration",
1486                "GET" => "ListBucketIntelligentTieringConfigurations",
1487                "PUT" => "PutBucketIntelligentTieringConfiguration",
1488                "DELETE" => "DeleteBucketIntelligentTieringConfiguration",
1489                _ => return None,
1490            });
1491        }
1492        if has("metrics") {
1493            return Some(match method {
1494                "GET" if has("id") => "GetBucketMetricsConfiguration",
1495                "GET" => "ListBucketMetricsConfigurations",
1496                "PUT" => "PutBucketMetricsConfiguration",
1497                "DELETE" => "DeleteBucketMetricsConfiguration",
1498                _ => return None,
1499            });
1500        }
1501        if has("requestPayment") {
1502            return Some(match method {
1503                "GET" => "GetBucketRequestPayment",
1504                "PUT" => "PutBucketRequestPayment",
1505                _ => return None,
1506            });
1507        }
1508        if has("policyStatus") && is_get {
1509            return Some("GetBucketPolicyStatus");
1510        }
1511        if has("metadataConfiguration") {
1512            return Some(match method {
1513                "GET" => "GetBucketMetadataConfiguration",
1514                "POST" => "CreateBucketMetadataConfiguration",
1515                "DELETE" => "DeleteBucketMetadataConfiguration",
1516                _ => return None,
1517            });
1518        }
1519        if has("metadataTable") {
1520            return Some(match method {
1521                "GET" => "GetBucketMetadataTableConfiguration",
1522                "POST" => "CreateBucketMetadataTableConfiguration",
1523                "DELETE" => "DeleteBucketMetadataTableConfiguration",
1524                _ => return None,
1525            });
1526        }
1527        if has("metadataInventoryTable") && is_put {
1528            return Some("UpdateBucketMetadataInventoryTableConfiguration");
1529        }
1530        if has("metadataJournalTable") && is_put {
1531            return Some("UpdateBucketMetadataJournalTableConfiguration");
1532        }
1533        if has("abac") {
1534            // PUT  -> PutBucketAbacConfiguration
1535            // GET  -> GetBucketAbacConfiguration (previously fell through to
1536            //         the plain `GET bucket` arm and was IAM-evaluated as
1537            //         s3:ListObjects — a policy on the ABAC config was never
1538            //         honored).
1539            return Some(match method {
1540                "PUT" => "PutBucketAbacConfiguration",
1541                "GET" => "GetBucketAbacConfiguration",
1542                _ => return None,
1543            });
1544        }
1545        if has("session") && is_get {
1546            // CreateSession (S3 Express directory buckets). AWS authorizes it
1547            // under the separate `s3express:CreateSession` action — handled in
1548            // `iam_action_for`, which selects the `s3express` service prefix
1549            // for this op. Previously fell through to the plain `GET bucket`
1550            // arm and was evaluated as s3:ListObjects.
1551            return Some("CreateSession");
1552        }
1553        if has("object-lock") {
1554            return Some(match method {
1555                "GET" => "GetObjectLockConfiguration",
1556                "PUT" => "PutObjectLockConfiguration",
1557                _ => return None,
1558            });
1559        }
1560        if has("location") {
1561            return Some("GetBucketLocation");
1562        }
1563        if is_post && has("delete") {
1564            return Some("DeleteObjects");
1565        }
1566        if is_get && has("versions") {
1567            return Some("ListObjectVersions");
1568        }
1569    }
1570
1571    // GET on an object with `?torrent` is GetObjectTorrent, authorized under
1572    // s3:GetObjectTorrent -- not s3:GetObject. Without this arm a policy that
1573    // allows GetObject but not GetObjectTorrent would wrongly permit it
1574    // (bug-audit 2026-06-20, 5.3).
1575    if is_get && has_key && has("torrent") {
1576        return Some("GetObjectTorrent");
1577    }
1578
1579    // Plain bucket/object methods.
1580    match (method, has_key) {
1581        ("GET", true) => Some("GetObject"),
1582        ("PUT", true) => {
1583            // CopyObject uses x-amz-copy-source but we don't have headers
1584            // handy here — treat both PutObject and CopyObject as PutObject
1585            // for IAM purposes; CopyObject additionally requires
1586            // s3:GetObject on the source but that's evaluated per-request
1587            // by real AWS, not on the PUT call itself.
1588            Some("PutObject")
1589        }
1590        ("DELETE", true) => Some("DeleteObject"),
1591        ("HEAD", true) => Some("HeadObject"),
1592        ("GET", false) => {
1593            if query.contains_key("list-type") {
1594                Some("ListObjectsV2")
1595            } else {
1596                Some("ListObjects")
1597            }
1598        }
1599        ("PUT", false) => Some("CreateBucket"),
1600        ("DELETE", false) => Some("DeleteBucket"),
1601        ("HEAD", false) => Some("HeadBucket"),
1602        _ => None,
1603    }
1604}
1605
1606/// Build the S3 resource ARN for an action. Returns `*` for
1607/// `ListBuckets` (account-scoped), a bucket ARN for bucket-level
1608/// configuration actions, or an object ARN for object-level actions.
1609fn s3_resource_for(action: &'static str, bucket: Option<&str>, key: Option<&str>) -> String {
1610    // Object-level actions work on `bucket/key`.
1611    const OBJECT_ACTIONS: &[&str] = &[
1612        "PutObject",
1613        "GetObject",
1614        "DeleteObject",
1615        "HeadObject",
1616        "CopyObject",
1617        "GetObjectAttributes",
1618        "RestoreObject",
1619        "PutObjectTagging",
1620        "GetObjectTagging",
1621        "DeleteObjectTagging",
1622        "PutObjectAcl",
1623        "GetObjectAcl",
1624        "PutObjectRetention",
1625        "GetObjectRetention",
1626        "PutObjectLegalHold",
1627        "GetObjectLegalHold",
1628        "CreateMultipartUpload",
1629        "UploadPart",
1630        "UploadPartCopy",
1631        "CompleteMultipartUpload",
1632        "AbortMultipartUpload",
1633        "ListParts",
1634    ];
1635    if action == "ListBuckets" {
1636        return "*".to_string();
1637    }
1638    let Some(bucket) = bucket else {
1639        return "*".to_string();
1640    };
1641    if OBJECT_ACTIONS.contains(&action) {
1642        match key {
1643            Some(k) if !k.is_empty() => Arn::s3(&format!("{bucket}/{k}")).to_string(),
1644            _ => Arn::s3(&format!("{bucket}/*")).to_string(),
1645        }
1646    } else {
1647        // Bucket-level actions (ListObjectsV2, GetBucketTagging, ...).
1648        Arn::s3(bucket).to_string()
1649    }
1650}
1651
1652// Conditional request helpers
1653
1654/// Truncate a DateTime to second-level precision (HTTP dates have no sub-second info).
1655pub(crate) fn truncate_to_seconds(dt: DateTime<Utc>) -> DateTime<Utc> {
1656    dt.with_nanosecond(0).unwrap_or(dt)
1657}
1658
1659pub(crate) fn check_get_conditionals(
1660    req: &AwsRequest,
1661    obj: &S3Object,
1662) -> Result<(), AwsServiceError> {
1663    let obj_etag = format!("\"{}\"", obj.etag);
1664    let obj_time = truncate_to_seconds(obj.last_modified);
1665
1666    // If-Match
1667    if let Some(if_match) = req.headers.get("if-match").and_then(|v| v.to_str().ok()) {
1668        if !etag_matches(if_match, &obj_etag) {
1669            return Err(precondition_failed("If-Match"));
1670        }
1671    }
1672
1673    // If-None-Match
1674    if let Some(if_none_match) = req
1675        .headers
1676        .get("if-none-match")
1677        .and_then(|v| v.to_str().ok())
1678    {
1679        if etag_matches(if_none_match, &obj_etag) {
1680            return Err(not_modified_with_etag(&obj_etag));
1681        }
1682    }
1683
1684    // If-Unmodified-Since
1685    if let Some(since) = req
1686        .headers
1687        .get("if-unmodified-since")
1688        .and_then(|v| v.to_str().ok())
1689    {
1690        if let Some(dt) = parse_http_date(since) {
1691            if obj_time > dt {
1692                return Err(precondition_failed("If-Unmodified-Since"));
1693            }
1694        }
1695    }
1696
1697    // If-Modified-Since
1698    if let Some(since) = req
1699        .headers
1700        .get("if-modified-since")
1701        .and_then(|v| v.to_str().ok())
1702    {
1703        if let Some(dt) = parse_http_date(since) {
1704            if obj_time <= dt {
1705                return Err(not_modified());
1706            }
1707        }
1708    }
1709
1710    Ok(())
1711}
1712
1713pub(crate) fn check_head_conditionals(
1714    req: &AwsRequest,
1715    obj: &S3Object,
1716) -> Result<(), AwsServiceError> {
1717    let obj_etag = format!("\"{}\"", obj.etag);
1718    let obj_time = truncate_to_seconds(obj.last_modified);
1719
1720    // If-Match
1721    if let Some(if_match) = req.headers.get("if-match").and_then(|v| v.to_str().ok()) {
1722        if !etag_matches(if_match, &obj_etag) {
1723            return Err(AwsServiceError::aws_error(
1724                StatusCode::PRECONDITION_FAILED,
1725                "412",
1726                "Precondition Failed",
1727            ));
1728        }
1729    }
1730
1731    // If-None-Match
1732    if let Some(if_none_match) = req
1733        .headers
1734        .get("if-none-match")
1735        .and_then(|v| v.to_str().ok())
1736    {
1737        if etag_matches(if_none_match, &obj_etag) {
1738            return Err(not_modified_with_etag(&obj_etag));
1739        }
1740    }
1741
1742    // If-Unmodified-Since
1743    if let Some(since) = req
1744        .headers
1745        .get("if-unmodified-since")
1746        .and_then(|v| v.to_str().ok())
1747    {
1748        if let Some(dt) = parse_http_date(since) {
1749            if obj_time > dt {
1750                return Err(AwsServiceError::aws_error(
1751                    StatusCode::PRECONDITION_FAILED,
1752                    "412",
1753                    "Precondition Failed",
1754                ));
1755            }
1756        }
1757    }
1758
1759    // If-Modified-Since
1760    if let Some(since) = req
1761        .headers
1762        .get("if-modified-since")
1763        .and_then(|v| v.to_str().ok())
1764    {
1765        if let Some(dt) = parse_http_date(since) {
1766            if obj_time <= dt {
1767                return Err(not_modified());
1768            }
1769        }
1770    }
1771
1772    Ok(())
1773}
1774
1775pub(crate) fn etag_matches(condition: &str, obj_etag: &str) -> bool {
1776    let condition = condition.trim();
1777    if condition == "*" {
1778        return true;
1779    }
1780    let clean_etag = obj_etag.replace('"', "");
1781    // Split on comma to handle multi-value If-Match / If-None-Match
1782    for part in condition.split(',') {
1783        let part = part.trim().replace('"', "");
1784        if part == clean_etag {
1785            return true;
1786        }
1787    }
1788    false
1789}
1790
1791pub(crate) fn parse_http_date(s: &str) -> Option<DateTime<Utc>> {
1792    // Try RFC 2822 format: "Sat, 01 Jan 2000 00:00:00 GMT"
1793    if let Ok(dt) = DateTime::parse_from_rfc2822(s) {
1794        return Some(dt.with_timezone(&Utc));
1795    }
1796    // Try RFC 3339
1797    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1798        return Some(dt.with_timezone(&Utc));
1799    }
1800    // Try common HTTP date format: "%a, %d %b %Y %H:%M:%S GMT"
1801    if let Ok(dt) =
1802        chrono::NaiveDateTime::parse_from_str(s.trim_end_matches(" GMT"), "%a, %d %b %Y %H:%M:%S")
1803    {
1804        return Some(dt.and_utc());
1805    }
1806    // Try ISO 8601
1807    if let Ok(dt) = s.parse::<DateTime<Utc>>() {
1808        return Some(dt);
1809    }
1810    None
1811}
1812
1813pub(crate) fn not_modified() -> AwsServiceError {
1814    AwsServiceError::aws_error(StatusCode::NOT_MODIFIED, "304", "Not Modified")
1815}
1816
1817pub(crate) fn not_modified_with_etag(etag: &str) -> AwsServiceError {
1818    AwsServiceError::aws_error_with_headers(
1819        StatusCode::NOT_MODIFIED,
1820        "304",
1821        "Not Modified",
1822        vec![("etag".to_string(), etag.to_string())],
1823    )
1824}
1825
1826pub(crate) fn precondition_failed(condition: &str) -> AwsServiceError {
1827    AwsServiceError::aws_error_with_fields(
1828        StatusCode::PRECONDITION_FAILED,
1829        "PreconditionFailed",
1830        "At least one of the pre-conditions you specified did not hold",
1831        vec![("Condition".to_string(), condition.to_string())],
1832    )
1833}
1834
1835// ACL helpers
1836
1837pub(crate) fn build_acl_xml(owner_id: &str, grants: &[AclGrant], _account_id: &str) -> String {
1838    let mut grants_xml = String::new();
1839    for g in grants {
1840        let grantee_xml = if g.grantee_type == "Group" {
1841            let uri = g.grantee_uri.as_deref().unwrap_or("");
1842            format!(
1843                "<Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:type=\"Group\">\
1844                 <URI>{}</URI></Grantee>",
1845                xml_escape(uri),
1846            )
1847        } else {
1848            let id = g.grantee_id.as_deref().unwrap_or("");
1849            format!(
1850                "<Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:type=\"CanonicalUser\">\
1851                 <ID>{}</ID></Grantee>",
1852                xml_escape(id),
1853            )
1854        };
1855        grants_xml.push_str(&format!(
1856            "<Grant>{grantee_xml}<Permission>{}</Permission></Grant>",
1857            xml_escape(&g.permission),
1858        ));
1859    }
1860
1861    format!(
1862        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
1863         <AccessControlPolicy xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
1864         <Owner><ID>{owner_id}</ID><DisplayName>{owner_id}</DisplayName></Owner>\
1865         <AccessControlList>{grants_xml}</AccessControlList>\
1866         </AccessControlPolicy>",
1867        owner_id = xml_escape(owner_id),
1868    )
1869}
1870
1871pub(crate) fn canned_acl_grants(acl: &str, owner_id: &str) -> Vec<AclGrant> {
1872    let owner_grant = AclGrant {
1873        grantee_type: "CanonicalUser".to_string(),
1874        grantee_id: Some(owner_id.to_string()),
1875        grantee_display_name: Some(owner_id.to_string()),
1876        grantee_uri: None,
1877        permission: "FULL_CONTROL".to_string(),
1878    };
1879    match acl {
1880        "private" => vec![owner_grant],
1881        "public-read" => vec![
1882            owner_grant,
1883            AclGrant {
1884                grantee_type: "Group".to_string(),
1885                grantee_id: None,
1886                grantee_display_name: None,
1887                grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
1888                permission: "READ".to_string(),
1889            },
1890        ],
1891        "public-read-write" => vec![
1892            owner_grant,
1893            AclGrant {
1894                grantee_type: "Group".to_string(),
1895                grantee_id: None,
1896                grantee_display_name: None,
1897                grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
1898                permission: "READ".to_string(),
1899            },
1900            AclGrant {
1901                grantee_type: "Group".to_string(),
1902                grantee_id: None,
1903                grantee_display_name: None,
1904                grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
1905                permission: "WRITE".to_string(),
1906            },
1907        ],
1908        "authenticated-read" => vec![
1909            owner_grant,
1910            AclGrant {
1911                grantee_type: "Group".to_string(),
1912                grantee_id: None,
1913                grantee_display_name: None,
1914                grantee_uri: Some(
1915                    "http://acs.amazonaws.com/groups/global/AuthenticatedUsers".to_string(),
1916                ),
1917                permission: "READ".to_string(),
1918            },
1919        ],
1920        "bucket-owner-full-control" => vec![owner_grant],
1921        _ => vec![owner_grant],
1922    }
1923}
1924
1925pub(crate) fn canned_acl_grants_for_object(acl: &str, owner_id: &str) -> Vec<AclGrant> {
1926    // For objects, canned ACLs work the same way
1927    canned_acl_grants(acl, owner_id)
1928}
1929
1930pub(crate) fn parse_grant_headers(headers: &HeaderMap) -> Vec<AclGrant> {
1931    let mut grants = Vec::new();
1932    let header_permission_map = [
1933        ("x-amz-grant-read", "READ"),
1934        ("x-amz-grant-write", "WRITE"),
1935        ("x-amz-grant-read-acp", "READ_ACP"),
1936        ("x-amz-grant-write-acp", "WRITE_ACP"),
1937        ("x-amz-grant-full-control", "FULL_CONTROL"),
1938    ];
1939
1940    for (header, permission) in &header_permission_map {
1941        if let Some(value) = headers.get(*header).and_then(|v| v.to_str().ok()) {
1942            // Parse "id=xxx" or "uri=xxx" or "emailAddress=xxx"
1943            for part in value.split(',') {
1944                let part = part.trim();
1945                if let Some((key, val)) = part.split_once('=') {
1946                    let val = val.trim().trim_matches('"');
1947                    let key = key.trim().to_lowercase();
1948                    match key.as_str() {
1949                        "id" => {
1950                            grants.push(AclGrant {
1951                                grantee_type: "CanonicalUser".to_string(),
1952                                grantee_id: Some(val.to_string()),
1953                                grantee_display_name: Some(val.to_string()),
1954                                grantee_uri: None,
1955                                permission: permission.to_string(),
1956                            });
1957                        }
1958                        "uri" | "url" => {
1959                            grants.push(AclGrant {
1960                                grantee_type: "Group".to_string(),
1961                                grantee_id: None,
1962                                grantee_display_name: None,
1963                                grantee_uri: Some(val.to_string()),
1964                                permission: permission.to_string(),
1965                            });
1966                        }
1967                        _ => {}
1968                    }
1969                }
1970            }
1971        }
1972    }
1973    grants
1974}
1975
1976pub(crate) fn parse_acl_xml(xml: &str) -> Result<Vec<AclGrant>, AwsServiceError> {
1977    // Check for Owner presence
1978    if xml.contains("<AccessControlPolicy") && !xml.contains("<Owner>") {
1979        return Err(AwsServiceError::aws_error(
1980            StatusCode::BAD_REQUEST,
1981            "MalformedACLError",
1982            "The XML you provided was not well-formed or did not validate against our published schema",
1983        ));
1984    }
1985
1986    let valid_permissions = ["READ", "WRITE", "READ_ACP", "WRITE_ACP", "FULL_CONTROL"];
1987
1988    let mut grants = Vec::new();
1989    let mut remaining = xml;
1990    while let Some(start) = remaining.find("<Grant>") {
1991        let after = &remaining[start + 7..];
1992        if let Some(end) = after.find("</Grant>") {
1993            let grant_body = &after[..end];
1994
1995            // Extract permission
1996            let permission = extract_xml_value(grant_body, "Permission").unwrap_or_default();
1997            if !valid_permissions.contains(&permission.as_str()) {
1998                return Err(AwsServiceError::aws_error(
1999                    StatusCode::BAD_REQUEST,
2000                    "MalformedACLError",
2001                    "The XML you provided was not well-formed or did not validate against our published schema",
2002                ));
2003            }
2004
2005            // Determine grantee type
2006            if grant_body.contains("xsi:type=\"Group\"") || grant_body.contains("<URI>") {
2007                let uri = extract_xml_value(grant_body, "URI").unwrap_or_default();
2008                grants.push(AclGrant {
2009                    grantee_type: "Group".to_string(),
2010                    grantee_id: None,
2011                    grantee_display_name: None,
2012                    grantee_uri: Some(uri),
2013                    permission,
2014                });
2015            } else {
2016                let id = extract_xml_value(grant_body, "ID").unwrap_or_default();
2017                let display =
2018                    extract_xml_value(grant_body, "DisplayName").unwrap_or_else(|| id.clone());
2019                grants.push(AclGrant {
2020                    grantee_type: "CanonicalUser".to_string(),
2021                    grantee_id: Some(id),
2022                    grantee_display_name: Some(display),
2023                    grantee_uri: None,
2024                    permission,
2025                });
2026            }
2027
2028            remaining = &after[end + 8..];
2029        } else {
2030            break;
2031        }
2032    }
2033    Ok(grants)
2034}
2035
2036// Range helpers
2037
2038pub(crate) enum RangeResult {
2039    Satisfiable { start: usize, end: usize },
2040    NotSatisfiable,
2041    Ignored,
2042}
2043
2044pub(crate) fn parse_range_header(range_str: &str, total_size: usize) -> Option<RangeResult> {
2045    let range_str = range_str.strip_prefix("bytes=")?;
2046    let (start_str, end_str) = range_str.split_once('-')?;
2047    if start_str.is_empty() {
2048        let suffix_len: usize = end_str.parse().ok()?;
2049        if suffix_len == 0 || total_size == 0 {
2050            return Some(RangeResult::NotSatisfiable);
2051        }
2052        let start = total_size.saturating_sub(suffix_len);
2053        Some(RangeResult::Satisfiable {
2054            start,
2055            end: total_size - 1,
2056        })
2057    } else {
2058        let start: usize = start_str.parse().ok()?;
2059        if start >= total_size {
2060            return Some(RangeResult::NotSatisfiable);
2061        }
2062        let end = if end_str.is_empty() {
2063            total_size - 1
2064        } else {
2065            let e: usize = end_str.parse().ok()?;
2066            if e < start {
2067                return Some(RangeResult::Ignored);
2068            }
2069            std::cmp::min(e, total_size - 1)
2070        };
2071        Some(RangeResult::Satisfiable { start, end })
2072    }
2073}
2074
2075// Helpers
2076
2077/// S3 XML response with `application/xml` content type (unlike Query protocol's `text/xml`).
2078pub(crate) fn s3_xml(status: StatusCode, body: impl Into<Bytes>) -> AwsResponse {
2079    AwsResponse {
2080        status,
2081        content_type: "application/xml".to_string(),
2082        body: body.into().into(),
2083        headers: HeaderMap::new(),
2084    }
2085}
2086
2087pub(crate) fn empty_response(status: StatusCode) -> AwsResponse {
2088    AwsResponse {
2089        status,
2090        content_type: "application/xml".to_string(),
2091        body: Bytes::new().into(),
2092        headers: HeaderMap::new(),
2093    }
2094}
2095
2096/// Returns true when the object is stored in a "cold" storage class (GLACIER, DEEP_ARCHIVE)
2097/// and has NOT been restored (or restore is still in progress).
2098pub(crate) fn is_frozen(obj: &S3Object) -> bool {
2099    matches!(obj.storage_class.as_str(), "GLACIER" | "DEEP_ARCHIVE")
2100        && obj.restore_ongoing != Some(false)
2101}
2102
2103pub(crate) fn no_such_bucket(bucket: &str) -> AwsServiceError {
2104    AwsServiceError::aws_error_with_fields(
2105        StatusCode::NOT_FOUND,
2106        "NoSuchBucket",
2107        "The specified bucket does not exist",
2108        vec![("BucketName".to_string(), bucket.to_string())],
2109    )
2110}
2111
2112pub(crate) fn no_such_key(key: &str) -> AwsServiceError {
2113    AwsServiceError::aws_error_with_fields(
2114        StatusCode::NOT_FOUND,
2115        "NoSuchKey",
2116        "The specified key does not exist.",
2117        vec![("Key".to_string(), key.to_string())],
2118    )
2119}
2120
2121pub(crate) fn no_such_upload(upload_id: &str) -> AwsServiceError {
2122    AwsServiceError::aws_error_with_fields(
2123        StatusCode::NOT_FOUND,
2124        "NoSuchUpload",
2125        "The specified upload does not exist. The upload ID may be invalid, \
2126         or the upload may have been aborted or completed.",
2127        vec![("UploadId".to_string(), upload_id.to_string())],
2128    )
2129}
2130
2131pub(crate) fn no_such_key_with_detail(key: &str) -> AwsServiceError {
2132    AwsServiceError::aws_error_with_fields(
2133        StatusCode::NOT_FOUND,
2134        "NoSuchKey",
2135        "The specified key does not exist.",
2136        vec![("Key".to_string(), key.to_string())],
2137    )
2138}
2139
2140pub(crate) fn compute_md5(data: &[u8]) -> String {
2141    let digest = Md5::digest(data);
2142    format!("{:x}", digest)
2143}
2144
2145pub(crate) fn compute_checksum(algorithm: &str, data: &[u8]) -> String {
2146    let raw = compute_checksum_raw(algorithm, data);
2147    if raw.is_empty() {
2148        String::new()
2149    } else {
2150        BASE64.encode(raw)
2151    }
2152}
2153
2154/// Compute the raw (un-base64'd) checksum digest bytes for `data`.
2155///
2156/// Returns the network-order digest bytes so callers can either base64
2157/// them ([`compute_checksum`]) or concatenate them when computing an S3
2158/// multipart COMPOSITE checksum (which hashes the concatenation of each
2159/// part's raw digest, then base64s the result and appends `-N`).
2160pub(crate) fn compute_checksum_raw(algorithm: &str, data: &[u8]) -> Vec<u8> {
2161    match algorithm {
2162        "CRC32" => crc32fast::hash(data).to_be_bytes().to_vec(),
2163        // CRC32C and CRC64NVME use the same crates as the streaming path so
2164        // CopyObject / CompleteMultipartUpload produce identical results to
2165        // a streaming PutObject (1.7); the COMPOSITE multipart checksum
2166        // concatenates these raw digests before hashing (1.18).
2167        "CRC32C" => crc32c::crc32c(data).to_be_bytes().to_vec(),
2168        "CRC64NVME" => {
2169            let mut hasher = crc64fast_nvme::Digest::new();
2170            hasher.write(data);
2171            hasher.sum64().to_be_bytes().to_vec()
2172        }
2173        "SHA1" => {
2174            use sha1::Digest as _;
2175            sha1::Sha1::digest(data).to_vec()
2176        }
2177        "SHA256" => {
2178            use sha2::Digest as _;
2179            sha2::Sha256::digest(data).to_vec()
2180        }
2181        _ => Vec::new(),
2182    }
2183}
2184
2185/// Compute an S3 multipart COMPOSITE additional checksum from each
2186/// part's raw digest bytes (in part-number order). AWS defines this as
2187/// `base64(HASH(concat(raw_part_digests)))-N` where HASH is the chosen
2188/// algorithm and N is the part count. Returns `None` for an unsupported
2189/// algorithm or empty part list.
2190pub(crate) fn compute_composite_checksum(
2191    algorithm: &str,
2192    part_raw_digests: &[Vec<u8>],
2193) -> Option<String> {
2194    if part_raw_digests.is_empty() {
2195        return None;
2196    }
2197    let mut concat = Vec::new();
2198    for d in part_raw_digests {
2199        concat.extend_from_slice(d);
2200    }
2201    let digest = compute_checksum_raw(algorithm, &concat);
2202    if digest.is_empty() {
2203        return None;
2204    }
2205    Some(format!(
2206        "{}-{}",
2207        BASE64.encode(digest),
2208        part_raw_digests.len()
2209    ))
2210}
2211
2212/// Streaming variant of [`compute_checksum`] for spool files. Reads
2213/// the file in 1 MiB chunks and feeds each chunk into the hasher so a
2214/// 1 GiB upload computes its CRC32 / SHA-1 / SHA-256 in constant
2215/// memory rather than via `tokio::fs::read` (which would allocate the
2216/// whole file as one buffer).
2217pub(crate) async fn compute_checksum_streaming(
2218    algorithm: &str,
2219    path: &std::path::Path,
2220) -> Result<String, std::io::Error> {
2221    use tokio::io::AsyncReadExt;
2222    let mut file = tokio::fs::File::open(path).await?;
2223    let mut buf = vec![0u8; 1024 * 1024];
2224    match algorithm {
2225        "CRC32" => {
2226            let mut hasher = crc32fast::Hasher::new();
2227            loop {
2228                let n = file.read(&mut buf).await?;
2229                if n == 0 {
2230                    break;
2231                }
2232                hasher.update(&buf[..n]);
2233            }
2234            Ok(BASE64.encode(hasher.finalize().to_be_bytes()))
2235        }
2236        "CRC32C" => {
2237            let mut crc: u32 = 0;
2238            loop {
2239                let n = file.read(&mut buf).await?;
2240                if n == 0 {
2241                    break;
2242                }
2243                crc = crc32c::crc32c_append(crc, &buf[..n]);
2244            }
2245            Ok(BASE64.encode(crc.to_be_bytes()))
2246        }
2247        "CRC64NVME" => {
2248            let mut hasher = crc64fast_nvme::Digest::new();
2249            loop {
2250                let n = file.read(&mut buf).await?;
2251                if n == 0 {
2252                    break;
2253                }
2254                hasher.write(&buf[..n]);
2255            }
2256            Ok(BASE64.encode(hasher.sum64().to_be_bytes()))
2257        }
2258        "SHA1" => {
2259            use sha1::Digest as _;
2260            let mut hasher = sha1::Sha1::new();
2261            loop {
2262                let n = file.read(&mut buf).await?;
2263                if n == 0 {
2264                    break;
2265                }
2266                hasher.update(&buf[..n]);
2267            }
2268            Ok(BASE64.encode(hasher.finalize()))
2269        }
2270        "SHA256" => {
2271            use sha2::Digest as _;
2272            let mut hasher = sha2::Sha256::new();
2273            loop {
2274                let n = file.read(&mut buf).await?;
2275                if n == 0 {
2276                    break;
2277                }
2278                hasher.update(&buf[..n]);
2279            }
2280            Ok(BASE64.encode(hasher.finalize()))
2281        }
2282        _ => Ok(String::new()),
2283    }
2284}
2285
2286pub(crate) fn url_encode_s3_key(s: &str) -> String {
2287    let mut out = String::with_capacity(s.len() * 2);
2288    for byte in s.bytes() {
2289        match byte {
2290            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' | b'/' => {
2291                out.push(byte as char);
2292            }
2293            _ => {
2294                out.push_str(&format!("%{:02X}", byte));
2295            }
2296        }
2297    }
2298    out
2299}
2300
2301pub(crate) use fakecloud_aws::xml::xml_escape;
2302
2303pub(crate) fn extract_user_metadata(
2304    headers: &HeaderMap,
2305) -> std::collections::BTreeMap<String, String> {
2306    let mut meta = std::collections::BTreeMap::new();
2307    for (name, value) in headers {
2308        if let Some(key) = name.as_str().strip_prefix("x-amz-meta-") {
2309            if let Ok(v) = value.to_str() {
2310                meta.insert(key.to_string(), v.to_string());
2311            }
2312        }
2313    }
2314    meta
2315}
2316
2317pub(crate) fn is_valid_storage_class(class: &str) -> bool {
2318    matches!(
2319        class,
2320        "STANDARD"
2321            | "REDUCED_REDUNDANCY"
2322            | "STANDARD_IA"
2323            | "ONEZONE_IA"
2324            | "INTELLIGENT_TIERING"
2325            | "GLACIER"
2326            | "DEEP_ARCHIVE"
2327            | "GLACIER_IR"
2328            | "OUTPOSTS"
2329            | "SNOW"
2330            | "EXPRESS_ONEZONE"
2331    )
2332}
2333
2334pub(crate) fn is_valid_bucket_name(name: &str) -> bool {
2335    if name.len() < 3 || name.len() > 63 {
2336        return false;
2337    }
2338    // Must start and end with alphanumeric
2339    let bytes = name.as_bytes();
2340    if !bytes[0].is_ascii_alphanumeric() || !bytes[bytes.len() - 1].is_ascii_alphanumeric() {
2341        return false;
2342    }
2343    // Only lowercase letters, digits, hyphens, dots (also allow underscores for compatibility)
2344    name.chars()
2345        .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-' || c == '.' || c == '_')
2346}
2347
2348pub(crate) fn is_valid_region(region: &str) -> bool {
2349    // Basic validation: region should match pattern like us-east-1, eu-west-2, etc.
2350    let valid_regions = [
2351        "us-east-1",
2352        "us-east-2",
2353        "us-west-1",
2354        "us-west-2",
2355        "af-south-1",
2356        "ap-east-1",
2357        "ap-south-1",
2358        "ap-south-2",
2359        "ap-southeast-1",
2360        "ap-southeast-2",
2361        "ap-southeast-3",
2362        "ap-southeast-4",
2363        "ap-northeast-1",
2364        "ap-northeast-2",
2365        "ap-northeast-3",
2366        "ca-central-1",
2367        "ca-west-1",
2368        "eu-central-1",
2369        "eu-central-2",
2370        "eu-west-1",
2371        "eu-west-2",
2372        "eu-west-3",
2373        "eu-south-1",
2374        "eu-south-2",
2375        "eu-north-1",
2376        "il-central-1",
2377        "me-south-1",
2378        "me-central-1",
2379        "sa-east-1",
2380        "cn-north-1",
2381        "cn-northwest-1",
2382        "us-gov-east-1",
2383        "us-gov-east-2",
2384        "us-gov-west-1",
2385        "us-iso-east-1",
2386        "us-iso-west-1",
2387        "us-isob-east-1",
2388        "us-isof-south-1",
2389    ];
2390    valid_regions.contains(&region)
2391}
2392
2393pub(crate) fn resolve_object<'a>(
2394    b: &'a S3Bucket,
2395    key: &str,
2396    version_id: Option<&String>,
2397) -> Result<&'a S3Object, AwsServiceError> {
2398    if let Some(vid) = version_id {
2399        // "null" version ID refers to an object with no version_id (pre-versioning)
2400        if vid == "null" {
2401            // Check versions for a pre-versioning object (version_id == None or Some("null"))
2402            if let Some(versions) = b.object_versions.get(key) {
2403                if let Some(obj) = versions
2404                    .iter()
2405                    .find(|o| o.version_id.is_none() || o.version_id.as_deref() == Some("null"))
2406                {
2407                    return Ok(obj);
2408                }
2409            }
2410            // Also check current object if it has no version_id
2411            if let Some(obj) = b.objects.get(key) {
2412                if obj.version_id.is_none() || obj.version_id.as_deref() == Some("null") {
2413                    return Ok(obj);
2414                }
2415            }
2416        } else {
2417            // When a specific versionId is requested, check versions first
2418            if let Some(versions) = b.object_versions.get(key) {
2419                if let Some(obj) = versions
2420                    .iter()
2421                    .find(|o| o.version_id.as_deref() == Some(vid.as_str()))
2422                {
2423                    return Ok(obj);
2424                }
2425            }
2426            // Also check current object
2427            if let Some(obj) = b.objects.get(key) {
2428                if obj.version_id.as_deref() == Some(vid.as_str()) {
2429                    return Ok(obj);
2430                }
2431            }
2432        }
2433        // For versioned buckets, return NoSuchVersion; for non-versioned, return 400
2434        if b.versioning.is_some() {
2435            Err(AwsServiceError::aws_error_with_fields(
2436                StatusCode::NOT_FOUND,
2437                "NoSuchVersion",
2438                "The specified version does not exist.",
2439                vec![
2440                    ("Key".to_string(), key.to_string()),
2441                    ("VersionId".to_string(), vid.to_string()),
2442                ],
2443            ))
2444        } else {
2445            Err(AwsServiceError::aws_error(
2446                StatusCode::BAD_REQUEST,
2447                "InvalidArgument",
2448                "Invalid version id specified",
2449            ))
2450        }
2451    } else {
2452        b.objects.get(key).ok_or_else(|| no_such_key(key))
2453    }
2454}
2455
2456pub(crate) fn make_delete_marker(key: &str, dm_id: &str) -> S3Object {
2457    S3Object {
2458        key: key.to_string(),
2459        last_modified: Utc::now(),
2460        storage_class: "STANDARD".to_string(),
2461        version_id: Some(dm_id.to_string()),
2462        is_delete_marker: true,
2463        ..Default::default()
2464    }
2465}
2466
2467/// Represents an object to delete in a batch delete request.
2468pub(crate) struct DeleteObjectEntry {
2469    key: String,
2470    version_id: Option<String>,
2471}
2472
2473pub(crate) fn parse_delete_objects_xml(xml: &str) -> Vec<DeleteObjectEntry> {
2474    let mut entries = Vec::new();
2475    let mut remaining = xml;
2476    while let Some(obj_start) = remaining.find("<Object>") {
2477        let after = &remaining[obj_start + 8..];
2478        if let Some(obj_end) = after.find("</Object>") {
2479            let obj_body = &after[..obj_end];
2480            let key = extract_xml_value(obj_body, "Key");
2481            let version_id = extract_xml_value(obj_body, "VersionId");
2482            if let Some(k) = key {
2483                entries.push(DeleteObjectEntry { key: k, version_id });
2484            }
2485            remaining = &after[obj_end + 9..];
2486        } else {
2487            break;
2488        }
2489    }
2490    entries
2491}
2492
2493/// Returns true when the request body's top-level <Quiet> element is "true".
2494/// AWS docs: when Quiet=true, only failed deletes are emitted.
2495pub(crate) fn parse_delete_objects_quiet(xml: &str) -> bool {
2496    extract_xml_value(xml, "Quiet")
2497        .map(|v| v.eq_ignore_ascii_case("true"))
2498        .unwrap_or(false)
2499}
2500
2501/// Minimal XML parser for `<Tagging><TagSet><Tag><Key>k</Key><Value>v</Value></Tag>...`.
2502/// Returns a Vec to preserve insertion order and detect duplicates.
2503pub(crate) fn parse_tagging_xml(xml: &str) -> Vec<(String, String)> {
2504    let mut tags = Vec::new();
2505    let mut remaining = xml;
2506    while let Some(tag_start) = remaining.find("<Tag>") {
2507        let after = &remaining[tag_start + 5..];
2508        if let Some(tag_end) = after.find("</Tag>") {
2509            let tag_body = &after[..tag_end];
2510            let key = extract_xml_value(tag_body, "Key");
2511            let value = extract_xml_value(tag_body, "Value");
2512            if let (Some(k), Some(v)) = (key, value) {
2513                tags.push((k, v));
2514            }
2515            remaining = &after[tag_end + 6..];
2516        } else {
2517            break;
2518        }
2519    }
2520    tags
2521}
2522
2523pub(crate) fn validate_tags(tags: &[(String, String)]) -> Result<(), AwsServiceError> {
2524    // Check for duplicate keys
2525    let mut seen = std::collections::HashSet::new();
2526    for (k, _) in tags {
2527        if !seen.insert(k.as_str()) {
2528            return Err(AwsServiceError::aws_error(
2529                StatusCode::BAD_REQUEST,
2530                "InvalidTag",
2531                "Cannot provide multiple Tags with the same key",
2532            ));
2533        }
2534        // Check for aws: prefix
2535        if k.starts_with("aws:") {
2536            return Err(AwsServiceError::aws_error(
2537                StatusCode::BAD_REQUEST,
2538                "InvalidTag",
2539                "System tags cannot be added/updated by requester",
2540            ));
2541        }
2542    }
2543    Ok(())
2544}
2545
2546pub(crate) fn extract_xml_value(xml: &str, tag: &str) -> Option<String> {
2547    // Handle self-closing tags like <Value /> or <Value/>
2548    let self_closing1 = format!("<{tag} />");
2549    let self_closing2 = format!("<{tag}/>");
2550    if xml.contains(&self_closing1) || xml.contains(&self_closing2) {
2551        // Check if the self-closing tag appears before any open+close pair
2552        let self_pos = xml
2553            .find(&self_closing1)
2554            .or_else(|| xml.find(&self_closing2));
2555        let open = format!("<{tag}>");
2556        let open_pos = xml.find(&open);
2557        match (self_pos, open_pos) {
2558            (Some(sp), Some(op)) if sp < op => return Some(String::new()),
2559            (Some(_), None) => return Some(String::new()),
2560            _ => {}
2561        }
2562    }
2563
2564    let open = format!("<{tag}>");
2565    let close = format!("</{tag}>");
2566    let start = xml.find(&open)? + open.len();
2567    // Search for the closing tag AFTER the opening one so a malformed body
2568    // (closing tag before opening) can't make end < start and panic the
2569    // slice (bug-audit 2026-05-28, 2.2).
2570    let end = xml[start..].find(&close)? + start;
2571    Some(xml[start..end].to_string())
2572}
2573
2574/// Parse the CompleteMultipartUpload XML body into (part_number, etag) pairs.
2575pub(crate) fn parse_complete_multipart_xml(xml: &str) -> Vec<(u32, String)> {
2576    let mut parts = Vec::new();
2577    let mut remaining = xml;
2578    while let Some(part_start) = remaining.find("<Part>") {
2579        let after = &remaining[part_start + 6..];
2580        if let Some(part_end) = after.find("</Part>") {
2581            let part_body = &after[..part_end];
2582            let part_num =
2583                extract_xml_value(part_body, "PartNumber").and_then(|s| s.parse::<u32>().ok());
2584            let etag = extract_xml_value(part_body, "ETag")
2585                .map(|s| s.replace("&quot;", "").replace('"', ""));
2586            if let (Some(num), Some(e)) = (part_num, etag) {
2587                parts.push((num, e));
2588            }
2589            remaining = &after[part_end + 7..];
2590        } else {
2591            break;
2592        }
2593    }
2594    parts
2595}
2596
2597pub(crate) fn parse_url_encoded_tags(s: &str) -> Vec<(String, String)> {
2598    let mut tags = Vec::new();
2599    for pair in s.split('&') {
2600        if pair.is_empty() {
2601            continue;
2602        }
2603        let (key, value) = match pair.find('=') {
2604            Some(pos) => (&pair[..pos], &pair[pos + 1..]),
2605            None => (pair, ""),
2606        };
2607        tags.push((
2608            percent_encoding::percent_decode_str(key)
2609                .decode_utf8_lossy()
2610                .to_string(),
2611            percent_encoding::percent_decode_str(value)
2612                .decode_utf8_lossy()
2613                .to_string(),
2614        ));
2615    }
2616    tags
2617}
2618
2619/// Validate lifecycle configuration XML. Returns MalformedXML on invalid configs.
2620pub(crate) fn validate_lifecycle_xml(xml: &str) -> Result<(), AwsServiceError> {
2621    let malformed = || {
2622        AwsServiceError::aws_error(
2623            StatusCode::BAD_REQUEST,
2624            "MalformedXML",
2625            "The XML you provided was not well-formed or did not validate against our published schema",
2626        )
2627    };
2628
2629    let mut remaining = xml;
2630    while let Some(rule_start) = remaining.find("<Rule>") {
2631        let after = &remaining[rule_start + 6..];
2632        if let Some(rule_end) = after.find("</Rule>") {
2633            let rule_body = &after[..rule_end];
2634
2635            // Must have Filter or Prefix
2636            let has_filter = rule_body.contains("<Filter>")
2637                || rule_body.contains("<Filter/>")
2638                || rule_body.contains("<Filter />");
2639
2640            // Check for <Prefix> at rule level (outside of <Filter>...</Filter>)
2641            let has_prefix_outside_filter = {
2642                if !rule_body.contains("<Prefix") {
2643                    false
2644                } else if !has_filter {
2645                    true // No filter means any Prefix is at rule level
2646                } else {
2647                    // Remove the Filter block and check if Prefix remains
2648                    let mut stripped = rule_body.to_string();
2649                    // Remove <Filter>...</Filter> or self-closing variants
2650                    if let Some(fs) = stripped.find("<Filter") {
2651                        if let Some(fe) = stripped.find("</Filter>") {
2652                            stripped = format!("{}{}", &stripped[..fs], &stripped[fe + 9..]);
2653                        }
2654                    }
2655                    stripped.contains("<Prefix")
2656                }
2657            };
2658
2659            if !has_filter && !has_prefix_outside_filter {
2660                return Err(malformed());
2661            }
2662            // Can't have both Filter and rule-level Prefix
2663            if has_filter && has_prefix_outside_filter {
2664                return Err(malformed());
2665            }
2666
2667            // Expiration: if has ExpiredObjectDeleteMarker, cannot also have Days or Date
2668            // (only check within <Expiration> block)
2669            if let Some(exp_start) = rule_body.find("<Expiration>") {
2670                if let Some(exp_end) = rule_body[exp_start..].find("</Expiration>") {
2671                    let exp_body = &rule_body[exp_start..exp_start + exp_end];
2672                    if exp_body.contains("<ExpiredObjectDeleteMarker>")
2673                        && (exp_body.contains("<Days>") || exp_body.contains("<Date>"))
2674                    {
2675                        return Err(malformed());
2676                    }
2677                }
2678            }
2679
2680            // Filter validation
2681            if has_filter {
2682                if let Some(fs) = rule_body.find("<Filter>") {
2683                    if let Some(fe) = rule_body.find("</Filter>") {
2684                        // `find` locates the two tags independently, so a body
2685                        // with `</Filter>` before `<Filter>` would slice with
2686                        // begin > end and panic (dropping the connection -- a
2687                        // reachable DoS). Reject as malformed instead.
2688                        if fe < fs + 8 {
2689                            return Err(malformed());
2690                        }
2691                        let filter_body = &rule_body[fs + 8..fe];
2692                        let has_prefix_in_filter = filter_body.contains("<Prefix");
2693                        let has_tag_in_filter = filter_body.contains("<Tag>");
2694                        let has_and_in_filter = filter_body.contains("<And>");
2695                        // Can't have both Prefix and Tag without And
2696                        if has_prefix_in_filter && has_tag_in_filter && !has_and_in_filter {
2697                            return Err(malformed());
2698                        }
2699                        // Can't have Tag and And simultaneously at the Filter level
2700                        if has_tag_in_filter && has_and_in_filter {
2701                            // Check if the <Tag> is outside <And>
2702                            let and_start = filter_body.find("<And>").unwrap_or(0);
2703                            let tag_pos = filter_body.find("<Tag>").unwrap_or(0);
2704                            if tag_pos < and_start {
2705                                return Err(malformed());
2706                            }
2707                        }
2708                    }
2709                }
2710            }
2711
2712            // NoncurrentVersionTransition must have NoncurrentDays and StorageClass
2713            if rule_body.contains("<NoncurrentVersionTransition>") {
2714                let mut nvt_remaining = rule_body;
2715                while let Some(nvt_start) = nvt_remaining.find("<NoncurrentVersionTransition>") {
2716                    let nvt_after = &nvt_remaining[nvt_start + 29..];
2717                    if let Some(nvt_end) = nvt_after.find("</NoncurrentVersionTransition>") {
2718                        let nvt_body = &nvt_after[..nvt_end];
2719                        if !nvt_body.contains("<NoncurrentDays>") {
2720                            return Err(malformed());
2721                        }
2722                        if !nvt_body.contains("<StorageClass>") {
2723                            return Err(malformed());
2724                        }
2725                        nvt_remaining = &nvt_after[nvt_end + 30..];
2726                    } else {
2727                        break;
2728                    }
2729                }
2730            }
2731
2732            remaining = &after[rule_end + 7..];
2733        } else {
2734            break;
2735        }
2736    }
2737
2738    Ok(())
2739}
2740
2741/// Parsed CORS rule from bucket configuration XML.
2742pub(crate) struct CorsRule {
2743    allowed_origins: Vec<String>,
2744    allowed_methods: Vec<String>,
2745    allowed_headers: Vec<String>,
2746    expose_headers: Vec<String>,
2747    max_age_seconds: Option<u32>,
2748}
2749
2750/// Parse CORS configuration XML into rules.
2751pub(crate) fn parse_cors_config(xml: &str) -> Vec<CorsRule> {
2752    let mut rules = Vec::new();
2753    let mut remaining = xml;
2754    while let Some(start) = remaining.find("<CORSRule>") {
2755        let after = &remaining[start + 10..];
2756        if let Some(end) = after.find("</CORSRule>") {
2757            let block = &after[..end];
2758            let allowed_origins = extract_all_xml_values(block, "AllowedOrigin");
2759            let allowed_methods = extract_all_xml_values(block, "AllowedMethod");
2760            let allowed_headers = extract_all_xml_values(block, "AllowedHeader");
2761            let expose_headers = extract_all_xml_values(block, "ExposeHeader");
2762            let max_age_seconds =
2763                extract_xml_value(block, "MaxAgeSeconds").and_then(|s| s.parse().ok());
2764            rules.push(CorsRule {
2765                allowed_origins,
2766                allowed_methods,
2767                allowed_headers,
2768                expose_headers,
2769                max_age_seconds,
2770            });
2771            remaining = &after[end + 11..];
2772        } else {
2773            break;
2774        }
2775    }
2776    rules
2777}
2778
2779/// Match an origin against a CORS allowed origin pattern (supports "*" wildcard).
2780pub(crate) fn origin_matches(origin: &str, pattern: &str) -> bool {
2781    if pattern == "*" {
2782        return true;
2783    }
2784    // Simple wildcard: *.example.com
2785    if let Some(suffix) = pattern.strip_prefix('*') {
2786        return origin.ends_with(suffix);
2787    }
2788    origin == pattern
2789}
2790
2791/// Find the matching CORS rule for a given origin and method.
2792pub(crate) fn find_cors_rule<'a>(
2793    rules: &'a [CorsRule],
2794    origin: &str,
2795    method: Option<&str>,
2796) -> Option<&'a CorsRule> {
2797    rules.iter().find(|rule| {
2798        let origin_ok = rule
2799            .allowed_origins
2800            .iter()
2801            .any(|o| origin_matches(origin, o));
2802        let method_ok = match method {
2803            Some(m) => rule.allowed_methods.iter().any(|am| am == m),
2804            None => true,
2805        };
2806        origin_ok && method_ok
2807    })
2808}
2809
2810/// Check if an object is locked (retention or legal hold) and should block mutation.
2811/// Returns an error string if locked, None if allowed.
2812pub(crate) fn check_object_lock_for_overwrite(
2813    obj: &S3Object,
2814    req: &AwsRequest,
2815) -> Option<&'static str> {
2816    // Legal hold blocks overwrite
2817    if obj.lock_legal_hold.as_deref() == Some("ON") {
2818        return Some("AccessDenied");
2819    }
2820    // Retention check
2821    if let (Some(mode), Some(until)) = (&obj.lock_mode, &obj.lock_retain_until) {
2822        if *until > Utc::now() {
2823            if mode == "COMPLIANCE" {
2824                return Some("AccessDenied");
2825            }
2826            if mode == "GOVERNANCE" {
2827                let bypass = req
2828                    .headers
2829                    .get("x-amz-bypass-governance-retention")
2830                    .and_then(|v| v.to_str().ok())
2831                    .map(|s| s.eq_ignore_ascii_case("true"))
2832                    .unwrap_or(false);
2833                if !bypass {
2834                    return Some("AccessDenied");
2835                }
2836            }
2837        }
2838    }
2839    None
2840}
2841
2842#[cfg(test)]
2843mod tests;
2844
2845#[cfg(test)]
2846mod s3_detect_action_tests {
2847    //! Auth-mapping regressions for bug-hunt 2026-06-15 §5.3: ops that
2848    //! were unmapped (-> enforcement skipped) or mapped to the wrong
2849    //! action (-> the policy targeting them never applied).
2850    use super::s3_detect_action;
2851    use std::collections::HashMap;
2852
2853    fn q(pairs: &[(&str, &str)]) -> HashMap<String, String> {
2854        pairs
2855            .iter()
2856            .map(|(k, v)| (k.to_string(), v.to_string()))
2857            .collect()
2858    }
2859
2860    #[test]
2861    fn select_object_content_maps_to_get_object() {
2862        // POST /bucket/key?select&select-type=2 reads object data.
2863        let action = s3_detect_action(
2864            "POST",
2865            Some("bucket"),
2866            Some("key"),
2867            &q(&[("select", ""), ("select-type", "2")]),
2868        );
2869        assert_eq!(action, Some("GetObject"));
2870    }
2871
2872    #[test]
2873    fn write_get_object_response_is_detected() {
2874        // POST /WriteGetObjectResponse (no key) — Object Lambda data plane.
2875        let action = s3_detect_action("POST", Some("WriteGetObjectResponse"), None, &q(&[]));
2876        assert_eq!(action, Some("WriteGetObjectResponse"));
2877    }
2878
2879    #[test]
2880    fn list_directory_buckets_is_distinct_from_list_buckets() {
2881        // GET /?x-id=ListDirectoryBuckets -> s3express action, not ListBuckets.
2882        let dir = s3_detect_action("GET", None, None, &q(&[("x-id", "ListDirectoryBuckets")]));
2883        assert_eq!(dir, Some("ListAllMyDirectoryBuckets"));
2884        // Plain GET / is still ListBuckets.
2885        let plain = s3_detect_action("GET", None, None, &q(&[]));
2886        assert_eq!(plain, Some("ListBuckets"));
2887    }
2888
2889    #[test]
2890    fn rename_object_is_detected_with_key() {
2891        // PUT /bucket/newkey?renameObject carries a key — previously fell
2892        // through to PutObject so s3:RenameObject denies were ineffective.
2893        let action = s3_detect_action(
2894            "PUT",
2895            Some("bucket"),
2896            Some("newkey"),
2897            &q(&[("renameObject", "")]),
2898        );
2899        assert_eq!(action, Some("RenameObject"));
2900    }
2901
2902    #[test]
2903    fn update_object_encryption_maps_to_put_object() {
2904        // PUT /bucket/key?encryption changes object SSE; AWS gates this on
2905        // s3:PutObject (no distinct public action). Deliberate, not a
2906        // silent PutObject fall-through.
2907        let action = s3_detect_action(
2908            "PUT",
2909            Some("bucket"),
2910            Some("key"),
2911            &q(&[("encryption", "")]),
2912        );
2913        assert_eq!(action, Some("PutObject"));
2914    }
2915
2916    #[test]
2917    fn create_session_not_misdetected_as_list_objects() {
2918        // GET /bucket?session — previously fell through to ListObjects.
2919        let action = s3_detect_action("GET", Some("bucket"), None, &q(&[("session", "")]));
2920        assert_eq!(action, Some("CreateSession"));
2921    }
2922
2923    #[test]
2924    fn get_bucket_abac_not_misdetected_as_list_objects() {
2925        // GET /bucket?abac — previously fell through to ListObjects.
2926        let action = s3_detect_action("GET", Some("bucket"), None, &q(&[("abac", "")]));
2927        assert_eq!(action, Some("GetBucketAbacConfiguration"));
2928    }
2929
2930    #[test]
2931    fn put_bucket_abac_still_detected() {
2932        let action = s3_detect_action("PUT", Some("bucket"), None, &q(&[("abac", "")]));
2933        assert_eq!(action, Some("PutBucketAbacConfiguration"));
2934    }
2935}
2936
2937#[cfg(test)]
2938mod s3_iam_service_prefix_tests {
2939    //! §5.3: CreateSession and WriteGetObjectResponse are authorized under
2940    //! sibling IAM service prefixes (s3express / s3-object-lambda), not s3.
2941    use super::*;
2942    use parking_lot::RwLock;
2943    use std::sync::Arc;
2944
2945    fn make_service() -> S3Service {
2946        let state: SharedS3State = Arc::new(RwLock::new(
2947            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
2948        ));
2949        S3Service::new(state, Arc::new(DeliveryBus::new()))
2950    }
2951
2952    fn req(method: Method, path: &str, query: &[(&str, &str)]) -> AwsRequest {
2953        let path_segments: Vec<String> = path
2954            .split('/')
2955            .filter(|s| !s.is_empty())
2956            .map(|s| s.to_string())
2957            .collect();
2958        AwsRequest {
2959            service: "s3".to_string(),
2960            action: String::new(),
2961            region: "us-east-1".to_string(),
2962            account_id: "123456789012".to_string(),
2963            request_id: "rid".to_string(),
2964            headers: http::HeaderMap::new(),
2965            query_params: query
2966                .iter()
2967                .map(|(k, v)| (k.to_string(), v.to_string()))
2968                .collect(),
2969            body: bytes::Bytes::new(),
2970            body_stream: parking_lot::Mutex::new(None),
2971            path_segments,
2972            raw_path: path.to_string(),
2973            raw_query: String::new(),
2974            method,
2975            is_query_protocol: false,
2976            access_key_id: None,
2977            principal: None,
2978        }
2979    }
2980
2981    #[test]
2982    fn create_session_uses_s3express_prefix() {
2983        let svc = make_service();
2984        let action = svc
2985            .iam_action_for(&req(Method::GET, "/mybucket", &[("session", "")]))
2986            .expect("must be mapped");
2987        assert_eq!(action.service, "s3express");
2988        assert_eq!(action.action, "CreateSession");
2989        assert_eq!(action.action_string(), "s3express:CreateSession");
2990    }
2991
2992    #[test]
2993    fn write_get_object_response_uses_object_lambda_prefix() {
2994        let svc = make_service();
2995        let action = svc
2996            .iam_action_for(&req(Method::POST, "/WriteGetObjectResponse", &[]))
2997            .expect("must be mapped");
2998        assert_eq!(action.service, "s3-object-lambda");
2999        assert_eq!(action.action, "WriteGetObjectResponse");
3000        assert_eq!(
3001            action.action_string(),
3002            "s3-object-lambda:WriteGetObjectResponse"
3003        );
3004    }
3005
3006    #[test]
3007    fn get_object_torrent_maps_to_its_own_action() {
3008        // GET object with `?torrent` must authorize under s3:GetObjectTorrent,
3009        // not s3:GetObject (bug-audit 2026-06-20, 5.3).
3010        let svc = make_service();
3011        let action = svc
3012            .iam_action_for(&req(Method::GET, "/mybucket/key.txt", &[("torrent", "")]))
3013            .expect("must be mapped");
3014        assert_eq!(action.action, "GetObjectTorrent");
3015        // A plain GET still maps to GetObject.
3016        let plain = svc
3017            .iam_action_for(&req(Method::GET, "/mybucket/key.txt", &[]))
3018            .expect("must be mapped");
3019        assert_eq!(plain.action, "GetObject");
3020    }
3021}
3022
3023#[cfg(test)]
3024mod extract_xml_value_tests {
3025    use super::extract_xml_value;
3026
3027    #[test]
3028    fn returns_inner_value() {
3029        assert_eq!(
3030            extract_xml_value("<Root><Key>value</Key></Root>", "Key"),
3031            Some("value".to_string())
3032        );
3033    }
3034
3035    #[test]
3036    fn missing_tag_is_none() {
3037        assert_eq!(extract_xml_value("<Root></Root>", "Key"), None);
3038    }
3039
3040    // bug-audit 2026-05-28, 2.2: a closing tag before the opening one used to
3041    // slice with end < start and panic; must return None instead.
3042    #[test]
3043    fn close_before_open_does_not_panic() {
3044        assert_eq!(extract_xml_value("</Key>oops<Key>value", "Key"), None);
3045    }
3046
3047    #[test]
3048    fn open_without_close_is_none() {
3049        assert_eq!(extract_xml_value("<Key>value", "Key"), None);
3050    }
3051}
3052
3053#[cfg(test)]
3054mod compute_checksum_tests {
3055    use super::{compute_checksum, compute_checksum_streaming};
3056
3057    // bug-audit 2026-06-15, 1.7: CopyObject / CompleteMultipartUpload routed
3058    // through the non-streaming compute_checksum, whose `_ =>` arm returned an
3059    // empty string for CRC32C / CRC64NVME -> GetObjectAttributes reported an
3060    // empty checksum and integrity checks silently broke.
3061    #[test]
3062    fn crc32c_is_non_empty_and_matches_known_vector() {
3063        // CRC32C of "123456789" is 0xE3069283; base64 of those 4 BE bytes.
3064        let out = compute_checksum("CRC32C", b"123456789");
3065        assert!(!out.is_empty());
3066        let bytes = base64::engine::general_purpose::STANDARD
3067            .decode(&out)
3068            .unwrap();
3069        assert_eq!(bytes, 0xE306_9283u32.to_be_bytes());
3070    }
3071
3072    #[test]
3073    fn crc64nvme_is_non_empty() {
3074        let out = compute_checksum("CRC64NVME", b"hello world");
3075        assert!(!out.is_empty());
3076        // 8 BE bytes of a u64.
3077        let bytes = base64::engine::general_purpose::STANDARD
3078            .decode(&out)
3079            .unwrap();
3080        assert_eq!(bytes.len(), 8);
3081    }
3082
3083    #[tokio::test]
3084    async fn non_streaming_matches_streaming_for_all_algorithms() {
3085        let data = b"the quick brown fox jumps over the lazy dog".repeat(100);
3086        let tmp = tempfile::NamedTempFile::new().unwrap();
3087        tokio::fs::write(tmp.path(), &data).await.unwrap();
3088
3089        for algo in ["CRC32", "CRC32C", "CRC64NVME", "SHA1", "SHA256"] {
3090            let direct = compute_checksum(algo, &data);
3091            let streamed = compute_checksum_streaming(algo, tmp.path()).await.unwrap();
3092            assert!(!direct.is_empty(), "{algo} direct empty");
3093            assert_eq!(direct, streamed, "{algo} direct != streaming");
3094        }
3095    }
3096
3097    use base64::Engine as _;
3098}