Skip to main content

fakecloud_s3/
service.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use bytes::Bytes;
5use chrono::{DateTime, Timelike, Utc};
6use http::{HeaderMap, Method, StatusCode};
7use md5::{Digest, Md5};
8use uuid::Uuid;
9
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
12use fakecloud_kms::state::SharedKmsState;
13
14use base64::engine::general_purpose::STANDARD as BASE64;
15use base64::Engine as _;
16
17use crate::state::{AclGrant, MultipartUpload, S3Bucket, S3Object, SharedS3State, UploadPart};
18
19pub struct S3Service {
20    state: SharedS3State,
21    delivery: Arc<DeliveryBus>,
22    kms_state: Option<SharedKmsState>,
23}
24
25impl S3Service {
26    pub fn new(state: SharedS3State, delivery: Arc<DeliveryBus>) -> Self {
27        Self {
28            state,
29            delivery,
30            kms_state: None,
31        }
32    }
33
34    pub fn with_kms(mut self, kms_state: SharedKmsState) -> Self {
35        self.kms_state = Some(kms_state);
36        self
37    }
38}
39
40#[async_trait]
41impl AwsService for S3Service {
42    fn service_name(&self) -> &str {
43        "s3"
44    }
45
46    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
47        // S3 REST routing: method + path segments + query params
48        let bucket = req.path_segments.first().map(|s| s.as_str());
49        // Extract key from the raw path to preserve leading slashes and empty segments.
50        // The raw path is like "/bucket/key/parts" — we strip the bucket prefix.
51        let key = if let Some(b) = bucket {
52            let prefix = format!("/{b}/");
53            if req.raw_path.starts_with(&prefix) && req.raw_path.len() > prefix.len() {
54                let raw_key = &req.raw_path[prefix.len()..];
55                Some(
56                    percent_encoding::percent_decode_str(raw_key)
57                        .decode_utf8_lossy()
58                        .into_owned(),
59                )
60            } else if req.path_segments.len() > 1 {
61                let raw = req.path_segments[1..].join("/");
62                Some(
63                    percent_encoding::percent_decode_str(&raw)
64                        .decode_utf8_lossy()
65                        .into_owned(),
66                )
67            } else {
68                None
69            }
70        } else {
71            None
72        };
73
74        // Multipart upload operations (checked before main match)
75        if let Some(b) = bucket {
76            // POST /{bucket}/{key}?uploads — CreateMultipartUpload
77            if req.method == Method::POST
78                && key.is_some()
79                && req.query_params.contains_key("uploads")
80            {
81                return self.create_multipart_upload(&req, b, key.as_deref().unwrap());
82            }
83
84            // POST /{bucket}/{key}?restore
85            if req.method == Method::POST
86                && key.is_some()
87                && req.query_params.contains_key("restore")
88            {
89                return self.restore_object(&req, b, key.as_deref().unwrap());
90            }
91
92            // POST /{bucket}/{key}?uploadId=X — CompleteMultipartUpload
93            if req.method == Method::POST && key.is_some() {
94                if let Some(upload_id) = req.query_params.get("uploadId").cloned() {
95                    return self.complete_multipart_upload(
96                        &req,
97                        b,
98                        key.as_deref().unwrap(),
99                        &upload_id,
100                    );
101                }
102            }
103
104            // PUT /{bucket}/{key}?partNumber=N&uploadId=X — UploadPart or UploadPartCopy
105            if req.method == Method::PUT && key.is_some() {
106                if let (Some(part_num_str), Some(upload_id)) = (
107                    req.query_params.get("partNumber").cloned(),
108                    req.query_params.get("uploadId").cloned(),
109                ) {
110                    if let Ok(part_number) = part_num_str.parse::<i64>() {
111                        if req.headers.contains_key("x-amz-copy-source") {
112                            return self.upload_part_copy(
113                                &req,
114                                b,
115                                key.as_deref().unwrap(),
116                                &upload_id,
117                                part_number,
118                            );
119                        }
120                        return self.upload_part(
121                            &req,
122                            b,
123                            key.as_deref().unwrap(),
124                            &upload_id,
125                            part_number,
126                        );
127                    }
128                }
129            }
130
131            // DELETE /{bucket}/{key}?uploadId=X — AbortMultipartUpload
132            if req.method == Method::DELETE && key.is_some() {
133                if let Some(upload_id) = req.query_params.get("uploadId").cloned() {
134                    return self.abort_multipart_upload(b, key.as_deref().unwrap(), &upload_id);
135                }
136            }
137
138            // GET /{bucket}?uploads — ListMultipartUploads
139            if req.method == Method::GET
140                && key.is_none()
141                && req.query_params.contains_key("uploads")
142            {
143                return self.list_multipart_uploads(b);
144            }
145
146            // GET /{bucket}/{key}?uploadId=X — ListParts
147            if req.method == Method::GET && key.is_some() {
148                if let Some(upload_id) = req.query_params.get("uploadId").cloned() {
149                    return self.list_parts(&req, b, key.as_deref().unwrap(), &upload_id);
150                }
151            }
152        }
153
154        // Handle OPTIONS preflight requests (CORS)
155        if req.method == Method::OPTIONS {
156            if let Some(b_name) = bucket {
157                let cors_config = {
158                    let state = self.state.read();
159                    state
160                        .buckets
161                        .get(b_name)
162                        .and_then(|b| b.cors_config.clone())
163                };
164                if let Some(ref config) = cors_config {
165                    let origin = req
166                        .headers
167                        .get("origin")
168                        .and_then(|v| v.to_str().ok())
169                        .unwrap_or("");
170                    let request_method = req
171                        .headers
172                        .get("access-control-request-method")
173                        .and_then(|v| v.to_str().ok())
174                        .unwrap_or("");
175                    let rules = parse_cors_config(config);
176                    if let Some(rule) = find_cors_rule(&rules, origin, Some(request_method)) {
177                        let mut headers = HeaderMap::new();
178                        let matched_origin = if rule.allowed_origins.contains(&"*".to_string()) {
179                            "*"
180                        } else {
181                            origin
182                        };
183                        headers.insert(
184                            "access-control-allow-origin",
185                            matched_origin.parse().unwrap(),
186                        );
187                        headers.insert(
188                            "access-control-allow-methods",
189                            rule.allowed_methods.join(", ").parse().unwrap(),
190                        );
191                        if !rule.allowed_headers.is_empty() {
192                            let ah = if rule.allowed_headers.contains(&"*".to_string()) {
193                                req.headers
194                                    .get("access-control-request-headers")
195                                    .and_then(|v| v.to_str().ok())
196                                    .unwrap_or("*")
197                                    .to_string()
198                            } else {
199                                rule.allowed_headers.join(", ")
200                            };
201                            headers.insert("access-control-allow-headers", ah.parse().unwrap());
202                        }
203                        if let Some(max_age) = rule.max_age_seconds {
204                            headers.insert(
205                                "access-control-max-age",
206                                max_age.to_string().parse().unwrap(),
207                            );
208                        }
209                        return Ok(AwsResponse {
210                            status: StatusCode::OK,
211                            content_type: String::new(),
212                            body: Bytes::new(),
213                            headers,
214                        });
215                    }
216                }
217                return Err(AwsServiceError::aws_error(
218                    StatusCode::FORBIDDEN,
219                    "CORSResponse",
220                    "CORS is not enabled for this bucket",
221                ));
222            }
223        }
224
225        // Capture origin for CORS response headers
226        let origin_header = req
227            .headers
228            .get("origin")
229            .and_then(|v| v.to_str().ok())
230            .map(|s| s.to_string());
231
232        let mut result = match (&req.method, bucket, key.as_deref()) {
233            // ListBuckets: GET /
234            (&Method::GET, None, None) => self.list_buckets(&req),
235
236            // Bucket-level operations (no key)
237            (&Method::PUT, Some(b), None) => {
238                if req.query_params.contains_key("tagging") {
239                    self.put_bucket_tagging(&req, b)
240                } else if req.query_params.contains_key("acl") {
241                    self.put_bucket_acl(&req, b)
242                } else if req.query_params.contains_key("versioning") {
243                    self.put_bucket_versioning(&req, b)
244                } else if req.query_params.contains_key("cors") {
245                    self.put_bucket_cors(&req, b)
246                } else if req.query_params.contains_key("notification") {
247                    self.put_bucket_notification(&req, b)
248                } else if req.query_params.contains_key("website") {
249                    self.put_bucket_website(&req, b)
250                } else if req.query_params.contains_key("accelerate") {
251                    self.put_bucket_accelerate(&req, b)
252                } else if req.query_params.contains_key("publicAccessBlock") {
253                    self.put_public_access_block(&req, b)
254                } else if req.query_params.contains_key("encryption") {
255                    self.put_bucket_encryption(&req, b)
256                } else if req.query_params.contains_key("lifecycle") {
257                    self.put_bucket_lifecycle(&req, b)
258                } else if req.query_params.contains_key("logging") {
259                    self.put_bucket_logging(&req, b)
260                } else if req.query_params.contains_key("policy") {
261                    self.put_bucket_policy(&req, b)
262                } else if req.query_params.contains_key("object-lock") {
263                    self.put_object_lock_config(&req, b)
264                } else if req.query_params.contains_key("replication") {
265                    self.put_bucket_replication(&req, b)
266                } else if req.query_params.contains_key("ownershipControls") {
267                    self.put_bucket_ownership_controls(&req, b)
268                } else if req.query_params.contains_key("inventory") {
269                    self.put_bucket_inventory(&req, b)
270                } else {
271                    self.create_bucket(&req, b)
272                }
273            }
274            (&Method::DELETE, Some(b), None) => {
275                if req.query_params.contains_key("tagging") {
276                    self.delete_bucket_tagging(&req, b)
277                } else if req.query_params.contains_key("cors") {
278                    self.delete_bucket_cors(b)
279                } else if req.query_params.contains_key("website") {
280                    self.delete_bucket_website(b)
281                } else if req.query_params.contains_key("publicAccessBlock") {
282                    self.delete_public_access_block(b)
283                } else if req.query_params.contains_key("encryption") {
284                    self.delete_bucket_encryption(b)
285                } else if req.query_params.contains_key("lifecycle") {
286                    self.delete_bucket_lifecycle(b)
287                } else if req.query_params.contains_key("policy") {
288                    self.delete_bucket_policy(b)
289                } else if req.query_params.contains_key("replication") {
290                    self.delete_bucket_replication(b)
291                } else if req.query_params.contains_key("ownershipControls") {
292                    self.delete_bucket_ownership_controls(b)
293                } else if req.query_params.contains_key("inventory") {
294                    self.delete_bucket_inventory(&req, b)
295                } else {
296                    self.delete_bucket(&req, b)
297                }
298            }
299            (&Method::HEAD, Some(b), None) => self.head_bucket(b),
300            (&Method::GET, Some(b), None) => {
301                if req.query_params.contains_key("tagging") {
302                    self.get_bucket_tagging(&req, b)
303                } else if req.query_params.contains_key("location") {
304                    self.get_bucket_location(b)
305                } else if req.query_params.contains_key("acl") {
306                    self.get_bucket_acl(&req, b)
307                } else if req.query_params.contains_key("versioning") {
308                    self.get_bucket_versioning(b)
309                } else if req.query_params.contains_key("versions") {
310                    self.list_object_versions(&req, b)
311                } else if req.query_params.contains_key("object-lock") {
312                    self.get_object_lock_configuration(b)
313                } else if req.query_params.contains_key("cors") {
314                    self.get_bucket_cors(b)
315                } else if req.query_params.contains_key("notification") {
316                    self.get_bucket_notification(b)
317                } else if req.query_params.contains_key("website") {
318                    self.get_bucket_website(b)
319                } else if req.query_params.contains_key("accelerate") {
320                    self.get_bucket_accelerate(b)
321                } else if req.query_params.contains_key("publicAccessBlock") {
322                    self.get_public_access_block(b)
323                } else if req.query_params.contains_key("encryption") {
324                    self.get_bucket_encryption(b)
325                } else if req.query_params.contains_key("lifecycle") {
326                    self.get_bucket_lifecycle(b)
327                } else if req.query_params.contains_key("logging") {
328                    self.get_bucket_logging(b)
329                } else if req.query_params.contains_key("policy") {
330                    self.get_bucket_policy(b)
331                } else if req.query_params.contains_key("replication") {
332                    self.get_bucket_replication(b)
333                } else if req.query_params.contains_key("ownershipControls") {
334                    self.get_bucket_ownership_controls(b)
335                } else if req.query_params.contains_key("inventory") {
336                    if req.query_params.contains_key("id") {
337                        self.get_bucket_inventory(&req, b)
338                    } else {
339                        self.list_bucket_inventory_configurations(b)
340                    }
341                } else if req.query_params.get("list-type").map(|s| s.as_str()) == Some("2") {
342                    self.list_objects_v2(&req, b)
343                } else {
344                    self.list_objects_v1(&req, b)
345                }
346            }
347
348            // Object-level operations
349            (&Method::PUT, Some(b), Some(k)) => {
350                if req.query_params.contains_key("tagging") {
351                    self.put_object_tagging(&req, b, k)
352                } else if req.query_params.contains_key("acl") {
353                    self.put_object_acl(&req, b, k)
354                } else if req.query_params.contains_key("retention") {
355                    self.put_object_retention(&req, b, k)
356                } else if req.query_params.contains_key("legal-hold") {
357                    self.put_object_legal_hold(&req, b, k)
358                } else if req.headers.contains_key("x-amz-copy-source") {
359                    self.copy_object(&req, b, k)
360                } else {
361                    self.put_object(&req, b, k)
362                }
363            }
364            (&Method::GET, Some(b), Some(k)) => {
365                if req.query_params.contains_key("tagging") {
366                    self.get_object_tagging(&req, b, k)
367                } else if req.query_params.contains_key("acl") {
368                    self.get_object_acl(&req, b, k)
369                } else if req.query_params.contains_key("retention") {
370                    self.get_object_retention(&req, b, k)
371                } else if req.query_params.contains_key("legal-hold") {
372                    self.get_object_legal_hold(&req, b, k)
373                } else if req.query_params.contains_key("attributes") {
374                    self.get_object_attributes(&req, b, k)
375                } else {
376                    self.get_object(&req, b, k)
377                }
378            }
379            (&Method::DELETE, Some(b), Some(k)) => {
380                if req.query_params.contains_key("tagging") {
381                    self.delete_object_tagging(b, k)
382                } else {
383                    self.delete_object(&req, b, k)
384                }
385            }
386            (&Method::HEAD, Some(b), Some(k)) => self.head_object(&req, b, k),
387
388            // POST /{bucket}?delete — batch delete
389            (&Method::POST, Some(b), None) if req.query_params.contains_key("delete") => {
390                self.delete_objects(&req, b)
391            }
392
393            _ => Err(AwsServiceError::aws_error(
394                StatusCode::METHOD_NOT_ALLOWED,
395                "MethodNotAllowed",
396                "The specified method is not allowed against this resource",
397            )),
398        };
399
400        // Apply CORS headers to the response if Origin was present
401        if let (Some(ref origin), Some(b_name)) = (&origin_header, bucket) {
402            let cors_config = {
403                let state = self.state.read();
404                state
405                    .buckets
406                    .get(b_name)
407                    .and_then(|b| b.cors_config.clone())
408            };
409            if let Some(ref config) = cors_config {
410                let rules = parse_cors_config(config);
411                if let Some(rule) = find_cors_rule(&rules, origin, None) {
412                    if let Ok(ref mut resp) = result {
413                        let matched_origin = if rule.allowed_origins.contains(&"*".to_string()) {
414                            "*"
415                        } else {
416                            origin
417                        };
418                        resp.headers.insert(
419                            "access-control-allow-origin",
420                            matched_origin.parse().unwrap(),
421                        );
422                        if !rule.expose_headers.is_empty() {
423                            resp.headers.insert(
424                                "access-control-expose-headers",
425                                rule.expose_headers.join(", ").parse().unwrap(),
426                            );
427                        }
428                    }
429                }
430            }
431        }
432
433        result
434    }
435
436    fn supported_actions(&self) -> &[&str] {
437        &[
438            // Buckets
439            "ListBuckets",
440            "CreateBucket",
441            "DeleteBucket",
442            "HeadBucket",
443            "GetBucketLocation",
444            // Objects
445            "PutObject",
446            "GetObject",
447            "DeleteObject",
448            "HeadObject",
449            "CopyObject",
450            "DeleteObjects",
451            "ListObjectsV2",
452            "ListObjects",
453            "ListObjectVersions",
454            "GetObjectAttributes",
455            "RestoreObject",
456            // Object properties
457            "PutObjectTagging",
458            "GetObjectTagging",
459            "DeleteObjectTagging",
460            "PutObjectAcl",
461            "GetObjectAcl",
462            "PutObjectRetention",
463            "GetObjectRetention",
464            "PutObjectLegalHold",
465            "GetObjectLegalHold",
466            // Bucket configuration
467            "PutBucketTagging",
468            "GetBucketTagging",
469            "DeleteBucketTagging",
470            "PutBucketAcl",
471            "GetBucketAcl",
472            "PutBucketVersioning",
473            "GetBucketVersioning",
474            "PutBucketCors",
475            "GetBucketCors",
476            "DeleteBucketCors",
477            "PutBucketNotificationConfiguration",
478            "GetBucketNotificationConfiguration",
479            "PutBucketWebsite",
480            "GetBucketWebsite",
481            "DeleteBucketWebsite",
482            "PutBucketAccelerateConfiguration",
483            "GetBucketAccelerateConfiguration",
484            "PutPublicAccessBlock",
485            "GetPublicAccessBlock",
486            "DeletePublicAccessBlock",
487            "PutBucketEncryption",
488            "GetBucketEncryption",
489            "DeleteBucketEncryption",
490            "PutBucketLifecycleConfiguration",
491            "GetBucketLifecycleConfiguration",
492            "DeleteBucketLifecycle",
493            "PutBucketLogging",
494            "GetBucketLogging",
495            "PutBucketPolicy",
496            "GetBucketPolicy",
497            "DeleteBucketPolicy",
498            "PutObjectLockConfiguration",
499            "GetObjectLockConfiguration",
500            "PutBucketReplication",
501            "GetBucketReplication",
502            "DeleteBucketReplication",
503            "PutBucketOwnershipControls",
504            "GetBucketOwnershipControls",
505            "DeleteBucketOwnershipControls",
506            "PutBucketInventoryConfiguration",
507            "GetBucketInventoryConfiguration",
508            "DeleteBucketInventoryConfiguration",
509            // Multipart uploads
510            "CreateMultipartUpload",
511            "UploadPart",
512            "UploadPartCopy",
513            "CompleteMultipartUpload",
514            "AbortMultipartUpload",
515            "ListParts",
516            "ListMultipartUploads",
517        ]
518    }
519}
520
521// ---------------------------------------------------------------------------
522// Bucket operations
523// ---------------------------------------------------------------------------
524impl S3Service {
525    fn list_buckets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
526        let state = self.state.read();
527        let mut buckets_xml = String::new();
528        let mut sorted: Vec<_> = state.buckets.values().collect();
529        sorted.sort_by_key(|b| &b.name);
530        for b in sorted {
531            buckets_xml.push_str(&format!(
532                "<Bucket><Name>{}</Name><CreationDate>{}</CreationDate></Bucket>",
533                xml_escape(&b.name),
534                b.creation_date.format("%Y-%m-%dT%H:%M:%S%.3fZ"),
535            ));
536        }
537        let body = format!(
538            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
539             <ListAllMyBucketsResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
540             <Owner><ID>{account}</ID><DisplayName>{account}</DisplayName></Owner>\
541             <Buckets>{buckets_xml}</Buckets>\
542             </ListAllMyBucketsResult>",
543            account = req.account_id,
544        );
545        Ok(s3_xml(StatusCode::OK, body))
546    }
547
548    fn create_bucket(
549        &self,
550        req: &AwsRequest,
551        bucket: &str,
552    ) -> Result<AwsResponse, AwsServiceError> {
553        if !is_valid_bucket_name(bucket) {
554            return Err(AwsServiceError::aws_error(
555                StatusCode::BAD_REQUEST,
556                "InvalidBucketName",
557                format!("The specified bucket is not valid: {bucket}"),
558            ));
559        }
560
561        // Parse LocationConstraint from body if present
562        let body_str = std::str::from_utf8(&req.body).unwrap_or("");
563        let has_config_body =
564            !body_str.is_empty() && body_str.contains("CreateBucketConfiguration");
565        let explicit_constraint = if has_config_body {
566            extract_xml_value(body_str, "LocationConstraint")
567        } else {
568            None
569        };
570
571        if let Some(ref constraint) = explicit_constraint {
572            if !constraint.is_empty() {
573                if constraint == "us-east-1" && req.region != "us-east-1" {
574                    return Err(AwsServiceError::aws_error(
575                        StatusCode::BAD_REQUEST,
576                        "IllegalLocationConstraintException",
577                        format!(
578                            "The {} location constraint is incompatible for the region specific endpoint this request was sent to.",
579                            constraint
580                        ),
581                    ));
582                }
583                if constraint == "us-east-1" && req.region == "us-east-1" {
584                    return Err(AwsServiceError::aws_error(
585                        StatusCode::BAD_REQUEST,
586                        "InvalidLocationConstraint",
587                        "The specified location-constraint is not valid",
588                    ));
589                }
590                if !is_valid_region(constraint) {
591                    return Err(AwsServiceError::aws_error(
592                        StatusCode::BAD_REQUEST,
593                        "InvalidLocationConstraint",
594                        format!("The specified location-constraint is not valid: {constraint}"),
595                    ));
596                }
597                if constraint != &req.region && req.region != "us-east-1" {
598                    return Err(AwsServiceError::aws_error(
599                        StatusCode::BAD_REQUEST,
600                        "IllegalLocationConstraintException",
601                        format!(
602                            "The {} location constraint is incompatible for the region specific endpoint this request was sent to.",
603                            constraint
604                        ),
605                    ));
606                }
607            }
608        }
609
610        let constraint_unspecified = match &explicit_constraint {
611            None => true,
612            Some(c) => c.is_empty(),
613        };
614        if constraint_unspecified && req.region != "us-east-1" {
615            return Err(AwsServiceError::aws_error(
616                StatusCode::BAD_REQUEST,
617                "IllegalLocationConstraintException",
618                "The unspecified location constraint is incompatible for the region specific endpoint this request was sent to.",
619            ));
620        }
621
622        let requested_region = match &explicit_constraint {
623            Some(c) if !c.is_empty() => c.clone(),
624            _ => req.region.clone(),
625        };
626
627        // Parse ACL from header
628        let acl = req
629            .headers
630            .get("x-amz-acl")
631            .and_then(|v| v.to_str().ok())
632            .unwrap_or("private");
633
634        let mut state = self.state.write();
635        if let Some(existing) = state.buckets.get(bucket) {
636            // In us-east-1, re-creating same bucket in same region is idempotent (returns 200)
637            if existing.region == requested_region && requested_region == "us-east-1" {
638                let mut headers = HeaderMap::new();
639                headers.insert("location", format!("/{bucket}").parse().unwrap());
640                return Ok(AwsResponse {
641                    status: StatusCode::OK,
642                    content_type: "application/xml".to_string(),
643                    body: Bytes::new(),
644                    headers,
645                });
646            }
647            return Err(AwsServiceError::aws_error_with_fields(
648                StatusCode::CONFLICT,
649                "BucketAlreadyOwnedByYou",
650                "Your previous request to create the named bucket succeeded and you already own it.",
651                vec![("BucketName".to_string(), bucket.to_string())],
652            ));
653        }
654        let object_lock_enabled = req
655            .headers
656            .get("x-amz-bucket-object-lock-enabled")
657            .and_then(|v| v.to_str().ok())
658            .map(|s| s.eq_ignore_ascii_case("true"))
659            .unwrap_or(false);
660
661        let mut b = S3Bucket::new(bucket, &requested_region, &req.account_id);
662        b.acl_grants = canned_acl_grants(acl, &req.account_id);
663        if object_lock_enabled {
664            b.versioning = Some("Enabled".to_string());
665            b.object_lock_config = Some(
666                "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
667                 <ObjectLockConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
668                 <ObjectLockEnabled>Enabled</ObjectLockEnabled>\
669                 </ObjectLockConfiguration>"
670                    .to_string(),
671            );
672        }
673
674        // Handle x-amz-object-ownership header
675        if let Some(ownership) = req
676            .headers
677            .get("x-amz-object-ownership")
678            .and_then(|v| v.to_str().ok())
679        {
680            b.ownership_controls = Some(format!(
681                "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
682                 <OwnershipControls xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
683                 <Rule><ObjectOwnership>{ownership}</ObjectOwnership></Rule>\
684                 </OwnershipControls>"
685            ));
686        }
687
688        state.buckets.insert(bucket.to_string(), b);
689
690        let mut headers = HeaderMap::new();
691        headers.insert("location", format!("/{bucket}").parse().unwrap());
692        headers.insert(
693            "x-amz-bucket-arn",
694            format!("arn:aws:s3:::{bucket}").parse().unwrap(),
695        );
696        Ok(AwsResponse {
697            status: StatusCode::OK,
698            content_type: "application/xml".to_string(),
699            body: Bytes::new(),
700            headers,
701        })
702    }
703
704    fn delete_bucket(
705        &self,
706        _req: &AwsRequest,
707        bucket: &str,
708    ) -> Result<AwsResponse, AwsServiceError> {
709        let mut state = self.state.write();
710        let b = state
711            .buckets
712            .get(bucket)
713            .ok_or_else(|| no_such_bucket(bucket))?;
714        // Bucket must be empty to delete (no objects and no versions)
715        let has_real_objects = b.objects.values().any(|o| !o.is_delete_marker);
716        let has_versions = b.object_versions.values().any(|v| !v.is_empty());
717        if has_real_objects || has_versions {
718            return Err(AwsServiceError::aws_error_with_fields(
719                StatusCode::CONFLICT,
720                "BucketNotEmpty",
721                "The bucket you tried to delete is not empty",
722                vec![("BucketName".to_string(), bucket.to_string())],
723            ));
724        }
725        state.buckets.remove(bucket);
726        Ok(AwsResponse {
727            status: StatusCode::NO_CONTENT,
728            content_type: "application/xml".to_string(),
729            body: Bytes::new(),
730            headers: HeaderMap::new(),
731        })
732    }
733
734    fn head_bucket(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
735        let state = self.state.read();
736        if !state.buckets.contains_key(bucket) {
737            return Err(AwsServiceError::aws_error(
738                StatusCode::NOT_FOUND,
739                "NoSuchBucket",
740                format!("The specified bucket does not exist: {bucket}"),
741            ));
742        }
743        Ok(AwsResponse {
744            status: StatusCode::OK,
745            content_type: "application/xml".to_string(),
746            body: Bytes::new(),
747            headers: HeaderMap::new(),
748        })
749    }
750
751    fn get_bucket_location(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
752        let state = self.state.read();
753        let b = state
754            .buckets
755            .get(bucket)
756            .ok_or_else(|| no_such_bucket(bucket))?;
757        let loc = if b.region == "us-east-1" {
758            String::new()
759        } else {
760            b.region.clone()
761        };
762        let body = format!(
763            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
764             <LocationConstraint xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">{loc}</LocationConstraint>"
765        );
766        Ok(s3_xml(StatusCode::OK, body))
767    }
768
769    // ---- Encryption ----
770
771    fn put_bucket_encryption(
772        &self,
773        req: &AwsRequest,
774        bucket: &str,
775    ) -> Result<AwsResponse, AwsServiceError> {
776        let body_str = std::str::from_utf8(&req.body).unwrap_or("").to_string();
777        let mut state = self.state.write();
778        let b = state
779            .buckets
780            .get_mut(bucket)
781            .ok_or_else(|| no_such_bucket(bucket))?;
782        // Normalize: add BucketKeyEnabled=false to each Rule if missing
783        let normalized = if body_str.contains("<Rule>") && !body_str.contains("<BucketKeyEnabled>")
784        {
785            body_str.replace(
786                "</Rule>",
787                "<BucketKeyEnabled>false</BucketKeyEnabled></Rule>",
788            )
789        } else {
790            body_str
791        };
792        b.encryption_config = Some(normalized);
793        Ok(empty_response(StatusCode::OK))
794    }
795
796    fn get_bucket_encryption(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
797        let state = self.state.read();
798        let b = state
799            .buckets
800            .get(bucket)
801            .ok_or_else(|| no_such_bucket(bucket))?;
802        match &b.encryption_config {
803            Some(config) => Ok(s3_xml(StatusCode::OK, config.clone())),
804            None => Err(AwsServiceError::aws_error_with_fields(
805                StatusCode::NOT_FOUND,
806                "ServerSideEncryptionConfigurationNotFoundError",
807                "The server side encryption configuration was not found",
808                vec![("BucketName".to_string(), bucket.to_string())],
809            )),
810        }
811    }
812
813    fn delete_bucket_encryption(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
814        let mut state = self.state.write();
815        let b = state
816            .buckets
817            .get_mut(bucket)
818            .ok_or_else(|| no_such_bucket(bucket))?;
819        b.encryption_config = None;
820        Ok(empty_response(StatusCode::NO_CONTENT))
821    }
822
823    // ---- Lifecycle ----
824
825    fn put_bucket_lifecycle(
826        &self,
827        req: &AwsRequest,
828        bucket: &str,
829    ) -> Result<AwsResponse, AwsServiceError> {
830        let body_str = std::str::from_utf8(&req.body).unwrap_or("").to_string();
831
832        // Validate lifecycle configuration
833        validate_lifecycle_xml(&body_str)?;
834
835        // If there are no <Rule> elements at all, treat as deleting the configuration
836        let has_rules = body_str.contains("<Rule>");
837
838        let mut state = self.state.write();
839        let b = state
840            .buckets
841            .get_mut(bucket)
842            .ok_or_else(|| no_such_bucket(bucket))?;
843        if has_rules {
844            b.lifecycle_config = Some(body_str);
845        } else {
846            b.lifecycle_config = None;
847        }
848        Ok(empty_response(StatusCode::OK))
849    }
850
851    fn get_bucket_lifecycle(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
852        let state = self.state.read();
853        let b = state
854            .buckets
855            .get(bucket)
856            .ok_or_else(|| no_such_bucket(bucket))?;
857        match &b.lifecycle_config {
858            Some(config) => Ok(s3_xml(StatusCode::OK, config.clone())),
859            None => Err(AwsServiceError::aws_error_with_fields(
860                StatusCode::NOT_FOUND,
861                "NoSuchLifecycleConfiguration",
862                "The lifecycle configuration does not exist",
863                vec![("BucketName".to_string(), bucket.to_string())],
864            )),
865        }
866    }
867
868    fn delete_bucket_lifecycle(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
869        let mut state = self.state.write();
870        let b = state
871            .buckets
872            .get_mut(bucket)
873            .ok_or_else(|| no_such_bucket(bucket))?;
874        b.lifecycle_config = None;
875        Ok(empty_response(StatusCode::NO_CONTENT))
876    }
877
878    // ---- Policy ----
879
880    fn put_bucket_policy(
881        &self,
882        req: &AwsRequest,
883        bucket: &str,
884    ) -> Result<AwsResponse, AwsServiceError> {
885        let body_str = std::str::from_utf8(&req.body).unwrap_or("").to_string();
886        if serde_json::from_str::<serde_json::Value>(&body_str).is_err() {
887            return Err(AwsServiceError::aws_error(
888                StatusCode::BAD_REQUEST,
889                "MalformedPolicy",
890                "This policy contains invalid Json",
891            ));
892        }
893        let mut state = self.state.write();
894        let b = state
895            .buckets
896            .get_mut(bucket)
897            .ok_or_else(|| no_such_bucket(bucket))?;
898        b.policy = Some(body_str);
899        Ok(empty_response(StatusCode::NO_CONTENT))
900    }
901
902    fn get_bucket_policy(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
903        let state = self.state.read();
904        let b = state
905            .buckets
906            .get(bucket)
907            .ok_or_else(|| no_such_bucket(bucket))?;
908        match &b.policy {
909            Some(policy) => Ok(AwsResponse {
910                status: StatusCode::OK,
911                content_type: "application/json".to_string(),
912                body: Bytes::from(policy.clone()),
913                headers: HeaderMap::new(),
914            }),
915            None => Err(AwsServiceError::aws_error_with_fields(
916                StatusCode::NOT_FOUND,
917                "NoSuchBucketPolicy",
918                "The bucket policy does not exist",
919                vec![("BucketName".to_string(), bucket.to_string())],
920            )),
921        }
922    }
923
924    fn delete_bucket_policy(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
925        let mut state = self.state.write();
926        let b = state
927            .buckets
928            .get_mut(bucket)
929            .ok_or_else(|| no_such_bucket(bucket))?;
930        b.policy = None;
931        Ok(empty_response(StatusCode::NO_CONTENT))
932    }
933
934    // ---- CORS ----
935
936    fn put_bucket_cors(
937        &self,
938        req: &AwsRequest,
939        bucket: &str,
940    ) -> Result<AwsResponse, AwsServiceError> {
941        let body_str = std::str::from_utf8(&req.body).unwrap_or("").to_string();
942
943        // Validate CORS configuration
944        let rule_count = body_str.matches("<CORSRule>").count();
945        if rule_count == 0 || rule_count > 100 {
946            return Err(AwsServiceError::aws_error(
947                StatusCode::BAD_REQUEST,
948                "MalformedXML",
949                "The XML you provided was not well-formed or did not validate against our published schema",
950            ));
951        }
952
953        // Validate HTTP methods
954        let valid_methods = ["GET", "PUT", "POST", "DELETE", "HEAD"];
955        let mut remaining = body_str.as_str();
956        while let Some(start) = remaining.find("<AllowedMethod>") {
957            let after = &remaining[start + 15..];
958            if let Some(end) = after.find("</AllowedMethod>") {
959                let method = after[..end].trim();
960                if !valid_methods.contains(&method) {
961                    return Err(AwsServiceError::aws_error(
962                        StatusCode::BAD_REQUEST,
963                        "InvalidRequest",
964                        format!(
965                            "Found unsupported HTTP method in CORS config. Unsupported method is {method}"
966                        ),
967                    ));
968                }
969                remaining = &after[end + 16..];
970            } else {
971                break;
972            }
973        }
974
975        let mut state = self.state.write();
976        let b = state
977            .buckets
978            .get_mut(bucket)
979            .ok_or_else(|| no_such_bucket(bucket))?;
980        b.cors_config = Some(body_str);
981        Ok(empty_response(StatusCode::OK))
982    }
983
984    fn get_bucket_cors(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
985        let state = self.state.read();
986        let b = state
987            .buckets
988            .get(bucket)
989            .ok_or_else(|| no_such_bucket(bucket))?;
990        match &b.cors_config {
991            Some(config) => Ok(s3_xml(StatusCode::OK, config.clone())),
992            None => Err(AwsServiceError::aws_error_with_fields(
993                StatusCode::NOT_FOUND,
994                "NoSuchCORSConfiguration",
995                "The CORS configuration does not exist",
996                vec![("BucketName".to_string(), bucket.to_string())],
997            )),
998        }
999    }
1000
1001    fn delete_bucket_cors(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
1002        let mut state = self.state.write();
1003        let b = state
1004            .buckets
1005            .get_mut(bucket)
1006            .ok_or_else(|| no_such_bucket(bucket))?;
1007        b.cors_config = None;
1008        Ok(empty_response(StatusCode::NO_CONTENT))
1009    }
1010
1011    // ---- Notification ----
1012
1013    fn put_bucket_notification(
1014        &self,
1015        req: &AwsRequest,
1016        bucket: &str,
1017    ) -> Result<AwsResponse, AwsServiceError> {
1018        let body_str = std::str::from_utf8(&req.body).unwrap_or("").to_string();
1019        let mut state = self.state.write();
1020        let b = state
1021            .buckets
1022            .get_mut(bucket)
1023            .ok_or_else(|| no_such_bucket(bucket))?;
1024        // Auto-generate Id for each configuration element if missing
1025        let normalized = normalize_notification_ids(&body_str);
1026        b.notification_config = Some(normalized);
1027        Ok(empty_response(StatusCode::OK))
1028    }
1029
1030    fn get_bucket_notification(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
1031        let state = self.state.read();
1032        let b = state
1033            .buckets
1034            .get(bucket)
1035            .ok_or_else(|| no_such_bucket(bucket))?;
1036        let body = match &b.notification_config {
1037            Some(config) => config.clone(),
1038            None => "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
1039                     <NotificationConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
1040                     </NotificationConfiguration>"
1041                .to_string(),
1042        };
1043        Ok(s3_xml(StatusCode::OK, body))
1044    }
1045
1046    // ---- Logging ----
1047
1048    fn put_bucket_logging(
1049        &self,
1050        req: &AwsRequest,
1051        bucket: &str,
1052    ) -> Result<AwsResponse, AwsServiceError> {
1053        let body_str = std::str::from_utf8(&req.body).unwrap_or("").to_string();
1054        let mut state = self.state.write();
1055        let b = state
1056            .buckets
1057            .get_mut(bucket)
1058            .ok_or_else(|| no_such_bucket(bucket))?;
1059        b.logging_config = Some(body_str);
1060        Ok(empty_response(StatusCode::OK))
1061    }
1062
1063    fn get_bucket_logging(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
1064        let state = self.state.read();
1065        let b = state
1066            .buckets
1067            .get(bucket)
1068            .ok_or_else(|| no_such_bucket(bucket))?;
1069        let body = match &b.logging_config {
1070            Some(config) => config.clone(),
1071            None => "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
1072                     <BucketLoggingStatus xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
1073                     </BucketLoggingStatus>"
1074                .to_string(),
1075        };
1076        Ok(s3_xml(StatusCode::OK, body))
1077    }
1078
1079    // ---- Website ----
1080
1081    fn put_bucket_website(
1082        &self,
1083        req: &AwsRequest,
1084        bucket: &str,
1085    ) -> Result<AwsResponse, AwsServiceError> {
1086        let body_str = std::str::from_utf8(&req.body).unwrap_or("").to_string();
1087        let mut state = self.state.write();
1088        let b = state
1089            .buckets
1090            .get_mut(bucket)
1091            .ok_or_else(|| no_such_bucket(bucket))?;
1092        b.website_config = Some(body_str);
1093        Ok(empty_response(StatusCode::OK))
1094    }
1095
1096    fn get_bucket_website(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
1097        let state = self.state.read();
1098        let b = state
1099            .buckets
1100            .get(bucket)
1101            .ok_or_else(|| no_such_bucket(bucket))?;
1102        match &b.website_config {
1103            Some(config) => Ok(s3_xml(StatusCode::OK, config.clone())),
1104            None => Err(AwsServiceError::aws_error_with_fields(
1105                StatusCode::NOT_FOUND,
1106                "NoSuchWebsiteConfiguration",
1107                "The specified bucket does not have a website configuration",
1108                vec![("BucketName".to_string(), bucket.to_string())],
1109            )),
1110        }
1111    }
1112
1113    fn delete_bucket_website(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
1114        let mut state = self.state.write();
1115        let b = state
1116            .buckets
1117            .get_mut(bucket)
1118            .ok_or_else(|| no_such_bucket(bucket))?;
1119        b.website_config = None;
1120        Ok(empty_response(StatusCode::NO_CONTENT))
1121    }
1122
1123    // ---- Accelerate ----
1124
1125    fn put_bucket_accelerate(
1126        &self,
1127        req: &AwsRequest,
1128        bucket: &str,
1129    ) -> Result<AwsResponse, AwsServiceError> {
1130        if bucket.contains('.') {
1131            return Err(AwsServiceError::aws_error(
1132                StatusCode::BAD_REQUEST,
1133                "InvalidRequest",
1134                "S3 Transfer Acceleration is not supported for buckets with periods (.) in their names",
1135            ));
1136        }
1137        let body_str = std::str::from_utf8(&req.body).unwrap_or("");
1138        let status = extract_xml_value(body_str, "Status");
1139        let mut state = self.state.write();
1140        let b = state
1141            .buckets
1142            .get_mut(bucket)
1143            .ok_or_else(|| no_such_bucket(bucket))?;
1144        // Validate status
1145        if let Some(ref s) = status {
1146            if s != "Enabled" && s != "Suspended" {
1147                return Err(AwsServiceError::aws_error(
1148                    StatusCode::BAD_REQUEST,
1149                    "MalformedXML",
1150                    "The XML you provided was not well-formed or did not validate against our published schema",
1151                ));
1152            }
1153        }
1154        // Suspending a never-configured bucket is a no-op
1155        if status.as_deref() == Some("Suspended") && b.accelerate_status.is_none() {
1156            return Ok(empty_response(StatusCode::OK));
1157        }
1158        b.accelerate_status = status;
1159        Ok(empty_response(StatusCode::OK))
1160    }
1161
1162    fn get_bucket_accelerate(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
1163        let state = self.state.read();
1164        let b = state
1165            .buckets
1166            .get(bucket)
1167            .ok_or_else(|| no_such_bucket(bucket))?;
1168        let status_xml = match &b.accelerate_status {
1169            Some(s) => format!("<Status>{s}</Status>"),
1170            None => String::new(),
1171        };
1172        let body = format!(
1173            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
1174             <AccelerateConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
1175             {status_xml}\
1176             </AccelerateConfiguration>"
1177        );
1178        Ok(s3_xml(StatusCode::OK, body))
1179    }
1180
1181    // ---- PublicAccessBlock ----
1182
1183    fn put_public_access_block(
1184        &self,
1185        req: &AwsRequest,
1186        bucket: &str,
1187    ) -> Result<AwsResponse, AwsServiceError> {
1188        let body_str = std::str::from_utf8(&req.body).unwrap_or("").to_string();
1189        // Validate that at least one field is specified
1190        let has_field = body_str.contains("BlockPublicAcls")
1191            || body_str.contains("IgnorePublicAcls")
1192            || body_str.contains("BlockPublicPolicy")
1193            || body_str.contains("RestrictPublicBuckets");
1194        if !has_field {
1195            return Err(AwsServiceError::aws_error(
1196                StatusCode::BAD_REQUEST,
1197                "InvalidRequest",
1198                "Must specify at least one configuration.",
1199            ));
1200        }
1201        let mut state = self.state.write();
1202        let b = state
1203            .buckets
1204            .get_mut(bucket)
1205            .ok_or_else(|| no_such_bucket(bucket))?;
1206        b.public_access_block = Some(body_str);
1207        Ok(empty_response(StatusCode::OK))
1208    }
1209
1210    fn get_public_access_block(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
1211        let state = self.state.read();
1212        let b = state
1213            .buckets
1214            .get(bucket)
1215            .ok_or_else(|| no_such_bucket(bucket))?;
1216        match &b.public_access_block {
1217            Some(config) => {
1218                // Ensure all four fields are present with defaults of false
1219                let fields = [
1220                    "BlockPublicAcls",
1221                    "IgnorePublicAcls",
1222                    "BlockPublicPolicy",
1223                    "RestrictPublicBuckets",
1224                ];
1225                let mut result = config.clone();
1226                for field in fields {
1227                    if !result.contains(field) {
1228                        let closing = "</PublicAccessBlockConfiguration>";
1229                        if let Some(pos) = result.find(closing) {
1230                            result.insert_str(pos, &format!("<{field}>false</{field}>"));
1231                        }
1232                    }
1233                }
1234                Ok(s3_xml(StatusCode::OK, result))
1235            }
1236            None => Err(AwsServiceError::aws_error(
1237                StatusCode::NOT_FOUND,
1238                "NoSuchPublicAccessBlockConfiguration",
1239                "The public access block configuration was not found",
1240            )),
1241        }
1242    }
1243
1244    fn delete_public_access_block(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
1245        let mut state = self.state.write();
1246        let b = state
1247            .buckets
1248            .get_mut(bucket)
1249            .ok_or_else(|| no_such_bucket(bucket))?;
1250        b.public_access_block = None;
1251        Ok(empty_response(StatusCode::NO_CONTENT))
1252    }
1253
1254    // ---- ObjectLockConfiguration ----
1255
1256    fn put_object_lock_config(
1257        &self,
1258        req: &AwsRequest,
1259        bucket: &str,
1260    ) -> Result<AwsResponse, AwsServiceError> {
1261        let body_str = std::str::from_utf8(&req.body).unwrap_or("").to_string();
1262
1263        // Validate: body must not be empty
1264        if body_str.trim().is_empty() {
1265            return Err(AwsServiceError::aws_error(
1266                StatusCode::BAD_REQUEST,
1267                "MissingRequestBodyError",
1268                "Request Body is empty",
1269            ));
1270        }
1271
1272        // Must contain ObjectLockEnabled
1273        if !body_str.contains("<ObjectLockEnabled>") {
1274            return Err(AwsServiceError::aws_error(
1275                StatusCode::BAD_REQUEST,
1276                "MalformedXML",
1277                "The XML you provided was not well-formed or did not validate against our published schema",
1278            ));
1279        }
1280
1281        let mut state = self.state.write();
1282        let b = state
1283            .buckets
1284            .get_mut(bucket)
1285            .ok_or_else(|| no_such_bucket(bucket))?;
1286
1287        // Versioning must be enabled
1288        if b.versioning.as_deref() != Some("Enabled") {
1289            return Err(AwsServiceError::aws_error(
1290                StatusCode::CONFLICT,
1291                "InvalidBucketState",
1292                "Versioning must be 'Enabled' on the bucket to apply a Object Lock configuration",
1293            ));
1294        }
1295
1296        b.object_lock_config = Some(body_str);
1297        Ok(empty_response(StatusCode::OK))
1298    }
1299
1300    #[allow(dead_code)]
1301    fn get_object_lock_config(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
1302        let state = self.state.read();
1303        let b = state
1304            .buckets
1305            .get(bucket)
1306            .ok_or_else(|| no_such_bucket(bucket))?;
1307        match &b.object_lock_config {
1308            Some(config) => Ok(s3_xml(StatusCode::OK, config.clone())),
1309            None => Err(AwsServiceError::aws_error(
1310                StatusCode::NOT_FOUND,
1311                "ObjectLockConfigurationNotFoundError",
1312                "Object Lock configuration does not exist for this bucket",
1313            )),
1314        }
1315    }
1316
1317    // ---- List operations ----
1318
1319    #[allow(dead_code)]
1320    fn list_objects_v1(
1321        &self,
1322        req: &AwsRequest,
1323        bucket: &str,
1324    ) -> Result<AwsResponse, AwsServiceError> {
1325        let state = self.state.read();
1326        let b = state
1327            .buckets
1328            .get(bucket)
1329            .ok_or_else(|| no_such_bucket(bucket))?;
1330
1331        let prefix = req.query_params.get("prefix").cloned().unwrap_or_default();
1332        let delimiter = req.query_params.get("delimiter").cloned();
1333        let max_keys: usize = req
1334            .query_params
1335            .get("max-keys")
1336            .and_then(|v| v.parse().ok())
1337            .unwrap_or(1000);
1338        let marker = req.query_params.get("marker").cloned().unwrap_or_default();
1339        let encoding_type = req.query_params.get("encoding-type").cloned();
1340
1341        let mut contents = String::new();
1342        let mut common_prefixes: Vec<String> = Vec::new();
1343        let mut count = 0;
1344        let mut is_truncated = false;
1345        let mut last_key = String::new();
1346
1347        for (key, obj) in &b.objects {
1348            if obj.is_delete_marker {
1349                continue;
1350            }
1351            if !key.starts_with(&prefix) {
1352                continue;
1353            }
1354            if !marker.is_empty() && key.as_str() <= marker.as_str() {
1355                continue;
1356            }
1357
1358            // Handle delimiter-based grouping
1359            if let Some(ref delim) = delimiter {
1360                if !delim.is_empty() {
1361                    let suffix = &key[prefix.len()..];
1362                    if let Some(pos) = suffix.find(delim.as_str()) {
1363                        let cp = format!("{}{}", prefix, &suffix[..pos + delim.len()]);
1364                        if !common_prefixes.contains(&cp) {
1365                            if count >= max_keys {
1366                                is_truncated = true;
1367                                break;
1368                            }
1369                            common_prefixes.push(cp);
1370                            last_key = key.clone();
1371                            count += 1;
1372                        }
1373                        continue;
1374                    }
1375                }
1376            }
1377
1378            if count >= max_keys {
1379                is_truncated = true;
1380                break;
1381            }
1382
1383            let display_key = if encoding_type.as_deref() == Some("url") {
1384                url_encode_s3_key(key)
1385            } else {
1386                xml_escape(key)
1387            };
1388
1389            contents.push_str(&format!(
1390                "<Contents>\
1391                 <Key>{}</Key>\
1392                 <LastModified>{}</LastModified>\
1393                 <ETag>&quot;{}&quot;</ETag>\
1394                 <Size>{}</Size>\
1395                 <StorageClass>{}</StorageClass>\
1396                 </Contents>",
1397                display_key,
1398                obj.last_modified.format("%Y-%m-%dT%H:%M:%S%.3fZ"),
1399                obj.etag,
1400                obj.size,
1401                obj.storage_class,
1402            ));
1403            last_key = key.clone();
1404            count += 1;
1405        }
1406
1407        let mut common_prefixes_xml = String::new();
1408        for cp in &common_prefixes {
1409            let display_cp = if encoding_type.as_deref() == Some("url") {
1410                url_encode_s3_key(cp)
1411            } else {
1412                xml_escape(cp)
1413            };
1414            common_prefixes_xml.push_str(&format!(
1415                "<CommonPrefixes><Prefix>{display_cp}</Prefix></CommonPrefixes>",
1416            ));
1417        }
1418
1419        let next_marker = if is_truncated {
1420            format!("<NextMarker>{}</NextMarker>", xml_escape(&last_key))
1421        } else {
1422            String::new()
1423        };
1424
1425        let delimiter_xml = match &delimiter {
1426            Some(d) if !d.is_empty() => format!("<Delimiter>{}</Delimiter>", xml_escape(d)),
1427            _ => String::new(),
1428        };
1429
1430        let prefix_xml = if prefix.is_empty() {
1431            String::new()
1432        } else {
1433            let display_prefix = if encoding_type.as_deref() == Some("url") {
1434                url_encode_s3_key(&prefix)
1435            } else {
1436                xml_escape(&prefix)
1437            };
1438            format!("<Prefix>{display_prefix}</Prefix>")
1439        };
1440
1441        let marker_xml = if marker.is_empty() {
1442            String::new()
1443        } else {
1444            format!("<Marker>{}</Marker>", xml_escape(&marker))
1445        };
1446
1447        let encoding_xml = if encoding_type.as_deref() == Some("url") {
1448            "<EncodingType>url</EncodingType>".to_string()
1449        } else {
1450            String::new()
1451        };
1452
1453        let body = format!(
1454            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
1455             <ListBucketResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
1456             <Name>{bucket}</Name>\
1457             {prefix_xml}\
1458             {marker_xml}\
1459             <MaxKeys>{max_keys}</MaxKeys>\
1460             {delimiter_xml}\
1461             {encoding_xml}\
1462             <IsTruncated>{is_truncated}</IsTruncated>\
1463             {contents}\
1464             {common_prefixes_xml}\
1465             {next_marker}\
1466             </ListBucketResult>",
1467        );
1468        Ok(s3_xml(StatusCode::OK, body))
1469    }
1470
1471    fn list_objects_v2(
1472        &self,
1473        req: &AwsRequest,
1474        bucket: &str,
1475    ) -> Result<AwsResponse, AwsServiceError> {
1476        let state = self.state.read();
1477        let b = state
1478            .buckets
1479            .get(bucket)
1480            .ok_or_else(|| no_such_bucket(bucket))?;
1481
1482        let prefix = req.query_params.get("prefix").cloned().unwrap_or_default();
1483        let delimiter = req
1484            .query_params
1485            .get("delimiter")
1486            .cloned()
1487            .unwrap_or_default();
1488        let max_keys: usize = req
1489            .query_params
1490            .get("max-keys")
1491            .and_then(|v| v.parse().ok())
1492            .unwrap_or(1000);
1493        let start_after = req
1494            .query_params
1495            .get("start-after")
1496            .cloned()
1497            .unwrap_or_default();
1498        let continuation = req.query_params.get("continuation-token").cloned();
1499        if let Some(ref ct) = continuation {
1500            if ct.is_empty() {
1501                return Err(AwsServiceError::aws_error(
1502                    StatusCode::BAD_REQUEST,
1503                    "InvalidArgument",
1504                    "The continuation token provided is incorrect",
1505                ));
1506            }
1507        }
1508        let fetch_owner = req
1509            .query_params
1510            .get("fetch-owner")
1511            .map(|v| v == "true")
1512            .unwrap_or(false);
1513
1514        let effective_start = continuation.as_deref().unwrap_or(&start_after);
1515
1516        let mut contents = String::new();
1517        let mut common_prefixes: Vec<String> = Vec::new();
1518        let mut count = 0;
1519        let mut is_truncated = false;
1520        let mut last_key = String::new();
1521
1522        for (key, obj) in &b.objects {
1523            if obj.is_delete_marker {
1524                continue;
1525            }
1526            if !key.starts_with(&prefix) {
1527                continue;
1528            }
1529            if !effective_start.is_empty() && key.as_str() <= effective_start {
1530                continue;
1531            }
1532
1533            // Handle delimiter-based grouping
1534            if !delimiter.is_empty() {
1535                if prefix.len() > key.len() {
1536                    continue;
1537                }
1538                let suffix = &key[prefix.len()..];
1539                if let Some(pos) = suffix.find(&delimiter) {
1540                    let end = (pos + delimiter.len()).min(suffix.len());
1541                    let cp = format!("{}{}", prefix, &suffix[..end]);
1542                    if !common_prefixes.contains(&cp) {
1543                        if count >= max_keys {
1544                            is_truncated = true;
1545                            break;
1546                        }
1547                        common_prefixes.push(cp);
1548                        last_key = key.clone();
1549                        count += 1;
1550                    }
1551                    continue;
1552                }
1553            }
1554
1555            if count >= max_keys {
1556                is_truncated = true;
1557                break;
1558            }
1559
1560            let owner_xml = if fetch_owner {
1561                let oid = obj.acl_owner_id.as_deref().unwrap_or(&b.acl_owner_id);
1562                format!(
1563                    "<Owner><ID>{}</ID><DisplayName>{}</DisplayName></Owner>",
1564                    xml_escape(oid),
1565                    xml_escape(oid),
1566                )
1567            } else {
1568                String::new()
1569            };
1570
1571            let checksum_xml = if let Some(ref algo) = obj.checksum_algorithm {
1572                format!(
1573                    "<ChecksumAlgorithm>{}</ChecksumAlgorithm>",
1574                    xml_escape(algo)
1575                )
1576            } else {
1577                String::new()
1578            };
1579
1580            let use_url_enc =
1581                req.query_params.get("encoding-type").map(|s| s.as_str()) == Some("url");
1582            let display_key = if use_url_enc {
1583                url_encode_s3_key(key)
1584            } else {
1585                xml_escape(key)
1586            };
1587
1588            contents.push_str(&format!(
1589                "<Contents>\
1590                 <Key>{}</Key>\
1591                 <LastModified>{}</LastModified>\
1592                 <ETag>&quot;{}&quot;</ETag>\
1593                 <Size>{}</Size>\
1594                 <StorageClass>{}</StorageClass>\
1595                 {owner_xml}{checksum_xml}\
1596                 </Contents>",
1597                display_key,
1598                obj.last_modified.format("%Y-%m-%dT%H:%M:%S%.3fZ"),
1599                obj.etag,
1600                obj.size,
1601                obj.storage_class,
1602            ));
1603            last_key = key.clone();
1604            count += 1;
1605        }
1606
1607        let encoding_type = req.query_params.get("encoding-type").cloned();
1608        let use_url_encoding = encoding_type.as_deref() == Some("url");
1609
1610        let mut common_prefixes_xml = String::new();
1611        for cp in &common_prefixes {
1612            let display_cp = if use_url_encoding {
1613                url_encode_s3_key(cp)
1614            } else {
1615                xml_escape(cp)
1616            };
1617            common_prefixes_xml.push_str(&format!(
1618                "<CommonPrefixes><Prefix>{display_cp}</Prefix></CommonPrefixes>",
1619            ));
1620        }
1621
1622        let next_token = if is_truncated {
1623            format!(
1624                "<NextContinuationToken>{}</NextContinuationToken>",
1625                xml_escape(&last_key)
1626            )
1627        } else {
1628            String::new()
1629        };
1630
1631        let cont_token = if let Some(ct) = &continuation {
1632            format!("<ContinuationToken>{}</ContinuationToken>", xml_escape(ct))
1633        } else {
1634            String::new()
1635        };
1636
1637        let encoding_xml = if use_url_encoding {
1638            "<EncodingType>url</EncodingType>".to_string()
1639        } else {
1640            String::new()
1641        };
1642        let delimiter_xml = if delimiter.is_empty() {
1643            String::new()
1644        } else {
1645            format!("<Delimiter>{}</Delimiter>", xml_escape(&delimiter))
1646        };
1647        // StartAfter is only included when no ContinuationToken is present
1648        let start_after_xml = if start_after.is_empty() || continuation.is_some() {
1649            String::new()
1650        } else {
1651            format!("<StartAfter>{}</StartAfter>", xml_escape(&start_after))
1652        };
1653
1654        let body = format!(
1655            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
1656             <ListBucketResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
1657             <Name>{bucket}</Name><Prefix>{prefix}</Prefix>{delimiter_xml}{encoding_xml}\
1658             <KeyCount>{count}</KeyCount>\
1659             <MaxKeys>{max_keys}</MaxKeys>{start_after_xml}<IsTruncated>{is_truncated}</IsTruncated>\
1660             {cont_token}{next_token}{contents}{common_prefixes_xml}</ListBucketResult>",
1661            prefix = if use_url_encoding { url_encode_s3_key(&prefix) } else { xml_escape(&prefix) },
1662        );
1663        Ok(s3_xml(StatusCode::OK, body))
1664    }
1665
1666    fn get_bucket_tagging(
1667        &self,
1668        _req: &AwsRequest,
1669        bucket: &str,
1670    ) -> Result<AwsResponse, AwsServiceError> {
1671        let state = self.state.read();
1672        let b = state
1673            .buckets
1674            .get(bucket)
1675            .ok_or_else(|| no_such_bucket(bucket))?;
1676        if b.tags.is_empty() {
1677            return Err(AwsServiceError::aws_error_with_fields(
1678                StatusCode::NOT_FOUND,
1679                "NoSuchTagSet",
1680                "The TagSet does not exist",
1681                vec![("BucketName".to_string(), b.name.clone())],
1682            ));
1683        }
1684        let mut tags_xml = String::new();
1685        for (k, v) in &b.tags {
1686            tags_xml.push_str(&format!(
1687                "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
1688                xml_escape(k),
1689                xml_escape(v),
1690            ));
1691        }
1692        let body = format!(
1693            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
1694             <Tagging xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
1695             <TagSet>{tags_xml}</TagSet></Tagging>"
1696        );
1697        Ok(s3_xml(StatusCode::OK, body))
1698    }
1699
1700    fn put_bucket_tagging(
1701        &self,
1702        req: &AwsRequest,
1703        bucket: &str,
1704    ) -> Result<AwsResponse, AwsServiceError> {
1705        let body_str = std::str::from_utf8(&req.body).unwrap_or("");
1706        let tags = parse_tagging_xml(body_str);
1707
1708        // Validate tags: no duplicate keys
1709        validate_tags(&tags)?;
1710
1711        let mut state = self.state.write();
1712        let b = state
1713            .buckets
1714            .get_mut(bucket)
1715            .ok_or_else(|| no_such_bucket(bucket))?;
1716        b.tags = tags.into_iter().collect();
1717        Ok(AwsResponse {
1718            status: StatusCode::NO_CONTENT,
1719            content_type: "application/xml".to_string(),
1720            body: Bytes::new(),
1721            headers: HeaderMap::new(),
1722        })
1723    }
1724
1725    fn delete_bucket_tagging(
1726        &self,
1727        _req: &AwsRequest,
1728        bucket: &str,
1729    ) -> Result<AwsResponse, AwsServiceError> {
1730        let mut state = self.state.write();
1731        let b = state
1732            .buckets
1733            .get_mut(bucket)
1734            .ok_or_else(|| no_such_bucket(bucket))?;
1735        b.tags.clear();
1736        Ok(AwsResponse {
1737            status: StatusCode::NO_CONTENT,
1738            content_type: "application/xml".to_string(),
1739            body: Bytes::new(),
1740            headers: HeaderMap::new(),
1741        })
1742    }
1743
1744    // ---- Bucket ACL ----
1745
1746    fn get_bucket_acl(
1747        &self,
1748        req: &AwsRequest,
1749        bucket: &str,
1750    ) -> Result<AwsResponse, AwsServiceError> {
1751        let state = self.state.read();
1752        let b = state
1753            .buckets
1754            .get(bucket)
1755            .ok_or_else(|| no_such_bucket(bucket))?;
1756
1757        let body = build_acl_xml(&b.acl_owner_id, &b.acl_grants, &req.account_id);
1758        Ok(s3_xml(StatusCode::OK, body))
1759    }
1760
1761    fn put_bucket_acl(
1762        &self,
1763        req: &AwsRequest,
1764        bucket: &str,
1765    ) -> Result<AwsResponse, AwsServiceError> {
1766        // Check for canned ACL header
1767        let canned = req
1768            .headers
1769            .get("x-amz-acl")
1770            .and_then(|v| v.to_str().ok())
1771            .map(|s| s.to_string());
1772
1773        let mut state = self.state.write();
1774        let b = state
1775            .buckets
1776            .get_mut(bucket)
1777            .ok_or_else(|| no_such_bucket(bucket))?;
1778
1779        if let Some(acl) = canned {
1780            b.acl_grants = canned_acl_grants(&acl, &b.acl_owner_id.clone());
1781        } else {
1782            // Parse ACL from body (AccessControlPolicy XML)
1783            let body_str = std::str::from_utf8(&req.body).unwrap_or("");
1784            let grants = parse_acl_xml(body_str)?;
1785            b.acl_grants = grants;
1786        }
1787
1788        Ok(AwsResponse {
1789            status: StatusCode::OK,
1790            content_type: "application/xml".to_string(),
1791            body: Bytes::new(),
1792            headers: HeaderMap::new(),
1793        })
1794    }
1795
1796    // ---- Bucket Versioning ----
1797
1798    fn put_bucket_versioning(
1799        &self,
1800        req: &AwsRequest,
1801        bucket: &str,
1802    ) -> Result<AwsResponse, AwsServiceError> {
1803        let body_str = std::str::from_utf8(&req.body).unwrap_or("");
1804        let status_val = extract_xml_value(body_str, "Status").unwrap_or_default();
1805
1806        let mut state = self.state.write();
1807        let b = state
1808            .buckets
1809            .get_mut(bucket)
1810            .ok_or_else(|| no_such_bucket(bucket))?;
1811        if status_val == "Enabled" || status_val == "Suspended" {
1812            b.versioning = Some(status_val);
1813        }
1814        Ok(AwsResponse {
1815            status: StatusCode::OK,
1816            content_type: "application/xml".to_string(),
1817            body: Bytes::new(),
1818            headers: HeaderMap::new(),
1819        })
1820    }
1821
1822    fn get_bucket_versioning(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
1823        let state = self.state.read();
1824        let b = state
1825            .buckets
1826            .get(bucket)
1827            .ok_or_else(|| no_such_bucket(bucket))?;
1828        let status_xml = match &b.versioning {
1829            Some(s) => format!("<Status>{s}</Status>"),
1830            None => String::new(),
1831        };
1832        let body = format!(
1833            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
1834             <VersioningConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
1835             {status_xml}\
1836             </VersioningConfiguration>"
1837        );
1838        Ok(s3_xml(StatusCode::OK, body))
1839    }
1840
1841    fn list_object_versions(
1842        &self,
1843        req: &AwsRequest,
1844        bucket: &str,
1845    ) -> Result<AwsResponse, AwsServiceError> {
1846        let state = self.state.read();
1847        let b = state
1848            .buckets
1849            .get(bucket)
1850            .ok_or_else(|| no_such_bucket(bucket))?;
1851
1852        let prefix = req.query_params.get("prefix").cloned().unwrap_or_default();
1853        let delimiter = req.query_params.get("delimiter").cloned();
1854        let key_marker = req
1855            .query_params
1856            .get("key-marker")
1857            .cloned()
1858            .unwrap_or_default();
1859        let version_id_marker = req.query_params.get("version-id-marker").cloned();
1860        let max_keys: usize = req
1861            .query_params
1862            .get("max-keys")
1863            .and_then(|s| s.parse().ok())
1864            .unwrap_or(1000);
1865
1866        let owner_id = &b.acl_owner_id;
1867
1868        // Build a sorted list of all version entries: (key, obj, is_latest)
1869        let mut all_entries: Vec<(&str, &S3Object, bool)> = Vec::new();
1870
1871        if b.object_versions.is_empty() {
1872            // No versioning history — every object in b.objects is the only version
1873            for (key, obj) in &b.objects {
1874                all_entries.push((key.as_str(), obj, true));
1875            }
1876        } else {
1877            // Collect versioned keys
1878            let mut keys: Vec<&String> = b.object_versions.keys().collect();
1879            keys.sort();
1880            for key in &keys {
1881                if let Some(versions) = b.object_versions.get(key.as_str()) {
1882                    let len = versions.len();
1883                    // Latest version is last in the vec; iterate newest-first
1884                    for (i, obj) in versions.iter().enumerate().rev() {
1885                        let is_latest = i == len - 1;
1886                        all_entries.push((key.as_str(), obj, is_latest));
1887                    }
1888                }
1889            }
1890            // Include non-versioned objects (keys not in object_versions)
1891            for (key, obj) in &b.objects {
1892                if !b.object_versions.contains_key(key) {
1893                    all_entries.push((key.as_str(), obj, true));
1894                }
1895            }
1896            // Sort by key, then newest-first within key (already done by rev above,
1897            // but we need global sort since we mixed in non-versioned objects)
1898            all_entries.sort_by(|a, b_entry| a.0.cmp(b_entry.0));
1899        }
1900
1901        // Filter by prefix
1902        all_entries.retain(|(key, _, _)| key.starts_with(prefix.as_str()));
1903
1904        // Apply key-marker / version-id-marker pagination
1905        if !key_marker.is_empty() {
1906            let vid_marker = version_id_marker.as_deref();
1907            let mut skip = true;
1908            all_entries.retain(|(key, obj, _)| {
1909                if !skip {
1910                    return true;
1911                }
1912                if *key < key_marker.as_str() {
1913                    return false; // before marker, skip
1914                }
1915                if *key > key_marker.as_str() {
1916                    skip = false;
1917                    return true; // past marker key, include
1918                }
1919                // key == key_marker: skip until we find the version_id_marker
1920                if let Some(vid) = vid_marker {
1921                    if obj.version_id.as_deref().unwrap_or("null") == vid {
1922                        // Found the marker version — skip it, include everything after
1923                        skip = false;
1924                        return false;
1925                    }
1926                    false // still before the version marker
1927                } else {
1928                    false // skip entire key_marker key when no version-id-marker
1929                }
1930            });
1931        }
1932
1933        // Handle delimiter: collect common prefixes
1934        let mut common_prefixes: Vec<String> = Vec::new();
1935        if let Some(ref delim) = delimiter {
1936            let mut filtered_entries = Vec::new();
1937            let mut seen_prefixes = std::collections::HashSet::new();
1938            for entry @ (key, _, _) in &all_entries {
1939                let after_prefix = &key[prefix.len()..];
1940                if let Some(pos) = after_prefix.find(delim.as_str()) {
1941                    let cp = format!("{}{}", prefix, &after_prefix[..pos + delim.len()]);
1942                    if seen_prefixes.insert(cp.clone()) {
1943                        common_prefixes.push(cp);
1944                    }
1945                } else {
1946                    filtered_entries.push(*entry);
1947                }
1948            }
1949            all_entries = filtered_entries;
1950        }
1951
1952        // Pagination: truncate at max_keys (count versions + delete markers + common prefixes)
1953        let total_items = all_entries.len() + common_prefixes.len();
1954        let is_truncated = total_items > max_keys;
1955
1956        // We need to limit versions to max_keys minus common_prefixes already counted
1957        let version_limit = max_keys.saturating_sub(common_prefixes.len());
1958        let truncated_entries: Vec<_> = all_entries.iter().take(version_limit).collect();
1959        let next_markers = if is_truncated && !truncated_entries.is_empty() {
1960            let last = truncated_entries.last().unwrap();
1961            Some((
1962                last.0.to_string(),
1963                last.1
1964                    .version_id
1965                    .clone()
1966                    .unwrap_or_else(|| "null".to_string()),
1967            ))
1968        } else {
1969            None
1970        };
1971
1972        // Build XML
1973        let mut versions_xml = String::new();
1974        for (key, obj, is_latest) in &truncated_entries {
1975            if obj.is_delete_marker {
1976                versions_xml.push_str(&format!(
1977                    "<DeleteMarker>\
1978                     <Key>{}</Key>\
1979                     <VersionId>{}</VersionId>\
1980                     <IsLatest>{}</IsLatest>\
1981                     <LastModified>{}</LastModified>\
1982                     <Owner><ID>{owner_id}</ID><DisplayName>{owner_id}</DisplayName></Owner>\
1983                     </DeleteMarker>",
1984                    xml_escape(key),
1985                    obj.version_id.as_deref().unwrap_or("null"),
1986                    is_latest,
1987                    obj.last_modified.format("%Y-%m-%dT%H:%M:%S%.3fZ"),
1988                ));
1989            } else {
1990                versions_xml.push_str(&format!(
1991                    "<Version>\
1992                     <Key>{}</Key>\
1993                     <VersionId>{}</VersionId>\
1994                     <IsLatest>{}</IsLatest>\
1995                     <LastModified>{}</LastModified>\
1996                     <ETag>&quot;{}&quot;</ETag>\
1997                     <Size>{}</Size>\
1998                     <Owner><ID>{owner_id}</ID><DisplayName>{owner_id}</DisplayName></Owner>\
1999                     <StorageClass>{}</StorageClass>\
2000                     </Version>",
2001                    xml_escape(key),
2002                    obj.version_id.as_deref().unwrap_or("null"),
2003                    is_latest,
2004                    obj.last_modified.format("%Y-%m-%dT%H:%M:%S%.3fZ"),
2005                    obj.etag,
2006                    obj.size,
2007                    obj.storage_class,
2008                ));
2009            }
2010        }
2011
2012        // Common prefixes
2013        let mut cp_xml = String::new();
2014        for cp in &common_prefixes {
2015            cp_xml.push_str(&format!(
2016                "<CommonPrefixes><Prefix>{}</Prefix></CommonPrefixes>",
2017                xml_escape(cp),
2018            ));
2019        }
2020
2021        // Pagination markers
2022        let marker_xml = if let Some((ref nk, ref nv)) = next_markers {
2023            format!(
2024                "<NextKeyMarker>{}</NextKeyMarker>\
2025                 <NextVersionIdMarker>{}</NextVersionIdMarker>",
2026                xml_escape(nk),
2027                xml_escape(nv),
2028            )
2029        } else {
2030            String::new()
2031        };
2032
2033        let delimiter_xml = delimiter
2034            .as_ref()
2035            .map(|d| format!("<Delimiter>{}</Delimiter>", xml_escape(d)))
2036            .unwrap_or_default();
2037
2038        let body = format!(
2039            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
2040             <ListVersionsResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
2041             <Name>{name}</Name>\
2042             <Prefix>{pfx}</Prefix>\
2043             <KeyMarker>{km}</KeyMarker>\
2044             {delimiter_xml}\
2045             <MaxKeys>{max_keys}</MaxKeys>\
2046             <IsTruncated>{is_truncated}</IsTruncated>\
2047             {marker_xml}\
2048             {versions_xml}\
2049             {cp_xml}\
2050             </ListVersionsResult>",
2051            name = xml_escape(bucket),
2052            pfx = xml_escape(&prefix),
2053            km = xml_escape(&key_marker),
2054        );
2055        Ok(s3_xml(StatusCode::OK, body))
2056    }
2057
2058    fn get_object_lock_configuration(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
2059        let state = self.state.read();
2060        let b = state
2061            .buckets
2062            .get(bucket)
2063            .ok_or_else(|| no_such_bucket(bucket))?;
2064        match &b.object_lock_config {
2065            Some(config) => Ok(s3_xml(StatusCode::OK, config.clone())),
2066            None => Err(AwsServiceError::aws_error(
2067                StatusCode::NOT_FOUND,
2068                "ObjectLockConfigurationNotFoundError",
2069                "Object Lock configuration does not exist for this bucket",
2070            )),
2071        }
2072    }
2073
2074    fn put_bucket_replication(
2075        &self,
2076        req: &AwsRequest,
2077        bucket: &str,
2078    ) -> Result<AwsResponse, AwsServiceError> {
2079        let body_str = std::str::from_utf8(&req.body).unwrap_or("").to_string();
2080        let mut state = self.state.write();
2081        let b = state
2082            .buckets
2083            .get_mut(bucket)
2084            .ok_or_else(|| no_such_bucket(bucket))?;
2085
2086        // Versioning must be enabled to set replication
2087        if b.versioning.as_deref() != Some("Enabled") {
2088            return Err(AwsServiceError::aws_error_with_fields(
2089                StatusCode::BAD_REQUEST,
2090                "InvalidRequest",
2091                "Versioning must be 'Enabled' on the bucket to apply a replication configuration",
2092                vec![("BucketName".to_string(), bucket.to_string())],
2093            ));
2094        }
2095
2096        b.replication_config = Some(normalize_replication_xml(&body_str));
2097        Ok(empty_response(StatusCode::OK))
2098    }
2099
2100    fn get_bucket_replication(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
2101        let state = self.state.read();
2102        let b = state
2103            .buckets
2104            .get(bucket)
2105            .ok_or_else(|| no_such_bucket(bucket))?;
2106        match &b.replication_config {
2107            Some(config) => Ok(s3_xml(StatusCode::OK, config.clone())),
2108            None => Err(AwsServiceError::aws_error_with_fields(
2109                StatusCode::NOT_FOUND,
2110                "ReplicationConfigurationNotFoundError",
2111                "The replication configuration was not found",
2112                vec![("BucketName".to_string(), bucket.to_string())],
2113            )),
2114        }
2115    }
2116
2117    fn delete_bucket_replication(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
2118        let mut state = self.state.write();
2119        let b = state
2120            .buckets
2121            .get_mut(bucket)
2122            .ok_or_else(|| no_such_bucket(bucket))?;
2123        b.replication_config = None;
2124        Ok(empty_response(StatusCode::NO_CONTENT))
2125    }
2126
2127    fn put_bucket_ownership_controls(
2128        &self,
2129        req: &AwsRequest,
2130        bucket: &str,
2131    ) -> Result<AwsResponse, AwsServiceError> {
2132        let body_str = std::str::from_utf8(&req.body).unwrap_or("").to_string();
2133        let mut state = self.state.write();
2134        let b = state
2135            .buckets
2136            .get_mut(bucket)
2137            .ok_or_else(|| no_such_bucket(bucket))?;
2138        b.ownership_controls = Some(body_str);
2139        Ok(empty_response(StatusCode::OK))
2140    }
2141
2142    fn get_bucket_ownership_controls(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
2143        let state = self.state.read();
2144        let b = state
2145            .buckets
2146            .get(bucket)
2147            .ok_or_else(|| no_such_bucket(bucket))?;
2148        match &b.ownership_controls {
2149            Some(config) => Ok(s3_xml(StatusCode::OK, config.clone())),
2150            None => Err(AwsServiceError::aws_error_with_fields(
2151                StatusCode::NOT_FOUND,
2152                "OwnershipControlsNotFoundError",
2153                "The bucket ownership controls were not found",
2154                vec![("BucketName".to_string(), bucket.to_string())],
2155            )),
2156        }
2157    }
2158
2159    fn delete_bucket_ownership_controls(
2160        &self,
2161        bucket: &str,
2162    ) -> Result<AwsResponse, AwsServiceError> {
2163        let mut state = self.state.write();
2164        let b = state
2165            .buckets
2166            .get_mut(bucket)
2167            .ok_or_else(|| no_such_bucket(bucket))?;
2168        b.ownership_controls = None;
2169        Ok(empty_response(StatusCode::NO_CONTENT))
2170    }
2171
2172    fn put_bucket_inventory(
2173        &self,
2174        req: &AwsRequest,
2175        bucket: &str,
2176    ) -> Result<AwsResponse, AwsServiceError> {
2177        let body_str = std::str::from_utf8(&req.body).unwrap_or("").to_string();
2178        // Use the Id from the XML body if available, otherwise fall back to query param
2179        let inv_id = extract_xml_value(&body_str, "Id")
2180            .or_else(|| req.query_params.get("id").cloned())
2181            .unwrap_or_default();
2182        let mut state = self.state.write();
2183        let b = state
2184            .buckets
2185            .get_mut(bucket)
2186            .ok_or_else(|| no_such_bucket(bucket))?;
2187        b.inventory_configs.insert(inv_id, body_str);
2188        Ok(empty_response(StatusCode::OK))
2189    }
2190
2191    fn get_bucket_inventory(
2192        &self,
2193        req: &AwsRequest,
2194        bucket: &str,
2195    ) -> Result<AwsResponse, AwsServiceError> {
2196        let inv_id = req.query_params.get("id").cloned().unwrap_or_default();
2197        let state = self.state.read();
2198        let b = state
2199            .buckets
2200            .get(bucket)
2201            .ok_or_else(|| no_such_bucket(bucket))?;
2202        match b.inventory_configs.get(&inv_id) {
2203            Some(config) => Ok(s3_xml(StatusCode::OK, config.clone())),
2204            None => Err(AwsServiceError::aws_error(
2205                StatusCode::NOT_FOUND,
2206                "NoSuchConfiguration",
2207                format!("The specified configuration does not exist: {inv_id}"),
2208            )),
2209        }
2210    }
2211
2212    fn list_bucket_inventory_configurations(
2213        &self,
2214        bucket: &str,
2215    ) -> Result<AwsResponse, AwsServiceError> {
2216        let state = self.state.read();
2217        let b = state
2218            .buckets
2219            .get(bucket)
2220            .ok_or_else(|| no_such_bucket(bucket))?;
2221
2222        let mut body = String::from(
2223            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
2224             <ListInventoryConfigurationsResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
2225             <IsTruncated>false</IsTruncated>",
2226        );
2227        let mut sorted_keys: Vec<_> = b.inventory_configs.keys().collect();
2228        sorted_keys.sort();
2229        for key in sorted_keys {
2230            if let Some(config) = b.inventory_configs.get(key) {
2231                body.push_str(config);
2232            }
2233        }
2234        body.push_str("</ListInventoryConfigurationsResult>");
2235        Ok(s3_xml(StatusCode::OK, body))
2236    }
2237
2238    fn delete_bucket_inventory(
2239        &self,
2240        req: &AwsRequest,
2241        bucket: &str,
2242    ) -> Result<AwsResponse, AwsServiceError> {
2243        let inv_id = req.query_params.get("id").cloned().unwrap_or_default();
2244        let mut state = self.state.write();
2245        let b = state
2246            .buckets
2247            .get_mut(bucket)
2248            .ok_or_else(|| no_such_bucket(bucket))?;
2249        b.inventory_configs.remove(&inv_id);
2250        Ok(empty_response(StatusCode::NO_CONTENT))
2251    }
2252}
2253
2254// ---------------------------------------------------------------------------
2255// Object operations
2256// ---------------------------------------------------------------------------
2257impl S3Service {
2258    fn put_object(
2259        &self,
2260        req: &AwsRequest,
2261        bucket: &str,
2262        key: &str,
2263    ) -> Result<AwsResponse, AwsServiceError> {
2264        // Validate key length
2265        if key.len() > 1024 {
2266            return Err(AwsServiceError::aws_error(
2267                StatusCode::BAD_REQUEST,
2268                "KeyTooLongError",
2269                "Your key is too long",
2270            ));
2271        }
2272
2273        // Check for If-None-Match conditional on PUT
2274        let if_none_match = req
2275            .headers
2276            .get("if-none-match")
2277            .and_then(|v| v.to_str().ok())
2278            .map(|s| s.to_string());
2279
2280        // Check for If-Match conditional on PUT
2281        let if_match = req
2282            .headers
2283            .get("if-match")
2284            .and_then(|v| v.to_str().ok())
2285            .map(|s| s.to_string());
2286
2287        // Check for x-amz-tagging header
2288        let tagging_header = req
2289            .headers
2290            .get("x-amz-tagging")
2291            .and_then(|v| v.to_str().ok())
2292            .map(|s| s.to_string());
2293
2294        // Check for ACL header
2295        let acl_header = req
2296            .headers
2297            .get("x-amz-acl")
2298            .and_then(|v| v.to_str().ok())
2299            .map(|s| s.to_string());
2300
2301        // Check for grant headers alongside canned ACL
2302        let has_grant_headers = req.headers.keys().any(|k| {
2303            let name = k.as_str();
2304            name.starts_with("x-amz-grant-")
2305        });
2306
2307        if acl_header.is_some() && has_grant_headers {
2308            return Err(AwsServiceError::aws_error(
2309                StatusCode::BAD_REQUEST,
2310                "InvalidRequest",
2311                "Specifying both Canned ACLs and Header Grants is not allowed",
2312            ));
2313        }
2314
2315        // Parse tags from header
2316        let tags = if let Some(tagging) = &tagging_header {
2317            let parsed = parse_url_encoded_tags(tagging);
2318            // Validate aws: prefix
2319            for (k, _) in &parsed {
2320                if k.starts_with("aws:") {
2321                    return Err(AwsServiceError::aws_error(
2322                        StatusCode::BAD_REQUEST,
2323                        "InvalidTag",
2324                        "Your TagKey cannot be prefixed with aws:",
2325                    ));
2326                }
2327            }
2328            parsed.into_iter().collect()
2329        } else {
2330            std::collections::HashMap::new()
2331        };
2332
2333        let mut state = self.state.write();
2334        let b = state
2335            .buckets
2336            .get_mut(bucket)
2337            .ok_or_else(|| no_such_bucket(bucket))?;
2338
2339        // Handle If-Match: check existing object etag
2340        if let Some(ref if_match_val) = if_match {
2341            match b.objects.get(key) {
2342                Some(existing) => {
2343                    let existing_etag = format!("\"{}\"", existing.etag);
2344                    if !etag_matches(if_match_val, &existing_etag) {
2345                        return Err(precondition_failed("If-Match"));
2346                    }
2347                }
2348                None => {
2349                    return Err(no_such_key(key));
2350                }
2351            }
2352        }
2353
2354        // Handle If-None-Match: if "*", fail if object already exists
2355        if let Some(ref inm) = if_none_match {
2356            if inm.trim() == "*" && b.objects.contains_key(key) {
2357                return Err(precondition_failed("If-None-Match"));
2358            }
2359        }
2360
2361        let data = req.body.clone();
2362        let data_size = data.len() as u64;
2363        let etag = compute_md5(&data);
2364        let content_type = req
2365            .headers
2366            .get("content-type")
2367            .and_then(|v| v.to_str().ok())
2368            .unwrap_or("binary/octet-stream")
2369            .to_string();
2370        let version_id = if b.versioning.as_deref() == Some("Enabled") {
2371            Some(uuid::Uuid::new_v4().to_string())
2372        } else {
2373            None
2374        };
2375        let content_encoding = req
2376            .headers
2377            .get("content-encoding")
2378            .and_then(|v| v.to_str().ok())
2379            .map(|s| s.to_string());
2380        let storage_class = req
2381            .headers
2382            .get("x-amz-storage-class")
2383            .and_then(|v| v.to_str().ok())
2384            .unwrap_or("STANDARD")
2385            .to_string();
2386        if !is_valid_storage_class(&storage_class) {
2387            return Err(AwsServiceError::aws_error(
2388                StatusCode::BAD_REQUEST,
2389                "InvalidStorageClass",
2390                "The storage class you specified is not valid",
2391            ));
2392        }
2393        let website_redirect_location = req
2394            .headers
2395            .get("x-amz-website-redirect-location")
2396            .and_then(|v| v.to_str().ok())
2397            .map(|s| s.to_string());
2398
2399        let metadata = extract_user_metadata(&req.headers);
2400
2401        // Extract checksum algorithm and value
2402        let checksum_algorithm = req
2403            .headers
2404            .get("x-amz-sdk-checksum-algorithm")
2405            .or_else(|| req.headers.get("x-amz-checksum-algorithm"))
2406            .and_then(|v| v.to_str().ok())
2407            .map(|s| s.to_string());
2408        let _checksum_from_header = checksum_algorithm.as_deref().and_then(|algo| {
2409            let header_name = format!("x-amz-checksum-{}", algo.to_lowercase());
2410            req.headers
2411                .get(header_name.as_str())
2412                .and_then(|v| v.to_str().ok())
2413                .map(|s| s.to_string())
2414        });
2415
2416        // Build ACL grants for object
2417        let acl_grants = if has_grant_headers {
2418            parse_grant_headers(&req.headers)
2419        } else if let Some(ref acl) = acl_header {
2420            canned_acl_grants_for_object(acl, &b.acl_owner_id)
2421        } else {
2422            // Default: owner full control
2423            vec![AclGrant {
2424                grantee_type: "CanonicalUser".to_string(),
2425                grantee_id: Some(b.acl_owner_id.clone()),
2426                grantee_display_name: Some(b.acl_owner_id.clone()),
2427                grantee_uri: None,
2428                permission: "FULL_CONTROL".to_string(),
2429            }]
2430        };
2431
2432        // SSE headers
2433        let mut sse_algorithm = req
2434            .headers
2435            .get("x-amz-server-side-encryption")
2436            .and_then(|v| v.to_str().ok())
2437            .map(|s| s.to_string());
2438        let mut sse_kms_key_id = req
2439            .headers
2440            .get("x-amz-server-side-encryption-aws-kms-key-id")
2441            .and_then(|v| v.to_str().ok())
2442            .map(|s| s.to_string());
2443        let bucket_key_enabled = req
2444            .headers
2445            .get("x-amz-server-side-encryption-bucket-key-enabled")
2446            .and_then(|v| v.to_str().ok())
2447            .map(|s| s.eq_ignore_ascii_case("true"));
2448
2449        // Apply bucket default encryption if no explicit SSE headers
2450        if sse_algorithm.is_none() {
2451            if let Some(ref enc_config) = b.encryption_config {
2452                if let Some(algo) = extract_xml_value(enc_config, "SSEAlgorithm") {
2453                    if algo == "aws:kms" && sse_kms_key_id.is_none() {
2454                        sse_kms_key_id = extract_xml_value(enc_config, "KMSMasterKeyID");
2455                    }
2456                    sse_algorithm = Some(algo);
2457                }
2458            }
2459        }
2460
2461        // Validate KMS key exists when using aws:kms encryption
2462        if sse_algorithm.as_deref() == Some("aws:kms") {
2463            if let Some(ref kms) = self.kms_state {
2464                if let Some(ref key_id) = sse_kms_key_id {
2465                    let kms_state = kms.read();
2466                    let key_exists = kms_state
2467                        .keys
2468                        .values()
2469                        .any(|k| k.key_id == *key_id || k.arn == *key_id)
2470                        || kms_state
2471                            .aliases
2472                            .values()
2473                            .any(|a| a.alias_name == *key_id || a.alias_arn == *key_id);
2474                    if !key_exists {
2475                        // Still allow it — AWS doesn't always reject unknown keys
2476                        // for emulation purposes, just set the key ID
2477                        tracing::debug!(
2478                            key_id = %key_id,
2479                            "KMS key not found in state, proceeding anyway"
2480                        );
2481                    } else {
2482                        // Resolve alias to key ARN if needed
2483                        if let Some(alias) = kms_state
2484                            .aliases
2485                            .values()
2486                            .find(|a| a.alias_name == *key_id || a.alias_arn == *key_id)
2487                        {
2488                            if let Some(key) = kms_state.keys.get(&alias.target_key_id) {
2489                                sse_kms_key_id = Some(key.arn.clone());
2490                            }
2491                        } else if let Some(key) =
2492                            kms_state.keys.values().find(|k| k.key_id == *key_id)
2493                        {
2494                            sse_kms_key_id = Some(key.arn.clone());
2495                        }
2496                    }
2497                }
2498            }
2499        }
2500
2501        // Checksum: detect algorithm from various headers
2502        let explicit_checksum_algo = req
2503            .headers
2504            .get("x-amz-checksum-algorithm")
2505            .or_else(|| req.headers.get("x-amz-sdk-checksum-algorithm"))
2506            .and_then(|v| v.to_str().ok())
2507            .map(|s| s.to_uppercase());
2508        let checksum_algorithm = explicit_checksum_algo.clone().or_else(|| {
2509            // Also detect from checksum value headers
2510            if req.headers.contains_key("x-amz-checksum-crc32") {
2511                Some("CRC32".to_string())
2512            } else if req.headers.contains_key("x-amz-checksum-sha1") {
2513                Some("SHA1".to_string())
2514            } else if req.headers.contains_key("x-amz-checksum-sha256") {
2515                Some("SHA256".to_string())
2516            } else {
2517                None
2518            }
2519        });
2520        let checksum_value = checksum_algorithm
2521            .as_deref()
2522            .map(|algo| compute_checksum(algo, &data));
2523
2524        // Object lock: validate that bucket has object lock enabled if lock headers present
2525        let has_lock_headers = req.headers.contains_key("x-amz-object-lock-mode")
2526            || req
2527                .headers
2528                .contains_key("x-amz-object-lock-retain-until-date")
2529            || req.headers.contains_key("x-amz-object-lock-legal-hold");
2530        if has_lock_headers && b.object_lock_config.is_none() {
2531            return Err(AwsServiceError::aws_error(
2532                StatusCode::BAD_REQUEST,
2533                "InvalidRequest",
2534                "Bucket is missing ObjectLockConfiguration",
2535            ));
2536        }
2537
2538        // Object lock - explicit headers or bucket default
2539        let mut lock_mode = req
2540            .headers
2541            .get("x-amz-object-lock-mode")
2542            .and_then(|v| v.to_str().ok())
2543            .map(|s| s.to_string());
2544        let mut lock_retain_until = req
2545            .headers
2546            .get("x-amz-object-lock-retain-until-date")
2547            .and_then(|v| v.to_str().ok())
2548            .and_then(|s| s.parse::<DateTime<Utc>>().ok());
2549        let lock_legal_hold = req
2550            .headers
2551            .get("x-amz-object-lock-legal-hold")
2552            .and_then(|v| v.to_str().ok())
2553            .map(|s| s.to_string());
2554
2555        // Apply bucket default lock if no explicit lock headers
2556        if lock_mode.is_none() && lock_retain_until.is_none() {
2557            if let Some(ref config) = b.object_lock_config {
2558                if let Some(mode) = extract_xml_value(config, "Mode") {
2559                    let days =
2560                        extract_xml_value(config, "Days").and_then(|d| d.parse::<i64>().ok());
2561                    let years =
2562                        extract_xml_value(config, "Years").and_then(|y| y.parse::<i64>().ok());
2563                    let duration = if let Some(d) = days {
2564                        Some(chrono::Duration::days(d))
2565                    } else {
2566                        years.map(|y| chrono::Duration::days(y * 365))
2567                    };
2568                    if let Some(dur) = duration {
2569                        lock_mode = Some(mode);
2570                        lock_retain_until = Some(Utc::now() + dur);
2571                    }
2572                }
2573            }
2574        }
2575
2576        let obj = S3Object {
2577            key: key.to_string(),
2578            size: data.len() as u64,
2579            data,
2580            content_type,
2581            etag: etag.clone(),
2582            last_modified: Utc::now(),
2583            metadata,
2584            storage_class,
2585            tags,
2586            acl_grants,
2587            acl_owner_id: Some(b.acl_owner_id.clone()),
2588            parts_count: None,
2589            part_sizes: None,
2590            sse_algorithm: sse_algorithm.clone(),
2591            sse_kms_key_id: sse_kms_key_id.clone(),
2592            bucket_key_enabled,
2593            version_id: version_id.clone(),
2594            is_delete_marker: false,
2595            content_encoding,
2596            website_redirect_location,
2597            restore_ongoing: None,
2598            restore_expiry: None,
2599            checksum_algorithm: checksum_algorithm.clone(),
2600            checksum_value: checksum_value.clone(),
2601            lock_mode,
2602            lock_retain_until,
2603            lock_legal_hold,
2604        };
2605        if b.versioning.as_deref() == Some("Enabled") {
2606            let versions = b.object_versions.entry(key.to_string()).or_default();
2607            // If the existing current object is a pre-versioning object (no version_id)
2608            // and not yet tracked in object_versions, preserve it.
2609            if versions.is_empty() {
2610                if let Some(existing) = b.objects.get(key) {
2611                    if existing.version_id.is_none() {
2612                        versions.push(existing.clone());
2613                    }
2614                }
2615            }
2616            versions.push(obj.clone());
2617        }
2618        b.objects.insert(key.to_string(), obj);
2619
2620        let mut headers = HeaderMap::new();
2621        headers.insert("etag", format!("\"{etag}\"").parse().unwrap());
2622        if let Some(vid) = &version_id {
2623            headers.insert("x-amz-version-id", vid.parse().unwrap());
2624        }
2625        // Return SSE headers
2626        if let Some(algo) = &sse_algorithm {
2627            headers.insert("x-amz-server-side-encryption", algo.parse().unwrap());
2628        } else {
2629            headers.insert("x-amz-server-side-encryption", "AES256".parse().unwrap());
2630        }
2631        if let Some(kid) = &sse_kms_key_id {
2632            headers.insert(
2633                "x-amz-server-side-encryption-aws-kms-key-id",
2634                kid.parse().unwrap(),
2635            );
2636        }
2637        if bucket_key_enabled == Some(true) {
2638            headers.insert(
2639                "x-amz-server-side-encryption-bucket-key-enabled",
2640                "true".parse().unwrap(),
2641            );
2642        }
2643        // Checksum in response
2644        if let (Some(algo), Some(val)) = (&checksum_algorithm, &checksum_value) {
2645            let header_name = format!("x-amz-checksum-{}", algo.to_lowercase());
2646            if let Ok(name) = header_name.parse::<http::header::HeaderName>() {
2647                if let Ok(hval) = val.parse() {
2648                    headers.insert(name, hval);
2649                }
2650            }
2651            // Echo back the checksum algorithm only when explicitly requested
2652            if explicit_checksum_algo.is_some() {
2653                headers.insert("x-amz-sdk-checksum-algorithm", algo.parse().unwrap());
2654            }
2655        }
2656
2657        // Capture notification config before dropping state lock
2658        let notification_config = b.notification_config.clone();
2659        let obj_size = data_size;
2660        let obj_etag = etag.clone();
2661        let bucket_name = bucket.to_string();
2662        let obj_key = key.to_string();
2663        let region = state.region.clone();
2664        drop(state);
2665
2666        // Deliver S3 event notifications
2667        if let Some(ref config) = notification_config {
2668            deliver_notifications(
2669                &self.delivery,
2670                config,
2671                "ObjectCreated:Put",
2672                &bucket_name,
2673                &obj_key,
2674                obj_size,
2675                &obj_etag,
2676                &region,
2677            );
2678        }
2679
2680        Ok(AwsResponse {
2681            status: StatusCode::OK,
2682            content_type: String::new(),
2683            body: Bytes::new(),
2684            headers,
2685        })
2686    }
2687
2688    fn get_object(
2689        &self,
2690        req: &AwsRequest,
2691        bucket: &str,
2692        key: &str,
2693    ) -> Result<AwsResponse, AwsServiceError> {
2694        let state = self.state.read();
2695        let b = state
2696            .buckets
2697            .get(bucket)
2698            .ok_or_else(|| no_such_bucket(bucket))?;
2699        let obj = resolve_object(b, key, req.query_params.get("versionId"))?;
2700
2701        if obj.is_delete_marker {
2702            return Err(AwsServiceError::aws_error_with_fields(
2703                StatusCode::NOT_FOUND,
2704                "NoSuchKey",
2705                "The specified key does not exist.",
2706                vec![("Key".to_string(), key.to_string())],
2707            ));
2708        }
2709
2710        // Glacier / Deep Archive: cannot GET unless restored
2711        if is_frozen(obj) {
2712            return Err(AwsServiceError::aws_error_with_fields(
2713                StatusCode::FORBIDDEN,
2714                "InvalidObjectState",
2715                "The operation is not valid for the object's storage class",
2716                vec![("StorageClass".to_string(), obj.storage_class.clone())],
2717            ));
2718        }
2719
2720        // Conditional checks
2721        check_get_conditionals(req, obj)?;
2722        let total_size = obj.size as usize;
2723        let mut headers = HeaderMap::new();
2724        headers.insert("etag", format!("\"{}\"", obj.etag).parse().unwrap());
2725        headers.insert(
2726            "last-modified",
2727            obj.last_modified
2728                .format("%a, %d %b %Y %H:%M:%S GMT")
2729                .to_string()
2730                .parse()
2731                .unwrap(),
2732        );
2733        headers.insert("accept-ranges", "bytes".parse().unwrap());
2734        // Always include storage class
2735        headers.insert("x-amz-storage-class", obj.storage_class.parse().unwrap());
2736        if let Some(vid) = &obj.version_id {
2737            headers.insert("x-amz-version-id", vid.parse().unwrap());
2738        }
2739        if let Some(ref enc) = obj.content_encoding {
2740            headers.insert("content-encoding", enc.parse().unwrap());
2741        }
2742        for (k, v) in &obj.metadata {
2743            if let (Ok(name), Ok(val)) = (
2744                format!("x-amz-meta-{k}").parse::<http::header::HeaderName>(),
2745                v.parse::<http::header::HeaderValue>(),
2746            ) {
2747                headers.insert(name, val);
2748            }
2749        }
2750        if let Some(ref redirect) = obj.website_redirect_location {
2751            headers.insert("x-amz-website-redirect-location", redirect.parse().unwrap());
2752        }
2753        if !obj.tags.is_empty() {
2754            headers.insert(
2755                "x-amz-tagging-count",
2756                obj.tags.len().to_string().parse().unwrap(),
2757            );
2758        }
2759
2760        // SSE headers - only when explicitly set
2761        if let Some(algo) = &obj.sse_algorithm {
2762            headers.insert("x-amz-server-side-encryption", algo.parse().unwrap());
2763        }
2764        if let Some(kid) = &obj.sse_kms_key_id {
2765            headers.insert(
2766                "x-amz-server-side-encryption-aws-kms-key-id",
2767                kid.parse().unwrap(),
2768            );
2769        }
2770        if let Some(true) = obj.bucket_key_enabled {
2771            headers.insert(
2772                "x-amz-server-side-encryption-bucket-key-enabled",
2773                "true".parse().unwrap(),
2774            );
2775        }
2776
2777        // Object lock headers
2778        if let Some(ref mode) = obj.lock_mode {
2779            headers.insert("x-amz-object-lock-mode", mode.parse().unwrap());
2780        }
2781        if let Some(ref until) = obj.lock_retain_until {
2782            headers.insert(
2783                "x-amz-object-lock-retain-until-date",
2784                until.to_rfc3339().parse().unwrap(),
2785            );
2786        }
2787        if let Some(ref hold) = obj.lock_legal_hold {
2788            headers.insert("x-amz-object-lock-legal-hold", hold.parse().unwrap());
2789        }
2790        if let Some(ongoing) = obj.restore_ongoing {
2791            let rv = if ongoing {
2792                "ongoing-request=\"true\"".to_string()
2793            } else if let Some(ref exp) = obj.restore_expiry {
2794                format!("ongoing-request=\"false\", expiry-date=\"{exp}\"")
2795            } else {
2796                "ongoing-request=\"false\"".to_string()
2797            };
2798            headers.insert("x-amz-restore", rv.parse().unwrap());
2799        }
2800        let mut response_status = StatusCode::OK;
2801        let response_body;
2802        let mut is_range_request = false;
2803        if let Some(range_str) = req.headers.get("range").and_then(|v| v.to_str().ok()) {
2804            if let Some(rr) = parse_range_header(range_str, total_size) {
2805                match rr {
2806                    RangeResult::Satisfiable { start, end } => {
2807                        headers.insert(
2808                            "content-range",
2809                            format!("bytes {start}-{end}/{total_size}").parse().unwrap(),
2810                        );
2811                        headers.insert(
2812                            "content-length",
2813                            (end - start + 1).to_string().parse().unwrap(),
2814                        );
2815                        response_body = obj.data.slice(start..=end);
2816                        response_status = StatusCode::PARTIAL_CONTENT;
2817                        is_range_request = true;
2818                    }
2819                    RangeResult::NotSatisfiable => {
2820                        return Err(AwsServiceError::aws_error_with_fields(
2821                            StatusCode::RANGE_NOT_SATISFIABLE,
2822                            "InvalidRange",
2823                            "The requested range is not satisfiable",
2824                            vec![
2825                                ("ActualObjectSize".to_string(), total_size.to_string()),
2826                                ("RangeRequested".to_string(), range_str.to_string()),
2827                            ],
2828                        ));
2829                    }
2830                    RangeResult::Ignored => {
2831                        headers.insert("content-length", total_size.to_string().parse().unwrap());
2832                        response_body = obj.data.clone();
2833                    }
2834                }
2835            } else {
2836                headers.insert("content-length", total_size.to_string().parse().unwrap());
2837                response_body = obj.data.clone();
2838            }
2839        } else if let Some(part_num_str) = req.query_params.get("partNumber") {
2840            if let Ok(part_num) = part_num_str.parse::<u32>() {
2841                // Validate part number
2842                let max_parts = obj.parts_count.unwrap_or(1) as usize;
2843                if part_num < 1 || part_num as usize > max_parts {
2844                    return Err(AwsServiceError::aws_error(
2845                        StatusCode::RANGE_NOT_SATISFIABLE,
2846                        "InvalidRange",
2847                        "The requested range is not satisfiable",
2848                    ));
2849                }
2850                let mut part_start: usize = 0;
2851                let mut part_size = total_size;
2852                if let Some(ref part_sizes) = obj.part_sizes {
2853                    let mut offset: usize = 0;
2854                    for &(pn, sz) in part_sizes {
2855                        if pn == part_num {
2856                            part_start = offset;
2857                            part_size = sz as usize;
2858                            break;
2859                        }
2860                        offset += sz as usize;
2861                    }
2862                }
2863                if let Some(pc) = obj.parts_count {
2864                    headers.insert("x-amz-mp-parts-count", pc.to_string().parse().unwrap());
2865                }
2866                let part_end = part_start + part_size - 1;
2867                headers.insert(
2868                    "content-range",
2869                    format!("bytes {part_start}-{part_end}/{total_size}")
2870                        .parse()
2871                        .unwrap(),
2872                );
2873                headers.insert("content-length", part_size.to_string().parse().unwrap());
2874                response_body = obj.data.slice(part_start..part_start + part_size);
2875                response_status = StatusCode::PARTIAL_CONTENT;
2876            } else {
2877                headers.insert("content-length", total_size.to_string().parse().unwrap());
2878                response_body = obj.data.clone();
2879            }
2880        } else {
2881            headers.insert("content-length", total_size.to_string().parse().unwrap());
2882            response_body = obj.data.clone();
2883        }
2884        // Only include checksum headers for full (non-range) responses
2885        if !is_range_request {
2886            if let Some(algo) = &obj.checksum_algorithm {
2887                if let Some(val) = &obj.checksum_value {
2888                    let hn = format!("x-amz-checksum-{}", algo.to_lowercase());
2889                    if let Ok(name) = hn.parse::<http::header::HeaderName>() {
2890                        if let Ok(hv) = val.parse() {
2891                            headers.insert(name, hv);
2892                        }
2893                    }
2894                }
2895            }
2896        }
2897        Ok(AwsResponse {
2898            status: response_status,
2899            content_type: obj.content_type.clone(),
2900            body: response_body,
2901            headers,
2902        })
2903    }
2904
2905    fn delete_object(
2906        &self,
2907        req: &AwsRequest,
2908        bucket: &str,
2909        key: &str,
2910    ) -> Result<AwsResponse, AwsServiceError> {
2911        let if_match = req
2912            .headers
2913            .get("if-match")
2914            .and_then(|v| v.to_str().ok())
2915            .map(|s| s.to_string());
2916        let version_id_param = req.query_params.get("versionId").cloned();
2917
2918        let mut state = self.state.write();
2919        let region = state.region.clone();
2920        let b = state
2921            .buckets
2922            .get_mut(bucket)
2923            .ok_or_else(|| no_such_bucket(bucket))?;
2924
2925        if let Some(ref if_match_val) = if_match {
2926            match b.objects.get(key) {
2927                Some(existing) => {
2928                    let existing_etag = format!("\"{}\"", existing.etag);
2929                    if !etag_matches(if_match_val, &existing_etag) {
2930                        return Err(precondition_failed("If-Match"));
2931                    }
2932                }
2933                None => {
2934                    return Err(no_such_key(key));
2935                }
2936            }
2937        }
2938
2939        let mut resp_headers = HeaderMap::new();
2940        let versioning_enabled = b.versioning.as_deref() == Some("Enabled");
2941
2942        // Delete a specific version
2943        if let Some(ref vid) = version_id_param {
2944            // Check object lock before deleting a specific version
2945            let locked_obj = {
2946                let mut found: Option<&S3Object> = None;
2947                if let Some(versions) = b.object_versions.get(key) {
2948                    found = versions
2949                        .iter()
2950                        .find(|o| o.version_id.as_deref() == Some(vid.as_str()));
2951                }
2952                if found.is_none() {
2953                    if let Some(obj) = b.objects.get(key) {
2954                        let matches = obj.version_id.as_deref() == Some(vid.as_str())
2955                            || (vid == "null" && obj.version_id.is_none());
2956                        if matches {
2957                            found = Some(obj);
2958                        }
2959                    }
2960                }
2961                found.and_then(|obj| {
2962                    if obj.is_delete_marker {
2963                        return None;
2964                    }
2965                    // Legal hold blocks delete
2966                    if obj.lock_legal_hold.as_deref() == Some("ON") {
2967                        return Some("AccessDenied");
2968                    }
2969                    // Retention check
2970                    if let (Some(mode), Some(until)) = (&obj.lock_mode, &obj.lock_retain_until) {
2971                        if *until > Utc::now() {
2972                            if mode == "COMPLIANCE" {
2973                                return Some("AccessDenied");
2974                            }
2975                            if mode == "GOVERNANCE" {
2976                                // Check bypass header
2977                                let bypass = req
2978                                    .headers
2979                                    .get("x-amz-bypass-governance-retention")
2980                                    .and_then(|v| v.to_str().ok())
2981                                    .map(|s| s.eq_ignore_ascii_case("true"))
2982                                    .unwrap_or(false);
2983                                if !bypass {
2984                                    return Some("AccessDenied");
2985                                }
2986                            }
2987                        }
2988                    }
2989                    None
2990                })
2991            };
2992            if let Some(code) = locked_obj {
2993                return Err(AwsServiceError::aws_error(
2994                    StatusCode::FORBIDDEN,
2995                    code,
2996                    "Access Denied",
2997                ));
2998            }
2999
3000            let mut is_dm = false;
3001            if let Some(versions) = b.object_versions.get_mut(key) {
3002                let vid_matches = |o: &S3Object| {
3003                    o.version_id.as_deref() == Some(vid.as_str())
3004                        || (vid == "null" && o.version_id.is_none())
3005                };
3006                is_dm = versions
3007                    .iter()
3008                    .any(|o| vid_matches(o) && o.is_delete_marker);
3009                let len_before = versions.len();
3010                versions.retain(|o| !vid_matches(o));
3011                let removed = len_before != versions.len();
3012                // Only update current object if we actually removed a version
3013                if removed {
3014                    if let Some(latest) = versions.last() {
3015                        if latest.is_delete_marker {
3016                            b.objects.remove(key);
3017                        } else {
3018                            b.objects.insert(key.to_string(), latest.clone());
3019                        }
3020                    } else {
3021                        b.objects.remove(key);
3022                    }
3023                }
3024                if versions.is_empty() {
3025                    b.object_versions.remove(key);
3026                }
3027            } else if let Some(obj) = b.objects.get(key) {
3028                // Match explicit version id, or treat "null" as matching objects with no version
3029                let matches = obj.version_id.as_deref() == Some(vid.as_str())
3030                    || (vid == "null" && obj.version_id.is_none());
3031                if matches {
3032                    is_dm = obj.is_delete_marker;
3033                    b.objects.remove(key);
3034                }
3035            }
3036            resp_headers.insert("x-amz-version-id", vid.parse().unwrap());
3037            if is_dm {
3038                resp_headers.insert("x-amz-delete-marker", "true".parse().unwrap());
3039            }
3040            return Ok(AwsResponse {
3041                status: StatusCode::NO_CONTENT,
3042                content_type: "application/xml".to_string(),
3043                body: Bytes::new(),
3044                headers: resp_headers,
3045            });
3046        }
3047
3048        // Check object lock for non-version-specific deletes on non-versioned buckets
3049        if !versioning_enabled {
3050            if let Some(existing) = b.objects.get(key) {
3051                if !existing.is_delete_marker {
3052                    if let Some(code) = check_object_lock_for_overwrite(existing, req) {
3053                        return Err(AwsServiceError::aws_error(
3054                            StatusCode::FORBIDDEN,
3055                            code,
3056                            "Access Denied",
3057                        ));
3058                    }
3059                }
3060            }
3061        }
3062
3063        // Versioned bucket: create a delete marker
3064        if versioning_enabled {
3065            // If the existing object was created before versioning, preserve it
3066            if !b.object_versions.contains_key(key) {
3067                if let Some(existing) = b.objects.get(key) {
3068                    let mut preserved = existing.clone();
3069                    if preserved.version_id.is_none() {
3070                        preserved.version_id = Some("null".to_string());
3071                    }
3072                    b.object_versions
3073                        .entry(key.to_string())
3074                        .or_default()
3075                        .push(preserved);
3076                }
3077            }
3078            let dm_id = Uuid::new_v4().to_string();
3079            let marker = make_delete_marker(key, &dm_id);
3080            b.object_versions
3081                .entry(key.to_string())
3082                .or_default()
3083                .push(marker.clone());
3084            b.objects.insert(key.to_string(), marker);
3085            resp_headers.insert("x-amz-version-id", dm_id.parse().unwrap());
3086            resp_headers.insert("x-amz-delete-marker", "true".parse().unwrap());
3087
3088            // Notification for delete
3089            let notification_config = b.notification_config.clone();
3090            let bucket_name = bucket.to_string();
3091            let obj_key = key.to_string();
3092            let region = region.clone();
3093            drop(state);
3094            if let Some(ref config) = notification_config {
3095                deliver_notifications(
3096                    &self.delivery,
3097                    config,
3098                    "ObjectRemoved:DeleteMarkerCreated",
3099                    &bucket_name,
3100                    &obj_key,
3101                    0,
3102                    "",
3103                    &region,
3104                );
3105            }
3106
3107            return Ok(AwsResponse {
3108                status: StatusCode::NO_CONTENT,
3109                content_type: "application/xml".to_string(),
3110                body: Bytes::new(),
3111                headers: resp_headers,
3112            });
3113        }
3114
3115        // Capture notification config before removing
3116        let notification_config = b.notification_config.clone();
3117        let bucket_name = bucket.to_string();
3118        let obj_key = key.to_string();
3119
3120        b.objects.remove(key);
3121        drop(state);
3122
3123        // Deliver S3 event notifications
3124        if let Some(ref config) = notification_config {
3125            deliver_notifications(
3126                &self.delivery,
3127                config,
3128                "ObjectRemoved:Delete",
3129                &bucket_name,
3130                &obj_key,
3131                0,
3132                "",
3133                &region,
3134            );
3135        }
3136
3137        Ok(AwsResponse {
3138            status: StatusCode::NO_CONTENT,
3139            content_type: "application/xml".to_string(),
3140            body: Bytes::new(),
3141            headers: HeaderMap::new(),
3142        })
3143    }
3144
3145    fn head_object(
3146        &self,
3147        req: &AwsRequest,
3148        bucket: &str,
3149        key: &str,
3150    ) -> Result<AwsResponse, AwsServiceError> {
3151        let state = self.state.read();
3152        let b = state
3153            .buckets
3154            .get(bucket)
3155            .ok_or_else(|| no_such_bucket(bucket))?;
3156        let obj = resolve_object(b, key, req.query_params.get("versionId"))?;
3157        if obj.is_delete_marker {
3158            if req.query_params.contains_key("versionId") {
3159                let mut headers = HeaderMap::new();
3160                headers.insert("x-amz-delete-marker", "true".parse().unwrap());
3161                headers.insert("allow", "DELETE".parse().unwrap());
3162                if let Some(vid) = &obj.version_id {
3163                    headers.insert("x-amz-version-id", vid.parse().unwrap());
3164                }
3165                return Ok(AwsResponse {
3166                    status: StatusCode::METHOD_NOT_ALLOWED,
3167                    content_type: "application/xml".to_string(),
3168                    body: Bytes::new(),
3169                    headers,
3170                });
3171            }
3172            let mut headers = HeaderMap::new();
3173            headers.insert("x-amz-delete-marker", "true".parse().unwrap());
3174            if let Some(vid) = &obj.version_id {
3175                headers.insert("x-amz-version-id", vid.parse().unwrap());
3176            }
3177            return Ok(AwsResponse {
3178                status: StatusCode::NOT_FOUND,
3179                content_type: "application/xml".to_string(),
3180                body: Bytes::new(),
3181                headers,
3182            });
3183        }
3184
3185        // Conditional checks for HEAD
3186        check_head_conditionals(req, obj)?;
3187        let total_size = obj.size;
3188        let mut response_status = StatusCode::OK;
3189        let mut headers = HeaderMap::new();
3190        headers.insert("etag", format!("\"{}\"", obj.etag).parse().unwrap());
3191        headers.insert(
3192            "last-modified",
3193            obj.last_modified
3194                .format("%a, %d %b %Y %H:%M:%S GMT")
3195                .to_string()
3196                .parse()
3197                .unwrap(),
3198        );
3199        headers.insert("accept-ranges", "bytes".parse().unwrap());
3200        headers.insert("x-amz-storage-class", obj.storage_class.parse().unwrap());
3201        if let Some(ref enc) = obj.content_encoding {
3202            headers.insert("content-encoding", enc.parse().unwrap());
3203        }
3204        if let Some(range_str) = req.headers.get("range").and_then(|v| v.to_str().ok()) {
3205            if let Some(range_result) = parse_range_header(range_str, total_size as usize) {
3206                match range_result {
3207                    RangeResult::Satisfiable { start, end } => {
3208                        headers.insert(
3209                            "content-range",
3210                            format!("bytes {start}-{end}/{total_size}").parse().unwrap(),
3211                        );
3212                        headers.insert(
3213                            "content-length",
3214                            (end - start + 1).to_string().parse().unwrap(),
3215                        );
3216                        response_status = StatusCode::PARTIAL_CONTENT;
3217                    }
3218                    RangeResult::NotSatisfiable => {
3219                        return Err(AwsServiceError::aws_error(
3220                            StatusCode::RANGE_NOT_SATISFIABLE,
3221                            "InvalidRange",
3222                            "The requested range is not satisfiable",
3223                        ));
3224                    }
3225                    RangeResult::Ignored => {
3226                        headers.insert("content-length", total_size.to_string().parse().unwrap());
3227                    }
3228                }
3229            } else {
3230                headers.insert("content-length", total_size.to_string().parse().unwrap());
3231            }
3232        } else if let Some(part_num_str) = req.query_params.get("partNumber") {
3233            if let Ok(part_num) = part_num_str.parse::<u32>() {
3234                // Validate part number
3235                let max_parts = obj.parts_count.unwrap_or(1);
3236                if part_num < 1 || part_num > max_parts {
3237                    return Err(AwsServiceError::aws_error(
3238                        StatusCode::RANGE_NOT_SATISFIABLE,
3239                        "InvalidRange",
3240                        "The requested range is not satisfiable",
3241                    ));
3242                }
3243                let mut part_start: u64 = 0;
3244                let mut part_size = total_size;
3245                if let Some(ref part_sizes) = obj.part_sizes {
3246                    let mut offset: u64 = 0;
3247                    for &(pn, sz) in part_sizes {
3248                        if pn == part_num {
3249                            part_start = offset;
3250                            part_size = sz;
3251                            break;
3252                        }
3253                        offset += sz;
3254                    }
3255                }
3256                if let Some(pc) = obj.parts_count {
3257                    headers.insert("x-amz-mp-parts-count", pc.to_string().parse().unwrap());
3258                }
3259                let part_end = part_start + part_size - 1;
3260                headers.insert(
3261                    "content-range",
3262                    format!("bytes {part_start}-{part_end}/{total_size}")
3263                        .parse()
3264                        .unwrap(),
3265                );
3266                headers.insert("content-length", part_size.to_string().parse().unwrap());
3267                response_status = StatusCode::PARTIAL_CONTENT;
3268            } else {
3269                headers.insert("content-length", total_size.to_string().parse().unwrap());
3270            }
3271        } else {
3272            headers.insert("content-length", total_size.to_string().parse().unwrap());
3273        }
3274        for (k, v) in &obj.metadata {
3275            if let (Ok(name), Ok(val)) = (
3276                format!("x-amz-meta-{k}").parse::<http::header::HeaderName>(),
3277                v.parse::<http::header::HeaderValue>(),
3278            ) {
3279                headers.insert(name, val);
3280            }
3281        }
3282        if let Some(ref redirect) = obj.website_redirect_location {
3283            headers.insert("x-amz-website-redirect-location", redirect.parse().unwrap());
3284        }
3285
3286        if let Some(vid) = &obj.version_id {
3287            headers.insert("x-amz-version-id", vid.parse().unwrap());
3288        }
3289
3290        // SSE headers
3291        if let Some(algo) = &obj.sse_algorithm {
3292            headers.insert("x-amz-server-side-encryption", algo.parse().unwrap());
3293        }
3294        if let Some(kid) = &obj.sse_kms_key_id {
3295            headers.insert(
3296                "x-amz-server-side-encryption-aws-kms-key-id",
3297                kid.parse().unwrap(),
3298            );
3299        }
3300        if let Some(true) = obj.bucket_key_enabled {
3301            headers.insert(
3302                "x-amz-server-side-encryption-bucket-key-enabled",
3303                "true".parse().unwrap(),
3304            );
3305        }
3306
3307        // Object lock headers
3308        if let Some(ref mode) = obj.lock_mode {
3309            headers.insert("x-amz-object-lock-mode", mode.parse().unwrap());
3310        }
3311        if let Some(ref until) = obj.lock_retain_until {
3312            headers.insert(
3313                "x-amz-object-lock-retain-until-date",
3314                until.to_rfc3339().parse().unwrap(),
3315            );
3316        }
3317        if let Some(ref hold) = obj.lock_legal_hold {
3318            headers.insert("x-amz-object-lock-legal-hold", hold.parse().unwrap());
3319        }
3320        if let Some(ongoing) = obj.restore_ongoing {
3321            let restore_val = if ongoing {
3322                "ongoing-request=\"true\"".to_string()
3323            } else if let Some(ref expiry) = obj.restore_expiry {
3324                format!("ongoing-request=\"false\", expiry-date=\"{expiry}\"")
3325            } else {
3326                "ongoing-request=\"false\"".to_string()
3327            };
3328            headers.insert("x-amz-restore", restore_val.parse().unwrap());
3329        }
3330        // Checksum headers (returned when ChecksumMode=ENABLED or always if set)
3331        if let Some(algo) = &obj.checksum_algorithm {
3332            if let Some(val) = &obj.checksum_value {
3333                let hn = format!("x-amz-checksum-{}", algo.to_lowercase());
3334                if let Ok(name) = hn.parse::<http::header::HeaderName>() {
3335                    if let Ok(hv) = val.parse() {
3336                        headers.insert(name, hv);
3337                    }
3338                }
3339            }
3340        }
3341
3342        Ok(AwsResponse {
3343            status: response_status,
3344            content_type: obj.content_type.clone(),
3345            body: Bytes::new(),
3346            headers,
3347        })
3348    }
3349
3350    fn copy_object(
3351        &self,
3352        req: &AwsRequest,
3353        dest_bucket: &str,
3354        dest_key: &str,
3355    ) -> Result<AwsResponse, AwsServiceError> {
3356        let copy_source = req
3357            .headers
3358            .get("x-amz-copy-source")
3359            .and_then(|v| v.to_str().ok())
3360            .ok_or_else(|| {
3361                AwsServiceError::aws_error(
3362                    StatusCode::BAD_REQUEST,
3363                    "InvalidArgument",
3364                    "x-amz-copy-source header is required",
3365                )
3366            })?;
3367
3368        // Split on '?' BEFORE percent-decoding so keys containing literal '?' are preserved
3369        let raw_source = copy_source.strip_prefix('/').unwrap_or(copy_source);
3370        let (raw_path, src_version_id) = if let Some((path, query)) = raw_source.split_once('?') {
3371            let vid = query
3372                .split('&')
3373                .find_map(|p| p.strip_prefix("versionId="))
3374                .map(|s| s.to_string());
3375            (path, vid)
3376        } else {
3377            (raw_source, None)
3378        };
3379        let decoded_path = percent_encoding::percent_decode_str(raw_path)
3380            .decode_utf8_lossy()
3381            .to_string();
3382
3383        let (src_bucket, src_key) = decoded_path.split_once('/').ok_or_else(|| {
3384            AwsServiceError::aws_error(
3385                StatusCode::BAD_REQUEST,
3386                "InvalidArgument",
3387                "Invalid copy source format",
3388            )
3389        })?;
3390
3391        let metadata_directive = req
3392            .headers
3393            .get("x-amz-metadata-directive")
3394            .and_then(|v| v.to_str().ok())
3395            .unwrap_or("COPY");
3396
3397        let storage_class = req
3398            .headers
3399            .get("x-amz-storage-class")
3400            .and_then(|v| v.to_str().ok())
3401            .map(|s| s.to_string());
3402
3403        // Validate storage class if explicitly provided
3404        if let Some(ref sc) = storage_class {
3405            if !is_valid_storage_class(sc) {
3406                return Err(AwsServiceError::aws_error(
3407                    StatusCode::BAD_REQUEST,
3408                    "InvalidStorageClass",
3409                    "The storage class you specified is not valid",
3410                ));
3411            }
3412        }
3413
3414        let tagging_directive = req
3415            .headers
3416            .get("x-amz-tagging-directive")
3417            .and_then(|v| v.to_str().ok())
3418            .unwrap_or("COPY");
3419
3420        let sse_algorithm = req
3421            .headers
3422            .get("x-amz-server-side-encryption")
3423            .and_then(|v| v.to_str().ok())
3424            .map(|s| s.to_string());
3425
3426        let sse_kms_key_id = req
3427            .headers
3428            .get("x-amz-server-side-encryption-aws-kms-key-id")
3429            .and_then(|v| v.to_str().ok())
3430            .map(|s| s.to_string());
3431
3432        let bucket_key_enabled = req
3433            .headers
3434            .get("x-amz-server-side-encryption-bucket-key-enabled")
3435            .and_then(|v| v.to_str().ok())
3436            .map(|s| s.eq_ignore_ascii_case("true"));
3437
3438        let website_redirect = req
3439            .headers
3440            .get("x-amz-website-redirect-location")
3441            .and_then(|v| v.to_str().ok())
3442            .map(|s| s.to_string());
3443
3444        let if_none_match = req
3445            .headers
3446            .get("x-amz-copy-source-if-none-match")
3447            .and_then(|v| v.to_str().ok())
3448            .map(|s| s.to_string());
3449
3450        let checksum_algorithm = req
3451            .headers
3452            .get("x-amz-checksum-algorithm")
3453            .or_else(|| req.headers.get("x-amz-sdk-checksum-algorithm"))
3454            .and_then(|v| v.to_str().ok())
3455            .map(|s| s.to_uppercase());
3456
3457        let mut state = self.state.write();
3458
3459        // Resolve source object, possibly a specific version
3460        let (src_obj, src_version_id_actual) = {
3461            let sb = state
3462                .buckets
3463                .get(src_bucket)
3464                .ok_or_else(|| no_such_bucket(src_bucket))?;
3465            let obj = resolve_object(sb, src_key, src_version_id.as_ref())?.clone();
3466            (obj.clone(), obj.version_id.clone())
3467        };
3468
3469        // Delete markers cannot be used as copy source
3470        if src_obj.is_delete_marker {
3471            return Err(no_such_key(src_key));
3472        }
3473
3474        // Glacier/Deep Archive: cannot copy unless restored
3475        if is_frozen(&src_obj) {
3476            return Err(AwsServiceError::aws_error(
3477                StatusCode::FORBIDDEN,
3478                "ObjectNotInActiveTierError",
3479                "The source object of the COPY action is not in the active tier and is at the \
3480                 storage class type that does not support the COPY action.",
3481            ));
3482        }
3483
3484        if let Some(ref inm) = if_none_match {
3485            let src_etag = format!("\"{}\"", src_obj.etag);
3486            if etag_matches(inm, &src_etag) {
3487                return Err(AwsServiceError::aws_error_with_fields(
3488                    StatusCode::PRECONDITION_FAILED,
3489                    "PreconditionFailed",
3490                    "At least one of the pre-conditions you specified did not hold",
3491                    vec![(
3492                        "Condition".to_string(),
3493                        "x-amz-copy-source-If-None-Match".to_string(),
3494                    )],
3495                ));
3496            }
3497        }
3498
3499        // Check copy-in-place validity
3500        let has_version_id = src_version_id.is_some();
3501        if src_bucket == dest_bucket
3502            && src_key == dest_key
3503            && metadata_directive == "COPY"
3504            && storage_class.is_none()
3505            && sse_algorithm.is_none()
3506            && website_redirect.is_none()
3507            && !has_version_id
3508        {
3509            // Check if bucket encryption would make this a valid copy-in-place
3510            let sb = state
3511                .buckets
3512                .get(src_bucket)
3513                .ok_or_else(|| no_such_bucket(src_bucket))?;
3514            let has_bucket_encryption = sb.encryption_config.is_some();
3515            if !has_bucket_encryption {
3516                return Err(AwsServiceError::aws_error(
3517                    StatusCode::BAD_REQUEST,
3518                    "InvalidRequest",
3519                    "This copy request is illegal because it is trying to copy an object to itself \
3520                     without changing the object's metadata, storage class, website redirect location \
3521                     or encryption attributes.",
3522                ));
3523            }
3524        }
3525
3526        let etag = src_obj.etag.clone();
3527        let src_obj_size = src_obj.size;
3528        let last_modified = Utc::now();
3529
3530        let new_metadata = if metadata_directive == "REPLACE" {
3531            extract_user_metadata(&req.headers)
3532        } else {
3533            src_obj.metadata.clone()
3534        };
3535
3536        let new_content_type = if metadata_directive == "REPLACE" {
3537            req.headers
3538                .get("content-type")
3539                .and_then(|v| v.to_str().ok())
3540                .unwrap_or(&src_obj.content_type)
3541                .to_string()
3542        } else {
3543            src_obj.content_type.clone()
3544        };
3545
3546        let new_storage_class = storage_class.unwrap_or_else(|| "STANDARD".to_string());
3547
3548        let new_tags = if tagging_directive == "REPLACE" {
3549            let th = req
3550                .headers
3551                .get("x-amz-tagging")
3552                .and_then(|v| v.to_str().ok())
3553                .unwrap_or("");
3554            let tags = parse_url_encoded_tags(th);
3555            // Validate aws: prefix
3556            for (k, _) in &tags {
3557                if k.starts_with("aws:") {
3558                    return Err(AwsServiceError::aws_error(
3559                        StatusCode::BAD_REQUEST,
3560                        "InvalidTag",
3561                        "Your TagKey cannot be prefixed with aws:",
3562                    ));
3563                }
3564            }
3565            tags.into_iter().collect()
3566        } else {
3567            src_obj.tags.clone()
3568        };
3569
3570        // Determine bucket default encryption
3571        let dest_bucket_encryption = state
3572            .buckets
3573            .get(dest_bucket)
3574            .and_then(|b| b.encryption_config.as_ref())
3575            .and_then(|config| {
3576                if config.contains("AES256") {
3577                    Some("AES256".to_string())
3578                } else if config.contains("aws:kms") {
3579                    Some("aws:kms".to_string())
3580                } else {
3581                    None
3582                }
3583            });
3584
3585        // For SSE: if explicitly set, use new values; if copy-in-place changed SSE, use new;
3586        // otherwise fall back based on source or bucket default
3587        let new_sse = if sse_algorithm.is_some() {
3588            sse_algorithm
3589        } else if src_bucket == dest_bucket && src_key == dest_key {
3590            // Copy-in-place without SSE specified: if source had non-AES256 SSE, default to AES256
3591            if src_obj.sse_algorithm.is_some() && src_obj.sse_algorithm.as_deref() != Some("AES256")
3592            {
3593                Some("AES256".to_string())
3594            } else if src_obj.sse_algorithm.is_some() {
3595                src_obj.sse_algorithm.clone()
3596            } else {
3597                // Use bucket default encryption if available
3598                dest_bucket_encryption.clone()
3599            }
3600        } else {
3601            // For cross-key copy, use bucket default encryption if no explicit SSE
3602            dest_bucket_encryption.clone()
3603        };
3604
3605        let new_kms = if sse_kms_key_id.is_some() {
3606            sse_kms_key_id
3607        } else {
3608            None
3609        };
3610        let new_bke = bucket_key_enabled; // Only set if explicitly provided
3611        let new_redirect = website_redirect.or_else(|| {
3612            if metadata_directive == "COPY" {
3613                src_obj.website_redirect_location.clone()
3614            } else {
3615                None
3616            }
3617        });
3618
3619        // Checksum: compute new if algorithm specified, or copy from source
3620        let (new_checksum_algo, new_checksum_val) = if let Some(ref algo) = checksum_algorithm {
3621            let val = compute_checksum(algo, &src_obj.data);
3622            (Some(algo.clone()), Some(val))
3623        } else if src_obj.checksum_algorithm.is_some() {
3624            (
3625                src_obj.checksum_algorithm.clone(),
3626                src_obj.checksum_value.clone(),
3627            )
3628        } else {
3629            (None, None)
3630        };
3631
3632        let db = state
3633            .buckets
3634            .get_mut(dest_bucket)
3635            .ok_or_else(|| no_such_bucket(dest_bucket))?;
3636
3637        let version_id = if db.versioning.as_deref() == Some("Enabled") {
3638            Some(uuid::Uuid::new_v4().to_string())
3639        } else {
3640            None
3641        };
3642
3643        // Default ACL for destination (not copied from source)
3644        let dest_acl_grants = vec![AclGrant {
3645            grantee_type: "CanonicalUser".to_string(),
3646            grantee_id: Some(db.acl_owner_id.clone()),
3647            grantee_display_name: Some(db.acl_owner_id.clone()),
3648            grantee_uri: None,
3649            permission: "FULL_CONTROL".to_string(),
3650        }];
3651
3652        let dest_obj = S3Object {
3653            key: dest_key.to_string(),
3654            data: src_obj.data,
3655            size: src_obj.size,
3656            etag: etag.clone(),
3657            last_modified,
3658            content_type: new_content_type,
3659            metadata: new_metadata,
3660            storage_class: new_storage_class,
3661            tags: new_tags,
3662            acl_grants: dest_acl_grants,
3663            acl_owner_id: Some(db.acl_owner_id.clone()),
3664            parts_count: src_obj.parts_count,
3665            part_sizes: src_obj.part_sizes,
3666            sse_algorithm: new_sse.clone(),
3667            sse_kms_key_id: new_kms.clone(),
3668            bucket_key_enabled: new_bke,
3669            version_id: version_id.clone(),
3670            is_delete_marker: false,
3671            content_encoding: src_obj.content_encoding,
3672            website_redirect_location: new_redirect,
3673            restore_ongoing: None,
3674            restore_expiry: None,
3675            checksum_algorithm: new_checksum_algo.clone(),
3676            checksum_value: new_checksum_val.clone(),
3677            // Do not copy lock from source
3678            lock_mode: None,
3679            lock_retain_until: None,
3680            lock_legal_hold: None,
3681        };
3682
3683        // Store in version history if versioning enabled
3684        if db.versioning.as_deref() == Some("Enabled") {
3685            db.object_versions
3686                .entry(dest_key.to_string())
3687                .or_default()
3688                .push(dest_obj.clone());
3689        }
3690        db.objects.insert(dest_key.to_string(), dest_obj);
3691
3692        let mut response_headers = HeaderMap::new();
3693        if let Some(vid) = &version_id {
3694            response_headers.insert("x-amz-version-id", vid.parse().unwrap());
3695        }
3696        if let Some(ref svid) = src_version_id_actual {
3697            response_headers.insert("x-amz-copy-source-version-id", svid.parse().unwrap());
3698        }
3699        // SSE headers in copy response
3700        if let Some(ref algo) = new_sse {
3701            response_headers.insert("x-amz-server-side-encryption", algo.parse().unwrap());
3702        } else {
3703            response_headers.insert("x-amz-server-side-encryption", "AES256".parse().unwrap());
3704        }
3705        if let Some(ref kid) = new_kms {
3706            response_headers.insert(
3707                "x-amz-server-side-encryption-aws-kms-key-id",
3708                kid.parse().unwrap(),
3709            );
3710        }
3711        if new_bke == Some(true) {
3712            response_headers.insert(
3713                "x-amz-server-side-encryption-bucket-key-enabled",
3714                "true".parse().unwrap(),
3715            );
3716        }
3717
3718        // Build checksum XML if present
3719        let checksum_xml = if let (Some(algo), Some(val)) = (&new_checksum_algo, &new_checksum_val)
3720        {
3721            format!("<Checksum{algo}>{val}</Checksum{algo}>")
3722        } else {
3723            String::new()
3724        };
3725
3726        // Capture notification config before dropping lock
3727        let notification_config = db.notification_config.clone();
3728        let copy_size = src_obj_size;
3729        let copy_etag = etag.clone();
3730        let copy_bucket = dest_bucket.to_string();
3731        let copy_key = dest_key.to_string();
3732        let region = state.region.clone();
3733        drop(state);
3734
3735        let body = format!(
3736            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
3737             <CopyObjectResult>\
3738             <ETag>&quot;{etag}&quot;</ETag>\
3739             <LastModified>{}</LastModified>\
3740             {checksum_xml}\
3741             </CopyObjectResult>",
3742            last_modified.format("%Y-%m-%dT%H:%M:%S%.3fZ"),
3743        );
3744
3745        // Deliver S3 event notifications
3746        if let Some(ref config) = notification_config {
3747            deliver_notifications(
3748                &self.delivery,
3749                config,
3750                "ObjectCreated:Copy",
3751                &copy_bucket,
3752                &copy_key,
3753                copy_size,
3754                &copy_etag,
3755                &region,
3756            );
3757        }
3758
3759        Ok(AwsResponse {
3760            status: StatusCode::OK,
3761            content_type: "application/xml".to_string(),
3762            body: body.into(),
3763            headers: response_headers,
3764        })
3765    }
3766
3767    fn delete_objects(
3768        &self,
3769        req: &AwsRequest,
3770        bucket: &str,
3771    ) -> Result<AwsResponse, AwsServiceError> {
3772        let body_str = std::str::from_utf8(&req.body).unwrap_or("");
3773        let entries = parse_delete_objects_xml(body_str);
3774
3775        if entries.is_empty() {
3776            return Err(AwsServiceError::aws_error(
3777                StatusCode::BAD_REQUEST,
3778                "MalformedXML",
3779                "The XML you provided was not well-formed or did not validate against our published schema",
3780            ));
3781        }
3782
3783        let mut state = self.state.write();
3784        let b = state
3785            .buckets
3786            .get_mut(bucket)
3787            .ok_or_else(|| no_such_bucket(bucket))?;
3788
3789        let bypass = req
3790            .headers
3791            .get("x-amz-bypass-governance-retention")
3792            .and_then(|v| v.to_str().ok())
3793            .map(|s| s.eq_ignore_ascii_case("true"))
3794            .unwrap_or(false);
3795
3796        let versioning_enabled = b.versioning.as_deref() == Some("Enabled");
3797        let mut deleted_xml = String::new();
3798        let mut error_xml = String::new();
3799        for entry in &entries {
3800            let key = &entry.key;
3801            if let Some(ref vid) = entry.version_id {
3802                // Check lock before deleting specific version
3803                let lock_denied = {
3804                    let obj_opt = b
3805                        .object_versions
3806                        .get(key)
3807                        .and_then(|vs| {
3808                            vs.iter()
3809                                .find(|o| o.version_id.as_deref() == Some(vid.as_str()))
3810                        })
3811                        .or_else(|| {
3812                            b.objects.get(key).filter(|o| {
3813                                o.version_id.as_deref() == Some(vid.as_str())
3814                                    || (vid == "null" && o.version_id.is_none())
3815                            })
3816                        });
3817                    if let Some(obj) = obj_opt {
3818                        if obj.is_delete_marker {
3819                            false
3820                        } else if obj.lock_legal_hold.as_deref() == Some("ON") {
3821                            true
3822                        } else if let (Some(mode), Some(until)) =
3823                            (&obj.lock_mode, &obj.lock_retain_until)
3824                        {
3825                            if *until > Utc::now() {
3826                                if mode == "COMPLIANCE" {
3827                                    true
3828                                } else if mode == "GOVERNANCE" {
3829                                    !bypass
3830                                } else {
3831                                    false
3832                                }
3833                            } else {
3834                                false
3835                            }
3836                        } else {
3837                            false
3838                        }
3839                    } else {
3840                        false
3841                    }
3842                };
3843
3844                if lock_denied {
3845                    error_xml.push_str(&format!(
3846                        "<Error><Key>{}</Key><VersionId>{}</VersionId><Code>AccessDenied</Code><Message>Access Denied because object protected by object lock.</Message></Error>",
3847                        xml_escape(key),
3848                        xml_escape(vid),
3849                    ));
3850                    continue;
3851                }
3852
3853                // Delete specific version
3854                if let Some(versions) = b.object_versions.get_mut(key) {
3855                    versions.retain(|o| {
3856                        !(o.version_id.as_deref() == Some(vid)
3857                            || (vid == "null" && o.version_id.is_none()))
3858                    });
3859                    if let Some(latest) = versions.last() {
3860                        if latest.is_delete_marker {
3861                            b.objects.remove(key);
3862                        } else {
3863                            b.objects.insert(key.to_string(), latest.clone());
3864                        }
3865                    } else {
3866                        b.objects.remove(key);
3867                    }
3868                    if versions.is_empty() {
3869                        b.object_versions.remove(key);
3870                    }
3871                }
3872                deleted_xml.push_str(&format!(
3873                    "<Deleted><Key>{}</Key><VersionId>{}</VersionId></Deleted>",
3874                    xml_escape(key),
3875                    xml_escape(vid),
3876                ));
3877            } else if versioning_enabled {
3878                let dm_id = Uuid::new_v4().to_string();
3879                let marker = make_delete_marker(key, &dm_id);
3880                b.object_versions
3881                    .entry(key.to_string())
3882                    .or_default()
3883                    .push(marker.clone());
3884                b.objects.insert(key.to_string(), marker);
3885                deleted_xml.push_str(&format!(
3886                    "<Deleted><Key>{}</Key><DeleteMarker>true</DeleteMarker><DeleteMarkerVersionId>{}</DeleteMarkerVersionId></Deleted>",
3887                    xml_escape(key), dm_id,
3888                ));
3889            } else {
3890                b.objects.remove(key);
3891                deleted_xml.push_str(&format!(
3892                    "<Deleted><Key>{}</Key></Deleted>",
3893                    xml_escape(key)
3894                ));
3895            }
3896        }
3897
3898        let body = format!(
3899            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
3900             <DeleteResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
3901             {deleted_xml}\
3902             {error_xml}\
3903             </DeleteResult>"
3904        );
3905        Ok(s3_xml(StatusCode::OK, body))
3906    }
3907
3908    // ---- Object ACL ----
3909
3910    fn get_object_acl(
3911        &self,
3912        req: &AwsRequest,
3913        bucket: &str,
3914        key: &str,
3915    ) -> Result<AwsResponse, AwsServiceError> {
3916        let state = self.state.read();
3917        let b = state
3918            .buckets
3919            .get(bucket)
3920            .ok_or_else(|| no_such_bucket(bucket))?;
3921        let obj = b.objects.get(key).ok_or_else(|| no_such_key(key))?;
3922
3923        let owner_id = obj.acl_owner_id.as_deref().unwrap_or(&req.account_id);
3924        let body = build_acl_xml(owner_id, &obj.acl_grants, &req.account_id);
3925        Ok(s3_xml(StatusCode::OK, body))
3926    }
3927
3928    fn put_object_acl(
3929        &self,
3930        req: &AwsRequest,
3931        bucket: &str,
3932        key: &str,
3933    ) -> Result<AwsResponse, AwsServiceError> {
3934        let canned = req
3935            .headers
3936            .get("x-amz-acl")
3937            .and_then(|v| v.to_str().ok())
3938            .map(|s| s.to_string());
3939
3940        let mut state = self.state.write();
3941        let b = state
3942            .buckets
3943            .get_mut(bucket)
3944            .ok_or_else(|| no_such_bucket(bucket))?;
3945        let owner_id = b.acl_owner_id.clone();
3946        let obj = b.objects.get_mut(key).ok_or_else(|| no_such_key(key))?;
3947
3948        if let Some(acl) = canned {
3949            obj.acl_grants = canned_acl_grants_for_object(&acl, &owner_id);
3950        } else {
3951            // Check for grant headers
3952            let has_grant_headers = req.headers.keys().any(|k| {
3953                let name = k.as_str();
3954                name.starts_with("x-amz-grant-")
3955            });
3956            if has_grant_headers {
3957                obj.acl_grants = parse_grant_headers(&req.headers);
3958            } else {
3959                // Parse from body
3960                let body_str = std::str::from_utf8(&req.body).unwrap_or("");
3961                if !body_str.is_empty() {
3962                    let grants = parse_acl_xml(body_str)?;
3963                    obj.acl_grants = grants;
3964                }
3965            }
3966        }
3967
3968        Ok(AwsResponse {
3969            status: StatusCode::OK,
3970            content_type: "application/xml".to_string(),
3971            body: Bytes::new(),
3972            headers: HeaderMap::new(),
3973        })
3974    }
3975
3976    // ---- Object Tagging ----
3977
3978    fn get_object_tagging(
3979        &self,
3980        _req: &AwsRequest,
3981        bucket: &str,
3982        key: &str,
3983    ) -> Result<AwsResponse, AwsServiceError> {
3984        let state = self.state.read();
3985        let b = state
3986            .buckets
3987            .get(bucket)
3988            .ok_or_else(|| no_such_bucket(bucket))?;
3989        let obj = b.objects.get(key).ok_or_else(|| no_such_key(key))?;
3990
3991        let mut tags_xml = String::new();
3992        for (k, v) in &obj.tags {
3993            tags_xml.push_str(&format!(
3994                "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
3995                xml_escape(k),
3996                xml_escape(v),
3997            ));
3998        }
3999        let body = format!(
4000            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
4001             <Tagging xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
4002             <TagSet>{tags_xml}</TagSet></Tagging>"
4003        );
4004        Ok(s3_xml(StatusCode::OK, body))
4005    }
4006
4007    fn put_object_tagging(
4008        &self,
4009        req: &AwsRequest,
4010        bucket: &str,
4011        key: &str,
4012    ) -> Result<AwsResponse, AwsServiceError> {
4013        let body_str = std::str::from_utf8(&req.body).unwrap_or("");
4014        let tags = parse_tagging_xml(body_str);
4015
4016        // Validate: no aws: prefix
4017        for (k, _) in &tags {
4018            if k.starts_with("aws:") {
4019                return Err(AwsServiceError::aws_error(
4020                    StatusCode::BAD_REQUEST,
4021                    "InvalidTag",
4022                    "System tags cannot be added/updated by requester",
4023                ));
4024            }
4025        }
4026
4027        // Validate: max 10 tags
4028        if tags.len() > 10 {
4029            return Err(AwsServiceError::aws_error(
4030                StatusCode::BAD_REQUEST,
4031                "BadRequest",
4032                "Object tags cannot be greater than 10",
4033            ));
4034        }
4035
4036        let version_id = req.query_params.get("versionId").map(|s| s.to_string());
4037
4038        let mut state = self.state.write();
4039        let b = state
4040            .buckets
4041            .get_mut(bucket)
4042            .ok_or_else(|| no_such_bucket(bucket))?;
4043
4044        let mut response_headers = HeaderMap::new();
4045
4046        if let Some(ref vid) = version_id {
4047            // Version-specific tagging
4048            let mut found = false;
4049
4050            // Check versioned objects
4051            if let Some(versions) = b.object_versions.get_mut(key) {
4052                if let Some(obj) = versions
4053                    .iter_mut()
4054                    .find(|o| o.version_id.as_deref() == Some(vid.as_str()))
4055                {
4056                    if obj.is_delete_marker {
4057                        return Err(AwsServiceError::aws_error_with_fields(
4058                            StatusCode::METHOD_NOT_ALLOWED,
4059                            "MethodNotAllowed",
4060                            "The specified method is not allowed against this resource.",
4061                            vec![
4062                                ("Method".to_string(), "PUT".to_string()),
4063                                ("ResourceType".to_string(), "DeleteMarker".to_string()),
4064                            ],
4065                        ));
4066                    }
4067                    obj.tags = tags.clone().into_iter().collect();
4068                    response_headers.insert("x-amz-version-id", vid.parse().unwrap());
4069                    found = true;
4070                }
4071            }
4072
4073            // Also check current object
4074            if !found {
4075                if let Some(obj) = b.objects.get_mut(key) {
4076                    if obj.version_id.as_deref() == Some(vid.as_str()) {
4077                        if obj.is_delete_marker {
4078                            return Err(AwsServiceError::aws_error_with_fields(
4079                                StatusCode::METHOD_NOT_ALLOWED,
4080                                "MethodNotAllowed",
4081                                "The specified method is not allowed against this resource.",
4082                                vec![
4083                                    ("Method".to_string(), "PUT".to_string()),
4084                                    ("ResourceType".to_string(), "DeleteMarker".to_string()),
4085                                ],
4086                            ));
4087                        }
4088                        obj.tags = tags.into_iter().collect();
4089                        response_headers.insert("x-amz-version-id", vid.parse().unwrap());
4090                        found = true;
4091                    }
4092                }
4093            }
4094
4095            if !found {
4096                return Err(AwsServiceError::aws_error_with_fields(
4097                    StatusCode::NOT_FOUND,
4098                    "NoSuchVersion",
4099                    "The specified version does not exist.",
4100                    vec![
4101                        ("Key".to_string(), key.to_string()),
4102                        ("VersionId".to_string(), vid.to_string()),
4103                    ],
4104                ));
4105            }
4106        } else {
4107            let obj = b
4108                .objects
4109                .get_mut(key)
4110                .ok_or_else(|| no_such_key_with_detail(key))?;
4111            if obj.is_delete_marker {
4112                return Err(no_such_key_with_detail(key));
4113            }
4114            obj.tags = tags.into_iter().collect();
4115            if let Some(ref vid) = obj.version_id {
4116                response_headers.insert("x-amz-version-id", vid.parse().unwrap());
4117            }
4118        }
4119
4120        Ok(AwsResponse {
4121            status: StatusCode::OK,
4122            content_type: "application/xml".to_string(),
4123            body: Bytes::new(),
4124            headers: response_headers,
4125        })
4126    }
4127
4128    // ---- Multipart Upload ----
4129
4130    fn create_multipart_upload(
4131        &self,
4132        req: &AwsRequest,
4133        bucket: &str,
4134        key: &str,
4135    ) -> Result<AwsResponse, AwsServiceError> {
4136        let upload_id = uuid::Uuid::new_v4().to_string();
4137        let content_type = req
4138            .headers
4139            .get("content-type")
4140            .and_then(|v| v.to_str().ok())
4141            .unwrap_or("application/octet-stream")
4142            .to_string();
4143        let metadata = extract_user_metadata(&req.headers);
4144        let storage_class = req
4145            .headers
4146            .get("x-amz-storage-class")
4147            .and_then(|v| v.to_str().ok())
4148            .unwrap_or("STANDARD")
4149            .to_string();
4150        let sse_algorithm = req
4151            .headers
4152            .get("x-amz-server-side-encryption")
4153            .and_then(|v| v.to_str().ok())
4154            .map(|s| s.to_string());
4155        let sse_kms_key_id = req
4156            .headers
4157            .get("x-amz-server-side-encryption-aws-kms-key-id")
4158            .and_then(|v| v.to_str().ok())
4159            .map(|s| s.to_string());
4160        let tagging = req
4161            .headers
4162            .get("x-amz-tagging")
4163            .and_then(|v| v.to_str().ok())
4164            .map(|s| s.to_string());
4165        let acl_header = req
4166            .headers
4167            .get("x-amz-acl")
4168            .and_then(|v| v.to_str().ok())
4169            .map(|s| s.to_string());
4170        let has_grant_headers = req
4171            .headers
4172            .keys()
4173            .any(|k| k.as_str().starts_with("x-amz-grant-"));
4174
4175        if acl_header.is_some() && has_grant_headers {
4176            return Err(AwsServiceError::aws_error(
4177                StatusCode::BAD_REQUEST,
4178                "InvalidRequest",
4179                "Specifying both Canned ACLs and Header Grants is not allowed",
4180            ));
4181        }
4182
4183        let checksum_algorithm = req
4184            .headers
4185            .get("x-amz-checksum-algorithm")
4186            .or_else(|| req.headers.get("x-amz-sdk-checksum-algorithm"))
4187            .and_then(|v| v.to_str().ok())
4188            .map(|s| s.to_uppercase());
4189
4190        let mut state = self.state.write();
4191        let b = state
4192            .buckets
4193            .get_mut(bucket)
4194            .ok_or_else(|| no_such_bucket(bucket))?;
4195
4196        let acl_grants = if has_grant_headers {
4197            parse_grant_headers(&req.headers)
4198        } else {
4199            let acl = acl_header.as_deref().unwrap_or("private");
4200            canned_acl_grants(acl, &b.acl_owner_id)
4201        };
4202
4203        let upload = MultipartUpload {
4204            upload_id: upload_id.clone(),
4205            key: key.to_string(),
4206            initiated: Utc::now(),
4207            parts: std::collections::BTreeMap::new(),
4208            metadata,
4209            content_type,
4210            storage_class,
4211            sse_algorithm: sse_algorithm.clone(),
4212            sse_kms_key_id: sse_kms_key_id.clone(),
4213            tagging,
4214            acl_grants,
4215            checksum_algorithm,
4216        };
4217        b.multipart_uploads.insert(upload_id.clone(), upload);
4218
4219        let mut headers = HeaderMap::new();
4220        if let Some(algo) = &sse_algorithm {
4221            headers.insert("x-amz-server-side-encryption", algo.parse().unwrap());
4222        }
4223        if let Some(kid) = &sse_kms_key_id {
4224            headers.insert(
4225                "x-amz-server-side-encryption-aws-kms-key-id",
4226                kid.parse().unwrap(),
4227            );
4228        }
4229
4230        let body = format!(
4231            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
4232             <InitiateMultipartUploadResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
4233             <Bucket>{}</Bucket>\
4234             <Key>{}</Key>\
4235             <UploadId>{}</UploadId>\
4236             </InitiateMultipartUploadResult>",
4237            xml_escape(bucket),
4238            xml_escape(key),
4239            xml_escape(&upload_id),
4240        );
4241        Ok(AwsResponse {
4242            status: StatusCode::OK,
4243            content_type: "application/xml".to_string(),
4244            body: body.into(),
4245            headers,
4246        })
4247    }
4248
4249    fn upload_part(
4250        &self,
4251        req: &AwsRequest,
4252        bucket: &str,
4253        key: &str,
4254        upload_id: &str,
4255        part_number: i64,
4256    ) -> Result<AwsResponse, AwsServiceError> {
4257        // Validate part number
4258        if part_number < 1 {
4259            return Err(no_such_upload(upload_id));
4260        }
4261        if part_number > 10000 {
4262            return Err(AwsServiceError::aws_error_with_fields(
4263                StatusCode::BAD_REQUEST,
4264                "InvalidArgument",
4265                "Part number must be an integer between 1 and 10000, inclusive",
4266                vec![
4267                    ("ArgumentName".to_string(), "partNumber".to_string()),
4268                    ("ArgumentValue".to_string(), part_number.to_string()),
4269                ],
4270            ));
4271        }
4272        let pn = part_number as u32;
4273
4274        let data = req.body.clone();
4275        let etag = compute_md5(&data);
4276
4277        let mut state = self.state.write();
4278        let b = state
4279            .buckets
4280            .get_mut(bucket)
4281            .ok_or_else(|| no_such_bucket(bucket))?;
4282        let upload = b
4283            .multipart_uploads
4284            .get_mut(upload_id)
4285            .ok_or_else(|| no_such_upload(upload_id))?;
4286        if upload.key != key {
4287            return Err(no_such_upload(upload_id));
4288        }
4289
4290        let part = UploadPart {
4291            part_number: pn,
4292            data: data.clone(),
4293            etag: etag.clone(),
4294            size: data.len() as u64,
4295            last_modified: Utc::now(),
4296        };
4297        upload.parts.insert(pn, part);
4298
4299        let mut headers = HeaderMap::new();
4300        headers.insert("etag", format!("\"{etag}\"").parse().unwrap());
4301        if let Some(algo) = &upload.sse_algorithm {
4302            headers.insert("x-amz-server-side-encryption", algo.parse().unwrap());
4303        }
4304        if let Some(kid) = &upload.sse_kms_key_id {
4305            headers.insert(
4306                "x-amz-server-side-encryption-aws-kms-key-id",
4307                kid.parse().unwrap(),
4308            );
4309        }
4310        Ok(AwsResponse {
4311            status: StatusCode::OK,
4312            content_type: "application/xml".to_string(),
4313            body: Bytes::new(),
4314            headers,
4315        })
4316    }
4317
4318    fn upload_part_copy(
4319        &self,
4320        req: &AwsRequest,
4321        bucket: &str,
4322        key: &str,
4323        upload_id: &str,
4324        part_number: i64,
4325    ) -> Result<AwsResponse, AwsServiceError> {
4326        let copy_source = req
4327            .headers
4328            .get("x-amz-copy-source")
4329            .and_then(|v| v.to_str().ok())
4330            .ok_or_else(|| {
4331                AwsServiceError::aws_error(
4332                    StatusCode::BAD_REQUEST,
4333                    "InvalidArgument",
4334                    "x-amz-copy-source header is required",
4335                )
4336            })?;
4337
4338        // Split on '?' BEFORE percent-decoding so keys containing literal '?' are preserved
4339        let raw_source = copy_source.strip_prefix('/').unwrap_or(copy_source);
4340
4341        // Parse versionId from ?versionId=X
4342        let (raw_path, source_version_id) = if let Some(idx) = raw_source.find("?versionId=") {
4343            let vid = raw_source[idx + 11..].to_string();
4344            (&raw_source[..idx], Some(vid))
4345        } else {
4346            (raw_source, None)
4347        };
4348        let decoded_path = percent_encoding::percent_decode_str(raw_path)
4349            .decode_utf8_lossy()
4350            .to_string();
4351
4352        let (src_bucket, src_key) = decoded_path.split_once('/').ok_or_else(|| {
4353            AwsServiceError::aws_error(
4354                StatusCode::BAD_REQUEST,
4355                "InvalidArgument",
4356                "Invalid copy source format",
4357            )
4358        })?;
4359
4360        let copy_range = req
4361            .headers
4362            .get("x-amz-copy-source-range")
4363            .and_then(|v| v.to_str().ok());
4364
4365        let mut state = self.state.write();
4366        let src_data = {
4367            let sb = state
4368                .buckets
4369                .get(src_bucket)
4370                .ok_or_else(|| no_such_bucket(src_bucket))?;
4371
4372            let src_obj = if let Some(ref vid) = source_version_id {
4373                resolve_object(sb, src_key, Some(vid))?
4374            } else {
4375                sb.objects
4376                    .get(src_key)
4377                    .ok_or_else(|| no_such_key(src_key))?
4378            };
4379
4380            if let Some(range_str) = copy_range {
4381                let range_part = range_str.strip_prefix("bytes=").unwrap_or(range_str);
4382                if let Some((start_str, end_str)) = range_part.split_once('-') {
4383                    let start: usize = start_str.parse().unwrap_or(0);
4384                    let end: usize = end_str.parse().unwrap_or(src_obj.data.len() - 1);
4385                    let end = std::cmp::min(end + 1, src_obj.data.len());
4386                    src_obj.data.slice(start..end)
4387                } else {
4388                    src_obj.data.clone()
4389                }
4390            } else {
4391                src_obj.data.clone()
4392            }
4393        };
4394
4395        let data_len = src_data.len() as u64;
4396        let etag = compute_md5(&src_data);
4397        let b = state
4398            .buckets
4399            .get_mut(bucket)
4400            .ok_or_else(|| no_such_bucket(bucket))?;
4401        let upload = b
4402            .multipart_uploads
4403            .get_mut(upload_id)
4404            .ok_or_else(|| no_such_upload(upload_id))?;
4405        if upload.key != key {
4406            return Err(no_such_upload(upload_id));
4407        }
4408
4409        let part = UploadPart {
4410            part_number: part_number as u32,
4411            data: src_data,
4412            etag: etag.clone(),
4413            size: data_len,
4414            last_modified: Utc::now(),
4415        };
4416        upload.parts.insert(part_number as u32, part);
4417
4418        let body = format!(
4419            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
4420             <CopyPartResult>\
4421             <ETag>&quot;{etag}&quot;</ETag>\
4422             <LastModified>{}</LastModified>\
4423             </CopyPartResult>",
4424            Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ"),
4425        );
4426        Ok(s3_xml(StatusCode::OK, body))
4427    }
4428
4429    fn complete_multipart_upload(
4430        &self,
4431        req: &AwsRequest,
4432        bucket: &str,
4433        key: &str,
4434        upload_id: &str,
4435    ) -> Result<AwsResponse, AwsServiceError> {
4436        let body_str = std::str::from_utf8(&req.body).unwrap_or("");
4437        let submitted_parts = parse_complete_multipart_xml(body_str);
4438
4439        if submitted_parts.is_empty() {
4440            return Err(AwsServiceError::aws_error(
4441                StatusCode::BAD_REQUEST,
4442                "MalformedXML",
4443                "The XML you provided was not well-formed or did not validate against our published schema",
4444            ));
4445        }
4446
4447        let if_none_match = req
4448            .headers
4449            .get("x-amz-if-none-match")
4450            .or_else(|| req.headers.get("if-none-match"))
4451            .and_then(|v| v.to_str().ok())
4452            .map(|s| s.to_string());
4453
4454        let mut state = self.state.write();
4455        let b = state
4456            .buckets
4457            .get_mut(bucket)
4458            .ok_or_else(|| no_such_bucket(bucket))?;
4459
4460        let upload = match b.multipart_uploads.get(upload_id) {
4461            Some(u) => u.clone(),
4462            None => {
4463                // Upload already completed - return existing object if it exists
4464                // IfNoneMatch does NOT apply to re-completions
4465                if let Some(obj) = b.objects.get(key) {
4466                    let etag = obj.etag.clone();
4467                    let body = format!(
4468                        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
4469                         <CompleteMultipartUploadResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
4470                         <Bucket>{}</Bucket>\
4471                         <Key>{}</Key>\
4472                         <ETag>&quot;{}&quot;</ETag>\
4473                         </CompleteMultipartUploadResult>",
4474                        xml_escape(bucket),
4475                        xml_escape(key),
4476                        xml_escape(&etag),
4477                    );
4478                    return Ok(AwsResponse {
4479                        status: StatusCode::OK,
4480                        content_type: "application/xml".to_string(),
4481                        body: body.into(),
4482                        headers: HeaderMap::new(),
4483                    });
4484                }
4485                return Err(no_such_upload(upload_id));
4486            }
4487        };
4488
4489        if upload.key != key {
4490            return Err(no_such_upload(upload_id));
4491        }
4492
4493        // IfNoneMatch: if "*" and object already exists, reject (only for real completions)
4494        if let Some(ref inm) = if_none_match {
4495            if inm == "*" && b.objects.contains_key(key) {
4496                return Err(precondition_failed("If-None-Match"));
4497            }
4498        }
4499
4500        // Use parts in submitted order (AWS requires ascending, but we don't enforce)
4501        let sorted_parts = submitted_parts;
4502
4503        // Validate minimum part size: all non-last parts must be >= 5MB
4504        // Use a relaxed threshold for testing compatibility (the decorator
4505        // `reduced_min_part_size` in test suites lowers this to 256 bytes).
4506        if sorted_parts.len() > 1 {
4507            const MIN_PART_SIZE: usize = 256;
4508            for (i, (part_num, _)) in sorted_parts.iter().enumerate() {
4509                if i >= sorted_parts.len() - 1 {
4510                    break; // skip last part
4511                }
4512                if let Some(part) = upload.parts.get(part_num) {
4513                    if part.data.len() < MIN_PART_SIZE {
4514                        return Err(AwsServiceError::aws_error(
4515                            StatusCode::BAD_REQUEST,
4516                            "EntityTooSmall",
4517                            "Your proposed upload is smaller than the minimum allowed object size.",
4518                        ));
4519                    }
4520                }
4521            }
4522        }
4523
4524        // Assemble the object from parts
4525        let mut combined_data = Vec::new();
4526        let mut md5_digests = Vec::new();
4527        let mut part_sizes = Vec::new();
4528
4529        for (part_num, submitted_etag) in &sorted_parts {
4530            let part = upload.parts.get(part_num).ok_or_else(|| {
4531                AwsServiceError::aws_error(
4532                    StatusCode::BAD_REQUEST,
4533                    "InvalidPart",
4534                    "One or more of the specified parts could not be found.",
4535                )
4536            })?;
4537            if submitted_etag != &part.etag {
4538                return Err(AwsServiceError::aws_error(
4539                    StatusCode::BAD_REQUEST,
4540                    "InvalidPart",
4541                    "One or more of the specified parts could not be found. The part may not have been uploaded, or the specified entity tag may not have matched the part's entity tag.",
4542                ));
4543            }
4544            combined_data.extend_from_slice(&part.data);
4545            let part_md5 = Md5::digest(&part.data);
4546            md5_digests.extend_from_slice(&part_md5);
4547            part_sizes.push((*part_num, part.data.len() as u64));
4548        }
4549
4550        // Multipart ETag: MD5(concat(part_md5_digests))-N
4551        let combined_md5 = Md5::digest(&md5_digests);
4552        let etag = format!("{:x}-{}", combined_md5, sorted_parts.len());
4553        let checksum_value = upload
4554            .checksum_algorithm
4555            .as_deref()
4556            .map(|algo| compute_checksum(algo, &combined_data));
4557        let data = Bytes::from(combined_data);
4558
4559        let tags = if let Some(ref tagging) = upload.tagging {
4560            parse_url_encoded_tags(tagging).into_iter().collect()
4561        } else {
4562            std::collections::HashMap::new()
4563        };
4564
4565        let version_id = if b.versioning.as_deref() == Some("Enabled") {
4566            Some(uuid::Uuid::new_v4().to_string())
4567        } else {
4568            None
4569        };
4570
4571        let obj = S3Object {
4572            key: key.to_string(),
4573            size: data.len() as u64,
4574            data,
4575            content_type: upload.content_type.clone(),
4576            etag: etag.clone(),
4577            last_modified: Utc::now(),
4578            metadata: upload.metadata.clone(),
4579            storage_class: upload.storage_class.clone(),
4580            tags,
4581            acl_grants: upload.acl_grants.clone(),
4582            acl_owner_id: Some(b.acl_owner_id.clone()),
4583            parts_count: Some(sorted_parts.len() as u32),
4584            part_sizes: Some(part_sizes),
4585            sse_algorithm: upload.sse_algorithm.clone(),
4586            sse_kms_key_id: upload.sse_kms_key_id.clone(),
4587            bucket_key_enabled: None,
4588            version_id: version_id.clone(),
4589            is_delete_marker: false,
4590            content_encoding: None,
4591            website_redirect_location: None,
4592            restore_ongoing: None,
4593            restore_expiry: None,
4594            checksum_algorithm: upload.checksum_algorithm.clone(),
4595            checksum_value,
4596            lock_mode: None,
4597            lock_retain_until: None,
4598            lock_legal_hold: None,
4599        };
4600        b.objects.insert(key.to_string(), obj);
4601        b.multipart_uploads.remove(upload_id);
4602
4603        let mut headers = HeaderMap::new();
4604        if let Some(vid) = &version_id {
4605            headers.insert("x-amz-version-id", vid.parse().unwrap());
4606        }
4607        if let Some(algo) = &upload.sse_algorithm {
4608            headers.insert("x-amz-server-side-encryption", algo.parse().unwrap());
4609        }
4610        if let Some(kid) = &upload.sse_kms_key_id {
4611            headers.insert(
4612                "x-amz-server-side-encryption-aws-kms-key-id",
4613                kid.parse().unwrap(),
4614            );
4615        }
4616
4617        let body = format!(
4618            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
4619             <CompleteMultipartUploadResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
4620             <Bucket>{}</Bucket>\
4621             <Key>{}</Key>\
4622             <ETag>&quot;{}&quot;</ETag>\
4623             </CompleteMultipartUploadResult>",
4624            xml_escape(bucket),
4625            xml_escape(key),
4626            xml_escape(&etag),
4627        );
4628        Ok(AwsResponse {
4629            status: StatusCode::OK,
4630            content_type: "application/xml".to_string(),
4631            body: body.into(),
4632            headers,
4633        })
4634    }
4635
4636    fn abort_multipart_upload(
4637        &self,
4638        bucket: &str,
4639        key: &str,
4640        upload_id: &str,
4641    ) -> Result<AwsResponse, AwsServiceError> {
4642        let mut state = self.state.write();
4643        let b = state
4644            .buckets
4645            .get_mut(bucket)
4646            .ok_or_else(|| no_such_bucket(bucket))?;
4647
4648        // Validate upload exists and belongs to the requested key
4649        match b.multipart_uploads.get(upload_id) {
4650            Some(upload) if upload.key != key => {
4651                return Err(no_such_upload(upload_id));
4652            }
4653            None => {
4654                return Err(no_such_upload(upload_id));
4655            }
4656            _ => {}
4657        }
4658        b.multipart_uploads.remove(upload_id);
4659
4660        Ok(AwsResponse {
4661            status: StatusCode::NO_CONTENT,
4662            content_type: "application/xml".to_string(),
4663            body: Bytes::new(),
4664            headers: HeaderMap::new(),
4665        })
4666    }
4667
4668    fn list_multipart_uploads(&self, bucket: &str) -> Result<AwsResponse, AwsServiceError> {
4669        let state = self.state.read();
4670        let b = state
4671            .buckets
4672            .get(bucket)
4673            .ok_or_else(|| no_such_bucket(bucket))?;
4674
4675        let mut uploads_xml = String::new();
4676        let mut sorted_uploads: Vec<_> = b.multipart_uploads.values().collect();
4677        sorted_uploads.sort_by_key(|u| &u.key);
4678        for upload in &sorted_uploads {
4679            uploads_xml.push_str(&format!(
4680                "<Upload>\
4681                 <Key>{}</Key>\
4682                 <UploadId>{}</UploadId>\
4683                 <Initiated>{}</Initiated>\
4684                 <StorageClass>{}</StorageClass>\
4685                 </Upload>",
4686                xml_escape(&upload.key),
4687                xml_escape(&upload.upload_id),
4688                upload.initiated.format("%Y-%m-%dT%H:%M:%S%.3fZ"),
4689                xml_escape(&upload.storage_class),
4690            ));
4691        }
4692
4693        let body = format!(
4694            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
4695             <ListMultipartUploadsResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
4696             <Bucket>{}</Bucket>\
4697             <MaxUploads>1000</MaxUploads>\
4698             <IsTruncated>false</IsTruncated>\
4699             {uploads_xml}\
4700             </ListMultipartUploadsResult>",
4701            xml_escape(bucket),
4702        );
4703        Ok(s3_xml(StatusCode::OK, body))
4704    }
4705
4706    fn list_parts(
4707        &self,
4708        req: &AwsRequest,
4709        bucket: &str,
4710        key: &str,
4711        upload_id: &str,
4712    ) -> Result<AwsResponse, AwsServiceError> {
4713        let max_parts: i64 = match req.query_params.get("max-parts") {
4714            Some(v) => v.parse().map_err(|_| {
4715                AwsServiceError::aws_error(
4716                    StatusCode::BAD_REQUEST,
4717                    "InvalidArgument",
4718                    "Provided max-parts not an integer or within integer range",
4719                )
4720            })?,
4721            None => 1000,
4722        };
4723        let part_number_marker: i64 = match req.query_params.get("part-number-marker") {
4724            Some(v) => v.parse().map_err(|_| {
4725                AwsServiceError::aws_error(
4726                    StatusCode::BAD_REQUEST,
4727                    "InvalidArgument",
4728                    "Provided part-number-marker not an integer or within integer range",
4729                )
4730            })?,
4731            None => 0,
4732        };
4733
4734        // Validate max-parts and part-number-marker
4735        if max_parts < 0 {
4736            return Err(AwsServiceError::aws_error(
4737                StatusCode::BAD_REQUEST,
4738                "InvalidArgument",
4739                "Argument max-parts must be an integer between 0 and 2147483647",
4740            ));
4741        }
4742        if max_parts > 2147483647 {
4743            return Err(AwsServiceError::aws_error(
4744                StatusCode::BAD_REQUEST,
4745                "InvalidArgument",
4746                "Provided max-parts not an integer or within integer range",
4747            ));
4748        }
4749        if part_number_marker < 0 {
4750            return Err(AwsServiceError::aws_error(
4751                StatusCode::BAD_REQUEST,
4752                "InvalidArgument",
4753                "Argument part-number-marker must be an integer between 0 and 2147483647",
4754            ));
4755        }
4756        if part_number_marker > 2147483647 {
4757            return Err(AwsServiceError::aws_error(
4758                StatusCode::BAD_REQUEST,
4759                "InvalidArgument",
4760                "Provided part-number-marker not an integer or within integer range",
4761            ));
4762        }
4763
4764        let state = self.state.read();
4765        let b = state
4766            .buckets
4767            .get(bucket)
4768            .ok_or_else(|| no_such_bucket(bucket))?;
4769        let upload = b
4770            .multipart_uploads
4771            .get(upload_id)
4772            .ok_or_else(|| no_such_upload(upload_id))?;
4773        if upload.key != key {
4774            return Err(no_such_upload(upload_id));
4775        }
4776
4777        // Filter parts after marker and apply limit
4778        let all_parts: Vec<_> = upload
4779            .parts
4780            .values()
4781            .filter(|p| p.part_number as i64 > part_number_marker)
4782            .collect();
4783        let max = max_parts as usize;
4784        let is_truncated = all_parts.len() > max;
4785        let display_parts: Vec<_> = all_parts.into_iter().take(max).collect();
4786
4787        let mut parts_xml = String::new();
4788        let mut next_marker: i64 = 0;
4789        for part in &display_parts {
4790            next_marker = part.part_number as i64;
4791            parts_xml.push_str(&format!(
4792                "<Part>\
4793                 <PartNumber>{}</PartNumber>\
4794                 <ETag>&quot;{}&quot;</ETag>\
4795                 <Size>{}</Size>\
4796                 <LastModified>{}</LastModified>\
4797                 </Part>",
4798                part.part_number,
4799                xml_escape(&part.etag),
4800                part.size,
4801                part.last_modified.format("%Y-%m-%dT%H:%M:%S%.3fZ"),
4802            ));
4803        }
4804
4805        let body = format!(
4806            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
4807             <ListPartsResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
4808             <Bucket>{}</Bucket>\
4809             <Key>{}</Key>\
4810             <UploadId>{}</UploadId>\
4811             <PartNumberMarker>{part_number_marker}</PartNumberMarker>\
4812             <NextPartNumberMarker>{next_marker}</NextPartNumberMarker>\
4813             <MaxParts>{max_parts}</MaxParts>\
4814             <IsTruncated>{is_truncated}</IsTruncated>\
4815             {parts_xml}\
4816             </ListPartsResult>",
4817            xml_escape(bucket),
4818            xml_escape(key),
4819            xml_escape(upload_id),
4820        );
4821        Ok(s3_xml(StatusCode::OK, body))
4822    }
4823
4824    fn delete_object_tagging(
4825        &self,
4826        bucket: &str,
4827        key: &str,
4828    ) -> Result<AwsResponse, AwsServiceError> {
4829        let mut state = self.state.write();
4830        let b = state
4831            .buckets
4832            .get_mut(bucket)
4833            .ok_or_else(|| no_such_bucket(bucket))?;
4834        let obj = b.objects.get_mut(key).ok_or_else(|| no_such_key(key))?;
4835        obj.tags.clear();
4836        Ok(AwsResponse {
4837            status: StatusCode::NO_CONTENT,
4838            content_type: "application/xml".to_string(),
4839            body: Bytes::new(),
4840            headers: HeaderMap::new(),
4841        })
4842    }
4843
4844    fn put_object_retention(
4845        &self,
4846        req: &AwsRequest,
4847        bucket: &str,
4848        key: &str,
4849    ) -> Result<AwsResponse, AwsServiceError> {
4850        let version_id = req.query_params.get("versionId").cloned();
4851        let body_str = std::str::from_utf8(&req.body).unwrap_or("");
4852        let mode = extract_xml_value(body_str, "Mode");
4853        let retain_until = extract_xml_value(body_str, "RetainUntilDate")
4854            .and_then(|s| s.parse::<DateTime<Utc>>().ok());
4855
4856        let mut state = self.state.write();
4857        let b = state
4858            .buckets
4859            .get_mut(bucket)
4860            .ok_or_else(|| no_such_bucket(bucket))?;
4861
4862        // Find and update the object (either current or specific version)
4863        if let Some(ref vid) = version_id {
4864            let mut found = false;
4865            if let Some(versions) = b.object_versions.get_mut(key) {
4866                for obj in versions.iter_mut() {
4867                    if obj.version_id.as_deref() == Some(vid) {
4868                        obj.lock_mode = mode.clone();
4869                        obj.lock_retain_until = retain_until;
4870                        found = true;
4871                        break;
4872                    }
4873                }
4874            }
4875            if let Some(obj) = b.objects.get_mut(key) {
4876                if obj.version_id.as_deref() == Some(vid) {
4877                    obj.lock_mode = mode;
4878                    obj.lock_retain_until = retain_until;
4879                    found = true;
4880                }
4881            }
4882            if !found {
4883                return Err(no_such_key(key));
4884            }
4885        } else {
4886            let obj = b.objects.get_mut(key).ok_or_else(|| no_such_key(key))?;
4887            obj.lock_mode = mode.clone();
4888            obj.lock_retain_until = retain_until;
4889            // Also update in object_versions if the current object has a version_id
4890            if let Some(ref vid) = obj.version_id {
4891                let vid = vid.clone();
4892                if let Some(versions) = b.object_versions.get_mut(key) {
4893                    for v in versions.iter_mut() {
4894                        if v.version_id.as_deref() == Some(&vid) {
4895                            v.lock_mode = mode.clone();
4896                            v.lock_retain_until = retain_until;
4897                            break;
4898                        }
4899                    }
4900                }
4901            }
4902        }
4903
4904        Ok(empty_response(StatusCode::OK))
4905    }
4906
4907    fn get_object_retention(
4908        &self,
4909        req: &AwsRequest,
4910        bucket: &str,
4911        key: &str,
4912    ) -> Result<AwsResponse, AwsServiceError> {
4913        let state = self.state.read();
4914        let b = state
4915            .buckets
4916            .get(bucket)
4917            .ok_or_else(|| no_such_bucket(bucket))?;
4918        let obj = resolve_object(b, key, req.query_params.get("versionId"))?;
4919
4920        match (&obj.lock_mode, &obj.lock_retain_until) {
4921            (Some(mode), Some(until)) => {
4922                let body = format!(
4923                    "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
4924                     <Retention xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
4925                     <Mode>{}</Mode>\
4926                     <RetainUntilDate>{}</RetainUntilDate>\
4927                     </Retention>",
4928                    xml_escape(mode),
4929                    until.to_rfc3339(),
4930                );
4931                Ok(s3_xml(StatusCode::OK, body))
4932            }
4933            _ => Err(AwsServiceError::aws_error(
4934                StatusCode::NOT_FOUND,
4935                "NoSuchObjectLockConfiguration",
4936                "The specified object does not have a ObjectLock configuration",
4937            )),
4938        }
4939    }
4940
4941    fn put_object_legal_hold(
4942        &self,
4943        req: &AwsRequest,
4944        bucket: &str,
4945        key: &str,
4946    ) -> Result<AwsResponse, AwsServiceError> {
4947        let version_id = req.query_params.get("versionId").cloned();
4948        let body_str = std::str::from_utf8(&req.body).unwrap_or("");
4949        let status = extract_xml_value(body_str, "Status");
4950
4951        let mut state = self.state.write();
4952        let b = state
4953            .buckets
4954            .get_mut(bucket)
4955            .ok_or_else(|| no_such_bucket(bucket))?;
4956
4957        if let Some(ref vid) = version_id {
4958            let mut found = false;
4959            if let Some(versions) = b.object_versions.get_mut(key) {
4960                for obj in versions.iter_mut() {
4961                    if obj.version_id.as_deref() == Some(vid) {
4962                        obj.lock_legal_hold = status.clone();
4963                        found = true;
4964                        break;
4965                    }
4966                }
4967            }
4968            if let Some(obj) = b.objects.get_mut(key) {
4969                if obj.version_id.as_deref() == Some(vid) {
4970                    obj.lock_legal_hold = status;
4971                    found = true;
4972                }
4973            }
4974            if !found {
4975                return Err(no_such_key(key));
4976            }
4977        } else {
4978            let obj = b.objects.get_mut(key).ok_or_else(|| no_such_key(key))?;
4979            obj.lock_legal_hold = status.clone();
4980            // Also update in object_versions if the current object has a version_id
4981            if let Some(ref vid) = obj.version_id {
4982                let vid = vid.clone();
4983                if let Some(versions) = b.object_versions.get_mut(key) {
4984                    for v in versions.iter_mut() {
4985                        if v.version_id.as_deref() == Some(&vid) {
4986                            v.lock_legal_hold = status.clone();
4987                            break;
4988                        }
4989                    }
4990                }
4991            }
4992        }
4993
4994        Ok(empty_response(StatusCode::OK))
4995    }
4996
4997    fn get_object_legal_hold(
4998        &self,
4999        req: &AwsRequest,
5000        bucket: &str,
5001        key: &str,
5002    ) -> Result<AwsResponse, AwsServiceError> {
5003        let state = self.state.read();
5004        let b = state
5005            .buckets
5006            .get(bucket)
5007            .ok_or_else(|| no_such_bucket(bucket))?;
5008        let obj = resolve_object(b, key, req.query_params.get("versionId"))?;
5009
5010        match &obj.lock_legal_hold {
5011            Some(hold) => {
5012                let body = format!(
5013                    "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
5014                     <LegalHold xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
5015                     <Status>{}</Status>\
5016                     </LegalHold>",
5017                    xml_escape(hold),
5018                );
5019                Ok(s3_xml(StatusCode::OK, body))
5020            }
5021            None => Err(AwsServiceError::aws_error(
5022                StatusCode::NOT_FOUND,
5023                "NoSuchObjectLockConfiguration",
5024                "The specified object does not have a ObjectLock configuration",
5025            )),
5026        }
5027    }
5028
5029    fn get_object_attributes(
5030        &self,
5031        req: &AwsRequest,
5032        bucket: &str,
5033        key: &str,
5034    ) -> Result<AwsResponse, AwsServiceError> {
5035        let state = self.state.read();
5036        let b = state
5037            .buckets
5038            .get(bucket)
5039            .ok_or_else(|| no_such_bucket(bucket))?;
5040        let obj = b.objects.get(key).ok_or_else(|| no_such_key(key))?;
5041
5042        let attrs = req
5043            .headers
5044            .get("x-amz-object-attributes")
5045            .and_then(|v| v.to_str().ok())
5046            .unwrap_or("");
5047
5048        let mut body_parts = Vec::new();
5049
5050        for attr in attrs.split(',') {
5051            let attr = attr.trim();
5052            match attr {
5053                "ETag" => {
5054                    body_parts.push(format!("<ETag>{}</ETag>", xml_escape(&obj.etag)));
5055                }
5056                "StorageClass" => {
5057                    body_parts.push(format!(
5058                        "<StorageClass>{}</StorageClass>",
5059                        xml_escape(&obj.storage_class)
5060                    ));
5061                }
5062                "ObjectSize" => {
5063                    body_parts.push(format!("<ObjectSize>{}</ObjectSize>", obj.size));
5064                }
5065                "Checksum" => {
5066                    if let (Some(algo), Some(val)) = (&obj.checksum_algorithm, &obj.checksum_value)
5067                    {
5068                        body_parts.push(format!(
5069                            "<Checksum><Checksum{algo}>{val}</Checksum{algo}></Checksum>"
5070                        ));
5071                    }
5072                }
5073                "ObjectParts" => {
5074                    if let Some(pc) = obj.parts_count {
5075                        let mut parts_inner = format!("<TotalPartsCount>{pc}</TotalPartsCount>");
5076                        if let Some(ref ps) = obj.part_sizes {
5077                            for (pn, sz) in ps {
5078                                parts_inner.push_str(&format!(
5079                                    "<Part><PartNumber>{pn}</PartNumber><Size>{sz}</Size></Part>"
5080                                ));
5081                            }
5082                        }
5083                        body_parts.push(format!("<ObjectParts>{parts_inner}</ObjectParts>"));
5084                    }
5085                }
5086                _ => {}
5087            }
5088        }
5089
5090        let mut headers = HeaderMap::new();
5091        if let Some(vid) = &obj.version_id {
5092            headers.insert("x-amz-version-id", vid.parse().unwrap());
5093        }
5094        headers.insert(
5095            "last-modified",
5096            obj.last_modified
5097                .format("%a, %d %b %Y %H:%M:%S GMT")
5098                .to_string()
5099                .parse()
5100                .unwrap(),
5101        );
5102
5103        let body = format!(
5104            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
5105             <GetObjectAttributesResponse xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
5106             {}\
5107             </GetObjectAttributesResponse>",
5108            body_parts.join("")
5109        );
5110        Ok(AwsResponse {
5111            status: StatusCode::OK,
5112            content_type: "application/xml".to_string(),
5113            body: body.into(),
5114            headers,
5115        })
5116    }
5117
5118    fn restore_object(
5119        &self,
5120        _req: &AwsRequest,
5121        bucket: &str,
5122        key: &str,
5123    ) -> Result<AwsResponse, AwsServiceError> {
5124        let mut state = self.state.write();
5125        let b = state
5126            .buckets
5127            .get_mut(bucket)
5128            .ok_or_else(|| no_such_bucket(bucket))?;
5129        let obj = b.objects.get_mut(key).ok_or_else(|| no_such_key(key))?;
5130        let glacier_classes = [
5131            "GLACIER",
5132            "DEEP_ARCHIVE",
5133            "GLACIER_IR",
5134            "INTELLIGENT_TIERING",
5135        ];
5136        if !glacier_classes.contains(&obj.storage_class.as_str()) {
5137            return Err(AwsServiceError::aws_error_with_fields(
5138                StatusCode::FORBIDDEN,
5139                "InvalidObjectState",
5140                "The operation is not valid for the object's storage class",
5141                vec![("StorageClass".to_string(), obj.storage_class.clone())],
5142            ));
5143        }
5144        let status = if obj.restore_ongoing.is_some() {
5145            StatusCode::OK
5146        } else {
5147            StatusCode::ACCEPTED
5148        };
5149        let expiry = (Utc::now() + chrono::Duration::days(30))
5150            .format("%a, %d %b %Y %H:%M:%S GMT")
5151            .to_string();
5152        obj.restore_ongoing = Some(false);
5153        obj.restore_expiry = Some(expiry);
5154        Ok(AwsResponse {
5155            status,
5156            content_type: "application/xml".to_string(),
5157            body: Bytes::new(),
5158            headers: HeaderMap::new(),
5159        })
5160    }
5161}
5162
5163// ---------------------------------------------------------------------------
5164// Conditional request helpers
5165// ---------------------------------------------------------------------------
5166
5167/// Truncate a DateTime to second-level precision (HTTP dates have no sub-second info).
5168fn truncate_to_seconds(dt: DateTime<Utc>) -> DateTime<Utc> {
5169    dt.with_nanosecond(0).unwrap_or(dt)
5170}
5171
5172fn check_get_conditionals(req: &AwsRequest, obj: &S3Object) -> Result<(), AwsServiceError> {
5173    let obj_etag = format!("\"{}\"", obj.etag);
5174    let obj_time = truncate_to_seconds(obj.last_modified);
5175
5176    // If-Match
5177    if let Some(if_match) = req.headers.get("if-match").and_then(|v| v.to_str().ok()) {
5178        if !etag_matches(if_match, &obj_etag) {
5179            return Err(precondition_failed("If-Match"));
5180        }
5181    }
5182
5183    // If-None-Match
5184    if let Some(if_none_match) = req
5185        .headers
5186        .get("if-none-match")
5187        .and_then(|v| v.to_str().ok())
5188    {
5189        if etag_matches(if_none_match, &obj_etag) {
5190            return Err(not_modified_with_etag(&obj_etag));
5191        }
5192    }
5193
5194    // If-Unmodified-Since
5195    if let Some(since) = req
5196        .headers
5197        .get("if-unmodified-since")
5198        .and_then(|v| v.to_str().ok())
5199    {
5200        if let Some(dt) = parse_http_date(since) {
5201            if obj_time > dt {
5202                return Err(precondition_failed("If-Unmodified-Since"));
5203            }
5204        }
5205    }
5206
5207    // If-Modified-Since
5208    if let Some(since) = req
5209        .headers
5210        .get("if-modified-since")
5211        .and_then(|v| v.to_str().ok())
5212    {
5213        if let Some(dt) = parse_http_date(since) {
5214            if obj_time <= dt {
5215                return Err(not_modified());
5216            }
5217        }
5218    }
5219
5220    Ok(())
5221}
5222
5223fn check_head_conditionals(req: &AwsRequest, obj: &S3Object) -> Result<(), AwsServiceError> {
5224    let obj_etag = format!("\"{}\"", obj.etag);
5225    let obj_time = truncate_to_seconds(obj.last_modified);
5226
5227    // If-Match
5228    if let Some(if_match) = req.headers.get("if-match").and_then(|v| v.to_str().ok()) {
5229        if !etag_matches(if_match, &obj_etag) {
5230            return Err(AwsServiceError::aws_error(
5231                StatusCode::PRECONDITION_FAILED,
5232                "412",
5233                "Precondition Failed",
5234            ));
5235        }
5236    }
5237
5238    // If-None-Match
5239    if let Some(if_none_match) = req
5240        .headers
5241        .get("if-none-match")
5242        .and_then(|v| v.to_str().ok())
5243    {
5244        if etag_matches(if_none_match, &obj_etag) {
5245            return Err(not_modified_with_etag(&obj_etag));
5246        }
5247    }
5248
5249    // If-Unmodified-Since
5250    if let Some(since) = req
5251        .headers
5252        .get("if-unmodified-since")
5253        .and_then(|v| v.to_str().ok())
5254    {
5255        if let Some(dt) = parse_http_date(since) {
5256            if obj_time > dt {
5257                return Err(AwsServiceError::aws_error(
5258                    StatusCode::PRECONDITION_FAILED,
5259                    "412",
5260                    "Precondition Failed",
5261                ));
5262            }
5263        }
5264    }
5265
5266    // If-Modified-Since
5267    if let Some(since) = req
5268        .headers
5269        .get("if-modified-since")
5270        .and_then(|v| v.to_str().ok())
5271    {
5272        if let Some(dt) = parse_http_date(since) {
5273            if obj_time <= dt {
5274                return Err(not_modified());
5275            }
5276        }
5277    }
5278
5279    Ok(())
5280}
5281
/// Tells whether an `If-Match` / `If-None-Match` header value selects the
/// given object ETag.
///
/// `condition` is either `*` (matches anything) or a comma-separated list of
/// ETags. Each entry is trimmed and compared with all double quotes removed
/// on both sides, so quoted and unquoted forms are equivalent.
fn etag_matches(condition: &str, obj_etag: &str) -> bool {
    let without_quotes = |raw: &str| -> String { raw.chars().filter(|c| *c != '"').collect() };

    let wanted = condition.trim();
    if wanted == "*" {
        return true;
    }

    let target = without_quotes(obj_etag);
    // Any entry of the list may satisfy the condition.
    wanted
        .split(',')
        .any(|candidate| without_quotes(candidate.trim()) == target)
}
5297
5298fn parse_http_date(s: &str) -> Option<DateTime<Utc>> {
5299    // Try RFC 2822 format: "Sat, 01 Jan 2000 00:00:00 GMT"
5300    if let Ok(dt) = DateTime::parse_from_rfc2822(s) {
5301        return Some(dt.with_timezone(&Utc));
5302    }
5303    // Try RFC 3339
5304    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
5305        return Some(dt.with_timezone(&Utc));
5306    }
5307    // Try common HTTP date format: "%a, %d %b %Y %H:%M:%S GMT"
5308    if let Ok(dt) =
5309        chrono::NaiveDateTime::parse_from_str(s.trim_end_matches(" GMT"), "%a, %d %b %Y %H:%M:%S")
5310    {
5311        return Some(dt.and_utc());
5312    }
5313    // Try ISO 8601
5314    if let Ok(dt) = s.parse::<DateTime<Utc>>() {
5315        return Some(dt);
5316    }
5317    None
5318}
5319
5320fn not_modified() -> AwsServiceError {
5321    AwsServiceError::aws_error(StatusCode::NOT_MODIFIED, "304", "Not Modified")
5322}
5323
5324fn not_modified_with_etag(etag: &str) -> AwsServiceError {
5325    AwsServiceError::aws_error_with_headers(
5326        StatusCode::NOT_MODIFIED,
5327        "304",
5328        "Not Modified",
5329        vec![("etag".to_string(), etag.to_string())],
5330    )
5331}
5332
5333fn precondition_failed(condition: &str) -> AwsServiceError {
5334    AwsServiceError::aws_error_with_fields(
5335        StatusCode::PRECONDITION_FAILED,
5336        "PreconditionFailed",
5337        "At least one of the pre-conditions you specified did not hold",
5338        vec![("Condition".to_string(), condition.to_string())],
5339    )
5340}
5341
5342// ---------------------------------------------------------------------------
5343// ACL helpers
5344// ---------------------------------------------------------------------------
5345
5346fn build_acl_xml(owner_id: &str, grants: &[AclGrant], _account_id: &str) -> String {
5347    let mut grants_xml = String::new();
5348    for g in grants {
5349        let grantee_xml = if g.grantee_type == "Group" {
5350            let uri = g.grantee_uri.as_deref().unwrap_or("");
5351            format!(
5352                "<Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:type=\"Group\">\
5353                 <URI>{}</URI></Grantee>",
5354                xml_escape(uri),
5355            )
5356        } else {
5357            let id = g.grantee_id.as_deref().unwrap_or("");
5358            format!(
5359                "<Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:type=\"CanonicalUser\">\
5360                 <ID>{}</ID></Grantee>",
5361                xml_escape(id),
5362            )
5363        };
5364        grants_xml.push_str(&format!(
5365            "<Grant>{grantee_xml}<Permission>{}</Permission></Grant>",
5366            xml_escape(&g.permission),
5367        ));
5368    }
5369
5370    format!(
5371        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
5372         <AccessControlPolicy xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
5373         <Owner><ID>{owner_id}</ID><DisplayName>{owner_id}</DisplayName></Owner>\
5374         <AccessControlList>{grants_xml}</AccessControlList>\
5375         </AccessControlPolicy>",
5376        owner_id = xml_escape(owner_id),
5377    )
5378}
5379
5380fn canned_acl_grants(acl: &str, owner_id: &str) -> Vec<AclGrant> {
5381    let owner_grant = AclGrant {
5382        grantee_type: "CanonicalUser".to_string(),
5383        grantee_id: Some(owner_id.to_string()),
5384        grantee_display_name: Some(owner_id.to_string()),
5385        grantee_uri: None,
5386        permission: "FULL_CONTROL".to_string(),
5387    };
5388    match acl {
5389        "private" => vec![owner_grant],
5390        "public-read" => vec![
5391            owner_grant,
5392            AclGrant {
5393                grantee_type: "Group".to_string(),
5394                grantee_id: None,
5395                grantee_display_name: None,
5396                grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
5397                permission: "READ".to_string(),
5398            },
5399        ],
5400        "public-read-write" => vec![
5401            owner_grant,
5402            AclGrant {
5403                grantee_type: "Group".to_string(),
5404                grantee_id: None,
5405                grantee_display_name: None,
5406                grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
5407                permission: "READ".to_string(),
5408            },
5409            AclGrant {
5410                grantee_type: "Group".to_string(),
5411                grantee_id: None,
5412                grantee_display_name: None,
5413                grantee_uri: Some("http://acs.amazonaws.com/groups/global/AllUsers".to_string()),
5414                permission: "WRITE".to_string(),
5415            },
5416        ],
5417        "authenticated-read" => vec![
5418            owner_grant,
5419            AclGrant {
5420                grantee_type: "Group".to_string(),
5421                grantee_id: None,
5422                grantee_display_name: None,
5423                grantee_uri: Some(
5424                    "http://acs.amazonaws.com/groups/global/AuthenticatedUsers".to_string(),
5425                ),
5426                permission: "READ".to_string(),
5427            },
5428        ],
5429        "bucket-owner-full-control" => vec![owner_grant],
5430        _ => vec![owner_grant],
5431    }
5432}
5433
/// Expands a canned ACL name into the grant list for an object.
///
/// Objects currently use exactly the same expansion as buckets, so both
/// arguments are forwarded unchanged to `canned_acl_grants`.
fn canned_acl_grants_for_object(acl: &str, owner_id: &str) -> Vec<AclGrant> {
    // For objects, canned ACLs work the same way
    canned_acl_grants(acl, owner_id)
}
5438
5439fn parse_grant_headers(headers: &HeaderMap) -> Vec<AclGrant> {
5440    let mut grants = Vec::new();
5441    let header_permission_map = [
5442        ("x-amz-grant-read", "READ"),
5443        ("x-amz-grant-write", "WRITE"),
5444        ("x-amz-grant-read-acp", "READ_ACP"),
5445        ("x-amz-grant-write-acp", "WRITE_ACP"),
5446        ("x-amz-grant-full-control", "FULL_CONTROL"),
5447    ];
5448
5449    for (header, permission) in &header_permission_map {
5450        if let Some(value) = headers.get(*header).and_then(|v| v.to_str().ok()) {
5451            // Parse "id=xxx" or "uri=xxx" or "emailAddress=xxx"
5452            for part in value.split(',') {
5453                let part = part.trim();
5454                if let Some((key, val)) = part.split_once('=') {
5455                    let val = val.trim().trim_matches('"');
5456                    let key = key.trim().to_lowercase();
5457                    match key.as_str() {
5458                        "id" => {
5459                            grants.push(AclGrant {
5460                                grantee_type: "CanonicalUser".to_string(),
5461                                grantee_id: Some(val.to_string()),
5462                                grantee_display_name: Some(val.to_string()),
5463                                grantee_uri: None,
5464                                permission: permission.to_string(),
5465                            });
5466                        }
5467                        "uri" | "url" => {
5468                            grants.push(AclGrant {
5469                                grantee_type: "Group".to_string(),
5470                                grantee_id: None,
5471                                grantee_display_name: None,
5472                                grantee_uri: Some(val.to_string()),
5473                                permission: permission.to_string(),
5474                            });
5475                        }
5476                        _ => {}
5477                    }
5478                }
5479            }
5480        }
5481    }
5482    grants
5483}
5484
5485fn parse_acl_xml(xml: &str) -> Result<Vec<AclGrant>, AwsServiceError> {
5486    // Check for Owner presence
5487    if xml.contains("<AccessControlPolicy") && !xml.contains("<Owner>") {
5488        return Err(AwsServiceError::aws_error(
5489            StatusCode::BAD_REQUEST,
5490            "MalformedACLError",
5491            "The XML you provided was not well-formed or did not validate against our published schema",
5492        ));
5493    }
5494
5495    let valid_permissions = ["READ", "WRITE", "READ_ACP", "WRITE_ACP", "FULL_CONTROL"];
5496
5497    let mut grants = Vec::new();
5498    let mut remaining = xml;
5499    while let Some(start) = remaining.find("<Grant>") {
5500        let after = &remaining[start + 7..];
5501        if let Some(end) = after.find("</Grant>") {
5502            let grant_body = &after[..end];
5503
5504            // Extract permission
5505            let permission = extract_xml_value(grant_body, "Permission").unwrap_or_default();
5506            if !valid_permissions.contains(&permission.as_str()) {
5507                return Err(AwsServiceError::aws_error(
5508                    StatusCode::BAD_REQUEST,
5509                    "MalformedACLError",
5510                    "The XML you provided was not well-formed or did not validate against our published schema",
5511                ));
5512            }
5513
5514            // Determine grantee type
5515            if grant_body.contains("xsi:type=\"Group\"") || grant_body.contains("<URI>") {
5516                let uri = extract_xml_value(grant_body, "URI").unwrap_or_default();
5517                grants.push(AclGrant {
5518                    grantee_type: "Group".to_string(),
5519                    grantee_id: None,
5520                    grantee_display_name: None,
5521                    grantee_uri: Some(uri),
5522                    permission,
5523                });
5524            } else {
5525                let id = extract_xml_value(grant_body, "ID").unwrap_or_default();
5526                let display =
5527                    extract_xml_value(grant_body, "DisplayName").unwrap_or_else(|| id.clone());
5528                grants.push(AclGrant {
5529                    grantee_type: "CanonicalUser".to_string(),
5530                    grantee_id: Some(id),
5531                    grantee_display_name: Some(display),
5532                    grantee_uri: None,
5533                    permission,
5534                });
5535            }
5536
5537            remaining = &after[end + 8..];
5538        } else {
5539            break;
5540        }
5541    }
5542    Ok(grants)
5543}
5544
5545// ---------------------------------------------------------------------------
5546// Range helpers
5547// ---------------------------------------------------------------------------
5548
/// Outcome of interpreting a single-range `Range` header against an object size.
enum RangeResult {
    /// The range overlaps the object; `start` and `end` are inclusive byte offsets.
    Satisfiable { start: usize, end: usize },
    /// The range lies entirely outside the object (or is an empty suffix).
    NotSatisfiable,
    /// The range is syntactically valid but its end precedes its start.
    Ignored,
}

