Skip to main content

fakecloud_s3/service/
mod.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use bytes::Bytes;
5use chrono::{DateTime, Timelike, Utc};
6use http::{HeaderMap, Method, StatusCode};
7use md5::{Digest, Md5};
8
9use fakecloud_aws::arn::Arn;
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
12use fakecloud_kms::SharedKmsState;
13use fakecloud_persistence::{MemoryS3Store, S3Store, StoreError};
14
15use base64::engine::general_purpose::STANDARD as BASE64;
16use base64::Engine as _;
17
18use crate::logging;
19use crate::state::{AclGrant, S3Bucket, S3Object, SharedS3State};
20
21mod access_points;
22mod acl;
23mod buckets;
24pub(crate) mod config;
25mod lock;
26mod multipart;
27mod notifications;
28mod objects;
29mod tags;
30
31// Re-export notification helpers for use in sub-modules
32#[cfg(test)]
33use notifications::replicate_object;
34pub(super) use notifications::{
35    deliver_notifications, normalize_notification_ids, normalize_replication_xml,
36    replicate_through_store,
37};
38
39// Used only within this file (parse_cors_config)
40use notifications::extract_all_xml_values;
41
42// Re-exports used only in tests
43#[cfg(test)]
44use notifications::{
45    event_matches, key_matches_filters, parse_notification_config, parse_replication_rules,
46    NotificationTargetType,
47};
48
/// S3 service front end: bundles the shared state and collaborators that the
/// request handlers in this module operate on.
pub struct S3Service {
    /// Shared S3 state; `handle` takes a read guard on it and looks entries up
    /// by account id.
    state: SharedS3State,
    /// Delivery bus handle supplied at construction.
    /// NOTE(review): presumably used for event-notification fan-out — confirm.
    delivery: Arc<DeliveryBus>,
    /// KMS state; `None` until `with_kms` is called.
    kms_state: Option<SharedKmsState>,
    /// Hook used by `encrypt_object_body` / `decrypt_object_body`; when `None`
    /// those helpers return their input bytes unchanged.
    pub(crate) kms_hook: Option<Arc<dyn fakecloud_core::delivery::KmsHook>>,
    /// Persistence backend; `new` defaults it to an in-memory `MemoryS3Store`.
    store: Arc<dyn S3Store>,
}
56
57/// Map a [`StoreError`] from the persistence layer to a 500 InternalError
58/// response. Invoked at every mutation site when the write-through persistence
59/// call fails: the in-memory mutation has already happened, but we surface the
60/// failure to the client so they know to retry (and so logs/metrics flag it).
61pub(crate) fn persistence_error(err: StoreError) -> AwsServiceError {
62    AwsServiceError::aws_error(
63        StatusCode::INTERNAL_SERVER_ERROR,
64        "InternalError",
65        format!("persistence store error: {err}"),
66    )
67}
68
69/// Convert a filesystem IO error from a disk-backed body read into an
70/// InternalError response.
71pub(crate) fn io_to_aws(err: std::io::Error) -> AwsServiceError {
72    AwsServiceError::aws_error(
73        StatusCode::INTERNAL_SERVER_ERROR,
74        "InternalError",
75        format!("failed to read object body from disk: {err}"),
76    )
77}
78
79impl S3Service {
80    pub fn new(state: SharedS3State, delivery: Arc<DeliveryBus>) -> Self {
81        Self::with_store(state, delivery, Arc::new(MemoryS3Store::new()))
82    }
83
84    pub fn with_store(
85        state: SharedS3State,
86        delivery: Arc<DeliveryBus>,
87        store: Arc<dyn S3Store>,
88    ) -> Self {
89        Self {
90            state,
91            delivery,
92            kms_state: None,
93            kms_hook: None,
94            store,
95        }
96    }
97
98    pub fn with_kms(mut self, kms_state: SharedKmsState) -> Self {
99        self.kms_state = Some(kms_state);
100        self
101    }
102
103    pub fn with_kms_hook(mut self, hook: Arc<dyn fakecloud_core::delivery::KmsHook>) -> Self {
104        self.kms_hook = Some(hook);
105        self
106    }
107
108    /// Encrypt object body bytes for SSE-KMS storage. Returns ciphertext as
109    /// raw bytes (a UTF-8 fakecloud-kms envelope) on success.
110    ///
111    /// Fail-closed: if the KMS hook reports an error (key denied, key not
112    /// found, etc.), this returns `Err` so PutObject aborts with a 500
113    /// rather than silently storing plaintext. AWS S3 has the same
114    /// behavior — `KMS.NotFoundException` and friends surface as
115    /// `AccessDenied` / `KMS.*` errors back to the caller. When no hook is
116    /// wired (legacy / in-process tests with no KMS dependency), the
117    /// plaintext is returned unchanged so existing tests keep working.
118    pub(crate) fn encrypt_object_body(
119        &self,
120        account_id: &str,
121        region: &str,
122        bucket: &str,
123        plaintext: &[u8],
124        kms_key_id: Option<&str>,
125    ) -> Result<bytes::Bytes, AwsServiceError> {
126        let Some(hook) = &self.kms_hook else {
127            return Ok(bytes::Bytes::copy_from_slice(plaintext));
128        };
129        let key = kms_key_id.filter(|k| !k.is_empty()).unwrap_or("aws/s3");
130        let bucket_arn = Arn::s3(bucket).to_string();
131        let mut ctx = std::collections::HashMap::new();
132        ctx.insert("aws:s3:arn".to_string(), bucket_arn);
133        match hook.encrypt(account_id, region, key, plaintext, "s3.amazonaws.com", ctx) {
134            Ok(envelope) => Ok(bytes::Bytes::from(envelope.into_bytes())),
135            Err(err) => {
136                tracing::warn!(bucket = %bucket, error = %err, "SSE-KMS encrypt failed");
137                Err(AwsServiceError::aws_error(
138                    StatusCode::INTERNAL_SERVER_ERROR,
139                    "KMS.InternalFailureException",
140                    format!("Failed to encrypt object via KMS: {err}"),
141                ))
142            }
143        }
144    }
145
146    /// Decrypt object body bytes that were stored as a fakecloud-kms
147    /// envelope. Caller is expected to gate this on
148    /// `obj.sse_algorithm == Some("aws:kms")`.
149    ///
150    /// Fail-closed: when a hook is wired and the bytes look like an
151    /// envelope but don't decrypt (key revoked, malformed ciphertext),
152    /// this returns `Err` so GetObject surfaces a 500. When no hook is
153    /// wired, or the bytes aren't UTF-8 (legacy snapshots from before
154    /// the hook landed), the bytes are returned unchanged.
155    pub(crate) fn decrypt_object_body(
156        &self,
157        account_id: &str,
158        bucket: &str,
159        ciphertext: &[u8],
160    ) -> Result<bytes::Bytes, AwsServiceError> {
161        let Some(hook) = &self.kms_hook else {
162            return Ok(bytes::Bytes::copy_from_slice(ciphertext));
163        };
164        // Stored envelope is base64 ASCII; non-UTF-8 bytes are pre-hook
165        // legacy snapshots, return as-is.
166        let envelope = match std::str::from_utf8(ciphertext) {
167            Ok(s) => s,
168            Err(_) => return Ok(bytes::Bytes::copy_from_slice(ciphertext)),
169        };
170        let bucket_arn = Arn::s3(bucket).to_string();
171        let mut ctx = std::collections::HashMap::new();
172        ctx.insert("aws:s3:arn".to_string(), bucket_arn);
173        match hook.decrypt(account_id, envelope, "s3.amazonaws.com", ctx) {
174            Ok(bytes) => Ok(bytes::Bytes::from(bytes)),
175            Err(err) => {
176                tracing::warn!(bucket = %bucket, error = %err, "SSE-KMS decrypt failed");
177                Err(AwsServiceError::aws_error(
178                    StatusCode::INTERNAL_SERVER_ERROR,
179                    "KMS.InternalFailureException",
180                    format!("Failed to decrypt object via KMS: {err}"),
181                ))
182            }
183        }
184    }
185}
186
187#[async_trait]
188impl AwsService for S3Service {
    /// Name under which this service is identified; always `"s3"`.
    fn service_name(&self) -> &str {
        "s3"
    }
192
193    async fn handle(&self, mut req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
194        // PutObject / UploadPart enter dispatch via the streaming path
195        // with `body = Bytes::new()` and the raw HTTP body available on
196        // `req.body_stream`. Those handlers consume the stream directly
197        // — spooling chunks to disk while computing MD5 + size in
198        // constant memory — so a 1 GiB upload never materializes into
199        // RAM. Every *other* PUT-on-bucket-key the streaming dispatch
200        // flagged (PutObjectTagging, PutObjectAcl, PutObjectRetention,
201        // PutObjectLegalHold, CopyObject, …) reads a small XML/JSON
202        // body from `req.body`, so drain the stream for them.
203        let is_put_to_key = req.method == Method::PUT
204            && req.path_segments.len() >= 2
205            && req
206                .path_segments
207                .first()
208                .map(|s| !s.is_empty())
209                .unwrap_or(false);
210        let q = &req.query_params;
211        let is_put_object = is_put_to_key
212            && !q.contains_key("tagging")
213            && !q.contains_key("acl")
214            && !q.contains_key("retention")
215            && !q.contains_key("legal-hold")
216            && !q.contains_key("renameObject")
217            && !q.contains_key("encryption")
218            && !req.headers.contains_key("x-amz-copy-source");
219        // UploadPart requires both partNumber AND uploadId; checking
220        // only partNumber would skip body draining for stray PUTs that
221        // happened to carry a partNumber query param without being a
222        // real multipart upload.
223        let is_upload_part =
224            is_put_to_key && q.contains_key("partNumber") && q.contains_key("uploadId");
225        if !is_put_object && !is_upload_part {
226            if let Some(stream) = req.take_body_stream() {
227                req.body = fakecloud_core::service::drain_request_stream(stream).await?;
228            }
229        }
230
231        // Access point data plane: resolve alias -> bucket before routing.
232        access_points::resolve_access_point(self, &mut req)?;
233
234        let account_id = req.account_id.as_str();
235
236        // S3 Control endpoint (access point management).
237        let host = req
238            .headers
239            .get("host")
240            .and_then(|v| v.to_str().ok())
241            .unwrap_or("");
242        let is_control = host.to_ascii_lowercase().contains("s3-control");
243        if is_control {
244            let v1 = req.path_segments.first().map(|s| s.as_str());
245            let v2 = req.path_segments.get(1).map(|s| s.as_str());
246            let v3 = req.path_segments.get(2).map(|s| s.as_str());
247            if v1 == Some("v20180820") && v2 == Some("accesspoint") {
248                if let Some(name) = v3 {
249                    match req.method {
250                        Method::PUT => return self.create_access_point(account_id, &req, name),
251                        Method::GET => return self.get_access_point(account_id, &req, name),
252                        Method::DELETE => return self.delete_access_point(account_id, &req, name),
253                        _ => {}
254                    }
255                } else if req.method == Method::GET {
256                    return self.list_access_points(account_id, &req);
257                }
258            }
259        }
260
261        // S3 REST routing: method + path segments + query params
262        //
263        // Reject paths that start with `//` (an empty bucket segment). The
264        // path-segment splitter further down filters empty segments out,
265        // which would otherwise let a malformed URL like `PUT //my-key`
266        // collapse to a valid `PUT /my-key` (CreateBucket) and silently
267        // succeed. Real S3 rejects empty bucket segments with
268        // InvalidBucketName, so mirror that.
269        if req.raw_path.starts_with("//") {
270            return Err(AwsServiceError::aws_error(
271                StatusCode::BAD_REQUEST,
272                "InvalidBucketName",
273                "The specified bucket is not valid: bucket name cannot be empty",
274            ));
275        }
276        let bucket = req.path_segments.first().map(|s| s.as_str());
277        // Extract key from the raw path to preserve leading slashes and empty segments.
278        // The raw path is like "/bucket/key/parts" — we strip the bucket prefix.
279        let key = if let Some(b) = bucket {
280            let prefix = format!("/{b}/");
281            if req.raw_path.starts_with(&prefix) && req.raw_path.len() > prefix.len() {
282                let raw_key = &req.raw_path[prefix.len()..];
283                Some(
284                    percent_encoding::percent_decode_str(raw_key)
285                        .decode_utf8_lossy()
286                        .into_owned(),
287                )
288            } else if req.path_segments.len() > 1 {
289                let raw = req.path_segments[1..].join("/");
290                Some(
291                    percent_encoding::percent_decode_str(&raw)
292                        .decode_utf8_lossy()
293                        .into_owned(),
294                )
295            } else {
296                None
297            }
298        } else {
299            None
300        };
301
302        // Multipart upload operations (checked before main match)
303        if let Some(b) = bucket {
304            // POST /{bucket}/{key}?uploads — CreateMultipartUpload
305            if req.method == Method::POST
306                && key.is_some()
307                && req.query_params.contains_key("uploads")
308            {
309                return self.create_multipart_upload(account_id, &req, b, key.as_deref().unwrap());
310            }
311
312            // POST /{bucket}/{key}?restore
313            if req.method == Method::POST
314                && key.is_some()
315                && req.query_params.contains_key("restore")
316            {
317                return self.restore_object(account_id, &req, b, key.as_deref().unwrap());
318            }
319
320            // POST /{bucket}/{key}?uploadId=X — CompleteMultipartUpload
321            if req.method == Method::POST && key.is_some() {
322                if let Some(upload_id) = req.query_params.get("uploadId").cloned() {
323                    return self.complete_multipart_upload(
324                        account_id,
325                        &req,
326                        b,
327                        key.as_deref().unwrap(),
328                        &upload_id,
329                    );
330                }
331            }
332
333            // PUT /{bucket}/{key}?partNumber=N&uploadId=X — UploadPart or UploadPartCopy
334            if req.method == Method::PUT && key.is_some() {
335                if let (Some(part_num_str), Some(upload_id)) = (
336                    req.query_params.get("partNumber").cloned(),
337                    req.query_params.get("uploadId").cloned(),
338                ) {
339                    if let Ok(part_number) = part_num_str.parse::<i64>() {
340                        if req.headers.contains_key("x-amz-copy-source") {
341                            return self.upload_part_copy(
342                                account_id,
343                                &req,
344                                b,
345                                key.as_deref().unwrap(),
346                                &upload_id,
347                                part_number,
348                            );
349                        }
350                        return self
351                            .upload_part(
352                                account_id,
353                                &req,
354                                b,
355                                key.as_deref().unwrap(),
356                                &upload_id,
357                                part_number,
358                            )
359                            .await;
360                    }
361                }
362            }
363
364            // DELETE /{bucket}/{key}?uploadId=X — AbortMultipartUpload
365            if req.method == Method::DELETE && key.is_some() {
366                if let Some(upload_id) = req.query_params.get("uploadId").cloned() {
367                    return self.abort_multipart_upload(
368                        account_id,
369                        b,
370                        key.as_deref().unwrap(),
371                        &upload_id,
372                    );
373                }
374            }
375
376            // GET /{bucket}?uploads — ListMultipartUploads
377            if req.method == Method::GET
378                && key.is_none()
379                && req.query_params.contains_key("uploads")
380            {
381                return self.list_multipart_uploads(account_id, b, &req.query_params);
382            }
383
384            // GET /{bucket}/{key}?uploadId=X — ListParts
385            if req.method == Method::GET && key.is_some() {
386                if let Some(upload_id) = req.query_params.get("uploadId").cloned() {
387                    return self.list_parts(
388                        account_id,
389                        &req,
390                        b,
391                        key.as_deref().unwrap(),
392                        &upload_id,
393                    );
394                }
395            }
396        }
397
398        // Handle OPTIONS preflight requests (CORS)
399        if req.method == Method::OPTIONS {
400            if let Some(b_name) = bucket {
401                let cors_config = {
402                    let accounts = self.state.read();
403                    let _empty_s3 = crate::state::S3State::new(&req.account_id, &req.region);
404                    let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
405                    state
406                        .buckets
407                        .get(b_name)
408                        .and_then(|b| b.cors_config.clone())
409                };
410                if let Some(ref config) = cors_config {
411                    let origin = req
412                        .headers
413                        .get("origin")
414                        .and_then(|v| v.to_str().ok())
415                        .unwrap_or("");
416                    let request_method = req
417                        .headers
418                        .get("access-control-request-method")
419                        .and_then(|v| v.to_str().ok())
420                        .unwrap_or("");
421                    let rules = parse_cors_config(config);
422                    if let Some(rule) = find_cors_rule(&rules, origin, Some(request_method)) {
423                        let mut headers = HeaderMap::new();
424                        let matched_origin = if rule.allowed_origins.contains(&"*".to_string()) {
425                            "*"
426                        } else {
427                            origin
428                        };
429                        headers.insert(
430                            "access-control-allow-origin",
431                            matched_origin
432                                .parse()
433                                .unwrap_or_else(|_| http::HeaderValue::from_static("")),
434                        );
435                        headers.insert(
436                            "access-control-allow-methods",
437                            rule.allowed_methods
438                                .join(", ")
439                                .parse()
440                                .unwrap_or_else(|_| http::HeaderValue::from_static("")),
441                        );
442                        if !rule.allowed_headers.is_empty() {
443                            let ah = if rule.allowed_headers.contains(&"*".to_string()) {
444                                req.headers
445                                    .get("access-control-request-headers")
446                                    .and_then(|v| v.to_str().ok())
447                                    .unwrap_or("*")
448                                    .to_string()
449                            } else {
450                                rule.allowed_headers.join(", ")
451                            };
452                            headers.insert(
453                                "access-control-allow-headers",
454                                ah.parse()
455                                    .unwrap_or_else(|_| http::HeaderValue::from_static("")),
456                            );
457                        }
458                        if let Some(max_age) = rule.max_age_seconds {
459                            headers.insert(
460                                "access-control-max-age",
461                                max_age
462                                    .to_string()
463                                    .parse()
464                                    .unwrap_or_else(|_| http::HeaderValue::from_static("")),
465                            );
466                        }
467                        return Ok(AwsResponse {
468                            status: StatusCode::OK,
469                            content_type: String::new(),
470                            body: Bytes::new().into(),
471                            headers,
472                        });
473                    }
474                }
475                return Err(AwsServiceError::aws_error(
476                    StatusCode::FORBIDDEN,
477                    "CORSResponse",
478                    "CORS is not enabled for this bucket",
479                ));
480            }
481        }
482
483        // Capture origin for CORS response headers
484        let origin_header = req
485            .headers
486            .get("origin")
487            .and_then(|v| v.to_str().ok())
488            .map(|s| s.to_string());
489
490        // Bucket-scoped sub-resource query params. If a request targets one
491        // of these without a bucket in the path, S3 returns an error rather
492        // than silently treating it as a top-level (service) operation.
493        // Real AWS rejects e.g. `GET /?tagging` with a routing/validation
494        // error; mirroring that prevents bucket-required ops from being
495        // accidentally answered by ListBuckets / WriteGetObjectResponse.
496        let bucket_subresources: &[&str] = &[
497            "tagging",
498            "acl",
499            "versioning",
500            "cors",
501            "notification",
502            "website",
503            "accelerate",
504            "publicAccessBlock",
505            "encryption",
506            "lifecycle",
507            "logging",
508            "policy",
509            "policyStatus",
510            "replication",
511            "ownershipControls",
512            "inventory",
513            "analytics",
514            "intelligent-tiering",
515            "metrics",
516            "requestPayment",
517            "abac",
518            "metadataConfiguration",
519            "metadataTable",
520            "metadataInventoryTable",
521            "metadataJournalTable",
522            "object-lock",
523            "versions",
524            "session",
525            "retention",
526            "legal-hold",
527            "attributes",
528            "torrent",
529            "renameObject",
530            "uploads",
531            "restore",
532            "select",
533            // Bucket-required ops the earlier list missed: their query
534            // markers (e.g. `?location` for GetBucketLocation,
535            // `?delete` for DeleteObjects, `?list-type=2` for
536            // ListObjectsV2) signal a bucket op so a request with the
537            // marker but no bucket segment is malformed, not a service-
538            // level call.
539            "location",
540            "delete",
541            "list-type",
542        ];
543        if bucket.is_none()
544            && bucket_subresources
545                .iter()
546                .any(|q| req.query_params.contains_key(*q))
547        {
548            return Err(AwsServiceError::aws_error(
549                StatusCode::BAD_REQUEST,
550                "InvalidBucketName",
551                "The specified bucket is not valid: bucket name is required for this operation",
552            ));
553        }
554
555        let mut result = match (&req.method, bucket, key.as_deref()) {
556            // ListBuckets: GET /
557            (&Method::GET, None, None) => {
558                if req.query_params.get("x-id").map(|s| s.as_str()) == Some("ListDirectoryBuckets")
559                {
560                    self.list_directory_buckets(account_id, &req)
561                } else {
562                    self.list_buckets(account_id, &req)
563                }
564            }
565
566            // Bucket-level operations (no key)
567            (&Method::PUT, Some(b), None) => {
568                if req.query_params.contains_key("tagging") {
569                    self.put_bucket_tagging(account_id, &req, b)
570                } else if req.query_params.contains_key("acl") {
571                    self.put_bucket_acl(account_id, &req, b)
572                } else if req.query_params.contains_key("versioning") {
573                    self.put_bucket_versioning(account_id, &req, b)
574                } else if req.query_params.contains_key("cors") {
575                    self.put_bucket_cors(account_id, &req, b)
576                } else if req.query_params.contains_key("notification") {
577                    self.put_bucket_notification(account_id, &req, b)
578                } else if req.query_params.contains_key("website") {
579                    self.put_bucket_website(account_id, &req, b)
580                } else if req.query_params.contains_key("accelerate") {
581                    self.put_bucket_accelerate(account_id, &req, b)
582                } else if req.query_params.contains_key("publicAccessBlock") {
583                    self.put_public_access_block(account_id, &req, b)
584                } else if req.query_params.contains_key("encryption") {
585                    self.put_bucket_encryption(account_id, &req, b)
586                } else if req.query_params.contains_key("lifecycle") {
587                    self.put_bucket_lifecycle(account_id, &req, b)
588                } else if req.query_params.contains_key("logging") {
589                    self.put_bucket_logging(account_id, &req, b)
590                } else if req.query_params.contains_key("policy") {
591                    self.put_bucket_policy(account_id, &req, b)
592                } else if req.query_params.contains_key("object-lock") {
593                    self.put_object_lock_config(account_id, &req, b)
594                } else if req.query_params.contains_key("replication") {
595                    self.put_bucket_replication(account_id, &req, b)
596                } else if req.query_params.contains_key("ownershipControls") {
597                    self.put_bucket_ownership_controls(account_id, &req, b)
598                } else if req.query_params.contains_key("inventory") {
599                    self.put_bucket_inventory(account_id, &req, b)
600                } else if req.query_params.contains_key("analytics") {
601                    self.put_bucket_analytics_config(account_id, &req, b)
602                } else if req.query_params.contains_key("intelligent-tiering") {
603                    self.put_bucket_intelligent_tiering_config(account_id, &req, b)
604                } else if req.query_params.contains_key("metrics") {
605                    self.put_bucket_metrics_config(account_id, &req, b)
606                } else if req.query_params.contains_key("requestPayment") {
607                    self.put_bucket_request_payment(account_id, &req, b)
608                } else if req.query_params.contains_key("abac") {
609                    self.put_bucket_abac(account_id, &req, b)
610                } else if req.query_params.contains_key("metadataInventoryTable") {
611                    self.update_bucket_metadata_inventory_table(account_id, &req, b)
612                } else if req.query_params.contains_key("metadataJournalTable") {
613                    self.update_bucket_metadata_journal_table(account_id, &req, b)
614                } else {
615                    self.create_bucket(account_id, &req, b)
616                }
617            }
618            (&Method::DELETE, Some(b), None) => {
619                if req.query_params.contains_key("tagging") {
620                    self.delete_bucket_tagging(account_id, &req, b)
621                } else if req.query_params.contains_key("cors") {
622                    self.delete_bucket_cors(account_id, b)
623                } else if req.query_params.contains_key("website") {
624                    self.delete_bucket_website(account_id, b)
625                } else if req.query_params.contains_key("publicAccessBlock") {
626                    self.delete_public_access_block(account_id, b)
627                } else if req.query_params.contains_key("encryption") {
628                    self.delete_bucket_encryption(account_id, b)
629                } else if req.query_params.contains_key("lifecycle") {
630                    self.delete_bucket_lifecycle(account_id, b)
631                } else if req.query_params.contains_key("policy") {
632                    self.delete_bucket_policy(account_id, b)
633                } else if req.query_params.contains_key("replication") {
634                    self.delete_bucket_replication(account_id, b)
635                } else if req.query_params.contains_key("ownershipControls") {
636                    self.delete_bucket_ownership_controls(account_id, b)
637                } else if req.query_params.contains_key("inventory") {
638                    self.delete_bucket_inventory(account_id, &req, b)
639                } else if req.query_params.contains_key("analytics") {
640                    self.delete_bucket_analytics_config(account_id, &req, b)
641                } else if req.query_params.contains_key("intelligent-tiering") {
642                    self.delete_bucket_intelligent_tiering_config(account_id, &req, b)
643                } else if req.query_params.contains_key("metrics") {
644                    self.delete_bucket_metrics_config(account_id, &req, b)
645                } else if req.query_params.contains_key("metadataConfiguration") {
646                    self.delete_bucket_metadata_config(account_id, b)
647                } else if req.query_params.contains_key("metadataTable") {
648                    self.delete_bucket_metadata_table_config(account_id, b)
649                } else {
650                    self.delete_bucket(account_id, &req, b)
651                }
652            }
653            (&Method::HEAD, Some(b), None) => self.head_bucket(account_id, b),
654            (&Method::GET, Some(b), None) => {
655                if req.query_params.contains_key("tagging") {
656                    self.get_bucket_tagging(account_id, &req, b)
657                } else if req.query_params.contains_key("location") {
658                    self.get_bucket_location(account_id, b)
659                } else if req.query_params.contains_key("acl") {
660                    self.get_bucket_acl(account_id, &req, b)
661                } else if req.query_params.contains_key("versioning") {
662                    self.get_bucket_versioning(account_id, b)
663                } else if req.query_params.contains_key("versions") {
664                    self.list_object_versions(account_id, &req, b)
665                } else if req.query_params.contains_key("object-lock") {
666                    self.get_object_lock_configuration(account_id, b)
667                } else if req.query_params.contains_key("cors") {
668                    self.get_bucket_cors(account_id, b)
669                } else if req.query_params.contains_key("notification") {
670                    self.get_bucket_notification(account_id, b)
671                } else if req.query_params.contains_key("website") {
672                    self.get_bucket_website(account_id, b)
673                } else if req.query_params.contains_key("accelerate") {
674                    self.get_bucket_accelerate(account_id, b)
675                } else if req.query_params.contains_key("publicAccessBlock") {
676                    self.get_public_access_block(account_id, b)
677                } else if req.query_params.contains_key("encryption") {
678                    self.get_bucket_encryption(account_id, b)
679                } else if req.query_params.contains_key("lifecycle") {
680                    self.get_bucket_lifecycle(account_id, b)
681                } else if req.query_params.contains_key("logging") {
682                    self.get_bucket_logging(account_id, b)
683                } else if req.query_params.contains_key("policy") {
684                    self.get_bucket_policy(account_id, b)
685                } else if req.query_params.contains_key("replication") {
686                    self.get_bucket_replication(account_id, b)
687                } else if req.query_params.contains_key("ownershipControls") {
688                    self.get_bucket_ownership_controls(account_id, b)
689                } else if req.query_params.contains_key("inventory") {
690                    if req.query_params.contains_key("id") {
691                        self.get_bucket_inventory(account_id, &req, b)
692                    } else {
693                        self.list_bucket_inventory_configurations(account_id, b)
694                    }
695                } else if req.query_params.contains_key("analytics") {
696                    if req.query_params.contains_key("id") {
697                        self.get_bucket_analytics_config(account_id, &req, b)
698                    } else {
699                        self.list_bucket_analytics_configurations(account_id, b)
700                    }
701                } else if req.query_params.contains_key("intelligent-tiering") {
702                    if req.query_params.contains_key("id") {
703                        self.get_bucket_intelligent_tiering_config(account_id, &req, b)
704                    } else {
705                        self.list_bucket_intelligent_tiering_configurations(account_id, b)
706                    }
707                } else if req.query_params.contains_key("metrics") {
708                    if req.query_params.contains_key("id") {
709                        self.get_bucket_metrics_config(account_id, &req, b)
710                    } else {
711                        self.list_bucket_metrics_configurations(account_id, b)
712                    }
713                } else if req.query_params.contains_key("requestPayment") {
714                    self.get_bucket_request_payment(account_id, b)
715                } else if req.query_params.contains_key("abac") {
716                    self.get_bucket_abac(account_id, b)
717                } else if req.query_params.contains_key("policyStatus") {
718                    self.get_bucket_policy_status(account_id, b)
719                } else if req.query_params.contains_key("metadataConfiguration") {
720                    self.get_bucket_metadata_config(account_id, b)
721                } else if req.query_params.contains_key("metadataTable") {
722                    self.get_bucket_metadata_table_config(account_id, b)
723                } else if req.query_params.contains_key("session") {
724                    self.create_session(account_id, &req, b)
725                } else if req.query_params.get("list-type").map(|s| s.as_str()) == Some("2") {
726                    self.list_objects_v2(account_id, &req, b)
727                } else if req.query_params.is_empty() {
728                    // If bucket has website config and no query params, serve index document
729                    let website_config = {
730                        let accounts = self.state.read();
731                        let _empty_s3 = crate::state::S3State::new(&req.account_id, &req.region);
732                        let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
733                        state
734                            .buckets
735                            .get(b)
736                            .and_then(|bkt| bkt.website_config.clone())
737                    };
738                    if let Some(ref config) = website_config {
739                        if let Some(index_doc) = extract_xml_value(config, "Suffix").or_else(|| {
740                            extract_xml_value(config, "IndexDocument").and_then(|inner| {
741                                let open = "<Suffix>";
742                                let close = "</Suffix>";
743                                let s = inner.find(open)? + open.len();
744                                let e = inner.find(close)?;
745                                Some(inner[s..e].trim().to_string())
746                            })
747                        }) {
748                            self.serve_website_object(account_id, &req, b, &index_doc, config)
749                        } else {
750                            self.list_objects_v1(account_id, &req, b)
751                        }
752                    } else {
753                        self.list_objects_v1(account_id, &req, b)
754                    }
755                } else {
756                    self.list_objects_v1(account_id, &req, b)
757                }
758            }
759
760            // Object-level operations
761            (&Method::PUT, Some(b), Some(k)) => {
762                if req.query_params.contains_key("tagging") {
763                    self.put_object_tagging(account_id, &req, b, k)
764                } else if req.query_params.contains_key("acl") {
765                    self.put_object_acl(account_id, &req, b, k)
766                } else if req.query_params.contains_key("retention") {
767                    self.put_object_retention(account_id, &req, b, k)
768                } else if req.query_params.contains_key("legal-hold") {
769                    self.put_object_legal_hold(account_id, &req, b, k)
770                } else if req.query_params.contains_key("renameObject") {
771                    self.rename_object(account_id, &req, b, k)
772                } else if req.query_params.contains_key("encryption") {
773                    self.update_object_encryption(account_id, &req, b, k)
774                } else if req.headers.contains_key("x-amz-copy-source") {
775                    self.copy_object(account_id, &req, b, k)
776                } else {
777                    self.put_object(account_id, &req, b, k).await
778                }
779            }
780            (&Method::GET, Some(b), Some(k)) => {
781                if req.query_params.contains_key("tagging") {
782                    self.get_object_tagging(account_id, &req, b, k)
783                } else if req.query_params.contains_key("acl") {
784                    self.get_object_acl(account_id, &req, b, k)
785                } else if req.query_params.contains_key("retention") {
786                    self.get_object_retention(account_id, &req, b, k)
787                } else if req.query_params.contains_key("legal-hold") {
788                    self.get_object_legal_hold(account_id, &req, b, k)
789                } else if req.query_params.contains_key("attributes") {
790                    self.get_object_attributes(account_id, &req, b, k)
791                } else if req.query_params.contains_key("torrent") {
792                    self.get_object_torrent(account_id, &req, b, k)
793                } else {
794                    let result = self.get_object(account_id, &req, b, k);
795                    // If object not found and bucket has website config, serve error document
796                    let is_not_found = matches!(
797                        &result,
798                        Err(e) if e.code() == "NoSuchKey"
799                    );
800                    if is_not_found {
801                        let website_config = {
802                            let accounts = self.state.read();
803                            let _empty_s3 =
804                                crate::state::S3State::new(&req.account_id, &req.region);
805                            let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
806                            state
807                                .buckets
808                                .get(b)
809                                .and_then(|bkt| bkt.website_config.clone())
810                        };
811                        if let Some(ref config) = website_config {
812                            if let Some(error_key) = extract_xml_value(config, "ErrorDocument")
813                                .and_then(|inner| {
814                                    let open = "<Key>";
815                                    let close = "</Key>";
816                                    let s = inner.find(open)? + open.len();
817                                    let e = inner.find(close)?;
818                                    Some(inner[s..e].trim().to_string())
819                                })
820                                .or_else(|| extract_xml_value(config, "Key"))
821                            {
822                                return self.serve_website_error(account_id, &req, b, &error_key);
823                            }
824                        }
825                    }
826                    result
827                }
828            }
829            (&Method::DELETE, Some(b), Some(k)) => {
830                if req.query_params.contains_key("tagging") {
831                    self.delete_object_tagging(account_id, b, k)
832                } else {
833                    self.delete_object(account_id, &req, b, k)
834                }
835            }
836            (&Method::HEAD, Some(b), Some(k)) => self.head_object(account_id, &req, b, k),
837
838            // POST /{bucket}?delete — batch delete
839            (&Method::POST, Some(b), None) if req.query_params.contains_key("delete") => {
840                self.delete_objects(account_id, &req, b)
841            }
842            (&Method::POST, Some(b), None)
843                if req.query_params.contains_key("metadataConfiguration") =>
844            {
845                self.create_bucket_metadata_config(account_id, &req, b)
846            }
847            (&Method::POST, Some(b), None) if req.query_params.contains_key("metadataTable") => {
848                self.create_bucket_metadata_table_config(account_id, &req, b)
849            }
850            (&Method::POST, Some(b), Some(k))
851                if req.query_params.get("select-type").map(|s| s.as_str()) == Some("2") =>
852            {
853                self.select_object_content(account_id, &req, b, k)
854            }
855            (&Method::POST, Some("WriteGetObjectResponse"), None) => {
856                self.write_get_object_response(account_id, &req)
857            }
858
859            _ => Err(AwsServiceError::aws_error(
860                StatusCode::METHOD_NOT_ALLOWED,
861                "MethodNotAllowed",
862                "The specified method is not allowed against this resource",
863            )),
864        };
865
866        // Apply CORS headers to the response if Origin was present
867        if let (Some(ref origin), Some(b_name)) = (&origin_header, bucket) {
868            let cors_config = {
869                let accounts = self.state.read();
870                let _empty_s3 = crate::state::S3State::new(&req.account_id, &req.region);
871                let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
872                state
873                    .buckets
874                    .get(b_name)
875                    .and_then(|b| b.cors_config.clone())
876            };
877            if let Some(ref config) = cors_config {
878                let rules = parse_cors_config(config);
879                if let Some(rule) = find_cors_rule(&rules, origin, None) {
880                    if let Ok(ref mut resp) = result {
881                        let matched_origin = if rule.allowed_origins.contains(&"*".to_string()) {
882                            "*"
883                        } else {
884                            origin
885                        };
886                        resp.headers.insert(
887                            "access-control-allow-origin",
888                            matched_origin
889                                .parse()
890                                .unwrap_or_else(|_| http::HeaderValue::from_static("")),
891                        );
892                        if !rule.expose_headers.is_empty() {
893                            resp.headers.insert(
894                                "access-control-expose-headers",
895                                rule.expose_headers
896                                    .join(", ")
897                                    .parse()
898                                    .unwrap_or_else(|_| http::HeaderValue::from_static("")),
899                            );
900                        }
901                    }
902                }
903            }
904        }
905
906        // Write S3 access log entry if the source bucket has logging enabled
907        if let Some(b_name) = bucket {
908            let status_code = match &result {
909                Ok(resp) => resp.status.as_u16(),
910                Err(e) => e.status().as_u16(),
911            };
912            let op = logging::operation_name(&req.method, key.as_deref());
913            logging::maybe_write_access_log(
914                &self.state,
915                &self.store,
916                b_name,
917                &logging::AccessLogRequest {
918                    operation: op,
919                    key: key.as_deref(),
920                    status: status_code,
921                    request_id: &req.request_id,
922                    method: req.method.as_str(),
923                    path: &req.raw_path,
924                },
925            );
926        }
927
928        result
929    }
930
931    fn supported_actions(&self) -> &[&str] {
932        &[
933            // Buckets
934            "ListBuckets",
935            "CreateBucket",
936            "DeleteBucket",
937            "HeadBucket",
938            "GetBucketLocation",
939            // Objects
940            "PutObject",
941            "GetObject",
942            "DeleteObject",
943            "HeadObject",
944            "CopyObject",
945            "DeleteObjects",
946            "ListObjectsV2",
947            "ListObjects",
948            "ListObjectVersions",
949            "GetObjectAttributes",
950            "RestoreObject",
951            // Object properties
952            "PutObjectTagging",
953            "GetObjectTagging",
954            "DeleteObjectTagging",
955            "PutObjectAcl",
956            "GetObjectAcl",
957            "PutObjectRetention",
958            "GetObjectRetention",
959            "PutObjectLegalHold",
960            "GetObjectLegalHold",
961            // Bucket configuration
962            "PutBucketTagging",
963            "GetBucketTagging",
964            "DeleteBucketTagging",
965            "PutBucketAcl",
966            "GetBucketAcl",
967            "PutBucketVersioning",
968            "GetBucketVersioning",
969            "PutBucketCors",
970            "GetBucketCors",
971            "DeleteBucketCors",
972            "PutBucketNotificationConfiguration",
973            "GetBucketNotificationConfiguration",
974            "PutBucketWebsite",
975            "GetBucketWebsite",
976            "DeleteBucketWebsite",
977            "PutBucketAccelerateConfiguration",
978            "GetBucketAccelerateConfiguration",
979            "PutPublicAccessBlock",
980            "GetPublicAccessBlock",
981            "DeletePublicAccessBlock",
982            "PutBucketEncryption",
983            "GetBucketEncryption",
984            "DeleteBucketEncryption",
985            "PutBucketLifecycleConfiguration",
986            "GetBucketLifecycleConfiguration",
987            "DeleteBucketLifecycle",
988            "PutBucketLogging",
989            "GetBucketLogging",
990            "PutBucketPolicy",
991            "GetBucketPolicy",
992            "DeleteBucketPolicy",
993            "PutObjectLockConfiguration",
994            "GetObjectLockConfiguration",
995            "PutBucketReplication",
996            "GetBucketReplication",
997            "DeleteBucketReplication",
998            "PutBucketOwnershipControls",
999            "GetBucketOwnershipControls",
1000            "DeleteBucketOwnershipControls",
1001            "PutBucketInventoryConfiguration",
1002            "GetBucketInventoryConfiguration",
1003            "DeleteBucketInventoryConfiguration",
1004            "ListBucketInventoryConfigurations",
1005            "PutBucketAnalyticsConfiguration",
1006            "GetBucketAnalyticsConfiguration",
1007            "DeleteBucketAnalyticsConfiguration",
1008            "ListBucketAnalyticsConfigurations",
1009            "PutBucketIntelligentTieringConfiguration",
1010            "GetBucketIntelligentTieringConfiguration",
1011            "DeleteBucketIntelligentTieringConfiguration",
1012            "ListBucketIntelligentTieringConfigurations",
1013            "PutBucketMetricsConfiguration",
1014            "GetBucketMetricsConfiguration",
1015            "DeleteBucketMetricsConfiguration",
1016            "ListBucketMetricsConfigurations",
1017            "PutBucketRequestPayment",
1018            "GetBucketRequestPayment",
1019            "PutBucketAbac",
1020            "GetBucketAbac",
1021            "GetBucketPolicyStatus",
1022            "CreateBucketMetadataConfiguration",
1023            "GetBucketMetadataConfiguration",
1024            "DeleteBucketMetadataConfiguration",
1025            "CreateBucketMetadataTableConfiguration",
1026            "GetBucketMetadataTableConfiguration",
1027            "DeleteBucketMetadataTableConfiguration",
1028            "UpdateBucketMetadataInventoryTableConfiguration",
1029            "UpdateBucketMetadataJournalTableConfiguration",
1030            "GetObjectTorrent",
1031            "RenameObject",
1032            "SelectObjectContent",
1033            "UpdateObjectEncryption",
1034            "WriteGetObjectResponse",
1035            "ListDirectoryBuckets",
1036            "CreateSession",
1037            // Multipart uploads
1038            "CreateMultipartUpload",
1039            "UploadPart",
1040            "UploadPartCopy",
1041            "CompleteMultipartUpload",
1042            "AbortMultipartUpload",
1043            "ListParts",
1044            "ListMultipartUploads",
1045        ]
1046    }
1047
1048    fn iam_enforceable(&self) -> bool {
1049        true
1050    }
1051
1052    /// S3 resources are either:
1053    /// - Bucket ARN (`arn:aws:s3:::bucket`) for bucket-level actions
1054    /// - Object ARN (`arn:aws:s3:::bucket/key`) for object-level actions
1055    /// - Wildcard (`*`) for `ListBuckets` which doesn't target a specific
1056    ///   resource
1057    ///
1058    /// S3 ARNs notably omit the account id and region — this is the one
1059    /// AWS service that carries neither in its ARN, because bucket names
1060    /// are globally unique.
1061    fn iam_action_for(&self, request: &AwsRequest) -> Option<fakecloud_core::auth::IamAction> {
1062        // S3 doesn't set `request.action` — the handler dispatches on
1063        // method + path + query params directly. Re-derive the action
1064        // name here so enforcement can match against IAM policies the
1065        // same way the real service would.
1066        let bucket = request.path_segments.first().map(|s| s.as_str());
1067        let key = if request.path_segments.len() > 1 {
1068            Some(request.path_segments[1..].join("/"))
1069        } else {
1070            None
1071        };
1072        let action = s3_detect_action(
1073            request.method.as_str(),
1074            bucket,
1075            key.as_deref(),
1076            &request.query_params,
1077        )?;
1078        // A handful of S3-adjacent ops are authorized under sibling IAM
1079        // service prefixes, not `s3:`. Select the right prefix so a policy
1080        // targeting the real action string actually matches.
1081        let service = match action {
1082            "CreateSession" | "ListAllMyDirectoryBuckets" => "s3express",
1083            "WriteGetObjectResponse" => "s3-object-lambda",
1084            _ => "s3",
1085        };
1086        let resource = s3_resource_for(action, bucket, key.as_deref());
1087        Some(fakecloud_core::auth::IamAction {
1088            service,
1089            action,
1090            resource,
1091        })
1092    }
1093
1094    fn iam_condition_keys_for(
1095        &self,
1096        request: &AwsRequest,
1097        action: &fakecloud_core::auth::IamAction,
1098    ) -> std::collections::BTreeMap<String, Vec<String>> {
1099        s3_condition_keys(action.action, &request.query_params)
1100    }
1101
1102    fn resource_tags_for(
1103        &self,
1104        resource_arn: &str,
1105    ) -> Option<std::collections::HashMap<String, String>> {
1106        s3_resource_tags(&self.state, resource_arn)
1107            .map(|m| m.into_iter().collect::<std::collections::HashMap<_, _>>())
1108    }
1109
1110    fn request_tags_from(
1111        &self,
1112        request: &AwsRequest,
1113        action: &str,
1114    ) -> Option<std::collections::HashMap<String, String>> {
1115        s3_request_tags(request, action)
1116            .map(|m| m.into_iter().collect::<std::collections::HashMap<_, _>>())
1117    }
1118}
1119
/// Build the service-specific IAM condition keys for an S3 request.
///
/// Only `ListObjects` and `ListObjectsV2` contribute keys: the `prefix`,
/// `delimiter` and `max-keys` query parameters are surfaced as `s3:prefix`,
/// `s3:delimiter` and `s3:max-keys`, each with a single value. A parameter
/// that is absent from `query` produces no entry. Every other action yields
/// an empty map, so the evaluator's safe-fail semantics treat a policy
/// condition on an unknown key as "doesn't apply".
fn s3_condition_keys(
    action: &str,
    query: &std::collections::HashMap<String, String>,
) -> std::collections::BTreeMap<String, Vec<String>> {
    // (query parameter, IAM condition key); both list variants share this
    // query shape.
    const LIST_PARAMS: [(&str, &str); 3] = [
        ("prefix", "s3:prefix"),
        ("delimiter", "s3:delimiter"),
        ("max-keys", "s3:max-keys"),
    ];

    if action != "ListObjects" && action != "ListObjectsV2" {
        return std::collections::BTreeMap::new();
    }

    LIST_PARAMS
        .iter()
        .filter_map(|&(param, condition_key)| {
            query
                .get(param)
                .map(|value| (condition_key.to_string(), vec![value.clone()]))
        })
        .collect()
}
1145
1146/// Look up resource tags for an S3 ARN.
1147///
1148/// Bucket-level ARN (`arn:aws:s3:::bucket`) -> bucket tags.
1149/// Object-level ARN (`arn:aws:s3:::bucket/key`) -> object tags.
1150/// `*` (ListBuckets) -> `Some(empty)` (no resource to tag).
1151fn s3_resource_tags(
1152    state: &SharedS3State,
1153    resource_arn: &str,
1154) -> Option<std::collections::BTreeMap<String, String>> {
1155    if resource_arn == "*" {
1156        return Some(std::collections::BTreeMap::new());
1157    }
1158    // S3 ARNs: arn:aws:s3:::bucket or arn:aws:s3:::bucket/key
1159    let after_prefix = resource_arn.strip_prefix("arn:aws:s3:::")?;
1160    let mas = state.read();
1161    // S3 bucket names are globally unique; scan all accounts to find the bucket
1162    let bucket_name = after_prefix.split('/').next().unwrap_or(after_prefix);
1163    let state = mas
1164        .find_account(|s| s.buckets.contains_key(bucket_name))
1165        .and_then(|id| mas.get(id))
1166        .or_else(|| Some(mas.default_ref()))?;
1167    if let Some(slash_pos) = after_prefix.find('/') {
1168        // Object-level: bucket/key
1169        let bucket_name = &after_prefix[..slash_pos];
1170        let key = &after_prefix[slash_pos + 1..];
1171        let bucket = state.buckets.get(bucket_name)?;
1172        // Try current objects first, then versioned objects (latest version)
1173        if let Some(obj) = bucket.objects.get(key) {
1174            Some(obj.tags.clone())
1175        } else if let Some(versions) = bucket.object_versions.get(key) {
1176            versions.last().map(|v| v.tags.clone())
1177        } else {
1178            // Object doesn't exist yet (e.g. PutObject creating it)
1179            Some(std::collections::BTreeMap::new())
1180        }
1181    } else {
1182        // Bucket-level
1183        let bucket = state.buckets.get(after_prefix)?;
1184        Some(bucket.tags.clone())
1185    }
1186}
1187
1188/// Extract tags from an S3 request body/headers.
1189///
1190/// S3 sends tags via:
1191/// - `x-amz-tagging` header (URL-encoded `key=value&...`) on PutObject
1192/// - XML body on PutBucketTagging / PutObjectTagging
1193fn s3_request_tags(
1194    request: &AwsRequest,
1195    action: &str,
1196) -> Option<std::collections::BTreeMap<String, String>> {
1197    match action {
1198        "PutObject" | "CopyObject" | "CreateMultipartUpload" => {
1199            // Tags come via x-amz-tagging header
1200            if let Some(tagging) = request.headers.get("x-amz-tagging") {
1201                let tags = parse_url_encoded_tags(tagging.to_str().unwrap_or(""));
1202                Some(tags.into_iter().collect())
1203            } else {
1204                Some(std::collections::BTreeMap::new())
1205            }
1206        }
1207        "PutBucketTagging" | "PutObjectTagging" => {
1208            // Tags come in XML body
1209            let body = std::str::from_utf8(&request.body).unwrap_or("");
1210            let tags = parse_tagging_xml(body);
1211            Some(tags.into_iter().collect())
1212        }
1213        _ => Some(std::collections::BTreeMap::new()),
1214    }
1215}
1216
1217/// Derive the IAM action name from an S3 REST request. Handles the
1218/// common cases (GetObject, PutObject, DeleteObject, ListObjectsV2,
1219/// CreateBucket, ...) plus a subset of sub-resource operations
1220/// (`?acl`, `?tagging`, `?versioning`, `?policy`, `?cors`, `?website`,
1221/// `?lifecycle`, `?encryption`, `?logging`, `?notification`, `?replication`,
1222/// `?ownershipControls`, `?publicAccessBlock`, `?accelerate`, `?inventory`,
1223/// `?object-lock`, `?uploads`, `?uploadId`).
1224///
1225/// Returns `None` for requests that don't map to a known action — the
1226/// dispatch layer then skips enforcement for that request rather than
1227/// guessing (and a warn log fires via the "service is iam_enforceable
1228/// but has no mapping" branch in dispatch.rs).
1229fn s3_detect_action(
1230    method: &str,
1231    bucket: Option<&str>,
1232    key: Option<&str>,
1233    query: &std::collections::HashMap<String, String>,
1234) -> Option<&'static str> {
1235    let has = |q: &str| query.contains_key(q);
1236    let is_get = method == "GET";
1237    let is_put = method == "PUT";
1238    let is_post = method == "POST";
1239    let is_delete = method == "DELETE";
1240
1241    // Service root
1242    if bucket.is_none() {
1243        return match method {
1244            // A directory-bucket listing (`GET /?x-id=ListDirectoryBuckets`) is
1245            // authorized under `s3express:ListAllMyDirectoryBuckets`, NOT
1246            // `s3:ListBuckets`. Detecting it as ListBuckets let an
1247            // `s3:ListBuckets` grant wrongly enumerate directory buckets and
1248            // denied a policy that only granted the s3express action (bug-hunt
1249            // 2026-06-24, 5.5).
1250            "GET" if query.get("x-id").map(|s| s.as_str()) == Some("ListDirectoryBuckets") => {
1251                Some("ListAllMyDirectoryBuckets")
1252            }
1253            "GET" => Some("ListBuckets"),
1254            _ => None,
1255        };
1256    }
1257
1258    // WriteGetObjectResponse is an Object Lambda data-plane op routed as
1259    // `POST /WriteGetObjectResponse` (the literal value occupies the bucket
1260    // segment). AWS authorizes it under the separate
1261    // `s3-object-lambda:WriteGetObjectResponse` action — handled in
1262    // `iam_action_for`, which selects the `s3-object-lambda` service prefix
1263    // for this op. Previously unmapped -> enforcement skipped.
1264    if is_post && bucket == Some("WriteGetObjectResponse") && key.is_none() {
1265        return Some("WriteGetObjectResponse");
1266    }
1267
1268    let has_key = key.is_some();
1269
1270    // Multipart sub-resource forms
1271    if has_key && is_post && has("uploads") {
1272        return Some("CreateMultipartUpload");
1273    }
1274    if has_key && is_post && has("uploadId") {
1275        return Some("CompleteMultipartUpload");
1276    }
1277    if has_key && is_put && has("partNumber") && has("uploadId") {
1278        return Some("UploadPart");
1279    }
1280    if has_key && is_delete && has("uploadId") {
1281        return Some("AbortMultipartUpload");
1282    }
1283    if has_key && is_get && has("uploadId") {
1284        return Some("ListParts");
1285    }
1286    if !has_key && is_get && has("uploads") {
1287        return Some("ListMultipartUploads");
1288    }
1289
1290    // Sub-resource-keyed actions (?acl, ?tagging, ...). Order matters
1291    // since a request can carry multiple; we pick the most specific.
1292    // Object-level sub-resources come first (key present).
1293    if has_key {
1294        if has("tagging") {
1295            return Some(match method {
1296                "GET" => "GetObjectTagging",
1297                "PUT" => "PutObjectTagging",
1298                "DELETE" => "DeleteObjectTagging",
1299                _ => return None,
1300            });
1301        }
1302        if has("acl") {
1303            return Some(match method {
1304                "GET" => "GetObjectAcl",
1305                "PUT" => "PutObjectAcl",
1306                _ => return None,
1307            });
1308        }
1309        if has("retention") {
1310            return Some(match method {
1311                "GET" => "GetObjectRetention",
1312                "PUT" => "PutObjectRetention",
1313                _ => return None,
1314            });
1315        }
1316        if has("legal-hold") {
1317            return Some(match method {
1318                "GET" => "GetObjectLegalHold",
1319                "PUT" => "PutObjectLegalHold",
1320                _ => return None,
1321            });
1322        }
1323        // Identified by cubic on PR #399: both ?attributes and ?restore
1324        // need method guards — otherwise e.g. GET /bucket/key?restore
1325        // would be classified as RestoreObject (POST-only in AWS) and
1326        // IAM-evaluated against s3:RestoreObject instead of s3:GetObject.
1327        if has("attributes") && is_get {
1328            return Some("GetObjectAttributes");
1329        }
1330        if has("restore") && is_post {
1331            return Some("RestoreObject");
1332        }
1333        // RenameObject is an object-level op (`PUT /bucket/newkey?renameObject`
1334        // + `x-amz-rename-source`). It carries a key, so it must be detected
1335        // here — the `!has_key` arm below never sees it. Without this it fell
1336        // through to `PutObject`, so a policy denying `s3:RenameObject` was
1337        // silently ineffective.
1338        if has("renameObject") && is_put {
1339            return Some("RenameObject");
1340        }
1341        // UpdateObjectEncryption (`PUT /bucket/key?encryption`) changes an
1342        // existing object's server-side encryption. AWS gates this under
1343        // `s3:PutObject` (encryption is a write attribute), and there is no
1344        // distinct public `s3:UpdateObjectEncryption` IAM action — so map to
1345        // PutObject deliberately rather than letting it fall through.
1346        if has("encryption") && is_put {
1347            return Some("PutObject");
1348        }
1349        // SelectObjectContent (`POST /bucket/key?select&select-type=2`) reads
1350        // object data, so AWS authorizes it with `s3:GetObject`. Previously
1351        // unmapped -> `_ => None` -> enforcement skipped entirely.
1352        if has("select-type") && is_post {
1353            return Some("GetObject");
1354        }
1355    }
1356
1357    // Bucket-level sub-resources (key absent).
1358    if !has_key {
1359        if has("tagging") {
1360            return Some(match method {
1361                "GET" => "GetBucketTagging",
1362                "PUT" => "PutBucketTagging",
1363                "DELETE" => "DeleteBucketTagging",
1364                _ => return None,
1365            });
1366        }
1367        if has("acl") {
1368            return Some(match method {
1369                "GET" => "GetBucketAcl",
1370                "PUT" => "PutBucketAcl",
1371                _ => return None,
1372            });
1373        }
1374        if has("versioning") {
1375            return Some(match method {
1376                "GET" => "GetBucketVersioning",
1377                "PUT" => "PutBucketVersioning",
1378                _ => return None,
1379            });
1380        }
1381        if has("cors") {
1382            return Some(match method {
1383                "GET" => "GetBucketCors",
1384                "PUT" => "PutBucketCors",
1385                "DELETE" => "DeleteBucketCors",
1386                _ => return None,
1387            });
1388        }
1389        if has("policy") {
1390            return Some(match method {
1391                "GET" => "GetBucketPolicy",
1392                "PUT" => "PutBucketPolicy",
1393                "DELETE" => "DeleteBucketPolicy",
1394                _ => return None,
1395            });
1396        }
1397        if has("website") {
1398            return Some(match method {
1399                "GET" => "GetBucketWebsite",
1400                "PUT" => "PutBucketWebsite",
1401                "DELETE" => "DeleteBucketWebsite",
1402                _ => return None,
1403            });
1404        }
1405        if has("lifecycle") {
1406            return Some(match method {
1407                "GET" => "GetBucketLifecycleConfiguration",
1408                "PUT" => "PutBucketLifecycleConfiguration",
1409                "DELETE" => "DeleteBucketLifecycle",
1410                _ => return None,
1411            });
1412        }
1413        if has("encryption") {
1414            return Some(match method {
1415                "GET" => "GetBucketEncryption",
1416                "PUT" => "PutBucketEncryption",
1417                "DELETE" => "DeleteBucketEncryption",
1418                _ => return None,
1419            });
1420        }
1421        if has("logging") {
1422            return Some(match method {
1423                "GET" => "GetBucketLogging",
1424                "PUT" => "PutBucketLogging",
1425                _ => return None,
1426            });
1427        }
1428        if has("notification") {
1429            return Some(match method {
1430                "GET" => "GetBucketNotificationConfiguration",
1431                "PUT" => "PutBucketNotificationConfiguration",
1432                _ => return None,
1433            });
1434        }
1435        if has("replication") {
1436            return Some(match method {
1437                "GET" => "GetBucketReplication",
1438                "PUT" => "PutBucketReplication",
1439                "DELETE" => "DeleteBucketReplication",
1440                _ => return None,
1441            });
1442        }
1443        if has("ownershipControls") {
1444            return Some(match method {
1445                "GET" => "GetBucketOwnershipControls",
1446                "PUT" => "PutBucketOwnershipControls",
1447                "DELETE" => "DeleteBucketOwnershipControls",
1448                _ => return None,
1449            });
1450        }
1451        if has("publicAccessBlock") {
1452            return Some(match method {
1453                "GET" => "GetPublicAccessBlock",
1454                "PUT" => "PutPublicAccessBlock",
1455                "DELETE" => "DeletePublicAccessBlock",
1456                _ => return None,
1457            });
1458        }
1459        if has("accelerate") {
1460            return Some(match method {
1461                "GET" => "GetBucketAccelerateConfiguration",
1462                "PUT" => "PutBucketAccelerateConfiguration",
1463                _ => return None,
1464            });
1465        }
1466        if has("inventory") {
1467            return Some(match method {
1468                "GET" => "GetBucketInventoryConfiguration",
1469                "PUT" => "PutBucketInventoryConfiguration",
1470                "DELETE" => "DeleteBucketInventoryConfiguration",
1471                _ => return None,
1472            });
1473        }
1474        if has("analytics") {
1475            return Some(match method {
1476                "GET" if has("id") => "GetBucketAnalyticsConfiguration",
1477                "GET" => "ListBucketAnalyticsConfigurations",
1478                "PUT" => "PutBucketAnalyticsConfiguration",
1479                "DELETE" => "DeleteBucketAnalyticsConfiguration",
1480                _ => return None,
1481            });
1482        }
1483        if has("intelligent-tiering") {
1484            return Some(match method {
1485                "GET" if has("id") => "GetBucketIntelligentTieringConfiguration",
1486                "GET" => "ListBucketIntelligentTieringConfigurations",
1487                "PUT" => "PutBucketIntelligentTieringConfiguration",
1488                "DELETE" => "DeleteBucketIntelligentTieringConfiguration",
1489                _ => return None,
1490            });
1491        }
1492        if has("metrics") {
1493            return Some(match method {
1494                "GET" if has("id") => "GetBucketMetricsConfiguration",
1495                "GET" => "ListBucketMetricsConfigurations",
1496                "PUT" => "PutBucketMetricsConfiguration",
1497                "DELETE" => "DeleteBucketMetricsConfiguration",
1498                _ => return None,
1499            });
1500        }
1501        if has("requestPayment") {
1502            return Some(match method {
1503                "GET" => "GetBucketRequestPayment",
1504                "PUT" => "PutBucketRequestPayment",
1505                _ => return None,
1506            });
1507        }
1508        if has("policyStatus") && is_get {
1509            return Some("GetBucketPolicyStatus");
1510        }
1511        if has("metadataConfiguration") {
1512            return Some(match method {
1513                "GET" => "GetBucketMetadataConfiguration",
1514                "POST" => "CreateBucketMetadataConfiguration",
1515                "DELETE" => "DeleteBucketMetadataConfiguration",
1516                _ => return None,
1517            });
1518        }
1519        if has("metadataTable") {
1520            return Some(match method {
1521                "GET" => "GetBucketMetadataTableConfiguration",
1522                "POST" => "CreateBucketMetadataTableConfiguration",
1523                "DELETE" => "DeleteBucketMetadataTableConfiguration",
1524                _ => return None,
1525            });
1526        }
1527        if has("metadataInventoryTable") && is_put {
1528            return Some("UpdateBucketMetadataInventoryTableConfiguration");
1529        }
1530        if has("metadataJournalTable") && is_put {
1531            return Some("UpdateBucketMetadataJournalTableConfiguration");
1532        }
1533        if has("abac") {
1534            // PUT  -> PutBucketAbacConfiguration
1535            // GET  -> GetBucketAbacConfiguration (previously fell through to
1536            //         the plain `GET bucket` arm and was IAM-evaluated as
1537            //         s3:ListObjects — a policy on the ABAC config was never
1538            //         honored).
1539            return Some(match method {
1540                "PUT" => "PutBucketAbacConfiguration",
1541                "GET" => "GetBucketAbacConfiguration",
1542                _ => return None,
1543            });
1544        }
1545        if has("session") && is_get {
1546            // CreateSession (S3 Express directory buckets). AWS authorizes it
1547            // under the separate `s3express:CreateSession` action — handled in
1548            // `iam_action_for`, which selects the `s3express` service prefix
1549            // for this op. Previously fell through to the plain `GET bucket`
1550            // arm and was evaluated as s3:ListObjects.
1551            return Some("CreateSession");
1552        }
1553        if has("object-lock") {
1554            return Some(match method {
1555                "GET" => "GetObjectLockConfiguration",
1556                "PUT" => "PutObjectLockConfiguration",
1557                _ => return None,
1558            });
1559        }
1560        if has("location") {
1561            return Some("GetBucketLocation");
1562        }
1563        if is_post && has("delete") {
1564            return Some("DeleteObjects");
1565        }
1566        if is_get && has("versions") {
1567            return Some("ListObjectVersions");
1568        }
1569    }
1570
1571    // GET on an object with `?torrent` is GetObjectTorrent, authorized under
1572    // s3:GetObjectTorrent -- not s3:GetObject. Without this arm a policy that
1573    // allows GetObject but not GetObjectTorrent would wrongly permit it
1574    // (bug-audit 2026-06-20, 5.3).
1575    if is_get && has_key && has("torrent") {
1576        return Some("GetObjectTorrent");
1577    }
1578
1579    // Plain bucket/object methods.
1580    match (method, has_key) {
1581        ("GET", true) => Some("GetObject"),
1582        ("PUT", true) => {
1583            // CopyObject uses x-amz-copy-source but we don't have headers
1584            // handy here — treat both PutObject and CopyObject as PutObject
1585            // for IAM purposes; CopyObject additionally requires
1586            // s3:GetObject on the source but that's evaluated per-request
1587            // by real AWS, not on the PUT call itself.
1588            Some("PutObject")
1589        }
1590        ("DELETE", true) => Some("DeleteObject"),
1591        ("HEAD", true) => Some("HeadObject"),
1592        ("GET", false) => {
1593            if query.contains_key("list-type") {
1594                Some("ListObjectsV2")
1595            } else {
1596                Some("ListObjects")
1597            }
1598        }
1599        ("PUT", false) => Some("CreateBucket"),
1600        ("DELETE", false) => Some("DeleteBucket"),
1601        ("HEAD", false) => Some("HeadBucket"),
1602        _ => None,
1603    }
1604}
1605
1606/// Build the S3 resource ARN for an action. Returns `*` for
1607/// `ListBuckets` (account-scoped), a bucket ARN for bucket-level
1608/// configuration actions, or an object ARN for object-level actions.
1609fn s3_resource_for(action: &'static str, bucket: Option<&str>, key: Option<&str>) -> String {
1610    // Object-level actions work on `bucket/key`.
1611    const OBJECT_ACTIONS: &[&str] = &[
1612        "PutObject",
1613        "GetObject",
1614        "DeleteObject",
1615        "HeadObject",
1616        "CopyObject",
1617        "GetObjectAttributes",
1618        "RestoreObject",
1619        "PutObjectTagging",
1620        "GetObjectTagging",
1621        "DeleteObjectTagging",
1622        "PutObjectAcl",
1623        "GetObjectAcl",
1624        "PutObjectRetention",
1625        "GetObjectRetention",
1626        "PutObjectLegalHold",
1627        "GetObjectLegalHold",
1628        "CreateMultipartUpload",
1629        "UploadPart",
1630        "UploadPartCopy",
1631        "CompleteMultipartUpload",
1632        "AbortMultipartUpload",
1633        "ListParts",
1634    ];
1635    if action == "ListBuckets" {
1636        return "*".to_string();
1637    }
1638    let Some(bucket) = bucket else {
1639        return "*".to_string();
1640    };
1641    if OBJECT_ACTIONS.contains(&action) {
1642        match key {
1643            Some(k) if !k.is_empty() => Arn::s3(&format!("{bucket}/{k}")).to_string(),
1644            _ => Arn::s3(&format!("{bucket}/*")).to_string(),
1645        }
1646    } else {
1647        // Bucket-level actions (ListObjectsV2, GetBucketTagging, ...).
1648        Arn::s3(bucket).to_string()
1649    }
1650}
1651
1652// Conditional request helpers
1653
1654/// Truncate a DateTime to second-level precision (HTTP dates have no sub-second info).
1655pub(crate) fn truncate_to_seconds(dt: DateTime<Utc>) -> DateTime<Utc> {
1656    dt.with_nanosecond(0).unwrap_or(dt)
1657}
1658
1659pub(crate) fn check_get_conditionals(
1660    req: &AwsRequest,
1661    obj: &S3Object,
1662) -> Result<(), AwsServiceError> {
1663    let obj_etag = format!("\"{}\"", obj.etag);
1664    let obj_time = truncate_to_seconds(obj.last_modified);
1665
1666    // If-Match
1667    if let Some(if_match) = req.headers.get("if-match").and_then(|v| v.to_str().ok()) {
1668        if !etag_matches(if_match, &obj_etag) {
1669            return Err(precondition_failed("If-Match"));
1670        }
1671    }
1672
1673    // If-None-Match
1674    if let Some(if_none_match) = req
1675        .headers
1676        .get("if-none-match")
1677        .and_then(|v| v.to_str().ok())
1678    {
1679        if etag_matches(if_none_match, &obj_etag) {
1680            return Err(not_modified_with_etag(&obj_etag));
1681        }
1682    }
1683
1684    // If-Unmodified-Since — RFC 7232 §3.4: ignored when If-Match is present.
1685    if req.headers.get("if-match").is_none() {
1686        if let Some(since) = req
1687            .headers
1688            .get("if-unmodified-since")
1689            .and_then(|v| v.to_str().ok())
1690        {
1691            if let Some(dt) = parse_http_date(since) {
1692                if obj_time > dt {
1693                    return Err(precondition_failed("If-Unmodified-Since"));
1694                }
1695            }
1696        }
1697    }
1698
1699    // If-Modified-Since — RFC 7232 §3.3: ignored when If-None-Match is present.
1700    if req.headers.get("if-none-match").is_none() {
1701        if let Some(since) = req
1702            .headers
1703            .get("if-modified-since")
1704            .and_then(|v| v.to_str().ok())
1705        {
1706            if let Some(dt) = parse_http_date(since) {
1707                if obj_time <= dt {
1708                    return Err(not_modified());
1709                }
1710            }
1711        }
1712    }
1713
1714    Ok(())
1715}
1716
1717pub(crate) fn check_head_conditionals(
1718    req: &AwsRequest,
1719    obj: &S3Object,
1720) -> Result<(), AwsServiceError> {
1721    let obj_etag = format!("\"{}\"", obj.etag);
1722    let obj_time = truncate_to_seconds(obj.last_modified);
1723
1724    // If-Match
1725    if let Some(if_match) = req.headers.get("if-match").and_then(|v| v.to_str().ok()) {
1726        if !etag_matches(if_match, &obj_etag) {
1727            return Err(AwsServiceError::aws_error(
1728                StatusCode::PRECONDITION_FAILED,
1729                "412",
1730                "Precondition Failed",
1731            ));
1732        }
1733    }
1734
1735    // If-None-Match
1736    if let Some(if_none_match) = req
1737        .headers
1738        .get("if-none-match")
1739        .and_then(|v| v.to_str().ok())
1740    {
1741        if etag_matches(if_none_match, &obj_etag) {
1742            return Err(not_modified_with_etag(&obj_etag));
1743        }
1744    }
1745
1746    // If-Unmodified-Since — RFC 7232 §3.4: ignored when If-Match is present.
1747    if req.headers.get("if-match").is_none() {
1748        if let Some(since) = req
1749            .headers
1750            .get("if-unmodified-since")
1751            .and_then(|v| v.to_str().ok())
1752        {
1753            if let Some(dt) = parse_http_date(since) {
1754                if obj_time > dt {
1755                    return Err(AwsServiceError::aws_error(
1756                        StatusCode::PRECONDITION_FAILED,
1757                        "412",
1758                        "Precondition Failed",
1759                    ));
1760                }
1761            }
1762        }
1763    }
1764
1765    // If-Modified-Since — RFC 7232 §3.3: ignored when If-None-Match is present.
1766    if req.headers.get("if-none-match").is_none() {
1767        if let Some(since) = req
1768            .headers
1769            .get("if-modified-since")
1770            .and_then(|v| v.to_str().ok())
1771        {
1772            if let Some(dt) = parse_http_date(since) {
1773                if obj_time <= dt {
1774                    return Err(not_modified());
1775                }
1776            }
1777        }
1778    }
1779
1780    Ok(())
1781}
1782
/// Test an `If-Match` / `If-None-Match` header value against an object ETag.
///
/// `condition` is either `*` (matches anything) or a comma-separated list of
/// ETags. Double quotes are removed from both sides before comparing, so
/// quoted and unquoted forms are interchangeable.
pub(crate) fn etag_matches(condition: &str, obj_etag: &str) -> bool {
    let wanted = condition.trim();
    if wanted == "*" {
        return true;
    }

    // Comparison ignores every double quote, wherever it appears.
    let unquoted = |s: &str| s.replace('"', "");
    let target = unquoted(obj_etag);

    // Any entry of the comma-separated list may satisfy the condition.
    wanted
        .split(',')
        .any(|candidate| unquoted(candidate.trim()) == target)
}
1798
1799pub(crate) fn parse_http_date(s: &str) -> Option<DateTime<Utc>> {
1800    // Try RFC 2822 format: "Sat, 01 Jan 2000 00:00:00 GMT"
1801    if let Ok(dt) = DateTime::parse_from_rfc2822(s) {
1802        return Some(dt.with_timezone(&Utc));
1803    }
1804    // Try RFC 3339
1805    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1806        return Some(dt.with_timezone(&Utc));
1807    }
1808    // Try common HTTP date format: "%a, %d %b %Y %H:%M:%S GMT"
1809    if let Ok(dt) =
1810        chrono::NaiveDateTime::parse_from_str(s.trim_end_matches(" GMT"), "%a, %d %b %Y %H:%M:%S")
1811    {
1812        return Some(dt.and_utc());
1813    }
1814    // Try ISO 8601
1815    if let Ok(dt) = s.parse::<DateTime<Utc>>() {
1816        return Some(dt);
1817    }
1818    None
1819}
1820
1821pub(crate) fn not_modified() -> AwsServiceError {
1822    AwsServiceError::aws_error(StatusCode::NOT_MODIFIED, "304", "Not Modified")
1823}
1824
1825pub(crate) fn not_modified_with_etag(etag: &str) -> AwsServiceError {
1826    AwsServiceError::aws_error_with_headers(
1827        StatusCode::NOT_MODIFIED,
1828        "304",
1829        "Not Modified",
1830        vec![("etag".to_string(), etag.to_string())],
1831    )
1832}
1833
1834pub(crate) fn precondition_failed(condition: &str) -> AwsServiceError {
1835    AwsServiceError::aws_error_with_fields(
1836        StatusCode::PRECONDITION_FAILED,
1837        "PreconditionFailed",
1838        "At least one of the pre-conditions you specified did not hold",
1839        vec![("Condition".to_string(), condition.to_string())],
1840    )
1841}
1842
1843// ACL helpers
1844
1845pub(crate) fn build_acl_xml(owner_id: &str, grants: &[AclGrant], _account_id: &str) -> String {
1846    let mut grants_xml = String::new();
1847    for g in grants {
1848        let grantee_xml = if g.grantee_type == "Group" {
1849            let uri = g.grantee_uri.as_deref().unwrap_or("");
1850            format!(
1851                "<Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:type=\"Group\">\
1852                 <URI>{}</URI></Grantee>",
1853                xml_escape(uri),
1854            )
1855        } else {
1856            let id = g.grantee_id.as_deref().unwrap_or("");
1857            format!(
1858                "<Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:type=\"CanonicalUser\">\
1859                 <ID>{}</ID></Grantee>",
1860                xml_escape(id),
1861            )
1862        };
1863        grants_xml.push_str(&format!(
1864            "<Grant>{grantee_xml}<Permission>{}</Permission></Grant>",
1865            xml_escape(&g.permission),
1866        ));
1867    }
1868
1869    format!(
1870        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
1871         <AccessControlPolicy xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
1872         <Owner><ID>{owner_id}</ID><DisplayName>{owner_id}</DisplayName></Owner>\
1873         <AccessControlList>{grants_xml}</AccessControlList>\
1874         </AccessControlPolicy>",
1875        owner_id = xml_escape(owner_id),
1876    )
1877}
1878
1879pub(crate) fn canned_acl_grants(acl: &str, owner_id: &str) -> Vec<AclGrant> {
1880    let owner_grant = AclGrant {
1881        grantee_type: "CanonicalUser".to_string(),
1882        grantee_id: Some(owner_id.to_string()),
1883        grantee_display_name: Some(owner_id.to_string()),
1884        grantee_uri: None,
1885        permission: "FULL_CONTROL".to_string(),
1886    };
1887    match acl {
1888        "private" => vec![owner_grant],
1889        "public-read" => vec![
1890            owner_grant,
1891            AclGrant {
1892                grantee_type: "Group".to_string(),
1893                grantee_id: None,
1894                grantee_display_name: None,
1895                grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
1896                permission: "READ".to_string(),
1897            },
1898        ],
1899        "public-read-write" => vec![
1900            owner_grant,
1901            AclGrant {
1902                grantee_type: "Group".to_string(),
1903                grantee_id: None,
1904                grantee_display_name: None,
1905                grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
1906                permission: "READ".to_string(),
1907            },
1908            AclGrant {
1909                grantee_type: "Group".to_string(),
1910                grantee_id: None,
1911                grantee_display_name: None,
1912                grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
1913                permission: "WRITE".to_string(),
1914            },
1915        ],
1916        "authenticated-read" => vec![
1917            owner_grant,
1918            AclGrant {
1919                grantee_type: "Group".to_string(),
1920                grantee_id: None,
1921                grantee_display_name: None,
1922                grantee_uri: Some(
1923                    "http://acs.amazonaws.com/groups/global/AuthenticatedUsers".to_string(),
1924                ),
1925                permission: "READ".to_string(),
1926            },
1927        ],
1928        "bucket-owner-full-control" => vec![owner_grant],
1929        _ => vec![owner_grant],
1930    }
1931}
1932
1933pub(crate) fn canned_acl_grants_for_object(acl: &str, owner_id: &str) -> Vec<AclGrant> {
1934    // For objects, canned ACLs work the same way
1935    canned_acl_grants(acl, owner_id)
1936}
1937
1938pub(crate) fn parse_grant_headers(headers: &HeaderMap) -> Vec<AclGrant> {
1939    let mut grants = Vec::new();
1940    let header_permission_map = [
1941        ("x-amz-grant-read", "READ"),
1942        ("x-amz-grant-write", "WRITE"),
1943        ("x-amz-grant-read-acp", "READ_ACP"),
1944        ("x-amz-grant-write-acp", "WRITE_ACP"),
1945        ("x-amz-grant-full-control", "FULL_CONTROL"),
1946    ];
1947
1948    for (header, permission) in &header_permission_map {
1949        if let Some(value) = headers.get(*header).and_then(|v| v.to_str().ok()) {
1950            // Parse "id=xxx" or "uri=xxx" or "emailAddress=xxx"
1951            for part in value.split(',') {
1952                let part = part.trim();
1953                if let Some((key, val)) = part.split_once('=') {
1954                    let val = val.trim().trim_matches('"');
1955                    let key = key.trim().to_lowercase();
1956                    match key.as_str() {
1957                        "id" => {
1958                            grants.push(AclGrant {
1959                                grantee_type: "CanonicalUser".to_string(),
1960                                grantee_id: Some(val.to_string()),
1961                                grantee_display_name: Some(val.to_string()),
1962                                grantee_uri: None,
1963                                permission: permission.to_string(),
1964                            });
1965                        }
1966                        "uri" | "url" => {
1967                            grants.push(AclGrant {
1968                                grantee_type: "Group".to_string(),
1969                                grantee_id: None,
1970                                grantee_display_name: None,
1971                                grantee_uri: Some(val.to_string()),
1972                                permission: permission.to_string(),
1973                            });
1974                        }
1975                        _ => {}
1976                    }
1977                }
1978            }
1979        }
1980    }
1981    grants
1982}
1983
1984pub(crate) fn parse_acl_xml(xml: &str) -> Result<Vec<AclGrant>, AwsServiceError> {
1985    // Check for Owner presence
1986    if xml.contains("<AccessControlPolicy") && !xml.contains("<Owner>") {
1987        return Err(AwsServiceError::aws_error(
1988            StatusCode::BAD_REQUEST,
1989            "MalformedACLError",
1990            "The XML you provided was not well-formed or did not validate against our published schema",
1991        ));
1992    }
1993
1994    let valid_permissions = ["READ", "WRITE", "READ_ACP", "WRITE_ACP", "FULL_CONTROL"];
1995
1996    let mut grants = Vec::new();
1997    let mut remaining = xml;
1998    while let Some(start) = remaining.find("<Grant>") {
1999        let after = &remaining[start + 7..];
2000        if let Some(end) = after.find("</Grant>") {
2001            let grant_body = &after[..end];
2002
2003            // Extract permission
2004            let permission = extract_xml_value(grant_body, "Permission").unwrap_or_default();
2005            if !valid_permissions.contains(&permission.as_str()) {
2006                return Err(AwsServiceError::aws_error(
2007                    StatusCode::BAD_REQUEST,
2008                    "MalformedACLError",
2009                    "The XML you provided was not well-formed or did not validate against our published schema",
2010                ));
2011            }
2012
2013            // Determine grantee type
2014            if grant_body.contains("xsi:type=\"Group\"") || grant_body.contains("<URI>") {
2015                let uri = extract_xml_value(grant_body, "URI").unwrap_or_default();
2016                grants.push(AclGrant {
2017                    grantee_type: "Group".to_string(),
2018                    grantee_id: None,
2019                    grantee_display_name: None,
2020                    grantee_uri: Some(uri),
2021                    permission,
2022                });
2023            } else {
2024                let id = extract_xml_value(grant_body, "ID").unwrap_or_default();
2025                let display =
2026                    extract_xml_value(grant_body, "DisplayName").unwrap_or_else(|| id.clone());
2027                grants.push(AclGrant {
2028                    grantee_type: "CanonicalUser".to_string(),
2029                    grantee_id: Some(id),
2030                    grantee_display_name: Some(display),
2031                    grantee_uri: None,
2032                    permission,
2033                });
2034            }
2035
2036            remaining = &after[end + 8..];
2037        } else {
2038            break;
2039        }
2040    }
2041    Ok(grants)
2042}
2043
2044// Range helpers
2045
/// Outcome of interpreting a `Range` header against an object of known size.
pub(crate) enum RangeResult {
    /// The range selects bytes `start..=end` of the object.
    Satisfiable { start: usize, end: usize },
    /// The range is well-formed but selects no bytes of the object.
    NotSatisfiable,
    /// The range's end precedes its start; the header should be disregarded.
    Ignored,
}

