Skip to main content

fakecloud_s3/service/
mod.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use bytes::Bytes;
5use chrono::{DateTime, Timelike, Utc};
6use http::{HeaderMap, Method, StatusCode};
7use md5::{Digest, Md5};
8
9use fakecloud_aws::arn::Arn;
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
12use fakecloud_kms::SharedKmsState;
13use fakecloud_persistence::{MemoryS3Store, S3Store, StoreError};
14
15use base64::engine::general_purpose::STANDARD as BASE64;
16use base64::Engine as _;
17
18use crate::logging;
19use crate::state::{AclGrant, S3Bucket, S3Object, SharedS3State};
20
21mod access_points;
22mod acl;
23mod buckets;
24pub(crate) mod config;
25mod lock;
26mod multipart;
27mod notifications;
28mod objects;
29mod tags;
30
31// Re-export notification helpers for use in sub-modules
32#[cfg(test)]
33use notifications::replicate_object;
34pub(super) use notifications::{
35    deliver_notifications, normalize_notification_ids, normalize_replication_xml,
36    replicate_through_store,
37};
38
39// Used only within this file (parse_cors_config)
40use notifications::extract_all_xml_values;
41
42// Re-exports used only in tests
43#[cfg(test)]
44use notifications::{
45    event_matches, key_matches_filters, parse_notification_config, parse_replication_rules,
46    NotificationTargetType,
47};
48
/// The S3 service implementation: holds the shared in-memory state plus the
/// collaborators (delivery bus, optional KMS integration, persistence store)
/// that request handlers use.
pub struct S3Service {
    /// Shared S3 state; handlers take a read guard and look up the
    /// per-account state by account ID.
    state: SharedS3State,
    /// Delivery bus handle.
    // NOTE(review): presumably used to fan out bucket notifications — its
    // call sites are outside this view; confirm.
    delivery: Arc<DeliveryBus>,
    /// Optional shared KMS state, attached through `with_kms`.
    kms_state: Option<SharedKmsState>,
    /// Optional KMS hook, attached through `with_kms_hook`. When `None`,
    /// `encrypt_object_body` / `decrypt_object_body` return their input
    /// bytes unchanged.
    pub(crate) kms_hook: Option<Arc<dyn fakecloud_core::delivery::KmsHook>>,
    /// Persistence backend; `new` defaults it to an in-memory
    /// `MemoryS3Store`.
    store: Arc<dyn S3Store>,
}
56
57/// Map a [`StoreError`] from the persistence layer to a 500 InternalError
58/// response. Invoked at every mutation site when the write-through persistence
59/// call fails: the in-memory mutation has already happened, but we surface the
60/// failure to the client so they know to retry (and so logs/metrics flag it).
61pub(crate) fn persistence_error(err: StoreError) -> AwsServiceError {
62    AwsServiceError::aws_error(
63        StatusCode::INTERNAL_SERVER_ERROR,
64        "InternalError",
65        format!("persistence store error: {err}"),
66    )
67}
68
69/// Convert a filesystem IO error from a disk-backed body read into an
70/// InternalError response.
71pub(crate) fn io_to_aws(err: std::io::Error) -> AwsServiceError {
72    AwsServiceError::aws_error(
73        StatusCode::INTERNAL_SERVER_ERROR,
74        "InternalError",
75        format!("failed to read object body from disk: {err}"),
76    )
77}
78
79impl S3Service {
80    pub fn new(state: SharedS3State, delivery: Arc<DeliveryBus>) -> Self {
81        Self::with_store(state, delivery, Arc::new(MemoryS3Store::new()))
82    }
83
84    pub fn with_store(
85        state: SharedS3State,
86        delivery: Arc<DeliveryBus>,
87        store: Arc<dyn S3Store>,
88    ) -> Self {
89        Self {
90            state,
91            delivery,
92            kms_state: None,
93            kms_hook: None,
94            store,
95        }
96    }
97
98    pub fn with_kms(mut self, kms_state: SharedKmsState) -> Self {
99        self.kms_state = Some(kms_state);
100        self
101    }
102
103    pub fn with_kms_hook(mut self, hook: Arc<dyn fakecloud_core::delivery::KmsHook>) -> Self {
104        self.kms_hook = Some(hook);
105        self
106    }
107
108    /// Encrypt object body bytes for SSE-KMS storage. Returns ciphertext as
109    /// raw bytes (a UTF-8 fakecloud-kms envelope) on success.
110    ///
111    /// Fail-closed: if the KMS hook reports an error (key denied, key not
112    /// found, etc.), this returns `Err` so PutObject aborts with a 500
113    /// rather than silently storing plaintext. AWS S3 has the same
114    /// behavior — `KMS.NotFoundException` and friends surface as
115    /// `AccessDenied` / `KMS.*` errors back to the caller. When no hook is
116    /// wired (legacy / in-process tests with no KMS dependency), the
117    /// plaintext is returned unchanged so existing tests keep working.
118    pub(crate) fn encrypt_object_body(
119        &self,
120        account_id: &str,
121        region: &str,
122        bucket: &str,
123        plaintext: &[u8],
124        kms_key_id: Option<&str>,
125    ) -> Result<bytes::Bytes, AwsServiceError> {
126        let Some(hook) = &self.kms_hook else {
127            return Ok(bytes::Bytes::copy_from_slice(plaintext));
128        };
129        let key = kms_key_id.filter(|k| !k.is_empty()).unwrap_or("aws/s3");
130        let bucket_arn = Arn::s3(bucket).to_string();
131        let mut ctx = std::collections::HashMap::new();
132        ctx.insert("aws:s3:arn".to_string(), bucket_arn);
133        match hook.encrypt(account_id, region, key, plaintext, "s3.amazonaws.com", ctx) {
134            Ok(envelope) => Ok(bytes::Bytes::from(envelope.into_bytes())),
135            Err(err) => {
136                tracing::warn!(bucket = %bucket, error = %err, "SSE-KMS encrypt failed");
137                Err(AwsServiceError::aws_error(
138                    StatusCode::INTERNAL_SERVER_ERROR,
139                    "KMS.InternalFailureException",
140                    format!("Failed to encrypt object via KMS: {err}"),
141                ))
142            }
143        }
144    }
145
146    /// Decrypt object body bytes that were stored as a fakecloud-kms
147    /// envelope. Caller is expected to gate this on
148    /// `obj.sse_algorithm == Some("aws:kms")`.
149    ///
150    /// Fail-closed: when a hook is wired and the bytes look like an
151    /// envelope but don't decrypt (key revoked, malformed ciphertext),
152    /// this returns `Err` so GetObject surfaces a 500. When no hook is
153    /// wired, or the bytes aren't UTF-8 (legacy snapshots from before
154    /// the hook landed), the bytes are returned unchanged.
155    pub(crate) fn decrypt_object_body(
156        &self,
157        account_id: &str,
158        bucket: &str,
159        ciphertext: &[u8],
160    ) -> Result<bytes::Bytes, AwsServiceError> {
161        let Some(hook) = &self.kms_hook else {
162            return Ok(bytes::Bytes::copy_from_slice(ciphertext));
163        };
164        // Stored envelope is base64 ASCII; non-UTF-8 bytes are pre-hook
165        // legacy snapshots, return as-is.
166        let envelope = match std::str::from_utf8(ciphertext) {
167            Ok(s) => s,
168            Err(_) => return Ok(bytes::Bytes::copy_from_slice(ciphertext)),
169        };
170        let bucket_arn = Arn::s3(bucket).to_string();
171        let mut ctx = std::collections::HashMap::new();
172        ctx.insert("aws:s3:arn".to_string(), bucket_arn);
173        match hook.decrypt(account_id, envelope, "s3.amazonaws.com", ctx) {
174            Ok(bytes) => Ok(bytes::Bytes::from(bytes)),
175            Err(err) => {
176                tracing::warn!(bucket = %bucket, error = %err, "SSE-KMS decrypt failed");
177                Err(AwsServiceError::aws_error(
178                    StatusCode::INTERNAL_SERVER_ERROR,
179                    "KMS.InternalFailureException",
180                    format!("Failed to decrypt object via KMS: {err}"),
181                ))
182            }
183        }
184    }
185}
186
187#[async_trait]
188impl AwsService for S3Service {
    /// Returns the fixed identifier of this service, `"s3"`.
    // NOTE(review): presumably what the core dispatcher matches on to route
    // requests to this service — confirm against `AwsService` callers.
    fn service_name(&self) -> &str {
        "s3"
    }
192
193    async fn handle(&self, mut req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
194        // PutObject / UploadPart enter dispatch via the streaming path
195        // with `body = Bytes::new()` and the raw HTTP body available on
196        // `req.body_stream`. Those handlers consume the stream directly
197        // — spooling chunks to disk while computing MD5 + size in
198        // constant memory — so a 1 GiB upload never materializes into
199        // RAM. Every *other* PUT-on-bucket-key the streaming dispatch
200        // flagged (PutObjectTagging, PutObjectAcl, PutObjectRetention,
201        // PutObjectLegalHold, CopyObject, …) reads a small XML/JSON
202        // body from `req.body`, so drain the stream for them.
203        let is_put_to_key = req.method == Method::PUT
204            && req.path_segments.len() >= 2
205            && req
206                .path_segments
207                .first()
208                .map(|s| !s.is_empty())
209                .unwrap_or(false);
210        let q = &req.query_params;
211        let is_put_object = is_put_to_key
212            && !q.contains_key("tagging")
213            && !q.contains_key("acl")
214            && !q.contains_key("retention")
215            && !q.contains_key("legal-hold")
216            && !q.contains_key("renameObject")
217            && !q.contains_key("encryption")
218            && !req.headers.contains_key("x-amz-copy-source");
219        // UploadPart requires both partNumber AND uploadId; checking
220        // only partNumber would skip body draining for stray PUTs that
221        // happened to carry a partNumber query param without being a
222        // real multipart upload.
223        let is_upload_part =
224            is_put_to_key && q.contains_key("partNumber") && q.contains_key("uploadId");
225        if !is_put_object && !is_upload_part {
226            if let Some(stream) = req.take_body_stream() {
227                req.body = fakecloud_core::service::drain_request_stream(stream).await?;
228            }
229        }
230
231        // Access point data plane: resolve alias -> bucket before routing.
232        access_points::resolve_access_point(self, &mut req)?;
233
234        let account_id = req.account_id.as_str();
235
236        // S3 Control endpoint (access point management).
237        let host = req
238            .headers
239            .get("host")
240            .and_then(|v| v.to_str().ok())
241            .unwrap_or("");
242        let is_control = host.to_ascii_lowercase().contains("s3-control");
243        if is_control {
244            let v1 = req.path_segments.first().map(|s| s.as_str());
245            let v2 = req.path_segments.get(1).map(|s| s.as_str());
246            let v3 = req.path_segments.get(2).map(|s| s.as_str());
247            if v1 == Some("v20180820") && v2 == Some("accesspoint") {
248                if let Some(name) = v3 {
249                    match req.method {
250                        Method::PUT => return self.create_access_point(account_id, &req, name),
251                        Method::GET => return self.get_access_point(account_id, &req, name),
252                        Method::DELETE => return self.delete_access_point(account_id, &req, name),
253                        _ => {}
254                    }
255                } else if req.method == Method::GET {
256                    return self.list_access_points(account_id, &req);
257                }
258            }
259        }
260
261        // S3 REST routing: method + path segments + query params
262        //
263        // Reject paths that start with `//` (an empty bucket segment). The
264        // path-segment splitter further down filters empty segments out,
265        // which would otherwise let a malformed URL like `PUT //my-key`
266        // collapse to a valid `PUT /my-key` (CreateBucket) and silently
267        // succeed. Real S3 rejects empty bucket segments with
268        // InvalidBucketName, so mirror that.
269        if req.raw_path.starts_with("//") {
270            return Err(AwsServiceError::aws_error(
271                StatusCode::BAD_REQUEST,
272                "InvalidBucketName",
273                "The specified bucket is not valid: bucket name cannot be empty",
274            ));
275        }
276        let bucket = req.path_segments.first().map(|s| s.as_str());
277        // Extract key from the raw path to preserve leading slashes and empty segments.
278        // The raw path is like "/bucket/key/parts" — we strip the bucket prefix.
279        let key = if let Some(b) = bucket {
280            let prefix = format!("/{b}/");
281            if req.raw_path.starts_with(&prefix) && req.raw_path.len() > prefix.len() {
282                let raw_key = &req.raw_path[prefix.len()..];
283                Some(
284                    percent_encoding::percent_decode_str(raw_key)
285                        .decode_utf8_lossy()
286                        .into_owned(),
287                )
288            } else if req.path_segments.len() > 1 {
289                let raw = req.path_segments[1..].join("/");
290                Some(
291                    percent_encoding::percent_decode_str(&raw)
292                        .decode_utf8_lossy()
293                        .into_owned(),
294                )
295            } else {
296                None
297            }
298        } else {
299            None
300        };
301
302        // Multipart upload operations (checked before main match)
303        if let Some(b) = bucket {
304            // POST /{bucket}/{key}?uploads — CreateMultipartUpload
305            if req.method == Method::POST
306                && key.is_some()
307                && req.query_params.contains_key("uploads")
308            {
309                return self.create_multipart_upload(account_id, &req, b, key.as_deref().unwrap());
310            }
311
312            // POST /{bucket}/{key}?restore
313            if req.method == Method::POST
314                && key.is_some()
315                && req.query_params.contains_key("restore")
316            {
317                return self.restore_object(account_id, &req, b, key.as_deref().unwrap());
318            }
319
320            // POST /{bucket}/{key}?uploadId=X — CompleteMultipartUpload
321            if req.method == Method::POST && key.is_some() {
322                if let Some(upload_id) = req.query_params.get("uploadId").cloned() {
323                    return self.complete_multipart_upload(
324                        account_id,
325                        &req,
326                        b,
327                        key.as_deref().unwrap(),
328                        &upload_id,
329                    );
330                }
331            }
332
333            // PUT /{bucket}/{key}?partNumber=N&uploadId=X — UploadPart or UploadPartCopy
334            if req.method == Method::PUT && key.is_some() {
335                if let (Some(part_num_str), Some(upload_id)) = (
336                    req.query_params.get("partNumber").cloned(),
337                    req.query_params.get("uploadId").cloned(),
338                ) {
339                    if let Ok(part_number) = part_num_str.parse::<i64>() {
340                        if req.headers.contains_key("x-amz-copy-source") {
341                            return self.upload_part_copy(
342                                account_id,
343                                &req,
344                                b,
345                                key.as_deref().unwrap(),
346                                &upload_id,
347                                part_number,
348                            );
349                        }
350                        return self
351                            .upload_part(
352                                account_id,
353                                &req,
354                                b,
355                                key.as_deref().unwrap(),
356                                &upload_id,
357                                part_number,
358                            )
359                            .await;
360                    }
361                }
362            }
363
364            // DELETE /{bucket}/{key}?uploadId=X — AbortMultipartUpload
365            if req.method == Method::DELETE && key.is_some() {
366                if let Some(upload_id) = req.query_params.get("uploadId").cloned() {
367                    return self.abort_multipart_upload(
368                        account_id,
369                        b,
370                        key.as_deref().unwrap(),
371                        &upload_id,
372                    );
373                }
374            }
375
376            // GET /{bucket}?uploads — ListMultipartUploads
377            if req.method == Method::GET
378                && key.is_none()
379                && req.query_params.contains_key("uploads")
380            {
381                return self.list_multipart_uploads(account_id, b);
382            }
383
384            // GET /{bucket}/{key}?uploadId=X — ListParts
385            if req.method == Method::GET && key.is_some() {
386                if let Some(upload_id) = req.query_params.get("uploadId").cloned() {
387                    return self.list_parts(
388                        account_id,
389                        &req,
390                        b,
391                        key.as_deref().unwrap(),
392                        &upload_id,
393                    );
394                }
395            }
396        }
397
398        // Handle OPTIONS preflight requests (CORS)
399        if req.method == Method::OPTIONS {
400            if let Some(b_name) = bucket {
401                let cors_config = {
402                    let accounts = self.state.read();
403                    let _empty_s3 = crate::state::S3State::new(&req.account_id, &req.region);
404                    let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
405                    state
406                        .buckets
407                        .get(b_name)
408                        .and_then(|b| b.cors_config.clone())
409                };
410                if let Some(ref config) = cors_config {
411                    let origin = req
412                        .headers
413                        .get("origin")
414                        .and_then(|v| v.to_str().ok())
415                        .unwrap_or("");
416                    let request_method = req
417                        .headers
418                        .get("access-control-request-method")
419                        .and_then(|v| v.to_str().ok())
420                        .unwrap_or("");
421                    let rules = parse_cors_config(config);
422                    if let Some(rule) = find_cors_rule(&rules, origin, Some(request_method)) {
423                        let mut headers = HeaderMap::new();
424                        let matched_origin = if rule.allowed_origins.contains(&"*".to_string()) {
425                            "*"
426                        } else {
427                            origin
428                        };
429                        headers.insert(
430                            "access-control-allow-origin",
431                            matched_origin
432                                .parse()
433                                .unwrap_or_else(|_| http::HeaderValue::from_static("")),
434                        );
435                        headers.insert(
436                            "access-control-allow-methods",
437                            rule.allowed_methods
438                                .join(", ")
439                                .parse()
440                                .unwrap_or_else(|_| http::HeaderValue::from_static("")),
441                        );
442                        if !rule.allowed_headers.is_empty() {
443                            let ah = if rule.allowed_headers.contains(&"*".to_string()) {
444                                req.headers
445                                    .get("access-control-request-headers")
446                                    .and_then(|v| v.to_str().ok())
447                                    .unwrap_or("*")
448                                    .to_string()
449                            } else {
450                                rule.allowed_headers.join(", ")
451                            };
452                            headers.insert(
453                                "access-control-allow-headers",
454                                ah.parse()
455                                    .unwrap_or_else(|_| http::HeaderValue::from_static("")),
456                            );
457                        }
458                        if let Some(max_age) = rule.max_age_seconds {
459                            headers.insert(
460                                "access-control-max-age",
461                                max_age
462                                    .to_string()
463                                    .parse()
464                                    .unwrap_or_else(|_| http::HeaderValue::from_static("")),
465                            );
466                        }
467                        return Ok(AwsResponse {
468                            status: StatusCode::OK,
469                            content_type: String::new(),
470                            body: Bytes::new().into(),
471                            headers,
472                        });
473                    }
474                }
475                return Err(AwsServiceError::aws_error(
476                    StatusCode::FORBIDDEN,
477                    "CORSResponse",
478                    "CORS is not enabled for this bucket",
479                ));
480            }
481        }
482
483        // Capture origin for CORS response headers
484        let origin_header = req
485            .headers
486            .get("origin")
487            .and_then(|v| v.to_str().ok())
488            .map(|s| s.to_string());
489
490        // Bucket-scoped sub-resource query params. If a request targets one
491        // of these without a bucket in the path, S3 returns an error rather
492        // than silently treating it as a top-level (service) operation.
493        // Real AWS rejects e.g. `GET /?tagging` with a routing/validation
494        // error; mirroring that prevents bucket-required ops from being
495        // accidentally answered by ListBuckets / WriteGetObjectResponse.
496        let bucket_subresources: &[&str] = &[
497            "tagging",
498            "acl",
499            "versioning",
500            "cors",
501            "notification",
502            "website",
503            "accelerate",
504            "publicAccessBlock",
505            "encryption",
506            "lifecycle",
507            "logging",
508            "policy",
509            "policyStatus",
510            "replication",
511            "ownershipControls",
512            "inventory",
513            "analytics",
514            "intelligent-tiering",
515            "metrics",
516            "requestPayment",
517            "abac",
518            "metadataConfiguration",
519            "metadataTable",
520            "metadataInventoryTable",
521            "metadataJournalTable",
522            "object-lock",
523            "versions",
524            "session",
525            "retention",
526            "legal-hold",
527            "attributes",
528            "torrent",
529            "renameObject",
530            "uploads",
531            "restore",
532            "select",
533            // Bucket-required ops the earlier list missed: their query
534            // markers (e.g. `?location` for GetBucketLocation,
535            // `?delete` for DeleteObjects, `?list-type=2` for
536            // ListObjectsV2) signal a bucket op so a request with the
537            // marker but no bucket segment is malformed, not a service-
538            // level call.
539            "location",
540            "delete",
541            "list-type",
542        ];
543        if bucket.is_none()
544            && bucket_subresources
545                .iter()
546                .any(|q| req.query_params.contains_key(*q))
547        {
548            return Err(AwsServiceError::aws_error(
549                StatusCode::BAD_REQUEST,
550                "InvalidBucketName",
551                "The specified bucket is not valid: bucket name is required for this operation",
552            ));
553        }
554
555        let mut result = match (&req.method, bucket, key.as_deref()) {
556            // ListBuckets: GET /
557            (&Method::GET, None, None) => {
558                if req.query_params.get("x-id").map(|s| s.as_str()) == Some("ListDirectoryBuckets")
559                {
560                    self.list_directory_buckets(account_id, &req)
561                } else {
562                    self.list_buckets(account_id, &req)
563                }
564            }
565
566            // Bucket-level operations (no key)
567            (&Method::PUT, Some(b), None) => {
568                if req.query_params.contains_key("tagging") {
569                    self.put_bucket_tagging(account_id, &req, b)
570                } else if req.query_params.contains_key("acl") {
571                    self.put_bucket_acl(account_id, &req, b)
572                } else if req.query_params.contains_key("versioning") {
573                    self.put_bucket_versioning(account_id, &req, b)
574                } else if req.query_params.contains_key("cors") {
575                    self.put_bucket_cors(account_id, &req, b)
576                } else if req.query_params.contains_key("notification") {
577                    self.put_bucket_notification(account_id, &req, b)
578                } else if req.query_params.contains_key("website") {
579                    self.put_bucket_website(account_id, &req, b)
580                } else if req.query_params.contains_key("accelerate") {
581                    self.put_bucket_accelerate(account_id, &req, b)
582                } else if req.query_params.contains_key("publicAccessBlock") {
583                    self.put_public_access_block(account_id, &req, b)
584                } else if req.query_params.contains_key("encryption") {
585                    self.put_bucket_encryption(account_id, &req, b)
586                } else if req.query_params.contains_key("lifecycle") {
587                    self.put_bucket_lifecycle(account_id, &req, b)
588                } else if req.query_params.contains_key("logging") {
589                    self.put_bucket_logging(account_id, &req, b)
590                } else if req.query_params.contains_key("policy") {
591                    self.put_bucket_policy(account_id, &req, b)
592                } else if req.query_params.contains_key("object-lock") {
593                    self.put_object_lock_config(account_id, &req, b)
594                } else if req.query_params.contains_key("replication") {
595                    self.put_bucket_replication(account_id, &req, b)
596                } else if req.query_params.contains_key("ownershipControls") {
597                    self.put_bucket_ownership_controls(account_id, &req, b)
598                } else if req.query_params.contains_key("inventory") {
599                    self.put_bucket_inventory(account_id, &req, b)
600                } else if req.query_params.contains_key("analytics") {
601                    self.put_bucket_analytics_config(account_id, &req, b)
602                } else if req.query_params.contains_key("intelligent-tiering") {
603                    self.put_bucket_intelligent_tiering_config(account_id, &req, b)
604                } else if req.query_params.contains_key("metrics") {
605                    self.put_bucket_metrics_config(account_id, &req, b)
606                } else if req.query_params.contains_key("requestPayment") {
607                    self.put_bucket_request_payment(account_id, &req, b)
608                } else if req.query_params.contains_key("abac") {
609                    self.put_bucket_abac(account_id, &req, b)
610                } else if req.query_params.contains_key("metadataInventoryTable") {
611                    self.update_bucket_metadata_inventory_table(account_id, &req, b)
612                } else if req.query_params.contains_key("metadataJournalTable") {
613                    self.update_bucket_metadata_journal_table(account_id, &req, b)
614                } else {
615                    self.create_bucket(account_id, &req, b)
616                }
617            }
618            (&Method::DELETE, Some(b), None) => {
619                if req.query_params.contains_key("tagging") {
620                    self.delete_bucket_tagging(account_id, &req, b)
621                } else if req.query_params.contains_key("cors") {
622                    self.delete_bucket_cors(account_id, b)
623                } else if req.query_params.contains_key("website") {
624                    self.delete_bucket_website(account_id, b)
625                } else if req.query_params.contains_key("publicAccessBlock") {
626                    self.delete_public_access_block(account_id, b)
627                } else if req.query_params.contains_key("encryption") {
628                    self.delete_bucket_encryption(account_id, b)
629                } else if req.query_params.contains_key("lifecycle") {
630                    self.delete_bucket_lifecycle(account_id, b)
631                } else if req.query_params.contains_key("policy") {
632                    self.delete_bucket_policy(account_id, b)
633                } else if req.query_params.contains_key("replication") {
634                    self.delete_bucket_replication(account_id, b)
635                } else if req.query_params.contains_key("ownershipControls") {
636                    self.delete_bucket_ownership_controls(account_id, b)
637                } else if req.query_params.contains_key("inventory") {
638                    self.delete_bucket_inventory(account_id, &req, b)
639                } else if req.query_params.contains_key("analytics") {
640                    self.delete_bucket_analytics_config(account_id, &req, b)
641                } else if req.query_params.contains_key("intelligent-tiering") {
642                    self.delete_bucket_intelligent_tiering_config(account_id, &req, b)
643                } else if req.query_params.contains_key("metrics") {
644                    self.delete_bucket_metrics_config(account_id, &req, b)
645                } else if req.query_params.contains_key("metadataConfiguration") {
646                    self.delete_bucket_metadata_config(account_id, b)
647                } else if req.query_params.contains_key("metadataTable") {
648                    self.delete_bucket_metadata_table_config(account_id, b)
649                } else {
650                    self.delete_bucket(account_id, &req, b)
651                }
652            }
653            (&Method::HEAD, Some(b), None) => self.head_bucket(account_id, b),
654            (&Method::GET, Some(b), None) => {
655                if req.query_params.contains_key("tagging") {
656                    self.get_bucket_tagging(account_id, &req, b)
657                } else if req.query_params.contains_key("location") {
658                    self.get_bucket_location(account_id, b)
659                } else if req.query_params.contains_key("acl") {
660                    self.get_bucket_acl(account_id, &req, b)
661                } else if req.query_params.contains_key("versioning") {
662                    self.get_bucket_versioning(account_id, b)
663                } else if req.query_params.contains_key("versions") {
664                    self.list_object_versions(account_id, &req, b)
665                } else if req.query_params.contains_key("object-lock") {
666                    self.get_object_lock_configuration(account_id, b)
667                } else if req.query_params.contains_key("cors") {
668                    self.get_bucket_cors(account_id, b)
669                } else if req.query_params.contains_key("notification") {
670                    self.get_bucket_notification(account_id, b)
671                } else if req.query_params.contains_key("website") {
672                    self.get_bucket_website(account_id, b)
673                } else if req.query_params.contains_key("accelerate") {
674                    self.get_bucket_accelerate(account_id, b)
675                } else if req.query_params.contains_key("publicAccessBlock") {
676                    self.get_public_access_block(account_id, b)
677                } else if req.query_params.contains_key("encryption") {
678                    self.get_bucket_encryption(account_id, b)
679                } else if req.query_params.contains_key("lifecycle") {
680                    self.get_bucket_lifecycle(account_id, b)
681                } else if req.query_params.contains_key("logging") {
682                    self.get_bucket_logging(account_id, b)
683                } else if req.query_params.contains_key("policy") {
684                    self.get_bucket_policy(account_id, b)
685                } else if req.query_params.contains_key("replication") {
686                    self.get_bucket_replication(account_id, b)
687                } else if req.query_params.contains_key("ownershipControls") {
688                    self.get_bucket_ownership_controls(account_id, b)
689                } else if req.query_params.contains_key("inventory") {
690                    if req.query_params.contains_key("id") {
691                        self.get_bucket_inventory(account_id, &req, b)
692                    } else {
693                        self.list_bucket_inventory_configurations(account_id, b)
694                    }
695                } else if req.query_params.contains_key("analytics") {
696                    if req.query_params.contains_key("id") {
697                        self.get_bucket_analytics_config(account_id, &req, b)
698                    } else {
699                        self.list_bucket_analytics_configurations(account_id, b)
700                    }
701                } else if req.query_params.contains_key("intelligent-tiering") {
702                    if req.query_params.contains_key("id") {
703                        self.get_bucket_intelligent_tiering_config(account_id, &req, b)
704                    } else {
705                        self.list_bucket_intelligent_tiering_configurations(account_id, b)
706                    }
707                } else if req.query_params.contains_key("metrics") {
708                    if req.query_params.contains_key("id") {
709                        self.get_bucket_metrics_config(account_id, &req, b)
710                    } else {
711                        self.list_bucket_metrics_configurations(account_id, b)
712                    }
713                } else if req.query_params.contains_key("requestPayment") {
714                    self.get_bucket_request_payment(account_id, b)
715                } else if req.query_params.contains_key("abac") {
716                    self.get_bucket_abac(account_id, b)
717                } else if req.query_params.contains_key("policyStatus") {
718                    self.get_bucket_policy_status(account_id, b)
719                } else if req.query_params.contains_key("metadataConfiguration") {
720                    self.get_bucket_metadata_config(account_id, b)
721                } else if req.query_params.contains_key("metadataTable") {
722                    self.get_bucket_metadata_table_config(account_id, b)
723                } else if req.query_params.contains_key("session") {
724                    self.create_session(account_id, &req, b)
725                } else if req.query_params.get("list-type").map(|s| s.as_str()) == Some("2") {
726                    self.list_objects_v2(account_id, &req, b)
727                } else if req.query_params.is_empty() {
728                    // If bucket has website config and no query params, serve index document
729                    let website_config = {
730                        let accounts = self.state.read();
731                        let _empty_s3 = crate::state::S3State::new(&req.account_id, &req.region);
732                        let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
733                        state
734                            .buckets
735                            .get(b)
736                            .and_then(|bkt| bkt.website_config.clone())
737                    };
738                    if let Some(ref config) = website_config {
739                        if let Some(index_doc) = extract_xml_value(config, "Suffix").or_else(|| {
740                            extract_xml_value(config, "IndexDocument").and_then(|inner| {
741                                let open = "<Suffix>";
742                                let close = "</Suffix>";
743                                let s = inner.find(open)? + open.len();
744                                let e = inner.find(close)?;
745                                Some(inner[s..e].trim().to_string())
746                            })
747                        }) {
748                            self.serve_website_object(account_id, &req, b, &index_doc, config)
749                        } else {
750                            self.list_objects_v1(account_id, &req, b)
751                        }
752                    } else {
753                        self.list_objects_v1(account_id, &req, b)
754                    }
755                } else {
756                    self.list_objects_v1(account_id, &req, b)
757                }
758            }
759
760            // Object-level operations
761            (&Method::PUT, Some(b), Some(k)) => {
762                if req.query_params.contains_key("tagging") {
763                    self.put_object_tagging(account_id, &req, b, k)
764                } else if req.query_params.contains_key("acl") {
765                    self.put_object_acl(account_id, &req, b, k)
766                } else if req.query_params.contains_key("retention") {
767                    self.put_object_retention(account_id, &req, b, k)
768                } else if req.query_params.contains_key("legal-hold") {
769                    self.put_object_legal_hold(account_id, &req, b, k)
770                } else if req.query_params.contains_key("renameObject") {
771                    self.rename_object(account_id, &req, b, k)
772                } else if req.query_params.contains_key("encryption") {
773                    self.update_object_encryption(account_id, &req, b, k)
774                } else if req.headers.contains_key("x-amz-copy-source") {
775                    self.copy_object(account_id, &req, b, k)
776                } else {
777                    self.put_object(account_id, &req, b, k).await
778                }
779            }
780            (&Method::GET, Some(b), Some(k)) => {
781                if req.query_params.contains_key("tagging") {
782                    self.get_object_tagging(account_id, &req, b, k)
783                } else if req.query_params.contains_key("acl") {
784                    self.get_object_acl(account_id, &req, b, k)
785                } else if req.query_params.contains_key("retention") {
786                    self.get_object_retention(account_id, &req, b, k)
787                } else if req.query_params.contains_key("legal-hold") {
788                    self.get_object_legal_hold(account_id, &req, b, k)
789                } else if req.query_params.contains_key("attributes") {
790                    self.get_object_attributes(account_id, &req, b, k)
791                } else if req.query_params.contains_key("torrent") {
792                    self.get_object_torrent(account_id, &req, b, k)
793                } else {
794                    let result = self.get_object(account_id, &req, b, k);
795                    // If object not found and bucket has website config, serve error document
796                    let is_not_found = matches!(
797                        &result,
798                        Err(e) if e.code() == "NoSuchKey"
799                    );
800                    if is_not_found {
801                        let website_config = {
802                            let accounts = self.state.read();
803                            let _empty_s3 =
804                                crate::state::S3State::new(&req.account_id, &req.region);
805                            let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
806                            state
807                                .buckets
808                                .get(b)
809                                .and_then(|bkt| bkt.website_config.clone())
810                        };
811                        if let Some(ref config) = website_config {
812                            if let Some(error_key) = extract_xml_value(config, "ErrorDocument")
813                                .and_then(|inner| {
814                                    let open = "<Key>";
815                                    let close = "</Key>";
816                                    let s = inner.find(open)? + open.len();
817                                    let e = inner.find(close)?;
818                                    Some(inner[s..e].trim().to_string())
819                                })
820                                .or_else(|| extract_xml_value(config, "Key"))
821                            {
822                                return self.serve_website_error(account_id, &req, b, &error_key);
823                            }
824                        }
825                    }
826                    result
827                }
828            }
829            (&Method::DELETE, Some(b), Some(k)) => {
830                if req.query_params.contains_key("tagging") {
831                    self.delete_object_tagging(account_id, b, k)
832                } else {
833                    self.delete_object(account_id, &req, b, k)
834                }
835            }
836            (&Method::HEAD, Some(b), Some(k)) => self.head_object(account_id, &req, b, k),
837
838            // POST /{bucket}?delete — batch delete
839            (&Method::POST, Some(b), None) if req.query_params.contains_key("delete") => {
840                self.delete_objects(account_id, &req, b)
841            }
842            (&Method::POST, Some(b), None)
843                if req.query_params.contains_key("metadataConfiguration") =>
844            {
845                self.create_bucket_metadata_config(account_id, &req, b)
846            }
847            (&Method::POST, Some(b), None) if req.query_params.contains_key("metadataTable") => {
848                self.create_bucket_metadata_table_config(account_id, &req, b)
849            }
850            (&Method::POST, Some(b), Some(k))
851                if req.query_params.get("select-type").map(|s| s.as_str()) == Some("2") =>
852            {
853                self.select_object_content(account_id, &req, b, k)
854            }
855            (&Method::POST, Some("WriteGetObjectResponse"), None) => {
856                self.write_get_object_response(account_id, &req)
857            }
858
859            _ => Err(AwsServiceError::aws_error(
860                StatusCode::METHOD_NOT_ALLOWED,
861                "MethodNotAllowed",
862                "The specified method is not allowed against this resource",
863            )),
864        };
865
866        // Apply CORS headers to the response if Origin was present
867        if let (Some(ref origin), Some(b_name)) = (&origin_header, bucket) {
868            let cors_config = {
869                let accounts = self.state.read();
870                let _empty_s3 = crate::state::S3State::new(&req.account_id, &req.region);
871                let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
872                state
873                    .buckets
874                    .get(b_name)
875                    .and_then(|b| b.cors_config.clone())
876            };
877            if let Some(ref config) = cors_config {
878                let rules = parse_cors_config(config);
879                if let Some(rule) = find_cors_rule(&rules, origin, None) {
880                    if let Ok(ref mut resp) = result {
881                        let matched_origin = if rule.allowed_origins.contains(&"*".to_string()) {
882                            "*"
883                        } else {
884                            origin
885                        };
886                        resp.headers.insert(
887                            "access-control-allow-origin",
888                            matched_origin
889                                .parse()
890                                .unwrap_or_else(|_| http::HeaderValue::from_static("")),
891                        );
892                        if !rule.expose_headers.is_empty() {
893                            resp.headers.insert(
894                                "access-control-expose-headers",
895                                rule.expose_headers
896                                    .join(", ")
897                                    .parse()
898                                    .unwrap_or_else(|_| http::HeaderValue::from_static("")),
899                            );
900                        }
901                    }
902                }
903            }
904        }
905
906        // Write S3 access log entry if the source bucket has logging enabled
907        if let Some(b_name) = bucket {
908            let status_code = match &result {
909                Ok(resp) => resp.status.as_u16(),
910                Err(e) => e.status().as_u16(),
911            };
912            let op = logging::operation_name(&req.method, key.as_deref());
913            logging::maybe_write_access_log(
914                &self.state,
915                &self.store,
916                b_name,
917                &logging::AccessLogRequest {
918                    operation: op,
919                    key: key.as_deref(),
920                    status: status_code,
921                    request_id: &req.request_id,
922                    method: req.method.as_str(),
923                    path: &req.raw_path,
924                },
925            );
926        }
927
928        result
929    }
930
931    fn supported_actions(&self) -> &[&str] {
932        &[
933            // Buckets
934            "ListBuckets",
935            "CreateBucket",
936            "DeleteBucket",
937            "HeadBucket",
938            "GetBucketLocation",
939            // Objects
940            "PutObject",
941            "GetObject",
942            "DeleteObject",
943            "HeadObject",
944            "CopyObject",
945            "DeleteObjects",
946            "ListObjectsV2",
947            "ListObjects",
948            "ListObjectVersions",
949            "GetObjectAttributes",
950            "RestoreObject",
951            // Object properties
952            "PutObjectTagging",
953            "GetObjectTagging",
954            "DeleteObjectTagging",
955            "PutObjectAcl",
956            "GetObjectAcl",
957            "PutObjectRetention",
958            "GetObjectRetention",
959            "PutObjectLegalHold",
960            "GetObjectLegalHold",
961            // Bucket configuration
962            "PutBucketTagging",
963            "GetBucketTagging",
964            "DeleteBucketTagging",
965            "PutBucketAcl",
966            "GetBucketAcl",
967            "PutBucketVersioning",
968            "GetBucketVersioning",
969            "PutBucketCors",
970            "GetBucketCors",
971            "DeleteBucketCors",
972            "PutBucketNotificationConfiguration",
973            "GetBucketNotificationConfiguration",
974            "PutBucketWebsite",
975            "GetBucketWebsite",
976            "DeleteBucketWebsite",
977            "PutBucketAccelerateConfiguration",
978            "GetBucketAccelerateConfiguration",
979            "PutPublicAccessBlock",
980            "GetPublicAccessBlock",
981            "DeletePublicAccessBlock",
982            "PutBucketEncryption",
983            "GetBucketEncryption",
984            "DeleteBucketEncryption",
985            "PutBucketLifecycleConfiguration",
986            "GetBucketLifecycleConfiguration",
987            "DeleteBucketLifecycle",
988            "PutBucketLogging",
989            "GetBucketLogging",
990            "PutBucketPolicy",
991            "GetBucketPolicy",
992            "DeleteBucketPolicy",
993            "PutObjectLockConfiguration",
994            "GetObjectLockConfiguration",
995            "PutBucketReplication",
996            "GetBucketReplication",
997            "DeleteBucketReplication",
998            "PutBucketOwnershipControls",
999            "GetBucketOwnershipControls",
1000            "DeleteBucketOwnershipControls",
1001            "PutBucketInventoryConfiguration",
1002            "GetBucketInventoryConfiguration",
1003            "DeleteBucketInventoryConfiguration",
1004            "ListBucketInventoryConfigurations",
1005            "PutBucketAnalyticsConfiguration",
1006            "GetBucketAnalyticsConfiguration",
1007            "DeleteBucketAnalyticsConfiguration",
1008            "ListBucketAnalyticsConfigurations",
1009            "PutBucketIntelligentTieringConfiguration",
1010            "GetBucketIntelligentTieringConfiguration",
1011            "DeleteBucketIntelligentTieringConfiguration",
1012            "ListBucketIntelligentTieringConfigurations",
1013            "PutBucketMetricsConfiguration",
1014            "GetBucketMetricsConfiguration",
1015            "DeleteBucketMetricsConfiguration",
1016            "ListBucketMetricsConfigurations",
1017            "PutBucketRequestPayment",
1018            "GetBucketRequestPayment",
1019            "PutBucketAbac",
1020            "GetBucketAbac",
1021            "GetBucketPolicyStatus",
1022            "CreateBucketMetadataConfiguration",
1023            "GetBucketMetadataConfiguration",
1024            "DeleteBucketMetadataConfiguration",
1025            "CreateBucketMetadataTableConfiguration",
1026            "GetBucketMetadataTableConfiguration",
1027            "DeleteBucketMetadataTableConfiguration",
1028            "UpdateBucketMetadataInventoryTableConfiguration",
1029            "UpdateBucketMetadataJournalTableConfiguration",
1030            "GetObjectTorrent",
1031            "RenameObject",
1032            "SelectObjectContent",
1033            "UpdateObjectEncryption",
1034            "WriteGetObjectResponse",
1035            "ListDirectoryBuckets",
1036            "CreateSession",
1037            // Multipart uploads
1038            "CreateMultipartUpload",
1039            "UploadPart",
1040            "UploadPartCopy",
1041            "CompleteMultipartUpload",
1042            "AbortMultipartUpload",
1043            "ListParts",
1044            "ListMultipartUploads",
1045        ]
1046    }
1047
1048    fn iam_enforceable(&self) -> bool {
1049        true
1050    }
1051
1052    /// S3 resources are either:
1053    /// - Bucket ARN (`arn:aws:s3:::bucket`) for bucket-level actions
1054    /// - Object ARN (`arn:aws:s3:::bucket/key`) for object-level actions
1055    /// - Wildcard (`*`) for `ListBuckets` which doesn't target a specific
1056    ///   resource
1057    ///
1058    /// S3 ARNs notably omit the account id and region — this is the one
1059    /// AWS service that carries neither in its ARN, because bucket names
1060    /// are globally unique.
1061    fn iam_action_for(&self, request: &AwsRequest) -> Option<fakecloud_core::auth::IamAction> {
1062        // S3 doesn't set `request.action` — the handler dispatches on
1063        // method + path + query params directly. Re-derive the action
1064        // name here so enforcement can match against IAM policies the
1065        // same way the real service would.
1066        let bucket = request.path_segments.first().map(|s| s.as_str());
1067        let key = if request.path_segments.len() > 1 {
1068            Some(request.path_segments[1..].join("/"))
1069        } else {
1070            None
1071        };
1072        let action = s3_detect_action(
1073            request.method.as_str(),
1074            bucket,
1075            key.as_deref(),
1076            &request.query_params,
1077        )?;
1078        // A handful of S3-adjacent ops are authorized under sibling IAM
1079        // service prefixes, not `s3:`. Select the right prefix so a policy
1080        // targeting the real action string actually matches.
1081        let service = match action {
1082            "CreateSession" => "s3express",
1083            "WriteGetObjectResponse" => "s3-object-lambda",
1084            _ => "s3",
1085        };
1086        let resource = s3_resource_for(action, bucket, key.as_deref());
1087        Some(fakecloud_core::auth::IamAction {
1088            service,
1089            action,
1090            resource,
1091        })
1092    }
1093
1094    fn iam_condition_keys_for(
1095        &self,
1096        request: &AwsRequest,
1097        action: &fakecloud_core::auth::IamAction,
1098    ) -> std::collections::BTreeMap<String, Vec<String>> {
1099        s3_condition_keys(action.action, &request.query_params)
1100    }
1101
1102    fn resource_tags_for(
1103        &self,
1104        resource_arn: &str,
1105    ) -> Option<std::collections::HashMap<String, String>> {
1106        s3_resource_tags(&self.state, resource_arn)
1107            .map(|m| m.into_iter().collect::<std::collections::HashMap<_, _>>())
1108    }
1109
1110    fn request_tags_from(
1111        &self,
1112        request: &AwsRequest,
1113        action: &str,
1114    ) -> Option<std::collections::HashMap<String, String>> {
1115        s3_request_tags(request, action)
1116            .map(|m| m.into_iter().collect::<std::collections::HashMap<_, _>>())
1117    }
1118}
1119
/// Extract service-specific IAM condition keys from an S3 request.
///
/// Today only `ListObjects` / `ListObjectsV2` expose keys (`s3:prefix`,
/// `s3:delimiter`, `s3:max-keys`) via their query params. Other actions
/// return an empty map so the evaluator's safe-fail semantics treat any
/// policy condition referencing an unknown key as "doesn't apply".
///
/// Each condition key is present only when the matching query parameter
/// is, and carries that parameter's value as a single-element vector.
fn s3_condition_keys(
    action: &str,
    query: &std::collections::HashMap<String, String>,
) -> std::collections::BTreeMap<String, Vec<String>> {
    // (query parameter, IAM condition key) — both list variants share the
    // same query param shape.
    const LIST_PARAMS: [(&str, &str); 3] = [
        ("prefix", "s3:prefix"),
        ("delimiter", "s3:delimiter"),
        ("max-keys", "s3:max-keys"),
    ];

    if action != "ListObjects" && action != "ListObjectsV2" {
        return std::collections::BTreeMap::new();
    }

    LIST_PARAMS
        .iter()
        .filter_map(|&(param, condition_key)| {
            query
                .get(param)
                .map(|value| (condition_key.to_string(), vec![value.clone()]))
        })
        .collect()
}
1145
1146/// Look up resource tags for an S3 ARN.
1147///
1148/// Bucket-level ARN (`arn:aws:s3:::bucket`) -> bucket tags.
1149/// Object-level ARN (`arn:aws:s3:::bucket/key`) -> object tags.
1150/// `*` (ListBuckets) -> `Some(empty)` (no resource to tag).
1151fn s3_resource_tags(
1152    state: &SharedS3State,
1153    resource_arn: &str,
1154) -> Option<std::collections::BTreeMap<String, String>> {
1155    if resource_arn == "*" {
1156        return Some(std::collections::BTreeMap::new());
1157    }
1158    // S3 ARNs: arn:aws:s3:::bucket or arn:aws:s3:::bucket/key
1159    let after_prefix = resource_arn.strip_prefix("arn:aws:s3:::")?;
1160    let mas = state.read();
1161    // S3 bucket names are globally unique; scan all accounts to find the bucket
1162    let bucket_name = after_prefix.split('/').next().unwrap_or(after_prefix);
1163    let state = mas
1164        .find_account(|s| s.buckets.contains_key(bucket_name))
1165        .and_then(|id| mas.get(id))
1166        .or_else(|| Some(mas.default_ref()))?;
1167    if let Some(slash_pos) = after_prefix.find('/') {
1168        // Object-level: bucket/key
1169        let bucket_name = &after_prefix[..slash_pos];
1170        let key = &after_prefix[slash_pos + 1..];
1171        let bucket = state.buckets.get(bucket_name)?;
1172        // Try current objects first, then versioned objects (latest version)
1173        if let Some(obj) = bucket.objects.get(key) {
1174            Some(obj.tags.clone())
1175        } else if let Some(versions) = bucket.object_versions.get(key) {
1176            versions.last().map(|v| v.tags.clone())
1177        } else {
1178            // Object doesn't exist yet (e.g. PutObject creating it)
1179            Some(std::collections::BTreeMap::new())
1180        }
1181    } else {
1182        // Bucket-level
1183        let bucket = state.buckets.get(after_prefix)?;
1184        Some(bucket.tags.clone())
1185    }
1186}
1187
1188/// Extract tags from an S3 request body/headers.
1189///
1190/// S3 sends tags via:
1191/// - `x-amz-tagging` header (URL-encoded `key=value&...`) on PutObject
1192/// - XML body on PutBucketTagging / PutObjectTagging
1193fn s3_request_tags(
1194    request: &AwsRequest,
1195    action: &str,
1196) -> Option<std::collections::BTreeMap<String, String>> {
1197    match action {
1198        "PutObject" | "CopyObject" | "CreateMultipartUpload" => {
1199            // Tags come via x-amz-tagging header
1200            if let Some(tagging) = request.headers.get("x-amz-tagging") {
1201                let tags = parse_url_encoded_tags(tagging.to_str().unwrap_or(""));
1202                Some(tags.into_iter().collect())
1203            } else {
1204                Some(std::collections::BTreeMap::new())
1205            }
1206        }
1207        "PutBucketTagging" | "PutObjectTagging" => {
1208            // Tags come in XML body
1209            let body = std::str::from_utf8(&request.body).unwrap_or("");
1210            let tags = parse_tagging_xml(body);
1211            Some(tags.into_iter().collect())
1212        }
1213        _ => Some(std::collections::BTreeMap::new()),
1214    }
1215}
1216
1217/// Derive the IAM action name from an S3 REST request. Handles the
1218/// common cases (GetObject, PutObject, DeleteObject, ListObjectsV2,
1219/// CreateBucket, ...) plus a subset of sub-resource operations
1220/// (`?acl`, `?tagging`, `?versioning`, `?policy`, `?cors`, `?website`,
1221/// `?lifecycle`, `?encryption`, `?logging`, `?notification`, `?replication`,
1222/// `?ownershipControls`, `?publicAccessBlock`, `?accelerate`, `?inventory`,
1223/// `?object-lock`, `?uploads`, `?uploadId`).
1224///
1225/// Returns `None` for requests that don't map to a known action — the
1226/// dispatch layer then skips enforcement for that request rather than
1227/// guessing (and a warn log fires via the "service is iam_enforceable
1228/// but has no mapping" branch in dispatch.rs).
1229fn s3_detect_action(
1230    method: &str,
1231    bucket: Option<&str>,
1232    key: Option<&str>,
1233    query: &std::collections::HashMap<String, String>,
1234) -> Option<&'static str> {
1235    let has = |q: &str| query.contains_key(q);
1236    let is_get = method == "GET";
1237    let is_put = method == "PUT";
1238    let is_post = method == "POST";
1239    let is_delete = method == "DELETE";
1240
1241    // Service root
1242    if bucket.is_none() {
1243        return match method {
1244            "GET" => Some("ListBuckets"),
1245            _ => None,
1246        };
1247    }
1248
1249    // WriteGetObjectResponse is an Object Lambda data-plane op routed as
1250    // `POST /WriteGetObjectResponse` (the literal value occupies the bucket
1251    // segment). AWS authorizes it under the separate
1252    // `s3-object-lambda:WriteGetObjectResponse` action — handled in
1253    // `iam_action_for`, which selects the `s3-object-lambda` service prefix
1254    // for this op. Previously unmapped -> enforcement skipped.
1255    if is_post && bucket == Some("WriteGetObjectResponse") && key.is_none() {
1256        return Some("WriteGetObjectResponse");
1257    }
1258
1259    let has_key = key.is_some();
1260
1261    // Multipart sub-resource forms
1262    if has_key && is_post && has("uploads") {
1263        return Some("CreateMultipartUpload");
1264    }
1265    if has_key && is_post && has("uploadId") {
1266        return Some("CompleteMultipartUpload");
1267    }
1268    if has_key && is_put && has("partNumber") && has("uploadId") {
1269        return Some("UploadPart");
1270    }
1271    if has_key && is_delete && has("uploadId") {
1272        return Some("AbortMultipartUpload");
1273    }
1274    if has_key && is_get && has("uploadId") {
1275        return Some("ListParts");
1276    }
1277    if !has_key && is_get && has("uploads") {
1278        return Some("ListMultipartUploads");
1279    }
1280
1281    // Sub-resource-keyed actions (?acl, ?tagging, ...). Order matters
1282    // since a request can carry multiple; we pick the most specific.
1283    // Object-level sub-resources come first (key present).
1284    if has_key {
1285        if has("tagging") {
1286            return Some(match method {
1287                "GET" => "GetObjectTagging",
1288                "PUT" => "PutObjectTagging",
1289                "DELETE" => "DeleteObjectTagging",
1290                _ => return None,
1291            });
1292        }
1293        if has("acl") {
1294            return Some(match method {
1295                "GET" => "GetObjectAcl",
1296                "PUT" => "PutObjectAcl",
1297                _ => return None,
1298            });
1299        }
1300        if has("retention") {
1301            return Some(match method {
1302                "GET" => "GetObjectRetention",
1303                "PUT" => "PutObjectRetention",
1304                _ => return None,
1305            });
1306        }
1307        if has("legal-hold") {
1308            return Some(match method {
1309                "GET" => "GetObjectLegalHold",
1310                "PUT" => "PutObjectLegalHold",
1311                _ => return None,
1312            });
1313        }
1314        // Identified by cubic on PR #399: both ?attributes and ?restore
1315        // need method guards — otherwise e.g. GET /bucket/key?restore
1316        // would be classified as RestoreObject (POST-only in AWS) and
1317        // IAM-evaluated against s3:RestoreObject instead of s3:GetObject.
1318        if has("attributes") && is_get {
1319            return Some("GetObjectAttributes");
1320        }
1321        if has("restore") && is_post {
1322            return Some("RestoreObject");
1323        }
1324        // RenameObject is an object-level op (`PUT /bucket/newkey?renameObject`
1325        // + `x-amz-rename-source`). It carries a key, so it must be detected
1326        // here — the `!has_key` arm below never sees it. Without this it fell
1327        // through to `PutObject`, so a policy denying `s3:RenameObject` was
1328        // silently ineffective.
1329        if has("renameObject") && is_put {
1330            return Some("RenameObject");
1331        }
1332        // UpdateObjectEncryption (`PUT /bucket/key?encryption`) changes an
1333        // existing object's server-side encryption. AWS gates this under
1334        // `s3:PutObject` (encryption is a write attribute), and there is no
1335        // distinct public `s3:UpdateObjectEncryption` IAM action — so map to
1336        // PutObject deliberately rather than letting it fall through.
1337        if has("encryption") && is_put {
1338            return Some("PutObject");
1339        }
1340        // SelectObjectContent (`POST /bucket/key?select&select-type=2`) reads
1341        // object data, so AWS authorizes it with `s3:GetObject`. Previously
1342        // unmapped -> `_ => None` -> enforcement skipped entirely.
1343        if has("select-type") && is_post {
1344            return Some("GetObject");
1345        }
1346    }
1347
1348    // Bucket-level sub-resources (key absent).
1349    if !has_key {
1350        if has("tagging") {
1351            return Some(match method {
1352                "GET" => "GetBucketTagging",
1353                "PUT" => "PutBucketTagging",
1354                "DELETE" => "DeleteBucketTagging",
1355                _ => return None,
1356            });
1357        }
1358        if has("acl") {
1359            return Some(match method {
1360                "GET" => "GetBucketAcl",
1361                "PUT" => "PutBucketAcl",
1362                _ => return None,
1363            });
1364        }
1365        if has("versioning") {
1366            return Some(match method {
1367                "GET" => "GetBucketVersioning",
1368                "PUT" => "PutBucketVersioning",
1369                _ => return None,
1370            });
1371        }
1372        if has("cors") {
1373            return Some(match method {
1374                "GET" => "GetBucketCors",
1375                "PUT" => "PutBucketCors",
1376                "DELETE" => "DeleteBucketCors",
1377                _ => return None,
1378            });
1379        }
1380        if has("policy") {
1381            return Some(match method {
1382                "GET" => "GetBucketPolicy",
1383                "PUT" => "PutBucketPolicy",
1384                "DELETE" => "DeleteBucketPolicy",
1385                _ => return None,
1386            });
1387        }
1388        if has("website") {
1389            return Some(match method {
1390                "GET" => "GetBucketWebsite",
1391                "PUT" => "PutBucketWebsite",
1392                "DELETE" => "DeleteBucketWebsite",
1393                _ => return None,
1394            });
1395        }
1396        if has("lifecycle") {
1397            return Some(match method {
1398                "GET" => "GetBucketLifecycleConfiguration",
1399                "PUT" => "PutBucketLifecycleConfiguration",
1400                "DELETE" => "DeleteBucketLifecycle",
1401                _ => return None,
1402            });
1403        }
1404        if has("encryption") {
1405            return Some(match method {
1406                "GET" => "GetBucketEncryption",
1407                "PUT" => "PutBucketEncryption",
1408                "DELETE" => "DeleteBucketEncryption",
1409                _ => return None,
1410            });
1411        }
1412        if has("logging") {
1413            return Some(match method {
1414                "GET" => "GetBucketLogging",
1415                "PUT" => "PutBucketLogging",
1416                _ => return None,
1417            });
1418        }
1419        if has("notification") {
1420            return Some(match method {
1421                "GET" => "GetBucketNotificationConfiguration",
1422                "PUT" => "PutBucketNotificationConfiguration",
1423                _ => return None,
1424            });
1425        }
1426        if has("replication") {
1427            return Some(match method {
1428                "GET" => "GetBucketReplication",
1429                "PUT" => "PutBucketReplication",
1430                "DELETE" => "DeleteBucketReplication",
1431                _ => return None,
1432            });
1433        }
1434        if has("ownershipControls") {
1435            return Some(match method {
1436                "GET" => "GetBucketOwnershipControls",
1437                "PUT" => "PutBucketOwnershipControls",
1438                "DELETE" => "DeleteBucketOwnershipControls",
1439                _ => return None,
1440            });
1441        }
1442        if has("publicAccessBlock") {
1443            return Some(match method {
1444                "GET" => "GetPublicAccessBlock",
1445                "PUT" => "PutPublicAccessBlock",
1446                "DELETE" => "DeletePublicAccessBlock",
1447                _ => return None,
1448            });
1449        }
1450        if has("accelerate") {
1451            return Some(match method {
1452                "GET" => "GetBucketAccelerateConfiguration",
1453                "PUT" => "PutBucketAccelerateConfiguration",
1454                _ => return None,
1455            });
1456        }
1457        if has("inventory") {
1458            return Some(match method {
1459                "GET" => "GetBucketInventoryConfiguration",
1460                "PUT" => "PutBucketInventoryConfiguration",
1461                "DELETE" => "DeleteBucketInventoryConfiguration",
1462                _ => return None,
1463            });
1464        }
1465        if has("analytics") {
1466            return Some(match method {
1467                "GET" if has("id") => "GetBucketAnalyticsConfiguration",
1468                "GET" => "ListBucketAnalyticsConfigurations",
1469                "PUT" => "PutBucketAnalyticsConfiguration",
1470                "DELETE" => "DeleteBucketAnalyticsConfiguration",
1471                _ => return None,
1472            });
1473        }
1474        if has("intelligent-tiering") {
1475            return Some(match method {
1476                "GET" if has("id") => "GetBucketIntelligentTieringConfiguration",
1477                "GET" => "ListBucketIntelligentTieringConfigurations",
1478                "PUT" => "PutBucketIntelligentTieringConfiguration",
1479                "DELETE" => "DeleteBucketIntelligentTieringConfiguration",
1480                _ => return None,
1481            });
1482        }
1483        if has("metrics") {
1484            return Some(match method {
1485                "GET" if has("id") => "GetBucketMetricsConfiguration",
1486                "GET" => "ListBucketMetricsConfigurations",
1487                "PUT" => "PutBucketMetricsConfiguration",
1488                "DELETE" => "DeleteBucketMetricsConfiguration",
1489                _ => return None,
1490            });
1491        }
1492        if has("requestPayment") {
1493            return Some(match method {
1494                "GET" => "GetBucketRequestPayment",
1495                "PUT" => "PutBucketRequestPayment",
1496                _ => return None,
1497            });
1498        }
1499        if has("policyStatus") && is_get {
1500            return Some("GetBucketPolicyStatus");
1501        }
1502        if has("metadataConfiguration") {
1503            return Some(match method {
1504                "GET" => "GetBucketMetadataConfiguration",
1505                "POST" => "CreateBucketMetadataConfiguration",
1506                "DELETE" => "DeleteBucketMetadataConfiguration",
1507                _ => return None,
1508            });
1509        }
1510        if has("metadataTable") {
1511            return Some(match method {
1512                "GET" => "GetBucketMetadataTableConfiguration",
1513                "POST" => "CreateBucketMetadataTableConfiguration",
1514                "DELETE" => "DeleteBucketMetadataTableConfiguration",
1515                _ => return None,
1516            });
1517        }
1518        if has("metadataInventoryTable") && is_put {
1519            return Some("UpdateBucketMetadataInventoryTableConfiguration");
1520        }
1521        if has("metadataJournalTable") && is_put {
1522            return Some("UpdateBucketMetadataJournalTableConfiguration");
1523        }
1524        if has("abac") {
1525            // PUT  -> PutBucketAbacConfiguration
1526            // GET  -> GetBucketAbacConfiguration (previously fell through to
1527            //         the plain `GET bucket` arm and was IAM-evaluated as
1528            //         s3:ListObjects — a policy on the ABAC config was never
1529            //         honored).
1530            return Some(match method {
1531                "PUT" => "PutBucketAbacConfiguration",
1532                "GET" => "GetBucketAbacConfiguration",
1533                _ => return None,
1534            });
1535        }
1536        if has("session") && is_get {
1537            // CreateSession (S3 Express directory buckets). AWS authorizes it
1538            // under the separate `s3express:CreateSession` action — handled in
1539            // `iam_action_for`, which selects the `s3express` service prefix
1540            // for this op. Previously fell through to the plain `GET bucket`
1541            // arm and was evaluated as s3:ListObjects.
1542            return Some("CreateSession");
1543        }
1544        if has("object-lock") {
1545            return Some(match method {
1546                "GET" => "GetObjectLockConfiguration",
1547                "PUT" => "PutObjectLockConfiguration",
1548                _ => return None,
1549            });
1550        }
1551        if has("location") {
1552            return Some("GetBucketLocation");
1553        }
1554        if is_post && has("delete") {
1555            return Some("DeleteObjects");
1556        }
1557        if is_get && has("versions") {
1558            return Some("ListObjectVersions");
1559        }
1560    }
1561
1562    // Plain bucket/object methods.
1563    match (method, has_key) {
1564        ("GET", true) => Some("GetObject"),
1565        ("PUT", true) => {
1566            // CopyObject uses x-amz-copy-source but we don't have headers
1567            // handy here — treat both PutObject and CopyObject as PutObject
1568            // for IAM purposes; CopyObject additionally requires
1569            // s3:GetObject on the source but that's evaluated per-request
1570            // by real AWS, not on the PUT call itself.
1571            Some("PutObject")
1572        }
1573        ("DELETE", true) => Some("DeleteObject"),
1574        ("HEAD", true) => Some("HeadObject"),
1575        ("GET", false) => {
1576            if query.contains_key("list-type") {
1577                Some("ListObjectsV2")
1578            } else {
1579                Some("ListObjects")
1580            }
1581        }
1582        ("PUT", false) => Some("CreateBucket"),
1583        ("DELETE", false) => Some("DeleteBucket"),
1584        ("HEAD", false) => Some("HeadBucket"),
1585        _ => None,
1586    }
1587}
1588
1589/// Build the S3 resource ARN for an action. Returns `*` for
1590/// `ListBuckets` (account-scoped), a bucket ARN for bucket-level
1591/// configuration actions, or an object ARN for object-level actions.
1592fn s3_resource_for(action: &'static str, bucket: Option<&str>, key: Option<&str>) -> String {
1593    // Object-level actions work on `bucket/key`.
1594    const OBJECT_ACTIONS: &[&str] = &[
1595        "PutObject",
1596        "GetObject",
1597        "DeleteObject",
1598        "HeadObject",
1599        "CopyObject",
1600        "GetObjectAttributes",
1601        "RestoreObject",
1602        "PutObjectTagging",
1603        "GetObjectTagging",
1604        "DeleteObjectTagging",
1605        "PutObjectAcl",
1606        "GetObjectAcl",
1607        "PutObjectRetention",
1608        "GetObjectRetention",
1609        "PutObjectLegalHold",
1610        "GetObjectLegalHold",
1611        "CreateMultipartUpload",
1612        "UploadPart",
1613        "UploadPartCopy",
1614        "CompleteMultipartUpload",
1615        "AbortMultipartUpload",
1616        "ListParts",
1617    ];
1618    if action == "ListBuckets" {
1619        return "*".to_string();
1620    }
1621    let Some(bucket) = bucket else {
1622        return "*".to_string();
1623    };
1624    if OBJECT_ACTIONS.contains(&action) {
1625        match key {
1626            Some(k) if !k.is_empty() => Arn::s3(&format!("{bucket}/{k}")).to_string(),
1627            _ => Arn::s3(&format!("{bucket}/*")).to_string(),
1628        }
1629    } else {
1630        // Bucket-level actions (ListObjectsV2, GetBucketTagging, ...).
1631        Arn::s3(bucket).to_string()
1632    }
1633}
1634
1635// Conditional request helpers
1636
1637/// Truncate a DateTime to second-level precision (HTTP dates have no sub-second info).
1638pub(crate) fn truncate_to_seconds(dt: DateTime<Utc>) -> DateTime<Utc> {
1639    dt.with_nanosecond(0).unwrap_or(dt)
1640}
1641
1642pub(crate) fn check_get_conditionals(
1643    req: &AwsRequest,
1644    obj: &S3Object,
1645) -> Result<(), AwsServiceError> {
1646    let obj_etag = format!("\"{}\"", obj.etag);
1647    let obj_time = truncate_to_seconds(obj.last_modified);
1648
1649    // If-Match
1650    if let Some(if_match) = req.headers.get("if-match").and_then(|v| v.to_str().ok()) {
1651        if !etag_matches(if_match, &obj_etag) {
1652            return Err(precondition_failed("If-Match"));
1653        }
1654    }
1655
1656    // If-None-Match
1657    if let Some(if_none_match) = req
1658        .headers
1659        .get("if-none-match")
1660        .and_then(|v| v.to_str().ok())
1661    {
1662        if etag_matches(if_none_match, &obj_etag) {
1663            return Err(not_modified_with_etag(&obj_etag));
1664        }
1665    }
1666
1667    // If-Unmodified-Since
1668    if let Some(since) = req
1669        .headers
1670        .get("if-unmodified-since")
1671        .and_then(|v| v.to_str().ok())
1672    {
1673        if let Some(dt) = parse_http_date(since) {
1674            if obj_time > dt {
1675                return Err(precondition_failed("If-Unmodified-Since"));
1676            }
1677        }
1678    }
1679
1680    // If-Modified-Since
1681    if let Some(since) = req
1682        .headers
1683        .get("if-modified-since")
1684        .and_then(|v| v.to_str().ok())
1685    {
1686        if let Some(dt) = parse_http_date(since) {
1687            if obj_time <= dt {
1688                return Err(not_modified());
1689            }
1690        }
1691    }
1692
1693    Ok(())
1694}
1695
1696pub(crate) fn check_head_conditionals(
1697    req: &AwsRequest,
1698    obj: &S3Object,
1699) -> Result<(), AwsServiceError> {
1700    let obj_etag = format!("\"{}\"", obj.etag);
1701    let obj_time = truncate_to_seconds(obj.last_modified);
1702
1703    // If-Match
1704    if let Some(if_match) = req.headers.get("if-match").and_then(|v| v.to_str().ok()) {
1705        if !etag_matches(if_match, &obj_etag) {
1706            return Err(AwsServiceError::aws_error(
1707                StatusCode::PRECONDITION_FAILED,
1708                "412",
1709                "Precondition Failed",
1710            ));
1711        }
1712    }
1713
1714    // If-None-Match
1715    if let Some(if_none_match) = req
1716        .headers
1717        .get("if-none-match")
1718        .and_then(|v| v.to_str().ok())
1719    {
1720        if etag_matches(if_none_match, &obj_etag) {
1721            return Err(not_modified_with_etag(&obj_etag));
1722        }
1723    }
1724
1725    // If-Unmodified-Since
1726    if let Some(since) = req
1727        .headers
1728        .get("if-unmodified-since")
1729        .and_then(|v| v.to_str().ok())
1730    {
1731        if let Some(dt) = parse_http_date(since) {
1732            if obj_time > dt {
1733                return Err(AwsServiceError::aws_error(
1734                    StatusCode::PRECONDITION_FAILED,
1735                    "412",
1736                    "Precondition Failed",
1737                ));
1738            }
1739        }
1740    }
1741
1742    // If-Modified-Since
1743    if let Some(since) = req
1744        .headers
1745        .get("if-modified-since")
1746        .and_then(|v| v.to_str().ok())
1747    {
1748        if let Some(dt) = parse_http_date(since) {
1749            if obj_time <= dt {
1750                return Err(not_modified());
1751            }
1752        }
1753    }
1754
1755    Ok(())
1756}
1757
/// Report whether an `If-Match` / `If-None-Match` header value selects `obj_etag`.
///
/// `condition` matches when, after trimming, it is exactly `*`, or when any of
/// its comma-separated entries equals the object's ETag. Double quotes are
/// removed from both sides before comparing, so quoted and bare ETags are
/// interchangeable.
pub(crate) fn etag_matches(condition: &str, obj_etag: &str) -> bool {
    let wanted = condition.trim();
    if wanted == "*" {
        return true;
    }

    let bare_etag: String = obj_etag.chars().filter(|&c| c != '"').collect();

    // Each list entry is trimmed first, then stripped of every double quote.
    wanted.split(',').any(|candidate| {
        candidate
            .trim()
            .chars()
            .filter(|&c| c != '"')
            .eq(bare_etag.chars())
    })
}
1773
1774pub(crate) fn parse_http_date(s: &str) -> Option<DateTime<Utc>> {
1775    // Try RFC 2822 format: "Sat, 01 Jan 2000 00:00:00 GMT"
1776    if let Ok(dt) = DateTime::parse_from_rfc2822(s) {
1777        return Some(dt.with_timezone(&Utc));
1778    }
1779    // Try RFC 3339
1780    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1781        return Some(dt.with_timezone(&Utc));
1782    }
1783    // Try common HTTP date format: "%a, %d %b %Y %H:%M:%S GMT"
1784    if let Ok(dt) =
1785        chrono::NaiveDateTime::parse_from_str(s.trim_end_matches(" GMT"), "%a, %d %b %Y %H:%M:%S")
1786    {
1787        return Some(dt.and_utc());
1788    }
1789    // Try ISO 8601
1790    if let Ok(dt) = s.parse::<DateTime<Utc>>() {
1791        return Some(dt);
1792    }
1793    None
1794}
1795
1796pub(crate) fn not_modified() -> AwsServiceError {
1797    AwsServiceError::aws_error(StatusCode::NOT_MODIFIED, "304", "Not Modified")
1798}
1799
1800pub(crate) fn not_modified_with_etag(etag: &str) -> AwsServiceError {
1801    AwsServiceError::aws_error_with_headers(
1802        StatusCode::NOT_MODIFIED,
1803        "304",
1804        "Not Modified",
1805        vec![("etag".to_string(), etag.to_string())],
1806    )
1807}
1808
1809pub(crate) fn precondition_failed(condition: &str) -> AwsServiceError {
1810    AwsServiceError::aws_error_with_fields(
1811        StatusCode::PRECONDITION_FAILED,
1812        "PreconditionFailed",
1813        "At least one of the pre-conditions you specified did not hold",
1814        vec![("Condition".to_string(), condition.to_string())],
1815    )
1816}
1817
1818// ACL helpers
1819
1820pub(crate) fn build_acl_xml(owner_id: &str, grants: &[AclGrant], _account_id: &str) -> String {
1821    let mut grants_xml = String::new();
1822    for g in grants {
1823        let grantee_xml = if g.grantee_type == "Group" {
1824            let uri = g.grantee_uri.as_deref().unwrap_or("");
1825            format!(
1826                "<Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:type=\"Group\">\
1827                 <URI>{}</URI></Grantee>",
1828                xml_escape(uri),
1829            )
1830        } else {
1831            let id = g.grantee_id.as_deref().unwrap_or("");
1832            format!(
1833                "<Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:type=\"CanonicalUser\">\
1834                 <ID>{}</ID></Grantee>",
1835                xml_escape(id),
1836            )
1837        };
1838        grants_xml.push_str(&format!(
1839            "<Grant>{grantee_xml}<Permission>{}</Permission></Grant>",
1840            xml_escape(&g.permission),
1841        ));
1842    }
1843
1844    format!(
1845        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
1846         <AccessControlPolicy xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
1847         <Owner><ID>{owner_id}</ID><DisplayName>{owner_id}</DisplayName></Owner>\
1848         <AccessControlList>{grants_xml}</AccessControlList>\
1849         </AccessControlPolicy>",
1850        owner_id = xml_escape(owner_id),
1851    )
1852}
1853
1854pub(crate) fn canned_acl_grants(acl: &str, owner_id: &str) -> Vec<AclGrant> {
1855    let owner_grant = AclGrant {
1856        grantee_type: "CanonicalUser".to_string(),
1857        grantee_id: Some(owner_id.to_string()),
1858        grantee_display_name: Some(owner_id.to_string()),
1859        grantee_uri: None,
1860        permission: "FULL_CONTROL".to_string(),
1861    };
1862    match acl {
1863        "private" => vec![owner_grant],
1864        "public-read" => vec![
1865            owner_grant,
1866            AclGrant {
1867                grantee_type: "Group".to_string(),
1868                grantee_id: None,
1869                grantee_display_name: None,
1870                grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
1871                permission: "READ".to_string(),
1872            },
1873        ],
1874        "public-read-write" => vec![
1875            owner_grant,
1876            AclGrant {
1877                grantee_type: "Group".to_string(),
1878                grantee_id: None,
1879                grantee_display_name: None,
1880                grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
1881                permission: "READ".to_string(),
1882            },
1883            AclGrant {
1884                grantee_type: "Group".to_string(),
1885                grantee_id: None,
1886                grantee_display_name: None,
1887                grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
1888                permission: "WRITE".to_string(),
1889            },
1890        ],
1891        "authenticated-read" => vec![
1892            owner_grant,
1893            AclGrant {
1894                grantee_type: "Group".to_string(),
1895                grantee_id: None,
1896                grantee_display_name: None,
1897                grantee_uri: Some(
1898                    "http://acs.amazonaws.com/groups/global/AuthenticatedUsers".to_string(),
1899                ),
1900                permission: "READ".to_string(),
1901            },
1902        ],
1903        "bucket-owner-full-control" => vec![owner_grant],
1904        _ => vec![owner_grant],
1905    }
1906}
1907
1908pub(crate) fn canned_acl_grants_for_object(acl: &str, owner_id: &str) -> Vec<AclGrant> {
1909    // For objects, canned ACLs work the same way
1910    canned_acl_grants(acl, owner_id)
1911}
1912
1913pub(crate) fn parse_grant_headers(headers: &HeaderMap) -> Vec<AclGrant> {
1914    let mut grants = Vec::new();
1915    let header_permission_map = [
1916        ("x-amz-grant-read", "READ"),
1917        ("x-amz-grant-write", "WRITE"),
1918        ("x-amz-grant-read-acp", "READ_ACP"),
1919        ("x-amz-grant-write-acp", "WRITE_ACP"),
1920        ("x-amz-grant-full-control", "FULL_CONTROL"),
1921    ];
1922
1923    for (header, permission) in &header_permission_map {
1924        if let Some(value) = headers.get(*header).and_then(|v| v.to_str().ok()) {
1925            // Parse "id=xxx" or "uri=xxx" or "emailAddress=xxx"
1926            for part in value.split(',') {
1927                let part = part.trim();
1928                if let Some((key, val)) = part.split_once('=') {
1929                    let val = val.trim().trim_matches('"');
1930                    let key = key.trim().to_lowercase();
1931                    match key.as_str() {
1932                        "id" => {
1933                            grants.push(AclGrant {
1934                                grantee_type: "CanonicalUser".to_string(),
1935                                grantee_id: Some(val.to_string()),
1936                                grantee_display_name: Some(val.to_string()),
1937                                grantee_uri: None,
1938                                permission: permission.to_string(),
1939                            });
1940                        }
1941                        "uri" | "url" => {
1942                            grants.push(AclGrant {
1943                                grantee_type: "Group".to_string(),
1944                                grantee_id: None,
1945                                grantee_display_name: None,
1946                                grantee_uri: Some(val.to_string()),
1947                                permission: permission.to_string(),
1948                            });
1949                        }
1950                        _ => {}
1951                    }
1952                }
1953            }
1954        }
1955    }
1956    grants
1957}
1958
1959pub(crate) fn parse_acl_xml(xml: &str) -> Result<Vec<AclGrant>, AwsServiceError> {
1960    // Check for Owner presence
1961    if xml.contains("<AccessControlPolicy") && !xml.contains("<Owner>") {
1962        return Err(AwsServiceError::aws_error(
1963            StatusCode::BAD_REQUEST,
1964            "MalformedACLError",
1965            "The XML you provided was not well-formed or did not validate against our published schema",
1966        ));
1967    }
1968
1969    let valid_permissions = ["READ", "WRITE", "READ_ACP", "WRITE_ACP", "FULL_CONTROL"];
1970
1971    let mut grants = Vec::new();
1972    let mut remaining = xml;
1973    while let Some(start) = remaining.find("<Grant>") {
1974        let after = &remaining[start + 7..];
1975        if let Some(end) = after.find("</Grant>") {
1976            let grant_body = &after[..end];
1977
1978            // Extract permission
1979            let permission = extract_xml_value(grant_body, "Permission").unwrap_or_default();
1980            if !valid_permissions.contains(&permission.as_str()) {
1981                return Err(AwsServiceError::aws_error(
1982                    StatusCode::BAD_REQUEST,
1983                    "MalformedACLError",
1984                    "The XML you provided was not well-formed or did not validate against our published schema",
1985                ));
1986            }
1987
1988            // Determine grantee type
1989            if grant_body.contains("xsi:type=\"Group\"") || grant_body.contains("<URI>") {
1990                let uri = extract_xml_value(grant_body, "URI").unwrap_or_default();
1991                grants.push(AclGrant {
1992                    grantee_type: "Group".to_string(),
1993                    grantee_id: None,
1994                    grantee_display_name: None,
1995                    grantee_uri: Some(uri),
1996                    permission,
1997                });
1998            } else {
1999                let id = extract_xml_value(grant_body, "ID").unwrap_or_default();
2000                let display =
2001                    extract_xml_value(grant_body, "DisplayName").unwrap_or_else(|| id.clone());
2002                grants.push(AclGrant {
2003                    grantee_type: "CanonicalUser".to_string(),
2004                    grantee_id: Some(id),
2005                    grantee_display_name: Some(display),
2006                    grantee_uri: None,
2007                    permission,
2008                });
2009            }
2010
2011            remaining = &after[end + 8..];
2012        } else {
2013            break;
2014        }
2015    }
2016    Ok(grants)
2017}
2018
2019// Range helpers
2020
/// Outcome of interpreting a `Range` header against an object of known size.
pub(crate) enum RangeResult {
    /// The range selects the inclusive byte span `start..=end`.
    Satisfiable { start: usize, end: usize },
    /// The range is well-formed but selects no bytes of the object.
    NotSatisfiable,
    /// The range's end precedes its start.
    Ignored,
}

