// fakecloud_s3/service/mod.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use bytes::Bytes;
5use chrono::{DateTime, Timelike, Utc};
6use http::{HeaderMap, Method, StatusCode};
7use md5::{Digest, Md5};
8
9use fakecloud_aws::arn::Arn;
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
12use fakecloud_kms::SharedKmsState;
13use fakecloud_persistence::{MemoryS3Store, S3Store, StoreError};
14
15use base64::engine::general_purpose::STANDARD as BASE64;
16use base64::Engine as _;
17
18use crate::logging;
19use crate::state::{AclGrant, S3Bucket, S3Object, SharedS3State};
20
21mod access_points;
22mod acl;
23mod buckets;
24pub(crate) mod config;
25mod lock;
26mod multipart;
27mod notifications;
28mod objects;
29mod tags;
30
31// Re-export notification helpers for use in sub-modules
32#[cfg(test)]
33use notifications::replicate_object;
34pub(super) use notifications::{
35    deliver_notifications, normalize_notification_ids, normalize_replication_xml,
36    replicate_through_store,
37};
38
39// Used only within this file (parse_cors_config)
40use notifications::extract_all_xml_values;
41
42// Re-exports used only in tests
43#[cfg(test)]
44use notifications::{
45    event_matches, key_matches_filters, parse_notification_config, parse_replication_rules,
46    NotificationTargetType,
47};
48
49pub struct S3Service {
50    state: SharedS3State,
51    delivery: Arc<DeliveryBus>,
52    kms_state: Option<SharedKmsState>,
53    pub(crate) kms_hook: Option<Arc<dyn fakecloud_core::delivery::KmsHook>>,
54    store: Arc<dyn S3Store>,
55}
56
57/// Map a [`StoreError`] from the persistence layer to a 500 InternalError
58/// response. Invoked at every mutation site when the write-through persistence
59/// call fails: the in-memory mutation has already happened, but we surface the
60/// failure to the client so they know to retry (and so logs/metrics flag it).
61pub(crate) fn persistence_error(err: StoreError) -> AwsServiceError {
62    AwsServiceError::aws_error(
63        StatusCode::INTERNAL_SERVER_ERROR,
64        "InternalError",
65        format!("persistence store error: {err}"),
66    )
67}
68
69/// Convert a filesystem IO error from a disk-backed body read into an
70/// InternalError response.
71pub(crate) fn io_to_aws(err: std::io::Error) -> AwsServiceError {
72    AwsServiceError::aws_error(
73        StatusCode::INTERNAL_SERVER_ERROR,
74        "InternalError",
75        format!("failed to read object body from disk: {err}"),
76    )
77}
78
79impl S3Service {
80    pub fn new(state: SharedS3State, delivery: Arc<DeliveryBus>) -> Self {
81        Self::with_store(state, delivery, Arc::new(MemoryS3Store::new()))
82    }
83
84    pub fn with_store(
85        state: SharedS3State,
86        delivery: Arc<DeliveryBus>,
87        store: Arc<dyn S3Store>,
88    ) -> Self {
89        Self {
90            state,
91            delivery,
92            kms_state: None,
93            kms_hook: None,
94            store,
95        }
96    }
97
98    pub fn with_kms(mut self, kms_state: SharedKmsState) -> Self {
99        self.kms_state = Some(kms_state);
100        self
101    }
102
103    pub fn with_kms_hook(mut self, hook: Arc<dyn fakecloud_core::delivery::KmsHook>) -> Self {
104        self.kms_hook = Some(hook);
105        self
106    }
107
108    /// Encrypt object body bytes for SSE-KMS storage. Returns ciphertext as
109    /// raw bytes (a UTF-8 fakecloud-kms envelope) on success.
110    ///
111    /// Fail-closed: if the KMS hook reports an error (key denied, key not
112    /// found, etc.), this returns `Err` so PutObject aborts with a 500
113    /// rather than silently storing plaintext. AWS S3 has the same
114    /// behavior — `KMS.NotFoundException` and friends surface as
115    /// `AccessDenied` / `KMS.*` errors back to the caller. When no hook is
116    /// wired (legacy / in-process tests with no KMS dependency), the
117    /// plaintext is returned unchanged so existing tests keep working.
118    pub(crate) fn encrypt_object_body(
119        &self,
120        account_id: &str,
121        region: &str,
122        bucket: &str,
123        plaintext: &[u8],
124        kms_key_id: Option<&str>,
125    ) -> Result<bytes::Bytes, AwsServiceError> {
126        let Some(hook) = &self.kms_hook else {
127            return Ok(bytes::Bytes::copy_from_slice(plaintext));
128        };
129        let key = kms_key_id.filter(|k| !k.is_empty()).unwrap_or("aws/s3");
130        let bucket_arn = Arn::s3(bucket).to_string();
131        let mut ctx = std::collections::HashMap::new();
132        ctx.insert("aws:s3:arn".to_string(), bucket_arn);
133        match hook.encrypt(account_id, region, key, plaintext, "s3.amazonaws.com", ctx) {
134            Ok(envelope) => Ok(bytes::Bytes::from(envelope.into_bytes())),
135            Err(err) => {
136                tracing::warn!(bucket = %bucket, error = %err, "SSE-KMS encrypt failed");
137                Err(AwsServiceError::aws_error(
138                    StatusCode::INTERNAL_SERVER_ERROR,
139                    "KMS.InternalFailureException",
140                    format!("Failed to encrypt object via KMS: {err}"),
141                ))
142            }
143        }
144    }
145
146    /// Decrypt object body bytes that were stored as a fakecloud-kms
147    /// envelope. Caller is expected to gate this on
148    /// `obj.sse_algorithm == Some("aws:kms")`.
149    ///
150    /// Fail-closed: when a hook is wired and the bytes look like an
151    /// envelope but don't decrypt (key revoked, malformed ciphertext),
152    /// this returns `Err` so GetObject surfaces a 500. When no hook is
153    /// wired, or the bytes aren't UTF-8 (legacy snapshots from before
154    /// the hook landed), the bytes are returned unchanged.
155    pub(crate) fn decrypt_object_body(
156        &self,
157        account_id: &str,
158        bucket: &str,
159        ciphertext: &[u8],
160    ) -> Result<bytes::Bytes, AwsServiceError> {
161        let Some(hook) = &self.kms_hook else {
162            return Ok(bytes::Bytes::copy_from_slice(ciphertext));
163        };
164        // Stored envelope is base64 ASCII; non-UTF-8 bytes are pre-hook
165        // legacy snapshots, return as-is.
166        let envelope = match std::str::from_utf8(ciphertext) {
167            Ok(s) => s,
168            Err(_) => return Ok(bytes::Bytes::copy_from_slice(ciphertext)),
169        };
170        let bucket_arn = Arn::s3(bucket).to_string();
171        let mut ctx = std::collections::HashMap::new();
172        ctx.insert("aws:s3:arn".to_string(), bucket_arn);
173        match hook.decrypt(account_id, envelope, "s3.amazonaws.com", ctx) {
174            Ok(bytes) => Ok(bytes::Bytes::from(bytes)),
175            Err(err) => {
176                tracing::warn!(bucket = %bucket, error = %err, "SSE-KMS decrypt failed");
177                Err(AwsServiceError::aws_error(
178                    StatusCode::INTERNAL_SERVER_ERROR,
179                    "KMS.InternalFailureException",
180                    format!("Failed to decrypt object via KMS: {err}"),
181                ))
182            }
183        }
184    }
185}
186
187#[async_trait]
188impl AwsService for S3Service {
189    fn service_name(&self) -> &str {
190        "s3"
191    }
192
193    async fn handle(&self, mut req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
194        // PutObject / UploadPart enter dispatch via the streaming path
195        // with `body = Bytes::new()` and the raw HTTP body available on
196        // `req.body_stream`. Those handlers consume the stream directly
197        // — spooling chunks to disk while computing MD5 + size in
198        // constant memory — so a 1 GiB upload never materializes into
199        // RAM. Every *other* PUT-on-bucket-key the streaming dispatch
200        // flagged (PutObjectTagging, PutObjectAcl, PutObjectRetention,
201        // PutObjectLegalHold, CopyObject, …) reads a small XML/JSON
202        // body from `req.body`, so drain the stream for them.
203        let is_put_to_key = req.method == Method::PUT
204            && req.path_segments.len() >= 2
205            && req
206                .path_segments
207                .first()
208                .map(|s| !s.is_empty())
209                .unwrap_or(false);
210        let q = &req.query_params;
211        let is_put_object = is_put_to_key
212            && !q.contains_key("tagging")
213            && !q.contains_key("acl")
214            && !q.contains_key("retention")
215            && !q.contains_key("legal-hold")
216            && !q.contains_key("renameObject")
217            && !q.contains_key("encryption")
218            && !req.headers.contains_key("x-amz-copy-source");
219        // UploadPart requires both partNumber AND uploadId; checking
220        // only partNumber would skip body draining for stray PUTs that
221        // happened to carry a partNumber query param without being a
222        // real multipart upload.
223        let is_upload_part =
224            is_put_to_key && q.contains_key("partNumber") && q.contains_key("uploadId");
225        if !is_put_object && !is_upload_part {
226            if let Some(stream) = req.take_body_stream() {
227                req.body = fakecloud_core::service::drain_request_stream(stream).await?;
228            }
229        }
230
231        // Access point data plane: resolve alias -> bucket before routing.
232        access_points::resolve_access_point(self, &mut req)?;
233
234        let account_id = req.account_id.as_str();
235
236        // S3 Control endpoint (access point management).
237        let host = req
238            .headers
239            .get("host")
240            .and_then(|v| v.to_str().ok())
241            .unwrap_or("");
242        let is_control = host.to_ascii_lowercase().contains("s3-control");
243        if is_control {
244            let v1 = req.path_segments.first().map(|s| s.as_str());
245            let v2 = req.path_segments.get(1).map(|s| s.as_str());
246            let v3 = req.path_segments.get(2).map(|s| s.as_str());
247            if v1 == Some("v20180820") && v2 == Some("accesspoint") {
248                if let Some(name) = v3 {
249                    match req.method {
250                        Method::PUT => return self.create_access_point(account_id, &req, name),
251                        Method::GET => return self.get_access_point(account_id, &req, name),
252                        Method::DELETE => return self.delete_access_point(account_id, &req, name),
253                        _ => {}
254                    }
255                } else if req.method == Method::GET {
256                    return self.list_access_points(account_id, &req);
257                }
258            }
259        }
260
261        // S3 REST routing: method + path segments + query params
262        //
263        // Reject paths that start with `//` (an empty bucket segment). The
264        // path-segment splitter further down filters empty segments out,
265        // which would otherwise let a malformed URL like `PUT //my-key`
266        // collapse to a valid `PUT /my-key` (CreateBucket) and silently
267        // succeed. Real S3 rejects empty bucket segments with
268        // InvalidBucketName, so mirror that.
269        if req.raw_path.starts_with("//") {
270            return Err(AwsServiceError::aws_error(
271                StatusCode::BAD_REQUEST,
272                "InvalidBucketName",
273                "The specified bucket is not valid: bucket name cannot be empty",
274            ));
275        }
276        let bucket = req.path_segments.first().map(|s| s.as_str());
277        // Extract key from the raw path to preserve leading slashes and empty segments.
278        // The raw path is like "/bucket/key/parts" — we strip the bucket prefix.
279        let key = if let Some(b) = bucket {
280            let prefix = format!("/{b}/");
281            if req.raw_path.starts_with(&prefix) && req.raw_path.len() > prefix.len() {
282                let raw_key = &req.raw_path[prefix.len()..];
283                Some(
284                    percent_encoding::percent_decode_str(raw_key)
285                        .decode_utf8_lossy()
286                        .into_owned(),
287                )
288            } else if req.path_segments.len() > 1 {
289                let raw = req.path_segments[1..].join("/");
290                Some(
291                    percent_encoding::percent_decode_str(&raw)
292                        .decode_utf8_lossy()
293                        .into_owned(),
294                )
295            } else {
296                None
297            }
298        } else {
299            None
300        };
301
302        // Multipart upload operations (checked before main match)
303        if let Some(b) = bucket {
304            // POST /{bucket}/{key}?uploads — CreateMultipartUpload
305            if req.method == Method::POST
306                && key.is_some()
307                && req.query_params.contains_key("uploads")
308            {
309                return self.create_multipart_upload(account_id, &req, b, key.as_deref().unwrap());
310            }
311
312            // POST /{bucket}/{key}?restore
313            if req.method == Method::POST
314                && key.is_some()
315                && req.query_params.contains_key("restore")
316            {
317                return self.restore_object(account_id, &req, b, key.as_deref().unwrap());
318            }
319
320            // POST /{bucket}/{key}?uploadId=X — CompleteMultipartUpload
321            if req.method == Method::POST && key.is_some() {
322                if let Some(upload_id) = req.query_params.get("uploadId").cloned() {
323                    return self.complete_multipart_upload(
324                        account_id,
325                        &req,
326                        b,
327                        key.as_deref().unwrap(),
328                        &upload_id,
329                    );
330                }
331            }
332
333            // PUT /{bucket}/{key}?partNumber=N&uploadId=X — UploadPart or UploadPartCopy
334            if req.method == Method::PUT && key.is_some() {
335                if let (Some(part_num_str), Some(upload_id)) = (
336                    req.query_params.get("partNumber").cloned(),
337                    req.query_params.get("uploadId").cloned(),
338                ) {
339                    if let Ok(part_number) = part_num_str.parse::<i64>() {
340                        if req.headers.contains_key("x-amz-copy-source") {
341                            return self.upload_part_copy(
342                                account_id,
343                                &req,
344                                b,
345                                key.as_deref().unwrap(),
346                                &upload_id,
347                                part_number,
348                            );
349                        }
350                        return self
351                            .upload_part(
352                                account_id,
353                                &req,
354                                b,
355                                key.as_deref().unwrap(),
356                                &upload_id,
357                                part_number,
358                            )
359                            .await;
360                    }
361                }
362            }
363
364            // DELETE /{bucket}/{key}?uploadId=X — AbortMultipartUpload
365            if req.method == Method::DELETE && key.is_some() {
366                if let Some(upload_id) = req.query_params.get("uploadId").cloned() {
367                    return self.abort_multipart_upload(
368                        account_id,
369                        b,
370                        key.as_deref().unwrap(),
371                        &upload_id,
372                    );
373                }
374            }
375
376            // GET /{bucket}?uploads — ListMultipartUploads
377            if req.method == Method::GET
378                && key.is_none()
379                && req.query_params.contains_key("uploads")
380            {
381                return self.list_multipart_uploads(account_id, b);
382            }
383
384            // GET /{bucket}/{key}?uploadId=X — ListParts
385            if req.method == Method::GET && key.is_some() {
386                if let Some(upload_id) = req.query_params.get("uploadId").cloned() {
387                    return self.list_parts(
388                        account_id,
389                        &req,
390                        b,
391                        key.as_deref().unwrap(),
392                        &upload_id,
393                    );
394                }
395            }
396        }
397
398        // Handle OPTIONS preflight requests (CORS)
399        if req.method == Method::OPTIONS {
400            if let Some(b_name) = bucket {
401                let cors_config = {
402                    let accounts = self.state.read();
403                    let _empty_s3 = crate::state::S3State::new(&req.account_id, &req.region);
404                    let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
405                    state
406                        .buckets
407                        .get(b_name)
408                        .and_then(|b| b.cors_config.clone())
409                };
410                if let Some(ref config) = cors_config {
411                    let origin = req
412                        .headers
413                        .get("origin")
414                        .and_then(|v| v.to_str().ok())
415                        .unwrap_or("");
416                    let request_method = req
417                        .headers
418                        .get("access-control-request-method")
419                        .and_then(|v| v.to_str().ok())
420                        .unwrap_or("");
421                    let rules = parse_cors_config(config);
422                    if let Some(rule) = find_cors_rule(&rules, origin, Some(request_method)) {
423                        let mut headers = HeaderMap::new();
424                        let matched_origin = if rule.allowed_origins.contains(&"*".to_string()) {
425                            "*"
426                        } else {
427                            origin
428                        };
429                        headers.insert(
430                            "access-control-allow-origin",
431                            matched_origin
432                                .parse()
433                                .unwrap_or_else(|_| http::HeaderValue::from_static("")),
434                        );
435                        headers.insert(
436                            "access-control-allow-methods",
437                            rule.allowed_methods
438                                .join(", ")
439                                .parse()
440                                .unwrap_or_else(|_| http::HeaderValue::from_static("")),
441                        );
442                        if !rule.allowed_headers.is_empty() {
443                            let ah = if rule.allowed_headers.contains(&"*".to_string()) {
444                                req.headers
445                                    .get("access-control-request-headers")
446                                    .and_then(|v| v.to_str().ok())
447                                    .unwrap_or("*")
448                                    .to_string()
449                            } else {
450                                rule.allowed_headers.join(", ")
451                            };
452                            headers.insert(
453                                "access-control-allow-headers",
454                                ah.parse()
455                                    .unwrap_or_else(|_| http::HeaderValue::from_static("")),
456                            );
457                        }
458                        if let Some(max_age) = rule.max_age_seconds {
459                            headers.insert(
460                                "access-control-max-age",
461                                max_age
462                                    .to_string()
463                                    .parse()
464                                    .unwrap_or_else(|_| http::HeaderValue::from_static("")),
465                            );
466                        }
467                        return Ok(AwsResponse {
468                            status: StatusCode::OK,
469                            content_type: String::new(),
470                            body: Bytes::new().into(),
471                            headers,
472                        });
473                    }
474                }
475                return Err(AwsServiceError::aws_error(
476                    StatusCode::FORBIDDEN,
477                    "CORSResponse",
478                    "CORS is not enabled for this bucket",
479                ));
480            }
481        }
482
483        // Capture origin for CORS response headers
484        let origin_header = req
485            .headers
486            .get("origin")
487            .and_then(|v| v.to_str().ok())
488            .map(|s| s.to_string());
489
490        // Bucket-scoped sub-resource query params. If a request targets one
491        // of these without a bucket in the path, S3 returns an error rather
492        // than silently treating it as a top-level (service) operation.
493        // Real AWS rejects e.g. `GET /?tagging` with a routing/validation
494        // error; mirroring that prevents bucket-required ops from being
495        // accidentally answered by ListBuckets / WriteGetObjectResponse.
496        let bucket_subresources: &[&str] = &[
497            "tagging",
498            "acl",
499            "versioning",
500            "cors",
501            "notification",
502            "website",
503            "accelerate",
504            "publicAccessBlock",
505            "encryption",
506            "lifecycle",
507            "logging",
508            "policy",
509            "policyStatus",
510            "replication",
511            "ownershipControls",
512            "inventory",
513            "analytics",
514            "intelligent-tiering",
515            "metrics",
516            "requestPayment",
517            "abac",
518            "metadataConfiguration",
519            "metadataTable",
520            "metadataInventoryTable",
521            "metadataJournalTable",
522            "object-lock",
523            "versions",
524            "session",
525            "retention",
526            "legal-hold",
527            "attributes",
528            "torrent",
529            "renameObject",
530            "uploads",
531            "restore",
532            "select",
533            // Bucket-required ops the earlier list missed: their query
534            // markers (e.g. `?location` for GetBucketLocation,
535            // `?delete` for DeleteObjects, `?list-type=2` for
536            // ListObjectsV2) signal a bucket op so a request with the
537            // marker but no bucket segment is malformed, not a service-
538            // level call.
539            "location",
540            "delete",
541            "list-type",
542        ];
543        if bucket.is_none()
544            && bucket_subresources
545                .iter()
546                .any(|q| req.query_params.contains_key(*q))
547        {
548            return Err(AwsServiceError::aws_error(
549                StatusCode::BAD_REQUEST,
550                "InvalidBucketName",
551                "The specified bucket is not valid: bucket name is required for this operation",
552            ));
553        }
554
555        let mut result = match (&req.method, bucket, key.as_deref()) {
556            // ListBuckets: GET /
557            (&Method::GET, None, None) => {
558                if req.query_params.get("x-id").map(|s| s.as_str()) == Some("ListDirectoryBuckets")
559                {
560                    self.list_directory_buckets(account_id, &req)
561                } else {
562                    self.list_buckets(account_id, &req)
563                }
564            }
565
566            // Bucket-level operations (no key)
567            (&Method::PUT, Some(b), None) => {
568                if req.query_params.contains_key("tagging") {
569                    self.put_bucket_tagging(account_id, &req, b)
570                } else if req.query_params.contains_key("acl") {
571                    self.put_bucket_acl(account_id, &req, b)
572                } else if req.query_params.contains_key("versioning") {
573                    self.put_bucket_versioning(account_id, &req, b)
574                } else if req.query_params.contains_key("cors") {
575                    self.put_bucket_cors(account_id, &req, b)
576                } else if req.query_params.contains_key("notification") {
577                    self.put_bucket_notification(account_id, &req, b)
578                } else if req.query_params.contains_key("website") {
579                    self.put_bucket_website(account_id, &req, b)
580                } else if req.query_params.contains_key("accelerate") {
581                    self.put_bucket_accelerate(account_id, &req, b)
582                } else if req.query_params.contains_key("publicAccessBlock") {
583                    self.put_public_access_block(account_id, &req, b)
584                } else if req.query_params.contains_key("encryption") {
585                    self.put_bucket_encryption(account_id, &req, b)
586                } else if req.query_params.contains_key("lifecycle") {
587                    self.put_bucket_lifecycle(account_id, &req, b)
588                } else if req.query_params.contains_key("logging") {
589                    self.put_bucket_logging(account_id, &req, b)
590                } else if req.query_params.contains_key("policy") {
591                    self.put_bucket_policy(account_id, &req, b)
592                } else if req.query_params.contains_key("object-lock") {
593                    self.put_object_lock_config(account_id, &req, b)
594                } else if req.query_params.contains_key("replication") {
595                    self.put_bucket_replication(account_id, &req, b)
596                } else if req.query_params.contains_key("ownershipControls") {
597                    self.put_bucket_ownership_controls(account_id, &req, b)
598                } else if req.query_params.contains_key("inventory") {
599                    self.put_bucket_inventory(account_id, &req, b)
600                } else if req.query_params.contains_key("analytics") {
601                    self.put_bucket_analytics_config(account_id, &req, b)
602                } else if req.query_params.contains_key("intelligent-tiering") {
603                    self.put_bucket_intelligent_tiering_config(account_id, &req, b)
604                } else if req.query_params.contains_key("metrics") {
605                    self.put_bucket_metrics_config(account_id, &req, b)
606                } else if req.query_params.contains_key("requestPayment") {
607                    self.put_bucket_request_payment(account_id, &req, b)
608                } else if req.query_params.contains_key("abac") {
609                    self.put_bucket_abac(account_id, &req, b)
610                } else if req.query_params.contains_key("metadataInventoryTable") {
611                    self.update_bucket_metadata_inventory_table(account_id, &req, b)
612                } else if req.query_params.contains_key("metadataJournalTable") {
613                    self.update_bucket_metadata_journal_table(account_id, &req, b)
614                } else {
615                    self.create_bucket(account_id, &req, b)
616                }
617            }
618            (&Method::DELETE, Some(b), None) => {
619                if req.query_params.contains_key("tagging") {
620                    self.delete_bucket_tagging(account_id, &req, b)
621                } else if req.query_params.contains_key("cors") {
622                    self.delete_bucket_cors(account_id, b)
623                } else if req.query_params.contains_key("website") {
624                    self.delete_bucket_website(account_id, b)
625                } else if req.query_params.contains_key("publicAccessBlock") {
626                    self.delete_public_access_block(account_id, b)
627                } else if req.query_params.contains_key("encryption") {
628                    self.delete_bucket_encryption(account_id, b)
629                } else if req.query_params.contains_key("lifecycle") {
630                    self.delete_bucket_lifecycle(account_id, b)
631                } else if req.query_params.contains_key("policy") {
632                    self.delete_bucket_policy(account_id, b)
633                } else if req.query_params.contains_key("replication") {
634                    self.delete_bucket_replication(account_id, b)
635                } else if req.query_params.contains_key("ownershipControls") {
636                    self.delete_bucket_ownership_controls(account_id, b)
637                } else if req.query_params.contains_key("inventory") {
638                    self.delete_bucket_inventory(account_id, &req, b)
639                } else if req.query_params.contains_key("analytics") {
640                    self.delete_bucket_analytics_config(account_id, &req, b)
641                } else if req.query_params.contains_key("intelligent-tiering") {
642                    self.delete_bucket_intelligent_tiering_config(account_id, &req, b)
643                } else if req.query_params.contains_key("metrics") {
644                    self.delete_bucket_metrics_config(account_id, &req, b)
645                } else if req.query_params.contains_key("metadataConfiguration") {
646                    self.delete_bucket_metadata_config(account_id, b)
647                } else if req.query_params.contains_key("metadataTable") {
648                    self.delete_bucket_metadata_table_config(account_id, b)
649                } else {
650                    self.delete_bucket(account_id, &req, b)
651                }
652            }
653            (&Method::HEAD, Some(b), None) => self.head_bucket(account_id, b),
654            (&Method::GET, Some(b), None) => {
655                if req.query_params.contains_key("tagging") {
656                    self.get_bucket_tagging(account_id, &req, b)
657                } else if req.query_params.contains_key("location") {
658                    self.get_bucket_location(account_id, b)
659                } else if req.query_params.contains_key("acl") {
660                    self.get_bucket_acl(account_id, &req, b)
661                } else if req.query_params.contains_key("versioning") {
662                    self.get_bucket_versioning(account_id, b)
663                } else if req.query_params.contains_key("versions") {
664                    self.list_object_versions(account_id, &req, b)
665                } else if req.query_params.contains_key("object-lock") {
666                    self.get_object_lock_configuration(account_id, b)
667                } else if req.query_params.contains_key("cors") {
668                    self.get_bucket_cors(account_id, b)
669                } else if req.query_params.contains_key("notification") {
670                    self.get_bucket_notification(account_id, b)
671                } else if req.query_params.contains_key("website") {
672                    self.get_bucket_website(account_id, b)
673                } else if req.query_params.contains_key("accelerate") {
674                    self.get_bucket_accelerate(account_id, b)
675                } else if req.query_params.contains_key("publicAccessBlock") {
676                    self.get_public_access_block(account_id, b)
677                } else if req.query_params.contains_key("encryption") {
678                    self.get_bucket_encryption(account_id, b)
679                } else if req.query_params.contains_key("lifecycle") {
680                    self.get_bucket_lifecycle(account_id, b)
681                } else if req.query_params.contains_key("logging") {
682                    self.get_bucket_logging(account_id, b)
683                } else if req.query_params.contains_key("policy") {
684                    self.get_bucket_policy(account_id, b)
685                } else if req.query_params.contains_key("replication") {
686                    self.get_bucket_replication(account_id, b)
687                } else if req.query_params.contains_key("ownershipControls") {
688                    self.get_bucket_ownership_controls(account_id, b)
689                } else if req.query_params.contains_key("inventory") {
690                    if req.query_params.contains_key("id") {
691                        self.get_bucket_inventory(account_id, &req, b)
692                    } else {
693                        self.list_bucket_inventory_configurations(account_id, b)
694                    }
695                } else if req.query_params.contains_key("analytics") {
696                    if req.query_params.contains_key("id") {
697                        self.get_bucket_analytics_config(account_id, &req, b)
698                    } else {
699                        self.list_bucket_analytics_configurations(account_id, b)
700                    }
701                } else if req.query_params.contains_key("intelligent-tiering") {
702                    if req.query_params.contains_key("id") {
703                        self.get_bucket_intelligent_tiering_config(account_id, &req, b)
704                    } else {
705                        self.list_bucket_intelligent_tiering_configurations(account_id, b)
706                    }
707                } else if req.query_params.contains_key("metrics") {
708                    if req.query_params.contains_key("id") {
709                        self.get_bucket_metrics_config(account_id, &req, b)
710                    } else {
711                        self.list_bucket_metrics_configurations(account_id, b)
712                    }
713                } else if req.query_params.contains_key("requestPayment") {
714                    self.get_bucket_request_payment(account_id, b)
715                } else if req.query_params.contains_key("abac") {
716                    self.get_bucket_abac(account_id, b)
717                } else if req.query_params.contains_key("policyStatus") {
718                    self.get_bucket_policy_status(account_id, b)
719                } else if req.query_params.contains_key("metadataConfiguration") {
720                    self.get_bucket_metadata_config(account_id, b)
721                } else if req.query_params.contains_key("metadataTable") {
722                    self.get_bucket_metadata_table_config(account_id, b)
723                } else if req.query_params.contains_key("session") {
724                    self.create_session(account_id, &req, b)
725                } else if req.query_params.get("list-type").map(|s| s.as_str()) == Some("2") {
726                    self.list_objects_v2(account_id, &req, b)
727                } else if req.query_params.is_empty() {
728                    // If bucket has website config and no query params, serve index document
729                    let website_config = {
730                        let accounts = self.state.read();
731                        let _empty_s3 = crate::state::S3State::new(&req.account_id, &req.region);
732                        let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
733                        state
734                            .buckets
735                            .get(b)
736                            .and_then(|bkt| bkt.website_config.clone())
737                    };
738                    if let Some(ref config) = website_config {
739                        if let Some(index_doc) = extract_xml_value(config, "Suffix").or_else(|| {
740                            extract_xml_value(config, "IndexDocument").and_then(|inner| {
741                                let open = "<Suffix>";
742                                let close = "</Suffix>";
743                                let s = inner.find(open)? + open.len();
744                                let e = inner.find(close)?;
745                                Some(inner[s..e].trim().to_string())
746                            })
747                        }) {
748                            self.serve_website_object(account_id, &req, b, &index_doc, config)
749                        } else {
750                            self.list_objects_v1(account_id, &req, b)
751                        }
752                    } else {
753                        self.list_objects_v1(account_id, &req, b)
754                    }
755                } else {
756                    self.list_objects_v1(account_id, &req, b)
757                }
758            }
759
760            // Object-level operations
761            (&Method::PUT, Some(b), Some(k)) => {
762                if req.query_params.contains_key("tagging") {
763                    self.put_object_tagging(account_id, &req, b, k)
764                } else if req.query_params.contains_key("acl") {
765                    self.put_object_acl(account_id, &req, b, k)
766                } else if req.query_params.contains_key("retention") {
767                    self.put_object_retention(account_id, &req, b, k)
768                } else if req.query_params.contains_key("legal-hold") {
769                    self.put_object_legal_hold(account_id, &req, b, k)
770                } else if req.query_params.contains_key("renameObject") {
771                    self.rename_object(account_id, &req, b, k)
772                } else if req.query_params.contains_key("encryption") {
773                    self.update_object_encryption(account_id, &req, b, k)
774                } else if req.headers.contains_key("x-amz-copy-source") {
775                    self.copy_object(account_id, &req, b, k)
776                } else {
777                    self.put_object(account_id, &req, b, k).await
778                }
779            }
780            (&Method::GET, Some(b), Some(k)) => {
781                if req.query_params.contains_key("tagging") {
782                    self.get_object_tagging(account_id, &req, b, k)
783                } else if req.query_params.contains_key("acl") {
784                    self.get_object_acl(account_id, &req, b, k)
785                } else if req.query_params.contains_key("retention") {
786                    self.get_object_retention(account_id, &req, b, k)
787                } else if req.query_params.contains_key("legal-hold") {
788                    self.get_object_legal_hold(account_id, &req, b, k)
789                } else if req.query_params.contains_key("attributes") {
790                    self.get_object_attributes(account_id, &req, b, k)
791                } else if req.query_params.contains_key("torrent") {
792                    self.get_object_torrent(account_id, &req, b, k)
793                } else {
794                    let result = self.get_object(account_id, &req, b, k);
795                    // If object not found and bucket has website config, serve error document
796                    let is_not_found = matches!(
797                        &result,
798                        Err(e) if e.code() == "NoSuchKey"
799                    );
800                    if is_not_found {
801                        let website_config = {
802                            let accounts = self.state.read();
803                            let _empty_s3 =
804                                crate::state::S3State::new(&req.account_id, &req.region);
805                            let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
806                            state
807                                .buckets
808                                .get(b)
809                                .and_then(|bkt| bkt.website_config.clone())
810                        };
811                        if let Some(ref config) = website_config {
812                            if let Some(error_key) = extract_xml_value(config, "ErrorDocument")
813                                .and_then(|inner| {
814                                    let open = "<Key>";
815                                    let close = "</Key>";
816                                    let s = inner.find(open)? + open.len();
817                                    let e = inner.find(close)?;
818                                    Some(inner[s..e].trim().to_string())
819                                })
820                                .or_else(|| extract_xml_value(config, "Key"))
821                            {
822                                return self.serve_website_error(account_id, &req, b, &error_key);
823                            }
824                        }
825                    }
826                    result
827                }
828            }
829            (&Method::DELETE, Some(b), Some(k)) => {
830                if req.query_params.contains_key("tagging") {
831                    self.delete_object_tagging(account_id, b, k)
832                } else {
833                    self.delete_object(account_id, &req, b, k)
834                }
835            }
836            (&Method::HEAD, Some(b), Some(k)) => self.head_object(account_id, &req, b, k),
837
838            // POST /{bucket}?delete — batch delete
839            (&Method::POST, Some(b), None) if req.query_params.contains_key("delete") => {
840                self.delete_objects(account_id, &req, b)
841            }
842            (&Method::POST, Some(b), None)
843                if req.query_params.contains_key("metadataConfiguration") =>
844            {
845                self.create_bucket_metadata_config(account_id, &req, b)
846            }
847            (&Method::POST, Some(b), None) if req.query_params.contains_key("metadataTable") => {
848                self.create_bucket_metadata_table_config(account_id, &req, b)
849            }
850            (&Method::POST, Some(b), Some(k))
851                if req.query_params.get("select-type").map(|s| s.as_str()) == Some("2") =>
852            {
853                self.select_object_content(account_id, &req, b, k)
854            }
855            (&Method::POST, Some("WriteGetObjectResponse"), None) => {
856                self.write_get_object_response(account_id, &req)
857            }
858
859            _ => Err(AwsServiceError::aws_error(
860                StatusCode::METHOD_NOT_ALLOWED,
861                "MethodNotAllowed",
862                "The specified method is not allowed against this resource",
863            )),
864        };
865
866        // Apply CORS headers to the response if Origin was present
867        if let (Some(ref origin), Some(b_name)) = (&origin_header, bucket) {
868            let cors_config = {
869                let accounts = self.state.read();
870                let _empty_s3 = crate::state::S3State::new(&req.account_id, &req.region);
871                let state = accounts.get(&req.account_id).unwrap_or(&_empty_s3);
872                state
873                    .buckets
874                    .get(b_name)
875                    .and_then(|b| b.cors_config.clone())
876            };
877            if let Some(ref config) = cors_config {
878                let rules = parse_cors_config(config);
879                if let Some(rule) = find_cors_rule(&rules, origin, None) {
880                    if let Ok(ref mut resp) = result {
881                        let matched_origin = if rule.allowed_origins.contains(&"*".to_string()) {
882                            "*"
883                        } else {
884                            origin
885                        };
886                        resp.headers.insert(
887                            "access-control-allow-origin",
888                            matched_origin
889                                .parse()
890                                .unwrap_or_else(|_| http::HeaderValue::from_static("")),
891                        );
892                        if !rule.expose_headers.is_empty() {
893                            resp.headers.insert(
894                                "access-control-expose-headers",
895                                rule.expose_headers
896                                    .join(", ")
897                                    .parse()
898                                    .unwrap_or_else(|_| http::HeaderValue::from_static("")),
899                            );
900                        }
901                    }
902                }
903            }
904        }
905
906        // Write S3 access log entry if the source bucket has logging enabled
907        if let Some(b_name) = bucket {
908            let status_code = match &result {
909                Ok(resp) => resp.status.as_u16(),
910                Err(e) => e.status().as_u16(),
911            };
912            let op = logging::operation_name(&req.method, key.as_deref());
913            logging::maybe_write_access_log(
914                &self.state,
915                &self.store,
916                b_name,
917                &logging::AccessLogRequest {
918                    operation: op,
919                    key: key.as_deref(),
920                    status: status_code,
921                    request_id: &req.request_id,
922                    method: req.method.as_str(),
923                    path: &req.raw_path,
924                },
925            );
926        }
927
928        result
929    }
930
931    fn supported_actions(&self) -> &[&str] {
932        &[
933            // Buckets
934            "ListBuckets",
935            "CreateBucket",
936            "DeleteBucket",
937            "HeadBucket",
938            "GetBucketLocation",
939            // Objects
940            "PutObject",
941            "GetObject",
942            "DeleteObject",
943            "HeadObject",
944            "CopyObject",
945            "DeleteObjects",
946            "ListObjectsV2",
947            "ListObjects",
948            "ListObjectVersions",
949            "GetObjectAttributes",
950            "RestoreObject",
951            // Object properties
952            "PutObjectTagging",
953            "GetObjectTagging",
954            "DeleteObjectTagging",
955            "PutObjectAcl",
956            "GetObjectAcl",
957            "PutObjectRetention",
958            "GetObjectRetention",
959            "PutObjectLegalHold",
960            "GetObjectLegalHold",
961            // Bucket configuration
962            "PutBucketTagging",
963            "GetBucketTagging",
964            "DeleteBucketTagging",
965            "PutBucketAcl",
966            "GetBucketAcl",
967            "PutBucketVersioning",
968            "GetBucketVersioning",
969            "PutBucketCors",
970            "GetBucketCors",
971            "DeleteBucketCors",
972            "PutBucketNotificationConfiguration",
973            "GetBucketNotificationConfiguration",
974            "PutBucketWebsite",
975            "GetBucketWebsite",
976            "DeleteBucketWebsite",
977            "PutBucketAccelerateConfiguration",
978            "GetBucketAccelerateConfiguration",
979            "PutPublicAccessBlock",
980            "GetPublicAccessBlock",
981            "DeletePublicAccessBlock",
982            "PutBucketEncryption",
983            "GetBucketEncryption",
984            "DeleteBucketEncryption",
985            "PutBucketLifecycleConfiguration",
986            "GetBucketLifecycleConfiguration",
987            "DeleteBucketLifecycle",
988            "PutBucketLogging",
989            "GetBucketLogging",
990            "PutBucketPolicy",
991            "GetBucketPolicy",
992            "DeleteBucketPolicy",
993            "PutObjectLockConfiguration",
994            "GetObjectLockConfiguration",
995            "PutBucketReplication",
996            "GetBucketReplication",
997            "DeleteBucketReplication",
998            "PutBucketOwnershipControls",
999            "GetBucketOwnershipControls",
1000            "DeleteBucketOwnershipControls",
1001            "PutBucketInventoryConfiguration",
1002            "GetBucketInventoryConfiguration",
1003            "DeleteBucketInventoryConfiguration",
1004            "ListBucketInventoryConfigurations",
1005            "PutBucketAnalyticsConfiguration",
1006            "GetBucketAnalyticsConfiguration",
1007            "DeleteBucketAnalyticsConfiguration",
1008            "ListBucketAnalyticsConfigurations",
1009            "PutBucketIntelligentTieringConfiguration",
1010            "GetBucketIntelligentTieringConfiguration",
1011            "DeleteBucketIntelligentTieringConfiguration",
1012            "ListBucketIntelligentTieringConfigurations",
1013            "PutBucketMetricsConfiguration",
1014            "GetBucketMetricsConfiguration",
1015            "DeleteBucketMetricsConfiguration",
1016            "ListBucketMetricsConfigurations",
1017            "PutBucketRequestPayment",
1018            "GetBucketRequestPayment",
1019            "PutBucketAbac",
1020            "GetBucketAbac",
1021            "GetBucketPolicyStatus",
1022            "CreateBucketMetadataConfiguration",
1023            "GetBucketMetadataConfiguration",
1024            "DeleteBucketMetadataConfiguration",
1025            "CreateBucketMetadataTableConfiguration",
1026            "GetBucketMetadataTableConfiguration",
1027            "DeleteBucketMetadataTableConfiguration",
1028            "UpdateBucketMetadataInventoryTableConfiguration",
1029            "UpdateBucketMetadataJournalTableConfiguration",
1030            "GetObjectTorrent",
1031            "RenameObject",
1032            "SelectObjectContent",
1033            "UpdateObjectEncryption",
1034            "WriteGetObjectResponse",
1035            "ListDirectoryBuckets",
1036            "CreateSession",
1037            // Multipart uploads
1038            "CreateMultipartUpload",
1039            "UploadPart",
1040            "UploadPartCopy",
1041            "CompleteMultipartUpload",
1042            "AbortMultipartUpload",
1043            "ListParts",
1044            "ListMultipartUploads",
1045        ]
1046    }
1047
    /// Always returns `true` for S3.
    ///
    /// NOTE(review): presumably this opts the service into IAM enforcement
    /// in the dispatch layer, which would then rely on `iam_action_for` to
    /// name the action — confirm against the trait definition.
    fn iam_enforceable(&self) -> bool {
        true
    }
1051
1052    /// S3 resources are either:
1053    /// - Bucket ARN (`arn:aws:s3:::bucket`) for bucket-level actions
1054    /// - Object ARN (`arn:aws:s3:::bucket/key`) for object-level actions
1055    /// - Wildcard (`*`) for `ListBuckets` which doesn't target a specific
1056    ///   resource
1057    ///
1058    /// S3 ARNs notably omit the account id and region — this is the one
1059    /// AWS service that carries neither in its ARN, because bucket names
1060    /// are globally unique.
1061    fn iam_action_for(&self, request: &AwsRequest) -> Option<fakecloud_core::auth::IamAction> {
1062        // S3 doesn't set `request.action` — the handler dispatches on
1063        // method + path + query params directly. Re-derive the action
1064        // name here so enforcement can match against IAM policies the
1065        // same way the real service would.
1066        let bucket = request.path_segments.first().map(|s| s.as_str());
1067        let key = if request.path_segments.len() > 1 {
1068            Some(request.path_segments[1..].join("/"))
1069        } else {
1070            None
1071        };
1072        let action = s3_detect_action(
1073            request.method.as_str(),
1074            bucket,
1075            key.as_deref(),
1076            &request.query_params,
1077        )?;
1078        // A handful of S3-adjacent ops are authorized under sibling IAM
1079        // service prefixes, not `s3:`. Select the right prefix so a policy
1080        // targeting the real action string actually matches.
1081        let service = match action {
1082            "CreateSession" => "s3express",
1083            "WriteGetObjectResponse" => "s3-object-lambda",
1084            _ => "s3",
1085        };
1086        let resource = s3_resource_for(action, bucket, key.as_deref());
1087        Some(fakecloud_core::auth::IamAction {
1088            service,
1089            action,
1090            resource,
1091        })
1092    }
1093
1094    fn iam_condition_keys_for(
1095        &self,
1096        request: &AwsRequest,
1097        action: &fakecloud_core::auth::IamAction,
1098    ) -> std::collections::BTreeMap<String, Vec<String>> {
1099        s3_condition_keys(action.action, &request.query_params)
1100    }
1101
1102    fn resource_tags_for(
1103        &self,
1104        resource_arn: &str,
1105    ) -> Option<std::collections::HashMap<String, String>> {
1106        s3_resource_tags(&self.state, resource_arn)
1107            .map(|m| m.into_iter().collect::<std::collections::HashMap<_, _>>())
1108    }
1109
1110    fn request_tags_from(
1111        &self,
1112        request: &AwsRequest,
1113        action: &str,
1114    ) -> Option<std::collections::HashMap<String, String>> {
1115        s3_request_tags(request, action)
1116            .map(|m| m.into_iter().collect::<std::collections::HashMap<_, _>>())
1117    }
1118}
1119
/// Build the service-specific IAM condition keys for an S3 request.
///
/// Only the list operations (`ListObjects` / `ListObjectsV2`) contribute
/// keys today: `s3:prefix`, `s3:delimiter` and `s3:max-keys`, each taken
/// from the query parameter of the same name (without the `s3:` prefix)
/// when that parameter is present. Every other action yields an empty map,
/// so the evaluator's safe-fail semantics treat a policy condition on an
/// unknown key as "doesn't apply".
fn s3_condition_keys(
    action: &str,
    query: &std::collections::HashMap<String, String>,
) -> std::collections::BTreeMap<String, Vec<String>> {
    // (condition key, query parameter) pairs shared by both list variants.
    const LIST_CONDITION_KEYS: [(&str, &str); 3] = [
        ("s3:prefix", "prefix"),
        ("s3:delimiter", "delimiter"),
        ("s3:max-keys", "max-keys"),
    ];

    if action != "ListObjects" && action != "ListObjectsV2" {
        return std::collections::BTreeMap::new();
    }

    LIST_CONDITION_KEYS
        .iter()
        .filter_map(|&(condition_key, param)| {
            query
                .get(param)
                .map(|value| (condition_key.to_string(), vec![value.clone()]))
        })
        .collect()
}
1145
1146/// Look up resource tags for an S3 ARN.
1147///
1148/// Bucket-level ARN (`arn:aws:s3:::bucket`) -> bucket tags.
1149/// Object-level ARN (`arn:aws:s3:::bucket/key`) -> object tags.
1150/// `*` (ListBuckets) -> `Some(empty)` (no resource to tag).
1151fn s3_resource_tags(
1152    state: &SharedS3State,
1153    resource_arn: &str,
1154) -> Option<std::collections::BTreeMap<String, String>> {
1155    if resource_arn == "*" {
1156        return Some(std::collections::BTreeMap::new());
1157    }
1158    // S3 ARNs: arn:aws:s3:::bucket or arn:aws:s3:::bucket/key
1159    let after_prefix = resource_arn.strip_prefix("arn:aws:s3:::")?;
1160    let mas = state.read();
1161    // S3 bucket names are globally unique; scan all accounts to find the bucket
1162    let bucket_name = after_prefix.split('/').next().unwrap_or(after_prefix);
1163    let state = mas
1164        .find_account(|s| s.buckets.contains_key(bucket_name))
1165        .and_then(|id| mas.get(id))
1166        .or_else(|| Some(mas.default_ref()))?;
1167    if let Some(slash_pos) = after_prefix.find('/') {
1168        // Object-level: bucket/key
1169        let bucket_name = &after_prefix[..slash_pos];
1170        let key = &after_prefix[slash_pos + 1..];
1171        let bucket = state.buckets.get(bucket_name)?;
1172        // Try current objects first, then versioned objects (latest version)
1173        if let Some(obj) = bucket.objects.get(key) {
1174            Some(obj.tags.clone())
1175        } else if let Some(versions) = bucket.object_versions.get(key) {
1176            versions.last().map(|v| v.tags.clone())
1177        } else {
1178            // Object doesn't exist yet (e.g. PutObject creating it)
1179            Some(std::collections::BTreeMap::new())
1180        }
1181    } else {
1182        // Bucket-level
1183        let bucket = state.buckets.get(after_prefix)?;
1184        Some(bucket.tags.clone())
1185    }
1186}
1187
1188/// Extract tags from an S3 request body/headers.
1189///
1190/// S3 sends tags via:
1191/// - `x-amz-tagging` header (URL-encoded `key=value&...`) on PutObject
1192/// - XML body on PutBucketTagging / PutObjectTagging
1193fn s3_request_tags(
1194    request: &AwsRequest,
1195    action: &str,
1196) -> Option<std::collections::BTreeMap<String, String>> {
1197    match action {
1198        "PutObject" | "CopyObject" | "CreateMultipartUpload" => {
1199            // Tags come via x-amz-tagging header
1200            if let Some(tagging) = request.headers.get("x-amz-tagging") {
1201                let tags = parse_url_encoded_tags(tagging.to_str().unwrap_or(""));
1202                Some(tags.into_iter().collect())
1203            } else {
1204                Some(std::collections::BTreeMap::new())
1205            }
1206        }
1207        "PutBucketTagging" | "PutObjectTagging" => {
1208            // Tags come in XML body
1209            let body = std::str::from_utf8(&request.body).unwrap_or("");
1210            let tags = parse_tagging_xml(body);
1211            Some(tags.into_iter().collect())
1212        }
1213        _ => Some(std::collections::BTreeMap::new()),
1214    }
1215}
1216
1217/// Derive the IAM action name from an S3 REST request. Handles the
1218/// common cases (GetObject, PutObject, DeleteObject, ListObjectsV2,
1219/// CreateBucket, ...) plus a subset of sub-resource operations
1220/// (`?acl`, `?tagging`, `?versioning`, `?policy`, `?cors`, `?website`,
1221/// `?lifecycle`, `?encryption`, `?logging`, `?notification`, `?replication`,
1222/// `?ownershipControls`, `?publicAccessBlock`, `?accelerate`, `?inventory`,
1223/// `?object-lock`, `?uploads`, `?uploadId`).
1224///
1225/// Returns `None` for requests that don't map to a known action — the
1226/// dispatch layer then skips enforcement for that request rather than
1227/// guessing (and a warn log fires via the "service is iam_enforceable
1228/// but has no mapping" branch in dispatch.rs).
1229fn s3_detect_action(
1230    method: &str,
1231    bucket: Option<&str>,
1232    key: Option<&str>,
1233    query: &std::collections::HashMap<String, String>,
1234) -> Option<&'static str> {
1235    let has = |q: &str| query.contains_key(q);
1236    let is_get = method == "GET";
1237    let is_put = method == "PUT";
1238    let is_post = method == "POST";
1239    let is_delete = method == "DELETE";
1240
1241    // Service root
1242    if bucket.is_none() {
1243        return match method {
1244            "GET" => Some("ListBuckets"),
1245            _ => None,
1246        };
1247    }
1248
1249    // WriteGetObjectResponse is an Object Lambda data-plane op routed as
1250    // `POST /WriteGetObjectResponse` (the literal value occupies the bucket
1251    // segment). AWS authorizes it under the separate
1252    // `s3-object-lambda:WriteGetObjectResponse` action — handled in
1253    // `iam_action_for`, which selects the `s3-object-lambda` service prefix
1254    // for this op. Previously unmapped -> enforcement skipped.
1255    if is_post && bucket == Some("WriteGetObjectResponse") && key.is_none() {
1256        return Some("WriteGetObjectResponse");
1257    }
1258
1259    let has_key = key.is_some();
1260
1261    // Multipart sub-resource forms
1262    if has_key && is_post && has("uploads") {
1263        return Some("CreateMultipartUpload");
1264    }
1265    if has_key && is_post && has("uploadId") {
1266        return Some("CompleteMultipartUpload");
1267    }
1268    if has_key && is_put && has("partNumber") && has("uploadId") {
1269        return Some("UploadPart");
1270    }
1271    if has_key && is_delete && has("uploadId") {
1272        return Some("AbortMultipartUpload");
1273    }
1274    if has_key && is_get && has("uploadId") {
1275        return Some("ListParts");
1276    }
1277    if !has_key && is_get && has("uploads") {
1278        return Some("ListMultipartUploads");
1279    }
1280
1281    // Sub-resource-keyed actions (?acl, ?tagging, ...). Order matters
1282    // since a request can carry multiple; we pick the most specific.
1283    // Object-level sub-resources come first (key present).
1284    if has_key {
1285        if has("tagging") {
1286            return Some(match method {
1287                "GET" => "GetObjectTagging",
1288                "PUT" => "PutObjectTagging",
1289                "DELETE" => "DeleteObjectTagging",
1290                _ => return None,
1291            });
1292        }
1293        if has("acl") {
1294            return Some(match method {
1295                "GET" => "GetObjectAcl",
1296                "PUT" => "PutObjectAcl",
1297                _ => return None,
1298            });
1299        }
1300        if has("retention") {
1301            return Some(match method {
1302                "GET" => "GetObjectRetention",
1303                "PUT" => "PutObjectRetention",
1304                _ => return None,
1305            });
1306        }
1307        if has("legal-hold") {
1308            return Some(match method {
1309                "GET" => "GetObjectLegalHold",
1310                "PUT" => "PutObjectLegalHold",
1311                _ => return None,
1312            });
1313        }
1314        // Identified by cubic on PR #399: both ?attributes and ?restore
1315        // need method guards — otherwise e.g. GET /bucket/key?restore
1316        // would be classified as RestoreObject (POST-only in AWS) and
1317        // IAM-evaluated against s3:RestoreObject instead of s3:GetObject.
1318        if has("attributes") && is_get {
1319            return Some("GetObjectAttributes");
1320        }
1321        if has("restore") && is_post {
1322            return Some("RestoreObject");
1323        }
1324        // RenameObject is an object-level op (`PUT /bucket/newkey?renameObject`
1325        // + `x-amz-rename-source`). It carries a key, so it must be detected
1326        // here — the `!has_key` arm below never sees it. Without this it fell
1327        // through to `PutObject`, so a policy denying `s3:RenameObject` was
1328        // silently ineffective.
1329        if has("renameObject") && is_put {
1330            return Some("RenameObject");
1331        }
1332        // UpdateObjectEncryption (`PUT /bucket/key?encryption`) changes an
1333        // existing object's server-side encryption. AWS gates this under
1334        // `s3:PutObject` (encryption is a write attribute), and there is no
1335        // distinct public `s3:UpdateObjectEncryption` IAM action — so map to
1336        // PutObject deliberately rather than letting it fall through.
1337        if has("encryption") && is_put {
1338            return Some("PutObject");
1339        }
1340        // SelectObjectContent (`POST /bucket/key?select&select-type=2`) reads
1341        // object data, so AWS authorizes it with `s3:GetObject`. Previously
1342        // unmapped -> `_ => None` -> enforcement skipped entirely.
1343        if has("select-type") && is_post {
1344            return Some("GetObject");
1345        }
1346    }
1347
1348    // Bucket-level sub-resources (key absent).
1349    if !has_key {
1350        if has("tagging") {
1351            return Some(match method {
1352                "GET" => "GetBucketTagging",
1353                "PUT" => "PutBucketTagging",
1354                "DELETE" => "DeleteBucketTagging",
1355                _ => return None,
1356            });
1357        }
1358        if has("acl") {
1359            return Some(match method {
1360                "GET" => "GetBucketAcl",
1361                "PUT" => "PutBucketAcl",
1362                _ => return None,
1363            });
1364        }
1365        if has("versioning") {
1366            return Some(match method {
1367                "GET" => "GetBucketVersioning",
1368                "PUT" => "PutBucketVersioning",
1369                _ => return None,
1370            });
1371        }
1372        if has("cors") {
1373            return Some(match method {
1374                "GET" => "GetBucketCors",
1375                "PUT" => "PutBucketCors",
1376                "DELETE" => "DeleteBucketCors",
1377                _ => return None,
1378            });
1379        }
1380        if has("policy") {
1381            return Some(match method {
1382                "GET" => "GetBucketPolicy",
1383                "PUT" => "PutBucketPolicy",
1384                "DELETE" => "DeleteBucketPolicy",
1385                _ => return None,
1386            });
1387        }
1388        if has("website") {
1389            return Some(match method {
1390                "GET" => "GetBucketWebsite",
1391                "PUT" => "PutBucketWebsite",
1392                "DELETE" => "DeleteBucketWebsite",
1393                _ => return None,
1394            });
1395        }
1396        if has("lifecycle") {
1397            return Some(match method {
1398                "GET" => "GetBucketLifecycleConfiguration",
1399                "PUT" => "PutBucketLifecycleConfiguration",
1400                "DELETE" => "DeleteBucketLifecycle",
1401                _ => return None,
1402            });
1403        }
1404        if has("encryption") {
1405            return Some(match method {
1406                "GET" => "GetBucketEncryption",
1407                "PUT" => "PutBucketEncryption",
1408                "DELETE" => "DeleteBucketEncryption",
1409                _ => return None,
1410            });
1411        }
1412        if has("logging") {
1413            return Some(match method {
1414                "GET" => "GetBucketLogging",
1415                "PUT" => "PutBucketLogging",
1416                _ => return None,
1417            });
1418        }
1419        if has("notification") {
1420            return Some(match method {
1421                "GET" => "GetBucketNotificationConfiguration",
1422                "PUT" => "PutBucketNotificationConfiguration",
1423                _ => return None,
1424            });
1425        }
1426        if has("replication") {
1427            return Some(match method {
1428                "GET" => "GetBucketReplication",
1429                "PUT" => "PutBucketReplication",
1430                "DELETE" => "DeleteBucketReplication",
1431                _ => return None,
1432            });
1433        }
1434        if has("ownershipControls") {
1435            return Some(match method {
1436                "GET" => "GetBucketOwnershipControls",
1437                "PUT" => "PutBucketOwnershipControls",
1438                "DELETE" => "DeleteBucketOwnershipControls",
1439                _ => return None,
1440            });
1441        }
1442        if has("publicAccessBlock") {
1443            return Some(match method {
1444                "GET" => "GetPublicAccessBlock",
1445                "PUT" => "PutPublicAccessBlock",
1446                "DELETE" => "DeletePublicAccessBlock",
1447                _ => return None,
1448            });
1449        }
1450        if has("accelerate") {
1451            return Some(match method {
1452                "GET" => "GetBucketAccelerateConfiguration",
1453                "PUT" => "PutBucketAccelerateConfiguration",
1454                _ => return None,
1455            });
1456        }
1457        if has("inventory") {
1458            return Some(match method {
1459                "GET" => "GetBucketInventoryConfiguration",
1460                "PUT" => "PutBucketInventoryConfiguration",
1461                "DELETE" => "DeleteBucketInventoryConfiguration",
1462                _ => return None,
1463            });
1464        }
1465        if has("analytics") {
1466            return Some(match method {
1467                "GET" if has("id") => "GetBucketAnalyticsConfiguration",
1468                "GET" => "ListBucketAnalyticsConfigurations",
1469                "PUT" => "PutBucketAnalyticsConfiguration",
1470                "DELETE" => "DeleteBucketAnalyticsConfiguration",
1471                _ => return None,
1472            });
1473        }
1474        if has("intelligent-tiering") {
1475            return Some(match method {
1476                "GET" if has("id") => "GetBucketIntelligentTieringConfiguration",
1477                "GET" => "ListBucketIntelligentTieringConfigurations",
1478                "PUT" => "PutBucketIntelligentTieringConfiguration",
1479                "DELETE" => "DeleteBucketIntelligentTieringConfiguration",
1480                _ => return None,
1481            });
1482        }
1483        if has("metrics") {
1484            return Some(match method {
1485                "GET" if has("id") => "GetBucketMetricsConfiguration",
1486                "GET" => "ListBucketMetricsConfigurations",
1487                "PUT" => "PutBucketMetricsConfiguration",
1488                "DELETE" => "DeleteBucketMetricsConfiguration",
1489                _ => return None,
1490            });
1491        }
1492        if has("requestPayment") {
1493            return Some(match method {
1494                "GET" => "GetBucketRequestPayment",
1495                "PUT" => "PutBucketRequestPayment",
1496                _ => return None,
1497            });
1498        }
1499        if has("policyStatus") && is_get {
1500            return Some("GetBucketPolicyStatus");
1501        }
1502        if has("metadataConfiguration") {
1503            return Some(match method {
1504                "GET" => "GetBucketMetadataConfiguration",
1505                "POST" => "CreateBucketMetadataConfiguration",
1506                "DELETE" => "DeleteBucketMetadataConfiguration",
1507                _ => return None,
1508            });
1509        }
1510        if has("metadataTable") {
1511            return Some(match method {
1512                "GET" => "GetBucketMetadataTableConfiguration",
1513                "POST" => "CreateBucketMetadataTableConfiguration",
1514                "DELETE" => "DeleteBucketMetadataTableConfiguration",
1515                _ => return None,
1516            });
1517        }
1518        if has("metadataInventoryTable") && is_put {
1519            return Some("UpdateBucketMetadataInventoryTableConfiguration");
1520        }
1521        if has("metadataJournalTable") && is_put {
1522            return Some("UpdateBucketMetadataJournalTableConfiguration");
1523        }
1524        if has("abac") {
1525            // PUT  -> PutBucketAbacConfiguration
1526            // GET  -> GetBucketAbacConfiguration (previously fell through to
1527            //         the plain `GET bucket` arm and was IAM-evaluated as
1528            //         s3:ListObjects — a policy on the ABAC config was never
1529            //         honored).
1530            return Some(match method {
1531                "PUT" => "PutBucketAbacConfiguration",
1532                "GET" => "GetBucketAbacConfiguration",
1533                _ => return None,
1534            });
1535        }
1536        if has("session") && is_get {
1537            // CreateSession (S3 Express directory buckets). AWS authorizes it
1538            // under the separate `s3express:CreateSession` action — handled in
1539            // `iam_action_for`, which selects the `s3express` service prefix
1540            // for this op. Previously fell through to the plain `GET bucket`
1541            // arm and was evaluated as s3:ListObjects.
1542            return Some("CreateSession");
1543        }
1544        if has("object-lock") {
1545            return Some(match method {
1546                "GET" => "GetObjectLockConfiguration",
1547                "PUT" => "PutObjectLockConfiguration",
1548                _ => return None,
1549            });
1550        }
1551        if has("location") {
1552            return Some("GetBucketLocation");
1553        }
1554        if is_post && has("delete") {
1555            return Some("DeleteObjects");
1556        }
1557        if is_get && has("versions") {
1558            return Some("ListObjectVersions");
1559        }
1560    }
1561
1562    // GET on an object with `?torrent` is GetObjectTorrent, authorized under
1563    // s3:GetObjectTorrent -- not s3:GetObject. Without this arm a policy that
1564    // allows GetObject but not GetObjectTorrent would wrongly permit it
1565    // (bug-audit 2026-06-20, 5.3).
1566    if is_get && has_key && has("torrent") {
1567        return Some("GetObjectTorrent");
1568    }
1569
1570    // Plain bucket/object methods.
1571    match (method, has_key) {
1572        ("GET", true) => Some("GetObject"),
1573        ("PUT", true) => {
1574            // CopyObject uses x-amz-copy-source but we don't have headers
1575            // handy here — treat both PutObject and CopyObject as PutObject
1576            // for IAM purposes; CopyObject additionally requires
1577            // s3:GetObject on the source but that's evaluated per-request
1578            // by real AWS, not on the PUT call itself.
1579            Some("PutObject")
1580        }
1581        ("DELETE", true) => Some("DeleteObject"),
1582        ("HEAD", true) => Some("HeadObject"),
1583        ("GET", false) => {
1584            if query.contains_key("list-type") {
1585                Some("ListObjectsV2")
1586            } else {
1587                Some("ListObjects")
1588            }
1589        }
1590        ("PUT", false) => Some("CreateBucket"),
1591        ("DELETE", false) => Some("DeleteBucket"),
1592        ("HEAD", false) => Some("HeadBucket"),
1593        _ => None,
1594    }
1595}
1596
1597/// Build the S3 resource ARN for an action. Returns `*` for
1598/// `ListBuckets` (account-scoped), a bucket ARN for bucket-level
1599/// configuration actions, or an object ARN for object-level actions.
1600fn s3_resource_for(action: &'static str, bucket: Option<&str>, key: Option<&str>) -> String {
1601    // Object-level actions work on `bucket/key`.
1602    const OBJECT_ACTIONS: &[&str] = &[
1603        "PutObject",
1604        "GetObject",
1605        "DeleteObject",
1606        "HeadObject",
1607        "CopyObject",
1608        "GetObjectAttributes",
1609        "RestoreObject",
1610        "PutObjectTagging",
1611        "GetObjectTagging",
1612        "DeleteObjectTagging",
1613        "PutObjectAcl",
1614        "GetObjectAcl",
1615        "PutObjectRetention",
1616        "GetObjectRetention",
1617        "PutObjectLegalHold",
1618        "GetObjectLegalHold",
1619        "CreateMultipartUpload",
1620        "UploadPart",
1621        "UploadPartCopy",
1622        "CompleteMultipartUpload",
1623        "AbortMultipartUpload",
1624        "ListParts",
1625    ];
1626    if action == "ListBuckets" {
1627        return "*".to_string();
1628    }
1629    let Some(bucket) = bucket else {
1630        return "*".to_string();
1631    };
1632    if OBJECT_ACTIONS.contains(&action) {
1633        match key {
1634            Some(k) if !k.is_empty() => Arn::s3(&format!("{bucket}/{k}")).to_string(),
1635            _ => Arn::s3(&format!("{bucket}/*")).to_string(),
1636        }
1637    } else {
1638        // Bucket-level actions (ListObjectsV2, GetBucketTagging, ...).
1639        Arn::s3(bucket).to_string()
1640    }
1641}
1642
1643// Conditional request helpers
1644
1645/// Truncate a DateTime to second-level precision (HTTP dates have no sub-second info).
1646pub(crate) fn truncate_to_seconds(dt: DateTime<Utc>) -> DateTime<Utc> {
1647    dt.with_nanosecond(0).unwrap_or(dt)
1648}
1649
1650pub(crate) fn check_get_conditionals(
1651    req: &AwsRequest,
1652    obj: &S3Object,
1653) -> Result<(), AwsServiceError> {
1654    let obj_etag = format!("\"{}\"", obj.etag);
1655    let obj_time = truncate_to_seconds(obj.last_modified);
1656
1657    // If-Match
1658    if let Some(if_match) = req.headers.get("if-match").and_then(|v| v.to_str().ok()) {
1659        if !etag_matches(if_match, &obj_etag) {
1660            return Err(precondition_failed("If-Match"));
1661        }
1662    }
1663
1664    // If-None-Match
1665    if let Some(if_none_match) = req
1666        .headers
1667        .get("if-none-match")
1668        .and_then(|v| v.to_str().ok())
1669    {
1670        if etag_matches(if_none_match, &obj_etag) {
1671            return Err(not_modified_with_etag(&obj_etag));
1672        }
1673    }
1674
1675    // If-Unmodified-Since
1676    if let Some(since) = req
1677        .headers
1678        .get("if-unmodified-since")
1679        .and_then(|v| v.to_str().ok())
1680    {
1681        if let Some(dt) = parse_http_date(since) {
1682            if obj_time > dt {
1683                return Err(precondition_failed("If-Unmodified-Since"));
1684            }
1685        }
1686    }
1687
1688    // If-Modified-Since
1689    if let Some(since) = req
1690        .headers
1691        .get("if-modified-since")
1692        .and_then(|v| v.to_str().ok())
1693    {
1694        if let Some(dt) = parse_http_date(since) {
1695            if obj_time <= dt {
1696                return Err(not_modified());
1697            }
1698        }
1699    }
1700
1701    Ok(())
1702}
1703
1704pub(crate) fn check_head_conditionals(
1705    req: &AwsRequest,
1706    obj: &S3Object,
1707) -> Result<(), AwsServiceError> {
1708    let obj_etag = format!("\"{}\"", obj.etag);
1709    let obj_time = truncate_to_seconds(obj.last_modified);
1710
1711    // If-Match
1712    if let Some(if_match) = req.headers.get("if-match").and_then(|v| v.to_str().ok()) {
1713        if !etag_matches(if_match, &obj_etag) {
1714            return Err(AwsServiceError::aws_error(
1715                StatusCode::PRECONDITION_FAILED,
1716                "412",
1717                "Precondition Failed",
1718            ));
1719        }
1720    }
1721
1722    // If-None-Match
1723    if let Some(if_none_match) = req
1724        .headers
1725        .get("if-none-match")
1726        .and_then(|v| v.to_str().ok())
1727    {
1728        if etag_matches(if_none_match, &obj_etag) {
1729            return Err(not_modified_with_etag(&obj_etag));
1730        }
1731    }
1732
1733    // If-Unmodified-Since
1734    if let Some(since) = req
1735        .headers
1736        .get("if-unmodified-since")
1737        .and_then(|v| v.to_str().ok())
1738    {
1739        if let Some(dt) = parse_http_date(since) {
1740            if obj_time > dt {
1741                return Err(AwsServiceError::aws_error(
1742                    StatusCode::PRECONDITION_FAILED,
1743                    "412",
1744                    "Precondition Failed",
1745                ));
1746            }
1747        }
1748    }
1749
1750    // If-Modified-Since
1751    if let Some(since) = req
1752        .headers
1753        .get("if-modified-since")
1754        .and_then(|v| v.to_str().ok())
1755    {
1756        if let Some(dt) = parse_http_date(since) {
1757            if obj_time <= dt {
1758                return Err(not_modified());
1759            }
1760        }
1761    }
1762
1763    Ok(())
1764}
1765
/// Decide whether an `If-Match` / `If-None-Match` header value selects the
/// object's ETag.
///
/// `condition` is the raw header value: either `*` (matches anything) or a
/// comma-separated list of entity tags. Double quotes are ignored on both
/// sides, so `"abc"` and `abc` compare equal. No other normalisation is
/// applied — a weak-validator prefix (`W/`) is compared literally.
pub(crate) fn etag_matches(condition: &str, obj_etag: &str) -> bool {
    /// Characters of `s` with every double quote removed.
    fn unquoted(s: &str) -> impl Iterator<Item = char> + '_ {
        s.chars().filter(|&c| c != '"')
    }

    let wanted = condition.trim();
    wanted == "*"
        || wanted
            .split(',')
            .any(|candidate| unquoted(candidate.trim()).eq(unquoted(obj_etag)))
}
1781
1782pub(crate) fn parse_http_date(s: &str) -> Option<DateTime<Utc>> {
1783    // Try RFC 2822 format: "Sat, 01 Jan 2000 00:00:00 GMT"
1784    if let Ok(dt) = DateTime::parse_from_rfc2822(s) {
1785        return Some(dt.with_timezone(&Utc));
1786    }
1787    // Try RFC 3339
1788    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1789        return Some(dt.with_timezone(&Utc));
1790    }
1791    // Try common HTTP date format: "%a, %d %b %Y %H:%M:%S GMT"
1792    if let Ok(dt) =
1793        chrono::NaiveDateTime::parse_from_str(s.trim_end_matches(" GMT"), "%a, %d %b %Y %H:%M:%S")
1794    {
1795        return Some(dt.and_utc());
1796    }
1797    // Try ISO 8601
1798    if let Ok(dt) = s.parse::<DateTime<Utc>>() {
1799        return Some(dt);
1800    }
1801    None
1802}
1803
1804pub(crate) fn not_modified() -> AwsServiceError {
1805    AwsServiceError::aws_error(StatusCode::NOT_MODIFIED, "304", "Not Modified")
1806}
1807
1808pub(crate) fn not_modified_with_etag(etag: &str) -> AwsServiceError {
1809    AwsServiceError::aws_error_with_headers(
1810        StatusCode::NOT_MODIFIED,
1811        "304",
1812        "Not Modified",
1813        vec![("etag".to_string(), etag.to_string())],
1814    )
1815}
1816
1817pub(crate) fn precondition_failed(condition: &str) -> AwsServiceError {
1818    AwsServiceError::aws_error_with_fields(
1819        StatusCode::PRECONDITION_FAILED,
1820        "PreconditionFailed",
1821        "At least one of the pre-conditions you specified did not hold",
1822        vec![("Condition".to_string(), condition.to_string())],
1823    )
1824}
1825
1826// ACL helpers
1827
1828pub(crate) fn build_acl_xml(owner_id: &str, grants: &[AclGrant], _account_id: &str) -> String {
1829    let mut grants_xml = String::new();
1830    for g in grants {
1831        let grantee_xml = if g.grantee_type == "Group" {
1832            let uri = g.grantee_uri.as_deref().unwrap_or("");
1833            format!(
1834                "<Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:type=\"Group\">\
1835                 <URI>{}</URI></Grantee>",
1836                xml_escape(uri),
1837            )
1838        } else {
1839            let id = g.grantee_id.as_deref().unwrap_or("");
1840            format!(
1841                "<Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:type=\"CanonicalUser\">\
1842                 <ID>{}</ID></Grantee>",
1843                xml_escape(id),
1844            )
1845        };
1846        grants_xml.push_str(&format!(
1847            "<Grant>{grantee_xml}<Permission>{}</Permission></Grant>",
1848            xml_escape(&g.permission),
1849        ));
1850    }
1851
1852    format!(
1853        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
1854         <AccessControlPolicy xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
1855         <Owner><ID>{owner_id}</ID><DisplayName>{owner_id}</DisplayName></Owner>\
1856         <AccessControlList>{grants_xml}</AccessControlList>\
1857         </AccessControlPolicy>",
1858        owner_id = xml_escape(owner_id),
1859    )
1860}
1861
1862pub(crate) fn canned_acl_grants(acl: &str, owner_id: &str) -> Vec<AclGrant> {
1863    let owner_grant = AclGrant {
1864        grantee_type: "CanonicalUser".to_string(),
1865        grantee_id: Some(owner_id.to_string()),
1866        grantee_display_name: Some(owner_id.to_string()),
1867        grantee_uri: None,
1868        permission: "FULL_CONTROL".to_string(),
1869    };
1870    match acl {
1871        "private" => vec![owner_grant],
1872        "public-read" => vec![
1873            owner_grant,
1874            AclGrant {
1875                grantee_type: "Group".to_string(),
1876                grantee_id: None,
1877                grantee_display_name: None,
1878                grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
1879                permission: "READ".to_string(),
1880            },
1881        ],
1882        "public-read-write" => vec![
1883            owner_grant,
1884            AclGrant {
1885                grantee_type: "Group".to_string(),
1886                grantee_id: None,
1887                grantee_display_name: None,
1888                grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
1889                permission: "READ".to_string(),
1890            },
1891            AclGrant {
1892                grantee_type: "Group".to_string(),
1893                grantee_id: None,
1894                grantee_display_name: None,
1895                grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
1896                permission: "WRITE".to_string(),
1897            },
1898        ],
1899        "authenticated-read" => vec![
1900            owner_grant,
1901            AclGrant {
1902                grantee_type: "Group".to_string(),
1903                grantee_id: None,
1904                grantee_display_name: None,
1905                grantee_uri: Some(
1906                    "http://acs.amazonaws.com/groups/global/AuthenticatedUsers".to_string(),
1907                ),
1908                permission: "READ".to_string(),
1909            },
1910        ],
1911        "bucket-owner-full-control" => vec![owner_grant],
1912        _ => vec![owner_grant],
1913    }
1914}
1915
1916pub(crate) fn canned_acl_grants_for_object(acl: &str, owner_id: &str) -> Vec<AclGrant> {
1917    // For objects, canned ACLs work the same way
1918    canned_acl_grants(acl, owner_id)
1919}
1920
1921pub(crate) fn parse_grant_headers(headers: &HeaderMap) -> Vec<AclGrant> {
1922    let mut grants = Vec::new();
1923    let header_permission_map = [
1924        ("x-amz-grant-read", "READ"),
1925        ("x-amz-grant-write", "WRITE"),
1926        ("x-amz-grant-read-acp", "READ_ACP"),
1927        ("x-amz-grant-write-acp", "WRITE_ACP"),
1928        ("x-amz-grant-full-control", "FULL_CONTROL"),
1929    ];
1930
1931    for (header, permission) in &header_permission_map {
1932        if let Some(value) = headers.get(*header).and_then(|v| v.to_str().ok()) {
1933            // Parse "id=xxx" or "uri=xxx" or "emailAddress=xxx"
1934            for part in value.split(',') {
1935                let part = part.trim();
1936                if let Some((key, val)) = part.split_once('=') {
1937                    let val = val.trim().trim_matches('"');
1938                    let key = key.trim().to_lowercase();
1939                    match key.as_str() {
1940                        "id" => {
1941                            grants.push(AclGrant {
1942                                grantee_type: "CanonicalUser".to_string(),
1943                                grantee_id: Some(val.to_string()),
1944                                grantee_display_name: Some(val.to_string()),
1945                                grantee_uri: None,
1946                                permission: permission.to_string(),
1947                            });
1948                        }
1949                        "uri" | "url" => {
1950                            grants.push(AclGrant {
1951                                grantee_type: "Group".to_string(),
1952                                grantee_id: None,
1953                                grantee_display_name: None,
1954                                grantee_uri: Some(val.to_string()),
1955                                permission: permission.to_string(),
1956                            });
1957                        }
1958                        _ => {}
1959                    }
1960                }
1961            }
1962        }
1963    }
1964    grants
1965}
1966
1967pub(crate) fn parse_acl_xml(xml: &str) -> Result<Vec<AclGrant>, AwsServiceError> {
1968    // Check for Owner presence
1969    if xml.contains("<AccessControlPolicy") && !xml.contains("<Owner>") {
1970        return Err(AwsServiceError::aws_error(
1971            StatusCode::BAD_REQUEST,
1972            "MalformedACLError",
1973            "The XML you provided was not well-formed or did not validate against our published schema",
1974        ));
1975    }
1976
1977    let valid_permissions = ["READ", "WRITE", "READ_ACP", "WRITE_ACP", "FULL_CONTROL"];
1978
1979    let mut grants = Vec::new();
1980    let mut remaining = xml;
1981    while let Some(start) = remaining.find("<Grant>") {
1982        let after = &remaining[start + 7..];
1983        if let Some(end) = after.find("</Grant>") {
1984            let grant_body = &after[..end];
1985
1986            // Extract permission
1987            let permission = extract_xml_value(grant_body, "Permission").unwrap_or_default();
1988            if !valid_permissions.contains(&permission.as_str()) {
1989                return Err(AwsServiceError::aws_error(
1990                    StatusCode::BAD_REQUEST,
1991                    "MalformedACLError",
1992                    "The XML you provided was not well-formed or did not validate against our published schema",
1993                ));
1994            }
1995
1996            // Determine grantee type
1997            if grant_body.contains("xsi:type=\"Group\"") || grant_body.contains("<URI>") {
1998                let uri = extract_xml_value(grant_body, "URI").unwrap_or_default();
1999                grants.push(AclGrant {
2000                    grantee_type: "Group".to_string(),
2001                    grantee_id: None,
2002                    grantee_display_name: None,
2003                    grantee_uri: Some(uri),
2004                    permission,
2005                });
2006            } else {
2007                let id = extract_xml_value(grant_body, "ID").unwrap_or_default();
2008                let display =
2009                    extract_xml_value(grant_body, "DisplayName").unwrap_or_else(|| id.clone());
2010                grants.push(AclGrant {
2011                    grantee_type: "CanonicalUser".to_string(),
2012                    grantee_id: Some(id),
2013                    grantee_display_name: Some(display),
2014                    grantee_uri: None,
2015                    permission,
2016                });
2017            }
2018
2019            remaining = &after[end + 8..];
2020        } else {
2021            break;
2022        }
2023    }
2024    Ok(grants)
2025}
2026
2027// Range helpers
2028
/// Outcome of interpreting a single-range `Range: bytes=...` header against
/// an object of known size.
pub(crate) enum RangeResult {
    /// The range selects the inclusive byte span `start..=end`, already
    /// clamped to the object's size.
    Satisfiable { start: usize, end: usize },
    /// The range lies entirely outside the object (or is a zero-length
    /// suffix, or the object is empty).
    NotSatisfiable,
    /// The range is well-formed but inverted (last position before first);
    /// presumably the caller then serves the whole object — confirm at the
    /// call site.
    Ignored,
}