/// Interpret a single-range `bytes=` header value for an object of
/// `total_size` bytes.
///
/// Returns `None` when the value lacks the `bytes=` prefix, has no `-`, or
/// contains a bound that does not parse as an unsigned integer. Supported
/// forms are `bytes=A-B`, `bytes=A-` (from `A` to the end) and `bytes=-N`
/// (the final `N` bytes). An end bound past the object is clamped to the last
/// byte.
pub(crate) fn parse_range_header(range_str: &str, total_size: usize) -> Option<RangeResult> {
    let spec = range_str.strip_prefix("bytes=")?;
    let (first, last) = spec.split_once('-')?;

    // Suffix form: `bytes=-N`.
    if first.is_empty() {
        let wanted: usize = last.parse().ok()?;
        let outcome = if wanted == 0 || total_size == 0 {
            RangeResult::NotSatisfiable
        } else {
            RangeResult::Satisfiable {
                start: total_size.saturating_sub(wanted),
                end: total_size - 1,
            }
        };
        return Some(outcome);
    }

    // The start bound is validated before the end bound is even parsed.
    let start: usize = first.parse().ok()?;
    if start >= total_size {
        return Some(RangeResult::NotSatisfiable);
    }

    // `total_size > start >= 0`, so the subtraction cannot underflow.
    let final_index = total_size - 1;
    let end = match last {
        "" => final_index,
        bound => {
            let requested: usize = bound.parse().ok()?;
            if requested < start {
                return Some(RangeResult::Ignored);
            }
            requested.min(final_index)
        }
    };
    Some(RangeResult::Satisfiable { start, end })
}
2082
2083// Helpers
2084
2085/// S3 XML response with `application/xml` content type (unlike Query protocol's `text/xml`).
2086pub(crate) fn s3_xml(status: StatusCode, body: impl Into<Bytes>) -> AwsResponse {
2087    AwsResponse {
2088        status,
2089        content_type: "application/xml".to_string(),
2090        body: body.into().into(),
2091        headers: HeaderMap::new(),
2092    }
2093}
2094
2095pub(crate) fn empty_response(status: StatusCode) -> AwsResponse {
2096    AwsResponse {
2097        status,
2098        content_type: "application/xml".to_string(),
2099        body: Bytes::new().into(),
2100        headers: HeaderMap::new(),
2101    }
2102}
2103
2104/// Returns true when the object is stored in a "cold" storage class (GLACIER, DEEP_ARCHIVE)
2105/// and has NOT been restored (or restore is still in progress).
2106pub(crate) fn is_frozen(obj: &S3Object) -> bool {
2107    matches!(obj.storage_class.as_str(), "GLACIER" | "DEEP_ARCHIVE")
2108        && obj.restore_ongoing != Some(false)
2109}
2110
2111pub(crate) fn no_such_bucket(bucket: &str) -> AwsServiceError {
2112    AwsServiceError::aws_error_with_fields(
2113        StatusCode::NOT_FOUND,
2114        "NoSuchBucket",
2115        "The specified bucket does not exist",
2116        vec![("BucketName".to_string(), bucket.to_string())],
2117    )
2118}
2119
2120pub(crate) fn no_such_key(key: &str) -> AwsServiceError {
2121    AwsServiceError::aws_error_with_fields(
2122        StatusCode::NOT_FOUND,
2123        "NoSuchKey",
2124        "The specified key does not exist.",
2125        vec![("Key".to_string(), key.to_string())],
2126    )
2127}
2128
2129pub(crate) fn no_such_upload(upload_id: &str) -> AwsServiceError {
2130    AwsServiceError::aws_error_with_fields(
2131        StatusCode::NOT_FOUND,
2132        "NoSuchUpload",
2133        "The specified upload does not exist. The upload ID may be invalid, \
2134         or the upload may have been aborted or completed.",
2135        vec![("UploadId".to_string(), upload_id.to_string())],
2136    )
2137}
2138
2139pub(crate) fn no_such_key_with_detail(key: &str) -> AwsServiceError {
2140    AwsServiceError::aws_error_with_fields(
2141        StatusCode::NOT_FOUND,
2142        "NoSuchKey",
2143        "The specified key does not exist.",
2144        vec![("Key".to_string(), key.to_string())],
2145    )
2146}
2147
2148pub(crate) fn compute_md5(data: &[u8]) -> String {
2149    let digest = Md5::digest(data);
2150    format!("{:x}", digest)
2151}
2152
2153pub(crate) fn compute_checksum(algorithm: &str, data: &[u8]) -> String {
2154    let raw = compute_checksum_raw(algorithm, data);
2155    if raw.is_empty() {
2156        String::new()
2157    } else {
2158        BASE64.encode(raw)
2159    }
2160}
2161
2162/// Compute the raw (un-base64'd) checksum digest bytes for `data`.
2163///
2164/// Returns the network-order digest bytes so callers can either base64
2165/// them ([`compute_checksum`]) or concatenate them when computing an S3
2166/// multipart COMPOSITE checksum (which hashes the concatenation of each
2167/// part's raw digest, then base64s the result and appends `-N`).
2168pub(crate) fn compute_checksum_raw(algorithm: &str, data: &[u8]) -> Vec<u8> {
2169    match algorithm {
2170        "CRC32" => crc32fast::hash(data).to_be_bytes().to_vec(),
2171        // CRC32C and CRC64NVME use the same crates as the streaming path so
2172        // CopyObject / CompleteMultipartUpload produce identical results to
2173        // a streaming PutObject (1.7); the COMPOSITE multipart checksum
2174        // concatenates these raw digests before hashing (1.18).
2175        "CRC32C" => crc32c::crc32c(data).to_be_bytes().to_vec(),
2176        "CRC64NVME" => {
2177            let mut hasher = crc64fast_nvme::Digest::new();
2178            hasher.write(data);
2179            hasher.sum64().to_be_bytes().to_vec()
2180        }
2181        "SHA1" => {
2182            use sha1::Digest as _;
2183            sha1::Sha1::digest(data).to_vec()
2184        }
2185        "SHA256" => {
2186            use sha2::Digest as _;
2187            sha2::Sha256::digest(data).to_vec()
2188        }
2189        _ => Vec::new(),
2190    }
2191}
2192
2193/// Compute an S3 multipart COMPOSITE additional checksum from each
2194/// part's raw digest bytes (in part-number order). AWS defines this as
2195/// `base64(HASH(concat(raw_part_digests)))-N` where HASH is the chosen
2196/// algorithm and N is the part count. Returns `None` for an unsupported
2197/// algorithm or empty part list.
2198pub(crate) fn compute_composite_checksum(
2199    algorithm: &str,
2200    part_raw_digests: &[Vec<u8>],
2201) -> Option<String> {
2202    if part_raw_digests.is_empty() {
2203        return None;
2204    }
2205    let mut concat = Vec::new();
2206    for d in part_raw_digests {
2207        concat.extend_from_slice(d);
2208    }
2209    let digest = compute_checksum_raw(algorithm, &concat);
2210    if digest.is_empty() {
2211        return None;
2212    }
2213    Some(format!(
2214        "{}-{}",
2215        BASE64.encode(digest),
2216        part_raw_digests.len()
2217    ))
2218}
2219
2220/// Streaming variant of [`compute_checksum`] for spool files. Reads
2221/// the file in 1 MiB chunks and feeds each chunk into the hasher so a
2222/// 1 GiB upload computes its CRC32 / SHA-1 / SHA-256 in constant
2223/// memory rather than via `tokio::fs::read` (which would allocate the
2224/// whole file as one buffer).
2225pub(crate) async fn compute_checksum_streaming(
2226    algorithm: &str,
2227    path: &std::path::Path,
2228) -> Result<String, std::io::Error> {
2229    use tokio::io::AsyncReadExt;
2230    let mut file = tokio::fs::File::open(path).await?;
2231    let mut buf = vec![0u8; 1024 * 1024];
2232    match algorithm {
2233        "CRC32" => {
2234            let mut hasher = crc32fast::Hasher::new();
2235            loop {
2236                let n = file.read(&mut buf).await?;
2237                if n == 0 {
2238                    break;
2239                }
2240                hasher.update(&buf[..n]);
2241            }
2242            Ok(BASE64.encode(hasher.finalize().to_be_bytes()))
2243        }
2244        "CRC32C" => {
2245            let mut crc: u32 = 0;
2246            loop {
2247                let n = file.read(&mut buf).await?;
2248                if n == 0 {
2249                    break;
2250                }
2251                crc = crc32c::crc32c_append(crc, &buf[..n]);
2252            }
2253            Ok(BASE64.encode(crc.to_be_bytes()))
2254        }
2255        "CRC64NVME" => {
2256            let mut hasher = crc64fast_nvme::Digest::new();
2257            loop {
2258                let n = file.read(&mut buf).await?;
2259                if n == 0 {
2260                    break;
2261                }
2262                hasher.write(&buf[..n]);
2263            }
2264            Ok(BASE64.encode(hasher.sum64().to_be_bytes()))
2265        }
2266        "SHA1" => {
2267            use sha1::Digest as _;
2268            let mut hasher = sha1::Sha1::new();
2269            loop {
2270                let n = file.read(&mut buf).await?;
2271                if n == 0 {
2272                    break;
2273                }
2274                hasher.update(&buf[..n]);
2275            }
2276            Ok(BASE64.encode(hasher.finalize()))
2277        }
2278        "SHA256" => {
2279            use sha2::Digest as _;
2280            let mut hasher = sha2::Sha256::new();
2281            loop {
2282                let n = file.read(&mut buf).await?;
2283                if n == 0 {
2284                    break;
2285                }
2286                hasher.update(&buf[..n]);
2287            }
2288            Ok(BASE64.encode(hasher.finalize()))
2289        }
2290        _ => Ok(String::new()),
2291    }
2292}
2293
/// Percent-encode an S3 object key.
///
/// ASCII letters, digits and `-`, `_`, `.`, `~`, `/` pass through
/// unchanged; every other byte of the UTF-8 encoding becomes `%XX` with
/// uppercase hex digits.
pub(crate) fn url_encode_s3_key(s: &str) -> String {
    const HEX_UPPER: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(s.len() * 2);
    for &byte in s.as_bytes() {
        let passthrough =
            byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~' | b'/');
        if passthrough {
            encoded.push(char::from(byte));
        } else {
            encoded.push('%');
            encoded.push(char::from(HEX_UPPER[usize::from(byte >> 4)]));
            encoded.push(char::from(HEX_UPPER[usize::from(byte & 0x0F)]));
        }
    }
    encoded
}
2308
2309pub(crate) use fakecloud_aws::xml::xml_escape;
2310
2311pub(crate) fn extract_user_metadata(
2312    headers: &HeaderMap,
2313) -> std::collections::BTreeMap<String, String> {
2314    let mut meta = std::collections::BTreeMap::new();
2315    for (name, value) in headers {
2316        if let Some(key) = name.as_str().strip_prefix("x-amz-meta-") {
2317            if let Ok(v) = value.to_str() {
2318                meta.insert(key.to_string(), v.to_string());
2319            }
2320        }
2321    }
2322    meta
2323}
2324
/// Return `true` when `class` is one of the storage class names this
/// service accepts (exact, case-sensitive match).
pub(crate) fn is_valid_storage_class(class: &str) -> bool {
    const STORAGE_CLASSES: &[&str] = &[
        "STANDARD",
        "REDUCED_REDUNDANCY",
        "STANDARD_IA",
        "ONEZONE_IA",
        "INTELLIGENT_TIERING",
        "GLACIER",
        "DEEP_ARCHIVE",
        "GLACIER_IR",
        "OUTPOSTS",
        "SNOW",
        "EXPRESS_ONEZONE",
    ];
    STORAGE_CLASSES.iter().any(|known| *known == class)
}
2341
/// Check a bucket name against the naming rules enforced here:
/// 3 to 63 bytes long, first and last character alphanumeric, and every
/// character a lowercase ASCII letter, digit, `-`, `.` or `_` (underscore is
/// tolerated for compatibility).
pub(crate) fn is_valid_bucket_name(name: &str) -> bool {
    if !(3..=63).contains(&name.len()) {
        return false;
    }
    let (first, last) = match name.as_bytes() {
        [first, .., last] => (*first, *last),
        // Unreachable after the length check, but keeps the match total.
        _ => return false,
    };
    if !(first.is_ascii_alphanumeric() && last.is_ascii_alphanumeric()) {
        return false;
    }
    // Any byte of a non-ASCII character fails this test, so working on bytes
    // rejects exactly the same names as working on chars.
    name.bytes()
        .all(|b| matches!(b, b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_'))
}
2355
/// Return `true` when `region` exactly matches one of the region names in
/// the fixed allow-list below.
pub(crate) fn is_valid_region(region: &str) -> bool {
    const KNOWN_REGIONS: &[&str] = &[
        "us-east-1",
        "us-east-2",
        "us-west-1",
        "us-west-2",
        "af-south-1",
        "ap-east-1",
        "ap-south-1",
        "ap-south-2",
        "ap-southeast-1",
        "ap-southeast-2",
        "ap-southeast-3",
        "ap-southeast-4",
        "ap-northeast-1",
        "ap-northeast-2",
        "ap-northeast-3",
        "ca-central-1",
        "ca-west-1",
        "eu-central-1",
        "eu-central-2",
        "eu-west-1",
        "eu-west-2",
        "eu-west-3",
        "eu-south-1",
        "eu-south-2",
        "eu-north-1",
        "il-central-1",
        "me-south-1",
        "me-central-1",
        "sa-east-1",
        "cn-north-1",
        "cn-northwest-1",
        "us-gov-east-1",
        "us-gov-east-2",
        "us-gov-west-1",
        "us-iso-east-1",
        "us-iso-west-1",
        "us-isob-east-1",
        "us-isof-south-1",
    ];
    KNOWN_REGIONS.iter().any(|known| *known == region)
}
2400
2401pub(crate) fn resolve_object<'a>(
2402    b: &'a S3Bucket,
2403    key: &str,
2404    version_id: Option<&String>,
2405) -> Result<&'a S3Object, AwsServiceError> {
2406    if let Some(vid) = version_id {
2407        // "null" version ID refers to an object with no version_id (pre-versioning)
2408        if vid == "null" {
2409            // Check versions for a pre-versioning object (version_id == None or Some("null"))
2410            if let Some(versions) = b.object_versions.get(key) {
2411                if let Some(obj) = versions
2412                    .iter()
2413                    .find(|o| o.version_id.is_none() || o.version_id.as_deref() == Some("null"))
2414                {
2415                    return Ok(obj);
2416                }
2417            }
2418            // Also check current object if it has no version_id
2419            if let Some(obj) = b.objects.get(key) {
2420                if obj.version_id.is_none() || obj.version_id.as_deref() == Some("null") {
2421                    return Ok(obj);
2422                }
2423            }
2424        } else {
2425            // When a specific versionId is requested, check versions first
2426            if let Some(versions) = b.object_versions.get(key) {
2427                if let Some(obj) = versions
2428                    .iter()
2429                    .find(|o| o.version_id.as_deref() == Some(vid.as_str()))
2430                {
2431                    return Ok(obj);
2432                }
2433            }
2434            // Also check current object
2435            if let Some(obj) = b.objects.get(key) {
2436                if obj.version_id.as_deref() == Some(vid.as_str()) {
2437                    return Ok(obj);
2438                }
2439            }
2440        }
2441        // For versioned buckets, return NoSuchVersion; for non-versioned, return 400
2442        if b.versioning.is_some() {
2443            Err(AwsServiceError::aws_error_with_fields(
2444                StatusCode::NOT_FOUND,
2445                "NoSuchVersion",
2446                "The specified version does not exist.",
2447                vec![
2448                    ("Key".to_string(), key.to_string()),
2449                    ("VersionId".to_string(), vid.to_string()),
2450                ],
2451            ))
2452        } else {
2453            Err(AwsServiceError::aws_error(
2454                StatusCode::BAD_REQUEST,
2455                "InvalidArgument",
2456                "Invalid version id specified",
2457            ))
2458        }
2459    } else {
2460        b.objects.get(key).ok_or_else(|| no_such_key(key))
2461    }
2462}
2463
2464pub(crate) fn make_delete_marker(key: &str, dm_id: &str) -> S3Object {
2465    S3Object {
2466        key: key.to_string(),
2467        last_modified: Utc::now(),
2468        storage_class: "STANDARD".to_string(),
2469        version_id: Some(dm_id.to_string()),
2470        is_delete_marker: true,
2471        ..Default::default()
2472    }
2473}
2474
/// Represents an object to delete in a batch delete request.
///
/// One entry is produced per `<Object>` element by
/// `parse_delete_objects_xml`.
pub(crate) struct DeleteObjectEntry {
    // Text of the `<Key>` child element.
    key: String,
    // Text of the `<VersionId>` child element, when one is present.
    version_id: Option<String>,
}
2480
2481pub(crate) fn parse_delete_objects_xml(xml: &str) -> Vec<DeleteObjectEntry> {
2482    let mut entries = Vec::new();
2483    let mut remaining = xml;
2484    while let Some(obj_start) = remaining.find("<Object>") {
2485        let after = &remaining[obj_start + 8..];
2486        if let Some(obj_end) = after.find("</Object>") {
2487            let obj_body = &after[..obj_end];
2488            let key = extract_xml_value(obj_body, "Key");
2489            let version_id = extract_xml_value(obj_body, "VersionId");
2490            if let Some(k) = key {
2491                entries.push(DeleteObjectEntry { key: k, version_id });
2492            }
2493            remaining = &after[obj_end + 9..];
2494        } else {
2495            break;
2496        }
2497    }
2498    entries
2499}
2500
2501/// Returns true when the request body's top-level <Quiet> element is "true".
2502/// AWS docs: when Quiet=true, only failed deletes are emitted.
2503pub(crate) fn parse_delete_objects_quiet(xml: &str) -> bool {
2504    extract_xml_value(xml, "Quiet")
2505        .map(|v| v.eq_ignore_ascii_case("true"))
2506        .unwrap_or(false)
2507}
2508
2509/// Minimal XML parser for `<Tagging><TagSet><Tag><Key>k</Key><Value>v</Value></Tag>...`.
2510/// Returns a Vec to preserve insertion order and detect duplicates.
2511pub(crate) fn parse_tagging_xml(xml: &str) -> Vec<(String, String)> {
2512    let mut tags = Vec::new();
2513    let mut remaining = xml;
2514    while let Some(tag_start) = remaining.find("<Tag>") {
2515        let after = &remaining[tag_start + 5..];
2516        if let Some(tag_end) = after.find("</Tag>") {
2517            let tag_body = &after[..tag_end];
2518            let key = extract_xml_value(tag_body, "Key");
2519            let value = extract_xml_value(tag_body, "Value");
2520            if let (Some(k), Some(v)) = (key, value) {
2521                tags.push((k, v));
2522            }
2523            remaining = &after[tag_end + 6..];
2524        } else {
2525            break;
2526        }
2527    }
2528    tags
2529}
2530
2531pub(crate) fn validate_tags(tags: &[(String, String)]) -> Result<(), AwsServiceError> {
2532    // Check for duplicate keys
2533    let mut seen = std::collections::HashSet::new();
2534    for (k, _) in tags {
2535        if !seen.insert(k.as_str()) {
2536            return Err(AwsServiceError::aws_error(
2537                StatusCode::BAD_REQUEST,
2538                "InvalidTag",
2539                "Cannot provide multiple Tags with the same key",
2540            ));
2541        }
2542        // Check for aws: prefix
2543        if k.starts_with("aws:") {
2544            return Err(AwsServiceError::aws_error(
2545                StatusCode::BAD_REQUEST,
2546                "InvalidTag",
2547                "System tags cannot be added/updated by requester",
2548            ));
2549        }
2550    }
2551    Ok(())
2552}
2553
/// Return the text content of the first `<tag>` element in `xml`.
///
/// This is a plain substring scan, not an XML parser: it recognises only
/// the exact forms `<tag>...</tag>`, `<tag/>` and `<tag />` (no attributes,
/// no entity decoding).
///
/// * A self-closing element that appears before any `<tag>` opening (or
///   with no opening at all) yields `Some("")`.
/// * Otherwise the text between the first `<tag>` and the next `</tag>`
///   after it is returned.
/// * `None` when there is no opening tag, or no closing tag after it.
pub(crate) fn extract_xml_value(xml: &str, tag: &str) -> Option<String> {
    let open = format!("<{tag}>");
    let close = format!("</{tag}>");
    let open_pos = xml.find(&open);

    // Earliest self-closing occurrence across BOTH spellings. Taking the
    // minimum matters when a document mixes `<tag/>` and `<tag />`:
    // preferring one spelling unconditionally could compare a later
    // self-closing tag against the opening tag and miss an earlier one.
    let spaced = xml.find(&format!("<{tag} />"));
    let compact = xml.find(&format!("<{tag}/>"));
    let self_closing_pos = match (spaced, compact) {
        (Some(a), Some(b)) => Some(a.min(b)),
        (a, b) => a.or(b),
    };

    match (self_closing_pos, open_pos) {
        (Some(sp), Some(op)) if sp < op => return Some(String::new()),
        (Some(_), None) => return Some(String::new()),
        _ => {}
    }

    let start = open_pos? + open.len();
    // Search for the closing tag AFTER the opening one so a malformed body
    // (closing tag before opening) can't make end < start and panic the
    // slice (bug-audit 2026-05-28, 2.2).
    let end = xml[start..].find(&close)? + start;
    Some(xml[start..end].to_string())
}
2581
2582/// Parse the CompleteMultipartUpload XML body into (part_number, etag) pairs.
2583pub(crate) fn parse_complete_multipart_xml(xml: &str) -> Vec<(u32, String)> {
2584    let mut parts = Vec::new();
2585    let mut remaining = xml;
2586    while let Some(part_start) = remaining.find("<Part>") {
2587        let after = &remaining[part_start + 6..];
2588        if let Some(part_end) = after.find("</Part>") {
2589            let part_body = &after[..part_end];
2590            let part_num =
2591                extract_xml_value(part_body, "PartNumber").and_then(|s| s.parse::<u32>().ok());
2592            let etag = extract_xml_value(part_body, "ETag")
2593                .map(|s| s.replace("&quot;", "").replace('"', ""));
2594            if let (Some(num), Some(e)) = (part_num, etag) {
2595                parts.push((num, e));
2596            }
2597            remaining = &after[part_end + 7..];
2598        } else {
2599            break;
2600        }
2601    }
2602    parts
2603}
2604
2605pub(crate) fn parse_url_encoded_tags(s: &str) -> Vec<(String, String)> {
2606    let mut tags = Vec::new();
2607    for pair in s.split('&') {
2608        if pair.is_empty() {
2609            continue;
2610        }
2611        let (key, value) = match pair.find('=') {
2612            Some(pos) => (&pair[..pos], &pair[pos + 1..]),
2613            None => (pair, ""),
2614        };
2615        tags.push((
2616            percent_encoding::percent_decode_str(key)
2617                .decode_utf8_lossy()
2618                .to_string(),
2619            percent_encoding::percent_decode_str(value)
2620                .decode_utf8_lossy()
2621                .to_string(),
2622        ));
2623    }
2624    tags
2625}
2626
/// Validate lifecycle configuration XML. Returns MalformedXML on invalid configs.
///
/// The body is scanned with plain substring searches, one `<Rule>...</Rule>`
/// block at a time. Each rule is rejected with a 400 `MalformedXML` error
/// when any of these hold:
///
/// * it has neither a `<Filter>` nor a rule-level `<Prefix`;
/// * it has both a `<Filter>` and a rule-level `<Prefix`;
/// * its `<Expiration>` block combines `<ExpiredObjectDeleteMarker>` with
///   `<Days>` or `<Date>`;
/// * its `<Filter>` holds both `<Prefix` and `<Tag>` without `<And>`, or a
///   `<Tag>` positioned before `<And>`, or `</Filter>` precedes the end of
///   `<Filter>`;
/// * a `<NoncurrentVersionTransition>` block lacks `<NoncurrentDays>` or
///   `<StorageClass>`.
///
/// Scanning stops silently at a `<Rule>` that has no closing tag; a body
/// with no `<Rule>` at all is accepted.
pub(crate) fn validate_lifecycle_xml(xml: &str) -> Result<(), AwsServiceError> {
    // Single constructor for the only error this function returns.
    let malformed = || {
        AwsServiceError::aws_error(
            StatusCode::BAD_REQUEST,
            "MalformedXML",
            "The XML you provided was not well-formed or did not validate against our published schema",
        )
    };

    let mut remaining = xml;
    while let Some(rule_start) = remaining.find("<Rule>") {
        // 6 == "<Rule>".len()
        let after = &remaining[rule_start + 6..];
        if let Some(rule_end) = after.find("</Rule>") {
            let rule_body = &after[..rule_end];

            // Must have Filter or Prefix
            let has_filter = rule_body.contains("<Filter>")
                || rule_body.contains("<Filter/>")
                || rule_body.contains("<Filter />");

            // Check for <Prefix> at rule level (outside of <Filter>...</Filter>)
            let has_prefix_outside_filter = {
                if !rule_body.contains("<Prefix") {
                    false
                } else if !has_filter {
                    true // No filter means any Prefix is at rule level
                } else {
                    // Remove the Filter block and check if Prefix remains
                    let mut stripped = rule_body.to_string();
                    // Remove <Filter>...</Filter> or self-closing variants
                    // (9 == "</Filter>".len()). A self-closing Filter has no
                    // `</Filter>`, so nothing is removed in that case.
                    if let Some(fs) = stripped.find("<Filter") {
                        if let Some(fe) = stripped.find("</Filter>") {
                            stripped = format!("{}{}", &stripped[..fs], &stripped[fe + 9..]);
                        }
                    }
                    stripped.contains("<Prefix")
                }
            };

            if !has_filter && !has_prefix_outside_filter {
                return Err(malformed());
            }
            // Can't have both Filter and rule-level Prefix
            if has_filter && has_prefix_outside_filter {
                return Err(malformed());
            }

            // Expiration: if has ExpiredObjectDeleteMarker, cannot also have Days or Date
            // (only check within <Expiration> block)
            if let Some(exp_start) = rule_body.find("<Expiration>") {
                // `exp_end` is relative to `exp_start`, hence the addition below.
                if let Some(exp_end) = rule_body[exp_start..].find("</Expiration>") {
                    let exp_body = &rule_body[exp_start..exp_start + exp_end];
                    if exp_body.contains("<ExpiredObjectDeleteMarker>")
                        && (exp_body.contains("<Days>") || exp_body.contains("<Date>"))
                    {
                        return Err(malformed());
                    }
                }
            }

            // Filter validation
            if has_filter {
                // Only the `<Filter>...</Filter>` form has a body to inspect;
                // self-closing filters skip this section.
                if let Some(fs) = rule_body.find("<Filter>") {
                    if let Some(fe) = rule_body.find("</Filter>") {
                        // `find` locates the two tags independently, so a body
                        // with `</Filter>` before `<Filter>` would slice with
                        // begin > end and panic (dropping the connection -- a
                        // reachable DoS). Reject as malformed instead.
                        // (8 == "<Filter>".len())
                        if fe < fs + 8 {
                            return Err(malformed());
                        }
                        let filter_body = &rule_body[fs + 8..fe];
                        let has_prefix_in_filter = filter_body.contains("<Prefix");
                        let has_tag_in_filter = filter_body.contains("<Tag>");
                        let has_and_in_filter = filter_body.contains("<And>");
                        // Can't have both Prefix and Tag without And
                        if has_prefix_in_filter && has_tag_in_filter && !has_and_in_filter {
                            return Err(malformed());
                        }
                        // Can't have Tag and And simultaneously at the Filter level
                        if has_tag_in_filter && has_and_in_filter {
                            // Check if the <Tag> is outside <And>. Both
                            // substrings are known to be present here, so the
                            // `unwrap_or(0)` fallbacks are never taken.
                            let and_start = filter_body.find("<And>").unwrap_or(0);
                            let tag_pos = filter_body.find("<Tag>").unwrap_or(0);
                            if tag_pos < and_start {
                                return Err(malformed());
                            }
                        }
                    }
                }
            }

            // NoncurrentVersionTransition must have NoncurrentDays and StorageClass
            if rule_body.contains("<NoncurrentVersionTransition>") {
                let mut nvt_remaining = rule_body;
                while let Some(nvt_start) = nvt_remaining.find("<NoncurrentVersionTransition>") {
                    // 29 == "<NoncurrentVersionTransition>".len()
                    let nvt_after = &nvt_remaining[nvt_start + 29..];
                    if let Some(nvt_end) = nvt_after.find("</NoncurrentVersionTransition>") {
                        let nvt_body = &nvt_after[..nvt_end];
                        if !nvt_body.contains("<NoncurrentDays>") {
                            return Err(malformed());
                        }
                        if !nvt_body.contains("<StorageClass>") {
                            return Err(malformed());
                        }
                        // 30 == "</NoncurrentVersionTransition>".len()
                        nvt_remaining = &nvt_after[nvt_end + 30..];
                    } else {
                        break;
                    }
                }
            }

            // 7 == "</Rule>".len(); continue with the text after this rule.
            remaining = &after[rule_end + 7..];
        } else {
            break;
        }
    }

    Ok(())
}
2748
/// Parsed CORS rule from bucket configuration XML.
///
/// One value is built per `<CORSRule>` block by `parse_cors_config`.
pub(crate) struct CorsRule {
    // Values of the rule's `<AllowedOrigin>` elements; matched against the
    // request origin by `find_cors_rule`.
    allowed_origins: Vec<String>,
    // Values of the rule's `<AllowedMethod>` elements.
    allowed_methods: Vec<String>,
    // Values of the rule's `<AllowedHeader>` elements.
    allowed_headers: Vec<String>,
    // Values of the rule's `<ExposeHeader>` elements.
    expose_headers: Vec<String>,
    // Parsed `<MaxAgeSeconds>`; `None` when absent or not a valid `u32`.
    max_age_seconds: Option<u32>,
}
2757
2758/// Parse CORS configuration XML into rules.
2759pub(crate) fn parse_cors_config(xml: &str) -> Vec<CorsRule> {
2760    let mut rules = Vec::new();
2761    let mut remaining = xml;
2762    while let Some(start) = remaining.find("<CORSRule>") {
2763        let after = &remaining[start + 10..];
2764        if let Some(end) = after.find("</CORSRule>") {
2765            let block = &after[..end];
2766            let allowed_origins = extract_all_xml_values(block, "AllowedOrigin");
2767            let allowed_methods = extract_all_xml_values(block, "AllowedMethod");
2768            let allowed_headers = extract_all_xml_values(block, "AllowedHeader");
2769            let expose_headers = extract_all_xml_values(block, "ExposeHeader");
2770            let max_age_seconds =
2771                extract_xml_value(block, "MaxAgeSeconds").and_then(|s| s.parse().ok());
2772            rules.push(CorsRule {
2773                allowed_origins,
2774                allowed_methods,
2775                allowed_headers,
2776                expose_headers,
2777                max_age_seconds,
2778            });
2779            remaining = &after[end + 11..];
2780        } else {
2781            break;
2782        }
2783    }
2784    rules
2785}
2786
/// Match a request origin against a CORS `AllowedOrigin` pattern.
///
/// * A pattern without `*` must equal the origin exactly.
/// * A pattern with a `*` matches any origin that starts with the text
///   before the wildcard and ends with the text after it, so `*`,
///   `*.example.com` and `http://*.example.com` are all supported.
///
/// Only the first `*` is treated as a wildcard (S3 permits at most one per
/// origin); any later `*` is compared literally as part of the suffix.
pub(crate) fn origin_matches(origin: &str, pattern: &str) -> bool {
    match pattern.split_once('*') {
        None => origin == pattern,
        Some((prefix, suffix)) => {
            // The length guard stops the prefix and suffix from overlapping
            // inside a short origin (e.g. pattern `ab*ba` vs origin `aba`).
            origin.len() >= prefix.len() + suffix.len()
                && origin.starts_with(prefix)
                && origin.ends_with(suffix)
        }
    }
}
2798
2799/// Find the matching CORS rule for a given origin and method.
2800pub(crate) fn find_cors_rule<'a>(
2801    rules: &'a [CorsRule],
2802    origin: &str,
2803    method: Option<&str>,
2804) -> Option<&'a CorsRule> {
2805    rules.iter().find(|rule| {
2806        let origin_ok = rule
2807            .allowed_origins
2808            .iter()
2809            .any(|o| origin_matches(origin, o));
2810        let method_ok = match method {
2811            Some(m) => rule.allowed_methods.iter().any(|am| am == m),
2812            None => true,
2813        };
2814        origin_ok && method_ok
2815    })
2816}
2817
2818/// Check if an object is locked (retention or legal hold) and should block mutation.
2819/// Returns an error string if locked, None if allowed.
2820pub(crate) fn check_object_lock_for_overwrite(
2821    obj: &S3Object,
2822    req: &AwsRequest,
2823) -> Option<&'static str> {
2824    // Legal hold blocks overwrite
2825    if obj.lock_legal_hold.as_deref() == Some("ON") {
2826        return Some("AccessDenied");
2827    }
2828    // Retention check
2829    if let (Some(mode), Some(until)) = (&obj.lock_mode, &obj.lock_retain_until) {
2830        if *until > Utc::now() {
2831            if mode == "COMPLIANCE" {
2832                return Some("AccessDenied");
2833            }
2834            if mode == "GOVERNANCE" {
2835                let bypass = req
2836                    .headers
2837                    .get("x-amz-bypass-governance-retention")
2838                    .and_then(|v| v.to_str().ok())
2839                    .map(|s| s.eq_ignore_ascii_case("true"))
2840                    .unwrap_or(false);
2841                if !bypass {
2842                    return Some("AccessDenied");
2843                }
2844            }
2845        }
2846    }
2847    None
2848}
2849
2850#[cfg(test)]
2851mod tests;
2852
#[cfg(test)]
mod s3_detect_action_tests {
    //! Regression coverage for the auth-mapping fixes from bug-hunt
    //! 2026-06-15 §5.3: operations that used to be unmapped (so enforcement
    //! was skipped) or mapped to the wrong action (so a policy targeting
    //! them never applied).
    use super::s3_detect_action;
    use std::collections::HashMap;