/// Interpret a single-range `bytes=` header for an object of `total_size` bytes.
///
/// Returns `None` when the value does not start with `bytes=`, contains no
/// `-`, or has a bound that does not parse as `usize`.
///
/// * `bytes=-N` (suffix): the last `N` bytes, clamped to the whole object;
///   `NotSatisfiable` when `N` is zero or the object is empty.
/// * `bytes=A-` / `bytes=A-B`: `NotSatisfiable` when `A` is at or past the end
///   of the object (checked before `B` is parsed); `Ignored` when `B < A`;
///   otherwise `B` is clamped to the last byte.
pub(crate) fn parse_range_header(range_str: &str, total_size: usize) -> Option<RangeResult> {
    let spec = range_str.strip_prefix("bytes=")?;
    let (first, last) = spec.split_once('-')?;

    if first.is_empty() {
        // Suffix form: count bytes back from the end of the object.
        let wanted: usize = last.parse().ok()?;
        return Some(if wanted == 0 || total_size == 0 {
            RangeResult::NotSatisfiable
        } else {
            RangeResult::Satisfiable {
                start: total_size.saturating_sub(wanted),
                end: total_size - 1,
            }
        });
    }

    let start: usize = first.parse().ok()?;
    if start >= total_size {
        return Some(RangeResult::NotSatisfiable);
    }

    // `start < total_size` guarantees the object is non-empty here.
    let last_index = total_size - 1;
    let end = match last {
        "" => last_index,
        text => {
            let requested: usize = text.parse().ok()?;
            if requested < start {
                return Some(RangeResult::Ignored);
            }
            requested.min(last_index)
        }
    };
    Some(RangeResult::Satisfiable { start, end })
}
2057
2058// Helpers
2059
2060/// S3 XML response with `application/xml` content type (unlike Query protocol's `text/xml`).
2061pub(crate) fn s3_xml(status: StatusCode, body: impl Into<Bytes>) -> AwsResponse {
2062    AwsResponse {
2063        status,
2064        content_type: "application/xml".to_string(),
2065        body: body.into().into(),
2066        headers: HeaderMap::new(),
2067    }
2068}
2069
2070pub(crate) fn empty_response(status: StatusCode) -> AwsResponse {
2071    AwsResponse {
2072        status,
2073        content_type: "application/xml".to_string(),
2074        body: Bytes::new().into(),
2075        headers: HeaderMap::new(),
2076    }
2077}
2078
2079/// Returns true when the object is stored in a "cold" storage class (GLACIER, DEEP_ARCHIVE)
2080/// and has NOT been restored (or restore is still in progress).
2081pub(crate) fn is_frozen(obj: &S3Object) -> bool {
2082    matches!(obj.storage_class.as_str(), "GLACIER" | "DEEP_ARCHIVE")
2083        && obj.restore_ongoing != Some(false)
2084}
2085
2086pub(crate) fn no_such_bucket(bucket: &str) -> AwsServiceError {
2087    AwsServiceError::aws_error_with_fields(
2088        StatusCode::NOT_FOUND,
2089        "NoSuchBucket",
2090        "The specified bucket does not exist",
2091        vec![("BucketName".to_string(), bucket.to_string())],
2092    )
2093}
2094
2095pub(crate) fn no_such_key(key: &str) -> AwsServiceError {
2096    AwsServiceError::aws_error_with_fields(
2097        StatusCode::NOT_FOUND,
2098        "NoSuchKey",
2099        "The specified key does not exist.",
2100        vec![("Key".to_string(), key.to_string())],
2101    )
2102}
2103
2104pub(crate) fn no_such_upload(upload_id: &str) -> AwsServiceError {
2105    AwsServiceError::aws_error_with_fields(
2106        StatusCode::NOT_FOUND,
2107        "NoSuchUpload",
2108        "The specified upload does not exist. The upload ID may be invalid, \
2109         or the upload may have been aborted or completed.",
2110        vec![("UploadId".to_string(), upload_id.to_string())],
2111    )
2112}
2113
2114pub(crate) fn no_such_key_with_detail(key: &str) -> AwsServiceError {
2115    AwsServiceError::aws_error_with_fields(
2116        StatusCode::NOT_FOUND,
2117        "NoSuchKey",
2118        "The specified key does not exist.",
2119        vec![("Key".to_string(), key.to_string())],
2120    )
2121}
2122
2123pub(crate) fn compute_md5(data: &[u8]) -> String {
2124    let digest = Md5::digest(data);
2125    format!("{:x}", digest)
2126}
2127
2128pub(crate) fn compute_checksum(algorithm: &str, data: &[u8]) -> String {
2129    let raw = compute_checksum_raw(algorithm, data);
2130    if raw.is_empty() {
2131        String::new()
2132    } else {
2133        BASE64.encode(raw)
2134    }
2135}
2136
2137/// Compute the raw (un-base64'd) checksum digest bytes for `data`.
2138///
2139/// Returns the network-order digest bytes so callers can either base64
2140/// them ([`compute_checksum`]) or concatenate them when computing an S3
2141/// multipart COMPOSITE checksum (which hashes the concatenation of each
2142/// part's raw digest, then base64s the result and appends `-N`).
2143pub(crate) fn compute_checksum_raw(algorithm: &str, data: &[u8]) -> Vec<u8> {
2144    match algorithm {
2145        "CRC32" => crc32fast::hash(data).to_be_bytes().to_vec(),
2146        // CRC32C and CRC64NVME use the same crates as the streaming path so
2147        // CopyObject / CompleteMultipartUpload produce identical results to
2148        // a streaming PutObject (1.7); the COMPOSITE multipart checksum
2149        // concatenates these raw digests before hashing (1.18).
2150        "CRC32C" => crc32c::crc32c(data).to_be_bytes().to_vec(),
2151        "CRC64NVME" => {
2152            let mut hasher = crc64fast_nvme::Digest::new();
2153            hasher.write(data);
2154            hasher.sum64().to_be_bytes().to_vec()
2155        }
2156        "SHA1" => {
2157            use sha1::Digest as _;
2158            sha1::Sha1::digest(data).to_vec()
2159        }
2160        "SHA256" => {
2161            use sha2::Digest as _;
2162            sha2::Sha256::digest(data).to_vec()
2163        }
2164        _ => Vec::new(),
2165    }
2166}
2167
2168/// Compute an S3 multipart COMPOSITE additional checksum from each
2169/// part's raw digest bytes (in part-number order). AWS defines this as
2170/// `base64(HASH(concat(raw_part_digests)))-N` where HASH is the chosen
2171/// algorithm and N is the part count. Returns `None` for an unsupported
2172/// algorithm or empty part list.
2173pub(crate) fn compute_composite_checksum(
2174    algorithm: &str,
2175    part_raw_digests: &[Vec<u8>],
2176) -> Option<String> {
2177    if part_raw_digests.is_empty() {
2178        return None;
2179    }
2180    let mut concat = Vec::new();
2181    for d in part_raw_digests {
2182        concat.extend_from_slice(d);
2183    }
2184    let digest = compute_checksum_raw(algorithm, &concat);
2185    if digest.is_empty() {
2186        return None;
2187    }
2188    Some(format!(
2189        "{}-{}",
2190        BASE64.encode(digest),
2191        part_raw_digests.len()
2192    ))
2193}
2194
2195/// Streaming variant of [`compute_checksum`] for spool files. Reads
2196/// the file in 1 MiB chunks and feeds each chunk into the hasher so a
2197/// 1 GiB upload computes its CRC32 / SHA-1 / SHA-256 in constant
2198/// memory rather than via `tokio::fs::read` (which would allocate the
2199/// whole file as one buffer).
2200pub(crate) async fn compute_checksum_streaming(
2201    algorithm: &str,
2202    path: &std::path::Path,
2203) -> Result<String, std::io::Error> {
2204    use tokio::io::AsyncReadExt;
2205    let mut file = tokio::fs::File::open(path).await?;
2206    let mut buf = vec![0u8; 1024 * 1024];
2207    match algorithm {
2208        "CRC32" => {
2209            let mut hasher = crc32fast::Hasher::new();
2210            loop {
2211                let n = file.read(&mut buf).await?;
2212                if n == 0 {
2213                    break;
2214                }
2215                hasher.update(&buf[..n]);
2216            }
2217            Ok(BASE64.encode(hasher.finalize().to_be_bytes()))
2218        }
2219        "CRC32C" => {
2220            let mut crc: u32 = 0;
2221            loop {
2222                let n = file.read(&mut buf).await?;
2223                if n == 0 {
2224                    break;
2225                }
2226                crc = crc32c::crc32c_append(crc, &buf[..n]);
2227            }
2228            Ok(BASE64.encode(crc.to_be_bytes()))
2229        }
2230        "CRC64NVME" => {
2231            let mut hasher = crc64fast_nvme::Digest::new();
2232            loop {
2233                let n = file.read(&mut buf).await?;
2234                if n == 0 {
2235                    break;
2236                }
2237                hasher.write(&buf[..n]);
2238            }
2239            Ok(BASE64.encode(hasher.sum64().to_be_bytes()))
2240        }
2241        "SHA1" => {
2242            use sha1::Digest as _;
2243            let mut hasher = sha1::Sha1::new();
2244            loop {
2245                let n = file.read(&mut buf).await?;
2246                if n == 0 {
2247                    break;
2248                }
2249                hasher.update(&buf[..n]);
2250            }
2251            Ok(BASE64.encode(hasher.finalize()))
2252        }
2253        "SHA256" => {
2254            use sha2::Digest as _;
2255            let mut hasher = sha2::Sha256::new();
2256            loop {
2257                let n = file.read(&mut buf).await?;
2258                if n == 0 {
2259                    break;
2260                }
2261                hasher.update(&buf[..n]);
2262            }
2263            Ok(BASE64.encode(hasher.finalize()))
2264        }
2265        _ => Ok(String::new()),
2266    }
2267}
2268
/// Percent-encode an S3 object key.
///
/// ASCII letters, digits and `-`, `_`, `.`, `~`, `/` pass through
/// unchanged; every other byte of the UTF-8 encoding becomes `%XX` with
/// uppercase hex digits.
pub(crate) fn url_encode_s3_key(s: &str) -> String {
    const HEX_UPPER: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(s.len() * 2);
    for &b in s.as_bytes() {
        let unreserved =
            b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_' | b'.' | b'~' | b'/');
        if unreserved {
            encoded.push(char::from(b));
        } else {
            encoded.push('%');
            encoded.push(char::from(HEX_UPPER[usize::from(b >> 4)]));
            encoded.push(char::from(HEX_UPPER[usize::from(b & 0x0F)]));
        }
    }
    encoded
}
2283
2284pub(crate) use fakecloud_aws::xml::xml_escape;
2285
2286pub(crate) fn extract_user_metadata(
2287    headers: &HeaderMap,
2288) -> std::collections::BTreeMap<String, String> {
2289    let mut meta = std::collections::BTreeMap::new();
2290    for (name, value) in headers {
2291        if let Some(key) = name.as_str().strip_prefix("x-amz-meta-") {
2292            if let Ok(v) = value.to_str() {
2293                meta.insert(key.to_string(), v.to_string());
2294            }
2295        }
2296    }
2297    meta
2298}
2299
/// Return `true` when `class` is one of the storage class names accepted
/// here. The comparison is exact and case-sensitive.
pub(crate) fn is_valid_storage_class(class: &str) -> bool {
    const STORAGE_CLASSES: [&str; 11] = [
        "STANDARD",
        "REDUCED_REDUNDANCY",
        "STANDARD_IA",
        "ONEZONE_IA",
        "INTELLIGENT_TIERING",
        "GLACIER",
        "DEEP_ARCHIVE",
        "GLACIER_IR",
        "OUTPOSTS",
        "SNOW",
        "EXPRESS_ONEZONE",
    ];
    STORAGE_CLASSES.contains(&class)
}
2316
/// Check a bucket name against the naming rules enforced here:
/// 3 to 63 bytes long, first and last byte ASCII alphanumeric, and every
/// byte a lowercase ASCII letter, digit, `-`, `.` or `_` (underscores are
/// allowed for compatibility).
pub(crate) fn is_valid_bucket_name(name: &str) -> bool {
    if !(3..=63).contains(&name.len()) {
        return false;
    }

    let raw = name.as_bytes();
    let edges_ok = match (raw.first(), raw.last()) {
        (Some(first), Some(last)) => {
            first.is_ascii_alphanumeric() && last.is_ascii_alphanumeric()
        }
        _ => false,
    };
    if !edges_ok {
        return false;
    }

    // Non-ASCII input fails here: none of its bytes is in the allowed set.
    raw.iter()
        .all(|&b| b.is_ascii_lowercase() || b.is_ascii_digit() || matches!(b, b'-' | b'.' | b'_'))
}
2330
/// Return `true` when `region` is one of the region names on the fixed
/// allow-list below. The match is exact; names are not pattern-checked.
pub(crate) fn is_valid_region(region: &str) -> bool {
    matches!(
        region,
        "us-east-1"
            | "us-east-2"
            | "us-west-1"
            | "us-west-2"
            | "af-south-1"
            | "ap-east-1"
            | "ap-south-1"
            | "ap-south-2"
            | "ap-southeast-1"
            | "ap-southeast-2"
            | "ap-southeast-3"
            | "ap-southeast-4"
            | "ap-northeast-1"
            | "ap-northeast-2"
            | "ap-northeast-3"
            | "ca-central-1"
            | "ca-west-1"
            | "eu-central-1"
            | "eu-central-2"
            | "eu-west-1"
            | "eu-west-2"
            | "eu-west-3"
            | "eu-south-1"
            | "eu-south-2"
            | "eu-north-1"
            | "il-central-1"
            | "me-south-1"
            | "me-central-1"
            | "sa-east-1"
            | "cn-north-1"
            | "cn-northwest-1"
            | "us-gov-east-1"
            | "us-gov-east-2"
            | "us-gov-west-1"
            | "us-iso-east-1"
            | "us-iso-west-1"
            | "us-isob-east-1"
            | "us-isof-south-1"
    )
}
2375
2376pub(crate) fn resolve_object<'a>(
2377    b: &'a S3Bucket,
2378    key: &str,
2379    version_id: Option<&String>,
2380) -> Result<&'a S3Object, AwsServiceError> {
2381    if let Some(vid) = version_id {
2382        // "null" version ID refers to an object with no version_id (pre-versioning)
2383        if vid == "null" {
2384            // Check versions for a pre-versioning object (version_id == None or Some("null"))
2385            if let Some(versions) = b.object_versions.get(key) {
2386                if let Some(obj) = versions
2387                    .iter()
2388                    .find(|o| o.version_id.is_none() || o.version_id.as_deref() == Some("null"))
2389                {
2390                    return Ok(obj);
2391                }
2392            }
2393            // Also check current object if it has no version_id
2394            if let Some(obj) = b.objects.get(key) {
2395                if obj.version_id.is_none() || obj.version_id.as_deref() == Some("null") {
2396                    return Ok(obj);
2397                }
2398            }
2399        } else {
2400            // When a specific versionId is requested, check versions first
2401            if let Some(versions) = b.object_versions.get(key) {
2402                if let Some(obj) = versions
2403                    .iter()
2404                    .find(|o| o.version_id.as_deref() == Some(vid.as_str()))
2405                {
2406                    return Ok(obj);
2407                }
2408            }
2409            // Also check current object
2410            if let Some(obj) = b.objects.get(key) {
2411                if obj.version_id.as_deref() == Some(vid.as_str()) {
2412                    return Ok(obj);
2413                }
2414            }
2415        }
2416        // For versioned buckets, return NoSuchVersion; for non-versioned, return 400
2417        if b.versioning.is_some() {
2418            Err(AwsServiceError::aws_error_with_fields(
2419                StatusCode::NOT_FOUND,
2420                "NoSuchVersion",
2421                "The specified version does not exist.",
2422                vec![
2423                    ("Key".to_string(), key.to_string()),
2424                    ("VersionId".to_string(), vid.to_string()),
2425                ],
2426            ))
2427        } else {
2428            Err(AwsServiceError::aws_error(
2429                StatusCode::BAD_REQUEST,
2430                "InvalidArgument",
2431                "Invalid version id specified",
2432            ))
2433        }
2434    } else {
2435        b.objects.get(key).ok_or_else(|| no_such_key(key))
2436    }
2437}
2438
2439pub(crate) fn make_delete_marker(key: &str, dm_id: &str) -> S3Object {
2440    S3Object {
2441        key: key.to_string(),
2442        last_modified: Utc::now(),
2443        storage_class: "STANDARD".to_string(),
2444        version_id: Some(dm_id.to_string()),
2445        is_delete_marker: true,
2446        ..Default::default()
2447    }
2448}
2449
/// Represents an object to delete in a batch delete request.
///
/// Produced by [`parse_delete_objects_xml`], one per `<Object>` element
/// that carries a `<Key>`.
pub(crate) struct DeleteObjectEntry {
    /// Text of the entry's `<Key>` element.
    key: String,
    /// Text of the entry's `<VersionId>` element, when one was supplied.
    version_id: Option<String>,
}
2455
2456pub(crate) fn parse_delete_objects_xml(xml: &str) -> Vec<DeleteObjectEntry> {
2457    let mut entries = Vec::new();
2458    let mut remaining = xml;
2459    while let Some(obj_start) = remaining.find("<Object>") {
2460        let after = &remaining[obj_start + 8..];
2461        if let Some(obj_end) = after.find("</Object>") {
2462            let obj_body = &after[..obj_end];
2463            let key = extract_xml_value(obj_body, "Key");
2464            let version_id = extract_xml_value(obj_body, "VersionId");
2465            if let Some(k) = key {
2466                entries.push(DeleteObjectEntry { key: k, version_id });
2467            }
2468            remaining = &after[obj_end + 9..];
2469        } else {
2470            break;
2471        }
2472    }
2473    entries
2474}
2475
2476/// Returns true when the request body's top-level <Quiet> element is "true".
2477/// AWS docs: when Quiet=true, only failed deletes are emitted.
2478pub(crate) fn parse_delete_objects_quiet(xml: &str) -> bool {
2479    extract_xml_value(xml, "Quiet")
2480        .map(|v| v.eq_ignore_ascii_case("true"))
2481        .unwrap_or(false)
2482}
2483
2484/// Minimal XML parser for `<Tagging><TagSet><Tag><Key>k</Key><Value>v</Value></Tag>...`.
2485/// Returns a Vec to preserve insertion order and detect duplicates.
2486pub(crate) fn parse_tagging_xml(xml: &str) -> Vec<(String, String)> {
2487    let mut tags = Vec::new();
2488    let mut remaining = xml;
2489    while let Some(tag_start) = remaining.find("<Tag>") {
2490        let after = &remaining[tag_start + 5..];
2491        if let Some(tag_end) = after.find("</Tag>") {
2492            let tag_body = &after[..tag_end];
2493            let key = extract_xml_value(tag_body, "Key");
2494            let value = extract_xml_value(tag_body, "Value");
2495            if let (Some(k), Some(v)) = (key, value) {
2496                tags.push((k, v));
2497            }
2498            remaining = &after[tag_end + 6..];
2499        } else {
2500            break;
2501        }
2502    }
2503    tags
2504}
2505
2506pub(crate) fn validate_tags(tags: &[(String, String)]) -> Result<(), AwsServiceError> {
2507    // Check for duplicate keys
2508    let mut seen = std::collections::HashSet::new();
2509    for (k, _) in tags {
2510        if !seen.insert(k.as_str()) {
2511            return Err(AwsServiceError::aws_error(
2512                StatusCode::BAD_REQUEST,
2513                "InvalidTag",
2514                "Cannot provide multiple Tags with the same key",
2515            ));
2516        }
2517        // Check for aws: prefix
2518        if k.starts_with("aws:") {
2519            return Err(AwsServiceError::aws_error(
2520                StatusCode::BAD_REQUEST,
2521                "InvalidTag",
2522                "System tags cannot be added/updated by requester",
2523            ));
2524        }
2525    }
2526    Ok(())
2527}
2528
/// Return the text of the first `<tag>` element found in `xml`.
///
/// This is a plain substring scan, not an XML parser: it only recognises
/// the exact spellings `<tag>`, `</tag>`, `<tag/>` and `<tag />`, and the
/// returned text is not unescaped.
///
/// - A self-closing element (`<tag/>` or `<tag />`) that occurs before any
///   `<tag>` opening yields `Some("")`.
/// - Otherwise the text between the first `<tag>` and the next `</tag>`
///   after it is returned.
/// - `None` when there is no opening tag, or no closing tag after it.
pub(crate) fn extract_xml_value(xml: &str, tag: &str) -> Option<String> {
    let open = format!("<{tag}>");
    let open_pos = xml.find(&open);

    // Earliest self-closing occurrence across BOTH spellings. Preferring
    // one spelling over the other would overlook an earlier `<tag/>`
    // whenever a `<tag />` also appears later in the document.
    let self_closing_pos = [format!("<{tag} />"), format!("<{tag}/>")]
        .iter()
        .filter_map(|form| xml.find(form.as_str()))
        .min();
    if let Some(sp) = self_closing_pos {
        if open_pos.map_or(true, |op| sp < op) {
            return Some(String::new());
        }
    }

    let close = format!("</{tag}>");
    let start = open_pos? + open.len();
    // Search for the closing tag AFTER the opening one so a malformed body
    // (closing tag before opening) can't make end < start and panic the
    // slice.
    let end = xml[start..].find(&close)? + start;
    Some(xml[start..end].to_string())
}
2556
2557/// Parse the CompleteMultipartUpload XML body into (part_number, etag) pairs.
2558pub(crate) fn parse_complete_multipart_xml(xml: &str) -> Vec<(u32, String)> {
2559    let mut parts = Vec::new();
2560    let mut remaining = xml;
2561    while let Some(part_start) = remaining.find("<Part>") {
2562        let after = &remaining[part_start + 6..];
2563        if let Some(part_end) = after.find("</Part>") {
2564            let part_body = &after[..part_end];
2565            let part_num =
2566                extract_xml_value(part_body, "PartNumber").and_then(|s| s.parse::<u32>().ok());
2567            let etag = extract_xml_value(part_body, "ETag")
2568                .map(|s| s.replace("&quot;", "").replace('"', ""));
2569            if let (Some(num), Some(e)) = (part_num, etag) {
2570                parts.push((num, e));
2571            }
2572            remaining = &after[part_end + 7..];
2573        } else {
2574            break;
2575        }
2576    }
2577    parts
2578}
2579
2580pub(crate) fn parse_url_encoded_tags(s: &str) -> Vec<(String, String)> {
2581    let mut tags = Vec::new();
2582    for pair in s.split('&') {
2583        if pair.is_empty() {
2584            continue;
2585        }
2586        let (key, value) = match pair.find('=') {
2587            Some(pos) => (&pair[..pos], &pair[pos + 1..]),
2588            None => (pair, ""),
2589        };
2590        tags.push((
2591            percent_encoding::percent_decode_str(key)
2592                .decode_utf8_lossy()
2593                .to_string(),
2594            percent_encoding::percent_decode_str(value)
2595                .decode_utf8_lossy()
2596                .to_string(),
2597        ));
2598    }
2599    tags
2600}
2601
2602/// Validate lifecycle configuration XML. Returns MalformedXML on invalid configs.
2603pub(crate) fn validate_lifecycle_xml(xml: &str) -> Result<(), AwsServiceError> {
2604    let malformed = || {
2605        AwsServiceError::aws_error(
2606            StatusCode::BAD_REQUEST,
2607            "MalformedXML",
2608            "The XML you provided was not well-formed or did not validate against our published schema",
2609        )
2610    };
2611
2612    let mut remaining = xml;
2613    while let Some(rule_start) = remaining.find("<Rule>") {
2614        let after = &remaining[rule_start + 6..];
2615        if let Some(rule_end) = after.find("</Rule>") {
2616            let rule_body = &after[..rule_end];
2617
2618            // Must have Filter or Prefix
2619            let has_filter = rule_body.contains("<Filter>")
2620                || rule_body.contains("<Filter/>")
2621                || rule_body.contains("<Filter />");
2622
2623            // Check for <Prefix> at rule level (outside of <Filter>...</Filter>)
2624            let has_prefix_outside_filter = {
2625                if !rule_body.contains("<Prefix") {
2626                    false
2627                } else if !has_filter {
2628                    true // No filter means any Prefix is at rule level
2629                } else {
2630                    // Remove the Filter block and check if Prefix remains
2631                    let mut stripped = rule_body.to_string();
2632                    // Remove <Filter>...</Filter> or self-closing variants
2633                    if let Some(fs) = stripped.find("<Filter") {
2634                        if let Some(fe) = stripped.find("</Filter>") {
2635                            stripped = format!("{}{}", &stripped[..fs], &stripped[fe + 9..]);
2636                        }
2637                    }
2638                    stripped.contains("<Prefix")
2639                }
2640            };
2641
2642            if !has_filter && !has_prefix_outside_filter {
2643                return Err(malformed());
2644            }
2645            // Can't have both Filter and rule-level Prefix
2646            if has_filter && has_prefix_outside_filter {
2647                return Err(malformed());
2648            }
2649
2650            // Expiration: if has ExpiredObjectDeleteMarker, cannot also have Days or Date
2651            // (only check within <Expiration> block)
2652            if let Some(exp_start) = rule_body.find("<Expiration>") {
2653                if let Some(exp_end) = rule_body[exp_start..].find("</Expiration>") {
2654                    let exp_body = &rule_body[exp_start..exp_start + exp_end];
2655                    if exp_body.contains("<ExpiredObjectDeleteMarker>")
2656                        && (exp_body.contains("<Days>") || exp_body.contains("<Date>"))
2657                    {
2658                        return Err(malformed());
2659                    }
2660                }
2661            }
2662
2663            // Filter validation
2664            if has_filter {
2665                if let Some(fs) = rule_body.find("<Filter>") {
2666                    if let Some(fe) = rule_body.find("</Filter>") {
2667                        let filter_body = &rule_body[fs + 8..fe];
2668                        let has_prefix_in_filter = filter_body.contains("<Prefix");
2669                        let has_tag_in_filter = filter_body.contains("<Tag>");
2670                        let has_and_in_filter = filter_body.contains("<And>");
2671                        // Can't have both Prefix and Tag without And
2672                        if has_prefix_in_filter && has_tag_in_filter && !has_and_in_filter {
2673                            return Err(malformed());
2674                        }
2675                        // Can't have Tag and And simultaneously at the Filter level
2676                        if has_tag_in_filter && has_and_in_filter {
2677                            // Check if the <Tag> is outside <And>
2678                            let and_start = filter_body.find("<And>").unwrap_or(0);
2679                            let tag_pos = filter_body.find("<Tag>").unwrap_or(0);
2680                            if tag_pos < and_start {
2681                                return Err(malformed());
2682                            }
2683                        }
2684                    }
2685                }
2686            }
2687
2688            // NoncurrentVersionTransition must have NoncurrentDays and StorageClass
2689            if rule_body.contains("<NoncurrentVersionTransition>") {
2690                let mut nvt_remaining = rule_body;
2691                while let Some(nvt_start) = nvt_remaining.find("<NoncurrentVersionTransition>") {
2692                    let nvt_after = &nvt_remaining[nvt_start + 29..];
2693                    if let Some(nvt_end) = nvt_after.find("</NoncurrentVersionTransition>") {
2694                        let nvt_body = &nvt_after[..nvt_end];
2695                        if !nvt_body.contains("<NoncurrentDays>") {
2696                            return Err(malformed());
2697                        }
2698                        if !nvt_body.contains("<StorageClass>") {
2699                            return Err(malformed());
2700                        }
2701                        nvt_remaining = &nvt_after[nvt_end + 30..];
2702                    } else {
2703                        break;
2704                    }
2705                }
2706            }
2707
2708            remaining = &after[rule_end + 7..];
2709        } else {
2710            break;
2711        }
2712    }
2713
2714    Ok(())
2715}
2716
/// Parsed CORS rule from bucket configuration XML.
///
/// Built by [`parse_cors_config`], one per `<CORSRule>` element.
pub(crate) struct CorsRule {
    /// Values of the rule's `<AllowedOrigin>` elements; may contain `*` patterns.
    allowed_origins: Vec<String>,
    /// Values of the rule's `<AllowedMethod>` elements.
    allowed_methods: Vec<String>,
    /// Values of the rule's `<AllowedHeader>` elements.
    allowed_headers: Vec<String>,
    /// Values of the rule's `<ExposeHeader>` elements.
    expose_headers: Vec<String>,
    /// Value of `<MaxAgeSeconds>`, or `None` when absent or not a `u32`.
    max_age_seconds: Option<u32>,
}
2725
2726/// Parse CORS configuration XML into rules.
2727pub(crate) fn parse_cors_config(xml: &str) -> Vec<CorsRule> {
2728    let mut rules = Vec::new();
2729    let mut remaining = xml;
2730    while let Some(start) = remaining.find("<CORSRule>") {
2731        let after = &remaining[start + 10..];
2732        if let Some(end) = after.find("</CORSRule>") {
2733            let block = &after[..end];
2734            let allowed_origins = extract_all_xml_values(block, "AllowedOrigin");
2735            let allowed_methods = extract_all_xml_values(block, "AllowedMethod");
2736            let allowed_headers = extract_all_xml_values(block, "AllowedHeader");
2737            let expose_headers = extract_all_xml_values(block, "ExposeHeader");
2738            let max_age_seconds =
2739                extract_xml_value(block, "MaxAgeSeconds").and_then(|s| s.parse().ok());
2740            rules.push(CorsRule {
2741                allowed_origins,
2742                allowed_methods,
2743                allowed_headers,
2744                expose_headers,
2745                max_age_seconds,
2746            });
2747            remaining = &after[end + 11..];
2748        } else {
2749            break;
2750        }
2751    }
2752    rules
2753}
2754
/// Match an origin against a CORS allowed-origin pattern.
///
/// A pattern without `*` must equal the origin exactly. Otherwise the
/// first `*` is a wildcard standing for any run of characters (including
/// none): the origin must start with the text before it and end with the
/// text after it. This covers `*`, `*.example.com` and patterns with the
/// wildcard in the middle such as `http://*.example.com`. Any further `*`
/// in the pattern is compared literally.
pub(crate) fn origin_matches(origin: &str, pattern: &str) -> bool {
    match pattern.split_once('*') {
        None => origin == pattern,
        Some((prefix, suffix)) => {
            // The length guard stops prefix and suffix from overlapping in
            // a short origin (e.g. "ab" must not match "ab*b").
            origin.len() >= prefix.len() + suffix.len()
                && origin.starts_with(prefix)
                && origin.ends_with(suffix)
        }
    }
}
2766
2767/// Find the matching CORS rule for a given origin and method.
2768pub(crate) fn find_cors_rule<'a>(
2769    rules: &'a [CorsRule],
2770    origin: &str,
2771    method: Option<&str>,
2772) -> Option<&'a CorsRule> {
2773    rules.iter().find(|rule| {
2774        let origin_ok = rule
2775            .allowed_origins
2776            .iter()
2777            .any(|o| origin_matches(origin, o));
2778        let method_ok = match method {
2779            Some(m) => rule.allowed_methods.iter().any(|am| am == m),
2780            None => true,
2781        };
2782        origin_ok && method_ok
2783    })
2784}
2785
2786/// Check if an object is locked (retention or legal hold) and should block mutation.
2787/// Returns an error string if locked, None if allowed.
2788pub(crate) fn check_object_lock_for_overwrite(
2789    obj: &S3Object,
2790    req: &AwsRequest,
2791) -> Option<&'static str> {
2792    // Legal hold blocks overwrite
2793    if obj.lock_legal_hold.as_deref() == Some("ON") {
2794        return Some("AccessDenied");
2795    }
2796    // Retention check
2797    if let (Some(mode), Some(until)) = (&obj.lock_mode, &obj.lock_retain_until) {
2798        if *until > Utc::now() {
2799            if mode == "COMPLIANCE" {
2800                return Some("AccessDenied");
2801            }
2802            if mode == "GOVERNANCE" {
2803                let bypass = req
2804                    .headers
2805                    .get("x-amz-bypass-governance-retention")
2806                    .and_then(|v| v.to_str().ok())
2807                    .map(|s| s.eq_ignore_ascii_case("true"))
2808                    .unwrap_or(false);
2809                if !bypass {
2810                    return Some("AccessDenied");
2811                }
2812            }
2813        }
2814    }
2815    None
2816}
2817
2818#[cfg(test)]
2819mod tests;
2820
#[cfg(test)]
mod s3_detect_action_tests {
    //! Auth-mapping regressions for bug-hunt 2026-06-15 §5.3: operations
    //! that had no mapping (so enforcement was skipped) or were mapped to
    //! the wrong action (so a policy targeting them never applied).
    use super::s3_detect_action;
    use std::collections::HashMap;