/// Interpret a `Range` header value for an object of `total_size` bytes.
///
/// Supported forms are `bytes=A-B`, `bytes=A-` and the suffix form
/// `bytes=-N`. A last position at or beyond the end of the object is clamped
/// to the final byte.
///
/// Returns `None` when the value is not a single byte range this parser
/// understands: missing `bytes=` prefix, missing `-`, or a position that is
/// not a plain run of ASCII digits (so multi-range requests, signs and
/// whitespace are all rejected).
pub(crate) fn parse_range_header(range_str: &str, total_size: usize) -> Option<RangeResult> {
    /// Parse a byte position, which the HTTP grammar defines as `1*DIGIT`.
    ///
    /// `str::parse::<usize>` alone is too lenient: it accepts a leading `+`.
    /// Values too large for `usize` saturate instead of failing, so an
    /// oversized position is clamped / rejected by the size checks below
    /// rather than silently discarding the whole header.
    fn byte_pos(text: &str) -> Option<usize> {
        if text.is_empty() || !text.bytes().all(|b| b.is_ascii_digit()) {
            return None;
        }
        // An all-digit string can only fail to parse by overflowing.
        Some(text.parse().unwrap_or(usize::MAX))
    }

    let spec = range_str.strip_prefix("bytes=")?;
    let (start_str, end_str) = spec.split_once('-')?;

    if start_str.is_empty() {
        // Suffix form: the final `suffix_len` bytes of the object.
        let suffix_len = byte_pos(end_str)?;
        if suffix_len == 0 || total_size == 0 {
            return Some(RangeResult::NotSatisfiable);
        }
        return Some(RangeResult::Satisfiable {
            start: total_size.saturating_sub(suffix_len),
            end: total_size - 1,
        });
    }

    let start = byte_pos(start_str)?;
    // Checked before the end position is parsed, so `total_size` is known to
    // be non-zero below and `total_size - 1` cannot underflow.
    if start >= total_size {
        return Some(RangeResult::NotSatisfiable);
    }
    let end = if end_str.is_empty() {
        total_size - 1
    } else {
        let last = byte_pos(end_str)?;
        if last < start {
            return Some(RangeResult::Ignored);
        }
        last.min(total_size - 1)
    };
    Some(RangeResult::Satisfiable { start, end })
}
2065
2066// Helpers
2067
2068/// S3 XML response with `application/xml` content type (unlike Query protocol's `text/xml`).
2069pub(crate) fn s3_xml(status: StatusCode, body: impl Into<Bytes>) -> AwsResponse {
2070    AwsResponse {
2071        status,
2072        content_type: "application/xml".to_string(),
2073        body: body.into().into(),
2074        headers: HeaderMap::new(),
2075    }
2076}
2077
2078pub(crate) fn empty_response(status: StatusCode) -> AwsResponse {
2079    AwsResponse {
2080        status,
2081        content_type: "application/xml".to_string(),
2082        body: Bytes::new().into(),
2083        headers: HeaderMap::new(),
2084    }
2085}
2086
2087/// Returns true when the object is stored in a "cold" storage class (GLACIER, DEEP_ARCHIVE)
2088/// and has NOT been restored (or restore is still in progress).
2089pub(crate) fn is_frozen(obj: &S3Object) -> bool {
2090    matches!(obj.storage_class.as_str(), "GLACIER" | "DEEP_ARCHIVE")
2091        && obj.restore_ongoing != Some(false)
2092}
2093
2094pub(crate) fn no_such_bucket(bucket: &str) -> AwsServiceError {
2095    AwsServiceError::aws_error_with_fields(
2096        StatusCode::NOT_FOUND,
2097        "NoSuchBucket",
2098        "The specified bucket does not exist",
2099        vec![("BucketName".to_string(), bucket.to_string())],
2100    )
2101}
2102
2103pub(crate) fn no_such_key(key: &str) -> AwsServiceError {
2104    AwsServiceError::aws_error_with_fields(
2105        StatusCode::NOT_FOUND,
2106        "NoSuchKey",
2107        "The specified key does not exist.",
2108        vec![("Key".to_string(), key.to_string())],
2109    )
2110}
2111
2112pub(crate) fn no_such_upload(upload_id: &str) -> AwsServiceError {
2113    AwsServiceError::aws_error_with_fields(
2114        StatusCode::NOT_FOUND,
2115        "NoSuchUpload",
2116        "The specified upload does not exist. The upload ID may be invalid, \
2117         or the upload may have been aborted or completed.",
2118        vec![("UploadId".to_string(), upload_id.to_string())],
2119    )
2120}
2121
2122pub(crate) fn no_such_key_with_detail(key: &str) -> AwsServiceError {
2123    AwsServiceError::aws_error_with_fields(
2124        StatusCode::NOT_FOUND,
2125        "NoSuchKey",
2126        "The specified key does not exist.",
2127        vec![("Key".to_string(), key.to_string())],
2128    )
2129}
2130
2131pub(crate) fn compute_md5(data: &[u8]) -> String {
2132    let digest = Md5::digest(data);
2133    format!("{:x}", digest)
2134}
2135
2136pub(crate) fn compute_checksum(algorithm: &str, data: &[u8]) -> String {
2137    let raw = compute_checksum_raw(algorithm, data);
2138    if raw.is_empty() {
2139        String::new()
2140    } else {
2141        BASE64.encode(raw)
2142    }
2143}
2144
2145/// Compute the raw (un-base64'd) checksum digest bytes for `data`.
2146///
2147/// Returns the network-order digest bytes so callers can either base64
2148/// them ([`compute_checksum`]) or concatenate them when computing an S3
2149/// multipart COMPOSITE checksum (which hashes the concatenation of each
2150/// part's raw digest, then base64s the result and appends `-N`).
2151pub(crate) fn compute_checksum_raw(algorithm: &str, data: &[u8]) -> Vec<u8> {
2152    match algorithm {
2153        "CRC32" => crc32fast::hash(data).to_be_bytes().to_vec(),
2154        // CRC32C and CRC64NVME use the same crates as the streaming path so
2155        // CopyObject / CompleteMultipartUpload produce identical results to
2156        // a streaming PutObject (1.7); the COMPOSITE multipart checksum
2157        // concatenates these raw digests before hashing (1.18).
2158        "CRC32C" => crc32c::crc32c(data).to_be_bytes().to_vec(),
2159        "CRC64NVME" => {
2160            let mut hasher = crc64fast_nvme::Digest::new();
2161            hasher.write(data);
2162            hasher.sum64().to_be_bytes().to_vec()
2163        }
2164        "SHA1" => {
2165            use sha1::Digest as _;
2166            sha1::Sha1::digest(data).to_vec()
2167        }
2168        "SHA256" => {
2169            use sha2::Digest as _;
2170            sha2::Sha256::digest(data).to_vec()
2171        }
2172        _ => Vec::new(),
2173    }
2174}
2175
2176/// Compute an S3 multipart COMPOSITE additional checksum from each
2177/// part's raw digest bytes (in part-number order). AWS defines this as
2178/// `base64(HASH(concat(raw_part_digests)))-N` where HASH is the chosen
2179/// algorithm and N is the part count. Returns `None` for an unsupported
2180/// algorithm or empty part list.
2181pub(crate) fn compute_composite_checksum(
2182    algorithm: &str,
2183    part_raw_digests: &[Vec<u8>],
2184) -> Option<String> {
2185    if part_raw_digests.is_empty() {
2186        return None;
2187    }
2188    let mut concat = Vec::new();
2189    for d in part_raw_digests {
2190        concat.extend_from_slice(d);
2191    }
2192    let digest = compute_checksum_raw(algorithm, &concat);
2193    if digest.is_empty() {
2194        return None;
2195    }
2196    Some(format!(
2197        "{}-{}",
2198        BASE64.encode(digest),
2199        part_raw_digests.len()
2200    ))
2201}
2202
2203/// Streaming variant of [`compute_checksum`] for spool files. Reads
2204/// the file in 1 MiB chunks and feeds each chunk into the hasher so a
2205/// 1 GiB upload computes its CRC32 / SHA-1 / SHA-256 in constant
2206/// memory rather than via `tokio::fs::read` (which would allocate the
2207/// whole file as one buffer).
2208pub(crate) async fn compute_checksum_streaming(
2209    algorithm: &str,
2210    path: &std::path::Path,
2211) -> Result<String, std::io::Error> {
2212    use tokio::io::AsyncReadExt;
2213    let mut file = tokio::fs::File::open(path).await?;
2214    let mut buf = vec![0u8; 1024 * 1024];
2215    match algorithm {
2216        "CRC32" => {
2217            let mut hasher = crc32fast::Hasher::new();
2218            loop {
2219                let n = file.read(&mut buf).await?;
2220                if n == 0 {
2221                    break;
2222                }
2223                hasher.update(&buf[..n]);
2224            }
2225            Ok(BASE64.encode(hasher.finalize().to_be_bytes()))
2226        }
2227        "CRC32C" => {
2228            let mut crc: u32 = 0;
2229            loop {
2230                let n = file.read(&mut buf).await?;
2231                if n == 0 {
2232                    break;
2233                }
2234                crc = crc32c::crc32c_append(crc, &buf[..n]);
2235            }
2236            Ok(BASE64.encode(crc.to_be_bytes()))
2237        }
2238        "CRC64NVME" => {
2239            let mut hasher = crc64fast_nvme::Digest::new();
2240            loop {
2241                let n = file.read(&mut buf).await?;
2242                if n == 0 {
2243                    break;
2244                }
2245                hasher.write(&buf[..n]);
2246            }
2247            Ok(BASE64.encode(hasher.sum64().to_be_bytes()))
2248        }
2249        "SHA1" => {
2250            use sha1::Digest as _;
2251            let mut hasher = sha1::Sha1::new();
2252            loop {
2253                let n = file.read(&mut buf).await?;
2254                if n == 0 {
2255                    break;
2256                }
2257                hasher.update(&buf[..n]);
2258            }
2259            Ok(BASE64.encode(hasher.finalize()))
2260        }
2261        "SHA256" => {
2262            use sha2::Digest as _;
2263            let mut hasher = sha2::Sha256::new();
2264            loop {
2265                let n = file.read(&mut buf).await?;
2266                if n == 0 {
2267                    break;
2268                }
2269                hasher.update(&buf[..n]);
2270            }
2271            Ok(BASE64.encode(hasher.finalize()))
2272        }
2273        _ => Ok(String::new()),
2274    }
2275}
2276
/// Percent-encode an S3 object key for use in a URL path.
///
/// ASCII letters, digits and the characters `-`, `_`, `.`, `~` and `/` are
/// copied through unchanged. Every other byte of the UTF-8 encoding is
/// written as `%XX` with uppercase hexadecimal digits.
pub(crate) fn url_encode_s3_key(s: &str) -> String {
    // Lookup table for hex digits; avoids allocating a temporary `String`
    // through `format!` for every escaped byte.
    const HEX_UPPER: &[u8; 16] = b"0123456789ABCDEF";

    let mut out = String::with_capacity(s.len() * 2);
    for byte in s.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' | b'/' => {
                out.push(char::from(byte));
            }
            _ => {
                out.push('%');
                out.push(char::from(HEX_UPPER[usize::from(byte >> 4)]));
                out.push(char::from(HEX_UPPER[usize::from(byte & 0x0F)]));
            }
        }
    }
    out
}
2291
2292pub(crate) use fakecloud_aws::xml::xml_escape;
2293
2294pub(crate) fn extract_user_metadata(
2295    headers: &HeaderMap,
2296) -> std::collections::BTreeMap<String, String> {
2297    let mut meta = std::collections::BTreeMap::new();
2298    for (name, value) in headers {
2299        if let Some(key) = name.as_str().strip_prefix("x-amz-meta-") {
2300            if let Ok(v) = value.to_str() {
2301                meta.insert(key.to_string(), v.to_string());
2302            }
2303        }
2304    }
2305    meta
2306}
2307
/// Report whether `class` is one of the storage-class names this service
/// accepts. The comparison is exact and case-sensitive.
pub(crate) fn is_valid_storage_class(class: &str) -> bool {
    const STORAGE_CLASSES: &[&str] = &[
        "STANDARD",
        "REDUCED_REDUNDANCY",
        "STANDARD_IA",
        "ONEZONE_IA",
        "INTELLIGENT_TIERING",
        "GLACIER",
        "DEEP_ARCHIVE",
        "GLACIER_IR",
        "OUTPOSTS",
        "SNOW",
        "EXPRESS_ONEZONE",
    ];
    STORAGE_CLASSES.contains(&class)
}
2324
/// Check a bucket name against the naming rules enforced here.
///
/// A valid name is 3 to 63 bytes long, begins and ends with an ASCII letter
/// or digit, and consists only of lowercase ASCII letters, digits, `-`, `.`
/// and (for compatibility) `_`.
pub(crate) fn is_valid_bucket_name(name: &str) -> bool {
    let bytes = name.as_bytes();
    if !(3..=63).contains(&bytes.len()) {
        return false;
    }

    match bytes {
        [first, .., last] => {
            first.is_ascii_alphanumeric()
                && last.is_ascii_alphanumeric()
                && bytes
                    .iter()
                    .all(|&b| matches!(b, b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_'))
        }
        _ => false,
    }
}
2338
/// Report whether `region` appears in the fixed allow-list of region names
/// below. The check is an exact string comparison, not a pattern match.
pub(crate) fn is_valid_region(region: &str) -> bool {
    matches!(
        region,
        "us-east-1"
            | "us-east-2"
            | "us-west-1"
            | "us-west-2"
            | "af-south-1"
            | "ap-east-1"
            | "ap-south-1"
            | "ap-south-2"
            | "ap-southeast-1"
            | "ap-southeast-2"
            | "ap-southeast-3"
            | "ap-southeast-4"
            | "ap-northeast-1"
            | "ap-northeast-2"
            | "ap-northeast-3"
            | "ca-central-1"
            | "ca-west-1"
            | "eu-central-1"
            | "eu-central-2"
            | "eu-west-1"
            | "eu-west-2"
            | "eu-west-3"
            | "eu-south-1"
            | "eu-south-2"
            | "eu-north-1"
            | "il-central-1"
            | "me-south-1"
            | "me-central-1"
            | "sa-east-1"
            | "cn-north-1"
            | "cn-northwest-1"
            | "us-gov-east-1"
            | "us-gov-east-2"
            | "us-gov-west-1"
            | "us-iso-east-1"
            | "us-iso-west-1"
            | "us-isob-east-1"
            | "us-isof-south-1"
    )
}
2383
2384pub(crate) fn resolve_object<'a>(
2385    b: &'a S3Bucket,
2386    key: &str,
2387    version_id: Option<&String>,
2388) -> Result<&'a S3Object, AwsServiceError> {
2389    if let Some(vid) = version_id {
2390        // "null" version ID refers to an object with no version_id (pre-versioning)
2391        if vid == "null" {
2392            // Check versions for a pre-versioning object (version_id == None or Some("null"))
2393            if let Some(versions) = b.object_versions.get(key) {
2394                if let Some(obj) = versions
2395                    .iter()
2396                    .find(|o| o.version_id.is_none() || o.version_id.as_deref() == Some("null"))
2397                {
2398                    return Ok(obj);
2399                }
2400            }
2401            // Also check current object if it has no version_id
2402            if let Some(obj) = b.objects.get(key) {
2403                if obj.version_id.is_none() || obj.version_id.as_deref() == Some("null") {
2404                    return Ok(obj);
2405                }
2406            }
2407        } else {
2408            // When a specific versionId is requested, check versions first
2409            if let Some(versions) = b.object_versions.get(key) {
2410                if let Some(obj) = versions
2411                    .iter()
2412                    .find(|o| o.version_id.as_deref() == Some(vid.as_str()))
2413                {
2414                    return Ok(obj);
2415                }
2416            }
2417            // Also check current object
2418            if let Some(obj) = b.objects.get(key) {
2419                if obj.version_id.as_deref() == Some(vid.as_str()) {
2420                    return Ok(obj);
2421                }
2422            }
2423        }
2424        // For versioned buckets, return NoSuchVersion; for non-versioned, return 400
2425        if b.versioning.is_some() {
2426            Err(AwsServiceError::aws_error_with_fields(
2427                StatusCode::NOT_FOUND,
2428                "NoSuchVersion",
2429                "The specified version does not exist.",
2430                vec![
2431                    ("Key".to_string(), key.to_string()),
2432                    ("VersionId".to_string(), vid.to_string()),
2433                ],
2434            ))
2435        } else {
2436            Err(AwsServiceError::aws_error(
2437                StatusCode::BAD_REQUEST,
2438                "InvalidArgument",
2439                "Invalid version id specified",
2440            ))
2441        }
2442    } else {
2443        b.objects.get(key).ok_or_else(|| no_such_key(key))
2444    }
2445}
2446
2447pub(crate) fn make_delete_marker(key: &str, dm_id: &str) -> S3Object {
2448    S3Object {
2449        key: key.to_string(),
2450        last_modified: Utc::now(),
2451        storage_class: "STANDARD".to_string(),
2452        version_id: Some(dm_id.to_string()),
2453        is_delete_marker: true,
2454        ..Default::default()
2455    }
2456}
2457
/// Represents an object to delete in a batch delete request.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so entries can be logged
/// and compared directly in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct DeleteObjectEntry {
    /// Key of the object to delete.
    key: String,
    /// Specific version to delete, when the request names one.
    version_id: Option<String>,
}
2463
2464pub(crate) fn parse_delete_objects_xml(xml: &str) -> Vec<DeleteObjectEntry> {
2465    let mut entries = Vec::new();
2466    let mut remaining = xml;
2467    while let Some(obj_start) = remaining.find("<Object>") {
2468        let after = &remaining[obj_start + 8..];
2469        if let Some(obj_end) = after.find("</Object>") {
2470            let obj_body = &after[..obj_end];
2471            let key = extract_xml_value(obj_body, "Key");
2472            let version_id = extract_xml_value(obj_body, "VersionId");
2473            if let Some(k) = key {
2474                entries.push(DeleteObjectEntry { key: k, version_id });
2475            }
2476            remaining = &after[obj_end + 9..];
2477        } else {
2478            break;
2479        }
2480    }
2481    entries
2482}
2483
2484/// Returns true when the request body's top-level <Quiet> element is "true".
2485/// AWS docs: when Quiet=true, only failed deletes are emitted.
2486pub(crate) fn parse_delete_objects_quiet(xml: &str) -> bool {
2487    extract_xml_value(xml, "Quiet")
2488        .map(|v| v.eq_ignore_ascii_case("true"))
2489        .unwrap_or(false)
2490}
2491
2492/// Minimal XML parser for `<Tagging><TagSet><Tag><Key>k</Key><Value>v</Value></Tag>...`.
2493/// Returns a Vec to preserve insertion order and detect duplicates.
2494pub(crate) fn parse_tagging_xml(xml: &str) -> Vec<(String, String)> {
2495    let mut tags = Vec::new();
2496    let mut remaining = xml;
2497    while let Some(tag_start) = remaining.find("<Tag>") {
2498        let after = &remaining[tag_start + 5..];
2499        if let Some(tag_end) = after.find("</Tag>") {
2500            let tag_body = &after[..tag_end];
2501            let key = extract_xml_value(tag_body, "Key");
2502            let value = extract_xml_value(tag_body, "Value");
2503            if let (Some(k), Some(v)) = (key, value) {
2504                tags.push((k, v));
2505            }
2506            remaining = &after[tag_end + 6..];
2507        } else {
2508            break;
2509        }
2510    }
2511    tags
2512}
2513
2514pub(crate) fn validate_tags(tags: &[(String, String)]) -> Result<(), AwsServiceError> {
2515    // Check for duplicate keys
2516    let mut seen = std::collections::HashSet::new();
2517    for (k, _) in tags {
2518        if !seen.insert(k.as_str()) {
2519            return Err(AwsServiceError::aws_error(
2520                StatusCode::BAD_REQUEST,
2521                "InvalidTag",
2522                "Cannot provide multiple Tags with the same key",
2523            ));
2524        }
2525        // Check for aws: prefix
2526        if k.starts_with("aws:") {
2527            return Err(AwsServiceError::aws_error(
2528                StatusCode::BAD_REQUEST,
2529                "InvalidTag",
2530                "System tags cannot be added/updated by requester",
2531            ));
2532        }
2533    }
2534    Ok(())
2535}
2536
/// Return the text between the first `<tag>` and the next `</tag>` in `xml`.
///
/// A self-closing element (`<tag />` or `<tag/>`) that appears before any
/// `<tag>` opening tag, or when there is no opening tag at all, yields an
/// empty string. `None` is returned when the element is absent or the
/// opening tag is never closed. Only the exact tag spellings above are
/// recognised; opening tags carrying attributes are not matched.
pub(crate) fn extract_xml_value(xml: &str, tag: &str) -> Option<String> {
    let open = format!("<{tag}>");
    let open_pos = xml.find(&open);

    // Earliest self-closing occurrence across both spellings. Taking the
    // minimum matters when both forms are present: the one that comes first
    // in the document decides, not whichever spelling is checked first.
    let spaced = xml.find(&format!("<{tag} />"));
    let compact = xml.find(&format!("<{tag}/>"));
    let self_closing_pos = match (spaced, compact) {
        (Some(a), Some(b)) => Some(a.min(b)),
        (a, b) => a.or(b),
    };
    if let Some(sp) = self_closing_pos {
        if open_pos.map_or(true, |op| sp < op) {
            return Some(String::new());
        }
    }

    let close = format!("</{tag}>");
    let start = open_pos? + open.len();
    // Search for the closing tag AFTER the opening one so a malformed body
    // (closing tag before opening) can't make end < start and panic the
    // slice (bug-audit 2026-05-28, 2.2).
    let end = xml[start..].find(&close)? + start;
    Some(xml[start..end].to_string())
}
2564
2565/// Parse the CompleteMultipartUpload XML body into (part_number, etag) pairs.
2566pub(crate) fn parse_complete_multipart_xml(xml: &str) -> Vec<(u32, String)> {
2567    let mut parts = Vec::new();
2568    let mut remaining = xml;
2569    while let Some(part_start) = remaining.find("<Part>") {
2570        let after = &remaining[part_start + 6..];
2571        if let Some(part_end) = after.find("</Part>") {
2572            let part_body = &after[..part_end];
2573            let part_num =
2574                extract_xml_value(part_body, "PartNumber").and_then(|s| s.parse::<u32>().ok());
2575            let etag = extract_xml_value(part_body, "ETag")
2576                .map(|s| s.replace("&quot;", "").replace('"', ""));
2577            if let (Some(num), Some(e)) = (part_num, etag) {
2578                parts.push((num, e));
2579            }
2580            remaining = &after[part_end + 7..];
2581        } else {
2582            break;
2583        }
2584    }
2585    parts
2586}
2587
2588pub(crate) fn parse_url_encoded_tags(s: &str) -> Vec<(String, String)> {
2589    let mut tags = Vec::new();
2590    for pair in s.split('&') {
2591        if pair.is_empty() {
2592            continue;
2593        }
2594        let (key, value) = match pair.find('=') {
2595            Some(pos) => (&pair[..pos], &pair[pos + 1..]),
2596            None => (pair, ""),
2597        };
2598        tags.push((
2599            percent_encoding::percent_decode_str(key)
2600                .decode_utf8_lossy()
2601                .to_string(),
2602            percent_encoding::percent_decode_str(value)
2603                .decode_utf8_lossy()
2604                .to_string(),
2605        ));
2606    }
2607    tags
2608}
2609
2610/// Validate lifecycle configuration XML. Returns MalformedXML on invalid configs.
2611pub(crate) fn validate_lifecycle_xml(xml: &str) -> Result<(), AwsServiceError> {
2612    let malformed = || {
2613        AwsServiceError::aws_error(
2614            StatusCode::BAD_REQUEST,
2615            "MalformedXML",
2616            "The XML you provided was not well-formed or did not validate against our published schema",
2617        )
2618    };
2619
2620    let mut remaining = xml;
2621    while let Some(rule_start) = remaining.find("<Rule>") {
2622        let after = &remaining[rule_start + 6..];
2623        if let Some(rule_end) = after.find("</Rule>") {
2624            let rule_body = &after[..rule_end];
2625
2626            // Must have Filter or Prefix
2627            let has_filter = rule_body.contains("<Filter>")
2628                || rule_body.contains("<Filter/>")
2629                || rule_body.contains("<Filter />");
2630
2631            // Check for <Prefix> at rule level (outside of <Filter>...</Filter>)
2632            let has_prefix_outside_filter = {
2633                if !rule_body.contains("<Prefix") {
2634                    false
2635                } else if !has_filter {
2636                    true // No filter means any Prefix is at rule level
2637                } else {
2638                    // Remove the Filter block and check if Prefix remains
2639                    let mut stripped = rule_body.to_string();
2640                    // Remove <Filter>...</Filter> or self-closing variants
2641                    if let Some(fs) = stripped.find("<Filter") {
2642                        if let Some(fe) = stripped.find("</Filter>") {
2643                            stripped = format!("{}{}", &stripped[..fs], &stripped[fe + 9..]);
2644                        }
2645                    }
2646                    stripped.contains("<Prefix")
2647                }
2648            };
2649
2650            if !has_filter && !has_prefix_outside_filter {
2651                return Err(malformed());
2652            }
2653            // Can't have both Filter and rule-level Prefix
2654            if has_filter && has_prefix_outside_filter {
2655                return Err(malformed());
2656            }
2657
2658            // Expiration: if has ExpiredObjectDeleteMarker, cannot also have Days or Date
2659            // (only check within <Expiration> block)
2660            if let Some(exp_start) = rule_body.find("<Expiration>") {
2661                if let Some(exp_end) = rule_body[exp_start..].find("</Expiration>") {
2662                    let exp_body = &rule_body[exp_start..exp_start + exp_end];
2663                    if exp_body.contains("<ExpiredObjectDeleteMarker>")
2664                        && (exp_body.contains("<Days>") || exp_body.contains("<Date>"))
2665                    {
2666                        return Err(malformed());
2667                    }
2668                }
2669            }
2670
2671            // Filter validation
2672            if has_filter {
2673                if let Some(fs) = rule_body.find("<Filter>") {
2674                    if let Some(fe) = rule_body.find("</Filter>") {
2675                        // `find` locates the two tags independently, so a body
2676                        // with `</Filter>` before `<Filter>` would slice with
2677                        // begin > end and panic (dropping the connection -- a
2678                        // reachable DoS). Reject as malformed instead.
2679                        if fe < fs + 8 {
2680                            return Err(malformed());
2681                        }
2682                        let filter_body = &rule_body[fs + 8..fe];
2683                        let has_prefix_in_filter = filter_body.contains("<Prefix");
2684                        let has_tag_in_filter = filter_body.contains("<Tag>");
2685                        let has_and_in_filter = filter_body.contains("<And>");
2686                        // Can't have both Prefix and Tag without And
2687                        if has_prefix_in_filter && has_tag_in_filter && !has_and_in_filter {
2688                            return Err(malformed());
2689                        }
2690                        // Can't have Tag and And simultaneously at the Filter level
2691                        if has_tag_in_filter && has_and_in_filter {
2692                            // Check if the <Tag> is outside <And>
2693                            let and_start = filter_body.find("<And>").unwrap_or(0);
2694                            let tag_pos = filter_body.find("<Tag>").unwrap_or(0);
2695                            if tag_pos < and_start {
2696                                return Err(malformed());
2697                            }
2698                        }
2699                    }
2700                }
2701            }
2702
2703            // NoncurrentVersionTransition must have NoncurrentDays and StorageClass
2704            if rule_body.contains("<NoncurrentVersionTransition>") {
2705                let mut nvt_remaining = rule_body;
2706                while let Some(nvt_start) = nvt_remaining.find("<NoncurrentVersionTransition>") {
2707                    let nvt_after = &nvt_remaining[nvt_start + 29..];
2708                    if let Some(nvt_end) = nvt_after.find("</NoncurrentVersionTransition>") {
2709                        let nvt_body = &nvt_after[..nvt_end];
2710                        if !nvt_body.contains("<NoncurrentDays>") {
2711                            return Err(malformed());
2712                        }
2713                        if !nvt_body.contains("<StorageClass>") {
2714                            return Err(malformed());
2715                        }
2716                        nvt_remaining = &nvt_after[nvt_end + 30..];
2717                    } else {
2718                        break;
2719                    }
2720                }
2721            }
2722
2723            remaining = &after[rule_end + 7..];
2724        } else {
2725            break;
2726        }
2727    }
2728
2729    Ok(())
2730}
2731
/// Parsed CORS rule from bucket configuration XML.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so rules can be logged and
/// compared directly in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct CorsRule {
    /// Origin patterns this rule applies to.
    allowed_origins: Vec<String>,
    /// HTTP methods this rule permits.
    allowed_methods: Vec<String>,
    /// Request headers this rule permits.
    allowed_headers: Vec<String>,
    /// Response headers this rule exposes.
    expose_headers: Vec<String>,
    /// Max-age value in seconds, when the rule specifies one.
    max_age_seconds: Option<u32>,
}
2740
2741/// Parse CORS configuration XML into rules.
2742pub(crate) fn parse_cors_config(xml: &str) -> Vec<CorsRule> {
2743    let mut rules = Vec::new();
2744    let mut remaining = xml;
2745    while let Some(start) = remaining.find("<CORSRule>") {
2746        let after = &remaining[start + 10..];
2747        if let Some(end) = after.find("</CORSRule>") {
2748            let block = &after[..end];
2749            let allowed_origins = extract_all_xml_values(block, "AllowedOrigin");
2750            let allowed_methods = extract_all_xml_values(block, "AllowedMethod");
2751            let allowed_headers = extract_all_xml_values(block, "AllowedHeader");
2752            let expose_headers = extract_all_xml_values(block, "ExposeHeader");
2753            let max_age_seconds =
2754                extract_xml_value(block, "MaxAgeSeconds").and_then(|s| s.parse().ok());
2755            rules.push(CorsRule {
2756                allowed_origins,
2757                allowed_methods,
2758                allowed_headers,
2759                expose_headers,
2760                max_age_seconds,
2761            });
2762            remaining = &after[end + 11..];
2763        } else {
2764            break;
2765        }
2766    }
2767    rules
2768}
2769
/// Match an origin against a CORS allowed-origin pattern.
///
/// A pattern without `*` must equal the origin exactly. Otherwise the first
/// `*` is a wildcard standing for any (possibly empty) run of characters:
/// the origin must start with the text before it and end with the text after
/// it, without the two overlapping. This covers `*`, `*.example.com` and
/// patterns with the wildcard in the middle such as `https://*.example.com`.
pub(crate) fn origin_matches(origin: &str, pattern: &str) -> bool {
    match pattern.split_once('*') {
        None => origin == pattern,
        Some((prefix, suffix)) => {
            // The length check stops prefix and suffix from sharing bytes
            // (e.g. "aba" must not match "ab*ba").
            origin.len() >= prefix.len() + suffix.len()
                && origin.starts_with(prefix)
                && origin.ends_with(suffix)
        }
    }
}
2781
2782/// Find the matching CORS rule for a given origin and method.
2783pub(crate) fn find_cors_rule<'a>(
2784    rules: &'a [CorsRule],
2785    origin: &str,
2786    method: Option<&str>,
2787) -> Option<&'a CorsRule> {
2788    rules.iter().find(|rule| {
2789        let origin_ok = rule
2790            .allowed_origins
2791            .iter()
2792            .any(|o| origin_matches(origin, o));
2793        let method_ok = match method {
2794            Some(m) => rule.allowed_methods.iter().any(|am| am == m),
2795            None => true,
2796        };
2797        origin_ok && method_ok
2798    })
2799}
2800
2801/// Check if an object is locked (retention or legal hold) and should block mutation.
2802/// Returns an error string if locked, None if allowed.
2803pub(crate) fn check_object_lock_for_overwrite(
2804    obj: &S3Object,
2805    req: &AwsRequest,
2806) -> Option<&'static str> {
2807    // Legal hold blocks overwrite
2808    if obj.lock_legal_hold.as_deref() == Some("ON") {
2809        return Some("AccessDenied");
2810    }
2811    // Retention check
2812    if let (Some(mode), Some(until)) = (&obj.lock_mode, &obj.lock_retain_until) {
2813        if *until > Utc::now() {
2814            if mode == "COMPLIANCE" {
2815                return Some("AccessDenied");
2816            }
2817            if mode == "GOVERNANCE" {
2818                let bypass = req
2819                    .headers
2820                    .get("x-amz-bypass-governance-retention")
2821                    .and_then(|v| v.to_str().ok())
2822                    .map(|s| s.eq_ignore_ascii_case("true"))
2823                    .unwrap_or(false);
2824                if !bypass {
2825                    return Some("AccessDenied");
2826                }
2827            }
2828        }
2829    }
2830    None
2831}
2832
2833#[cfg(test)]
2834mod tests;
2835
#[cfg(test)]
mod s3_detect_action_tests {
    //! Regression tests for the request-to-action mapping (bug-hunt
    //! 2026-06-15 §5.3). They cover operations that used to be unmapped
    //! (so enforcement was skipped) or mapped to the wrong action (so a
    //! policy targeting them never applied).
    use super::s3_detect_action;
    use std::collections::HashMap;