    /// Build an owned query-parameter map from borrowed `(name, value)` pairs.
    fn q(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        let mut params = HashMap::with_capacity(pairs.len());
        for &(name, value) in pairs {
            params.insert(name.to_owned(), value.to_owned());
        }
        params
    }

    #[test]
    fn select_object_content_maps_to_get_object() {
        // POST /bucket/key?select&select-type=2 reads object data.
        let query = q(&[("select", ""), ("select-type", "2")]);
        assert_eq!(
            s3_detect_action("POST", Some("bucket"), Some("key"), &query),
            Some("GetObject")
        );
    }

    #[test]
    fn write_get_object_response_is_detected() {
        // POST /WriteGetObjectResponse (no key) is the Object Lambda data plane.
        let query = q(&[]);
        assert_eq!(
            s3_detect_action("POST", Some("WriteGetObjectResponse"), None, &query),
            Some("WriteGetObjectResponse")
        );
    }

    #[test]
    fn list_directory_buckets_is_distinct_from_list_buckets() {
        // GET /?x-id=ListDirectoryBuckets is an s3express action, not ListBuckets.
        let directory_query = q(&[("x-id", "ListDirectoryBuckets")]);
        assert_eq!(
            s3_detect_action("GET", None, None, &directory_query),
            Some("ListAllMyDirectoryBuckets")
        );
        // A plain GET / is still ListBuckets.
        let empty_query = q(&[]);
        assert_eq!(
            s3_detect_action("GET", None, None, &empty_query),
            Some("ListBuckets")
        );
    }