    /// Build an owned query-parameter map from borrowed pairs.
    fn q(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        let mut params = HashMap::new();
        for &(name, value) in pairs {
            params.insert(name.to_owned(), value.to_owned());
        }
        params
    }

    #[test]
    fn select_object_content_maps_to_get_object() {
        // POST /bucket/key?select&select-type=2 reads object data.
        let query = q(&[("select", ""), ("select-type", "2")]);
        let detected = s3_detect_action("POST", Some("bucket"), Some("key"), &query);
        assert_eq!(detected, Some("GetObject"));
    }

    #[test]
    fn write_get_object_response_is_detected() {
        // POST /WriteGetObjectResponse (no key) — Object Lambda data plane.
        let query = q(&[]);
        let detected = s3_detect_action("POST", Some("WriteGetObjectResponse"), None, &query);
        assert_eq!(detected, Some("WriteGetObjectResponse"));
    }

    #[test]
    fn rename_object_is_detected_with_key() {
        // PUT /bucket/newkey?renameObject carries a key — it used to fall
        // through to PutObject, making s3:RenameObject denies ineffective.
        let query = q(&[("renameObject", "")]);
        let detected = s3_detect_action("PUT", Some("bucket"), Some("newkey"), &query);
        assert_eq!(detected, Some("RenameObject"));
    }