    /// Build an owned query-parameter map from borrowed pairs.
    fn q(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        let mut params = HashMap::with_capacity(pairs.len());
        for &(name, value) in pairs {
            params.insert(name.to_owned(), value.to_owned());
        }
        params
    }

    #[test]
    fn select_object_content_maps_to_get_object() {
        // POST /bucket/key?select&select-type=2 reads object data.
        let query = q(&[("select", ""), ("select-type", "2")]);
        let detected = s3_detect_action("POST", Some("bucket"), Some("key"), &query);
        assert_eq!(detected, Some("GetObject"));
    }

    #[test]
    fn write_get_object_response_is_detected() {
        // POST /WriteGetObjectResponse (no key) — Object Lambda data plane.
        let query = q(&[]);
        let detected = s3_detect_action("POST", Some("WriteGetObjectResponse"), None, &query);
        assert_eq!(detected, Some("WriteGetObjectResponse"));
    }

    #[test]
    fn rename_object_is_detected_with_key() {
        // PUT /bucket/newkey?renameObject carries a key — it previously fell
        // through to PutObject, so s3:RenameObject denies were ineffective.
        let query = q(&[("renameObject", "")]);
        let detected = s3_detect_action("PUT", Some("bucket"), Some("newkey"), &query);
        assert_eq!(detected, Some("RenameObject"));
    }