    #[test]
    fn rename_object_is_detected_with_key() {
        // PUT /bucket/newkey?renameObject carries a key; it previously fell
        // through to PutObject, so s3:RenameObject denies were ineffective.
        let query = q(&[("renameObject", "")]);
        assert_eq!(
            s3_detect_action("PUT", Some("bucket"), Some("newkey"), &query),
            Some("RenameObject")
        );
    }

    #[test]
    fn update_object_encryption_maps_to_put_object() {
        // PUT /bucket/key?encryption changes object SSE; AWS gates this on
        // s3:PutObject (no distinct public action). The mapping is
        // deliberate, not a silent PutObject fall-through.
        let query = q(&[("encryption", "")]);
        assert_eq!(
            s3_detect_action("PUT", Some("bucket"), Some("key"), &query),
            Some("PutObject")
        );
    }

    #[test]
    fn create_session_not_misdetected_as_list_objects() {
        // GET /bucket?session previously fell through to ListObjects.
        let query = q(&[("session", "")]);
        assert_eq!(
            s3_detect_action("GET", Some("bucket"), None, &query),
            Some("CreateSession")
        );
    }

    #[test]
    fn get_bucket_abac_not_misdetected_as_list_objects() {
        // GET /bucket?abac previously fell through to ListObjects.
        let query = q(&[("abac", "")]);
        assert_eq!(
            s3_detect_action("GET", Some("bucket"), None, &query),
            Some("GetBucketAbacConfiguration")
        );
    }