/// Interprets a `bytes=` range specification for an object of `total_size` bytes.
///
/// Supported forms are `bytes=START-END`, `bytes=START-` and the suffix form
/// `bytes=-N` (the last `N` bytes). An `END` beyond the object is clamped to
/// the last byte.
///
/// Returns `None` when the value does not start with `bytes=`, has no `-`,
/// or contains a bound that does not parse as `usize` (which includes
/// multi-range requests such as `bytes=0-1,3-4`).
fn parse_range_header(range_str: &str, total_size: usize) -> Option<RangeResult> {
    let spec = range_str.strip_prefix("bytes=")?;
    let (first, last) = spec.split_once('-')?;

    // Suffix form `bytes=-N`: the final N bytes of the object.
    if first.is_empty() {
        let suffix_len = last.parse::<usize>().ok()?;
        let outcome = if suffix_len == 0 || total_size == 0 {
            RangeResult::NotSatisfiable
        } else {
            RangeResult::Satisfiable {
                start: total_size.saturating_sub(suffix_len),
                end: total_size - 1,
            }
        };
        return Some(outcome);
    }

    let start = first.parse::<usize>().ok()?;
    if start >= total_size {
        return Some(RangeResult::NotSatisfiable);
    }

    // `start < total_size` guarantees the object is non-empty here.
    let last_byte = total_size - 1;
    let end = match last {
        "" => last_byte,
        bounded => {
            let requested = bounded.parse::<usize>().ok()?;
            if requested < start {
                return Some(RangeResult::Ignored);
            }
            requested.min(last_byte)
        }
    };
    Some(RangeResult::Satisfiable { start, end })
}
5585
5586// ---------------------------------------------------------------------------
5587// Helpers
5588// ---------------------------------------------------------------------------
5589
5590/// S3 XML response with `application/xml` content type (unlike Query protocol's `text/xml`).
5591fn s3_xml(status: StatusCode, body: impl Into<Bytes>) -> AwsResponse {
5592    AwsResponse {
5593        status,
5594        content_type: "application/xml".to_string(),
5595        body: body.into(),
5596        headers: HeaderMap::new(),
5597    }
5598}
5599
5600fn empty_response(status: StatusCode) -> AwsResponse {
5601    AwsResponse {
5602        status,
5603        content_type: "application/xml".to_string(),
5604        body: Bytes::new(),
5605        headers: HeaderMap::new(),
5606    }
5607}
5608
5609/// Returns true when the object is stored in a "cold" storage class (GLACIER, DEEP_ARCHIVE)
5610/// and has NOT been restored (or restore is still in progress).
5611fn is_frozen(obj: &S3Object) -> bool {
5612    matches!(obj.storage_class.as_str(), "GLACIER" | "DEEP_ARCHIVE")
5613        && obj.restore_ongoing != Some(false)
5614}
5615
5616fn no_such_bucket(bucket: &str) -> AwsServiceError {
5617    AwsServiceError::aws_error_with_fields(
5618        StatusCode::NOT_FOUND,
5619        "NoSuchBucket",
5620        "The specified bucket does not exist",
5621        vec![("BucketName".to_string(), bucket.to_string())],
5622    )
5623}
5624
5625fn no_such_key(key: &str) -> AwsServiceError {
5626    AwsServiceError::aws_error_with_fields(
5627        StatusCode::NOT_FOUND,
5628        "NoSuchKey",
5629        "The specified key does not exist.",
5630        vec![("Key".to_string(), key.to_string())],
5631    )
5632}
5633
5634fn no_such_upload(upload_id: &str) -> AwsServiceError {
5635    AwsServiceError::aws_error_with_fields(
5636        StatusCode::NOT_FOUND,
5637        "NoSuchUpload",
5638        "The specified upload does not exist. The upload ID may be invalid, \
5639         or the upload may have been aborted or completed.",
5640        vec![("UploadId".to_string(), upload_id.to_string())],
5641    )
5642}
5643
5644fn no_such_key_with_detail(key: &str) -> AwsServiceError {
5645    AwsServiceError::aws_error_with_fields(
5646        StatusCode::NOT_FOUND,
5647        "NoSuchKey",
5648        "The specified key does not exist.",
5649        vec![("Key".to_string(), key.to_string())],
5650    )
5651}
5652
5653fn compute_md5(data: &[u8]) -> String {
5654    let digest = Md5::digest(data);
5655    format!("{:x}", digest)
5656}
5657
5658fn compute_checksum(algorithm: &str, data: &[u8]) -> String {
5659    match algorithm {
5660        "CRC32" => {
5661            let crc = crc32fast::hash(data);
5662            BASE64.encode(crc.to_be_bytes())
5663        }
5664        "SHA1" => {
5665            use sha1::Digest as _;
5666            let hash = sha1::Sha1::digest(data);
5667            BASE64.encode(hash)
5668        }
5669        "SHA256" => {
5670            use sha2::Digest as _;
5671            let hash = sha2::Sha256::digest(data);
5672            BASE64.encode(hash)
5673        }
5674        _ => String::new(),
5675    }
5676}
5677
5678#[allow(dead_code)]
5679fn url_encode_key(s: &str) -> String {
5680    percent_encoding::utf8_percent_encode(s, percent_encoding::NON_ALPHANUMERIC).to_string()
5681}
5682
/// Percent-encodes an object key while keeping it readable as a path.
///
/// ASCII letters, digits and `-`, `_`, `.`, `~`, `/` are copied through;
/// every other byte (including each byte of a multi-byte UTF-8 character) is
/// written as `%XX` with uppercase hex digits.
fn url_encode_s3_key(s: &str) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(s.len() * 2);
    for &byte in s.as_bytes() {
        let keep_literal =
            byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~' | b'/');
        if keep_literal {
            encoded.push(char::from(byte));
        } else {
            encoded.push('%');
            encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
            encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0F)]));
        }
    }
    encoded
}
5697
/// Escapes `s` for embedding in XML text or attribute values.
///
/// The five XML special characters are replaced by their named entities.
/// Tab, line feed and carriage return are copied through; every other
/// character below U+0020 is written as an uppercase hexadecimal character
/// reference (`&#xN;`). All remaining characters are copied unchanged.
fn xml_escape(s: &str) -> String {
    s.chars()
        .fold(String::with_capacity(s.len()), |mut escaped, ch| {
            match ch {
                '&' => escaped.push_str("&amp;"),
                '<' => escaped.push_str("&lt;"),
                '>' => escaped.push_str("&gt;"),
                '"' => escaped.push_str("&quot;"),
                '\'' => escaped.push_str("&apos;"),
                // The only control characters XML 1.0 accepts literally.
                '\t' | '\n' | '\r' => escaped.push(ch),
                ctrl if u32::from(ctrl) < 0x20 => {
                    escaped.push_str(&format!("&#x{:X};", u32::from(ctrl)));
                }
                plain => escaped.push(plain),
            }
            escaped
        })
}
5717
5718fn extract_user_metadata(headers: &HeaderMap) -> std::collections::HashMap<String, String> {
5719    let mut meta = std::collections::HashMap::new();
5720    for (name, value) in headers {
5721        if let Some(key) = name.as_str().strip_prefix("x-amz-meta-") {
5722            if let Ok(v) = value.to_str() {
5723                meta.insert(key.to_string(), v.to_string());
5724            }
5725        }
5726    }
5727    meta
5728}
5729
/// Reports whether `class` is one of the storage class names accepted here.
///
/// The comparison is exact and case-sensitive.
fn is_valid_storage_class(class: &str) -> bool {
    const STORAGE_CLASSES: &[&str] = &[
        "STANDARD",
        "REDUCED_REDUNDANCY",
        "STANDARD_IA",
        "ONEZONE_IA",
        "INTELLIGENT_TIERING",
        "GLACIER",
        "DEEP_ARCHIVE",
        "GLACIER_IR",
        "OUTPOSTS",
        "SNOW",
        "EXPRESS_ONEZONE",
    ];

    STORAGE_CLASSES.contains(&class)
}
5746
/// Reports whether `name` is acceptable as a bucket name.
///
/// A valid name is 3 to 63 bytes long, starts and ends with an ASCII letter
/// or digit, and consists only of lowercase ASCII letters, digits, `-`, `.`
/// and (for compatibility) `_`.
fn is_valid_bucket_name(name: &str) -> bool {
    if !(3..=63).contains(&name.len()) {
        return false;
    }

    // The length check above guarantees at least three bytes.
    let bytes = name.as_bytes();
    let (first, last) = (bytes[0], bytes[bytes.len() - 1]);
    if !(first.is_ascii_alphanumeric() && last.is_ascii_alphanumeric()) {
        return false;
    }

    // Any non-ASCII character contributes bytes >= 0x80, which are rejected here.
    bytes
        .iter()
        .all(|&b| matches!(b, b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_'))
}
5760
/// Reports whether `region` appears in the fixed allow-list of region names.
///
/// This is an exact, case-sensitive lookup against the table below, not a
/// pattern match; a region missing from the table is rejected.
fn is_valid_region(region: &str) -> bool {
    const KNOWN_REGIONS: &[&str] = &[
        "us-east-1",
        "us-east-2",
        "us-west-1",
        "us-west-2",
        "af-south-1",
        "ap-east-1",
        "ap-south-1",
        "ap-south-2",
        "ap-southeast-1",
        "ap-southeast-2",
        "ap-southeast-3",
        "ap-southeast-4",
        "ap-northeast-1",
        "ap-northeast-2",
        "ap-northeast-3",
        "ca-central-1",
        "ca-west-1",
        "eu-central-1",
        "eu-central-2",
        "eu-west-1",
        "eu-west-2",
        "eu-west-3",
        "eu-south-1",
        "eu-south-2",
        "eu-north-1",
        "il-central-1",
        "me-south-1",
        "me-central-1",
        "sa-east-1",
        "cn-north-1",
        "cn-northwest-1",
        "us-gov-east-1",
        "us-gov-east-2",
        "us-gov-west-1",
        "us-iso-east-1",
        "us-iso-west-1",
        "us-isob-east-1",
        "us-isof-south-1",
    ];

    KNOWN_REGIONS.iter().any(|known| *known == region)
}
5805
5806fn resolve_object<'a>(
5807    b: &'a S3Bucket,
5808    key: &str,
5809    version_id: Option<&String>,
5810) -> Result<&'a S3Object, AwsServiceError> {
5811    if let Some(vid) = version_id {
5812        // "null" version ID refers to an object with no version_id (pre-versioning)
5813        if vid == "null" {
5814            // Check versions for a pre-versioning object (version_id == None or Some("null"))
5815            if let Some(versions) = b.object_versions.get(key) {
5816                if let Some(obj) = versions
5817                    .iter()
5818                    .find(|o| o.version_id.is_none() || o.version_id.as_deref() == Some("null"))
5819                {
5820                    return Ok(obj);
5821                }
5822            }
5823            // Also check current object if it has no version_id
5824            if let Some(obj) = b.objects.get(key) {
5825                if obj.version_id.is_none() || obj.version_id.as_deref() == Some("null") {
5826                    return Ok(obj);
5827                }
5828            }
5829        } else {
5830            // When a specific versionId is requested, check versions first
5831            if let Some(versions) = b.object_versions.get(key) {
5832                if let Some(obj) = versions
5833                    .iter()
5834                    .find(|o| o.version_id.as_deref() == Some(vid.as_str()))
5835                {
5836                    return Ok(obj);
5837                }
5838            }
5839            // Also check current object
5840            if let Some(obj) = b.objects.get(key) {
5841                if obj.version_id.as_deref() == Some(vid.as_str()) {
5842                    return Ok(obj);
5843                }
5844            }
5845        }
5846        // For versioned buckets, return NoSuchVersion; for non-versioned, return 400
5847        if b.versioning.is_some() {
5848            Err(AwsServiceError::aws_error_with_fields(
5849                StatusCode::NOT_FOUND,
5850                "NoSuchVersion",
5851                "The specified version does not exist.",
5852                vec![
5853                    ("Key".to_string(), key.to_string()),
5854                    ("VersionId".to_string(), vid.to_string()),
5855                ],
5856            ))
5857        } else {
5858            Err(AwsServiceError::aws_error(
5859                StatusCode::BAD_REQUEST,
5860                "InvalidArgument",
5861                "Invalid version id specified",
5862            ))
5863        }
5864    } else {
5865        b.objects.get(key).ok_or_else(|| no_such_key(key))
5866    }
5867}
5868
5869fn make_delete_marker(key: &str, dm_id: &str) -> S3Object {
5870    S3Object {
5871        key: key.to_string(),
5872        data: Bytes::new(),
5873        content_type: String::new(),
5874        etag: String::new(),
5875        size: 0,
5876        last_modified: Utc::now(),
5877        metadata: std::collections::HashMap::new(),
5878        storage_class: "STANDARD".to_string(),
5879        tags: std::collections::HashMap::new(),
5880        acl_grants: vec![],
5881        acl_owner_id: None,
5882        parts_count: None,
5883        part_sizes: None,
5884        sse_algorithm: None,
5885        sse_kms_key_id: None,
5886        bucket_key_enabled: None,
5887        version_id: Some(dm_id.to_string()),
5888        is_delete_marker: true,
5889        content_encoding: None,
5890        website_redirect_location: None,
5891        restore_ongoing: None,
5892        restore_expiry: None,
5893        checksum_algorithm: None,
5894        checksum_value: None,
5895        lock_mode: None,
5896        lock_retain_until: None,
5897        lock_legal_hold: None,
5898    }
5899}
5900
/// Renders the default ACL document: a single `FULL_CONTROL` grant to the owner.
///
/// `owner_id` is inserted verbatim (it is not XML-escaped) as both the ID and
/// the display name of the owner and of the grantee.
#[allow(dead_code)]
fn acl_xml(owner_id: &str) -> String {
    // The same identity fragment appears under <Owner> and under <Grantee>.
    let identity = format!("<ID>{owner_id}</ID><DisplayName>{owner_id}</DisplayName>");

    let mut doc = String::from("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
    doc.push_str("<AccessControlPolicy xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">");
    doc.push_str("<Owner>");
    doc.push_str(&identity);
    doc.push_str("</Owner>");
    doc.push_str("<AccessControlList><Grant>");
    doc.push_str(
        "<Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:type=\"CanonicalUser\">",
    );
    doc.push_str(&identity);
    doc.push_str("</Grantee>");
    doc.push_str("<Permission>FULL_CONTROL</Permission></Grant></AccessControlList>");
    doc.push_str("</AccessControlPolicy>");
    doc
}
5914
/// Represents an object to delete in a batch delete request.
///
/// One entry is produced per `<Object>` element that carries a `<Key>`.
struct DeleteObjectEntry {
    /// Contents of the entry's `<Key>` element.
    key: String,
    /// Contents of the entry's `<VersionId>` element, if one was supplied.
    version_id: Option<String>,
}
5920
5921fn parse_delete_objects_xml(xml: &str) -> Vec<DeleteObjectEntry> {
5922    let mut entries = Vec::new();
5923    let mut remaining = xml;
5924    while let Some(obj_start) = remaining.find("<Object>") {
5925        let after = &remaining[obj_start + 8..];
5926        if let Some(obj_end) = after.find("</Object>") {
5927            let obj_body = &after[..obj_end];
5928            let key = extract_xml_value(obj_body, "Key");
5929            let version_id = extract_xml_value(obj_body, "VersionId");
5930            if let Some(k) = key {
5931                entries.push(DeleteObjectEntry { key: k, version_id });
5932            }
5933            remaining = &after[obj_end + 9..];
5934        } else {
5935            break;
5936        }
5937    }
5938    entries
5939}
5940
5941/// Minimal XML parser for `<Tagging><TagSet><Tag><Key>k</Key><Value>v</Value></Tag>...`.
5942/// Returns a Vec to preserve insertion order and detect duplicates.
5943fn parse_tagging_xml(xml: &str) -> Vec<(String, String)> {
5944    let mut tags = Vec::new();
5945    let mut remaining = xml;
5946    while let Some(tag_start) = remaining.find("<Tag>") {
5947        let after = &remaining[tag_start + 5..];
5948        if let Some(tag_end) = after.find("</Tag>") {
5949            let tag_body = &after[..tag_end];
5950            let key = extract_xml_value(tag_body, "Key");
5951            let value = extract_xml_value(tag_body, "Value");
5952            if let (Some(k), Some(v)) = (key, value) {
5953                tags.push((k, v));
5954            }
5955            remaining = &after[tag_end + 6..];
5956        } else {
5957            break;
5958        }
5959    }
5960    tags
5961}
5962
5963fn validate_tags(tags: &[(String, String)]) -> Result<(), AwsServiceError> {
5964    // Check for duplicate keys
5965    let mut seen = std::collections::HashSet::new();
5966    for (k, _) in tags {
5967        if !seen.insert(k.as_str()) {
5968            return Err(AwsServiceError::aws_error(
5969                StatusCode::BAD_REQUEST,
5970                "InvalidTag",
5971                "Cannot provide multiple Tags with the same key",
5972            ));
5973        }
5974        // Check for aws: prefix
5975        if k.starts_with("aws:") {
5976            return Err(AwsServiceError::aws_error(
5977                StatusCode::BAD_REQUEST,
5978                "InvalidTag",
5979                "System tags cannot be added/updated by requester",
5980            ));
5981        }
5982    }
5983    Ok(())
5984}
5985
/// Returns the text between the first `<tag>` and the next `</tag>` in `xml`.
///
/// This is a substring scan, not an XML parser: attributes on the opening
/// tag, nesting of the same tag, and entity decoding are not handled.
///
/// * A self-closing element (`<tag/>` or `<tag />`) that appears before any
///   `<tag>` — or when there is no `<tag>` at all — yields `Some("")`.
/// * `None` is returned when there is no opening tag, or no closing tag
///   after the opening tag.
///
/// The closing tag is searched for only after the opening tag, so a stray
/// `</tag>` earlier in the input cannot produce an inverted slice range
/// (which would panic on request-controlled input).
fn extract_xml_value(xml: &str, tag: &str) -> Option<String> {
    let open = format!("<{tag}>");
    let close = format!("</{tag}>");
    let open_pos = xml.find(&open);

    // Earliest occurrence of either self-closing spelling.
    let self_closing_pos = [format!("<{tag} />"), format!("<{tag}/>")]
        .iter()
        .filter_map(|form| xml.find(form.as_str()))
        .min();

    match (self_closing_pos, open_pos) {
        (Some(sp), Some(op)) if sp < op => return Some(String::new()),
        (Some(_), None) => return Some(String::new()),
        _ => {}
    }

    let start = open_pos? + open.len();
    let end = start + xml[start..].find(&close)?;
    Some(xml[start..end].to_string())
}
6010
6011/// Parse the CompleteMultipartUpload XML body into (part_number, etag) pairs.
6012fn parse_complete_multipart_xml(xml: &str) -> Vec<(u32, String)> {
6013    let mut parts = Vec::new();
6014    let mut remaining = xml;
6015    while let Some(part_start) = remaining.find("<Part>") {
6016        let after = &remaining[part_start + 6..];
6017        if let Some(part_end) = after.find("</Part>") {
6018            let part_body = &after[..part_end];
6019            let part_num =
6020                extract_xml_value(part_body, "PartNumber").and_then(|s| s.parse::<u32>().ok());
6021            let etag = extract_xml_value(part_body, "ETag").map(|s| s.replace('"', ""));
6022            if let (Some(num), Some(e)) = (part_num, etag) {
6023                parts.push((num, e));
6024            }
6025            remaining = &after[part_end + 7..];
6026        } else {
6027            break;
6028        }
6029    }
6030    parts
6031}
6032
6033fn parse_url_encoded_tags(s: &str) -> Vec<(String, String)> {
6034    let mut tags = Vec::new();
6035    for pair in s.split('&') {
6036        if pair.is_empty() {
6037            continue;
6038        }
6039        let (key, value) = match pair.find('=') {
6040            Some(pos) => (&pair[..pos], &pair[pos + 1..]),
6041            None => (pair, ""),
6042        };
6043        tags.push((
6044            percent_encoding::percent_decode_str(key)
6045                .decode_utf8_lossy()
6046                .to_string(),
6047            percent_encoding::percent_decode_str(value)
6048                .decode_utf8_lossy()
6049                .to_string(),
6050        ));
6051    }
6052    tags
6053}
6054
6055/// Validate lifecycle configuration XML. Returns MalformedXML on invalid configs.
6056fn validate_lifecycle_xml(xml: &str) -> Result<(), AwsServiceError> {
6057    let malformed = || {
6058        AwsServiceError::aws_error(
6059            StatusCode::BAD_REQUEST,
6060            "MalformedXML",
6061            "The XML you provided was not well-formed or did not validate against our published schema",
6062        )
6063    };
6064
6065    let mut remaining = xml;
6066    while let Some(rule_start) = remaining.find("<Rule>") {
6067        let after = &remaining[rule_start + 6..];
6068        if let Some(rule_end) = after.find("</Rule>") {
6069            let rule_body = &after[..rule_end];
6070
6071            // Must have Filter or Prefix
6072            let has_filter = rule_body.contains("<Filter>")
6073                || rule_body.contains("<Filter/>")
6074                || rule_body.contains("<Filter />");
6075
6076            // Check for <Prefix> at rule level (outside of <Filter>...</Filter>)
6077            let has_prefix_outside_filter = {
6078                if !rule_body.contains("<Prefix") {
6079                    false
6080                } else if !has_filter {
6081                    true // No filter means any Prefix is at rule level
6082                } else {
6083                    // Remove the Filter block and check if Prefix remains
6084                    let mut stripped = rule_body.to_string();
6085                    // Remove <Filter>...</Filter> or self-closing variants
6086                    if let Some(fs) = stripped.find("<Filter") {
6087                        if let Some(fe) = stripped.find("</Filter>") {
6088                            stripped = format!("{}{}", &stripped[..fs], &stripped[fe + 9..]);
6089                        }
6090                    }
6091                    stripped.contains("<Prefix")
6092                }
6093            };
6094
6095            if !has_filter && !has_prefix_outside_filter {
6096                return Err(malformed());
6097            }
6098            // Can't have both Filter and rule-level Prefix
6099            if has_filter && has_prefix_outside_filter {
6100                return Err(malformed());
6101            }
6102
6103            // Expiration: if has ExpiredObjectDeleteMarker, cannot also have Days or Date
6104            // (only check within <Expiration> block)
6105            if let Some(exp_start) = rule_body.find("<Expiration>") {
6106                if let Some(exp_end) = rule_body[exp_start..].find("</Expiration>") {
6107                    let exp_body = &rule_body[exp_start..exp_start + exp_end];
6108                    if exp_body.contains("<ExpiredObjectDeleteMarker>")
6109                        && (exp_body.contains("<Days>") || exp_body.contains("<Date>"))
6110                    {
6111                        return Err(malformed());
6112                    }
6113                }
6114            }
6115
6116            // Filter validation
6117            if has_filter {
6118                if let Some(fs) = rule_body.find("<Filter>") {
6119                    if let Some(fe) = rule_body.find("</Filter>") {
6120                        let filter_body = &rule_body[fs + 8..fe];
6121                        let has_prefix_in_filter = filter_body.contains("<Prefix");
6122                        let has_tag_in_filter = filter_body.contains("<Tag>");
6123                        let has_and_in_filter = filter_body.contains("<And>");
6124                        // Can't have both Prefix and Tag without And
6125                        if has_prefix_in_filter && has_tag_in_filter && !has_and_in_filter {
6126                            return Err(malformed());
6127                        }
6128                        // Can't have Tag and And simultaneously at the Filter level
6129                        if has_tag_in_filter && has_and_in_filter {
6130                            // Check if the <Tag> is outside <And>
6131                            let and_start = filter_body.find("<And>").unwrap_or(0);
6132                            let tag_pos = filter_body.find("<Tag>").unwrap_or(0);
6133                            if tag_pos < and_start {
6134                                return Err(malformed());
6135                            }
6136                        }
6137                    }
6138                }
6139            }
6140
6141            // NoncurrentVersionTransition must have NoncurrentDays and StorageClass
6142            if rule_body.contains("<NoncurrentVersionTransition>") {
6143                let mut nvt_remaining = rule_body;
6144                while let Some(nvt_start) = nvt_remaining.find("<NoncurrentVersionTransition>") {
6145                    let nvt_after = &nvt_remaining[nvt_start + 29..];
6146                    if let Some(nvt_end) = nvt_after.find("</NoncurrentVersionTransition>") {
6147                        let nvt_body = &nvt_after[..nvt_end];
6148                        if !nvt_body.contains("<NoncurrentDays>") {
6149                            return Err(malformed());
6150                        }
6151                        if !nvt_body.contains("<StorageClass>") {
6152                            return Err(malformed());
6153                        }
6154                        nvt_remaining = &nvt_after[nvt_end + 30..];
6155                    } else {
6156                        break;
6157                    }
6158                }
6159            }
6160
6161            remaining = &after[rule_end + 7..];
6162        } else {
6163            break;
6164        }
6165    }
6166
6167    Ok(())
6168}
6169
6170/// Normalize replication configuration XML to include defaults AWS adds.
6171/// Auto-generate `<Id>` for notification configuration elements that lack one.
6172fn normalize_notification_ids(xml: &str) -> String {
6173    let config_tags = [
6174        "TopicConfiguration",
6175        "QueueConfiguration",
6176        "CloudFunctionConfiguration",
6177    ];
6178    let mut result = xml.to_string();
6179    for tag in &config_tags {
6180        let open = format!("<{tag}>");
6181        let close = format!("</{tag}>");
6182        let mut output = String::new();
6183        let mut remaining = result.as_str();
6184        while let Some(start) = remaining.find(&open) {
6185            output.push_str(&remaining[..start]);
6186            let after = &remaining[start + open.len()..];
6187            if let Some(end) = after.find(&close) {
6188                let body = &after[..end];
6189                output.push_str(&open);
6190                if !body.contains("<Id>") {
6191                    output.push_str(&format!("<Id>{}</Id>", uuid::Uuid::new_v4()));
6192                }
6193                output.push_str(body);
6194                output.push_str(&close);
6195                remaining = &after[end + close.len()..];
6196            } else {
6197                output.push_str(&open);
6198                output.push_str(after);
6199                remaining = "";
6200                break;
6201            }
6202        }
6203        output.push_str(remaining);
6204        result = output;
6205    }
6206    result
6207}
6208
6209fn normalize_replication_xml(xml: &str) -> String {
6210    let mut result = String::new();
6211    let mut remaining = xml;
6212    let mut auto_priority: u32 = 0;
6213
6214    // Find and process everything before the first <Rule>
6215    if let Some(first_rule) = remaining.find("<Rule>") {
6216        result.push_str(&remaining[..first_rule]);
6217        remaining = &remaining[first_rule..];
6218    } else {
6219        return xml.to_string();
6220    }
6221
6222    // Process each <Rule>
6223    while let Some(rule_start) = remaining.find("<Rule>") {
6224        let after = &remaining[rule_start + 6..];
6225        if let Some(rule_end) = after.find("</Rule>") {
6226            let rule_body = &after[..rule_end];
6227
6228            // Extract fields from the rule
6229            let id = extract_xml_value(rule_body, "ID");
6230            let priority = extract_xml_value(rule_body, "Priority");
6231            let status =
6232                extract_xml_value(rule_body, "Status").unwrap_or_else(|| "Enabled".to_string());
6233
6234            // Extract Destination block (keep as-is)
6235            let destination = rule_body.find("<Destination>").and_then(|ds| {
6236                rule_body
6237                    .find("</Destination>")
6238                    .map(|de| rule_body[ds..de + 14].to_string())
6239            });
6240
6241            // Extract existing Filter if any
6242            let filter_block = rule_body.find("<Filter>").and_then(|fs| {
6243                rule_body
6244                    .find("</Filter>")
6245                    .map(|fe| rule_body[fs..fe + 9].to_string())
6246            });
6247
6248            // Extract DeleteMarkerReplication if any
6249            let dmr_block = rule_body.find("<DeleteMarkerReplication>").and_then(|ds| {
6250                rule_body
6251                    .find("</DeleteMarkerReplication>")
6252                    .map(|de| rule_body[ds..de + 25].to_string())
6253            });
6254
6255            // Build normalized rule
6256            result.push_str("<Rule>");
6257
6258            // DeleteMarkerReplication (default to Disabled)
6259            result.push_str(dmr_block.as_deref().unwrap_or(
6260                "<DeleteMarkerReplication><Status>Disabled</Status></DeleteMarkerReplication>",
6261            ));
6262
6263            // Destination
6264            if let Some(ref dest) = destination {
6265                result.push_str(dest);
6266            }
6267
6268            // Filter (default to empty prefix)
6269            result.push_str(
6270                filter_block
6271                    .as_deref()
6272                    .unwrap_or("<Filter><Prefix></Prefix></Filter>"),
6273            );
6274
6275            // ID (auto-generate if missing)
6276            let rule_id = id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
6277            result.push_str(&format!("<ID>{}</ID>", xml_escape(&rule_id)));
6278
6279            // Priority (auto-assign if missing)
6280            auto_priority += 1;
6281            let p = priority
6282                .and_then(|v| v.parse::<u32>().ok())
6283                .unwrap_or(auto_priority);
6284            result.push_str(&format!("<Priority>{p}</Priority>"));
6285
6286            // Status
6287            result.push_str(&format!("<Status>{status}</Status>"));
6288
6289            result.push_str("</Rule>");
6290
6291            remaining = &after[rule_end + 7..];
6292        } else {
6293            result.push_str(&remaining[rule_start..]);
6294            break;
6295        }
6296    }
6297
6298    // Append anything after the last </Rule>
6299    result.push_str(remaining);
6300
6301    result
6302}
6303
6304/// Build an S3 event notification JSON payload.
6305fn build_s3_event_notification(
6306    event_name: &str,
6307    bucket_name: &str,
6308    key: &str,
6309    size: u64,
6310    etag: &str,
6311    region: &str,
6312) -> String {
6313    let event_time = Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
6314    serde_json::json!({
6315        "Records": [{
6316            "eventVersion": "2.1",
6317            "eventSource": "aws:s3",
6318            "awsRegion": region,
6319            "eventTime": event_time,
6320            "eventName": event_name,
6321            "s3": {
6322                "bucket": {
6323                    "name": bucket_name,
6324                    "arn": format!("arn:aws:s3:::{}", bucket_name)
6325                },
6326                "object": {
6327                    "key": key,
6328                    "size": size,
6329                    "eTag": etag
6330                }
6331            }
6332        }]
6333    })
6334    .to_string()
6335}
6336
/// A single delivery target parsed from a bucket notification configuration.
#[derive(Debug, Clone, PartialEq, Eq)]
struct NotificationTarget {
    /// Which kind of destination `arn` refers to.
    target_type: NotificationTargetType,
    /// Destination ARN, taken from the `<Queue>` or `<Topic>` element.
    arn: String,
    /// Event filters from the `<Event>` elements of the configuration block.
    events: Vec<String>,
}

/// The kinds of notification destination that can be delivered to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NotificationTargetType {
    /// An SQS queue (`<QueueConfiguration>`).
    Sqs,
    /// An SNS topic (`<TopicConfiguration>`).
    Sns,
}
6348
6349/// Parse the bucket notification configuration XML into targets.
6350fn parse_notification_config(xml: &str) -> Vec<NotificationTarget> {
6351    let mut targets = Vec::new();
6352
6353    // Parse QueueConfiguration entries
6354    let mut remaining = xml;
6355    while let Some(start) = remaining.find("<QueueConfiguration>") {
6356        let after = &remaining[start + 20..];
6357        if let Some(end) = after.find("</QueueConfiguration>") {
6358            let block = &after[..end];
6359            if let Some(arn) = extract_xml_value(block, "Queue") {
6360                let events = extract_all_xml_values(block, "Event");
6361                targets.push(NotificationTarget {
6362                    target_type: NotificationTargetType::Sqs,
6363                    arn,
6364                    events,
6365                });
6366            }
6367            remaining = &after[end + 21..];
6368        } else {
6369            break;
6370        }
6371    }
6372
6373    // Parse TopicConfiguration entries
6374    remaining = xml;
6375    while let Some(start) = remaining.find("<TopicConfiguration>") {
6376        let after = &remaining[start + 20..];
6377        if let Some(end) = after.find("</TopicConfiguration>") {
6378            let block = &after[..end];
6379            if let Some(arn) = extract_xml_value(block, "Topic") {
6380                let events = extract_all_xml_values(block, "Event");
6381                targets.push(NotificationTarget {
6382                    target_type: NotificationTargetType::Sns,
6383                    arn,
6384                    events,
6385                });
6386            }
6387            remaining = &after[end + 21..];
6388        } else {
6389            break;
6390        }
6391    }
6392
6393    targets
6394}
6395
/// Collect the text content of every `<tag>...</tag>` element in `xml`.
///
/// Elements are returned in document order. Scanning stops at the first
/// opening tag that has no matching closing tag after it.
fn extract_all_xml_values(xml: &str, tag: &str) -> Vec<String> {
    let open_tag = format!("<{tag}>");
    let close_tag = format!("</{tag}>");
    let mut cursor = xml;

    // Each step consumes one complete element and yields its content; the
    // iterator ends as soon as either tag can no longer be found.
    std::iter::from_fn(|| {
        let (_, tail) = cursor.split_once(open_tag.as_str())?;
        let (value, rest) = tail.split_once(close_tag.as_str())?;
        cursor = rest;
        Some(value.to_string())
    })
    .collect()
}
6413
/// Decide whether `event_name` is selected by a notification event `filter`.
///
/// A filter selects an event when it is identical to the event name, when it
/// is the catch-all `s3:*`, or when it ends in `:*` and the event name starts
/// with everything up to and including that final colon.
fn event_matches(event_name: &str, filter: &str) -> bool {
    if filter == event_name || filter == "s3:*" {
        return true;
    }
    // "s3:ObjectCreated:*" -> stem "s3:ObjectCreated:"
    match filter.strip_suffix('*') {
        Some(stem) if stem.ends_with(':') => event_name.starts_with(stem),
        _ => false,
    }
}
6433
6434/// Deliver S3 event notifications for a bucket operation.
6435#[allow(clippy::too_many_arguments)]
6436fn deliver_notifications(
6437    delivery: &DeliveryBus,
6438    notification_config: &str,
6439    event_name: &str,
6440    bucket_name: &str,
6441    key: &str,
6442    size: u64,
6443    etag: &str,
6444    region: &str,
6445) {
6446    let targets = parse_notification_config(notification_config);
6447    let s3_event_name = format!("s3:{event_name}");
6448    let message = build_s3_event_notification(event_name, bucket_name, key, size, etag, region);
6449
6450    for target in &targets {
6451        let matches = target.events.is_empty()
6452            || target
6453                .events
6454                .iter()
6455                .any(|f| event_matches(&s3_event_name, f));
6456        if !matches {
6457            continue;
6458        }
6459        match target.target_type {
6460            NotificationTargetType::Sqs => {
6461                delivery.send_to_sqs(&target.arn, &message, &std::collections::HashMap::new());
6462            }
6463            NotificationTargetType::Sns => {
6464                delivery.publish_to_sns(&target.arn, &message, Some("Amazon S3 Notification"));
6465            }
6466        }
6467    }
6468}
6469
/// One `<CORSRule>` entry of a bucket CORS configuration.
#[derive(Debug, Clone, PartialEq, Eq)]
struct CorsRule {
    /// Values of the rule's `<AllowedOrigin>` elements.
    allowed_origins: Vec<String>,
    /// Values of the rule's `<AllowedMethod>` elements.
    allowed_methods: Vec<String>,
    /// Values of the rule's `<AllowedHeader>` elements.
    allowed_headers: Vec<String>,
    /// Values of the rule's `<ExposeHeader>` elements.
    expose_headers: Vec<String>,
    /// Value of `<MaxAgeSeconds>`, when present and numeric.
    max_age_seconds: Option<u32>,
}
6479
6480/// Parse CORS configuration XML into rules.
6481fn parse_cors_config(xml: &str) -> Vec<CorsRule> {
6482    let mut rules = Vec::new();
6483    let mut remaining = xml;
6484    while let Some(start) = remaining.find("<CORSRule>") {
6485        let after = &remaining[start + 10..];
6486        if let Some(end) = after.find("</CORSRule>") {
6487            let block = &after[..end];
6488            let allowed_origins = extract_all_xml_values(block, "AllowedOrigin");
6489            let allowed_methods = extract_all_xml_values(block, "AllowedMethod");
6490            let allowed_headers = extract_all_xml_values(block, "AllowedHeader");
6491            let expose_headers = extract_all_xml_values(block, "ExposeHeader");
6492            let max_age_seconds =
6493                extract_xml_value(block, "MaxAgeSeconds").and_then(|s| s.parse().ok());
6494            rules.push(CorsRule {
6495                allowed_origins,
6496                allowed_methods,
6497                allowed_headers,
6498                expose_headers,
6499                max_age_seconds,
6500            });
6501            remaining = &after[end + 11..];
6502        } else {
6503            break;
6504        }
6505    }
6506    rules
6507}
6508
/// Match a request origin against a CORS `AllowedOrigin` pattern.
///
/// The pattern may contain one `*` wildcard at any position, standing for
/// any (possibly empty) run of characters: `*` matches every origin,
/// `*.example.com` matches by suffix, `https://*` by prefix and
/// `http://*.example.com` by prefix and suffix. A pattern without `*` must
/// equal the origin exactly. Only the first `*` is treated as a wildcard;
/// any further `*` is compared literally.
fn origin_matches(origin: &str, pattern: &str) -> bool {
    match pattern.split_once('*') {
        Some((prefix, suffix)) => {
            // The length check keeps prefix and suffix from overlapping,
            // e.g. pattern "ab*ba" must not match origin "aba".
            origin.len() >= prefix.len() + suffix.len()
                && origin.starts_with(prefix)
                && origin.ends_with(suffix)
        }
        None => origin == pattern,
    }
}
6520
6521/// Find the matching CORS rule for a given origin and method.
6522fn find_cors_rule<'a>(
6523    rules: &'a [CorsRule],
6524    origin: &str,
6525    method: Option<&str>,
6526) -> Option<&'a CorsRule> {
6527    rules.iter().find(|rule| {
6528        let origin_ok = rule
6529            .allowed_origins
6530            .iter()
6531            .any(|o| origin_matches(origin, o));
6532        let method_ok = match method {
6533            Some(m) => rule.allowed_methods.iter().any(|am| am == m),
6534            None => true,
6535        };
6536        origin_ok && method_ok
6537    })
6538}
6539
6540/// Check if an object is locked (retention or legal hold) and should block mutation.
6541/// Returns an error string if locked, None if allowed.
6542fn check_object_lock_for_overwrite(obj: &S3Object, req: &AwsRequest) -> Option<&'static str> {
6543    // Legal hold blocks overwrite
6544    if obj.lock_legal_hold.as_deref() == Some("ON") {
6545        return Some("AccessDenied");
6546    }
6547    // Retention check
6548    if let (Some(mode), Some(until)) = (&obj.lock_mode, &obj.lock_retain_until) {
6549        if *until > Utc::now() {
6550            if mode == "COMPLIANCE" {
6551                return Some("AccessDenied");
6552            }
6553            if mode == "GOVERNANCE" {
6554                let bypass = req
6555                    .headers
6556                    .get("x-amz-bypass-governance-retention")
6557                    .and_then(|v| v.to_str().ok())
6558                    .map(|s| s.eq_ignore_ascii_case("true"))
6559                    .unwrap_or(false);
6560                if !bypass {
6561                    return Some("AccessDenied");
6562                }
6563            }
6564        }
6565    }
6566    None
6567}
6568
/// Unit tests for the XML parsing and matching helpers in this file.
#[cfg(test)]
mod tests {
    use super::*;