    #[test]
    fn update_object_encryption_maps_to_put_object() {
        // PUT /bucket/key?encryption changes object SSE; AWS gates this on
        // s3:PutObject (no distinct public action). Deliberate, not a
        // silent PutObject fall-through.
        let query = q(&[("encryption", "")]);
        let detected = s3_detect_action("PUT", Some("bucket"), Some("key"), &query);
        assert_eq!(detected, Some("PutObject"));
    }

    #[test]
    fn create_session_not_misdetected_as_list_objects() {
        // GET /bucket?session — previously fell through to ListObjects.
        let query = q(&[("session", "")]);
        let detected = s3_detect_action("GET", Some("bucket"), None, &query);
        assert_eq!(detected, Some("CreateSession"));
    }

    #[test]
    fn get_bucket_abac_not_misdetected_as_list_objects() {
        // GET /bucket?abac — previously fell through to ListObjects.
        let query = q(&[("abac", "")]);
        let detected = s3_detect_action("GET", Some("bucket"), None, &query);
        assert_eq!(detected, Some("GetBucketAbacConfiguration"));
    }

    #[test]
    fn put_bucket_abac_still_detected() {
        let query = q(&[("abac", "")]);
        let detected = s3_detect_action("PUT", Some("bucket"), None, &query);
        assert_eq!(detected, Some("PutBucketAbacConfiguration"));
    }
}
2917
2918#[cfg(test)]
2919mod s3_iam_service_prefix_tests {
2920    //! §5.3: CreateSession and WriteGetObjectResponse are authorized under
2921    //! sibling IAM service prefixes (s3express / s3-object-lambda), not s3.
2922    use super::*;
2923    use parking_lot::RwLock;
2924    use std::sync::Arc;
2925
2926    fn make_service() -> S3Service {
2927        let state: SharedS3State = Arc::new(RwLock::new(
2928            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
2929        ));
2930        S3Service::new(state, Arc::new(DeliveryBus::new()))
2931    }
2932
2933    fn req(method: Method, path: &str, query: &[(&str, &str)]) -> AwsRequest {
2934        let path_segments: Vec<String> = path
2935            .split('/')
2936            .filter(|s| !s.is_empty())
2937            .map(|s| s.to_string())
2938            .collect();
2939        AwsRequest {
2940            service: "s3".to_string(),
2941            action: String::new(),
2942            region: "us-east-1".to_string(),
2943            account_id: "123456789012".to_string(),
2944            request_id: "rid".to_string(),
2945            headers: http::HeaderMap::new(),
2946            query_params: query
2947                .iter()
2948                .map(|(k, v)| (k.to_string(), v.to_string()))
2949                .collect(),
2950            body: bytes::Bytes::new(),
2951            body_stream: parking_lot::Mutex::new(None),
2952            path_segments,
2953            raw_path: path.to_string(),
2954            raw_query: String::new(),
2955            method,
2956            is_query_protocol: false,
2957            access_key_id: None,
2958            principal: None,
2959        }
2960    }
2961
2962    #[test]
2963    fn create_session_uses_s3express_prefix() {
2964        let svc = make_service();
2965        let action = svc
2966            .iam_action_for(&req(Method::GET, "/mybucket", &[("session", "")]))
2967            .expect("must be mapped");
2968        assert_eq!(action.service, "s3express");
2969        assert_eq!(action.action, "CreateSession");
2970        assert_eq!(action.action_string(), "s3express:CreateSession");
2971    }
2972
// POST /WriteGetObjectResponse must resolve to an IAM action under the
// `s3-object-lambda` prefix, both in its parts and in the joined string.
#[test]
fn write_get_object_response_uses_object_lambda_prefix() {
    let svc = make_service();
    let request = req(Method::POST, "/WriteGetObjectResponse", &[]);
    let mapped = svc.iam_action_for(&request).expect("must be mapped");

    assert_eq!(mapped.service, "s3-object-lambda");
    assert_eq!(mapped.action, "WriteGetObjectResponse");
    assert_eq!(
        mapped.action_string(),
        "s3-object-lambda:WriteGetObjectResponse"
    );
}
2986
// GET object with `?torrent` must authorize under s3:GetObjectTorrent,
// not s3:GetObject (bug-audit 2026-06-20, 5.3).
#[test]
fn get_object_torrent_maps_to_its_own_action() {
    let svc = make_service();

    // With the `torrent` query parameter present.
    let torrent_request = req(Method::GET, "/mybucket/key.txt", &[("torrent", "")]);
    let torrent_action = svc
        .iam_action_for(&torrent_request)
        .expect("must be mapped");
    assert_eq!(torrent_action.action, "GetObjectTorrent");

    // Same path without any query parameters: still a plain GetObject.
    let plain_request = req(Method::GET, "/mybucket/key.txt", &[]);
    let plain_action = svc.iam_action_for(&plain_request).expect("must be mapped");
    assert_eq!(plain_action.action, "GetObject");
}
3002}
3003
#[cfg(test)]
mod extract_xml_value_tests {
    use super::extract_xml_value;