    #[test]
    fn update_object_encryption_maps_to_put_object() {
        // PUT /bucket/key?encryption changes object SSE; AWS gates this on
        // s3:PutObject (no distinct public action). Deliberate, not a
        // silent PutObject fall-through.
        let query = q(&[("encryption", "")]);
        let detected = s3_detect_action("PUT", Some("bucket"), Some("key"), &query);
        assert_eq!(detected, Some("PutObject"));
    }

    #[test]
    fn create_session_not_misdetected_as_list_objects() {
        // GET /bucket?session — previously fell through to ListObjects.
        let query = q(&[("session", "")]);
        let detected = s3_detect_action("GET", Some("bucket"), None, &query);
        assert_eq!(detected, Some("CreateSession"));
    }

    #[test]
    fn get_bucket_abac_not_misdetected_as_list_objects() {
        // GET /bucket?abac — previously fell through to ListObjects.
        let query = q(&[("abac", "")]);
        let detected = s3_detect_action("GET", Some("bucket"), None, &query);
        assert_eq!(detected, Some("GetBucketAbacConfiguration"));
    }

    #[test]
    fn put_bucket_abac_still_detected() {
        let query = q(&[("abac", "")]);
        let detected = s3_detect_action("PUT", Some("bucket"), None, &query);
        assert_eq!(detected, Some("PutBucketAbacConfiguration"));
    }
}
2902
#[cfg(test)]
mod s3_iam_service_prefix_tests {
    //! §5.3: CreateSession and WriteGetObjectResponse are authorized under
    //! sibling IAM service prefixes (s3express / s3-object-lambda), not s3.
    use super::*;
    use parking_lot::RwLock;
    use std::sync::Arc;