    /// Bucket names are accepted or rejected by `is_valid_bucket_name`.
    #[test]
    fn valid_bucket_names() {
        for name in ["my-bucket", "my.bucket.name", "abc"] {
            assert!(is_valid_bucket_name(name), "{name} should be accepted");
        }
        for name in ["ab", "-bucket", "Bucket", "bucket-"] {
            assert!(!is_valid_bucket_name(name), "{name} should be rejected");
        }
    }

    /// A multi-object delete body yields one entry per `<Object>`.
    #[test]
    fn parse_delete_xml() {
        let entries = parse_delete_objects_xml(
            r#"<Delete><Object><Key>a.txt</Key></Object><Object><Key>b/c.txt</Key></Object></Delete>"#,
        );
        assert_eq!(entries.len(), 2);

        let (first, second) = (&entries[0], &entries[1]);
        assert_eq!(first.key, "a.txt");
        assert!(first.version_id.is_none());
        assert_eq!(second.key, "b/c.txt");
    }

    /// A `<VersionId>` inside an `<Object>` is carried onto the entry.
    #[test]
    fn parse_delete_xml_with_version() {
        let entries = parse_delete_objects_xml(
            r#"<Delete><Object><Key>a.txt</Key><VersionId>v1</VersionId></Object></Delete>"#,
        );
        assert_eq!(entries.len(), 1);

        let only = &entries[0];
        assert_eq!(only.key, "a.txt");
        assert_eq!(only.version_id.as_deref(), Some("v1"));
    }