    #[test]
    fn put_bucket_abac_still_detected() {
        let query = q(&[("abac", "")]);
        assert_eq!(
            s3_detect_action("PUT", Some("bucket"), None, &query),
            Some("PutBucketAbacConfiguration")
        );
    }
}
2944
#[cfg(test)]
mod s3_iam_service_prefix_tests {
    //! §5.3: some operations served by this service are authorized under
    //! sibling IAM service prefixes rather than `s3` — CreateSession under
    //! `s3express` and WriteGetObjectResponse under `s3-object-lambda`.
    use super::*;
    use parking_lot::RwLock;
    use std::sync::Arc;

    /// Account id used for both the seeded state and every synthetic request.
    const ACCOUNT_ID: &str = "123456789012";
    /// Region used for both the seeded state and every synthetic request.
    const REGION: &str = "us-east-1";

    /// Builds an `S3Service` over fresh multi-account state and a new
    /// delivery bus.
    fn make_service() -> S3Service {
        let accounts =
            fakecloud_core::multi_account::MultiAccountState::new(ACCOUNT_ID, REGION, "");
        let state: SharedS3State = Arc::new(RwLock::new(accounts));
        S3Service::new(state, Arc::new(DeliveryBus::new()))
    }

    /// Assembles a minimal `AwsRequest` for `method` + `path` + `query`.
    ///
    /// `path` is split on `/` (empty segments dropped) to fill
    /// `path_segments`; headers and body are empty and no credentials or
    /// principal are attached.
    fn req(method: Method, path: &str, query: &[(&str, &str)]) -> AwsRequest {
        let path_segments = path
            .split('/')
            .filter(|segment| !segment.is_empty())
            .map(str::to_string)
            .collect::<Vec<String>>();
        let query_params = query
            .iter()
            .map(|&(name, value)| (name.to_string(), value.to_string()))
            .collect();

        AwsRequest {
            service: "s3".to_string(),
            action: String::new(),
            region: REGION.to_string(),
            account_id: ACCOUNT_ID.to_string(),
            request_id: "rid".to_string(),
            headers: http::HeaderMap::new(),
            query_params,
            body: bytes::Bytes::new(),
            body_stream: parking_lot::Mutex::new(None),
            path_segments,
            raw_path: path.to_string(),
            raw_query: String::new(),
            method,
            is_query_protocol: false,
            access_key_id: None,
            principal: None,
        }
    }