    /// Service under test, backed by fresh in-memory state.
    fn make_service() -> S3Service {
        let accounts =
            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", "");
        let state: SharedS3State = Arc::new(RwLock::new(accounts));
        let delivery = Arc::new(DeliveryBus::new());
        S3Service::new(state, delivery)
    }

    /// Minimal request for `path` carrying the given query parameters.
    fn req(method: Method, path: &str, query: &[(&str, &str)]) -> AwsRequest {
        let path_segments: Vec<String> = path
            .split('/')
            .filter(|segment| !segment.is_empty())
            .map(str::to_string)
            .collect();
        let query_params = query
            .iter()
            .map(|(name, value)| (name.to_string(), value.to_string()))
            .collect();

        AwsRequest {
            service: "s3".to_string(),
            action: String::new(),
            region: "us-east-1".to_string(),
            account_id: "123456789012".to_string(),
            request_id: "rid".to_string(),
            headers: http::HeaderMap::new(),
            query_params,
            body: bytes::Bytes::new(),
            body_stream: parking_lot::Mutex::new(None),
            path_segments,
            raw_path: path.to_string(),
            raw_query: String::new(),
            method,
            is_query_protocol: false,
            access_key_id: None,
            principal: None,
        }
    }

    #[test]
    fn create_session_uses_s3express_prefix() {
        let svc = make_service();
        let request = req(Method::GET, "/mybucket", &[("session", "")]);
        let mapped = svc.iam_action_for(&request).expect("must be mapped");

        assert_eq!(mapped.service, "s3express");
        assert_eq!(mapped.action, "CreateSession");
        assert_eq!(mapped.action_string(), "s3express:CreateSession");
    }