    /// A tagging document is parsed into key/value pairs.
    #[test]
    fn parse_tags_xml() {
        let parsed = parse_tagging_xml(
            r#"<Tagging><TagSet><Tag><Key>env</Key><Value>prod</Value></Tag></TagSet></Tagging>"#,
        );
        let expected = vec![(String::from("env"), String::from("prod"))];
        assert_eq!(parsed, expected);
    }

    /// `compute_md5` returns the lowercase hex digest.
    #[test]
    fn md5_hash() {
        assert_eq!(compute_md5(b"hello"), "5d41402abc4b2a76b9719d911017c592");
    }

    /// ETag conditions match quoted, unquoted and wildcard forms.
    #[test]
    fn test_etag_matches() {
        let stored = "\"abc\"";
        let cases = [
            ("\"abc\"", true),
            ("abc", true),
            ("*", true),
            ("\"xyz\"", false),
        ];
        for (condition, expected) in cases {
            assert_eq!(
                etag_matches(condition, stored),
                expected,
                "condition {condition} against {stored}"
            );
        }
    }

    /// Event filters match exactly, by `:*` prefix, or via `s3:*`.
    #[test]
    fn test_event_matches() {
        let cases = [
            ("s3:ObjectCreated:Put", "s3:ObjectCreated:*", true),
            ("s3:ObjectCreated:Copy", "s3:ObjectCreated:*", true),
            ("s3:ObjectRemoved:Delete", "s3:ObjectRemoved:*", true),
            ("s3:ObjectRemoved:Delete", "s3:ObjectCreated:*", false),
            ("s3:ObjectCreated:Put", "s3:ObjectCreated:Put", true),
            ("s3:ObjectCreated:Put", "s3:*", true),
        ];
        for (event, filter, expected) in cases {
            assert_eq!(
                event_matches(event, filter),
                expected,
                "event {event} against filter {filter}"
            );
        }
    }