    #[test]
    fn create_session_uses_s3express_prefix() {
        let svc = make_service();
        let request = req(Method::GET, "/mybucket", &[("session", "")]);
        let mapped = svc.iam_action_for(&request).expect("must be mapped");

        assert_eq!(mapped.service, "s3express");
        assert_eq!(mapped.action, "CreateSession");
        assert_eq!(mapped.action_string(), "s3express:CreateSession");
    }

    #[test]
    fn write_get_object_response_uses_object_lambda_prefix() {
        let svc = make_service();
        let request = req(Method::POST, "/WriteGetObjectResponse", &[]);
        let mapped = svc.iam_action_for(&request).expect("must be mapped");

        assert_eq!(mapped.service, "s3-object-lambda");
        assert_eq!(mapped.action, "WriteGetObjectResponse");
        assert_eq!(
            mapped.action_string(),
            "s3-object-lambda:WriteGetObjectResponse"
        );
    }

    #[test]
    fn get_object_torrent_maps_to_its_own_action() {
        // GET object with `?torrent` must authorize under s3:GetObjectTorrent,
        // not s3:GetObject (bug-audit 2026-06-20, 5.3).
        let svc = make_service();

        let torrent_request = req(Method::GET, "/mybucket/key.txt", &[("torrent", "")]);
        let torrent = svc
            .iam_action_for(&torrent_request)
            .expect("must be mapped");
        assert_eq!(torrent.action, "GetObjectTorrent");

        // Without the sub-resource, the same GET still maps to GetObject.
        let plain_request = req(Method::GET, "/mybucket/key.txt", &[]);
        let plain = svc.iam_action_for(&plain_request).expect("must be mapped");
        assert_eq!(plain.action, "GetObject");
    }
}
3030
#[cfg(test)]
mod extract_xml_value_tests {
    //! Tests for `extract_xml_value`: the happy path plus the malformed
    //! inputs that must yield `None` rather than panic.
    use super::extract_xml_value;