    // A well-formed element yields the text between its tags.
    #[test]
    fn returns_inner_value() {
        let found = extract_xml_value("<Root><Key>value</Key></Root>", "Key");
        assert_eq!(found.as_deref(), Some("value"));
    }

    // No occurrence of the tag at all -> nothing to extract.
    #[test]
    fn missing_tag_is_none() {
        let found = extract_xml_value("<Root></Root>", "Key");
        assert!(found.is_none());
    }

    // bug-audit 2026-05-28, 2.2: a closing tag before the opening one used to
    // slice with end < start and panic; must return None instead.
    #[test]
    fn close_before_open_does_not_panic() {
        let found = extract_xml_value("</Key>oops<Key>value", "Key");
        assert!(found.is_none());
    }

    // An opening tag that is never closed is treated as absent.
    #[test]
    fn open_without_close_is_none() {
        let found = extract_xml_value("<Key>value", "Key");
        assert!(found.is_none());
    }
}
3033
#[cfg(test)]
mod compute_checksum_tests {
    use super::{compute_checksum, compute_checksum_streaming};
    use base64::Engine as _;

    /// Decodes a standard-alphabet base64 string into raw bytes, panicking
    /// when the input is not valid base64.
    fn decode_b64(encoded: impl AsRef<[u8]>) -> Vec<u8> {
        base64::engine::general_purpose::STANDARD
            .decode(encoded)
            .unwrap()
    }