    /// Queue and topic configurations are both parsed, queues first.
    #[test]
    fn test_parse_notification_config() {
        let xml = r#"<NotificationConfiguration>
            <QueueConfiguration>
                <Queue>arn:aws:sqs:us-east-1:123456789012:my-queue</Queue>
                <Event>s3:ObjectCreated:*</Event>
            </QueueConfiguration>
            <TopicConfiguration>
                <Topic>arn:aws:sns:us-east-1:123456789012:my-topic</Topic>
                <Event>s3:ObjectRemoved:*</Event>
            </TopicConfiguration>
        </NotificationConfiguration>"#;

        let targets = parse_notification_config(xml);
        let (queue, topic) = match targets.as_slice() {
            [queue, topic] => (queue, topic),
            other => panic!("expected 2 targets, got {}", other.len()),
        };

        assert_eq!(queue.arn, "arn:aws:sqs:us-east-1:123456789012:my-queue");
        assert_eq!(queue.events, vec!["s3:ObjectCreated:*"]);
        assert_eq!(topic.arn, "arn:aws:sns:us-east-1:123456789012:my-topic");
        assert_eq!(topic.events, vec!["s3:ObjectRemoved:*"]);
    }

    /// Every element of a `<CORSRule>` ends up in the parsed rule.
    #[test]
    fn test_parse_cors_config() {
        let xml = r#"<CORSConfiguration>
            <CORSRule>
                <AllowedOrigin>https://example.com</AllowedOrigin>
                <AllowedMethod>GET</AllowedMethod>
                <AllowedMethod>PUT</AllowedMethod>
                <AllowedHeader>*</AllowedHeader>
                <ExposeHeader>x-amz-request-id</ExposeHeader>
                <MaxAgeSeconds>3600</MaxAgeSeconds>
            </CORSRule>
        </CORSConfiguration>"#;

        let rules = parse_cors_config(xml);
        assert_eq!(rules.len(), 1);

        let rule = &rules[0];
        assert_eq!(rule.allowed_origins, vec!["https://example.com"]);
        assert_eq!(rule.allowed_methods, vec!["GET", "PUT"]);
        assert_eq!(rule.allowed_headers, vec!["*"]);
        assert_eq!(rule.expose_headers, vec!["x-amz-request-id"]);
        assert_eq!(rule.max_age_seconds, Some(3600));
    }