    #[test]
    fn returns_inner_value() {
        let found = extract_xml_value("<Root><Key>value</Key></Root>", "Key");
        assert_eq!(found.as_deref(), Some("value"));
    }

    #[test]
    fn missing_tag_is_none() {
        let found = extract_xml_value("<Root></Root>", "Key");
        assert!(found.is_none());
    }

    // bug-audit 2026-05-28, 2.2: a closing tag before the opening one used to
    // slice with end < start and panic; must return None instead.
    #[test]
    fn close_before_open_does_not_panic() {
        let found = extract_xml_value("</Key>oops<Key>value", "Key");
        assert!(found.is_none());
    }

    #[test]
    fn open_without_close_is_none() {
        let found = extract_xml_value("<Key>value", "Key");
        assert!(found.is_none());
    }
}
3060
#[cfg(test)]
mod compute_checksum_tests {
    //! Tests for the non-streaming `compute_checksum` and its agreement with
    //! `compute_checksum_streaming`.
    use super::{compute_checksum, compute_checksum_streaming};
    use base64::engine::general_purpose::STANDARD;
    use base64::Engine as _;

    /// Every algorithm name exercised by the direct-vs-streaming comparison.
    const ALGORITHMS: [&str; 5] = ["CRC32", "CRC32C", "CRC64NVME", "SHA1", "SHA256"];