    #[test]
    fn write_get_object_response_uses_object_lambda_prefix() {
        let svc = make_service();
        let request = req(Method::POST, "/WriteGetObjectResponse", &[]);
        let mapped = svc.iam_action_for(&request).expect("must be mapped");

        assert_eq!(mapped.service, "s3-object-lambda");
        assert_eq!(mapped.action, "WriteGetObjectResponse");
        assert_eq!(
            mapped.action_string(),
            "s3-object-lambda:WriteGetObjectResponse"
        );
    }
}
2972
#[cfg(test)]
mod extract_xml_value_tests {
    use super::extract_xml_value;

    #[test]
    fn returns_inner_value() {
        let found = extract_xml_value("<Root><Key>value</Key></Root>", "Key");
        assert_eq!(found.as_deref(), Some("value"));
    }

    #[test]
    fn missing_tag_is_none() {
        assert!(extract_xml_value("<Root></Root>", "Key").is_none());
    }

    // bug-audit 2026-05-28, 2.2: with the closing tag ahead of the opening
    // one the slice used to run with end < start and panic; the result must
    // be None instead.
    #[test]
    fn close_before_open_does_not_panic() {
        assert!(extract_xml_value("</Key>oops<Key>value", "Key").is_none());
    }

    #[test]
    fn open_without_close_is_none() {
        assert!(extract_xml_value("<Key>value", "Key").is_none());
    }
}
3002
#[cfg(test)]
mod compute_checksum_tests {
    use base64::engine::general_purpose::STANDARD;
    use base64::Engine as _;