    // bug-audit 2026-06-15, 1.7: CopyObject / CompleteMultipartUpload routed
    // through the non-streaming compute_checksum, whose `_ =>` arm returned an
    // empty string for CRC32C / CRC64NVME -> GetObjectAttributes reported an
    // empty checksum and integrity checks silently broke.
    #[test]
    fn crc32c_is_non_empty_and_matches_known_vector() {
        // CRC32C of "123456789" is 0xE3069283; the checksum string is the
        // base64 encoding of those 4 big-endian bytes.
        let encoded = compute_checksum("CRC32C", b"123456789");
        assert!(!encoded.is_empty());

        let raw = decode_b64(&encoded);
        assert_eq!(raw, 0xE306_9283u32.to_be_bytes());
    }

    #[test]
    fn crc64nvme_is_non_empty() {
        let encoded = compute_checksum("CRC64NVME", b"hello world");
        assert!(!encoded.is_empty());

        // A u64 checksum decodes to exactly 8 bytes.
        let raw = decode_b64(&encoded);
        assert_eq!(raw.len(), 8);
    }

    // The in-memory and file-streaming implementations must agree for every
    // algorithm listed here, and neither may produce an empty checksum.
    #[tokio::test]
    async fn non_streaming_matches_streaming_for_all_algorithms() {
        const ALGORITHMS: [&str; 5] = ["CRC32", "CRC32C", "CRC64NVME", "SHA1", "SHA256"];

        // Write the payload to a temp file so the streaming variant reads
        // exactly the same bytes the direct variant hashes.
        let payload = b"the quick brown fox jumps over the lazy dog".repeat(100);
        let file = tempfile::NamedTempFile::new().unwrap();
        tokio::fs::write(file.path(), &payload).await.unwrap();

        for algo in ALGORITHMS {
            let direct = compute_checksum(algo, &payload);
            let streamed = compute_checksum_streaming(algo, file.path())
                .await
                .unwrap();
            assert!(!direct.is_empty(), "{algo} direct empty");
            assert_eq!(direct, streamed, "{algo} direct != streaming");
        }
    }
}