    /// Decodes a standard-alphabet base64 checksum, panicking on malformed
    /// input so the calling test fails loudly.
    fn decode_b64(encoded: impl AsRef<[u8]>) -> Vec<u8> {
        STANDARD.decode(encoded).unwrap()
    }

    // bug-audit 2026-06-15, 1.7: CopyObject / CompleteMultipartUpload routed
    // through the non-streaming compute_checksum, whose `_ =>` arm returned an
    // empty string for CRC32C / CRC64NVME -> GetObjectAttributes reported an
    // empty checksum and integrity checks silently broke.
    #[test]
    fn crc32c_is_non_empty_and_matches_known_vector() {
        // CRC32C of "123456789" is 0xE3069283; base64 of those 4 BE bytes.
        let encoded = compute_checksum("CRC32C", b"123456789");
        assert!(!encoded.is_empty());

        let raw = decode_b64(&encoded);
        assert_eq!(raw, 0xE306_9283u32.to_be_bytes());
    }

    #[test]
    fn crc64nvme_is_non_empty() {
        let encoded = compute_checksum("CRC64NVME", b"hello world");
        assert!(!encoded.is_empty());

        // 8 BE bytes of a u64.
        let raw = decode_b64(&encoded);
        assert_eq!(raw.len(), 8);
    }

    #[tokio::test]
    async fn non_streaming_matches_streaming_for_all_algorithms() {
        // Write the payload to a temp file so the streaming variant can read
        // the exact same bytes from disk.
        let data = b"the quick brown fox jumps over the lazy dog".repeat(100);
        let tmp = tempfile::NamedTempFile::new().unwrap();
        tokio::fs::write(tmp.path(), &data).await.unwrap();

        for algo in ALGORITHMS {
            let in_memory = compute_checksum(algo, &data);
            let from_file = compute_checksum_streaming(algo, tmp.path())
                .await
                .unwrap();
            assert!(!in_memory.is_empty(), "{algo} direct empty");
            assert_eq!(in_memory, from_file, "{algo} direct != streaming");
        }
    }
}