    /// Origins match exactly, via `*`, or via a leading-wildcard suffix.
    #[test]
    fn test_origin_matches() {
        let cases = [
            ("https://example.com", "https://example.com", true),
            ("https://example.com", "*", true),
            ("https://foo.example.com", "*.example.com", true),
            ("https://evil.com", "https://example.com", false),
        ];
        for (origin, pattern, expected) in cases {
            assert_eq!(
                origin_matches(origin, pattern),
                expected,
                "origin {origin} against pattern {pattern}"
            );
        }
    }

    /// Regression: resolve_object with versionId="null" must match objects
    /// whose version_id is either None or Some("null").
    #[test]
    fn resolve_null_version_matches_both_none_and_null_string() {
        use crate::state::S3Bucket;
        use bytes::Bytes;
        use chrono::Utc;

        // Builds a minimal object that differs only in key and version id.
        let make_obj = |key: &str, vid: Option<&str>| crate::state::S3Object {
            key: key.to_string(),
            data: Bytes::from_static(b"x"),
            content_type: "text/plain".to_string(),
            etag: "\"abc\"".to_string(),
            size: 1,
            last_modified: Utc::now(),
            metadata: Default::default(),
            storage_class: "STANDARD".to_string(),
            tags: Default::default(),
            acl_grants: vec![],
            acl_owner_id: None,
            parts_count: None,
            part_sizes: None,
            sse_algorithm: None,
            sse_kms_key_id: None,
            bucket_key_enabled: None,
            version_id: vid.map(|s| s.to_string()),
            is_delete_marker: false,
            content_encoding: None,
            website_redirect_location: None,
            restore_ongoing: None,
            restore_expiry: None,
            checksum_algorithm: None,
            checksum_value: None,
            lock_mode: None,
            lock_retain_until: None,
            lock_legal_hold: None,
        };

        let mut b = S3Bucket::new("test", "us-east-1", "owner");
        let null_str = "null".to_string();

        // First a pre-versioning object migrated to Some("null"), then a
        // true pre-versioning object with no version id at all.
        let cases = [
            (
                "file.txt",
                Some("null"),
                "versionId=null should match version_id=Some(\"null\")",
            ),
            (
                "file2.txt",
                None,
                "versionId=null should match version_id=None",
            ),
        ];
        for (key, stored_version, message) in cases {
            let obj = make_obj(key, stored_version);
            b.objects.insert(key.to_string(), obj.clone());
            b.object_versions.insert(key.to_string(), vec![obj]);

            let result = resolve_object(&b, key, Some(&null_str));
            assert!(result.is_ok(), "{message}");
        }
    }
}