    use super::{compute_checksum, compute_checksum_streaming};

    // Decode a base64 checksum back into raw bytes, failing the test on
    // malformed input.
    fn decode_b64(encoded: impl AsRef<[u8]>) -> Vec<u8> {
        STANDARD.decode(encoded).unwrap()
    }

    // bug-audit 2026-06-15, 1.7: CopyObject / CompleteMultipartUpload routed
    // through the non-streaming compute_checksum, whose `_ =>` arm returned an
    // empty string for CRC32C / CRC64NVME -> GetObjectAttributes reported an
    // empty checksum and integrity checks silently broke.
    #[test]
    fn crc32c_is_non_empty_and_matches_known_vector() {
        // CRC32C of "123456789" is 0xE3069283; the checksum is the base64 of
        // those 4 big-endian bytes.
        let encoded = compute_checksum("CRC32C", b"123456789");
        assert!(!encoded.is_empty());

        let raw = decode_b64(&encoded);
        assert_eq!(raw, 0xE306_9283u32.to_be_bytes());
    }

    // CRC64NVME must produce a real checksum: non-empty and decoding to the
    // 8 big-endian bytes of a u64.
    #[test]
    fn crc64nvme_is_non_empty() {
        let encoded = compute_checksum("CRC64NVME", b"hello world");
        assert!(!encoded.is_empty());

        let raw = decode_b64(&encoded);
        assert_eq!(raw.len(), 8);
    }

    // The in-memory and file-streaming implementations must agree for every
    // algorithm listed here, and neither may return an empty checksum.
    #[tokio::test]
    async fn non_streaming_matches_streaming_for_all_algorithms() {
        let payload = b"the quick brown fox jumps over the lazy dog".repeat(100);

        // The streaming variant reads from a path, so stage the same bytes in
        // a temporary file.
        let scratch = tempfile::NamedTempFile::new().unwrap();
        tokio::fs::write(scratch.path(), &payload).await.unwrap();

        for algo in ["CRC32", "CRC32C", "CRC64NVME", "SHA1", "SHA256"] {
            let direct = compute_checksum(algo, &payload);
            let streamed = compute_checksum_streaming(algo, scratch.path())
                .await
                .unwrap();

            assert!(!direct.is_empty(), "{algo} direct empty");
            assert_eq!(direct, streamed, "{algo} direct != streaming");
        }
    }
}
3044        }
3045    }
3046
3047    use base64::Engine as _;
3048}